use meerkat_core::lifecycle::InputId;
use meerkat_core::types::ContentInput;
use meerkat_core::types::SessionId;
use crate::MeerkatMachine;
use crate::input::{
FlowStepInput, Input, InputDurability, InputHeader, InputOrigin, InputVisibility,
};
#[allow(unused_imports)]
use crate::service_ext::SessionServiceRuntimeExt as _;
use crate::traits::{RuntimeControlPlaneError, RuntimeDriverError};
/// Builds a durable [`Input::FlowStep`] describing one step of a flow.
///
/// The generated header carries:
/// - a freshly created [`InputId`],
/// - the current UTC time as its timestamp,
/// - an [`InputOrigin::Flow`] source holding `flow_id` and `step_index`,
/// - [`InputDurability::Durable`] and the default [`InputVisibility`],
/// - no idempotency key, supersession key, or correlation id.
///
/// `instructions` becomes the step content and `turn_metadata` is forwarded
/// as given.
pub fn create_flow_step_input(
    step_id: &str,
    instructions: ContentInput,
    flow_id: &str,
    step_index: usize,
    turn_metadata: Option<meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata>,
) -> Input {
    // Identity and timestamp are produced first, matching the field order of
    // the header they end up in.
    let id = InputId::new();
    let timestamp = chrono::Utc::now();
    let source = InputOrigin::Flow {
        flow_id: flow_id.into(),
        step_index,
    };

    let header = InputHeader {
        id,
        timestamp,
        source,
        durability: InputDurability::Durable,
        visibility: InputVisibility::default(),
        idempotency_key: None,
        supersession_key: None,
        correlation_id: None,
    };

    Input::FlowStep(FlowStepInput {
        header,
        step_id: step_id.into(),
        content: instructions,
        turn_metadata,
    })
}
/// Registers `session_id` with `adapter` on behalf of a mob member.
///
/// This is a thin delegate to `register_session` on the machine.
///
/// # Errors
///
/// Returns whatever [`RuntimeControlPlaneError`] `register_session` reports.
pub async fn register_mob_member(
    adapter: &MeerkatMachine,
    session_id: SessionId,
) -> Result<(), RuntimeControlPlaneError> {
    adapter.register_session(session_id).await?;
    Ok(())
}
/// Unregisters the mob member's session from `adapter`.
///
/// This is a thin delegate to `unregister_session` on the machine; nothing is
/// returned to the caller.
pub async fn unregister_mob_member(
    adapter: &MeerkatMachine,
    session_id: &SessionId,
) {
    adapter.unregister_session(session_id).await;
}
/// Builds a flow-step input and submits it to the session's runtime.
///
/// `instructions` is converted into a [`ContentInput`], wrapped by
/// [`create_flow_step_input`] (with no turn metadata), and handed to
/// `accept_input` on `adapter` for `session_id`.
///
/// # Errors
///
/// Returns the [`RuntimeDriverError`] reported by `accept_input`, if any.
pub async fn deliver_flow_step(
    adapter: &MeerkatMachine,
    session_id: &SessionId,
    step_id: &str,
    instructions: impl Into<ContentInput>,
    flow_id: &str,
    step_index: usize,
) -> Result<crate::AcceptOutcome, RuntimeDriverError> {
    let content: ContentInput = instructions.into();
    let flow_step = create_flow_step_input(step_id, content, flow_id, step_index, None);
    adapter.accept_input(session_id, flow_step).await
}
/// Retires the runtime that backs a mob member's session.
///
/// This is a thin delegate to `retire_runtime` on the machine; the resulting
/// [`RetireReport`](crate::traits::RetireReport) is returned unchanged.
///
/// # Errors
///
/// Returns the [`RuntimeDriverError`] reported by `retire_runtime`, if any.
pub async fn retire_mob_member(
    adapter: &MeerkatMachine,
    session_id: &SessionId,
) -> Result<crate::traits::RetireReport, RuntimeDriverError> {
    let report = adapter.retire_runtime(session_id).await?;
    Ok(report)
}
#[cfg(test)]
// Tests unwrap freely: a failed unwrap is itself the test failure.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::policy_table::DefaultPolicyTable;
    use std::sync::Arc;

    /// Returns an ephemeral machine together with a fresh, not-yet-registered
    /// session id.
    fn fresh_machine() -> (Arc<MeerkatMachine>, SessionId) {
        let machine = Arc::new(MeerkatMachine::ephemeral());
        let session = SessionId::new();
        (machine, session)
    }

    /// Destroys the logical runtime derived from `session` through the
    /// control-plane trait, panicking if the destroy call fails.
    ///
    /// The trait method is called by its full path rather than imported, so
    /// no additional trait methods are brought into scope for the tests.
    async fn destroy_runtime_of(machine: &MeerkatMachine, session: &SessionId) {
        let runtime_id = MeerkatMachine::logical_runtime_id(session);
        crate::traits::RuntimeControlPlane::destroy(machine, &runtime_id)
            .await
            .unwrap();
    }

    /// Registering a member yields a runtime that reports `Idle`.
    #[tokio::test]
    async fn spawn_creates_runtime_driver_session() {
        let (machine, session) = fresh_machine();
        register_mob_member(&machine, session.clone()).await.unwrap();

        let observed = machine.runtime_state(&session).await.unwrap();
        assert_eq!(observed, crate::RuntimeState::Idle);
    }

    /// A delivered flow step is accepted, and the default policy for a flow
    /// step input stages it at run start and wakes an idle runtime.
    #[tokio::test]
    async fn flow_step_delivered_as_input() {
        let (machine, session) = fresh_machine();
        register_mob_member(&machine, session.clone()).await.unwrap();

        let accepted =
            deliver_flow_step(&machine, &session, "step-1", "analyze the data", "flow-1", 0)
                .await
                .unwrap();
        assert!(accepted.is_accepted());

        let probe = create_flow_step_input("s", "i".into(), "f", 0, None);
        let resolved = DefaultPolicyTable::resolve(&probe, true);
        assert_eq!(resolved.apply_mode, crate::ApplyMode::StageRunStart);
        assert_eq!(resolved.wake_mode, crate::WakeMode::WakeIfIdle);
    }

    /// Retiring with one delivered input reports it as abandoned and reports
    /// nothing pending drain.
    #[tokio::test]
    async fn retire_without_runtime_loop_abandons_pending_inputs() {
        let (machine, session) = fresh_machine();
        register_mob_member(&machine, session.clone()).await.unwrap();
        deliver_flow_step(&machine, &session, "s1", "do it", "f1", 0)
            .await
            .unwrap();

        let report = retire_mob_member(&machine, &session).await.unwrap();
        assert_eq!(report.inputs_abandoned, 1);
        assert_eq!(report.inputs_pending_drain, 0);
    }

    /// Block content passed to `create_flow_step_input` is kept as two blocks
    /// and renders to the expected text form.
    #[tokio::test]
    async fn create_flow_step_input_preserves_multimodal_blocks() -> Result<(), String> {
        use meerkat_core::types::ContentBlock;

        let blocks = vec![
            ContentBlock::Text {
                text: "inspect image".into(),
            },
            ContentBlock::Image {
                media_type: "image/png".into(),
                data: "abc123".into(),
            },
        ];
        let input = create_flow_step_input("s", ContentInput::Blocks(blocks), "f", 0, None);

        let flow_step = match input {
            Input::FlowStep(step) => step,
            other => return Err(format!("expected flow step input, got {other:?}")),
        };

        assert_eq!(
            flow_step.content.text_content(),
            "inspect image\n[image: image/png]"
        );
        assert!(matches!(
            &flow_step.content,
            ContentInput::Blocks(kept) if kept.len() == 2
        ));
        Ok(())
    }

    /// After unregistering, querying the runtime state fails.
    #[tokio::test]
    async fn unregister_removes_driver() {
        let (machine, session) = fresh_machine();
        register_mob_member(&machine, session.clone()).await.unwrap();
        unregister_mob_member(&machine, &session).await;

        let lookup = machine.runtime_state(&session).await;
        assert!(lookup.is_err());
    }

    /// A freshly registered session has no active inputs.
    #[tokio::test]
    async fn register_exposes_driver_state() {
        let (machine, session) = fresh_machine();
        register_mob_member(&machine, session.clone()).await.unwrap();

        let active_inputs = machine.list_active_inputs(&session).await.unwrap();
        assert!(active_inputs.is_empty());
    }

    /// Re-registering a session whose runtime was destroyed yields
    /// `RuntimeControlPlaneError::Internal`.
    #[tokio::test]
    async fn register_session_surfaces_failure_on_destroyed_session() {
        let (machine, session) = fresh_machine();
        machine.register_session(session.clone()).await.unwrap();
        destroy_runtime_of(&machine, &session).await;

        let result = machine.register_session(session.clone()).await;
        assert!(
            matches!(result, Err(RuntimeControlPlaneError::Internal(_))),
            "register_session on a destroyed session must surface a typed control-plane error, got {result:?}"
        );
    }

    /// `register_mob_member` returns an error when the session's runtime was
    /// destroyed beforehand.
    #[tokio::test]
    async fn register_mob_member_propagates_failure_on_destroyed_session() {
        let (machine, session) = fresh_machine();
        register_mob_member(&machine, session.clone()).await.unwrap();
        destroy_runtime_of(&machine, &session).await;

        let result = register_mob_member(&machine, session.clone()).await;
        assert!(
            result.is_err(),
            "register_mob_member must propagate the typed registration failure, got {result:?}"
        );
    }

    /// Setting silent intents on a destroyed session yields
    /// `RuntimeDriverError::Destroyed`.
    #[tokio::test]
    async fn set_session_silent_intents_surfaces_failure_on_destroyed_session() {
        let (machine, session) = fresh_machine();
        machine.register_session(session.clone()).await.unwrap();
        destroy_runtime_of(&machine, &session).await;

        let intents = vec!["status".to_string()];
        let result = machine.set_session_silent_intents(&session, intents).await;
        assert!(
            matches!(result, Err(RuntimeDriverError::Destroyed)),
            "set_session_silent_intents must surface the inner command failure, got {result:?}"
        );
    }
}