1use crate::{ResourceMap, Snapshot};
2use bytes::Bytes;
3use std::collections::HashMap;
4use std::sync::Arc;
5use tokio::sync::watch;
6use xxhash_rust::xxh3::xxh3_64;
7
8pub struct Update {
9 pub resources: Vec<(String, Bytes)>,
10 pub removed: Vec<String>,
11}
12
13impl Update {
14 pub fn is_empty(&self) -> bool {
15 self.resources.is_empty() && self.removed.is_empty()
16 }
17}
18
/// Factory for [`Subscriber`]s that all read from the same shared [`Snapshot`].
pub struct Server {
    /// Shared snapshot handle; `subscribe` clones this `Arc` into every
    /// subscriber it creates.
    snapshot: Arc<Snapshot>,
}
22
23impl Server {
24 pub fn new(snapshot: Arc<Snapshot>) -> Self {
25 Self { snapshot }
26 }
27
28 pub fn subscribe(&self, watch: watch::Receiver<()>) -> Subscriber {
29 Subscriber {
30 snapshot: Arc::clone(&self.snapshot),
31 watch,
32 acked: HashMap::new(),
33 }
34 }
35}
36
/// Per-client view of the snapshot that remembers what the client has
/// acknowledged, so that only unacknowledged content is sent.
pub struct Subscriber {
    /// Shared snapshot handle; loaded each time a diff is computed.
    snapshot: Arc<Snapshot>,
    /// Change-notification receiver awaited by `changed`.
    watch: watch::Receiver<()>,
    /// Resource name -> last acknowledged version. `diff` compares this value
    /// with the xxh3-64 hash of the resource's current bytes.
    acked: HashMap<String, u64>,
}
42
43impl Subscriber {
44 pub fn initial(&mut self) -> Update {
47 let _ = self.watch.borrow_and_update();
48 let guard = self.snapshot.load();
49 self.diff(&guard)
50 }
51
52 pub async fn changed(&mut self) -> Option<Update> {
53 self.watch.changed().await.ok()?;
54 let guard = self.snapshot.load();
55 Some(self.diff(&guard))
56 }
57
58 pub fn acknowledge(&mut self, name: &str, version: u64) {
59 self.acked.insert(name.to_owned(), version);
60 }
61
62 pub fn acknowledge_removed(&mut self, name: &str) {
63 self.acked.remove(name);
64 }
65
66 fn diff(&self, map: &ResourceMap) -> Update {
67 let mut resources = Vec::new();
68 for (name, bytes) in map {
69 let hash = xxh3_64(bytes);
70 if self.acked.get(name.as_str()) != Some(&hash) {
71 resources.push((name.clone(), bytes.clone()));
72 }
73 }
74 let removed = self
75 .acked
76 .keys()
77 .filter(|k| !map.contains_key(*k))
78 .cloned()
79 .collect();
80 Update { resources, removed }
81 }
82}
83
84#[mutants::skip]
85fn _assert_send() {
86 fn check<T: Send>() {}
87 check::<Server>();
88 check::<Subscriber>();
89}
90
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Plane;
    use std::time::Duration;
    use xxhash_rust::xxh3::xxh3_64;

    /// Builds a fresh `Plane`, its watch receiver, and a `Server` over the
    /// plane's snapshot.
    fn setup() -> (Plane, watch::Receiver<()>, Server) {
        let (plane, receiver) = Plane::new();
        let snapshot = plane.snapshot();
        (plane, receiver, Server::new(snapshot))
    }

    /// Shorthand for a `Bytes` backed by a static string.
    fn b(s: &'static str) -> Bytes {
        let raw: &'static [u8] = s.as_bytes();
        Bytes::from_static(raw)
    }

    /// Awaits the next update, panicking if none arrives within 200 ms or the
    /// subscriber reports that no further updates will come.
    async fn next(sub: &mut Subscriber) -> Update {
        let waited = tokio::time::timeout(Duration::from_millis(200), sub.changed()).await;
        let maybe_update = waited.expect("subscriber timed out — watch was never fired");
        maybe_update.expect("watch channel closed")
    }

    /// Names of the resources carried by `update`, sorted so comparisons do
    /// not depend on map iteration order.
    fn resource_names(update: &Update) -> Vec<&str> {
        let mut names: Vec<&str> = update
            .resources
            .iter()
            .map(|(name, _)| name.as_str())
            .collect();
        names.sort();
        names
    }

    #[test]
    fn is_empty_true_when_both_vecs_empty() {
        let update = Update {
            resources: Vec::new(),
            removed: Vec::new(),
        };
        assert!(update.is_empty());
    }

    #[test]
    fn is_empty_false_when_resources_nonempty() {
        let resources = vec![(String::from("svc"), Bytes::new())];
        let update = Update {
            resources,
            removed: Vec::new(),
        };
        assert!(!update.is_empty());
    }

    #[test]
    fn is_empty_false_when_removed_nonempty() {
        let removed = vec![String::from("svc")];
        let update = Update {
            resources: Vec::new(),
            removed,
        };
        assert!(!update.is_empty());
    }

    #[test]
    fn initial_sends_all_resources() {
        let (mut plane, receiver, server) = setup();
        plane.apply("svc-a", b("v1"));
        plane.apply("svc-b", b("v2"));

        let mut subscriber = server.subscribe(receiver);
        let first = subscriber.initial();

        assert_eq!(resource_names(&first), ["svc-a", "svc-b"]);
        assert!(first.removed.is_empty());
    }

    #[test]
    fn initial_empty_snapshot_is_empty_update() {
        let (_, receiver, server) = setup();
        let mut subscriber = server.subscribe(receiver);
        let first = subscriber.initial();
        assert!(first.is_empty());
    }

    #[tokio::test]
    async fn initial_marks_watch_as_seen() {
        // `_plane` is kept alive for the whole test.
        let (_plane, receiver, server) = setup();
        let mut subscriber = server.subscribe(receiver);
        let _ = subscriber.initial();

        let outcome =
            tokio::time::timeout(Duration::from_millis(10), subscriber.changed()).await;
        assert!(
            outcome.is_err(),
            "changed() should not resolve without a new write"
        );
    }

    #[tokio::test]
    async fn changed_sends_only_diff() {
        let (mut plane, receiver, server) = setup();
        plane.apply("svc-a", b("v1"));
        plane.apply("svc-b", b("v1"));

        let mut subscriber = server.subscribe(receiver);
        let _ = subscriber.initial();
        let v1 = xxh3_64(b"v1");
        subscriber.acknowledge("svc-a", v1);
        subscriber.acknowledge("svc-b", v1);

        plane.apply("svc-a", b("v2"));

        let delta = next(&mut subscriber).await;
        assert_eq!(resource_names(&delta), ["svc-a"]);
        assert_eq!(delta.resources[0].1, b("v2"));
        assert!(delta.removed.is_empty());
    }

    #[tokio::test]
    async fn acknowledge_prevents_resend() {
        let (mut plane, receiver, server) = setup();
        plane.apply("svc-a", b("v1"));

        let mut subscriber = server.subscribe(receiver);
        let _ = subscriber.initial();
        subscriber.acknowledge("svc-a", xxh3_64(b"v1"));

        plane.apply("svc-b", b("v1"));

        let delta = next(&mut subscriber).await;
        assert_eq!(resource_names(&delta), ["svc-b"]);
    }

    #[tokio::test]
    async fn removed_resource_appears_in_removed_list() {
        let (mut plane, receiver, server) = setup();
        plane.apply("svc-a", b("v1"));

        let mut subscriber = server.subscribe(receiver);
        let _ = subscriber.initial();
        subscriber.acknowledge("svc-a", xxh3_64(b"v1"));

        plane.remove("svc-a");

        let delta = next(&mut subscriber).await;
        assert!(delta.resources.is_empty());
        assert_eq!(delta.removed, ["svc-a"]);
    }

    #[tokio::test]
    async fn acknowledge_removed_clears_acked_entry() {
        let (mut plane, receiver, server) = setup();
        plane.apply("svc-a", b("v1"));

        let mut subscriber = server.subscribe(receiver);
        let _ = subscriber.initial();
        subscriber.acknowledge("svc-a", xxh3_64(b"v1"));

        plane.remove("svc-a");
        let removal = next(&mut subscriber).await;
        assert_eq!(removal.removed, ["svc-a"]);
        subscriber.acknowledge_removed("svc-a");

        // Re-adding identical content must be sent again once the
        // acknowledgement has been cleared.
        plane.apply("svc-a", b("v1"));
        let readded = next(&mut subscriber).await;
        assert_eq!(resource_names(&readded), ["svc-a"]);
    }

    #[test]
    fn new_subscriber_sees_full_snapshot() {
        let (mut plane, receiver, server) = setup();
        plane.apply("svc-a", b("v1"));
        plane.apply("svc-b", b("v2"));

        let mut subscriber = server.subscribe(receiver);
        let first = subscriber.initial();

        assert_eq!(first.resources.len(), 2);
    }

    #[tokio::test]
    async fn two_subscribers_track_independently() {
        let (mut plane, receiver, server) = setup();
        plane.apply("svc-a", b("v1"));
        plane.apply("svc-b", b("v1"));

        let mut first = server.subscribe(receiver.clone());
        let mut second = server.subscribe(receiver);

        let _ = first.initial();
        let v1 = xxh3_64(b"v1");
        first.acknowledge("svc-a", v1);
        first.acknowledge("svc-b", v1);

        // The second subscriber never acknowledges anything.
        let _ = second.initial();

        plane.apply("svc-a", b("v2"));

        let seen_by_first = next(&mut first).await;
        assert_eq!(resource_names(&seen_by_first), ["svc-a"]);

        let seen_by_second = next(&mut second).await;
        assert_eq!(seen_by_second.resources.len(), 2);
    }

    #[tokio::test]
    async fn unacked_resource_resent_on_next_change() {
        let (mut plane, receiver, server) = setup();
        plane.apply("svc-a", b("v1"));
        plane.apply("svc-b", b("v1"));

        let mut subscriber = server.subscribe(receiver);
        let _ = subscriber.initial();
        subscriber.acknowledge("svc-b", xxh3_64(b"v1"));

        plane.apply("svc-b", b("v2"));

        let delta = next(&mut subscriber).await;
        let names = resource_names(&delta);
        assert!(names.contains(&"svc-a"));
        assert!(names.contains(&"svc-b"));
    }
}