#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
use meerkat_core::interaction::{
InboxInteraction, InteractionContent, InteractionId, ResponseStatus,
};
use meerkat_runtime::comms_bridge::interaction_to_peer_input;
use meerkat_runtime::driver::ephemeral::EphemeralRuntimeDriver;
use meerkat_runtime::identifiers::LogicalRuntimeId;
use meerkat_runtime::input::{Input, InputDurability, PeerConvention};
use meerkat_runtime::input_state::InputLifecycleState;
use meerkat_runtime::policy_table::DefaultPolicyTable;
use meerkat_runtime::runtime_state::RuntimeState;
use meerkat_runtime::traits::RuntimeDriver;
use uuid::Uuid;
fn iid() -> InteractionId {
InteractionId(Uuid::now_v7())
}
fn make_message(from: &str, body: &str) -> InboxInteraction {
InboxInteraction {
id: iid(),
from: from.into(),
content: InteractionContent::Message {
body: body.into(),
blocks: None,
},
rendered_text: format!("[{from}]: {body}"),
handling_mode: meerkat_core::types::HandlingMode::Queue,
render_metadata: None,
}
}
fn make_message_with_blocks(from: &str, body: &str) -> InboxInteraction {
InboxInteraction {
id: iid(),
from: from.into(),
content: InteractionContent::Message {
body: body.into(),
blocks: Some(vec![
meerkat_core::types::ContentBlock::Text { text: body.into() },
meerkat_core::types::ContentBlock::Image {
media_type: "image/png".into(),
data: "abc123".into(),
},
]),
},
rendered_text: format!("[{from}]: {body}"),
handling_mode: meerkat_core::types::HandlingMode::Queue,
render_metadata: None,
}
}
fn make_response(from: &str, status: ResponseStatus) -> InboxInteraction {
let in_reply_to = iid();
InboxInteraction {
id: iid(),
from: from.into(),
content: InteractionContent::Response {
in_reply_to,
status,
result: serde_json::json!({"ok": true}),
},
rendered_text: format!("[{from}]: response ({status:?})"),
handling_mode: meerkat_core::types::HandlingMode::Queue,
render_metadata: None,
}
}
fn make_request(from: &str, intent: &str) -> InboxInteraction {
InboxInteraction {
id: iid(),
from: from.into(),
content: InteractionContent::Request {
intent: intent.into(),
params: serde_json::json!({}),
},
rendered_text: format!("[{from}]: request ({intent})"),
handling_mode: meerkat_core::types::HandlingMode::Queue,
render_metadata: None,
}
}
fn rid() -> LogicalRuntimeId {
LogicalRuntimeId::new("test-runtime")
}
#[tokio::test]
async fn completed_response_idle_wakes() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let interaction = make_response("peer-1", ResponseStatus::Completed);
let input = interaction_to_peer_input(&interaction, &rid());
if let Input::Peer(ref p) = input {
assert!(matches!(
p.convention,
Some(PeerConvention::ResponseTerminal { .. })
));
assert_eq!(p.header.durability, InputDurability::Durable);
} else {
panic!("Expected PeerInput");
}
let policy = DefaultPolicyTable::resolve(&input, true);
assert_eq!(policy.apply_mode, meerkat_runtime::ApplyMode::StageRunStart);
assert_eq!(policy.wake_mode, meerkat_runtime::WakeMode::WakeIfIdle);
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
assert!(driver.take_wake_requested());
}
#[tokio::test]
async fn accepted_response_no_wake() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let interaction = make_response("peer-1", ResponseStatus::Accepted);
let input = interaction_to_peer_input(&interaction, &rid());
if let Input::Peer(ref p) = input {
assert!(matches!(
p.convention,
Some(PeerConvention::ResponseProgress { .. })
));
assert_eq!(p.header.durability, InputDurability::Ephemeral);
} else {
panic!("Expected PeerInput");
}
let policy = DefaultPolicyTable::resolve(&input, true);
assert_eq!(
policy.apply_mode,
meerkat_runtime::ApplyMode::StageRunBoundary
);
assert_eq!(policy.wake_mode, meerkat_runtime::WakeMode::None);
assert_eq!(policy.queue_mode, meerkat_runtime::QueueMode::Coalesce);
assert_eq!(
policy.consume_point,
meerkat_runtime::ConsumePoint::OnRunComplete
);
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
assert!(!driver.take_wake_requested());
if let meerkat_runtime::AcceptOutcome::Accepted { input_id, .. } = &outcome {
let state = driver.input_state(input_id).unwrap();
assert_eq!(state.current_state(), InputLifecycleState::Queued);
}
}
#[tokio::test]
async fn failed_response_idle_wakes() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let interaction = make_response("peer-1", ResponseStatus::Failed);
let input = interaction_to_peer_input(&interaction, &rid());
if let Input::Peer(ref p) = input {
assert!(matches!(
p.convention,
Some(PeerConvention::ResponseTerminal {
status: meerkat_runtime::ResponseTerminalStatus::Failed,
..
})
));
} else {
panic!("Expected PeerInput");
}
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
assert!(driver.take_wake_requested());
}
#[tokio::test]
async fn response_with_passthrough_message_both_queued() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let resp = make_response("peer-1", ResponseStatus::Completed);
let input1 = interaction_to_peer_input(&resp, &rid());
driver.accept_input(input1).await.unwrap();
let msg = make_message("peer-2", "hello");
let input2 = interaction_to_peer_input(&msg, &rid());
driver.accept_input(input2).await.unwrap();
assert_eq!(driver.queue().len(), 2);
assert!(driver.take_wake_requested()); }
#[tokio::test]
async fn response_after_completed_turn_wakes() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let run_id = meerkat_core::lifecycle::RunId::new();
driver.start_run(run_id.clone()).unwrap();
driver.complete_run().unwrap();
let resp = make_response("peer-1", ResponseStatus::Completed);
let input = interaction_to_peer_input(&resp, &rid());
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
assert!(driver.take_wake_requested()); }
#[tokio::test]
async fn peer_lifecycle_accepts_as_requests() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let req1 = make_request("peer-1", "mob.peer_added");
let input1 = interaction_to_peer_input(&req1, &rid());
if let Input::Peer(ref p) = input1 {
assert!(matches!(p.convention, Some(PeerConvention::Request { .. })));
}
let policy = DefaultPolicyTable::resolve(&input1, true);
assert_eq!(policy.apply_mode, meerkat_runtime::ApplyMode::StageRunStart);
assert_eq!(policy.wake_mode, meerkat_runtime::WakeMode::WakeIfIdle);
let outcome = driver.accept_input(input1).await.unwrap();
assert!(outcome.is_accepted());
}
#[tokio::test]
async fn peer_lifecycle_net_out_both_accepted() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let added = make_request("peer-1", "mob.peer_added");
let retired = make_request("peer-1", "mob.peer_retired");
let input1 = interaction_to_peer_input(&added, &rid());
let input2 = interaction_to_peer_input(&retired, &rid());
let o1 = driver.accept_input(input1).await.unwrap();
let o2 = driver.accept_input(input2).await.unwrap();
assert!(o1.is_accepted());
assert!(o2.is_accepted());
assert_eq!(driver.queue().len(), 2); }
#[tokio::test]
async fn silent_intent_maps_to_request_with_wake() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let interaction = make_request("coordinator", "mob.peer_added");
let input = interaction_to_peer_input(&interaction, &rid());
let policy = DefaultPolicyTable::resolve(&input, true);
assert_eq!(policy.apply_mode, meerkat_runtime::ApplyMode::StageRunStart);
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
}
#[tokio::test]
async fn non_silent_intent_triggers_wake() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let interaction = make_request("coordinator", "custom.action");
let input = interaction_to_peer_input(&interaction, &rid());
let policy = DefaultPolicyTable::resolve(&input, true);
assert_eq!(policy.wake_mode, meerkat_runtime::WakeMode::WakeIfIdle);
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
assert!(driver.take_wake_requested());
}
#[tokio::test]
async fn message_triggers_wake() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let interaction = make_message("peer-1", "hello world");
let input = interaction_to_peer_input(&interaction, &rid());
let policy = DefaultPolicyTable::resolve(&input, true);
assert_eq!(policy.apply_mode, meerkat_runtime::ApplyMode::StageRunStart);
assert_eq!(policy.wake_mode, meerkat_runtime::WakeMode::WakeIfIdle);
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
assert!(driver.take_wake_requested());
assert_eq!(driver.queue().len(), 1);
}
#[tokio::test]
async fn request_triggers_wake() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let interaction = make_request("peer-1", "analyze");
let input = interaction_to_peer_input(&interaction, &rid());
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
assert!(driver.take_wake_requested());
}
#[tokio::test]
async fn request_prompt_uses_rendered_text_projection() {
let interaction = make_request("peer-1", "custom.action");
let input = interaction_to_peer_input(&interaction, &rid());
if let Input::Peer(peer) = input {
assert_eq!(peer.body, interaction.rendered_text);
} else {
panic!("Expected PeerInput");
}
}
#[tokio::test]
async fn response_prompt_uses_rendered_text_projection() {
let interaction = make_response("peer-1", ResponseStatus::Completed);
let input = interaction_to_peer_input(&interaction, &rid());
if let Input::Peer(peer) = input {
assert_eq!(peer.body, interaction.rendered_text);
} else {
panic!("Expected PeerInput");
}
}
#[tokio::test]
async fn message_blocks_survive_bridge() {
let interaction = make_message_with_blocks("peer-1", "look");
let input = interaction_to_peer_input(&interaction, &rid());
if let Input::Peer(peer) = input {
assert!(peer.blocks.is_some());
assert_eq!(peer.body, interaction.rendered_text);
} else {
panic!("Expected PeerInput");
}
}
#[tokio::test]
async fn no_input_no_wake() {
let driver = EphemeralRuntimeDriver::new(rid());
assert!(driver.queue().is_empty());
assert_eq!(driver.runtime_state(), RuntimeState::Idle);
}
#[tokio::test]
async fn message_while_running_queues_without_wake() {
let mut driver = EphemeralRuntimeDriver::new(rid());
driver
.start_run(meerkat_core::lifecycle::RunId::new())
.unwrap();
let interaction = make_message("peer-1", "hello");
let input = interaction_to_peer_input(&interaction, &rid());
let policy = DefaultPolicyTable::resolve(&input, false);
assert_eq!(policy.apply_mode, meerkat_runtime::ApplyMode::StageRunStart);
assert_eq!(policy.wake_mode, meerkat_runtime::WakeMode::None);
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
assert!(!driver.take_wake_requested());
}
#[tokio::test]
async fn terminal_response_while_running_no_wake() {
let mut driver = EphemeralRuntimeDriver::new(rid());
driver
.start_run(meerkat_core::lifecycle::RunId::new())
.unwrap();
let interaction = make_response("peer-1", ResponseStatus::Completed);
let input = interaction_to_peer_input(&interaction, &rid());
let policy = DefaultPolicyTable::resolve(&input, false);
assert_eq!(policy.apply_mode, meerkat_runtime::ApplyMode::StageRunStart);
assert_eq!(policy.wake_mode, meerkat_runtime::WakeMode::None);
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
assert!(!driver.take_wake_requested());
}
#[tokio::test]
async fn drain_terminal_response_produces_exactly_one_peer_input() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let interaction = make_response("peer-1", ResponseStatus::Completed);
let input = interaction_to_peer_input(&interaction, &rid());
assert!(
matches!(&input, Input::Peer(_)),
"terminal response must map to Peer, got {:?}",
input.kind_id()
);
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
assert_eq!(
driver.queue().len(),
1,
"terminal response must produce exactly 1 queued input"
);
let queued_ids = driver.queue().input_ids();
let queued_state = driver.input_state(&queued_ids[0]).unwrap();
if let Some(Input::Peer(peer)) = &queued_state.persisted_input {
assert!(
matches!(
peer.convention,
Some(PeerConvention::ResponseTerminal { .. })
),
"queued input must be ResponseTerminal"
);
}
}
#[tokio::test]
async fn terminal_response_with_steer_policy_while_running() {
let in_reply_to = iid();
let interaction = InboxInteraction {
id: iid(),
from: "peer-1".into(),
content: InteractionContent::Response {
in_reply_to,
status: ResponseStatus::Completed,
result: serde_json::json!({"ok": true}),
},
rendered_text: "[peer-1]: response (Completed)".into(),
handling_mode: meerkat_core::types::HandlingMode::Steer,
render_metadata: None,
};
let input = interaction_to_peer_input(&interaction, &rid());
let policy = DefaultPolicyTable::resolve(&input, false);
assert_eq!(policy.wake_mode, meerkat_runtime::WakeMode::None);
assert_eq!(
policy.routing_disposition,
meerkat_runtime::RoutingDisposition::Steer
);
let mut driver = EphemeralRuntimeDriver::new(rid());
driver
.start_run(meerkat_core::lifecycle::RunId::new())
.unwrap();
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
assert!(driver.take_wake_requested());
}
#[tokio::test]
async fn terminal_response_with_steer_policy_while_idle() {
let in_reply_to = iid();
let interaction = InboxInteraction {
id: iid(),
from: "peer-1".into(),
content: InteractionContent::Response {
in_reply_to,
status: ResponseStatus::Completed,
result: serde_json::json!({"ok": true}),
},
rendered_text: "[peer-1]: response (Completed)".into(),
handling_mode: meerkat_core::types::HandlingMode::Steer,
render_metadata: None,
};
let input = interaction_to_peer_input(&interaction, &rid());
let policy = DefaultPolicyTable::resolve(&input, true);
assert_eq!(policy.wake_mode, meerkat_runtime::WakeMode::WakeIfIdle);
assert_eq!(
policy.routing_disposition,
meerkat_runtime::RoutingDisposition::Steer
);
let mut driver = EphemeralRuntimeDriver::new(rid());
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
assert!(driver.take_wake_requested());
}