opentelemetry_configuration/
fallback.rs

//! Export fallback handling for failed OTLP exports.
//!
//! When an export fails after all retry attempts, the fallback handler is
//! invoked with the original OTLP request payload. This lets you preserve
//! telemetry by writing it to disk, queuing it, or sending it to an
//! alternative endpoint.
//!
//! # Example
//!
//! ```no_run
//! use opentelemetry_configuration::ExportFallback;
//!
//! // Use a predefined fallback
//! let fallback = ExportFallback::Stdout;
//!
//! // Or use a closure for custom handling
//! let fallback = ExportFallback::custom(|failure| {
//!     eprintln!("Export failed: {}", failure.error);
//!     // Serialise the payload to protobuf and ship it elsewhere
//!     let bytes = failure.request.to_protobuf();
//!     // ... send `bytes` to S3, a queue, a backup collector, etc.
//!     Ok(())
//! });
//! ```
26
27use opentelemetry_proto::tonic::collector::{
28    logs::v1::ExportLogsServiceRequest, metrics::v1::ExportMetricsServiceRequest,
29    trace::v1::ExportTraceServiceRequest,
30};
31use prost::Message;
32use std::fmt;
33use std::path::PathBuf;
34use std::sync::Arc;
35use std::time::SystemTime;
36
37/// The original OTLP request that failed to export.
38///
39/// This enum contains the exact protobuf payload that was meant to be sent
40/// to the collector. You can serialise it with [`to_protobuf()`](Self::to_protobuf)
41/// and send it via any transport mechanism.
42#[derive(Debug, Clone)]
43pub enum FailedRequest {
44    /// A traces export request.
45    Traces(ExportTraceServiceRequest),
46    /// A metrics export request.
47    Metrics(ExportMetricsServiceRequest),
48    /// A logs export request.
49    Logs(ExportLogsServiceRequest),
50}
51
52impl FailedRequest {
53    /// Serialises the request to protobuf bytes.
54    ///
55    /// This is the canonical wire format expected by OTLP collectors.
56    /// You can send these bytes to any collector endpoint using your
57    /// preferred transport (HTTP, gRPC, file, queue, etc.).
58    ///
59    /// # Example
60    ///
61    /// ```ignore
62    /// let bytes = failure.request.to_protobuf();
63    /// s3_client.put_object(bucket, key, bytes).await?;
64    /// ```
65    pub fn to_protobuf(&self) -> Vec<u8> {
66        match self {
67            Self::Traces(req) => req.encode_to_vec(),
68            Self::Metrics(req) => req.encode_to_vec(),
69            Self::Logs(req) => req.encode_to_vec(),
70        }
71    }
72
73    /// Serialises the request to JSON.
74    ///
75    /// OTLP/JSON is less compact than protobuf but useful for debugging
76    /// and systems that prefer JSON transport.
77    ///
78    /// # Errors
79    ///
80    /// Returns an error if JSON serialisation fails.
81    pub fn to_json(&self) -> serde_json::Result<String> {
82        match self {
83            Self::Traces(req) => serde_json::to_string(req),
84            Self::Metrics(req) => serde_json::to_string(req),
85            Self::Logs(req) => serde_json::to_string(req),
86        }
87    }
88
89    /// Serialises the request to pretty-printed JSON.
90    ///
91    /// Useful for debugging and human-readable output.
92    pub fn to_json_pretty(&self) -> serde_json::Result<String> {
93        match self {
94            Self::Traces(req) => serde_json::to_string_pretty(req),
95            Self::Metrics(req) => serde_json::to_string_pretty(req),
96            Self::Logs(req) => serde_json::to_string_pretty(req),
97        }
98    }
99
100    /// Returns the estimated serialised size in bytes.
101    ///
102    /// Useful for implementing size-based filtering or batching logic
103    /// in fallback handlers.
104    pub fn encoded_len(&self) -> usize {
105        match self {
106            Self::Traces(req) => req.encoded_len(),
107            Self::Metrics(req) => req.encoded_len(),
108            Self::Logs(req) => req.encoded_len(),
109        }
110    }
111
112    /// Returns the signal type as a string.
113    ///
114    /// Useful for logging, metrics, and routing decisions.
115    pub fn signal_type(&self) -> &'static str {
116        match self {
117            Self::Traces(_) => "traces",
118            Self::Metrics(_) => "metrics",
119            Self::Logs(_) => "logs",
120        }
121    }
122
123    /// Returns the OTLP HTTP path for this signal type.
124    ///
125    /// Useful when forwarding to an alternative collector.
126    pub fn otlp_path(&self) -> &'static str {
127        match self {
128            Self::Traces(_) => "/v1/traces",
129            Self::Metrics(_) => "/v1/metrics",
130            Self::Logs(_) => "/v1/logs",
131        }
132    }
133
134    /// Returns the number of items in the request.
135    ///
136    /// For traces, this is the number of spans.
137    /// For metrics, this is the number of data points.
138    /// For logs, this is the number of log records.
139    pub fn item_count(&self) -> usize {
140        match self {
141            Self::Traces(req) => req
142                .resource_spans
143                .iter()
144                .flat_map(|rs| &rs.scope_spans)
145                .map(|ss| ss.spans.len())
146                .sum(),
147            Self::Metrics(req) => req
148                .resource_metrics
149                .iter()
150                .flat_map(|rm| &rm.scope_metrics)
151                .flat_map(|sm| &sm.metrics)
152                .map(|m| match &m.data {
153                    Some(data) => count_metric_data_points(data),
154                    None => 0,
155                })
156                .sum(),
157            Self::Logs(req) => req
158                .resource_logs
159                .iter()
160                .flat_map(|rl| &rl.scope_logs)
161                .map(|sl| sl.log_records.len())
162                .sum(),
163        }
164    }
165}
166
167fn count_metric_data_points(data: &opentelemetry_proto::tonic::metrics::v1::metric::Data) -> usize {
168    use opentelemetry_proto::tonic::metrics::v1::metric::Data;
169    match data {
170        Data::Gauge(g) => g.data_points.len(),
171        Data::Sum(s) => s.data_points.len(),
172        Data::Histogram(h) => h.data_points.len(),
173        Data::ExponentialHistogram(eh) => eh.data_points.len(),
174        Data::Summary(s) => s.data_points.len(),
175    }
176}
177
178/// Details of a failed export after all retry attempts have been exhausted.
179#[derive(Debug)]
180pub struct ExportFailure {
181    /// The error that caused the export to fail.
182    pub error: Box<dyn std::error::Error + Send + Sync>,
183
184    /// The original OTLP request that failed to export.
185    ///
186    /// This contains the full payload ready to be serialised and sent
187    /// via an alternative transport.
188    pub request: FailedRequest,
189
190    /// When the failure occurred.
191    pub timestamp: SystemTime,
192}
193
194impl ExportFailure {
195    /// Creates a new export failure.
196    pub fn new(
197        error: impl Into<Box<dyn std::error::Error + Send + Sync>>,
198        request: FailedRequest,
199    ) -> Self {
200        Self {
201            error: error.into(),
202            request,
203            timestamp: SystemTime::now(),
204        }
205    }
206
207    /// Returns the size of the failed request in bytes.
208    pub fn size_bytes(&self) -> usize {
209        self.request.encoded_len()
210    }
211
212    /// Returns the error message as a string.
213    pub fn error_message(&self) -> String {
214        self.error.to_string()
215    }
216}
217
218/// Handler for export failures after all retry attempts have been exhausted.
219///
220/// Implementations should focus on preserving data (write to disk, queue,
221/// alternative endpoint) rather than attempting additional retries.
222///
223/// # Example
224///
225/// ```ignore
226/// use opentelemetry_configuration::{FallbackHandler, ExportFailure};
227///
228/// struct S3FallbackHandler {
229///     bucket: String,
230///     client: aws_sdk_s3::Client,
231/// }
232///
233/// impl FallbackHandler for S3FallbackHandler {
234///     fn handle_failure(&self, failure: ExportFailure)
235///         -> Result<(), Box<dyn std::error::Error + Send + Sync>>
236///     {
237///         let key = format!(
238///             "failed-exports/{}/{}.pb",
239///             failure.request.signal_type(),
240///             failure.timestamp.duration_since(std::time::UNIX_EPOCH)?.as_millis()
241///         );
242///
243///         // In a real implementation, you'd use async here
244///         let bytes = failure.request.to_protobuf();
245///         // self.client.put_object().bucket(&self.bucket).key(&key).body(bytes)...
246///
247///         Ok(())
248///     }
249/// }
250/// ```
251pub trait FallbackHandler: Send + Sync {
252    /// Handle a failed export.
253    ///
254    /// # Arguments
255    ///
256    /// * `failure` - Details of the failed export including the original OTLP request
257    ///
258    /// # Returns
259    ///
260    /// Returns `Ok(())` if the fallback successfully handled the failure,
261    /// or an error if the fallback itself failed. Errors are logged but
262    /// otherwise ignored.
263    fn handle_failure(
264        &self,
265        failure: ExportFailure,
266    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
267}
268
269/// Predefined fallback strategies for common scenarios.
270#[derive(Clone, Default)]
271pub enum ExportFallback {
272    /// Discard failed exports silently.
273    ///
274    /// Use this when telemetry loss is acceptable and you don't want
275    /// any overhead from fallback handling.
276    None,
277
278    /// Log failed exports via `tracing::warn!`.
279    ///
280    /// This is the default. It logs the error and signal type but not
281    /// the full payload. Uses target `otel_lifecycle` so it can be filtered
282    /// via `RUST_LOG=otel_lifecycle=warn`.
283    #[default]
284    LogError,
285
286    /// Write failed exports as JSON to stdout.
287    ///
288    /// Useful for Lambda where stdout goes to CloudWatch Logs.
289    /// The output is a JSON object with `signal_type`, `error`, and `request` fields.
290    Stdout,
291
292    /// Write failed exports as JSON to stderr.
293    Stderr,
294
295    /// Write failed exports as protobuf files to a directory.
296    ///
297    /// Files are named `{signal_type}-{timestamp_ms}.pb`.
298    File(PathBuf),
299
300    /// Use a custom fallback handler.
301    Custom(Arc<dyn FallbackHandler>),
302}
303
304impl ExportFallback {
305    /// Creates a custom fallback from a closure.
306    ///
307    /// # Example
308    ///
309    /// ```no_run
310    /// use opentelemetry_configuration::ExportFallback;
311    ///
312    /// let fallback = ExportFallback::custom(|failure| {
313    ///     // Write protobuf bytes to your preferred destination
314    ///     let bytes = failure.request.to_protobuf();
315    ///     eprintln!(
316    ///         "Failed to export {} ({} bytes): {}",
317    ///         failure.request.signal_type(),
318    ///         bytes.len(),
319    ///         failure.error
320    ///     );
321    ///     Ok(())
322    /// });
323    /// ```
324    pub fn custom<F>(f: F) -> Self
325    where
326        F: Fn(ExportFailure) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
327            + Send
328            + Sync
329            + 'static,
330    {
331        Self::Custom(Arc::new(ClosureFallbackHandler(f)))
332    }
333
334    /// Handles a failed export using this fallback strategy.
335    ///
336    /// Errors are logged via `tracing` with target `otel_lifecycle`.
337    /// To see these warnings, enable the target in your `RUST_LOG` filter:
338    /// `RUST_LOG=otel_lifecycle=warn`
339    pub fn handle(&self, failure: ExportFailure) {
340        let result = match self {
341            Self::None => Ok(()),
342            Self::LogError => {
343                tracing::warn!(
344                    target: "otel_lifecycle",
345                    signal_type = failure.request.signal_type(),
346                    item_count = failure.request.item_count(),
347                    size_bytes = failure.size_bytes(),
348                    error = %failure.error,
349                    "Export failed"
350                );
351                Ok(())
352            }
353            Self::Stdout => write_json_to_stdout(&failure),
354            Self::Stderr => write_json_to_stderr(&failure),
355            Self::File(dir) => write_protobuf_to_file(dir, &failure),
356            Self::Custom(handler) => handler.handle_failure(failure),
357        };
358
359        if let Err(e) = result {
360            tracing::error!(target: "otel_lifecycle", error = %e, "Fallback handler failed");
361        }
362    }
363}
364
365impl fmt::Debug for ExportFallback {
366    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
367        match self {
368            Self::None => write!(f, "None"),
369            Self::LogError => write!(f, "LogError"),
370            Self::Stdout => write!(f, "Stdout"),
371            Self::Stderr => write!(f, "Stderr"),
372            Self::File(path) => f.debug_tuple("File").field(path).finish(),
373            Self::Custom(_) => write!(f, "Custom(...)"),
374        }
375    }
376}
377
378struct ClosureFallbackHandler<F>(F);
379
380impl<F> FallbackHandler for ClosureFallbackHandler<F>
381where
382    F: Fn(ExportFailure) -> Result<(), Box<dyn std::error::Error + Send + Sync>> + Send + Sync,
383{
384    fn handle_failure(
385        &self,
386        failure: ExportFailure,
387    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
388        (self.0)(failure)
389    }
390}
391
392fn write_json_to_stdout(
393    failure: &ExportFailure,
394) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
395    let json = serde_json::json!({
396        "otlp_fallback": {
397            "signal_type": failure.request.signal_type(),
398            "error": failure.error_message(),
399            "item_count": failure.request.item_count(),
400            "size_bytes": failure.size_bytes(),
401            "timestamp": failure.timestamp
402                .duration_since(SystemTime::UNIX_EPOCH)
403                .map(|d| d.as_millis())
404                .unwrap_or(0),
405            "request": match &failure.request {
406                FailedRequest::Traces(req) => serde_json::to_value(req)?,
407                FailedRequest::Metrics(req) => serde_json::to_value(req)?,
408                FailedRequest::Logs(req) => serde_json::to_value(req)?,
409            }
410        }
411    });
412    println!("{}", serde_json::to_string(&json)?);
413    Ok(())
414}
415
416fn write_json_to_stderr(
417    failure: &ExportFailure,
418) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
419    let json = serde_json::json!({
420        "otlp_fallback": {
421            "signal_type": failure.request.signal_type(),
422            "error": failure.error_message(),
423            "item_count": failure.request.item_count(),
424            "size_bytes": failure.size_bytes(),
425            "timestamp": failure.timestamp
426                .duration_since(SystemTime::UNIX_EPOCH)
427                .map(|d| d.as_millis())
428                .unwrap_or(0),
429            "request": match &failure.request {
430                FailedRequest::Traces(req) => serde_json::to_value(req)?,
431                FailedRequest::Metrics(req) => serde_json::to_value(req)?,
432                FailedRequest::Logs(req) => serde_json::to_value(req)?,
433            }
434        }
435    });
436    eprintln!("{}", serde_json::to_string(&json)?);
437    Ok(())
438}
439
440fn write_protobuf_to_file(
441    dir: &PathBuf,
442    failure: &ExportFailure,
443) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
444    let timestamp_ms = failure
445        .timestamp
446        .duration_since(SystemTime::UNIX_EPOCH)
447        .map(|d| d.as_millis())
448        .unwrap_or(0);
449
450    let filename = format!("{}-{}.pb", failure.request.signal_type(), timestamp_ms);
451    let path = dir.join(filename);
452
453    std::fs::create_dir_all(dir)?;
454    std::fs::write(path, failure.request.to_protobuf())?;
455
456    Ok(())
457}
458
#[cfg(test)]
mod tests {
    use super::*;
    use opentelemetry_proto::tonic::{
        common::v1::AnyValue,
        common::v1::any_value::Value as AnyValueEnum,
        logs::v1::{LogRecord, ResourceLogs, ScopeLogs},
        metrics::v1::{
            metric::Data, Gauge, Metric, NumberDataPoint, ResourceMetrics, ScopeMetrics, Sum,
        },
        trace::v1::{ResourceSpans, ScopeSpans, Span},
    };

    /// Builds a traces request holding two spans under one resource and scope.
    fn create_test_traces_request() -> ExportTraceServiceRequest {
        ExportTraceServiceRequest {
            resource_spans: vec![ResourceSpans {
                resource: None,
                scope_spans: vec![ScopeSpans {
                    scope: None,
                    spans: vec![
                        Span {
                            name: "test-span-1".to_string(),
                            ..Default::default()
                        },
                        Span {
                            name: "test-span-2".to_string(),
                            ..Default::default()
                        },
                    ],
                    schema_url: String::new(),
                }],
                schema_url: String::new(),
            }],
        }
    }

    /// Builds a logs request holding one log record with a string body.
    fn create_test_logs_request() -> ExportLogsServiceRequest {
        ExportLogsServiceRequest {
            resource_logs: vec![ResourceLogs {
                resource: None,
                scope_logs: vec![ScopeLogs {
                    scope: None,
                    log_records: vec![LogRecord {
                        body: Some(AnyValue {
                            value: Some(AnyValueEnum::StringValue("test log".to_string())),
                        }),
                        ..Default::default()
                    }],
                    schema_url: String::new(),
                }],
                schema_url: String::new(),
            }],
        }
    }

    /// Builds a metrics request with three data points in total (two on a gauge and
    /// one on a sum), plus one metric whose `data` is unset.
    fn create_test_metrics_request() -> ExportMetricsServiceRequest {
        ExportMetricsServiceRequest {
            resource_metrics: vec![ResourceMetrics {
                scope_metrics: vec![ScopeMetrics {
                    metrics: vec![
                        Metric {
                            name: "test-gauge".to_string(),
                            data: Some(Data::Gauge(Gauge {
                                data_points: vec![
                                    NumberDataPoint::default(),
                                    NumberDataPoint::default(),
                                ],
                            })),
                            ..Default::default()
                        },
                        Metric {
                            name: "test-sum".to_string(),
                            data: Some(Data::Sum(Sum {
                                data_points: vec![NumberDataPoint::default()],
                                ..Default::default()
                            })),
                            ..Default::default()
                        },
                        Metric {
                            name: "test-no-data".to_string(),
                            data: None,
                            ..Default::default()
                        },
                    ],
                    ..Default::default()
                }],
                ..Default::default()
            }],
        }
    }

    #[test]
    fn test_failed_request_signal_type() {
        let traces = FailedRequest::Traces(create_test_traces_request());
        assert_eq!(traces.signal_type(), "traces");

        let logs = FailedRequest::Logs(create_test_logs_request());
        assert_eq!(logs.signal_type(), "logs");
    }

    #[test]
    fn test_failed_request_otlp_path() {
        let traces = FailedRequest::Traces(create_test_traces_request());
        assert_eq!(traces.otlp_path(), "/v1/traces");

        let logs = FailedRequest::Logs(create_test_logs_request());
        assert_eq!(logs.otlp_path(), "/v1/logs");
    }

    #[test]
    fn test_failed_request_item_count() {
        let traces = FailedRequest::Traces(create_test_traces_request());
        assert_eq!(traces.item_count(), 2);

        let logs = FailedRequest::Logs(create_test_logs_request());
        assert_eq!(logs.item_count(), 1);
    }

    #[test]
    fn test_failed_request_metrics_signal_path_and_item_count() {
        let metrics = FailedRequest::Metrics(create_test_metrics_request());
        assert_eq!(metrics.signal_type(), "metrics");
        assert_eq!(metrics.otlp_path(), "/v1/metrics");
        // Two gauge points plus one sum point; the metric without data adds nothing.
        assert_eq!(metrics.item_count(), 3);
    }

    #[test]
    fn test_failed_request_to_protobuf() {
        let traces = FailedRequest::Traces(create_test_traces_request());
        let bytes = traces.to_protobuf();
        assert!(!bytes.is_empty());
        assert!(bytes.len() > 10);
    }

    #[test]
    fn test_failed_request_encoded_len() {
        let traces = FailedRequest::Traces(create_test_traces_request());
        let len = traces.encoded_len();
        let bytes = traces.to_protobuf();
        assert_eq!(len, bytes.len());
    }

    #[test]
    fn test_failed_request_to_json() {
        let traces = FailedRequest::Traces(create_test_traces_request());
        let json = traces.to_json().unwrap();
        assert!(json.contains("test-span-1"));
        assert!(json.contains("test-span-2"));
    }

    #[test]
    fn test_failed_request_to_json_pretty_matches_compact() {
        let logs = FailedRequest::Logs(create_test_logs_request());
        let compact: serde_json::Value = serde_json::from_str(&logs.to_json().unwrap()).unwrap();
        let pretty: serde_json::Value =
            serde_json::from_str(&logs.to_json_pretty().unwrap()).unwrap();
        assert_eq!(compact, pretty);
    }

    #[test]
    fn test_export_failure_creation() {
        let request = FailedRequest::Traces(create_test_traces_request());
        let failure = ExportFailure::new("connection refused", request);

        assert_eq!(failure.error_message(), "connection refused");
        assert!(failure.size_bytes() > 0);
    }

    #[test]
    fn test_export_fallback_none() {
        let fallback = ExportFallback::None;
        let request = FailedRequest::Traces(create_test_traces_request());
        let failure = ExportFailure::new("test error", request);

        // Should not panic
        fallback.handle(failure);
    }

    #[test]
    fn test_export_fallback_custom() {
        use std::sync::atomic::{AtomicBool, Ordering};

        let called = Arc::new(AtomicBool::new(false));
        let called_clone = called.clone();

        let fallback = ExportFallback::custom(move |failure| {
            called_clone.store(true, Ordering::SeqCst);
            assert_eq!(failure.request.signal_type(), "traces");
            Ok(())
        });

        let request = FailedRequest::Traces(create_test_traces_request());
        let failure = ExportFailure::new("test error", request);

        fallback.handle(failure);
        assert!(called.load(Ordering::SeqCst));
    }

    #[test]
    fn test_export_fallback_custom_error_is_not_propagated() {
        let fallback = ExportFallback::custom(|_| Err("backup collector unavailable".into()));
        let request = FailedRequest::Logs(create_test_logs_request());
        let failure = ExportFailure::new("test error", request);

        // `handle` logs the handler's error rather than returning or panicking.
        fallback.handle(failure);
    }

    #[test]
    fn test_export_fallback_file_writes_decodable_protobuf() {
        let unique = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0);
        let dir = std::env::temp_dir().join(format!(
            "otel-fallback-test-{}-{}",
            std::process::id(),
            unique
        ));
        let original = create_test_traces_request();
        let failure = ExportFailure::new("test error", FailedRequest::Traces(original.clone()));

        ExportFallback::File(dir.clone()).handle(failure);

        let paths: Vec<PathBuf> = std::fs::read_dir(&dir)
            .expect("the File fallback should create its directory")
            .map(|entry| entry.expect("readable directory entry").path())
            .collect();
        let contents: Vec<Vec<u8>> = paths
            .iter()
            .map(|p| std::fs::read(p).expect("readable fallback file"))
            .collect();
        // Clean up before asserting so a failure does not leave files behind.
        let _ = std::fs::remove_dir_all(&dir);

        assert_eq!(paths.len(), 1);
        let name = paths[0]
            .file_name()
            .expect("file has a name")
            .to_string_lossy()
            .into_owned();
        assert!(
            name.starts_with("traces-") && name.ends_with(".pb"),
            "unexpected file name {}",
            name
        );
        let decoded = ExportTraceServiceRequest::decode(contents[0].as_slice())
            .expect("file holds a valid protobuf request");
        assert_eq!(decoded, original);
    }

    #[test]
    fn test_export_fallback_debug() {
        assert_eq!(format!("{:?}", ExportFallback::None), "None");
        assert_eq!(format!("{:?}", ExportFallback::LogError), "LogError");
        assert_eq!(format!("{:?}", ExportFallback::Stdout), "Stdout");
        assert_eq!(format!("{:?}", ExportFallback::Stderr), "Stderr");
        assert_eq!(
            format!("{:?}", ExportFallback::File(PathBuf::from("fallback-dir"))),
            "File(\"fallback-dir\")"
        );
        assert!(
            format!(
                "{:?}",
                ExportFallback::Custom(Arc::new(ClosureFallbackHandler(|_| Ok(()))))
            )
            .contains("Custom")
        );
    }

    #[test]
    fn test_export_fallback_default() {
        let fallback = ExportFallback::default();
        assert!(matches!(fallback, ExportFallback::LogError));
    }
}
620}