1use crate::{
50 bpool,
51 error::{FrozenErr, FrozenRes},
52 ffile, hints, mpscq,
53};
54use std::{
55 sync::{self, atomic},
56 thread, time,
57};
58
// Module id stamped into every FrozenErr built in this file; written once in
// Core::new from the file config. NOTE(review): this is an unsynchronized
// `static mut` — creating two pipes with different `mid`s concurrently would
// race; confirm pipes are constructed before concurrent error creation.
static mut MODULE_ID: u8 = 0;

// Error-domain tag carried by every FrozenErr produced by this module.
const ERRDOMAIN: u8 = 0x13;
64
/// Error codes raised by the pipe; the discriminant becomes the `u16` code of
/// the resulting `FrozenErr`.
#[repr(u16)]
pub enum FPErr {
    /// Unrecoverable failure (e.g. the flush thread could not be spawned).
    Hcf = 0x400,

    /// The background flush thread failed or panicked.
    Txe = 0x401,

    /// A mutex / rwlock was poisoned.
    Lpn = 0x402,
}
77
78impl FPErr {
79 #[inline]
80 fn default_message(&self) -> &'static [u8] {
81 match self {
82 Self::Lpn => b"lock poisoned",
83 Self::Hcf => b"hault and catch fire",
84 Self::Txe => b"thread failed or paniced",
85 }
86 }
87}
88
89#[inline]
90fn new_err<R>(res: FPErr, message: Vec<u8>) -> FrozenRes<R> {
91 let detail = res.default_message();
92 let err = FrozenErr::new(unsafe { MODULE_ID }, ERRDOMAIN, res as u16, detail, message);
93 Err(err)
94}
95
96#[inline]
97fn new_err_raw<E: std::fmt::Display>(res: FPErr, error: E) -> FrozenErr {
98 let detail = res.default_message();
99 FrozenErr::new(
100 unsafe { MODULE_ID },
101 ERRDOMAIN,
102 res as u16,
103 detail,
104 error.to_string().as_bytes().to_vec(),
105 )
106}
107
/// Public handle over the shared [`Core`]: writers/readers go through `core`,
/// while `tx` owns the background flush thread (joined on drop).
#[derive(Debug)]
pub struct FrozenPipe {
    // Shared state between API callers and the flush thread.
    core: sync::Arc<Core>,
    // Flush-thread handle; `Option` so Drop can `take()` and join it.
    tx: Option<thread::JoinHandle<()>>,
}
150
impl FrozenPipe {
    /// Build the shared core and spawn the background flush thread that
    /// periodically drains queued writes and syncs the file.
    pub fn new(file: ffile::FrozenFile, pool: bpool::BufPool, flush_duration: time::Duration) -> FrozenRes<Self> {
        let core = Core::new(file, pool, flush_duration)?;
        let tx = Core::spawn_tx(core.clone())?;

        Ok(Self { core, tx: Some(tx) })
    }

    /// Stage `buf` into pool-owned chunks and enqueue a write at chunk
    /// `index`. Returns the epoch the caller must wait on (via
    /// `wait_for_durability`) for this write to be durable.
    #[inline(always)]
    pub fn write(&self, buf: &[u8], index: usize) -> FrozenRes<u64> {
        let chunk_size = self.core.chunk_size;
        // Number of chunk slots needed to hold `buf`, rounded up.
        let chunks = buf.len().div_ceil(chunk_size);

        let alloc = self.core.pool.allocate(chunks)?;

        // Shared io_lock held until return: the flush thread needs the
        // exclusive lock to process a batch, so the epoch read below cannot
        // observe the flush of this very request — `epoch + 1` stays a valid
        // durability target.
        let _lock = self.core.acquire_io_lock()?;

        // Scatter the payload across the allocation's chunk pointers.
        let mut src_off = 0usize;
        for ptr in alloc.slots() {
            if src_off >= buf.len() {
                break;
            }

            let remaining = buf.len() - src_off;
            let copy = remaining.min(chunk_size);

            // SAFETY: `copy <= chunk_size` and each slot is presumed to hold
            // at least `chunk_size` writable bytes (pool invariant — TODO
            // confirm in bpool); source range is within `buf`.
            unsafe { std::ptr::copy_nonoverlapping(buf.as_ptr().add(src_off), *ptr, copy) };
            src_off += copy;
        }

        let req = WriteReq::new(index, chunks, alloc);
        self.core.mpscq.push(req);

        // The flush that persists this request increments the epoch, so one
        // past the currently observed epoch is the value to wait for.
        Ok(self.core.epoch.load(atomic::Ordering::Acquire) + 1)
    }

    /// Read exactly one chunk at chunk `index` directly from the file.
    #[inline(always)]
    pub fn read_single(&self, index: usize) -> FrozenRes<Vec<u8>> {
        let mut slice = vec![0u8; self.core.chunk_size];
        self.core.file.pread(slice.as_mut_ptr(), index)?;

        Ok(slice)
    }

    /// Read `count` consecutive chunks starting at chunk `index`. The common
    /// 2x / 4x shapes use fixed-size pointer arrays; anything else goes
    /// through the generic vectored path.
    #[inline(always)]
    pub fn read(&self, index: usize, count: usize) -> FrozenRes<Vec<u8>> {
        match count {
            2 => self.read_2x(index),
            4 => self.read_4x(index),
            _ => self.read_multi(index, count),
        }
    }

    /// Two-chunk vectored read fast path.
    #[inline(always)]
    fn read_2x(&self, index: usize) -> FrozenRes<Vec<u8>> {
        let chunk = self.core.chunk_size;

        let mut buf = vec![0u8; chunk * 2];
        let base = buf.as_mut_ptr();

        // SAFETY: `base + chunk` stays within the `chunk * 2` allocation.
        let ptrs = [base, unsafe { base.add(chunk) }];
        self.core.file.preadv(&ptrs, index)?;

        Ok(buf)
    }

    /// Four-chunk vectored read fast path.
    #[inline(always)]
    fn read_4x(&self, index: usize) -> FrozenRes<Vec<u8>> {
        let chunk = self.core.chunk_size;

        let mut buf = vec![0u8; chunk * 4];
        let base = buf.as_mut_ptr();

        // SAFETY: all offsets are below `chunk * 4`, the allocation's length.
        let ptrs = [
            base,
            unsafe { base.add(chunk) },
            unsafe { base.add(chunk * 2) },
            unsafe { base.add(chunk * 3) },
        ];
        self.core.file.preadv(&ptrs, index)?;

        Ok(buf)
    }

    /// Generic `count`-chunk vectored read.
    #[inline(always)]
    fn read_multi(&self, index: usize, count: usize) -> FrozenRes<Vec<u8>> {
        let chunk = self.core.chunk_size;

        let mut buf = vec![0u8; chunk * count];
        let base = buf.as_mut_ptr();

        let mut ptrs = Vec::with_capacity(count);
        for i in 0..count {
            // SAFETY: `i * chunk < chunk * count`, within the allocation.
            ptrs.push(unsafe { base.add(i * chunk) });
        }

        self.core.file.preadv(&ptrs, index)?;
        Ok(buf)
    }

    /// Block until `epoch` is durable, waiting for the periodic flush.
    pub fn wait_for_durability(&self, epoch: u64) -> FrozenRes<()> {
        self.internal_wait(epoch)
    }

    /// Like `wait_for_durability`, but first nudges the flush thread awake so
    /// the flush happens ahead of the timer.
    pub fn force_durability(&self, epoch: u64) -> FrozenRes<()> {
        // Take the flusher's mutex so the notify cannot race its re-lock;
        // if the flusher is mid-cycle the signal is simply absorbed by the
        // next timed wait.
        let guard = self.core.lock.lock().map_err(|e| new_err_raw(FPErr::Lpn, e))?;
        self.core.cv.notify_one();
        drop(guard);

        self.internal_wait(epoch)
    }

    /// Grow the backing file by `count` chunks. Retries until the write queue
    /// is observed empty under the exclusive io_lock, so no queued write lands
    /// while the file is being resized.
    pub fn grow(&self, count: usize) -> FrozenRes<()> {
        loop {
            let epoch = self.core.epoch.load(atomic::Ordering::Acquire);
            // With the current epoch the wait returns immediately; the call's
            // real effect here is waking the flush thread to drain the queue.
            self.force_durability(epoch)?;

            let lock = self.core.acquire_exclusive_io_lock()?;

            // Only grow once no writes are pending; otherwise release and retry.
            if self.core.mpscq.is_empty() {
                self.core.file.grow(count)?;
                drop(lock);
                return Ok(());
            }

            drop(lock);
        }
    }

    /// Wait until the durable epoch reaches `epoch`, surfacing any error the
    /// flush thread has published before and during the wait.
    fn internal_wait(&self, epoch: u64) -> FrozenRes<()> {
        // Fast path: already durable.
        if hints::unlikely(self.core.epoch.load(atomic::Ordering::Acquire) >= epoch) {
            return Ok(());
        }

        if let Some(sync_err) = self.core.get_sync_error() {
            return Err(sync_err);
        }

        let mut guard = match self.core.durable_lock.lock() {
            Ok(g) => g,
            Err(e) => return Err(new_err_raw(FPErr::Lpn, e)),
        };

        // Standard condvar loop: re-check both exit conditions on every wake
        // (spurious wakeups are possible).
        loop {
            if let Some(sync_err) = self.core.get_sync_error() {
                return Err(sync_err);
            }

            if self.core.epoch.load(atomic::Ordering::Acquire) >= epoch {
                return Ok(());
            }

            guard = match self.core.durable_cv.wait(guard) {
                Ok(g) => g,
                Err(e) => return Err(new_err_raw(FPErr::Lpn, e)),
            };
        }
    }
}
539
540impl Drop for FrozenPipe {
541 fn drop(&mut self) {
542 self.core.closed.store(true, atomic::Ordering::Release);
543 self.core.cv.notify_one(); if let Some(handle) = self.tx.take() {
546 let _ = handle.join();
547 }
548
549 let _io_lock = self.core.acquire_exclusive_io_lock();
552
553 let ptr = self.core.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
555 if !ptr.is_null() {
556 unsafe {
557 drop(Box::from_raw(ptr));
558 }
559 }
560 }
561}
562
/// State shared between API callers and the flush thread.
#[derive(Debug)]
struct Core {
    // Bytes per chunk, copied from the file config.
    chunk_size: usize,
    // Wakes the flush thread early (paired with `lock`).
    cv: sync::Condvar,
    // Buffer pool backing staged write payloads.
    pool: bpool::BufPool,
    // Mutex the flush thread parks on via `cv`.
    lock: sync::Mutex<()>,
    // Chunked file that writes are persisted to.
    file: ffile::FrozenFile,
    // Durable epoch: incremented once per successfully synced batch.
    epoch: atomic::AtomicU64,
    // Read = stage/in-flight user I/O, write = batch flush / grow / teardown.
    io_lock: sync::RwLock<()>,
    // Signals durability waiters after an epoch bump (paired with durable_lock).
    durable_cv: sync::Condvar,
    // Set by Drop to tell the flush thread to exit.
    closed: atomic::AtomicBool,
    // Mutex for durability waiters parked on `durable_cv`.
    durable_lock: sync::Mutex<()>,
    // Maximum time the flush thread sleeps between drain attempts.
    flush_duration: time::Duration,
    // Multi-producer queue of staged write requests.
    mpscq: mpscq::MPSCQueue<WriteReq>,
    // Last flush error, published as a raw boxed pointer (null = no error).
    error: atomic::AtomicPtr<FrozenErr>,
}
579
impl Core {
    /// Construct the shared state and publish the file's module id into the
    /// module-global `MODULE_ID` used by error constructors.
    fn new(
        file: ffile::FrozenFile,
        pool: bpool::BufPool,
        flush_duration: time::Duration,
    ) -> FrozenRes<sync::Arc<Self>> {
        let cfg = file.cfg();
        let chunk_size = cfg.chunk_size;

        // NOTE(review): unsynchronized write to a `static mut`; assumes cores
        // are constructed before errors can be built concurrently — confirm.
        unsafe { MODULE_ID = cfg.mid };

        Ok(sync::Arc::new(Self {
            file,
            pool,
            chunk_size,
            flush_duration,
            cv: sync::Condvar::new(),
            lock: sync::Mutex::new(()),
            io_lock: sync::RwLock::new(()),
            epoch: atomic::AtomicU64::new(0),
            durable_cv: sync::Condvar::new(),
            mpscq: mpscq::MPSCQueue::default(),
            durable_lock: sync::Mutex::new(()),
            closed: atomic::AtomicBool::new(false),
            error: atomic::AtomicPtr::new(std::ptr::null_mut()),
        }))
    }

    /// Shared io_lock guard: taken by writers while staging and by readers.
    #[inline]
    fn acquire_io_lock(&self) -> FrozenRes<sync::RwLockReadGuard<'_, ()>> {
        self.io_lock.read().map_err(|e| new_err_raw(FPErr::Lpn, e))
    }

    /// Exclusive io_lock guard: taken for batch flushes, grow and teardown.
    #[inline]
    fn acquire_exclusive_io_lock(&self) -> FrozenRes<sync::RwLockWriteGuard<'_, ()>> {
        self.io_lock.write().map_err(|e| new_err_raw(FPErr::Lpn, e))
    }

    /// Clone out the currently published flush error, if any.
    #[inline]
    fn get_sync_error(&self) -> Option<FrozenErr> {
        let ptr = self.error.load(atomic::Ordering::Acquire);
        if hints::likely(ptr.is_null()) {
            return None;
        }

        // SAFETY: non-null values stored in `error` come from Box::into_raw
        // in set_sync_error. NOTE(review): a concurrent set/clear frees the
        // pointer it swaps out, which could race this clone — confirm callers
        // cannot observe a pointer the flush thread is about to free.
        Some(unsafe { (*ptr).clone() })
    }

    /// Publish `err` for waiters, freeing any previously published error.
    #[inline]
    fn set_sync_error(&self, err: FrozenErr) {
        let boxed = Box::into_raw(Box::new(err));
        let old = self.error.swap(boxed, atomic::Ordering::AcqRel);

        if !old.is_null() {
            // SAFETY: `old` came from Box::into_raw in a previous call and the
            // swap above transferred sole ownership to us.
            unsafe {
                drop(Box::from_raw(old));
            }
        }
    }

    /// Clear a published error after a successful sync.
    #[inline]
    fn clear_sync_error(&self) {
        let old = self.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
        if hints::unlikely(!old.is_null()) {
            // SAFETY: same ownership argument as in set_sync_error.
            unsafe {
                drop(Box::from_raw(old));
            }
        }
    }

    /// Advance the durable epoch by one (after a successful file sync).
    #[inline]
    fn incr_epoch(&self) {
        self.epoch.fetch_add(1, atomic::Ordering::Release);
    }

    /// Write every request of `batch` to the file; returns the (min, max)
    /// chunk-index range touched. Single-chunk requests use pwrite, larger
    /// ones the vectored pwritev.
    fn write_batch(&self, batch: Vec<WriteReq>) -> FrozenRes<(usize, usize)> {
        let mut max_index = 0usize;
        let mut min_index = usize::MAX;

        for req in &batch {
            let slots = req.alloc.slots();
            match req.chunks {
                1 => {
                    self.file.pwrite(slots[0], req.index)?;
                }
                _ => {
                    self.file.pwritev(slots, req.index)?;
                }
            }

            min_index = min_index.min(req.index);
            max_index = max_index.max(req.index + req.chunks);
        }

        Ok((min_index, max_index))
    }

    /// Spawn the named flush thread running `flush_tx` over `core`.
    fn spawn_tx(core: sync::Arc<Self>) -> FrozenRes<thread::JoinHandle<()>> {
        match thread::Builder::new()
            .name("fpipe-flush-tx".into())
            .spawn(move || Self::flush_tx(core))
        {
            Ok(tx) => Ok(tx),
            Err(error) => {
                let mut error = error.to_string().as_bytes().to_vec();
                error.extend_from_slice(b"Failed to spawn flush thread for FrozenPipe");
                new_err(FPErr::Hcf, error)
            }
        }
    }

    /// Flush-thread main loop. Protocol per cycle: park on `cv` for at most
    /// `flush_duration` (early wakes come from force_durability / Drop), drain
    /// the queue, write + sync the batch under the exclusive io_lock, bump the
    /// epoch and notify durability waiters. Errors are published through
    /// set_sync_error instead of unwinding; a fatal lock failure exits the
    /// thread after publishing.
    fn flush_tx(core: sync::Arc<Self>) {
        let mut guard = match core.lock.lock() {
            Ok(g) => g,
            Err(error) => {
                let mut message = error.to_string().as_bytes().to_vec();
                message.extend_from_slice(b"Flush thread died before init could be completed for FrozenPipe");
                let error = FrozenErr::new(
                    unsafe { MODULE_ID },
                    ERRDOMAIN,
                    FPErr::Lpn as u16,
                    FPErr::Lpn.default_message(),
                    message,
                );
                core.set_sync_error(error);
                return;
            }
        };

        loop {
            // Park until the timer fires or someone nudges us.
            guard = match core.cv.wait_timeout(guard, core.flush_duration) {
                Ok((g, _)) => g,
                Err(e) => {
                    core.set_sync_error(new_err_raw(FPErr::Txe, e));
                    return;
                }
            };

            // Release the park-mutex while doing I/O so force_durability can
            // signal the next cycle without blocking.
            drop(guard);

            let req_batch = core.mpscq.drain();
            let closing = core.closed.load(atomic::Ordering::Acquire);

            if req_batch.is_empty() {
                // Nothing to flush; exit if the pipe is being dropped.
                if closing {
                    return;
                }

                guard = match core.lock.lock() {
                    Ok(g) => g,
                    Err(e) => {
                        core.set_sync_error(new_err_raw(FPErr::Lpn, e));
                        return;
                    }
                };

                continue;
            }

            // Exclusive io_lock: no writer may stage (and no reader run)
            // while the batch is written and synced.
            let io_lock = match core.acquire_exclusive_io_lock() {
                Ok(lock) => lock,
                Err(e) => {
                    core.set_sync_error(new_err_raw(FPErr::Lpn, e));
                    return;
                }
            };

            let (_min, _max) = match core.write_batch(req_batch) {
                Ok(res) => res,
                Err(err) => {
                    // Publish the write failure and go around again; the
                    // drained batch is dropped. NOTE(review): those requests
                    // are not retried — waiters see the error instead.
                    core.set_sync_error(err);
                    drop(io_lock);

                    guard = match core.lock.lock() {
                        Ok(g) => g,
                        Err(e) => {
                            core.set_sync_error(new_err_raw(FPErr::Lpn, e));
                            return;
                        }
                    };

                    continue;
                }
            };

            match core.file.sync() {
                Err(err) => core.set_sync_error(err),
                Ok(()) => {
                    // Durable: bump the epoch, then wake waiters while holding
                    // durable_lock so no waiter misses the notification.
                    core.incr_epoch();
                    let _g = match core.durable_lock.lock() {
                        Ok(g) => g,
                        Err(e) => {
                            core.set_sync_error(new_err_raw(FPErr::Lpn, e));
                            return;
                        }
                    };

                    core.durable_cv.notify_all();
                    core.clear_sync_error();
                }
            }

            drop(io_lock);
            guard = match core.lock.lock() {
                Ok(g) => g,
                Err(e) => {
                    core.set_sync_error(new_err_raw(FPErr::Lpn, e));
                    return;
                }
            };
        }
    }
}
815
// SAFETY: NOTE(review) — manually asserted. Soundness rests on FrozenFile,
// BufPool and MPSCQueue being safe to share/move across threads (the raw
// `error` pointer is managed via AtomicPtr). Confirm those types' internal
// synchronization before relying on these impls.
unsafe impl Send for Core {}
unsafe impl Sync for Core {}
818
/// A staged write queued for the flush thread.
#[derive(Debug)]
struct WriteReq {
    // Destination chunk index in the file.
    index: usize,
    // Number of chunks the payload spans.
    chunks: usize,
    // Pool allocation holding the payload bytes.
    alloc: bpool::Allocation,
}
825
826impl WriteReq {
827 fn new(index: usize, chunks: usize, alloc: bpool::Allocation) -> Self {
828 Self { alloc, index, chunks }
829 }
830}
831
832#[cfg(test)]
833mod tests {
834 use super::*;
835 use crate::{
836 bpool::{BPBackend, BPCfg, BufPool},
837 error::TEST_MID,
838 ffile::{FFCfg, FrozenFile},
839 };
840 use std::sync::{Arc, Barrier};
841 use std::thread;
842 use std::time::{Duration, Instant};
843
    // Chunk size used by every test file/pool.
    const CHUNK: usize = 0x20;
    // Number of chunks the backing file starts with.
    const INIT: usize = 0x20;
    // Very short flush period so durability waits resolve quickly in tests.
    const FLUSH: Duration = Duration::from_micros(10);
847
848 fn new_env() -> (tempfile::TempDir, FrozenPipe) {
849 let dir = tempfile::tempdir().unwrap();
850 let path = dir.path().join("tmp_pipe");
851
852 let file = FrozenFile::new(FFCfg {
853 mid: TEST_MID,
854 path,
855 chunk_size: CHUNK,
856 initial_chunk_amount: INIT,
857 })
858 .unwrap();
859 let pool = BufPool::new(BPCfg {
860 mid: TEST_MID,
861 chunk_size: CHUNK,
862 backend: BPBackend::Prealloc { capacity: 0x100 },
863 });
864
865 let pipe = FrozenPipe::new(file, pool, FLUSH).unwrap();
866
867 (dir, pipe)
868 }
869
870 mod lifecycle {
871 use super::*;
872
873 #[test]
874 fn ok_new() {
875 let (_dir, pipe) = new_env();
876 assert_eq!(pipe.core.epoch.load(atomic::Ordering::Acquire), 0);
877 }
878
879 #[test]
880 fn ok_drop() {
881 let (_dir, pipe) = new_env();
882 drop(pipe);
883 }
884 }
885
    mod fp_write {
        use super::*;

        // Single-chunk write becomes durable after waiting on its epoch.
        #[test]
        fn ok_write_and_wait() {
            let (_dir, pipe) = new_env();

            let buf = vec![0xAB; CHUNK];
            let epoch = pipe.write(&buf, 0).unwrap();
            pipe.wait_for_durability(epoch).unwrap();
        }

        // A buffer spanning several chunks is accepted by one write call.
        #[test]
        fn ok_write_multiple_chunks() {
            let (_dir, pipe) = new_env();

            let buf = vec![0xAA; CHUNK * 4];
            let epoch = pipe.write(&buf, 0).unwrap();
            pipe.wait_for_durability(epoch).unwrap();
        }

        // force_durability flushes ahead of the timer.
        #[test]
        fn ok_force_durability() {
            let (_dir, pipe) = new_env();

            let buf = vec![1u8; CHUNK];
            let epoch = pipe.write(&buf, 0).unwrap();
            pipe.force_durability(epoch).unwrap();
        }

        // Epochs returned by successive writes never decrease.
        #[test]
        fn ok_write_epoch_monotonic() {
            let (_dir, pipe) = new_env();
            let buf = vec![1u8; CHUNK];

            let e1 = pipe.write(&buf, 0).unwrap();
            pipe.wait_for_durability(e1).unwrap();

            let e2 = pipe.write(&buf, 1).unwrap();
            pipe.wait_for_durability(e2).unwrap();

            assert!(e2 >= e1);
        }

        // A 0x80-chunk payload round-trips through the queue in one request.
        #[test]
        fn ok_write_large() {
            let (_dir, pipe) = new_env();
            let buf = vec![0xAB; CHUNK * 0x80];

            let epoch = pipe.write(&buf, 0).unwrap();
            pipe.wait_for_durability(epoch).unwrap();
        }

        // Many sequential writes, each awaited individually.
        #[test]
        fn ok_write_large_batch() {
            let (_dir, pipe) = new_env();

            for i in 0..0x100 {
                let buf = vec![i as u8; CHUNK];
                let epoch = pipe.write(&buf, i).unwrap();
                pipe.wait_for_durability(epoch).unwrap();
            }
        }

        // With a capacity-1 prealloc pool, the second writer must block until
        // the flush thread recycles the first allocation; both writes still
        // complete. Timing-sensitive: relies on the 10ms sleep to order the
        // two writers.
        #[test]
        fn ok_write_is_blocked_at_pool_exhaustion_for_prealloc_backend() {
            let dir = tempfile::tempdir().unwrap();
            let path = dir.path().join("tmp_pipe");

            let file = FrozenFile::new(FFCfg {
                mid: TEST_MID,
                path,
                chunk_size: CHUNK,
                initial_chunk_amount: INIT,
            })
            .unwrap();

            // capacity: 1 — only one outstanding allocation at a time.
            let pool = BufPool::new(BPCfg {
                mid: TEST_MID,
                chunk_size: CHUNK,
                backend: BPBackend::Prealloc { capacity: 1 },
            });

            let pipe = Arc::new(FrozenPipe::new(file, pool, FLUSH).unwrap());

            let p2 = pipe.clone();
            let t = thread::spawn(move || {
                let buf = vec![1u8; CHUNK];
                let epoch = p2.write(&buf, 0).unwrap();
                p2.wait_for_durability(epoch).unwrap();
            });

            thread::sleep(Duration::from_millis(0x0A));

            let buf = vec![2u8; CHUNK];
            let epoch = pipe.write(&buf, 1).unwrap();
            pipe.wait_for_durability(epoch).unwrap();

            t.join().unwrap();
        }
    }
987
    mod fp_read {
        use super::*;

        // A durable single-chunk write reads back byte-identical.
        #[test]
        fn ok_read_single_after_write() {
            let (_dir, pipe) = new_env();

            let buf = vec![0xAB; CHUNK];
            let epoch = pipe.write(&buf, 0).unwrap();
            pipe.wait_for_durability(epoch).unwrap();

            let read = pipe.read_single(0).unwrap();
            assert_eq!(read, buf);
        }

        // Exercises the 2-chunk vectored fast path.
        #[test]
        fn ok_read_2x() {
            let (_dir, pipe) = new_env();

            let buf = vec![0xAA; CHUNK * 2];
            let epoch = pipe.write(&buf, 0).unwrap();
            pipe.wait_for_durability(epoch).unwrap();

            let read = pipe.read(0, 2).unwrap();
            assert_eq!(read, buf);
        }

        // Exercises the 4-chunk vectored fast path.
        #[test]
        fn ok_read_4x() {
            let (_dir, pipe) = new_env();

            let buf = vec![0xBB; CHUNK * 4];
            let epoch = pipe.write(&buf, 0).unwrap();
            pipe.wait_for_durability(epoch).unwrap();

            let read = pipe.read(0, 4).unwrap();
            assert_eq!(read, buf);
        }

        // count = 6 takes the generic read_multi path.
        #[test]
        fn ok_read_multi_generic() {
            let (_dir, pipe) = new_env();

            let buf = vec![0xCC; CHUNK * 6];
            let epoch = pipe.write(&buf, 0).unwrap();
            pipe.wait_for_durability(epoch).unwrap();

            let read = pipe.read(0, 6).unwrap();
            assert_eq!(read, buf);
        }

        // Distinct payloads at distinct indices stay separate.
        #[test]
        fn ok_read_multiple_indices() {
            let (_dir, pipe) = new_env();

            for i in 0..8 {
                let buf = vec![i as u8; CHUNK];
                let epoch = pipe.write(&buf, i).unwrap();
                pipe.wait_for_durability(epoch).unwrap();
            }

            for i in 0..8 {
                let read = pipe.read_single(i).unwrap();
                assert_eq!(read, vec![i as u8; CHUNK]);
            }
        }

        // The second durable write to an index wins.
        #[test]
        fn ok_overwrite_same_index() {
            let (_dir, pipe) = new_env();

            let buf1 = vec![0xAA; CHUNK];
            let e1 = pipe.write(&buf1, 0).unwrap();
            pipe.wait_for_durability(e1).unwrap();

            let buf2 = vec![0xBB; CHUNK];
            let e2 = pipe.write(&buf2, 0).unwrap();
            pipe.wait_for_durability(e2).unwrap();

            let read = pipe.read_single(0).unwrap();
            assert_eq!(read, buf2);
        }

        // Larger generic read (0x10 chunks) round-trips.
        #[test]
        fn ok_large_read_multi() {
            let (_dir, pipe) = new_env();

            let buf = vec![0x7A; CHUNK * 0x10];
            let epoch = pipe.write(&buf, 0).unwrap();
            pipe.wait_for_durability(epoch).unwrap();

            let read = pipe.read(0, 0x10).unwrap();
            assert_eq!(read, buf);
        }

        // Parallel readers over disjoint indices all see their own payload.
        #[test]
        fn ok_read_concurrent() {
            const THREADS: usize = 8;

            let (_dir, pipe) = new_env();
            let pipe = Arc::new(pipe);

            for i in 0..THREADS {
                let buf = vec![i as u8; CHUNK];
                let epoch = pipe.write(&buf, i).unwrap();
                pipe.wait_for_durability(epoch).unwrap();
            }

            let mut handles = Vec::new();

            for i in 0..THREADS {
                let pipe = pipe.clone();

                handles.push(thread::spawn(move || {
                    let read = pipe.read_single(i).unwrap();
                    assert_eq!(read, vec![i as u8; CHUNK]);
                }));
            }

            for h in handles {
                h.join().unwrap();
            }
        }

        // Reads racing writes must not error; reader results are unchecked
        // on purpose (content depends on interleaving).
        #[test]
        fn ok_concurrent_read_write() {
            let (_dir, pipe) = new_env();
            let pipe = Arc::new(pipe);

            let writer = {
                let pipe = pipe.clone();
                thread::spawn(move || {
                    for i in 0..0x40 {
                        let buf = vec![i as u8; CHUNK];
                        let epoch = pipe.write(&buf, i).unwrap();
                        pipe.wait_for_durability(epoch).unwrap();
                    }
                })
            };

            let reader = {
                let pipe = pipe.clone();
                thread::spawn(move || {
                    for _ in 0..0x40 {
                        let _ = pipe.read_single(0);
                    }
                })
            };

            writer.join().unwrap();
            reader.join().unwrap();
        }

        // Writing/reading past the initial size works after grow.
        #[test]
        fn ok_read_after_grow() {
            let (_dir, pipe) = new_env();

            pipe.grow(8).unwrap();

            let buf = vec![0x5A; CHUNK];
            let epoch = pipe.write(&buf, INIT).unwrap();
            pipe.wait_for_durability(epoch).unwrap();

            let read = pipe.read_single(INIT).unwrap();
            assert_eq!(read, buf);
        }
    }
1155
1156 mod batching {
1157 use super::*;
1158
1159 #[test]
1160 fn ok_multiple_writes_single_batch() {
1161 let (_dir, pipe) = new_env();
1162
1163 let mut epochs = Vec::new();
1164 for i in 0..0x10 {
1165 let buf = vec![i as u8; CHUNK];
1166 epochs.push(pipe.write(&buf, i).unwrap());
1167 }
1168
1169 for e in epochs {
1170 pipe.wait_for_durability(e).unwrap();
1171 }
1172
1173 assert!(pipe.core.epoch.load(atomic::Ordering::Acquire) > 0);
1174 }
1175 }
1176
1177 mod fp_grow {
1178 use super::*;
1179
1180 #[test]
1181 fn ok_grow_file() {
1182 let (_dir, pipe) = new_env();
1183 let curr_len = pipe.core.file.length().unwrap();
1184
1185 pipe.grow(0x10).unwrap();
1186 let new_len = pipe.core.file.length().unwrap();
1187
1188 assert_eq!(new_len, curr_len + (0x10 * pipe.core.chunk_size));
1189 }
1190
1191 #[test]
1192 fn ok_write_after_grow() {
1193 let (_dir, pipe) = new_env();
1194 pipe.grow(0x10).unwrap();
1195
1196 let buf = vec![0xBB; CHUNK];
1197 let epoch = pipe.write(&buf, INIT).unwrap();
1198 pipe.wait_for_durability(epoch).unwrap();
1199 }
1200
1201 #[test]
1202 fn ok_grow_while_writing() {
1203 let (_dir, pipe) = new_env();
1204 let pipe = Arc::new(pipe);
1205 let curr_len = pipe.core.file.length().unwrap();
1206
1207 let p2 = pipe.clone();
1208 let writer = thread::spawn(move || {
1209 for i in 0..INIT {
1210 let buf = vec![1u8; CHUNK];
1211 let epoch = p2.write(&buf, i).unwrap();
1212 p2.wait_for_durability(epoch).unwrap();
1213 }
1214 });
1215
1216 thread::sleep(Duration::from_millis(10));
1217
1218 pipe.grow(0x3A).unwrap();
1219 writer.join().unwrap();
1220
1221 let new_len = pipe.core.file.length().unwrap();
1222 assert_eq!(new_len, curr_len + (0x3A * pipe.core.chunk_size));
1223 }
1224 }
1225
1226 mod concurrency {
1227 use super::*;
1228
1229 #[test]
1230 fn ok_multi_writer() {
1231 const THREADS: usize = 8;
1232 const ITERS: usize = 0x100;
1233
1234 let (_dir, pipe) = new_env();
1235 let pipe = Arc::new(pipe);
1236
1237 let mut handles = Vec::new();
1238 for t in 0..THREADS {
1239 let pipe = pipe.clone();
1240
1241 handles.push(thread::spawn(move || {
1242 for i in 0..ITERS {
1243 let buf = vec![t as u8; CHUNK];
1244 let epoch = pipe.write(&buf, i).unwrap();
1245 pipe.wait_for_durability(epoch).unwrap();
1246 }
1247 }));
1248 }
1249
1250 for h in handles {
1251 h.join().unwrap();
1252 }
1253 }
1254
1255 #[test]
1256 fn ok_barrier_start_parallel_writes() {
1257 const THREADS: usize = 8;
1258
1259 let (_dir, pipe) = new_env();
1260 let pipe = Arc::new(pipe);
1261 let barrier = Arc::new(Barrier::new(THREADS));
1262
1263 let mut handles = Vec::new();
1264
1265 for i in 0..THREADS {
1266 let pipe = pipe.clone();
1267 let barrier = barrier.clone();
1268
1269 handles.push(thread::spawn(move || {
1270 barrier.wait();
1271
1272 let buf = vec![i as u8; CHUNK];
1273 let epoch = pipe.write(&buf, i).unwrap();
1274 pipe.wait_for_durability(epoch).unwrap();
1275 }));
1276 }
1277
1278 for h in handles {
1279 h.join().unwrap();
1280 }
1281 }
1282 }
1283
1284 mod durability_wait {
1285 use super::*;
1286
1287 #[test]
1288 fn ok_wait_blocks_until_flush() {
1289 let (_dir, pipe) = new_env();
1290
1291 let buf = vec![0x55; CHUNK];
1292 let epoch = pipe.write(&buf, 0).unwrap();
1293
1294 let start = Instant::now();
1295 pipe.wait_for_durability(epoch).unwrap();
1296
1297 assert!(start.elapsed() >= Duration::from_micros(1));
1298 }
1299
1300 #[test]
1301 fn ok_force_durability_concurrent() {
1302 let (_dir, pipe) = new_env();
1303 let pipe = Arc::new(pipe);
1304
1305 let mut handles = Vec::new();
1306 for i in 0..0x0A {
1307 let pipe = pipe.clone();
1308
1309 handles.push(thread::spawn(move || {
1310 let buf = vec![i as u8; CHUNK];
1311 let epoch = pipe.write(&buf, i).unwrap();
1312 pipe.force_durability(epoch).unwrap();
1313 }));
1314 }
1315
1316 for h in handles {
1317 h.join().unwrap();
1318 }
1319 }
1320 }
1321
1322 mod shutdown {
1323 use super::*;
1324
1325 #[test]
1326 fn ok_drop_with_pending_writes() {
1327 let (_dir, pipe) = new_env();
1328
1329 let buf = vec![0xAA; CHUNK];
1330 pipe.write(&buf, 0).unwrap();
1331 drop(pipe);
1332 }
1333
1334 #[test]
1335 fn ok_drop_during_activity() {
1336 let (_dir, pipe) = new_env();
1337 let pipe = Arc::new(pipe);
1338
1339 let p2 = pipe.clone();
1340
1341 let handle = thread::spawn(move || {
1342 let buf = vec![1u8; CHUNK];
1343 let epoch = p2.write(&buf, 0).unwrap();
1344 p2.wait_for_durability(epoch).unwrap();
1345 });
1346
1347 thread::sleep(Duration::from_millis(10));
1348 drop(pipe);
1349
1350 handle.join().unwrap();
1351 }
1352
1353 #[test]
1354 fn ok_drop_while_writer_waiting() {
1355 let (_dir, pipe) = new_env();
1356 let pipe = Arc::new(pipe);
1357
1358 let p2 = pipe.clone();
1359 let handle = thread::spawn(move || {
1360 for i in 0..0x80 {
1361 let buf = vec![1u8; CHUNK];
1362 let epoch = p2.write(&buf, i).unwrap();
1363 p2.wait_for_durability(epoch).unwrap();
1364 }
1365 });
1366
1367 thread::sleep(Duration::from_millis(0x0A));
1368 drop(pipe);
1369
1370 handle.join().unwrap();
1371 }
1372 }
1373}