Skip to main content

drasi_lib/
recovery.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Recovery policy and error types for checkpoint-based recovery.
16//!
17//! See the design doc at
18//! <https://github.com/drasi-project/design-documents/tree/main/drasi-lib/Source-Checkpoints>
19//! (doc 03 §4 — Recovery Policies) for the semantic rationale.
20
21use serde::{Deserialize, Serialize};
22
23/// Behavior when a source cannot honor a requested resume position.
24///
25/// Configured per-query via `QueryConfig::recovery_policy`. When the field is
26/// `None`, a future global default applies (which itself defaults to `Strict`).
27#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
28#[serde(rename_all = "snake_case")]
29pub enum RecoveryPolicy {
30    /// Fail startup if the source cannot honor the requested resume position
31    /// (e.g., WAL pruned past the checkpoint, replication slot invalidated).
32    /// Requires manual intervention. Favors correctness over availability.
33    #[default]
34    Strict,
35    /// Automatically wipe the query's persistent index and perform a full
36    /// re-bootstrap on gap detection. Favors availability over consistency.
37    AutoReset,
38}
39
40/// Errors specific to checkpoint-based recovery.
41#[derive(Debug, thiserror::Error)]
42pub enum RecoveryError {
43    /// A persistent query requires all of its sources to support positional
44    /// replay, but one or more do not (e.g., transient HTTP source with WAL
45    /// disabled).
46    #[error("Incompatible source '{source_id}' for persistent query '{query_id}': {reason}")]
47    IncompatibleSource {
48        query_id: String,
49        source_id: String,
50        reason: String,
51    },
52
53    /// The `AutoReset` recovery policy fired: the query's index has been
54    /// wiped and a full bootstrap will follow.
55    #[error("Auto-reset triggered for query '{query_id}': {reason}")]
56    AutoResetTriggered { query_id: String, reason: String },
57}
58
#[cfg(test)]
mod tests {
    use super::*;

    /// `Default` must resolve to the `Strict` variant.
    #[test]
    fn default_is_strict() {
        let fallback = RecoveryPolicy::default();
        assert_eq!(fallback, RecoveryPolicy::Strict);
    }

    /// `Strict` encodes to the JSON string `"strict"` and decodes back.
    #[test]
    fn json_round_trip_strict() {
        let encoded = serde_json::to_string(&RecoveryPolicy::Strict).unwrap();
        assert_eq!(encoded, r#""strict""#);

        let decoded = serde_json::from_str::<RecoveryPolicy>(&encoded).unwrap();
        assert_eq!(decoded, RecoveryPolicy::Strict);
    }

    /// `AutoReset` encodes to the JSON string `"auto_reset"` and decodes back.
    #[test]
    fn json_round_trip_auto_reset() {
        let encoded = serde_json::to_string(&RecoveryPolicy::AutoReset).unwrap();
        assert_eq!(encoded, r#""auto_reset""#);

        let decoded = serde_json::from_str::<RecoveryPolicy>(&encoded).unwrap();
        assert_eq!(decoded, RecoveryPolicy::AutoReset);
    }

    /// Every variant survives a YAML encode/decode cycle unchanged.
    #[test]
    fn yaml_round_trip() {
        for original in [RecoveryPolicy::Strict, RecoveryPolicy::AutoReset] {
            let encoded = serde_yaml::to_string(&original).unwrap();
            let decoded = serde_yaml::from_str::<RecoveryPolicy>(&encoded).unwrap();
            assert_eq!(decoded, original);
        }
    }

    /// The rendered `IncompatibleSource` message mentions all three fields.
    #[test]
    fn incompatible_source_message() {
        let rendered = RecoveryError::IncompatibleSource {
            query_id: "q1".into(),
            source_id: "s1".into(),
            reason: "no WAL".into(),
        }
        .to_string();

        for fragment in ["q1", "s1", "no WAL"] {
            assert!(
                rendered.contains(fragment),
                "expected {fragment:?} in {rendered:?}"
            );
        }
    }

    /// The rendered `AutoResetTriggered` message mentions both fields.
    #[test]
    fn auto_reset_message() {
        let rendered = RecoveryError::AutoResetTriggered {
            query_id: "q1".into(),
            reason: "gap".into(),
        }
        .to_string();

        for fragment in ["q1", "gap"] {
            assert!(
                rendered.contains(fragment),
                "expected {fragment:?} in {rendered:?}"
            );
        }
    }
}