use crate::error::TypeError;
use crate::json_to_pyobject_value;
use crate::trace::{Attribute, SpanEvent, SpanLink};
use crate::PyHelperFuncs;
use crate::TraceCursor;
use chrono::{DateTime, Utc};
use pyo3::prelude::*;
use pyo3::IntoPyObjectExt;
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[cfg(feature = "server")]
use sqlx::{postgres::PgRow, FromRow, Row};
/// Per-trace summary record.
///
/// Exposed to Python through `#[pyclass]`; every field has a generated
/// read-only getter (`#[pyo3(get)]`), and the type derives serde
/// `Serialize`/`Deserialize`.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[pyclass]
pub struct TraceListItem {
#[pyo3(get)]
pub trace_id: String,
#[pyo3(get)]
pub service_name: String,
#[pyo3(get)]
pub scope_name: String,
#[pyo3(get)]
pub scope_version: String,
#[pyo3(get)]
pub root_operation: String,
#[pyo3(get)]
pub start_time: DateTime<Utc>,
// NOTE(review): `end_time` and `duration_ms` are optional here, unlike on
// `TraceSpan`; presumably a trace can be listed before it has finished — confirm.
#[pyo3(get)]
pub end_time: Option<DateTime<Utc>>,
#[pyo3(get)]
pub duration_ms: Option<i64>,
#[pyo3(get)]
pub status_code: i32,
#[pyo3(get)]
pub status_message: Option<String>,
#[pyo3(get)]
pub span_count: i64,
#[pyo3(get)]
pub has_errors: bool,
#[pyo3(get)]
pub error_count: i64,
#[pyo3(get)]
pub resource_attributes: Vec<Attribute>,
// The `FromRow` impl in this file does not read the next two fields from
// the database row; it initialises both to empty vectors.
#[pyo3(get)]
pub entity_ids: Vec<String>,
#[pyo3(get)]
pub queue_ids: Vec<String>,
}
/// Builds a [`TraceListItem`] from a Postgres row (compiled only with the
/// `server` feature).
///
/// Scalar fields are read from the column of the same name, and any
/// `try_get` failure is returned to the caller. The `resource_attributes`
/// column is read as JSON; if it does not deserialize into `Vec<Attribute>`
/// the field falls back to an empty vector instead of failing the row.
/// `entity_ids` and `queue_ids` are not read from the row and start empty.
#[cfg(feature = "server")]
impl FromRow<'_, PgRow> for TraceListItem {
    fn from_row(row: &PgRow) -> Result<Self, sqlx::Error> {
        // Best-effort decode: a missing column is an error, malformed JSON is not.
        let attributes_json: Value = row.try_get("resource_attributes")?;
        let resource_attributes =
            serde_json::from_value::<Vec<Attribute>>(attributes_json).unwrap_or_default();

        let item = Self {
            trace_id: row.try_get("trace_id")?,
            service_name: row.try_get("service_name")?,
            scope_name: row.try_get("scope_name")?,
            scope_version: row.try_get("scope_version")?,
            root_operation: row.try_get("root_operation")?,
            start_time: row.try_get("start_time")?,
            end_time: row.try_get("end_time")?,
            duration_ms: row.try_get("duration_ms")?,
            status_code: row.try_get("status_code")?,
            status_message: row.try_get("status_message")?,
            span_count: row.try_get("span_count")?,
            has_errors: row.try_get("has_errors")?,
            error_count: row.try_get("error_count")?,
            resource_attributes,
            entity_ids: Vec::new(),
            queue_ids: Vec::new(),
        };
        Ok(item)
    }
}
#[pymethods]
impl TraceListItem {
/// Python `__str__`: returns whatever `PyHelperFuncs::__str__` produces
/// for this item.
pub fn __str__(&self) -> String {
PyHelperFuncs::__str__(self)
}
}
/// A single span belonging to a trace.
///
/// Exposed to Python through `#[pyclass]`. Every field except `input` and
/// `output` has a generated read-only getter (`#[pyo3(get)]`); those two are
/// served by hand-written getters in the `#[pymethods]` block, which convert
/// the stored JSON value into a Python object.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[pyclass]
pub struct TraceSpan {
#[pyo3(get)]
pub trace_id: String,
#[pyo3(get)]
pub span_id: String,
#[pyo3(get)]
pub parent_span_id: Option<String>,
#[pyo3(get)]
pub span_name: String,
#[pyo3(get)]
pub span_kind: Option<String>,
#[pyo3(get)]
pub start_time: DateTime<Utc>,
#[pyo3(get)]
pub end_time: DateTime<Utc>,
#[pyo3(get)]
pub duration_ms: i64,
#[pyo3(get)]
pub status_code: i32,
#[pyo3(get)]
pub status_message: Option<String>,
#[pyo3(get)]
pub attributes: Vec<Attribute>,
#[pyo3(get)]
pub events: Vec<SpanEvent>,
#[pyo3(get)]
pub links: Vec<SpanLink>,
// NOTE(review): `depth`, `path`, `root_span_id` and `span_order` look like
// position-in-tree metadata supplied by the query — confirm against the SQL.
#[pyo3(get)]
pub depth: i32,
#[pyo3(get)]
pub path: Vec<String>,
#[pyo3(get)]
pub root_span_id: String,
#[pyo3(get)]
pub service_name: String,
#[pyo3(get)]
pub span_order: i32,
// No `#[pyo3(get)]` on the next two fields: raw `serde_json::Value` is
// exposed to Python through the explicit `input` / `output` getters instead.
pub input: Option<Value>,
pub output: Option<Value>,
}
#[pymethods]
impl TraceSpan {
    /// Python `__str__`: returns whatever `PyHelperFuncs::__str__` produces
    /// for this span.
    pub fn __str__(&self) -> String {
        PyHelperFuncs::__str__(self)
    }

    /// Python getter for `input`.
    ///
    /// Converts the stored JSON value with `json_to_pyobject_value`. A span
    /// without an input is converted as JSON `null`.
    ///
    /// # Errors
    /// Returns a `TypeError` if the JSON-to-Python conversion fails.
    #[getter]
    pub fn input<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyAny>, TypeError> {
        let input = self.input.as_ref().unwrap_or(&Value::Null);
        // `into_bound_py_any` already yields an owned `Bound`, so no extra
        // clone (refcount bump) is needed before returning it.
        let obj = json_to_pyobject_value(py, input)?.into_bound_py_any(py)?;
        Ok(obj)
    }

    /// Python getter for `output`.
    ///
    /// Converts the stored JSON value with `json_to_pyobject_value`. A span
    /// without an output is converted as JSON `null`.
    ///
    /// # Errors
    /// Returns a `TypeError` if the JSON-to-Python conversion fails.
    #[getter]
    pub fn output<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyAny>, TypeError> {
        let output = self.output.as_ref().unwrap_or(&Value::Null);
        // Same as `input`: the converted object is already owned.
        let obj = json_to_pyobject_value(py, output)?.into_bound_py_any(py)?;
        Ok(obj)
    }
}
/// Builds a [`TraceSpan`] from a Postgres row (compiled only with the
/// `server` feature).
///
/// Scalar fields are read from the column of the same name, and any
/// `try_get` failure is returned to the caller. The `attributes`, `events`
/// and `links` columns are read as JSON; a value that does not deserialize
/// into the expected list type falls back to an empty vector instead of
/// failing the row. `input` and `output` are kept as optional raw JSON.
#[cfg(feature = "server")]
impl FromRow<'_, PgRow> for TraceSpan {
    fn from_row(row: &PgRow) -> Result<Self, sqlx::Error> {
        // Best-effort decode of the JSON list columns: a missing column is an
        // error, malformed JSON degrades to an empty list.
        let attributes_json: Value = row.try_get("attributes")?;
        let attributes =
            serde_json::from_value::<Vec<Attribute>>(attributes_json).unwrap_or_default();

        let events_json: Value = row.try_get("events")?;
        let events = serde_json::from_value::<Vec<SpanEvent>>(events_json).unwrap_or_default();

        let links_json: Value = row.try_get("links")?;
        let links = serde_json::from_value::<Vec<SpanLink>>(links_json).unwrap_or_default();

        let input = row.try_get::<Option<Value>, _>("input")?;
        let output = row.try_get::<Option<Value>, _>("output")?;

        let span = Self {
            trace_id: row.try_get("trace_id")?,
            span_id: row.try_get("span_id")?,
            parent_span_id: row.try_get("parent_span_id")?,
            span_name: row.try_get("span_name")?,
            span_kind: row.try_get("span_kind")?,
            start_time: row.try_get("start_time")?,
            end_time: row.try_get("end_time")?,
            duration_ms: row.try_get("duration_ms")?,
            status_code: row.try_get("status_code")?,
            status_message: row.try_get("status_message")?,
            attributes,
            events,
            links,
            depth: row.try_get("depth")?,
            path: row.try_get("path")?,
            root_span_id: row.try_get("root_span_id")?,
            span_order: row.try_get("span_order")?,
            input,
            output,
            service_name: row.try_get("service_name")?,
        };
        Ok(span)
    }
}
/// Optional criteria for querying traces.
///
/// Every field is an `Option`, and the derived `Default` leaves all of them
/// `None`. Exposed to Python through `#[pyclass]` with read/write attributes
/// (`#[pyo3(get, set)]`).
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[pyclass]
pub struct TraceFilters {
#[pyo3(get, set)]
pub service_name: Option<String>,
#[pyo3(get, set)]
pub has_errors: Option<bool>,
#[pyo3(get, set)]
pub status_code: Option<i32>,
#[pyo3(get, set)]
pub start_time: Option<DateTime<Utc>>,
#[pyo3(get, set)]
pub end_time: Option<DateTime<Utc>>,
#[pyo3(get, set)]
pub limit: Option<i32>,
#[pyo3(get, set)]
pub cursor_start_time: Option<DateTime<Utc>>,
// `parsed_cursor_trace_id` expects this to be hex that decodes to 16 bytes.
#[pyo3(get, set)]
pub cursor_trace_id: Option<String>,
// Not a constructor argument. The Rust builders `next_page` / `previous_page`
// set it to "next" / "previous" respectively.
#[pyo3(get, set)]
pub direction: Option<String>,
#[pyo3(get, set)]
pub attribute_filters: Option<Vec<String>>,
// `parsed_trace_ids` expects each entry to be hex that decodes to 16 bytes.
#[pyo3(get, set)]
pub trace_ids: Option<Vec<String>>,
#[pyo3(get, set)]
pub entity_uid: Option<String>,
#[pyo3(get, set)]
pub queue_uid: Option<String>,
}
#[pymethods]
#[allow(clippy::too_many_arguments)]
impl TraceFilters {
#[new]
#[pyo3(signature = (
service_name=None,
has_errors=None,
status_code=None,
start_time=None,
end_time=None,
limit=None,
cursor_start_time=None,
cursor_trace_id=None,
attribute_filters=None,
trace_ids=None,
entity_uid=None,
queue_uid=None
))]
pub fn new(
service_name: Option<String>,
has_errors: Option<bool>,
status_code: Option<i32>,
start_time: Option<DateTime<Utc>>,
end_time: Option<DateTime<Utc>>,
limit: Option<i32>,
cursor_start_time: Option<DateTime<Utc>>,
cursor_trace_id: Option<String>,
attribute_filters: Option<Vec<String>>,
trace_ids: Option<Vec<String>>,
entity_uid: Option<String>,
queue_uid: Option<String>,
) -> Self {
TraceFilters {
service_name,
has_errors,
status_code,
start_time,
end_time,
limit,
cursor_start_time,
cursor_trace_id,
direction: None,
attribute_filters,
trace_ids,
entity_uid,
queue_uid,
}
}
}
/// Rust-side builder helpers and ID parsing for [`TraceFilters`].
///
/// The builder methods take `self` by value and return it, so they can be
/// chained. None of these methods are exposed to Python.
impl TraceFilters {
    /// Sets `service_name`.
    pub fn with_service(mut self, service: impl Into<String>) -> Self {
        self.service_name = Some(service.into());
        self
    }

    /// Sets `has_errors` to `Some(true)`.
    pub fn with_errors_only(mut self) -> Self {
        self.has_errors = Some(true);
        self
    }

    /// Sets both `start_time` and `end_time`.
    pub fn with_time_range(mut self, start: DateTime<Utc>, end: DateTime<Utc>) -> Self {
        self.start_time = Some(start);
        self.end_time = Some(end);
        self
    }

    /// Sets the pagination cursor (`cursor_start_time` and `cursor_trace_id`)
    /// without touching `direction`.
    pub fn with_cursor(mut self, started_at: DateTime<Utc>, trace_id: impl Into<String>) -> Self {
        self.cursor_start_time = Some(started_at);
        self.cursor_trace_id = Some(trace_id.into());
        self
    }

    /// Sets `limit`.
    pub fn limit(mut self, limit: i32) -> Self {
        self.limit = Some(limit);
        self
    }

    /// Points the filter at the page after `cursor`: copies the cursor fields
    /// and sets `direction` to `"next"`.
    pub fn next_page(mut self, cursor: &TraceCursor) -> Self {
        self.cursor_start_time = Some(cursor.start_time);
        self.cursor_trace_id = Some(cursor.trace_id.clone());
        self.direction = Some("next".to_string());
        self
    }

    /// Resets pagination: clears the cursor fields and `direction`.
    pub fn first_page(mut self) -> Self {
        self.cursor_start_time = None;
        self.cursor_trace_id = None;
        // `direction` is only ever set together with a cursor by
        // `next_page` / `previous_page`; leaving it behind would hand a
        // recycled filter a stale "previous"/"next" with no cursor to apply
        // it to.
        self.direction = None;
        self
    }

    /// Points the filter at the page before `cursor`: copies the cursor
    /// fields and sets `direction` to `"previous"`.
    pub fn previous_page(mut self, cursor: &TraceCursor) -> Self {
        self.cursor_start_time = Some(cursor.start_time);
        self.cursor_trace_id = Some(cursor.trace_id.clone());
        self.direction = Some("previous".to_string());
        self
    }

    /// Sets `entity_uid`.
    pub fn with_entity_uid(mut self, entity_uid: impl Into<String>) -> Self {
        self.entity_uid = Some(entity_uid.into());
        self
    }

    /// Sets `queue_uid`.
    pub fn with_queue_uid(mut self, queue_uid: impl Into<String>) -> Self {
        self.queue_uid = Some(queue_uid.into());
        self
    }

    /// Hex-decodes every entry of `trace_ids`.
    ///
    /// Returns `Ok(None)` when `trace_ids` is unset, otherwise the decoded
    /// IDs in their original order.
    ///
    /// # Errors
    /// Fails on the first entry that is not valid hex or that does not decode
    /// to exactly 16 bytes.
    pub fn parsed_trace_ids(&self) -> Result<Option<Vec<Vec<u8>>>, TypeError> {
        self.trace_ids
            .as_ref()
            .map(|ids| {
                ids.iter()
                    .map(|id| -> Result<Vec<u8>, TypeError> {
                        let bytes = hex::decode(id)?;
                        if bytes.len() != 16 {
                            return Err(TypeError::InvalidLength(format!(
                                "Invalid trace_id length: expected 16 bytes, got {} for '{}'",
                                bytes.len(),
                                id
                            )));
                        }
                        Ok(bytes)
                    })
                    .collect::<Result<Vec<_>, TypeError>>()
            })
            .transpose()
    }

    /// Hex-decodes `cursor_trace_id`.
    ///
    /// Returns `Ok(None)` when `cursor_trace_id` is unset.
    ///
    /// # Errors
    /// Fails if the value is not valid hex or does not decode to exactly
    /// 16 bytes.
    pub fn parsed_cursor_trace_id(&self) -> Result<Option<Vec<u8>>, TypeError> {
        self.cursor_trace_id
            .as_ref()
            .map(|id| -> Result<Vec<u8>, TypeError> {
                let bytes = hex::decode(id)?;
                if bytes.len() != 16 {
                    return Err(TypeError::InvalidLength(format!(
                        "Invalid cursor_trace_id length: expected 16 bytes, got {}",
                        bytes.len()
                    )));
                }
                Ok(bytes)
            })
            .transpose()
    }
}
/// Trace metrics for one bucket, identified by `bucket_start`.
///
/// With the `server` feature enabled, `sqlx::FromRow` is derived for this
/// type. Exposed to Python through `#[pyclass]` with read-only getters
/// (`#[pyo3(get)]`) on every field.
#[cfg_attr(feature = "server", derive(sqlx::FromRow))]
#[derive(Debug, Serialize, Deserialize, Clone)]
#[pyclass]
pub struct TraceMetricBucket {
#[pyo3(get)]
pub bucket_start: DateTime<Utc>,
#[pyo3(get)]
pub trace_count: i64,
#[pyo3(get)]
pub avg_duration_ms: f64,
// The three percentile fields are optional, unlike `avg_duration_ms`.
#[pyo3(get)]
pub p50_duration_ms: Option<f64>,
#[pyo3(get)]
pub p95_duration_ms: Option<f64>,
#[pyo3(get)]
pub p99_duration_ms: Option<f64>,
// NOTE(review): scale of `error_rate` (0..1 fraction vs. percentage) is not
// visible here — confirm against the producing query.
#[pyo3(get)]
pub error_rate: f64,
}
#[pymethods]
impl TraceMetricBucket {
/// Python `__str__`: returns whatever `PyHelperFuncs::__str__` produces
/// for this bucket.
pub fn __str__(&self) -> String {
PyHelperFuncs::__str__(self)
}
}