Skip to main content

scouter_dataframe/parquet/tracing/
span_view.rs

1//! Zero-copy view over Arrow RecordBatch for TraceSpan data
2//!
3//! This module provides efficient, zero-allocation access to trace span data
4//! by holding references directly to Arrow arrays. Allocations only happen
5//! during serialization (e.g., hex encoding IDs, JSON serialization).
6
7use arrow::array::*;
8use chrono::{DateTime, TimeZone, Utc};
9use scouter_types::{Attribute, SpanId, TraceId};
10use scouter_types::{SpanEvent, SpanLink};
11use serde::Serialize;
12use std::sync::Arc;
13use tracing::{error, instrument};
14
15pub fn extract_attributes_from_map(
16    array: &StructArray,
17    idx: usize,
18    column_name: &str,
19) -> Vec<Attribute> {
20    let attr_col = array.column_by_name(column_name);
21
22    if attr_col.is_none() {
23        return Vec::new();
24    }
25
26    let map_array = attr_col
27        .and_then(|col| col.as_any().downcast_ref::<MapArray>())
28        .expect("attributes should be MapArray");
29
30    if map_array.is_null(idx) {
31        return Vec::new();
32    }
33
34    let struct_array = map_array.value(idx);
35    let keys = struct_array.column(0).as_string::<i32>();
36    let values = struct_array.column(1).as_string::<i32>();
37
38    (0..struct_array.len())
39        .map(|i| Attribute {
40            key: keys.value(i).to_string(),
41            value: serde_json::from_str(values.value(i)).unwrap_or(serde_json::Value::Null),
42        })
43        .collect()
44}
45
/// Zero-copy view of a batch of trace spans backed by Arrow arrays
///
/// Benefits over owned TraceSpan:
/// - No string allocations until serialization
/// - No hex encoding overhead until needed
/// - Direct memory access to Arrow buffers
/// - Multiple spans share same Arrow array backing
///
/// Use case: Query millions of spans, serialize subset to API response
///
/// Every field holds one column of the source `RecordBatch`; row `i` of each
/// array describes span `i`. The columns are looked up by name in
/// `from_record_batch`.
#[derive(Clone)]
pub struct TraceSpanBatch {
    // Each column is held behind an Arc so the arrays stay alive for as long
    // as the batch (or any clone of it) exists.
    trace_ids: Arc<BinaryArray>, // IDs are BinaryArray: datafusion/deltalake read them back as binary
    span_ids: Arc<BinaryArray>,
    parent_span_ids: Arc<BinaryArray>,
    root_span_ids: Arc<BinaryArray>,
    span_names: Arc<StringArray>,
    service_names: Arc<StringArray>,
    span_kinds: Arc<StringArray>,
    start_times: Arc<TimestampMicrosecondArray>,
    end_times: Arc<TimestampMicrosecondArray>,
    durations: Arc<Int64Array>,
    status_codes: Arc<Int32Array>,
    status_messages: Arc<StringArray>,
    depths: Arc<Int32Array>,
    span_orders: Arc<Int32Array>,
    paths: Arc<ListArray>,
    attributes: Arc<MapArray>,
    events: Arc<ListArray>,
    links: Arc<ListArray>,
    inputs: Arc<StringArray>,
    outputs: Arc<StringArray>,

    // Number of rows (spans) in this batch; upper bound for valid indices
    len: usize,
}
82
83impl TraceSpanBatch {
84    /// Create a zero-copy view from a RecordBatch
85    ///
86    /// This doesn't allocate - just holds Arc references to arrays
87    #[instrument(skip_all)]
88    pub fn from_record_batch(batch: &RecordBatch) -> Result<Self, arrow::error::ArrowError> {
89        let schema = batch.schema();
90
91        // Macro to extract typed array with error handling
92        macro_rules! get_col {
93            ($name:expr, $type:ty) => {{
94                let idx = schema.index_of($name).inspect_err(|_| {
95                    error!("Column '{}' not found in batch schema", $name);
96                })?;
97                let array = batch.column(idx);
98                Arc::new(
99                    array
100                        .as_any()
101                        .downcast_ref::<$type>()
102                        .ok_or_else(|| {
103                            error!(
104                                "Column {} is not of expected type {}",
105                                $name,
106                                std::any::type_name::<$type>()
107                            );
108                            arrow::error::ArrowError::CastError(format!(
109                                "Column {} is not {}",
110                                $name,
111                                std::any::type_name::<$type>()
112                            ))
113                        })?
114                        .clone(),
115                )
116            }};
117        }
118
119        Ok(TraceSpanBatch {
120            trace_ids: get_col!("trace_id", BinaryArray),
121            span_ids: get_col!("span_id", BinaryArray),
122            parent_span_ids: get_col!("parent_span_id", BinaryArray),
123            root_span_ids: get_col!("root_span_id", BinaryArray),
124            span_names: get_col!("span_name", StringArray),
125            service_names: get_col!("service_name", StringArray),
126            span_kinds: get_col!("span_kind", StringArray),
127            start_times: get_col!("start_time", TimestampMicrosecondArray),
128            end_times: get_col!("end_time", TimestampMicrosecondArray),
129            durations: get_col!("duration_ms", Int64Array),
130            status_codes: get_col!("status_code", Int32Array),
131            status_messages: get_col!("status_message", StringArray),
132            depths: get_col!("depth", Int32Array),
133            span_orders: get_col!("span_order", Int32Array),
134            paths: get_col!("path", ListArray),
135            attributes: get_col!("attributes", MapArray),
136            events: get_col!("events", ListArray),
137            links: get_col!("links", ListArray),
138            inputs: get_col!("input", StringArray),
139            outputs: get_col!("output", StringArray),
140            len: batch.num_rows(),
141        })
142    }
143
144    /// Number of spans in this batch
145    pub fn len(&self) -> usize {
146        self.len
147    }
148
149    pub fn is_empty(&self) -> bool {
150        self.len == 0
151    }
152
153    /// Get a view of a single span (zero-copy)
154    pub fn get(&self, idx: usize) -> Option<TraceSpanView<'_>> {
155        if idx >= self.len {
156            return None;
157        }
158
159        Some(TraceSpanView { batch: self, idx })
160    }
161
162    /// Iterator over all spans in this batch (zero-copy)
163    pub fn iter(&self) -> TraceSpanIterator<'_> {
164        TraceSpanIterator {
165            batch: self,
166            idx: 0,
167        }
168    }
169}
170
/// Zero-copy view of a single span within a batch
///
/// This struct holds no span data itself - just a reference to the batch and
/// a row index, which is why it can be `Copy`. Accessors read from the batch
/// on demand; the ones that build owned values (hex strings, attribute,
/// event and link vectors) allocate when called.
#[derive(Clone, Copy)]
pub struct TraceSpanView<'a> {
    /// Batch whose columns the accessors read from.
    batch: &'a TraceSpanBatch,
    /// Row of this span within `batch`.
    idx: usize,
}
180
181impl<'a> TraceSpanView<'a> {
182    /// Get trace ID as raw bytes (zero-copy)
183    pub fn trace_id_bytes(&self) -> &[u8; 16] {
184        let bytes = self.batch.trace_ids.value(self.idx);
185        bytes.try_into().expect("Trace ID should be 16 bytes")
186    }
187
188    /// Get trace ID as hex string (allocates)
189    pub fn trace_id_hex(&self) -> String {
190        TraceId::from_bytes(*self.trace_id_bytes()).to_hex()
191    }
192
193    /// Get span ID as raw bytes (zero-copy)
194    pub fn span_id_bytes(&self) -> &[u8; 8] {
195        let bytes = self.batch.span_ids.value(self.idx);
196        bytes.try_into().expect("Span ID should be 8 bytes")
197    }
198
199    /// Get span ID as hex string (allocates)
200    pub fn span_id_hex(&self) -> String {
201        SpanId::from_bytes(*self.span_id_bytes()).to_hex()
202    }
203
204    /// Get parent span ID as raw bytes (zero-copy)
205    pub fn parent_span_id_bytes(&self) -> Option<&[u8; 8]> {
206        if self.batch.parent_span_ids.is_null(self.idx) {
207            None
208        } else {
209            let bytes = self.batch.parent_span_ids.value(self.idx);
210            Some(bytes.try_into().expect("Parent Span ID should be 8 bytes"))
211        }
212    }
213
214    /// Get parent span ID as hex string (allocates)
215    pub fn parent_span_id_hex(&self) -> Option<String> {
216        self.parent_span_id_bytes()
217            .map(|bytes| SpanId::from_bytes(*bytes).to_hex())
218    }
219
220    /// Get root span ID as raw bytes (zero-copy)
221    pub fn root_span_id_bytes(&self) -> &[u8; 8] {
222        let bytes = self.batch.root_span_ids.value(self.idx);
223        bytes.try_into().expect("Root Span ID should be 8 bytes")
224    }
225
226    /// Get root span ID as hex string (allocates)
227    pub fn root_span_id_hex(&self) -> String {
228        SpanId::from_bytes(*self.root_span_id_bytes()).to_hex()
229    }
230
231    /// Get span name as string slice (zero-copy)
232    pub fn span_name(&self) -> &str {
233        self.batch.span_names.value(self.idx)
234    }
235
236    /// Get service name as string slice (zero-copy)
237    pub fn service_name(&self) -> &str {
238        self.batch.service_names.value(self.idx)
239    }
240
241    /// Get span kind as string slice (zero-copy)
242    pub fn span_kind(&self) -> Option<&str> {
243        if self.batch.span_kinds.is_null(self.idx) {
244            None
245        } else {
246            Some(self.batch.span_kinds.value(self.idx))
247        }
248    }
249
250    /// Get start time as DateTime<Utc>
251    pub fn start_time(&self) -> DateTime<Utc> {
252        let micros = self.batch.start_times.value(self.idx);
253        let secs = micros / 1_000_000;
254        let nanos = ((micros % 1_000_000) * 1_000) as u32;
255        Utc.timestamp_opt(secs, nanos).unwrap()
256    }
257
258    /// Get end time as DateTime<Utc>
259    pub fn end_time(&self) -> DateTime<Utc> {
260        let micros = self.batch.end_times.value(self.idx);
261        let secs = micros / 1_000_000;
262        let nanos = ((micros % 1_000_000) * 1_000) as u32;
263        Utc.timestamp_opt(secs, nanos).unwrap()
264    }
265
266    /// Get duration in milliseconds
267    pub fn duration_ms(&self) -> i64 {
268        self.batch.durations.value(self.idx)
269    }
270
271    /// Get status code
272    pub fn status_code(&self) -> i32 {
273        self.batch.status_codes.value(self.idx)
274    }
275
276    /// Get status message (zero-copy)
277    pub fn status_message(&self) -> Option<&str> {
278        if self.batch.status_messages.is_null(self.idx) {
279            None
280        } else {
281            Some(self.batch.status_messages.value(self.idx))
282        }
283    }
284
285    /// Get span depth in tree
286    pub fn depth(&self) -> i32 {
287        self.batch.depths.value(self.idx)
288    }
289
290    /// Get span order (for tree traversal)
291    pub fn span_order(&self) -> i32 {
292        self.batch.span_orders.value(self.idx)
293    }
294
295    /// Get path as list of span IDs (returns iterator to avoid allocation)
296    pub fn path_iter(&self) -> impl Iterator<Item = &'a str> {
297        PathIterator::new(self.batch, self.idx)
298    }
299
300    /// Get input JSON (zero-copy string slice, parse on-demand)
301    pub fn input_json(&self) -> Option<&str> {
302        if self.batch.inputs.is_null(self.idx) {
303            None
304        } else {
305            Some(self.batch.inputs.value(self.idx))
306        }
307    }
308
309    /// Get output JSON (zero-copy string slice, parse on-demand)
310    pub fn output_json(&self) -> Option<&str> {
311        if self.batch.outputs.is_null(self.idx) {
312            None
313        } else {
314            Some(self.batch.outputs.value(self.idx))
315        }
316    }
317
318    /// Extract attributes as key-value pairs
319    pub fn attributes(&self) -> Vec<Attribute> {
320        if self.batch.attributes.is_null(self.idx) {
321            return Vec::new();
322        }
323
324        let struct_array = self.batch.attributes.value(self.idx);
325        let keys = struct_array.column(0).as_string::<i32>();
326        let values = struct_array.column(1).as_string::<i32>();
327        (0..struct_array.len())
328            .map(|i| Attribute {
329                key: keys.value(i).to_string(),
330                value: serde_json::from_str(values.value(i)).unwrap_or(serde_json::Value::Null),
331            })
332            .collect()
333    }
334
335    /// Extract events as structured data
336    pub fn events(&self) -> Vec<SpanEvent> {
337        if self.batch.events.is_null(self.idx) {
338            return Vec::new();
339        }
340
341        let array = self.batch.events.value(self.idx);
342        let event_list = array.as_struct();
343
344        (0..event_list.len())
345            .map(|i| SpanEventView::new(event_list, i).into_event())
346            .collect()
347    }
348
349    /// Extract links as structured data
350    pub fn links(&self) -> Vec<SpanLink> {
351        if self.batch.links.is_null(self.idx) {
352            return Vec::new();
353        }
354
355        let link_list = self.batch.links.value(self.idx);
356        let struct_array = link_list.as_struct();
357
358        (0..struct_array.len())
359            .map(|i| SpanLinkView::new(struct_array, i).into_link())
360            .collect()
361    }
362}
363
364/// Implement Serialize to convert directly from Arrow to JSON
365/// This is where allocations happen - only during serialization
366impl<'a> Serialize for TraceSpanView<'a> {
367    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
368    where
369        S: serde::Serializer,
370    {
371        use serde::ser::SerializeStruct;
372
373        let mut state = serializer.serialize_struct("TraceSpan", 19)?;
374
375        // Allocate hex strings only during serialization
376        state.serialize_field("trace_id", &self.trace_id_hex())?;
377        state.serialize_field("span_id", &self.span_id_hex())?;
378        state.serialize_field("parent_span_id", &self.parent_span_id_hex())?;
379        state.serialize_field("root_span_id", &self.root_span_id_hex())?;
380
381        // Zero-copy string slices
382        state.serialize_field("span_name", self.span_name())?;
383        state.serialize_field("service_name", self.service_name())?;
384        state.serialize_field("span_kind", &self.span_kind())?;
385
386        // Times
387        state.serialize_field("start_time", &self.start_time())?;
388        state.serialize_field("end_time", &self.end_time())?;
389        state.serialize_field("duration_ms", &self.duration_ms())?;
390
391        // Status
392        state.serialize_field("status_code", &self.status_code())?;
393        state.serialize_field("status_message", &self.status_message())?;
394
395        // Hierarchy
396        state.serialize_field("depth", &self.depth())?;
397        state.serialize_field("span_order", &self.span_order())?;
398
399        // Path (collect into Vec for serialization)
400        state.serialize_field("path", &self.path_iter().collect::<Vec<_>>())?;
401
402        // JSON fields (parse on-demand if needed, or serialize as raw string)
403        state.serialize_field("input", &self.input_json())?;
404        state.serialize_field("output", &self.output_json())?;
405
406        state.serialize_field("attributes", &self.attributes())?;
407        state.serialize_field("events", &self.events())?;
408        state.serialize_field("links", &self.links())?;
409
410        state.end()
411    }
412}
413
/// Iterator over spans in a batch
///
/// Yields one `TraceSpanView` per row, in row order.
pub struct TraceSpanIterator<'a> {
    /// Batch being iterated.
    batch: &'a TraceSpanBatch,
    /// Row index of the next span to yield.
    idx: usize,
}
419
420impl<'a> Iterator for TraceSpanIterator<'a> {
421    type Item = TraceSpanView<'a>;
422
423    fn next(&mut self) -> Option<Self::Item> {
424        if self.idx >= self.batch.len() {
425            return None;
426        }
427
428        let view = TraceSpanView {
429            batch: self.batch,
430            idx: self.idx,
431        };
432
433        self.idx += 1;
434        Some(view)
435    }
436
437    fn size_hint(&self) -> (usize, Option<usize>) {
438        let remaining = self.batch.len() - self.idx;
439        (remaining, Some(remaining))
440    }
441}
442
// `size_hint` reports identical lower and upper bounds, so the default
// `ExactSizeIterator::len` is exact.
impl<'a> ExactSizeIterator for TraceSpanIterator<'a> {}
444
/// Iterator over path elements
///
/// This iterator maintains a reference to the TraceSpanBatch, which ensures
/// the underlying Arrow arrays remain valid for the lifetime 'a. This allows
/// us to return string slices without unsafe code.
enum PathIterator<'a> {
    /// The span's path entry is null; the iterator yields nothing.
    Empty,
    /// The span has a path list; elements are yielded in order.
    NonEmpty {
        /// Batch that owns the path column.
        batch: &'a TraceSpanBatch,
        /// Row of the span whose path is being walked.
        span_idx: usize,
        /// Position of the next element within this span's path list.
        path_idx: usize,
        /// Number of elements in this span's path list.
        path_len: usize,
    },
}
459
460impl<'a> PathIterator<'a> {
461    fn new(batch: &'a TraceSpanBatch, span_idx: usize) -> Self {
462        // Check if this span has a path
463        if batch.paths.is_null(span_idx) {
464            return PathIterator::Empty;
465        }
466
467        // Get the length of the path list for this span
468        let path_len = batch.paths.value_length(span_idx) as usize;
469
470        PathIterator::NonEmpty {
471            batch,
472            span_idx,
473            path_idx: 0,
474            path_len,
475        }
476    }
477}
478
479impl<'a> Iterator for PathIterator<'a> {
480    type Item = &'a str;
481
482    fn next(&mut self) -> Option<Self::Item> {
483        match self {
484            PathIterator::Empty => None,
485            PathIterator::NonEmpty {
486                batch,
487                span_idx,
488                path_idx,
489                path_len,
490            } => {
491                if *path_idx >= *path_len {
492                    return None;
493                }
494
495                // Get the offset for this span's list in the flattened values array
496                let offset = batch.paths.value_offsets()[*span_idx] as usize;
497
498                // Get the underlying StringArray from the ListArray
499                let string_array = batch
500                    .paths
501                    .values()
502                    .as_any()
503                    .downcast_ref::<StringArray>()
504                    .expect("Path values should be StringArray");
505
506                // Calculate the actual index in the flattened array
507                let actual_idx = offset + *path_idx;
508
509                // Get the string value - this is safe because:
510                // 1. We hold a reference to the batch for lifetime 'a
511                // 2. The batch holds Arc<ListArray> which keeps the data alive
512                // 3. The returned &str is valid for as long as the batch reference is valid
513                let value = string_array.value(actual_idx);
514                *path_idx += 1;
515
516                Some(value)
517            }
518        }
519    }
520}
521
/// Zero-copy view of a span event
///
/// Refers to one row of a struct array of events; the accessors look up the
/// struct's child columns by name on each call.
pub struct SpanEventView<'a> {
    /// Struct array holding the events.
    array: &'a StructArray,
    /// Row of this event within `array`.
    idx: usize,
}
527
528impl<'a> SpanEventView<'a> {
529    fn new(array: &'a StructArray, idx: usize) -> Self {
530        Self { array, idx }
531    }
532
533    pub fn timestamp(&self) -> DateTime<Utc> {
534        let timestamp_array = self
535            .array
536            .column_by_name("timestamp")
537            .and_then(|col| col.as_any().downcast_ref::<TimestampMicrosecondArray>())
538            .expect("timestamp should be TimestampMicrosecondArray");
539
540        let micros = timestamp_array.value(self.idx);
541        let secs = micros / 1_000_000;
542        let nanos = ((micros % 1_000_000) * 1_000) as u32;
543        Utc.timestamp_opt(secs, nanos).unwrap()
544    }
545
546    pub fn name(&self) -> &str {
547        let name_array = self
548            .array
549            .column_by_name("name")
550            .and_then(|col| col.as_any().downcast_ref::<StringArray>())
551            .expect("name should be StringArray");
552        name_array.value(self.idx)
553    }
554
555    pub fn attributes(&self) -> Vec<Attribute> {
556        extract_attributes_from_map(self.array, self.idx, "attributes")
557    }
558
559    pub fn dropped_attributes_count(&self) -> u32 {
560        let count_array = self
561            .array
562            .column_by_name("dropped_attributes_count")
563            .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
564            .expect("dropped_attributes_count should be UInt32Array");
565        count_array.value(self.idx)
566    }
567
568    fn into_event(self) -> SpanEvent {
569        SpanEvent {
570            timestamp: self.timestamp(),
571            name: self.name().to_string(),
572            attributes: self.attributes(),
573            dropped_attributes_count: self.dropped_attributes_count(),
574        }
575    }
576}
577
/// Zero-copy view of a span link
///
/// Refers to one row of a struct array of links; the accessors look up the
/// struct's child columns by name on each call.
pub struct SpanLinkView<'a> {
    /// Struct array holding the links.
    array: &'a StructArray,
    /// Row of this link within `array`.
    idx: usize,
}
583
584impl<'a> SpanLinkView<'a> {
585    fn new(array: &'a StructArray, idx: usize) -> Self {
586        Self { array, idx }
587    }
588
589    pub fn trace_id(&self) -> String {
590        let trace_id_array = self
591            .array
592            .column_by_name("trace_id")
593            .map(|col| col.as_fixed_size_binary())
594            .expect("trace_id should be FixedSizeBinaryArray");
595
596        let bytes = trace_id_array.value(self.idx);
597        let bytes_array: [u8; 16] = bytes.try_into().expect("trace_id should be 16 bytes");
598        TraceId::from_bytes(bytes_array).to_hex()
599    }
600
601    pub fn span_id(&self) -> String {
602        let span_id_array = self
603            .array
604            .column_by_name("span_id")
605            .map(|col| col.as_fixed_size_binary())
606            .expect("span_id should be FixedSizeBinaryArray");
607
608        let bytes = span_id_array.value(self.idx);
609        let bytes_array: [u8; 8] = bytes.try_into().expect("span_id should be 8 bytes");
610        SpanId::from_bytes(bytes_array).to_hex()
611    }
612
613    pub fn trace_state(&self) -> &str {
614        let trace_state_array = self
615            .array
616            .column_by_name("trace_state")
617            .map(|col| col.as_string::<i32>())
618            .expect("trace_state should be StringArray");
619
620        if trace_state_array.is_null(self.idx) {
621            ""
622        } else {
623            trace_state_array.value(self.idx)
624        }
625    }
626
627    pub fn attributes(&self) -> Vec<Attribute> {
628        extract_attributes_from_map(self.array, self.idx, "attributes")
629    }
630
631    pub fn dropped_attributes_count(&self) -> u32 {
632        let count_array = self
633            .array
634            .column_by_name("dropped_attributes_count")
635            .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
636            .expect("dropped_attributes_count should be UInt32Array");
637        count_array.value(self.idx)
638    }
639
640    fn into_link(self) -> SpanLink {
641        SpanLink {
642            trace_id: self.trace_id(),
643            span_id: self.span_id(),
644            trace_state: self.trace_state().to_string(),
645            attributes: self.attributes(),
646            dropped_attributes_count: self.dropped_attributes_count(),
647        }
648    }
649}