apcore/observability/
exporters.rs1use async_trait::async_trait;
5use parking_lot::Mutex;
6use std::collections::VecDeque;
7use std::sync::Arc;
8
9use super::span::{Span, SpanExporter};
10use crate::errors::ModuleError;
11
12#[derive(Debug)]
14pub struct StdoutExporter;
15
16#[async_trait]
17impl SpanExporter for StdoutExporter {
18 async fn export(&self, span: &Span) -> Result<(), ModuleError> {
19 let json = serde_json::to_string(span).map_err(|e| {
20 ModuleError::new(
21 crate::errors::ErrorCode::GeneralInternalError,
22 format!("Failed to serialize span: {e}"),
23 )
24 })?;
25 tracing::info!(target: "apcore.span", span = %json);
29 Ok(())
30 }
31
32 async fn shutdown(&self) -> Result<(), ModuleError> {
33 Ok(())
34 }
35}
36
37const DEFAULT_MAX_SPANS: usize = 1000;
39
40#[derive(Debug, Clone)]
42pub struct InMemoryExporter {
43 spans: Arc<Mutex<VecDeque<Span>>>,
44 max_spans: usize,
45}
46
47impl InMemoryExporter {
48 #[must_use]
50 pub fn new() -> Self {
51 Self {
52 spans: Arc::new(Mutex::new(VecDeque::new())),
53 max_spans: DEFAULT_MAX_SPANS,
54 }
55 }
56
57 #[must_use]
59 pub fn with_max_spans(max_spans: usize) -> Self {
60 Self {
61 spans: Arc::new(Mutex::new(VecDeque::new())),
62 max_spans,
63 }
64 }
65
66 #[must_use]
68 pub fn get_spans(&self) -> Vec<Span> {
69 self.spans.lock().iter().cloned().collect()
70 }
71
72 pub fn clear(&self) {
74 self.spans.lock().clear();
75 }
76}
77
78impl Default for InMemoryExporter {
79 fn default() -> Self {
80 Self::new()
81 }
82}
83
84#[async_trait]
85impl SpanExporter for InMemoryExporter {
86 async fn export(&self, span: &Span) -> Result<(), ModuleError> {
87 let mut spans = self.spans.lock();
88 spans.push_back(span.clone());
89 while spans.len() > self.max_spans {
90 spans.pop_front();
91 }
92 Ok(())
93 }
94
95 async fn shutdown(&self) -> Result<(), ModuleError> {
96 Ok(())
97 }
98}
99
100#[derive(Debug)]
102pub struct OTLPExporter {
103 pub endpoint: String,
104}
105
106impl OTLPExporter {
107 pub fn new(endpoint: impl Into<String>) -> Self {
109 Self {
110 endpoint: endpoint.into(),
111 }
112 }
113}
114
115#[cfg(feature = "events")]
117fn span_to_otlp(span: &Span) -> serde_json::Value {
118 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
119 let start_ns = (span.start_time * 1_000_000_000.0) as u64;
121 let mut otlp_span = serde_json::json!({
122 "traceId": span.trace_id,
123 "spanId": span.span_id,
124 "name": span.name,
125 "startTimeUnixNano": start_ns,
126 "status": match span.status {
127 super::span::SpanStatus::Ok => serde_json::json!({"code": 1}),
128 super::span::SpanStatus::Error => serde_json::json!({"code": 2}),
129 super::span::SpanStatus::Unset => serde_json::json!({"code": 0}),
130 },
131 "attributes": span.attributes.iter().map(|(k, v)| {
132 serde_json::json!({
133 "key": k,
134 "value": { "stringValue": v.to_string() }
135 })
136 }).collect::<Vec<_>>(),
137 });
138 if let Some(ref parent_id) = span.parent_span_id {
139 otlp_span["parentSpanId"] = serde_json::json!(parent_id);
140 }
141 if let Some(end_time) = span.end_time {
142 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
143 let end_ns = (end_time * 1_000_000_000.0) as u64;
145 otlp_span["endTimeUnixNano"] = serde_json::json!(end_ns);
146 }
147 otlp_span
148}
149
150#[cfg(feature = "events")]
151#[async_trait]
152impl SpanExporter for OTLPExporter {
153 async fn export(&self, span: &Span) -> Result<(), ModuleError> {
154 let client = reqwest::Client::new();
155 let payload = serde_json::json!({
156 "resourceSpans": [{
157 "scopeSpans": [{
158 "spans": [span_to_otlp(span)]
159 }]
160 }]
161 });
162 client
163 .post(format!("{}/v1/traces", self.endpoint))
164 .header("Content-Type", "application/json")
165 .json(&payload)
166 .send()
167 .await
168 .map_err(|e| {
169 ModuleError::new(
170 crate::errors::ErrorCode::GeneralInternalError,
171 format!("OTLP export failed: {e}"),
172 )
173 })?;
174 Ok(())
175 }
176
177 async fn shutdown(&self) -> Result<(), ModuleError> {
178 Ok(())
179 }
180}
181
182#[cfg(not(feature = "events"))]
183#[async_trait]
184impl SpanExporter for OTLPExporter {
185 async fn export(&self, _span: &Span) -> Result<(), ModuleError> {
186 tracing::warn!(
189 "OTLPExporter::export called but the `events` feature is not enabled; span discarded"
190 );
191 Ok(())
192 }
193
194 async fn shutdown(&self) -> Result<(), ModuleError> {
195 Ok(())
196 }
197}
198
199pub struct CompositeExporter {
208 inner: Vec<Box<dyn SpanExporter>>,
209}
210
211impl std::fmt::Debug for CompositeExporter {
212 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
213 f.debug_struct("CompositeExporter")
214 .field("count", &self.inner.len())
215 .finish()
216 }
217}
218
219impl CompositeExporter {
220 #[must_use]
223 pub fn new(inner: Vec<Box<dyn SpanExporter>>) -> Self {
224 Self { inner }
225 }
226
227 #[must_use]
229 pub fn len(&self) -> usize {
230 self.inner.len()
231 }
232
233 #[must_use]
235 pub fn is_empty(&self) -> bool {
236 self.inner.is_empty()
237 }
238}
239
240#[async_trait]
241impl SpanExporter for CompositeExporter {
242 async fn export(&self, span: &Span) -> Result<(), ModuleError> {
243 for (idx, exporter) in self.inner.iter().enumerate() {
244 if let Err(e) = exporter.export(span).await {
245 tracing::warn!(
246 exporter_index = idx,
247 error = %e.message,
248 "CompositeExporter: wrapped exporter failed; continuing with remaining exporters"
249 );
250 }
251 }
252 Ok(())
253 }
254
255 async fn shutdown(&self) -> Result<(), ModuleError> {
256 for (idx, exporter) in self.inner.iter().enumerate() {
257 if let Err(e) = exporter.shutdown().await {
258 tracing::warn!(
259 exporter_index = idx,
260 error = %e.message,
261 "CompositeExporter: wrapped exporter shutdown failed"
262 );
263 }
264 }
265 Ok(())
266 }
267}