1use std::fs;
7use std::path::Path;
8use std::time::{Duration, Instant};
9
10use anyhow::Result;
11
12use crate::source::{ModuleMetrics, Snapshot};
13
14#[derive(Debug, Clone)]
19pub struct Thresholds {
20 pub pending_warning: Duration,
22 pub pending_critical: Duration,
24 pub unread_warning: u64,
26 pub unread_critical: u64,
28}
29
30impl Default for Thresholds {
31 fn default() -> Self {
32 Self {
33 pending_warning: Duration::from_secs(1),
34 pending_critical: Duration::from_secs(10),
35 unread_warning: 1000,
36 unread_critical: 5000,
37 }
38 }
39}
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
45pub enum HealthStatus {
46 Healthy,
48 Warning,
50 Critical,
52}
53
54impl HealthStatus {
55 pub fn symbol(&self) -> &'static str {
57 match self {
58 HealthStatus::Healthy => "OK",
59 HealthStatus::Warning => "WARN",
60 HealthStatus::Critical => "CRIT",
61 }
62 }
63}
64
65#[derive(Debug, Clone)]
67pub struct TopicRead {
68 pub topic: String,
70 pub read: u64,
72 pub pending_for: Option<Duration>,
74 pub unread: Option<u64>,
76 pub status: HealthStatus,
78}
79
80#[derive(Debug, Clone)]
82pub struct TopicWrite {
83 pub topic: String,
85 pub written: u64,
87 pub pending_for: Option<Duration>,
89 pub status: HealthStatus,
91}
92
93#[derive(Debug, Clone)]
95pub struct ModuleData {
96 pub name: String,
98 pub reads: Vec<TopicRead>,
100 pub writes: Vec<TopicWrite>,
102 pub total_read: u64,
104 pub total_written: u64,
106 pub health: HealthStatus,
108}
109
110#[derive(Debug, Clone)]
112pub struct MonitorData {
113 pub modules: Vec<ModuleData>,
115 pub last_updated: Instant,
117}
118
119impl MonitorData {
120 pub fn load(path: &Path, thresholds: &Thresholds) -> Result<Self> {
124 let content = fs::read_to_string(path)?;
125 Self::parse(&content, thresholds)
126 }
127
128 pub fn parse(content: &str, thresholds: &Thresholds) -> Result<Self> {
130 let snapshot: Snapshot = serde_json::from_str(content)?;
131 Ok(Self::from_snapshot(snapshot, thresholds))
132 }
133
134 pub fn from_snapshot(snapshot: Snapshot, thresholds: &Thresholds) -> Self {
138 let mut modules: Vec<ModuleData> = snapshot
139 .modules
140 .into_iter()
141 .map(|(name, state)| Self::parse_module(name, state, thresholds))
142 .collect();
143
144 modules.sort_by(|a, b| b.health.cmp(&a.health).then_with(|| a.name.cmp(&b.name)));
146
147 Self {
148 modules,
149 last_updated: Instant::now(),
150 }
151 }
152
153 fn parse_module(name: String, state: ModuleMetrics, thresholds: &Thresholds) -> ModuleData {
154 let mut reads: Vec<TopicRead> = state
155 .reads
156 .into_iter()
157 .map(|(topic, r)| {
158 let pending_for = r.pending.map(|p| p.to_duration());
159 let status = Self::compute_read_status(pending_for, r.backlog, thresholds);
160 TopicRead {
161 topic,
162 read: r.count,
163 pending_for,
164 unread: r.backlog,
165 status,
166 }
167 })
168 .collect();
169
170 let mut writes: Vec<TopicWrite> = state
171 .writes
172 .into_iter()
173 .map(|(topic, w)| {
174 let pending_for = w.pending.map(|p| p.to_duration());
175 let status = Self::compute_write_status(pending_for, thresholds);
176 TopicWrite {
177 topic,
178 written: w.count,
179 pending_for,
180 status,
181 }
182 })
183 .collect();
184
185 reads.sort_by(|a, b| b.status.cmp(&a.status).then_with(|| a.topic.cmp(&b.topic)));
187 writes.sort_by(|a, b| b.status.cmp(&a.status).then_with(|| a.topic.cmp(&b.topic)));
188
189 let total_read = reads.iter().map(|r| r.read).sum();
190 let total_written = writes.iter().map(|w| w.written).sum();
191
192 let health = reads
194 .iter()
195 .map(|r| r.status)
196 .chain(writes.iter().map(|w| w.status))
197 .max()
198 .unwrap_or(HealthStatus::Healthy);
199
200 ModuleData {
201 name,
202 reads,
203 writes,
204 total_read,
205 total_written,
206 health,
207 }
208 }
209
210 fn compute_read_status(
211 pending_for: Option<Duration>,
212 unread: Option<u64>,
213 thresholds: &Thresholds,
214 ) -> HealthStatus {
215 let pending_status = pending_for.map_or(HealthStatus::Healthy, |d| {
216 if d >= thresholds.pending_critical {
217 HealthStatus::Critical
218 } else if d >= thresholds.pending_warning {
219 HealthStatus::Warning
220 } else {
221 HealthStatus::Healthy
222 }
223 });
224
225 let unread_status = unread.map_or(HealthStatus::Healthy, |u| {
226 if u >= thresholds.unread_critical {
227 HealthStatus::Critical
228 } else if u >= thresholds.unread_warning {
229 HealthStatus::Warning
230 } else {
231 HealthStatus::Healthy
232 }
233 });
234
235 pending_status.max(unread_status)
236 }
237
238 fn compute_write_status(
239 pending_for: Option<Duration>,
240 thresholds: &Thresholds,
241 ) -> HealthStatus {
242 pending_for.map_or(HealthStatus::Healthy, |d| {
243 if d >= thresholds.pending_critical {
244 HealthStatus::Critical
245 } else if d >= thresholds.pending_warning {
246 HealthStatus::Warning
247 } else {
248 HealthStatus::Healthy
249 }
250 })
251 }
252
253 pub fn unhealthy_topics(&self) -> Vec<(&ModuleData, UnhealthyTopic)> {
255 let mut result = Vec::new();
256
257 for module in &self.modules {
258 for read in &module.reads {
259 if read.status != HealthStatus::Healthy {
260 result.push((module, UnhealthyTopic::Read(read.clone())));
261 }
262 }
263 for write in &module.writes {
264 if write.status != HealthStatus::Healthy {
265 result.push((module, UnhealthyTopic::Write(write.clone())));
266 }
267 }
268 }
269
270 result.sort_by(|a, b| b.1.status().cmp(&a.1.status()));
272 result
273 }
274}
275
276#[derive(Debug, Clone)]
280pub enum UnhealthyTopic {
281 Read(TopicRead),
283 Write(TopicWrite),
285}
286
287impl UnhealthyTopic {
288 pub fn status(&self) -> HealthStatus {
290 match self {
291 UnhealthyTopic::Read(r) => r.status,
292 UnhealthyTopic::Write(w) => w.status,
293 }
294 }
295
296 pub fn topic(&self) -> &str {
298 match self {
299 UnhealthyTopic::Read(r) => &r.topic,
300 UnhealthyTopic::Write(w) => &w.topic,
301 }
302 }
303
304 pub fn pending_for(&self) -> Option<Duration> {
306 match self {
307 UnhealthyTopic::Read(r) => r.pending_for,
308 UnhealthyTopic::Write(w) => w.pending_for,
309 }
310 }
311}