use crate::{ResourceMap, Snapshot};
use bytes::Bytes;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::watch;
use xxhash_rust::xxh3::xxh3_64;
pub struct Update {
pub resources: Vec<(String, Bytes)>,
pub removed: Vec<String>,
}
impl Update {
pub fn is_empty(&self) -> bool {
self.resources.is_empty() && self.removed.is_empty()
}
}
pub struct Server {
snapshot: Arc<Snapshot>,
}
impl Server {
pub fn new(snapshot: Arc<Snapshot>) -> Self {
Self { snapshot }
}
pub fn subscribe(&self, watch: watch::Receiver<()>) -> Subscriber {
Subscriber {
snapshot: Arc::clone(&self.snapshot),
watch,
acked: HashMap::new(),
}
}
}
pub struct Subscriber {
snapshot: Arc<Snapshot>,
watch: watch::Receiver<()>,
acked: HashMap<String, u64>,
}
impl Subscriber {
pub fn initial(&mut self) -> Update {
let _ = self.watch.borrow_and_update();
let guard = self.snapshot.load();
self.diff(&guard)
}
pub async fn changed(&mut self) -> Option<Update> {
self.watch.changed().await.ok()?;
let guard = self.snapshot.load();
Some(self.diff(&guard))
}
pub fn acknowledge(&mut self, name: &str, version: u64) {
self.acked.insert(name.to_owned(), version);
}
pub fn acknowledge_removed(&mut self, name: &str) {
self.acked.remove(name);
}
fn diff(&self, map: &ResourceMap) -> Update {
let mut resources = Vec::new();
for (name, bytes) in map {
let hash = xxh3_64(bytes);
if self.acked.get(name.as_str()) != Some(&hash) {
resources.push((name.clone(), bytes.clone()));
}
}
let removed = self
.acked
.keys()
.filter(|k| !map.contains_key(*k))
.cloned()
.collect();
Update { resources, removed }
}
}
#[mutants::skip]
fn _assert_send() {
fn check<T: Send>() {}
check::<Server>();
check::<Subscriber>();
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Plane;
use std::time::Duration;
use xxhash_rust::xxh3::xxh3_64;
fn setup() -> (Plane, watch::Receiver<()>, Server) {
let (plane, rx) = Plane::new();
let server = Server::new(plane.snapshot());
(plane, rx, server)
}
fn b(s: &'static str) -> Bytes {
Bytes::from_static(s.as_bytes())
}
async fn next(sub: &mut Subscriber) -> Update {
tokio::time::timeout(Duration::from_millis(200), sub.changed())
.await
.expect("subscriber timed out — watch was never fired")
.expect("watch channel closed")
}
#[test]
fn is_empty_true_when_both_vecs_empty() {
let u = Update { resources: vec![], removed: vec![] };
assert!(u.is_empty());
}
#[test]
fn is_empty_false_when_resources_nonempty() {
let u = Update { resources: vec![("svc".into(), Bytes::new())], removed: vec![] };
assert!(!u.is_empty());
}
#[test]
fn is_empty_false_when_removed_nonempty() {
let u = Update { resources: vec![], removed: vec!["svc".into()] };
assert!(!u.is_empty());
}
#[test]
fn initial_sends_all_resources() {
let (mut plane, rx, server) = setup();
plane.apply("svc-a", b("v1"));
plane.apply("svc-b", b("v2"));
let mut sub = server.subscribe(rx);
let update = sub.initial();
let mut names: Vec<_> = update.resources.iter().map(|(n, _)| n.as_str()).collect();
names.sort();
assert_eq!(names, ["svc-a", "svc-b"]);
assert!(update.removed.is_empty());
}
#[test]
fn initial_empty_snapshot_is_empty_update() {
let (_, rx, server) = setup();
let mut sub = server.subscribe(rx);
let update = sub.initial();
assert!(update.is_empty());
}
#[tokio::test]
async fn initial_marks_watch_as_seen() {
let (_plane, rx, server) = setup();
let mut sub = server.subscribe(rx);
let _ = sub.initial();
let result = tokio::time::timeout(Duration::from_millis(10), sub.changed()).await;
assert!(result.is_err(), "changed() should not resolve without a new write");
}
#[tokio::test]
async fn changed_sends_only_diff() {
let (mut plane, rx, server) = setup();
plane.apply("svc-a", b("v1"));
plane.apply("svc-b", b("v1"));
let mut sub = server.subscribe(rx);
let _ = sub.initial();
sub.acknowledge("svc-a", xxh3_64(b"v1"));
sub.acknowledge("svc-b", xxh3_64(b"v1"));
plane.apply("svc-a", b("v2"));
let update = next(&mut sub).await;
assert_eq!(update.resources.len(), 1);
assert_eq!(update.resources[0].0, "svc-a");
assert_eq!(update.resources[0].1, b("v2"));
assert!(update.removed.is_empty());
}
#[tokio::test]
async fn acknowledge_prevents_resend() {
let (mut plane, rx, server) = setup();
plane.apply("svc-a", b("v1"));
let mut sub = server.subscribe(rx);
let _ = sub.initial();
sub.acknowledge("svc-a", xxh3_64(b"v1"));
plane.apply("svc-b", b("v1"));
let update = next(&mut sub).await;
assert_eq!(update.resources.len(), 1);
assert_eq!(update.resources[0].0, "svc-b");
}
#[tokio::test]
async fn removed_resource_appears_in_removed_list() {
let (mut plane, rx, server) = setup();
plane.apply("svc-a", b("v1"));
let mut sub = server.subscribe(rx);
let _ = sub.initial();
sub.acknowledge("svc-a", xxh3_64(b"v1"));
plane.remove("svc-a");
let update = next(&mut sub).await;
assert!(update.resources.is_empty());
assert_eq!(update.removed, ["svc-a"]);
}
#[tokio::test]
async fn acknowledge_removed_clears_acked_entry() {
let (mut plane, rx, server) = setup();
plane.apply("svc-a", b("v1"));
let mut sub = server.subscribe(rx);
let _ = sub.initial();
sub.acknowledge("svc-a", xxh3_64(b"v1"));
plane.remove("svc-a");
let update = next(&mut sub).await;
assert_eq!(update.removed, ["svc-a"]);
sub.acknowledge_removed("svc-a");
plane.apply("svc-a", b("v1"));
let update = next(&mut sub).await;
assert_eq!(update.resources.len(), 1);
assert_eq!(update.resources[0].0, "svc-a");
}
#[test]
fn new_subscriber_sees_full_snapshot() {
let (mut plane, rx, server) = setup();
plane.apply("svc-a", b("v1"));
plane.apply("svc-b", b("v2"));
let mut sub = server.subscribe(rx);
let update = sub.initial();
assert_eq!(update.resources.len(), 2);
}
#[tokio::test]
async fn two_subscribers_track_independently() {
let (mut plane, rx, server) = setup();
plane.apply("svc-a", b("v1"));
plane.apply("svc-b", b("v1"));
let mut sub1 = server.subscribe(rx.clone());
let mut sub2 = server.subscribe(rx);
let _ = sub1.initial();
sub1.acknowledge("svc-a", xxh3_64(b"v1"));
sub1.acknowledge("svc-b", xxh3_64(b"v1"));
let _ = sub2.initial();
plane.apply("svc-a", b("v2"));
let u1 = next(&mut sub1).await;
assert_eq!(u1.resources.len(), 1);
assert_eq!(u1.resources[0].0, "svc-a");
let u2 = next(&mut sub2).await;
assert_eq!(u2.resources.len(), 2);
}
#[tokio::test]
async fn unacked_resource_resent_on_next_change() {
let (mut plane, rx, server) = setup();
plane.apply("svc-a", b("v1"));
plane.apply("svc-b", b("v1"));
let mut sub = server.subscribe(rx);
let _ = sub.initial();
sub.acknowledge("svc-b", xxh3_64(b"v1"));
plane.apply("svc-b", b("v2"));
let update = next(&mut sub).await;
let names: Vec<_> = update.resources.iter().map(|(n, _)| n.as_str()).collect();
assert!(names.contains(&"svc-a"));
assert!(names.contains(&"svc-b"));
}
}