// frozen_core/wpipe.rs

//! A low latency asynchronous write pipeline for buffer based storage
//!
//! ## Design
//!
//! By design, every write call is fire-and-forget, i.e. the call is immediately returned after
//! pushing the bytes to be written into the MPSC queue.
//!
//! The background thread pulls from the MPSC queue and performs individual `pwrite`/`pwritev`
//! calls and a common hard sync right after. This provides durability for all the writes
//! submitted within the same [`WritePipeCfg::flush_duration`] batching window.
//!
//! ## Benchmarks
//!
//! Observed measurements for latency (both single and multi threaded),
//!
//! | Metric | 1 Thread (µs) | 4 Threads (µs) |
//! |:-------|:--------------|:---------------|
//! | P50    |         0.091 |          0.275 |
//! | P90    |         0.092 |          0.458 |
//! | P99    |         0.825 |          0.917 |
//! | Mean   |         1.185 |          3.857 |
//!
//! Environment used for benching,
//!
//! * OS: NixOS (WSL2)
//! * Architecture: x86_64
//! * Memory: 8 GiB RAM (DDR4)
//! * Rust: rustc 1.86.0 w/ cargo 1.86.0
//! * Kernel: Linux 6.6.87.2-microsoft-standard-WSL2
//! * CPU: Intel® Core™ i5-10300H @ 2.50GHz (4C / 8T)
//!
//! ## Example
//!
//! ```
//! use frozen_core::{bufpool, ffile, utils, wpipe};
//! use std::{ptr, sync, time};
//!
//! const MODULE_ID: u8 = 0x00;
//! const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
//!
//! let dir = tempfile::tempdir().expect("tempdir creation should succeed");
//! let path = dir.path().join("wpipe_example");
//!
//! let file_cfg = ffile::FrozenFileCfg {
//!     path,
//!     module_id: MODULE_ID,
//!     initial_available_buffers: 0x400,
//!     buffer_size: BUFFER_SIZE as usize,
//! };
//! let file = sync::Arc::new(
//!     ffile::FrozenFile::new(file_cfg)
//!         .expect("file creation should succeed"),
//! );
//!
//! let pool_cfg = bufpool::BufPoolCfg {
//!     buffer_size: utils::BufferSize::S128,
//!     max_memory: 0x400 * BUFFER_SIZE as usize,
//! };
//! let pool = bufpool::BufPool::new(pool_cfg);
//!
//! let pipe_cfg = wpipe::WritePipeCfg {
//!     module_id: MODULE_ID,
//!     flush_duration: time::Duration::from_millis(1),
//! };
//! let pipe = wpipe::WritePipe::new(pipe_cfg, file)
//!     .expect("pipe creation should succeed");
//!
//! let payload = [0xAAu8; BUFFER_SIZE as usize];
//!
//! let mut latest_ticket = None;
//! for slot_index in 0..3 {
//!     let allocation = pool.allocate(1);
//!
//!     unsafe {
//!         ptr::copy_nonoverlapping(
//!             payload.as_ptr(),
//!             allocation.first(),
//!             payload.len(),
//!         );
//!     }
//!
//!     let ticket = pipe
//!         .write(wpipe::WriteRequest {
//!             allocation,
//!             slot_index,
//!         })
//!         .expect("write should succeed");
//!
//!     latest_ticket = Some(ticket);
//! }
//!
//! let durable_epoch = futures::executor::block_on(
//!     latest_ticket.expect("ticket should exist"),
//! )
//! .expect("writes should become durable");
//!
//! assert!(durable_epoch >= 3);
//! ```
99
use crate::{
    bufpool,
    error::{FrozenError, FrozenResult},
    ffile, hints, mpscq,
};
use std::{
    ptr,
    sync::{self, atomic},
    thread, time,
};
110
/// All the available configurations for [`WritePipe`]
///
/// ## Example
///
/// ```
/// use frozen_core::wpipe::WritePipeCfg;
///
/// let cfg = WritePipeCfg {
///     module_id: 2,
///     flush_duration: std::time::Duration::from_millis(0x0A),
/// };
///
/// assert_ne!(cfg.module_id, 0);
/// assert_ne!(cfg.flush_duration.as_millis(), 0);
/// ```
#[derive(Debug, Clone)]
pub struct WritePipeCfg {
    /// Identifier used for error propagation by [`crate::error::FrozenError`]
    ///
    /// It is also embedded in the name of the background flush thread.
    pub module_id: u8,

    /// Time interval used by the background thread to perform hard sync for all the write
    /// operations submitted in the last durability window
    pub flush_duration: time::Duration,
}
135
136/// A low latency asynchronous write pipeline for buffer based storage
137///
138/// ## Design
139///
140/// By design, every write call is fire-and-forget, i.e. the call is immediately returned after
141/// pushing the bytes to be written into the MPSC queue.
142///
143/// The background thread pulls from MPSC queue and performs indivisual pwrite/v calls and a common
144/// hard sync right after. This provides durability for all the writes submitted within the same
145/// [`WritePipeCfg::flush_duration`] batching window.
146///
147/// ## Example
148///
149/// ```
150/// use frozen_core::{bufpool, ffile, utils, wpipe};
151/// use std::{ptr, sync, time};
152///
153/// const MODULE_ID: u8 = 0x00;
154/// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
155///
156/// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
157/// let path = dir.path().join("wpipe_example");
158///
159/// let file_cfg = ffile::FrozenFileCfg {
160///     path,
161///     module_id: MODULE_ID,
162///     initial_available_buffers: 0x400,
163///     buffer_size: BUFFER_SIZE as usize,
164/// };
165/// let file = sync::Arc::new(
166///     ffile::FrozenFile::new(file_cfg)
167///         .expect("file creation should succeed"),
168/// );
169///
170/// let pool_cfg = bufpool::BufPoolCfg {
171///     buffer_size: utils::BufferSize::S128,
172///     max_memory: 0x400 * BUFFER_SIZE as usize,
173/// };
174/// let pool = bufpool::BufPool::new(pool_cfg);
175///
176/// let pipe_cfg = wpipe::WritePipeCfg {
177///     module_id: MODULE_ID,
178///     flush_duration: time::Duration::from_millis(1),
179/// };
180/// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
181///     .expect("pipe creation should succeed");
182///
183/// let payload = [0xAAu8; BUFFER_SIZE as usize];
184///
185/// let mut latest_ticket = None;
186/// for slot_index in 0..3 {
187///     let allocation = pool.allocate(1);
188///
189///     unsafe {
190///         ptr::copy_nonoverlapping(
191///             payload.as_ptr(),
192///             allocation.first(),
193///             payload.len(),
194///         );
195///     }
196///
197///     let ticket = pipe
198///         .write(wpipe::WriteRequest {
199///             allocation,
200///             slot_index,
201///         })
202///         .expect("write should succeed");
203///
204///     latest_ticket = Some(ticket);
205/// }
206///
207/// let durable_epoch = futures::executor::block_on(
208///     latest_ticket.expect("ticket should exist"),
209/// )
210/// .expect("writes should become durable");
211///
212/// assert!(durable_epoch >= 3);
213/// ```
214#[derive(Debug)]
215pub struct WritePipe {
216    core: sync::Arc<Core>,
217    flush_tx_handle: Option<thread::JoinHandle<()>>,
218}
219
220unsafe impl Send for WritePipe {}
221unsafe impl Sync for WritePipe {}
222
223impl WritePipe {
224    /// Create a new instance of [`WritePipe`]
225    ///
226    /// ## Example
227    ///
228    /// ```
229    /// use frozen_core::{bufpool, ffile, utils, wpipe};
230    /// use std::{ptr, sync, time};
231    ///
232    /// const MODULE_ID: u8 = 0x00;
233    /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
234    ///
235    /// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
236    /// let path = dir.path().join("wpipe_example");
237    ///
238    /// let file_cfg = ffile::FrozenFileCfg {
239    ///     path,
240    ///     module_id: MODULE_ID,
241    ///     initial_available_buffers: 0x400,
242    ///     buffer_size: BUFFER_SIZE as usize,
243    /// };
244    /// let file = sync::Arc::new(
245    ///     ffile::FrozenFile::new(file_cfg)
246    ///         .expect("file creation should succeed"),
247    /// );
248    ///
249    /// let pool_cfg = bufpool::BufPoolCfg {
250    ///     buffer_size: utils::BufferSize::S128,
251    ///     max_memory: 0x400 * BUFFER_SIZE as usize,
252    /// };
253    /// let pool = bufpool::BufPool::new(pool_cfg);
254    ///
255    /// let pipe_cfg = wpipe::WritePipeCfg {
256    ///     module_id: MODULE_ID,
257    ///     flush_duration: time::Duration::from_millis(1),
258    /// };
259    /// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
260    ///     .expect("pipe creation should succeed");
261    ///
262    /// let payload = vec![0x0A; BUFFER_SIZE as usize];
263    /// let allocation = pool.allocate(1);
264    ///
265    /// unsafe {ptr::copy_nonoverlapping(
266    ///     payload.as_ptr(),
267    ///     allocation.first(),
268    ///     payload.len()
269    /// )};
270    ///
271    /// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0});
272    ///
273    /// assert!(
274    ///     futures::executor::block_on(ticket.expect("ticket should exist"))
275    ///     .is_ok()
276    /// );
277    /// ```
278    #[inline]
279    pub fn new(cfg: WritePipeCfg, file: sync::Arc<ffile::FrozenFile>) -> FrozenResult<Self> {
280        let core = sync::Arc::new(Core::new(file));
281        let cloned_core = core.clone();
282        let flush_tx_handle = match thread::Builder::new()
283            .name(format!("mod{}_wpipe_flush_tx", cfg.module_id))
284            .spawn(move || bg_flush_thread(cloned_core, cfg.flush_duration))
285        {
286            Ok(handle) => Some(handle),
287            Err(observed_error) => return Err(err::new_error(cfg.module_id, err::FXE, observed_error)),
288        };
289
290        Ok(Self { core: core, flush_tx_handle })
291    }
292
293    /// Push a write into [`WritePipe`]
294    ///
295    /// Every write call is fire-and-forget for the caller by default, unless the caller choose to
296    /// wait for durability using the manual `await` on [`WriteTicket`].
297    ///
298    /// ## Example
299    ///
300    /// ```
301    /// use frozen_core::{bufpool, ffile, utils, wpipe};
302    /// use std::{ptr, sync, time};
303    ///
304    /// const MODULE_ID: u8 = 0x00;
305    /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
306    ///
307    /// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
308    /// let path = dir.path().join("wpipe_example");
309    ///
310    /// let file_cfg = ffile::FrozenFileCfg {
311    ///     path,
312    ///     module_id: MODULE_ID,
313    ///     initial_available_buffers: 0x400,
314    ///     buffer_size: BUFFER_SIZE as usize,
315    /// };
316    /// let file = sync::Arc::new(
317    ///     ffile::FrozenFile::new(file_cfg)
318    ///         .expect("file creation should succeed"),
319    /// );
320    ///
321    /// let pool_cfg = bufpool::BufPoolCfg {
322    ///     buffer_size: utils::BufferSize::S128,
323    ///     max_memory: 0x400 * BUFFER_SIZE as usize,
324    /// };
325    /// let pool = bufpool::BufPool::new(pool_cfg);
326    ///
327    /// let pipe_cfg = wpipe::WritePipeCfg {
328    ///     module_id: MODULE_ID,
329    ///     flush_duration: time::Duration::from_millis(1),
330    /// };
331    /// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
332    ///     .expect("pipe creation should succeed");
333    ///
334    /// let payload = vec![0x0A; BUFFER_SIZE as usize];
335    /// let allocation = pool.allocate(1);
336    ///
337    /// unsafe {ptr::copy_nonoverlapping(
338    ///     payload.as_ptr(),
339    ///     allocation.first(),
340    ///     payload.len()
341    /// )};
342    ///
343    /// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0});
344    ///
345    /// assert!(
346    ///     futures::executor::block_on(ticket.expect("ticket should exist"))
347    ///     .is_ok()
348    /// );
349    /// ```
350    #[inline]
351    pub fn write(&self, request: WriteRequest) -> FrozenResult<WriteTicket> {
352        let _io_lock = self.core.acquire_shared_io_lock();
353        if let Some(frozen_error) = self.core.completion.error.get() {
354            return Err(frozen_error);
355        }
356
357        let epoch = self.core.increment_current_epoch();
358        let internal_req = WriteRequestInternal { request, epoch };
359        self.core.queue.push(internal_req);
360
361        Ok(WriteTicket { epoch, completion: self.core.completion.clone() })
362    }
363}
364
365impl Drop for WritePipe {
366    fn drop(&mut self) {
367        self.core.closed.store(true, atomic::Ordering::Release);
368        self.core.flush_cv.notify_one();
369
370        if let Some(handle) = self.flush_tx_handle.take() {
371            let _ = handle.join();
372        }
373    }
374}
375
376/// A write operation submitted to [`WritePipe`]
377///
378/// The request contains the buffers to persist along with the destination slot index in the
379/// underlying [`FrozenFile`].
380///
381/// ## Example
382///
383/// ```
384/// use frozen_core::{bufpool, utils, wpipe};
385/// use std::ptr;
386///
387/// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
388///
389/// let pool_cfg = bufpool::BufPoolCfg {
390///     buffer_size: utils::BufferSize::S128,
391///     max_memory: 0x400 * BUFFER_SIZE as usize,
392/// };
393/// let pool = bufpool::BufPool::new(pool_cfg);
394///
395/// let payload = vec![0x0A; BUFFER_SIZE as usize];
396/// let allocation = pool.allocate(1);
397///
398/// unsafe {ptr::copy_nonoverlapping(
399///     payload.as_ptr(),
400///     allocation.first(),
401///     payload.len()
402/// )};
403///
404/// let request = wpipe::WriteRequest {allocation, slot_index: 0};
405/// assert!(request.slot_index >= 0);
406/// ```
407#[derive(Debug)]
408pub struct WriteRequest {
409    /// Buffer allocation containing the pages to be written allocated using [`bufpool::BufPool`]
410    ///
411    /// ## Example
412    ///
413    /// ```
414    /// use frozen_core::{bufpool, utils, wpipe};
415    /// use std::ptr;
416    ///
417    /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
418    ///
419    /// let pool_cfg = bufpool::BufPoolCfg {
420    ///     buffer_size: utils::BufferSize::S128,
421    ///     max_memory: 0x400 * BUFFER_SIZE as usize,
422    /// };
423    /// let pool = bufpool::BufPool::new(pool_cfg);
424    ///
425    /// let payload = vec![0x0A; BUFFER_SIZE as usize];
426    /// let allocation = pool.allocate(1);
427    ///
428    /// unsafe {ptr::copy_nonoverlapping(
429    ///     payload.as_ptr(),
430    ///     allocation.first(),
431    ///     payload.len()
432    /// )};
433    ///
434    /// let request = wpipe::WriteRequest {allocation, slot_index: 0};
435    /// assert!(!request.allocation.first().is_null());
436    /// ```
437    pub allocation: bufpool::BufPoolAllocation,
438
439    /// Destination slot index where the pages of the allocation will be written from
440    ///
441    /// ## Example
442    ///
443    /// ```
444    /// use frozen_core::{bufpool, utils, wpipe};
445    /// use std::ptr;
446    ///
447    /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
448    ///
449    /// let pool_cfg = bufpool::BufPoolCfg {
450    ///     buffer_size: utils::BufferSize::S128,
451    ///     max_memory: 0x400 * BUFFER_SIZE as usize,
452    /// };
453    /// let pool = bufpool::BufPool::new(pool_cfg);
454    ///
455    /// let payload = vec![0x0A; BUFFER_SIZE as usize];
456    /// let allocation = pool.allocate(1);
457    ///
458    /// unsafe {ptr::copy_nonoverlapping(
459    ///     payload.as_ptr(),
460    ///     allocation.first(),
461    ///     payload.len()
462    /// )};
463    ///
464    /// let request = wpipe::WriteRequest {allocation, slot_index: 0};
465    /// assert!(request.slot_index >= 0);
466    /// ```
467    pub slot_index: usize,
468}
469
470/// Durability handle associated with the submitted write operation via [`WritePipe::write`]
471///
472/// ## Epoch
473///
474/// Every ticket is assigned a monotonically increasing epoch to moniter durability
475///
476/// ## Durability Guarantee
477///
478/// If wanted, the ticket could be awaited to poll till the epoch becomes durable.
479///
480/// Once a await on ticket is completed successfully, all writes assigned to earlier epochs are
481/// also guaranteed to be durable.
482///
483/// *NOTE:* Using `await` is optional. Callers that only require fire-and-forget semantics may
484/// simply discard the returned ticket.
485///
486/// ## Example
487///
488/// ```
489/// use frozen_core::{bufpool, ffile, utils, wpipe};
490/// use std::{ptr, sync, time};
491///
492/// const MODULE_ID: u8 = 0x00;
493/// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
494///
495/// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
496/// let path = dir.path().join("wpipe_example");
497///
498/// let file_cfg = ffile::FrozenFileCfg {
499///     path,
500///     module_id: MODULE_ID,
501///     initial_available_buffers: 0x400,
502///     buffer_size: BUFFER_SIZE as usize,
503/// };
504/// let file = sync::Arc::new(
505///     ffile::FrozenFile::new(file_cfg)
506///         .expect("file creation should succeed"),
507/// );
508///
509/// let pool_cfg = bufpool::BufPoolCfg {
510///     buffer_size: utils::BufferSize::S128,
511///     max_memory: 0x400 * BUFFER_SIZE as usize,
512/// };
513/// let pool = bufpool::BufPool::new(pool_cfg);
514///
515/// let pipe_cfg = wpipe::WritePipeCfg {
516///     module_id: MODULE_ID,
517///     flush_duration: time::Duration::from_millis(1),
518/// };
519/// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
520///     .expect("pipe creation should succeed");
521///
522/// let payload = vec![0x0A; BUFFER_SIZE as usize];
523/// let allocation = pool.allocate(1);
524///
525/// unsafe {ptr::copy_nonoverlapping(
526///     payload.as_ptr(),
527///     allocation.first(),
528///     payload.len()
529/// )};
530///
531/// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0})
532///     .unwrap();
533/// let epoch = ticket.epoch();
534///
535/// assert!(epoch > 0);
536/// ```
537#[derive(Debug)]
538pub struct WriteTicket {
539    epoch: u64,
540    completion: sync::Arc<Completion>,
541}
542
543impl WriteTicket {
544    /// Read assigned durability epoch for the [`WriteTicket`]
545    ///
546    /// ## Example
547    ///
548    /// ```
549    /// use frozen_core::{bufpool, ffile, utils, wpipe};
550    /// use std::{ptr, sync, time};
551    ///
552    /// const MODULE_ID: u8 = 0x00;
553    /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
554    ///
555    /// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
556    /// let path = dir.path().join("wpipe_example");
557    ///
558    /// let file_cfg = ffile::FrozenFileCfg {
559    ///     path,
560    ///     module_id: MODULE_ID,
561    ///     initial_available_buffers: 0x400,
562    ///     buffer_size: BUFFER_SIZE as usize,
563    /// };
564    /// let file = sync::Arc::new(
565    ///     ffile::FrozenFile::new(file_cfg)
566    ///         .expect("file creation should succeed"),
567    /// );
568    ///
569    /// let pool_cfg = bufpool::BufPoolCfg {
570    ///     buffer_size: utils::BufferSize::S128,
571    ///     max_memory: 0x400 * BUFFER_SIZE as usize,
572    /// };
573    /// let pool = bufpool::BufPool::new(pool_cfg);
574    ///
575    /// let pipe_cfg = wpipe::WritePipeCfg {
576    ///     module_id: MODULE_ID,
577    ///     flush_duration: time::Duration::from_millis(1),
578    /// };
579    /// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
580    ///     .expect("pipe creation should succeed");
581    ///
582    /// let payload = vec![0x0A; BUFFER_SIZE as usize];
583    /// let allocation = pool.allocate(1);
584    ///
585    /// unsafe {ptr::copy_nonoverlapping(
586    ///     payload.as_ptr(),
587    ///     allocation.first(),
588    ///     payload.len()
589    /// )};
590    ///
591    /// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0})
592    ///     .unwrap();
593    /// let epoch = ticket.epoch();
594    ///
595    /// assert!(epoch > 0);
596    /// ```
597    pub const fn epoch(&self) -> u64 {
598        self.epoch
599    }
600
601    #[inline]
602    fn is_ready(&self) -> bool {
603        let durable = self.completion.durable_epoch.load(atomic::Ordering::Acquire);
604        if hints::likely(durable >= self.epoch) {
605            return true;
606        }
607
608        false
609    }
610}
611
612impl std::future::Future for WriteTicket {
613    type Output = FrozenResult<u64>;
614
615    #[inline(always)]
616    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
617        if self.is_ready() {
618            return std::task::Poll::Ready(Ok(self.epoch));
619        }
620
621        if let Some(frozen_error) = self.completion.error.get() {
622            return std::task::Poll::Ready(Err(frozen_error));
623        }
624
625        self.completion.waker.register(cx.waker());
626
627        if self.is_ready() {
628            return std::task::Poll::Ready(Ok(self.epoch));
629        }
630
631        if let Some(frozen_error) = self.completion.error.get() {
632            return std::task::Poll::Ready(Err(frozen_error));
633        }
634
635        std::task::Poll::Pending
636    }
637}
638
639/// ## Why we ignore [`std::sync::PoisonError`]?
640///
641/// The mutex used for lock, is solely used as a parking primitive for [`Condvar`] and does not
642/// protect any mutable state. All the pool invariants and accounting are maintained via atomics
643/// and are completely seperated from the mutex.
644///
645/// A poisoned mutex only indicates that another tx panicked while holding the lock, and indicates
646/// an inconsistent state of the protected value. Since no state can be left partially modified
647/// under this lock, there is no possible consistency risk to recover from and propagating the
648/// poison error would only introduce unnecessary failures into the allocation path.
649///
650/// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating
651/// with the recovered guard.
652fn bg_flush_thread(core: sync::Arc<Core>, flush_duration: time::Duration) {
653    let mut guard = core.flush_guard.lock().unwrap_or_else(|e| e.into_inner());
654    loop {
655        (guard, _) = core.flush_cv.wait_timeout(guard, flush_duration).unwrap_or_else(|e| e.into_inner());
656
657        // NOTE: we must read values of close brodcast before acquire exclusive lock, if done
658        // otherwise, we impose serious deadlock sort of situation for the the flusher tx
659
660        let queued_ops = core.queue.drain();
661        let closed = core.closed.load(atomic::Ordering::Acquire);
662
663        if queued_ops.is_empty() {
664            if closed {
665                return;
666            }
667
668            continue;
669        }
670
671        // INFO: we must acquire an exclusive IO lock for sync, hence no write/read ops are allowed
672        // while sync is in progress
673        let _io_lock = core.acquire_exclusive_io_lock();
674
675        let (_min_index, _max_index, max_epoch) = match core.write_queued_ops(queued_ops) {
676            Ok(res) => res,
677            Err(new_error) => {
678                core.completion.error.set(new_error);
679                core.completion.waker.wake();
680                drop(_io_lock);
681
682                continue;
683            }
684        };
685
686        // NOTE: On linux, we can initiate writeback (best-effort only) for a given range
687        #[cfg(target_os = "linux")]
688        if let Err(new_error) = core.file.sync_range(_min_index, _max_index - _min_index) {
689            core.completion.error.set(new_error);
690            core.completion.waker.wake();
691            drop(_io_lock);
692
693            continue;
694        }
695
696        // NOTE: If the sync fails, we update the Core::error w/ the received error object. We
697        // clear it up when another call succeeds.
698        //
699        // This is valid as the underlying sync flushes entire batch all at once, hence even if the
700        // last call failed, and the new one succeeded, we do get the durability guarantee for the
701        // old data as well.
702
703        if let Err(new_error) = core.file.sync() {
704            core.completion.error.set(new_error);
705            core.completion.waker.wake();
706            drop(_io_lock);
707
708            continue;
709        } else {
710            core.completion.mark_epoch_as_durable(max_epoch);
711            core.completion.error.del();
712        }
713    }
714}
715
716#[derive(Debug)]
717struct Core {
718    completion: sync::Arc<Completion>,
719    closed: atomic::AtomicBool,
720    epoch: atomic::AtomicU64,
721    file: sync::Arc<ffile::FrozenFile>,
722    flush_cv: sync::Condvar,
723    flush_guard: sync::Mutex<()>,
724    io_lock: sync::RwLock<()>,
725    queue: mpscq::MPSCQueue<WriteRequestInternal>,
726}
727
728impl Core {
729    fn new(file: sync::Arc<ffile::FrozenFile>) -> Self {
730        Self {
731            file,
732            completion: sync::Arc::new(Completion::default()),
733            closed: atomic::AtomicBool::new(false),
734            epoch: atomic::AtomicU64::new(0),
735            flush_cv: sync::Condvar::new(),
736            flush_guard: sync::Mutex::new(()),
737            io_lock: sync::RwLock::new(()),
738            queue: mpscq::MPSCQueue::default(),
739        }
740    }
741
742    #[inline]
743    fn acquire_shared_io_lock(&self) -> sync::RwLockReadGuard<'_, ()> {
744        // NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
745        self.io_lock.read().unwrap_or_else(|e| e.into_inner())
746    }
747
748    #[inline]
749    fn acquire_exclusive_io_lock(&self) -> sync::RwLockWriteGuard<'_, ()> {
750        // NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
751        self.io_lock.write().unwrap_or_else(|e| e.into_inner())
752    }
753
754    #[inline(always)]
755    fn write_queued_ops(&self, queued_ops: Vec<WriteRequestInternal>) -> FrozenResult<(usize, usize, u64)> {
756        let mut max_epoch = 0;
757        let mut max_index = 0;
758        let mut min_index = usize::MAX;
759
760        for op in queued_ops {
761            let ops_len = op.request.allocation.length();
762            match ops_len {
763                1 => {
764                    self.file.pwrite(op.request.allocation.first(), op.request.slot_index)?;
765                }
766                _ => {
767                    let bufs: Vec<bufpool::BufferPointer> = op.request.allocation.iter().collect();
768                    self.file.pwritev(&bufs, op.request.slot_index)?;
769                }
770            }
771
772            max_epoch = max_epoch.max(op.epoch);
773            min_index = min_index.min(op.request.slot_index);
774            max_index = max_index.max(op.request.slot_index + ops_len);
775        }
776
777        Ok((min_index, max_index, max_epoch))
778    }
779
780    #[inline]
781    fn increment_current_epoch(&self) -> u64 {
782        self.epoch.fetch_add(1, atomic::Ordering::AcqRel).wrapping_add(1)
783    }
784}
785
786#[derive(Debug)]
787struct WriteRequestInternal {
788    epoch: u64,
789    request: WriteRequest,
790}
791
792#[derive(Debug)]
793struct Completion {
794    durable_epoch: atomic::AtomicU64,
795    error: FlushError,
796    waker: futures::task::AtomicWaker,
797}
798
799impl Default for Completion {
800    fn default() -> Self {
801        Self {
802            durable_epoch: atomic::AtomicU64::new(0),
803            waker: futures::task::AtomicWaker::new(),
804            error: FlushError::default(),
805        }
806    }
807}
808
809impl Completion {
810    fn mark_epoch_as_durable(&self, epoch: u64) {
811        self.durable_epoch.store(epoch, atomic::Ordering::Release);
812        self.waker.wake();
813    }
814}
815
816#[derive(Debug)]
817struct FlushError(atomic::AtomicPtr<FrozenError>);
818
819impl Default for FlushError {
820    fn default() -> Self {
821        Self(atomic::AtomicPtr::new(ptr::null_mut()))
822    }
823}
824
825impl Drop for FlushError {
826    fn drop(&mut self) {
827        let err_ptr = self.0.load(atomic::Ordering::Acquire);
828        if !err_ptr.is_null() {
829            let _ = unsafe { Box::from_raw(err_ptr) };
830        }
831    }
832}
833
834impl FlushError {
835    #[inline]
836    fn get(&self) -> Option<FrozenError> {
837        let curr_err = self.0.load(atomic::Ordering::Acquire);
838        if hints::unlikely(!curr_err.is_null()) {
839            let frozen_error = unsafe { (*curr_err).clone() };
840            return Some(frozen_error);
841        }
842
843        None
844    }
845
846    #[inline]
847    fn set(&self, new_error: FrozenError) {
848        let boxed_error = Box::into_raw(Box::new(new_error));
849        let old_err = self.0.swap(boxed_error, atomic::Ordering::AcqRel);
850
851        if hints::unlikely(!old_err.is_null()) {
852            let _ = unsafe { Box::from_raw(old_err) };
853        }
854    }
855
856    #[inline]
857    fn del(&self) {
858        let old_err = self.0.swap(ptr::null_mut(), atomic::Ordering::AcqRel);
859        if hints::unlikely(!old_err.is_null()) {
860            let _ = unsafe { Box::from_raw(old_err) };
861        }
862    }
863}
864
865mod err {
866    use crate::error::{ErrCode, FrozenError};
867
868    /// Domain ID for [`wpipe`] module is `0x02` used while propagating errors
869    const DOMAIN_ID: u8 = 0x02;
870
871    #[inline]
872    pub fn new_error<E: std::fmt::Display>(module_id: u8, code: ErrCode, observed_error: E) -> FrozenError {
873        FrozenError::new_raw(module_id, DOMAIN_ID, code, observed_error)
874    }
875
876    pub const FXE: ErrCode = ErrCode::new(0x10, "unable to spawn background flush thread");
877}
878
879#[cfg(test)]
880mod tests {
881    use super::*;
882    use crate::utils::BufferSize;
883
884    const MODULE_ID: u8 = 0x00;
885    const BUFFER_SIZE: BufferSize = BufferSize::S128;
886    const INITIAL_BUFFER_AMOUT: usize = 0x200;
887    const FLUSH_DURATION: time::Duration = time::Duration::from_millis(1);
888
889    fn new_objects<P: AsRef<std::path::Path>>(path: P) -> (sync::Arc<ffile::FrozenFile>, bufpool::BufPool, WritePipe) {
890        let file_cfg = ffile::FrozenFileCfg {
891            module_id: MODULE_ID,
892            path: path.as_ref().to_path_buf(),
893            buffer_size: BUFFER_SIZE as usize,
894            initial_available_buffers: INITIAL_BUFFER_AMOUT,
895        };
896        let file = sync::Arc::new(ffile::FrozenFile::new(file_cfg).unwrap());
897
898        let pool_cfg =
899            bufpool::BufPoolCfg { buffer_size: BUFFER_SIZE, max_memory: INITIAL_BUFFER_AMOUT * BUFFER_SIZE as usize };
900        let pool = bufpool::BufPool::new(pool_cfg);
901
902        let pipe_cfg = WritePipeCfg { module_id: MODULE_ID, flush_duration: FLUSH_DURATION };
903        let pipe = WritePipe::new(pipe_cfg, file.clone()).unwrap();
904
905        (file, pool, pipe)
906    }
907
908    fn prep_write(buf_ptr: *const u8, n: usize, pool: &bufpool::BufPool) -> bufpool::BufPoolAllocation {
909        let allocation = pool.allocate(n);
910        for allocated_buf in allocation.iter() {
911            unsafe { ptr::copy_nonoverlapping(buf_ptr, allocated_buf, BUFFER_SIZE as usize) };
912        }
913
914        allocation
915    }
916
917    fn compare_with_readback(
918        buf: &[u8],
919        read_index: usize,
920        required: usize,
921        pool: &bufpool::BufPool,
922        file: &ffile::FrozenFile,
923    ) {
924        let read_allocation = pool.allocate(required);
925        let read_bufs: Vec<bufpool::BufferPointer> = read_allocation.iter().collect();
926
927        file.preadv(&read_bufs, read_index).unwrap();
928
929        for read_buf in read_allocation.iter() {
930            let observed = unsafe { std::slice::from_raw_parts(read_buf, BUFFER_SIZE as usize) };
931            assert_eq!(buf, observed);
932        }
933    }
934
935    mod lifecycle {
936        use super::*;
937
938        #[test]
939        fn ok_new() {
940            let dir = tempfile::tempdir().unwrap();
941            let path = dir.path().join("write_single");
942
943            let file_cfg = ffile::FrozenFileCfg {
944                path,
945                module_id: MODULE_ID,
946                buffer_size: BUFFER_SIZE as usize,
947                initial_available_buffers: INITIAL_BUFFER_AMOUT,
948            };
949            let file = sync::Arc::new(ffile::FrozenFile::new(file_cfg).unwrap());
950
951            let pipe_cfg = WritePipeCfg { module_id: MODULE_ID, flush_duration: FLUSH_DURATION };
952            assert!(WritePipe::new(pipe_cfg, file).is_ok());
953        }
954
955        #[test]
956        fn ok_drop() {
957            let dir = tempfile::tempdir().unwrap();
958            let path = dir.path().join("write_single");
959            let (_file, _, pipe) = new_objects(path);
960
961            drop(pipe);
962        }
963    }
964
965    mod shutdown {
966        use super::*;
967
968        #[test]
969        fn ok_drop_before_pending_write_call() {
970            let dir = tempfile::tempdir().unwrap();
971            let path = dir.path().join("write_single");
972            let (_file, pool, pipe) = new_objects(path);
973
974            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];
975
976            let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
977            let request = WriteRequest { allocation, slot_index: 0 };
978
979            assert!(pipe.write(request).is_ok());
980            drop(pipe);
981        }
982
983        #[test]
984        fn ok_drop_waits_for_pending_write_call() {
985            let dir = tempfile::tempdir().unwrap();
986            let path = dir.path().join("write_single");
987
988            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];
989
990            // new + write + drop
991            {
992                let (_file, pool, pipe) = new_objects(path.clone());
993
994                let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
995                let request = WriteRequest { allocation, slot_index: 0 };
996
997                assert!(pipe.write(request).is_ok());
998                drop(pipe);
999            }
1000
1001            // open + readback
1002            {
1003                let (file, pool, _) = new_objects(path);
1004                compare_with_readback(&BUFFER, 0, 1, &pool, &file);
1005            }
1006        }
1007
1008        #[test]
1009        fn ok_drop_does_not_deadlock_when_multiple_pending_writes() {
1010            let dir = tempfile::tempdir().unwrap();
1011            let path = dir.path().join("write_single");
1012            let (_file, pool, pipe) = new_objects(path);
1013
1014            for i in 0..INITIAL_BUFFER_AMOUT {
1015                let buffer = vec![i as u8; BUFFER_SIZE as usize];
1016                let allocation = prep_write(buffer.as_ptr(), 1, &pool);
1017                let request = WriteRequest { allocation, slot_index: 0 };
1018
1019                assert!(pipe.write(request).is_ok());
1020            }
1021
1022            drop(pipe);
1023        }
1024
1025        #[test]
1026        fn ok_drop_correctly_waits_for_pending_write_with_multi_threads() {
1027            let dir = tempfile::tempdir().unwrap();
1028            let path = dir.path().join("write_single");
1029            let (_file, pool, pipe) = new_objects(path);
1030
1031            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];
1032
1033            let pipe = sync::Arc::new(pipe);
1034            let pipe2 = sync::Arc::clone(&pipe);
1035
1036            let handle = thread::spawn(move || {
1037                let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
1038                let request = WriteRequest { allocation, slot_index: 0 };
1039
1040                assert!(pipe2.write(request).is_ok());
1041            });
1042
1043            drop(pipe);
1044            handle.join().unwrap();
1045        }
1046    }
1047
1048    mod pipe_writes {
1049        use super::*;
1050
1051        #[test]
1052        fn ok_write() {
1053            let dir = tempfile::tempdir().unwrap();
1054            let path = dir.path().join("write_single");
1055            let (_file, pool, pipe) = new_objects(path);
1056
1057            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];
1058            let allocation = prep_write(BUFFER.as_ptr(), 0x0A, &pool);
1059
1060            let request = WriteRequest { allocation, slot_index: 0 };
1061            assert!(pipe.write(request).is_ok());
1062        }
1063
1064        #[test]
1065        fn ok_write_epoch_is_monotonic() {
1066            let dir = tempfile::tempdir().unwrap();
1067            let path = dir.path().join("write_single");
1068            let (_file, pool, pipe) = new_objects(path);
1069
1070            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];
1071
1072            let allocation1 = prep_write(BUFFER.as_ptr(), 1, &pool);
1073            let ticket1 = pipe.write(WriteRequest { allocation: allocation1, slot_index: 0 }).unwrap();
1074
1075            let allocation2 = prep_write(BUFFER.as_ptr(), 1, &pool);
1076            let ticket2 = pipe.write(WriteRequest { allocation: allocation2, slot_index: 1 }).unwrap();
1077
1078            let allocation3 = prep_write(BUFFER.as_ptr(), 1, &pool);
1079            let ticket3 = pipe.write(WriteRequest { allocation: allocation3, slot_index: 2 }).unwrap();
1080
1081            assert!(ticket3.epoch() > ticket2.epoch());
1082            assert!(ticket2.epoch() > ticket1.epoch());
1083        }
1084    }
1085
1086    mod write_ticket {
1087        use super::*;
1088
1089        #[test]
1090        fn ok_readback_after_write_with_await() {
1091            let dir = tempfile::tempdir().unwrap();
1092            let path = dir.path().join("write_single");
1093            let (file, pool, pipe) = new_objects(path);
1094
1095            const REQUIRED: usize = 0x0A;
1096            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];
1097
1098            let write_allocation = prep_write(BUFFER.as_ptr(), REQUIRED, &pool);
1099            let request = WriteRequest { allocation: write_allocation, slot_index: 0 };
1100
1101            let ticket = pipe.write(request).unwrap();
1102            let ticket_epoch = ticket.epoch();
1103
1104            let durable_epoch = futures::executor::block_on(ticket).unwrap();
1105            assert!(durable_epoch >= ticket_epoch);
1106
1107            compare_with_readback(&BUFFER, 0, REQUIRED, &pool, &file);
1108        }
1109
1110        #[test]
1111        fn ok_readback_after_batch_write() {
1112            let dir = tempfile::tempdir().unwrap();
1113            let path = dir.path().join("write_single");
1114            let (file, pool, pipe) = new_objects(path);
1115
1116            const BUFFERS: [([u8; BUFFER_SIZE as usize], usize); 5] = [
1117                ([0x0Au8; BUFFER_SIZE as usize], 0x1A),
1118                ([0x0Bu8; BUFFER_SIZE as usize], 0x1B),
1119                ([0x0Cu8; BUFFER_SIZE as usize], 0x1C),
1120                ([0x0Du8; BUFFER_SIZE as usize], 0x1D),
1121                ([0x0Eu8; BUFFER_SIZE as usize], 0x1E),
1122            ];
1123
1124            let mut slot_index = 0;
1125            let mut latest_ticket = None;
1126
1127            for (buf, required) in BUFFERS {
1128                let allocation = prep_write(buf.as_ptr(), required, &pool);
1129                let request = WriteRequest { allocation, slot_index };
1130                let ticket = pipe.write(request).unwrap();
1131
1132                slot_index += required;
1133                latest_ticket = Some(ticket);
1134            }
1135
1136            assert!(latest_ticket.is_some());
1137
1138            if let Some(ticket) = latest_ticket {
1139                let ticket_epoch = ticket.epoch();
1140                let durable_epoch = futures::executor::block_on(ticket).unwrap();
1141
1142                assert!(durable_epoch >= ticket_epoch);
1143            }
1144
1145            let mut read_index = 0;
1146            for (buf, required) in BUFFERS {
1147                compare_with_readback(&buf, read_index, required, &pool, &file);
1148                read_index += required;
1149            }
1150        }
1151
1152        #[test]
1153        fn ok_multiple_concurrent_awaits() {
1154            let dir = tempfile::tempdir().unwrap();
1155            let path = dir.path().join("write_single");
1156            let (_file, pool, pipe) = new_objects(path);
1157
1158            const REQUIRED: usize = 0x0A;
1159            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];
1160
1161            let allocation1 = prep_write(BUFFER.as_ptr(), REQUIRED, &pool);
1162            let ticket1 = pipe.write(WriteRequest { allocation: allocation1, slot_index: 0 }).unwrap();
1163
1164            let allocation2 = prep_write(BUFFER.as_ptr(), REQUIRED, &pool);
1165            let ticket2 = pipe.write(WriteRequest { allocation: allocation2, slot_index: 0 }).unwrap();
1166
1167            let (e1, e2) = futures::executor::block_on(async { futures::join!(ticket1, ticket2) });
1168
1169            assert!(e1.is_ok());
1170            assert!(e2.is_ok());
1171            assert!(e2.unwrap() > e1.unwrap());
1172        }
1173
1174        #[test]
1175        fn ok_awaiting_last_ticket_implies_previous_writes_are_durable() {
1176            let dir = tempfile::tempdir().unwrap();
1177            let path = dir.path().join("durability_boundary");
1178
1179            let (file, pool, pipe) = new_objects(path);
1180
1181            const BUFFER_A: [u8; BUFFER_SIZE as usize] = [0xAA; BUFFER_SIZE as usize];
1182            const BUFFER_B: [u8; BUFFER_SIZE as usize] = [0xBB; BUFFER_SIZE as usize];
1183            const BUFFER_C: [u8; BUFFER_SIZE as usize] = [0xCC; BUFFER_SIZE as usize];
1184
1185            let alloc_a = prep_write(BUFFER_A.as_ptr(), 1, &pool);
1186            let ticket_a = pipe.write(WriteRequest { allocation: alloc_a, slot_index: 0 }).unwrap();
1187
1188            let alloc_b = prep_write(BUFFER_B.as_ptr(), 1, &pool);
1189            let ticket_b = pipe.write(WriteRequest { allocation: alloc_b, slot_index: 1 }).unwrap();
1190
1191            let alloc_c = prep_write(BUFFER_C.as_ptr(), 1, &pool);
1192            let ticket_c = pipe.write(WriteRequest { allocation: alloc_c, slot_index: 2 }).unwrap();
1193
1194            let epoch_a = ticket_a.epoch();
1195            let epoch_b = ticket_b.epoch();
1196            let epoch_c = ticket_c.epoch();
1197
1198            let durable_epoch = futures::executor::block_on(ticket_c).unwrap();
1199            assert!(durable_epoch >= epoch_c);
1200            assert!(durable_epoch >= epoch_b);
1201            assert!(durable_epoch >= epoch_a);
1202
1203            compare_with_readback(&BUFFER_A, 0, 1, &pool, &file);
1204            compare_with_readback(&BUFFER_B, 1, 1, &pool, &file);
1205            compare_with_readback(&BUFFER_C, 2, 1, &pool, &file);
1206        }
1207    }
1208
1209    mod concurrency {
1210        use super::*;
1211
1212        #[test]
1213        fn ok_multi_threaded_writers() {
1214            const THREADS: usize = 4;
1215            const WRITES_PER_THREAD: usize = 0x40;
1216            const _: () = assert!(THREADS * WRITES_PER_THREAD < INITIAL_BUFFER_AMOUT);
1217
1218            let dir = tempfile::tempdir().unwrap();
1219            let path = dir.path().join("multi_threaded_writers");
1220
1221            let (_file, pool, pipe) = new_objects(path);
1222
1223            let pipe = sync::Arc::new(pipe);
1224            let pool = sync::Arc::new(pool);
1225
1226            let mut handles = Vec::with_capacity(THREADS);
1227            for tid in 0..THREADS {
1228                let pipe = sync::Arc::clone(&pipe);
1229                let pool = sync::Arc::clone(&pool);
1230
1231                handles.push(thread::spawn(move || {
1232                    let mut tickets = Vec::with_capacity(WRITES_PER_THREAD);
1233
1234                    for i in 0..WRITES_PER_THREAD {
1235                        let buffer = vec![tid as u8; BUFFER_SIZE as usize];
1236                        let allocation = prep_write(buffer.as_ptr(), 1, &pool);
1237                        let slot_index = tid * WRITES_PER_THREAD + i;
1238                        let ticket = pipe.write(WriteRequest { allocation, slot_index }).unwrap();
1239
1240                        tickets.push(ticket);
1241                    }
1242
1243                    tickets
1244                }));
1245            }
1246
1247            let mut tickets = Vec::new();
1248            for handle in handles {
1249                tickets.extend(handle.join().unwrap());
1250            }
1251            assert_eq!(tickets.len(), THREADS * WRITES_PER_THREAD,);
1252
1253            let mut epochs: Vec<u64> = tickets.iter().map(WriteTicket::epoch).collect();
1254            epochs.sort_unstable();
1255
1256            for (ed, observed) in (1u64..=epochs.len() as u64).zip(epochs.iter().copied()) {
1257                assert_eq!(ed, observed);
1258            }
1259
1260            let latest_ticket = tickets.into_iter().max_by_key(WriteTicket::epoch).unwrap();
1261            let durable_epoch = futures::executor::block_on(latest_ticket).unwrap();
1262            assert_eq!(durable_epoch, (THREADS * WRITES_PER_THREAD) as u64,);
1263        }
1264    }
1265}