1use std::sync::LazyLock;
2
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5use thiserror::Error;
6
7use crate::{
8 ActionName, CorrelationId, Diagnostic, DiagnosticInfo, ErrorCode, ErrorContext, Level,
9 LogEvent, Remediation, ServiceName, TargetCategory, Timestamp, error_codes, sealed,
10};
11
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
14pub enum LogOrder {
15 #[default]
16 OldestFirst,
18 NewestFirst,
20}
21
22#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
24pub struct LogFieldMatch {
25 pub field: String,
27 pub value: Value,
29}
30
31impl LogFieldMatch {
32 #[must_use]
34 pub fn equals(field: impl Into<String>, value: Value) -> Self {
35 Self {
36 field: field.into(),
37 value,
38 }
39 }
40}
41
42#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
44pub struct LogQuery {
45 pub service: Option<ServiceName>,
47 pub levels: Vec<Level>,
49 pub target: Option<TargetCategory>,
51 pub action: Option<ActionName>,
53 pub request_id: Option<CorrelationId>,
55 pub correlation_id: Option<CorrelationId>,
57 pub since: Option<Timestamp>,
59 pub until: Option<Timestamp>,
61 pub field_matches: Vec<LogFieldMatch>,
63 pub limit: Option<usize>,
65 pub order: LogOrder,
67}
68
69impl LogQuery {
70 pub fn validate(&self) -> Result<(), QueryError> {
77 if self.limit == Some(0) {
78 return Err(QueryError::invalid_query(
79 "query limit must be greater than zero when provided",
80 ));
81 }
82
83 if matches!((self.since, self.until), (Some(since), Some(until)) if since > until) {
84 return Err(QueryError::invalid_query(
85 "query since timestamp must be less than or equal to until",
86 ));
87 }
88
89 if self
90 .field_matches
91 .iter()
92 .any(|field_match| field_match.field.trim().is_empty())
93 {
94 return Err(QueryError::invalid_query(
95 "query field match names must not be empty",
96 ));
97 }
98
99 Ok(())
100 }
101}
102
103#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
105pub struct LogSnapshot {
106 pub events: Vec<LogEvent>,
108 pub truncated: bool,
110}
111
112#[derive(Debug, PartialEq, Serialize, Deserialize, Error)]
114pub enum QueryError {
115 #[error("{0}")]
116 InvalidQuery(#[source] Box<ErrorContext>),
118 #[error("{0}")]
119 Io(#[source] Box<ErrorContext>),
121 #[error("{0}")]
122 Decode(#[source] Box<ErrorContext>),
124 #[error("{0}")]
125 Unavailable(#[source] Box<ErrorContext>),
127 #[error("query runtime shut down")]
128 Shutdown,
130}
131
132impl QueryError {
133 #[must_use]
135 pub fn code(&self) -> ErrorCode {
136 match self {
137 Self::InvalidQuery(_) => error_codes::SC_LOG_QUERY_INVALID_QUERY,
138 Self::Io(_) => error_codes::SC_LOG_QUERY_IO,
139 Self::Decode(_) => error_codes::SC_LOG_QUERY_DECODE,
140 Self::Unavailable(_) => error_codes::SC_LOG_QUERY_UNAVAILABLE,
141 Self::Shutdown => error_codes::SC_LOG_QUERY_SHUTDOWN,
142 }
143 }
144
145 #[must_use]
147 pub fn diagnostic(&self) -> &Diagnostic {
148 match self {
149 Self::InvalidQuery(context)
150 | Self::Io(context)
151 | Self::Decode(context)
152 | Self::Unavailable(context) => context.diagnostic(),
153 Self::Shutdown => shutdown_diagnostic(),
154 }
155 }
156
157 #[must_use]
159 pub fn invalid_query(message: impl Into<String>) -> Self {
160 Self::InvalidQuery(Box::new(ErrorContext::new(
161 error_codes::SC_LOG_QUERY_INVALID_QUERY,
162 message,
163 Remediation::recoverable("correct the query parameters", ["retry the query"]),
164 )))
165 }
166}
167
// Marker impl with no items: opts `QueryError` into `sealed::Sealed`.
// NOTE(review): presumably required as a bound by `DiagnosticInfo` so only
// in-crate types can implement it — confirm against the trait definition.
impl sealed::Sealed for QueryError {}
169
170impl DiagnosticInfo for QueryError {
171 fn diagnostic(&self) -> &Diagnostic {
172 self.diagnostic()
173 }
174}
175
176fn shutdown_diagnostic() -> &'static Diagnostic {
177 static DIAGNOSTIC: LazyLock<Diagnostic> = LazyLock::new(|| Diagnostic {
178 timestamp: Timestamp::UNIX_EPOCH,
179 code: error_codes::SC_LOG_QUERY_SHUTDOWN,
180 message: "query runtime shut down".to_string(),
181 cause: None,
182 remediation: Remediation::recoverable("restart the logger", ["retry"]),
183 docs: None,
184 details: serde_json::Map::new(),
185 });
186
187 &DIAGNOSTIC
188}
189
#[cfg(test)]
mod tests {
    // Unit tests for the query model: serde round-trips for the public types,
    // `LogQuery::validate` accept/reject behaviour, and the error-code and
    // diagnostic mapping of `QueryError`.

    use serde_json::{Map, json};
    use time::Duration;

    use super::*;
    use crate::{DiagnosticSummary, ProcessIdentity, TraceContext};

    /// Fixture: a valid service name.
    fn service_name() -> ServiceName {
        ServiceName::new("sc-observability").expect("valid service name")
    }

    /// Fixture: a valid target category.
    fn target_category() -> TargetCategory {
        TargetCategory::new("logging.query").expect("valid target category")
    }

    /// Fixture: a valid action name.
    fn action_name() -> ActionName {
        ActionName::new("query.executed").expect("valid action name")
    }

    /// Fixture: a trace context with fixed ids and no parent span.
    fn trace_context() -> TraceContext {
        TraceContext {
            trace_id: crate::TraceId::new("0123456789abcdef0123456789abcdef")
                .expect("valid trace id"),
            span_id: crate::SpanId::new("0123456789abcdef").expect("valid span id"),
            parent_span_id: None,
        }
    }

    /// Fixture: a fully populated diagnostic with the given code and message.
    fn diagnostic(code: ErrorCode, message: &str) -> Diagnostic {
        Diagnostic {
            timestamp: Timestamp::UNIX_EPOCH,
            code,
            message: message.to_string(),
            cause: Some("root cause".to_string()),
            remediation: Remediation::recoverable("fix input", ["retry"]),
            docs: Some("https://example.test/query".to_string()),
            details: Map::from_iter([("line".to_string(), json!(12))]),
        }
    }

    /// Fixture: a log event with every optional section except diagnostic
    /// and state transition populated.
    fn log_event() -> LogEvent {
        LogEvent {
            version: crate::SchemaVersion::new("v1").expect("valid schema version"),
            timestamp: Timestamp::UNIX_EPOCH,
            level: Level::Info,
            service: service_name(),
            target: target_category(),
            action: action_name(),
            message: Some("query event".to_string()),
            identity: ProcessIdentity::default(),
            trace: Some(trace_context()),
            request_id: Some(CorrelationId::new("req-1").expect("valid request id")),
            correlation_id: Some(CorrelationId::new("corr-1").expect("valid correlation id")),
            outcome: Some(crate::OutcomeLabel::new("success").expect("valid outcome")),
            diagnostic: None,
            state_transition: None,
            fields: Map::from_iter([("status".to_string(), json!("ok"))]),
        }
    }

    #[test]
    fn log_query_round_trips_through_serde() {
        let query = LogQuery {
            service: Some(service_name()),
            levels: vec![Level::Info, Level::Warn],
            target: Some(target_category()),
            action: Some(action_name()),
            request_id: Some(CorrelationId::new("req-1").expect("valid request id")),
            correlation_id: Some(CorrelationId::new("corr-1").expect("valid correlation id")),
            since: Some(Timestamp::UNIX_EPOCH),
            until: Some(Timestamp::UNIX_EPOCH + Duration::minutes(5)),
            field_matches: vec![LogFieldMatch::equals("status", json!("ok"))],
            limit: Some(25),
            order: LogOrder::NewestFirst,
        };

        let encoded = serde_json::to_value(&query).expect("serialize query");
        let decoded: LogQuery = serde_json::from_value(encoded).expect("deserialize query");
        assert_eq!(decoded, query);
        decoded.validate().expect("valid query");
    }

    #[test]
    fn log_query_validation_rejects_invalid_ranges_and_limits() {
        let invalid_limit = LogQuery {
            limit: Some(0),
            ..LogQuery::default()
        };
        let invalid_range = LogQuery {
            since: Some(Timestamp::UNIX_EPOCH + Duration::hours(1)),
            until: Some(Timestamp::UNIX_EPOCH),
            ..LogQuery::default()
        };
        let invalid_field = LogQuery {
            field_matches: vec![LogFieldMatch::equals("", json!("ok"))],
            ..LogQuery::default()
        };

        assert_eq!(
            invalid_limit.validate().expect_err("invalid limit").code(),
            error_codes::SC_LOG_QUERY_INVALID_QUERY
        );
        assert_eq!(
            invalid_range.validate().expect_err("invalid range").code(),
            error_codes::SC_LOG_QUERY_INVALID_QUERY
        );
        assert_eq!(
            invalid_field
                .validate()
                .expect_err("invalid field name")
                .code(),
            error_codes::SC_LOG_QUERY_INVALID_QUERY
        );
    }

    #[test]
    fn log_query_validation_rejects_whitespace_only_field_names() {
        // `validate` trims the name before the emptiness check, so a name made
        // only of whitespace must be rejected just like an empty one.
        let blank_field = LogQuery {
            field_matches: vec![LogFieldMatch::equals(" \t ", json!("ok"))],
            ..LogQuery::default()
        };

        assert_eq!(
            blank_field.validate().expect_err("blank name").code(),
            error_codes::SC_LOG_QUERY_INVALID_QUERY
        );
    }

    #[test]
    fn log_query_validation_accepts_boundary_values() {
        // The range check is `since > until`, so equal bounds are valid.
        let instant_range = LogQuery {
            since: Some(Timestamp::UNIX_EPOCH),
            until: Some(Timestamp::UNIX_EPOCH),
            ..LogQuery::default()
        };
        instant_range.validate().expect("equal bounds are valid");

        // A query without any criteria is valid as well.
        LogQuery::default().validate().expect("default query");
    }

    #[test]
    fn log_snapshot_round_trips_through_serde() {
        let snapshot = LogSnapshot {
            events: vec![log_event()],
            truncated: true,
        };

        let encoded = serde_json::to_value(&snapshot).expect("serialize snapshot");
        let decoded: LogSnapshot = serde_json::from_value(encoded).expect("deserialize snapshot");
        assert_eq!(decoded, snapshot);
    }

    #[test]
    fn query_error_variants_round_trip_and_match_stable_codes() {
        let variants = [
            QueryError::InvalidQuery(Box::new(ErrorContext::new(
                error_codes::SC_LOG_QUERY_INVALID_QUERY,
                "invalid query",
                Remediation::recoverable("fix the query", ["retry"]),
            ))),
            QueryError::Io(Box::new(ErrorContext::new(
                error_codes::SC_LOG_QUERY_IO,
                "i/o failure",
                Remediation::recoverable("check the log path", ["retry"]),
            ))),
            QueryError::Decode(Box::new(ErrorContext::new(
                error_codes::SC_LOG_QUERY_DECODE,
                "decode failure",
                Remediation::recoverable("repair the malformed record", ["retry"]),
            ))),
            QueryError::Unavailable(Box::new(ErrorContext::new(
                error_codes::SC_LOG_QUERY_UNAVAILABLE,
                "query unavailable",
                Remediation::recoverable("wait for logging to recover", ["retry"]),
            ))),
            QueryError::Shutdown,
        ];

        for error in variants {
            let encoded = serde_json::to_value(&error).expect("serialize error");
            let decoded: QueryError = serde_json::from_value(encoded).expect("deserialize error");
            assert_eq!(decoded, error);
            // The diagnostic stored in each context must agree with the
            // variant-level code mapping.
            assert_eq!(decoded.diagnostic().code, decoded.code());
        }
    }

    #[test]
    fn query_shutdown_uses_static_diagnostic() {
        let error = QueryError::Shutdown;
        assert_eq!(error.code(), error_codes::SC_LOG_QUERY_SHUTDOWN);
        assert_eq!(error.diagnostic().message, "query runtime shut down");
        // The display text and the diagnostic message are declared separately;
        // pin both so they cannot drift apart unnoticed.
        assert_eq!(error.to_string(), "query runtime shut down");
    }

    #[test]
    fn query_health_report_round_trips_through_serde() {
        let report = crate::QueryHealthReport {
            state: crate::QueryHealthState::Degraded,
            last_error: Some(DiagnosticSummary::from(&diagnostic(
                error_codes::SC_LOG_QUERY_DECODE,
                "decode failure",
            ))),
        };

        let encoded = serde_json::to_value(&report).expect("serialize report");
        let decoded: crate::QueryHealthReport =
            serde_json::from_value(encoded).expect("deserialize report");
        assert_eq!(decoded, report);
    }
}