1pub mod sql;
2
3use crate::error::RecordError;
4use crate::otel_value_to_serde_value;
5use crate::PyHelperFuncs;
6use crate::{json_to_pyobject, json_to_pyobject_value};
7use chrono::DateTime;
8use chrono::Utc;
9use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
10use opentelemetry_proto::tonic::common::v1::AnyValue;
11use opentelemetry_proto::tonic::common::v1::KeyValue;
12use opentelemetry_proto::tonic::trace::v1::span::Event;
13use opentelemetry_proto::tonic::trace::v1::span::Link;
14use opentelemetry_proto::tonic::trace::v1::span::SpanKind;
15use pyo3::prelude::*;
16use pyo3::types::PyDict;
17use serde::{Deserialize, Serialize};
18use serde_json::Value;
19use std::collections::HashMap;
20use std::collections::HashSet;
21use std::fmt;
22
// String keys and prefixes shared by the tracing code.
// NOTE(review): the `function.*`, label, scope, error, queue and entity keys
// are not referenced in this part of the file; they are presumably span
// attribute names set by the tracer — confirm against the callers.
pub const FUNCTION_TYPE: &str = "function.type";
pub const FUNCTION_STREAMING: &str = "function.streaming";
pub const FUNCTION_NAME: &str = "function.name";
pub const FUNCTION_MODULE: &str = "function.module";
pub const FUNCTION_QUALNAME: &str = "function.qualname";
// Span attribute keys read by `TraceServerRecord::extract_input_output`.
pub const SCOUTER_TRACING_INPUT: &str = "scouter.tracing.input";
pub const SCOUTER_TRACING_OUTPUT: &str = "scouter.tracing.output";
pub const SCOUTER_TRACING_LABEL: &str = "scouter.tracing.label";
// Resource attribute key looked up by `get_service_name_from_resource`.
pub const SERVICE_NAME: &str = "service.name";
pub const SCOUTER_TAG_PREFIX: &str = "scouter.tracing.tag";
pub const BAGGAGE_PREFIX: &str = "baggage";
// Attribute key whose string value is parsed as the trace start timestamp.
pub const TRACE_START_TIME_KEY: &str = "scouter.tracing.start_time";
pub const SCOUTER_SCOPE: &str = "scouter.scope";
// "scouter.tracer." followed by this crate's version at compile time.
pub const SCOUTER_SCOPE_DEFAULT: &str = concat!("scouter.tracer.", env!("CARGO_PKG_VERSION"));
pub const SPAN_ERROR: &str = "span.error";
pub const EXCEPTION_TRACEBACK: &str = "exception.traceback";
pub const SCOUTER_QUEUE_RECORD: &str = "scouter.queue.record";
pub const SCOUTER_QUEUE_EVENT: &str = "scouter.queue.event";
pub const SCOUTER_ENTITY: &str = "scouter.entity";

// Key prefixes (including the trailing dot) matched by
// `TraceRecordExt::process_attributes`. The literals repeat `BAGGAGE_PREFIX`
// and `SCOUTER_TAG_PREFIX` because `concat!` only accepts literals.
pub const BAGGAGE_PATTERN: &str = "baggage.";
pub const BAGGAGE_TAG_PATTERN: &str = concat!("baggage", ".", "scouter.tracing.tag", ".");
pub const TAG_PATTERN: &str = concat!("scouter.tracing.tag", ".");

// Result of `process_attributes`: (cleaned attributes, baggage records, tag records).
type SpanAttributes = (Vec<Attribute>, Vec<TraceBaggageRecord>, Vec<TagRecord>);
49
/// Serializable description of an entity by uid, type and space.
// NOTE(review): presumably the payload stored under the `scouter.entity`
// attribute — this part of the file does not show where it is used; confirm.
#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq, Eq, Hash)]
pub struct ScouterEntityAttribute {
    pub uid: String,
    pub r#type: String,
    pub space: String,
}
56
57#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
58pub struct TraceId([u8; 16]);
59
60impl TraceId {
61 pub fn from_hex(hex: &str) -> Result<Self, hex::FromHexError> {
62 let mut bytes = [0u8; 16];
63 hex::decode_to_slice(hex, &mut bytes)?;
64 Ok(Self(bytes))
65 }
66
67 pub fn from_bytes(bytes: [u8; 16]) -> Self {
68 Self(bytes)
69 }
70
71 pub fn from_slice(slice: &[u8]) -> Result<Self, RecordError> {
72 if slice.len() != 16 {
73 return Err(RecordError::SliceError(format!(
74 "Invalid trace_id length: expected 16 bytes, got {}",
75 slice.len()
76 )));
77 }
78 let mut bytes = [0u8; 16];
79 bytes.copy_from_slice(slice);
80 Ok(Self(bytes))
81 }
82
83 pub fn to_hex(&self) -> String {
84 hex::encode(self.0)
85 }
86
87 pub fn as_bytes(&self) -> &[u8; 16] {
88 &self.0
89 }
90
91 pub fn hex_to_bytes(hex: &str) -> Result<Vec<u8>, RecordError> {
92 let bytes = hex::decode(hex)?;
93 if bytes.len() == 16 {
95 Ok(bytes)
96 } else {
97 Err(RecordError::SliceError(format!(
98 "Invalid hex string length: expected 16 or 8 bytes, got {}",
99 bytes.len()
100 )))
101 }
102 }
103}
104
105impl fmt::Display for TraceId {
106 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107 write!(f, "{}", self.to_hex())
108 }
109}
110
111impl Serialize for TraceId {
112 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
113 where
114 S: serde::Serializer,
115 {
116 serializer.serialize_str(&self.to_hex())
117 }
118}
119
120impl<'de> Deserialize<'de> for TraceId {
121 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
122 where
123 D: serde::Deserializer<'de>,
124 {
125 let hex = String::deserialize(deserializer)?;
126 TraceId::from_hex(&hex).map_err(serde::de::Error::custom)
127 }
128}
129
/// Decodes a trace id from a Postgres byte-slice value (server feature only).
#[cfg(feature = "server")]
impl<'r> sqlx::Decode<'r, sqlx::Postgres> for TraceId {
    fn decode(value: sqlx::postgres::PgValueRef<'r>) -> Result<Self, sqlx::error::BoxDynError> {
        let raw = <&[u8] as sqlx::Decode<sqlx::Postgres>>::decode(value)?;
        // The array conversion succeeds only for exactly 16 bytes.
        match <[u8; 16]>::try_from(raw) {
            Ok(array) => Ok(TraceId(array)),
            Err(_) => Err("TraceId must be exactly 16 bytes".into()),
        }
    }
}

/// Reports the same Postgres type info as `Vec<u8>`.
#[cfg(feature = "server")]
impl sqlx::Type<sqlx::Postgres> for TraceId {
    fn type_info() -> sqlx::postgres::PgTypeInfo {
        type Bytes = Vec<u8>;
        <Bytes as sqlx::Type<sqlx::Postgres>>::type_info()
    }
}

/// Encodes the raw 16 bytes as a byte slice.
#[cfg(feature = "server")]
impl sqlx::Encode<'_, sqlx::Postgres> for TraceId {
    fn encode_by_ref(
        &self,
        buf: &mut sqlx::postgres::PgArgumentBuffer,
    ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
        let raw: &[u8] = self.0.as_slice();
        <&[u8] as sqlx::Encode<sqlx::Postgres>>::encode(raw, buf)
    }
}
159
160#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)]
161pub struct SpanId([u8; 8]);
162
163impl SpanId {
164 pub fn from_hex(hex: &str) -> Result<Self, hex::FromHexError> {
165 let mut bytes = [0u8; 8];
166 hex::decode_to_slice(hex, &mut bytes)?;
167 Ok(Self(bytes))
168 }
169
170 pub fn from_bytes(bytes: [u8; 8]) -> Self {
171 Self(bytes)
172 }
173
174 pub fn from_slice(slice: &[u8]) -> Result<Self, RecordError> {
175 if slice.len() != 8 {
176 return Err(RecordError::SliceError(format!(
177 "Invalid trace_id length: expected 8 bytes, got {}",
178 slice.len()
179 )));
180 }
181 let mut bytes = [0u8; 8];
182 bytes.copy_from_slice(slice);
183 Ok(Self(bytes))
184 }
185
186 pub fn to_hex(&self) -> String {
187 hex::encode(self.0)
188 }
189
190 pub fn as_bytes(&self) -> &[u8; 8] {
191 &self.0
192 }
193
194 pub fn hex_to_bytes(hex: &str) -> Result<Vec<u8>, RecordError> {
195 let bytes = hex::decode(hex)?;
196 if bytes.len() == 8 {
197 Ok(bytes)
198 } else {
199 Err(RecordError::SliceError(format!(
200 "Invalid hex string length: expected 16 or 8 bytes, got {}",
201 bytes.len()
202 )))
203 }
204 }
205}
206
207impl fmt::Display for SpanId {
208 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209 write!(f, "{}", self.to_hex())
210 }
211}
212
213impl Serialize for SpanId {
214 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
215 where
216 S: serde::Serializer,
217 {
218 serializer.serialize_str(&self.to_hex())
219 }
220}
221
222impl<'de> Deserialize<'de> for SpanId {
223 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
224 where
225 D: serde::Deserializer<'de>,
226 {
227 let hex = String::deserialize(deserializer)?;
228 SpanId::from_hex(&hex).map_err(serde::de::Error::custom)
229 }
230}
231
/// Decodes a span id from a Postgres byte-slice value (server feature only).
#[cfg(feature = "server")]
impl<'r> sqlx::Decode<'r, sqlx::Postgres> for SpanId {
    fn decode(value: sqlx::postgres::PgValueRef<'r>) -> Result<Self, sqlx::error::BoxDynError> {
        let raw = <&[u8] as sqlx::Decode<sqlx::Postgres>>::decode(value)?;
        // The array conversion succeeds only for exactly 8 bytes.
        match <[u8; 8]>::try_from(raw) {
            Ok(array) => Ok(SpanId(array)),
            Err(_) => Err("SpanId must be exactly 8 bytes".into()),
        }
    }
}

/// Reports the same Postgres type info as `Vec<u8>`.
#[cfg(feature = "server")]
impl sqlx::Type<sqlx::Postgres> for SpanId {
    fn type_info() -> sqlx::postgres::PgTypeInfo {
        type Bytes = Vec<u8>;
        <Bytes as sqlx::Type<sqlx::Postgres>>::type_info()
    }
}

/// Encodes the raw 8 bytes as a byte slice.
#[cfg(feature = "server")]
impl sqlx::Encode<'_, sqlx::Postgres> for SpanId {
    fn encode_by_ref(
        &self,
        buf: &mut sqlx::postgres::PgArgumentBuffer,
    ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
        let raw: &[u8] = self.0.as_slice();
        <&[u8] as sqlx::Encode<sqlx::Postgres>>::encode(raw, buf)
    }
}
261
/// Trace-level record exposed to Python as a pyo3 class.
///
/// Fields marked `#[pyo3(get)]` are readable from Python directly. The
/// `trace_id` and `root_span_id` fields are exposed through the hex-string
/// getters defined in the `#[pymethods]` block below.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
pub struct TraceRecord {
    #[pyo3(get)]
    pub created_at: DateTime<Utc>,
    pub trace_id: TraceId,
    #[pyo3(get)]
    pub service_name: String,
    #[pyo3(get)]
    pub scope_name: String,
    #[pyo3(get)]
    pub scope_version: Option<String>,
    #[pyo3(get)]
    pub trace_state: String,
    #[pyo3(get)]
    pub start_time: chrono::DateTime<Utc>,
    #[pyo3(get)]
    pub end_time: chrono::DateTime<Utc>,
    #[pyo3(get)]
    pub duration_ms: i64,
    #[pyo3(get)]
    pub status_code: i32,
    #[pyo3(get)]
    pub status_message: String,
    pub root_span_id: SpanId,
    #[pyo3(get)]
    pub span_count: i32,
    #[pyo3(get)]
    pub tags: Vec<Tag>,
    #[pyo3(get)]
    pub process_attributes: Vec<Attribute>,
}

#[pymethods]
impl TraceRecord {
    /// String form of the record, produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }

    /// Trace id as a hex string.
    #[getter]
    pub fn get_trace_id(&self) -> String {
        self.trace_id.to_hex()
    }

    /// Root span id as a hex string.
    #[getter]
    pub fn get_root_span_id(&self) -> String {
        self.root_span_id.to_hex()
    }
}
311
/// Single span record exposed to Python as a pyo3 class.
///
/// `TraceServerRecord::to_records` builds one of these per span in an export
/// request. Id fields and `input`/`output` have no direct `#[pyo3(get)]`;
/// they are exposed through getters in the `#[pymethods]` block.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
pub struct TraceSpanRecord {
    #[pyo3(get)]
    pub created_at: chrono::DateTime<Utc>,

    // Identifiers (hex strings on the Python side).
    pub trace_id: TraceId,
    pub span_id: SpanId,
    pub parent_span_id: Option<SpanId>,

    #[pyo3(get)]
    pub flags: i32,
    #[pyo3(get)]
    pub trace_state: String,

    // Instrumentation scope the span was reported under.
    #[pyo3(get)]
    pub scope_name: String,
    #[pyo3(get)]
    pub scope_version: Option<String>,

    #[pyo3(get)]
    pub span_name: String,
    #[pyo3(get)]
    pub span_kind: String,

    // Timing; `duration_ms` is derived from start/end by `extract_time`.
    #[pyo3(get)]
    pub start_time: chrono::DateTime<Utc>,
    #[pyo3(get)]
    pub end_time: chrono::DateTime<Utc>,
    #[pyo3(get)]
    pub duration_ms: i64,

    #[pyo3(get)]
    pub status_code: i32,
    #[pyo3(get)]
    pub status_message: String,

    #[pyo3(get)]
    pub attributes: Vec<Attribute>,
    #[pyo3(get)]
    pub events: Vec<SpanEvent>,
    #[pyo3(get)]
    pub links: Vec<SpanLink>,

    // `input`/`output` hold JSON values; `Value::Null` means "not set".
    #[pyo3(get)]
    pub label: Option<String>,
    pub input: Value,
    pub output: Value,

    #[pyo3(get)]
    pub service_name: String,
    #[pyo3(get)]
    pub resource_attributes: Vec<Attribute>,
}
375
376#[pymethods]
377impl TraceSpanRecord {
378 #[getter]
379 pub fn get_trace_id(&self) -> String {
380 self.trace_id.to_hex()
381 }
382
383 #[getter]
384 pub fn get_span_id(&self) -> String {
385 self.span_id.to_hex()
386 }
387
388 #[getter]
389 pub fn get_parent_span_id(&self) -> Option<String> {
390 self.parent_span_id.as_ref().map(|id| id.to_hex())
391 }
392
393 #[getter]
394 pub fn get_input<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyDict>, RecordError> {
395 let dict = PyDict::new(py);
396 match &self.input {
397 Value::Null => {}
398 _ => {
399 json_to_pyobject(py, &self.input, &dict)?;
400 }
401 }
402 Ok(dict)
403 }
404
405 #[getter]
406 pub fn get_output<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyDict>, RecordError> {
407 let dict = PyDict::new(py);
408 match &self.output {
409 Value::Null => {}
410 _ => {
411 json_to_pyobject(py, &self.output, &dict)?;
412 }
413 }
414 Ok(dict)
415 }
416
417 pub fn __str__(&self) -> String {
418 PyHelperFuncs::__str__(self)
420 }
421}
422
/// One baggage key/value pair attached to a trace, exposed to Python as a pyo3 class.
///
/// With the `server` feature enabled it can also be read from a database row
/// via `sqlx::FromRow`.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
#[cfg_attr(feature = "server", derive(sqlx::FromRow))]
pub struct TraceBaggageRecord {
    #[pyo3(get)]
    pub created_at: DateTime<Utc>,
    // Exposed to Python as a hex string through `get_trace_id`.
    pub trace_id: TraceId,
    #[pyo3(get)]
    pub scope: String,
    #[pyo3(get)]
    pub key: String,
    #[pyo3(get)]
    pub value: String,
}

#[pymethods]
impl TraceBaggageRecord {
    /// String form of the record, produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }

    /// Trace id as a hex string.
    #[getter]
    pub fn get_trace_id(&self) -> String {
        self.trace_id.to_hex()
    }
}
449
/// Output of `TraceServerRecord::to_records`: span records, baggage records
/// and de-duplicated tag records, in that order.
pub type TraceRecords = (
    Vec<TraceSpanRecord>,
    Vec<TraceBaggageRecord>,
    Vec<TagRecord>,
);
455
456pub trait TraceRecordExt {
457 fn keyvalue_to_json_array<T: Serialize>(attributes: &Vec<T>) -> Result<Value, RecordError> {
458 Ok(serde_json::to_value(attributes).unwrap_or(Value::Array(vec![])))
459 }
460
461 fn process_attributes(
462 trace_id: &TraceId,
463 span_attributes: &[KeyValue],
464 scope: &str,
465 created_at: DateTime<Utc>,
466 ) -> Result<SpanAttributes, RecordError> {
467 let mut cleaned_attributes = Vec::with_capacity(span_attributes.len());
468 let mut baggage_records = Vec::new();
469 let mut tags = Vec::new();
470 let scope_owned = scope.to_string();
471
472 for kv in span_attributes {
473 let key = &kv.key;
474
475 if let Some(tag_key) = key.strip_prefix(BAGGAGE_TAG_PATTERN) {
477 if !tag_key.is_empty() {
478 let string_value = match &kv.value {
480 Some(v) => Self::otel_value_to_string(v),
481 None => "null".to_string(),
482 };
483
484 tags.push(TagRecord::from_trace(
486 trace_id,
487 tag_key.to_string(),
488 string_value.clone(),
489 ));
490
491 cleaned_attributes.push(Attribute {
493 key: tag_key.to_string(),
494 value: Value::String(string_value.clone()),
495 });
496
497 baggage_records.push(TraceBaggageRecord {
499 created_at,
500 trace_id: *trace_id,
501 scope: scope_owned.clone(),
502 key: format!("{}.{}", SCOUTER_TAG_PREFIX, tag_key),
503 value: string_value,
504 });
505 } else {
506 tracing::warn!(
507 attribute_key = %key,
508 "Skipping baggage tag with empty key after prefix removal"
509 );
510 }
511 }
512 else if let Some(tag_key) = key.strip_prefix(TAG_PATTERN) {
514 if !tag_key.is_empty() {
516 let string_value = match &kv.value {
517 Some(v) => Self::otel_value_to_string(v),
518 None => "null".to_string(),
519 };
520
521 tags.push(TagRecord::from_trace(
522 trace_id,
523 tag_key.to_string(),
524 string_value.clone(),
525 ));
526
527 cleaned_attributes.push(Attribute {
528 key: tag_key.to_string(),
529 value: Value::String(string_value.clone()),
530 });
531 } else {
532 tracing::warn!(
533 attribute_key = %key,
534 "Skipping tag with empty key after prefix removal"
535 );
536 }
537 }
538 else if key.starts_with(BAGGAGE_PATTERN) {
540 let clean_key = key
541 .strip_prefix(BAGGAGE_PATTERN)
542 .unwrap_or(key)
543 .trim()
544 .to_string();
545
546 let string_value = match &kv.value {
547 Some(v) => Self::otel_value_to_string(v),
548 None => "null".to_string(),
549 };
550
551 baggage_records.push(TraceBaggageRecord {
552 created_at,
553 trace_id: *trace_id,
554 scope: scope_owned.clone(),
555 key: clean_key,
556 value: string_value,
557 });
558 }
559 else {
561 let value = match &kv.value {
562 Some(v) => otel_value_to_serde_value(v),
563 None => Value::Null,
564 };
565
566 cleaned_attributes.push(Attribute {
567 key: key.clone(),
568 value,
569 });
570 }
571 }
572
573 Ok((cleaned_attributes, baggage_records, tags))
574 }
575
576 fn otel_value_to_string(value: &AnyValue) -> String {
577 match &value.value {
578 Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) => {
579 s.clone()
580 }
581 Some(opentelemetry_proto::tonic::common::v1::any_value::Value::IntValue(i)) => {
582 i.to_string()
583 }
584 Some(opentelemetry_proto::tonic::common::v1::any_value::Value::DoubleValue(d)) => {
585 d.to_string()
586 }
587 Some(opentelemetry_proto::tonic::common::v1::any_value::Value::BoolValue(b)) => {
588 b.to_string()
589 }
590 Some(opentelemetry_proto::tonic::common::v1::any_value::Value::ArrayValue(_))
591 | Some(opentelemetry_proto::tonic::common::v1::any_value::Value::KvlistValue(_)) => {
592 let serde_val = otel_value_to_serde_value(value);
593 serde_json::to_string(&serde_val).unwrap_or_else(|_| format!("{:?}", value))
594 }
595 _ => "null".to_string(),
596 }
597 }
598
599 fn attributes_to_json_array(attributes: &[KeyValue]) -> Result<Vec<Attribute>, RecordError> {
600 attributes
601 .iter()
602 .map(|kv| {
603 let value = match &kv.value {
604 Some(v) => otel_value_to_serde_value(v),
605 None => Value::Null,
606 };
607
608 Ok(Attribute {
609 key: kv.key.clone(),
610 value,
611 })
612 })
613 .collect()
614 }
615
616 fn events_to_json_array(attributes: &[Event]) -> Result<Vec<SpanEvent>, RecordError> {
617 attributes
618 .iter()
619 .map(|kv| {
620 let attributes = Self::attributes_to_json_array(&kv.attributes)?;
621 Ok(SpanEvent {
622 name: kv.name.clone(),
623 timestamp: DateTime::<Utc>::from_timestamp_nanos(kv.time_unix_nano as i64),
624 attributes,
625 dropped_attributes_count: kv.dropped_attributes_count,
626 })
627 })
628 .collect()
629 }
630
631 fn links_to_json_array(attributes: &[Link]) -> Result<Vec<SpanLink>, RecordError> {
632 attributes
633 .iter()
634 .map(|kv| {
635 let attributes = Self::attributes_to_json_array(&kv.attributes)?;
636 Ok(SpanLink {
637 trace_id: hex::encode(&kv.trace_id),
638 span_id: hex::encode(&kv.span_id),
639 trace_state: kv.trace_state.clone(),
640 attributes,
641 dropped_attributes_count: kv.dropped_attributes_count,
642 })
643 })
644 .collect()
645 }
646}
647
/// Wrapper around an OTLP `ExportTraceServiceRequest`.
///
/// `to_records` flattens the wrapped request into span, baggage and tag records.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct TraceServerRecord {
    pub request: ExportTraceServiceRequest,
}

// Uses the default method implementations of `TraceRecordExt` unchanged.
impl TraceRecordExt for TraceServerRecord {}
654
655impl TraceServerRecord {
656 fn get_scope_info(
658 scope_span: &opentelemetry_proto::tonic::trace::v1::ScopeSpans,
659 ) -> (String, Option<String>) {
660 let scope_name = scope_span
661 .scope
662 .as_ref()
663 .map(|s| s.name.clone())
664 .filter(|n| !n.is_empty())
665 .unwrap_or_else(|| "unknown".to_string());
666
667 let scope_version = scope_span.scope.as_ref().and_then(|s| {
668 if s.version.is_empty() {
669 None
670 } else {
671 Some(s.version.clone())
672 }
673 });
674
675 (scope_name, scope_version)
676 }
677
678 fn extract_time(start_time: u64, end_time: u64) -> (DateTime<Utc>, DateTime<Utc>, i64) {
687 let start_dt = Self::safe_timestamp_conversion(start_time);
689 let end_dt = Self::safe_timestamp_conversion(end_time);
690
691 let duration_ms = if end_time >= start_time {
693 let duration_nanos = end_time.saturating_sub(start_time);
694 (duration_nanos / 1_000_000).min(i64::MAX as u64) as i64
695 } else {
696 tracing::warn!(
697 start_time = start_time,
698 end_time = end_time,
699 "Invalid timestamp order detected in trace span"
700 );
701 0
702 };
703
704 (start_dt, end_dt, duration_ms)
705 }
706
707 fn safe_timestamp_conversion(timestamp_nanos: u64) -> DateTime<Utc> {
709 if timestamp_nanos <= i64::MAX as u64 {
710 DateTime::from_timestamp_nanos(timestamp_nanos as i64)
711 } else {
712 let seconds = timestamp_nanos / 1_000_000_000;
713 let nanoseconds = (timestamp_nanos % 1_000_000_000) as u32;
714
715 DateTime::from_timestamp(seconds as i64, nanoseconds).unwrap_or_else(|| {
716 tracing::warn!(
717 timestamp = timestamp_nanos,
718 seconds = seconds,
719 nanoseconds = nanoseconds,
720 "Failed to convert large timestamp, falling back to current time"
721 );
722 Utc::now()
723 })
724 }
725 }
726
727 fn span_kind_to_string(kind: i32) -> String {
729 SpanKind::try_from(kind)
730 .map(|sk| {
731 sk.as_str_name()
732 .strip_prefix("SPAN_KIND_")
733 .unwrap_or(sk.as_str_name())
734 })
735 .unwrap_or("UNSPECIFIED")
736 .to_string()
737 }
738
739 fn extract_input_output(attributes: &[Attribute]) -> (Value, Value) {
740 let mut input = Value::Null;
741 let mut output = Value::Null;
742
743 for attr in attributes {
744 if attr.key == SCOUTER_TRACING_INPUT {
745 if let Value::String(s) = &attr.value {
746 input = serde_json::from_str(s).unwrap_or_else(|e| {
747 tracing::warn!(
748 key = SCOUTER_TRACING_INPUT,
749 error = %e,
750 value = s,
751 "Failed to parse input attribute as JSON, falling back to string value."
752 );
753 Value::String(s.clone()) });
755 }
756 } else if attr.key == SCOUTER_TRACING_OUTPUT {
757 if let Value::String(s) = &attr.value {
758 output = serde_json::from_str(s)
759 .unwrap_or_else(|e| {
760 tracing::warn!(
761 key = SCOUTER_TRACING_OUTPUT,
762 error = %e,
763 value = s,
764 "Failed to parse output attribute as JSON, falling back to string value."
765 );
766 Value::String(s.clone()) });
768 }
769 }
770 }
771 (input, output)
772 }
773
774 fn get_service_name_from_resource(
775 resource: &Option<opentelemetry_proto::tonic::resource::v1::Resource>,
776 default: &str,
777 ) -> String {
778 resource
779 .as_ref()
780 .and_then(|r| r.attributes.iter().find(|attr| attr.key == SERVICE_NAME))
781 .and_then(|attr| {
782 attr.value.as_ref().and_then(|v| {
783 if let Some(
784 opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s),
785 ) = &v.value
786 {
787 Some(s.clone())
788 } else {
789 None
790 }
791 })
792 })
793 .unwrap_or_else(|| {
794 tracing::warn!(
795 "Service name not found in resource attributes, falling back to default: {}",
796 default
797 );
798 default.to_string()
799 })
800 }
801
802 pub fn get_trace_start_time_attribute(
805 attributes: &Vec<Attribute>,
806 start_time: &DateTime<Utc>,
807 ) -> DateTime<Utc> {
808 for attr in attributes {
809 if attr.key == TRACE_START_TIME_KEY {
810 if let Value::String(s) = &attr.value {
811 if let Ok(dt) = s.parse::<chrono::DateTime<chrono::Utc>>() {
812 return dt;
813 }
814 }
815 }
816 }
817
818 tracing::warn!(
819 "Trace start time attribute not found or invalid, falling back to span start_time"
820 );
821 *start_time
822 }
823
824 pub fn convert_to_baggage_records(
825 trace_id: &TraceId,
826 attributes: &Vec<Attribute>,
827 scope_name: &str,
828 ) -> Vec<TraceBaggageRecord> {
829 let baggage_kvs: Vec<(String, String)> = attributes
830 .iter()
831 .filter_map(|attr| {
832 if attr.key.starts_with(BAGGAGE_PREFIX) {
834 let clean_key = attr
835 .key
836 .strip_prefix(format!("{}.", BAGGAGE_PREFIX).as_str())
837 .map(|stripped| stripped.trim())
838 .unwrap_or(&attr.key)
839 .to_string();
840
841 let value_string = match &attr.value {
843 Value::String(s) => s.clone(),
844 Value::Number(n) => n.to_string(),
845 Value::Bool(b) => b.to_string(),
846 Value::Null => "null".to_string(),
847 Value::Array(_) | Value::Object(_) => {
848 serde_json::to_string(&attr.value)
850 .unwrap_or_else(|_| format!("{:?}", attr.value))
851 }
852 };
853
854 Some((clean_key, value_string))
855 } else {
856 None
857 }
858 })
859 .collect();
860
861 baggage_kvs
862 .into_iter()
863 .map(|(key, value)| TraceBaggageRecord {
864 created_at: Self::get_trace_start_time_attribute(attributes, &Utc::now()),
865 trace_id: *trace_id,
866 scope: scope_name.to_string(),
867 key,
868 value,
869 })
870 .collect()
871 }
872
873 pub fn to_records(self) -> Result<TraceRecords, RecordError> {
874 let resource_spans = self.request.resource_spans;
875
876 let estimated_capacity: usize = resource_spans
878 .iter()
879 .map(|rs| {
880 rs.scope_spans
881 .iter()
882 .map(|ss| ss.spans.len())
883 .sum::<usize>()
884 })
885 .sum();
886
887 let mut span_records: Vec<TraceSpanRecord> = Vec::with_capacity(estimated_capacity);
888 let mut baggage_records: Vec<TraceBaggageRecord> = Vec::new();
889 let mut tags: HashSet<TagRecord> = HashSet::new();
890
891 for resource_span in resource_spans {
892 let service_name =
894 Self::get_service_name_from_resource(&resource_span.resource, "unknown");
895 let resource_attributes = Attribute::from_resources(&resource_span.resource);
896
897 for scope_span in &resource_span.scope_spans {
898 let (scope_name, scope_version) = Self::get_scope_info(scope_span);
899
900 for span in &scope_span.spans {
901 let trace_id = TraceId::from_slice(span.trace_id.as_slice())?;
903 let span_id = SpanId::from_slice(span.span_id.as_slice())?;
904 let parent_span_id = if !span.parent_span_id.is_empty() {
905 Some(SpanId::from_slice(span.parent_span_id.as_slice())?)
906 } else {
907 None
908 };
909
910 let (start_time, end_time, duration_ms) =
911 Self::extract_time(span.start_time_unix_nano, span.end_time_unix_nano);
912
913 let (cleaned_attributes, span_baggage, span_tags) = Self::process_attributes(
914 &trace_id,
915 &span.attributes,
916 &scope_name,
917 start_time,
918 )?;
919
920 baggage_records.extend(span_baggage);
922 tags.extend(span_tags);
923
924 let (input, output) = Self::extract_input_output(&cleaned_attributes);
925
926 span_records.push(TraceSpanRecord {
928 created_at: start_time,
929 trace_id,
930 span_id,
931 parent_span_id,
932 flags: span.flags as i32,
933 trace_state: span.trace_state.clone(),
934 scope_name: scope_name.clone(),
935 scope_version: scope_version.clone(),
936 span_name: span.name.clone(),
937 span_kind: Self::span_kind_to_string(span.kind),
938 start_time,
939 end_time,
940 duration_ms,
941 status_code: span.status.as_ref().map(|s| s.code).unwrap_or(0),
942 status_message: span
943 .status
944 .as_ref()
945 .map(|s| s.message.clone())
946 .unwrap_or_default(),
947 attributes: cleaned_attributes,
948 events: Self::events_to_json_array(&span.events)?,
949 links: Self::links_to_json_array(&span.links)?,
950 label: None,
951 input,
952 output,
953 service_name: service_name.clone(),
954 resource_attributes: resource_attributes.clone(),
955 });
956 }
957 }
958 }
959
960 let tag_records: Vec<TagRecord> = tags.into_iter().collect();
961 Ok((span_records, baggage_records, tag_records))
962 }
963}
964
/// Key plus JSON value pair, exposed to Python as a pyo3 class.
///
/// `key` is readable from Python directly; `value` is converted to a Python
/// object on access by the `value` getter in the `#[pymethods]` block.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
pub struct Attribute {
    #[pyo3(get)]
    pub key: String,
    pub value: Value,
}
972
973#[pymethods]
974impl Attribute {
975 #[getter]
976 pub fn get_value<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyAny>, RecordError> {
977 Ok(json_to_pyobject_value(py, &self.value)?.bind(py).clone())
978 }
979
980 pub fn __str__(&self) -> String {
981 PyHelperFuncs::__str__(self)
982 }
983}
984
985impl Attribute {
986 pub fn from_otel_value(key: String, value: &AnyValue) -> Self {
987 Attribute {
988 key,
989 value: otel_value_to_serde_value(value),
990 }
991 }
992
993 fn from_resources(
994 resource: &Option<opentelemetry_proto::tonic::resource::v1::Resource>,
995 ) -> Vec<Attribute> {
996 match resource {
997 Some(res) => res
998 .attributes
999 .iter()
1000 .map(|kv| Attribute::from_otel_value(kv.key.clone(), kv.value.as_ref().unwrap()))
1001 .collect(),
1002 None => vec![],
1003 }
1004 }
1005}
1006
/// Event attached to a span, exposed to Python as a pyo3 class.
///
/// Built from OTel events by `TraceRecordExt::events_to_json_array`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct SpanEvent {
    #[pyo3(get)]
    pub timestamp: chrono::DateTime<Utc>,
    #[pyo3(get)]
    pub name: String,
    #[pyo3(get)]
    pub attributes: Vec<Attribute>,
    #[pyo3(get)]
    pub dropped_attributes_count: u32,
}

#[pymethods]
impl SpanEvent {
    /// String form of the event, produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1026
/// Link from a span to another span, exposed to Python as a pyo3 class.
///
/// Built from OTel links by `TraceRecordExt::links_to_json_array`, which
/// stores `trace_id` and `span_id` as hex strings.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct SpanLink {
    #[pyo3(get)]
    pub trace_id: String,
    #[pyo3(get)]
    pub span_id: String,
    #[pyo3(get)]
    pub trace_state: String,
    #[pyo3(get)]
    pub attributes: Vec<Attribute>,
    #[pyo3(get)]
    pub dropped_attributes_count: u32,
}

#[pymethods]
impl SpanLink {
    /// String form of the link, produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1048
/// String key/value tag, exposed to Python as a pyo3 class.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct Tag {
    #[pyo3(get)]
    pub key: String,
    #[pyo3(get)]
    pub value: String,
}

#[pymethods]
impl Tag {
    /// String form of the tag, produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1064
/// Tag attached to an entity identified by type and id, exposed to Python as a pyo3 class.
///
/// Derives `Eq` and `Hash`, so identical tags can be de-duplicated in a
/// `HashSet` (as `TraceServerRecord::to_records` does). With the `server`
/// feature enabled it can also be read from a database row via `sqlx::FromRow`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
#[pyclass]
#[cfg_attr(feature = "server", derive(sqlx::FromRow))]
pub struct TagRecord {
    #[pyo3(get)]
    pub entity_type: String,
    #[pyo3(get)]
    pub entity_id: String,
    #[pyo3(get)]
    pub key: String,
    #[pyo3(get)]
    pub value: String,
}

impl TagRecord {
    /// Builds a tag for a trace: `entity_type` is `"trace"` and `entity_id`
    /// is the trace id as a hex string.
    pub fn from_trace(trace_id: &TraceId, key: String, value: String) -> Self {
        Self {
            entity_type: "trace".to_string(),
            entity_id: trace_id.to_hex(),
            key,
            value,
        }
    }
}

#[pymethods]
impl TagRecord {
    /// String form of the tag record, produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1097
1098pub fn build_trace_spans(records: Vec<TraceSpanRecord>) -> Vec<sql::TraceSpan> {
1103 if records.is_empty() {
1104 return Vec::new();
1105 }
1106
1107 let mut groups: HashMap<String, Vec<&TraceSpanRecord>> = HashMap::new();
1109 for record in &records {
1110 groups
1111 .entry(record.trace_id.to_hex())
1112 .or_default()
1113 .push(record);
1114 }
1115
1116 let mut all_spans = Vec::with_capacity(records.len());
1117 let mut global_order: i32 = 0;
1118
1119 for spans in groups.values() {
1120 let mut children: HashMap<[u8; 8], Vec<usize>> = HashMap::new();
1122 let mut root_indices: Vec<usize> = Vec::new();
1123
1124 for (i, span) in spans.iter().enumerate() {
1125 if let Some(pid) = &span.parent_span_id {
1126 children.entry(*pid.as_bytes()).or_default().push(i);
1127 } else {
1128 root_indices.push(i);
1129 }
1130 }
1131
1132 root_indices.sort_by_key(|&i| spans[i].start_time);
1134
1135 let root_span_id_hex = if let Some(&first_root) = root_indices.first() {
1137 spans[first_root].span_id.to_hex()
1138 } else {
1139 spans[0].span_id.to_hex()
1141 };
1142
1143 let pre_dfs_len = all_spans.len();
1145 dfs_assign_records(
1146 &root_indices,
1147 spans,
1148 &children,
1149 &root_span_id_hex,
1150 &mut all_spans,
1151 &mut global_order,
1152 );
1153
1154 let visited: HashSet<[u8; 8]> = all_spans[pre_dfs_len..]
1157 .iter()
1158 .filter_map(|s| {
1159 let bytes = SpanId::hex_to_bytes(&s.span_id).ok()?;
1160 let arr: [u8; 8] = bytes.try_into().ok()?;
1161 Some(arr)
1162 })
1163 .collect();
1164
1165 for span in spans {
1166 if !visited.contains(span.span_id.as_bytes()) {
1167 let span_id_hex = span.span_id.to_hex();
1168 all_spans.push(record_to_trace_span(
1169 span,
1170 &span_id_hex,
1171 &root_span_id_hex,
1172 0,
1173 vec![span_id_hex.clone()],
1174 global_order,
1175 ));
1176 global_order += 1;
1177 }
1178 }
1179 }
1180
1181 all_spans
1182}
1183
1184fn dfs_assign_records(
1186 root_indices: &[usize],
1187 spans: &[&TraceSpanRecord],
1188 children: &HashMap<[u8; 8], Vec<usize>>,
1189 root_span_id_hex: &str,
1190 result: &mut Vec<sql::TraceSpan>,
1191 span_order: &mut i32,
1192) {
1193 let mut stack: Vec<(usize, i32, Vec<String>)> = Vec::new();
1195 let mut visited: HashSet<usize> = HashSet::new();
1196
1197 for &idx in root_indices.iter().rev() {
1199 stack.push((idx, 0, Vec::new()));
1200 }
1201
1202 while let Some((idx, depth, path_so_far)) = stack.pop() {
1203 if !visited.insert(idx) {
1204 continue; }
1206 let span = spans[idx];
1207 let span_id_hex = span.span_id.to_hex();
1208
1209 let mut path = path_so_far;
1210 path.push(span_id_hex.clone());
1211
1212 result.push(record_to_trace_span(
1213 span,
1214 &span_id_hex,
1215 root_span_id_hex,
1216 depth,
1217 path.clone(),
1218 *span_order,
1219 ));
1220 *span_order += 1;
1221
1222 if let Some(child_indices) = children.get(span.span_id.as_bytes()) {
1224 let mut sorted = child_indices.clone();
1225 sorted.sort_by_key(|&i| spans[i].start_time);
1226 for &ci in sorted.iter().rev() {
1227 stack.push((ci, depth + 1, path.clone()));
1228 }
1229 }
1230 }
1231}
1232
1233fn record_to_trace_span(
1234 record: &TraceSpanRecord,
1235 span_id_hex: &str,
1236 root_span_id_hex: &str,
1237 depth: i32,
1238 path: Vec<String>,
1239 span_order: i32,
1240) -> sql::TraceSpan {
1241 let input = match &record.input {
1242 Value::Null => None,
1243 v => Some(v.clone()),
1244 };
1245 let output = match &record.output {
1246 Value::Null => None,
1247 v => Some(v.clone()),
1248 };
1249
1250 sql::TraceSpan {
1251 trace_id: record.trace_id.to_hex(),
1252 span_id: span_id_hex.to_string(),
1253 parent_span_id: record.parent_span_id.as_ref().map(|id| id.to_hex()),
1254 span_name: record.span_name.clone(),
1255 span_kind: Some(record.span_kind.clone()),
1256 start_time: record.start_time,
1257 end_time: record.end_time,
1258 duration_ms: record.duration_ms,
1259 status_code: record.status_code,
1260 status_message: Some(record.status_message.clone()),
1261 attributes: record.attributes.clone(),
1262 events: record.events.clone(),
1263 links: record.links.clone(),
1264 depth,
1265 path,
1266 root_span_id: root_span_id_hex.to_string(),
1267 service_name: record.service_name.clone(),
1268 span_order,
1269 input,
1270 output,
1271 }
1272}
1273
/// Trace-level summary record: one entry of identifying, timing, status and
/// count fields for a single trace.
///
/// NOTE(review): how the counts and id lists are populated is decided by the
/// code that builds this record, which is outside this block — confirm there.
#[derive(Clone, Debug)]
pub struct TraceSummaryRecord {
    /// Identifier of the trace being summarized.
    pub trace_id: TraceId,
    /// Service name associated with the trace.
    pub service_name: String,
    /// Scope name — presumably the instrumentation scope; verify against the producer.
    pub scope_name: String,
    /// Scope version — presumably the instrumentation scope version; verify against the producer.
    pub scope_version: String,
    /// Root operation name — presumably the root span's name; TODO confirm.
    pub root_operation: String,
    /// Start timestamp of the trace, in UTC.
    pub start_time: DateTime<Utc>,
    /// End timestamp of the trace, in UTC; optional (presumably `None` while the end is unknown).
    pub end_time: Option<DateTime<Utc>>,
    /// Numeric status code; the meaning of each value is defined outside this block.
    pub status_code: i32,
    /// Status message accompanying `status_code`.
    pub status_message: String,
    /// Span count for the trace.
    pub span_count: i64,
    /// Error count for the trace — presumably the number of errored spans; TODO confirm.
    pub error_count: i64,
    /// Resource-level attributes attached to the trace.
    pub resource_attributes: Vec<Attribute>,
    /// Entity ids associated with the trace — presumably from `scouter.entity`; verify against the producer.
    pub entity_ids: Vec<String>,
    /// Queue ids associated with the trace — presumably from the `scouter.queue.*` attributes; verify.
    pub queue_ids: Vec<String>,
}
1297
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture builder for a `TraceSpanRecord`.
    ///
    /// The span starts at `start_ms` (Unix epoch milliseconds), ends 100 ms
    /// later and reports `duration_ms == 100`. Every field not set here takes
    /// its `Default` value.
    fn make_span_record(
        trace_id: [u8; 16],
        span_id: [u8; 8],
        parent_span_id: Option<[u8; 8]>,
        name: &str,
        start_ms: i64,
    ) -> TraceSpanRecord {
        let at_millis = |ms: i64| chrono::DateTime::from_timestamp_millis(ms).unwrap();

        TraceSpanRecord {
            trace_id: TraceId::from_bytes(trace_id),
            span_id: SpanId::from_bytes(span_id),
            parent_span_id: parent_span_id.map(SpanId::from_bytes),
            span_name: name.to_string(),
            start_time: at_millis(start_ms),
            end_time: at_millis(start_ms + 100),
            duration_ms: 100,
            ..Default::default()
        }
    }

    /// No input records produce no output spans.
    #[test]
    fn build_trace_spans_empty() {
        let spans = build_trace_spans(vec![]);
        assert!(spans.is_empty());
    }

    /// A root with a single child: checks depth, order, parent link, path
    /// length and that the child reports the root's span id as its root.
    #[test]
    fn build_trace_spans_simple_tree() {
        let trace = [0u8; 16];
        let parent_id = [1u8; 8];
        let kid_id = [2u8; 8];

        let spans = build_trace_spans(vec![
            make_span_record(trace, parent_id, None, "root", 1000),
            make_span_record(trace, kid_id, Some(parent_id), "child", 1050),
        ]);
        assert_eq!(spans.len(), 2);

        let root = spans
            .iter()
            .find(|span| span.span_name == "root")
            .unwrap();
        assert_eq!(root.depth, 0);
        assert_eq!(root.span_order, 0);
        assert!(root.parent_span_id.is_none());
        assert_eq!(root.path.len(), 1);

        let child = spans
            .iter()
            .find(|span| span.span_name == "child")
            .unwrap();
        assert_eq!(child.depth, 1);
        assert_eq!(child.span_order, 1);
        assert!(child.parent_span_id.is_some());
        assert_eq!(child.path.len(), 2);
        assert_eq!(child.root_span_id, root.span_id);
    }

    /// A span whose parent id matches no record in the input is still
    /// emitted, at depth 0.
    #[test]
    fn build_trace_spans_orphan_spans() {
        let absent_parent = [99u8; 8];
        let record = make_span_record([0u8; 16], [3u8; 8], Some(absent_parent), "orphan", 1000);

        let spans = build_trace_spans(vec![record]);
        assert_eq!(spans.len(), 1);

        let only = &spans[0];
        assert_eq!(only.span_name, "orphan");
        assert_eq!(only.depth, 0);
    }

    /// Records from two traces, interleaved in the input, all come out and
    /// keep their own trace ids.
    #[test]
    fn build_trace_spans_multiple_traces() {
        let first_trace = [1u8; 16];
        let second_trace = [2u8; 16];

        let spans = build_trace_spans(vec![
            make_span_record(first_trace, [10u8; 8], None, "trace1_root", 1000),
            make_span_record(second_trace, [20u8; 8], None, "trace2_root", 2000),
            make_span_record(first_trace, [11u8; 8], Some([10u8; 8]), "trace1_child", 1050),
        ]);
        assert_eq!(spans.len(), 3);

        let first_hex = TraceId::from_bytes(first_trace).to_hex();
        let in_first = spans.iter().filter(|span| span.trace_id == first_hex).count();
        assert_eq!(in_first, 2);

        let second_hex = TraceId::from_bytes(second_trace).to_hex();
        let in_second = spans.iter().filter(|span| span.trace_id == second_hex).count();
        assert_eq!(in_second, 1);
    }

    /// Three generations: the grandchild sits at depth 2 with a
    /// three-element path.
    #[test]
    fn build_trace_spans_deep_tree() {
        let trace = [0u8; 16];
        let top = [1u8; 8];
        let middle = [2u8; 8];
        let bottom = [3u8; 8];

        let spans = build_trace_spans(vec![
            make_span_record(trace, top, None, "root", 1000),
            make_span_record(trace, middle, Some(top), "child", 1050),
            make_span_record(trace, bottom, Some(middle), "grandchild", 1100),
        ]);
        assert_eq!(spans.len(), 3);

        let grandchild = spans
            .iter()
            .find(|span| span.span_name == "grandchild")
            .unwrap();
        assert_eq!(grandchild.depth, 2);
        assert_eq!(grandchild.path.len(), 3);
    }

    /// Two different traces reuse the same span id; both roots must still be
    /// present in the output.
    #[test]
    fn build_trace_spans_cross_group_collision() {
        let colliding_id = [42u8; 8];

        let spans = build_trace_spans(vec![
            make_span_record([1u8; 16], colliding_id, None, "trace1_root", 1000),
            make_span_record([2u8; 16], colliding_id, None, "trace2_root", 2000),
        ]);
        assert_eq!(spans.len(), 2);

        assert!(spans.iter().any(|span| span.span_name == "trace1_root"));
        assert!(spans.iter().any(|span| span.span_name == "trace2_root"));
    }

    /// A non-null `input` maps to `Some`, a `Value::Null` `output` maps to
    /// `None`.
    #[test]
    fn build_trace_spans_input_output_mapping() {
        let record = TraceSpanRecord {
            trace_id: TraceId::from_bytes([0u8; 16]),
            span_id: SpanId::from_bytes([1u8; 8]),
            parent_span_id: None,
            span_name: String::from("test"),
            input: serde_json::json!({"key": "value"}),
            output: Value::Null,
            ..Default::default()
        };

        let spans = build_trace_spans(vec![record]);
        assert_eq!(spans.len(), 1);

        let span = &spans[0];
        assert!(span.input.is_some());
        assert!(span.output.is_none());
    }

    /// A and B name each other as parent. Building must terminate and emit
    /// each span exactly once.
    #[test]
    fn build_trace_spans_cycle_does_not_loop() {
        let trace = [0u8; 16];
        let id_a = [1u8; 8];
        let id_b = [2u8; 8];

        let spans = build_trace_spans(vec![
            make_span_record(trace, id_a, Some(id_b), "A", 1000),
            make_span_record(trace, id_b, Some(id_a), "B", 1050),
        ]);

        assert_eq!(spans.len(), 2, "Both spans should appear exactly once");
    }
}