1use std::fs;
24use std::path::{Path, PathBuf};
25
26use crate::error::{Result, WalError};
27
28pub fn fsync_directory(dir: &Path) -> Result<()> {
36 let dir_file = fs::File::open(dir).map_err(WalError::Io)?;
37 dir_file.sync_all().map_err(WalError::Io)?;
38 Ok(())
39}
40
/// Default target size of a segment, in bytes: 64 MiB (64 * 2^20).
pub const DEFAULT_SEGMENT_TARGET_SIZE: u64 = 64 << 20;

/// Extension of segment files, without the leading dot.
const SEGMENT_EXTENSION: &str = "seg";

/// Prefix placed in front of the zero-padded LSN in segment file names.
const SEGMENT_PREFIX: &str = "wal-";
52
/// Description of a single WAL segment file found on disk.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SegmentMeta {
    /// Location of the segment file.
    pub path: PathBuf,

    /// First LSN of the segment; this is the primary sort key.
    pub first_lsn: u64,

    /// Length of the segment file in bytes.
    pub file_size: u64,
}

impl SegmentMeta {
    /// Returns the segment path with its extension replaced by `dwb`.
    pub fn dwb_path(&self) -> PathBuf {
        self.path.with_extension("dwb")
    }
}

impl Ord for SegmentMeta {
    /// Orders segments by `first_lsn`, breaking ties on `path` and then
    /// `file_size`.
    ///
    /// The tie-breakers keep the ordering consistent with the derived
    /// `PartialEq`/`Eq`, which compare every field: `cmp` returns `Equal`
    /// only for values that are also `==`.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.first_lsn
            .cmp(&other.first_lsn)
            .then_with(|| self.path.cmp(&other.path))
            .then_with(|| self.file_size.cmp(&other.file_size))
    }
}

impl PartialOrd for SegmentMeta {
    /// Delegates to [`Ord::cmp`]; the ordering is total.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
84
85pub fn segment_filename(first_lsn: u64) -> String {
87 format!("{SEGMENT_PREFIX}{first_lsn:020}.{SEGMENT_EXTENSION}")
88}
89
90pub fn segment_path(wal_dir: &Path, first_lsn: u64) -> PathBuf {
92 wal_dir.join(segment_filename(first_lsn))
93}
94
95fn parse_segment_filename(filename: &str) -> Option<u64> {
99 let stem = filename.strip_prefix(SEGMENT_PREFIX)?;
100 let lsn_str = stem.strip_suffix(&format!(".{SEGMENT_EXTENSION}"))?;
101 lsn_str.parse::<u64>().ok()
102}
103
104pub fn discover_segments(wal_dir: &Path) -> Result<Vec<SegmentMeta>> {
108 if !wal_dir.exists() {
109 return Ok(Vec::new());
110 }
111
112 let entries = fs::read_dir(wal_dir).map_err(WalError::Io)?;
113 let mut segments = Vec::new();
114
115 for entry in entries {
116 let entry = entry.map_err(WalError::Io)?;
117 let file_name = entry.file_name();
118 let name = file_name.to_string_lossy();
119
120 if let Some(first_lsn) = parse_segment_filename(&name) {
121 let metadata = entry.metadata().map_err(WalError::Io)?;
122 segments.push(SegmentMeta {
123 path: entry.path(),
124 first_lsn,
125 file_size: metadata.len(),
126 });
127 }
128 }
129
130 segments.sort();
131 Ok(segments)
132}
133
134pub fn migrate_legacy_wal(legacy_path: &Path, wal_dir: &Path) -> Result<bool> {
145 if !legacy_path.is_file() {
147 return Ok(false);
148 }
149
150 let metadata = fs::metadata(legacy_path).map_err(WalError::Io)?;
152 if metadata.len() == 0 {
153 let _ = fs::remove_file(legacy_path);
155 return Ok(false);
156 }
157
158 let info = crate::recovery::recover(legacy_path)?;
160 let first_lsn = if info.record_count == 0 {
161 1 } else {
163 let mut reader = crate::reader::WalReader::open(legacy_path)?;
166 match reader.next_record()? {
167 Some(record) => record.header.lsn,
168 None => 1,
169 }
170 };
171
172 fs::create_dir_all(wal_dir).map_err(WalError::Io)?;
174
175 let new_path = segment_path(wal_dir, first_lsn);
177 fs::rename(legacy_path, &new_path).map_err(WalError::Io)?;
178
179 let legacy_dwb = legacy_path.with_extension("dwb");
181 if legacy_dwb.exists() {
182 let new_dwb = new_path.with_extension("dwb");
183 fs::rename(&legacy_dwb, &new_dwb).map_err(WalError::Io)?;
184 }
185
186 tracing::info!(
187 legacy = %legacy_path.display(),
188 segment = %new_path.display(),
189 first_lsn,
190 "migrated legacy WAL to segmented format"
191 );
192
193 Ok(true)
194}
195
196pub fn truncate_segments(
203 wal_dir: &Path,
204 checkpoint_lsn: u64,
205 active_segment_first_lsn: u64,
206) -> Result<TruncateResult> {
207 let segments = discover_segments(wal_dir)?;
208 let mut deleted_count = 0u64;
209 let mut bytes_reclaimed = 0u64;
210
211 for seg in &segments {
212 if seg.first_lsn == active_segment_first_lsn {
214 continue;
215 }
216
217 let next_first_lsn = segments
229 .iter()
230 .find(|s| s.first_lsn > seg.first_lsn)
231 .map(|s| s.first_lsn)
232 .unwrap_or(u64::MAX);
233
234 if next_first_lsn <= checkpoint_lsn {
237 bytes_reclaimed += seg.file_size;
238
239 fs::remove_file(&seg.path).map_err(WalError::Io)?;
241
242 let dwb_path = seg.dwb_path();
244 if dwb_path.exists() {
245 let _ = fs::remove_file(&dwb_path);
246 }
247
248 tracing::info!(
249 segment = %seg.path.display(),
250 first_lsn = seg.first_lsn,
251 "deleted WAL segment (checkpoint_lsn={})",
252 checkpoint_lsn
253 );
254
255 deleted_count += 1;
256 }
257 }
258
259 if deleted_count > 0 {
261 let _ = fsync_directory(wal_dir);
262 }
263
264 Ok(TruncateResult {
265 segments_deleted: deleted_count,
266 bytes_reclaimed,
267 })
268}
269
/// Summary of what a `truncate_segments` call removed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TruncateResult {
    /// Number of segment files that were deleted.
    pub segments_deleted: u64,

    /// Sum of the recorded sizes, in bytes, of the deleted segment files.
    pub bytes_reclaimed: u64,
}
279
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates `name` inside `dir` with the given contents, panicking on failure.
    fn put_file(dir: &Path, name: &str, contents: &[u8]) {
        fs::write(dir.join(name), contents).unwrap();
    }

    /// File names are the prefix, a 20-digit zero-padded LSN, and the extension.
    #[test]
    fn segment_filename_format() {
        let cases = [
            (1u64, "wal-00000000000000000001.seg"),
            (999, "wal-00000000000000000999.seg"),
            (u64::MAX, "wal-18446744073709551615.seg"),
        ];
        for (lsn, expected) in cases {
            assert_eq!(segment_filename(lsn), expected);
        }
    }

    /// Well-formed names parse back to their LSN.
    #[test]
    fn parse_segment_filename_valid() {
        let cases = [
            ("wal-00000000000000000001.seg", 1u64),
            ("wal-00000000000000000999.seg", 999),
        ];
        for (name, lsn) in cases {
            assert_eq!(parse_segment_filename(name), Some(lsn));
        }
    }

    /// Wrong prefix, wrong extension, or a non-numeric LSN is rejected.
    #[test]
    fn parse_segment_filename_invalid() {
        let rejected = [
            "wal.log",
            "wal-abc.seg",
            "other-00001.seg",
            "wal-00001.dwb",
        ];
        for name in rejected {
            assert_eq!(parse_segment_filename(name), None);
        }
    }

    /// An existing but empty directory holds no segments.
    #[test]
    fn discover_empty_dir() {
        let tmp = tempfile::tempdir().unwrap();
        assert!(discover_segments(tmp.path()).unwrap().is_empty());
    }

    /// A missing directory is reported as "no segments", not as an error.
    #[test]
    fn discover_nonexistent_dir() {
        let missing = Path::new("/nonexistent/wal/dir");
        assert!(discover_segments(missing).unwrap().is_empty());
    }

    /// Segments come back ordered by LSN; unrelated files are skipped.
    #[test]
    fn discover_segments_sorted() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        // Written out of order on purpose.
        put_file(root, "wal-00000000000000000050.seg", b"seg3");
        put_file(root, "wal-00000000000000000001.seg", b"seg1");
        put_file(root, "wal-00000000000000000025.seg", b"seg2");
        // Neither of these is a segment.
        put_file(root, "wal-00000000000000000001.dwb", b"dwb");
        put_file(root, "metadata.json", b"{}");

        let lsns: Vec<u64> = discover_segments(root)
            .unwrap()
            .iter()
            .map(|seg| seg.first_lsn)
            .collect();
        assert_eq!(lsns, vec![1u64, 25, 50]);
    }

    /// Segments fully below the checkpoint are removed along with their `.dwb`.
    #[test]
    fn truncate_deletes_old_segments() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        put_file(root, "wal-00000000000000000001.seg", b"data1");
        put_file(root, "wal-00000000000000000001.dwb", b"dwb1");
        put_file(root, "wal-00000000000000000050.seg", b"data2");
        put_file(root, "wal-00000000000000000100.seg", b"data3");

        let outcome = truncate_segments(root, 100, 100).unwrap();
        assert_eq!(outcome.segments_deleted, 2);

        let survivors = discover_segments(root).unwrap();
        assert_eq!(survivors.len(), 1);
        assert_eq!(survivors[0].first_lsn, 100);

        assert!(!root.join("wal-00000000000000000001.dwb").exists());
    }

    /// The active segment survives even when the checkpoint is far past it.
    #[test]
    fn truncate_never_deletes_active_segment() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        put_file(root, "wal-00000000000000000001.seg", b"data");

        let outcome = truncate_segments(root, 999, 1).unwrap();
        assert_eq!(outcome.segments_deleted, 0);

        assert_eq!(discover_segments(root).unwrap().len(), 1);
    }

    /// Nothing is removed when the checkpoint precedes every segment.
    #[test]
    fn truncate_no_segments_below_checkpoint() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        put_file(root, "wal-00000000000000000100.seg", b"data");
        put_file(root, "wal-00000000000000000200.seg", b"data");

        let outcome = truncate_segments(root, 50, 200).unwrap();
        assert_eq!(outcome.segments_deleted, 0);
    }

    /// A populated legacy file becomes a single readable segment.
    // The test shares its name with the function under test, hence `super::`.
    #[test]
    fn migrate_legacy_wal() {
        let tmp = tempfile::tempdir().unwrap();
        let legacy_path = tmp.path().join("test.wal");
        let wal_dir = tmp.path().join("wal_segments");

        // Scope the writer so it is dropped before the file is moved.
        {
            let mut writer =
                crate::writer::WalWriter::open_without_direct_io(&legacy_path).unwrap();
            for payload in [&b"hello"[..], &b"world"[..]] {
                writer
                    .append(crate::record::RecordType::Put as u16, 1, 0, payload)
                    .unwrap();
            }
            writer.sync().unwrap();
        }

        assert!(super::migrate_legacy_wal(&legacy_path, &wal_dir).unwrap());
        assert!(!legacy_path.exists());

        let segments = discover_segments(&wal_dir).unwrap();
        assert_eq!(segments.len(), 1);
        assert_eq!(segments[0].first_lsn, 1);

        let reader = crate::reader::WalReader::open(&segments[0].path).unwrap();
        let records: Vec<_> = reader.records().collect::<crate::Result<_>>().unwrap();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].payload, b"hello");
    }

    /// A missing legacy file means there is nothing to migrate.
    #[test]
    fn migrate_nonexistent_legacy_is_noop() {
        let tmp = tempfile::tempdir().unwrap();
        let legacy_path = tmp.path().join("nonexistent.wal");
        let wal_dir = tmp.path().join("wal_segments");

        assert!(!super::migrate_legacy_wal(&legacy_path, &wal_dir).unwrap());
    }

    /// An empty legacy file is removed rather than migrated.
    #[test]
    fn migrate_empty_legacy_is_noop() {
        let tmp = tempfile::tempdir().unwrap();
        let legacy_path = tmp.path().join("empty.wal");
        let wal_dir = tmp.path().join("wal_segments");

        fs::write(&legacy_path, b"").unwrap();

        assert!(!super::migrate_legacy_wal(&legacy_path, &wal_dir).unwrap());
        assert!(!legacy_path.exists());
    }
}