// meerkat_runtime/composition/mod.rs

//! Composition dispatcher — THE typed execution path for routed effects.
//!
//! Wave-b V2 rebuilds composition dispatch as a typed, *mandatory* runtime seam.
//! The deleted `composition_dispatch.rs` and `recompute_mob_peer_overlay*.rs`
//! (wave-a tombstones `ce2dbe35e` / `f5e366f38`) were stringly-typed helpers
//! that callers opted into. This module is their structural opposite:
//!
//! * **Typed end-to-end.** Producer identity is [`ProducerInstance`] carrying
//!   typed [`CompositionId`], [`MachineInstanceId`], [`MachineId`]. Effects
//!   travel as [`EffectPayload<E>`] where `E` is the producer composition's
//!   typed seam-effect sum (the [`ProducerEffect`] trait bound). Route
//!   resolution returns a typed [`RoutedInputDescriptor`] carrying
//!   [`MachineInstanceId`] / [`InputVariantId`] / `Vec<(FieldId, FieldId)>`.
//!   Signal-kind routes travel through the parallel [`SignalPayload<S>`] /
//!   [`CompositionSignalDispatcher`] surface and resolve to typed
//!   [`RoutedSignalDescriptor`] values.
//! * **Mandatory, not optional.** The trait has no fallback surface. Routed
//!   effects whose route is declared in the composition schema MUST resolve
//!   through the dispatcher; unresolved routes are a typed
//!   [`DispatchRefusal::UnresolvedRoute`] error, not a silent drop.
//!   Signal-kind routes live on a separate index inside [`RouteTable`] and
//!   MUST resolve through [`CompositionSignalDispatcher`].
//! * **Compile-time presence/absence.** A `MeerkatMachine` either has a
//!   composition dispatcher attached (via the `with_composition` constructor)
//!   or it does not (the standalone / single-machine test path). The two
//!   cases are distinguished by a typed [`CompositionBinding`] discriminant,
//!   never by `Option<Arc<dyn CompositionDispatcher>>`.
//!
//! The default catalog-backed dispatcher ([`CatalogCompositionDispatcher`])
//! consumes a [`RouteTable`] built from any
//! [`meerkat_machine_schema::CompositionSchema`] and delivers each resolved
//! [`RoutedInputDescriptor`] to a per-consumer-instance [`ConsumerSurface`]
//! supplied at wire-up. The per-composition codegen module emitted by
//! `meerkat-machine-codegen` (B-4 + B-4b) plugs in as the
//! [`ProducerEffect`] implementation — `route_to_input` is equivalent to
//! consulting the [`RouteTable`] built from the same schema.

pub mod route_table;

use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::marker::PhantomData;
use std::sync::Arc;

use async_trait::async_trait;
use meerkat_machine_schema::identity::{
    CompositionId, EffectVariantId, FieldId, InputVariantId, MachineId, MachineInstanceId, RouteId,
    SignalVariantId,
};
use thiserror::Error;

pub use route_table::{RouteTable, RouteTableError, RoutedInputDescriptor, RoutedSignalDescriptor};

53/// Typed identity of the producing machine instance inside a composition.
54///
55/// Unlike the deleted string-keyed helpers, every field is a typed newtype
56/// so cross-instance mixups are rejected at compile time.
57#[derive(Debug, Clone, PartialEq, Eq, Hash)]
58pub struct ProducerInstance {
59    /// Composition that contains the producer.
60    pub composition: CompositionId,
61    /// Instance id of the producer *within* `composition`.
62    pub instance_id: MachineInstanceId,
63    /// Underlying machine name (the schema this instance is an instance of).
64    pub machine: MachineId,
65}
66
67/// Typed effect payload. Generic over the producer composition's seam-effect
68/// sum (see the codegen-emitted `{Composition}Effect` enum).
69///
70/// The variant carries the typed [`EffectVariantId`] alongside the body so
71/// the dispatcher can look up a route without pattern-matching on the
72/// producer's effect enum (the [`ProducerEffect`] trait hides that under
73/// [`ProducerEffect::variant_id`]).
74#[derive(Debug, Clone, PartialEq, Eq)]
75pub enum EffectPayload<E> {
76    /// Producer emitted a typed effect variant.
77    Emitted {
78        /// Typed variant id (matches the producer's effect enum tag).
79        variant: EffectVariantId,
80        /// The typed effect body.
81        body: E,
82    },
83}
84
85impl<E: ProducerEffect> EffectPayload<E> {
86    /// Borrow the typed variant id.
87    pub fn variant(&self) -> &EffectVariantId {
88        match self {
89            Self::Emitted { variant, .. } => variant,
90        }
91    }
92
93    /// Borrow the typed body.
94    pub fn body(&self) -> &E {
95        match self {
96            Self::Emitted { body, .. } => body,
97        }
98    }
99}
100
101/// Typed signal-route payload. Generic over the producer composition's
102/// seam-signal sum.
103#[derive(Debug, Clone, PartialEq, Eq)]
104pub enum SignalPayload<S> {
105    /// Producer emitted a typed signal-route source variant.
106    Emitted {
107        /// Typed source variant id. In the composition schema this is the
108        /// route's producer-side `effect_variant`; signal-kind routes still
109        /// originate from a producer effect and target a consumer signal.
110        variant: EffectVariantId,
111        /// The typed signal source body.
112        body: S,
113    },
114}
115
116impl<S: ProducerSignal> SignalPayload<S> {
117    /// Borrow the typed source variant id.
118    pub fn variant(&self) -> &EffectVariantId {
119        match self {
120            Self::Emitted { variant, .. } => variant,
121        }
122    }
123
124    /// Borrow the typed body.
125    pub fn body(&self) -> &S {
126        match self {
127            Self::Emitted { body, .. } => body,
128        }
129    }
130}
131
132/// Typed route key: `(composition, route)`.
133#[derive(Debug, Clone, PartialEq, Eq, Hash)]
134pub struct RouteKey {
135    pub composition: CompositionId,
136    pub route_id: RouteId,
137}
138
139/// Marker trait for the seam-effect sum emitted by
140/// `meerkat-machine-codegen::render_composition_driver`. Producer effect
141/// enums implement this to expose the typed variant id alongside their
142/// domain body — the dispatcher consults it without inspecting the enum.
143pub trait ProducerEffect: fmt::Debug + Send + Sync + 'static {
144    /// Typed variant id for this effect value.
145    ///
146    /// The codegen emits one arm per distinct `{producer_instance}::{variant}`
147    /// pair; implementers return the matching [`EffectVariantId`]. This is
148    /// the single handle the dispatcher needs to resolve the route without
149    /// case-matching on the producer's concrete enum.
150    fn variant_id(&self) -> EffectVariantId;
151
152    /// Borrow a field value by [`FieldId`].
153    ///
154    /// Used by the dispatcher to project producer fields into the typed
155    /// consumer input as declared by the composition's route bindings.
156    /// Returns `None` if the requested field is not present on this
157    /// variant. The dispatcher treats that as
158    /// [`DispatchRefusal::MissingProducerField`].
159    fn field(&self, id: &FieldId) -> Option<FieldValue<'_>>;
160}
161
162/// Marker trait for the seam-signal source sum consumed by
163/// [`CompositionSignalDispatcher`].
164///
165/// Signal-kind composition routes use the same producer-side
166/// `EffectVariantId` namespace as input routes, but their target is a
167/// consumer [`SignalVariantId`]. This trait mirrors [`ProducerEffect`] so
168/// signal dispatch has the same typed projection discipline without
169/// requiring callers to smuggle signal payloads through the input
170/// dispatcher.
171pub trait ProducerSignal: fmt::Debug + Send + Sync + 'static {
172    /// Typed producer-side variant id for this signal source value.
173    fn variant_id(&self) -> EffectVariantId;
174
175    /// Borrow a producer field by [`FieldId`].
176    fn field(&self, id: &FieldId) -> Option<FieldValue<'_>>;
177}
178
/// Typed view over a producer-field value projected through a route binding.
///
/// The `ProducerEffect::field` implementation returns one of these so the
/// dispatcher can move the value into the consumer input without a
/// `serde_json::Value` round-trip. The variant set is intentionally small;
/// richer shapes are expressed by the producer keeping the typed value
/// inside its effect body and the consumer accepting it via the same
/// typed enum (the codegen emits the matching types on both sides).
#[derive(Debug, Clone)]
pub enum FieldValue<'a> {
    /// Borrowed string slice (owning producer retains the backing `String`).
    Str(&'a str),
    /// Unsigned 64-bit integer.
    U64(u64),
    /// Signed 64-bit integer.
    I64(i64),
    /// Boolean flag.
    Bool(bool),
    /// Opaque typed handle — producer and consumer agree on the Rust type.
    /// The dispatcher moves the `Arc<dyn Any>` across without inspecting it.
    /// This is *not* a `serde_json::Value` escape hatch: the contained Rust
    /// type is determined by the typed route binding, not by ad-hoc JSON.
    Opaque(Arc<dyn Any + Send + Sync>),
}

204/// Outcome when a routed effect is successfully dispatched.
205#[derive(Debug, Clone, PartialEq, Eq)]
206pub struct DispatchOutcome {
207    /// Route that was resolved for this effect.
208    pub route: RouteKey,
209    /// Target consumer instance the typed input was delivered to.
210    pub consumer: MachineInstanceId,
211    /// Typed input variant applied on the consumer.
212    pub applied_input: InputVariantId,
213}
214
215/// Outcome when a routed signal is successfully dispatched.
216#[derive(Debug, Clone, PartialEq, Eq)]
217pub struct SignalDispatchOutcome {
218    /// Route that was resolved for this signal source.
219    pub route: RouteKey,
220    /// Target consumer instance the typed signal was delivered to.
221    pub consumer: MachineInstanceId,
222    /// Typed signal variant applied on the consumer.
223    pub applied_signal: SignalVariantId,
224}
225
226/// Reasons the dispatcher refuses a routed effect.
227///
228/// Unlike the deleted helper path, there is no "silently drop unknown
229/// effects" arm. Every failure is a typed variant so callers and RMAT
230/// audits can enumerate them without parsing error strings.
231#[derive(Debug, Clone, PartialEq, Eq, Error)]
232pub enum DispatchRefusal {
233    /// The producer is not registered for this dispatcher's composition.
234    #[error("dispatcher composition {expected} does not match producer composition {actual}")]
235    CompositionMismatch {
236        expected: CompositionId,
237        actual: CompositionId,
238    },
239    /// No input-kind route is declared for `(producer.instance_id, variant)`.
240    #[error(
241        "no input route declared for producer {instance} effect variant {variant} in composition {composition}"
242    )]
243    UnresolvedRoute {
244        composition: CompositionId,
245        instance: MachineInstanceId,
246        variant: EffectVariantId,
247    },
248    /// A route-binding references a producer field that the effect body did
249    /// not supply (via [`ProducerEffect::field`]).
250    #[error("route {route} requires producer field {field} on variant {variant}, not provided")]
251    MissingProducerField {
252        route: RouteId,
253        variant: EffectVariantId,
254        field: FieldId,
255    },
256    /// No [`ConsumerSurface`] is registered for the resolved target
257    /// instance. This is a wiring bug at construction time, not a runtime
258    /// signal — the dispatcher refuses rather than queueing forever.
259    #[error(
260        "no consumer surface registered for target instance {instance} in composition {composition}"
261    )]
262    UnwiredConsumer {
263        composition: CompositionId,
264        instance: MachineInstanceId,
265    },
266    /// The consumer surface rejected the typed input (e.g. because the
267    /// consumer machine is no longer accepting inputs). The inner message
268    /// is the consumer-side rejection reason and is opaque to the
269    /// dispatcher — typed by the consumer's own error.
270    #[error("consumer {instance} refused input {variant}: {reason}")]
271    ConsumerRefused {
272        instance: MachineInstanceId,
273        variant: InputVariantId,
274        reason: String,
275    },
276}
277
278/// Reasons the signal dispatcher refuses a routed signal.
279#[derive(Debug, Clone, PartialEq, Eq, Error)]
280pub enum SignalDispatchRefusal {
281    /// The producer is not registered for this dispatcher's composition.
282    #[error("dispatcher composition {expected} does not match producer composition {actual}")]
283    CompositionMismatch {
284        expected: CompositionId,
285        actual: CompositionId,
286    },
287    /// No signal-kind route is declared for `(producer.instance_id, variant)`.
288    #[error(
289        "no signal route declared for producer {instance} variant {variant} in composition {composition}"
290    )]
291    UnresolvedRoute {
292        composition: CompositionId,
293        instance: MachineInstanceId,
294        variant: EffectVariantId,
295    },
296    /// A route-binding references a producer field that the signal body
297    /// did not supply.
298    #[error("route {route} requires producer field {field} on variant {variant}, not provided")]
299    MissingProducerField {
300        route: RouteId,
301        variant: EffectVariantId,
302        field: FieldId,
303    },
304    /// No [`SignalConsumerSurface`] is registered for the resolved target
305    /// instance.
306    #[error(
307        "no signal consumer surface registered for target instance {instance} in composition {composition}"
308    )]
309    UnwiredConsumer {
310        composition: CompositionId,
311        instance: MachineInstanceId,
312    },
313    /// The consumer surface rejected the typed signal.
314    #[error("consumer {instance} refused signal {variant}: {reason}")]
315    ConsumerRefused {
316        instance: MachineInstanceId,
317        variant: SignalVariantId,
318        reason: String,
319    },
320}
321
322/// Delivery surface for one consumer instance inside a composition.
323///
324/// A consumer (e.g. the `meerkat` machine instance when mob routes
325/// `RequestRuntimeBinding` at it) implements this trait and registers an
326/// instance at composition wire-up. The dispatcher invokes it exactly once
327/// per resolved [`RoutedInput`]. The implementation is responsible for
328/// materializing the consumer-side typed input — the dispatcher only moves
329/// typed data across the seam.
330#[async_trait]
331pub trait ConsumerSurface: Send + Sync {
332    /// Instance id this surface serves. The dispatcher matches against
333    /// [`RoutedInput::instance_id`] to pick the right surface.
334    fn instance_id(&self) -> &MachineInstanceId;
335
336    /// Apply a typed routed input. `projected_fields` carries the per-
337    /// consumer-field values resolved from the producer via the route's
338    /// field-bindings, owned so the surface can move them into its typed
339    /// input constructor.
340    async fn apply_routed_input(
341        &self,
342        variant: InputVariantId,
343        projected_fields: Vec<(FieldId, OwnedFieldValue)>,
344    ) -> Result<(), String>;
345}
346
347/// Delivery surface for one signal-consuming instance inside a composition.
348#[async_trait]
349pub trait SignalConsumerSurface: Send + Sync {
350    /// Instance id this surface serves.
351    fn instance_id(&self) -> &MachineInstanceId;
352
353    /// Receive a typed routed signal.
354    async fn receive_signal(
355        &self,
356        variant: SignalVariantId,
357        projected_fields: Vec<(FieldId, OwnedFieldValue)>,
358    ) -> Result<(), String>;
359}
360
/// Owned counterpart of [`FieldValue`] used when delivering a routed input
/// across the consumer-surface boundary. Moving owned values means the
/// consumer can construct its typed input without re-borrowing the
/// producer.
#[derive(Debug, Clone)]
pub enum OwnedFieldValue {
    /// Owned string.
    Str(String),
    /// Unsigned 64-bit integer.
    U64(u64),
    /// Signed 64-bit integer.
    I64(i64),
    /// Boolean flag.
    Bool(bool),
    /// Shared opaque typed handle.
    Opaque(Arc<dyn Any + Send + Sync>),
}

374impl FieldValue<'_> {
375    /// Lift a borrowed field value into its owned counterpart, cloning the
376    /// backing `&str` when required. The [`Arc<dyn Any>`] path is shared,
377    /// not cloned.
378    pub fn to_owned_value(&self) -> OwnedFieldValue {
379        match self {
380            FieldValue::Str(s) => OwnedFieldValue::Str((*s).to_owned()),
381            FieldValue::U64(v) => OwnedFieldValue::U64(*v),
382            FieldValue::I64(v) => OwnedFieldValue::I64(*v),
383            FieldValue::Bool(v) => OwnedFieldValue::Bool(*v),
384            FieldValue::Opaque(handle) => OwnedFieldValue::Opaque(Arc::clone(handle)),
385        }
386    }
387}
388
389/// Composition dispatcher trait.
390///
391/// Monomorphized over the producer composition's seam-effect sum
392/// ([`CompositionDispatcher::Effect`]). Making the effect an associated type
393/// (rather than a generic on the method) keeps the trait dyn-safe — a
394/// `MeerkatMachine` can hold `Arc<dyn CompositionDispatcher<Effect = ...>>`
395/// without leaking the machine kernel's monomorphization concerns.
396#[async_trait]
397pub trait CompositionDispatcher: Send + Sync {
398    /// Seam-effect sum this dispatcher handles. Matches the codegen-emitted
399    /// `{Composition}Effect` enum.
400    type Effect: ProducerEffect;
401
402    /// Composition id this dispatcher owns. Every [`ProducerInstance`]
403    /// passed to [`CompositionDispatcher::dispatch`] must match.
404    fn composition(&self) -> &CompositionId;
405
406    /// Dispatch a routed effect. Returns [`DispatchOutcome`] on success or
407    /// a typed [`DispatchRefusal`]. There is no silent-drop arm.
408    async fn dispatch(
409        &self,
410        producer: ProducerInstance,
411        effect: EffectPayload<Self::Effect>,
412    ) -> Result<DispatchOutcome, DispatchRefusal>;
413}
414
415/// Composition signal dispatcher trait.
416#[async_trait]
417pub trait CompositionSignalDispatcher: Send + Sync {
418    /// Seam-signal source sum this dispatcher handles.
419    type Signal: ProducerSignal;
420
421    /// Composition id this dispatcher owns.
422    fn composition(&self) -> &CompositionId;
423
424    /// Dispatch a routed signal. Returns [`SignalDispatchOutcome`] on
425    /// success or a typed [`SignalDispatchRefusal`].
426    async fn dispatch_signal(
427        &self,
428        producer: ProducerInstance,
429        signal: SignalPayload<Self::Signal>,
430    ) -> Result<SignalDispatchOutcome, SignalDispatchRefusal>;
431}
432
433/// Typed, owner-supplied context provider for an [`OwnerProvided`][op] binding.
434///
435/// Issue #342 — some routes need consumer-side fields that aren't in the
436/// producer's effect body (the canonical case is `session_id` on the
437/// `meerkat_mob_seam` composition: the mob effect doesn't carry it, but
438/// the consumer's applied input requires it). Rather than smuggle that
439/// state through a `serde_json::Value` side channel, the runtime that
440/// owns the dispatcher supplies it through a typed context provider.
441///
442/// **Exactly one method, no `serde_json::Value` in the signature.** The
443/// returned fields are typed [`OwnedFieldValue`]s keyed by
444/// [`FieldId`] — the same representation the route-binding table already
445/// uses for producer-field projections. The dispatcher can merge the
446/// provider's fields with producer-projected fields when constructing
447/// the typed input for a `ConsumerSurface`.
448///
449/// Implementations are synchronous and infallible: context retrieval
450/// should be an in-process lookup against state the runtime already
451/// owns (pinned session id, realm id, bind-epoch, …). Anything that
452/// could fail belongs on the producer effect body or on the consumer
453/// surface.
454///
455/// [op]: CompositionBinding::OwnerProvided
456pub trait ContextProvider<E: ProducerEffect>: Send + Sync {
457    /// Produce the owner-supplied typed context fields for a routed
458    /// `effect` emitted by `producer`.
459    ///
460    /// The returned vector's `FieldId`s must match the route's
461    /// [`BindingSource::ContextField`][bs] references declared in the
462    /// composition schema (#342). Missing ids surface as
463    /// [`DispatchRefusal::MissingProducerField`] at the dispatcher in
464    /// the same way unfulfilled producer fields do — the dispatcher
465    /// treats producer and owner-provided fields uniformly once
466    /// projection starts.
467    ///
468    /// [bs]: # "See issue #342: BindingSource gains ContextField(FieldId)"
469    fn provide_context(
470        &self,
471        producer: &ProducerInstance,
472        effect: &EffectPayload<E>,
473    ) -> Vec<(FieldId, OwnedFieldValue)>;
474}
475
476/// Typed binding attached to a runtime that holds a dispatcher.
477///
478/// Discriminates the "machine participates in a composition" case from the
479/// "machine is standalone" case *at the type level*: no
480/// `Option<Arc<dyn CompositionDispatcher>>`. Callers obtain the concrete
481/// dispatcher via [`CompositionBinding::wired`] and honor
482/// [`CompositionBinding::is_standalone`] to tell the two apart. The two
483/// constructor halves on `MeerkatMachine` (`with_composition(...)` vs
484/// `standalone(...)` / `ephemeral()` / `persistent()`) are the public
485/// face of this distinction.
486///
487/// **OwnerProvided (#342)**: some routes need consumer-side fields that
488/// aren't in the producer effect body — the canonical case is
489/// `session_id` on the `meerkat_mob_seam` composition. The
490/// `OwnerProvided` variant pairs a dispatcher with a typed
491/// [`ContextProvider`] so the runtime that owns the dispatcher supplies
492/// the missing fields from its own typed state at dispatch time.
493/// `OwnerProvided` is semantically a superset of `Wired`: callers that
494/// only need the dispatcher reach it through the same
495/// [`wired`](Self::wired) accessor; callers that need the context
496/// provider reach it through
497/// [`context_provider`](Self::context_provider), which returns `Some`
498/// only for `OwnerProvided`.
499pub enum CompositionBinding<E: ProducerEffect> {
500    /// Machine is not part of a composition. Routed-effect dispatch is not
501    /// available.
502    Standalone,
503    /// Machine participates in a composition and owns a typed dispatcher.
504    /// No owner-supplied context: all route bindings project from the
505    /// producer's effect body.
506    Wired(Arc<dyn CompositionDispatcher<Effect = E>>),
507    /// Machine participates in a composition that declares routes with
508    /// owner-supplied context (issue #342). The `context` is consulted
509    /// alongside the producer effect at dispatch time to fulfil route
510    /// bindings whose source is `ContextField` rather than
511    /// `ProducerField`.
512    OwnerProvided {
513        dispatcher: Arc<dyn CompositionDispatcher<Effect = E>>,
514        context: Arc<dyn ContextProvider<E>>,
515    },
516}
517
518impl<E: ProducerEffect> fmt::Debug for CompositionBinding<E> {
519    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
520        match self {
521            Self::Standalone => f.debug_struct("CompositionBinding::Standalone").finish(),
522            Self::Wired(_) => f
523                .debug_struct("CompositionBinding::Wired")
524                .field("dispatcher", &"<dyn CompositionDispatcher>")
525                .finish(),
526            Self::OwnerProvided { .. } => f
527                .debug_struct("CompositionBinding::OwnerProvided")
528                .field("dispatcher", &"<dyn CompositionDispatcher>")
529                .field("context", &"<dyn ContextProvider>")
530                .finish(),
531        }
532    }
533}
534
535impl<E: ProducerEffect> CompositionBinding<E> {
536    /// Construct a `Standalone` binding.
537    ///
538    /// Mirrors `MeerkatMachine::standalone(...)` at the binding level so
539    /// call sites that wire a runtime without composition can say so
540    /// positively instead of spelling the enum variant. Equivalent to
541    /// `CompositionBinding::Standalone`.
542    pub fn standalone() -> Self {
543        Self::Standalone
544    }
545
546    /// Construct a `Wired` binding from a composition dispatcher.
547    ///
548    /// Use this when every route binding projects from the producer
549    /// effect body alone. If any route declares an owner-supplied
550    /// context field, use [`Self::owner_provided`] instead.
551    pub fn wired_with(dispatcher: Arc<dyn CompositionDispatcher<Effect = E>>) -> Self {
552        Self::Wired(dispatcher)
553    }
554
555    /// Construct an `OwnerProvided` binding from a composition
556    /// dispatcher and a typed context provider.
557    ///
558    /// Use this for compositions whose route bindings reference owner-
559    /// supplied context fields (per issue #342) — the provider is
560    /// consulted at dispatch time for each routed effect so the
561    /// missing fields can be fulfilled from the runtime's own state.
562    pub fn owner_provided(
563        dispatcher: Arc<dyn CompositionDispatcher<Effect = E>>,
564        context: Arc<dyn ContextProvider<E>>,
565    ) -> Self {
566        Self::OwnerProvided {
567            dispatcher,
568            context,
569        }
570    }
571
572    /// Report whether this machine is standalone (no composition attached).
573    pub fn is_standalone(&self) -> bool {
574        matches!(self, Self::Standalone)
575    }
576
577    /// Borrow the wired dispatcher, if any.
578    ///
579    /// Returns `None` only for [`CompositionBinding::Standalone`].
580    /// Both `Wired` and `OwnerProvided` expose their dispatcher through
581    /// this accessor so call sites that only need to dispatch a typed
582    /// effect don't have to branch on context-provider presence — the
583    /// type split exists so this is enforced at the construction
584    /// boundary, not re-checked at every call site.
585    pub fn wired(&self) -> Option<&Arc<dyn CompositionDispatcher<Effect = E>>> {
586        match self {
587            Self::Standalone => None,
588            Self::Wired(d) => Some(d),
589            Self::OwnerProvided { dispatcher, .. } => Some(dispatcher),
590        }
591    }
592
593    /// Borrow the owner-supplied [`ContextProvider`], if any.
594    ///
595    /// Returns `Some` only for [`CompositionBinding::OwnerProvided`].
596    /// `Standalone` has no dispatcher; `Wired` has a dispatcher but no
597    /// owner-supplied context, so callers that walk route bindings and
598    /// encounter a `ContextField` source on a `Wired` binding should
599    /// surface a typed refusal rather than silently treat it as an
600    /// empty context.
601    pub fn context_provider(&self) -> Option<&Arc<dyn ContextProvider<E>>> {
602        match self {
603            Self::Standalone | Self::Wired(_) => None,
604            Self::OwnerProvided { context, .. } => Some(context),
605        }
606    }
607}
608
609/// Default catalog-backed dispatcher.
610///
611/// Consumes a [`RouteTable`] (built from a
612/// [`meerkat_machine_schema::CompositionSchema`]) plus a map of consumer
613/// surfaces keyed by [`MachineInstanceId`]. Every routed effect goes
614/// through the same three steps:
615///
616/// 1. Look up the input-kind route for `(producer.instance_id, effect.variant)`.
617/// 2. Project the producer's field values into the consumer-field bindings.
618/// 3. Deliver via the consumer surface registered for the target instance.
619///
620/// No step has a silent-drop fallback. Unresolved routes, signal-kind
621/// targets, missing producer fields, and unwired consumers are all typed
622/// [`DispatchRefusal`] errors.
623pub struct CatalogCompositionDispatcher<E: ProducerEffect> {
624    composition: CompositionId,
625    table: RouteTable,
626    consumers: HashMap<MachineInstanceId, Arc<dyn ConsumerSurface>>,
627    _effect: std::marker::PhantomData<fn(E)>,
628}
629
630/// Default catalog-backed signal dispatcher.
631///
632/// This is the signal-kind mirror of [`CatalogCompositionDispatcher`]:
633/// it consumes the same [`RouteTable`] but resolves through the signal
634/// index and delivers to [`SignalConsumerSurface`].
635pub struct CatalogCompositionSignalDispatcher<S: ProducerSignal> {
636    composition: CompositionId,
637    table: RouteTable,
638    consumers: HashMap<MachineInstanceId, Arc<dyn SignalConsumerSurface>>,
639    _signal: std::marker::PhantomData<fn(S)>,
640}
641
642impl<S: ProducerSignal> fmt::Debug for CatalogCompositionSignalDispatcher<S> {
643    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
644        f.debug_struct("CatalogCompositionSignalDispatcher")
645            .field("composition", &self.composition)
646            .field("signal_routes", &self.table.signal_route_count())
647            .field("consumers", &self.consumers.len())
648            .finish()
649    }
650}
651
652impl<E: ProducerEffect> fmt::Debug for CatalogCompositionDispatcher<E> {
653    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
654        f.debug_struct("CatalogCompositionDispatcher")
655            .field("composition", &self.composition)
656            .field("routes", &self.table.len())
657            .field("consumers", &self.consumers.len())
658            .finish()
659    }
660}
661
662impl<E: ProducerEffect> CatalogCompositionDispatcher<E> {
663    /// Build a new dispatcher for `composition`, using `table` as the typed
664    /// route index.
665    pub fn new(composition: CompositionId, table: RouteTable) -> Self {
666        Self {
667            composition,
668            table,
669            consumers: HashMap::new(),
670            _effect: std::marker::PhantomData,
671        }
672    }
673
674    /// Register a consumer surface for a target instance.
675    ///
676    /// Panics are impossible — duplicate registrations replace the prior
677    /// entry. (Duplicate wiring is a construction bug; the callers in
678    /// wave-b prove registration happens exactly once per instance in the
679    /// composition schema.)
680    pub fn with_consumer(mut self, surface: Arc<dyn ConsumerSurface>) -> Self {
681        self.consumers
682            .insert(surface.instance_id().clone(), surface);
683        self
684    }
685}
686
687impl<S: ProducerSignal> CatalogCompositionSignalDispatcher<S> {
688    /// Build a new signal dispatcher for `composition`.
689    pub fn new(composition: CompositionId, table: RouteTable) -> Self {
690        Self {
691            composition,
692            table,
693            consumers: HashMap::new(),
694            _signal: std::marker::PhantomData,
695        }
696    }
697
698    /// Register a signal consumer surface for a target instance.
699    pub fn with_consumer(mut self, surface: Arc<dyn SignalConsumerSurface>) -> Self {
700        self.consumers
701            .insert(surface.instance_id().clone(), surface);
702        self
703    }
704}
705
706#[async_trait]
707impl<E: ProducerEffect> CompositionDispatcher for CatalogCompositionDispatcher<E> {
708    type Effect = E;
709
710    fn composition(&self) -> &CompositionId {
711        &self.composition
712    }
713
714    async fn dispatch(
715        &self,
716        producer: ProducerInstance,
717        effect: EffectPayload<Self::Effect>,
718    ) -> Result<DispatchOutcome, DispatchRefusal> {
719        if producer.composition != self.composition {
720            return Err(DispatchRefusal::CompositionMismatch {
721                expected: self.composition.clone(),
722                actual: producer.composition,
723            });
724        }
725
726        let variant = effect.variant().clone();
727        let body = effect.body();
728
729        let descriptor = self
730            .table
731            .resolve(&producer.instance_id, &variant)
732            .ok_or_else(|| DispatchRefusal::UnresolvedRoute {
733                composition: self.composition.clone(),
734                instance: producer.instance_id.clone(),
735                variant: variant.clone(),
736            })?;
737
738        let mut projected: Vec<(FieldId, OwnedFieldValue)> =
739            Vec::with_capacity(descriptor.bindings.len());
740        for (from_field, to_field) in &descriptor.bindings {
741            let value =
742                body.field(from_field)
743                    .ok_or_else(|| DispatchRefusal::MissingProducerField {
744                        route: descriptor.route_id.clone(),
745                        variant: variant.clone(),
746                        field: from_field.clone(),
747                    })?;
748            projected.push((to_field.clone(), value.to_owned_value()));
749        }
750
751        let consumer = self.consumers.get(&descriptor.instance_id).ok_or_else(|| {
752            DispatchRefusal::UnwiredConsumer {
753                composition: self.composition.clone(),
754                instance: descriptor.instance_id.clone(),
755            }
756        })?;
757
758        consumer
759            .apply_routed_input(descriptor.input_variant.clone(), projected)
760            .await
761            .map_err(|reason| DispatchRefusal::ConsumerRefused {
762                instance: descriptor.instance_id.clone(),
763                variant: descriptor.input_variant.clone(),
764                reason,
765            })?;
766
767        Ok(DispatchOutcome {
768            route: RouteKey {
769                composition: self.composition.clone(),
770                route_id: descriptor.route_id.clone(),
771            },
772            consumer: descriptor.instance_id.clone(),
773            applied_input: descriptor.input_variant.clone(),
774        })
775    }
776}
777
778#[async_trait]
779impl<S: ProducerSignal> CompositionSignalDispatcher for CatalogCompositionSignalDispatcher<S> {
780    type Signal = S;
781
782    fn composition(&self) -> &CompositionId {
783        &self.composition
784    }
785
786    async fn dispatch_signal(
787        &self,
788        producer: ProducerInstance,
789        signal: SignalPayload<Self::Signal>,
790    ) -> Result<SignalDispatchOutcome, SignalDispatchRefusal> {
791        if producer.composition != self.composition {
792            return Err(SignalDispatchRefusal::CompositionMismatch {
793                expected: self.composition.clone(),
794                actual: producer.composition,
795            });
796        }
797
798        let variant = signal.variant().clone();
799        let body = signal.body();
800
801        let descriptor = self
802            .table
803            .resolve_signal(&producer.instance_id, &variant)
804            .ok_or_else(|| SignalDispatchRefusal::UnresolvedRoute {
805                composition: self.composition.clone(),
806                instance: producer.instance_id.clone(),
807                variant: variant.clone(),
808            })?;
809
810        let mut projected: Vec<(FieldId, OwnedFieldValue)> =
811            Vec::with_capacity(descriptor.bindings.len());
812        for (from_field, to_field) in &descriptor.bindings {
813            let value = body.field(from_field).ok_or_else(|| {
814                SignalDispatchRefusal::MissingProducerField {
815                    route: descriptor.route_id.clone(),
816                    variant: variant.clone(),
817                    field: from_field.clone(),
818                }
819            })?;
820            projected.push((to_field.clone(), value.to_owned_value()));
821        }
822
823        let consumer = self.consumers.get(&descriptor.instance_id).ok_or_else(|| {
824            SignalDispatchRefusal::UnwiredConsumer {
825                composition: self.composition.clone(),
826                instance: descriptor.instance_id.clone(),
827            }
828        })?;
829
830        consumer
831            .receive_signal(descriptor.signal_variant.clone(), projected)
832            .await
833            .map_err(|reason| SignalDispatchRefusal::ConsumerRefused {
834                instance: descriptor.instance_id.clone(),
835                variant: descriptor.signal_variant.clone(),
836                reason,
837            })?;
838
839        Ok(SignalDispatchOutcome {
840            route: RouteKey {
841                composition: self.composition.clone(),
842                route_id: descriptor.route_id.clone(),
843            },
844            consumer: descriptor.instance_id.clone(),
845            applied_signal: descriptor.signal_variant.clone(),
846        })
847    }
848}
849
850#[cfg(test)]
851mod tests {
852    use super::*;
853    use meerkat_machine_schema::catalog::meerkat_mob_seam_composition;
854
855    /// Hand-written stand-in for the codegen-emitted `MeerkatMobSeamEffect`
856    /// sum. Matches the shape the B-4b tests pin for the live catalog:
857    /// one variant per producer instance, each wrapping a typed effect
858    /// body (we cover the `RequestRuntimeBinding` arm for the dispatcher
859    /// path).
860    #[derive(Debug, Clone, PartialEq, Eq)]
861    enum SeamEffect {
862        Mob(MobEffect),
863    }
864
865    #[derive(Debug, Clone, PartialEq, Eq)]
866    enum MobEffect {
867        RequestRuntimeBinding {
868            agent_runtime_id: String,
869            fence_token: u64,
870            generation: u64,
871            session_id: String,
872        },
873    }
874
875    impl ProducerEffect for SeamEffect {
876        fn variant_id(&self) -> EffectVariantId {
877            match self {
878                Self::Mob(MobEffect::RequestRuntimeBinding { .. }) => {
879                    EffectVariantId::parse("RequestRuntimeBinding").expect("slug")
880                }
881            }
882        }
883
884        fn field(&self, id: &FieldId) -> Option<FieldValue<'_>> {
885            match self {
886                Self::Mob(MobEffect::RequestRuntimeBinding {
887                    agent_runtime_id,
888                    fence_token,
889                    generation,
890                    session_id,
891                }) => match id.as_str() {
892                    "agent_runtime_id" => Some(FieldValue::Str(agent_runtime_id)),
893                    "fence_token" => Some(FieldValue::U64(*fence_token)),
894                    "generation" => Some(FieldValue::U64(*generation)),
895                    "session_id" => Some(FieldValue::Str(session_id)),
896                    _ => None,
897                },
898            }
899        }
900    }
901
902    /// Hand-written stand-in for the codegen-emitted signal source sum.
903    /// These are the MeerkatMachine routed lifecycle effects that the
904    /// `meerkat_mob_seam` schema routes to MobMachine signals.
905    #[allow(clippy::enum_variant_names)]
906    #[derive(Debug, Clone, PartialEq, Eq)]
907    enum SeamSignal {
908        RuntimeBound {
909            agent_runtime_id: String,
910            fence_token: u64,
911        },
912        RuntimeRetired {
913            agent_runtime_id: String,
914            fence_token: u64,
915        },
916        RuntimeDestroyed {
917            agent_runtime_id: String,
918            fence_token: u64,
919        },
920    }
921
922    impl ProducerSignal for SeamSignal {
923        fn variant_id(&self) -> EffectVariantId {
924            let slug = match self {
925                Self::RuntimeBound { .. } => "RuntimeBound",
926                Self::RuntimeRetired { .. } => "RuntimeRetired",
927                Self::RuntimeDestroyed { .. } => "RuntimeDestroyed",
928            };
929            EffectVariantId::parse(slug).expect("signal source slug")
930        }
931
932        fn field(&self, id: &FieldId) -> Option<FieldValue<'_>> {
933            let (agent_runtime_id, fence_token) = match self {
934                Self::RuntimeBound {
935                    agent_runtime_id,
936                    fence_token,
937                }
938                | Self::RuntimeRetired {
939                    agent_runtime_id,
940                    fence_token,
941                }
942                | Self::RuntimeDestroyed {
943                    agent_runtime_id,
944                    fence_token,
945                } => (agent_runtime_id, fence_token),
946            };
947            match id.as_str() {
948                "agent_runtime_id" => Some(FieldValue::Str(agent_runtime_id)),
949                "fence_token" => Some(FieldValue::U64(*fence_token)),
950                _ => None,
951            }
952        }
953    }
954
955    #[derive(Default)]
956    struct RecordingMeerkatSurface {
957        log: tokio::sync::Mutex<Vec<(InputVariantId, Vec<(FieldId, OwnedFieldValue)>)>>,
958    }
959
960    #[async_trait]
961    impl ConsumerSurface for RecordingMeerkatSurface {
962        fn instance_id(&self) -> &MachineInstanceId {
963            // leak is fine in tests: we want a 'static reference; the
964            // instance_id is stable for the lifetime of the test binary.
965            static ID: std::sync::OnceLock<MachineInstanceId> = std::sync::OnceLock::new();
966            ID.get_or_init(|| MachineInstanceId::parse("meerkat").unwrap())
967        }
968
969        async fn apply_routed_input(
970            &self,
971            variant: InputVariantId,
972            projected_fields: Vec<(FieldId, OwnedFieldValue)>,
973        ) -> Result<(), String> {
974            self.log.lock().await.push((variant, projected_fields));
975            Ok(())
976        }
977    }
978
979    #[derive(Default)]
980    struct RecordingMobSignalSurface {
981        log: tokio::sync::Mutex<Vec<(SignalVariantId, Vec<(FieldId, OwnedFieldValue)>)>>,
982    }
983
984    #[async_trait]
985    impl SignalConsumerSurface for RecordingMobSignalSurface {
986        fn instance_id(&self) -> &MachineInstanceId {
987            static ID: std::sync::OnceLock<MachineInstanceId> = std::sync::OnceLock::new();
988            ID.get_or_init(|| MachineInstanceId::parse("mob").unwrap())
989        }
990
991        async fn receive_signal(
992            &self,
993            variant: SignalVariantId,
994            projected_fields: Vec<(FieldId, OwnedFieldValue)>,
995        ) -> Result<(), String> {
996            self.log.lock().await.push((variant, projected_fields));
997            Ok(())
998        }
999    }
1000
1001    fn mob_producer() -> ProducerInstance {
1002        ProducerInstance {
1003            composition: CompositionId::parse("meerkat_mob_seam").unwrap(),
1004            instance_id: MachineInstanceId::parse("mob").unwrap(),
1005            machine: MachineId::parse("MobMachine").unwrap(),
1006        }
1007    }
1008
1009    fn meerkat_producer() -> ProducerInstance {
1010        ProducerInstance {
1011            composition: CompositionId::parse("meerkat_mob_seam").unwrap(),
1012            instance_id: MachineInstanceId::parse("meerkat").unwrap(),
1013            machine: MachineId::parse("MeerkatMachine").unwrap(),
1014        }
1015    }
1016
1017    fn sample_effect() -> EffectPayload<SeamEffect> {
1018        EffectPayload::Emitted {
1019            variant: EffectVariantId::parse("RequestRuntimeBinding").unwrap(),
1020            body: SeamEffect::Mob(MobEffect::RequestRuntimeBinding {
1021                agent_runtime_id: "rt-1".into(),
1022                fence_token: 7,
1023                generation: 3,
1024                session_id: "019dbd3d-d7ad-75a1-96d0-8013927e78f8".into(),
1025            }),
1026        }
1027    }
1028
1029    fn build_dispatcher(
1030        consumer: Arc<RecordingMeerkatSurface>,
1031    ) -> CatalogCompositionDispatcher<SeamEffect> {
1032        let schema = meerkat_mob_seam_composition();
1033        let table = RouteTable::from_schema(&schema).expect("seam schema routes are well-formed");
1034        CatalogCompositionDispatcher::new(schema.name.clone(), table).with_consumer(consumer)
1035    }
1036
1037    fn sample_signal() -> SignalPayload<SeamSignal> {
1038        let body = SeamSignal::RuntimeBound {
1039            agent_runtime_id: "rt-1".into(),
1040            fence_token: 7,
1041        };
1042        SignalPayload::Emitted {
1043            variant: body.variant_id(),
1044            body,
1045        }
1046    }
1047
1048    fn build_signal_dispatcher(
1049        consumer: Arc<RecordingMobSignalSurface>,
1050    ) -> CatalogCompositionSignalDispatcher<SeamSignal> {
1051        let schema = meerkat_mob_seam_composition();
1052        let table = RouteTable::from_schema(&schema).expect("seam schema routes are well-formed");
1053        CatalogCompositionSignalDispatcher::new(schema.name.clone(), table).with_consumer(consumer)
1054    }
1055
1056    #[tokio::test]
1057    async fn dispatches_mob_routed_effect_to_meerkat_consumer() {
1058        let consumer = Arc::new(RecordingMeerkatSurface::default());
1059        let dispatcher = build_dispatcher(Arc::clone(&consumer));
1060
1061        let outcome = dispatcher
1062            .dispatch(mob_producer(), sample_effect())
1063            .await
1064            .expect("well-formed routed effect");
1065
1066        assert_eq!(outcome.consumer.as_str(), "meerkat");
1067        assert_eq!(outcome.applied_input.as_str(), "PrepareBindings");
1068        assert_eq!(
1069            outcome.route.route_id.as_str(),
1070            "binding_request_reaches_meerkat"
1071        );
1072
1073        let log = consumer.log.lock().await;
1074        assert_eq!(
1075            log.len(),
1076            1,
1077            "dispatcher must call the consumer exactly once"
1078        );
1079        let (variant, fields) = &log[0];
1080        assert_eq!(variant.as_str(), "PrepareBindings");
1081        let field_names: Vec<&str> = fields.iter().map(|(k, _)| k.as_str()).collect();
1082        assert_eq!(
1083            field_names,
1084            vec![
1085                "agent_runtime_id",
1086                "fence_token",
1087                "generation",
1088                "session_id"
1089            ]
1090        );
1091        match &fields[0].1 {
1092            OwnedFieldValue::Str(s) => assert_eq!(s, "rt-1"),
1093            other => panic!("expected Str, got {other:?}"),
1094        }
1095        match &fields[1].1 {
1096            OwnedFieldValue::U64(v) => assert_eq!(*v, 7),
1097            other => panic!("expected U64, got {other:?}"),
1098        }
1099        match &fields[2].1 {
1100            OwnedFieldValue::U64(v) => assert_eq!(*v, 3),
1101            other => panic!("expected U64, got {other:?}"),
1102        }
1103        match &fields[3].1 {
1104            OwnedFieldValue::Str(s) => assert_eq!(s, "019dbd3d-d7ad-75a1-96d0-8013927e78f8"),
1105            other => panic!("expected Str for session_id, got {other:?}"),
1106        }
1107    }
1108
1109    #[tokio::test]
1110    async fn dispatches_meerkat_routed_signal_to_mob_consumer() {
1111        let consumer = Arc::new(RecordingMobSignalSurface::default());
1112        let dispatcher = build_signal_dispatcher(Arc::clone(&consumer));
1113
1114        let outcome = dispatcher
1115            .dispatch_signal(meerkat_producer(), sample_signal())
1116            .await
1117            .expect("well-formed routed signal");
1118
1119        assert_eq!(outcome.consumer.as_str(), "mob");
1120        assert_eq!(outcome.applied_signal.as_str(), "ObserveRuntimeReady");
1121        assert_eq!(outcome.route.route_id.as_str(), "runtime_bound_reaches_mob");
1122
1123        let log = consumer.log.lock().await;
1124        assert_eq!(
1125            log.len(),
1126            1,
1127            "dispatcher must call the signal consumer exactly once"
1128        );
1129        let (variant, fields) = &log[0];
1130        assert_eq!(variant.as_str(), "ObserveRuntimeReady");
1131        let field_names: Vec<&str> = fields.iter().map(|(k, _)| k.as_str()).collect();
1132        assert_eq!(field_names, vec!["agent_runtime_id", "fence_token"]);
1133        match &fields[0].1 {
1134            OwnedFieldValue::Str(s) => assert_eq!(s, "rt-1"),
1135            other => panic!("expected Str, got {other:?}"),
1136        }
1137        match &fields[1].1 {
1138            OwnedFieldValue::U64(v) => assert_eq!(*v, 7),
1139            other => panic!("expected U64, got {other:?}"),
1140        }
1141    }
1142
1143    #[tokio::test]
1144    async fn signal_dispatch_refuses_input_route_typed() {
1145        let consumer = Arc::new(RecordingMobSignalSurface::default());
1146        let dispatcher = build_signal_dispatcher(consumer);
1147
1148        let payload = SignalPayload::Emitted {
1149            variant: EffectVariantId::parse("RequestRuntimeBinding").unwrap(),
1150            body: SeamSignal::RuntimeBound {
1151                agent_runtime_id: "rt-1".into(),
1152                fence_token: 7,
1153            },
1154        };
1155
1156        let err = dispatcher
1157            .dispatch_signal(mob_producer(), payload)
1158            .await
1159            .expect_err("input route is out of the signal surface");
1160
1161        assert!(matches!(err, SignalDispatchRefusal::UnresolvedRoute { .. }));
1162    }
1163
1164    #[tokio::test]
1165    async fn signal_dispatch_refuses_unwired_consumer_typed() {
1166        let schema = meerkat_mob_seam_composition();
1167        let table = RouteTable::from_schema(&schema).unwrap();
1168        let dispatcher: CatalogCompositionSignalDispatcher<SeamSignal> =
1169            CatalogCompositionSignalDispatcher::new(schema.name.clone(), table);
1170
1171        let err = dispatcher
1172            .dispatch_signal(meerkat_producer(), sample_signal())
1173            .await
1174            .expect_err("unwired signal consumer");
1175
1176        assert!(matches!(err, SignalDispatchRefusal::UnwiredConsumer { .. }));
1177    }
1178
1179    #[tokio::test]
1180    async fn signal_dispatch_refuses_missing_field_typed() {
1181        #[derive(Debug)]
1182        struct BrokenSignal;
1183
1184        impl ProducerSignal for BrokenSignal {
1185            fn variant_id(&self) -> EffectVariantId {
1186                EffectVariantId::parse("RuntimeBound").unwrap()
1187            }
1188
1189            fn field(&self, _id: &FieldId) -> Option<FieldValue<'_>> {
1190                None
1191            }
1192        }
1193
1194        let schema = meerkat_mob_seam_composition();
1195        let table = RouteTable::from_schema(&schema).unwrap();
1196        let consumer = Arc::new(RecordingMobSignalSurface::default());
1197        let dispatcher: CatalogCompositionSignalDispatcher<BrokenSignal> =
1198            CatalogCompositionSignalDispatcher::new(schema.name.clone(), table)
1199                .with_consumer(consumer);
1200
1201        let err = dispatcher
1202            .dispatch_signal(
1203                meerkat_producer(),
1204                SignalPayload::Emitted {
1205                    variant: EffectVariantId::parse("RuntimeBound").unwrap(),
1206                    body: BrokenSignal,
1207                },
1208            )
1209            .await
1210            .expect_err("missing producer field");
1211
1212        assert!(matches!(
1213            err,
1214            SignalDispatchRefusal::MissingProducerField { .. }
1215        ));
1216    }
1217
1218    #[tokio::test]
1219    async fn refuses_mismatched_composition() {
1220        let consumer = Arc::new(RecordingMeerkatSurface::default());
1221        let dispatcher = build_dispatcher(consumer);
1222
1223        let mut wrong = mob_producer();
1224        wrong.composition = CompositionId::parse("some_other_composition").unwrap();
1225
1226        let err = dispatcher
1227            .dispatch(wrong, sample_effect())
1228            .await
1229            .expect_err("composition mismatch");
1230
1231        assert!(matches!(err, DispatchRefusal::CompositionMismatch { .. }));
1232    }
1233
1234    #[tokio::test]
1235    async fn refuses_unrouted_effect_typed() {
1236        let consumer = Arc::new(RecordingMeerkatSurface::default());
1237        let dispatcher = build_dispatcher(consumer);
1238
1239        // The schema has no route for `Mob::UnknownEffect`; use the well-
1240        // formed producer but label the variant with an id that has no
1241        // declared route.
1242        let payload = EffectPayload::Emitted {
1243            variant: EffectVariantId::parse("UnknownEffect").unwrap(),
1244            body: SeamEffect::Mob(MobEffect::RequestRuntimeBinding {
1245                agent_runtime_id: "rt".into(),
1246                fence_token: 0,
1247                generation: 0,
1248                session_id: "019dbd3d-d7ad-75a1-96d0-8013927e78f8".into(),
1249            }),
1250        };
1251
1252        let err = dispatcher
1253            .dispatch(mob_producer(), payload)
1254            .await
1255            .expect_err("unresolved route");
1256
1257        assert!(matches!(err, DispatchRefusal::UnresolvedRoute { .. }));
1258    }
1259
1260    #[tokio::test]
1261    async fn refuses_unwired_consumer_typed() {
1262        // Build a dispatcher with NO consumer surface registered. The route
1263        // resolves but the delivery step must return UnwiredConsumer, not
1264        // silently succeed.
1265        let schema = meerkat_mob_seam_composition();
1266        let table = RouteTable::from_schema(&schema).unwrap();
1267        let dispatcher: CatalogCompositionDispatcher<SeamEffect> =
1268            CatalogCompositionDispatcher::new(schema.name.clone(), table);
1269
1270        let err = dispatcher
1271            .dispatch(mob_producer(), sample_effect())
1272            .await
1273            .expect_err("unwired consumer");
1274
1275        assert!(matches!(err, DispatchRefusal::UnwiredConsumer { .. }));
1276    }
1277
1278    #[tokio::test]
1279    async fn standalone_binding_has_no_dispatcher() {
1280        let binding: CompositionBinding<SeamEffect> = CompositionBinding::Standalone;
1281        assert!(binding.is_standalone());
1282        assert!(binding.wired().is_none());
1283    }
1284
1285    #[tokio::test]
1286    async fn wired_binding_exposes_dispatcher() {
1287        let consumer = Arc::new(RecordingMeerkatSurface::default());
1288        let dispatcher = Arc::new(build_dispatcher(consumer));
1289        let binding: CompositionBinding<SeamEffect> = CompositionBinding::Wired(dispatcher);
1290        assert!(!binding.is_standalone());
1291        assert!(binding.wired().is_some());
1292        assert!(
1293            binding.context_provider().is_none(),
1294            "plain Wired binding has no owner-supplied context"
1295        );
1296    }
1297
1298    /// Owner-supplied context provider for routes that need typed
1299    /// fields not in the producer effect body. In production this would
1300    /// be a runtime-owned struct (e.g. one carrying a pinned
1301    /// `SessionId`); the test just returns a canned pair to exercise
1302    /// the trait's single-method signature.
1303    struct PinnedSessionContext {
1304        session_id: String,
1305    }
1306
1307    impl ContextProvider<SeamEffect> for PinnedSessionContext {
1308        fn provide_context(
1309            &self,
1310            _producer: &ProducerInstance,
1311            _effect: &EffectPayload<SeamEffect>,
1312        ) -> Vec<(FieldId, OwnedFieldValue)> {
1313            vec![(
1314                FieldId::parse("session_id").expect("field id"),
1315                OwnedFieldValue::Str(self.session_id.clone()),
1316            )]
1317        }
1318    }
1319
1320    #[tokio::test]
1321    async fn owner_provided_binding_exposes_both_dispatcher_and_context() {
1322        let consumer = Arc::new(RecordingMeerkatSurface::default());
1323        let dispatcher = Arc::new(build_dispatcher(consumer));
1324        let context = Arc::new(PinnedSessionContext {
1325            session_id: "session-abc".into(),
1326        });
1327        let binding: CompositionBinding<SeamEffect> =
1328            CompositionBinding::owner_provided(dispatcher, context);
1329
1330        assert!(!binding.is_standalone());
1331        assert!(
1332            binding.wired().is_some(),
1333            "OwnerProvided is a superset of Wired for dispatcher access"
1334        );
1335        assert!(
1336            binding.context_provider().is_some(),
1337            "OwnerProvided must expose the owner-supplied context"
1338        );
1339
1340        // The typed provider returns the expected single owner-supplied
1341        // field. Matches the #342 use case: `session_id` is absent from
1342        // the producer effect body but present in the projected fields
1343        // the consumer needs.
1344        let provider = binding.context_provider().expect("context provider");
1345        let producer = mob_producer();
1346        let effect = sample_effect();
1347        let fields = provider.provide_context(&producer, &effect);
1348        assert_eq!(fields.len(), 1);
1349        assert_eq!(fields[0].0.as_str(), "session_id");
1350        match &fields[0].1 {
1351            OwnedFieldValue::Str(s) => assert_eq!(s, "session-abc"),
1352            other => panic!("expected Str context field, got {other:?}"),
1353        }
1354    }
1355
1356    #[tokio::test]
1357    async fn composition_binding_constructors_parallel_machine_halves() {
1358        // `CompositionBinding::standalone()` is the binding-level mirror
1359        // of `MeerkatMachine::standalone(...)`; `wired_with` and
1360        // `owner_provided` mirror `MeerkatMachine::with_composition(...)`.
1361        // The constructor split exists so call sites say positively which
1362        // half they are wiring, rather than spelling the enum variant.
1363        let standalone: CompositionBinding<SeamEffect> = CompositionBinding::standalone();
1364        assert!(standalone.is_standalone());
1365        assert!(standalone.wired().is_none());
1366        assert!(standalone.context_provider().is_none());
1367
1368        let consumer = Arc::new(RecordingMeerkatSurface::default());
1369        let dispatcher: Arc<dyn CompositionDispatcher<Effect = SeamEffect>> =
1370            Arc::new(build_dispatcher(consumer));
1371        let wired: CompositionBinding<SeamEffect> =
1372            CompositionBinding::wired_with(Arc::clone(&dispatcher));
1373        assert!(!wired.is_standalone());
1374        assert!(wired.wired().is_some());
1375        assert!(wired.context_provider().is_none());
1376
1377        let context = Arc::new(PinnedSessionContext {
1378            session_id: "session-xyz".into(),
1379        });
1380        let owner_provided: CompositionBinding<SeamEffect> =
1381            CompositionBinding::owner_provided(dispatcher, context);
1382        assert!(!owner_provided.is_standalone());
1383        assert!(owner_provided.wired().is_some());
1384        assert!(owner_provided.context_provider().is_some());
1385    }
1386}