1pub mod sql;
2
3use crate::error::RecordError;
4use crate::otel_value_to_serde_value;
5use crate::PyHelperFuncs;
6use crate::{json_to_pyobject, json_to_pyobject_value};
7use chrono::DateTime;
8use chrono::Utc;
9use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
10use opentelemetry_proto::tonic::common::v1::AnyValue;
11use opentelemetry_proto::tonic::common::v1::KeyValue;
12use opentelemetry_proto::tonic::trace::v1::span::Event;
13use opentelemetry_proto::tonic::trace::v1::span::Link;
14use opentelemetry_proto::tonic::trace::v1::span::SpanKind;
15use pyo3::prelude::*;
16use pyo3::types::PyDict;
17use serde::{Deserialize, Serialize};
18use serde_json::Value;
19use std::collections::HashMap;
20use std::collections::HashSet;
21use std::fmt;
22
// Attribute keys describing the traced function (`function.*`).
// NOTE(review): not referenced in this part of the file; presumably written by the
// tracer side — confirm against the producer.
pub const FUNCTION_TYPE: &str = "function.type";
pub const FUNCTION_STREAMING: &str = "function.streaming";
pub const FUNCTION_NAME: &str = "function.name";
pub const FUNCTION_MODULE: &str = "function.module";
pub const FUNCTION_QUALNAME: &str = "function.qualname";
/// Span attribute whose string value is parsed as JSON into `TraceSpanRecord::input`.
pub const SCOUTER_TRACING_INPUT: &str = "scouter.tracing.input";
/// Span attribute whose string value is parsed as JSON into `TraceSpanRecord::output`.
pub const SCOUTER_TRACING_OUTPUT: &str = "scouter.tracing.output";
pub const SCOUTER_TRACING_LABEL: &str = "scouter.tracing.label";
/// Resource attribute read to determine the service name of a batch of spans.
pub const SERVICE_NAME: &str = "service.name";
/// Tag namespace (no trailing dot); prepended to tag keys when tags are stored as baggage.
pub const SCOUTER_TAG_PREFIX: &str = "scouter.tracing.tag";
/// Baggage namespace (no trailing dot); used by `convert_to_baggage_records`.
pub const BAGGAGE_PREFIX: &str = "baggage";
/// Attribute holding the trace start time, parsed with `str::parse::<DateTime<Utc>>`.
pub const TRACE_START_TIME_KEY: &str = "scouter.tracing.start_time";
pub const SCOUTER_SCOPE: &str = "scouter.scope";
/// Default scope name: `scouter.tracer.<crate version>` (resolved at compile time).
pub const SCOUTER_SCOPE_DEFAULT: &str = concat!("scouter.tracer.", env!("CARGO_PKG_VERSION"));
pub const SPAN_ERROR: &str = "span.error";
pub const EXCEPTION_TRACEBACK: &str = "exception.traceback";
pub const SCOUTER_EVAL_SCENARIO_ID_ATTR: &str = "scouter.eval.scenario_id";
pub const SCOUTER_QUEUE_RECORD: &str = "scouter.queue.record";
pub const SCOUTER_QUEUE_EVENT: &str = "scouter.queue.event";
pub const SCOUTER_ENTITY: &str = "scouter.entity";

/// Prefix (with trailing dot) of plain baggage attributes; stripped from the stored key.
pub const BAGGAGE_PATTERN: &str = "baggage.";
/// Prefix of tag attributes that were propagated through baggage.
/// `concat!` only accepts literals, so this must be kept in sync by hand with
/// `BAGGAGE_PREFIX` + "." + `SCOUTER_TAG_PREFIX` + ".".
pub const BAGGAGE_TAG_PATTERN: &str = concat!("baggage", ".", "scouter.tracing.tag", ".");
/// Prefix of tag attributes set directly on a span (`SCOUTER_TAG_PREFIX` + ".").
pub const TAG_PATTERN: &str = concat!("scouter.tracing.tag", ".");

/// `(cleaned attributes, baggage records, tag records)` as returned by
/// `TraceRecordExt::process_attributes`.
type SpanAttributes = (Vec<Attribute>, Vec<TraceBaggageRecord>, Vec<TagRecord>);
50
/// Serializable entity reference made of a uid, a type and a space.
/// NOTE(review): not used in this part of the file; presumably the payload of the
/// `scouter.entity` attribute — confirm.
#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq, Eq, Hash)]
pub struct ScouterEntityAttribute {
    pub uid: String,
    // `type` is a keyword, hence the raw identifier; serde still uses the name "type".
    pub r#type: String,
    pub space: String,
}
57
58#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
59pub struct TraceId([u8; 16]);
60
61impl TraceId {
62 pub fn from_hex(hex: &str) -> Result<Self, hex::FromHexError> {
63 let mut bytes = [0u8; 16];
64 hex::decode_to_slice(hex, &mut bytes)?;
65 Ok(Self(bytes))
66 }
67
68 pub fn from_bytes(bytes: [u8; 16]) -> Self {
69 Self(bytes)
70 }
71
72 pub fn from_slice(slice: &[u8]) -> Result<Self, RecordError> {
73 if slice.len() != 16 {
74 return Err(RecordError::SliceError(format!(
75 "Invalid trace_id length: expected 16 bytes, got {}",
76 slice.len()
77 )));
78 }
79 let mut bytes = [0u8; 16];
80 bytes.copy_from_slice(slice);
81 Ok(Self(bytes))
82 }
83
84 pub fn to_hex(&self) -> String {
85 hex::encode(self.0)
86 }
87
88 pub fn as_bytes(&self) -> &[u8; 16] {
89 &self.0
90 }
91
92 pub fn hex_to_bytes(hex: &str) -> Result<Vec<u8>, RecordError> {
93 let bytes = hex::decode(hex)?;
94 if bytes.len() == 16 {
96 Ok(bytes)
97 } else {
98 Err(RecordError::SliceError(format!(
99 "Invalid hex string length: expected 16 or 8 bytes, got {}",
100 bytes.len()
101 )))
102 }
103 }
104}
105
106impl fmt::Display for TraceId {
107 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108 write!(f, "{}", self.to_hex())
109 }
110}
111
112impl Serialize for TraceId {
113 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
114 where
115 S: serde::Serializer,
116 {
117 serializer.serialize_str(&self.to_hex())
118 }
119}
120
121impl<'de> Deserialize<'de> for TraceId {
122 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
123 where
124 D: serde::Deserializer<'de>,
125 {
126 let hex = String::deserialize(deserializer)?;
127 TraceId::from_hex(&hex).map_err(serde::de::Error::custom)
128 }
129}
130
/// Decodes a `TraceId` from a Postgres byte column; any length other than 16 is an error.
#[cfg(feature = "server")]
impl<'r> sqlx::Decode<'r, sqlx::Postgres> for TraceId {
    fn decode(value: sqlx::postgres::PgValueRef<'r>) -> Result<Self, sqlx::error::BoxDynError> {
        let raw = <&[u8] as sqlx::Decode<sqlx::Postgres>>::decode(value)?;
        let array: [u8; 16] = raw
            .try_into()
            .map_err(|_| "TraceId must be exactly 16 bytes")?;
        Ok(TraceId(array))
    }
}

/// `TraceId` uses the same Postgres type as `Vec<u8>`.
#[cfg(feature = "server")]
impl sqlx::Type<sqlx::Postgres> for TraceId {
    fn type_info() -> sqlx::postgres::PgTypeInfo {
        <Vec<u8> as sqlx::Type<sqlx::Postgres>>::type_info()
    }
}

/// Encodes the raw 16 bytes as a byte slice.
#[cfg(feature = "server")]
impl sqlx::Encode<'_, sqlx::Postgres> for TraceId {
    fn encode_by_ref(
        &self,
        buf: &mut sqlx::postgres::PgArgumentBuffer,
    ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
        let raw: &[u8] = self.0.as_slice();
        <&[u8] as sqlx::Encode<sqlx::Postgres>>::encode(raw, buf)
    }
}
160
161#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)]
162pub struct SpanId([u8; 8]);
163
164impl SpanId {
165 pub fn from_hex(hex: &str) -> Result<Self, hex::FromHexError> {
166 let mut bytes = [0u8; 8];
167 hex::decode_to_slice(hex, &mut bytes)?;
168 Ok(Self(bytes))
169 }
170
171 pub fn from_bytes(bytes: [u8; 8]) -> Self {
172 Self(bytes)
173 }
174
175 pub fn from_slice(slice: &[u8]) -> Result<Self, RecordError> {
176 if slice.len() != 8 {
177 return Err(RecordError::SliceError(format!(
178 "Invalid trace_id length: expected 8 bytes, got {}",
179 slice.len()
180 )));
181 }
182 let mut bytes = [0u8; 8];
183 bytes.copy_from_slice(slice);
184 Ok(Self(bytes))
185 }
186
187 pub fn to_hex(&self) -> String {
188 hex::encode(self.0)
189 }
190
191 pub fn as_bytes(&self) -> &[u8; 8] {
192 &self.0
193 }
194
195 pub fn hex_to_bytes(hex: &str) -> Result<Vec<u8>, RecordError> {
196 let bytes = hex::decode(hex)?;
197 if bytes.len() == 8 {
198 Ok(bytes)
199 } else {
200 Err(RecordError::SliceError(format!(
201 "Invalid hex string length: expected 16 or 8 bytes, got {}",
202 bytes.len()
203 )))
204 }
205 }
206}
207
208impl fmt::Display for SpanId {
209 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210 write!(f, "{}", self.to_hex())
211 }
212}
213
214impl Serialize for SpanId {
215 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
216 where
217 S: serde::Serializer,
218 {
219 serializer.serialize_str(&self.to_hex())
220 }
221}
222
223impl<'de> Deserialize<'de> for SpanId {
224 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
225 where
226 D: serde::Deserializer<'de>,
227 {
228 let hex = String::deserialize(deserializer)?;
229 SpanId::from_hex(&hex).map_err(serde::de::Error::custom)
230 }
231}
232
/// Decodes a `SpanId` from a Postgres byte column; any length other than 8 is an error.
#[cfg(feature = "server")]
impl<'r> sqlx::Decode<'r, sqlx::Postgres> for SpanId {
    fn decode(value: sqlx::postgres::PgValueRef<'r>) -> Result<Self, sqlx::error::BoxDynError> {
        let raw = <&[u8] as sqlx::Decode<sqlx::Postgres>>::decode(value)?;
        let array: [u8; 8] = raw
            .try_into()
            .map_err(|_| "SpanId must be exactly 8 bytes")?;
        Ok(SpanId(array))
    }
}

/// `SpanId` uses the same Postgres type as `Vec<u8>`.
#[cfg(feature = "server")]
impl sqlx::Type<sqlx::Postgres> for SpanId {
    fn type_info() -> sqlx::postgres::PgTypeInfo {
        <Vec<u8> as sqlx::Type<sqlx::Postgres>>::type_info()
    }
}

/// Encodes the raw 8 bytes as a byte slice.
#[cfg(feature = "server")]
impl sqlx::Encode<'_, sqlx::Postgres> for SpanId {
    fn encode_by_ref(
        &self,
        buf: &mut sqlx::postgres::PgArgumentBuffer,
    ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
        let raw: &[u8] = self.0.as_slice();
        <&[u8] as sqlx::Encode<sqlx::Postgres>>::encode(raw, buf)
    }
}
262
/// Trace-level record exposed to Python.
///
/// `trace_id` and `root_span_id` are not exposed directly; Python sees them as hex
/// strings through the getters below.
/// NOTE(review): not built in this part of the file; field meanings below are inferred
/// from names where marked — confirm against the producer.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
pub struct TraceRecord {
    #[pyo3(get)]
    pub created_at: DateTime<Utc>,
    pub trace_id: TraceId,
    #[pyo3(get)]
    pub service_name: String,
    #[pyo3(get)]
    pub scope_name: String,
    #[pyo3(get)]
    pub scope_version: Option<String>,
    #[pyo3(get)]
    pub trace_state: String,
    #[pyo3(get)]
    pub start_time: chrono::DateTime<Utc>,
    #[pyo3(get)]
    pub end_time: chrono::DateTime<Utc>,
    // presumably end - start in milliseconds, like TraceSpanRecord — confirm
    #[pyo3(get)]
    pub duration_ms: i64,
    #[pyo3(get)]
    pub status_code: i32,
    #[pyo3(get)]
    pub status_message: String,
    pub root_span_id: SpanId,
    #[pyo3(get)]
    pub span_count: i32,
    #[pyo3(get)]
    pub tags: Vec<Tag>,
    #[pyo3(get)]
    pub process_attributes: Vec<Attribute>,
}

#[pymethods]
impl TraceRecord {
    /// String representation produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }

    /// Trace id as lowercase hex (Python attribute `trace_id`).
    #[getter]
    pub fn get_trace_id(&self) -> String {
        self.trace_id.to_hex()
    }

    /// Root span id as lowercase hex (Python attribute `root_span_id`).
    #[getter]
    pub fn get_root_span_id(&self) -> String {
        self.root_span_id.to_hex()
    }
}
/// One span as produced by `TraceServerRecord::to_records`.
///
/// Ids are exposed to Python as hex strings and `input`/`output` as dicts via the getters
/// in the `#[pymethods]` block.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
pub struct TraceSpanRecord {
    /// Set to the span's start time by `to_records`.
    #[pyo3(get)]
    pub created_at: chrono::DateTime<Utc>,

    pub trace_id: TraceId,
    pub span_id: SpanId,
    /// `None` when the OTLP `parent_span_id` is empty (a root span).
    pub parent_span_id: Option<SpanId>,

    #[pyo3(get)]
    pub flags: i32,
    #[pyo3(get)]
    pub trace_state: String,

    /// Instrumentation scope; `"unknown"` when missing or empty.
    #[pyo3(get)]
    pub scope_name: String,
    #[pyo3(get)]
    pub scope_version: Option<String>,

    #[pyo3(get)]
    pub span_name: String,
    /// OTLP span kind with the `SPAN_KIND_` prefix removed (e.g. `"SERVER"`).
    #[pyo3(get)]
    pub span_kind: String,

    #[pyo3(get)]
    pub start_time: chrono::DateTime<Utc>,
    #[pyo3(get)]
    pub end_time: chrono::DateTime<Utc>,
    /// Whole milliseconds between start and end; 0 if end precedes start.
    #[pyo3(get)]
    pub duration_ms: i64,

    #[pyo3(get)]
    pub status_code: i32,
    #[pyo3(get)]
    pub status_message: String,

    /// Span attributes with baggage entries removed and tag prefixes stripped.
    #[pyo3(get)]
    pub attributes: Vec<Attribute>,
    #[pyo3(get)]
    pub events: Vec<SpanEvent>,
    #[pyo3(get)]
    pub links: Vec<SpanLink>,

    /// Always `None` when produced by `to_records`.
    #[pyo3(get)]
    pub label: Option<String>,
    /// JSON parsed from the `scouter.tracing.input` attribute (`Null` if absent).
    pub input: Value,
    /// JSON parsed from the `scouter.tracing.output` attribute (`Null` if absent).
    pub output: Value,

    #[pyo3(get)]
    pub service_name: String,
    #[pyo3(get)]
    pub resource_attributes: Vec<Attribute>,
}
376
377#[pymethods]
378impl TraceSpanRecord {
379 #[getter]
380 pub fn get_trace_id(&self) -> String {
381 self.trace_id.to_hex()
382 }
383
384 #[getter]
385 pub fn get_span_id(&self) -> String {
386 self.span_id.to_hex()
387 }
388
389 #[getter]
390 pub fn get_parent_span_id(&self) -> Option<String> {
391 self.parent_span_id.as_ref().map(|id| id.to_hex())
392 }
393
394 #[getter]
395 pub fn get_input<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyDict>, RecordError> {
396 let dict = PyDict::new(py);
397 match &self.input {
398 Value::Null => {}
399 _ => {
400 json_to_pyobject(py, &self.input, &dict)?;
401 }
402 }
403 Ok(dict)
404 }
405
406 #[getter]
407 pub fn get_output<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyDict>, RecordError> {
408 let dict = PyDict::new(py);
409 match &self.output {
410 Value::Null => {}
411 _ => {
412 json_to_pyobject(py, &self.output, &dict)?;
413 }
414 }
415 Ok(dict)
416 }
417
418 pub fn __str__(&self) -> String {
419 PyHelperFuncs::__str__(self)
421 }
422}
423
/// A baggage key/value pair attached to a trace.
///
/// Produced by `process_attributes` (keys with the `baggage.` prefix removed, or tag keys
/// re-prefixed with `scouter.tracing.tag.`) and by `convert_to_baggage_records`.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
#[cfg_attr(feature = "server", derive(sqlx::FromRow))]
pub struct TraceBaggageRecord {
    #[pyo3(get)]
    pub created_at: DateTime<Utc>,
    pub trace_id: TraceId,
    /// Instrumentation scope name of the span the baggage came from.
    #[pyo3(get)]
    pub scope: String,
    #[pyo3(get)]
    pub key: String,
    /// Value rendered as a string (JSON text for arrays/objects, `"null"` when absent).
    #[pyo3(get)]
    pub value: String,
}

#[pymethods]
impl TraceBaggageRecord {
    /// String representation produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }

    /// Trace id as lowercase hex (Python attribute `trace_id`).
    #[getter]
    pub fn get_trace_id(&self) -> String {
        self.trace_id.to_hex()
    }
}
450
/// Output of `TraceServerRecord::to_records`: `(spans, baggage, deduplicated tags)`.
pub type TraceRecords = (
    Vec<TraceSpanRecord>,
    Vec<TraceBaggageRecord>,
    Vec<TagRecord>,
);
456
457pub trait TraceRecordExt {
458 fn keyvalue_to_json_array<T: Serialize>(attributes: &Vec<T>) -> Result<Value, RecordError> {
459 Ok(serde_json::to_value(attributes).unwrap_or(Value::Array(vec![])))
460 }
461
462 fn process_attributes(
463 trace_id: &TraceId,
464 span_attributes: &[KeyValue],
465 scope: &str,
466 created_at: DateTime<Utc>,
467 ) -> Result<SpanAttributes, RecordError> {
468 let mut cleaned_attributes = Vec::with_capacity(span_attributes.len());
469 let mut baggage_records = Vec::new();
470 let mut tags = Vec::new();
471 let scope_owned = scope.to_string();
472
473 for kv in span_attributes {
474 let key = &kv.key;
475
476 if let Some(tag_key) = key.strip_prefix(BAGGAGE_TAG_PATTERN) {
478 if !tag_key.is_empty() {
479 let string_value = match &kv.value {
481 Some(v) => Self::otel_value_to_string(v),
482 None => "null".to_string(),
483 };
484
485 tags.push(TagRecord::from_trace(
487 trace_id,
488 tag_key.to_string(),
489 string_value.clone(),
490 ));
491
492 cleaned_attributes.push(Attribute {
494 key: tag_key.to_string(),
495 value: Value::String(string_value.clone()),
496 });
497
498 baggage_records.push(TraceBaggageRecord {
500 created_at,
501 trace_id: *trace_id,
502 scope: scope_owned.clone(),
503 key: format!("{}.{}", SCOUTER_TAG_PREFIX, tag_key),
504 value: string_value,
505 });
506 } else {
507 tracing::warn!(
508 attribute_key = %key,
509 "Skipping baggage tag with empty key after prefix removal"
510 );
511 }
512 }
513 else if let Some(tag_key) = key.strip_prefix(TAG_PATTERN) {
515 if !tag_key.is_empty() {
517 let string_value = match &kv.value {
518 Some(v) => Self::otel_value_to_string(v),
519 None => "null".to_string(),
520 };
521
522 tags.push(TagRecord::from_trace(
523 trace_id,
524 tag_key.to_string(),
525 string_value.clone(),
526 ));
527
528 cleaned_attributes.push(Attribute {
529 key: tag_key.to_string(),
530 value: Value::String(string_value.clone()),
531 });
532 } else {
533 tracing::warn!(
534 attribute_key = %key,
535 "Skipping tag with empty key after prefix removal"
536 );
537 }
538 }
539 else if key.starts_with(BAGGAGE_PATTERN) {
541 let clean_key = key
542 .strip_prefix(BAGGAGE_PATTERN)
543 .unwrap_or(key)
544 .trim()
545 .to_string();
546
547 let string_value = match &kv.value {
548 Some(v) => Self::otel_value_to_string(v),
549 None => "null".to_string(),
550 };
551
552 baggage_records.push(TraceBaggageRecord {
553 created_at,
554 trace_id: *trace_id,
555 scope: scope_owned.clone(),
556 key: clean_key,
557 value: string_value,
558 });
559 }
560 else {
562 let value = match &kv.value {
563 Some(v) => otel_value_to_serde_value(v),
564 None => Value::Null,
565 };
566
567 cleaned_attributes.push(Attribute {
568 key: key.clone(),
569 value,
570 });
571 }
572 }
573
574 Ok((cleaned_attributes, baggage_records, tags))
575 }
576
577 fn otel_value_to_string(value: &AnyValue) -> String {
578 match &value.value {
579 Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) => {
580 s.clone()
581 }
582 Some(opentelemetry_proto::tonic::common::v1::any_value::Value::IntValue(i)) => {
583 i.to_string()
584 }
585 Some(opentelemetry_proto::tonic::common::v1::any_value::Value::DoubleValue(d)) => {
586 d.to_string()
587 }
588 Some(opentelemetry_proto::tonic::common::v1::any_value::Value::BoolValue(b)) => {
589 b.to_string()
590 }
591 Some(opentelemetry_proto::tonic::common::v1::any_value::Value::ArrayValue(_))
592 | Some(opentelemetry_proto::tonic::common::v1::any_value::Value::KvlistValue(_)) => {
593 let serde_val = otel_value_to_serde_value(value);
594 serde_json::to_string(&serde_val).unwrap_or_else(|_| format!("{:?}", value))
595 }
596 _ => "null".to_string(),
597 }
598 }
599
600 fn attributes_to_json_array(attributes: &[KeyValue]) -> Result<Vec<Attribute>, RecordError> {
601 attributes
602 .iter()
603 .map(|kv| {
604 let value = match &kv.value {
605 Some(v) => otel_value_to_serde_value(v),
606 None => Value::Null,
607 };
608
609 Ok(Attribute {
610 key: kv.key.clone(),
611 value,
612 })
613 })
614 .collect()
615 }
616
617 fn events_to_json_array(attributes: &[Event]) -> Result<Vec<SpanEvent>, RecordError> {
618 attributes
619 .iter()
620 .map(|kv| {
621 let attributes = Self::attributes_to_json_array(&kv.attributes)?;
622 Ok(SpanEvent {
623 name: kv.name.clone(),
624 timestamp: DateTime::<Utc>::from_timestamp_nanos(kv.time_unix_nano as i64),
625 attributes,
626 dropped_attributes_count: kv.dropped_attributes_count,
627 })
628 })
629 .collect()
630 }
631
632 fn links_to_json_array(attributes: &[Link]) -> Result<Vec<SpanLink>, RecordError> {
633 attributes
634 .iter()
635 .map(|kv| {
636 let attributes = Self::attributes_to_json_array(&kv.attributes)?;
637 Ok(SpanLink {
638 trace_id: hex::encode(&kv.trace_id),
639 span_id: hex::encode(&kv.span_id),
640 trace_state: kv.trace_state.clone(),
641 attributes,
642 dropped_attributes_count: kv.dropped_attributes_count,
643 })
644 })
645 .collect()
646 }
647}
648
/// Wrapper around an incoming OTLP `ExportTraceServiceRequest`; see `to_records`.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct TraceServerRecord {
    pub request: ExportTraceServiceRequest,
}

// Uses the default conversion helpers unchanged.
impl TraceRecordExt for TraceServerRecord {}
655
656impl TraceServerRecord {
657 fn get_scope_info(
659 scope_span: &opentelemetry_proto::tonic::trace::v1::ScopeSpans,
660 ) -> (String, Option<String>) {
661 let scope_name = scope_span
662 .scope
663 .as_ref()
664 .map(|s| s.name.clone())
665 .filter(|n| !n.is_empty())
666 .unwrap_or_else(|| "unknown".to_string());
667
668 let scope_version = scope_span.scope.as_ref().and_then(|s| {
669 if s.version.is_empty() {
670 None
671 } else {
672 Some(s.version.clone())
673 }
674 });
675
676 (scope_name, scope_version)
677 }
678
679 fn extract_time(start_time: u64, end_time: u64) -> (DateTime<Utc>, DateTime<Utc>, i64) {
688 let start_dt = Self::safe_timestamp_conversion(start_time);
690 let end_dt = Self::safe_timestamp_conversion(end_time);
691
692 let duration_ms = if end_time >= start_time {
694 let duration_nanos = end_time.saturating_sub(start_time);
695 (duration_nanos / 1_000_000).min(i64::MAX as u64) as i64
696 } else {
697 tracing::warn!(
698 start_time = start_time,
699 end_time = end_time,
700 "Invalid timestamp order detected in trace span"
701 );
702 0
703 };
704
705 (start_dt, end_dt, duration_ms)
706 }
707
708 fn safe_timestamp_conversion(timestamp_nanos: u64) -> DateTime<Utc> {
710 if timestamp_nanos <= i64::MAX as u64 {
711 DateTime::from_timestamp_nanos(timestamp_nanos as i64)
712 } else {
713 let seconds = timestamp_nanos / 1_000_000_000;
714 let nanoseconds = (timestamp_nanos % 1_000_000_000) as u32;
715
716 DateTime::from_timestamp(seconds as i64, nanoseconds).unwrap_or_else(|| {
717 tracing::warn!(
718 timestamp = timestamp_nanos,
719 seconds = seconds,
720 nanoseconds = nanoseconds,
721 "Failed to convert large timestamp, falling back to current time"
722 );
723 Utc::now()
724 })
725 }
726 }
727
728 fn span_kind_to_string(kind: i32) -> String {
730 SpanKind::try_from(kind)
731 .map(|sk| {
732 sk.as_str_name()
733 .strip_prefix("SPAN_KIND_")
734 .unwrap_or(sk.as_str_name())
735 })
736 .unwrap_or("UNSPECIFIED")
737 .to_string()
738 }
739
740 fn extract_input_output(attributes: &[Attribute]) -> (Value, Value) {
741 let mut input = Value::Null;
742 let mut output = Value::Null;
743
744 for attr in attributes {
745 if attr.key == SCOUTER_TRACING_INPUT {
746 if let Value::String(s) = &attr.value {
747 input = serde_json::from_str(s).unwrap_or_else(|e| {
748 tracing::warn!(
749 key = SCOUTER_TRACING_INPUT,
750 error = %e,
751 value = s,
752 "Failed to parse input attribute as JSON, falling back to string value."
753 );
754 Value::String(s.clone()) });
756 }
757 } else if attr.key == SCOUTER_TRACING_OUTPUT {
758 if let Value::String(s) = &attr.value {
759 output = serde_json::from_str(s)
760 .unwrap_or_else(|e| {
761 tracing::warn!(
762 key = SCOUTER_TRACING_OUTPUT,
763 error = %e,
764 value = s,
765 "Failed to parse output attribute as JSON, falling back to string value."
766 );
767 Value::String(s.clone()) });
769 }
770 }
771 }
772 (input, output)
773 }
774
775 fn get_service_name_from_resource(
776 resource: &Option<opentelemetry_proto::tonic::resource::v1::Resource>,
777 default: &str,
778 ) -> String {
779 resource
780 .as_ref()
781 .and_then(|r| r.attributes.iter().find(|attr| attr.key == SERVICE_NAME))
782 .and_then(|attr| {
783 attr.value.as_ref().and_then(|v| {
784 if let Some(
785 opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s),
786 ) = &v.value
787 {
788 Some(s.clone())
789 } else {
790 None
791 }
792 })
793 })
794 .unwrap_or_else(|| {
795 tracing::warn!(
796 "Service name not found in resource attributes, falling back to default: {}",
797 default
798 );
799 default.to_string()
800 })
801 }
802
803 pub fn get_trace_start_time_attribute(
806 attributes: &Vec<Attribute>,
807 start_time: &DateTime<Utc>,
808 ) -> DateTime<Utc> {
809 for attr in attributes {
810 if attr.key == TRACE_START_TIME_KEY {
811 if let Value::String(s) = &attr.value {
812 if let Ok(dt) = s.parse::<chrono::DateTime<chrono::Utc>>() {
813 return dt;
814 }
815 }
816 }
817 }
818
819 tracing::warn!(
820 "Trace start time attribute not found or invalid, falling back to span start_time"
821 );
822 *start_time
823 }
824
825 pub fn convert_to_baggage_records(
826 trace_id: &TraceId,
827 attributes: &Vec<Attribute>,
828 scope_name: &str,
829 ) -> Vec<TraceBaggageRecord> {
830 let baggage_kvs: Vec<(String, String)> = attributes
831 .iter()
832 .filter_map(|attr| {
833 if attr.key.starts_with(BAGGAGE_PREFIX) {
835 let clean_key = attr
836 .key
837 .strip_prefix(format!("{}.", BAGGAGE_PREFIX).as_str())
838 .map(|stripped| stripped.trim())
839 .unwrap_or(&attr.key)
840 .to_string();
841
842 let value_string = match &attr.value {
844 Value::String(s) => s.clone(),
845 Value::Number(n) => n.to_string(),
846 Value::Bool(b) => b.to_string(),
847 Value::Null => "null".to_string(),
848 Value::Array(_) | Value::Object(_) => {
849 serde_json::to_string(&attr.value)
851 .unwrap_or_else(|_| format!("{:?}", attr.value))
852 }
853 };
854
855 Some((clean_key, value_string))
856 } else {
857 None
858 }
859 })
860 .collect();
861
862 baggage_kvs
863 .into_iter()
864 .map(|(key, value)| TraceBaggageRecord {
865 created_at: Self::get_trace_start_time_attribute(attributes, &Utc::now()),
866 trace_id: *trace_id,
867 scope: scope_name.to_string(),
868 key,
869 value,
870 })
871 .collect()
872 }
873
874 pub fn to_records(self) -> Result<TraceRecords, RecordError> {
875 let resource_spans = self.request.resource_spans;
876
877 let estimated_capacity: usize = resource_spans
879 .iter()
880 .map(|rs| {
881 rs.scope_spans
882 .iter()
883 .map(|ss| ss.spans.len())
884 .sum::<usize>()
885 })
886 .sum();
887
888 let mut span_records: Vec<TraceSpanRecord> = Vec::with_capacity(estimated_capacity);
889 let mut baggage_records: Vec<TraceBaggageRecord> = Vec::new();
890 let mut tags: HashSet<TagRecord> = HashSet::new();
891
892 for resource_span in resource_spans {
893 let service_name =
895 Self::get_service_name_from_resource(&resource_span.resource, "unknown");
896 let resource_attributes = Attribute::from_resources(&resource_span.resource);
897
898 for scope_span in &resource_span.scope_spans {
899 let (scope_name, scope_version) = Self::get_scope_info(scope_span);
900
901 for span in &scope_span.spans {
902 let trace_id = TraceId::from_slice(span.trace_id.as_slice())?;
904 let span_id = SpanId::from_slice(span.span_id.as_slice())?;
905 let parent_span_id = if !span.parent_span_id.is_empty() {
906 Some(SpanId::from_slice(span.parent_span_id.as_slice())?)
907 } else {
908 None
909 };
910
911 let (start_time, end_time, duration_ms) =
912 Self::extract_time(span.start_time_unix_nano, span.end_time_unix_nano);
913
914 let (cleaned_attributes, span_baggage, span_tags) = Self::process_attributes(
915 &trace_id,
916 &span.attributes,
917 &scope_name,
918 start_time,
919 )?;
920
921 baggage_records.extend(span_baggage);
923 tags.extend(span_tags);
924
925 let (input, output) = Self::extract_input_output(&cleaned_attributes);
926
927 span_records.push(TraceSpanRecord {
929 created_at: start_time,
930 trace_id,
931 span_id,
932 parent_span_id,
933 flags: span.flags as i32,
934 trace_state: span.trace_state.clone(),
935 scope_name: scope_name.clone(),
936 scope_version: scope_version.clone(),
937 span_name: span.name.clone(),
938 span_kind: Self::span_kind_to_string(span.kind),
939 start_time,
940 end_time,
941 duration_ms,
942 status_code: span.status.as_ref().map(|s| s.code).unwrap_or(0),
943 status_message: span
944 .status
945 .as_ref()
946 .map(|s| s.message.clone())
947 .unwrap_or_default(),
948 attributes: cleaned_attributes,
949 events: Self::events_to_json_array(&span.events)?,
950 links: Self::links_to_json_array(&span.links)?,
951 label: None,
952 input,
953 output,
954 service_name: service_name.clone(),
955 resource_attributes: resource_attributes.clone(),
956 });
957 }
958 }
959 }
960
961 let tag_records: Vec<TagRecord> = tags.into_iter().collect();
962 Ok((span_records, baggage_records, tag_records))
963 }
964}
965
/// A key with a JSON value; the value is exposed to Python as a native object.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
pub struct Attribute {
    #[pyo3(get)]
    pub key: String,
    pub value: Value,
}

#[pymethods]
impl Attribute {
    /// Converts the JSON value into a Python object (Python attribute `value`).
    #[getter]
    pub fn get_value<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyAny>, RecordError> {
        Ok(json_to_pyobject_value(py, &self.value)?.bind(py).clone())
    }

    /// String representation produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
985
986impl Attribute {
987 pub fn from_otel_value(key: String, value: &AnyValue) -> Self {
988 Attribute {
989 key,
990 value: otel_value_to_serde_value(value),
991 }
992 }
993
994 fn from_resources(
995 resource: &Option<opentelemetry_proto::tonic::resource::v1::Resource>,
996 ) -> Vec<Attribute> {
997 match resource {
998 Some(res) => res
999 .attributes
1000 .iter()
1001 .map(|kv| Attribute::from_otel_value(kv.key.clone(), kv.value.as_ref().unwrap()))
1002 .collect(),
1003 None => vec![],
1004 }
1005 }
1006}
1007
/// A span event converted from OTLP (see `events_to_json_array`).
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct SpanEvent {
    /// Event time converted from OTLP `time_unix_nano`.
    #[pyo3(get)]
    pub timestamp: chrono::DateTime<Utc>,
    #[pyo3(get)]
    pub name: String,
    #[pyo3(get)]
    pub attributes: Vec<Attribute>,
    #[pyo3(get)]
    pub dropped_attributes_count: u32,
}

#[pymethods]
impl SpanEvent {
    /// String representation produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1027
/// A span link converted from OTLP (see `links_to_json_array`).
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct SpanLink {
    /// Linked trace id, hex-encoded from the raw OTLP bytes (length not validated).
    #[pyo3(get)]
    pub trace_id: String,
    /// Linked span id, hex-encoded from the raw OTLP bytes (length not validated).
    #[pyo3(get)]
    pub span_id: String,
    #[pyo3(get)]
    pub trace_state: String,
    #[pyo3(get)]
    pub attributes: Vec<Attribute>,
    #[pyo3(get)]
    pub dropped_attributes_count: u32,
}

#[pymethods]
impl SpanLink {
    /// String representation produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1049
/// A simple string key/value tag (used by `TraceRecord::tags`).
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct Tag {
    #[pyo3(get)]
    pub key: String,
    #[pyo3(get)]
    pub value: String,
}

#[pymethods]
impl Tag {
    /// String representation produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1065
/// A tag attached to an entity; hashable so `to_records` can deduplicate tags.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
#[pyclass]
#[cfg_attr(feature = "server", derive(sqlx::FromRow))]
pub struct TagRecord {
    /// Entity kind; `"trace"` for records built by `from_trace`.
    #[pyo3(get)]
    pub entity_type: String,
    /// Entity identifier; the hex trace id for records built by `from_trace`.
    #[pyo3(get)]
    pub entity_id: String,
    #[pyo3(get)]
    pub key: String,
    #[pyo3(get)]
    pub value: String,
}

impl TagRecord {
    /// Creates a tag whose entity is the given trace (type `"trace"`, id = hex trace id).
    pub fn from_trace(trace_id: &TraceId, key: String, value: String) -> Self {
        Self {
            entity_type: "trace".to_string(),
            entity_id: trace_id.to_hex(),
            key,
            value,
        }
    }
}

#[pymethods]
impl TagRecord {
    /// String representation produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
1098
1099pub fn build_trace_spans(records: Vec<TraceSpanRecord>) -> Vec<sql::TraceSpan> {
1104 if records.is_empty() {
1105 return Vec::new();
1106 }
1107
1108 let mut groups: HashMap<String, Vec<&TraceSpanRecord>> = HashMap::new();
1110 for record in &records {
1111 groups
1112 .entry(record.trace_id.to_hex())
1113 .or_default()
1114 .push(record);
1115 }
1116
1117 let mut all_spans = Vec::with_capacity(records.len());
1118 let mut global_order: i32 = 0;
1119
1120 for spans in groups.values() {
1121 let mut children: HashMap<[u8; 8], Vec<usize>> = HashMap::new();
1123 let mut root_indices: Vec<usize> = Vec::new();
1124
1125 for (i, span) in spans.iter().enumerate() {
1126 if let Some(pid) = &span.parent_span_id {
1127 children.entry(*pid.as_bytes()).or_default().push(i);
1128 } else {
1129 root_indices.push(i);
1130 }
1131 }
1132
1133 root_indices.sort_by_key(|&i| spans[i].start_time);
1135
1136 let root_span_id_hex = if let Some(&first_root) = root_indices.first() {
1138 spans[first_root].span_id.to_hex()
1139 } else {
1140 spans[0].span_id.to_hex()
1142 };
1143
1144 let pre_dfs_len = all_spans.len();
1146 dfs_assign_records(
1147 &root_indices,
1148 spans,
1149 &children,
1150 &root_span_id_hex,
1151 &mut all_spans,
1152 &mut global_order,
1153 );
1154
1155 let visited: HashSet<[u8; 8]> = all_spans[pre_dfs_len..]
1158 .iter()
1159 .filter_map(|s| {
1160 let bytes = SpanId::hex_to_bytes(&s.span_id).ok()?;
1161 let arr: [u8; 8] = bytes.try_into().ok()?;
1162 Some(arr)
1163 })
1164 .collect();
1165
1166 for span in spans {
1167 if !visited.contains(span.span_id.as_bytes()) {
1168 let span_id_hex = span.span_id.to_hex();
1169 all_spans.push(record_to_trace_span(
1170 span,
1171 &span_id_hex,
1172 &root_span_id_hex,
1173 0,
1174 vec![span_id_hex.clone()],
1175 global_order,
1176 ));
1177 global_order += 1;
1178 }
1179 }
1180 }
1181
1182 all_spans
1183}
1184
1185fn dfs_assign_records(
1187 root_indices: &[usize],
1188 spans: &[&TraceSpanRecord],
1189 children: &HashMap<[u8; 8], Vec<usize>>,
1190 root_span_id_hex: &str,
1191 result: &mut Vec<sql::TraceSpan>,
1192 span_order: &mut i32,
1193) {
1194 let mut stack: Vec<(usize, i32, Vec<String>)> = Vec::new();
1196 let mut visited: HashSet<usize> = HashSet::new();
1197
1198 for &idx in root_indices.iter().rev() {
1200 stack.push((idx, 0, Vec::new()));
1201 }
1202
1203 while let Some((idx, depth, path_so_far)) = stack.pop() {
1204 if !visited.insert(idx) {
1205 continue; }
1207 let span = spans[idx];
1208 let span_id_hex = span.span_id.to_hex();
1209
1210 let mut path = path_so_far;
1211 path.push(span_id_hex.clone());
1212
1213 result.push(record_to_trace_span(
1214 span,
1215 &span_id_hex,
1216 root_span_id_hex,
1217 depth,
1218 path.clone(),
1219 *span_order,
1220 ));
1221 *span_order += 1;
1222
1223 if let Some(child_indices) = children.get(span.span_id.as_bytes()) {
1225 let mut sorted = child_indices.clone();
1226 sorted.sort_by_key(|&i| spans[i].start_time);
1227 for &ci in sorted.iter().rev() {
1228 stack.push((ci, depth + 1, path.clone()));
1229 }
1230 }
1231 }
1232}
1233
1234fn record_to_trace_span(
1235 record: &TraceSpanRecord,
1236 span_id_hex: &str,
1237 root_span_id_hex: &str,
1238 depth: i32,
1239 path: Vec<String>,
1240 span_order: i32,
1241) -> sql::TraceSpan {
1242 let input = match &record.input {
1243 Value::Null => None,
1244 v => Some(v.clone()),
1245 };
1246 let output = match &record.output {
1247 Value::Null => None,
1248 v => Some(v.clone()),
1249 };
1250
1251 sql::TraceSpan {
1252 trace_id: record.trace_id.to_hex(),
1253 span_id: span_id_hex.to_string(),
1254 parent_span_id: record.parent_span_id.as_ref().map(|id| id.to_hex()),
1255 span_name: record.span_name.clone(),
1256 span_kind: Some(record.span_kind.clone()),
1257 start_time: record.start_time,
1258 end_time: record.end_time,
1259 duration_ms: record.duration_ms,
1260 status_code: record.status_code,
1261 status_message: Some(record.status_message.clone()),
1262 attributes: record.attributes.clone(),
1263 events: record.events.clone(),
1264 links: record.links.clone(),
1265 depth,
1266 path,
1267 root_span_id: root_span_id_hex.to_string(),
1268 service_name: record.service_name.clone(),
1269 span_order,
1270 input,
1271 output,
1272 }
1273}
1274
/// Trace-level summary record (one value per `trace_id`, judging by the field
/// set — NOTE(review): confirm against the code that builds it).
#[derive(Clone, Debug)]
pub struct TraceSummaryRecord {
    /// Identifier of the summarised trace.
    pub trace_id: TraceId,
    /// Service name associated with the trace (presumably taken from the
    /// `service.name` resource attribute — confirm).
    pub service_name: String,
    /// Instrumentation scope name; NOTE(review): presumably that of the root span.
    pub scope_name: String,
    /// Instrumentation scope version paired with `scope_name`.
    pub scope_version: String,
    /// Operation name of the trace's root span (inferred from the field name — confirm).
    pub root_operation: String,
    /// Start of the trace, in UTC.
    pub start_time: DateTime<Utc>,
    /// End of the trace, in UTC; `None` when no end time is available.
    pub end_time: Option<DateTime<Utc>>,
    /// Status code of the trace; NOTE(review): presumably the root span's status.
    pub status_code: i32,
    /// Status message paired with `status_code`.
    pub status_message: String,
    /// Number of spans counted for this trace.
    pub span_count: i64,
    /// Number of spans counted as errors (the criterion is decided by the producer).
    pub error_count: i64,
    /// Resource-level attributes attached to the trace.
    pub resource_attributes: Vec<Attribute>,
    /// Entity identifiers linked to this trace; presumably collected from
    /// `scouter.entity` attributes — confirm.
    pub entity_ids: Vec<String>,
    /// Queue identifiers linked to this trace; presumably derived from the
    /// `scouter.queue.*` attributes — confirm.
    pub queue_ids: Vec<String>,
}
1298
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal [`TraceSpanRecord`] that starts at `start_ms` (Unix
    /// millis) and lasts exactly 100 ms. Every other field comes from
    /// `TraceSpanRecord::default()`.
    fn make_span_record(
        trace_id: [u8; 16],
        span_id: [u8; 8],
        parent_span_id: Option<[u8; 8]>,
        name: &str,
        start_ms: i64,
    ) -> TraceSpanRecord {
        TraceSpanRecord {
            trace_id: TraceId::from_bytes(trace_id),
            span_id: SpanId::from_bytes(span_id),
            parent_span_id: parent_span_id.map(SpanId::from_bytes),
            span_name: name.to_string(),
            start_time: chrono::DateTime::from_timestamp_millis(start_ms).unwrap(),
            end_time: chrono::DateTime::from_timestamp_millis(start_ms + 100).unwrap(),
            duration_ms: 100,
            ..Default::default()
        }
    }

    /// Hex form of a span id, as it appears in `sql::TraceSpan` ids and paths.
    fn span_hex(span_id: [u8; 8]) -> String {
        SpanId::from_bytes(span_id).to_hex()
    }

    #[test]
    fn build_trace_spans_empty() {
        let result = build_trace_spans(vec![]);
        assert!(result.is_empty());
    }

    /// Root plus one direct child: depth, ordering, parent linkage and path.
    #[test]
    fn build_trace_spans_simple_tree() {
        let tid = [0u8; 16];
        let root_sid = [1u8; 8];
        let child_sid = [2u8; 8];

        let records = vec![
            make_span_record(tid, root_sid, None, "root", 1000),
            make_span_record(tid, child_sid, Some(root_sid), "child", 1050),
        ];

        let spans = build_trace_spans(records);
        assert_eq!(spans.len(), 2);

        let root = spans.iter().find(|s| s.span_name == "root").unwrap();
        assert_eq!(root.depth, 0);
        assert_eq!(root.span_order, 0);
        assert!(root.parent_span_id.is_none());
        assert_eq!(root.path, vec![span_hex(root_sid)]);

        let child = spans.iter().find(|s| s.span_name == "child").unwrap();
        assert_eq!(child.depth, 1);
        assert_eq!(child.span_order, 1);
        // The child must point at the actual root, not merely at "some" parent.
        assert_eq!(
            child.parent_span_id.as_deref(),
            Some(root.span_id.as_str())
        );
        // Paths are ordered root -> self.
        assert_eq!(child.path, vec![span_hex(root_sid), span_hex(child_sid)]);
        assert_eq!(child.root_span_id, root.span_id);
    }

    /// A span whose parent is absent from the batch is promoted to depth 0.
    #[test]
    fn build_trace_spans_orphan_spans() {
        let tid = [0u8; 16];
        let orphan_sid = [3u8; 8];
        let missing_parent = [99u8; 8];

        let records = vec![make_span_record(
            tid,
            orphan_sid,
            Some(missing_parent),
            "orphan",
            1000,
        )];

        let spans = build_trace_spans(records);
        assert_eq!(spans.len(), 1);

        let orphan = &spans[0];
        assert_eq!(orphan.span_name, "orphan");
        assert_eq!(orphan.depth, 0);
    }

    /// Interleaved records from two traces are all emitted, grouped per trace.
    #[test]
    fn build_trace_spans_multiple_traces() {
        let tid1 = [1u8; 16];
        let tid2 = [2u8; 16];

        let records = vec![
            make_span_record(tid1, [10u8; 8], None, "trace1_root", 1000),
            make_span_record(tid2, [20u8; 8], None, "trace2_root", 2000),
            make_span_record(tid1, [11u8; 8], Some([10u8; 8]), "trace1_child", 1050),
        ];

        let spans = build_trace_spans(records);
        assert_eq!(spans.len(), 3);

        let tid1_hex = TraceId::from_bytes(tid1).to_hex();
        let tid2_hex = TraceId::from_bytes(tid2).to_hex();

        let t1_count = spans.iter().filter(|s| s.trace_id == tid1_hex).count();
        assert_eq!(t1_count, 2);

        let t2_count = spans.iter().filter(|s| s.trace_id == tid2_hex).count();
        assert_eq!(t2_count, 1);
    }

    /// Three-level chain: depth and full root -> leaf path of the deepest span.
    #[test]
    fn build_trace_spans_deep_tree() {
        let tid = [0u8; 16];
        let root_sid = [1u8; 8];
        let child_sid = [2u8; 8];
        let grandchild_sid = [3u8; 8];

        let records = vec![
            make_span_record(tid, root_sid, None, "root", 1000),
            make_span_record(tid, child_sid, Some(root_sid), "child", 1050),
            make_span_record(tid, grandchild_sid, Some(child_sid), "grandchild", 1100),
        ];

        let spans = build_trace_spans(records);
        assert_eq!(spans.len(), 3);

        let grandchild = spans.iter().find(|s| s.span_name == "grandchild").unwrap();
        assert_eq!(grandchild.depth, 2);
        assert_eq!(
            grandchild.path,
            vec![
                span_hex(root_sid),
                span_hex(child_sid),
                span_hex(grandchild_sid)
            ]
        );
    }

    /// Two traces reusing the same span id must not collide: both roots survive.
    #[test]
    fn build_trace_spans_cross_group_collision() {
        let tid1 = [1u8; 16];
        let tid2 = [2u8; 16];
        let shared_sid = [42u8; 8];

        let records = vec![
            make_span_record(tid1, shared_sid, None, "trace1_root", 1000),
            make_span_record(tid2, shared_sid, None, "trace2_root", 2000),
        ];

        let spans = build_trace_spans(records);
        assert_eq!(spans.len(), 2);

        let names: HashSet<&str> = spans.iter().map(|s| s.span_name.as_str()).collect();
        assert!(names.contains("trace1_root"));
        assert!(names.contains("trace2_root"));
    }

    /// A non-null input is carried over verbatim; a JSON `null` output becomes `None`.
    #[test]
    fn build_trace_spans_input_output_mapping() {
        let tid = [0u8; 16];
        let records = vec![TraceSpanRecord {
            trace_id: TraceId::from_bytes(tid),
            span_id: SpanId::from_bytes([1u8; 8]),
            parent_span_id: None,
            span_name: "test".to_string(),
            input: serde_json::json!({"key": "value"}),
            output: Value::Null,
            ..Default::default()
        }];

        let spans = build_trace_spans(records);
        assert_eq!(spans.len(), 1);
        assert_eq!(spans[0].input, Some(serde_json::json!({"key": "value"})));
        assert!(spans[0].output.is_none());
    }

    /// A parent cycle (A -> B -> A) must terminate and emit each span once.
    #[test]
    fn build_trace_spans_cycle_does_not_loop() {
        let tid = [0u8; 16];
        let span_a = [1u8; 8];
        let span_b = [2u8; 8];

        let records = vec![
            make_span_record(tid, span_a, Some(span_b), "A", 1000),
            make_span_record(tid, span_b, Some(span_a), "B", 1050),
        ];

        let spans = build_trace_spans(records);
        assert_eq!(spans.len(), 2, "Both spans should appear exactly once");

        // The count alone would also pass if one span were emitted twice.
        let names: HashSet<&str> = spans.iter().map(|s| s.span_name.as_str()).collect();
        assert!(names.contains("A"));
        assert!(names.contains("B"));
    }
}