forge_orchestration/controlplane/
watch.rs1use std::collections::HashMap;
9use std::sync::Arc;
10use parking_lot::RwLock;
11use tokio::sync::broadcast;
12use serde::{Deserialize, Serialize};
13
14use super::ResourceKind;
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
18pub struct ResourceVersion(pub u64);
19
20impl ResourceVersion {
21 pub fn new(version: u64) -> Self {
23 Self(version)
24 }
25
26 pub fn value(&self) -> u64 {
28 self.0
29 }
30}
31
32impl std::fmt::Display for ResourceVersion {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 write!(f, "{}", self.0)
35 }
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40pub enum WatchEvent {
41 Added {
43 kind: ResourceKind,
45 key: String,
47 value: serde_json::Value,
49 version: u64,
51 },
52 Modified {
54 kind: ResourceKind,
56 key: String,
58 value: serde_json::Value,
60 version: u64,
62 },
63 Deleted {
65 kind: ResourceKind,
67 key: String,
69 value: serde_json::Value,
71 version: u64,
73 },
74 Bookmark {
76 kind: ResourceKind,
78 version: u64,
80 },
81 Error {
83 message: String,
85 code: u32,
87 },
88}
89
90impl WatchEvent {
91 pub fn event_type(&self) -> &str {
93 match self {
94 WatchEvent::Added { .. } => "ADDED",
95 WatchEvent::Modified { .. } => "MODIFIED",
96 WatchEvent::Deleted { .. } => "DELETED",
97 WatchEvent::Bookmark { .. } => "BOOKMARK",
98 WatchEvent::Error { .. } => "ERROR",
99 }
100 }
101
102 pub fn version(&self) -> Option<u64> {
104 match self {
105 WatchEvent::Added { version, .. } => Some(*version),
106 WatchEvent::Modified { version, .. } => Some(*version),
107 WatchEvent::Deleted { version, .. } => Some(*version),
108 WatchEvent::Bookmark { version, .. } => Some(*version),
109 WatchEvent::Error { .. } => None,
110 }
111 }
112}
113
114pub struct WatchStream {
116 rx: broadcast::Receiver<WatchEvent>,
118 kind: ResourceKind,
120 min_version: Option<u64>,
122}
123
124impl WatchStream {
125 pub fn new(rx: broadcast::Receiver<WatchEvent>, kind: ResourceKind) -> Self {
127 Self {
128 rx,
129 kind,
130 min_version: None,
131 }
132 }
133
134 pub fn from_version(mut self, version: u64) -> Self {
136 self.min_version = Some(version);
137 self
138 }
139
140 pub async fn recv(&mut self) -> Option<WatchEvent> {
142 loop {
143 match self.rx.recv().await {
144 Ok(event) => {
145 let matches_kind = match &event {
147 WatchEvent::Added { kind, .. } => kind == &self.kind,
148 WatchEvent::Modified { kind, .. } => kind == &self.kind,
149 WatchEvent::Deleted { kind, .. } => kind == &self.kind,
150 WatchEvent::Bookmark { kind, .. } => kind == &self.kind,
151 WatchEvent::Error { .. } => true,
152 };
153
154 if !matches_kind {
155 continue;
156 }
157
158 if let Some(min_version) = self.min_version {
160 if let Some(version) = event.version() {
161 if version <= min_version {
162 continue;
163 }
164 }
165 }
166
167 return Some(event);
168 }
169 Err(broadcast::error::RecvError::Lagged(n)) => {
170 return Some(WatchEvent::Error {
172 message: format!("Watch lagged by {} events", n),
173 code: 410, });
175 }
176 Err(broadcast::error::RecvError::Closed) => {
177 return None;
178 }
179 }
180 }
181 }
182}
183
184pub struct WatchRegistry {
186 tx: broadcast::Sender<WatchEvent>,
188 watchers: RwLock<HashMap<ResourceKind, usize>>,
190}
191
192impl WatchRegistry {
193 pub fn new() -> Self {
195 let (tx, _) = broadcast::channel(1024);
196 Self {
197 tx,
198 watchers: RwLock::new(HashMap::new()),
199 }
200 }
201
202 pub fn subscribe(&self, kind: ResourceKind) -> WatchStream {
204 let rx = self.tx.subscribe();
205
206 let mut watchers = self.watchers.write();
208 *watchers.entry(kind.clone()).or_insert(0) += 1;
209
210 WatchStream::new(rx, kind)
211 }
212
213 pub fn notify(&self, event: WatchEvent) {
215 let _ = self.tx.send(event);
217 }
218
219 pub fn watcher_count(&self, kind: &ResourceKind) -> usize {
221 self.watchers.read().get(kind).copied().unwrap_or(0)
222 }
223
224 pub fn send_bookmarks(&self, version: u64) {
226 let kinds: Vec<_> = self.watchers.read().keys().cloned().collect();
227
228 for kind in kinds {
229 self.notify(WatchEvent::Bookmark {
230 kind,
231 version,
232 });
233 }
234 }
235}
236
237impl Default for WatchRegistry {
238 fn default() -> Self {
239 Self::new()
240 }
241}
242
243pub struct WatchCache<T> {
245 items: RwLock<HashMap<String, (T, u64)>>,
247 version: RwLock<u64>,
249 registry: Arc<WatchRegistry>,
251 kind: ResourceKind,
253}
254
255impl<T: Clone + Send + Sync + 'static> WatchCache<T> {
256 pub fn new(kind: ResourceKind, registry: Arc<WatchRegistry>) -> Self {
258 Self {
259 items: RwLock::new(HashMap::new()),
260 version: RwLock::new(0),
261 registry,
262 kind,
263 }
264 }
265
266 pub fn set(&self, key: impl Into<String>, value: T, version: u64) {
268 let key = key.into();
269 let mut items = self.items.write();
270 let is_new = !items.contains_key(&key);
271 items.insert(key.clone(), (value, version));
272
273 *self.version.write() = version;
274 }
275
276 pub fn get(&self, key: &str) -> Option<T> {
278 self.items.read().get(key).map(|(v, _)| v.clone())
279 }
280
281 pub fn remove(&self, key: &str) -> Option<T> {
283 self.items.write().remove(key).map(|(v, _)| v)
284 }
285
286 pub fn list(&self) -> Vec<T> {
288 self.items.read().values().map(|(v, _)| v.clone()).collect()
289 }
290
291 pub fn version(&self) -> u64 {
293 *self.version.read()
294 }
295
296 pub fn watch(&self) -> WatchStream {
298 self.registry.subscribe(self.kind.clone())
299 }
300}
301
302#[cfg(test)]
303mod tests {
304 use super::*;
305
306 #[test]
307 fn test_watch_event_type() {
308 let event = WatchEvent::Added {
309 kind: ResourceKind::Workload,
310 key: "test".to_string(),
311 value: serde_json::json!({}),
312 version: 1,
313 };
314 assert_eq!(event.event_type(), "ADDED");
315 assert_eq!(event.version(), Some(1));
316 }
317
318 #[test]
319 fn test_watch_registry() {
320 let registry = WatchRegistry::new();
321
322 let _stream = registry.subscribe(ResourceKind::Workload);
323 assert_eq!(registry.watcher_count(&ResourceKind::Workload), 1);
324 }
325
326 #[tokio::test]
327 async fn test_watch_stream() {
328 let registry = Arc::new(WatchRegistry::new());
329 let mut stream = registry.subscribe(ResourceKind::Workload);
330
331 let registry_clone = registry.clone();
333 tokio::spawn(async move {
334 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
335 registry_clone.notify(WatchEvent::Added {
336 kind: ResourceKind::Workload,
337 key: "test".to_string(),
338 value: serde_json::json!({"name": "test"}),
339 version: 1,
340 });
341 });
342
343 let event = tokio::time::timeout(
344 std::time::Duration::from_millis(100),
345 stream.recv()
346 ).await.unwrap();
347
348 assert!(event.is_some());
349 assert_eq!(event.unwrap().event_type(), "ADDED");
350 }
351}