rime-xds 0.1.0

Lock-free delta-xDS control plane for Envoy. Incremental updates, zero-copy fan-out, and content-addressed versioning — the performant Rust alternative to go-control-plane.
Documentation
use crate::{ResourceMap, Snapshot};
use bytes::Bytes;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::watch;
use xxhash_rust::xxh3::xxh3_64;

pub struct Update {
    pub resources: Vec<(String, Bytes)>,
    pub removed: Vec<String>,
}

impl Update {
    pub fn is_empty(&self) -> bool {
        self.resources.is_empty() && self.removed.is_empty()
    }
}

pub struct Server {
    snapshot: Arc<Snapshot>,
}

impl Server {
    pub fn new(snapshot: Arc<Snapshot>) -> Self {
        Self { snapshot }
    }

    pub fn subscribe(&self, watch: watch::Receiver<()>) -> Subscriber {
        Subscriber {
            snapshot: Arc::clone(&self.snapshot),
            watch,
            acked: HashMap::new(),
        }
    }
}

pub struct Subscriber {
    snapshot: Arc<Snapshot>,
    watch: watch::Receiver<()>,
    acked: HashMap<String, u64>,
}

impl Subscriber {
    // Computes the full initial update and marks the current watch state as seen
    // so that subsequent calls to `changed` only fire on future snapshot mutations.
    pub fn initial(&mut self) -> Update {
        let _ = self.watch.borrow_and_update();
        let guard = self.snapshot.load();
        self.diff(&guard)
    }

    pub async fn changed(&mut self) -> Option<Update> {
        self.watch.changed().await.ok()?;
        let guard = self.snapshot.load();
        Some(self.diff(&guard))
    }

    pub fn acknowledge(&mut self, name: &str, version: u64) {
        self.acked.insert(name.to_owned(), version);
    }

    pub fn acknowledge_removed(&mut self, name: &str) {
        self.acked.remove(name);
    }

    fn diff(&self, map: &ResourceMap) -> Update {
        let mut resources = Vec::new();
        for (name, bytes) in map {
            let hash = xxh3_64(bytes);
            if self.acked.get(name.as_str()) != Some(&hash) {
                resources.push((name.clone(), bytes.clone()));
            }
        }
        let removed = self
            .acked
            .keys()
            .filter(|k| !map.contains_key(*k))
            .cloned()
            .collect();
        Update { resources, removed }
    }
}

#[mutants::skip]
fn _assert_send() {
    fn check<T: Send>() {}
    check::<Server>();
    check::<Subscriber>();
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::Plane;
    use std::time::Duration;
    use xxhash_rust::xxh3::xxh3_64;

    fn setup() -> (Plane, watch::Receiver<()>, Server) {
        let (plane, rx) = Plane::new();
        let server = Server::new(plane.snapshot());
        (plane, rx, server)
    }

    fn b(s: &'static str) -> Bytes {
        Bytes::from_static(s.as_bytes())
    }

    // Wraps changed() with a hard deadline so mutations that prevent the watch
    // from firing produce an immediate panic rather than a 20-second timeout.
    async fn next(sub: &mut Subscriber) -> Update {
        tokio::time::timeout(Duration::from_millis(200), sub.changed())
            .await
            .expect("subscriber timed out — watch was never fired")
            .expect("watch channel closed")
    }

    #[test]
    fn is_empty_true_when_both_vecs_empty() {
        let u = Update { resources: vec![], removed: vec![] };
        assert!(u.is_empty());
    }

    #[test]
    fn is_empty_false_when_resources_nonempty() {
        let u = Update { resources: vec![("svc".into(), Bytes::new())], removed: vec![] };
        assert!(!u.is_empty());
    }

    #[test]
    fn is_empty_false_when_removed_nonempty() {
        let u = Update { resources: vec![], removed: vec!["svc".into()] };
        assert!(!u.is_empty());
    }

    #[test]
    fn initial_sends_all_resources() {
        let (mut plane, rx, server) = setup();
        plane.apply("svc-a", b("v1"));
        plane.apply("svc-b", b("v2"));

        let mut sub = server.subscribe(rx);
        let update = sub.initial();

        let mut names: Vec<_> = update.resources.iter().map(|(n, _)| n.as_str()).collect();
        names.sort();
        assert_eq!(names, ["svc-a", "svc-b"]);
        assert!(update.removed.is_empty());
    }

    #[test]
    fn initial_empty_snapshot_is_empty_update() {
        let (_, rx, server) = setup();
        let mut sub = server.subscribe(rx);
        let update = sub.initial();
        assert!(update.is_empty());
    }

    #[tokio::test]
    async fn initial_marks_watch_as_seen() {
        let (_plane, rx, server) = setup();
        let mut sub = server.subscribe(rx);
        let _ = sub.initial();

        let result = tokio::time::timeout(Duration::from_millis(10), sub.changed()).await;
        assert!(result.is_err(), "changed() should not resolve without a new write");
    }

    #[tokio::test]
    async fn changed_sends_only_diff() {
        let (mut plane, rx, server) = setup();
        plane.apply("svc-a", b("v1"));
        plane.apply("svc-b", b("v1"));

        let mut sub = server.subscribe(rx);
        let _ = sub.initial();
        sub.acknowledge("svc-a", xxh3_64(b"v1"));
        sub.acknowledge("svc-b", xxh3_64(b"v1"));

        plane.apply("svc-a", b("v2"));

        let update = next(&mut sub).await;
        assert_eq!(update.resources.len(), 1);
        assert_eq!(update.resources[0].0, "svc-a");
        assert_eq!(update.resources[0].1, b("v2"));
        assert!(update.removed.is_empty());
    }

    #[tokio::test]
    async fn acknowledge_prevents_resend() {
        let (mut plane, rx, server) = setup();
        plane.apply("svc-a", b("v1"));

        let mut sub = server.subscribe(rx);
        let _ = sub.initial();
        sub.acknowledge("svc-a", xxh3_64(b"v1"));

        plane.apply("svc-b", b("v1"));

        let update = next(&mut sub).await;
        assert_eq!(update.resources.len(), 1);
        assert_eq!(update.resources[0].0, "svc-b");
    }

    #[tokio::test]
    async fn removed_resource_appears_in_removed_list() {
        let (mut plane, rx, server) = setup();
        plane.apply("svc-a", b("v1"));

        let mut sub = server.subscribe(rx);
        let _ = sub.initial();
        sub.acknowledge("svc-a", xxh3_64(b"v1"));

        plane.remove("svc-a");

        let update = next(&mut sub).await;
        assert!(update.resources.is_empty());
        assert_eq!(update.removed, ["svc-a"]);
    }

    #[tokio::test]
    async fn acknowledge_removed_clears_acked_entry() {
        let (mut plane, rx, server) = setup();
        plane.apply("svc-a", b("v1"));

        let mut sub = server.subscribe(rx);
        let _ = sub.initial();
        sub.acknowledge("svc-a", xxh3_64(b"v1"));

        plane.remove("svc-a");
        let update = next(&mut sub).await;
        assert_eq!(update.removed, ["svc-a"]);
        sub.acknowledge_removed("svc-a");

        plane.apply("svc-a", b("v1"));
        let update = next(&mut sub).await;
        assert_eq!(update.resources.len(), 1);
        assert_eq!(update.resources[0].0, "svc-a");
    }

    #[test]
    fn new_subscriber_sees_full_snapshot() {
        let (mut plane, rx, server) = setup();
        plane.apply("svc-a", b("v1"));
        plane.apply("svc-b", b("v2"));

        let mut sub = server.subscribe(rx);
        let update = sub.initial();

        assert_eq!(update.resources.len(), 2);
    }

    #[tokio::test]
    async fn two_subscribers_track_independently() {
        let (mut plane, rx, server) = setup();
        plane.apply("svc-a", b("v1"));
        plane.apply("svc-b", b("v1"));

        let mut sub1 = server.subscribe(rx.clone());
        let mut sub2 = server.subscribe(rx);

        let _ = sub1.initial();
        sub1.acknowledge("svc-a", xxh3_64(b"v1"));
        sub1.acknowledge("svc-b", xxh3_64(b"v1"));

        let _ = sub2.initial();

        plane.apply("svc-a", b("v2"));

        let u1 = next(&mut sub1).await;
        assert_eq!(u1.resources.len(), 1);
        assert_eq!(u1.resources[0].0, "svc-a");

        let u2 = next(&mut sub2).await;
        // sub2 acked nothing: svc-a (changed) + svc-b (still unacked) both come through
        assert_eq!(u2.resources.len(), 2);
    }

    #[tokio::test]
    async fn unacked_resource_resent_on_next_change() {
        let (mut plane, rx, server) = setup();
        plane.apply("svc-a", b("v1"));
        plane.apply("svc-b", b("v1"));

        let mut sub = server.subscribe(rx);
        let _ = sub.initial();
        sub.acknowledge("svc-b", xxh3_64(b"v1"));

        plane.apply("svc-b", b("v2"));

        let update = next(&mut sub).await;
        let names: Vec<_> = update.resources.iter().map(|(n, _)| n.as_str()).collect();
        assert!(names.contains(&"svc-a"));
        assert!(names.contains(&"svc-b"));
    }
}