// scouter_dataframe/parquet/tracing/engine.rs
1use crate::error::TraceEngineError;
2use crate::parquet::control::{get_pod_id, ControlTableEngine};
3use crate::parquet::tracing::traits::arrow_schema_to_delta;
4use crate::parquet::tracing::traits::attribute_field;
5use crate::parquet::tracing::traits::TraceSchemaExt;
6use crate::parquet::utils::{create_attr_match_udf, register_cloud_logstore_factories};
7use crate::storage::ObjectStore;
8use arrow::array::*;
9use arrow::datatypes::*;
10use arrow_array::RecordBatch;
11use chrono::{Datelike, Utc};
12use datafusion::prelude::SessionContext;
13use deltalake::datafusion::parquet::basic::{Compression, Encoding, ZstdLevel};
14use deltalake::datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties};
15use deltalake::datafusion::parquet::schema::types::ColumnPath;
16use deltalake::operations::optimize::OptimizeType;
17use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty};
18use scouter_settings::ObjectStorageSettings;
19use scouter_types::SpanId;
20use scouter_types::TraceId;
21use scouter_types::TraceSpanRecord;
22use scouter_types::{Attribute, SpanEvent, SpanLink};
23use serde_json::Value;
24use std::sync::Arc;
25use tokio::sync::oneshot;
26use tokio::sync::{mpsc, RwLock as AsyncRwLock};
27use tokio::time::{interval, Duration};
28use tracing::{debug, error, info, instrument};
29use url::Url;
30
/// Name of the Delta table — also the final path segment under the storage base URL.
const TRACE_SPAN_TABLE_NAME: &str = "trace_spans";

/// Control table task names for distributed coordination.
/// These keys identify the optimize/retention jobs in the shared control table so
/// that only one pod claims each task at a time (see `try_run_optimize`/`try_run_retention`).
const TASK_OPTIMIZE: &str = "trace_optimize";
const TASK_RETENTION: &str = "trace_retention";

/// Days from year-0001 to Unix epoch (1970-01-01), used to convert chrono → Arrow Date32.
/// Equivalent to `NaiveDate::from_ymd_opt(1970, 1, 1).unwrap().num_days_from_ce()`.
/// NOTE(review): not referenced in this chunk — presumably used by the batch builder's
/// `partition_date` computation further down the file; confirm.
const UNIX_EPOCH_DAYS: i32 = 719_163;
40
/// Commands accepted by the single-writer actor spawned by `TraceSpanDBEngine::start_actor`.
/// Each variant (except `Shutdown`) carries a oneshot sender so callers can await the result.
pub enum TableCommand {
    /// Append a batch of spans to the Delta table.
    Write {
        spans: Vec<TraceSpanRecord>,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Run Z-ORDER compaction (followed by a vacuum in the actor loop).
    Optimize {
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Physically remove unreferenced files older than `retention_hours`.
    Vacuum {
        retention_hours: u64,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Logically delete all rows whose `partition_date` is older than `cutoff_date`.
    Expire {
        cutoff_date: chrono::NaiveDate,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Stop the actor loop.
    Shutdown,
}
59
60async fn build_url(object_store: &ObjectStore) -> Result<Url, TraceEngineError> {
61    let mut base = object_store.get_base_url()?;
62    let mut path = base.path().to_string();
63    if !path.ends_with('/') {
64        path.push('/');
65    }
66    path.push_str(TRACE_SPAN_TABLE_NAME);
67    base.set_path(&path);
68    Ok(base)
69}
70
71#[instrument(skip_all)]
72async fn create_table(
73    object_store: &ObjectStore,
74    table_url: Url,
75    schema: SchemaRef,
76) -> Result<DeltaTable, TraceEngineError> {
77    info!(
78        "Creating trace span table [{}://.../{} ]",
79        table_url.scheme(),
80        table_url
81            .path_segments()
82            .and_then(|mut s| s.next_back())
83            .unwrap_or(TRACE_SPAN_TABLE_NAME)
84    );
85
86    let store = object_store.as_dyn_object_store();
87    let table = DeltaTableBuilder::from_url(table_url.clone())?
88        .with_storage_backend(store, table_url)
89        .build()?;
90
91    let delta_fields = arrow_schema_to_delta(&schema);
92
93    table
94        .create()
95        .with_table_name(TRACE_SPAN_TABLE_NAME)
96        .with_columns(delta_fields)
97        .with_partition_columns(vec!["partition_date".to_string()])
98        .with_configuration_property(TableProperty::CheckpointInterval, Some("5"))
99        // Only collect min/max statistics for columns that benefit from data skipping.
100        .with_configuration_property(
101            TableProperty::DataSkippingStatsColumns,
102            Some("start_time,end_time,service_name,duration_ms,status_code,partition_date"),
103        )
104        .await
105        .map_err(Into::into)
106}
107
108#[instrument(skip_all)]
109async fn build_or_create_table(
110    object_store: &ObjectStore,
111    schema: SchemaRef,
112) -> Result<DeltaTable, TraceEngineError> {
113    register_cloud_logstore_factories();
114    let table_url = build_url(object_store).await?;
115    info!(
116        "Attempting to load trace span table [{}://.../{} ]",
117        table_url.scheme(),
118        table_url
119            .path_segments()
120            .and_then(|mut s| s.next_back())
121            .unwrap_or(TRACE_SPAN_TABLE_NAME)
122    );
123
124    // For all store types we check for an existing Delta table by attempting a load.
125    // Local tables can be checked cheaply via the filesystem; remote tables require
126    // an actual load attempt against the object store.
127    let is_delta_table = if table_url.scheme() == "file" {
128        if let Ok(path) = table_url.to_file_path() {
129            if !path.exists() {
130                info!("Creating directory for local table: {:?}", path);
131                std::fs::create_dir_all(&path)?;
132            }
133            path.join("_delta_log").exists()
134        } else {
135            false
136        }
137    } else {
138        let store = object_store.as_dyn_object_store();
139        match DeltaTableBuilder::from_url(table_url.clone()) {
140            Ok(builder) => builder
141                .with_storage_backend(store, table_url.clone())
142                .load()
143                .await
144                .is_ok(),
145            Err(_) => false,
146        }
147    };
148
149    if is_delta_table {
150        info!(
151            "Loaded existing trace span table [{}://.../{} ]",
152            table_url.scheme(),
153            table_url
154                .path_segments()
155                .and_then(|mut s| s.next_back())
156                .unwrap_or(TRACE_SPAN_TABLE_NAME)
157        );
158        let store = object_store.as_dyn_object_store();
159        let table = DeltaTableBuilder::from_url(table_url.clone())?
160            .with_storage_backend(store, table_url)
161            .load()
162            .await?;
163        Ok(table)
164    } else {
165        info!("Table does not exist, creating new table");
166        create_table(object_store, table_url, schema).await
167    }
168}
169
/// Core trace span engine for high-throughput observability workloads.
///
/// Hierarchy fields (depth, span_order, path, root_span_id) are NOT stored — they are
/// computed at query time via Rust DFS traversal. This matches how Jaeger/Zipkin operate and
/// avoids ordering dependencies during ingest (spans may arrive out-of-order within a batch).
pub struct TraceSpanDBEngine {
    // Arrow schema for the span table, built once in `new()` via `Self::create_schema()`.
    schema: Arc<Schema>,
    // Underlying object storage (e.g. local `file://` or a cloud store).
    pub object_store: ObjectStore,
    // In-memory Delta table snapshot; the async lock serializes state swaps.
    table: Arc<AsyncRwLock<DeltaTable>>,
    // DataFusion session that the table and the `match_attr` UDF are registered with.
    pub ctx: Arc<SessionContext>,
    // Control-table coordinator for cross-pod task claims (optimize/retention).
    control: ControlTableEngine,
}
182
// Empty impl: `TraceSchemaExt` supplies default methods (e.g. `create_schema`, used in
// `TraceSpanDBEngine::new`), so no items need overriding here.
impl TraceSchemaExt for TraceSpanDBEngine {}
184
185impl TraceSpanDBEngine {
    /// Construct the engine: open (or create) the Delta table, set up the
    /// DataFusion session, and initialize the control-table coordinator.
    ///
    /// # Errors
    /// Returns an error if the object store, Delta table, session, or control
    /// table cannot be initialized.
    pub async fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, TraceEngineError> {
        let object_store = ObjectStore::new(storage_settings)?;
        let schema = Arc::new(Self::create_schema());
        let delta_table = build_or_create_table(&object_store, schema.clone()).await?;
        let ctx = object_store.get_session()?;

        // Register the match_attr UDF so DataFusion plans can use it for search_blob filtering.
        // This must happen before any query is planned — UDFs live on the SessionContext.
        ctx.register_udf(create_attr_match_udf());

        // A freshly-created table has no committed Parquet files yet, so table_provider()
        // can fail; in that case defer registration until the first write populates the log.
        if let Ok(provider) = delta_table.table_provider().await {
            ctx.register_table(TRACE_SPAN_TABLE_NAME, provider)?;
        } else {
            info!("Empty table at init — deferring SessionContext registration until first write");
        }
        let control = ControlTableEngine::new(&object_store, get_pod_id()).await?;

        Ok(TraceSpanDBEngine {
            schema,
            object_store,
            table: Arc::new(AsyncRwLock::new(delta_table)),
            ctx: Arc::new(ctx),
            control,
        })
    }
213
214    /// Build a RecordBatch from a vector of TraceSpanRecord (raw ingest type, no hierarchy).
215    pub fn build_batch(
216        &self,
217        spans: Vec<TraceSpanRecord>,
218    ) -> Result<RecordBatch, TraceEngineError> {
219        let start_time = std::time::Instant::now();
220        let mut builder = TraceSpanBatchBuilder::new(self.schema.clone());
221
222        for span in spans {
223            builder.append(&span)?;
224        }
225
226        let record_batch = builder
227            .finish()
228            .inspect_err(|e| error!("Failed to build RecordBatch: {}", e))?;
229
230        let duration = start_time.elapsed();
231        debug!(
232            "Built RecordBatch with {} rows in {:?}",
233            record_batch.num_rows(),
234            duration
235        );
236        Ok(record_batch)
237    }
238
239    /// Build the shared `WriterProperties` used for both ingest writes and Z-ORDER compaction.
240    fn build_writer_props() -> WriterProperties {
241        WriterProperties::builder()
242            // Row group size: creates ~4 groups per 128MB file so bloom + page stats
243            // prune within files, not just across files.
244            .set_max_row_group_size(32_768)
245            // Bloom filter on trace_id: skips ~99% of row groups for trace_id equality lookups.
246            .set_column_bloom_filter_enabled(ColumnPath::new(vec!["trace_id".to_string()]), true)
247            .set_column_bloom_filter_fpp(ColumnPath::new(vec!["trace_id".to_string()]), 0.01)
248            .set_column_bloom_filter_ndv(ColumnPath::new(vec!["trace_id".to_string()]), 32_768)
249            // service_name: low cardinality but hot lookup path — bloom skips row groups fast
250            .set_column_bloom_filter_enabled(
251                ColumnPath::new(vec!["service_name".to_string()]),
252                true,
253            )
254            .set_column_bloom_filter_fpp(ColumnPath::new(vec!["service_name".to_string()]), 0.01)
255            .set_column_bloom_filter_ndv(ColumnPath::new(vec!["service_name".to_string()]), 256)
256            // span_name: high cardinality equality queries (e.g. "grpc.unary/method")
257            .set_column_bloom_filter_enabled(ColumnPath::new(vec!["span_name".to_string()]), true)
258            .set_column_bloom_filter_fpp(ColumnPath::new(vec!["span_name".to_string()]), 0.01)
259            .set_column_bloom_filter_ndv(ColumnPath::new(vec!["span_name".to_string()]), 32_768)
260            // Page-level stats on start_time: finest-grained time pruning within row groups.
261            .set_column_statistics_enabled(
262                ColumnPath::new(vec!["start_time".to_string()]),
263                EnabledStatistics::Page,
264            )
265            // status_code: page-level min/max prunes pages for error-only queries.
266            // Do NOT use bloom filter: only 3 possible values (0/1/2), overhead > benefit.
267            .set_column_statistics_enabled(
268                ColumnPath::new(vec!["status_code".to_string()]),
269                EnabledStatistics::Page,
270            )
271            // Delta encoding on near-sorted integer columns: 4-8x compression on timestamps
272            // after Z-ORDER compaction; 2-4x on durations within a service.
273            .set_column_encoding(
274                ColumnPath::new(vec!["start_time".to_string()]),
275                Encoding::DELTA_BINARY_PACKED,
276            )
277            .set_column_encoding(
278                ColumnPath::new(vec!["duration_ms".to_string()]),
279                Encoding::DELTA_BINARY_PACKED,
280            )
281            // ZSTD level 3: ~40% better compression than SNAPPY on text columns;
282            // marginal decompression overhead is offset by reduced I/O.
283            .set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
284            // Dictionary hint on span_name: high repetition similar to service_name.
285            .set_column_dictionary_enabled(ColumnPath::new(vec!["span_name".to_string()]), true)
286            .build()
287    }
288
    /// Write spans to the Delta table (single-writer invariant via actor channel).
    ///
    /// Builds a RecordBatch, appends it to the Delta log, re-registers the updated
    /// table with the DataFusion session, then publishes the new snapshot into the
    /// shared guard — in that order, so queries never see a half-updated state.
    async fn write_spans(&self, spans: Vec<TraceSpanRecord>) -> Result<(), TraceEngineError> {
        info!("Engine received write request for {} spans", spans.len());

        let batch = self
            .build_batch(spans)
            .inspect_err(|e| error!("failed to build batch: {:?}", e))?;
        info!("Built batch with {} rows", batch.num_rows());

        let mut table_guard = self.table.write().await;
        info!("Acquired table write lock");

        // update_incremental is intentionally omitted here.
        //
        // This engine runs as a single-writer actor — no other process commits to this
        // Delta table, so the in-memory state is always current. Calling update_incremental
        // on a freshly-created empty table (version 0, no data files) causes the Delta
        // Kernel to emit "Not a Delta table: No files in log segment", which mutates
        // table_guard into a corrupted intermediate state before the error propagates.
        // That corrupted clone then has no partition column metadata, producing unpartitioned
        // flat Parquet files instead of partition_date=YYYY-MM-DD/ hive directories.

        // Clone so a failed write leaves the guarded snapshot untouched.
        let current_table = table_guard.clone();

        let updated_table = current_table
            .write(vec![batch])
            .with_save_mode(deltalake::protocol::SaveMode::Append)
            .with_writer_properties(Self::build_writer_props())
            // Always declare partition columns explicitly — do not rely solely on the
            // in-memory snapshot, which can be stale after a failed update_incremental.
            .with_partition_columns(vec!["partition_date".to_string()])
            .await?;

        info!("Successfully wrote batch to Delta Lake");

        self.ctx.deregister_table(TRACE_SPAN_TABLE_NAME)?;
        self.ctx
            .register_table(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?)?;
        // Ensure the table's object store is registered with the DataFusion session
        // so that DeltaScan::scan() can resolve file URLs during query execution.
        updated_table.update_datafusion_session(&self.ctx.state())?;

        *table_guard = updated_table;

        Ok(())
    }
335
336    async fn optimize_table(&self) -> Result<(), TraceEngineError> {
337        let mut table_guard = self.table.write().await;
338
339        let current_table = table_guard.clone();
340
341        let (updated_table, _metrics) = current_table
342            .optimize()
343            .with_target_size(128 * 1024 * 1024)
344            .with_type(OptimizeType::ZOrder(vec![
345                "start_time".to_string(),
346                "service_name".to_string(),
347            ]))
348            // Bloom filters must be re-specified here — compaction rewrites all Parquet files
349            // from scratch using these properties. Without this, every compaction cycle
350            // silently discards all bloom filters on the rewritten files.
351            .with_writer_properties(Self::build_writer_props())
352            .await?;
353
354        self.ctx.deregister_table(TRACE_SPAN_TABLE_NAME)?;
355        self.ctx
356            .register_table(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?)?;
357        updated_table.update_datafusion_session(&self.ctx.state())?;
358
359        *table_guard = updated_table;
360
361        Ok(())
362    }
363
364    async fn vacuum_table(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
365        let mut table_guard = self.table.write().await;
366
367        let (updated_table, _metrics) = table_guard
368            .clone()
369            .vacuum()
370            .with_retention_period(chrono::Duration::hours(retention_hours as i64))
371            .with_enforce_retention_duration(false)
372            .await?;
373
374        self.ctx.deregister_table(TRACE_SPAN_TABLE_NAME)?;
375        self.ctx
376            .register_table(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?)?;
377        updated_table.update_datafusion_session(&self.ctx.state())?;
378
379        *table_guard = updated_table;
380
381        Ok(())
382    }
383
384    /// Delete all rows with `partition_date` older than `cutoff_date`.
385    ///
386    /// This is a logical delete — it writes a new Delta log entry marking the rows as removed.
387    /// Physical disk space is not reclaimed until `vacuum_table()` runs afterwards.
388    async fn expire_table(&self, cutoff_date: chrono::NaiveDate) -> Result<(), TraceEngineError> {
389        let mut table_guard = self.table.write().await;
390
391        // CAST('YYYY-MM-DD' AS DATE) produces a Date32 that matches the partition column type,
392        // which allows Delta Lake to translate this into a partition directory filter.
393        let predicate = format!(
394            "partition_date < CAST('{}' AS DATE)",
395            cutoff_date.format("%Y-%m-%d")
396        );
397
398        let (updated_table, metrics) = table_guard
399            .clone()
400            .delete()
401            .with_predicate(predicate)
402            .await?;
403
404        info!(
405            "Expired {} rows older than {}",
406            metrics.num_deleted_rows, cutoff_date
407        );
408
409        self.ctx.deregister_table(TRACE_SPAN_TABLE_NAME)?;
410        self.ctx
411            .register_table(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?)?;
412        updated_table.update_datafusion_session(&self.ctx.state())?;
413
414        *table_guard = updated_table;
415
416        Ok(())
417    }
418
419    /// Try to claim and run the optimize task via the control table.
420    ///
421    /// The control table's OCC ensures only one pod runs this at a time across
422    async fn try_run_optimize(&self, interval_hours: u64) {
423        match self.control.try_claim_task(TASK_OPTIMIZE).await {
424            Ok(true) => match self.optimize_table().await {
425                Ok(()) => {
426                    // Vacuum tombstoned files left behind by compaction.
427                    // retention_hours=0 is safe here because the single-writer invariant
428                    // guarantees no concurrent reader is using an older table version.
429                    if let Err(e) = self.vacuum_table(0).await {
430                        error!("Post-optimize vacuum failed: {}", e);
431                    }
432                    let _ = self
433                        .control
434                        .release_task(
435                            TASK_OPTIMIZE,
436                            chrono::Duration::hours(interval_hours as i64),
437                        )
438                        .await;
439                }
440                Err(e) => {
441                    error!("Optimize failed: {}", e);
442                    let _ = self.control.release_task_on_failure(TASK_OPTIMIZE).await;
443                }
444            },
445            Ok(false) => { /* not due or another pod owns it */ }
446            Err(e) => error!("Optimize claim check failed: {}", e),
447        }
448    }
449
450    /// Try to claim and run the retention task via the control table.
451    async fn try_run_retention(&self, retention_days: u32) {
452        match self.control.try_claim_task(TASK_RETENTION).await {
453            Ok(true) => {
454                let cutoff =
455                    (Utc::now() - chrono::Duration::days(retention_days as i64)).date_naive();
456                match self.expire_table(cutoff).await {
457                    Ok(()) => {
458                        // Reclaim disk space after logical delete
459                        let _ = self.vacuum_table(0).await;
460                        let _ = self
461                            .control
462                            .release_task(TASK_RETENTION, chrono::Duration::hours(24))
463                            .await;
464                    }
465                    Err(e) => {
466                        error!("Retention failed: {}", e);
467                        let _ = self.control.release_task_on_failure(TASK_RETENTION).await;
468                    }
469                }
470            }
471            Ok(false) => {}
472            Err(e) => error!("Retention claim check failed: {}", e),
473        }
474    }
475
    /// Refresh the in-memory Delta table snapshot from shared object storage.
    ///
    /// This is mainly for multiple pods sharing the same storage.
    /// Safety: clones the table before calling `update_incremental` so that a failure
    /// (e.g. "Not a Delta table" on an empty table) leaves the original guard intact.
    async fn refresh_table(&self) -> Result<(), TraceEngineError> {
        let mut table_guard = self.table.write().await;
        let current_version = table_guard.version();

        // Clone before update_incremental — on failure the clone is discarded and the
        // original guard stays intact, avoiding the corrupted-state bug described in
        // the comment inside `write_spans`.
        let mut refreshed = table_guard.clone();
        match refreshed.update_incremental(None).await {
            Ok(_) => {
                // Only re-register when the version actually advanced; otherwise the
                // existing registration is still valid.
                if refreshed.version() > current_version {
                    debug!(
                        "Table refreshed: v{:?} → v{:?}",
                        current_version,
                        refreshed.version()
                    );
                    self.ctx.deregister_table(TRACE_SPAN_TABLE_NAME)?;
                    self.ctx
                        .register_table(TRACE_SPAN_TABLE_NAME, refreshed.table_provider().await?)?;
                    refreshed.update_datafusion_session(&self.ctx.state())?;
                    *table_guard = refreshed;
                }
            }
            Err(e) => {
                // Tolerate: empty tables (no log yet), transient network errors.
                // These are expected on freshly-created tables and do not indicate a bug.
                debug!("Table refresh skipped: {}", e);
            }
        }
        Ok(())
    }
511
    /// Spawn the single-writer actor loop; returns the command channel used to
    /// submit `TableCommand`s and the join handle for the spawned task.
    ///
    /// * `compaction_interval_hours` — lease duration handed to the control table
    ///   after a successful optimize run.
    /// * `retention_days` — if `Some`, rows older than this many days are expired.
    /// * `refresh_interval_secs` — how often this pod refreshes its table snapshot.
    #[instrument(skip_all, name = "trace_engine_actor")]
    pub fn start_actor(
        self,
        compaction_interval_hours: u64,
        retention_days: Option<u32>,
        refresh_interval_secs: u64,
    ) -> (mpsc::Sender<TableCommand>, tokio::task::JoinHandle<()>) {
        let (tx, mut rx) = mpsc::channel::<TableCommand>(100);

        let handle = tokio::spawn(async move {
            // Poll every 5 minutes — the actual schedule is persisted in the
            // control table's `next_run_at` and survives pod restarts.
            let mut scheduler_ticker = interval(Duration::from_secs(5 * 60));
            scheduler_ticker.tick().await; // skip immediate tick

            // Refresh ticker: picks up commits from other pods on shared storage.
            // Runs on every process/pod independently — unlike compaction, there is no control-table
            // mutual exclusion here. Every pod must refresh its own in-memory snapshot.
            let mut refresh_ticker = interval(Duration::from_secs(refresh_interval_secs));
            refresh_ticker.tick().await;

            loop {
                tokio::select! {
                    Some(cmd) = rx.recv() => {
                        match cmd {
                            TableCommand::Write { spans, respond_to } => {
                                match self.write_spans(spans).await {
                                    Ok(_) => { let _ = respond_to.send(Ok(())); }
                                    Err(e) => {
                                        tracing::error!("Write failed: {}", e);
                                        let _ = respond_to.send(Err(e));
                                    }
                                }
                            }
                            TableCommand::Optimize { respond_to } => {
                                // Response is sent before vacuum so callers aren't blocked
                                // on the potentially slow file-deletion pass.
                                let _ = respond_to.send(self.optimize_table().await);
                                if let Err(e) = self.vacuum_table(0).await {
                                    error!("Post-optimize vacuum failed: {}", e);
                                }
                            }
                            TableCommand::Vacuum { retention_hours, respond_to } => {
                                let _ = respond_to.send(self.vacuum_table(retention_hours).await);
                            }
                            TableCommand::Expire { cutoff_date, respond_to } => {
                                let _ = respond_to.send(self.expire_table(cutoff_date).await);
                            }
                            TableCommand::Shutdown => {
                                tracing::info!("Shutting down table engine");
                                break;
                            }
                        }
                    }
                    _ = scheduler_ticker.tick() => {
                        self.try_run_optimize(compaction_interval_hours).await;
                        if let Some(days) = retention_days {
                            self.try_run_retention(days).await;
                        }
                    }
                    _ = refresh_ticker.tick() => {
                        if let Err(e) = self.refresh_table().await {
                            error!("Table refresh failed: {}", e);
                        }
                    }
                }
            }
        });

        (tx, handle)
    }
583}
584
/// Efficient builder for converting `TraceSpanRecord` (ingest type) into Arrow `RecordBatch`.
///
/// Hierarchy fields (depth, span_order, path, root_span_id) are NOT included — they are
/// computed at query time from the flat span data stored here.
pub struct TraceSpanBatchBuilder {
    schema: SchemaRef,

    // ID builders (fixed-size: 16-byte trace id, 8-byte span ids — see `new()`)
    trace_id: FixedSizeBinaryBuilder,
    span_id: FixedSizeBinaryBuilder,
    parent_span_id: FixedSizeBinaryBuilder,

    // W3C Trace Context
    flags: Int32Builder,
    trace_state: StringBuilder,

    // Instrumentation scope
    scope_name: StringBuilder,
    scope_version: StringBuilder,

    // Metadata builders (dictionary-encoded where values repeat heavily)
    service_name: StringDictionaryBuilder<Int32Type>,
    span_name: StringBuilder,
    span_kind: StringDictionaryBuilder<Int8Type>,

    // Time builders (UTC microsecond timestamps — see `new()`)
    start_time: TimestampMicrosecondBuilder,
    end_time: TimestampMicrosecondBuilder,
    duration_ms: Int64Builder,

    // Status builders
    status_code: Int32Builder,
    status_message: StringBuilder,

    // Scouter-specific
    label: StringBuilder,

    // Attribute builders (string key → string-view value maps)
    attributes: MapBuilder<StringBuilder, StringViewBuilder>,
    resource_attributes: MapBuilder<StringBuilder, StringViewBuilder>,

    // Nested structure builders (span events and links as lists of structs)
    events: ListBuilder<StructBuilder>,
    links: ListBuilder<StructBuilder>,

    // Payload builders
    input: StringViewBuilder,
    output: StringViewBuilder,

    // Search optimizer
    search_blob: StringViewBuilder,

    // Partition key (days since Unix epoch)
    partition_date: Date32Builder,
}
640
641impl TraceSpanBatchBuilder {
642    pub fn new(schema: SchemaRef) -> Self {
643        let trace_id = FixedSizeBinaryBuilder::new(16);
644        let span_id = FixedSizeBinaryBuilder::new(8);
645        let parent_span_id = FixedSizeBinaryBuilder::new(8);
646
647        let flags = Int32Builder::new();
648        let trace_state = StringBuilder::new();
649
650        let scope_name = StringBuilder::new();
651        let scope_version = StringBuilder::new();
652
653        let service_name = StringDictionaryBuilder::<Int32Type>::new();
654        let span_name = StringBuilder::new();
655        let span_kind = StringDictionaryBuilder::<Int8Type>::new();
656
657        let start_time = TimestampMicrosecondBuilder::new().with_timezone("UTC");
658        let end_time = TimestampMicrosecondBuilder::new().with_timezone("UTC");
659        let duration_ms = Int64Builder::new();
660
661        let status_code = Int32Builder::new();
662        let status_message = StringBuilder::new();
663
664        let label = StringBuilder::new();
665
666        let map_field_name = MapFieldNames {
667            entry: "key_value".to_string(),
668            key: "key".to_string(),
669            value: "value".to_string(),
670        };
671        let attributes = MapBuilder::new(
672            Some(map_field_name.clone()),
673            StringBuilder::new(),
674            StringViewBuilder::new(),
675        );
676        let resource_attributes = MapBuilder::new(
677            Some(map_field_name.clone()),
678            StringBuilder::new(),
679            StringViewBuilder::new(),
680        );
681
682        let event_fields = vec![
683            Field::new("name", DataType::Utf8, false),
684            Field::new(
685                "timestamp",
686                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
687                false,
688            ),
689            attribute_field(),
690            Field::new("dropped_attributes_count", DataType::UInt32, false),
691        ];
692
693        let event_struct_builders = vec![
694            Box::new(StringBuilder::new()) as Box<dyn ArrayBuilder>,
695            Box::new(TimestampMicrosecondBuilder::new().with_timezone("UTC"))
696                as Box<dyn ArrayBuilder>,
697            Box::new(MapBuilder::new(
698                Some(map_field_name.clone()),
699                StringBuilder::new(),
700                StringViewBuilder::new(),
701            )) as Box<dyn ArrayBuilder>,
702            Box::new(UInt32Builder::new()) as Box<dyn ArrayBuilder>,
703        ];
704
705        let event_struct_builder = StructBuilder::new(event_fields, event_struct_builders);
706        let events = ListBuilder::new(event_struct_builder);
707
708        let link_fields = vec![
709            Field::new("trace_id", DataType::FixedSizeBinary(16), false),
710            Field::new("span_id", DataType::FixedSizeBinary(8), false),
711            Field::new("trace_state", DataType::Utf8, false),
712            attribute_field(),
713            Field::new("dropped_attributes_count", DataType::UInt32, false),
714        ];
715
716        let link_struct_builders = vec![
717            Box::new(FixedSizeBinaryBuilder::new(16)) as Box<dyn ArrayBuilder>,
718            Box::new(FixedSizeBinaryBuilder::new(8)) as Box<dyn ArrayBuilder>,
719            Box::new(StringBuilder::new()) as Box<dyn ArrayBuilder>,
720            Box::new(MapBuilder::new(
721                Some(map_field_name.clone()),
722                StringBuilder::new(),
723                StringViewBuilder::new(),
724            )) as Box<dyn ArrayBuilder>,
725            Box::new(UInt32Builder::new()) as Box<dyn ArrayBuilder>,
726        ];
727
728        let link_struct_builder = StructBuilder::new(link_fields, link_struct_builders);
729        let links = ListBuilder::new(link_struct_builder);
730
731        let input = StringViewBuilder::new();
732        let output = StringViewBuilder::new();
733        let search_blob = StringViewBuilder::new();
734        let partition_date = Date32Builder::new();
735
736        Self {
737            schema,
738            trace_id,
739            span_id,
740            parent_span_id,
741            flags,
742            trace_state,
743            scope_name,
744            scope_version,
745            service_name,
746            span_name,
747            span_kind,
748            start_time,
749            end_time,
750            duration_ms,
751            status_code,
752            status_message,
753            label,
754            attributes,
755            resource_attributes,
756            events,
757            links,
758            input,
759            output,
760            search_blob,
761            partition_date,
762        }
763    }
764
    /// Append a single `TraceSpanRecord` to the batch.
    ///
    /// Each column builder receives exactly one value (or null) per call so
    /// all columns stay length-aligned for `finish()`. If an error is
    /// returned mid-way, earlier builders have already received this row's
    /// value and the batch is left misaligned — callers should discard the
    /// builder rather than retry the same span.
    ///
    /// # Errors
    /// Returns `TraceEngineError` when a fixed-size binary append rejects an
    /// ID of the wrong length, or when a nested attribute/event/link append
    /// fails.
    pub fn append(&mut self, span: &TraceSpanRecord) -> Result<(), TraceEngineError> {
        // IDs — fixed-size binary columns; append_value fails if the byte
        // length does not match the column width (16 for trace, 8 for span).
        let trace_bytes = span.trace_id.as_bytes();
        self.trace_id
            .append_value(trace_bytes)
            .map_err(TraceEngineError::ArrowError)?;

        let span_bytes = span.span_id.as_bytes();
        self.span_id
            .append_value(span_bytes)
            .map_err(TraceEngineError::ArrowError)?;

        // Root spans have no parent — stored as null.
        match &span.parent_span_id {
            Some(pid) => {
                self.parent_span_id
                    .append_value(pid.as_bytes())
                    .map_err(TraceEngineError::ArrowError)?;
            }
            None => self.parent_span_id.append_null(),
        }

        // W3C Trace Context
        self.flags.append_value(span.flags);
        self.trace_state.append_value(&span.trace_state);

        // Instrumentation scope
        self.scope_name.append_value(&span.scope_name);
        match &span.scope_version {
            Some(v) => self.scope_version.append_value(v),
            None => self.scope_version.append_null(),
        }

        // Metadata
        self.service_name.append_value(&span.service_name);
        self.span_name.append_value(&span.span_name);
        // An empty span_kind is stored as null rather than an empty string
        // (defensive — presumably upstream always supplies a kind; verify).
        if span.span_kind.is_empty() {
            self.span_kind.append_null();
        } else {
            self.span_kind.append_value(&span.span_kind);
        }

        // Timestamps (microsecond precision, matching the column type)
        self.start_time
            .append_value(span.start_time.timestamp_micros());
        self.end_time.append_value(span.end_time.timestamp_micros());
        self.duration_ms.append_value(span.duration_ms);

        // Status — empty message normalized to null.
        self.status_code.append_value(span.status_code);
        if span.status_message.is_empty() {
            self.status_message.append_null();
        } else {
            self.status_message.append_value(&span.status_message);
        }

        // Scouter-specific
        match &span.label {
            Some(l) => self.label.append_value(l),
            None => self.label.append_null(),
        }

        // Attributes
        self.append_attributes(&span.attributes).inspect_err(|e| {
            error!(
                "Failed to append attributes for span {}: {}",
                span.span_id, e
            )
        })?;

        // Resource attributes
        self.append_resource_attributes(&span.resource_attributes)
            .inspect_err(|e| {
                error!(
                    "Failed to append resource_attributes for span {}: {}",
                    span.span_id, e
                )
            })?;

        // Events
        self.append_events(&span.events)
            .inspect_err(|e| error!("Failed to append events for span {}: {}", span.span_id, e))?;

        // Links
        self.append_links(&span.links)
            .inspect_err(|e| error!("Failed to append links for span {}: {}", span.span_id, e))?;

        // Payloads — stored as JSON text; serialization failure degrades to
        // the literal string "null" rather than failing the whole row.
        self.input.append_value(
            serde_json::to_string(&span.input).unwrap_or_else(|_| "null".to_string()),
        );

        self.output.append_value(
            serde_json::to_string(&span.output).unwrap_or_else(|_| "null".to_string()),
        );

        // Search blob
        let search_text = Self::build_search_blob(span);
        self.search_blob.append_value(search_text);

        // Partition key — days since Unix epoch (Arrow Date32), derived from
        // the span start date via num_days_from_ce minus the year-0001 offset.
        let days = span.start_time.date_naive().num_days_from_ce() - UNIX_EPOCH_DAYS;
        self.partition_date.append_value(days);

        Ok(())
    }
872
873    fn append_attributes(&mut self, attributes: &[Attribute]) -> Result<(), TraceEngineError> {
874        for attr in attributes {
875            self.attributes.keys().append_value(&attr.key);
876            let value_str =
877                serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
878            self.attributes.values().append_value(value_str);
879        }
880        self.attributes.append(true)?;
881        Ok(())
882    }
883
884    fn append_resource_attributes(
885        &mut self,
886        attributes: &[Attribute],
887    ) -> Result<(), TraceEngineError> {
888        if attributes.is_empty() {
889            self.resource_attributes.append(false)?; // null map
890        } else {
891            for attr in attributes {
892                self.resource_attributes.keys().append_value(&attr.key);
893                let value_str =
894                    serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
895                self.resource_attributes.values().append_value(value_str);
896            }
897            self.resource_attributes.append(true)?;
898        }
899        Ok(())
900    }
901
    /// Append the span's events as one list row.
    ///
    /// The `field_builder` indices (0..=3) must stay in lockstep with the
    /// event struct fields declared in the constructor:
    /// name, timestamp, attributes map, dropped_attributes_count.
    ///
    /// # Errors
    /// Returns `DowncastError` if a field builder has an unexpected type, or
    /// an Arrow error if the nested map append fails.
    /// NOTE(review): a map append error mid-event leaves earlier field
    /// builders with a partially appended row — the batch should be discarded
    /// on error.
    fn append_events(&mut self, events: &[SpanEvent]) -> Result<(), TraceEngineError> {
        let event_struct = self.events.values();
        for event in events {
            // Field 0: event name
            let name_builder = event_struct
                .field_builder::<StringBuilder>(0)
                .ok_or_else(|| TraceEngineError::DowncastError("event name builder"))?;
            name_builder.append_value(&event.name);

            // Field 1: event timestamp (microseconds, UTC column)
            let time_builder = event_struct
                .field_builder::<TimestampMicrosecondBuilder>(1)
                .ok_or_else(|| TraceEngineError::DowncastError("event timestamp builder"))?;
            time_builder.append_value(event.timestamp.timestamp_micros());

            // Field 2: attributes map — values JSON-encoded, falling back to
            // the literal string "null" on serialization failure.
            let attr_builder = event_struct
                .field_builder::<MapBuilder<StringBuilder, StringViewBuilder>>(2)
                .ok_or_else(|| TraceEngineError::DowncastError("event attributes builder"))?;

            for attr in &event.attributes {
                attr_builder.keys().append_value(&attr.key);
                let value_str =
                    serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
                attr_builder.values().append_value(value_str);
            }
            attr_builder.append(true)?;

            // Field 3: dropped attribute count
            let dropped_builder =
                event_struct
                    .field_builder::<UInt32Builder>(3)
                    .ok_or_else(|| {
                        TraceEngineError::DowncastError("dropped attributes count builder")
                    })?;
            dropped_builder.append_value(event.dropped_attributes_count);

            // Mark this event's struct row valid.
            event_struct.append(true);
        }

        // Close the list row — an empty `events` slice yields an empty,
        // non-null list.
        self.events.append(true);
        Ok(())
    }
941
942    fn append_links(&mut self, links: &[SpanLink]) -> Result<(), TraceEngineError> {
943        let link_struct = self.links.values();
944
945        for link in links {
946            let trace_builder = link_struct
947                .field_builder::<FixedSizeBinaryBuilder>(0)
948                .ok_or_else(|| TraceEngineError::DowncastError("link trace_id builder"))?;
949
950            let trace_bytes = TraceId::hex_to_bytes(&link.trace_id).map_err(|e| {
951                TraceEngineError::InvalidHexId(link.trace_id.clone(), e.to_string())
952            })?;
953            trace_builder.append_value(&trace_bytes)?;
954
955            let span_builder = link_struct
956                .field_builder::<FixedSizeBinaryBuilder>(1)
957                .ok_or_else(|| TraceEngineError::DowncastError("link span_id builder"))?;
958
959            let span_bytes = SpanId::hex_to_bytes(&link.span_id)
960                .map_err(|e| TraceEngineError::InvalidHexId(link.span_id.clone(), e.to_string()))?;
961            span_builder.append_value(&span_bytes)?;
962
963            let state_builder = link_struct
964                .field_builder::<StringBuilder>(2)
965                .ok_or_else(|| TraceEngineError::DowncastError("link trace_state builder"))?;
966            state_builder.append_value(&link.trace_state);
967
968            let attr_builder = link_struct
969                .field_builder::<MapBuilder<StringBuilder, StringViewBuilder>>(3)
970                .ok_or_else(|| TraceEngineError::DowncastError("link attributes builder"))?;
971
972            for attr in &link.attributes {
973                attr_builder.keys().append_value(&attr.key);
974                let value_str =
975                    serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
976                attr_builder.values().append_value(value_str);
977            }
978            attr_builder.append(true)?;
979
980            let dropped_builder =
981                link_struct
982                    .field_builder::<UInt32Builder>(4)
983                    .ok_or_else(|| {
984                        TraceEngineError::DowncastError("link dropped attributes count builder")
985                    })?;
986            dropped_builder.append_value(link.dropped_attributes_count);
987
988            link_struct.append(true);
989        }
990
991        self.links.append(true);
992        Ok(())
993    }
994
995    /// Build a concatenated search string from `TraceSpanRecord` for full-text queries.
996    ///
997    /// Uses pipe-bounded tokens (`|key=value|`) to prevent false-positive substring matches
998    /// where a value contains something that looks like a different attribute key or value.
999    /// Queries use `%key=value%` patterns which match both old `key:value` archive data
1000    /// and the new `|key=value|` format.
1001    fn build_search_blob(span: &TraceSpanRecord) -> String {
1002        let mut search = String::with_capacity(512);
1003
1004        // Pipe-bounded bare tokens for full-text (service, span, scope)
1005        search.push('|');
1006        search.push_str(&span.service_name);
1007        search.push_str("| |");
1008        search.push_str(&span.span_name);
1009        search.push_str("| |");
1010        search.push_str(&span.scope_name);
1011        search.push('|');
1012
1013        if !span.status_message.is_empty() {
1014            search.push_str(" |");
1015            search.push_str(&span.status_message);
1016            search.push('|');
1017        }
1018
1019        // Pipe-bounded key=value tokens — standardize on `=` separator
1020        for attr in &span.attributes {
1021            search.push_str(" |");
1022            search.push_str(&attr.key);
1023            search.push('=');
1024            match &attr.value {
1025                Value::String(s) => search.push_str(s),
1026                Value::Number(n) => search.push_str(&n.to_string()),
1027                Value::Bool(b) => search.push_str(&b.to_string()),
1028                Value::Null => {}
1029                other => search.push_str(&other.to_string()),
1030            }
1031            search.push('|');
1032        }
1033
1034        for event in &span.events {
1035            search.push_str(" |");
1036            search.push_str(&event.name);
1037            search.push('|');
1038        }
1039
1040        search
1041    }
1042
1043    /// Finalize and build the RecordBatch. Column order must match `create_schema()`.
1044    pub fn finish(mut self) -> Result<RecordBatch, TraceEngineError> {
1045        let batch = RecordBatch::try_new(
1046            self.schema.clone(),
1047            vec![
1048                Arc::new(self.trace_id.finish()),
1049                Arc::new(self.span_id.finish()),
1050                Arc::new(self.parent_span_id.finish()),
1051                Arc::new(self.flags.finish()),
1052                Arc::new(self.trace_state.finish()),
1053                Arc::new(self.scope_name.finish()),
1054                Arc::new(self.scope_version.finish()),
1055                Arc::new(self.service_name.finish()),
1056                Arc::new(self.span_name.finish()),
1057                Arc::new(self.span_kind.finish()),
1058                Arc::new(self.start_time.finish()),
1059                Arc::new(self.end_time.finish()),
1060                Arc::new(self.duration_ms.finish()),
1061                Arc::new(self.status_code.finish()),
1062                Arc::new(self.status_message.finish()),
1063                Arc::new(self.label.finish()),
1064                Arc::new(self.attributes.finish()),
1065                Arc::new(self.resource_attributes.finish()),
1066                Arc::new(self.events.finish()),
1067                Arc::new(self.links.finish()),
1068                Arc::new(self.input.finish()),
1069                Arc::new(self.output.finish()),
1070                Arc::new(self.search_blob.finish()),
1071                Arc::new(self.partition_date.finish()),
1072            ],
1073        )?;
1074
1075        Ok(batch)
1076    }
1077}