1pub mod route_table;
39
40use std::collections::HashMap;
41use std::fmt;
42use std::sync::Arc;
43
44use async_trait::async_trait;
45use meerkat_machine_schema::identity::{
46 CompositionId, EffectVariantId, FieldId, InputVariantId, MachineId, MachineInstanceId, RouteId,
47 SignalVariantId,
48};
49use thiserror::Error;
50
51pub use route_table::{RouteTable, RouteTableError, RoutedInputDescriptor, RoutedSignalDescriptor};
52
53#[derive(Debug, Clone, PartialEq, Eq, Hash)]
58pub struct ProducerInstance {
59 pub composition: CompositionId,
61 pub instance_id: MachineInstanceId,
63 pub machine: MachineId,
65}
66
67#[derive(Debug, Clone, PartialEq, Eq)]
75pub enum EffectPayload<E> {
76 Emitted {
78 variant: EffectVariantId,
80 body: E,
82 },
83}
84
85impl<E: ProducerEffect> EffectPayload<E> {
86 pub fn variant(&self) -> &EffectVariantId {
88 match self {
89 Self::Emitted { variant, .. } => variant,
90 }
91 }
92
93 pub fn body(&self) -> &E {
95 match self {
96 Self::Emitted { body, .. } => body,
97 }
98 }
99}
100
101#[derive(Debug, Clone, PartialEq, Eq)]
104pub enum SignalPayload<S> {
105 Emitted {
107 variant: EffectVariantId,
111 body: S,
113 },
114}
115
116impl<S: ProducerSignal> SignalPayload<S> {
117 pub fn variant(&self) -> &EffectVariantId {
119 match self {
120 Self::Emitted { variant, .. } => variant,
121 }
122 }
123
124 pub fn body(&self) -> &S {
126 match self {
127 Self::Emitted { body, .. } => body,
128 }
129 }
130}
131
132#[derive(Debug, Clone, PartialEq, Eq, Hash)]
134pub struct RouteKey {
135 pub composition: CompositionId,
136 pub route_id: RouteId,
137}
138
139pub trait ProducerEffect: fmt::Debug + Send + Sync + 'static {
144 fn variant_id(&self) -> EffectVariantId;
151
152 fn field(&self, id: &FieldId) -> Option<FieldValue<'_>>;
160}
161
162pub trait ProducerSignal: fmt::Debug + Send + Sync + 'static {
172 fn variant_id(&self) -> EffectVariantId;
174
175 fn field(&self, id: &FieldId) -> Option<FieldValue<'_>>;
177}
178
179#[derive(Debug, Clone)]
188pub enum FieldValue<'a> {
189 Str(&'a str),
191 U64(u64),
193 I64(i64),
195 Bool(bool),
197 Opaque(Arc<dyn std::any::Any + Send + Sync>),
202}
203
204#[derive(Debug, Clone, PartialEq, Eq)]
206pub struct DispatchOutcome {
207 pub route: RouteKey,
209 pub consumer: MachineInstanceId,
211 pub applied_input: InputVariantId,
213}
214
215#[derive(Debug, Clone, PartialEq, Eq)]
217pub struct SignalDispatchOutcome {
218 pub route: RouteKey,
220 pub consumer: MachineInstanceId,
222 pub applied_signal: SignalVariantId,
224}
225
226#[derive(Debug, Clone, PartialEq, Eq, Error)]
232pub enum DispatchRefusal {
233 #[error("dispatcher composition {expected} does not match producer composition {actual}")]
235 CompositionMismatch {
236 expected: CompositionId,
237 actual: CompositionId,
238 },
239 #[error(
241 "no input route declared for producer {instance} effect variant {variant} in composition {composition}"
242 )]
243 UnresolvedRoute {
244 composition: CompositionId,
245 instance: MachineInstanceId,
246 variant: EffectVariantId,
247 },
248 #[error("route {route} requires producer field {field} on variant {variant}, not provided")]
251 MissingProducerField {
252 route: RouteId,
253 variant: EffectVariantId,
254 field: FieldId,
255 },
256 #[error(
260 "no consumer surface registered for target instance {instance} in composition {composition}"
261 )]
262 UnwiredConsumer {
263 composition: CompositionId,
264 instance: MachineInstanceId,
265 },
266 #[error("consumer {instance} refused input {variant}: {error}")]
273 ConsumerRefused {
274 instance: MachineInstanceId,
275 variant: InputVariantId,
276 error: ConsumerError,
277 },
278}
279
280#[derive(Debug, Clone, PartialEq, Eq, Error)]
282pub enum SignalDispatchRefusal {
283 #[error("dispatcher composition {expected} does not match producer composition {actual}")]
285 CompositionMismatch {
286 expected: CompositionId,
287 actual: CompositionId,
288 },
289 #[error(
291 "no signal route declared for producer {instance} variant {variant} in composition {composition}"
292 )]
293 UnresolvedRoute {
294 composition: CompositionId,
295 instance: MachineInstanceId,
296 variant: EffectVariantId,
297 },
298 #[error("route {route} requires producer field {field} on variant {variant}, not provided")]
301 MissingProducerField {
302 route: RouteId,
303 variant: EffectVariantId,
304 field: FieldId,
305 },
306 #[error(
309 "no signal consumer surface registered for target instance {instance} in composition {composition}"
310 )]
311 UnwiredConsumer {
312 composition: CompositionId,
313 instance: MachineInstanceId,
314 },
315 #[error("consumer {instance} refused signal {variant}: {error}")]
319 ConsumerRefused {
320 instance: MachineInstanceId,
321 variant: SignalVariantId,
322 error: ConsumerError,
323 },
324}
325
326#[derive(Debug, Clone, PartialEq, Eq, Error)]
337#[error("{message} [{error_code}]")]
338pub struct ConsumerError {
339 error_code: &'static str,
343 message: String,
345}
346
347impl ConsumerError {
348 pub fn new(error_code: &'static str, message: impl Into<String>) -> Self {
351 Self {
352 error_code,
353 message: message.into(),
354 }
355 }
356
357 pub fn error_code(&self) -> &'static str {
359 self.error_code
360 }
361
362 pub fn message(&self) -> &str {
364 &self.message
365 }
366}
367
368impl From<String> for ConsumerError {
369 fn from(message: String) -> Self {
375 Self::new("consumer_projection_failed", message)
376 }
377}
378
379#[async_trait]
388pub trait ConsumerSurface: Send + Sync {
389 fn instance_id(&self) -> &MachineInstanceId;
392
393 async fn apply_routed_input(
398 &self,
399 variant: InputVariantId,
400 projected_fields: Vec<(FieldId, OwnedFieldValue)>,
401 ) -> Result<(), ConsumerError>;
402}
403
404#[async_trait]
406pub trait SignalConsumerSurface: Send + Sync {
407 fn instance_id(&self) -> &MachineInstanceId;
409
410 async fn receive_signal(
412 &self,
413 variant: SignalVariantId,
414 projected_fields: Vec<(FieldId, OwnedFieldValue)>,
415 ) -> Result<(), ConsumerError>;
416}
417
418#[derive(Debug, Clone)]
423pub enum OwnedFieldValue {
424 Str(String),
425 U64(u64),
426 I64(i64),
427 Bool(bool),
428 Opaque(Arc<dyn std::any::Any + Send + Sync>),
429}
430
431impl FieldValue<'_> {
432 pub fn to_owned_value(&self) -> OwnedFieldValue {
436 match self {
437 FieldValue::Str(s) => OwnedFieldValue::Str((*s).to_owned()),
438 FieldValue::U64(v) => OwnedFieldValue::U64(*v),
439 FieldValue::I64(v) => OwnedFieldValue::I64(*v),
440 FieldValue::Bool(v) => OwnedFieldValue::Bool(*v),
441 FieldValue::Opaque(handle) => OwnedFieldValue::Opaque(Arc::clone(handle)),
442 }
443 }
444}
445
446#[async_trait]
454pub trait CompositionDispatcher: Send + Sync {
455 type Effect: ProducerEffect;
458
459 fn composition(&self) -> &CompositionId;
462
463 async fn dispatch(
466 &self,
467 producer: ProducerInstance,
468 effect: EffectPayload<Self::Effect>,
469 ) -> Result<DispatchOutcome, DispatchRefusal>;
470}
471
472#[async_trait]
474pub trait CompositionSignalDispatcher: Send + Sync {
475 type Signal: ProducerSignal;
477
478 fn composition(&self) -> &CompositionId;
480
481 async fn dispatch_signal(
484 &self,
485 producer: ProducerInstance,
486 signal: SignalPayload<Self::Signal>,
487 ) -> Result<SignalDispatchOutcome, SignalDispatchRefusal>;
488}
489
490pub trait ContextProvider<E: ProducerEffect>: Send + Sync {
514 fn provide_context(
527 &self,
528 producer: &ProducerInstance,
529 effect: &EffectPayload<E>,
530 ) -> Vec<(FieldId, OwnedFieldValue)>;
531}
532
533pub enum CompositionBinding<E: ProducerEffect> {
557 Standalone,
560 Wired(Arc<dyn CompositionDispatcher<Effect = E>>),
564 OwnerProvided {
570 dispatcher: Arc<dyn CompositionDispatcher<Effect = E>>,
571 context: Arc<dyn ContextProvider<E>>,
572 },
573}
574
575impl<E: ProducerEffect> fmt::Debug for CompositionBinding<E> {
576 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
577 match self {
578 Self::Standalone => f.debug_struct("CompositionBinding::Standalone").finish(),
579 Self::Wired(_) => f
580 .debug_struct("CompositionBinding::Wired")
581 .field("dispatcher", &"<dyn CompositionDispatcher>")
582 .finish(),
583 Self::OwnerProvided { .. } => f
584 .debug_struct("CompositionBinding::OwnerProvided")
585 .field("dispatcher", &"<dyn CompositionDispatcher>")
586 .field("context", &"<dyn ContextProvider>")
587 .finish(),
588 }
589 }
590}
591
592impl<E: ProducerEffect> CompositionBinding<E> {
593 pub fn standalone() -> Self {
600 Self::Standalone
601 }
602
603 pub fn wired_with(dispatcher: Arc<dyn CompositionDispatcher<Effect = E>>) -> Self {
609 Self::Wired(dispatcher)
610 }
611
612 pub fn owner_provided(
620 dispatcher: Arc<dyn CompositionDispatcher<Effect = E>>,
621 context: Arc<dyn ContextProvider<E>>,
622 ) -> Self {
623 Self::OwnerProvided {
624 dispatcher,
625 context,
626 }
627 }
628
629 pub fn is_standalone(&self) -> bool {
631 matches!(self, Self::Standalone)
632 }
633
634 pub fn wired(&self) -> Option<&Arc<dyn CompositionDispatcher<Effect = E>>> {
643 match self {
644 Self::Standalone => None,
645 Self::Wired(d) => Some(d),
646 Self::OwnerProvided { dispatcher, .. } => Some(dispatcher),
647 }
648 }
649
650 pub fn context_provider(&self) -> Option<&Arc<dyn ContextProvider<E>>> {
659 match self {
660 Self::Standalone | Self::Wired(_) => None,
661 Self::OwnerProvided { context, .. } => Some(context),
662 }
663 }
664}
665
666pub struct CatalogCompositionDispatcher<E: ProducerEffect> {
681 composition: CompositionId,
682 table: RouteTable,
683 consumers: HashMap<MachineInstanceId, Arc<dyn ConsumerSurface>>,
684 _effect: std::marker::PhantomData<fn(E)>,
685}
686
687pub struct CatalogCompositionSignalDispatcher<S: ProducerSignal> {
693 composition: CompositionId,
694 table: RouteTable,
695 consumers: HashMap<MachineInstanceId, Arc<dyn SignalConsumerSurface>>,
696 _signal: std::marker::PhantomData<fn(S)>,
697}
698
699impl<S: ProducerSignal> fmt::Debug for CatalogCompositionSignalDispatcher<S> {
700 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
701 f.debug_struct("CatalogCompositionSignalDispatcher")
702 .field("composition", &self.composition)
703 .field("signal_routes", &self.table.signal_route_count())
704 .field("consumers", &self.consumers.len())
705 .finish()
706 }
707}
708
709impl<E: ProducerEffect> fmt::Debug for CatalogCompositionDispatcher<E> {
710 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
711 f.debug_struct("CatalogCompositionDispatcher")
712 .field("composition", &self.composition)
713 .field("routes", &self.table.len())
714 .field("consumers", &self.consumers.len())
715 .finish()
716 }
717}
718
719impl<E: ProducerEffect> CatalogCompositionDispatcher<E> {
720 pub fn new(composition: CompositionId, table: RouteTable) -> Self {
723 Self {
724 composition,
725 table,
726 consumers: HashMap::new(),
727 _effect: std::marker::PhantomData,
728 }
729 }
730
731 pub fn with_consumer(mut self, surface: Arc<dyn ConsumerSurface>) -> Self {
738 self.consumers
739 .insert(surface.instance_id().clone(), surface);
740 self
741 }
742}
743
744impl<S: ProducerSignal> CatalogCompositionSignalDispatcher<S> {
745 pub fn new(composition: CompositionId, table: RouteTable) -> Self {
747 Self {
748 composition,
749 table,
750 consumers: HashMap::new(),
751 _signal: std::marker::PhantomData,
752 }
753 }
754
755 pub fn with_consumer(mut self, surface: Arc<dyn SignalConsumerSurface>) -> Self {
757 self.consumers
758 .insert(surface.instance_id().clone(), surface);
759 self
760 }
761}
762
763#[async_trait]
764impl<E: ProducerEffect> CompositionDispatcher for CatalogCompositionDispatcher<E> {
765 type Effect = E;
766
767 fn composition(&self) -> &CompositionId {
768 &self.composition
769 }
770
771 async fn dispatch(
772 &self,
773 producer: ProducerInstance,
774 effect: EffectPayload<Self::Effect>,
775 ) -> Result<DispatchOutcome, DispatchRefusal> {
776 if producer.composition != self.composition {
777 return Err(DispatchRefusal::CompositionMismatch {
778 expected: self.composition.clone(),
779 actual: producer.composition,
780 });
781 }
782
783 let variant = effect.variant().clone();
784 let body = effect.body();
785
786 let descriptor = self
787 .table
788 .resolve(&producer.instance_id, &variant)
789 .ok_or_else(|| DispatchRefusal::UnresolvedRoute {
790 composition: self.composition.clone(),
791 instance: producer.instance_id.clone(),
792 variant: variant.clone(),
793 })?;
794
795 let mut projected: Vec<(FieldId, OwnedFieldValue)> =
796 Vec::with_capacity(descriptor.bindings.len());
797 for (from_field, to_field) in &descriptor.bindings {
798 let value =
799 body.field(from_field)
800 .ok_or_else(|| DispatchRefusal::MissingProducerField {
801 route: descriptor.route_id.clone(),
802 variant: variant.clone(),
803 field: from_field.clone(),
804 })?;
805 projected.push((to_field.clone(), value.to_owned_value()));
806 }
807
808 let consumer = self.consumers.get(&descriptor.instance_id).ok_or_else(|| {
809 DispatchRefusal::UnwiredConsumer {
810 composition: self.composition.clone(),
811 instance: descriptor.instance_id.clone(),
812 }
813 })?;
814
815 consumer
816 .apply_routed_input(descriptor.input_variant.clone(), projected)
817 .await
818 .map_err(|error| DispatchRefusal::ConsumerRefused {
819 instance: descriptor.instance_id.clone(),
820 variant: descriptor.input_variant.clone(),
821 error,
822 })?;
823
824 Ok(DispatchOutcome {
825 route: RouteKey {
826 composition: self.composition.clone(),
827 route_id: descriptor.route_id.clone(),
828 },
829 consumer: descriptor.instance_id.clone(),
830 applied_input: descriptor.input_variant.clone(),
831 })
832 }
833}
834
835#[async_trait]
836impl<S: ProducerSignal> CompositionSignalDispatcher for CatalogCompositionSignalDispatcher<S> {
837 type Signal = S;
838
839 fn composition(&self) -> &CompositionId {
840 &self.composition
841 }
842
843 async fn dispatch_signal(
844 &self,
845 producer: ProducerInstance,
846 signal: SignalPayload<Self::Signal>,
847 ) -> Result<SignalDispatchOutcome, SignalDispatchRefusal> {
848 if producer.composition != self.composition {
849 return Err(SignalDispatchRefusal::CompositionMismatch {
850 expected: self.composition.clone(),
851 actual: producer.composition,
852 });
853 }
854
855 let variant = signal.variant().clone();
856 let body = signal.body();
857
858 let descriptor = self
859 .table
860 .resolve_signal(&producer.instance_id, &variant)
861 .ok_or_else(|| SignalDispatchRefusal::UnresolvedRoute {
862 composition: self.composition.clone(),
863 instance: producer.instance_id.clone(),
864 variant: variant.clone(),
865 })?;
866
867 let mut projected: Vec<(FieldId, OwnedFieldValue)> =
868 Vec::with_capacity(descriptor.bindings.len());
869 for (from_field, to_field) in &descriptor.bindings {
870 let value = body.field(from_field).ok_or_else(|| {
871 SignalDispatchRefusal::MissingProducerField {
872 route: descriptor.route_id.clone(),
873 variant: variant.clone(),
874 field: from_field.clone(),
875 }
876 })?;
877 projected.push((to_field.clone(), value.to_owned_value()));
878 }
879
880 let consumer = self.consumers.get(&descriptor.instance_id).ok_or_else(|| {
881 SignalDispatchRefusal::UnwiredConsumer {
882 composition: self.composition.clone(),
883 instance: descriptor.instance_id.clone(),
884 }
885 })?;
886
887 consumer
888 .receive_signal(descriptor.signal_variant.clone(), projected)
889 .await
890 .map_err(|error| SignalDispatchRefusal::ConsumerRefused {
891 instance: descriptor.instance_id.clone(),
892 variant: descriptor.signal_variant.clone(),
893 error,
894 })?;
895
896 Ok(SignalDispatchOutcome {
897 route: RouteKey {
898 composition: self.composition.clone(),
899 route_id: descriptor.route_id.clone(),
900 },
901 consumer: descriptor.instance_id.clone(),
902 applied_signal: descriptor.signal_variant.clone(),
903 })
904 }
905}
906
907#[cfg(test)]
908mod tests {
909 use super::*;
910 use meerkat_machine_schema::catalog::meerkat_mob_seam_composition;
911
912 #[derive(Debug, Clone, PartialEq, Eq)]
918 enum SeamEffect {
919 Mob(MobEffect),
920 }
921
922 #[derive(Debug, Clone, PartialEq, Eq)]
923 enum MobEffect {
924 RequestRuntimeBinding {
925 agent_runtime_id: String,
926 fence_token: u64,
927 generation: u64,
928 session_id: String,
929 },
930 }
931
932 impl ProducerEffect for SeamEffect {
933 fn variant_id(&self) -> EffectVariantId {
934 match self {
935 Self::Mob(MobEffect::RequestRuntimeBinding { .. }) => {
936 EffectVariantId::parse("RequestRuntimeBinding").expect("slug")
937 }
938 }
939 }
940
941 fn field(&self, id: &FieldId) -> Option<FieldValue<'_>> {
942 match self {
943 Self::Mob(MobEffect::RequestRuntimeBinding {
944 agent_runtime_id,
945 fence_token,
946 generation,
947 session_id,
948 }) => match id.as_str() {
949 "agent_runtime_id" => Some(FieldValue::Str(agent_runtime_id)),
950 "fence_token" => Some(FieldValue::U64(*fence_token)),
951 "generation" => Some(FieldValue::U64(*generation)),
952 "session_id" => Some(FieldValue::Str(session_id)),
953 _ => None,
954 },
955 }
956 }
957 }
958
959 #[allow(clippy::enum_variant_names)]
963 #[derive(Debug, Clone, PartialEq, Eq)]
964 enum SeamSignal {
965 RuntimeBound {
966 agent_runtime_id: String,
967 fence_token: u64,
968 },
969 RuntimeRetired {
970 agent_runtime_id: String,
971 fence_token: u64,
972 },
973 RuntimeDestroyed {
974 agent_runtime_id: String,
975 fence_token: u64,
976 },
977 }
978
979 impl ProducerSignal for SeamSignal {
980 fn variant_id(&self) -> EffectVariantId {
981 let slug = match self {
982 Self::RuntimeBound { .. } => "RuntimeBound",
983 Self::RuntimeRetired { .. } => "RuntimeRetired",
984 Self::RuntimeDestroyed { .. } => "RuntimeDestroyed",
985 };
986 EffectVariantId::parse(slug).expect("signal source slug")
987 }
988
989 fn field(&self, id: &FieldId) -> Option<FieldValue<'_>> {
990 let (agent_runtime_id, fence_token) = match self {
991 Self::RuntimeBound {
992 agent_runtime_id,
993 fence_token,
994 }
995 | Self::RuntimeRetired {
996 agent_runtime_id,
997 fence_token,
998 }
999 | Self::RuntimeDestroyed {
1000 agent_runtime_id,
1001 fence_token,
1002 } => (agent_runtime_id, fence_token),
1003 };
1004 match id.as_str() {
1005 "agent_runtime_id" => Some(FieldValue::Str(agent_runtime_id)),
1006 "fence_token" => Some(FieldValue::U64(*fence_token)),
1007 _ => None,
1008 }
1009 }
1010 }
1011
1012 #[derive(Default)]
1013 struct RecordingMeerkatSurface {
1014 log: tokio::sync::Mutex<Vec<(InputVariantId, Vec<(FieldId, OwnedFieldValue)>)>>,
1015 }
1016
1017 #[async_trait]
1018 impl ConsumerSurface for RecordingMeerkatSurface {
1019 fn instance_id(&self) -> &MachineInstanceId {
1020 static ID: std::sync::OnceLock<MachineInstanceId> = std::sync::OnceLock::new();
1023 ID.get_or_init(|| MachineInstanceId::parse("meerkat").unwrap())
1024 }
1025
1026 async fn apply_routed_input(
1027 &self,
1028 variant: InputVariantId,
1029 projected_fields: Vec<(FieldId, OwnedFieldValue)>,
1030 ) -> Result<(), ConsumerError> {
1031 self.log.lock().await.push((variant, projected_fields));
1032 Ok(())
1033 }
1034 }
1035
1036 #[derive(Default)]
1037 struct RecordingMobSignalSurface {
1038 log: tokio::sync::Mutex<Vec<(SignalVariantId, Vec<(FieldId, OwnedFieldValue)>)>>,
1039 }
1040
1041 #[async_trait]
1042 impl SignalConsumerSurface for RecordingMobSignalSurface {
1043 fn instance_id(&self) -> &MachineInstanceId {
1044 static ID: std::sync::OnceLock<MachineInstanceId> = std::sync::OnceLock::new();
1045 ID.get_or_init(|| MachineInstanceId::parse("mob").unwrap())
1046 }
1047
1048 async fn receive_signal(
1049 &self,
1050 variant: SignalVariantId,
1051 projected_fields: Vec<(FieldId, OwnedFieldValue)>,
1052 ) -> Result<(), ConsumerError> {
1053 self.log.lock().await.push((variant, projected_fields));
1054 Ok(())
1055 }
1056 }
1057
1058 fn mob_producer() -> ProducerInstance {
1059 ProducerInstance {
1060 composition: CompositionId::parse("meerkat_mob_seam").unwrap(),
1061 instance_id: MachineInstanceId::parse("mob").unwrap(),
1062 machine: MachineId::parse("MobMachine").unwrap(),
1063 }
1064 }
1065
1066 fn meerkat_producer() -> ProducerInstance {
1067 ProducerInstance {
1068 composition: CompositionId::parse("meerkat_mob_seam").unwrap(),
1069 instance_id: MachineInstanceId::parse("meerkat").unwrap(),
1070 machine: MachineId::parse("MeerkatMachine").unwrap(),
1071 }
1072 }
1073
1074 fn sample_effect() -> EffectPayload<SeamEffect> {
1075 EffectPayload::Emitted {
1076 variant: EffectVariantId::parse("RequestRuntimeBinding").unwrap(),
1077 body: SeamEffect::Mob(MobEffect::RequestRuntimeBinding {
1078 agent_runtime_id: "rt-1".into(),
1079 fence_token: 7,
1080 generation: 3,
1081 session_id: "019dbd3d-d7ad-75a1-96d0-8013927e78f8".into(),
1082 }),
1083 }
1084 }
1085
1086 fn build_dispatcher(
1087 consumer: Arc<RecordingMeerkatSurface>,
1088 ) -> CatalogCompositionDispatcher<SeamEffect> {
1089 let schema = meerkat_mob_seam_composition();
1090 let table = RouteTable::from_schema(&schema).expect("seam schema routes are well-formed");
1091 CatalogCompositionDispatcher::new(schema.name.clone(), table).with_consumer(consumer)
1092 }
1093
1094 fn sample_signal() -> SignalPayload<SeamSignal> {
1095 let body = SeamSignal::RuntimeBound {
1096 agent_runtime_id: "rt-1".into(),
1097 fence_token: 7,
1098 };
1099 SignalPayload::Emitted {
1100 variant: body.variant_id(),
1101 body,
1102 }
1103 }
1104
1105 fn build_signal_dispatcher(
1106 consumer: Arc<RecordingMobSignalSurface>,
1107 ) -> CatalogCompositionSignalDispatcher<SeamSignal> {
1108 let schema = meerkat_mob_seam_composition();
1109 let table = RouteTable::from_schema(&schema).expect("seam schema routes are well-formed");
1110 CatalogCompositionSignalDispatcher::new(schema.name.clone(), table).with_consumer(consumer)
1111 }
1112
1113 #[tokio::test]
1114 async fn dispatches_mob_routed_effect_to_meerkat_consumer() {
1115 let consumer = Arc::new(RecordingMeerkatSurface::default());
1116 let dispatcher = build_dispatcher(Arc::clone(&consumer));
1117
1118 let outcome = dispatcher
1119 .dispatch(mob_producer(), sample_effect())
1120 .await
1121 .expect("well-formed routed effect");
1122
1123 assert_eq!(outcome.consumer.as_str(), "meerkat");
1124 assert_eq!(outcome.applied_input.as_str(), "PrepareBindings");
1125 assert_eq!(
1126 outcome.route.route_id.as_str(),
1127 "binding_request_reaches_meerkat"
1128 );
1129
1130 let log = consumer.log.lock().await;
1131 assert_eq!(
1132 log.len(),
1133 1,
1134 "dispatcher must call the consumer exactly once"
1135 );
1136 let (variant, fields) = &log[0];
1137 assert_eq!(variant.as_str(), "PrepareBindings");
1138 let field_names: Vec<&str> = fields.iter().map(|(k, _)| k.as_str()).collect();
1139 assert_eq!(
1140 field_names,
1141 vec![
1142 "agent_runtime_id",
1143 "fence_token",
1144 "generation",
1145 "session_id"
1146 ]
1147 );
1148 match &fields[0].1 {
1149 OwnedFieldValue::Str(s) => assert_eq!(s, "rt-1"),
1150 other => panic!("expected Str, got {other:?}"),
1151 }
1152 match &fields[1].1 {
1153 OwnedFieldValue::U64(v) => assert_eq!(*v, 7),
1154 other => panic!("expected U64, got {other:?}"),
1155 }
1156 match &fields[2].1 {
1157 OwnedFieldValue::U64(v) => assert_eq!(*v, 3),
1158 other => panic!("expected U64, got {other:?}"),
1159 }
1160 match &fields[3].1 {
1161 OwnedFieldValue::Str(s) => assert_eq!(s, "019dbd3d-d7ad-75a1-96d0-8013927e78f8"),
1162 other => panic!("expected Str for session_id, got {other:?}"),
1163 }
1164 }
1165
1166 #[tokio::test]
1167 async fn dispatches_meerkat_routed_signal_to_mob_consumer() {
1168 let consumer = Arc::new(RecordingMobSignalSurface::default());
1169 let dispatcher = build_signal_dispatcher(Arc::clone(&consumer));
1170
1171 let outcome = dispatcher
1172 .dispatch_signal(meerkat_producer(), sample_signal())
1173 .await
1174 .expect("well-formed routed signal");
1175
1176 assert_eq!(outcome.consumer.as_str(), "mob");
1177 assert_eq!(outcome.applied_signal.as_str(), "ObserveRuntimeReady");
1178 assert_eq!(outcome.route.route_id.as_str(), "runtime_bound_reaches_mob");
1179
1180 let log = consumer.log.lock().await;
1181 assert_eq!(
1182 log.len(),
1183 1,
1184 "dispatcher must call the signal consumer exactly once"
1185 );
1186 let (variant, fields) = &log[0];
1187 assert_eq!(variant.as_str(), "ObserveRuntimeReady");
1188 let field_names: Vec<&str> = fields.iter().map(|(k, _)| k.as_str()).collect();
1189 assert_eq!(field_names, vec!["agent_runtime_id", "fence_token"]);
1190 match &fields[0].1 {
1191 OwnedFieldValue::Str(s) => assert_eq!(s, "rt-1"),
1192 other => panic!("expected Str, got {other:?}"),
1193 }
1194 match &fields[1].1 {
1195 OwnedFieldValue::U64(v) => assert_eq!(*v, 7),
1196 other => panic!("expected U64, got {other:?}"),
1197 }
1198 }
1199
1200 #[tokio::test]
1201 async fn signal_dispatch_refuses_input_route_typed() {
1202 let consumer = Arc::new(RecordingMobSignalSurface::default());
1203 let dispatcher = build_signal_dispatcher(consumer);
1204
1205 let payload = SignalPayload::Emitted {
1206 variant: EffectVariantId::parse("RequestRuntimeBinding").unwrap(),
1207 body: SeamSignal::RuntimeBound {
1208 agent_runtime_id: "rt-1".into(),
1209 fence_token: 7,
1210 },
1211 };
1212
1213 let err = dispatcher
1214 .dispatch_signal(mob_producer(), payload)
1215 .await
1216 .expect_err("input route is out of the signal surface");
1217
1218 assert!(matches!(err, SignalDispatchRefusal::UnresolvedRoute { .. }));
1219 }
1220
1221 #[tokio::test]
1222 async fn signal_dispatch_refuses_unwired_consumer_typed() {
1223 let schema = meerkat_mob_seam_composition();
1224 let table = RouteTable::from_schema(&schema).unwrap();
1225 let dispatcher: CatalogCompositionSignalDispatcher<SeamSignal> =
1226 CatalogCompositionSignalDispatcher::new(schema.name.clone(), table);
1227
1228 let err = dispatcher
1229 .dispatch_signal(meerkat_producer(), sample_signal())
1230 .await
1231 .expect_err("unwired signal consumer");
1232
1233 assert!(matches!(err, SignalDispatchRefusal::UnwiredConsumer { .. }));
1234 }
1235
1236 #[tokio::test]
1237 async fn signal_dispatch_refuses_missing_field_typed() {
1238 #[derive(Debug)]
1239 struct BrokenSignal;
1240
1241 impl ProducerSignal for BrokenSignal {
1242 fn variant_id(&self) -> EffectVariantId {
1243 EffectVariantId::parse("RuntimeBound").unwrap()
1244 }
1245
1246 fn field(&self, _id: &FieldId) -> Option<FieldValue<'_>> {
1247 None
1248 }
1249 }
1250
1251 let schema = meerkat_mob_seam_composition();
1252 let table = RouteTable::from_schema(&schema).unwrap();
1253 let consumer = Arc::new(RecordingMobSignalSurface::default());
1254 let dispatcher: CatalogCompositionSignalDispatcher<BrokenSignal> =
1255 CatalogCompositionSignalDispatcher::new(schema.name.clone(), table)
1256 .with_consumer(consumer);
1257
1258 let err = dispatcher
1259 .dispatch_signal(
1260 meerkat_producer(),
1261 SignalPayload::Emitted {
1262 variant: EffectVariantId::parse("RuntimeBound").unwrap(),
1263 body: BrokenSignal,
1264 },
1265 )
1266 .await
1267 .expect_err("missing producer field");
1268
1269 assert!(matches!(
1270 err,
1271 SignalDispatchRefusal::MissingProducerField { .. }
1272 ));
1273 }
1274
1275 #[tokio::test]
1276 async fn refuses_mismatched_composition() {
1277 let consumer = Arc::new(RecordingMeerkatSurface::default());
1278 let dispatcher = build_dispatcher(consumer);
1279
1280 let mut wrong = mob_producer();
1281 wrong.composition = CompositionId::parse("some_other_composition").unwrap();
1282
1283 let err = dispatcher
1284 .dispatch(wrong, sample_effect())
1285 .await
1286 .expect_err("composition mismatch");
1287
1288 assert!(matches!(err, DispatchRefusal::CompositionMismatch { .. }));
1289 }
1290
1291 #[tokio::test]
1292 async fn refuses_unrouted_effect_typed() {
1293 let consumer = Arc::new(RecordingMeerkatSurface::default());
1294 let dispatcher = build_dispatcher(consumer);
1295
1296 let payload = EffectPayload::Emitted {
1300 variant: EffectVariantId::parse("UnknownEffect").unwrap(),
1301 body: SeamEffect::Mob(MobEffect::RequestRuntimeBinding {
1302 agent_runtime_id: "rt".into(),
1303 fence_token: 0,
1304 generation: 0,
1305 session_id: "019dbd3d-d7ad-75a1-96d0-8013927e78f8".into(),
1306 }),
1307 };
1308
1309 let err = dispatcher
1310 .dispatch(mob_producer(), payload)
1311 .await
1312 .expect_err("unresolved route");
1313
1314 assert!(matches!(err, DispatchRefusal::UnresolvedRoute { .. }));
1315 }
1316
1317 #[tokio::test]
1318 async fn refuses_unwired_consumer_typed() {
1319 let schema = meerkat_mob_seam_composition();
1323 let table = RouteTable::from_schema(&schema).unwrap();
1324 let dispatcher: CatalogCompositionDispatcher<SeamEffect> =
1325 CatalogCompositionDispatcher::new(schema.name.clone(), table);
1326
1327 let err = dispatcher
1328 .dispatch(mob_producer(), sample_effect())
1329 .await
1330 .expect_err("unwired consumer");
1331
1332 assert!(matches!(err, DispatchRefusal::UnwiredConsumer { .. }));
1333 }
1334
1335 #[tokio::test]
1336 async fn standalone_binding_has_no_dispatcher() {
1337 let binding: CompositionBinding<SeamEffect> = CompositionBinding::Standalone;
1338 assert!(binding.is_standalone());
1339 assert!(binding.wired().is_none());
1340 }
1341
1342 #[tokio::test]
1343 async fn wired_binding_exposes_dispatcher() {
1344 let consumer = Arc::new(RecordingMeerkatSurface::default());
1345 let dispatcher = Arc::new(build_dispatcher(consumer));
1346 let binding: CompositionBinding<SeamEffect> = CompositionBinding::Wired(dispatcher);
1347 assert!(!binding.is_standalone());
1348 assert!(binding.wired().is_some());
1349 assert!(
1350 binding.context_provider().is_none(),
1351 "plain Wired binding has no owner-supplied context"
1352 );
1353 }
1354
1355 struct PinnedSessionContext {
1361 session_id: String,
1362 }
1363
1364 impl ContextProvider<SeamEffect> for PinnedSessionContext {
1365 fn provide_context(
1366 &self,
1367 _producer: &ProducerInstance,
1368 _effect: &EffectPayload<SeamEffect>,
1369 ) -> Vec<(FieldId, OwnedFieldValue)> {
1370 vec![(
1371 FieldId::parse("session_id").expect("field id"),
1372 OwnedFieldValue::Str(self.session_id.clone()),
1373 )]
1374 }
1375 }
1376
1377 #[tokio::test]
1378 async fn owner_provided_binding_exposes_both_dispatcher_and_context() {
1379 let consumer = Arc::new(RecordingMeerkatSurface::default());
1380 let dispatcher = Arc::new(build_dispatcher(consumer));
1381 let context = Arc::new(PinnedSessionContext {
1382 session_id: "session-abc".into(),
1383 });
1384 let binding: CompositionBinding<SeamEffect> =
1385 CompositionBinding::owner_provided(dispatcher, context);
1386
1387 assert!(!binding.is_standalone());
1388 assert!(
1389 binding.wired().is_some(),
1390 "OwnerProvided is a superset of Wired for dispatcher access"
1391 );
1392 assert!(
1393 binding.context_provider().is_some(),
1394 "OwnerProvided must expose the owner-supplied context"
1395 );
1396
1397 let provider = binding.context_provider().expect("context provider");
1402 let producer = mob_producer();
1403 let effect = sample_effect();
1404 let fields = provider.provide_context(&producer, &effect);
1405 assert_eq!(fields.len(), 1);
1406 assert_eq!(fields[0].0.as_str(), "session_id");
1407 match &fields[0].1 {
1408 OwnedFieldValue::Str(s) => assert_eq!(s, "session-abc"),
1409 other => panic!("expected Str context field, got {other:?}"),
1410 }
1411 }
1412
1413 #[tokio::test]
1414 async fn composition_binding_constructors_parallel_machine_halves() {
1415 let standalone: CompositionBinding<SeamEffect> = CompositionBinding::standalone();
1421 assert!(standalone.is_standalone());
1422 assert!(standalone.wired().is_none());
1423 assert!(standalone.context_provider().is_none());
1424
1425 let consumer = Arc::new(RecordingMeerkatSurface::default());
1426 let dispatcher: Arc<dyn CompositionDispatcher<Effect = SeamEffect>> =
1427 Arc::new(build_dispatcher(consumer));
1428 let wired: CompositionBinding<SeamEffect> =
1429 CompositionBinding::wired_with(Arc::clone(&dispatcher));
1430 assert!(!wired.is_standalone());
1431 assert!(wired.wired().is_some());
1432 assert!(wired.context_provider().is_none());
1433
1434 let context = Arc::new(PinnedSessionContext {
1435 session_id: "session-xyz".into(),
1436 });
1437 let owner_provided: CompositionBinding<SeamEffect> =
1438 CompositionBinding::owner_provided(dispatcher, context);
1439 assert!(!owner_provided.is_standalone());
1440 assert!(owner_provided.wired().is_some());
1441 assert!(owner_provided.context_provider().is_some());
1442 }
1443
1444 struct RefusingMeerkatSurface;
1447
1448 #[async_trait]
1449 impl ConsumerSurface for RefusingMeerkatSurface {
1450 fn instance_id(&self) -> &MachineInstanceId {
1451 static ID: std::sync::OnceLock<MachineInstanceId> = std::sync::OnceLock::new();
1452 ID.get_or_init(|| MachineInstanceId::parse("meerkat").unwrap())
1453 }
1454
1455 async fn apply_routed_input(
1456 &self,
1457 _variant: InputVariantId,
1458 _projected_fields: Vec<(FieldId, OwnedFieldValue)>,
1459 ) -> Result<(), ConsumerError> {
1460 Err(ConsumerError::new(
1461 "runtime_destroyed",
1462 "consumer machine no longer accepts inputs",
1463 ))
1464 }
1465 }
1466
1467 #[tokio::test]
1473 async fn consumer_refusal_preserves_typed_error_code_through_dispatcher() {
1474 let schema = meerkat_mob_seam_composition();
1475 let table = RouteTable::from_schema(&schema).expect("seam schema routes are well-formed");
1476 let dispatcher = CatalogCompositionDispatcher::new(schema.name.clone(), table)
1477 .with_consumer(Arc::new(RefusingMeerkatSurface));
1478
1479 let err = dispatcher
1480 .dispatch(mob_producer(), sample_effect())
1481 .await
1482 .expect_err("refusing consumer surface");
1483
1484 match err {
1485 DispatchRefusal::ConsumerRefused { error, .. } => {
1486 assert_eq!(
1487 error.error_code(),
1488 "runtime_destroyed",
1489 "typed consumer error_code must survive the dispatch seam, not be flattened to a string"
1490 );
1491 }
1492 other => {
1493 panic!("expected ConsumerRefused carrying a typed ConsumerError, got {other:?}")
1494 }
1495 }
1496 }
1497}