// scouter_dataframe/parquet/tracing/span_view.rs
use arrow::array::*;
use chrono::{DateTime, TimeZone, Utc};
use scouter_types::{Attribute, SpanId, TraceId};
use scouter_types::{SpanEvent, SpanLink};
use serde::Serialize;
use std::sync::Arc;
use tracing::{error, instrument};

15pub fn extract_attributes_from_map(
16 array: &StructArray,
17 idx: usize,
18 column_name: &str,
19) -> Vec<Attribute> {
20 let attr_col = array.column_by_name(column_name);
21
22 if attr_col.is_none() {
23 return Vec::new();
24 }
25
26 let map_array = attr_col
27 .and_then(|col| col.as_any().downcast_ref::<MapArray>())
28 .expect("attributes should be MapArray");
29
30 if map_array.is_null(idx) {
31 return Vec::new();
32 }
33
34 let struct_array = map_array.value(idx);
35 let keys = struct_array.column(0).as_string::<i32>();
36 let values = struct_array.column(1).as_string::<i32>();
37
38 (0..struct_array.len())
39 .map(|i| Attribute {
40 key: keys.value(i).to_string(),
41 value: serde_json::from_str(values.value(i)).unwrap_or(serde_json::Value::Null),
42 })
43 .collect()
44}
45
46#[derive(Clone)]
56pub struct TraceSpanBatch {
57 trace_ids: Arc<BinaryArray>, span_ids: Arc<BinaryArray>,
60 parent_span_ids: Arc<BinaryArray>,
61 root_span_ids: Arc<BinaryArray>,
62 span_names: Arc<StringArray>,
63 service_names: Arc<StringArray>,
64 span_kinds: Arc<StringArray>,
65 start_times: Arc<TimestampMicrosecondArray>,
66 end_times: Arc<TimestampMicrosecondArray>,
67 durations: Arc<Int64Array>,
68 status_codes: Arc<Int32Array>,
69 status_messages: Arc<StringArray>,
70 depths: Arc<Int32Array>,
71 span_orders: Arc<Int32Array>,
72 paths: Arc<ListArray>,
73 attributes: Arc<MapArray>,
74 events: Arc<ListArray>,
75 links: Arc<ListArray>,
76 inputs: Arc<StringArray>,
77 outputs: Arc<StringArray>,
78
79 len: usize,
81}
82
83impl TraceSpanBatch {
84 #[instrument(skip_all)]
88 pub fn from_record_batch(batch: &RecordBatch) -> Result<Self, arrow::error::ArrowError> {
89 let schema = batch.schema();
90
91 macro_rules! get_col {
93 ($name:expr, $type:ty) => {{
94 let idx = schema.index_of($name).inspect_err(|_| {
95 error!("Column '{}' not found in batch schema", $name);
96 })?;
97 let array = batch.column(idx);
98 Arc::new(
99 array
100 .as_any()
101 .downcast_ref::<$type>()
102 .ok_or_else(|| {
103 error!(
104 "Column {} is not of expected type {}",
105 $name,
106 std::any::type_name::<$type>()
107 );
108 arrow::error::ArrowError::CastError(format!(
109 "Column {} is not {}",
110 $name,
111 std::any::type_name::<$type>()
112 ))
113 })?
114 .clone(),
115 )
116 }};
117 }
118
119 Ok(TraceSpanBatch {
120 trace_ids: get_col!("trace_id", BinaryArray),
121 span_ids: get_col!("span_id", BinaryArray),
122 parent_span_ids: get_col!("parent_span_id", BinaryArray),
123 root_span_ids: get_col!("root_span_id", BinaryArray),
124 span_names: get_col!("span_name", StringArray),
125 service_names: get_col!("service_name", StringArray),
126 span_kinds: get_col!("span_kind", StringArray),
127 start_times: get_col!("start_time", TimestampMicrosecondArray),
128 end_times: get_col!("end_time", TimestampMicrosecondArray),
129 durations: get_col!("duration_ms", Int64Array),
130 status_codes: get_col!("status_code", Int32Array),
131 status_messages: get_col!("status_message", StringArray),
132 depths: get_col!("depth", Int32Array),
133 span_orders: get_col!("span_order", Int32Array),
134 paths: get_col!("path", ListArray),
135 attributes: get_col!("attributes", MapArray),
136 events: get_col!("events", ListArray),
137 links: get_col!("links", ListArray),
138 inputs: get_col!("input", StringArray),
139 outputs: get_col!("output", StringArray),
140 len: batch.num_rows(),
141 })
142 }
143
144 pub fn len(&self) -> usize {
146 self.len
147 }
148
149 pub fn is_empty(&self) -> bool {
150 self.len == 0
151 }
152
153 pub fn get(&self, idx: usize) -> Option<TraceSpanView<'_>> {
155 if idx >= self.len {
156 return None;
157 }
158
159 Some(TraceSpanView { batch: self, idx })
160 }
161
162 pub fn iter(&self) -> TraceSpanIterator<'_> {
164 TraceSpanIterator {
165 batch: self,
166 idx: 0,
167 }
168 }
169}
170
171#[derive(Clone, Copy)]
176pub struct TraceSpanView<'a> {
177 batch: &'a TraceSpanBatch,
178 idx: usize,
179}
180
181impl<'a> TraceSpanView<'a> {
182 pub fn trace_id_bytes(&self) -> &[u8; 16] {
184 let bytes = self.batch.trace_ids.value(self.idx);
185 bytes.try_into().expect("Trace ID should be 16 bytes")
186 }
187
188 pub fn trace_id_hex(&self) -> String {
190 TraceId::from_bytes(*self.trace_id_bytes()).to_hex()
191 }
192
193 pub fn span_id_bytes(&self) -> &[u8; 8] {
195 let bytes = self.batch.span_ids.value(self.idx);
196 bytes.try_into().expect("Span ID should be 8 bytes")
197 }
198
199 pub fn span_id_hex(&self) -> String {
201 SpanId::from_bytes(*self.span_id_bytes()).to_hex()
202 }
203
204 pub fn parent_span_id_bytes(&self) -> Option<&[u8; 8]> {
206 if self.batch.parent_span_ids.is_null(self.idx) {
207 None
208 } else {
209 let bytes = self.batch.parent_span_ids.value(self.idx);
210 Some(bytes.try_into().expect("Parent Span ID should be 8 bytes"))
211 }
212 }
213
214 pub fn parent_span_id_hex(&self) -> Option<String> {
216 self.parent_span_id_bytes()
217 .map(|bytes| SpanId::from_bytes(*bytes).to_hex())
218 }
219
220 pub fn root_span_id_bytes(&self) -> &[u8; 8] {
222 let bytes = self.batch.root_span_ids.value(self.idx);
223 bytes.try_into().expect("Root Span ID should be 8 bytes")
224 }
225
226 pub fn root_span_id_hex(&self) -> String {
228 SpanId::from_bytes(*self.root_span_id_bytes()).to_hex()
229 }
230
231 pub fn span_name(&self) -> &str {
233 self.batch.span_names.value(self.idx)
234 }
235
236 pub fn service_name(&self) -> &str {
238 self.batch.service_names.value(self.idx)
239 }
240
241 pub fn span_kind(&self) -> Option<&str> {
243 if self.batch.span_kinds.is_null(self.idx) {
244 None
245 } else {
246 Some(self.batch.span_kinds.value(self.idx))
247 }
248 }
249
250 pub fn start_time(&self) -> DateTime<Utc> {
252 let micros = self.batch.start_times.value(self.idx);
253 let secs = micros / 1_000_000;
254 let nanos = ((micros % 1_000_000) * 1_000) as u32;
255 Utc.timestamp_opt(secs, nanos).unwrap()
256 }
257
258 pub fn end_time(&self) -> DateTime<Utc> {
260 let micros = self.batch.end_times.value(self.idx);
261 let secs = micros / 1_000_000;
262 let nanos = ((micros % 1_000_000) * 1_000) as u32;
263 Utc.timestamp_opt(secs, nanos).unwrap()
264 }
265
266 pub fn duration_ms(&self) -> i64 {
268 self.batch.durations.value(self.idx)
269 }
270
271 pub fn status_code(&self) -> i32 {
273 self.batch.status_codes.value(self.idx)
274 }
275
276 pub fn status_message(&self) -> Option<&str> {
278 if self.batch.status_messages.is_null(self.idx) {
279 None
280 } else {
281 Some(self.batch.status_messages.value(self.idx))
282 }
283 }
284
285 pub fn depth(&self) -> i32 {
287 self.batch.depths.value(self.idx)
288 }
289
290 pub fn span_order(&self) -> i32 {
292 self.batch.span_orders.value(self.idx)
293 }
294
295 pub fn path_iter(&self) -> impl Iterator<Item = &'a str> {
297 PathIterator::new(self.batch, self.idx)
298 }
299
300 pub fn input_json(&self) -> Option<&str> {
302 if self.batch.inputs.is_null(self.idx) {
303 None
304 } else {
305 Some(self.batch.inputs.value(self.idx))
306 }
307 }
308
309 pub fn output_json(&self) -> Option<&str> {
311 if self.batch.outputs.is_null(self.idx) {
312 None
313 } else {
314 Some(self.batch.outputs.value(self.idx))
315 }
316 }
317
318 pub fn attributes(&self) -> Vec<Attribute> {
320 if self.batch.attributes.is_null(self.idx) {
321 return Vec::new();
322 }
323
324 let struct_array = self.batch.attributes.value(self.idx);
325 let keys = struct_array.column(0).as_string::<i32>();
326 let values = struct_array.column(1).as_string::<i32>();
327 (0..struct_array.len())
328 .map(|i| Attribute {
329 key: keys.value(i).to_string(),
330 value: serde_json::from_str(values.value(i)).unwrap_or(serde_json::Value::Null),
331 })
332 .collect()
333 }
334
335 pub fn events(&self) -> Vec<SpanEvent> {
337 if self.batch.events.is_null(self.idx) {
338 return Vec::new();
339 }
340
341 let array = self.batch.events.value(self.idx);
342 let event_list = array.as_struct();
343
344 (0..event_list.len())
345 .map(|i| SpanEventView::new(event_list, i).into_event())
346 .collect()
347 }
348
349 pub fn links(&self) -> Vec<SpanLink> {
351 if self.batch.links.is_null(self.idx) {
352 return Vec::new();
353 }
354
355 let link_list = self.batch.links.value(self.idx);
356 let struct_array = link_list.as_struct();
357
358 (0..struct_array.len())
359 .map(|i| SpanLinkView::new(struct_array, i).into_link())
360 .collect()
361 }
362}
363
364impl<'a> Serialize for TraceSpanView<'a> {
367 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
368 where
369 S: serde::Serializer,
370 {
371 use serde::ser::SerializeStruct;
372
373 let mut state = serializer.serialize_struct("TraceSpan", 19)?;
374
375 state.serialize_field("trace_id", &self.trace_id_hex())?;
377 state.serialize_field("span_id", &self.span_id_hex())?;
378 state.serialize_field("parent_span_id", &self.parent_span_id_hex())?;
379 state.serialize_field("root_span_id", &self.root_span_id_hex())?;
380
381 state.serialize_field("span_name", self.span_name())?;
383 state.serialize_field("service_name", self.service_name())?;
384 state.serialize_field("span_kind", &self.span_kind())?;
385
386 state.serialize_field("start_time", &self.start_time())?;
388 state.serialize_field("end_time", &self.end_time())?;
389 state.serialize_field("duration_ms", &self.duration_ms())?;
390
391 state.serialize_field("status_code", &self.status_code())?;
393 state.serialize_field("status_message", &self.status_message())?;
394
395 state.serialize_field("depth", &self.depth())?;
397 state.serialize_field("span_order", &self.span_order())?;
398
399 state.serialize_field("path", &self.path_iter().collect::<Vec<_>>())?;
401
402 state.serialize_field("input", &self.input_json())?;
404 state.serialize_field("output", &self.output_json())?;
405
406 state.serialize_field("attributes", &self.attributes())?;
407 state.serialize_field("events", &self.events())?;
408 state.serialize_field("links", &self.links())?;
409
410 state.end()
411 }
412}
413
414pub struct TraceSpanIterator<'a> {
416 batch: &'a TraceSpanBatch,
417 idx: usize,
418}
419
420impl<'a> Iterator for TraceSpanIterator<'a> {
421 type Item = TraceSpanView<'a>;
422
423 fn next(&mut self) -> Option<Self::Item> {
424 if self.idx >= self.batch.len() {
425 return None;
426 }
427
428 let view = TraceSpanView {
429 batch: self.batch,
430 idx: self.idx,
431 };
432
433 self.idx += 1;
434 Some(view)
435 }
436
437 fn size_hint(&self) -> (usize, Option<usize>) {
438 let remaining = self.batch.len() - self.idx;
439 (remaining, Some(remaining))
440 }
441}
442
443impl<'a> ExactSizeIterator for TraceSpanIterator<'a> {}
444
445enum PathIterator<'a> {
451 Empty,
452 NonEmpty {
453 batch: &'a TraceSpanBatch,
454 span_idx: usize,
455 path_idx: usize,
456 path_len: usize,
457 },
458}
459
460impl<'a> PathIterator<'a> {
461 fn new(batch: &'a TraceSpanBatch, span_idx: usize) -> Self {
462 if batch.paths.is_null(span_idx) {
464 return PathIterator::Empty;
465 }
466
467 let path_len = batch.paths.value_length(span_idx) as usize;
469
470 PathIterator::NonEmpty {
471 batch,
472 span_idx,
473 path_idx: 0,
474 path_len,
475 }
476 }
477}
478
479impl<'a> Iterator for PathIterator<'a> {
480 type Item = &'a str;
481
482 fn next(&mut self) -> Option<Self::Item> {
483 match self {
484 PathIterator::Empty => None,
485 PathIterator::NonEmpty {
486 batch,
487 span_idx,
488 path_idx,
489 path_len,
490 } => {
491 if *path_idx >= *path_len {
492 return None;
493 }
494
495 let offset = batch.paths.value_offsets()[*span_idx] as usize;
497
498 let string_array = batch
500 .paths
501 .values()
502 .as_any()
503 .downcast_ref::<StringArray>()
504 .expect("Path values should be StringArray");
505
506 let actual_idx = offset + *path_idx;
508
509 let value = string_array.value(actual_idx);
514 *path_idx += 1;
515
516 Some(value)
517 }
518 }
519 }
520}
521
522pub struct SpanEventView<'a> {
524 array: &'a StructArray,
525 idx: usize,
526}
527
528impl<'a> SpanEventView<'a> {
529 fn new(array: &'a StructArray, idx: usize) -> Self {
530 Self { array, idx }
531 }
532
533 pub fn timestamp(&self) -> DateTime<Utc> {
534 let timestamp_array = self
535 .array
536 .column_by_name("timestamp")
537 .and_then(|col| col.as_any().downcast_ref::<TimestampMicrosecondArray>())
538 .expect("timestamp should be TimestampMicrosecondArray");
539
540 let micros = timestamp_array.value(self.idx);
541 let secs = micros / 1_000_000;
542 let nanos = ((micros % 1_000_000) * 1_000) as u32;
543 Utc.timestamp_opt(secs, nanos).unwrap()
544 }
545
546 pub fn name(&self) -> &str {
547 let name_array = self
548 .array
549 .column_by_name("name")
550 .and_then(|col| col.as_any().downcast_ref::<StringArray>())
551 .expect("name should be StringArray");
552 name_array.value(self.idx)
553 }
554
555 pub fn attributes(&self) -> Vec<Attribute> {
556 extract_attributes_from_map(self.array, self.idx, "attributes")
557 }
558
559 pub fn dropped_attributes_count(&self) -> u32 {
560 let count_array = self
561 .array
562 .column_by_name("dropped_attributes_count")
563 .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
564 .expect("dropped_attributes_count should be UInt32Array");
565 count_array.value(self.idx)
566 }
567
568 fn into_event(self) -> SpanEvent {
569 SpanEvent {
570 timestamp: self.timestamp(),
571 name: self.name().to_string(),
572 attributes: self.attributes(),
573 dropped_attributes_count: self.dropped_attributes_count(),
574 }
575 }
576}
577
578pub struct SpanLinkView<'a> {
580 array: &'a StructArray,
581 idx: usize,
582}
583
584impl<'a> SpanLinkView<'a> {
585 fn new(array: &'a StructArray, idx: usize) -> Self {
586 Self { array, idx }
587 }
588
589 pub fn trace_id(&self) -> String {
590 let trace_id_array = self
591 .array
592 .column_by_name("trace_id")
593 .map(|col| col.as_fixed_size_binary())
594 .expect("trace_id should be FixedSizeBinaryArray");
595
596 let bytes = trace_id_array.value(self.idx);
597 let bytes_array: [u8; 16] = bytes.try_into().expect("trace_id should be 16 bytes");
598 TraceId::from_bytes(bytes_array).to_hex()
599 }
600
601 pub fn span_id(&self) -> String {
602 let span_id_array = self
603 .array
604 .column_by_name("span_id")
605 .map(|col| col.as_fixed_size_binary())
606 .expect("span_id should be FixedSizeBinaryArray");
607
608 let bytes = span_id_array.value(self.idx);
609 let bytes_array: [u8; 8] = bytes.try_into().expect("span_id should be 8 bytes");
610 SpanId::from_bytes(bytes_array).to_hex()
611 }
612
613 pub fn trace_state(&self) -> &str {
614 let trace_state_array = self
615 .array
616 .column_by_name("trace_state")
617 .map(|col| col.as_string::<i32>())
618 .expect("trace_state should be StringArray");
619
620 if trace_state_array.is_null(self.idx) {
621 ""
622 } else {
623 trace_state_array.value(self.idx)
624 }
625 }
626
627 pub fn attributes(&self) -> Vec<Attribute> {
628 extract_attributes_from_map(self.array, self.idx, "attributes")
629 }
630
631 pub fn dropped_attributes_count(&self) -> u32 {
632 let count_array = self
633 .array
634 .column_by_name("dropped_attributes_count")
635 .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
636 .expect("dropped_attributes_count should be UInt32Array");
637 count_array.value(self.idx)
638 }
639
640 fn into_link(self) -> SpanLink {
641 SpanLink {
642 trace_id: self.trace_id(),
643 span_id: self.span_id(),
644 trace_state: self.trace_state().to_string(),
645 attributes: self.attributes(),
646 dropped_attributes_count: self.dropped_attributes_count(),
647 }
648 }
649}