Skip to main content

zlayer_storage/replicator/
wal_monitor.rs

1//! WAL file change detection
2//!
3//! Monitors `SQLite` WAL files for changes and parses WAL headers to track frame counts.
4
5use crate::error::{LayerStorageError, Result};
6use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
7use std::path::PathBuf;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10use tokio::sync::mpsc;
11use tracing::trace;
12
/// WAL file change event
///
/// Produced by [`WalMonitor::check_for_changes`] when the WAL file's size has
/// changed in a way that alters the frame count or shrinks the file, or when the
/// WAL file has disappeared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalEvent {
    /// Current frame count in the WAL (whole frames only; 0 once the file is gone)
    pub frame_count: u64,
    /// Size of the WAL file in bytes (0 once the file is gone)
    pub file_size: u64,
    /// Whether this is a checkpoint event: set when the WAL shrank or was removed
    pub is_checkpoint: bool,
}
23
/// `SQLite` WAL header structure (first 32 bytes)
///
/// Decoded field by field from eight big-endian `u32` words by
/// [`WalHeader::from_bytes`]. The in-memory layout is never reinterpreted as the
/// on-disk bytes, so no `#[repr(C)]` is needed.
///
/// See: <https://www.sqlite.org/fileformat2.html#walformat>
#[derive(Debug, Clone, Copy)]
struct WalHeader {
    /// Magic number: 0x377f0682 (checksums over little-endian words) or
    /// 0x377f0683 (checksums over big-endian words)
    magic: u32,
    /// Format version (currently 3007000)
    format_version: u32,
    /// Database page size in bytes (power of two, 512..=65536)
    page_size: u32,
    /// Checkpoint sequence number
    checkpoint_seq: u32,
    /// Salt-1 (random value)
    salt1: u32,
    /// Salt-2 (random value)
    salt2: u32,
    /// Checksum-1
    checksum1: u32,
    /// Checksum-2
    checksum2: u32,
}

impl WalHeader {
    /// Length of the on-disk WAL header in bytes.
    const SIZE: usize = 32;
    /// Magic value whose checksums are computed over little-endian words.
    const MAGIC_LE_CHECKSUM: u32 = 0x377f_0682;
    /// Magic value whose checksums are computed over big-endian words.
    const MAGIC_BE_CHECKSUM: u32 = 0x377f_0683;
    /// Smallest page size `SQLite` supports.
    const MIN_PAGE_SIZE: u32 = 512;
    /// Largest page size `SQLite` supports.
    const MAX_PAGE_SIZE: u32 = 65_536;

    /// Parse WAL header from bytes
    ///
    /// Only the first 32 bytes of `bytes` are examined; anything after them is
    /// ignored. Returns `None` when fewer than 32 bytes are supplied, the magic
    /// number is not one of the two WAL magics, or the page size is not a power
    /// of two in `512..=65536` (such a header cannot have been written by
    /// `SQLite`, and frame arithmetic based on it would be meaningless).
    fn from_bytes(bytes: &[u8]) -> Option<Self> {
        let raw = bytes.get(..Self::SIZE)?;

        // The WAL header is eight big-endian u32 words, in declaration order.
        let mut words = [0u32; 8];
        for (word, chunk) in words.iter_mut().zip(raw.chunks_exact(4)) {
            *word = u32::from_be_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]);
        }
        let [magic, format_version, page_size, checkpoint_seq, salt1, salt2, checksum1, checksum2] =
            words;

        if magic != Self::MAGIC_LE_CHECKSUM && magic != Self::MAGIC_BE_CHECKSUM {
            return None;
        }
        if !Self::is_valid_page_size(page_size) {
            return None;
        }

        Some(Self {
            magic,
            format_version,
            page_size,
            checkpoint_seq,
            salt1,
            salt2,
            checksum1,
            checksum2,
        })
    }

    /// Whether `page_size` is a page size `SQLite` can produce.
    fn is_valid_page_size(page_size: u32) -> bool {
        // `is_power_of_two` is false for 0, so a zeroed field is rejected too.
        page_size.is_power_of_two()
            && (Self::MIN_PAGE_SIZE..=Self::MAX_PAGE_SIZE).contains(&page_size)
    }
}
75
/// Header that precedes each page image inside a WAL frame (24 bytes on disk).
///
/// Not decoded anywhere yet; kept as the target type for future frame-level
/// parsing.
// No code constructs or reads this type yet, hence the dead-code allowance.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug)]
#[repr(C)]
struct WalFrameHeader {
    /// Number of the database page stored in this frame
    page_number: u32,
    /// For commit frames, the database size in pages; 0 for all other frames
    db_size: u32,
    /// First salt; must equal the WAL header's salt-1
    salt1: u32,
    /// Second salt; must equal the WAL header's salt-2
    salt2: u32,
    /// First checksum word
    checksum1: u32,
    /// Second checksum word
    checksum2: u32,
}
96
97/// Monitors a `SQLite` WAL file for changes
98pub struct WalMonitor {
99    /// Path to the WAL file
100    wal_path: PathBuf,
101    /// Last known frame count
102    last_frame_count: Arc<AtomicU64>,
103    /// Last known file size
104    last_file_size: Arc<AtomicU64>,
105    /// File watcher
106    watcher: RecommendedWatcher,
107    /// Event receiver
108    event_rx: mpsc::Receiver<notify::Result<Event>>,
109}
110
111impl WalMonitor {
112    /// Create a new WAL monitor
113    ///
114    /// # Arguments
115    ///
116    /// * `wal_path` - Path to the `SQLite` WAL file
117    ///
118    /// # Errors
119    ///
120    /// Returns an error if the file watcher cannot be initialized.
121    pub fn new(wal_path: PathBuf) -> Result<Self> {
122        let (tx, rx) = mpsc::channel(100);
123
124        // Create watcher with a small debounce
125        let watcher = notify::recommended_watcher(move |res| {
126            let _ = tx.blocking_send(res);
127        })
128        .map_err(|e| LayerStorageError::Io(std::io::Error::other(format!("Watcher error: {e}"))))?;
129
130        // Get initial state if file exists
131        let (initial_size, initial_frames) = if wal_path.exists() {
132            let metadata = std::fs::metadata(&wal_path)?;
133            let size = metadata.len();
134            let frames = Self::count_frames_from_file(&wal_path).unwrap_or(0);
135            (size, frames)
136        } else {
137            (0, 0)
138        };
139
140        let mut monitor = Self {
141            wal_path,
142            last_frame_count: Arc::new(AtomicU64::new(initial_frames)),
143            last_file_size: Arc::new(AtomicU64::new(initial_size)),
144            watcher,
145            event_rx: rx,
146        };
147
148        // Start watching the parent directory
149        monitor.start_watching()?;
150
151        Ok(monitor)
152    }
153
154    /// Start watching the WAL file
155    fn start_watching(&mut self) -> Result<()> {
156        // Watch the parent directory since the WAL file may be created/deleted
157        if let Some(parent) = self.wal_path.parent() {
158            if parent.exists() {
159                // We need to re-create the watcher since we can't modify it after creation
160                // The watcher is already watching from the constructor
161                let (tx, rx) = mpsc::channel(100);
162                let wal_path = self.wal_path.clone();
163
164                let mut watcher = notify::recommended_watcher(move |res: notify::Result<Event>| {
165                    // Filter to only WAL file events
166                    if let Ok(event) = &res {
167                        let is_wal_event = event.paths.iter().any(|p| {
168                            p.file_name()
169                                .is_some_and(|n| n.to_string_lossy().ends_with("-wal"))
170                                || p == &wal_path
171                        });
172                        if is_wal_event {
173                            let _ = tx.blocking_send(res);
174                        }
175                    }
176                })
177                .map_err(|e| {
178                    LayerStorageError::Io(std::io::Error::other(format!("Watcher error: {e}")))
179                })?;
180
181                watcher
182                    .watch(parent, RecursiveMode::NonRecursive)
183                    .map_err(|e| {
184                        LayerStorageError::Io(std::io::Error::other(format!("Watch error: {e}")))
185                    })?;
186
187                self.watcher = watcher;
188                self.event_rx = rx;
189            }
190        }
191        Ok(())
192    }
193
194    /// Check for WAL changes
195    ///
196    /// Returns `Some(WalEvent)` if changes were detected, `None` otherwise.
197    pub async fn check_for_changes(&self) -> Result<Option<WalEvent>> {
198        // Check if file exists and get current state
199        if !self.wal_path.exists() {
200            // WAL might have been checkpointed away
201            let last_size = self.last_file_size.load(Ordering::SeqCst);
202            if last_size > 0 {
203                // WAL was deleted (checkpoint completed)
204                self.last_file_size.store(0, Ordering::SeqCst);
205                self.last_frame_count.store(0, Ordering::SeqCst);
206                return Ok(Some(WalEvent {
207                    frame_count: 0,
208                    file_size: 0,
209                    is_checkpoint: true,
210                }));
211            }
212            return Ok(None);
213        }
214
215        // Get current file size
216        let metadata = tokio::fs::metadata(&self.wal_path).await?;
217        let current_size = metadata.len();
218        let last_size = self.last_file_size.load(Ordering::SeqCst);
219
220        if current_size == last_size {
221            return Ok(None);
222        }
223
224        // Size changed, count frames
225        let frame_count = Self::count_frames_from_file(&self.wal_path).unwrap_or(0);
226        let last_frames = self.last_frame_count.load(Ordering::SeqCst);
227
228        // Update state
229        self.last_file_size.store(current_size, Ordering::SeqCst);
230        self.last_frame_count.store(frame_count, Ordering::SeqCst);
231
232        // If size decreased, it's likely a checkpoint
233        let is_checkpoint = current_size < last_size;
234
235        if frame_count != last_frames || is_checkpoint {
236            trace!(
237                "WAL change: {} -> {} frames, {} -> {} bytes",
238                last_frames,
239                frame_count,
240                last_size,
241                current_size
242            );
243
244            Ok(Some(WalEvent {
245                frame_count,
246                file_size: current_size,
247                is_checkpoint,
248            }))
249        } else {
250            Ok(None)
251        }
252    }
253
254    /// Count frames in a WAL file
255    fn count_frames_from_file(path: &PathBuf) -> Option<u64> {
256        let data = std::fs::read(path).ok()?;
257        Self::count_frames(&data)
258    }
259
260    /// Count frames in WAL data
261    fn count_frames(data: &[u8]) -> Option<u64> {
262        if data.len() < 32 {
263            return Some(0);
264        }
265
266        let header = WalHeader::from_bytes(data)?;
267        let page_size = header.page_size as usize;
268
269        if page_size == 0 {
270            return Some(0);
271        }
272
273        // Frame size = 24 byte header + page_size
274        let frame_size = 24 + page_size;
275
276        // Calculate number of frames
277        let data_after_header = data.len().saturating_sub(32);
278        let frame_count = data_after_header / frame_size;
279
280        Some(frame_count as u64)
281    }
282
283    /// Get the current frame count
284    #[allow(dead_code)]
285    pub fn frame_count(&self) -> u64 {
286        self.last_frame_count.load(Ordering::SeqCst)
287    }
288
289    /// Get the current WAL file size
290    #[allow(dead_code)]
291    pub fn file_size(&self) -> u64 {
292        self.last_file_size.load(Ordering::SeqCst)
293    }
294}
295
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a WAL image: a valid-looking header with `magic` and `page_size`,
    /// followed by `tail_len` zero bytes.
    fn wal_image(magic: u32, page_size: u32, tail_len: usize) -> Vec<u8> {
        let mut data = vec![0u8; 32 + tail_len];
        data[0..4].copy_from_slice(&magic.to_be_bytes());
        data[4..8].copy_from_slice(&3_007_000_u32.to_be_bytes());
        data[8..12].copy_from_slice(&page_size.to_be_bytes());
        data
    }

    #[test]
    fn test_wal_header_parsing() {
        // Valid WAL header (big-endian)
        let mut header_bytes = vec![0u8; 32];
        // Magic number 0x377f0682
        header_bytes[0..4].copy_from_slice(&0x377f_0682_u32.to_be_bytes());
        // Format version 3007000
        header_bytes[4..8].copy_from_slice(&3_007_000_u32.to_be_bytes());
        // Page size 4096
        header_bytes[8..12].copy_from_slice(&4096_u32.to_be_bytes());

        let header = WalHeader::from_bytes(&header_bytes).unwrap();
        assert_eq!(header.magic, 0x377f_0682);
        assert_eq!(header.format_version, 3_007_000);
        assert_eq!(header.page_size, 4096);
    }

    #[test]
    fn test_invalid_wal_header() {
        // Invalid magic number
        let header_bytes = vec![0u8; 32];
        assert!(WalHeader::from_bytes(&header_bytes).is_none());

        // Too short
        let short_bytes = vec![0u8; 16];
        assert!(WalHeader::from_bytes(&short_bytes).is_none());
    }

    #[test]
    fn test_frame_counting() {
        // Header followed by exactly two frames
        let frame_size = 24 + 4096;
        let data = wal_image(0x377f_0682, 4096, 2 * frame_size);
        assert_eq!(WalMonitor::count_frames(&data), Some(2));
    }

    #[test]
    fn test_partial_trailing_frame_not_counted() {
        let frame_size = 24 + 4096;
        let data = wal_image(0x377f_0683, 4096, frame_size + 100);
        assert_eq!(WalMonitor::count_frames(&data), Some(1));
    }

    #[test]
    fn test_header_only_wal_has_no_frames() {
        let data = wal_image(0x377f_0682, 4096, 0);
        assert_eq!(WalMonitor::count_frames(&data), Some(0));
    }

    #[test]
    fn test_invalid_magic_is_not_counted() {
        let mut data = wal_image(0x377f_0682, 4096, 24 + 4096);
        data[0] = 0;
        assert_eq!(WalMonitor::count_frames(&data), None);
    }

    #[test]
    fn test_empty_wal() {
        let data = vec![0u8; 16];
        assert_eq!(WalMonitor::count_frames(&data), Some(0));
        assert_eq!(WalMonitor::count_frames(&[]), Some(0));
    }
}