// rime_xds/server.rs

use crate::{ResourceMap, Snapshot};
use bytes::Bytes;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::watch;
use xxhash_rust::xxh3::xxh3_64;
7
8pub struct Update {
9    pub resources: Vec<(String, Bytes)>,
10    pub removed: Vec<String>,
11}
12
13impl Update {
14    pub fn is_empty(&self) -> bool {
15        self.resources.is_empty() && self.removed.is_empty()
16    }
17}
18
19pub struct Server {
20    snapshot: Arc<Snapshot>,
21}
22
23impl Server {
24    pub fn new(snapshot: Arc<Snapshot>) -> Self {
25        Self { snapshot }
26    }
27
28    pub fn subscribe(&self, watch: watch::Receiver<()>) -> Subscriber {
29        Subscriber {
30            snapshot: Arc::clone(&self.snapshot),
31            watch,
32            acked: HashMap::new(),
33        }
34    }
35}
36
37pub struct Subscriber {
38    snapshot: Arc<Snapshot>,
39    watch: watch::Receiver<()>,
40    acked: HashMap<String, u64>,
41}
42
43impl Subscriber {
44    // Computes the full initial update and marks the current watch state as seen
45    // so that subsequent calls to `changed` only fire on future snapshot mutations.
46    pub fn initial(&mut self) -> Update {
47        let _ = self.watch.borrow_and_update();
48        let guard = self.snapshot.load();
49        self.diff(&guard)
50    }
51
52    pub async fn changed(&mut self) -> Option<Update> {
53        self.watch.changed().await.ok()?;
54        let guard = self.snapshot.load();
55        Some(self.diff(&guard))
56    }
57
58    pub fn acknowledge(&mut self, name: &str, version: u64) {
59        self.acked.insert(name.to_owned(), version);
60    }
61
62    pub fn acknowledge_removed(&mut self, name: &str) {
63        self.acked.remove(name);
64    }
65
66    fn diff(&self, map: &ResourceMap) -> Update {
67        let mut resources = Vec::new();
68        for (name, bytes) in map {
69            let hash = xxh3_64(bytes);
70            if self.acked.get(name.as_str()) != Some(&hash) {
71                resources.push((name.clone(), bytes.clone()));
72            }
73        }
74        let removed = self
75            .acked
76            .keys()
77            .filter(|k| !map.contains_key(*k))
78            .cloned()
79            .collect();
80        Update { resources, removed }
81    }
82}
83
84#[mutants::skip]
85fn _assert_send() {
86    fn check<T: Send>() {}
87    check::<Server>();
88    check::<Subscriber>();
89}
90
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Plane;
    use std::time::Duration;
    use xxhash_rust::xxh3::xxh3_64;

    // Builds a fresh plane, its watch receiver, and a server over the plane's snapshot.
    fn setup() -> (Plane, watch::Receiver<()>, Server) {
        let (plane, rx) = Plane::new();
        let server = Server::new(plane.snapshot());
        (plane, rx, server)
    }

    // Shorthand for a static payload.
    fn b(s: &'static str) -> Bytes {
        Bytes::from_static(s.as_bytes())
    }

    // Wraps changed() with a hard deadline so mutations that prevent the watch
    // from firing produce an immediate panic rather than a 20-second timeout.
    async fn next(sub: &mut Subscriber) -> Update {
        tokio::time::timeout(Duration::from_millis(200), sub.changed())
            .await
            .expect("subscriber timed out — watch was never fired")
            .expect("watch channel closed")
    }

    #[test]
    fn is_empty_true_when_both_vecs_empty() {
        let u = Update { resources: vec![], removed: vec![] };
        assert!(u.is_empty());
    }

    #[test]
    fn is_empty_false_when_resources_nonempty() {
        let u = Update { resources: vec![("svc".into(), Bytes::new())], removed: vec![] };
        assert!(!u.is_empty());
    }

    #[test]
    fn is_empty_false_when_removed_nonempty() {
        let u = Update { resources: vec![], removed: vec!["svc".into()] };
        assert!(!u.is_empty());
    }

    #[test]
    fn initial_sends_all_resources() {
        let (mut plane, rx, server) = setup();
        plane.apply("svc-a", b("v1"));
        plane.apply("svc-b", b("v2"));

        let mut sub = server.subscribe(rx);
        let update = sub.initial();

        let mut names: Vec<_> = update.resources.iter().map(|(n, _)| n.as_str()).collect();
        names.sort();
        assert_eq!(names, ["svc-a", "svc-b"]);
        assert!(update.removed.is_empty());
    }

    #[test]
    fn initial_empty_snapshot_is_empty_update() {
        let (_, rx, server) = setup();
        let mut sub = server.subscribe(rx);
        let update = sub.initial();
        assert!(update.is_empty());
    }

    #[tokio::test]
    async fn initial_marks_watch_as_seen() {
        // `_plane` (not `_`) keeps the plane alive for the whole test.
        let (_plane, rx, server) = setup();
        let mut sub = server.subscribe(rx);
        let _ = sub.initial();

        let result = tokio::time::timeout(Duration::from_millis(10), sub.changed()).await;
        assert!(result.is_err(), "changed() should not resolve without a new write");
    }

    #[tokio::test]
    async fn changed_sends_only_diff() {
        let (mut plane, rx, server) = setup();
        plane.apply("svc-a", b("v1"));
        plane.apply("svc-b", b("v1"));

        let mut sub = server.subscribe(rx);
        let _ = sub.initial();
        sub.acknowledge("svc-a", xxh3_64(b"v1"));
        sub.acknowledge("svc-b", xxh3_64(b"v1"));

        plane.apply("svc-a", b("v2"));

        let update = next(&mut sub).await;
        assert_eq!(update.resources.len(), 1);
        assert_eq!(update.resources[0].0, "svc-a");
        assert_eq!(update.resources[0].1, b("v2"));
        assert!(update.removed.is_empty());
    }

    #[tokio::test]
    async fn acknowledge_prevents_resend() {
        let (mut plane, rx, server) = setup();
        plane.apply("svc-a", b("v1"));

        let mut sub = server.subscribe(rx);
        let _ = sub.initial();
        sub.acknowledge("svc-a", xxh3_64(b"v1"));

        plane.apply("svc-b", b("v1"));

        let update = next(&mut sub).await;
        assert_eq!(update.resources.len(), 1);
        assert_eq!(update.resources[0].0, "svc-b");
    }

    #[tokio::test]
    async fn removed_resource_appears_in_removed_list() {
        let (mut plane, rx, server) = setup();
        plane.apply("svc-a", b("v1"));

        let mut sub = server.subscribe(rx);
        let _ = sub.initial();
        sub.acknowledge("svc-a", xxh3_64(b"v1"));

        plane.remove("svc-a");

        let update = next(&mut sub).await;
        assert!(update.resources.is_empty());
        assert_eq!(update.removed, ["svc-a"]);
    }

    #[tokio::test]
    async fn acknowledge_removed_clears_acked_entry() {
        let (mut plane, rx, server) = setup();
        plane.apply("svc-a", b("v1"));

        let mut sub = server.subscribe(rx);
        let _ = sub.initial();
        sub.acknowledge("svc-a", xxh3_64(b"v1"));

        plane.remove("svc-a");
        let update = next(&mut sub).await;
        assert_eq!(update.removed, ["svc-a"]);
        sub.acknowledge_removed("svc-a");

        // Same payload as before, but the acknowledgement was cleared, so it is resent.
        plane.apply("svc-a", b("v1"));
        let update = next(&mut sub).await;
        assert_eq!(update.resources.len(), 1);
        assert_eq!(update.resources[0].0, "svc-a");
    }

    #[test]
    fn new_subscriber_sees_full_snapshot() {
        let (mut plane, rx, server) = setup();
        plane.apply("svc-a", b("v1"));
        plane.apply("svc-b", b("v2"));

        let mut sub = server.subscribe(rx);
        let update = sub.initial();

        assert_eq!(update.resources.len(), 2);
    }

    #[tokio::test]
    async fn two_subscribers_track_independently() {
        let (mut plane, rx, server) = setup();
        plane.apply("svc-a", b("v1"));
        plane.apply("svc-b", b("v1"));

        let mut sub1 = server.subscribe(rx.clone());
        let mut sub2 = server.subscribe(rx);

        let _ = sub1.initial();
        sub1.acknowledge("svc-a", xxh3_64(b"v1"));
        sub1.acknowledge("svc-b", xxh3_64(b"v1"));

        let _ = sub2.initial();

        plane.apply("svc-a", b("v2"));

        let u1 = next(&mut sub1).await;
        assert_eq!(u1.resources.len(), 1);
        assert_eq!(u1.resources[0].0, "svc-a");

        let u2 = next(&mut sub2).await;
        // sub2 acked nothing: svc-a (changed) + svc-b (still unacked) both come through
        assert_eq!(u2.resources.len(), 2);
    }

    #[tokio::test]
    async fn unacked_resource_resent_on_next_change() {
        let (mut plane, rx, server) = setup();
        plane.apply("svc-a", b("v1"));
        plane.apply("svc-b", b("v1"));

        let mut sub = server.subscribe(rx);
        let _ = sub.initial();
        sub.acknowledge("svc-b", xxh3_64(b"v1"));

        plane.apply("svc-b", b("v2"));

        let update = next(&mut sub).await;
        let names: Vec<_> = update.resources.iter().map(|(n, _)| n.as_str()).collect();
        assert!(names.contains(&"svc-a"));
        assert!(names.contains(&"svc-b"));
    }
}