Skip to main content

scouter_types/trace/
sql.rs

1use crate::error::TypeError;
2use crate::json_to_pyobject_value;
3use crate::trace::{Attribute, SpanEvent, SpanLink};
4use crate::PyHelperFuncs;
5use crate::TraceCursor;
6use chrono::{DateTime, Utc};
7use pyo3::prelude::*;
8use pyo3::IntoPyObjectExt;
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11#[cfg(feature = "server")]
12use sqlx::{postgres::PgRow, FromRow, Row};
13
14#[derive(Debug, Serialize, Deserialize, Clone)]
15#[pyclass]
16pub struct TraceListItem {
17    #[pyo3(get)]
18    pub trace_id: String,
19    #[pyo3(get)]
20    pub service_name: String,
21    #[pyo3(get)]
22    pub scope_name: String,
23    #[pyo3(get)]
24    pub scope_version: String,
25    #[pyo3(get)]
26    pub root_operation: String,
27    #[pyo3(get)]
28    pub start_time: DateTime<Utc>,
29    #[pyo3(get)]
30    pub end_time: Option<DateTime<Utc>>,
31    #[pyo3(get)]
32    pub duration_ms: Option<i64>,
33    #[pyo3(get)]
34    pub status_code: i32,
35    #[pyo3(get)]
36    pub status_message: Option<String>,
37    #[pyo3(get)]
38    pub span_count: i64,
39    #[pyo3(get)]
40    pub has_errors: bool,
41    #[pyo3(get)]
42    pub error_count: i64,
43    #[pyo3(get)]
44    pub resource_attributes: Vec<Attribute>,
45    #[pyo3(get)]
46    pub entity_ids: Vec<String>,
47    #[pyo3(get)]
48    pub queue_ids: Vec<String>,
49}
50
#[cfg(feature = "server")]
impl FromRow<'_, PgRow> for TraceListItem {
    /// Builds a [`TraceListItem`] from a PostgreSQL row.
    ///
    /// # Errors
    /// Returns the `sqlx::Error` raised by the first column that is missing or
    /// cannot be decoded into the expected Rust type.
    ///
    /// Note that a `resource_attributes` value that is present but does not
    /// deserialize into `Vec<Attribute>` is *not* an error: it silently becomes
    /// an empty list.
    fn from_row(row: &PgRow) -> Result<Self, sqlx::Error> {
        // Read the raw JSON first (this is the only fallible step for the
        // column), then decode it leniently.
        let raw_attributes: Value = row.try_get("resource_attributes")?;
        let resource_attributes =
            serde_json::from_value::<Vec<Attribute>>(raw_attributes).unwrap_or_default();

        let item = Self {
            trace_id: row.try_get("trace_id")?,
            service_name: row.try_get("service_name")?,
            scope_name: row.try_get("scope_name")?,
            scope_version: row.try_get("scope_version")?,
            root_operation: row.try_get("root_operation")?,
            start_time: row.try_get("start_time")?,
            end_time: row.try_get("end_time")?,
            duration_ms: row.try_get("duration_ms")?,
            status_code: row.try_get("status_code")?,
            status_message: row.try_get("status_message")?,
            span_count: row.try_get("span_count")?,
            has_errors: row.try_get("has_errors")?,
            error_count: row.try_get("error_count")?,
            resource_attributes,
            // These two are not sourced from the row; they always start empty.
            entity_ids: Vec::new(),
            queue_ids: Vec::new(),
        };

        Ok(item)
    }
}
76
77#[pymethods]
78impl TraceListItem {
79    pub fn __str__(&self) -> String {
80        PyHelperFuncs::__str__(self)
81    }
82}
83
84#[derive(Debug, Serialize, Deserialize, Clone)]
85#[pyclass]
86pub struct TraceSpan {
87    #[pyo3(get)]
88    pub trace_id: String,
89    #[pyo3(get)]
90    pub span_id: String,
91    #[pyo3(get)]
92    pub parent_span_id: Option<String>,
93    #[pyo3(get)]
94    pub span_name: String,
95    #[pyo3(get)]
96    pub span_kind: Option<String>,
97    #[pyo3(get)]
98    pub start_time: DateTime<Utc>,
99    #[pyo3(get)]
100    pub end_time: DateTime<Utc>,
101    #[pyo3(get)]
102    pub duration_ms: i64,
103    #[pyo3(get)]
104    pub status_code: i32,
105    #[pyo3(get)]
106    pub status_message: Option<String>,
107    #[pyo3(get)]
108    pub attributes: Vec<Attribute>,
109    #[pyo3(get)]
110    pub events: Vec<SpanEvent>,
111    #[pyo3(get)]
112    pub links: Vec<SpanLink>,
113    #[pyo3(get)]
114    pub depth: i32,
115    #[pyo3(get)]
116    pub path: Vec<String>,
117    #[pyo3(get)]
118    pub root_span_id: String,
119    #[pyo3(get)]
120    pub service_name: String,
121    #[pyo3(get)]
122    pub span_order: i32,
123    pub input: Option<Value>,
124    pub output: Option<Value>,
125}
126
127#[pymethods]
128impl TraceSpan {
129    pub fn __str__(&self) -> String {
130        PyHelperFuncs::__str__(self)
131    }
132
133    #[getter]
134    pub fn input<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyAny>, TypeError> {
135        let input = match &self.input {
136            Some(v) => v,
137            None => &Value::Null,
138        };
139        Ok(json_to_pyobject_value(py, input)?
140            .into_bound_py_any(py)?
141            .clone())
142    }
143
144    #[getter]
145    pub fn output<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyAny>, TypeError> {
146        let output = match &self.output {
147            Some(v) => v,
148            None => &Value::Null,
149        };
150        Ok(json_to_pyobject_value(py, output)?
151            .into_bound_py_any(py)?
152            .clone())
153    }
154}
155
#[cfg(feature = "server")]
impl FromRow<'_, PgRow> for TraceSpan {
    /// Builds a [`TraceSpan`] from a PostgreSQL row.
    ///
    /// # Errors
    /// Returns the `sqlx::Error` raised by the first column that is missing or
    /// cannot be decoded into the expected Rust type.
    ///
    /// The `attributes`, `events` and `links` columns are decoded leniently: a
    /// JSON value that does not match the expected shape silently becomes an
    /// empty list instead of an error.
    fn from_row(row: &PgRow) -> Result<Self, sqlx::Error> {
        // JSON-backed collections: fetch the raw value (fallible), then decode
        // with an empty-list fallback.
        let raw_attributes: Value = row.try_get("attributes")?;
        let attributes =
            serde_json::from_value::<Vec<Attribute>>(raw_attributes).unwrap_or_default();

        let raw_events: Value = row.try_get("events")?;
        let events = serde_json::from_value::<Vec<SpanEvent>>(raw_events).unwrap_or_default();

        let raw_links: Value = row.try_get("links")?;
        let links = serde_json::from_value::<Vec<SpanLink>>(raw_links).unwrap_or_default();

        // Payload columns are nullable and kept as raw JSON.
        let input = row.try_get::<Option<Value>, _>("input")?;
        let output = row.try_get::<Option<Value>, _>("output")?;

        let span = Self {
            trace_id: row.try_get("trace_id")?,
            span_id: row.try_get("span_id")?,
            parent_span_id: row.try_get("parent_span_id")?,
            span_name: row.try_get("span_name")?,
            span_kind: row.try_get("span_kind")?,
            start_time: row.try_get("start_time")?,
            end_time: row.try_get("end_time")?,
            duration_ms: row.try_get("duration_ms")?,
            status_code: row.try_get("status_code")?,
            status_message: row.try_get("status_message")?,
            attributes,
            events,
            links,
            depth: row.try_get("depth")?,
            path: row.try_get("path")?,
            root_span_id: row.try_get("root_span_id")?,
            span_order: row.try_get("span_order")?,
            input,
            output,
            service_name: row.try_get("service_name")?,
        };

        Ok(span)
    }
}
192
193#[derive(Debug, Default, Clone, Serialize, Deserialize)]
194#[pyclass]
195pub struct TraceFilters {
196    #[pyo3(get, set)]
197    pub service_name: Option<String>,
198    #[pyo3(get, set)]
199    pub has_errors: Option<bool>,
200    #[pyo3(get, set)]
201    pub status_code: Option<i32>,
202    #[pyo3(get, set)]
203    pub start_time: Option<DateTime<Utc>>,
204    #[pyo3(get, set)]
205    pub end_time: Option<DateTime<Utc>>,
206    #[pyo3(get, set)]
207    pub limit: Option<i32>,
208    #[pyo3(get, set)]
209    pub cursor_start_time: Option<DateTime<Utc>>,
210    #[pyo3(get, set)]
211    pub cursor_trace_id: Option<String>,
212    #[pyo3(get, set)]
213    pub direction: Option<String>,
214    #[pyo3(get, set)]
215    pub attribute_filters: Option<Vec<String>>,
216    #[pyo3(get, set)]
217    pub trace_ids: Option<Vec<String>>,
218    #[pyo3(get, set)]
219    pub entity_uid: Option<String>,
220    #[pyo3(get, set)]
221    pub queue_uid: Option<String>,
222}
223
224#[pymethods]
225#[allow(clippy::too_many_arguments)]
226impl TraceFilters {
227    #[new]
228    #[pyo3(signature = (
229        service_name=None,
230        has_errors=None,
231        status_code=None,
232        start_time=None,
233        end_time=None,
234        limit=None,
235        cursor_start_time=None,
236        cursor_trace_id=None,
237        attribute_filters=None,
238        trace_ids=None,
239        entity_uid=None,
240        queue_uid=None
241    ))]
242    pub fn new(
243        service_name: Option<String>,
244        has_errors: Option<bool>,
245        status_code: Option<i32>,
246        start_time: Option<DateTime<Utc>>,
247        end_time: Option<DateTime<Utc>>,
248        limit: Option<i32>,
249        cursor_start_time: Option<DateTime<Utc>>,
250        cursor_trace_id: Option<String>,
251        attribute_filters: Option<Vec<String>>,
252        trace_ids: Option<Vec<String>>,
253        entity_uid: Option<String>,
254        queue_uid: Option<String>,
255    ) -> Self {
256        TraceFilters {
257            service_name,
258            has_errors,
259            status_code,
260            start_time,
261            end_time,
262            limit,
263            cursor_start_time,
264            cursor_trace_id,
265            direction: None,
266            attribute_filters,
267            trace_ids,
268            entity_uid,
269            queue_uid,
270        }
271    }
272}
273
274impl TraceFilters {
275    pub fn with_service(mut self, service: impl Into<String>) -> Self {
276        self.service_name = Some(service.into());
277        self
278    }
279
280    pub fn with_errors_only(mut self) -> Self {
281        self.has_errors = Some(true);
282        self
283    }
284
285    pub fn with_time_range(mut self, start: DateTime<Utc>, end: DateTime<Utc>) -> Self {
286        self.start_time = Some(start);
287        self.end_time = Some(end);
288        self
289    }
290
291    pub fn with_cursor(mut self, started_at: DateTime<Utc>, trace_id: impl Into<String>) -> Self {
292        self.cursor_start_time = Some(started_at);
293        self.cursor_trace_id = Some(trace_id.into());
294        self
295    }
296
297    pub fn limit(mut self, limit: i32) -> Self {
298        self.limit = Some(limit);
299        self
300    }
301
302    pub fn next_page(mut self, cursor: &TraceCursor) -> Self {
303        self.cursor_start_time = Some(cursor.start_time);
304        self.cursor_trace_id = Some(cursor.trace_id.clone());
305        self.direction = Some("next".to_string());
306        self
307    }
308
309    pub fn first_page(mut self) -> Self {
310        self.cursor_start_time = None;
311        self.cursor_trace_id = None;
312        self
313    }
314
315    pub fn previous_page(mut self, cursor: &TraceCursor) -> Self {
316        self.cursor_start_time = Some(cursor.start_time);
317        self.cursor_trace_id = Some(cursor.trace_id.clone());
318        self.direction = Some("previous".to_string());
319        self
320    }
321
322    pub fn with_entity_uid(mut self, entity_uid: impl Into<String>) -> Self {
323        self.entity_uid = Some(entity_uid.into());
324        self
325    }
326
327    pub fn with_queue_uid(mut self, queue_uid: impl Into<String>) -> Self {
328        self.queue_uid = Some(queue_uid.into());
329        self
330    }
331
332    /// Convert hex trace_ids directly to byte vectors for PostgreSQL BYTEA binding
333    /// Returns None if no trace_ids are set
334    /// Returns error if any trace_id is invalid hex or wrong length
335    pub fn parsed_trace_ids(&self) -> Result<Option<Vec<Vec<u8>>>, TypeError> {
336        match &self.trace_ids {
337            None => Ok(None),
338            Some(ids) => {
339                let parsed: Result<Vec<Vec<u8>>, _> = ids
340                    .iter()
341                    .map(|hex| {
342                        let bytes = hex::decode(hex)?;
343
344                        if bytes.len() != 16 {
345                            return Err(TypeError::InvalidLength(format!(
346                                "Invalid trace_id length: expected 16 bytes, got {} for '{}'",
347                                bytes.len(),
348                                hex
349                            )));
350                        }
351
352                        Ok(bytes)
353                    })
354                    .collect();
355
356                parsed.map(Some)
357            }
358        }
359    }
360
361    /// Convert hex cursor_trace_id directly to bytes for PostgreSQL BYTEA binding
362    pub fn parsed_cursor_trace_id(&self) -> Result<Option<Vec<u8>>, TypeError> {
363        match &self.cursor_trace_id {
364            None => Ok(None),
365            Some(hex) => {
366                let bytes = hex::decode(hex)?;
367
368                if bytes.len() != 16 {
369                    return Err(TypeError::InvalidLength(format!(
370                        "Invalid cursor_trace_id length: expected 16 bytes, got {}",
371                        bytes.len()
372                    )));
373                }
374
375                Ok(Some(bytes))
376            }
377        }
378    }
379}
380
381#[cfg_attr(feature = "server", derive(sqlx::FromRow))]
382#[derive(Debug, Serialize, Deserialize, Clone)]
383#[pyclass]
384pub struct TraceMetricBucket {
385    #[pyo3(get)]
386    pub bucket_start: DateTime<Utc>,
387    #[pyo3(get)]
388    pub trace_count: i64,
389    #[pyo3(get)]
390    pub avg_duration_ms: f64,
391    #[pyo3(get)]
392    pub p50_duration_ms: Option<f64>,
393    #[pyo3(get)]
394    pub p95_duration_ms: Option<f64>,
395    #[pyo3(get)]
396    pub p99_duration_ms: Option<f64>,
397    #[pyo3(get)]
398    pub error_rate: f64,
399}
400
401#[pymethods]
402impl TraceMetricBucket {
403    pub fn __str__(&self) -> String {
404        PyHelperFuncs::__str__(self)
405    }
406}