rustlite_wal/
segment.rs

1// WAL segment management - handles log rotation, cleanup, and segment metadata
2//
3// Segments are named: wal-{sequence:016x}.log
4// Where sequence is a monotonically increasing hex number
5
6use rustlite_core::{Error, Result};
7use std::fs;
8use std::path::{Path, PathBuf};
9
/// Manages the WAL segment files stored in a single directory.
///
/// The manager holds only the directory path; every query re-reads the
/// directory from disk, so no segment state is cached between calls.
#[derive(Debug, Clone)]
pub struct SegmentManager {
    /// Directory that contains the `wal-{hex}.log` segment files.
    wal_dir: PathBuf,
}
14
/// Snapshot of one WAL segment file as seen when the directory was listed.
#[derive(Clone, Debug)]
pub struct SegmentInfo {
    /// Location of the segment file on disk.
    pub path: PathBuf,
    /// Sequence number decoded from the hexadecimal part of the file name.
    pub sequence: u64,
    /// Length of the file in bytes, taken from its metadata.
    pub size: u64,
}
25
26impl SegmentManager {
27    /// Create a new segment manager for the given WAL directory
28    pub fn new(wal_dir: PathBuf) -> Self {
29        Self { wal_dir }
30    }
31
32    /// List all segment files in order
33    pub fn list_segments(&self) -> Result<Vec<SegmentInfo>> {
34        if !self.wal_dir.exists() {
35            return Ok(Vec::new());
36        }
37
38        let mut segments: Vec<SegmentInfo> = fs::read_dir(&self.wal_dir)
39            .map_err(|e| Error::Storage(format!("Failed to read WAL directory: {}", e)))?
40            .filter_map(|entry| entry.ok())
41            .filter_map(|entry| self.parse_segment_info(&entry.path()))
42            .collect();
43
44        // Sort by sequence number
45        segments.sort_by_key(|s| s.sequence);
46
47        Ok(segments)
48    }
49
50    /// Parse segment info from a file path
51    fn parse_segment_info(&self, path: &Path) -> Option<SegmentInfo> {
52        let name = path.file_name()?.to_str()?;
53
54        // Must match pattern: wal-{hex}.log
55        if !name.starts_with("wal-") || !name.ends_with(".log") {
56            return None;
57        }
58
59        let seq_str = name.strip_prefix("wal-")?.strip_suffix(".log")?;
60        let sequence = u64::from_str_radix(seq_str, 16).ok()?;
61
62        let size = fs::metadata(path).ok()?.len();
63
64        Some(SegmentInfo {
65            path: path.to_path_buf(),
66            sequence,
67            size,
68        })
69    }
70
71    /// Get the total size of all segments
72    pub fn total_size(&self) -> Result<u64> {
73        let segments = self.list_segments()?;
74        Ok(segments.iter().map(|s| s.size).sum())
75    }
76
77    /// Get the number of segment files
78    pub fn segment_count(&self) -> Result<usize> {
79        Ok(self.list_segments()?.len())
80    }
81
82    /// Delete segments older than the given sequence number
83    ///
84    /// This is useful after a checkpoint to reclaim disk space.
85    /// Returns the number of segments deleted.
86    pub fn cleanup_before(&self, sequence: u64) -> Result<usize> {
87        let segments = self.list_segments()?;
88        let mut deleted = 0;
89
90        for segment in segments {
91            if segment.sequence < sequence {
92                fs::remove_file(&segment.path).map_err(|e| {
93                    Error::Storage(format!(
94                        "Failed to delete segment {:?}: {}",
95                        segment.path, e
96                    ))
97                })?;
98                deleted += 1;
99            }
100        }
101
102        Ok(deleted)
103    }
104
105    /// Delete all segment files
106    ///
107    /// Use with caution - this removes all WAL data!
108    pub fn cleanup_all(&self) -> Result<usize> {
109        let segments = self.list_segments()?;
110        let count = segments.len();
111
112        for segment in segments {
113            fs::remove_file(&segment.path).map_err(|e| {
114                Error::Storage(format!(
115                    "Failed to delete segment {:?}: {}",
116                    segment.path, e
117                ))
118            })?;
119        }
120
121        Ok(count)
122    }
123
124    /// Get the latest (highest sequence) segment
125    pub fn latest_segment(&self) -> Result<Option<SegmentInfo>> {
126        let segments = self.list_segments()?;
127        Ok(segments.into_iter().last())
128    }
129
130    /// Get the oldest (lowest sequence) segment
131    pub fn oldest_segment(&self) -> Result<Option<SegmentInfo>> {
132        let segments = self.list_segments()?;
133        Ok(segments.into_iter().next())
134    }
135
136    /// Check if the WAL directory exists and is accessible
137    pub fn is_available(&self) -> bool {
138        self.wal_dir.exists() && self.wal_dir.is_dir()
139    }
140
141    /// Create the WAL directory if it doesn't exist
142    pub fn ensure_dir(&self) -> Result<()> {
143        fs::create_dir_all(&self.wal_dir)
144            .map_err(|e| Error::Storage(format!("Failed to create WAL directory: {}", e)))
145    }
146}
147
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{SyncMode, WalRecord, WalWriter};
    use tempfile::TempDir;

    /// Creates a temporary directory with an empty `wal` subdirectory inside it.
    ///
    /// The `TempDir` handle is returned next to the path so each test can hold
    /// on to it for its whole duration.
    fn setup_test_wal() -> (TempDir, PathBuf) {
        let temp_dir = TempDir::new().expect("Failed to create temp dir");
        let wal_path = temp_dir.path().join("wal");
        std::fs::create_dir_all(&wal_path).expect("Failed to create WAL dir");
        (temp_dir, wal_path)
    }

    /// Appends `count` put records to `writer`, mapping `key{i}` to `{value_prefix}{i}`.
    fn append_puts(writer: &mut WalWriter, count: usize, value_prefix: &str) {
        for i in 0..count {
            let record = WalRecord::put(
                format!("key{}", i).into_bytes(),
                format!("{}{}", value_prefix, i).into_bytes(),
            );
            writer.append(record).expect("Failed to append");
        }
    }

    /// A freshly created WAL directory reports no segments and zero size.
    #[test]
    fn test_empty_directory() {
        let (_temp_dir, wal_path) = setup_test_wal();
        let manager = SegmentManager::new(wal_path);

        let segments = manager.list_segments().expect("Failed to list segments");

        assert!(segments.is_empty());
        assert_eq!(manager.segment_count().unwrap(), 0);
        assert_eq!(manager.total_size().unwrap(), 0);
    }

    /// Segments produced by a writer are listed in strictly increasing sequence order.
    #[test]
    fn test_list_segments() {
        let (_temp_dir, wal_path) = setup_test_wal();

        // Create some segments by writing and rotating; the writer is scoped
        // so it is dropped before the directory is inspected.
        {
            let mut writer =
                WalWriter::new(&wal_path, 50, SyncMode::Sync).expect("Failed to create writer");
            append_puts(&mut writer, 10, "value");
        }

        let manager = SegmentManager::new(wal_path);
        let segments = manager.list_segments().expect("Failed to list segments");

        assert!(!segments.is_empty());
        // Each adjacent pair must be strictly ascending.
        for pair in segments.windows(2) {
            assert!(pair[1].sequence > pair[0].sequence);
        }
    }

    /// Writing records leaves a non-zero total size on disk.
    #[test]
    fn test_total_size() {
        let (_temp_dir, wal_path) = setup_test_wal();

        {
            let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
                .expect("Failed to create writer");
            append_puts(&mut writer, 5, "value");
        }

        let manager = SegmentManager::new(wal_path);
        let total_size = manager.total_size().expect("Failed to get total size");

        assert!(total_size > 0);
    }

    /// `cleanup_all` removes every listed segment and reports the same count.
    #[test]
    fn test_cleanup_all() {
        let (_temp_dir, wal_path) = setup_test_wal();

        {
            let mut writer =
                WalWriter::new(&wal_path, 50, SyncMode::Sync).expect("Failed to create writer");
            append_puts(&mut writer, 10, "val");
        }

        let manager = SegmentManager::new(wal_path);

        let initial_count = manager.segment_count().unwrap();
        assert!(initial_count > 0);

        let deleted = manager.cleanup_all().expect("Failed to cleanup");
        assert_eq!(deleted, initial_count);

        assert_eq!(manager.segment_count().unwrap(), 0);
    }

    /// The oldest segment never has a higher sequence number than the latest one.
    #[test]
    fn test_latest_and_oldest() {
        let (_temp_dir, wal_path) = setup_test_wal();

        {
            let mut writer =
                WalWriter::new(&wal_path, 50, SyncMode::Sync).expect("Failed to create writer");
            append_puts(&mut writer, 10, "val");
        }

        let manager = SegmentManager::new(wal_path);

        let oldest = manager
            .oldest_segment()
            .unwrap()
            .expect("Should have oldest");
        let latest = manager
            .latest_segment()
            .unwrap()
            .expect("Should have latest");

        assert!(oldest.sequence <= latest.sequence);
    }
}