opentelemetry_lambda_extension/
exporter.rs1use crate::aggregator::BatchedSignal;
7use crate::config::{Compression, ExporterConfig, Protocol};
8use prost::Message;
9use reqwest::Client;
10use serde::Serialize;
11use std::io::Write;
12use std::time::Duration;
13
/// Maximum number of export attempts made for one batch; the first try counts as one.
const MAX_RETRIES: u32 = 3;
/// Wait before the second attempt; the retry loop doubles it after every wait.
const INITIAL_BACKOFF: Duration = Duration::from_millis(50);
16
/// Outcome reported by `OtlpExporter::export` for a single batch.
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum ExportResult {
    /// The endpoint answered with a success status.
    Success,
    /// Delivery failed and the batch was handed to the stdout fallback instead.
    Fallback,
    /// No endpoint is configured, so nothing was sent.
    Skipped,
}
28
29#[non_exhaustive]
31#[derive(Debug, thiserror::Error)]
32pub enum ExportError {
33 #[error("HTTP request failed")]
35 Http(#[from] reqwest::Error),
36
37 #[error("server returned {status}: {body}")]
39 Status {
40 status: u16,
42 body: String,
44 },
45
46 #[error("failed to encode request")]
48 Encode(#[source] Box<dyn std::error::Error + Send + Sync>),
49
50 #[error("no endpoint configured")]
52 NoEndpoint,
53}
54
55impl ExportError {
56 pub(crate) fn encode<E: std::error::Error + Send + Sync + 'static>(error: E) -> Self {
57 Self::Encode(Box::new(error))
58 }
59
60 pub(crate) fn status(status: u16, body: impl Into<String>) -> Self {
61 Self::Status {
62 status,
63 body: body.into(),
64 }
65 }
66}
67
68pub struct OtlpExporter {
70 config: ExporterConfig,
71 client: Client,
72}
73
74impl OtlpExporter {
75 pub fn new(config: ExporterConfig) -> Result<Self, ExportError> {
81 let client = Client::builder()
82 .timeout(config.timeout)
83 .build()
84 .map_err(ExportError::Http)?;
85
86 Ok(Self { config, client })
87 }
88
89 pub fn with_defaults() -> Result<Self, ExportError> {
95 Self::new(ExporterConfig::default())
96 }
97
98 pub async fn export(&self, batch: BatchedSignal) -> ExportResult {
102 if self.config.endpoint.is_none() {
103 tracing::debug!("No endpoint configured, skipping export");
104 return ExportResult::Skipped;
105 }
106
107 let result = self.export_with_retry(&batch).await;
108
109 match result {
110 Ok(()) => ExportResult::Success,
111 Err(e) => {
112 tracing::warn!(error = %e, "Export failed after retries, falling back to stdout");
113 self.emit_to_stdout(&batch);
114 ExportResult::Fallback
115 }
116 }
117 }
118
119 async fn export_with_retry(&self, batch: &BatchedSignal) -> Result<(), ExportError> {
120 let mut last_error = None;
121 let mut backoff = INITIAL_BACKOFF;
122
123 for attempt in 0..MAX_RETRIES {
124 match self.try_export(batch).await {
125 Ok(()) => return Ok(()),
126 Err(ExportError::Status { status, ref body }) if !Self::is_retryable(status) => {
127 tracing::error!(status, "Received non-retryable status code, not retrying");
128 return Err(ExportError::status(status, body.clone()));
129 }
130 Err(e) => {
131 tracing::warn!(
132 attempt = attempt + 1,
133 max_retries = MAX_RETRIES,
134 error = %e,
135 "Export attempt failed"
136 );
137 last_error = Some(e);
138
139 if attempt + 1 < MAX_RETRIES {
140 tokio::time::sleep(backoff).await;
141 backoff *= 2;
142 }
143 }
144 }
145 }
146
147 Err(last_error.unwrap_or(ExportError::NoEndpoint))
148 }
149
150 fn is_retryable(status: u16) -> bool {
155 matches!(status, 408 | 429) || (500..600).contains(&status)
156 }
157
158 async fn try_export(&self, batch: &BatchedSignal) -> Result<(), ExportError> {
159 let endpoint = self
160 .config
161 .endpoint
162 .as_ref()
163 .ok_or(ExportError::NoEndpoint)?;
164
165 let (path, body, content_type) = match batch {
166 BatchedSignal::Traces(req) => {
167 ("/v1/traces", self.encode_request(req)?, self.content_type())
168 }
169 BatchedSignal::Metrics(req) => (
170 "/v1/metrics",
171 self.encode_request(req)?,
172 self.content_type(),
173 ),
174 BatchedSignal::Logs(req) => {
175 ("/v1/logs", self.encode_request(req)?, self.content_type())
176 }
177 };
178
179 let url = format!("{}{}", endpoint, path);
180
181 let mut request = self
182 .client
183 .post(&url)
184 .header("Content-Type", content_type)
185 .body(body);
186
187 for (key, value) in &self.config.headers {
188 request = request.header(key, value);
189 }
190
191 if self.config.compression == Compression::Gzip {
192 request = request.header("Content-Encoding", "gzip");
193 }
194
195 let response = request.send().await.map_err(ExportError::Http)?;
196
197 let status = response.status();
198 if status.is_success() {
199 Ok(())
200 } else {
201 let body = response.text().await.unwrap_or_default();
202 Err(ExportError::status(status.as_u16(), body))
203 }
204 }
205
206 fn encode_request<T: Message>(&self, request: &T) -> Result<Vec<u8>, ExportError> {
207 let mut buf = Vec::with_capacity(request.encoded_len());
208 request.encode(&mut buf).map_err(ExportError::encode)?;
209
210 if self.config.compression == Compression::Gzip {
211 use flate2::Compression as GzCompression;
212 use flate2::write::GzEncoder;
213
214 let mut encoder = GzEncoder::new(Vec::new(), GzCompression::default());
215 encoder.write_all(&buf).map_err(ExportError::encode)?;
216 encoder.finish().map_err(ExportError::encode)
217 } else {
218 Ok(buf)
219 }
220 }
221
222 fn content_type(&self) -> &'static str {
223 match self.config.protocol {
224 Protocol::Http => "application/x-protobuf",
225 Protocol::Grpc => "application/grpc",
226 }
227 }
228
229 fn emit_to_stdout(&self, batch: &BatchedSignal) {
230 use std::io::Write as _;
231
232 let fallback = match batch {
233 BatchedSignal::Traces(req) => OtlpFallback {
234 otlp_fallback: OtlpFallbackData {
235 signal_type: "traces",
236 request: serde_json::to_value(req).unwrap_or_default(),
237 },
238 },
239 BatchedSignal::Metrics(req) => OtlpFallback {
240 otlp_fallback: OtlpFallbackData {
241 signal_type: "metrics",
242 request: serde_json::to_value(req).unwrap_or_default(),
243 },
244 },
245 BatchedSignal::Logs(req) => OtlpFallback {
246 otlp_fallback: OtlpFallbackData {
247 signal_type: "logs",
248 request: serde_json::to_value(req).unwrap_or_default(),
249 },
250 },
251 };
252
253 if let Ok(json) = serde_json::to_string(&fallback) {
254 let mut stdout = std::io::stdout().lock();
256 let _ = writeln!(stdout, "{}", json);
257 }
258 }
259
260 pub fn has_endpoint(&self) -> bool {
262 self.config.endpoint.is_some()
263 }
264
265 pub fn endpoint(&self) -> Option<&str> {
267 self.config.endpoint.as_deref()
268 }
269}
270
271#[derive(Serialize)]
272struct OtlpFallback<'a> {
273 otlp_fallback: OtlpFallbackData<'a>,
274}
275
276#[derive(Serialize)]
277struct OtlpFallbackData<'a> {
278 #[serde(rename = "type")]
279 signal_type: &'a str,
280 request: serde_json::Value,
281}
282
283#[cfg(test)]
284mod tests {
285 use super::*;
286 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
287 use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span};
288 use std::error::Error;
289
290 fn make_trace_batch() -> BatchedSignal {
291 BatchedSignal::Traces(ExportTraceServiceRequest {
292 resource_spans: vec![ResourceSpans {
293 scope_spans: vec![ScopeSpans {
294 spans: vec![Span {
295 name: "test-span".to_string(),
296 trace_id: vec![1; 16],
297 span_id: vec![1; 8],
298 ..Default::default()
299 }],
300 ..Default::default()
301 }],
302 ..Default::default()
303 }],
304 })
305 }
306
307 #[tokio::test]
308 async fn test_export_no_endpoint_skips() {
309 let exporter = OtlpExporter::with_defaults().unwrap();
310 let batch = make_trace_batch();
311
312 let result = exporter.export(batch).await;
313 assert_eq!(result, ExportResult::Skipped);
314 }
315
316 #[test]
317 fn test_has_endpoint() {
318 let exporter = OtlpExporter::with_defaults().unwrap();
319 assert!(!exporter.has_endpoint());
320
321 let config = ExporterConfig {
322 endpoint: Some("http://localhost:4318".to_string()),
323 ..Default::default()
324 };
325 let exporter = OtlpExporter::new(config).unwrap();
326 assert!(exporter.has_endpoint());
327 }
328
329 #[test]
330 fn test_encode_request() {
331 let config = ExporterConfig {
332 compression: Compression::None,
333 ..Default::default()
334 };
335 let exporter = OtlpExporter::new(config).unwrap();
336
337 let request = ExportTraceServiceRequest::default();
338 let encoded = exporter.encode_request(&request);
339 assert!(encoded.is_ok());
340 }
341
342 #[test]
343 fn test_encode_request_with_gzip() {
344 let config = ExporterConfig {
345 compression: Compression::Gzip,
346 ..Default::default()
347 };
348 let exporter = OtlpExporter::new(config).unwrap();
349
350 let request = ExportTraceServiceRequest::default();
351 let encoded = exporter.encode_request(&request);
352 assert!(encoded.is_ok());
353 }
354
355 #[test]
356 fn test_content_type() {
357 let config = ExporterConfig {
358 protocol: Protocol::Http,
359 ..Default::default()
360 };
361 let exporter = OtlpExporter::new(config).unwrap();
362 assert_eq!(exporter.content_type(), "application/x-protobuf");
363
364 let config = ExporterConfig {
365 protocol: Protocol::Grpc,
366 ..Default::default()
367 };
368 let exporter = OtlpExporter::new(config).unwrap();
369 assert_eq!(exporter.content_type(), "application/grpc");
370 }
371
372 #[test]
373 fn test_export_error_display() {
374 let err = ExportError::NoEndpoint;
375 assert_eq!(format!("{}", err), "no endpoint configured");
376
377 let err = ExportError::status(500, "Internal Server Error");
378 assert!(format!("{}", err).contains("500"));
379 assert!(matches!(err, ExportError::Status { status: 500, .. }));
380 }
381
382 #[test]
383 fn test_export_error_chain() {
384 let io_err = std::io::Error::other("test error");
385 let err = ExportError::encode(io_err);
386
387 assert!(err.source().is_some());
388 assert!(format!("{}", err).contains("encode"));
389 }
390
391 #[test]
392 fn test_is_retryable() {
393 assert!(OtlpExporter::is_retryable(408));
394 assert!(OtlpExporter::is_retryable(429));
395 assert!(OtlpExporter::is_retryable(500));
396 assert!(OtlpExporter::is_retryable(502));
397 assert!(OtlpExporter::is_retryable(503));
398 assert!(OtlpExporter::is_retryable(504));
399
400 assert!(!OtlpExporter::is_retryable(400));
401 assert!(!OtlpExporter::is_retryable(401));
402 assert!(!OtlpExporter::is_retryable(403));
403 assert!(!OtlpExporter::is_retryable(404));
404 assert!(!OtlpExporter::is_retryable(405));
405 }
406}