1pub mod sql;
2
3use crate::error::RecordError;
4use crate::otel_value_to_serde_value;
5use crate::PyHelperFuncs;
6use crate::{json_to_pyobject, json_to_pyobject_value};
7use chrono::DateTime;
8use chrono::Utc;
9use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
10use opentelemetry_proto::tonic::common::v1::AnyValue;
11use opentelemetry_proto::tonic::common::v1::KeyValue;
12use opentelemetry_proto::tonic::trace::v1::span::Event;
13use opentelemetry_proto::tonic::trace::v1::span::Link;
14use opentelemetry_proto::tonic::trace::v1::span::SpanKind;
15use opentelemetry_proto::tonic::trace::v1::Span;
16use pyo3::prelude::*;
17use pyo3::types::PyDict;
18use serde::{Deserialize, Serialize};
19use serde_json::Value;
20use std::cmp::{max, min};
21use std::collections::HashMap;
22use std::collections::HashSet;
23
// ---- Function metadata attribute keys --------------------------------------

/// Attribute key `function.name`.
pub const FUNCTION_NAME: &str = "function.name";
/// Attribute key `function.qualname`.
pub const FUNCTION_QUALNAME: &str = "function.qualname";
/// Attribute key `function.module`.
pub const FUNCTION_MODULE: &str = "function.module";
/// Attribute key `function.type`.
pub const FUNCTION_TYPE: &str = "function.type";
/// Attribute key `function.streaming`.
pub const FUNCTION_STREAMING: &str = "function.streaming";

// ---- Scouter tracing attribute keys ----------------------------------------

/// Span attribute whose string value is parsed as JSON into a span's `input`.
pub const SCOUTER_TRACING_INPUT: &str = "scouter.tracing.input";
/// Span attribute whose string value is parsed as JSON into a span's `output`.
pub const SCOUTER_TRACING_OUTPUT: &str = "scouter.tracing.output";
/// Attribute key `scouter.tracing.label`.
pub const SCOUTER_TRACING_LABEL: &str = "scouter.tracing.label";
/// Prefix (without the trailing dot) of tag attributes.
pub const SCOUTER_TAG_PREFIX: &str = "scouter.tracing.tag";
/// Attribute carrying an RFC 3339 timestamp that overrides the fallback start time.
pub const TRACE_START_TIME_KEY: &str = "scouter.tracing.start_time";

// ---- Resource / propagation keys -------------------------------------------

/// Resource attribute holding the service name.
pub const SERVICE_NAME: &str = "service.name";
/// Resource attribute holding the scope name.
pub const SCOUTER_SCOPE: &str = "scouter.scope";
/// Prefix (without the trailing dot) of baggage attributes.
pub const BAGGAGE_PREFIX: &str = "baggage";
/// Default scope string: `"scouter.tracer."` followed by this crate's version, which is
/// resolved at compile time from Cargo's `CARGO_PKG_VERSION` environment variable.
pub const SCOUTER_SCOPE_DEFAULT: &str = concat!("scouter.tracer.", env!("CARGO_PKG_VERSION"));
/// Attribute key `span.error`.
pub const SPAN_ERROR: &str = "span.error";
/// Attribute key `exception.traceback`.
pub const EXCEPTION_TRACEBACK: &str = "exception.traceback";
/// Key `scouter.queue.record`.
pub const SCOUTER_QUEUE_RECORD: &str = "scouter.queue.record";
/// Key `scouter.queue.event`.
pub const SCOUTER_QUEUE_EVENT: &str = "scouter.queue.event";

/// Dotted prefix that marks an attribute as baggage.
pub const BAGGAGE_PATTERN: &str = "baggage.";
/// Dotted prefix of a tag that was propagated through baggage
/// (the baggage prefix followed by the tag prefix).
pub const BAGGAGE_TAG_PATTERN: &str = "baggage.scouter.tracing.tag.";
/// Dotted prefix that marks an attribute as a tag.
pub const TAG_PATTERN: &str = "scouter.tracing.tag.";
47
/// Output of `TraceRecordExt::process_attributes`: the cleaned span attributes, the baggage
/// records extracted from baggage-prefixed keys, and the tag records extracted from tag keys.
type SpanAttributes = (Vec<Attribute>, Vec<TraceBaggageRecord>, Vec<TagRecord>);
49
50#[derive(Clone, Debug, Serialize, Deserialize, Default)]
51#[pyclass]
52pub struct TraceRecord {
53 #[pyo3(get)]
54 pub created_at: DateTime<Utc>,
55 #[pyo3(get)]
56 pub trace_id: String,
57 #[pyo3(get)]
58 pub service_name: String,
59 #[pyo3(get)]
60 pub scope: String,
61 #[pyo3(get)]
62 pub trace_state: String,
63 #[pyo3(get)]
64 pub start_time: chrono::DateTime<Utc>,
65 #[pyo3(get)]
66 pub end_time: chrono::DateTime<Utc>,
67 #[pyo3(get)]
68 pub duration_ms: i64,
69 #[pyo3(get)]
70 pub status_code: i32,
71 #[pyo3(get)]
72 pub status_message: String,
73 #[pyo3(get)]
74 pub root_span_id: String,
75 #[pyo3(get)]
76 pub span_count: i32,
77 #[pyo3(get)]
78 pub tags: Vec<Tag>,
79 #[pyo3(get)]
80 pub process_attributes: Vec<Attribute>,
81}
82
83#[pymethods]
84impl TraceRecord {
85 pub fn __str__(&self) -> String {
86 PyHelperFuncs::__str__(self)
87 }
88}
89
90impl TraceRecord {
91 pub fn merge(&mut self, other: &TraceRecord) {
94 self.start_time = min(self.start_time, other.start_time);
96 self.end_time = max(self.end_time, other.end_time);
97
98 if self.end_time > self.start_time {
100 self.duration_ms = (self.end_time - self.start_time).num_milliseconds();
101 } else {
102 self.duration_ms = 0;
104 }
105
106 if self.status_code != 2 && other.status_code == 2 {
107 self.status_code = 2;
108 }
109
110 self.span_count += other.span_count;
111
112 let mut existing_tag_keys: std::collections::HashSet<String> =
113 self.tags.iter().map(|t| t.key.clone()).collect();
114
115 for tag in &other.tags {
116 if !existing_tag_keys.contains(&tag.key) {
117 self.tags.push(tag.clone());
118 existing_tag_keys.insert(tag.key.clone());
119 }
120 }
121
122 let mut existing_attr_keys: std::collections::HashSet<String> = self
124 .process_attributes
125 .iter()
126 .map(|a| a.key.clone())
127 .collect();
128
129 for attr in &other.process_attributes {
130 if !existing_attr_keys.contains(&attr.key) {
131 self.process_attributes.push(attr.clone());
132 existing_attr_keys.insert(attr.key.clone());
133 }
134 }
135 }
136}
137
138#[derive(Hash, Eq, PartialEq, Clone)]
139struct TraceKey {
140 created_at: chrono::DateTime<chrono::Utc>, trace_id: String,
142 scope: String,
143}
144
145pub fn deduplicate_and_merge_traces(raw_traces: Vec<TraceRecord>) -> Vec<TraceRecord> {
146 let mut merged_traces: HashMap<TraceKey, TraceRecord> = HashMap::new();
147
148 for trace in raw_traces {
149 let key = TraceKey {
150 created_at: trace.created_at,
151 trace_id: trace.trace_id.clone(),
152 scope: trace.scope.clone(),
153 };
154
155 merged_traces
156 .entry(key)
157 .and_modify(|existing_trace| {
158 existing_trace.merge(&trace);
159 })
160 .or_insert(trace);
161 }
162
163 merged_traces.into_values().collect()
164}
165
166#[derive(Clone, Debug, Serialize, Deserialize, Default)]
167#[pyclass]
168pub struct TraceSpanRecord {
169 #[pyo3(get)]
170 pub created_at: chrono::DateTime<Utc>,
171 #[pyo3(get)]
172 pub span_id: String,
173 #[pyo3(get)]
174 pub trace_id: String,
175 #[pyo3(get)]
176 pub parent_span_id: Option<String>,
177 #[pyo3(get)]
178 pub scope: String,
179 #[pyo3(get)]
180 pub span_name: String,
181 #[pyo3(get)]
182 pub span_kind: String,
183 #[pyo3(get)]
184 pub start_time: chrono::DateTime<Utc>,
185 #[pyo3(get)]
186 pub end_time: chrono::DateTime<Utc>,
187 #[pyo3(get)]
188 pub duration_ms: i64,
189 #[pyo3(get)]
190 pub status_code: i32,
191 #[pyo3(get)]
192 pub status_message: String,
193 #[pyo3(get)]
194 pub attributes: Vec<Attribute>,
195 #[pyo3(get)]
196 pub events: Vec<SpanEvent>,
197 #[pyo3(get)]
198 pub links: Vec<SpanLink>,
199 #[pyo3(get)]
200 pub label: Option<String>,
201 pub input: Value,
202 pub output: Value,
203 #[pyo3(get)]
204 pub service_name: String,
205 #[pyo3(get)]
206 pub resource_attributes: Vec<Attribute>,
207}
208
209#[pymethods]
210impl TraceSpanRecord {
211 #[getter]
212 pub fn get_input<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyDict>, RecordError> {
213 let dict = PyDict::new(py);
214 match &self.input {
215 Value::Null => {}
216 _ => {
217 json_to_pyobject(py, &self.input, &dict)?;
218 }
219 }
220 Ok(dict)
221 }
222
223 #[getter]
224 pub fn get_output<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyDict>, RecordError> {
225 let dict = PyDict::new(py);
226 match &self.output {
227 Value::Null => {}
228 _ => {
229 json_to_pyobject(py, &self.output, &dict)?;
230 }
231 }
232 Ok(dict)
233 }
234
235 pub fn __str__(&self) -> String {
236 PyHelperFuncs::__str__(self)
238 }
239}
240
241#[derive(Clone, Debug, Serialize, Deserialize, Default)]
242#[pyclass]
243#[cfg_attr(feature = "server", derive(sqlx::FromRow))]
244pub struct TraceBaggageRecord {
245 #[pyo3(get)]
246 pub created_at: DateTime<Utc>,
247 #[pyo3(get)]
248 pub trace_id: String,
249 #[pyo3(get)]
250 pub scope: String,
251 #[pyo3(get)]
252 pub key: String,
253 #[pyo3(get)]
254 pub value: String,
255}
256
257#[pymethods]
258impl TraceBaggageRecord {
259 pub fn __str__(&self) -> String {
260 PyHelperFuncs::__str__(self)
261 }
262}
263
264pub type TraceRecords = (
265 Vec<TraceSpanRecord>,
266 Vec<TraceBaggageRecord>,
267 Vec<TagRecord>,
268);
269
270pub trait TraceRecordExt {
271 fn keyvalue_to_json_array<T: Serialize>(attributes: &Vec<T>) -> Result<Value, RecordError> {
272 Ok(serde_json::to_value(attributes).unwrap_or(Value::Array(vec![])))
273 }
274
275 fn process_attributes(
276 trace_id: &str,
277 span_attributes: &[KeyValue],
278 scope: &str,
279 created_at: DateTime<Utc>,
280 ) -> Result<SpanAttributes, RecordError> {
281 let mut cleaned_attributes = Vec::with_capacity(span_attributes.len());
282 let mut baggage_records = Vec::new();
283 let mut tags = Vec::new();
284
285 let trace_id_owned = trace_id.to_string();
286 let scope_owned = scope.to_string();
287 let entity_type = "trace".to_string();
288
289 for kv in span_attributes {
290 let key = &kv.key;
291
292 if let Some(tag_key) = key.strip_prefix(BAGGAGE_TAG_PATTERN) {
294 if !tag_key.is_empty() {
295 let value = match &kv.value {
296 Some(v) => Self::otel_value_to_string(v),
297 None => "null".to_string(),
298 };
299
300 tags.push(TagRecord {
302 entity_type: entity_type.clone(),
303 entity_id: trace_id_owned.clone(),
304 key: tag_key.to_string(),
305 value: value.clone(),
306 });
307
308 cleaned_attributes.push(Attribute {
310 key: tag_key.to_string(),
311 value: Value::String(value.clone()),
312 });
313
314 baggage_records.push(TraceBaggageRecord {
316 created_at,
317 trace_id: trace_id_owned.clone(),
318 scope: scope_owned.clone(),
319 key: format!("{}.{}", SCOUTER_TAG_PREFIX, tag_key),
320 value,
321 });
322 } else {
323 tracing::warn!(
324 attribute_key = %key,
325 "Skipping baggage tag with empty key after prefix removal"
326 );
327 }
328 }
329 else if let Some(tag_key) = key.strip_prefix(TAG_PATTERN) {
331 if !tag_key.is_empty() {
332 let value = match &kv.value {
333 Some(v) => Self::otel_value_to_string(v),
334 None => "null".to_string(),
335 };
336
337 tags.push(TagRecord {
338 entity_type: entity_type.clone(),
339 entity_id: trace_id_owned.clone(),
340 key: tag_key.to_string(),
341 value: value.clone(),
342 });
343
344 cleaned_attributes.push(Attribute {
345 key: tag_key.to_string(),
346 value: Value::String(value),
347 });
348 } else {
349 tracing::warn!(
350 attribute_key = %key,
351 "Skipping tag with empty key after prefix removal"
352 );
353 }
354 }
355 else if key.starts_with(BAGGAGE_PATTERN) {
357 let clean_key = key
358 .strip_prefix(BAGGAGE_PATTERN)
359 .unwrap_or(key)
360 .trim()
361 .to_string();
362
363 let value_string = match &kv.value {
364 Some(v) => Self::otel_value_to_string(v),
365 None => "null".to_string(),
366 };
367
368 baggage_records.push(TraceBaggageRecord {
369 created_at,
370 trace_id: trace_id_owned.clone(),
371 scope: scope_owned.clone(),
372 key: clean_key,
373 value: value_string,
374 });
375 }
376 else {
378 let value = match &kv.value {
379 Some(v) => otel_value_to_serde_value(v),
380 None => Value::Null,
381 };
382
383 cleaned_attributes.push(Attribute {
384 key: key.clone(),
385 value,
386 });
387 }
388 }
389
390 Ok((cleaned_attributes, baggage_records, tags))
391 }
392
393 fn otel_value_to_string(value: &AnyValue) -> String {
394 match &value.value {
395 Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) => {
396 s.clone()
397 }
398 Some(opentelemetry_proto::tonic::common::v1::any_value::Value::IntValue(i)) => {
399 i.to_string()
400 }
401 Some(opentelemetry_proto::tonic::common::v1::any_value::Value::DoubleValue(d)) => {
402 d.to_string()
403 }
404 Some(opentelemetry_proto::tonic::common::v1::any_value::Value::BoolValue(b)) => {
405 b.to_string()
406 }
407 Some(opentelemetry_proto::tonic::common::v1::any_value::Value::ArrayValue(_))
408 | Some(opentelemetry_proto::tonic::common::v1::any_value::Value::KvlistValue(_)) => {
409 let serde_val = otel_value_to_serde_value(value);
410 serde_json::to_string(&serde_val).unwrap_or_else(|_| format!("{:?}", value))
411 }
412 _ => "null".to_string(),
413 }
414 }
415
416 fn attributes_to_json_array(attributes: &[KeyValue]) -> Result<Vec<Attribute>, RecordError> {
417 attributes
418 .iter()
419 .map(|kv| {
420 let value = match &kv.value {
421 Some(v) => otel_value_to_serde_value(v),
422 None => Value::Null,
423 };
424
425 Ok(Attribute {
426 key: kv.key.clone(),
427 value,
428 })
429 })
430 .collect()
431 }
432
433 fn events_to_json_array(attributes: &[Event]) -> Result<Vec<SpanEvent>, RecordError> {
434 attributes
435 .iter()
436 .map(|kv| {
437 let attributes = Self::attributes_to_json_array(&kv.attributes)?;
438 Ok(SpanEvent {
439 name: kv.name.clone(),
440 timestamp: DateTime::<Utc>::from_timestamp_nanos(kv.time_unix_nano as i64),
441 attributes,
442 dropped_attributes_count: kv.dropped_attributes_count,
443 })
444 })
445 .collect()
446 }
447
448 fn links_to_json_array(attributes: &[Link]) -> Result<Vec<SpanLink>, RecordError> {
449 attributes
450 .iter()
451 .map(|kv| {
452 let attributes = Self::attributes_to_json_array(&kv.attributes)?;
453 Ok(SpanLink {
454 trace_id: hex::encode(&kv.trace_id),
455 span_id: hex::encode(&kv.span_id),
456 trace_state: kv.trace_state.clone(),
457 attributes,
458 dropped_attributes_count: kv.dropped_attributes_count,
459 })
460 })
461 .collect()
462 }
463}
464
465#[derive(Clone, Debug, Serialize, Deserialize, Default)]
466pub struct TraceServerRecord {
467 pub request: ExportTraceServiceRequest,
468}
469
470impl TraceRecordExt for TraceServerRecord {}
471
472impl TraceServerRecord {
473 fn extract_time(start_time: u64, end_time: u64) -> (DateTime<Utc>, DateTime<Utc>, i64) {
482 let start_dt = Self::safe_timestamp_conversion(start_time);
484 let end_dt = Self::safe_timestamp_conversion(end_time);
485
486 let duration_ms = if end_time >= start_time {
488 let duration_nanos = end_time.saturating_sub(start_time);
489 (duration_nanos / 1_000_000).min(i64::MAX as u64) as i64
490 } else {
491 tracing::warn!(
492 start_time = start_time,
493 end_time = end_time,
494 "Invalid timestamp order detected in trace span"
495 );
496 0
497 };
498
499 (start_dt, end_dt, duration_ms)
500 }
501
502 fn safe_timestamp_conversion(timestamp_nanos: u64) -> DateTime<Utc> {
504 if timestamp_nanos <= i64::MAX as u64 {
505 DateTime::from_timestamp_nanos(timestamp_nanos as i64)
506 } else {
507 let seconds = timestamp_nanos / 1_000_000_000;
508 let nanoseconds = (timestamp_nanos % 1_000_000_000) as u32;
509
510 DateTime::from_timestamp(seconds as i64, nanoseconds).unwrap_or_else(|| {
511 tracing::warn!(
512 timestamp = timestamp_nanos,
513 seconds = seconds,
514 nanoseconds = nanoseconds,
515 "Failed to convert large timestamp, falling back to current time"
516 );
517 Utc::now()
518 })
519 }
520 }
521
522 fn span_kind_to_string(kind: i32) -> String {
524 SpanKind::try_from(kind)
525 .map(|sk| {
526 sk.as_str_name()
527 .strip_prefix("SPAN_KIND_")
528 .unwrap_or(sk.as_str_name())
529 })
530 .unwrap_or("UNSPECIFIED")
531 .to_string()
532 }
533
534 fn extract_input_output(attributes: &[Attribute]) -> (Value, Value) {
535 let mut input = Value::Null;
536 let mut output = Value::Null;
537
538 for attr in attributes {
539 if attr.key == SCOUTER_TRACING_INPUT {
540 if let Value::String(s) = &attr.value {
541 input = serde_json::from_str(s).unwrap_or_else(|e| {
542 tracing::warn!(
543 key = SCOUTER_TRACING_INPUT,
544 error = %e,
545 value = s,
546 "Failed to parse input attribute as JSON, falling back to string value."
547 );
548 Value::String(s.clone()) });
550 }
551 } else if attr.key == SCOUTER_TRACING_OUTPUT {
552 if let Value::String(s) = &attr.value {
553 output = serde_json::from_str(s)
554 .unwrap_or_else(|e| {
555 tracing::warn!(
556 key = SCOUTER_TRACING_OUTPUT,
557 error = %e,
558 value = s,
559 "Failed to parse output attribute as JSON, falling back to string value."
560 );
561 Value::String(s.clone()) });
563 }
564 }
565 }
566 (input, output)
567 }
568
569 fn get_scope_from_resource(
570 resource: &Option<opentelemetry_proto::tonic::resource::v1::Resource>,
571 default: &str,
572 ) -> String {
573 resource
574 .as_ref()
575 .and_then(|r| r.attributes.iter().find(|attr| attr.key == SCOUTER_SCOPE))
576 .and_then(|attr| {
577 attr.value.as_ref().and_then(|v| {
578 if let Some(
579 opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s),
580 ) = &v.value
581 {
582 Some(s.clone())
583 } else {
584 None
585 }
586 })
587 })
588 .unwrap_or_else(|| default.to_string())
589 }
590
591 fn get_service_name_from_resource(
592 resource: &Option<opentelemetry_proto::tonic::resource::v1::Resource>,
593 default: &str,
594 ) -> String {
595 resource
596 .as_ref()
597 .and_then(|r| r.attributes.iter().find(|attr| attr.key == SERVICE_NAME))
598 .and_then(|attr| {
599 attr.value.as_ref().and_then(|v| {
600 if let Some(
601 opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s),
602 ) = &v.value
603 {
604 Some(s.clone())
605 } else {
606 None
607 }
608 })
609 })
610 .unwrap_or_else(|| {
611 tracing::warn!(
612 "Service name not found in resource attributes, falling back to default: {}",
613 default
614 );
615 default.to_string()
616 })
617 }
618
619 pub fn get_trace_start_time_attribute(
622 attributes: &Vec<Attribute>,
623 start_time: &DateTime<Utc>,
624 ) -> DateTime<Utc> {
625 for attr in attributes {
626 if attr.key == TRACE_START_TIME_KEY {
627 if let Value::String(s) = &attr.value {
628 if let Ok(dt) = s.parse::<chrono::DateTime<chrono::Utc>>() {
629 return dt;
630 }
631 }
632 }
633 }
634
635 tracing::warn!(
636 "Trace start time attribute not found or invalid, falling back to span start_time"
637 );
638 *start_time
639 }
640
641 pub fn convert_to_baggage_records(
642 trace_id: &str,
643 attributes: &Vec<Attribute>,
644 scope_name: &str,
645 ) -> Vec<TraceBaggageRecord> {
646 let baggage_kvs: Vec<(String, String)> = attributes
647 .iter()
648 .filter_map(|attr| {
649 if attr.key.starts_with(BAGGAGE_PREFIX) {
651 let clean_key = attr
652 .key
653 .strip_prefix(format!("{}.", BAGGAGE_PREFIX).as_str())
654 .map(|stripped| stripped.trim())
655 .unwrap_or(&attr.key)
656 .to_string();
657
658 let value_string = match &attr.value {
660 Value::String(s) => s.clone(),
661 Value::Number(n) => n.to_string(),
662 Value::Bool(b) => b.to_string(),
663 Value::Null => "null".to_string(),
664 Value::Array(_) | Value::Object(_) => {
665 serde_json::to_string(&attr.value)
667 .unwrap_or_else(|_| format!("{:?}", attr.value))
668 }
669 };
670
671 Some((clean_key, value_string))
672 } else {
673 None
674 }
675 })
676 .collect();
677
678 baggage_kvs
679 .into_iter()
680 .map(|(key, value)| TraceBaggageRecord {
681 created_at: Self::get_trace_start_time_attribute(attributes, &Utc::now()),
682 trace_id: trace_id.to_string(),
683 scope: scope_name.to_string(),
684 key,
685 value,
686 })
687 .collect()
688 }
689
690 #[allow(clippy::too_many_arguments)]
692 pub fn convert_to_span_record(
693 trace_id: &str,
694 span_id: &str,
695 span: &Span,
696 attributes: &Vec<Attribute>,
697 scope_name: &str,
698 start_time: DateTime<Utc>,
699 end_time: DateTime<Utc>,
700 duration_ms: i64,
701 service_name: String,
702 resource_attributes: &[Attribute],
703 ) -> Result<TraceSpanRecord, RecordError> {
704 let parent_span_id = if !span.parent_span_id.is_empty() {
706 Some(hex::encode(&span.parent_span_id))
707 } else {
708 None
709 };
710
711 let (input, output) = Self::extract_input_output(attributes);
712
713 Ok(TraceSpanRecord {
714 created_at: start_time,
715 trace_id: trace_id.to_string(),
716 span_id: span_id.to_string(),
717 parent_span_id,
718 start_time,
719 end_time,
720 duration_ms,
721 service_name,
722 scope: scope_name.to_string(),
723 span_name: span.name.clone(),
724 span_kind: Self::span_kind_to_string(span.kind),
725 status_code: span.status.as_ref().map(|s| s.code).unwrap_or_else(|| 0),
726 status_message: span
727 .status
728 .as_ref()
729 .map(|s| s.message.clone())
730 .unwrap_or_default(),
731 attributes: attributes.to_owned(),
732 events: Self::events_to_json_array(&span.events)?,
733 links: Self::links_to_json_array(&span.links)?,
734 label: None,
735 input,
736 output,
737 resource_attributes: resource_attributes.to_owned(),
738 })
739 }
740
741 pub fn to_records(self) -> Result<TraceRecords, RecordError> {
742 let resource_spans = self.request.resource_spans;
743
744 let estimated_capacity: usize = resource_spans
746 .iter()
747 .map(|rs| {
748 rs.scope_spans
749 .iter()
750 .map(|ss| ss.spans.len())
751 .sum::<usize>()
752 })
753 .sum();
754
755 let mut span_records: Vec<TraceSpanRecord> = Vec::with_capacity(estimated_capacity);
756 let mut baggage_records: Vec<TraceBaggageRecord> = Vec::new();
757 let mut tags: HashSet<TagRecord> = HashSet::new();
758
759 for resource_span in resource_spans {
760 let service_name =
762 Self::get_service_name_from_resource(&resource_span.resource, "unknown");
763 let scope = Self::get_scope_from_resource(&resource_span.resource, "unknown");
764 let resource_attributes = Attribute::from_resources(&resource_span.resource);
765
766 for scope_span in &resource_span.scope_spans {
767 for span in &scope_span.spans {
768 let trace_id = hex::encode(&span.trace_id);
770 let span_id = hex::encode(&span.span_id);
771 let service_name = service_name.clone();
772
773 let (start_time, end_time, duration_ms) =
774 Self::extract_time(span.start_time_unix_nano, span.end_time_unix_nano);
775
776 let (cleaned_attributes, span_baggage, span_tags) =
777 Self::process_attributes(&trace_id, &span.attributes, &scope, start_time)?;
778
779 baggage_records.extend(span_baggage);
781 tags.extend(span_tags);
782
783 span_records.push(Self::convert_to_span_record(
785 &trace_id,
786 &span_id,
787 span,
788 &cleaned_attributes,
789 &scope,
790 start_time,
791 end_time,
792 duration_ms,
793 service_name,
794 &resource_attributes,
795 )?);
796 }
797 }
798 }
799
800 let tag_records: Vec<TagRecord> = tags.into_iter().collect();
801 Ok((span_records, baggage_records, tag_records))
802 }
803}
804
805#[derive(Clone, Debug, Serialize, Deserialize, Default)]
806#[pyclass]
807pub struct Attribute {
808 #[pyo3(get)]
809 pub key: String,
810 pub value: Value,
811}
812
813#[pymethods]
814impl Attribute {
815 #[getter]
816 pub fn get_value<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyAny>, RecordError> {
817 Ok(json_to_pyobject_value(py, &self.value)?.bind(py).clone())
818 }
819
820 pub fn __str__(&self) -> String {
821 PyHelperFuncs::__str__(self)
822 }
823}
824
825impl Attribute {
826 pub fn from_otel_value(key: String, value: &AnyValue) -> Self {
827 Attribute {
828 key,
829 value: otel_value_to_serde_value(value),
830 }
831 }
832
833 fn from_resources(
834 resource: &Option<opentelemetry_proto::tonic::resource::v1::Resource>,
835 ) -> Vec<Attribute> {
836 match resource {
837 Some(res) => res
838 .attributes
839 .iter()
840 .map(|kv| Attribute::from_otel_value(kv.key.clone(), kv.value.as_ref().unwrap()))
841 .collect(),
842 None => vec![],
843 }
844 }
845}
846
847#[derive(Clone, Debug, Serialize, Deserialize)]
848#[pyclass]
849pub struct SpanEvent {
850 #[pyo3(get)]
851 pub timestamp: chrono::DateTime<Utc>,
852 #[pyo3(get)]
853 pub name: String,
854 #[pyo3(get)]
855 pub attributes: Vec<Attribute>,
856 #[pyo3(get)]
857 pub dropped_attributes_count: u32,
858}
859
860#[pymethods]
861impl SpanEvent {
862 pub fn __str__(&self) -> String {
863 PyHelperFuncs::__str__(self)
864 }
865}
866
867#[derive(Clone, Debug, Serialize, Deserialize)]
868#[pyclass]
869pub struct SpanLink {
870 #[pyo3(get)]
871 pub trace_id: String,
872 #[pyo3(get)]
873 pub span_id: String,
874 #[pyo3(get)]
875 pub trace_state: String,
876 #[pyo3(get)]
877 pub attributes: Vec<Attribute>,
878 #[pyo3(get)]
879 pub dropped_attributes_count: u32,
880}
881
882#[pymethods]
883impl SpanLink {
884 pub fn __str__(&self) -> String {
885 PyHelperFuncs::__str__(self)
886 }
887}
888
889#[derive(Clone, Debug, Serialize, Deserialize)]
890#[pyclass]
891pub struct Tag {
892 #[pyo3(get)]
893 pub key: String,
894 #[pyo3(get)]
895 pub value: String,
896}
897
898#[pymethods]
899impl Tag {
900 pub fn __str__(&self) -> String {
901 PyHelperFuncs::__str__(self)
902 }
903}
904
905#[derive(Clone, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
906#[pyclass]
907#[cfg_attr(feature = "server", derive(sqlx::FromRow))]
908pub struct TagRecord {
909 #[pyo3(get)]
910 pub entity_type: String,
911 #[pyo3(get)]
912 pub entity_id: String,
913 #[pyo3(get)]
914 pub key: String,
915 #[pyo3(get)]
916 pub value: String,
917}
918
919#[pymethods]
920impl TagRecord {
921 pub fn __str__(&self) -> String {
922 PyHelperFuncs::__str__(self)
923 }
924}