// Source: apcore/observability/usage_exporter.rs

// APCore Protocol — Usage exporter push trait + periodic driver.
// Spec reference: Issue #45 §3 — push-style usage export.
//
// `UsageExporter` is the Rust counterpart of Python's `UsageExporter`
// Protocol and TypeScript's `UsageExporter` interface. The concrete
// `PeriodicUsageExporter` polls a `UsageCollector` on a fixed interval and
// forwards a JSON snapshot of all module summaries to the configured
// exporter implementation.
//
// Behavioural contract (cross-language parity):
//   * `export(summary)` MUST be called exactly once per tick. Implementations
//     SHOULD treat individual export failures as recoverable — returning an
//     error logs a warning but does NOT abort the periodic loop.
//   * `shutdown()` is invoked at most once during `stop()` after the loop has
//     been cancelled, giving exporters a chance to flush in-flight buffers.
//   * `stop()` is idempotent — repeated calls are safe.
17
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use serde_json::Value;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;

use crate::errors::ModuleError;
use crate::observability::usage::UsageCollector;
28
29/// Push-style usage exporter (#45 §3).
30///
31/// Implementations forward periodic JSON snapshots of usage stats to an
32/// external sink (HTTP collector, OpenTelemetry exporter, custom stdout
33/// formatter, …). The `summary` payload mirrors the JSON shape produced by
34/// `serde_json::to_value(&UsageCollector::get_all_summaries())`.
35#[async_trait]
36pub trait UsageExporter: Send + Sync {
37    /// Export a single summary snapshot. Errors are surfaced to callers but
38    /// MUST NOT terminate the periodic driver.
39    async fn export(&self, summary: &Value) -> Result<(), ModuleError>;
40
41    /// Flush any buffered state and release resources. Called once during
42    /// `PeriodicUsageExporter::stop()`.
43    async fn shutdown(&self) -> Result<(), ModuleError>;
44}
45
/// No-op exporter — useful as a default placeholder in tests and bootstrap
/// configurations.
///
/// The type carries no state, so it is free to copy, compare by debug output,
/// and construct through `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopUsageExporter;
49
50#[async_trait]
51impl UsageExporter for NoopUsageExporter {
52    async fn export(&self, _summary: &Value) -> Result<(), ModuleError> {
53        Ok(())
54    }
55
56    async fn shutdown(&self) -> Result<(), ModuleError> {
57        Ok(())
58    }
59}
60
61/// Periodic driver: spawns a background task that polls a `UsageCollector`
62/// and pushes a JSON snapshot to the supplied exporter on each tick.
63pub struct PeriodicUsageExporter {
64    collector: Arc<UsageCollector>,
65    exporter: Arc<dyn UsageExporter>,
66    interval: Duration,
67    handle: Mutex<Option<JoinHandle<()>>>,
68}
69
70impl PeriodicUsageExporter {
71    /// Create a new periodic exporter. The driver is inert until
72    /// [`Self::start`] is called.
73    #[must_use]
74    pub fn new(
75        collector: Arc<UsageCollector>,
76        exporter: Arc<dyn UsageExporter>,
77        interval: Duration,
78    ) -> Self {
79        Self {
80            collector,
81            exporter,
82            interval,
83            handle: Mutex::new(None),
84        }
85    }
86
87    /// Spawn the periodic export task. If the driver is already running this
88    /// is a no-op.
89    pub async fn start(&self) {
90        let mut guard = self.handle.lock().await;
91        if guard.is_some() {
92            return;
93        }
94        let collector = self.collector.clone();
95        let exporter = self.exporter.clone();
96        let interval = self.interval;
97        let handle = tokio::spawn(async move {
98            let mut ticker = tokio::time::interval(interval);
99            // Skip the immediate first tick so callers observe one full
100            // interval between `start()` and the first export — matches
101            // Python's `asyncio.sleep(interval)` loop semantics.
102            ticker.tick().await;
103            loop {
104                ticker.tick().await;
105                let summaries = collector.get_all_summaries();
106                let payload = serde_json::to_value(&summaries).unwrap_or(Value::Null);
107                if let Err(err) = exporter.export(&payload).await {
108                    tracing::warn!(error = %err, "UsageExporter.export failed; continuing");
109                }
110            }
111        });
112        *guard = Some(handle);
113    }
114
115    /// Stop the periodic task and invoke `shutdown()` on the exporter. Safe
116    /// to call multiple times — subsequent calls are no-ops.
117    pub async fn stop(&self) {
118        let handle = {
119            let mut guard = self.handle.lock().await;
120            guard.take()
121        };
122        if let Some(handle) = handle {
123            handle.abort();
124            // Await termination; ignore the JoinError that aborting produces.
125            let _ = handle.await;
126        }
127        if let Err(err) = self.exporter.shutdown().await {
128            tracing::warn!(error = %err, "UsageExporter.shutdown failed");
129        }
130    }
131}
132
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Test double that tallies how often each trait method is invoked.
    #[derive(Default)]
    struct Counter {
        exports: AtomicUsize,
        shutdowns: AtomicUsize,
    }

    #[async_trait]
    impl UsageExporter for Counter {
        async fn export(&self, _snapshot: &Value) -> Result<(), ModuleError> {
            self.exports.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        async fn shutdown(&self) -> Result<(), ModuleError> {
            self.shutdowns.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    /// Builds a driver wired to a fresh collector and a counting exporter,
    /// returning the counter so tests can inspect the tallies.
    fn counting_driver(interval: Duration) -> (PeriodicUsageExporter, Arc<Counter>) {
        let collector = Arc::new(UsageCollector::new());
        let counter = Arc::new(Counter::default());
        let sink: Arc<dyn UsageExporter> = counter.clone();
        (PeriodicUsageExporter::new(collector, sink, interval), counter)
    }

    #[tokio::test]
    async fn noop_exporter_returns_ok() {
        let noop = NoopUsageExporter;
        noop.export(&Value::Null).await.unwrap();
        noop.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn periodic_driver_calls_exporter() {
        let (driver, counter) = counting_driver(Duration::from_millis(25));

        driver.start().await;
        // 120 ms leaves room for several 25 ms ticks after the skipped first one.
        tokio::time::sleep(Duration::from_millis(120)).await;
        driver.stop().await;

        let exported = counter.exports.load(Ordering::SeqCst);
        let shut_down = counter.shutdowns.load(Ordering::SeqCst);
        assert!(exported >= 2);
        assert_eq!(shut_down, 1);
    }

    #[tokio::test]
    async fn periodic_driver_double_start_is_idempotent() {
        let (driver, _counter) = counting_driver(Duration::from_millis(25));

        driver.start().await;
        driver.start().await; // must not spawn a second task
        driver.stop().await;
    }
}