Skip to main content

scouter_types/trace/
mod.rs

1pub mod sql;
2
3use crate::error::RecordError;
4use crate::otel_value_to_serde_value;
5use crate::PyHelperFuncs;
6use crate::{json_to_pyobject, json_to_pyobject_value};
7use chrono::DateTime;
8use chrono::Utc;
9use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
10use opentelemetry_proto::tonic::common::v1::AnyValue;
11use opentelemetry_proto::tonic::common::v1::KeyValue;
12use opentelemetry_proto::tonic::trace::v1::span::Event;
13use opentelemetry_proto::tonic::trace::v1::span::Link;
14use opentelemetry_proto::tonic::trace::v1::span::SpanKind;
15use pyo3::prelude::*;
16use pyo3::types::PyDict;
17use serde::{Deserialize, Serialize};
18use serde_json::Value;
19use std::collections::HashMap;
20use std::collections::HashSet;
21use std::fmt;
22
// String keys shared with the scouter tracer. Constants whose use is visible in this
// module are documented individually.
pub const FUNCTION_TYPE: &str = "function.type";
pub const FUNCTION_STREAMING: &str = "function.streaming";
pub const FUNCTION_NAME: &str = "function.name";
pub const FUNCTION_MODULE: &str = "function.module";
pub const FUNCTION_QUALNAME: &str = "function.qualname";
/// Span attribute whose string value is parsed as JSON into `TraceSpanRecord::input`.
pub const SCOUTER_TRACING_INPUT: &str = "scouter.tracing.input";
/// Span attribute whose string value is parsed as JSON into `TraceSpanRecord::output`.
pub const SCOUTER_TRACING_OUTPUT: &str = "scouter.tracing.output";
pub const SCOUTER_TRACING_LABEL: &str = "scouter.tracing.label";
/// Resource attribute read to determine the service name of the spans in a resource.
pub const SERVICE_NAME: &str = "service.name";
/// Tag key prefix (no trailing dot); used to rebuild the baggage key of a
/// baggage-propagated tag (`scouter.tracing.tag.<key>`).
pub const SCOUTER_TAG_PREFIX: &str = "scouter.tracing.tag";
/// Baggage key prefix (no trailing dot).
pub const BAGGAGE_PREFIX: &str = "baggage";
/// Span attribute holding the trace start time as a string parseable into `DateTime<Utc>`.
pub const TRACE_START_TIME_KEY: &str = "scouter.tracing.start_time";
pub const SCOUTER_SCOPE: &str = "scouter.scope";
/// Default scope name: `scouter.tracer.` followed by this crate's package version.
pub const SCOUTER_SCOPE_DEFAULT: &str = concat!("scouter.tracer.", env!("CARGO_PKG_VERSION"));
pub const SPAN_ERROR: &str = "span.error";
pub const EXCEPTION_TRACEBACK: &str = "exception.traceback";
pub const SCOUTER_QUEUE_RECORD: &str = "scouter.queue.record";
pub const SCOUTER_QUEUE_EVENT: &str = "scouter.queue.event";
pub const SCOUTER_ENTITY: &str = "scouter.entity";

// Prefix patterns (with trailing dot) used to classify span attributes as baggage,
// baggage-propagated tags, or plain tags. `concat!` only accepts literals, so these repeat
// the values of BAGGAGE_PREFIX and SCOUTER_TAG_PREFIX -- keep them in sync.
/// Matches regular baggage attributes: `baggage.<key>`.
pub const BAGGAGE_PATTERN: &str = "baggage.";
/// Matches baggage-propagated tags: `baggage.scouter.tracing.tag.<key>`.
pub const BAGGAGE_TAG_PATTERN: &str = concat!("baggage", ".", "scouter.tracing.tag", ".");
/// Matches tags set directly on a span: `scouter.tracing.tag.<key>`.
pub const TAG_PATTERN: &str = concat!("scouter.tracing.tag", ".");

/// Output of `TraceRecordExt::process_attributes`:
/// (cleaned span attributes, baggage records, tag records).
type SpanAttributes = (Vec<Attribute>, Vec<TraceBaggageRecord>, Vec<TagRecord>);
49
/// A scouter entity reference made of `uid`, `type` and `space` strings.
///
/// NOTE(review): not used elsewhere in this chunk; presumably the payload associated with
/// the `SCOUTER_ENTITY` key -- confirm against callers.
#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq, Eq, Hash)]
pub struct ScouterEntityAttribute {
    pub uid: String,
    // Raw identifier because `type` is a Rust keyword; serde strips the `r#`, so the
    // serialized field name is `type`.
    pub r#type: String,
    pub space: String,
}
56
57#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
58pub struct TraceId([u8; 16]);
59
60impl TraceId {
61    pub fn from_hex(hex: &str) -> Result<Self, hex::FromHexError> {
62        let mut bytes = [0u8; 16];
63        hex::decode_to_slice(hex, &mut bytes)?;
64        Ok(Self(bytes))
65    }
66
67    pub fn from_bytes(bytes: [u8; 16]) -> Self {
68        Self(bytes)
69    }
70
71    pub fn from_slice(slice: &[u8]) -> Result<Self, RecordError> {
72        if slice.len() != 16 {
73            return Err(RecordError::SliceError(format!(
74                "Invalid trace_id length: expected 16 bytes, got {}",
75                slice.len()
76            )));
77        }
78        let mut bytes = [0u8; 16];
79        bytes.copy_from_slice(slice);
80        Ok(Self(bytes))
81    }
82
83    pub fn to_hex(&self) -> String {
84        hex::encode(self.0)
85    }
86
87    pub fn as_bytes(&self) -> &[u8; 16] {
88        &self.0
89    }
90
91    pub fn hex_to_bytes(hex: &str) -> Result<Vec<u8>, RecordError> {
92        let bytes = hex::decode(hex)?;
93        // validate length for trace_id (16 bytes) or span_id (8 bytes)
94        if bytes.len() == 16 {
95            Ok(bytes)
96        } else {
97            Err(RecordError::SliceError(format!(
98                "Invalid hex string length: expected 16 or 8 bytes, got {}",
99                bytes.len()
100            )))
101        }
102    }
103}
104
105impl fmt::Display for TraceId {
106    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107        write!(f, "{}", self.to_hex())
108    }
109}
110
111impl Serialize for TraceId {
112    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
113    where
114        S: serde::Serializer,
115    {
116        serializer.serialize_str(&self.to_hex())
117    }
118}
119
120impl<'de> Deserialize<'de> for TraceId {
121    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
122    where
123        D: serde::Deserializer<'de>,
124    {
125        let hex = String::deserialize(deserializer)?;
126        TraceId::from_hex(&hex).map_err(serde::de::Error::custom)
127    }
128}
129
/// Decodes a Postgres binary column into a `TraceId`; the value must be exactly 16 bytes.
#[cfg(feature = "server")]
impl<'r> sqlx::Decode<'r, sqlx::Postgres> for TraceId {
    fn decode(value: sqlx::postgres::PgValueRef<'r>) -> Result<Self, sqlx::error::BoxDynError> {
        let raw = <&[u8] as sqlx::Decode<sqlx::Postgres>>::decode(value)?;
        let array: [u8; 16] = raw
            .try_into()
            .map_err(|_| "TraceId must be exactly 16 bytes")?;
        Ok(TraceId(array))
    }
}

/// Uses the same Postgres column type as `Vec<u8>`.
#[cfg(feature = "server")]
impl sqlx::Type<sqlx::Postgres> for TraceId {
    fn type_info() -> sqlx::postgres::PgTypeInfo {
        <Vec<u8> as sqlx::Type<sqlx::Postgres>>::type_info()
    }
}

/// Encodes the raw 16 bytes as a binary value.
#[cfg(feature = "server")]
impl sqlx::Encode<'_, sqlx::Postgres> for TraceId {
    fn encode_by_ref(
        &self,
        buf: &mut sqlx::postgres::PgArgumentBuffer,
    ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
        let raw: &[u8] = self.as_bytes();
        <&[u8] as sqlx::Encode<sqlx::Postgres>>::encode(raw, buf)
    }
}
159
160#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)]
161pub struct SpanId([u8; 8]);
162
163impl SpanId {
164    pub fn from_hex(hex: &str) -> Result<Self, hex::FromHexError> {
165        let mut bytes = [0u8; 8];
166        hex::decode_to_slice(hex, &mut bytes)?;
167        Ok(Self(bytes))
168    }
169
170    pub fn from_bytes(bytes: [u8; 8]) -> Self {
171        Self(bytes)
172    }
173
174    pub fn from_slice(slice: &[u8]) -> Result<Self, RecordError> {
175        if slice.len() != 8 {
176            return Err(RecordError::SliceError(format!(
177                "Invalid trace_id length: expected 8 bytes, got {}",
178                slice.len()
179            )));
180        }
181        let mut bytes = [0u8; 8];
182        bytes.copy_from_slice(slice);
183        Ok(Self(bytes))
184    }
185
186    pub fn to_hex(&self) -> String {
187        hex::encode(self.0)
188    }
189
190    pub fn as_bytes(&self) -> &[u8; 8] {
191        &self.0
192    }
193
194    pub fn hex_to_bytes(hex: &str) -> Result<Vec<u8>, RecordError> {
195        let bytes = hex::decode(hex)?;
196        if bytes.len() == 8 {
197            Ok(bytes)
198        } else {
199            Err(RecordError::SliceError(format!(
200                "Invalid hex string length: expected 16 or 8 bytes, got {}",
201                bytes.len()
202            )))
203        }
204    }
205}
206
207impl fmt::Display for SpanId {
208    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209        write!(f, "{}", self.to_hex())
210    }
211}
212
213impl Serialize for SpanId {
214    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
215    where
216        S: serde::Serializer,
217    {
218        serializer.serialize_str(&self.to_hex())
219    }
220}
221
222impl<'de> Deserialize<'de> for SpanId {
223    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
224    where
225        D: serde::Deserializer<'de>,
226    {
227        let hex = String::deserialize(deserializer)?;
228        SpanId::from_hex(&hex).map_err(serde::de::Error::custom)
229    }
230}
231
/// Decodes a Postgres binary column into a `SpanId`; the value must be exactly 8 bytes.
#[cfg(feature = "server")]
impl<'r> sqlx::Decode<'r, sqlx::Postgres> for SpanId {
    fn decode(value: sqlx::postgres::PgValueRef<'r>) -> Result<Self, sqlx::error::BoxDynError> {
        let raw = <&[u8] as sqlx::Decode<sqlx::Postgres>>::decode(value)?;
        let array: [u8; 8] = raw
            .try_into()
            .map_err(|_| "SpanId must be exactly 8 bytes")?;
        Ok(SpanId(array))
    }
}

/// Uses the same Postgres column type as `Vec<u8>`.
#[cfg(feature = "server")]
impl sqlx::Type<sqlx::Postgres> for SpanId {
    fn type_info() -> sqlx::postgres::PgTypeInfo {
        <Vec<u8> as sqlx::Type<sqlx::Postgres>>::type_info()
    }
}

/// Encodes the raw 8 bytes as a binary value.
#[cfg(feature = "server")]
impl sqlx::Encode<'_, sqlx::Postgres> for SpanId {
    fn encode_by_ref(
        &self,
        buf: &mut sqlx::postgres::PgArgumentBuffer,
    ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
        let raw: &[u8] = self.as_bytes();
        <&[u8] as sqlx::Encode<sqlx::Postgres>>::encode(raw, buf)
    }
}
261
/// Trace-level summary record, exposed to Python as a class with read-only attributes.
///
/// `trace_id` and `root_span_id` are not exposed as raw fields; Python reads them as hex
/// strings through the getters in this type's `#[pymethods]` block.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
pub struct TraceRecord {
    #[pyo3(get)]
    pub created_at: DateTime<Utc>,
    // Exposed to Python as hex via `get_trace_id`.
    pub trace_id: TraceId,
    #[pyo3(get)]
    pub service_name: String,
    #[pyo3(get)]
    pub scope_name: String,
    #[pyo3(get)]
    pub scope_version: Option<String>,
    #[pyo3(get)]
    pub trace_state: String,
    #[pyo3(get)]
    pub start_time: chrono::DateTime<Utc>,
    #[pyo3(get)]
    pub end_time: chrono::DateTime<Utc>,
    #[pyo3(get)]
    pub duration_ms: i64,
    #[pyo3(get)]
    pub status_code: i32,
    #[pyo3(get)]
    pub status_message: String,
    // Exposed to Python as hex via `get_root_span_id`.
    pub root_span_id: SpanId,
    #[pyo3(get)]
    pub span_count: i32,
    #[pyo3(get)]
    pub tags: Vec<Tag>,
    #[pyo3(get)]
    pub process_attributes: Vec<Attribute>,
}
294
295#[pymethods]
296impl TraceRecord {
297    pub fn __str__(&self) -> String {
298        PyHelperFuncs::__str__(self)
299    }
300
301    #[getter]
302    pub fn get_trace_id(&self) -> String {
303        self.trace_id.to_hex()
304    }
305
306    #[getter]
307    pub fn get_root_span_id(&self) -> String {
308        self.root_span_id.to_hex()
309    }
310}
311
/// One span row, built by `TraceServerRecord::to_records` from an OTLP span and exposed
/// to Python as a class with read-only attributes.
///
/// Ids and the input/output payloads are not exposed as raw fields; Python reads them
/// through the getters in this type's `#[pymethods]` block.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
pub struct TraceSpanRecord {
    /// Set to the span start time by `to_records`.
    #[pyo3(get)]
    pub created_at: chrono::DateTime<Utc>,

    // core identifiers
    pub trace_id: TraceId,
    pub span_id: SpanId,
    // `None` when the OTLP span carried an empty parent span id.
    pub parent_span_id: Option<SpanId>,

    // W3C Trace Context fields
    #[pyo3(get)]
    pub flags: i32,
    #[pyo3(get)]
    pub trace_state: String,

    // instrumentation
    /// Instrumentation scope name (`"unknown"` when the scope or its name is missing).
    #[pyo3(get)]
    pub scope_name: String,
    #[pyo3(get)]
    pub scope_version: Option<String>,

    // Span metadata
    #[pyo3(get)]
    pub span_name: String,
    /// OTel span kind name without the `SPAN_KIND_` prefix; `UNSPECIFIED` if unrecognized.
    #[pyo3(get)]
    pub span_kind: String,

    // Temporal data
    #[pyo3(get)]
    pub start_time: chrono::DateTime<Utc>,
    #[pyo3(get)]
    pub end_time: chrono::DateTime<Utc>,
    /// Whole milliseconds from start to end; `to_records` stores 0 when end precedes start.
    #[pyo3(get)]
    pub duration_ms: i64,

    // Status
    #[pyo3(get)]
    pub status_code: i32,
    #[pyo3(get)]
    pub status_message: String,

    // Semi-structured data
    /// Span attributes with baggage removed and tag prefixes stripped.
    #[pyo3(get)]
    pub attributes: Vec<Attribute>,
    #[pyo3(get)]
    pub events: Vec<SpanEvent>,
    #[pyo3(get)]
    pub links: Vec<SpanLink>,

    // Scouter-specific fields
    #[pyo3(get)]
    pub label: Option<String>,
    // JSON parsed from the `scouter.tracing.input` attribute (`Null` when absent).
    pub input: Value,
    // JSON parsed from the `scouter.tracing.output` attribute (`Null` when absent).
    pub output: Value,

    // Service reference (denormalized for query performance)
    #[pyo3(get)]
    pub service_name: String,
    #[pyo3(get)]
    pub resource_attributes: Vec<Attribute>,
}
375
376#[pymethods]
377impl TraceSpanRecord {
378    #[getter]
379    pub fn get_trace_id(&self) -> String {
380        self.trace_id.to_hex()
381    }
382
383    #[getter]
384    pub fn get_span_id(&self) -> String {
385        self.span_id.to_hex()
386    }
387
388    #[getter]
389    pub fn get_parent_span_id(&self) -> Option<String> {
390        self.parent_span_id.as_ref().map(|id| id.to_hex())
391    }
392
393    #[getter]
394    pub fn get_input<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyDict>, RecordError> {
395        let dict = PyDict::new(py);
396        match &self.input {
397            Value::Null => {}
398            _ => {
399                json_to_pyobject(py, &self.input, &dict)?;
400            }
401        }
402        Ok(dict)
403    }
404
405    #[getter]
406    pub fn get_output<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyDict>, RecordError> {
407        let dict = PyDict::new(py);
408        match &self.output {
409            Value::Null => {}
410            _ => {
411                json_to_pyobject(py, &self.output, &dict)?;
412            }
413        }
414        Ok(dict)
415    }
416
417    pub fn __str__(&self) -> String {
418        // serialize the struct to a string
419        PyHelperFuncs::__str__(self)
420    }
421}
422
/// A single baggage key/value entry attached to a trace.
///
/// Built from `baggage.`-prefixed span attributes by `TraceRecordExt::process_attributes`
/// and `TraceServerRecord::convert_to_baggage_records`.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
#[cfg_attr(feature = "server", derive(sqlx::FromRow))]
pub struct TraceBaggageRecord {
    #[pyo3(get)]
    pub created_at: DateTime<Utc>,
    // Exposed to Python as hex via `get_trace_id`.
    pub trace_id: TraceId,
    /// Instrumentation scope name of the span the baggage was read from.
    #[pyo3(get)]
    pub scope: String,
    /// Baggage key with the `baggage.` prefix removed.
    #[pyo3(get)]
    pub key: String,
    /// Baggage value rendered as a string.
    #[pyo3(get)]
    pub value: String,
}
437
438#[pymethods]
439impl TraceBaggageRecord {
440    pub fn __str__(&self) -> String {
441        PyHelperFuncs::__str__(self)
442    }
443
444    #[getter]
445    pub fn get_trace_id(&self) -> String {
446        self.trace_id.to_hex()
447    }
448}
449
/// Output of `TraceServerRecord::to_records`: (span records, baggage records, tag records).
pub type TraceRecords = (
    Vec<TraceSpanRecord>,
    Vec<TraceBaggageRecord>,
    Vec<TagRecord>,
);
455
456pub trait TraceRecordExt {
457    fn keyvalue_to_json_array<T: Serialize>(attributes: &Vec<T>) -> Result<Value, RecordError> {
458        Ok(serde_json::to_value(attributes).unwrap_or(Value::Array(vec![])))
459    }
460
461    fn process_attributes(
462        trace_id: &TraceId,
463        span_attributes: &[KeyValue],
464        scope: &str,
465        created_at: DateTime<Utc>,
466    ) -> Result<SpanAttributes, RecordError> {
467        let mut cleaned_attributes = Vec::with_capacity(span_attributes.len());
468        let mut baggage_records = Vec::new();
469        let mut tags = Vec::new();
470        let scope_owned = scope.to_string();
471
472        for kv in span_attributes {
473            let key = &kv.key;
474
475            // Check if this is a baggage-prefixed tag
476            if let Some(tag_key) = key.strip_prefix(BAGGAGE_TAG_PATTERN) {
477                if !tag_key.is_empty() {
478                    // tag values are stored as strings for tag table
479                    let string_value = match &kv.value {
480                        Some(v) => Self::otel_value_to_string(v),
481                        None => "null".to_string(),
482                    };
483
484                    // Extract as a tag
485                    tags.push(TagRecord::from_trace(
486                        trace_id,
487                        tag_key.to_string(),
488                        string_value.clone(),
489                    ));
490
491                    // Store cleaned attribute with stripped key
492                    cleaned_attributes.push(Attribute {
493                        key: tag_key.to_string(),
494                        value: Value::String(string_value.clone()),
495                    });
496
497                    // Also extract as baggage since it has baggage prefix
498                    baggage_records.push(TraceBaggageRecord {
499                        created_at,
500                        trace_id: *trace_id,
501                        scope: scope_owned.clone(),
502                        key: format!("{}.{}", SCOUTER_TAG_PREFIX, tag_key),
503                        value: string_value,
504                    });
505                } else {
506                    tracing::warn!(
507                        attribute_key = %key,
508                        "Skipping baggage tag with empty key after prefix removal"
509                    );
510                }
511            }
512            // Check for non-baggage tags
513            else if let Some(tag_key) = key.strip_prefix(TAG_PATTERN) {
514                // tag values are stored as strings for tag table
515                if !tag_key.is_empty() {
516                    let string_value = match &kv.value {
517                        Some(v) => Self::otel_value_to_string(v),
518                        None => "null".to_string(),
519                    };
520
521                    tags.push(TagRecord::from_trace(
522                        trace_id,
523                        tag_key.to_string(),
524                        string_value.clone(),
525                    ));
526
527                    cleaned_attributes.push(Attribute {
528                        key: tag_key.to_string(),
529                        value: Value::String(string_value.clone()),
530                    });
531                } else {
532                    tracing::warn!(
533                        attribute_key = %key,
534                        "Skipping tag with empty key after prefix removal"
535                    );
536                }
537            }
538            // Check for regular baggage (not tags)
539            else if key.starts_with(BAGGAGE_PATTERN) {
540                let clean_key = key
541                    .strip_prefix(BAGGAGE_PATTERN)
542                    .unwrap_or(key)
543                    .trim()
544                    .to_string();
545
546                let string_value = match &kv.value {
547                    Some(v) => Self::otel_value_to_string(v),
548                    None => "null".to_string(),
549                };
550
551                baggage_records.push(TraceBaggageRecord {
552                    created_at,
553                    trace_id: *trace_id,
554                    scope: scope_owned.clone(),
555                    key: clean_key,
556                    value: string_value,
557                });
558            }
559            // Regular attribute
560            else {
561                let value = match &kv.value {
562                    Some(v) => otel_value_to_serde_value(v),
563                    None => Value::Null,
564                };
565
566                cleaned_attributes.push(Attribute {
567                    key: key.clone(),
568                    value,
569                });
570            }
571        }
572
573        Ok((cleaned_attributes, baggage_records, tags))
574    }
575
576    fn otel_value_to_string(value: &AnyValue) -> String {
577        match &value.value {
578            Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) => {
579                s.clone()
580            }
581            Some(opentelemetry_proto::tonic::common::v1::any_value::Value::IntValue(i)) => {
582                i.to_string()
583            }
584            Some(opentelemetry_proto::tonic::common::v1::any_value::Value::DoubleValue(d)) => {
585                d.to_string()
586            }
587            Some(opentelemetry_proto::tonic::common::v1::any_value::Value::BoolValue(b)) => {
588                b.to_string()
589            }
590            Some(opentelemetry_proto::tonic::common::v1::any_value::Value::ArrayValue(_))
591            | Some(opentelemetry_proto::tonic::common::v1::any_value::Value::KvlistValue(_)) => {
592                let serde_val = otel_value_to_serde_value(value);
593                serde_json::to_string(&serde_val).unwrap_or_else(|_| format!("{:?}", value))
594            }
595            _ => "null".to_string(),
596        }
597    }
598
599    fn attributes_to_json_array(attributes: &[KeyValue]) -> Result<Vec<Attribute>, RecordError> {
600        attributes
601            .iter()
602            .map(|kv| {
603                let value = match &kv.value {
604                    Some(v) => otel_value_to_serde_value(v),
605                    None => Value::Null,
606                };
607
608                Ok(Attribute {
609                    key: kv.key.clone(),
610                    value,
611                })
612            })
613            .collect()
614    }
615
616    fn events_to_json_array(attributes: &[Event]) -> Result<Vec<SpanEvent>, RecordError> {
617        attributes
618            .iter()
619            .map(|kv| {
620                let attributes = Self::attributes_to_json_array(&kv.attributes)?;
621                Ok(SpanEvent {
622                    name: kv.name.clone(),
623                    timestamp: DateTime::<Utc>::from_timestamp_nanos(kv.time_unix_nano as i64),
624                    attributes,
625                    dropped_attributes_count: kv.dropped_attributes_count,
626                })
627            })
628            .collect()
629    }
630
631    fn links_to_json_array(attributes: &[Link]) -> Result<Vec<SpanLink>, RecordError> {
632        attributes
633            .iter()
634            .map(|kv| {
635                let attributes = Self::attributes_to_json_array(&kv.attributes)?;
636                Ok(SpanLink {
637                    trace_id: hex::encode(&kv.trace_id),
638                    span_id: hex::encode(&kv.span_id),
639                    trace_state: kv.trace_state.clone(),
640                    attributes,
641                    dropped_attributes_count: kv.dropped_attributes_count,
642                })
643            })
644            .collect()
645    }
646}
647
/// Wraps an OTLP `ExportTraceServiceRequest` so it can be converted into scouter records
/// (see `to_records`).
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct TraceServerRecord {
    pub request: ExportTraceServiceRequest,
}

// Uses all of `TraceRecordExt`'s default helper implementations.
impl TraceRecordExt for TraceServerRecord {}
654
655impl TraceServerRecord {
656    /// Extract InstrumentationScope name and version from ScopeSpan
657    fn get_scope_info(
658        scope_span: &opentelemetry_proto::tonic::trace::v1::ScopeSpans,
659    ) -> (String, Option<String>) {
660        let scope_name = scope_span
661            .scope
662            .as_ref()
663            .map(|s| s.name.clone())
664            .filter(|n| !n.is_empty())
665            .unwrap_or_else(|| "unknown".to_string());
666
667        let scope_version = scope_span.scope.as_ref().and_then(|s| {
668            if s.version.is_empty() {
669                None
670            } else {
671                Some(s.version.clone())
672            }
673        });
674
675        (scope_name, scope_version)
676    }
677
678    /// Safely convert OpenTelemetry timestamps to DateTime<Utc> and calculate duration
679    ///
680    /// # Arguments
681    /// * `start_time` - Start timestamp in nanoseconds since Unix epoch
682    /// * `end_time` - End timestamp in nanoseconds since Unix epoch
683    ///
684    /// # Returns
685    /// Tuple of (start_time, end_time, duration_ms) with proper error handling
686    fn extract_time(start_time: u64, end_time: u64) -> (DateTime<Utc>, DateTime<Utc>, i64) {
687        // Safe timestamp conversion with bounds checking
688        let start_dt = Self::safe_timestamp_conversion(start_time);
689        let end_dt = Self::safe_timestamp_conversion(end_time);
690
691        // Calculate duration with overflow protection
692        let duration_ms = if end_time >= start_time {
693            let duration_nanos = end_time.saturating_sub(start_time);
694            (duration_nanos / 1_000_000).min(i64::MAX as u64) as i64
695        } else {
696            tracing::warn!(
697                start_time = start_time,
698                end_time = end_time,
699                "Invalid timestamp order detected in trace span"
700            );
701            0
702        };
703
704        (start_dt, end_dt, duration_ms)
705    }
706
707    /// Safely convert u64 nanosecond timestamp to DateTime<Utc>
708    fn safe_timestamp_conversion(timestamp_nanos: u64) -> DateTime<Utc> {
709        if timestamp_nanos <= i64::MAX as u64 {
710            DateTime::from_timestamp_nanos(timestamp_nanos as i64)
711        } else {
712            let seconds = timestamp_nanos / 1_000_000_000;
713            let nanoseconds = (timestamp_nanos % 1_000_000_000) as u32;
714
715            DateTime::from_timestamp(seconds as i64, nanoseconds).unwrap_or_else(|| {
716                tracing::warn!(
717                    timestamp = timestamp_nanos,
718                    seconds = seconds,
719                    nanoseconds = nanoseconds,
720                    "Failed to convert large timestamp, falling back to current time"
721                );
722                Utc::now()
723            })
724        }
725    }
726
727    /// Safely convert span kind i32 to string with proper error handling
728    fn span_kind_to_string(kind: i32) -> String {
729        SpanKind::try_from(kind)
730            .map(|sk| {
731                sk.as_str_name()
732                    .strip_prefix("SPAN_KIND_")
733                    .unwrap_or(sk.as_str_name())
734            })
735            .unwrap_or("UNSPECIFIED")
736            .to_string()
737    }
738
739    fn extract_input_output(attributes: &[Attribute]) -> (Value, Value) {
740        let mut input = Value::Null;
741        let mut output = Value::Null;
742
743        for attr in attributes {
744            if attr.key == SCOUTER_TRACING_INPUT {
745                if let Value::String(s) = &attr.value {
746                    input = serde_json::from_str(s).unwrap_or_else(|e| {
747                        tracing::warn!(
748                            key = SCOUTER_TRACING_INPUT,
749                            error = %e,
750                            value = s,
751                            "Failed to parse input attribute as JSON, falling back to string value."
752                        );
753                        Value::String(s.clone()) // Or Value::Null
754                    });
755                }
756            } else if attr.key == SCOUTER_TRACING_OUTPUT {
757                if let Value::String(s) = &attr.value {
758                    output = serde_json::from_str(s)
759                        .unwrap_or_else(|e| {
760                            tracing::warn!(
761                                key = SCOUTER_TRACING_OUTPUT,
762                                error = %e,
763                                value = s,
764                                "Failed to parse output attribute as JSON, falling back to string value."
765                            );
766                            Value::String(s.clone()) // Or Value::Null
767                        });
768                }
769            }
770        }
771        (input, output)
772    }
773
774    fn get_service_name_from_resource(
775        resource: &Option<opentelemetry_proto::tonic::resource::v1::Resource>,
776        default: &str,
777    ) -> String {
778        resource
779            .as_ref()
780            .and_then(|r| r.attributes.iter().find(|attr| attr.key == SERVICE_NAME))
781            .and_then(|attr| {
782                attr.value.as_ref().and_then(|v| {
783                    if let Some(
784                        opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s),
785                    ) = &v.value
786                    {
787                        Some(s.clone())
788                    } else {
789                        None
790                    }
791                })
792            })
793            .unwrap_or_else(|| {
794                tracing::warn!(
795                    "Service name not found in resource attributes, falling back to default: {}",
796                    default
797                );
798                default.to_string()
799            })
800    }
801
802    /// Filter and extract trace start time attribute from span attributes
803    /// This is a global scouter attribute that indicates the trace start time and is set across all spans
804    pub fn get_trace_start_time_attribute(
805        attributes: &Vec<Attribute>,
806        start_time: &DateTime<Utc>,
807    ) -> DateTime<Utc> {
808        for attr in attributes {
809            if attr.key == TRACE_START_TIME_KEY {
810                if let Value::String(s) = &attr.value {
811                    if let Ok(dt) = s.parse::<chrono::DateTime<chrono::Utc>>() {
812                        return dt;
813                    }
814                }
815            }
816        }
817
818        tracing::warn!(
819            "Trace start time attribute not found or invalid, falling back to span start_time"
820        );
821        *start_time
822    }
823
824    pub fn convert_to_baggage_records(
825        trace_id: &TraceId,
826        attributes: &Vec<Attribute>,
827        scope_name: &str,
828    ) -> Vec<TraceBaggageRecord> {
829        let baggage_kvs: Vec<(String, String)> = attributes
830            .iter()
831            .filter_map(|attr| {
832                // Only process attributes with baggage prefix
833                if attr.key.starts_with(BAGGAGE_PREFIX) {
834                    let clean_key = attr
835                        .key
836                        .strip_prefix(format!("{}.", BAGGAGE_PREFIX).as_str())
837                        .map(|stripped| stripped.trim())
838                        .unwrap_or(&attr.key)
839                        .to_string();
840
841                    // Handle different value types from OpenTelemetry KeyValue
842                    let value_string = match &attr.value {
843                        Value::String(s) => s.clone(),
844                        Value::Number(n) => n.to_string(),
845                        Value::Bool(b) => b.to_string(),
846                        Value::Null => "null".to_string(),
847                        Value::Array(_) | Value::Object(_) => {
848                            // For complex types, use compact JSON representation
849                            serde_json::to_string(&attr.value)
850                                .unwrap_or_else(|_| format!("{:?}", attr.value))
851                        }
852                    };
853
854                    Some((clean_key, value_string))
855                } else {
856                    None
857                }
858            })
859            .collect();
860
861        baggage_kvs
862            .into_iter()
863            .map(|(key, value)| TraceBaggageRecord {
864                created_at: Self::get_trace_start_time_attribute(attributes, &Utc::now()),
865                trace_id: *trace_id,
866                scope: scope_name.to_string(),
867                key,
868                value,
869            })
870            .collect()
871    }
872
873    pub fn to_records(self) -> Result<TraceRecords, RecordError> {
874        let resource_spans = self.request.resource_spans;
875
876        // Pre-calculate capacity to avoid reallocations
877        let estimated_capacity: usize = resource_spans
878            .iter()
879            .map(|rs| {
880                rs.scope_spans
881                    .iter()
882                    .map(|ss| ss.spans.len())
883                    .sum::<usize>()
884            })
885            .sum();
886
887        let mut span_records: Vec<TraceSpanRecord> = Vec::with_capacity(estimated_capacity);
888        let mut baggage_records: Vec<TraceBaggageRecord> = Vec::new();
889        let mut tags: HashSet<TagRecord> = HashSet::new();
890
891        for resource_span in resource_spans {
892            // process metadata only once per resource span
893            let service_name =
894                Self::get_service_name_from_resource(&resource_span.resource, "unknown");
895            let resource_attributes = Attribute::from_resources(&resource_span.resource);
896
897            for scope_span in &resource_span.scope_spans {
898                let (scope_name, scope_version) = Self::get_scope_info(scope_span);
899
900                for span in &scope_span.spans {
901                    // Core identifiers
902                    let trace_id = TraceId::from_slice(span.trace_id.as_slice())?;
903                    let span_id = SpanId::from_slice(span.span_id.as_slice())?;
904                    let parent_span_id = if !span.parent_span_id.is_empty() {
905                        Some(SpanId::from_slice(span.parent_span_id.as_slice())?)
906                    } else {
907                        None
908                    };
909
910                    let (start_time, end_time, duration_ms) =
911                        Self::extract_time(span.start_time_unix_nano, span.end_time_unix_nano);
912
913                    let (cleaned_attributes, span_baggage, span_tags) = Self::process_attributes(
914                        &trace_id,
915                        &span.attributes,
916                        &scope_name,
917                        start_time,
918                    )?;
919
920                    // Add to collections
921                    baggage_records.extend(span_baggage);
922                    tags.extend(span_tags);
923
924                    let (input, output) = Self::extract_input_output(&cleaned_attributes);
925
926                    // SpanRecord for insert
927                    span_records.push(TraceSpanRecord {
928                        created_at: start_time,
929                        trace_id,
930                        span_id,
931                        parent_span_id,
932                        flags: span.flags as i32,
933                        trace_state: span.trace_state.clone(),
934                        scope_name: scope_name.clone(),
935                        scope_version: scope_version.clone(),
936                        span_name: span.name.clone(),
937                        span_kind: Self::span_kind_to_string(span.kind),
938                        start_time,
939                        end_time,
940                        duration_ms,
941                        status_code: span.status.as_ref().map(|s| s.code).unwrap_or(0),
942                        status_message: span
943                            .status
944                            .as_ref()
945                            .map(|s| s.message.clone())
946                            .unwrap_or_default(),
947                        attributes: cleaned_attributes,
948                        events: Self::events_to_json_array(&span.events)?,
949                        links: Self::links_to_json_array(&span.links)?,
950                        label: None,
951                        input,
952                        output,
953                        service_name: service_name.clone(),
954                        resource_attributes: resource_attributes.clone(),
955                    });
956                }
957            }
958        }
959
960        let tag_records: Vec<TagRecord> = tags.into_iter().collect();
961        Ok((span_records, baggage_records, tag_records))
962    }
963}
964
965#[derive(Clone, Debug, Serialize, Deserialize, Default)]
966#[pyclass]
967pub struct Attribute {
968    #[pyo3(get)]
969    pub key: String,
970    pub value: Value,
971}
972
973#[pymethods]
974impl Attribute {
975    #[getter]
976    pub fn get_value<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyAny>, RecordError> {
977        Ok(json_to_pyobject_value(py, &self.value)?.bind(py).clone())
978    }
979
980    pub fn __str__(&self) -> String {
981        PyHelperFuncs::__str__(self)
982    }
983}
984
985impl Attribute {
986    pub fn from_otel_value(key: String, value: &AnyValue) -> Self {
987        Attribute {
988            key,
989            value: otel_value_to_serde_value(value),
990        }
991    }
992
993    fn from_resources(
994        resource: &Option<opentelemetry_proto::tonic::resource::v1::Resource>,
995    ) -> Vec<Attribute> {
996        match resource {
997            Some(res) => res
998                .attributes
999                .iter()
1000                .map(|kv| Attribute::from_otel_value(kv.key.clone(), kv.value.as_ref().unwrap()))
1001                .collect(),
1002            None => vec![],
1003        }
1004    }
1005}
1006
/// An event recorded on a span, exposed to Python with read-only fields.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct SpanEvent {
    /// Event timestamp (UTC).
    #[pyo3(get)]
    pub timestamp: chrono::DateTime<Utc>,
    /// Event name.
    #[pyo3(get)]
    pub name: String,
    /// Attributes attached to the event.
    #[pyo3(get)]
    pub attributes: Vec<Attribute>,
    /// Number of event attributes reported as dropped.
    #[pyo3(get)]
    pub dropped_attributes_count: u32,
}

#[pymethods]
impl SpanEvent {
    /// String form produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1026
/// A link from a span to another span, exposed to Python with read-only fields.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct SpanLink {
    /// Trace id of the linked span, as a string (encoding not fixed here — presumably hex).
    #[pyo3(get)]
    pub trace_id: String,
    /// Span id of the linked span, as a string (encoding not fixed here — presumably hex).
    #[pyo3(get)]
    pub span_id: String,
    /// Trace state carried by the link.
    #[pyo3(get)]
    pub trace_state: String,
    /// Attributes attached to the link.
    #[pyo3(get)]
    pub attributes: Vec<Attribute>,
    /// Number of link attributes reported as dropped.
    #[pyo3(get)]
    pub dropped_attributes_count: u32,
}

#[pymethods]
impl SpanLink {
    /// String form produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1048
/// A plain key/value tag, exposed to Python with read-only fields.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct Tag {
    /// Tag key.
    #[pyo3(get)]
    pub key: String,
    /// Tag value.
    #[pyo3(get)]
    pub value: String,
}

#[pymethods]
impl Tag {
    /// String form produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1064
/// A tag bound to an entity (for example a trace), as persisted by the server.
///
/// `Eq` + `Hash` let identical tags be de-duplicated in a `HashSet`. With the `server`
/// feature it can also be loaded from a database row via `sqlx::FromRow`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
#[pyclass]
#[cfg_attr(feature = "server", derive(sqlx::FromRow))]
pub struct TagRecord {
    /// Kind of entity the tag belongs to (`"trace"` when built by `from_trace`).
    #[pyo3(get)]
    pub entity_type: String,
    /// Identifier of the tagged entity (the trace id hex when built by `from_trace`).
    #[pyo3(get)]
    pub entity_id: String,
    /// Tag key.
    #[pyo3(get)]
    pub key: String,
    /// Tag value.
    #[pyo3(get)]
    pub value: String,
}
1078
1079impl TagRecord {
1080    /// Create a tag record from a TraceId
1081    pub fn from_trace(trace_id: &TraceId, key: String, value: String) -> Self {
1082        Self {
1083            entity_type: "trace".to_string(),
1084            entity_id: trace_id.to_hex(),
1085            key,
1086            value,
1087        }
1088    }
1089}
1090
#[pymethods]
impl TagRecord {
    /// String form produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1097
1098/// Convert a flat list of `TraceSpanRecord`s into a tree-enriched list of `TraceSpan`s.
1099///
1100/// Groups records by `trace_id`, performs DFS to compute `depth`, `path`,
1101/// `root_span_id`, and `span_order` for each span.
1102pub fn build_trace_spans(records: Vec<TraceSpanRecord>) -> Vec<sql::TraceSpan> {
1103    if records.is_empty() {
1104        return Vec::new();
1105    }
1106
1107    // Group by trace_id (hex)
1108    let mut groups: HashMap<String, Vec<&TraceSpanRecord>> = HashMap::new();
1109    for record in &records {
1110        groups
1111            .entry(record.trace_id.to_hex())
1112            .or_default()
1113            .push(record);
1114    }
1115
1116    let mut all_spans = Vec::with_capacity(records.len());
1117    let mut global_order: i32 = 0;
1118
1119    for spans in groups.values() {
1120        // Build parent→children index (using span_id bytes as key)
1121        let mut children: HashMap<[u8; 8], Vec<usize>> = HashMap::new();
1122        let mut root_indices: Vec<usize> = Vec::new();
1123
1124        for (i, span) in spans.iter().enumerate() {
1125            if let Some(pid) = &span.parent_span_id {
1126                children.entry(*pid.as_bytes()).or_default().push(i);
1127            } else {
1128                root_indices.push(i);
1129            }
1130        }
1131
1132        // Sort roots by start_time for determinism
1133        root_indices.sort_by_key(|&i| spans[i].start_time);
1134
1135        // Determine root_span_id
1136        let root_span_id_hex = if let Some(&first_root) = root_indices.first() {
1137            spans[first_root].span_id.to_hex()
1138        } else {
1139            // All spans have parents (orphans) — use first span's span_id as fallback
1140            spans[0].span_id.to_hex()
1141        };
1142
1143        // DFS traversal (iterative)
1144        let pre_dfs_len = all_spans.len();
1145        dfs_assign_records(
1146            &root_indices,
1147            spans,
1148            &children,
1149            &root_span_id_hex,
1150            &mut all_spans,
1151            &mut global_order,
1152        );
1153
1154        // Attach orphan spans (parent not found in this trace group)
1155        // Only look at spans added by THIS group's DFS to avoid cross-group collisions
1156        let visited: HashSet<[u8; 8]> = all_spans[pre_dfs_len..]
1157            .iter()
1158            .filter_map(|s| {
1159                let bytes = SpanId::hex_to_bytes(&s.span_id).ok()?;
1160                let arr: [u8; 8] = bytes.try_into().ok()?;
1161                Some(arr)
1162            })
1163            .collect();
1164
1165        for span in spans {
1166            if !visited.contains(span.span_id.as_bytes()) {
1167                let span_id_hex = span.span_id.to_hex();
1168                all_spans.push(record_to_trace_span(
1169                    span,
1170                    &span_id_hex,
1171                    &root_span_id_hex,
1172                    0,
1173                    vec![span_id_hex.clone()],
1174                    global_order,
1175                ));
1176                global_order += 1;
1177            }
1178        }
1179    }
1180
1181    all_spans
1182}
1183
1184/// Iterative DFS traversal to assign depth, path, and span_order to trace spans.
1185fn dfs_assign_records(
1186    root_indices: &[usize],
1187    spans: &[&TraceSpanRecord],
1188    children: &HashMap<[u8; 8], Vec<usize>>,
1189    root_span_id_hex: &str,
1190    result: &mut Vec<sql::TraceSpan>,
1191    span_order: &mut i32,
1192) {
1193    // Stack entries: (span_index, depth, path_so_far)
1194    let mut stack: Vec<(usize, i32, Vec<String>)> = Vec::new();
1195    let mut visited: HashSet<usize> = HashSet::new();
1196
1197    // Push roots in reverse so the first root is processed first
1198    for &idx in root_indices.iter().rev() {
1199        stack.push((idx, 0, Vec::new()));
1200    }
1201
1202    while let Some((idx, depth, path_so_far)) = stack.pop() {
1203        if !visited.insert(idx) {
1204            continue; // cycle detected — skip
1205        }
1206        let span = spans[idx];
1207        let span_id_hex = span.span_id.to_hex();
1208
1209        let mut path = path_so_far;
1210        path.push(span_id_hex.clone());
1211
1212        result.push(record_to_trace_span(
1213            span,
1214            &span_id_hex,
1215            root_span_id_hex,
1216            depth,
1217            path.clone(),
1218            *span_order,
1219        ));
1220        *span_order += 1;
1221
1222        // Push children in reverse start_time order so earliest is processed first
1223        if let Some(child_indices) = children.get(span.span_id.as_bytes()) {
1224            let mut sorted = child_indices.clone();
1225            sorted.sort_by_key(|&i| spans[i].start_time);
1226            for &ci in sorted.iter().rev() {
1227                stack.push((ci, depth + 1, path.clone()));
1228            }
1229        }
1230    }
1231}
1232
1233fn record_to_trace_span(
1234    record: &TraceSpanRecord,
1235    span_id_hex: &str,
1236    root_span_id_hex: &str,
1237    depth: i32,
1238    path: Vec<String>,
1239    span_order: i32,
1240) -> sql::TraceSpan {
1241    let input = match &record.input {
1242        Value::Null => None,
1243        v => Some(v.clone()),
1244    };
1245    let output = match &record.output {
1246        Value::Null => None,
1247        v => Some(v.clone()),
1248    };
1249
1250    sql::TraceSpan {
1251        trace_id: record.trace_id.to_hex(),
1252        span_id: span_id_hex.to_string(),
1253        parent_span_id: record.parent_span_id.as_ref().map(|id| id.to_hex()),
1254        span_name: record.span_name.clone(),
1255        span_kind: Some(record.span_kind.clone()),
1256        start_time: record.start_time,
1257        end_time: record.end_time,
1258        duration_ms: record.duration_ms,
1259        status_code: record.status_code,
1260        status_message: Some(record.status_message.clone()),
1261        attributes: record.attributes.clone(),
1262        events: record.events.clone(),
1263        links: record.links.clone(),
1264        depth,
1265        path,
1266        root_span_id: root_span_id_hex.to_string(),
1267        service_name: record.service_name.clone(),
1268        span_order,
1269        input,
1270        output,
1271    }
1272}
1273
/// Lightweight trace summary record written to the Delta Lake `trace_summaries` table.
///
/// Produced by converting a `TraceAggregator` (in `scouter_sql`) after the in-memory
/// aggregation phase. Entity tags are written separately to Postgres and are not included here.
#[derive(Clone, Debug)]
pub struct TraceSummaryRecord {
    /// Trace this summary describes.
    pub trace_id: TraceId,
    /// Service name of the trace (presumably from the root span's resource — confirm in the aggregator).
    pub service_name: String,
    /// Instrumentation scope name.
    pub scope_name: String,
    /// Instrumentation scope version.
    pub scope_version: String,
    /// Name of the root operation (presumably the root span's name — confirm in the aggregator).
    pub root_operation: String,
    /// Trace start time (UTC).
    pub start_time: DateTime<Utc>,
    /// Trace end time (UTC); `None` when not known at conversion time — TODO confirm semantics.
    pub end_time: Option<DateTime<Utc>>,
    /// Aggregated status code of the trace.
    pub status_code: i32,
    /// Aggregated status message of the trace.
    pub status_message: String,
    /// Number of spans aggregated into this summary.
    pub span_count: i64,
    /// Number of spans counted as errors.
    pub error_count: i64,
    /// Resource attributes associated with the trace.
    pub resource_attributes: Vec<Attribute>,
    /// Entity UIDs associated with this trace (from `scouter.entity` attributes).
    pub entity_ids: Vec<String>,
    /// Queue record UIDs associated with this trace (from `scouter.queue.record` attributes).
    pub queue_ids: Vec<String>,
}
1297
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal span record for tree tests: 100 ms duration, every other field defaulted.
    fn make_span_record(
        trace_id: [u8; 16],
        span_id: [u8; 8],
        parent_span_id: Option<[u8; 8]>,
        name: &str,
        start_ms: i64,
    ) -> TraceSpanRecord {
        TraceSpanRecord {
            trace_id: TraceId::from_bytes(trace_id),
            span_id: SpanId::from_bytes(span_id),
            parent_span_id: parent_span_id.map(SpanId::from_bytes),
            span_name: name.to_string(),
            start_time: chrono::DateTime::from_timestamp_millis(start_ms).unwrap(),
            end_time: chrono::DateTime::from_timestamp_millis(start_ms + 100).unwrap(),
            duration_ms: 100,
            ..Default::default()
        }
    }

    #[test]
    fn build_trace_spans_empty() {
        let result = build_trace_spans(vec![]);
        assert!(result.is_empty());
    }

    #[test]
    fn build_trace_spans_simple_tree() {
        let tid = [0u8; 16];
        let root_sid = [1u8; 8];
        let child_sid = [2u8; 8];

        let records = vec![
            make_span_record(tid, root_sid, None, "root", 1000),
            make_span_record(tid, child_sid, Some(root_sid), "child", 1050),
        ];

        let spans = build_trace_spans(records);
        assert_eq!(spans.len(), 2);

        // Root span
        let root = spans.iter().find(|s| s.span_name == "root").unwrap();
        assert_eq!(root.depth, 0);
        assert_eq!(root.span_order, 0);
        assert!(root.parent_span_id.is_none());
        assert_eq!(root.path.len(), 1);

        // Child span
        let child = spans.iter().find(|s| s.span_name == "child").unwrap();
        assert_eq!(child.depth, 1);
        assert_eq!(child.span_order, 1);
        assert!(child.parent_span_id.is_some());
        assert_eq!(child.path.len(), 2);
        assert_eq!(child.root_span_id, root.span_id);
    }

    #[test]
    fn build_trace_spans_orphan_spans() {
        let tid = [0u8; 16];
        let orphan_sid = [3u8; 8];
        // Parent doesn't exist in the batch
        let missing_parent = [99u8; 8];

        let records = vec![make_span_record(
            tid,
            orphan_sid,
            Some(missing_parent),
            "orphan",
            1000,
        )];

        let spans = build_trace_spans(records);
        assert_eq!(spans.len(), 1);

        let orphan = &spans[0];
        assert_eq!(orphan.span_name, "orphan");
        assert_eq!(orphan.depth, 0);
    }

    #[test]
    fn build_trace_spans_orphan_with_child_emits_each_once() {
        // The orphan's parent is missing from the batch, but the orphan's own child is present.
        // Every span must be emitted exactly once. With no parentless root in the batch,
        // `root_span_id` falls back to the trace's first span in input order.
        let tid = [0u8; 16];
        let orphan_sid = [3u8; 8];
        let child_sid = [4u8; 8];
        let missing_parent = [99u8; 8];

        let records = vec![
            make_span_record(tid, orphan_sid, Some(missing_parent), "orphan", 1000),
            make_span_record(tid, child_sid, Some(orphan_sid), "orphan_child", 1050),
        ];

        let spans = build_trace_spans(records);
        assert_eq!(spans.len(), 2);

        let ids: HashSet<&str> = spans.iter().map(|s| s.span_id.as_str()).collect();
        assert_eq!(ids.len(), 2, "each span must appear exactly once");

        let orders: HashSet<i32> = spans.iter().map(|s| s.span_order).collect();
        assert_eq!(orders, HashSet::from([0, 1]));

        let orphan_hex = SpanId::from_bytes(orphan_sid).to_hex();
        for span in &spans {
            assert_eq!(span.root_span_id, orphan_hex);
            // A span's path always ends with the span itself.
            assert_eq!(span.path.last(), Some(&span.span_id));
        }
    }

    #[test]
    fn build_trace_spans_multiple_traces() {
        let tid1 = [1u8; 16];
        let tid2 = [2u8; 16];

        let records = vec![
            make_span_record(tid1, [10u8; 8], None, "trace1_root", 1000),
            make_span_record(tid2, [20u8; 8], None, "trace2_root", 2000),
            make_span_record(tid1, [11u8; 8], Some([10u8; 8]), "trace1_child", 1050),
        ];

        let spans = build_trace_spans(records);
        assert_eq!(spans.len(), 3);

        // Check that trace_ids are correct
        let t1_spans: Vec<_> = spans
            .iter()
            .filter(|s| s.trace_id == TraceId::from_bytes(tid1).to_hex())
            .collect();
        assert_eq!(t1_spans.len(), 2);

        let t2_spans: Vec<_> = spans
            .iter()
            .filter(|s| s.trace_id == TraceId::from_bytes(tid2).to_hex())
            .collect();
        assert_eq!(t2_spans.len(), 1);
    }

    #[test]
    fn build_trace_spans_deep_tree() {
        let tid = [0u8; 16];
        let root_sid = [1u8; 8];
        let child_sid = [2u8; 8];
        let grandchild_sid = [3u8; 8];

        let records = vec![
            make_span_record(tid, root_sid, None, "root", 1000),
            make_span_record(tid, child_sid, Some(root_sid), "child", 1050),
            make_span_record(tid, grandchild_sid, Some(child_sid), "grandchild", 1100),
        ];

        let spans = build_trace_spans(records);
        assert_eq!(spans.len(), 3);

        let grandchild = spans.iter().find(|s| s.span_name == "grandchild").unwrap();
        assert_eq!(grandchild.depth, 2);
        assert_eq!(grandchild.path.len(), 3); // root → child → grandchild
    }

    #[test]
    fn build_trace_spans_cross_group_collision() {
        // Two different traces where spans happen to share the same span_id bytes.
        // Visited tracking must be scoped per trace group to avoid cross-group collisions.
        let tid1 = [1u8; 16];
        let tid2 = [2u8; 16];
        let shared_sid = [42u8; 8]; // Same span_id in both traces

        let records = vec![
            make_span_record(tid1, shared_sid, None, "trace1_root", 1000),
            make_span_record(tid2, shared_sid, None, "trace2_root", 2000),
        ];

        let spans = build_trace_spans(records);
        // Both spans must appear — a cross-group visited set would drop the second
        assert_eq!(spans.len(), 2);

        let names: HashSet<&str> = spans.iter().map(|s| s.span_name.as_str()).collect();
        assert!(names.contains("trace1_root"));
        assert!(names.contains("trace2_root"));
    }

    #[test]
    fn build_trace_spans_input_output_mapping() {
        let tid = [0u8; 16];
        let records = vec![TraceSpanRecord {
            trace_id: TraceId::from_bytes(tid),
            span_id: SpanId::from_bytes([1u8; 8]),
            parent_span_id: None,
            span_name: "test".to_string(),
            input: serde_json::json!({"key": "value"}),
            output: Value::Null,
            ..Default::default()
        }];

        let spans = build_trace_spans(records);
        assert_eq!(spans.len(), 1);
        assert!(spans[0].input.is_some());
        assert!(spans[0].output.is_none()); // Null → None
    }

    #[test]
    fn build_trace_spans_cycle_does_not_loop() {
        // Construct a cycle: A's parent is B and B's parent is A, so each span sits in the
        // other's child list and neither has `parent_span_id == None`.
        // The DFS visited guard must prevent infinite traversal.
        let tid = [0u8; 16];
        let span_a = [1u8; 8];
        let span_b = [2u8; 8];

        let records = vec![
            make_span_record(tid, span_a, Some(span_b), "A", 1000),
            make_span_record(tid, span_b, Some(span_a), "B", 1050),
        ];

        // There is no parentless root, so these spans are only emitted by the fallback
        // for spans the regular roots cannot reach.
        // The key test: this terminates without hanging.
        let spans = build_trace_spans(records);
        assert_eq!(spans.len(), 2, "Both spans should appear exactly once");
    }
}