// frozen_core/fpipe.rs
//! A high-throughput asynchronous IO pipeline for chunk-based storage.
//!
//! [`FrozenPipe`] batches write requests and flushes them in the background, providing durability guarantees via epochs.
5//!
6//! ## Features
7//!
8//! - Batched IO
9//! - Background durability
10//! - Backpressure via [`BufPool`]
11//! - Crash-safe durability semantics
12//! - Optimized page reads
13//!
14//! ## Example
15//!
16//! ```
17//! use frozen_core::fpipe::FrozenPipe;
18//! use frozen_core::ffile::{FrozenFile, FFCfg};
19//! use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
20//! use std::time::Duration;
21//!
22//! let dir = tempfile::tempdir().unwrap();
23//! let path = dir.path().join("tmp_pipe");
24//!
25//! let file = FrozenFile::new(FFCfg {
26//!     path,
27//!     mid: 0,
28//!     chunk_size: 0x20,
29//!     initial_chunk_amount: 4,
30//! }).unwrap();
31//!
32//! let pool = BufPool::new(BPCfg {
33//!     mid: 0,
34//!     chunk_size: 0x20,
35//!     backend: BPBackend::Prealloc { capacity: 0x10 },
36//! });
37//!
38//! let pipe = FrozenPipe::new(file, pool, Duration::from_micros(0x3A)).unwrap();
39//!
40//! let buf = vec![1u8; 0x40];
41//! let epoch = pipe.write(&buf, 0).unwrap();
42//!
43//! pipe.wait_for_durability(epoch).unwrap();
44//!
45//! let read = pipe.read(0, 2).unwrap();
46//! assert_eq!(read, buf);
47//! ```
48
49use crate::{
50    bpool,
51    error::{FrozenErr, FrozenRes},
52    ffile, hints, mpscq,
53};
54use std::{
55    sync::{self, atomic},
56    thread, time,
57};
58
/// module id used for [`FrozenPipe`] error construction
///
/// NOTE(review): this is a `static mut` written in `Core::new` and read without any
/// synchronization by the error helpers; creating pipes with different `mid`s from
/// multiple threads would be a data race — consider `AtomicU8`. Confirm against
/// crate-wide conventions before changing.
static mut MODULE_ID: u8 = 0;

/// Domain Id for [`FrozenPipe`] is **19**
const ERRDOMAIN: u8 = 0x13;
64
/// Error codes for [`FrozenPipe`]
#[repr(u16)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FPErr {
    /// (1024) unrecoverable internal failure (halt and catch fire)
    Hcf = 0x400,

    /// (1025) thread error or panic inside thread
    Txe = 0x401,

    /// (1026) lock error (failed or poisoned)
    Lpn = 0x402,
}

impl FPErr {
    /// Default human-readable detail bytes for each error code
    #[inline]
    fn default_message(&self) -> &'static [u8] {
        match self {
            Self::Lpn => b"lock poisoned",
            Self::Hcf => b"halt and catch fire",
            Self::Txe => b"thread failed or panicked",
        }
    }
}
88
89#[inline]
90fn new_err<R>(res: FPErr, message: Vec<u8>) -> FrozenRes<R> {
91    let detail = res.default_message();
92    let err = FrozenErr::new(unsafe { MODULE_ID }, ERRDOMAIN, res as u16, detail, message);
93    Err(err)
94}
95
96#[inline]
97fn new_err_raw<E: std::fmt::Display>(res: FPErr, error: E) -> FrozenErr {
98    let detail = res.default_message();
99    FrozenErr::new(
100        unsafe { MODULE_ID },
101        ERRDOMAIN,
102        res as u16,
103        detail,
104        error.to_string().as_bytes().to_vec(),
105    )
106}
107
/// A high-throughput asynchronous IO pipeline for chunk-based storage; it batches write requests and
/// flushes them in the background, while providing durability guarantees via epochs
110///
111/// ## Example
112///
113/// ```
114/// use frozen_core::fpipe::FrozenPipe;
115/// use frozen_core::ffile::{FrozenFile, FFCfg};
116/// use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
117/// use std::time::Duration;
118///
119/// let dir = tempfile::tempdir().unwrap();
120/// let path = dir.path().join("tmp_pipe_write");
121///
122/// let file = FrozenFile::new(FFCfg {
123///     mid: 0,
124///     path,
125///     chunk_size: 0x20,
126///     initial_chunk_amount: 2,
127/// }).unwrap();
128///
129/// let pool = BufPool::new(BPCfg {
130///     mid: 0,
131///     chunk_size: 0x20,
132///     backend: BPBackend::Dynamic,
133/// });
134///
135/// let pipe = FrozenPipe::new(file, pool, Duration::from_micros(0x0A)).unwrap();
136///
137/// let buf = vec![0x3Bu8; 0x40];
138/// let epoch = pipe.write(&buf, 0).unwrap();
139///
140/// pipe.wait_for_durability(epoch).unwrap();
141///
142/// let read = pipe.read(0, 2).unwrap();
143/// assert_eq!(read, buf);
144/// ```
#[derive(Debug)]
pub struct FrozenPipe {
    /// state shared between the public API and the background flusher thread
    core: sync::Arc<Core>,
    /// handle of the background flusher thread; taken (`None`) once joined in `Drop`
    tx: Option<thread::JoinHandle<()>>,
}
150
151impl FrozenPipe {
152    /// Create a new instance of [`FrozenPipe`]
153    pub fn new(file: ffile::FrozenFile, pool: bpool::BufPool, flush_duration: time::Duration) -> FrozenRes<Self> {
154        let core = Core::new(file, pool, flush_duration)?;
155        let tx = Core::spawn_tx(core.clone())?;
156
157        Ok(Self { core, tx: Some(tx) })
158    }
159
    /// Submit a write request
    ///
    /// Returns the epoch representing the durability window of the write
    ///
    /// ## Working
    ///
    /// The buffer is split into `chunk_size` sized segments and staged using [`BufPool`] before being
    /// written by the background flusher
    ///
    /// ## Example
    ///
    /// ```
    /// use frozen_core::fpipe::FrozenPipe;
    /// use frozen_core::ffile::{FrozenFile, FFCfg};
    /// use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
    /// use std::time::Duration;
    ///
    /// let dir = tempfile::tempdir().unwrap();
    /// let path = dir.path().join("tmp_pipe_write");
    ///
    /// let file = FrozenFile::new(FFCfg {
    ///     mid: 0,
    ///     path,
    ///     chunk_size: 0x20,
    ///     initial_chunk_amount: 2,
    /// }).unwrap();
    ///
    /// let pool = BufPool::new(BPCfg {
    ///     mid: 0,
    ///     chunk_size: 0x20,
    ///     backend: BPBackend::Dynamic,
    /// });
    ///
    /// let pipe = FrozenPipe::new(file, pool, Duration::from_micros(0x0A)).unwrap();
    ///
    /// let buf = vec![0x3Bu8; 0x40];
    /// let epoch = pipe.write(&buf, 0).unwrap();
    ///
    /// pipe.wait_for_durability(epoch).unwrap();
    ///
    /// let read = pipe.read(0, 2).unwrap();
    /// assert_eq!(read, buf);
    /// ```
    #[inline(always)]
    pub fn write(&self, buf: &[u8], index: usize) -> FrozenRes<u64> {
        // number of chunk-sized slots needed to stage `buf` (the last one may be partial)
        let chunk_size = self.core.chunk_size;
        let chunks = buf.len().div_ceil(chunk_size);

        // may block while waiting for free chunks — this is the backpressure point
        let alloc = self.core.pool.allocate(chunks)?;

        // NOTE: The read lock prevents torn syncs by ensuring the flusher tx cannot acquire its
        // exclusive lock until the write op is submitted. This lock must be acquired *after* the pool
        // allocation, as `BufPool::allocate` may block while waiting for chunks; otherwise that wait
        // would delay the flusher from obtaining the lock, potentially stalling durability progress
        // for the entire `FrozenPipe`
        let _lock = self.core.acquire_io_lock()?;

        // copy `buf` into the staged slots, one chunk-sized segment at a time
        let mut src_off = 0usize;
        for ptr in alloc.slots() {
            if src_off >= buf.len() {
                break;
            }

            let remaining = buf.len() - src_off;
            let copy = remaining.min(chunk_size);

            // SAFETY: `src_off + copy <= buf.len()` by construction, and `copy <= chunk_size`
            // which is the size of each slot, so both ranges are in bounds and distinct
            unsafe { std::ptr::copy_nonoverlapping(buf.as_ptr().add(src_off), *ptr, copy) };
            src_off += copy;
        }

        let req = WriteReq::new(index, chunks, alloc);
        self.core.mpscq.push(req);

        // the flusher cannot advance the epoch while we hold the read lock, so `current + 1`
        // is the first epoch that can include this request
        Ok(self.core.epoch.load(atomic::Ordering::Acquire) + 1)
    }
234
235    /// Read a single chunk from the given `index`
236    ///
237    /// This function performs a blocking read operation
238    ///
239    ///
240    /// ## Example
241    ///
242    /// ```
243    /// use frozen_core::fpipe::FrozenPipe;
244    /// use frozen_core::ffile::{FrozenFile, FFCfg};
245    /// use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
246    /// use std::time::Duration;
247    ///
248    /// let dir = tempfile::tempdir().unwrap();
249    /// let path = dir.path().join("tmp_read_single");
250    ///
251    /// let file = FrozenFile::new(FFCfg {
252    ///     path,
253    ///     mid: 0,
254    ///     chunk_size: 0x20,
255    ///     initial_chunk_amount: 2,
256    /// }).unwrap();
257    ///
258    /// let pool = BufPool::new(BPCfg {
259    ///     mid: 0,
260    ///     chunk_size: 0x20,
261    ///     backend: BPBackend::Dynamic,
262    /// });
263    ///
264    /// let pipe = FrozenPipe::new(file, pool, Duration::from_micros(10)).unwrap();
265    ///
266    /// let data = vec![0xAAu8; 0x20];
267    /// let epoch = pipe.write(&data, 0).unwrap();
268    /// pipe.wait_for_durability(epoch).unwrap();
269    ///
270    /// let read = pipe.read_single(0).unwrap();
271    /// assert_eq!(read, data);
272    /// ```
273    #[inline(always)]
274    pub fn read_single(&self, index: usize) -> FrozenRes<Vec<u8>> {
275        let _lock = self.core.acquire_io_lock()?;
276
277        let mut slice = vec![0u8; self.core.chunk_size];
278        self.core.file.pread(slice.as_mut_ptr(), index)?;
279
280        drop(_lock);
281        Ok(slice)
282    }
283
284    /// Read `count` chunks starting from at the given `index`
285    ///
286    /// This function performs a blocking read operation
287    ///
288    /// ## Example
289    ///
290    /// ```
291    /// use frozen_core::fpipe::FrozenPipe;
292    /// use frozen_core::ffile::{FrozenFile, FFCfg};
293    /// use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
294    /// use std::time::Duration;
295    ///
296    /// let dir = tempfile::tempdir().unwrap();
297    /// let path = dir.path().join("tmp_read_multi");
298    ///
299    /// let file = FrozenFile::new(FFCfg {
300    ///     path,
301    ///     mid: 0,
302    ///     chunk_size: 0x20,
303    ///     initial_chunk_amount: 8,
304    /// }).unwrap();
305    ///
306    /// let pool = BufPool::new(BPCfg {
307    ///     mid: 0,
308    ///     chunk_size: 0x20,
309    ///     backend: BPBackend::Dynamic,
310    /// });
311    ///
312    /// let pipe = FrozenPipe::new(file, pool, Duration::from_micros(10)).unwrap();
313    ///
314    /// let buf = vec![0xBBu8; 0x20 * 2];
315    /// let epoch = pipe.write(&buf, 0).unwrap();
316    /// pipe.wait_for_durability(epoch).unwrap();
317    ///
318    /// let read = pipe.read(0, 2).unwrap();
319    /// assert_eq!(read, buf);
320    /// ```
321    #[inline(always)]
322    pub fn read(&self, index: usize, count: usize) -> FrozenRes<Vec<u8>> {
323        let _lock = self.core.acquire_io_lock()?;
324
325        match count {
326            2 => self.read_2x(index),
327            4 => self.read_4x(index),
328            _ => self.read_multi(index, count),
329        }
330    }
331
332    #[inline(always)]
333    fn read_2x(&self, index: usize) -> FrozenRes<Vec<u8>> {
334        let chunk = self.core.chunk_size;
335
336        let mut buf = vec![0u8; chunk * 2];
337        let base = buf.as_mut_ptr();
338
339        let ptrs = [base, unsafe { base.add(chunk) }];
340        self.core.file.preadv(&ptrs, index)?;
341
342        Ok(buf)
343    }
344
345    #[inline(always)]
346    fn read_4x(&self, index: usize) -> FrozenRes<Vec<u8>> {
347        let chunk = self.core.chunk_size;
348
349        let mut buf = vec![0u8; chunk * 4];
350        let base = buf.as_mut_ptr();
351
352        let ptrs = [
353            base,
354            unsafe { base.add(chunk) },
355            unsafe { base.add(chunk * 2) },
356            unsafe { base.add(chunk * 3) },
357        ];
358        self.core.file.preadv(&ptrs, index)?;
359
360        Ok(buf)
361    }
362
363    #[inline(always)]
364    fn read_multi(&self, index: usize, count: usize) -> FrozenRes<Vec<u8>> {
365        let chunk = self.core.chunk_size;
366
367        let mut buf = vec![0u8; chunk * count];
368        let base = buf.as_mut_ptr();
369
370        let mut ptrs = Vec::with_capacity(count);
371        for i in 0..count {
372            ptrs.push(unsafe { base.add(i * chunk) });
373        }
374
375        self.core.file.preadv(&ptrs, index)?;
376        Ok(buf)
377    }
378
    /// Blocks until given `epoch` becomes durable
    ///
    /// Durability epochs increase when the background flusher successfully syncs the underlying [`FrozenFile`]
    ///
    /// ## Example
    ///
    /// ```
    /// use frozen_core::fpipe::FrozenPipe;
    /// use frozen_core::ffile::{FrozenFile, FFCfg};
    /// use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
    /// use std::time::Duration;
    ///
    /// let dir = tempfile::tempdir().unwrap();
    /// let path = dir.path().join("tmp_wait");
    ///
    /// let file = FrozenFile::new(FFCfg {
    ///     mid: 0,
    ///     path,
    ///     chunk_size: 0x20,
    ///     initial_chunk_amount: 2,
    /// }).unwrap();
    ///
    /// let pool = BufPool::new(BPCfg {
    ///     mid: 0,
    ///     chunk_size: 0x20,
    ///     backend: BPBackend::Dynamic,
    /// });
    ///
    /// let pipe = FrozenPipe::new(file, pool, Duration::from_micros(10)).unwrap();
    ///
    /// let buf = vec![1u8; 0x20];
    /// let epoch = pipe.write(&buf, 0).unwrap();
    ///
    /// pipe.wait_for_durability(epoch).unwrap();
    /// ```
    pub fn wait_for_durability(&self, epoch: u64) -> FrozenRes<()> {
        // all waiting logic (fast path, sync-error propagation, condvar loop) lives in `internal_wait`
        self.internal_wait(epoch)
    }
417
418    /// Force instant durability for the current batch
419    ///
420    /// This wakes the flusher thread and waits for the durability epoch
421    ///
422    /// ## Example
423    ///
424    /// ```
425    /// use frozen_core::fpipe::FrozenPipe;
426    /// use frozen_core::ffile::{FrozenFile, FFCfg};
427    /// use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
428    /// use std::time::Duration;
429    ///
430    /// let dir = tempfile::tempdir().unwrap();
431    /// let path = dir.path().join("tmp_force");
432    ///
433    /// let file = FrozenFile::new(FFCfg {
434    ///     mid: 0,
435    ///     path,
436    ///     chunk_size: 0x20,
437    ///     initial_chunk_amount: 2,
438    /// }).unwrap();
439    ///
440    /// let pool = BufPool::new(BPCfg {
441    ///     mid: 0,
442    ///     chunk_size: 0x20,
443    ///     backend: BPBackend::Dynamic,
444    /// });
445    ///
446    /// let pipe = FrozenPipe::new(file, pool, Duration::from_micros(10)).unwrap();
447    ///
448    /// let buf = vec![0x0Au8; 0x20];
449    /// let epoch = pipe.write(&buf, 0).unwrap();
450    ///
451    /// pipe.force_durability(epoch).unwrap();
452    /// ```
453    pub fn force_durability(&self, epoch: u64) -> FrozenRes<()> {
454        let guard = self.core.lock.lock().map_err(|e| new_err_raw(FPErr::Lpn, e))?;
455        self.core.cv.notify_one();
456        drop(guard);
457
458        self.internal_wait(epoch)
459    }
460
    /// Grow the underlying [`FrozenFile`] by given `count`
    ///
    /// The pipeline waits until all pending writes are flushed before extending the file
    ///
    /// ## Example
    ///
    /// ```
    /// use frozen_core::fpipe::FrozenPipe;
    /// use frozen_core::ffile::{FrozenFile, FFCfg};
    /// use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
    /// use std::time::Duration;
    ///
    /// let dir = tempfile::tempdir().unwrap();
    /// let path = dir.path().join("tmp_grow");
    ///
    /// let file = FrozenFile::new(FFCfg {
    ///     mid: 0,
    ///     path,
    ///     chunk_size: 0x20,
    ///     initial_chunk_amount: 2,
    /// }).unwrap();
    ///
    /// let pool = BufPool::new(BPCfg {
    ///     mid: 0,
    ///     chunk_size: 0x20,
    ///     backend: BPBackend::Dynamic,
    /// });
    ///
    /// let pipe = FrozenPipe::new(file, pool, Duration::from_micros(10)).unwrap();
    /// pipe.grow(4).unwrap();
    /// ```
    pub fn grow(&self, count: usize) -> FrozenRes<()> {
        loop {
            // NOTE: we must make sure there are no remaining items in the queue left for sync.
            // `force_durability` with the *current* epoch returns immediately once that epoch is
            // reached, so this loop effectively polls (wake flusher -> re-check queue) until the
            // queue is drained
            let epoch = self.core.epoch.load(atomic::Ordering::Acquire);
            self.force_durability(epoch)?;

            // we acquire an exclusive lock to block write, read and sync ops
            let lock = self.core.acquire_exclusive_io_lock()?;

            // NOTE: it is possible that a write could sneak in between the sync and the lock
            // acquisition; if so we must make sure that it has synced before growing

            if self.core.mpscq.is_empty() {
                self.core.file.grow(count)?;
                drop(lock);
                return Ok(());
            }

            drop(lock);
        }
    }
513
    /// Block the caller until the durability epoch reaches `epoch`, or a sync error is observed
    ///
    /// Lost wakeups are prevented because the epoch is re-checked while holding `durable_lock`
    /// before every `wait`; the flusher notifies `durable_cv` while holding the same mutex
    fn internal_wait(&self, epoch: u64) -> FrozenRes<()> {
        // fast path: the epoch is already durable (hinted unlikely — callers usually
        // wait before the background sync has run)
        if hints::unlikely(self.core.epoch.load(atomic::Ordering::Acquire) >= epoch) {
            return Ok(());
        }

        // surface a pending sync failure instead of waiting on an epoch that may never come
        if let Some(sync_err) = self.core.get_sync_error() {
            return Err(sync_err);
        }

        let mut guard = match self.core.durable_lock.lock() {
            Ok(g) => g,
            Err(e) => return Err(new_err_raw(FPErr::Lpn, e)),
        };

        loop {
            if let Some(sync_err) = self.core.get_sync_error() {
                return Err(sync_err);
            }

            // re-check under the lock; also covers spurious condvar wakeups
            if self.core.epoch.load(atomic::Ordering::Acquire) >= epoch {
                return Ok(());
            }

            guard = match self.core.durable_cv.wait(guard) {
                Ok(g) => g,
                Err(e) => return Err(new_err_raw(FPErr::Lpn, e)),
            };
        }
    }
543}
544
impl Drop for FrozenPipe {
    /// Shut down the flusher thread and release resources owned by the pipe
    fn drop(&mut self) {
        // signal shutdown; the flusher checks `closed` after every drain.
        // NOTE(review): the notify happens without holding `core.lock`; a missed
        // notification only delays shutdown by one `flush_duration` because the
        // flusher uses `wait_timeout` — confirm this latency is acceptable
        self.core.closed.store(true, atomic::Ordering::Release);
        self.core.cv.notify_one(); // notify flusher tx to shut

        // join the flusher so no background IO outlives the pipe
        if let Some(handle) = self.tx.take() {
            let _ = handle.join();
        }

        // INFO: we must acquire an exclusive lock, to prevent dropping while sync,
        // growing or any read/write ops
        let _io_lock = self.core.acquire_exclusive_io_lock();

        // free up the boxed error (if any)
        let ptr = self.core.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
        if !ptr.is_null() {
            unsafe {
                // SAFETY: the flusher has been joined above, so nothing else can
                // still hold or publish this pointer
                drop(Box::from_raw(ptr));
            }
        }
    }
}
567
#[derive(Debug)]
struct Core {
    /// chunk size in bytes, taken from the file config
    chunk_size: usize,
    /// wakes the flusher early (forced durability, shutdown)
    cv: sync::Condvar,
    /// staging buffers for pending writes (also provides backpressure)
    pool: bpool::BufPool,
    /// mutex paired with `cv`; the flusher sleeps on it between rounds
    lock: sync::Mutex<()>,
    /// the underlying chunk-based storage file
    file: ffile::FrozenFile,
    /// durability epoch, incremented after each successful sync
    epoch: atomic::AtomicU64,
    /// readers/writers take shared access, the flusher/grow take exclusive
    io_lock: sync::RwLock<()>,
    /// notifies durability waiters after the epoch advances
    durable_cv: sync::Condvar,
    /// set by `Drop` to ask the flusher thread to exit
    closed: atomic::AtomicBool,
    /// mutex paired with `durable_cv`
    durable_lock: sync::Mutex<()>,
    /// maximum time between background flush rounds
    flush_duration: time::Duration,
    /// queue of staged write requests consumed by the flusher
    mpscq: mpscq::MPSCQueue<WriteReq>,
    /// last sync error (boxed); null while the most recent sync succeeded
    error: atomic::AtomicPtr<FrozenErr>,
}
584
585impl Core {
586    fn new(
587        file: ffile::FrozenFile,
588        pool: bpool::BufPool,
589        flush_duration: time::Duration,
590    ) -> FrozenRes<sync::Arc<Self>> {
591        let cfg = file.cfg();
592        let chunk_size = cfg.chunk_size;
593
594        // NOTE: we only set it the module_id once, hence it'll be only set once per instance
595        // and is only used for error logging
596        unsafe { MODULE_ID = cfg.mid };
597
598        Ok(sync::Arc::new(Self {
599            file,
600            pool,
601            chunk_size,
602            flush_duration,
603            cv: sync::Condvar::new(),
604            lock: sync::Mutex::new(()),
605            io_lock: sync::RwLock::new(()),
606            epoch: atomic::AtomicU64::new(0),
607            durable_cv: sync::Condvar::new(),
608            mpscq: mpscq::MPSCQueue::default(),
609            durable_lock: sync::Mutex::new(()),
610            closed: atomic::AtomicBool::new(false),
611            error: atomic::AtomicPtr::new(std::ptr::null_mut()),
612        }))
613    }
614
615    #[inline]
616    fn acquire_io_lock(&self) -> FrozenRes<sync::RwLockReadGuard<'_, ()>> {
617        self.io_lock.read().map_err(|e| new_err_raw(FPErr::Lpn, e))
618    }
619
620    #[inline]
621    fn acquire_exclusive_io_lock(&self) -> FrozenRes<sync::RwLockWriteGuard<'_, ()>> {
622        self.io_lock.write().map_err(|e| new_err_raw(FPErr::Lpn, e))
623    }
624
    /// Return a clone of the stored sync error, if any
    ///
    /// NOTE(review): the pointer is dereferenced after the load; a concurrent
    /// `set_sync_error`/`clear_sync_error` swapping and freeing it in that window
    /// would be a use-after-free — confirm callers are serialized with the flusher,
    /// or move this to `Mutex<Option<FrozenErr>>`
    #[inline]
    fn get_sync_error(&self) -> Option<FrozenErr> {
        let ptr = self.error.load(atomic::Ordering::Acquire);
        if hints::likely(ptr.is_null()) {
            return None;
        }

        Some(unsafe { (*ptr).clone() })
    }

    /// Store `err` as the current sync error, freeing the previous one if present
    #[inline]
    fn set_sync_error(&self, err: FrozenErr) {
        let boxed = Box::into_raw(Box::new(err));
        let old = self.error.swap(boxed, atomic::Ordering::AcqRel);

        // NOTE: we must free the old error, if any, to avoid mem leaks
        if !old.is_null() {
            unsafe {
                // SAFETY: `swap` transferred exclusive ownership of `old` to us
                drop(Box::from_raw(old));
            }
        }
    }

    /// Clear the stored sync error, freeing it if one was present
    #[inline]
    fn clear_sync_error(&self) {
        let old = self.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
        if hints::unlikely(!old.is_null()) {
            unsafe {
                // SAFETY: `swap` transferred exclusive ownership of `old` to us
                drop(Box::from_raw(old));
            }
        }
    }
657
    /// Advance the durability epoch by one
    ///
    /// `Release` pairs with the `Acquire` loads in `write`/`internal_wait`/`grow`,
    /// so an observer of the new epoch also observes the completed sync
    #[inline]
    fn incr_epoch(&self) {
        self.epoch.fetch_add(1, atomic::Ordering::Release);
    }
662
663    fn write_batch(&self, batch: Vec<WriteReq>) -> FrozenRes<(usize, usize)> {
664        let mut max_index = 0usize;
665        let mut min_index = usize::MAX;
666
667        for req in &batch {
668            let slots = req.alloc.slots();
669            match req.chunks {
670                1 => {
671                    self.file.pwrite(slots[0], req.index)?;
672                }
673                _ => {
674                    self.file.pwritev(slots, req.index)?;
675                }
676            }
677
678            min_index = min_index.min(req.index);
679            max_index = max_index.max(req.index + req.chunks);
680        }
681
682        Ok((min_index, max_index))
683    }
684
685    fn spawn_tx(core: sync::Arc<Self>) -> FrozenRes<thread::JoinHandle<()>> {
686        match thread::Builder::new()
687            .name("fpipe-flush-tx".into())
688            .spawn(move || Self::flush_tx(core))
689        {
690            Ok(tx) => Ok(tx),
691            Err(error) => {
692                let mut error = error.to_string().as_bytes().to_vec();
693                error.extend_from_slice(b"Failed to spawn flush thread for FrozenPipe");
694                new_err(FPErr::Hcf, error)
695            }
696        }
697    }
698
    /// Body of the background flusher thread
    ///
    /// Sleeps on `cv` for at most `flush_duration` between rounds, then drains the
    /// request queue, writes the batch under the exclusive IO lock, syncs the file
    /// and advances the durability epoch. Failures are published via
    /// `set_sync_error` (never panics) so waiters can observe them
    fn flush_tx(core: sync::Arc<Self>) {
        // init phase (acquiring locks)
        let mut guard = match core.lock.lock() {
            Ok(g) => g,
            Err(error) => {
                // NOTE(review): message is OS-error-first with no separator — same
                // garbled shape as the old spawn_tx message; consider description-first
                let mut message = error.to_string().as_bytes().to_vec();
                message.extend_from_slice(b"Flush thread died before init could be completed for FrozenPipe");
                let error = FrozenErr::new(
                    unsafe { MODULE_ID },
                    ERRDOMAIN,
                    FPErr::Lpn as u16,
                    FPErr::Lpn.default_message(),
                    message,
                );
                core.set_sync_error(error);
                return;
            }
        };

        // sync loop w/ non-busy waiting
        loop {
            // park until the flush interval elapses or someone notifies `cv`
            // (forced durability / shutdown); spurious wakeups are harmless since
            // an empty drain just loops back here
            guard = match core.cv.wait_timeout(guard, core.flush_duration) {
                Ok((g, _)) => g,
                Err(e) => {
                    core.set_sync_error(new_err_raw(FPErr::Txe, e));
                    return;
                }
            };

            // INFO: we must drop the guard before the syscall, as it is a blocking operation and
            // holding the mutex while the syscall takes place is not a good idea; while we drop
            // the mutex and acquire it again, other threads may acquire it in-between
            drop(guard);

            // NOTE: we must read the close broadcast before acquiring the exclusive lock;
            // done otherwise, we would impose a serious deadlock sort of situation on the flusher tx

            let req_batch = core.mpscq.drain();
            let closing = core.closed.load(atomic::Ordering::Acquire);

            // nothing to flush: exit if closing, otherwise go back to sleep
            if req_batch.is_empty() {
                if closing {
                    return;
                }

                guard = match core.lock.lock() {
                    Ok(g) => g,
                    Err(e) => {
                        core.set_sync_error(new_err_raw(FPErr::Lpn, e));
                        return;
                    }
                };

                continue;
            }

            // INFO: we must acquire an exclusive IO lock for sync, hence no write/read ops are allowed
            // while sync is in progress

            let io_lock = match core.acquire_exclusive_io_lock() {
                Ok(lock) => lock,
                Err(e) => {
                    core.set_sync_error(new_err_raw(FPErr::Lpn, e));
                    return;
                }
            };

            let (_min, _max) = match core.write_batch(req_batch) {
                Ok(res) => res,
                Err(err) => {
                    // publish the write failure and go back to sleep;
                    // the epoch is NOT advanced in this case
                    core.set_sync_error(err);
                    drop(io_lock);

                    guard = match core.lock.lock() {
                        Ok(g) => g,
                        Err(e) => {
                            core.set_sync_error(new_err_raw(FPErr::Lpn, e));
                            return;
                        }
                    };

                    continue;
                }
            };

            // NOTE:
            //
            // - if sync fails, we update the Core::error w/ the received error object
            // - we clear it up when another sync call succeeds
            // - this is valid, as the underlying sync flushes the entire mmaped region, hence
            //   even if the last call failed, and the new one succeeded, we do get the durability
            //   guarantee for the old data as well

            match core.file.sync() {
                Err(err) => core.set_sync_error(err),
                Ok(()) => {
                    core.incr_epoch();
                    // take `durable_lock` before notifying so a waiter sitting between
                    // its epoch check and `wait` cannot miss this notification
                    let _g = match core.durable_lock.lock() {
                        Ok(g) => g,
                        Err(e) => {
                            core.set_sync_error(new_err_raw(FPErr::Lpn, e));
                            return;
                        }
                    };

                    core.durable_cv.notify_all();
                    core.clear_sync_error();
                }
            }

            drop(io_lock);
            guard = match core.lock.lock() {
                Ok(g) => g,
                Err(e) => {
                    core.set_sync_error(new_err_raw(FPErr::Lpn, e));
                    return;
                }
            };
        }
    }
819}
820
// SAFETY: `Core` is only shared behind `Arc`; interior mutability goes through
// `Mutex`/`RwLock`/atomics, and the raw pointer in `error` is only transferred
// via `swap`. NOTE(review): soundness also requires `FrozenFile`, `BufPool` and
// `MPSCQueue` to be safe for cross-thread use — confirm in their modules
unsafe impl Send for Core {}
unsafe impl Sync for Core {}
823
824#[derive(Debug)]
825struct WriteReq {
826    index: usize,
827    chunks: usize,
828    alloc: bpool::Allocation,
829}
830
831impl WriteReq {
832    fn new(index: usize, chunks: usize, alloc: bpool::Allocation) -> Self {
833        Self { alloc, index, chunks }
834    }
835}
836
837#[cfg(test)]
838mod tests {
839    use super::*;
840    use crate::{
841        bpool::{BPBackend, BPCfg, BufPool},
842        error::TEST_MID,
843        ffile::{FFCfg, FrozenFile},
844    };
845    use std::sync::{Arc, Barrier};
846    use std::thread;
847    use std::time::{Duration, Instant};
848
849    const CHUNK: usize = 0x20;
850    const INIT: usize = 0x20;
851    const FLUSH: Duration = Duration::from_micros(10);
852
853    fn new_env() -> (tempfile::TempDir, FrozenPipe) {
854        let dir = tempfile::tempdir().unwrap();
855        let path = dir.path().join("tmp_pipe");
856
857        let file = FrozenFile::new(FFCfg {
858            mid: TEST_MID,
859            path,
860            chunk_size: CHUNK,
861            initial_chunk_amount: INIT,
862        })
863        .unwrap();
864        let pool = BufPool::new(BPCfg {
865            mid: TEST_MID,
866            chunk_size: CHUNK,
867            backend: BPBackend::Prealloc { capacity: 0x100 },
868        });
869
870        let pipe = FrozenPipe::new(file, pool, FLUSH).unwrap();
871
872        (dir, pipe)
873    }
874
875    mod lifecycle {
876        use super::*;
877
878        #[test]
879        fn ok_new() {
880            let (_dir, pipe) = new_env();
881            assert_eq!(pipe.core.epoch.load(atomic::Ordering::Acquire), 0);
882        }
883
884        #[test]
885        fn ok_drop() {
886            let (_dir, pipe) = new_env();
887            drop(pipe);
888        }
889    }
890
891    mod fp_write {
892        use super::*;
893
894        #[test]
895        fn ok_write_and_wait() {
896            let (_dir, pipe) = new_env();
897
898            let buf = vec![0xAB; CHUNK];
899            let epoch = pipe.write(&buf, 0).unwrap();
900            pipe.wait_for_durability(epoch).unwrap();
901        }
902
903        #[test]
904        fn ok_write_multiple_chunks() {
905            let (_dir, pipe) = new_env();
906
907            let buf = vec![0xAA; CHUNK * 4];
908            let epoch = pipe.write(&buf, 0).unwrap();
909            pipe.wait_for_durability(epoch).unwrap();
910        }
911
912        #[test]
913        fn ok_force_durability() {
914            let (_dir, pipe) = new_env();
915
916            let buf = vec![1u8; CHUNK];
917            let epoch = pipe.write(&buf, 0).unwrap();
918            pipe.force_durability(epoch).unwrap();
919        }
920
921        #[test]
922        fn ok_write_epoch_monotonic() {
923            let (_dir, pipe) = new_env();
924            let buf = vec![1u8; CHUNK];
925
926            let e1 = pipe.write(&buf, 0).unwrap();
927            pipe.wait_for_durability(e1).unwrap();
928
929            let e2 = pipe.write(&buf, 1).unwrap();
930            pipe.wait_for_durability(e2).unwrap();
931
932            assert!(e2 >= e1);
933        }
934
935        #[test]
936        fn ok_write_large() {
937            let (_dir, pipe) = new_env();
938            let buf = vec![0xAB; CHUNK * 0x80];
939
940            let epoch = pipe.write(&buf, 0).unwrap();
941            pipe.wait_for_durability(epoch).unwrap();
942        }
943
944        #[test]
945        fn ok_write_large_batch() {
946            let (_dir, pipe) = new_env();
947
948            for i in 0..0x100 {
949                let buf = vec![i as u8; CHUNK];
950                let epoch = pipe.write(&buf, i).unwrap();
951                pipe.wait_for_durability(epoch).unwrap();
952            }
953        }
954
955        #[test]
956        fn ok_write_is_blocked_at_pool_exhaustion_for_prealloc_backend() {
957            let dir = tempfile::tempdir().unwrap();
958            let path = dir.path().join("tmp_pipe");
959
960            let file = FrozenFile::new(FFCfg {
961                mid: TEST_MID,
962                path,
963                chunk_size: CHUNK,
964                initial_chunk_amount: INIT,
965            })
966            .unwrap();
967
968            let pool = BufPool::new(BPCfg {
969                mid: TEST_MID,
970                chunk_size: CHUNK,
971                backend: BPBackend::Prealloc { capacity: 1 },
972            });
973
974            let pipe = Arc::new(FrozenPipe::new(file, pool, FLUSH).unwrap());
975
976            let p2 = pipe.clone();
977            let t = thread::spawn(move || {
978                let buf = vec![1u8; CHUNK];
979                let epoch = p2.write(&buf, 0).unwrap();
980                p2.wait_for_durability(epoch).unwrap();
981            });
982
983            thread::sleep(Duration::from_millis(0x0A));
984
985            let buf = vec![2u8; CHUNK];
986            let epoch = pipe.write(&buf, 1).unwrap();
987            pipe.wait_for_durability(epoch).unwrap();
988
989            t.join().unwrap();
990        }
991    }
992
993    mod fp_read {
994        use super::*;
995
996        #[test]
997        fn ok_read_single_after_write() {
998            let (_dir, pipe) = new_env();
999
1000            let buf = vec![0xAB; CHUNK];
1001            let epoch = pipe.write(&buf, 0).unwrap();
1002            pipe.wait_for_durability(epoch).unwrap();
1003
1004            let read = pipe.read_single(0).unwrap();
1005            assert_eq!(read, buf);
1006        }
1007
1008        #[test]
1009        fn ok_read_2x() {
1010            let (_dir, pipe) = new_env();
1011
1012            let buf = vec![0xAA; CHUNK * 2];
1013            let epoch = pipe.write(&buf, 0).unwrap();
1014            pipe.wait_for_durability(epoch).unwrap();
1015
1016            let read = pipe.read(0, 2).unwrap();
1017            assert_eq!(read, buf);
1018        }
1019
1020        #[test]
1021        fn ok_read_4x() {
1022            let (_dir, pipe) = new_env();
1023
1024            let buf = vec![0xBB; CHUNK * 4];
1025            let epoch = pipe.write(&buf, 0).unwrap();
1026            pipe.wait_for_durability(epoch).unwrap();
1027
1028            let read = pipe.read(0, 4).unwrap();
1029            assert_eq!(read, buf);
1030        }
1031
1032        #[test]
1033        fn ok_read_multi_generic() {
1034            let (_dir, pipe) = new_env();
1035
1036            let buf = vec![0xCC; CHUNK * 6];
1037            let epoch = pipe.write(&buf, 0).unwrap();
1038            pipe.wait_for_durability(epoch).unwrap();
1039
1040            let read = pipe.read(0, 6).unwrap();
1041            assert_eq!(read, buf);
1042        }
1043
1044        #[test]
1045        fn ok_read_multiple_indices() {
1046            let (_dir, pipe) = new_env();
1047
1048            for i in 0..8 {
1049                let buf = vec![i as u8; CHUNK];
1050                let epoch = pipe.write(&buf, i).unwrap();
1051                pipe.wait_for_durability(epoch).unwrap();
1052            }
1053
1054            for i in 0..8 {
1055                let read = pipe.read_single(i).unwrap();
1056                assert_eq!(read, vec![i as u8; CHUNK]);
1057            }
1058        }
1059
1060        #[test]
1061        fn ok_overwrite_same_index() {
1062            let (_dir, pipe) = new_env();
1063
1064            let buf1 = vec![0xAA; CHUNK];
1065            let e1 = pipe.write(&buf1, 0).unwrap();
1066            pipe.wait_for_durability(e1).unwrap();
1067
1068            let buf2 = vec![0xBB; CHUNK];
1069            let e2 = pipe.write(&buf2, 0).unwrap();
1070            pipe.wait_for_durability(e2).unwrap();
1071
1072            let read = pipe.read_single(0).unwrap();
1073            assert_eq!(read, buf2);
1074        }
1075
1076        #[test]
1077        fn ok_large_read_multi() {
1078            let (_dir, pipe) = new_env();
1079
1080            let buf = vec![0x7A; CHUNK * 0x10];
1081            let epoch = pipe.write(&buf, 0).unwrap();
1082            pipe.wait_for_durability(epoch).unwrap();
1083
1084            let read = pipe.read(0, 0x10).unwrap();
1085            assert_eq!(read, buf);
1086        }
1087
1088        #[test]
1089        fn ok_read_concurrent() {
1090            const THREADS: usize = 8;
1091
1092            let (_dir, pipe) = new_env();
1093            let pipe = Arc::new(pipe);
1094
1095            for i in 0..THREADS {
1096                let buf = vec![i as u8; CHUNK];
1097                let epoch = pipe.write(&buf, i).unwrap();
1098                pipe.wait_for_durability(epoch).unwrap();
1099            }
1100
1101            let mut handles = Vec::new();
1102
1103            for i in 0..THREADS {
1104                let pipe = pipe.clone();
1105
1106                handles.push(thread::spawn(move || {
1107                    let read = pipe.read_single(i).unwrap();
1108                    assert_eq!(read, vec![i as u8; CHUNK]);
1109                }));
1110            }
1111
1112            for h in handles {
1113                h.join().unwrap();
1114            }
1115        }
1116
1117        #[test]
1118        fn ok_concurrent_read_write() {
1119            let (_dir, pipe) = new_env();
1120            let pipe = Arc::new(pipe);
1121
1122            let writer = {
1123                let pipe = pipe.clone();
1124                thread::spawn(move || {
1125                    for i in 0..0x40 {
1126                        let buf = vec![i as u8; CHUNK];
1127                        let epoch = pipe.write(&buf, i).unwrap();
1128                        pipe.wait_for_durability(epoch).unwrap();
1129                    }
1130                })
1131            };
1132
1133            let reader = {
1134                let pipe = pipe.clone();
1135                thread::spawn(move || {
1136                    for _ in 0..0x40 {
1137                        let _ = pipe.read_single(0);
1138                    }
1139                })
1140            };
1141
1142            writer.join().unwrap();
1143            reader.join().unwrap();
1144        }
1145
1146        #[test]
1147        fn ok_read_after_grow() {
1148            let (_dir, pipe) = new_env();
1149
1150            pipe.grow(8).unwrap();
1151
1152            let buf = vec![0x5A; CHUNK];
1153            let epoch = pipe.write(&buf, INIT).unwrap();
1154            pipe.wait_for_durability(epoch).unwrap();
1155
1156            let read = pipe.read_single(INIT).unwrap();
1157            assert_eq!(read, buf);
1158        }
1159    }
1160
1161    mod batching {
1162        use super::*;
1163
1164        #[test]
1165        fn ok_multiple_writes_single_batch() {
1166            let (_dir, pipe) = new_env();
1167
1168            let mut epochs = Vec::new();
1169            for i in 0..0x10 {
1170                let buf = vec![i as u8; CHUNK];
1171                epochs.push(pipe.write(&buf, i).unwrap());
1172            }
1173
1174            for e in epochs {
1175                pipe.wait_for_durability(e).unwrap();
1176            }
1177
1178            assert!(pipe.core.epoch.load(atomic::Ordering::Acquire) > 0);
1179        }
1180    }
1181
1182    mod fp_grow {
1183        use super::*;
1184
1185        #[test]
1186        fn ok_grow_file() {
1187            let (_dir, pipe) = new_env();
1188            let curr_len = pipe.core.file.length().unwrap();
1189
1190            pipe.grow(0x10).unwrap();
1191            let new_len = pipe.core.file.length().unwrap();
1192
1193            assert_eq!(new_len, curr_len + (0x10 * pipe.core.chunk_size));
1194        }
1195
1196        #[test]
1197        fn ok_write_after_grow() {
1198            let (_dir, pipe) = new_env();
1199            pipe.grow(0x10).unwrap();
1200
1201            let buf = vec![0xBB; CHUNK];
1202            let epoch = pipe.write(&buf, INIT).unwrap();
1203            pipe.wait_for_durability(epoch).unwrap();
1204        }
1205
1206        #[test]
1207        fn ok_grow_while_writing() {
1208            let (_dir, pipe) = new_env();
1209            let pipe = Arc::new(pipe);
1210            let curr_len = pipe.core.file.length().unwrap();
1211
1212            let p2 = pipe.clone();
1213            let writer = thread::spawn(move || {
1214                for i in 0..INIT {
1215                    let buf = vec![1u8; CHUNK];
1216                    let epoch = p2.write(&buf, i).unwrap();
1217                    p2.wait_for_durability(epoch).unwrap();
1218                }
1219            });
1220
1221            thread::sleep(Duration::from_millis(10));
1222
1223            pipe.grow(0x3A).unwrap();
1224            writer.join().unwrap();
1225
1226            let new_len = pipe.core.file.length().unwrap();
1227            assert_eq!(new_len, curr_len + (0x3A * pipe.core.chunk_size));
1228        }
1229    }
1230
1231    mod concurrency {
1232        use super::*;
1233
1234        #[test]
1235        fn ok_multi_writer() {
1236            const THREADS: usize = 8;
1237            const ITERS: usize = 0x100;
1238
1239            let (_dir, pipe) = new_env();
1240            let pipe = Arc::new(pipe);
1241
1242            let mut handles = Vec::new();
1243            for t in 0..THREADS {
1244                let pipe = pipe.clone();
1245
1246                handles.push(thread::spawn(move || {
1247                    for i in 0..ITERS {
1248                        let buf = vec![t as u8; CHUNK];
1249                        let epoch = pipe.write(&buf, i).unwrap();
1250                        pipe.wait_for_durability(epoch).unwrap();
1251                    }
1252                }));
1253            }
1254
1255            for h in handles {
1256                h.join().unwrap();
1257            }
1258        }
1259
1260        #[test]
1261        fn ok_barrier_start_parallel_writes() {
1262            const THREADS: usize = 8;
1263
1264            let (_dir, pipe) = new_env();
1265            let pipe = Arc::new(pipe);
1266            let barrier = Arc::new(Barrier::new(THREADS));
1267
1268            let mut handles = Vec::new();
1269
1270            for i in 0..THREADS {
1271                let pipe = pipe.clone();
1272                let barrier = barrier.clone();
1273
1274                handles.push(thread::spawn(move || {
1275                    barrier.wait();
1276
1277                    let buf = vec![i as u8; CHUNK];
1278                    let epoch = pipe.write(&buf, i).unwrap();
1279                    pipe.wait_for_durability(epoch).unwrap();
1280                }));
1281            }
1282
1283            for h in handles {
1284                h.join().unwrap();
1285            }
1286        }
1287    }
1288
1289    mod durability_wait {
1290        use super::*;
1291
1292        #[test]
1293        fn ok_wait_blocks_until_flush() {
1294            let (_dir, pipe) = new_env();
1295
1296            let buf = vec![0x55; CHUNK];
1297            let epoch = pipe.write(&buf, 0).unwrap();
1298
1299            let start = Instant::now();
1300            pipe.wait_for_durability(epoch).unwrap();
1301
1302            assert!(start.elapsed() >= Duration::from_micros(1));
1303        }
1304
1305        #[test]
1306        fn ok_force_durability_concurrent() {
1307            let (_dir, pipe) = new_env();
1308            let pipe = Arc::new(pipe);
1309
1310            let mut handles = Vec::new();
1311            for i in 0..0x0A {
1312                let pipe = pipe.clone();
1313
1314                handles.push(thread::spawn(move || {
1315                    let buf = vec![i as u8; CHUNK];
1316                    let epoch = pipe.write(&buf, i).unwrap();
1317                    pipe.force_durability(epoch).unwrap();
1318                }));
1319            }
1320
1321            for h in handles {
1322                h.join().unwrap();
1323            }
1324        }
1325    }
1326
1327    mod shutdown {
1328        use super::*;
1329
1330        #[test]
1331        fn ok_drop_with_pending_writes() {
1332            let (_dir, pipe) = new_env();
1333
1334            let buf = vec![0xAA; CHUNK];
1335            pipe.write(&buf, 0).unwrap();
1336            drop(pipe);
1337        }
1338
1339        #[test]
1340        fn ok_drop_during_activity() {
1341            let (_dir, pipe) = new_env();
1342            let pipe = Arc::new(pipe);
1343
1344            let p2 = pipe.clone();
1345
1346            let handle = thread::spawn(move || {
1347                let buf = vec![1u8; CHUNK];
1348                let epoch = p2.write(&buf, 0).unwrap();
1349                p2.wait_for_durability(epoch).unwrap();
1350            });
1351
1352            thread::sleep(Duration::from_millis(10));
1353            drop(pipe);
1354
1355            handle.join().unwrap();
1356        }
1357
1358        #[test]
1359        fn ok_drop_while_writer_waiting() {
1360            let (_dir, pipe) = new_env();
1361            let pipe = Arc::new(pipe);
1362
1363            let p2 = pipe.clone();
1364            let handle = thread::spawn(move || {
1365                for i in 0..0x80 {
1366                    let buf = vec![1u8; CHUNK];
1367                    let epoch = p2.write(&buf, i).unwrap();
1368                    p2.wait_for_durability(epoch).unwrap();
1369                }
1370            });
1371
1372            thread::sleep(Duration::from_millis(0x0A));
1373            drop(pipe);
1374
1375            handle.join().unwrap();
1376        }
1377    }
1378}