// rpcnet 0.1.0
//
// RPC library based on QUIC+TLS encryption.
// Documentation
use crate::cluster::gossip::{NodeId, NodeState};
use crate::cluster::incarnation::{resolve_conflict, NodeStatus};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// A collection of per-node status records keyed by `NodeId`.
///
/// All methods take `&self`, and the `Send + Sync` supertraits require that an
/// implementor can be shared between threads, so implementations need some
/// form of interior mutability.
pub trait NodeRegistry: Send + Sync {
    /// Offers `status` to the registry, keyed by `status.node_id`.
    ///
    /// NOTE(review): the trait does not specify what happens when a record for
    /// the same node already exists; that policy belongs to the implementor
    /// (`SharedNodeRegistry` arbitrates with `resolve_conflict`).
    fn insert(&self, status: NodeStatus);

    /// Returns an owned copy of the record stored for `node_id`, or `None`
    /// when the registry has no record for that node.
    fn get(&self, node_id: &NodeId) -> Option<NodeStatus>;

    /// Removes the record stored for `node_id` and returns it, or returns
    /// `None` when there was nothing to remove.
    fn remove(&self, node_id: &NodeId) -> Option<NodeStatus>;

    /// Returns owned copies of every record currently in the registry.
    fn all_nodes(&self) -> Vec<NodeStatus>;

    /// Returns owned copies of the records that are considered alive.
    fn alive_nodes(&self) -> Vec<NodeStatus>;

    /// Returns the number of records currently in the registry.
    fn len(&self) -> usize;

    /// Returns `true` when the registry holds no records.
    fn is_empty(&self) -> bool;
}

/// A `NodeRegistry` stored in a `HashMap` behind an `Arc<RwLock<..>>`.
///
/// The derived `Clone` copies only the `Arc`, so every clone is another handle
/// to the same underlying map: an insert through one handle is visible through
/// all of them.
#[derive(Clone)]
pub struct SharedNodeRegistry {
    /// The shared map from node id to that node's stored status. The `RwLock`
    /// admits either any number of readers or a single writer at a time.
    inner: Arc<RwLock<HashMap<NodeId, NodeStatus>>>,
}

impl SharedNodeRegistry {
    pub fn new() -> Self {
        Self {
            inner: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            inner: Arc::new(RwLock::new(HashMap::with_capacity(capacity))),
        }
    }
}

impl Default for SharedNodeRegistry {
    fn default() -> Self {
        Self::new()
    }
}

/// `NodeRegistry` implemented on top of the shared, lock-protected map.
///
/// # Panics
///
/// Every method calls `unwrap()` on the result of acquiring the lock, so each
/// of them panics if the `RwLock` has been poisoned by a thread that panicked
/// while holding it.
impl NodeRegistry for SharedNodeRegistry {
    /// Stores `status` unless an existing record for the same node takes
    /// precedence.
    ///
    /// A node that is not yet present is always stored. Otherwise the stored
    /// and incoming records are handed to `resolve_conflict`, and the incoming
    /// one replaces the stored one only if it is the record returned.
    fn insert(&self, status: NodeStatus) {
        let mut nodes = self.inner.write().unwrap();

        let accepted = match nodes.get(&status.node_id) {
            None => true,
            // `resolve_conflict` hands back a reference to one of its two
            // arguments; pointer identity tells us whether that was the
            // incoming record.
            Some(current) => std::ptr::eq(resolve_conflict(current, &status), &status),
        };

        if accepted {
            let key = status.node_id.clone();
            nodes.insert(key, status);
        }
    }

    /// Returns a clone of the record stored for `node_id`, if any.
    fn get(&self, node_id: &NodeId) -> Option<NodeStatus> {
        self.inner.read().unwrap().get(node_id).cloned()
    }

    /// Removes and returns the record stored for `node_id`, if any.
    fn remove(&self, node_id: &NodeId) -> Option<NodeStatus> {
        self.inner.write().unwrap().remove(node_id)
    }

    /// Returns clones of all stored records, in the map's iteration order.
    fn all_nodes(&self) -> Vec<NodeStatus> {
        let nodes = self.inner.read().unwrap();
        let mut snapshot = Vec::with_capacity(nodes.len());
        snapshot.extend(nodes.values().cloned());
        snapshot
    }

    /// Returns clones of the stored records whose state equals
    /// `NodeState::Alive`.
    fn alive_nodes(&self) -> Vec<NodeStatus> {
        let nodes = self.inner.read().unwrap();
        let mut alive = Vec::new();
        for status in nodes.values() {
            if status.state == NodeState::Alive {
                alive.push(status.clone());
            }
        }
        alive
    }

    /// Returns the number of stored records.
    fn len(&self) -> usize {
        self.inner.read().unwrap().len()
    }

    /// Returns `true` when no records are stored.
    fn is_empty(&self) -> bool {
        self.inner.read().unwrap().is_empty()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::incarnation::Incarnation;
    use std::collections::HashMap;

    use std::time::Instant;

    /// Builds a status for node `id` in `state` at `Incarnation::initial()`.
    ///
    /// The `_incarnation` argument is accepted but deliberately unused; use
    /// `create_node_status_with_incarnation` when the incarnation matters.
    fn create_node_status(id: &str, _incarnation: u64, state: NodeState) -> NodeStatus {
        NodeStatus {
            node_id: NodeId::new(id),
            addr: "127.0.0.1:8000".parse().unwrap(),
            incarnation: Incarnation::initial(),
            state,
            last_seen: Instant::now(),
            tags: HashMap::new(),
        }
    }

    /// Builds a status for node `id` in `state` whose incarnation is created
    /// from `incarnation_value`.
    fn create_node_status_with_incarnation(
        id: &str,
        incarnation_value: u64,
        state: NodeState,
    ) -> NodeStatus {
        NodeStatus {
            node_id: NodeId::new(id),
            addr: "127.0.0.1:8000".parse().unwrap(),
            incarnation: Incarnation::from_value(incarnation_value),
            state,
            last_seen: Instant::now(),
            tags: HashMap::new(),
        }
    }

    /// An inserted record can be read back under its node id.
    #[test]
    fn test_insert_and_get() {
        let registry = SharedNodeRegistry::new();
        registry.insert(create_node_status("node-1", 100, NodeState::Alive));

        let found = registry.get(&NodeId::new("node-1"));
        assert!(found.is_some());
        assert_eq!(found.unwrap().node_id.as_str(), "node-1");
    }

    /// Removing a record returns it and shrinks the registry.
    #[test]
    fn test_remove() {
        let registry = SharedNodeRegistry::new();
        registry.insert(create_node_status("node-1", 100, NodeState::Alive));
        assert_eq!(registry.len(), 1);

        let taken = registry.remove(&NodeId::new("node-1"));
        assert!(taken.is_some());
        assert_eq!(registry.len(), 0);
    }

    /// `all_nodes` reports every record regardless of state.
    #[test]
    fn test_all_nodes() {
        let registry = SharedNodeRegistry::new();

        registry.insert(create_node_status("node-1", 100, NodeState::Alive));
        registry.insert(create_node_status("node-2", 200, NodeState::Suspect));
        registry.insert(create_node_status("node-3", 300, NodeState::Failed));

        assert_eq!(registry.all_nodes().len(), 3);
    }

    /// `alive_nodes` reports only the records in the `Alive` state.
    #[test]
    fn test_alive_nodes() {
        let registry = SharedNodeRegistry::new();

        registry.insert(create_node_status("node-1", 100, NodeState::Alive));
        registry.insert(create_node_status("node-2", 200, NodeState::Suspect));
        registry.insert(create_node_status("node-3", 300, NodeState::Alive));
        registry.insert(create_node_status("node-4", 400, NodeState::Failed));

        let alive = registry.alive_nodes();
        assert_eq!(alive.len(), 2);

        let has_node = |wanted: &str| alive.iter().any(|s| s.node_id.as_str() == wanted);
        assert!(has_node("node-1"));
        assert!(has_node("node-3"));
    }

    /// A record with an incremented incarnation replaces the stored one.
    #[test]
    fn test_incarnation_conflict_resolution() {
        let registry = SharedNodeRegistry::new();

        let mut original = create_node_status("node-1", 100, NodeState::Alive);
        original.incarnation = Incarnation::initial();

        // Cloned because `original.incarnation` is still read below.
        registry.insert(original.clone());

        let mut bumped = create_node_status("node-1", 200, NodeState::Suspect);
        bumped.incarnation = original.incarnation;
        bumped.incarnation.increment();

        registry.insert(bumped);

        let stored = registry.get(&NodeId::new("node-1")).unwrap();
        assert_eq!(stored.state, NodeState::Suspect);
    }

    /// Ten threads inserting ten distinct nodes each leave 100 records.
    #[test]
    fn test_concurrent_access() {
        use std::sync::Arc;
        use std::thread;

        let registry = Arc::new(SharedNodeRegistry::with_capacity(100));

        let workers: Vec<_> = (0..10)
            .map(|i| {
                let reg = Arc::clone(&registry);
                thread::spawn(move || {
                    for j in 0..10 {
                        let id = format!("node-{}-{}", i, j);
                        let status =
                            create_node_status(&id, (i * 10 + j) as u64, NodeState::Alive);
                        reg.insert(status);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(registry.len(), 100);
    }

    /// A cloned handle observes inserts made through the original handle.
    #[test]
    fn test_clone_shares_data() {
        let registry = SharedNodeRegistry::new();
        let second_handle = registry.clone();

        registry.insert(create_node_status("node-1", 100, NodeState::Alive));

        assert_eq!(second_handle.len(), 1);
        assert!(second_handle.get(&NodeId::new("node-1")).is_some());
    }

    /// `is_empty` tracks inserts and removals.
    #[test]
    fn test_is_empty() {
        let registry = SharedNodeRegistry::new();
        assert!(registry.is_empty());

        registry.insert(create_node_status("node-1", 100, NodeState::Alive));
        assert!(!registry.is_empty());

        registry.remove(&NodeId::new("node-1"));
        assert!(registry.is_empty());
    }

    /// Each state change accompanied by an incarnation bump is accepted.
    #[test]
    fn test_state_transitions() {
        let registry = SharedNodeRegistry::new();
        let key = NodeId::new("node-1");

        let mut current = create_node_status("node-1", 100, NodeState::Alive);
        registry.insert(current.clone());

        current.state = NodeState::Suspect;
        current.incarnation.increment();
        registry.insert(current.clone());
        assert_eq!(registry.get(&key).unwrap().state, NodeState::Suspect);

        current.state = NodeState::Failed;
        current.incarnation.increment();
        registry.insert(current);
        assert_eq!(registry.get(&key).unwrap().state, NodeState::Failed);
    }

    /// An incoming record with a higher incarnation replaces the stored one.
    #[test]
    fn test_higher_incarnation_wins() {
        let registry = SharedNodeRegistry::new();
        let key = NodeId::new("node-1");

        registry.insert(create_node_status_with_incarnation("node-1", 0, NodeState::Alive));
        registry.insert(create_node_status_with_incarnation("node-1", 2, NodeState::Suspect));

        let stored = registry.get(&key).unwrap();
        assert_eq!(stored.incarnation.value(), 2);
        assert_eq!(stored.state, NodeState::Suspect);
    }

    /// An incoming record with a lower incarnation is discarded.
    #[test]
    fn test_lower_incarnation_rejected() {
        let registry = SharedNodeRegistry::new();
        let key = NodeId::new("node-1");

        registry.insert(create_node_status_with_incarnation("node-1", 2, NodeState::Alive));
        registry.insert(create_node_status_with_incarnation("node-1", 0, NodeState::Suspect));

        let stored = registry.get(&key).unwrap();
        assert_eq!(stored.incarnation.value(), 2);
        assert_eq!(stored.state, NodeState::Alive);
    }

    /// With equal incarnations, an `Alive` record inserted after a `Suspect`
    /// one ends up stored.
    #[test]
    fn test_same_incarnation_alive_wins() {
        let registry = SharedNodeRegistry::new();
        let key = NodeId::new("node-1");

        registry.insert(create_node_status_with_incarnation("node-1", 0, NodeState::Suspect));
        registry.insert(create_node_status_with_incarnation("node-1", 0, NodeState::Alive));

        let stored = registry.get(&key).unwrap();
        assert_eq!(stored.state, NodeState::Alive);
    }

    /// With equal incarnations, a `Failed` record inserted after a `Suspect`
    /// one ends up stored.
    #[test]
    fn test_conflict_resolution_suspect_vs_failed() {
        let registry = SharedNodeRegistry::new();
        let key = NodeId::new("node-1");

        registry.insert(create_node_status_with_incarnation("node-1", 0, NodeState::Suspect));
        registry.insert(create_node_status_with_incarnation("node-1", 0, NodeState::Failed));

        let stored = registry.get(&key).unwrap();
        assert_eq!(stored.state, NodeState::Failed);
    }
}