Skip to main content

enact_core/inbox/
store.rs

1//! Inbox Store - Storage for inbox messages
2//!
3//! ## Invariants
4//!
5//! - **INV-INBOX-002**: Control messages (pause/cancel) MUST be processed first
6//!   - Implemented via `priority_order()` sorting in `drain_messages()`
7//! - **INV-INBOX-004**: Messages are scoped to a specific ExecutionId
8//!   - All operations require ExecutionId parameter
9//!
10//! @see docs/TECHNICAL/31-MID-EXECUTION-GUIDANCE.md
11
12use super::message::InboxMessage;
13use crate::kernel::ExecutionId;
14use std::collections::HashMap;
15use std::sync::{Arc, RwLock};
16
17/// InboxStore trait - async storage for inbox messages
18///
19/// Implementations must ensure thread-safety for concurrent access.
20pub trait InboxStore: Send + Sync {
21    /// Push a message to the inbox for a specific execution
22    ///
23    /// ## Arguments
24    /// * `execution_id` - Target execution (INV-INBOX-004)
25    /// * `message` - The message to push
26    fn push(&self, execution_id: &ExecutionId, message: InboxMessage);
27
28    /// Get the number of pending messages for an execution
29    fn len(&self, execution_id: &ExecutionId) -> usize;
30
31    /// Check if there are any pending messages
32    fn is_empty(&self, execution_id: &ExecutionId) -> bool {
33        self.len(execution_id) == 0
34    }
35
36    /// Check if there are any control messages (highest priority)
37    ///
38    /// Used for fast-path cancellation/pause checks without draining.
39    fn has_control_messages(&self, execution_id: &ExecutionId) -> bool;
40
41    /// Drain all messages for an execution, sorted by priority
42    ///
43    /// ## Invariant INV-INBOX-002
44    /// Messages are returned sorted by priority_order():
45    /// 1. Control (pause/resume/cancel) - highest
46    /// 2. Evidence (contradicts_plan)
47    /// 3. Evidence (other)
48    /// 4. Guidance (high priority)
49    /// 5. Guidance (other)
50    /// 6. A2A - lowest
51    fn drain_messages(&self, execution_id: &ExecutionId) -> Vec<InboxMessage>;
52
53    /// Peek at the next message without removing it
54    fn peek(&self, execution_id: &ExecutionId) -> Option<InboxMessage>;
55
56    /// Pop the highest-priority message
57    fn pop(&self, execution_id: &ExecutionId) -> Option<InboxMessage>;
58
59    /// Clear all messages for an execution
60    fn clear(&self, execution_id: &ExecutionId);
61}
62
63/// In-memory inbox store implementation
64///
65/// Thread-safe storage using RwLock. Suitable for single-node deployments.
66/// For distributed deployments, use Redis-backed implementation.
67#[derive(Default)]
68pub struct InMemoryInboxStore {
69    /// Messages keyed by ExecutionId
70    messages: RwLock<HashMap<String, Vec<InboxMessage>>>,
71}
72
73impl InMemoryInboxStore {
74    /// Create a new empty inbox store
75    pub fn new() -> Self {
76        Self {
77            messages: RwLock::new(HashMap::new()),
78        }
79    }
80
81    /// Create an Arc-wrapped instance for sharing
82    pub fn shared() -> Arc<Self> {
83        Arc::new(Self::new())
84    }
85}
86
87impl InboxStore for InMemoryInboxStore {
88    fn push(&self, execution_id: &ExecutionId, message: InboxMessage) {
89        let mut guard = self.messages.write().expect("lock poisoned");
90        guard
91            .entry(execution_id.to_string())
92            .or_default()
93            .push(message);
94    }
95
96    fn len(&self, execution_id: &ExecutionId) -> usize {
97        let guard = self.messages.read().expect("lock poisoned");
98        guard
99            .get(&execution_id.to_string())
100            .map(|v| v.len())
101            .unwrap_or(0)
102    }
103
104    fn has_control_messages(&self, execution_id: &ExecutionId) -> bool {
105        let guard = self.messages.read().expect("lock poisoned");
106        guard
107            .get(&execution_id.to_string())
108            .map(|v| v.iter().any(|m| m.is_control()))
109            .unwrap_or(false)
110    }
111
112    fn drain_messages(&self, execution_id: &ExecutionId) -> Vec<InboxMessage> {
113        let mut guard = self.messages.write().expect("lock poisoned");
114        let mut messages = guard.remove(&execution_id.to_string()).unwrap_or_default();
115
116        // INV-INBOX-002: Sort by priority (control messages first)
117        messages.sort_by_key(|m| m.priority_order());
118
119        messages
120    }
121
122    fn peek(&self, execution_id: &ExecutionId) -> Option<InboxMessage> {
123        let guard = self.messages.read().expect("lock poisoned");
124        guard.get(&execution_id.to_string()).and_then(|v| {
125            // Return highest priority message
126            v.iter().min_by_key(|m| m.priority_order()).cloned()
127        })
128    }
129
130    fn pop(&self, execution_id: &ExecutionId) -> Option<InboxMessage> {
131        let mut guard = self.messages.write().expect("lock poisoned");
132        let messages = guard.get_mut(&execution_id.to_string())?;
133
134        if messages.is_empty() {
135            return None;
136        }
137
138        // Find index of highest priority message (INV-INBOX-002)
139        let min_idx = messages
140            .iter()
141            .enumerate()
142            .min_by_key(|(_, m)| m.priority_order())
143            .map(|(i, _)| i)?;
144
145        Some(messages.remove(min_idx))
146    }
147
148    fn clear(&self, execution_id: &ExecutionId) {
149        let mut guard = self.messages.write().expect("lock poisoned");
150        guard.remove(&execution_id.to_string());
151    }
152}
153
#[cfg(test)]
mod tests {
    use super::*;
    use crate::inbox::message::{
        ControlAction, ControlMessage, EvidenceImpact, EvidenceSource, EvidenceUpdate,
        GuidanceMessage, GuidancePriority, GuidanceSource,
    };

    /// Build the execution id that keys the inbox under test.
    fn new_exec() -> ExecutionId {
        ExecutionId::new()
    }

    /// User guidance with default priority, wrapped as an inbox message.
    fn guidance(exec_id: &ExecutionId, text: &'static str) -> InboxMessage {
        InboxMessage::Guidance(GuidanceMessage::from_user(exec_id.clone(), text))
    }

    /// Control message issued by the "admin" actor, wrapped as an inbox message.
    fn control(exec_id: &ExecutionId, action: ControlAction) -> InboxMessage {
        InboxMessage::Control(ControlMessage::new(exec_id.clone(), action, "admin"))
    }

    #[test]
    fn test_push_and_drain() {
        let store = InMemoryInboxStore::new();
        let exec_id = new_exec();

        // One pushed message is visible through len/is_empty.
        store.push(&exec_id, guidance(&exec_id, "Focus on EU"));
        assert_eq!(store.len(&exec_id), 1);
        assert!(!store.is_empty(&exec_id));

        // Draining hands the message back and empties the inbox.
        let drained = store.drain_messages(&exec_id);
        assert_eq!(drained.len(), 1);
        assert!(store.is_empty(&exec_id));
    }

    #[test]
    fn test_priority_sorting_inv_inbox_002() {
        let store = InMemoryInboxStore::new();
        let exec_id = new_exec();

        // Enqueue lowest priority first: guidance, then evidence, then control.
        store.push(&exec_id, guidance(&exec_id, "low priority"));
        store.push(
            &exec_id,
            InboxMessage::Evidence(EvidenceUpdate::new(
                exec_id.clone(),
                EvidenceSource::Discovery,
                "Found something",
                serde_json::json!({}),
                EvidenceImpact::Informational,
            )),
        );
        store.push(&exec_id, control(&exec_id, ControlAction::Pause));

        let drained = store.drain_messages(&exec_id);
        assert_eq!(drained.len(), 3);

        // INV-INBOX-002: the drain order is control, evidence, guidance.
        assert!(drained[0].is_control());
        assert!(matches!(drained[1], InboxMessage::Evidence(_)));
        assert!(matches!(drained[2], InboxMessage::Guidance(_)));
    }

    #[test]
    fn test_has_control_messages() {
        let store = InMemoryInboxStore::new();
        let exec_id = new_exec();

        // Empty inbox: nothing to report.
        assert!(!store.has_control_messages(&exec_id));

        // Guidance alone does not count as control.
        store.push(&exec_id, guidance(&exec_id, "test"));
        assert!(!store.has_control_messages(&exec_id));

        // A cancel flips the probe.
        store.push(&exec_id, control(&exec_id, ControlAction::Cancel));
        assert!(store.has_control_messages(&exec_id));
    }

    #[test]
    fn test_pop_returns_highest_priority() {
        let store = InMemoryInboxStore::new();
        let exec_id = new_exec();

        // Low priority goes in first, high priority second.
        store.push(&exec_id, guidance(&exec_id, "low"));
        store.push(&exec_id, control(&exec_id, ControlAction::Pause));

        // The control message comes out first...
        let first = store.pop(&exec_id).unwrap();
        assert!(first.is_control());

        // ...followed by the guidance...
        let second = store.pop(&exec_id).unwrap();
        assert!(matches!(second, InboxMessage::Guidance(_)));

        // ...and then the inbox is exhausted.
        assert!(store.pop(&exec_id).is_none());
    }

    #[test]
    fn test_peek_does_not_remove() {
        let store = InMemoryInboxStore::new();
        let exec_id = new_exec();

        store.push(&exec_id, control(&exec_id, ControlAction::Pause));

        // Repeated peeks keep seeing a message and never consume it.
        let first_look = store.peek(&exec_id);
        let second_look = store.peek(&exec_id);

        assert!(first_look.is_some());
        assert!(second_look.is_some());
        assert_eq!(store.len(&exec_id), 1);
    }

    #[test]
    fn test_execution_isolation_inv_inbox_004() {
        let store = InMemoryInboxStore::new();
        let exec_id_1 = new_exec();
        let exec_id_2 = new_exec();

        // One control message per execution.
        store.push(&exec_id_1, control(&exec_id_1, ControlAction::Pause));
        store.push(&exec_id_2, control(&exec_id_2, ControlAction::Cancel));

        assert_eq!(store.len(&exec_id_1), 1);
        assert_eq!(store.len(&exec_id_2), 1);

        // Draining the first inbox leaves the second one alone.
        let drained = store.drain_messages(&exec_id_1);
        assert_eq!(drained.len(), 1);
        assert_eq!(store.len(&exec_id_1), 0);
        assert_eq!(store.len(&exec_id_2), 1);
    }

    #[test]
    fn test_clear() {
        let store = InMemoryInboxStore::new();
        let exec_id = new_exec();

        for _ in 0..5 {
            store.push(&exec_id, control(&exec_id, ControlAction::Pause));
        }
        assert_eq!(store.len(&exec_id), 5);

        store.clear(&exec_id);

        assert_eq!(store.len(&exec_id), 0);
        assert!(store.is_empty(&exec_id));
    }

    #[test]
    fn test_inbox_drain_ordering() {
        // Messages of equal priority must be drained in the order they were pushed.
        let store = InMemoryInboxStore::new();
        let exec_id = new_exec();

        let first = guidance(&exec_id, "first");
        let second = guidance(&exec_id, "second");
        let third = guidance(&exec_id, "third");

        // Remember the ids before ownership moves into the store.
        let expected_ids = vec![
            first.id().to_string(),
            second.id().to_string(),
            third.id().to_string(),
        ];

        for message in vec![first, second, third] {
            store.push(&exec_id, message);
        }

        let drained = store.drain_messages(&exec_id);
        assert_eq!(drained.len(), 3);

        // The sort is stable, so push order survives within one priority class.
        let drained_ids: Vec<String> = drained.iter().map(|m| m.id().to_string()).collect();
        assert_eq!(drained_ids, expected_ids);
    }

    #[test]
    fn test_control_priority() {
        // A control message overtakes guidance that was pushed before it.
        let store = InMemoryInboxStore::new();
        let exec_id = new_exec();

        store.push(&exec_id, guidance(&exec_id, "low priority"));

        let cancel = control(&exec_id, ControlAction::Cancel);
        let control_id = cancel.id().to_string();
        store.push(&exec_id, cancel);

        let drained = store.drain_messages(&exec_id);
        assert_eq!(drained.len(), 2);
        assert!(drained[0].is_control());
        assert_eq!(drained[0].id().to_string(), control_id);
        assert!(matches!(drained[1], InboxMessage::Guidance(_)));
    }

    #[test]
    fn test_inbox_scoped_to_execution() {
        // A message pushed for one execution must not show up for another (INV-INBOX-004).
        let store = InMemoryInboxStore::new();
        let exec_id_1 = new_exec();
        let exec_id_2 = new_exec();

        // Only the first execution receives a message.
        let only_message = guidance(&exec_id_1, "message for exec1");
        let msg_id = only_message.id().to_string();
        store.push(&exec_id_1, only_message);

        // The second execution sees nothing.
        let messages_2 = store.drain_messages(&exec_id_2);
        assert!(messages_2.is_empty(), "exec_id_2 should have no messages");

        // The first execution gets exactly its own message back.
        let messages_1 = store.drain_messages(&exec_id_1);
        assert_eq!(messages_1.len(), 1);
        assert_eq!(messages_1[0].id().to_string(), msg_id);
    }

    #[test]
    fn test_pause_resume_execution() {
        // Pause followed by resume, each observed through a drain.
        let store = InMemoryInboxStore::new();
        let exec_id = new_exec();

        store.push(&exec_id, control(&exec_id, ControlAction::Pause));

        assert!(
            store.has_control_messages(&exec_id),
            "should have control messages after push"
        );

        // First drain yields the pause.
        let drained = store.drain_messages(&exec_id);
        assert_eq!(drained.len(), 1);
        match &drained[0] {
            InboxMessage::Control(ctrl) => assert_eq!(ctrl.action, ControlAction::Pause),
            _ => panic!("expected Control message"),
        }

        store.push(&exec_id, control(&exec_id, ControlAction::Resume));

        // Second drain yields the resume.
        let drained = store.drain_messages(&exec_id);
        assert_eq!(drained.len(), 1);
        match &drained[0] {
            InboxMessage::Control(ctrl) => assert_eq!(ctrl.action, ControlAction::Resume),
            _ => panic!("expected Control message"),
        }
    }

    #[test]
    fn test_cancel_long_running() {
        // A cancel carrying a reason must round-trip through the store unchanged.
        let store = InMemoryInboxStore::new();
        let exec_id = new_exec();

        let cancel_reason = "Execution timed out after 30 minutes";
        store.push(
            &exec_id,
            InboxMessage::Control(
                ControlMessage::new(exec_id.clone(), ControlAction::Cancel, "system")
                    .with_reason(cancel_reason),
            ),
        );

        let drained = store.drain_messages(&exec_id);
        assert_eq!(drained.len(), 1);

        assert!(drained[0].is_control(), "should be a Control message");

        // Action, reason and actor are all preserved.
        match &drained[0] {
            InboxMessage::Control(ctrl) => {
                assert_eq!(ctrl.action, ControlAction::Cancel);
                assert_eq!(
                    ctrl.reason.as_deref(),
                    Some(cancel_reason),
                    "reason should be preserved"
                );
                assert_eq!(ctrl.actor, "system");
            }
            _ => panic!("expected Control message"),
        }
    }

    #[test]
    fn test_approval_flow_hitl() {
        // Human-in-the-loop plan approval/rejection: both decisions travel
        // through the inbox as high-priority guidance.
        let store = InMemoryInboxStore::new();
        let exec_id = new_exec();

        // Approval path.
        let approval = GuidanceMessage::from_user(
            exec_id.clone(),
            "PLAN_APPROVED: User approved the proposed plan. Proceed with execution.",
        )
        .with_priority(GuidancePriority::High);
        let approval_id = approval.id.clone();
        store.push(&exec_id, InboxMessage::Guidance(approval));

        assert_eq!(store.len(&exec_id), 1);
        assert!(
            !store.has_control_messages(&exec_id),
            "guidance is not a control message"
        );

        let drained = store.drain_messages(&exec_id);
        assert_eq!(drained.len(), 1);

        match &drained[0] {
            InboxMessage::Guidance(g) => {
                assert_eq!(g.id, approval_id);
                assert!(g.content.contains("PLAN_APPROVED"));
                assert_eq!(g.priority, GuidancePriority::High);
                assert_eq!(g.from, GuidanceSource::User);
            }
            _ => panic!("expected Guidance message for approval"),
        }

        // Rejection path.
        let rejection = GuidanceMessage::from_user(
            exec_id.clone(),
            "PLAN_REJECTED: User rejected the plan. Reason: Need more details on approach.",
        )
        .with_priority(GuidancePriority::High);
        let rejection_id = rejection.id.clone();
        store.push(&exec_id, InboxMessage::Guidance(rejection));

        let drained = store.drain_messages(&exec_id);
        assert_eq!(drained.len(), 1);

        match &drained[0] {
            InboxMessage::Guidance(g) => {
                assert_eq!(g.id, rejection_id);
                assert!(g.content.contains("PLAN_REJECTED"));
                assert_eq!(g.priority, GuidancePriority::High);
            }
            _ => panic!("expected Guidance message for rejection"),
        }
    }

    #[test]
    fn test_hitl_priority_ordering() {
        // High-priority HITL guidance sits between control messages and
        // normal guidance (INV-INBOX-002).
        let store = InMemoryInboxStore::new();
        let exec_id = new_exec();

        // Enqueue from lowest to highest priority.
        store.push(&exec_id, guidance(&exec_id, "normal guidance"));
        store.push(
            &exec_id,
            InboxMessage::Guidance(
                GuidanceMessage::from_user(exec_id.clone(), "PLAN_APPROVED")
                    .with_priority(GuidancePriority::High),
            ),
        );
        store.push(&exec_id, control(&exec_id, ControlAction::Pause));

        let drained = store.drain_messages(&exec_id);
        assert_eq!(drained.len(), 3);

        // INV-INBOX-002: control leads.
        assert!(drained[0].is_control(), "control message should be first");

        // The HITL approval follows.
        match &drained[1] {
            InboxMessage::Guidance(g) => {
                assert!(
                    g.content.contains("PLAN_APPROVED"),
                    "high priority HITL should be second"
                );
                assert_eq!(g.priority, GuidancePriority::High);
            }
            _ => panic!("expected high priority guidance second"),
        }

        // Default-priority guidance comes last.
        match &drained[2] {
            InboxMessage::Guidance(g) => {
                assert!(
                    g.content.contains("normal guidance"),
                    "normal guidance should be last"
                );
                assert_eq!(g.priority, GuidancePriority::Medium);
            }
            _ => panic!("expected normal guidance last"),
        }
    }
}