1use crate::error::TraceEngineError;
2use crate::parquet::control::{get_pod_id, ControlTableEngine};
3use crate::parquet::tracing::traits::arrow_schema_to_delta;
4use crate::parquet::tracing::traits::attribute_field;
5use crate::parquet::tracing::traits::TraceSchemaExt;
6use crate::parquet::utils::{create_attr_match_udf, register_cloud_logstore_factories};
7use crate::storage::ObjectStore;
8use arrow::array::*;
9use arrow::datatypes::*;
10use arrow_array::RecordBatch;
11use chrono::{Datelike, Utc};
12use datafusion::prelude::SessionContext;
13use deltalake::datafusion::parquet::basic::{Compression, Encoding, ZstdLevel};
14use deltalake::datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties};
15use deltalake::datafusion::parquet::schema::types::ColumnPath;
16use deltalake::operations::optimize::OptimizeType;
17use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty};
18use scouter_settings::ObjectStorageSettings;
19use scouter_types::SpanId;
20use scouter_types::TraceId;
21use scouter_types::TraceSpanRecord;
22use scouter_types::{Attribute, SpanEvent, SpanLink};
23use serde_json::Value;
24use std::sync::Arc;
25use tokio::sync::oneshot;
26use tokio::sync::{mpsc, RwLock as AsyncRwLock};
27use tokio::time::{interval, Duration};
28use tracing::{debug, error, info, instrument};
29use url::Url;
30
// Name under which the span table is created and registered with DataFusion.
const TRACE_SPAN_TABLE_NAME: &str = "trace_spans";

// Control-table task ids used to coordinate periodic maintenance across pods.
const TASK_OPTIMIZE: &str = "trace_optimize";
const TASK_RETENTION: &str = "trace_retention";

// Days from CE day 1 to 1970-01-01; subtracted from `num_days_from_ce()` to
// produce Arrow Date32 values (days since the Unix epoch). See `append`.
const UNIX_EPOCH_DAYS: i32 = 719_163;
/// Messages handled by the table-engine actor (see `start_actor`).
///
/// Each mutating command carries a `oneshot` sender so the caller can await
/// the outcome of the operation.
pub enum TableCommand {
    /// Append a batch of spans to the Delta table.
    Write {
        spans: Vec<TraceSpanRecord>,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Compact / Z-order the table files (see `optimize_table`).
    Optimize {
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Remove unreferenced files older than `retention_hours` (see `vacuum_table`).
    Vacuum {
        retention_hours: u64,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Delete rows whose `partition_date` is before `cutoff_date` (see `expire_table`).
    Expire {
        cutoff_date: chrono::NaiveDate,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Stop the actor loop.
    Shutdown,
}
59
60async fn build_url(object_store: &ObjectStore) -> Result<Url, TraceEngineError> {
61 let mut base = object_store.get_base_url()?;
62 let mut path = base.path().to_string();
63 if !path.ends_with('/') {
64 path.push('/');
65 }
66 path.push_str(TRACE_SPAN_TABLE_NAME);
67 base.set_path(&path);
68 Ok(base)
69}
70
/// Creates the trace-span Delta table at `table_url`.
///
/// The table is partitioned by `partition_date`, checkpoints every 5
/// commits, and collects data-skipping statistics on the columns most used
/// in query predicates.
#[instrument(skip_all)]
async fn create_table(
    object_store: &ObjectStore,
    table_url: Url,
    schema: SchemaRef,
) -> Result<DeltaTable, TraceEngineError> {
    // Only the scheme and final path segment are logged.
    info!(
        "Creating trace span table [{}://.../{} ]",
        table_url.scheme(),
        table_url
            .path_segments()
            .and_then(|mut s| s.next_back())
            .unwrap_or(TRACE_SPAN_TABLE_NAME)
    );

    let store = object_store.as_dyn_object_store();
    let table = DeltaTableBuilder::from_url(table_url.clone())?
        .with_storage_backend(store, table_url)
        .build()?;

    // Convert the Arrow schema into Delta column definitions.
    let delta_fields = arrow_schema_to_delta(&schema);

    table
        .create()
        .with_table_name(TRACE_SPAN_TABLE_NAME)
        .with_columns(delta_fields)
        .with_partition_columns(vec!["partition_date".to_string()])
        .with_configuration_property(TableProperty::CheckpointInterval, Some("5"))
        .with_configuration_property(
            TableProperty::DataSkippingStatsColumns,
            Some("start_time,end_time,service_name,duration_ms,status_code,partition_date"),
        )
        .await
        .map_err(Into::into)
}
107
/// Loads the trace-span table if it already exists, otherwise creates it.
///
/// Existence is detected per scheme: for `file://` URLs we check for a
/// `_delta_log` directory on disk (creating the table directory first if
/// missing); for remote stores we attempt a load and treat failure as
/// "does not exist".
#[instrument(skip_all)]
async fn build_or_create_table(
    object_store: &ObjectStore,
    schema: SchemaRef,
) -> Result<DeltaTable, TraceEngineError> {
    // Ensure cloud logstore factories are registered before building any table.
    register_cloud_logstore_factories();
    let table_url = build_url(object_store).await?;
    info!(
        "Attempting to load trace span table [{}://.../{} ]",
        table_url.scheme(),
        table_url
            .path_segments()
            .and_then(|mut s| s.next_back())
            .unwrap_or(TRACE_SPAN_TABLE_NAME)
    );

    let is_delta_table = if table_url.scheme() == "file" {
        if let Ok(path) = table_url.to_file_path() {
            if !path.exists() {
                info!("Creating directory for local table: {:?}", path);
                std::fs::create_dir_all(&path)?;
            }
            // A Delta table is identified by the presence of its log directory.
            path.join("_delta_log").exists()
        } else {
            false
        }
    } else {
        // NOTE(review): remote tables are loaded here only to probe existence
        // and then loaded a second time below — consider reusing this load.
        let store = object_store.as_dyn_object_store();
        match DeltaTableBuilder::from_url(table_url.clone()) {
            Ok(builder) => builder
                .with_storage_backend(store, table_url.clone())
                .load()
                .await
                .is_ok(),
            Err(_) => false,
        }
    };

    if is_delta_table {
        info!(
            "Loaded existing trace span table [{}://.../{} ]",
            table_url.scheme(),
            table_url
                .path_segments()
                .and_then(|mut s| s.next_back())
                .unwrap_or(TRACE_SPAN_TABLE_NAME)
        );
        let store = object_store.as_dyn_object_store();
        let table = DeltaTableBuilder::from_url(table_url.clone())?
            .with_storage_backend(store, table_url)
            .load()
            .await?;
        Ok(table)
    } else {
        info!("Table does not exist, creating new table");
        create_table(object_store, table_url, schema).await
    }
}
169
/// Delta Lake-backed storage engine for trace spans, plus the DataFusion
/// session used to query them. All mutations go through the actor spawned by
/// `start_actor`.
pub struct TraceSpanDBEngine {
    /// Arrow schema for the span table (built by `Self::create_schema()`).
    schema: Arc<Schema>,
    pub object_store: ObjectStore,
    /// Current table handle; mutators take the write lock, run the Delta
    /// operation on a clone, and swap the updated table back in.
    table: Arc<AsyncRwLock<DeltaTable>>,
    /// Shared query session; the span table registration is refreshed after
    /// every table mutation.
    pub ctx: Arc<SessionContext>,
    /// Cross-pod claim/release coordination for periodic maintenance tasks.
    control: ControlTableEngine,
}
182
// Adopts the shared trace-schema helpers; presumably the source of
// `Self::create_schema()` used in `new` — defined in the traits module.
impl TraceSchemaExt for TraceSpanDBEngine {}
184
185impl TraceSpanDBEngine {
    /// Builds the engine: creates or loads the Delta table, wires up the
    /// DataFusion session (UDF + table registration), and initializes the
    /// control table used for cross-pod task coordination.
    pub async fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, TraceEngineError> {
        let object_store = ObjectStore::new(storage_settings)?;
        // `create_schema` comes via the `TraceSchemaExt` impl above.
        let schema = Arc::new(Self::create_schema());
        let delta_table = build_or_create_table(&object_store, schema.clone()).await?;
        let ctx = object_store.get_session()?;

        // Custom UDF used by attribute-filter queries.
        ctx.register_udf(create_attr_match_udf());

        // A freshly created (empty) table may not yield a provider yet; in
        // that case registration happens on the first write, which
        // re-registers the table after every commit (see `write_spans`).
        if let Ok(provider) = delta_table.table_provider().await {
            ctx.register_table(TRACE_SPAN_TABLE_NAME, provider)?;
        } else {
            info!("Empty table at init — deferring SessionContext registration until first write");
        }
        let control = ControlTableEngine::new(&object_store, get_pod_id()).await?;

        Ok(TraceSpanDBEngine {
            schema,
            object_store,
            table: Arc::new(AsyncRwLock::new(delta_table)),
            ctx: Arc::new(ctx),
            control,
        })
    }
213
214 pub fn build_batch(
216 &self,
217 spans: Vec<TraceSpanRecord>,
218 ) -> Result<RecordBatch, TraceEngineError> {
219 let start_time = std::time::Instant::now();
220 let mut builder = TraceSpanBatchBuilder::new(self.schema.clone());
221
222 for span in spans {
223 builder.append(&span)?;
224 }
225
226 let record_batch = builder
227 .finish()
228 .inspect_err(|e| error!("Failed to build RecordBatch: {}", e))?;
229
230 let duration = start_time.elapsed();
231 debug!(
232 "Built RecordBatch with {} rows in {:?}",
233 record_batch.num_rows(),
234 duration
235 );
236 Ok(record_batch)
237 }
238
239 fn build_writer_props() -> WriterProperties {
241 WriterProperties::builder()
242 .set_max_row_group_size(32_768)
245 .set_column_bloom_filter_enabled(ColumnPath::new(vec!["trace_id".to_string()]), true)
247 .set_column_bloom_filter_fpp(ColumnPath::new(vec!["trace_id".to_string()]), 0.01)
248 .set_column_bloom_filter_ndv(ColumnPath::new(vec!["trace_id".to_string()]), 32_768)
249 .set_column_bloom_filter_enabled(
251 ColumnPath::new(vec!["service_name".to_string()]),
252 true,
253 )
254 .set_column_bloom_filter_fpp(ColumnPath::new(vec!["service_name".to_string()]), 0.01)
255 .set_column_bloom_filter_ndv(ColumnPath::new(vec!["service_name".to_string()]), 256)
256 .set_column_bloom_filter_enabled(ColumnPath::new(vec!["span_name".to_string()]), true)
258 .set_column_bloom_filter_fpp(ColumnPath::new(vec!["span_name".to_string()]), 0.01)
259 .set_column_bloom_filter_ndv(ColumnPath::new(vec!["span_name".to_string()]), 32_768)
260 .set_column_statistics_enabled(
262 ColumnPath::new(vec!["start_time".to_string()]),
263 EnabledStatistics::Page,
264 )
265 .set_column_statistics_enabled(
268 ColumnPath::new(vec!["status_code".to_string()]),
269 EnabledStatistics::Page,
270 )
271 .set_column_encoding(
274 ColumnPath::new(vec!["start_time".to_string()]),
275 Encoding::DELTA_BINARY_PACKED,
276 )
277 .set_column_encoding(
278 ColumnPath::new(vec!["duration_ms".to_string()]),
279 Encoding::DELTA_BINARY_PACKED,
280 )
281 .set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
284 .set_column_dictionary_enabled(ColumnPath::new(vec!["span_name".to_string()]), true)
286 .build()
287 }
288
    /// Appends `spans` to the Delta table and refreshes the DataFusion
    /// registration so queries see the new version.
    ///
    /// Holds the table write lock across the whole commit to serialize
    /// writers within this process.
    async fn write_spans(&self, spans: Vec<TraceSpanRecord>) -> Result<(), TraceEngineError> {
        info!("Engine received write request for {} spans", spans.len());

        let batch = self
            .build_batch(spans)
            .inspect_err(|e| error!("failed to build batch: {:?}", e))?;
        info!("Built batch with {} rows", batch.num_rows());

        let mut table_guard = self.table.write().await;
        info!("Acquired table write lock");

        // The write builder consumes the table, so operate on a clone and
        // swap the updated table back in under the lock afterwards.
        let current_table = table_guard.clone();

        let updated_table = current_table
            .write(vec![batch])
            .with_save_mode(deltalake::protocol::SaveMode::Append)
            .with_writer_properties(Self::build_writer_props())
            .with_partition_columns(vec!["partition_date".to_string()])
            .await?;

        info!("Successfully wrote batch to Delta Lake");

        // Re-register so subsequent queries read the new table version.
        self.ctx.deregister_table(TRACE_SPAN_TABLE_NAME)?;
        self.ctx
            .register_table(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?)?;
        updated_table.update_datafusion_session(&self.ctx.state())?;

        *table_guard = updated_table;

        Ok(())
    }
335
    /// Compacts the table into ~128 MiB files, Z-ordered on
    /// (start_time, service_name), then re-registers the result with the
    /// DataFusion session.
    async fn optimize_table(&self) -> Result<(), TraceEngineError> {
        let mut table_guard = self.table.write().await;

        // Optimize consumes the table; clone and swap the result back in.
        let current_table = table_guard.clone();

        let (updated_table, _metrics) = current_table
            .optimize()
            .with_target_size(std::num::NonZero::new(128 * 1024 * 1024).unwrap())
            .with_type(OptimizeType::ZOrder(vec![
                "start_time".to_string(),
                "service_name".to_string(),
            ]))
            .with_writer_properties(Self::build_writer_props())
            .await?;

        self.ctx.deregister_table(TRACE_SPAN_TABLE_NAME)?;
        self.ctx
            .register_table(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?)?;
        updated_table.update_datafusion_session(&self.ctx.state())?;

        *table_guard = updated_table;

        Ok(())
    }
363
    /// Deletes unreferenced data files older than `retention_hours`.
    ///
    /// `with_enforce_retention_duration(false)` permits retention periods
    /// below the table's configured minimum — needed for the
    /// `vacuum_table(0)` calls issued right after optimize/expire.
    async fn vacuum_table(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
        let mut table_guard = self.table.write().await;

        let (updated_table, _metrics) = table_guard
            .clone()
            .vacuum()
            .with_retention_period(chrono::Duration::hours(retention_hours as i64))
            .with_enforce_retention_duration(false)
            .await?;

        // Re-register so subsequent queries read the new table version.
        self.ctx.deregister_table(TRACE_SPAN_TABLE_NAME)?;
        self.ctx
            .register_table(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?)?;
        updated_table.update_datafusion_session(&self.ctx.state())?;

        *table_guard = updated_table;

        Ok(())
    }
383
    /// Deletes all rows whose `partition_date` is strictly before
    /// `cutoff_date`, then re-registers the table with the session.
    ///
    /// The predicate targets the partition column, formatted as an ISO date
    /// so it parses as a SQL DATE literal.
    async fn expire_table(&self, cutoff_date: chrono::NaiveDate) -> Result<(), TraceEngineError> {
        let mut table_guard = self.table.write().await;

        let predicate = format!(
            "partition_date < CAST('{}' AS DATE)",
            cutoff_date.format("%Y-%m-%d")
        );

        let (updated_table, metrics) = table_guard
            .clone()
            .delete()
            .with_predicate(predicate)
            .await?;

        info!(
            "Expired {} rows older than {}",
            metrics.num_deleted_rows, cutoff_date
        );

        self.ctx.deregister_table(TRACE_SPAN_TABLE_NAME)?;
        self.ctx
            .register_table(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?)?;
        updated_table.update_datafusion_session(&self.ctx.state())?;

        *table_guard = updated_table;

        Ok(())
    }
418
419 async fn try_run_optimize(&self, interval_hours: u64) {
423 match self.control.try_claim_task(TASK_OPTIMIZE).await {
424 Ok(true) => match self.optimize_table().await {
425 Ok(()) => {
426 if let Err(e) = self.vacuum_table(0).await {
430 error!("Post-optimize vacuum failed: {}", e);
431 }
432 let _ = self
433 .control
434 .release_task(
435 TASK_OPTIMIZE,
436 chrono::Duration::hours(interval_hours as i64),
437 )
438 .await;
439 }
440 Err(e) => {
441 error!("Optimize failed: {}", e);
442 let _ = self.control.release_task_on_failure(TASK_OPTIMIZE).await;
443 }
444 },
445 Ok(false) => { }
446 Err(e) => error!("Optimize claim check failed: {}", e),
447 }
448 }
449
450 async fn try_run_retention(&self, retention_days: u32) {
452 match self.control.try_claim_task(TASK_RETENTION).await {
453 Ok(true) => {
454 let cutoff =
455 (Utc::now() - chrono::Duration::days(retention_days as i64)).date_naive();
456 match self.expire_table(cutoff).await {
457 Ok(()) => {
458 let _ = self.vacuum_table(0).await;
460 let _ = self
461 .control
462 .release_task(TASK_RETENTION, chrono::Duration::hours(24))
463 .await;
464 }
465 Err(e) => {
466 error!("Retention failed: {}", e);
467 let _ = self.control.release_task_on_failure(TASK_RETENTION).await;
468 }
469 }
470 }
471 Ok(false) => {}
472 Err(e) => error!("Retention claim check failed: {}", e),
473 }
474 }
475
    /// Picks up commits made by other writers: incrementally updates a clone
    /// of the table and, only if the version advanced, swaps it in and
    /// re-registers the DataFusion table.
    async fn refresh_table(&self) -> Result<(), TraceEngineError> {
        let mut table_guard = self.table.write().await;
        let current_version = table_guard.version();

        // Update a clone so a failed refresh leaves the current table intact.
        let mut refreshed = table_guard.clone();
        match refreshed.update_incremental(None).await {
            Ok(_) => {
                if refreshed.version() > current_version {
                    debug!(
                        "Table refreshed: v{:?} → v{:?}",
                        current_version,
                        refreshed.version()
                    );
                    self.ctx.deregister_table(TRACE_SPAN_TABLE_NAME)?;
                    self.ctx
                        .register_table(TRACE_SPAN_TABLE_NAME, refreshed.table_provider().await?)?;
                    refreshed.update_datafusion_session(&self.ctx.state())?;
                    *table_guard = refreshed;
                }
            }
            Err(e) => {
                // Refresh is best-effort; failures are logged at debug and ignored.
                debug!("Table refresh skipped: {}", e);
            }
        }
        Ok(())
    }
511
    /// Consumes the engine and spawns its actor task.
    ///
    /// Returns the command channel plus the task's join handle. The actor
    /// serializes all table mutations through one loop and also drives:
    /// - a 5-minute scheduler tick attempting claim-based maintenance
    ///   (optimize every `compaction_interval_hours`; retention when
    ///   `retention_days` is set), and
    /// - a refresh tick every `refresh_interval_secs` to pick up commits
    ///   made by other writers.
    #[instrument(skip_all, name = "trace_engine_actor")]
    pub fn start_actor(
        self,
        compaction_interval_hours: u64,
        retention_days: Option<u32>,
        refresh_interval_secs: u64,
    ) -> (mpsc::Sender<TableCommand>, tokio::task::JoinHandle<()>) {
        let (tx, mut rx) = mpsc::channel::<TableCommand>(100);

        let handle = tokio::spawn(async move {
            let mut scheduler_ticker = interval(Duration::from_secs(5 * 60));
            // A tokio interval's first tick completes immediately; consume it
            // (for both tickers) so maintenance/refresh don't fire at startup.
            scheduler_ticker.tick().await; let mut refresh_ticker = interval(Duration::from_secs(refresh_interval_secs));
            refresh_ticker.tick().await;

            loop {
                // NOTE(review): if every sender drops, `rx.recv()` yields
                // `None` and that select branch is disabled — the loop then
                // runs on tickers alone with no way to receive `Shutdown`;
                // confirm this is intended.
                tokio::select! {
                    Some(cmd) = rx.recv() => {
                        match cmd {
                            TableCommand::Write { spans, respond_to } => {
                                match self.write_spans(spans).await {
                                    Ok(_) => { let _ = respond_to.send(Ok(())); }
                                    Err(e) => {
                                        tracing::error!("Write failed: {}", e);
                                        let _ = respond_to.send(Err(e));
                                    }
                                }
                            }
                            TableCommand::Optimize { respond_to } => {
                                let _ = respond_to.send(self.optimize_table().await);
                                // Vacuum runs after responding; errors are only logged.
                                if let Err(e) = self.vacuum_table(0).await {
                                    error!("Post-optimize vacuum failed: {}", e);
                                }
                            }
                            TableCommand::Vacuum { retention_hours, respond_to } => {
                                let _ = respond_to.send(self.vacuum_table(retention_hours).await);
                            }
                            TableCommand::Expire { cutoff_date, respond_to } => {
                                let _ = respond_to.send(self.expire_table(cutoff_date).await);
                            }
                            TableCommand::Shutdown => {
                                tracing::info!("Shutting down table engine");
                                break;
                            }
                        }
                    }
                    _ = scheduler_ticker.tick() => {
                        self.try_run_optimize(compaction_interval_hours).await;
                        if let Some(days) = retention_days {
                            self.try_run_retention(days).await;
                        }
                    }
                    _ = refresh_ticker.tick() => {
                        if let Err(e) = self.refresh_table().await {
                            error!("Table refresh failed: {}", e);
                        }
                    }
                }
            }
        });

        (tx, handle)
    }
583}
584
/// Incremental builder that converts `TraceSpanRecord`s into a single Arrow
/// `RecordBatch` matching the trace-span schema (one `append` call per span,
/// then `finish`).
pub struct TraceSpanBatchBuilder {
    /// Target schema; column order in `finish` must match it exactly.
    schema: SchemaRef,

    // Span identity: 16-byte trace id, 8-byte span ids (NULL parent = root span).
    trace_id: FixedSizeBinaryBuilder,
    span_id: FixedSizeBinaryBuilder,
    parent_span_id: FixedSizeBinaryBuilder,

    flags: Int32Builder,
    trace_state: StringBuilder,

    // Instrumentation scope.
    scope_name: StringBuilder,
    scope_version: StringBuilder,

    // Dictionary-encoded low-cardinality columns.
    service_name: StringDictionaryBuilder<Int32Type>,
    span_name: StringBuilder,
    span_kind: StringDictionaryBuilder<Int8Type>,

    // Timing (UTC microseconds).
    start_time: TimestampMicrosecondBuilder,
    end_time: TimestampMicrosecondBuilder,
    duration_ms: Int64Builder,

    status_code: Int32Builder,
    status_message: StringBuilder,

    label: StringBuilder,

    // Key→JSON-string maps.
    attributes: MapBuilder<StringBuilder, StringViewBuilder>,
    resource_attributes: MapBuilder<StringBuilder, StringViewBuilder>,

    // Nested list<struct> columns for span events and links.
    events: ListBuilder<StructBuilder>,
    links: ListBuilder<StructBuilder>,

    // Input/output payloads serialized as JSON strings.
    input: StringViewBuilder,
    output: StringViewBuilder,

    // Pipe-delimited searchable text; see `build_search_blob`.
    search_blob: StringViewBuilder,

    // Partition column: days since the Unix epoch (Date32).
    partition_date: Date32Builder,
}
640
641impl TraceSpanBatchBuilder {
    /// Creates column builders for every field of the trace-span schema.
    ///
    /// The builder layout (types, nesting, map field names) must mirror
    /// `schema` exactly, since `finish` hands the resulting arrays straight
    /// to `RecordBatch::try_new` with that schema.
    pub fn new(schema: SchemaRef) -> Self {
        // Fixed-size ids: 16-byte trace ids, 8-byte span ids.
        let trace_id = FixedSizeBinaryBuilder::new(16);
        let span_id = FixedSizeBinaryBuilder::new(8);
        let parent_span_id = FixedSizeBinaryBuilder::new(8);

        let flags = Int32Builder::new();
        let trace_state = StringBuilder::new();

        let scope_name = StringBuilder::new();
        let scope_version = StringBuilder::new();

        // Dictionary encoding for low-cardinality columns.
        let service_name = StringDictionaryBuilder::<Int32Type>::new();
        let span_name = StringBuilder::new();
        let span_kind = StringDictionaryBuilder::<Int8Type>::new();

        let start_time = TimestampMicrosecondBuilder::new().with_timezone("UTC");
        let end_time = TimestampMicrosecondBuilder::new().with_timezone("UTC");
        let duration_ms = Int64Builder::new();

        let status_code = Int32Builder::new();
        let status_message = StringBuilder::new();

        let label = StringBuilder::new();

        // Custom map field names ("key_value"/"key"/"value" instead of the
        // Arrow defaults) — NOTE(review): presumably required to match the
        // Delta schema from `arrow_schema_to_delta`/`attribute_field`; confirm.
        let map_field_name = MapFieldNames {
            entry: "key_value".to_string(),
            key: "key".to_string(),
            value: "value".to_string(),
        };
        let attributes = MapBuilder::new(
            Some(map_field_name.clone()),
            StringBuilder::new(),
            StringViewBuilder::new(),
        );
        let resource_attributes = MapBuilder::new(
            Some(map_field_name.clone()),
            StringBuilder::new(),
            StringViewBuilder::new(),
        );

        // events: list<struct<name, timestamp, attributes, dropped_attributes_count>>.
        // Field order defines the positional indices used in `append_events`.
        let event_fields = vec![
            Field::new("name", DataType::Utf8, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                false,
            ),
            attribute_field(),
            Field::new("dropped_attributes_count", DataType::UInt32, false),
        ];

        // Builders must line up one-to-one with `event_fields`.
        let event_struct_builders = vec![
            Box::new(StringBuilder::new()) as Box<dyn ArrayBuilder>,
            Box::new(TimestampMicrosecondBuilder::new().with_timezone("UTC"))
                as Box<dyn ArrayBuilder>,
            Box::new(MapBuilder::new(
                Some(map_field_name.clone()),
                StringBuilder::new(),
                StringViewBuilder::new(),
            )) as Box<dyn ArrayBuilder>,
            Box::new(UInt32Builder::new()) as Box<dyn ArrayBuilder>,
        ];

        let event_struct_builder = StructBuilder::new(event_fields, event_struct_builders);
        let events = ListBuilder::new(event_struct_builder);

        // links: list<struct<trace_id, span_id, trace_state, attributes,
        // dropped_attributes_count>> — order defines indices in `append_links`.
        let link_fields = vec![
            Field::new("trace_id", DataType::FixedSizeBinary(16), false),
            Field::new("span_id", DataType::FixedSizeBinary(8), false),
            Field::new("trace_state", DataType::Utf8, false),
            attribute_field(),
            Field::new("dropped_attributes_count", DataType::UInt32, false),
        ];

        let link_struct_builders = vec![
            Box::new(FixedSizeBinaryBuilder::new(16)) as Box<dyn ArrayBuilder>,
            Box::new(FixedSizeBinaryBuilder::new(8)) as Box<dyn ArrayBuilder>,
            Box::new(StringBuilder::new()) as Box<dyn ArrayBuilder>,
            Box::new(MapBuilder::new(
                Some(map_field_name.clone()),
                StringBuilder::new(),
                StringViewBuilder::new(),
            )) as Box<dyn ArrayBuilder>,
            Box::new(UInt32Builder::new()) as Box<dyn ArrayBuilder>,
        ];

        let link_struct_builder = StructBuilder::new(link_fields, link_struct_builders);
        let links = ListBuilder::new(link_struct_builder);

        let input = StringViewBuilder::new();
        let output = StringViewBuilder::new();
        let search_blob = StringViewBuilder::new();
        let partition_date = Date32Builder::new();

        Self {
            schema,
            trace_id,
            span_id,
            parent_span_id,
            flags,
            trace_state,
            scope_name,
            scope_version,
            service_name,
            span_name,
            span_kind,
            start_time,
            end_time,
            duration_ms,
            status_code,
            status_message,
            label,
            attributes,
            resource_attributes,
            events,
            links,
            input,
            output,
            search_blob,
            partition_date,
        }
    }
764
    /// Appends one span as a row across all column builders.
    ///
    /// Every builder must be appended exactly once per span so the columns
    /// stay equal in length; otherwise `finish` fails.
    pub fn append(&mut self, span: &TraceSpanRecord) -> Result<(), TraceEngineError> {
        // Ids are written as raw bytes (16-byte trace id, 8-byte span id).
        let trace_bytes = span.trace_id.as_bytes();
        self.trace_id
            .append_value(trace_bytes)
            .map_err(TraceEngineError::ArrowError)?;

        let span_bytes = span.span_id.as_bytes();
        self.span_id
            .append_value(span_bytes)
            .map_err(TraceEngineError::ArrowError)?;

        // Root spans have no parent; store NULL rather than a sentinel id.
        match &span.parent_span_id {
            Some(pid) => {
                self.parent_span_id
                    .append_value(pid.as_bytes())
                    .map_err(TraceEngineError::ArrowError)?;
            }
            None => self.parent_span_id.append_null(),
        }

        self.flags.append_value(span.flags);
        self.trace_state.append_value(&span.trace_state);

        self.scope_name.append_value(&span.scope_name);
        match &span.scope_version {
            Some(v) => self.scope_version.append_value(v),
            None => self.scope_version.append_null(),
        }

        self.service_name.append_value(&span.service_name);
        self.span_name.append_value(&span.span_name);
        // Empty kind/status strings are normalized to NULL.
        if span.span_kind.is_empty() {
            self.span_kind.append_null();
        } else {
            self.span_kind.append_value(&span.span_kind);
        }

        self.start_time
            .append_value(span.start_time.timestamp_micros());
        self.end_time.append_value(span.end_time.timestamp_micros());
        self.duration_ms.append_value(span.duration_ms);

        self.status_code.append_value(span.status_code);
        if span.status_message.is_empty() {
            self.status_message.append_null();
        } else {
            self.status_message.append_value(&span.status_message);
        }

        match &span.label {
            Some(l) => self.label.append_value(l),
            None => self.label.append_null(),
        }

        // NOTE(review): `append_attributes` writes an empty-but-non-null map
        // for spans without attributes, while `append_resource_attributes`
        // writes NULL in that case — confirm the asymmetry is intentional.
        self.append_attributes(&span.attributes).inspect_err(|e| {
            error!(
                "Failed to append attributes for span {}: {}",
                span.span_id, e
            )
        })?;

        self.append_resource_attributes(&span.resource_attributes)
            .inspect_err(|e| {
                error!(
                    "Failed to append resource_attributes for span {}: {}",
                    span.span_id, e
                )
            })?;

        self.append_events(&span.events)
            .inspect_err(|e| error!("Failed to append events for span {}: {}", span.span_id, e))?;

        self.append_links(&span.links)
            .inspect_err(|e| error!("Failed to append links for span {}: {}", span.span_id, e))?;

        // Payloads are stored as JSON strings ("null" on serialization
        // failure rather than failing the whole batch).
        self.input.append_value(
            serde_json::to_string(&span.input).unwrap_or_else(|_| "null".to_string()),
        );

        self.output.append_value(
            serde_json::to_string(&span.output).unwrap_or_else(|_| "null".to_string()),
        );

        let search_text = Self::build_search_blob(span);
        self.search_blob.append_value(search_text);

        // Date32 is days since the Unix epoch; convert from days-since-CE.
        let days = span.start_time.date_naive().num_days_from_ce() - UNIX_EPOCH_DAYS;
        self.partition_date.append_value(days);

        Ok(())
    }
872
873 fn append_attributes(&mut self, attributes: &[Attribute]) -> Result<(), TraceEngineError> {
874 for attr in attributes {
875 self.attributes.keys().append_value(&attr.key);
876 let value_str =
877 serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
878 self.attributes.values().append_value(value_str);
879 }
880 self.attributes.append(true)?;
881 Ok(())
882 }
883
884 fn append_resource_attributes(
885 &mut self,
886 attributes: &[Attribute],
887 ) -> Result<(), TraceEngineError> {
888 if attributes.is_empty() {
889 self.resource_attributes.append(false)?; } else {
891 for attr in attributes {
892 self.resource_attributes.keys().append_value(&attr.key);
893 let value_str =
894 serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
895 self.resource_attributes.values().append_value(value_str);
896 }
897 self.resource_attributes.append(true)?;
898 }
899 Ok(())
900 }
901
    /// Appends the span's events as one list entry of event structs.
    ///
    /// Struct fields are addressed by positional index, which must match the
    /// field order used to build the `StructBuilder` in `new`
    /// (0: name, 1: timestamp, 2: attributes, 3: dropped_attributes_count).
    fn append_events(&mut self, events: &[SpanEvent]) -> Result<(), TraceEngineError> {
        let event_struct = self.events.values();
        for event in events {
            let name_builder = event_struct
                .field_builder::<StringBuilder>(0)
                .ok_or_else(|| TraceEngineError::DowncastError("event name builder"))?;
            name_builder.append_value(&event.name);

            let time_builder = event_struct
                .field_builder::<TimestampMicrosecondBuilder>(1)
                .ok_or_else(|| TraceEngineError::DowncastError("event timestamp builder"))?;
            time_builder.append_value(event.timestamp.timestamp_micros());

            let attr_builder = event_struct
                .field_builder::<MapBuilder<StringBuilder, StringViewBuilder>>(2)
                .ok_or_else(|| TraceEngineError::DowncastError("event attributes builder"))?;

            // Attribute values are serialized to JSON strings ("null" on failure).
            for attr in &event.attributes {
                attr_builder.keys().append_value(&attr.key);
                let value_str =
                    serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
                attr_builder.values().append_value(value_str);
            }
            attr_builder.append(true)?;

            let dropped_builder =
                event_struct
                    .field_builder::<UInt32Builder>(3)
                    .ok_or_else(|| {
                        TraceEngineError::DowncastError("dropped attributes count builder")
                    })?;
            dropped_builder.append_value(event.dropped_attributes_count);

            // Mark one complete struct (one event) in the child builder.
            event_struct.append(true);
        }

        // Close the list entry for this span (empty list when no events).
        self.events.append(true);
        Ok(())
    }
941
    /// Appends the span's links as one list entry of link structs.
    ///
    /// Link trace/span ids arrive hex-encoded and are decoded to raw
    /// 16/8-byte form; a malformed id aborts the append with `InvalidHexId`.
    /// Field indices must match the order in `new`
    /// (0: trace_id, 1: span_id, 2: trace_state, 3: attributes, 4: dropped count).
    fn append_links(&mut self, links: &[SpanLink]) -> Result<(), TraceEngineError> {
        let link_struct = self.links.values();

        for link in links {
            let trace_builder = link_struct
                .field_builder::<FixedSizeBinaryBuilder>(0)
                .ok_or_else(|| TraceEngineError::DowncastError("link trace_id builder"))?;

            let trace_bytes = TraceId::hex_to_bytes(&link.trace_id).map_err(|e| {
                TraceEngineError::InvalidHexId(link.trace_id.clone(), e.to_string())
            })?;
            trace_builder.append_value(&trace_bytes)?;

            let span_builder = link_struct
                .field_builder::<FixedSizeBinaryBuilder>(1)
                .ok_or_else(|| TraceEngineError::DowncastError("link span_id builder"))?;

            let span_bytes = SpanId::hex_to_bytes(&link.span_id)
                .map_err(|e| TraceEngineError::InvalidHexId(link.span_id.clone(), e.to_string()))?;
            span_builder.append_value(&span_bytes)?;

            let state_builder = link_struct
                .field_builder::<StringBuilder>(2)
                .ok_or_else(|| TraceEngineError::DowncastError("link trace_state builder"))?;
            state_builder.append_value(&link.trace_state);

            let attr_builder = link_struct
                .field_builder::<MapBuilder<StringBuilder, StringViewBuilder>>(3)
                .ok_or_else(|| TraceEngineError::DowncastError("link attributes builder"))?;

            // Attribute values are serialized to JSON strings ("null" on failure).
            for attr in &link.attributes {
                attr_builder.keys().append_value(&attr.key);
                let value_str =
                    serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
                attr_builder.values().append_value(value_str);
            }
            attr_builder.append(true)?;

            let dropped_builder =
                link_struct
                    .field_builder::<UInt32Builder>(4)
                    .ok_or_else(|| {
                        TraceEngineError::DowncastError("link dropped attributes count builder")
                    })?;
            dropped_builder.append_value(link.dropped_attributes_count);

            // Mark one complete struct (one link) in the child builder.
            link_struct.append(true);
        }

        // Close the list entry for this span (empty list when no links).
        self.links.append(true);
        Ok(())
    }
994
995 fn build_search_blob(span: &TraceSpanRecord) -> String {
1002 let mut search = String::with_capacity(512);
1003
1004 search.push('|');
1006 search.push_str(&span.service_name);
1007 search.push_str("| |");
1008 search.push_str(&span.span_name);
1009 search.push_str("| |");
1010 search.push_str(&span.scope_name);
1011 search.push('|');
1012
1013 if !span.status_message.is_empty() {
1014 search.push_str(" |");
1015 search.push_str(&span.status_message);
1016 search.push('|');
1017 }
1018
1019 for attr in &span.attributes {
1021 search.push_str(" |");
1022 search.push_str(&attr.key);
1023 search.push('=');
1024 match &attr.value {
1025 Value::String(s) => search.push_str(s),
1026 Value::Number(n) => search.push_str(&n.to_string()),
1027 Value::Bool(b) => search.push_str(&b.to_string()),
1028 Value::Null => {}
1029 other => search.push_str(&other.to_string()),
1030 }
1031 search.push('|');
1032 }
1033
1034 for event in &span.events {
1035 search.push_str(" |");
1036 search.push_str(&event.name);
1037 search.push('|');
1038 }
1039
1040 search
1041 }
1042
    /// Consumes the builder and assembles the final `RecordBatch`.
    ///
    /// The column order here must match the field order of `self.schema`
    /// exactly; `RecordBatch::try_new` validates array lengths and types
    /// against the schema.
    pub fn finish(mut self) -> Result<RecordBatch, TraceEngineError> {
        let batch = RecordBatch::try_new(
            self.schema.clone(),
            vec![
                Arc::new(self.trace_id.finish()),
                Arc::new(self.span_id.finish()),
                Arc::new(self.parent_span_id.finish()),
                Arc::new(self.flags.finish()),
                Arc::new(self.trace_state.finish()),
                Arc::new(self.scope_name.finish()),
                Arc::new(self.scope_version.finish()),
                Arc::new(self.service_name.finish()),
                Arc::new(self.span_name.finish()),
                Arc::new(self.span_kind.finish()),
                Arc::new(self.start_time.finish()),
                Arc::new(self.end_time.finish()),
                Arc::new(self.duration_ms.finish()),
                Arc::new(self.status_code.finish()),
                Arc::new(self.status_message.finish()),
                Arc::new(self.label.finish()),
                Arc::new(self.attributes.finish()),
                Arc::new(self.resource_attributes.finish()),
                Arc::new(self.events.finish()),
                Arc::new(self.links.finish()),
                Arc::new(self.input.finish()),
                Arc::new(self.output.finish()),
                Arc::new(self.search_blob.finish()),
                Arc::new(self.partition_date.finish()),
            ],
        )?;

        Ok(batch)
    }
1077}