use super::storage::UsesWriter;
use super::types::{UseEvent, UseEventType};
use chrono::Utc;
use crossbeam_channel::{Sender, TrySendError, bounded};
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Instant;
const CHANNEL_CAPACITY: usize = 1000;
#[derive(Clone)]
pub struct UsesCollector {
sender: Sender<UseEvent>,
enabled: Arc<AtomicBool>,
dropped_events: Arc<AtomicU64>,
}
impl UsesCollector {
#[must_use]
pub fn new(uses_dir: &Path, enabled: bool) -> Self {
let (sender, receiver) = bounded(CHANNEL_CAPACITY);
let writer = UsesWriter::new(uses_dir.to_path_buf());
std::thread::spawn(move || writer.run(&receiver));
Self {
sender,
enabled: Arc::new(AtomicBool::new(enabled)),
dropped_events: Arc::new(AtomicU64::new(0)),
}
}
#[cfg(test)]
#[must_use]
pub fn new_test() -> Self {
let (sender, receiver) = bounded(CHANNEL_CAPACITY);
std::thread::spawn(move || {
while receiver.recv().is_ok() {
}
});
Self {
sender,
enabled: Arc::new(AtomicBool::new(true)),
dropped_events: Arc::new(AtomicU64::new(0)),
}
}
#[must_use]
pub fn disabled() -> Self {
let (sender, _receiver) = bounded(1);
Self {
sender,
enabled: Arc::new(AtomicBool::new(false)),
dropped_events: Arc::new(AtomicU64::new(0)),
}
}
pub fn record(&self, event: UseEvent) {
if self.enabled.load(Ordering::Relaxed)
&& let Err(TrySendError::Full(_)) = self.sender.try_send(event)
{
self.dropped_events.fetch_add(1, Ordering::Relaxed);
}
}
pub fn record_event(&self, event_type: UseEventType) {
self.record(UseEvent::new(event_type));
}
#[must_use]
pub fn timed(&self, event_type: UseEventType) -> CollectorTimedUse<'_> {
CollectorTimedUse::new(self, event_type)
}
pub fn set_enabled(&self, enabled: bool) {
self.enabled.store(enabled, Ordering::Relaxed);
}
#[must_use]
pub fn is_enabled(&self) -> bool {
self.enabled.load(Ordering::Relaxed)
}
#[must_use]
pub fn dropped_count(&self) -> u64 {
self.dropped_events.load(Ordering::Relaxed)
}
pub fn reset_dropped_count(&self) {
self.dropped_events.store(0, Ordering::Relaxed);
}
}
pub struct CollectorTimedUse<'a> {
collector: &'a UsesCollector,
event_type: Option<UseEventType>,
start: Instant,
}
impl<'a> CollectorTimedUse<'a> {
fn new(collector: &'a UsesCollector, event_type: UseEventType) -> Self {
Self {
collector,
event_type: Some(event_type),
start: Instant::now(),
}
}
pub fn cancel(&mut self) {
self.event_type = None;
}
pub fn complete_with(mut self, event_type: UseEventType) {
self.event_type = Some(event_type);
}
}
impl Drop for CollectorTimedUse<'_> {
fn drop(&mut self) {
if let Some(event_type) = self.event_type.take() {
let duration_ms = u64::try_from(self.start.elapsed().as_millis()).unwrap_or(u64::MAX);
let event = UseEvent {
timestamp: Utc::now(),
event_type,
duration_ms: Some(duration_ms),
};
self.collector.record(event);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::uses::types::QueryKind;
use std::time::Duration;
#[test]
fn test_collector_fire_and_forget() {
let collector = UsesCollector::new_test();
let start = Instant::now();
for _ in 0..1000 {
collector.record(UseEvent::new(UseEventType::QueryExecuted {
kind: QueryKind::CallChain,
result_count: 42,
}));
}
let elapsed = start.elapsed();
assert!(
elapsed < Duration::from_millis(100),
"Recording 1000 events took {elapsed:?}"
);
}
#[test]
fn test_collector_disabled() {
let collector = UsesCollector::disabled();
collector.record(UseEvent::new(UseEventType::QueryExecuted {
kind: QueryKind::CallChain,
result_count: 42,
}));
assert!(!collector.is_enabled());
}
#[test]
fn test_collector_respects_disabled() {
let collector = UsesCollector::new_test();
collector.set_enabled(false);
collector.record(UseEvent::new(UseEventType::QueryExecuted {
kind: QueryKind::CallChain,
result_count: 42,
}));
collector.set_enabled(true);
assert!(collector.is_enabled());
}
#[test]
fn test_timed_use_records_duration() {
let collector = UsesCollector::new_test();
{
let _timer = collector.timed(UseEventType::QueryExecuted {
kind: QueryKind::SymbolLookup,
result_count: 0,
});
std::thread::sleep(Duration::from_millis(10));
}
}
#[test]
fn test_timed_use_cancel() {
let collector = UsesCollector::new_test();
{
let mut timer = collector.timed(UseEventType::QueryExecuted {
kind: QueryKind::SymbolLookup,
result_count: 0,
});
timer.cancel();
}
}
#[test]
fn test_dropped_count() {
let collector = UsesCollector::disabled();
assert_eq!(collector.dropped_count(), 0);
collector.reset_dropped_count();
assert_eq!(collector.dropped_count(), 0);
}
#[test]
fn test_record_event_convenience() {
let collector = UsesCollector::new_test();
collector.record_event(UseEventType::QueryExecuted {
kind: QueryKind::CallChain,
result_count: 42,
});
}
}