Skip to main content

meerkat_runtime/meerkat_machine/
composition.rs

1//! Wave-c C-6c — consumer side of the `meerkat_mob_seam` composition.
2//!
3//! Wave-c C-6p landed the producer side: the mob actor converts each
4//! emitted `MobMachineEffect::Request*` variant into a typed
5//! [`MobSeamEffect`][mse] and routes it through a
6//! [`CompositionDispatcher`][cd]. Until this module lands, that
7//! dispatcher resolves the typed route but fails with
8//! [`DispatchRefusal::UnwiredConsumer`][dr] because no
9//! [`ConsumerSurface`][cs] is registered for the `meerkat` target
10//! instance.
11//!
12//! C-6c closes that seam: [`MeerkatConsumerSurface`] is the typed
13//! consumer surface. The dispatcher invokes
14//! [`ConsumerSurface::apply_routed_input`] with the typed
15//! [`InputVariantId`] + projected field bindings declared by the
16//! `meerkat_mob_seam` composition schema
17//! (`meerkat-machine-schema/src/catalog/compositions.rs::meerkat_mob_seam_composition`).
18//! This surface translates each of the four routed variants —
19//! `PrepareBindings`, `Ingest`, `Retire`, `Destroy` — into the
20//! corresponding `MeerkatMachineInput` and applies it against the
21//! session's shared DSL authority.
22//!
23//! The route bindings declared in the schema are the sole source of
24//! truth for field projection shape. If a route binding references a
25//! producer field the effect body did not populate, the dispatcher
26//! returns [`DispatchRefusal::MissingProducerField`][dr] before this
27//! surface is reached, so [`apply_routed_input`][cs_apply] can assume
28//! the declared bindings are present.
29//!
30//! [mse]: meerkat_mob::runtime::composition::MobSeamEffect
31//! [cd]: crate::composition::CompositionDispatcher
32//! [cs]: crate::composition::ConsumerSurface
33//! [cs_apply]: crate::composition::ConsumerSurface::apply_routed_input
34//! [dr]: crate::composition::DispatchRefusal
35
36use std::sync::{Arc, OnceLock};
37
38use async_trait::async_trait;
39use meerkat_core::types::SessionId;
40use meerkat_machine_schema::identity::{
41    EffectVariantId, FieldId, InputVariantId, MachineInstanceId,
42};
43
44use crate::composition::{
45    CompositionSignalDispatcher, FieldValue, ProducerInstance, ProducerSignal, SignalPayload,
46};
47use crate::composition::{ConsumerSurface, OwnedFieldValue, SignalDispatchOutcome};
48use crate::generated::meerkat_mob_seam as seam_facts;
49use crate::meerkat_machine::{MeerkatMachine, dsl as mm_dsl};
50
51/// Consumer-side surface for the `meerkat_mob_seam` composition.
52///
53/// Implements [`ConsumerSurface`] for the `meerkat` target instance. The
54/// dispatcher hands the surface one routed input at a time; the surface
55/// translates the typed [`InputVariantId`] + projected-field tuple into
56/// the matching [`MeerkatMachineInput`][mmi] and applies it against the
57/// session's shared DSL authority on the owning [`MeerkatMachine`].
58///
59/// Session selection: routed effects prefer a projected `session_id`.
60/// `Ingest` may arrive without one, so the shared surface resolves its
61/// projected `runtime_id` through each registered session's DSL-owned
62/// `active_runtime_id`. Zero or multiple matches are refused rather than
63/// guessing.
64///
65/// [mmi]: crate::meerkat_machine::dsl::MeerkatMachineInput
66pub struct MeerkatConsumerSurface {
67    machine: Arc<MeerkatMachine>,
68    /// Optional pinned session id. `None` means the surface infers the
69    /// session from the `agent_runtime_id` field of each routed input;
70    /// `Some(id)` means every routed input is applied against that
71    /// session and the surface refuses variants whose projected
72    /// `agent_runtime_id` disagrees.
73    pinned_session: Option<SessionId>,
74}
75
76/// Producer-side signal source sum for MeerkatMachine lifecycle effects
77/// routed through the `meerkat_mob_seam` signal surface.
78#[derive(Debug, Clone, PartialEq, Eq)]
79pub enum MeerkatSeamSignal {
80    RuntimeBound {
81        agent_runtime_id: mm_dsl::AgentRuntimeId,
82        fence_token: mm_dsl::FenceToken,
83    },
84    RuntimeRetired {
85        agent_runtime_id: mm_dsl::AgentRuntimeId,
86        fence_token: mm_dsl::FenceToken,
87    },
88    RuntimeDestroyed {
89        agent_runtime_id: mm_dsl::AgentRuntimeId,
90        fence_token: mm_dsl::FenceToken,
91    },
92}
93
94impl MeerkatSeamSignal {
95    pub fn variant_id(&self) -> EffectVariantId {
96        match self {
97            Self::RuntimeBound { .. } => seam_facts::effects::meerkat::runtime_bound(),
98            Self::RuntimeRetired { .. } => seam_facts::effects::meerkat::runtime_retired(),
99            Self::RuntimeDestroyed { .. } => seam_facts::effects::meerkat::runtime_destroyed(),
100        }
101    }
102
103    pub fn generated_signal_route(&self) -> Option<seam_facts::TypedRoutedSignal> {
104        seam_facts::route_to_signal(
105            &seam_facts::producers::meerkat_instance_id(),
106            &self.variant_id(),
107        )
108    }
109
110    fn field(&self, id: &FieldId) -> Option<FieldValue<'_>> {
111        let (agent_runtime_id, fence_token) = match self {
112            Self::RuntimeBound {
113                agent_runtime_id,
114                fence_token,
115            }
116            | Self::RuntimeRetired {
117                agent_runtime_id,
118                fence_token,
119            }
120            | Self::RuntimeDestroyed {
121                agent_runtime_id,
122                fence_token,
123            } => (agent_runtime_id, fence_token),
124        };
125        if id == &seam_facts::fields::agent_runtime_id() {
126            Some(FieldValue::Str(agent_runtime_id.0.as_str()))
127        } else if id == &seam_facts::fields::fence_token() {
128            Some(FieldValue::U64(fence_token.0))
129        } else {
130            None
131        }
132    }
133}
134
135impl ProducerSignal for MeerkatSeamSignal {
136    fn variant_id(&self) -> EffectVariantId {
137        self.variant_id()
138    }
139
140    fn field(&self, id: &FieldId) -> Option<FieldValue<'_>> {
141        self.field(id)
142    }
143}
144
145pub type MeerkatCompositionSignalDispatcher =
146    Arc<dyn CompositionSignalDispatcher<Signal = MeerkatSeamSignal>>;
147
148pub fn meerkat_producer_instance() -> ProducerInstance {
149    let producer = seam_facts::producers::meerkat();
150    ProducerInstance {
151        composition: seam_facts::composition_id(),
152        instance_id: producer.instance_id,
153        machine: producer.machine,
154    }
155}
156
157pub fn lift_routed_signal(effect: &mm_dsl::MeerkatMachineEffect) -> Option<MeerkatSeamSignal> {
158    match effect {
159        mm_dsl::MeerkatMachineEffect::RuntimeBound {
160            agent_runtime_id,
161            fence_token,
162        } => Some(MeerkatSeamSignal::RuntimeBound {
163            agent_runtime_id: agent_runtime_id.clone(),
164            fence_token: *fence_token,
165        }),
166        mm_dsl::MeerkatMachineEffect::RuntimeRetired {
167            agent_runtime_id,
168            fence_token,
169        } => Some(MeerkatSeamSignal::RuntimeRetired {
170            agent_runtime_id: agent_runtime_id.clone(),
171            fence_token: *fence_token,
172        }),
173        mm_dsl::MeerkatMachineEffect::RuntimeDestroyed {
174            agent_runtime_id,
175            fence_token,
176        } => Some(MeerkatSeamSignal::RuntimeDestroyed {
177            agent_runtime_id: agent_runtime_id.clone(),
178            fence_token: *fence_token,
179        }),
180        _ => None,
181    }
182}
183
184pub async fn dispatch_routed_signal(
185    dispatcher: &MeerkatCompositionSignalDispatcher,
186    signal: MeerkatSeamSignal,
187) -> Result<SignalDispatchOutcome, String> {
188    let variant = signal.variant_id();
189    dispatcher
190        .dispatch_signal(
191            meerkat_producer_instance(),
192            SignalPayload::Emitted {
193                variant,
194                body: signal,
195            },
196        )
197        .await
198        .map_err(|refusal| refusal.to_string())
199}
200
201impl std::fmt::Debug for MeerkatConsumerSurface {
202    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203        f.debug_struct("MeerkatConsumerSurface")
204            .field("pinned_session", &self.pinned_session)
205            .finish_non_exhaustive()
206    }
207}
208
209impl MeerkatConsumerSurface {
210    /// Build a consumer surface backed by the given machine. The surface
211    /// resolves each routed input's target session from projected fields.
212    pub fn new(machine: Arc<MeerkatMachine>) -> Self {
213        Self {
214            machine,
215            pinned_session: None,
216        }
217    }
218
219    /// Build a consumer surface pinned to `session_id`. All routed
220    /// inputs are applied against this session; variants that carry a
221    /// `session_id` are additionally checked for agreement and refused on
222    /// mismatch.
223    pub fn pinned(machine: Arc<MeerkatMachine>, session_id: SessionId) -> Self {
224        Self {
225            machine,
226            pinned_session: Some(session_id),
227        }
228    }
229
230    async fn resolve_session(
231        &self,
232        variant: &InputVariantId,
233        projected: &[(FieldId, OwnedFieldValue)],
234    ) -> Result<SessionId, String> {
235        // Typed session_id is the canonical source (Shape 4 — producer DSL
236        // emits `session_id: SessionId` alongside `agent_runtime_id`; see
237        // `MobMachineEffect::RequestRuntimeBinding` in
238        // `meerkat-machine-schema/src/catalog/dsl/mob_machine.rs:168`).
239        let projected_session_id = projected
240            .iter()
241            .find(|(id, _)| id == &seam_facts::fields::session_id())
242            .and_then(|(_, v)| match v {
243                OwnedFieldValue::Str(s) => Some(s.clone()),
244                _ => None,
245            });
246
247        match (&self.pinned_session, projected_session_id) {
248            (Some(pinned), Some(sid)) if sid != pinned.to_string() => Err(format!(
249                "routed session_id `{sid}` does not match pinned session `{pinned}`"
250            )),
251            (Some(pinned), _) => Ok(pinned.clone()),
252            (None, Some(sid)) => SessionId::parse(&sid)
253                .map_err(|e| format!("routed session_id `{sid}` is not a valid UUID: {e}")),
254            (None, None) if variant == &seam_facts::inputs::ingest() => {
255                let runtime_id = project_str(projected, &seam_facts::fields::runtime_id())?;
256                self.machine
257                    .resolve_registered_session_for_runtime_id(&mm_dsl::AgentRuntimeId::from(
258                        runtime_id.to_string(),
259                    ))
260                    .await
261            }
262            (None, None) => Err(
263                "routed input did not project `session_id` and surface is not pinned \
264                 to a session — no session can be resolved"
265                    .into(),
266            ),
267        }
268    }
269}
270
271impl MeerkatMachine {
272    async fn resolve_registered_session_for_runtime_id(
273        &self,
274        runtime_id: &mm_dsl::AgentRuntimeId,
275    ) -> Result<SessionId, String> {
276        let sessions = self.sessions.read().await;
277        let mut matches = Vec::new();
278
279        for (session_id, entry) in sessions.iter() {
280            let authority = entry
281                .dsl_authority
282                .lock()
283                .unwrap_or_else(std::sync::PoisonError::into_inner);
284            if authority.state.active_runtime_id.as_ref() == Some(runtime_id) {
285                matches.push(session_id.clone());
286            }
287        }
288
289        match matches.len() {
290            0 => Err(format!(
291                "routed Ingest runtime_id `{}` did not match any registered session active_runtime_id",
292                runtime_id.0
293            )),
294            1 => Ok(matches.remove(0)),
295            count => Err(format!(
296                "routed Ingest runtime_id `{}` matched {count} registered sessions; refusing ambiguous delivery",
297                runtime_id.0
298            )),
299        }
300    }
301}
302
303#[allow(clippy::panic)]
304fn meerkat_instance_id() -> &'static MachineInstanceId {
305    static ID: OnceLock<MachineInstanceId> = OnceLock::new();
306    ID.get_or_init(seam_facts::producers::meerkat_instance_id)
307}
308
309fn project_u64(fields: &[(FieldId, OwnedFieldValue)], field: &FieldId) -> Result<u64, String> {
310    fields
311        .iter()
312        .find(|(id, _)| id == field)
313        .ok_or_else(|| format!("missing projected field `{}`", field.as_str()))
314        .and_then(|(_, v)| match v {
315            OwnedFieldValue::U64(n) => Ok(*n),
316            other => Err(format!(
317                "projected field `{}` is not U64: {other:?}",
318                field.as_str()
319            )),
320        })
321}
322
323fn project_str<'a>(
324    fields: &'a [(FieldId, OwnedFieldValue)],
325    field: &FieldId,
326) -> Result<&'a str, String> {
327    fields
328        .iter()
329        .find(|(id, _)| id == field)
330        .ok_or_else(|| format!("missing projected field `{}`", field.as_str()))
331        .and_then(|(_, v)| match v {
332            OwnedFieldValue::Str(s) => Ok(s.as_str()),
333            other => Err(format!(
334                "projected field `{}` is not Str: {other:?}",
335                field.as_str()
336            )),
337        })
338}
339
340fn project_work_origin(
341    fields: &[(FieldId, OwnedFieldValue)],
342    field: &FieldId,
343) -> Result<mm_dsl::WorkOrigin, String> {
344    fields
345        .iter()
346        .find(|(id, _)| id == field)
347        .ok_or_else(|| format!("missing projected field `{}`", field.as_str()))
348        .and_then(|(_, v)| match v {
349            OwnedFieldValue::Opaque(value) => value
350                .downcast_ref::<mm_dsl::WorkOrigin>()
351                .copied()
352                .ok_or_else(|| format!("projected field `{}` is not WorkOrigin", field.as_str())),
353            other => Err(format!(
354                "projected field `{}` is not WorkOrigin: {other:?}",
355                field.as_str()
356            )),
357        })
358}
359
360#[async_trait]
361impl ConsumerSurface for MeerkatConsumerSurface {
362    fn instance_id(&self) -> &MachineInstanceId {
363        meerkat_instance_id()
364    }
365
366    async fn apply_routed_input(
367        &self,
368        variant: InputVariantId,
369        projected: Vec<(FieldId, OwnedFieldValue)>,
370    ) -> Result<(), String> {
371        let session_id = self.resolve_session(&variant, &projected).await?;
372        let input = if variant == seam_facts::inputs::prepare_bindings() {
373            let rt = project_str(&projected, &seam_facts::fields::agent_runtime_id())?;
374            let fence = project_u64(&projected, &seam_facts::fields::fence_token())?;
375            let gen_ = project_u64(&projected, &seam_facts::fields::generation())?;
376            let sid = project_str(&projected, &seam_facts::fields::session_id())?;
377            mm_dsl::MeerkatMachineInput::PrepareBindings {
378                agent_runtime_id: mm_dsl::AgentRuntimeId::from(rt.to_string()),
379                fence_token: mm_dsl::FenceToken(fence),
380                generation: mm_dsl::Generation(gen_),
381                session_id: mm_dsl::SessionId::from(sid.to_string()),
382            }
383        } else if variant == seam_facts::inputs::ingest() {
384            // Route binding `work_request_reaches_meerkat` delivers
385            // producer `agent_runtime_id` into the consumer's canonical
386            // `runtime_id` field; producer `work_id` → consumer
387            // `work_id`; producer `origin` → consumer `origin`.
388            let rt = project_str(&projected, &seam_facts::fields::runtime_id())?;
389            let work_id = project_str(&projected, &seam_facts::fields::work_id())?;
390            let origin = project_work_origin(&projected, &seam_facts::fields::origin())?;
391            mm_dsl::MeerkatMachineInput::Ingest {
392                runtime_id: mm_dsl::AgentRuntimeId::from(rt.to_string()),
393                work_id: mm_dsl::WorkId::from(work_id.to_string()),
394                origin,
395            }
396        } else if variant == seam_facts::inputs::retire() {
397            let sid = project_str(&projected, &seam_facts::fields::session_id())?;
398            mm_dsl::MeerkatMachineInput::Retire {
399                session_id: mm_dsl::SessionId::from(sid.to_string()),
400            }
401        } else if variant == seam_facts::inputs::destroy() {
402            let sid = project_str(&projected, &seam_facts::fields::session_id())?;
403            mm_dsl::MeerkatMachineInput::Destroy {
404                session_id: mm_dsl::SessionId::from(sid.to_string()),
405            }
406        } else {
407            return Err(format!(
408                "meerkat consumer surface does not accept routed input `{}`; \
409                     only PrepareBindings/Ingest/Retire/Destroy are declared in the \
410                     `meerkat_mob_seam` schema",
411                variant.as_str()
412            ));
413        };
414
415        self.machine
416            .apply_routed_meerkat_input(&session_id, input)
417            .await
418    }
419}
420
421#[cfg(test)]
422mod tests {
423    use super::*;
424    use crate::composition::{
425        CatalogCompositionSignalDispatcher, OwnedFieldValue, RouteTable, SignalConsumerSurface,
426    };
427    use meerkat_machine_schema::identity::SignalVariantId;
428
429    fn fld(slug: &str) -> FieldId {
430        FieldId::parse(slug).expect("field slug")
431    }
432
433    fn iv(slug: &str) -> InputVariantId {
434        InputVariantId::parse(slug).expect("input variant slug")
435    }
436
437    fn sid(slug: &str) -> SessionId {
438        SessionId::parse(slug).expect("session id")
439    }
440
441    async fn bind_runtime(
442        surface: &MeerkatConsumerSurface,
443        session_id: &SessionId,
444        runtime_id: &str,
445    ) {
446        surface
447            .apply_routed_input(
448                iv("PrepareBindings"),
449                vec![
450                    (
451                        fld("agent_runtime_id"),
452                        OwnedFieldValue::Str(runtime_id.into()),
453                    ),
454                    (fld("fence_token"), OwnedFieldValue::U64(1)),
455                    (fld("generation"), OwnedFieldValue::U64(0)),
456                    (
457                        fld("session_id"),
458                        OwnedFieldValue::Str(session_id.to_string()),
459                    ),
460                ],
461            )
462            .await
463            .expect("bind runtime");
464    }
465
466    #[tokio::test]
467    async fn prepare_bindings_requires_all_three_fields() {
468        let machine = Arc::new(MeerkatMachine::ephemeral());
469        let surface = MeerkatConsumerSurface::new(Arc::clone(&machine));
470        let err = surface
471            .apply_routed_input(
472                iv("PrepareBindings"),
473                vec![
474                    (fld("agent_runtime_id"), OwnedFieldValue::Str("rt-1".into())),
475                    // fence_token missing on purpose.
476                    (fld("generation"), OwnedFieldValue::U64(3)),
477                    (
478                        fld("session_id"),
479                        OwnedFieldValue::Str("00000000-0000-0000-0000-000000000001".into()),
480                    ),
481                ],
482            )
483            .await
484            .expect_err("missing fence_token");
485        assert!(err.contains("fence_token"), "{err}");
486    }
487
488    #[tokio::test]
489    async fn unknown_variant_is_refused_typed() {
490        let machine = Arc::new(MeerkatMachine::ephemeral());
491        // Pin the surface so resolve_session doesn't fail earlier on
492        // missing session_id — this test focuses on variant-rejection,
493        // not session resolution.
494        let pinned =
495            SessionId::parse("00000000-0000-0000-0000-000000000001").expect("uuid literal");
496        let surface = MeerkatConsumerSurface::pinned(Arc::clone(&machine), pinned);
497        let err = surface
498            .apply_routed_input(iv("Recycle"), vec![])
499            .await
500            .expect_err("Recycle is not a routed variant");
501        assert!(err.contains("Recycle"), "{err}");
502    }
503
504    #[tokio::test]
505    async fn unpinned_surface_requires_projected_session_id_for_retire() {
506        let machine = Arc::new(MeerkatMachine::ephemeral());
507        let surface = MeerkatConsumerSurface::new(Arc::clone(&machine));
508        // Retire has no fields in the schema; an unpinned surface
509        // therefore cannot resolve a session and must refuse rather
510        // than pick arbitrarily.
511        let err = surface
512            .apply_routed_input(iv("Retire"), vec![])
513            .await
514            .expect_err("Retire without target");
515        assert!(err.contains("session_id"), "{err}");
516    }
517
518    #[tokio::test]
519    async fn pinned_surface_rejects_mismatched_session_id() {
520        let machine = Arc::new(MeerkatMachine::ephemeral());
521        let pinned =
522            SessionId::parse("00000000-0000-0000-0000-000000000001").expect("uuid literal");
523        let surface = MeerkatConsumerSurface::pinned(Arc::clone(&machine), pinned);
524        let err = surface
525            .apply_routed_input(
526                iv("PrepareBindings"),
527                vec![
528                    (
529                        fld("agent_runtime_id"),
530                        OwnedFieldValue::Str("rt-other".into()),
531                    ),
532                    (fld("fence_token"), OwnedFieldValue::U64(1)),
533                    (fld("generation"), OwnedFieldValue::U64(0)),
534                    (
535                        fld("session_id"),
536                        OwnedFieldValue::Str("00000000-0000-0000-0000-000000000002".into()),
537                    ),
538                ],
539            )
540            .await
541            .expect_err("session_id disagrees with pinned session");
542        assert!(err.contains("pinned"), "{err}");
543    }
544
545    #[tokio::test]
546    async fn ingest_prefers_projected_session_id() {
547        let machine = Arc::new(MeerkatMachine::ephemeral());
548        let surface = MeerkatConsumerSurface::new(Arc::clone(&machine));
549        let session_id = sid("00000000-0000-0000-0000-000000000001");
550        machine.register_session(session_id.clone()).await;
551
552        surface
553            .apply_routed_input(
554                iv("Ingest"),
555                vec![
556                    (fld("runtime_id"), OwnedFieldValue::Str("rt-other".into())),
557                    (fld("work_id"), OwnedFieldValue::Str("work-1".into())),
558                    (
559                        fld("origin"),
560                        OwnedFieldValue::Opaque(Arc::new(mm_dsl::WorkOrigin::Ingest)),
561                    ),
562                    (
563                        fld("session_id"),
564                        OwnedFieldValue::Str(session_id.to_string()),
565                    ),
566                ],
567            )
568            .await
569            .expect("session_id targets the routed input");
570    }
571
572    #[tokio::test]
573    async fn ingest_resolves_session_from_runtime_id_when_session_id_absent() {
574        let machine = Arc::new(MeerkatMachine::ephemeral());
575        let surface = MeerkatConsumerSurface::new(Arc::clone(&machine));
576        let session_id = sid("00000000-0000-0000-0000-000000000001");
577        machine.register_session(session_id.clone()).await;
578        bind_runtime(&surface, &session_id, "rt-match").await;
579
580        surface
581            .apply_routed_input(
582                iv("Ingest"),
583                vec![
584                    (fld("runtime_id"), OwnedFieldValue::Str("rt-match".into())),
585                    (fld("work_id"), OwnedFieldValue::Str("work-1".into())),
586                    (
587                        fld("origin"),
588                        OwnedFieldValue::Opaque(Arc::new(mm_dsl::WorkOrigin::Ingest)),
589                    ),
590                ],
591            )
592            .await
593            .expect("runtime_id resolves to the registered session");
594    }
595
596    #[tokio::test]
597    async fn ingest_without_matching_runtime_id_is_refused() {
598        let machine = Arc::new(MeerkatMachine::ephemeral());
599        let surface = MeerkatConsumerSurface::new(Arc::clone(&machine));
600        machine
601            .register_session(sid("00000000-0000-0000-0000-000000000001"))
602            .await;
603
604        let err = surface
605            .apply_routed_input(
606                iv("Ingest"),
607                vec![
608                    (fld("runtime_id"), OwnedFieldValue::Str("rt-missing".into())),
609                    (fld("work_id"), OwnedFieldValue::Str("work-1".into())),
610                    (
611                        fld("origin"),
612                        OwnedFieldValue::Opaque(Arc::new(mm_dsl::WorkOrigin::Ingest)),
613                    ),
614                ],
615            )
616            .await
617            .expect_err("runtime_id has no registered owner");
618        assert!(
619            err.contains("did not match any registered session"),
620            "{err}"
621        );
622    }
623
624    #[tokio::test]
625    async fn ingest_with_ambiguous_runtime_id_is_refused() {
626        let machine = Arc::new(MeerkatMachine::ephemeral());
627        let surface = MeerkatConsumerSurface::new(Arc::clone(&machine));
628        let first = sid("00000000-0000-0000-0000-000000000001");
629        let second = sid("00000000-0000-0000-0000-000000000002");
630        machine.register_session(first.clone()).await;
631        machine.register_session(second.clone()).await;
632        bind_runtime(&surface, &first, "rt-shared").await;
633        bind_runtime(&surface, &second, "rt-shared").await;
634
635        let err = surface
636            .apply_routed_input(
637                iv("Ingest"),
638                vec![
639                    (fld("runtime_id"), OwnedFieldValue::Str("rt-shared".into())),
640                    (fld("work_id"), OwnedFieldValue::Str("work-1".into())),
641                    (
642                        fld("origin"),
643                        OwnedFieldValue::Opaque(Arc::new(mm_dsl::WorkOrigin::Ingest)),
644                    ),
645                ],
646            )
647            .await
648            .expect_err("runtime_id is ambiguous");
649        assert!(err.contains("ambiguous delivery"), "{err}");
650    }
651
652    #[derive(Default)]
653    struct RecordingSignalSurface {
654        log: tokio::sync::Mutex<Vec<(SignalVariantId, Vec<(FieldId, OwnedFieldValue)>)>>,
655    }
656
657    #[async_trait]
658    impl SignalConsumerSurface for RecordingSignalSurface {
659        fn instance_id(&self) -> &MachineInstanceId {
660            static ID: OnceLock<MachineInstanceId> = OnceLock::new();
661            ID.get_or_init(|| MachineInstanceId::parse("mob").expect("canonical instance id"))
662        }
663
664        async fn receive_signal(
665            &self,
666            variant: SignalVariantId,
667            projected_fields: Vec<(FieldId, OwnedFieldValue)>,
668        ) -> Result<(), String> {
669            self.log.lock().await.push((variant, projected_fields));
670            Ok(())
671        }
672    }
673
674    #[tokio::test]
675    async fn routed_prepare_bindings_dispatches_runtime_bound_signal() {
676        let machine = Arc::new(MeerkatMachine::ephemeral());
677        let session_id = SessionId::new();
678        machine.register_session(session_id.clone()).await;
679
680        let signal_surface = Arc::new(RecordingSignalSurface::default());
681        let schema = meerkat_machine_schema::catalog::meerkat_mob_seam_composition();
682        let table = RouteTable::from_schema(&schema).expect("catalog routes");
683        let dispatcher: CatalogCompositionSignalDispatcher<MeerkatSeamSignal> =
684            CatalogCompositionSignalDispatcher::new(schema.name.clone(), table)
685                .with_consumer(signal_surface.clone());
686        machine.set_composition_signal_dispatcher(Arc::new(dispatcher));
687
688        machine
689            .apply_routed_meerkat_input(
690                &session_id,
691                mm_dsl::MeerkatMachineInput::PrepareBindings {
692                    agent_runtime_id: mm_dsl::AgentRuntimeId("rt-1".into()),
693                    fence_token: mm_dsl::FenceToken(11),
694                    generation: mm_dsl::Generation(0),
695                    session_id: mm_dsl::SessionId(session_id.to_string()),
696                },
697            )
698            .await
699            .expect("routed input applies and emits signal");
700
701        let log = signal_surface.log.lock().await;
702        assert_eq!(log.len(), 1);
703        assert_eq!(log[0].0.as_str(), "ObserveRuntimeReady");
704        assert_eq!(log[0].1[0].0.as_str(), "agent_runtime_id");
705        assert!(matches!(&log[0].1[0].1, OwnedFieldValue::Str(value) if value == "rt-1"));
706        assert_eq!(log[0].1[1].0.as_str(), "fence_token");
707        assert!(matches!(log[0].1[1].1, OwnedFieldValue::U64(11)));
708    }
709
710    #[test]
711    fn routed_meerkat_signal_projection_tracks_generated_route_facts() {
712        use crate::generated::meerkat_mob_seam as seam_facts;
713
714        let cases = vec![
715            (
716                MeerkatSeamSignal::RuntimeBound {
717                    agent_runtime_id: mm_dsl::AgentRuntimeId("rt-bound".into()),
718                    fence_token: mm_dsl::FenceToken(11),
719                },
720                seam_facts::route_runtime_bound_reaches_mob(),
721            ),
722            (
723                MeerkatSeamSignal::RuntimeRetired {
724                    agent_runtime_id: mm_dsl::AgentRuntimeId("rt-retired".into()),
725                    fence_token: mm_dsl::FenceToken(12),
726                },
727                seam_facts::route_runtime_retired_reaches_mob(),
728            ),
729            (
730                MeerkatSeamSignal::RuntimeDestroyed {
731                    agent_runtime_id: mm_dsl::AgentRuntimeId("rt-destroyed".into()),
732                    fence_token: mm_dsl::FenceToken(13),
733                },
734                seam_facts::route_runtime_destroyed_reaches_mob(),
735            ),
736        ];
737
738        for (signal, expected_route) in cases {
739            let route = signal.generated_signal_route().expect("generated route");
740            assert_eq!(route, expected_route);
741            for (producer_field, _) in &route.bindings {
742                assert!(
743                    signal.field(producer_field).is_some(),
744                    "generated route `{}` requires producer field `{}`",
745                    route.route_id.as_str(),
746                    producer_field.as_str()
747                );
748            }
749        }
750    }
751
752    #[tokio::test]
753    async fn local_session_bindings_do_not_dispatch_runtime_bound_signal() {
754        let machine = Arc::new(MeerkatMachine::ephemeral());
755        let session_id = SessionId::new();
756
757        let signal_surface = Arc::new(RecordingSignalSurface::default());
758        let schema = meerkat_machine_schema::catalog::meerkat_mob_seam_composition();
759        let table = RouteTable::from_schema(&schema).expect("catalog routes");
760        let dispatcher: CatalogCompositionSignalDispatcher<MeerkatSeamSignal> =
761            CatalogCompositionSignalDispatcher::new(schema.name.clone(), table)
762                .with_consumer(signal_surface.clone());
763        machine.set_composition_signal_dispatcher(Arc::new(dispatcher));
764
765        let bindings = machine
766            .prepare_local_session_bindings(session_id.clone())
767            .await
768            .expect("local bindings prepare");
769
770        assert_eq!(bindings.session_id(), &session_id);
771        assert!(
772            signal_surface.log.lock().await.is_empty(),
773            "local resource preparation must not publish cross-machine runtime readiness"
774        );
775        {
776            let sessions = machine.sessions.read().await;
777            let entry = sessions.get(&session_id).expect("session registered");
778            let authority = entry
779                .dsl_authority
780                .lock()
781                .unwrap_or_else(std::sync::PoisonError::into_inner);
782            assert!(
783                authority.state.active_runtime_id.is_none(),
784                "local resource preparation must leave binding identity unclaimed"
785            );
786            assert!(
787                authority.state.active_fence_token.is_none(),
788                "local resource preparation must leave binding fence unclaimed"
789            );
790        }
791
792        machine
793            .apply_routed_meerkat_input(
794                &session_id,
795                mm_dsl::MeerkatMachineInput::PrepareBindings {
796                    agent_runtime_id: mm_dsl::AgentRuntimeId("rt-authoritative".into()),
797                    fence_token: mm_dsl::FenceToken(13),
798                    generation: mm_dsl::Generation(0),
799                    session_id: mm_dsl::SessionId(session_id.to_string()),
800                },
801            )
802            .await
803            .expect("authoritative binding still applies after local resource prep");
804
805        let log = signal_surface.log.lock().await;
806        assert_eq!(log.len(), 1);
807        assert_eq!(log[0].0.as_str(), "ObserveRuntimeReady");
808        assert!(
809            matches!(&log[0].1[0].1, OwnedFieldValue::Str(value) if value == "rt-authoritative")
810        );
811        {
812            let sessions = machine.sessions.read().await;
813            let entry = sessions.get(&session_id).expect("session registered");
814            let authority = entry
815                .dsl_authority
816                .lock()
817                .unwrap_or_else(std::sync::PoisonError::into_inner);
818            assert!(
819                matches!(&authority.state.active_runtime_id, Some(value) if value.0 == "rt-authoritative")
820            );
821            assert!(matches!(
822                authority.state.active_fence_token,
823                Some(mm_dsl::FenceToken(13))
824            ));
825        }
826    }
827
828    #[tokio::test]
829    async fn session_owned_prepare_bindings_is_idempotent_without_reemitting_runtime_bound() {
830        let machine = Arc::new(MeerkatMachine::ephemeral());
831        let session_id = SessionId::new();
832
833        let signal_surface = Arc::new(RecordingSignalSurface::default());
834        let schema = meerkat_machine_schema::catalog::meerkat_mob_seam_composition();
835        let table = RouteTable::from_schema(&schema).expect("catalog routes");
836        let dispatcher: CatalogCompositionSignalDispatcher<MeerkatSeamSignal> =
837            CatalogCompositionSignalDispatcher::new(schema.name.clone(), table)
838                .with_consumer(signal_surface.clone());
839        machine.set_composition_signal_dispatcher(Arc::new(dispatcher));
840
841        machine
842            .prepare_bindings(session_id.clone())
843            .await
844            .expect("initial session-owned binding prepares");
845        machine
846            .prepare_bindings(session_id.clone())
847            .await
848            .expect("duplicate session-owned binding returns existing handles");
849
850        let log = signal_surface.log.lock().await;
851        assert_eq!(
852            log.len(),
853            1,
854            "duplicate handle resolution must not publish a second RuntimeBound signal"
855        );
856        assert_eq!(log[0].0.as_str(), "ObserveRuntimeReady");
857        assert!(
858            matches!(&log[0].1[0].1, OwnedFieldValue::Str(value) if value.starts_with("rt:session:"))
859        );
860    }
861
862    #[tokio::test]
863    async fn session_owned_prepare_bindings_rejects_conflicting_authoritative_runtime() {
864        let machine = Arc::new(MeerkatMachine::ephemeral());
865        let session_id = SessionId::new();
866        machine.register_session(session_id.clone()).await;
867
868        let signal_surface = Arc::new(RecordingSignalSurface::default());
869        let schema = meerkat_machine_schema::catalog::meerkat_mob_seam_composition();
870        let table = RouteTable::from_schema(&schema).expect("catalog routes");
871        let dispatcher: CatalogCompositionSignalDispatcher<MeerkatSeamSignal> =
872            CatalogCompositionSignalDispatcher::new(schema.name.clone(), table)
873                .with_consumer(signal_surface.clone());
874        machine.set_composition_signal_dispatcher(Arc::new(dispatcher));
875
876        machine
877            .apply_routed_meerkat_input(
878                &session_id,
879                mm_dsl::MeerkatMachineInput::PrepareBindings {
880                    agent_runtime_id: mm_dsl::AgentRuntimeId("operator-rt:0".into()),
881                    fence_token: mm_dsl::FenceToken(17),
882                    generation: mm_dsl::Generation(0),
883                    session_id: mm_dsl::SessionId(session_id.to_string()),
884                },
885            )
886            .await
887            .expect("mob-owned authoritative binding applies");
888
889        let err = machine
890            .prepare_bindings(session_id.clone())
891            .await
892            .expect_err("session-owned binding must not overwrite mob-owned authority");
893        assert!(
894            err.to_string()
895                .contains("already has authoritative runtime binding"),
896            "{err}"
897        );
898
899        let state = machine
900            .session_dsl_state(&session_id)
901            .await
902            .expect("session state remains available");
903        assert!(
904            matches!(&state.active_runtime_id, Some(value) if value.0 == "operator-rt:0"),
905            "conflicting prepare_bindings must not rewrite active_runtime_id: {:?}",
906            state.active_runtime_id
907        );
908        assert!(matches!(
909            state.active_fence_token,
910            Some(mm_dsl::FenceToken(17))
911        ));
912
913        let log = signal_surface.log.lock().await;
914        assert_eq!(
915            log.len(),
916            1,
917            "rejected session-owned binding must not publish a shadow RuntimeBound signal"
918        );
919        assert!(matches!(&log[0].1[0].1, OwnedFieldValue::Str(value) if value == "operator-rt:0"));
920    }
921}