buswatch_types/
snapshot.rs

//! Snapshot - a point-in-time view of message bus state.
2
3use alloc::collections::BTreeMap;
4use alloc::string::String;
5
6use crate::{ModuleMetrics, ModuleMetricsBuilder, SchemaVersion};
7
8/// A point-in-time snapshot of message bus metrics.
9///
10/// This is the top-level type that captures the state of all modules
11/// in a message bus system. Snapshots are typically emitted periodically
12/// (e.g., every second) and consumed by monitoring tools like buswatch.
13///
14/// # Example
15///
16/// ```rust
17/// use buswatch_types::Snapshot;
18/// use std::time::Duration;
19///
20/// let snapshot = Snapshot::builder()
21///     .module("order-service", |m| {
22///         m.read("orders.new", |r| r.count(500).backlog(10))
23///          .write("orders.validated", |w| w.count(495))
24///     })
25///     .build();
26///
27/// // Serialize with serde (requires "serde" feature)
28/// // let json = serde_json::to_string(&snapshot)?;
29/// ```
30#[derive(Debug, Clone, PartialEq)]
31#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
32#[cfg_attr(feature = "minicbor", derive(minicbor::Encode, minicbor::Decode))]
33pub struct Snapshot {
34    /// Schema version for forward compatibility.
35    #[cfg_attr(feature = "minicbor", n(0))]
36    pub version: SchemaVersion,
37
38    /// Unix timestamp in milliseconds when this snapshot was taken.
39    #[cfg_attr(feature = "minicbor", n(1))]
40    pub timestamp_ms: u64,
41
42    /// Metrics for each module, keyed by module name.
43    #[cfg_attr(feature = "minicbor", n(2))]
44    pub modules: BTreeMap<String, ModuleMetrics>,
45}
46
47impl Snapshot {
48    /// Create a new snapshot with the current timestamp.
49    #[cfg(feature = "std")]
50    pub fn new() -> Self {
51        Self {
52            version: SchemaVersion::current(),
53            timestamp_ms: current_timestamp_ms(),
54            modules: BTreeMap::new(),
55        }
56    }
57
58    /// Create a new snapshot with a specific timestamp.
59    pub fn with_timestamp(timestamp_ms: u64) -> Self {
60        Self {
61            version: SchemaVersion::current(),
62            timestamp_ms,
63            modules: BTreeMap::new(),
64        }
65    }
66
67    /// Create a builder for constructing snapshots.
68    pub fn builder() -> SnapshotBuilder {
69        SnapshotBuilder::new()
70    }
71
72    /// Check if the snapshot is empty (no modules).
73    pub fn is_empty(&self) -> bool {
74        self.modules.is_empty()
75    }
76
77    /// Number of modules in the snapshot.
78    pub fn len(&self) -> usize {
79        self.modules.len()
80    }
81
82    /// Get metrics for a specific module.
83    pub fn get(&self, module: &str) -> Option<&ModuleMetrics> {
84        self.modules.get(module)
85    }
86
87    /// Iterate over all modules.
88    pub fn iter(&self) -> impl Iterator<Item = (&String, &ModuleMetrics)> {
89        self.modules.iter()
90    }
91
92    /// Total messages read across all modules.
93    pub fn total_reads(&self) -> u64 {
94        self.modules.values().map(|m| m.total_reads()).sum()
95    }
96
97    /// Total messages written across all modules.
98    pub fn total_writes(&self) -> u64 {
99        self.modules.values().map(|m| m.total_writes()).sum()
100    }
101}
102
/// The default snapshot is the same as [`Snapshot::new`]: no modules and
/// the current timestamp. Requires the `std` feature.
#[cfg(feature = "std")]
impl Default for Snapshot {
    fn default() -> Self {
        Snapshot::new()
    }
}
109
110/// Builder for constructing `Snapshot` instances.
111#[derive(Debug)]
112pub struct SnapshotBuilder {
113    timestamp_ms: Option<u64>,
114    modules: BTreeMap<String, ModuleMetrics>,
115}
116
117impl SnapshotBuilder {
118    /// Create a new builder.
119    pub fn new() -> Self {
120        Self {
121            timestamp_ms: None,
122            modules: BTreeMap::new(),
123        }
124    }
125
126    /// Set a specific timestamp (milliseconds since Unix epoch).
127    pub fn timestamp_ms(mut self, ts: u64) -> Self {
128        self.timestamp_ms = Some(ts);
129        self
130    }
131
132    /// Add a module with metrics built using a closure.
133    pub fn module<F>(mut self, name: impl Into<String>, f: F) -> Self
134    where
135        F: FnOnce(ModuleMetricsBuilder) -> ModuleMetricsBuilder,
136    {
137        let metrics = f(ModuleMetricsBuilder::new()).build();
138        self.modules.insert(name.into(), metrics);
139        self
140    }
141
142    /// Add a module with pre-built metrics.
143    pub fn module_metrics(mut self, name: impl Into<String>, metrics: ModuleMetrics) -> Self {
144        self.modules.insert(name.into(), metrics);
145        self
146    }
147
148    /// Build the snapshot.
149    #[cfg(feature = "std")]
150    pub fn build(self) -> Snapshot {
151        Snapshot {
152            version: SchemaVersion::current(),
153            timestamp_ms: self.timestamp_ms.unwrap_or_else(current_timestamp_ms),
154            modules: self.modules,
155        }
156    }
157
158    /// Build the snapshot with a specific timestamp (for no_std).
159    #[cfg(not(feature = "std"))]
160    pub fn build(self) -> Snapshot {
161        Snapshot {
162            version: SchemaVersion::current(),
163            timestamp_ms: self.timestamp_ms.unwrap_or(0),
164            modules: self.modules,
165        }
166    }
167}
168
169impl Default for SnapshotBuilder {
170    fn default() -> Self {
171        Self::new()
172    }
173}
174
/// Milliseconds elapsed since the Unix epoch, per the system clock.
///
/// Returns 0 if the system clock reports a time before the epoch.
#[cfg(feature = "std")]
fn current_timestamp_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_millis` yields a u128; the cast keeps the low 64 bits, which
        // is lossless for any millisecond count up to `u64::MAX`.
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
184
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed timestamp so assertions never depend on the system clock.
    const FIXED_TS_MS: u64 = 1703160000000;

    /// Single-module snapshot shared by the serialization round-trip tests.
    #[cfg(any(feature = "serde", feature = "minicbor"))]
    fn roundtrip_fixture() -> Snapshot {
        Snapshot::builder()
            .timestamp_ms(FIXED_TS_MS)
            .module("test", |m| m.read("topic", |r| r.count(42).backlog(5)))
            .build()
    }

    /// The builder records every module and the explicit timestamp, and the
    /// aggregate read/write totals reflect the per-topic counts.
    #[test]
    fn test_snapshot_builder() {
        let built = Snapshot::builder()
            .timestamp_ms(FIXED_TS_MS)
            .module("producer", |m| {
                m.write("events", |w| w.count(1000).rate(100.0))
            })
            .module("consumer", |m| {
                m.read("events", |r| r.count(950).backlog(50))
            })
            .build();

        assert_eq!(built.len(), 2);
        assert_eq!(built.timestamp_ms, FIXED_TS_MS);
        assert_eq!(built.total_writes(), 1000);
        assert_eq!(built.total_reads(), 950);
    }

    /// A freshly built snapshot carries a compatible schema version.
    #[test]
    fn test_snapshot_version() {
        let built = Snapshot::builder().build();
        assert!(built.version.is_compatible());
    }

    /// JSON encode/decode yields an equal snapshot.
    #[cfg(feature = "serde")]
    #[test]
    fn test_serde_roundtrip() {
        let original = roundtrip_fixture();

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: Snapshot = serde_json::from_str(&encoded).unwrap();

        assert_eq!(original, decoded);
    }

    /// CBOR encode/decode yields an equal snapshot.
    #[cfg(feature = "minicbor")]
    #[test]
    fn test_minicbor_roundtrip() {
        let original = roundtrip_fixture();

        let encoded = minicbor::to_vec(&original).unwrap();
        let decoded: Snapshot = minicbor::decode(&encoded).unwrap();

        assert_eq!(original, decoded);
    }
}