forge_orchestration/controlplane/
watch.rs

1//! Watch and subscription system for real-time updates
2//!
3//! Implements Kubernetes-style watch with:
4//! - Resource version tracking
5//! - Bookmark events
6//! - Efficient fan-out to multiple subscribers
7
8use std::collections::HashMap;
9use std::sync::Arc;
10use parking_lot::RwLock;
11use tokio::sync::broadcast;
12use serde::{Deserialize, Serialize};
13
14use super::ResourceKind;
15
16/// Resource version for tracking changes
17#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
18pub struct ResourceVersion(pub u64);
19
20impl ResourceVersion {
21    /// Create new resource version
22    pub fn new(version: u64) -> Self {
23        Self(version)
24    }
25
26    /// Get inner value
27    pub fn value(&self) -> u64 {
28        self.0
29    }
30}
31
32impl std::fmt::Display for ResourceVersion {
33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34        write!(f, "{}", self.0)
35    }
36}
37
38/// Watch event types
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub enum WatchEvent {
41    /// Resource was added
42    Added {
43        /// Resource kind
44        kind: ResourceKind,
45        /// Resource key
46        key: String,
47        /// Resource value
48        value: serde_json::Value,
49        /// Resource version
50        version: u64,
51    },
52    /// Resource was modified
53    Modified {
54        /// Resource kind
55        kind: ResourceKind,
56        /// Resource key
57        key: String,
58        /// Resource value
59        value: serde_json::Value,
60        /// Resource version
61        version: u64,
62    },
63    /// Resource was deleted
64    Deleted {
65        /// Resource kind
66        kind: ResourceKind,
67        /// Resource key
68        key: String,
69        /// Last known value
70        value: serde_json::Value,
71        /// Resource version
72        version: u64,
73    },
74    /// Bookmark event (periodic version update)
75    Bookmark {
76        /// Resource kind
77        kind: ResourceKind,
78        /// Current resource version
79        version: u64,
80    },
81    /// Error event
82    Error {
83        /// Error message
84        message: String,
85        /// Error code
86        code: u32,
87    },
88}
89
90impl WatchEvent {
91    /// Get the event type as string
92    pub fn event_type(&self) -> &str {
93        match self {
94            WatchEvent::Added { .. } => "ADDED",
95            WatchEvent::Modified { .. } => "MODIFIED",
96            WatchEvent::Deleted { .. } => "DELETED",
97            WatchEvent::Bookmark { .. } => "BOOKMARK",
98            WatchEvent::Error { .. } => "ERROR",
99        }
100    }
101
102    /// Get the resource version
103    pub fn version(&self) -> Option<u64> {
104        match self {
105            WatchEvent::Added { version, .. } => Some(*version),
106            WatchEvent::Modified { version, .. } => Some(*version),
107            WatchEvent::Deleted { version, .. } => Some(*version),
108            WatchEvent::Bookmark { version, .. } => Some(*version),
109            WatchEvent::Error { .. } => None,
110        }
111    }
112}
113
114/// Watch stream for receiving events
115pub struct WatchStream {
116    /// Broadcast receiver
117    rx: broadcast::Receiver<WatchEvent>,
118    /// Resource kind being watched
119    kind: ResourceKind,
120    /// Minimum resource version to receive
121    min_version: Option<u64>,
122}
123
124impl WatchStream {
125    /// Create new watch stream
126    pub fn new(rx: broadcast::Receiver<WatchEvent>, kind: ResourceKind) -> Self {
127        Self {
128            rx,
129            kind,
130            min_version: None,
131        }
132    }
133
134    /// Set minimum resource version
135    pub fn from_version(mut self, version: u64) -> Self {
136        self.min_version = Some(version);
137        self
138    }
139
140    /// Receive next event
141    pub async fn recv(&mut self) -> Option<WatchEvent> {
142        loop {
143            match self.rx.recv().await {
144                Ok(event) => {
145                    // Filter by kind
146                    let matches_kind = match &event {
147                        WatchEvent::Added { kind, .. } => kind == &self.kind,
148                        WatchEvent::Modified { kind, .. } => kind == &self.kind,
149                        WatchEvent::Deleted { kind, .. } => kind == &self.kind,
150                        WatchEvent::Bookmark { kind, .. } => kind == &self.kind,
151                        WatchEvent::Error { .. } => true,
152                    };
153
154                    if !matches_kind {
155                        continue;
156                    }
157
158                    // Filter by version
159                    if let Some(min_version) = self.min_version {
160                        if let Some(version) = event.version() {
161                            if version <= min_version {
162                                continue;
163                            }
164                        }
165                    }
166
167                    return Some(event);
168                }
169                Err(broadcast::error::RecvError::Lagged(n)) => {
170                    // Subscriber fell behind, return error event
171                    return Some(WatchEvent::Error {
172                        message: format!("Watch lagged by {} events", n),
173                        code: 410, // Gone
174                    });
175                }
176                Err(broadcast::error::RecvError::Closed) => {
177                    return None;
178                }
179            }
180        }
181    }
182}
183
184/// Watch registry for managing subscriptions
185pub struct WatchRegistry {
186    /// Broadcast sender for events
187    tx: broadcast::Sender<WatchEvent>,
188    /// Active watchers by kind
189    watchers: RwLock<HashMap<ResourceKind, usize>>,
190}
191
192impl WatchRegistry {
193    /// Create new watch registry
194    pub fn new() -> Self {
195        let (tx, _) = broadcast::channel(1024);
196        Self {
197            tx,
198            watchers: RwLock::new(HashMap::new()),
199        }
200    }
201
202    /// Subscribe to events for a resource kind
203    pub fn subscribe(&self, kind: ResourceKind) -> WatchStream {
204        let rx = self.tx.subscribe();
205        
206        // Track watcher count
207        let mut watchers = self.watchers.write();
208        *watchers.entry(kind.clone()).or_insert(0) += 1;
209        
210        WatchStream::new(rx, kind)
211    }
212
213    /// Notify all watchers of an event
214    pub fn notify(&self, event: WatchEvent) {
215        // Ignore send errors (no receivers)
216        let _ = self.tx.send(event);
217    }
218
219    /// Get number of watchers for a kind
220    pub fn watcher_count(&self, kind: &ResourceKind) -> usize {
221        self.watchers.read().get(kind).copied().unwrap_or(0)
222    }
223
224    /// Send bookmark events to all watchers
225    pub fn send_bookmarks(&self, version: u64) {
226        let kinds: Vec<_> = self.watchers.read().keys().cloned().collect();
227        
228        for kind in kinds {
229            self.notify(WatchEvent::Bookmark {
230                kind,
231                version,
232            });
233        }
234    }
235}
236
237impl Default for WatchRegistry {
238    fn default() -> Self {
239        Self::new()
240    }
241}
242
243/// Watch cache for efficient list-then-watch
244pub struct WatchCache<T> {
245    /// Cached items
246    items: RwLock<HashMap<String, (T, u64)>>,
247    /// Current resource version
248    version: RwLock<u64>,
249    /// Watch registry
250    registry: Arc<WatchRegistry>,
251    /// Resource kind
252    kind: ResourceKind,
253}
254
255impl<T: Clone + Send + Sync + 'static> WatchCache<T> {
256    /// Create new watch cache
257    pub fn new(kind: ResourceKind, registry: Arc<WatchRegistry>) -> Self {
258        Self {
259            items: RwLock::new(HashMap::new()),
260            version: RwLock::new(0),
261            registry,
262            kind,
263        }
264    }
265
266    /// Add or update item
267    pub fn set(&self, key: impl Into<String>, value: T, version: u64) {
268        let key = key.into();
269        let mut items = self.items.write();
270        let is_new = !items.contains_key(&key);
271        items.insert(key.clone(), (value, version));
272        
273        *self.version.write() = version;
274    }
275
276    /// Get item
277    pub fn get(&self, key: &str) -> Option<T> {
278        self.items.read().get(key).map(|(v, _)| v.clone())
279    }
280
281    /// Remove item
282    pub fn remove(&self, key: &str) -> Option<T> {
283        self.items.write().remove(key).map(|(v, _)| v)
284    }
285
286    /// List all items
287    pub fn list(&self) -> Vec<T> {
288        self.items.read().values().map(|(v, _)| v.clone()).collect()
289    }
290
291    /// Get current version
292    pub fn version(&self) -> u64 {
293        *self.version.read()
294    }
295
296    /// Subscribe to changes
297    pub fn watch(&self) -> WatchStream {
298        self.registry.subscribe(self.kind.clone())
299    }
300}
301
302#[cfg(test)]
303mod tests {
304    use super::*;
305
306    #[test]
307    fn test_watch_event_type() {
308        let event = WatchEvent::Added {
309            kind: ResourceKind::Workload,
310            key: "test".to_string(),
311            value: serde_json::json!({}),
312            version: 1,
313        };
314        assert_eq!(event.event_type(), "ADDED");
315        assert_eq!(event.version(), Some(1));
316    }
317
318    #[test]
319    fn test_watch_registry() {
320        let registry = WatchRegistry::new();
321        
322        let _stream = registry.subscribe(ResourceKind::Workload);
323        assert_eq!(registry.watcher_count(&ResourceKind::Workload), 1);
324    }
325
326    #[tokio::test]
327    async fn test_watch_stream() {
328        let registry = Arc::new(WatchRegistry::new());
329        let mut stream = registry.subscribe(ResourceKind::Workload);
330
331        // Send event in background
332        let registry_clone = registry.clone();
333        tokio::spawn(async move {
334            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
335            registry_clone.notify(WatchEvent::Added {
336                kind: ResourceKind::Workload,
337                key: "test".to_string(),
338                value: serde_json::json!({"name": "test"}),
339                version: 1,
340            });
341        });
342
343        let event = tokio::time::timeout(
344            std::time::Duration::from_millis(100),
345            stream.recv()
346        ).await.unwrap();
347
348        assert!(event.is_some());
349        assert_eq!(event.unwrap().event_type(), "ADDED");
350    }
351}