Skip to main content

frozen_core/
wpipe.rs

1//! A low latency asynchronous write pipeline for buffer based storage
2//!
3//! ## Design
4//!
//! By design, every write call is fire-and-forget, i.e. the call returns immediately after
//! pushing the bytes to be written into the MPSC queue.
//!
//! The background thread pulls from the MPSC queue, performs individual pwrite/pwritev calls and
//! issues a common hard sync right after. This provides durability for all the writes submitted
//! within the same [`WritePipeCfg::flush_duration`] batching window.
11//!
12//! ## Benchmarks
13//!
14//! Observed measurements for latency (both single and multi threaded),
15//!
16//! | Metric | 1 Thread (µs) | 4 Threads (µs) |
17//! |:-------|:--------------|:---------------|
18//! | P50    |         0.091 |          0.275 |
19//! | P90    |         0.092 |          0.458 |
20//! | P99    |         0.825 |          0.917 |
21//! | Mean   |         1.185 |          3.857 |
22//!
23//! Environment used for benching,
24//!
25//! * OS: NixOS (WSL2)
26//! * Architecture: x86_64
27//! * Memory: 8 GiB RAM (DDR4)
28//! * Rust: rustc 1.86.0 w/ cargo 1.86.0
29//! * Kernel: Linux 6.6.87.2-microsoft-standard-WSL2
30//! * CPU: Intel® Core™ i5-10300H @ 2.50GHz (4C / 8T)
31//!
32//! ## Example
33//!
34//! ```
35//! use frozen_core::{bufpool, ffile, utils, wpipe};
36//! use std::{ptr, sync, time};
37//!
38//! const MODULE_ID: u8 = 0x00;
39//! const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
40//!
41//! let dir = tempfile::tempdir().expect("tempdir creation should succeed");
42//! let path = dir.path().join("wpipe_example");
43//!
44//! let file_cfg = ffile::FrozenFileCfg {
45//!     path,
46//!     module_id: MODULE_ID,
47//!     initial_available_buffers: 0x400,
48//!     buffer_size: BUFFER_SIZE as usize,
49//! };
50//! let file = sync::Arc::new(
51//!     ffile::FrozenFile::new(file_cfg)
52//!         .expect("file creation should succeed"),
53//! );
54//!
55//! let pool_cfg = bufpool::BufPoolCfg {
56//!     buffer_size: utils::BufferSize::S128,
57//!     max_memory: 0x400 * BUFFER_SIZE as usize,
58//! };
59//! let pool = bufpool::BufPool::new(pool_cfg);
60//!
61//! let pipe_cfg = wpipe::WritePipeCfg {
62//!     module_id: MODULE_ID,
63//!     flush_duration: time::Duration::from_millis(1),
64//! };
65//! let pipe = wpipe::WritePipe::new(pipe_cfg, file)
66//!     .expect("pipe creation should succeed");
67//!
68//! let payload = [0xAAu8; BUFFER_SIZE as usize];
69//!
70//! let mut latest_ticket = None;
71//! for slot_index in 0..3 {
72//!     let allocation = pool.allocate(1);
73//!
74//!     unsafe {
75//!         ptr::copy_nonoverlapping(
76//!             payload.as_ptr(),
77//!             allocation.first(),
78//!             payload.len(),
79//!         );
80//!     }
81//!
82//!     let ticket = pipe
83//!         .write(wpipe::WriteRequest {
84//!             allocation,
85//!             slot_index,
86//!         })
87//!         .expect("write should succeed");
88//!
89//!     latest_ticket = Some(ticket);
90//! }
91//!
92//! let durable_epoch = futures::executor::block_on(
93//!     latest_ticket.expect("ticket should exist"),
94//! )
95//! .expect("writes should become durable");
96//!
97//! assert!(durable_epoch >= 3);
98//! ```
99
100use crate::{ack, bufpool, error::FrozenResult, ffile, mpscq};
101use std::{
102    sync::{self, atomic},
103    thread, time,
104};
105
106/// All the available configurations for [`WritePipe`]
107///
108/// ## Example
109///
110/// ```
111/// use frozen_core::wpipe::WritePipeCfg;
112///
113/// let cfg = WritePipeCfg {
114///     module_id: 2,
115///     flush_duration: std::time::Duration::from_millis(0x0A),
116/// };
117///
118/// assert_ne!(cfg.module_id, 0);
119/// assert_ne!(cfg.flush_duration.as_millis(), 0);
120/// ```
121#[derive(Debug, Clone)]
122pub struct WritePipeCfg {
123    /// Identifier used for error propagation by [`frozen_core::error::FrozenError`]
124    pub module_id: u8,
125
126    /// Time interval used by the background thread to perform hard sync for all the write
127    /// operations submitted in the last durability window
128    pub flush_duration: time::Duration,
129}
130
131/// A low latency asynchronous write pipeline for buffer based storage
132///
133/// ## Design
134///
135/// By design, every write call is fire-and-forget, i.e. the call is immediately returned after
136/// pushing the bytes to be written into the MPSC queue.
137///
138/// The background thread pulls from MPSC queue and performs indivisual pwrite/v calls and a common
139/// hard sync right after. This provides durability for all the writes submitted within the same
140/// [`WritePipeCfg::flush_duration`] batching window.
141///
142/// ## Example
143///
144/// ```
145/// use frozen_core::{bufpool, ffile, utils, wpipe};
146/// use std::{ptr, sync, time};
147///
148/// const MODULE_ID: u8 = 0x00;
149/// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
150///
151/// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
152/// let path = dir.path().join("wpipe_example");
153///
154/// let file_cfg = ffile::FrozenFileCfg {
155///     path,
156///     module_id: MODULE_ID,
157///     initial_available_buffers: 0x400,
158///     buffer_size: BUFFER_SIZE as usize,
159/// };
160/// let file = sync::Arc::new(
161///     ffile::FrozenFile::new(file_cfg)
162///         .expect("file creation should succeed"),
163/// );
164///
165/// let pool_cfg = bufpool::BufPoolCfg {
166///     buffer_size: utils::BufferSize::S128,
167///     max_memory: 0x400 * BUFFER_SIZE as usize,
168/// };
169/// let pool = bufpool::BufPool::new(pool_cfg);
170///
171/// let pipe_cfg = wpipe::WritePipeCfg {
172///     module_id: MODULE_ID,
173///     flush_duration: time::Duration::from_millis(1),
174/// };
175/// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
176///     .expect("pipe creation should succeed");
177///
178/// let payload = [0xAAu8; BUFFER_SIZE as usize];
179///
180/// let mut latest_ticket = None;
181/// for slot_index in 0..3 {
182///     let allocation = pool.allocate(1);
183///
184///     unsafe {
185///         ptr::copy_nonoverlapping(
186///             payload.as_ptr(),
187///             allocation.first(),
188///             payload.len(),
189///         );
190///     }
191///
192///     let ticket = pipe
193///         .write(wpipe::WriteRequest {
194///             allocation,
195///             slot_index,
196///         })
197///         .expect("write should succeed");
198///
199///     latest_ticket = Some(ticket);
200/// }
201///
202/// let durable_epoch = futures::executor::block_on(
203///     latest_ticket.expect("ticket should exist"),
204/// )
205/// .expect("writes should become durable");
206///
207/// assert!(durable_epoch >= 3);
208/// ```
209#[derive(Debug)]
210pub struct WritePipe {
211    core: sync::Arc<Core>,
212    flush_tx_handle: Option<thread::JoinHandle<()>>,
213}
214
215unsafe impl Send for WritePipe {}
216unsafe impl Sync for WritePipe {}
217
218impl WritePipe {
219    /// Create a new instance of [`WritePipe`]
220    ///
221    /// ## Example
222    ///
223    /// ```
224    /// use frozen_core::{bufpool, ffile, utils, wpipe};
225    /// use std::{ptr, sync, time};
226    ///
227    /// const MODULE_ID: u8 = 0x00;
228    /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
229    ///
230    /// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
231    /// let path = dir.path().join("wpipe_example");
232    ///
233    /// let file_cfg = ffile::FrozenFileCfg {
234    ///     path,
235    ///     module_id: MODULE_ID,
236    ///     initial_available_buffers: 0x400,
237    ///     buffer_size: BUFFER_SIZE as usize,
238    /// };
239    /// let file = sync::Arc::new(
240    ///     ffile::FrozenFile::new(file_cfg)
241    ///         .expect("file creation should succeed"),
242    /// );
243    ///
244    /// let pool_cfg = bufpool::BufPoolCfg {
245    ///     buffer_size: utils::BufferSize::S128,
246    ///     max_memory: 0x400 * BUFFER_SIZE as usize,
247    /// };
248    /// let pool = bufpool::BufPool::new(pool_cfg);
249    ///
250    /// let pipe_cfg = wpipe::WritePipeCfg {
251    ///     module_id: MODULE_ID,
252    ///     flush_duration: time::Duration::from_millis(1),
253    /// };
254    /// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
255    ///     .expect("pipe creation should succeed");
256    ///
257    /// let payload = vec![0x0A; BUFFER_SIZE as usize];
258    /// let allocation = pool.allocate(1);
259    ///
260    /// unsafe {ptr::copy_nonoverlapping(
261    ///     payload.as_ptr(),
262    ///     allocation.first(),
263    ///     payload.len()
264    /// )};
265    ///
266    /// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0});
267    ///
268    /// assert!(
269    ///     futures::executor::block_on(ticket.expect("ticket should exist"))
270    ///     .is_ok()
271    /// );
272    /// ```
273    #[inline]
274    pub fn new(cfg: WritePipeCfg, file: sync::Arc<ffile::FrozenFile>) -> FrozenResult<Self> {
275        let core = sync::Arc::new(Core::new(file));
276        let cloned_core = core.clone();
277        let flush_tx_handle = match thread::Builder::new()
278            .name(format!("mod{}_wpipe_flush_tx", cfg.module_id))
279            .spawn(move || bg_flush_thread(cloned_core, cfg.flush_duration))
280        {
281            Ok(handle) => Some(handle),
282            Err(observed_error) => {
283                return Err(err::new_error(cfg.module_id, err::FXE, observed_error));
284            }
285        };
286
287        Ok(Self { core: core, flush_tx_handle })
288    }
289
290    /// Push a write into [`WritePipe`]
291    ///
292    /// Every write call is fire-and-forget for the caller by default, unless the caller choose to
293    /// wait for durability using the manual `await` on [`WriteTicket`].
294    ///
295    /// ## Example
296    ///
297    /// ```
298    /// use frozen_core::{bufpool, ffile, utils, wpipe};
299    /// use std::{ptr, sync, time};
300    ///
301    /// const MODULE_ID: u8 = 0x00;
302    /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
303    ///
304    /// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
305    /// let path = dir.path().join("wpipe_example");
306    ///
307    /// let file_cfg = ffile::FrozenFileCfg {
308    ///     path,
309    ///     module_id: MODULE_ID,
310    ///     initial_available_buffers: 0x400,
311    ///     buffer_size: BUFFER_SIZE as usize,
312    /// };
313    /// let file = sync::Arc::new(
314    ///     ffile::FrozenFile::new(file_cfg)
315    ///         .expect("file creation should succeed"),
316    /// );
317    ///
318    /// let pool_cfg = bufpool::BufPoolCfg {
319    ///     buffer_size: utils::BufferSize::S128,
320    ///     max_memory: 0x400 * BUFFER_SIZE as usize,
321    /// };
322    /// let pool = bufpool::BufPool::new(pool_cfg);
323    ///
324    /// let pipe_cfg = wpipe::WritePipeCfg {
325    ///     module_id: MODULE_ID,
326    ///     flush_duration: time::Duration::from_millis(1),
327    /// };
328    /// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
329    ///     .expect("pipe creation should succeed");
330    ///
331    /// let payload = vec![0x0A; BUFFER_SIZE as usize];
332    /// let allocation = pool.allocate(1);
333    ///
334    /// unsafe {ptr::copy_nonoverlapping(
335    ///     payload.as_ptr(),
336    ///     allocation.first(),
337    ///     payload.len()
338    /// )};
339    ///
340    /// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0});
341    ///
342    /// assert!(
343    ///     futures::executor::block_on(ticket.expect("ticket should exist"))
344    ///     .is_ok()
345    /// );
346    /// ```
347    #[inline]
348    pub fn write(&self, request: WriteRequest) -> FrozenResult<ack::AckTicket> {
349        let _io_lock = self.core.acquire_shared_io_lock();
350        if let Some(frozen_error) = self.core.completion.get_err() {
351            return Err(frozen_error);
352        }
353
354        let epoch = self.core.completion.increment_current_epoch();
355        let internal_req = WriteRequestInternal { request, epoch };
356        self.core.queue.push(internal_req);
357
358        Ok(ack::AckTicket::new(epoch, self.core.completion.clone()))
359    }
360}
361
362impl Drop for WritePipe {
363    fn drop(&mut self) {
364        self.core.closed.store(true, atomic::Ordering::Release);
365        self.core.flush_cv.notify_one();
366
367        if let Some(handle) = self.flush_tx_handle.take() {
368            let _ = handle.join();
369        }
370    }
371}
372
373/// A write operation submitted to [`WritePipe`]
374///
375/// The request contains the buffers to persist along with the destination slot index in the
376/// underlying [`FrozenFile`].
377///
378/// ## Example
379///
380/// ```
381/// use frozen_core::{bufpool, utils, wpipe};
382/// use std::ptr;
383///
384/// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
385///
386/// let pool_cfg = bufpool::BufPoolCfg {
387///     buffer_size: utils::BufferSize::S128,
388///     max_memory: 0x400 * BUFFER_SIZE as usize,
389/// };
390/// let pool = bufpool::BufPool::new(pool_cfg);
391///
392/// let payload = vec![0x0A; BUFFER_SIZE as usize];
393/// let allocation = pool.allocate(1);
394///
395/// unsafe {ptr::copy_nonoverlapping(
396///     payload.as_ptr(),
397///     allocation.first(),
398///     payload.len()
399/// )};
400///
401/// let request = wpipe::WriteRequest {allocation, slot_index: 0};
402/// assert!(request.slot_index >= 0);
403/// ```
404#[derive(Debug)]
405pub struct WriteRequest {
406    /// Buffer allocation containing the pages to be written allocated using [`bufpool::BufPool`]
407    ///
408    /// ## Example
409    ///
410    /// ```
411    /// use frozen_core::{bufpool, utils, wpipe};
412    /// use std::ptr;
413    ///
414    /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
415    ///
416    /// let pool_cfg = bufpool::BufPoolCfg {
417    ///     buffer_size: utils::BufferSize::S128,
418    ///     max_memory: 0x400 * BUFFER_SIZE as usize,
419    /// };
420    /// let pool = bufpool::BufPool::new(pool_cfg);
421    ///
422    /// let payload = vec![0x0A; BUFFER_SIZE as usize];
423    /// let allocation = pool.allocate(1);
424    ///
425    /// unsafe {ptr::copy_nonoverlapping(
426    ///     payload.as_ptr(),
427    ///     allocation.first(),
428    ///     payload.len()
429    /// )};
430    ///
431    /// let request = wpipe::WriteRequest {allocation, slot_index: 0};
432    /// assert!(!request.allocation.first().is_null());
433    /// ```
434    pub allocation: bufpool::BufPoolAllocation,
435
436    /// Destination slot index where the pages of the allocation will be written from
437    ///
438    /// ## Example
439    ///
440    /// ```
441    /// use frozen_core::{bufpool, utils, wpipe};
442    /// use std::ptr;
443    ///
444    /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
445    ///
446    /// let pool_cfg = bufpool::BufPoolCfg {
447    ///     buffer_size: utils::BufferSize::S128,
448    ///     max_memory: 0x400 * BUFFER_SIZE as usize,
449    /// };
450    /// let pool = bufpool::BufPool::new(pool_cfg);
451    ///
452    /// let payload = vec![0x0A; BUFFER_SIZE as usize];
453    /// let allocation = pool.allocate(1);
454    ///
455    /// unsafe {ptr::copy_nonoverlapping(
456    ///     payload.as_ptr(),
457    ///     allocation.first(),
458    ///     payload.len()
459    /// )};
460    ///
461    /// let request = wpipe::WriteRequest {allocation, slot_index: 0};
462    /// assert!(request.slot_index >= 0);
463    /// ```
464    pub slot_index: usize,
465}
466
467/// ## Why we ignore [`std::sync::PoisonError`]?
468///
469/// The mutex used for lock, is solely used as a parking primitive for [`Condvar`] and does not
470/// protect any mutable state. All the pool invariants and accounting are maintained via atomics
471/// and are completely seperated from the mutex.
472///
473/// A poisoned mutex only indicates that another tx panicked while holding the lock, and indicates
474/// an inconsistent state of the protected value. Since no state can be left partially modified
475/// under this lock, there is no possible consistency risk to recover from and propagating the
476/// poison error would only introduce unnecessary failures into the allocation path.
477///
478/// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating
479/// with the recovered guard.
480fn bg_flush_thread(core: sync::Arc<Core>, flush_duration: time::Duration) {
481    let mut guard = core.flush_guard.lock().unwrap_or_else(|e| e.into_inner());
482    loop {
483        (guard, _) =
484            core.flush_cv.wait_timeout(guard, flush_duration).unwrap_or_else(|e| e.into_inner());
485
486        // NOTE: we must read values of close brodcast before acquire exclusive lock, if done
487        // otherwise, we impose serious deadlock sort of situation for the the flusher tx
488
489        let queued_ops = core.queue.drain();
490        let closed = core.closed.load(atomic::Ordering::Acquire);
491
492        if queued_ops.is_empty() {
493            if closed {
494                return;
495            }
496
497            continue;
498        }
499
500        // INFO: we must acquire an exclusive IO lock for sync, hence no write/read ops are allowed
501        // while sync is in progress
502        let _io_lock = core.acquire_exclusive_io_lock();
503
504        let (_min_index, _max_index, max_epoch) = match core.write_queued_ops(queued_ops) {
505            Ok(res) => res,
506            Err(new_error) => {
507                core.completion.set_err(new_error);
508                core.completion.notify_all_listeners();
509                drop(_io_lock);
510
511                continue;
512            }
513        };
514
515        // NOTE: On linux, we can initiate writeback (best-effort only) for a given range
516        #[cfg(target_os = "linux")]
517        if let Err(new_error) = core.file.sync_range(_min_index, _max_index - _min_index) {
518            core.completion.set_err(new_error);
519            core.completion.notify_all_listeners();
520            drop(_io_lock);
521
522            continue;
523        }
524
525        // NOTE: If the sync fails, we update the Core::error w/ the received error object. We
526        // clear it up when another call succeeds.
527        //
528        // This is valid as the underlying sync flushes entire batch all at once, hence even if the
529        // last call failed, and the new one succeeded, we do get the durability guarantee for the
530        // old data as well.
531
532        if let Err(new_error) = core.file.sync() {
533            core.completion.set_err(new_error);
534            drop(_io_lock);
535
536            continue;
537        } else {
538            core.completion.mark_epoch_as_durable(max_epoch);
539            core.completion.del_err();
540        }
541
542        core.completion.notify_all_listeners();
543    }
544}
545
546#[derive(Debug)]
547struct Core {
548    completion: sync::Arc<ack::Completion>,
549    closed: atomic::AtomicBool,
550    file: sync::Arc<ffile::FrozenFile>,
551    flush_cv: sync::Condvar,
552    flush_guard: sync::Mutex<()>,
553    io_lock: sync::RwLock<()>,
554    queue: mpscq::MPSCQueue<WriteRequestInternal>,
555}
556
557impl Core {
558    fn new(file: sync::Arc<ffile::FrozenFile>) -> Self {
559        Self {
560            file,
561            completion: sync::Arc::new(ack::Completion::default()),
562            closed: atomic::AtomicBool::new(false),
563            flush_cv: sync::Condvar::new(),
564            flush_guard: sync::Mutex::new(()),
565            io_lock: sync::RwLock::new(()),
566            queue: mpscq::MPSCQueue::default(),
567        }
568    }
569
570    #[inline]
571    fn acquire_shared_io_lock(&self) -> sync::RwLockReadGuard<'_, ()> {
572        // NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
573        self.io_lock.read().unwrap_or_else(|e| e.into_inner())
574    }
575
576    #[inline]
577    fn acquire_exclusive_io_lock(&self) -> sync::RwLockWriteGuard<'_, ()> {
578        // NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
579        self.io_lock.write().unwrap_or_else(|e| e.into_inner())
580    }
581
582    #[inline(always)]
583    fn write_queued_ops(
584        &self,
585        queued_ops: Vec<WriteRequestInternal>,
586    ) -> FrozenResult<(usize, usize, u64)> {
587        let mut max_epoch = 0;
588        let mut max_index = 0;
589        let mut min_index = usize::MAX;
590
591        for op in queued_ops {
592            let ops_len = op.request.allocation.length();
593            match ops_len {
594                1 => {
595                    self.file.pwrite(op.request.allocation.first(), op.request.slot_index)?;
596                }
597                _ => {
598                    let bufs: Vec<bufpool::BufferPointer> = op.request.allocation.iter().collect();
599                    self.file.pwritev(&bufs, op.request.slot_index)?;
600                }
601            }
602
603            max_epoch = max_epoch.max(op.epoch);
604            min_index = min_index.min(op.request.slot_index);
605            max_index = max_index.max(op.request.slot_index + ops_len);
606        }
607
608        Ok((min_index, max_index, max_epoch))
609    }
610}
611
612#[derive(Debug)]
613struct WriteRequestInternal {
614    epoch: u64,
615    request: WriteRequest,
616}
617
618mod err {
619    use crate::error::{ErrCode, FrozenError};
620
621    /// Domain ID for [`wpipe`] module is `0x02` used while propagating errors
622    const DOMAIN_ID: u8 = 0x02;
623
624    #[inline]
625    pub fn new_error<E: std::fmt::Display>(
626        module_id: u8,
627        code: ErrCode,
628        observed_error: E,
629    ) -> FrozenError {
630        FrozenError::new_raw(module_id, DOMAIN_ID, code, observed_error)
631    }
632
633    pub const FXE: ErrCode = ErrCode::new(0x10, "unable to spawn background flush thread");
634}
635
636#[cfg(test)]
637mod tests {
638    use super::*;
639    use crate::utils::BufferSize;
640
641    const MODULE_ID: u8 = 0x00;
642    const BUFFER_SIZE: BufferSize = BufferSize::S128;
643    const INITIAL_BUFFER_AMOUT: usize = 0x200;
644    const FLUSH_DURATION: time::Duration = time::Duration::from_millis(1);
645
646    fn new_objects<P: AsRef<std::path::Path>>(
647        path: P,
648    ) -> (sync::Arc<ffile::FrozenFile>, bufpool::BufPool, WritePipe) {
649        let file_cfg = ffile::FrozenFileCfg {
650            module_id: MODULE_ID,
651            path: path.as_ref().to_path_buf(),
652            buffer_size: BUFFER_SIZE as usize,
653            initial_available_buffers: INITIAL_BUFFER_AMOUT,
654        };
655        let file = sync::Arc::new(ffile::FrozenFile::new(file_cfg).unwrap());
656
657        let pool_cfg = bufpool::BufPoolCfg {
658            buffer_size: BUFFER_SIZE,
659            max_memory: INITIAL_BUFFER_AMOUT * BUFFER_SIZE as usize,
660        };
661        let pool = bufpool::BufPool::new(pool_cfg);
662
663        let pipe_cfg = WritePipeCfg { module_id: MODULE_ID, flush_duration: FLUSH_DURATION };
664        let pipe = WritePipe::new(pipe_cfg, file.clone()).unwrap();
665
666        (file, pool, pipe)
667    }
668
669    fn prep_write(
670        buf_ptr: *const u8,
671        n: usize,
672        pool: &bufpool::BufPool,
673    ) -> bufpool::BufPoolAllocation {
674        let allocation = pool.allocate(n);
675        for allocated_buf in allocation.iter() {
676            unsafe { std::ptr::copy_nonoverlapping(buf_ptr, allocated_buf, BUFFER_SIZE as usize) };
677        }
678
679        allocation
680    }
681
682    fn compare_with_readback(
683        buf: &[u8],
684        read_index: usize,
685        required: usize,
686        pool: &bufpool::BufPool,
687        file: &ffile::FrozenFile,
688    ) {
689        let read_allocation = pool.allocate(required);
690        let read_bufs: Vec<bufpool::BufferPointer> = read_allocation.iter().collect();
691
692        file.preadv(&read_bufs, read_index).unwrap();
693
694        for read_buf in read_allocation.iter() {
695            let observed = unsafe { std::slice::from_raw_parts(read_buf, BUFFER_SIZE as usize) };
696            assert_eq!(buf, observed);
697        }
698    }
699
700    mod lifecycle {
701        use super::*;
702
703        #[test]
704        fn ok_new() {
705            let dir = tempfile::tempdir().unwrap();
706            let path = dir.path().join("write_single");
707
708            let file_cfg = ffile::FrozenFileCfg {
709                path,
710                module_id: MODULE_ID,
711                buffer_size: BUFFER_SIZE as usize,
712                initial_available_buffers: INITIAL_BUFFER_AMOUT,
713            };
714            let file = sync::Arc::new(ffile::FrozenFile::new(file_cfg).unwrap());
715
716            let pipe_cfg = WritePipeCfg { module_id: MODULE_ID, flush_duration: FLUSH_DURATION };
717            assert!(WritePipe::new(pipe_cfg, file).is_ok());
718        }
719
720        #[test]
721        fn ok_drop() {
722            let dir = tempfile::tempdir().unwrap();
723            let path = dir.path().join("write_single");
724            let (_file, _, pipe) = new_objects(path);
725
726            drop(pipe);
727        }
728    }
729
730    mod shutdown {
731        use super::*;
732
733        #[test]
734        fn ok_drop_before_pending_write_call() {
735            let dir = tempfile::tempdir().unwrap();
736            let path = dir.path().join("write_single");
737            let (_file, pool, pipe) = new_objects(path);
738
739            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];
740
741            let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
742            let request = WriteRequest { allocation, slot_index: 0 };
743
744            assert!(pipe.write(request).is_ok());
745            drop(pipe);
746        }
747
748        #[test]
749        fn ok_drop_waits_for_pending_write_call() {
750            let dir = tempfile::tempdir().unwrap();
751            let path = dir.path().join("write_single");
752
753            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];
754
755            // new + write + drop
756            {
757                let (_file, pool, pipe) = new_objects(path.clone());
758
759                let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
760                let request = WriteRequest { allocation, slot_index: 0 };
761
762                assert!(pipe.write(request).is_ok());
763                drop(pipe);
764            }
765
766            // open + readback
767            {
768                let (file, pool, _) = new_objects(path);
769                compare_with_readback(&BUFFER, 0, 1, &pool, &file);
770            }
771        }
772
773        #[test]
774        fn ok_drop_does_not_deadlock_when_multiple_pending_writes() {
775            let dir = tempfile::tempdir().unwrap();
776            let path = dir.path().join("write_single");
777            let (_file, pool, pipe) = new_objects(path);
778
779            for i in 0..INITIAL_BUFFER_AMOUT {
780                let buffer = vec![i as u8; BUFFER_SIZE as usize];
781                let allocation = prep_write(buffer.as_ptr(), 1, &pool);
782                let request = WriteRequest { allocation, slot_index: 0 };
783
784                assert!(pipe.write(request).is_ok());
785            }
786
787            drop(pipe);
788        }
789
790        #[test]
791        fn ok_drop_correctly_waits_for_pending_write_with_multi_threads() {
792            let dir = tempfile::tempdir().unwrap();
793            let path = dir.path().join("write_single");
794            let (_file, pool, pipe) = new_objects(path);
795
796            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];
797
798            let pipe = sync::Arc::new(pipe);
799            let pipe2 = sync::Arc::clone(&pipe);
800
801            let handle = thread::spawn(move || {
802                let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
803                let request = WriteRequest { allocation, slot_index: 0 };
804
805                assert!(pipe2.write(request).is_ok());
806            });
807
808            drop(pipe);
809            handle.join().unwrap();
810        }
811    }
812
813    mod pipe_writes {
814        use super::*;
815
816        #[test]
817        fn ok_write() {
818            let dir = tempfile::tempdir().unwrap();
819            let path = dir.path().join("write_single");
820            let (_file, pool, pipe) = new_objects(path);
821
822            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];
823            let allocation = prep_write(BUFFER.as_ptr(), 0x0A, &pool);
824
825            let request = WriteRequest { allocation, slot_index: 0 };
826            assert!(pipe.write(request).is_ok());
827        }
828
829        #[test]
830        fn ok_write_epoch_is_monotonic() {
831            let dir = tempfile::tempdir().unwrap();
832            let path = dir.path().join("write_single");
833            let (_file, pool, pipe) = new_objects(path);
834
835            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];
836
837            let allocation1 = prep_write(BUFFER.as_ptr(), 1, &pool);
838            let ticket1 =
839                pipe.write(WriteRequest { allocation: allocation1, slot_index: 0 }).unwrap();
840
841            let allocation2 = prep_write(BUFFER.as_ptr(), 1, &pool);
842            let ticket2 =
843                pipe.write(WriteRequest { allocation: allocation2, slot_index: 1 }).unwrap();
844
845            let allocation3 = prep_write(BUFFER.as_ptr(), 1, &pool);
846            let ticket3 =
847                pipe.write(WriteRequest { allocation: allocation3, slot_index: 2 }).unwrap();
848
849            assert!(ticket3.epoch() > ticket2.epoch());
850            assert!(ticket2.epoch() > ticket1.epoch());
851        }
852    }
853
854    mod write_ticket {
855        use super::*;
856
857        #[test]
858        fn ok_readback_after_write_with_await() {
859            let dir = tempfile::tempdir().unwrap();
860            let path = dir.path().join("write_single");
861            let (file, pool, pipe) = new_objects(path);
862
863            const REQUIRED: usize = 0x0A;
864            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];
865
866            let write_allocation = prep_write(BUFFER.as_ptr(), REQUIRED, &pool);
867            let request = WriteRequest { allocation: write_allocation, slot_index: 0 };
868
869            let ticket = pipe.write(request).unwrap();
870            let ticket_epoch = ticket.epoch();
871
872            let durable_epoch = futures::executor::block_on(ticket).unwrap();
873            assert!(durable_epoch >= ticket_epoch);
874
875            compare_with_readback(&BUFFER, 0, REQUIRED, &pool, &file);
876        }
877
878        #[test]
879        fn ok_readback_after_batch_write() {
880            let dir = tempfile::tempdir().unwrap();
881            let path = dir.path().join("write_single");
882            let (file, pool, pipe) = new_objects(path);
883
884            const BUFFERS: [([u8; BUFFER_SIZE as usize], usize); 5] = [
885                ([0x0Au8; BUFFER_SIZE as usize], 0x1A),
886                ([0x0Bu8; BUFFER_SIZE as usize], 0x1B),
887                ([0x0Cu8; BUFFER_SIZE as usize], 0x1C),
888                ([0x0Du8; BUFFER_SIZE as usize], 0x1D),
889                ([0x0Eu8; BUFFER_SIZE as usize], 0x1E),
890            ];
891
892            let mut slot_index = 0;
893            let mut latest_ticket = None;
894
895            for (buf, required) in BUFFERS {
896                let allocation = prep_write(buf.as_ptr(), required, &pool);
897                let request = WriteRequest { allocation, slot_index };
898                let ticket = pipe.write(request).unwrap();
899
900                slot_index += required;
901                latest_ticket = Some(ticket);
902            }
903
904            assert!(latest_ticket.is_some());
905
906            if let Some(ticket) = latest_ticket {
907                let ticket_epoch = ticket.epoch();
908                let durable_epoch = futures::executor::block_on(ticket).unwrap();
909
910                assert!(durable_epoch >= ticket_epoch);
911            }
912
913            let mut read_index = 0;
914            for (buf, required) in BUFFERS {
915                compare_with_readback(&buf, read_index, required, &pool, &file);
916                read_index += required;
917            }
918        }
919
920        #[test]
921        fn ok_multiple_concurrent_awaits() {
922            let dir = tempfile::tempdir().unwrap();
923            let path = dir.path().join("write_single");
924            let (_file, pool, pipe) = new_objects(path);
925
926            const REQUIRED: usize = 0x0A;
927            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];
928
929            let allocation1 = prep_write(BUFFER.as_ptr(), REQUIRED, &pool);
930            let ticket1 =
931                pipe.write(WriteRequest { allocation: allocation1, slot_index: 0 }).unwrap();
932
933            let allocation2 = prep_write(BUFFER.as_ptr(), REQUIRED, &pool);
934            let ticket2 =
935                pipe.write(WriteRequest { allocation: allocation2, slot_index: 0 }).unwrap();
936
937            let (e1, e2) = futures::executor::block_on(async { futures::join!(ticket1, ticket2) });
938
939            assert!(e1.is_ok());
940            assert!(e2.is_ok());
941            assert!(e2.unwrap() > e1.unwrap());
942        }
943
944        #[test]
945        fn ok_awaiting_last_ticket_implies_previous_writes_are_durable() {
946            let dir = tempfile::tempdir().unwrap();
947            let path = dir.path().join("durability_boundary");
948
949            let (file, pool, pipe) = new_objects(path);
950
951            const BUFFER_A: [u8; BUFFER_SIZE as usize] = [0xAA; BUFFER_SIZE as usize];
952            const BUFFER_B: [u8; BUFFER_SIZE as usize] = [0xBB; BUFFER_SIZE as usize];
953            const BUFFER_C: [u8; BUFFER_SIZE as usize] = [0xCC; BUFFER_SIZE as usize];
954
955            let alloc_a = prep_write(BUFFER_A.as_ptr(), 1, &pool);
956            let ticket_a = pipe.write(WriteRequest { allocation: alloc_a, slot_index: 0 }).unwrap();
957
958            let alloc_b = prep_write(BUFFER_B.as_ptr(), 1, &pool);
959            let ticket_b = pipe.write(WriteRequest { allocation: alloc_b, slot_index: 1 }).unwrap();
960
961            let alloc_c = prep_write(BUFFER_C.as_ptr(), 1, &pool);
962            let ticket_c = pipe.write(WriteRequest { allocation: alloc_c, slot_index: 2 }).unwrap();
963
964            let epoch_a = ticket_a.epoch();
965            let epoch_b = ticket_b.epoch();
966            let epoch_c = ticket_c.epoch();
967
968            let durable_epoch = futures::executor::block_on(ticket_c).unwrap();
969            assert!(durable_epoch >= epoch_c);
970            assert!(durable_epoch >= epoch_b);
971            assert!(durable_epoch >= epoch_a);
972
973            compare_with_readback(&BUFFER_A, 0, 1, &pool, &file);
974            compare_with_readback(&BUFFER_B, 1, 1, &pool, &file);
975            compare_with_readback(&BUFFER_C, 2, 1, &pool, &file);
976        }
977    }
978
979    mod concurrency {
980        use super::*;
981
982        #[test]
983        fn ok_multi_threaded_writers() {
984            const THREADS: usize = 4;
985            const WRITES_PER_THREAD: usize = 0x40;
986            const _: () = assert!(THREADS * WRITES_PER_THREAD < INITIAL_BUFFER_AMOUT);
987
988            let dir = tempfile::tempdir().unwrap();
989            let path = dir.path().join("multi_threaded_writers");
990
991            let (_file, pool, pipe) = new_objects(path);
992
993            let pipe = sync::Arc::new(pipe);
994            let pool = sync::Arc::new(pool);
995
996            let mut handles = Vec::with_capacity(THREADS);
997            for tid in 0..THREADS {
998                let pipe = sync::Arc::clone(&pipe);
999                let pool = sync::Arc::clone(&pool);
1000
1001                handles.push(thread::spawn(move || {
1002                    let mut tickets = Vec::with_capacity(WRITES_PER_THREAD);
1003
1004                    for i in 0..WRITES_PER_THREAD {
1005                        let buffer = vec![tid as u8; BUFFER_SIZE as usize];
1006                        let allocation = prep_write(buffer.as_ptr(), 1, &pool);
1007                        let slot_index = tid * WRITES_PER_THREAD + i;
1008                        let ticket = pipe.write(WriteRequest { allocation, slot_index }).unwrap();
1009
1010                        tickets.push(ticket);
1011                    }
1012
1013                    tickets
1014                }));
1015            }
1016
1017            let mut tickets = Vec::new();
1018            for handle in handles {
1019                tickets.extend(handle.join().unwrap());
1020            }
1021            assert_eq!(tickets.len(), THREADS * WRITES_PER_THREAD,);
1022
1023            let mut epochs: Vec<u64> = tickets.iter().map(ack::AckTicket::epoch).collect();
1024            epochs.sort_unstable();
1025
1026            for (ed, observed) in (1u64..=epochs.len() as u64).zip(epochs.iter().copied()) {
1027                assert_eq!(ed, observed);
1028            }
1029
1030            let latest_ticket = tickets.into_iter().max_by_key(ack::AckTicket::epoch).unwrap();
1031            let durable_epoch = futures::executor::block_on(latest_ticket).unwrap();
1032            assert_eq!(durable_epoch, (THREADS * WRITES_PER_THREAD) as u64,);
1033        }
1034    }
1035
1036    mod parallel_listeners {
1037        use super::*;
1038
1039        #[test]
1040        fn ok_many_parallel_waiters_same_durability_window() {
1041            const WAITERS: usize = 0x20;
1042            const BUFFER: [u8; BUFFER_SIZE as usize] = [0xAA; BUFFER_SIZE as usize];
1043
1044            let dir = tempfile::tempdir().unwrap();
1045            let path = dir.path().join("parallel_waiters");
1046
1047            let (_file, pool, pipe) = new_objects(path);
1048
1049            let mut tickets = Vec::with_capacity(WAITERS);
1050            for i in 0..WAITERS {
1051                let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
1052                let ticket = pipe.write(WriteRequest { allocation, slot_index: i }).unwrap();
1053
1054                tickets.push(ticket);
1055            }
1056
1057            let mut handles = Vec::with_capacity(WAITERS);
1058            for ticket in tickets {
1059                handles.push(thread::spawn(move || {
1060                    let epoch = ticket.epoch();
1061                    let durable_epoch = futures::executor::block_on(ticket).unwrap();
1062
1063                    assert!(durable_epoch >= epoch);
1064                }));
1065            }
1066
1067            for handle in handles {
1068                handle.join().unwrap();
1069            }
1070        }
1071
1072        #[test]
1073        fn ok_parallel_waiters_multiple_batches() {
1074            const THREADS: usize = 0x0A;
1075            const WRITES: usize = 0x10;
1076
1077            let dir = tempfile::tempdir().unwrap();
1078            let path = dir.path().join("parallel_batches");
1079
1080            let (_file, pool, pipe) = new_objects(path);
1081
1082            let pipe = sync::Arc::new(pipe);
1083            let pool = sync::Arc::new(pool);
1084
1085            let mut handles = Vec::new();
1086            for tid in 0..THREADS {
1087                let pipe = pipe.clone();
1088                let pool = pool.clone();
1089
1090                handles.push(thread::spawn(move || {
1091                    let buffer = [tid as u8; BUFFER_SIZE as usize];
1092
1093                    for i in 0..WRITES {
1094                        let allocation = prep_write(buffer.as_ptr(), 1, &pool);
1095                        let ticket = pipe
1096                            .write(WriteRequest { allocation, slot_index: tid * WRITES + i })
1097                            .unwrap();
1098
1099                        futures::executor::block_on(ticket).unwrap();
1100                    }
1101                }));
1102            }
1103
1104            for handle in handles {
1105                handle.join().unwrap();
1106            }
1107        }
1108    }
1109}