pub mod route_table;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use async_trait::async_trait;
use meerkat_machine_schema::identity::{
CompositionId, EffectVariantId, FieldId, InputVariantId, MachineId, MachineInstanceId, RouteId,
SignalVariantId,
};
use thiserror::Error;
pub use route_table::{RouteTable, RouteTableError, RoutedInputDescriptor, RoutedSignalDescriptor};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ProducerInstance {
pub composition: CompositionId,
pub instance_id: MachineInstanceId,
pub machine: MachineId,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EffectPayload<E> {
Emitted {
variant: EffectVariantId,
body: E,
},
}
impl<E: ProducerEffect> EffectPayload<E> {
pub fn variant(&self) -> &EffectVariantId {
match self {
Self::Emitted { variant, .. } => variant,
}
}
pub fn body(&self) -> &E {
match self {
Self::Emitted { body, .. } => body,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SignalPayload<S> {
Emitted {
variant: EffectVariantId,
body: S,
},
}
impl<S: ProducerSignal> SignalPayload<S> {
pub fn variant(&self) -> &EffectVariantId {
match self {
Self::Emitted { variant, .. } => variant,
}
}
pub fn body(&self) -> &S {
match self {
Self::Emitted { body, .. } => body,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RouteKey {
pub composition: CompositionId,
pub route_id: RouteId,
}
pub trait ProducerEffect: fmt::Debug + Send + Sync + 'static {
fn variant_id(&self) -> EffectVariantId;
fn field(&self, id: &FieldId) -> Option<FieldValue<'_>>;
}
pub trait ProducerSignal: fmt::Debug + Send + Sync + 'static {
fn variant_id(&self) -> EffectVariantId;
fn field(&self, id: &FieldId) -> Option<FieldValue<'_>>;
}
#[derive(Debug, Clone)]
pub enum FieldValue<'a> {
Str(&'a str),
U64(u64),
I64(i64),
Bool(bool),
Opaque(Arc<dyn std::any::Any + Send + Sync>),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DispatchOutcome {
pub route: RouteKey,
pub consumer: MachineInstanceId,
pub applied_input: InputVariantId,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SignalDispatchOutcome {
pub route: RouteKey,
pub consumer: MachineInstanceId,
pub applied_signal: SignalVariantId,
}
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum DispatchRefusal {
#[error("dispatcher composition {expected} does not match producer composition {actual}")]
CompositionMismatch {
expected: CompositionId,
actual: CompositionId,
},
#[error(
"no input route declared for producer {instance} effect variant {variant} in composition {composition}"
)]
UnresolvedRoute {
composition: CompositionId,
instance: MachineInstanceId,
variant: EffectVariantId,
},
#[error("route {route} requires producer field {field} on variant {variant}, not provided")]
MissingProducerField {
route: RouteId,
variant: EffectVariantId,
field: FieldId,
},
#[error(
"no consumer surface registered for target instance {instance} in composition {composition}"
)]
UnwiredConsumer {
composition: CompositionId,
instance: MachineInstanceId,
},
#[error("consumer {instance} refused input {variant}: {error}")]
ConsumerRefused {
instance: MachineInstanceId,
variant: InputVariantId,
error: ConsumerError,
},
}
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum SignalDispatchRefusal {
#[error("dispatcher composition {expected} does not match producer composition {actual}")]
CompositionMismatch {
expected: CompositionId,
actual: CompositionId,
},
#[error(
"no signal route declared for producer {instance} variant {variant} in composition {composition}"
)]
UnresolvedRoute {
composition: CompositionId,
instance: MachineInstanceId,
variant: EffectVariantId,
},
#[error("route {route} requires producer field {field} on variant {variant}, not provided")]
MissingProducerField {
route: RouteId,
variant: EffectVariantId,
field: FieldId,
},
#[error(
"no signal consumer surface registered for target instance {instance} in composition {composition}"
)]
UnwiredConsumer {
composition: CompositionId,
instance: MachineInstanceId,
},
#[error("consumer {instance} refused signal {variant}: {error}")]
ConsumerRefused {
instance: MachineInstanceId,
variant: SignalVariantId,
error: ConsumerError,
},
}
#[derive(Debug, Clone, PartialEq, Eq, Error)]
#[error("{message} [{error_code}]")]
pub struct ConsumerError {
error_code: &'static str,
message: String,
}
impl ConsumerError {
pub fn new(error_code: &'static str, message: impl Into<String>) -> Self {
Self {
error_code,
message: message.into(),
}
}
pub fn error_code(&self) -> &'static str {
self.error_code
}
pub fn message(&self) -> &str {
&self.message
}
}
impl From<String> for ConsumerError {
fn from(message: String) -> Self {
Self::new("consumer_projection_failed", message)
}
}
#[async_trait]
pub trait ConsumerSurface: Send + Sync {
fn instance_id(&self) -> &MachineInstanceId;
async fn apply_routed_input(
&self,
variant: InputVariantId,
projected_fields: Vec<(FieldId, OwnedFieldValue)>,
) -> Result<(), ConsumerError>;
}
#[async_trait]
pub trait SignalConsumerSurface: Send + Sync {
fn instance_id(&self) -> &MachineInstanceId;
async fn receive_signal(
&self,
variant: SignalVariantId,
projected_fields: Vec<(FieldId, OwnedFieldValue)>,
) -> Result<(), ConsumerError>;
}
#[derive(Debug, Clone)]
pub enum OwnedFieldValue {
Str(String),
U64(u64),
I64(i64),
Bool(bool),
Opaque(Arc<dyn std::any::Any + Send + Sync>),
}
impl FieldValue<'_> {
pub fn to_owned_value(&self) -> OwnedFieldValue {
match self {
FieldValue::Str(s) => OwnedFieldValue::Str((*s).to_owned()),
FieldValue::U64(v) => OwnedFieldValue::U64(*v),
FieldValue::I64(v) => OwnedFieldValue::I64(*v),
FieldValue::Bool(v) => OwnedFieldValue::Bool(*v),
FieldValue::Opaque(handle) => OwnedFieldValue::Opaque(Arc::clone(handle)),
}
}
}
#[async_trait]
pub trait CompositionDispatcher: Send + Sync {
type Effect: ProducerEffect;
fn composition(&self) -> &CompositionId;
async fn dispatch(
&self,
producer: ProducerInstance,
effect: EffectPayload<Self::Effect>,
) -> Result<DispatchOutcome, DispatchRefusal>;
}
#[async_trait]
pub trait CompositionSignalDispatcher: Send + Sync {
type Signal: ProducerSignal;
fn composition(&self) -> &CompositionId;
async fn dispatch_signal(
&self,
producer: ProducerInstance,
signal: SignalPayload<Self::Signal>,
) -> Result<SignalDispatchOutcome, SignalDispatchRefusal>;
}
pub trait ContextProvider<E: ProducerEffect>: Send + Sync {
fn provide_context(
&self,
producer: &ProducerInstance,
effect: &EffectPayload<E>,
) -> Vec<(FieldId, OwnedFieldValue)>;
}
pub enum CompositionBinding<E: ProducerEffect> {
Standalone,
Wired(Arc<dyn CompositionDispatcher<Effect = E>>),
OwnerProvided {
dispatcher: Arc<dyn CompositionDispatcher<Effect = E>>,
context: Arc<dyn ContextProvider<E>>,
},
}
impl<E: ProducerEffect> fmt::Debug for CompositionBinding<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Standalone => f.debug_struct("CompositionBinding::Standalone").finish(),
Self::Wired(_) => f
.debug_struct("CompositionBinding::Wired")
.field("dispatcher", &"<dyn CompositionDispatcher>")
.finish(),
Self::OwnerProvided { .. } => f
.debug_struct("CompositionBinding::OwnerProvided")
.field("dispatcher", &"<dyn CompositionDispatcher>")
.field("context", &"<dyn ContextProvider>")
.finish(),
}
}
}
impl<E: ProducerEffect> CompositionBinding<E> {
pub fn standalone() -> Self {
Self::Standalone
}
pub fn wired_with(dispatcher: Arc<dyn CompositionDispatcher<Effect = E>>) -> Self {
Self::Wired(dispatcher)
}
pub fn owner_provided(
dispatcher: Arc<dyn CompositionDispatcher<Effect = E>>,
context: Arc<dyn ContextProvider<E>>,
) -> Self {
Self::OwnerProvided {
dispatcher,
context,
}
}
pub fn is_standalone(&self) -> bool {
matches!(self, Self::Standalone)
}
pub fn wired(&self) -> Option<&Arc<dyn CompositionDispatcher<Effect = E>>> {
match self {
Self::Standalone => None,
Self::Wired(d) => Some(d),
Self::OwnerProvided { dispatcher, .. } => Some(dispatcher),
}
}
pub fn context_provider(&self) -> Option<&Arc<dyn ContextProvider<E>>> {
match self {
Self::Standalone | Self::Wired(_) => None,
Self::OwnerProvided { context, .. } => Some(context),
}
}
}
pub struct CatalogCompositionDispatcher<E: ProducerEffect> {
composition: CompositionId,
table: RouteTable,
consumers: HashMap<MachineInstanceId, Arc<dyn ConsumerSurface>>,
_effect: std::marker::PhantomData<fn(E)>,
}
pub struct CatalogCompositionSignalDispatcher<S: ProducerSignal> {
composition: CompositionId,
table: RouteTable,
consumers: HashMap<MachineInstanceId, Arc<dyn SignalConsumerSurface>>,
_signal: std::marker::PhantomData<fn(S)>,
}
impl<S: ProducerSignal> fmt::Debug for CatalogCompositionSignalDispatcher<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("CatalogCompositionSignalDispatcher")
.field("composition", &self.composition)
.field("signal_routes", &self.table.signal_route_count())
.field("consumers", &self.consumers.len())
.finish()
}
}
impl<E: ProducerEffect> fmt::Debug for CatalogCompositionDispatcher<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("CatalogCompositionDispatcher")
.field("composition", &self.composition)
.field("routes", &self.table.len())
.field("consumers", &self.consumers.len())
.finish()
}
}
impl<E: ProducerEffect> CatalogCompositionDispatcher<E> {
pub fn new(composition: CompositionId, table: RouteTable) -> Self {
Self {
composition,
table,
consumers: HashMap::new(),
_effect: std::marker::PhantomData,
}
}
pub fn with_consumer(mut self, surface: Arc<dyn ConsumerSurface>) -> Self {
self.consumers
.insert(surface.instance_id().clone(), surface);
self
}
}
impl<S: ProducerSignal> CatalogCompositionSignalDispatcher<S> {
pub fn new(composition: CompositionId, table: RouteTable) -> Self {
Self {
composition,
table,
consumers: HashMap::new(),
_signal: std::marker::PhantomData,
}
}
pub fn with_consumer(mut self, surface: Arc<dyn SignalConsumerSurface>) -> Self {
self.consumers
.insert(surface.instance_id().clone(), surface);
self
}
}
#[async_trait]
impl<E: ProducerEffect> CompositionDispatcher for CatalogCompositionDispatcher<E> {
type Effect = E;
fn composition(&self) -> &CompositionId {
&self.composition
}
async fn dispatch(
&self,
producer: ProducerInstance,
effect: EffectPayload<Self::Effect>,
) -> Result<DispatchOutcome, DispatchRefusal> {
if producer.composition != self.composition {
return Err(DispatchRefusal::CompositionMismatch {
expected: self.composition.clone(),
actual: producer.composition,
});
}
let variant = effect.variant().clone();
let body = effect.body();
let descriptor = self
.table
.resolve(&producer.instance_id, &variant)
.ok_or_else(|| DispatchRefusal::UnresolvedRoute {
composition: self.composition.clone(),
instance: producer.instance_id.clone(),
variant: variant.clone(),
})?;
let mut projected: Vec<(FieldId, OwnedFieldValue)> =
Vec::with_capacity(descriptor.bindings.len());
for (from_field, to_field) in &descriptor.bindings {
let value =
body.field(from_field)
.ok_or_else(|| DispatchRefusal::MissingProducerField {
route: descriptor.route_id.clone(),
variant: variant.clone(),
field: from_field.clone(),
})?;
projected.push((to_field.clone(), value.to_owned_value()));
}
let consumer = self.consumers.get(&descriptor.instance_id).ok_or_else(|| {
DispatchRefusal::UnwiredConsumer {
composition: self.composition.clone(),
instance: descriptor.instance_id.clone(),
}
})?;
consumer
.apply_routed_input(descriptor.input_variant.clone(), projected)
.await
.map_err(|error| DispatchRefusal::ConsumerRefused {
instance: descriptor.instance_id.clone(),
variant: descriptor.input_variant.clone(),
error,
})?;
Ok(DispatchOutcome {
route: RouteKey {
composition: self.composition.clone(),
route_id: descriptor.route_id.clone(),
},
consumer: descriptor.instance_id.clone(),
applied_input: descriptor.input_variant.clone(),
})
}
}
#[async_trait]
impl<S: ProducerSignal> CompositionSignalDispatcher for CatalogCompositionSignalDispatcher<S> {
type Signal = S;
fn composition(&self) -> &CompositionId {
&self.composition
}
async fn dispatch_signal(
&self,
producer: ProducerInstance,
signal: SignalPayload<Self::Signal>,
) -> Result<SignalDispatchOutcome, SignalDispatchRefusal> {
if producer.composition != self.composition {
return Err(SignalDispatchRefusal::CompositionMismatch {
expected: self.composition.clone(),
actual: producer.composition,
});
}
let variant = signal.variant().clone();
let body = signal.body();
let descriptor = self
.table
.resolve_signal(&producer.instance_id, &variant)
.ok_or_else(|| SignalDispatchRefusal::UnresolvedRoute {
composition: self.composition.clone(),
instance: producer.instance_id.clone(),
variant: variant.clone(),
})?;
let mut projected: Vec<(FieldId, OwnedFieldValue)> =
Vec::with_capacity(descriptor.bindings.len());
for (from_field, to_field) in &descriptor.bindings {
let value = body.field(from_field).ok_or_else(|| {
SignalDispatchRefusal::MissingProducerField {
route: descriptor.route_id.clone(),
variant: variant.clone(),
field: from_field.clone(),
}
})?;
projected.push((to_field.clone(), value.to_owned_value()));
}
let consumer = self.consumers.get(&descriptor.instance_id).ok_or_else(|| {
SignalDispatchRefusal::UnwiredConsumer {
composition: self.composition.clone(),
instance: descriptor.instance_id.clone(),
}
})?;
consumer
.receive_signal(descriptor.signal_variant.clone(), projected)
.await
.map_err(|error| SignalDispatchRefusal::ConsumerRefused {
instance: descriptor.instance_id.clone(),
variant: descriptor.signal_variant.clone(),
error,
})?;
Ok(SignalDispatchOutcome {
route: RouteKey {
composition: self.composition.clone(),
route_id: descriptor.route_id.clone(),
},
consumer: descriptor.instance_id.clone(),
applied_signal: descriptor.signal_variant.clone(),
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use meerkat_machine_schema::catalog::meerkat_mob_seam_composition;
#[derive(Debug, Clone, PartialEq, Eq)]
enum SeamEffect {
Mob(MobEffect),
}
#[derive(Debug, Clone, PartialEq, Eq)]
enum MobEffect {
RequestRuntimeBinding {
agent_runtime_id: String,
fence_token: u64,
generation: u64,
session_id: String,
},
}
impl ProducerEffect for SeamEffect {
fn variant_id(&self) -> EffectVariantId {
match self {
Self::Mob(MobEffect::RequestRuntimeBinding { .. }) => {
EffectVariantId::parse("RequestRuntimeBinding").expect("slug")
}
}
}
fn field(&self, id: &FieldId) -> Option<FieldValue<'_>> {
match self {
Self::Mob(MobEffect::RequestRuntimeBinding {
agent_runtime_id,
fence_token,
generation,
session_id,
}) => match id.as_str() {
"agent_runtime_id" => Some(FieldValue::Str(agent_runtime_id)),
"fence_token" => Some(FieldValue::U64(*fence_token)),
"generation" => Some(FieldValue::U64(*generation)),
"session_id" => Some(FieldValue::Str(session_id)),
_ => None,
},
}
}
}
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Clone, PartialEq, Eq)]
enum SeamSignal {
RuntimeBound {
agent_runtime_id: String,
fence_token: u64,
},
RuntimeRetired {
agent_runtime_id: String,
fence_token: u64,
},
RuntimeDestroyed {
agent_runtime_id: String,
fence_token: u64,
},
}
impl ProducerSignal for SeamSignal {
fn variant_id(&self) -> EffectVariantId {
let slug = match self {
Self::RuntimeBound { .. } => "RuntimeBound",
Self::RuntimeRetired { .. } => "RuntimeRetired",
Self::RuntimeDestroyed { .. } => "RuntimeDestroyed",
};
EffectVariantId::parse(slug).expect("signal source slug")
}
fn field(&self, id: &FieldId) -> Option<FieldValue<'_>> {
let (agent_runtime_id, fence_token) = match self {
Self::RuntimeBound {
agent_runtime_id,
fence_token,
}
| Self::RuntimeRetired {
agent_runtime_id,
fence_token,
}
| Self::RuntimeDestroyed {
agent_runtime_id,
fence_token,
} => (agent_runtime_id, fence_token),
};
match id.as_str() {
"agent_runtime_id" => Some(FieldValue::Str(agent_runtime_id)),
"fence_token" => Some(FieldValue::U64(*fence_token)),
_ => None,
}
}
}
#[derive(Default)]
struct RecordingMeerkatSurface {
log: tokio::sync::Mutex<Vec<(InputVariantId, Vec<(FieldId, OwnedFieldValue)>)>>,
}
#[async_trait]
impl ConsumerSurface for RecordingMeerkatSurface {
fn instance_id(&self) -> &MachineInstanceId {
static ID: std::sync::OnceLock<MachineInstanceId> = std::sync::OnceLock::new();
ID.get_or_init(|| MachineInstanceId::parse("meerkat").unwrap())
}
async fn apply_routed_input(
&self,
variant: InputVariantId,
projected_fields: Vec<(FieldId, OwnedFieldValue)>,
) -> Result<(), ConsumerError> {
self.log.lock().await.push((variant, projected_fields));
Ok(())
}
}
#[derive(Default)]
struct RecordingMobSignalSurface {
log: tokio::sync::Mutex<Vec<(SignalVariantId, Vec<(FieldId, OwnedFieldValue)>)>>,
}
#[async_trait]
impl SignalConsumerSurface for RecordingMobSignalSurface {
fn instance_id(&self) -> &MachineInstanceId {
static ID: std::sync::OnceLock<MachineInstanceId> = std::sync::OnceLock::new();
ID.get_or_init(|| MachineInstanceId::parse("mob").unwrap())
}
async fn receive_signal(
&self,
variant: SignalVariantId,
projected_fields: Vec<(FieldId, OwnedFieldValue)>,
) -> Result<(), ConsumerError> {
self.log.lock().await.push((variant, projected_fields));
Ok(())
}
}
fn mob_producer() -> ProducerInstance {
ProducerInstance {
composition: CompositionId::parse("meerkat_mob_seam").unwrap(),
instance_id: MachineInstanceId::parse("mob").unwrap(),
machine: MachineId::parse("MobMachine").unwrap(),
}
}
fn meerkat_producer() -> ProducerInstance {
ProducerInstance {
composition: CompositionId::parse("meerkat_mob_seam").unwrap(),
instance_id: MachineInstanceId::parse("meerkat").unwrap(),
machine: MachineId::parse("MeerkatMachine").unwrap(),
}
}
fn sample_effect() -> EffectPayload<SeamEffect> {
EffectPayload::Emitted {
variant: EffectVariantId::parse("RequestRuntimeBinding").unwrap(),
body: SeamEffect::Mob(MobEffect::RequestRuntimeBinding {
agent_runtime_id: "rt-1".into(),
fence_token: 7,
generation: 3,
session_id: "019dbd3d-d7ad-75a1-96d0-8013927e78f8".into(),
}),
}
}
fn build_dispatcher(
consumer: Arc<RecordingMeerkatSurface>,
) -> CatalogCompositionDispatcher<SeamEffect> {
let schema = meerkat_mob_seam_composition();
let table = RouteTable::from_schema(&schema).expect("seam schema routes are well-formed");
CatalogCompositionDispatcher::new(schema.name.clone(), table).with_consumer(consumer)
}
fn sample_signal() -> SignalPayload<SeamSignal> {
let body = SeamSignal::RuntimeBound {
agent_runtime_id: "rt-1".into(),
fence_token: 7,
};
SignalPayload::Emitted {
variant: body.variant_id(),
body,
}
}
fn build_signal_dispatcher(
consumer: Arc<RecordingMobSignalSurface>,
) -> CatalogCompositionSignalDispatcher<SeamSignal> {
let schema = meerkat_mob_seam_composition();
let table = RouteTable::from_schema(&schema).expect("seam schema routes are well-formed");
CatalogCompositionSignalDispatcher::new(schema.name.clone(), table).with_consumer(consumer)
}
#[tokio::test]
async fn dispatches_mob_routed_effect_to_meerkat_consumer() {
let consumer = Arc::new(RecordingMeerkatSurface::default());
let dispatcher = build_dispatcher(Arc::clone(&consumer));
let outcome = dispatcher
.dispatch(mob_producer(), sample_effect())
.await
.expect("well-formed routed effect");
assert_eq!(outcome.consumer.as_str(), "meerkat");
assert_eq!(outcome.applied_input.as_str(), "PrepareBindings");
assert_eq!(
outcome.route.route_id.as_str(),
"binding_request_reaches_meerkat"
);
let log = consumer.log.lock().await;
assert_eq!(
log.len(),
1,
"dispatcher must call the consumer exactly once"
);
let (variant, fields) = &log[0];
assert_eq!(variant.as_str(), "PrepareBindings");
let field_names: Vec<&str> = fields.iter().map(|(k, _)| k.as_str()).collect();
assert_eq!(
field_names,
vec![
"agent_runtime_id",
"fence_token",
"generation",
"session_id"
]
);
match &fields[0].1 {
OwnedFieldValue::Str(s) => assert_eq!(s, "rt-1"),
other => panic!("expected Str, got {other:?}"),
}
match &fields[1].1 {
OwnedFieldValue::U64(v) => assert_eq!(*v, 7),
other => panic!("expected U64, got {other:?}"),
}
match &fields[2].1 {
OwnedFieldValue::U64(v) => assert_eq!(*v, 3),
other => panic!("expected U64, got {other:?}"),
}
match &fields[3].1 {
OwnedFieldValue::Str(s) => assert_eq!(s, "019dbd3d-d7ad-75a1-96d0-8013927e78f8"),
other => panic!("expected Str for session_id, got {other:?}"),
}
}
#[tokio::test]
async fn dispatches_meerkat_routed_signal_to_mob_consumer() {
let consumer = Arc::new(RecordingMobSignalSurface::default());
let dispatcher = build_signal_dispatcher(Arc::clone(&consumer));
let outcome = dispatcher
.dispatch_signal(meerkat_producer(), sample_signal())
.await
.expect("well-formed routed signal");
assert_eq!(outcome.consumer.as_str(), "mob");
assert_eq!(outcome.applied_signal.as_str(), "ObserveRuntimeReady");
assert_eq!(outcome.route.route_id.as_str(), "runtime_bound_reaches_mob");
let log = consumer.log.lock().await;
assert_eq!(
log.len(),
1,
"dispatcher must call the signal consumer exactly once"
);
let (variant, fields) = &log[0];
assert_eq!(variant.as_str(), "ObserveRuntimeReady");
let field_names: Vec<&str> = fields.iter().map(|(k, _)| k.as_str()).collect();
assert_eq!(field_names, vec!["agent_runtime_id", "fence_token"]);
match &fields[0].1 {
OwnedFieldValue::Str(s) => assert_eq!(s, "rt-1"),
other => panic!("expected Str, got {other:?}"),
}
match &fields[1].1 {
OwnedFieldValue::U64(v) => assert_eq!(*v, 7),
other => panic!("expected U64, got {other:?}"),
}
}
#[tokio::test]
async fn signal_dispatch_refuses_input_route_typed() {
let consumer = Arc::new(RecordingMobSignalSurface::default());
let dispatcher = build_signal_dispatcher(consumer);
let payload = SignalPayload::Emitted {
variant: EffectVariantId::parse("RequestRuntimeBinding").unwrap(),
body: SeamSignal::RuntimeBound {
agent_runtime_id: "rt-1".into(),
fence_token: 7,
},
};
let err = dispatcher
.dispatch_signal(mob_producer(), payload)
.await
.expect_err("input route is out of the signal surface");
assert!(matches!(err, SignalDispatchRefusal::UnresolvedRoute { .. }));
}
#[tokio::test]
async fn signal_dispatch_refuses_unwired_consumer_typed() {
let schema = meerkat_mob_seam_composition();
let table = RouteTable::from_schema(&schema).unwrap();
let dispatcher: CatalogCompositionSignalDispatcher<SeamSignal> =
CatalogCompositionSignalDispatcher::new(schema.name.clone(), table);
let err = dispatcher
.dispatch_signal(meerkat_producer(), sample_signal())
.await
.expect_err("unwired signal consumer");
assert!(matches!(err, SignalDispatchRefusal::UnwiredConsumer { .. }));
}
#[tokio::test]
async fn signal_dispatch_refuses_missing_field_typed() {
#[derive(Debug)]
struct BrokenSignal;
impl ProducerSignal for BrokenSignal {
fn variant_id(&self) -> EffectVariantId {
EffectVariantId::parse("RuntimeBound").unwrap()
}
fn field(&self, _id: &FieldId) -> Option<FieldValue<'_>> {
None
}
}
let schema = meerkat_mob_seam_composition();
let table = RouteTable::from_schema(&schema).unwrap();
let consumer = Arc::new(RecordingMobSignalSurface::default());
let dispatcher: CatalogCompositionSignalDispatcher<BrokenSignal> =
CatalogCompositionSignalDispatcher::new(schema.name.clone(), table)
.with_consumer(consumer);
let err = dispatcher
.dispatch_signal(
meerkat_producer(),
SignalPayload::Emitted {
variant: EffectVariantId::parse("RuntimeBound").unwrap(),
body: BrokenSignal,
},
)
.await
.expect_err("missing producer field");
assert!(matches!(
err,
SignalDispatchRefusal::MissingProducerField { .. }
));
}
#[tokio::test]
async fn refuses_mismatched_composition() {
let consumer = Arc::new(RecordingMeerkatSurface::default());
let dispatcher = build_dispatcher(consumer);
let mut wrong = mob_producer();
wrong.composition = CompositionId::parse("some_other_composition").unwrap();
let err = dispatcher
.dispatch(wrong, sample_effect())
.await
.expect_err("composition mismatch");
assert!(matches!(err, DispatchRefusal::CompositionMismatch { .. }));
}
#[tokio::test]
async fn refuses_unrouted_effect_typed() {
let consumer = Arc::new(RecordingMeerkatSurface::default());
let dispatcher = build_dispatcher(consumer);
let payload = EffectPayload::Emitted {
variant: EffectVariantId::parse("UnknownEffect").unwrap(),
body: SeamEffect::Mob(MobEffect::RequestRuntimeBinding {
agent_runtime_id: "rt".into(),
fence_token: 0,
generation: 0,
session_id: "019dbd3d-d7ad-75a1-96d0-8013927e78f8".into(),
}),
};
let err = dispatcher
.dispatch(mob_producer(), payload)
.await
.expect_err("unresolved route");
assert!(matches!(err, DispatchRefusal::UnresolvedRoute { .. }));
}
#[tokio::test]
async fn refuses_unwired_consumer_typed() {
let schema = meerkat_mob_seam_composition();
let table = RouteTable::from_schema(&schema).unwrap();
let dispatcher: CatalogCompositionDispatcher<SeamEffect> =
CatalogCompositionDispatcher::new(schema.name.clone(), table);
let err = dispatcher
.dispatch(mob_producer(), sample_effect())
.await
.expect_err("unwired consumer");
assert!(matches!(err, DispatchRefusal::UnwiredConsumer { .. }));
}
#[tokio::test]
async fn standalone_binding_has_no_dispatcher() {
let binding: CompositionBinding<SeamEffect> = CompositionBinding::Standalone;
assert!(binding.is_standalone());
assert!(binding.wired().is_none());
}
#[tokio::test]
async fn wired_binding_exposes_dispatcher() {
let consumer = Arc::new(RecordingMeerkatSurface::default());
let dispatcher = Arc::new(build_dispatcher(consumer));
let binding: CompositionBinding<SeamEffect> = CompositionBinding::Wired(dispatcher);
assert!(!binding.is_standalone());
assert!(binding.wired().is_some());
assert!(
binding.context_provider().is_none(),
"plain Wired binding has no owner-supplied context"
);
}
struct PinnedSessionContext {
session_id: String,
}
impl ContextProvider<SeamEffect> for PinnedSessionContext {
fn provide_context(
&self,
_producer: &ProducerInstance,
_effect: &EffectPayload<SeamEffect>,
) -> Vec<(FieldId, OwnedFieldValue)> {
vec![(
FieldId::parse("session_id").expect("field id"),
OwnedFieldValue::Str(self.session_id.clone()),
)]
}
}
#[tokio::test]
async fn owner_provided_binding_exposes_both_dispatcher_and_context() {
let consumer = Arc::new(RecordingMeerkatSurface::default());
let dispatcher = Arc::new(build_dispatcher(consumer));
let context = Arc::new(PinnedSessionContext {
session_id: "session-abc".into(),
});
let binding: CompositionBinding<SeamEffect> =
CompositionBinding::owner_provided(dispatcher, context);
assert!(!binding.is_standalone());
assert!(
binding.wired().is_some(),
"OwnerProvided is a superset of Wired for dispatcher access"
);
assert!(
binding.context_provider().is_some(),
"OwnerProvided must expose the owner-supplied context"
);
let provider = binding.context_provider().expect("context provider");
let producer = mob_producer();
let effect = sample_effect();
let fields = provider.provide_context(&producer, &effect);
assert_eq!(fields.len(), 1);
assert_eq!(fields[0].0.as_str(), "session_id");
match &fields[0].1 {
OwnedFieldValue::Str(s) => assert_eq!(s, "session-abc"),
other => panic!("expected Str context field, got {other:?}"),
}
}
#[tokio::test]
async fn composition_binding_constructors_parallel_machine_halves() {
let standalone: CompositionBinding<SeamEffect> = CompositionBinding::standalone();
assert!(standalone.is_standalone());
assert!(standalone.wired().is_none());
assert!(standalone.context_provider().is_none());
let consumer = Arc::new(RecordingMeerkatSurface::default());
let dispatcher: Arc<dyn CompositionDispatcher<Effect = SeamEffect>> =
Arc::new(build_dispatcher(consumer));
let wired: CompositionBinding<SeamEffect> =
CompositionBinding::wired_with(Arc::clone(&dispatcher));
assert!(!wired.is_standalone());
assert!(wired.wired().is_some());
assert!(wired.context_provider().is_none());
let context = Arc::new(PinnedSessionContext {
session_id: "session-xyz".into(),
});
let owner_provided: CompositionBinding<SeamEffect> =
CompositionBinding::owner_provided(dispatcher, context);
assert!(!owner_provided.is_standalone());
assert!(owner_provided.wired().is_some());
assert!(owner_provided.context_provider().is_some());
}
struct RefusingMeerkatSurface;
#[async_trait]
impl ConsumerSurface for RefusingMeerkatSurface {
fn instance_id(&self) -> &MachineInstanceId {
static ID: std::sync::OnceLock<MachineInstanceId> = std::sync::OnceLock::new();
ID.get_or_init(|| MachineInstanceId::parse("meerkat").unwrap())
}
async fn apply_routed_input(
&self,
_variant: InputVariantId,
_projected_fields: Vec<(FieldId, OwnedFieldValue)>,
) -> Result<(), ConsumerError> {
Err(ConsumerError::new(
"runtime_destroyed",
"consumer machine no longer accepts inputs",
))
}
}
#[tokio::test]
async fn consumer_refusal_preserves_typed_error_code_through_dispatcher() {
let schema = meerkat_mob_seam_composition();
let table = RouteTable::from_schema(&schema).expect("seam schema routes are well-formed");
let dispatcher = CatalogCompositionDispatcher::new(schema.name.clone(), table)
.with_consumer(Arc::new(RefusingMeerkatSurface));
let err = dispatcher
.dispatch(mob_producer(), sample_effect())
.await
.expect_err("refusing consumer surface");
match err {
DispatchRefusal::ConsumerRefused { error, .. } => {
assert_eq!(
error.error_code(),
"runtime_destroyed",
"typed consumer error_code must survive the dispatch seam, not be flattened to a string"
);
}
other => {
panic!("expected ConsumerRefused carrying a typed ConsumerError, got {other:?}")
}
}
}
}