1use crate::error::TraceEngineError;
2use crate::parquet::tracing::traits::arrow_schema_to_delta;
3use crate::parquet::tracing::traits::attribute_field;
4use crate::parquet::tracing::traits::TraceSchemaExt;
5use crate::storage::ObjectStore;
6use arrow::array::*;
7use arrow::datatypes::*;
8use arrow_array::RecordBatch;
9use datafusion::prelude::SessionContext;
10use deltalake::operations::optimize::OptimizeType;
11use deltalake::DeltaTable;
12use scouter_settings::ObjectStorageSettings;
13use scouter_types::sql::TraceSpan;
14use scouter_types::SpanId;
15use scouter_types::TraceId;
16use scouter_types::{Attribute, SpanEvent, SpanLink};
17use serde_json::Value;
18use std::sync::Arc;
19use tokio::sync::oneshot;
20use tokio::sync::{mpsc, RwLock as AsyncRwLock};
21use tokio::time::{interval, Duration};
22use tracing::{debug, error, info, instrument};
23use url::Url;
/// Name under which the trace-span Delta table is created (`with_table_name`) and
/// registered with the DataFusion session.
const TRACE_SPAN_TABLE_NAME: &str = "trace_spans";

/// Commands handled by the actor task spawned in `TraceSpanDBEngine::start_actor`.
pub enum TableCommand {
    /// Append `spans` to the Delta table; the outcome is sent back on `respond_to`.
    Write {
        spans: Vec<TraceSpan>,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Run a Z-order optimize (compaction) of the table; the outcome is sent on `respond_to`.
    Optimize {
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Break out of the actor loop.
    Shutdown,
}
36
37async fn build_url(object_store: &ObjectStore) -> Result<Url, TraceEngineError> {
38 let base_url = object_store.get_base_url()?; Ok(base_url)
40}
41
42#[instrument(skip_all)]
43async fn create_table(table_url: Url, schema: SchemaRef) -> Result<DeltaTable, TraceEngineError> {
44 info!("Creating new Delta table at URL: {}", table_url);
45
46 let table = DeltaTable::try_from_url(table_url).await?;
47
48 let delta_fields = arrow_schema_to_delta(&schema);
49
50 table
51 .create()
52 .with_table_name(TRACE_SPAN_TABLE_NAME)
53 .with_columns(delta_fields)
54 .await
55 .map_err(Into::into)
56}
57
58#[instrument(skip_all)]
59async fn build_or_create_table(
60 object_store: &ObjectStore,
61 schema: SchemaRef,
62) -> Result<DeltaTable, TraceEngineError> {
63 let table_url = build_url(object_store).await?;
64
65 info!("Attempting to load table at URL: {}", table_url);
66
67 if table_url.scheme() == "file" {
69 if let Ok(path) = table_url.to_file_path() {
70 if !path.exists() {
71 info!("Creating directory for local table: {:?}", path);
72 std::fs::create_dir_all(&path)?;
73 }
74 }
75 }
76
77 match DeltaTable::try_from_url(table_url.clone()).await {
78 Ok(table) => {
79 info!("Loaded existing Delta table");
80 Ok(table)
81 }
82 Err(deltalake::DeltaTableError::NotATable(_)) => {
83 info!("Table does not exist, creating new table");
84 create_table(table_url, schema).await
85 }
86 Err(e) => Err(e.into()),
87 }
88}
/// Delta Lake-backed storage for trace spans, exposed for querying through a DataFusion
/// `SessionContext`.
pub struct TraceSpanDBEngine {
    /// Arrow schema used to build span record batches (created by `Self::create_schema()`).
    schema: Arc<Schema>,
    /// Object store that hosts the Delta table and provides the DataFusion session.
    pub object_store: ObjectStore,
    /// Current Delta table handle; replaced after each successful write or optimize.
    table: Arc<AsyncRwLock<DeltaTable>>,
    /// DataFusion session with the table registered under `TRACE_SPAN_TABLE_NAME`.
    pub ctx: Arc<SessionContext>,
}

// Presumably provides `create_schema` (used in `new`) via the trait's default methods.
impl TraceSchemaExt for TraceSpanDBEngine {}
105
106impl TraceSpanDBEngine {
107 pub async fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, TraceEngineError> {
108 let object_store = ObjectStore::new(storage_settings)?;
109 let schema = Arc::new(Self::create_schema());
110 let delta_table = build_or_create_table(&object_store, schema.clone()).await?;
111 let ctx = object_store.get_session()?;
112 ctx.register_table(TRACE_SPAN_TABLE_NAME, Arc::new(delta_table.clone()))?;
113
114 Ok(TraceSpanDBEngine {
115 schema,
116 object_store,
117 table: Arc::new(AsyncRwLock::new(delta_table)),
118 ctx: Arc::new(ctx),
119 })
120 }
121
122 pub fn build_batch(&self, spans: Vec<TraceSpan>) -> Result<RecordBatch, TraceEngineError> {
124 let start_time = std::time::Instant::now();
126 let mut builder = TraceSpanBatchBuilder::new(self.schema.clone());
127
128 for span in spans {
129 builder.append(&span)?;
130 }
131
132 let record_batch = builder
133 .finish()
134 .inspect_err(|e| error!("Failed to build RecordBatch: {}", e))?;
135
136 let duration = start_time.elapsed();
137 debug!(
138 "Built RecordBatch with {} rows in {:?}",
139 record_batch.num_rows(),
140 duration
141 );
142 Ok(record_batch)
143 }
144
145 async fn write_spans(&self, spans: Vec<TraceSpan>) -> Result<(), TraceEngineError> {
148 info!("Engine received write request for {} spans", spans.len());
149
150 let batch = self
151 .build_batch(spans)
152 .inspect_err(|e| error!("failed to build batch: {:?}", e))?;
153 info!("Built batch with {} rows", batch.num_rows());
154
155 let mut table_guard = self.table.write().await;
156 info!("Acquired table write lock");
157
158 if let Err(e) = table_guard.update_incremental(None).await {
160 info!("Table update skipped (table may be newly created): {}", e);
162 }
163
164 let current_table = table_guard.clone();
165
166 let updated_table = current_table
167 .write(vec![batch])
168 .with_save_mode(deltalake::protocol::SaveMode::Append)
169 .await?;
170
171 info!("Successfully wrote batch to Delta Lake");
172
173 {
175 self.ctx.deregister_table(TRACE_SPAN_TABLE_NAME)?;
176 self.ctx
177 .register_table(TRACE_SPAN_TABLE_NAME, Arc::new(updated_table.clone()))?;
178 }
179
180 *table_guard = updated_table;
181
182 Ok(())
183 }
184
185 async fn optimize_table(&self) -> Result<(), TraceEngineError> {
186 let mut table_guard = self.table.write().await;
187
188 let current_table = table_guard.clone();
189
190 let (updated_table, _metrics) = current_table
191 .optimize()
192 .with_target_size(128 * 1024 * 1024)
193 .with_type(OptimizeType::ZOrder(vec![
194 "start_time".to_string(),
195 "service_name".to_string(),
196 ]))
197 .await?;
198
199 self.ctx.deregister_table(TRACE_SPAN_TABLE_NAME)?;
201 self.ctx
202 .register_table(TRACE_SPAN_TABLE_NAME, Arc::new(updated_table.clone()))?;
203
204 *table_guard = updated_table;
205
206 Ok(())
207 }
208
209 #[instrument(skip_all, name = "buffering_actor")]
210 pub fn start_actor(
211 self,
212 compaction_interval_hours: u64,
213 ) -> (mpsc::Sender<TableCommand>, tokio::task::JoinHandle<()>) {
214 let (tx, mut rx) = mpsc::channel::<TableCommand>(100);
215
216 let handle = tokio::spawn(async move {
217 let mut compaction_ticker =
218 interval(Duration::from_secs(compaction_interval_hours * 3600));
219 compaction_ticker.tick().await;
220
221 loop {
222 tokio::select! {
223 Some(cmd) = rx.recv() => {
224 match cmd {
225 TableCommand::Write { spans, respond_to } => {
226 match self.write_spans(spans).await {
227 Ok(_) => {
228 let _ = respond_to.send(Ok(()));
229 }
230 Err(e) => {
231 tracing::error!("Write failed: {}", e);
232 let _ = respond_to.send(Err(e));
233 }
234 }
235 }
236 TableCommand::Optimize { respond_to } => {
237 match self.optimize_table().await {
238 Ok(_) => {
239 tracing::info!("Compaction completed");
240 let _ = respond_to.send(Ok(()));
241 }
242 Err(e) => {
243 tracing::error!("Compaction failed: {}", e);
244 let _ = respond_to.send(Err(e));
245 }
246 }
247 }
248 TableCommand::Shutdown => {
249 tracing::info!("Shutting down table engine");
250 break;
251 }
252 }
253 }
254 _ = compaction_ticker.tick() => {
255 if let Err(e) = self.optimize_table().await {
256 tracing::error!("Scheduled compaction failed: {}", e);
257 } else {
258 tracing::info!("Scheduled compaction completed");
259 }
260 }
261 }
262 }
263 });
264
265 (tx, handle)
266 }
267}
268
/// Incrementally builds an Arrow `RecordBatch` of trace spans, one row per appended span.
pub struct TraceSpanBatchBuilder {
    /// Target schema passed to `RecordBatch::try_new` in `finish`.
    schema: SchemaRef,

    // Identifier columns: trace id is 16 bytes, span ids are 8 bytes (decoded from hex).
    trace_id: FixedSizeBinaryBuilder,
    span_id: FixedSizeBinaryBuilder,
    /// Null when the span has no parent.
    parent_span_id: FixedSizeBinaryBuilder,
    root_span_id: FixedSizeBinaryBuilder,

    /// Dictionary-encoded (i32 keys).
    service_name: StringDictionaryBuilder<Int32Type>,
    span_name: StringBuilder,
    /// Dictionary-encoded (i8 keys); null when the span has no kind.
    span_kind: StringDictionaryBuilder<Int8Type>,

    // Microsecond timestamps tagged with the "UTC" timezone.
    start_time: TimestampMicrosecondBuilder,
    end_time: TimestampMicrosecondBuilder,
    duration_ms: Int64Builder,

    status_code: Int32Builder,
    /// Null when the span has no status message.
    status_message: StringBuilder,

    depth: Int32Builder,
    span_order: Int32Builder,
    /// List of path segment strings for each span.
    path: ListBuilder<StringBuilder>,

    /// Span attributes as a map of key -> stringified value.
    attributes: MapBuilder<StringBuilder, StringViewBuilder>,

    /// List of event structs (name, timestamp, attributes, dropped_attributes_count).
    events: ListBuilder<StructBuilder>,
    /// List of link structs (trace_id, span_id, trace_state, attributes,
    /// dropped_attributes_count).
    links: ListBuilder<StructBuilder>,

    // Span input/output rendered with `to_string()`; null when absent.
    input: StringViewBuilder,
    output: StringViewBuilder,

    /// Free-text column produced by `build_search_blob`.
    search_blob: StringViewBuilder,
}
317
318impl TraceSpanBatchBuilder {
319 pub fn new(schema: SchemaRef) -> Self {
320 let trace_id = FixedSizeBinaryBuilder::new(16);
322 let span_id = FixedSizeBinaryBuilder::new(8);
323 let parent_span_id = FixedSizeBinaryBuilder::new(8);
324 let root_span_id = FixedSizeBinaryBuilder::new(8);
325
326 let service_name = StringDictionaryBuilder::<Int32Type>::new();
327 let span_name = StringBuilder::new();
328 let span_kind = StringDictionaryBuilder::<Int8Type>::new();
329
330 let start_time = TimestampMicrosecondBuilder::new().with_timezone("UTC");
331 let end_time = TimestampMicrosecondBuilder::new().with_timezone("UTC");
332 let duration_ms = Int64Builder::new();
333
334 let status_code = Int32Builder::new();
335 let status_message = StringBuilder::new();
336
337 let depth = Int32Builder::new();
338 let span_order = Int32Builder::new();
339 let path = ListBuilder::new(StringBuilder::new());
340
341 let map_field_name = MapFieldNames {
342 entry: "key_value".to_string(),
343 key: "key".to_string(),
344 value: "value".to_string(),
345 };
346 let attributes = MapBuilder::new(
347 Some(map_field_name.clone()),
348 StringBuilder::new(),
349 StringViewBuilder::new(),
350 );
351
352 let event_fields = vec![
354 Field::new("name", DataType::Utf8, false),
355 Field::new(
356 "timestamp",
357 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
358 false,
359 ),
360 attribute_field(),
361 Field::new("dropped_attributes_count", DataType::UInt32, false),
362 ];
363
364 let event_struct_builders = vec![
365 Box::new(StringBuilder::new()) as Box<dyn ArrayBuilder>,
366 Box::new(TimestampMicrosecondBuilder::new().with_timezone("UTC"))
367 as Box<dyn ArrayBuilder>,
368 Box::new(MapBuilder::new(
369 Some(map_field_name.clone()),
370 StringBuilder::new(),
371 StringViewBuilder::new(),
372 )) as Box<dyn ArrayBuilder>,
373 Box::new(UInt32Builder::new()) as Box<dyn ArrayBuilder>,
374 ];
375
376 let event_struct_builder = StructBuilder::new(event_fields, event_struct_builders);
377 let events = ListBuilder::new(event_struct_builder);
378
379 let link_fields = vec![
381 Field::new("trace_id", DataType::FixedSizeBinary(16), false),
382 Field::new("span_id", DataType::FixedSizeBinary(8), false),
383 Field::new("trace_state", DataType::Utf8, false),
384 attribute_field(),
385 Field::new("dropped_attributes_count", DataType::UInt32, false),
386 ];
387
388 let link_struct_builders = vec![
389 Box::new(FixedSizeBinaryBuilder::new(16)) as Box<dyn ArrayBuilder>,
390 Box::new(FixedSizeBinaryBuilder::new(8)) as Box<dyn ArrayBuilder>,
391 Box::new(StringBuilder::new()) as Box<dyn ArrayBuilder>,
392 Box::new(MapBuilder::new(
393 Some(map_field_name.clone()),
394 StringBuilder::new(),
395 StringViewBuilder::new(),
396 )) as Box<dyn ArrayBuilder>,
397 Box::new(UInt32Builder::new()) as Box<dyn ArrayBuilder>,
398 ];
399
400 let link_struct_builder = StructBuilder::new(link_fields, link_struct_builders);
401 let links = ListBuilder::new(link_struct_builder);
402
403 let input = StringViewBuilder::new();
404 let output = StringViewBuilder::new();
405 let search_blob = StringViewBuilder::new();
406
407 Self {
408 schema,
409 trace_id,
410 span_id,
411 parent_span_id,
412 root_span_id,
413 service_name,
414 span_name,
415 span_kind,
416 start_time,
417 end_time,
418 duration_ms,
419 status_code,
420 status_message,
421 depth,
422 span_order,
423 path,
424 attributes,
425 events,
426 links,
427 input,
428 output,
429 search_blob,
430 }
431 }
432
433 pub fn append(&mut self, span: &TraceSpan) -> Result<(), TraceEngineError> {
435 Self::append_id_as_bytes(&span.trace_id, &mut self.trace_id, 16).inspect_err(|e| {
437 error!("Failed to append trace_id for span {}: {}", span.span_id, e);
438 })?;
439 Self::append_id_as_bytes(&span.span_id, &mut self.span_id, 8).inspect_err(|e| {
440 error!("Failed to append span_id for span {}: {}", span.span_id, e);
441 })?;
442 Self::append_id_as_bytes(&span.root_span_id, &mut self.root_span_id, 8).inspect_err(
443 |e| {
444 error!(
445 "Failed to append root_span_id for span {}: {}",
446 span.span_id, e
447 );
448 },
449 )?;
450
451 match &span.parent_span_id {
452 Some(pid) => Self::append_id_as_bytes(pid, &mut self.parent_span_id, 8)?,
453 None => self.parent_span_id.append_null(),
454 }
455
456 self.service_name.append_value(&span.service_name);
458 self.span_name.append_value(&span.span_name);
459
460 match &span.span_kind {
461 Some(kind) => self.span_kind.append_value(kind),
462 None => self.span_kind.append_null(),
463 }
464
465 self.start_time
467 .append_value(span.start_time.timestamp_micros());
468
469 self.end_time.append_value(span.end_time.timestamp_micros());
470
471 self.duration_ms.append_value(span.duration_ms);
472
473 self.status_code.append_value(span.status_code);
475 match &span.status_message {
476 Some(msg) => self.status_message.append_value(msg),
477 None => self.status_message.append_null(),
478 }
479
480 self.depth.append_value(span.depth);
482 self.span_order.append_value(span.span_order);
483
484 for path_segment in &span.path {
486 self.path.values().append_value(path_segment);
487 }
488 self.path.append(true);
489
490 self.append_attributes(&span.attributes).inspect_err(|e| {
492 error!(
493 "Failed to append attributes for span {}: {}",
494 span.span_id, e
495 );
496 })?;
497
498 self.append_events(&span.events).inspect_err(|e| {
500 error!("Failed to append events for span {}: {}", span.span_id, e);
501 })?;
502
503 self.append_links(&span.links).inspect_err(|e| {
505 error!("Failed to append links for span {}: {}", span.span_id, e);
506 })?;
507
508 match &span.input {
510 Some(v) => self.input.append_value(v.to_string()),
511 None => self.input.append_null(),
512 }
513
514 match &span.output {
515 Some(v) => self.output.append_value(v.to_string()),
516 None => self.output.append_null(),
517 }
518
519 let search_text = self.build_search_blob(span);
521 self.search_blob.append_value(search_text);
522
523 Ok(())
524 }
525
526 fn append_id_as_bytes(
528 hex_str: &str,
529 builder: &mut FixedSizeBinaryBuilder,
530 expected_size: usize,
531 ) -> Result<(), TraceEngineError> {
532 match expected_size {
533 16 => {
534 let bytes = TraceId::hex_to_bytes(hex_str)?;
535 builder.append_value(&bytes)?;
536 }
537 8 => {
538 let bytes = SpanId::hex_to_bytes(hex_str)?;
539 builder.append_value(&bytes)?;
540 }
541 _ => {
542 return Err(TraceEngineError::InvalidHexId(
543 hex_str.to_string(),
544 "Unsupported ID size".to_string(),
545 ))
546 }
547 }
548 Ok(())
549 }
550
551 fn append_attributes(&mut self, attributes: &[Attribute]) -> Result<(), TraceEngineError> {
553 for attr in attributes {
554 self.attributes.keys().append_value(&attr.key);
555
556 let value_str = match &attr.value {
558 Value::String(s) => s.clone(),
559 Value::Null => String::new(),
560 other => other.to_string(),
561 };
562
563 self.attributes.values().append_value(value_str);
564 }
565 self.attributes.append(true)?;
566 Ok(())
567 }
568
569 fn append_events(&mut self, events: &[SpanEvent]) -> Result<(), TraceEngineError> {
571 let event_struct = self.events.values();
572 for event in events {
573 let name_builder = event_struct
575 .field_builder::<StringBuilder>(0)
576 .ok_or_else(|| TraceEngineError::DowncastError("event name builder"))?;
577 name_builder.append_value(&event.name);
578
579 let time_builder = event_struct
581 .field_builder::<TimestampMicrosecondBuilder>(1)
582 .ok_or_else(|| TraceEngineError::DowncastError("event timestamp builder"))?;
583 time_builder.append_value(event.timestamp.timestamp_micros());
584
585 let attr_builder = event_struct
587 .field_builder::<MapBuilder<StringBuilder, StringViewBuilder>>(2)
588 .ok_or_else(|| TraceEngineError::DowncastError("event attributes builder"))?;
589
590 for attr in &event.attributes {
591 attr_builder.keys().append_value(&attr.key);
592 let value_str = match &attr.value {
593 Value::String(s) => s.clone(),
594 Value::Null => String::new(),
595 other => other.to_string(),
596 };
597 attr_builder.values().append_value(value_str);
598 }
599 attr_builder.append(true)?;
600
601 let dropped_builder =
603 event_struct
604 .field_builder::<UInt32Builder>(3)
605 .ok_or_else(|| {
606 TraceEngineError::DowncastError("dropped attributes count builder")
607 })?;
608 dropped_builder.append_value(event.dropped_attributes_count);
609
610 event_struct.append(true);
611 }
612
613 self.events.append(true);
614 Ok(())
615 }
616
617 fn append_links(&mut self, links: &[SpanLink]) -> Result<(), TraceEngineError> {
619 let link_struct = self.links.values();
620
621 for link in links {
622 let trace_builder = link_struct
624 .field_builder::<FixedSizeBinaryBuilder>(0)
625 .ok_or_else(|| TraceEngineError::DowncastError("link trace_id builder"))?;
626
627 let trace_bytes = TraceId::hex_to_bytes(&link.trace_id).map_err(|e| {
628 TraceEngineError::InvalidHexId(link.trace_id.clone(), e.to_string())
629 })?;
630 trace_builder.append_value(&trace_bytes)?;
631
632 let span_builder = link_struct
634 .field_builder::<FixedSizeBinaryBuilder>(1)
635 .ok_or_else(|| TraceEngineError::DowncastError("link span_id builder"))?;
636
637 let span_bytes = SpanId::hex_to_bytes(&link.span_id)
638 .map_err(|e| TraceEngineError::InvalidHexId(link.span_id.clone(), e.to_string()))?;
639 span_builder.append_value(&span_bytes)?;
640
641 let state_builder = link_struct
643 .field_builder::<StringBuilder>(2)
644 .ok_or_else(|| TraceEngineError::DowncastError("link trace_state builder"))?;
645 state_builder.append_value(&link.trace_state);
646
647 let attr_builder = link_struct
649 .field_builder::<MapBuilder<StringBuilder, StringViewBuilder>>(3)
650 .ok_or_else(|| TraceEngineError::DowncastError("link attributes builder"))?;
651
652 for attr in &link.attributes {
653 attr_builder.keys().append_value(&attr.key);
654 let value_str = match &attr.value {
655 Value::String(s) => s.clone(),
656 Value::Null => String::new(),
657 other => other.to_string(),
658 };
659 attr_builder.values().append_value(value_str);
660 }
661 attr_builder.append(true)?;
662
663 let dropped_builder =
665 link_struct
666 .field_builder::<UInt32Builder>(4)
667 .ok_or_else(|| {
668 TraceEngineError::DowncastError("link dropped attributes count builder")
669 })?;
670 dropped_builder.append_value(link.dropped_attributes_count);
671
672 link_struct.append(true);
673 }
674
675 self.links.append(true);
676 Ok(())
677 }
678
679 fn build_search_blob(&self, span: &TraceSpan) -> String {
683 let mut search = String::with_capacity(512);
684
685 search.push_str(&span.service_name);
687 search.push(' ');
688 search.push_str(&span.span_name);
689 search.push(' ');
690
691 if let Some(msg) = &span.status_message {
693 search.push_str(msg);
694 search.push(' ');
695 }
696
697 for attr in &span.attributes {
699 search.push_str(&attr.key);
700 search.push(':');
701
702 let value_str = match &attr.value {
703 Value::String(s) => s.as_str(),
704 Value::Number(n) => {
705 search.push_str(&n.to_string());
706 continue;
707 }
708 Value::Bool(b) => {
709 search.push_str(&b.to_string());
710 continue;
711 }
712 Value::Null => continue,
713 _ => {
714 search.push_str(&attr.value.to_string());
715 continue;
716 }
717 };
718
719 search.push_str(value_str);
720 search.push(' ');
721 }
722
723 for event in &span.events {
725 search.push_str(&event.name);
726 search.push(' ');
727 }
728
729 search
730 }
731
732 pub fn finish(mut self) -> Result<RecordBatch, TraceEngineError> {
734 let batch = RecordBatch::try_new(
735 self.schema.clone(),
736 vec![
737 Arc::new(self.trace_id.finish()),
738 Arc::new(self.span_id.finish()),
739 Arc::new(self.parent_span_id.finish()),
740 Arc::new(self.root_span_id.finish()),
741 Arc::new(self.service_name.finish()),
742 Arc::new(self.span_name.finish()),
743 Arc::new(self.span_kind.finish()),
744 Arc::new(self.start_time.finish()),
745 Arc::new(self.end_time.finish()),
746 Arc::new(self.duration_ms.finish()),
747 Arc::new(self.status_code.finish()),
748 Arc::new(self.status_message.finish()),
749 Arc::new(self.depth.finish()),
750 Arc::new(self.span_order.finish()),
751 Arc::new(self.path.finish()),
752 Arc::new(self.attributes.finish()),
753 Arc::new(self.events.finish()),
754 Arc::new(self.links.finish()),
755 Arc::new(self.input.finish()),
756 Arc::new(self.output.finish()),
757 Arc::new(self.search_blob.finish()),
758 ],
759 )?;
760
761 Ok(batch)
762 }
763}