meerkat_core/
completion_feed.rs1use std::future::Future;
11use std::pin::Pin;
12use std::sync::Arc;
13
14use crate::ops::OperationId;
15use crate::ops_lifecycle::{OperationKind, OperationTerminalOutcome};
16
17pub type CompletionSeq = u64;
19
20#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
22pub struct CompletionEntry {
23 pub seq: CompletionSeq,
24 pub operation_id: OperationId,
25 pub kind: OperationKind,
26 pub display_name: String,
27 pub terminal_outcome: OperationTerminalOutcome,
28 pub completed_at_ms: Option<u64>,
29}
30
31#[derive(Debug, Clone)]
36pub struct CompletionBatch {
37 pub entries: Vec<CompletionEntry>,
38 pub watermark: CompletionSeq,
40}
41
42pub trait CompletionFeed: Send + Sync + std::fmt::Debug {
48 fn watermark(&self) -> CompletionSeq;
50
51 fn list_since(&self, after_seq: CompletionSeq) -> CompletionBatch;
57
58 fn wait_for_advance(
63 &self,
64 after_seq: CompletionSeq,
65 ) -> Pin<Box<dyn Future<Output = CompletionSeq> + Send + '_>>;
66}
67
68#[derive(Debug, Clone)]
78pub struct CompletionEnrichmentData {
79 pub job_id: String,
80 pub detail: String,
81}
82
83pub trait CompletionEnrichmentProvider: Send + Sync {
89 fn enrich(&self, operation_id: &OperationId) -> Option<CompletionEnrichmentData>;
90}
91
92impl<T: CompletionFeed + ?Sized> CompletionFeed for Arc<T> {
97 fn watermark(&self) -> CompletionSeq {
98 (**self).watermark()
99 }
100
101 fn list_since(&self, after_seq: CompletionSeq) -> CompletionBatch {
102 (**self).list_since(after_seq)
103 }
104
105 fn wait_for_advance(
106 &self,
107 after_seq: CompletionSeq,
108 ) -> Pin<Box<dyn Future<Output = CompletionSeq> + Send + '_>> {
109 (**self).wait_for_advance(after_seq)
110 }
111}
112
113#[cfg(test)]
114#[allow(clippy::unwrap_used, dead_code)]
115pub(crate) mod tests {
116 use super::*;
117 use std::sync::atomic::{AtomicU64, Ordering};
118
119 #[derive(Debug)]
121 pub struct MockCompletionFeed {
122 watermark: AtomicU64,
123 entries: std::sync::Mutex<Vec<CompletionEntry>>,
124 }
125
126 impl MockCompletionFeed {
127 pub fn new() -> Self {
128 Self {
129 watermark: AtomicU64::new(0),
130 entries: std::sync::Mutex::new(Vec::new()),
131 }
132 }
133
134 pub fn with_watermark(watermark: CompletionSeq) -> Self {
136 Self {
137 watermark: AtomicU64::new(watermark),
138 entries: std::sync::Mutex::new(Vec::new()),
139 }
140 }
141
142 pub fn push(&self, entry: CompletionEntry) {
143 let mut entries = self.entries.lock().unwrap();
144 self.watermark.store(entry.seq, Ordering::Release);
145 entries.push(entry);
146 }
147 }
148
149 impl CompletionFeed for MockCompletionFeed {
150 fn watermark(&self) -> CompletionSeq {
151 self.watermark.load(Ordering::Acquire)
152 }
153
154 fn list_since(&self, after_seq: CompletionSeq) -> CompletionBatch {
155 let entries = self.entries.lock().unwrap();
156 CompletionBatch {
157 entries: entries
158 .iter()
159 .filter(|e| e.seq > after_seq)
160 .cloned()
161 .collect(),
162 watermark: self.watermark.load(Ordering::Acquire),
163 }
164 }
165
166 fn wait_for_advance(
167 &self,
168 _after_seq: CompletionSeq,
169 ) -> Pin<Box<dyn Future<Output = CompletionSeq> + Send + '_>> {
170 Box::pin(async move { self.watermark.load(Ordering::Acquire) })
171 }
172 }
173
174 #[test]
175 fn mock_feed_with_watermark_seeds_at_expected_value() {
176 let feed = MockCompletionFeed::with_watermark(42);
177 assert_eq!(feed.watermark(), 42);
178 assert!(feed.list_since(0).entries.is_empty());
179 }
180}