Skip to main content

scouter_dataframe/parquet/tracing/
engine.rs

1use crate::error::TraceEngineError;
2use crate::parquet::control::{get_pod_id, ControlTableEngine};
3use crate::parquet::tracing::catalog::TraceCatalogProvider;
4use crate::parquet::tracing::traits::arrow_schema_to_delta;
5use crate::parquet::tracing::traits::attribute_field;
6use crate::parquet::tracing::traits::TraceSchemaExt;
7use crate::parquet::utils::{create_attr_match_udf, register_cloud_logstore_factories};
8use crate::storage::ObjectStore;
9use arrow::array::*;
10use arrow::datatypes::*;
11use arrow_array::RecordBatch;
12use chrono::{Datelike, Utc};
13use datafusion::catalog::CatalogProvider;
14use datafusion::prelude::SessionContext;
15use deltalake::datafusion::parquet::basic::{Compression, Encoding, ZstdLevel};
16use deltalake::datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties};
17use deltalake::datafusion::parquet::schema::types::ColumnPath;
18use deltalake::operations::optimize::OptimizeType;
19use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty};
20use scouter_settings::ObjectStorageSettings;
21use scouter_types::SpanId;
22use scouter_types::TraceId;
23use scouter_types::TraceSpanRecord;
24use scouter_types::{Attribute, SpanEvent, SpanLink};
25use serde_json::Value;
26use std::sync::Arc;
27use tokio::sync::oneshot;
28use tokio::sync::{mpsc, RwLock as AsyncRwLock};
29use tokio::time::{interval, Duration};
30use tracing::{debug, error, info, instrument};
31use url::Url;
32
/// Name of the Delta table (and its URL path segment) that stores raw trace spans.
const TRACE_SPAN_TABLE_NAME: &str = "trace_spans";

/// Control table task names for distributed coordination.
/// These keys are claimed/released via `ControlTableEngine` so only one pod
/// runs each maintenance task at a time.
const TASK_OPTIMIZE: &str = "trace_optimize";
const TASK_RETENTION: &str = "trace_retention";

/// Days from year-0001 to Unix epoch (1970-01-01), used to convert chrono → Arrow Date32.
/// Equivalent to `NaiveDate::from_ymd_opt(1970, 1, 1).unwrap().num_days_from_ce()`.
const UNIX_EPOCH_DAYS: i32 = 719_163;
42
/// Commands accepted by the `TraceSpanDBEngine` actor loop (see `start_actor`).
///
/// Each fallible variant carries a oneshot sender so the caller can await the outcome.
pub enum TableCommand {
    /// Append a batch of spans to the Delta table.
    Write {
        spans: Vec<TraceSpanRecord>,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Run Z-ORDER compaction (followed by a vacuum inside the actor loop).
    Optimize {
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Physically remove tombstoned files older than `retention_hours`.
    Vacuum {
        retention_hours: u64,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Logically delete rows whose `partition_date` is older than `cutoff_date`.
    Expire {
        cutoff_date: chrono::NaiveDate,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Stop the actor loop.
    Shutdown,
}
61
62async fn build_url(object_store: &ObjectStore) -> Result<Url, TraceEngineError> {
63    let mut base = object_store.get_base_url()?;
64    let mut path = base.path().to_string();
65    if !path.ends_with('/') {
66        path.push('/');
67    }
68    path.push_str(TRACE_SPAN_TABLE_NAME);
69    base.set_path(&path);
70    Ok(base)
71}
72
73#[instrument(skip_all)]
74async fn create_table(
75    object_store: &ObjectStore,
76    table_url: Url,
77    schema: SchemaRef,
78) -> Result<DeltaTable, TraceEngineError> {
79    info!(
80        "Creating trace span table [{}://.../{} ]",
81        table_url.scheme(),
82        table_url
83            .path_segments()
84            .and_then(|mut s| s.next_back())
85            .unwrap_or(TRACE_SPAN_TABLE_NAME)
86    );
87
88    let store = object_store.as_dyn_object_store();
89    let table = DeltaTableBuilder::from_url(table_url.clone())?
90        .with_storage_backend(store, table_url)
91        .build()?;
92
93    let delta_fields = arrow_schema_to_delta(&schema);
94
95    table
96        .create()
97        .with_table_name(TRACE_SPAN_TABLE_NAME)
98        .with_columns(delta_fields)
99        .with_partition_columns(vec!["partition_date".to_string()])
100        .with_configuration_property(TableProperty::CheckpointInterval, Some("5"))
101        // Only collect min/max statistics for columns that benefit from data skipping.
102        .with_configuration_property(
103            TableProperty::DataSkippingStatsColumns,
104            Some("start_time,end_time,service_name,duration_ms,status_code,partition_date"),
105        )
106        .await
107        .map_err(Into::into)
108}
109
110#[instrument(skip_all)]
111async fn build_or_create_table(
112    object_store: &ObjectStore,
113    schema: SchemaRef,
114) -> Result<DeltaTable, TraceEngineError> {
115    register_cloud_logstore_factories();
116    let table_url = build_url(object_store).await?;
117    info!(
118        "Attempting to load trace span table [{}://.../{} ]",
119        table_url.scheme(),
120        table_url
121            .path_segments()
122            .and_then(|mut s| s.next_back())
123            .unwrap_or(TRACE_SPAN_TABLE_NAME)
124    );
125
126    // For all store types we check for an existing Delta table by attempting a load.
127    // Local tables can be checked cheaply via the filesystem; remote tables require
128    // an actual load attempt against the object store.
129    let is_delta_table = if table_url.scheme() == "file" {
130        if let Ok(path) = table_url.to_file_path() {
131            if !path.exists() {
132                info!("Creating directory for local table: {:?}", path);
133                std::fs::create_dir_all(&path)?;
134            }
135            path.join("_delta_log").exists()
136        } else {
137            false
138        }
139    } else {
140        let store = object_store.as_dyn_object_store();
141        match DeltaTableBuilder::from_url(table_url.clone()) {
142            Ok(builder) => builder
143                .with_storage_backend(store, table_url.clone())
144                .load()
145                .await
146                .is_ok(),
147            Err(_) => false,
148        }
149    };
150
151    if is_delta_table {
152        info!(
153            "Loaded existing trace span table [{}://.../{} ]",
154            table_url.scheme(),
155            table_url
156                .path_segments()
157                .and_then(|mut s| s.next_back())
158                .unwrap_or(TRACE_SPAN_TABLE_NAME)
159        );
160        let store = object_store.as_dyn_object_store();
161        let table = DeltaTableBuilder::from_url(table_url.clone())?
162            .with_storage_backend(store, table_url)
163            .load()
164            .await?;
165        Ok(table)
166    } else {
167        info!("Table does not exist, creating new table");
168        create_table(object_store, table_url, schema).await
169    }
170}
171
/// The DataFusion catalog name for the tracing engine.
///
/// Both `TraceSpanDBEngine` and `TraceSummaryDBEngine` share a `SessionContext` configured with
/// this catalog as the default. Unqualified table names in SQL and `ctx.table(name)` calls
/// resolve through our `TraceCatalogProvider` (DashMap-backed) rather than the built-in
/// `datafusion.public` catalog, enabling atomic single-step `TableProvider` swaps.
pub const TRACE_CATALOG_NAME: &str = "scouter_tracing";
/// Schema name inside [`TRACE_CATALOG_NAME`] that unqualified table names resolve against.
const TRACE_DEFAULT_SCHEMA: &str = "default";
180
/// Core trace span engine for high-throughput observability workloads.
///
/// Hierarchy fields (depth, span_order, path, root_span_id) are NOT stored — they are
/// computed at query time via Rust DFS traversal. This matches how Jaeger/Zipkin operate and
/// avoids ordering dependencies during ingest (spans may arrive out-of-order within a batch).
pub struct TraceSpanDBEngine {
    /// Arrow schema for the span table, built once in `new()` via `Self::create_schema()`.
    schema: Arc<Schema>,
    pub object_store: ObjectStore,
    /// Single-writer Delta table handle; the write lock serializes ingest writes
    /// with maintenance operations (optimize/vacuum/expire/refresh).
    table: Arc<AsyncRwLock<DeltaTable>>,
    /// Shared DataFusion session (see `ctx()` for the sharing contract).
    ctx: Arc<SessionContext>,
    /// Shared atomic catalog — both span and summary engines call `swap()` to update their
    /// respective `TableProvider`s without a deregister/register gap.
    pub catalog: Arc<TraceCatalogProvider>,
    /// Control-table client used to claim/release the cross-pod optimize and retention tasks.
    control: ControlTableEngine,
}
196
// Provides the shared schema-construction helpers (e.g. `Self::create_schema()`, used in `new`).
impl TraceSchemaExt for TraceSpanDBEngine {}
198
199impl TraceSpanDBEngine {
200    pub async fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, TraceEngineError> {
201        let object_store = ObjectStore::new(storage_settings)?;
202        let schema = Arc::new(Self::create_schema());
203        let delta_table = build_or_create_table(&object_store, schema.clone()).await?;
204
205        // Create the session with our catalog as the default so that unqualified table
206        // names in SQL and ctx.table(name) calls resolve through TraceCatalogProvider's
207        // DashMap rather than the built-in datafusion.public catalog.
208        let ctx =
209            object_store.get_session_with_catalog(TRACE_CATALOG_NAME, TRACE_DEFAULT_SCHEMA)?;
210
211        let catalog = Arc::new(TraceCatalogProvider::new());
212        ctx.register_catalog(
213            TRACE_CATALOG_NAME,
214            Arc::clone(&catalog) as Arc<dyn CatalogProvider>,
215        );
216
217        // Register the match_attr UDF so DataFusion plans can use it for search_blob filtering.
218        // This must happen before any query is planned — UDFs live on the SessionContext.
219        ctx.register_udf(create_attr_match_udf());
220
221        // A freshly-created table has no committed Parquet files yet — table_provider()
222        // returns an error in that case. Defer registration until the first write populates the log.
223        if let Ok(provider) = delta_table.table_provider().await {
224            catalog.swap(TRACE_SPAN_TABLE_NAME, provider);
225        } else {
226            info!("Empty table at init — deferring catalog registration until first write");
227        }
228        let control = ControlTableEngine::new(&object_store, get_pod_id()).await?;
229
230        Ok(TraceSpanDBEngine {
231            schema,
232            object_store,
233            table: Arc::new(AsyncRwLock::new(delta_table)),
234            ctx: Arc::new(ctx),
235            catalog,
236            control,
237        })
238    }
239
240    /// Returns the shared `SessionContext`.
241    ///
242    /// Needed by `TraceSpanService` to share the context with `TraceSummaryDBEngine` and
243    /// `TraceQueries`. Table registration is done via `catalog.swap()` — callers must not
244    /// call `ctx.deregister_table()` + `ctx.register_table()` directly (torn write).
245    pub fn ctx(&self) -> Arc<SessionContext> {
246        Arc::clone(&self.ctx)
247    }
248
249    /// Build a RecordBatch from a vector of TraceSpanRecord (raw ingest type, no hierarchy).
250    pub fn build_batch(
251        &self,
252        spans: Vec<TraceSpanRecord>,
253    ) -> Result<RecordBatch, TraceEngineError> {
254        let start_time = std::time::Instant::now();
255        let mut builder = TraceSpanBatchBuilder::new(self.schema.clone());
256
257        for span in spans {
258            builder.append(&span)?;
259        }
260
261        let record_batch = builder
262            .finish()
263            .inspect_err(|e| error!("Failed to build RecordBatch: {}", e))?;
264
265        let duration = start_time.elapsed();
266        debug!(
267            "Built RecordBatch with {} rows in {:?}",
268            record_batch.num_rows(),
269            duration
270        );
271        Ok(record_batch)
272    }
273
274    /// Build the shared `WriterProperties` used for both ingest writes and Z-ORDER compaction.
275    fn build_writer_props() -> WriterProperties {
276        WriterProperties::builder()
277            // Row group size: creates ~4 groups per 128MB file so bloom + page stats
278            // prune within files, not just across files.
279            .set_max_row_group_size(32_768)
280            // Bloom filter on trace_id: skips ~99% of row groups for trace_id equality lookups.
281            .set_column_bloom_filter_enabled(ColumnPath::new(vec!["trace_id".to_string()]), true)
282            .set_column_bloom_filter_fpp(ColumnPath::new(vec!["trace_id".to_string()]), 0.01)
283            .set_column_bloom_filter_ndv(ColumnPath::new(vec!["trace_id".to_string()]), 32_768)
284            // service_name: low cardinality but hot lookup path — bloom skips row groups fast
285            .set_column_bloom_filter_enabled(
286                ColumnPath::new(vec!["service_name".to_string()]),
287                true,
288            )
289            .set_column_bloom_filter_fpp(ColumnPath::new(vec!["service_name".to_string()]), 0.01)
290            .set_column_bloom_filter_ndv(ColumnPath::new(vec!["service_name".to_string()]), 256)
291            // span_name: high cardinality equality queries (e.g. "grpc.unary/method")
292            .set_column_bloom_filter_enabled(ColumnPath::new(vec!["span_name".to_string()]), true)
293            .set_column_bloom_filter_fpp(ColumnPath::new(vec!["span_name".to_string()]), 0.01)
294            .set_column_bloom_filter_ndv(ColumnPath::new(vec!["span_name".to_string()]), 32_768)
295            // Page-level stats on start_time: finest-grained time pruning within row groups.
296            .set_column_statistics_enabled(
297                ColumnPath::new(vec!["start_time".to_string()]),
298                EnabledStatistics::Page,
299            )
300            // status_code: page-level min/max prunes pages for error-only queries.
301            // Do NOT use bloom filter: only 3 possible values (0/1/2), overhead > benefit.
302            .set_column_statistics_enabled(
303                ColumnPath::new(vec!["status_code".to_string()]),
304                EnabledStatistics::Page,
305            )
306            // Delta encoding on near-sorted integer columns: 4-8x compression on timestamps
307            // after Z-ORDER compaction; 2-4x on durations within a service.
308            .set_column_encoding(
309                ColumnPath::new(vec!["start_time".to_string()]),
310                Encoding::DELTA_BINARY_PACKED,
311            )
312            .set_column_encoding(
313                ColumnPath::new(vec!["duration_ms".to_string()]),
314                Encoding::DELTA_BINARY_PACKED,
315            )
316            // ZSTD level 3: ~40% better compression than SNAPPY on text columns;
317            // marginal decompression overhead is offset by reduced I/O.
318            .set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
319            // Dictionary hint on span_name: high repetition similar to service_name.
320            .set_column_dictionary_enabled(ColumnPath::new(vec!["span_name".to_string()]), true)
321            .build()
322    }
323
324    /// Write spans to the Delta table (single-writer invariant via actor channel).
325    async fn write_spans(&self, spans: Vec<TraceSpanRecord>) -> Result<(), TraceEngineError> {
326        info!("Engine received write request for {} spans", spans.len());
327
328        let batch = self
329            .build_batch(spans)
330            .inspect_err(|e| error!("failed to build batch: {:?}", e))?;
331        info!("Built batch with {} rows", batch.num_rows());
332
333        let mut table_guard = self.table.write().await;
334        info!("Acquired table write lock");
335
336        // update_incremental is intentionally omitted here.
337        //
338        // This engine runs as a single-writer actor — no other process commits to this
339        // Delta table, so the in-memory state is always current. Calling update_incremental
340        // on a freshly-created empty table (version 0, no data files) causes the Delta
341        // Kernel to emit "Not a Delta table: No files in log segment", which mutates
342        // table_guard into a corrupted intermediate state before the error propagates.
343        // That corrupted clone then has no partition column metadata, producing unpartitioned
344        // flat Parquet files instead of partition_date=YYYY-MM-DD/ hive directories.
345
346        let current_table = table_guard.clone();
347
348        let updated_table = current_table
349            .write(vec![batch])
350            .with_save_mode(deltalake::protocol::SaveMode::Append)
351            .with_writer_properties(Self::build_writer_props())
352            // Always declare partition columns explicitly — do not rely solely on the
353            // in-memory snapshot, which can be stale after a failed update_incremental.
354            .with_partition_columns(vec!["partition_date".to_string()])
355            .await?;
356
357        info!("Successfully wrote batch to Delta Lake");
358
359        let new_provider = updated_table.table_provider().await?;
360        // Atomic single-step swap — no deregister/register gap where queries see "not found".
361        self.catalog.swap(TRACE_SPAN_TABLE_NAME, new_provider);
362        // Ensure the table's object store is registered with the DataFusion session
363        // so that DeltaScan::scan() can resolve file URLs during query execution.
364        updated_table.update_datafusion_session(&self.ctx.state())?;
365
366        *table_guard = updated_table;
367
368        Ok(())
369    }
370
371    async fn optimize_table(&self) -> Result<(), TraceEngineError> {
372        let mut table_guard = self.table.write().await;
373
374        let current_table = table_guard.clone();
375
376        let (updated_table, _metrics) = current_table
377            .optimize()
378            .with_target_size(std::num::NonZero::new(128 * 1024 * 1024).unwrap())
379            .with_type(OptimizeType::ZOrder(vec![
380                "start_time".to_string(),
381                "service_name".to_string(),
382            ]))
383            // Bloom filters must be re-specified here — compaction rewrites all Parquet files
384            // from scratch using these properties. Without this, every compaction cycle
385            // silently discards all bloom filters on the rewritten files.
386            .with_writer_properties(Self::build_writer_props())
387            .await?;
388
389        self.catalog
390            .swap(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?);
391        updated_table.update_datafusion_session(&self.ctx.state())?;
392
393        *table_guard = updated_table;
394
395        Ok(())
396    }
397
398    async fn vacuum_table(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
399        let mut table_guard = self.table.write().await;
400
401        let (updated_table, _metrics) = table_guard
402            .clone()
403            .vacuum()
404            .with_retention_period(chrono::Duration::hours(retention_hours as i64))
405            .with_enforce_retention_duration(false)
406            .await?;
407
408        self.catalog
409            .swap(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?);
410        updated_table.update_datafusion_session(&self.ctx.state())?;
411
412        *table_guard = updated_table;
413
414        Ok(())
415    }
416
417    /// Delete all rows with `partition_date` older than `cutoff_date`.
418    ///
419    /// This is a logical delete — it writes a new Delta log entry marking the rows as removed.
420    /// Physical disk space is not reclaimed until `vacuum_table()` runs afterwards.
421    async fn expire_table(&self, cutoff_date: chrono::NaiveDate) -> Result<(), TraceEngineError> {
422        let mut table_guard = self.table.write().await;
423
424        // CAST('YYYY-MM-DD' AS DATE) produces a Date32 that matches the partition column type,
425        // which allows Delta Lake to translate this into a partition directory filter.
426        let predicate = format!(
427            "partition_date < CAST('{}' AS DATE)",
428            cutoff_date.format("%Y-%m-%d")
429        );
430
431        let (updated_table, metrics) = table_guard
432            .clone()
433            .delete()
434            .with_predicate(predicate)
435            .await?;
436
437        info!(
438            "Expired {} rows older than {}",
439            metrics.num_deleted_rows, cutoff_date
440        );
441
442        self.catalog
443            .swap(TRACE_SPAN_TABLE_NAME, updated_table.table_provider().await?);
444        updated_table.update_datafusion_session(&self.ctx.state())?;
445
446        *table_guard = updated_table;
447
448        Ok(())
449    }
450
451    /// Try to claim and run the optimize task via the control table.
452    ///
453    /// The control table's OCC ensures only one pod runs this at a time across
454    async fn try_run_optimize(&self, interval_hours: u64) {
455        match self.control.try_claim_task(TASK_OPTIMIZE).await {
456            Ok(true) => match self.optimize_table().await {
457                Ok(()) => {
458                    // Vacuum tombstoned files left behind by compaction.
459                    // retention_hours=0 is safe here because the single-writer invariant
460                    // guarantees no concurrent reader is using an older table version.
461                    if let Err(e) = self.vacuum_table(0).await {
462                        error!("Post-optimize vacuum failed: {}", e);
463                    }
464                    let _ = self
465                        .control
466                        .release_task(
467                            TASK_OPTIMIZE,
468                            chrono::Duration::hours(interval_hours as i64),
469                        )
470                        .await;
471                }
472                Err(e) => {
473                    error!("Optimize failed: {}", e);
474                    let _ = self.control.release_task_on_failure(TASK_OPTIMIZE).await;
475                }
476            },
477            Ok(false) => { /* not due or another pod owns it */ }
478            Err(e) => error!("Optimize claim check failed: {}", e),
479        }
480    }
481
482    /// Try to claim and run the retention task via the control table.
483    async fn try_run_retention(&self, retention_days: u32) {
484        match self.control.try_claim_task(TASK_RETENTION).await {
485            Ok(true) => {
486                let cutoff =
487                    (Utc::now() - chrono::Duration::days(retention_days as i64)).date_naive();
488                match self.expire_table(cutoff).await {
489                    Ok(()) => {
490                        // Reclaim disk space after logical delete
491                        let _ = self.vacuum_table(0).await;
492                        let _ = self
493                            .control
494                            .release_task(TASK_RETENTION, chrono::Duration::hours(24))
495                            .await;
496                    }
497                    Err(e) => {
498                        error!("Retention failed: {}", e);
499                        let _ = self.control.release_task_on_failure(TASK_RETENTION).await;
500                    }
501                }
502            }
503            Ok(false) => {}
504            Err(e) => error!("Retention claim check failed: {}", e),
505        }
506    }
507
508    /// Refresh the in-memory Delta table snapshot from shared object storage.
509    ///
510    /// This is mainly for multiple pods sharing the same storage.
511    /// Safety: clones the table before calling `update_incremental` so that a failure
512    /// (e.g. "Not a Delta table" on an empty table) leaves the original guard intact.
513    async fn refresh_table(&self) -> Result<(), TraceEngineError> {
514        let mut table_guard = self.table.write().await;
515        let current_version = table_guard.version();
516
517        // Clone before update_incremental — on failure the clone is discarded and the
518        // original guard stays intact, avoiding the corrupted-state bug described at line 301.
519        let mut refreshed = table_guard.clone();
520        match refreshed.update_incremental(None).await {
521            Ok(_) => {
522                if refreshed.version() > current_version {
523                    info!(
524                        "Span table refreshed: v{:?} → v{:?}",
525                        current_version,
526                        refreshed.version()
527                    );
528                    let new_provider = refreshed.table_provider().await?;
529                    // Atomic swap — no gap between deregister and register.
530                    self.catalog.swap(TRACE_SPAN_TABLE_NAME, new_provider);
531                    refreshed.update_datafusion_session(&self.ctx.state())?;
532                    *table_guard = refreshed;
533                }
534            }
535            Err(e) => {
536                // Tolerate: empty tables (no log yet), transient network errors.
537                // These are expected on freshly-created tables and do not indicate a bug.
538                debug!("Table refresh skipped: {}", e);
539            }
540        }
541        Ok(())
542    }
543
544    #[instrument(skip_all, name = "trace_engine_actor")]
545    pub fn start_actor(
546        self,
547        compaction_interval_hours: u64,
548        retention_days: Option<u32>,
549        refresh_interval_secs: u64,
550    ) -> (mpsc::Sender<TableCommand>, tokio::task::JoinHandle<()>) {
551        let (tx, mut rx) = mpsc::channel::<TableCommand>(100);
552
553        let handle = tokio::spawn(async move {
554            info!(refresh_interval_secs, "TraceSpanDBEngine actor started");
555
556            // Poll every 5 minutes — the actual schedule is persisted in the
557            // control table's `next_run_at` and survives pod restarts.
558            let mut scheduler_ticker = interval(Duration::from_secs(5 * 60));
559            scheduler_ticker.tick().await; // skip immediate tick
560
561            // Refresh ticker: picks up commits from other pods on shared storage.
562            // Runs on every process/pod independently — unlike compaction, there is no control-table
563            // mutual exclusion here. Every pod must refresh its own in-memory snapshot.
564            // Clamp to 1s minimum — tokio::time::interval panics on Duration::ZERO.
565            let mut refresh_ticker = interval(Duration::from_secs(refresh_interval_secs.max(1)));
566            refresh_ticker.tick().await;
567
568            loop {
569                tokio::select! {
570                    Some(cmd) = rx.recv() => {
571                        match cmd {
572                            TableCommand::Write { spans, respond_to } => {
573                                match self.write_spans(spans).await {
574                                    Ok(_) => { let _ = respond_to.send(Ok(())); }
575                                    Err(e) => {
576                                        tracing::error!("Write failed: {}", e);
577                                        let _ = respond_to.send(Err(e));
578                                    }
579                                }
580                            }
581                            TableCommand::Optimize { respond_to } => {
582                                // Response is sent before vacuum so callers aren't blocked
583                                // on the potentially slow file-deletion pass.
584                                let _ = respond_to.send(self.optimize_table().await);
585                                if let Err(e) = self.vacuum_table(0).await {
586                                    error!("Post-optimize vacuum failed: {}", e);
587                                }
588                            }
589                            TableCommand::Vacuum { retention_hours, respond_to } => {
590                                let _ = respond_to.send(self.vacuum_table(retention_hours).await);
591                            }
592                            TableCommand::Expire { cutoff_date, respond_to } => {
593                                let _ = respond_to.send(self.expire_table(cutoff_date).await);
594                            }
595                            TableCommand::Shutdown => {
596                                tracing::info!("Shutting down table engine");
597                                break;
598                            }
599                        }
600                    }
601                    _ = scheduler_ticker.tick() => {
602                        self.try_run_optimize(compaction_interval_hours).await;
603                        if let Some(days) = retention_days {
604                            self.try_run_retention(days).await;
605                        }
606                    }
607                    _ = refresh_ticker.tick() => {
608                        if let Err(e) = self.refresh_table().await {
609                            error!("Table refresh failed: {}", e);
610                        }
611                    }
612                }
613            }
614        });
615
616        (tx, handle)
617    }
618}
619
/// Efficient builder for converting `TraceSpanRecord` (ingest type) into Arrow `RecordBatch`.
///
/// Hierarchy fields (depth, span_order, path, root_span_id) are NOT included — they are
/// computed at query time from the flat span data stored here.
///
/// Column builders are declared in the same order as the fields in the schema this
/// builder is constructed with; `finish()` is expected to assemble them into a batch.
pub struct TraceSpanBatchBuilder {
    schema: SchemaRef,

    // ID builders (fixed-size binary: 16-byte trace IDs, 8-byte span IDs)
    trace_id: FixedSizeBinaryBuilder,
    span_id: FixedSizeBinaryBuilder,
    parent_span_id: FixedSizeBinaryBuilder,

    // W3C Trace Context
    flags: Int32Builder,
    trace_state: StringBuilder,

    // Instrumentation scope
    scope_name: StringBuilder,
    scope_version: StringBuilder,

    // Metadata builders (dictionary-encoded where repetition is high)
    service_name: StringDictionaryBuilder<Int32Type>,
    span_name: StringBuilder,
    span_kind: StringDictionaryBuilder<Int8Type>,

    // Time builders (microsecond UTC timestamps)
    start_time: TimestampMicrosecondBuilder,
    end_time: TimestampMicrosecondBuilder,
    duration_ms: Int64Builder,

    // Status builders
    status_code: Int32Builder,
    status_message: StringBuilder,

    // Scouter-specific
    label: StringBuilder,

    // Attribute builders (string key → string-view value maps)
    attributes: MapBuilder<StringBuilder, StringViewBuilder>,
    resource_attributes: MapBuilder<StringBuilder, StringViewBuilder>,

    // Nested structure builders (lists of event/link structs)
    events: ListBuilder<StructBuilder>,
    links: ListBuilder<StructBuilder>,

    // Payload builders
    input: StringViewBuilder,
    output: StringViewBuilder,

    // Search optimizer
    search_blob: StringViewBuilder,

    // Partition key (days since Unix epoch)
    partition_date: Date32Builder,
}
675
676impl TraceSpanBatchBuilder {
    /// Construct a fresh set of column builders for a trace-span batch.
    ///
    /// The builder types and configuration here (fixed binary widths, UTC
    /// timezones, map entry field names, nested struct field order) must
    /// mirror the Arrow `schema` passed in — `finish()` will reject the
    /// batch if any column diverges from it.
    pub fn new(schema: SchemaRef) -> Self {
        // IDs are fixed-width binary: 16-byte trace IDs, 8-byte span IDs.
        let trace_id = FixedSizeBinaryBuilder::new(16);
        let span_id = FixedSizeBinaryBuilder::new(8);
        let parent_span_id = FixedSizeBinaryBuilder::new(8);

        let flags = Int32Builder::new();
        let trace_state = StringBuilder::new();

        let scope_name = StringBuilder::new();
        let scope_version = StringBuilder::new();

        // Dictionary-encode the low-cardinality string columns.
        let service_name = StringDictionaryBuilder::<Int32Type>::new();
        let span_name = StringBuilder::new();
        let span_kind = StringDictionaryBuilder::<Int8Type>::new();

        let start_time = TimestampMicrosecondBuilder::new().with_timezone("UTC");
        let end_time = TimestampMicrosecondBuilder::new().with_timezone("UTC");
        let duration_ms = Int64Builder::new();

        let status_code = Int32Builder::new();
        let status_message = StringBuilder::new();

        let label = StringBuilder::new();

        // Shared entry/key/value naming applied to every map column; these
        // names must match what the table schema declares for map entries.
        let map_field_name = MapFieldNames {
            entry: "key_value".to_string(),
            key: "key".to_string(),
            value: "value".to_string(),
        };
        let attributes = MapBuilder::new(
            Some(map_field_name.clone()),
            StringBuilder::new(),
            StringViewBuilder::new(),
        );
        let resource_attributes = MapBuilder::new(
            Some(map_field_name.clone()),
            StringBuilder::new(),
            StringViewBuilder::new(),
        );

        // Span events: list<struct<name, timestamp, attributes, dropped_attributes_count>>.
        // Field order here fixes the indices used by `append_events`.
        let event_fields = vec![
            Field::new("name", DataType::Utf8, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                false,
            ),
            attribute_field(),
            Field::new("dropped_attributes_count", DataType::UInt32, false),
        ];

        // One builder per event field, in the same order as `event_fields`.
        let event_struct_builders = vec![
            Box::new(StringBuilder::new()) as Box<dyn ArrayBuilder>,
            Box::new(TimestampMicrosecondBuilder::new().with_timezone("UTC"))
                as Box<dyn ArrayBuilder>,
            Box::new(MapBuilder::new(
                Some(map_field_name.clone()),
                StringBuilder::new(),
                StringViewBuilder::new(),
            )) as Box<dyn ArrayBuilder>,
            Box::new(UInt32Builder::new()) as Box<dyn ArrayBuilder>,
        ];

        let event_struct_builder = StructBuilder::new(event_fields, event_struct_builders);
        let events = ListBuilder::new(event_struct_builder);

        // Span links: list<struct<trace_id, span_id, trace_state, attributes,
        // dropped_attributes_count>>. Field order fixes the indices used by
        // `append_links`.
        let link_fields = vec![
            Field::new("trace_id", DataType::FixedSizeBinary(16), false),
            Field::new("span_id", DataType::FixedSizeBinary(8), false),
            Field::new("trace_state", DataType::Utf8, false),
            attribute_field(),
            Field::new("dropped_attributes_count", DataType::UInt32, false),
        ];

        // One builder per link field, in the same order as `link_fields`.
        let link_struct_builders = vec![
            Box::new(FixedSizeBinaryBuilder::new(16)) as Box<dyn ArrayBuilder>,
            Box::new(FixedSizeBinaryBuilder::new(8)) as Box<dyn ArrayBuilder>,
            Box::new(StringBuilder::new()) as Box<dyn ArrayBuilder>,
            Box::new(MapBuilder::new(
                Some(map_field_name.clone()),
                StringBuilder::new(),
                StringViewBuilder::new(),
            )) as Box<dyn ArrayBuilder>,
            Box::new(UInt32Builder::new()) as Box<dyn ArrayBuilder>,
        ];

        let link_struct_builder = StructBuilder::new(link_fields, link_struct_builders);
        let links = ListBuilder::new(link_struct_builder);

        // Utf8View columns for the JSON payloads and the search blob.
        let input = StringViewBuilder::new();
        let output = StringViewBuilder::new();
        let search_blob = StringViewBuilder::new();
        let partition_date = Date32Builder::new();

        Self {
            schema,
            trace_id,
            span_id,
            parent_span_id,
            flags,
            trace_state,
            scope_name,
            scope_version,
            service_name,
            span_name,
            span_kind,
            start_time,
            end_time,
            duration_ms,
            status_code,
            status_message,
            label,
            attributes,
            resource_attributes,
            events,
            links,
            input,
            output,
            search_blob,
            partition_date,
        }
    }
799
    /// Append a single `TraceSpanRecord` to the batch.
    ///
    /// On success every column builder has grown by exactly one row. An
    /// error from a nested append can leave the builders partially
    /// advanced, so callers should discard the batch builder on failure
    /// rather than reuse it.
    pub fn append(&mut self, span: &TraceSpanRecord) -> Result<(), TraceEngineError> {
        // IDs — fixed-size binary appends fail if the byte length is wrong.
        let trace_bytes = span.trace_id.as_bytes();
        self.trace_id
            .append_value(trace_bytes)
            .map_err(TraceEngineError::ArrowError)?;

        let span_bytes = span.span_id.as_bytes();
        self.span_id
            .append_value(span_bytes)
            .map_err(TraceEngineError::ArrowError)?;

        // Root spans have no parent — stored as NULL.
        match &span.parent_span_id {
            Some(pid) => {
                self.parent_span_id
                    .append_value(pid.as_bytes())
                    .map_err(TraceEngineError::ArrowError)?;
            }
            None => self.parent_span_id.append_null(),
        }

        // W3C Trace Context
        self.flags.append_value(span.flags);
        self.trace_state.append_value(&span.trace_state);

        // Instrumentation scope
        self.scope_name.append_value(&span.scope_name);
        match &span.scope_version {
            Some(v) => self.scope_version.append_value(v),
            None => self.scope_version.append_null(),
        }

        // Metadata
        self.service_name.append_value(&span.service_name);
        self.span_name.append_value(&span.span_name);
        // An empty span_kind is stored as NULL rather than as an empty
        // dictionary entry.
        if span.span_kind.is_empty() {
            self.span_kind.append_null();
        } else {
            self.span_kind.append_value(&span.span_kind);
        }

        // Timestamps (microseconds since epoch, UTC)
        self.start_time
            .append_value(span.start_time.timestamp_micros());
        self.end_time.append_value(span.end_time.timestamp_micros());
        self.duration_ms.append_value(span.duration_ms);

        // Status — empty messages become NULL instead of "".
        self.status_code.append_value(span.status_code);
        if span.status_message.is_empty() {
            self.status_message.append_null();
        } else {
            self.status_message.append_value(&span.status_message);
        }

        // Scouter-specific
        match &span.label {
            Some(l) => self.label.append_value(l),
            None => self.label.append_null(),
        }

        // Attributes (values JSON-encoded)
        self.append_attributes(&span.attributes).inspect_err(|e| {
            error!(
                "Failed to append attributes for span {}: {}",
                span.span_id, e
            )
        })?;

        // Resource attributes
        self.append_resource_attributes(&span.resource_attributes)
            .inspect_err(|e| {
                error!(
                    "Failed to append resource_attributes for span {}: {}",
                    span.span_id, e
                )
            })?;

        // Events
        self.append_events(&span.events)
            .inspect_err(|e| error!("Failed to append events for span {}: {}", span.span_id, e))?;

        // Links
        self.append_links(&span.links)
            .inspect_err(|e| error!("Failed to append links for span {}: {}", span.span_id, e))?;

        // Payloads — serialized to JSON text; serialization failures degrade
        // to the literal "null" rather than aborting the row.
        self.input.append_value(
            serde_json::to_string(&span.input).unwrap_or_else(|_| "null".to_string()),
        );

        self.output.append_value(
            serde_json::to_string(&span.output).unwrap_or_else(|_| "null".to_string()),
        );

        // Search blob
        let search_text = Self::build_search_blob(span);
        self.search_blob.append_value(search_text);

        // Partition key — days since Unix epoch, derived from span start date
        let days = span.start_time.date_naive().num_days_from_ce() - UNIX_EPOCH_DAYS;
        self.partition_date.append_value(days);

        Ok(())
    }
907
908    fn append_attributes(&mut self, attributes: &[Attribute]) -> Result<(), TraceEngineError> {
909        for attr in attributes {
910            self.attributes.keys().append_value(&attr.key);
911            let value_str =
912                serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
913            self.attributes.values().append_value(value_str);
914        }
915        self.attributes.append(true)?;
916        Ok(())
917    }
918
919    fn append_resource_attributes(
920        &mut self,
921        attributes: &[Attribute],
922    ) -> Result<(), TraceEngineError> {
923        if attributes.is_empty() {
924            self.resource_attributes.append(false)?; // null map
925        } else {
926            for attr in attributes {
927                self.resource_attributes.keys().append_value(&attr.key);
928                let value_str =
929                    serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
930                self.resource_attributes.values().append_value(value_str);
931            }
932            self.resource_attributes.append(true)?;
933        }
934        Ok(())
935    }
936
    /// Append this span's events as one list entry of event structs.
    ///
    /// The `field_builder` indices (0..=3) must match the `event_fields`
    /// order declared in `new`: name, timestamp, attributes,
    /// dropped_attributes_count. A failed downcast means the builder
    /// layout and the schema have drifted apart.
    fn append_events(&mut self, events: &[SpanEvent]) -> Result<(), TraceEngineError> {
        let event_struct = self.events.values();
        for event in events {
            let name_builder = event_struct
                .field_builder::<StringBuilder>(0)
                .ok_or_else(|| TraceEngineError::DowncastError("event name builder"))?;
            name_builder.append_value(&event.name);

            let time_builder = event_struct
                .field_builder::<TimestampMicrosecondBuilder>(1)
                .ok_or_else(|| TraceEngineError::DowncastError("event timestamp builder"))?;
            time_builder.append_value(event.timestamp.timestamp_micros());

            let attr_builder = event_struct
                .field_builder::<MapBuilder<StringBuilder, StringViewBuilder>>(2)
                .ok_or_else(|| TraceEngineError::DowncastError("event attributes builder"))?;

            // Event attribute values are JSON-encoded, same as span attributes.
            for attr in &event.attributes {
                attr_builder.keys().append_value(&attr.key);
                let value_str =
                    serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
                attr_builder.values().append_value(value_str);
            }
            attr_builder.append(true)?;

            let dropped_builder =
                event_struct
                    .field_builder::<UInt32Builder>(3)
                    .ok_or_else(|| {
                        TraceEngineError::DowncastError("dropped attributes count builder")
                    })?;
            dropped_builder.append_value(event.dropped_attributes_count);

            // Close this struct row; every field builder above must have been
            // appended exactly once for the row to stay consistent.
            event_struct.append(true);
        }

        // A span with no events yields an empty (non-null) list entry.
        self.events.append(true);
        Ok(())
    }
976
977    fn append_links(&mut self, links: &[SpanLink]) -> Result<(), TraceEngineError> {
978        let link_struct = self.links.values();
979
980        for link in links {
981            let trace_builder = link_struct
982                .field_builder::<FixedSizeBinaryBuilder>(0)
983                .ok_or_else(|| TraceEngineError::DowncastError("link trace_id builder"))?;
984
985            let trace_bytes = TraceId::hex_to_bytes(&link.trace_id).map_err(|e| {
986                TraceEngineError::InvalidHexId(link.trace_id.clone(), e.to_string())
987            })?;
988            trace_builder.append_value(&trace_bytes)?;
989
990            let span_builder = link_struct
991                .field_builder::<FixedSizeBinaryBuilder>(1)
992                .ok_or_else(|| TraceEngineError::DowncastError("link span_id builder"))?;
993
994            let span_bytes = SpanId::hex_to_bytes(&link.span_id)
995                .map_err(|e| TraceEngineError::InvalidHexId(link.span_id.clone(), e.to_string()))?;
996            span_builder.append_value(&span_bytes)?;
997
998            let state_builder = link_struct
999                .field_builder::<StringBuilder>(2)
1000                .ok_or_else(|| TraceEngineError::DowncastError("link trace_state builder"))?;
1001            state_builder.append_value(&link.trace_state);
1002
1003            let attr_builder = link_struct
1004                .field_builder::<MapBuilder<StringBuilder, StringViewBuilder>>(3)
1005                .ok_or_else(|| TraceEngineError::DowncastError("link attributes builder"))?;
1006
1007            for attr in &link.attributes {
1008                attr_builder.keys().append_value(&attr.key);
1009                let value_str =
1010                    serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
1011                attr_builder.values().append_value(value_str);
1012            }
1013            attr_builder.append(true)?;
1014
1015            let dropped_builder =
1016                link_struct
1017                    .field_builder::<UInt32Builder>(4)
1018                    .ok_or_else(|| {
1019                        TraceEngineError::DowncastError("link dropped attributes count builder")
1020                    })?;
1021            dropped_builder.append_value(link.dropped_attributes_count);
1022
1023            link_struct.append(true);
1024        }
1025
1026        self.links.append(true);
1027        Ok(())
1028    }
1029
1030    /// Build a concatenated search string from `TraceSpanRecord` for full-text queries.
1031    ///
1032    /// Uses pipe-bounded tokens (`|key=value|`) to prevent false-positive substring matches
1033    /// where a value contains something that looks like a different attribute key or value.
1034    /// Queries use `%key=value%` patterns which match both old `key:value` archive data
1035    /// and the new `|key=value|` format.
1036    fn build_search_blob(span: &TraceSpanRecord) -> String {
1037        let mut search = String::with_capacity(512);
1038
1039        // Pipe-bounded bare tokens for full-text (service, span, scope)
1040        search.push('|');
1041        search.push_str(&span.service_name);
1042        search.push_str("| |");
1043        search.push_str(&span.span_name);
1044        search.push_str("| |");
1045        search.push_str(&span.scope_name);
1046        search.push('|');
1047
1048        if !span.status_message.is_empty() {
1049            search.push_str(" |");
1050            search.push_str(&span.status_message);
1051            search.push('|');
1052        }
1053
1054        // Pipe-bounded key=value tokens — standardize on `=` separator
1055        for attr in &span.attributes {
1056            search.push_str(" |");
1057            search.push_str(&attr.key);
1058            search.push('=');
1059            match &attr.value {
1060                Value::String(s) => search.push_str(s),
1061                Value::Number(n) => search.push_str(&n.to_string()),
1062                Value::Bool(b) => search.push_str(&b.to_string()),
1063                Value::Null => {}
1064                other => search.push_str(&other.to_string()),
1065            }
1066            search.push('|');
1067        }
1068
1069        for event in &span.events {
1070            search.push_str(" |");
1071            search.push_str(&event.name);
1072            search.push('|');
1073        }
1074
1075        search
1076    }
1077
1078    /// Finalize and build the RecordBatch. Column order must match `create_schema()`.
1079    pub fn finish(mut self) -> Result<RecordBatch, TraceEngineError> {
1080        let batch = RecordBatch::try_new(
1081            self.schema.clone(),
1082            vec![
1083                Arc::new(self.trace_id.finish()),
1084                Arc::new(self.span_id.finish()),
1085                Arc::new(self.parent_span_id.finish()),
1086                Arc::new(self.flags.finish()),
1087                Arc::new(self.trace_state.finish()),
1088                Arc::new(self.scope_name.finish()),
1089                Arc::new(self.scope_version.finish()),
1090                Arc::new(self.service_name.finish()),
1091                Arc::new(self.span_name.finish()),
1092                Arc::new(self.span_kind.finish()),
1093                Arc::new(self.start_time.finish()),
1094                Arc::new(self.end_time.finish()),
1095                Arc::new(self.duration_ms.finish()),
1096                Arc::new(self.status_code.finish()),
1097                Arc::new(self.status_message.finish()),
1098                Arc::new(self.label.finish()),
1099                Arc::new(self.attributes.finish()),
1100                Arc::new(self.resource_attributes.finish()),
1101                Arc::new(self.events.finish()),
1102                Arc::new(self.links.finish()),
1103                Arc::new(self.input.finish()),
1104                Arc::new(self.output.finish()),
1105                Arc::new(self.search_blob.finish()),
1106                Arc::new(self.partition_date.finish()),
1107            ],
1108        )?;
1109
1110        Ok(batch)
1111    }
1112}