Skip to main content

sqry_core/uses/
collector.rs

1//! Uses collector - fire-and-forget event capture
2//!
3//! This module provides the main entry point for recording use events.
4//! Events are captured via a bounded channel and written to disk by a
5//! background thread, ensuring zero blocking on the main execution path.
6//!
7//! # Design
8//!
9//! - **Bounded channel** (1000 events) prevents unbounded memory growth
10//! - **`try_send` semantics** - drops events when saturated, never blocks
11//! - **Dropped event counter** - tracked for diagnostics visibility
12//! - **Fire-and-forget** - `record()` returns immediately
13//! - **Background thread** - handles disk I/O asynchronously
14//!
15//! # Usage
16//!
17//! ```rust,ignore
18//! use sqry_core::uses::{UsesCollector, UseEvent, UseEventType, QueryKind};
19//!
20//! // Create collector (typically done once at startup)
21//! let collector = UsesCollector::new(&uses_dir, true);
22//!
23//! // Record events (non-blocking)
24//! collector.record(UseEvent::new(UseEventType::QueryExecuted {
25//!     kind: QueryKind::CallChain,
26//!     result_count: 42,
27//! }));
28//!
29//! // Use RAII timer for automatic duration capture
30//! {
31//!     let _timer = collector.timed(UseEventType::QueryExecuted {
32//!         kind: QueryKind::SymbolLookup,
33//!         result_count: 0,  // Will be updated
34//!     });
35//!     // ... do work ...
36//! } // Event recorded on drop with duration
37//! ```
38
39use super::storage::UsesWriter;
40use super::types::{UseEvent, UseEventType};
41use chrono::Utc;
42use crossbeam_channel::{Sender, TrySendError, bounded};
43use std::path::Path;
44use std::sync::Arc;
45use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
46use std::time::Instant;
47
/// Capacity of the bounded event channel
///
/// This limits memory usage while providing headroom for bursts.
/// If the queue fills up, events are dropped (tracked in `dropped_events`).
///
/// Used for the channels created by `UsesCollector::new` and the test-only
/// `UsesCollector::new_test`; `UsesCollector::disabled` creates its own
/// channel with a capacity of 1 instead.
const CHANNEL_CAPACITY: usize = 1000;
53
/// Global uses collector - fire-and-forget event capture
///
/// Uses a bounded channel to prevent unbounded memory growth.
/// Events are written to disk by a background thread.
///
/// The type is `Clone`: every clone holds another handle to the same channel
/// and shares the same `enabled` flag and dropped-event counter through the
/// `Arc`s below, so toggling or resetting via one clone is visible to all.
#[derive(Clone)]
pub struct UsesCollector {
    /// Producer side of the bounded event channel; `record()` pushes onto it
    /// with `try_send`.
    sender: Sender<UseEvent>,
    /// Shared on/off switch; `record()` does nothing while this is `false`.
    enabled: Arc<AtomicBool>,
    /// Shared count of events rejected because the channel was full.
    dropped_events: Arc<AtomicU64>,
}
64
65impl UsesCollector {
66    /// Create a new uses collector
67    ///
68    /// Spawns a background thread that writes events to disk.
69    ///
70    /// # Arguments
71    ///
72    /// * `uses_dir` - Directory to store event logs (e.g., `~/.sqry/uses/`)
73    /// * `enabled` - Whether to actually record events
74    ///
75    /// # Returns
76    ///
77    /// A new `UsesCollector` instance. The background writer thread is
78    /// automatically spawned and will run until the collector is dropped.
79    #[must_use]
80    pub fn new(uses_dir: &Path, enabled: bool) -> Self {
81        let (sender, receiver) = bounded(CHANNEL_CAPACITY);
82
83        // Spawn background writer thread
84        let writer = UsesWriter::new(uses_dir.to_path_buf());
85        std::thread::spawn(move || writer.run(&receiver));
86
87        Self {
88            sender,
89            enabled: Arc::new(AtomicBool::new(enabled)),
90            dropped_events: Arc::new(AtomicU64::new(0)),
91        }
92    }
93
94    /// Create a collector for testing that discards events
95    ///
96    /// The test collector still tracks dropped events but doesn't write to disk.
97    #[cfg(test)]
98    #[must_use]
99    pub fn new_test() -> Self {
100        let (sender, receiver) = bounded(CHANNEL_CAPACITY);
101
102        // Spawn a thread that just drains the receiver
103        std::thread::spawn(move || {
104            while receiver.recv().is_ok() {
105                // Discard events
106            }
107        });
108
109        Self {
110            sender,
111            enabled: Arc::new(AtomicBool::new(true)),
112            dropped_events: Arc::new(AtomicU64::new(0)),
113        }
114    }
115
116    /// Create a disabled collector that doesn't record anything
117    #[must_use]
118    pub fn disabled() -> Self {
119        let (sender, _receiver) = bounded(1);
120
121        Self {
122            sender,
123            enabled: Arc::new(AtomicBool::new(false)),
124            dropped_events: Arc::new(AtomicU64::new(0)),
125        }
126    }
127
128    /// Record a use event (non-blocking, fire-and-forget)
129    ///
130    /// Uses `try_send` - drops events if queue is full (backpressure).
131    /// Disconnected errors are also ignored for graceful shutdown.
132    ///
133    /// # Arguments
134    ///
135    /// * `event` - The event to record
136    pub fn record(&self, event: UseEvent) {
137        if self.enabled.load(Ordering::Relaxed)
138            && let Err(TrySendError::Full(_)) = self.sender.try_send(event)
139        {
140            self.dropped_events.fetch_add(1, Ordering::Relaxed);
141        }
142        // Disconnected errors also ignored - graceful shutdown
143    }
144
145    /// Record an event of a specific type with the current timestamp
146    ///
147    /// Convenience method that creates a `UseEvent` and records it.
148    pub fn record_event(&self, event_type: UseEventType) {
149        self.record(UseEvent::new(event_type));
150    }
151
152    /// Create a scoped timer that auto-records duration on drop
153    ///
154    /// Returns a `CollectorTimedUse` that, when dropped, records the event
155    /// with the elapsed duration.
156    ///
157    /// # Arguments
158    ///
159    /// * `event_type` - The event type to record (duration will be added)
160    ///
161    /// # Returns
162    ///
163    /// A timer that records the event on drop
164    #[must_use]
165    pub fn timed(&self, event_type: UseEventType) -> CollectorTimedUse<'_> {
166        CollectorTimedUse::new(self, event_type)
167    }
168
169    /// Enable or disable event recording
170    pub fn set_enabled(&self, enabled: bool) {
171        self.enabled.store(enabled, Ordering::Relaxed);
172    }
173
174    /// Check if recording is enabled
175    #[must_use]
176    pub fn is_enabled(&self) -> bool {
177        self.enabled.load(Ordering::Relaxed)
178    }
179
180    /// Get count of dropped events (for diagnostics)
181    ///
182    /// Events are dropped when the queue is full (backpressure).
183    /// A high count may indicate performance issues.
184    #[must_use]
185    pub fn dropped_count(&self) -> u64 {
186        self.dropped_events.load(Ordering::Relaxed)
187    }
188
189    /// Reset the dropped events counter
190    pub fn reset_dropped_count(&self) {
191        self.dropped_events.store(0, Ordering::Relaxed);
192    }
193}
194
/// RAII timer for collector - records event with duration on drop
///
/// Created via `UsesCollector::timed()`. When dropped, records the event
/// with the elapsed duration since creation.
pub struct CollectorTimedUse<'a> {
    /// Collector the event is recorded on when the timer is dropped.
    collector: &'a UsesCollector,
    /// Event type still waiting to be recorded; `None` once the timer has
    /// been cancelled or the event has been taken by `drop`.
    event_type: Option<UseEventType>,
    /// Instant captured when the timer was created.
    start: Instant,
}
204
205impl<'a> CollectorTimedUse<'a> {
206    /// Create a new timer for the given collector and event type
207    fn new(collector: &'a UsesCollector, event_type: UseEventType) -> Self {
208        Self {
209            collector,
210            event_type: Some(event_type),
211            start: Instant::now(),
212        }
213    }
214
215    /// Cancel the timer without recording an event
216    ///
217    /// Use this when an operation fails and you don't want to record it.
218    pub fn cancel(&mut self) {
219        self.event_type = None;
220    }
221
222    /// Complete the timer with an updated event type
223    ///
224    /// Use this when you need to update the event type after the operation
225    /// completes (e.g., to update result count).
226    pub fn complete_with(mut self, event_type: UseEventType) {
227        self.event_type = Some(event_type);
228        // Drop will record the event
229    }
230}
231
232impl Drop for CollectorTimedUse<'_> {
233    fn drop(&mut self) {
234        if let Some(event_type) = self.event_type.take() {
235            let duration_ms = u64::try_from(self.start.elapsed().as_millis()).unwrap_or(u64::MAX);
236            let event = UseEvent {
237                timestamp: Utc::now(),
238                event_type,
239                duration_ms: Some(duration_ms),
240            };
241            self.collector.record(event);
242        }
243    }
244}
245
246// ============================================================================
247// Tests
248// ============================================================================
249
#[cfg(test)]
mod tests {
    use super::*;
    use crate::uses::types::QueryKind;
    use std::time::Duration;

    /// Build a representative event for tests that only care about delivery.
    fn sample_event() -> UseEvent {
        UseEvent::new(UseEventType::QueryExecuted {
            kind: QueryKind::CallChain,
            result_count: 42,
        })
    }

    /// Build an enabled collector on a capacity-1 channel and return it with
    /// the receiver, which the caller keeps alive but never drains so that the
    /// channel saturates deterministically after one event.
    fn saturating_collector() -> (UsesCollector, crossbeam_channel::Receiver<UseEvent>) {
        let (sender, receiver) = bounded(1);
        let collector = UsesCollector {
            sender,
            enabled: Arc::new(AtomicBool::new(true)),
            dropped_events: Arc::new(AtomicU64::new(0)),
        };
        (collector, receiver)
    }

    #[test]
    fn test_collector_fire_and_forget() {
        let collector = UsesCollector::new_test();
        let start = Instant::now();

        for _ in 0..1000 {
            collector.record(UseEvent::new(UseEventType::QueryExecuted {
                kind: QueryKind::CallChain,
                result_count: 42,
            }));
        }

        let elapsed = start.elapsed();
        // 1000 events should complete in well under 100ms
        assert!(
            elapsed < Duration::from_millis(100),
            "Recording 1000 events took {elapsed:?}"
        );
    }

    #[test]
    fn test_collector_disabled() {
        let collector = UsesCollector::disabled();

        // Should not panic or block
        collector.record(UseEvent::new(UseEventType::QueryExecuted {
            kind: QueryKind::CallChain,
            result_count: 42,
        }));

        assert!(!collector.is_enabled());
    }

    #[test]
    fn test_collector_respects_disabled() {
        let collector = UsesCollector::new_test();
        collector.set_enabled(false);
        assert!(!collector.is_enabled());

        // Should not record when disabled
        collector.record(UseEvent::new(UseEventType::QueryExecuted {
            kind: QueryKind::CallChain,
            result_count: 42,
        }));

        // Can re-enable
        collector.set_enabled(true);
        assert!(collector.is_enabled());
    }

    #[test]
    fn test_timed_use_records_duration() {
        let collector = UsesCollector::new_test();

        {
            let _timer = collector.timed(UseEventType::QueryExecuted {
                kind: QueryKind::SymbolLookup,
                result_count: 0,
            });
            std::thread::sleep(Duration::from_millis(10));
        }

        // Event should have been recorded (we can't easily check the duration
        // without more infrastructure, but we verify no panics)
    }

    #[test]
    fn test_timed_use_cancel() {
        let collector = UsesCollector::new_test();

        {
            let mut timer = collector.timed(UseEventType::QueryExecuted {
                kind: QueryKind::SymbolLookup,
                result_count: 0,
            });
            timer.cancel();
        }

        // No event should be recorded (verified by no panics)
    }

    #[test]
    fn test_dropped_count() {
        let collector = UsesCollector::disabled();

        assert_eq!(collector.dropped_count(), 0);
        collector.reset_dropped_count();
        assert_eq!(collector.dropped_count(), 0);
    }

    #[test]
    fn test_dropped_count_increments_when_channel_full() {
        let (collector, _receiver) = saturating_collector();

        // First event fits in the single slot.
        collector.record(sample_event());
        assert_eq!(collector.dropped_count(), 0);

        // Every further event hits `TrySendError::Full`; clones share the
        // same counter.
        collector.record(sample_event());
        collector.clone().record(sample_event());
        assert_eq!(collector.dropped_count(), 2);

        collector.reset_dropped_count();
        assert_eq!(collector.dropped_count(), 0);
    }

    #[test]
    fn test_disabled_collector_does_not_count_drops() {
        let (collector, _receiver) = saturating_collector();
        collector.record(sample_event()); // fill the only slot
        collector.set_enabled(false);

        // While disabled, `record` returns before touching the channel.
        collector.record(sample_event());
        assert_eq!(collector.dropped_count(), 0);
    }

    #[test]
    fn test_disconnected_channel_is_not_counted_as_dropped() {
        // `disabled()` drops its receiver, so the channel is disconnected.
        let collector = UsesCollector::disabled();
        collector.set_enabled(true);

        collector.record(sample_event());
        assert_eq!(collector.dropped_count(), 0);
    }

    #[test]
    fn test_record_event_convenience() {
        let collector = UsesCollector::new_test();

        collector.record_event(UseEventType::QueryExecuted {
            kind: QueryKind::CallChain,
            result_count: 42,
        });

        // Should not panic
    }
}