Skip to main content

frozen_core/
wpipe.rs

1//! A low latency asynchronous write pipeline for buffer based storage
2//!
3//! ## Design
4//!
//! By design, every write call is fire-and-forget, i.e. the call returns immediately after
//! pushing the write request into the MPSC queue.
//!
//! The background thread drains the MPSC queue, performs an individual pwrite/pwritev call for
//! each request, and then issues a single hard sync for the whole batch. This provides durability
//! for all the writes submitted within the same [`WritePipeCfg::flush_duration`] batching window.
11//!
12//! ## Benchmarks
13//!
14//! Observed measurements for latency (both single and multi threaded),
15//!
16//! | Metric | 1 Thread (µs) | 4 Threads (µs) |
17//! |:-------|:--------------|:---------------|
18//! | P50    |         0.091 |          0.275 |
19//! | P90    |         0.092 |          0.458 |
20//! | P99    |         0.825 |          0.917 |
21//! | Mean   |         1.185 |          3.857 |
22//!
23//! Environment used for benching,
24//!
25//! * OS: NixOS (WSL2)
26//! * Architecture: x86_64
27//! * Memory: 8 GiB RAM (DDR4)
28//! * Rust: rustc 1.86.0 w/ cargo 1.86.0
29//! * Kernel: Linux 6.6.87.2-microsoft-standard-WSL2
30//! * CPU: Intel® Core™ i5-10300H @ 2.50GHz (4C / 8T)
31//!
32//! ## Example
33//!
34//! ```
35//! use frozen_core::{bufpool, ffile, utils, wpipe};
36//! use std::{ptr, sync, time};
37//!
38//! const MODULE_ID: u8 = 0x00;
39//! const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
40//!
41//! let dir = tempfile::tempdir().expect("tempdir creation should succeed");
42//! let path = dir.path().join("wpipe_example");
43//!
44//! let file_cfg = ffile::FFCfg {
45//!     path,
46//!     initial_chunk_amount: 0x400,
47//!     chunk_size: BUFFER_SIZE as usize,
48//! };
49//! let file = sync::Arc::new(
50//!     ffile::FrozenFile::new::<MODULE_ID>(file_cfg)
51//!         .expect("file creation should succeed"),
52//! );
53//!
54//! let pool_cfg = bufpool::BufPoolCfg {
55//!     buffer_size: utils::BufferSize::S128,
56//!     max_memory: 0x400 * BUFFER_SIZE as usize,
57//! };
58//! let pool = bufpool::BufPool::new(pool_cfg);
59//!
60//! let pipe_cfg = wpipe::WritePipeCfg {
61//!     module_id: MODULE_ID,
62//!     flush_duration: time::Duration::from_millis(1),
63//! };
64//! let pipe = wpipe::WritePipe::new(pipe_cfg, file)
65//!     .expect("pipe creation should succeed");
66//!
67//! let payload = [0xAAu8; BUFFER_SIZE as usize];
68//!
69//! let mut latest_ticket = None;
70//! for slot_index in 0..3 {
71//!     let allocation = pool.allocate(1);
72//!
73//!     unsafe {
74//!         ptr::copy_nonoverlapping(
75//!             payload.as_ptr(),
76//!             allocation.first(),
77//!             payload.len(),
78//!         );
79//!     }
80//!
81//!     let ticket = pipe
82//!         .write(wpipe::WriteRequest {
83//!             allocation,
84//!             slot_index,
85//!         })
86//!         .expect("write should succeed");
87//!
88//!     latest_ticket = Some(ticket);
89//! }
90//!
91//! let durable_epoch = futures::executor::block_on(
92//!     latest_ticket.expect("ticket should exist"),
93//! )
94//! .expect("writes should become durable");
95//!
96//! assert!(durable_epoch >= 3);
97//! ```
98
99use crate::{
100    bufpool,
101    error::{FrozenError, FrozenResult},
102    ffile, hints, mpscq,
103};
104use std::{
105    ptr,
106    sync::{self, atomic},
107    thread, time,
108};
109
/// All the available configurations for [`WritePipe`]
///
/// ## Example
///
/// ```
/// use frozen_core::wpipe::WritePipeCfg;
///
/// let cfg = WritePipeCfg {
///     module_id: 2,
///     flush_duration: std::time::Duration::from_millis(0x0A),
/// };
///
/// assert_ne!(cfg.module_id, 0);
/// assert_ne!(cfg.flush_duration.as_millis(), 0);
/// ```
#[derive(Debug, Clone)]
pub struct WritePipeCfg {
    /// Identifier used for error propagation by [`FrozenError`]; it is also embedded in the name
    /// of the background flush thread (`mod{module_id}_wpipe_flush_tx`)
    pub module_id: u8,

    /// Length of the batching window: the background thread waits up to this long between
    /// flushes, then writes and hard syncs every operation submitted during the last window
    pub flush_duration: time::Duration,
}
134
135/// A low latency asynchronous write pipeline for buffer based storage
136///
137/// ## Design
138///
139/// By design, every write call is fire-and-forget, i.e. the call is immediately returned after
140/// pushing the bytes to be written into the MPSC queue.
141///
142/// The background thread pulls from MPSC queue and performs indivisual pwrite/v calls and a common
143/// hard sync right after. This provides durability for all the writes submitted within the same
144/// [`WritePipeCfg::flush_duration`] batching window.
145///
146/// ## Example
147///
148/// ```
149/// use frozen_core::{bufpool, ffile, utils, wpipe};
150/// use std::{ptr, sync, time};
151///
152/// const MODULE_ID: u8 = 0x00;
153/// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
154///
155/// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
156/// let path = dir.path().join("wpipe_example");
157///
158/// let file_cfg = ffile::FFCfg {
159///     path,
160///     initial_chunk_amount: 0x400,
161///     chunk_size: BUFFER_SIZE as usize,
162/// };
163/// let file = sync::Arc::new(
164///     ffile::FrozenFile::new::<MODULE_ID>(file_cfg)
165///         .expect("file creation should succeed"),
166/// );
167///
168/// let pool_cfg = bufpool::BufPoolCfg {
169///     buffer_size: utils::BufferSize::S128,
170///     max_memory: 0x400 * BUFFER_SIZE as usize,
171/// };
172/// let pool = bufpool::BufPool::new(pool_cfg);
173///
174/// let pipe_cfg = wpipe::WritePipeCfg {
175///     module_id: MODULE_ID,
176///     flush_duration: time::Duration::from_millis(1),
177/// };
178/// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
179///     .expect("pipe creation should succeed");
180///
181/// let payload = [0xAAu8; BUFFER_SIZE as usize];
182///
183/// let mut latest_ticket = None;
184/// for slot_index in 0..3 {
185///     let allocation = pool.allocate(1);
186///
187///     unsafe {
188///         ptr::copy_nonoverlapping(
189///             payload.as_ptr(),
190///             allocation.first(),
191///             payload.len(),
192///         );
193///     }
194///
195///     let ticket = pipe
196///         .write(wpipe::WriteRequest {
197///             allocation,
198///             slot_index,
199///         })
200///         .expect("write should succeed");
201///
202///     latest_ticket = Some(ticket);
203/// }
204///
205/// let durable_epoch = futures::executor::block_on(
206///     latest_ticket.expect("ticket should exist"),
207/// )
208/// .expect("writes should become durable");
209///
210/// assert!(durable_epoch >= 3);
211/// ```
212#[derive(Debug)]
213pub struct WritePipe {
214    core: sync::Arc<Core>,
215    flush_tx_handle: Option<thread::JoinHandle<()>>,
216}
217
218unsafe impl Send for WritePipe {}
219unsafe impl Sync for WritePipe {}
220
221impl WritePipe {
222    /// Create a new instance of [`WritePipe`]
223    ///
224    /// ## Example
225    ///
226    /// ```
227    /// use frozen_core::{bufpool, ffile, utils, wpipe};
228    /// use std::{ptr, sync, time};
229    ///
230    /// const MODULE_ID: u8 = 0x00;
231    /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
232    ///
233    /// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
234    /// let path = dir.path().join("wpipe_example");
235    ///
236    /// let file_cfg = ffile::FFCfg {
237    ///     path,
238    ///     initial_chunk_amount: 0x400,
239    ///     chunk_size: BUFFER_SIZE as usize,
240    /// };
241    /// let file = sync::Arc::new(
242    ///     ffile::FrozenFile::new::<MODULE_ID>(file_cfg)
243    ///         .expect("file creation should succeed"),
244    /// );
245    ///
246    /// let pool_cfg = bufpool::BufPoolCfg {
247    ///     buffer_size: utils::BufferSize::S128,
248    ///     max_memory: 0x400 * BUFFER_SIZE as usize,
249    /// };
250    /// let pool = bufpool::BufPool::new(pool_cfg);
251    ///
252    /// let pipe_cfg = wpipe::WritePipeCfg {
253    ///     module_id: MODULE_ID,
254    ///     flush_duration: time::Duration::from_millis(1),
255    /// };
256    /// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
257    ///     .expect("pipe creation should succeed");
258    ///
259    /// let payload = vec![0x0A; BUFFER_SIZE as usize];
260    /// let allocation = pool.allocate(1);
261    ///
262    /// unsafe {ptr::copy_nonoverlapping(
263    ///     payload.as_ptr(),
264    ///     allocation.first(),
265    ///     payload.len()
266    /// )};
267    ///
268    /// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0});
269    ///
270    /// assert!(
271    ///     futures::executor::block_on(ticket.expect("ticket should exist"))
272    ///     .is_ok()
273    /// );
274    /// ```
275    #[inline]
276    pub fn new(cfg: WritePipeCfg, file: sync::Arc<ffile::FrozenFile>) -> FrozenResult<Self> {
277        let core = sync::Arc::new(Core::new(file));
278        let cloned_core = core.clone();
279        let flush_tx_handle = match thread::Builder::new()
280            .name(format!("mod{}_wpipe_flush_tx", cfg.module_id))
281            .spawn(move || bg_flush_thread(cloned_core, cfg.flush_duration))
282        {
283            Ok(handle) => Some(handle),
284            Err(observed_error) => return Err(err::new_error(cfg.module_id, err::FXE, observed_error)),
285        };
286
287        Ok(Self { core: core, flush_tx_handle })
288    }
289
290    /// Push a write into [`WritePipe`]
291    ///
292    /// Every write call is fire-and-forget for the caller by default, unless the caller choose to
293    /// wait for durability using the manual `await` on [`WriteTicket`].
294    ///
295    /// ## Example
296    ///
297    /// ```
298    /// use frozen_core::{bufpool, ffile, utils, wpipe};
299    /// use std::{ptr, sync, time};
300    ///
301    /// const MODULE_ID: u8 = 0x00;
302    /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
303    ///
304    /// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
305    /// let path = dir.path().join("wpipe_example");
306    ///
307    /// let file_cfg = ffile::FFCfg {
308    ///     path,
309    ///     initial_chunk_amount: 0x400,
310    ///     chunk_size: BUFFER_SIZE as usize,
311    /// };
312    /// let file = sync::Arc::new(
313    ///     ffile::FrozenFile::new::<MODULE_ID>(file_cfg)
314    ///         .expect("file creation should succeed"),
315    /// );
316    ///
317    /// let pool_cfg = bufpool::BufPoolCfg {
318    ///     buffer_size: utils::BufferSize::S128,
319    ///     max_memory: 0x400 * BUFFER_SIZE as usize,
320    /// };
321    /// let pool = bufpool::BufPool::new(pool_cfg);
322    ///
323    /// let pipe_cfg = wpipe::WritePipeCfg {
324    ///     module_id: MODULE_ID,
325    ///     flush_duration: time::Duration::from_millis(1),
326    /// };
327    /// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
328    ///     .expect("pipe creation should succeed");
329    ///
330    /// let payload = vec![0x0A; BUFFER_SIZE as usize];
331    /// let allocation = pool.allocate(1);
332    ///
333    /// unsafe {ptr::copy_nonoverlapping(
334    ///     payload.as_ptr(),
335    ///     allocation.first(),
336    ///     payload.len()
337    /// )};
338    ///
339    /// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0});
340    ///
341    /// assert!(
342    ///     futures::executor::block_on(ticket.expect("ticket should exist"))
343    ///     .is_ok()
344    /// );
345    /// ```
346    #[inline]
347    pub fn write(&self, request: WriteRequest) -> FrozenResult<WriteTicket> {
348        let _io_lock = self.core.acquire_shared_io_lock();
349        if let Some(frozen_error) = self.core.completion.error.get() {
350            return Err(frozen_error);
351        }
352
353        let epoch = self.core.increment_current_epoch();
354        let internal_req = WriteRequestInternal { request, epoch };
355        self.core.queue.push(internal_req);
356
357        Ok(WriteTicket { epoch, completion: self.core.completion.clone() })
358    }
359}
360
361impl Drop for WritePipe {
362    fn drop(&mut self) {
363        self.core.closed.store(true, atomic::Ordering::Release);
364        self.core.flush_cv.notify_one();
365
366        if let Some(handle) = self.flush_tx_handle.take() {
367            let _ = handle.join();
368        }
369    }
370}
371
372/// A write operation submitted to [`WritePipe`]
373///
374/// The request contains the buffers to persist along with the destination slot index in the
375/// underlying [`FrozenFile`].
376///
377/// ## Example
378///
379/// ```
380/// use frozen_core::{bufpool, utils, wpipe};
381/// use std::ptr;
382///
383/// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
384///
385/// let pool_cfg = bufpool::BufPoolCfg {
386///     buffer_size: utils::BufferSize::S128,
387///     max_memory: 0x400 * BUFFER_SIZE as usize,
388/// };
389/// let pool = bufpool::BufPool::new(pool_cfg);
390///
391/// let payload = vec![0x0A; BUFFER_SIZE as usize];
392/// let allocation = pool.allocate(1);
393///
394/// unsafe {ptr::copy_nonoverlapping(
395///     payload.as_ptr(),
396///     allocation.first(),
397///     payload.len()
398/// )};
399///
400/// let request = wpipe::WriteRequest {allocation, slot_index: 0};
401/// assert!(request.slot_index >= 0);
402/// ```
403#[derive(Debug)]
404pub struct WriteRequest {
405    /// Buffer allocation containing the pages to be written allocated using [`bufpool::BufPool`]
406    ///
407    /// ## Example
408    ///
409    /// ```
410    /// use frozen_core::{bufpool, utils, wpipe};
411    /// use std::ptr;
412    ///
413    /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
414    ///
415    /// let pool_cfg = bufpool::BufPoolCfg {
416    ///     buffer_size: utils::BufferSize::S128,
417    ///     max_memory: 0x400 * BUFFER_SIZE as usize,
418    /// };
419    /// let pool = bufpool::BufPool::new(pool_cfg);
420    ///
421    /// let payload = vec![0x0A; BUFFER_SIZE as usize];
422    /// let allocation = pool.allocate(1);
423    ///
424    /// unsafe {ptr::copy_nonoverlapping(
425    ///     payload.as_ptr(),
426    ///     allocation.first(),
427    ///     payload.len()
428    /// )};
429    ///
430    /// let request = wpipe::WriteRequest {allocation, slot_index: 0};
431    /// assert!(!request.allocation.first().is_null());
432    /// ```
433    pub allocation: bufpool::BufPoolAllocation,
434
435    /// Destination slot index where the pages of the allocation will be written from
436    ///
437    /// ## Example
438    ///
439    /// ```
440    /// use frozen_core::{bufpool, utils, wpipe};
441    /// use std::ptr;
442    ///
443    /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
444    ///
445    /// let pool_cfg = bufpool::BufPoolCfg {
446    ///     buffer_size: utils::BufferSize::S128,
447    ///     max_memory: 0x400 * BUFFER_SIZE as usize,
448    /// };
449    /// let pool = bufpool::BufPool::new(pool_cfg);
450    ///
451    /// let payload = vec![0x0A; BUFFER_SIZE as usize];
452    /// let allocation = pool.allocate(1);
453    ///
454    /// unsafe {ptr::copy_nonoverlapping(
455    ///     payload.as_ptr(),
456    ///     allocation.first(),
457    ///     payload.len()
458    /// )};
459    ///
460    /// let request = wpipe::WriteRequest {allocation, slot_index: 0};
461    /// assert!(request.slot_index >= 0);
462    /// ```
463    pub slot_index: usize,
464}
465
466/// Durability handle associated with the submitted write operation via [`WritePipe::write`]
467///
468/// ## Epoch
469///
470/// Every ticket is assigned a monotonically increasing epoch to moniter durability
471///
472/// ## Durability Guarantee
473///
474/// If wanted, the ticket could be awaited to poll till the epoch becomes durable.
475///
476/// Once a await on ticket is completed successfully, all writes assigned to earlier epochs are
477/// also guaranteed to be durable.
478///
479/// *NOTE:* Using `await` is optional. Callers that only require fire-and-forget semantics may
480/// simply discard the returned ticket.
481///
482/// ## Example
483///
484/// ```
485/// use frozen_core::{bufpool, ffile, utils, wpipe};
486/// use std::{ptr, sync, time};
487///
488/// const MODULE_ID: u8 = 0x00;
489/// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
490///
491/// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
492/// let path = dir.path().join("wpipe_example");
493///
494/// let file_cfg = ffile::FFCfg {
495///     path,
496///     initial_chunk_amount: 0x400,
497///     chunk_size: BUFFER_SIZE as usize,
498/// };
499/// let file = sync::Arc::new(
500///     ffile::FrozenFile::new::<MODULE_ID>(file_cfg)
501///         .expect("file creation should succeed"),
502/// );
503///
504/// let pool_cfg = bufpool::BufPoolCfg {
505///     buffer_size: utils::BufferSize::S128,
506///     max_memory: 0x400 * BUFFER_SIZE as usize,
507/// };
508/// let pool = bufpool::BufPool::new(pool_cfg);
509///
510/// let pipe_cfg = wpipe::WritePipeCfg {
511///     module_id: MODULE_ID,
512///     flush_duration: time::Duration::from_millis(1),
513/// };
514/// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
515///     .expect("pipe creation should succeed");
516///
517/// let payload = vec![0x0A; BUFFER_SIZE as usize];
518/// let allocation = pool.allocate(1);
519///
520/// unsafe {ptr::copy_nonoverlapping(
521///     payload.as_ptr(),
522///     allocation.first(),
523///     payload.len()
524/// )};
525///
526/// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0})
527///     .unwrap();
528/// let epoch = ticket.epoch();
529///
530/// assert!(epoch > 0);
531/// ```
532#[derive(Debug)]
533pub struct WriteTicket {
534    epoch: u64,
535    completion: sync::Arc<Completion>,
536}
537
538impl WriteTicket {
539    /// Read assigned durability epoch for the [`WriteTicket`]
540    ///
541    /// ## Example
542    ///
543    /// ```
544    /// use frozen_core::{bufpool, ffile, utils, wpipe};
545    /// use std::{ptr, sync, time};
546    ///
547    /// const MODULE_ID: u8 = 0x00;
548    /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
549    ///
550    /// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
551    /// let path = dir.path().join("wpipe_example");
552    ///
553    /// let file_cfg = ffile::FFCfg {
554    ///     path,
555    ///     initial_chunk_amount: 0x400,
556    ///     chunk_size: BUFFER_SIZE as usize,
557    /// };
558    /// let file = sync::Arc::new(
559    ///     ffile::FrozenFile::new::<MODULE_ID>(file_cfg)
560    ///         .expect("file creation should succeed"),
561    /// );
562    ///
563    /// let pool_cfg = bufpool::BufPoolCfg {
564    ///     buffer_size: utils::BufferSize::S128,
565    ///     max_memory: 0x400 * BUFFER_SIZE as usize,
566    /// };
567    /// let pool = bufpool::BufPool::new(pool_cfg);
568    ///
569    /// let pipe_cfg = wpipe::WritePipeCfg {
570    ///     module_id: MODULE_ID,
571    ///     flush_duration: time::Duration::from_millis(1),
572    /// };
573    /// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
574    ///     .expect("pipe creation should succeed");
575    ///
576    /// let payload = vec![0x0A; BUFFER_SIZE as usize];
577    /// let allocation = pool.allocate(1);
578    ///
579    /// unsafe {ptr::copy_nonoverlapping(
580    ///     payload.as_ptr(),
581    ///     allocation.first(),
582    ///     payload.len()
583    /// )};
584    ///
585    /// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0})
586    ///     .unwrap();
587    /// let epoch = ticket.epoch();
588    ///
589    /// assert!(epoch > 0);
590    /// ```
591    pub const fn epoch(&self) -> u64 {
592        self.epoch
593    }
594
595    #[inline]
596    fn is_ready(&self) -> bool {
597        let durable = self.completion.durable_epoch.load(atomic::Ordering::Acquire);
598        if hints::likely(durable >= self.epoch) {
599            return true;
600        }
601
602        false
603    }
604}
605
606impl std::future::Future for WriteTicket {
607    type Output = FrozenResult<u64>;
608
609    #[inline(always)]
610    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
611        if self.is_ready() {
612            return std::task::Poll::Ready(Ok(self.epoch));
613        }
614
615        if let Some(frozen_error) = self.completion.error.get() {
616            return std::task::Poll::Ready(Err(frozen_error));
617        }
618
619        self.completion.waker.register(cx.waker());
620
621        if self.is_ready() {
622            return std::task::Poll::Ready(Ok(self.epoch));
623        }
624
625        if let Some(frozen_error) = self.completion.error.get() {
626            return std::task::Poll::Ready(Err(frozen_error));
627        }
628
629        std::task::Poll::Pending
630    }
631}
632
633/// ## Why we ignore [`std::sync::PoisonError`]?
634///
635/// The mutex used for lock, is solely used as a parking primitive for [`Condvar`] and does not protect any mutable
636/// state. All the pool invariants and accounting are maintained via atomics and are completely seperated from
637/// the mutex.
638///
639/// A poisoned mutex only indicates that another tx panicked while holding the lock, and indicates an inconsistent
640/// state of the protected value. Since no state can be left partially modified under this lock, there is no
641/// possible consistency risk to recover from and propagating the poison error would only introduce unnecessary
642/// failures into the allocation path.
643///
644/// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating with the recovered
645/// guard.
646fn bg_flush_thread(core: sync::Arc<Core>, flush_duration: time::Duration) {
647    let mut guard = core.flush_guard.lock().unwrap_or_else(|e| e.into_inner());
648    loop {
649        (guard, _) = core.flush_cv.wait_timeout(guard, flush_duration).unwrap_or_else(|e| e.into_inner());
650
651        // NOTE: we must read values of close brodcast before acquire exclusive lock, if done
652        // otherwise, we impose serious deadlock sort of situation for the the flusher tx
653
654        let queued_ops = core.queue.drain();
655        let closed = core.closed.load(atomic::Ordering::Acquire);
656
657        if queued_ops.is_empty() {
658            if closed {
659                return;
660            }
661
662            continue;
663        }
664
665        // INFO: we must acquire an exclusive IO lock for sync, hence no write/read ops are allowed
666        // while sync is in progress
667        let _io_lock = core.acquire_exclusive_io_lock();
668
669        let (_min_index, _max_index, max_epoch) = match core.write_queued_ops(queued_ops) {
670            Ok(res) => res,
671            Err(new_error) => {
672                core.completion.error.set(new_error);
673                core.completion.waker.wake();
674                drop(_io_lock);
675
676                continue;
677            }
678        };
679
680        // NOTE: On linux, we can initiate writeback (best-effort only) for a given range
681        #[cfg(target_os = "linux")]
682        if let Err(new_error) = core.file.sync_range(_min_index, _max_index - _min_index) {
683            core.completion.error.set(new_error);
684            core.completion.waker.wake();
685            drop(_io_lock);
686
687            continue;
688        }
689
690        // NOTE: If the sync fails, we update the Core::error w/ the received error object. We
691        // clear it up when another call succeeds.
692        //
693        // This is valid as the underlying sync flushes entire batch all at once, hence even if the
694        // last call failed, and the new one succeeded, we do get the durability guarantee for the
695        // old data as well.
696
697        if let Err(new_error) = core.file.sync() {
698            core.completion.error.set(new_error);
699            core.completion.waker.wake();
700            drop(_io_lock);
701
702            continue;
703        } else {
704            core.completion.mark_epoch_as_durable(max_epoch);
705            core.completion.error.del();
706        }
707    }
708}
709
710#[derive(Debug)]
711struct Core {
712    completion: sync::Arc<Completion>,
713    closed: atomic::AtomicBool,
714    epoch: atomic::AtomicU64,
715    file: sync::Arc<ffile::FrozenFile>,
716    flush_cv: sync::Condvar,
717    flush_guard: sync::Mutex<()>,
718    io_lock: sync::RwLock<()>,
719    queue: mpscq::MPSCQueue<WriteRequestInternal>,
720}
721
722impl Core {
723    fn new(file: sync::Arc<ffile::FrozenFile>) -> Self {
724        Self {
725            file,
726            completion: sync::Arc::new(Completion::default()),
727            closed: atomic::AtomicBool::new(false),
728            epoch: atomic::AtomicU64::new(0),
729            flush_cv: sync::Condvar::new(),
730            flush_guard: sync::Mutex::new(()),
731            io_lock: sync::RwLock::new(()),
732            queue: mpscq::MPSCQueue::default(),
733        }
734    }
735
736    #[inline]
737    fn acquire_shared_io_lock(&self) -> sync::RwLockReadGuard<'_, ()> {
738        // NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
739        self.io_lock.read().unwrap_or_else(|e| e.into_inner())
740    }
741
742    #[inline]
743    fn acquire_exclusive_io_lock(&self) -> sync::RwLockWriteGuard<'_, ()> {
744        // NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
745        self.io_lock.write().unwrap_or_else(|e| e.into_inner())
746    }
747
748    #[inline(always)]
749    fn write_queued_ops(&self, queued_ops: Vec<WriteRequestInternal>) -> FrozenResult<(usize, usize, u64)> {
750        let mut max_epoch = 0;
751        let mut max_index = 0;
752        let mut min_index = usize::MAX;
753
754        for op in queued_ops {
755            let ops_len = op.request.allocation.length();
756            match ops_len {
757                1 => {
758                    self.file.pwrite(op.request.allocation.first(), op.request.slot_index)?;
759                }
760                _ => {
761                    let bufs: Vec<bufpool::BufferPointer> = op.request.allocation.iter().collect();
762                    self.file.pwritev(&bufs, op.request.slot_index)?;
763                }
764            }
765
766            max_epoch = max_epoch.max(op.epoch);
767            min_index = min_index.min(op.request.slot_index);
768            max_index = max_index.max(op.request.slot_index + ops_len);
769        }
770
771        Ok((min_index, max_index, max_epoch))
772    }
773
774    #[inline]
775    fn increment_current_epoch(&self) -> u64 {
776        self.epoch.fetch_add(1, atomic::Ordering::AcqRel).wrapping_add(1)
777    }
778}
779
780#[derive(Debug)]
781struct WriteRequestInternal {
782    epoch: u64,
783    request: WriteRequest,
784}
785
786#[derive(Debug)]
787struct Completion {
788    durable_epoch: atomic::AtomicU64,
789    error: FlushError,
790    waker: futures::task::AtomicWaker,
791}
792
793impl Default for Completion {
794    fn default() -> Self {
795        Self {
796            durable_epoch: atomic::AtomicU64::new(0),
797            waker: futures::task::AtomicWaker::new(),
798            error: FlushError::default(),
799        }
800    }
801}
802
803impl Completion {
804    fn mark_epoch_as_durable(&self, epoch: u64) {
805        self.durable_epoch.store(epoch, atomic::Ordering::Release);
806        self.waker.wake();
807    }
808}
809
810#[derive(Debug)]
811struct FlushError(atomic::AtomicPtr<FrozenError>);
812
813impl Default for FlushError {
814    fn default() -> Self {
815        Self(atomic::AtomicPtr::new(ptr::null_mut()))
816    }
817}
818
819impl Drop for FlushError {
820    fn drop(&mut self) {
821        let err_ptr = self.0.load(atomic::Ordering::Acquire);
822        if !err_ptr.is_null() {
823            let _ = unsafe { Box::from_raw(err_ptr) };
824        }
825    }
826}
827
828impl FlushError {
829    #[inline]
830    fn get(&self) -> Option<FrozenError> {
831        let curr_err = self.0.load(atomic::Ordering::Acquire);
832        if hints::unlikely(!curr_err.is_null()) {
833            let frozen_error = unsafe { (*curr_err).clone() };
834            return Some(frozen_error);
835        }
836
837        None
838    }
839
840    #[inline]
841    fn set(&self, new_error: FrozenError) {
842        let boxed_error = Box::into_raw(Box::new(new_error));
843        let old_err = self.0.swap(boxed_error, atomic::Ordering::AcqRel);
844
845        if hints::unlikely(!old_err.is_null()) {
846            let _ = unsafe { Box::from_raw(old_err) };
847        }
848    }
849
850    #[inline]
851    fn del(&self) {
852        let old_err = self.0.swap(ptr::null_mut(), atomic::Ordering::AcqRel);
853        if hints::unlikely(!old_err.is_null()) {
854            let _ = unsafe { Box::from_raw(old_err) };
855        }
856    }
857}
858
859mod err {
860    use crate::error::{ErrCode, FrozenError};
861
862    /// Domain ID for [`wpipe`] module is `0x02` used while propagating errors
863    const DOMAIN_ID: u8 = 0x02;
864
865    #[inline]
866    pub fn new_error<E: std::fmt::Display>(module_id: u8, code: ErrCode, observed_error: E) -> FrozenError {
867        FrozenError::new_raw(module_id, DOMAIN_ID, code, observed_error)
868    }
869
870    pub const FXE: ErrCode = ErrCode::new(0x10, "unable to spawn background flush thread");
871}
872
873#[cfg(test)]
874mod tests {
875    use super::*;
876    use crate::utils::BufferSize;
877
878    const MODULE_ID: u8 = 0x00;
879    const BUFFER_SIZE: BufferSize = BufferSize::S128;
880    const INITIAL_BUFFER_AMOUT: usize = 0x200;
881    const FLUSH_DURATION: time::Duration = time::Duration::from_millis(1);
882
883    fn new_objects<P: AsRef<std::path::Path>>(path: P) -> (sync::Arc<ffile::FrozenFile>, bufpool::BufPool, WritePipe) {
884        let file_cfg = ffile::FFCfg {
885            path: path.as_ref().to_path_buf(),
886            chunk_size: BUFFER_SIZE as usize,
887            initial_chunk_amount: INITIAL_BUFFER_AMOUT,
888        };
889        let file = sync::Arc::new(ffile::FrozenFile::new::<MODULE_ID>(file_cfg).unwrap());
890
891        let pool_cfg =
892            bufpool::BufPoolCfg { buffer_size: BUFFER_SIZE, max_memory: INITIAL_BUFFER_AMOUT * BUFFER_SIZE as usize };
893        let pool = bufpool::BufPool::new(pool_cfg);
894
895        let pipe_cfg = WritePipeCfg { module_id: MODULE_ID, flush_duration: FLUSH_DURATION };
896        let pipe = WritePipe::new(pipe_cfg, file.clone()).unwrap();
897
898        (file, pool, pipe)
899    }
900
901    fn prep_write(buf_ptr: *const u8, n: usize, pool: &bufpool::BufPool) -> bufpool::BufPoolAllocation {
902        let allocation = pool.allocate(n);
903        for allocated_buf in allocation.iter() {
904            unsafe { ptr::copy_nonoverlapping(buf_ptr, allocated_buf, BUFFER_SIZE as usize) };
905        }
906
907        allocation
908    }
909
910    fn compare_with_readback(
911        buf: &[u8],
912        read_index: usize,
913        required: usize,
914        pool: &bufpool::BufPool,
915        file: &ffile::FrozenFile,
916    ) {
917        let read_allocation = pool.allocate(required);
918        let read_bufs: Vec<bufpool::BufferPointer> = read_allocation.iter().collect();
919
920        file.preadv(&read_bufs, read_index).unwrap();
921
922        for read_buf in read_allocation.iter() {
923            let observed = unsafe { std::slice::from_raw_parts(read_buf, BUFFER_SIZE as usize) };
924            assert_eq!(buf, observed);
925        }
926    }
927
928    mod lifecycle {
929        use super::*;
930
931        #[test]
932        fn ok_new() {
933            let dir = tempfile::tempdir().unwrap();
934            let path = dir.path().join("write_single");
935
936            let file_cfg =
937                ffile::FFCfg { path, chunk_size: BUFFER_SIZE as usize, initial_chunk_amount: INITIAL_BUFFER_AMOUT };
938            let file = sync::Arc::new(ffile::FrozenFile::new::<MODULE_ID>(file_cfg).unwrap());
939
940            let pipe_cfg = WritePipeCfg { module_id: MODULE_ID, flush_duration: FLUSH_DURATION };
941            assert!(WritePipe::new(pipe_cfg, file).is_ok());
942        }
943
944        #[test]
945        fn ok_drop() {
946            let dir = tempfile::tempdir().unwrap();
947            let path = dir.path().join("write_single");
948            let (_file, _, pipe) = new_objects(path);
949
950            drop(pipe);
951        }
952    }
953
954    mod shutdown {
955        use super::*;
956
957        #[test]
958        fn ok_drop_before_pending_write_call() {
959            let dir = tempfile::tempdir().unwrap();
960            let path = dir.path().join("write_single");
961            let (_file, pool, pipe) = new_objects(path);
962
963            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];
964
965            let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
966            let request = WriteRequest { allocation, slot_index: 0 };
967
968            assert!(pipe.write(request).is_ok());
969            drop(pipe);
970        }
971
972        #[test]
973        fn ok_drop_waits_for_pending_write_call() {
974            let dir = tempfile::tempdir().unwrap();
975            let path = dir.path().join("write_single");
976
977            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];
978
979            // new + write + drop
980            {
981                let (_file, pool, pipe) = new_objects(path.clone());
982
983                let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
984                let request = WriteRequest { allocation, slot_index: 0 };
985
986                assert!(pipe.write(request).is_ok());
987                drop(pipe);
988            }
989
990            // open + readback
991            {
992                let (file, pool, _) = new_objects(path);
993                compare_with_readback(&BUFFER, 0, 1, &pool, &file);
994            }
995        }
996
997        #[test]
998        fn ok_drop_does_not_deadlock_when_multiple_pending_writes() {
999            let dir = tempfile::tempdir().unwrap();
1000            let path = dir.path().join("write_single");
1001            let (_file, pool, pipe) = new_objects(path);
1002
1003            for i in 0..INITIAL_BUFFER_AMOUT {
1004                let buffer = vec![i as u8; BUFFER_SIZE as usize];
1005                let allocation = prep_write(buffer.as_ptr(), 1, &pool);
1006                let request = WriteRequest { allocation, slot_index: 0 };
1007
1008                assert!(pipe.write(request).is_ok());
1009            }
1010
1011            drop(pipe);
1012        }
1013
1014        #[test]
1015        fn ok_drop_correctly_waits_for_pending_write_with_multi_threads() {
1016            let dir = tempfile::tempdir().unwrap();
1017            let path = dir.path().join("write_single");
1018            let (_file, pool, pipe) = new_objects(path);
1019
1020            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];
1021
1022            let pipe = sync::Arc::new(pipe);
1023            let pipe2 = sync::Arc::clone(&pipe);
1024
1025            let handle = thread::spawn(move || {
1026                let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
1027                let request = WriteRequest { allocation, slot_index: 0 };
1028
1029                assert!(pipe2.write(request).is_ok());
1030            });
1031
1032            drop(pipe);
1033            handle.join().unwrap();
1034        }
1035    }
1036
1037    mod pipe_writes {
1038        use super::*;
1039
1040        #[test]
1041        fn ok_write() {
1042            let dir = tempfile::tempdir().unwrap();
1043            let path = dir.path().join("write_single");
1044            let (_file, pool, pipe) = new_objects(path);
1045
1046            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];
1047            let allocation = prep_write(BUFFER.as_ptr(), 0x0A, &pool);
1048
1049            let request = WriteRequest { allocation, slot_index: 0 };
1050            assert!(pipe.write(request).is_ok());
1051        }
1052
1053        #[test]
1054        fn ok_write_epoch_is_monotonic() {
1055            let dir = tempfile::tempdir().unwrap();
1056            let path = dir.path().join("write_single");
1057            let (_file, pool, pipe) = new_objects(path);
1058
1059            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];
1060
1061            let allocation1 = prep_write(BUFFER.as_ptr(), 1, &pool);
1062            let ticket1 = pipe.write(WriteRequest { allocation: allocation1, slot_index: 0 }).unwrap();
1063
1064            let allocation2 = prep_write(BUFFER.as_ptr(), 1, &pool);
1065            let ticket2 = pipe.write(WriteRequest { allocation: allocation2, slot_index: 1 }).unwrap();
1066
1067            let allocation3 = prep_write(BUFFER.as_ptr(), 1, &pool);
1068            let ticket3 = pipe.write(WriteRequest { allocation: allocation3, slot_index: 2 }).unwrap();
1069
1070            assert!(ticket3.epoch() > ticket2.epoch());
1071            assert!(ticket2.epoch() > ticket1.epoch());
1072        }
1073    }
1074
1075    mod write_ticket {
1076        use super::*;
1077
1078        #[test]
1079        fn ok_readback_after_write_with_await() {
1080            let dir = tempfile::tempdir().unwrap();
1081            let path = dir.path().join("write_single");
1082            let (file, pool, pipe) = new_objects(path);
1083
1084            const REQUIRED: usize = 0x0A;
1085            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];
1086
1087            let write_allocation = prep_write(BUFFER.as_ptr(), REQUIRED, &pool);
1088            let request = WriteRequest { allocation: write_allocation, slot_index: 0 };
1089
1090            let ticket = pipe.write(request).unwrap();
1091            let ticket_epoch = ticket.epoch();
1092
1093            let durable_epoch = futures::executor::block_on(ticket).unwrap();
1094            assert!(durable_epoch >= ticket_epoch);
1095
1096            compare_with_readback(&BUFFER, 0, REQUIRED, &pool, &file);
1097        }
1098
1099        #[test]
1100        fn ok_readback_after_batch_write() {
1101            let dir = tempfile::tempdir().unwrap();
1102            let path = dir.path().join("write_single");
1103            let (file, pool, pipe) = new_objects(path);
1104
1105            const BUFFERS: [([u8; BUFFER_SIZE as usize], usize); 5] = [
1106                ([0x0Au8; BUFFER_SIZE as usize], 0x1A),
1107                ([0x0Bu8; BUFFER_SIZE as usize], 0x1B),
1108                ([0x0Cu8; BUFFER_SIZE as usize], 0x1C),
1109                ([0x0Du8; BUFFER_SIZE as usize], 0x1D),
1110                ([0x0Eu8; BUFFER_SIZE as usize], 0x1E),
1111            ];
1112
1113            let mut slot_index = 0;
1114            let mut latest_ticket = None;
1115
1116            for (buf, required) in BUFFERS {
1117                let allocation = prep_write(buf.as_ptr(), required, &pool);
1118                let request = WriteRequest { allocation, slot_index };
1119                let ticket = pipe.write(request).unwrap();
1120
1121                slot_index += required;
1122                latest_ticket = Some(ticket);
1123            }
1124
1125            assert!(latest_ticket.is_some());
1126
1127            if let Some(ticket) = latest_ticket {
1128                let ticket_epoch = ticket.epoch();
1129                let durable_epoch = futures::executor::block_on(ticket).unwrap();
1130
1131                assert!(durable_epoch >= ticket_epoch);
1132            }
1133
1134            let mut read_index = 0;
1135            for (buf, required) in BUFFERS {
1136                compare_with_readback(&buf, read_index, required, &pool, &file);
1137                read_index += required;
1138            }
1139        }
1140
1141        #[test]
1142        fn ok_multiple_concurrent_awaits() {
1143            let dir = tempfile::tempdir().unwrap();
1144            let path = dir.path().join("write_single");
1145            let (_file, pool, pipe) = new_objects(path);
1146
1147            const REQUIRED: usize = 0x0A;
1148            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];
1149
1150            let allocation1 = prep_write(BUFFER.as_ptr(), REQUIRED, &pool);
1151            let ticket1 = pipe.write(WriteRequest { allocation: allocation1, slot_index: 0 }).unwrap();
1152
1153            let allocation2 = prep_write(BUFFER.as_ptr(), REQUIRED, &pool);
1154            let ticket2 = pipe.write(WriteRequest { allocation: allocation2, slot_index: 0 }).unwrap();
1155
1156            let (e1, e2) = futures::executor::block_on(async { futures::join!(ticket1, ticket2) });
1157
1158            assert!(e1.is_ok());
1159            assert!(e2.is_ok());
1160            assert!(e2.unwrap() > e1.unwrap());
1161        }
1162
1163        #[test]
1164        fn ok_awaiting_last_ticket_implies_previous_writes_are_durable() {
1165            let dir = tempfile::tempdir().unwrap();
1166            let path = dir.path().join("durability_boundary");
1167
1168            let (file, pool, pipe) = new_objects(path);
1169
1170            const BUFFER_A: [u8; BUFFER_SIZE as usize] = [0xAA; BUFFER_SIZE as usize];
1171            const BUFFER_B: [u8; BUFFER_SIZE as usize] = [0xBB; BUFFER_SIZE as usize];
1172            const BUFFER_C: [u8; BUFFER_SIZE as usize] = [0xCC; BUFFER_SIZE as usize];
1173
1174            let alloc_a = prep_write(BUFFER_A.as_ptr(), 1, &pool);
1175            let ticket_a = pipe.write(WriteRequest { allocation: alloc_a, slot_index: 0 }).unwrap();
1176
1177            let alloc_b = prep_write(BUFFER_B.as_ptr(), 1, &pool);
1178            let ticket_b = pipe.write(WriteRequest { allocation: alloc_b, slot_index: 1 }).unwrap();
1179
1180            let alloc_c = prep_write(BUFFER_C.as_ptr(), 1, &pool);
1181            let ticket_c = pipe.write(WriteRequest { allocation: alloc_c, slot_index: 2 }).unwrap();
1182
1183            let epoch_a = ticket_a.epoch();
1184            let epoch_b = ticket_b.epoch();
1185            let epoch_c = ticket_c.epoch();
1186
1187            let durable_epoch = futures::executor::block_on(ticket_c).unwrap();
1188            assert!(durable_epoch >= epoch_c);
1189            assert!(durable_epoch >= epoch_b);
1190            assert!(durable_epoch >= epoch_a);
1191
1192            compare_with_readback(&BUFFER_A, 0, 1, &pool, &file);
1193            compare_with_readback(&BUFFER_B, 1, 1, &pool, &file);
1194            compare_with_readback(&BUFFER_C, 2, 1, &pool, &file);
1195        }
1196    }
1197
1198    mod concurrency {
1199        use super::*;
1200
1201        #[test]
1202        fn ok_multi_threaded_writers() {
1203            const THREADS: usize = 4;
1204            const WRITES_PER_THREAD: usize = 0x40;
1205            const _: () = assert!(THREADS * WRITES_PER_THREAD < INITIAL_BUFFER_AMOUT);
1206
1207            let dir = tempfile::tempdir().unwrap();
1208            let path = dir.path().join("multi_threaded_writers");
1209
1210            let (_file, pool, pipe) = new_objects(path);
1211
1212            let pipe = sync::Arc::new(pipe);
1213            let pool = sync::Arc::new(pool);
1214
1215            let mut handles = Vec::with_capacity(THREADS);
1216            for tid in 0..THREADS {
1217                let pipe = sync::Arc::clone(&pipe);
1218                let pool = sync::Arc::clone(&pool);
1219
1220                handles.push(thread::spawn(move || {
1221                    let mut tickets = Vec::with_capacity(WRITES_PER_THREAD);
1222
1223                    for i in 0..WRITES_PER_THREAD {
1224                        let buffer = vec![tid as u8; BUFFER_SIZE as usize];
1225                        let allocation = prep_write(buffer.as_ptr(), 1, &pool);
1226                        let slot_index = tid * WRITES_PER_THREAD + i;
1227                        let ticket = pipe.write(WriteRequest { allocation, slot_index }).unwrap();
1228
1229                        tickets.push(ticket);
1230                    }
1231
1232                    tickets
1233                }));
1234            }
1235
1236            let mut tickets = Vec::new();
1237            for handle in handles {
1238                tickets.extend(handle.join().unwrap());
1239            }
1240            assert_eq!(tickets.len(), THREADS * WRITES_PER_THREAD,);
1241
1242            let mut epochs: Vec<u64> = tickets.iter().map(WriteTicket::epoch).collect();
1243            epochs.sort_unstable();
1244
1245            for (ed, observed) in (1u64..=epochs.len() as u64).zip(epochs.iter().copied()) {
1246                assert_eq!(ed, observed);
1247            }
1248
1249            let latest_ticket = tickets.into_iter().max_by_key(WriteTicket::epoch).unwrap();
1250            let durable_epoch = futures::executor::block_on(latest_ticket).unwrap();
1251            assert_eq!(durable_epoch, (THREADS * WRITES_PER_THREAD) as u64,);
1252        }
1253    }
1254}