1use bytes::{BytesMut, BufMut};
7use parking_lot::Mutex;
8use std::collections::VecDeque;
9use std::fs::{File, OpenOptions};
10use std::io::{Read, Write, Seek, SeekFrom};
11use std::path::Path;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use crate::{Error, Result, Record, Lsn, layout::Region, types::checksum};
15
/// Bytes preceding every record payload in the ring: an 8-byte little-endian
/// LSN followed by a 4-byte little-endian payload length. A 4-byte checksum
/// trails the payload and is not counted here.
const RECORD_HEADER_SIZE: usize = 8 + 4;
18
19struct WalEntry {
22 lsn: Lsn,
23 record: Record,
24}
25
26pub struct WalRing {
28 inner: Arc<Mutex<WalRingInner>>,
29}
30
31struct WalRingInner {
32 file: File,
33 region: Region,
34
35 write_offset: u64, checkpoint_lsn: Lsn, next_lsn: Lsn, pending: VecDeque<WalEntry>,
42 last_flush: Instant,
43 batch_timeout: Duration,
44}
45
46impl WalRing {
47 pub fn create(path: impl AsRef<Path>, region: Region) -> Result<Self> {
49 let mut file = OpenOptions::new()
50 .read(true)
51 .write(true)
52 .create(true)
53 .open(path)?;
54
55 file.seek(SeekFrom::Start(region.offset))?;
57 let zeros = vec![0u8; region.size as usize];
58 file.write_all(&zeros)?;
59 file.sync_all()?;
60
61 Ok(Self {
62 inner: Arc::new(Mutex::new(WalRingInner {
63 file,
64 region,
65 write_offset: 0,
66 checkpoint_lsn: 0,
67 next_lsn: 1,
68 pending: VecDeque::new(),
69 last_flush: Instant::now(),
70 batch_timeout: Duration::from_millis(10), })),
72 })
73 }
74
75 pub fn open(path: impl AsRef<Path>, region: Region) -> Result<Self> {
77 let mut file = OpenOptions::new()
78 .read(true)
79 .write(true)
80 .open(path)?;
81
82 let records = Self::recover(&mut file, ®ion)?;
84
85 let max_lsn = if records.is_empty() {
86 0
87 } else {
88 records.iter().map(|(lsn, _)| *lsn).max().unwrap_or(0)
89 };
90
91 Ok(Self {
92 inner: Arc::new(Mutex::new(WalRingInner {
93 file,
94 region,
95 write_offset: 0, checkpoint_lsn: 0,
97 next_lsn: max_lsn + 1,
98 pending: VecDeque::new(),
99 last_flush: Instant::now(),
100 batch_timeout: Duration::from_millis(10),
101 })),
102 })
103 }
104
105 pub fn append(&self, record: Record) -> Result<Lsn> {
107 let mut inner = self.inner.lock();
108
109 let lsn = inner.next_lsn;
110 inner.next_lsn += 1;
111
112 inner.pending.push_back(WalEntry { lsn, record });
113
114 if inner.last_flush.elapsed() >= inner.batch_timeout {
116 Self::flush_inner(&mut inner)?;
117 }
118
119 Ok(lsn)
120 }
121
122 pub fn flush(&self) -> Result<()> {
124 let mut inner = self.inner.lock();
125 Self::flush_inner(&mut inner)
126 }
127
128 fn flush_inner(inner: &mut WalRingInner) -> Result<()> {
130 if inner.pending.is_empty() {
131 return Ok(());
132 }
133
134 let mut buf = BytesMut::new();
136
137 for entry in &inner.pending {
138 let data = bincode::serialize(&entry.record)
139 .map_err(|e| Error::Internal(format!("Serialize error: {}", e)))?;
140
141 buf.put_u64_le(entry.lsn);
143 buf.put_u32_le(data.len() as u32);
144 buf.put_slice(&data);
145
146 let crc = checksum::compute(&data);
147 buf.put_u32_le(crc);
148 }
149
150 let total_size = buf.len() as u64;
151
152 if inner.write_offset + total_size > inner.region.size {
154 inner.write_offset = 0;
156 }
157
158 let file_offset = inner.region.offset + inner.write_offset;
160 inner.file.seek(SeekFrom::Start(file_offset))?;
161 inner.file.write_all(&buf)?;
162 inner.file.sync_all()?;
163
164 inner.write_offset += total_size;
166 inner.pending.clear();
167 inner.last_flush = Instant::now();
168
169 Ok(())
170 }
171
172 fn recover(file: &mut File, region: &Region) -> Result<Vec<(Lsn, Record)>> {
174 let mut records = Vec::new();
175
176 file.seek(SeekFrom::Start(region.offset))?;
178 let mut ring_data = vec![0u8; region.size as usize];
179
180 let bytes_read = file.read(&mut ring_data)?;
182 if bytes_read == 0 {
183 return Ok(records); }
185
186 let mut offset = 0usize;
187
188 while offset + RECORD_HEADER_SIZE + 4 < ring_data.len() {
190 let lsn = u64::from_le_bytes([
192 ring_data[offset],
193 ring_data[offset + 1],
194 ring_data[offset + 2],
195 ring_data[offset + 3],
196 ring_data[offset + 4],
197 ring_data[offset + 5],
198 ring_data[offset + 6],
199 ring_data[offset + 7],
200 ]);
201
202 if lsn == 0 {
204 break;
205 }
206
207 let len = u32::from_le_bytes([
208 ring_data[offset + 8],
209 ring_data[offset + 9],
210 ring_data[offset + 10],
211 ring_data[offset + 11],
212 ]) as usize;
213
214 if offset + RECORD_HEADER_SIZE + len + 4 > ring_data.len() {
216 break; }
218
219 let data_start = offset + RECORD_HEADER_SIZE;
221 let data_end = data_start + len;
222 let data = &ring_data[data_start..data_end];
223
224 let crc_offset = data_end;
225 let expected_crc = u32::from_le_bytes([
226 ring_data[crc_offset],
227 ring_data[crc_offset + 1],
228 ring_data[crc_offset + 2],
229 ring_data[crc_offset + 3],
230 ]);
231
232 if checksum::verify(data, expected_crc) {
234 match bincode::deserialize::<Record>(data) {
236 Ok(record) => {
237 records.push((lsn, record));
238 offset = crc_offset + 4;
239 }
240 Err(_) => {
241 break;
243 }
244 }
245 } else {
246 break;
248 }
249 }
250
251 records.sort_by_key(|(lsn, _)| *lsn);
253
254 Ok(records)
255 }
256
257 pub fn read_all(&self) -> Result<Vec<(Lsn, Record)>> {
259 let inner = self.inner.lock();
260 let mut file = inner.file.try_clone()?;
261 drop(inner);
262
263 let inner = self.inner.lock();
264 let region = inner.region;
265 drop(inner);
266
267 Self::recover(&mut file, ®ion)
268 }
269
270 pub fn set_checkpoint(&self, lsn: Lsn) -> Result<()> {
272 let mut inner = self.inner.lock();
273 inner.checkpoint_lsn = lsn;
274 Ok(())
275 }
276
277 pub fn compact(&self) -> Result<()> {
280 Ok(())
284 }
285
286 pub fn next_lsn(&self) -> Lsn {
288 let inner = self.inner.lock();
289 inner.next_lsn
290 }
291
292 pub fn set_batch_timeout(&self, timeout: Duration) {
294 let mut inner = self.inner.lock();
295 inner.batch_timeout = timeout;
296 }
297}
298
#[cfg(test)]
mod tests {
    //! Round-trip tests for `WalRing`: append/flush/read, recovery after
    //! reopening, batched commits, ring wrap-around, and checkpoint + compact.

    use super::*;
    use crate::{Key, Value, layout::Region};
    use tempfile::NamedTempFile;
    use std::collections::HashMap;

    /// One appended and flushed record is read back with LSN 1.
    #[test]
    fn test_wal_ring_create_and_append() {
        let backing = NamedTempFile::new().unwrap();
        let wal = WalRing::create(backing.path(), Region::new(0, 64 * 1024)).unwrap();

        let mut fields = HashMap::new();
        fields.insert("value".to_string(), Value::string("hello"));
        let first_lsn = wal
            .append(Record::put(Key::new(b"test".to_vec()), fields, 1))
            .unwrap();
        assert_eq!(first_lsn, 1);

        wal.flush().unwrap();

        let recovered = wal.read_all().unwrap();
        assert_eq!(recovered.len(), 1);
        assert_eq!(recovered[0].0, 1);
    }

    /// Records flushed by one handle are visible to a freshly opened one.
    #[test]
    fn test_wal_ring_recovery() {
        let backing = NamedTempFile::new().unwrap();
        let region = Region::new(0, 64 * 1024);

        // Write five records, then drop the ring so only on-disk state remains.
        {
            let writer = WalRing::create(backing.path(), region).unwrap();
            for n in 0..5 {
                let key = Key::new(format!("key{}", n).into_bytes());
                writer.append(Record::put(key, HashMap::new(), n)).unwrap();
            }
            writer.flush().unwrap();
        }

        let reopened = WalRing::open(backing.path(), region).unwrap();
        // LSNs 1..=5 were used, so the next one handed out must be 6.
        assert_eq!(reopened.next_lsn(), 6);
        assert_eq!(reopened.read_all().unwrap().len(), 5);
    }

    /// Several appends followed by one explicit flush are all persisted.
    #[test]
    fn test_wal_ring_group_commit() {
        let backing = NamedTempFile::new().unwrap();
        let wal = WalRing::create(backing.path(), Region::new(0, 64 * 1024)).unwrap();

        for n in 0..10 {
            let key = Key::new(format!("key{}", n).into_bytes());
            wal.append(Record::put(key, HashMap::new(), n)).unwrap();
        }
        wal.flush().unwrap();

        assert_eq!(wal.read_all().unwrap().len(), 10);
    }

    /// Writing more data than the region holds wraps around without failing.
    #[test]
    fn test_wal_ring_wrap_around() {
        let backing = NamedTempFile::new().unwrap();
        // 50 records carrying 50-byte strings need more than this 1 KiB region.
        let wal = WalRing::create(backing.path(), Region::new(0, 1024)).unwrap();

        for n in 0..50 {
            let mut fields = HashMap::new();
            fields.insert("data".to_string(), Value::string("x".repeat(50)));
            let key = Key::new(format!("key{}", n).into_bytes());
            wal.append(Record::put(key, fields, n)).unwrap();
            wal.flush().unwrap();
        }

        assert!(!wal.read_all().unwrap().is_empty());
    }

    /// Setting a checkpoint and compacting keeps every record for now.
    #[test]
    fn test_wal_ring_checkpoint() {
        let backing = NamedTempFile::new().unwrap();
        let wal = WalRing::create(backing.path(), Region::new(0, 64 * 1024)).unwrap();

        for n in 0..10 {
            let key = Key::new(format!("key{}", n).into_bytes());
            wal.append(Record::put(key, HashMap::new(), n)).unwrap();
        }
        wal.flush().unwrap();

        wal.set_checkpoint(5).unwrap();
        wal.compact().unwrap();

        // compact() does not discard anything yet, so all ten records remain.
        assert_eq!(wal.read_all().unwrap().len(), 10);
    }
}