Skip to main content

drasi_lib/
recovery.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Recovery policy and error types for checkpoint-based recovery.
16//!
17//! See the design doc at
18//! <https://github.com/drasi-project/design-documents/tree/main/drasi-lib/Source-Checkpoints>
19//! (doc 03 §4 — Recovery Policies) for the semantic rationale.
20
21use serde::{Deserialize, Serialize};
22
23/// Behavior when a source cannot honor a requested resume position.
24///
25/// Configured per-query via `QueryConfig::recovery_policy`. When the field is
26/// `None`, a future global default applies (which itself defaults to `Strict`).
27#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
28#[serde(rename_all = "snake_case")]
29pub enum RecoveryPolicy {
30    /// Fail startup if the source cannot honor the requested resume position
31    /// (e.g., WAL pruned past the checkpoint, replication slot invalidated).
32    /// Requires manual intervention. Favors correctness over availability.
33    #[default]
34    Strict,
35    /// Automatically wipe the query's persistent index and perform a full
36    /// re-bootstrap on gap detection. Favors availability over consistency.
37    AutoReset,
38}
39
40/// Behavior when a reaction's checkpoint falls behind the query outbox.
41///
42/// Configured per-reaction via `ReactionBaseParams::recovery_policy` or by
43/// overriding `Reaction::default_recovery_policy()`. Unlike the query-side
44/// `RecoveryPolicy`, reactions support an additional `AutoSkipGap` variant.
45#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
46#[serde(rename_all = "snake_case")]
47#[repr(u8)]
48pub enum ReactionRecoveryPolicy {
49    /// Fail startup if the outbox cannot satisfy the checkpoint position.
50    /// Requires manual intervention. Favors correctness over availability.
51    #[default]
52    Strict,
53    /// Wipe checkpoint and re-bootstrap from a full snapshot.
54    /// Favors availability; the reaction re-processes everything from scratch.
55    AutoReset,
56    /// Skip the gap and resume from the latest available outbox entry.
57    /// Accepts potential data loss in exchange for minimal disruption.
58    AutoSkipGap,
59}
60
61/// Errors specific to checkpoint-based recovery.
62#[derive(Debug, thiserror::Error)]
63pub enum RecoveryError {
64    /// A persistent query requires all of its sources to support positional
65    /// replay, but one or more do not (e.g., transient HTTP source with WAL
66    /// disabled).
67    #[error("Incompatible source '{source_id}' for persistent query '{query_id}': {reason}")]
68    IncompatibleSource {
69        query_id: String,
70        source_id: String,
71        reason: String,
72    },
73
74    /// The `AutoReset` recovery policy fired: the query's index has been
75    /// wiped and a full bootstrap will follow.
76    #[error("Auto-reset triggered for query '{query_id}': {reason}")]
77    AutoResetTriggered { query_id: String, reason: String },
78}
79
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `value` as JSON, checks the encoded text against `wire`, then
    /// decodes it again and checks that the original value comes back.
    fn assert_json_round_trip<T>(value: T, wire: &str)
    where
        T: serde::Serialize + serde::de::DeserializeOwned + PartialEq + std::fmt::Debug,
    {
        let encoded = serde_json::to_string(&value).unwrap();
        assert_eq!(encoded, wire);
        let decoded: T = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, value);
    }

    /// Encodes `value` as YAML, decodes it again, and checks that the
    /// original value comes back.
    fn assert_yaml_round_trip<T>(value: T)
    where
        T: serde::Serialize + serde::de::DeserializeOwned + PartialEq + std::fmt::Debug,
    {
        let encoded = serde_yaml::to_string(&value).unwrap();
        let decoded: T = serde_yaml::from_str(&encoded).unwrap();
        assert_eq!(decoded, value);
    }

    // --- RecoveryPolicy -----------------------------------------------------

    #[test]
    fn default_is_strict() {
        let policy = RecoveryPolicy::default();
        assert_eq!(policy, RecoveryPolicy::Strict);
    }

    #[test]
    fn json_round_trip_strict() {
        assert_json_round_trip(RecoveryPolicy::Strict, "\"strict\"");
    }

    #[test]
    fn json_round_trip_auto_reset() {
        assert_json_round_trip(RecoveryPolicy::AutoReset, "\"auto_reset\"");
    }

    #[test]
    fn yaml_round_trip() {
        [RecoveryPolicy::Strict, RecoveryPolicy::AutoReset]
            .into_iter()
            .for_each(assert_yaml_round_trip);
    }

    // --- RecoveryError ------------------------------------------------------

    #[test]
    fn incompatible_source_message() {
        let rendered = RecoveryError::IncompatibleSource {
            query_id: String::from("q1"),
            source_id: String::from("s1"),
            reason: String::from("no WAL"),
        }
        .to_string();

        // Every field must show up in the rendered message.
        for fragment in ["q1", "s1", "no WAL"] {
            assert!(rendered.contains(fragment));
        }
    }

    #[test]
    fn auto_reset_message() {
        let rendered = RecoveryError::AutoResetTriggered {
            query_id: String::from("q1"),
            reason: String::from("gap"),
        }
        .to_string();

        // Every field must show up in the rendered message.
        for fragment in ["q1", "gap"] {
            assert!(rendered.contains(fragment));
        }
    }

    // --- ReactionRecoveryPolicy ---------------------------------------------

    #[test]
    fn reaction_default_is_strict() {
        let policy = ReactionRecoveryPolicy::default();
        assert_eq!(policy, ReactionRecoveryPolicy::Strict);
    }

    #[test]
    fn reaction_json_round_trip() {
        let cases = [
            (ReactionRecoveryPolicy::Strict, "\"strict\""),
            (ReactionRecoveryPolicy::AutoReset, "\"auto_reset\""),
            (ReactionRecoveryPolicy::AutoSkipGap, "\"auto_skip_gap\""),
        ];
        for (policy, wire) in cases {
            assert_json_round_trip(policy, wire);
        }
    }

    #[test]
    fn reaction_yaml_round_trip() {
        [
            ReactionRecoveryPolicy::Strict,
            ReactionRecoveryPolicy::AutoReset,
            ReactionRecoveryPolicy::AutoSkipGap,
        ]
        .into_iter()
        .for_each(assert_yaml_round_trip);
    }
}