buswatch_tui/data/
monitor.rs

1//! Monitor data parsing and health computation.
2//!
3//! This module transforms raw monitor snapshots into processed data
4//! with health status computed based on configurable thresholds.
5
6use std::fs;
7use std::path::Path;
8use std::time::{Duration, Instant};
9
10use anyhow::Result;
11
12use crate::source::{ModuleMetrics, Snapshot};
13
/// Limits that decide when a topic leaves the healthy state.
///
/// A measured value at or above the `*_warning` limit is reported as a
/// warning; at or above the `*_critical` limit it is reported as critical.
#[derive(Debug, Clone)]
pub struct Thresholds {
    /// Pending time of a read/write at or above which a warning is raised.
    pub pending_warning: Duration,
    /// Pending time of a read/write at or above which the status is critical.
    pub pending_critical: Duration,
    /// Unread message count at or above which a warning is raised.
    pub unread_warning: u64,
    /// Unread message count at or above which the status is critical.
    pub unread_critical: u64,
}

impl Default for Thresholds {
    /// Defaults: warn after 1 s pending or 1000 unread messages, go critical
    /// after 10 s pending or 5000 unread messages.
    fn default() -> Self {
        let pending_warning = Duration::from_secs(1);
        let pending_critical = Duration::from_secs(10);

        Thresholds {
            pending_warning,
            pending_critical,
            unread_warning: 1_000,
            unread_critical: 5_000,
        }
    }
}
40
/// Health classification of a module or topic.
///
/// Variants are declared from least to most severe, so the derived `Ord`
/// lets `max()` pick the worst of several statuses.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum HealthStatus {
    /// Nothing is at or above a warning threshold.
    Healthy,
    /// A warning threshold has been reached, but no critical one.
    Warning,
    /// A critical threshold has been reached.
    Critical,
}

impl HealthStatus {
    /// Short display label for this status: `"OK"`, `"WARN"` or `"CRIT"`.
    pub fn symbol(&self) -> &'static str {
        match *self {
            Self::Healthy => "OK",
            Self::Warning => "WARN",
            Self::Critical => "CRIT",
        }
    }
}
64
65/// Parsed topic read data with computed health status.
66#[derive(Debug, Clone)]
67pub struct TopicRead {
68    /// The topic name being read from.
69    pub topic: String,
70    /// Total number of messages read.
71    pub read: u64,
72    /// How long the oldest message has been pending (if any).
73    pub pending_for: Option<Duration>,
74    /// Number of unread messages waiting in the queue.
75    pub unread: Option<u64>,
76    /// Computed health status based on thresholds.
77    pub status: HealthStatus,
78}
79
80/// Parsed topic write data with computed health status.
81#[derive(Debug, Clone)]
82pub struct TopicWrite {
83    /// The topic name being written to.
84    pub topic: String,
85    /// Total number of messages written.
86    pub written: u64,
87    /// How long the oldest write has been pending (if any).
88    pub pending_for: Option<Duration>,
89    /// Computed health status based on thresholds.
90    pub status: HealthStatus,
91}
92
93/// Parsed module data with aggregated statistics and health.
94#[derive(Debug, Clone)]
95pub struct ModuleData {
96    /// The module name.
97    pub name: String,
98    /// All topics this module reads from.
99    pub reads: Vec<TopicRead>,
100    /// All topics this module writes to.
101    pub writes: Vec<TopicWrite>,
102    /// Sum of all messages read across all topics.
103    pub total_read: u64,
104    /// Sum of all messages written across all topics.
105    pub total_written: u64,
106    /// Overall health (worst status across all topics).
107    pub health: HealthStatus,
108}
109
110/// Complete parsed monitor data ready for display.
111#[derive(Debug, Clone)]
112pub struct MonitorData {
113    /// All modules sorted by health status (critical first).
114    pub modules: Vec<ModuleData>,
115    /// When this snapshot was processed.
116    pub last_updated: Instant,
117}
118
119impl MonitorData {
120    /// Load and parse monitor data from a JSON file.
121    ///
122    /// This is the traditional file-based loading method.
123    pub fn load(path: &Path, thresholds: &Thresholds) -> Result<Self> {
124        let content = fs::read_to_string(path)?;
125        Self::parse(&content, thresholds)
126    }
127
128    /// Parse monitor data from a JSON string.
129    pub fn parse(content: &str, thresholds: &Thresholds) -> Result<Self> {
130        let snapshot: Snapshot = serde_json::from_str(content)?;
131        Ok(Self::from_snapshot(snapshot, thresholds))
132    }
133
134    /// Convert a Snapshot into processed MonitorData.
135    ///
136    /// This is the primary conversion method used by all data sources.
137    pub fn from_snapshot(snapshot: Snapshot, thresholds: &Thresholds) -> Self {
138        let mut modules: Vec<ModuleData> = snapshot
139            .modules
140            .into_iter()
141            .map(|(name, state)| Self::parse_module(name, state, thresholds))
142            .collect();
143
144        // Sort by health status (critical first), then by name
145        modules.sort_by(|a, b| b.health.cmp(&a.health).then_with(|| a.name.cmp(&b.name)));
146
147        Self {
148            modules,
149            last_updated: Instant::now(),
150        }
151    }
152
153    fn parse_module(name: String, state: ModuleMetrics, thresholds: &Thresholds) -> ModuleData {
154        let mut reads: Vec<TopicRead> = state
155            .reads
156            .into_iter()
157            .map(|(topic, r)| {
158                let pending_for = r.pending.map(|p| p.to_duration());
159                let status = Self::compute_read_status(pending_for, r.backlog, thresholds);
160                TopicRead {
161                    topic,
162                    read: r.count,
163                    pending_for,
164                    unread: r.backlog,
165                    status,
166                }
167            })
168            .collect();
169
170        let mut writes: Vec<TopicWrite> = state
171            .writes
172            .into_iter()
173            .map(|(topic, w)| {
174                let pending_for = w.pending.map(|p| p.to_duration());
175                let status = Self::compute_write_status(pending_for, thresholds);
176                TopicWrite {
177                    topic,
178                    written: w.count,
179                    pending_for,
180                    status,
181                }
182            })
183            .collect();
184
185        // Sort topics by status (critical first), then by name
186        reads.sort_by(|a, b| b.status.cmp(&a.status).then_with(|| a.topic.cmp(&b.topic)));
187        writes.sort_by(|a, b| b.status.cmp(&a.status).then_with(|| a.topic.cmp(&b.topic)));
188
189        let total_read = reads.iter().map(|r| r.read).sum();
190        let total_written = writes.iter().map(|w| w.written).sum();
191
192        // Module health is the worst of all its topics
193        let health = reads
194            .iter()
195            .map(|r| r.status)
196            .chain(writes.iter().map(|w| w.status))
197            .max()
198            .unwrap_or(HealthStatus::Healthy);
199
200        ModuleData {
201            name,
202            reads,
203            writes,
204            total_read,
205            total_written,
206            health,
207        }
208    }
209
210    fn compute_read_status(
211        pending_for: Option<Duration>,
212        unread: Option<u64>,
213        thresholds: &Thresholds,
214    ) -> HealthStatus {
215        let pending_status = pending_for.map_or(HealthStatus::Healthy, |d| {
216            if d >= thresholds.pending_critical {
217                HealthStatus::Critical
218            } else if d >= thresholds.pending_warning {
219                HealthStatus::Warning
220            } else {
221                HealthStatus::Healthy
222            }
223        });
224
225        let unread_status = unread.map_or(HealthStatus::Healthy, |u| {
226            if u >= thresholds.unread_critical {
227                HealthStatus::Critical
228            } else if u >= thresholds.unread_warning {
229                HealthStatus::Warning
230            } else {
231                HealthStatus::Healthy
232            }
233        });
234
235        pending_status.max(unread_status)
236    }
237
238    fn compute_write_status(
239        pending_for: Option<Duration>,
240        thresholds: &Thresholds,
241    ) -> HealthStatus {
242        pending_for.map_or(HealthStatus::Healthy, |d| {
243            if d >= thresholds.pending_critical {
244                HealthStatus::Critical
245            } else if d >= thresholds.pending_warning {
246                HealthStatus::Warning
247            } else {
248                HealthStatus::Healthy
249            }
250        })
251    }
252
253    /// Get all unhealthy topics across all modules.
254    pub fn unhealthy_topics(&self) -> Vec<(&ModuleData, UnhealthyTopic)> {
255        let mut result = Vec::new();
256
257        for module in &self.modules {
258            for read in &module.reads {
259                if read.status != HealthStatus::Healthy {
260                    result.push((module, UnhealthyTopic::Read(read.clone())));
261                }
262            }
263            for write in &module.writes {
264                if write.status != HealthStatus::Healthy {
265                    result.push((module, UnhealthyTopic::Write(write.clone())));
266                }
267            }
268        }
269
270        // Sort by status (critical first)
271        result.sort_by(|a, b| b.1.status().cmp(&a.1.status()));
272        result
273    }
274}
275
276/// An unhealthy topic (either read or write).
277///
278/// Used by the Bottleneck view to display topics that need attention.
279#[derive(Debug, Clone)]
280pub enum UnhealthyTopic {
281    /// A read subscription with warning or critical status.
282    Read(TopicRead),
283    /// A write publication with warning or critical status.
284    Write(TopicWrite),
285}
286
287impl UnhealthyTopic {
288    /// Returns the health status of this topic.
289    pub fn status(&self) -> HealthStatus {
290        match self {
291            UnhealthyTopic::Read(r) => r.status,
292            UnhealthyTopic::Write(w) => w.status,
293        }
294    }
295
296    /// Returns the topic name.
297    pub fn topic(&self) -> &str {
298        match self {
299            UnhealthyTopic::Read(r) => &r.topic,
300            UnhealthyTopic::Write(w) => &w.topic,
301        }
302    }
303
304    /// Returns how long the topic has been pending.
305    pub fn pending_for(&self) -> Option<Duration> {
306        match self {
307            UnhealthyTopic::Read(r) => r.pending_for,
308            UnhealthyTopic::Write(w) => w.pending_for,
309        }
310    }
311}