use meerkat_core::lifecycle::InputId;
use meerkat_core::types::ContentInput;
use meerkat_core::types::SessionId;
use crate::RuntimeSessionAdapter;
use crate::input::{
FlowStepInput, Input, InputDurability, InputHeader, InputOrigin, InputVisibility,
};
#[allow(unused_imports)]
use crate::service_ext::SessionServiceRuntimeExt;
use crate::traits::RuntimeDriverError;
pub fn create_flow_step_input(
step_id: &str,
instructions: ContentInput,
flow_id: &str,
step_index: usize,
turn_metadata: Option<meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata>,
) -> Input {
let instructions_text = instructions.text_content();
let blocks = if instructions.has_images() {
Some(instructions.into_blocks())
} else {
None
};
Input::FlowStep(FlowStepInput {
header: InputHeader {
id: InputId::new(),
timestamp: chrono::Utc::now(),
source: InputOrigin::Flow {
flow_id: flow_id.into(),
step_index,
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
step_id: step_id.into(),
instructions: instructions_text,
blocks,
turn_metadata,
})
}
pub async fn register_mob_member(adapter: &RuntimeSessionAdapter, session_id: SessionId) {
adapter.register_session(session_id).await;
}
pub async fn unregister_mob_member(adapter: &RuntimeSessionAdapter, session_id: &SessionId) {
adapter.unregister_session(session_id).await;
}
pub async fn deliver_flow_step(
adapter: &RuntimeSessionAdapter,
session_id: &SessionId,
step_id: &str,
instructions: impl Into<ContentInput>,
flow_id: &str,
step_index: usize,
) -> Result<crate::AcceptOutcome, RuntimeDriverError> {
let input = create_flow_step_input(step_id, instructions.into(), flow_id, step_index, None);
adapter.accept_input(session_id, input).await
}
pub async fn retire_mob_member(
adapter: &RuntimeSessionAdapter,
session_id: &SessionId,
) -> Result<crate::traits::RetireReport, RuntimeDriverError> {
adapter.retire_runtime(session_id).await
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use crate::policy_table::DefaultPolicyTable;
#[tokio::test]
async fn spawn_creates_runtime_driver_session() {
let adapter = RuntimeSessionAdapter::ephemeral();
let sid = SessionId::new();
register_mob_member(&adapter, sid.clone()).await;
let state = adapter.runtime_state(&sid).await.unwrap();
assert_eq!(state, crate::RuntimeState::Idle);
}
#[tokio::test]
async fn flow_step_delivered_as_input() {
let adapter = RuntimeSessionAdapter::ephemeral();
let sid = SessionId::new();
register_mob_member(&adapter, sid.clone()).await;
let outcome = deliver_flow_step(&adapter, &sid, "step-1", "analyze the data", "flow-1", 0)
.await
.unwrap();
assert!(outcome.is_accepted());
let input = create_flow_step_input("s", "i".into(), "f", 0, None);
let policy = DefaultPolicyTable::resolve(&input, true);
assert_eq!(policy.apply_mode, crate::ApplyMode::StageRunStart);
assert_eq!(policy.wake_mode, crate::WakeMode::WakeIfIdle);
}
#[tokio::test]
async fn retire_without_runtime_loop_abandons_pending_inputs() {
let adapter = RuntimeSessionAdapter::ephemeral();
let sid = SessionId::new();
register_mob_member(&adapter, sid.clone()).await;
deliver_flow_step(&adapter, &sid, "s1", "do it", "f1", 0)
.await
.unwrap();
let report = retire_mob_member(&adapter, &sid).await.unwrap();
assert_eq!(report.inputs_abandoned, 1);
assert_eq!(report.inputs_pending_drain, 0);
}
#[tokio::test]
async fn create_flow_step_input_preserves_multimodal_blocks() -> Result<(), String> {
let input = create_flow_step_input(
"s",
ContentInput::Blocks(vec![
meerkat_core::types::ContentBlock::Text {
text: "inspect image".into(),
},
meerkat_core::types::ContentBlock::Image {
media_type: "image/png".into(),
data: "abc123".into(),
},
]),
"f",
0,
None,
);
let flow_step = match input {
Input::FlowStep(flow_step) => flow_step,
other => return Err(format!("expected flow step input, got {other:?}")),
};
assert_eq!(flow_step.instructions, "inspect image\n[image: image/png]");
assert_eq!(flow_step.blocks.as_ref().map(Vec::len), Some(2));
Ok(())
}
#[tokio::test]
async fn unregister_removes_driver() {
let adapter = RuntimeSessionAdapter::ephemeral();
let sid = SessionId::new();
register_mob_member(&adapter, sid.clone()).await;
unregister_mob_member(&adapter, &sid).await;
let result = adapter.runtime_state(&sid).await;
assert!(result.is_err());
}
#[tokio::test]
async fn wiring_emits_topology_event() {
let adapter = RuntimeSessionAdapter::ephemeral();
let sid = SessionId::new();
register_mob_member(&adapter, sid.clone()).await;
let active = adapter.list_active_inputs(&sid).await.unwrap();
assert!(active.is_empty()); }
}