1use crate::error::TraceEngineError;
2use crate::parquet::control::{get_pod_id, ControlTableEngine};
3use crate::parquet::tracing::catalog::TraceCatalogProvider;
4use crate::parquet::tracing::traits::arrow_schema_to_delta;
5use crate::parquet::tracing::traits::attribute_field;
6use crate::parquet::tracing::traits::TraceSchemaExt;
7use crate::parquet::utils::{create_attr_match_udf, register_cloud_logstore_factories};
8use crate::storage::ObjectStore;
9use arrow::array::*;
10use arrow::datatypes::*;
11use arrow_array::RecordBatch;
12use chrono::{Datelike, Utc};
13use datafusion::catalog::CatalogProvider;
14use datafusion::prelude::SessionContext;
15use deltalake::datafusion::parquet::basic::{Compression, Encoding, ZstdLevel};
16use deltalake::datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties};
17use deltalake::datafusion::parquet::schema::types::ColumnPath;
18use deltalake::operations::optimize::OptimizeType;
19use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty};
20use scouter_settings::ObjectStorageSettings;
21use scouter_types::SpanId;
22use scouter_types::TraceId;
23use scouter_types::TraceSpanRecord;
24use scouter_types::{Attribute, SpanEvent, SpanLink};
25use serde_json::Value;
26use std::sync::Arc;
27use tokio::sync::oneshot;
28use tokio::sync::{mpsc, RwLock as AsyncRwLock};
29use tokio::time::{interval, Duration};
30use tracing::{debug, error, info, instrument};
31use url::Url;
32
/// Name of the Delta table (and final URL path segment) that stores trace spans.
const TRACE_SPAN_TABLE_NAME: &str = "trace_spans";

/// Control-table task id for the periodic OPTIMIZE (compaction) job.
const TASK_OPTIMIZE: &str = "trace_optimize";
/// Control-table task id for the periodic retention (expire old partitions) job.
const TASK_RETENTION: &str = "trace_retention";

/// Days from 0001-01-01 (proleptic Gregorian, `num_days_from_ce`) to the Unix
/// epoch 1970-01-01; used to convert dates to Arrow's epoch-based Date32.
const UNIX_EPOCH_DAYS: i32 = 719_163;
42
/// Commands handled by the table-engine actor loop (see `start_actor`).
/// Each request carries a oneshot channel on which the result is returned.
pub enum TableCommand {
    /// Append a batch of spans to the Delta table.
    Write {
        spans: Vec<TraceSpanRecord>,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Compact the table (Z-order optimize).
    Optimize {
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Remove unreferenced data files older than `retention_hours`.
    Vacuum {
        retention_hours: u64,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Delete rows whose `partition_date` is before `cutoff_date`.
    Expire {
        cutoff_date: chrono::NaiveDate,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Stop the actor loop.
    Shutdown,
}
61
62async fn build_url(object_store: &ObjectStore) -> Result<Url, TraceEngineError> {
63 let mut base = object_store.get_base_url()?;
64 let mut path = base.path().to_string();
65 if !path.ends_with('/') {
66 path.push('/');
67 }
68 path.push_str(TRACE_SPAN_TABLE_NAME);
69 base.set_path(&path);
70 Ok(base)
71}
72
/// Creates a brand-new Delta table for trace spans at `table_url`.
///
/// The table is partitioned by `partition_date`, checkpoints its log every
/// 5 commits, and limits data-skipping statistics to the columns used by
/// query filters.
#[instrument(skip_all)]
async fn create_table(
    object_store: &ObjectStore,
    table_url: Url,
    schema: SchemaRef,
) -> Result<DeltaTable, TraceEngineError> {
    // Log only scheme + last path segment to avoid leaking full bucket paths.
    info!(
        "Creating trace span table [{}://.../{} ]",
        table_url.scheme(),
        table_url
            .path_segments()
            .and_then(|mut s| s.next_back())
            .unwrap_or(TRACE_SPAN_TABLE_NAME)
    );

    let store = object_store.as_dyn_object_store();
    let table = DeltaTableBuilder::from_url(table_url.clone())?
        .with_storage_backend(store, table_url)
        .build()?;

    // Translate the Arrow schema into Delta Lake column definitions.
    let delta_fields = arrow_schema_to_delta(&schema);

    table
        .create()
        .with_table_name(TRACE_SPAN_TABLE_NAME)
        .with_columns(delta_fields)
        .with_partition_columns(vec!["partition_date".to_string()])
        // Checkpoint every 5 commits to keep log replay cheap.
        .with_configuration_property(TableProperty::CheckpointInterval, Some("5"))
        .with_configuration_property(
            TableProperty::DataSkippingStatsColumns,
            Some("start_time,end_time,service_name,duration_ms,status_code,partition_date"),
        )
        .await
        .map_err(Into::into)
}
109
/// Loads the trace span table if it already exists, otherwise creates it.
///
/// Existence is detected cheaply for local `file://` URLs by checking for a
/// `_delta_log` directory; for remote stores, by attempting a full load.
#[instrument(skip_all)]
async fn build_or_create_table(
    object_store: &ObjectStore,
    schema: SchemaRef,
) -> Result<DeltaTable, TraceEngineError> {
    // Register delta-rs logstore factories for the supported cloud schemes.
    register_cloud_logstore_factories();
    let table_url = build_url(object_store).await?;
    info!(
        "Attempting to load trace span table [{}://.../{} ]",
        table_url.scheme(),
        table_url
            .path_segments()
            .and_then(|mut s| s.next_back())
            .unwrap_or(TRACE_SPAN_TABLE_NAME)
    );

    let is_delta_table = if table_url.scheme() == "file" {
        if let Ok(path) = table_url.to_file_path() {
            if !path.exists() {
                info!("Creating directory for local table: {:?}", path);
                std::fs::create_dir_all(&path)?;
            }
            // A Delta table is identified by the presence of its log directory.
            path.join("_delta_log").exists()
        } else {
            false
        }
    } else {
        // Remote store: probe existence by attempting to load the table.
        let store = object_store.as_dyn_object_store();
        match DeltaTableBuilder::from_url(table_url.clone()) {
            Ok(builder) => builder
                .with_storage_backend(store, table_url.clone())
                .load()
                .await
                .is_ok(),
            Err(_) => false,
        }
    };

    if is_delta_table {
        info!(
            "Loaded existing trace span table [{}://.../{} ]",
            table_url.scheme(),
            table_url
                .path_segments()
                .and_then(|mut s| s.next_back())
                .unwrap_or(TRACE_SPAN_TABLE_NAME)
        );
        // NOTE(review): for remote stores this loads the table a second time
        // after the probe above — acceptable at startup, but worth confirming.
        let store = object_store.as_dyn_object_store();
        let table = DeltaTableBuilder::from_url(table_url.clone())?
            .with_storage_backend(store, table_url)
            .load()
            .await?;
        Ok(table)
    } else {
        info!("Table does not exist, creating new table");
        create_table(object_store, table_url, schema).await
    }
}
171
/// Catalog name under which trace tables are registered with DataFusion.
pub const TRACE_CATALOG_NAME: &str = "scouter_tracing";
/// Default schema name within the trace catalog.
const TRACE_DEFAULT_SCHEMA: &str = "default";
180
/// Engine that owns the trace-span Delta table: writes span batches, runs
/// maintenance (optimize / vacuum / retention), and keeps the DataFusion
/// catalog and session in sync with the latest table snapshot.
pub struct TraceSpanDBEngine {
    // Arrow schema for the trace_spans table (built via `TraceSchemaExt`).
    schema: Arc<Schema>,
    pub object_store: ObjectStore,
    // Current Delta table handle; the write lock serializes all mutations.
    table: Arc<AsyncRwLock<DeltaTable>>,
    // DataFusion session used for queries against the catalog.
    ctx: Arc<SessionContext>,
    // Swappable provider so queries pick up new table snapshots without
    // re-registering the catalog.
    pub catalog: Arc<TraceCatalogProvider>,
    // Claim/release coordination for scheduled maintenance across instances.
    control: ControlTableEngine,
}
196
// Brings in the shared schema helpers (presumably `create_schema`, used in
// `new()`); the trait supplies default implementations, hence the empty body.
impl TraceSchemaExt for TraceSpanDBEngine {}
198
199impl TraceSpanDBEngine {
    /// Creates the engine: opens (or creates) the Delta table, builds the
    /// DataFusion session and swappable catalog, registers the attribute-match
    /// UDF, and initializes the control table used for task coordination.
    pub async fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, TraceEngineError> {
        let object_store = ObjectStore::new(storage_settings)?;
        let schema = Arc::new(Self::create_schema());
        let delta_table = build_or_create_table(&object_store, schema.clone()).await?;

        let ctx =
            object_store.get_session_with_catalog(TRACE_CATALOG_NAME, TRACE_DEFAULT_SCHEMA)?;

        // Swappable catalog: table snapshots can be replaced at runtime
        // without touching the session registration.
        let catalog = Arc::new(TraceCatalogProvider::new());
        ctx.register_catalog(
            TRACE_CATALOG_NAME,
            Arc::clone(&catalog) as Arc<dyn CatalogProvider>,
        );

        // Custom UDF for matching span attributes from SQL.
        ctx.register_udf(create_attr_match_udf());

        // A freshly created (empty) table may not yield a provider yet; in
        // that case registration is deferred to the first successful write.
        if let Ok(provider) = delta_table.table_provider().await {
            catalog.swap(TRACE_SPAN_TABLE_NAME, provider);
        } else {
            info!("Empty table at init — deferring catalog registration until first write");
        }
        let control = ControlTableEngine::new(&object_store, get_pod_id()).await?;

        Ok(TraceSpanDBEngine {
            schema,
            object_store,
            table: Arc::new(AsyncRwLock::new(delta_table)),
            ctx: Arc::new(ctx),
            catalog,
            control,
        })
    }
239
240 pub fn ctx(&self) -> Arc<SessionContext> {
246 Arc::clone(&self.ctx)
247 }
248
249 pub fn build_batch(
251 &self,
252 spans: Vec<TraceSpanRecord>,
253 ) -> Result<RecordBatch, TraceEngineError> {
254 let start_time = std::time::Instant::now();
255 let mut builder = TraceSpanBatchBuilder::new(self.schema.clone());
256
257 for span in spans {
258 builder.append(&span)?;
259 }
260
261 let record_batch = builder
262 .finish()
263 .inspect_err(|e| error!("Failed to build RecordBatch: {}", e))?;
264
265 let duration = start_time.elapsed();
266 debug!(
267 "Built RecordBatch with {} rows in {:?}",
268 record_batch.num_rows(),
269 duration
270 );
271 Ok(record_batch)
272 }
273
274 fn build_writer_props() -> WriterProperties {
276 WriterProperties::builder()
277 .set_max_row_group_size(32_768)
280 .set_column_bloom_filter_enabled(ColumnPath::new(vec!["trace_id".to_string()]), true)
282 .set_column_bloom_filter_fpp(ColumnPath::new(vec!["trace_id".to_string()]), 0.01)
283 .set_column_bloom_filter_ndv(ColumnPath::new(vec!["trace_id".to_string()]), 32_768)
284 .set_column_bloom_filter_enabled(
286 ColumnPath::new(vec!["service_name".to_string()]),
287 true,
288 )
289 .set_column_bloom_filter_fpp(ColumnPath::new(vec!["service_name".to_string()]), 0.01)
290 .set_column_bloom_filter_ndv(ColumnPath::new(vec!["service_name".to_string()]), 256)
291 .set_column_bloom_filter_enabled(ColumnPath::new(vec!["span_name".to_string()]), true)
293 .set_column_bloom_filter_fpp(ColumnPath::new(vec!["span_name".to_string()]), 0.01)
294 .set_column_bloom_filter_ndv(ColumnPath::new(vec!["span_name".to_string()]), 32_768)
295 .set_column_statistics_enabled(
297 ColumnPath::new(vec!["start_time".to_string()]),
298 EnabledStatistics::Page,
299 )
300 .set_column_statistics_enabled(
303 ColumnPath::new(vec!["status_code".to_string()]),
304 EnabledStatistics::Page,
305 )
306 .set_column_encoding(
309 ColumnPath::new(vec!["start_time".to_string()]),
310 Encoding::DELTA_BINARY_PACKED,
311 )
312 .set_column_encoding(
313 ColumnPath::new(vec!["duration_ms".to_string()]),
314 Encoding::DELTA_BINARY_PACKED,
315 )
316 .set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
319 .set_column_dictionary_enabled(ColumnPath::new(vec!["span_name".to_string()]), true)
321 .build()
322 }
323
    /// Builds a `RecordBatch` from `spans` and appends it to the Delta table,
    /// then swaps the new snapshot into the catalog and session so queries
    /// observe the new table version.
    async fn write_spans(&self, spans: Vec<TraceSpanRecord>) -> Result<(), TraceEngineError> {
        info!("Engine received write request for {} spans", spans.len());

        let batch = self
            .build_batch(spans)
            .inspect_err(|e| error!("failed to build batch: {:?}", e))?;
        info!("Built batch with {} rows", batch.num_rows());

        // Exclusive lock: serializes this write against other table mutations.
        let mut table_guard = self.table.write().await;
        info!("Acquired table write lock");

        // The write builder consumes the table handle, so operate on a clone
        // and store the updated handle back under the lock afterwards.
        let current_table = table_guard.clone();

        let updated_table = current_table
            .write(vec![batch])
            .with_save_mode(deltalake::protocol::SaveMode::Append)
            .with_writer_properties(Self::build_writer_props())
            .with_partition_columns(vec!["partition_date".to_string()])
            .await?;

        info!("Successfully wrote batch to Delta Lake");

        // Refresh catalog + session so queries resolve against the new version.
        let new_provider = updated_table.table_provider().await?;
        self.catalog.swap(TRACE_SPAN_TABLE_NAME, new_provider);
        updated_table.update_datafusion_session(&self.ctx.state())?;

        *table_guard = updated_table;

        Ok(())
    }
370
    /// Compacts the table via Z-order optimize on (start_time, service_name)
    /// with a 128 MiB target file size, then refreshes catalog and session.
    async fn optimize_table(&self) -> Result<(), TraceEngineError> {
        let mut table_guard = self.table.write().await;

        // Optimize consumes the table handle; clone and swap back afterwards.
        let current_table = table_guard.clone();

        let (updated_table, _metrics) = current_table
            .optimize()
            // 128 MiB target file size; the constant is nonzero so unwrap is safe.
            .with_target_size(std::num::NonZero::new(128 * 1024 * 1024).unwrap())
            .with_type(OptimizeType::ZOrder(vec![
                "start_time".to_string(),
                "service_name".to_string(),
            ]))
            .with_writer_properties(Self::build_writer_props())
            .await?;

        self.catalog
            .swap(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?);
        updated_table.update_datafusion_session(&self.ctx.state())?;

        *table_guard = updated_table;

        Ok(())
    }
397
    /// Deletes unreferenced data files older than `retention_hours`.
    ///
    /// Retention enforcement is disabled so callers may pass values below the
    /// Delta default — e.g. `0` for immediate cleanup after an optimize.
    async fn vacuum_table(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
        let mut table_guard = self.table.write().await;

        let (updated_table, _metrics) = table_guard
            .clone()
            .vacuum()
            .with_retention_period(chrono::Duration::hours(retention_hours as i64))
            .with_enforce_retention_duration(false)
            .await?;

        self.catalog
            .swap(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?);
        updated_table.update_datafusion_session(&self.ctx.state())?;

        *table_guard = updated_table;

        Ok(())
    }
416
    /// Deletes all rows whose `partition_date` falls before `cutoff_date`.
    ///
    /// The predicate targets the partition column, so entire partitions can
    /// be dropped. `cutoff_date` is rendered as fixed-format `YYYY-MM-DD`, so
    /// the string-built predicate is not injectable.
    async fn expire_table(&self, cutoff_date: chrono::NaiveDate) -> Result<(), TraceEngineError> {
        let mut table_guard = self.table.write().await;

        let predicate = format!(
            "partition_date < CAST('{}' AS DATE)",
            cutoff_date.format("%Y-%m-%d")
        );

        let (updated_table, metrics) = table_guard
            .clone()
            .delete()
            .with_predicate(predicate)
            .await?;

        info!(
            "Expired {} rows older than {}",
            metrics.num_deleted_rows, cutoff_date
        );

        self.catalog
            .swap(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?);
        updated_table.update_datafusion_session(&self.ctx.state())?;

        *table_guard = updated_table;

        Ok(())
    }
450
    /// Runs optimize if this instance wins the distributed claim for the
    /// optimize task. On success the claim is released with `interval_hours`
    /// as the delay until the next run; on failure it is released immediately
    /// so another instance can retry. Errors are logged, never propagated.
    async fn try_run_optimize(&self, interval_hours: u64) {
        match self.control.try_claim_task(TASK_OPTIMIZE).await {
            Ok(true) => match self.optimize_table().await {
                Ok(()) => {
                    // Clean up files superseded by the optimize rewrite;
                    // failure here is non-fatal.
                    if let Err(e) = self.vacuum_table(0).await {
                        error!("Post-optimize vacuum failed: {}", e);
                    }
                    let _ = self
                        .control
                        .release_task(
                            TASK_OPTIMIZE,
                            chrono::Duration::hours(interval_hours as i64),
                        )
                        .await;
                }
                Err(e) => {
                    error!("Optimize failed: {}", e);
                    let _ = self.control.release_task_on_failure(TASK_OPTIMIZE).await;
                }
            },
            // Another instance currently holds the claim — nothing to do.
            Ok(false) => { }
            Err(e) => error!("Optimize claim check failed: {}", e),
        }
    }
481
    /// Runs retention (expire + vacuum) if this instance wins the distributed
    /// claim. The cutoff is `retention_days` before now (UTC); on success the
    /// claim is released with a 24-hour delay, on failure released immediately
    /// for retry elsewhere. Errors are logged, never propagated.
    async fn try_run_retention(&self, retention_days: u32) {
        match self.control.try_claim_task(TASK_RETENTION).await {
            Ok(true) => {
                let cutoff =
                    (Utc::now() - chrono::Duration::days(retention_days as i64)).date_naive();
                match self.expire_table(cutoff).await {
                    Ok(()) => {
                        // Best-effort cleanup of the files freed by the delete.
                        let _ = self.vacuum_table(0).await;
                        let _ = self
                            .control
                            .release_task(TASK_RETENTION, chrono::Duration::hours(24))
                            .await;
                    }
                    Err(e) => {
                        error!("Retention failed: {}", e);
                        let _ = self.control.release_task_on_failure(TASK_RETENTION).await;
                    }
                }
            }
            Ok(false) => {}
            Err(e) => error!("Retention claim check failed: {}", e),
        }
    }
507
    /// Incrementally refreshes the table snapshot so queries observe commits
    /// made by other writers. The catalog/session are only swapped when the
    /// version actually advanced; refresh errors are logged at debug level
    /// and swallowed (the next tick will retry).
    async fn refresh_table(&self) -> Result<(), TraceEngineError> {
        let mut table_guard = self.table.write().await;
        let current_version = table_guard.version();

        // Refresh a clone so the live handle is untouched on failure.
        let mut refreshed = table_guard.clone();
        match refreshed.update_incremental(None).await {
            Ok(_) => {
                if refreshed.version() > current_version {
                    info!(
                        "Span table refreshed: v{:?} → v{:?}",
                        current_version,
                        refreshed.version()
                    );
                    let new_provider = refreshed.table_provider().await?;
                    self.catalog.swap(TRACE_SPAN_TABLE_NAME, new_provider);
                    refreshed.update_datafusion_session(&self.ctx.state())?;
                    *table_guard = refreshed;
                }
            }
            Err(e) => {
                // Transient load errors are expected; retried on the next tick.
                debug!("Table refresh skipped: {}", e);
            }
        }
        Ok(())
    }
543
544 #[instrument(skip_all, name = "trace_engine_actor")]
545 pub fn start_actor(
546 self,
547 compaction_interval_hours: u64,
548 retention_days: Option<u32>,
549 refresh_interval_secs: u64,
550 ) -> (mpsc::Sender<TableCommand>, tokio::task::JoinHandle<()>) {
551 let (tx, mut rx) = mpsc::channel::<TableCommand>(100);
552
553 let handle = tokio::spawn(async move {
554 info!(refresh_interval_secs, "TraceSpanDBEngine actor started");
555
556 let mut scheduler_ticker = interval(Duration::from_secs(5 * 60));
559 scheduler_ticker.tick().await; let mut refresh_ticker = interval(Duration::from_secs(refresh_interval_secs.max(1)));
566 refresh_ticker.tick().await;
567
568 loop {
569 tokio::select! {
570 Some(cmd) = rx.recv() => {
571 match cmd {
572 TableCommand::Write { spans, respond_to } => {
573 match self.write_spans(spans).await {
574 Ok(_) => { let _ = respond_to.send(Ok(())); }
575 Err(e) => {
576 tracing::error!("Write failed: {}", e);
577 let _ = respond_to.send(Err(e));
578 }
579 }
580 }
581 TableCommand::Optimize { respond_to } => {
582 let _ = respond_to.send(self.optimize_table().await);
585 if let Err(e) = self.vacuum_table(0).await {
586 error!("Post-optimize vacuum failed: {}", e);
587 }
588 }
589 TableCommand::Vacuum { retention_hours, respond_to } => {
590 let _ = respond_to.send(self.vacuum_table(retention_hours).await);
591 }
592 TableCommand::Expire { cutoff_date, respond_to } => {
593 let _ = respond_to.send(self.expire_table(cutoff_date).await);
594 }
595 TableCommand::Shutdown => {
596 tracing::info!("Shutting down table engine");
597 break;
598 }
599 }
600 }
601 _ = scheduler_ticker.tick() => {
602 self.try_run_optimize(compaction_interval_hours).await;
603 if let Some(days) = retention_days {
604 self.try_run_retention(days).await;
605 }
606 }
607 _ = refresh_ticker.tick() => {
608 if let Err(e) = self.refresh_table().await {
609 error!("Table refresh failed: {}", e);
610 }
611 }
612 }
613 }
614 });
615
616 (tx, handle)
617 }
618}
619
/// Columnar builder that accumulates `TraceSpanRecord`s and produces a single
/// Arrow `RecordBatch` conforming to `schema`. Field order mirrors the column
/// order emitted in `finish()`.
pub struct TraceSpanBatchBuilder {
    schema: SchemaRef,

    // Span identity: 16-byte trace id, 8-byte span ids (parent may be null).
    trace_id: FixedSizeBinaryBuilder,
    span_id: FixedSizeBinaryBuilder,
    parent_span_id: FixedSizeBinaryBuilder,

    // Trace flags and tracestate string.
    flags: Int32Builder,
    trace_state: StringBuilder,

    // Instrumentation scope.
    scope_name: StringBuilder,
    scope_version: StringBuilder,

    // Dictionary-encoded low-cardinality columns.
    service_name: StringDictionaryBuilder<Int32Type>,
    span_name: StringBuilder,
    span_kind: StringDictionaryBuilder<Int8Type>,

    // Timing: microsecond UTC timestamps plus precomputed duration.
    start_time: TimestampMicrosecondBuilder,
    end_time: TimestampMicrosecondBuilder,
    duration_ms: Int64Builder,

    // Status (empty message stored as null).
    status_code: Int32Builder,
    status_message: StringBuilder,

    label: StringBuilder,

    // Attribute maps: string keys, JSON-encoded string values.
    attributes: MapBuilder<StringBuilder, StringViewBuilder>,
    resource_attributes: MapBuilder<StringBuilder, StringViewBuilder>,

    // Nested lists of per-event / per-link structs.
    events: ListBuilder<StructBuilder>,
    links: ListBuilder<StructBuilder>,

    // JSON-serialized span input/output payloads.
    input: StringViewBuilder,
    output: StringViewBuilder,

    // Pipe-delimited free-text blob (see `build_search_blob`).
    search_blob: StringViewBuilder,

    // Partition column: days since the Unix epoch (Date32).
    partition_date: Date32Builder,
}
675
676impl TraceSpanBatchBuilder {
    /// Constructs empty builders for every column of `schema`.
    ///
    /// The struct-field order passed to each `StructBuilder` here must stay in
    /// sync with the `field_builder` indices used in `append_events` and
    /// `append_links`.
    pub fn new(schema: SchemaRef) -> Self {
        // Fixed-size ids: 128-bit trace id, 64-bit span ids.
        let trace_id = FixedSizeBinaryBuilder::new(16);
        let span_id = FixedSizeBinaryBuilder::new(8);
        let parent_span_id = FixedSizeBinaryBuilder::new(8);

        let flags = Int32Builder::new();
        let trace_state = StringBuilder::new();

        let scope_name = StringBuilder::new();
        let scope_version = StringBuilder::new();

        // Dictionary encoding for repetitive values; Int8 keys for span_kind
        // since its cardinality is tiny.
        let service_name = StringDictionaryBuilder::<Int32Type>::new();
        let span_name = StringBuilder::new();
        let span_kind = StringDictionaryBuilder::<Int8Type>::new();

        let start_time = TimestampMicrosecondBuilder::new().with_timezone("UTC");
        let end_time = TimestampMicrosecondBuilder::new().with_timezone("UTC");
        let duration_ms = Int64Builder::new();

        let status_code = Int32Builder::new();
        let status_message = StringBuilder::new();

        let label = StringBuilder::new();

        // Shared naming for all map columns ("key_value" entry struct).
        let map_field_name = MapFieldNames {
            entry: "key_value".to_string(),
            key: "key".to_string(),
            value: "value".to_string(),
        };
        let attributes = MapBuilder::new(
            Some(map_field_name.clone()),
            StringBuilder::new(),
            StringViewBuilder::new(),
        );
        let resource_attributes = MapBuilder::new(
            Some(map_field_name.clone()),
            StringBuilder::new(),
            StringViewBuilder::new(),
        );

        // Event struct layout — indices referenced by append_events:
        // 0 = name, 1 = timestamp, 2 = attributes map, 3 = dropped count.
        let event_fields = vec![
            Field::new("name", DataType::Utf8, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                false,
            ),
            attribute_field(),
            Field::new("dropped_attributes_count", DataType::UInt32, false),
        ];

        let event_struct_builders = vec![
            Box::new(StringBuilder::new()) as Box<dyn ArrayBuilder>,
            Box::new(TimestampMicrosecondBuilder::new().with_timezone("UTC"))
                as Box<dyn ArrayBuilder>,
            Box::new(MapBuilder::new(
                Some(map_field_name.clone()),
                StringBuilder::new(),
                StringViewBuilder::new(),
            )) as Box<dyn ArrayBuilder>,
            Box::new(UInt32Builder::new()) as Box<dyn ArrayBuilder>,
        ];

        let event_struct_builder = StructBuilder::new(event_fields, event_struct_builders);
        let events = ListBuilder::new(event_struct_builder);

        // Link struct layout — indices referenced by append_links:
        // 0 = trace_id, 1 = span_id, 2 = trace_state, 3 = attributes, 4 = dropped count.
        let link_fields = vec![
            Field::new("trace_id", DataType::FixedSizeBinary(16), false),
            Field::new("span_id", DataType::FixedSizeBinary(8), false),
            Field::new("trace_state", DataType::Utf8, false),
            attribute_field(),
            Field::new("dropped_attributes_count", DataType::UInt32, false),
        ];

        let link_struct_builders = vec![
            Box::new(FixedSizeBinaryBuilder::new(16)) as Box<dyn ArrayBuilder>,
            Box::new(FixedSizeBinaryBuilder::new(8)) as Box<dyn ArrayBuilder>,
            Box::new(StringBuilder::new()) as Box<dyn ArrayBuilder>,
            Box::new(MapBuilder::new(
                Some(map_field_name.clone()),
                StringBuilder::new(),
                StringViewBuilder::new(),
            )) as Box<dyn ArrayBuilder>,
            Box::new(UInt32Builder::new()) as Box<dyn ArrayBuilder>,
        ];

        let link_struct_builder = StructBuilder::new(link_fields, link_struct_builders);
        let links = ListBuilder::new(link_struct_builder);

        let input = StringViewBuilder::new();
        let output = StringViewBuilder::new();
        let search_blob = StringViewBuilder::new();
        let partition_date = Date32Builder::new();

        Self {
            schema,
            trace_id,
            span_id,
            parent_span_id,
            flags,
            trace_state,
            scope_name,
            scope_version,
            service_name,
            span_name,
            span_kind,
            start_time,
            end_time,
            duration_ms,
            status_code,
            status_message,
            label,
            attributes,
            resource_attributes,
            events,
            links,
            input,
            output,
            search_blob,
            partition_date,
        }
    }
799
    /// Appends a single span record as one row across all column builders.
    ///
    /// Every builder must receive exactly one append per call so all columns
    /// stay the same length (enforced later by `RecordBatch::try_new`).
    pub fn append(&mut self, span: &TraceSpanRecord) -> Result<(), TraceEngineError> {
        // NOTE(review): assumes trace/span ids yield raw bytes of the declared
        // widths (16/8); FixedSizeBinaryBuilder errors on a length mismatch.
        let trace_bytes = span.trace_id.as_bytes();
        self.trace_id
            .append_value(trace_bytes)
            .map_err(TraceEngineError::ArrowError)?;

        let span_bytes = span.span_id.as_bytes();
        self.span_id
            .append_value(span_bytes)
            .map_err(TraceEngineError::ArrowError)?;

        // Root spans have no parent — stored as null.
        match &span.parent_span_id {
            Some(pid) => {
                self.parent_span_id
                    .append_value(pid.as_bytes())
                    .map_err(TraceEngineError::ArrowError)?;
            }
            None => self.parent_span_id.append_null(),
        }

        self.flags.append_value(span.flags);
        self.trace_state.append_value(&span.trace_state);

        self.scope_name.append_value(&span.scope_name);
        match &span.scope_version {
            Some(v) => self.scope_version.append_value(v),
            None => self.scope_version.append_null(),
        }

        self.service_name.append_value(&span.service_name);
        self.span_name.append_value(&span.span_name);
        // Empty strings are normalized to null for the dictionary column.
        if span.span_kind.is_empty() {
            self.span_kind.append_null();
        } else {
            self.span_kind.append_value(&span.span_kind);
        }

        self.start_time
            .append_value(span.start_time.timestamp_micros());
        self.end_time.append_value(span.end_time.timestamp_micros());
        self.duration_ms.append_value(span.duration_ms);

        self.status_code.append_value(span.status_code);
        // Empty status message is normalized to null.
        if span.status_message.is_empty() {
            self.status_message.append_null();
        } else {
            self.status_message.append_value(&span.status_message);
        }

        match &span.label {
            Some(l) => self.label.append_value(l),
            None => self.label.append_null(),
        }

        self.append_attributes(&span.attributes).inspect_err(|e| {
            error!(
                "Failed to append attributes for span {}: {}",
                span.span_id, e
            )
        })?;

        self.append_resource_attributes(&span.resource_attributes)
            .inspect_err(|e| {
                error!(
                    "Failed to append resource_attributes for span {}: {}",
                    span.span_id, e
                )
            })?;

        self.append_events(&span.events)
            .inspect_err(|e| error!("Failed to append events for span {}: {}", span.span_id, e))?;

        self.append_links(&span.links)
            .inspect_err(|e| error!("Failed to append links for span {}: {}", span.span_id, e))?;

        // Input/output payloads are stored as JSON text; serialization
        // failures degrade to the literal string "null".
        self.input.append_value(
            serde_json::to_string(&span.input).unwrap_or_else(|_| "null".to_string()),
        );

        self.output.append_value(
            serde_json::to_string(&span.output).unwrap_or_else(|_| "null".to_string()),
        );

        let search_text = Self::build_search_blob(span);
        self.search_blob.append_value(search_text);

        // Partition column: start date as days since the Unix epoch (Date32).
        let days = span.start_time.date_naive().num_days_from_ce() - UNIX_EPOCH_DAYS;
        self.partition_date.append_value(days);

        Ok(())
    }
907
908 fn append_attributes(&mut self, attributes: &[Attribute]) -> Result<(), TraceEngineError> {
909 for attr in attributes {
910 self.attributes.keys().append_value(&attr.key);
911 let value_str =
912 serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
913 self.attributes.values().append_value(value_str);
914 }
915 self.attributes.append(true)?;
916 Ok(())
917 }
918
919 fn append_resource_attributes(
920 &mut self,
921 attributes: &[Attribute],
922 ) -> Result<(), TraceEngineError> {
923 if attributes.is_empty() {
924 self.resource_attributes.append(false)?; } else {
926 for attr in attributes {
927 self.resource_attributes.keys().append_value(&attr.key);
928 let value_str =
929 serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
930 self.resource_attributes.values().append_value(value_str);
931 }
932 self.resource_attributes.append(true)?;
933 }
934 Ok(())
935 }
936
    /// Appends the span's events as one list entry containing one struct per
    /// event. The `field_builder` indices must match the field order used to
    /// construct the event `StructBuilder` in `new()`:
    /// 0 = name, 1 = timestamp, 2 = attributes map, 3 = dropped count.
    fn append_events(&mut self, events: &[SpanEvent]) -> Result<(), TraceEngineError> {
        let event_struct = self.events.values();
        for event in events {
            let name_builder = event_struct
                .field_builder::<StringBuilder>(0)
                .ok_or_else(|| TraceEngineError::DowncastError("event name builder"))?;
            name_builder.append_value(&event.name);

            let time_builder = event_struct
                .field_builder::<TimestampMicrosecondBuilder>(1)
                .ok_or_else(|| TraceEngineError::DowncastError("event timestamp builder"))?;
            time_builder.append_value(event.timestamp.timestamp_micros());

            let attr_builder = event_struct
                .field_builder::<MapBuilder<StringBuilder, StringViewBuilder>>(2)
                .ok_or_else(|| TraceEngineError::DowncastError("event attributes builder"))?;

            // Attribute values are stored JSON-encoded, like the span-level maps.
            for attr in &event.attributes {
                attr_builder.keys().append_value(&attr.key);
                let value_str =
                    serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
                attr_builder.values().append_value(value_str);
            }
            attr_builder.append(true)?;

            let dropped_builder =
                event_struct
                    .field_builder::<UInt32Builder>(3)
                    .ok_or_else(|| {
                        TraceEngineError::DowncastError("dropped attributes count builder")
                    })?;
            dropped_builder.append_value(event.dropped_attributes_count);

            // Close this event's struct row.
            event_struct.append(true);
        }

        // Close the list entry for this span (empty list when no events).
        self.events.append(true);
        Ok(())
    }
976
    /// Appends the span's links as one list entry containing one struct per
    /// link. `field_builder` indices match the link `StructBuilder` field
    /// order from `new()`: 0 = trace_id, 1 = span_id, 2 = trace_state,
    /// 3 = attributes map, 4 = dropped count.
    ///
    /// Unlike span-level ids, link ids arrive as hex strings and are decoded
    /// to fixed-width bytes here; invalid hex is a hard error.
    fn append_links(&mut self, links: &[SpanLink]) -> Result<(), TraceEngineError> {
        let link_struct = self.links.values();

        for link in links {
            let trace_builder = link_struct
                .field_builder::<FixedSizeBinaryBuilder>(0)
                .ok_or_else(|| TraceEngineError::DowncastError("link trace_id builder"))?;

            let trace_bytes = TraceId::hex_to_bytes(&link.trace_id).map_err(|e| {
                TraceEngineError::InvalidHexId(link.trace_id.clone(), e.to_string())
            })?;
            trace_builder.append_value(&trace_bytes)?;

            let span_builder = link_struct
                .field_builder::<FixedSizeBinaryBuilder>(1)
                .ok_or_else(|| TraceEngineError::DowncastError("link span_id builder"))?;

            let span_bytes = SpanId::hex_to_bytes(&link.span_id)
                .map_err(|e| TraceEngineError::InvalidHexId(link.span_id.clone(), e.to_string()))?;
            span_builder.append_value(&span_bytes)?;

            let state_builder = link_struct
                .field_builder::<StringBuilder>(2)
                .ok_or_else(|| TraceEngineError::DowncastError("link trace_state builder"))?;
            state_builder.append_value(&link.trace_state);

            let attr_builder = link_struct
                .field_builder::<MapBuilder<StringBuilder, StringViewBuilder>>(3)
                .ok_or_else(|| TraceEngineError::DowncastError("link attributes builder"))?;

            // Attribute values are stored JSON-encoded, like the span-level maps.
            for attr in &link.attributes {
                attr_builder.keys().append_value(&attr.key);
                let value_str =
                    serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
                attr_builder.values().append_value(value_str);
            }
            attr_builder.append(true)?;

            let dropped_builder =
                link_struct
                    .field_builder::<UInt32Builder>(4)
                    .ok_or_else(|| {
                        TraceEngineError::DowncastError("link dropped attributes count builder")
                    })?;
            dropped_builder.append_value(link.dropped_attributes_count);

            // Close this link's struct row.
            link_struct.append(true);
        }

        // Close the list entry for this span (empty list when no links).
        self.links.append(true);
        Ok(())
    }
1029
1030 fn build_search_blob(span: &TraceSpanRecord) -> String {
1037 let mut search = String::with_capacity(512);
1038
1039 search.push('|');
1041 search.push_str(&span.service_name);
1042 search.push_str("| |");
1043 search.push_str(&span.span_name);
1044 search.push_str("| |");
1045 search.push_str(&span.scope_name);
1046 search.push('|');
1047
1048 if !span.status_message.is_empty() {
1049 search.push_str(" |");
1050 search.push_str(&span.status_message);
1051 search.push('|');
1052 }
1053
1054 for attr in &span.attributes {
1056 search.push_str(" |");
1057 search.push_str(&attr.key);
1058 search.push('=');
1059 match &attr.value {
1060 Value::String(s) => search.push_str(s),
1061 Value::Number(n) => search.push_str(&n.to_string()),
1062 Value::Bool(b) => search.push_str(&b.to_string()),
1063 Value::Null => {}
1064 other => search.push_str(&other.to_string()),
1065 }
1066 search.push('|');
1067 }
1068
1069 for event in &span.events {
1070 search.push_str(" |");
1071 search.push_str(&event.name);
1072 search.push('|');
1073 }
1074
1075 search
1076 }
1077
    /// Finalizes all column builders into a `RecordBatch`.
    ///
    /// The array order below must match the field order of `self.schema`
    /// exactly; `RecordBatch::try_new` validates array types and lengths
    /// against the schema and errors on any mismatch.
    pub fn finish(mut self) -> Result<RecordBatch, TraceEngineError> {
        let batch = RecordBatch::try_new(
            self.schema.clone(),
            vec![
                Arc::new(self.trace_id.finish()),
                Arc::new(self.span_id.finish()),
                Arc::new(self.parent_span_id.finish()),
                Arc::new(self.flags.finish()),
                Arc::new(self.trace_state.finish()),
                Arc::new(self.scope_name.finish()),
                Arc::new(self.scope_version.finish()),
                Arc::new(self.service_name.finish()),
                Arc::new(self.span_name.finish()),
                Arc::new(self.span_kind.finish()),
                Arc::new(self.start_time.finish()),
                Arc::new(self.end_time.finish()),
                Arc::new(self.duration_ms.finish()),
                Arc::new(self.status_code.finish()),
                Arc::new(self.status_message.finish()),
                Arc::new(self.label.finish()),
                Arc::new(self.attributes.finish()),
                Arc::new(self.resource_attributes.finish()),
                Arc::new(self.events.finish()),
                Arc::new(self.links.finish()),
                Arc::new(self.input.finish()),
                Arc::new(self.output.finish()),
                Arc::new(self.search_blob.finish()),
                Arc::new(self.partition_date.finish()),
            ],
        )?;

        Ok(batch)
    }
1112}