opentelemetry_lambda_extension/
exporter.rs

//! OTLP signal exporter with retry and fallback.
//!
//! This module provides [`OtlpExporter`], which sends batched OTLP signals to a remote
//! endpoint over HTTP. Failed exports are retried with exponential backoff, and batches
//! that still cannot be delivered are written to stdout as JSON so the data is not lost.
5
6use crate::aggregator::BatchedSignal;
7use crate::config::{Compression, ExporterConfig, Protocol};
8use prost::Message;
9use reqwest::Client;
10use serde::Serialize;
11use std::io::Write;
12use std::time::Duration;
13
/// Upper bound on export attempts per batch.
///
/// Despite the name, this counts every attempt including the first one, not only
/// the retries that follow it.
const MAX_RETRIES: u32 = 3;

/// Sleep inserted after the first failed attempt; each later failure doubles it.
const INITIAL_BACKOFF: Duration = Duration::from_millis(50);
16
/// Outcome of a single call to [`OtlpExporter::export`].
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExportResult {
    /// The endpoint accepted the batch with a success status.
    Success,
    /// Sending to the endpoint failed, so the batch was emitted to stdout as JSON instead.
    Fallback,
    /// No endpoint is configured: nothing was sent and nothing was emitted.
    Skipped,
}
28
29/// Error during export.
30#[non_exhaustive]
31#[derive(Debug, thiserror::Error)]
32pub enum ExportError {
33    /// HTTP request failed.
34    #[error("HTTP request failed")]
35    Http(#[from] reqwest::Error),
36
37    /// Server returned an error status.
38    #[error("server returned {status}: {body}")]
39    Status {
40        /// HTTP status code returned by server.
41        status: u16,
42        /// Response body from server.
43        body: String,
44    },
45
46    /// Encoding failed.
47    #[error("failed to encode request")]
48    Encode(#[source] Box<dyn std::error::Error + Send + Sync>),
49
50    /// No endpoint configured.
51    #[error("no endpoint configured")]
52    NoEndpoint,
53}
54
55impl ExportError {
56    pub(crate) fn encode<E: std::error::Error + Send + Sync + 'static>(error: E) -> Self {
57        Self::Encode(Box::new(error))
58    }
59
60    pub(crate) fn status(status: u16, body: impl Into<String>) -> Self {
61        Self::Status {
62            status,
63            body: body.into(),
64        }
65    }
66}
67
68/// OTLP exporter for sending signals to a remote endpoint.
69pub struct OtlpExporter {
70    config: ExporterConfig,
71    client: Client,
72}
73
74impl OtlpExporter {
75    /// Creates a new OTLP exporter with the given configuration.
76    ///
77    /// # Errors
78    ///
79    /// Returns an error if the HTTP client cannot be created.
80    pub fn new(config: ExporterConfig) -> Result<Self, ExportError> {
81        let client = Client::builder()
82            .timeout(config.timeout)
83            .build()
84            .map_err(ExportError::Http)?;
85
86        Ok(Self { config, client })
87    }
88
89    /// Creates a new exporter with default configuration.
90    ///
91    /// # Errors
92    ///
93    /// Returns an error if the HTTP client cannot be created.
94    pub fn with_defaults() -> Result<Self, ExportError> {
95        Self::new(ExporterConfig::default())
96    }
97
98    /// Exports a batch of signals.
99    ///
100    /// This method handles retries and fallback to stdout on failure.
101    pub async fn export(&self, batch: BatchedSignal) -> ExportResult {
102        if self.config.endpoint.is_none() {
103            tracing::debug!("No endpoint configured, skipping export");
104            return ExportResult::Skipped;
105        }
106
107        let result = self.export_with_retry(&batch).await;
108
109        match result {
110            Ok(()) => ExportResult::Success,
111            Err(e) => {
112                tracing::warn!(error = %e, "Export failed after retries, falling back to stdout");
113                self.emit_to_stdout(&batch);
114                ExportResult::Fallback
115            }
116        }
117    }
118
119    async fn export_with_retry(&self, batch: &BatchedSignal) -> Result<(), ExportError> {
120        let mut last_error = None;
121        let mut backoff = INITIAL_BACKOFF;
122
123        for attempt in 0..MAX_RETRIES {
124            match self.try_export(batch).await {
125                Ok(()) => return Ok(()),
126                Err(ExportError::Status { status, ref body }) if !Self::is_retryable(status) => {
127                    tracing::error!(status, "Received non-retryable status code, not retrying");
128                    return Err(ExportError::status(status, body.clone()));
129                }
130                Err(e) => {
131                    tracing::warn!(
132                        attempt = attempt + 1,
133                        max_retries = MAX_RETRIES,
134                        error = %e,
135                        "Export attempt failed"
136                    );
137                    last_error = Some(e);
138
139                    if attempt + 1 < MAX_RETRIES {
140                        tokio::time::sleep(backoff).await;
141                        backoff *= 2;
142                    }
143                }
144            }
145        }
146
147        Err(last_error.unwrap_or(ExportError::NoEndpoint))
148    }
149
150    /// Determines if a status code is retryable per OTLP specification.
151    ///
152    /// Retryable: 408 (Request Timeout), 429 (Too Many Requests), 5xx (Server Errors)
153    /// Non-retryable: 400, 401, 403, 404, and other 4xx client errors
154    fn is_retryable(status: u16) -> bool {
155        matches!(status, 408 | 429) || (500..600).contains(&status)
156    }
157
158    async fn try_export(&self, batch: &BatchedSignal) -> Result<(), ExportError> {
159        let endpoint = self
160            .config
161            .endpoint
162            .as_ref()
163            .ok_or(ExportError::NoEndpoint)?;
164
165        let (path, body, content_type) = match batch {
166            BatchedSignal::Traces(req) => {
167                ("/v1/traces", self.encode_request(req)?, self.content_type())
168            }
169            BatchedSignal::Metrics(req) => (
170                "/v1/metrics",
171                self.encode_request(req)?,
172                self.content_type(),
173            ),
174            BatchedSignal::Logs(req) => {
175                ("/v1/logs", self.encode_request(req)?, self.content_type())
176            }
177        };
178
179        let url = format!("{}{}", endpoint, path);
180
181        let mut request = self
182            .client
183            .post(&url)
184            .header("Content-Type", content_type)
185            .body(body);
186
187        for (key, value) in &self.config.headers {
188            request = request.header(key, value);
189        }
190
191        if self.config.compression == Compression::Gzip {
192            request = request.header("Content-Encoding", "gzip");
193        }
194
195        let response = request.send().await.map_err(ExportError::Http)?;
196
197        let status = response.status();
198        if status.is_success() {
199            Ok(())
200        } else {
201            let body = response.text().await.unwrap_or_default();
202            Err(ExportError::status(status.as_u16(), body))
203        }
204    }
205
206    fn encode_request<T: Message>(&self, request: &T) -> Result<Vec<u8>, ExportError> {
207        let mut buf = Vec::with_capacity(request.encoded_len());
208        request.encode(&mut buf).map_err(ExportError::encode)?;
209
210        if self.config.compression == Compression::Gzip {
211            use flate2::Compression as GzCompression;
212            use flate2::write::GzEncoder;
213
214            let mut encoder = GzEncoder::new(Vec::new(), GzCompression::default());
215            encoder.write_all(&buf).map_err(ExportError::encode)?;
216            encoder.finish().map_err(ExportError::encode)
217        } else {
218            Ok(buf)
219        }
220    }
221
222    fn content_type(&self) -> &'static str {
223        match self.config.protocol {
224            Protocol::Http => "application/x-protobuf",
225            Protocol::Grpc => "application/grpc",
226        }
227    }
228
229    fn emit_to_stdout(&self, batch: &BatchedSignal) {
230        use std::io::Write as _;
231
232        let fallback = match batch {
233            BatchedSignal::Traces(req) => OtlpFallback {
234                otlp_fallback: OtlpFallbackData {
235                    signal_type: "traces",
236                    request: serde_json::to_value(req).unwrap_or_default(),
237                },
238            },
239            BatchedSignal::Metrics(req) => OtlpFallback {
240                otlp_fallback: OtlpFallbackData {
241                    signal_type: "metrics",
242                    request: serde_json::to_value(req).unwrap_or_default(),
243                },
244            },
245            BatchedSignal::Logs(req) => OtlpFallback {
246                otlp_fallback: OtlpFallbackData {
247                    signal_type: "logs",
248                    request: serde_json::to_value(req).unwrap_or_default(),
249                },
250            },
251        };
252
253        if let Ok(json) = serde_json::to_string(&fallback) {
254            // Use explicit I/O to handle broken pipes gracefully
255            let mut stdout = std::io::stdout().lock();
256            let _ = writeln!(stdout, "{}", json);
257        }
258    }
259
260    /// Returns whether an endpoint is configured.
261    pub fn has_endpoint(&self) -> bool {
262        self.config.endpoint.is_some()
263    }
264
265    /// Returns the configured endpoint URL.
266    pub fn endpoint(&self) -> Option<&str> {
267        self.config.endpoint.as_deref()
268    }
269}
270
271#[derive(Serialize)]
272struct OtlpFallback<'a> {
273    otlp_fallback: OtlpFallbackData<'a>,
274}
275
276#[derive(Serialize)]
277struct OtlpFallbackData<'a> {
278    #[serde(rename = "type")]
279    signal_type: &'a str,
280    request: serde_json::Value,
281}
282
#[cfg(test)]
mod tests {
    use super::*;
    use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
    use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span};
    use std::error::Error;

    /// Builds a small non-empty trace request, so encoding tests exercise real bytes.
    fn make_trace_request() -> ExportTraceServiceRequest {
        ExportTraceServiceRequest {
            resource_spans: vec![ResourceSpans {
                scope_spans: vec![ScopeSpans {
                    spans: vec![Span {
                        name: "test-span".to_string(),
                        trace_id: vec![1; 16],
                        span_id: vec![1; 8],
                        ..Default::default()
                    }],
                    ..Default::default()
                }],
                ..Default::default()
            }],
        }
    }

    fn make_trace_batch() -> BatchedSignal {
        BatchedSignal::Traces(make_trace_request())
    }

    #[tokio::test]
    async fn test_export_no_endpoint_skips() {
        let exporter = OtlpExporter::with_defaults().unwrap();
        let batch = make_trace_batch();

        let result = exporter.export(batch).await;
        assert_eq!(result, ExportResult::Skipped);
    }

    #[test]
    fn test_has_endpoint() {
        let exporter = OtlpExporter::with_defaults().unwrap();
        assert!(!exporter.has_endpoint());

        let config = ExporterConfig {
            endpoint: Some("http://localhost:4318".to_string()),
            ..Default::default()
        };
        let exporter = OtlpExporter::new(config).unwrap();
        assert!(exporter.has_endpoint());
        assert_eq!(exporter.endpoint(), Some("http://localhost:4318"));
    }

    #[test]
    fn test_encode_request() {
        let config = ExporterConfig {
            compression: Compression::None,
            ..Default::default()
        };
        let exporter = OtlpExporter::new(config).unwrap();

        let request = make_trace_request();
        let encoded = exporter
            .encode_request(&request)
            .expect("encoding should succeed");
        // Without compression the body must be exactly the protobuf encoding.
        assert_eq!(encoded, request.encode_to_vec());

        // The empty request encodes to an empty body.
        let empty = exporter
            .encode_request(&ExportTraceServiceRequest::default())
            .expect("encoding should succeed");
        assert!(empty.is_empty());
    }

    #[test]
    fn test_encode_request_with_gzip() {
        use flate2::read::GzDecoder;
        use std::io::Read;

        let config = ExporterConfig {
            compression: Compression::Gzip,
            ..Default::default()
        };
        let exporter = OtlpExporter::new(config).unwrap();

        let request = make_trace_request();
        let encoded = exporter
            .encode_request(&request)
            .expect("encoding should succeed");

        // Gzip streams start with the magic bytes 0x1f 0x8b.
        assert!(encoded.starts_with(&[0x1f, 0x8b]));

        // Decompressing must give back exactly the protobuf encoding.
        let mut decoded = Vec::new();
        GzDecoder::new(encoded.as_slice())
            .read_to_end(&mut decoded)
            .expect("output should be a valid gzip stream");
        assert_eq!(decoded, request.encode_to_vec());
    }

    #[test]
    fn test_content_type() {
        let config = ExporterConfig {
            protocol: Protocol::Http,
            ..Default::default()
        };
        let exporter = OtlpExporter::new(config).unwrap();
        assert_eq!(exporter.content_type(), "application/x-protobuf");

        let config = ExporterConfig {
            protocol: Protocol::Grpc,
            ..Default::default()
        };
        let exporter = OtlpExporter::new(config).unwrap();
        assert_eq!(exporter.content_type(), "application/grpc");
    }

    #[test]
    fn test_export_error_display() {
        let err = ExportError::NoEndpoint;
        assert_eq!(format!("{}", err), "no endpoint configured");

        let err = ExportError::status(500, "Internal Server Error");
        assert_eq!(format!("{}", err), "server returned 500: Internal Server Error");
        assert!(matches!(err, ExportError::Status { status: 500, .. }));
    }

    #[test]
    fn test_export_error_chain() {
        let io_err = std::io::Error::other("test error");
        let err = ExportError::encode(io_err);

        assert!(err.source().is_some());
        assert!(format!("{}", err).contains("encode"));
    }

    #[test]
    fn test_is_retryable() {
        assert!(OtlpExporter::is_retryable(408));
        assert!(OtlpExporter::is_retryable(429));
        assert!(OtlpExporter::is_retryable(500));
        assert!(OtlpExporter::is_retryable(502));
        assert!(OtlpExporter::is_retryable(503));
        assert!(OtlpExporter::is_retryable(504));
        assert!(OtlpExporter::is_retryable(599));

        assert!(!OtlpExporter::is_retryable(400));
        assert!(!OtlpExporter::is_retryable(401));
        assert!(!OtlpExporter::is_retryable(403));
        assert!(!OtlpExporter::is_retryable(404));
        assert!(!OtlpExporter::is_retryable(405));
        assert!(!OtlpExporter::is_retryable(499));
        assert!(!OtlpExporter::is_retryable(600));
    }
}