rime-xds 0.1.0

Lock-free delta-xDS control plane for Envoy. Incremental updates, zero-copy fan-out, and content-addressed versioning — the performant Rust alternative to go-control-plane.
Documentation
use arc_swap::ArcSwap;
use bytes::Bytes;
use std::collections::HashMap;
use std::sync::Arc;

pub type ResourceMap = HashMap<String, Bytes>;

pub struct Snapshot {
    inner: ArcSwap<ResourceMap>,
}

impl Snapshot {
    pub fn new() -> Self {
        Self {
            inner: ArcSwap::from_pointee(ResourceMap::new()),
        }
    }

    #[must_use]
    pub fn load(&self) -> arc_swap::Guard<Arc<ResourceMap>> {
        self.inner.load()
    }

    pub(crate) fn store(&self, map: Arc<ResourceMap>) {
        self.inner.store(map);
    }
}

impl Default for Snapshot {
    fn default() -> Self {
        Self::new()
    }
}

#[mutants::skip]
fn _assert_send_sync() {
    fn check<T: Send + Sync>() {}
    check::<Snapshot>();
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn empty_on_construction() {
        let snap = Snapshot::new();
        assert!(snap.load().is_empty());
    }

    #[test]
    fn store_and_load_roundtrip() {
        let snap = Snapshot::new();
        let mut map = ResourceMap::new();
        map.insert("svc-a".to_owned(), Bytes::from_static(b"v1"));
        snap.store(Arc::new(map));

        let guard = snap.load();
        assert_eq!(guard.get("svc-a").unwrap(), &Bytes::from_static(b"v1"));
    }

    #[test]
    fn store_replaces_atomically() {
        let snap = Snapshot::new();

        let mut m1 = ResourceMap::new();
        m1.insert("svc-a".to_owned(), Bytes::from_static(b"v1"));
        snap.store(Arc::new(m1));

        let mut m2 = ResourceMap::new();
        m2.insert("svc-a".to_owned(), Bytes::from_static(b"v2"));
        snap.store(Arc::new(m2));

        assert_eq!(
            snap.load().get("svc-a").unwrap(),
            &Bytes::from_static(b"v2"),
        );
    }

    #[test]
    fn bytes_clone_is_zero_copy() {
        let snap = Snapshot::new();
        let payload = Bytes::from(vec![0u8; 102_400]);
        let mut map = ResourceMap::new();
        map.insert("large".to_owned(), payload.clone());
        snap.store(Arc::new(map));

        let g1 = snap.load();
        let g2 = snap.load();

        // Both guards point to the same allocation — pointer equality confirms
        // no memcpy occurred.
        assert!(std::ptr::eq(
            g1.get("large").unwrap().as_ptr(),
            g2.get("large").unwrap().as_ptr(),
        ));
    }

    #[test]
    fn concurrent_readers_see_latest_write() {
        use std::sync::Arc as StdArc;
        use std::thread;

        let snap = StdArc::new(Snapshot::new());
        let readers: Vec<_> = (0..8)
            .map(|_| {
                let s = StdArc::clone(&snap);
                thread::spawn(move || {
                    for _ in 0..10_000 {
                        let _ = s.load().len();
                    }
                })
            })
            .collect();

        for i in 0..100u8 {
            let mut map = ResourceMap::new();
            map.insert("k".to_owned(), Bytes::from(vec![i]));
            snap.store(Arc::new(map));
        }

        for h in readers {
            h.join().unwrap();
        }
    }
}