Skip to main content

scouter_types/trace/
mod.rs

1pub mod sql;
2
3use crate::error::RecordError;
4use crate::otel_value_to_serde_value;
5use crate::PyHelperFuncs;
6use crate::{json_to_pyobject, json_to_pyobject_value};
7use chrono::DateTime;
8use chrono::Utc;
9use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
10use opentelemetry_proto::tonic::common::v1::AnyValue;
11use opentelemetry_proto::tonic::common::v1::KeyValue;
12use opentelemetry_proto::tonic::trace::v1::span::Event;
13use opentelemetry_proto::tonic::trace::v1::span::Link;
14use opentelemetry_proto::tonic::trace::v1::span::SpanKind;
15use pyo3::prelude::*;
16use pyo3::types::PyDict;
17use serde::{Deserialize, Serialize};
18use serde_json::Value;
19use std::collections::HashMap;
20use std::collections::HashSet;
21use std::fmt;
22
// --- Function metadata attribute keys -------------------------------------------------

/// Attribute key: type of the instrumented function.
pub const FUNCTION_TYPE: &str = "function.type";
/// Attribute key: streaming flag of the instrumented function.
pub const FUNCTION_STREAMING: &str = "function.streaming";
/// Attribute key: name of the instrumented function.
pub const FUNCTION_NAME: &str = "function.name";
/// Attribute key: module of the instrumented function.
pub const FUNCTION_MODULE: &str = "function.module";
/// Attribute key: qualified name of the instrumented function.
pub const FUNCTION_QUALNAME: &str = "function.qualname";

// --- Scouter tracing payload keys ------------------------------------------------------

/// Span attribute holding the traced input; its string value is parsed as JSON.
pub const SCOUTER_TRACING_INPUT: &str = "scouter.tracing.input";
/// Span attribute holding the traced output; its string value is parsed as JSON.
pub const SCOUTER_TRACING_OUTPUT: &str = "scouter.tracing.output";
/// Span attribute holding the span label.
pub const SCOUTER_TRACING_LABEL: &str = "scouter.tracing.label";
/// Attribute carrying the trace start time, parsed as an RFC 3339 / chrono `DateTime<Utc>`.
pub const TRACE_START_TIME_KEY: &str = "scouter.tracing.start_time";
/// Attribute key for the Scouter scope.
pub const SCOUTER_SCOPE: &str = "scouter.scope";

// --- Resource / namespace keys ---------------------------------------------------------

/// Resource attribute holding the service name.
pub const SERVICE_NAME: &str = "service.name";
/// Namespace for Scouter tags (without trailing dot).
pub const SCOUTER_TAG_PREFIX: &str = "scouter.tracing.tag";
/// Bare baggage namespace name (without trailing dot).
pub const BAGGAGE_PREFIX: &str = "baggage";
36pub const SCOUTER_SCOPE_DEFAULT: &str = concat!("scouter.tracer.", env!("CARGO_PKG_VERSION"));
// --- Error reporting attribute keys ----------------------------------------------------

/// Attribute key for span error information.
pub const SPAN_ERROR: &str = "span.error";
/// Attribute key for an exception traceback.
pub const EXCEPTION_TRACEBACK: &str = "exception.traceback";

// --- Scouter evaluation / queue / entity attribute keys --------------------------------

/// Attribute key for an evaluation scenario id.
pub const SCOUTER_EVAL_SCENARIO_ID_ATTR: &str = "scouter.eval.scenario_id";
/// Attribute key for a queue record.
pub const SCOUTER_QUEUE_RECORD: &str = "scouter.queue.record";
/// Attribute key for a queue event.
pub const SCOUTER_QUEUE_EVENT: &str = "scouter.queue.event";
/// Attribute key for a Scouter entity.
pub const SCOUTER_ENTITY: &str = "scouter.entity";
43
// Key prefixes (each with a trailing dot) used to classify span attributes as baggage,
// tags, or baggage-propagated tags. `concat!` only accepts literals, so the pieces are
// spelled out rather than built from the constants above.

/// Prefix of baggage attributes: `baggage.`
pub const BAGGAGE_PATTERN: &str = "baggage.";
/// Prefix of tags propagated through baggage: `baggage.scouter.tracing.tag.`
pub const BAGGAGE_TAG_PATTERN: &str = concat!("baggage.", "scouter.tracing.tag.");
/// Prefix of tags set directly on a span: `scouter.tracing.tag.`
pub const TAG_PATTERN: &str = concat!("scouter.tracing.tag", ".");
48
49type SpanAttributes = (Vec<Attribute>, Vec<TraceBaggageRecord>, Vec<TagRecord>);
50
51#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq, Eq, Hash)]
52pub struct ScouterEntityAttribute {
53    pub uid: String,
54    pub r#type: String,
55    pub space: String,
56}
57
58#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
59pub struct TraceId([u8; 16]);
60
61impl TraceId {
62    pub fn from_hex(hex: &str) -> Result<Self, hex::FromHexError> {
63        let mut bytes = [0u8; 16];
64        hex::decode_to_slice(hex, &mut bytes)?;
65        Ok(Self(bytes))
66    }
67
68    pub fn from_bytes(bytes: [u8; 16]) -> Self {
69        Self(bytes)
70    }
71
72    pub fn from_slice(slice: &[u8]) -> Result<Self, RecordError> {
73        if slice.len() != 16 {
74            return Err(RecordError::SliceError(format!(
75                "Invalid trace_id length: expected 16 bytes, got {}",
76                slice.len()
77            )));
78        }
79        let mut bytes = [0u8; 16];
80        bytes.copy_from_slice(slice);
81        Ok(Self(bytes))
82    }
83
84    pub fn to_hex(&self) -> String {
85        hex::encode(self.0)
86    }
87
88    pub fn as_bytes(&self) -> &[u8; 16] {
89        &self.0
90    }
91
92    pub fn hex_to_bytes(hex: &str) -> Result<Vec<u8>, RecordError> {
93        let bytes = hex::decode(hex)?;
94        // validate length for trace_id (16 bytes) or span_id (8 bytes)
95        if bytes.len() == 16 {
96            Ok(bytes)
97        } else {
98            Err(RecordError::SliceError(format!(
99                "Invalid hex string length: expected 16 or 8 bytes, got {}",
100                bytes.len()
101            )))
102        }
103    }
104}
105
106impl fmt::Display for TraceId {
107    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108        write!(f, "{}", self.to_hex())
109    }
110}
111
112impl Serialize for TraceId {
113    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
114    where
115        S: serde::Serializer,
116    {
117        serializer.serialize_str(&self.to_hex())
118    }
119}
120
121impl<'de> Deserialize<'de> for TraceId {
122    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
123    where
124        D: serde::Deserializer<'de>,
125    {
126        let hex = String::deserialize(deserializer)?;
127        TraceId::from_hex(&hex).map_err(serde::de::Error::custom)
128    }
129}
130
/// Postgres column type for `TraceId`: the same SQL type sqlx uses for `Vec<u8>`.
#[cfg(feature = "server")]
impl sqlx::Type<sqlx::Postgres> for TraceId {
    fn type_info() -> sqlx::postgres::PgTypeInfo {
        <Vec<u8> as sqlx::Type<sqlx::Postgres>>::type_info()
    }
}

/// Reads a `TraceId` from a byte column, rejecting any value that is not exactly 16 bytes.
#[cfg(feature = "server")]
impl<'r> sqlx::Decode<'r, sqlx::Postgres> for TraceId {
    fn decode(value: sqlx::postgres::PgValueRef<'r>) -> Result<Self, sqlx::error::BoxDynError> {
        let raw = <&[u8] as sqlx::Decode<sqlx::Postgres>>::decode(value)?;
        match <[u8; 16]>::try_from(raw) {
            Ok(array) => Ok(TraceId(array)),
            Err(_) => Err("TraceId must be exactly 16 bytes".into()),
        }
    }
}

/// Writes the 16 raw bytes through sqlx's `&[u8]` encoder.
#[cfg(feature = "server")]
impl sqlx::Encode<'_, sqlx::Postgres> for TraceId {
    fn encode_by_ref(
        &self,
        buf: &mut sqlx::postgres::PgArgumentBuffer,
    ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
        let raw: &[u8] = self.0.as_slice();
        <&[u8] as sqlx::Encode<sqlx::Postgres>>::encode(raw, buf)
    }
}
160
161#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)]
162pub struct SpanId([u8; 8]);
163
164impl SpanId {
165    pub fn from_hex(hex: &str) -> Result<Self, hex::FromHexError> {
166        let mut bytes = [0u8; 8];
167        hex::decode_to_slice(hex, &mut bytes)?;
168        Ok(Self(bytes))
169    }
170
171    pub fn from_bytes(bytes: [u8; 8]) -> Self {
172        Self(bytes)
173    }
174
175    pub fn from_slice(slice: &[u8]) -> Result<Self, RecordError> {
176        if slice.len() != 8 {
177            return Err(RecordError::SliceError(format!(
178                "Invalid trace_id length: expected 8 bytes, got {}",
179                slice.len()
180            )));
181        }
182        let mut bytes = [0u8; 8];
183        bytes.copy_from_slice(slice);
184        Ok(Self(bytes))
185    }
186
187    pub fn to_hex(&self) -> String {
188        hex::encode(self.0)
189    }
190
191    pub fn as_bytes(&self) -> &[u8; 8] {
192        &self.0
193    }
194
195    pub fn hex_to_bytes(hex: &str) -> Result<Vec<u8>, RecordError> {
196        let bytes = hex::decode(hex)?;
197        if bytes.len() == 8 {
198            Ok(bytes)
199        } else {
200            Err(RecordError::SliceError(format!(
201                "Invalid hex string length: expected 16 or 8 bytes, got {}",
202                bytes.len()
203            )))
204        }
205    }
206}
207
208impl fmt::Display for SpanId {
209    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210        write!(f, "{}", self.to_hex())
211    }
212}
213
214impl Serialize for SpanId {
215    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
216    where
217        S: serde::Serializer,
218    {
219        serializer.serialize_str(&self.to_hex())
220    }
221}
222
223impl<'de> Deserialize<'de> for SpanId {
224    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
225    where
226        D: serde::Deserializer<'de>,
227    {
228        let hex = String::deserialize(deserializer)?;
229        SpanId::from_hex(&hex).map_err(serde::de::Error::custom)
230    }
231}
232
/// Postgres column type for `SpanId`: the same SQL type sqlx uses for `Vec<u8>`.
#[cfg(feature = "server")]
impl sqlx::Type<sqlx::Postgres> for SpanId {
    fn type_info() -> sqlx::postgres::PgTypeInfo {
        <Vec<u8> as sqlx::Type<sqlx::Postgres>>::type_info()
    }
}

/// Reads a `SpanId` from a byte column, rejecting any value that is not exactly 8 bytes.
#[cfg(feature = "server")]
impl<'r> sqlx::Decode<'r, sqlx::Postgres> for SpanId {
    fn decode(value: sqlx::postgres::PgValueRef<'r>) -> Result<Self, sqlx::error::BoxDynError> {
        let raw = <&[u8] as sqlx::Decode<sqlx::Postgres>>::decode(value)?;
        match <[u8; 8]>::try_from(raw) {
            Ok(array) => Ok(SpanId(array)),
            Err(_) => Err("SpanId must be exactly 8 bytes".into()),
        }
    }
}

/// Writes the 8 raw bytes through sqlx's `&[u8]` encoder.
#[cfg(feature = "server")]
impl sqlx::Encode<'_, sqlx::Postgres> for SpanId {
    fn encode_by_ref(
        &self,
        buf: &mut sqlx::postgres::PgArgumentBuffer,
    ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
        let raw: &[u8] = self.0.as_slice();
        <&[u8] as sqlx::Encode<sqlx::Postgres>>::encode(raw, buf)
    }
}
262
263#[derive(Clone, Debug, Serialize, Deserialize, Default)]
264#[pyclass]
265pub struct TraceRecord {
266    #[pyo3(get)]
267    pub created_at: DateTime<Utc>,
268    pub trace_id: TraceId,
269    #[pyo3(get)]
270    pub service_name: String,
271    #[pyo3(get)]
272    pub scope_name: String,
273    #[pyo3(get)]
274    pub scope_version: Option<String>,
275    #[pyo3(get)]
276    pub trace_state: String,
277    #[pyo3(get)]
278    pub start_time: chrono::DateTime<Utc>,
279    #[pyo3(get)]
280    pub end_time: chrono::DateTime<Utc>,
281    #[pyo3(get)]
282    pub duration_ms: i64,
283    #[pyo3(get)]
284    pub status_code: i32,
285    #[pyo3(get)]
286    pub status_message: String,
287    pub root_span_id: SpanId,
288    #[pyo3(get)]
289    pub span_count: i32,
290    #[pyo3(get)]
291    pub tags: Vec<Tag>,
292    #[pyo3(get)]
293    pub process_attributes: Vec<Attribute>,
294}
295
296#[pymethods]
297impl TraceRecord {
298    pub fn __str__(&self) -> String {
299        PyHelperFuncs::__str__(self)
300    }
301
302    #[getter]
303    pub fn get_trace_id(&self) -> String {
304        self.trace_id.to_hex()
305    }
306
307    #[getter]
308    pub fn get_root_span_id(&self) -> String {
309        self.root_span_id.to_hex()
310    }
311}
312
313#[derive(Clone, Debug, Serialize, Deserialize, Default)]
314#[pyclass]
315pub struct TraceSpanRecord {
316    #[pyo3(get)]
317    pub created_at: chrono::DateTime<Utc>,
318
319    // core identifiers
320    pub trace_id: TraceId,
321    pub span_id: SpanId,
322    pub parent_span_id: Option<SpanId>,
323
324    // W3C Trace Context fields
325    #[pyo3(get)]
326    pub flags: i32,
327    #[pyo3(get)]
328    pub trace_state: String,
329
330    // instrumentation
331    #[pyo3(get)]
332    pub scope_name: String,
333    #[pyo3(get)]
334    pub scope_version: Option<String>,
335
336    // Span metadata
337    #[pyo3(get)]
338    pub span_name: String,
339    #[pyo3(get)]
340    pub span_kind: String,
341
342    // Temporal data
343    #[pyo3(get)]
344    pub start_time: chrono::DateTime<Utc>,
345    #[pyo3(get)]
346    pub end_time: chrono::DateTime<Utc>,
347    #[pyo3(get)]
348    pub duration_ms: i64,
349
350    // Status
351    #[pyo3(get)]
352    pub status_code: i32,
353    #[pyo3(get)]
354    pub status_message: String,
355
356    // Semi-structured data
357    #[pyo3(get)]
358    pub attributes: Vec<Attribute>,
359    #[pyo3(get)]
360    pub events: Vec<SpanEvent>,
361    #[pyo3(get)]
362    pub links: Vec<SpanLink>,
363
364    // Scouter-specific fields
365    #[pyo3(get)]
366    pub label: Option<String>,
367    pub input: Value,
368    pub output: Value,
369
370    // Service reference (denormalized for query performance)
371    #[pyo3(get)]
372    pub service_name: String,
373    #[pyo3(get)]
374    pub resource_attributes: Vec<Attribute>,
375}
376
377#[pymethods]
378impl TraceSpanRecord {
379    #[getter]
380    pub fn get_trace_id(&self) -> String {
381        self.trace_id.to_hex()
382    }
383
384    #[getter]
385    pub fn get_span_id(&self) -> String {
386        self.span_id.to_hex()
387    }
388
389    #[getter]
390    pub fn get_parent_span_id(&self) -> Option<String> {
391        self.parent_span_id.as_ref().map(|id| id.to_hex())
392    }
393
394    #[getter]
395    pub fn get_input<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyDict>, RecordError> {
396        let dict = PyDict::new(py);
397        match &self.input {
398            Value::Null => {}
399            _ => {
400                json_to_pyobject(py, &self.input, &dict)?;
401            }
402        }
403        Ok(dict)
404    }
405
406    #[getter]
407    pub fn get_output<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyDict>, RecordError> {
408        let dict = PyDict::new(py);
409        match &self.output {
410            Value::Null => {}
411            _ => {
412                json_to_pyobject(py, &self.output, &dict)?;
413            }
414        }
415        Ok(dict)
416    }
417
418    pub fn __str__(&self) -> String {
419        // serialize the struct to a string
420        PyHelperFuncs::__str__(self)
421    }
422}
423
424#[derive(Clone, Debug, Serialize, Deserialize, Default)]
425#[pyclass]
426#[cfg_attr(feature = "server", derive(sqlx::FromRow))]
427pub struct TraceBaggageRecord {
428    #[pyo3(get)]
429    pub created_at: DateTime<Utc>,
430    pub trace_id: TraceId,
431    #[pyo3(get)]
432    pub scope: String,
433    #[pyo3(get)]
434    pub key: String,
435    #[pyo3(get)]
436    pub value: String,
437}
438
439#[pymethods]
440impl TraceBaggageRecord {
441    pub fn __str__(&self) -> String {
442        PyHelperFuncs::__str__(self)
443    }
444
445    #[getter]
446    pub fn get_trace_id(&self) -> String {
447        self.trace_id.to_hex()
448    }
449}
450
451pub type TraceRecords = (
452    Vec<TraceSpanRecord>,
453    Vec<TraceBaggageRecord>,
454    Vec<TagRecord>,
455);
456
457pub trait TraceRecordExt {
458    fn keyvalue_to_json_array<T: Serialize>(attributes: &Vec<T>) -> Result<Value, RecordError> {
459        Ok(serde_json::to_value(attributes).unwrap_or(Value::Array(vec![])))
460    }
461
462    fn process_attributes(
463        trace_id: &TraceId,
464        span_attributes: &[KeyValue],
465        scope: &str,
466        created_at: DateTime<Utc>,
467    ) -> Result<SpanAttributes, RecordError> {
468        let mut cleaned_attributes = Vec::with_capacity(span_attributes.len());
469        let mut baggage_records = Vec::new();
470        let mut tags = Vec::new();
471        let scope_owned = scope.to_string();
472
473        for kv in span_attributes {
474            let key = &kv.key;
475
476            // Check if this is a baggage-prefixed tag
477            if let Some(tag_key) = key.strip_prefix(BAGGAGE_TAG_PATTERN) {
478                if !tag_key.is_empty() {
479                    // tag values are stored as strings for tag table
480                    let string_value = match &kv.value {
481                        Some(v) => Self::otel_value_to_string(v),
482                        None => "null".to_string(),
483                    };
484
485                    // Extract as a tag
486                    tags.push(TagRecord::from_trace(
487                        trace_id,
488                        tag_key.to_string(),
489                        string_value.clone(),
490                    ));
491
492                    // Store cleaned attribute with stripped key
493                    cleaned_attributes.push(Attribute {
494                        key: tag_key.to_string(),
495                        value: Value::String(string_value.clone()),
496                    });
497
498                    // Also extract as baggage since it has baggage prefix
499                    baggage_records.push(TraceBaggageRecord {
500                        created_at,
501                        trace_id: *trace_id,
502                        scope: scope_owned.clone(),
503                        key: format!("{}.{}", SCOUTER_TAG_PREFIX, tag_key),
504                        value: string_value,
505                    });
506                } else {
507                    tracing::warn!(
508                        attribute_key = %key,
509                        "Skipping baggage tag with empty key after prefix removal"
510                    );
511                }
512            }
513            // Check for non-baggage tags
514            else if let Some(tag_key) = key.strip_prefix(TAG_PATTERN) {
515                // tag values are stored as strings for tag table
516                if !tag_key.is_empty() {
517                    let string_value = match &kv.value {
518                        Some(v) => Self::otel_value_to_string(v),
519                        None => "null".to_string(),
520                    };
521
522                    tags.push(TagRecord::from_trace(
523                        trace_id,
524                        tag_key.to_string(),
525                        string_value.clone(),
526                    ));
527
528                    cleaned_attributes.push(Attribute {
529                        key: tag_key.to_string(),
530                        value: Value::String(string_value.clone()),
531                    });
532                } else {
533                    tracing::warn!(
534                        attribute_key = %key,
535                        "Skipping tag with empty key after prefix removal"
536                    );
537                }
538            }
539            // Check for regular baggage (not tags)
540            else if key.starts_with(BAGGAGE_PATTERN) {
541                let clean_key = key
542                    .strip_prefix(BAGGAGE_PATTERN)
543                    .unwrap_or(key)
544                    .trim()
545                    .to_string();
546
547                let string_value = match &kv.value {
548                    Some(v) => Self::otel_value_to_string(v),
549                    None => "null".to_string(),
550                };
551
552                baggage_records.push(TraceBaggageRecord {
553                    created_at,
554                    trace_id: *trace_id,
555                    scope: scope_owned.clone(),
556                    key: clean_key,
557                    value: string_value,
558                });
559            }
560            // Regular attribute
561            else {
562                let value = match &kv.value {
563                    Some(v) => otel_value_to_serde_value(v),
564                    None => Value::Null,
565                };
566
567                cleaned_attributes.push(Attribute {
568                    key: key.clone(),
569                    value,
570                });
571            }
572        }
573
574        Ok((cleaned_attributes, baggage_records, tags))
575    }
576
577    fn otel_value_to_string(value: &AnyValue) -> String {
578        match &value.value {
579            Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) => {
580                s.clone()
581            }
582            Some(opentelemetry_proto::tonic::common::v1::any_value::Value::IntValue(i)) => {
583                i.to_string()
584            }
585            Some(opentelemetry_proto::tonic::common::v1::any_value::Value::DoubleValue(d)) => {
586                d.to_string()
587            }
588            Some(opentelemetry_proto::tonic::common::v1::any_value::Value::BoolValue(b)) => {
589                b.to_string()
590            }
591            Some(opentelemetry_proto::tonic::common::v1::any_value::Value::ArrayValue(_))
592            | Some(opentelemetry_proto::tonic::common::v1::any_value::Value::KvlistValue(_)) => {
593                let serde_val = otel_value_to_serde_value(value);
594                serde_json::to_string(&serde_val).unwrap_or_else(|_| format!("{:?}", value))
595            }
596            _ => "null".to_string(),
597        }
598    }
599
600    fn attributes_to_json_array(attributes: &[KeyValue]) -> Result<Vec<Attribute>, RecordError> {
601        attributes
602            .iter()
603            .map(|kv| {
604                let value = match &kv.value {
605                    Some(v) => otel_value_to_serde_value(v),
606                    None => Value::Null,
607                };
608
609                Ok(Attribute {
610                    key: kv.key.clone(),
611                    value,
612                })
613            })
614            .collect()
615    }
616
617    fn events_to_json_array(attributes: &[Event]) -> Result<Vec<SpanEvent>, RecordError> {
618        attributes
619            .iter()
620            .map(|kv| {
621                let attributes = Self::attributes_to_json_array(&kv.attributes)?;
622                Ok(SpanEvent {
623                    name: kv.name.clone(),
624                    timestamp: DateTime::<Utc>::from_timestamp_nanos(kv.time_unix_nano as i64),
625                    attributes,
626                    dropped_attributes_count: kv.dropped_attributes_count,
627                })
628            })
629            .collect()
630    }
631
632    fn links_to_json_array(attributes: &[Link]) -> Result<Vec<SpanLink>, RecordError> {
633        attributes
634            .iter()
635            .map(|kv| {
636                let attributes = Self::attributes_to_json_array(&kv.attributes)?;
637                Ok(SpanLink {
638                    trace_id: hex::encode(&kv.trace_id),
639                    span_id: hex::encode(&kv.span_id),
640                    trace_state: kv.trace_state.clone(),
641                    attributes,
642                    dropped_attributes_count: kv.dropped_attributes_count,
643                })
644            })
645            .collect()
646    }
647}
648
649#[derive(Clone, Debug, Serialize, Deserialize, Default)]
650pub struct TraceServerRecord {
651    pub request: ExportTraceServiceRequest,
652}
653
654impl TraceRecordExt for TraceServerRecord {}
655
656impl TraceServerRecord {
657    /// Extract InstrumentationScope name and version from ScopeSpan
658    fn get_scope_info(
659        scope_span: &opentelemetry_proto::tonic::trace::v1::ScopeSpans,
660    ) -> (String, Option<String>) {
661        let scope_name = scope_span
662            .scope
663            .as_ref()
664            .map(|s| s.name.clone())
665            .filter(|n| !n.is_empty())
666            .unwrap_or_else(|| "unknown".to_string());
667
668        let scope_version = scope_span.scope.as_ref().and_then(|s| {
669            if s.version.is_empty() {
670                None
671            } else {
672                Some(s.version.clone())
673            }
674        });
675
676        (scope_name, scope_version)
677    }
678
679    /// Safely convert OpenTelemetry timestamps to DateTime<Utc> and calculate duration
680    ///
681    /// # Arguments
682    /// * `start_time` - Start timestamp in nanoseconds since Unix epoch
683    /// * `end_time` - End timestamp in nanoseconds since Unix epoch
684    ///
685    /// # Returns
686    /// Tuple of (start_time, end_time, duration_ms) with proper error handling
687    fn extract_time(start_time: u64, end_time: u64) -> (DateTime<Utc>, DateTime<Utc>, i64) {
688        // Safe timestamp conversion with bounds checking
689        let start_dt = Self::safe_timestamp_conversion(start_time);
690        let end_dt = Self::safe_timestamp_conversion(end_time);
691
692        // Calculate duration with overflow protection
693        let duration_ms = if end_time >= start_time {
694            let duration_nanos = end_time.saturating_sub(start_time);
695            (duration_nanos / 1_000_000).min(i64::MAX as u64) as i64
696        } else {
697            tracing::warn!(
698                start_time = start_time,
699                end_time = end_time,
700                "Invalid timestamp order detected in trace span"
701            );
702            0
703        };
704
705        (start_dt, end_dt, duration_ms)
706    }
707
708    /// Safely convert u64 nanosecond timestamp to DateTime<Utc>
709    fn safe_timestamp_conversion(timestamp_nanos: u64) -> DateTime<Utc> {
710        if timestamp_nanos <= i64::MAX as u64 {
711            DateTime::from_timestamp_nanos(timestamp_nanos as i64)
712        } else {
713            let seconds = timestamp_nanos / 1_000_000_000;
714            let nanoseconds = (timestamp_nanos % 1_000_000_000) as u32;
715
716            DateTime::from_timestamp(seconds as i64, nanoseconds).unwrap_or_else(|| {
717                tracing::warn!(
718                    timestamp = timestamp_nanos,
719                    seconds = seconds,
720                    nanoseconds = nanoseconds,
721                    "Failed to convert large timestamp, falling back to current time"
722                );
723                Utc::now()
724            })
725        }
726    }
727
728    /// Safely convert span kind i32 to string with proper error handling
729    fn span_kind_to_string(kind: i32) -> String {
730        SpanKind::try_from(kind)
731            .map(|sk| {
732                sk.as_str_name()
733                    .strip_prefix("SPAN_KIND_")
734                    .unwrap_or(sk.as_str_name())
735            })
736            .unwrap_or("UNSPECIFIED")
737            .to_string()
738    }
739
740    fn extract_input_output(attributes: &[Attribute]) -> (Value, Value) {
741        let mut input = Value::Null;
742        let mut output = Value::Null;
743
744        for attr in attributes {
745            if attr.key == SCOUTER_TRACING_INPUT {
746                if let Value::String(s) = &attr.value {
747                    input = serde_json::from_str(s).unwrap_or_else(|e| {
748                        tracing::warn!(
749                            key = SCOUTER_TRACING_INPUT,
750                            error = %e,
751                            value = s,
752                            "Failed to parse input attribute as JSON, falling back to string value."
753                        );
754                        Value::String(s.clone()) // Or Value::Null
755                    });
756                }
757            } else if attr.key == SCOUTER_TRACING_OUTPUT {
758                if let Value::String(s) = &attr.value {
759                    output = serde_json::from_str(s)
760                        .unwrap_or_else(|e| {
761                            tracing::warn!(
762                                key = SCOUTER_TRACING_OUTPUT,
763                                error = %e,
764                                value = s,
765                                "Failed to parse output attribute as JSON, falling back to string value."
766                            );
767                            Value::String(s.clone()) // Or Value::Null
768                        });
769                }
770            }
771        }
772        (input, output)
773    }
774
775    fn get_service_name_from_resource(
776        resource: &Option<opentelemetry_proto::tonic::resource::v1::Resource>,
777        default: &str,
778    ) -> String {
779        resource
780            .as_ref()
781            .and_then(|r| r.attributes.iter().find(|attr| attr.key == SERVICE_NAME))
782            .and_then(|attr| {
783                attr.value.as_ref().and_then(|v| {
784                    if let Some(
785                        opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s),
786                    ) = &v.value
787                    {
788                        Some(s.clone())
789                    } else {
790                        None
791                    }
792                })
793            })
794            .unwrap_or_else(|| {
795                tracing::warn!(
796                    "Service name not found in resource attributes, falling back to default: {}",
797                    default
798                );
799                default.to_string()
800            })
801    }
802
803    /// Filter and extract trace start time attribute from span attributes
804    /// This is a global scouter attribute that indicates the trace start time and is set across all spans
805    pub fn get_trace_start_time_attribute(
806        attributes: &Vec<Attribute>,
807        start_time: &DateTime<Utc>,
808    ) -> DateTime<Utc> {
809        for attr in attributes {
810            if attr.key == TRACE_START_TIME_KEY {
811                if let Value::String(s) = &attr.value {
812                    if let Ok(dt) = s.parse::<chrono::DateTime<chrono::Utc>>() {
813                        return dt;
814                    }
815                }
816            }
817        }
818
819        tracing::warn!(
820            "Trace start time attribute not found or invalid, falling back to span start_time"
821        );
822        *start_time
823    }
824
825    pub fn convert_to_baggage_records(
826        trace_id: &TraceId,
827        attributes: &Vec<Attribute>,
828        scope_name: &str,
829    ) -> Vec<TraceBaggageRecord> {
830        let baggage_kvs: Vec<(String, String)> = attributes
831            .iter()
832            .filter_map(|attr| {
833                // Only process attributes with baggage prefix
834                if attr.key.starts_with(BAGGAGE_PREFIX) {
835                    let clean_key = attr
836                        .key
837                        .strip_prefix(format!("{}.", BAGGAGE_PREFIX).as_str())
838                        .map(|stripped| stripped.trim())
839                        .unwrap_or(&attr.key)
840                        .to_string();
841
842                    // Handle different value types from OpenTelemetry KeyValue
843                    let value_string = match &attr.value {
844                        Value::String(s) => s.clone(),
845                        Value::Number(n) => n.to_string(),
846                        Value::Bool(b) => b.to_string(),
847                        Value::Null => "null".to_string(),
848                        Value::Array(_) | Value::Object(_) => {
849                            // For complex types, use compact JSON representation
850                            serde_json::to_string(&attr.value)
851                                .unwrap_or_else(|_| format!("{:?}", attr.value))
852                        }
853                    };
854
855                    Some((clean_key, value_string))
856                } else {
857                    None
858                }
859            })
860            .collect();
861
862        baggage_kvs
863            .into_iter()
864            .map(|(key, value)| TraceBaggageRecord {
865                created_at: Self::get_trace_start_time_attribute(attributes, &Utc::now()),
866                trace_id: *trace_id,
867                scope: scope_name.to_string(),
868                key,
869                value,
870            })
871            .collect()
872    }
873
874    pub fn to_records(self) -> Result<TraceRecords, RecordError> {
875        let resource_spans = self.request.resource_spans;
876
877        // Pre-calculate capacity to avoid reallocations
878        let estimated_capacity: usize = resource_spans
879            .iter()
880            .map(|rs| {
881                rs.scope_spans
882                    .iter()
883                    .map(|ss| ss.spans.len())
884                    .sum::<usize>()
885            })
886            .sum();
887
888        let mut span_records: Vec<TraceSpanRecord> = Vec::with_capacity(estimated_capacity);
889        let mut baggage_records: Vec<TraceBaggageRecord> = Vec::new();
890        let mut tags: HashSet<TagRecord> = HashSet::new();
891
892        for resource_span in resource_spans {
893            // process metadata only once per resource span
894            let service_name =
895                Self::get_service_name_from_resource(&resource_span.resource, "unknown");
896            let resource_attributes = Attribute::from_resources(&resource_span.resource);
897
898            for scope_span in &resource_span.scope_spans {
899                let (scope_name, scope_version) = Self::get_scope_info(scope_span);
900
901                for span in &scope_span.spans {
902                    // Core identifiers
903                    let trace_id = TraceId::from_slice(span.trace_id.as_slice())?;
904                    let span_id = SpanId::from_slice(span.span_id.as_slice())?;
905                    let parent_span_id = if !span.parent_span_id.is_empty() {
906                        Some(SpanId::from_slice(span.parent_span_id.as_slice())?)
907                    } else {
908                        None
909                    };
910
911                    let (start_time, end_time, duration_ms) =
912                        Self::extract_time(span.start_time_unix_nano, span.end_time_unix_nano);
913
914                    let (cleaned_attributes, span_baggage, span_tags) = Self::process_attributes(
915                        &trace_id,
916                        &span.attributes,
917                        &scope_name,
918                        start_time,
919                    )?;
920
921                    // Add to collections
922                    baggage_records.extend(span_baggage);
923                    tags.extend(span_tags);
924
925                    let (input, output) = Self::extract_input_output(&cleaned_attributes);
926
927                    // SpanRecord for insert
928                    span_records.push(TraceSpanRecord {
929                        created_at: start_time,
930                        trace_id,
931                        span_id,
932                        parent_span_id,
933                        flags: span.flags as i32,
934                        trace_state: span.trace_state.clone(),
935                        scope_name: scope_name.clone(),
936                        scope_version: scope_version.clone(),
937                        span_name: span.name.clone(),
938                        span_kind: Self::span_kind_to_string(span.kind),
939                        start_time,
940                        end_time,
941                        duration_ms,
942                        status_code: span.status.as_ref().map(|s| s.code).unwrap_or(0),
943                        status_message: span
944                            .status
945                            .as_ref()
946                            .map(|s| s.message.clone())
947                            .unwrap_or_default(),
948                        attributes: cleaned_attributes,
949                        events: Self::events_to_json_array(&span.events)?,
950                        links: Self::links_to_json_array(&span.links)?,
951                        label: None,
952                        input,
953                        output,
954                        service_name: service_name.clone(),
955                        resource_attributes: resource_attributes.clone(),
956                    });
957                }
958            }
959        }
960
961        let tag_records: Vec<TagRecord> = tags.into_iter().collect();
962        Ok((span_records, baggage_records, tag_records))
963    }
964}
965
/// A key/value attribute whose value is stored as a `serde_json::Value`.
///
/// Used for event, link and resource attributes in this module. From Python, `key` is a
/// read-only attribute. `value` has no `#[pyo3(get)]`; it is exposed through the `#[getter]`
/// method `get_value`, which converts the JSON value into a Python object.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
pub struct Attribute {
    #[pyo3(get)]
    pub key: String,
    /// JSON form of the attribute value (built by `otel_value_to_serde_value` for OTLP input).
    pub value: Value,
}
973
974#[pymethods]
975impl Attribute {
976    #[getter]
977    pub fn get_value<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyAny>, RecordError> {
978        Ok(json_to_pyobject_value(py, &self.value)?.bind(py).clone())
979    }
980
981    pub fn __str__(&self) -> String {
982        PyHelperFuncs::__str__(self)
983    }
984}
985
986impl Attribute {
987    pub fn from_otel_value(key: String, value: &AnyValue) -> Self {
988        Attribute {
989            key,
990            value: otel_value_to_serde_value(value),
991        }
992    }
993
994    fn from_resources(
995        resource: &Option<opentelemetry_proto::tonic::resource::v1::Resource>,
996    ) -> Vec<Attribute> {
997        match resource {
998            Some(res) => res
999                .attributes
1000                .iter()
1001                .map(|kv| Attribute::from_otel_value(kv.key.clone(), kv.value.as_ref().unwrap()))
1002                .collect(),
1003            None => vec![],
1004        }
1005    }
1006}
1007
/// An event recorded on a span, exposed to Python with read-only fields.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct SpanEvent {
    /// Event timestamp (UTC).
    #[pyo3(get)]
    pub timestamp: chrono::DateTime<Utc>,
    /// Event name.
    #[pyo3(get)]
    pub name: String,
    /// Attributes attached to the event.
    #[pyo3(get)]
    pub attributes: Vec<Attribute>,
    /// Number of dropped attributes. This presumably mirrors the OTLP `dropped_attributes_count`
    /// field; confirm against the code that builds `SpanEvent`.
    #[pyo3(get)]
    pub dropped_attributes_count: u32,
}

#[pymethods]
impl SpanEvent {
    /// Python `str()` representation, delegated to `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1027
/// A link from a span to another span (possibly in another trace), exposed to Python with
/// read-only fields.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct SpanLink {
    /// Linked trace id as a string (presumably hex-encoded; confirm with the builder).
    #[pyo3(get)]
    pub trace_id: String,
    /// Linked span id as a string (presumably hex-encoded; confirm with the builder).
    #[pyo3(get)]
    pub span_id: String,
    /// Trace state string carried by the link.
    #[pyo3(get)]
    pub trace_state: String,
    /// Attributes attached to the link.
    #[pyo3(get)]
    pub attributes: Vec<Attribute>,
    /// Number of dropped attributes. This presumably mirrors the OTLP `dropped_attributes_count`
    /// field; confirm against the code that builds `SpanLink`.
    #[pyo3(get)]
    pub dropped_attributes_count: u32,
}

#[pymethods]
impl SpanLink {
    /// Python `str()` representation, delegated to `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1049
/// A plain string key/value tag, exposed to Python with read-only fields.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct Tag {
    #[pyo3(get)]
    pub key: String,
    #[pyo3(get)]
    pub value: String,
}

#[pymethods]
impl Tag {
    /// Python `str()` representation, delegated to `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1065
/// A key/value tag attached to an entity, which is identified by `entity_type` and `entity_id`.
///
/// Derives `Eq`/`Hash` so duplicates can be removed with a `HashSet`, as `to_records` does.
/// With the `server` feature enabled it also derives `sqlx::FromRow`. For trace tags built by
/// `TagRecord::from_trace`, `entity_type` is `"trace"` and `entity_id` is the hex trace id.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
#[pyclass]
#[cfg_attr(feature = "server", derive(sqlx::FromRow))]
pub struct TagRecord {
    #[pyo3(get)]
    pub entity_type: String,
    #[pyo3(get)]
    pub entity_id: String,
    #[pyo3(get)]
    pub key: String,
    #[pyo3(get)]
    pub value: String,
}
1079
1080impl TagRecord {
1081    /// Create a tag record from a TraceId
1082    pub fn from_trace(trace_id: &TraceId, key: String, value: String) -> Self {
1083        Self {
1084            entity_type: "trace".to_string(),
1085            entity_id: trace_id.to_hex(),
1086            key,
1087            value,
1088        }
1089    }
1090}
1091
#[pymethods]
impl TagRecord {
    /// Python `str()` representation, delegated to `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1098
1099/// Convert a flat list of `TraceSpanRecord`s into a tree-enriched list of `TraceSpan`s.
1100///
1101/// Groups records by `trace_id`, performs DFS to compute `depth`, `path`,
1102/// `root_span_id`, and `span_order` for each span.
1103pub fn build_trace_spans(records: Vec<TraceSpanRecord>) -> Vec<sql::TraceSpan> {
1104    if records.is_empty() {
1105        return Vec::new();
1106    }
1107
1108    // Group by trace_id (hex)
1109    let mut groups: HashMap<String, Vec<&TraceSpanRecord>> = HashMap::new();
1110    for record in &records {
1111        groups
1112            .entry(record.trace_id.to_hex())
1113            .or_default()
1114            .push(record);
1115    }
1116
1117    let mut all_spans = Vec::with_capacity(records.len());
1118    let mut global_order: i32 = 0;
1119
1120    for spans in groups.values() {
1121        // Build parent→children index (using span_id bytes as key)
1122        let mut children: HashMap<[u8; 8], Vec<usize>> = HashMap::new();
1123        let mut root_indices: Vec<usize> = Vec::new();
1124
1125        for (i, span) in spans.iter().enumerate() {
1126            if let Some(pid) = &span.parent_span_id {
1127                children.entry(*pid.as_bytes()).or_default().push(i);
1128            } else {
1129                root_indices.push(i);
1130            }
1131        }
1132
1133        // Sort roots by start_time for determinism
1134        root_indices.sort_by_key(|&i| spans[i].start_time);
1135
1136        // Determine root_span_id
1137        let root_span_id_hex = if let Some(&first_root) = root_indices.first() {
1138            spans[first_root].span_id.to_hex()
1139        } else {
1140            // All spans have parents (orphans) — use first span's span_id as fallback
1141            spans[0].span_id.to_hex()
1142        };
1143
1144        // DFS traversal (iterative)
1145        let pre_dfs_len = all_spans.len();
1146        dfs_assign_records(
1147            &root_indices,
1148            spans,
1149            &children,
1150            &root_span_id_hex,
1151            &mut all_spans,
1152            &mut global_order,
1153        );
1154
1155        // Attach orphan spans (parent not found in this trace group)
1156        // Only look at spans added by THIS group's DFS to avoid cross-group collisions
1157        let visited: HashSet<[u8; 8]> = all_spans[pre_dfs_len..]
1158            .iter()
1159            .filter_map(|s| {
1160                let bytes = SpanId::hex_to_bytes(&s.span_id).ok()?;
1161                let arr: [u8; 8] = bytes.try_into().ok()?;
1162                Some(arr)
1163            })
1164            .collect();
1165
1166        for span in spans {
1167            if !visited.contains(span.span_id.as_bytes()) {
1168                let span_id_hex = span.span_id.to_hex();
1169                all_spans.push(record_to_trace_span(
1170                    span,
1171                    &span_id_hex,
1172                    &root_span_id_hex,
1173                    0,
1174                    vec![span_id_hex.clone()],
1175                    global_order,
1176                ));
1177                global_order += 1;
1178            }
1179        }
1180    }
1181
1182    all_spans
1183}
1184
1185/// Iterative DFS traversal to assign depth, path, and span_order to trace spans.
1186fn dfs_assign_records(
1187    root_indices: &[usize],
1188    spans: &[&TraceSpanRecord],
1189    children: &HashMap<[u8; 8], Vec<usize>>,
1190    root_span_id_hex: &str,
1191    result: &mut Vec<sql::TraceSpan>,
1192    span_order: &mut i32,
1193) {
1194    // Stack entries: (span_index, depth, path_so_far)
1195    let mut stack: Vec<(usize, i32, Vec<String>)> = Vec::new();
1196    let mut visited: HashSet<usize> = HashSet::new();
1197
1198    // Push roots in reverse so the first root is processed first
1199    for &idx in root_indices.iter().rev() {
1200        stack.push((idx, 0, Vec::new()));
1201    }
1202
1203    while let Some((idx, depth, path_so_far)) = stack.pop() {
1204        if !visited.insert(idx) {
1205            continue; // cycle detected — skip
1206        }
1207        let span = spans[idx];
1208        let span_id_hex = span.span_id.to_hex();
1209
1210        let mut path = path_so_far;
1211        path.push(span_id_hex.clone());
1212
1213        result.push(record_to_trace_span(
1214            span,
1215            &span_id_hex,
1216            root_span_id_hex,
1217            depth,
1218            path.clone(),
1219            *span_order,
1220        ));
1221        *span_order += 1;
1222
1223        // Push children in reverse start_time order so earliest is processed first
1224        if let Some(child_indices) = children.get(span.span_id.as_bytes()) {
1225            let mut sorted = child_indices.clone();
1226            sorted.sort_by_key(|&i| spans[i].start_time);
1227            for &ci in sorted.iter().rev() {
1228                stack.push((ci, depth + 1, path.clone()));
1229            }
1230        }
1231    }
1232}
1233
1234fn record_to_trace_span(
1235    record: &TraceSpanRecord,
1236    span_id_hex: &str,
1237    root_span_id_hex: &str,
1238    depth: i32,
1239    path: Vec<String>,
1240    span_order: i32,
1241) -> sql::TraceSpan {
1242    let input = match &record.input {
1243        Value::Null => None,
1244        v => Some(v.clone()),
1245    };
1246    let output = match &record.output {
1247        Value::Null => None,
1248        v => Some(v.clone()),
1249    };
1250
1251    sql::TraceSpan {
1252        trace_id: record.trace_id.to_hex(),
1253        span_id: span_id_hex.to_string(),
1254        parent_span_id: record.parent_span_id.as_ref().map(|id| id.to_hex()),
1255        span_name: record.span_name.clone(),
1256        span_kind: Some(record.span_kind.clone()),
1257        start_time: record.start_time,
1258        end_time: record.end_time,
1259        duration_ms: record.duration_ms,
1260        status_code: record.status_code,
1261        status_message: Some(record.status_message.clone()),
1262        attributes: record.attributes.clone(),
1263        events: record.events.clone(),
1264        links: record.links.clone(),
1265        depth,
1266        path,
1267        root_span_id: root_span_id_hex.to_string(),
1268        service_name: record.service_name.clone(),
1269        span_order,
1270        input,
1271        output,
1272    }
1273}
1274
/// Lightweight trace summary record written to the Delta Lake `trace_summaries` table.
///
/// Produced by converting a `TraceAggregator` (in `scouter_sql`) after the in-memory
/// aggregation phase. Entity tags are written separately to Postgres and are not included here.
#[derive(Clone, Debug)]
pub struct TraceSummaryRecord {
    pub trace_id: TraceId,
    pub service_name: String,
    pub scope_name: String,
    pub scope_version: String,
    /// Root operation name. Presumably the root span's name; confirm in the aggregator.
    pub root_operation: String,
    pub start_time: DateTime<Utc>,
    /// Optional end time. Presumably `None` while the trace's end is unknown; confirm in the aggregator.
    pub end_time: Option<DateTime<Utc>>,
    pub status_code: i32,
    pub status_message: String,
    /// Span count for the trace, as computed by the aggregator.
    pub span_count: i64,
    /// Error count for the trace. The error criteria are defined by the aggregator.
    pub error_count: i64,
    pub resource_attributes: Vec<Attribute>,
    /// Entity UIDs associated with this trace (from `scouter.entity` attributes).
    pub entity_ids: Vec<String>,
    /// Queue record UIDs associated with this trace (from `scouter.queue.record` attributes).
    pub queue_ids: Vec<String>,
}
1298
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a minimal `TraceSpanRecord` for tree-building tests: the given ids and name, a
    /// start time of `start_ms` (Unix millis), an end time 100 ms later, and every other
    /// field defaulted.
    fn make_span_record(
        trace_id: [u8; 16],
        span_id: [u8; 8],
        parent_span_id: Option<[u8; 8]>,
        name: &str,
        start_ms: i64,
    ) -> TraceSpanRecord {
        TraceSpanRecord {
            trace_id: TraceId::from_bytes(trace_id),
            span_id: SpanId::from_bytes(span_id),
            parent_span_id: parent_span_id.map(SpanId::from_bytes),
            span_name: name.to_string(),
            start_time: chrono::DateTime::from_timestamp_millis(start_ms).unwrap(),
            end_time: chrono::DateTime::from_timestamp_millis(start_ms + 100).unwrap(),
            duration_ms: 100,
            ..Default::default()
        }
    }

    #[test]
    fn build_trace_spans_empty() {
        let result = build_trace_spans(vec![]);
        assert!(result.is_empty());
    }

    #[test]
    fn build_trace_spans_simple_tree() {
        let tid = [0u8; 16];
        let root_sid = [1u8; 8];
        let child_sid = [2u8; 8];

        let records = vec![
            make_span_record(tid, root_sid, None, "root", 1000),
            make_span_record(tid, child_sid, Some(root_sid), "child", 1050),
        ];

        let spans = build_trace_spans(records);
        assert_eq!(spans.len(), 2);

        // Root span
        let root = spans.iter().find(|s| s.span_name == "root").unwrap();
        assert_eq!(root.depth, 0);
        assert_eq!(root.span_order, 0);
        assert!(root.parent_span_id.is_none());
        assert_eq!(root.path.len(), 1);

        // Child span
        let child = spans.iter().find(|s| s.span_name == "child").unwrap();
        assert_eq!(child.depth, 1);
        assert_eq!(child.span_order, 1);
        assert!(child.parent_span_id.is_some());
        assert_eq!(child.path.len(), 2);
        assert_eq!(child.root_span_id, root.span_id);
    }

    #[test]
    fn build_trace_spans_orphan_spans() {
        let tid = [0u8; 16];
        let orphan_sid = [3u8; 8];
        // Parent doesn't exist in the batch
        let missing_parent = [99u8; 8];

        let records = vec![make_span_record(
            tid,
            orphan_sid,
            Some(missing_parent),
            "orphan",
            1000,
        )];

        let spans = build_trace_spans(records);
        assert_eq!(spans.len(), 1);

        // A lone orphan is emitted at depth 0.
        let orphan = &spans[0];
        assert_eq!(orphan.span_name, "orphan");
        assert_eq!(orphan.depth, 0);
    }

    #[test]
    fn build_trace_spans_multiple_traces() {
        let tid1 = [1u8; 16];
        let tid2 = [2u8; 16];

        let records = vec![
            make_span_record(tid1, [10u8; 8], None, "trace1_root", 1000),
            make_span_record(tid2, [20u8; 8], None, "trace2_root", 2000),
            make_span_record(tid1, [11u8; 8], Some([10u8; 8]), "trace1_child", 1050),
        ];

        let spans = build_trace_spans(records);
        assert_eq!(spans.len(), 3);

        // Check that trace_ids are correct
        let t1_spans: Vec<_> = spans
            .iter()
            .filter(|s| s.trace_id == TraceId::from_bytes(tid1).to_hex())
            .collect();
        assert_eq!(t1_spans.len(), 2);

        let t2_spans: Vec<_> = spans
            .iter()
            .filter(|s| s.trace_id == TraceId::from_bytes(tid2).to_hex())
            .collect();
        assert_eq!(t2_spans.len(), 1);
    }

    #[test]
    fn build_trace_spans_deep_tree() {
        let tid = [0u8; 16];
        let root_sid = [1u8; 8];
        let child_sid = [2u8; 8];
        let grandchild_sid = [3u8; 8];

        let records = vec![
            make_span_record(tid, root_sid, None, "root", 1000),
            make_span_record(tid, child_sid, Some(root_sid), "child", 1050),
            make_span_record(tid, grandchild_sid, Some(child_sid), "grandchild", 1100),
        ];

        let spans = build_trace_spans(records);
        assert_eq!(spans.len(), 3);

        let grandchild = spans.iter().find(|s| s.span_name == "grandchild").unwrap();
        assert_eq!(grandchild.depth, 2);
        assert_eq!(grandchild.path.len(), 3); // root → child → grandchild
    }

    #[test]
    fn build_trace_spans_cross_group_collision() {
        // Two different traces where spans happen to share the same span_id bytes.
        // The visited set must be scoped per group to avoid cross-group collisions.
        let tid1 = [1u8; 16];
        let tid2 = [2u8; 16];
        let shared_sid = [42u8; 8]; // Same span_id in both traces

        let records = vec![
            make_span_record(tid1, shared_sid, None, "trace1_root", 1000),
            make_span_record(tid2, shared_sid, None, "trace2_root", 2000),
        ];

        let spans = build_trace_spans(records);
        // Both spans must appear. A visited set shared across groups would drop the second one.
        assert_eq!(spans.len(), 2);

        let names: HashSet<&str> = spans.iter().map(|s| s.span_name.as_str()).collect();
        assert!(names.contains("trace1_root"));
        assert!(names.contains("trace2_root"));
    }

    #[test]
    fn build_trace_spans_input_output_mapping() {
        let tid = [0u8; 16];
        let records = vec![TraceSpanRecord {
            trace_id: TraceId::from_bytes(tid),
            span_id: SpanId::from_bytes([1u8; 8]),
            parent_span_id: None,
            span_name: "test".to_string(),
            input: serde_json::json!({"key": "value"}),
            output: Value::Null,
            ..Default::default()
        }];

        let spans = build_trace_spans(records);
        assert_eq!(spans.len(), 1);
        assert!(spans[0].input.is_some());
        assert!(spans[0].output.is_none()); // Null → None
    }

    #[test]
    fn build_trace_spans_cycle_does_not_loop() {
        // Construct a parent-link cycle: A's parent is B and B's parent is A.
        // Building the tree must still terminate and emit each span once.
        let tid = [0u8; 16];
        let span_a = [1u8; 8];
        let span_b = [2u8; 8];

        // Neither span has parent_span_id == None, so there is no true root. Both parents
        // are present in the batch, so each span is registered as the other's child and
        // neither is an orphan.
        let records = vec![
            make_span_record(tid, span_a, Some(span_b), "A", 1000),
            make_span_record(tid, span_b, Some(span_a), "B", 1050),
        ];

        // No DFS start point reaches either span, so both are emitted by the fallback for
        // unvisited spans. The key test: this terminates without hanging and emits no duplicates.
        let spans = build_trace_spans(records);
        assert_eq!(spans.len(), 2, "Both spans should appear exactly once");
    }
}