// scouter_dataframe/parquet/tracing/queries.rs
use crate::error::TraceEngineError;
use crate::parquet::tracing::span_view::TraceSpanBatch;
use arrow::array::RecordBatch;
use chrono::{DateTime, Utc};
use datafusion::logical_expr::{col, lit, SortExpr};
use datafusion::prelude::*;
use std::sync::Arc;
use tracing::{error, info, instrument};
9
// Column names of the span table. These must stay in sync with the schema
// registered for `SPAN_TABLE_NAME`; queries in this module reference columns
// only through these constants so a schema rename touches one place.
pub const START_TIME_COL: &str = "start_time";
pub const END_TIME_COL: &str = "end_time";
pub const SERVICE_NAME_COL: &str = "service_name";
pub const TRACE_ID_COL: &str = "trace_id";
pub const SPAN_ID_COL: &str = "span_id";
pub const PARENT_SPAN_ID_COL: &str = "parent_span_id";
pub const ROOT_SPAN_ID_COL: &str = "root_span_id";
pub const SPAN_NAME_COL: &str = "span_name";
pub const SPAN_KIND_COL: &str = "span_kind";
pub const DURATION_MS_COL: &str = "duration_ms";
pub const STATUS_CODE_COL: &str = "status_code";
pub const STATUS_MESSAGE_COL: &str = "status_message";
pub const DEPTH_COL: &str = "depth";
pub const SPAN_ORDER_COL: &str = "span_order";
pub const PATH_COL: &str = "path";
pub const ATTRIBUTES_COL: &str = "attributes";
pub const EVENTS_COL: &str = "events";
pub const LINKS_COL: &str = "links";
pub const INPUT_COL: &str = "input";
pub const OUTPUT_COL: &str = "output";

/// Full projection used by span queries (all 20 columns above).
pub const ALL_COLUMNS: [&str; 20] = [
    TRACE_ID_COL,
    SPAN_ID_COL,
    PARENT_SPAN_ID_COL,
    ROOT_SPAN_ID_COL,
    SERVICE_NAME_COL,
    SPAN_NAME_COL,
    SPAN_KIND_COL,
    START_TIME_COL,
    END_TIME_COL,
    DURATION_MS_COL,
    STATUS_CODE_COL,
    STATUS_MESSAGE_COL,
    DEPTH_COL,
    SPAN_ORDER_COL,
    PATH_COL,
    ATTRIBUTES_COL,
    EVENTS_COL,
    LINKS_COL,
    INPUT_COL,
    OUTPUT_COL,
];

/// Name under which the span table is registered in the `SessionContext`.
pub const SPAN_TABLE_NAME: &str = "trace_spans";
54
55struct TraceQueryBuilder {
56 df: DataFrame,
57}
58
59impl TraceQueryBuilder {
60 async fn set_table(
61 ctx: Arc<SessionContext>,
62 table_name: &str,
63 ) -> Result<Self, TraceEngineError> {
64 let df = ctx
65 .table(table_name)
66 .await
67 .inspect_err(|e| error!("Failed to load table {}: {}", table_name, e))?;
68 Ok(Self { df })
69 }
70
71 fn select_columns(mut self, columns: &[&str]) -> Result<Self, TraceEngineError> {
72 self.df = self.df.select_columns(columns)?;
73 Ok(self)
74 }
75
76 fn add_filter(mut self, expr: Expr) -> Result<Self, TraceEngineError> {
77 self.df = self.df.filter(expr)?;
78 Ok(self)
79 }
80
81 fn add_sort(mut self, sort: Vec<SortExpr>) -> Result<Self, TraceEngineError> {
82 self.df = self.df.sort(sort)?;
83 Ok(self)
84 }
85
86 fn with_limit(mut self, n: Option<usize>) -> Result<Self, TraceEngineError> {
87 self.df = self.df.limit(0, n)?;
88 Ok(self)
89 }
90
91 async fn execute(self) -> Result<Vec<RecordBatch>, TraceEngineError> {
92 let batches = self
93 .df
94 .collect()
95 .await
96 .inspect_err(|e| error!("Failed to collect query results: {}", e))?;
97 Ok(batches)
98 }
99}
/// Public query surface over the registered trace-span table.
pub struct TraceQueries {
    // Shared DataFusion session used to resolve the span table by name.
    ctx: Arc<SessionContext>,
}
111
112impl TraceQueries {
113 pub fn new(ctx: Arc<SessionContext>) -> Self {
114 Self { ctx }
115 }
116
117 #[instrument(skip_all)]
133 pub async fn get_trace_spans(
134 &self,
135 trace_id_bytes: Option<&[u8]>,
136 service_name: Option<&str>,
137 start_time: Option<&DateTime<Utc>>,
138 end_time: Option<&DateTime<Utc>>,
139 limit: Option<usize>,
140 ) -> Result<Vec<TraceSpanBatch>, TraceEngineError> {
141 let mut builder = TraceQueryBuilder::set_table(self.ctx.clone(), SPAN_TABLE_NAME).await?;
142 builder = builder.select_columns(&ALL_COLUMNS)?;
143
144 let mut time_filter: Option<Expr> = None;
146 if let Some(start) = start_time {
147 let filter = col(START_TIME_COL).gt_eq(lit(start.to_rfc3339()));
148 time_filter = Some(match time_filter {
149 Some(existing) => existing.and(filter),
150 None => filter,
151 });
152 }
153
154 if let Some(end) = end_time {
155 let filter = col(START_TIME_COL).lt(lit(end.to_rfc3339()));
156 time_filter = Some(match time_filter {
157 Some(existing) => existing.and(filter),
158 None => filter,
159 });
160 }
161
162 if let Some(filter) = time_filter {
163 builder = builder.add_filter(filter)?;
164 }
165
166 if let Some(trace_id_bytes) = trace_id_bytes {
167 builder = builder.add_filter(col(TRACE_ID_COL).eq(lit(trace_id_bytes)))?;
168 }
169
170 if let Some(service) = service_name {
171 builder = builder.add_filter(col(SERVICE_NAME_COL).eq(lit(service)))?;
172 }
173
174 let sort = col(SPAN_ORDER_COL).sort(true, true);
175 builder = builder.add_sort(vec![sort])?;
176 builder = builder.with_limit(limit)?;
177
178 let batches = builder.execute().await?;
179
180 info!(
181 "Queried {} spans across {} batches",
182 batches.iter().map(|b| b.num_rows()).sum::<usize>(),
183 batches.len()
184 );
185
186 batches_to_span_views(batches)
187 }
188
189 pub async fn get_trace_tree_batches(
193 &self,
194 trace_id_bytes: &[u8],
195 start_time_hint: Option<(&str, &str)>,
196 ) -> Result<Vec<RecordBatch>, TraceEngineError> {
197 let mut df = self.ctx.table("trace_spans").await?;
199
200 if let Some((min, max)) = start_time_hint {
202 df = df.filter(
203 col("start_time")
204 .gt_eq(lit(min))
205 .and(col("start_time").lt(lit(max))),
206 )?;
207 }
208
209 df = df.filter(col("trace_id").eq(lit(trace_id_bytes)))?;
210
211 df = df.select_columns(&[
213 "span_id",
214 "parent_span_id",
215 "root_span_id",
216 "service_name",
217 "span_name",
218 "span_kind",
219 "start_time",
220 "end_time",
221 "duration_ms",
222 "status_code",
223 "status_message",
224 "depth",
225 "span_order",
226 "path",
227 "attributes",
228 ])?;
229
230 df = df.sort(vec![col("span_order").sort(true, true)])?;
232
233 let batches = df.collect().await?;
234 Ok(batches)
235 }
236}
237
238fn batches_to_span_views(
243 batches: Vec<RecordBatch>,
244) -> Result<Vec<TraceSpanBatch>, TraceEngineError> {
245 batches
246 .iter()
247 .map(|batch| {
248 TraceSpanBatch::from_record_batch(batch)
249 .map_err(|e| TraceEngineError::BatchConversion(format!("Arrow error: {}", e)))
250 })
251 .collect()
252}