1use std::collections::VecDeque;
119use std::fs::{File, OpenOptions};
120use std::io::{self, BufWriter, Read, Seek, SeekFrom, Write};
121use std::path::{Path, PathBuf};
122use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
123use std::sync::{Arc, Condvar, Mutex};
124use std::thread::{self, JoinHandle};
125use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
126
127use super::types::ChunkId;
128
/// Magic number stamped on every record header (hex spells ASCII "RBME").
const JOURNAL_MAGIC: u32 = 0x52424D45;

/// On-disk record format version; `RecordHeader::from_bytes` rejects others.
const JOURNAL_VERSION: u16 = 1;

/// Serialized size of a `RecordHeader` in bytes (28-byte prefix + u32 checksum).
const HEADER_SIZE: usize = 32;

/// Default group-commit flush interval, in milliseconds.
const DEFAULT_GROUP_COMMIT_MS: u64 = 5;

/// Default queue length that forces an inline group-commit flush.
const DEFAULT_MAX_PENDING_OPS: usize = 100;

/// Payload size above which `Journal::should_use_wal` recommends the WAL path.
const WAL_THRESHOLD_BYTES: usize = 64 * 1024;
146
/// Lifecycle state of a journal record, stored in its header.
///
/// Serialized as a little-endian `u16`; the discriminant values are part of
/// the on-disk format and must not be renumbered. `Journal::replay` applies
/// only `Committed` records and skips all other states.
#[repr(u16)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RecordFlags {
    /// Record written but not marked committed.
    Pending = 0,
    /// Record committed; eligible for replay.
    Committed = 1,
    /// Record presumably already applied by a checkpoint — TODO confirm writer side.
    Checkpointed = 2,
    /// Record presumably rolled back — TODO confirm writer side.
    Aborted = 3,
}
160
161impl RecordFlags {
162 fn from_u16(v: u16) -> Option<Self> {
163 match v {
164 0 => Some(Self::Pending),
165 1 => Some(Self::Committed),
166 2 => Some(Self::Checkpointed),
167 3 => Some(Self::Aborted),
168 _ => None,
169 }
170 }
171}
172
/// Controls when journal writes become durable (fsync'd).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DurabilityMode {
    /// Flush and fsync after every transaction — safest, slowest.
    Immediate,
    /// Batch records in a queue and fsync them together, bounded by a timer
    /// and a queue-length threshold.
    GroupCommit {
        /// Maximum delay before a batch is flushed, in milliseconds.
        max_delay_ms: u64,
        /// Queue length that triggers an immediate inline flush.
        max_pending_ops: usize,
    },
    /// Buffered writes with no explicit flush or fsync on the write path.
    Relaxed,
}
186
impl Default for DurabilityMode {
    /// Defaults to group commit with the module's standard delay and batch
    /// limits, trading a small bounded durability window for throughput.
    fn default() -> Self {
        Self::GroupCommit {
            max_delay_ms: DEFAULT_GROUP_COMMIT_MS,
            max_pending_ops: DEFAULT_MAX_PENDING_OPS,
        }
    }
}
196
/// Fixed-size (32-byte) header that precedes each record's payload on disk.
///
/// Fields are serialized little-endian in declaration order; `checksum` is a
/// CRC-32 over the first 28 header bytes followed by the payload.
#[derive(Clone, Debug)]
pub struct RecordHeader {
    /// Always `JOURNAL_MAGIC` for valid records.
    pub magic: u32,
    /// Format version; `from_bytes` only accepts `JOURNAL_VERSION`.
    pub version: u16,
    /// Record lifecycle state.
    pub flags: RecordFlags,
    /// Transaction id assigned by the journal.
    pub txn_id: u64,
    /// Wall-clock creation time, nanoseconds since the Unix epoch.
    pub timestamp: u64,
    /// Length in bytes of the payload following the header.
    pub payload_len: u32,
    /// CRC-32 of header prefix + payload (see `crc32_compute`).
    pub checksum: u32,
}
208
209impl RecordHeader {
210 pub fn new(txn_id: u64, flags: RecordFlags, payload_len: u32) -> Self {
212 let timestamp = SystemTime::now()
213 .duration_since(UNIX_EPOCH)
214 .unwrap_or_default()
215 .as_nanos() as u64;
216
217 Self {
218 magic: JOURNAL_MAGIC,
219 version: JOURNAL_VERSION,
220 flags,
221 txn_id,
222 timestamp,
223 payload_len,
224 checksum: 0, }
226 }
227
228 pub fn to_bytes(&self, payload: &[u8]) -> Vec<u8> {
230 let mut buf = Vec::with_capacity(HEADER_SIZE);
231
232 buf.extend_from_slice(&self.magic.to_le_bytes());
233 buf.extend_from_slice(&self.version.to_le_bytes());
234 buf.extend_from_slice(&(self.flags as u16).to_le_bytes());
235 buf.extend_from_slice(&self.txn_id.to_le_bytes());
236 buf.extend_from_slice(&self.timestamp.to_le_bytes());
237 buf.extend_from_slice(&self.payload_len.to_le_bytes());
238
239 let checksum = crc32_compute(&buf, payload);
241 buf.extend_from_slice(&checksum.to_le_bytes());
242
243 buf
244 }
245
246 pub fn from_bytes(data: &[u8]) -> io::Result<Self> {
248 if data.len() < HEADER_SIZE {
249 return Err(io::Error::new(
250 io::ErrorKind::InvalidData,
251 "header too short",
252 ));
253 }
254
255 let magic = u32::from_le_bytes(data[0..4].try_into().unwrap());
256 if magic != JOURNAL_MAGIC {
257 return Err(io::Error::new(
258 io::ErrorKind::InvalidData,
259 format!("invalid magic: {:08x}", magic),
260 ));
261 }
262
263 let version = u16::from_le_bytes(data[4..6].try_into().unwrap());
264 if version != JOURNAL_VERSION {
265 return Err(io::Error::new(
266 io::ErrorKind::InvalidData,
267 format!("unsupported version: {}", version),
268 ));
269 }
270
271 let flags = RecordFlags::from_u16(u16::from_le_bytes(data[6..8].try_into().unwrap()))
272 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid flags"))?;
273
274 let txn_id = u64::from_le_bytes(data[8..16].try_into().unwrap());
275 let timestamp = u64::from_le_bytes(data[16..24].try_into().unwrap());
276 let payload_len = u32::from_le_bytes(data[24..28].try_into().unwrap());
277 let checksum = u32::from_le_bytes(data[28..32].try_into().unwrap());
278
279 Ok(Self {
280 magic,
281 version,
282 flags,
283 txn_id,
284 timestamp,
285 payload_len,
286 checksum,
287 })
288 }
289
290 pub fn verify_checksum(&self, payload: &[u8]) -> bool {
292 let mut header_bytes = Vec::with_capacity(28);
293 header_bytes.extend_from_slice(&self.magic.to_le_bytes());
294 header_bytes.extend_from_slice(&self.version.to_le_bytes());
295 header_bytes.extend_from_slice(&(self.flags as u16).to_le_bytes());
296 header_bytes.extend_from_slice(&self.txn_id.to_le_bytes());
297 header_bytes.extend_from_slice(&self.timestamp.to_le_bytes());
298 header_bytes.extend_from_slice(&self.payload_len.to_le_bytes());
299
300 let computed = crc32_compute(&header_bytes, payload);
301 computed == self.checksum
302 }
303}
304
/// CRC-32 (reflected, polynomial 0xEDB88320, init/final-xor 0xFFFFFFFF) over
/// `header` immediately followed by `payload`, computed bit-serially.
fn crc32_compute(header: &[u8], payload: &[u8]) -> u32 {
    const POLYNOMIAL: u32 = 0xEDB88320;

    let crc = header
        .iter()
        .chain(payload.iter())
        .fold(0xFFFF_FFFF_u32, |acc, &byte| {
            // Branchless bit step: mask is all-ones when the LSB is set.
            (0..8).fold(acc ^ u32::from(byte), |crc, _| {
                (crc >> 1) ^ (POLYNOMIAL & (crc & 1).wrapping_neg())
            })
        });

    !crc
}
325
/// One logical operation recorded inside a journal transaction.
///
/// Serialized by `to_bytes` as a 1-byte tag (1..=6, declaration order)
/// followed by the variant's fields, little-endian, with `u32` length
/// prefixes for variable-size fields.
#[derive(Clone, Debug)]
pub enum JournalOp {
    /// A chunk write: only the hash and length are journaled — presumably
    /// the chunk bytes themselves live outside the journal (verify caller).
    WriteChunk {
        chunk_id: ChunkId,
        data_hash: [u8; 32],
        data_len: usize,
    },
    /// Removal of a chunk by id.
    DeleteChunk {
        chunk_id: ChunkId,
    },
    /// A file's metadata: path, ordered chunk list, and total size.
    WriteFile {
        path: String,
        chunk_ids: Vec<ChunkId>,
        size: usize,
    },
    /// Removal of a file by path.
    DeleteFile {
        path: String,
    },
    /// A new root hash for the store.
    UpdateRoot {
        root_hash: [u8; 32],
    },
    /// Ordering marker carrying no data.
    Barrier,
}
350
351impl JournalOp {
352 pub fn to_bytes(&self) -> Vec<u8> {
354 let mut buf = Vec::new();
355
356 match self {
357 JournalOp::WriteChunk {
358 chunk_id,
359 data_hash,
360 data_len,
361 } => {
362 buf.push(1);
363 buf.extend_from_slice(&(*chunk_id as u64).to_le_bytes());
364 buf.extend_from_slice(data_hash);
365 buf.extend_from_slice(&(*data_len as u64).to_le_bytes());
366 }
367 JournalOp::DeleteChunk { chunk_id } => {
368 buf.push(2);
369 buf.extend_from_slice(&(*chunk_id as u64).to_le_bytes());
370 }
371 JournalOp::WriteFile {
372 path,
373 chunk_ids,
374 size,
375 } => {
376 buf.push(3);
377 let path_bytes = path.as_bytes();
378 buf.extend_from_slice(&(path_bytes.len() as u32).to_le_bytes());
379 buf.extend_from_slice(path_bytes);
380 buf.extend_from_slice(&(chunk_ids.len() as u32).to_le_bytes());
381 for &id in chunk_ids {
382 buf.extend_from_slice(&(id as u64).to_le_bytes());
383 }
384 buf.extend_from_slice(&(*size as u64).to_le_bytes());
385 }
386 JournalOp::DeleteFile { path } => {
387 buf.push(4);
388 let path_bytes = path.as_bytes();
389 buf.extend_from_slice(&(path_bytes.len() as u32).to_le_bytes());
390 buf.extend_from_slice(path_bytes);
391 }
392 JournalOp::UpdateRoot { root_hash } => {
393 buf.push(5);
394 buf.extend_from_slice(root_hash);
395 }
396 JournalOp::Barrier => {
397 buf.push(6);
398 }
399 }
400
401 buf
402 }
403
404 pub fn from_bytes(data: &[u8]) -> io::Result<(Self, usize)> {
406 if data.is_empty() {
407 return Err(io::Error::new(io::ErrorKind::InvalidData, "empty data"));
408 }
409
410 let op_type = data[0];
411 let mut pos = 1;
412
413 let op = match op_type {
414 1 => {
415 if data.len() < pos + 8 + 32 + 8 {
417 return Err(io::Error::new(
418 io::ErrorKind::InvalidData,
419 "truncated WriteChunk",
420 ));
421 }
422 let chunk_id =
423 u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()) as ChunkId;
424 pos += 8;
425 let mut data_hash = [0u8; 32];
426 data_hash.copy_from_slice(&data[pos..pos + 32]);
427 pos += 32;
428 let data_len = u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()) as usize;
429 pos += 8;
430 JournalOp::WriteChunk {
431 chunk_id,
432 data_hash,
433 data_len,
434 }
435 }
436 2 => {
437 if data.len() < pos + 8 {
439 return Err(io::Error::new(
440 io::ErrorKind::InvalidData,
441 "truncated DeleteChunk",
442 ));
443 }
444 let chunk_id =
445 u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()) as ChunkId;
446 pos += 8;
447 JournalOp::DeleteChunk { chunk_id }
448 }
449 3 => {
450 if data.len() < pos + 4 {
452 return Err(io::Error::new(
453 io::ErrorKind::InvalidData,
454 "truncated WriteFile path_len",
455 ));
456 }
457 let path_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
458 pos += 4;
459 if data.len() < pos + path_len {
460 return Err(io::Error::new(
461 io::ErrorKind::InvalidData,
462 "truncated WriteFile path",
463 ));
464 }
465 let path = String::from_utf8_lossy(&data[pos..pos + path_len]).into_owned();
466 pos += path_len;
467
468 if data.len() < pos + 4 {
469 return Err(io::Error::new(
470 io::ErrorKind::InvalidData,
471 "truncated WriteFile chunk_count",
472 ));
473 }
474 let chunk_count =
475 u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
476 pos += 4;
477
478 let mut chunk_ids = Vec::with_capacity(chunk_count);
479 for _ in 0..chunk_count {
480 if data.len() < pos + 8 {
481 return Err(io::Error::new(
482 io::ErrorKind::InvalidData,
483 "truncated WriteFile chunk_id",
484 ));
485 }
486 let id = u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()) as ChunkId;
487 pos += 8;
488 chunk_ids.push(id);
489 }
490
491 if data.len() < pos + 8 {
492 return Err(io::Error::new(
493 io::ErrorKind::InvalidData,
494 "truncated WriteFile size",
495 ));
496 }
497 let size = u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()) as usize;
498 pos += 8;
499
500 JournalOp::WriteFile {
501 path,
502 chunk_ids,
503 size,
504 }
505 }
506 4 => {
507 if data.len() < pos + 4 {
509 return Err(io::Error::new(
510 io::ErrorKind::InvalidData,
511 "truncated DeleteFile",
512 ));
513 }
514 let path_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
515 pos += 4;
516 if data.len() < pos + path_len {
517 return Err(io::Error::new(
518 io::ErrorKind::InvalidData,
519 "truncated DeleteFile path",
520 ));
521 }
522 let path = String::from_utf8_lossy(&data[pos..pos + path_len]).into_owned();
523 pos += path_len;
524 JournalOp::DeleteFile { path }
525 }
526 5 => {
527 if data.len() < pos + 32 {
529 return Err(io::Error::new(
530 io::ErrorKind::InvalidData,
531 "truncated UpdateRoot",
532 ));
533 }
534 let mut root_hash = [0u8; 32];
535 root_hash.copy_from_slice(&data[pos..pos + 32]);
536 pos += 32;
537 JournalOp::UpdateRoot { root_hash }
538 }
539 6 => JournalOp::Barrier,
540 _ => {
541 return Err(io::Error::new(
542 io::ErrorKind::InvalidData,
543 format!("unknown op type: {}", op_type),
544 ))
545 }
546 };
547
548 Ok((op, pos))
549 }
550}
551
/// A fully serialized record waiting in the group-commit queue, plus the
/// signal used to wake its writer once the batch has been fsync'd.
struct PendingWrite {
    // Kept for debugging/tracing; not read on the flush path.
    #[allow(dead_code)]
    txn_id: u64,
    // Header + payload bytes, ready to append verbatim to the journal file.
    data: Vec<u8>,
    // (flag, condvar) pair: the flag flips to true when the batch is durable.
    committed: Arc<(Mutex<bool>, Condvar)>,
}
559
/// Append-only write-ahead journal with configurable durability.
///
/// Records are framed as `RecordHeader` + payload and appended to a single
/// file. In `GroupCommit` mode a background thread periodically flushes the
/// pending queue; `Immediate` fsyncs per transaction; `Relaxed` only buffers.
pub struct Journal {
    /// Path of the journal file (re-opened read-only by `replay`).
    path: PathBuf,
    /// Durability policy chosen at open time.
    mode: DurabilityMode,
    /// Buffered writer over the journal file, shared across threads.
    writer: Mutex<BufWriter<File>>,
    /// Next transaction id to hand out. NOTE(review): always starts at 1,
    /// even when opening an existing journal — ids can repeat across runs.
    next_txn_id: AtomicU64,
    /// False once `close` has run; further writes are rejected.
    open: AtomicBool,
    /// Group-commit queue of serialized records awaiting fsync.
    pending: Mutex<VecDeque<PendingWrite>>,
    /// Time of the last group-commit flush.
    last_flush: Mutex<Instant>,
    /// Background flush thread handle (GroupCommit mode only).
    flush_thread: Mutex<Option<JoinHandle<()>>>,
    /// Tells the background flush thread to exit.
    stop_signal: Arc<AtomicBool>,
}
581
582impl Journal {
583 pub fn open(path: impl AsRef<Path>, mode: DurabilityMode) -> io::Result<Arc<Self>> {
585 let path = path.as_ref().to_path_buf();
586
587 let file = OpenOptions::new()
589 .create(true)
590 .truncate(false)
591 .read(true)
592 .write(true)
593 .open(&path)?;
594
595 let journal = Arc::new(Self {
596 path,
597 mode,
598 writer: Mutex::new(BufWriter::new(file)),
599 next_txn_id: AtomicU64::new(1),
600 open: AtomicBool::new(true),
601 pending: Mutex::new(VecDeque::new()),
602 last_flush: Mutex::new(Instant::now()),
603 flush_thread: Mutex::new(None),
604 stop_signal: Arc::new(AtomicBool::new(false)),
605 });
606
607 if let DurabilityMode::GroupCommit { max_delay_ms, .. } = mode {
609 let journal_clone = Arc::clone(&journal);
610 let stop = Arc::clone(&journal.stop_signal);
611
612 let handle = thread::spawn(move || {
613 while !stop.load(Ordering::Relaxed) {
614 thread::sleep(Duration::from_millis(max_delay_ms));
615 if !stop.load(Ordering::Relaxed) {
616 let _ = journal_clone.flush_pending();
617 }
618 }
619 });
620
621 *journal.flush_thread.lock().unwrap() = Some(handle);
622 }
623
624 Ok(journal)
625 }
626
627 pub fn write_transaction(&self, ops: &[JournalOp]) -> io::Result<u64> {
629 if !self.open.load(Ordering::Acquire) {
630 return Err(io::Error::other("journal closed"));
631 }
632
633 let txn_id = self.next_txn_id.fetch_add(1, Ordering::AcqRel);
634
635 let mut payload = Vec::new();
637 for op in ops {
638 payload.extend_from_slice(&op.to_bytes());
639 }
640
641 let header = RecordHeader::new(txn_id, RecordFlags::Committed, payload.len() as u32);
643 let header_bytes = header.to_bytes(&payload);
644
645 match self.mode {
646 DurabilityMode::Immediate => {
647 let mut writer = self.writer.lock().unwrap();
649 writer.write_all(&header_bytes)?;
650 writer.write_all(&payload)?;
651 writer.flush()?;
652 writer.get_ref().sync_all()?;
653 }
654 DurabilityMode::GroupCommit {
655 max_pending_ops, ..
656 } => {
657 let committed = Arc::new((Mutex::new(false), Condvar::new()));
659 let committed_clone = Arc::clone(&committed);
660
661 let mut record_data = header_bytes;
662 record_data.extend_from_slice(&payload);
663
664 {
665 let mut pending = self.pending.lock().unwrap();
666 pending.push_back(PendingWrite {
667 txn_id,
668 data: record_data,
669 committed: committed_clone,
670 });
671
672 if pending.len() >= max_pending_ops {
674 drop(pending);
675 self.flush_pending()?;
676 }
677 }
678
679 let (lock, cvar) = &*committed;
681 let mut done = lock.lock().unwrap();
682 while !*done {
683 done = cvar.wait(done).unwrap();
684 }
685 }
686 DurabilityMode::Relaxed => {
687 let mut writer = self.writer.lock().unwrap();
689 writer.write_all(&header_bytes)?;
690 writer.write_all(&payload)?;
691 }
693 }
694
695 Ok(txn_id)
696 }
697
698 pub fn flush_pending(&self) -> io::Result<()> {
700 let writes: Vec<PendingWrite> = {
701 let mut pending = self.pending.lock().unwrap();
702 pending.drain(..).collect()
703 };
704
705 if writes.is_empty() {
706 return Ok(());
707 }
708
709 {
711 let mut writer = self.writer.lock().unwrap();
712 for write in &writes {
713 writer.write_all(&write.data)?;
714 }
715 writer.flush()?;
716 writer.get_ref().sync_all()?;
717 }
718
719 for write in writes {
721 let (lock, cvar) = &*write.committed;
722 let mut done = lock.lock().unwrap();
723 *done = true;
724 cvar.notify_one();
725 }
726
727 *self.last_flush.lock().unwrap() = Instant::now();
728
729 Ok(())
730 }
731
732 pub fn write_barrier(&self) -> io::Result<()> {
734 self.write_transaction(&[JournalOp::Barrier])?;
735 Ok(())
736 }
737
738 pub fn replay(&self) -> io::Result<Vec<(u64, Vec<JournalOp>)>> {
740 let mut file = OpenOptions::new().read(true).open(&self.path)?;
741 file.seek(SeekFrom::Start(0))?;
742
743 let mut transactions = Vec::new();
744 let mut header_buf = [0u8; HEADER_SIZE];
745
746 loop {
747 match file.read_exact(&mut header_buf) {
748 Ok(()) => {}
749 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
750 Err(e) => return Err(e),
751 }
752
753 let header = RecordHeader::from_bytes(&header_buf)?;
754
755 let mut payload = vec![0u8; header.payload_len as usize];
757 file.read_exact(&mut payload)?;
758
759 if !header.verify_checksum(&payload) {
761 return Err(io::Error::new(
762 io::ErrorKind::InvalidData,
763 "checksum mismatch",
764 ));
765 }
766
767 if header.flags != RecordFlags::Committed {
769 continue;
770 }
771
772 let mut ops = Vec::new();
774 let mut pos = 0;
775 while pos < payload.len() {
776 let (op, consumed) = JournalOp::from_bytes(&payload[pos..])?;
777 ops.push(op);
778 pos += consumed;
779 }
780
781 transactions.push((header.txn_id, ops));
782 }
783
784 Ok(transactions)
785 }
786
787 pub fn checkpoint(&self) -> io::Result<()> {
789 self.flush_pending()?;
791
792 let mut writer = self.writer.lock().unwrap();
794 writer.get_ref().set_len(0)?;
795 writer.seek(SeekFrom::Start(0))?;
796 writer.get_ref().sync_all()?;
797
798 Ok(())
799 }
800
801 pub fn close(&self) -> io::Result<()> {
803 self.open.store(false, Ordering::Release);
804 self.stop_signal.store(true, Ordering::Release);
805
806 self.flush_pending()?;
808
809 if let Some(handle) = self.flush_thread.lock().unwrap().take() {
811 let _ = handle.join();
812 }
813
814 Ok(())
815 }
816
817 pub fn should_use_wal(data_size: usize, op_count: usize) -> bool {
819 data_size > WAL_THRESHOLD_BYTES || op_count > 1
820 }
821}
822
impl Drop for Journal {
    fn drop(&mut self) {
        // Best-effort shutdown: Drop cannot surface errors, so callers that
        // care about the final flush result must call `close()` themselves.
        let _ = self.close();
    }
}
828
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // Serialize a header, parse it back, and confirm every field plus the
    // checksum survives the round trip.
    #[test]
    fn test_record_header_roundtrip() {
        let payload = b"test payload data";
        let header = RecordHeader::new(42, RecordFlags::Committed, payload.len() as u32);

        let header_bytes = header.to_bytes(payload);
        let parsed = RecordHeader::from_bytes(&header_bytes).unwrap();

        assert_eq!(parsed.magic, JOURNAL_MAGIC);
        assert_eq!(parsed.version, JOURNAL_VERSION);
        assert_eq!(parsed.flags, RecordFlags::Committed);
        assert_eq!(parsed.txn_id, 42);
        assert_eq!(parsed.payload_len, payload.len() as u32);
        assert!(parsed.verify_checksum(payload));
    }

    // Encode each op variant, decode it, and compare the identifying fields.
    #[test]
    fn test_journal_op_roundtrip() {
        let ops = vec![
            JournalOp::WriteChunk {
                chunk_id: 123,
                data_hash: [0xAB; 32],
                data_len: 4096,
            },
            JournalOp::WriteFile {
                path: "/etc/passwd".to_string(),
                chunk_ids: vec![1, 2, 3],
                size: 1024,
            },
            JournalOp::DeleteFile {
                path: "/tmp/test".to_string(),
            },
            JournalOp::Barrier,
        ];

        for op in ops {
            let bytes = op.to_bytes();
            let (parsed, _) = JournalOp::from_bytes(&bytes).unwrap();

            // Match on (original, parsed) pairs so any variant mismatch
            // falls through to the panic arm.
            match (&op, &parsed) {
                (
                    JournalOp::WriteChunk {
                        chunk_id: a,
                        data_len: b,
                        ..
                    },
                    JournalOp::WriteChunk {
                        chunk_id: c,
                        data_len: d,
                        ..
                    },
                ) => {
                    assert_eq!(a, c);
                    assert_eq!(b, d);
                }
                (
                    JournalOp::WriteFile {
                        path: a,
                        chunk_ids: b,
                        ..
                    },
                    JournalOp::WriteFile {
                        path: c,
                        chunk_ids: d,
                        ..
                    },
                ) => {
                    assert_eq!(a, c);
                    assert_eq!(b, d);
                }
                (JournalOp::DeleteFile { path: a }, JournalOp::DeleteFile { path: b }) => {
                    assert_eq!(a, b);
                }
                (JournalOp::Barrier, JournalOp::Barrier) => {}
                _ => panic!("mismatched op types"),
            }
        }
    }

    // Write two transactions in Immediate mode, reopen the file, and verify
    // that replay returns both with sequential ids and correct op counts.
    #[test]
    fn test_journal_write_replay() {
        let dir = tempdir().unwrap();
        let journal_path = dir.path().join("test.journal");

        // First session: write and close.
        {
            let journal = Journal::open(&journal_path, DurabilityMode::Immediate).unwrap();

            journal
                .write_transaction(&[JournalOp::WriteChunk {
                    chunk_id: 1,
                    data_hash: [0x11; 32],
                    data_len: 100,
                }])
                .unwrap();

            journal
                .write_transaction(&[
                    JournalOp::WriteFile {
                        path: "/test".to_string(),
                        chunk_ids: vec![1],
                        size: 100,
                    },
                    JournalOp::Barrier,
                ])
                .unwrap();

            journal.close().unwrap();
        }

        // Second session: reopen and replay what the first session wrote.
        {
            let journal = Journal::open(&journal_path, DurabilityMode::Immediate).unwrap();
            let txns = journal.replay().unwrap();

            assert_eq!(txns.len(), 2);
            assert_eq!(txns[0].0, 1); assert_eq!(txns[1].0, 2);

            assert_eq!(txns[0].1.len(), 1);
            assert_eq!(txns[1].1.len(), 2);
        }
    }

    // The WAL path should kick in for large payloads or multi-op
    // transactions only.
    #[test]
    fn test_should_use_wal() {
        assert!(!Journal::should_use_wal(1024, 1));

        assert!(Journal::should_use_wal(100 * 1024, 1));

        assert!(Journal::should_use_wal(1024, 5));
    }
}