1use crate::traits::{CancelSignal, MonitorPlatform};
2use crate::types::{CaptureMethod, CaptureOutcome, CaptureStatus, TraceEvent};
3use std::thread;
4use std::time::{Duration, Instant};
5
6pub struct CaptureMonitor<P> {
7 platform: P,
8}
9
10#[derive(Clone, Debug, Default, PartialEq, Eq)]
11pub struct MonitorGuardStats {
12 pub emitted: u64,
13 pub dropped_duplicate: u64,
14 pub dropped_global_interval: u64,
15 pub dropped_same_text_interval: u64,
16 pub dropped_unstable: u64,
17}
18
19#[derive(Clone, Debug, PartialEq, Eq)]
20pub struct MonitorSpamGuard {
21 pub suppress_identical: bool,
22 pub min_emit_interval: Duration,
23 pub min_emit_interval_same_text: Duration,
24 pub normalize_whitespace: bool,
25 pub stable_polls_required: usize,
26}
27
28impl Default for MonitorSpamGuard {
29 fn default() -> Self {
30 Self {
31 suppress_identical: true,
32 min_emit_interval: Duration::ZERO,
33 min_emit_interval_same_text: Duration::ZERO,
34 normalize_whitespace: false,
35 stable_polls_required: 1,
36 }
37 }
38}
39
40impl<P> CaptureMonitor<P>
41where
42 P: MonitorPlatform,
43{
44 pub fn new(platform: P) -> Self {
45 Self { platform }
46 }
47
48 pub fn next_event(&self) -> Option<String> {
49 self.platform.next_selection_change()
50 }
51
52 pub fn run<F>(&self, mut on_event: F) -> usize
53 where
54 F: FnMut(String),
55 {
56 let mut processed = 0;
57 while let Some(event) = self.next_event() {
58 on_event(event);
59 processed += 1;
60 }
61 processed
62 }
63
64 pub fn run_with_limit<F>(&self, max_events: usize, mut on_event: F) -> usize
65 where
66 F: FnMut(String),
67 {
68 if max_events == 0 {
69 return 0;
70 }
71 let mut processed = 0;
72 while processed < max_events {
73 match self.next_event() {
74 Some(event) => {
75 on_event(event);
76 processed += 1;
77 }
78 None => break,
79 }
80 }
81 processed
82 }
83
84 pub fn collect_events(&self, max_events: usize) -> Vec<String> {
85 let mut events = Vec::new();
86 self.run_with_limit(max_events, |event| events.push(event));
87 events
88 }
89
90 pub fn poll_until<F, C>(
91 &self,
92 poll_interval: Duration,
93 mut should_continue: C,
94 mut on_event: F,
95 ) -> usize
96 where
97 F: FnMut(String),
98 C: FnMut() -> bool,
99 {
100 let mut processed = 0;
101 while should_continue() {
102 if let Some(event) = self.next_event() {
103 on_event(event);
104 processed += 1;
105 continue;
106 }
107 thread::sleep(poll_interval);
108 }
109 processed
110 }
111
112 pub fn poll_until_cancelled<F, S>(
113 &self,
114 poll_interval: Duration,
115 cancel: &S,
116 on_event: F,
117 ) -> usize
118 where
119 F: FnMut(String),
120 S: CancelSignal,
121 {
122 self.poll_until(poll_interval, || !cancel.is_cancelled(), on_event)
123 }
124
125 pub fn poll_until_cancelled_coalesced<F, S>(
126 &self,
127 poll_interval: Duration,
128 min_emit_interval: Duration,
129 cancel: &S,
130 mut on_event: F,
131 ) -> usize
132 where
133 F: FnMut(String),
134 S: CancelSignal,
135 {
136 let mut processed = 0;
137 let mut last_emit_at: Option<Instant> = None;
138
139 while !cancel.is_cancelled() {
140 if let Some(event) = self.next_event() {
141 let should_emit = last_emit_at
142 .map(|last| last.elapsed() >= min_emit_interval)
143 .unwrap_or(true);
144 if should_emit {
145 on_event(event);
146 processed += 1;
147 last_emit_at = Some(Instant::now());
148 }
149 continue;
150 }
151 thread::sleep(poll_interval);
152 }
153
154 processed
155 }
156
157 pub fn poll_until_cancelled_guarded<F, S>(
158 &self,
159 poll_interval: Duration,
160 cancel: &S,
161 guard: &MonitorSpamGuard,
162 on_event: F,
163 ) -> usize
164 where
165 F: FnMut(String),
166 S: CancelSignal,
167 {
168 let stats =
169 self.poll_until_cancelled_guarded_with_stats(poll_interval, cancel, guard, on_event);
170 stats.emitted as usize
171 }
172
173 pub fn poll_until_cancelled_guarded_with_stats<F, S>(
174 &self,
175 poll_interval: Duration,
176 cancel: &S,
177 guard: &MonitorSpamGuard,
178 mut on_event: F,
179 ) -> MonitorGuardStats
180 where
181 F: FnMut(String),
182 S: CancelSignal,
183 {
184 let mut emitted = 0usize;
185 let mut last_emit_at: Option<Instant> = None;
186 let mut last_emitted_text: Option<String> = None;
187 let mut candidate_text: Option<String> = None;
188 let mut candidate_count = 0usize;
189 let stable_required = guard.stable_polls_required.max(1);
190 let mut stats = MonitorGuardStats::default();
191
192 while !cancel.is_cancelled() {
193 if let Some(event) = self.next_event() {
194 let normalized = normalize_event_text(&event, guard.normalize_whitespace);
195 match candidate_text.as_ref() {
196 Some(prev) if prev == &normalized => {
197 candidate_count += 1;
198 }
199 _ => {
200 candidate_text = Some(normalized.clone());
201 candidate_count = 1;
202 }
203 }
204
205 if candidate_count < stable_required {
206 stats.dropped_unstable += 1;
207 continue;
208 }
209
210 let now = Instant::now();
211 let too_soon_global = last_emit_at
212 .map(|last| now.duration_since(last) < guard.min_emit_interval)
213 .unwrap_or(false);
214 let same_as_last = last_emitted_text
215 .as_ref()
216 .map(|last| last == &normalized)
217 .unwrap_or(false);
218 let too_soon_same = same_as_last
219 && last_emit_at
220 .map(|last| now.duration_since(last) < guard.min_emit_interval_same_text)
221 .unwrap_or(false);
222 let blocked_duplicate = guard.suppress_identical && same_as_last;
223
224 if too_soon_global {
225 stats.dropped_global_interval += 1;
226 continue;
227 }
228 if too_soon_same {
229 stats.dropped_same_text_interval += 1;
230 continue;
231 }
232 if blocked_duplicate {
233 stats.dropped_duplicate += 1;
234 continue;
235 }
236
237 on_event(event);
238 emitted += 1;
239 stats.emitted += 1;
240 last_emit_at = Some(now);
241 last_emitted_text = Some(normalized);
242 continue;
243 }
244 thread::sleep(poll_interval);
245 }
246
247 debug_assert_eq!(stats.emitted as usize, emitted);
248 stats
249 }
250}
251
252fn normalize_event_text(input: &str, normalize_whitespace: bool) -> String {
253 if !normalize_whitespace {
254 return input.to_string();
255 }
256
257 input.split_whitespace().collect::<Vec<_>>().join(" ")
258}
259
260#[derive(Clone, Debug, Default, PartialEq, Eq)]
261pub struct MethodMetrics {
262 pub attempts: u64,
263 pub successes: u64,
264 pub empty_results: u64,
265 pub failures: u64,
266 pub total_latency: Duration,
267}
268
269impl MethodMetrics {
270 pub fn success_rate(&self) -> f64 {
271 if self.attempts == 0 {
272 return 0.0;
273 }
274 self.successes as f64 / self.attempts as f64
275 }
276
277 pub fn average_latency(&self) -> Duration {
278 if self.attempts == 0 {
279 return Duration::ZERO;
280 }
281 Duration::from_secs_f64(self.total_latency.as_secs_f64() / self.attempts as f64)
282 }
283}
284
285#[derive(Clone, Debug, Default, PartialEq, Eq)]
286pub struct CaptureMetrics {
287 pub total_captures: u64,
288 pub successes: u64,
289 pub failures: u64,
290 pub timed_out: u64,
291 pub cancelled: u64,
292 pub total_latency: Duration,
293 methods: Vec<(CaptureMethod, MethodMetrics)>,
294}
295
296impl CaptureMetrics {
297 pub fn record_outcome(&mut self, outcome: &CaptureOutcome) {
298 self.total_captures += 1;
299 match outcome {
300 CaptureOutcome::Success(success) => {
301 self.successes += 1;
302 if let Some(trace) = &success.trace {
303 self.total_latency += trace.total_elapsed;
304 self.record_trace_events(&trace.events);
305 }
306 }
307 CaptureOutcome::Failure(failure) => {
308 self.failures += 1;
309 if failure.status == CaptureStatus::TimedOut {
310 self.timed_out += 1;
311 }
312 if failure.status == CaptureStatus::Cancelled {
313 self.cancelled += 1;
314 }
315 if let Some(trace) = &failure.trace {
316 self.total_latency += trace.total_elapsed;
317 self.record_trace_events(&trace.events);
318 }
319 }
320 }
321 }
322
323 pub fn overall_success_rate(&self) -> f64 {
324 if self.total_captures == 0 {
325 return 0.0;
326 }
327 self.successes as f64 / self.total_captures as f64
328 }
329
330 pub fn average_latency(&self) -> Duration {
331 if self.total_captures == 0 {
332 return Duration::ZERO;
333 }
334 Duration::from_secs_f64(self.total_latency.as_secs_f64() / self.total_captures as f64)
335 }
336
337 pub fn method_metrics(&self, method: CaptureMethod) -> Option<&MethodMetrics> {
338 self.methods
339 .iter()
340 .find_map(|(candidate, metrics)| (*candidate == method).then_some(metrics))
341 }
342
343 fn record_trace_events(&mut self, events: &[TraceEvent]) {
344 for event in events {
345 match event {
346 TraceEvent::MethodFinished { method, elapsed } => {
347 let metrics = self.metrics_mut(*method);
348 metrics.attempts += 1;
349 metrics.total_latency += *elapsed;
350 }
351 TraceEvent::MethodSucceeded(method) => {
352 self.metrics_mut(*method).successes += 1;
353 }
354 TraceEvent::MethodReturnedEmpty(method) => {
355 self.metrics_mut(*method).empty_results += 1;
356 }
357 TraceEvent::MethodFailed { method, .. } => {
358 self.metrics_mut(*method).failures += 1;
359 }
360 _ => {}
361 }
362 }
363 }
364
365 fn metrics_mut(&mut self, method: CaptureMethod) -> &mut MethodMetrics {
366 if let Some(index) = self
367 .methods
368 .iter()
369 .position(|(candidate, _)| *candidate == method)
370 {
371 return &mut self.methods[index].1;
372 }
373 self.methods.push((method, MethodMetrics::default()));
374 let index = self.methods.len() - 1;
375 &mut self.methods[index].1
376 }
377}