use super::message::InboxMessage;
use crate::kernel::ExecutionId;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
pub trait InboxStore: Send + Sync {
fn push(&self, execution_id: &ExecutionId, message: InboxMessage);
fn len(&self, execution_id: &ExecutionId) -> usize;
fn is_empty(&self, execution_id: &ExecutionId) -> bool {
self.len(execution_id) == 0
}
fn has_control_messages(&self, execution_id: &ExecutionId) -> bool;
fn drain_messages(&self, execution_id: &ExecutionId) -> Vec<InboxMessage>;
fn peek(&self, execution_id: &ExecutionId) -> Option<InboxMessage>;
fn pop(&self, execution_id: &ExecutionId) -> Option<InboxMessage>;
fn clear(&self, execution_id: &ExecutionId);
}
#[derive(Default)]
pub struct InMemoryInboxStore {
messages: RwLock<HashMap<String, Vec<InboxMessage>>>,
}
impl InMemoryInboxStore {
pub fn new() -> Self {
Self {
messages: RwLock::new(HashMap::new()),
}
}
pub fn shared() -> Arc<Self> {
Arc::new(Self::new())
}
}
impl InboxStore for InMemoryInboxStore {
fn push(&self, execution_id: &ExecutionId, message: InboxMessage) {
let mut guard = self.messages.write().expect("lock poisoned");
guard
.entry(execution_id.to_string())
.or_default()
.push(message);
}
fn len(&self, execution_id: &ExecutionId) -> usize {
let guard = self.messages.read().expect("lock poisoned");
guard
.get(&execution_id.to_string())
.map(|v| v.len())
.unwrap_or(0)
}
fn has_control_messages(&self, execution_id: &ExecutionId) -> bool {
let guard = self.messages.read().expect("lock poisoned");
guard
.get(&execution_id.to_string())
.map(|v| v.iter().any(|m| m.is_control()))
.unwrap_or(false)
}
fn drain_messages(&self, execution_id: &ExecutionId) -> Vec<InboxMessage> {
let mut guard = self.messages.write().expect("lock poisoned");
let mut messages = guard.remove(&execution_id.to_string()).unwrap_or_default();
messages.sort_by_key(|m| m.priority_order());
messages
}
fn peek(&self, execution_id: &ExecutionId) -> Option<InboxMessage> {
let guard = self.messages.read().expect("lock poisoned");
guard.get(&execution_id.to_string()).and_then(|v| {
v.iter().min_by_key(|m| m.priority_order()).cloned()
})
}
fn pop(&self, execution_id: &ExecutionId) -> Option<InboxMessage> {
let mut guard = self.messages.write().expect("lock poisoned");
let messages = guard.get_mut(&execution_id.to_string())?;
if messages.is_empty() {
return None;
}
let min_idx = messages
.iter()
.enumerate()
.min_by_key(|(_, m)| m.priority_order())
.map(|(i, _)| i)?;
Some(messages.remove(min_idx))
}
fn clear(&self, execution_id: &ExecutionId) {
let mut guard = self.messages.write().expect("lock poisoned");
guard.remove(&execution_id.to_string());
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::inbox::message::{
ControlAction, ControlMessage, EvidenceImpact, EvidenceSource, EvidenceUpdate,
GuidanceMessage,
};
fn test_execution_id() -> ExecutionId {
ExecutionId::new()
}
#[test]
fn test_push_and_drain() {
let store = InMemoryInboxStore::new();
let exec_id = test_execution_id();
let guidance =
InboxMessage::Guidance(GuidanceMessage::from_user(exec_id.clone(), "Focus on EU"));
store.push(&exec_id, guidance);
assert_eq!(store.len(&exec_id), 1);
assert!(!store.is_empty(&exec_id));
let messages = store.drain_messages(&exec_id);
assert_eq!(messages.len(), 1);
assert!(store.is_empty(&exec_id));
}
#[test]
fn test_priority_sorting_inv_inbox_002() {
let store = InMemoryInboxStore::new();
let exec_id = test_execution_id();
let guidance =
InboxMessage::Guidance(GuidanceMessage::from_user(exec_id.clone(), "low priority"));
let evidence = InboxMessage::Evidence(EvidenceUpdate::new(
exec_id.clone(),
EvidenceSource::Discovery,
"Found something",
serde_json::json!({}),
EvidenceImpact::Informational,
));
let control = InboxMessage::Control(ControlMessage::new(
exec_id.clone(),
ControlAction::Pause,
"admin",
));
store.push(&exec_id, guidance);
store.push(&exec_id, evidence);
store.push(&exec_id, control);
let messages = store.drain_messages(&exec_id);
assert_eq!(messages.len(), 3);
assert!(messages[0].is_control());
assert!(matches!(messages[1], InboxMessage::Evidence(_)));
assert!(matches!(messages[2], InboxMessage::Guidance(_)));
}
#[test]
fn test_has_control_messages() {
let store = InMemoryInboxStore::new();
let exec_id = test_execution_id();
assert!(!store.has_control_messages(&exec_id));
let guidance = InboxMessage::Guidance(GuidanceMessage::from_user(exec_id.clone(), "test"));
store.push(&exec_id, guidance);
assert!(!store.has_control_messages(&exec_id));
let control = InboxMessage::Control(ControlMessage::new(
exec_id.clone(),
ControlAction::Cancel,
"admin",
));
store.push(&exec_id, control);
assert!(store.has_control_messages(&exec_id));
}
#[test]
fn test_pop_returns_highest_priority() {
let store = InMemoryInboxStore::new();
let exec_id = test_execution_id();
let guidance = InboxMessage::Guidance(GuidanceMessage::from_user(exec_id.clone(), "low"));
let control = InboxMessage::Control(ControlMessage::new(
exec_id.clone(),
ControlAction::Pause,
"admin",
));
store.push(&exec_id, guidance);
store.push(&exec_id, control);
let msg = store.pop(&exec_id).unwrap();
assert!(msg.is_control());
let msg = store.pop(&exec_id).unwrap();
assert!(matches!(msg, InboxMessage::Guidance(_)));
assert!(store.pop(&exec_id).is_none());
}
#[test]
fn test_peek_does_not_remove() {
let store = InMemoryInboxStore::new();
let exec_id = test_execution_id();
let control = InboxMessage::Control(ControlMessage::new(
exec_id.clone(),
ControlAction::Pause,
"admin",
));
store.push(&exec_id, control);
let msg1 = store.peek(&exec_id);
let msg2 = store.peek(&exec_id);
assert!(msg1.is_some());
assert!(msg2.is_some());
assert_eq!(store.len(&exec_id), 1);
}
#[test]
fn test_execution_isolation_inv_inbox_004() {
let store = InMemoryInboxStore::new();
let exec_id_1 = test_execution_id();
let exec_id_2 = test_execution_id();
let control_1 = InboxMessage::Control(ControlMessage::new(
exec_id_1.clone(),
ControlAction::Pause,
"admin",
));
let control_2 = InboxMessage::Control(ControlMessage::new(
exec_id_2.clone(),
ControlAction::Cancel,
"admin",
));
store.push(&exec_id_1, control_1);
store.push(&exec_id_2, control_2);
assert_eq!(store.len(&exec_id_1), 1);
assert_eq!(store.len(&exec_id_2), 1);
let msgs = store.drain_messages(&exec_id_1);
assert_eq!(msgs.len(), 1);
assert_eq!(store.len(&exec_id_1), 0);
assert_eq!(store.len(&exec_id_2), 1);
}
#[test]
fn test_clear() {
let store = InMemoryInboxStore::new();
let exec_id = test_execution_id();
for _ in 0..5 {
let control = InboxMessage::Control(ControlMessage::new(
exec_id.clone(),
ControlAction::Pause,
"admin",
));
store.push(&exec_id, control);
}
assert_eq!(store.len(&exec_id), 5);
store.clear(&exec_id);
assert_eq!(store.len(&exec_id), 0);
assert!(store.is_empty(&exec_id));
}
#[test]
fn test_inbox_drain_ordering() {
let store = InMemoryInboxStore::new();
let exec_id = test_execution_id();
let msg1 = InboxMessage::Guidance(GuidanceMessage::from_user(exec_id.clone(), "first"));
let msg2 = InboxMessage::Guidance(GuidanceMessage::from_user(exec_id.clone(), "second"));
let msg3 = InboxMessage::Guidance(GuidanceMessage::from_user(exec_id.clone(), "third"));
let id1 = msg1.id().to_string();
let id2 = msg2.id().to_string();
let id3 = msg3.id().to_string();
store.push(&exec_id, msg1);
store.push(&exec_id, msg2);
store.push(&exec_id, msg3);
let messages = store.drain_messages(&exec_id);
assert_eq!(messages.len(), 3);
assert_eq!(messages[0].id(), id1);
assert_eq!(messages[1].id(), id2);
assert_eq!(messages[2].id(), id3);
}
#[test]
fn test_control_priority() {
let store = InMemoryInboxStore::new();
let exec_id = test_execution_id();
let guidance =
InboxMessage::Guidance(GuidanceMessage::from_user(exec_id.clone(), "low priority"));
store.push(&exec_id, guidance);
let control = InboxMessage::Control(ControlMessage::new(
exec_id.clone(),
ControlAction::Cancel,
"admin",
));
let control_id = control.id().to_string();
store.push(&exec_id, control);
let messages = store.drain_messages(&exec_id);
assert_eq!(messages.len(), 2);
assert!(messages[0].is_control());
assert_eq!(messages[0].id(), control_id);
assert!(matches!(messages[1], InboxMessage::Guidance(_)));
}
#[test]
fn test_inbox_scoped_to_execution() {
let store = InMemoryInboxStore::new();
let exec_id_1 = test_execution_id();
let exec_id_2 = test_execution_id();
let guidance = InboxMessage::Guidance(GuidanceMessage::from_user(
exec_id_1.clone(),
"message for exec1",
));
let msg_id = guidance.id().to_string();
store.push(&exec_id_1, guidance);
let messages_2 = store.drain_messages(&exec_id_2);
assert!(messages_2.is_empty(), "exec_id_2 should have no messages");
let messages_1 = store.drain_messages(&exec_id_1);
assert_eq!(messages_1.len(), 1);
assert_eq!(messages_1[0].id(), msg_id);
}
#[test]
fn test_pause_resume_execution() {
let store = InMemoryInboxStore::new();
let exec_id = test_execution_id();
let pause = InboxMessage::Control(ControlMessage::new(
exec_id.clone(),
ControlAction::Pause,
"admin",
));
store.push(&exec_id, pause);
assert!(
store.has_control_messages(&exec_id),
"should have control messages after push"
);
let messages = store.drain_messages(&exec_id);
assert_eq!(messages.len(), 1);
if let InboxMessage::Control(ctrl) = &messages[0] {
assert_eq!(ctrl.action, ControlAction::Pause);
} else {
panic!("expected Control message");
}
let resume = InboxMessage::Control(ControlMessage::new(
exec_id.clone(),
ControlAction::Resume,
"admin",
));
store.push(&exec_id, resume);
let messages = store.drain_messages(&exec_id);
assert_eq!(messages.len(), 1);
if let InboxMessage::Control(ctrl) = &messages[0] {
assert_eq!(ctrl.action, ControlAction::Resume);
} else {
panic!("expected Control message");
}
}
#[test]
fn test_cancel_long_running() {
let store = InMemoryInboxStore::new();
let exec_id = test_execution_id();
let cancel_reason = "Execution timed out after 30 minutes";
let cancel = InboxMessage::Control(
ControlMessage::new(exec_id.clone(), ControlAction::Cancel, "system")
.with_reason(cancel_reason),
);
store.push(&exec_id, cancel);
let messages = store.drain_messages(&exec_id);
assert_eq!(messages.len(), 1);
assert!(messages[0].is_control(), "should be a Control message");
if let InboxMessage::Control(ctrl) = &messages[0] {
assert_eq!(ctrl.action, ControlAction::Cancel);
assert_eq!(
ctrl.reason.as_deref(),
Some(cancel_reason),
"reason should be preserved"
);
assert_eq!(ctrl.actor, "system");
} else {
panic!("expected Control message");
}
}
#[test]
fn test_approval_flow_hitl() {
use crate::inbox::message::GuidancePriority;
let store = InMemoryInboxStore::new();
let exec_id = test_execution_id();
let approval_msg = GuidanceMessage::from_user(
exec_id.clone(),
"PLAN_APPROVED: User approved the proposed plan. Proceed with execution.",
)
.with_priority(GuidancePriority::High);
let approval_id = approval_msg.id.clone();
store.push(&exec_id, InboxMessage::Guidance(approval_msg));
assert_eq!(store.len(&exec_id), 1);
assert!(
!store.has_control_messages(&exec_id),
"guidance is not a control message"
);
let messages = store.drain_messages(&exec_id);
assert_eq!(messages.len(), 1);
if let InboxMessage::Guidance(g) = &messages[0] {
assert_eq!(g.id, approval_id);
assert!(g.content.contains("PLAN_APPROVED"));
assert_eq!(g.priority, GuidancePriority::High);
assert_eq!(g.from, crate::inbox::message::GuidanceSource::User);
} else {
panic!("expected Guidance message for approval");
}
let rejection_msg = GuidanceMessage::from_user(
exec_id.clone(),
"PLAN_REJECTED: User rejected the plan. Reason: Need more details on approach.",
)
.with_priority(GuidancePriority::High);
let rejection_id = rejection_msg.id.clone();
store.push(&exec_id, InboxMessage::Guidance(rejection_msg));
let messages = store.drain_messages(&exec_id);
assert_eq!(messages.len(), 1);
if let InboxMessage::Guidance(g) = &messages[0] {
assert_eq!(g.id, rejection_id);
assert!(g.content.contains("PLAN_REJECTED"));
assert_eq!(g.priority, GuidancePriority::High);
} else {
panic!("expected Guidance message for rejection");
}
}
#[test]
fn test_hitl_priority_ordering() {
use crate::inbox::message::GuidancePriority;
let store = InMemoryInboxStore::new();
let exec_id = test_execution_id();
let normal_guidance = InboxMessage::Guidance(GuidanceMessage::from_user(
exec_id.clone(),
"normal guidance",
));
let hitl_approval = InboxMessage::Guidance(
GuidanceMessage::from_user(exec_id.clone(), "PLAN_APPROVED")
.with_priority(GuidancePriority::High),
);
let control = InboxMessage::Control(ControlMessage::new(
exec_id.clone(),
ControlAction::Pause,
"admin",
));
store.push(&exec_id, normal_guidance);
store.push(&exec_id, hitl_approval);
store.push(&exec_id, control);
let messages = store.drain_messages(&exec_id);
assert_eq!(messages.len(), 3);
assert!(messages[0].is_control(), "control message should be first");
if let InboxMessage::Guidance(g) = &messages[1] {
assert!(
g.content.contains("PLAN_APPROVED"),
"high priority HITL should be second"
);
assert_eq!(g.priority, GuidancePriority::High);
} else {
panic!("expected high priority guidance second");
}
if let InboxMessage::Guidance(g) = &messages[2] {
assert!(
g.content.contains("normal guidance"),
"normal guidance should be last"
);
assert_eq!(g.priority, GuidancePriority::Medium);
} else {
panic!("expected normal guidance last");
}
}
}