opentelemetry_configuration/
fallback.rs1use opentelemetry_proto::tonic::collector::{
28 logs::v1::ExportLogsServiceRequest, metrics::v1::ExportMetricsServiceRequest,
29 trace::v1::ExportTraceServiceRequest,
30};
31use prost::Message;
32use std::fmt;
33use std::path::PathBuf;
34use std::sync::Arc;
35use std::time::SystemTime;
36
37#[derive(Debug, Clone)]
43pub enum FailedRequest {
44 Traces(ExportTraceServiceRequest),
46 Metrics(ExportMetricsServiceRequest),
48 Logs(ExportLogsServiceRequest),
50}
51
52impl FailedRequest {
53 pub fn to_protobuf(&self) -> Vec<u8> {
66 match self {
67 Self::Traces(req) => req.encode_to_vec(),
68 Self::Metrics(req) => req.encode_to_vec(),
69 Self::Logs(req) => req.encode_to_vec(),
70 }
71 }
72
73 pub fn to_json(&self) -> serde_json::Result<String> {
82 match self {
83 Self::Traces(req) => serde_json::to_string(req),
84 Self::Metrics(req) => serde_json::to_string(req),
85 Self::Logs(req) => serde_json::to_string(req),
86 }
87 }
88
89 pub fn to_json_pretty(&self) -> serde_json::Result<String> {
93 match self {
94 Self::Traces(req) => serde_json::to_string_pretty(req),
95 Self::Metrics(req) => serde_json::to_string_pretty(req),
96 Self::Logs(req) => serde_json::to_string_pretty(req),
97 }
98 }
99
100 pub fn encoded_len(&self) -> usize {
105 match self {
106 Self::Traces(req) => req.encoded_len(),
107 Self::Metrics(req) => req.encoded_len(),
108 Self::Logs(req) => req.encoded_len(),
109 }
110 }
111
112 pub fn signal_type(&self) -> &'static str {
116 match self {
117 Self::Traces(_) => "traces",
118 Self::Metrics(_) => "metrics",
119 Self::Logs(_) => "logs",
120 }
121 }
122
123 pub fn otlp_path(&self) -> &'static str {
127 match self {
128 Self::Traces(_) => "/v1/traces",
129 Self::Metrics(_) => "/v1/metrics",
130 Self::Logs(_) => "/v1/logs",
131 }
132 }
133
134 pub fn item_count(&self) -> usize {
140 match self {
141 Self::Traces(req) => req
142 .resource_spans
143 .iter()
144 .flat_map(|rs| &rs.scope_spans)
145 .map(|ss| ss.spans.len())
146 .sum(),
147 Self::Metrics(req) => req
148 .resource_metrics
149 .iter()
150 .flat_map(|rm| &rm.scope_metrics)
151 .flat_map(|sm| &sm.metrics)
152 .map(|m| match &m.data {
153 Some(data) => count_metric_data_points(data),
154 None => 0,
155 })
156 .sum(),
157 Self::Logs(req) => req
158 .resource_logs
159 .iter()
160 .flat_map(|rl| &rl.scope_logs)
161 .map(|sl| sl.log_records.len())
162 .sum(),
163 }
164 }
165}
166
167fn count_metric_data_points(data: &opentelemetry_proto::tonic::metrics::v1::metric::Data) -> usize {
168 use opentelemetry_proto::tonic::metrics::v1::metric::Data;
169 match data {
170 Data::Gauge(g) => g.data_points.len(),
171 Data::Sum(s) => s.data_points.len(),
172 Data::Histogram(h) => h.data_points.len(),
173 Data::ExponentialHistogram(eh) => eh.data_points.len(),
174 Data::Summary(s) => s.data_points.len(),
175 }
176}
177
178#[derive(Debug)]
180pub struct ExportFailure {
181 pub error: Box<dyn std::error::Error + Send + Sync>,
183
184 pub request: FailedRequest,
189
190 pub timestamp: SystemTime,
192}
193
194impl ExportFailure {
195 pub fn new(
197 error: impl Into<Box<dyn std::error::Error + Send + Sync>>,
198 request: FailedRequest,
199 ) -> Self {
200 Self {
201 error: error.into(),
202 request,
203 timestamp: SystemTime::now(),
204 }
205 }
206
207 pub fn size_bytes(&self) -> usize {
209 self.request.encoded_len()
210 }
211
212 pub fn error_message(&self) -> String {
214 self.error.to_string()
215 }
216}
217
218pub trait FallbackHandler: Send + Sync {
252 fn handle_failure(
264 &self,
265 failure: ExportFailure,
266 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
267}
268
269#[derive(Clone, Default)]
271pub enum ExportFallback {
272 None,
277
278 #[default]
284 LogError,
285
286 Stdout,
291
292 Stderr,
294
295 File(PathBuf),
299
300 Custom(Arc<dyn FallbackHandler>),
302}
303
304impl ExportFallback {
305 pub fn custom<F>(f: F) -> Self
325 where
326 F: Fn(ExportFailure) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
327 + Send
328 + Sync
329 + 'static,
330 {
331 Self::Custom(Arc::new(ClosureFallbackHandler(f)))
332 }
333
334 pub fn handle(&self, failure: ExportFailure) {
340 let result = match self {
341 Self::None => Ok(()),
342 Self::LogError => {
343 tracing::warn!(
344 target: "otel_lifecycle",
345 signal_type = failure.request.signal_type(),
346 item_count = failure.request.item_count(),
347 size_bytes = failure.size_bytes(),
348 error = %failure.error,
349 "Export failed"
350 );
351 Ok(())
352 }
353 Self::Stdout => write_json_to_stdout(&failure),
354 Self::Stderr => write_json_to_stderr(&failure),
355 Self::File(dir) => write_protobuf_to_file(dir, &failure),
356 Self::Custom(handler) => handler.handle_failure(failure),
357 };
358
359 if let Err(e) = result {
360 tracing::error!(target: "otel_lifecycle", error = %e, "Fallback handler failed");
361 }
362 }
363}
364
365impl fmt::Debug for ExportFallback {
366 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
367 match self {
368 Self::None => write!(f, "None"),
369 Self::LogError => write!(f, "LogError"),
370 Self::Stdout => write!(f, "Stdout"),
371 Self::Stderr => write!(f, "Stderr"),
372 Self::File(path) => f.debug_tuple("File").field(path).finish(),
373 Self::Custom(_) => write!(f, "Custom(...)"),
374 }
375 }
376}
377
378struct ClosureFallbackHandler<F>(F);
379
380impl<F> FallbackHandler for ClosureFallbackHandler<F>
381where
382 F: Fn(ExportFailure) -> Result<(), Box<dyn std::error::Error + Send + Sync>> + Send + Sync,
383{
384 fn handle_failure(
385 &self,
386 failure: ExportFailure,
387 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
388 (self.0)(failure)
389 }
390}
391
392fn write_json_to_stdout(
393 failure: &ExportFailure,
394) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
395 let json = serde_json::json!({
396 "otlp_fallback": {
397 "signal_type": failure.request.signal_type(),
398 "error": failure.error_message(),
399 "item_count": failure.request.item_count(),
400 "size_bytes": failure.size_bytes(),
401 "timestamp": failure.timestamp
402 .duration_since(SystemTime::UNIX_EPOCH)
403 .map(|d| d.as_millis())
404 .unwrap_or(0),
405 "request": match &failure.request {
406 FailedRequest::Traces(req) => serde_json::to_value(req)?,
407 FailedRequest::Metrics(req) => serde_json::to_value(req)?,
408 FailedRequest::Logs(req) => serde_json::to_value(req)?,
409 }
410 }
411 });
412 println!("{}", serde_json::to_string(&json)?);
413 Ok(())
414}
415
416fn write_json_to_stderr(
417 failure: &ExportFailure,
418) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
419 let json = serde_json::json!({
420 "otlp_fallback": {
421 "signal_type": failure.request.signal_type(),
422 "error": failure.error_message(),
423 "item_count": failure.request.item_count(),
424 "size_bytes": failure.size_bytes(),
425 "timestamp": failure.timestamp
426 .duration_since(SystemTime::UNIX_EPOCH)
427 .map(|d| d.as_millis())
428 .unwrap_or(0),
429 "request": match &failure.request {
430 FailedRequest::Traces(req) => serde_json::to_value(req)?,
431 FailedRequest::Metrics(req) => serde_json::to_value(req)?,
432 FailedRequest::Logs(req) => serde_json::to_value(req)?,
433 }
434 }
435 });
436 eprintln!("{}", serde_json::to_string(&json)?);
437 Ok(())
438}
439
440fn write_protobuf_to_file(
441 dir: &PathBuf,
442 failure: &ExportFailure,
443) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
444 let timestamp_ms = failure
445 .timestamp
446 .duration_since(SystemTime::UNIX_EPOCH)
447 .map(|d| d.as_millis())
448 .unwrap_or(0);
449
450 let filename = format!("{}-{}.pb", failure.request.signal_type(), timestamp_ms);
451 let path = dir.join(filename);
452
453 std::fs::create_dir_all(dir)?;
454 std::fs::write(path, failure.request.to_protobuf())?;
455
456 Ok(())
457}
458
#[cfg(test)]
mod tests {
    use super::*;
    use opentelemetry_proto::tonic::{
        common::v1::{any_value::Value as AnyValueEnum, AnyValue},
        logs::v1::{LogRecord, ResourceLogs, ScopeLogs},
        trace::v1::{ResourceSpans, ScopeSpans, Span},
    };

    /// Builds a span that only has its name set.
    fn named_span(name: &str) -> Span {
        Span {
            name: name.to_string(),
            ..Default::default()
        }
    }

    /// A trace request with one resource, one scope and two spans.
    fn create_test_traces_request() -> ExportTraceServiceRequest {
        let scope = ScopeSpans {
            scope: None,
            spans: vec![named_span("test-span-1"), named_span("test-span-2")],
            schema_url: String::new(),
        };
        let resource = ResourceSpans {
            resource: None,
            scope_spans: vec![scope],
            schema_url: String::new(),
        };
        ExportTraceServiceRequest {
            resource_spans: vec![resource],
        }
    }

    /// A logs request with one resource, one scope and one log record.
    fn create_test_logs_request() -> ExportLogsServiceRequest {
        let record = LogRecord {
            body: Some(AnyValue {
                value: Some(AnyValueEnum::StringValue("test log".to_string())),
            }),
            ..Default::default()
        };
        let scope = ScopeLogs {
            scope: None,
            log_records: vec![record],
            schema_url: String::new(),
        };
        let resource = ResourceLogs {
            resource: None,
            scope_logs: vec![scope],
            schema_url: String::new(),
        };
        ExportLogsServiceRequest {
            resource_logs: vec![resource],
        }
    }

    /// Shorthand for a failed trace request built from the sample data.
    fn failed_traces() -> FailedRequest {
        FailedRequest::Traces(create_test_traces_request())
    }

    /// Shorthand for a failed logs request built from the sample data.
    fn failed_logs() -> FailedRequest {
        FailedRequest::Logs(create_test_logs_request())
    }

    #[test]
    fn test_failed_request_signal_type() {
        assert_eq!(failed_traces().signal_type(), "traces");
        assert_eq!(failed_logs().signal_type(), "logs");
    }

    #[test]
    fn test_failed_request_otlp_path() {
        assert_eq!(failed_traces().otlp_path(), "/v1/traces");
        assert_eq!(failed_logs().otlp_path(), "/v1/logs");
    }

    #[test]
    fn test_failed_request_item_count() {
        assert_eq!(failed_traces().item_count(), 2);
        assert_eq!(failed_logs().item_count(), 1);
    }

    #[test]
    fn test_failed_request_to_protobuf() {
        let encoded = failed_traces().to_protobuf();
        assert!(!encoded.is_empty());
        assert!(encoded.len() > 10);
    }

    #[test]
    fn test_failed_request_encoded_len() {
        let request = failed_traces();
        let reported = request.encoded_len();
        let encoded = request.to_protobuf();
        assert_eq!(reported, encoded.len());
    }

    #[test]
    fn test_failed_request_to_json() {
        let rendered = failed_traces().to_json().unwrap();
        for span_name in ["test-span-1", "test-span-2"] {
            assert!(rendered.contains(span_name));
        }
    }

    #[test]
    fn test_export_failure_creation() {
        let failure = ExportFailure::new("connection refused", failed_traces());

        assert_eq!(failure.error_message(), "connection refused");
        assert!(failure.size_bytes() > 0);
    }

    #[test]
    fn test_export_fallback_none() {
        let failure = ExportFailure::new("test error", failed_traces());

        // Passes as long as handling the failure does not panic.
        ExportFallback::None.handle(failure);
    }

    #[test]
    fn test_export_fallback_custom() {
        use std::sync::atomic::{AtomicBool, Ordering};

        let invoked = Arc::new(AtomicBool::new(false));
        let invoked_in_handler = Arc::clone(&invoked);

        let fallback = ExportFallback::custom(move |failure| {
            invoked_in_handler.store(true, Ordering::SeqCst);
            assert_eq!(failure.request.signal_type(), "traces");
            Ok(())
        });

        fallback.handle(ExportFailure::new("test error", failed_traces()));
        assert!(invoked.load(Ordering::SeqCst));
    }

    #[test]
    fn test_export_fallback_debug() {
        assert_eq!(format!("{:?}", ExportFallback::None), "None");
        assert_eq!(format!("{:?}", ExportFallback::LogError), "LogError");
        assert_eq!(format!("{:?}", ExportFallback::Stdout), "Stdout");

        let custom_rendered = format!(
            "{:?}",
            ExportFallback::Custom(Arc::new(ClosureFallbackHandler(|_| Ok(()))))
        );
        assert!(custom_rendered.contains("Custom"));
    }

    #[test]
    fn test_export_fallback_default() {
        assert!(matches!(
            ExportFallback::default(),
            ExportFallback::LogError
        ));
    }
}