meerkat_runtime/composition/
route_table.rs1use std::collections::HashMap;
13use std::fmt;
14
15use meerkat_machine_schema::identity::{
16 EffectVariantId, FieldId, InputVariantId, MachineInstanceId, RouteId, SignalVariantId,
17};
18use meerkat_machine_schema::{
19 CompositionSchema, Route, RouteBindingSource, RouteTargetKind, RouteVariantId,
20};
21use thiserror::Error;
22
23#[derive(Debug, Clone, PartialEq, Eq)]
33pub struct RoutedInputDescriptor {
34 pub route_id: RouteId,
35 pub instance_id: MachineInstanceId,
36 pub input_variant: InputVariantId,
37 pub bindings: Vec<(FieldId, FieldId)>,
38}
39
40#[derive(Debug, Clone, PartialEq, Eq)]
46pub struct RoutedSignalDescriptor {
47 pub route_id: RouteId,
48 pub instance_id: MachineInstanceId,
49 pub signal_variant: SignalVariantId,
50 pub bindings: Vec<(FieldId, FieldId)>,
51}
52
53#[derive(Debug, Clone, PartialEq, Eq, Error)]
55pub enum RouteTableError {
56 #[error(
60 "composition declares duplicate input route for producer {instance} variant {variant}: \
61 existing={existing_route}, duplicate={duplicate_route}"
62 )]
63 DuplicateInputRoute {
64 instance: MachineInstanceId,
65 variant: EffectVariantId,
66 existing_route: RouteId,
67 duplicate_route: RouteId,
68 },
69 #[error(
74 "composition declares duplicate signal route for producer {instance} variant {variant}: \
75 existing={existing_route}, duplicate={duplicate_route}"
76 )]
77 DuplicateSignalRoute {
78 instance: MachineInstanceId,
79 variant: EffectVariantId,
80 existing_route: RouteId,
81 duplicate_route: RouteId,
82 },
83 #[error("input-kind route {route} in composition has a signal-typed variant id `{variant}`")]
89 InputRouteCarriesSignalVariant { route: RouteId, variant: String },
90 #[error("signal-kind route {route} in composition has an input-typed variant id `{variant}`")]
95 SignalRouteCarriesInputVariant { route: RouteId, variant: String },
96}
97
98#[derive(Debug, Clone, PartialEq, Eq, Hash)]
99struct InputKey {
100 instance: MachineInstanceId,
101 variant: EffectVariantId,
102}
103
104#[derive(Clone)]
110pub struct RouteTable {
111 inputs: HashMap<InputKey, RoutedInputDescriptor>,
112 signals: HashMap<InputKey, RoutedSignalDescriptor>,
113}
114
115impl fmt::Debug for RouteTable {
116 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117 f.debug_struct("RouteTable")
118 .field("input_routes", &self.inputs.len())
119 .field("signal_routes", &self.signals.len())
120 .finish()
121 }
122}
123
124impl RouteTable {
125 pub fn from_schema(schema: &CompositionSchema) -> Result<Self, RouteTableError> {
133 let mut inputs: HashMap<InputKey, RoutedInputDescriptor> = HashMap::new();
134 let mut signals: HashMap<InputKey, RoutedSignalDescriptor> = HashMap::new();
135
136 for route in &schema.routes {
137 let key = InputKey {
138 instance: route.from_machine.clone(),
139 variant: route.effect_variant.clone(),
140 };
141
142 match route.to.kind {
143 RouteTargetKind::Input => {
144 let descriptor = Self::build_input_descriptor(route)?;
145 if let Some(existing) = inputs.get(&key) {
146 return Err(RouteTableError::DuplicateInputRoute {
147 instance: key.instance.clone(),
148 variant: key.variant.clone(),
149 existing_route: existing.route_id.clone(),
150 duplicate_route: route.name.clone(),
151 });
152 }
153 inputs.insert(key, descriptor);
154 }
155 RouteTargetKind::Signal => {
156 let descriptor = Self::build_signal_descriptor(route)?;
157 if let Some(existing) = signals.get(&key) {
158 return Err(RouteTableError::DuplicateSignalRoute {
159 instance: key.instance.clone(),
160 variant: key.variant.clone(),
161 existing_route: existing.route_id.clone(),
162 duplicate_route: route.name.clone(),
163 });
164 }
165 signals.insert(key, descriptor);
166 }
167 }
168 }
169
170 Ok(Self { inputs, signals })
171 }
172
173 fn build_input_descriptor(route: &Route) -> Result<RoutedInputDescriptor, RouteTableError> {
174 let input_variant = match &route.to.input_variant {
175 RouteVariantId::Input(id) => id.clone(),
176 RouteVariantId::Signal(id) => {
177 return Err(RouteTableError::InputRouteCarriesSignalVariant {
178 route: route.name.clone(),
179 variant: id.as_str().to_owned(),
180 });
181 }
182 };
183
184 Ok(RoutedInputDescriptor {
185 route_id: route.name.clone(),
186 instance_id: route.to.machine.clone(),
187 input_variant,
188 bindings: Self::field_bindings(route),
189 })
190 }
191
192 fn build_signal_descriptor(route: &Route) -> Result<RoutedSignalDescriptor, RouteTableError> {
193 let signal_variant = match &route.to.input_variant {
194 RouteVariantId::Signal(id) => id.clone(),
195 RouteVariantId::Input(id) => {
196 return Err(RouteTableError::SignalRouteCarriesInputVariant {
197 route: route.name.clone(),
198 variant: id.as_str().to_owned(),
199 });
200 }
201 };
202
203 Ok(RoutedSignalDescriptor {
204 route_id: route.name.clone(),
205 instance_id: route.to.machine.clone(),
206 signal_variant,
207 bindings: Self::field_bindings(route),
208 })
209 }
210
211 fn field_bindings(route: &Route) -> Vec<(FieldId, FieldId)> {
212 route
213 .bindings
214 .iter()
215 .filter_map(|binding| match &binding.source {
216 RouteBindingSource::Field { from_field, .. } => {
217 Some((from_field.clone(), binding.to_field.clone()))
218 }
219 RouteBindingSource::Literal(_) | RouteBindingSource::OwnerProvided => None,
220 })
221 .collect()
222 }
223
224 pub fn resolve(
229 &self,
230 instance_id: &MachineInstanceId,
231 effect_variant: &EffectVariantId,
232 ) -> Option<&RoutedInputDescriptor> {
233 self.inputs.get(&InputKey {
234 instance: instance_id.clone(),
235 variant: effect_variant.clone(),
236 })
237 }
238
239 pub fn resolve_signal(
243 &self,
244 instance_id: &MachineInstanceId,
245 effect_variant: &EffectVariantId,
246 ) -> Option<&RoutedSignalDescriptor> {
247 self.signals.get(&InputKey {
248 instance: instance_id.clone(),
249 variant: effect_variant.clone(),
250 })
251 }
252
253 pub fn len(&self) -> usize {
255 self.inputs.len()
256 }
257
258 pub fn is_empty(&self) -> bool {
260 self.inputs.is_empty()
261 }
262
263 pub fn signal_route_count(&self) -> usize {
265 self.signals.len()
266 }
267}
268
269#[cfg(test)]
270mod tests {
271 use super::*;
272 use meerkat_machine_schema::catalog::meerkat_mob_seam_composition;
273
274 #[test]
275 fn builds_from_seam_schema_with_expected_routes() {
276 let schema = meerkat_mob_seam_composition();
277 let table = RouteTable::from_schema(&schema).unwrap();
278
279 assert_eq!(table.len(), 4);
281 assert_eq!(table.signal_route_count(), 3);
282 }
283
284 #[test]
285 fn resolves_request_runtime_binding_to_prepare_bindings() {
286 let schema = meerkat_mob_seam_composition();
287 let table = RouteTable::from_schema(&schema).unwrap();
288
289 let mob = MachineInstanceId::parse("mob").unwrap();
290 let variant = EffectVariantId::parse("RequestRuntimeBinding").unwrap();
291 let descriptor = table.resolve(&mob, &variant).expect("known route");
292
293 assert_eq!(
294 descriptor.route_id.as_str(),
295 "binding_request_reaches_meerkat"
296 );
297 assert_eq!(descriptor.instance_id.as_str(), "meerkat");
298 assert_eq!(descriptor.input_variant.as_str(), "PrepareBindings");
299 let field_pairs: Vec<(&str, &str)> = descriptor
300 .bindings
301 .iter()
302 .map(|(from, to)| (from.as_str(), to.as_str()))
303 .collect();
304 assert_eq!(
305 field_pairs,
306 vec![
307 ("agent_runtime_id", "agent_runtime_id"),
308 ("fence_token", "fence_token"),
309 ("generation", "generation"),
310 ("session_id", "session_id"),
311 ]
312 );
313 }
314
315 #[test]
316 fn signal_routes_are_isolated_from_input_lookup() {
317 let schema = meerkat_mob_seam_composition();
318 let table = RouteTable::from_schema(&schema).unwrap();
319
320 let meerkat = MachineInstanceId::parse("meerkat").unwrap();
321 let runtime_bound = EffectVariantId::parse("RuntimeBound").unwrap();
322
323 assert!(table.resolve(&meerkat, &runtime_bound).is_none());
326 }
327
328 #[test]
329 fn work_request_projects_agent_runtime_id_to_runtime_id() {
330 let schema = meerkat_mob_seam_composition();
331 let table = RouteTable::from_schema(&schema).unwrap();
332
333 let mob = MachineInstanceId::parse("mob").unwrap();
334 let variant = EffectVariantId::parse("RequestRuntimeIngress").unwrap();
335 let descriptor = table.resolve(&mob, &variant).expect("known route");
336
337 assert_eq!(descriptor.route_id.as_str(), "work_request_reaches_meerkat");
338 assert_eq!(descriptor.input_variant.as_str(), "Ingest");
339 let field_pairs: Vec<(&str, &str)> = descriptor
340 .bindings
341 .iter()
342 .map(|(from, to)| (from.as_str(), to.as_str()))
343 .collect();
344 assert_eq!(
345 field_pairs,
346 vec![
347 ("agent_runtime_id", "runtime_id"),
348 ("work_id", "work_id"),
349 ("origin", "origin"),
350 ]
351 );
352 }
353
354 #[test]
355 fn resolves_runtime_bound_signal_binding_to_mob_signal() {
356 let schema = meerkat_mob_seam_composition();
357 let table = RouteTable::from_schema(&schema).unwrap();
358
359 let meerkat = MachineInstanceId::parse("meerkat").unwrap();
360 let runtime_bound = EffectVariantId::parse("RuntimeBound").unwrap();
361 let descriptor = table
362 .resolve_signal(&meerkat, &runtime_bound)
363 .expect("known signal route");
364
365 assert_eq!(descriptor.route_id.as_str(), "runtime_bound_reaches_mob");
366 assert_eq!(descriptor.instance_id.as_str(), "mob");
367 assert_eq!(descriptor.signal_variant.as_str(), "ObserveRuntimeReady");
368 let field_pairs: Vec<(&str, &str)> = descriptor
369 .bindings
370 .iter()
371 .map(|(from, to)| (from.as_str(), to.as_str()))
372 .collect();
373 assert_eq!(
374 field_pairs,
375 vec![
376 ("agent_runtime_id", "agent_runtime_id"),
377 ("fence_token", "fence_token"),
378 ]
379 );
380 }
381
382 #[test]
383 fn resolve_returns_none_for_unknown_variant() {
384 let schema = meerkat_mob_seam_composition();
385 let table = RouteTable::from_schema(&schema).unwrap();
386
387 let mob = MachineInstanceId::parse("mob").unwrap();
388 let variant = EffectVariantId::parse("NoSuchVariant").unwrap();
389 assert!(table.resolve(&mob, &variant).is_none());
390 }
391}