Skip to main content

scouter_dataframe/parquet/tracing/
span_view.rs

1//! Zero-copy view over Arrow RecordBatch for TraceSpan data (flat, no hierarchy fields).
2//!
3//! Hierarchy fields (depth, span_order, path, root_span_id) are NOT stored in Delta Lake —
4//! they are computed at query time by `build_span_tree()` in `queries.rs`.
5
6use arrow::array::*;
7use chrono::{DateTime, TimeZone, Utc};
8use scouter_types::{Attribute, SpanId, TraceId};
9use scouter_types::{SpanEvent, SpanLink};
10use serde::Serialize;
11use std::sync::Arc;
12use tracing::{error, instrument};
13
14pub fn extract_attributes_from_map(
15    array: &StructArray,
16    idx: usize,
17    column_name: &str,
18) -> Vec<Attribute> {
19    let attr_col = array.column_by_name(column_name);
20
21    if attr_col.is_none() {
22        return Vec::new();
23    }
24
25    let map_array = attr_col
26        .and_then(|col| col.as_any().downcast_ref::<MapArray>())
27        .expect("attributes should be MapArray");
28
29    if map_array.is_null(idx) {
30        return Vec::new();
31    }
32
33    let struct_array = map_array.value(idx);
34    let keys = struct_array.column(0).as_string::<i32>();
35    let values = struct_array.column(1).as_string::<i32>();
36
37    (0..struct_array.len())
38        .map(|i| Attribute {
39            key: keys.value(i).to_string(),
40            value: serde_json::from_str(values.value(i)).unwrap_or(serde_json::Value::Null),
41        })
42        .collect()
43}
44
45/// Zero-copy view of a batch of trace spans backed by Arrow arrays.
46///
47/// Hierarchy fields are absent (they are computed at query time).
48/// Use `TraceQueries::get_trace_spans()` for the full `TraceSpan` type with hierarchy populated.
49#[derive(Clone)]
50pub struct TraceSpanBatch {
51    trace_ids: Arc<BinaryArray>,
52    span_ids: Arc<BinaryArray>,
53    parent_span_ids: Arc<BinaryArray>,
54    flags: Arc<Int32Array>,
55    trace_states: Arc<StringArray>,
56    scope_names: Arc<StringArray>,
57    scope_versions: Arc<StringArray>,
58    span_names: Arc<StringArray>,
59    service_names: Arc<StringArray>,
60    span_kinds: Arc<StringArray>,
61    start_times: Arc<TimestampMicrosecondArray>,
62    end_times: Arc<TimestampMicrosecondArray>,
63    durations: Arc<Int64Array>,
64    status_codes: Arc<Int32Array>,
65    status_messages: Arc<StringArray>,
66    labels: Arc<StringArray>,
67    attributes: Arc<MapArray>,
68    events: Arc<ListArray>,
69    links: Arc<ListArray>,
70    inputs: Arc<StringArray>,
71    outputs: Arc<StringArray>,
72
73    len: usize,
74}
75
76impl TraceSpanBatch {
77    /// Create a zero-copy view from a RecordBatch (new schema without hierarchy fields).
78    #[instrument(skip_all)]
79    pub fn from_record_batch(batch: &RecordBatch) -> Result<Self, arrow::error::ArrowError> {
80        let schema = batch.schema();
81
82        macro_rules! get_col {
83            ($name:expr, $type:ty) => {{
84                let idx = schema.index_of($name).inspect_err(|_| {
85                    error!("Column '{}' not found in batch schema", $name);
86                })?;
87                let array = batch.column(idx);
88                Arc::new(
89                    array
90                        .as_any()
91                        .downcast_ref::<$type>()
92                        .ok_or_else(|| {
93                            error!(
94                                "Column {} is not of expected type {}",
95                                $name,
96                                std::any::type_name::<$type>()
97                            );
98                            arrow::error::ArrowError::CastError(format!(
99                                "Column {} is not {}",
100                                $name,
101                                std::any::type_name::<$type>()
102                            ))
103                        })?
104                        .clone(),
105                )
106            }};
107        }
108
109        Ok(TraceSpanBatch {
110            trace_ids: get_col!("trace_id", BinaryArray),
111            span_ids: get_col!("span_id", BinaryArray),
112            parent_span_ids: get_col!("parent_span_id", BinaryArray),
113            flags: get_col!("flags", Int32Array),
114            trace_states: get_col!("trace_state", StringArray),
115            scope_names: get_col!("scope_name", StringArray),
116            scope_versions: get_col!("scope_version", StringArray),
117            span_names: get_col!("span_name", StringArray),
118            service_names: get_col!("service_name", StringArray),
119            span_kinds: get_col!("span_kind", StringArray),
120            start_times: get_col!("start_time", TimestampMicrosecondArray),
121            end_times: get_col!("end_time", TimestampMicrosecondArray),
122            durations: get_col!("duration_ms", Int64Array),
123            status_codes: get_col!("status_code", Int32Array),
124            status_messages: get_col!("status_message", StringArray),
125            labels: get_col!("label", StringArray),
126            attributes: get_col!("attributes", MapArray),
127            events: get_col!("events", ListArray),
128            links: get_col!("links", ListArray),
129            inputs: get_col!("input", StringArray),
130            outputs: get_col!("output", StringArray),
131            len: batch.num_rows(),
132        })
133    }
134
135    pub fn len(&self) -> usize {
136        self.len
137    }
138
139    pub fn is_empty(&self) -> bool {
140        self.len == 0
141    }
142
143    pub fn get(&self, idx: usize) -> Option<TraceSpanView<'_>> {
144        if idx >= self.len {
145            return None;
146        }
147        Some(TraceSpanView { batch: self, idx })
148    }
149
150    pub fn iter(&self) -> TraceSpanIterator<'_> {
151        TraceSpanIterator {
152            batch: self,
153            idx: 0,
154        }
155    }
156}
157
158/// Zero-copy view of a single span (no hierarchy fields).
159#[derive(Clone, Copy)]
160pub struct TraceSpanView<'a> {
161    batch: &'a TraceSpanBatch,
162    idx: usize,
163}
164
165impl<'a> TraceSpanView<'a> {
166    pub fn trace_id_bytes(&self) -> &[u8; 16] {
167        let bytes = self.batch.trace_ids.value(self.idx);
168        bytes.try_into().expect("Trace ID should be 16 bytes")
169    }
170
171    pub fn trace_id_hex(&self) -> String {
172        TraceId::from_bytes(*self.trace_id_bytes()).to_hex()
173    }
174
175    pub fn span_id_bytes(&self) -> &[u8; 8] {
176        let bytes = self.batch.span_ids.value(self.idx);
177        bytes.try_into().expect("Span ID should be 8 bytes")
178    }
179
180    pub fn span_id_hex(&self) -> String {
181        SpanId::from_bytes(*self.span_id_bytes()).to_hex()
182    }
183
184    pub fn parent_span_id_bytes(&self) -> Option<&[u8; 8]> {
185        if self.batch.parent_span_ids.is_null(self.idx) {
186            None
187        } else {
188            let bytes = self.batch.parent_span_ids.value(self.idx);
189            Some(bytes.try_into().expect("Parent Span ID should be 8 bytes"))
190        }
191    }
192
193    pub fn parent_span_id_hex(&self) -> Option<String> {
194        self.parent_span_id_bytes()
195            .map(|bytes| SpanId::from_bytes(*bytes).to_hex())
196    }
197
198    pub fn flags(&self) -> i32 {
199        self.batch.flags.value(self.idx)
200    }
201
202    pub fn trace_state(&self) -> &str {
203        self.batch.trace_states.value(self.idx)
204    }
205
206    pub fn scope_name(&self) -> &str {
207        self.batch.scope_names.value(self.idx)
208    }
209
210    pub fn scope_version(&self) -> Option<&str> {
211        if self.batch.scope_versions.is_null(self.idx) {
212            None
213        } else {
214            Some(self.batch.scope_versions.value(self.idx))
215        }
216    }
217
218    pub fn span_name(&self) -> &str {
219        self.batch.span_names.value(self.idx)
220    }
221
222    pub fn service_name(&self) -> &str {
223        self.batch.service_names.value(self.idx)
224    }
225
226    pub fn span_kind(&self) -> Option<&str> {
227        if self.batch.span_kinds.is_null(self.idx) {
228            None
229        } else {
230            Some(self.batch.span_kinds.value(self.idx))
231        }
232    }
233
234    pub fn start_time(&self) -> DateTime<Utc> {
235        let micros = self.batch.start_times.value(self.idx);
236        let secs = micros / 1_000_000;
237        let nanos = ((micros % 1_000_000) * 1_000) as u32;
238        Utc.timestamp_opt(secs, nanos).unwrap()
239    }
240
241    pub fn end_time(&self) -> DateTime<Utc> {
242        let micros = self.batch.end_times.value(self.idx);
243        let secs = micros / 1_000_000;
244        let nanos = ((micros % 1_000_000) * 1_000) as u32;
245        Utc.timestamp_opt(secs, nanos).unwrap()
246    }
247
248    pub fn duration_ms(&self) -> i64 {
249        self.batch.durations.value(self.idx)
250    }
251
252    pub fn status_code(&self) -> i32 {
253        self.batch.status_codes.value(self.idx)
254    }
255
256    pub fn status_message(&self) -> Option<&str> {
257        if self.batch.status_messages.is_null(self.idx) {
258            None
259        } else {
260            Some(self.batch.status_messages.value(self.idx))
261        }
262    }
263
264    pub fn label(&self) -> Option<&str> {
265        if self.batch.labels.is_null(self.idx) {
266            None
267        } else {
268            Some(self.batch.labels.value(self.idx))
269        }
270    }
271
272    pub fn input_json(&self) -> Option<&str> {
273        if self.batch.inputs.is_null(self.idx) {
274            None
275        } else {
276            Some(self.batch.inputs.value(self.idx))
277        }
278    }
279
280    pub fn output_json(&self) -> Option<&str> {
281        if self.batch.outputs.is_null(self.idx) {
282            None
283        } else {
284            Some(self.batch.outputs.value(self.idx))
285        }
286    }
287
288    pub fn attributes(&self) -> Vec<Attribute> {
289        if self.batch.attributes.is_null(self.idx) {
290            return Vec::new();
291        }
292        let struct_array = self.batch.attributes.value(self.idx);
293        let keys = struct_array.column(0).as_string::<i32>();
294        let values = struct_array.column(1).as_string::<i32>();
295        (0..struct_array.len())
296            .map(|i| Attribute {
297                key: keys.value(i).to_string(),
298                value: serde_json::from_str(values.value(i)).unwrap_or(serde_json::Value::Null),
299            })
300            .collect()
301    }
302
303    pub fn events(&self) -> Vec<SpanEvent> {
304        if self.batch.events.is_null(self.idx) {
305            return Vec::new();
306        }
307        let array = self.batch.events.value(self.idx);
308        let event_list = array.as_struct();
309        (0..event_list.len())
310            .map(|i| SpanEventView::new(event_list, i).into_event())
311            .collect()
312    }
313
314    pub fn links(&self) -> Vec<SpanLink> {
315        if self.batch.links.is_null(self.idx) {
316            return Vec::new();
317        }
318        let link_list = self.batch.links.value(self.idx);
319        let struct_array = link_list.as_struct();
320        (0..struct_array.len())
321            .map(|i| SpanLinkView::new(struct_array, i).into_link())
322            .collect()
323    }
324}
325
326impl<'a> Serialize for TraceSpanView<'a> {
327    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
328    where
329        S: serde::Serializer,
330    {
331        use serde::ser::SerializeStruct;
332
333        let mut state = serializer.serialize_struct("TraceSpanView", 21)?;
334
335        state.serialize_field("trace_id", &self.trace_id_hex())?;
336        state.serialize_field("span_id", &self.span_id_hex())?;
337        state.serialize_field("parent_span_id", &self.parent_span_id_hex())?;
338        state.serialize_field("flags", &self.flags())?;
339        state.serialize_field("trace_state", self.trace_state())?;
340        state.serialize_field("scope_name", self.scope_name())?;
341        state.serialize_field("scope_version", &self.scope_version())?;
342        state.serialize_field("span_name", self.span_name())?;
343        state.serialize_field("service_name", self.service_name())?;
344        state.serialize_field("span_kind", &self.span_kind())?;
345        state.serialize_field("start_time", &self.start_time())?;
346        state.serialize_field("end_time", &self.end_time())?;
347        state.serialize_field("duration_ms", &self.duration_ms())?;
348        state.serialize_field("status_code", &self.status_code())?;
349        state.serialize_field("status_message", &self.status_message())?;
350        state.serialize_field("label", &self.label())?;
351        state.serialize_field("input", &self.input_json())?;
352        state.serialize_field("output", &self.output_json())?;
353        state.serialize_field("attributes", &self.attributes())?;
354        state.serialize_field("events", &self.events())?;
355        state.serialize_field("links", &self.links())?;
356
357        state.end()
358    }
359}
360
361pub struct TraceSpanIterator<'a> {
362    batch: &'a TraceSpanBatch,
363    idx: usize,
364}
365
366impl<'a> Iterator for TraceSpanIterator<'a> {
367    type Item = TraceSpanView<'a>;
368
369    fn next(&mut self) -> Option<Self::Item> {
370        if self.idx >= self.batch.len() {
371            return None;
372        }
373        let view = TraceSpanView {
374            batch: self.batch,
375            idx: self.idx,
376        };
377        self.idx += 1;
378        Some(view)
379    }
380
381    fn size_hint(&self) -> (usize, Option<usize>) {
382        let remaining = self.batch.len() - self.idx;
383        (remaining, Some(remaining))
384    }
385}
386
387impl<'a> ExactSizeIterator for TraceSpanIterator<'a> {}
388
389pub struct SpanEventView<'a> {
390    array: &'a StructArray,
391    idx: usize,
392}
393
394impl<'a> SpanEventView<'a> {
395    fn new(array: &'a StructArray, idx: usize) -> Self {
396        Self { array, idx }
397    }
398
399    pub fn timestamp(&self) -> DateTime<Utc> {
400        let timestamp_array = self
401            .array
402            .column_by_name("timestamp")
403            .and_then(|col| col.as_any().downcast_ref::<TimestampMicrosecondArray>())
404            .expect("timestamp should be TimestampMicrosecondArray");
405
406        let micros = timestamp_array.value(self.idx);
407        let secs = micros / 1_000_000;
408        let nanos = ((micros % 1_000_000) * 1_000) as u32;
409        Utc.timestamp_opt(secs, nanos).unwrap()
410    }
411
412    pub fn name(&self) -> &str {
413        let name_array = self
414            .array
415            .column_by_name("name")
416            .and_then(|col| col.as_any().downcast_ref::<StringArray>())
417            .expect("name should be StringArray");
418        name_array.value(self.idx)
419    }
420
421    pub fn attributes(&self) -> Vec<Attribute> {
422        extract_attributes_from_map(self.array, self.idx, "attributes")
423    }
424
425    pub fn dropped_attributes_count(&self) -> u32 {
426        let count_array = self
427            .array
428            .column_by_name("dropped_attributes_count")
429            .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
430            .expect("dropped_attributes_count should be UInt32Array");
431        count_array.value(self.idx)
432    }
433
434    fn into_event(self) -> SpanEvent {
435        SpanEvent {
436            timestamp: self.timestamp(),
437            name: self.name().to_string(),
438            attributes: self.attributes(),
439            dropped_attributes_count: self.dropped_attributes_count(),
440        }
441    }
442}
443
444pub struct SpanLinkView<'a> {
445    array: &'a StructArray,
446    idx: usize,
447}
448
449impl<'a> SpanLinkView<'a> {
450    fn new(array: &'a StructArray, idx: usize) -> Self {
451        Self { array, idx }
452    }
453
454    pub fn trace_id(&self) -> String {
455        let trace_id_array = self
456            .array
457            .column_by_name("trace_id")
458            .map(|col| col.as_fixed_size_binary())
459            .expect("trace_id should be FixedSizeBinaryArray");
460
461        let bytes = trace_id_array.value(self.idx);
462        let bytes_array: [u8; 16] = bytes.try_into().expect("trace_id should be 16 bytes");
463        TraceId::from_bytes(bytes_array).to_hex()
464    }
465
466    pub fn span_id(&self) -> String {
467        let span_id_array = self
468            .array
469            .column_by_name("span_id")
470            .map(|col| col.as_fixed_size_binary())
471            .expect("span_id should be FixedSizeBinaryArray");
472
473        let bytes = span_id_array.value(self.idx);
474        let bytes_array: [u8; 8] = bytes.try_into().expect("span_id should be 8 bytes");
475        SpanId::from_bytes(bytes_array).to_hex()
476    }
477
478    pub fn trace_state(&self) -> &str {
479        let trace_state_array = self
480            .array
481            .column_by_name("trace_state")
482            .map(|col| col.as_string::<i32>())
483            .expect("trace_state should be StringArray");
484
485        if trace_state_array.is_null(self.idx) {
486            ""
487        } else {
488            trace_state_array.value(self.idx)
489        }
490    }
491
492    pub fn attributes(&self) -> Vec<Attribute> {
493        extract_attributes_from_map(self.array, self.idx, "attributes")
494    }
495
496    pub fn dropped_attributes_count(&self) -> u32 {
497        let count_array = self
498            .array
499            .column_by_name("dropped_attributes_count")
500            .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
501            .expect("dropped_attributes_count should be UInt32Array");
502        count_array.value(self.idx)
503    }
504
505    fn into_link(self) -> SpanLink {
506        SpanLink {
507            trace_id: self.trace_id(),
508            span_id: self.span_id(),
509            trace_state: self.trace_state().to_string(),
510            attributes: self.attributes(),
511            dropped_attributes_count: self.dropped_attributes_count(),
512        }
513    }
514}