Skip to main content

scouter_types/trace/
mod.rs

1pub mod sql;
2
3use crate::error::RecordError;
4use crate::otel_value_to_serde_value;
5use crate::PyHelperFuncs;
6use crate::{json_to_pyobject, json_to_pyobject_value};
7use chrono::DateTime;
8use chrono::Utc;
9use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
10use opentelemetry_proto::tonic::common::v1::AnyValue;
11use opentelemetry_proto::tonic::common::v1::KeyValue;
12use opentelemetry_proto::tonic::trace::v1::span::Event;
13use opentelemetry_proto::tonic::trace::v1::span::Link;
14use opentelemetry_proto::tonic::trace::v1::span::SpanKind;
15use opentelemetry_proto::tonic::trace::v1::Span;
16use pyo3::prelude::*;
17use pyo3::types::PyDict;
18use serde::{Deserialize, Serialize};
19use serde_json::Value;
20use std::cmp::{max, min};
21use std::collections::HashMap;
22use std::collections::HashSet;
23
// Span attribute keys describing the traced function.
// These are not referenced elsewhere in this file.
pub const FUNCTION_TYPE: &str = "function.type";
pub const FUNCTION_STREAMING: &str = "function.streaming";
pub const FUNCTION_NAME: &str = "function.name";
pub const FUNCTION_MODULE: &str = "function.module";
pub const FUNCTION_QUALNAME: &str = "function.qualname";
/// Attribute key whose string value `extract_input_output` parses as the span input.
pub const SCOUTER_TRACING_INPUT: &str = "scouter.tracing.input";
/// Attribute key whose string value `extract_input_output` parses as the span output.
pub const SCOUTER_TRACING_OUTPUT: &str = "scouter.tracing.output";
pub const SCOUTER_TRACING_LABEL: &str = "scouter.tracing.label";
/// Resource attribute key read by `get_service_name_from_resource`.
pub const SERVICE_NAME: &str = "service.name";
/// Tag prefix (without trailing dot); `process_attributes` uses it to rebuild
/// the baggage key of a baggage-prefixed tag.
pub const SCOUTER_TAG_PREFIX: &str = "scouter.tracing.tag";
/// Baggage prefix (without trailing dot) used by `convert_to_baggage_records`.
pub const BAGGAGE_PREFIX: &str = "baggage";
/// Attribute key read by `get_trace_start_time_attribute`.
pub const TRACE_START_TIME_KEY: &str = "scouter.tracing.start_time";
/// Resource attribute key read by `get_scope_from_resource`.
pub const SCOUTER_SCOPE: &str = "scouter.scope";
/// `"scouter.tracer."` followed by this crate's version, resolved at compile time.
pub const SCOUTER_SCOPE_DEFAULT: &str = concat!("scouter.tracer.", env!("CARGO_PKG_VERSION"));
pub const SPAN_ERROR: &str = "span.error";
pub const EXCEPTION_TRACEBACK: &str = "exception.traceback";
pub const SCOUTER_QUEUE_RECORD: &str = "scouter.queue.record";
pub const SCOUTER_QUEUE_EVENT: &str = "scouter.queue.event";

// Key prefixes (with trailing dot) that `process_attributes` strips to
// recognise baggage entries and tags.
/// Prefix of plain baggage attributes.
pub const BAGGAGE_PATTERN: &str = "baggage.";
/// Prefix of tags that were propagated through baggage.
pub const BAGGAGE_TAG_PATTERN: &str = concat!("baggage", ".", "scouter.tracing.tag", ".");
/// Prefix of tags set directly on a span.
pub const TAG_PATTERN: &str = concat!("scouter.tracing.tag", ".");

/// Result of `process_attributes`: (cleaned attributes, baggage records, tag records).
type SpanAttributes = (Vec<Attribute>, Vec<TraceBaggageRecord>, Vec<TagRecord>);
49
/// Trace-level summary record exposed to Python with read-only getters.
///
/// Records with the same identity are combined by `TraceRecord::merge`.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
pub struct TraceRecord {
    /// Part of the grouping key used by `deduplicate_and_merge_traces`.
    #[pyo3(get)]
    pub created_at: DateTime<Utc>,
    /// Part of the grouping key used by `deduplicate_and_merge_traces`.
    #[pyo3(get)]
    pub trace_id: String,
    #[pyo3(get)]
    pub service_name: String,
    /// Part of the grouping key used by `deduplicate_and_merge_traces`.
    #[pyo3(get)]
    pub scope: String,
    #[pyo3(get)]
    pub trace_state: String,
    /// Lower time bound of the trace; `merge` keeps the earliest value.
    #[pyo3(get)]
    pub start_time: chrono::DateTime<Utc>,
    /// Upper time bound of the trace; `merge` keeps the latest value.
    #[pyo3(get)]
    pub end_time: chrono::DateTime<Utc>,
    /// Milliseconds between `start_time` and `end_time`; `merge` recomputes it.
    #[pyo3(get)]
    pub duration_ms: i64,
    /// `merge` treats the value 2 as sticky.
    // NOTE(review): 2 presumably is the OTel error status code; confirm.
    #[pyo3(get)]
    pub status_code: i32,
    #[pyo3(get)]
    pub status_message: String,
    #[pyo3(get)]
    pub root_span_id: String,
    /// Number of spans; `merge` adds the counts of both records.
    #[pyo3(get)]
    pub span_count: i32,
    /// Tags; `merge` keeps at most one tag per key.
    #[pyo3(get)]
    pub tags: Vec<Tag>,
    /// Process attributes; `merge` keeps at most one attribute per key.
    #[pyo3(get)]
    pub process_attributes: Vec<Attribute>,
}

#[pymethods]
impl TraceRecord {
    /// Python `__str__`: returns the string built by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
89
90impl TraceRecord {
91    /// Merges data from another TraceRecord belonging to the same trace.
92    /// This is crucial for updating a trace record as more spans arrive.
93    pub fn merge(&mut self, other: &TraceRecord) {
94        // 1. Update the overall trace time bounds
95        self.start_time = min(self.start_time, other.start_time);
96        self.end_time = max(self.end_time, other.end_time);
97
98        // 2. Recalculate duration based on new time bounds
99        if self.end_time > self.start_time {
100            self.duration_ms = (self.end_time - self.start_time).num_milliseconds();
101        } else {
102            // Handle edge case where end_time may not be set yet (duration = 0)
103            self.duration_ms = 0;
104        }
105
106        if self.status_code != 2 && other.status_code == 2 {
107            self.status_code = 2;
108        }
109
110        self.span_count += other.span_count;
111
112        let mut existing_tag_keys: std::collections::HashSet<String> =
113            self.tags.iter().map(|t| t.key.clone()).collect();
114
115        for tag in &other.tags {
116            if !existing_tag_keys.contains(&tag.key) {
117                self.tags.push(tag.clone());
118                existing_tag_keys.insert(tag.key.clone());
119            }
120        }
121
122        // 3. Merge process attributes, avoiding duplicates
123        let mut existing_attr_keys: std::collections::HashSet<String> = self
124            .process_attributes
125            .iter()
126            .map(|a| a.key.clone())
127            .collect();
128
129        for attr in &other.process_attributes {
130            if !existing_attr_keys.contains(&attr.key) {
131                self.process_attributes.push(attr.clone());
132                existing_attr_keys.insert(attr.key.clone());
133            }
134        }
135    }
136}
137
/// Identity of a trace for deduplication: records that agree on all three
/// fields are merged by `deduplicate_and_merge_traces`.
#[derive(Hash, Eq, PartialEq, Clone)]
struct TraceKey {
    // Copied from `TraceRecord::created_at`.
    created_at: chrono::DateTime<chrono::Utc>,
    trace_id: String,
    scope: String,
}
144
145pub fn deduplicate_and_merge_traces(raw_traces: Vec<TraceRecord>) -> Vec<TraceRecord> {
146    let mut merged_traces: HashMap<TraceKey, TraceRecord> = HashMap::new();
147
148    for trace in raw_traces {
149        let key = TraceKey {
150            created_at: trace.created_at,
151            trace_id: trace.trace_id.clone(),
152            scope: trace.scope.clone(),
153        };
154
155        merged_traces
156            .entry(key)
157            .and_modify(|existing_trace| {
158                existing_trace.merge(&trace);
159            })
160            .or_insert(trace);
161    }
162
163    merged_traces.into_values().collect()
164}
165
/// A single span, as built by `TraceServerRecord::convert_to_span_record`.
///
/// Most fields are exposed to Python through generated getters. `input` and
/// `output` have no `#[pyo3(get)]` here; the `#[pymethods]` block provides
/// getters that convert them to Python dicts.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
pub struct TraceSpanRecord {
    /// Set to the span start time by `convert_to_span_record`.
    #[pyo3(get)]
    pub created_at: chrono::DateTime<Utc>,
    #[pyo3(get)]
    pub span_id: String,
    #[pyo3(get)]
    pub trace_id: String,
    /// `None` when the source span carries an empty parent span id.
    #[pyo3(get)]
    pub parent_span_id: Option<String>,
    #[pyo3(get)]
    pub scope: String,
    #[pyo3(get)]
    pub span_name: String,
    /// Span kind name as produced by `span_kind_to_string`.
    #[pyo3(get)]
    pub span_kind: String,
    #[pyo3(get)]
    pub start_time: chrono::DateTime<Utc>,
    #[pyo3(get)]
    pub end_time: chrono::DateTime<Utc>,
    #[pyo3(get)]
    pub duration_ms: i64,
    /// Status code of the span; 0 when the span has no status.
    #[pyo3(get)]
    pub status_code: i32,
    /// Status message of the span; empty when the span has no status.
    #[pyo3(get)]
    pub status_message: String,
    #[pyo3(get)]
    pub attributes: Vec<Attribute>,
    #[pyo3(get)]
    pub events: Vec<SpanEvent>,
    #[pyo3(get)]
    pub links: Vec<SpanLink>,
    #[pyo3(get)]
    pub label: Option<String>,
    /// JSON taken from the `scouter.tracing.input` attribute; `Null` when absent.
    pub input: Value,
    /// JSON taken from the `scouter.tracing.output` attribute; `Null` when absent.
    pub output: Value,
    #[pyo3(get)]
    pub service_name: String,
    #[pyo3(get)]
    pub resource_attributes: Vec<Attribute>,
}
208
209#[pymethods]
210impl TraceSpanRecord {
211    #[getter]
212    pub fn get_input<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyDict>, RecordError> {
213        let dict = PyDict::new(py);
214        match &self.input {
215            Value::Null => {}
216            _ => {
217                json_to_pyobject(py, &self.input, &dict)?;
218            }
219        }
220        Ok(dict)
221    }
222
223    #[getter]
224    pub fn get_output<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyDict>, RecordError> {
225        let dict = PyDict::new(py);
226        match &self.output {
227            Value::Null => {}
228            _ => {
229                json_to_pyobject(py, &self.output, &dict)?;
230            }
231        }
232        Ok(dict)
233    }
234
235    pub fn __str__(&self) -> String {
236        // serialize the struct to a string
237        PyHelperFuncs::__str__(self)
238    }
239}
240
/// One baggage key/value pair attached to a trace.
///
/// Produced by `process_attributes` and `convert_to_baggage_records`. With the
/// `server` feature enabled the struct also derives `sqlx::FromRow`.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
#[cfg_attr(feature = "server", derive(sqlx::FromRow))]
pub struct TraceBaggageRecord {
    #[pyo3(get)]
    pub created_at: DateTime<Utc>,
    #[pyo3(get)]
    pub trace_id: String,
    #[pyo3(get)]
    pub scope: String,
    /// Baggage key with the `baggage.` prefix removed.
    #[pyo3(get)]
    pub key: String,
    /// Baggage value rendered as a string.
    #[pyo3(get)]
    pub value: String,
}

#[pymethods]
impl TraceBaggageRecord {
    /// Python `__str__`: returns the string built by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
263
/// Output of `TraceServerRecord::to_records`:
/// (span records, baggage records, de-duplicated tag records).
pub type TraceRecords = (
    Vec<TraceSpanRecord>,
    Vec<TraceBaggageRecord>,
    Vec<TagRecord>,
);
269
270pub trait TraceRecordExt {
271    fn keyvalue_to_json_array<T: Serialize>(attributes: &Vec<T>) -> Result<Value, RecordError> {
272        Ok(serde_json::to_value(attributes).unwrap_or(Value::Array(vec![])))
273    }
274
275    fn process_attributes(
276        trace_id: &str,
277        span_attributes: &[KeyValue],
278        scope: &str,
279        created_at: DateTime<Utc>,
280    ) -> Result<SpanAttributes, RecordError> {
281        let mut cleaned_attributes = Vec::with_capacity(span_attributes.len());
282        let mut baggage_records = Vec::new();
283        let mut tags = Vec::new();
284
285        let trace_id_owned = trace_id.to_string();
286        let scope_owned = scope.to_string();
287        let entity_type = "trace".to_string();
288
289        for kv in span_attributes {
290            let key = &kv.key;
291
292            // Check if this is a baggage-prefixed tag
293            if let Some(tag_key) = key.strip_prefix(BAGGAGE_TAG_PATTERN) {
294                if !tag_key.is_empty() {
295                    let value = match &kv.value {
296                        Some(v) => Self::otel_value_to_string(v),
297                        None => "null".to_string(),
298                    };
299
300                    // Extract as a tag
301                    tags.push(TagRecord {
302                        entity_type: entity_type.clone(),
303                        entity_id: trace_id_owned.clone(),
304                        key: tag_key.to_string(),
305                        value: value.clone(),
306                    });
307
308                    // Store cleaned attribute with stripped key
309                    cleaned_attributes.push(Attribute {
310                        key: tag_key.to_string(),
311                        value: Value::String(value.clone()),
312                    });
313
314                    // Also extract as baggage since it has baggage prefix
315                    baggage_records.push(TraceBaggageRecord {
316                        created_at,
317                        trace_id: trace_id_owned.clone(),
318                        scope: scope_owned.clone(),
319                        key: format!("{}.{}", SCOUTER_TAG_PREFIX, tag_key),
320                        value,
321                    });
322                } else {
323                    tracing::warn!(
324                        attribute_key = %key,
325                        "Skipping baggage tag with empty key after prefix removal"
326                    );
327                }
328            }
329            // Check for non-baggage tags
330            else if let Some(tag_key) = key.strip_prefix(TAG_PATTERN) {
331                if !tag_key.is_empty() {
332                    let value = match &kv.value {
333                        Some(v) => Self::otel_value_to_string(v),
334                        None => "null".to_string(),
335                    };
336
337                    tags.push(TagRecord {
338                        entity_type: entity_type.clone(),
339                        entity_id: trace_id_owned.clone(),
340                        key: tag_key.to_string(),
341                        value: value.clone(),
342                    });
343
344                    cleaned_attributes.push(Attribute {
345                        key: tag_key.to_string(),
346                        value: Value::String(value),
347                    });
348                } else {
349                    tracing::warn!(
350                        attribute_key = %key,
351                        "Skipping tag with empty key after prefix removal"
352                    );
353                }
354            }
355            // Check for regular baggage (not tags)
356            else if key.starts_with(BAGGAGE_PATTERN) {
357                let clean_key = key
358                    .strip_prefix(BAGGAGE_PATTERN)
359                    .unwrap_or(key)
360                    .trim()
361                    .to_string();
362
363                let value_string = match &kv.value {
364                    Some(v) => Self::otel_value_to_string(v),
365                    None => "null".to_string(),
366                };
367
368                baggage_records.push(TraceBaggageRecord {
369                    created_at,
370                    trace_id: trace_id_owned.clone(),
371                    scope: scope_owned.clone(),
372                    key: clean_key,
373                    value: value_string,
374                });
375            }
376            // Regular attribute
377            else {
378                let value = match &kv.value {
379                    Some(v) => otel_value_to_serde_value(v),
380                    None => Value::Null,
381                };
382
383                cleaned_attributes.push(Attribute {
384                    key: key.clone(),
385                    value,
386                });
387            }
388        }
389
390        Ok((cleaned_attributes, baggage_records, tags))
391    }
392
393    fn otel_value_to_string(value: &AnyValue) -> String {
394        match &value.value {
395            Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) => {
396                s.clone()
397            }
398            Some(opentelemetry_proto::tonic::common::v1::any_value::Value::IntValue(i)) => {
399                i.to_string()
400            }
401            Some(opentelemetry_proto::tonic::common::v1::any_value::Value::DoubleValue(d)) => {
402                d.to_string()
403            }
404            Some(opentelemetry_proto::tonic::common::v1::any_value::Value::BoolValue(b)) => {
405                b.to_string()
406            }
407            Some(opentelemetry_proto::tonic::common::v1::any_value::Value::ArrayValue(_))
408            | Some(opentelemetry_proto::tonic::common::v1::any_value::Value::KvlistValue(_)) => {
409                let serde_val = otel_value_to_serde_value(value);
410                serde_json::to_string(&serde_val).unwrap_or_else(|_| format!("{:?}", value))
411            }
412            _ => "null".to_string(),
413        }
414    }
415
416    fn attributes_to_json_array(attributes: &[KeyValue]) -> Result<Vec<Attribute>, RecordError> {
417        attributes
418            .iter()
419            .map(|kv| {
420                let value = match &kv.value {
421                    Some(v) => otel_value_to_serde_value(v),
422                    None => Value::Null,
423                };
424
425                Ok(Attribute {
426                    key: kv.key.clone(),
427                    value,
428                })
429            })
430            .collect()
431    }
432
433    fn events_to_json_array(attributes: &[Event]) -> Result<Vec<SpanEvent>, RecordError> {
434        attributes
435            .iter()
436            .map(|kv| {
437                let attributes = Self::attributes_to_json_array(&kv.attributes)?;
438                Ok(SpanEvent {
439                    name: kv.name.clone(),
440                    timestamp: DateTime::<Utc>::from_timestamp_nanos(kv.time_unix_nano as i64),
441                    attributes,
442                    dropped_attributes_count: kv.dropped_attributes_count,
443                })
444            })
445            .collect()
446    }
447
448    fn links_to_json_array(attributes: &[Link]) -> Result<Vec<SpanLink>, RecordError> {
449        attributes
450            .iter()
451            .map(|kv| {
452                let attributes = Self::attributes_to_json_array(&kv.attributes)?;
453                Ok(SpanLink {
454                    trace_id: hex::encode(&kv.trace_id),
455                    span_id: hex::encode(&kv.span_id),
456                    trace_state: kv.trace_state.clone(),
457                    attributes,
458                    dropped_attributes_count: kv.dropped_attributes_count,
459                })
460            })
461            .collect()
462    }
463}
464
/// Wrapper around an OTLP `ExportTraceServiceRequest`.
///
/// `to_records` consumes it and flattens the request into span, baggage and
/// tag records.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct TraceServerRecord {
    /// The raw export request.
    pub request: ExportTraceServiceRequest,
}

// Uses the default method implementations of `TraceRecordExt` unchanged.
impl TraceRecordExt for TraceServerRecord {}
471
472impl TraceServerRecord {
473    /// Safely convert OpenTelemetry timestamps to DateTime<Utc> and calculate duration
474    ///
475    /// # Arguments
476    /// * `start_time` - Start timestamp in nanoseconds since Unix epoch
477    /// * `end_time` - End timestamp in nanoseconds since Unix epoch
478    ///
479    /// # Returns
480    /// Tuple of (start_time, end_time, duration_ms) with proper error handling
481    fn extract_time(start_time: u64, end_time: u64) -> (DateTime<Utc>, DateTime<Utc>, i64) {
482        // Safe timestamp conversion with bounds checking
483        let start_dt = Self::safe_timestamp_conversion(start_time);
484        let end_dt = Self::safe_timestamp_conversion(end_time);
485
486        // Calculate duration with overflow protection
487        let duration_ms = if end_time >= start_time {
488            let duration_nanos = end_time.saturating_sub(start_time);
489            (duration_nanos / 1_000_000).min(i64::MAX as u64) as i64
490        } else {
491            tracing::warn!(
492                start_time = start_time,
493                end_time = end_time,
494                "Invalid timestamp order detected in trace span"
495            );
496            0
497        };
498
499        (start_dt, end_dt, duration_ms)
500    }
501
502    /// Safely convert u64 nanosecond timestamp to DateTime<Utc>
503    fn safe_timestamp_conversion(timestamp_nanos: u64) -> DateTime<Utc> {
504        if timestamp_nanos <= i64::MAX as u64 {
505            DateTime::from_timestamp_nanos(timestamp_nanos as i64)
506        } else {
507            let seconds = timestamp_nanos / 1_000_000_000;
508            let nanoseconds = (timestamp_nanos % 1_000_000_000) as u32;
509
510            DateTime::from_timestamp(seconds as i64, nanoseconds).unwrap_or_else(|| {
511                tracing::warn!(
512                    timestamp = timestamp_nanos,
513                    seconds = seconds,
514                    nanoseconds = nanoseconds,
515                    "Failed to convert large timestamp, falling back to current time"
516                );
517                Utc::now()
518            })
519        }
520    }
521
522    /// Safely convert span kind i32 to string with proper error handling
523    fn span_kind_to_string(kind: i32) -> String {
524        SpanKind::try_from(kind)
525            .map(|sk| {
526                sk.as_str_name()
527                    .strip_prefix("SPAN_KIND_")
528                    .unwrap_or(sk.as_str_name())
529            })
530            .unwrap_or("UNSPECIFIED")
531            .to_string()
532    }
533
534    fn extract_input_output(attributes: &[Attribute]) -> (Value, Value) {
535        let mut input = Value::Null;
536        let mut output = Value::Null;
537
538        for attr in attributes {
539            if attr.key == SCOUTER_TRACING_INPUT {
540                if let Value::String(s) = &attr.value {
541                    input = serde_json::from_str(s).unwrap_or_else(|e| {
542                        tracing::warn!(
543                            key = SCOUTER_TRACING_INPUT,
544                            error = %e,
545                            value = s,
546                            "Failed to parse input attribute as JSON, falling back to string value."
547                        );
548                        Value::String(s.clone()) // Or Value::Null
549                    });
550                }
551            } else if attr.key == SCOUTER_TRACING_OUTPUT {
552                if let Value::String(s) = &attr.value {
553                    output = serde_json::from_str(s)
554                        .unwrap_or_else(|e| {
555                            tracing::warn!(
556                                key = SCOUTER_TRACING_OUTPUT,
557                                error = %e,
558                                value = s,
559                                "Failed to parse output attribute as JSON, falling back to string value."
560                            );
561                            Value::String(s.clone()) // Or Value::Null
562                        });
563                }
564            }
565        }
566        (input, output)
567    }
568
569    fn get_scope_from_resource(
570        resource: &Option<opentelemetry_proto::tonic::resource::v1::Resource>,
571        default: &str,
572    ) -> String {
573        resource
574            .as_ref()
575            .and_then(|r| r.attributes.iter().find(|attr| attr.key == SCOUTER_SCOPE))
576            .and_then(|attr| {
577                attr.value.as_ref().and_then(|v| {
578                    if let Some(
579                        opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s),
580                    ) = &v.value
581                    {
582                        Some(s.clone())
583                    } else {
584                        None
585                    }
586                })
587            })
588            .unwrap_or_else(|| default.to_string())
589    }
590
591    fn get_service_name_from_resource(
592        resource: &Option<opentelemetry_proto::tonic::resource::v1::Resource>,
593        default: &str,
594    ) -> String {
595        resource
596            .as_ref()
597            .and_then(|r| r.attributes.iter().find(|attr| attr.key == SERVICE_NAME))
598            .and_then(|attr| {
599                attr.value.as_ref().and_then(|v| {
600                    if let Some(
601                        opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s),
602                    ) = &v.value
603                    {
604                        Some(s.clone())
605                    } else {
606                        None
607                    }
608                })
609            })
610            .unwrap_or_else(|| {
611                tracing::warn!(
612                    "Service name not found in resource attributes, falling back to default: {}",
613                    default
614                );
615                default.to_string()
616            })
617    }
618
619    /// Filter and extract trace start time attribute from span attributes
620    /// This is a global scouter attribute that indicates the trace start time and is set across all spans
621    pub fn get_trace_start_time_attribute(
622        attributes: &Vec<Attribute>,
623        start_time: &DateTime<Utc>,
624    ) -> DateTime<Utc> {
625        for attr in attributes {
626            if attr.key == TRACE_START_TIME_KEY {
627                if let Value::String(s) = &attr.value {
628                    if let Ok(dt) = s.parse::<chrono::DateTime<chrono::Utc>>() {
629                        return dt;
630                    }
631                }
632            }
633        }
634
635        tracing::warn!(
636            "Trace start time attribute not found or invalid, falling back to span start_time"
637        );
638        *start_time
639    }
640
641    pub fn convert_to_baggage_records(
642        trace_id: &str,
643        attributes: &Vec<Attribute>,
644        scope_name: &str,
645    ) -> Vec<TraceBaggageRecord> {
646        let baggage_kvs: Vec<(String, String)> = attributes
647            .iter()
648            .filter_map(|attr| {
649                // Only process attributes with baggage prefix
650                if attr.key.starts_with(BAGGAGE_PREFIX) {
651                    let clean_key = attr
652                        .key
653                        .strip_prefix(format!("{}.", BAGGAGE_PREFIX).as_str())
654                        .map(|stripped| stripped.trim())
655                        .unwrap_or(&attr.key)
656                        .to_string();
657
658                    // Handle different value types from OpenTelemetry KeyValue
659                    let value_string = match &attr.value {
660                        Value::String(s) => s.clone(),
661                        Value::Number(n) => n.to_string(),
662                        Value::Bool(b) => b.to_string(),
663                        Value::Null => "null".to_string(),
664                        Value::Array(_) | Value::Object(_) => {
665                            // For complex types, use compact JSON representation
666                            serde_json::to_string(&attr.value)
667                                .unwrap_or_else(|_| format!("{:?}", attr.value))
668                        }
669                    };
670
671                    Some((clean_key, value_string))
672                } else {
673                    None
674                }
675            })
676            .collect();
677
678        baggage_kvs
679            .into_iter()
680            .map(|(key, value)| TraceBaggageRecord {
681                created_at: Self::get_trace_start_time_attribute(attributes, &Utc::now()),
682                trace_id: trace_id.to_string(),
683                scope: scope_name.to_string(),
684                key,
685                value,
686            })
687            .collect()
688    }
689
690    /// Convert to TraceRecord
691    #[allow(clippy::too_many_arguments)]
692    pub fn convert_to_span_record(
693        trace_id: &str,
694        span_id: &str,
695        span: &Span,
696        attributes: &Vec<Attribute>,
697        scope_name: &str,
698        start_time: DateTime<Utc>,
699        end_time: DateTime<Utc>,
700        duration_ms: i64,
701        service_name: String,
702        resource_attributes: &[Attribute],
703    ) -> Result<TraceSpanRecord, RecordError> {
704        // get parent span id (can be empty)
705        let parent_span_id = if !span.parent_span_id.is_empty() {
706            Some(hex::encode(&span.parent_span_id))
707        } else {
708            None
709        };
710
711        let (input, output) = Self::extract_input_output(attributes);
712
713        Ok(TraceSpanRecord {
714            created_at: start_time,
715            trace_id: trace_id.to_string(),
716            span_id: span_id.to_string(),
717            parent_span_id,
718            start_time,
719            end_time,
720            duration_ms,
721            service_name,
722            scope: scope_name.to_string(),
723            span_name: span.name.clone(),
724            span_kind: Self::span_kind_to_string(span.kind),
725            status_code: span.status.as_ref().map(|s| s.code).unwrap_or_else(|| 0),
726            status_message: span
727                .status
728                .as_ref()
729                .map(|s| s.message.clone())
730                .unwrap_or_default(),
731            attributes: attributes.to_owned(),
732            events: Self::events_to_json_array(&span.events)?,
733            links: Self::links_to_json_array(&span.links)?,
734            label: None,
735            input,
736            output,
737            resource_attributes: resource_attributes.to_owned(),
738        })
739    }
740
741    pub fn to_records(self) -> Result<TraceRecords, RecordError> {
742        let resource_spans = self.request.resource_spans;
743
744        // Pre-calculate capacity to avoid reallocations
745        let estimated_capacity: usize = resource_spans
746            .iter()
747            .map(|rs| {
748                rs.scope_spans
749                    .iter()
750                    .map(|ss| ss.spans.len())
751                    .sum::<usize>()
752            })
753            .sum();
754
755        let mut span_records: Vec<TraceSpanRecord> = Vec::with_capacity(estimated_capacity);
756        let mut baggage_records: Vec<TraceBaggageRecord> = Vec::new();
757        let mut tags: HashSet<TagRecord> = HashSet::new();
758
759        for resource_span in resource_spans {
760            // process metadata only once per resource span
761            let service_name =
762                Self::get_service_name_from_resource(&resource_span.resource, "unknown");
763            let scope = Self::get_scope_from_resource(&resource_span.resource, "unknown");
764            let resource_attributes = Attribute::from_resources(&resource_span.resource);
765
766            for scope_span in &resource_span.scope_spans {
767                for span in &scope_span.spans {
768                    // base attributes
769                    let trace_id = hex::encode(&span.trace_id);
770                    let span_id = hex::encode(&span.span_id);
771                    let service_name = service_name.clone();
772
773                    let (start_time, end_time, duration_ms) =
774                        Self::extract_time(span.start_time_unix_nano, span.end_time_unix_nano);
775
776                    let (cleaned_attributes, span_baggage, span_tags) =
777                        Self::process_attributes(&trace_id, &span.attributes, &scope, start_time)?;
778
779                    // Add to collections
780                    baggage_records.extend(span_baggage);
781                    tags.extend(span_tags);
782
783                    // SpanRecord for insert
784                    span_records.push(Self::convert_to_span_record(
785                        &trace_id,
786                        &span_id,
787                        span,
788                        &cleaned_attributes,
789                        &scope,
790                        start_time,
791                        end_time,
792                        duration_ms,
793                        service_name,
794                        &resource_attributes,
795                    )?);
796                }
797            }
798        }
799
800        let tag_records: Vec<TagRecord> = tags.into_iter().collect();
801        Ok((span_records, baggage_records, tag_records))
802    }
803}
804
/// A key with a JSON value, used for span, event, link and resource attributes.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
pub struct Attribute {
    /// Attribute key.
    #[pyo3(get)]
    pub key: String,
    /// JSON value; reaches Python through the `get_value` getter below.
    pub value: Value,
}

#[pymethods]
impl Attribute {
    /// Python getter for `value`: converts the JSON value with
    /// `json_to_pyobject_value` and propagates its error.
    #[getter]
    pub fn get_value<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyAny>, RecordError> {
        Ok(json_to_pyobject_value(py, &self.value)?.bind(py).clone())
    }

    /// Python `__str__`: returns the string built by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
824
825impl Attribute {
826    pub fn from_otel_value(key: String, value: &AnyValue) -> Self {
827        Attribute {
828            key,
829            value: otel_value_to_serde_value(value),
830        }
831    }
832
833    fn from_resources(
834        resource: &Option<opentelemetry_proto::tonic::resource::v1::Resource>,
835    ) -> Vec<Attribute> {
836        match resource {
837            Some(res) => res
838                .attributes
839                .iter()
840                .map(|kv| Attribute::from_otel_value(kv.key.clone(), kv.value.as_ref().unwrap()))
841                .collect(),
842            None => vec![],
843        }
844    }
845}
846
/// An event recorded on a span, as produced by `events_to_json_array`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct SpanEvent {
    /// Event time, converted from the OTLP event's `time_unix_nano`.
    #[pyo3(get)]
    pub timestamp: chrono::DateTime<Utc>,
    /// Event name.
    #[pyo3(get)]
    pub name: String,
    /// Attributes attached to the event.
    #[pyo3(get)]
    pub attributes: Vec<Attribute>,
    /// Copied from the OTLP event's `dropped_attributes_count`.
    #[pyo3(get)]
    pub dropped_attributes_count: u32,
}

#[pymethods]
impl SpanEvent {
    /// Python `__str__`: returns the string built by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
866
/// A link from one span to another, as produced by `links_to_json_array`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct SpanLink {
    /// Hex-encoded trace id of the linked span.
    #[pyo3(get)]
    pub trace_id: String,
    /// Hex-encoded span id of the linked span.
    #[pyo3(get)]
    pub span_id: String,
    /// Copied from the OTLP link's `trace_state`.
    #[pyo3(get)]
    pub trace_state: String,
    /// Attributes attached to the link.
    #[pyo3(get)]
    pub attributes: Vec<Attribute>,
    /// Copied from the OTLP link's `dropped_attributes_count`.
    #[pyo3(get)]
    pub dropped_attributes_count: u32,
}

#[pymethods]
impl SpanLink {
    /// Python `__str__`: returns the string built by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
888
/// A string key/value tag stored on a `TraceRecord`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct Tag {
    /// Tag key; `TraceRecord::merge` de-duplicates tags by this field.
    #[pyo3(get)]
    pub key: String,
    /// Tag value.
    #[pyo3(get)]
    pub value: String,
}

#[pymethods]
impl Tag {
    /// Python `__str__`: returns the string built by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
904
/// A tag bound to an entity.
///
/// Derives `Eq` and `Hash` so `to_records` can de-duplicate tags in a
/// `HashSet`. With the `server` feature enabled the struct also derives
/// `sqlx::FromRow`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
#[pyclass]
#[cfg_attr(feature = "server", derive(sqlx::FromRow))]
pub struct TagRecord {
    /// Kind of entity the tag belongs to; `process_attributes` sets `"trace"`.
    #[pyo3(get)]
    pub entity_type: String,
    /// Identifier of the entity; `process_attributes` sets the trace id.
    #[pyo3(get)]
    pub entity_id: String,
    /// Tag key with the tag prefix removed.
    #[pyo3(get)]
    pub key: String,
    /// Tag value rendered as a string.
    #[pyo3(get)]
    pub value: String,
}

#[pymethods]
impl TagRecord {
    /// Python `__str__`: returns the string built by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
924}