// meerkat_runtime/composition/route_table.rs

//! Typed route index consumed by [`super::CatalogCompositionDispatcher`].
//!
//! The table is built once from a
//! [`meerkat_machine_schema::CompositionSchema`] and then looked up by
//! `(producer_instance, effect_variant)`. It mirrors the
//! `route_to_input` function the codegen emits (B-4 + B-4b) but consumes
//! the schema directly so the runtime doesn't need to depend on a
//! generated file tree at wire-up time. Input-kind and signal-kind routes
//! are indexed separately and consumed by their matching typed dispatcher
//! surfaces.
11
12use std::collections::HashMap;
13use std::fmt;
14
15use meerkat_machine_schema::identity::{
16    EffectVariantId, FieldId, InputVariantId, MachineInstanceId, RouteId, SignalVariantId,
17};
18use meerkat_machine_schema::{
19    CompositionSchema, Route, RouteBindingSource, RouteTargetKind, RouteVariantId,
20};
21use thiserror::Error;
22
23/// Typed route descriptor returned by [`RouteTable::resolve`].
24///
25/// Always input-kind: `RouteTable::resolve` consults only the input
26/// index, so signal-kind routes never surface here. `bindings` pairs
27/// producer-side [`FieldId`]s with the consumer-side [`FieldId`]s they
28/// must populate, in declaration order. Literal / owner-provided
29/// bindings are filtered out at table build time — the dispatcher only
30/// handles producer-field bindings (the other kinds are supplied by
31/// the consumer surface or literal machinery upstream).
32#[derive(Debug, Clone, PartialEq, Eq)]
33pub struct RoutedInputDescriptor {
34    pub route_id: RouteId,
35    pub instance_id: MachineInstanceId,
36    pub input_variant: InputVariantId,
37    pub bindings: Vec<(FieldId, FieldId)>,
38}
39
40/// Typed signal-route descriptor returned by [`RouteTable::resolve_signal`].
41///
42/// Always signal-kind: input routes are isolated on the input index and
43/// never surface through signal resolution. `bindings` has the same
44/// producer-field-to-consumer-field meaning as [`RoutedInputDescriptor`].
45#[derive(Debug, Clone, PartialEq, Eq)]
46pub struct RoutedSignalDescriptor {
47    pub route_id: RouteId,
48    pub instance_id: MachineInstanceId,
49    pub signal_variant: SignalVariantId,
50    pub bindings: Vec<(FieldId, FieldId)>,
51}
52
53/// Errors surfaced when building a [`RouteTable`] from a schema.
54#[derive(Debug, Clone, PartialEq, Eq, Error)]
55pub enum RouteTableError {
56    /// Two routes declared for the same `(producer_instance, variant)` pair.
57    /// Wave-b V2 requires unique routing per producer variant; multi-target
58    /// fan-out is explicitly out of scope.
59    #[error(
60        "composition declares duplicate input route for producer {instance} variant {variant}: \
61         existing={existing_route}, duplicate={duplicate_route}"
62    )]
63    DuplicateInputRoute {
64        instance: MachineInstanceId,
65        variant: EffectVariantId,
66        existing_route: RouteId,
67        duplicate_route: RouteId,
68    },
69    /// Two signal routes declared for the same `(producer_instance,
70    /// variant)` pair. Signal routes are a typed dispatch surface too,
71    /// so duplicate declarations are rejected instead of last-writer
72    /// silently winning.
73    #[error(
74        "composition declares duplicate signal route for producer {instance} variant {variant}: \
75         existing={existing_route}, duplicate={duplicate_route}"
76    )]
77    DuplicateSignalRoute {
78        instance: MachineInstanceId,
79        variant: EffectVariantId,
80        existing_route: RouteId,
81        duplicate_route: RouteId,
82    },
83    /// An Input-kind route carried a Signal-typed variant id. Schema
84    /// validation normally rejects this at declaration time; the table
85    /// builder surfaces it as a typed error rather than panicking so
86    /// callers handling hand-assembled schemas see a deterministic
87    /// failure instead of a crash.
88    #[error("input-kind route {route} in composition has a signal-typed variant id `{variant}`")]
89    InputRouteCarriesSignalVariant { route: RouteId, variant: String },
90    /// A Signal-kind route carried an Input-typed variant id. Schema
91    /// validation normally rejects this at declaration time; the table
92    /// builder returns a typed error to keep malformed hand-assembled
93    /// schemas deterministic.
94    #[error("signal-kind route {route} in composition has an input-typed variant id `{variant}`")]
95    SignalRouteCarriesInputVariant { route: RouteId, variant: String },
96}
97
98#[derive(Debug, Clone, PartialEq, Eq, Hash)]
99struct InputKey {
100    instance: MachineInstanceId,
101    variant: EffectVariantId,
102}
103
104/// Typed route index.
105///
106/// Two indices: one for `Input`-kind routes (consulted by the dispatcher)
107/// and one for `Signal`-kind routes (reserved for the signal surface; the
108/// dispatcher refuses these with a typed error).
109#[derive(Clone)]
110pub struct RouteTable {
111    inputs: HashMap<InputKey, RoutedInputDescriptor>,
112    signals: HashMap<InputKey, RoutedSignalDescriptor>,
113}
114
115impl fmt::Debug for RouteTable {
116    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117        f.debug_struct("RouteTable")
118            .field("input_routes", &self.inputs.len())
119            .field("signal_routes", &self.signals.len())
120            .finish()
121    }
122}
123
124impl RouteTable {
125    /// Build a typed route table from a composition schema.
126    ///
127    /// Returns [`RouteTableError::DuplicateInputRoute`] if the schema
128    /// declares two input-kind routes for the same
129    /// `(producer_instance, effect_variant)` pair. Signal-kind routes are
130    /// indexed but never trigger duplicate-input collisions (they live on
131    /// a separate map).
132    pub fn from_schema(schema: &CompositionSchema) -> Result<Self, RouteTableError> {
133        let mut inputs: HashMap<InputKey, RoutedInputDescriptor> = HashMap::new();
134        let mut signals: HashMap<InputKey, RoutedSignalDescriptor> = HashMap::new();
135
136        for route in &schema.routes {
137            let key = InputKey {
138                instance: route.from_machine.clone(),
139                variant: route.effect_variant.clone(),
140            };
141
142            match route.to.kind {
143                RouteTargetKind::Input => {
144                    let descriptor = Self::build_input_descriptor(route)?;
145                    if let Some(existing) = inputs.get(&key) {
146                        return Err(RouteTableError::DuplicateInputRoute {
147                            instance: key.instance.clone(),
148                            variant: key.variant.clone(),
149                            existing_route: existing.route_id.clone(),
150                            duplicate_route: route.name.clone(),
151                        });
152                    }
153                    inputs.insert(key, descriptor);
154                }
155                RouteTargetKind::Signal => {
156                    let descriptor = Self::build_signal_descriptor(route)?;
157                    if let Some(existing) = signals.get(&key) {
158                        return Err(RouteTableError::DuplicateSignalRoute {
159                            instance: key.instance.clone(),
160                            variant: key.variant.clone(),
161                            existing_route: existing.route_id.clone(),
162                            duplicate_route: route.name.clone(),
163                        });
164                    }
165                    signals.insert(key, descriptor);
166                }
167            }
168        }
169
170        Ok(Self { inputs, signals })
171    }
172
173    fn build_input_descriptor(route: &Route) -> Result<RoutedInputDescriptor, RouteTableError> {
174        let input_variant = match &route.to.input_variant {
175            RouteVariantId::Input(id) => id.clone(),
176            RouteVariantId::Signal(id) => {
177                return Err(RouteTableError::InputRouteCarriesSignalVariant {
178                    route: route.name.clone(),
179                    variant: id.as_str().to_owned(),
180                });
181            }
182        };
183
184        Ok(RoutedInputDescriptor {
185            route_id: route.name.clone(),
186            instance_id: route.to.machine.clone(),
187            input_variant,
188            bindings: Self::field_bindings(route),
189        })
190    }
191
192    fn build_signal_descriptor(route: &Route) -> Result<RoutedSignalDescriptor, RouteTableError> {
193        let signal_variant = match &route.to.input_variant {
194            RouteVariantId::Signal(id) => id.clone(),
195            RouteVariantId::Input(id) => {
196                return Err(RouteTableError::SignalRouteCarriesInputVariant {
197                    route: route.name.clone(),
198                    variant: id.as_str().to_owned(),
199                });
200            }
201        };
202
203        Ok(RoutedSignalDescriptor {
204            route_id: route.name.clone(),
205            instance_id: route.to.machine.clone(),
206            signal_variant,
207            bindings: Self::field_bindings(route),
208        })
209    }
210
211    fn field_bindings(route: &Route) -> Vec<(FieldId, FieldId)> {
212        route
213            .bindings
214            .iter()
215            .filter_map(|binding| match &binding.source {
216                RouteBindingSource::Field { from_field, .. } => {
217                    Some((from_field.clone(), binding.to_field.clone()))
218                }
219                RouteBindingSource::Literal(_) | RouteBindingSource::OwnerProvided => None,
220            })
221            .collect()
222    }
223
224    /// Resolve a producer `(instance_id, effect_variant)` to the typed
225    /// input descriptor. Returns `None` if no input-kind route exists for
226    /// the pair — the caller then returns
227    /// [`super::DispatchRefusal::UnresolvedRoute`].
228    pub fn resolve(
229        &self,
230        instance_id: &MachineInstanceId,
231        effect_variant: &EffectVariantId,
232    ) -> Option<&RoutedInputDescriptor> {
233        self.inputs.get(&InputKey {
234            instance: instance_id.clone(),
235            variant: effect_variant.clone(),
236        })
237    }
238
239    /// Resolve a producer `(instance_id, effect_variant)` to the typed
240    /// signal descriptor. Returns `None` if no signal-kind route exists
241    /// for the pair.
242    pub fn resolve_signal(
243        &self,
244        instance_id: &MachineInstanceId,
245        effect_variant: &EffectVariantId,
246    ) -> Option<&RoutedSignalDescriptor> {
247        self.signals.get(&InputKey {
248            instance: instance_id.clone(),
249            variant: effect_variant.clone(),
250        })
251    }
252
253    /// Count of input-kind routes. Primarily for diagnostics.
254    pub fn len(&self) -> usize {
255        self.inputs.len()
256    }
257
258    /// `true` when no input-kind routes are declared.
259    pub fn is_empty(&self) -> bool {
260        self.inputs.is_empty()
261    }
262
263    /// Number of signal-kind routes this table is aware of.
264    pub fn signal_route_count(&self) -> usize {
265        self.signals.len()
266    }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use meerkat_machine_schema::catalog::meerkat_mob_seam_composition;

    /// Build the route table for the live mob/meerkat seam composition.
    fn seam_table() -> RouteTable {
        RouteTable::from_schema(&meerkat_mob_seam_composition()).unwrap()
    }

    /// View `(producer_field, consumer_field)` bindings as string pairs so
    /// they can be compared against literals.
    fn binding_names(bindings: &[(FieldId, FieldId)]) -> Vec<(&str, &str)> {
        bindings
            .iter()
            .map(|(from, to)| (from.as_str(), to.as_str()))
            .collect()
    }

    #[test]
    fn builds_from_seam_schema_with_expected_routes() {
        let table = seam_table();

        // The live schema declares 4 Input-kind and 3 Signal-kind routes.
        assert_eq!(table.len(), 4);
        assert_eq!(table.signal_route_count(), 3);
    }

    #[test]
    fn resolves_request_runtime_binding_to_prepare_bindings() {
        let table = seam_table();
        let producer = MachineInstanceId::parse("mob").unwrap();
        let effect = EffectVariantId::parse("RequestRuntimeBinding").unwrap();

        let descriptor = table.resolve(&producer, &effect).expect("known route");

        assert_eq!(
            descriptor.route_id.as_str(),
            "binding_request_reaches_meerkat"
        );
        assert_eq!(descriptor.instance_id.as_str(), "meerkat");
        assert_eq!(descriptor.input_variant.as_str(), "PrepareBindings");
        assert_eq!(
            binding_names(&descriptor.bindings),
            vec![
                ("agent_runtime_id", "agent_runtime_id"),
                ("fence_token", "fence_token"),
                ("generation", "generation"),
                ("session_id", "session_id"),
            ]
        );
    }

    #[test]
    fn signal_routes_are_isolated_from_input_lookup() {
        let table = seam_table();
        let producer = MachineInstanceId::parse("meerkat").unwrap();
        let effect = EffectVariantId::parse("RuntimeBound").unwrap();

        // `RuntimeBound` is a Signal-kind route; it must NOT surface in
        // the input-lookup path.
        assert!(table.resolve(&producer, &effect).is_none());
    }

    #[test]
    fn work_request_projects_agent_runtime_id_to_runtime_id() {
        let table = seam_table();
        let producer = MachineInstanceId::parse("mob").unwrap();
        let effect = EffectVariantId::parse("RequestRuntimeIngress").unwrap();

        let descriptor = table.resolve(&producer, &effect).expect("known route");

        assert_eq!(descriptor.route_id.as_str(), "work_request_reaches_meerkat");
        assert_eq!(descriptor.input_variant.as_str(), "Ingest");
        assert_eq!(
            binding_names(&descriptor.bindings),
            vec![
                ("agent_runtime_id", "runtime_id"),
                ("work_id", "work_id"),
                ("origin", "origin"),
            ]
        );
    }

    #[test]
    fn resolves_runtime_bound_signal_binding_to_mob_signal() {
        let table = seam_table();
        let producer = MachineInstanceId::parse("meerkat").unwrap();
        let effect = EffectVariantId::parse("RuntimeBound").unwrap();

        let descriptor = table
            .resolve_signal(&producer, &effect)
            .expect("known signal route");

        assert_eq!(descriptor.route_id.as_str(), "runtime_bound_reaches_mob");
        assert_eq!(descriptor.instance_id.as_str(), "mob");
        assert_eq!(descriptor.signal_variant.as_str(), "ObserveRuntimeReady");
        assert_eq!(
            binding_names(&descriptor.bindings),
            vec![
                ("agent_runtime_id", "agent_runtime_id"),
                ("fence_token", "fence_token"),
            ]
        );
    }

    #[test]
    fn resolve_returns_none_for_unknown_variant() {
        let table = seam_table();
        let producer = MachineInstanceId::parse("mob").unwrap();
        let effect = EffectVariantId::parse("NoSuchVariant").unwrap();

        assert!(table.resolve(&producer, &effect).is_none());
    }
}