scouter_tracing/exporter/
scouter.rs

1use crate::exporter::TraceError;
2use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
3use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
4use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
5use opentelemetry_sdk::{
6    error::{OTelSdkError, OTelSdkResult},
7    trace::{SpanData, SpanExporter},
8};
9use scouter_events::producer::RustScouterProducer;
10use scouter_events::queue::types::TransportConfig;
11use scouter_state::app_state;
12use scouter_types::{MessageRecord, TraceServerRecord};
13use std::fmt;
14use std::sync::Arc;
15use tracing::{debug, error, instrument};
16pub struct ScouterSpanExporter {
17    space: String,
18    name: String,
19    version: String,
20    producer: Arc<RustScouterProducer>,
21}
22
23impl fmt::Debug for ScouterSpanExporter {
24    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25        f.debug_struct("ScouterSpanExporter")
26            .field("space", &self.space)
27            .field("name", &self.name)
28            .field("version", &self.version)
29            .finish()
30    }
31}
32
33impl ScouterSpanExporter {
34    pub fn new(
35        space: String,
36        name: String,
37        version: String,
38        transport_config: TransportConfig,
39    ) -> Result<Self, TraceError> {
40        let producer = app_state()
41            .handle()
42            .block_on(async { RustScouterProducer::new(transport_config).await })?;
43        Ok(ScouterSpanExporter {
44            space,
45            name,
46            version,
47            producer: Arc::new(producer),
48        })
49    }
50}
51
52impl SpanExporter for ScouterSpanExporter {
53    #[instrument(name = "ScouterSpanExporter::export", skip_all)]
54    async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
55        let producer = self.producer.clone(); // Requires RustScouterProducer: Clone
56        let space = self.space.clone();
57        let name = self.name.clone();
58        let version = self.version.clone();
59
60        debug!("Preparing to export {} spans to Scouter", batch.len());
61        let export_future = async move {
62            // Note: No explicit type annotation here
63            let resource_spans =
64                group_spans_by_resource_and_scope(batch, &ResourceAttributesWithSchema::default());
65            let req = ExportTraceServiceRequest { resource_spans };
66
67            // Note: `self` is consumed by the async move block.
68            let record = TraceServerRecord {
69                request: req,
70                space,
71                name,
72                version,
73            };
74            let message = MessageRecord::TraceServerRecord(record);
75
76            // This fallible call requires the block to resolve to a Result
77            producer.publish(message).await.map_err(|e| {
78                let msg = format!("Failed to publish message to scouter: {}", e);
79                error!("{}", msg);
80                OTelSdkError::InternalFailure(msg)
81            })?;
82
83            // Explicitly return the Ok(()) that the outer spawn expects
84            Ok(()) as Result<(), OTelSdkError>
85        };
86
87        let runtime_handle = app_state().handle();
88        runtime_handle
89            .spawn(export_future)
90            .await
91            .map_err(|e| OTelSdkError::InternalFailure(format!("Task spawn failed: {}", e)))?
92    }
93
94    fn shutdown(&mut self) -> OTelSdkResult {
95        Ok(())
96    }
97}