scouter_tracing/exporter/
scouter.rs1use crate::exporter::TraceError;
2use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
3use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
4use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
5use opentelemetry_sdk::{
6 error::{OTelSdkError, OTelSdkResult},
7 trace::{SpanData, SpanExporter},
8};
9use scouter_events::producer::RustScouterProducer;
10use scouter_events::queue::types::TransportConfig;
11use scouter_state::app_state;
12use scouter_types::{MessageRecord, TraceServerRecord};
13use std::fmt;
14use std::sync::Arc;
15use tracing::{debug, error, instrument};
16pub struct ScouterSpanExporter {
17 space: String,
18 name: String,
19 version: String,
20 producer: Arc<RustScouterProducer>,
21}
22
23impl fmt::Debug for ScouterSpanExporter {
24 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25 f.debug_struct("ScouterSpanExporter")
26 .field("space", &self.space)
27 .field("name", &self.name)
28 .field("version", &self.version)
29 .finish()
30 }
31}
32
33impl ScouterSpanExporter {
34 pub fn new(
35 space: String,
36 name: String,
37 version: String,
38 transport_config: TransportConfig,
39 ) -> Result<Self, TraceError> {
40 let producer = app_state()
41 .handle()
42 .block_on(async { RustScouterProducer::new(transport_config).await })?;
43 Ok(ScouterSpanExporter {
44 space,
45 name,
46 version,
47 producer: Arc::new(producer),
48 })
49 }
50}
51
52impl SpanExporter for ScouterSpanExporter {
53 #[instrument(name = "ScouterSpanExporter::export", skip_all)]
54 async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
55 let producer = self.producer.clone(); let space = self.space.clone();
57 let name = self.name.clone();
58 let version = self.version.clone();
59
60 debug!("Preparing to export {} spans to Scouter", batch.len());
61 let export_future = async move {
62 let resource_spans =
64 group_spans_by_resource_and_scope(batch, &ResourceAttributesWithSchema::default());
65 let req = ExportTraceServiceRequest { resource_spans };
66
67 let record = TraceServerRecord {
69 request: req,
70 space,
71 name,
72 version,
73 };
74 let message = MessageRecord::TraceServerRecord(record);
75
76 producer.publish(message).await.map_err(|e| {
78 let msg = format!("Failed to publish message to scouter: {}", e);
79 error!("{}", msg);
80 OTelSdkError::InternalFailure(msg)
81 })?;
82
83 Ok(()) as Result<(), OTelSdkError>
85 };
86
87 let runtime_handle = app_state().handle();
88 runtime_handle
89 .spawn(export_future)
90 .await
91 .map_err(|e| OTelSdkError::InternalFailure(format!("Task spawn failed: {}", e)))?
92 }
93
94 fn shutdown(&mut self) -> OTelSdkResult {
95 Ok(())
96 }
97}