Skip to main content

sc_observability_types/
query.rs

1use std::sync::LazyLock;
2
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5use thiserror::Error;
6
7use crate::{
8    ActionName, CorrelationId, Diagnostic, DiagnosticInfo, ErrorCode, ErrorContext, Level,
9    LogEvent, Remediation, ServiceName, TargetCategory, Timestamp, error_codes, sealed,
10};
11
12/// Deterministic result ordering for historical query and follow polling.
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
14pub enum LogOrder {
15    #[default]
16    /// Return records from oldest to newest.
17    OldestFirst,
18    /// Return records from newest to oldest.
19    NewestFirst,
20}
21
22/// One exact-match field filter in a historical/follow log query.
23#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
24pub struct LogFieldMatch {
25    /// Structured field name to compare.
26    pub field: String,
27    /// Exact JSON value to match.
28    pub value: Value,
29}
30
31impl LogFieldMatch {
32    /// Creates an exact-value field match.
33    #[must_use]
34    pub fn equals(field: impl Into<String>, value: Value) -> Self {
35        Self {
36            field: field.into(),
37            value,
38        }
39    }
40}
41
42/// Shared historical/follow query contract used by the logging reader/runtime layers.
43#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
44pub struct LogQuery {
45    /// Optional service filter.
46    pub service: Option<ServiceName>,
47    /// Allowed severity levels; empty means any level.
48    pub levels: Vec<Level>,
49    /// Optional target/category filter.
50    pub target: Option<TargetCategory>,
51    /// Optional action filter.
52    pub action: Option<ActionName>,
53    /// Optional request identifier filter.
54    pub request_id: Option<CorrelationId>,
55    /// Optional correlation identifier filter.
56    pub correlation_id: Option<CorrelationId>,
57    /// Optional inclusive lower timestamp bound.
58    pub since: Option<Timestamp>,
59    /// Optional inclusive upper timestamp bound.
60    pub until: Option<Timestamp>,
61    /// Exact-match field predicates.
62    pub field_matches: Vec<LogFieldMatch>,
63    /// Optional maximum number of returned events.
64    pub limit: Option<usize>,
65    /// Result ordering.
66    pub order: LogOrder,
67}
68
69impl LogQuery {
70    /// Validates the frozen shared query semantics before runtime execution.
71    ///
72    /// # Errors
73    ///
74    /// Returns [`QueryError::InvalidQuery`] when the query violates the shared
75    /// contract for limit or timestamp-range semantics.
76    pub fn validate(&self) -> Result<(), QueryError> {
77        if self.limit == Some(0) {
78            return Err(QueryError::invalid_query(
79                "query limit must be greater than zero when provided",
80            ));
81        }
82
83        if matches!((self.since, self.until), (Some(since), Some(until)) if since > until) {
84            return Err(QueryError::invalid_query(
85                "query since timestamp must be less than or equal to until",
86            ));
87        }
88
89        if self
90            .field_matches
91            .iter()
92            .any(|field_match| field_match.field.trim().is_empty())
93        {
94            return Err(QueryError::invalid_query(
95                "query field match names must not be empty",
96            ));
97        }
98
99        Ok(())
100    }
101}
102
103/// Stable synchronous result contract returned by query/follow polling surfaces.
104#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
105pub struct LogSnapshot {
106    /// Matching events returned by the query or poll.
107    pub events: Vec<LogEvent>,
108    /// Whether the result set was truncated by the configured limit.
109    pub truncated: bool,
110}
111
112/// Stable shared error contract for historical query and follow operations.
113#[derive(Debug, PartialEq, Serialize, Deserialize, Error)]
114pub enum QueryError {
115    #[error("{0}")]
116    /// The query contract was invalid before execution.
117    InvalidQuery(#[source] Box<ErrorContext>),
118    #[error("{0}")]
119    /// I/O failed while reading log data.
120    Io(#[source] Box<ErrorContext>),
121    #[error("{0}")]
122    /// A JSONL record failed to decode.
123    Decode(#[source] Box<ErrorContext>),
124    #[error("{0}")]
125    /// Query or follow is unavailable in the current runtime state.
126    Unavailable(#[source] Box<ErrorContext>),
127    #[error("query runtime shut down")]
128    /// The query runtime was shut down.
129    Shutdown,
130}
131
132impl QueryError {
133    /// Returns the stable machine-readable error code for this variant.
134    #[must_use]
135    pub fn code(&self) -> ErrorCode {
136        match self {
137            Self::InvalidQuery(_) => error_codes::SC_LOG_QUERY_INVALID_QUERY,
138            Self::Io(_) => error_codes::SC_LOG_QUERY_IO,
139            Self::Decode(_) => error_codes::SC_LOG_QUERY_DECODE,
140            Self::Unavailable(_) => error_codes::SC_LOG_QUERY_UNAVAILABLE,
141            Self::Shutdown => error_codes::SC_LOG_QUERY_SHUTDOWN,
142        }
143    }
144
145    /// Returns the attached diagnostic for the error.
146    #[must_use]
147    pub fn diagnostic(&self) -> &Diagnostic {
148        match self {
149            Self::InvalidQuery(context)
150            | Self::Io(context)
151            | Self::Decode(context)
152            | Self::Unavailable(context) => context.diagnostic(),
153            Self::Shutdown => shutdown_diagnostic(),
154        }
155    }
156
157    /// Builds an invalid-query error using the stable shared code.
158    #[must_use]
159    pub fn invalid_query(message: impl Into<String>) -> Self {
160        Self::InvalidQuery(Box::new(ErrorContext::new(
161            error_codes::SC_LOG_QUERY_INVALID_QUERY,
162            message,
163            Remediation::recoverable("correct the query parameters", ["retry the query"]),
164        )))
165    }
166}
167
168impl sealed::Sealed for QueryError {}
169
170impl DiagnosticInfo for QueryError {
171    fn diagnostic(&self) -> &Diagnostic {
172        self.diagnostic()
173    }
174}
175
176fn shutdown_diagnostic() -> &'static Diagnostic {
177    static DIAGNOSTIC: LazyLock<Diagnostic> = LazyLock::new(|| Diagnostic {
178        timestamp: Timestamp::UNIX_EPOCH,
179        code: error_codes::SC_LOG_QUERY_SHUTDOWN,
180        message: "query runtime shut down".to_string(),
181        cause: None,
182        remediation: Remediation::recoverable("restart the logger", ["retry"]),
183        docs: None,
184        details: serde_json::Map::new(),
185    });
186
187    &DIAGNOSTIC
188}
189
190#[cfg(test)]
191mod tests {
192    use serde_json::{Map, json};
193    use time::Duration;
194
195    use super::*;
196    use crate::{DiagnosticSummary, ProcessIdentity, TraceContext};
197
198    fn service_name() -> ServiceName {
199        ServiceName::new("sc-observability").expect("valid service name")
200    }
201
202    fn target_category() -> TargetCategory {
203        TargetCategory::new("logging.query").expect("valid target category")
204    }
205
206    fn action_name() -> ActionName {
207        ActionName::new("query.executed").expect("valid action name")
208    }
209
210    fn trace_context() -> TraceContext {
211        TraceContext {
212            trace_id: crate::TraceId::new("0123456789abcdef0123456789abcdef")
213                .expect("valid trace id"),
214            span_id: crate::SpanId::new("0123456789abcdef").expect("valid span id"),
215            parent_span_id: None,
216        }
217    }
218
219    fn diagnostic(code: ErrorCode, message: &str) -> Diagnostic {
220        Diagnostic {
221            timestamp: Timestamp::UNIX_EPOCH,
222            code,
223            message: message.to_string(),
224            cause: Some("root cause".to_string()),
225            remediation: Remediation::recoverable("fix input", ["retry"]),
226            docs: Some("https://example.test/query".to_string()),
227            details: Map::from_iter([("line".to_string(), json!(12))]),
228        }
229    }
230
231    fn log_event() -> LogEvent {
232        LogEvent {
233            version: crate::SchemaVersion::new("v1").expect("valid schema version"),
234            timestamp: Timestamp::UNIX_EPOCH,
235            level: Level::Info,
236            service: service_name(),
237            target: target_category(),
238            action: action_name(),
239            message: Some("query event".to_string()),
240            identity: ProcessIdentity::default(),
241            trace: Some(trace_context()),
242            request_id: Some(CorrelationId::new("req-1").expect("valid request id")),
243            correlation_id: Some(CorrelationId::new("corr-1").expect("valid correlation id")),
244            outcome: Some(crate::OutcomeLabel::new("success").expect("valid outcome")),
245            diagnostic: None,
246            state_transition: None,
247            fields: Map::from_iter([("status".to_string(), json!("ok"))]),
248        }
249    }
250
251    #[test]
252    fn log_query_round_trips_through_serde() {
253        let query = LogQuery {
254            service: Some(service_name()),
255            levels: vec![Level::Info, Level::Warn],
256            target: Some(target_category()),
257            action: Some(action_name()),
258            request_id: Some(CorrelationId::new("req-1").expect("valid request id")),
259            correlation_id: Some(CorrelationId::new("corr-1").expect("valid correlation id")),
260            since: Some(Timestamp::UNIX_EPOCH),
261            until: Some(Timestamp::UNIX_EPOCH + Duration::minutes(5)),
262            field_matches: vec![LogFieldMatch::equals("status", json!("ok"))],
263            limit: Some(25),
264            order: LogOrder::NewestFirst,
265        };
266
267        let encoded = serde_json::to_value(&query).expect("serialize query");
268        let decoded: LogQuery = serde_json::from_value(encoded).expect("deserialize query");
269        assert_eq!(decoded, query);
270        decoded.validate().expect("valid query");
271    }
272
273    #[test]
274    fn log_query_validation_rejects_invalid_ranges_and_limits() {
275        let invalid_limit = LogQuery {
276            limit: Some(0),
277            ..LogQuery::default()
278        };
279        let invalid_range = LogQuery {
280            since: Some(Timestamp::UNIX_EPOCH + Duration::hours(1)),
281            until: Some(Timestamp::UNIX_EPOCH),
282            ..LogQuery::default()
283        };
284        let invalid_field = LogQuery {
285            field_matches: vec![LogFieldMatch::equals("", json!("ok"))],
286            ..LogQuery::default()
287        };
288
289        assert_eq!(
290            invalid_limit.validate().expect_err("invalid limit").code(),
291            error_codes::SC_LOG_QUERY_INVALID_QUERY
292        );
293        assert_eq!(
294            invalid_range.validate().expect_err("invalid range").code(),
295            error_codes::SC_LOG_QUERY_INVALID_QUERY
296        );
297        assert_eq!(
298            invalid_field
299                .validate()
300                .expect_err("invalid field name")
301                .code(),
302            error_codes::SC_LOG_QUERY_INVALID_QUERY
303        );
304    }
305
306    #[test]
307    fn log_snapshot_round_trips_through_serde() {
308        let snapshot = LogSnapshot {
309            events: vec![log_event()],
310            truncated: true,
311        };
312
313        let encoded = serde_json::to_value(&snapshot).expect("serialize snapshot");
314        let decoded: LogSnapshot = serde_json::from_value(encoded).expect("deserialize snapshot");
315        assert_eq!(decoded, snapshot);
316    }
317
318    #[test]
319    fn query_error_variants_round_trip_and_match_stable_codes() {
320        let variants = [
321            QueryError::InvalidQuery(Box::new(ErrorContext::new(
322                error_codes::SC_LOG_QUERY_INVALID_QUERY,
323                "invalid query",
324                Remediation::recoverable("fix the query", ["retry"]),
325            ))),
326            QueryError::Io(Box::new(ErrorContext::new(
327                error_codes::SC_LOG_QUERY_IO,
328                "i/o failure",
329                Remediation::recoverable("check the log path", ["retry"]),
330            ))),
331            QueryError::Decode(Box::new(ErrorContext::new(
332                error_codes::SC_LOG_QUERY_DECODE,
333                "decode failure",
334                Remediation::recoverable("repair the malformed record", ["retry"]),
335            ))),
336            QueryError::Unavailable(Box::new(ErrorContext::new(
337                error_codes::SC_LOG_QUERY_UNAVAILABLE,
338                "query unavailable",
339                Remediation::recoverable("wait for logging to recover", ["retry"]),
340            ))),
341            QueryError::Shutdown,
342        ];
343
344        for error in variants {
345            let encoded = serde_json::to_value(&error).expect("serialize error");
346            let decoded: QueryError = serde_json::from_value(encoded).expect("deserialize error");
347            assert_eq!(decoded, error);
348            assert_eq!(decoded.diagnostic().code, decoded.code());
349        }
350    }
351
352    #[test]
353    fn query_shutdown_uses_static_diagnostic() {
354        let error = QueryError::Shutdown;
355        assert_eq!(error.code(), error_codes::SC_LOG_QUERY_SHUTDOWN);
356        assert_eq!(error.diagnostic().message, "query runtime shut down");
357    }
358
359    #[test]
360    fn query_health_report_round_trips_through_serde() {
361        let report = crate::QueryHealthReport {
362            state: crate::QueryHealthState::Degraded,
363            last_error: Some(DiagnosticSummary::from(&diagnostic(
364                error_codes::SC_LOG_QUERY_DECODE,
365                "decode failure",
366            ))),
367        };
368
369        let encoded = serde_json::to_value(&report).expect("serialize report");
370        let decoded: crate::QueryHealthReport =
371            serde_json::from_value(encoded).expect("deserialize report");
372        assert_eq!(decoded, report);
373    }
374}