rpcnet 0.1.0

RPC library based on QUIC+TLS encryption
Documentation
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

#[derive(Debug, Clone)]
pub struct PartitionConfig {
    pub threshold: f64,
    pub grace_period: Duration,
    pub read_only_on_partition: bool,
}

impl Default for PartitionConfig {
    fn default() -> Self {
        Self {
            threshold: 0.5,
            grace_period: Duration::from_secs(30),
            read_only_on_partition: false,
        }
    }
}

impl PartitionConfig {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_threshold(mut self, threshold: f64) -> Self {
        self.threshold = threshold;
        self
    }

    pub fn with_grace_period(mut self, period: Duration) -> Self {
        self.grace_period = period;
        self
    }

    pub fn with_read_only_on_partition(mut self, read_only: bool) -> Self {
        self.read_only_on_partition = read_only;
        self
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PartitionStatus {
    Unknown,
    Healthy {
        current_size: usize,
        expected_size: usize,
    },
    Suspect {
        current_size: usize,
        expected_size: usize,
        grace_remaining: Duration,
    },
    Partitioned {
        current_size: usize,
        expected_size: usize,
        minority: bool,
    },
}

pub struct PartitionDetector {
    expected_size: Arc<AtomicUsize>,
    config: PartitionConfig,
    detection_start: Arc<RwLock<Option<Instant>>>,
}

impl Clone for PartitionDetector {
    fn clone(&self) -> Self {
        Self {
            expected_size: self.expected_size.clone(),
            config: self.config.clone(),
            detection_start: self.detection_start.clone(),
        }
    }
}

impl PartitionDetector {
    pub fn new(config: PartitionConfig) -> Self {
        Self {
            expected_size: Arc::new(AtomicUsize::new(0)),
            config,
            detection_start: Arc::new(RwLock::new(None)),
        }
    }

    pub async fn check_partition(&self, current_size: usize) -> PartitionStatus {
        let expected = self.expected_size.load(Ordering::SeqCst);

        if expected == 0 {
            return PartitionStatus::Unknown;
        }

        let threshold_size = (expected as f64 * self.config.threshold).ceil() as usize;

        if current_size < threshold_size {
            let now = Instant::now();
            let start_time = {
                let mut detection_start = self.detection_start.write().unwrap();
                *detection_start.get_or_insert(now)
            };

            let elapsed = now.duration_since(start_time);

            if elapsed >= self.config.grace_period {
                let minority = current_size < (expected / 2);
                PartitionStatus::Partitioned {
                    current_size,
                    expected_size: expected,
                    minority,
                }
            } else {
                let grace_remaining = self.config.grace_period - elapsed;
                PartitionStatus::Suspect {
                    current_size,
                    expected_size: expected,
                    grace_remaining,
                }
            }
        } else {
            let mut detection_start = self.detection_start.write().unwrap();
            *detection_start = None;

            PartitionStatus::Healthy {
                current_size,
                expected_size: expected,
            }
        }
    }

    pub fn update_expected_size(&self, size: usize) {
        self.expected_size.store(size, Ordering::SeqCst);
    }

    pub fn config(&self) -> &PartitionConfig {
        &self.config
    }

    pub fn expected_size(&self) -> usize {
        self.expected_size.load(Ordering::SeqCst)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::sleep;

    #[tokio::test]
    async fn test_healthy_cluster() {
        let detector = PartitionDetector::new(PartitionConfig::default());
        detector.update_expected_size(10);

        let status = detector.check_partition(10).await;

        assert_eq!(
            status,
            PartitionStatus::Healthy {
                current_size: 10,
                expected_size: 10
            }
        );
    }

    #[tokio::test]
    async fn test_suspect_below_threshold() {
        let config = PartitionConfig::default().with_grace_period(Duration::from_secs(1));
        let detector = PartitionDetector::new(config);
        detector.update_expected_size(10);

        let status = detector.check_partition(4).await;

        match status {
            PartitionStatus::Suspect {
                current_size,
                expected_size,
                ..
            } => {
                assert_eq!(current_size, 4);
                assert_eq!(expected_size, 10);
            }
            _ => panic!("Expected Suspect status"),
        }
    }

    #[tokio::test]
    async fn test_partition_after_grace_period() {
        let config = PartitionConfig::default().with_grace_period(Duration::from_millis(100));
        let detector = PartitionDetector::new(config);
        detector.update_expected_size(10);

        let status = detector.check_partition(4).await;
        assert!(matches!(status, PartitionStatus::Suspect { .. }));

        sleep(Duration::from_millis(150)).await;

        let status = detector.check_partition(4).await;
        match status {
            PartitionStatus::Partitioned {
                current_size,
                expected_size,
                minority,
            } => {
                assert_eq!(current_size, 4);
                assert_eq!(expected_size, 10);
                assert!(minority);
            }
            _ => panic!("Expected Partitioned status"),
        }
    }

    #[tokio::test]
    async fn test_minority_detection() {
        let config = PartitionConfig::default().with_grace_period(Duration::from_millis(100));
        let detector = PartitionDetector::new(config);
        detector.update_expected_size(10);

        detector.check_partition(3).await;
        sleep(Duration::from_millis(150)).await;

        let status = detector.check_partition(3).await;
        match status {
            PartitionStatus::Partitioned {
                minority,
                current_size,
                ..
            } => {
                assert_eq!(current_size, 3);
                assert!(minority);
            }
            _ => panic!("Expected Partitioned with minority=true"),
        }
    }

    #[tokio::test]
    async fn test_recovery_from_suspect() {
        let config = PartitionConfig::default().with_grace_period(Duration::from_secs(1));
        let detector = PartitionDetector::new(config);
        detector.update_expected_size(10);

        let status = detector.check_partition(4).await;
        assert!(matches!(status, PartitionStatus::Suspect { .. }));

        let status = detector.check_partition(10).await;
        assert_eq!(
            status,
            PartitionStatus::Healthy {
                current_size: 10,
                expected_size: 10
            }
        );
    }

    #[tokio::test]
    async fn test_threshold_boundary() {
        let detector = PartitionDetector::new(PartitionConfig::default());
        detector.update_expected_size(10);

        let status = detector.check_partition(5).await;
        assert_eq!(
            status,
            PartitionStatus::Healthy {
                current_size: 5,
                expected_size: 10
            }
        );

        let status = detector.check_partition(4).await;
        assert!(matches!(status, PartitionStatus::Suspect { .. }));
    }

    #[tokio::test]
    async fn test_unknown_status() {
        let detector = PartitionDetector::new(PartitionConfig::default());

        let status = detector.check_partition(5).await;
        assert_eq!(status, PartitionStatus::Unknown);
    }

    #[tokio::test]
    async fn test_grace_remaining_calculation() {
        let config = PartitionConfig::default().with_grace_period(Duration::from_secs(10));
        let detector = PartitionDetector::new(config);
        detector.update_expected_size(10);

        let status = detector.check_partition(4).await;

        match status {
            PartitionStatus::Suspect {
                grace_remaining, ..
            } => {
                assert!(grace_remaining <= Duration::from_secs(10));
                assert!(grace_remaining > Duration::from_secs(9));
            }
            _ => panic!("Expected Suspect status"),
        }
    }

    #[tokio::test]
    async fn test_config_builder() {
        let config = PartitionConfig::new()
            .with_threshold(0.6)
            .with_grace_period(Duration::from_secs(15))
            .with_read_only_on_partition(true);

        assert_eq!(config.threshold, 0.6);
        assert_eq!(config.grace_period, Duration::from_secs(15));
        assert!(config.read_only_on_partition);
    }

    #[tokio::test]
    async fn test_concurrent_checks() {
        use std::sync::Arc;
        use tokio::task;

        let config = PartitionConfig::default().with_grace_period(Duration::from_millis(100));
        let detector = Arc::new(PartitionDetector::new(config));
        detector.update_expected_size(10);

        let mut handles = vec![];
        for _ in 0..10 {
            let d = detector.clone();
            let handle = task::spawn(async move { d.check_partition(4).await });
            handles.push(handle);
        }

        for handle in handles {
            let status = handle.await.unwrap();
            assert!(matches!(status, PartitionStatus::Suspect { .. }));
        }
    }
}