scouter_dataframe/parquet/tracing/
service.rs1use crate::error::TraceEngineError;
2use crate::parquet::tracing::engine::{TableCommand, TraceSpanDBEngine};
3use crate::parquet::tracing::queries::TraceQueries;
4use scouter_settings::ObjectStorageSettings;
5use scouter_types::sql::TraceSpan;
6use tokio::sync::mpsc;
7use tokio::time::{interval, Duration};
8use tracing::info;
9
// Maximum number of spans held in memory before a size-triggered flush.
const BUFFER_SIZE: usize = 10_000;
// Default periodic-flush cadence (seconds) when the caller supplies `None`.
const FLUSH_INTERVAL_SECS: u64 = 5;
12
/// Buffered write-path service over the parquet trace-span engine.
///
/// Owns two background tasks: the DB engine actor (writes/optimize/compaction)
/// and a buffering actor that batches incoming spans before flushing them to
/// the engine.
pub struct TraceSpanService {
    // Command channel into the engine actor (Write / Optimize / Shutdown).
    engine_tx: mpsc::Sender<TableCommand>,
    // Inbound span batches destined for the buffering actor.
    span_tx: mpsc::Sender<Vec<TraceSpan>>,
    // Signal telling the buffering actor to flush remaining spans and exit.
    shutdown_tx: mpsc::Sender<()>,
    // Join handle for the engine actor task (awaited in `shutdown`).
    engine_handle: tokio::task::JoinHandle<()>,
    // Join handle for the buffering actor task (awaited in `shutdown`).
    buffer_handle: tokio::task::JoinHandle<()>,
    // Read-side query API; built from a clone of the engine's context.
    pub query_service: TraceQueries,
}
21
22impl TraceSpanService {
23 pub async fn new(
24 storage_settings: &ObjectStorageSettings,
25 compaction_interval_hours: u64,
26 flush_interval_secs: Option<u64>,
27 ) -> Result<Self, TraceEngineError> {
28 let engine = TraceSpanDBEngine::new(storage_settings).await?;
29 info!(
30 "TraceSpanService initialized with storage URI: {}",
31 storage_settings.storage_uri
32 );
33
34 let ctx = engine.ctx.clone();
35 let (engine_tx, engine_handle) = engine.start_actor(compaction_interval_hours);
36 let (span_tx, span_rx) = mpsc::channel::<Vec<TraceSpan>>(100);
37 let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
38
39 let buffer_handle = Self::start_buffering_actor(
40 engine_tx.clone(),
41 span_rx,
42 shutdown_rx,
43 flush_interval_secs,
44 );
45
46 Ok(TraceSpanService {
47 engine_tx,
48 span_tx,
49 shutdown_tx,
50 engine_handle,
51 buffer_handle,
52 query_service: TraceQueries::new(ctx),
53 })
54 }
55
56 fn start_buffering_actor(
57 engine_tx: mpsc::Sender<TableCommand>,
58 mut span_rx: mpsc::Receiver<Vec<TraceSpan>>,
59 mut shutdown_rx: mpsc::Receiver<()>,
60 flush_interval_secs: Option<u64>,
61 ) -> tokio::task::JoinHandle<()> {
62 tokio::spawn(async move {
63 let mut buffer = Vec::with_capacity(BUFFER_SIZE);
64 let mut flush_ticker = interval(Duration::from_secs(
65 flush_interval_secs.unwrap_or(FLUSH_INTERVAL_SECS),
66 ));
67 flush_ticker.tick().await;
68
69 loop {
70 tokio::select! {
71 Some(spans) = span_rx.recv() => {
72
73 buffer.extend(spans);
74
75 if buffer.len() >= BUFFER_SIZE {
76 Self::flush_buffer(&engine_tx, &mut buffer).await;
77 }
78 }
79 _ = flush_ticker.tick() => {
80 if !buffer.is_empty() {
81 println!("Flushing spans buffer with {} spans", buffer.len());
82 Self::flush_buffer(&engine_tx, &mut buffer).await;
83 }
84 }
85 _ = shutdown_rx.recv() => {
86 info!("Buffer actor received shutdown signal");
87 if !buffer.is_empty() {
88 info!("Flushing final {} spans before shutdown", buffer.len());
89 Self::flush_buffer(&engine_tx, &mut buffer).await;
90 }
91 break;
92 }
93 }
94 }
95
96 info!("Buffering actor shutting down");
97 })
98 }
99
100 async fn flush_buffer(engine_tx: &mpsc::Sender<TableCommand>, buffer: &mut Vec<TraceSpan>) {
101 if buffer.is_empty() {
102 return;
103 }
104
105 let spans_to_write = std::mem::replace(buffer, Vec::with_capacity(BUFFER_SIZE));
106 let span_count = spans_to_write.len();
107
108 info!("Sending write command to engine for {} spans", span_count);
109
110 let (tx, rx) = tokio::sync::oneshot::channel();
111
112 if let Err(e) = engine_tx
113 .send(TableCommand::Write {
114 spans: spans_to_write,
115 respond_to: tx,
116 })
117 .await
118 {
119 tracing::error!("Failed to send write command: {}", e);
120 return;
121 }
122
123 info!("Write command sent, waiting for response");
124
125 match rx.await {
126 Ok(Ok(())) => {
127 info!("Successfully flushed {} spans", span_count);
128 }
129 Ok(Err(e)) => {
130 tracing::error!("Write failed: {}", e);
131 }
132 Err(e) => {
133 tracing::error!("Failed to receive write response: {}", e);
134 }
135 }
136 }
137
138 pub async fn write_spans(&self, spans: Vec<TraceSpan>) -> Result<(), TraceEngineError> {
139 self.span_tx
140 .send(spans)
141 .await
142 .map_err(|_| TraceEngineError::ChannelClosed)?;
143 Ok(())
144 }
145
146 pub async fn optimize(&self) -> Result<(), TraceEngineError> {
147 let (tx, rx) = tokio::sync::oneshot::channel();
148
149 self.engine_tx
150 .send(TableCommand::Optimize { respond_to: tx })
151 .await
152 .map_err(|_| TraceEngineError::ChannelClosed)?;
153
154 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
155 }
156
157 pub async fn shutdown(self) -> Result<(), TraceEngineError> {
158 info!("TraceSpanService shutting down");
159
160 let _ = self.shutdown_tx.send(()).await;
161
162 if let Err(e) = self.buffer_handle.await {
163 tracing::error!("Buffer handle error: {}", e);
164 }
165
166 self.engine_tx
167 .send(TableCommand::Shutdown)
168 .await
169 .map_err(|_| TraceEngineError::ChannelClosed)?;
170
171 if let Err(e) = self.engine_handle.await {
172 tracing::error!("Engine handle error: {}", e);
173 }
174
175 info!("TraceSpanService shutdown complete");
176 Ok(())
177 }
178}
#[cfg(test)]
mod tests {

    use super::*;
    use crate::parquet::tracing::span_view::TraceSpanView;
    use scouter_mocks::create_simple_trace;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::TraceId;
    use tracing_subscriber;

    /// Test helper: installs a tracing subscriber (idempotent via
    /// `try_init`) and removes any storage directory left over from a
    /// previous run so each test starts from a clean slate.
    fn cleanup() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::INFO)
            .try_init();

        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            std::fs::remove_dir_all(storage_path).unwrap();
        }
    }

    /// Service can be constructed and shut down without writing any spans.
    #[tokio::test]
    async fn test_service_initialization() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2)).await?;
        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// End-to-end: write one small trace (3 spans, per the assertions
    /// below), wait past the flush interval, query it back by trace id and
    /// by end-time filter, and round-trip the results through serde JSON.
    #[tokio::test]
    async fn test_dataframe_trace_write_single_batch() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        // 2-second flush interval so the sleep below is enough to observe
        // a periodic flush.
        let service = TraceSpanService::new(&storage_settings, 24, Some(2)).await?;

        let spans = create_simple_trace();
        info!("Test: writing {} spans", spans.len());
        service.write_spans(spans.clone()).await?;

        info!("Test: waiting for flush");
        // Longer than the 2s flush interval, so the buffered spans should
        // have been written by the time we query. NOTE(review): timing-based
        // synchronization; could flake under heavy load.
        tokio::time::sleep(Duration::from_secs(5)).await;

        let first_span: &TraceSpan = spans.first().unwrap();
        let trace_id_bytes = TraceId::hex_to_bytes(&first_span.trace_id)?;

        info!("Test: querying spans for trace_id {:?}", trace_id_bytes);
        let records = service
            .query_service
            .get_trace_spans(Some(&trace_id_bytes), None, None, None, None)
            .await?;

        // Results come back as record batches; count spans across all of them.
        let total_spans: usize = records.iter().map(|batch| batch.len()).sum();
        println!(
            "Queried {} spans across {} batches",
            total_spans,
            records.len()
        );

        assert_eq!(
            total_spans, 3,
            "Expected to query 3 spans but got {}",
            total_spans
        );

        // Flatten the batches into borrowed span views, then verify they
        // serialize to JSON and deserialize back into owned `TraceSpan`s.
        let span_views: Vec<TraceSpanView<'_>> = records
            .iter() .flat_map(|batch| batch.iter()) .collect();

        let serialized_spans = serde_json::to_string(&span_views).unwrap();

        let deserialized_spans: Vec<TraceSpan> = serde_json::from_str(&serialized_spans).unwrap();

        assert_eq!(
            deserialized_spans.len(),
            3,
            "Expected to deserialize 3 spans but got {}",
            deserialized_spans.len()
        );

        let last_span: &TraceSpan = spans.last().unwrap();

        let end_time = last_span.end_time;

        // Query again passing only the fourth argument — presumably an
        // end-time filter that all 3 spans satisfy (TODO confirm against
        // `get_trace_spans`' parameter order).
        let records = service
            .query_service
            .get_trace_spans(None, None, None, Some(&end_time), None)
            .await?;

        let total_spans: usize = records.iter().map(|batch| batch.len()).sum();
        assert_eq!(
            total_spans, 3,
            "Expected to query 3 spans with end_time filter but got {}",
            total_spans
        );

        info!("Test: shutting down");
        service.shutdown().await?;
        Ok(())
    }
}