use crate::error::TraceEngineError;
use crate::parquet::utils::match_attr_expr;
use arrow::array::RecordBatch;
use arrow::array::{
    BinaryArray, Int32Array, Int64Array, ListArray, MapArray, StringArray,
    TimestampMicrosecondArray,
};
use arrow::compute::cast;
use arrow::datatypes::DataType;
use arrow_array::Array;
use chrono::{DateTime, Datelike, TimeZone, Utc};
use datafusion::common::JoinType;
use datafusion::logical_expr::{cast as df_cast, col, lit, when, SortExpr};
use datafusion::prelude::*;
use datafusion::scalar::ScalarValue;
use mini_moka::sync::Cache;
use scouter_types::sql::{TraceFilters, TraceMetricBucket, TraceSpan};
use scouter_types::{Attribute, SpanEvent, SpanId, SpanLink, TraceId};
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::Duration;
use tracing::{error, info, instrument};

const UNIX_EPOCH_DAYS: i32 = 719_163;

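/// Builds a DataFusion literal for a UTC timestamp with microsecond precision,
/// matching the `Timestamp(Microsecond, "UTC")` type of the span time columns so
/// comparisons do not require an implicit cast.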
#[inline]
pub(crate) fn ts_lit(dt: &DateTime<Utc>) -> Expr {
    lit(ScalarValue::TimestampMicrosecond(
        Some(dt.timestamp_micros()),
        Some("UTC".into()),
    ))
}

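/// Builds a `Date32` literal (days since the Unix epoch) from a UTC timestamp.
/// `UNIX_EPOCH_DAYS` is the day count from 0001-01-01 (chrono's CE epoch) to
/// 1970-01-01, so subtracting it converts `num_days_from_ce()` into the Arrow
/// `Date32` representation used by the partition-date column.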
#[inline]
pub(crate) fn date_lit(dt: &DateTime<Utc>) -> Expr {
    let days = dt.date_naive().num_days_from_ce() - UNIX_EPOCH_DAYS;
    lit(ScalarValue::Date32(Some(days)))
}

pub const START_TIME_COL: &str = "start_time";
pub const PARTITION_DATE_COL: &str = "partition_date";
pub const END_TIME_COL: &str = "end_time";
pub const SERVICE_NAME_COL: &str = "service_name";
pub const TRACE_ID_COL: &str = "trace_id";
pub const SPAN_ID_COL: &str = "span_id";
pub const PARENT_SPAN_ID_COL: &str = "parent_span_id";
pub const SPAN_NAME_COL: &str = "span_name";
pub const SPAN_KIND_COL: &str = "span_kind";
pub const DURATION_MS_COL: &str = "duration_ms";
pub const STATUS_CODE_COL: &str = "status_code";
pub const STATUS_MESSAGE_COL: &str = "status_message";
pub const ATTRIBUTES_COL: &str = "attributes";
pub const EVENTS_COL: &str = "events";
pub const LINKS_COL: &str = "links";
pub const INPUT_COL: &str = "input";
pub const OUTPUT_COL: &str = "output";
pub const SEARCH_BLOB_COL: &str = "search_blob";
pub const ENTITY_ID_COL: &str = "entity_id";
pub const SPAN_TABLE_NAME: &str = "trace_spans";

const SUMMARY_TABLE_NAME: &str = "trace_summaries";
const ERROR_COUNT_COL: &str = "error_count";
const ENTITY_IDS_COL: &str = "entity_ids";
const QUEUE_IDS_COL: &str = "queue_ids";

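/// Columns projected when reading individual spans from the span table.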
const SPAN_COLUMNS: &[&str] = &[
    TRACE_ID_COL,
    SPAN_ID_COL,
    PARENT_SPAN_ID_COL,
    SERVICE_NAME_COL,
    SPAN_NAME_COL,
    SPAN_KIND_COL,
    START_TIME_COL,
    END_TIME_COL,
    DURATION_MS_COL,
    STATUS_CODE_COL,
    STATUS_MESSAGE_COL,
    ATTRIBUTES_COL,
    EVENTS_COL,
    LINKS_COL,
    INPUT_COL,
    OUTPUT_COL,
];

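/// A single span row decoded from an Arrow batch, before the spans of a trace are
/// ordered and linked into a tree of `TraceSpan`s.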
struct FlatSpan {
    trace_id: [u8; 16],
    span_id: [u8; 8],
    parent_span_id: Option<[u8; 8]>,
    service_name: String,
    span_name: String,
    span_kind: Option<String>,
    start_time: DateTime<Utc>,
    end_time: DateTime<Utc>,
    duration_ms: i64,
    status_code: i32,
    status_message: Option<String>,
    attributes: Vec<Attribute>,
    events: Vec<SpanEvent>,
    links: Vec<SpanLink>,
    input: Option<serde_json::Value>,
    output: Option<serde_json::Value>,
}

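/// Small fluent wrapper around a DataFusion `DataFrame`: load a table, chain filters,
/// projection, sort, and limit, then collect the resulting record batches.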
struct TraceQueryBuilder {
    df: DataFrame,
}

impl TraceQueryBuilder {
    async fn set_table(
        ctx: Arc<SessionContext>,
        table_name: &str,
    ) -> Result<Self, TraceEngineError> {
        let df = ctx
            .table(table_name)
            .await
            .inspect_err(|e| error!("Failed to load table {}: {}", table_name, e))?;
        Ok(Self { df })
    }

    fn select_columns(mut self, columns: &[&str]) -> Result<Self, TraceEngineError> {
        self.df = self.df.select_columns(columns)?;
        Ok(self)
    }

    fn add_filter(mut self, expr: Expr) -> Result<Self, TraceEngineError> {
        self.df = self.df.filter(expr)?;
        Ok(self)
    }

    fn add_sort(mut self, sort: Vec<SortExpr>) -> Result<Self, TraceEngineError> {
        self.df = self.df.sort(sort)?;
        Ok(self)
    }

    fn with_limit(mut self, n: Option<usize>) -> Result<Self, TraceEngineError> {
        self.df = self.df.limit(0, n)?;
        Ok(self)
    }

    async fn execute(self) -> Result<Vec<RecordBatch>, TraceEngineError> {
        let batches = self
            .df
            .collect()
            .await
            .inspect_err(|e| error!("Failed to collect query results: {}", e))?;
        Ok(batches)
    }
}

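/// Decodes one row of an Arrow `MapArray` into `Attribute`s, casting keys and values to
/// UTF-8 and parsing each value as JSON (falling back to `Null` when a value is not valid
/// JSON). Panics if the map entries do not have the expected struct layout.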
fn extract_attributes(map_array: &MapArray, row_idx: usize) -> Vec<Attribute> {
    if map_array.is_null(row_idx) {
        return Vec::new();
    }
    let entry = map_array.value(row_idx);
    let struct_array = entry
        .as_any()
        .downcast_ref::<arrow::array::StructArray>()
        .unwrap();
    let keys_arr = cast(struct_array.column(0).as_ref(), &DataType::Utf8).unwrap();
    let keys = keys_arr.as_any().downcast_ref::<StringArray>().unwrap();
    let values_arr = cast(struct_array.column(1).as_ref(), &DataType::Utf8).unwrap();
    let values = values_arr.as_any().downcast_ref::<StringArray>().unwrap();

    (0..struct_array.len())
        .map(|i| Attribute {
            key: keys.value(i).to_string(),
            value: serde_json::from_str(values.value(i)).unwrap_or(serde_json::Value::Null),
        })
        .collect()
}

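/// Decodes one row of the `events` `ListArray` into `SpanEvent`s, reading the event name,
/// microsecond timestamp, nested attribute map, and dropped-attribute count from the
/// underlying struct columns.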
fn extract_events(list_array: &ListArray, row_idx: usize) -> Vec<SpanEvent> {
    if list_array.is_null(row_idx) {
        return Vec::new();
    }
    let values = list_array.value(row_idx);
    let struct_array = values
        .as_any()
        .downcast_ref::<arrow::array::StructArray>()
        .unwrap();

    let names_arr = cast(
        struct_array
            .column_by_name("name")
            .expect("event name col")
            .as_ref(),
        &DataType::Utf8,
    )
    .expect("event name cast");
    let names = names_arr
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("event name StringArray");
    let timestamps = struct_array
        .column_by_name("timestamp")
        .and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>())
        .expect("event timestamp should be TimestampMicrosecondArray");
    let attrs = struct_array
        .column_by_name("attributes")
        .and_then(|c| c.as_any().downcast_ref::<MapArray>())
        .expect("event attributes should be MapArray");
    let dropped = struct_array
        .column_by_name("dropped_attributes_count")
        .and_then(|c| c.as_any().downcast_ref::<arrow::array::Int32Array>())
        .expect("dropped_attributes_count should be Int32Array");

    (0..struct_array.len())
        .map(|i| {
            let micros = timestamps.value(i);
            let secs = micros / 1_000_000;
            let nanos = ((micros % 1_000_000) * 1_000) as u32;
            let ts = Utc.timestamp_opt(secs, nanos).unwrap();
            SpanEvent {
                name: names.value(i).to_string(),
                timestamp: ts,
                attributes: extract_attributes(attrs, i),
                dropped_attributes_count: dropped.value(i) as u32,
            }
        })
        .collect()
}

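/// Decodes one row of the `links` `ListArray` into `SpanLink`s, converting the raw
/// 16-byte trace id and 8-byte span id to hex and decoding each link's attributes.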
fn extract_links(list_array: &ListArray, row_idx: usize) -> Vec<SpanLink> {
    if list_array.is_null(row_idx) {
        return Vec::new();
    }
    let values = list_array.value(row_idx);
    let struct_array = values
        .as_any()
        .downcast_ref::<arrow::array::StructArray>()
        .unwrap();

    let trace_id_arr = cast(
        struct_array
            .column_by_name("trace_id")
            .expect("link trace_id col")
            .as_ref(),
        &DataType::Binary,
    )
    .expect("link trace_id cast");
    let trace_ids = trace_id_arr
        .as_any()
        .downcast_ref::<BinaryArray>()
        .expect("link trace_id BinaryArray");
    let span_id_arr = cast(
        struct_array
            .column_by_name("span_id")
            .expect("link span_id col")
            .as_ref(),
        &DataType::Binary,
    )
    .expect("link span_id cast");
    let span_ids = span_id_arr
        .as_any()
        .downcast_ref::<BinaryArray>()
        .expect("link span_id BinaryArray");
    let trace_state_arr = cast(
        struct_array
            .column_by_name("trace_state")
            .expect("link trace_state col")
            .as_ref(),
        &DataType::Utf8,
    )
    .expect("link trace_state cast");
    let trace_states = trace_state_arr
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("link trace_state StringArray");
    let attrs = struct_array
        .column_by_name("attributes")
        .and_then(|c| c.as_any().downcast_ref::<MapArray>())
        .expect("link attributes should be MapArray");
    let dropped = struct_array
        .column_by_name("dropped_attributes_count")
        .and_then(|c| c.as_any().downcast_ref::<arrow::array::Int32Array>())
        .expect("link dropped_attributes_count should be Int32Array");

    (0..struct_array.len())
        .map(|i| {
            let tid_bytes: [u8; 16] = trace_ids.value(i).try_into().expect("16 bytes");
            let sid_bytes: [u8; 8] = span_ids.value(i).try_into().expect("8 bytes");
            SpanLink {
                trace_id: TraceId::from_bytes(tid_bytes).to_hex(),
                span_id: SpanId::from_bytes(sid_bytes).to_hex(),
                trace_state: trace_states.value(i).to_string(),
                attributes: extract_attributes(attrs, i),
                dropped_attributes_count: dropped.value(i) as u32,
            }
        })
        .collect()
}

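/// Converts collected record batches into `FlatSpan`s, casting string columns to UTF-8 and
/// id columns to binary, and returning a `BatchConversion` error when a column is missing,
/// has an unexpected type, or an id has the wrong length.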
fn batches_to_flat_spans(batches: Vec<RecordBatch>) -> Result<Vec<FlatSpan>, TraceEngineError> {
    let mut result = Vec::new();

    for batch in &batches {
        let schema = batch.schema();

        macro_rules! col_idx {
            ($name:expr) => {
                schema.index_of($name).map_err(|_| {
                    TraceEngineError::BatchConversion(format!("Missing column: {}", $name))
                })?
            };
        }

        let trace_id_arr = cast(
            batch.column(col_idx!("trace_id")).as_ref(),
            &DataType::Binary,
        )
        .map_err(|e| TraceEngineError::BatchConversion(format!("trace_id cast: {e}")))?;
        let trace_id_col = trace_id_arr
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| TraceEngineError::BatchConversion("trace_id not BinaryArray".into()))?;

        let span_id_arr = cast(
            batch.column(col_idx!("span_id")).as_ref(),
            &DataType::Binary,
        )
        .map_err(|e| TraceEngineError::BatchConversion(format!("span_id cast: {e}")))?;
        let span_id_col = span_id_arr
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| TraceEngineError::BatchConversion("span_id not BinaryArray".into()))?;

        let parent_id_arr = cast(
            batch.column(col_idx!("parent_span_id")).as_ref(),
            &DataType::Binary,
        )
        .map_err(|e| TraceEngineError::BatchConversion(format!("parent_span_id cast: {e}")))?;
        let parent_id_col = parent_id_arr
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| {
                TraceEngineError::BatchConversion("parent_span_id not BinaryArray".into())
            })?;
        let svc_arr = cast(
            batch.column(col_idx!("service_name")).as_ref(),
            &DataType::Utf8,
        )
        .map_err(|e| TraceEngineError::BatchConversion(format!("service_name cast: {e}")))?;
        let svc_col = svc_arr
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| {
                TraceEngineError::BatchConversion("service_name not StringArray".into())
            })?;
        let span_name_arr = cast(
            batch.column(col_idx!("span_name")).as_ref(),
            &DataType::Utf8,
        )
        .map_err(|e| TraceEngineError::BatchConversion(format!("span_name cast: {e}")))?;
        let span_name_col = span_name_arr
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| TraceEngineError::BatchConversion("span_name not StringArray".into()))?;
        let span_kind_arr = cast(
            batch.column(col_idx!("span_kind")).as_ref(),
            &DataType::Utf8,
        )
        .map_err(|e| TraceEngineError::BatchConversion(format!("span_kind cast: {e}")))?;
        let span_kind_col = span_kind_arr
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| TraceEngineError::BatchConversion("span_kind not StringArray".into()))?;
        let start_col = batch
            .column(col_idx!("start_time"))
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .ok_or_else(|| TraceEngineError::BatchConversion("start_time not Timestamp".into()))?;
        let end_col = batch
            .column(col_idx!("end_time"))
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .ok_or_else(|| TraceEngineError::BatchConversion("end_time not Timestamp".into()))?;
        let dur_col = batch
            .column(col_idx!("duration_ms"))
            .as_any()
            .downcast_ref::<Int64Array>()
            .ok_or_else(|| TraceEngineError::BatchConversion("duration_ms not Int64".into()))?;
        let sc_col = batch
            .column(col_idx!("status_code"))
            .as_any()
            .downcast_ref::<Int32Array>()
            .ok_or_else(|| TraceEngineError::BatchConversion("status_code not Int32".into()))?;
        let sm_arr = cast(
            batch.column(col_idx!("status_message")).as_ref(),
            &DataType::Utf8,
        )
        .map_err(|e| TraceEngineError::BatchConversion(format!("status_message cast: {e}")))?;
        let sm_col = sm_arr
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| {
                TraceEngineError::BatchConversion("status_message not StringArray".into())
            })?;
        let attrs_col = batch
            .column(col_idx!("attributes"))
            .as_any()
            .downcast_ref::<MapArray>()
            .ok_or_else(|| TraceEngineError::BatchConversion("attributes not MapArray".into()))?;
        let events_col = batch
            .column(col_idx!("events"))
            .as_any()
            .downcast_ref::<ListArray>()
            .ok_or_else(|| TraceEngineError::BatchConversion("events not ListArray".into()))?;
        let links_col = batch
            .column(col_idx!("links"))
            .as_any()
            .downcast_ref::<ListArray>()
            .ok_or_else(|| TraceEngineError::BatchConversion("links not ListArray".into()))?;
        let input_arr = cast(batch.column(col_idx!("input")).as_ref(), &DataType::Utf8)
            .map_err(|e| TraceEngineError::BatchConversion(format!("input cast: {e}")))?;
        let input_col = input_arr
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| TraceEngineError::BatchConversion("input not StringArray".into()))?;
        let output_arr = cast(batch.column(col_idx!("output")).as_ref(), &DataType::Utf8)
            .map_err(|e| TraceEngineError::BatchConversion(format!("output cast: {e}")))?;
        let output_col = output_arr
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| TraceEngineError::BatchConversion("output not StringArray".into()))?;

        for i in 0..batch.num_rows() {
            let tid_bytes: [u8; 16] = trace_id_col
                .value(i)
                .try_into()
                .map_err(|_| TraceEngineError::BatchConversion("trace_id bad length".into()))?;
            let sid_bytes: [u8; 8] = span_id_col
                .value(i)
                .try_into()
                .map_err(|_| TraceEngineError::BatchConversion("span_id bad length".into()))?;

            let parent_id = if parent_id_col.is_null(i) {
                None
            } else {
                let bytes: [u8; 8] = parent_id_col.value(i).try_into().map_err(|_| {
                    TraceEngineError::BatchConversion("parent_span_id bad length".into())
                })?;
                Some(bytes)
            };

            let micros_start = start_col.value(i);
            let start_time = Utc
                .timestamp_opt(
                    micros_start / 1_000_000,
                    ((micros_start % 1_000_000) * 1_000) as u32,
                )
                .unwrap();
            let micros_end = end_col.value(i);
            let end_time = Utc
                .timestamp_opt(
                    micros_end / 1_000_000,
                    ((micros_end % 1_000_000) * 1_000) as u32,
                )
                .unwrap();

            let input = if input_col.is_null(i) {
                None
            } else {
                serde_json::from_str(input_col.value(i)).ok()
            };
            let output = if output_col.is_null(i) {
                None
            } else {
                serde_json::from_str(output_col.value(i)).ok()
            };

            result.push(FlatSpan {
                trace_id: tid_bytes,
                span_id: sid_bytes,
                parent_span_id: parent_id,
                service_name: svc_col.value(i).to_string(),
                span_name: span_name_col.value(i).to_string(),
                span_kind: if span_kind_col.is_null(i) {
                    None
                } else {
                    Some(span_kind_col.value(i).to_string())
                },
                start_time,
                end_time,
                duration_ms: dur_col.value(i),
                status_code: sc_col.value(i),
                status_message: if sm_col.is_null(i) {
                    None
                } else {
                    Some(sm_col.value(i).to_string())
                },
                attributes: extract_attributes(attrs_col, i),
                events: extract_events(events_col, i),
                links: extract_links(links_col, i),
                input,
                output,
            });
        }
    }

    Ok(result)
}

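/// Orders a flat list of spans into depth-first tree order. The span without a parent is
/// treated as the root (falling back to the first span when none is parentless); children
/// are visited in start-time order, and each `TraceSpan` records its depth, path of span
/// ids from the root, and overall visit order. Spans whose parent is missing from the
/// result are appended afterwards at depth 0 so no span is dropped.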
fn build_span_tree(spans: Vec<FlatSpan>) -> Vec<TraceSpan> {
    if spans.is_empty() {
        return Vec::new();
    }

    let root_span_id_hex = spans
        .iter()
        .find(|s| s.parent_span_id.is_none())
        .map(|s| SpanId::from_bytes(s.span_id).to_hex())
        .unwrap_or_else(|| SpanId::from_bytes(spans[0].span_id).to_hex());

    let mut children: HashMap<[u8; 8], Vec<usize>> = HashMap::new();
    let mut root_indices: Vec<usize> = Vec::new();

    for (i, span) in spans.iter().enumerate() {
        if let Some(pid) = span.parent_span_id {
            children.entry(pid).or_default().push(i);
        } else {
            root_indices.push(i);
        }
    }

    root_indices.sort_by_key(|&i| spans[i].start_time);

    let mut result: Vec<TraceSpan> = Vec::with_capacity(spans.len());
    let mut span_order: i32 = 0;

    dfs_assign(
        &root_indices,
        &spans,
        &children,
        0,
        Vec::new(),
        &root_span_id_hex,
        &mut result,
        &mut span_order,
    );

    let visited: std::collections::HashSet<[u8; 8]> = result
        .iter()
        .filter_map(|s| {
            let v = SpanId::hex_to_bytes(&s.span_id).ok()?;
            let arr: [u8; 8] = v.try_into().ok()?;
            Some(arr)
        })
        .collect();

    for span in spans.iter() {
        if !visited.contains(&span.span_id) {
            let span_id_hex = SpanId::from_bytes(span.span_id).to_hex();
            result.push(flat_to_trace_span(
                span,
                &span_id_hex,
                &root_span_id_hex,
                0,
                vec![span_id_hex.clone()],
                span_order,
            ));
            span_order += 1;
        }
    }

    result
}

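/// Recursive depth-first walk that appends each span (and then its children, sorted by
/// start time) to `result`, assigning depth, root-relative path, and a monotonically
/// increasing `span_order`.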
#[allow(clippy::too_many_arguments)]
fn dfs_assign(
    indices: &[usize],
    spans: &[FlatSpan],
    children: &HashMap<[u8; 8], Vec<usize>>,
    depth: i32,
    path_so_far: Vec<String>,
    root_span_id_hex: &str,
    result: &mut Vec<TraceSpan>,
    span_order: &mut i32,
) {
    for &idx in indices {
        let span = &spans[idx];
        let span_id_hex = SpanId::from_bytes(span.span_id).to_hex();

        let mut path = path_so_far.clone();
        path.push(span_id_hex.clone());

        result.push(flat_to_trace_span(
            span,
            &span_id_hex,
            root_span_id_hex,
            depth,
            path.clone(),
            *span_order,
        ));
        *span_order += 1;

        if let Some(child_indices) = children.get(&span.span_id) {
            let mut sorted = child_indices.clone();
            sorted.sort_by_key(|&i| spans[i].start_time);
            dfs_assign(
                &sorted,
                spans,
                children,
                depth + 1,
                path,
                root_span_id_hex,
                result,
                span_order,
            );
        }
    }
}

fn flat_to_trace_span(
    span: &FlatSpan,
    span_id_hex: &str,
    root_span_id_hex: &str,
    depth: i32,
    path: Vec<String>,
    span_order: i32,
) -> TraceSpan {
    TraceSpan {
        trace_id: TraceId::from_bytes(span.trace_id).to_hex(),
        span_id: span_id_hex.to_string(),
        parent_span_id: span.parent_span_id.map(|b| SpanId::from_bytes(b).to_hex()),
        span_name: span.span_name.clone(),
        span_kind: span.span_kind.clone(),
        start_time: span.start_time,
        end_time: span.end_time,
        duration_ms: span.duration_ms,
        status_code: span.status_code,
        status_message: span.status_message.clone(),
        attributes: span.attributes.clone(),
        events: span.events.clone(),
        links: span.links.clone(),
        depth,
        path,
        root_span_id: root_span_id_hex.to_string(),
        service_name: span.service_name.clone(),
        span_order,
        input: span.input.clone(),
        output: span.output.clone(),
    }
}

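/// Normalizes a user-supplied attribute filter into the `%key=value%` pattern matched
/// against the search-blob column. The first `:` is rewritten to `=` unless it looks like
/// a URL scheme separator (`://`), in which case the filter is kept verbatim.
///
/// For example (derived from the rules above): `"http.method:GET"` becomes
/// `"%http.method=GET%"`, while `"https://example.com"` stays `"%https://example.com%"`.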
pub(crate) fn normalize_attr_filter(filter: &str) -> String {
    let normalized = match filter.find(':') {
        Some(pos) if !filter[pos..].starts_with("://") => {
            format!("{}={}", &filter[..pos], &filter[pos + 1..])
        }
        _ => filter.to_string(),
    };
    format!("%{}%", normalized)
}

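/// Query layer over the span and trace-summary tables registered in the shared DataFusion
/// `SessionContext`, with short-lived in-memory caches for per-trace span trees and
/// aggregated metric buckets.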
pub struct TraceQueries {
    ctx: Arc<SessionContext>,
    span_cache: Cache<[u8; 16], Arc<Vec<TraceSpan>>>,
    metrics_cache: Cache<u64, Arc<Vec<TraceMetricBucket>>>,
}

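/// Hashes the full set of metric-query parameters into a single key so identical queries
/// can be served from the metrics cache.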
fn metrics_cache_key(
    service_name: Option<&str>,
    start_time: &DateTime<Utc>,
    end_time: &DateTime<Utc>,
    bucket_interval: &str,
    attribute_filters: Option<&[String]>,
    entity_uid: Option<&str>,
) -> u64 {
    let mut h = std::collections::hash_map::DefaultHasher::new();
    service_name.hash(&mut h);
    start_time.timestamp_micros().hash(&mut h);
    end_time.timestamp_micros().hash(&mut h);
    bucket_interval.hash(&mut h);
    attribute_filters.hash(&mut h);
    entity_uid.hash(&mut h);
    h.finish()
}

impl TraceQueries {
    pub fn new(ctx: Arc<SessionContext>) -> Self {
        let span_cache = Cache::builder()
            .max_capacity(1_000)
            .time_to_live(Duration::from_secs(300))
            .build();
        let metrics_cache = Cache::builder()
            .max_capacity(500)
            .time_to_live(Duration::from_secs(60))
            .build();
        Self {
            ctx,
            span_cache,
            metrics_cache,
        }
    }

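    /// Returns the span tree for the given filters, consulting the per-trace cache when a
    /// full 16-byte trace id is provided and caching the result on a miss; all other
    /// queries bypass the cache.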
    #[instrument(skip_all)]
    pub async fn get_trace_spans(
        &self,
        trace_id_bytes: Option<&[u8]>,
        service_name: Option<&str>,
        start_time: Option<&DateTime<Utc>>,
        end_time: Option<&DateTime<Utc>>,
        limit: Option<usize>,
    ) -> Result<Vec<TraceSpan>, TraceEngineError> {
        if let Some(tid) = trace_id_bytes {
            if let Ok(key) = <[u8; 16]>::try_from(tid) {
                if let Some(cached) = self.span_cache.get(&key) {
                    return Ok((*cached).clone());
                }

                let result = self
                    .query_spans(Some(tid), service_name, start_time, end_time, limit)
                    .await?;
                self.span_cache.insert(key, Arc::new(result.clone()));
                return Ok(result);
            }
        }

        self.query_spans(trace_id_bytes, service_name, start_time, end_time, limit)
            .await
    }

789
790 pub async fn query_spans(
792 &self,
793 trace_id_bytes: Option<&[u8]>,
794 service_name: Option<&str>,
795 start_time: Option<&DateTime<Utc>>,
796 end_time: Option<&DateTime<Utc>>,
797 limit: Option<usize>,
798 ) -> Result<Vec<TraceSpan>, TraceEngineError> {
799 let mut builder = TraceQueryBuilder::set_table(self.ctx.clone(), SPAN_TABLE_NAME).await?;
800
801 if let Some(start) = start_time {
804 builder = builder.add_filter(col(PARTITION_DATE_COL).gt_eq(date_lit(start)))?;
805 }
806 if let Some(end) = end_time {
807 builder = builder.add_filter(col(PARTITION_DATE_COL).lt_eq(date_lit(end)))?;
808 }
809
810 if let Some(start) = start_time {
813 builder = builder.add_filter(col(START_TIME_COL).gt_eq(ts_lit(start)))?;
814 }
815 if let Some(end) = end_time {
816 builder = builder.add_filter(col(START_TIME_COL).lt(ts_lit(end)))?;
817 }
818
819 if let Some(tid) = trace_id_bytes {
820 builder = builder.add_filter(col(TRACE_ID_COL).eq(lit(tid)))?;
821 }
822
823 if let Some(svc) = service_name {
824 builder = builder.add_filter(col(SERVICE_NAME_COL).eq(lit(svc)))?;
825 }
826
827 builder = builder.select_columns(SPAN_COLUMNS)?;
828
829 builder = builder.add_sort(vec![col(START_TIME_COL).sort(true, true)])?;
831 builder = builder.with_limit(limit)?;
832
833 let batches = builder.execute().await?;
834
835 info!(
836 "Queried {} raw spans across {} batches from Delta Lake",
837 batches.iter().map(|b| b.num_rows()).sum::<usize>(),
838 batches.len()
839 );
840
841 let flat_spans = batches_to_flat_spans(batches)?;
842 Ok(build_span_tree(flat_spans))
843 }
844
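    /// Computes per-bucket trace metrics (count, average and p50/p95/p99 durations, error
    /// rate) between `start_time` and `end_time`. Spans are first rolled up to one row per
    /// trace, optionally restricted to traces touching `entity_uid` or matching the
    /// attribute filters, then grouped by `date_trunc(bucket_interval, trace_start)`.
    /// Results are cached for a short TTL keyed by the full parameter set.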
    #[instrument(skip_all)]
    pub async fn get_trace_metrics(
        &self,
        service_name: Option<&str>,
        start_time: DateTime<Utc>,
        end_time: DateTime<Utc>,
        bucket_interval: &str,
        attribute_filters: Option<&[String]>,
        entity_uid: Option<&str>,
    ) -> Result<Vec<TraceMetricBucket>, TraceEngineError> {
        let cache_key = metrics_cache_key(
            service_name,
            &start_time,
            &end_time,
            bucket_interval,
            attribute_filters,
            entity_uid,
        );
        if let Some(cached) = self.metrics_cache.get(&cache_key) {
            return Ok((*cached).clone());
        }

        const VALID_INTERVALS: &[&str] =
            &["second", "minute", "hour", "day", "week", "month", "year"];
        if !VALID_INTERVALS.contains(&bucket_interval) {
            return Err(TraceEngineError::UnsupportedOperation(format!(
                "Invalid bucket_interval '{}'. Must be one of: {}",
                bucket_interval,
                VALID_INTERVALS.join(", ")
            )));
        }

        let mut spans_df = self
            .ctx
            .table(SPAN_TABLE_NAME)
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        spans_df = spans_df.filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&start_time)))?;
        spans_df = spans_df.filter(col(PARTITION_DATE_COL).lt_eq(date_lit(&end_time)))?;

        spans_df = spans_df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start_time)))?;
        spans_df = spans_df.filter(col(START_TIME_COL).lt(ts_lit(&end_time)))?;

        if let Some(uid) = entity_uid {
            let mut entity_df = self
                .ctx
                .table(SUMMARY_TABLE_NAME)
                .await
                .map_err(TraceEngineError::DatafusionError)?;

            entity_df = entity_df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start_time)))?;
            entity_df = entity_df.filter(col(START_TIME_COL).lt(ts_lit(&end_time)))?;
            entity_df = entity_df.filter(datafusion::functions_nested::expr_fn::array_has(
                col(ENTITY_IDS_COL),
                lit(uid),
            ))?;

            let entity_df = entity_df
                .select(vec![col(TRACE_ID_COL).alias("_entity_tid")])?
                .distinct()?;

            spans_df = spans_df.join(
                entity_df,
                JoinType::Inner,
                &[TRACE_ID_COL],
                &["_entity_tid"],
                None,
            )?;
        }

        use datafusion::functions::expr_fn::date_trunc;
        use datafusion::functions_aggregate::expr_fn::approx_percentile_cont;
        use datafusion::functions_aggregate::expr_fn::{avg, count, max, min};

        let root_service_case = when(
            col(PARENT_SPAN_ID_COL).is_null(),
            df_cast(col(SERVICE_NAME_COL), DataType::Utf8),
        )
        .end()?;

        let has_attr_filter = attribute_filters.is_some_and(|f| !f.is_empty());

        let mut agg_exprs: Vec<Expr> = vec![
            min(col(START_TIME_COL)).alias("trace_start"),
            max(col(END_TIME_COL)).alias("trace_end"),
            max(root_service_case).alias("root_service"),
            max(col(STATUS_CODE_COL)).alias("status_code"),
        ];

        if has_attr_filter {
            let filters = attribute_filters.unwrap();
            let mut match_expr: Option<Expr> = None;
            for f in filters {
                let pattern = normalize_attr_filter(f);
                let cond = match_attr_expr(col(SEARCH_BLOB_COL), lit(pattern));
                match_expr = Some(match match_expr {
                    None => cond,
                    Some(e) => e.or(cond),
                });
            }
            let attr_int = df_cast(match_expr.unwrap(), DataType::Int64);
            agg_exprs.push(max(attr_int).alias("attr_match"));
        }

        let mut trace_level_df = spans_df.aggregate(vec![col(TRACE_ID_COL)], agg_exprs)?;

        if has_attr_filter {
            trace_level_df = trace_level_df.filter(col("attr_match").eq(lit(1i64)))?;
        }

        let duration_expr = (df_cast(col("trace_end"), DataType::Int64)
            - df_cast(col("trace_start"), DataType::Int64))
            / lit(1000i64);

        let mut service_filtered_df = trace_level_df
            .filter(col("trace_end").is_not_null())?
            .with_column("duration_ms", duration_expr)?;

        if let Some(svc) = service_name {
            service_filtered_df = service_filtered_df.filter(col("root_service").eq(lit(svc)))?;
        }

        let bucket_expr = date_trunc(lit(bucket_interval), col("trace_start"));
        let bucketed_df = service_filtered_df.with_column("bucket_start", bucket_expr)?;

        let duration_f64 = df_cast(col("duration_ms"), DataType::Float64);
        let error_rate_case =
            when(col(STATUS_CODE_COL).eq(lit(2i32)), lit(1.0f64)).otherwise(lit(0.0f64))?;

        let final_df = bucketed_df
            .aggregate(
                vec![col("bucket_start")],
                vec![
                    count(lit(1i64)).alias("trace_count"),
                    avg(duration_f64.clone()).alias("avg_duration_ms"),
                    approx_percentile_cont(
                        duration_f64.clone().sort(true, false),
                        lit(0.50f64),
                        None,
                    )
                    .alias("p50_duration_ms"),
                    approx_percentile_cont(
                        duration_f64.clone().sort(true, false),
                        lit(0.95f64),
                        None,
                    )
                    .alias("p95_duration_ms"),
                    approx_percentile_cont(duration_f64.sort(true, false), lit(0.99f64), None)
                        .alias("p99_duration_ms"),
                    avg(error_rate_case).alias("error_rate"),
                ],
            )?
            .sort(vec![col("bucket_start").sort(true, true)])?;

        let batches = final_df
            .collect()
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        let mut metrics = Vec::new();
        for batch in &batches {
            let schema = batch.schema();

            let raw_bucket = batch.column(schema.index_of("bucket_start").unwrap());
            let bucket_arr = arrow::compute::cast(
                raw_bucket,
                &arrow::datatypes::DataType::Timestamp(
                    arrow::datatypes::TimeUnit::Microsecond,
                    Some("UTC".into()),
                ),
            )
            .map_err(|e| TraceEngineError::BatchConversion(format!("bucket_start cast: {}", e)))?;
            let bucket_col = bucket_arr
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .ok_or_else(|| TraceEngineError::BatchConversion("bucket_start".into()))?;
            let count_col = batch
                .column(schema.index_of("trace_count").unwrap())
                .as_any()
                .downcast_ref::<Int64Array>()
                .ok_or_else(|| TraceEngineError::BatchConversion("trace_count".into()))?;
            let avg_col = batch
                .column(schema.index_of("avg_duration_ms").unwrap())
                .as_any()
                .downcast_ref::<arrow::array::Float64Array>()
                .ok_or_else(|| TraceEngineError::BatchConversion("avg_duration_ms".into()))?;
            let p50_col = batch
                .column(schema.index_of("p50_duration_ms").unwrap())
                .as_any()
                .downcast_ref::<arrow::array::Float64Array>()
                .ok_or_else(|| TraceEngineError::BatchConversion("p50_duration_ms".into()))?;
            let p95_col = batch
                .column(schema.index_of("p95_duration_ms").unwrap())
                .as_any()
                .downcast_ref::<arrow::array::Float64Array>()
                .ok_or_else(|| TraceEngineError::BatchConversion("p95_duration_ms".into()))?;
            let p99_col = batch
                .column(schema.index_of("p99_duration_ms").unwrap())
                .as_any()
                .downcast_ref::<arrow::array::Float64Array>()
                .ok_or_else(|| TraceEngineError::BatchConversion("p99_duration_ms".into()))?;
            let err_col = batch
                .column(schema.index_of("error_rate").unwrap())
                .as_any()
                .downcast_ref::<arrow::array::Float64Array>()
                .ok_or_else(|| TraceEngineError::BatchConversion("error_rate".into()))?;

            for i in 0..batch.num_rows() {
                let micros = bucket_col.value(i);
                let bucket_start = DateTime::from_timestamp_micros(micros)
                    .unwrap_or_default()
                    .with_timezone(&Utc);

                metrics.push(TraceMetricBucket {
                    bucket_start,
                    trace_count: count_col.value(i),
                    avg_duration_ms: avg_col.value(i),
                    p50_duration_ms: if p50_col.is_null(i) {
                        None
                    } else {
                        Some(p50_col.value(i))
                    },
                    p95_duration_ms: if p95_col.is_null(i) {
                        None
                    } else {
                        Some(p95_col.value(i))
                    },
                    p99_duration_ms: if p99_col.is_null(i) {
                        None
                    } else {
                        Some(p99_col.value(i))
                    },
                    error_rate: err_col.value(i),
                });
            }
        }

        self.metrics_cache
            .insert(cache_key, Arc::new(metrics.clone()));
        Ok(metrics)
    }

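    /// Resolves `TraceFilters` against the trace-summary table (time range, service,
    /// error/status, entity and queue ids, and optionally an attribute-filter join back to
    /// the span table), picks the most recent matching trace, and returns that trace's
    /// spans as a depth-first tree. Returns an empty list when nothing matches.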
    pub async fn query_spans_from_trace_filters(
        &self,
        filters: &TraceFilters,
    ) -> Result<Vec<TraceSpan>, TraceEngineError> {
        let mut summary_df = self
            .ctx
            .table(SUMMARY_TABLE_NAME)
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        if let Some(start) = filters.start_time {
            summary_df = summary_df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start)))?;
        }
        if let Some(end) = filters.end_time {
            summary_df = summary_df.filter(col(START_TIME_COL).lt(ts_lit(&end)))?;
        }
        if let Some(ref svc) = filters.service_name {
            summary_df = summary_df.filter(col(SERVICE_NAME_COL).eq(lit(svc.as_str())))?;
        }
        match filters.has_errors {
            Some(true) => {
                summary_df = summary_df.filter(col(ERROR_COUNT_COL).gt(lit(0i64)))?;
            }
            Some(false) => {
                summary_df = summary_df.filter(col(ERROR_COUNT_COL).eq(lit(0i64)))?;
            }
            None => {}
        }
        if let Some(sc) = filters.status_code {
            summary_df = summary_df.filter(col(STATUS_CODE_COL).eq(lit(sc)))?;
        }
        if let Some(ref uid) = filters.entity_uid {
            summary_df = summary_df.filter(datafusion::functions_nested::expr_fn::array_has(
                col(ENTITY_IDS_COL),
                lit(uid.as_str()),
            ))?;
        }
        if let Some(ref uid) = filters.queue_uid {
            summary_df = summary_df.filter(datafusion::functions_nested::expr_fn::array_has(
                col(QUEUE_IDS_COL),
                lit(uid.as_str()),
            ))?;
        }

        if let Some(ref attr_filters) = filters.attribute_filters {
            if !attr_filters.is_empty() {
                let mut attr_df = self
                    .ctx
                    .table(SPAN_TABLE_NAME)
                    .await
                    .map_err(TraceEngineError::DatafusionError)?;

                if let Some(start) = filters.start_time {
                    attr_df = attr_df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start)))?;
                }
                if let Some(end) = filters.end_time {
                    attr_df = attr_df.filter(col(START_TIME_COL).lt(ts_lit(&end)))?;
                }

                let mut attr_expr: Option<Expr> = None;
                for f in attr_filters {
                    let pattern = normalize_attr_filter(f);
                    let cond = match_attr_expr(col(SEARCH_BLOB_COL), lit(pattern));
                    attr_expr = Some(match attr_expr {
                        None => cond,
                        Some(e) => e.or(cond),
                    });
                }
                if let Some(expr) = attr_expr {
                    attr_df = attr_df.filter(expr)?;
                }

                let attr_df = attr_df
                    .select(vec![col(TRACE_ID_COL).alias("_attr_tid")])?
                    .distinct()?;

                summary_df = summary_df.join(
                    attr_df,
                    JoinType::Inner,
                    &[TRACE_ID_COL],
                    &["_attr_tid"],
                    None,
                )?;
            }
        }

        let first_trace_df = summary_df
            .sort(vec![
                col(START_TIME_COL).sort(false, false),
                col(TRACE_ID_COL).sort(false, false),
            ])?
            .limit(0, Some(1))?
            .select(vec![col(TRACE_ID_COL).alias("_match_tid")])?;

        let mut spans_df = self
            .ctx
            .table(SPAN_TABLE_NAME)
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        if let Some(start) = filters.start_time {
            spans_df = spans_df.filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&start)))?;
            spans_df = spans_df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start)))?;
        }
        if let Some(end) = filters.end_time {
            spans_df = spans_df.filter(col(PARTITION_DATE_COL).lt_eq(date_lit(&end)))?;
            spans_df = spans_df.filter(col(START_TIME_COL).lt(ts_lit(&end)))?;
        }
        spans_df = spans_df.select_columns(SPAN_COLUMNS)?;
        spans_df = spans_df.sort(vec![col(START_TIME_COL).sort(true, true)])?;

        let result_df = spans_df.join(
            first_trace_df,
            JoinType::Inner,
            &[TRACE_ID_COL],
            &["_match_tid"],
            None,
        )?;

        let batches = result_df
            .collect()
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
            return Ok(Vec::new());
        }

        let flat_spans = batches_to_flat_spans(batches)?;
        Ok(build_span_tree(flat_spans))
    }
}
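
// A minimal test sketch (added here, not part of the original module) covering the pure
// helpers whose behavior is fully visible above: `normalize_attr_filter`'s key/value and
// URL handling, and the `date_lit` day arithmetic. Everything else needs a DataFusion
// session with registered tables, so it is better exercised by integration tests.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn normalize_attr_filter_rewrites_key_value_pairs() {
        // The first ':' becomes '=', and the pattern is wrapped in '%' for a LIKE match.
        assert_eq!(
            normalize_attr_filter("http.method:GET"),
            "%http.method=GET%"
        );
        // A URL scheme separator is left untouched.
        assert_eq!(
            normalize_attr_filter("https://example.com"),
            "%https://example.com%"
        );
        // Filters without ':' are only wrapped.
        assert_eq!(normalize_attr_filter("checkout"), "%checkout%");
    }

    #[test]
    fn date_lit_maps_unix_epoch_to_day_zero() {
        // 1970-01-01 should produce Date32(0) given UNIX_EPOCH_DAYS.
        let epoch = Utc.timestamp_opt(0, 0).unwrap();
        assert_eq!(date_lit(&epoch), lit(ScalarValue::Date32(Some(0))));
    }
}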