// frozen_core/fpipe.rs
//! A high-throughput asynchronous IO pipeline for chunk-based storage.
//!
//! [`FrozenPipe`] batches write requests and flushes them in the background,
//! providing durability guarantees via epochs.
5//!
6//! ## Features
7//!
8//! - Batched IO
9//! - Background durability
10//! - Backpressure via [`BufPool`]
11//! - Crash-safe durability semantics
12//! - Optimized page reads
13//!
14//! ## Example
15//!
16//! ```
17//! use frozen_core::fpipe::FrozenPipe;
18//! use frozen_core::ffile::{FrozenFile, FFCfg};
19//! use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
20//! use std::time::Duration;
21//!
22//! let dir = tempfile::tempdir().unwrap();
23//! let path = dir.path().join("tmp_pipe");
24//!
25//! let file = FrozenFile::new(FFCfg {
26//!     path,
27//!     mid: 0,
28//!     chunk_size: 0x20,
29//!     initial_chunk_amount: 4,
30//! }).unwrap();
31//!
32//! let pool = BufPool::new(BPCfg {
33//!     mid: 0,
34//!     chunk_size: 0x20,
35//!     backend: BPBackend::Prealloc { capacity: 0x10 },
36//! });
37//!
38//! let pipe = FrozenPipe::new(file, pool, Duration::from_micros(0x3A)).unwrap();
39//!
40//! let buf = vec![1u8; 0x40];
41//! let epoch = pipe.write(&buf, 0).unwrap();
42//!
43//! pipe.wait_for_durability(epoch).unwrap();
44//!
45//! let read = pipe.read(0, 2).unwrap();
46//! assert_eq!(read, buf);
47//! ```
48
49use crate::{
50    bpool,
51    error::{FrozenErr, FrozenRes},
52    ffile, hints, mpscq,
53};
54use std::{
55    sync::{self, atomic},
56    thread, time,
57};
58
/// Module id used for [`FrozenPipe`] error reporting; written once in
/// `Core::new` from the file config's `mid`
///
/// NOTE(review): `static mut` is unsynchronized — this is only benign if pipes
/// are constructed before any concurrent use; consider `AtomicU8`. TODO confirm.
static mut MODULE_ID: u8 = 0;

/// Domain Id for [`FrozenPipe`] is **19**
const ERRDOMAIN: u8 = 0x13;
64
/// Error codes for [`FrozenPipe`]
///
/// Discriminants live in the `0x400` range; the numeric value is what ends up
/// in the emitted error alongside [`ERRDOMAIN`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u16)]
pub enum FPErr {
    /// (1024) unrecoverable internal failure (halt and catch fire)
    Hcf = 0x400,

    /// (1025) thread error or panic inside thread
    Txe = 0x401,

    /// (1026) lock error (failed or poisoned)
    Lpn = 0x402,
}

impl FPErr {
    /// Default human-readable detail bytes attached to every error of this code
    #[inline]
    fn default_message(&self) -> &'static [u8] {
        match self {
            Self::Lpn => b"lock poisoned",
            Self::Hcf => b"halt and catch fire",
            Self::Txe => b"thread failed or panicked",
        }
    }
}
88
89#[inline]
90fn new_err<R>(res: FPErr, message: Vec<u8>) -> FrozenRes<R> {
91    let detail = res.default_message();
92    let err = FrozenErr::new(unsafe { MODULE_ID }, ERRDOMAIN, res as u16, detail, message);
93    Err(err)
94}
95
96#[inline]
97fn new_err_raw<E: std::fmt::Display>(res: FPErr, error: E) -> FrozenErr {
98    let detail = res.default_message();
99    FrozenErr::new(
100        unsafe { MODULE_ID },
101        ERRDOMAIN,
102        res as u16,
103        detail,
104        error.to_string().as_bytes().to_vec(),
105    )
106}
107
/// A high-throughput asynchronous IO pipeline for chunk-based storage; it batches write requests and
/// flushes them in the background, while providing durability guarantees via epochs
///
/// ## Example
///
/// ```
/// use frozen_core::fpipe::FrozenPipe;
/// use frozen_core::ffile::{FrozenFile, FFCfg};
/// use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
/// use std::time::Duration;
///
/// let dir = tempfile::tempdir().unwrap();
/// let path = dir.path().join("tmp_pipe_write");
///
/// let file = FrozenFile::new(FFCfg {
///     mid: 0,
///     path,
///     chunk_size: 0x20,
///     initial_chunk_amount: 2,
/// }).unwrap();
///
/// let pool = BufPool::new(BPCfg {
///     mid: 0,
///     chunk_size: 0x20,
///     backend: BPBackend::Dynamic,
/// });
///
/// let pipe = FrozenPipe::new(file, pool, Duration::from_micros(0x0A)).unwrap();
///
/// let buf = vec![0x3Bu8; 0x40];
/// let epoch = pipe.write(&buf, 0).unwrap();
///
/// pipe.wait_for_durability(epoch).unwrap();
///
/// let read = pipe.read(0, 2).unwrap();
/// assert_eq!(read, buf);
/// ```
#[derive(Debug)]
pub struct FrozenPipe {
    // state shared with the background flusher thread
    core: sync::Arc<Core>,
    // flusher join handle; taken and joined in `Drop`
    tx: Option<thread::JoinHandle<()>>,
}
150
impl FrozenPipe {
    /// Create a new instance of [`FrozenPipe`]
    ///
    /// Spawns the background flusher thread, which drains the request queue and
    /// syncs the file on a `flush_duration` cadence
    pub fn new(file: ffile::FrozenFile, pool: bpool::BufPool, flush_duration: time::Duration) -> FrozenRes<Self> {
        let core = Core::new(file, pool, flush_duration)?;
        let tx = Core::spawn_tx(core.clone())?;

        Ok(Self { core, tx: Some(tx) })
    }

    /// Submit a write request
    ///
    /// Returns the epoch representing the durability window of the write
    ///
    /// ## Working
    ///
    /// The buffer is split into `chunk_size` sized segments and staged using [`BufPool`] before being
    /// written by the background flusher
    ///
    /// ## Example
    ///
    /// ```
    /// use frozen_core::fpipe::FrozenPipe;
    /// use frozen_core::ffile::{FrozenFile, FFCfg};
    /// use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
    /// use std::time::Duration;
    ///
    /// let dir = tempfile::tempdir().unwrap();
    /// let path = dir.path().join("tmp_pipe_write");
    ///
    /// let file = FrozenFile::new(FFCfg {
    ///     mid: 0,
    ///     path,
    ///     chunk_size: 0x20,
    ///     initial_chunk_amount: 2,
    /// }).unwrap();
    ///
    /// let pool = BufPool::new(BPCfg {
    ///     mid: 0,
    ///     chunk_size: 0x20,
    ///     backend: BPBackend::Dynamic,
    /// });
    ///
    /// let pipe = FrozenPipe::new(file, pool, Duration::from_micros(0x0A)).unwrap();
    ///
    /// let buf = vec![0x3Bu8; 0x40];
    /// let epoch = pipe.write(&buf, 0).unwrap();
    ///
    /// pipe.wait_for_durability(epoch).unwrap();
    ///
    /// let read = pipe.read(0, 2).unwrap();
    /// assert_eq!(read, buf);
    /// ```
    #[inline(always)]
    pub fn write(&self, buf: &[u8], index: usize) -> FrozenRes<u64> {
        let chunk_size = self.core.chunk_size;
        // number of chunks needed to hold `buf`, rounding up
        let chunks = buf.len().div_ceil(chunk_size);

        // may block while the pool waits for free chunks (backpressure)
        let alloc = self.core.pool.allocate(chunks)?;

        // NOTE: Read lock prevents torn syncs by ensuring the flusher_tx cannot acquire an exclusive lock until the
        // write op is submitted, while this lock must be acquired after pool allocations as `BufPool::allocate` may
        // block while waiting for chunks, otherwise the wait would delay the flusher from obtaining the lock, and
        // potentially stall the durability progress for the entire `FrozenPipe`
        let _lock = self.core.acquire_io_lock()?;

        // copy `buf` into the staged pool slots, one chunk at a time
        let mut src_off = 0usize;
        for ptr in alloc.slots() {
            if src_off >= buf.len() {
                break;
            }

            let remaining = buf.len() - src_off;
            let copy = remaining.min(chunk_size);

            // SAFETY: `src_off + copy <= buf.len()` by construction above, and `*ptr` is
            // presumed to be a pool slot of `chunk_size` bytes with `copy <= chunk_size`
            unsafe { std::ptr::copy_nonoverlapping(buf.as_ptr().add(src_off), *ptr, copy) };
            src_off += copy;
        }

        let req = WriteReq::new(index, chunks, alloc);
        self.core.mpscq.push(req);

        // the request lands in a future batch, so it becomes durable at the *next*
        // epoch; the epoch cannot advance here because the flusher needs the
        // exclusive IO lock to sync and we still hold the read side
        Ok(self.core.epoch.load(atomic::Ordering::Acquire) + 1)
    }

    /// Read a single chunk from the given `index`
    ///
    /// This function performs a blocking read operation
    ///
    ///
    /// ## Example
    ///
    /// ```
    /// use frozen_core::fpipe::FrozenPipe;
    /// use frozen_core::ffile::{FrozenFile, FFCfg};
    /// use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
    /// use std::time::Duration;
    ///
    /// let dir = tempfile::tempdir().unwrap();
    /// let path = dir.path().join("tmp_read_single");
    ///
    /// let file = FrozenFile::new(FFCfg {
    ///     path,
    ///     mid: 0,
    ///     chunk_size: 0x20,
    ///     initial_chunk_amount: 2,
    /// }).unwrap();
    ///
    /// let pool = BufPool::new(BPCfg {
    ///     mid: 0,
    ///     chunk_size: 0x20,
    ///     backend: BPBackend::Dynamic,
    /// });
    ///
    /// let pipe = FrozenPipe::new(file, pool, Duration::from_micros(10)).unwrap();
    ///
    /// let data = vec![0xAAu8; 0x20];
    /// let epoch = pipe.write(&data, 0).unwrap();
    /// pipe.wait_for_durability(epoch).unwrap();
    ///
    /// let read = pipe.read_single(0).unwrap();
    /// assert_eq!(read, data);
    /// ```
    #[inline(always)]
    pub fn read_single(&self, index: usize) -> FrozenRes<Vec<u8>> {
        let mut slice = vec![0u8; self.core.chunk_size];
        self.core.file.pread(slice.as_mut_ptr(), index)?;

        Ok(slice)
    }

    /// Read `count` chunks starting from at the given `index`
    ///
    /// This function performs a blocking read operation
    ///
    /// ## Example
    ///
    /// ```
    /// use frozen_core::fpipe::FrozenPipe;
    /// use frozen_core::ffile::{FrozenFile, FFCfg};
    /// use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
    /// use std::time::Duration;
    ///
    /// let dir = tempfile::tempdir().unwrap();
    /// let path = dir.path().join("tmp_read_multi");
    ///
    /// let file = FrozenFile::new(FFCfg {
    ///     path,
    ///     mid: 0,
    ///     chunk_size: 0x20,
    ///     initial_chunk_amount: 8,
    /// }).unwrap();
    ///
    /// let pool = BufPool::new(BPCfg {
    ///     mid: 0,
    ///     chunk_size: 0x20,
    ///     backend: BPBackend::Dynamic,
    /// });
    ///
    /// let pipe = FrozenPipe::new(file, pool, Duration::from_micros(10)).unwrap();
    ///
    /// let buf = vec![0xBBu8; 0x20 * 2];
    /// let epoch = pipe.write(&buf, 0).unwrap();
    /// pipe.wait_for_durability(epoch).unwrap();
    ///
    /// let read = pipe.read(0, 2).unwrap();
    /// assert_eq!(read, buf);
    /// ```
    #[inline(always)]
    pub fn read(&self, index: usize, count: usize) -> FrozenRes<Vec<u8>> {
        // fixed-arity paths use a stack array for the iovec pointers; the
        // generic path heap-allocates one
        match count {
            2 => self.read_2x(index),
            4 => self.read_4x(index),
            _ => self.read_multi(index, count),
        }
    }

    // Vectored read of exactly two chunks into one contiguous buffer
    #[inline(always)]
    fn read_2x(&self, index: usize) -> FrozenRes<Vec<u8>> {
        let chunk = self.core.chunk_size;

        let mut buf = vec![0u8; chunk * 2];
        let base = buf.as_mut_ptr();

        // SAFETY: `buf` holds `chunk * 2` bytes, so `base + chunk` is in-bounds
        let ptrs = [base, unsafe { base.add(chunk) }];
        self.core.file.preadv(&ptrs, index)?;

        Ok(buf)
    }

    // Vectored read of exactly four chunks into one contiguous buffer
    #[inline(always)]
    fn read_4x(&self, index: usize) -> FrozenRes<Vec<u8>> {
        let chunk = self.core.chunk_size;

        let mut buf = vec![0u8; chunk * 4];
        let base = buf.as_mut_ptr();

        // SAFETY: `buf` holds `chunk * 4` bytes, so every offset up to
        // `chunk * 3` is in-bounds
        let ptrs = [
            base,
            unsafe { base.add(chunk) },
            unsafe { base.add(chunk * 2) },
            unsafe { base.add(chunk * 3) },
        ];
        self.core.file.preadv(&ptrs, index)?;

        Ok(buf)
    }

    // Generic vectored read of `count` chunks into one contiguous buffer
    #[inline(always)]
    fn read_multi(&self, index: usize, count: usize) -> FrozenRes<Vec<u8>> {
        let chunk = self.core.chunk_size;

        let mut buf = vec![0u8; chunk * count];
        let base = buf.as_mut_ptr();

        let mut ptrs = Vec::with_capacity(count);
        for i in 0..count {
            // SAFETY: `buf` holds `chunk * count` bytes and `i < count`, so
            // `base + i * chunk` is in-bounds
            ptrs.push(unsafe { base.add(i * chunk) });
        }

        self.core.file.preadv(&ptrs, index)?;
        Ok(buf)
    }

    /// Blocks until given `epoch` becomes durable
    ///
    /// Durability epochs increase when the background flusher successfully syncs the underlying [`FrozenFile`]
    ///
    /// ## Example
    ///
    /// ```
    /// use frozen_core::fpipe::FrozenPipe;
    /// use frozen_core::ffile::{FrozenFile, FFCfg};
    /// use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
    /// use std::time::Duration;
    ///
    /// let dir = tempfile::tempdir().unwrap();
    /// let path = dir.path().join("tmp_wait");
    ///
    /// let file = FrozenFile::new(FFCfg {
    ///     mid: 0,
    ///     path,
    ///     chunk_size: 0x20,
    ///     initial_chunk_amount: 2,
    /// }).unwrap();
    ///
    /// let pool = BufPool::new(BPCfg {
    ///     mid: 0,
    ///     chunk_size: 0x20,
    ///     backend: BPBackend::Dynamic,
    /// });
    ///
    /// let pipe = FrozenPipe::new(file, pool, Duration::from_micros(10)).unwrap();
    ///
    /// let buf = vec![1u8; 0x20];
    /// let epoch = pipe.write(&buf, 0).unwrap();
    ///
    /// pipe.wait_for_durability(epoch).unwrap();
    /// ```
    pub fn wait_for_durability(&self, epoch: u64) -> FrozenRes<()> {
        self.internal_wait(epoch)
    }

    /// Force instant durability for the current batch
    ///
    /// This wakes the flusher thread and waits for the durability epoch
    ///
    /// ## Example
    ///
    /// ```
    /// use frozen_core::fpipe::FrozenPipe;
    /// use frozen_core::ffile::{FrozenFile, FFCfg};
    /// use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
    /// use std::time::Duration;
    ///
    /// let dir = tempfile::tempdir().unwrap();
    /// let path = dir.path().join("tmp_force");
    ///
    /// let file = FrozenFile::new(FFCfg {
    ///     mid: 0,
    ///     path,
    ///     chunk_size: 0x20,
    ///     initial_chunk_amount: 2,
    /// }).unwrap();
    ///
    /// let pool = BufPool::new(BPCfg {
    ///     mid: 0,
    ///     chunk_size: 0x20,
    ///     backend: BPBackend::Dynamic,
    /// });
    ///
    /// let pipe = FrozenPipe::new(file, pool, Duration::from_micros(10)).unwrap();
    ///
    /// let buf = vec![0x0Au8; 0x20];
    /// let epoch = pipe.write(&buf, 0).unwrap();
    ///
    /// pipe.force_durability(epoch).unwrap();
    /// ```
    pub fn force_durability(&self, epoch: u64) -> FrozenRes<()> {
        // notify while holding `lock` so the flusher is either parked on `cv`
        // (and wakes now) or about to re-check the queue anyway
        let guard = self.core.lock.lock().map_err(|e| new_err_raw(FPErr::Lpn, e))?;
        self.core.cv.notify_one();
        drop(guard);

        self.internal_wait(epoch)
    }

    /// Grow the underlying [`FrozenFile`] by given `count`
    ///
    /// The pipeline waits until all pending writes are flushed before extending the file
    ///
    /// ## Example
    ///
    /// ```
    /// use frozen_core::fpipe::FrozenPipe;
    /// use frozen_core::ffile::{FrozenFile, FFCfg};
    /// use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
    /// use std::time::Duration;
    ///
    /// let dir = tempfile::tempdir().unwrap();
    /// let path = dir.path().join("tmp_grow");
    ///
    /// let file = FrozenFile::new(FFCfg {
    ///     mid: 0,
    ///     path,
    ///     chunk_size: 0x20,
    ///     initial_chunk_amount: 2,
    /// }).unwrap();
    ///
    /// let pool = BufPool::new(BPCfg {
    ///     mid: 0,
    ///     chunk_size: 0x20,
    ///     backend: BPBackend::Dynamic,
    /// });
    ///
    /// let pipe = FrozenPipe::new(file, pool, Duration::from_micros(10)).unwrap();
    /// pipe.grow(4).unwrap();
    /// ```
    pub fn grow(&self, count: usize) -> FrozenRes<()> {
        loop {
            // NOTE: we must make sure there are no remaining items in the queue left for sync
            let epoch = self.core.epoch.load(atomic::Ordering::Acquire);
            self.force_durability(epoch)?;

            // we acquire an exclusive lock to block write, read and sync ops
            let lock = self.core.acquire_exclusive_io_lock()?;

            // NOTE: it is possible that a write could sneak in between the sync and lock acquire, if so we must
            // make sure that it has synced — hence the retry loop

            if self.core.mpscq.is_empty() {
                self.core.file.grow(count)?;
                drop(lock);
                return Ok(());
            }

            drop(lock);
        }
    }

    // Shared wait loop for durability epochs; surfaces any recorded sync error
    fn internal_wait(&self, epoch: u64) -> FrozenRes<()> {
        // fast path: already durable
        if hints::unlikely(self.core.epoch.load(atomic::Ordering::Acquire) >= epoch) {
            return Ok(());
        }

        if let Some(sync_err) = self.core.get_sync_error() {
            return Err(sync_err);
        }

        let mut guard = match self.core.durable_lock.lock() {
            Ok(g) => g,
            Err(e) => return Err(new_err_raw(FPErr::Lpn, e)),
        };

        // re-check inside the loop: condvar waits may wake spuriously
        loop {
            if let Some(sync_err) = self.core.get_sync_error() {
                return Err(sync_err);
            }

            if self.core.epoch.load(atomic::Ordering::Acquire) >= epoch {
                return Ok(());
            }

            guard = match self.core.durable_cv.wait(guard) {
                Ok(g) => g,
                Err(e) => return Err(new_err_raw(FPErr::Lpn, e)),
            };
        }
    }
}
539
impl Drop for FrozenPipe {
    fn drop(&mut self) {
        // signal shutdown first, then wake the flusher so it observes `closed`
        self.core.closed.store(true, atomic::Ordering::Release);
        self.core.cv.notify_one(); // notify flusher tx to shut

        if let Some(handle) = self.tx.take() {
            let _ = handle.join();
        }

        // INFO: we must acquire an exclusive lock, to prevent dropping while sync,
        // growing or any read/write ops
        let _io_lock = self.core.acquire_exclusive_io_lock();

        // free up the boxed error (if any)
        let ptr = self.core.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
        if !ptr.is_null() {
            // SAFETY: non-null values in `error` come from `Box::into_raw` in
            // `set_sync_error`; the swap above made us the sole owner
            unsafe {
                drop(Box::from_raw(ptr));
            }
        }
    }
}
562
/// Shared state between [`FrozenPipe`] handles and the background flusher thread
#[derive(Debug)]
struct Core {
    // chunk size in bytes, taken from the file config
    chunk_size: usize,
    // wakes the flusher early (forced durability, shutdown)
    cv: sync::Condvar,
    // staging buffers for queued writes
    pool: bpool::BufPool,
    // mutex paired with `cv` for the flusher's timed wait
    lock: sync::Mutex<()>,
    // underlying chunked storage
    file: ffile::FrozenFile,
    // durability epoch, incremented after each successful sync
    epoch: atomic::AtomicU64,
    // read side held by write ops; write side by sync/grow/drop
    io_lock: sync::RwLock<()>,
    // signals epoch advancement to waiters
    durable_cv: sync::Condvar,
    // set in `Drop` to tell the flusher to exit
    closed: atomic::AtomicBool,
    // mutex paired with `durable_cv`
    durable_lock: sync::Mutex<()>,
    // cadence of the flusher's timed wait
    flush_duration: time::Duration,
    // multi-producer single-consumer queue of staged writes
    mpscq: mpscq::MPSCQueue<WriteReq>,
    // last sync error as a boxed raw pointer; null means "no error"
    error: atomic::AtomicPtr<FrozenErr>,
}
579
impl Core {
    /// Build the shared pipeline state; also records the module id used for errors
    fn new(
        file: ffile::FrozenFile,
        pool: bpool::BufPool,
        flush_duration: time::Duration,
    ) -> FrozenRes<sync::Arc<Self>> {
        let cfg = file.cfg();
        let chunk_size = cfg.chunk_size;

        // NOTE: we only set the module_id once, hence it'll be only set once per instance
        // and is only used for error logging
        unsafe { MODULE_ID = cfg.mid };

        Ok(sync::Arc::new(Self {
            file,
            pool,
            chunk_size,
            flush_duration,
            cv: sync::Condvar::new(),
            lock: sync::Mutex::new(()),
            io_lock: sync::RwLock::new(()),
            epoch: atomic::AtomicU64::new(0),
            durable_cv: sync::Condvar::new(),
            mpscq: mpscq::MPSCQueue::default(),
            durable_lock: sync::Mutex::new(()),
            closed: atomic::AtomicBool::new(false),
            error: atomic::AtomicPtr::new(std::ptr::null_mut()),
        }))
    }

    // Shared IO lock: held by write ops to keep the flusher from syncing mid-op
    #[inline]
    fn acquire_io_lock(&self) -> FrozenRes<sync::RwLockReadGuard<'_, ()>> {
        self.io_lock.read().map_err(|e| new_err_raw(FPErr::Lpn, e))
    }

    // Exclusive IO lock: held by the flusher, `grow` and `drop` to exclude all IO
    #[inline]
    fn acquire_exclusive_io_lock(&self) -> FrozenRes<sync::RwLockWriteGuard<'_, ()>> {
        self.io_lock.write().map_err(|e| new_err_raw(FPErr::Lpn, e))
    }

    // Clone out the currently recorded sync error, if any
    #[inline]
    fn get_sync_error(&self) -> Option<FrozenErr> {
        let ptr = self.error.load(atomic::Ordering::Acquire);
        if hints::likely(ptr.is_null()) {
            return None;
        }

        // SAFETY: non-null values stored in `error` always come from `Box::into_raw`
        // in `set_sync_error`
        //
        // NOTE(review): `set_sync_error`/`clear_sync_error` free the old box right
        // after swapping it out; if that runs between the load above and this clone,
        // `ptr` dangles — presumably call timing makes this unreachable, but it is
        // worth confirming
        Some(unsafe { (*ptr).clone() })
    }

    // Record a sync error, replacing (and freeing) any previously stored one
    #[inline]
    fn set_sync_error(&self, err: FrozenErr) {
        let boxed = Box::into_raw(Box::new(err));
        let old = self.error.swap(boxed, atomic::Ordering::AcqRel);

        // NOTE: we must free the old error, if any, to avoid mem leaks
        if !old.is_null() {
            // SAFETY: `old` came from `Box::into_raw` and the swap made us its sole owner
            unsafe {
                drop(Box::from_raw(old));
            }
        }
    }

    // Drop any recorded sync error (called after a later sync succeeds)
    #[inline]
    fn clear_sync_error(&self) {
        let old = self.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
        if hints::unlikely(!old.is_null()) {
            // SAFETY: `old` came from `Box::into_raw` and the swap made us its sole owner
            unsafe {
                drop(Box::from_raw(old));
            }
        }
    }

    // Advance the durability epoch by one (after a successful sync)
    #[inline]
    fn incr_epoch(&self) {
        self.epoch.fetch_add(1, atomic::Ordering::Release);
    }

    // Write every request in `batch` to the file; returns the (min, max) chunk
    // index range touched by the batch
    fn write_batch(&self, batch: Vec<WriteReq>) -> FrozenRes<(usize, usize)> {
        let mut max_index = 0usize;
        let mut min_index = usize::MAX;

        for req in &batch {
            let slots = req.alloc.slots();
            match req.chunks {
                // single chunk: plain positional write
                1 => {
                    self.file.pwrite(slots[0], req.index)?;
                }
                // multiple chunks: vectored write
                _ => {
                    self.file.pwritev(slots, req.index)?;
                }
            }

            min_index = min_index.min(req.index);
            max_index = max_index.max(req.index + req.chunks);
        }

        Ok((min_index, max_index))
    }

    // Spawn the background flusher thread running `flush_tx`
    fn spawn_tx(core: sync::Arc<Self>) -> FrozenRes<thread::JoinHandle<()>> {
        match thread::Builder::new()
            .name("fpipe-flush-tx".into())
            .spawn(move || Self::flush_tx(core))
        {
            Ok(tx) => Ok(tx),
            Err(error) => {
                let mut error = error.to_string().as_bytes().to_vec();
                error.extend_from_slice(b"Failed to spawn flush thread for FrozenPipe");
                new_err(FPErr::Hcf, error)
            }
        }
    }

    // Background flusher loop: drain the queue, write the batch under the
    // exclusive IO lock, sync, then advance the epoch and notify waiters.
    // Errors are published via `set_sync_error`; lock poisoning ends the loop.
    fn flush_tx(core: sync::Arc<Self>) {
        // init phase (acquiring locks)
        let mut guard = match core.lock.lock() {
            Ok(g) => g,
            Err(error) => {
                let mut message = error.to_string().as_bytes().to_vec();
                message.extend_from_slice(b"Flush thread died before init could be completed for FrozenPipe");
                let error = FrozenErr::new(
                    unsafe { MODULE_ID },
                    ERRDOMAIN,
                    FPErr::Lpn as u16,
                    FPErr::Lpn.default_message(),
                    message,
                );
                core.set_sync_error(error);
                return;
            }
        };

        // sync loop w/ non-busy waiting
        loop {
            // park until the next flush tick, or until woken for forced
            // durability / shutdown
            guard = match core.cv.wait_timeout(guard, core.flush_duration) {
                Ok((g, _)) => g,
                Err(e) => {
                    core.set_sync_error(new_err_raw(FPErr::Txe, e));
                    return;
                }
            };

            // INFO: we must drop the guard before the syscall, as it's a blocking operation and holding
            // the mutex while the syscall takes place is not a good idea; while we drop the mutex
            // and acquire it again, in-between other threads could acquire it and use it
            drop(guard);

            // NOTE: we must read the close broadcast before acquiring the exclusive lock,
            // if done otherwise, we impose a serious deadlock sort of situation for the flusher tx

            let req_batch = core.mpscq.drain();
            let closing = core.closed.load(atomic::Ordering::Acquire);

            if req_batch.is_empty() {
                if closing {
                    return;
                }

                // nothing to flush; re-arm the timed wait
                guard = match core.lock.lock() {
                    Ok(g) => g,
                    Err(e) => {
                        core.set_sync_error(new_err_raw(FPErr::Lpn, e));
                        return;
                    }
                };

                continue;
            }

            // INFO: we must acquire an exclusive IO lock for sync, hence no write/read ops are allowed
            // while sync is in progress

            let io_lock = match core.acquire_exclusive_io_lock() {
                Ok(lock) => lock,
                Err(e) => {
                    core.set_sync_error(new_err_raw(FPErr::Lpn, e));
                    return;
                }
            };

            let (_min, _max) = match core.write_batch(req_batch) {
                Ok(res) => res,
                Err(err) => {
                    // publish the error and keep the loop alive; waiters will
                    // observe it via `get_sync_error`
                    core.set_sync_error(err);
                    drop(io_lock);

                    guard = match core.lock.lock() {
                        Ok(g) => g,
                        Err(e) => {
                            core.set_sync_error(new_err_raw(FPErr::Lpn, e));
                            return;
                        }
                    };

                    continue;
                }
            };

            // NOTE:
            //
            // - if sync fails, we update the Core::error w/ the received error object
            // - we clear it up when another sync call succeeds
            // - this is valid, as the underlying sync flushes the entire mmapped region, hence
            //   even if the last call failed, and the new one succeeded, we do get the durability
            //   guarantee for the old data as well

            match core.file.sync() {
                Err(err) => core.set_sync_error(err),
                Ok(()) => {
                    core.incr_epoch();
                    // take `durable_lock` so the notify cannot race a waiter that
                    // checked the epoch but has not yet parked on `durable_cv`
                    let _g = match core.durable_lock.lock() {
                        Ok(g) => g,
                        Err(e) => {
                            core.set_sync_error(new_err_raw(FPErr::Lpn, e));
                            return;
                        }
                    };

                    core.durable_cv.notify_all();
                    core.clear_sync_error();
                }
            }

            drop(io_lock);
            guard = match core.lock.lock() {
                Ok(g) => g,
                Err(e) => {
                    core.set_sync_error(new_err_raw(FPErr::Lpn, e));
                    return;
                }
            };
        }
    }
}
815
// SAFETY(review): `Core`'s raw-pointer state (`error: AtomicPtr<FrozenErr>`) is
// only touched via atomic load/swap; this impl additionally assumes `file`,
// `pool` and `mpscq` are safe to move across threads — TODO confirm against
// their definitions (they likely hold raw pointers, which is why these impls
// are needed at all)
unsafe impl Send for Core {}
// SAFETY(review): shared access is mediated by `lock`/`io_lock`/`durable_lock`,
// condvars and atomics; same assumption as above for `file`/`pool`/`mpscq`
unsafe impl Sync for Core {}
818
/// A single staged write: `chunks` pool-backed buffers destined for chunk `index`
#[derive(Debug)]
struct WriteReq {
    // destination chunk index in the file
    index: usize,
    // number of chunks covered by this request
    chunks: usize,
    // staged buffers; presumably returned to the pool on drop — TODO confirm in `bpool`
    alloc: bpool::Allocation,
}
825
826impl WriteReq {
827    fn new(index: usize, chunks: usize, alloc: bpool::Allocation) -> Self {
828        Self { alloc, index, chunks }
829    }
830}
831
832#[cfg(test)]
833mod tests {
834    use super::*;
835    use crate::{
836        bpool::{BPBackend, BPCfg, BufPool},
837        error::TEST_MID,
838        ffile::{FFCfg, FrozenFile},
839    };
840    use std::sync::{Arc, Barrier};
841    use std::thread;
842    use std::time::{Duration, Instant};
843
844    const CHUNK: usize = 0x20;
845    const INIT: usize = 0x20;
846    const FLUSH: Duration = Duration::from_micros(10);
847
848    fn new_env() -> (tempfile::TempDir, FrozenPipe) {
849        let dir = tempfile::tempdir().unwrap();
850        let path = dir.path().join("tmp_pipe");
851
852        let file = FrozenFile::new(FFCfg {
853            mid: TEST_MID,
854            path,
855            chunk_size: CHUNK,
856            initial_chunk_amount: INIT,
857        })
858        .unwrap();
859        let pool = BufPool::new(BPCfg {
860            mid: TEST_MID,
861            chunk_size: CHUNK,
862            backend: BPBackend::Prealloc { capacity: 0x100 },
863        });
864
865        let pipe = FrozenPipe::new(file, pool, FLUSH).unwrap();
866
867        (dir, pipe)
868    }
869
870    mod lifecycle {
871        use super::*;
872
873        #[test]
874        fn ok_new() {
875            let (_dir, pipe) = new_env();
876            assert_eq!(pipe.core.epoch.load(atomic::Ordering::Acquire), 0);
877        }
878
879        #[test]
880        fn ok_drop() {
881            let (_dir, pipe) = new_env();
882            drop(pipe);
883        }
884    }
885
886    mod fp_write {
887        use super::*;
888
889        #[test]
890        fn ok_write_and_wait() {
891            let (_dir, pipe) = new_env();
892
893            let buf = vec![0xAB; CHUNK];
894            let epoch = pipe.write(&buf, 0).unwrap();
895            pipe.wait_for_durability(epoch).unwrap();
896        }
897
898        #[test]
899        fn ok_write_multiple_chunks() {
900            let (_dir, pipe) = new_env();
901
902            let buf = vec![0xAA; CHUNK * 4];
903            let epoch = pipe.write(&buf, 0).unwrap();
904            pipe.wait_for_durability(epoch).unwrap();
905        }
906
907        #[test]
908        fn ok_force_durability() {
909            let (_dir, pipe) = new_env();
910
911            let buf = vec![1u8; CHUNK];
912            let epoch = pipe.write(&buf, 0).unwrap();
913            pipe.force_durability(epoch).unwrap();
914        }
915
916        #[test]
917        fn ok_write_epoch_monotonic() {
918            let (_dir, pipe) = new_env();
919            let buf = vec![1u8; CHUNK];
920
921            let e1 = pipe.write(&buf, 0).unwrap();
922            pipe.wait_for_durability(e1).unwrap();
923
924            let e2 = pipe.write(&buf, 1).unwrap();
925            pipe.wait_for_durability(e2).unwrap();
926
927            assert!(e2 >= e1);
928        }
929
930        #[test]
931        fn ok_write_large() {
932            let (_dir, pipe) = new_env();
933            let buf = vec![0xAB; CHUNK * 0x80];
934
935            let epoch = pipe.write(&buf, 0).unwrap();
936            pipe.wait_for_durability(epoch).unwrap();
937        }
938
939        #[test]
940        fn ok_write_large_batch() {
941            let (_dir, pipe) = new_env();
942
943            for i in 0..0x100 {
944                let buf = vec![i as u8; CHUNK];
945                let epoch = pipe.write(&buf, i).unwrap();
946                pipe.wait_for_durability(epoch).unwrap();
947            }
948        }
949
950        #[test]
951        fn ok_write_is_blocked_at_pool_exhaustion_for_prealloc_backend() {
952            let dir = tempfile::tempdir().unwrap();
953            let path = dir.path().join("tmp_pipe");
954
955            let file = FrozenFile::new(FFCfg {
956                mid: TEST_MID,
957                path,
958                chunk_size: CHUNK,
959                initial_chunk_amount: INIT,
960            })
961            .unwrap();
962
963            let pool = BufPool::new(BPCfg {
964                mid: TEST_MID,
965                chunk_size: CHUNK,
966                backend: BPBackend::Prealloc { capacity: 1 },
967            });
968
969            let pipe = Arc::new(FrozenPipe::new(file, pool, FLUSH).unwrap());
970
971            let p2 = pipe.clone();
972            let t = thread::spawn(move || {
973                let buf = vec![1u8; CHUNK];
974                let epoch = p2.write(&buf, 0).unwrap();
975                p2.wait_for_durability(epoch).unwrap();
976            });
977
978            thread::sleep(Duration::from_millis(0x0A));
979
980            let buf = vec![2u8; CHUNK];
981            let epoch = pipe.write(&buf, 1).unwrap();
982            pipe.wait_for_durability(epoch).unwrap();
983
984            t.join().unwrap();
985        }
986    }
987
988    mod fp_read {
989        use super::*;
990
991        #[test]
992        fn ok_read_single_after_write() {
993            let (_dir, pipe) = new_env();
994
995            let buf = vec![0xAB; CHUNK];
996            let epoch = pipe.write(&buf, 0).unwrap();
997            pipe.wait_for_durability(epoch).unwrap();
998
999            let read = pipe.read_single(0).unwrap();
1000            assert_eq!(read, buf);
1001        }
1002
1003        #[test]
1004        fn ok_read_2x() {
1005            let (_dir, pipe) = new_env();
1006
1007            let buf = vec![0xAA; CHUNK * 2];
1008            let epoch = pipe.write(&buf, 0).unwrap();
1009            pipe.wait_for_durability(epoch).unwrap();
1010
1011            let read = pipe.read(0, 2).unwrap();
1012            assert_eq!(read, buf);
1013        }
1014
1015        #[test]
1016        fn ok_read_4x() {
1017            let (_dir, pipe) = new_env();
1018
1019            let buf = vec![0xBB; CHUNK * 4];
1020            let epoch = pipe.write(&buf, 0).unwrap();
1021            pipe.wait_for_durability(epoch).unwrap();
1022
1023            let read = pipe.read(0, 4).unwrap();
1024            assert_eq!(read, buf);
1025        }
1026
1027        #[test]
1028        fn ok_read_multi_generic() {
1029            let (_dir, pipe) = new_env();
1030
1031            let buf = vec![0xCC; CHUNK * 6];
1032            let epoch = pipe.write(&buf, 0).unwrap();
1033            pipe.wait_for_durability(epoch).unwrap();
1034
1035            let read = pipe.read(0, 6).unwrap();
1036            assert_eq!(read, buf);
1037        }
1038
1039        #[test]
1040        fn ok_read_multiple_indices() {
1041            let (_dir, pipe) = new_env();
1042
1043            for i in 0..8 {
1044                let buf = vec![i as u8; CHUNK];
1045                let epoch = pipe.write(&buf, i).unwrap();
1046                pipe.wait_for_durability(epoch).unwrap();
1047            }
1048
1049            for i in 0..8 {
1050                let read = pipe.read_single(i).unwrap();
1051                assert_eq!(read, vec![i as u8; CHUNK]);
1052            }
1053        }
1054
1055        #[test]
1056        fn ok_overwrite_same_index() {
1057            let (_dir, pipe) = new_env();
1058
1059            let buf1 = vec![0xAA; CHUNK];
1060            let e1 = pipe.write(&buf1, 0).unwrap();
1061            pipe.wait_for_durability(e1).unwrap();
1062
1063            let buf2 = vec![0xBB; CHUNK];
1064            let e2 = pipe.write(&buf2, 0).unwrap();
1065            pipe.wait_for_durability(e2).unwrap();
1066
1067            let read = pipe.read_single(0).unwrap();
1068            assert_eq!(read, buf2);
1069        }
1070
1071        #[test]
1072        fn ok_large_read_multi() {
1073            let (_dir, pipe) = new_env();
1074
1075            let buf = vec![0x7A; CHUNK * 0x10];
1076            let epoch = pipe.write(&buf, 0).unwrap();
1077            pipe.wait_for_durability(epoch).unwrap();
1078
1079            let read = pipe.read(0, 0x10).unwrap();
1080            assert_eq!(read, buf);
1081        }
1082
1083        #[test]
1084        fn ok_read_concurrent() {
1085            const THREADS: usize = 8;
1086
1087            let (_dir, pipe) = new_env();
1088            let pipe = Arc::new(pipe);
1089
1090            for i in 0..THREADS {
1091                let buf = vec![i as u8; CHUNK];
1092                let epoch = pipe.write(&buf, i).unwrap();
1093                pipe.wait_for_durability(epoch).unwrap();
1094            }
1095
1096            let mut handles = Vec::new();
1097
1098            for i in 0..THREADS {
1099                let pipe = pipe.clone();
1100
1101                handles.push(thread::spawn(move || {
1102                    let read = pipe.read_single(i).unwrap();
1103                    assert_eq!(read, vec![i as u8; CHUNK]);
1104                }));
1105            }
1106
1107            for h in handles {
1108                h.join().unwrap();
1109            }
1110        }
1111
1112        #[test]
1113        fn ok_concurrent_read_write() {
1114            let (_dir, pipe) = new_env();
1115            let pipe = Arc::new(pipe);
1116
1117            let writer = {
1118                let pipe = pipe.clone();
1119                thread::spawn(move || {
1120                    for i in 0..0x40 {
1121                        let buf = vec![i as u8; CHUNK];
1122                        let epoch = pipe.write(&buf, i).unwrap();
1123                        pipe.wait_for_durability(epoch).unwrap();
1124                    }
1125                })
1126            };
1127
1128            let reader = {
1129                let pipe = pipe.clone();
1130                thread::spawn(move || {
1131                    for _ in 0..0x40 {
1132                        let _ = pipe.read_single(0);
1133                    }
1134                })
1135            };
1136
1137            writer.join().unwrap();
1138            reader.join().unwrap();
1139        }
1140
1141        #[test]
1142        fn ok_read_after_grow() {
1143            let (_dir, pipe) = new_env();
1144
1145            pipe.grow(8).unwrap();
1146
1147            let buf = vec![0x5A; CHUNK];
1148            let epoch = pipe.write(&buf, INIT).unwrap();
1149            pipe.wait_for_durability(epoch).unwrap();
1150
1151            let read = pipe.read_single(INIT).unwrap();
1152            assert_eq!(read, buf);
1153        }
1154    }
1155
1156    mod batching {
1157        use super::*;
1158
1159        #[test]
1160        fn ok_multiple_writes_single_batch() {
1161            let (_dir, pipe) = new_env();
1162
1163            let mut epochs = Vec::new();
1164            for i in 0..0x10 {
1165                let buf = vec![i as u8; CHUNK];
1166                epochs.push(pipe.write(&buf, i).unwrap());
1167            }
1168
1169            for e in epochs {
1170                pipe.wait_for_durability(e).unwrap();
1171            }
1172
1173            assert!(pipe.core.epoch.load(atomic::Ordering::Acquire) > 0);
1174        }
1175    }
1176
1177    mod fp_grow {
1178        use super::*;
1179
1180        #[test]
1181        fn ok_grow_file() {
1182            let (_dir, pipe) = new_env();
1183            let curr_len = pipe.core.file.length().unwrap();
1184
1185            pipe.grow(0x10).unwrap();
1186            let new_len = pipe.core.file.length().unwrap();
1187
1188            assert_eq!(new_len, curr_len + (0x10 * pipe.core.chunk_size));
1189        }
1190
1191        #[test]
1192        fn ok_write_after_grow() {
1193            let (_dir, pipe) = new_env();
1194            pipe.grow(0x10).unwrap();
1195
1196            let buf = vec![0xBB; CHUNK];
1197            let epoch = pipe.write(&buf, INIT).unwrap();
1198            pipe.wait_for_durability(epoch).unwrap();
1199        }
1200
1201        #[test]
1202        fn ok_grow_while_writing() {
1203            let (_dir, pipe) = new_env();
1204            let pipe = Arc::new(pipe);
1205            let curr_len = pipe.core.file.length().unwrap();
1206
1207            let p2 = pipe.clone();
1208            let writer = thread::spawn(move || {
1209                for i in 0..INIT {
1210                    let buf = vec![1u8; CHUNK];
1211                    let epoch = p2.write(&buf, i).unwrap();
1212                    p2.wait_for_durability(epoch).unwrap();
1213                }
1214            });
1215
1216            thread::sleep(Duration::from_millis(10));
1217
1218            pipe.grow(0x3A).unwrap();
1219            writer.join().unwrap();
1220
1221            let new_len = pipe.core.file.length().unwrap();
1222            assert_eq!(new_len, curr_len + (0x3A * pipe.core.chunk_size));
1223        }
1224    }
1225
1226    mod concurrency {
1227        use super::*;
1228
1229        #[test]
1230        fn ok_multi_writer() {
1231            const THREADS: usize = 8;
1232            const ITERS: usize = 0x100;
1233
1234            let (_dir, pipe) = new_env();
1235            let pipe = Arc::new(pipe);
1236
1237            let mut handles = Vec::new();
1238            for t in 0..THREADS {
1239                let pipe = pipe.clone();
1240
1241                handles.push(thread::spawn(move || {
1242                    for i in 0..ITERS {
1243                        let buf = vec![t as u8; CHUNK];
1244                        let epoch = pipe.write(&buf, i).unwrap();
1245                        pipe.wait_for_durability(epoch).unwrap();
1246                    }
1247                }));
1248            }
1249
1250            for h in handles {
1251                h.join().unwrap();
1252            }
1253        }
1254
1255        #[test]
1256        fn ok_barrier_start_parallel_writes() {
1257            const THREADS: usize = 8;
1258
1259            let (_dir, pipe) = new_env();
1260            let pipe = Arc::new(pipe);
1261            let barrier = Arc::new(Barrier::new(THREADS));
1262
1263            let mut handles = Vec::new();
1264
1265            for i in 0..THREADS {
1266                let pipe = pipe.clone();
1267                let barrier = barrier.clone();
1268
1269                handles.push(thread::spawn(move || {
1270                    barrier.wait();
1271
1272                    let buf = vec![i as u8; CHUNK];
1273                    let epoch = pipe.write(&buf, i).unwrap();
1274                    pipe.wait_for_durability(epoch).unwrap();
1275                }));
1276            }
1277
1278            for h in handles {
1279                h.join().unwrap();
1280            }
1281        }
1282    }
1283
1284    mod durability_wait {
1285        use super::*;
1286
1287        #[test]
1288        fn ok_wait_blocks_until_flush() {
1289            let (_dir, pipe) = new_env();
1290
1291            let buf = vec![0x55; CHUNK];
1292            let epoch = pipe.write(&buf, 0).unwrap();
1293
1294            let start = Instant::now();
1295            pipe.wait_for_durability(epoch).unwrap();
1296
1297            assert!(start.elapsed() >= Duration::from_micros(1));
1298        }
1299
1300        #[test]
1301        fn ok_force_durability_concurrent() {
1302            let (_dir, pipe) = new_env();
1303            let pipe = Arc::new(pipe);
1304
1305            let mut handles = Vec::new();
1306            for i in 0..0x0A {
1307                let pipe = pipe.clone();
1308
1309                handles.push(thread::spawn(move || {
1310                    let buf = vec![i as u8; CHUNK];
1311                    let epoch = pipe.write(&buf, i).unwrap();
1312                    pipe.force_durability(epoch).unwrap();
1313                }));
1314            }
1315
1316            for h in handles {
1317                h.join().unwrap();
1318            }
1319        }
1320    }
1321
1322    mod shutdown {
1323        use super::*;
1324
1325        #[test]
1326        fn ok_drop_with_pending_writes() {
1327            let (_dir, pipe) = new_env();
1328
1329            let buf = vec![0xAA; CHUNK];
1330            pipe.write(&buf, 0).unwrap();
1331            drop(pipe);
1332        }
1333
1334        #[test]
1335        fn ok_drop_during_activity() {
1336            let (_dir, pipe) = new_env();
1337            let pipe = Arc::new(pipe);
1338
1339            let p2 = pipe.clone();
1340
1341            let handle = thread::spawn(move || {
1342                let buf = vec![1u8; CHUNK];
1343                let epoch = p2.write(&buf, 0).unwrap();
1344                p2.wait_for_durability(epoch).unwrap();
1345            });
1346
1347            thread::sleep(Duration::from_millis(10));
1348            drop(pipe);
1349
1350            handle.join().unwrap();
1351        }
1352
1353        #[test]
1354        fn ok_drop_while_writer_waiting() {
1355            let (_dir, pipe) = new_env();
1356            let pipe = Arc::new(pipe);
1357
1358            let p2 = pipe.clone();
1359            let handle = thread::spawn(move || {
1360                for i in 0..0x80 {
1361                    let buf = vec![1u8; CHUNK];
1362                    let epoch = p2.write(&buf, i).unwrap();
1363                    p2.wait_for_durability(epoch).unwrap();
1364                }
1365            });
1366
1367            thread::sleep(Duration::from_millis(0x0A));
1368            drop(pipe);
1369
1370            handle.join().unwrap();
1371        }
1372    }
1373}