1pub mod route_table;
39
40use std::collections::HashMap;
41use std::fmt;
42use std::sync::Arc;
43
44use async_trait::async_trait;
45use meerkat_machine_schema::identity::{
46 CompositionId, EffectVariantId, FieldId, InputVariantId, MachineId, MachineInstanceId, RouteId,
47 SignalVariantId,
48};
49use thiserror::Error;
50
51pub use route_table::{RouteTable, RouteTableError, RoutedInputDescriptor, RoutedSignalDescriptor};
52
/// Identity of the machine instance that emitted an effect or signal.
///
/// The catalog dispatchers in this file compare `composition` against their
/// own composition id and use `instance_id` for route resolution; `machine`
/// is carried along but not read by them.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ProducerInstance {
    /// Composition the producer claims to belong to.
    pub composition: CompositionId,
    /// Instance id of the producer; part of the route lookup key.
    pub instance_id: MachineInstanceId,
    /// Identifier of the producer's machine.
    pub machine: MachineId,
}
66
/// An effect handed to a [`CompositionDispatcher`], tagged with the variant id
/// that route resolution keys on.
///
/// Nothing in this type forces `variant` to agree with `body.variant_id()`:
/// the catalog dispatcher in this file resolves the route from `variant` and
/// only reads fields from `body`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EffectPayload<E> {
    /// An effect emitted by a producer.
    Emitted {
        /// Variant id used to look up the route.
        variant: EffectVariantId,
        /// The effect value whose fields are projected for the consumer.
        body: E,
    },
}
84
85impl<E: ProducerEffect> EffectPayload<E> {
86 pub fn variant(&self) -> &EffectVariantId {
88 match self {
89 Self::Emitted { variant, .. } => variant,
90 }
91 }
92
93 pub fn body(&self) -> &E {
95 match self {
96 Self::Emitted { body, .. } => body,
97 }
98 }
99}
100
/// A signal handed to a [`CompositionSignalDispatcher`], tagged with the
/// variant id that signal-route resolution keys on.
///
/// The producer-side variant is typed as [`EffectVariantId`]; the
/// consumer-side variant reported after dispatch is a `SignalVariantId`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SignalPayload<S> {
    /// A signal emitted by a producer.
    Emitted {
        /// Producer-side variant id used to look up the signal route.
        variant: EffectVariantId,
        /// The signal value whose fields are projected for the consumer.
        body: S,
    },
}
115
116impl<S: ProducerSignal> SignalPayload<S> {
117 pub fn variant(&self) -> &EffectVariantId {
119 match self {
120 Self::Emitted { variant, .. } => variant,
121 }
122 }
123
124 pub fn body(&self) -> &S {
126 match self {
127 Self::Emitted { body, .. } => body,
128 }
129 }
130}
131
/// Names a route by the composition it belongs to plus its route id.
///
/// Reported in dispatch outcomes to say which route carried the delivery.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RouteKey {
    /// Composition that declares the route.
    pub composition: CompositionId,
    /// Id of the route within that composition.
    pub route_id: RouteId,
}
138
/// An effect type whose values can be routed by a [`CompositionDispatcher`].
pub trait ProducerEffect: fmt::Debug + Send + Sync + 'static {
    /// Returns the variant id of this effect value.
    ///
    /// Note that the catalog dispatcher in this file resolves routes from the
    /// payload's `variant` field rather than from this method.
    fn variant_id(&self) -> EffectVariantId;

    /// Looks up the field named `id` on this effect.
    ///
    /// Returns `None` when the effect has no such field. The catalog
    /// dispatcher reports that as `DispatchRefusal::MissingProducerField`
    /// when the field is the source of a route binding.
    fn field(&self, id: &FieldId) -> Option<FieldValue<'_>>;
}
161
/// A signal type whose values can be routed by a
/// [`CompositionSignalDispatcher`].
pub trait ProducerSignal: fmt::Debug + Send + Sync + 'static {
    /// Returns the producer-side variant id of this signal value.
    ///
    /// Note that the catalog signal dispatcher in this file resolves routes
    /// from the payload's `variant` field rather than from this method.
    fn variant_id(&self) -> EffectVariantId;

    /// Looks up the field named `id` on this signal.
    ///
    /// Returns `None` when the signal has no such field. The catalog signal
    /// dispatcher reports that as `SignalDispatchRefusal::MissingProducerField`
    /// when the field is the source of a route binding.
    fn field(&self, id: &FieldId) -> Option<FieldValue<'_>>;
}
178
/// A field value read from a producer effect or signal.
///
/// String data is borrowed for `'a`; the value is converted into an
/// [`OwnedFieldValue`] before it is handed to a consumer.
#[derive(Debug, Clone)]
pub enum FieldValue<'a> {
    /// Borrowed string data.
    Str(&'a str),
    /// Unsigned 64-bit integer.
    U64(u64),
    /// Signed 64-bit integer.
    I64(i64),
    /// Boolean flag.
    Bool(bool),
    /// Shared, type-erased handle; consumers must downcast it themselves.
    Opaque(Arc<dyn std::any::Any + Send + Sync>),
}
203
/// Result of a successful effect dispatch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DispatchOutcome {
    /// The route that carried the effect.
    pub route: RouteKey,
    /// Instance id of the consumer that accepted the input.
    pub consumer: MachineInstanceId,
    /// Input variant that was applied to the consumer.
    pub applied_input: InputVariantId,
}
214
/// Result of a successful signal dispatch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SignalDispatchOutcome {
    /// The route that carried the signal.
    pub route: RouteKey,
    /// Instance id of the consumer that accepted the signal.
    pub consumer: MachineInstanceId,
    /// Consumer-side signal variant that was delivered.
    pub applied_signal: SignalVariantId,
}
225
/// Typed reasons an effect dispatch can be refused.
///
/// The variants are listed in the order the catalog dispatcher in this file
/// checks for them.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum DispatchRefusal {
    /// The producer names a different composition than the dispatcher serves.
    /// `expected` is the dispatcher's composition, `actual` the producer's.
    #[error("dispatcher composition {expected} does not match producer composition {actual}")]
    CompositionMismatch {
        expected: CompositionId,
        actual: CompositionId,
    },
    /// The route table has no input route for this producer instance and
    /// effect variant.
    #[error(
        "no input route declared for producer {instance} effect variant {variant} in composition {composition}"
    )]
    UnresolvedRoute {
        composition: CompositionId,
        instance: MachineInstanceId,
        variant: EffectVariantId,
    },
    /// A route binding names a source field that the effect body did not
    /// provide (`ProducerEffect::field` returned `None`).
    #[error("route {route} requires producer field {field} on variant {variant}, not provided")]
    MissingProducerField {
        route: RouteId,
        variant: EffectVariantId,
        field: FieldId,
    },
    /// The route resolved, but no consumer surface is registered under the
    /// target instance id.
    #[error(
        "no consumer surface registered for target instance {instance} in composition {composition}"
    )]
    UnwiredConsumer {
        composition: CompositionId,
        instance: MachineInstanceId,
    },
    /// The consumer surface returned an error; `reason` is the string it
    /// returned.
    #[error("consumer {instance} refused input {variant}: {reason}")]
    ConsumerRefused {
        instance: MachineInstanceId,
        variant: InputVariantId,
        reason: String,
    },
}
277
/// Typed reasons a signal dispatch can be refused.
///
/// Mirrors [`DispatchRefusal`] for the signal path; the variants are listed in
/// the order the catalog signal dispatcher in this file checks for them.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum SignalDispatchRefusal {
    /// The producer names a different composition than the dispatcher serves.
    /// `expected` is the dispatcher's composition, `actual` the producer's.
    #[error("dispatcher composition {expected} does not match producer composition {actual}")]
    CompositionMismatch {
        expected: CompositionId,
        actual: CompositionId,
    },
    /// The route table has no signal route for this producer instance and
    /// variant.
    #[error(
        "no signal route declared for producer {instance} variant {variant} in composition {composition}"
    )]
    UnresolvedRoute {
        composition: CompositionId,
        instance: MachineInstanceId,
        variant: EffectVariantId,
    },
    /// A route binding names a source field that the signal body did not
    /// provide (`ProducerSignal::field` returned `None`).
    #[error("route {route} requires producer field {field} on variant {variant}, not provided")]
    MissingProducerField {
        route: RouteId,
        variant: EffectVariantId,
        field: FieldId,
    },
    /// The route resolved, but no signal consumer surface is registered under
    /// the target instance id.
    #[error(
        "no signal consumer surface registered for target instance {instance} in composition {composition}"
    )]
    UnwiredConsumer {
        composition: CompositionId,
        instance: MachineInstanceId,
    },
    /// The consumer surface returned an error; `reason` is the string it
    /// returned.
    #[error("consumer {instance} refused signal {variant}: {reason}")]
    ConsumerRefused {
        instance: MachineInstanceId,
        variant: SignalVariantId,
        reason: String,
    },
}
321
/// The receiving side of an input route: a machine instance that accepts
/// routed inputs.
#[async_trait]
pub trait ConsumerSurface: Send + Sync {
    /// Instance id of this consumer. The catalog dispatcher uses it as the
    /// registration key when the surface is added.
    fn instance_id(&self) -> &MachineInstanceId;

    /// Applies the routed input `variant` with the fields projected from the
    /// producer effect, already renamed to the consumer-side field ids.
    ///
    /// # Errors
    ///
    /// Returns a free-form reason string on refusal; the catalog dispatcher
    /// wraps it in `DispatchRefusal::ConsumerRefused`.
    async fn apply_routed_input(
        &self,
        variant: InputVariantId,
        projected_fields: Vec<(FieldId, OwnedFieldValue)>,
    ) -> Result<(), String>;
}
346
/// The receiving side of a signal route: a machine instance that accepts
/// routed signals.
#[async_trait]
pub trait SignalConsumerSurface: Send + Sync {
    /// Instance id of this consumer. The catalog signal dispatcher uses it as
    /// the registration key when the surface is added.
    fn instance_id(&self) -> &MachineInstanceId;

    /// Delivers the routed signal `variant` with the fields projected from the
    /// producer signal, already renamed to the consumer-side field ids.
    ///
    /// # Errors
    ///
    /// Returns a free-form reason string on refusal; the catalog signal
    /// dispatcher wraps it in `SignalDispatchRefusal::ConsumerRefused`.
    async fn receive_signal(
        &self,
        variant: SignalVariantId,
        projected_fields: Vec<(FieldId, OwnedFieldValue)>,
    ) -> Result<(), String>;
}
360
/// Owned counterpart of [`FieldValue`], handed to consumer surfaces.
///
/// Variants correspond one-to-one with [`FieldValue`]; only `Str` differs in
/// representation (an owned `String` instead of a borrowed `&str`).
#[derive(Debug, Clone)]
pub enum OwnedFieldValue {
    /// Owned string data.
    Str(String),
    /// Unsigned 64-bit integer.
    U64(u64),
    /// Signed 64-bit integer.
    I64(i64),
    /// Boolean flag.
    Bool(bool),
    /// Shared, type-erased handle; consumers must downcast it themselves.
    Opaque(Arc<dyn std::any::Any + Send + Sync>),
}
373
374impl FieldValue<'_> {
375 pub fn to_owned_value(&self) -> OwnedFieldValue {
379 match self {
380 FieldValue::Str(s) => OwnedFieldValue::Str((*s).to_owned()),
381 FieldValue::U64(v) => OwnedFieldValue::U64(*v),
382 FieldValue::I64(v) => OwnedFieldValue::I64(*v),
383 FieldValue::Bool(v) => OwnedFieldValue::Bool(*v),
384 FieldValue::Opaque(handle) => OwnedFieldValue::Opaque(Arc::clone(handle)),
385 }
386 }
387}
388
/// Routes producer effects to consumers within one composition.
#[async_trait]
pub trait CompositionDispatcher: Send + Sync {
    /// The effect type this dispatcher accepts.
    type Effect: ProducerEffect;

    /// The composition this dispatcher serves.
    fn composition(&self) -> &CompositionId;

    /// Dispatches `effect`, emitted by `producer`, to its routed consumer.
    ///
    /// # Errors
    ///
    /// Returns a [`DispatchRefusal`] describing why the effect was not
    /// delivered.
    async fn dispatch(
        &self,
        producer: ProducerInstance,
        effect: EffectPayload<Self::Effect>,
    ) -> Result<DispatchOutcome, DispatchRefusal>;
}
414
/// Routes producer signals to consumers within one composition.
#[async_trait]
pub trait CompositionSignalDispatcher: Send + Sync {
    /// The signal type this dispatcher accepts.
    type Signal: ProducerSignal;

    /// The composition this dispatcher serves.
    fn composition(&self) -> &CompositionId;

    /// Dispatches `signal`, emitted by `producer`, to its routed consumer.
    ///
    /// # Errors
    ///
    /// Returns a [`SignalDispatchRefusal`] describing why the signal was not
    /// delivered.
    async fn dispatch_signal(
        &self,
        producer: ProducerInstance,
        signal: SignalPayload<Self::Signal>,
    ) -> Result<SignalDispatchOutcome, SignalDispatchRefusal>;
}
432
/// Supplies owner-provided field values for an effect.
///
/// In this file a provider is only stored in
/// [`CompositionBinding::OwnerProvided`] and handed back by
/// `CompositionBinding::context_provider`; the catalog dispatcher never calls
/// it. NOTE(review): how callers merge the returned fields with the
/// producer's own fields is not visible here.
pub trait ContextProvider<E: ProducerEffect>: Send + Sync {
    /// Returns the `(field id, value)` pairs to contribute for `effect`
    /// emitted by `producer`.
    fn provide_context(
        &self,
        producer: &ProducerInstance,
        effect: &EffectPayload<E>,
    ) -> Vec<(FieldId, OwnedFieldValue)>;
}
475
/// How a machine is bound to a composition, if at all.
pub enum CompositionBinding<E: ProducerEffect> {
    /// Not bound to any composition: no dispatcher and no context provider.
    Standalone,
    /// Bound to a dispatcher, with no owner-supplied context.
    Wired(Arc<dyn CompositionDispatcher<Effect = E>>),
    /// Bound to a dispatcher together with an owner-supplied context provider.
    OwnerProvided {
        /// Dispatcher that routes the machine's effects.
        dispatcher: Arc<dyn CompositionDispatcher<Effect = E>>,
        /// Provider of owner-supplied context fields.
        context: Arc<dyn ContextProvider<E>>,
    },
}
517
518impl<E: ProducerEffect> fmt::Debug for CompositionBinding<E> {
519 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
520 match self {
521 Self::Standalone => f.debug_struct("CompositionBinding::Standalone").finish(),
522 Self::Wired(_) => f
523 .debug_struct("CompositionBinding::Wired")
524 .field("dispatcher", &"<dyn CompositionDispatcher>")
525 .finish(),
526 Self::OwnerProvided { .. } => f
527 .debug_struct("CompositionBinding::OwnerProvided")
528 .field("dispatcher", &"<dyn CompositionDispatcher>")
529 .field("context", &"<dyn ContextProvider>")
530 .finish(),
531 }
532 }
533}
534
535impl<E: ProducerEffect> CompositionBinding<E> {
536 pub fn standalone() -> Self {
543 Self::Standalone
544 }
545
546 pub fn wired_with(dispatcher: Arc<dyn CompositionDispatcher<Effect = E>>) -> Self {
552 Self::Wired(dispatcher)
553 }
554
555 pub fn owner_provided(
563 dispatcher: Arc<dyn CompositionDispatcher<Effect = E>>,
564 context: Arc<dyn ContextProvider<E>>,
565 ) -> Self {
566 Self::OwnerProvided {
567 dispatcher,
568 context,
569 }
570 }
571
572 pub fn is_standalone(&self) -> bool {
574 matches!(self, Self::Standalone)
575 }
576
577 pub fn wired(&self) -> Option<&Arc<dyn CompositionDispatcher<Effect = E>>> {
586 match self {
587 Self::Standalone => None,
588 Self::Wired(d) => Some(d),
589 Self::OwnerProvided { dispatcher, .. } => Some(dispatcher),
590 }
591 }
592
593 pub fn context_provider(&self) -> Option<&Arc<dyn ContextProvider<E>>> {
602 match self {
603 Self::Standalone | Self::Wired(_) => None,
604 Self::OwnerProvided { context, .. } => Some(context),
605 }
606 }
607}
608
/// A [`CompositionDispatcher`] driven by a [`RouteTable`] and a set of
/// registered consumer surfaces.
pub struct CatalogCompositionDispatcher<E: ProducerEffect> {
    /// Composition this dispatcher serves; producers must name the same one.
    composition: CompositionId,
    /// Route table used to resolve `(producer instance, effect variant)`.
    table: RouteTable,
    /// Consumer surfaces keyed by their own instance id.
    consumers: HashMap<MachineInstanceId, Arc<dyn ConsumerSurface>>,
    /// Ties the dispatcher to `E` without storing a value of that type.
    _effect: std::marker::PhantomData<fn(E)>,
}
629
/// A [`CompositionSignalDispatcher`] driven by a [`RouteTable`] and a set of
/// registered signal consumer surfaces.
pub struct CatalogCompositionSignalDispatcher<S: ProducerSignal> {
    /// Composition this dispatcher serves; producers must name the same one.
    composition: CompositionId,
    /// Route table used to resolve `(producer instance, signal variant)`.
    table: RouteTable,
    /// Signal consumer surfaces keyed by their own instance id.
    consumers: HashMap<MachineInstanceId, Arc<dyn SignalConsumerSurface>>,
    /// Ties the dispatcher to `S` without storing a value of that type.
    _signal: std::marker::PhantomData<fn(S)>,
}
641
642impl<S: ProducerSignal> fmt::Debug for CatalogCompositionSignalDispatcher<S> {
643 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
644 f.debug_struct("CatalogCompositionSignalDispatcher")
645 .field("composition", &self.composition)
646 .field("signal_routes", &self.table.signal_route_count())
647 .field("consumers", &self.consumers.len())
648 .finish()
649 }
650}
651
652impl<E: ProducerEffect> fmt::Debug for CatalogCompositionDispatcher<E> {
653 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
654 f.debug_struct("CatalogCompositionDispatcher")
655 .field("composition", &self.composition)
656 .field("routes", &self.table.len())
657 .field("consumers", &self.consumers.len())
658 .finish()
659 }
660}
661
662impl<E: ProducerEffect> CatalogCompositionDispatcher<E> {
663 pub fn new(composition: CompositionId, table: RouteTable) -> Self {
666 Self {
667 composition,
668 table,
669 consumers: HashMap::new(),
670 _effect: std::marker::PhantomData,
671 }
672 }
673
674 pub fn with_consumer(mut self, surface: Arc<dyn ConsumerSurface>) -> Self {
681 self.consumers
682 .insert(surface.instance_id().clone(), surface);
683 self
684 }
685}
686
687impl<S: ProducerSignal> CatalogCompositionSignalDispatcher<S> {
688 pub fn new(composition: CompositionId, table: RouteTable) -> Self {
690 Self {
691 composition,
692 table,
693 consumers: HashMap::new(),
694 _signal: std::marker::PhantomData,
695 }
696 }
697
698 pub fn with_consumer(mut self, surface: Arc<dyn SignalConsumerSurface>) -> Self {
700 self.consumers
701 .insert(surface.instance_id().clone(), surface);
702 self
703 }
704}
705
706#[async_trait]
707impl<E: ProducerEffect> CompositionDispatcher for CatalogCompositionDispatcher<E> {
708 type Effect = E;
709
710 fn composition(&self) -> &CompositionId {
711 &self.composition
712 }
713
714 async fn dispatch(
715 &self,
716 producer: ProducerInstance,
717 effect: EffectPayload<Self::Effect>,
718 ) -> Result<DispatchOutcome, DispatchRefusal> {
719 if producer.composition != self.composition {
720 return Err(DispatchRefusal::CompositionMismatch {
721 expected: self.composition.clone(),
722 actual: producer.composition,
723 });
724 }
725
726 let variant = effect.variant().clone();
727 let body = effect.body();
728
729 let descriptor = self
730 .table
731 .resolve(&producer.instance_id, &variant)
732 .ok_or_else(|| DispatchRefusal::UnresolvedRoute {
733 composition: self.composition.clone(),
734 instance: producer.instance_id.clone(),
735 variant: variant.clone(),
736 })?;
737
738 let mut projected: Vec<(FieldId, OwnedFieldValue)> =
739 Vec::with_capacity(descriptor.bindings.len());
740 for (from_field, to_field) in &descriptor.bindings {
741 let value =
742 body.field(from_field)
743 .ok_or_else(|| DispatchRefusal::MissingProducerField {
744 route: descriptor.route_id.clone(),
745 variant: variant.clone(),
746 field: from_field.clone(),
747 })?;
748 projected.push((to_field.clone(), value.to_owned_value()));
749 }
750
751 let consumer = self.consumers.get(&descriptor.instance_id).ok_or_else(|| {
752 DispatchRefusal::UnwiredConsumer {
753 composition: self.composition.clone(),
754 instance: descriptor.instance_id.clone(),
755 }
756 })?;
757
758 consumer
759 .apply_routed_input(descriptor.input_variant.clone(), projected)
760 .await
761 .map_err(|reason| DispatchRefusal::ConsumerRefused {
762 instance: descriptor.instance_id.clone(),
763 variant: descriptor.input_variant.clone(),
764 reason,
765 })?;
766
767 Ok(DispatchOutcome {
768 route: RouteKey {
769 composition: self.composition.clone(),
770 route_id: descriptor.route_id.clone(),
771 },
772 consumer: descriptor.instance_id.clone(),
773 applied_input: descriptor.input_variant.clone(),
774 })
775 }
776}
777
778#[async_trait]
779impl<S: ProducerSignal> CompositionSignalDispatcher for CatalogCompositionSignalDispatcher<S> {
780 type Signal = S;
781
782 fn composition(&self) -> &CompositionId {
783 &self.composition
784 }
785
786 async fn dispatch_signal(
787 &self,
788 producer: ProducerInstance,
789 signal: SignalPayload<Self::Signal>,
790 ) -> Result<SignalDispatchOutcome, SignalDispatchRefusal> {
791 if producer.composition != self.composition {
792 return Err(SignalDispatchRefusal::CompositionMismatch {
793 expected: self.composition.clone(),
794 actual: producer.composition,
795 });
796 }
797
798 let variant = signal.variant().clone();
799 let body = signal.body();
800
801 let descriptor = self
802 .table
803 .resolve_signal(&producer.instance_id, &variant)
804 .ok_or_else(|| SignalDispatchRefusal::UnresolvedRoute {
805 composition: self.composition.clone(),
806 instance: producer.instance_id.clone(),
807 variant: variant.clone(),
808 })?;
809
810 let mut projected: Vec<(FieldId, OwnedFieldValue)> =
811 Vec::with_capacity(descriptor.bindings.len());
812 for (from_field, to_field) in &descriptor.bindings {
813 let value = body.field(from_field).ok_or_else(|| {
814 SignalDispatchRefusal::MissingProducerField {
815 route: descriptor.route_id.clone(),
816 variant: variant.clone(),
817 field: from_field.clone(),
818 }
819 })?;
820 projected.push((to_field.clone(), value.to_owned_value()));
821 }
822
823 let consumer = self.consumers.get(&descriptor.instance_id).ok_or_else(|| {
824 SignalDispatchRefusal::UnwiredConsumer {
825 composition: self.composition.clone(),
826 instance: descriptor.instance_id.clone(),
827 }
828 })?;
829
830 consumer
831 .receive_signal(descriptor.signal_variant.clone(), projected)
832 .await
833 .map_err(|reason| SignalDispatchRefusal::ConsumerRefused {
834 instance: descriptor.instance_id.clone(),
835 variant: descriptor.signal_variant.clone(),
836 reason,
837 })?;
838
839 Ok(SignalDispatchOutcome {
840 route: RouteKey {
841 composition: self.composition.clone(),
842 route_id: descriptor.route_id.clone(),
843 },
844 consumer: descriptor.instance_id.clone(),
845 applied_signal: descriptor.signal_variant.clone(),
846 })
847 }
848}
849
// Unit tests for the catalog dispatchers and `CompositionBinding`, driven by
// the `meerkat_mob_seam` composition schema from the catalog crate.
#[cfg(test)]
mod tests {
    use super::*;
    use meerkat_machine_schema::catalog::meerkat_mob_seam_composition;

    /// Effect type used by the effect-dispatch tests; wraps the mob effects.
    #[derive(Debug, Clone, PartialEq, Eq)]
    enum SeamEffect {
        Mob(MobEffect),
    }

    /// Mob-side effects exercised by the tests.
    #[derive(Debug, Clone, PartialEq, Eq)]
    enum MobEffect {
        RequestRuntimeBinding {
            agent_runtime_id: String,
            fence_token: u64,
            generation: u64,
            session_id: String,
        },
    }

    impl ProducerEffect for SeamEffect {
        fn variant_id(&self) -> EffectVariantId {
            match self {
                Self::Mob(MobEffect::RequestRuntimeBinding { .. }) => {
                    EffectVariantId::parse("RequestRuntimeBinding").expect("slug")
                }
            }
        }

        // Exposes each struct field under its own name; anything else is `None`.
        fn field(&self, id: &FieldId) -> Option<FieldValue<'_>> {
            match self {
                Self::Mob(MobEffect::RequestRuntimeBinding {
                    agent_runtime_id,
                    fence_token,
                    generation,
                    session_id,
                }) => match id.as_str() {
                    "agent_runtime_id" => Some(FieldValue::Str(agent_runtime_id)),
                    "fence_token" => Some(FieldValue::U64(*fence_token)),
                    "generation" => Some(FieldValue::U64(*generation)),
                    "session_id" => Some(FieldValue::Str(session_id)),
                    _ => None,
                },
            }
        }
    }

    /// Signal type used by the signal-dispatch tests.
    // Every variant shares the `Runtime` prefix, which trips this lint.
    #[allow(clippy::enum_variant_names)]
    #[derive(Debug, Clone, PartialEq, Eq)]
    enum SeamSignal {
        RuntimeBound {
            agent_runtime_id: String,
            fence_token: u64,
        },
        RuntimeRetired {
            agent_runtime_id: String,
            fence_token: u64,
        },
        RuntimeDestroyed {
            agent_runtime_id: String,
            fence_token: u64,
        },
    }

    impl ProducerSignal for SeamSignal {
        fn variant_id(&self) -> EffectVariantId {
            let slug = match self {
                Self::RuntimeBound { .. } => "RuntimeBound",
                Self::RuntimeRetired { .. } => "RuntimeRetired",
                Self::RuntimeDestroyed { .. } => "RuntimeDestroyed",
            };
            EffectVariantId::parse(slug).expect("signal source slug")
        }

        fn field(&self, id: &FieldId) -> Option<FieldValue<'_>> {
            // All three variants carry the same two fields.
            let (agent_runtime_id, fence_token) = match self {
                Self::RuntimeBound {
                    agent_runtime_id,
                    fence_token,
                }
                | Self::RuntimeRetired {
                    agent_runtime_id,
                    fence_token,
                }
                | Self::RuntimeDestroyed {
                    agent_runtime_id,
                    fence_token,
                } => (agent_runtime_id, fence_token),
            };
            match id.as_str() {
                "agent_runtime_id" => Some(FieldValue::Str(agent_runtime_id)),
                "fence_token" => Some(FieldValue::U64(*fence_token)),
                _ => None,
            }
        }
    }

    /// Consumer surface registered as instance `meerkat`; records every
    /// routed input it receives and always accepts.
    #[derive(Default)]
    struct RecordingMeerkatSurface {
        log: tokio::sync::Mutex<Vec<(InputVariantId, Vec<(FieldId, OwnedFieldValue)>)>>,
    }

    #[async_trait]
    impl ConsumerSurface for RecordingMeerkatSurface {
        fn instance_id(&self) -> &MachineInstanceId {
            // Parsed once and cached in a static so a reference can be returned.
            static ID: std::sync::OnceLock<MachineInstanceId> = std::sync::OnceLock::new();
            ID.get_or_init(|| MachineInstanceId::parse("meerkat").unwrap())
        }

        async fn apply_routed_input(
            &self,
            variant: InputVariantId,
            projected_fields: Vec<(FieldId, OwnedFieldValue)>,
        ) -> Result<(), String> {
            self.log.lock().await.push((variant, projected_fields));
            Ok(())
        }
    }

    /// Signal consumer surface registered as instance `mob`; records every
    /// routed signal it receives and always accepts.
    #[derive(Default)]
    struct RecordingMobSignalSurface {
        log: tokio::sync::Mutex<Vec<(SignalVariantId, Vec<(FieldId, OwnedFieldValue)>)>>,
    }

    #[async_trait]
    impl SignalConsumerSurface for RecordingMobSignalSurface {
        fn instance_id(&self) -> &MachineInstanceId {
            // Parsed once and cached in a static so a reference can be returned.
            static ID: std::sync::OnceLock<MachineInstanceId> = std::sync::OnceLock::new();
            ID.get_or_init(|| MachineInstanceId::parse("mob").unwrap())
        }

        async fn receive_signal(
            &self,
            variant: SignalVariantId,
            projected_fields: Vec<(FieldId, OwnedFieldValue)>,
        ) -> Result<(), String> {
            self.log.lock().await.push((variant, projected_fields));
            Ok(())
        }
    }

    /// Producer identity for instance `mob` in the `meerkat_mob_seam` composition.
    fn mob_producer() -> ProducerInstance {
        ProducerInstance {
            composition: CompositionId::parse("meerkat_mob_seam").unwrap(),
            instance_id: MachineInstanceId::parse("mob").unwrap(),
            machine: MachineId::parse("MobMachine").unwrap(),
        }
    }

    /// Producer identity for instance `meerkat` in the `meerkat_mob_seam` composition.
    fn meerkat_producer() -> ProducerInstance {
        ProducerInstance {
            composition: CompositionId::parse("meerkat_mob_seam").unwrap(),
            instance_id: MachineInstanceId::parse("meerkat").unwrap(),
            machine: MachineId::parse("MeerkatMachine").unwrap(),
        }
    }

    /// A `RequestRuntimeBinding` payload whose variant matches its body.
    fn sample_effect() -> EffectPayload<SeamEffect> {
        EffectPayload::Emitted {
            variant: EffectVariantId::parse("RequestRuntimeBinding").unwrap(),
            body: SeamEffect::Mob(MobEffect::RequestRuntimeBinding {
                agent_runtime_id: "rt-1".into(),
                fence_token: 7,
                generation: 3,
                session_id: "019dbd3d-d7ad-75a1-96d0-8013927e78f8".into(),
            }),
        }
    }

    /// Builds an effect dispatcher over the seam schema with `consumer` wired in.
    fn build_dispatcher(
        consumer: Arc<RecordingMeerkatSurface>,
    ) -> CatalogCompositionDispatcher<SeamEffect> {
        let schema = meerkat_mob_seam_composition();
        let table = RouteTable::from_schema(&schema).expect("seam schema routes are well-formed");
        CatalogCompositionDispatcher::new(schema.name.clone(), table).with_consumer(consumer)
    }

    /// A `RuntimeBound` payload whose variant is taken from the body itself.
    fn sample_signal() -> SignalPayload<SeamSignal> {
        let body = SeamSignal::RuntimeBound {
            agent_runtime_id: "rt-1".into(),
            fence_token: 7,
        };
        SignalPayload::Emitted {
            variant: body.variant_id(),
            body,
        }
    }

    /// Builds a signal dispatcher over the seam schema with `consumer` wired in.
    fn build_signal_dispatcher(
        consumer: Arc<RecordingMobSignalSurface>,
    ) -> CatalogCompositionSignalDispatcher<SeamSignal> {
        let schema = meerkat_mob_seam_composition();
        let table = RouteTable::from_schema(&schema).expect("seam schema routes are well-formed");
        CatalogCompositionSignalDispatcher::new(schema.name.clone(), table).with_consumer(consumer)
    }

    /// Happy path: the mob's effect reaches the meerkat consumer once, with
    /// the expected input variant, route id, field order and field values.
    #[tokio::test]
    async fn dispatches_mob_routed_effect_to_meerkat_consumer() {
        let consumer = Arc::new(RecordingMeerkatSurface::default());
        let dispatcher = build_dispatcher(Arc::clone(&consumer));

        let outcome = dispatcher
            .dispatch(mob_producer(), sample_effect())
            .await
            .expect("well-formed routed effect");

        assert_eq!(outcome.consumer.as_str(), "meerkat");
        assert_eq!(outcome.applied_input.as_str(), "PrepareBindings");
        assert_eq!(
            outcome.route.route_id.as_str(),
            "binding_request_reaches_meerkat"
        );

        let log = consumer.log.lock().await;
        assert_eq!(
            log.len(),
            1,
            "dispatcher must call the consumer exactly once"
        );
        let (variant, fields) = &log[0];
        assert_eq!(variant.as_str(), "PrepareBindings");
        let field_names: Vec<&str> = fields.iter().map(|(k, _)| k.as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "agent_runtime_id",
                "fence_token",
                "generation",
                "session_id"
            ]
        );
        match &fields[0].1 {
            OwnedFieldValue::Str(s) => assert_eq!(s, "rt-1"),
            other => panic!("expected Str, got {other:?}"),
        }
        match &fields[1].1 {
            OwnedFieldValue::U64(v) => assert_eq!(*v, 7),
            other => panic!("expected U64, got {other:?}"),
        }
        match &fields[2].1 {
            OwnedFieldValue::U64(v) => assert_eq!(*v, 3),
            other => panic!("expected U64, got {other:?}"),
        }
        match &fields[3].1 {
            OwnedFieldValue::Str(s) => assert_eq!(s, "019dbd3d-d7ad-75a1-96d0-8013927e78f8"),
            other => panic!("expected Str for session_id, got {other:?}"),
        }
    }

    /// Happy path: the meerkat's signal reaches the mob consumer once, with
    /// the expected signal variant, route id, field order and field values.
    #[tokio::test]
    async fn dispatches_meerkat_routed_signal_to_mob_consumer() {
        let consumer = Arc::new(RecordingMobSignalSurface::default());
        let dispatcher = build_signal_dispatcher(Arc::clone(&consumer));

        let outcome = dispatcher
            .dispatch_signal(meerkat_producer(), sample_signal())
            .await
            .expect("well-formed routed signal");

        assert_eq!(outcome.consumer.as_str(), "mob");
        assert_eq!(outcome.applied_signal.as_str(), "ObserveRuntimeReady");
        assert_eq!(outcome.route.route_id.as_str(), "runtime_bound_reaches_mob");

        let log = consumer.log.lock().await;
        assert_eq!(
            log.len(),
            1,
            "dispatcher must call the signal consumer exactly once"
        );
        let (variant, fields) = &log[0];
        assert_eq!(variant.as_str(), "ObserveRuntimeReady");
        let field_names: Vec<&str> = fields.iter().map(|(k, _)| k.as_str()).collect();
        assert_eq!(field_names, vec!["agent_runtime_id", "fence_token"]);
        match &fields[0].1 {
            OwnedFieldValue::Str(s) => assert_eq!(s, "rt-1"),
            other => panic!("expected Str, got {other:?}"),
        }
        match &fields[1].1 {
            OwnedFieldValue::U64(v) => assert_eq!(*v, 7),
            other => panic!("expected U64, got {other:?}"),
        }
    }

    /// `RequestRuntimeBinding` from `mob` is the variant the effect test
    /// routes as an input; the signal dispatcher must not resolve it.
    #[tokio::test]
    async fn signal_dispatch_refuses_input_route_typed() {
        let consumer = Arc::new(RecordingMobSignalSurface::default());
        let dispatcher = build_signal_dispatcher(consumer);

        let payload = SignalPayload::Emitted {
            variant: EffectVariantId::parse("RequestRuntimeBinding").unwrap(),
            body: SeamSignal::RuntimeBound {
                agent_runtime_id: "rt-1".into(),
                fence_token: 7,
            },
        };

        let err = dispatcher
            .dispatch_signal(mob_producer(), payload)
            .await
            .expect_err("input route is out of the signal surface");

        assert!(matches!(err, SignalDispatchRefusal::UnresolvedRoute { .. }));
    }

    /// With no consumer registered, a resolvable signal is refused as unwired.
    #[tokio::test]
    async fn signal_dispatch_refuses_unwired_consumer_typed() {
        let schema = meerkat_mob_seam_composition();
        let table = RouteTable::from_schema(&schema).unwrap();
        let dispatcher: CatalogCompositionSignalDispatcher<SeamSignal> =
            CatalogCompositionSignalDispatcher::new(schema.name.clone(), table);

        let err = dispatcher
            .dispatch_signal(meerkat_producer(), sample_signal())
            .await
            .expect_err("unwired signal consumer");

        assert!(matches!(err, SignalDispatchRefusal::UnwiredConsumer { .. }));
    }

    /// A signal body that exposes no fields is refused with
    /// `MissingProducerField`, even though the consumer is wired.
    #[tokio::test]
    async fn signal_dispatch_refuses_missing_field_typed() {
        #[derive(Debug)]
        struct BrokenSignal;

        impl ProducerSignal for BrokenSignal {
            fn variant_id(&self) -> EffectVariantId {
                EffectVariantId::parse("RuntimeBound").unwrap()
            }

            fn field(&self, _id: &FieldId) -> Option<FieldValue<'_>> {
                None
            }
        }

        let schema = meerkat_mob_seam_composition();
        let table = RouteTable::from_schema(&schema).unwrap();
        let consumer = Arc::new(RecordingMobSignalSurface::default());
        let dispatcher: CatalogCompositionSignalDispatcher<BrokenSignal> =
            CatalogCompositionSignalDispatcher::new(schema.name.clone(), table)
                .with_consumer(consumer);

        let err = dispatcher
            .dispatch_signal(
                meerkat_producer(),
                SignalPayload::Emitted {
                    variant: EffectVariantId::parse("RuntimeBound").unwrap(),
                    body: BrokenSignal,
                },
            )
            .await
            .expect_err("missing producer field");

        assert!(matches!(
            err,
            SignalDispatchRefusal::MissingProducerField { .. }
        ));
    }

    /// A producer that names another composition is refused up front.
    #[tokio::test]
    async fn refuses_mismatched_composition() {
        let consumer = Arc::new(RecordingMeerkatSurface::default());
        let dispatcher = build_dispatcher(consumer);

        let mut wrong = mob_producer();
        wrong.composition = CompositionId::parse("some_other_composition").unwrap();

        let err = dispatcher
            .dispatch(wrong, sample_effect())
            .await
            .expect_err("composition mismatch");

        assert!(matches!(err, DispatchRefusal::CompositionMismatch { .. }));
    }

    /// The payload's `variant` drives route lookup: an unknown variant id is
    /// refused even though the body is a routable `RequestRuntimeBinding`.
    #[tokio::test]
    async fn refuses_unrouted_effect_typed() {
        let consumer = Arc::new(RecordingMeerkatSurface::default());
        let dispatcher = build_dispatcher(consumer);

        let payload = EffectPayload::Emitted {
            variant: EffectVariantId::parse("UnknownEffect").unwrap(),
            body: SeamEffect::Mob(MobEffect::RequestRuntimeBinding {
                agent_runtime_id: "rt".into(),
                fence_token: 0,
                generation: 0,
                session_id: "019dbd3d-d7ad-75a1-96d0-8013927e78f8".into(),
            }),
        };

        let err = dispatcher
            .dispatch(mob_producer(), payload)
            .await
            .expect_err("unresolved route");

        assert!(matches!(err, DispatchRefusal::UnresolvedRoute { .. }));
    }

    /// With no consumer registered, a resolvable effect is refused as unwired.
    #[tokio::test]
    async fn refuses_unwired_consumer_typed() {
        let schema = meerkat_mob_seam_composition();
        let table = RouteTable::from_schema(&schema).unwrap();
        let dispatcher: CatalogCompositionDispatcher<SeamEffect> =
            CatalogCompositionDispatcher::new(schema.name.clone(), table);

        let err = dispatcher
            .dispatch(mob_producer(), sample_effect())
            .await
            .expect_err("unwired consumer");

        assert!(matches!(err, DispatchRefusal::UnwiredConsumer { .. }));
    }

    /// `Standalone` reports itself as standalone and exposes no dispatcher.
    #[tokio::test]
    async fn standalone_binding_has_no_dispatcher() {
        let binding: CompositionBinding<SeamEffect> = CompositionBinding::Standalone;
        assert!(binding.is_standalone());
        assert!(binding.wired().is_none());
    }

    /// `Wired` exposes its dispatcher but no context provider.
    #[tokio::test]
    async fn wired_binding_exposes_dispatcher() {
        let consumer = Arc::new(RecordingMeerkatSurface::default());
        let dispatcher = Arc::new(build_dispatcher(consumer));
        let binding: CompositionBinding<SeamEffect> = CompositionBinding::Wired(dispatcher);
        assert!(!binding.is_standalone());
        assert!(binding.wired().is_some());
        assert!(
            binding.context_provider().is_none(),
            "plain Wired binding has no owner-supplied context"
        );
    }

    /// Context provider that always contributes one fixed `session_id` field.
    struct PinnedSessionContext {
        session_id: String,
    }

    impl ContextProvider<SeamEffect> for PinnedSessionContext {
        fn provide_context(
            &self,
            _producer: &ProducerInstance,
            _effect: &EffectPayload<SeamEffect>,
        ) -> Vec<(FieldId, OwnedFieldValue)> {
            vec![(
                FieldId::parse("session_id").expect("field id"),
                OwnedFieldValue::Str(self.session_id.clone()),
            )]
        }
    }

    /// `OwnerProvided` exposes both halves, and the provider it hands back is
    /// the one that was supplied (checked through the field it returns).
    #[tokio::test]
    async fn owner_provided_binding_exposes_both_dispatcher_and_context() {
        let consumer = Arc::new(RecordingMeerkatSurface::default());
        let dispatcher = Arc::new(build_dispatcher(consumer));
        let context = Arc::new(PinnedSessionContext {
            session_id: "session-abc".into(),
        });
        let binding: CompositionBinding<SeamEffect> =
            CompositionBinding::owner_provided(dispatcher, context);

        assert!(!binding.is_standalone());
        assert!(
            binding.wired().is_some(),
            "OwnerProvided is a superset of Wired for dispatcher access"
        );
        assert!(
            binding.context_provider().is_some(),
            "OwnerProvided must expose the owner-supplied context"
        );

        let provider = binding.context_provider().expect("context provider");
        let producer = mob_producer();
        let effect = sample_effect();
        let fields = provider.provide_context(&producer, &effect);
        assert_eq!(fields.len(), 1);
        assert_eq!(fields[0].0.as_str(), "session_id");
        match &fields[0].1 {
            OwnedFieldValue::Str(s) => assert_eq!(s, "session-abc"),
            other => panic!("expected Str context field, got {other:?}"),
        }
    }

    /// Each constructor produces the variant its accessors say it does.
    #[tokio::test]
    async fn composition_binding_constructors_parallel_machine_halves() {
        let standalone: CompositionBinding<SeamEffect> = CompositionBinding::standalone();
        assert!(standalone.is_standalone());
        assert!(standalone.wired().is_none());
        assert!(standalone.context_provider().is_none());

        let consumer = Arc::new(RecordingMeerkatSurface::default());
        let dispatcher: Arc<dyn CompositionDispatcher<Effect = SeamEffect>> =
            Arc::new(build_dispatcher(consumer));
        let wired: CompositionBinding<SeamEffect> =
            CompositionBinding::wired_with(Arc::clone(&dispatcher));
        assert!(!wired.is_standalone());
        assert!(wired.wired().is_some());
        assert!(wired.context_provider().is_none());

        let context = Arc::new(PinnedSessionContext {
            session_id: "session-xyz".into(),
        });
        let owner_provided: CompositionBinding<SeamEffect> =
            CompositionBinding::owner_provided(dispatcher, context);
        assert!(!owner_provided.is_standalone());
        assert!(owner_provided.wired().is_some());
        assert!(owner_provided.context_provider().is_some());
    }
}