// zlayer_storage/replicator/wal_monitor.rs
use crate::error::{LayerStorageError, Result};
use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::trace;

/// A detected change to the monitored WAL file, produced by
/// [`WalMonitor::check_for_changes`].
#[derive(Debug, Clone)]
pub struct WalEvent {
    /// Number of complete frames currently present in the WAL.
    pub frame_count: u64,
    /// Current WAL file size in bytes (0 when the file was deleted).
    pub file_size: u64,
    /// True when the WAL shrank or disappeared, which indicates SQLite
    /// checkpointed (reset/truncated/deleted) the log.
    pub is_checkpoint: bool,
}
23
/// Parsed copy of the 32-byte header at the start of a SQLite WAL file.
///
/// All fields are stored big-endian on disk, per the SQLite WAL format.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
struct WalHeader {
    /// Magic: 0x377f_0682 or 0x377f_0683 (low bit selects checksum byte order).
    magic: u32,
    /// WAL format version (3_007_000 for current SQLite).
    format_version: u32,
    /// Database page size in bytes.
    page_size: u32,
    /// Checkpoint sequence number.
    checkpoint_seq: u32,
    /// Random salt, changed on each checkpoint.
    salt1: u32,
    /// Second random salt, changed on each checkpoint.
    salt2: u32,
    /// First half of the running header checksum.
    checksum1: u32,
    /// Second half of the running header checksum.
    checksum2: u32,
}

impl WalHeader {
    /// Parses a WAL header from the first 32 bytes of `bytes`.
    ///
    /// Returns `None` when the slice is shorter than 32 bytes or the magic
    /// number does not match either valid WAL magic value.
    fn from_bytes(bytes: &[u8]) -> Option<Self> {
        if bytes.len() < 32 {
            return None;
        }

        // All fields are big-endian u32s at fixed 4-byte offsets; the length
        // check above guarantees every 4-byte slice below is in bounds.
        let read_u32 = |offset: usize| -> u32 {
            u32::from_be_bytes(
                bytes[offset..offset + 4]
                    .try_into()
                    .expect("slice is exactly 4 bytes"),
            )
        };

        let magic = read_u32(0);
        // Only the two defined WAL magic values are accepted.
        if magic != 0x377f_0682 && magic != 0x377f_0683 {
            return None;
        }

        Some(Self {
            magic,
            format_version: read_u32(4),
            page_size: read_u32(8),
            checkpoint_seq: read_u32(12),
            salt1: read_u32(16),
            salt2: read_u32(20),
            checksum1: read_u32(24),
            checksum2: read_u32(28),
        })
    }
}
75
/// The 24-byte header that precedes each page in a WAL frame.
///
/// Currently unused (frame counting only needs the file header's page size),
/// but kept to document the on-disk layout.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
#[allow(dead_code)]
struct WalFrameHeader {
    /// Database page number stored in this frame.
    page_number: u32,
    /// For commit frames: database size in pages after commit; 0 otherwise.
    db_size: u32,
    /// Salt copied from the WAL header (must match to be valid).
    salt1: u32,
    /// Second salt copied from the WAL header.
    salt2: u32,
    /// First half of the cumulative frame checksum.
    checksum1: u32,
    /// Second half of the cumulative frame checksum.
    checksum2: u32,
}
96
/// Watches a SQLite `-wal` file and reports frame-count / size changes.
pub struct WalMonitor {
    // Path to the `-wal` file being monitored.
    wal_path: PathBuf,
    // Frame count observed by the most recent check (shared atomics so
    // `check_for_changes` can update state through `&self`).
    last_frame_count: Arc<AtomicU64>,
    // WAL file size in bytes observed by the most recent check.
    last_file_size: Arc<AtomicU64>,
    // Filesystem watcher; kept alive here so the watch is not dropped.
    watcher: RecommendedWatcher,
    // Receives raw notify events from the watcher callback.
    // NOTE(review): nothing in this file drains this receiver —
    // `check_for_changes` polls metadata instead; confirm whether the
    // event channel is intentionally unused.
    event_rx: mpsc::Receiver<notify::Result<Event>>,
}
110
111impl WalMonitor {
112 pub fn new(wal_path: PathBuf) -> Result<Self> {
122 let (tx, rx) = mpsc::channel(100);
123
124 let watcher = notify::recommended_watcher(move |res| {
126 let _ = tx.blocking_send(res);
127 })
128 .map_err(|e| LayerStorageError::Io(std::io::Error::other(format!("Watcher error: {e}"))))?;
129
130 let (initial_size, initial_frames) = if wal_path.exists() {
132 let metadata = std::fs::metadata(&wal_path)?;
133 let size = metadata.len();
134 let frames = Self::count_frames_from_file(&wal_path).unwrap_or(0);
135 (size, frames)
136 } else {
137 (0, 0)
138 };
139
140 let mut monitor = Self {
141 wal_path,
142 last_frame_count: Arc::new(AtomicU64::new(initial_frames)),
143 last_file_size: Arc::new(AtomicU64::new(initial_size)),
144 watcher,
145 event_rx: rx,
146 };
147
148 monitor.start_watching()?;
150
151 Ok(monitor)
152 }
153
154 fn start_watching(&mut self) -> Result<()> {
156 if let Some(parent) = self.wal_path.parent() {
158 if parent.exists() {
159 let (tx, rx) = mpsc::channel(100);
162 let wal_path = self.wal_path.clone();
163
164 let mut watcher = notify::recommended_watcher(move |res: notify::Result<Event>| {
165 if let Ok(event) = &res {
167 let is_wal_event = event.paths.iter().any(|p| {
168 p.file_name()
169 .is_some_and(|n| n.to_string_lossy().ends_with("-wal"))
170 || p == &wal_path
171 });
172 if is_wal_event {
173 let _ = tx.blocking_send(res);
174 }
175 }
176 })
177 .map_err(|e| {
178 LayerStorageError::Io(std::io::Error::other(format!("Watcher error: {e}")))
179 })?;
180
181 watcher
182 .watch(parent, RecursiveMode::NonRecursive)
183 .map_err(|e| {
184 LayerStorageError::Io(std::io::Error::other(format!("Watch error: {e}")))
185 })?;
186
187 self.watcher = watcher;
188 self.event_rx = rx;
189 }
190 }
191 Ok(())
192 }
193
194 pub async fn check_for_changes(&self) -> Result<Option<WalEvent>> {
198 if !self.wal_path.exists() {
200 let last_size = self.last_file_size.load(Ordering::SeqCst);
202 if last_size > 0 {
203 self.last_file_size.store(0, Ordering::SeqCst);
205 self.last_frame_count.store(0, Ordering::SeqCst);
206 return Ok(Some(WalEvent {
207 frame_count: 0,
208 file_size: 0,
209 is_checkpoint: true,
210 }));
211 }
212 return Ok(None);
213 }
214
215 let metadata = tokio::fs::metadata(&self.wal_path).await?;
217 let current_size = metadata.len();
218 let last_size = self.last_file_size.load(Ordering::SeqCst);
219
220 if current_size == last_size {
221 return Ok(None);
222 }
223
224 let frame_count = Self::count_frames_from_file(&self.wal_path).unwrap_or(0);
226 let last_frames = self.last_frame_count.load(Ordering::SeqCst);
227
228 self.last_file_size.store(current_size, Ordering::SeqCst);
230 self.last_frame_count.store(frame_count, Ordering::SeqCst);
231
232 let is_checkpoint = current_size < last_size;
234
235 if frame_count != last_frames || is_checkpoint {
236 trace!(
237 "WAL change: {} -> {} frames, {} -> {} bytes",
238 last_frames,
239 frame_count,
240 last_size,
241 current_size
242 );
243
244 Ok(Some(WalEvent {
245 frame_count,
246 file_size: current_size,
247 is_checkpoint,
248 }))
249 } else {
250 Ok(None)
251 }
252 }
253
254 fn count_frames_from_file(path: &PathBuf) -> Option<u64> {
256 let data = std::fs::read(path).ok()?;
257 Self::count_frames(&data)
258 }
259
260 fn count_frames(data: &[u8]) -> Option<u64> {
262 if data.len() < 32 {
263 return Some(0);
264 }
265
266 let header = WalHeader::from_bytes(data)?;
267 let page_size = header.page_size as usize;
268
269 if page_size == 0 {
270 return Some(0);
271 }
272
273 let frame_size = 24 + page_size;
275
276 let data_after_header = data.len().saturating_sub(32);
278 let frame_count = data_after_header / frame_size;
279
280 Some(frame_count as u64)
281 }
282
283 #[allow(dead_code)]
285 pub fn frame_count(&self) -> u64 {
286 self.last_frame_count.load(Ordering::SeqCst)
287 }
288
289 #[allow(dead_code)]
291 pub fn file_size(&self) -> u64 {
292 self.last_file_size.load(Ordering::SeqCst)
293 }
294}
295
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a 32-byte WAL header containing `magic`, `version`, and
    /// `page_size`; every other field stays zeroed.
    fn build_header(magic: u32, version: u32, page_size: u32) -> Vec<u8> {
        let mut bytes = vec![0u8; 32];
        bytes[0..4].copy_from_slice(&magic.to_be_bytes());
        bytes[4..8].copy_from_slice(&version.to_be_bytes());
        bytes[8..12].copy_from_slice(&page_size.to_be_bytes());
        bytes
    }

    #[test]
    fn test_wal_header_parsing() {
        let raw = build_header(0x377f_0682, 3_007_000, 4096);

        let parsed = WalHeader::from_bytes(&raw).unwrap();
        assert_eq!(parsed.magic, 0x377f_0682);
        assert_eq!(parsed.format_version, 3_007_000);
        assert_eq!(parsed.page_size, 4096);
    }

    #[test]
    fn test_invalid_wal_header() {
        // A zeroed magic number is rejected.
        assert!(WalHeader::from_bytes(&[0u8; 32]).is_none());

        // Fewer than 32 bytes cannot hold a full header.
        assert!(WalHeader::from_bytes(&[0u8; 16]).is_none());
    }

    #[test]
    fn test_frame_counting() {
        let page_size: u32 = 4096;
        let frame_size = 24 + page_size as usize;

        // Header plus exactly two complete frames.
        let mut wal_image = vec![0u8; 32 + 2 * frame_size];
        wal_image[0..32].copy_from_slice(&build_header(0x377f_0682, 0, page_size));

        assert_eq!(WalMonitor::count_frames(&wal_image), Some(2));
    }

    #[test]
    fn test_empty_wal() {
        // Anything shorter than the 32-byte header counts as zero frames.
        assert_eq!(WalMonitor::count_frames(&[0u8; 16]), Some(0));
    }
}