drasi_lib/recovery.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Recovery policy and error types for checkpoint-based recovery.
16//!
17//! See the design doc at
18//! <https://github.com/drasi-project/design-documents/tree/main/drasi-lib/Source-Checkpoints>
19//! (doc 03 §4 — Recovery Policies) for the semantic rationale.
20
21use serde::{Deserialize, Serialize};
22
23/// Behavior when a source cannot honor a requested resume position.
24///
25/// Configured per-query via `QueryConfig::recovery_policy`. When the field is
26/// `None`, a future global default applies (which itself defaults to `Strict`).
27#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
28#[serde(rename_all = "snake_case")]
29pub enum RecoveryPolicy {
30 /// Fail startup if the source cannot honor the requested resume position
31 /// (e.g., WAL pruned past the checkpoint, replication slot invalidated).
32 /// Requires manual intervention. Favors correctness over availability.
33 #[default]
34 Strict,
35 /// Automatically wipe the query's persistent index and perform a full
36 /// re-bootstrap on gap detection. Favors availability over consistency.
37 AutoReset,
38}
39
40/// Errors specific to checkpoint-based recovery.
41#[derive(Debug, thiserror::Error)]
42pub enum RecoveryError {
43 /// A persistent query requires all of its sources to support positional
44 /// replay, but one or more do not (e.g., transient HTTP source with WAL
45 /// disabled).
46 #[error("Incompatible source '{source_id}' for persistent query '{query_id}': {reason}")]
47 IncompatibleSource {
48 query_id: String,
49 source_id: String,
50 reason: String,
51 },
52
53 /// The `AutoReset` recovery policy fired: the query's index has been
54 /// wiped and a full bootstrap will follow.
55 #[error("Auto-reset triggered for query '{query_id}': {reason}")]
56 AutoResetTriggered { query_id: String, reason: String },
57}
58
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_is_strict() {
        assert_eq!(RecoveryPolicy::default(), RecoveryPolicy::Strict);
    }

    #[test]
    fn json_round_trip_strict() {
        let json = serde_json::to_string(&RecoveryPolicy::Strict).unwrap();
        assert_eq!(json, "\"strict\"");
        let back: RecoveryPolicy = serde_json::from_str(&json).unwrap();
        assert_eq!(back, RecoveryPolicy::Strict);
    }

    #[test]
    fn json_round_trip_auto_reset() {
        let json = serde_json::to_string(&RecoveryPolicy::AutoReset).unwrap();
        assert_eq!(json, "\"auto_reset\"");
        let back: RecoveryPolicy = serde_json::from_str(&json).unwrap();
        assert_eq!(back, RecoveryPolicy::AutoReset);
    }

    /// Only the documented `snake_case` names are accepted; variant-cased or
    /// camelCase spellings and unknown names must fail to deserialize.
    #[test]
    fn json_rejects_unknown_or_miscased_names() {
        for bad in ["\"Strict\"", "\"AutoReset\"", "\"autoReset\"", "\"lenient\""] {
            assert!(
                serde_json::from_str::<RecoveryPolicy>(bad).is_err(),
                "{bad} should be rejected"
            );
        }
    }

    #[test]
    fn yaml_round_trip() {
        for (policy, wire_name) in [
            (RecoveryPolicy::Strict, "strict"),
            (RecoveryPolicy::AutoReset, "auto_reset"),
        ] {
            let yaml = serde_yaml::to_string(&policy).unwrap();
            // Pin the wire name, not just the round trip: a rename that is
            // applied symmetrically would still round-trip but would break
            // existing config files. `contains` tolerates document markers
            // that some serde_yaml versions emit.
            assert!(
                yaml.contains(wire_name),
                "unexpected YAML for {policy:?}: {yaml:?}"
            );
            let back: RecoveryPolicy = serde_yaml::from_str(&yaml).unwrap();
            assert_eq!(back, policy);
        }
    }

    /// Hand-written configuration uses a bare scalar, not serializer output.
    #[test]
    fn yaml_parses_plain_scalar() {
        let policy: RecoveryPolicy = serde_yaml::from_str("auto_reset").unwrap();
        assert_eq!(policy, RecoveryPolicy::AutoReset);
    }

    // Exact-match assertions: `contains` checks would still pass if the
    // `source_id` and `query_id` placeholders were swapped in the format string.
    #[test]
    fn incompatible_source_message() {
        let err = RecoveryError::IncompatibleSource {
            query_id: "q1".into(),
            source_id: "s1".into(),
            reason: "no WAL".into(),
        };
        assert_eq!(
            err.to_string(),
            "Incompatible source 's1' for persistent query 'q1': no WAL"
        );
    }

    #[test]
    fn auto_reset_message() {
        let err = RecoveryError::AutoResetTriggered {
            query_id: "q1".into(),
            reason: "gap".into(),
        };
        assert_eq!(err.to_string(), "Auto-reset triggered for query 'q1': gap");
    }
}
116}