// scouter_dataframe/parquet/tracing/span_view.rs
use arrow::array::*;
use chrono::{DateTime, TimeZone, Utc};
use scouter_types::{Attribute, SpanId, TraceId};
use scouter_types::{SpanEvent, SpanLink};
use serde::Serialize;
use std::sync::Arc;
use tracing::{error, instrument};

14pub fn extract_attributes_from_map(
15 array: &StructArray,
16 idx: usize,
17 column_name: &str,
18) -> Vec<Attribute> {
19 let attr_col = array.column_by_name(column_name);
20
21 if attr_col.is_none() {
22 return Vec::new();
23 }
24
25 let map_array = attr_col
26 .and_then(|col| col.as_any().downcast_ref::<MapArray>())
27 .expect("attributes should be MapArray");
28
29 if map_array.is_null(idx) {
30 return Vec::new();
31 }
32
33 let struct_array = map_array.value(idx);
34 let keys = struct_array.column(0).as_string::<i32>();
35 let values = struct_array.column(1).as_string::<i32>();
36
37 (0..struct_array.len())
38 .map(|i| Attribute {
39 key: keys.value(i).to_string(),
40 value: serde_json::from_str(values.value(i)).unwrap_or(serde_json::Value::Null),
41 })
42 .collect()
43}
44
45#[derive(Clone)]
50pub struct TraceSpanBatch {
51 trace_ids: Arc<BinaryArray>,
52 span_ids: Arc<BinaryArray>,
53 parent_span_ids: Arc<BinaryArray>,
54 flags: Arc<Int32Array>,
55 trace_states: Arc<StringArray>,
56 scope_names: Arc<StringArray>,
57 scope_versions: Arc<StringArray>,
58 span_names: Arc<StringArray>,
59 service_names: Arc<StringArray>,
60 span_kinds: Arc<StringArray>,
61 start_times: Arc<TimestampMicrosecondArray>,
62 end_times: Arc<TimestampMicrosecondArray>,
63 durations: Arc<Int64Array>,
64 status_codes: Arc<Int32Array>,
65 status_messages: Arc<StringArray>,
66 labels: Arc<StringArray>,
67 attributes: Arc<MapArray>,
68 events: Arc<ListArray>,
69 links: Arc<ListArray>,
70 inputs: Arc<StringArray>,
71 outputs: Arc<StringArray>,
72
73 len: usize,
74}
75
76impl TraceSpanBatch {
77 #[instrument(skip_all)]
79 pub fn from_record_batch(batch: &RecordBatch) -> Result<Self, arrow::error::ArrowError> {
80 let schema = batch.schema();
81
82 macro_rules! get_col {
83 ($name:expr, $type:ty) => {{
84 let idx = schema.index_of($name).inspect_err(|_| {
85 error!("Column '{}' not found in batch schema", $name);
86 })?;
87 let array = batch.column(idx);
88 Arc::new(
89 array
90 .as_any()
91 .downcast_ref::<$type>()
92 .ok_or_else(|| {
93 error!(
94 "Column {} is not of expected type {}",
95 $name,
96 std::any::type_name::<$type>()
97 );
98 arrow::error::ArrowError::CastError(format!(
99 "Column {} is not {}",
100 $name,
101 std::any::type_name::<$type>()
102 ))
103 })?
104 .clone(),
105 )
106 }};
107 }
108
109 Ok(TraceSpanBatch {
110 trace_ids: get_col!("trace_id", BinaryArray),
111 span_ids: get_col!("span_id", BinaryArray),
112 parent_span_ids: get_col!("parent_span_id", BinaryArray),
113 flags: get_col!("flags", Int32Array),
114 trace_states: get_col!("trace_state", StringArray),
115 scope_names: get_col!("scope_name", StringArray),
116 scope_versions: get_col!("scope_version", StringArray),
117 span_names: get_col!("span_name", StringArray),
118 service_names: get_col!("service_name", StringArray),
119 span_kinds: get_col!("span_kind", StringArray),
120 start_times: get_col!("start_time", TimestampMicrosecondArray),
121 end_times: get_col!("end_time", TimestampMicrosecondArray),
122 durations: get_col!("duration_ms", Int64Array),
123 status_codes: get_col!("status_code", Int32Array),
124 status_messages: get_col!("status_message", StringArray),
125 labels: get_col!("label", StringArray),
126 attributes: get_col!("attributes", MapArray),
127 events: get_col!("events", ListArray),
128 links: get_col!("links", ListArray),
129 inputs: get_col!("input", StringArray),
130 outputs: get_col!("output", StringArray),
131 len: batch.num_rows(),
132 })
133 }
134
135 pub fn len(&self) -> usize {
136 self.len
137 }
138
139 pub fn is_empty(&self) -> bool {
140 self.len == 0
141 }
142
143 pub fn get(&self, idx: usize) -> Option<TraceSpanView<'_>> {
144 if idx >= self.len {
145 return None;
146 }
147 Some(TraceSpanView { batch: self, idx })
148 }
149
150 pub fn iter(&self) -> TraceSpanIterator<'_> {
151 TraceSpanIterator {
152 batch: self,
153 idx: 0,
154 }
155 }
156}
157
158#[derive(Clone, Copy)]
160pub struct TraceSpanView<'a> {
161 batch: &'a TraceSpanBatch,
162 idx: usize,
163}
164
165impl<'a> TraceSpanView<'a> {
166 pub fn trace_id_bytes(&self) -> &[u8; 16] {
167 let bytes = self.batch.trace_ids.value(self.idx);
168 bytes.try_into().expect("Trace ID should be 16 bytes")
169 }
170
171 pub fn trace_id_hex(&self) -> String {
172 TraceId::from_bytes(*self.trace_id_bytes()).to_hex()
173 }
174
175 pub fn span_id_bytes(&self) -> &[u8; 8] {
176 let bytes = self.batch.span_ids.value(self.idx);
177 bytes.try_into().expect("Span ID should be 8 bytes")
178 }
179
180 pub fn span_id_hex(&self) -> String {
181 SpanId::from_bytes(*self.span_id_bytes()).to_hex()
182 }
183
184 pub fn parent_span_id_bytes(&self) -> Option<&[u8; 8]> {
185 if self.batch.parent_span_ids.is_null(self.idx) {
186 None
187 } else {
188 let bytes = self.batch.parent_span_ids.value(self.idx);
189 Some(bytes.try_into().expect("Parent Span ID should be 8 bytes"))
190 }
191 }
192
193 pub fn parent_span_id_hex(&self) -> Option<String> {
194 self.parent_span_id_bytes()
195 .map(|bytes| SpanId::from_bytes(*bytes).to_hex())
196 }
197
198 pub fn flags(&self) -> i32 {
199 self.batch.flags.value(self.idx)
200 }
201
202 pub fn trace_state(&self) -> &str {
203 self.batch.trace_states.value(self.idx)
204 }
205
206 pub fn scope_name(&self) -> &str {
207 self.batch.scope_names.value(self.idx)
208 }
209
210 pub fn scope_version(&self) -> Option<&str> {
211 if self.batch.scope_versions.is_null(self.idx) {
212 None
213 } else {
214 Some(self.batch.scope_versions.value(self.idx))
215 }
216 }
217
218 pub fn span_name(&self) -> &str {
219 self.batch.span_names.value(self.idx)
220 }
221
222 pub fn service_name(&self) -> &str {
223 self.batch.service_names.value(self.idx)
224 }
225
226 pub fn span_kind(&self) -> Option<&str> {
227 if self.batch.span_kinds.is_null(self.idx) {
228 None
229 } else {
230 Some(self.batch.span_kinds.value(self.idx))
231 }
232 }
233
234 pub fn start_time(&self) -> DateTime<Utc> {
235 let micros = self.batch.start_times.value(self.idx);
236 let secs = micros / 1_000_000;
237 let nanos = ((micros % 1_000_000) * 1_000) as u32;
238 Utc.timestamp_opt(secs, nanos).unwrap()
239 }
240
241 pub fn end_time(&self) -> DateTime<Utc> {
242 let micros = self.batch.end_times.value(self.idx);
243 let secs = micros / 1_000_000;
244 let nanos = ((micros % 1_000_000) * 1_000) as u32;
245 Utc.timestamp_opt(secs, nanos).unwrap()
246 }
247
248 pub fn duration_ms(&self) -> i64 {
249 self.batch.durations.value(self.idx)
250 }
251
252 pub fn status_code(&self) -> i32 {
253 self.batch.status_codes.value(self.idx)
254 }
255
256 pub fn status_message(&self) -> Option<&str> {
257 if self.batch.status_messages.is_null(self.idx) {
258 None
259 } else {
260 Some(self.batch.status_messages.value(self.idx))
261 }
262 }
263
264 pub fn label(&self) -> Option<&str> {
265 if self.batch.labels.is_null(self.idx) {
266 None
267 } else {
268 Some(self.batch.labels.value(self.idx))
269 }
270 }
271
272 pub fn input_json(&self) -> Option<&str> {
273 if self.batch.inputs.is_null(self.idx) {
274 None
275 } else {
276 Some(self.batch.inputs.value(self.idx))
277 }
278 }
279
280 pub fn output_json(&self) -> Option<&str> {
281 if self.batch.outputs.is_null(self.idx) {
282 None
283 } else {
284 Some(self.batch.outputs.value(self.idx))
285 }
286 }
287
288 pub fn attributes(&self) -> Vec<Attribute> {
289 if self.batch.attributes.is_null(self.idx) {
290 return Vec::new();
291 }
292 let struct_array = self.batch.attributes.value(self.idx);
293 let keys = struct_array.column(0).as_string::<i32>();
294 let values = struct_array.column(1).as_string::<i32>();
295 (0..struct_array.len())
296 .map(|i| Attribute {
297 key: keys.value(i).to_string(),
298 value: serde_json::from_str(values.value(i)).unwrap_or(serde_json::Value::Null),
299 })
300 .collect()
301 }
302
303 pub fn events(&self) -> Vec<SpanEvent> {
304 if self.batch.events.is_null(self.idx) {
305 return Vec::new();
306 }
307 let array = self.batch.events.value(self.idx);
308 let event_list = array.as_struct();
309 (0..event_list.len())
310 .map(|i| SpanEventView::new(event_list, i).into_event())
311 .collect()
312 }
313
314 pub fn links(&self) -> Vec<SpanLink> {
315 if self.batch.links.is_null(self.idx) {
316 return Vec::new();
317 }
318 let link_list = self.batch.links.value(self.idx);
319 let struct_array = link_list.as_struct();
320 (0..struct_array.len())
321 .map(|i| SpanLinkView::new(struct_array, i).into_link())
322 .collect()
323 }
324}
325
326impl<'a> Serialize for TraceSpanView<'a> {
327 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
328 where
329 S: serde::Serializer,
330 {
331 use serde::ser::SerializeStruct;
332
333 let mut state = serializer.serialize_struct("TraceSpanView", 21)?;
334
335 state.serialize_field("trace_id", &self.trace_id_hex())?;
336 state.serialize_field("span_id", &self.span_id_hex())?;
337 state.serialize_field("parent_span_id", &self.parent_span_id_hex())?;
338 state.serialize_field("flags", &self.flags())?;
339 state.serialize_field("trace_state", self.trace_state())?;
340 state.serialize_field("scope_name", self.scope_name())?;
341 state.serialize_field("scope_version", &self.scope_version())?;
342 state.serialize_field("span_name", self.span_name())?;
343 state.serialize_field("service_name", self.service_name())?;
344 state.serialize_field("span_kind", &self.span_kind())?;
345 state.serialize_field("start_time", &self.start_time())?;
346 state.serialize_field("end_time", &self.end_time())?;
347 state.serialize_field("duration_ms", &self.duration_ms())?;
348 state.serialize_field("status_code", &self.status_code())?;
349 state.serialize_field("status_message", &self.status_message())?;
350 state.serialize_field("label", &self.label())?;
351 state.serialize_field("input", &self.input_json())?;
352 state.serialize_field("output", &self.output_json())?;
353 state.serialize_field("attributes", &self.attributes())?;
354 state.serialize_field("events", &self.events())?;
355 state.serialize_field("links", &self.links())?;
356
357 state.end()
358 }
359}
360
361pub struct TraceSpanIterator<'a> {
362 batch: &'a TraceSpanBatch,
363 idx: usize,
364}
365
366impl<'a> Iterator for TraceSpanIterator<'a> {
367 type Item = TraceSpanView<'a>;
368
369 fn next(&mut self) -> Option<Self::Item> {
370 if self.idx >= self.batch.len() {
371 return None;
372 }
373 let view = TraceSpanView {
374 batch: self.batch,
375 idx: self.idx,
376 };
377 self.idx += 1;
378 Some(view)
379 }
380
381 fn size_hint(&self) -> (usize, Option<usize>) {
382 let remaining = self.batch.len() - self.idx;
383 (remaining, Some(remaining))
384 }
385}
386
387impl<'a> ExactSizeIterator for TraceSpanIterator<'a> {}
388
389pub struct SpanEventView<'a> {
390 array: &'a StructArray,
391 idx: usize,
392}
393
394impl<'a> SpanEventView<'a> {
395 fn new(array: &'a StructArray, idx: usize) -> Self {
396 Self { array, idx }
397 }
398
399 pub fn timestamp(&self) -> DateTime<Utc> {
400 let timestamp_array = self
401 .array
402 .column_by_name("timestamp")
403 .and_then(|col| col.as_any().downcast_ref::<TimestampMicrosecondArray>())
404 .expect("timestamp should be TimestampMicrosecondArray");
405
406 let micros = timestamp_array.value(self.idx);
407 let secs = micros / 1_000_000;
408 let nanos = ((micros % 1_000_000) * 1_000) as u32;
409 Utc.timestamp_opt(secs, nanos).unwrap()
410 }
411
412 pub fn name(&self) -> &str {
413 let name_array = self
414 .array
415 .column_by_name("name")
416 .and_then(|col| col.as_any().downcast_ref::<StringArray>())
417 .expect("name should be StringArray");
418 name_array.value(self.idx)
419 }
420
421 pub fn attributes(&self) -> Vec<Attribute> {
422 extract_attributes_from_map(self.array, self.idx, "attributes")
423 }
424
425 pub fn dropped_attributes_count(&self) -> u32 {
426 let count_array = self
427 .array
428 .column_by_name("dropped_attributes_count")
429 .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
430 .expect("dropped_attributes_count should be UInt32Array");
431 count_array.value(self.idx)
432 }
433
434 fn into_event(self) -> SpanEvent {
435 SpanEvent {
436 timestamp: self.timestamp(),
437 name: self.name().to_string(),
438 attributes: self.attributes(),
439 dropped_attributes_count: self.dropped_attributes_count(),
440 }
441 }
442}
443
444pub struct SpanLinkView<'a> {
445 array: &'a StructArray,
446 idx: usize,
447}
448
449impl<'a> SpanLinkView<'a> {
450 fn new(array: &'a StructArray, idx: usize) -> Self {
451 Self { array, idx }
452 }
453
454 pub fn trace_id(&self) -> String {
455 let trace_id_array = self
456 .array
457 .column_by_name("trace_id")
458 .map(|col| col.as_fixed_size_binary())
459 .expect("trace_id should be FixedSizeBinaryArray");
460
461 let bytes = trace_id_array.value(self.idx);
462 let bytes_array: [u8; 16] = bytes.try_into().expect("trace_id should be 16 bytes");
463 TraceId::from_bytes(bytes_array).to_hex()
464 }
465
466 pub fn span_id(&self) -> String {
467 let span_id_array = self
468 .array
469 .column_by_name("span_id")
470 .map(|col| col.as_fixed_size_binary())
471 .expect("span_id should be FixedSizeBinaryArray");
472
473 let bytes = span_id_array.value(self.idx);
474 let bytes_array: [u8; 8] = bytes.try_into().expect("span_id should be 8 bytes");
475 SpanId::from_bytes(bytes_array).to_hex()
476 }
477
478 pub fn trace_state(&self) -> &str {
479 let trace_state_array = self
480 .array
481 .column_by_name("trace_state")
482 .map(|col| col.as_string::<i32>())
483 .expect("trace_state should be StringArray");
484
485 if trace_state_array.is_null(self.idx) {
486 ""
487 } else {
488 trace_state_array.value(self.idx)
489 }
490 }
491
492 pub fn attributes(&self) -> Vec<Attribute> {
493 extract_attributes_from_map(self.array, self.idx, "attributes")
494 }
495
496 pub fn dropped_attributes_count(&self) -> u32 {
497 let count_array = self
498 .array
499 .column_by_name("dropped_attributes_count")
500 .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
501 .expect("dropped_attributes_count should be UInt32Array");
502 count_array.value(self.idx)
503 }
504
505 fn into_link(self) -> SpanLink {
506 SpanLink {
507 trace_id: self.trace_id(),
508 span_id: self.span_id(),
509 trace_state: self.trace_state().to_string(),
510 attributes: self.attributes(),
511 dropped_attributes_count: self.dropped_attributes_count(),
512 }
513 }
514}