Skip to main content

meerkat_core/
completion_feed.rs

1//! Canonical completion delivery seam for async operation terminals.
2//!
3//! The [`CompletionFeed`] trait provides a monotonic, append-only read handle
4//! over operation terminal events. The runtime registry is the sole writer;
5//! consumers (agent boundary, idle wake) are read-only.
6//!
7//! This replaces the prior parallel-truth paths (shell job projection,
8//! detached wake booleans, poll_external_updates background_completions).
9
10use std::future::Future;
11use std::pin::Pin;
12use std::sync::Arc;
13
14use crate::ops::OperationId;
15use crate::ops_lifecycle::{OperationKind, OperationTerminalOutcome};
16
17/// Monotonic sequence number assigned to each completion event.
18pub type CompletionSeq = u64;
19
20/// A single completion event in the feed.
21#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
22pub struct CompletionEntry {
23    pub seq: CompletionSeq,
24    pub operation_id: OperationId,
25    pub kind: OperationKind,
26    pub display_name: String,
27    pub terminal_outcome: OperationTerminalOutcome,
28    pub completed_at_ms: Option<u64>,
29}
30
31/// A batch of completion entries with an atomically-captured watermark.
32///
33/// The `watermark` is captured at the same instant as the entry snapshot,
34/// ensuring no completions land between the read and the watermark check.
35#[derive(Debug, Clone)]
36pub struct CompletionBatch {
37    pub entries: Vec<CompletionEntry>,
38    /// Feed watermark at the time entries were read.
39    pub watermark: CompletionSeq,
40}
41
42/// Read-only handle to the canonical completion feed.
43///
44/// Consumers call [`list_since`](CompletionFeed::list_since) to drain entries
45/// past a local cursor, and [`wait_for_advance`](CompletionFeed::wait_for_advance)
46/// to block until the watermark advances past a given sequence.
47pub trait CompletionFeed: Send + Sync + std::fmt::Debug {
48    /// Current feed watermark (highest seq written).
49    fn watermark(&self) -> CompletionSeq;
50
51    /// Return all entries with `seq > after_seq`, plus the current watermark.
52    ///
53    /// The watermark in the returned batch is captured atomically with the
54    /// entry snapshot — no entry can land between the snapshot and the
55    /// watermark read.
56    fn list_since(&self, after_seq: CompletionSeq) -> CompletionBatch;
57
58    /// Block until the feed watermark advances past `after_seq`.
59    ///
60    /// Returns the new watermark. If the watermark is already past `after_seq`,
61    /// returns immediately.
62    fn wait_for_advance(
63        &self,
64        after_seq: CompletionSeq,
65    ) -> Pin<Box<dyn Future<Output = CompletionSeq> + Send + '_>>;
66}
67
68// ---------------------------------------------------------------------------
69// Enrichment provider for agent-boundary completion projection
70// ---------------------------------------------------------------------------
71
72/// Shell-level enrichment for a completed operation.
73///
74/// Carries display details (job ID, detail string) that the agent boundary
75/// needs for `BackgroundJobCompleted` events. These come from the shell
76/// `JobManager`, not from the feed itself.
77#[derive(Debug, Clone)]
78pub struct CompletionEnrichmentData {
79    pub job_id: String,
80    pub detail: String,
81}
82
83/// Provider of shell-level enrichment for completed operations.
84///
85/// The shell `JobManager` implements this trait. The agent boundary
86/// calls [`enrich`](CompletionEnrichmentProvider::enrich) to look up
87/// display details by operation ID.
88pub trait CompletionEnrichmentProvider: Send + Sync {
89    fn enrich(&self, operation_id: &OperationId) -> Option<CompletionEnrichmentData>;
90}
91
92// ---------------------------------------------------------------------------
93// Arc delegation
94// ---------------------------------------------------------------------------
95
96impl<T: CompletionFeed + ?Sized> CompletionFeed for Arc<T> {
97    fn watermark(&self) -> CompletionSeq {
98        (**self).watermark()
99    }
100
101    fn list_since(&self, after_seq: CompletionSeq) -> CompletionBatch {
102        (**self).list_since(after_seq)
103    }
104
105    fn wait_for_advance(
106        &self,
107        after_seq: CompletionSeq,
108    ) -> Pin<Box<dyn Future<Output = CompletionSeq> + Send + '_>> {
109        (**self).wait_for_advance(after_seq)
110    }
111}
112
113#[cfg(test)]
114#[allow(clippy::unwrap_used, dead_code)]
115pub(crate) mod tests {
116    use super::*;
117    use std::sync::atomic::{AtomicU64, Ordering};
118
119    /// Simple in-memory mock for testing feed-dependent code paths.
120    #[derive(Debug)]
121    pub struct MockCompletionFeed {
122        watermark: AtomicU64,
123        entries: std::sync::Mutex<Vec<CompletionEntry>>,
124    }
125
126    impl MockCompletionFeed {
127        pub fn new() -> Self {
128            Self {
129                watermark: AtomicU64::new(0),
130                entries: std::sync::Mutex::new(Vec::new()),
131            }
132        }
133
134        /// Create a feed pre-seeded at a given watermark (simulates prior activity).
135        pub fn with_watermark(watermark: CompletionSeq) -> Self {
136            Self {
137                watermark: AtomicU64::new(watermark),
138                entries: std::sync::Mutex::new(Vec::new()),
139            }
140        }
141
142        pub fn push(&self, entry: CompletionEntry) {
143            let mut entries = self.entries.lock().unwrap();
144            self.watermark.store(entry.seq, Ordering::Release);
145            entries.push(entry);
146        }
147    }
148
149    impl CompletionFeed for MockCompletionFeed {
150        fn watermark(&self) -> CompletionSeq {
151            self.watermark.load(Ordering::Acquire)
152        }
153
154        fn list_since(&self, after_seq: CompletionSeq) -> CompletionBatch {
155            let entries = self.entries.lock().unwrap();
156            CompletionBatch {
157                entries: entries
158                    .iter()
159                    .filter(|e| e.seq > after_seq)
160                    .cloned()
161                    .collect(),
162                watermark: self.watermark.load(Ordering::Acquire),
163            }
164        }
165
166        fn wait_for_advance(
167            &self,
168            _after_seq: CompletionSeq,
169        ) -> Pin<Box<dyn Future<Output = CompletionSeq> + Send + '_>> {
170            Box::pin(async move { self.watermark.load(Ordering::Acquire) })
171        }
172    }
173
174    #[test]
175    fn mock_feed_with_watermark_seeds_at_expected_value() {
176        let feed = MockCompletionFeed::with_watermark(42);
177        assert_eq!(feed.watermark(), 42);
178        assert!(feed.list_since(0).entries.is_empty());
179    }
180}