rime-xds 0.1.0

Lock-free delta-xDS control plane for Envoy. Incremental updates, zero-copy fan-out, and content-addressed versioning — the performant Rust alternative to go-control-plane.
Documentation
use crate::{Delta, Index, ResourceMap, Snapshot};
use bytes::Bytes;
use std::sync::Arc;
use tokio::sync::watch;

pub struct Plane {
    index: Index,
    snapshot: Arc<Snapshot>,
    notify: watch::Sender<()>,
}

impl Plane {
    pub fn new() -> (Self, watch::Receiver<()>) {
        let (notify, rx) = watch::channel(());
        let plane = Self {
            index: Index::new(),
            snapshot: Arc::new(Snapshot::new()),
            notify,
        };
        (plane, rx)
    }

    pub fn snapshot(&self) -> Arc<Snapshot> {
        Arc::clone(&self.snapshot)
    }

    pub fn apply(&mut self, name: &str, bytes: Bytes) -> bool {
        if !self.index.update(name, &bytes) {
            return false;
        }
        let mut next = (**self.snapshot.load()).clone();
        next.insert(name.to_owned(), bytes);
        self.snapshot.store(Arc::new(next));
        self.notify.send(()).ok();
        true
    }

    pub fn remove(&mut self, name: &str) -> bool {
        if !self.index.remove(name) {
            return false;
        }
        let mut next = (**self.snapshot.load()).clone();
        next.remove(name);
        self.snapshot.store(Arc::new(next));
        self.notify.send(()).ok();
        true
    }

    pub fn reconcile(&mut self, map: ResourceMap) -> Delta {
        let delta = self.index.reconcile(&map);
        if !delta.is_empty() {
            self.snapshot.store(Arc::new(map));
            self.notify.send(()).ok();
        }
        delta
    }
}

#[mutants::skip]
fn _assert_plane_is_send() {
    fn check<T: Send>() {}
    check::<Plane>();
}

#[cfg(test)]
mod tests {
    use super::*;

    fn bytes(s: &'static str) -> Bytes {
        Bytes::from_static(s.as_bytes())
    }

    #[test]
    fn apply_new_resource_visible_in_snapshot() {
        let (mut plane, _rx) = Plane::new();
        plane.apply("svc-a", bytes("v1"));
        let guard = plane.snapshot().load();
        assert_eq!(guard.get("svc-a").unwrap(), &bytes("v1"));
    }

    #[test]
    fn apply_same_content_is_noop() {
        let (mut plane, _rx) = Plane::new();
        plane.apply("svc-a", bytes("v1"));
        assert!(!plane.apply("svc-a", bytes("v1")));
    }

    #[test]
    fn apply_mutated_content_updates_snapshot() {
        let (mut plane, _rx) = Plane::new();
        plane.apply("svc-a", bytes("v1"));
        plane.apply("svc-a", bytes("v2"));
        assert_eq!(
            plane.snapshot().load().get("svc-a").unwrap(),
            &bytes("v2"),
        );
    }

    #[test]
    fn remove_existing_drops_from_snapshot() {
        let (mut plane, _rx) = Plane::new();
        plane.apply("svc-a", bytes("v1"));
        assert!(plane.remove("svc-a"));
        assert!(plane.snapshot().load().get("svc-a").is_none());
    }

    #[test]
    fn remove_nonexistent_is_noop() {
        let (mut plane, _rx) = Plane::new();
        assert!(!plane.remove("svc-a"));
    }

    #[test]
    fn remove_then_apply_same_bytes_is_changed() {
        let (mut plane, _rx) = Plane::new();
        plane.apply("svc-a", bytes("v1"));
        plane.remove("svc-a");
        assert!(plane.apply("svc-a", bytes("v1")));
    }

    #[test]
    fn reconcile_replaces_snapshot() {
        let (mut plane, _rx) = Plane::new();
        let mut map = ResourceMap::new();
        map.insert("svc-a".to_owned(), bytes("v1"));
        map.insert("svc-b".to_owned(), bytes("v2"));
        plane.reconcile(map);

        let guard = plane.snapshot().load();
        assert_eq!(guard.get("svc-a").unwrap(), &bytes("v1"));
        assert_eq!(guard.get("svc-b").unwrap(), &bytes("v2"));
    }

    #[test]
    fn reconcile_unchanged_returns_empty_delta() {
        let (mut plane, _rx) = Plane::new();
        let mut map = ResourceMap::new();
        map.insert("svc-a".to_owned(), bytes("v1"));
        plane.reconcile(map.clone());
        let delta = plane.reconcile(map);
        assert!(delta.is_empty());
    }

    #[test]
    fn apply_fires_watch() {
        let (mut plane, rx) = Plane::new();
        plane.apply("svc-a", bytes("v1"));
        assert!(rx.has_changed().unwrap());
    }

    #[test]
    fn unchanged_apply_does_not_fire_watch() {
        let (mut plane, mut rx) = Plane::new();
        plane.apply("svc-a", bytes("v1"));
        let _ = rx.borrow_and_update();
        plane.apply("svc-a", bytes("v1"));
        assert!(!rx.has_changed().unwrap());
    }

    #[test]
    fn remove_fires_watch() {
        let (mut plane, mut rx) = Plane::new();
        plane.apply("svc-a", bytes("v1"));
        let _ = rx.borrow_and_update();
        plane.remove("svc-a");
        assert!(rx.has_changed().unwrap());
    }

    #[test]
    fn reconcile_with_changes_fires_watch() {
        let (mut plane, mut rx) = Plane::new();
        let mut map = ResourceMap::new();
        map.insert("svc-a".to_owned(), bytes("v1"));
        plane.reconcile(map.clone());
        let _ = rx.borrow_and_update();

        map.insert("svc-a".to_owned(), bytes("v2"));
        plane.reconcile(map);
        assert!(rx.has_changed().unwrap());
    }

    #[test]
    fn reconcile_without_changes_is_silent() {
        let (mut plane, mut rx) = Plane::new();
        let mut map = ResourceMap::new();
        map.insert("svc-a".to_owned(), bytes("v1"));
        plane.reconcile(map.clone());
        let _ = rx.borrow_and_update();
        plane.reconcile(map);
        assert!(!rx.has_changed().unwrap());
    }

    #[test]
    fn snapshot_handle_sees_updates_after_issue() {
        let (mut plane, _rx) = Plane::new();
        let snap = plane.snapshot();

        plane.apply("svc-a", bytes("v1"));
        assert_eq!(snap.load().get("svc-a").unwrap(), &bytes("v1"));

        plane.apply("svc-a", bytes("v2"));
        assert_eq!(snap.load().get("svc-a").unwrap(), &bytes("v2"));
    }
}