frozen_core/wpipe.rs
1//! A low latency asynchronous write pipeline for buffer based storage
2//!
3//! ## Design
4//!
5//! By design, every write call is fire-and-forget, i.e. the call is immediately returned after
6//! pushing the bytes to be written into the MPSC queue.
7//!
//! The background thread pulls from MPSC queue and performs individual pwrite/v calls and a common
9//! hard sync right after. This provides durability for all the writes submitted within the same
10//! [`WritePipeCfg::flush_duration`] batching window.
11//!
12//! ## Benchmarks
13//!
14//! Observed measurements for latency (both single and multi threaded),
15//!
16//! | Metric | 1 Thread (µs) | 4 Threads (µs) |
17//! |:-------|:--------------|:---------------|
18//! | P50 | 0.091 | 0.275 |
19//! | P90 | 0.092 | 0.458 |
20//! | P99 | 0.825 | 0.917 |
21//! | Mean | 1.185 | 3.857 |
22//!
23//! Environment used for benching,
24//!
25//! * OS: NixOS (WSL2)
26//! * Architecture: x86_64
27//! * Memory: 8 GiB RAM (DDR4)
28//! * Rust: rustc 1.86.0 w/ cargo 1.86.0
29//! * Kernel: Linux 6.6.87.2-microsoft-standard-WSL2
30//! * CPU: Intel® Core™ i5-10300H @ 2.50GHz (4C / 8T)
31//!
32//! ## Example
33//!
34//! ```
35//! use frozen_core::{bufpool, ffile, utils, wpipe};
36//! use std::{ptr, sync, time};
37//!
38//! const MODULE_ID: u8 = 0x00;
39//! const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
40//!
41//! let dir = tempfile::tempdir().expect("tempdir creation should succeed");
42//! let path = dir.path().join("wpipe_example");
43//!
44//! let file_cfg = ffile::FrozenFileCfg {
45//! path,
46//! module_id: MODULE_ID,
47//! initial_available_buffers: 0x400,
48//! buffer_size: BUFFER_SIZE as usize,
49//! };
50//! let file = sync::Arc::new(
51//! ffile::FrozenFile::new(file_cfg)
52//! .expect("file creation should succeed"),
53//! );
54//!
55//! let pool_cfg = bufpool::BufPoolCfg {
56//! buffer_size: utils::BufferSize::S128,
57//! max_memory: 0x400 * BUFFER_SIZE as usize,
58//! };
59//! let pool = bufpool::BufPool::new(pool_cfg);
60//!
61//! let pipe_cfg = wpipe::WritePipeCfg {
62//! module_id: MODULE_ID,
63//! flush_duration: time::Duration::from_millis(1),
64//! };
65//! let pipe = wpipe::WritePipe::new(pipe_cfg, file)
66//! .expect("pipe creation should succeed");
67//!
68//! let payload = [0xAAu8; BUFFER_SIZE as usize];
69//!
70//! let mut latest_ticket = None;
71//! for slot_index in 0..3 {
72//! let allocation = pool.allocate(1);
73//!
74//! unsafe {
75//! ptr::copy_nonoverlapping(
76//! payload.as_ptr(),
77//! allocation.first(),
78//! payload.len(),
79//! );
80//! }
81//!
82//! let ticket = pipe
83//! .write(wpipe::WriteRequest {
84//! allocation,
85//! slot_index,
86//! })
87//! .expect("write should succeed");
88//!
89//! latest_ticket = Some(ticket);
90//! }
91//!
92//! let durable_epoch = futures::executor::block_on(
93//! latest_ticket.expect("ticket should exist"),
94//! )
95//! .expect("writes should become durable");
96//!
97//! assert!(durable_epoch >= 3);
98//! ```
99
100use crate::{
101 bufpool,
102 error::{FrozenError, FrozenResult},
103 ffile, hints, mpscq,
104};
105use std::{
106 ptr,
107 sync::{self, atomic},
108 thread, time,
109};
110
111/// All the available configurations for [`WritePipe`]
112///
113/// ## Example
114///
115/// ```
116/// use frozen_core::wpipe::WritePipeCfg;
117///
118/// let cfg = WritePipeCfg {
119/// module_id: 2,
120/// flush_duration: std::time::Duration::from_millis(0x0A),
121/// };
122///
123/// assert_ne!(cfg.module_id, 0);
124/// assert_ne!(cfg.flush_duration.as_millis(), 0);
125/// ```
#[derive(Debug, Clone)]
pub struct WritePipeCfg {
    /// Module identifier; it is embedded in the name of the background flush thread
    /// (`mod{module_id}_wpipe_flush_tx`) and in the [`FrozenError`] returned when that thread
    /// cannot be spawned
    pub module_id: u8,

    /// Maximum time the background thread waits between two flush rounds; all the write
    /// operations found queued when a round starts are written and hard synced together
    pub flush_duration: time::Duration,
}
135
136/// A low latency asynchronous write pipeline for buffer based storage
137///
138/// ## Design
139///
140/// By design, every write call is fire-and-forget, i.e. the call is immediately returned after
141/// pushing the bytes to be written into the MPSC queue.
142///
/// The background thread pulls from MPSC queue and performs individual pwrite/v calls and a common
144/// hard sync right after. This provides durability for all the writes submitted within the same
145/// [`WritePipeCfg::flush_duration`] batching window.
146///
147/// ## Example
148///
149/// ```
150/// use frozen_core::{bufpool, ffile, utils, wpipe};
151/// use std::{ptr, sync, time};
152///
153/// const MODULE_ID: u8 = 0x00;
154/// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
155///
156/// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
157/// let path = dir.path().join("wpipe_example");
158///
159/// let file_cfg = ffile::FrozenFileCfg {
160/// path,
161/// module_id: MODULE_ID,
162/// initial_available_buffers: 0x400,
163/// buffer_size: BUFFER_SIZE as usize,
164/// };
165/// let file = sync::Arc::new(
166/// ffile::FrozenFile::new(file_cfg)
167/// .expect("file creation should succeed"),
168/// );
169///
170/// let pool_cfg = bufpool::BufPoolCfg {
171/// buffer_size: utils::BufferSize::S128,
172/// max_memory: 0x400 * BUFFER_SIZE as usize,
173/// };
174/// let pool = bufpool::BufPool::new(pool_cfg);
175///
176/// let pipe_cfg = wpipe::WritePipeCfg {
177/// module_id: MODULE_ID,
178/// flush_duration: time::Duration::from_millis(1),
179/// };
180/// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
181/// .expect("pipe creation should succeed");
182///
183/// let payload = [0xAAu8; BUFFER_SIZE as usize];
184///
185/// let mut latest_ticket = None;
186/// for slot_index in 0..3 {
187/// let allocation = pool.allocate(1);
188///
189/// unsafe {
190/// ptr::copy_nonoverlapping(
191/// payload.as_ptr(),
192/// allocation.first(),
193/// payload.len(),
194/// );
195/// }
196///
197/// let ticket = pipe
198/// .write(wpipe::WriteRequest {
199/// allocation,
200/// slot_index,
201/// })
202/// .expect("write should succeed");
203///
204/// latest_ticket = Some(ticket);
205/// }
206///
207/// let durable_epoch = futures::executor::block_on(
208/// latest_ticket.expect("ticket should exist"),
209/// )
210/// .expect("writes should become durable");
211///
212/// assert!(durable_epoch >= 3);
213/// ```
#[derive(Debug)]
pub struct WritePipe {
    // State shared with the background flush thread (queue, locks, epoch and completion tracking)
    core: sync::Arc<Core>,
    // Join handle of the background flush thread; taken and joined when the pipe is dropped
    flush_tx_handle: Option<thread::JoinHandle<()>>,
}
219
// SAFETY: NOTE(review) — these impls assert that everything reachable through `Core` (the queued
// `WriteRequest`s with their buffer allocations, the file handle and the completion state) may be
// moved to and shared between threads. That cannot be verified from this file alone; confirm it
// against the guarantees of `bufpool`, `ffile` and `mpscq`.
unsafe impl Send for WritePipe {}
unsafe impl Sync for WritePipe {}
222
223impl WritePipe {
224 /// Create a new instance of [`WritePipe`]
225 ///
226 /// ## Example
227 ///
228 /// ```
229 /// use frozen_core::{bufpool, ffile, utils, wpipe};
230 /// use std::{ptr, sync, time};
231 ///
232 /// const MODULE_ID: u8 = 0x00;
233 /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
234 ///
235 /// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
236 /// let path = dir.path().join("wpipe_example");
237 ///
238 /// let file_cfg = ffile::FrozenFileCfg {
239 /// path,
240 /// module_id: MODULE_ID,
241 /// initial_available_buffers: 0x400,
242 /// buffer_size: BUFFER_SIZE as usize,
243 /// };
244 /// let file = sync::Arc::new(
245 /// ffile::FrozenFile::new(file_cfg)
246 /// .expect("file creation should succeed"),
247 /// );
248 ///
249 /// let pool_cfg = bufpool::BufPoolCfg {
250 /// buffer_size: utils::BufferSize::S128,
251 /// max_memory: 0x400 * BUFFER_SIZE as usize,
252 /// };
253 /// let pool = bufpool::BufPool::new(pool_cfg);
254 ///
255 /// let pipe_cfg = wpipe::WritePipeCfg {
256 /// module_id: MODULE_ID,
257 /// flush_duration: time::Duration::from_millis(1),
258 /// };
259 /// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
260 /// .expect("pipe creation should succeed");
261 ///
262 /// let payload = vec![0x0A; BUFFER_SIZE as usize];
263 /// let allocation = pool.allocate(1);
264 ///
265 /// unsafe {ptr::copy_nonoverlapping(
266 /// payload.as_ptr(),
267 /// allocation.first(),
268 /// payload.len()
269 /// )};
270 ///
271 /// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0});
272 ///
273 /// assert!(
274 /// futures::executor::block_on(ticket.expect("ticket should exist"))
275 /// .is_ok()
276 /// );
277 /// ```
278 #[inline]
279 pub fn new(cfg: WritePipeCfg, file: sync::Arc<ffile::FrozenFile>) -> FrozenResult<Self> {
280 let core = sync::Arc::new(Core::new(file));
281 let cloned_core = core.clone();
282 let flush_tx_handle = match thread::Builder::new()
283 .name(format!("mod{}_wpipe_flush_tx", cfg.module_id))
284 .spawn(move || bg_flush_thread(cloned_core, cfg.flush_duration))
285 {
286 Ok(handle) => Some(handle),
287 Err(observed_error) => {
288 return Err(err::new_error(cfg.module_id, err::FXE, observed_error));
289 }
290 };
291
292 Ok(Self { core: core, flush_tx_handle })
293 }
294
295 /// Push a write into [`WritePipe`]
296 ///
    /// Every write call is fire-and-forget for the caller by default, unless the caller chooses to
    /// wait for durability by awaiting the returned [`WriteTicket`].
299 ///
300 /// ## Example
301 ///
302 /// ```
303 /// use frozen_core::{bufpool, ffile, utils, wpipe};
304 /// use std::{ptr, sync, time};
305 ///
306 /// const MODULE_ID: u8 = 0x00;
307 /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
308 ///
309 /// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
310 /// let path = dir.path().join("wpipe_example");
311 ///
312 /// let file_cfg = ffile::FrozenFileCfg {
313 /// path,
314 /// module_id: MODULE_ID,
315 /// initial_available_buffers: 0x400,
316 /// buffer_size: BUFFER_SIZE as usize,
317 /// };
318 /// let file = sync::Arc::new(
319 /// ffile::FrozenFile::new(file_cfg)
320 /// .expect("file creation should succeed"),
321 /// );
322 ///
323 /// let pool_cfg = bufpool::BufPoolCfg {
324 /// buffer_size: utils::BufferSize::S128,
325 /// max_memory: 0x400 * BUFFER_SIZE as usize,
326 /// };
327 /// let pool = bufpool::BufPool::new(pool_cfg);
328 ///
329 /// let pipe_cfg = wpipe::WritePipeCfg {
330 /// module_id: MODULE_ID,
331 /// flush_duration: time::Duration::from_millis(1),
332 /// };
333 /// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
334 /// .expect("pipe creation should succeed");
335 ///
336 /// let payload = vec![0x0A; BUFFER_SIZE as usize];
337 /// let allocation = pool.allocate(1);
338 ///
339 /// unsafe {ptr::copy_nonoverlapping(
340 /// payload.as_ptr(),
341 /// allocation.first(),
342 /// payload.len()
343 /// )};
344 ///
345 /// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0});
346 ///
347 /// assert!(
348 /// futures::executor::block_on(ticket.expect("ticket should exist"))
349 /// .is_ok()
350 /// );
351 /// ```
352 #[inline]
353 pub fn write(&self, request: WriteRequest) -> FrozenResult<WriteTicket> {
354 let _io_lock = self.core.acquire_shared_io_lock();
355 if let Some(frozen_error) = self.core.completion.error.get() {
356 return Err(frozen_error);
357 }
358
359 let epoch = self.core.increment_current_epoch();
360 let internal_req = WriteRequestInternal { request, epoch };
361 self.core.queue.push(internal_req);
362
363 Ok(WriteTicket { epoch, completion: self.core.completion.clone() })
364 }
365}
366
367impl Drop for WritePipe {
368 fn drop(&mut self) {
369 self.core.closed.store(true, atomic::Ordering::Release);
370 self.core.flush_cv.notify_one();
371
372 if let Some(handle) = self.flush_tx_handle.take() {
373 let _ = handle.join();
374 }
375 }
376}
377
378/// A write operation submitted to [`WritePipe`]
379///
/// The request contains the buffers to persist along with the destination slot index in the
/// underlying [`ffile::FrozenFile`].
382///
383/// ## Example
384///
385/// ```
386/// use frozen_core::{bufpool, utils, wpipe};
387/// use std::ptr;
388///
389/// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
390///
391/// let pool_cfg = bufpool::BufPoolCfg {
392/// buffer_size: utils::BufferSize::S128,
393/// max_memory: 0x400 * BUFFER_SIZE as usize,
394/// };
395/// let pool = bufpool::BufPool::new(pool_cfg);
396///
397/// let payload = vec![0x0A; BUFFER_SIZE as usize];
398/// let allocation = pool.allocate(1);
399///
400/// unsafe {ptr::copy_nonoverlapping(
401/// payload.as_ptr(),
402/// allocation.first(),
403/// payload.len()
404/// )};
405///
406/// let request = wpipe::WriteRequest {allocation, slot_index: 0};
/// assert_eq!(request.slot_index, 0);
408/// ```
#[derive(Debug)]
pub struct WriteRequest {
    /// Buffer allocation (obtained from [`bufpool::BufPool`]) holding the bytes to be written
    ///
    /// An allocation of length `1` is written with a single `pwrite`, any other length is written
    /// with one `pwritev` over all of its buffers.
    ///
    /// ## Example
    ///
    /// ```
    /// use frozen_core::{bufpool, utils, wpipe};
    /// use std::ptr;
    ///
    /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
    ///
    /// let pool_cfg = bufpool::BufPoolCfg {
    ///     buffer_size: utils::BufferSize::S128,
    ///     max_memory: 0x400 * BUFFER_SIZE as usize,
    /// };
    /// let pool = bufpool::BufPool::new(pool_cfg);
    ///
    /// let payload = vec![0x0A; BUFFER_SIZE as usize];
    /// let allocation = pool.allocate(1);
    ///
    /// unsafe {ptr::copy_nonoverlapping(
    ///     payload.as_ptr(),
    ///     allocation.first(),
    ///     payload.len()
    /// )};
    ///
    /// let request = wpipe::WriteRequest {allocation, slot_index: 0};
    /// assert!(!request.allocation.first().is_null());
    /// ```
    pub allocation: bufpool::BufPoolAllocation,

    /// Index of the first destination slot in the file; the buffers of the allocation are written
    /// starting at this slot
    ///
    /// ## Example
    ///
    /// ```
    /// use frozen_core::{bufpool, utils, wpipe};
    /// use std::ptr;
    ///
    /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
    ///
    /// let pool_cfg = bufpool::BufPoolCfg {
    ///     buffer_size: utils::BufferSize::S128,
    ///     max_memory: 0x400 * BUFFER_SIZE as usize,
    /// };
    /// let pool = bufpool::BufPool::new(pool_cfg);
    ///
    /// let payload = vec![0x0A; BUFFER_SIZE as usize];
    /// let allocation = pool.allocate(1);
    ///
    /// unsafe {ptr::copy_nonoverlapping(
    ///     payload.as_ptr(),
    ///     allocation.first(),
    ///     payload.len()
    /// )};
    ///
    /// let request = wpipe::WriteRequest {allocation, slot_index: 0};
    /// assert_eq!(request.slot_index, 0);
    /// ```
    pub slot_index: usize,
}
471
472/// Durability handle associated with the submitted write operation via [`WritePipe::write`]
473///
474/// ## Epoch
475///
/// Every ticket is assigned a monotonically increasing epoch to monitor durability
477///
478/// ## Durability Guarantee
479///
/// If desired, the ticket can be awaited until its epoch becomes durable.
///
/// Once an await on the ticket completes successfully, all writes assigned to earlier epochs are
/// also guaranteed to be durable.
484///
485/// *NOTE:* Using `await` is optional. Callers that only require fire-and-forget semantics may
486/// simply discard the returned ticket.
487///
488/// ## Example
489///
490/// ```
491/// use frozen_core::{bufpool, ffile, utils, wpipe};
492/// use std::{ptr, sync, time};
493///
494/// const MODULE_ID: u8 = 0x00;
495/// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
496///
497/// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
498/// let path = dir.path().join("wpipe_example");
499///
500/// let file_cfg = ffile::FrozenFileCfg {
501/// path,
502/// module_id: MODULE_ID,
503/// initial_available_buffers: 0x400,
504/// buffer_size: BUFFER_SIZE as usize,
505/// };
506/// let file = sync::Arc::new(
507/// ffile::FrozenFile::new(file_cfg)
508/// .expect("file creation should succeed"),
509/// );
510///
511/// let pool_cfg = bufpool::BufPoolCfg {
512/// buffer_size: utils::BufferSize::S128,
513/// max_memory: 0x400 * BUFFER_SIZE as usize,
514/// };
515/// let pool = bufpool::BufPool::new(pool_cfg);
516///
517/// let pipe_cfg = wpipe::WritePipeCfg {
518/// module_id: MODULE_ID,
519/// flush_duration: time::Duration::from_millis(1),
520/// };
521/// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
522/// .expect("pipe creation should succeed");
523///
524/// let payload = vec![0x0A; BUFFER_SIZE as usize];
525/// let allocation = pool.allocate(1);
526///
527/// unsafe {ptr::copy_nonoverlapping(
528/// payload.as_ptr(),
529/// allocation.first(),
530/// payload.len()
531/// )};
532///
533/// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0})
534/// .unwrap();
535/// let epoch = ticket.epoch();
536///
537/// assert!(epoch > 0);
538/// ```
#[derive(Debug)]
pub struct WriteTicket {
    // Epoch assigned to the write when it was submitted through `WritePipe::write`
    epoch: u64,
    // Completion state shared with the pipe: durable epoch, last flush error and waker slot
    completion: sync::Arc<Completion>,
}
544
545impl WriteTicket {
546 /// Read assigned durability epoch for the [`WriteTicket`]
547 ///
548 /// ## Example
549 ///
550 /// ```
551 /// use frozen_core::{bufpool, ffile, utils, wpipe};
552 /// use std::{ptr, sync, time};
553 ///
554 /// const MODULE_ID: u8 = 0x00;
555 /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
556 ///
557 /// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
558 /// let path = dir.path().join("wpipe_example");
559 ///
560 /// let file_cfg = ffile::FrozenFileCfg {
561 /// path,
562 /// module_id: MODULE_ID,
563 /// initial_available_buffers: 0x400,
564 /// buffer_size: BUFFER_SIZE as usize,
565 /// };
566 /// let file = sync::Arc::new(
567 /// ffile::FrozenFile::new(file_cfg)
568 /// .expect("file creation should succeed"),
569 /// );
570 ///
571 /// let pool_cfg = bufpool::BufPoolCfg {
572 /// buffer_size: utils::BufferSize::S128,
573 /// max_memory: 0x400 * BUFFER_SIZE as usize,
574 /// };
575 /// let pool = bufpool::BufPool::new(pool_cfg);
576 ///
577 /// let pipe_cfg = wpipe::WritePipeCfg {
578 /// module_id: MODULE_ID,
579 /// flush_duration: time::Duration::from_millis(1),
580 /// };
581 /// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
582 /// .expect("pipe creation should succeed");
583 ///
584 /// let payload = vec![0x0A; BUFFER_SIZE as usize];
585 /// let allocation = pool.allocate(1);
586 ///
587 /// unsafe {ptr::copy_nonoverlapping(
588 /// payload.as_ptr(),
589 /// allocation.first(),
590 /// payload.len()
591 /// )};
592 ///
593 /// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0})
594 /// .unwrap();
595 /// let epoch = ticket.epoch();
596 ///
597 /// assert!(epoch > 0);
598 /// ```
    pub const fn epoch(&self) -> u64 {
        // the value handed out by `WritePipe::write` when this ticket was created
        self.epoch
    }
602
603 #[inline]
604 fn is_ready(&self) -> bool {
605 let durable = self.completion.durable_epoch.load(atomic::Ordering::Acquire);
606 if hints::likely(durable >= self.epoch) {
607 return true;
608 }
609
610 false
611 }
612}
613
614impl std::future::Future for WriteTicket {
615 type Output = FrozenResult<u64>;
616
617 #[inline(always)]
618 fn poll(
619 self: std::pin::Pin<&mut Self>,
620 cx: &mut std::task::Context<'_>,
621 ) -> std::task::Poll<Self::Output> {
622 if self.is_ready() {
623 return std::task::Poll::Ready(Ok(self.epoch));
624 }
625
626 if let Some(frozen_error) = self.completion.error.get() {
627 return std::task::Poll::Ready(Err(frozen_error));
628 }
629
630 self.completion.waker.register(cx.waker());
631
632 if self.is_ready() {
633 return std::task::Poll::Ready(Ok(self.epoch));
634 }
635
636 if let Some(frozen_error) = self.completion.error.get() {
637 return std::task::Poll::Ready(Err(frozen_error));
638 }
639
640 std::task::Poll::Pending
641 }
642}
643
/// Body of the background flush thread: repeatedly waits for at most `flush_duration` (or until
/// notified), drains the queue, writes every drained request, hard syncs the file and publishes
/// the highest epoch of the batch as durable. Returns once the pipe is marked closed and a drain
/// yields no requests.
///
/// ## Why we ignore [`std::sync::PoisonError`]?
///
/// The mutex locked here is solely used as a parking primitive for [`sync::Condvar`] and does not
/// protect any mutable state (it wraps `()`). All the pipe state is maintained via atomics and the
/// queue, and is completely separated from the mutex.
///
/// A poisoned mutex only indicates that another tx panicked while holding the lock, which would
/// normally hint at an inconsistent state of the protected value. Since no state can be left
/// partially modified under this lock, there is no possible consistency risk to recover from and
/// propagating the poison error would only introduce unnecessary failures into the flush path.
///
/// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating
/// with the recovered guard.
fn bg_flush_thread(core: sync::Arc<Core>, flush_duration: time::Duration) {
    let mut guard = core.flush_guard.lock().unwrap_or_else(|e| e.into_inner());
    loop {
        // park until notified (e.g. on drop) or until the flush window elapses
        (guard, _) =
            core.flush_cv.wait_timeout(guard, flush_duration).unwrap_or_else(|e| e.into_inner());

        // NOTE: we must read the close broadcast before acquiring the exclusive lock; doing it
        // the other way round imposes a deadlock-like situation for the flusher tx
        //
        // NOTE(review): the queue is drained before the exclusive IO lock is taken, while
        // `WritePipe::write` takes its epoch before pushing. A writer holding the shared lock may
        // therefore own an epoch smaller than this batch's maximum without being part of the
        // batch — confirm this ordering is acceptable for the durability guarantee.

        let queued_ops = core.queue.drain();
        let closed = core.closed.load(atomic::Ordering::Acquire);

        if queued_ops.is_empty() {
            if closed {
                return;
            }

            continue;
        }

        // INFO: we must acquire an exclusive IO lock for sync, hence no write/read ops are allowed
        // while sync is in progress
        let _io_lock = core.acquire_exclusive_io_lock();

        // on failure the error is recorded, waiters are woken and the batch is dropped (the
        // drained requests are not re-queued)
        let (_min_index, _max_index, max_epoch) = match core.write_queued_ops(queued_ops) {
            Ok(res) => res,
            Err(new_error) => {
                core.completion.error.set(new_error);
                core.completion.waker.wake();
                drop(_io_lock);

                continue;
            }
        };

        // NOTE: On linux, we can initiate writeback (best-effort only) for a given range
        #[cfg(target_os = "linux")]
        if let Err(new_error) = core.file.sync_range(_min_index, _max_index - _min_index) {
            core.completion.error.set(new_error);
            core.completion.waker.wake();
            drop(_io_lock);

            continue;
        }

        // NOTE: If the sync fails, we update `Completion::error` w/ the received error object. We
        // clear it up when another call succeeds.
        //
        // This is valid as the underlying sync flushes entire batch all at once, hence even if the
        // last call failed, and the new one succeeded, we do get the durability guarantee for the
        // old data as well.

        if let Err(new_error) = core.file.sync() {
            core.completion.error.set(new_error);
            core.completion.waker.wake();
            drop(_io_lock);

            continue;
        } else {
            // publish durability first (this also wakes the waiter), then clear any stale error
            core.completion.mark_epoch_as_durable(max_epoch);
            core.completion.error.del();
        }
    }
}
721
/// State shared between [`WritePipe`] (submitting side) and the background flush thread
#[derive(Debug)]
struct Core {
    // Durable epoch, last flush error and waker; also cloned into every `WriteTicket`
    completion: sync::Arc<Completion>,
    // Set when the pipe is dropped; tells the flusher to exit once a drain yields no requests
    closed: atomic::AtomicBool,
    // Last epoch handed out to a submitted write
    epoch: atomic::AtomicU64,
    // Destination file for the writes and syncs
    file: sync::Arc<ffile::FrozenFile>,
    // Condvar used to park the flusher between flush rounds
    flush_cv: sync::Condvar,
    // Mutex paired with `flush_cv`; protects no data
    flush_guard: sync::Mutex<()>,
    // Taken shared by submitters and exclusively by the flusher while it writes and syncs
    io_lock: sync::RwLock<()>,
    // Submitted writes waiting for the next flush round
    queue: mpscq::MPSCQueue<WriteRequestInternal>,
}
733
734impl Core {
735 fn new(file: sync::Arc<ffile::FrozenFile>) -> Self {
736 Self {
737 file,
738 completion: sync::Arc::new(Completion::default()),
739 closed: atomic::AtomicBool::new(false),
740 epoch: atomic::AtomicU64::new(0),
741 flush_cv: sync::Condvar::new(),
742 flush_guard: sync::Mutex::new(()),
743 io_lock: sync::RwLock::new(()),
744 queue: mpscq::MPSCQueue::default(),
745 }
746 }
747
748 #[inline]
749 fn acquire_shared_io_lock(&self) -> sync::RwLockReadGuard<'_, ()> {
750 // NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
751 self.io_lock.read().unwrap_or_else(|e| e.into_inner())
752 }
753
754 #[inline]
755 fn acquire_exclusive_io_lock(&self) -> sync::RwLockWriteGuard<'_, ()> {
756 // NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
757 self.io_lock.write().unwrap_or_else(|e| e.into_inner())
758 }
759
760 #[inline(always)]
761 fn write_queued_ops(
762 &self,
763 queued_ops: Vec<WriteRequestInternal>,
764 ) -> FrozenResult<(usize, usize, u64)> {
765 let mut max_epoch = 0;
766 let mut max_index = 0;
767 let mut min_index = usize::MAX;
768
769 for op in queued_ops {
770 let ops_len = op.request.allocation.length();
771 match ops_len {
772 1 => {
773 self.file.pwrite(op.request.allocation.first(), op.request.slot_index)?;
774 }
775 _ => {
776 let bufs: Vec<bufpool::BufferPointer> = op.request.allocation.iter().collect();
777 self.file.pwritev(&bufs, op.request.slot_index)?;
778 }
779 }
780
781 max_epoch = max_epoch.max(op.epoch);
782 min_index = min_index.min(op.request.slot_index);
783 max_index = max_index.max(op.request.slot_index + ops_len);
784 }
785
786 Ok((min_index, max_index, max_epoch))
787 }
788
789 #[inline]
790 fn increment_current_epoch(&self) -> u64 {
791 self.epoch.fetch_add(1, atomic::Ordering::AcqRel).wrapping_add(1)
792 }
793}
794
/// A submitted [`WriteRequest`] tagged with the epoch it was assigned, as stored in the queue
#[derive(Debug)]
struct WriteRequestInternal {
    // Epoch handed out by `WritePipe::write` for this request
    epoch: u64,
    // The caller's original request (buffers + destination slot)
    request: WriteRequest,
}
800
/// Completion state shared between the flusher (publisher) and every [`WriteTicket`] (observer)
#[derive(Debug)]
struct Completion {
    // Epoch most recently published as durable by the flusher (`0` until the first publish)
    durable_epoch: atomic::AtomicU64,
    // Last error recorded by the flusher, cleared again after a successful sync
    error: FlushError,
    // Waker registered by a polling ticket; woken on every durability or error update
    waker: futures::task::AtomicWaker,
}
807
808impl Default for Completion {
809 fn default() -> Self {
810 Self {
811 durable_epoch: atomic::AtomicU64::new(0),
812 waker: futures::task::AtomicWaker::new(),
813 error: FlushError::default(),
814 }
815 }
816}
817
818impl Completion {
819 fn mark_epoch_as_durable(&self, epoch: u64) {
820 self.durable_epoch.store(epoch, atomic::Ordering::Release);
821 self.waker.wake();
822 }
823}
824
825#[derive(Debug)]
826struct FlushError(atomic::AtomicPtr<FrozenError>);
827
828impl Default for FlushError {
829 fn default() -> Self {
830 Self(atomic::AtomicPtr::new(ptr::null_mut()))
831 }
832}
833
834impl Drop for FlushError {
835 fn drop(&mut self) {
836 let err_ptr = self.0.load(atomic::Ordering::Acquire);
837 if !err_ptr.is_null() {
838 let _ = unsafe { Box::from_raw(err_ptr) };
839 }
840 }
841}
842
843impl FlushError {
844 #[inline]
845 fn get(&self) -> Option<FrozenError> {
846 let curr_err = self.0.load(atomic::Ordering::Acquire);
847 if hints::unlikely(!curr_err.is_null()) {
848 let frozen_error = unsafe { (*curr_err).clone() };
849 return Some(frozen_error);
850 }
851
852 None
853 }
854
855 #[inline]
856 fn set(&self, new_error: FrozenError) {
857 let boxed_error = Box::into_raw(Box::new(new_error));
858 let old_err = self.0.swap(boxed_error, atomic::Ordering::AcqRel);
859
860 if hints::unlikely(!old_err.is_null()) {
861 let _ = unsafe { Box::from_raw(old_err) };
862 }
863 }
864
865 #[inline]
866 fn del(&self) {
867 let old_err = self.0.swap(ptr::null_mut(), atomic::Ordering::AcqRel);
868 if hints::unlikely(!old_err.is_null()) {
869 let _ = unsafe { Box::from_raw(old_err) };
870 }
871 }
872}
873
874mod err {
875 use crate::error::{ErrCode, FrozenError};
876
877 /// Domain ID for [`wpipe`] module is `0x02` used while propagating errors
878 const DOMAIN_ID: u8 = 0x02;
879
880 #[inline]
881 pub fn new_error<E: std::fmt::Display>(
882 module_id: u8,
883 code: ErrCode,
884 observed_error: E,
885 ) -> FrozenError {
886 FrozenError::new_raw(module_id, DOMAIN_ID, code, observed_error)
887 }
888
889 pub const FXE: ErrCode = ErrCode::new(0x10, "unable to spawn background flush thread");
890}
891
892#[cfg(test)]
893mod tests {
894 use super::*;
895 use crate::utils::BufferSize;
896
    // Module identifier used for the file and the pipe created by the tests
    const MODULE_ID: u8 = 0x00;
    // Buffer size shared by the file and the buffer pool
    const BUFFER_SIZE: BufferSize = BufferSize::S128;
    // Number of buffers the file starts with; also bounds the pool (`max_memory`)
    const INITIAL_BUFFER_AMOUT: usize = 0x200;
    // Flush window handed to every pipe created by the tests
    const FLUSH_DURATION: time::Duration = time::Duration::from_millis(1);
901
902 fn new_objects<P: AsRef<std::path::Path>>(
903 path: P,
904 ) -> (sync::Arc<ffile::FrozenFile>, bufpool::BufPool, WritePipe) {
905 let file_cfg = ffile::FrozenFileCfg {
906 module_id: MODULE_ID,
907 path: path.as_ref().to_path_buf(),
908 buffer_size: BUFFER_SIZE as usize,
909 initial_available_buffers: INITIAL_BUFFER_AMOUT,
910 };
911 let file = sync::Arc::new(ffile::FrozenFile::new(file_cfg).unwrap());
912
913 let pool_cfg = bufpool::BufPoolCfg {
914 buffer_size: BUFFER_SIZE,
915 max_memory: INITIAL_BUFFER_AMOUT * BUFFER_SIZE as usize,
916 };
917 let pool = bufpool::BufPool::new(pool_cfg);
918
919 let pipe_cfg = WritePipeCfg { module_id: MODULE_ID, flush_duration: FLUSH_DURATION };
920 let pipe = WritePipe::new(pipe_cfg, file.clone()).unwrap();
921
922 (file, pool, pipe)
923 }
924
925 fn prep_write(
926 buf_ptr: *const u8,
927 n: usize,
928 pool: &bufpool::BufPool,
929 ) -> bufpool::BufPoolAllocation {
930 let allocation = pool.allocate(n);
931 for allocated_buf in allocation.iter() {
932 unsafe { ptr::copy_nonoverlapping(buf_ptr, allocated_buf, BUFFER_SIZE as usize) };
933 }
934
935 allocation
936 }
937
938 fn compare_with_readback(
939 buf: &[u8],
940 read_index: usize,
941 required: usize,
942 pool: &bufpool::BufPool,
943 file: &ffile::FrozenFile,
944 ) {
945 let read_allocation = pool.allocate(required);
946 let read_bufs: Vec<bufpool::BufferPointer> = read_allocation.iter().collect();
947
948 file.preadv(&read_bufs, read_index).unwrap();
949
950 for read_buf in read_allocation.iter() {
951 let observed = unsafe { std::slice::from_raw_parts(read_buf, BUFFER_SIZE as usize) };
952 assert_eq!(buf, observed);
953 }
954 }
955
956 mod lifecycle {
957 use super::*;
958
959 #[test]
960 fn ok_new() {
961 let dir = tempfile::tempdir().unwrap();
962 let path = dir.path().join("write_single");
963
964 let file_cfg = ffile::FrozenFileCfg {
965 path,
966 module_id: MODULE_ID,
967 buffer_size: BUFFER_SIZE as usize,
968 initial_available_buffers: INITIAL_BUFFER_AMOUT,
969 };
970 let file = sync::Arc::new(ffile::FrozenFile::new(file_cfg).unwrap());
971
972 let pipe_cfg = WritePipeCfg { module_id: MODULE_ID, flush_duration: FLUSH_DURATION };
973 assert!(WritePipe::new(pipe_cfg, file).is_ok());
974 }
975
976 #[test]
977 fn ok_drop() {
978 let dir = tempfile::tempdir().unwrap();
979 let path = dir.path().join("write_single");
980 let (_file, _, pipe) = new_objects(path);
981
982 drop(pipe);
983 }
984 }
985
    // Shutdown behaviour: dropping the pipe while writes are still queued
    mod shutdown {
        use super::*;

        // Dropping right after a submission must return (no hang in `Drop`)
        #[test]
        fn ok_drop_before_pending_write_call() {
            let dir = tempfile::tempdir().unwrap();
            let path = dir.path().join("write_single");
            let (_file, pool, pipe) = new_objects(path);

            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];

            let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
            let request = WriteRequest { allocation, slot_index: 0 };

            assert!(pipe.write(request).is_ok());
            drop(pipe);
        }

        // A write queued before the drop must be readable after the file is reopened
        #[test]
        fn ok_drop_waits_for_pending_write_call() {
            let dir = tempfile::tempdir().unwrap();
            let path = dir.path().join("write_single");

            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];

            // new + write + drop
            {
                let (_file, pool, pipe) = new_objects(path.clone());

                let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
                let request = WriteRequest { allocation, slot_index: 0 };

                assert!(pipe.write(request).is_ok());
                drop(pipe);
            }

            // open + readback
            {
                // NOTE: `new_objects` also spins up a second pipe here; it is unused and dropped
                // at the end of this scope
                let (file, pool, _) = new_objects(path);
                compare_with_readback(&BUFFER, 0, 1, &pool, &file);
            }
        }

        // Many queued writes (all targeting slot 0) must not make the drop hang
        #[test]
        fn ok_drop_does_not_deadlock_when_multiple_pending_writes() {
            let dir = tempfile::tempdir().unwrap();
            let path = dir.path().join("write_single");
            let (_file, pool, pipe) = new_objects(path);

            for i in 0..INITIAL_BUFFER_AMOUT {
                let buffer = vec![i as u8; BUFFER_SIZE as usize];
                let allocation = prep_write(buffer.as_ptr(), 1, &pool);
                let request = WriteRequest { allocation, slot_index: 0 };

                assert!(pipe.write(request).is_ok());
            }

            drop(pipe);
        }

        // A write issued from another thread through a shared `Arc<WritePipe>`
        #[test]
        fn ok_drop_correctly_waits_for_pending_write_with_multi_threads() {
            let dir = tempfile::tempdir().unwrap();
            let path = dir.path().join("write_single");
            let (_file, pool, pipe) = new_objects(path);

            const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];

            let pipe = sync::Arc::new(pipe);
            let pipe2 = sync::Arc::clone(&pipe);

            let handle = thread::spawn(move || {
                let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
                let request = WriteRequest { allocation, slot_index: 0 };

                assert!(pipe2.write(request).is_ok());
            });

            // NOTE: this only releases one `Arc` handle; the pipe itself is dropped by whichever
            // handle (this one or `pipe2` inside the spawned thread) goes away last
            drop(pipe);
            handle.join().unwrap();
        }
    }
1068
/// Tests for the immediate result of submitting requests through `pipe.write`.
mod pipe_writes {
    use super::*;

    /// A single request submitted at slot index 0 must be accepted.
    #[test]
    fn ok_write() {
        const PAYLOAD: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];

        let tmp = tempfile::tempdir().unwrap();
        let (_file, pool, pipe) = new_objects(tmp.path().join("write_single"));

        let request = WriteRequest {
            allocation: prep_write(PAYLOAD.as_ptr(), 0x0A, &pool),
            slot_index: 0,
        };

        assert!(pipe.write(request).is_ok());
    }

    /// Tickets handed out for three consecutive writes must carry strictly increasing
    /// epochs.
    #[test]
    fn ok_write_epoch_is_monotonic() {
        const PAYLOAD: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];

        let tmp = tempfile::tempdir().unwrap();
        let (_file, pool, pipe) = new_objects(tmp.path().join("write_single"));

        // submits one request at the given slot and hands back its ticket
        let submit = |slot_index: usize| {
            let allocation = prep_write(PAYLOAD.as_ptr(), 1, &pool);
            pipe.write(WriteRequest { allocation, slot_index }).unwrap()
        };

        let first = submit(0);
        let second = submit(1);
        let third = submit(2);

        assert!(third.epoch() > second.epoch());
        assert!(second.epoch() > first.epoch());
    }
}
1109
/// Tests that await the ticket returned by `pipe.write` and then inspect the file.
mod write_ticket {
    use super::*;

    /// Awaiting a ticket yields an epoch that is at least the ticket's own epoch, after
    /// which the written payload can be read back.
    #[test]
    fn ok_readback_after_write_with_await() {
        const REQUIRED: usize = 0x0A;
        const PAYLOAD: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];

        let tmp = tempfile::tempdir().unwrap();
        let (file, pool, pipe) = new_objects(tmp.path().join("write_single"));

        let request = WriteRequest {
            allocation: prep_write(PAYLOAD.as_ptr(), REQUIRED, &pool),
            slot_index: 0,
        };

        let ticket = pipe.write(request).unwrap();
        // the epoch is captured first because awaiting consumes the ticket
        let issued_epoch = ticket.epoch();

        let settled_epoch = futures::executor::block_on(ticket).unwrap();
        assert!(settled_epoch >= issued_epoch);

        compare_with_readback(&PAYLOAD, 0, REQUIRED, &pool, &file);
    }

    /// Five writes are queued back to back, only the last ticket is awaited, and every
    /// payload is then read back from its own slot range.
    #[test]
    fn ok_readback_after_batch_write() {
        const BATCH: [([u8; BUFFER_SIZE as usize], usize); 5] = [
            ([0x0Au8; BUFFER_SIZE as usize], 0x1A),
            ([0x0Bu8; BUFFER_SIZE as usize], 0x1B),
            ([0x0Cu8; BUFFER_SIZE as usize], 0x1C),
            ([0x0Du8; BUFFER_SIZE as usize], 0x1D),
            ([0x0Eu8; BUFFER_SIZE as usize], 0x1E),
        ];

        let tmp = tempfile::tempdir().unwrap();
        let (file, pool, pipe) = new_objects(tmp.path().join("write_single"));

        // each entry starts where the previous one ended
        let mut next_slot = 0;
        let mut last_ticket = None;

        for (payload, required) in BATCH {
            let request = WriteRequest {
                allocation: prep_write(payload.as_ptr(), required, &pool),
                slot_index: next_slot,
            };
            let ticket = pipe.write(request).unwrap();

            next_slot += required;
            last_ticket = Some(ticket);
        }

        assert!(last_ticket.is_some());

        if let Some(ticket) = last_ticket {
            let issued_epoch = ticket.epoch();
            let settled_epoch = futures::executor::block_on(ticket).unwrap();

            assert!(settled_epoch >= issued_epoch);
        }

        // walk the same slot layout again, this time reading
        let mut cursor = 0;
        for (payload, required) in BATCH {
            compare_with_readback(&payload, cursor, required, &pool, &file);
            cursor += required;
        }
    }

    /// Two tickets are awaited together through `futures::join!`; both must resolve
    /// successfully and the second result must be greater than the first.
    #[test]
    fn ok_multiple_concurrent_awaits() {
        const REQUIRED: usize = 0x0A;
        const PAYLOAD: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];

        let tmp = tempfile::tempdir().unwrap();
        let (_file, pool, pipe) = new_objects(tmp.path().join("write_single"));

        // both requests deliberately target slot index 0
        let submit = || {
            let allocation = prep_write(PAYLOAD.as_ptr(), REQUIRED, &pool);
            pipe.write(WriteRequest { allocation, slot_index: 0 }).unwrap()
        };

        let first = submit();
        let second = submit();

        let (first_result, second_result) =
            futures::executor::block_on(async { futures::join!(first, second) });

        assert!(first_result.is_ok());
        assert!(second_result.is_ok());
        // NOTE(review): the strict `>` presumes the two tickets never resolve to the same
        // epoch, while the sibling tests only assert `>=` - confirm against the ticket
        // future's contract.
        assert!(second_result.unwrap() > first_result.unwrap());
    }

    /// Awaiting only the last of three tickets must yield an epoch covering all three
    /// ticket epochs, and all three payloads must be readable afterwards.
    #[test]
    fn ok_awaiting_last_ticket_implies_previous_writes_are_durable() {
        const PAYLOAD_A: [u8; BUFFER_SIZE as usize] = [0xAA; BUFFER_SIZE as usize];
        const PAYLOAD_B: [u8; BUFFER_SIZE as usize] = [0xBB; BUFFER_SIZE as usize];
        const PAYLOAD_C: [u8; BUFFER_SIZE as usize] = [0xCC; BUFFER_SIZE as usize];

        let tmp = tempfile::tempdir().unwrap();
        let (file, pool, pipe) = new_objects(tmp.path().join("durability_boundary"));

        // submits a single-slot request and hands back its ticket
        let submit = |source: *const u8, slot_index: usize| {
            let allocation = prep_write(source, 1, &pool);
            pipe.write(WriteRequest { allocation, slot_index }).unwrap()
        };

        let ticket_a = submit(PAYLOAD_A.as_ptr(), 0);
        let ticket_b = submit(PAYLOAD_B.as_ptr(), 1);
        let ticket_c = submit(PAYLOAD_C.as_ptr(), 2);

        let issued = [ticket_c.epoch(), ticket_b.epoch(), ticket_a.epoch()];

        // only the newest ticket is awaited; the older two are never polled
        let settled_epoch = futures::executor::block_on(ticket_c).unwrap();
        for epoch in issued {
            assert!(settled_epoch >= epoch);
        }

        for (slot, expected) in [&PAYLOAD_A, &PAYLOAD_B, &PAYLOAD_C].into_iter().enumerate() {
            compare_with_readback(expected, slot, 1, &pool, &file);
        }
    }
}
1234
/// Tests that drive the pipe from several writer threads at once.
mod concurrency {
    use super::*;

    /// Four threads submit 0x40 writes each to disjoint slots.
    ///
    /// The collected ticket epochs must be exactly `1..=total` with no gaps or
    /// duplicates, and awaiting the ticket with the highest epoch must yield `total`.
    #[test]
    fn ok_multi_threaded_writers() {
        const THREADS: usize = 4;
        const WRITES_PER_THREAD: usize = 0x40;
        const TOTAL_WRITES: usize = THREADS * WRITES_PER_THREAD;
        const _: () = assert!(TOTAL_WRITES < INITIAL_BUFFER_AMOUT);

        let tmp = tempfile::tempdir().unwrap();
        let (_file, pool, pipe) = new_objects(tmp.path().join("multi_threaded_writers"));

        let pipe = sync::Arc::new(pipe);
        let pool = sync::Arc::new(pool);

        // every writer is spawned before any of them is joined
        let writers: Vec<_> = (0..THREADS)
            .map(|tid| {
                let pipe = sync::Arc::clone(&pipe);
                let pool = sync::Arc::clone(&pool);

                thread::spawn(move || {
                    (0..WRITES_PER_THREAD)
                        .map(|i| {
                            let payload = vec![tid as u8; BUFFER_SIZE as usize];
                            let allocation = prep_write(payload.as_ptr(), 1, &pool);
                            // each thread owns a contiguous, non-overlapping slot range
                            let slot_index = tid * WRITES_PER_THREAD + i;

                            pipe.write(WriteRequest { allocation, slot_index }).unwrap()
                        })
                        .collect::<Vec<_>>()
                })
            })
            .collect();

        let tickets: Vec<_> =
            writers.into_iter().flat_map(|writer| writer.join().unwrap()).collect();
        assert_eq!(tickets.len(), TOTAL_WRITES);

        let mut epochs: Vec<u64> = tickets.iter().map(WriteTicket::epoch).collect();
        epochs.sort_unstable();

        // once sorted, the epochs must read 1, 2, 3, ... without holes
        for (position, observed) in epochs.iter().copied().enumerate() {
            assert_eq!(position as u64 + 1, observed);
        }

        let newest = tickets.into_iter().max_by_key(WriteTicket::epoch).unwrap();
        let settled_epoch = futures::executor::block_on(newest).unwrap();
        assert_eq!(settled_epoch, TOTAL_WRITES as u64);
    }
}
1291}