Skip to main content

runledger_postgres/
error.rs

1use std::{fmt, sync::Arc};
2
3use sqlx::error::ErrorKind;
4
5mod classify;
6
7#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8pub enum QueryErrorCategory {
9    Conflict,
10    Validation,
11    Forbidden,
12    Internal,
13}
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub struct FrameworkConstraintSpec {
17    category: QueryErrorCategory,
18    code: &'static str,
19    client_message: &'static str,
20}
21
22impl FrameworkConstraintSpec {
23    #[must_use]
24    pub const fn new(
25        category: QueryErrorCategory,
26        code: &'static str,
27        client_message: &'static str,
28    ) -> Self {
29        Self {
30            category,
31            code,
32            client_message,
33        }
34    }
35
36    #[must_use]
37    pub const fn category(&self) -> QueryErrorCategory {
38        self.category
39    }
40
41    #[must_use]
42    pub const fn code(&self) -> &'static str {
43        self.code
44    }
45
46    #[must_use]
47    pub const fn client_message(&self) -> &'static str {
48        self.client_message
49    }
50}
51
52#[derive(Clone)]
53pub struct QueryError {
54    category: QueryErrorCategory,
55    code: &'static str,
56    client_message: &'static str,
57    sqlstate: Option<String>,
58    constraint: Option<String>,
59    message: String,
60    source: Option<Arc<sqlx::Error>>,
61}
62
63impl QueryError {
64    #[must_use]
65    pub fn from_classified(
66        category: QueryErrorCategory,
67        code: &'static str,
68        client_message: &'static str,
69        internal_message: impl Into<String>,
70    ) -> Self {
71        Self {
72            category,
73            code,
74            client_message,
75            sqlstate: None,
76            constraint: None,
77            message: internal_message.into(),
78            source: None,
79        }
80    }
81
82    #[must_use]
83    pub(crate) fn from_classified_sqlx(
84        category: QueryErrorCategory,
85        code: &'static str,
86        client_message: &'static str,
87        internal_message: impl Into<String>,
88        source: sqlx::Error,
89    ) -> Self {
90        let (sqlstate, constraint) = source
91            .as_database_error()
92            .map(|database_error| {
93                (
94                    database_error.code().map(|code| code.into_owned()),
95                    database_error.constraint().map(ToOwned::to_owned),
96                )
97            })
98            .unwrap_or((None, None));
99
100        Self {
101            category,
102            code,
103            client_message,
104            sqlstate,
105            constraint,
106            message: internal_message.into(),
107            source: Some(Arc::new(source)),
108        }
109    }
110
111    #[must_use]
112    pub fn from_sqlx_with_constraint_classifier<F>(
113        error: sqlx::Error,
114        context: Option<&str>,
115        classify_constraint: F,
116    ) -> Self
117    where
118        F: Fn(&str) -> Option<FrameworkConstraintSpec>,
119    {
120        let (sqlstate, constraint, spec, raw_message) = if let Some(db) = error.as_database_error()
121        {
122            let sqlstate = db.code().map(|code| code.into_owned());
123            let constraint = db.constraint().map(ToOwned::to_owned);
124            let spec = classify_query_error_with_constraint_classifier(
125                &db.kind(),
126                sqlstate.as_deref(),
127                constraint.as_deref(),
128                classify_constraint,
129            );
130            (sqlstate, constraint, spec, db.message().to_owned())
131        } else {
132            (
133                None,
134                None,
135                QueryErrorSpec::internal().into(),
136                error.to_string(),
137            )
138        };
139
140        let message = match context {
141            Some(ctx) => format!("{ctx}: {raw_message}"),
142            None => raw_message,
143        };
144
145        Self {
146            category: spec.category(),
147            code: spec.code(),
148            client_message: spec.client_message(),
149            sqlstate,
150            constraint,
151            message,
152            source: Some(Arc::new(error)),
153        }
154    }
155
156    pub(crate) fn from_sqlx(error: sqlx::Error, context: Option<&str>) -> Self {
157        Self::from_sqlx_with_constraint_classifier(error, context, |_| None)
158    }
159
160    #[must_use]
161    pub const fn category(&self) -> QueryErrorCategory {
162        self.category
163    }
164
165    #[must_use]
166    pub const fn code(&self) -> &'static str {
167        self.code
168    }
169
170    #[must_use]
171    pub const fn client_message(&self) -> &'static str {
172        self.client_message
173    }
174
175    #[must_use]
176    pub fn sqlstate(&self) -> Option<&str> {
177        self.sqlstate.as_deref()
178    }
179
180    #[must_use]
181    pub fn constraint(&self) -> Option<&str> {
182        self.constraint.as_deref()
183    }
184
185    #[must_use]
186    pub fn internal_message(&self) -> &str {
187        &self.message
188    }
189
190    /// Returns the underlying SQLx error for trusted diagnostics.
191    ///
192    /// Public [`Display`](fmt::Display) and [`Debug`](fmt::Debug) output for
193    /// [`QueryError`] is sanitized, but the returned source may contain raw
194    /// database details. Do not log or expose it on untrusted boundaries without
195    /// redaction.
196    #[must_use]
197    pub fn source_arc(&self) -> Option<Arc<sqlx::Error>> {
198        self.source.clone()
199    }
200
201    #[must_use]
202    pub fn reclassified_with_constraint_classifier<F>(mut self, classify_constraint: F) -> Self
203    where
204        F: Fn(&str) -> Option<FrameworkConstraintSpec>,
205    {
206        let Some(spec) = self.constraint.as_deref().and_then(classify_constraint) else {
207            return self;
208        };
209
210        self.category = spec.category();
211        self.code = spec.code();
212        self.client_message = spec.client_message();
213        self
214    }
215}
216
217impl fmt::Debug for QueryError {
218    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219        f.debug_struct("QueryError")
220            .field("category", &self.category)
221            .field("code", &self.code)
222            .field("client_message", &self.client_message)
223            .field("sqlstate", &self.sqlstate)
224            .field("constraint", &self.constraint)
225            .field("has_source", &self.source.is_some())
226            .finish()
227    }
228}
229
230impl fmt::Display for QueryError {
231    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
232        write!(f, "{}", self.client_message)
233    }
234}
235
236impl std::error::Error for QueryError {
237    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
238        self.source
239            .as_deref()
240            .map(|source| source as &(dyn std::error::Error + 'static))
241    }
242}
243
244#[derive(Debug, Clone, Copy)]
245struct QueryErrorSpec {
246    category: QueryErrorCategory,
247    code: &'static str,
248    client_message: &'static str,
249}
250
251impl QueryErrorSpec {
252    const fn conflict(code: &'static str, client_message: &'static str) -> Self {
253        Self {
254            category: QueryErrorCategory::Conflict,
255            code,
256            client_message,
257        }
258    }
259
260    const fn validation(code: &'static str, client_message: &'static str) -> Self {
261        Self {
262            category: QueryErrorCategory::Validation,
263            code,
264            client_message,
265        }
266    }
267
268    const fn forbidden(code: &'static str, client_message: &'static str) -> Self {
269        Self {
270            category: QueryErrorCategory::Forbidden,
271            code,
272            client_message,
273        }
274    }
275
276    const fn internal() -> Self {
277        Self {
278            category: QueryErrorCategory::Internal,
279            code: "db.query_failed",
280            client_message: "Database operation failed.",
281        }
282    }
283}
284
285impl From<QueryErrorSpec> for FrameworkConstraintSpec {
286    fn from(spec: QueryErrorSpec) -> Self {
287        Self::new(spec.category, spec.code, spec.client_message)
288    }
289}
290
291#[must_use]
292pub fn classify_query_error(
293    kind: &ErrorKind,
294    sqlstate: Option<&str>,
295    constraint: Option<&str>,
296) -> FrameworkConstraintSpec {
297    classify_query_error_with_constraint_classifier(kind, sqlstate, constraint, |_| None)
298}
299
300#[must_use]
301pub fn classify_query_error_with_constraint_classifier<F>(
302    kind: &ErrorKind,
303    sqlstate: Option<&str>,
304    constraint: Option<&str>,
305    classify_constraint: F,
306) -> FrameworkConstraintSpec
307where
308    F: Fn(&str) -> Option<FrameworkConstraintSpec>,
309{
310    if let Some(spec) = constraint.and_then(classify_constraint) {
311        return spec;
312    }
313
314    classify_database_error(kind, sqlstate, constraint).into()
315}
316
317fn classify_database_error(
318    kind: &ErrorKind,
319    sqlstate: Option<&str>,
320    constraint: Option<&str>,
321) -> QueryErrorSpec {
322    if let Some(spec) = constraint.and_then(classify_constraint) {
323        return spec;
324    }
325
326    match (kind, sqlstate) {
327        (ErrorKind::UniqueViolation, _) | (_, Some("23505")) => {
328            QueryErrorSpec::conflict("db.unique_violation", "Resource already exists.")
329        }
330        (ErrorKind::ForeignKeyViolation, _) | (_, Some("23503")) => QueryErrorSpec::validation(
331            "db.related_resource_missing",
332            "Related resource does not exist.",
333        ),
334        (_, Some("23001")) => QueryErrorSpec::validation(
335            "db.related_resource_still_referenced",
336            "Related resource is still referenced and cannot be deleted.",
337        ),
338        (ErrorKind::CheckViolation, _) | (_, Some("23514")) => QueryErrorSpec::validation(
339            "db.business_rule_violation",
340            "Request violates a business rule.",
341        ),
342        (ErrorKind::NotNullViolation, _) | (_, Some("23502")) => {
343            QueryErrorSpec::validation("db.required_field_missing", "Required data is missing.")
344        }
345        (_, Some("42501")) => {
346            QueryErrorSpec::forbidden("db.permission_denied", "Operation is not allowed.")
347        }
348        _ => QueryErrorSpec::internal(),
349    }
350}
351
352fn classify_constraint(constraint: &str) -> Option<QueryErrorSpec> {
353    classify::classify_constraint(constraint)
354}
355
356#[must_use]
357pub fn classify_framework_constraint(constraint: &str) -> Option<FrameworkConstraintSpec> {
358    classify_constraint(constraint).map(FrameworkConstraintSpec::from)
359}
360
361#[must_use]
362pub fn has_framework_constraint_classifier(constraint: &str) -> bool {
363    classify_framework_constraint(constraint).is_some()
364}
365
366#[cfg(test)]
367mod tests {
368    use super::*;
369
370    #[test]
371    fn classifies_job_idempotency_constraint() {
372        let spec = classify_database_error(
373            &ErrorKind::UniqueViolation,
374            Some("23505"),
375            Some("uq_job_queue_type_idempotency_org"),
376        );
377        assert_eq!(spec.category, QueryErrorCategory::Conflict);
378        assert_eq!(spec.code, "job.already_enqueued");
379    }
380
381    #[test]
382    fn classifies_global_job_idempotency_constraint() {
383        let spec = classify_database_error(
384            &ErrorKind::UniqueViolation,
385            Some("23505"),
386            Some("uq_job_queue_type_idempotency_global"),
387        );
388        assert_eq!(spec.category, QueryErrorCategory::Conflict);
389        assert_eq!(spec.code, "job.already_enqueued");
390    }
391
392    #[test]
393    fn classifies_workflow_idempotency_constraint() {
394        let spec = classify_database_error(
395            &ErrorKind::UniqueViolation,
396            Some("23505"),
397            Some("uq_workflow_runs_type_idempotency_org"),
398        );
399        assert_eq!(spec.category, QueryErrorCategory::Conflict);
400        assert_eq!(spec.code, "workflow.already_enqueued");
401    }
402
403    #[test]
404    fn classifies_global_workflow_idempotency_constraint() {
405        let spec = classify_database_error(
406            &ErrorKind::UniqueViolation,
407            Some("23505"),
408            Some("uq_workflow_runs_type_idempotency_global"),
409        );
410        assert_eq!(spec.category, QueryErrorCategory::Conflict);
411        assert_eq!(spec.code, "workflow.already_enqueued");
412    }
413
414    #[test]
415    fn classifies_job_definition_fk_constraint() {
416        let spec = classify_database_error(
417            &ErrorKind::ForeignKeyViolation,
418            Some("23503"),
419            Some("fk_job_queue_job_type"),
420        );
421        assert_eq!(spec.category, QueryErrorCategory::Validation);
422        assert_eq!(spec.code, "job.definition_not_found");
423    }
424
425    #[test]
426    fn classifies_job_runtime_config_definition_fk_constraint() {
427        let spec = classify_database_error(
428            &ErrorKind::ForeignKeyViolation,
429            Some("23503"),
430            Some("fk_job_runtime_configs_job_type"),
431        );
432        assert_eq!(spec.category, QueryErrorCategory::Validation);
433        assert_eq!(spec.code, "job.definition_not_found");
434    }
435
436    #[test]
437    fn classifies_job_organization_fk_constraint() {
438        let spec = classify_database_error(
439            &ErrorKind::ForeignKeyViolation,
440            Some("23503"),
441            Some("fk_job_queue_organization"),
442        );
443        assert_eq!(spec.category, QueryErrorCategory::Validation);
444        assert_eq!(spec.code, "job.organization_not_found");
445    }
446
447    #[test]
448    fn classifies_workflow_linkage_symmetry_constraint() {
449        let spec = classify_database_error(
450            &ErrorKind::CheckViolation,
451            Some("23514"),
452            Some("os_workflow_job_linkage_symmetry"),
453        );
454        assert_eq!(spec.category, QueryErrorCategory::Validation);
455        assert_eq!(spec.code, "workflow.linkage_symmetry_violation");
456    }
457
458    #[test]
459    fn classifies_workflow_linkage_symmetry_trigger_table_constraint() {
460        let spec = classify_database_error(
461            &ErrorKind::CheckViolation,
462            Some("23514"),
463            Some("os_workflow_job_linkage_symmetry_trigger_table"),
464        );
465        assert_eq!(spec.category, QueryErrorCategory::Validation);
466        assert_eq!(spec.code, "workflow.linkage_symmetry_trigger_table_invalid");
467    }
468
469    #[test]
470    fn classifies_external_gate_downgrade_blocked_constraint() {
471        let spec = classify_database_error(
472            &ErrorKind::CheckViolation,
473            Some("23514"),
474            Some("os_workflow_external_gate_downgrade_waiting_runs_exist"),
475        );
476        assert_eq!(spec.category, QueryErrorCategory::Validation);
477        assert_eq!(spec.code, "workflow.external_gate_downgrade_blocked");
478    }
479
480    #[test]
481    fn custom_constraint_classifier_takes_precedence() {
482        let spec = classify_query_error_with_constraint_classifier(
483            &ErrorKind::UniqueViolation,
484            Some("23505"),
485            Some("os_custom_override"),
486            |constraint| {
487                (constraint == "os_custom_override").then_some(FrameworkConstraintSpec::new(
488                    QueryErrorCategory::Forbidden,
489                    "custom.override",
490                    "Custom override wins.",
491                ))
492            },
493        );
494        assert_eq!(spec.category(), QueryErrorCategory::Forbidden);
495        assert_eq!(spec.code(), "custom.override");
496        assert_eq!(spec.client_message(), "Custom override wins.");
497    }
498
499    #[test]
500    fn query_error_debug_omits_internal_message() {
501        let error = QueryError::from_classified(
502            QueryErrorCategory::Conflict,
503            "job.idempotency_conflict",
504            "Job enqueue retry conflicts with the existing idempotency key.",
505            "internal context includes secret-idempotency-key",
506        );
507
508        let debug = format!("{error:?}");
509        assert!(debug.contains("job.idempotency_conflict"));
510        assert!(!debug.contains("secret-idempotency-key"));
511
512        let display = error.to_string();
513        assert_eq!(
514            display,
515            "Job enqueue retry conflicts with the existing idempotency key."
516        );
517        assert!(!display.contains("secret-idempotency-key"));
518    }
519
520    #[test]
521    fn query_error_from_sqlx_uses_sanitized_display_and_debug() {
522        let error = QueryError::from_sqlx(
523            sqlx::Error::Protocol("internal secret-idempotency-key detail".into()),
524            Some("sensitive context"),
525        );
526
527        let display = error.to_string();
528        assert_eq!(display, "Database operation failed.");
529        assert!(!display.contains("secret-idempotency-key"));
530
531        let debug = format!("{error:?}");
532        assert!(debug.contains("db.query_failed"));
533        assert!(!debug.contains("secret-idempotency-key"));
534        assert!(error.internal_message().contains("secret-idempotency-key"));
535        assert!(std::error::Error::source(&error).is_some());
536        assert!(error.source_arc().is_some());
537    }
538
539    #[test]
540    fn query_error_from_classified_sqlx_preserves_source_without_leaking_display() {
541        let error = QueryError::from_classified_sqlx(
542            QueryErrorCategory::Conflict,
543            "workflow.release_conflict",
544            "Workflow step release conflicted with another workflow mutation.",
545            "internal context includes secret-lock-key",
546            sqlx::Error::Protocol("database detail includes secret-lock-key".into()),
547        );
548
549        assert_eq!(error.category(), QueryErrorCategory::Conflict);
550        assert_eq!(error.code(), "workflow.release_conflict");
551        assert_eq!(
552            error.client_message(),
553            "Workflow step release conflicted with another workflow mutation."
554        );
555        assert!(error.internal_message().contains("secret-lock-key"));
556        assert!(error.source_arc().is_some());
557        assert!(std::error::Error::source(&error).is_some());
558
559        let display = error.to_string();
560        assert_eq!(
561            display,
562            "Workflow step release conflicted with another workflow mutation."
563        );
564        assert!(!display.contains("secret-lock-key"));
565
566        let debug = format!("{error:?}");
567        assert!(debug.contains("workflow.release_conflict"));
568        assert!(debug.contains("has_source: true"));
569        assert!(!debug.contains("secret-lock-key"));
570    }
571
572    #[test]
573    fn classifies_permission_denied() {
574        let spec = classify_database_error(&ErrorKind::Other, Some("42501"), None);
575        assert_eq!(spec.category, QueryErrorCategory::Forbidden);
576        assert_eq!(spec.code, "db.permission_denied");
577    }
578
579    #[test]
580    fn falls_back_to_internal_for_unmapped_errors() {
581        let spec = classify_database_error(&ErrorKind::Other, Some("99999"), Some("not_mapped"));
582        assert_eq!(spec.category, QueryErrorCategory::Internal);
583        assert_eq!(spec.code, "db.query_failed");
584    }
585}