use std::collections::HashMap;
use std::fmt;
use meerkat_machine_schema::identity::{
EffectVariantId, FieldId, InputVariantId, MachineInstanceId, RouteId, SignalVariantId,
};
use meerkat_machine_schema::{
CompositionSchema, Route, RouteBindingSource, RouteTargetKind, RouteVariantId,
};
use thiserror::Error;
/// Resolved destination of an input-kind composition route.
///
/// One descriptor is built per route whose target kind is `RouteTargetKind::Input`
/// and is handed back by `RouteTable::resolve`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RoutedInputDescriptor {
/// Name of the composition route this descriptor was built from.
pub route_id: RouteId,
/// Machine instance named as the route's target (`route.to.machine`).
pub instance_id: MachineInstanceId,
/// Input variant named by the route target.
pub input_variant: InputVariantId,
/// `(source field, destination field)` pairs taken from the route's field-sourced
/// bindings. Literal and owner-provided bindings are not represented here.
pub bindings: Vec<(FieldId, FieldId)>,
}
/// Resolved destination of a signal-kind composition route.
///
/// One descriptor is built per route whose target kind is `RouteTargetKind::Signal`
/// and is handed back by `RouteTable::resolve_signal`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RoutedSignalDescriptor {
/// Name of the composition route this descriptor was built from.
pub route_id: RouteId,
/// Machine instance named as the route's target (`route.to.machine`).
pub instance_id: MachineInstanceId,
/// Signal variant named by the route target.
pub signal_variant: SignalVariantId,
/// `(source field, destination field)` pairs taken from the route's field-sourced
/// bindings. Literal and owner-provided bindings are not represented here.
pub bindings: Vec<(FieldId, FieldId)>,
}
/// Reasons `RouteTable::from_schema` can reject a composition schema.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum RouteTableError {
/// Two input-kind routes share the same producer instance and effect variant.
///
/// `existing_route` is the route that was registered first; `duplicate_route` is the
/// later route that collided with it.
#[error(
"composition declares duplicate input route for producer {instance} variant {variant}: \
existing={existing_route}, duplicate={duplicate_route}"
)]
DuplicateInputRoute {
instance: MachineInstanceId,
variant: EffectVariantId,
existing_route: RouteId,
duplicate_route: RouteId,
},
/// Two signal-kind routes share the same producer instance and effect variant.
///
/// `existing_route` is the route that was registered first; `duplicate_route` is the
/// later route that collided with it.
#[error(
"composition declares duplicate signal route for producer {instance} variant {variant}: \
existing={existing_route}, duplicate={duplicate_route}"
)]
DuplicateSignalRoute {
instance: MachineInstanceId,
variant: EffectVariantId,
existing_route: RouteId,
duplicate_route: RouteId,
},
/// A route whose target kind is input carries a `RouteVariantId::Signal` variant id.
/// `variant` holds the offending id as text.
#[error("input-kind route {route} in composition has a signal-typed variant id `{variant}`")]
InputRouteCarriesSignalVariant { route: RouteId, variant: String },
/// A route whose target kind is signal carries a `RouteVariantId::Input` variant id.
/// `variant` holds the offending id as text.
#[error("signal-kind route {route} in composition has an input-typed variant id `{variant}`")]
SignalRouteCarriesInputVariant { route: RouteId, variant: String },
}
/// Hash-map key identifying the producing side of a route.
///
/// Despite the name, the same key type indexes both the input-route map and the
/// signal-route map of `RouteTable`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct InputKey {
/// Machine instance the route originates from (`route.from_machine`).
instance: MachineInstanceId,
/// Effect variant on that instance that the route is keyed by (`route.effect_variant`).
variant: EffectVariantId,
}
/// Lookup table from `(producer instance, effect variant)` to the route descriptor
/// declared for it in a composition schema.
///
/// Input-kind and signal-kind routes live in separate maps, so the same producer key
/// may appear once in each. `Debug` is implemented by hand and prints only the two
/// map sizes.
#[derive(Clone)]
pub struct RouteTable {
// Descriptors for routes whose target kind is `RouteTargetKind::Input`.
inputs: HashMap<InputKey, RoutedInputDescriptor>,
// Descriptors for routes whose target kind is `RouteTargetKind::Signal`.
signals: HashMap<InputKey, RoutedSignalDescriptor>,
}
/// Compact `Debug` rendering: reports how many input and signal routes the table holds
/// instead of dumping every descriptor.
impl fmt::Debug for RouteTable {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let input_routes = self.inputs.len();
        let signal_routes = self.signals.len();

        let mut summary = f.debug_struct("RouteTable");
        summary.field("input_routes", &input_routes);
        summary.field("signal_routes", &signal_routes);
        summary.finish()
    }
}
impl RouteTable {
    /// Builds the lookup table from every route declared in `schema`.
    ///
    /// Each route is keyed by its producer (`from_machine`, `effect_variant`) and filed
    /// under the input map or the signal map according to `route.to.kind`. The two maps
    /// are independent, so one producer key may own both an input route and a signal
    /// route.
    ///
    /// Routes are processed in declaration order and the first failure is returned.
    ///
    /// # Errors
    ///
    /// * [`RouteTableError::InputRouteCarriesSignalVariant`] or
    ///   [`RouteTableError::SignalRouteCarriesInputVariant`] when a route's target kind
    ///   disagrees with the type of its variant id.
    /// * [`RouteTableError::DuplicateInputRoute`] or
    ///   [`RouteTableError::DuplicateSignalRoute`] when two routes of the same kind share
    ///   a producer key.
    pub fn from_schema(schema: &CompositionSchema) -> Result<Self, RouteTableError> {
        let mut inputs: HashMap<InputKey, RoutedInputDescriptor> = HashMap::new();
        let mut signals: HashMap<InputKey, RoutedSignalDescriptor> = HashMap::new();

        for route in &schema.routes {
            let key = InputKey {
                instance: route.from_machine.clone(),
                variant: route.effect_variant.clone(),
            };
            match route.to.kind {
                RouteTargetKind::Input => {
                    // Variant validation runs before the duplicate check, so a malformed
                    // route is reported as malformed even when it also collides.
                    let descriptor = Self::build_input_descriptor(route)?;
                    // A single `insert` doubles as the duplicate probe: it returns the
                    // descriptor that was already stored under this key, if any. The
                    // overwrite is harmless because the maps are dropped on error.
                    if let Some(existing) = inputs.insert(key, descriptor) {
                        return Err(RouteTableError::DuplicateInputRoute {
                            instance: route.from_machine.clone(),
                            variant: route.effect_variant.clone(),
                            existing_route: existing.route_id,
                            duplicate_route: route.name.clone(),
                        });
                    }
                }
                RouteTargetKind::Signal => {
                    let descriptor = Self::build_signal_descriptor(route)?;
                    // Same insert-as-probe approach as the input arm above.
                    if let Some(existing) = signals.insert(key, descriptor) {
                        return Err(RouteTableError::DuplicateSignalRoute {
                            instance: route.from_machine.clone(),
                            variant: route.effect_variant.clone(),
                            existing_route: existing.route_id,
                            duplicate_route: route.name.clone(),
                        });
                    }
                }
            }
        }

        Ok(Self { inputs, signals })
    }

    /// Converts an input-kind `route` into its descriptor.
    ///
    /// Fails with [`RouteTableError::InputRouteCarriesSignalVariant`] when the route's
    /// variant id is signal-typed.
    fn build_input_descriptor(route: &Route) -> Result<RoutedInputDescriptor, RouteTableError> {
        let input_variant = match &route.to.input_variant {
            RouteVariantId::Input(id) => id.clone(),
            RouteVariantId::Signal(id) => {
                return Err(RouteTableError::InputRouteCarriesSignalVariant {
                    route: route.name.clone(),
                    variant: id.as_str().to_owned(),
                });
            }
        };
        Ok(RoutedInputDescriptor {
            route_id: route.name.clone(),
            instance_id: route.to.machine.clone(),
            input_variant,
            bindings: Self::field_bindings(route),
        })
    }

    /// Converts a signal-kind `route` into its descriptor.
    ///
    /// Fails with [`RouteTableError::SignalRouteCarriesInputVariant`] when the route's
    /// variant id is input-typed.
    fn build_signal_descriptor(route: &Route) -> Result<RoutedSignalDescriptor, RouteTableError> {
        let signal_variant = match &route.to.input_variant {
            RouteVariantId::Signal(id) => id.clone(),
            RouteVariantId::Input(id) => {
                return Err(RouteTableError::SignalRouteCarriesInputVariant {
                    route: route.name.clone(),
                    variant: id.as_str().to_owned(),
                });
            }
        };
        Ok(RoutedSignalDescriptor {
            route_id: route.name.clone(),
            instance_id: route.to.machine.clone(),
            signal_variant,
            bindings: Self::field_bindings(route),
        })
    }

    /// Collects the `(source field, destination field)` pairs of `route`, in declaration
    /// order.
    ///
    /// Only bindings sourced from a field are kept; literal and owner-provided bindings
    /// are skipped.
    fn field_bindings(route: &Route) -> Vec<(FieldId, FieldId)> {
        route
            .bindings
            .iter()
            .filter_map(|binding| match &binding.source {
                RouteBindingSource::Field { from_field, .. } => {
                    Some((from_field.clone(), binding.to_field.clone()))
                }
                RouteBindingSource::Literal(_) | RouteBindingSource::OwnerProvided => None,
            })
            .collect()
    }

    /// Looks up the input route keyed by the given producer instance and effect variant.
    ///
    /// Returns `None` when no input-kind route is registered for that key; signal routes
    /// are never returned here (see [`RouteTable::resolve_signal`]).
    pub fn resolve(
        &self,
        instance_id: &MachineInstanceId,
        effect_variant: &EffectVariantId,
    ) -> Option<&RoutedInputDescriptor> {
        // The map is keyed by an owned `InputKey`, so the probe key is built from clones.
        self.inputs.get(&InputKey {
            instance: instance_id.clone(),
            variant: effect_variant.clone(),
        })
    }

    /// Looks up the signal route keyed by the given producer instance and effect variant.
    ///
    /// Returns `None` when no signal-kind route is registered for that key; input routes
    /// are never returned here (see [`RouteTable::resolve`]).
    pub fn resolve_signal(
        &self,
        instance_id: &MachineInstanceId,
        effect_variant: &EffectVariantId,
    ) -> Option<&RoutedSignalDescriptor> {
        // The map is keyed by an owned `InputKey`, so the probe key is built from clones.
        self.signals.get(&InputKey {
            instance: instance_id.clone(),
            variant: effect_variant.clone(),
        })
    }

    /// Number of input routes in the table.
    ///
    /// Signal routes are not included; use [`RouteTable::signal_route_count`] for those.
    pub fn len(&self) -> usize {
        self.inputs.len()
    }

    /// Returns `true` when the table holds no input routes.
    ///
    /// Mirrors [`RouteTable::len`]: signal routes are not considered, so this can be
    /// `true` while signal routes exist.
    pub fn is_empty(&self) -> bool {
        self.inputs.is_empty()
    }

    /// Number of signal routes in the table.
    pub fn signal_route_count(&self) -> usize {
        self.signals.len()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use meerkat_machine_schema::catalog::meerkat_mob_seam_composition;

    /// Builds the route table for the catalog's mob seam composition.
    fn seam_table() -> RouteTable {
        let schema = meerkat_mob_seam_composition();
        RouteTable::from_schema(&schema).unwrap()
    }

    /// Projects descriptor bindings to `(from, to)` string pairs for easy comparison.
    fn binding_pairs(bindings: &[(FieldId, FieldId)]) -> Vec<(&str, &str)> {
        bindings
            .iter()
            .map(|(from, to)| (from.as_str(), to.as_str()))
            .collect()
    }

    #[test]
    fn builds_from_seam_schema_with_expected_routes() {
        let table = seam_table();

        assert_eq!(table.len(), 4);
        assert_eq!(table.signal_route_count(), 3);
    }

    #[test]
    fn resolves_request_runtime_binding_to_prepare_bindings() {
        let table = seam_table();
        let producer = MachineInstanceId::parse("mob").unwrap();
        let effect = EffectVariantId::parse("RequestRuntimeBinding").unwrap();

        let resolved = table.resolve(&producer, &effect).expect("known route");

        assert_eq!(
            resolved.route_id.as_str(),
            "binding_request_reaches_meerkat"
        );
        assert_eq!(resolved.instance_id.as_str(), "meerkat");
        assert_eq!(resolved.input_variant.as_str(), "PrepareBindings");
        assert_eq!(
            binding_pairs(&resolved.bindings),
            [
                ("agent_runtime_id", "agent_runtime_id"),
                ("fence_token", "fence_token"),
                ("generation", "generation"),
                ("session_id", "session_id"),
            ]
        );
    }

    #[test]
    fn signal_routes_are_isolated_from_input_lookup() {
        let table = seam_table();
        let producer = MachineInstanceId::parse("meerkat").unwrap();
        let effect = EffectVariantId::parse("RuntimeBound").unwrap();

        // `RuntimeBound` is routed as a signal, so the input lookup must miss.
        assert!(table.resolve(&producer, &effect).is_none());
    }

    #[test]
    fn work_request_projects_agent_runtime_id_to_runtime_id() {
        let table = seam_table();
        let producer = MachineInstanceId::parse("mob").unwrap();
        let effect = EffectVariantId::parse("RequestRuntimeIngress").unwrap();

        let resolved = table.resolve(&producer, &effect).expect("known route");

        assert_eq!(resolved.route_id.as_str(), "work_request_reaches_meerkat");
        assert_eq!(resolved.input_variant.as_str(), "Ingest");
        assert_eq!(
            binding_pairs(&resolved.bindings),
            [
                ("agent_runtime_id", "runtime_id"),
                ("fence_token", "fence_token"),
                ("generation", "generation"),
                ("session_id", "session_id"),
                ("work_id", "work_id"),
                ("origin", "origin"),
            ]
        );
    }

    #[test]
    fn resolves_runtime_bound_signal_binding_to_mob_signal() {
        let table = seam_table();
        let producer = MachineInstanceId::parse("meerkat").unwrap();
        let effect = EffectVariantId::parse("RuntimeBound").unwrap();

        let resolved = table
            .resolve_signal(&producer, &effect)
            .expect("known signal route");

        assert_eq!(resolved.route_id.as_str(), "runtime_bound_reaches_mob");
        assert_eq!(resolved.instance_id.as_str(), "mob");
        assert_eq!(resolved.signal_variant.as_str(), "ObserveRuntimeReady");
        assert_eq!(
            binding_pairs(&resolved.bindings),
            [
                ("agent_runtime_id", "agent_runtime_id"),
                ("fence_token", "fence_token"),
            ]
        );
    }

    #[test]
    fn resolve_returns_none_for_unknown_variant() {
        let table = seam_table();
        let producer = MachineInstanceId::parse("mob").unwrap();
        let effect = EffectVariantId::parse("NoSuchVariant").unwrap();

        assert!(table.resolve(&producer, &effect).is_none());
    }
}