#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
use meerkat_core::interaction::{
InboxInteraction, InteractionContent, InteractionId, ResponseStatus,
};
use meerkat_runtime::comms_bridge::interaction_to_peer_input;
use meerkat_runtime::driver::ephemeral::EphemeralRuntimeDriver;
use meerkat_runtime::identifiers::LogicalRuntimeId;
use meerkat_runtime::input::{Input, InputDurability, PeerConvention};
use meerkat_runtime::input_state::InputLifecycleState;
use meerkat_runtime::policy_table::DefaultPolicyTable;
use meerkat_runtime::runtime_state::RuntimeState;
use meerkat_runtime::traits::RuntimeDriver;
use uuid::Uuid;
fn iid() -> InteractionId {
InteractionId(Uuid::now_v7())
}
fn make_message(from: &str, body: &str) -> InboxInteraction {
InboxInteraction {
id: iid(),
from: from.into(),
content: InteractionContent::Message { body: body.into() },
rendered_text: format!("[{from}]: {body}"),
}
}
fn make_response(from: &str, status: ResponseStatus) -> InboxInteraction {
let in_reply_to = iid();
InboxInteraction {
id: iid(),
from: from.into(),
content: InteractionContent::Response {
in_reply_to,
status,
result: serde_json::json!({"ok": true}),
},
rendered_text: format!("[{from}]: response ({status:?})"),
}
}
fn make_request(from: &str, intent: &str) -> InboxInteraction {
InboxInteraction {
id: iid(),
from: from.into(),
content: InteractionContent::Request {
intent: intent.into(),
params: serde_json::json!({}),
},
rendered_text: format!("[{from}]: request ({intent})"),
}
}
fn rid() -> LogicalRuntimeId {
LogicalRuntimeId::new("test-runtime")
}
#[tokio::test]
async fn completed_response_idle_wakes() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let interaction = make_response("peer-1", ResponseStatus::Completed);
let input = interaction_to_peer_input(&interaction, &rid());
if let Input::Peer(ref p) = input {
assert!(matches!(
p.convention,
Some(PeerConvention::ResponseTerminal { .. })
));
assert_eq!(p.header.durability, InputDurability::Durable);
} else {
panic!("Expected PeerInput");
}
let policy = DefaultPolicyTable::resolve(&input, true);
assert_eq!(policy.apply_mode, meerkat_runtime::ApplyMode::StageRunStart);
assert_eq!(policy.wake_mode, meerkat_runtime::WakeMode::WakeIfIdle);
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
assert!(driver.take_wake_requested());
}
#[tokio::test]
async fn accepted_response_no_wake() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let interaction = make_response("peer-1", ResponseStatus::Accepted);
let input = interaction_to_peer_input(&interaction, &rid());
if let Input::Peer(ref p) = input {
assert!(matches!(
p.convention,
Some(PeerConvention::ResponseProgress { .. })
));
assert_eq!(p.header.durability, InputDurability::Ephemeral);
} else {
panic!("Expected PeerInput");
}
let policy = DefaultPolicyTable::resolve(&input, true);
assert_eq!(
policy.apply_mode,
meerkat_runtime::ApplyMode::StageRunBoundary
);
assert_eq!(policy.wake_mode, meerkat_runtime::WakeMode::None);
assert_eq!(policy.queue_mode, meerkat_runtime::QueueMode::Coalesce);
assert_eq!(
policy.consume_point,
meerkat_runtime::ConsumePoint::OnRunComplete
);
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
assert!(!driver.take_wake_requested());
if let meerkat_runtime::AcceptOutcome::Accepted { input_id, .. } = &outcome {
let state = driver.input_state(input_id).unwrap();
assert_eq!(state.current_state, InputLifecycleState::Queued);
}
}
#[tokio::test]
async fn failed_response_idle_wakes() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let interaction = make_response("peer-1", ResponseStatus::Failed);
let input = interaction_to_peer_input(&interaction, &rid());
if let Input::Peer(ref p) = input {
assert!(matches!(
p.convention,
Some(PeerConvention::ResponseTerminal {
status: meerkat_runtime::ResponseTerminalStatus::Failed,
..
})
));
} else {
panic!("Expected PeerInput");
}
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
assert!(driver.take_wake_requested());
}
#[tokio::test]
async fn response_with_passthrough_message_both_queued() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let resp = make_response("peer-1", ResponseStatus::Completed);
let input1 = interaction_to_peer_input(&resp, &rid());
driver.accept_input(input1).await.unwrap();
let msg = make_message("peer-2", "hello");
let input2 = interaction_to_peer_input(&msg, &rid());
driver.accept_input(input2).await.unwrap();
assert_eq!(driver.queue().len(), 2);
assert!(driver.take_wake_requested()); }
#[tokio::test]
async fn response_after_completed_turn_wakes() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let run_id = meerkat_core::lifecycle::RunId::new();
driver
.state_machine_mut()
.start_run(run_id.clone())
.unwrap();
driver.state_machine_mut().complete_run().unwrap();
let resp = make_response("peer-1", ResponseStatus::Completed);
let input = interaction_to_peer_input(&resp, &rid());
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
assert!(driver.take_wake_requested()); }
#[tokio::test]
async fn peer_lifecycle_accepts_as_requests() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let req1 = make_request("peer-1", "mob.peer_added");
let input1 = interaction_to_peer_input(&req1, &rid());
if let Input::Peer(ref p) = input1 {
assert!(matches!(p.convention, Some(PeerConvention::Request { .. })));
}
let policy = DefaultPolicyTable::resolve(&input1, true);
assert_eq!(policy.apply_mode, meerkat_runtime::ApplyMode::StageRunStart);
assert_eq!(policy.wake_mode, meerkat_runtime::WakeMode::WakeIfIdle);
let outcome = driver.accept_input(input1).await.unwrap();
assert!(outcome.is_accepted());
}
#[tokio::test]
async fn peer_lifecycle_net_out_both_accepted() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let added = make_request("peer-1", "mob.peer_added");
let retired = make_request("peer-1", "mob.peer_retired");
let input1 = interaction_to_peer_input(&added, &rid());
let input2 = interaction_to_peer_input(&retired, &rid());
let o1 = driver.accept_input(input1).await.unwrap();
let o2 = driver.accept_input(input2).await.unwrap();
assert!(o1.is_accepted());
assert!(o2.is_accepted());
assert_eq!(driver.queue().len(), 2); }
#[tokio::test]
async fn silent_intent_maps_to_request_with_wake() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let interaction = make_request("coordinator", "mob.peer_added");
let input = interaction_to_peer_input(&interaction, &rid());
let policy = DefaultPolicyTable::resolve(&input, true);
assert_eq!(policy.apply_mode, meerkat_runtime::ApplyMode::StageRunStart);
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
}
#[tokio::test]
async fn non_silent_intent_triggers_wake() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let interaction = make_request("coordinator", "custom.action");
let input = interaction_to_peer_input(&interaction, &rid());
let policy = DefaultPolicyTable::resolve(&input, true);
assert_eq!(policy.wake_mode, meerkat_runtime::WakeMode::WakeIfIdle);
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
assert!(driver.take_wake_requested());
}
#[tokio::test]
async fn message_triggers_wake() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let interaction = make_message("peer-1", "hello world");
let input = interaction_to_peer_input(&interaction, &rid());
let policy = DefaultPolicyTable::resolve(&input, true);
assert_eq!(policy.apply_mode, meerkat_runtime::ApplyMode::StageRunStart);
assert_eq!(policy.wake_mode, meerkat_runtime::WakeMode::WakeIfIdle);
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
assert!(driver.take_wake_requested());
assert_eq!(driver.queue().len(), 1);
}
#[tokio::test]
async fn request_triggers_wake() {
let mut driver = EphemeralRuntimeDriver::new(rid());
let interaction = make_request("peer-1", "analyze");
let input = interaction_to_peer_input(&interaction, &rid());
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
assert!(driver.take_wake_requested());
}
#[tokio::test]
async fn no_input_no_wake() {
let driver = EphemeralRuntimeDriver::new(rid());
assert!(driver.queue().is_empty());
assert_eq!(driver.runtime_state(), RuntimeState::Idle);
}
#[tokio::test]
async fn message_while_running_checkpoint_no_wake() {
let mut driver = EphemeralRuntimeDriver::new(rid());
driver
.state_machine_mut()
.start_run(meerkat_core::lifecycle::RunId::new())
.unwrap();
let interaction = make_message("peer-1", "hello");
let input = interaction_to_peer_input(&interaction, &rid());
let policy = DefaultPolicyTable::resolve(&input, false);
assert_eq!(policy.apply_mode, meerkat_runtime::ApplyMode::StageRunStart);
assert_eq!(
policy.wake_mode,
meerkat_runtime::WakeMode::InterruptYielding
);
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
assert!(!driver.take_wake_requested()); }
#[tokio::test]
async fn terminal_response_while_running_no_wake() {
let mut driver = EphemeralRuntimeDriver::new(rid());
driver
.state_machine_mut()
.start_run(meerkat_core::lifecycle::RunId::new())
.unwrap();
let interaction = make_response("peer-1", ResponseStatus::Completed);
let input = interaction_to_peer_input(&interaction, &rid());
let policy = DefaultPolicyTable::resolve(&input, false);
assert_eq!(policy.apply_mode, meerkat_runtime::ApplyMode::StageRunStart);
assert_eq!(policy.wake_mode, meerkat_runtime::WakeMode::None);
let outcome = driver.accept_input(input).await.unwrap();
assert!(outcome.is_accepted());
assert!(!driver.take_wake_requested());
}