Skip to main content

meerkat_runtime/composition/
mod.rs

1//! Composition dispatcher — THE typed execution path for routed effects.
2//!
3//! Wave-b V2 rebuilds composition dispatch as a typed, *mandatory* runtime seam.
4//! The deleted `composition_dispatch.rs` and `recompute_mob_peer_overlay*.rs`
5//! (wave-a tombstones `ce2dbe35e` / `f5e366f38`) were stringly-typed helpers
6//! that callers opted into. This module is their structural opposite:
7//!
8//! * **Typed end-to-end.** Producer identity is [`ProducerInstance`] carrying
9//!   typed [`CompositionId`], [`MachineInstanceId`], [`MachineId`]. Effects
10//!   travel as [`EffectPayload<E>`] where `E` is the producer composition's
11//!   typed seam-effect sum (the [`ProducerEffect`] trait bound). Route
12//!   resolution returns a typed [`RoutedInputDescriptor`] carrying
13//!   [`MachineInstanceId`] / [`InputVariantId`] / `Vec<(FieldId, FieldId)>`.
14//!   Signal-kind routes travel through the parallel [`SignalPayload<S>`] /
15//!   [`CompositionSignalDispatcher`] surface and resolve to typed
16//!   [`RoutedSignalDescriptor`] values.
17//! * **Mandatory, not optional.** The trait has no fallback surface. Routed
18//!   effects whose route is declared in the composition schema MUST resolve
19//!   through the dispatcher; unresolved routes are a typed
20//!   [`DispatchRefusal::UnresolvedRoute`] error, not a silent drop.
21//!   Signal-kind routes live on a separate index inside [`RouteTable`] and
22//!   MUST resolve through [`CompositionSignalDispatcher`].
23//! * **Compile-time presence/absence.** A `MeerkatMachine` either has a
24//!   composition dispatcher attached (via the `with_composition` constructor)
25//!   or it does not (the standalone / single-machine test path). The two
26//!   cases are distinguished by a typed [`CompositionBinding`] discriminant,
27//!   never by `Option<Arc<dyn CompositionDispatcher>>`.
28//!
29//! The default catalog-backed dispatcher ([`CatalogCompositionDispatcher`])
30//! consumes a [`RouteTable`] built from any
31//! [`meerkat_machine_schema::CompositionSchema`] and delivers each resolved
32//! [`RoutedInputDescriptor`] to a per-consumer-instance [`ConsumerSurface`]
33//! supplied at wire-up. The per-composition codegen module emitted by
34//! `meerkat-machine-codegen` (B-4 + B-4b) plugs in as the
35//! [`ProducerEffect`] implementation — `route_to_input` is equivalent to
36//! consulting the [`RouteTable`] built from the same schema.
37
38pub mod route_table;
39
40use std::collections::HashMap;
41use std::fmt;
42use std::sync::Arc;
43
44use async_trait::async_trait;
45use meerkat_machine_schema::identity::{
46    CompositionId, EffectVariantId, FieldId, InputVariantId, MachineId, MachineInstanceId, RouteId,
47    SignalVariantId,
48};
49use thiserror::Error;
50
51pub use route_table::{RouteTable, RouteTableError, RoutedInputDescriptor, RoutedSignalDescriptor};
52
53/// Typed identity of the producing machine instance inside a composition.
54///
55/// Unlike the deleted string-keyed helpers, every field is a typed newtype
56/// so cross-instance mixups are rejected at compile time.
57#[derive(Debug, Clone, PartialEq, Eq, Hash)]
58pub struct ProducerInstance {
59    /// Composition that contains the producer.
60    pub composition: CompositionId,
61    /// Instance id of the producer *within* `composition`.
62    pub instance_id: MachineInstanceId,
63    /// Underlying machine name (the schema this instance is an instance of).
64    pub machine: MachineId,
65}
66
67/// Typed effect payload. Generic over the producer composition's seam-effect
68/// sum (see the codegen-emitted `{Composition}Effect` enum).
69///
70/// The variant carries the typed [`EffectVariantId`] alongside the body so
71/// the dispatcher can look up a route without pattern-matching on the
72/// producer's effect enum (the [`ProducerEffect`] trait hides that under
73/// [`ProducerEffect::variant_id`]).
74#[derive(Debug, Clone, PartialEq, Eq)]
75pub enum EffectPayload<E> {
76    /// Producer emitted a typed effect variant.
77    Emitted {
78        /// Typed variant id (matches the producer's effect enum tag).
79        variant: EffectVariantId,
80        /// The typed effect body.
81        body: E,
82    },
83}
84
85impl<E: ProducerEffect> EffectPayload<E> {
86    /// Borrow the typed variant id.
87    pub fn variant(&self) -> &EffectVariantId {
88        match self {
89            Self::Emitted { variant, .. } => variant,
90        }
91    }
92
93    /// Borrow the typed body.
94    pub fn body(&self) -> &E {
95        match self {
96            Self::Emitted { body, .. } => body,
97        }
98    }
99}
100
101/// Typed signal-route payload. Generic over the producer composition's
102/// seam-signal sum.
103#[derive(Debug, Clone, PartialEq, Eq)]
104pub enum SignalPayload<S> {
105    /// Producer emitted a typed signal-route source variant.
106    Emitted {
107        /// Typed source variant id. In the composition schema this is the
108        /// route's producer-side `effect_variant`; signal-kind routes still
109        /// originate from a producer effect and target a consumer signal.
110        variant: EffectVariantId,
111        /// The typed signal source body.
112        body: S,
113    },
114}
115
116impl<S: ProducerSignal> SignalPayload<S> {
117    /// Borrow the typed source variant id.
118    pub fn variant(&self) -> &EffectVariantId {
119        match self {
120            Self::Emitted { variant, .. } => variant,
121        }
122    }
123
124    /// Borrow the typed body.
125    pub fn body(&self) -> &S {
126        match self {
127            Self::Emitted { body, .. } => body,
128        }
129    }
130}
131
132/// Typed route key: `(composition, route)`.
133#[derive(Debug, Clone, PartialEq, Eq, Hash)]
134pub struct RouteKey {
135    pub composition: CompositionId,
136    pub route_id: RouteId,
137}
138
139/// Marker trait for the seam-effect sum emitted by
140/// `meerkat-machine-codegen::render_composition_driver`. Producer effect
141/// enums implement this to expose the typed variant id alongside their
142/// domain body — the dispatcher consults it without inspecting the enum.
143pub trait ProducerEffect: fmt::Debug + Send + Sync + 'static {
144    /// Typed variant id for this effect value.
145    ///
146    /// The codegen emits one arm per distinct `{producer_instance}::{variant}`
147    /// pair; implementers return the matching [`EffectVariantId`]. This is
148    /// the single handle the dispatcher needs to resolve the route without
149    /// case-matching on the producer's concrete enum.
150    fn variant_id(&self) -> EffectVariantId;
151
152    /// Borrow a field value by [`FieldId`].
153    ///
154    /// Used by the dispatcher to project producer fields into the typed
155    /// consumer input as declared by the composition's route bindings.
156    /// Returns `None` if the requested field is not present on this
157    /// variant. The dispatcher treats that as
158    /// [`DispatchRefusal::MissingProducerField`].
159    fn field(&self, id: &FieldId) -> Option<FieldValue<'_>>;
160}
161
162/// Marker trait for the seam-signal source sum consumed by
163/// [`CompositionSignalDispatcher`].
164///
165/// Signal-kind composition routes use the same producer-side
166/// `EffectVariantId` namespace as input routes, but their target is a
167/// consumer [`SignalVariantId`]. This trait mirrors [`ProducerEffect`] so
168/// signal dispatch has the same typed projection discipline without
169/// requiring callers to smuggle signal payloads through the input
170/// dispatcher.
171pub trait ProducerSignal: fmt::Debug + Send + Sync + 'static {
172    /// Typed producer-side variant id for this signal source value.
173    fn variant_id(&self) -> EffectVariantId;
174
175    /// Borrow a producer field by [`FieldId`].
176    fn field(&self, id: &FieldId) -> Option<FieldValue<'_>>;
177}
178
/// Borrowed, typed view of one producer field projected through a route
/// binding.
///
/// `ProducerEffect::field` hands these to the dispatcher so a value can
/// reach the consumer input without a `serde_json::Value` round-trip. The
/// set of variants is deliberately small: a richer shape stays inside the
/// producer's effect body as a typed value and the consumer accepts it
/// through the same typed enum (codegen emits matching types on both
/// sides).
#[derive(Clone, Debug)]
pub enum FieldValue<'a> {
    /// String slice borrowed from the producer, which keeps owning the
    /// backing `String`.
    Str(&'a str),
    /// Unsigned 64-bit integer.
    U64(u64),
    /// Signed 64-bit integer.
    I64(i64),
    /// Boolean flag.
    Bool(bool),
    /// Opaque typed handle whose Rust type producer and consumer agree on.
    /// The dispatcher passes the `Arc<dyn Any>` along without looking inside.
    /// This is *not* a `serde_json::Value` escape hatch: the typed route
    /// binding, not ad-hoc JSON, determines the contained Rust type.
    Opaque(Arc<dyn std::any::Any + Send + Sync>),
}
203
204/// Outcome when a routed effect is successfully dispatched.
205#[derive(Debug, Clone, PartialEq, Eq)]
206pub struct DispatchOutcome {
207    /// Route that was resolved for this effect.
208    pub route: RouteKey,
209    /// Target consumer instance the typed input was delivered to.
210    pub consumer: MachineInstanceId,
211    /// Typed input variant applied on the consumer.
212    pub applied_input: InputVariantId,
213}
214
215/// Outcome when a routed signal is successfully dispatched.
216#[derive(Debug, Clone, PartialEq, Eq)]
217pub struct SignalDispatchOutcome {
218    /// Route that was resolved for this signal source.
219    pub route: RouteKey,
220    /// Target consumer instance the typed signal was delivered to.
221    pub consumer: MachineInstanceId,
222    /// Typed signal variant applied on the consumer.
223    pub applied_signal: SignalVariantId,
224}
225
226/// Reasons the dispatcher refuses a routed effect.
227///
228/// Unlike the deleted helper path, there is no "silently drop unknown
229/// effects" arm. Every failure is a typed variant so callers and RMAT
230/// audits can enumerate them without parsing error strings.
231#[derive(Debug, Clone, PartialEq, Eq, Error)]
232pub enum DispatchRefusal {
233    /// The producer is not registered for this dispatcher's composition.
234    #[error("dispatcher composition {expected} does not match producer composition {actual}")]
235    CompositionMismatch {
236        expected: CompositionId,
237        actual: CompositionId,
238    },
239    /// No input-kind route is declared for `(producer.instance_id, variant)`.
240    #[error(
241        "no input route declared for producer {instance} effect variant {variant} in composition {composition}"
242    )]
243    UnresolvedRoute {
244        composition: CompositionId,
245        instance: MachineInstanceId,
246        variant: EffectVariantId,
247    },
248    /// A route-binding references a producer field that the effect body did
249    /// not supply (via [`ProducerEffect::field`]).
250    #[error("route {route} requires producer field {field} on variant {variant}, not provided")]
251    MissingProducerField {
252        route: RouteId,
253        variant: EffectVariantId,
254        field: FieldId,
255    },
256    /// No [`ConsumerSurface`] is registered for the resolved target
257    /// instance. This is a wiring bug at construction time, not a runtime
258    /// signal — the dispatcher refuses rather than queueing forever.
259    #[error(
260        "no consumer surface registered for target instance {instance} in composition {composition}"
261    )]
262    UnwiredConsumer {
263        composition: CompositionId,
264        instance: MachineInstanceId,
265    },
266    /// The consumer surface rejected the typed input (e.g. because the
267    /// consumer machine is no longer accepting inputs). The inner
268    /// [`ConsumerError`] is the consumer-side typed rejection and is opaque
269    /// to the dispatcher, but its stable `error_code` survives the seam so
270    /// callers and RMAT audits can enumerate refusals without parsing
271    /// strings.
272    #[error("consumer {instance} refused input {variant}: {error}")]
273    ConsumerRefused {
274        instance: MachineInstanceId,
275        variant: InputVariantId,
276        error: ConsumerError,
277    },
278}
279
280/// Reasons the signal dispatcher refuses a routed signal.
281#[derive(Debug, Clone, PartialEq, Eq, Error)]
282pub enum SignalDispatchRefusal {
283    /// The producer is not registered for this dispatcher's composition.
284    #[error("dispatcher composition {expected} does not match producer composition {actual}")]
285    CompositionMismatch {
286        expected: CompositionId,
287        actual: CompositionId,
288    },
289    /// No signal-kind route is declared for `(producer.instance_id, variant)`.
290    #[error(
291        "no signal route declared for producer {instance} variant {variant} in composition {composition}"
292    )]
293    UnresolvedRoute {
294        composition: CompositionId,
295        instance: MachineInstanceId,
296        variant: EffectVariantId,
297    },
298    /// A route-binding references a producer field that the signal body
299    /// did not supply.
300    #[error("route {route} requires producer field {field} on variant {variant}, not provided")]
301    MissingProducerField {
302        route: RouteId,
303        variant: EffectVariantId,
304        field: FieldId,
305    },
306    /// No [`SignalConsumerSurface`] is registered for the resolved target
307    /// instance.
308    #[error(
309        "no signal consumer surface registered for target instance {instance} in composition {composition}"
310    )]
311    UnwiredConsumer {
312        composition: CompositionId,
313        instance: MachineInstanceId,
314    },
315    /// The consumer surface rejected the typed signal. The inner
316    /// [`ConsumerError`] preserves the consumer's stable `error_code` across
317    /// the dispatch seam instead of flattening it into an untyped string.
318    #[error("consumer {instance} refused signal {variant}: {error}")]
319    ConsumerRefused {
320        instance: MachineInstanceId,
321        variant: SignalVariantId,
322        error: ConsumerError,
323    },
324}
325
326/// Opaque, typed consumer-side rejection.
327///
328/// The consumer surface owns a typed kernel error; the dispatcher must not
329/// flatten it into a bare string and lose the stable discriminant. This
330/// newtype is the dispatcher-facing projection of that typed error: it carries
331/// the consumer's stable [`ConsumerError::error_code`] (the same
332/// `&'static str` convention used by `meerkat_core::error`) alongside a
333/// human-readable message. The dispatcher moves it across the seam verbatim,
334/// so [`DispatchRefusal::ConsumerRefused`] / [`SignalDispatchRefusal::ConsumerRefused`]
335/// expose the typed code rather than re-parsing a flattened reason string.
336#[derive(Debug, Clone, PartialEq, Eq, Error)]
337#[error("{message} [{error_code}]")]
338pub struct ConsumerError {
339    /// Stable discriminant the consumer kernel owns (e.g. the consumer's
340    /// own typed `error_code()`). Opaque to the dispatcher, but enumerable
341    /// by callers and RMAT audits without parsing the message.
342    error_code: &'static str,
343    /// Human-readable rejection detail.
344    message: String,
345}
346
347impl ConsumerError {
348    /// Build a consumer rejection from the consumer kernel's stable
349    /// `error_code` and a human-readable message.
350    pub fn new(error_code: &'static str, message: impl Into<String>) -> Self {
351        Self {
352            error_code,
353            message: message.into(),
354        }
355    }
356
357    /// Stable typed discriminant preserved across the dispatch seam.
358    pub fn error_code(&self) -> &'static str {
359        self.error_code
360    }
361
362    /// Human-readable rejection detail.
363    pub fn message(&self) -> &str {
364        &self.message
365    }
366}
367
368impl From<String> for ConsumerError {
369    /// Consumer-surface input projection/refusal detail that the kernel
370    /// produced as a bare string is carried across the dispatch seam under a
371    /// stable `consumer_projection_failed` discriminant (the string stays as
372    /// human-readable detail, but the typed code is what the dispatcher and
373    /// RMAT audits observe — never a re-parsed message).
374    fn from(message: String) -> Self {
375        Self::new("consumer_projection_failed", message)
376    }
377}
378
379/// Delivery surface for one consumer instance inside a composition.
380///
381/// A consumer (e.g. the `meerkat` machine instance when mob routes
382/// `RequestRuntimeBinding` at it) implements this trait and registers an
383/// instance at composition wire-up. The dispatcher invokes it exactly once
384/// per resolved [`RoutedInput`]. The implementation is responsible for
385/// materializing the consumer-side typed input — the dispatcher only moves
386/// typed data across the seam.
387#[async_trait]
388pub trait ConsumerSurface: Send + Sync {
389    /// Instance id this surface serves. The dispatcher matches against
390    /// [`RoutedInput::instance_id`] to pick the right surface.
391    fn instance_id(&self) -> &MachineInstanceId;
392
393    /// Apply a typed routed input. `projected_fields` carries the per-
394    /// consumer-field values resolved from the producer via the route's
395    /// field-bindings, owned so the surface can move them into its typed
396    /// input constructor.
397    async fn apply_routed_input(
398        &self,
399        variant: InputVariantId,
400        projected_fields: Vec<(FieldId, OwnedFieldValue)>,
401    ) -> Result<(), ConsumerError>;
402}
403
404/// Delivery surface for one signal-consuming instance inside a composition.
405#[async_trait]
406pub trait SignalConsumerSurface: Send + Sync {
407    /// Instance id this surface serves.
408    fn instance_id(&self) -> &MachineInstanceId;
409
410    /// Receive a typed routed signal.
411    async fn receive_signal(
412        &self,
413        variant: SignalVariantId,
414        projected_fields: Vec<(FieldId, OwnedFieldValue)>,
415    ) -> Result<(), ConsumerError>;
416}
417
/// Owned twin of [`FieldValue`], used to hand a routed input across the
/// consumer-surface boundary.
///
/// Because the values are owned, the consumer can build its typed input
/// without borrowing from the producer again.
#[derive(Clone, Debug)]
pub enum OwnedFieldValue {
    /// Owned string.
    Str(String),
    /// Unsigned 64-bit integer.
    U64(u64),
    /// Signed 64-bit integer.
    I64(i64),
    /// Boolean flag.
    Bool(bool),
    /// Shared opaque typed handle.
    Opaque(Arc<dyn std::any::Any + Send + Sync>),
}
430
431impl FieldValue<'_> {
432    /// Lift a borrowed field value into its owned counterpart, cloning the
433    /// backing `&str` when required. The [`Arc<dyn Any>`] path is shared,
434    /// not cloned.
435    pub fn to_owned_value(&self) -> OwnedFieldValue {
436        match self {
437            FieldValue::Str(s) => OwnedFieldValue::Str((*s).to_owned()),
438            FieldValue::U64(v) => OwnedFieldValue::U64(*v),
439            FieldValue::I64(v) => OwnedFieldValue::I64(*v),
440            FieldValue::Bool(v) => OwnedFieldValue::Bool(*v),
441            FieldValue::Opaque(handle) => OwnedFieldValue::Opaque(Arc::clone(handle)),
442        }
443    }
444}
445
446/// Composition dispatcher trait.
447///
448/// Monomorphized over the producer composition's seam-effect sum
449/// ([`CompositionDispatcher::Effect`]). Making the effect an associated type
450/// (rather than a generic on the method) keeps the trait dyn-safe — a
451/// `MeerkatMachine` can hold `Arc<dyn CompositionDispatcher<Effect = ...>>`
452/// without leaking the machine kernel's monomorphization concerns.
453#[async_trait]
454pub trait CompositionDispatcher: Send + Sync {
455    /// Seam-effect sum this dispatcher handles. Matches the codegen-emitted
456    /// `{Composition}Effect` enum.
457    type Effect: ProducerEffect;
458
459    /// Composition id this dispatcher owns. Every [`ProducerInstance`]
460    /// passed to [`CompositionDispatcher::dispatch`] must match.
461    fn composition(&self) -> &CompositionId;
462
463    /// Dispatch a routed effect. Returns [`DispatchOutcome`] on success or
464    /// a typed [`DispatchRefusal`]. There is no silent-drop arm.
465    async fn dispatch(
466        &self,
467        producer: ProducerInstance,
468        effect: EffectPayload<Self::Effect>,
469    ) -> Result<DispatchOutcome, DispatchRefusal>;
470}
471
472/// Composition signal dispatcher trait.
473#[async_trait]
474pub trait CompositionSignalDispatcher: Send + Sync {
475    /// Seam-signal source sum this dispatcher handles.
476    type Signal: ProducerSignal;
477
478    /// Composition id this dispatcher owns.
479    fn composition(&self) -> &CompositionId;
480
481    /// Dispatch a routed signal. Returns [`SignalDispatchOutcome`] on
482    /// success or a typed [`SignalDispatchRefusal`].
483    async fn dispatch_signal(
484        &self,
485        producer: ProducerInstance,
486        signal: SignalPayload<Self::Signal>,
487    ) -> Result<SignalDispatchOutcome, SignalDispatchRefusal>;
488}
489
490/// Typed, owner-supplied context provider for an [`OwnerProvided`][op] binding.
491///
492/// Issue #342 — some routes need consumer-side fields that aren't in the
493/// producer's effect body (the canonical case is `session_id` on the
494/// `meerkat_mob_seam` composition: the mob effect doesn't carry it, but
495/// the consumer's applied input requires it). Rather than smuggle that
496/// state through a `serde_json::Value` side channel, the runtime that
497/// owns the dispatcher supplies it through a typed context provider.
498///
499/// **Exactly one method, no `serde_json::Value` in the signature.** The
500/// returned fields are typed [`OwnedFieldValue`]s keyed by
501/// [`FieldId`] — the same representation the route-binding table already
502/// uses for producer-field projections. The dispatcher can merge the
503/// provider's fields with producer-projected fields when constructing
504/// the typed input for a `ConsumerSurface`.
505///
506/// Implementations are synchronous and infallible: context retrieval
507/// should be an in-process lookup against state the runtime already
508/// owns (pinned session id, realm id, bind-epoch, …). Anything that
509/// could fail belongs on the producer effect body or on the consumer
510/// surface.
511///
512/// [op]: CompositionBinding::OwnerProvided
513pub trait ContextProvider<E: ProducerEffect>: Send + Sync {
514    /// Produce the owner-supplied typed context fields for a routed
515    /// `effect` emitted by `producer`.
516    ///
517    /// The returned vector's `FieldId`s must match the route's
518    /// [`BindingSource::ContextField`][bs] references declared in the
519    /// composition schema (#342). Missing ids surface as
520    /// [`DispatchRefusal::MissingProducerField`] at the dispatcher in
521    /// the same way unfulfilled producer fields do — the dispatcher
522    /// treats producer and owner-provided fields uniformly once
523    /// projection starts.
524    ///
525    /// [bs]: # "See issue #342: BindingSource gains ContextField(FieldId)"
526    fn provide_context(
527        &self,
528        producer: &ProducerInstance,
529        effect: &EffectPayload<E>,
530    ) -> Vec<(FieldId, OwnedFieldValue)>;
531}
532
533/// Typed binding attached to a runtime that holds a dispatcher.
534///
535/// Discriminates the "machine participates in a composition" case from the
536/// "machine is standalone" case *at the type level*: no
537/// `Option<Arc<dyn CompositionDispatcher>>`. Callers obtain the concrete
538/// dispatcher via [`CompositionBinding::wired`] and honor
539/// [`CompositionBinding::is_standalone`] to tell the two apart. The two
540/// constructor halves on `MeerkatMachine` (`with_composition(...)` vs
541/// `standalone(...)` / `ephemeral()` / `persistent()`) are the public
542/// face of this distinction.
543///
544/// **OwnerProvided (#342)**: some routes need consumer-side fields that
545/// aren't in the producer effect body — the canonical case is
546/// `session_id` on the `meerkat_mob_seam` composition. The
547/// `OwnerProvided` variant pairs a dispatcher with a typed
548/// [`ContextProvider`] so the runtime that owns the dispatcher supplies
549/// the missing fields from its own typed state at dispatch time.
550/// `OwnerProvided` is semantically a superset of `Wired`: callers that
551/// only need the dispatcher reach it through the same
552/// [`wired`](Self::wired) accessor; callers that need the context
553/// provider reach it through
554/// [`context_provider`](Self::context_provider), which returns `Some`
555/// only for `OwnerProvided`.
556pub enum CompositionBinding<E: ProducerEffect> {
557    /// Machine is not part of a composition. Routed-effect dispatch is not
558    /// available.
559    Standalone,
560    /// Machine participates in a composition and owns a typed dispatcher.
561    /// No owner-supplied context: all route bindings project from the
562    /// producer's effect body.
563    Wired(Arc<dyn CompositionDispatcher<Effect = E>>),
564    /// Machine participates in a composition that declares routes with
565    /// owner-supplied context (issue #342). The `context` is consulted
566    /// alongside the producer effect at dispatch time to fulfil route
567    /// bindings whose source is `ContextField` rather than
568    /// `ProducerField`.
569    OwnerProvided {
570        dispatcher: Arc<dyn CompositionDispatcher<Effect = E>>,
571        context: Arc<dyn ContextProvider<E>>,
572    },
573}
574
575impl<E: ProducerEffect> fmt::Debug for CompositionBinding<E> {
576    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
577        match self {
578            Self::Standalone => f.debug_struct("CompositionBinding::Standalone").finish(),
579            Self::Wired(_) => f
580                .debug_struct("CompositionBinding::Wired")
581                .field("dispatcher", &"<dyn CompositionDispatcher>")
582                .finish(),
583            Self::OwnerProvided { .. } => f
584                .debug_struct("CompositionBinding::OwnerProvided")
585                .field("dispatcher", &"<dyn CompositionDispatcher>")
586                .field("context", &"<dyn ContextProvider>")
587                .finish(),
588        }
589    }
590}
591
592impl<E: ProducerEffect> CompositionBinding<E> {
593    /// Construct a `Standalone` binding.
594    ///
595    /// Mirrors `MeerkatMachine::standalone(...)` at the binding level so
596    /// call sites that wire a runtime without composition can say so
597    /// positively instead of spelling the enum variant. Equivalent to
598    /// `CompositionBinding::Standalone`.
599    pub fn standalone() -> Self {
600        Self::Standalone
601    }
602
603    /// Construct a `Wired` binding from a composition dispatcher.
604    ///
605    /// Use this when every route binding projects from the producer
606    /// effect body alone. If any route declares an owner-supplied
607    /// context field, use [`Self::owner_provided`] instead.
608    pub fn wired_with(dispatcher: Arc<dyn CompositionDispatcher<Effect = E>>) -> Self {
609        Self::Wired(dispatcher)
610    }
611
612    /// Construct an `OwnerProvided` binding from a composition
613    /// dispatcher and a typed context provider.
614    ///
615    /// Use this for compositions whose route bindings reference owner-
616    /// supplied context fields (per issue #342) — the provider is
617    /// consulted at dispatch time for each routed effect so the
618    /// missing fields can be fulfilled from the runtime's own state.
619    pub fn owner_provided(
620        dispatcher: Arc<dyn CompositionDispatcher<Effect = E>>,
621        context: Arc<dyn ContextProvider<E>>,
622    ) -> Self {
623        Self::OwnerProvided {
624            dispatcher,
625            context,
626        }
627    }
628
629    /// Report whether this machine is standalone (no composition attached).
630    pub fn is_standalone(&self) -> bool {
631        matches!(self, Self::Standalone)
632    }
633
634    /// Borrow the wired dispatcher, if any.
635    ///
636    /// Returns `None` only for [`CompositionBinding::Standalone`].
637    /// Both `Wired` and `OwnerProvided` expose their dispatcher through
638    /// this accessor so call sites that only need to dispatch a typed
639    /// effect don't have to branch on context-provider presence — the
640    /// type split exists so this is enforced at the construction
641    /// boundary, not re-checked at every call site.
642    pub fn wired(&self) -> Option<&Arc<dyn CompositionDispatcher<Effect = E>>> {
643        match self {
644            Self::Standalone => None,
645            Self::Wired(d) => Some(d),
646            Self::OwnerProvided { dispatcher, .. } => Some(dispatcher),
647        }
648    }
649
650    /// Borrow the owner-supplied [`ContextProvider`], if any.
651    ///
652    /// Returns `Some` only for [`CompositionBinding::OwnerProvided`].
653    /// `Standalone` has no dispatcher; `Wired` has a dispatcher but no
654    /// owner-supplied context, so callers that walk route bindings and
655    /// encounter a `ContextField` source on a `Wired` binding should
656    /// surface a typed refusal rather than silently treat it as an
657    /// empty context.
658    pub fn context_provider(&self) -> Option<&Arc<dyn ContextProvider<E>>> {
659        match self {
660            Self::Standalone | Self::Wired(_) => None,
661            Self::OwnerProvided { context, .. } => Some(context),
662        }
663    }
664}
665
666/// Default catalog-backed dispatcher.
667///
668/// Consumes a [`RouteTable`] (built from a
669/// [`meerkat_machine_schema::CompositionSchema`]) plus a map of consumer
670/// surfaces keyed by [`MachineInstanceId`]. Every routed effect goes
671/// through the same three steps:
672///
673/// 1. Look up the input-kind route for `(producer.instance_id, effect.variant)`.
674/// 2. Project the producer's field values into the consumer-field bindings.
675/// 3. Deliver via the consumer surface registered for the target instance.
676///
677/// No step has a silent-drop fallback. Unresolved routes, signal-kind
678/// targets, missing producer fields, and unwired consumers are all typed
679/// [`DispatchRefusal`] errors.
680pub struct CatalogCompositionDispatcher<E: ProducerEffect> {
681    composition: CompositionId,
682    table: RouteTable,
683    consumers: HashMap<MachineInstanceId, Arc<dyn ConsumerSurface>>,
684    _effect: std::marker::PhantomData<fn(E)>,
685}
686
687/// Default catalog-backed signal dispatcher.
688///
689/// This is the signal-kind mirror of [`CatalogCompositionDispatcher`]:
690/// it consumes the same [`RouteTable`] but resolves through the signal
691/// index and delivers to [`SignalConsumerSurface`].
692pub struct CatalogCompositionSignalDispatcher<S: ProducerSignal> {
693    composition: CompositionId,
694    table: RouteTable,
695    consumers: HashMap<MachineInstanceId, Arc<dyn SignalConsumerSurface>>,
696    _signal: std::marker::PhantomData<fn(S)>,
697}
698
699impl<S: ProducerSignal> fmt::Debug for CatalogCompositionSignalDispatcher<S> {
700    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
701        f.debug_struct("CatalogCompositionSignalDispatcher")
702            .field("composition", &self.composition)
703            .field("signal_routes", &self.table.signal_route_count())
704            .field("consumers", &self.consumers.len())
705            .finish()
706    }
707}
708
709impl<E: ProducerEffect> fmt::Debug for CatalogCompositionDispatcher<E> {
710    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
711        f.debug_struct("CatalogCompositionDispatcher")
712            .field("composition", &self.composition)
713            .field("routes", &self.table.len())
714            .field("consumers", &self.consumers.len())
715            .finish()
716    }
717}
718
719impl<E: ProducerEffect> CatalogCompositionDispatcher<E> {
720    /// Build a new dispatcher for `composition`, using `table` as the typed
721    /// route index.
722    pub fn new(composition: CompositionId, table: RouteTable) -> Self {
723        Self {
724            composition,
725            table,
726            consumers: HashMap::new(),
727            _effect: std::marker::PhantomData,
728        }
729    }
730
731    /// Register a consumer surface for a target instance.
732    ///
733    /// Panics are impossible — duplicate registrations replace the prior
734    /// entry. (Duplicate wiring is a construction bug; the callers in
735    /// wave-b prove registration happens exactly once per instance in the
736    /// composition schema.)
737    pub fn with_consumer(mut self, surface: Arc<dyn ConsumerSurface>) -> Self {
738        self.consumers
739            .insert(surface.instance_id().clone(), surface);
740        self
741    }
742}
743
744impl<S: ProducerSignal> CatalogCompositionSignalDispatcher<S> {
745    /// Build a new signal dispatcher for `composition`.
746    pub fn new(composition: CompositionId, table: RouteTable) -> Self {
747        Self {
748            composition,
749            table,
750            consumers: HashMap::new(),
751            _signal: std::marker::PhantomData,
752        }
753    }
754
755    /// Register a signal consumer surface for a target instance.
756    pub fn with_consumer(mut self, surface: Arc<dyn SignalConsumerSurface>) -> Self {
757        self.consumers
758            .insert(surface.instance_id().clone(), surface);
759        self
760    }
761}
762
763#[async_trait]
764impl<E: ProducerEffect> CompositionDispatcher for CatalogCompositionDispatcher<E> {
765    type Effect = E;
766
767    fn composition(&self) -> &CompositionId {
768        &self.composition
769    }
770
771    async fn dispatch(
772        &self,
773        producer: ProducerInstance,
774        effect: EffectPayload<Self::Effect>,
775    ) -> Result<DispatchOutcome, DispatchRefusal> {
776        if producer.composition != self.composition {
777            return Err(DispatchRefusal::CompositionMismatch {
778                expected: self.composition.clone(),
779                actual: producer.composition,
780            });
781        }
782
783        let variant = effect.variant().clone();
784        let body = effect.body();
785
786        let descriptor = self
787            .table
788            .resolve(&producer.instance_id, &variant)
789            .ok_or_else(|| DispatchRefusal::UnresolvedRoute {
790                composition: self.composition.clone(),
791                instance: producer.instance_id.clone(),
792                variant: variant.clone(),
793            })?;
794
795        let mut projected: Vec<(FieldId, OwnedFieldValue)> =
796            Vec::with_capacity(descriptor.bindings.len());
797        for (from_field, to_field) in &descriptor.bindings {
798            let value =
799                body.field(from_field)
800                    .ok_or_else(|| DispatchRefusal::MissingProducerField {
801                        route: descriptor.route_id.clone(),
802                        variant: variant.clone(),
803                        field: from_field.clone(),
804                    })?;
805            projected.push((to_field.clone(), value.to_owned_value()));
806        }
807
808        let consumer = self.consumers.get(&descriptor.instance_id).ok_or_else(|| {
809            DispatchRefusal::UnwiredConsumer {
810                composition: self.composition.clone(),
811                instance: descriptor.instance_id.clone(),
812            }
813        })?;
814
815        consumer
816            .apply_routed_input(descriptor.input_variant.clone(), projected)
817            .await
818            .map_err(|error| DispatchRefusal::ConsumerRefused {
819                instance: descriptor.instance_id.clone(),
820                variant: descriptor.input_variant.clone(),
821                error,
822            })?;
823
824        Ok(DispatchOutcome {
825            route: RouteKey {
826                composition: self.composition.clone(),
827                route_id: descriptor.route_id.clone(),
828            },
829            consumer: descriptor.instance_id.clone(),
830            applied_input: descriptor.input_variant.clone(),
831        })
832    }
833}
834
835#[async_trait]
836impl<S: ProducerSignal> CompositionSignalDispatcher for CatalogCompositionSignalDispatcher<S> {
837    type Signal = S;
838
839    fn composition(&self) -> &CompositionId {
840        &self.composition
841    }
842
843    async fn dispatch_signal(
844        &self,
845        producer: ProducerInstance,
846        signal: SignalPayload<Self::Signal>,
847    ) -> Result<SignalDispatchOutcome, SignalDispatchRefusal> {
848        if producer.composition != self.composition {
849            return Err(SignalDispatchRefusal::CompositionMismatch {
850                expected: self.composition.clone(),
851                actual: producer.composition,
852            });
853        }
854
855        let variant = signal.variant().clone();
856        let body = signal.body();
857
858        let descriptor = self
859            .table
860            .resolve_signal(&producer.instance_id, &variant)
861            .ok_or_else(|| SignalDispatchRefusal::UnresolvedRoute {
862                composition: self.composition.clone(),
863                instance: producer.instance_id.clone(),
864                variant: variant.clone(),
865            })?;
866
867        let mut projected: Vec<(FieldId, OwnedFieldValue)> =
868            Vec::with_capacity(descriptor.bindings.len());
869        for (from_field, to_field) in &descriptor.bindings {
870            let value = body.field(from_field).ok_or_else(|| {
871                SignalDispatchRefusal::MissingProducerField {
872                    route: descriptor.route_id.clone(),
873                    variant: variant.clone(),
874                    field: from_field.clone(),
875                }
876            })?;
877            projected.push((to_field.clone(), value.to_owned_value()));
878        }
879
880        let consumer = self.consumers.get(&descriptor.instance_id).ok_or_else(|| {
881            SignalDispatchRefusal::UnwiredConsumer {
882                composition: self.composition.clone(),
883                instance: descriptor.instance_id.clone(),
884            }
885        })?;
886
887        consumer
888            .receive_signal(descriptor.signal_variant.clone(), projected)
889            .await
890            .map_err(|error| SignalDispatchRefusal::ConsumerRefused {
891                instance: descriptor.instance_id.clone(),
892                variant: descriptor.signal_variant.clone(),
893                error,
894            })?;
895
896        Ok(SignalDispatchOutcome {
897            route: RouteKey {
898                composition: self.composition.clone(),
899                route_id: descriptor.route_id.clone(),
900            },
901            consumer: descriptor.instance_id.clone(),
902            applied_signal: descriptor.signal_variant.clone(),
903        })
904    }
905}
906
907#[cfg(test)]
908mod tests {
909    use super::*;
910    use meerkat_machine_schema::catalog::meerkat_mob_seam_composition;
911
912    /// Hand-written stand-in for the codegen-emitted `MeerkatMobSeamEffect`
913    /// sum. Matches the shape the B-4b tests pin for the live catalog:
914    /// one variant per producer instance, each wrapping a typed effect
915    /// body (we cover the `RequestRuntimeBinding` arm for the dispatcher
916    /// path).
917    #[derive(Debug, Clone, PartialEq, Eq)]
918    enum SeamEffect {
919        Mob(MobEffect),
920    }
921
922    #[derive(Debug, Clone, PartialEq, Eq)]
923    enum MobEffect {
924        RequestRuntimeBinding {
925            agent_runtime_id: String,
926            fence_token: u64,
927            generation: u64,
928            session_id: String,
929        },
930    }
931
932    impl ProducerEffect for SeamEffect {
933        fn variant_id(&self) -> EffectVariantId {
934            match self {
935                Self::Mob(MobEffect::RequestRuntimeBinding { .. }) => {
936                    EffectVariantId::parse("RequestRuntimeBinding").expect("slug")
937                }
938            }
939        }
940
941        fn field(&self, id: &FieldId) -> Option<FieldValue<'_>> {
942            match self {
943                Self::Mob(MobEffect::RequestRuntimeBinding {
944                    agent_runtime_id,
945                    fence_token,
946                    generation,
947                    session_id,
948                }) => match id.as_str() {
949                    "agent_runtime_id" => Some(FieldValue::Str(agent_runtime_id)),
950                    "fence_token" => Some(FieldValue::U64(*fence_token)),
951                    "generation" => Some(FieldValue::U64(*generation)),
952                    "session_id" => Some(FieldValue::Str(session_id)),
953                    _ => None,
954                },
955            }
956        }
957    }
958
959    /// Hand-written stand-in for the codegen-emitted signal source sum.
960    /// These are the MeerkatMachine routed lifecycle effects that the
961    /// `meerkat_mob_seam` schema routes to MobMachine signals.
962    #[allow(clippy::enum_variant_names)]
963    #[derive(Debug, Clone, PartialEq, Eq)]
964    enum SeamSignal {
965        RuntimeBound {
966            agent_runtime_id: String,
967            fence_token: u64,
968        },
969        RuntimeRetired {
970            agent_runtime_id: String,
971            fence_token: u64,
972        },
973        RuntimeDestroyed {
974            agent_runtime_id: String,
975            fence_token: u64,
976        },
977    }
978
979    impl ProducerSignal for SeamSignal {
980        fn variant_id(&self) -> EffectVariantId {
981            let slug = match self {
982                Self::RuntimeBound { .. } => "RuntimeBound",
983                Self::RuntimeRetired { .. } => "RuntimeRetired",
984                Self::RuntimeDestroyed { .. } => "RuntimeDestroyed",
985            };
986            EffectVariantId::parse(slug).expect("signal source slug")
987        }
988
989        fn field(&self, id: &FieldId) -> Option<FieldValue<'_>> {
990            let (agent_runtime_id, fence_token) = match self {
991                Self::RuntimeBound {
992                    agent_runtime_id,
993                    fence_token,
994                }
995                | Self::RuntimeRetired {
996                    agent_runtime_id,
997                    fence_token,
998                }
999                | Self::RuntimeDestroyed {
1000                    agent_runtime_id,
1001                    fence_token,
1002                } => (agent_runtime_id, fence_token),
1003            };
1004            match id.as_str() {
1005                "agent_runtime_id" => Some(FieldValue::Str(agent_runtime_id)),
1006                "fence_token" => Some(FieldValue::U64(*fence_token)),
1007                _ => None,
1008            }
1009        }
1010    }
1011
1012    #[derive(Default)]
1013    struct RecordingMeerkatSurface {
1014        log: tokio::sync::Mutex<Vec<(InputVariantId, Vec<(FieldId, OwnedFieldValue)>)>>,
1015    }
1016
1017    #[async_trait]
1018    impl ConsumerSurface for RecordingMeerkatSurface {
1019        fn instance_id(&self) -> &MachineInstanceId {
1020            // leak is fine in tests: we want a 'static reference; the
1021            // instance_id is stable for the lifetime of the test binary.
1022            static ID: std::sync::OnceLock<MachineInstanceId> = std::sync::OnceLock::new();
1023            ID.get_or_init(|| MachineInstanceId::parse("meerkat").unwrap())
1024        }
1025
1026        async fn apply_routed_input(
1027            &self,
1028            variant: InputVariantId,
1029            projected_fields: Vec<(FieldId, OwnedFieldValue)>,
1030        ) -> Result<(), ConsumerError> {
1031            self.log.lock().await.push((variant, projected_fields));
1032            Ok(())
1033        }
1034    }
1035
1036    #[derive(Default)]
1037    struct RecordingMobSignalSurface {
1038        log: tokio::sync::Mutex<Vec<(SignalVariantId, Vec<(FieldId, OwnedFieldValue)>)>>,
1039    }
1040
1041    #[async_trait]
1042    impl SignalConsumerSurface for RecordingMobSignalSurface {
1043        fn instance_id(&self) -> &MachineInstanceId {
1044            static ID: std::sync::OnceLock<MachineInstanceId> = std::sync::OnceLock::new();
1045            ID.get_or_init(|| MachineInstanceId::parse("mob").unwrap())
1046        }
1047
1048        async fn receive_signal(
1049            &self,
1050            variant: SignalVariantId,
1051            projected_fields: Vec<(FieldId, OwnedFieldValue)>,
1052        ) -> Result<(), ConsumerError> {
1053            self.log.lock().await.push((variant, projected_fields));
1054            Ok(())
1055        }
1056    }
1057
1058    fn mob_producer() -> ProducerInstance {
1059        ProducerInstance {
1060            composition: CompositionId::parse("meerkat_mob_seam").unwrap(),
1061            instance_id: MachineInstanceId::parse("mob").unwrap(),
1062            machine: MachineId::parse("MobMachine").unwrap(),
1063        }
1064    }
1065
1066    fn meerkat_producer() -> ProducerInstance {
1067        ProducerInstance {
1068            composition: CompositionId::parse("meerkat_mob_seam").unwrap(),
1069            instance_id: MachineInstanceId::parse("meerkat").unwrap(),
1070            machine: MachineId::parse("MeerkatMachine").unwrap(),
1071        }
1072    }
1073
1074    fn sample_effect() -> EffectPayload<SeamEffect> {
1075        EffectPayload::Emitted {
1076            variant: EffectVariantId::parse("RequestRuntimeBinding").unwrap(),
1077            body: SeamEffect::Mob(MobEffect::RequestRuntimeBinding {
1078                agent_runtime_id: "rt-1".into(),
1079                fence_token: 7,
1080                generation: 3,
1081                session_id: "019dbd3d-d7ad-75a1-96d0-8013927e78f8".into(),
1082            }),
1083        }
1084    }
1085
1086    fn build_dispatcher(
1087        consumer: Arc<RecordingMeerkatSurface>,
1088    ) -> CatalogCompositionDispatcher<SeamEffect> {
1089        let schema = meerkat_mob_seam_composition();
1090        let table = RouteTable::from_schema(&schema).expect("seam schema routes are well-formed");
1091        CatalogCompositionDispatcher::new(schema.name.clone(), table).with_consumer(consumer)
1092    }
1093
1094    fn sample_signal() -> SignalPayload<SeamSignal> {
1095        let body = SeamSignal::RuntimeBound {
1096            agent_runtime_id: "rt-1".into(),
1097            fence_token: 7,
1098        };
1099        SignalPayload::Emitted {
1100            variant: body.variant_id(),
1101            body,
1102        }
1103    }
1104
1105    fn build_signal_dispatcher(
1106        consumer: Arc<RecordingMobSignalSurface>,
1107    ) -> CatalogCompositionSignalDispatcher<SeamSignal> {
1108        let schema = meerkat_mob_seam_composition();
1109        let table = RouteTable::from_schema(&schema).expect("seam schema routes are well-formed");
1110        CatalogCompositionSignalDispatcher::new(schema.name.clone(), table).with_consumer(consumer)
1111    }
1112
1113    #[tokio::test]
1114    async fn dispatches_mob_routed_effect_to_meerkat_consumer() {
1115        let consumer = Arc::new(RecordingMeerkatSurface::default());
1116        let dispatcher = build_dispatcher(Arc::clone(&consumer));
1117
1118        let outcome = dispatcher
1119            .dispatch(mob_producer(), sample_effect())
1120            .await
1121            .expect("well-formed routed effect");
1122
1123        assert_eq!(outcome.consumer.as_str(), "meerkat");
1124        assert_eq!(outcome.applied_input.as_str(), "PrepareBindings");
1125        assert_eq!(
1126            outcome.route.route_id.as_str(),
1127            "binding_request_reaches_meerkat"
1128        );
1129
1130        let log = consumer.log.lock().await;
1131        assert_eq!(
1132            log.len(),
1133            1,
1134            "dispatcher must call the consumer exactly once"
1135        );
1136        let (variant, fields) = &log[0];
1137        assert_eq!(variant.as_str(), "PrepareBindings");
1138        let field_names: Vec<&str> = fields.iter().map(|(k, _)| k.as_str()).collect();
1139        assert_eq!(
1140            field_names,
1141            vec![
1142                "agent_runtime_id",
1143                "fence_token",
1144                "generation",
1145                "session_id"
1146            ]
1147        );
1148        match &fields[0].1 {
1149            OwnedFieldValue::Str(s) => assert_eq!(s, "rt-1"),
1150            other => panic!("expected Str, got {other:?}"),
1151        }
1152        match &fields[1].1 {
1153            OwnedFieldValue::U64(v) => assert_eq!(*v, 7),
1154            other => panic!("expected U64, got {other:?}"),
1155        }
1156        match &fields[2].1 {
1157            OwnedFieldValue::U64(v) => assert_eq!(*v, 3),
1158            other => panic!("expected U64, got {other:?}"),
1159        }
1160        match &fields[3].1 {
1161            OwnedFieldValue::Str(s) => assert_eq!(s, "019dbd3d-d7ad-75a1-96d0-8013927e78f8"),
1162            other => panic!("expected Str for session_id, got {other:?}"),
1163        }
1164    }
1165
1166    #[tokio::test]
1167    async fn dispatches_meerkat_routed_signal_to_mob_consumer() {
1168        let consumer = Arc::new(RecordingMobSignalSurface::default());
1169        let dispatcher = build_signal_dispatcher(Arc::clone(&consumer));
1170
1171        let outcome = dispatcher
1172            .dispatch_signal(meerkat_producer(), sample_signal())
1173            .await
1174            .expect("well-formed routed signal");
1175
1176        assert_eq!(outcome.consumer.as_str(), "mob");
1177        assert_eq!(outcome.applied_signal.as_str(), "ObserveRuntimeReady");
1178        assert_eq!(outcome.route.route_id.as_str(), "runtime_bound_reaches_mob");
1179
1180        let log = consumer.log.lock().await;
1181        assert_eq!(
1182            log.len(),
1183            1,
1184            "dispatcher must call the signal consumer exactly once"
1185        );
1186        let (variant, fields) = &log[0];
1187        assert_eq!(variant.as_str(), "ObserveRuntimeReady");
1188        let field_names: Vec<&str> = fields.iter().map(|(k, _)| k.as_str()).collect();
1189        assert_eq!(field_names, vec!["agent_runtime_id", "fence_token"]);
1190        match &fields[0].1 {
1191            OwnedFieldValue::Str(s) => assert_eq!(s, "rt-1"),
1192            other => panic!("expected Str, got {other:?}"),
1193        }
1194        match &fields[1].1 {
1195            OwnedFieldValue::U64(v) => assert_eq!(*v, 7),
1196            other => panic!("expected U64, got {other:?}"),
1197        }
1198    }
1199
1200    #[tokio::test]
1201    async fn signal_dispatch_refuses_input_route_typed() {
1202        let consumer = Arc::new(RecordingMobSignalSurface::default());
1203        let dispatcher = build_signal_dispatcher(consumer);
1204
1205        let payload = SignalPayload::Emitted {
1206            variant: EffectVariantId::parse("RequestRuntimeBinding").unwrap(),
1207            body: SeamSignal::RuntimeBound {
1208                agent_runtime_id: "rt-1".into(),
1209                fence_token: 7,
1210            },
1211        };
1212
1213        let err = dispatcher
1214            .dispatch_signal(mob_producer(), payload)
1215            .await
1216            .expect_err("input route is out of the signal surface");
1217
1218        assert!(matches!(err, SignalDispatchRefusal::UnresolvedRoute { .. }));
1219    }
1220
1221    #[tokio::test]
1222    async fn signal_dispatch_refuses_unwired_consumer_typed() {
1223        let schema = meerkat_mob_seam_composition();
1224        let table = RouteTable::from_schema(&schema).unwrap();
1225        let dispatcher: CatalogCompositionSignalDispatcher<SeamSignal> =
1226            CatalogCompositionSignalDispatcher::new(schema.name.clone(), table);
1227
1228        let err = dispatcher
1229            .dispatch_signal(meerkat_producer(), sample_signal())
1230            .await
1231            .expect_err("unwired signal consumer");
1232
1233        assert!(matches!(err, SignalDispatchRefusal::UnwiredConsumer { .. }));
1234    }
1235
1236    #[tokio::test]
1237    async fn signal_dispatch_refuses_missing_field_typed() {
1238        #[derive(Debug)]
1239        struct BrokenSignal;
1240
1241        impl ProducerSignal for BrokenSignal {
1242            fn variant_id(&self) -> EffectVariantId {
1243                EffectVariantId::parse("RuntimeBound").unwrap()
1244            }
1245
1246            fn field(&self, _id: &FieldId) -> Option<FieldValue<'_>> {
1247                None
1248            }
1249        }
1250
1251        let schema = meerkat_mob_seam_composition();
1252        let table = RouteTable::from_schema(&schema).unwrap();
1253        let consumer = Arc::new(RecordingMobSignalSurface::default());
1254        let dispatcher: CatalogCompositionSignalDispatcher<BrokenSignal> =
1255            CatalogCompositionSignalDispatcher::new(schema.name.clone(), table)
1256                .with_consumer(consumer);
1257
1258        let err = dispatcher
1259            .dispatch_signal(
1260                meerkat_producer(),
1261                SignalPayload::Emitted {
1262                    variant: EffectVariantId::parse("RuntimeBound").unwrap(),
1263                    body: BrokenSignal,
1264                },
1265            )
1266            .await
1267            .expect_err("missing producer field");
1268
1269        assert!(matches!(
1270            err,
1271            SignalDispatchRefusal::MissingProducerField { .. }
1272        ));
1273    }
1274
1275    #[tokio::test]
1276    async fn refuses_mismatched_composition() {
1277        let consumer = Arc::new(RecordingMeerkatSurface::default());
1278        let dispatcher = build_dispatcher(consumer);
1279
1280        let mut wrong = mob_producer();
1281        wrong.composition = CompositionId::parse("some_other_composition").unwrap();
1282
1283        let err = dispatcher
1284            .dispatch(wrong, sample_effect())
1285            .await
1286            .expect_err("composition mismatch");
1287
1288        assert!(matches!(err, DispatchRefusal::CompositionMismatch { .. }));
1289    }
1290
1291    #[tokio::test]
1292    async fn refuses_unrouted_effect_typed() {
1293        let consumer = Arc::new(RecordingMeerkatSurface::default());
1294        let dispatcher = build_dispatcher(consumer);
1295
1296        // The schema has no route for `Mob::UnknownEffect`; use the well-
1297        // formed producer but label the variant with an id that has no
1298        // declared route.
1299        let payload = EffectPayload::Emitted {
1300            variant: EffectVariantId::parse("UnknownEffect").unwrap(),
1301            body: SeamEffect::Mob(MobEffect::RequestRuntimeBinding {
1302                agent_runtime_id: "rt".into(),
1303                fence_token: 0,
1304                generation: 0,
1305                session_id: "019dbd3d-d7ad-75a1-96d0-8013927e78f8".into(),
1306            }),
1307        };
1308
1309        let err = dispatcher
1310            .dispatch(mob_producer(), payload)
1311            .await
1312            .expect_err("unresolved route");
1313
1314        assert!(matches!(err, DispatchRefusal::UnresolvedRoute { .. }));
1315    }
1316
1317    #[tokio::test]
1318    async fn refuses_unwired_consumer_typed() {
1319        // Build a dispatcher with NO consumer surface registered. The route
1320        // resolves but the delivery step must return UnwiredConsumer, not
1321        // silently succeed.
1322        let schema = meerkat_mob_seam_composition();
1323        let table = RouteTable::from_schema(&schema).unwrap();
1324        let dispatcher: CatalogCompositionDispatcher<SeamEffect> =
1325            CatalogCompositionDispatcher::new(schema.name.clone(), table);
1326
1327        let err = dispatcher
1328            .dispatch(mob_producer(), sample_effect())
1329            .await
1330            .expect_err("unwired consumer");
1331
1332        assert!(matches!(err, DispatchRefusal::UnwiredConsumer { .. }));
1333    }
1334
1335    #[tokio::test]
1336    async fn standalone_binding_has_no_dispatcher() {
1337        let binding: CompositionBinding<SeamEffect> = CompositionBinding::Standalone;
1338        assert!(binding.is_standalone());
1339        assert!(binding.wired().is_none());
1340    }
1341
1342    #[tokio::test]
1343    async fn wired_binding_exposes_dispatcher() {
1344        let consumer = Arc::new(RecordingMeerkatSurface::default());
1345        let dispatcher = Arc::new(build_dispatcher(consumer));
1346        let binding: CompositionBinding<SeamEffect> = CompositionBinding::Wired(dispatcher);
1347        assert!(!binding.is_standalone());
1348        assert!(binding.wired().is_some());
1349        assert!(
1350            binding.context_provider().is_none(),
1351            "plain Wired binding has no owner-supplied context"
1352        );
1353    }
1354
1355    /// Owner-supplied context provider for routes that need typed
1356    /// fields not in the producer effect body. In production this would
1357    /// be a runtime-owned struct (e.g. one carrying a pinned
1358    /// `SessionId`); the test just returns a canned pair to exercise
1359    /// the trait's single-method signature.
1360    struct PinnedSessionContext {
1361        session_id: String,
1362    }
1363
1364    impl ContextProvider<SeamEffect> for PinnedSessionContext {
1365        fn provide_context(
1366            &self,
1367            _producer: &ProducerInstance,
1368            _effect: &EffectPayload<SeamEffect>,
1369        ) -> Vec<(FieldId, OwnedFieldValue)> {
1370            vec![(
1371                FieldId::parse("session_id").expect("field id"),
1372                OwnedFieldValue::Str(self.session_id.clone()),
1373            )]
1374        }
1375    }
1376
1377    #[tokio::test]
1378    async fn owner_provided_binding_exposes_both_dispatcher_and_context() {
1379        let consumer = Arc::new(RecordingMeerkatSurface::default());
1380        let dispatcher = Arc::new(build_dispatcher(consumer));
1381        let context = Arc::new(PinnedSessionContext {
1382            session_id: "session-abc".into(),
1383        });
1384        let binding: CompositionBinding<SeamEffect> =
1385            CompositionBinding::owner_provided(dispatcher, context);
1386
1387        assert!(!binding.is_standalone());
1388        assert!(
1389            binding.wired().is_some(),
1390            "OwnerProvided is a superset of Wired for dispatcher access"
1391        );
1392        assert!(
1393            binding.context_provider().is_some(),
1394            "OwnerProvided must expose the owner-supplied context"
1395        );
1396
1397        // The typed provider returns the expected single owner-supplied
1398        // field. Matches the #342 use case: `session_id` is absent from
1399        // the producer effect body but present in the projected fields
1400        // the consumer needs.
1401        let provider = binding.context_provider().expect("context provider");
1402        let producer = mob_producer();
1403        let effect = sample_effect();
1404        let fields = provider.provide_context(&producer, &effect);
1405        assert_eq!(fields.len(), 1);
1406        assert_eq!(fields[0].0.as_str(), "session_id");
1407        match &fields[0].1 {
1408            OwnedFieldValue::Str(s) => assert_eq!(s, "session-abc"),
1409            other => panic!("expected Str context field, got {other:?}"),
1410        }
1411    }
1412
1413    #[tokio::test]
1414    async fn composition_binding_constructors_parallel_machine_halves() {
1415        // `CompositionBinding::standalone()` is the binding-level mirror
1416        // of `MeerkatMachine::standalone(...)`; `wired_with` and
1417        // `owner_provided` mirror `MeerkatMachine::with_composition(...)`.
1418        // The constructor split exists so call sites say positively which
1419        // half they are wiring, rather than spelling the enum variant.
1420        let standalone: CompositionBinding<SeamEffect> = CompositionBinding::standalone();
1421        assert!(standalone.is_standalone());
1422        assert!(standalone.wired().is_none());
1423        assert!(standalone.context_provider().is_none());
1424
1425        let consumer = Arc::new(RecordingMeerkatSurface::default());
1426        let dispatcher: Arc<dyn CompositionDispatcher<Effect = SeamEffect>> =
1427            Arc::new(build_dispatcher(consumer));
1428        let wired: CompositionBinding<SeamEffect> =
1429            CompositionBinding::wired_with(Arc::clone(&dispatcher));
1430        assert!(!wired.is_standalone());
1431        assert!(wired.wired().is_some());
1432        assert!(wired.context_provider().is_none());
1433
1434        let context = Arc::new(PinnedSessionContext {
1435            session_id: "session-xyz".into(),
1436        });
1437        let owner_provided: CompositionBinding<SeamEffect> =
1438            CompositionBinding::owner_provided(dispatcher, context);
1439        assert!(!owner_provided.is_standalone());
1440        assert!(owner_provided.wired().is_some());
1441        assert!(owner_provided.context_provider().is_some());
1442    }
1443
1444    /// Consumer surface that always refuses with a typed [`ConsumerError`]
1445    /// carrying a known stable `error_code`. Row #33 gate fixture.
1446    struct RefusingMeerkatSurface;
1447
1448    #[async_trait]
1449    impl ConsumerSurface for RefusingMeerkatSurface {
1450        fn instance_id(&self) -> &MachineInstanceId {
1451            static ID: std::sync::OnceLock<MachineInstanceId> = std::sync::OnceLock::new();
1452            ID.get_or_init(|| MachineInstanceId::parse("meerkat").unwrap())
1453        }
1454
1455        async fn apply_routed_input(
1456            &self,
1457            _variant: InputVariantId,
1458            _projected_fields: Vec<(FieldId, OwnedFieldValue)>,
1459        ) -> Result<(), ConsumerError> {
1460            Err(ConsumerError::new(
1461                "runtime_destroyed",
1462                "consumer machine no longer accepts inputs",
1463            ))
1464        }
1465    }
1466
1467    /// Row #33 gate: a consumer refusal must preserve the consumer's typed
1468    /// `error_code` through the dispatcher. Under the OLD `Result<(), String>`
1469    /// contract the discriminant was flattened into an opaque message; the
1470    /// dispatcher could only re-parse a string. This asserts the typed code
1471    /// survives on `DispatchRefusal::ConsumerRefused`.
1472    #[tokio::test]
1473    async fn consumer_refusal_preserves_typed_error_code_through_dispatcher() {
1474        let schema = meerkat_mob_seam_composition();
1475        let table = RouteTable::from_schema(&schema).expect("seam schema routes are well-formed");
1476        let dispatcher = CatalogCompositionDispatcher::new(schema.name.clone(), table)
1477            .with_consumer(Arc::new(RefusingMeerkatSurface));
1478
1479        let err = dispatcher
1480            .dispatch(mob_producer(), sample_effect())
1481            .await
1482            .expect_err("refusing consumer surface");
1483
1484        match err {
1485            DispatchRefusal::ConsumerRefused { error, .. } => {
1486                assert_eq!(
1487                    error.error_code(),
1488                    "runtime_destroyed",
1489                    "typed consumer error_code must survive the dispatch seam, not be flattened to a string"
1490                );
1491            }
1492            other => {
1493                panic!("expected ConsumerRefused carrying a typed ConsumerError, got {other:?}")
1494            }
1495        }
1496    }
1497}