rime-xds 0.1.0

Lock-free delta-xDS control plane for Envoy. Incremental updates, zero-copy fan-out, and content-addressed versioning — the performant Rust alternative to go-control-plane.
Documentation
use crate::ResourceMap;
use std::collections::HashMap;
use xxhash_rust::xxh3::xxh3_64;

/// Set of resource names that differ between two states of an index.
///
/// Both lists hold plain resource names. No ordering is promised for either
/// list, so callers that need a stable order should sort them.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Delta {
    /// Names whose content is new or differs from what was indexed before.
    pub changed: Vec<String>,
    /// Names that were indexed before but are no longer present.
    pub removed: Vec<String>,
}

impl Delta {
    /// Returns `true` when there is nothing to report, i.e. both `changed`
    /// and `removed` are empty.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.changed.is_empty() && self.removed.is_empty()
    }
}

/// Per-resource content fingerprints, keyed by resource name.
///
/// Only a 64-bit hash of each payload is kept, never the payload itself.
#[derive(Debug, Clone)]
pub struct Index {
    /// Resource name -> hash of the content most recently recorded for it.
    hashes: HashMap<String, u64>,
}

impl Index {
    /// Creates an empty index.
    pub fn new() -> Self {
        Self {
            hashes: HashMap::new(),
        }
    }

    /// Creates an empty index whose backing map is pre-sized for at least
    /// `n` resource names.
    #[mutants::skip]
    pub fn with_capacity(n: usize) -> Self {
        Self {
            hashes: HashMap::with_capacity(n),
        }
    }

    /// Records `bytes` as the current content of `name`.
    ///
    /// Returns `true` when `name` was not indexed yet or its stored hash
    /// differs from the hash of `bytes`; returns `false` when the hash is
    /// identical to the stored one.
    ///
    /// The name is copied into the index only the first time it is seen.
    /// A known name is looked up by `&str` and its hash is overwritten in
    /// place, so neither the unchanged nor the changed path allocates.
    pub fn update(&mut self, name: &str, bytes: &[u8]) -> bool {
        let hash = xxh3_64(bytes);
        match self.hashes.get_mut(name) {
            Some(stored) if *stored == hash => false,
            Some(stored) => {
                *stored = hash;
                true
            }
            None => {
                self.hashes.insert(name.to_owned(), hash);
                true
            }
        }
    }

    /// Forgets `name`. Returns `true` if it was indexed, `false` otherwise.
    pub fn remove(&mut self, name: &str) -> bool {
        self.hashes.remove(name).is_some()
    }

    /// Replaces the whole index with the contents of `map` and reports what
    /// differs from the previous state.
    ///
    /// In the returned [`Delta`]:
    /// * `changed` holds every name in `map` whose hash was absent from, or
    ///   different in, the previous index;
    /// * `removed` holds every previously indexed name that `map` no longer
    ///   contains.
    ///
    /// Neither list is sorted; their order follows map iteration order.
    pub fn reconcile(&mut self, map: &ResourceMap) -> Delta {
        let mut changed = Vec::new();
        let mut next = HashMap::with_capacity(map.len());

        for (name, bytes) in map {
            let hash = xxh3_64(bytes);
            if self.hashes.get(name.as_str()) != Some(&hash) {
                changed.push(name.clone());
            }
            next.insert(name.clone(), hash);
        }

        // Swap the new state in first, then consume the old map so the names
        // that disappeared are moved into `removed` rather than cloned.
        let previous = std::mem::replace(&mut self.hashes, next);
        let removed = previous
            .into_keys()
            .filter(|name| !self.hashes.contains_key(name))
            .collect();

        Delta { changed, removed }
    }
}

impl Default for Index {
    fn default() -> Self {
        Self::new()
    }
}

// Compile-time guard: never called, it only has to type-check. The build
// breaks here if either public type stops being `Send + Sync`.
#[mutants::skip]
fn _assert_send_sync() {
    fn require_thread_safe<T>()
    where
        T: Send + Sync,
    {
    }

    require_thread_safe::<Index>();
    require_thread_safe::<Delta>();
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Builds a `ResourceMap` from `(name, payload)` pairs.
    fn map(pairs: &[(&str, &[u8])]) -> ResourceMap {
        pairs
            .iter()
            .map(|(k, v)| ((*k).to_owned(), Bytes::copy_from_slice(v)))
            .collect()
    }

    // ---- Delta ----------------------------------------------------------

    #[test]
    fn delta_with_nothing_is_empty() {
        let delta = Delta {
            changed: Vec::new(),
            removed: Vec::new(),
        };
        assert!(delta.is_empty());
    }

    #[test]
    fn delta_with_only_changes_is_not_empty() {
        let delta = Delta {
            changed: vec!["a".to_owned()],
            removed: Vec::new(),
        };
        assert!(!delta.is_empty());
    }

    #[test]
    fn delta_with_only_removals_is_not_empty() {
        let delta = Delta {
            changed: Vec::new(),
            removed: vec!["a".to_owned()],
        };
        assert!(!delta.is_empty());
    }

    // ---- Index::update / Index::remove ----------------------------------

    #[test]
    fn new_resource_is_changed() {
        let mut idx = Index::new();
        assert!(idx.update("svc-a", b"v1"));
    }

    #[test]
    fn same_content_is_unchanged() {
        let mut idx = Index::new();
        idx.update("svc-a", b"v1");
        assert!(!idx.update("svc-a", b"v1"));
    }

    #[test]
    fn mutated_content_is_changed() {
        let mut idx = Index::new();
        idx.update("svc-a", b"v1");
        assert!(idx.update("svc-a", b"v2"));
    }

    #[test]
    fn mutated_content_is_remembered() {
        let mut idx = Index::new();
        idx.update("svc-a", b"v1");
        idx.update("svc-a", b"v2");
        // The new hash must have replaced the old one.
        assert!(!idx.update("svc-a", b"v2"));
        assert!(idx.update("svc-a", b"v1"));
    }

    #[test]
    fn remove_existing_returns_true() {
        let mut idx = Index::new();
        idx.update("svc-a", b"v1");
        assert!(idx.remove("svc-a"));
    }

    #[test]
    fn remove_nonexistent_returns_false() {
        let mut idx = Index::new();
        assert!(!idx.remove("svc-a"));
    }

    #[test]
    fn remove_then_re_add_is_new_change() {
        let mut idx = Index::new();
        idx.update("svc-a", b"v1");
        idx.remove("svc-a");
        assert!(idx.update("svc-a", b"v1"));
    }

    // ---- Index::reconcile -----------------------------------------------

    #[test]
    fn reconcile_empty_index_all_changed() {
        let mut idx = Index::new();
        let m = map(&[("a", b"1"), ("b", b"2")]);
        let delta = idx.reconcile(&m);
        // `changed` has no guaranteed order, so compare sorted.
        let mut got = delta.changed.clone();
        got.sort();
        assert_eq!(got, ["a", "b"]);
        assert!(delta.removed.is_empty());
    }

    #[test]
    fn reconcile_no_changes_is_empty() {
        let mut idx = Index::new();
        let m = map(&[("a", b"1"), ("b", b"2")]);
        idx.reconcile(&m);
        let delta = idx.reconcile(&m);
        assert!(delta.is_empty());
    }

    #[test]
    fn reconcile_detects_single_mutation() {
        let mut idx = Index::new();
        idx.reconcile(&map(&[("a", b"1"), ("b", b"2")]));
        let delta = idx.reconcile(&map(&[("a", b"1"), ("b", b"changed")]));
        assert_eq!(delta.changed, ["b"]);
        assert!(delta.removed.is_empty());
    }

    #[test]
    fn reconcile_detects_removal() {
        let mut idx = Index::new();
        idx.reconcile(&map(&[("a", b"1"), ("b", b"2")]));
        let delta = idx.reconcile(&map(&[("a", b"1")]));
        assert!(delta.changed.is_empty());
        assert_eq!(delta.removed, ["b"]);
    }

    #[test]
    fn reconcile_forgets_removed_names() {
        let mut idx = Index::new();
        idx.reconcile(&map(&[("a", b"1"), ("b", b"2")]));
        idx.reconcile(&map(&[("a", b"1")]));
        // "b" is gone from the index, "a" is still there.
        assert!(!idx.remove("b"));
        assert!(idx.remove("a"));
    }

    #[test]
    fn reconcile_updates_index_state() {
        let mut idx = Index::new();
        idx.reconcile(&map(&[("a", b"v1")]));
        idx.reconcile(&map(&[("a", b"v2")]));
        // third reconcile with same v2 should be silent
        let delta = idx.reconcile(&map(&[("a", b"v2")]));
        assert!(delta.is_empty());
    }

    #[test]
    fn update_after_reconcile_respects_new_hash() {
        let mut idx = Index::new();
        idx.reconcile(&map(&[("a", b"v1")]));
        // v1 already indexed — update with same bytes
        assert!(!idx.update("a", b"v1"));
        // update with new bytes
        assert!(idx.update("a", b"v2"));
    }
}