sh_layer2/checkpoint_system/
recovery.rs1use std::path::Path;
6
7use crate::types::{Layer2Result, SessionId};
8
/// Detects sessions that ended without a clean shutdown and restores their
/// most recent usable checkpoint.
///
/// All lookups are relative to `storage_path`, which is expected to contain
/// one sub-directory per session.
#[derive(Debug, Clone)]
pub struct CrashRecovery {
    /// Root directory under which the per-session directories live.
    storage_path: std::path::PathBuf,
}
13
14impl CrashRecovery {
15 pub fn new(storage_path: impl AsRef<Path>) -> Self {
17 Self {
18 storage_path: storage_path.as_ref().to_path_buf(),
19 }
20 }
21
22 pub fn detect_unclean_shutdown(&self) -> Layer2Result<Option<CrashInfo>> {
26 if !self.storage_path.exists() {
28 return Ok(None);
29 }
30
31 for session_dir in std::fs::read_dir(&self.storage_path)? {
32 let session_dir = session_dir?;
33 if !session_dir.path().is_dir() {
34 continue;
35 }
36
37 let session_meta = session_dir.path().join("session_meta.json");
38 if !session_meta.exists() {
39 continue;
40 }
41
42 match std::fs::read_to_string(&session_meta) {
43 Ok(content) => {
44 if let Ok(meta) = serde_json::from_str::<SessionMeta>(&content) {
45 if meta.is_active && meta.termination_reason.is_none() {
46 return Ok(Some(CrashInfo {
47 session_id: Some(meta.session_id.clone()),
48 last_activity: meta.last_updated,
49 last_iteration: meta.last_iteration,
50 }));
51 }
52 }
53 }
54 Err(_) => continue,
55 }
56 }
57
58 Ok(None)
59 }
60
61 pub fn recover_session(&self, session_id: &SessionId) -> Layer2Result<Option<RecoveryResult>> {
65 let session_dir = self
66 .storage_path
67 .join(session_id.to_string())
68 .join("checkpoints");
69
70 if !session_dir.exists() {
71 return Ok(None);
72 }
73
74 let checkpoints = self.list_valid_checkpoints(&session_dir)?;
76
77 for checkpoint_path in checkpoints {
78 match std::fs::read_to_string(&checkpoint_path) {
79 Ok(content) => {
80 if let Ok(data) = serde_json::from_str::<serde_json::Value>(&content) {
81 if super::ChecksumUtils::verify_checksum(&data).0 {
83 return Ok(Some(RecoveryResult {
84 checkpoint_path: Some(checkpoint_path),
85 data: Some(data),
86 recovered_from_backup: false,
87 }));
88 }
89 }
90 }
91 Err(_) => continue,
92 }
93 }
94
95 self.recover_from_backup(&session_dir)
97 }
98
99 fn list_valid_checkpoints(&self, dir: &Path) -> Layer2Result<Vec<std::path::PathBuf>> {
101 let mut checkpoints = Vec::new();
102
103 for entry in std::fs::read_dir(dir)? {
104 let entry = entry?;
105 let path = entry.path();
106 if path.extension().map(|e| e == "json").unwrap_or(false) {
107 if let Some(filename) = path.file_name() {
108 if filename.to_string_lossy().starts_with("cp_") {
109 checkpoints.push(path);
110 }
111 }
112 }
113 }
114
115 checkpoints.sort_by(|a, b| {
117 let a_time = a
118 .metadata()
119 .and_then(|m| m.modified())
120 .unwrap_or(std::time::UNIX_EPOCH);
121 let b_time = b
122 .metadata()
123 .and_then(|m| m.modified())
124 .unwrap_or(std::time::UNIX_EPOCH);
125 b_time.cmp(&a_time)
126 });
127
128 Ok(checkpoints)
129 }
130
131 fn recover_from_backup(&self, dir: &Path) -> Layer2Result<Option<RecoveryResult>> {
133 let _backup_suffix = ".backup";
134
135 for entry in std::fs::read_dir(dir)? {
136 let entry = entry?;
137 let path = entry.path();
138
139 if path.extension().map(|e| e == "backup").unwrap_or(false) {
140 if let Ok(content) = std::fs::read_to_string(&path) {
141 if let Ok(data) = serde_json::from_str::<serde_json::Value>(&content) {
142 if super::ChecksumUtils::verify_checksum(&data).0 {
143 return Ok(Some(RecoveryResult {
144 checkpoint_path: Some(path),
145 data: Some(data),
146 recovered_from_backup: true,
147 }));
148 }
149 }
150 }
151 }
152 }
153
154 Ok(None)
155 }
156
157 pub fn mark_session_active(&self, session_id: &SessionId) -> Layer2Result<()> {
159 let meta_path = self
160 .storage_path
161 .join(session_id.to_string())
162 .join("session_meta.json");
163
164 if meta_path.exists() {
165 let content = std::fs::read_to_string(&meta_path)?;
166 let mut meta: SessionMeta = serde_json::from_str(&content)?;
167 meta.is_active = true;
168 meta.termination_reason = None;
169
170 let json = serde_json::to_string_pretty(&meta)?;
171 std::fs::write(&meta_path, json)?;
172 }
173
174 Ok(())
175 }
176
177 pub fn mark_session_terminated(
179 &self,
180 session_id: &SessionId,
181 reason: &str,
182 ) -> Layer2Result<()> {
183 let meta_path = self
184 .storage_path
185 .join(session_id.to_string())
186 .join("session_meta.json");
187
188 if meta_path.exists() {
189 let content = std::fs::read_to_string(&meta_path)?;
190 let mut meta: SessionMeta = serde_json::from_str(&content)?;
191 meta.is_active = false;
192 meta.termination_reason = Some(reason.to_string());
193
194 let json = serde_json::to_string_pretty(&meta)?;
195 std::fs::write(&meta_path, json)?;
196 }
197
198 Ok(())
199 }
200}
201
202#[derive(Debug, Clone)]
204pub struct CrashInfo {
205 pub session_id: Option<SessionId>,
206 pub last_activity: chrono::DateTime<chrono::Utc>,
207 pub last_iteration: i32,
208}
209
210#[derive(Debug, Clone)]
212pub struct RecoveryResult {
213 pub checkpoint_path: Option<std::path::PathBuf>,
214 pub data: Option<serde_json::Value>,
215 pub recovered_from_backup: bool,
216}
217
218#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
220struct SessionMeta {
221 session_id: SessionId,
222 is_active: bool,
223 termination_reason: Option<String>,
224 last_updated: chrono::DateTime<chrono::Utc>,
225 last_iteration: i32,
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// An empty storage directory must not be reported as a crashed session.
    #[test]
    fn test_crash_recovery_creation() {
        let storage = TempDir::new().unwrap();

        let crash_info = CrashRecovery::new(storage.path())
            .detect_unclean_shutdown()
            .unwrap();

        assert!(crash_info.is_none());
    }
}
241}