1pub mod sql;
2
3use crate::error::RecordError;
4use crate::otel_value_to_serde_value;
5use crate::PyHelperFuncs;
6use crate::{json_to_pyobject, json_to_pyobject_value};
7use chrono::DateTime;
8use chrono::Utc;
9use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
10use opentelemetry_proto::tonic::common::v1::AnyValue;
11use opentelemetry_proto::tonic::common::v1::KeyValue;
12use opentelemetry_proto::tonic::trace::v1::span::Event;
13use opentelemetry_proto::tonic::trace::v1::span::Link;
14use opentelemetry_proto::tonic::trace::v1::span::SpanKind;
15use pyo3::prelude::*;
16use pyo3::types::PyDict;
17use serde::{Deserialize, Serialize};
18use serde_json::Value;
19use std::collections::HashSet;
20use std::fmt;
21
// Attribute keys in the `function.*` namespace.
// NOTE(review): not read anywhere in the visible part of this file; presumably set by the
// tracing client — confirm against callers.
pub const FUNCTION_TYPE: &str = "function.type";
pub const FUNCTION_STREAMING: &str = "function.streaming";
pub const FUNCTION_NAME: &str = "function.name";
pub const FUNCTION_MODULE: &str = "function.module";
pub const FUNCTION_QUALNAME: &str = "function.qualname";
/// Attribute key whose string value is parsed as JSON to populate a span's `input`.
pub const SCOUTER_TRACING_INPUT: &str = "scouter.tracing.input";
/// Attribute key whose string value is parsed as JSON to populate a span's `output`.
pub const SCOUTER_TRACING_OUTPUT: &str = "scouter.tracing.output";
// NOTE(review): presumably the key for a span label; not read in the visible code.
pub const SCOUTER_TRACING_LABEL: &str = "scouter.tracing.label";
/// Resource attribute key looked up to resolve a span's service name.
pub const SERVICE_NAME: &str = "service.name";
/// Tag key prefix without the trailing dot; used to rebuild baggage keys for baggage tags.
pub const SCOUTER_TAG_PREFIX: &str = "scouter.tracing.tag";
/// Baggage key prefix without the trailing dot.
pub const BAGGAGE_PREFIX: &str = "baggage";
/// Attribute key expected to hold the trace start time as a parseable UTC datetime string.
pub const TRACE_START_TIME_KEY: &str = "scouter.tracing.start_time";
// NOTE(review): not read in the visible code — confirm usage.
pub const SCOUTER_SCOPE: &str = "scouter.scope";
/// `scouter.tracer.` followed by this crate's version (resolved at compile time).
pub const SCOUTER_SCOPE_DEFAULT: &str = concat!("scouter.tracer.", env!("CARGO_PKG_VERSION"));
// NOTE(review): the keys below are not read in the visible code — confirm usage.
pub const SPAN_ERROR: &str = "span.error";
pub const EXCEPTION_TRACEBACK: &str = "exception.traceback";
pub const SCOUTER_QUEUE_RECORD: &str = "scouter.queue.record";
pub const SCOUTER_QUEUE_EVENT: &str = "scouter.queue.event";
pub const SCOUTER_ENTITY: &str = "scouter.entity";

/// `baggage.` — key prefix marking a span attribute as baggage.
pub const BAGGAGE_PATTERN: &str = "baggage.";
/// `baggage.scouter.tracing.tag.` — key prefix marking a tag carried as baggage.
pub const BAGGAGE_TAG_PATTERN: &str = concat!("baggage", ".", "scouter.tracing.tag", ".");
/// `scouter.tracing.tag.` — key prefix marking a tag set directly on a span.
pub const TAG_PATTERN: &str = concat!("scouter.tracing.tag", ".");

/// Output of attribute processing: (cleaned attributes, baggage records, tag records).
type SpanAttributes = (Vec<Attribute>, Vec<TraceBaggageRecord>, Vec<TagRecord>);
48
/// Identifies an entity by uid, type and space.
///
/// NOTE(review): presumably the payload stored under the `scouter.entity` attribute key —
/// confirm against callers.
#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq, Eq, Hash)]
pub struct ScouterEntityAttribute {
    /// Unique identifier of the entity.
    pub uid: String,
    /// Entity type (raw identifier because `type` is a Rust keyword).
    pub r#type: String,
    /// Space the entity belongs to.
    pub space: String,
}
55
56#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)]
57pub struct TraceId([u8; 16]);
58
59impl TraceId {
60 pub fn from_hex(hex: &str) -> Result<Self, hex::FromHexError> {
61 let mut bytes = [0u8; 16];
62 hex::decode_to_slice(hex, &mut bytes)?;
63 Ok(Self(bytes))
64 }
65
66 pub fn from_bytes(bytes: [u8; 16]) -> Self {
67 Self(bytes)
68 }
69
70 pub fn from_slice(slice: &[u8]) -> Result<Self, RecordError> {
71 if slice.len() != 16 {
72 return Err(RecordError::SliceError(format!(
73 "Invalid trace_id length: expected 16 bytes, got {}",
74 slice.len()
75 )));
76 }
77 let mut bytes = [0u8; 16];
78 bytes.copy_from_slice(slice);
79 Ok(Self(bytes))
80 }
81
82 pub fn to_hex(&self) -> String {
83 hex::encode(self.0)
84 }
85
86 pub fn as_bytes(&self) -> &[u8; 16] {
87 &self.0
88 }
89
90 pub fn hex_to_bytes(hex: &str) -> Result<Vec<u8>, RecordError> {
91 let bytes = hex::decode(hex)?;
92 if bytes.len() == 16 {
94 Ok(bytes)
95 } else {
96 Err(RecordError::SliceError(format!(
97 "Invalid hex string length: expected 16 or 8 bytes, got {}",
98 bytes.len()
99 )))
100 }
101 }
102}
103
104impl fmt::Display for TraceId {
105 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106 write!(f, "{}", self.to_hex())
107 }
108}
109
110impl Serialize for TraceId {
111 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
112 where
113 S: serde::Serializer,
114 {
115 serializer.serialize_str(&self.to_hex())
116 }
117}
118
119impl<'de> Deserialize<'de> for TraceId {
120 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
121 where
122 D: serde::Deserializer<'de>,
123 {
124 let hex = String::deserialize(deserializer)?;
125 TraceId::from_hex(&hex).map_err(serde::de::Error::custom)
126 }
127}
128
#[cfg(feature = "server")]
impl<'r> sqlx::Decode<'r, sqlx::Postgres> for TraceId {
    /// Decodes a Postgres byte value into a `TraceId`, rejecting any length other than 16.
    fn decode(value: sqlx::postgres::PgValueRef<'r>) -> Result<Self, sqlx::error::BoxDynError> {
        let raw = <&[u8] as sqlx::Decode<sqlx::Postgres>>::decode(value)?;
        // The array conversion succeeds only when the slice holds exactly 16 bytes.
        match <[u8; 16]>::try_from(raw) {
            Ok(array) => Ok(TraceId(array)),
            Err(_) => Err("TraceId must be exactly 16 bytes".into()),
        }
    }
}
141
#[cfg(feature = "server")]
impl sqlx::Type<sqlx::Postgres> for TraceId {
    /// Reports the same Postgres type info as `Vec<u8>`.
    fn type_info() -> sqlx::postgres::PgTypeInfo {
        <Vec<u8> as sqlx::Type<sqlx::Postgres>>::type_info()
    }
}
148
#[cfg(feature = "server")]
impl sqlx::Encode<'_, sqlx::Postgres> for TraceId {
    /// Encodes the id by delegating to the `&[u8]` encoder with the raw 16 bytes.
    fn encode_by_ref(
        &self,
        buf: &mut sqlx::postgres::PgArgumentBuffer,
    ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
        let raw: &[u8] = self.0.as_slice();
        <&[u8] as sqlx::Encode<sqlx::Postgres>>::encode(raw, buf)
    }
}
158
159#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)]
160pub struct SpanId([u8; 8]);
161
162impl SpanId {
163 pub fn from_hex(hex: &str) -> Result<Self, hex::FromHexError> {
164 let mut bytes = [0u8; 8];
165 hex::decode_to_slice(hex, &mut bytes)?;
166 Ok(Self(bytes))
167 }
168
169 pub fn from_bytes(bytes: [u8; 8]) -> Self {
170 Self(bytes)
171 }
172
173 pub fn from_slice(slice: &[u8]) -> Result<Self, RecordError> {
174 if slice.len() != 8 {
175 return Err(RecordError::SliceError(format!(
176 "Invalid trace_id length: expected 8 bytes, got {}",
177 slice.len()
178 )));
179 }
180 let mut bytes = [0u8; 8];
181 bytes.copy_from_slice(slice);
182 Ok(Self(bytes))
183 }
184
185 pub fn to_hex(&self) -> String {
186 hex::encode(self.0)
187 }
188
189 pub fn as_bytes(&self) -> &[u8; 8] {
190 &self.0
191 }
192
193 pub fn hex_to_bytes(hex: &str) -> Result<Vec<u8>, RecordError> {
194 let bytes = hex::decode(hex)?;
195 if bytes.len() == 8 {
196 Ok(bytes)
197 } else {
198 Err(RecordError::SliceError(format!(
199 "Invalid hex string length: expected 16 or 8 bytes, got {}",
200 bytes.len()
201 )))
202 }
203 }
204}
205
206impl fmt::Display for SpanId {
207 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208 write!(f, "{}", self.to_hex())
209 }
210}
211
212impl Serialize for SpanId {
213 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
214 where
215 S: serde::Serializer,
216 {
217 serializer.serialize_str(&self.to_hex())
218 }
219}
220
221impl<'de> Deserialize<'de> for SpanId {
222 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
223 where
224 D: serde::Deserializer<'de>,
225 {
226 let hex = String::deserialize(deserializer)?;
227 SpanId::from_hex(&hex).map_err(serde::de::Error::custom)
228 }
229}
230
#[cfg(feature = "server")]
impl<'r> sqlx::Decode<'r, sqlx::Postgres> for SpanId {
    /// Decodes a Postgres byte value into a `SpanId`, rejecting any length other than 8.
    fn decode(value: sqlx::postgres::PgValueRef<'r>) -> Result<Self, sqlx::error::BoxDynError> {
        let raw = <&[u8] as sqlx::Decode<sqlx::Postgres>>::decode(value)?;
        // The array conversion succeeds only when the slice holds exactly 8 bytes.
        match <[u8; 8]>::try_from(raw) {
            Ok(array) => Ok(SpanId(array)),
            Err(_) => Err("SpanId must be exactly 8 bytes".into()),
        }
    }
}
243
#[cfg(feature = "server")]
impl sqlx::Type<sqlx::Postgres> for SpanId {
    /// Reports the same Postgres type info as `Vec<u8>`.
    fn type_info() -> sqlx::postgres::PgTypeInfo {
        <Vec<u8> as sqlx::Type<sqlx::Postgres>>::type_info()
    }
}
250
#[cfg(feature = "server")]
impl sqlx::Encode<'_, sqlx::Postgres> for SpanId {
    /// Encodes the id by delegating to the `&[u8]` encoder with the raw 8 bytes.
    fn encode_by_ref(
        &self,
        buf: &mut sqlx::postgres::PgArgumentBuffer,
    ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
        let raw: &[u8] = self.0.as_slice();
        <&[u8] as sqlx::Encode<sqlx::Postgres>>::encode(raw, buf)
    }
}
260
/// Trace-level record exposed to Python as a `pyclass`.
///
/// Fields marked `#[pyo3(get)]` are readable from Python directly; `trace_id` and
/// `root_span_id` are exposed through the hex-string getters in the `#[pymethods]` block.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
pub struct TraceRecord {
    #[pyo3(get)]
    pub created_at: DateTime<Utc>,
    /// 16-byte trace id; read from Python via `get_trace_id`.
    pub trace_id: TraceId,
    #[pyo3(get)]
    pub service_name: String,
    #[pyo3(get)]
    pub scope_name: String,
    #[pyo3(get)]
    pub scope_version: Option<String>,
    #[pyo3(get)]
    pub trace_state: String,
    #[pyo3(get)]
    pub start_time: chrono::DateTime<Utc>,
    #[pyo3(get)]
    pub end_time: chrono::DateTime<Utc>,
    #[pyo3(get)]
    pub duration_ms: i64,
    #[pyo3(get)]
    pub status_code: i32,
    #[pyo3(get)]
    pub status_message: String,
    /// 8-byte id of the root span; read from Python via `get_root_span_id`.
    pub root_span_id: SpanId,
    #[pyo3(get)]
    pub span_count: i32,
    #[pyo3(get)]
    pub tags: Vec<Tag>,
    #[pyo3(get)]
    pub process_attributes: Vec<Attribute>,
}

#[pymethods]
impl TraceRecord {
    /// String form of the record, produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }

    /// Trace id as a hex string.
    #[getter]
    pub fn get_trace_id(&self) -> String {
        self.trace_id.to_hex()
    }

    /// Root span id as a hex string.
    #[getter]
    pub fn get_root_span_id(&self) -> String {
        self.root_span_id.to_hex()
    }
}
310
/// Span-level record exposed to Python as a `pyclass`.
///
/// Fields marked `#[pyo3(get)]` are readable from Python directly. The id fields and the
/// JSON `input`/`output` values are exposed through getters in the `#[pymethods]` block.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
pub struct TraceSpanRecord {
    #[pyo3(get)]
    pub created_at: chrono::DateTime<Utc>,

    /// 16-byte id of the trace this span belongs to.
    pub trace_id: TraceId,
    /// 8-byte id of this span.
    pub span_id: SpanId,
    /// Id of the parent span; `None` when the span has no parent.
    pub parent_span_id: Option<SpanId>,

    #[pyo3(get)]
    pub flags: i32,
    #[pyo3(get)]
    pub trace_state: String,

    #[pyo3(get)]
    pub scope_name: String,
    #[pyo3(get)]
    pub scope_version: Option<String>,

    #[pyo3(get)]
    pub span_name: String,
    #[pyo3(get)]
    pub span_kind: String,

    #[pyo3(get)]
    pub start_time: chrono::DateTime<Utc>,
    #[pyo3(get)]
    pub end_time: chrono::DateTime<Utc>,
    #[pyo3(get)]
    pub duration_ms: i64,

    #[pyo3(get)]
    pub status_code: i32,
    #[pyo3(get)]
    pub status_message: String,

    #[pyo3(get)]
    pub attributes: Vec<Attribute>,
    #[pyo3(get)]
    pub events: Vec<SpanEvent>,
    #[pyo3(get)]
    pub links: Vec<SpanLink>,

    #[pyo3(get)]
    pub label: Option<String>,
    /// JSON input captured for the span; `Value::Null` when absent.
    pub input: Value,
    /// JSON output captured for the span; `Value::Null` when absent.
    pub output: Value,

    #[pyo3(get)]
    pub service_name: String,
    #[pyo3(get)]
    pub resource_attributes: Vec<Attribute>,
}
374
375#[pymethods]
376impl TraceSpanRecord {
377 #[getter]
378 pub fn get_trace_id(&self) -> String {
379 self.trace_id.to_hex()
380 }
381
382 #[getter]
383 pub fn get_span_id(&self) -> String {
384 self.span_id.to_hex()
385 }
386
387 #[getter]
388 pub fn get_parent_span_id(&self) -> Option<String> {
389 self.parent_span_id.as_ref().map(|id| id.to_hex())
390 }
391
392 #[getter]
393 pub fn get_input<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyDict>, RecordError> {
394 let dict = PyDict::new(py);
395 match &self.input {
396 Value::Null => {}
397 _ => {
398 json_to_pyobject(py, &self.input, &dict)?;
399 }
400 }
401 Ok(dict)
402 }
403
404 #[getter]
405 pub fn get_output<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyDict>, RecordError> {
406 let dict = PyDict::new(py);
407 match &self.output {
408 Value::Null => {}
409 _ => {
410 json_to_pyobject(py, &self.output, &dict)?;
411 }
412 }
413 Ok(dict)
414 }
415
416 pub fn __str__(&self) -> String {
417 PyHelperFuncs::__str__(self)
419 }
420}
421
/// A single baggage key/value pair attached to a trace, exposed to Python as a `pyclass`.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
#[cfg_attr(feature = "server", derive(sqlx::FromRow))]
pub struct TraceBaggageRecord {
    #[pyo3(get)]
    pub created_at: DateTime<Utc>,
    /// 16-byte trace id; read from Python via `get_trace_id`.
    pub trace_id: TraceId,
    #[pyo3(get)]
    pub scope: String,
    #[pyo3(get)]
    pub key: String,
    #[pyo3(get)]
    pub value: String,
}

#[pymethods]
impl TraceBaggageRecord {
    /// String form of the record, produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }

    /// Trace id as a hex string.
    #[getter]
    pub fn get_trace_id(&self) -> String {
        self.trace_id.to_hex()
    }
}
448
/// Records extracted from an export request: (span records, baggage records, tag records).
pub type TraceRecords = (
    Vec<TraceSpanRecord>,
    Vec<TraceBaggageRecord>,
    Vec<TagRecord>,
);
454
455pub trait TraceRecordExt {
456 fn keyvalue_to_json_array<T: Serialize>(attributes: &Vec<T>) -> Result<Value, RecordError> {
457 Ok(serde_json::to_value(attributes).unwrap_or(Value::Array(vec![])))
458 }
459
460 fn process_attributes(
461 trace_id: &TraceId,
462 span_attributes: &[KeyValue],
463 scope: &str,
464 created_at: DateTime<Utc>,
465 ) -> Result<SpanAttributes, RecordError> {
466 let mut cleaned_attributes = Vec::with_capacity(span_attributes.len());
467 let mut baggage_records = Vec::new();
468 let mut tags = Vec::new();
469 let scope_owned = scope.to_string();
470
471 for kv in span_attributes {
472 let key = &kv.key;
473
474 if let Some(tag_key) = key.strip_prefix(BAGGAGE_TAG_PATTERN) {
476 if !tag_key.is_empty() {
477 let string_value = match &kv.value {
479 Some(v) => Self::otel_value_to_string(v),
480 None => "null".to_string(),
481 };
482
483 tags.push(TagRecord::from_trace(
485 trace_id,
486 tag_key.to_string(),
487 string_value.clone(),
488 ));
489
490 cleaned_attributes.push(Attribute {
492 key: tag_key.to_string(),
493 value: Value::String(string_value.clone()),
494 });
495
496 baggage_records.push(TraceBaggageRecord {
498 created_at,
499 trace_id: trace_id.clone(),
500 scope: scope_owned.clone(),
501 key: format!("{}.{}", SCOUTER_TAG_PREFIX, tag_key),
502 value: string_value,
503 });
504 } else {
505 tracing::warn!(
506 attribute_key = %key,
507 "Skipping baggage tag with empty key after prefix removal"
508 );
509 }
510 }
511 else if let Some(tag_key) = key.strip_prefix(TAG_PATTERN) {
513 if !tag_key.is_empty() {
515 let string_value = match &kv.value {
516 Some(v) => Self::otel_value_to_string(v),
517 None => "null".to_string(),
518 };
519
520 tags.push(TagRecord::from_trace(
521 trace_id,
522 tag_key.to_string(),
523 string_value.clone(),
524 ));
525
526 cleaned_attributes.push(Attribute {
527 key: tag_key.to_string(),
528 value: Value::String(string_value.clone()),
529 });
530 } else {
531 tracing::warn!(
532 attribute_key = %key,
533 "Skipping tag with empty key after prefix removal"
534 );
535 }
536 }
537 else if key.starts_with(BAGGAGE_PATTERN) {
539 let clean_key = key
540 .strip_prefix(BAGGAGE_PATTERN)
541 .unwrap_or(key)
542 .trim()
543 .to_string();
544
545 let string_value = match &kv.value {
546 Some(v) => Self::otel_value_to_string(v),
547 None => "null".to_string(),
548 };
549
550 baggage_records.push(TraceBaggageRecord {
551 created_at,
552 trace_id: trace_id.clone(),
553 scope: scope_owned.clone(),
554 key: clean_key,
555 value: string_value,
556 });
557 }
558 else {
560 let value = match &kv.value {
561 Some(v) => otel_value_to_serde_value(v),
562 None => Value::Null,
563 };
564
565 cleaned_attributes.push(Attribute {
566 key: key.clone(),
567 value,
568 });
569 }
570 }
571
572 Ok((cleaned_attributes, baggage_records, tags))
573 }
574
575 fn otel_value_to_string(value: &AnyValue) -> String {
576 match &value.value {
577 Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) => {
578 s.clone()
579 }
580 Some(opentelemetry_proto::tonic::common::v1::any_value::Value::IntValue(i)) => {
581 i.to_string()
582 }
583 Some(opentelemetry_proto::tonic::common::v1::any_value::Value::DoubleValue(d)) => {
584 d.to_string()
585 }
586 Some(opentelemetry_proto::tonic::common::v1::any_value::Value::BoolValue(b)) => {
587 b.to_string()
588 }
589 Some(opentelemetry_proto::tonic::common::v1::any_value::Value::ArrayValue(_))
590 | Some(opentelemetry_proto::tonic::common::v1::any_value::Value::KvlistValue(_)) => {
591 let serde_val = otel_value_to_serde_value(value);
592 serde_json::to_string(&serde_val).unwrap_or_else(|_| format!("{:?}", value))
593 }
594 _ => "null".to_string(),
595 }
596 }
597
598 fn attributes_to_json_array(attributes: &[KeyValue]) -> Result<Vec<Attribute>, RecordError> {
599 attributes
600 .iter()
601 .map(|kv| {
602 let value = match &kv.value {
603 Some(v) => otel_value_to_serde_value(v),
604 None => Value::Null,
605 };
606
607 Ok(Attribute {
608 key: kv.key.clone(),
609 value,
610 })
611 })
612 .collect()
613 }
614
615 fn events_to_json_array(attributes: &[Event]) -> Result<Vec<SpanEvent>, RecordError> {
616 attributes
617 .iter()
618 .map(|kv| {
619 let attributes = Self::attributes_to_json_array(&kv.attributes)?;
620 Ok(SpanEvent {
621 name: kv.name.clone(),
622 timestamp: DateTime::<Utc>::from_timestamp_nanos(kv.time_unix_nano as i64),
623 attributes,
624 dropped_attributes_count: kv.dropped_attributes_count,
625 })
626 })
627 .collect()
628 }
629
630 fn links_to_json_array(attributes: &[Link]) -> Result<Vec<SpanLink>, RecordError> {
631 attributes
632 .iter()
633 .map(|kv| {
634 let attributes = Self::attributes_to_json_array(&kv.attributes)?;
635 Ok(SpanLink {
636 trace_id: hex::encode(&kv.trace_id),
637 span_id: hex::encode(&kv.span_id),
638 trace_state: kv.trace_state.clone(),
639 attributes,
640 dropped_attributes_count: kv.dropped_attributes_count,
641 })
642 })
643 .collect()
644 }
645}
646
/// Wrapper around an OTLP `ExportTraceServiceRequest`.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct TraceServerRecord {
    /// The raw export request received from a trace exporter.
    pub request: ExportTraceServiceRequest,
}

// Uses the default implementations of every `TraceRecordExt` helper.
impl TraceRecordExt for TraceServerRecord {}
653
654impl TraceServerRecord {
655 fn get_scope_info(
657 scope_span: &opentelemetry_proto::tonic::trace::v1::ScopeSpans,
658 ) -> (String, Option<String>) {
659 let scope_name = scope_span
660 .scope
661 .as_ref()
662 .map(|s| s.name.clone())
663 .filter(|n| !n.is_empty())
664 .unwrap_or_else(|| "unknown".to_string());
665
666 let scope_version = scope_span.scope.as_ref().and_then(|s| {
667 if s.version.is_empty() {
668 None
669 } else {
670 Some(s.version.clone())
671 }
672 });
673
674 (scope_name, scope_version)
675 }
676
677 fn extract_time(start_time: u64, end_time: u64) -> (DateTime<Utc>, DateTime<Utc>, i64) {
686 let start_dt = Self::safe_timestamp_conversion(start_time);
688 let end_dt = Self::safe_timestamp_conversion(end_time);
689
690 let duration_ms = if end_time >= start_time {
692 let duration_nanos = end_time.saturating_sub(start_time);
693 (duration_nanos / 1_000_000).min(i64::MAX as u64) as i64
694 } else {
695 tracing::warn!(
696 start_time = start_time,
697 end_time = end_time,
698 "Invalid timestamp order detected in trace span"
699 );
700 0
701 };
702
703 (start_dt, end_dt, duration_ms)
704 }
705
706 fn safe_timestamp_conversion(timestamp_nanos: u64) -> DateTime<Utc> {
708 if timestamp_nanos <= i64::MAX as u64 {
709 DateTime::from_timestamp_nanos(timestamp_nanos as i64)
710 } else {
711 let seconds = timestamp_nanos / 1_000_000_000;
712 let nanoseconds = (timestamp_nanos % 1_000_000_000) as u32;
713
714 DateTime::from_timestamp(seconds as i64, nanoseconds).unwrap_or_else(|| {
715 tracing::warn!(
716 timestamp = timestamp_nanos,
717 seconds = seconds,
718 nanoseconds = nanoseconds,
719 "Failed to convert large timestamp, falling back to current time"
720 );
721 Utc::now()
722 })
723 }
724 }
725
726 fn span_kind_to_string(kind: i32) -> String {
728 SpanKind::try_from(kind)
729 .map(|sk| {
730 sk.as_str_name()
731 .strip_prefix("SPAN_KIND_")
732 .unwrap_or(sk.as_str_name())
733 })
734 .unwrap_or("UNSPECIFIED")
735 .to_string()
736 }
737
738 fn extract_input_output(attributes: &[Attribute]) -> (Value, Value) {
739 let mut input = Value::Null;
740 let mut output = Value::Null;
741
742 for attr in attributes {
743 if attr.key == SCOUTER_TRACING_INPUT {
744 if let Value::String(s) = &attr.value {
745 input = serde_json::from_str(s).unwrap_or_else(|e| {
746 tracing::warn!(
747 key = SCOUTER_TRACING_INPUT,
748 error = %e,
749 value = s,
750 "Failed to parse input attribute as JSON, falling back to string value."
751 );
752 Value::String(s.clone()) });
754 }
755 } else if attr.key == SCOUTER_TRACING_OUTPUT {
756 if let Value::String(s) = &attr.value {
757 output = serde_json::from_str(s)
758 .unwrap_or_else(|e| {
759 tracing::warn!(
760 key = SCOUTER_TRACING_OUTPUT,
761 error = %e,
762 value = s,
763 "Failed to parse output attribute as JSON, falling back to string value."
764 );
765 Value::String(s.clone()) });
767 }
768 }
769 }
770 (input, output)
771 }
772
773 fn get_service_name_from_resource(
774 resource: &Option<opentelemetry_proto::tonic::resource::v1::Resource>,
775 default: &str,
776 ) -> String {
777 resource
778 .as_ref()
779 .and_then(|r| r.attributes.iter().find(|attr| attr.key == SERVICE_NAME))
780 .and_then(|attr| {
781 attr.value.as_ref().and_then(|v| {
782 if let Some(
783 opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s),
784 ) = &v.value
785 {
786 Some(s.clone())
787 } else {
788 None
789 }
790 })
791 })
792 .unwrap_or_else(|| {
793 tracing::warn!(
794 "Service name not found in resource attributes, falling back to default: {}",
795 default
796 );
797 default.to_string()
798 })
799 }
800
801 pub fn get_trace_start_time_attribute(
804 attributes: &Vec<Attribute>,
805 start_time: &DateTime<Utc>,
806 ) -> DateTime<Utc> {
807 for attr in attributes {
808 if attr.key == TRACE_START_TIME_KEY {
809 if let Value::String(s) = &attr.value {
810 if let Ok(dt) = s.parse::<chrono::DateTime<chrono::Utc>>() {
811 return dt;
812 }
813 }
814 }
815 }
816
817 tracing::warn!(
818 "Trace start time attribute not found or invalid, falling back to span start_time"
819 );
820 *start_time
821 }
822
823 pub fn convert_to_baggage_records(
824 trace_id: &TraceId,
825 attributes: &Vec<Attribute>,
826 scope_name: &str,
827 ) -> Vec<TraceBaggageRecord> {
828 let baggage_kvs: Vec<(String, String)> = attributes
829 .iter()
830 .filter_map(|attr| {
831 if attr.key.starts_with(BAGGAGE_PREFIX) {
833 let clean_key = attr
834 .key
835 .strip_prefix(format!("{}.", BAGGAGE_PREFIX).as_str())
836 .map(|stripped| stripped.trim())
837 .unwrap_or(&attr.key)
838 .to_string();
839
840 let value_string = match &attr.value {
842 Value::String(s) => s.clone(),
843 Value::Number(n) => n.to_string(),
844 Value::Bool(b) => b.to_string(),
845 Value::Null => "null".to_string(),
846 Value::Array(_) | Value::Object(_) => {
847 serde_json::to_string(&attr.value)
849 .unwrap_or_else(|_| format!("{:?}", attr.value))
850 }
851 };
852
853 Some((clean_key, value_string))
854 } else {
855 None
856 }
857 })
858 .collect();
859
860 baggage_kvs
861 .into_iter()
862 .map(|(key, value)| TraceBaggageRecord {
863 created_at: Self::get_trace_start_time_attribute(attributes, &Utc::now()),
864 trace_id: trace_id.clone(),
865 scope: scope_name.to_string(),
866 key,
867 value,
868 })
869 .collect()
870 }
871
872 pub fn to_records(self) -> Result<TraceRecords, RecordError> {
873 let resource_spans = self.request.resource_spans;
874
875 let estimated_capacity: usize = resource_spans
877 .iter()
878 .map(|rs| {
879 rs.scope_spans
880 .iter()
881 .map(|ss| ss.spans.len())
882 .sum::<usize>()
883 })
884 .sum();
885
886 let mut span_records: Vec<TraceSpanRecord> = Vec::with_capacity(estimated_capacity);
887 let mut baggage_records: Vec<TraceBaggageRecord> = Vec::new();
888 let mut tags: HashSet<TagRecord> = HashSet::new();
889
890 for resource_span in resource_spans {
891 let service_name =
893 Self::get_service_name_from_resource(&resource_span.resource, "unknown");
894 let resource_attributes = Attribute::from_resources(&resource_span.resource);
895
896 for scope_span in &resource_span.scope_spans {
897 let (scope_name, scope_version) = Self::get_scope_info(scope_span);
898
899 for span in &scope_span.spans {
900 let trace_id = TraceId::from_slice(span.trace_id.as_slice())?;
902 let span_id = SpanId::from_slice(span.span_id.as_slice())?;
903 let parent_span_id = if !span.parent_span_id.is_empty() {
904 Some(SpanId::from_slice(span.parent_span_id.as_slice())?)
905 } else {
906 None
907 };
908
909 let (start_time, end_time, duration_ms) =
910 Self::extract_time(span.start_time_unix_nano, span.end_time_unix_nano);
911
912 let (cleaned_attributes, span_baggage, span_tags) = Self::process_attributes(
913 &trace_id,
914 &span.attributes,
915 &scope_name,
916 start_time,
917 )?;
918
919 baggage_records.extend(span_baggage);
921 tags.extend(span_tags);
922
923 let (input, output) = Self::extract_input_output(&cleaned_attributes);
924
925 span_records.push(TraceSpanRecord {
927 created_at: start_time,
928 trace_id,
929 span_id,
930 parent_span_id,
931 flags: span.flags as i32,
932 trace_state: span.trace_state.clone(),
933 scope_name: scope_name.clone(),
934 scope_version: scope_version.clone(),
935 span_name: span.name.clone(),
936 span_kind: Self::span_kind_to_string(span.kind),
937 start_time,
938 end_time,
939 duration_ms,
940 status_code: span.status.as_ref().map(|s| s.code).unwrap_or(0),
941 status_message: span
942 .status
943 .as_ref()
944 .map(|s| s.message.clone())
945 .unwrap_or_default(),
946 attributes: cleaned_attributes,
947 events: Self::events_to_json_array(&span.events)?,
948 links: Self::links_to_json_array(&span.links)?,
949 label: None,
950 input,
951 output,
952 service_name: service_name.clone(),
953 resource_attributes: resource_attributes.clone(),
954 });
955 }
956 }
957 }
958
959 let tag_records: Vec<TagRecord> = tags.into_iter().collect();
960 Ok((span_records, baggage_records, tag_records))
961 }
962}
963
/// A key paired with a JSON value, exposed to Python as a `pyclass`.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
pub struct Attribute {
    #[pyo3(get)]
    pub key: String,
    /// JSON value; read from Python through the `get_value` getter.
    pub value: Value,
}
971
972#[pymethods]
973impl Attribute {
974 #[getter]
975 pub fn get_value<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyAny>, RecordError> {
976 Ok(json_to_pyobject_value(py, &self.value)?.bind(py).clone())
977 }
978
979 pub fn __str__(&self) -> String {
980 PyHelperFuncs::__str__(self)
981 }
982}
983
984impl Attribute {
985 pub fn from_otel_value(key: String, value: &AnyValue) -> Self {
986 Attribute {
987 key,
988 value: otel_value_to_serde_value(value),
989 }
990 }
991
992 fn from_resources(
993 resource: &Option<opentelemetry_proto::tonic::resource::v1::Resource>,
994 ) -> Vec<Attribute> {
995 match resource {
996 Some(res) => res
997 .attributes
998 .iter()
999 .map(|kv| Attribute::from_otel_value(kv.key.clone(), kv.value.as_ref().unwrap()))
1000 .collect(),
1001 None => vec![],
1002 }
1003 }
1004}
1005
/// A span event, exposed to Python as a `pyclass` with read-only fields.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct SpanEvent {
    #[pyo3(get)]
    pub timestamp: chrono::DateTime<Utc>,
    #[pyo3(get)]
    pub name: String,
    #[pyo3(get)]
    pub attributes: Vec<Attribute>,
    #[pyo3(get)]
    pub dropped_attributes_count: u32,
}

#[pymethods]
impl SpanEvent {
    /// String form of the event, produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1025
/// A link from a span to another span, exposed to Python as a `pyclass`.
///
/// `trace_id` and `span_id` are stored as strings here, not as `TraceId`/`SpanId`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct SpanLink {
    #[pyo3(get)]
    pub trace_id: String,
    #[pyo3(get)]
    pub span_id: String,
    #[pyo3(get)]
    pub trace_state: String,
    #[pyo3(get)]
    pub attributes: Vec<Attribute>,
    #[pyo3(get)]
    pub dropped_attributes_count: u32,
}

#[pymethods]
impl SpanLink {
    /// String form of the link, produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1047
/// A string key/value tag, exposed to Python as a `pyclass`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct Tag {
    #[pyo3(get)]
    pub key: String,
    #[pyo3(get)]
    pub value: String,
}

#[pymethods]
impl Tag {
    /// String form of the tag, produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1063
/// A tag attached to an entity identified by type and id, exposed to Python as a `pyclass`.
///
/// Derives `Eq` and `Hash`, so records can be de-duplicated in a `HashSet`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
#[pyclass]
#[cfg_attr(feature = "server", derive(sqlx::FromRow))]
pub struct TagRecord {
    /// Kind of entity the tag belongs to (`"trace"` for records built by `from_trace`).
    #[pyo3(get)]
    pub entity_type: String,
    /// Identifier of the entity (the hex trace id for records built by `from_trace`).
    #[pyo3(get)]
    pub entity_id: String,
    #[pyo3(get)]
    pub key: String,
    #[pyo3(get)]
    pub value: String,
}
1077
1078impl TagRecord {
1079 pub fn from_trace(trace_id: &TraceId, key: String, value: String) -> Self {
1081 Self {
1082 entity_type: "trace".to_string(),
1083 entity_id: trace_id.to_hex(),
1084 key,
1085 value,
1086 }
1087 }
1088}
1089
#[pymethods]
impl TagRecord {
    /// String form of the record, produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1095}