1use super::message::InboxMessage;
13use crate::kernel::ExecutionId;
14use std::collections::HashMap;
15use std::sync::{Arc, RwLock};
16
17pub trait InboxStore: Send + Sync {
21 fn push(&self, execution_id: &ExecutionId, message: InboxMessage);
27
28 fn len(&self, execution_id: &ExecutionId) -> usize;
30
31 fn is_empty(&self, execution_id: &ExecutionId) -> bool {
33 self.len(execution_id) == 0
34 }
35
36 fn has_control_messages(&self, execution_id: &ExecutionId) -> bool;
40
41 fn drain_messages(&self, execution_id: &ExecutionId) -> Vec<InboxMessage>;
52
53 fn peek(&self, execution_id: &ExecutionId) -> Option<InboxMessage>;
55
56 fn pop(&self, execution_id: &ExecutionId) -> Option<InboxMessage>;
58
59 fn clear(&self, execution_id: &ExecutionId);
61}
62
63#[derive(Default)]
68pub struct InMemoryInboxStore {
69 messages: RwLock<HashMap<String, Vec<InboxMessage>>>,
71}
72
73impl InMemoryInboxStore {
74 pub fn new() -> Self {
76 Self {
77 messages: RwLock::new(HashMap::new()),
78 }
79 }
80
81 pub fn shared() -> Arc<Self> {
83 Arc::new(Self::new())
84 }
85}
86
87impl InboxStore for InMemoryInboxStore {
88 fn push(&self, execution_id: &ExecutionId, message: InboxMessage) {
89 let mut guard = self.messages.write().expect("lock poisoned");
90 guard
91 .entry(execution_id.to_string())
92 .or_default()
93 .push(message);
94 }
95
96 fn len(&self, execution_id: &ExecutionId) -> usize {
97 let guard = self.messages.read().expect("lock poisoned");
98 guard
99 .get(&execution_id.to_string())
100 .map(|v| v.len())
101 .unwrap_or(0)
102 }
103
104 fn has_control_messages(&self, execution_id: &ExecutionId) -> bool {
105 let guard = self.messages.read().expect("lock poisoned");
106 guard
107 .get(&execution_id.to_string())
108 .map(|v| v.iter().any(|m| m.is_control()))
109 .unwrap_or(false)
110 }
111
112 fn drain_messages(&self, execution_id: &ExecutionId) -> Vec<InboxMessage> {
113 let mut guard = self.messages.write().expect("lock poisoned");
114 let mut messages = guard.remove(&execution_id.to_string()).unwrap_or_default();
115
116 messages.sort_by_key(|m| m.priority_order());
118
119 messages
120 }
121
122 fn peek(&self, execution_id: &ExecutionId) -> Option<InboxMessage> {
123 let guard = self.messages.read().expect("lock poisoned");
124 guard.get(&execution_id.to_string()).and_then(|v| {
125 v.iter().min_by_key(|m| m.priority_order()).cloned()
127 })
128 }
129
130 fn pop(&self, execution_id: &ExecutionId) -> Option<InboxMessage> {
131 let mut guard = self.messages.write().expect("lock poisoned");
132 let messages = guard.get_mut(&execution_id.to_string())?;
133
134 if messages.is_empty() {
135 return None;
136 }
137
138 let min_idx = messages
140 .iter()
141 .enumerate()
142 .min_by_key(|(_, m)| m.priority_order())
143 .map(|(i, _)| i)?;
144
145 Some(messages.remove(min_idx))
146 }
147
148 fn clear(&self, execution_id: &ExecutionId) {
149 let mut guard = self.messages.write().expect("lock poisoned");
150 guard.remove(&execution_id.to_string());
151 }
152}
153
154#[cfg(test)]
155mod tests {
156 use super::*;
157 use crate::inbox::message::{
158 ControlAction, ControlMessage, EvidenceImpact, EvidenceSource, EvidenceUpdate,
159 GuidanceMessage,
160 };
161
162 fn test_execution_id() -> ExecutionId {
163 ExecutionId::new()
164 }
165
166 #[test]
167 fn test_push_and_drain() {
168 let store = InMemoryInboxStore::new();
169 let exec_id = test_execution_id();
170
171 let guidance =
173 InboxMessage::Guidance(GuidanceMessage::from_user(exec_id.clone(), "Focus on EU"));
174 store.push(&exec_id, guidance);
175
176 assert_eq!(store.len(&exec_id), 1);
177 assert!(!store.is_empty(&exec_id));
178
179 let messages = store.drain_messages(&exec_id);
181 assert_eq!(messages.len(), 1);
182 assert!(store.is_empty(&exec_id));
183 }
184
185 #[test]
186 fn test_priority_sorting_inv_inbox_002() {
187 let store = InMemoryInboxStore::new();
188 let exec_id = test_execution_id();
189
190 let guidance =
192 InboxMessage::Guidance(GuidanceMessage::from_user(exec_id.clone(), "low priority"));
193 let evidence = InboxMessage::Evidence(EvidenceUpdate::new(
194 exec_id.clone(),
195 EvidenceSource::Discovery,
196 "Found something",
197 serde_json::json!({}),
198 EvidenceImpact::Informational,
199 ));
200 let control = InboxMessage::Control(ControlMessage::new(
201 exec_id.clone(),
202 ControlAction::Pause,
203 "admin",
204 ));
205
206 store.push(&exec_id, guidance);
207 store.push(&exec_id, evidence);
208 store.push(&exec_id, control);
209
210 let messages = store.drain_messages(&exec_id);
212 assert_eq!(messages.len(), 3);
213
214 assert!(messages[0].is_control());
216 assert!(matches!(messages[1], InboxMessage::Evidence(_)));
217 assert!(matches!(messages[2], InboxMessage::Guidance(_)));
218 }
219
220 #[test]
221 fn test_has_control_messages() {
222 let store = InMemoryInboxStore::new();
223 let exec_id = test_execution_id();
224
225 assert!(!store.has_control_messages(&exec_id));
227
228 let guidance = InboxMessage::Guidance(GuidanceMessage::from_user(exec_id.clone(), "test"));
230 store.push(&exec_id, guidance);
231 assert!(!store.has_control_messages(&exec_id));
232
233 let control = InboxMessage::Control(ControlMessage::new(
235 exec_id.clone(),
236 ControlAction::Cancel,
237 "admin",
238 ));
239 store.push(&exec_id, control);
240 assert!(store.has_control_messages(&exec_id));
241 }
242
243 #[test]
244 fn test_pop_returns_highest_priority() {
245 let store = InMemoryInboxStore::new();
246 let exec_id = test_execution_id();
247
248 let guidance = InboxMessage::Guidance(GuidanceMessage::from_user(exec_id.clone(), "low"));
250 let control = InboxMessage::Control(ControlMessage::new(
251 exec_id.clone(),
252 ControlAction::Pause,
253 "admin",
254 ));
255
256 store.push(&exec_id, guidance);
257 store.push(&exec_id, control);
258
259 let msg = store.pop(&exec_id).unwrap();
261 assert!(msg.is_control());
262
263 let msg = store.pop(&exec_id).unwrap();
265 assert!(matches!(msg, InboxMessage::Guidance(_)));
266
267 assert!(store.pop(&exec_id).is_none());
269 }
270
271 #[test]
272 fn test_peek_does_not_remove() {
273 let store = InMemoryInboxStore::new();
274 let exec_id = test_execution_id();
275
276 let control = InboxMessage::Control(ControlMessage::new(
277 exec_id.clone(),
278 ControlAction::Pause,
279 "admin",
280 ));
281 store.push(&exec_id, control);
282
283 let msg1 = store.peek(&exec_id);
285 let msg2 = store.peek(&exec_id);
286
287 assert!(msg1.is_some());
288 assert!(msg2.is_some());
289 assert_eq!(store.len(&exec_id), 1);
290 }
291
292 #[test]
293 fn test_execution_isolation_inv_inbox_004() {
294 let store = InMemoryInboxStore::new();
295 let exec_id_1 = test_execution_id();
296 let exec_id_2 = test_execution_id();
297
298 let control_1 = InboxMessage::Control(ControlMessage::new(
300 exec_id_1.clone(),
301 ControlAction::Pause,
302 "admin",
303 ));
304 let control_2 = InboxMessage::Control(ControlMessage::new(
305 exec_id_2.clone(),
306 ControlAction::Cancel,
307 "admin",
308 ));
309
310 store.push(&exec_id_1, control_1);
311 store.push(&exec_id_2, control_2);
312
313 assert_eq!(store.len(&exec_id_1), 1);
315 assert_eq!(store.len(&exec_id_2), 1);
316
317 let msgs = store.drain_messages(&exec_id_1);
319 assert_eq!(msgs.len(), 1);
320 assert_eq!(store.len(&exec_id_1), 0);
321 assert_eq!(store.len(&exec_id_2), 1);
322 }
323
324 #[test]
325 fn test_clear() {
326 let store = InMemoryInboxStore::new();
327 let exec_id = test_execution_id();
328
329 for _ in 0..5 {
330 let control = InboxMessage::Control(ControlMessage::new(
331 exec_id.clone(),
332 ControlAction::Pause,
333 "admin",
334 ));
335 store.push(&exec_id, control);
336 }
337
338 assert_eq!(store.len(&exec_id), 5);
339
340 store.clear(&exec_id);
341
342 assert_eq!(store.len(&exec_id), 0);
343 assert!(store.is_empty(&exec_id));
344 }
345
346 #[test]
347 fn test_inbox_drain_ordering() {
348 let store = InMemoryInboxStore::new();
350 let exec_id = test_execution_id();
351
352 let msg1 = InboxMessage::Guidance(GuidanceMessage::from_user(exec_id.clone(), "first"));
355 let msg2 = InboxMessage::Guidance(GuidanceMessage::from_user(exec_id.clone(), "second"));
356 let msg3 = InboxMessage::Guidance(GuidanceMessage::from_user(exec_id.clone(), "third"));
357
358 let id1 = msg1.id().to_string();
360 let id2 = msg2.id().to_string();
361 let id3 = msg3.id().to_string();
362
363 store.push(&exec_id, msg1);
364 store.push(&exec_id, msg2);
365 store.push(&exec_id, msg3);
366
367 let messages = store.drain_messages(&exec_id);
369 assert_eq!(messages.len(), 3);
370
371 assert_eq!(messages[0].id(), id1);
373 assert_eq!(messages[1].id(), id2);
374 assert_eq!(messages[2].id(), id3);
375 }
376
377 #[test]
378 fn test_control_priority() {
379 let store = InMemoryInboxStore::new();
381 let exec_id = test_execution_id();
382
383 let guidance =
385 InboxMessage::Guidance(GuidanceMessage::from_user(exec_id.clone(), "low priority"));
386 store.push(&exec_id, guidance);
387
388 let control = InboxMessage::Control(ControlMessage::new(
390 exec_id.clone(),
391 ControlAction::Cancel,
392 "admin",
393 ));
394 let control_id = control.id().to_string();
395 store.push(&exec_id, control);
396
397 let messages = store.drain_messages(&exec_id);
399 assert_eq!(messages.len(), 2);
400 assert!(messages[0].is_control());
401 assert_eq!(messages[0].id(), control_id);
402 assert!(matches!(messages[1], InboxMessage::Guidance(_)));
403 }
404
405 #[test]
406 fn test_inbox_scoped_to_execution() {
407 let store = InMemoryInboxStore::new();
409 let exec_id_1 = test_execution_id();
410 let exec_id_2 = test_execution_id();
411
412 let guidance = InboxMessage::Guidance(GuidanceMessage::from_user(
414 exec_id_1.clone(),
415 "message for exec1",
416 ));
417 let msg_id = guidance.id().to_string();
418 store.push(&exec_id_1, guidance);
419
420 let messages_2 = store.drain_messages(&exec_id_2);
422 assert!(messages_2.is_empty(), "exec_id_2 should have no messages");
423
424 let messages_1 = store.drain_messages(&exec_id_1);
426 assert_eq!(messages_1.len(), 1);
427 assert_eq!(messages_1[0].id(), msg_id);
428 }
429
430 #[test]
431 fn test_pause_resume_execution() {
432 let store = InMemoryInboxStore::new();
434 let exec_id = test_execution_id();
435
436 let pause = InboxMessage::Control(ControlMessage::new(
438 exec_id.clone(),
439 ControlAction::Pause,
440 "admin",
441 ));
442 store.push(&exec_id, pause);
443
444 assert!(
446 store.has_control_messages(&exec_id),
447 "should have control messages after push"
448 );
449
450 let messages = store.drain_messages(&exec_id);
452 assert_eq!(messages.len(), 1);
453 if let InboxMessage::Control(ctrl) = &messages[0] {
454 assert_eq!(ctrl.action, ControlAction::Pause);
455 } else {
456 panic!("expected Control message");
457 }
458
459 let resume = InboxMessage::Control(ControlMessage::new(
461 exec_id.clone(),
462 ControlAction::Resume,
463 "admin",
464 ));
465 store.push(&exec_id, resume);
466
467 let messages = store.drain_messages(&exec_id);
469 assert_eq!(messages.len(), 1);
470 if let InboxMessage::Control(ctrl) = &messages[0] {
471 assert_eq!(ctrl.action, ControlAction::Resume);
472 } else {
473 panic!("expected Control message");
474 }
475 }
476
477 #[test]
478 fn test_cancel_long_running() {
479 let store = InMemoryInboxStore::new();
481 let exec_id = test_execution_id();
482
483 let cancel_reason = "Execution timed out after 30 minutes";
485 let cancel = InboxMessage::Control(
486 ControlMessage::new(exec_id.clone(), ControlAction::Cancel, "system")
487 .with_reason(cancel_reason),
488 );
489 store.push(&exec_id, cancel);
490
491 let messages = store.drain_messages(&exec_id);
493 assert_eq!(messages.len(), 1);
494
495 assert!(messages[0].is_control(), "should be a Control message");
497
498 if let InboxMessage::Control(ctrl) = &messages[0] {
500 assert_eq!(ctrl.action, ControlAction::Cancel);
501 assert_eq!(
502 ctrl.reason.as_deref(),
503 Some(cancel_reason),
504 "reason should be preserved"
505 );
506 assert_eq!(ctrl.actor, "system");
507 } else {
508 panic!("expected Control message");
509 }
510 }
511
512 #[test]
513 fn test_approval_flow_hitl() {
514 use crate::inbox::message::GuidancePriority;
518
519 let store = InMemoryInboxStore::new();
520 let exec_id = test_execution_id();
521
522 let approval_msg = GuidanceMessage::from_user(
524 exec_id.clone(),
525 "PLAN_APPROVED: User approved the proposed plan. Proceed with execution.",
526 )
527 .with_priority(GuidancePriority::High);
528 let approval_id = approval_msg.id.clone();
529 store.push(&exec_id, InboxMessage::Guidance(approval_msg));
530
531 assert_eq!(store.len(&exec_id), 1);
533 assert!(
534 !store.has_control_messages(&exec_id),
535 "guidance is not a control message"
536 );
537
538 let messages = store.drain_messages(&exec_id);
540 assert_eq!(messages.len(), 1);
541
542 if let InboxMessage::Guidance(g) = &messages[0] {
543 assert_eq!(g.id, approval_id);
544 assert!(g.content.contains("PLAN_APPROVED"));
545 assert_eq!(g.priority, GuidancePriority::High);
546 assert_eq!(g.from, crate::inbox::message::GuidanceSource::User);
547 } else {
548 panic!("expected Guidance message for approval");
549 }
550
551 let rejection_msg = GuidanceMessage::from_user(
553 exec_id.clone(),
554 "PLAN_REJECTED: User rejected the plan. Reason: Need more details on approach.",
555 )
556 .with_priority(GuidancePriority::High);
557 let rejection_id = rejection_msg.id.clone();
558 store.push(&exec_id, InboxMessage::Guidance(rejection_msg));
559
560 let messages = store.drain_messages(&exec_id);
561 assert_eq!(messages.len(), 1);
562
563 if let InboxMessage::Guidance(g) = &messages[0] {
564 assert_eq!(g.id, rejection_id);
565 assert!(g.content.contains("PLAN_REJECTED"));
566 assert_eq!(g.priority, GuidancePriority::High);
567 } else {
568 panic!("expected Guidance message for rejection");
569 }
570 }
571
572 #[test]
573 fn test_hitl_priority_ordering() {
574 use crate::inbox::message::GuidancePriority;
577
578 let store = InMemoryInboxStore::new();
579 let exec_id = test_execution_id();
580
581 let normal_guidance = InboxMessage::Guidance(GuidanceMessage::from_user(
584 exec_id.clone(),
585 "normal guidance",
586 ));
587
588 let hitl_approval = InboxMessage::Guidance(
590 GuidanceMessage::from_user(exec_id.clone(), "PLAN_APPROVED")
591 .with_priority(GuidancePriority::High),
592 );
593
594 let control = InboxMessage::Control(ControlMessage::new(
596 exec_id.clone(),
597 ControlAction::Pause,
598 "admin",
599 ));
600
601 store.push(&exec_id, normal_guidance);
603 store.push(&exec_id, hitl_approval);
604 store.push(&exec_id, control);
605
606 let messages = store.drain_messages(&exec_id);
608 assert_eq!(messages.len(), 3);
609
610 assert!(messages[0].is_control(), "control message should be first");
612
613 if let InboxMessage::Guidance(g) = &messages[1] {
615 assert!(
616 g.content.contains("PLAN_APPROVED"),
617 "high priority HITL should be second"
618 );
619 assert_eq!(g.priority, GuidancePriority::High);
620 } else {
621 panic!("expected high priority guidance second");
622 }
623
624 if let InboxMessage::Guidance(g) = &messages[2] {
626 assert!(
627 g.content.contains("normal guidance"),
628 "normal guidance should be last"
629 );
630 assert_eq!(g.priority, GuidancePriority::Medium);
631 } else {
632 panic!("expected normal guidance last");
633 }
634 }
635}