use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use serde_json::Value;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use crate::errors::ModuleError;
use crate::observability::usage::UsageCollector;
/// Sink for usage summaries.
///
/// Implementations must be `Send + Sync` because the periodic driver in this
/// file shares one exporter (behind an `Arc`) with a spawned tokio task.
#[async_trait]
pub trait UsageExporter: Send + Sync {
/// Exports one usage summary payload, given as a JSON value.
///
/// # Errors
///
/// Returns a [`ModuleError`] when the payload could not be exported.
async fn export(&self, summary: &Value) -> Result<(), ModuleError>;
/// Gives the exporter a chance to release its resources.
///
/// # Errors
///
/// Returns a [`ModuleError`] when shutting down failed.
async fn shutdown(&self) -> Result<(), ModuleError>;
}
/// A [`UsageExporter`] that discards every summary.
///
/// Both `export` and `shutdown` do nothing and always return `Ok(())`.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopUsageExporter;

#[async_trait]
impl UsageExporter for NoopUsageExporter {
    /// Ignores `_summary` and reports success.
    async fn export(&self, _summary: &Value) -> Result<(), ModuleError> {
        Ok(())
    }

    /// Nothing to release; always succeeds.
    async fn shutdown(&self) -> Result<(), ModuleError> {
        Ok(())
    }
}
/// Drives a [`UsageExporter`] on a fixed interval from a background tokio task.
///
/// Each tick, the task serializes `collector.get_all_summaries()` to JSON and
/// passes it to the exporter.
pub struct PeriodicUsageExporter {
// Source of the usage summaries that get exported.
collector: Arc<UsageCollector>,
// Destination the serialized summaries are handed to.
exporter: Arc<dyn UsageExporter>,
// Period between two consecutive exports.
interval: Duration,
// Join handle of the spawned export task; `None` until `start` spawns it
// and again after `stop` takes it.
handle: Mutex<Option<JoinHandle<()>>>,
}
impl PeriodicUsageExporter {
    /// Creates a driver that is not yet running.
    ///
    /// * `collector` - source of the usage summaries.
    /// * `exporter` - sink every serialized summary payload is handed to.
    /// * `interval` - period between exports; must be non-zero for
    ///   [`start`](Self::start) to spawn the export task.
    ///
    /// No task is spawned until [`start`](Self::start) is called.
    #[must_use]
    pub fn new(
        collector: Arc<UsageCollector>,
        exporter: Arc<dyn UsageExporter>,
        interval: Duration,
    ) -> Self {
        Self {
            collector,
            exporter,
            interval,
            handle: Mutex::new(None),
        }
    }

    /// Spawns the background export task if it is not already running.
    ///
    /// Calling `start` while a task is already registered is a no-op. The
    /// first export happens one full `interval` after the task starts. Export
    /// and serialization failures are logged and the loop keeps running.
    ///
    /// A zero `interval` is rejected with a warning and nothing is spawned:
    /// `tokio::time::interval` panics on a zero period, and that panic would
    /// occur inside the spawned task, leaving a dead task that never exports.
    ///
    /// The task runs until [`stop`](Self::stop) aborts it; dropping the driver
    /// without calling `stop` only detaches the task.
    pub async fn start(&self) {
        if self.interval.is_zero() {
            tracing::warn!("PeriodicUsageExporter interval is zero; periodic export not started");
            return;
        }

        // The lock is held until the handle is stored so concurrent `start`
        // calls cannot both spawn a task.
        let mut guard = self.handle.lock().await;
        if guard.is_some() {
            return;
        }

        let collector = Arc::clone(&self.collector);
        let exporter = Arc::clone(&self.exporter);
        let interval = self.interval;

        let handle = tokio::spawn(async move {
            let mut ticker = tokio::time::interval(interval);
            // The first tick of a tokio interval completes immediately;
            // consume it so the first export waits one full period.
            ticker.tick().await;
            loop {
                ticker.tick().await;
                let summaries = collector.get_all_summaries();
                let payload = match serde_json::to_value(&summaries) {
                    Ok(payload) => payload,
                    Err(err) => {
                        // Do not hand a meaningless `Null` payload to the
                        // exporter; report the failure and retry next tick.
                        tracing::warn!(
                            error = %err,
                            "failed to serialize usage summaries; skipping export"
                        );
                        continue;
                    }
                };
                if let Err(err) = exporter.export(&payload).await {
                    tracing::warn!(error = %err, "UsageExporter.export failed; continuing");
                }
            }
        });
        *guard = Some(handle);
    }

    /// Stops the background task (if any) and shuts the exporter down.
    ///
    /// The task is aborted and then awaited, so it has fully terminated
    /// before `shutdown` is invoked on the exporter. `shutdown` is called on
    /// every `stop`, whether or not a task was running; a shutdown error is
    /// logged rather than returned.
    pub async fn stop(&self) {
        // Take the handle inside a short scope so the lock is released before
        // awaiting the task.
        let handle = {
            let mut guard = self.handle.lock().await;
            guard.take()
        };
        if let Some(handle) = handle {
            handle.abort();
            // An aborted task resolves to a cancellation `JoinError`, which is
            // the expected outcome here and deliberately ignored.
            let _ = handle.await;
        }
        if let Err(err) = self.exporter.shutdown().await {
            tracing::warn!(error = %err, "UsageExporter.shutdown failed");
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Test exporter that only counts how often each method is invoked.
    #[derive(Default)]
    struct Counter {
        exports: AtomicUsize,
        shutdowns: AtomicUsize,
    }

    #[async_trait]
    impl UsageExporter for Counter {
        async fn export(&self, _summary: &Value) -> Result<(), ModuleError> {
            self.exports.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        async fn shutdown(&self) -> Result<(), ModuleError> {
            self.shutdowns.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    #[tokio::test]
    async fn noop_exporter_returns_ok() {
        let exporter = NoopUsageExporter;
        exporter.export(&Value::Null).await.unwrap();
        exporter.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn periodic_driver_calls_exporter() {
        let collector = Arc::new(UsageCollector::new());
        let counter = Arc::new(Counter::default());
        let exporter: Arc<dyn UsageExporter> = counter.clone();
        let driver = PeriodicUsageExporter::new(collector, exporter, Duration::from_millis(25));

        driver.start().await;
        // Several 25 ms periods fit into this window; only a loose lower
        // bound is asserted to stay tolerant of scheduling jitter.
        tokio::time::sleep(Duration::from_millis(120)).await;
        driver.stop().await;

        assert!(counter.exports.load(Ordering::SeqCst) >= 2);
        assert_eq!(counter.shutdowns.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn periodic_driver_double_start_is_idempotent() {
        let collector = Arc::new(UsageCollector::new());
        let counter = Arc::new(Counter::default());
        let exporter: Arc<dyn UsageExporter> = counter.clone();
        let driver = PeriodicUsageExporter::new(collector, exporter, Duration::from_millis(25));

        driver.start().await;
        // The second call must be a no-op while a task is already registered.
        driver.start().await;
        driver.stop().await;

        // A single `stop` shuts the exporter down exactly once.
        assert_eq!(counter.shutdowns.load(Ordering::SeqCst), 1);
    }
}