apcore/observability/
usage_exporter.rs1use std::sync::Arc;
19use std::time::Duration;
20
21use async_trait::async_trait;
22use serde_json::Value;
23use tokio::sync::Mutex;
24use tokio::task::JoinHandle;
25
26use crate::errors::ModuleError;
27use crate::observability::usage::UsageCollector;
28
29#[async_trait]
36pub trait UsageExporter: Send + Sync {
37 async fn export(&self, summary: &Value) -> Result<(), ModuleError>;
40
41 async fn shutdown(&self) -> Result<(), ModuleError>;
44}
45
/// Exporter that accepts every summary and does nothing with it.
///
/// Being a zero-sized unit struct, it is trivially `Copy` and `Default`;
/// `Debug` is derived so the type can appear in diagnostics and in
/// `#[derive(Debug)]` containers.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopUsageExporter;
49
50#[async_trait]
51impl UsageExporter for NoopUsageExporter {
52 async fn export(&self, _summary: &Value) -> Result<(), ModuleError> {
53 Ok(())
54 }
55
56 async fn shutdown(&self) -> Result<(), ModuleError> {
57 Ok(())
58 }
59}
60
61pub struct PeriodicUsageExporter {
64 collector: Arc<UsageCollector>,
65 exporter: Arc<dyn UsageExporter>,
66 interval: Duration,
67 handle: Mutex<Option<JoinHandle<()>>>,
68}
69
70impl PeriodicUsageExporter {
71 #[must_use]
74 pub fn new(
75 collector: Arc<UsageCollector>,
76 exporter: Arc<dyn UsageExporter>,
77 interval: Duration,
78 ) -> Self {
79 Self {
80 collector,
81 exporter,
82 interval,
83 handle: Mutex::new(None),
84 }
85 }
86
87 pub async fn start(&self) {
90 let mut guard = self.handle.lock().await;
91 if guard.is_some() {
92 return;
93 }
94 let collector = self.collector.clone();
95 let exporter = self.exporter.clone();
96 let interval = self.interval;
97 let handle = tokio::spawn(async move {
98 let mut ticker = tokio::time::interval(interval);
99 ticker.tick().await;
103 loop {
104 ticker.tick().await;
105 let summaries = collector.get_all_summaries();
106 let payload = serde_json::to_value(&summaries).unwrap_or(Value::Null);
107 if let Err(err) = exporter.export(&payload).await {
108 tracing::warn!(error = %err, "UsageExporter.export failed; continuing");
109 }
110 }
111 });
112 *guard = Some(handle);
113 }
114
115 pub async fn stop(&self) {
118 let handle = {
119 let mut guard = self.handle.lock().await;
120 guard.take()
121 };
122 if let Some(handle) = handle {
123 handle.abort();
124 let _ = handle.await;
126 }
127 if let Err(err) = self.exporter.shutdown().await {
128 tracing::warn!(error = %err, "UsageExporter.shutdown failed");
129 }
130 }
131}
132
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Test double that tallies how often each trait method is invoked.
    #[derive(Default)]
    struct Counter {
        exports: AtomicUsize,
        shutdowns: AtomicUsize,
    }

    #[async_trait]
    impl UsageExporter for Counter {
        async fn export(&self, _summary: &Value) -> Result<(), ModuleError> {
            self.exports.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        async fn shutdown(&self) -> Result<(), ModuleError> {
            self.shutdowns.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    /// Builds a driver with a 25 ms period wired to a fresh `Counter`, and
    /// returns the counter alongside it so tests can inspect the tallies.
    fn counting_driver() -> (Arc<Counter>, PeriodicUsageExporter) {
        let collector = Arc::new(UsageCollector::new());
        let counter = Arc::new(Counter::default());
        let exporter: Arc<dyn UsageExporter> = counter.clone();
        let driver = PeriodicUsageExporter::new(collector, exporter, Duration::from_millis(25));
        (counter, driver)
    }

    #[tokio::test]
    async fn noop_exporter_returns_ok() {
        let noop = NoopUsageExporter;
        noop.export(&Value::Null).await.unwrap();
        noop.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn periodic_driver_calls_exporter() {
        let (counter, driver) = counting_driver();

        driver.start().await;
        // Long enough for several 25 ms periods to elapse.
        tokio::time::sleep(Duration::from_millis(120)).await;
        driver.stop().await;

        assert!(counter.exports.load(Ordering::SeqCst) >= 2);
        assert_eq!(counter.shutdowns.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn periodic_driver_double_start_is_idempotent() {
        let (_counter, driver) = counting_driver();

        driver.start().await;
        driver.start().await;
        driver.stop().await;
    }
}