Skip to main content

frozen_core/
wpipe.rs

1//! A low latency asynchronous write pipeline for buffer based storage
2//!
3//! ## Design
4//!
//! By design, every write call is fire-and-forget, i.e. the call returns immediately after
//! pushing the request onto the MPSC queue.
//!
//! The background thread pulls from the MPSC queue and performs individual pwrite/pwritev calls,
//! followed by a single hard sync. This provides durability for all the writes submitted within
//! the same [`WritePipeCfg::flush_duration`] batching window.
11//!
12//! ## Benchmarks
13//!
//! Observed latency measurements (single and multi threaded):
15//!
16//! | Metric | 1 Thread (µs) | 4 Threads (µs) |
17//! |:-------|:--------------|:---------------|
18//! | P50    |         0.091 |          0.275 |
19//! | P90    |         0.092 |          0.458 |
20//! | P99    |         0.825 |          0.917 |
21//! | Mean   |         1.185 |          3.857 |
22//!
//! Environment used for benchmarking:
24//!
25//! * OS: NixOS (WSL2)
26//! * Architecture: x86_64
27//! * Memory: 8 GiB RAM (DDR4)
28//! * Rust: rustc 1.86.0 w/ cargo 1.86.0
29//! * Kernel: Linux 6.6.87.2-microsoft-standard-WSL2
30//! * CPU: Intel® Core™ i5-10300H @ 2.50GHz (4C / 8T)
31//!
32//! ## Example
33//!
34//! ```
35//! use frozen_core::{bufpool, ffile, utils, wpipe};
36//! use std::{ptr, sync, time};
37//!
38//! const MODULE_ID: u8 = 0x00;
39//! const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
40//!
41//! let dir = tempfile::tempdir().expect("tempdir creation should succeed");
42//! let path = dir.path().join("wpipe_example");
43//!
44//! let file_cfg = ffile::FrozenFileCfg {
45//!     path,
46//!     module_id: MODULE_ID,
47//!     initial_available_buffers: 0x400,
48//!     buffer_size: BUFFER_SIZE as usize,
49//! };
50//! let file = sync::Arc::new(
51//!     ffile::FrozenFile::new(file_cfg)
52//!         .expect("file creation should succeed"),
53//! );
54//!
55//! let pool_cfg = bufpool::BufPoolCfg {
56//!     buffer_size: utils::BufferSize::S128,
57//!     max_memory: 0x400 * BUFFER_SIZE as usize,
58//! };
59//! let pool = bufpool::BufPool::new(pool_cfg);
60//!
61//! let pipe_cfg = wpipe::WritePipeCfg {
62//!     module_id: MODULE_ID,
63//!     flush_duration: time::Duration::from_millis(1),
64//! };
65//! let pipe = wpipe::WritePipe::new(pipe_cfg, file)
66//!     .expect("pipe creation should succeed");
67//!
68//! let payload = [0xAAu8; BUFFER_SIZE as usize];
69//!
70//! let mut latest_ticket = None;
71//! for slot_index in 0..3 {
72//!     let allocation = pool.allocate(1);
73//!
74//!     unsafe {
75//!         ptr::copy_nonoverlapping(
76//!             payload.as_ptr(),
77//!             allocation.first(),
78//!             payload.len(),
79//!         );
80//!     }
81//!
82//!     let ticket = pipe
83//!         .write(wpipe::WriteRequest {
84//!             allocation,
85//!             slot_index,
86//!         })
87//!         .expect("write should succeed");
88//!
89//!     latest_ticket = Some(ticket);
90//! }
91//!
92//! let durable_epoch = futures::executor::block_on(
93//!     latest_ticket.expect("ticket should exist"),
94//! )
95//! .expect("writes should become durable");
96//!
97//! assert!(durable_epoch >= 3);
98//! ```
99
100use crate::{
101    bufpool,
102    error::{FrozenError, FrozenResult},
103    ffile, hints, mpscq,
104};
105use std::{
106    ptr,
107    sync::{self, atomic},
108    thread, time,
109};
110
/// All the available configurations for [`WritePipe`]
///
/// ## Example
///
/// ```
/// use frozen_core::wpipe::WritePipeCfg;
///
/// let cfg = WritePipeCfg {
///     module_id: 2,
///     flush_duration: std::time::Duration::from_millis(0x0A),
/// };
///
/// assert_ne!(cfg.module_id, 0);
/// assert_ne!(cfg.flush_duration.as_millis(), 0);
/// ```
#[derive(Debug, Clone)]
pub struct WritePipeCfg {
    /// Identifier used for error propagation by [`FrozenError`]; it is also embedded in the name
    /// of the background flush thread (`mod{module_id}_wpipe_flush_tx`)
    pub module_id: u8,

    /// Time interval used by the background thread to perform a hard sync for all the write
    /// operations submitted in the last durability window
    pub flush_duration: time::Duration,
}
135
136/// A low latency asynchronous write pipeline for buffer based storage
137///
138/// ## Design
139///
140/// By design, every write call is fire-and-forget, i.e. the call is immediately returned after
141/// pushing the bytes to be written into the MPSC queue.
142///
143/// The background thread pulls from MPSC queue and performs indivisual pwrite/v calls and a common
144/// hard sync right after. This provides durability for all the writes submitted within the same
145/// [`WritePipeCfg::flush_duration`] batching window.
146///
147/// ## Example
148///
149/// ```
150/// use frozen_core::{bufpool, ffile, utils, wpipe};
151/// use std::{ptr, sync, time};
152///
153/// const MODULE_ID: u8 = 0x00;
154/// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
155///
156/// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
157/// let path = dir.path().join("wpipe_example");
158///
159/// let file_cfg = ffile::FrozenFileCfg {
160///     path,
161///     module_id: MODULE_ID,
162///     initial_available_buffers: 0x400,
163///     buffer_size: BUFFER_SIZE as usize,
164/// };
165/// let file = sync::Arc::new(
166///     ffile::FrozenFile::new(file_cfg)
167///         .expect("file creation should succeed"),
168/// );
169///
170/// let pool_cfg = bufpool::BufPoolCfg {
171///     buffer_size: utils::BufferSize::S128,
172///     max_memory: 0x400 * BUFFER_SIZE as usize,
173/// };
174/// let pool = bufpool::BufPool::new(pool_cfg);
175///
176/// let pipe_cfg = wpipe::WritePipeCfg {
177///     module_id: MODULE_ID,
178///     flush_duration: time::Duration::from_millis(1),
179/// };
180/// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
181///     .expect("pipe creation should succeed");
182///
183/// let payload = [0xAAu8; BUFFER_SIZE as usize];
184///
185/// let mut latest_ticket = None;
186/// for slot_index in 0..3 {
187///     let allocation = pool.allocate(1);
188///
189///     unsafe {
190///         ptr::copy_nonoverlapping(
191///             payload.as_ptr(),
192///             allocation.first(),
193///             payload.len(),
194///         );
195///     }
196///
197///     let ticket = pipe
198///         .write(wpipe::WriteRequest {
199///             allocation,
200///             slot_index,
201///         })
202///         .expect("write should succeed");
203///
204///     latest_ticket = Some(ticket);
205/// }
206///
207/// let durable_epoch = futures::executor::block_on(
208///     latest_ticket.expect("ticket should exist"),
209/// )
210/// .expect("writes should become durable");
211///
212/// assert!(durable_epoch >= 3);
213/// ```
214#[derive(Debug)]
215pub struct WritePipe {
216    core: sync::Arc<Core>,
217    flush_tx_handle: Option<thread::JoinHandle<()>>,
218}
219
220unsafe impl Send for WritePipe {}
221unsafe impl Sync for WritePipe {}
222
223impl WritePipe {
224    /// Create a new instance of [`WritePipe`]
225    ///
226    /// ## Example
227    ///
228    /// ```
229    /// use frozen_core::{bufpool, ffile, utils, wpipe};
230    /// use std::{ptr, sync, time};
231    ///
232    /// const MODULE_ID: u8 = 0x00;
233    /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
234    ///
235    /// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
236    /// let path = dir.path().join("wpipe_example");
237    ///
238    /// let file_cfg = ffile::FrozenFileCfg {
239    ///     path,
240    ///     module_id: MODULE_ID,
241    ///     initial_available_buffers: 0x400,
242    ///     buffer_size: BUFFER_SIZE as usize,
243    /// };
244    /// let file = sync::Arc::new(
245    ///     ffile::FrozenFile::new(file_cfg)
246    ///         .expect("file creation should succeed"),
247    /// );
248    ///
249    /// let pool_cfg = bufpool::BufPoolCfg {
250    ///     buffer_size: utils::BufferSize::S128,
251    ///     max_memory: 0x400 * BUFFER_SIZE as usize,
252    /// };
253    /// let pool = bufpool::BufPool::new(pool_cfg);
254    ///
255    /// let pipe_cfg = wpipe::WritePipeCfg {
256    ///     module_id: MODULE_ID,
257    ///     flush_duration: time::Duration::from_millis(1),
258    /// };
259    /// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
260    ///     .expect("pipe creation should succeed");
261    ///
262    /// let payload = vec![0x0A; BUFFER_SIZE as usize];
263    /// let allocation = pool.allocate(1);
264    ///
265    /// unsafe {ptr::copy_nonoverlapping(
266    ///     payload.as_ptr(),
267    ///     allocation.first(),
268    ///     payload.len()
269    /// )};
270    ///
271    /// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0});
272    ///
273    /// assert!(
274    ///     futures::executor::block_on(ticket.expect("ticket should exist"))
275    ///     .is_ok()
276    /// );
277    /// ```
278    #[inline]
279    pub fn new(cfg: WritePipeCfg, file: sync::Arc<ffile::FrozenFile>) -> FrozenResult<Self> {
280        let core = sync::Arc::new(Core::new(file));
281        let cloned_core = core.clone();
282        let flush_tx_handle = match thread::Builder::new()
283            .name(format!("mod{}_wpipe_flush_tx", cfg.module_id))
284            .spawn(move || bg_flush_thread(cloned_core, cfg.flush_duration))
285        {
286            Ok(handle) => Some(handle),
287            Err(observed_error) => {
288                return Err(err::new_error(cfg.module_id, err::FXE, observed_error));
289            }
290        };
291
292        Ok(Self { core: core, flush_tx_handle })
293    }
294
295    /// Push a write into [`WritePipe`]
296    ///
297    /// Every write call is fire-and-forget for the caller by default, unless the caller choose to
298    /// wait for durability using the manual `await` on [`WriteTicket`].
299    ///
300    /// ## Example
301    ///
302    /// ```
303    /// use frozen_core::{bufpool, ffile, utils, wpipe};
304    /// use std::{ptr, sync, time};
305    ///
306    /// const MODULE_ID: u8 = 0x00;
307    /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
308    ///
309    /// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
310    /// let path = dir.path().join("wpipe_example");
311    ///
312    /// let file_cfg = ffile::FrozenFileCfg {
313    ///     path,
314    ///     module_id: MODULE_ID,
315    ///     initial_available_buffers: 0x400,
316    ///     buffer_size: BUFFER_SIZE as usize,
317    /// };
318    /// let file = sync::Arc::new(
319    ///     ffile::FrozenFile::new(file_cfg)
320    ///         .expect("file creation should succeed"),
321    /// );
322    ///
323    /// let pool_cfg = bufpool::BufPoolCfg {
324    ///     buffer_size: utils::BufferSize::S128,
325    ///     max_memory: 0x400 * BUFFER_SIZE as usize,
326    /// };
327    /// let pool = bufpool::BufPool::new(pool_cfg);
328    ///
329    /// let pipe_cfg = wpipe::WritePipeCfg {
330    ///     module_id: MODULE_ID,
331    ///     flush_duration: time::Duration::from_millis(1),
332    /// };
333    /// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
334    ///     .expect("pipe creation should succeed");
335    ///
336    /// let payload = vec![0x0A; BUFFER_SIZE as usize];
337    /// let allocation = pool.allocate(1);
338    ///
339    /// unsafe {ptr::copy_nonoverlapping(
340    ///     payload.as_ptr(),
341    ///     allocation.first(),
342    ///     payload.len()
343    /// )};
344    ///
345    /// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0});
346    ///
347    /// assert!(
348    ///     futures::executor::block_on(ticket.expect("ticket should exist"))
349    ///     .is_ok()
350    /// );
351    /// ```
352    #[inline]
353    pub fn write(&self, request: WriteRequest) -> FrozenResult<WriteTicket> {
354        let _io_lock = self.core.acquire_shared_io_lock();
355        if let Some(frozen_error) = self.core.completion.error.get() {
356            return Err(frozen_error);
357        }
358
359        let epoch = self.core.increment_current_epoch();
360        let internal_req = WriteRequestInternal { request, epoch };
361        self.core.queue.push(internal_req);
362
363        Ok(WriteTicket { epoch, completion: self.core.completion.clone() })
364    }
365}
366
367impl Drop for WritePipe {
368    fn drop(&mut self) {
369        self.core.closed.store(true, atomic::Ordering::Release);
370        self.core.flush_cv.notify_one();
371
372        if let Some(handle) = self.flush_tx_handle.take() {
373            let _ = handle.join();
374        }
375    }
376}
377
378/// A write operation submitted to [`WritePipe`]
379///
380/// The request contains the buffers to persist along with the destination slot index in the
381/// underlying [`FrozenFile`].
382///
383/// ## Example
384///
385/// ```
386/// use frozen_core::{bufpool, utils, wpipe};
387/// use std::ptr;
388///
389/// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
390///
391/// let pool_cfg = bufpool::BufPoolCfg {
392///     buffer_size: utils::BufferSize::S128,
393///     max_memory: 0x400 * BUFFER_SIZE as usize,
394/// };
395/// let pool = bufpool::BufPool::new(pool_cfg);
396///
397/// let payload = vec![0x0A; BUFFER_SIZE as usize];
398/// let allocation = pool.allocate(1);
399///
400/// unsafe {ptr::copy_nonoverlapping(
401///     payload.as_ptr(),
402///     allocation.first(),
403///     payload.len()
404/// )};
405///
406/// let request = wpipe::WriteRequest {allocation, slot_index: 0};
407/// assert!(request.slot_index >= 0);
408/// ```
409#[derive(Debug)]
410pub struct WriteRequest {
411    /// Buffer allocation containing the pages to be written allocated using [`bufpool::BufPool`]
412    ///
413    /// ## Example
414    ///
415    /// ```
416    /// use frozen_core::{bufpool, utils, wpipe};
417    /// use std::ptr;
418    ///
419    /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
420    ///
421    /// let pool_cfg = bufpool::BufPoolCfg {
422    ///     buffer_size: utils::BufferSize::S128,
423    ///     max_memory: 0x400 * BUFFER_SIZE as usize,
424    /// };
425    /// let pool = bufpool::BufPool::new(pool_cfg);
426    ///
427    /// let payload = vec![0x0A; BUFFER_SIZE as usize];
428    /// let allocation = pool.allocate(1);
429    ///
430    /// unsafe {ptr::copy_nonoverlapping(
431    ///     payload.as_ptr(),
432    ///     allocation.first(),
433    ///     payload.len()
434    /// )};
435    ///
436    /// let request = wpipe::WriteRequest {allocation, slot_index: 0};
437    /// assert!(!request.allocation.first().is_null());
438    /// ```
439    pub allocation: bufpool::BufPoolAllocation,
440
441    /// Destination slot index where the pages of the allocation will be written from
442    ///
443    /// ## Example
444    ///
445    /// ```
446    /// use frozen_core::{bufpool, utils, wpipe};
447    /// use std::ptr;
448    ///
449    /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
450    ///
451    /// let pool_cfg = bufpool::BufPoolCfg {
452    ///     buffer_size: utils::BufferSize::S128,
453    ///     max_memory: 0x400 * BUFFER_SIZE as usize,
454    /// };
455    /// let pool = bufpool::BufPool::new(pool_cfg);
456    ///
457    /// let payload = vec![0x0A; BUFFER_SIZE as usize];
458    /// let allocation = pool.allocate(1);
459    ///
460    /// unsafe {ptr::copy_nonoverlapping(
461    ///     payload.as_ptr(),
462    ///     allocation.first(),
463    ///     payload.len()
464    /// )};
465    ///
466    /// let request = wpipe::WriteRequest {allocation, slot_index: 0};
467    /// assert!(request.slot_index >= 0);
468    /// ```
469    pub slot_index: usize,
470}
471
472/// Durability handle associated with the submitted write operation via [`WritePipe::write`]
473///
474/// ## Epoch
475///
476/// Every ticket is assigned a monotonically increasing epoch to moniter durability
477///
478/// ## Durability Guarantee
479///
480/// If wanted, the ticket could be awaited to poll till the epoch becomes durable.
481///
482/// Once a await on ticket is completed successfully, all writes assigned to earlier epochs are
483/// also guaranteed to be durable.
484///
485/// *NOTE:* Using `await` is optional. Callers that only require fire-and-forget semantics may
486/// simply discard the returned ticket.
487///
488/// ## Example
489///
490/// ```
491/// use frozen_core::{bufpool, ffile, utils, wpipe};
492/// use std::{ptr, sync, time};
493///
494/// const MODULE_ID: u8 = 0x00;
495/// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
496///
497/// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
498/// let path = dir.path().join("wpipe_example");
499///
500/// let file_cfg = ffile::FrozenFileCfg {
501///     path,
502///     module_id: MODULE_ID,
503///     initial_available_buffers: 0x400,
504///     buffer_size: BUFFER_SIZE as usize,
505/// };
506/// let file = sync::Arc::new(
507///     ffile::FrozenFile::new(file_cfg)
508///         .expect("file creation should succeed"),
509/// );
510///
511/// let pool_cfg = bufpool::BufPoolCfg {
512///     buffer_size: utils::BufferSize::S128,
513///     max_memory: 0x400 * BUFFER_SIZE as usize,
514/// };
515/// let pool = bufpool::BufPool::new(pool_cfg);
516///
517/// let pipe_cfg = wpipe::WritePipeCfg {
518///     module_id: MODULE_ID,
519///     flush_duration: time::Duration::from_millis(1),
520/// };
521/// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
522///     .expect("pipe creation should succeed");
523///
524/// let payload = vec![0x0A; BUFFER_SIZE as usize];
525/// let allocation = pool.allocate(1);
526///
527/// unsafe {ptr::copy_nonoverlapping(
528///     payload.as_ptr(),
529///     allocation.first(),
530///     payload.len()
531/// )};
532///
533/// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0})
534///     .unwrap();
535/// let epoch = ticket.epoch();
536///
537/// assert!(epoch > 0);
538/// ```
539#[derive(Debug)]
540pub struct WriteTicket {
541    epoch: u64,
542    completion: sync::Arc<Completion>,
543}
544
545impl WriteTicket {
546    /// Read assigned durability epoch for the [`WriteTicket`]
547    ///
548    /// ## Example
549    ///
550    /// ```
551    /// use frozen_core::{bufpool, ffile, utils, wpipe};
552    /// use std::{ptr, sync, time};
553    ///
554    /// const MODULE_ID: u8 = 0x00;
555    /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
556    ///
557    /// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
558    /// let path = dir.path().join("wpipe_example");
559    ///
560    /// let file_cfg = ffile::FrozenFileCfg {
561    ///     path,
562    ///     module_id: MODULE_ID,
563    ///     initial_available_buffers: 0x400,
564    ///     buffer_size: BUFFER_SIZE as usize,
565    /// };
566    /// let file = sync::Arc::new(
567    ///     ffile::FrozenFile::new(file_cfg)
568    ///         .expect("file creation should succeed"),
569    /// );
570    ///
571    /// let pool_cfg = bufpool::BufPoolCfg {
572    ///     buffer_size: utils::BufferSize::S128,
573    ///     max_memory: 0x400 * BUFFER_SIZE as usize,
574    /// };
575    /// let pool = bufpool::BufPool::new(pool_cfg);
576    ///
577    /// let pipe_cfg = wpipe::WritePipeCfg {
578    ///     module_id: MODULE_ID,
579    ///     flush_duration: time::Duration::from_millis(1),
580    /// };
581    /// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
582    ///     .expect("pipe creation should succeed");
583    ///
584    /// let payload = vec![0x0A; BUFFER_SIZE as usize];
585    /// let allocation = pool.allocate(1);
586    ///
587    /// unsafe {ptr::copy_nonoverlapping(
588    ///     payload.as_ptr(),
589    ///     allocation.first(),
590    ///     payload.len()
591    /// )};
592    ///
593    /// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0})
594    ///     .unwrap();
595    /// let epoch = ticket.epoch();
596    ///
597    /// assert!(epoch > 0);
598    /// ```
599    pub const fn epoch(&self) -> u64 {
600        self.epoch
601    }
602
603    #[inline]
604    fn is_ready(&self) -> bool {
605        let durable = self.completion.durable_epoch.load(atomic::Ordering::Acquire);
606        if hints::likely(durable >= self.epoch) {
607            return true;
608        }
609
610        false
611    }
612}
613
614impl std::future::Future for WriteTicket {
615    type Output = FrozenResult<u64>;
616
617    #[inline(always)]
618    fn poll(
619        self: std::pin::Pin<&mut Self>,
620        cx: &mut std::task::Context<'_>,
621    ) -> std::task::Poll<Self::Output> {
622        if self.is_ready() {
623            return std::task::Poll::Ready(Ok(self.epoch));
624        }
625
626        if let Some(frozen_error) = self.completion.error.get() {
627            return std::task::Poll::Ready(Err(frozen_error));
628        }
629
630        self.completion.waker.register(cx.waker());
631
632        if self.is_ready() {
633            return std::task::Poll::Ready(Ok(self.epoch));
634        }
635
636        if let Some(frozen_error) = self.completion.error.get() {
637            return std::task::Poll::Ready(Err(frozen_error));
638        }
639
640        std::task::Poll::Pending
641    }
642}
643
644/// ## Why we ignore [`std::sync::PoisonError`]?
645///
646/// The mutex used for lock, is solely used as a parking primitive for [`Condvar`] and does not
647/// protect any mutable state. All the pool invariants and accounting are maintained via atomics
648/// and are completely seperated from the mutex.
649///
650/// A poisoned mutex only indicates that another tx panicked while holding the lock, and indicates
651/// an inconsistent state of the protected value. Since no state can be left partially modified
652/// under this lock, there is no possible consistency risk to recover from and propagating the
653/// poison error would only introduce unnecessary failures into the allocation path.
654///
655/// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating
656/// with the recovered guard.
657fn bg_flush_thread(core: sync::Arc<Core>, flush_duration: time::Duration) {
658    let mut guard = core.flush_guard.lock().unwrap_or_else(|e| e.into_inner());
659    loop {
660        (guard, _) =
661            core.flush_cv.wait_timeout(guard, flush_duration).unwrap_or_else(|e| e.into_inner());
662
663        // NOTE: we must read values of close brodcast before acquire exclusive lock, if done
664        // otherwise, we impose serious deadlock sort of situation for the the flusher tx
665
666        let queued_ops = core.queue.drain();
667        let closed = core.closed.load(atomic::Ordering::Acquire);
668
669        if queued_ops.is_empty() {
670            if closed {
671                return;
672            }
673
674            continue;
675        }
676
677        // INFO: we must acquire an exclusive IO lock for sync, hence no write/read ops are allowed
678        // while sync is in progress
679        let _io_lock = core.acquire_exclusive_io_lock();
680
681        let (_min_index, _max_index, max_epoch) = match core.write_queued_ops(queued_ops) {
682            Ok(res) => res,
683            Err(new_error) => {
684                core.completion.error.set(new_error);
685                core.completion.waker.wake();
686                drop(_io_lock);
687
688                continue;
689            }
690        };
691
692        // NOTE: On linux, we can initiate writeback (best-effort only) for a given range
693        #[cfg(target_os = "linux")]
694        if let Err(new_error) = core.file.sync_range(_min_index, _max_index - _min_index) {
695            core.completion.error.set(new_error);
696            core.completion.waker.wake();
697            drop(_io_lock);
698
699            continue;
700        }
701
702        // NOTE: If the sync fails, we update the Core::error w/ the received error object. We
703        // clear it up when another call succeeds.
704        //
705        // This is valid as the underlying sync flushes entire batch all at once, hence even if the
706        // last call failed, and the new one succeeded, we do get the durability guarantee for the
707        // old data as well.
708
709        if let Err(new_error) = core.file.sync() {
710            core.completion.error.set(new_error);
711            core.completion.waker.wake();
712            drop(_io_lock);
713
714            continue;
715        } else {
716            core.completion.mark_epoch_as_durable(max_epoch);
717            core.completion.error.del();
718        }
719    }
720}
721
722#[derive(Debug)]
723struct Core {
724    completion: sync::Arc<Completion>,
725    closed: atomic::AtomicBool,
726    epoch: atomic::AtomicU64,
727    file: sync::Arc<ffile::FrozenFile>,
728    flush_cv: sync::Condvar,
729    flush_guard: sync::Mutex<()>,
730    io_lock: sync::RwLock<()>,
731    queue: mpscq::MPSCQueue<WriteRequestInternal>,
732}
733
734impl Core {
735    fn new(file: sync::Arc<ffile::FrozenFile>) -> Self {
736        Self {
737            file,
738            completion: sync::Arc::new(Completion::default()),
739            closed: atomic::AtomicBool::new(false),
740            epoch: atomic::AtomicU64::new(0),
741            flush_cv: sync::Condvar::new(),
742            flush_guard: sync::Mutex::new(()),
743            io_lock: sync::RwLock::new(()),
744            queue: mpscq::MPSCQueue::default(),
745        }
746    }
747
748    #[inline]
749    fn acquire_shared_io_lock(&self) -> sync::RwLockReadGuard<'_, ()> {
750        // NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
751        self.io_lock.read().unwrap_or_else(|e| e.into_inner())
752    }
753
754    #[inline]
755    fn acquire_exclusive_io_lock(&self) -> sync::RwLockWriteGuard<'_, ()> {
756        // NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
757        self.io_lock.write().unwrap_or_else(|e| e.into_inner())
758    }
759
760    #[inline(always)]
761    fn write_queued_ops(
762        &self,
763        queued_ops: Vec<WriteRequestInternal>,
764    ) -> FrozenResult<(usize, usize, u64)> {
765        let mut max_epoch = 0;
766        let mut max_index = 0;
767        let mut min_index = usize::MAX;
768
769        for op in queued_ops {
770            let ops_len = op.request.allocation.length();
771            match ops_len {
772                1 => {
773                    self.file.pwrite(op.request.allocation.first(), op.request.slot_index)?;
774                }
775                _ => {
776                    let bufs: Vec<bufpool::BufferPointer> = op.request.allocation.iter().collect();
777                    self.file.pwritev(&bufs, op.request.slot_index)?;
778                }
779            }
780
781            max_epoch = max_epoch.max(op.epoch);
782            min_index = min_index.min(op.request.slot_index);
783            max_index = max_index.max(op.request.slot_index + ops_len);
784        }
785
786        Ok((min_index, max_index, max_epoch))
787    }
788
789    #[inline]
790    fn increment_current_epoch(&self) -> u64 {
791        self.epoch.fetch_add(1, atomic::Ordering::AcqRel).wrapping_add(1)
792    }
793}
794
795#[derive(Debug)]
796struct WriteRequestInternal {
797    epoch: u64,
798    request: WriteRequest,
799}
800
801#[derive(Debug)]
802struct Completion {
803    durable_epoch: atomic::AtomicU64,
804    error: FlushError,
805    waker: futures::task::AtomicWaker,
806}
807
808impl Default for Completion {
809    fn default() -> Self {
810        Self {
811            durable_epoch: atomic::AtomicU64::new(0),
812            waker: futures::task::AtomicWaker::new(),
813            error: FlushError::default(),
814        }
815    }
816}
817
818impl Completion {
819    fn mark_epoch_as_durable(&self, epoch: u64) {
820        self.durable_epoch.store(epoch, atomic::Ordering::Release);
821        self.waker.wake();
822    }
823}
824
825#[derive(Debug)]
826struct FlushError(atomic::AtomicPtr<FrozenError>);
827
828impl Default for FlushError {
829    fn default() -> Self {
830        Self(atomic::AtomicPtr::new(ptr::null_mut()))
831    }
832}
833
834impl Drop for FlushError {
835    fn drop(&mut self) {
836        let err_ptr = self.0.load(atomic::Ordering::Acquire);
837        if !err_ptr.is_null() {
838            let _ = unsafe { Box::from_raw(err_ptr) };
839        }
840    }
841}
842
843impl FlushError {
844    #[inline]
845    fn get(&self) -> Option<FrozenError> {
846        let curr_err = self.0.load(atomic::Ordering::Acquire);
847        if hints::unlikely(!curr_err.is_null()) {
848            let frozen_error = unsafe { (*curr_err).clone() };
849            return Some(frozen_error);
850        }
851
852        None
853    }
854
855    #[inline]
856    fn set(&self, new_error: FrozenError) {
857        let boxed_error = Box::into_raw(Box::new(new_error));
858        let old_err = self.0.swap(boxed_error, atomic::Ordering::AcqRel);
859
860        if hints::unlikely(!old_err.is_null()) {
861            let _ = unsafe { Box::from_raw(old_err) };
862        }
863    }
864
865    #[inline]
866    fn del(&self) {
867        let old_err = self.0.swap(ptr::null_mut(), atomic::Ordering::AcqRel);
868        if hints::unlikely(!old_err.is_null()) {
869            let _ = unsafe { Box::from_raw(old_err) };
870        }
871    }
872}
873
874mod err {
875    use crate::error::{ErrCode, FrozenError};
876
877    /// Domain ID for [`wpipe`] module is `0x02` used while propagating errors
878    const DOMAIN_ID: u8 = 0x02;
879
880    #[inline]
881    pub fn new_error<E: std::fmt::Display>(
882        module_id: u8,
883        code: ErrCode,
884        observed_error: E,
885    ) -> FrozenError {
886        FrozenError::new_raw(module_id, DOMAIN_ID, code, observed_error)
887    }
888
889    pub const FXE: ErrCode = ErrCode::new(0x10, "unable to spawn background flush thread");
890}
891
892#[cfg(test)]
893mod tests {
894    use super::*;
895    use crate::utils::BufferSize;
896
897    const MODULE_ID: u8 = 0x00;
898    const BUFFER_SIZE: BufferSize = BufferSize::S128;
899    const INITIAL_BUFFER_AMOUT: usize = 0x200;
900    const FLUSH_DURATION: time::Duration = time::Duration::from_millis(1);
901
902    fn new_objects<P: AsRef<std::path::Path>>(
903        path: P,
904    ) -> (sync::Arc<ffile::FrozenFile>, bufpool::BufPool, WritePipe) {
905        let file_cfg = ffile::FrozenFileCfg {
906            module_id: MODULE_ID,
907            path: path.as_ref().to_path_buf(),
908            buffer_size: BUFFER_SIZE as usize,
909            initial_available_buffers: INITIAL_BUFFER_AMOUT,
910        };
911        let file = sync::Arc::new(ffile::FrozenFile::new(file_cfg).unwrap());
912
913        let pool_cfg = bufpool::BufPoolCfg {
914            buffer_size: BUFFER_SIZE,
915            max_memory: INITIAL_BUFFER_AMOUT * BUFFER_SIZE as usize,
916        };
917        let pool = bufpool::BufPool::new(pool_cfg);
918
919        let pipe_cfg = WritePipeCfg { module_id: MODULE_ID, flush_duration: FLUSH_DURATION };
920        let pipe = WritePipe::new(pipe_cfg, file.clone()).unwrap();
921
922        (file, pool, pipe)
923    }
924
925    fn prep_write(
926        buf_ptr: *const u8,
927        n: usize,
928        pool: &bufpool::BufPool,
929    ) -> bufpool::BufPoolAllocation {
930        let allocation = pool.allocate(n);
931        for allocated_buf in allocation.iter() {
932            unsafe { ptr::copy_nonoverlapping(buf_ptr, allocated_buf, BUFFER_SIZE as usize) };
933        }
934
935        allocation
936    }
937
938    fn compare_with_readback(
939        buf: &[u8],
940        read_index: usize,
941        required: usize,
942        pool: &bufpool::BufPool,
943        file: &ffile::FrozenFile,
944    ) {
945        let read_allocation = pool.allocate(required);
946        let read_bufs: Vec<bufpool::BufferPointer> = read_allocation.iter().collect();
947
948        file.preadv(&read_bufs, read_index).unwrap();
949
950        for read_buf in read_allocation.iter() {
951            let observed = unsafe { std::slice::from_raw_parts(read_buf, BUFFER_SIZE as usize) };
952            assert_eq!(buf, observed);
953        }
954    }
955
956    mod lifecycle {
957        use super::*;
958
959        #[test]
960        fn ok_new() {
961            let dir = tempfile::tempdir().unwrap();
962            let path = dir.path().join("write_single");
963
964            let file_cfg = ffile::FrozenFileCfg {
965                path,
966                module_id: MODULE_ID,
967                buffer_size: BUFFER_SIZE as usize,
968                initial_available_buffers: INITIAL_BUFFER_AMOUT,
969            };
970            let file = sync::Arc::new(ffile::FrozenFile::new(file_cfg).unwrap());
971
972            let pipe_cfg = WritePipeCfg { module_id: MODULE_ID, flush_duration: FLUSH_DURATION };
973            assert!(WritePipe::new(pipe_cfg, file).is_ok());
974        }
975
976        #[test]
977        fn ok_drop() {
978            let dir = tempfile::tempdir().unwrap();
979            let path = dir.path().join("write_single");
980            let (_file, _, pipe) = new_objects(path);
981
982            drop(pipe);
983        }
984    }
985
986    mod shutdown {
987        use super::*;
988
989        #[test]
990        fn ok_drop_before_pending_write_call() {
991            let dir = tempfile::tempdir().unwrap();
992            let path = dir.path().join("write_single");
993            let (_file, pool, pipe) = new_objects(path);
994
995            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];
996
997            let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
998            let request = WriteRequest { allocation, slot_index: 0 };
999
1000            assert!(pipe.write(request).is_ok());
1001            drop(pipe);
1002        }
1003
1004        #[test]
1005        fn ok_drop_waits_for_pending_write_call() {
1006            let dir = tempfile::tempdir().unwrap();
1007            let path = dir.path().join("write_single");
1008
1009            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];
1010
1011            // new + write + drop
1012            {
1013                let (_file, pool, pipe) = new_objects(path.clone());
1014
1015                let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
1016                let request = WriteRequest { allocation, slot_index: 0 };
1017
1018                assert!(pipe.write(request).is_ok());
1019                drop(pipe);
1020            }
1021
1022            // open + readback
1023            {
1024                let (file, pool, _) = new_objects(path);
1025                compare_with_readback(&BUFFER, 0, 1, &pool, &file);
1026            }
1027        }
1028
1029        #[test]
1030        fn ok_drop_does_not_deadlock_when_multiple_pending_writes() {
1031            let dir = tempfile::tempdir().unwrap();
1032            let path = dir.path().join("write_single");
1033            let (_file, pool, pipe) = new_objects(path);
1034
1035            for i in 0..INITIAL_BUFFER_AMOUT {
1036                let buffer = vec![i as u8; BUFFER_SIZE as usize];
1037                let allocation = prep_write(buffer.as_ptr(), 1, &pool);
1038                let request = WriteRequest { allocation, slot_index: 0 };
1039
1040                assert!(pipe.write(request).is_ok());
1041            }
1042
1043            drop(pipe);
1044        }
1045
1046        #[test]
1047        fn ok_drop_correctly_waits_for_pending_write_with_multi_threads() {
1048            let dir = tempfile::tempdir().unwrap();
1049            let path = dir.path().join("write_single");
1050            let (_file, pool, pipe) = new_objects(path);
1051
1052            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];
1053
1054            let pipe = sync::Arc::new(pipe);
1055            let pipe2 = sync::Arc::clone(&pipe);
1056
1057            let handle = thread::spawn(move || {
1058                let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
1059                let request = WriteRequest { allocation, slot_index: 0 };
1060
1061                assert!(pipe2.write(request).is_ok());
1062            });
1063
1064            drop(pipe);
1065            handle.join().unwrap();
1066        }
1067    }
1068
1069    mod pipe_writes {
1070        use super::*;
1071
1072        #[test]
1073        fn ok_write() {
1074            let dir = tempfile::tempdir().unwrap();
1075            let path = dir.path().join("write_single");
1076            let (_file, pool, pipe) = new_objects(path);
1077
1078            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];
1079            let allocation = prep_write(BUFFER.as_ptr(), 0x0A, &pool);
1080
1081            let request = WriteRequest { allocation, slot_index: 0 };
1082            assert!(pipe.write(request).is_ok());
1083        }
1084
1085        #[test]
1086        fn ok_write_epoch_is_monotonic() {
1087            let dir = tempfile::tempdir().unwrap();
1088            let path = dir.path().join("write_single");
1089            let (_file, pool, pipe) = new_objects(path);
1090
1091            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];
1092
1093            let allocation1 = prep_write(BUFFER.as_ptr(), 1, &pool);
1094            let ticket1 =
1095                pipe.write(WriteRequest { allocation: allocation1, slot_index: 0 }).unwrap();
1096
1097            let allocation2 = prep_write(BUFFER.as_ptr(), 1, &pool);
1098            let ticket2 =
1099                pipe.write(WriteRequest { allocation: allocation2, slot_index: 1 }).unwrap();
1100
1101            let allocation3 = prep_write(BUFFER.as_ptr(), 1, &pool);
1102            let ticket3 =
1103                pipe.write(WriteRequest { allocation: allocation3, slot_index: 2 }).unwrap();
1104
1105            assert!(ticket3.epoch() > ticket2.epoch());
1106            assert!(ticket2.epoch() > ticket1.epoch());
1107        }
1108    }
1109
1110    mod write_ticket {
1111        use super::*;
1112
1113        #[test]
1114        fn ok_readback_after_write_with_await() {
1115            let dir = tempfile::tempdir().unwrap();
1116            let path = dir.path().join("write_single");
1117            let (file, pool, pipe) = new_objects(path);
1118
1119            const REQUIRED: usize = 0x0A;
1120            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];
1121
1122            let write_allocation = prep_write(BUFFER.as_ptr(), REQUIRED, &pool);
1123            let request = WriteRequest { allocation: write_allocation, slot_index: 0 };
1124
1125            let ticket = pipe.write(request).unwrap();
1126            let ticket_epoch = ticket.epoch();
1127
1128            let durable_epoch = futures::executor::block_on(ticket).unwrap();
1129            assert!(durable_epoch >= ticket_epoch);
1130
1131            compare_with_readback(&BUFFER, 0, REQUIRED, &pool, &file);
1132        }
1133
1134        #[test]
1135        fn ok_readback_after_batch_write() {
1136            let dir = tempfile::tempdir().unwrap();
1137            let path = dir.path().join("write_single");
1138            let (file, pool, pipe) = new_objects(path);
1139
1140            const BUFFERS: [([u8; BUFFER_SIZE as usize], usize); 5] = [
1141                ([0x0Au8; BUFFER_SIZE as usize], 0x1A),
1142                ([0x0Bu8; BUFFER_SIZE as usize], 0x1B),
1143                ([0x0Cu8; BUFFER_SIZE as usize], 0x1C),
1144                ([0x0Du8; BUFFER_SIZE as usize], 0x1D),
1145                ([0x0Eu8; BUFFER_SIZE as usize], 0x1E),
1146            ];
1147
1148            let mut slot_index = 0;
1149            let mut latest_ticket = None;
1150
1151            for (buf, required) in BUFFERS {
1152                let allocation = prep_write(buf.as_ptr(), required, &pool);
1153                let request = WriteRequest { allocation, slot_index };
1154                let ticket = pipe.write(request).unwrap();
1155
1156                slot_index += required;
1157                latest_ticket = Some(ticket);
1158            }
1159
1160            assert!(latest_ticket.is_some());
1161
1162            if let Some(ticket) = latest_ticket {
1163                let ticket_epoch = ticket.epoch();
1164                let durable_epoch = futures::executor::block_on(ticket).unwrap();
1165
1166                assert!(durable_epoch >= ticket_epoch);
1167            }
1168
1169            let mut read_index = 0;
1170            for (buf, required) in BUFFERS {
1171                compare_with_readback(&buf, read_index, required, &pool, &file);
1172                read_index += required;
1173            }
1174        }
1175
1176        #[test]
1177        fn ok_multiple_concurrent_awaits() {
1178            let dir = tempfile::tempdir().unwrap();
1179            let path = dir.path().join("write_single");
1180            let (_file, pool, pipe) = new_objects(path);
1181
1182            const REQUIRED: usize = 0x0A;
1183            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];
1184
1185            let allocation1 = prep_write(BUFFER.as_ptr(), REQUIRED, &pool);
1186            let ticket1 =
1187                pipe.write(WriteRequest { allocation: allocation1, slot_index: 0 }).unwrap();
1188
1189            let allocation2 = prep_write(BUFFER.as_ptr(), REQUIRED, &pool);
1190            let ticket2 =
1191                pipe.write(WriteRequest { allocation: allocation2, slot_index: 0 }).unwrap();
1192
1193            let (e1, e2) = futures::executor::block_on(async { futures::join!(ticket1, ticket2) });
1194
1195            assert!(e1.is_ok());
1196            assert!(e2.is_ok());
1197            assert!(e2.unwrap() > e1.unwrap());
1198        }
1199
1200        #[test]
1201        fn ok_awaiting_last_ticket_implies_previous_writes_are_durable() {
1202            let dir = tempfile::tempdir().unwrap();
1203            let path = dir.path().join("durability_boundary");
1204
1205            let (file, pool, pipe) = new_objects(path);
1206
1207            const BUFFER_A: [u8; BUFFER_SIZE as usize] = [0xAA; BUFFER_SIZE as usize];
1208            const BUFFER_B: [u8; BUFFER_SIZE as usize] = [0xBB; BUFFER_SIZE as usize];
1209            const BUFFER_C: [u8; BUFFER_SIZE as usize] = [0xCC; BUFFER_SIZE as usize];
1210
1211            let alloc_a = prep_write(BUFFER_A.as_ptr(), 1, &pool);
1212            let ticket_a = pipe.write(WriteRequest { allocation: alloc_a, slot_index: 0 }).unwrap();
1213
1214            let alloc_b = prep_write(BUFFER_B.as_ptr(), 1, &pool);
1215            let ticket_b = pipe.write(WriteRequest { allocation: alloc_b, slot_index: 1 }).unwrap();
1216
1217            let alloc_c = prep_write(BUFFER_C.as_ptr(), 1, &pool);
1218            let ticket_c = pipe.write(WriteRequest { allocation: alloc_c, slot_index: 2 }).unwrap();
1219
1220            let epoch_a = ticket_a.epoch();
1221            let epoch_b = ticket_b.epoch();
1222            let epoch_c = ticket_c.epoch();
1223
1224            let durable_epoch = futures::executor::block_on(ticket_c).unwrap();
1225            assert!(durable_epoch >= epoch_c);
1226            assert!(durable_epoch >= epoch_b);
1227            assert!(durable_epoch >= epoch_a);
1228
1229            compare_with_readback(&BUFFER_A, 0, 1, &pool, &file);
1230            compare_with_readback(&BUFFER_B, 1, 1, &pool, &file);
1231            compare_with_readback(&BUFFER_C, 2, 1, &pool, &file);
1232        }
1233    }
1234
1235    mod concurrency {
1236        use super::*;
1237
1238        #[test]
1239        fn ok_multi_threaded_writers() {
1240            const THREADS: usize = 4;
1241            const WRITES_PER_THREAD: usize = 0x40;
1242            const _: () = assert!(THREADS * WRITES_PER_THREAD < INITIAL_BUFFER_AMOUT);
1243
1244            let dir = tempfile::tempdir().unwrap();
1245            let path = dir.path().join("multi_threaded_writers");
1246
1247            let (_file, pool, pipe) = new_objects(path);
1248
1249            let pipe = sync::Arc::new(pipe);
1250            let pool = sync::Arc::new(pool);
1251
1252            let mut handles = Vec::with_capacity(THREADS);
1253            for tid in 0..THREADS {
1254                let pipe = sync::Arc::clone(&pipe);
1255                let pool = sync::Arc::clone(&pool);
1256
1257                handles.push(thread::spawn(move || {
1258                    let mut tickets = Vec::with_capacity(WRITES_PER_THREAD);
1259
1260                    for i in 0..WRITES_PER_THREAD {
1261                        let buffer = vec![tid as u8; BUFFER_SIZE as usize];
1262                        let allocation = prep_write(buffer.as_ptr(), 1, &pool);
1263                        let slot_index = tid * WRITES_PER_THREAD + i;
1264                        let ticket = pipe.write(WriteRequest { allocation, slot_index }).unwrap();
1265
1266                        tickets.push(ticket);
1267                    }
1268
1269                    tickets
1270                }));
1271            }
1272
1273            let mut tickets = Vec::new();
1274            for handle in handles {
1275                tickets.extend(handle.join().unwrap());
1276            }
1277            assert_eq!(tickets.len(), THREADS * WRITES_PER_THREAD,);
1278
1279            let mut epochs: Vec<u64> = tickets.iter().map(WriteTicket::epoch).collect();
1280            epochs.sort_unstable();
1281
1282            for (ed, observed) in (1u64..=epochs.len() as u64).zip(epochs.iter().copied()) {
1283                assert_eq!(ed, observed);
1284            }
1285
1286            let latest_ticket = tickets.into_iter().max_by_key(WriteTicket::epoch).unwrap();
1287            let durable_epoch = futures::executor::block_on(latest_ticket).unwrap();
1288            assert_eq!(durable_epoch, (THREADS * WRITES_PER_THREAD) as u64,);
1289        }
1290    }
1291}