use crate::error::MobError;
use crate::machines::mob_machine as mob_dsl;
#[cfg(target_arch = "wasm32")]
use crate::tokio;
use meerkat_machine_schema::identity::{
CompositionId, EffectVariantId, FieldId, MachineId, MachineInstanceId, SignalVariantId,
};
use meerkat_runtime::composition::{
CatalogCompositionSignalDispatcher, CompositionBinding, CompositionDispatcher, DispatchOutcome,
DispatchRefusal, EffectPayload, FieldValue, OwnedFieldValue, ProducerEffect, ProducerInstance,
RouteTable, SignalConsumerSurface,
};
use meerkat_runtime::generated::meerkat_mob_seam as seam_facts;
use meerkat_runtime::meerkat_machine::dsl as meerkat_dsl;
use std::sync::Arc;
use tokio::sync::mpsc;
pub type CompositionDispatcherHandle = Arc<dyn CompositionDispatcher<Effect = MobSeamEffect>>;
pub type MobCompositionBinding = CompositionBinding<MobSeamEffect>;
pub(crate) fn mob_seam_composition_id() -> CompositionId {
seam_facts::composition_id()
}
pub(crate) fn mob_producer_instance_id() -> MachineInstanceId {
seam_facts::producers::mob_instance_id()
}
pub(crate) fn mob_machine_id() -> MachineId {
seam_facts::producers::mob_machine_id()
}
pub fn mob_producer_instance() -> ProducerInstance {
ProducerInstance {
composition: mob_seam_composition_id(),
instance_id: mob_producer_instance_id(),
machine: mob_machine_id(),
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MobSeamEffect {
Mob(mob_dsl::MobMachineEffect),
}
impl MobSeamEffect {
pub fn variant_id(&self) -> EffectVariantId {
match self {
Self::Mob(mob_dsl::MobMachineEffect::RequestRuntimeBinding { .. }) => {
seam_facts::effects::mob::request_runtime_binding()
}
Self::Mob(mob_dsl::MobMachineEffect::RequestRuntimeIngress { .. }) => {
seam_facts::effects::mob::request_runtime_ingress()
}
Self::Mob(mob_dsl::MobMachineEffect::RequestRuntimeRetire { .. }) => {
seam_facts::effects::mob::request_runtime_retire()
}
Self::Mob(mob_dsl::MobMachineEffect::RequestRuntimeDestroy { .. }) => {
seam_facts::effects::mob::request_runtime_destroy()
}
Self::Mob(other) => unreachable!("non-routed mob effect reached seam: {other:?}"),
}
}
pub fn generated_input_route(&self) -> Option<seam_facts::TypedRoutedInput> {
seam_facts::route_to_input(&mob_producer_instance_id(), &self.variant_id())
}
fn field(&self, id: &FieldId) -> Option<FieldValue<'_>> {
match self {
Self::Mob(mob_dsl::MobMachineEffect::RequestRuntimeBinding {
agent_identity: _,
agent_runtime_id,
fence_token,
generation,
session_id,
}) => {
if id == &seam_facts::fields::agent_runtime_id() {
Some(FieldValue::Str(agent_runtime_id.as_str()))
} else if id == &seam_facts::fields::fence_token() {
Some(FieldValue::U64(fence_token.0))
} else if id == &seam_facts::fields::generation() {
Some(FieldValue::U64(generation.0))
} else if id == &seam_facts::fields::session_id() {
Some(FieldValue::Str(session_id.0.as_str()))
} else {
None
}
}
Self::Mob(mob_dsl::MobMachineEffect::RequestRuntimeIngress {
agent_runtime_id,
fence_token,
work_id,
origin,
}) => {
if id == &seam_facts::fields::agent_runtime_id() {
Some(FieldValue::Str(agent_runtime_id.as_str()))
} else if id == &seam_facts::fields::fence_token() {
Some(FieldValue::U64(fence_token.0))
} else if id == &seam_facts::fields::work_id() {
Some(FieldValue::Str(work_id.0.as_str()))
} else if id == &seam_facts::fields::origin() {
Some(FieldValue::Opaque(Arc::new(meerkat_work_origin(origin))))
} else {
None
}
}
Self::Mob(mob_dsl::MobMachineEffect::RequestRuntimeRetire { session_id }) => {
if id == &seam_facts::fields::session_id() {
Some(FieldValue::Str(session_id.0.as_str()))
} else {
None
}
}
Self::Mob(mob_dsl::MobMachineEffect::RequestRuntimeDestroy { session_id }) => {
if id == &seam_facts::fields::session_id() {
Some(FieldValue::Str(session_id.0.as_str()))
} else {
None
}
}
Self::Mob(_) => None,
}
}
}
fn meerkat_work_origin(origin: &mob_dsl::WorkOrigin) -> meerkat_dsl::WorkOrigin {
match origin {
mob_dsl::WorkOrigin::External => meerkat_dsl::WorkOrigin::External,
mob_dsl::WorkOrigin::Internal => meerkat_dsl::WorkOrigin::Internal,
mob_dsl::WorkOrigin::Ingest => meerkat_dsl::WorkOrigin::Ingest,
}
}
impl ProducerEffect for MobSeamEffect {
fn variant_id(&self) -> EffectVariantId {
MobSeamEffect::variant_id(self)
}
fn field(&self, id: &FieldId) -> Option<FieldValue<'_>> {
MobSeamEffect::field(self, id)
}
}
pub fn lift_routed_effect(effect: &mob_dsl::MobMachineEffect) -> Option<MobSeamEffect> {
use mob_dsl::MobMachineEffect as DslEffect;
match effect {
DslEffect::RequestRuntimeBinding { .. }
| DslEffect::RequestRuntimeIngress { .. }
| DslEffect::RequestRuntimeRetire { .. }
| DslEffect::RequestRuntimeDestroy { .. } => Some(MobSeamEffect::Mob(effect.clone())),
_ => None,
}
}
#[cfg(feature = "runtime-adapter")]
pub fn wired_binding_from_runtime_adapter(
runtime_adapter: &Arc<meerkat_runtime::MeerkatMachine>,
) -> MobCompositionBinding {
use meerkat_runtime::composition::{
CatalogCompositionDispatcher, CompositionBinding, RouteTable,
};
let schema = meerkat_machine_schema::catalog::meerkat_mob_seam_composition();
let table = RouteTable::from_schema(&schema)
.expect("meerkat_mob_seam schema is well-formed by construction");
let consumer = Arc::new(
meerkat_runtime::meerkat_machine::composition::MeerkatConsumerSurface::new(Arc::clone(
runtime_adapter,
)),
);
let dispatcher: CatalogCompositionDispatcher<MobSeamEffect> =
CatalogCompositionDispatcher::new(schema.name.clone(), table).with_consumer(consumer);
CompositionBinding::Wired(Arc::new(dispatcher))
}
#[cfg(feature = "runtime-adapter")]
pub(super) fn attach_signal_dispatcher_to_runtime_adapter(
runtime_adapter: &Arc<meerkat_runtime::MeerkatMachine>,
command_tx: mpsc::Sender<super::state::MobCommand>,
) {
let schema = meerkat_machine_schema::catalog::meerkat_mob_seam_composition();
let table = RouteTable::from_schema(&schema)
.expect("meerkat_mob_seam schema is well-formed by construction");
let consumer = Arc::new(MobSignalConsumerSurface::new(command_tx));
let dispatcher: CatalogCompositionSignalDispatcher<
meerkat_runtime::meerkat_machine::composition::MeerkatSeamSignal,
> = CatalogCompositionSignalDispatcher::new(schema.name.clone(), table).with_consumer(consumer);
runtime_adapter.set_composition_signal_dispatcher(Arc::new(dispatcher));
}
#[cfg(feature = "runtime-adapter")]
struct MobSignalConsumerSurface {
command_tx: mpsc::Sender<super::state::MobCommand>,
instance_id: MachineInstanceId,
}
#[cfg(feature = "runtime-adapter")]
impl MobSignalConsumerSurface {
fn new(command_tx: mpsc::Sender<super::state::MobCommand>) -> Self {
Self {
command_tx,
instance_id: mob_producer_instance_id(),
}
}
}
#[cfg(feature = "runtime-adapter")]
fn signal_project_str<'a>(
fields: &'a [(FieldId, OwnedFieldValue)],
field: &FieldId,
) -> Result<&'a str, String> {
fields
.iter()
.find(|(id, _)| id == field)
.ok_or_else(|| format!("missing projected signal field `{}`", field.as_str()))
.and_then(|(_, value)| match value {
OwnedFieldValue::Str(value) => Ok(value.as_str()),
other => Err(format!(
"projected signal field `{}` is not Str: {other:?}",
field.as_str()
)),
})
}
#[cfg(feature = "runtime-adapter")]
fn signal_project_u64(
fields: &[(FieldId, OwnedFieldValue)],
field: &FieldId,
) -> Result<u64, String> {
fields
.iter()
.find(|(id, _)| id == field)
.ok_or_else(|| format!("missing projected signal field `{}`", field.as_str()))
.and_then(|(_, value)| match value {
OwnedFieldValue::U64(value) => Ok(*value),
other => Err(format!(
"projected signal field `{}` is not U64: {other:?}",
field.as_str()
)),
})
}
#[cfg(feature = "runtime-adapter")]
fn build_mob_signal(
variant: &SignalVariantId,
projected: &[(FieldId, OwnedFieldValue)],
) -> Result<mob_dsl::MobMachineSignal, String> {
let runtime_id = mob_dsl::AgentRuntimeId::from(
signal_project_str(projected, &seam_facts::fields::agent_runtime_id())?.to_string(),
);
let fence_token = mob_dsl::FenceToken(signal_project_u64(
projected,
&seam_facts::fields::fence_token(),
)?);
if variant == &seam_facts::signals::observe_runtime_ready() {
Ok(mob_dsl::MobMachineSignal::ObserveRuntimeReady {
agent_runtime_id: runtime_id,
fence_token,
})
} else if variant == &seam_facts::signals::observe_runtime_retired() {
Ok(mob_dsl::MobMachineSignal::ObserveRuntimeRetired {
agent_runtime_id: runtime_id,
fence_token,
})
} else if variant == &seam_facts::signals::observe_runtime_destroyed() {
Ok(mob_dsl::MobMachineSignal::ObserveRuntimeDestroyed {
agent_runtime_id: runtime_id,
fence_token,
})
} else {
Err(format!(
"mob signal consumer surface does not accept routed signal `{other}`; \
only ObserveRuntimeReady/ObserveRuntimeRetired/ObserveRuntimeDestroyed are declared",
other = variant.as_str()
))
}
}
#[cfg(feature = "runtime-adapter")]
#[async_trait::async_trait]
impl SignalConsumerSurface for MobSignalConsumerSurface {
fn instance_id(&self) -> &MachineInstanceId {
&self.instance_id
}
async fn receive_signal(
&self,
variant: SignalVariantId,
projected_fields: Vec<(FieldId, OwnedFieldValue)>,
) -> Result<(), String> {
let signal = build_mob_signal(&variant, &projected_fields)?;
self.command_tx
.send(super::state::MobCommand::ProjectMachineSignal { signal })
.await
.map_err(|error| format!("mob actor signal queue closed: {error}"))
}
}
pub async fn dispatch_routed_effect(
binding: &MobCompositionBinding,
effect: MobSeamEffect,
) -> Result<Option<DispatchOutcome>, MobError> {
let Some(dispatcher) = binding.wired() else {
return Ok(None);
};
let variant = effect.variant_id();
let payload = EffectPayload::Emitted {
variant,
body: effect,
};
dispatcher
.dispatch(mob_producer_instance(), payload)
.await
.map(Some)
.map_err(dispatch_refusal_to_mob_error)
}
fn dispatch_refusal_to_mob_error(refusal: DispatchRefusal) -> MobError {
match refusal {
DispatchRefusal::UnwiredConsumer {
composition,
instance,
} => MobError::WiringError(format!(
"composition `{composition}` has no consumer surface registered for instance `{instance}` \
— C-6c (meerkat_runtime::composition::ConsumerSurface on MeerkatMachine) pending"
)),
DispatchRefusal::UnresolvedRoute {
composition,
instance,
variant,
} => MobError::WiringError(format!(
"composition `{composition}` declares no input route for producer \
`{instance}` effect variant `{variant}`"
)),
DispatchRefusal::MissingProducerField {
route,
variant,
field,
} => MobError::WiringError(format!(
"route `{route}` requires producer field `{field}` on variant `{variant}`; \
producer did not provide it"
)),
DispatchRefusal::CompositionMismatch { expected, actual } => {
MobError::WiringError(format!(
"dispatcher composition `{expected}` does not match producer composition `{actual}`"
))
}
DispatchRefusal::ConsumerRefused {
instance,
variant,
reason,
} => MobError::Internal(format!(
"consumer `{instance}` refused routed input `{variant}`: {reason}"
)),
}
}
#[cfg(test)]
mod tests {
use super::*;
fn ev(slug: &str) -> EffectVariantId {
EffectVariantId::parse(slug).expect("slug")
}
fn fid(slug: &str) -> FieldId {
FieldId::parse(slug).expect("slug")
}
#[test]
fn request_runtime_binding_variant_id_matches_schema_slug() {
let body = mob_dsl::MobMachineEffect::RequestRuntimeBinding {
agent_identity: mob_dsl::AgentIdentity::from("agent"),
agent_runtime_id: mob_dsl::AgentRuntimeId::from("rt-1"),
fence_token: mob_dsl::FenceToken(7),
generation: mob_dsl::Generation(3),
session_id: mob_dsl::SessionId::from("session-1"),
};
assert_eq!(
MobSeamEffect::Mob(body).variant_id(),
ev("RequestRuntimeBinding"),
);
}
#[test]
fn request_runtime_binding_projects_all_route_field_bindings() {
let body = mob_dsl::MobMachineEffect::RequestRuntimeBinding {
agent_identity: mob_dsl::AgentIdentity::from("agent"),
agent_runtime_id: mob_dsl::AgentRuntimeId::from("rt-1"),
fence_token: mob_dsl::FenceToken(7),
generation: mob_dsl::Generation(3),
session_id: mob_dsl::SessionId::from("session-1"),
};
let effect = MobSeamEffect::Mob(body);
assert!(matches!(
effect.field(&fid("agent_runtime_id")).expect("present"),
FieldValue::Str("rt-1"),
));
assert!(matches!(
effect.field(&fid("fence_token")).expect("present"),
FieldValue::U64(7),
));
assert!(matches!(
effect.field(&fid("generation")).expect("present"),
FieldValue::U64(3),
));
assert!(effect.field(&fid("unknown_field")).is_none());
}
#[test]
fn routed_mob_effect_projection_tracks_generated_route_facts() {
use meerkat_runtime::generated::meerkat_mob_seam as seam_facts;
let cases = vec![
(
MobSeamEffect::Mob(mob_dsl::MobMachineEffect::RequestRuntimeBinding {
agent_identity: mob_dsl::AgentIdentity::from("agent"),
agent_runtime_id: mob_dsl::AgentRuntimeId::from("rt-1"),
fence_token: mob_dsl::FenceToken(7),
generation: mob_dsl::Generation(3),
session_id: mob_dsl::SessionId::from("session-1"),
}),
seam_facts::route_binding_request_reaches_meerkat(),
),
(
MobSeamEffect::Mob(mob_dsl::MobMachineEffect::RequestRuntimeIngress {
agent_runtime_id: mob_dsl::AgentRuntimeId::from("rt-1"),
fence_token: mob_dsl::FenceToken(7),
work_id: mob_dsl::WorkId::from("work-1"),
origin: mob_dsl::WorkOrigin::External,
}),
seam_facts::route_work_request_reaches_meerkat(),
),
(
MobSeamEffect::Mob(mob_dsl::MobMachineEffect::RequestRuntimeRetire {
session_id: mob_dsl::SessionId::from("session-1"),
}),
seam_facts::route_retire_request_reaches_meerkat(),
),
(
MobSeamEffect::Mob(mob_dsl::MobMachineEffect::RequestRuntimeDestroy {
session_id: mob_dsl::SessionId::from("session-1"),
}),
seam_facts::route_destroy_request_reaches_meerkat(),
),
];
for (effect, expected_route) in cases {
let route = effect.generated_input_route().expect("generated route");
assert_eq!(route, expected_route);
for (producer_field, _) in &route.bindings {
assert!(
effect.field(producer_field).is_some(),
"generated route `{}` requires producer field `{}`",
route.route_id.as_str(),
producer_field.as_str()
);
}
}
}
#[test]
fn retire_and_destroy_have_no_fields() {
let retire = MobSeamEffect::Mob(mob_dsl::MobMachineEffect::RequestRuntimeRetire {
session_id: mob_dsl::SessionId::from("019dbd3d-d7ad-75a1-96d0-8013927e78f8"),
});
let destroy = MobSeamEffect::Mob(mob_dsl::MobMachineEffect::RequestRuntimeDestroy {
session_id: mob_dsl::SessionId::from("019dbd3d-d7ad-75a1-96d0-8013927e78f8"),
});
assert_eq!(retire.variant_id(), ev("RequestRuntimeRetire"));
assert_eq!(destroy.variant_id(), ev("RequestRuntimeDestroy"));
assert!(retire.field(&fid("agent_runtime_id")).is_none());
assert!(destroy.field(&fid("agent_runtime_id")).is_none());
}
#[test]
fn ingress_exposes_schema_declared_producer_fields() {
let body = mob_dsl::MobMachineEffect::RequestRuntimeIngress {
agent_runtime_id: mob_dsl::AgentRuntimeId::from("rt-x"),
fence_token: mob_dsl::FenceToken(1),
work_id: mob_dsl::WorkId::from("w-1"),
origin: mob_dsl::WorkOrigin::External,
};
let effect = MobSeamEffect::Mob(body);
assert!(matches!(
effect
.field(&fid("agent_runtime_id"))
.expect("agent_runtime_id"),
FieldValue::Str("rt-x"),
));
assert!(effect.field(&fid("runtime_id")).is_none());
match effect.field(&fid("origin")).expect("origin") {
FieldValue::Opaque(value) => assert!(matches!(
value.downcast_ref::<meerkat_dsl::WorkOrigin>(),
Some(meerkat_dsl::WorkOrigin::External)
)),
other => panic!("origin should stay typed, got {other:?}"),
}
}
#[test]
fn lift_routes_only_routed_request_variants() {
use mob_dsl::MobMachineEffect as DslEffect;
let binding_in = DslEffect::RequestRuntimeBinding {
agent_identity: mob_dsl::AgentIdentity::from("a"),
agent_runtime_id: mob_dsl::AgentRuntimeId::from("rt"),
fence_token: mob_dsl::FenceToken(1),
generation: mob_dsl::Generation(0),
session_id: mob_dsl::SessionId::from("session-1"),
};
assert!(matches!(
lift_routed_effect(&binding_in),
Some(MobSeamEffect::Mob(
mob_dsl::MobMachineEffect::RequestRuntimeBinding { .. }
)),
));
let retire_in = DslEffect::RequestRuntimeRetire {
session_id: mob_dsl::SessionId::from("019dbd3d-d7ad-75a1-96d0-8013927e78f8"),
};
assert!(matches!(
lift_routed_effect(&retire_in),
Some(MobSeamEffect::Mob(
mob_dsl::MobMachineEffect::RequestRuntimeRetire { .. }
)),
));
let local_only = DslEffect::PersistKickoffUpdate {
member_id: "m".into(),
phase: mob_dsl::KickoffPhase::Pending,
};
assert!(lift_routed_effect(&local_only).is_none());
}
#[tokio::test]
async fn standalone_binding_skips_dispatch_without_error() {
let binding: MobCompositionBinding = CompositionBinding::Standalone;
let effect = MobSeamEffect::Mob(mob_dsl::MobMachineEffect::RequestRuntimeRetire {
session_id: mob_dsl::SessionId::from("019dbd3d-d7ad-75a1-96d0-8013927e78f8"),
});
let outcome = dispatch_routed_effect(&binding, effect)
.await
.expect("standalone is not an error");
assert!(
outcome.is_none(),
"standalone dispatcher performs no routing"
);
}
#[cfg(feature = "runtime-adapter")]
#[tokio::test]
async fn mob_signal_consumer_backpressures_when_actor_queue_is_full() {
use super::super::state::MobCommand;
use std::time::Duration;
let (command_tx, mut command_rx) = mpsc::channel(1);
let first_signal = mob_dsl::MobMachineSignal::ObserveRuntimeReady {
agent_runtime_id: mob_dsl::AgentRuntimeId::from("rt-first"),
fence_token: mob_dsl::FenceToken(1),
};
command_tx
.try_send(MobCommand::ProjectMachineSignal {
signal: first_signal,
})
.expect("test precondition: bounded actor queue is full");
let consumer = MobSignalConsumerSurface::new(command_tx);
let mut receive = tokio::spawn(async move {
consumer
.receive_signal(
seam_facts::signals::observe_runtime_ready(),
vec![
(
seam_facts::fields::agent_runtime_id(),
OwnedFieldValue::Str("rt-backpressured".to_string()),
),
(seam_facts::fields::fence_token(), OwnedFieldValue::U64(7)),
],
)
.await
});
let premature = tokio::time::timeout(Duration::from_millis(50), &mut receive).await;
assert!(
premature.is_err(),
"full actor queue must backpressure routed lifecycle signals instead of failing fast"
);
match command_rx.recv().await.expect("first queued command") {
MobCommand::ProjectMachineSignal { signal } => assert!(matches!(
signal,
mob_dsl::MobMachineSignal::ObserveRuntimeReady {
agent_runtime_id,
fence_token: mob_dsl::FenceToken(1),
} if agent_runtime_id.0 == "rt-first"
)),
_ => panic!("unexpected command in test queue"),
}
receive
.await
.expect("signal receive task should not panic")
.expect("backpressured signal enqueue should complete once capacity opens");
match command_rx
.recv()
.await
.expect("backpressured signal command")
{
MobCommand::ProjectMachineSignal { signal } => assert!(matches!(
signal,
mob_dsl::MobMachineSignal::ObserveRuntimeReady {
agent_runtime_id,
fence_token: mob_dsl::FenceToken(7),
} if agent_runtime_id.0 == "rt-backpressured"
)),
_ => panic!("unexpected command in test queue"),
}
}
}