rpcnet 0.1.0

RPC library based on QUIC+TLS encryption
Documentation
use crate::cluster::incarnation::Incarnation;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::SocketAddr;
use thiserror::Error;

pub const MAX_UPDATES_PER_MESSAGE: usize = 20;
pub const MAX_MESSAGE_SIZE: usize = 4096;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum Priority {
    Critical = 3,
    High = 2,
    Medium = 1,
    Low = 0,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct NodeId(pub String);

impl NodeId {
    pub fn new(id: impl Into<String>) -> Self {
        Self(id.into())
    }

    pub fn as_str(&self) -> &str {
        &self.0
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeUpdate {
    pub node_id: NodeId,
    pub addr: SocketAddr,
    pub incarnation: Incarnation,
    pub state: NodeState,
    pub tags: HashMap<String, String>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeState {
    Alive,
    Suspect,
    Failed,
    Left,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GossipMessage {
    pub updates: Vec<NodeUpdate>,
}

impl GossipMessage {
    pub fn new(updates: Vec<NodeUpdate>) -> Self {
        Self { updates }
    }

    pub fn check_size(&self) -> Result<(), GossipError> {
        if self.updates.len() > MAX_UPDATES_PER_MESSAGE {
            return Err(GossipError::TooManyUpdates {
                count: self.updates.len(),
                max: MAX_UPDATES_PER_MESSAGE,
            });
        }

        let serialized =
            bincode::serialize(self).map_err(|e| GossipError::SerializationError { source: e })?;

        if serialized.len() > MAX_MESSAGE_SIZE {
            return Err(GossipError::MessageTooLarge {
                size: serialized.len(),
                max: MAX_MESSAGE_SIZE,
            });
        }

        Ok(())
    }
}

#[derive(Debug, Error)]
pub enum GossipError {
    #[error("Too many updates in message: {count} (max: {max})")]
    TooManyUpdates { count: usize, max: usize },

    #[error("Message size {size} exceeds maximum {max}")]
    MessageTooLarge { size: usize, max: usize },

    #[error("Serialization error: {source}")]
    SerializationError { source: Box<bincode::ErrorKind> },
}

#[cfg(test)]
mod tests {
    use super::*;

    fn create_test_update(id: &str, _incarnation_val: u64) -> NodeUpdate {
        NodeUpdate {
            node_id: NodeId::new(id),
            addr: "127.0.0.1:8000".parse().unwrap(),
            incarnation: Incarnation::initial(),
            state: NodeState::Alive,
            tags: HashMap::new(),
        }
    }

    #[test]
    fn test_message_with_max_updates() {
        let updates: Vec<NodeUpdate> = (0..MAX_UPDATES_PER_MESSAGE)
            .map(|i| create_test_update(&format!("node-{}", i), i as u64))
            .collect();

        let msg = GossipMessage::new(updates);
        assert!(msg.check_size().is_ok());
    }

    #[test]
    fn test_message_exceeds_max_updates() {
        let updates: Vec<NodeUpdate> = (0..MAX_UPDATES_PER_MESSAGE + 1)
            .map(|i| create_test_update(&format!("node-{}", i), i as u64))
            .collect();

        let msg = GossipMessage::new(updates);
        let result = msg.check_size();

        assert!(result.is_err());
        match result {
            Err(GossipError::TooManyUpdates { count, max }) => {
                assert_eq!(count, MAX_UPDATES_PER_MESSAGE + 1);
                assert_eq!(max, MAX_UPDATES_PER_MESSAGE);
            }
            _ => panic!("Expected TooManyUpdates error"),
        }
    }

    #[test]
    fn test_message_size_check() {
        let mut tags = HashMap::new();
        for i in 0..100 {
            tags.insert(format!("key-{}", i), format!("value-{}", i));
        }

        let mut update = create_test_update("large-node", 1);
        update.tags = tags;

        let updates = vec![update; 20];
        let msg = GossipMessage::new(updates);

        let serialized = bincode::serialize(&msg).unwrap();
        if serialized.len() > MAX_MESSAGE_SIZE {
            assert!(msg.check_size().is_err());
        } else {
            assert!(msg.check_size().is_ok());
        }
    }

    #[test]
    fn test_empty_message() {
        let msg = GossipMessage::new(vec![]);
        assert!(msg.check_size().is_ok());
    }

    #[test]
    fn test_priority_ordering() {
        assert!(Priority::Critical > Priority::High);
        assert!(Priority::High > Priority::Medium);
        assert!(Priority::Medium > Priority::Low);
    }

    #[test]
    fn test_node_id() {
        let id1 = NodeId::new("node-1");
        let id2 = NodeId::new("node-1".to_string());

        assert_eq!(id1.as_str(), "node-1");
        assert_eq!(id1, id2);
    }
}