1use crate::{
50 bpool,
51 error::{FrozenErr, FrozenRes},
52 ffile, hints, mpscq,
53};
54use std::{
55 sync::{self, atomic},
56 thread, time,
57};
58
// Module id stamped into every error produced by this file; set once from the
// file config in `Core::new`.
// NOTE(review): `static mut` is unsynchronized — creating pipes with different
// module ids concurrently is a data race. Confirm single-module usage or
// migrate to `AtomicU8`.
static mut MODULE_ID: u8 = 0;

// Error-domain discriminator for this module's `FrozenErr`s.
const ERRDOMAIN: u8 = 0x13;
64
/// Error codes raised by the pipe module.
#[derive(Debug, Clone, Copy)]
#[repr(u16)]
pub enum FPErr {
    /// Unrecoverable failure (e.g. the flush thread could not be spawned).
    Hcf = 0x400,

    /// The flush (tx) thread failed or panicked.
    Txe = 0x401,

    /// A mutex/rwlock/condvar was poisoned.
    Lpn = 0x402,
}

impl FPErr {
    /// Static human-readable detail text for this error code.
    #[inline]
    fn default_message(&self) -> &'static [u8] {
        match self {
            Self::Lpn => b"lock poisoned",
            // Fixed typos in the original messages: "hault" -> "halt",
            // "paniced" -> "panicked".
            Self::Hcf => b"halt and catch fire",
            Self::Txe => b"thread failed or panicked",
        }
    }
}
88
89#[inline]
90fn new_err<R>(res: FPErr, message: Vec<u8>) -> FrozenRes<R> {
91 let detail = res.default_message();
92 let err = FrozenErr::new(unsafe { MODULE_ID }, ERRDOMAIN, res as u16, detail, message);
93 Err(err)
94}
95
96#[inline]
97fn new_err_raw<E: std::fmt::Display>(res: FPErr, error: E) -> FrozenErr {
98 let detail = res.default_message();
99 FrozenErr::new(
100 unsafe { MODULE_ID },
101 ERRDOMAIN,
102 res as u16,
103 detail,
104 error.to_string().as_bytes().to_vec(),
105 )
106}
107
/// Durability-tracked chunk pipe over a `FrozenFile`.
///
/// Writes are copied into pool-owned chunks and flushed to disk by a
/// background thread; callers use the epoch returned by `write` to wait for
/// durability.
#[derive(Debug)]
pub struct FrozenPipe {
    // State shared with the background flush thread.
    core: sync::Arc<Core>,
    // Flush thread handle; taken and joined in `Drop`.
    tx: Option<thread::JoinHandle<()>>,
}
150
impl FrozenPipe {
    /// Creates the shared [`Core`] and spawns the background flush thread.
    ///
    /// # Errors
    /// Propagates core construction failures and `FPErr::Hcf` if the flush
    /// thread cannot be spawned.
    pub fn new(file: ffile::FrozenFile, pool: bpool::BufPool, flush_duration: time::Duration) -> FrozenRes<Self> {
        let core = Core::new(file, pool, flush_duration)?;
        let tx = Core::spawn_tx(core.clone())?;

        Ok(Self { core, tx: Some(tx) })
    }

    /// Stages `buf` for an asynchronous write at chunk `index`.
    ///
    /// The payload is copied into freshly allocated pool chunks and queued for
    /// the flush thread; the call returns before the data is durable. The
    /// returned value is the epoch to pass to [`Self::wait_for_durability`] or
    /// [`Self::force_durability`].
    ///
    /// NOTE(review): the epoch is read *after* the enqueue, so a flush that
    /// completes in between makes the caller wait one extra cycle — confirm
    /// that is acceptable.
    #[inline(always)]
    pub fn write(&self, buf: &[u8], index: usize) -> FrozenRes<u64> {
        let chunk_size = self.core.chunk_size;
        // Number of pool chunks needed to hold `buf`, rounded up.
        let chunks = buf.len().div_ceil(chunk_size);

        let alloc = self.core.pool.allocate(chunks)?;

        // Shared I/O lock: staging may interleave with reads but not with
        // exclusive phases (batch flush, grow, teardown).
        let _lock = self.core.acquire_io_lock()?;

        // Scatter the payload across the allocated chunk slots.
        let mut src_off = 0usize;
        for ptr in alloc.slots() {
            if src_off >= buf.len() {
                break;
            }

            let remaining = buf.len() - src_off;
            let copy = remaining.min(chunk_size);

            // SAFETY: `copy <= chunk_size`; each slot is presumed to point at
            // a writable chunk-sized buffer owned by `alloc` — TODO confirm
            // the pool's slot contract.
            unsafe { std::ptr::copy_nonoverlapping(buf.as_ptr().add(src_off), *ptr, copy) };
            src_off += copy;
        }

        let req = WriteReq::new(index, chunks, alloc);
        self.core.mpscq.push(req);

        // Durability target: the next epoch the flush thread will publish.
        Ok(self.core.epoch.load(atomic::Ordering::Acquire) + 1)
    }

    /// Reads exactly one chunk at `index` into a fresh buffer.
    #[inline(always)]
    pub fn read_single(&self, index: usize) -> FrozenRes<Vec<u8>> {
        let _lock = self.core.acquire_io_lock()?;

        let mut slice = vec![0u8; self.core.chunk_size];
        self.core.file.pread(slice.as_mut_ptr(), index)?;

        drop(_lock);
        Ok(slice)
    }

    /// Reads `count` consecutive chunks starting at `index`.
    ///
    /// Counts of 2 and 4 use stack-allocated pointer arrays; any other count
    /// falls back to the heap-allocated generic path. The shared I/O lock is
    /// held here for the whole operation; the helpers do not re-acquire it.
    #[inline(always)]
    pub fn read(&self, index: usize, count: usize) -> FrozenRes<Vec<u8>> {
        let _lock = self.core.acquire_io_lock()?;

        match count {
            2 => self.read_2x(index),
            4 => self.read_4x(index),
            _ => self.read_multi(index, count),
        }
    }

    /// Vectored read of two chunks (caller holds the I/O lock).
    #[inline(always)]
    fn read_2x(&self, index: usize) -> FrozenRes<Vec<u8>> {
        let chunk = self.core.chunk_size;

        let mut buf = vec![0u8; chunk * 2];
        let base = buf.as_mut_ptr();

        // SAFETY: `base + chunk` stays within the `chunk * 2`-byte buffer.
        let ptrs = [base, unsafe { base.add(chunk) }];
        self.core.file.preadv(&ptrs, index)?;

        Ok(buf)
    }

    /// Vectored read of four chunks (caller holds the I/O lock).
    #[inline(always)]
    fn read_4x(&self, index: usize) -> FrozenRes<Vec<u8>> {
        let chunk = self.core.chunk_size;

        let mut buf = vec![0u8; chunk * 4];
        let base = buf.as_mut_ptr();

        // SAFETY: every offset is < chunk * 4, inside the allocation.
        let ptrs = [
            base,
            unsafe { base.add(chunk) },
            unsafe { base.add(chunk * 2) },
            unsafe { base.add(chunk * 3) },
        ];
        self.core.file.preadv(&ptrs, index)?;

        Ok(buf)
    }

    /// Generic vectored read of `count` chunks (caller holds the I/O lock).
    #[inline(always)]
    fn read_multi(&self, index: usize, count: usize) -> FrozenRes<Vec<u8>> {
        let chunk = self.core.chunk_size;

        let mut buf = vec![0u8; chunk * count];
        let base = buf.as_mut_ptr();

        let mut ptrs = Vec::with_capacity(count);
        for i in 0..count {
            // SAFETY: `i * chunk < chunk * count`, inside the allocation.
            ptrs.push(unsafe { base.add(i * chunk) });
        }

        self.core.file.preadv(&ptrs, index)?;
        Ok(buf)
    }

    /// Blocks until the flush thread has published `epoch`, i.e. every write
    /// staged before that epoch has been written and synced.
    ///
    /// # Errors
    /// Surfaces any sync error published by the flush thread, or `FPErr::Lpn`
    /// on lock poisoning.
    pub fn wait_for_durability(&self, epoch: u64) -> FrozenRes<()> {
        self.internal_wait(epoch)
    }

    /// Wakes the flush thread immediately instead of waiting for the flush
    /// timeout, then blocks until `epoch` is durable.
    pub fn force_durability(&self, epoch: u64) -> FrozenRes<()> {
        // Take the flush thread's park lock so the notify cannot be lost
        // between its drain and its next wait.
        let guard = self.core.lock.lock().map_err(|e| new_err_raw(FPErr::Lpn, e))?;
        self.core.cv.notify_one();
        drop(guard);

        self.internal_wait(epoch)
    }

    /// Grows the backing file by `count` chunks.
    ///
    /// Loops until the request queue is observed empty while the exclusive
    /// I/O lock is held, so the grow never races a batch write.
    pub fn grow(&self, count: usize) -> FrozenRes<()> {
        loop {
            let epoch = self.core.epoch.load(atomic::Ordering::Acquire);
            // Flush everything staged up to the current epoch first.
            self.force_durability(epoch)?;

            let lock = self.core.acquire_exclusive_io_lock()?;

            if self.core.mpscq.is_empty() {
                self.core.file.grow(count)?;
                drop(lock);
                return Ok(());
            }

            // New writes slipped in between the flush and the lock; retry.
            drop(lock);
        }
    }

    /// Waits on the durability condvar until `epoch` is reached or the flush
    /// thread reports a sync error.
    ///
    /// NOTE(review): on a flush error the tx thread does not notify
    /// `durable_cv`; a waiter only re-checks on the next successful flush's
    /// broadcast — confirm waiters cannot hang if every later flush fails too.
    fn internal_wait(&self, epoch: u64) -> FrozenRes<()> {
        // Fast path: already durable.
        if hints::unlikely(self.core.epoch.load(atomic::Ordering::Acquire) >= epoch) {
            return Ok(());
        }

        if let Some(sync_err) = self.core.get_sync_error() {
            return Err(sync_err);
        }

        let mut guard = match self.core.durable_lock.lock() {
            Ok(g) => g,
            Err(e) => return Err(new_err_raw(FPErr::Lpn, e)),
        };

        loop {
            // Re-check under the lock: error first, then epoch, so a failed
            // flush surfaces instead of blocking forever.
            if let Some(sync_err) = self.core.get_sync_error() {
                return Err(sync_err);
            }

            if self.core.epoch.load(atomic::Ordering::Acquire) >= epoch {
                return Ok(());
            }

            guard = match self.core.durable_cv.wait(guard) {
                Ok(g) => g,
                Err(e) => return Err(new_err_raw(FPErr::Lpn, e)),
            };
        }
    }
}
544
545impl Drop for FrozenPipe {
546 fn drop(&mut self) {
547 self.core.closed.store(true, atomic::Ordering::Release);
548 self.core.cv.notify_one(); if let Some(handle) = self.tx.take() {
551 let _ = handle.join();
552 }
553
554 let _io_lock = self.core.acquire_exclusive_io_lock();
557
558 let ptr = self.core.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
560 if !ptr.is_null() {
561 unsafe {
562 drop(Box::from_raw(ptr));
563 }
564 }
565 }
566}
567
/// State shared between the pipe handle and its flush thread.
#[derive(Debug)]
struct Core {
    // Size in bytes of one file chunk (taken from the file config).
    chunk_size: usize,
    // Paired with `lock`: wakes the flush thread early (force_durability, Drop).
    cv: sync::Condvar,
    // Chunk allocator backing staged writes.
    pool: bpool::BufPool,
    // Mutex the flush thread parks on via `cv`.
    lock: sync::Mutex<()>,
    // Backing file receiving chunk writes.
    file: ffile::FrozenFile,
    // Durability epoch, incremented after each successful sync.
    epoch: atomic::AtomicU64,
    // Read side: reads + write staging; write side: batch flush, grow, teardown.
    io_lock: sync::RwLock<()>,
    // Paired with `durable_lock`: broadcast to durability waiters.
    durable_cv: sync::Condvar,
    // Set by Drop to ask the flush thread to exit.
    closed: atomic::AtomicBool,
    // Mutex for `durable_cv` waiters.
    durable_lock: sync::Mutex<()>,
    // Maximum time between flush cycles.
    flush_duration: time::Duration,
    // Multi-producer queue of staged write requests.
    mpscq: mpscq::MPSCQueue<WriteReq>,
    // Last sync error (Box::into_raw pointer); null when none is published.
    error: atomic::AtomicPtr<FrozenErr>,
}
584
impl Core {
    /// Builds the shared pipe state from the file's configuration.
    ///
    /// NOTE(review): writes the file's module id into the `static mut
    /// MODULE_ID` — racy if pipes with different ids are created
    /// concurrently; confirm single-module usage.
    fn new(
        file: ffile::FrozenFile,
        pool: bpool::BufPool,
        flush_duration: time::Duration,
    ) -> FrozenRes<sync::Arc<Self>> {
        let cfg = file.cfg();
        let chunk_size = cfg.chunk_size;

        unsafe { MODULE_ID = cfg.mid };

        Ok(sync::Arc::new(Self {
            file,
            pool,
            chunk_size,
            flush_duration,
            cv: sync::Condvar::new(),
            lock: sync::Mutex::new(()),
            io_lock: sync::RwLock::new(()),
            epoch: atomic::AtomicU64::new(0),
            durable_cv: sync::Condvar::new(),
            mpscq: mpscq::MPSCQueue::default(),
            durable_lock: sync::Mutex::new(()),
            closed: atomic::AtomicBool::new(false),
            error: atomic::AtomicPtr::new(std::ptr::null_mut()),
        }))
    }

    /// Shared I/O lock: held while reading or staging a write.
    #[inline]
    fn acquire_io_lock(&self) -> FrozenRes<sync::RwLockReadGuard<'_, ()>> {
        self.io_lock.read().map_err(|e| new_err_raw(FPErr::Lpn, e))
    }

    /// Exclusive I/O lock: held for batch flush, grow, and teardown.
    #[inline]
    fn acquire_exclusive_io_lock(&self) -> FrozenRes<sync::RwLockWriteGuard<'_, ()>> {
        self.io_lock.write().map_err(|e| new_err_raw(FPErr::Lpn, e))
    }

    /// Returns a clone of the currently published sync error, if any.
    #[inline]
    fn get_sync_error(&self) -> Option<FrozenErr> {
        let ptr = self.error.load(atomic::Ordering::Acquire);
        if hints::likely(ptr.is_null()) {
            return None;
        }

        // SAFETY: non-null values are produced by Box::into_raw in
        // set_sync_error and freed only after being swapped out.
        // NOTE(review): a concurrent set/clear in the flush thread could free
        // this pointer between the load above and the clone — confirm whether
        // that window is reachable, or guard the swap/clone pair.
        Some(unsafe { (*ptr).clone() })
    }

    /// Publishes `err`, freeing any previously published error.
    #[inline]
    fn set_sync_error(&self, err: FrozenErr) {
        let boxed = Box::into_raw(Box::new(err));
        let old = self.error.swap(boxed, atomic::Ordering::AcqRel);

        if !old.is_null() {
            // SAFETY: `old` came from Box::into_raw and the swap removed the
            // last shared pointer to it.
            unsafe {
                drop(Box::from_raw(old));
            }
        }
    }

    /// Clears any published error, freeing it.
    #[inline]
    fn clear_sync_error(&self) {
        let old = self.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
        if hints::unlikely(!old.is_null()) {
            // SAFETY: same ownership argument as in set_sync_error.
            unsafe {
                drop(Box::from_raw(old));
            }
        }
    }

    /// Advances the durability epoch after a successful sync.
    #[inline]
    fn incr_epoch(&self) {
        self.epoch.fetch_add(1, atomic::Ordering::Release);
    }

    /// Writes every request in `batch` to the backing file.
    ///
    /// Returns the (min, max) chunk-index range touched. For an empty batch
    /// that range is (usize::MAX, 0); the only caller (`flush_tx`) skips
    /// empty batches, and currently ignores the range anyway.
    fn write_batch(&self, batch: Vec<WriteReq>) -> FrozenRes<(usize, usize)> {
        let mut max_index = 0usize;
        let mut min_index = usize::MAX;

        for req in &batch {
            let slots = req.alloc.slots();
            match req.chunks {
                1 => {
                    // Single chunk: plain positioned write.
                    self.file.pwrite(slots[0], req.index)?;
                }
                _ => {
                    // Multiple chunks: vectored positioned write.
                    self.file.pwritev(slots, req.index)?;
                }
            }

            min_index = min_index.min(req.index);
            max_index = max_index.max(req.index + req.chunks);
        }

        Ok((min_index, max_index))
    }

    /// Spawns the named background flush thread running `flush_tx`.
    fn spawn_tx(core: sync::Arc<Self>) -> FrozenRes<thread::JoinHandle<()>> {
        match thread::Builder::new()
            .name("fpipe-flush-tx".into())
            .spawn(move || Self::flush_tx(core))
        {
            Ok(tx) => Ok(tx),
            Err(error) => {
                let mut error = error.to_string().as_bytes().to_vec();
                error.extend_from_slice(b"Failed to spawn flush thread for FrozenPipe");
                new_err(FPErr::Hcf, error)
            }
        }
    }

    /// Background flush loop.
    ///
    /// Each cycle: park on `cv` with a `flush_duration` timeout (woken early
    /// by force_durability / Drop), drain the queue, write the batch under
    /// the exclusive I/O lock, sync, bump the epoch, and broadcast to
    /// durability waiters. Write/sync errors are published via
    /// set_sync_error and the loop continues; lock poisoning terminates it.
    fn flush_tx(core: sync::Arc<Self>) {
        let mut guard = match core.lock.lock() {
            Ok(g) => g,
            Err(error) => {
                let mut message = error.to_string().as_bytes().to_vec();
                message.extend_from_slice(b"Flush thread died before init could be completed for FrozenPipe");
                let error = FrozenErr::new(
                    unsafe { MODULE_ID },
                    ERRDOMAIN,
                    FPErr::Lpn as u16,
                    FPErr::Lpn.default_message(),
                    message,
                );
                core.set_sync_error(error);
                return;
            }
        };

        loop {
            // Sleep until the flush interval elapses or a waker fires.
            guard = match core.cv.wait_timeout(guard, core.flush_duration) {
                Ok((g, _)) => g,
                Err(e) => {
                    core.set_sync_error(new_err_raw(FPErr::Txe, e));
                    return;
                }
            };

            // Release the park lock during I/O so wakers are never blocked.
            drop(guard);

            let req_batch = core.mpscq.drain();
            let closing = core.closed.load(atomic::Ordering::Acquire);

            if req_batch.is_empty() {
                // Nothing to write: exit on shutdown, otherwise re-arm.
                if closing {
                    return;
                }

                guard = match core.lock.lock() {
                    Ok(g) => g,
                    Err(e) => {
                        core.set_sync_error(new_err_raw(FPErr::Lpn, e));
                        return;
                    }
                };

                continue;
            }

            // Exclusive lock: readers must not observe a half-written batch.
            let io_lock = match core.acquire_exclusive_io_lock() {
                Ok(lock) => lock,
                Err(e) => {
                    core.set_sync_error(new_err_raw(FPErr::Lpn, e));
                    return;
                }
            };

            let (_min, _max) = match core.write_batch(req_batch) {
                Ok(res) => res,
                Err(err) => {
                    // Publish the failure and keep the loop alive; waiters
                    // pick the error up in internal_wait.
                    core.set_sync_error(err);
                    drop(io_lock);

                    guard = match core.lock.lock() {
                        Ok(g) => g,
                        Err(e) => {
                            core.set_sync_error(new_err_raw(FPErr::Lpn, e));
                            return;
                        }
                    };

                    continue;
                }
            };

            match core.file.sync() {
                Err(err) => core.set_sync_error(err),
                Ok(()) => {
                    // Publish the new epoch, then broadcast under the
                    // durability lock so no waiter misses the notification.
                    core.incr_epoch();
                    let _g = match core.durable_lock.lock() {
                        Ok(g) => g,
                        Err(e) => {
                            core.set_sync_error(new_err_raw(FPErr::Lpn, e));
                            return;
                        }
                    };

                    core.durable_cv.notify_all();
                    // A successful sync supersedes any earlier failure.
                    core.clear_sync_error();
                }
            }

            drop(io_lock);
            guard = match core.lock.lock() {
                Ok(g) => g,
                Err(e) => {
                    core.set_sync_error(new_err_raw(FPErr::Lpn, e));
                    return;
                }
            };
        }
    }
}
820
// SAFETY(review): Core is shared between the pipe handle and the flush
// thread. These impls are presumably required because `MPSCQueue<WriteReq>` /
// `bpool::Allocation` carry raw chunk pointers that are not auto-Send/Sync —
// confirm the pool hands each allocation's pointers to exactly one owner at a
// time.
unsafe impl Send for Core {}
unsafe impl Sync for Core {}
823
/// One staged write: a pool allocation destined for a chunk index.
#[derive(Debug)]
struct WriteReq {
    // Destination chunk index in the file.
    index: usize,
    // Number of chunks occupied by the payload.
    chunks: usize,
    // Pool allocation holding the copied payload.
    alloc: bpool::Allocation,
}
830
831impl WriteReq {
832 fn new(index: usize, chunks: usize, alloc: bpool::Allocation) -> Self {
833 Self { alloc, index, chunks }
834 }
835}
836
837#[cfg(test)]
838mod tests {
839 use super::*;
840 use crate::{
841 bpool::{BPBackend, BPCfg, BufPool},
842 error::TEST_MID,
843 ffile::{FFCfg, FrozenFile},
844 };
845 use std::sync::{Arc, Barrier};
846 use std::thread;
847 use std::time::{Duration, Instant};
848
    // Chunk size (bytes) and initial chunk count for the test-backed file.
    const CHUNK: usize = 0x20;
    const INIT: usize = 0x20;
    // Very short flush interval so durability waits resolve quickly in tests.
    const FLUSH: Duration = Duration::from_micros(10);
852
853 fn new_env() -> (tempfile::TempDir, FrozenPipe) {
854 let dir = tempfile::tempdir().unwrap();
855 let path = dir.path().join("tmp_pipe");
856
857 let file = FrozenFile::new(FFCfg {
858 mid: TEST_MID,
859 path,
860 chunk_size: CHUNK,
861 initial_chunk_amount: INIT,
862 })
863 .unwrap();
864 let pool = BufPool::new(BPCfg {
865 mid: TEST_MID,
866 chunk_size: CHUNK,
867 backend: BPBackend::Prealloc { capacity: 0x100 },
868 });
869
870 let pipe = FrozenPipe::new(file, pool, FLUSH).unwrap();
871
872 (dir, pipe)
873 }
874
875 mod lifecycle {
876 use super::*;
877
878 #[test]
879 fn ok_new() {
880 let (_dir, pipe) = new_env();
881 assert_eq!(pipe.core.epoch.load(atomic::Ordering::Acquire), 0);
882 }
883
884 #[test]
885 fn ok_drop() {
886 let (_dir, pipe) = new_env();
887 drop(pipe);
888 }
889 }
890
    /// Write-path tests: staging, epochs, and pool back-pressure.
    mod fp_write {
        use super::*;

        /// A single-chunk write becomes durable.
        #[test]
        fn ok_write_and_wait() {
            let (_dir, pipe) = new_env();

            let buf = vec![0xAB; CHUNK];
            let epoch = pipe.write(&buf, 0).unwrap();
            pipe.wait_for_durability(epoch).unwrap();
        }

        /// A multi-chunk payload (vectored path) becomes durable.
        #[test]
        fn ok_write_multiple_chunks() {
            let (_dir, pipe) = new_env();

            let buf = vec![0xAA; CHUNK * 4];
            let epoch = pipe.write(&buf, 0).unwrap();
            pipe.wait_for_durability(epoch).unwrap();
        }

        /// force_durability flushes without waiting for the timeout.
        #[test]
        fn ok_force_durability() {
            let (_dir, pipe) = new_env();

            let buf = vec![1u8; CHUNK];
            let epoch = pipe.write(&buf, 0).unwrap();
            pipe.force_durability(epoch).unwrap();
        }

        /// Epochs returned by successive writes never go backwards.
        #[test]
        fn ok_write_epoch_monotonic() {
            let (_dir, pipe) = new_env();
            let buf = vec![1u8; CHUNK];

            let e1 = pipe.write(&buf, 0).unwrap();
            pipe.wait_for_durability(e1).unwrap();

            let e2 = pipe.write(&buf, 1).unwrap();
            pipe.wait_for_durability(e2).unwrap();

            assert!(e2 >= e1);
        }

        /// A large (0x80-chunk) payload becomes durable.
        #[test]
        fn ok_write_large() {
            let (_dir, pipe) = new_env();
            let buf = vec![0xAB; CHUNK * 0x80];

            let epoch = pipe.write(&buf, 0).unwrap();
            pipe.wait_for_durability(epoch).unwrap();
        }

        /// Many sequential writes each reach durability.
        #[test]
        fn ok_write_large_batch() {
            let (_dir, pipe) = new_env();

            for i in 0..0x100 {
                let buf = vec![i as u8; CHUNK];
                let epoch = pipe.write(&buf, i).unwrap();
                pipe.wait_for_durability(epoch).unwrap();
            }
        }

        /// With a 1-chunk prealloc pool, a second writer blocks in allocate
        /// until the first write's chunk is flushed and released.
        #[test]
        fn ok_write_is_blocked_at_pool_exhaustion_for_prealloc_backend() {
            let dir = tempfile::tempdir().unwrap();
            let path = dir.path().join("tmp_pipe");

            let file = FrozenFile::new(FFCfg {
                mid: TEST_MID,
                path,
                chunk_size: CHUNK,
                initial_chunk_amount: INIT,
            })
            .unwrap();

            let pool = BufPool::new(BPCfg {
                mid: TEST_MID,
                chunk_size: CHUNK,
                backend: BPBackend::Prealloc { capacity: 1 },
            });

            let pipe = Arc::new(FrozenPipe::new(file, pool, FLUSH).unwrap());

            let p2 = pipe.clone();
            let t = thread::spawn(move || {
                let buf = vec![1u8; CHUNK];
                let epoch = p2.write(&buf, 0).unwrap();
                p2.wait_for_durability(epoch).unwrap();
            });

            // Give the spawned writer time to claim the only pool chunk.
            thread::sleep(Duration::from_millis(0x0A));

            let buf = vec![2u8; CHUNK];
            let epoch = pipe.write(&buf, 1).unwrap();
            pipe.wait_for_durability(epoch).unwrap();

            t.join().unwrap();
        }
    }
992
    /// Read-path tests: single, vectored (2x/4x/generic), and concurrent.
    mod fp_read {
        use super::*;

        /// A durable single-chunk write reads back byte-identical.
        #[test]
        fn ok_read_single_after_write() {
            let (_dir, pipe) = new_env();

            let buf = vec![0xAB; CHUNK];
            let epoch = pipe.write(&buf, 0).unwrap();
            pipe.wait_for_durability(epoch).unwrap();

            let read = pipe.read_single(0).unwrap();
            assert_eq!(read, buf);
        }

        /// count == 2 exercises the fixed 2-pointer vectored path.
        #[test]
        fn ok_read_2x() {
            let (_dir, pipe) = new_env();

            let buf = vec![0xAA; CHUNK * 2];
            let epoch = pipe.write(&buf, 0).unwrap();
            pipe.wait_for_durability(epoch).unwrap();

            let read = pipe.read(0, 2).unwrap();
            assert_eq!(read, buf);
        }

        /// count == 4 exercises the fixed 4-pointer vectored path.
        #[test]
        fn ok_read_4x() {
            let (_dir, pipe) = new_env();

            let buf = vec![0xBB; CHUNK * 4];
            let epoch = pipe.write(&buf, 0).unwrap();
            pipe.wait_for_durability(epoch).unwrap();

            let read = pipe.read(0, 4).unwrap();
            assert_eq!(read, buf);
        }

        /// count == 6 falls through to the generic heap-allocated path.
        #[test]
        fn ok_read_multi_generic() {
            let (_dir, pipe) = new_env();

            let buf = vec![0xCC; CHUNK * 6];
            let epoch = pipe.write(&buf, 0).unwrap();
            pipe.wait_for_durability(epoch).unwrap();

            let read = pipe.read(0, 6).unwrap();
            assert_eq!(read, buf);
        }

        /// Distinct indices retain their own payloads.
        #[test]
        fn ok_read_multiple_indices() {
            let (_dir, pipe) = new_env();

            for i in 0..8 {
                let buf = vec![i as u8; CHUNK];
                let epoch = pipe.write(&buf, i).unwrap();
                pipe.wait_for_durability(epoch).unwrap();
            }

            for i in 0..8 {
                let read = pipe.read_single(i).unwrap();
                assert_eq!(read, vec![i as u8; CHUNK]);
            }
        }

        /// The second durable write to an index wins.
        #[test]
        fn ok_overwrite_same_index() {
            let (_dir, pipe) = new_env();

            let buf1 = vec![0xAA; CHUNK];
            let e1 = pipe.write(&buf1, 0).unwrap();
            pipe.wait_for_durability(e1).unwrap();

            let buf2 = vec![0xBB; CHUNK];
            let e2 = pipe.write(&buf2, 0).unwrap();
            pipe.wait_for_durability(e2).unwrap();

            let read = pipe.read_single(0).unwrap();
            assert_eq!(read, buf2);
        }

        /// A 16-chunk generic vectored read round-trips.
        #[test]
        fn ok_large_read_multi() {
            let (_dir, pipe) = new_env();

            let buf = vec![0x7A; CHUNK * 0x10];
            let epoch = pipe.write(&buf, 0).unwrap();
            pipe.wait_for_durability(epoch).unwrap();

            let read = pipe.read(0, 0x10).unwrap();
            assert_eq!(read, buf);
        }

        /// Parallel readers on the shared lock all see their data.
        #[test]
        fn ok_read_concurrent() {
            const THREADS: usize = 8;

            let (_dir, pipe) = new_env();
            let pipe = Arc::new(pipe);

            for i in 0..THREADS {
                let buf = vec![i as u8; CHUNK];
                let epoch = pipe.write(&buf, i).unwrap();
                pipe.wait_for_durability(epoch).unwrap();
            }

            let mut handles = Vec::new();

            for i in 0..THREADS {
                let pipe = pipe.clone();

                handles.push(thread::spawn(move || {
                    let read = pipe.read_single(i).unwrap();
                    assert_eq!(read, vec![i as u8; CHUNK]);
                }));
            }

            for h in handles {
                h.join().unwrap();
            }
        }

        /// Reads interleaved with writes must not deadlock or error fatally.
        #[test]
        fn ok_concurrent_read_write() {
            let (_dir, pipe) = new_env();
            let pipe = Arc::new(pipe);

            let writer = {
                let pipe = pipe.clone();
                thread::spawn(move || {
                    for i in 0..0x40 {
                        let buf = vec![i as u8; CHUNK];
                        let epoch = pipe.write(&buf, i).unwrap();
                        pipe.wait_for_durability(epoch).unwrap();
                    }
                })
            };

            let reader = {
                let pipe = pipe.clone();
                thread::spawn(move || {
                    for _ in 0..0x40 {
                        // Content unspecified mid-write; only liveness matters.
                        let _ = pipe.read_single(0);
                    }
                })
            };

            writer.join().unwrap();
            reader.join().unwrap();
        }

        /// Chunks past the initial capacity are readable after grow.
        #[test]
        fn ok_read_after_grow() {
            let (_dir, pipe) = new_env();

            pipe.grow(8).unwrap();

            let buf = vec![0x5A; CHUNK];
            let epoch = pipe.write(&buf, INIT).unwrap();
            pipe.wait_for_durability(epoch).unwrap();

            let read = pipe.read_single(INIT).unwrap();
            assert_eq!(read, buf);
        }
    }
1160
1161 mod batching {
1162 use super::*;
1163
1164 #[test]
1165 fn ok_multiple_writes_single_batch() {
1166 let (_dir, pipe) = new_env();
1167
1168 let mut epochs = Vec::new();
1169 for i in 0..0x10 {
1170 let buf = vec![i as u8; CHUNK];
1171 epochs.push(pipe.write(&buf, i).unwrap());
1172 }
1173
1174 for e in epochs {
1175 pipe.wait_for_durability(e).unwrap();
1176 }
1177
1178 assert!(pipe.core.epoch.load(atomic::Ordering::Acquire) > 0);
1179 }
1180 }
1181
    /// File-growth tests, including growth under concurrent writes.
    mod fp_grow {
        use super::*;

        /// grow extends the file by exactly count * chunk_size bytes.
        #[test]
        fn ok_grow_file() {
            let (_dir, pipe) = new_env();
            let curr_len = pipe.core.file.length().unwrap();

            pipe.grow(0x10).unwrap();
            let new_len = pipe.core.file.length().unwrap();

            assert_eq!(new_len, curr_len + (0x10 * pipe.core.chunk_size));
        }

        /// The newly grown region is writable.
        #[test]
        fn ok_write_after_grow() {
            let (_dir, pipe) = new_env();
            pipe.grow(0x10).unwrap();

            let buf = vec![0xBB; CHUNK];
            let epoch = pipe.write(&buf, INIT).unwrap();
            pipe.wait_for_durability(epoch).unwrap();
        }

        /// grow retries until it finds a quiet moment amid active writers
        /// and still extends by the exact requested amount.
        #[test]
        fn ok_grow_while_writing() {
            let (_dir, pipe) = new_env();
            let pipe = Arc::new(pipe);
            let curr_len = pipe.core.file.length().unwrap();

            let p2 = pipe.clone();
            let writer = thread::spawn(move || {
                for i in 0..INIT {
                    let buf = vec![1u8; CHUNK];
                    let epoch = p2.write(&buf, i).unwrap();
                    p2.wait_for_durability(epoch).unwrap();
                }
            });

            // Let the writer get going before growing underneath it.
            thread::sleep(Duration::from_millis(10));

            pipe.grow(0x3A).unwrap();
            writer.join().unwrap();

            let new_len = pipe.core.file.length().unwrap();
            assert_eq!(new_len, curr_len + (0x3A * pipe.core.chunk_size));
        }
    }
1230
    /// Multi-writer stress tests.
    mod concurrency {
        use super::*;

        /// Many writers hammering overlapping indices all reach durability.
        #[test]
        fn ok_multi_writer() {
            const THREADS: usize = 8;
            const ITERS: usize = 0x100;

            let (_dir, pipe) = new_env();
            let pipe = Arc::new(pipe);

            let mut handles = Vec::new();
            for t in 0..THREADS {
                let pipe = pipe.clone();

                handles.push(thread::spawn(move || {
                    for i in 0..ITERS {
                        let buf = vec![t as u8; CHUNK];
                        let epoch = pipe.write(&buf, i).unwrap();
                        pipe.wait_for_durability(epoch).unwrap();
                    }
                }));
            }

            for h in handles {
                h.join().unwrap();
            }
        }

        /// A barrier maximizes simultaneous first writes; all must succeed.
        #[test]
        fn ok_barrier_start_parallel_writes() {
            const THREADS: usize = 8;

            let (_dir, pipe) = new_env();
            let pipe = Arc::new(pipe);
            let barrier = Arc::new(Barrier::new(THREADS));

            let mut handles = Vec::new();

            for i in 0..THREADS {
                let pipe = pipe.clone();
                let barrier = barrier.clone();

                handles.push(thread::spawn(move || {
                    barrier.wait();

                    let buf = vec![i as u8; CHUNK];
                    let epoch = pipe.write(&buf, i).unwrap();
                    pipe.wait_for_durability(epoch).unwrap();
                }));
            }

            for h in handles {
                h.join().unwrap();
            }
        }
    }
1288
    /// Durability waiting semantics.
    mod durability_wait {
        use super::*;

        /// Waiting for an unflushed epoch takes a measurable amount of time
        /// (i.e. it actually blocks until the flush happens).
        #[test]
        fn ok_wait_blocks_until_flush() {
            let (_dir, pipe) = new_env();

            let buf = vec![0x55; CHUNK];
            let epoch = pipe.write(&buf, 0).unwrap();

            let start = Instant::now();
            pipe.wait_for_durability(epoch).unwrap();

            assert!(start.elapsed() >= Duration::from_micros(1));
        }

        /// Concurrent force_durability calls do not deadlock or lose wakes.
        #[test]
        fn ok_force_durability_concurrent() {
            let (_dir, pipe) = new_env();
            let pipe = Arc::new(pipe);

            let mut handles = Vec::new();
            for i in 0..0x0A {
                let pipe = pipe.clone();

                handles.push(thread::spawn(move || {
                    let buf = vec![i as u8; CHUNK];
                    let epoch = pipe.write(&buf, i).unwrap();
                    pipe.force_durability(epoch).unwrap();
                }));
            }

            for h in handles {
                h.join().unwrap();
            }
        }
    }
1326
    /// Teardown under load: Drop must join the flush thread cleanly.
    mod shutdown {
        use super::*;

        /// Dropping with a queued (unflushed) write must not hang or panic.
        #[test]
        fn ok_drop_with_pending_writes() {
            let (_dir, pipe) = new_env();

            let buf = vec![0xAA; CHUNK];
            pipe.write(&buf, 0).unwrap();
            drop(pipe);
        }

        /// Dropping one Arc handle while another thread still uses the pipe
        /// is safe; only the last drop tears down.
        #[test]
        fn ok_drop_during_activity() {
            let (_dir, pipe) = new_env();
            let pipe = Arc::new(pipe);

            let p2 = pipe.clone();

            let handle = thread::spawn(move || {
                let buf = vec![1u8; CHUNK];
                let epoch = p2.write(&buf, 0).unwrap();
                p2.wait_for_durability(epoch).unwrap();
            });

            thread::sleep(Duration::from_millis(10));
            drop(pipe);

            handle.join().unwrap();
        }

        /// Same as above, but the surviving writer loops through many
        /// durability waits after the other handle is dropped.
        #[test]
        fn ok_drop_while_writer_waiting() {
            let (_dir, pipe) = new_env();
            let pipe = Arc::new(pipe);

            let p2 = pipe.clone();
            let handle = thread::spawn(move || {
                for i in 0..0x80 {
                    let buf = vec![1u8; CHUNK];
                    let epoch = p2.write(&buf, i).unwrap();
                    p2.wait_for_durability(epoch).unwrap();
                }
            });

            thread::sleep(Duration::from_millis(0x0A));
            drop(pipe);

            handle.join().unwrap();
        }
    }
1378}