// selection_capture/monitor.rs

1use crate::traits::{CancelSignal, MonitorPlatform};
2use crate::types::{CaptureMethod, CaptureOutcome, CaptureStatus, TraceEvent};
3use std::thread;
4use std::time::{Duration, Instant};
5
/// Front end over a platform event source.
///
/// The monitor owns the platform value; its methods obtain events by calling
/// `next_selection_change` on it.
pub struct CaptureMonitor<P> {
    /// Source the monitor pulls events from.
    platform: P,
}
9
/// Tally of what the guarded poll loop did with each event it received.
///
/// Every event ends up in exactly one counter: it is either emitted or
/// attributed to the first guard rule that rejected it.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct MonitorGuardStats {
    /// Events handed to the caller's callback.
    pub emitted: u64,
    /// Events rejected because their text matched the last emitted text.
    pub dropped_duplicate: u64,
    /// Events rejected because the global minimum emit interval had not elapsed.
    pub dropped_global_interval: u64,
    /// Events rejected because they repeated the last emitted text within the
    /// same-text minimum interval.
    pub dropped_same_text_interval: u64,
    /// Events rejected because the text had not yet been seen on enough
    /// consecutive events.
    pub dropped_unstable: u64,
}
18
/// Filtering rules applied by the guarded poll loop before an event is emitted.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct MonitorSpamGuard {
    /// When `true`, an event whose (normalized) text equals the last emitted
    /// text is dropped.
    pub suppress_identical: bool,
    /// Minimum time that must pass after an emit before any event is emitted again.
    pub min_emit_interval: Duration,
    /// Minimum time that must pass after an emit before the same text is
    /// emitted again.
    pub min_emit_interval_same_text: Duration,
    /// When `true`, texts are compared after collapsing whitespace runs to a
    /// single space and trimming the ends.
    pub normalize_whitespace: bool,
    /// Number of consecutive events that must carry the same text before it
    /// is eligible for emission; values below 1 are treated as 1.
    pub stable_polls_required: usize,
}

impl Default for MonitorSpamGuard {
    /// Suppresses identical repeats and applies no throttling, no whitespace
    /// normalization, and no stability requirement beyond a single event.
    fn default() -> Self {
        let no_throttle = Duration::ZERO;
        MonitorSpamGuard {
            stable_polls_required: 1,
            normalize_whitespace: false,
            min_emit_interval_same_text: no_throttle,
            min_emit_interval: no_throttle,
            suppress_identical: true,
        }
    }
}
39
40impl<P> CaptureMonitor<P>
41where
42    P: MonitorPlatform,
43{
44    pub fn new(platform: P) -> Self {
45        Self { platform }
46    }
47
48    pub fn next_event(&self) -> Option<String> {
49        self.platform.next_selection_change()
50    }
51
52    pub fn run<F>(&self, mut on_event: F) -> usize
53    where
54        F: FnMut(String),
55    {
56        let mut processed = 0;
57        while let Some(event) = self.next_event() {
58            on_event(event);
59            processed += 1;
60        }
61        processed
62    }
63
64    pub fn run_with_limit<F>(&self, max_events: usize, mut on_event: F) -> usize
65    where
66        F: FnMut(String),
67    {
68        if max_events == 0 {
69            return 0;
70        }
71        let mut processed = 0;
72        while processed < max_events {
73            match self.next_event() {
74                Some(event) => {
75                    on_event(event);
76                    processed += 1;
77                }
78                None => break,
79            }
80        }
81        processed
82    }
83
84    pub fn collect_events(&self, max_events: usize) -> Vec<String> {
85        let mut events = Vec::new();
86        self.run_with_limit(max_events, |event| events.push(event));
87        events
88    }
89
90    pub fn poll_until<F, C>(
91        &self,
92        poll_interval: Duration,
93        mut should_continue: C,
94        mut on_event: F,
95    ) -> usize
96    where
97        F: FnMut(String),
98        C: FnMut() -> bool,
99    {
100        let mut processed = 0;
101        while should_continue() {
102            if let Some(event) = self.next_event() {
103                on_event(event);
104                processed += 1;
105                continue;
106            }
107            thread::sleep(poll_interval);
108        }
109        processed
110    }
111
112    pub fn poll_until_cancelled<F, S>(
113        &self,
114        poll_interval: Duration,
115        cancel: &S,
116        on_event: F,
117    ) -> usize
118    where
119        F: FnMut(String),
120        S: CancelSignal,
121    {
122        self.poll_until(poll_interval, || !cancel.is_cancelled(), on_event)
123    }
124
125    pub fn poll_until_cancelled_coalesced<F, S>(
126        &self,
127        poll_interval: Duration,
128        min_emit_interval: Duration,
129        cancel: &S,
130        mut on_event: F,
131    ) -> usize
132    where
133        F: FnMut(String),
134        S: CancelSignal,
135    {
136        let mut processed = 0;
137        let mut last_emit_at: Option<Instant> = None;
138
139        while !cancel.is_cancelled() {
140            if let Some(event) = self.next_event() {
141                let should_emit = last_emit_at
142                    .map(|last| last.elapsed() >= min_emit_interval)
143                    .unwrap_or(true);
144                if should_emit {
145                    on_event(event);
146                    processed += 1;
147                    last_emit_at = Some(Instant::now());
148                }
149                continue;
150            }
151            thread::sleep(poll_interval);
152        }
153
154        processed
155    }
156
157    pub fn poll_until_cancelled_guarded<F, S>(
158        &self,
159        poll_interval: Duration,
160        cancel: &S,
161        guard: &MonitorSpamGuard,
162        on_event: F,
163    ) -> usize
164    where
165        F: FnMut(String),
166        S: CancelSignal,
167    {
168        let stats =
169            self.poll_until_cancelled_guarded_with_stats(poll_interval, cancel, guard, on_event);
170        stats.emitted as usize
171    }
172
173    pub fn poll_until_cancelled_guarded_with_stats<F, S>(
174        &self,
175        poll_interval: Duration,
176        cancel: &S,
177        guard: &MonitorSpamGuard,
178        mut on_event: F,
179    ) -> MonitorGuardStats
180    where
181        F: FnMut(String),
182        S: CancelSignal,
183    {
184        let mut emitted = 0usize;
185        let mut last_emit_at: Option<Instant> = None;
186        let mut last_emitted_text: Option<String> = None;
187        let mut candidate_text: Option<String> = None;
188        let mut candidate_count = 0usize;
189        let stable_required = guard.stable_polls_required.max(1);
190        let mut stats = MonitorGuardStats::default();
191
192        while !cancel.is_cancelled() {
193            if let Some(event) = self.next_event() {
194                let normalized = normalize_event_text(&event, guard.normalize_whitespace);
195                match candidate_text.as_ref() {
196                    Some(prev) if prev == &normalized => {
197                        candidate_count += 1;
198                    }
199                    _ => {
200                        candidate_text = Some(normalized.clone());
201                        candidate_count = 1;
202                    }
203                }
204
205                if candidate_count < stable_required {
206                    stats.dropped_unstable += 1;
207                    continue;
208                }
209
210                let now = Instant::now();
211                let too_soon_global = last_emit_at
212                    .map(|last| now.duration_since(last) < guard.min_emit_interval)
213                    .unwrap_or(false);
214                let same_as_last = last_emitted_text
215                    .as_ref()
216                    .map(|last| last == &normalized)
217                    .unwrap_or(false);
218                let too_soon_same = same_as_last
219                    && last_emit_at
220                        .map(|last| now.duration_since(last) < guard.min_emit_interval_same_text)
221                        .unwrap_or(false);
222                let blocked_duplicate = guard.suppress_identical && same_as_last;
223
224                if too_soon_global {
225                    stats.dropped_global_interval += 1;
226                    continue;
227                }
228                if too_soon_same {
229                    stats.dropped_same_text_interval += 1;
230                    continue;
231                }
232                if blocked_duplicate {
233                    stats.dropped_duplicate += 1;
234                    continue;
235                }
236
237                on_event(event);
238                emitted += 1;
239                stats.emitted += 1;
240                last_emit_at = Some(now);
241                last_emitted_text = Some(normalized);
242                continue;
243            }
244            thread::sleep(poll_interval);
245        }
246
247        debug_assert_eq!(stats.emitted as usize, emitted);
248        stats
249    }
250}
251
/// Produces the text used when comparing events.
///
/// With `normalize_whitespace` off the input is copied unchanged. With it on,
/// leading and trailing whitespace is removed and every interior run of
/// whitespace becomes a single space.
fn normalize_event_text(input: &str, normalize_whitespace: bool) -> String {
    if normalize_whitespace {
        let mut collapsed = String::with_capacity(input.len());
        for word in input.split_whitespace() {
            // Words are never empty, so an empty buffer means "first word".
            if !collapsed.is_empty() {
                collapsed.push(' ');
            }
            collapsed.push_str(word);
        }
        collapsed
    } else {
        input.to_owned()
    }
}
259
/// Counters describing how one capture method has performed.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct MethodMetrics {
    /// Number of recorded attempts; the denominator for both derived values.
    pub attempts: u64,
    /// Number of recorded successes.
    pub successes: u64,
    /// Number of times the method returned an empty result.
    pub empty_results: u64,
    /// Number of recorded failures.
    pub failures: u64,
    /// Sum of the latencies of all recorded attempts.
    pub total_latency: Duration,
}

impl MethodMetrics {
    /// Fraction of attempts that succeeded, or `0.0` when nothing was attempted.
    pub fn success_rate(&self) -> f64 {
        if self.attempts == 0 {
            return 0.0;
        }
        self.successes as f64 / self.attempts as f64
    }

    /// Mean latency per attempt, rounded to the nearest nanosecond, or
    /// [`Duration::ZERO`] when nothing was attempted.
    ///
    /// The mean is computed in integer nanoseconds rather than via `f64`
    /// seconds: the float round trip silently drops nanoseconds once the
    /// total grows large and makes `Duration::from_secs_f64` panic when the
    /// total is close to `Duration::MAX`.
    pub fn average_latency(&self) -> Duration {
        if self.attempts == 0 {
            return Duration::ZERO;
        }
        const NANOS_PER_SEC: u128 = 1_000_000_000;
        let divisor = u128::from(self.attempts);
        // Adding half the divisor before dividing rounds to nearest (ties up).
        let mean_nanos = (self.total_latency.as_nanos() + divisor / 2) / divisor;
        // The mean never exceeds the total, so the seconds part fits in `u64`;
        // the remainder is below one billion and fits in `u32`.
        Duration::new(
            (mean_nanos / NANOS_PER_SEC) as u64,
            (mean_nanos % NANOS_PER_SEC) as u32,
        )
    }
}
284
285#[derive(Clone, Debug, Default, PartialEq, Eq)]
286pub struct CaptureMetrics {
287    pub total_captures: u64,
288    pub successes: u64,
289    pub failures: u64,
290    pub timed_out: u64,
291    pub cancelled: u64,
292    pub total_latency: Duration,
293    methods: Vec<(CaptureMethod, MethodMetrics)>,
294}
295
296impl CaptureMetrics {
297    pub fn record_outcome(&mut self, outcome: &CaptureOutcome) {
298        self.total_captures += 1;
299        match outcome {
300            CaptureOutcome::Success(success) => {
301                self.successes += 1;
302                if let Some(trace) = &success.trace {
303                    self.total_latency += trace.total_elapsed;
304                    self.record_trace_events(&trace.events);
305                }
306            }
307            CaptureOutcome::Failure(failure) => {
308                self.failures += 1;
309                if failure.status == CaptureStatus::TimedOut {
310                    self.timed_out += 1;
311                }
312                if failure.status == CaptureStatus::Cancelled {
313                    self.cancelled += 1;
314                }
315                if let Some(trace) = &failure.trace {
316                    self.total_latency += trace.total_elapsed;
317                    self.record_trace_events(&trace.events);
318                }
319            }
320        }
321    }
322
323    pub fn overall_success_rate(&self) -> f64 {
324        if self.total_captures == 0 {
325            return 0.0;
326        }
327        self.successes as f64 / self.total_captures as f64
328    }
329
330    pub fn average_latency(&self) -> Duration {
331        if self.total_captures == 0 {
332            return Duration::ZERO;
333        }
334        Duration::from_secs_f64(self.total_latency.as_secs_f64() / self.total_captures as f64)
335    }
336
337    pub fn method_metrics(&self, method: CaptureMethod) -> Option<&MethodMetrics> {
338        self.methods
339            .iter()
340            .find_map(|(candidate, metrics)| (*candidate == method).then_some(metrics))
341    }
342
343    fn record_trace_events(&mut self, events: &[TraceEvent]) {
344        for event in events {
345            match event {
346                TraceEvent::MethodFinished { method, elapsed } => {
347                    let metrics = self.metrics_mut(*method);
348                    metrics.attempts += 1;
349                    metrics.total_latency += *elapsed;
350                }
351                TraceEvent::MethodSucceeded(method) => {
352                    self.metrics_mut(*method).successes += 1;
353                }
354                TraceEvent::MethodReturnedEmpty(method) => {
355                    self.metrics_mut(*method).empty_results += 1;
356                }
357                TraceEvent::MethodFailed { method, .. } => {
358                    self.metrics_mut(*method).failures += 1;
359                }
360                _ => {}
361            }
362        }
363    }
364
365    fn metrics_mut(&mut self, method: CaptureMethod) -> &mut MethodMetrics {
366        if let Some(index) = self
367            .methods
368            .iter()
369            .position(|(candidate, _)| *candidate == method)
370        {
371            return &mut self.methods[index].1;
372        }
373        self.methods.push((method, MethodMetrics::default()));
374        let index = self.methods.len() - 1;
375        &mut self.methods[index].1
376    }
377}