// scouter_dataframe/parquet/tracing/service.rs

use crate::error::TraceEngineError;
use crate::parquet::tracing::engine::{TableCommand, TraceSpanDBEngine};
use crate::parquet::tracing::queries::TraceQueries;
use scouter_settings::ObjectStorageSettings;
use scouter_types::sql::TraceSpan;
use tokio::sync::{mpsc, oneshot};
use tokio::time::{interval, Duration};
use tracing::{error, info};
9
/// Number of buffered spans at which the buffering actor flushes immediately
/// instead of waiting for the next timer tick. Also used as the initial
/// capacity of the span buffer.
const BUFFER_SIZE: usize = 10_000;
/// Flush period, in seconds, used when the caller does not supply one.
const FLUSH_INTERVAL_SECS: u64 = 5;
12
13pub struct TraceSpanService {
14    engine_tx: mpsc::Sender<TableCommand>,
15    span_tx: mpsc::Sender<Vec<TraceSpan>>,
16    shutdown_tx: mpsc::Sender<()>,
17    engine_handle: tokio::task::JoinHandle<()>,
18    buffer_handle: tokio::task::JoinHandle<()>,
19    pub query_service: TraceQueries,
20}
21
22impl TraceSpanService {
23    pub async fn new(
24        storage_settings: &ObjectStorageSettings,
25        compaction_interval_hours: u64,
26        flush_interval_secs: Option<u64>,
27    ) -> Result<Self, TraceEngineError> {
28        let engine = TraceSpanDBEngine::new(storage_settings).await?;
29        info!(
30            "TraceSpanService initialized with storage URI: {}",
31            storage_settings.storage_uri
32        );
33
34        let ctx = engine.ctx.clone();
35        let (engine_tx, engine_handle) = engine.start_actor(compaction_interval_hours);
36        let (span_tx, span_rx) = mpsc::channel::<Vec<TraceSpan>>(100);
37        let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
38
39        let buffer_handle = Self::start_buffering_actor(
40            engine_tx.clone(),
41            span_rx,
42            shutdown_rx,
43            flush_interval_secs,
44        );
45
46        Ok(TraceSpanService {
47            engine_tx,
48            span_tx,
49            shutdown_tx,
50            engine_handle,
51            buffer_handle,
52            query_service: TraceQueries::new(ctx),
53        })
54    }
55
56    fn start_buffering_actor(
57        engine_tx: mpsc::Sender<TableCommand>,
58        mut span_rx: mpsc::Receiver<Vec<TraceSpan>>,
59        mut shutdown_rx: mpsc::Receiver<()>,
60        flush_interval_secs: Option<u64>,
61    ) -> tokio::task::JoinHandle<()> {
62        tokio::spawn(async move {
63            let mut buffer = Vec::with_capacity(BUFFER_SIZE);
64            let mut flush_ticker = interval(Duration::from_secs(
65                flush_interval_secs.unwrap_or(FLUSH_INTERVAL_SECS),
66            ));
67            flush_ticker.tick().await;
68
69            loop {
70                tokio::select! {
71                    Some(spans) = span_rx.recv() => {
72
73                        buffer.extend(spans);
74
75                        if buffer.len() >= BUFFER_SIZE {
76                            Self::flush_buffer(&engine_tx, &mut buffer).await;
77                        }
78                    }
79                    _ = flush_ticker.tick() => {
80                        if !buffer.is_empty() {
81                            println!("Flushing spans buffer with {} spans", buffer.len());
82                            Self::flush_buffer(&engine_tx, &mut buffer).await;
83                        }
84                    }
85                    _ = shutdown_rx.recv() => {
86                        info!("Buffer actor received shutdown signal");
87                        if !buffer.is_empty() {
88                            info!("Flushing final {} spans before shutdown", buffer.len());
89                            Self::flush_buffer(&engine_tx, &mut buffer).await;
90                        }
91                        break;
92                    }
93                }
94            }
95
96            info!("Buffering actor shutting down");
97        })
98    }
99
100    async fn flush_buffer(engine_tx: &mpsc::Sender<TableCommand>, buffer: &mut Vec<TraceSpan>) {
101        if buffer.is_empty() {
102            return;
103        }
104
105        let spans_to_write = std::mem::replace(buffer, Vec::with_capacity(BUFFER_SIZE));
106        let span_count = spans_to_write.len();
107
108        info!("Sending write command to engine for {} spans", span_count);
109
110        let (tx, rx) = tokio::sync::oneshot::channel();
111
112        if let Err(e) = engine_tx
113            .send(TableCommand::Write {
114                spans: spans_to_write,
115                respond_to: tx,
116            })
117            .await
118        {
119            tracing::error!("Failed to send write command: {}", e);
120            return;
121        }
122
123        info!("Write command sent, waiting for response");
124
125        match rx.await {
126            Ok(Ok(())) => {
127                info!("Successfully flushed {} spans", span_count);
128            }
129            Ok(Err(e)) => {
130                tracing::error!("Write failed: {}", e);
131            }
132            Err(e) => {
133                tracing::error!("Failed to receive write response: {}", e);
134            }
135        }
136    }
137
138    pub async fn write_spans(&self, spans: Vec<TraceSpan>) -> Result<(), TraceEngineError> {
139        self.span_tx
140            .send(spans)
141            .await
142            .map_err(|_| TraceEngineError::ChannelClosed)?;
143        Ok(())
144    }
145
146    pub async fn optimize(&self) -> Result<(), TraceEngineError> {
147        let (tx, rx) = tokio::sync::oneshot::channel();
148
149        self.engine_tx
150            .send(TableCommand::Optimize { respond_to: tx })
151            .await
152            .map_err(|_| TraceEngineError::ChannelClosed)?;
153
154        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
155    }
156
157    pub async fn shutdown(self) -> Result<(), TraceEngineError> {
158        info!("TraceSpanService shutting down");
159
160        let _ = self.shutdown_tx.send(()).await;
161
162        if let Err(e) = self.buffer_handle.await {
163            tracing::error!("Buffer handle error: {}", e);
164        }
165
166        self.engine_tx
167            .send(TableCommand::Shutdown)
168            .await
169            .map_err(|_| TraceEngineError::ChannelClosed)?;
170
171        if let Err(e) = self.engine_handle.await {
172            tracing::error!("Engine handle error: {}", e);
173        }
174
175        info!("TraceSpanService shutdown complete");
176        Ok(())
177    }
178}
#[cfg(test)]
mod tests {
    // NOTE(review): both tests use the default storage directory, and
    // `cleanup()` deletes it; when tests run in parallel one test may remove
    // data the other is using — confirm whether they need to be serialized.

    use super::*;
    use crate::parquet::tracing::span_view::TraceSpanView;
    use scouter_mocks::create_simple_trace;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::TraceId;

    /// Installs an INFO-level log subscriber (ignored if one is already set)
    /// and deletes the default storage directory under the current directory.
    fn cleanup() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::INFO)
            .try_init();

        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            std::fs::remove_dir_all(storage_path).unwrap();
        }
    }

    /// The service can be created and shut down without writing anything.
    #[tokio::test]
    async fn test_service_initialization() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2)).await?;
        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Writes one trace, waits for the timed flush, then reads it back by
    /// trace id and by end-time filter.
    #[tokio::test]
    async fn test_dataframe_trace_write_single_batch() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2)).await?;

        let spans = create_simple_trace();
        info!("Test: writing {} spans", spans.len());
        service.write_spans(spans.clone()).await?;

        // The flush interval is 2s; sleep longer so the timed flush has run.
        info!("Test: waiting for flush");
        tokio::time::sleep(Duration::from_secs(5)).await;

        // Use the first span to obtain the trace id to query for.
        let first_span: &TraceSpan = spans.first().unwrap();
        // Convert hex string to binary bytes (16 bytes, not 32 bytes of UTF-8)
        let trace_id_bytes = TraceId::hex_to_bytes(&first_span.trace_id)?;

        info!("Test: querying spans for trace_id {:?}", trace_id_bytes);
        let records = service
            .query_service
            .get_trace_spans(Some(&trace_id_bytes), None, None, None, None)
            .await?;

        let total_spans: usize = records.iter().map(|batch| batch.len()).sum();
        info!(
            "Queried {} spans across {} batches",
            total_spans,
            records.len()
        );

        assert_eq!(
            total_spans, 3,
            "Expected to query 3 spans but got {}",
            total_spans
        );

        // Flatten every batch into borrowed span views.
        let span_views: Vec<TraceSpanView<'_>> =
            records.iter().flat_map(|batch| batch.iter()).collect();

        let serialized_spans = serde_json::to_string(&span_views).unwrap();

        // Round-trip: the serialized views must load back as Vec<TraceSpan>.
        let deserialized_spans: Vec<TraceSpan> = serde_json::from_str(&serialized_spans).unwrap();

        assert_eq!(
            deserialized_spans.len(),
            3,
            "Expected to deserialize 3 spans but got {}",
            deserialized_spans.len()
        );

        let last_span: &TraceSpan = spans.last().unwrap();
        let end_time = last_span.end_time;

        // Query again using only the end-time filter.
        let records = service
            .query_service
            .get_trace_spans(None, None, None, Some(&end_time), None)
            .await?;

        let total_spans: usize = records.iter().map(|batch| batch.len()).sum();
        assert_eq!(
            total_spans, 3,
            "Expected to query 3 spans with end_time filter but got {}",
            total_spans
        );

        info!("Test: shutting down");
        service.shutdown().await?;
        // Remove the storage directory so the test leaves nothing behind.
        cleanup();
        Ok(())
    }
}