1pub mod sql;
2
3use crate::error::RecordError;
4use crate::otel_value_to_serde_value;
5use crate::PyHelperFuncs;
6use crate::{json_to_pyobject, json_to_pyobject_value};
7use chrono::DateTime;
8use chrono::Utc;
9use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
10use opentelemetry_proto::tonic::common::v1::AnyValue;
11use opentelemetry_proto::tonic::common::v1::KeyValue;
12use opentelemetry_proto::tonic::trace::v1::span::Event;
13use opentelemetry_proto::tonic::trace::v1::span::Link;
14use opentelemetry_proto::tonic::trace::v1::span::SpanKind;
15use opentelemetry_proto::tonic::trace::v1::Span;
16use pyo3::prelude::*;
17use pyo3::types::PyDict;
18use serde::{Deserialize, Serialize};
19use serde_json::Value;
20use std::cmp::{max, min};
21use std::collections::HashMap;
22
/// Attribute key `function.type`.
/// NOTE(review): not referenced in the visible part of this module; presumably set by the
/// tracing instrumentation — confirm against the producer.
pub const FUNCTION_TYPE: &str = "function.type";
/// Attribute key `function.streaming` (not referenced in the visible part of this module).
pub const FUNCTION_STREAMING: &str = "function.streaming";
/// Attribute key `function.name` (not referenced in the visible part of this module).
pub const FUNCTION_NAME: &str = "function.name";
/// Attribute key `function.module` (not referenced in the visible part of this module).
pub const FUNCTION_MODULE: &str = "function.module";
/// Attribute key `function.qualname` (not referenced in the visible part of this module).
pub const FUNCTION_QUALNAME: &str = "function.qualname";
/// Attribute key whose string value is parsed as JSON into `TraceSpanRecord::input`.
pub const SCOUTER_TRACING_INPUT: &str = "scouter.tracing.input";
/// Attribute key whose string value is parsed as JSON into `TraceSpanRecord::output`.
pub const SCOUTER_TRACING_OUTPUT: &str = "scouter.tracing.output";
/// Attribute key `scouter.tracing.label` (not referenced in the visible part of this module).
pub const SCOUTER_TRACING_LABEL: &str = "scouter.tracing.label";
/// Attribute key `service.name` (not referenced in the visible part of this module).
pub const SERVICE_NAME: &str = "service.name";
/// Tag namespace; joined with [`BAGGAGE_PREFIX`] as `"{BAGGAGE_PREFIX}.{SCOUTER_TAG_PREFIX}."`
/// to recognise tag attributes in `TraceRecordExt::extract_tags`.
pub const SCOUTER_TAG_PREFIX: &str = "scouter.tracing.tag";
/// Attributes whose key starts with this prefix are turned into `TraceBaggageRecord`s.
pub const BAGGAGE_PREFIX: &str = "baggage";
/// Attribute key holding a timestamp string that, when it parses, is used as a record's
/// `created_at` (see `TraceServerRecord::get_trace_start_time_attribute`).
pub const TRACE_START_TIME_KEY: &str = "scouter.tracing.start_time";
35
/// Trace-level summary record.
///
/// One record is created per span by `TraceServerRecord::convert_to_trace_record`; records that
/// share `(created_at, trace_id, scope)` are later folded together by
/// `deduplicate_and_merge_traces`.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
pub struct TraceRecord {
    /// Value of the `scouter.tracing.start_time` attribute when it parses as a timestamp,
    /// otherwise the start time of the span the record was created from.
    #[pyo3(get)]
    pub created_at: DateTime<Utc>,
    /// Hex-encoded trace id.
    #[pyo3(get)]
    pub trace_id: String,
    /// Copied from `TraceServerRecord::space`.
    #[pyo3(get)]
    pub space: String,
    /// Copied from `TraceServerRecord::name`.
    #[pyo3(get)]
    pub name: String,
    /// Copied from `TraceServerRecord::version`.
    #[pyo3(get)]
    pub version: String,
    /// Name of the span's scope; empty when the scope is absent.
    #[pyo3(get)]
    pub scope: String,
    /// Copied from the span's `trace_state`.
    #[pyo3(get)]
    pub trace_state: String,
    /// Earliest start time among the spans merged into this record.
    #[pyo3(get)]
    pub start_time: chrono::DateTime<Utc>,
    /// Latest end time among the spans merged into this record.
    #[pyo3(get)]
    pub end_time: chrono::DateTime<Utc>,
    /// Milliseconds between `start_time` and `end_time`; `0` when the end is not after the start.
    #[pyo3(get)]
    pub duration_ms: i64,
    /// Status code taken from the span (`0` when the span has no status). `merge` gives the
    /// value `2` precedence — presumably the OpenTelemetry error status; confirm.
    #[pyo3(get)]
    pub status_code: i32,
    /// Status message taken from the span (empty when the span has no status).
    #[pyo3(get)]
    pub status_message: String,
    /// Hex-encoded id of the span that seeded this record.
    /// NOTE(review): nothing here checks that this span is actually the root of the trace.
    #[pyo3(get)]
    pub root_span_id: String,
    /// Number of spans folded into this record (starts at 1, summed on merge).
    #[pyo3(get)]
    pub span_count: i32,
    /// Tags extracted from attributes prefixed with `baggage.scouter.tracing.tag.`.
    #[pyo3(get)]
    pub tags: Vec<Tag>,
}

#[pymethods]
impl TraceRecord {
    /// String representation produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
77
78impl TraceRecord {
79 pub fn merge(&mut self, other: &TraceRecord) {
82 self.start_time = min(self.start_time, other.start_time);
84 self.end_time = max(self.end_time, other.end_time);
85
86 if self.end_time > self.start_time {
88 self.duration_ms = (self.end_time - self.start_time).num_milliseconds();
89 } else {
90 self.duration_ms = 0;
92 }
93
94 if self.status_code != 2 && other.status_code == 2 {
95 self.status_code = 2;
96 }
97
98 self.span_count += other.span_count;
99
100 let mut existing_tag_keys: std::collections::HashSet<String> =
101 self.tags.iter().map(|t| t.key.clone()).collect();
102
103 for tag in &other.tags {
104 if !existing_tag_keys.contains(&tag.key) {
105 self.tags.push(tag.clone());
106 existing_tag_keys.insert(tag.key.clone());
107 }
108 }
109 }
110}
111
/// Hash-map key identifying records that belong to the same trace in
/// `deduplicate_and_merge_traces`: two records are merged only when all three fields are equal.
#[derive(Hash, Eq, PartialEq, Clone)]
struct TraceKey {
    created_at: chrono::DateTime<chrono::Utc>,
    trace_id: String,
    scope: String,
}
118
119pub fn deduplicate_and_merge_traces(raw_traces: Vec<TraceRecord>) -> Vec<TraceRecord> {
120 let mut merged_traces: HashMap<TraceKey, TraceRecord> = HashMap::new();
121
122 for trace in raw_traces {
123 let key = TraceKey {
124 created_at: trace.created_at,
125 trace_id: trace.trace_id.clone(),
126 scope: trace.scope.clone(),
127 };
128
129 merged_traces
130 .entry(key)
131 .and_modify(|existing_trace| {
132 existing_trace.merge(&trace);
133 })
134 .or_insert(trace);
135 }
136
137 merged_traces.into_values().collect()
138}
139
/// Record describing a single span, built by `TraceServerRecord::convert_to_span_record`.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
pub struct TraceSpanRecord {
    /// Set to the span's start time when the record is built.
    #[pyo3(get)]
    pub created_at: chrono::DateTime<Utc>,
    /// Hex-encoded span id.
    #[pyo3(get)]
    pub span_id: String,
    /// Hex-encoded trace id.
    #[pyo3(get)]
    pub trace_id: String,
    /// Hex-encoded parent span id; `None` when the span carries no parent id.
    #[pyo3(get)]
    pub parent_span_id: Option<String>,
    /// Copied from `TraceServerRecord::space`.
    #[pyo3(get)]
    pub space: String,
    /// Copied from `TraceServerRecord::name`.
    #[pyo3(get)]
    pub name: String,
    /// Copied from `TraceServerRecord::version`.
    #[pyo3(get)]
    pub version: String,
    /// Name of the span's scope; empty when the scope is absent.
    #[pyo3(get)]
    pub scope: String,
    /// The span's own name.
    #[pyo3(get)]
    pub span_name: String,
    /// Span kind name with the `SPAN_KIND_` prefix removed; `"UNSPECIFIED"` for unknown kinds.
    #[pyo3(get)]
    pub span_kind: String,
    /// Span start time.
    #[pyo3(get)]
    pub start_time: chrono::DateTime<Utc>,
    /// Span end time.
    #[pyo3(get)]
    pub end_time: chrono::DateTime<Utc>,
    /// Span duration in milliseconds; `0` when the end precedes the start.
    #[pyo3(get)]
    pub duration_ms: i64,
    /// Status code taken from the span (`0` when the span has no status).
    #[pyo3(get)]
    pub status_code: i32,
    /// Status message taken from the span (empty when the span has no status).
    #[pyo3(get)]
    pub status_message: String,
    /// All span attributes converted to JSON values.
    #[pyo3(get)]
    pub attributes: Vec<Attribute>,
    /// Span events.
    #[pyo3(get)]
    pub events: Vec<SpanEvent>,
    /// Span links.
    #[pyo3(get)]
    pub links: Vec<SpanLink>,
    /// Optional label; `convert_to_span_record` always sets this to `None`.
    #[pyo3(get)]
    pub label: Option<String>,
    // `input` / `output` have no `#[pyo3(get)]`: they are exposed to Python through the
    // hand-written getters in the `#[pymethods]` block.
    /// Value of the `scouter.tracing.input` attribute (parsed JSON, or the raw string when
    /// parsing fails); `Value::Null` when absent.
    pub input: Value,
    /// Value of the `scouter.tracing.output` attribute (parsed JSON, or the raw string when
    /// parsing fails); `Value::Null` when absent.
    pub output: Value,
}
184
185#[pymethods]
186impl TraceSpanRecord {
187 #[getter]
188 pub fn get_input<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyDict>, RecordError> {
189 let dict = PyDict::new(py);
190 match &self.input {
191 Value::Null => {}
192 _ => {
193 json_to_pyobject(py, &self.input, &dict)?;
194 }
195 }
196 Ok(dict)
197 }
198
199 #[getter]
200 pub fn get_output<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyDict>, RecordError> {
201 let dict = PyDict::new(py);
202 match &self.output {
203 Value::Null => {}
204 _ => {
205 json_to_pyobject(py, &self.output, &dict)?;
206 }
207 }
208 Ok(dict)
209 }
210
211 pub fn __str__(&self) -> String {
212 PyHelperFuncs::__str__(self)
214 }
215}
216
/// One baggage key/value pair attached to a trace, built by
/// `TraceServerRecord::convert_to_baggage_records`.
///
/// Derives `sqlx::FromRow` when the `server` feature is enabled.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
#[cfg_attr(feature = "server", derive(sqlx::FromRow))]
pub struct TraceBaggageRecord {
    /// Value of the `scouter.tracing.start_time` attribute when it parses as a timestamp,
    /// otherwise the time at which the record was built.
    #[pyo3(get)]
    pub created_at: DateTime<Utc>,
    /// Hex-encoded trace id.
    #[pyo3(get)]
    pub trace_id: String,
    /// Name of the span's scope; empty when the scope is absent.
    #[pyo3(get)]
    pub scope: String,
    /// Attribute key with the leading `baggage.` removed and surrounding whitespace trimmed.
    #[pyo3(get)]
    pub key: String,
    /// Attribute value rendered as a string (JSON text for arrays and objects).
    #[pyo3(get)]
    pub value: String,
}

#[pymethods]
impl TraceBaggageRecord {
    /// String representation produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
239
/// Output of `TraceServerRecord::to_records`: the merged trace records, one span record per
/// span, and the baggage records, in that order.
pub type TraceRecords = (
    Vec<TraceRecord>,
    Vec<TraceSpanRecord>,
    Vec<TraceBaggageRecord>,
);
245
246pub trait TraceRecordExt {
247 fn keyvalue_to_json_array<T: Serialize>(attributes: &Vec<T>) -> Result<Value, RecordError> {
248 Ok(serde_json::to_value(attributes).unwrap_or(Value::Array(vec![])))
249 }
250
251 fn attributes_to_json_array(attributes: &[KeyValue]) -> Result<Vec<Attribute>, RecordError> {
252 attributes
253 .iter()
254 .map(|kv| {
255 let value = match &kv.value {
256 Some(v) => otel_value_to_serde_value(v),
257 None => Value::Null,
258 };
259 Ok(Attribute {
260 key: kv.key.clone(),
261 value,
262 })
263 })
264 .collect()
265 }
266
267 fn events_to_json_array(attributes: &[Event]) -> Result<Vec<SpanEvent>, RecordError> {
268 attributes
269 .iter()
270 .map(|kv| {
271 let attributes = Self::attributes_to_json_array(&kv.attributes)?;
272 Ok(SpanEvent {
273 name: kv.name.clone(),
274 timestamp: DateTime::<Utc>::from_timestamp_nanos(kv.time_unix_nano as i64),
275 attributes,
276 dropped_attributes_count: kv.dropped_attributes_count,
277 })
278 })
279 .collect()
280 }
281
282 fn links_to_json_array(attributes: &[Link]) -> Result<Vec<SpanLink>, RecordError> {
283 attributes
284 .iter()
285 .map(|kv| {
286 let attributes = Self::attributes_to_json_array(&kv.attributes)?;
287 Ok(SpanLink {
288 trace_id: hex::encode(&kv.trace_id),
289 span_id: hex::encode(&kv.span_id),
290 trace_state: kv.trace_state.clone(),
291 attributes,
292 dropped_attributes_count: kv.dropped_attributes_count,
293 })
294 })
295 .collect()
296 }
297
298 fn extract_tags(attributes: &[Attribute]) -> Result<Vec<Tag>, RecordError> {
309 let pattern = format!("{}.{}.", BAGGAGE_PREFIX, SCOUTER_TAG_PREFIX);
310
311 let tags: Result<Vec<Tag>, RecordError> = attributes
312 .iter()
313 .filter_map(|attr| {
314 attr.key.strip_prefix(&pattern).and_then(|tag_key| {
316 if tag_key.is_empty() {
318 tracing::warn!(
319 attribute_key = %attr.key,
320 "Skipping tag with empty key after prefix removal"
321 );
322 return None;
323 }
324
325 let value = match &attr.value {
326 Value::String(s) => s.clone(),
327 Value::Number(n) => n.to_string(),
328 Value::Bool(b) => b.to_string(),
329 Value::Null => "null".to_string(),
330
331 Value::Array(_) | Value::Object(_) => {
333 serde_json::to_string(&attr.value)
335 .unwrap_or_else(|_| format!("{:?}", attr.value))
336 }
337 };
338
339 Some(Ok(Tag {
340 key: tag_key.to_string(),
341 value,
342 }))
343 })
344 })
345 .collect();
346
347 tags
348 }
349}
350
/// An OTLP trace export request together with the `space` / `name` / `version` that are stamped
/// onto every record derived from it.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct TraceServerRecord {
    /// Copied onto every trace and span record produced from `request`.
    pub space: String,
    /// Copied onto every trace and span record produced from `request`.
    pub name: String,
    /// Copied onto every trace and span record produced from `request`.
    pub version: String,
    /// The raw export request whose resource spans are converted by `to_records`.
    pub request: ExportTraceServiceRequest,
}

// Uses the trait's default implementations unchanged.
impl TraceRecordExt for TraceServerRecord {}
360
361impl TraceServerRecord {
362 fn extract_time(start_time: u64, end_time: u64) -> (DateTime<Utc>, DateTime<Utc>, i64) {
371 let start_dt = Self::safe_timestamp_conversion(start_time);
373 let end_dt = Self::safe_timestamp_conversion(end_time);
374
375 let duration_ms = if end_time >= start_time {
377 let duration_nanos = end_time.saturating_sub(start_time);
378 (duration_nanos / 1_000_000).min(i64::MAX as u64) as i64
379 } else {
380 tracing::warn!(
381 start_time = start_time,
382 end_time = end_time,
383 "Invalid timestamp order detected in trace span"
384 );
385 0
386 };
387
388 (start_dt, end_dt, duration_ms)
389 }
390
391 fn safe_timestamp_conversion(timestamp_nanos: u64) -> DateTime<Utc> {
393 if timestamp_nanos <= i64::MAX as u64 {
394 DateTime::from_timestamp_nanos(timestamp_nanos as i64)
395 } else {
396 let seconds = timestamp_nanos / 1_000_000_000;
397 let nanoseconds = (timestamp_nanos % 1_000_000_000) as u32;
398
399 DateTime::from_timestamp(seconds as i64, nanoseconds).unwrap_or_else(|| {
400 tracing::warn!(
401 timestamp = timestamp_nanos,
402 seconds = seconds,
403 nanoseconds = nanoseconds,
404 "Failed to convert large timestamp, falling back to current time"
405 );
406 Utc::now()
407 })
408 }
409 }
410
411 fn span_kind_to_string(kind: i32) -> String {
413 SpanKind::try_from(kind)
414 .map(|sk| {
415 sk.as_str_name()
416 .strip_prefix("SPAN_KIND_")
417 .unwrap_or(sk.as_str_name())
418 })
419 .unwrap_or("UNSPECIFIED")
420 .to_string()
421 }
422
423 fn extract_input_output(attributes: &[Attribute]) -> (Value, Value) {
424 let mut input = Value::Null;
425 let mut output = Value::Null;
426
427 for attr in attributes {
428 if attr.key == SCOUTER_TRACING_INPUT {
429 if let Value::String(s) = &attr.value {
430 input = serde_json::from_str(s).unwrap_or_else(|e| {
431 tracing::warn!(
432 key = SCOUTER_TRACING_INPUT,
433 error = %e,
434 value = s,
435 "Failed to parse input attribute as JSON, falling back to string value."
436 );
437 Value::String(s.clone()) });
439 }
440 } else if attr.key == SCOUTER_TRACING_OUTPUT {
441 if let Value::String(s) = &attr.value {
442 output = serde_json::from_str(s)
443 .unwrap_or_else(|e| {
444 tracing::warn!(
445 key = SCOUTER_TRACING_OUTPUT,
446 error = %e,
447 value = s,
448 "Failed to parse output attribute as JSON, falling back to string value."
449 );
450 Value::String(s.clone()) });
452 }
453 }
454 }
455 (input, output)
456 }
457 #[allow(clippy::too_many_arguments)]
459 pub fn convert_to_trace_record(
460 &self,
461 trace_id: &str,
462 span_id: &str,
463 span: &Span,
464 scope_name: &str,
465 attributes: &Vec<Attribute>,
466 space: &str,
467 name: &str,
468 version: &str,
469 start_time: DateTime<Utc>,
470 end_time: DateTime<Utc>,
471 duration_ms: i64,
472 ) -> Result<TraceRecord, RecordError> {
473 Ok(TraceRecord {
474 created_at: Self::get_trace_start_time_attribute(attributes, &start_time),
475 trace_id: trace_id.to_string(),
476 space: space.to_owned(),
477 name: name.to_owned(),
478 version: version.to_owned(),
479 scope: scope_name.to_string(),
480 trace_state: span.trace_state.clone(),
481 start_time,
482 end_time,
483 duration_ms,
484 status_code: span.status.as_ref().map(|s| s.code).unwrap_or_else(|| 0),
485 status_message: span
486 .status
487 .as_ref()
488 .map(|s| s.message.clone())
489 .unwrap_or_default(),
490 root_span_id: span_id.to_string(),
491 tags: Self::extract_tags(attributes)?,
492 span_count: 1,
493 })
494 }
495
496 pub fn get_trace_start_time_attribute(
499 attributes: &Vec<Attribute>,
500 start_time: &DateTime<Utc>,
501 ) -> DateTime<Utc> {
502 for attr in attributes {
503 if attr.key == TRACE_START_TIME_KEY {
504 if let Value::String(s) = &attr.value {
505 if let Ok(dt) = s.parse::<chrono::DateTime<chrono::Utc>>() {
506 return dt;
507 }
508 }
509 }
510 }
511
512 tracing::warn!(
513 "Trace start time attribute not found or invalid, falling back to span start_time"
514 );
515 *start_time
516 }
517
518 pub fn convert_to_baggage_records(
519 trace_id: &str,
520 attributes: &Vec<Attribute>,
521 scope_name: &str,
522 ) -> Vec<TraceBaggageRecord> {
523 let baggage_kvs: Vec<(String, String)> = attributes
524 .iter()
525 .filter_map(|attr| {
526 if attr.key.starts_with(BAGGAGE_PREFIX) {
528 let clean_key = attr
529 .key
530 .strip_prefix(format!("{}.", BAGGAGE_PREFIX).as_str())
531 .map(|stripped| stripped.trim())
532 .unwrap_or(&attr.key)
533 .to_string();
534
535 let value_string = match &attr.value {
537 Value::String(s) => s.clone(),
538 Value::Number(n) => n.to_string(),
539 Value::Bool(b) => b.to_string(),
540 Value::Null => "null".to_string(),
541 Value::Array(_) | Value::Object(_) => {
542 serde_json::to_string(&attr.value)
544 .unwrap_or_else(|_| format!("{:?}", attr.value))
545 }
546 };
547
548 Some((clean_key, value_string))
549 } else {
550 None
551 }
552 })
553 .collect();
554
555 baggage_kvs
556 .into_iter()
557 .map(|(key, value)| TraceBaggageRecord {
558 created_at: Self::get_trace_start_time_attribute(attributes, &Utc::now()),
559 trace_id: trace_id.to_string(),
560 scope: scope_name.to_string(),
561 key,
562 value,
563 })
564 .collect()
565 }
566
567 #[allow(clippy::too_many_arguments)]
569 pub fn convert_to_span_record(
570 &self,
571 trace_id: &str,
572 span_id: &str,
573 span: &Span,
574 attributes: &Vec<Attribute>,
575 scope_name: &str,
576 space: &str,
577 name: &str,
578 version: &str,
579 start_time: DateTime<Utc>,
580 end_time: DateTime<Utc>,
581 duration_ms: i64,
582 ) -> Result<TraceSpanRecord, RecordError> {
583 let parent_span_id = if !span.parent_span_id.is_empty() {
585 Some(hex::encode(&span.parent_span_id))
586 } else {
587 None
588 };
589
590 let (input, output) = Self::extract_input_output(attributes);
591
592 Ok(TraceSpanRecord {
593 created_at: start_time,
594 trace_id: trace_id.to_string(),
595 span_id: span_id.to_string(),
596 parent_span_id,
597 start_time,
598 end_time,
599 duration_ms,
600 space: space.to_owned(),
601 name: name.to_owned(),
602 version: version.to_owned(),
603 scope: scope_name.to_string(),
604 span_name: span.name.clone(),
605 span_kind: Self::span_kind_to_string(span.kind),
606 status_code: span.status.as_ref().map(|s| s.code).unwrap_or_else(|| 0),
607 status_message: span
608 .status
609 .as_ref()
610 .map(|s| s.message.clone())
611 .unwrap_or_default(),
612 attributes: attributes.to_owned(),
613 events: Self::events_to_json_array(&span.events)?,
614 links: Self::links_to_json_array(&span.links)?,
615 label: None,
616 input,
617 output,
618 })
619 }
620
621 pub fn to_records(&self) -> Result<TraceRecords, RecordError> {
622 let resource_spans = &self.request.resource_spans;
623
624 let estimated_capacity: usize = resource_spans
626 .iter()
627 .map(|rs| {
628 rs.scope_spans
629 .iter()
630 .map(|ss| ss.spans.len())
631 .sum::<usize>()
632 })
633 .sum();
634
635 let mut trace_records: Vec<TraceRecord> = Vec::with_capacity(estimated_capacity);
636 let mut span_records: Vec<TraceSpanRecord> = Vec::with_capacity(estimated_capacity);
637 let mut baggage_records: Vec<TraceBaggageRecord> = Vec::new();
638
639 let space = &self.space;
640 let name = &self.name;
641 let version = &self.version;
642
643 for resource_span in resource_spans {
644 for scope_span in &resource_span.scope_spans {
645 let scope_name = &scope_span.scope.as_ref().map_or("", |s| &s.name);
647
648 for span in &scope_span.spans {
649 let attributes = Self::attributes_to_json_array(&span.attributes)?;
650 let trace_id = hex::encode(&span.trace_id);
651 let span_id = hex::encode(&span.span_id);
652
653 let (start_time, end_time, duration_ms) =
655 Self::extract_time(span.start_time_unix_nano, span.end_time_unix_nano);
656
657 trace_records.push(self.convert_to_trace_record(
659 &trace_id,
660 &span_id,
661 span,
662 scope_name,
663 &attributes,
664 space,
665 name,
666 version,
667 start_time,
668 end_time,
669 duration_ms,
670 )?);
671
672 span_records.push(self.convert_to_span_record(
674 &trace_id,
675 &span_id,
676 span,
677 &attributes,
678 scope_name,
679 space,
680 name,
681 version,
682 start_time,
683 end_time,
684 duration_ms,
685 )?);
686
687 baggage_records.extend(Self::convert_to_baggage_records(
689 &trace_id,
690 &attributes,
691 scope_name,
692 ));
693 }
694 }
695 }
696
697 trace_records.sort_by_key(|trace| trace.start_time);
699 let mut trace_records = deduplicate_and_merge_traces(trace_records);
700
701 trace_records.shrink_to_fit();
703 Ok((trace_records, span_records, baggage_records))
704 }
705}
706
/// A span attribute: a string key with a JSON value.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[pyclass]
pub struct Attribute {
    /// Attribute key.
    #[pyo3(get)]
    pub key: String,
    /// Attribute value as JSON. It has no `#[pyo3(get)]`; Python reads it through the
    /// hand-written `get_value` getter.
    pub value: Value,
}
714
715#[pymethods]
716impl Attribute {
717 #[getter]
718 pub fn get_value<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyAny>, RecordError> {
719 Ok(json_to_pyobject_value(py, &self.value)?.bind(py).clone())
720 }
721
722 pub fn __str__(&self) -> String {
723 PyHelperFuncs::__str__(self)
724 }
725}
726
727impl Attribute {
728 pub fn from_otel_value(key: String, value: &AnyValue) -> Self {
729 Attribute {
730 key,
731 value: otel_value_to_serde_value(value),
732 }
733 }
734}
735
/// An event recorded on a span, built by `TraceRecordExt::events_to_json_array`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct SpanEvent {
    /// Event time, converted from the event's `time_unix_nano`.
    #[pyo3(get)]
    pub timestamp: chrono::DateTime<Utc>,
    /// Event name.
    #[pyo3(get)]
    pub name: String,
    /// Event attributes converted to JSON values.
    #[pyo3(get)]
    pub attributes: Vec<Attribute>,
    /// Copied from the event's `dropped_attributes_count`.
    #[pyo3(get)]
    pub dropped_attributes_count: u32,
}

#[pymethods]
impl SpanEvent {
    /// String representation produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
755
/// A link from one span to another, built by `TraceRecordExt::links_to_json_array`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct SpanLink {
    /// Hex-encoded trace id of the linked span.
    #[pyo3(get)]
    pub trace_id: String,
    /// Hex-encoded span id of the linked span.
    #[pyo3(get)]
    pub span_id: String,
    /// Copied from the link's `trace_state`.
    #[pyo3(get)]
    pub trace_state: String,
    /// Link attributes converted to JSON values.
    #[pyo3(get)]
    pub attributes: Vec<Attribute>,
    /// Copied from the link's `dropped_attributes_count`.
    #[pyo3(get)]
    pub dropped_attributes_count: u32,
}

#[pymethods]
impl SpanLink {
    /// String representation produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
777
/// A key/value tag attached to a trace record, produced by `TraceRecordExt::extract_tags`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
pub struct Tag {
    /// Attribute key with the `baggage.scouter.tracing.tag.` prefix removed.
    #[pyo3(get)]
    pub key: String,
    /// Attribute value rendered as a string (JSON text for arrays and objects).
    #[pyo3(get)]
    pub value: String,
}

#[pymethods]
impl Tag {
    /// String representation produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}
793
/// A tag row associated with an entity.
///
/// Derives `sqlx::FromRow` when the `server` feature is enabled.
/// NOTE(review): nothing in the visible part of this module constructs this type; the field
/// meanings below are read off the names — confirm against the SQL layer.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[pyclass]
#[cfg_attr(feature = "server", derive(sqlx::FromRow))]
pub struct TagRecord {
    /// Timestamp stored with the tag.
    #[pyo3(get)]
    pub created_at: DateTime<Utc>,
    /// Kind of entity the tag belongs to (presumably e.g. a trace — confirm).
    #[pyo3(get)]
    pub entity_type: String,
    /// Identifier of the tagged entity.
    #[pyo3(get)]
    pub entity_id: String,
    /// Tag key.
    #[pyo3(get)]
    pub key: String,
    /// Tag value.
    #[pyo3(get)]
    pub value: String,
}

#[pymethods]
impl TagRecord {
    /// String representation produced by `PyHelperFuncs::__str__`.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }
}