1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
//! Auto-restore from S3
//!
//! Handles restoring `SQLite` databases from S3 backups, including both snapshots
//! and WAL segments.
use super::s3_backend::S3Backend;
use crate::error::{LayerStorageError, Result};
use std::path::PathBuf;
use std::sync::Arc;
use tracing::{debug, info, warn};
/// Restores a local `SQLite` database file from backups stored in S3.
pub struct RestoreManager {
    /// Destination of the restored database file
    db_path: PathBuf,
    /// Shared handle to the S3 backend that snapshots and WAL segments are downloaded from
    s3_backend: Arc<S3Backend>,
    /// Scratch directory in which the database is assembled before it is moved to `db_path`
    temp_dir: PathBuf,
}
impl RestoreManager {
/// Build a restore manager from its three components.
///
/// Nothing is touched on disk or in S3 until [`RestoreManager::restore`] is called.
///
/// # Arguments
///
/// * `db_path` - Final location of the restored database file
/// * `s3_backend` - Shared S3 backend that backups are fetched from
/// * `temp_dir` - Scratch directory used while the database is being assembled
pub fn new(db_path: PathBuf, s3_backend: Arc<S3Backend>, temp_dir: PathBuf) -> Self {
    Self {
        temp_dir,
        s3_backend,
        db_path,
    }
}
/// Restore the database from S3
///
/// This will:
/// 1. Download the latest snapshot
/// 2. Download any WAL segments since the snapshot
/// 3. Apply WAL segments to reconstruct the database
///
/// # Returns
///
/// - `Ok(true)` if a backup was found and restored
/// - `Ok(false)` if no backup was found
/// - `Err(_)` if restoration failed
pub async fn restore(&self) -> Result<bool> {
info!("Starting database restoration from S3");
// Ensure temp directory exists
tokio::fs::create_dir_all(&self.temp_dir).await?;
// Download latest snapshot
let Some(snapshot_data) = self.s3_backend.download_latest_snapshot().await? else {
info!("No snapshot found in S3");
return Ok(false);
};
info!("Downloaded snapshot: {} bytes", snapshot_data.len());
// Write snapshot to temp file
let temp_db_path = self.temp_dir.join("restore.sqlite");
tokio::fs::write(&temp_db_path, &snapshot_data).await?;
// Get metadata to find WAL sequence at snapshot time
let metadata = self.s3_backend.get_metadata().await?;
let snapshot_wal_sequence = metadata.latest_wal_sequence.unwrap_or(0);
// Download WAL segments since snapshot
let wal_segments = self
.s3_backend
.download_wal_segments_since(snapshot_wal_sequence)
.await?;
if !wal_segments.is_empty() {
info!("Downloaded {} WAL segments to apply", wal_segments.len());
// Apply WAL segments
self.apply_wal_segments(&temp_db_path, wal_segments).await?;
}
// Ensure target directory exists
if let Some(parent) = self.db_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
// Move restored database to target location
// First, remove any existing database files
if self.db_path.exists() {
tokio::fs::remove_file(&self.db_path).await?;
}
let wal_path = self.wal_path();
if wal_path.exists() {
tokio::fs::remove_file(&wal_path).await?;
}
let shm_path = self.shm_path();
if shm_path.exists() {
tokio::fs::remove_file(&shm_path).await?;
}
// Move temp database to final location
tokio::fs::rename(&temp_db_path, &self.db_path).await?;
// Clean up temp WAL files if any
let temp_wal = temp_db_path.with_extension("sqlite-wal");
if temp_wal.exists() {
let _ = tokio::fs::remove_file(&temp_wal).await;
}
let temp_shm = temp_db_path.with_extension("sqlite-shm");
if temp_shm.exists() {
let _ = tokio::fs::remove_file(&temp_shm).await;
}
info!(
"Database restored successfully to {}",
self.db_path.display()
);
Ok(true)
}
/// Apply WAL segments to a database
///
/// This uses `SQLite`'s ability to recover from WAL files by:
/// 1. Writing WAL data to the WAL file location
/// 2. Opening the database to trigger WAL recovery
async fn apply_wal_segments(
&self,
db_path: &std::path::Path,
segments: Vec<super::cache::CacheEntry>,
) -> Result<()> {
if segments.is_empty() {
return Ok(());
}
info!("Applying {} WAL segments", segments.len());
// For each WAL segment, we need to apply it using rusqlite
// The segments contain complete WAL files, so we use the last one
// (which should contain all the frames from previous segments plus new ones)
// Actually, in our implementation, each "segment" is a snapshot of the WAL file
// at a point in time. The latest segment should contain the most recent state.
// We take the segment with the highest sequence number.
if let Some(latest_segment) = segments.last() {
// Write the WAL file
let wal_path = format!("{}-wal", db_path.display());
tokio::fs::write(&wal_path, &latest_segment.data).await?;
debug!(
"Wrote WAL file ({} bytes) for recovery",
latest_segment.data.len()
);
// Open the database with rusqlite to trigger WAL recovery
// We need to do this in a blocking context since rusqlite is sync
let db_path_clone = db_path.to_path_buf();
tokio::task::spawn_blocking(move || {
let conn = rusqlite::Connection::open(&db_path_clone)
.map_err(|e| LayerStorageError::Database(e.to_string()))?;
// Force a checkpoint to apply WAL
conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
.map_err(|e| LayerStorageError::Database(e.to_string()))?;
// Verify database is intact
let integrity: String = conn
.query_row("PRAGMA integrity_check;", [], |row| row.get(0))
.map_err(|e| LayerStorageError::Database(e.to_string()))?;
if integrity != "ok" {
warn!("Database integrity check returned: {}", integrity);
}
Ok::<_, LayerStorageError>(())
})
.await
.map_err(|e| LayerStorageError::Io(std::io::Error::other(e)))??;
info!("WAL recovery completed");
}
Ok(())
}
/// Get the WAL file path
///
/// This is the database path with `-wal` appended to its file name. The suffix is
/// appended to the raw `OsString` file name, so names that are not valid UTF-8 are
/// preserved exactly instead of being lossily re-encoded.
fn wal_path(&self) -> PathBuf {
    let mut file_name = self
        .db_path
        .file_name()
        .map(|name| name.to_os_string())
        .unwrap_or_default();
    file_name.push("-wal");
    self.db_path.with_file_name(file_name)
}
/// Get the SHM file path
///
/// This is the database path with `-shm` appended to its file name. The suffix is
/// appended to the raw `OsString` file name, so names that are not valid UTF-8 are
/// preserved exactly instead of being lossily re-encoded.
fn shm_path(&self) -> PathBuf {
    let mut file_name = self
        .db_path
        .file_name()
        .map(|name| name.to_os_string())
        .unwrap_or_default();
    file_name.push("-shm");
    self.db_path.with_file_name(file_name)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the sidecar naming rule (`<file name>-wal` / `<file name>-shm`) on a
    /// plain path, without constructing a full `RestoreManager`.
    #[test]
    fn test_wal_path_derivation() {
        let base = PathBuf::from("/var/lib/app/data.sqlite");
        let name = base.file_name().unwrap_or_default().to_string_lossy();

        // WAL sidecar sits next to the database file
        let wal = base.with_file_name(format!("{name}-wal"));
        assert_eq!(wal, PathBuf::from("/var/lib/app/data.sqlite-wal"));

        // SHM sidecar follows the same rule
        let shm = base.with_file_name(format!("{name}-shm"));
        assert_eq!(shm, PathBuf::from("/var/lib/app/data.sqlite-shm"));
    }
}