sqry_core/uses/collector.rs
1//! Uses collector - fire-and-forget event capture
2//!
3//! This module provides the main entry point for recording use events.
4//! Events are captured via a bounded channel and written to disk by a
5//! background thread, ensuring zero blocking on the main execution path.
6//!
7//! # Design
8//!
9//! - **Bounded channel** (1000 events) prevents unbounded memory growth
10//! - **`try_send` semantics** - drops events when saturated, never blocks
11//! - **Dropped event counter** - tracked for diagnostics visibility
12//! - **Fire-and-forget** - `record()` returns immediately
13//! - **Background thread** - handles disk I/O asynchronously
14//!
15//! # Usage
16//!
//! ```rust,ignore
//! use sqry_core::uses::{UsesCollector, UseEvent, UseEventType, QueryKind};
//!
//! // Create collector (typically done once at startup)
//! let collector = UsesCollector::new(&uses_dir, true);
//!
//! // Record events (non-blocking)
//! collector.record(UseEvent::new(UseEventType::QueryExecuted {
//!     kind: QueryKind::CallChain,
//!     result_count: 42,
//! }));
//!
//! // Use RAII timer for automatic duration capture
//! {
//!     let _timer = collector.timed(UseEventType::QueryExecuted {
//!         kind: QueryKind::SymbolLookup,
//!         result_count: 0, // Placeholder; use `complete_with` to record the real count
//!     });
//!     // ... do work ...
//! } // Event recorded on drop with duration
//! ```
38
39use super::storage::UsesWriter;
40use super::types::{UseEvent, UseEventType};
41use chrono::Utc;
42use crossbeam_channel::{Sender, TrySendError, bounded};
43use std::path::Path;
44use std::sync::Arc;
45use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
46use std::time::Instant;
47
/// Number of slots in the bounded event channel.
///
/// Bounding the queue caps memory use while still leaving room for bursts.
/// Once every slot is occupied, further events are discarded and counted in
/// `dropped_events`.
const CHANNEL_CAPACITY: usize = 1_000;
53
/// Global uses collector - fire-and-forget event capture
///
/// Uses a bounded channel to prevent unbounded memory growth.
/// Events are written to disk by a background thread.
///
/// The type is `Clone`; the `enabled` flag and the dropped-event counter are
/// held behind `Arc`s, so every clone observes and updates the same state.
#[derive(Clone)]
pub struct UsesCollector {
    /// Sending half of the bounded event channel.
    sender: Sender<UseEvent>,
    /// Runtime on/off switch for recording, shared between clones.
    enabled: Arc<AtomicBool>,
    /// Count of events discarded because the channel was full, shared between clones.
    dropped_events: Arc<AtomicU64>,
}
64
65impl UsesCollector {
66 /// Create a new uses collector
67 ///
68 /// Spawns a background thread that writes events to disk.
69 ///
70 /// # Arguments
71 ///
72 /// * `uses_dir` - Directory to store event logs (e.g., `~/.sqry/uses/`)
73 /// * `enabled` - Whether to actually record events
74 ///
75 /// # Returns
76 ///
77 /// A new `UsesCollector` instance. The background writer thread is
78 /// automatically spawned and will run until the collector is dropped.
79 #[must_use]
80 pub fn new(uses_dir: &Path, enabled: bool) -> Self {
81 let (sender, receiver) = bounded(CHANNEL_CAPACITY);
82
83 // Spawn background writer thread
84 let writer = UsesWriter::new(uses_dir.to_path_buf());
85 std::thread::spawn(move || writer.run(&receiver));
86
87 Self {
88 sender,
89 enabled: Arc::new(AtomicBool::new(enabled)),
90 dropped_events: Arc::new(AtomicU64::new(0)),
91 }
92 }
93
94 /// Create a collector for testing that discards events
95 ///
96 /// The test collector still tracks dropped events but doesn't write to disk.
97 #[cfg(test)]
98 #[must_use]
99 pub fn new_test() -> Self {
100 let (sender, receiver) = bounded(CHANNEL_CAPACITY);
101
102 // Spawn a thread that just drains the receiver
103 std::thread::spawn(move || {
104 while receiver.recv().is_ok() {
105 // Discard events
106 }
107 });
108
109 Self {
110 sender,
111 enabled: Arc::new(AtomicBool::new(true)),
112 dropped_events: Arc::new(AtomicU64::new(0)),
113 }
114 }
115
116 /// Create a disabled collector that doesn't record anything
117 #[must_use]
118 pub fn disabled() -> Self {
119 let (sender, _receiver) = bounded(1);
120
121 Self {
122 sender,
123 enabled: Arc::new(AtomicBool::new(false)),
124 dropped_events: Arc::new(AtomicU64::new(0)),
125 }
126 }
127
128 /// Record a use event (non-blocking, fire-and-forget)
129 ///
130 /// Uses `try_send` - drops events if queue is full (backpressure).
131 /// Disconnected errors are also ignored for graceful shutdown.
132 ///
133 /// # Arguments
134 ///
135 /// * `event` - The event to record
136 pub fn record(&self, event: UseEvent) {
137 if self.enabled.load(Ordering::Relaxed)
138 && let Err(TrySendError::Full(_)) = self.sender.try_send(event)
139 {
140 self.dropped_events.fetch_add(1, Ordering::Relaxed);
141 }
142 // Disconnected errors also ignored - graceful shutdown
143 }
144
145 /// Record an event of a specific type with the current timestamp
146 ///
147 /// Convenience method that creates a `UseEvent` and records it.
148 pub fn record_event(&self, event_type: UseEventType) {
149 self.record(UseEvent::new(event_type));
150 }
151
152 /// Create a scoped timer that auto-records duration on drop
153 ///
154 /// Returns a `CollectorTimedUse` that, when dropped, records the event
155 /// with the elapsed duration.
156 ///
157 /// # Arguments
158 ///
159 /// * `event_type` - The event type to record (duration will be added)
160 ///
161 /// # Returns
162 ///
163 /// A timer that records the event on drop
164 #[must_use]
165 pub fn timed(&self, event_type: UseEventType) -> CollectorTimedUse<'_> {
166 CollectorTimedUse::new(self, event_type)
167 }
168
169 /// Enable or disable event recording
170 pub fn set_enabled(&self, enabled: bool) {
171 self.enabled.store(enabled, Ordering::Relaxed);
172 }
173
174 /// Check if recording is enabled
175 #[must_use]
176 pub fn is_enabled(&self) -> bool {
177 self.enabled.load(Ordering::Relaxed)
178 }
179
180 /// Get count of dropped events (for diagnostics)
181 ///
182 /// Events are dropped when the queue is full (backpressure).
183 /// A high count may indicate performance issues.
184 #[must_use]
185 pub fn dropped_count(&self) -> u64 {
186 self.dropped_events.load(Ordering::Relaxed)
187 }
188
189 /// Reset the dropped events counter
190 pub fn reset_dropped_count(&self) {
191 self.dropped_events.store(0, Ordering::Relaxed);
192 }
193}
194
/// RAII timer for collector - records event with duration on drop
///
/// Created via `UsesCollector::timed()`. When dropped, records the event
/// with the elapsed duration since creation.
pub struct CollectorTimedUse<'a> {
    /// Collector that receives the finished event on drop.
    collector: &'a UsesCollector,
    /// Pending event payload; `None` after `cancel()` or once it has been
    /// taken by `drop`, in which case nothing is recorded.
    event_type: Option<UseEventType>,
    /// Instant captured when the timer was created; the duration is measured from here.
    start: Instant,
}
204
205impl<'a> CollectorTimedUse<'a> {
206 /// Create a new timer for the given collector and event type
207 fn new(collector: &'a UsesCollector, event_type: UseEventType) -> Self {
208 Self {
209 collector,
210 event_type: Some(event_type),
211 start: Instant::now(),
212 }
213 }
214
215 /// Cancel the timer without recording an event
216 ///
217 /// Use this when an operation fails and you don't want to record it.
218 pub fn cancel(&mut self) {
219 self.event_type = None;
220 }
221
222 /// Complete the timer with an updated event type
223 ///
224 /// Use this when you need to update the event type after the operation
225 /// completes (e.g., to update result count).
226 pub fn complete_with(mut self, event_type: UseEventType) {
227 self.event_type = Some(event_type);
228 // Drop will record the event
229 }
230}
231
232impl Drop for CollectorTimedUse<'_> {
233 fn drop(&mut self) {
234 if let Some(event_type) = self.event_type.take() {
235 let duration_ms = u64::try_from(self.start.elapsed().as_millis()).unwrap_or(u64::MAX);
236 let event = UseEvent {
237 timestamp: Utc::now(),
238 event_type,
239 duration_ms: Some(duration_ms),
240 };
241 self.collector.record(event);
242 }
243 }
244}
245
246// ============================================================================
247// Tests
248// ============================================================================
249
#[cfg(test)]
mod tests {
    use super::*;
    use crate::uses::types::QueryKind;
    use crossbeam_channel::Receiver;
    use std::time::Duration;

    /// Build an enabled collector whose receiving half is returned to the
    /// test instead of being drained by a background thread.
    ///
    /// Holding the receiver lets a test inspect exactly what was queued and,
    /// with a small `capacity`, saturate the channel deterministically.
    fn probe_collector(capacity: usize) -> (UsesCollector, Receiver<UseEvent>) {
        let (sender, receiver) = bounded(capacity);
        let collector = UsesCollector {
            sender,
            enabled: Arc::new(AtomicBool::new(true)),
            dropped_events: Arc::new(AtomicU64::new(0)),
        };
        (collector, receiver)
    }

    /// A representative event payload used by several tests.
    fn sample_event_type() -> UseEventType {
        UseEventType::QueryExecuted {
            kind: QueryKind::CallChain,
            result_count: 42,
        }
    }

    #[test]
    fn test_collector_fire_and_forget() {
        let collector = UsesCollector::new_test();
        let start = Instant::now();

        for _ in 0..1000 {
            collector.record(UseEvent::new(UseEventType::QueryExecuted {
                kind: QueryKind::CallChain,
                result_count: 42,
            }));
        }

        let elapsed = start.elapsed();
        // 1000 events should complete in well under 100ms
        assert!(
            elapsed < Duration::from_millis(100),
            "Recording 1000 events took {elapsed:?}"
        );
    }

    #[test]
    fn test_collector_disabled() {
        let collector = UsesCollector::disabled();

        // Should not panic or block
        collector.record(UseEvent::new(sample_event_type()));

        assert!(!collector.is_enabled());
        // A switched-off collector never touches the channel, so nothing is
        // counted as dropped either.
        assert_eq!(collector.dropped_count(), 0);
    }

    #[test]
    fn test_collector_respects_disabled() {
        let (collector, receiver) = probe_collector(4);
        collector.set_enabled(false);
        assert!(!collector.is_enabled());

        // Nothing may reach the channel while recording is switched off
        collector.record(UseEvent::new(sample_event_type()));
        assert!(receiver.try_recv().is_err());
        assert_eq!(collector.dropped_count(), 0);

        // Re-enabling resumes delivery
        collector.set_enabled(true);
        assert!(collector.is_enabled());
        collector.record(UseEvent::new(sample_event_type()));
        assert!(receiver.try_recv().is_ok());
    }

    #[test]
    fn test_timed_use_records_duration() {
        let (collector, receiver) = probe_collector(1);

        {
            let _timer = collector.timed(UseEventType::QueryExecuted {
                kind: QueryKind::SymbolLookup,
                result_count: 0,
            });
            std::thread::sleep(Duration::from_millis(10));
        }

        let event = receiver
            .try_recv()
            .expect("dropping the timer should queue one event");
        assert!(
            event.duration_ms.is_some_and(|ms| ms >= 10),
            "unexpected duration: {:?}",
            event.duration_ms
        );
        // Exactly one event, not more
        assert!(receiver.try_recv().is_err());
    }

    #[test]
    fn test_timed_use_cancel() {
        let (collector, receiver) = probe_collector(1);

        {
            let mut timer = collector.timed(UseEventType::QueryExecuted {
                kind: QueryKind::SymbolLookup,
                result_count: 0,
            });
            timer.cancel();
        }

        // A cancelled timer must not queue anything
        assert!(receiver.try_recv().is_err());
        assert_eq!(collector.dropped_count(), 0);
    }

    #[test]
    fn test_timed_use_complete_with() {
        let (collector, receiver) = probe_collector(1);

        let timer = collector.timed(UseEventType::QueryExecuted {
            kind: QueryKind::SymbolLookup,
            result_count: 0,
        });
        timer.complete_with(UseEventType::QueryExecuted {
            kind: QueryKind::SymbolLookup,
            result_count: 7,
        });

        let event = receiver
            .try_recv()
            .expect("complete_with should queue one event");
        // The replacement payload is recorded, not the placeholder
        assert!(matches!(
            event.event_type,
            UseEventType::QueryExecuted {
                result_count: 7,
                ..
            }
        ));
        assert!(event.duration_ms.is_some());
        assert!(receiver.try_recv().is_err());
    }

    #[test]
    fn test_dropped_count() {
        let collector = UsesCollector::disabled();

        assert_eq!(collector.dropped_count(), 0);
        collector.reset_dropped_count();
        assert_eq!(collector.dropped_count(), 0);
    }

    #[test]
    fn test_dropped_count_on_saturation() {
        // Capacity 1 and an undrained receiver: the first event fits, every
        // later one hits a full queue.
        let (collector, _receiver) = probe_collector(1);

        for _ in 0..3 {
            collector.record(UseEvent::new(sample_event_type()));
        }
        assert_eq!(collector.dropped_count(), 2);

        collector.reset_dropped_count();
        assert_eq!(collector.dropped_count(), 0);
    }

    #[test]
    fn test_record_event_convenience() {
        let collector = UsesCollector::new_test();

        collector.record_event(UseEventType::QueryExecuted {
            kind: QueryKind::CallChain,
            result_count: 42,
        });

        // Should not panic
    }
}