use super::message::InboxMessage;
use crate::kernel::ExecutionId;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
/// Storage abstraction for per-execution message inboxes.
///
/// Every operation addresses exactly one inbox, selected by `execution_id`.
/// Implementations must be `Send + Sync`. The ordering notes below describe
/// what the in-memory implementation in this file does: messages are ranked by
/// ascending `InboxMessage::priority_order()`.
pub trait InboxStore: Send + Sync {
    /// Adds `message` to the inbox belonging to `execution_id`.
    fn push(&self, execution_id: &ExecutionId, message: InboxMessage);

    /// Returns how many messages are currently stored for `execution_id`.
    fn len(&self, execution_id: &ExecutionId) -> usize;

    /// Returns `true` when [`len`](InboxStore::len) reports zero messages for
    /// `execution_id`.
    fn is_empty(&self, execution_id: &ExecutionId) -> bool {
        let pending = self.len(execution_id);
        pending == 0
    }

    /// Reports whether any message stored for `execution_id` is a control
    /// message.
    fn has_control_messages(&self, execution_id: &ExecutionId) -> bool;

    /// Removes and returns every message stored for `execution_id`.
    ///
    /// The in-memory implementation returns them sorted by ascending
    /// `priority_order()`.
    fn drain_messages(&self, execution_id: &ExecutionId) -> Vec<InboxMessage>;

    /// Returns a copy of the next message for `execution_id` without removing
    /// it, or `None` when there is nothing stored.
    fn peek(&self, execution_id: &ExecutionId) -> Option<InboxMessage>;

    /// Removes and returns the next message for `execution_id`, or `None` when
    /// there is nothing stored.
    fn pop(&self, execution_id: &ExecutionId) -> Option<InboxMessage>;

    /// Discards every message stored for `execution_id`.
    fn clear(&self, execution_id: &ExecutionId);
}
/// [`InboxStore`] implementation that keeps every inbox in process memory,
/// inside a `HashMap` guarded by an `RwLock`.
#[derive(Default)]
pub struct InMemoryInboxStore {
    /// Pending messages per execution, keyed by `ExecutionId::to_string()`.
    /// Each `Vec` holds the messages in the order they were pushed.
    messages: RwLock<HashMap<String, Vec<InboxMessage>>>,
}
impl InMemoryInboxStore {
    /// Creates a store that holds no inboxes yet.
    pub fn new() -> Self {
        let messages = RwLock::new(HashMap::new());
        Self { messages }
    }

    /// Creates an empty store already wrapped in an [`Arc`], so it can be
    /// handed to several owners.
    pub fn shared() -> Arc<Self> {
        let store = Self::new();
        Arc::new(store)
    }
}
/// In-memory inbox operations.
///
/// Messages are ranked by ascending `priority_order()`. Messages with equal
/// rank keep their push order, because `sort_by_key` is stable and
/// `min_by_key` returns the first minimum.
///
/// # Panics
///
/// Every method panics with `"lock poisoned"` if the internal `RwLock` was
/// poisoned by a panic in another thread.
impl InboxStore for InMemoryInboxStore {
    /// Appends `message` to the inbox of `execution_id`, creating the inbox on
    /// first use.
    fn push(&self, execution_id: &ExecutionId, message: InboxMessage) {
        let mut inboxes = self.messages.write().expect("lock poisoned");
        inboxes
            .entry(execution_id.to_string())
            .or_default()
            .push(message);
    }

    /// Returns the number of messages stored for `execution_id`; `0` when no
    /// inbox exists.
    fn len(&self, execution_id: &ExecutionId) -> usize {
        let inboxes = self.messages.read().expect("lock poisoned");
        inboxes
            .get(&execution_id.to_string())
            .map_or(0, Vec::len)
    }

    /// Returns `true` when at least one stored message for `execution_id`
    /// reports `is_control()`.
    fn has_control_messages(&self, execution_id: &ExecutionId) -> bool {
        let inboxes = self.messages.read().expect("lock poisoned");
        inboxes
            .get(&execution_id.to_string())
            .map(|queue| queue.iter().any(|m| m.is_control()))
            .unwrap_or(false)
    }

    /// Removes the whole inbox of `execution_id` and returns its messages
    /// sorted by ascending `priority_order()`.
    ///
    /// Returns an empty `Vec` when no inbox exists.
    fn drain_messages(&self, execution_id: &ExecutionId) -> Vec<InboxMessage> {
        // The write guard is a temporary of this statement, so the lock is
        // released before the sort runs.
        let mut drained = self
            .messages
            .write()
            .expect("lock poisoned")
            .remove(&execution_id.to_string())
            .unwrap_or_default();
        drained.sort_by_key(|m| m.priority_order());
        drained
    }

    /// Returns a clone of the message `pop` would return next, leaving the
    /// inbox untouched. `None` when the inbox is missing or empty.
    fn peek(&self, execution_id: &ExecutionId) -> Option<InboxMessage> {
        let inboxes = self.messages.read().expect("lock poisoned");
        inboxes
            .get(&execution_id.to_string())?
            .iter()
            .min_by_key(|m| m.priority_order())
            .cloned()
    }

    /// Removes and returns the message with the smallest `priority_order()`.
    /// `None` when the inbox is missing or empty.
    ///
    /// Once the last message has been taken, the inbox entry itself is dropped
    /// from the map so that finished executions do not leave empty `Vec`s
    /// behind.
    fn pop(&self, execution_id: &ExecutionId) -> Option<InboxMessage> {
        let key = execution_id.to_string();
        let mut inboxes = self.messages.write().expect("lock poisoned");
        let queue = inboxes.get_mut(&key)?;
        let next_idx = queue
            .iter()
            .enumerate()
            .min_by_key(|(_, m)| m.priority_order())
            .map(|(idx, _)| idx);
        // `Vec::remove` (not `swap_remove`) keeps the remaining messages in
        // push order, which the tie-breaking rule depends on.
        let popped = next_idx.map(|idx| queue.remove(idx));
        if queue.is_empty() {
            inboxes.remove(&key);
        }
        popped
    }

    /// Drops the inbox of `execution_id` together with all of its messages.
    fn clear(&self, execution_id: &ExecutionId) {
        let mut inboxes = self.messages.write().expect("lock poisoned");
        inboxes.remove(&execution_id.to_string());
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::inbox::message::{
        ControlAction, ControlMessage, EvidenceImpact, EvidenceSource, EvidenceUpdate,
        GuidanceMessage,
    };

    /// Produces the execution id a test uses to address its inbox.
    fn test_execution_id() -> ExecutionId {
        ExecutionId::new()
    }

    /// Builds a user guidance message addressed to `exec_id`.
    fn guidance_for(exec_id: &ExecutionId, text: &'static str) -> InboxMessage {
        InboxMessage::Guidance(GuidanceMessage::from_user(exec_id.clone(), text))
    }

    /// Builds a control message for `exec_id` issued by "admin".
    fn control_for(exec_id: &ExecutionId, action: ControlAction) -> InboxMessage {
        InboxMessage::Control(ControlMessage::new(exec_id.clone(), action, "admin"))
    }

    /// A pushed message is counted, returned by drain, and gone afterwards.
    #[test]
    fn test_push_and_drain() {
        let store = InMemoryInboxStore::new();
        let exec_id = test_execution_id();

        store.push(&exec_id, guidance_for(&exec_id, "Focus on EU"));
        assert_eq!(store.len(&exec_id), 1);
        assert!(!store.is_empty(&exec_id));

        let drained = store.drain_messages(&exec_id);
        assert_eq!(drained.len(), 1);
        assert!(store.is_empty(&exec_id));
    }

    /// Drain returns control first, then evidence, then guidance, regardless
    /// of push order.
    #[test]
    fn test_priority_sorting_inv_inbox_002() {
        let store = InMemoryInboxStore::new();
        let exec_id = test_execution_id();

        let evidence = InboxMessage::Evidence(EvidenceUpdate::new(
            exec_id.clone(),
            EvidenceSource::Discovery,
            "Found something",
            serde_json::json!({}),
            EvidenceImpact::Informational,
        ));

        // Pushed in the reverse of the expected output order.
        store.push(&exec_id, guidance_for(&exec_id, "low priority"));
        store.push(&exec_id, evidence);
        store.push(&exec_id, control_for(&exec_id, ControlAction::Pause));

        let drained = store.drain_messages(&exec_id);
        assert_eq!(drained.len(), 3);
        assert!(drained[0].is_control());
        assert!(matches!(drained[1], InboxMessage::Evidence(_)));
        assert!(matches!(drained[2], InboxMessage::Guidance(_)));
    }

    /// Only an actual control message flips `has_control_messages`.
    #[test]
    fn test_has_control_messages() {
        let store = InMemoryInboxStore::new();
        let exec_id = test_execution_id();
        assert!(!store.has_control_messages(&exec_id));

        store.push(&exec_id, guidance_for(&exec_id, "test"));
        assert!(!store.has_control_messages(&exec_id));

        store.push(&exec_id, control_for(&exec_id, ControlAction::Cancel));
        assert!(store.has_control_messages(&exec_id));
    }

    /// Pop hands out control before guidance and yields `None` once drained.
    #[test]
    fn test_pop_returns_highest_priority() {
        let store = InMemoryInboxStore::new();
        let exec_id = test_execution_id();

        store.push(&exec_id, guidance_for(&exec_id, "low"));
        store.push(&exec_id, control_for(&exec_id, ControlAction::Pause));

        let first = store.pop(&exec_id).unwrap();
        assert!(first.is_control());

        let second = store.pop(&exec_id).unwrap();
        assert!(matches!(second, InboxMessage::Guidance(_)));

        assert!(store.pop(&exec_id).is_none());
    }

    /// Peeking twice leaves the message in the inbox.
    #[test]
    fn test_peek_does_not_remove() {
        let store = InMemoryInboxStore::new();
        let exec_id = test_execution_id();

        store.push(&exec_id, control_for(&exec_id, ControlAction::Pause));

        let first_peek = store.peek(&exec_id);
        let second_peek = store.peek(&exec_id);
        assert!(first_peek.is_some());
        assert!(second_peek.is_some());
        assert_eq!(store.len(&exec_id), 1);
    }

    /// Draining one execution's inbox leaves another execution's inbox intact.
    #[test]
    fn test_execution_isolation_inv_inbox_004() {
        let store = InMemoryInboxStore::new();
        let exec_id_1 = test_execution_id();
        let exec_id_2 = test_execution_id();

        store.push(&exec_id_1, control_for(&exec_id_1, ControlAction::Pause));
        store.push(&exec_id_2, control_for(&exec_id_2, ControlAction::Cancel));
        assert_eq!(store.len(&exec_id_1), 1);
        assert_eq!(store.len(&exec_id_2), 1);

        let drained = store.drain_messages(&exec_id_1);
        assert_eq!(drained.len(), 1);
        assert_eq!(store.len(&exec_id_1), 0);
        assert_eq!(store.len(&exec_id_2), 1);
    }

    /// Clear discards every stored message for the execution.
    #[test]
    fn test_clear() {
        let store = InMemoryInboxStore::new();
        let exec_id = test_execution_id();

        for _ in 0..5 {
            store.push(&exec_id, control_for(&exec_id, ControlAction::Pause));
        }
        assert_eq!(store.len(&exec_id), 5);

        store.clear(&exec_id);
        assert_eq!(store.len(&exec_id), 0);
        assert!(store.is_empty(&exec_id));
    }
}