Skip to main content

apcore/observability/
exporters.rs

1// APCore Protocol — Span exporters
2// Spec reference: Built-in span export destinations
3
4use async_trait::async_trait;
5use parking_lot::Mutex;
6use std::collections::VecDeque;
7use std::sync::Arc;
8
9use super::span::{Span, SpanExporter};
10use crate::errors::ModuleError;
11
12/// Exports spans to stdout as JSON.
13#[derive(Debug)]
14pub struct StdoutExporter;
15
16#[async_trait]
17impl SpanExporter for StdoutExporter {
18    async fn export(&self, span: &Span) -> Result<(), ModuleError> {
19        let json = serde_json::to_string(span).map_err(|e| {
20            ModuleError::new(
21                crate::errors::ErrorCode::GeneralInternalError,
22                format!("Failed to serialize span: {e}"),
23            )
24        })?;
25        // Route through tracing so the span line integrates with the
26        // application's tracing-subscriber configuration and does not
27        // bypass log aggregation pipelines.
28        tracing::info!(target: "apcore.span", span = %json);
29        Ok(())
30    }
31
32    async fn shutdown(&self) -> Result<(), ModuleError> {
33        Ok(())
34    }
35}
36
37/// Default maximum spans for `InMemoryExporter`.
38const DEFAULT_MAX_SPANS: usize = 1000;
39
40/// Exports spans to an in-memory buffer for testing.
41#[derive(Debug, Clone)]
42pub struct InMemoryExporter {
43    spans: Arc<Mutex<VecDeque<Span>>>,
44    max_spans: usize,
45}
46
47impl InMemoryExporter {
48    /// Create a new in-memory exporter with default capacity.
49    #[must_use]
50    pub fn new() -> Self {
51        Self {
52            spans: Arc::new(Mutex::new(VecDeque::new())),
53            max_spans: DEFAULT_MAX_SPANS,
54        }
55    }
56
57    /// Create with explicit max spans capacity.
58    #[must_use]
59    pub fn with_max_spans(max_spans: usize) -> Self {
60        Self {
61            spans: Arc::new(Mutex::new(VecDeque::new())),
62            max_spans,
63        }
64    }
65
66    /// Get all exported spans.
67    #[must_use]
68    pub fn get_spans(&self) -> Vec<Span> {
69        self.spans.lock().iter().cloned().collect()
70    }
71
72    /// Clear all exported spans.
73    pub fn clear(&self) {
74        self.spans.lock().clear();
75    }
76}
77
78impl Default for InMemoryExporter {
79    fn default() -> Self {
80        Self::new()
81    }
82}
83
84#[async_trait]
85impl SpanExporter for InMemoryExporter {
86    async fn export(&self, span: &Span) -> Result<(), ModuleError> {
87        let mut spans = self.spans.lock();
88        spans.push_back(span.clone());
89        while spans.len() > self.max_spans {
90            spans.pop_front();
91        }
92        Ok(())
93    }
94
95    async fn shutdown(&self) -> Result<(), ModuleError> {
96        Ok(())
97    }
98}
99
100/// Exports spans to an OTLP-compatible endpoint.
101#[derive(Debug)]
102pub struct OTLPExporter {
103    pub endpoint: String,
104}
105
106impl OTLPExporter {
107    /// Create a new OTLP exporter.
108    pub fn new(endpoint: impl Into<String>) -> Self {
109        Self {
110            endpoint: endpoint.into(),
111        }
112    }
113}
114
115/// Convert a Span to OTLP JSON format.
116#[cfg(feature = "events")]
117fn span_to_otlp(span: &Span) -> serde_json::Value {
118    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
119    // intentional: nanosecond timestamps fit in u64 for dates post-1970
120    let start_ns = (span.start_time * 1_000_000_000.0) as u64;
121    let mut otlp_span = serde_json::json!({
122        "traceId": span.trace_id,
123        "spanId": span.span_id,
124        "name": span.name,
125        "startTimeUnixNano": start_ns,
126        "status": match span.status {
127            super::span::SpanStatus::Ok => serde_json::json!({"code": 1}),
128            super::span::SpanStatus::Error => serde_json::json!({"code": 2}),
129            super::span::SpanStatus::Unset => serde_json::json!({"code": 0}),
130        },
131        "attributes": span.attributes.iter().map(|(k, v)| {
132            serde_json::json!({
133                "key": k,
134                "value": { "stringValue": v.to_string() }
135            })
136        }).collect::<Vec<_>>(),
137    });
138    if let Some(ref parent_id) = span.parent_span_id {
139        otlp_span["parentSpanId"] = serde_json::json!(parent_id);
140    }
141    if let Some(end_time) = span.end_time {
142        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
143        // intentional: nanosecond timestamps fit in u64 for dates post-1970
144        let end_ns = (end_time * 1_000_000_000.0) as u64;
145        otlp_span["endTimeUnixNano"] = serde_json::json!(end_ns);
146    }
147    otlp_span
148}
149
150#[cfg(feature = "events")]
151#[async_trait]
152impl SpanExporter for OTLPExporter {
153    async fn export(&self, span: &Span) -> Result<(), ModuleError> {
154        let client = reqwest::Client::new();
155        let payload = serde_json::json!({
156            "resourceSpans": [{
157                "scopeSpans": [{
158                    "spans": [span_to_otlp(span)]
159                }]
160            }]
161        });
162        client
163            .post(format!("{}/v1/traces", self.endpoint))
164            .header("Content-Type", "application/json")
165            .json(&payload)
166            .send()
167            .await
168            .map_err(|e| {
169                ModuleError::new(
170                    crate::errors::ErrorCode::GeneralInternalError,
171                    format!("OTLP export failed: {e}"),
172                )
173            })?;
174        Ok(())
175    }
176
177    async fn shutdown(&self) -> Result<(), ModuleError> {
178        Ok(())
179    }
180}
181
182#[cfg(not(feature = "events"))]
183#[async_trait]
184impl SpanExporter for OTLPExporter {
185    async fn export(&self, _span: &Span) -> Result<(), ModuleError> {
186        // Without the `events` feature, export is a silent no-op.
187        // No network call is made; spans are discarded.
188        tracing::warn!(
189            "OTLPExporter::export called but the `events` feature is not enabled; span discarded"
190        );
191        Ok(())
192    }
193
194    async fn shutdown(&self) -> Result<(), ModuleError> {
195        Ok(())
196    }
197}
198
199/// Composite span exporter that fan-outs each span to multiple wrapped
200/// exporters with per-exporter error isolation. A failure in one wrapped
201/// exporter is logged and the remaining exporters still receive the span,
202/// matching apcore-python's `_CompositeExporter` (extensions.py:27-38).
203///
204/// Sync EXT-003: when ≥2 span exporters are registered via the
205/// `ExtensionManager`, they are wrapped in this struct so all of them are
206/// invoked instead of silently dropping the trailing entries.
207pub struct CompositeExporter {
208    inner: Vec<Box<dyn SpanExporter>>,
209}
210
211impl std::fmt::Debug for CompositeExporter {
212    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
213        f.debug_struct("CompositeExporter")
214            .field("count", &self.inner.len())
215            .finish()
216    }
217}
218
219impl CompositeExporter {
220    /// Wrap N span exporters into a single composite exporter that forwards
221    /// each span to all of them.
222    #[must_use]
223    pub fn new(inner: Vec<Box<dyn SpanExporter>>) -> Self {
224        Self { inner }
225    }
226
227    /// Returns the number of wrapped exporters.
228    #[must_use]
229    pub fn len(&self) -> usize {
230        self.inner.len()
231    }
232
233    /// Returns true if the composite has no wrapped exporters.
234    #[must_use]
235    pub fn is_empty(&self) -> bool {
236        self.inner.is_empty()
237    }
238}
239
240#[async_trait]
241impl SpanExporter for CompositeExporter {
242    async fn export(&self, span: &Span) -> Result<(), ModuleError> {
243        for (idx, exporter) in self.inner.iter().enumerate() {
244            if let Err(e) = exporter.export(span).await {
245                tracing::warn!(
246                    exporter_index = idx,
247                    error = %e.message,
248                    "CompositeExporter: wrapped exporter failed; continuing with remaining exporters"
249                );
250            }
251        }
252        Ok(())
253    }
254
255    async fn shutdown(&self) -> Result<(), ModuleError> {
256        for (idx, exporter) in self.inner.iter().enumerate() {
257            if let Err(e) = exporter.shutdown().await {
258                tracing::warn!(
259                    exporter_index = idx,
260                    error = %e.message,
261                    "CompositeExporter: wrapped exporter shutdown failed"
262                );
263            }
264        }
265        Ok(())
266    }
267}