//! scouter_dataframe/parquet/tracing/queries.rs
//!
//! DataFusion query helpers for the trace-span table.
1use crate::error::TraceEngineError;
2use crate::parquet::tracing::span_view::TraceSpanBatch;
3use arrow::array::RecordBatch;
4use chrono::{DateTime, Utc};
5use datafusion::logical_expr::{col, lit, SortExpr};
6use datafusion::prelude::*;
7use std::sync::Arc;
8use tracing::{error, info, instrument};
9
// common columns
//
// Centralized column names for the trace-span table so query code and the
// physical schema stay in sync; every query below builds expressions from
// these constants instead of string literals.

// Span timing columns (start_time also drives partition pruning).
pub const START_TIME_COL: &str = "start_time";
pub const END_TIME_COL: &str = "end_time";
// Identity / hierarchy columns. Trace and span IDs are compared as raw
// bytes (see `get_trace_spans`), not hex strings.
pub const SERVICE_NAME_COL: &str = "service_name";
pub const TRACE_ID_COL: &str = "trace_id";
pub const SPAN_ID_COL: &str = "span_id";
pub const PARENT_SPAN_ID_COL: &str = "parent_span_id";
pub const ROOT_SPAN_ID_COL: &str = "root_span_id";
// Descriptive span metadata.
pub const SPAN_NAME_COL: &str = "span_name";
pub const SPAN_KIND_COL: &str = "span_kind";
pub const DURATION_MS_COL: &str = "duration_ms";
pub const STATUS_CODE_COL: &str = "status_code";
pub const STATUS_MESSAGE_COL: &str = "status_message";
// Pre-computed tree-structure fields (written at ingest time, so no
// recursive CTE is needed at query time).
pub const DEPTH_COL: &str = "depth";
pub const SPAN_ORDER_COL: &str = "span_order";
pub const PATH_COL: &str = "path";
// Payload columns.
pub const ATTRIBUTES_COL: &str = "attributes";
pub const EVENTS_COL: &str = "events";
pub const LINKS_COL: &str = "links";
pub const INPUT_COL: &str = "input";
pub const OUTPUT_COL: &str = "output";
// Full projection used by `get_trace_spans`; order here is the column
// order of the returned batches.
pub const ALL_COLUMNS: [&str; 20] = [
    TRACE_ID_COL,
    SPAN_ID_COL,
    PARENT_SPAN_ID_COL,
    ROOT_SPAN_ID_COL,
    SERVICE_NAME_COL,
    SPAN_NAME_COL,
    SPAN_KIND_COL,
    START_TIME_COL,
    END_TIME_COL,
    DURATION_MS_COL,
    STATUS_CODE_COL,
    STATUS_MESSAGE_COL,
    DEPTH_COL,
    SPAN_ORDER_COL,
    PATH_COL,
    ATTRIBUTES_COL,
    EVENTS_COL,
    LINKS_COL,
    INPUT_COL,
    OUTPUT_COL,
];
// Name the span table is registered under in the `SessionContext`.
pub const SPAN_TABLE_NAME: &str = "trace_spans";
54
/// Thin fluent builder over a DataFusion [`DataFrame`].
///
/// Each method consumes the builder, applies one logical-plan step
/// (projection, filter, sort, limit), and returns the updated builder,
/// converting DataFusion errors into [`TraceEngineError`].
struct TraceQueryBuilder {
    // Lazily-built logical plan; nothing executes until `execute()`.
    df: DataFrame,
}
58
59impl TraceQueryBuilder {
60    async fn set_table(
61        ctx: Arc<SessionContext>,
62        table_name: &str,
63    ) -> Result<Self, TraceEngineError> {
64        let df = ctx
65            .table(table_name)
66            .await
67            .inspect_err(|e| error!("Failed to load table {}: {}", table_name, e))?;
68        Ok(Self { df })
69    }
70
71    fn select_columns(mut self, columns: &[&str]) -> Result<Self, TraceEngineError> {
72        self.df = self.df.select_columns(columns)?;
73        Ok(self)
74    }
75
76    fn add_filter(mut self, expr: Expr) -> Result<Self, TraceEngineError> {
77        self.df = self.df.filter(expr)?;
78        Ok(self)
79    }
80
81    fn add_sort(mut self, sort: Vec<SortExpr>) -> Result<Self, TraceEngineError> {
82        self.df = self.df.sort(sort)?;
83        Ok(self)
84    }
85
86    fn with_limit(mut self, n: Option<usize>) -> Result<Self, TraceEngineError> {
87        self.df = self.df.limit(0, n)?;
88        Ok(self)
89    }
90
91    async fn execute(self) -> Result<Vec<RecordBatch>, TraceEngineError> {
92        let batches = self
93            .df
94            .collect()
95            .await
96            .inspect_err(|e| error!("Failed to collect query results: {}", e))?;
97        Ok(batches)
98    }
99}
/// High-performance query patterns for Delta Lake trace storage
///
/// Design principles:
/// 1. Time-based filters FIRST (enables partition pruning)
/// 2. Binary ID comparisons (no hex decoding overhead)
/// 3. Dictionary-encoded column filters (service_name, span_kind)
/// 4. Minimize data scanned via projection pushdown
/// 5. Leverage pre-computed fields (depth, span_order, path, root_span_id)
pub struct TraceQueries {
    // Shared DataFusion session; the span table is expected to already be
    // registered under SPAN_TABLE_NAME by the caller.
    ctx: Arc<SessionContext>,
}
111
112impl TraceQueries {
113    pub fn new(ctx: Arc<SessionContext>) -> Self {
114        Self { ctx }
115    }
116
117    /// Get all spans for a trace_id (direct equivalent of Postgres scouter.get_trace_spans)
118    ///
119    /// This is MUCH simpler than the Postgres version because:
120    /// - No recursive CTE needed (depth/path/root_span_id pre-computed)
121    /// - Already sorted by span_order
122    /// - Direct binary comparison on trace_id
123    ///
124    /// # Arguments
125    /// * `trace_id_bytes` - 16-byte trace ID (no hex encoding needed)
126    /// * `service_name` - Optional service name filter
127    /// * `start_time_hint` - Optional time range hint for partition pruning
128    ///
129    /// # Performance
130    /// - Without time hint: Scans all Delta Lake files (still fast due to min/max stats)
131    /// - With time hint: 100-1000x faster via partition pruning
132    #[instrument(skip_all)]
133    pub async fn get_trace_spans(
134        &self,
135        trace_id_bytes: Option<&[u8]>,
136        service_name: Option<&str>,
137        start_time: Option<&DateTime<Utc>>,
138        end_time: Option<&DateTime<Utc>>,
139        limit: Option<usize>,
140    ) -> Result<Vec<TraceSpanBatch>, TraceEngineError> {
141        let mut builder = TraceQueryBuilder::set_table(self.ctx.clone(), SPAN_TABLE_NAME).await?;
142        builder = builder.select_columns(&ALL_COLUMNS)?;
143
144        // Add time filters first to enable partition pruning
145        let mut time_filter: Option<Expr> = None;
146        if let Some(start) = start_time {
147            let filter = col(START_TIME_COL).gt_eq(lit(start.to_rfc3339()));
148            time_filter = Some(match time_filter {
149                Some(existing) => existing.and(filter),
150                None => filter,
151            });
152        }
153
154        if let Some(end) = end_time {
155            let filter = col(START_TIME_COL).lt(lit(end.to_rfc3339()));
156            time_filter = Some(match time_filter {
157                Some(existing) => existing.and(filter),
158                None => filter,
159            });
160        }
161
162        if let Some(filter) = time_filter {
163            builder = builder.add_filter(filter)?;
164        }
165
166        if let Some(trace_id_bytes) = trace_id_bytes {
167            builder = builder.add_filter(col(TRACE_ID_COL).eq(lit(trace_id_bytes)))?;
168        }
169
170        if let Some(service) = service_name {
171            builder = builder.add_filter(col(SERVICE_NAME_COL).eq(lit(service)))?;
172        }
173
174        let sort = col(SPAN_ORDER_COL).sort(true, true);
175        builder = builder.add_sort(vec![sort])?;
176        builder = builder.with_limit(limit)?;
177
178        let batches = builder.execute().await?;
179
180        info!(
181            "Queried {} spans across {} batches",
182            batches.iter().map(|b| b.num_rows()).sum::<usize>(),
183            batches.len()
184        );
185
186        batches_to_span_views(batches)
187    }
188
189    /// Get trace tree structure (same as get_trace_spans but returns raw RecordBatches)
190    ///
191    /// Use this when you need Arrow-native processing without conversion overhead
192    pub async fn get_trace_tree_batches(
193        &self,
194        trace_id_bytes: &[u8],
195        start_time_hint: Option<(&str, &str)>,
196    ) -> Result<Vec<RecordBatch>, TraceEngineError> {
197        // Start with the base table
198        let mut df = self.ctx.table("trace_spans").await?;
199
200        // Apply time filter first (enables partition pruning)
201        if let Some((min, max)) = start_time_hint {
202            df = df.filter(
203                col("start_time")
204                    .gt_eq(lit(min))
205                    .and(col("start_time").lt(lit(max))),
206            )?;
207        }
208
209        df = df.filter(col("trace_id").eq(lit(trace_id_bytes)))?;
210
211        // Select only needed columns for tree structure
212        df = df.select_columns(&[
213            "span_id",
214            "parent_span_id",
215            "root_span_id",
216            "service_name",
217            "span_name",
218            "span_kind",
219            "start_time",
220            "end_time",
221            "duration_ms",
222            "status_code",
223            "status_message",
224            "depth",
225            "span_order",
226            "path",
227            "attributes",
228        ])?;
229
230        // Order by span_order
231        df = df.sort(vec![col("span_order").sort(true, true)])?;
232
233        let batches = df.collect().await?;
234        Ok(batches)
235    }
236}
237
238/// Convert RecordBatches to zero-copy TraceSpanBatch views
239///
240/// This creates Arc-backed views with NO allocations!
241/// Allocations only happen during serialization (hex encoding, JSON stringify)
242fn batches_to_span_views(
243    batches: Vec<RecordBatch>,
244) -> Result<Vec<TraceSpanBatch>, TraceEngineError> {
245    batches
246        .iter()
247        .map(|batch| {
248            TraceSpanBatch::from_record_batch(batch)
249                .map_err(|e| TraceEngineError::BatchConversion(format!("Arrow error: {}", e)))
250        })
251        .collect()
252}