rpcnet 0.1.0

RPC library based on QUIC+TLS encryption
Documentation
use crate::cluster::gossip::{NodeId, NodeState};
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::time::{Instant, SystemTime, UNIX_EPOCH};

/// Version counter wrapping a raw `u64`.
///
/// The derived `PartialEq`/`Eq` compare the raw values directly. Ordering is
/// provided separately through [`Incarnation::compare`], which treats the value
/// as a wrapping sequence number rather than a plain integer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct Incarnation(u64);

/// Record describing one node, as consumed by [`resolve_conflict`].
///
/// Only `incarnation` and `node_id` take part in conflict resolution in this
/// file; the remaining fields are carried along unchanged.
#[derive(Debug, Clone)]
pub struct NodeStatus {
    /// Identifier of the node; used as the tie-breaker in [`resolve_conflict`].
    pub node_id: NodeId,
    /// Socket address associated with the node (presumably where it can be
    /// reached; confirm against callers).
    pub addr: SocketAddr,
    /// Version of this record; compared first in [`resolve_conflict`].
    pub incarnation: Incarnation,
    /// Gossip state of the node, as defined by `NodeState`.
    pub state: NodeState,
    /// Monotonic timestamp; presumably when the node was last heard from
    /// (nothing in this file updates it; verify against callers).
    pub last_seen: Instant,
    /// String key/value metadata; its meaning is defined by callers.
    pub tags: HashMap<String, String>,
}

impl Incarnation {
    /// Creates an incarnation seeded with the current wall-clock time, expressed
    /// as milliseconds since the Unix epoch.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time earlier than the Unix epoch.
    pub fn initial() -> Self {
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards")
            // `as_millis` yields a `u128`; the cast keeps the low 64 bits.
            .as_millis() as u64;
        Self(timestamp)
    }

    /// Wraps an existing raw counter value.
    pub fn from_value(value: u64) -> Self {
        Self(value)
    }

    /// Advances the counter by one, wrapping from `u64::MAX` back to `0`.
    pub fn increment(&mut self) {
        self.0 = self.0.wrapping_add(1);
    }

    /// Compares two incarnations as wrapping sequence numbers.
    ///
    /// `self` is `Greater` when it is fewer than 2^63 increments ahead of
    /// `other`, and `Less` when it is fewer than 2^63 increments behind, so a
    /// counter that wrapped past `u64::MAX` still compares as newer than the
    /// values just before the wrap.
    ///
    /// When the two values are exactly 2^63 apart, neither direction is
    /// "closer". That case is resolved deterministically: the numerically
    /// smaller raw value is reported as `Greater`. This guarantees that
    /// `a.compare(b)` is always the reverse of `b.compare(a)`, so two peers
    /// comparing the same pair can never both conclude the other one is newer.
    pub fn compare(&self, other: &Incarnation) -> Ordering {
        // Half of the 2^64 value space. Using `u64::MAX / 2` (2^63 - 1) with a
        // strict `<` would classify distances 2^63 - 1 and 2^63 as `Less` in
        // both directions.
        const HALF_RANGE: u64 = 1 << 63;

        match self.0.wrapping_sub(other.0) {
            0 => Ordering::Equal,
            // Exactly opposite on the ring: break the tie on the raw values.
            // The raw values differ here, so this never yields `Equal`.
            HALF_RANGE => other.0.cmp(&self.0),
            ahead if ahead < HALF_RANGE => Ordering::Greater,
            _ => Ordering::Less,
        }
    }

    /// Returns the raw counter value.
    pub fn value(&self) -> u64 {
        self.0
    }
}

/// Picks the status that should win when two records conflict.
///
/// The record whose incarnation compares as `Greater` (per
/// [`Incarnation::compare`]) wins. When the incarnations are equal, the record
/// with the greater `node_id` wins; if the ids do not order `a` above `b`
/// (including when they are equal), `b` is returned.
pub fn resolve_conflict<'a>(a: &'a NodeStatus, b: &'a NodeStatus) -> &'a NodeStatus {
    let prefer_a = match a.incarnation.compare(&b.incarnation) {
        // Same version: fall back to the node id as the tie-breaker.
        Ordering::Equal => a.node_id > b.node_id,
        verdict => verdict == Ordering::Greater,
    };

    if prefer_a {
        a
    } else {
        b
    }
}

/// Partial ordering that always defers to the [`Ord`] implementation, so the
/// two traits cannot disagree.
impl PartialOrd for Incarnation {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Incarnation {
    fn cmp(&self, other: &Self) -> Ordering {
        self.compare(other)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `Alive` status without tags for the conflict-resolution tests.
    fn make_status(id: &'static str, addr: &str, incarnation: u64) -> NodeStatus {
        NodeStatus {
            node_id: NodeId::new(id),
            addr: addr.parse().unwrap(),
            incarnation: Incarnation(incarnation),
            state: NodeState::Alive,
            last_seen: Instant::now(),
            tags: HashMap::new(),
        }
    }

    #[test]
    fn test_initial_incarnation() {
        let inc1 = Incarnation::initial();
        let inc2 = Incarnation::initial();

        // Both values come from the wall clock, so this assumes the clock is
        // not stepped backwards between the two calls.
        assert!(inc2.0 >= inc1.0);
    }

    #[test]
    fn test_increment() {
        let mut inc = Incarnation(100);
        inc.increment();
        assert_eq!(inc.0, 101);

        // Incrementing the maximum value wraps to zero instead of panicking.
        let mut inc_max = Incarnation(u64::MAX);
        inc_max.increment();
        assert_eq!(inc_max.0, 0);
    }

    #[test]
    fn test_wraparound_comparison() {
        let near_max = Incarnation(u64::MAX - 10);
        let near_zero = Incarnation(10);

        assert_eq!(near_zero.compare(&near_max), Ordering::Greater);
        assert_eq!(near_max.compare(&near_zero), Ordering::Less);
    }

    #[test]
    fn test_wraparound_edge_case() {
        let max_val = Incarnation(u64::MAX);
        let zero_val = Incarnation(0);
        let one_val = Incarnation(1);

        assert_eq!(zero_val.compare(&max_val), Ordering::Greater);
        assert_eq!(one_val.compare(&max_val), Ordering::Greater);
        assert_eq!(max_val.compare(&zero_val), Ordering::Less);
    }

    #[test]
    fn test_equal_incarnations() {
        let inc1 = Incarnation(500);
        let inc2 = Incarnation(500);

        assert_eq!(inc1.compare(&inc2), Ordering::Equal);
    }

    #[test]
    fn test_normal_comparison() {
        let inc1 = Incarnation(100);
        let inc2 = Incarnation(200);

        assert_eq!(inc1.compare(&inc2), Ordering::Less);
        assert_eq!(inc2.compare(&inc1), Ordering::Greater);
    }

    #[test]
    fn test_half_max_boundary() {
        let base = Incarnation(1000);
        let just_before_half = Incarnation(1000 + (u64::MAX / 2) - 1);
        let just_after_half = Incarnation(1000 + (u64::MAX / 2) + 1);

        assert_eq!(just_before_half.compare(&base), Ordering::Greater);
        assert_eq!(just_after_half.compare(&base), Ordering::Less);
    }

    #[test]
    fn test_serialization() {
        let inc = Incarnation(12345);
        let serialized = bincode::serialize(&inc).unwrap();
        let deserialized: Incarnation = bincode::deserialize(&serialized).unwrap();

        assert_eq!(inc, deserialized);
    }

    #[test]
    fn test_ord_trait() {
        let inc1 = Incarnation(100);
        let inc2 = Incarnation(200);

        assert!(inc2 > inc1);
        assert!(inc1 < inc2);
    }

    #[test]
    fn test_wraparound_with_ord() {
        let near_max = Incarnation(u64::MAX - 10);
        let near_zero = Incarnation(10);

        assert!(near_zero > near_max);
        assert!(near_max < near_zero);
    }

    #[test]
    fn test_resolve_conflict_higher_incarnation() {
        let status_a = make_status("node-a", "127.0.0.1:8001", 100);
        let status_b = make_status("node-b", "127.0.0.1:8002", 50);

        let winner = resolve_conflict(&status_a, &status_b);
        assert_eq!(winner.node_id.as_str(), "node-a");
    }

    #[test]
    fn test_resolve_conflict_equal_incarnation_uses_node_id() {
        let status_a = make_status("node-b", "127.0.0.1:8001", 100);
        let status_b = make_status("node-a", "127.0.0.1:8002", 100);

        let winner = resolve_conflict(&status_a, &status_b);
        assert_eq!(winner.node_id.as_str(), "node-b");
    }

    #[test]
    fn test_resolve_conflict_deterministic() {
        // Distinct ids with equal incarnations force the tie-break path, so the
        // assertion fails if the result depends on argument order. Comparing a
        // status with its own clone could never fail.
        let status_a = make_status("node-a", "127.0.0.1:8001", 100);
        let status_b = make_status("node-b", "127.0.0.1:8002", 100);

        let winner1 = resolve_conflict(&status_a, &status_b);
        let winner2 = resolve_conflict(&status_b, &status_a);

        assert_eq!(winner1.node_id, winner2.node_id);

        // Identical records must still resolve to that same identity.
        let twin = status_a.clone();
        assert_eq!(resolve_conflict(&status_a, &twin).node_id, status_a.node_id);
    }
}