1use std::sync::{Arc, OnceLock};
37
38use async_trait::async_trait;
39use meerkat_core::types::SessionId;
40use meerkat_machine_schema::identity::{
41 EffectVariantId, FieldId, InputVariantId, MachineInstanceId,
42};
43
44use crate::composition::{
45 CompositionSignalDispatcher, FieldValue, ProducerInstance, ProducerSignal, SignalPayload,
46};
47use crate::composition::{ConsumerSurface, OwnedFieldValue, SignalDispatchOutcome};
48use crate::generated::meerkat_mob_seam as seam_facts;
49use crate::meerkat_machine::{MeerkatMachine, dsl as mm_dsl};
50
/// Consumer-side surface that applies routed inputs to a shared [`MeerkatMachine`].
///
/// A surface is either unpinned (see [`MeerkatConsumerSurface::new`]) or pinned to a
/// single session (see [`MeerkatConsumerSurface::pinned`]).
pub struct MeerkatConsumerSurface {
    /// Machine that receives every routed input accepted by this surface.
    machine: Arc<MeerkatMachine>,
    /// When `Some`, session resolution always yields this session, and a projected
    /// `session_id` that disagrees with it is refused.
    pinned_session: Option<SessionId>,
}
75
/// Signals produced by the meerkat machine for dispatch through the composition seam.
///
/// Each variant is lifted from the same-named `mm_dsl::MeerkatMachineEffect` variant
/// and carries the same two fields: the agent runtime id and its fence token.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MeerkatSeamSignal {
    /// Lifted from `MeerkatMachineEffect::RuntimeBound`.
    RuntimeBound {
        agent_runtime_id: mm_dsl::AgentRuntimeId,
        fence_token: mm_dsl::FenceToken,
    },
    /// Lifted from `MeerkatMachineEffect::RuntimeRetired`.
    RuntimeRetired {
        agent_runtime_id: mm_dsl::AgentRuntimeId,
        fence_token: mm_dsl::FenceToken,
    },
    /// Lifted from `MeerkatMachineEffect::RuntimeDestroyed`.
    RuntimeDestroyed {
        agent_runtime_id: mm_dsl::AgentRuntimeId,
        fence_token: mm_dsl::FenceToken,
    },
}
93
94impl MeerkatSeamSignal {
95 pub fn variant_id(&self) -> EffectVariantId {
96 match self {
97 Self::RuntimeBound { .. } => seam_facts::effects::meerkat::runtime_bound(),
98 Self::RuntimeRetired { .. } => seam_facts::effects::meerkat::runtime_retired(),
99 Self::RuntimeDestroyed { .. } => seam_facts::effects::meerkat::runtime_destroyed(),
100 }
101 }
102
103 pub fn generated_signal_route(&self) -> Option<seam_facts::TypedRoutedSignal> {
104 seam_facts::route_to_signal(
105 &seam_facts::producers::meerkat_instance_id(),
106 &self.variant_id(),
107 )
108 }
109
110 fn field(&self, id: &FieldId) -> Option<FieldValue<'_>> {
111 let (agent_runtime_id, fence_token) = match self {
112 Self::RuntimeBound {
113 agent_runtime_id,
114 fence_token,
115 }
116 | Self::RuntimeRetired {
117 agent_runtime_id,
118 fence_token,
119 }
120 | Self::RuntimeDestroyed {
121 agent_runtime_id,
122 fence_token,
123 } => (agent_runtime_id, fence_token),
124 };
125 if id == &seam_facts::fields::agent_runtime_id() {
126 Some(FieldValue::Str(agent_runtime_id.0.as_str()))
127 } else if id == &seam_facts::fields::fence_token() {
128 Some(FieldValue::U64(fence_token.0))
129 } else {
130 None
131 }
132 }
133}
134
135impl ProducerSignal for MeerkatSeamSignal {
136 fn variant_id(&self) -> EffectVariantId {
137 self.variant_id()
138 }
139
140 fn field(&self, id: &FieldId) -> Option<FieldValue<'_>> {
141 self.field(id)
142 }
143}
144
/// Shared, type-erased composition signal dispatcher whose signal type is
/// [`MeerkatSeamSignal`].
pub type MeerkatCompositionSignalDispatcher =
    Arc<dyn CompositionSignalDispatcher<Signal = MeerkatSeamSignal>>;
147
148pub fn meerkat_producer_instance() -> ProducerInstance {
149 let producer = seam_facts::producers::meerkat();
150 ProducerInstance {
151 composition: seam_facts::composition_id(),
152 instance_id: producer.instance_id,
153 machine: producer.machine,
154 }
155}
156
157pub fn lift_routed_signal(effect: &mm_dsl::MeerkatMachineEffect) -> Option<MeerkatSeamSignal> {
158 match effect {
159 mm_dsl::MeerkatMachineEffect::RuntimeBound {
160 agent_runtime_id,
161 fence_token,
162 } => Some(MeerkatSeamSignal::RuntimeBound {
163 agent_runtime_id: agent_runtime_id.clone(),
164 fence_token: *fence_token,
165 }),
166 mm_dsl::MeerkatMachineEffect::RuntimeRetired {
167 agent_runtime_id,
168 fence_token,
169 } => Some(MeerkatSeamSignal::RuntimeRetired {
170 agent_runtime_id: agent_runtime_id.clone(),
171 fence_token: *fence_token,
172 }),
173 mm_dsl::MeerkatMachineEffect::RuntimeDestroyed {
174 agent_runtime_id,
175 fence_token,
176 } => Some(MeerkatSeamSignal::RuntimeDestroyed {
177 agent_runtime_id: agent_runtime_id.clone(),
178 fence_token: *fence_token,
179 }),
180 _ => None,
181 }
182}
183
184pub async fn dispatch_routed_signal(
185 dispatcher: &MeerkatCompositionSignalDispatcher,
186 signal: MeerkatSeamSignal,
187) -> Result<SignalDispatchOutcome, String> {
188 let variant = signal.variant_id();
189 dispatcher
190 .dispatch_signal(
191 meerkat_producer_instance(),
192 SignalPayload::Emitted {
193 variant,
194 body: signal,
195 },
196 )
197 .await
198 .map_err(|refusal| refusal.to_string())
199}
200
201impl std::fmt::Debug for MeerkatConsumerSurface {
202 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203 f.debug_struct("MeerkatConsumerSurface")
204 .field("pinned_session", &self.pinned_session)
205 .finish_non_exhaustive()
206 }
207}
208
209impl MeerkatConsumerSurface {
210 pub fn new(machine: Arc<MeerkatMachine>) -> Self {
213 Self {
214 machine,
215 pinned_session: None,
216 }
217 }
218
219 pub fn pinned(machine: Arc<MeerkatMachine>, session_id: SessionId) -> Self {
224 Self {
225 machine,
226 pinned_session: Some(session_id),
227 }
228 }
229
230 async fn resolve_session(
231 &self,
232 variant: &InputVariantId,
233 projected: &[(FieldId, OwnedFieldValue)],
234 ) -> Result<SessionId, String> {
235 let projected_session_id = projected
240 .iter()
241 .find(|(id, _)| id == &seam_facts::fields::session_id())
242 .and_then(|(_, v)| match v {
243 OwnedFieldValue::Str(s) => Some(s.clone()),
244 _ => None,
245 });
246
247 match (&self.pinned_session, projected_session_id) {
248 (Some(pinned), Some(sid)) if sid != pinned.to_string() => Err(format!(
249 "routed session_id `{sid}` does not match pinned session `{pinned}`"
250 )),
251 (Some(pinned), _) => Ok(pinned.clone()),
252 (None, Some(sid)) => SessionId::parse(&sid)
253 .map_err(|e| format!("routed session_id `{sid}` is not a valid UUID: {e}")),
254 (None, None) if variant == &seam_facts::inputs::ingest() => {
255 let runtime_id = project_str(projected, &seam_facts::fields::runtime_id())?;
256 self.machine
257 .resolve_registered_session_for_runtime_id(&mm_dsl::AgentRuntimeId::from(
258 runtime_id.to_string(),
259 ))
260 .await
261 }
262 (None, None) => Err(
263 "routed input did not project `session_id` and surface is not pinned \
264 to a session — no session can be resolved"
265 .into(),
266 ),
267 }
268 }
269}
270
271impl MeerkatMachine {
272 async fn resolve_registered_session_for_runtime_id(
273 &self,
274 runtime_id: &mm_dsl::AgentRuntimeId,
275 ) -> Result<SessionId, String> {
276 let sessions = self.sessions.read().await;
277 let mut matches = Vec::new();
278
279 for (session_id, entry) in sessions.iter() {
280 let authority = entry
281 .dsl_authority
282 .lock()
283 .unwrap_or_else(std::sync::PoisonError::into_inner);
284 if authority.state.active_runtime_id.as_ref() == Some(runtime_id) {
285 matches.push(session_id.clone());
286 }
287 }
288
289 match matches.len() {
290 0 => Err(format!(
291 "routed Ingest runtime_id `{}` did not match any registered session active_runtime_id",
292 runtime_id.0
293 )),
294 1 => Ok(matches.remove(0)),
295 count => Err(format!(
296 "routed Ingest runtime_id `{}` matched {count} registered sessions; refusing ambiguous delivery",
297 runtime_id.0
298 )),
299 }
300 }
301}
302
303#[allow(clippy::panic)]
304fn meerkat_instance_id() -> &'static MachineInstanceId {
305 static ID: OnceLock<MachineInstanceId> = OnceLock::new();
306 ID.get_or_init(seam_facts::producers::meerkat_instance_id)
307}
308
309fn project_u64(fields: &[(FieldId, OwnedFieldValue)], field: &FieldId) -> Result<u64, String> {
310 fields
311 .iter()
312 .find(|(id, _)| id == field)
313 .ok_or_else(|| format!("missing projected field `{}`", field.as_str()))
314 .and_then(|(_, v)| match v {
315 OwnedFieldValue::U64(n) => Ok(*n),
316 other => Err(format!(
317 "projected field `{}` is not U64: {other:?}",
318 field.as_str()
319 )),
320 })
321}
322
323fn project_str<'a>(
324 fields: &'a [(FieldId, OwnedFieldValue)],
325 field: &FieldId,
326) -> Result<&'a str, String> {
327 fields
328 .iter()
329 .find(|(id, _)| id == field)
330 .ok_or_else(|| format!("missing projected field `{}`", field.as_str()))
331 .and_then(|(_, v)| match v {
332 OwnedFieldValue::Str(s) => Ok(s.as_str()),
333 other => Err(format!(
334 "projected field `{}` is not Str: {other:?}",
335 field.as_str()
336 )),
337 })
338}
339
340fn project_work_origin(
341 fields: &[(FieldId, OwnedFieldValue)],
342 field: &FieldId,
343) -> Result<mm_dsl::WorkOrigin, String> {
344 fields
345 .iter()
346 .find(|(id, _)| id == field)
347 .ok_or_else(|| format!("missing projected field `{}`", field.as_str()))
348 .and_then(|(_, v)| match v {
349 OwnedFieldValue::Opaque(value) => value
350 .downcast_ref::<mm_dsl::WorkOrigin>()
351 .copied()
352 .ok_or_else(|| format!("projected field `{}` is not WorkOrigin", field.as_str())),
353 other => Err(format!(
354 "projected field `{}` is not WorkOrigin: {other:?}",
355 field.as_str()
356 )),
357 })
358}
359
360#[async_trait]
361impl ConsumerSurface for MeerkatConsumerSurface {
362 fn instance_id(&self) -> &MachineInstanceId {
363 meerkat_instance_id()
364 }
365
366 async fn apply_routed_input(
367 &self,
368 variant: InputVariantId,
369 projected: Vec<(FieldId, OwnedFieldValue)>,
370 ) -> Result<(), String> {
371 let session_id = self.resolve_session(&variant, &projected).await?;
372 let input = if variant == seam_facts::inputs::prepare_bindings() {
373 let rt = project_str(&projected, &seam_facts::fields::agent_runtime_id())?;
374 let fence = project_u64(&projected, &seam_facts::fields::fence_token())?;
375 let gen_ = project_u64(&projected, &seam_facts::fields::generation())?;
376 let sid = project_str(&projected, &seam_facts::fields::session_id())?;
377 mm_dsl::MeerkatMachineInput::PrepareBindings {
378 agent_runtime_id: mm_dsl::AgentRuntimeId::from(rt.to_string()),
379 fence_token: mm_dsl::FenceToken(fence),
380 generation: mm_dsl::Generation(gen_),
381 session_id: mm_dsl::SessionId::from(sid.to_string()),
382 }
383 } else if variant == seam_facts::inputs::ingest() {
384 let rt = project_str(&projected, &seam_facts::fields::runtime_id())?;
389 let work_id = project_str(&projected, &seam_facts::fields::work_id())?;
390 let origin = project_work_origin(&projected, &seam_facts::fields::origin())?;
391 mm_dsl::MeerkatMachineInput::Ingest {
392 runtime_id: mm_dsl::AgentRuntimeId::from(rt.to_string()),
393 work_id: mm_dsl::WorkId::from(work_id.to_string()),
394 origin,
395 }
396 } else if variant == seam_facts::inputs::retire() {
397 let sid = project_str(&projected, &seam_facts::fields::session_id())?;
398 mm_dsl::MeerkatMachineInput::Retire {
399 session_id: mm_dsl::SessionId::from(sid.to_string()),
400 }
401 } else if variant == seam_facts::inputs::destroy() {
402 let sid = project_str(&projected, &seam_facts::fields::session_id())?;
403 mm_dsl::MeerkatMachineInput::Destroy {
404 session_id: mm_dsl::SessionId::from(sid.to_string()),
405 }
406 } else {
407 return Err(format!(
408 "meerkat consumer surface does not accept routed input `{}`; \
409 only PrepareBindings/Ingest/Retire/Destroy are declared in the \
410 `meerkat_mob_seam` schema",
411 variant.as_str()
412 ));
413 };
414
415 self.machine
416 .apply_routed_meerkat_input(&session_id, input)
417 .await
418 }
419}
420
#[cfg(test)]
mod tests {
    use super::*;
    use crate::composition::{
        CatalogCompositionSignalDispatcher, OwnedFieldValue, RouteTable, SignalConsumerSurface,
    };
    use meerkat_machine_schema::identity::SignalVariantId;

    /// Parses a field slug, panicking on an invalid literal.
    fn fld(slug: &str) -> FieldId {
        FieldId::parse(slug).expect("field slug")
    }

    /// Parses an input-variant slug, panicking on an invalid literal.
    fn iv(slug: &str) -> InputVariantId {
        InputVariantId::parse(slug).expect("input variant slug")
    }

    /// Parses a session id literal, panicking on an invalid literal.
    fn sid(slug: &str) -> SessionId {
        SessionId::parse(slug).expect("session id")
    }

    /// Routes a `PrepareBindings` input through `surface` so that `session_id`
    /// gets `runtime_id` bound with fence token 1 and generation 0.
    async fn bind_runtime(
        surface: &MeerkatConsumerSurface,
        session_id: &SessionId,
        runtime_id: &str,
    ) {
        surface
            .apply_routed_input(
                iv("PrepareBindings"),
                vec![
                    (
                        fld("agent_runtime_id"),
                        OwnedFieldValue::Str(runtime_id.into()),
                    ),
                    (fld("fence_token"), OwnedFieldValue::U64(1)),
                    (fld("generation"), OwnedFieldValue::U64(0)),
                    (
                        fld("session_id"),
                        OwnedFieldValue::Str(session_id.to_string()),
                    ),
                ],
            )
            .await
            .expect("bind runtime");
    }

    /// Builds the projected fields of an `Ingest` input for work item `work-1`
    /// with origin `WorkOrigin::Ingest`. `session_id`, when given, is appended
    /// as the last field.
    fn ingest_fields(
        runtime_id: &str,
        session_id: Option<&SessionId>,
    ) -> Vec<(FieldId, OwnedFieldValue)> {
        let mut fields = vec![
            (fld("runtime_id"), OwnedFieldValue::Str(runtime_id.into())),
            (fld("work_id"), OwnedFieldValue::Str("work-1".into())),
            (
                fld("origin"),
                OwnedFieldValue::Opaque(Arc::new(mm_dsl::WorkOrigin::Ingest)),
            ),
        ];
        if let Some(session_id) = session_id {
            fields.push((
                fld("session_id"),
                OwnedFieldValue::Str(session_id.to_string()),
            ));
        }
        fields
    }

    /// PrepareBindings with `fence_token` omitted must fail and name that field.
    #[tokio::test]
    async fn prepare_bindings_requires_all_three_fields() {
        let machine = Arc::new(MeerkatMachine::ephemeral());
        let surface = MeerkatConsumerSurface::new(Arc::clone(&machine));
        let err = surface
            .apply_routed_input(
                iv("PrepareBindings"),
                vec![
                    (fld("agent_runtime_id"), OwnedFieldValue::Str("rt-1".into())),
                    (fld("generation"), OwnedFieldValue::U64(3)),
                    (
                        fld("session_id"),
                        OwnedFieldValue::Str("00000000-0000-0000-0000-000000000001".into()),
                    ),
                ],
            )
            .await
            .expect_err("missing fence_token");
        assert!(err.contains("fence_token"), "{err}");
    }

    /// An undeclared variant is refused with an error naming the variant. The
    /// surface is pinned so that session resolution succeeds and the variant
    /// check is what fails.
    #[tokio::test]
    async fn unknown_variant_is_refused_typed() {
        let machine = Arc::new(MeerkatMachine::ephemeral());
        let pinned =
            SessionId::parse("00000000-0000-0000-0000-000000000001").expect("uuid literal");
        let surface = MeerkatConsumerSurface::pinned(Arc::clone(&machine), pinned);
        let err = surface
            .apply_routed_input(iv("Recycle"), vec![])
            .await
            .expect_err("Recycle is not a routed variant");
        assert!(err.contains("Recycle"), "{err}");
    }

    /// On an unpinned surface, Retire without a projected `session_id` fails.
    #[tokio::test]
    async fn unpinned_surface_requires_projected_session_id_for_retire() {
        let machine = Arc::new(MeerkatMachine::ephemeral());
        let surface = MeerkatConsumerSurface::new(Arc::clone(&machine));
        let err = surface
            .apply_routed_input(iv("Retire"), vec![])
            .await
            .expect_err("Retire without target");
        assert!(err.contains("session_id"), "{err}");
    }

    /// A pinned surface refuses an input whose `session_id` names another session.
    #[tokio::test]
    async fn pinned_surface_rejects_mismatched_session_id() {
        let machine = Arc::new(MeerkatMachine::ephemeral());
        let pinned =
            SessionId::parse("00000000-0000-0000-0000-000000000001").expect("uuid literal");
        let surface = MeerkatConsumerSurface::pinned(Arc::clone(&machine), pinned);
        let err = surface
            .apply_routed_input(
                iv("PrepareBindings"),
                vec![
                    (
                        fld("agent_runtime_id"),
                        OwnedFieldValue::Str("rt-other".into()),
                    ),
                    (fld("fence_token"), OwnedFieldValue::U64(1)),
                    (fld("generation"), OwnedFieldValue::U64(0)),
                    (
                        fld("session_id"),
                        OwnedFieldValue::Str("00000000-0000-0000-0000-000000000002".into()),
                    ),
                ],
            )
            .await
            .expect_err("session_id disagrees with pinned session");
        assert!(err.contains("pinned"), "{err}");
    }

    /// Ingest with an explicit `session_id` targets that session even though the
    /// `runtime_id` was never bound to it.
    #[tokio::test]
    async fn ingest_prefers_projected_session_id() {
        let machine = Arc::new(MeerkatMachine::ephemeral());
        let surface = MeerkatConsumerSurface::new(Arc::clone(&machine));
        let session_id = sid("00000000-0000-0000-0000-000000000001");
        machine.register_session(session_id.clone()).await;

        surface
            .apply_routed_input(iv("Ingest"), ingest_fields("rt-other", Some(&session_id)))
            .await
            .expect("session_id targets the routed input");
    }

    /// Ingest without `session_id` resolves the session from the bound runtime id.
    #[tokio::test]
    async fn ingest_resolves_session_from_runtime_id_when_session_id_absent() {
        let machine = Arc::new(MeerkatMachine::ephemeral());
        let surface = MeerkatConsumerSurface::new(Arc::clone(&machine));
        let session_id = sid("00000000-0000-0000-0000-000000000001");
        machine.register_session(session_id.clone()).await;
        bind_runtime(&surface, &session_id, "rt-match").await;

        surface
            .apply_routed_input(iv("Ingest"), ingest_fields("rt-match", None))
            .await
            .expect("runtime_id resolves to the registered session");
    }

    /// Ingest whose `runtime_id` is bound to no session is refused.
    #[tokio::test]
    async fn ingest_without_matching_runtime_id_is_refused() {
        let machine = Arc::new(MeerkatMachine::ephemeral());
        let surface = MeerkatConsumerSurface::new(Arc::clone(&machine));
        machine
            .register_session(sid("00000000-0000-0000-0000-000000000001"))
            .await;

        let err = surface
            .apply_routed_input(iv("Ingest"), ingest_fields("rt-missing", None))
            .await
            .expect_err("runtime_id has no registered owner");
        assert!(
            err.contains("did not match any registered session"),
            "{err}"
        );
    }

    /// Ingest whose `runtime_id` is bound to two sessions is refused as ambiguous.
    #[tokio::test]
    async fn ingest_with_ambiguous_runtime_id_is_refused() {
        let machine = Arc::new(MeerkatMachine::ephemeral());
        let surface = MeerkatConsumerSurface::new(Arc::clone(&machine));
        let first = sid("00000000-0000-0000-0000-000000000001");
        let second = sid("00000000-0000-0000-0000-000000000002");
        machine.register_session(first.clone()).await;
        machine.register_session(second.clone()).await;
        bind_runtime(&surface, &first, "rt-shared").await;
        bind_runtime(&surface, &second, "rt-shared").await;

        let err = surface
            .apply_routed_input(iv("Ingest"), ingest_fields("rt-shared", None))
            .await
            .expect_err("runtime_id is ambiguous");
        assert!(err.contains("ambiguous delivery"), "{err}");
    }

    /// Signal consumer that records every received signal in arrival order.
    #[derive(Default)]
    struct RecordingSignalSurface {
        log: tokio::sync::Mutex<Vec<(SignalVariantId, Vec<(FieldId, OwnedFieldValue)>)>>,
    }

    #[async_trait]
    impl SignalConsumerSurface for RecordingSignalSurface {
        fn instance_id(&self) -> &MachineInstanceId {
            static ID: OnceLock<MachineInstanceId> = OnceLock::new();
            ID.get_or_init(|| MachineInstanceId::parse("mob").expect("canonical instance id"))
        }

        async fn receive_signal(
            &self,
            variant: SignalVariantId,
            projected_fields: Vec<(FieldId, OwnedFieldValue)>,
        ) -> Result<(), String> {
            self.log.lock().await.push((variant, projected_fields));
            Ok(())
        }
    }

    /// Installs a catalog-backed signal dispatcher on `machine` whose only
    /// consumer is a fresh [`RecordingSignalSurface`], and returns that surface
    /// so the test can inspect what was dispatched.
    fn install_recording_dispatcher(machine: &Arc<MeerkatMachine>) -> Arc<RecordingSignalSurface> {
        let signal_surface = Arc::new(RecordingSignalSurface::default());
        let schema = meerkat_machine_schema::catalog::meerkat_mob_seam_composition();
        let table = RouteTable::from_schema(&schema).expect("catalog routes");
        let dispatcher: CatalogCompositionSignalDispatcher<MeerkatSeamSignal> =
            CatalogCompositionSignalDispatcher::new(schema.name.clone(), table)
                .with_consumer(signal_surface.clone());
        machine.set_composition_signal_dispatcher(Arc::new(dispatcher));
        signal_surface
    }

    /// A routed PrepareBindings produces exactly one `ObserveRuntimeReady`
    /// signal carrying the runtime id and fence token.
    #[tokio::test]
    async fn routed_prepare_bindings_dispatches_runtime_bound_signal() {
        let machine = Arc::new(MeerkatMachine::ephemeral());
        let session_id = SessionId::new();
        machine.register_session(session_id.clone()).await;

        let signal_surface = install_recording_dispatcher(&machine);

        machine
            .apply_routed_meerkat_input(
                &session_id,
                mm_dsl::MeerkatMachineInput::PrepareBindings {
                    agent_runtime_id: mm_dsl::AgentRuntimeId("rt-1".into()),
                    fence_token: mm_dsl::FenceToken(11),
                    generation: mm_dsl::Generation(0),
                    session_id: mm_dsl::SessionId(session_id.to_string()),
                },
            )
            .await
            .expect("routed input applies and emits signal");

        let log = signal_surface.log.lock().await;
        assert_eq!(log.len(), 1);
        assert_eq!(log[0].0.as_str(), "ObserveRuntimeReady");
        assert_eq!(log[0].1[0].0.as_str(), "agent_runtime_id");
        assert!(matches!(&log[0].1[0].1, OwnedFieldValue::Str(value) if value == "rt-1"));
        assert_eq!(log[0].1[1].0.as_str(), "fence_token");
        assert!(matches!(log[0].1[1].1, OwnedFieldValue::U64(11)));
    }

    /// Each seam signal resolves to its generated route, and every producer
    /// field that route binds can be projected from the signal.
    #[test]
    fn routed_meerkat_signal_projection_tracks_generated_route_facts() {
        use crate::generated::meerkat_mob_seam as seam_facts;

        let cases = vec![
            (
                MeerkatSeamSignal::RuntimeBound {
                    agent_runtime_id: mm_dsl::AgentRuntimeId("rt-bound".into()),
                    fence_token: mm_dsl::FenceToken(11),
                },
                seam_facts::route_runtime_bound_reaches_mob(),
            ),
            (
                MeerkatSeamSignal::RuntimeRetired {
                    agent_runtime_id: mm_dsl::AgentRuntimeId("rt-retired".into()),
                    fence_token: mm_dsl::FenceToken(12),
                },
                seam_facts::route_runtime_retired_reaches_mob(),
            ),
            (
                MeerkatSeamSignal::RuntimeDestroyed {
                    agent_runtime_id: mm_dsl::AgentRuntimeId("rt-destroyed".into()),
                    fence_token: mm_dsl::FenceToken(13),
                },
                seam_facts::route_runtime_destroyed_reaches_mob(),
            ),
        ];

        for (signal, expected_route) in cases {
            let route = signal.generated_signal_route().expect("generated route");
            assert_eq!(route, expected_route);
            for (producer_field, _) in &route.bindings {
                assert!(
                    signal.field(producer_field).is_some(),
                    "generated route `{}` requires producer field `{}`",
                    route.route_id.as_str(),
                    producer_field.as_str()
                );
            }
        }
    }

    /// Local binding preparation emits no signal and claims no runtime identity;
    /// a later routed PrepareBindings still applies and emits exactly one signal.
    #[tokio::test]
    async fn local_session_bindings_do_not_dispatch_runtime_bound_signal() {
        let machine = Arc::new(MeerkatMachine::ephemeral());
        let session_id = SessionId::new();

        let signal_surface = install_recording_dispatcher(&machine);

        let bindings = machine
            .prepare_local_session_bindings(session_id.clone())
            .await
            .expect("local bindings prepare");

        assert_eq!(bindings.session_id(), &session_id);
        assert!(
            signal_surface.log.lock().await.is_empty(),
            "local resource preparation must not publish cross-machine runtime readiness"
        );
        {
            let sessions = machine.sessions.read().await;
            let entry = sessions.get(&session_id).expect("session registered");
            let authority = entry
                .dsl_authority
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            assert!(
                authority.state.active_runtime_id.is_none(),
                "local resource preparation must leave binding identity unclaimed"
            );
            assert!(
                authority.state.active_fence_token.is_none(),
                "local resource preparation must leave binding fence unclaimed"
            );
        }

        machine
            .apply_routed_meerkat_input(
                &session_id,
                mm_dsl::MeerkatMachineInput::PrepareBindings {
                    agent_runtime_id: mm_dsl::AgentRuntimeId("rt-authoritative".into()),
                    fence_token: mm_dsl::FenceToken(13),
                    generation: mm_dsl::Generation(0),
                    session_id: mm_dsl::SessionId(session_id.to_string()),
                },
            )
            .await
            .expect("authoritative binding still applies after local resource prep");

        let log = signal_surface.log.lock().await;
        assert_eq!(log.len(), 1);
        assert_eq!(log[0].0.as_str(), "ObserveRuntimeReady");
        assert!(
            matches!(&log[0].1[0].1, OwnedFieldValue::Str(value) if value == "rt-authoritative")
        );
        {
            let sessions = machine.sessions.read().await;
            let entry = sessions.get(&session_id).expect("session registered");
            let authority = entry
                .dsl_authority
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            assert!(
                matches!(&authority.state.active_runtime_id, Some(value) if value.0 == "rt-authoritative")
            );
            assert!(matches!(
                authority.state.active_fence_token,
                Some(mm_dsl::FenceToken(13))
            ));
        }
    }

    /// Calling `prepare_bindings` twice for the same session emits only one signal.
    #[tokio::test]
    async fn session_owned_prepare_bindings_is_idempotent_without_reemitting_runtime_bound() {
        let machine = Arc::new(MeerkatMachine::ephemeral());
        let session_id = SessionId::new();

        let signal_surface = install_recording_dispatcher(&machine);

        machine
            .prepare_bindings(session_id.clone())
            .await
            .expect("initial session-owned binding prepares");
        machine
            .prepare_bindings(session_id.clone())
            .await
            .expect("duplicate session-owned binding returns existing handles");

        let log = signal_surface.log.lock().await;
        assert_eq!(
            log.len(),
            1,
            "duplicate handle resolution must not publish a second RuntimeBound signal"
        );
        assert_eq!(log[0].0.as_str(), "ObserveRuntimeReady");
        assert!(
            matches!(&log[0].1[0].1, OwnedFieldValue::Str(value) if value.starts_with("rt:session:"))
        );
    }

    /// After a routed PrepareBindings has bound a runtime, `prepare_bindings`
    /// is rejected, leaves the binding state untouched, and emits no extra signal.
    #[tokio::test]
    async fn session_owned_prepare_bindings_rejects_conflicting_authoritative_runtime() {
        let machine = Arc::new(MeerkatMachine::ephemeral());
        let session_id = SessionId::new();
        machine.register_session(session_id.clone()).await;

        let signal_surface = install_recording_dispatcher(&machine);

        machine
            .apply_routed_meerkat_input(
                &session_id,
                mm_dsl::MeerkatMachineInput::PrepareBindings {
                    agent_runtime_id: mm_dsl::AgentRuntimeId("operator-rt:0".into()),
                    fence_token: mm_dsl::FenceToken(17),
                    generation: mm_dsl::Generation(0),
                    session_id: mm_dsl::SessionId(session_id.to_string()),
                },
            )
            .await
            .expect("mob-owned authoritative binding applies");

        let err = machine
            .prepare_bindings(session_id.clone())
            .await
            .expect_err("session-owned binding must not overwrite mob-owned authority");
        assert!(
            err.to_string()
                .contains("already has authoritative runtime binding"),
            "{err}"
        );

        let state = machine
            .session_dsl_state(&session_id)
            .await
            .expect("session state remains available");
        assert!(
            matches!(&state.active_runtime_id, Some(value) if value.0 == "operator-rt:0"),
            "conflicting prepare_bindings must not rewrite active_runtime_id: {:?}",
            state.active_runtime_id
        );
        assert!(matches!(
            state.active_fence_token,
            Some(mm_dsl::FenceToken(17))
        ));

        let log = signal_surface.log.lock().await;
        assert_eq!(
            log.len(),
            1,
            "rejected session-owned binding must not publish a shadow RuntimeBound signal"
        );
        assert!(matches!(&log[0].1[0].1, OwnedFieldValue::Str(value) if value == "operator-rt:0"));
    }
}