scouter_dataframe/parquet/tracing/queries.rs

use crate::error::TraceEngineError;
use crate::parquet::utils::match_attr_expr;
use arrow::array::RecordBatch;
use arrow::array::{
    BinaryArray, Int32Array, Int64Array, ListArray, MapArray, StringArray,
    TimestampMicrosecondArray,
};
use arrow::compute::cast;
use arrow::datatypes::DataType;
use arrow_array::Array;
use chrono::{DateTime, Datelike, TimeZone, Utc};
use datafusion::common::JoinType;
use datafusion::logical_expr::{cast as df_cast, col, lit, when, SortExpr};
use datafusion::prelude::*;
use datafusion::scalar::ScalarValue;
use mini_moka::sync::Cache;
use scouter_types::sql::{TraceFilters, TraceMetricBucket, TraceSpan};
use scouter_types::{Attribute, SpanEvent, SpanId, SpanLink, TraceId};
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::Duration;
use tracing::{error, info, instrument};

/// Days from year-0001 to Unix epoch (1970-01-01), used to convert chrono → Arrow Date32.
const UNIX_EPOCH_DAYS: i32 = 719_163;

/// Build a typed `Timestamp(Microsecond, UTC)` literal for DataFusion predicates.
///
/// Using this instead of `lit(dt.to_rfc3339())` ensures the predicate type matches
/// the column type exactly, enabling Parquet row-group min/max pruning.
#[inline]
pub(crate) fn ts_lit(dt: &DateTime<Utc>) -> Expr {
    lit(ScalarValue::TimestampMicrosecond(
        Some(dt.timestamp_micros()),
        Some("UTC".into()),
    ))
}

/// Build a typed `Date32` literal for DataFusion partition pruning predicates.
///
/// Partition-level filters are evaluated at directory granularity — DataFusion skips
/// entire `partition_date=YYYY-MM-DD/` directories before reading any file statistics.
#[inline]
pub(crate) fn date_lit(dt: &DateTime<Utc>) -> Expr {
    let days = dt.date_naive().num_days_from_ce() - UNIX_EPOCH_DAYS;
    lit(ScalarValue::Date32(Some(days)))
}
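
// Hedged usage sketch (mirrors `query_spans` below): because each literal's type
// matches its column's type exactly, DataFusion can push both predicates down.
// `date_lit` prunes whole partition directories; `ts_lit` prunes Parquet row groups:
//
//   df.filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&start)))?
//     .filter(col(START_TIME_COL).gt_eq(ts_lit(&start)))?;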

// Column name constants
pub const START_TIME_COL: &str = "start_time";
pub const PARTITION_DATE_COL: &str = "partition_date";
pub const END_TIME_COL: &str = "end_time";
pub const SERVICE_NAME_COL: &str = "service_name";
pub const TRACE_ID_COL: &str = "trace_id";
pub const SPAN_ID_COL: &str = "span_id";
pub const PARENT_SPAN_ID_COL: &str = "parent_span_id";
pub const SPAN_NAME_COL: &str = "span_name";
pub const SPAN_KIND_COL: &str = "span_kind";
pub const DURATION_MS_COL: &str = "duration_ms";
pub const STATUS_CODE_COL: &str = "status_code";
pub const STATUS_MESSAGE_COL: &str = "status_message";
pub const ATTRIBUTES_COL: &str = "attributes";
pub const EVENTS_COL: &str = "events";
pub const LINKS_COL: &str = "links";
pub const INPUT_COL: &str = "input";
pub const OUTPUT_COL: &str = "output";
pub const SEARCH_BLOB_COL: &str = "search_blob";
pub const ENTITY_ID_COL: &str = "entity_id";
pub const SPAN_TABLE_NAME: &str = "trace_spans";

const SUMMARY_TABLE_NAME: &str = "trace_summaries";
const ERROR_COUNT_COL: &str = "error_count";
const ENTITY_IDS_COL: &str = "entity_ids";
const QUEUE_IDS_COL: &str = "queue_ids";

/// Columns needed to reconstruct a `TraceSpan` (all fields except search_blob).
const SPAN_COLUMNS: &[&str] = &[
    TRACE_ID_COL,
    SPAN_ID_COL,
    PARENT_SPAN_ID_COL,
    SERVICE_NAME_COL,
    SPAN_NAME_COL,
    SPAN_KIND_COL,
    START_TIME_COL,
    END_TIME_COL,
    DURATION_MS_COL,
    STATUS_CODE_COL,
    STATUS_MESSAGE_COL,
    ATTRIBUTES_COL,
    EVENTS_COL,
    LINKS_COL,
    INPUT_COL,
    OUTPUT_COL,
];

/// Flat span extracted from Arrow — no hierarchy fields.
/// `build_span_tree()` assigns depth, span_order, path, root_span_id.
struct FlatSpan {
    trace_id: [u8; 16],
    span_id: [u8; 8],
    parent_span_id: Option<[u8; 8]>,
    service_name: String,
    span_name: String,
    span_kind: Option<String>,
    start_time: DateTime<Utc>,
    end_time: DateTime<Utc>,
    duration_ms: i64,
    status_code: i32,
    status_message: Option<String>,
    attributes: Vec<Attribute>,
    events: Vec<SpanEvent>,
    links: Vec<SpanLink>,
    input: Option<serde_json::Value>,
    output: Option<serde_json::Value>,
}

struct TraceQueryBuilder {
    df: DataFrame,
}

impl TraceQueryBuilder {
    async fn set_table(
        ctx: Arc<SessionContext>,
        table_name: &str,
    ) -> Result<Self, TraceEngineError> {
        let df = ctx
            .table(table_name)
            .await
            .inspect_err(|e| error!("Failed to load table {}: {}", table_name, e))?;
        Ok(Self { df })
    }

    fn select_columns(mut self, columns: &[&str]) -> Result<Self, TraceEngineError> {
        self.df = self.df.select_columns(columns)?;
        Ok(self)
    }

    fn add_filter(mut self, expr: Expr) -> Result<Self, TraceEngineError> {
        self.df = self.df.filter(expr)?;
        Ok(self)
    }

    fn add_sort(mut self, sort: Vec<SortExpr>) -> Result<Self, TraceEngineError> {
        self.df = self.df.sort(sort)?;
        Ok(self)
    }

    fn with_limit(mut self, n: Option<usize>) -> Result<Self, TraceEngineError> {
        self.df = self.df.limit(0, n)?;
        Ok(self)
    }

    async fn execute(self) -> Result<Vec<RecordBatch>, TraceEngineError> {
        let batches = self
            .df
            .collect()
            .await
            .inspect_err(|e| error!("Failed to collect query results: {}", e))?;
        Ok(batches)
    }
}
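
// Hedged usage sketch of the builder (mirrors `query_spans` below): filters are
// chained time-first for pruning, then projection, sort, and limit are applied
// before collecting batches.
//
//   let batches = TraceQueryBuilder::set_table(ctx.clone(), SPAN_TABLE_NAME)
//       .await?
//       .add_filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&start)))?
//       .add_filter(col(START_TIME_COL).gt_eq(ts_lit(&start)))?
//       .select_columns(SPAN_COLUMNS)?
//       .add_sort(vec![col(START_TIME_COL).sort(true, true)])?
//       .with_limit(Some(100))?
//       .execute()
//       .await?;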

/// Extract attributes from a MapArray at a given row index.
fn extract_attributes(map_array: &MapArray, row_idx: usize) -> Vec<Attribute> {
    if map_array.is_null(row_idx) {
        return Vec::new();
    }
    let entry = map_array.value(row_idx);
    let struct_array = entry
        .as_any()
        .downcast_ref::<arrow::array::StructArray>()
        .unwrap();
    let keys_arr = cast(struct_array.column(0).as_ref(), &DataType::Utf8).unwrap();
    let keys = keys_arr.as_any().downcast_ref::<StringArray>().unwrap();
    let values_arr = cast(struct_array.column(1).as_ref(), &DataType::Utf8).unwrap();
    let values = values_arr.as_any().downcast_ref::<StringArray>().unwrap();

    (0..struct_array.len())
        .map(|i| Attribute {
            key: keys.value(i).to_string(),
            value: serde_json::from_str(values.value(i)).unwrap_or(serde_json::Value::Null),
        })
        .collect()
}

/// Extract SpanEvents from a ListArray at a given row index.
fn extract_events(list_array: &ListArray, row_idx: usize) -> Vec<SpanEvent> {
    if list_array.is_null(row_idx) {
        return Vec::new();
    }
    let values = list_array.value(row_idx);
    let struct_array = values
        .as_any()
        .downcast_ref::<arrow::array::StructArray>()
        .unwrap();

    let names_arr = cast(
        struct_array
            .column_by_name("name")
            .expect("event name col")
            .as_ref(),
        &DataType::Utf8,
    )
    .expect("event name cast");
    let names = names_arr
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("event name StringArray");
    let timestamps = struct_array
        .column_by_name("timestamp")
        .and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>())
        .expect("event timestamp should be TimestampMicrosecondArray");
    let attrs = struct_array
        .column_by_name("attributes")
        .and_then(|c| c.as_any().downcast_ref::<MapArray>())
        .expect("event attributes should be MapArray");
    // Delta Lake maps UInt32 → Integer (Int32); DataFusion returns Int32Array on read.
    let dropped = struct_array
        .column_by_name("dropped_attributes_count")
        .and_then(|c| c.as_any().downcast_ref::<arrow::array::Int32Array>())
        .expect("dropped_attributes_count should be Int32Array");

    (0..struct_array.len())
        .map(|i| {
            let micros = timestamps.value(i);
            let secs = micros / 1_000_000;
            let nanos = ((micros % 1_000_000) * 1_000) as u32;
            let ts = Utc.timestamp_opt(secs, nanos).unwrap();
            SpanEvent {
                name: names.value(i).to_string(),
                timestamp: ts,
                attributes: extract_attributes(attrs, i),
                dropped_attributes_count: dropped.value(i) as u32,
            }
        })
        .collect()
}
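
// Hedged sketch of the µs → (secs, nanos) split used above and again in
// `batches_to_flat_spans`. It assumes post-epoch (non-negative) timestamps, which
// holds for trace data; the timestamp value below is illustrative, not from the source.
#[cfg(test)]
mod micros_split_tests {
    use chrono::{TimeZone, Utc};

    #[test]
    fn micros_split_round_trips() {
        let micros: i64 = 1_700_000_000_123_456; // 2023-11-14T22:13:20.123456Z
        let secs = micros / 1_000_000;
        let nanos = ((micros % 1_000_000) * 1_000) as u32;
        let ts = Utc.timestamp_opt(secs, nanos).unwrap();
        assert_eq!(ts.timestamp_micros(), micros);
    }
}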

/// Extract SpanLinks from a ListArray at a given row index.
fn extract_links(list_array: &ListArray, row_idx: usize) -> Vec<SpanLink> {
    if list_array.is_null(row_idx) {
        return Vec::new();
    }
    let values = list_array.value(row_idx);
    let struct_array = values
        .as_any()
        .downcast_ref::<arrow::array::StructArray>()
        .unwrap();

    // Cast FixedSizeBinary → Binary and any string variant → Utf8 for type-stable access.
    let trace_id_arr = cast(
        struct_array
            .column_by_name("trace_id")
            .expect("link trace_id col")
            .as_ref(),
        &DataType::Binary,
    )
    .expect("link trace_id cast");
    let trace_ids = trace_id_arr
        .as_any()
        .downcast_ref::<BinaryArray>()
        .expect("link trace_id BinaryArray");
    let span_id_arr = cast(
        struct_array
            .column_by_name("span_id")
            .expect("link span_id col")
            .as_ref(),
        &DataType::Binary,
    )
    .expect("link span_id cast");
    let span_ids = span_id_arr
        .as_any()
        .downcast_ref::<BinaryArray>()
        .expect("link span_id BinaryArray");
    let trace_state_arr = cast(
        struct_array
            .column_by_name("trace_state")
            .expect("link trace_state col")
            .as_ref(),
        &DataType::Utf8,
    )
    .expect("link trace_state cast");
    let trace_states = trace_state_arr
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("link trace_state StringArray");
    let attrs = struct_array
        .column_by_name("attributes")
        .and_then(|c| c.as_any().downcast_ref::<MapArray>())
        .expect("link attributes should be MapArray");
    // Delta Lake maps UInt32 → Integer (Int32); DataFusion returns Int32Array on read.
    let dropped = struct_array
        .column_by_name("dropped_attributes_count")
        .and_then(|c| c.as_any().downcast_ref::<arrow::array::Int32Array>())
        .expect("link dropped_attributes_count should be Int32Array");

    (0..struct_array.len())
        .map(|i| {
            let tid_bytes: [u8; 16] = trace_ids.value(i).try_into().expect("16 bytes");
            let sid_bytes: [u8; 8] = span_ids.value(i).try_into().expect("8 bytes");
            SpanLink {
                trace_id: TraceId::from_bytes(tid_bytes).to_hex(),
                span_id: SpanId::from_bytes(sid_bytes).to_hex(),
                trace_state: trace_states.value(i).to_string(),
                attributes: extract_attributes(attrs, i),
                dropped_attributes_count: dropped.value(i) as u32,
            }
        })
        .collect()
}

/// Convert Arrow RecordBatches to `FlatSpan` intermediate structs.
fn batches_to_flat_spans(batches: Vec<RecordBatch>) -> Result<Vec<FlatSpan>, TraceEngineError> {
    let mut result = Vec::new();

    for batch in &batches {
        let schema = batch.schema();

        macro_rules! col_idx {
            ($name:expr) => {
                schema.index_of($name).map_err(|_| {
                    TraceEngineError::BatchConversion(format!("Missing column: {}", $name))
                })?
            };
        }

        // Cast FixedSizeBinary → Binary: table_provider() may return either type depending
        // on whether DataFusion resolves the Delta schema or the Arrow Parquet file metadata.
        // cast() is zero-copy for fixed-size → variable-length binary reinterpretation.
        let trace_id_arr = cast(
            batch.column(col_idx!("trace_id")).as_ref(),
            &DataType::Binary,
        )
        .map_err(|e| TraceEngineError::BatchConversion(format!("trace_id cast: {e}")))?;
        let trace_id_col = trace_id_arr
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| TraceEngineError::BatchConversion("trace_id not BinaryArray".into()))?;

        let span_id_arr = cast(
            batch.column(col_idx!("span_id")).as_ref(),
            &DataType::Binary,
        )
        .map_err(|e| TraceEngineError::BatchConversion(format!("span_id cast: {e}")))?;
        let span_id_col = span_id_arr
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| TraceEngineError::BatchConversion("span_id not BinaryArray".into()))?;

        let parent_id_arr = cast(
            batch.column(col_idx!("parent_span_id")).as_ref(),
            &DataType::Binary,
        )
        .map_err(|e| TraceEngineError::BatchConversion(format!("parent_span_id cast: {e}")))?;
        let parent_id_col = parent_id_arr
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                TraceEngineError::BatchConversion("parent_span_id not BinaryArray".into())
            })?;
        // Dictionary(Int32/Int8, Utf8) comes back as DictionaryArray from Parquet schema path;
        // cast to Utf8 normalizes to StringArray regardless of schema path.
        let svc_arr = cast(
            batch.column(col_idx!("service_name")).as_ref(),
            &DataType::Utf8,
        )
        .map_err(|e| TraceEngineError::BatchConversion(format!("service_name cast: {e}")))?;
        let svc_col = svc_arr
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| {
                TraceEngineError::BatchConversion("service_name not StringArray".into())
            })?;
        let span_name_arr = cast(
            batch.column(col_idx!("span_name")).as_ref(),
            &DataType::Utf8,
        )
        .map_err(|e| TraceEngineError::BatchConversion(format!("span_name cast: {e}")))?;
        let span_name_col = span_name_arr
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| TraceEngineError::BatchConversion("span_name not StringArray".into()))?;
        let span_kind_arr = cast(
            batch.column(col_idx!("span_kind")).as_ref(),
            &DataType::Utf8,
        )
        .map_err(|e| TraceEngineError::BatchConversion(format!("span_kind cast: {e}")))?;
        let span_kind_col = span_kind_arr
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| TraceEngineError::BatchConversion("span_kind not StringArray".into()))?;
        let start_col = batch
            .column(col_idx!("start_time"))
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .ok_or_else(|| TraceEngineError::BatchConversion("start_time not Timestamp".into()))?;
        let end_col = batch
            .column(col_idx!("end_time"))
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .ok_or_else(|| TraceEngineError::BatchConversion("end_time not Timestamp".into()))?;
        let dur_col = batch
            .column(col_idx!("duration_ms"))
            .as_any()
            .downcast_ref::<Int64Array>()
            .ok_or_else(|| TraceEngineError::BatchConversion("duration_ms not Int64".into()))?;
        let sc_col = batch
            .column(col_idx!("status_code"))
            .as_any()
            .downcast_ref::<Int32Array>()
            .ok_or_else(|| TraceEngineError::BatchConversion("status_code not Int32".into()))?;
        let sm_arr = cast(
            batch.column(col_idx!("status_message")).as_ref(),
            &DataType::Utf8,
        )
        .map_err(|e| TraceEngineError::BatchConversion(format!("status_message cast: {e}")))?;
        let sm_col = sm_arr
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| {
                TraceEngineError::BatchConversion("status_message not StringArray".into())
            })?;
        let attrs_col = batch
            .column(col_idx!("attributes"))
            .as_any()
            .downcast_ref::<MapArray>()
            .ok_or_else(|| TraceEngineError::BatchConversion("attributes not MapArray".into()))?;
        let events_col = batch
            .column(col_idx!("events"))
            .as_any()
            .downcast_ref::<ListArray>()
            .ok_or_else(|| TraceEngineError::BatchConversion("events not ListArray".into()))?;
        let links_col = batch
            .column(col_idx!("links"))
            .as_any()
            .downcast_ref::<ListArray>()
            .ok_or_else(|| TraceEngineError::BatchConversion("links not ListArray".into()))?;
        // Utf8View stored in Parquet may come back as StringViewArray; cast to Utf8 normalizes.
        let input_arr = cast(batch.column(col_idx!("input")).as_ref(), &DataType::Utf8)
            .map_err(|e| TraceEngineError::BatchConversion(format!("input cast: {e}")))?;
        let input_col = input_arr
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| TraceEngineError::BatchConversion("input not StringArray".into()))?;
        let output_arr = cast(batch.column(col_idx!("output")).as_ref(), &DataType::Utf8)
            .map_err(|e| TraceEngineError::BatchConversion(format!("output cast: {e}")))?;
        let output_col = output_arr
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| TraceEngineError::BatchConversion("output not StringArray".into()))?;

        for i in 0..batch.num_rows() {
            let tid_bytes: [u8; 16] = trace_id_col
                .value(i)
                .try_into()
                .map_err(|_| TraceEngineError::BatchConversion("trace_id bad length".into()))?;
            let sid_bytes: [u8; 8] = span_id_col
                .value(i)
                .try_into()
                .map_err(|_| TraceEngineError::BatchConversion("span_id bad length".into()))?;

            let parent_id = if parent_id_col.is_null(i) {
                None
            } else {
                let bytes: [u8; 8] = parent_id_col.value(i).try_into().map_err(|_| {
                    TraceEngineError::BatchConversion("parent_span_id bad length".into())
                })?;
                Some(bytes)
            };

            let micros_start = start_col.value(i);
            let start_time = Utc
                .timestamp_opt(
                    micros_start / 1_000_000,
                    ((micros_start % 1_000_000) * 1_000) as u32,
                )
                .unwrap();
            let micros_end = end_col.value(i);
            let end_time = Utc
                .timestamp_opt(
                    micros_end / 1_000_000,
                    ((micros_end % 1_000_000) * 1_000) as u32,
                )
                .unwrap();

            let input = if input_col.is_null(i) {
                None
            } else {
                serde_json::from_str(input_col.value(i)).ok()
            };
            let output = if output_col.is_null(i) {
                None
            } else {
                serde_json::from_str(output_col.value(i)).ok()
            };

            result.push(FlatSpan {
                trace_id: tid_bytes,
                span_id: sid_bytes,
                parent_span_id: parent_id,
                service_name: svc_col.value(i).to_string(),
                span_name: span_name_col.value(i).to_string(),
                span_kind: if span_kind_col.is_null(i) {
                    None
                } else {
                    Some(span_kind_col.value(i).to_string())
                },
                start_time,
                end_time,
                duration_ms: dur_col.value(i),
                status_code: sc_col.value(i),
                status_message: if sm_col.is_null(i) {
                    None
                } else {
                    Some(sm_col.value(i).to_string())
                },
                attributes: extract_attributes(attrs_col, i),
                events: extract_events(events_col, i),
                links: extract_links(links_col, i),
                input,
                output,
            });
        }
    }

    Ok(result)
}

/// Build a `Vec<TraceSpan>` from flat spans by computing hierarchy via DFS traversal.
///
/// Assigns `depth`, `span_order`, `path`, and `root_span_id` — the same fields that
/// Postgres computed via a recursive CTE. Spans are returned in DFS order (span_order ascending).
fn build_span_tree(spans: Vec<FlatSpan>) -> Vec<TraceSpan> {
    if spans.is_empty() {
        return Vec::new();
    }

    // Find root span (no parent)
    let root_span_id_hex = spans
        .iter()
        .find(|s| s.parent_span_id.is_none())
        .map(|s| SpanId::from_bytes(s.span_id).to_hex())
        .unwrap_or_else(|| {
            // All spans have parents — fall back to the first span's own ID as a synthetic root
            SpanId::from_bytes(spans[0].span_id).to_hex()
        });

    // Build children map: parent_span_id → Vec<index>
    let mut children: HashMap<[u8; 8], Vec<usize>> = HashMap::new();
    let mut root_indices: Vec<usize> = Vec::new();

    for (i, span) in spans.iter().enumerate() {
        if let Some(pid) = span.parent_span_id {
            children.entry(pid).or_default().push(i);
        } else {
            root_indices.push(i);
        }
    }

    // Sort root indices by start_time for deterministic ordering
    root_indices.sort_by_key(|&i| spans[i].start_time);

    let mut result: Vec<TraceSpan> = Vec::with_capacity(spans.len());
    let mut span_order: i32 = 0;

    dfs_assign(
        &root_indices,
        &spans,
        &children,
        0,
        Vec::new(),
        &root_span_id_hex,
        &mut result,
        &mut span_order,
    );

    // Attach orphan spans (parent not found in this batch)
    let visited: std::collections::HashSet<[u8; 8]> = result
        .iter()
        .filter_map(|s| {
            let v = SpanId::hex_to_bytes(&s.span_id).ok()?;
            let arr: [u8; 8] = v.try_into().ok()?;
            Some(arr)
        })
        .collect();

    for span in spans.iter() {
        if !visited.contains(&span.span_id) {
            let span_id_hex = SpanId::from_bytes(span.span_id).to_hex();
            result.push(flat_to_trace_span(
                span,
                &span_id_hex,
                &root_span_id_hex,
                0,
                vec![span_id_hex.clone()],
                span_order,
            ));
            span_order += 1;
        }
    }

    result
}
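
// Hedged worked example of the ordering contract (span names are illustrative):
// given spans A (root), B (child of A), and C (child of A, starting after B),
// `build_span_tree` yields
//   A: depth 0, span_order 0, path [A]
//   B: depth 1, span_order 1, path [A, B]
//   C: depth 1, span_order 2, path [A, C]
// Orphans (parent absent from the batch) are appended after the tree at depth 0.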

#[allow(clippy::too_many_arguments)]
fn dfs_assign(
    indices: &[usize],
    spans: &[FlatSpan],
    children: &HashMap<[u8; 8], Vec<usize>>,
    depth: i32,
    path_so_far: Vec<String>,
    root_span_id_hex: &str,
    result: &mut Vec<TraceSpan>,
    span_order: &mut i32,
) {
    for &idx in indices {
        let span = &spans[idx];
        let span_id_hex = SpanId::from_bytes(span.span_id).to_hex();

        let mut path = path_so_far.clone();
        path.push(span_id_hex.clone());

        result.push(flat_to_trace_span(
            span,
            &span_id_hex,
            root_span_id_hex,
            depth,
            path.clone(),
            *span_order,
        ));
        *span_order += 1;

        // Recurse into children, sorted by start_time
        if let Some(child_indices) = children.get(&span.span_id) {
            let mut sorted = child_indices.clone();
            sorted.sort_by_key(|&i| spans[i].start_time);
            dfs_assign(
                &sorted,
                spans,
                children,
                depth + 1,
                path,
                root_span_id_hex,
                result,
                span_order,
            );
        }
    }
}
fn flat_to_trace_span(
    span: &FlatSpan,
    span_id_hex: &str,
    root_span_id_hex: &str,
    depth: i32,
    path: Vec<String>,
    span_order: i32,
) -> TraceSpan {
    TraceSpan {
        trace_id: TraceId::from_bytes(span.trace_id).to_hex(),
        span_id: span_id_hex.to_string(),
        parent_span_id: span.parent_span_id.map(|b| SpanId::from_bytes(b).to_hex()),
        span_name: span.span_name.clone(),
        span_kind: span.span_kind.clone(),
        start_time: span.start_time,
        end_time: span.end_time,
        duration_ms: span.duration_ms,
        status_code: span.status_code,
        status_message: span.status_message.clone(),
        attributes: span.attributes.clone(),
        events: span.events.clone(),
        links: span.links.clone(),
        depth,
        path,
        root_span_id: root_span_id_hex.to_string(),
        service_name: span.service_name.clone(),
        span_order,
        input: span.input.clone(),
        output: span.output.clone(),
    }
}

/// Normalize an attribute filter string for search_blob LIKE matching.
///
/// Converts a `key:value` separator to `key=value` (the standardized format) so filters
/// match the pipe-bounded `|key=value|` blob written by `build_search_blob()`.
///
/// Note: URL-like patterns (`http://`) are left unchanged to avoid breaking URL values.
pub(crate) fn normalize_attr_filter(filter: &str) -> String {
    let normalized = match filter.find(':') {
        Some(pos) if !filter[pos..].starts_with("://") => {
            format!("{}={}", &filter[..pos], &filter[pos + 1..])
        }
        _ => filter.to_string(),
    };
    format!("%{}%", normalized)
}
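
// Minimal sketch of the normalization contract (hedged: it restates the doc comment
// above, not an exhaustive spec). A `key:value` filter becomes `%key=value%`, while a
// string whose first `:` opens a `://` scheme passes through with only the `%` wrapping.
#[cfg(test)]
mod normalize_attr_filter_tests {
    use super::normalize_attr_filter;

    #[test]
    fn colon_separator_is_rewritten_to_equals() {
        assert_eq!(normalize_attr_filter("user.id:42"), "%user.id=42%");
    }

    #[test]
    fn url_schemes_pass_through() {
        assert_eq!(
            normalize_attr_filter("http://example.com"),
            "%http://example.com%"
        );
    }
}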

/// High-performance query patterns for Delta Lake trace storage.
///
/// Time predicates are always applied FIRST to enable Delta Lake partition pruning.
/// `span_cache` provides sub-millisecond repeat reads for trace detail clicks.
/// `metrics_cache` provides sub-millisecond repeat reads for dashboard metric charts.
pub struct TraceQueries {
    ctx: Arc<SessionContext>,
    /// LRU cache keyed by 16-byte trace ID. TTL=5 min — archived span data is immutable.
    span_cache: Cache<[u8; 16], Arc<Vec<TraceSpan>>>,
    /// LRU cache keyed by hash of (service, start, end, interval, filters, entity).
    /// TTL=60s — short enough to reflect new archive writes, long enough to absorb UI refreshes.
    metrics_cache: Cache<u64, Arc<Vec<TraceMetricBucket>>>,
}

/// Compute a stable u64 cache key from all `get_trace_metrics` parameters.
fn metrics_cache_key(
    service_name: Option<&str>,
    start_time: &DateTime<Utc>,
    end_time: &DateTime<Utc>,
    bucket_interval: &str,
    attribute_filters: Option<&[String]>,
    entity_uid: Option<&str>,
) -> u64 {
    let mut h = std::collections::hash_map::DefaultHasher::new();
    service_name.hash(&mut h);
    start_time.timestamp_micros().hash(&mut h);
    end_time.timestamp_micros().hash(&mut h);
    bucket_interval.hash(&mut h);
    attribute_filters.hash(&mut h);
    entity_uid.hash(&mut h);
    h.finish()
}
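
// Hedged sketch of the key contract: `DefaultHasher` is deterministic within a
// process, so identical parameters always map to the same cache entry. The
// inequality check is an expectation (a hash collision is theoretically possible),
// not a strict guarantee.
#[cfg(test)]
mod metrics_cache_key_tests {
    use super::metrics_cache_key;
    use chrono::{TimeZone, Utc};

    #[test]
    fn identical_params_share_a_key() {
        let start = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();
        let end = Utc.with_ymd_and_hms(2024, 1, 2, 0, 0, 0).unwrap();
        let a = metrics_cache_key(Some("svc"), &start, &end, "hour", None, None);
        let b = metrics_cache_key(Some("svc"), &start, &end, "hour", None, None);
        assert_eq!(a, b);
        let c = metrics_cache_key(Some("other"), &start, &end, "hour", None, None);
        assert_ne!(a, c);
    }
}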

impl TraceQueries {
    pub fn new(ctx: Arc<SessionContext>) -> Self {
        let span_cache = Cache::builder()
            .max_capacity(1_000)
            .time_to_live(Duration::from_secs(300))
            .build();
        let metrics_cache = Cache::builder()
            .max_capacity(500)
            .time_to_live(Duration::from_secs(60))
            .build();
        Self {
            ctx,
            span_cache,
            metrics_cache,
        }
    }

    /// Get all spans for a trace, reconstructed as a tree with hierarchy fields populated.
    ///
    /// # Arguments
    /// * `trace_id_bytes` - Raw 16-byte trace ID
    /// * `service_name` - Optional service filter
    /// * `start_time` - Optional lower time bound (applied FIRST for partition pruning)
    /// * `end_time` - Optional upper time bound
    /// * `limit` - Optional row limit
    ///
    /// When `trace_id_bytes` is 16 bytes, results are cached for 5 minutes — repeat detail
    /// clicks (common in the UI) return in <1µs without hitting Delta Lake.
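    ///
    /// Hedged call sketch (`queries`, `tid`, `start`, and `end` are illustrative
    /// bindings, not part of this module):
    ///
    /// ```ignore
    /// let spans = queries
    ///     .get_trace_spans(Some(&tid), None, Some(&start), Some(&end), None)
    ///     .await?;
    /// ```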
    #[instrument(skip_all)]
    pub async fn get_trace_spans(
        &self,
        trace_id_bytes: Option<&[u8]>,
        service_name: Option<&str>,
        start_time: Option<&DateTime<Utc>>,
        end_time: Option<&DateTime<Utc>>,
        limit: Option<usize>,
    ) -> Result<Vec<TraceSpan>, TraceEngineError> {
        // Cache lookup for by-id trace detail queries (the hot interactive path).
        if let Some(tid) = trace_id_bytes {
            if let Ok(key) = <[u8; 16]>::try_from(tid) {
                if let Some(cached) = self.span_cache.get(&key) {
                    return Ok((*cached).clone());
                }

                let result = self
                    .query_spans(Some(tid), service_name, start_time, end_time, limit)
                    .await?;
                self.span_cache.insert(key, Arc::new(result.clone()));
                return Ok(result);
            }
        }

        // No trace_id or non-16-byte ID — uncached scan path (time/service/attribute queries).
        self.query_spans(trace_id_bytes, service_name, start_time, end_time, limit)
            .await
    }

    /// Execute the actual DataFusion query without cache logic.
    pub async fn query_spans(
        &self,
        trace_id_bytes: Option<&[u8]>,
        service_name: Option<&str>,
        start_time: Option<&DateTime<Utc>>,
        end_time: Option<&DateTime<Utc>>,
        limit: Option<usize>,
    ) -> Result<Vec<TraceSpan>, TraceEngineError> {
        let mut builder = TraceQueryBuilder::set_table(self.ctx.clone(), SPAN_TABLE_NAME).await?;

        // Partition filters FIRST — eliminates whole partition_date=YYYY-MM-DD/ directories
        // at directory level before any file metadata or Parquet statistics are read.
        if let Some(start) = start_time {
            builder = builder.add_filter(col(PARTITION_DATE_COL).gt_eq(date_lit(start)))?;
        }
        if let Some(end) = end_time {
            builder = builder.add_filter(col(PARTITION_DATE_COL).lt_eq(date_lit(end)))?;
        }

        // Row-group level pruning — typed Timestamp literals enable Parquet min/max pruning
        // within the partition directories that survived the directory-level filter above.
        if let Some(start) = start_time {
            builder = builder.add_filter(col(START_TIME_COL).gt_eq(ts_lit(start)))?;
        }
        if let Some(end) = end_time {
            builder = builder.add_filter(col(START_TIME_COL).lt(ts_lit(end)))?;
        }

        if let Some(tid) = trace_id_bytes {
            builder = builder.add_filter(col(TRACE_ID_COL).eq(lit(tid)))?;
        }

        if let Some(svc) = service_name {
            builder = builder.add_filter(col(SERVICE_NAME_COL).eq(lit(svc)))?;
        }

        builder = builder.select_columns(SPAN_COLUMNS)?;

        // Sort by start_time for stable DFS input; tree builder assigns span_order
        builder = builder.add_sort(vec![col(START_TIME_COL).sort(true, true)])?;
        builder = builder.with_limit(limit)?;

        let batches = builder.execute().await?;

        info!(
            "Queried {} raw spans across {} batches from Delta Lake",
            batches.iter().map(|b| b.num_rows()).sum::<usize>(),
            batches.len()
        );

        let flat_spans = batches_to_flat_spans(batches)?;
        Ok(build_span_tree(flat_spans))
    }

    /// Get trace metrics over a time range, bucketed by the given interval string.
    ///
    /// `bucket_interval` must be a valid DataFusion `DATE_TRUNC` precision unit:
    /// `"second"`, `"minute"`, `"hour"`, `"day"`, `"week"`, `"month"`, `"year"`.
    ///
    /// Matches Postgres logic: trace duration = `MAX(end_time) - MIN(start_time)` across all
    /// spans of a trace (not per-span `duration_ms`). Root service is the service of the span
    /// where `parent_span_id IS NULL`. Service filter applies to root spans only.
    ///
    /// `attribute_filters` is a list of `"key:value"` strings OR-matched against `search_blob`.
    /// `entity_uid` optionally restricts results to traces whose summary-table row lists the
    /// given entity ID (resolved via an inner join against `trace_summaries`).
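    ///
    /// Hedged call sketch (`queries`, `start`, and `end` are illustrative bindings):
    ///
    /// ```ignore
    /// let buckets = queries
    ///     .get_trace_metrics(Some("checkout"), start, end, "hour", None, None)
    ///     .await?;
    /// ```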
    #[instrument(skip_all)]
    pub async fn get_trace_metrics(
        &self,
        service_name: Option<&str>,
        start_time: DateTime<Utc>,
        end_time: DateTime<Utc>,
        bucket_interval: &str,
        attribute_filters: Option<&[String]>,
        entity_uid: Option<&str>,
    ) -> Result<Vec<TraceMetricBucket>, TraceEngineError> {
        // Cache hit: return immediately without touching Delta Lake.
        let cache_key = metrics_cache_key(
            service_name,
            &start_time,
            &end_time,
            bucket_interval,
            attribute_filters,
            entity_uid,
        );
        if let Some(cached) = self.metrics_cache.get(&cache_key) {
            return Ok((*cached).clone());
        }

        const VALID_INTERVALS: &[&str] =
            &["second", "minute", "hour", "day", "week", "month", "year"];
        if !VALID_INTERVALS.contains(&bucket_interval) {
            return Err(TraceEngineError::UnsupportedOperation(format!(
                "Invalid bucket_interval '{}'. Must be one of: {}",
                bucket_interval,
                VALID_INTERVALS.join(", ")
            )));
        }

        // ── Phase 1: Spans base DataFrame — time-first for partition + row-group pruning ──
        let mut spans_df = self
            .ctx
            .table(SPAN_TABLE_NAME)
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        // Partition directory pruning — eliminates whole YYYY-MM-DD/ directories before
        // DataFusion reads a single file's metadata or Parquet column statistics.
        spans_df = spans_df.filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&start_time)))?;
        spans_df = spans_df.filter(col(PARTITION_DATE_COL).lt_eq(date_lit(&end_time)))?;

        // Row-group pruning — typed Timestamp(Microsecond, UTC) literals let DataFusion
        // use Parquet column min/max stats within the surviving partition directories.
        spans_df = spans_df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start_time)))?;
        spans_df = spans_df.filter(col(START_TIME_COL).lt(ts_lit(&end_time)))?;

        // ── Phase 2: Entity filter — optional INNER JOIN against summary table ──
        //
        // Resolves the set of matching trace IDs from the summary table (time-first),
        // then INNER JOIN into spans. Replaces the `entity_traces` CTE + join.
        if let Some(uid) = entity_uid {
            let mut entity_df = self
                .ctx
                .table(SUMMARY_TABLE_NAME)
                .await
                .map_err(TraceEngineError::DatafusionError)?;

            // Summary-side time pruning (same partition-pruning principle as spans).
            entity_df = entity_df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start_time)))?;
            entity_df = entity_df.filter(col(START_TIME_COL).lt(ts_lit(&end_time)))?;
            entity_df = entity_df.filter(datafusion::functions_nested::expr_fn::array_has(
                col(ENTITY_IDS_COL),
                lit(uid),
            ))?;

            // Alias to avoid ambiguous `trace_id` column in the JOIN output schema.
            let entity_df = entity_df
                .select(vec![col(TRACE_ID_COL).alias("_entity_tid")])?
                .distinct()?;

            spans_df = spans_df.join(
                entity_df,
                JoinType::Inner,
                &[TRACE_ID_COL],
                &["_entity_tid"],
                None,
            )?;
        }

        // ── Phase 3: trace_level — aggregate per-trace ───────────────────────
        //
        // Replaces the `trace_level` CTE:
        //   MIN(start_time) → trace_start
        //   MAX(end_time)   → trace_end (NULL when all end_times are NULL)
        //   MAX(CASE WHEN parent_span_id IS NULL THEN service_name END) → root_service
        //   MAX(status_code) → status_code
        //   [MAX(CAST(match_attr OR-chain AS INT64)) → attr_match]  ← single-scan attr filter
        //
        // CASE WHEN parent_span_id IS NULL THEN CAST(service_name AS Utf8) END:
        // The root span is the one with no parent; MAX picks the single non-NULL value
        // across all spans for a given trace.
        use datafusion::functions::expr_fn::date_trunc;
        use datafusion::functions_aggregate::expr_fn::approx_percentile_cont;
        use datafusion::functions_aggregate::expr_fn::{avg, count, max, min};

        let root_service_case = when(
            col(PARENT_SPAN_ID_COL).is_null(),
            df_cast(col(SERVICE_NAME_COL), DataType::Utf8),
        )
        .end()?;

        let has_attr_filter = attribute_filters.is_some_and(|f| !f.is_empty());

        let mut agg_exprs: Vec<Expr> = vec![
            min(col(START_TIME_COL)).alias("trace_start"),
            max(col(END_TIME_COL)).alias("trace_end"),
            max(root_service_case).alias("root_service"),
            max(col(STATUS_CODE_COL)).alias("status_code"),
        ];

        // Attribute filter: OR-chain match_attr() calls over search_blob, cast to Int64,
        // then MAX to get 1 if any span in the trace matched.
        // Single-pass: avoids a second table scan for attribute filtering.
        // Post-aggregate .filter(attr_match = 1) replaces the SQL HAVING clause.
        if has_attr_filter {
            let filters = attribute_filters.unwrap();
            let mut match_expr: Option<Expr> = None;
            for f in filters {
                let pattern = normalize_attr_filter(f);
                let cond = match_attr_expr(col(SEARCH_BLOB_COL), lit(pattern));
                match_expr = Some(match match_expr {
                    None => cond,
                    Some(e) => e.or(cond),
                });
            }
            // CAST(bool OR-chain AS INT64): true → 1, false → 0.
            // MAX over the group gives 1 if any span matched.
            let attr_int = df_cast(match_expr.unwrap(), DataType::Int64);
            agg_exprs.push(max(attr_int).alias("attr_match"));
        }

        let mut trace_level_df = spans_df.aggregate(vec![col(TRACE_ID_COL)], agg_exprs)?;

        // HAVING attr_match = 1 — post-aggregate filter (SQL HAVING equivalent).
        if has_attr_filter {
            trace_level_df = trace_level_df.filter(col("attr_match").eq(lit(1i64)))?;
        }

        // ── Phase 4: service_filtered — duration_ms, null guard, service filter ──
        //
        // Replaces the `service_filtered` CTE.
        // Cast Timestamp(Microsecond, UTC) → Int64 gives µs since epoch;
        // subtracting gives trace duration in µs, dividing by 1_000 gives ms.
        // Computed here (after the per-trace aggregate) to avoid the DataFusion
        // restriction on duplicate aggregate expressions in the same SELECT.
        let duration_expr = (df_cast(col("trace_end"), DataType::Int64)
            - df_cast(col("trace_start"), DataType::Int64))
            / lit(1000i64);

        let mut service_filtered_df = trace_level_df
            .filter(col("trace_end").is_not_null())?
            .with_column("duration_ms", duration_expr)?;

        if let Some(svc) = service_name {
            service_filtered_df = service_filtered_df.filter(col("root_service").eq(lit(svc)))?;
        }

        // ── Phase 5: DATE_TRUNC bucket ───────────────────────────────────────
        //
        // Replaces the `bucketed` CTE.
        // date_trunc(precision_literal, timestamp_expr) — precision is a Utf8 scalar.
        let bucket_expr = date_trunc(lit(bucket_interval), col("trace_start"));
        let bucketed_df = service_filtered_df.with_column("bucket_start", bucket_expr)?;

        // ── Phase 6: Final bucketed aggregation ─────────────────────────────
        let duration_f64 = df_cast(col("duration_ms"), DataType::Float64);
        let error_rate_case =
            when(col(STATUS_CODE_COL).eq(lit(2i32)), lit(1.0f64)).otherwise(lit(0.0f64))?;

        // approx_percentile_cont in DataFusion 52: (SortExpr, percentile, limit: Option<Expr>)
        // SortExpr is col.sort(asc, nulls_first); None limit = no row-count cap.
        let final_df = bucketed_df
            .aggregate(
                vec![col("bucket_start")],
                vec![
                    count(lit(1i64)).alias("trace_count"),
                    avg(duration_f64.clone()).alias("avg_duration_ms"),
                    approx_percentile_cont(
                        duration_f64.clone().sort(true, false),
                        lit(0.50f64),
                        None,
                    )
                    .alias("p50_duration_ms"),
                    approx_percentile_cont(
                        duration_f64.clone().sort(true, false),
                        lit(0.95f64),
                        None,
                    )
                    .alias("p95_duration_ms"),
                    approx_percentile_cont(duration_f64.sort(true, false), lit(0.99f64), None)
                        .alias("p99_duration_ms"),
                    avg(error_rate_case).alias("error_rate"),
                ],
            )?
            .sort(vec![col("bucket_start").sort(true, true)])?;

        let batches = final_df
            .collect()
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        let mut metrics = Vec::new();
        for batch in &batches {
            let schema = batch.schema();

            // DATE_TRUNC may return Timestamp(Nanosecond) when string literals in the WHERE
            // clause cause DataFusion to upcast the column. Cast explicitly to
            // Timestamp(Microsecond, UTC) so Arrow handles the ns→µs division correctly,
            // regardless of the sub-type returned by the query plan.
            let raw_bucket = batch.column(schema.index_of("bucket_start").unwrap());
            let bucket_arr = arrow::compute::cast(
                raw_bucket,
                &arrow::datatypes::DataType::Timestamp(
                    arrow::datatypes::TimeUnit::Microsecond,
                    Some("UTC".into()),
                ),
            )
            .map_err(|e| TraceEngineError::BatchConversion(format!("bucket_start cast: {}", e)))?;
            let bucket_col = bucket_arr
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .ok_or_else(|| TraceEngineError::BatchConversion("bucket_start".into()))?;
            let count_col = batch
                .column(schema.index_of("trace_count").unwrap())
                .as_any()
                .downcast_ref::<Int64Array>()
                .ok_or_else(|| TraceEngineError::BatchConversion("trace_count".into()))?;
            let avg_col = batch
                .column(schema.index_of("avg_duration_ms").unwrap())
                .as_any()
                .downcast_ref::<arrow::array::Float64Array>()
                .ok_or_else(|| TraceEngineError::BatchConversion("avg_duration_ms".into()))?;
            let p50_col = batch
                .column(schema.index_of("p50_duration_ms").unwrap())
                .as_any()
                .downcast_ref::<arrow::array::Float64Array>()
                .ok_or_else(|| TraceEngineError::BatchConversion("p50_duration_ms".into()))?;
            let p95_col = batch
                .column(schema.index_of("p95_duration_ms").unwrap())
                .as_any()
                .downcast_ref::<arrow::array::Float64Array>()
                .ok_or_else(|| TraceEngineError::BatchConversion("p95_duration_ms".into()))?;
            let p99_col = batch
                .column(schema.index_of("p99_duration_ms").unwrap())
                .as_any()
                .downcast_ref::<arrow::array::Float64Array>()
                .ok_or_else(|| TraceEngineError::BatchConversion("p99_duration_ms".into()))?;
            let err_col = batch
                .column(schema.index_of("error_rate").unwrap())
                .as_any()
                .downcast_ref::<arrow::array::Float64Array>()
                .ok_or_else(|| TraceEngineError::BatchConversion("error_rate".into()))?;

            for i in 0..batch.num_rows() {
                let micros = bucket_col.value(i);
                let bucket_start = DateTime::from_timestamp_micros(micros)
                    .unwrap_or_default()
                    .with_timezone(&Utc);

                metrics.push(TraceMetricBucket {
                    bucket_start,
                    trace_count: count_col.value(i),
                    avg_duration_ms: avg_col.value(i),
                    p50_duration_ms: if p50_col.is_null(i) {
                        None
                    } else {
                        Some(p50_col.value(i))
                    },
                    p95_duration_ms: if p95_col.is_null(i) {
                        None
                    } else {
                        Some(p95_col.value(i))
                    },
                    p99_duration_ms: if p99_col.is_null(i) {
                        None
                    } else {
                        Some(p99_col.value(i))
                    },
                    error_rate: err_col.value(i),
                });
            }
        }

        self.metrics_cache
            .insert(cache_key, Arc::new(metrics.clone()));
        Ok(metrics)
    }

    /// Look up traces from the summary table that match aggregate-level filters
    /// (e.g. `entity_uid`, `queue_uid`, `has_errors`) and return spans for the
    /// most-recent matching trace. The entire pipeline runs as a single DataFusion
    /// JOIN — no intermediate collection, no Postgres round-trip.
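    ///
    /// Hedged call sketch (assumes `TraceFilters` can be built with
    /// `..Default::default()`; field names are taken from the usage below):
    ///
    /// ```ignore
    /// let filters = TraceFilters {
    ///     service_name: Some("checkout".into()),
    ///     has_errors: Some(true),
    ///     ..Default::default()
    /// };
    /// let spans = queries.query_spans_from_trace_filters(&filters).await?;
    /// ```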
    pub async fn query_spans_from_trace_filters(
        &self,
        filters: &TraceFilters,
    ) -> Result<Vec<TraceSpan>, TraceEngineError> {
        // ── Phase 1: Summary filters (time-first for partition pruning) ─────
        let mut summary_df = self
            .ctx
            .table(SUMMARY_TABLE_NAME)
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        if let Some(start) = filters.start_time {
            summary_df = summary_df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start)))?;
        }
        if let Some(end) = filters.end_time {
            summary_df = summary_df.filter(col(START_TIME_COL).lt(ts_lit(&end)))?;
        }
        if let Some(ref svc) = filters.service_name {
            summary_df = summary_df.filter(col(SERVICE_NAME_COL).eq(lit(svc.as_str())))?;
        }
        match filters.has_errors {
            Some(true) => {
                summary_df = summary_df.filter(col(ERROR_COUNT_COL).gt(lit(0i64)))?;
            }
            Some(false) => {
                summary_df = summary_df.filter(col(ERROR_COUNT_COL).eq(lit(0i64)))?;
            }
            None => {}
        }
        if let Some(sc) = filters.status_code {
            summary_df = summary_df.filter(col(STATUS_CODE_COL).eq(lit(sc)))?;
        }
        if let Some(ref uid) = filters.entity_uid {
            summary_df = summary_df.filter(datafusion::functions_nested::expr_fn::array_has(
                col(ENTITY_IDS_COL),
                lit(uid.as_str()),
            ))?;
        }
        if let Some(ref uid) = filters.queue_uid {
            summary_df = summary_df.filter(datafusion::functions_nested::expr_fn::array_has(
                col(QUEUE_IDS_COL),
                lit(uid.as_str()),
            ))?;
        }

        // ── Phase 1b: Attribute filter join (keeps everything in DataFusion) ─
        if let Some(ref attr_filters) = filters.attribute_filters {
            if !attr_filters.is_empty() {
                let mut attr_df = self
                    .ctx
                    .table(SPAN_TABLE_NAME)
                    .await
                    .map_err(TraceEngineError::DatafusionError)?;

                // Time pruning on the span side
                if let Some(start) = filters.start_time {
                    attr_df = attr_df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start)))?;
                }
                if let Some(end) = filters.end_time {
                    attr_df = attr_df.filter(col(START_TIME_COL).lt(ts_lit(&end)))?;
                }

                // OR-match search_blob against each filter pattern via match_attr UDF.
                // match_attr_expr is a drop-in replacement for col(..).like(lit(pattern)):
                // handles Utf8View natively and uses .contains() for LIKE '%inner%' semantics.
                let mut attr_expr: Option<Expr> = None;
                for f in attr_filters {
                    let pattern = normalize_attr_filter(f);
                    let cond = match_attr_expr(col(SEARCH_BLOB_COL), lit(pattern));
                    attr_expr = Some(match attr_expr {
                        None => cond,
                        Some(e) => e.or(cond),
                    });
                }
                if let Some(expr) = attr_expr {
                    attr_df = attr_df.filter(expr)?;
                }

                // Deduplicate and alias trace_id to avoid ambiguous column in join
                let attr_df = attr_df
                    .select(vec![col(TRACE_ID_COL).alias("_attr_tid")])?
                    .distinct()?;

                summary_df = summary_df.join(
                    attr_df,
                    JoinType::Inner,
                    &[TRACE_ID_COL],
                    &["_attr_tid"],
                    None,
                )?;
            }
        }

        // ── Phase 2: Sort DESC, limit 1, project trace_id → _match_tid ──────
        let first_trace_df = summary_df
            .sort(vec![
                col(START_TIME_COL).sort(false, false),
                col(TRACE_ID_COL).sort(false, false),
            ])?
            .limit(0, Some(1))?
            .select(vec![col(TRACE_ID_COL).alias("_match_tid")])?;

        // ── Phase 3: Spans DataFrame — partition + row-group pruning ─────────
        let mut spans_df = self
            .ctx
            .table(SPAN_TABLE_NAME)
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        if let Some(start) = filters.start_time {
            spans_df = spans_df.filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&start)))?;
            spans_df = spans_df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start)))?;
        }
        if let Some(end) = filters.end_time {
            spans_df = spans_df.filter(col(PARTITION_DATE_COL).lt_eq(date_lit(&end)))?;
            spans_df = spans_df.filter(col(START_TIME_COL).lt(ts_lit(&end)))?;
        }
        spans_df = spans_df.select_columns(SPAN_COLUMNS)?;
        spans_df = spans_df.sort(vec![col(START_TIME_COL).sort(true, true)])?;

        // ── Phase 4: Inner join — spans filtered to the single matching trace ─
        let result_df = spans_df.join(
            first_trace_df,
            JoinType::Inner,
            &[TRACE_ID_COL],
            &["_match_tid"],
            None,
        )?;

        let batches = result_df
            .collect()
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
            return Ok(Vec::new());
        }

        let flat_spans = batches_to_flat_spans(batches)?;
        Ok(build_span_tree(flat_spans))
    }
}