Skip to main content

sh_layer2/checkpoint_system/
recovery.rs

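//! # Crash Recovery
//!
//! Crash-recovery utilities: detect sessions that were not shut down cleanly and
//! restore them from their most recent valid checkpoint.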
4
5use std::path::Path;
6
7use crate::types::{Layer2Result, SessionId};
8
/// Crash-recovery manager.
///
/// Holds the root directory under which per-session state
/// (`<storage_path>/<session_id>/...`) is stored.
#[derive(Debug, Clone)]
pub struct CrashRecovery {
    /// Root directory containing one subdirectory per session.
    storage_path: std::path::PathBuf,
}
13
14impl CrashRecovery {
15    /// 创建新的崩溃恢复管理器
16    pub fn new(storage_path: impl AsRef<Path>) -> Self {
17        Self {
18            storage_path: storage_path.as_ref().to_path_buf(),
19        }
20    }
21
22    /// 检测不正常关闭
23    ///
24    /// 返回崩溃信息(如果有)
25    pub fn detect_unclean_shutdown(&self) -> Layer2Result<Option<CrashInfo>> {
26        // 检查是否有活跃会话没有正常终止
27        if !self.storage_path.exists() {
28            return Ok(None);
29        }
30
31        for session_dir in std::fs::read_dir(&self.storage_path)? {
32            let session_dir = session_dir?;
33            if !session_dir.path().is_dir() {
34                continue;
35            }
36
37            let session_meta = session_dir.path().join("session_meta.json");
38            if !session_meta.exists() {
39                continue;
40            }
41
42            match std::fs::read_to_string(&session_meta) {
43                Ok(content) => {
44                    if let Ok(meta) = serde_json::from_str::<SessionMeta>(&content) {
45                        if meta.is_active && meta.termination_reason.is_none() {
46                            return Ok(Some(CrashInfo {
47                                session_id: Some(meta.session_id.clone()),
48                                last_activity: meta.last_updated,
49                                last_iteration: meta.last_iteration,
50                            }));
51                        }
52                    }
53                }
54                Err(_) => continue,
55            }
56        }
57
58        Ok(None)
59    }
60
61    /// 恢复会话
62    ///
63    /// 从最新的有效检查点恢复
64    pub fn recover_session(&self, session_id: &SessionId) -> Layer2Result<Option<RecoveryResult>> {
65        let session_dir = self
66            .storage_path
67            .join(session_id.to_string())
68            .join("checkpoints");
69
70        if !session_dir.exists() {
71            return Ok(None);
72        }
73
74        // 查找最新的有效检查点
75        let checkpoints = self.list_valid_checkpoints(&session_dir)?;
76
77        for checkpoint_path in checkpoints {
78            match std::fs::read_to_string(&checkpoint_path) {
79                Ok(content) => {
80                    if let Ok(data) = serde_json::from_str::<serde_json::Value>(&content) {
81                        // 验证校验和
82                        if super::ChecksumUtils::verify_checksum(&data).0 {
83                            return Ok(Some(RecoveryResult {
84                                checkpoint_path: Some(checkpoint_path),
85                                data: Some(data),
86                                recovered_from_backup: false,
87                            }));
88                        }
89                    }
90                }
91                Err(_) => continue,
92            }
93        }
94
95        // 尝试从备份恢复
96        self.recover_from_backup(&session_dir)
97    }
98
99    /// 列出有效的检查点
100    fn list_valid_checkpoints(&self, dir: &Path) -> Layer2Result<Vec<std::path::PathBuf>> {
101        let mut checkpoints = Vec::new();
102
103        for entry in std::fs::read_dir(dir)? {
104            let entry = entry?;
105            let path = entry.path();
106            if path.extension().map(|e| e == "json").unwrap_or(false) {
107                if let Some(filename) = path.file_name() {
108                    if filename.to_string_lossy().starts_with("cp_") {
109                        checkpoints.push(path);
110                    }
111                }
112            }
113        }
114
115        // 按修改时间排序(最新的在前)
116        checkpoints.sort_by(|a, b| {
117            let a_time = a
118                .metadata()
119                .and_then(|m| m.modified())
120                .unwrap_or(std::time::UNIX_EPOCH);
121            let b_time = b
122                .metadata()
123                .and_then(|m| m.modified())
124                .unwrap_or(std::time::UNIX_EPOCH);
125            b_time.cmp(&a_time)
126        });
127
128        Ok(checkpoints)
129    }
130
131    /// 从备份恢复
132    fn recover_from_backup(&self, dir: &Path) -> Layer2Result<Option<RecoveryResult>> {
133        let _backup_suffix = ".backup";
134
135        for entry in std::fs::read_dir(dir)? {
136            let entry = entry?;
137            let path = entry.path();
138
139            if path.extension().map(|e| e == "backup").unwrap_or(false) {
140                if let Ok(content) = std::fs::read_to_string(&path) {
141                    if let Ok(data) = serde_json::from_str::<serde_json::Value>(&content) {
142                        if super::ChecksumUtils::verify_checksum(&data).0 {
143                            return Ok(Some(RecoveryResult {
144                                checkpoint_path: Some(path),
145                                data: Some(data),
146                                recovered_from_backup: true,
147                            }));
148                        }
149                    }
150                }
151            }
152        }
153
154        Ok(None)
155    }
156
157    /// 标记会话为活跃
158    pub fn mark_session_active(&self, session_id: &SessionId) -> Layer2Result<()> {
159        let meta_path = self
160            .storage_path
161            .join(session_id.to_string())
162            .join("session_meta.json");
163
164        if meta_path.exists() {
165            let content = std::fs::read_to_string(&meta_path)?;
166            let mut meta: SessionMeta = serde_json::from_str(&content)?;
167            meta.is_active = true;
168            meta.termination_reason = None;
169
170            let json = serde_json::to_string_pretty(&meta)?;
171            std::fs::write(&meta_path, json)?;
172        }
173
174        Ok(())
175    }
176
177    /// 标记会话为终止
178    pub fn mark_session_terminated(
179        &self,
180        session_id: &SessionId,
181        reason: &str,
182    ) -> Layer2Result<()> {
183        let meta_path = self
184            .storage_path
185            .join(session_id.to_string())
186            .join("session_meta.json");
187
188        if meta_path.exists() {
189            let content = std::fs::read_to_string(&meta_path)?;
190            let mut meta: SessionMeta = serde_json::from_str(&content)?;
191            meta.is_active = false;
192            meta.termination_reason = Some(reason.to_string());
193
194            let json = serde_json::to_string_pretty(&meta)?;
195            std::fs::write(&meta_path, json)?;
196        }
197
198        Ok(())
199    }
200}
201
202/// 崩溃信息
203#[derive(Debug, Clone)]
204pub struct CrashInfo {
205    pub session_id: Option<SessionId>,
206    pub last_activity: chrono::DateTime<chrono::Utc>,
207    pub last_iteration: i32,
208}
209
210/// 恢复结果
211#[derive(Debug, Clone)]
212pub struct RecoveryResult {
213    pub checkpoint_path: Option<std::path::PathBuf>,
214    pub data: Option<serde_json::Value>,
215    pub recovered_from_backup: bool,
216}
217
218/// 会话元数据(用于崩溃检测)
219#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
220struct SessionMeta {
221    session_id: SessionId,
222    is_active: bool,
223    termination_reason: Option<String>,
224    last_updated: chrono::DateTime<chrono::Utc>,
225    last_iteration: i32,
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// An empty storage directory has no crashed sessions.
    #[test]
    fn test_crash_recovery_creation() {
        let temp_dir = TempDir::new().unwrap();
        let recovery = CrashRecovery::new(temp_dir.path());

        let result = recovery.detect_unclean_shutdown().unwrap();
        assert!(result.is_none());
    }

    /// A storage path that does not exist yet is not an error.
    #[test]
    fn test_detect_with_missing_storage_dir() {
        let temp_dir = TempDir::new().unwrap();
        let recovery = CrashRecovery::new(temp_dir.path().join("does_not_exist"));

        assert!(recovery.detect_unclean_shutdown().unwrap().is_none());
    }

    /// Stray files, sessions without metadata, and corrupt metadata are skipped
    /// rather than aborting the scan.
    #[test]
    fn test_detect_skips_stray_files_and_corrupt_metadata() {
        let temp_dir = TempDir::new().unwrap();

        // A plain file at the top level is not a session directory.
        std::fs::write(temp_dir.path().join("stray.txt"), "ignored").unwrap();

        // A session directory with no metadata file.
        std::fs::create_dir(temp_dir.path().join("no_meta")).unwrap();

        // A session directory whose metadata is not valid JSON.
        let corrupt = temp_dir.path().join("corrupt");
        std::fs::create_dir(&corrupt).unwrap();
        std::fs::write(corrupt.join("session_meta.json"), "{ not json").unwrap();

        let recovery = CrashRecovery::new(temp_dir.path());
        assert!(recovery.detect_unclean_shutdown().unwrap().is_none());
    }
}