Skip to main content

scouter_types/trace/
mod.rs

1pub mod sql;
2
3use crate::error::RecordError;
4use crate::otel_value_to_serde_value;
5use crate::PyHelperFuncs;
6use crate::{json_to_pyobject, json_to_pyobject_value};
7use chrono::DateTime;
8use chrono::Utc;
9use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
10use opentelemetry_proto::tonic::common::v1::AnyValue;
11use opentelemetry_proto::tonic::common::v1::KeyValue;
12use opentelemetry_proto::tonic::trace::v1::span::Event;
13use opentelemetry_proto::tonic::trace::v1::span::Link;
14use opentelemetry_proto::tonic::trace::v1::span::SpanKind;
15use pyo3::prelude::*;
16use pyo3::types::PyDict;
17use serde::{Deserialize, Serialize};
18use serde_json::Value;
19use std::collections::HashSet;
20use std::fmt;
21
22pub const FUNCTION_TYPE: &str = "function.type";
23pub const FUNCTION_STREAMING: &str = "function.streaming";
24pub const FUNCTION_NAME: &str = "function.name";
25pub const FUNCTION_MODULE: &str = "function.module";
26pub const FUNCTION_QUALNAME: &str = "function.qualname";
27pub const SCOUTER_TRACING_INPUT: &str = "scouter.tracing.input";
28pub const SCOUTER_TRACING_OUTPUT: &str = "scouter.tracing.output";
29pub const SCOUTER_TRACING_LABEL: &str = "scouter.tracing.label";
30pub const SERVICE_NAME: &str = "service.name";
31pub const SCOUTER_TAG_PREFIX: &str = "scouter.tracing.tag";
32pub const BAGGAGE_PREFIX: &str = "baggage";
33pub const TRACE_START_TIME_KEY: &str = "scouter.tracing.start_time";
34pub const SCOUTER_SCOPE: &str = "scouter.scope";
35pub const SCOUTER_SCOPE_DEFAULT: &str = concat!("scouter.tracer.", env!("CARGO_PKG_VERSION"));
36pub const SPAN_ERROR: &str = "span.error";
37pub const EXCEPTION_TRACEBACK: &str = "exception.traceback";
38pub const SCOUTER_QUEUE_RECORD: &str = "scouter.queue.record";
39pub const SCOUTER_QUEUE_EVENT: &str = "scouter.queue.event";
40pub const SCOUTER_ENTITY: &str = "scouter.entity";
41
42// patterns for identifying baggage and tags
43pub const BAGGAGE_PATTERN: &str = "baggage.";
44pub const BAGGAGE_TAG_PATTERN: &str = concat!("baggage", ".", "scouter.tracing.tag", ".");
45pub const TAG_PATTERN: &str = concat!("scouter.tracing.tag", ".");
46
47type SpanAttributes = (Vec<Attribute>, Vec<TraceBaggageRecord>, Vec<TagRecord>);
48
/// Identity of a Scouter entity as a `(uid, type, space)` triple.
///
/// NOTE(review): not referenced elsewhere in this chunk; presumably carried in the
/// `scouter.entity` span attribute (`SCOUTER_ENTITY`) — confirm against callers.
#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq, Eq, Hash)]
pub struct ScouterEntityAttribute {
    pub uid: String,
    // `type` is a Rust keyword, hence the raw identifier.
    pub r#type: String,
    pub space: String,
}
55
56#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)]
57pub struct TraceId([u8; 16]);
58
59impl TraceId {
60    pub fn from_hex(hex: &str) -> Result<Self, hex::FromHexError> {
61        let mut bytes = [0u8; 16];
62        hex::decode_to_slice(hex, &mut bytes)?;
63        Ok(Self(bytes))
64    }
65
66    pub fn from_bytes(bytes: [u8; 16]) -> Self {
67        Self(bytes)
68    }
69
70    pub fn from_slice(slice: &[u8]) -> Result<Self, RecordError> {
71        if slice.len() != 16 {
72            return Err(RecordError::SliceError(format!(
73                "Invalid trace_id length: expected 16 bytes, got {}",
74                slice.len()
75            )));
76        }
77        let mut bytes = [0u8; 16];
78        bytes.copy_from_slice(slice);
79        Ok(Self(bytes))
80    }
81
82    pub fn to_hex(&self) -> String {
83        hex::encode(self.0)
84    }
85
86    pub fn as_bytes(&self) -> &[u8; 16] {
87        &self.0
88    }
89
90    pub fn hex_to_bytes(hex: &str) -> Result<Vec<u8>, RecordError> {
91        let bytes = hex::decode(hex)?;
92        // validate length for trace_id (16 bytes) or span_id (8 bytes)
93        if bytes.len() == 16 {
94            Ok(bytes)
95        } else {
96            Err(RecordError::SliceError(format!(
97                "Invalid hex string length: expected 16 or 8 bytes, got {}",
98                bytes.len()
99            )))
100        }
101    }
102}
103
104impl fmt::Display for TraceId {
105    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106        write!(f, "{}", self.to_hex())
107    }
108}
109
110impl Serialize for TraceId {
111    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
112    where
113        S: serde::Serializer,
114    {
115        serializer.serialize_str(&self.to_hex())
116    }
117}
118
119impl<'de> Deserialize<'de> for TraceId {
120    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
121    where
122        D: serde::Deserializer<'de>,
123    {
124        let hex = String::deserialize(deserializer)?;
125        TraceId::from_hex(&hex).map_err(serde::de::Error::custom)
126    }
127}
128
#[cfg(feature = "server")]
impl<'r> sqlx::Decode<'r, sqlx::Postgres> for TraceId {
    // Decode as a raw byte slice, then insist on the exact 16-byte width.
    fn decode(value: sqlx::postgres::PgValueRef<'r>) -> Result<Self, sqlx::error::BoxDynError> {
        let raw = <&[u8] as sqlx::Decode<sqlx::Postgres>>::decode(value)?;
        match <[u8; 16]>::try_from(raw) {
            Ok(array) => Ok(TraceId(array)),
            Err(_) => Err("TraceId must be exactly 16 bytes".into()),
        }
    }
}

#[cfg(feature = "server")]
impl sqlx::Type<sqlx::Postgres> for TraceId {
    // Shares the Postgres type used for `Vec<u8>`, i.e. a raw byte column.
    fn type_info() -> sqlx::postgres::PgTypeInfo {
        <Vec<u8> as sqlx::Type<sqlx::Postgres>>::type_info()
    }
}

#[cfg(feature = "server")]
impl sqlx::Encode<'_, sqlx::Postgres> for TraceId {
    // Writes the 16 raw bytes, not the hex form.
    fn encode_by_ref(
        &self,
        buf: &mut sqlx::postgres::PgArgumentBuffer,
    ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
        let raw: &[u8] = self.as_bytes();
        <&[u8] as sqlx::Encode<sqlx::Postgres>>::encode(raw, buf)
    }
}
158
159#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)]
160pub struct SpanId([u8; 8]);
161
162impl SpanId {
163    pub fn from_hex(hex: &str) -> Result<Self, hex::FromHexError> {
164        let mut bytes = [0u8; 8];
165        hex::decode_to_slice(hex, &mut bytes)?;
166        Ok(Self(bytes))
167    }
168
169    pub fn from_bytes(bytes: [u8; 8]) -> Self {
170        Self(bytes)
171    }
172
173    pub fn from_slice(slice: &[u8]) -> Result<Self, RecordError> {
174        if slice.len() != 8 {
175            return Err(RecordError::SliceError(format!(
176                "Invalid trace_id length: expected 8 bytes, got {}",
177                slice.len()
178            )));
179        }
180        let mut bytes = [0u8; 8];
181        bytes.copy_from_slice(slice);
182        Ok(Self(bytes))
183    }
184
185    pub fn to_hex(&self) -> String {
186        hex::encode(self.0)
187    }
188
189    pub fn as_bytes(&self) -> &[u8; 8] {
190        &self.0
191    }
192
193    pub fn hex_to_bytes(hex: &str) -> Result<Vec<u8>, RecordError> {
194        let bytes = hex::decode(hex)?;
195        if bytes.len() == 8 {
196            Ok(bytes)
197        } else {
198            Err(RecordError::SliceError(format!(
199                "Invalid hex string length: expected 16 or 8 bytes, got {}",
200                bytes.len()
201            )))
202        }
203    }
204}
205
206impl fmt::Display for SpanId {
207    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208        write!(f, "{}", self.to_hex())
209    }
210}
211
212impl Serialize for SpanId {
213    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
214    where
215        S: serde::Serializer,
216    {
217        serializer.serialize_str(&self.to_hex())
218    }
219}
220
221impl<'de> Deserialize<'de> for SpanId {
222    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
223    where
224        D: serde::Deserializer<'de>,
225    {
226        let hex = String::deserialize(deserializer)?;
227        SpanId::from_hex(&hex).map_err(serde::de::Error::custom)
228    }
229}
230
#[cfg(feature = "server")]
impl<'r> sqlx::Decode<'r, sqlx::Postgres> for SpanId {
    // Decode as a raw byte slice, then insist on the exact 8-byte width.
    fn decode(value: sqlx::postgres::PgValueRef<'r>) -> Result<Self, sqlx::error::BoxDynError> {
        let raw = <&[u8] as sqlx::Decode<sqlx::Postgres>>::decode(value)?;
        match <[u8; 8]>::try_from(raw) {
            Ok(array) => Ok(SpanId(array)),
            Err(_) => Err("SpanId must be exactly 8 bytes".into()),
        }
    }
}

#[cfg(feature = "server")]
impl sqlx::Type<sqlx::Postgres> for SpanId {
    // Shares the Postgres type used for `Vec<u8>`, i.e. a raw byte column.
    fn type_info() -> sqlx::postgres::PgTypeInfo {
        <Vec<u8> as sqlx::Type<sqlx::Postgres>>::type_info()
    }
}

#[cfg(feature = "server")]
impl sqlx::Encode<'_, sqlx::Postgres> for SpanId {
    // Writes the 8 raw bytes, not the hex form.
    fn encode_by_ref(
        &self,
        buf: &mut sqlx::postgres::PgArgumentBuffer,
    ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
        let raw: &[u8] = self.as_bytes();
        <&[u8] as sqlx::Encode<sqlx::Postgres>>::encode(raw, buf)
    }
}
260
261#[derive(Clone, Debug, Serialize, Deserialize, Default)]
262#[pyclass]
263pub struct TraceRecord {
264    #[pyo3(get)]
265    pub created_at: DateTime<Utc>,
266    pub trace_id: TraceId,
267    #[pyo3(get)]
268    pub service_name: String,
269    #[pyo3(get)]
270    pub scope_name: String,
271    #[pyo3(get)]
272    pub scope_version: Option<String>,
273    #[pyo3(get)]
274    pub trace_state: String,
275    #[pyo3(get)]
276    pub start_time: chrono::DateTime<Utc>,
277    #[pyo3(get)]
278    pub end_time: chrono::DateTime<Utc>,
279    #[pyo3(get)]
280    pub duration_ms: i64,
281    #[pyo3(get)]
282    pub status_code: i32,
283    #[pyo3(get)]
284    pub status_message: String,
285    pub root_span_id: SpanId,
286    #[pyo3(get)]
287    pub span_count: i32,
288    #[pyo3(get)]
289    pub tags: Vec<Tag>,
290    #[pyo3(get)]
291    pub process_attributes: Vec<Attribute>,
292}
293
294#[pymethods]
295impl TraceRecord {
296    pub fn __str__(&self) -> String {
297        PyHelperFuncs::__str__(self)
298    }
299
300    #[getter]
301    pub fn get_trace_id(&self) -> String {
302        self.trace_id.to_hex()
303    }
304
305    #[getter]
306    pub fn get_root_span_id(&self) -> String {
307        self.root_span_id.to_hex()
308    }
309}
310
311#[derive(Clone, Debug, Serialize, Deserialize, Default)]
312#[pyclass]
313pub struct TraceSpanRecord {
314    #[pyo3(get)]
315    pub created_at: chrono::DateTime<Utc>,
316
317    // core identifiers
318    pub trace_id: TraceId,
319    pub span_id: SpanId,
320    pub parent_span_id: Option<SpanId>,
321
322    // W3C Trace Context fields
323    #[pyo3(get)]
324    pub flags: i32,
325    #[pyo3(get)]
326    pub trace_state: String,
327
328    // instrumentation
329    #[pyo3(get)]
330    pub scope_name: String,
331    #[pyo3(get)]
332    pub scope_version: Option<String>,
333
334    // Span metadata
335    #[pyo3(get)]
336    pub span_name: String,
337    #[pyo3(get)]
338    pub span_kind: String,
339
340    // Temporal data
341    #[pyo3(get)]
342    pub start_time: chrono::DateTime<Utc>,
343    #[pyo3(get)]
344    pub end_time: chrono::DateTime<Utc>,
345    #[pyo3(get)]
346    pub duration_ms: i64,
347
348    // Status
349    #[pyo3(get)]
350    pub status_code: i32,
351    #[pyo3(get)]
352    pub status_message: String,
353
354    // Semi-structured data
355    #[pyo3(get)]
356    pub attributes: Vec<Attribute>,
357    #[pyo3(get)]
358    pub events: Vec<SpanEvent>,
359    #[pyo3(get)]
360    pub links: Vec<SpanLink>,
361
362    // Scouter-specific fields
363    #[pyo3(get)]
364    pub label: Option<String>,
365    pub input: Value,
366    pub output: Value,
367
368    // Service reference (denormalized for query performance)
369    #[pyo3(get)]
370    pub service_name: String,
371    #[pyo3(get)]
372    pub resource_attributes: Vec<Attribute>,
373}
374
375#[pymethods]
376impl TraceSpanRecord {
377    #[getter]
378    pub fn get_trace_id(&self) -> String {
379        self.trace_id.to_hex()
380    }
381
382    #[getter]
383    pub fn get_span_id(&self) -> String {
384        self.span_id.to_hex()
385    }
386
387    #[getter]
388    pub fn get_parent_span_id(&self) -> Option<String> {
389        self.parent_span_id.as_ref().map(|id| id.to_hex())
390    }
391
392    #[getter]
393    pub fn get_input<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyDict>, RecordError> {
394        let dict = PyDict::new(py);
395        match &self.input {
396            Value::Null => {}
397            _ => {
398                json_to_pyobject(py, &self.input, &dict)?;
399            }
400        }
401        Ok(dict)
402    }
403
404    #[getter]
405    pub fn get_output<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyDict>, RecordError> {
406        let dict = PyDict::new(py);
407        match &self.output {
408            Value::Null => {}
409            _ => {
410                json_to_pyobject(py, &self.output, &dict)?;
411            }
412        }
413        Ok(dict)
414    }
415
416    pub fn __str__(&self) -> String {
417        // serialize the struct to a string
418        PyHelperFuncs::__str__(self)
419    }
420}
421
// One baggage key/value attached to a trace, exposed to Python as a read-only class.
// Built by `TraceRecordExt::process_attributes` and
// `TraceServerRecord::convert_to_baggage_records`.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
#[cfg_attr(feature = "server", derive(sqlx::FromRow))]
pub struct TraceBaggageRecord {
    #[pyo3(get)]
    pub created_at: DateTime<Utc>,
    // Raw id; Python reads it as hex through the `get_trace_id` getter.
    pub trace_id: TraceId,
    // Instrumentation scope name of the span the baggage came from.
    #[pyo3(get)]
    pub scope: String,
    // Key with the `baggage.` prefix removed.
    #[pyo3(get)]
    pub key: String,
    // Value flattened to a string.
    #[pyo3(get)]
    pub value: String,
}
436
437#[pymethods]
438impl TraceBaggageRecord {
439    pub fn __str__(&self) -> String {
440        PyHelperFuncs::__str__(self)
441    }
442
443    #[getter]
444    pub fn get_trace_id(&self) -> String {
445        self.trace_id.to_hex()
446    }
447}
448
449pub type TraceRecords = (
450    Vec<TraceSpanRecord>,
451    Vec<TraceBaggageRecord>,
452    Vec<TagRecord>,
453);
454
455pub trait TraceRecordExt {
456    fn keyvalue_to_json_array<T: Serialize>(attributes: &Vec<T>) -> Result<Value, RecordError> {
457        Ok(serde_json::to_value(attributes).unwrap_or(Value::Array(vec![])))
458    }
459
460    fn process_attributes(
461        trace_id: &TraceId,
462        span_attributes: &[KeyValue],
463        scope: &str,
464        created_at: DateTime<Utc>,
465    ) -> Result<SpanAttributes, RecordError> {
466        let mut cleaned_attributes = Vec::with_capacity(span_attributes.len());
467        let mut baggage_records = Vec::new();
468        let mut tags = Vec::new();
469        let scope_owned = scope.to_string();
470
471        for kv in span_attributes {
472            let key = &kv.key;
473
474            // Check if this is a baggage-prefixed tag
475            if let Some(tag_key) = key.strip_prefix(BAGGAGE_TAG_PATTERN) {
476                if !tag_key.is_empty() {
477                    // tag values are stored as strings for tag table
478                    let string_value = match &kv.value {
479                        Some(v) => Self::otel_value_to_string(v),
480                        None => "null".to_string(),
481                    };
482
483                    // Extract as a tag
484                    tags.push(TagRecord::from_trace(
485                        trace_id,
486                        tag_key.to_string(),
487                        string_value.clone(),
488                    ));
489
490                    // Store cleaned attribute with stripped key
491                    cleaned_attributes.push(Attribute {
492                        key: tag_key.to_string(),
493                        value: Value::String(string_value.clone()),
494                    });
495
496                    // Also extract as baggage since it has baggage prefix
497                    baggage_records.push(TraceBaggageRecord {
498                        created_at,
499                        trace_id: trace_id.clone(),
500                        scope: scope_owned.clone(),
501                        key: format!("{}.{}", SCOUTER_TAG_PREFIX, tag_key),
502                        value: string_value,
503                    });
504                } else {
505                    tracing::warn!(
506                        attribute_key = %key,
507                        "Skipping baggage tag with empty key after prefix removal"
508                    );
509                }
510            }
511            // Check for non-baggage tags
512            else if let Some(tag_key) = key.strip_prefix(TAG_PATTERN) {
513                // tag values are stored as strings for tag table
514                if !tag_key.is_empty() {
515                    let string_value = match &kv.value {
516                        Some(v) => Self::otel_value_to_string(v),
517                        None => "null".to_string(),
518                    };
519
520                    tags.push(TagRecord::from_trace(
521                        trace_id,
522                        tag_key.to_string(),
523                        string_value.clone(),
524                    ));
525
526                    cleaned_attributes.push(Attribute {
527                        key: tag_key.to_string(),
528                        value: Value::String(string_value.clone()),
529                    });
530                } else {
531                    tracing::warn!(
532                        attribute_key = %key,
533                        "Skipping tag with empty key after prefix removal"
534                    );
535                }
536            }
537            // Check for regular baggage (not tags)
538            else if key.starts_with(BAGGAGE_PATTERN) {
539                let clean_key = key
540                    .strip_prefix(BAGGAGE_PATTERN)
541                    .unwrap_or(key)
542                    .trim()
543                    .to_string();
544
545                let string_value = match &kv.value {
546                    Some(v) => Self::otel_value_to_string(v),
547                    None => "null".to_string(),
548                };
549
550                baggage_records.push(TraceBaggageRecord {
551                    created_at,
552                    trace_id: trace_id.clone(),
553                    scope: scope_owned.clone(),
554                    key: clean_key,
555                    value: string_value,
556                });
557            }
558            // Regular attribute
559            else {
560                let value = match &kv.value {
561                    Some(v) => otel_value_to_serde_value(v),
562                    None => Value::Null,
563                };
564
565                cleaned_attributes.push(Attribute {
566                    key: key.clone(),
567                    value,
568                });
569            }
570        }
571
572        Ok((cleaned_attributes, baggage_records, tags))
573    }
574
575    fn otel_value_to_string(value: &AnyValue) -> String {
576        match &value.value {
577            Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) => {
578                s.clone()
579            }
580            Some(opentelemetry_proto::tonic::common::v1::any_value::Value::IntValue(i)) => {
581                i.to_string()
582            }
583            Some(opentelemetry_proto::tonic::common::v1::any_value::Value::DoubleValue(d)) => {
584                d.to_string()
585            }
586            Some(opentelemetry_proto::tonic::common::v1::any_value::Value::BoolValue(b)) => {
587                b.to_string()
588            }
589            Some(opentelemetry_proto::tonic::common::v1::any_value::Value::ArrayValue(_))
590            | Some(opentelemetry_proto::tonic::common::v1::any_value::Value::KvlistValue(_)) => {
591                let serde_val = otel_value_to_serde_value(value);
592                serde_json::to_string(&serde_val).unwrap_or_else(|_| format!("{:?}", value))
593            }
594            _ => "null".to_string(),
595        }
596    }
597
598    fn attributes_to_json_array(attributes: &[KeyValue]) -> Result<Vec<Attribute>, RecordError> {
599        attributes
600            .iter()
601            .map(|kv| {
602                let value = match &kv.value {
603                    Some(v) => otel_value_to_serde_value(v),
604                    None => Value::Null,
605                };
606
607                Ok(Attribute {
608                    key: kv.key.clone(),
609                    value,
610                })
611            })
612            .collect()
613    }
614
615    fn events_to_json_array(attributes: &[Event]) -> Result<Vec<SpanEvent>, RecordError> {
616        attributes
617            .iter()
618            .map(|kv| {
619                let attributes = Self::attributes_to_json_array(&kv.attributes)?;
620                Ok(SpanEvent {
621                    name: kv.name.clone(),
622                    timestamp: DateTime::<Utc>::from_timestamp_nanos(kv.time_unix_nano as i64),
623                    attributes,
624                    dropped_attributes_count: kv.dropped_attributes_count,
625                })
626            })
627            .collect()
628    }
629
630    fn links_to_json_array(attributes: &[Link]) -> Result<Vec<SpanLink>, RecordError> {
631        attributes
632            .iter()
633            .map(|kv| {
634                let attributes = Self::attributes_to_json_array(&kv.attributes)?;
635                Ok(SpanLink {
636                    trace_id: hex::encode(&kv.trace_id),
637                    span_id: hex::encode(&kv.span_id),
638                    trace_state: kv.trace_state.clone(),
639                    attributes,
640                    dropped_attributes_count: kv.dropped_attributes_count,
641                })
642            })
643            .collect()
644    }
645}
646
/// Wrapper around an OTLP `ExportTraceServiceRequest`.
///
/// `to_records` flattens the request into span, baggage and tag records.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct TraceServerRecord {
    pub request: ExportTraceServiceRequest,
}

// Uses the trait's default conversion helpers unchanged.
impl TraceRecordExt for TraceServerRecord {}
653
654impl TraceServerRecord {
655    /// Extract InstrumentationScope name and version from ScopeSpan
656    fn get_scope_info(
657        scope_span: &opentelemetry_proto::tonic::trace::v1::ScopeSpans,
658    ) -> (String, Option<String>) {
659        let scope_name = scope_span
660            .scope
661            .as_ref()
662            .map(|s| s.name.clone())
663            .filter(|n| !n.is_empty())
664            .unwrap_or_else(|| "unknown".to_string());
665
666        let scope_version = scope_span.scope.as_ref().and_then(|s| {
667            if s.version.is_empty() {
668                None
669            } else {
670                Some(s.version.clone())
671            }
672        });
673
674        (scope_name, scope_version)
675    }
676
677    /// Safely convert OpenTelemetry timestamps to DateTime<Utc> and calculate duration
678    ///
679    /// # Arguments
680    /// * `start_time` - Start timestamp in nanoseconds since Unix epoch
681    /// * `end_time` - End timestamp in nanoseconds since Unix epoch
682    ///
683    /// # Returns
684    /// Tuple of (start_time, end_time, duration_ms) with proper error handling
685    fn extract_time(start_time: u64, end_time: u64) -> (DateTime<Utc>, DateTime<Utc>, i64) {
686        // Safe timestamp conversion with bounds checking
687        let start_dt = Self::safe_timestamp_conversion(start_time);
688        let end_dt = Self::safe_timestamp_conversion(end_time);
689
690        // Calculate duration with overflow protection
691        let duration_ms = if end_time >= start_time {
692            let duration_nanos = end_time.saturating_sub(start_time);
693            (duration_nanos / 1_000_000).min(i64::MAX as u64) as i64
694        } else {
695            tracing::warn!(
696                start_time = start_time,
697                end_time = end_time,
698                "Invalid timestamp order detected in trace span"
699            );
700            0
701        };
702
703        (start_dt, end_dt, duration_ms)
704    }
705
706    /// Safely convert u64 nanosecond timestamp to DateTime<Utc>
707    fn safe_timestamp_conversion(timestamp_nanos: u64) -> DateTime<Utc> {
708        if timestamp_nanos <= i64::MAX as u64 {
709            DateTime::from_timestamp_nanos(timestamp_nanos as i64)
710        } else {
711            let seconds = timestamp_nanos / 1_000_000_000;
712            let nanoseconds = (timestamp_nanos % 1_000_000_000) as u32;
713
714            DateTime::from_timestamp(seconds as i64, nanoseconds).unwrap_or_else(|| {
715                tracing::warn!(
716                    timestamp = timestamp_nanos,
717                    seconds = seconds,
718                    nanoseconds = nanoseconds,
719                    "Failed to convert large timestamp, falling back to current time"
720                );
721                Utc::now()
722            })
723        }
724    }
725
726    /// Safely convert span kind i32 to string with proper error handling
727    fn span_kind_to_string(kind: i32) -> String {
728        SpanKind::try_from(kind)
729            .map(|sk| {
730                sk.as_str_name()
731                    .strip_prefix("SPAN_KIND_")
732                    .unwrap_or(sk.as_str_name())
733            })
734            .unwrap_or("UNSPECIFIED")
735            .to_string()
736    }
737
738    fn extract_input_output(attributes: &[Attribute]) -> (Value, Value) {
739        let mut input = Value::Null;
740        let mut output = Value::Null;
741
742        for attr in attributes {
743            if attr.key == SCOUTER_TRACING_INPUT {
744                if let Value::String(s) = &attr.value {
745                    input = serde_json::from_str(s).unwrap_or_else(|e| {
746                        tracing::warn!(
747                            key = SCOUTER_TRACING_INPUT,
748                            error = %e,
749                            value = s,
750                            "Failed to parse input attribute as JSON, falling back to string value."
751                        );
752                        Value::String(s.clone()) // Or Value::Null
753                    });
754                }
755            } else if attr.key == SCOUTER_TRACING_OUTPUT {
756                if let Value::String(s) = &attr.value {
757                    output = serde_json::from_str(s)
758                        .unwrap_or_else(|e| {
759                            tracing::warn!(
760                                key = SCOUTER_TRACING_OUTPUT,
761                                error = %e,
762                                value = s,
763                                "Failed to parse output attribute as JSON, falling back to string value."
764                            );
765                            Value::String(s.clone()) // Or Value::Null
766                        });
767                }
768            }
769        }
770        (input, output)
771    }
772
773    fn get_service_name_from_resource(
774        resource: &Option<opentelemetry_proto::tonic::resource::v1::Resource>,
775        default: &str,
776    ) -> String {
777        resource
778            .as_ref()
779            .and_then(|r| r.attributes.iter().find(|attr| attr.key == SERVICE_NAME))
780            .and_then(|attr| {
781                attr.value.as_ref().and_then(|v| {
782                    if let Some(
783                        opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s),
784                    ) = &v.value
785                    {
786                        Some(s.clone())
787                    } else {
788                        None
789                    }
790                })
791            })
792            .unwrap_or_else(|| {
793                tracing::warn!(
794                    "Service name not found in resource attributes, falling back to default: {}",
795                    default
796                );
797                default.to_string()
798            })
799    }
800
801    /// Filter and extract trace start time attribute from span attributes
802    /// This is a global scouter attribute that indicates the trace start time and is set across all spans
803    pub fn get_trace_start_time_attribute(
804        attributes: &Vec<Attribute>,
805        start_time: &DateTime<Utc>,
806    ) -> DateTime<Utc> {
807        for attr in attributes {
808            if attr.key == TRACE_START_TIME_KEY {
809                if let Value::String(s) = &attr.value {
810                    if let Ok(dt) = s.parse::<chrono::DateTime<chrono::Utc>>() {
811                        return dt;
812                    }
813                }
814            }
815        }
816
817        tracing::warn!(
818            "Trace start time attribute not found or invalid, falling back to span start_time"
819        );
820        *start_time
821    }
822
823    pub fn convert_to_baggage_records(
824        trace_id: &TraceId,
825        attributes: &Vec<Attribute>,
826        scope_name: &str,
827    ) -> Vec<TraceBaggageRecord> {
828        let baggage_kvs: Vec<(String, String)> = attributes
829            .iter()
830            .filter_map(|attr| {
831                // Only process attributes with baggage prefix
832                if attr.key.starts_with(BAGGAGE_PREFIX) {
833                    let clean_key = attr
834                        .key
835                        .strip_prefix(format!("{}.", BAGGAGE_PREFIX).as_str())
836                        .map(|stripped| stripped.trim())
837                        .unwrap_or(&attr.key)
838                        .to_string();
839
840                    // Handle different value types from OpenTelemetry KeyValue
841                    let value_string = match &attr.value {
842                        Value::String(s) => s.clone(),
843                        Value::Number(n) => n.to_string(),
844                        Value::Bool(b) => b.to_string(),
845                        Value::Null => "null".to_string(),
846                        Value::Array(_) | Value::Object(_) => {
847                            // For complex types, use compact JSON representation
848                            serde_json::to_string(&attr.value)
849                                .unwrap_or_else(|_| format!("{:?}", attr.value))
850                        }
851                    };
852
853                    Some((clean_key, value_string))
854                } else {
855                    None
856                }
857            })
858            .collect();
859
860        baggage_kvs
861            .into_iter()
862            .map(|(key, value)| TraceBaggageRecord {
863                created_at: Self::get_trace_start_time_attribute(attributes, &Utc::now()),
864                trace_id: trace_id.clone(),
865                scope: scope_name.to_string(),
866                key,
867                value,
868            })
869            .collect()
870    }
871
872    pub fn to_records(self) -> Result<TraceRecords, RecordError> {
873        let resource_spans = self.request.resource_spans;
874
875        // Pre-calculate capacity to avoid reallocations
876        let estimated_capacity: usize = resource_spans
877            .iter()
878            .map(|rs| {
879                rs.scope_spans
880                    .iter()
881                    .map(|ss| ss.spans.len())
882                    .sum::<usize>()
883            })
884            .sum();
885
886        let mut span_records: Vec<TraceSpanRecord> = Vec::with_capacity(estimated_capacity);
887        let mut baggage_records: Vec<TraceBaggageRecord> = Vec::new();
888        let mut tags: HashSet<TagRecord> = HashSet::new();
889
890        for resource_span in resource_spans {
891            // process metadata only once per resource span
892            let service_name =
893                Self::get_service_name_from_resource(&resource_span.resource, "unknown");
894            let resource_attributes = Attribute::from_resources(&resource_span.resource);
895
896            for scope_span in &resource_span.scope_spans {
897                let (scope_name, scope_version) = Self::get_scope_info(scope_span);
898
899                for span in &scope_span.spans {
900                    // Core identifiers
901                    let trace_id = TraceId::from_slice(span.trace_id.as_slice())?;
902                    let span_id = SpanId::from_slice(span.span_id.as_slice())?;
903                    let parent_span_id = if !span.parent_span_id.is_empty() {
904                        Some(SpanId::from_slice(span.parent_span_id.as_slice())?)
905                    } else {
906                        None
907                    };
908
909                    let (start_time, end_time, duration_ms) =
910                        Self::extract_time(span.start_time_unix_nano, span.end_time_unix_nano);
911
912                    let (cleaned_attributes, span_baggage, span_tags) = Self::process_attributes(
913                        &trace_id,
914                        &span.attributes,
915                        &scope_name,
916                        start_time,
917                    )?;
918
919                    // Add to collections
920                    baggage_records.extend(span_baggage);
921                    tags.extend(span_tags);
922
923                    let (input, output) = Self::extract_input_output(&cleaned_attributes);
924
925                    // SpanRecord for insert
926                    span_records.push(TraceSpanRecord {
927                        created_at: start_time,
928                        trace_id,
929                        span_id,
930                        parent_span_id,
931                        flags: span.flags as i32,
932                        trace_state: span.trace_state.clone(),
933                        scope_name: scope_name.clone(),
934                        scope_version: scope_version.clone(),
935                        span_name: span.name.clone(),
936                        span_kind: Self::span_kind_to_string(span.kind),
937                        start_time,
938                        end_time,
939                        duration_ms,
940                        status_code: span.status.as_ref().map(|s| s.code).unwrap_or(0),
941                        status_message: span
942                            .status
943                            .as_ref()
944                            .map(|s| s.message.clone())
945                            .unwrap_or_default(),
946                        attributes: cleaned_attributes,
947                        events: Self::events_to_json_array(&span.events)?,
948                        links: Self::links_to_json_array(&span.links)?,
949                        label: None,
950                        input,
951                        output,
952                        service_name: service_name.clone(),
953                        resource_attributes: resource_attributes.clone(),
954                    });
955                }
956            }
957        }
958
959        let tag_records: Vec<TagRecord> = tags.into_iter().collect();
960        Ok((span_records, baggage_records, tag_records))
961    }
962}
963
/// A single key/value attribute whose value is stored as a `serde_json::Value`.
///
/// Exposed to Python as a `pyclass`. `key` uses a generated read-only getter.
/// `value` has no generated getter because it must be converted from JSON to a
/// Python object; that conversion happens in the custom `get_value` getter.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
pub struct Attribute {
    /// Attribute name.
    #[pyo3(get)]
    pub key: String,
    /// Attribute value as JSON.
    pub value: Value,
}
971
972#[pymethods]
973impl Attribute {
974    #[getter]
975    pub fn get_value<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyAny>, RecordError> {
976        Ok(json_to_pyobject_value(py, &self.value)?.bind(py).clone())
977    }
978
979    pub fn __str__(&self) -> String {
980        PyHelperFuncs::__str__(self)
981    }
982}
983
984impl Attribute {
985    pub fn from_otel_value(key: String, value: &AnyValue) -> Self {
986        Attribute {
987            key,
988            value: otel_value_to_serde_value(value),
989        }
990    }
991
992    fn from_resources(
993        resource: &Option<opentelemetry_proto::tonic::resource::v1::Resource>,
994    ) -> Vec<Attribute> {
995        match resource {
996            Some(res) => res
997                .attributes
998                .iter()
999                .map(|kv| Attribute::from_otel_value(kv.key.clone(), kv.value.as_ref().unwrap()))
1000                .collect(),
1001            None => vec![],
1002        }
1003    }
1004}
1005
/// An event attached to a span, exposed to Python with read-only getters.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct SpanEvent {
    /// Event timestamp in UTC.
    #[pyo3(get)]
    pub timestamp: chrono::DateTime<Utc>,
    /// Event name.
    #[pyo3(get)]
    pub name: String,
    /// Attributes recorded on the event.
    #[pyo3(get)]
    pub attributes: Vec<Attribute>,
    /// Count of dropped attributes as reported by the source.
    /// NOTE(review): presumably mirrors the OTLP `dropped_attributes_count` field; confirm.
    #[pyo3(get)]
    pub dropped_attributes_count: u32,
}

#[pymethods]
impl SpanEvent {
    /// String form used by Python's `str()`; delegates to `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1025
/// A link from a span to another span, exposed to Python with read-only getters.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct SpanLink {
    /// Trace id of the linked span, stored as a string.
    /// NOTE(review): presumably hex-encoded; confirm against the producer.
    #[pyo3(get)]
    pub trace_id: String,
    /// Span id of the linked span, stored as a string (same encoding caveat as `trace_id`).
    #[pyo3(get)]
    pub span_id: String,
    /// Trace state string carried on the link.
    #[pyo3(get)]
    pub trace_state: String,
    /// Attributes recorded on the link.
    #[pyo3(get)]
    pub attributes: Vec<Attribute>,
    /// Count of dropped attributes as reported by the source.
    /// NOTE(review): presumably mirrors the OTLP `dropped_attributes_count` field; confirm.
    #[pyo3(get)]
    pub dropped_attributes_count: u32,
}

#[pymethods]
impl SpanLink {
    /// String form used by Python's `str()`; delegates to `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1047
/// A plain string key/value tag, exposed to Python with read-only getters.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct Tag {
    /// Tag key.
    #[pyo3(get)]
    pub key: String,
    /// Tag value.
    #[pyo3(get)]
    pub value: String,
}

#[pymethods]
impl Tag {
    /// String form used by Python's `str()`; delegates to `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1063
/// A key/value tag bound to an entity (for example, a trace).
///
/// Derives `Eq` and `Hash` so identical records can be de-duplicated in a `HashSet`.
/// With the `server` feature enabled, it also derives `sqlx::FromRow`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
#[pyclass]
#[cfg_attr(feature = "server", derive(sqlx::FromRow))]
pub struct TagRecord {
    /// Kind of entity the tag belongs to (`"trace"` when built via `from_trace`).
    #[pyo3(get)]
    pub entity_type: String,
    /// Identifier of the tagged entity (the trace id's hex string when built via `from_trace`).
    #[pyo3(get)]
    pub entity_id: String,
    /// Tag key.
    #[pyo3(get)]
    pub key: String,
    /// Tag value.
    #[pyo3(get)]
    pub value: String,
}
1077
1078impl TagRecord {
1079    /// Create a tag record from a TraceId
1080    pub fn from_trace(trace_id: &TraceId, key: String, value: String) -> Self {
1081        Self {
1082            entity_type: "trace".to_string(),
1083            entity_id: trace_id.to_hex(),
1084            key,
1085            value,
1086        }
1087    }
1088}
1089
#[pymethods]
impl TagRecord {
    /// String form used by Python's `str()`; delegates to `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}