Skip to main content

cbtop/federated_metrics/
types.rs

1//! Core types for federated metrics aggregation.
2
3use std::collections::HashMap;
4
5/// Result type for federated operations
6pub type FederatedResult<T> = Result<T, FederatedError>;
7
/// Errors in federated operations
///
/// Every variant carries only owned, totally comparable data, so the type
/// derives `Eq` in addition to `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FederatedError {
    /// Host not found in federation
    HostNotFound {
        /// Identifier of the host that was not found
        host_id: String,
    },
    /// Network partition detected
    PartitionDetected {
        /// Hosts affected by the partition
        affected_hosts: Vec<String>,
    },
    /// Merge conflict that couldn't be resolved
    MergeConflict {
        /// Human-readable description of the conflict
        reason: String,
    },
    /// Clock drift too large
    ClockDriftExceeded {
        /// Drift that was reported, in milliseconds
        drift_ms: i64,
        /// Maximum drift, in milliseconds, that the reported drift exceeded
        max_ms: i64,
    },
    /// Memory limit exceeded
    MemoryLimitExceeded {
        /// Memory in use, in bytes
        used_bytes: usize,
        /// Limit, in bytes, that was exceeded
        limit_bytes: usize,
    },
    /// Invalid configuration
    InvalidConfig {
        /// Human-readable description of what is wrong with the configuration
        reason: String,
    },
}

/// Renders each variant as a single-line, human-readable message.
impl std::fmt::Display for FederatedError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::HostNotFound { host_id } => write!(f, "Host not found: {}", host_id),
            Self::PartitionDetected { affected_hosts } => {
                // The host list is printed with its `Debug` form, e.g. `["a", "b"]`.
                write!(f, "Partition detected affecting: {:?}", affected_hosts)
            }
            Self::MergeConflict { reason } => write!(f, "Merge conflict: {}", reason),
            Self::ClockDriftExceeded { drift_ms, max_ms } => {
                write!(f, "Clock drift {}ms exceeds max {}ms", drift_ms, max_ms)
            }
            Self::MemoryLimitExceeded {
                used_bytes,
                limit_bytes,
            } => {
                write!(f, "Memory {} exceeds limit {}", used_bytes, limit_bytes)
            }
            Self::InvalidConfig { reason } => write!(f, "Invalid config: {}", reason),
        }
    }
}

impl std::error::Error for FederatedError {}
51
/// Identifier that uniquely names one metric sample
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SampleId {
    /// Host on which the sample originated
    pub host_id: String,
    /// Lamport-clock value (logical timestamp) of the sample
    pub logical_time: u64,
    /// Sequence number, unique within the originating host
    pub sequence: u64,
}

impl SampleId {
    /// Build a sample ID from its host, logical time, and sequence number.
    ///
    /// `host_id` accepts anything convertible into a `String`.
    pub fn new(host_id: impl Into<String>, logical_time: u64, sequence: u64) -> Self {
        let host_id = host_id.into();
        SampleId {
            host_id,
            logical_time,
            sequence,
        }
    }
}
73
74/// A single metric sample with vector clock
75#[derive(Debug, Clone)]
76pub struct MetricSample {
77    /// Unique identifier
78    pub id: SampleId,
79    /// Metric name
80    pub metric_name: String,
81    /// Metric value
82    pub value: f64,
83    /// Wall clock timestamp (nanoseconds)
84    pub timestamp_ns: u64,
85    /// Vector clock for causal ordering
86    pub vector_clock: HashMap<String, u64>,
87}
88
89impl MetricSample {
90    /// Create a new metric sample
91    pub fn new(
92        host_id: impl Into<String>,
93        logical_time: u64,
94        sequence: u64,
95        metric_name: impl Into<String>,
96        value: f64,
97    ) -> Self {
98        let host = host_id.into();
99        let mut vector_clock = HashMap::new();
100        vector_clock.insert(host.clone(), logical_time);
101
102        Self {
103            id: SampleId::new(host, logical_time, sequence),
104            metric_name: metric_name.into(),
105            value,
106            timestamp_ns: std::time::SystemTime::now()
107                .duration_since(std::time::UNIX_EPOCH)
108                .map(|d| d.as_nanos() as u64)
109                .unwrap_or(0),
110            vector_clock,
111        }
112    }
113}