frozen_core/wpipe.rs
1//! A low latency asynchronous write pipeline for buffer based storage
2//!
3//! ## Design
4//!
//! By design, every write call is fire-and-forget, i.e. the call returns immediately after
//! pushing the bytes to be written into the MPSC queue.
7//!
//! The background thread pulls from the MPSC queue, performs individual pwrite/pwritev calls,
//! and issues a single hard sync right after. This provides durability for all the writes
//! submitted within the same [`WritePipeCfg::flush_duration`] batching window.
11//!
12//! ## Benchmarks
13//!
14//! Observed measurements for latency (both single and multi threaded),
15//!
16//! | Metric | 1 Thread (µs) | 4 Threads (µs) |
17//! |:-------|:--------------|:---------------|
18//! | P50 | 0.091 | 0.275 |
19//! | P90 | 0.092 | 0.458 |
20//! | P99 | 0.825 | 0.917 |
21//! | Mean | 1.185 | 3.857 |
22//!
23//! Environment used for benching,
24//!
25//! * OS: NixOS (WSL2)
26//! * Architecture: x86_64
27//! * Memory: 8 GiB RAM (DDR4)
28//! * Rust: rustc 1.86.0 w/ cargo 1.86.0
29//! * Kernel: Linux 6.6.87.2-microsoft-standard-WSL2
30//! * CPU: Intel® Core™ i5-10300H @ 2.50GHz (4C / 8T)
31//!
32//! ## Example
33//!
34//! ```
35//! use frozen_core::{bufpool, ffile, utils, wpipe};
36//! use std::{ptr, sync, time};
37//!
38//! const MODULE_ID: u8 = 0x00;
39//! const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
40//!
41//! let dir = tempfile::tempdir().expect("tempdir creation should succeed");
42//! let path = dir.path().join("wpipe_example");
43//!
44//! let file_cfg = ffile::FrozenFileCfg {
45//! path,
46//! module_id: MODULE_ID,
47//! initial_available_buffers: 0x400,
48//! buffer_size: BUFFER_SIZE as usize,
49//! };
50//! let file = sync::Arc::new(
51//! ffile::FrozenFile::new(file_cfg)
52//! .expect("file creation should succeed"),
53//! );
54//!
55//! let pool_cfg = bufpool::BufPoolCfg {
56//! buffer_size: utils::BufferSize::S128,
57//! max_memory: 0x400 * BUFFER_SIZE as usize,
58//! };
59//! let pool = bufpool::BufPool::new(pool_cfg);
60//!
61//! let pipe_cfg = wpipe::WritePipeCfg {
62//! module_id: MODULE_ID,
63//! flush_duration: time::Duration::from_millis(1),
64//! };
65//! let pipe = wpipe::WritePipe::new(pipe_cfg, file)
66//! .expect("pipe creation should succeed");
67//!
68//! let payload = [0xAAu8; BUFFER_SIZE as usize];
69//!
70//! let mut latest_ticket = None;
71//! for slot_index in 0..3 {
72//! let allocation = pool.allocate(1);
73//!
74//! unsafe {
75//! ptr::copy_nonoverlapping(
76//! payload.as_ptr(),
77//! allocation.first(),
78//! payload.len(),
79//! );
80//! }
81//!
82//! let ticket = pipe
83//! .write(wpipe::WriteRequest {
84//! allocation,
85//! slot_index,
86//! })
87//! .expect("write should succeed");
88//!
89//! latest_ticket = Some(ticket);
90//! }
91//!
92//! let durable_epoch = futures::executor::block_on(
93//! latest_ticket.expect("ticket should exist"),
94//! )
95//! .expect("writes should become durable");
96//!
97//! assert!(durable_epoch >= 3);
98//! ```
99
100use crate::{
101 bufpool,
102 error::{FrozenError, FrozenResult},
103 ffile, hints, mpscq,
104};
105use std::{
106 ptr,
107 sync::{self, atomic},
108 thread, time,
109};
110
111/// All the available configurations for [`WritePipe`]
112///
113/// ## Example
114///
115/// ```
116/// use frozen_core::wpipe::WritePipeCfg;
117///
118/// let cfg = WritePipeCfg {
119/// module_id: 2,
120/// flush_duration: std::time::Duration::from_millis(0x0A),
121/// };
122///
123/// assert_ne!(cfg.module_id, 0);
124/// assert_ne!(cfg.flush_duration.as_millis(), 0);
125/// ```
126#[derive(Debug, Clone)]
127pub struct WritePipeCfg {
128 /// Identifier used for error propagation by [`frozen_core::error::FrozenError`]
129 pub module_id: u8,
130
131 /// Time interval used by the background thread to perform hard sync for all the write
132 /// operations submitted in the last durability window
133 pub flush_duration: time::Duration,
134}
135
136/// A low latency asynchronous write pipeline for buffer based storage
137///
138/// ## Design
139///
140/// By design, every write call is fire-and-forget, i.e. the call is immediately returned after
141/// pushing the bytes to be written into the MPSC queue.
142///
143/// The background thread pulls from MPSC queue and performs indivisual pwrite/v calls and a common
144/// hard sync right after. This provides durability for all the writes submitted within the same
145/// [`WritePipeCfg::flush_duration`] batching window.
146///
147/// ## Example
148///
149/// ```
150/// use frozen_core::{bufpool, ffile, utils, wpipe};
151/// use std::{ptr, sync, time};
152///
153/// const MODULE_ID: u8 = 0x00;
154/// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
155///
156/// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
157/// let path = dir.path().join("wpipe_example");
158///
159/// let file_cfg = ffile::FrozenFileCfg {
160/// path,
161/// module_id: MODULE_ID,
162/// initial_available_buffers: 0x400,
163/// buffer_size: BUFFER_SIZE as usize,
164/// };
165/// let file = sync::Arc::new(
166/// ffile::FrozenFile::new(file_cfg)
167/// .expect("file creation should succeed"),
168/// );
169///
170/// let pool_cfg = bufpool::BufPoolCfg {
171/// buffer_size: utils::BufferSize::S128,
172/// max_memory: 0x400 * BUFFER_SIZE as usize,
173/// };
174/// let pool = bufpool::BufPool::new(pool_cfg);
175///
176/// let pipe_cfg = wpipe::WritePipeCfg {
177/// module_id: MODULE_ID,
178/// flush_duration: time::Duration::from_millis(1),
179/// };
180/// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
181/// .expect("pipe creation should succeed");
182///
183/// let payload = [0xAAu8; BUFFER_SIZE as usize];
184///
185/// let mut latest_ticket = None;
186/// for slot_index in 0..3 {
187/// let allocation = pool.allocate(1);
188///
189/// unsafe {
190/// ptr::copy_nonoverlapping(
191/// payload.as_ptr(),
192/// allocation.first(),
193/// payload.len(),
194/// );
195/// }
196///
197/// let ticket = pipe
198/// .write(wpipe::WriteRequest {
199/// allocation,
200/// slot_index,
201/// })
202/// .expect("write should succeed");
203///
204/// latest_ticket = Some(ticket);
205/// }
206///
207/// let durable_epoch = futures::executor::block_on(
208/// latest_ticket.expect("ticket should exist"),
209/// )
210/// .expect("writes should become durable");
211///
212/// assert!(durable_epoch >= 3);
213/// ```
214#[derive(Debug)]
215pub struct WritePipe {
216 core: sync::Arc<Core>,
217 flush_tx_handle: Option<thread::JoinHandle<()>>,
218}
219
220unsafe impl Send for WritePipe {}
221unsafe impl Sync for WritePipe {}
222
223impl WritePipe {
224 /// Create a new instance of [`WritePipe`]
225 ///
226 /// ## Example
227 ///
228 /// ```
229 /// use frozen_core::{bufpool, ffile, utils, wpipe};
230 /// use std::{ptr, sync, time};
231 ///
232 /// const MODULE_ID: u8 = 0x00;
233 /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
234 ///
235 /// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
236 /// let path = dir.path().join("wpipe_example");
237 ///
238 /// let file_cfg = ffile::FrozenFileCfg {
239 /// path,
240 /// module_id: MODULE_ID,
241 /// initial_available_buffers: 0x400,
242 /// buffer_size: BUFFER_SIZE as usize,
243 /// };
244 /// let file = sync::Arc::new(
245 /// ffile::FrozenFile::new(file_cfg)
246 /// .expect("file creation should succeed"),
247 /// );
248 ///
249 /// let pool_cfg = bufpool::BufPoolCfg {
250 /// buffer_size: utils::BufferSize::S128,
251 /// max_memory: 0x400 * BUFFER_SIZE as usize,
252 /// };
253 /// let pool = bufpool::BufPool::new(pool_cfg);
254 ///
255 /// let pipe_cfg = wpipe::WritePipeCfg {
256 /// module_id: MODULE_ID,
257 /// flush_duration: time::Duration::from_millis(1),
258 /// };
259 /// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
260 /// .expect("pipe creation should succeed");
261 ///
262 /// let payload = vec![0x0A; BUFFER_SIZE as usize];
263 /// let allocation = pool.allocate(1);
264 ///
265 /// unsafe {ptr::copy_nonoverlapping(
266 /// payload.as_ptr(),
267 /// allocation.first(),
268 /// payload.len()
269 /// )};
270 ///
271 /// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0});
272 ///
273 /// assert!(
274 /// futures::executor::block_on(ticket.expect("ticket should exist"))
275 /// .is_ok()
276 /// );
277 /// ```
278 #[inline]
279 pub fn new(cfg: WritePipeCfg, file: sync::Arc<ffile::FrozenFile>) -> FrozenResult<Self> {
280 let core = sync::Arc::new(Core::new(file));
281 let cloned_core = core.clone();
282 let flush_tx_handle = match thread::Builder::new()
283 .name(format!("mod{}_wpipe_flush_tx", cfg.module_id))
284 .spawn(move || bg_flush_thread(cloned_core, cfg.flush_duration))
285 {
286 Ok(handle) => Some(handle),
287 Err(observed_error) => return Err(err::new_error(cfg.module_id, err::FXE, observed_error)),
288 };
289
290 Ok(Self { core: core, flush_tx_handle })
291 }
292
293 /// Push a write into [`WritePipe`]
294 ///
295 /// Every write call is fire-and-forget for the caller by default, unless the caller choose to
296 /// wait for durability using the manual `await` on [`WriteTicket`].
297 ///
298 /// ## Example
299 ///
300 /// ```
301 /// use frozen_core::{bufpool, ffile, utils, wpipe};
302 /// use std::{ptr, sync, time};
303 ///
304 /// const MODULE_ID: u8 = 0x00;
305 /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
306 ///
307 /// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
308 /// let path = dir.path().join("wpipe_example");
309 ///
310 /// let file_cfg = ffile::FrozenFileCfg {
311 /// path,
312 /// module_id: MODULE_ID,
313 /// initial_available_buffers: 0x400,
314 /// buffer_size: BUFFER_SIZE as usize,
315 /// };
316 /// let file = sync::Arc::new(
317 /// ffile::FrozenFile::new(file_cfg)
318 /// .expect("file creation should succeed"),
319 /// );
320 ///
321 /// let pool_cfg = bufpool::BufPoolCfg {
322 /// buffer_size: utils::BufferSize::S128,
323 /// max_memory: 0x400 * BUFFER_SIZE as usize,
324 /// };
325 /// let pool = bufpool::BufPool::new(pool_cfg);
326 ///
327 /// let pipe_cfg = wpipe::WritePipeCfg {
328 /// module_id: MODULE_ID,
329 /// flush_duration: time::Duration::from_millis(1),
330 /// };
331 /// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
332 /// .expect("pipe creation should succeed");
333 ///
334 /// let payload = vec![0x0A; BUFFER_SIZE as usize];
335 /// let allocation = pool.allocate(1);
336 ///
337 /// unsafe {ptr::copy_nonoverlapping(
338 /// payload.as_ptr(),
339 /// allocation.first(),
340 /// payload.len()
341 /// )};
342 ///
343 /// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0});
344 ///
345 /// assert!(
346 /// futures::executor::block_on(ticket.expect("ticket should exist"))
347 /// .is_ok()
348 /// );
349 /// ```
350 #[inline]
351 pub fn write(&self, request: WriteRequest) -> FrozenResult<WriteTicket> {
352 let _io_lock = self.core.acquire_shared_io_lock();
353 if let Some(frozen_error) = self.core.completion.error.get() {
354 return Err(frozen_error);
355 }
356
357 let epoch = self.core.increment_current_epoch();
358 let internal_req = WriteRequestInternal { request, epoch };
359 self.core.queue.push(internal_req);
360
361 Ok(WriteTicket { epoch, completion: self.core.completion.clone() })
362 }
363}
364
365impl Drop for WritePipe {
366 fn drop(&mut self) {
367 self.core.closed.store(true, atomic::Ordering::Release);
368 self.core.flush_cv.notify_one();
369
370 if let Some(handle) = self.flush_tx_handle.take() {
371 let _ = handle.join();
372 }
373 }
374}
375
376/// A write operation submitted to [`WritePipe`]
377///
378/// The request contains the buffers to persist along with the destination slot index in the
379/// underlying [`FrozenFile`].
380///
381/// ## Example
382///
383/// ```
384/// use frozen_core::{bufpool, utils, wpipe};
385/// use std::ptr;
386///
387/// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
388///
389/// let pool_cfg = bufpool::BufPoolCfg {
390/// buffer_size: utils::BufferSize::S128,
391/// max_memory: 0x400 * BUFFER_SIZE as usize,
392/// };
393/// let pool = bufpool::BufPool::new(pool_cfg);
394///
395/// let payload = vec![0x0A; BUFFER_SIZE as usize];
396/// let allocation = pool.allocate(1);
397///
398/// unsafe {ptr::copy_nonoverlapping(
399/// payload.as_ptr(),
400/// allocation.first(),
401/// payload.len()
402/// )};
403///
404/// let request = wpipe::WriteRequest {allocation, slot_index: 0};
405/// assert!(request.slot_index >= 0);
406/// ```
407#[derive(Debug)]
408pub struct WriteRequest {
409 /// Buffer allocation containing the pages to be written allocated using [`bufpool::BufPool`]
410 ///
411 /// ## Example
412 ///
413 /// ```
414 /// use frozen_core::{bufpool, utils, wpipe};
415 /// use std::ptr;
416 ///
417 /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
418 ///
419 /// let pool_cfg = bufpool::BufPoolCfg {
420 /// buffer_size: utils::BufferSize::S128,
421 /// max_memory: 0x400 * BUFFER_SIZE as usize,
422 /// };
423 /// let pool = bufpool::BufPool::new(pool_cfg);
424 ///
425 /// let payload = vec![0x0A; BUFFER_SIZE as usize];
426 /// let allocation = pool.allocate(1);
427 ///
428 /// unsafe {ptr::copy_nonoverlapping(
429 /// payload.as_ptr(),
430 /// allocation.first(),
431 /// payload.len()
432 /// )};
433 ///
434 /// let request = wpipe::WriteRequest {allocation, slot_index: 0};
435 /// assert!(!request.allocation.first().is_null());
436 /// ```
437 pub allocation: bufpool::BufPoolAllocation,
438
439 /// Destination slot index where the pages of the allocation will be written from
440 ///
441 /// ## Example
442 ///
443 /// ```
444 /// use frozen_core::{bufpool, utils, wpipe};
445 /// use std::ptr;
446 ///
447 /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
448 ///
449 /// let pool_cfg = bufpool::BufPoolCfg {
450 /// buffer_size: utils::BufferSize::S128,
451 /// max_memory: 0x400 * BUFFER_SIZE as usize,
452 /// };
453 /// let pool = bufpool::BufPool::new(pool_cfg);
454 ///
455 /// let payload = vec![0x0A; BUFFER_SIZE as usize];
456 /// let allocation = pool.allocate(1);
457 ///
458 /// unsafe {ptr::copy_nonoverlapping(
459 /// payload.as_ptr(),
460 /// allocation.first(),
461 /// payload.len()
462 /// )};
463 ///
464 /// let request = wpipe::WriteRequest {allocation, slot_index: 0};
465 /// assert!(request.slot_index >= 0);
466 /// ```
467 pub slot_index: usize,
468}
469
470/// Durability handle associated with the submitted write operation via [`WritePipe::write`]
471///
472/// ## Epoch
473///
474/// Every ticket is assigned a monotonically increasing epoch to moniter durability
475///
476/// ## Durability Guarantee
477///
478/// If wanted, the ticket could be awaited to poll till the epoch becomes durable.
479///
480/// Once a await on ticket is completed successfully, all writes assigned to earlier epochs are
481/// also guaranteed to be durable.
482///
483/// *NOTE:* Using `await` is optional. Callers that only require fire-and-forget semantics may
484/// simply discard the returned ticket.
485///
486/// ## Example
487///
488/// ```
489/// use frozen_core::{bufpool, ffile, utils, wpipe};
490/// use std::{ptr, sync, time};
491///
492/// const MODULE_ID: u8 = 0x00;
493/// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
494///
495/// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
496/// let path = dir.path().join("wpipe_example");
497///
498/// let file_cfg = ffile::FrozenFileCfg {
499/// path,
500/// module_id: MODULE_ID,
501/// initial_available_buffers: 0x400,
502/// buffer_size: BUFFER_SIZE as usize,
503/// };
504/// let file = sync::Arc::new(
505/// ffile::FrozenFile::new(file_cfg)
506/// .expect("file creation should succeed"),
507/// );
508///
509/// let pool_cfg = bufpool::BufPoolCfg {
510/// buffer_size: utils::BufferSize::S128,
511/// max_memory: 0x400 * BUFFER_SIZE as usize,
512/// };
513/// let pool = bufpool::BufPool::new(pool_cfg);
514///
515/// let pipe_cfg = wpipe::WritePipeCfg {
516/// module_id: MODULE_ID,
517/// flush_duration: time::Duration::from_millis(1),
518/// };
519/// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
520/// .expect("pipe creation should succeed");
521///
522/// let payload = vec![0x0A; BUFFER_SIZE as usize];
523/// let allocation = pool.allocate(1);
524///
525/// unsafe {ptr::copy_nonoverlapping(
526/// payload.as_ptr(),
527/// allocation.first(),
528/// payload.len()
529/// )};
530///
531/// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0})
532/// .unwrap();
533/// let epoch = ticket.epoch();
534///
535/// assert!(epoch > 0);
536/// ```
537#[derive(Debug)]
538pub struct WriteTicket {
539 epoch: u64,
540 completion: sync::Arc<Completion>,
541}
542
543impl WriteTicket {
544 /// Read assigned durability epoch for the [`WriteTicket`]
545 ///
546 /// ## Example
547 ///
548 /// ```
549 /// use frozen_core::{bufpool, ffile, utils, wpipe};
550 /// use std::{ptr, sync, time};
551 ///
552 /// const MODULE_ID: u8 = 0x00;
553 /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
554 ///
555 /// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
556 /// let path = dir.path().join("wpipe_example");
557 ///
558 /// let file_cfg = ffile::FrozenFileCfg {
559 /// path,
560 /// module_id: MODULE_ID,
561 /// initial_available_buffers: 0x400,
562 /// buffer_size: BUFFER_SIZE as usize,
563 /// };
564 /// let file = sync::Arc::new(
565 /// ffile::FrozenFile::new(file_cfg)
566 /// .expect("file creation should succeed"),
567 /// );
568 ///
569 /// let pool_cfg = bufpool::BufPoolCfg {
570 /// buffer_size: utils::BufferSize::S128,
571 /// max_memory: 0x400 * BUFFER_SIZE as usize,
572 /// };
573 /// let pool = bufpool::BufPool::new(pool_cfg);
574 ///
575 /// let pipe_cfg = wpipe::WritePipeCfg {
576 /// module_id: MODULE_ID,
577 /// flush_duration: time::Duration::from_millis(1),
578 /// };
579 /// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
580 /// .expect("pipe creation should succeed");
581 ///
582 /// let payload = vec![0x0A; BUFFER_SIZE as usize];
583 /// let allocation = pool.allocate(1);
584 ///
585 /// unsafe {ptr::copy_nonoverlapping(
586 /// payload.as_ptr(),
587 /// allocation.first(),
588 /// payload.len()
589 /// )};
590 ///
591 /// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0})
592 /// .unwrap();
593 /// let epoch = ticket.epoch();
594 ///
595 /// assert!(epoch > 0);
596 /// ```
597 pub const fn epoch(&self) -> u64 {
598 self.epoch
599 }
600
601 #[inline]
602 fn is_ready(&self) -> bool {
603 let durable = self.completion.durable_epoch.load(atomic::Ordering::Acquire);
604 if hints::likely(durable >= self.epoch) {
605 return true;
606 }
607
608 false
609 }
610}
611
612impl std::future::Future for WriteTicket {
613 type Output = FrozenResult<u64>;
614
615 #[inline(always)]
616 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
617 if self.is_ready() {
618 return std::task::Poll::Ready(Ok(self.epoch));
619 }
620
621 if let Some(frozen_error) = self.completion.error.get() {
622 return std::task::Poll::Ready(Err(frozen_error));
623 }
624
625 self.completion.waker.register(cx.waker());
626
627 if self.is_ready() {
628 return std::task::Poll::Ready(Ok(self.epoch));
629 }
630
631 if let Some(frozen_error) = self.completion.error.get() {
632 return std::task::Poll::Ready(Err(frozen_error));
633 }
634
635 std::task::Poll::Pending
636 }
637}
638
639/// ## Why we ignore [`std::sync::PoisonError`]?
640///
641/// The mutex used for lock, is solely used as a parking primitive for [`Condvar`] and does not
642/// protect any mutable state. All the pool invariants and accounting are maintained via atomics
643/// and are completely seperated from the mutex.
644///
645/// A poisoned mutex only indicates that another tx panicked while holding the lock, and indicates
646/// an inconsistent state of the protected value. Since no state can be left partially modified
647/// under this lock, there is no possible consistency risk to recover from and propagating the
648/// poison error would only introduce unnecessary failures into the allocation path.
649///
650/// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating
651/// with the recovered guard.
652fn bg_flush_thread(core: sync::Arc<Core>, flush_duration: time::Duration) {
653 let mut guard = core.flush_guard.lock().unwrap_or_else(|e| e.into_inner());
654 loop {
655 (guard, _) = core.flush_cv.wait_timeout(guard, flush_duration).unwrap_or_else(|e| e.into_inner());
656
657 // NOTE: we must read values of close brodcast before acquire exclusive lock, if done
658 // otherwise, we impose serious deadlock sort of situation for the the flusher tx
659
660 let queued_ops = core.queue.drain();
661 let closed = core.closed.load(atomic::Ordering::Acquire);
662
663 if queued_ops.is_empty() {
664 if closed {
665 return;
666 }
667
668 continue;
669 }
670
671 // INFO: we must acquire an exclusive IO lock for sync, hence no write/read ops are allowed
672 // while sync is in progress
673 let _io_lock = core.acquire_exclusive_io_lock();
674
675 let (_min_index, _max_index, max_epoch) = match core.write_queued_ops(queued_ops) {
676 Ok(res) => res,
677 Err(new_error) => {
678 core.completion.error.set(new_error);
679 core.completion.waker.wake();
680 drop(_io_lock);
681
682 continue;
683 }
684 };
685
686 // NOTE: On linux, we can initiate writeback (best-effort only) for a given range
687 #[cfg(target_os = "linux")]
688 if let Err(new_error) = core.file.sync_range(_min_index, _max_index - _min_index) {
689 core.completion.error.set(new_error);
690 core.completion.waker.wake();
691 drop(_io_lock);
692
693 continue;
694 }
695
696 // NOTE: If the sync fails, we update the Core::error w/ the received error object. We
697 // clear it up when another call succeeds.
698 //
699 // This is valid as the underlying sync flushes entire batch all at once, hence even if the
700 // last call failed, and the new one succeeded, we do get the durability guarantee for the
701 // old data as well.
702
703 if let Err(new_error) = core.file.sync() {
704 core.completion.error.set(new_error);
705 core.completion.waker.wake();
706 drop(_io_lock);
707
708 continue;
709 } else {
710 core.completion.mark_epoch_as_durable(max_epoch);
711 core.completion.error.del();
712 }
713 }
714}
715
716#[derive(Debug)]
717struct Core {
718 completion: sync::Arc<Completion>,
719 closed: atomic::AtomicBool,
720 epoch: atomic::AtomicU64,
721 file: sync::Arc<ffile::FrozenFile>,
722 flush_cv: sync::Condvar,
723 flush_guard: sync::Mutex<()>,
724 io_lock: sync::RwLock<()>,
725 queue: mpscq::MPSCQueue<WriteRequestInternal>,
726}
727
728impl Core {
729 fn new(file: sync::Arc<ffile::FrozenFile>) -> Self {
730 Self {
731 file,
732 completion: sync::Arc::new(Completion::default()),
733 closed: atomic::AtomicBool::new(false),
734 epoch: atomic::AtomicU64::new(0),
735 flush_cv: sync::Condvar::new(),
736 flush_guard: sync::Mutex::new(()),
737 io_lock: sync::RwLock::new(()),
738 queue: mpscq::MPSCQueue::default(),
739 }
740 }
741
742 #[inline]
743 fn acquire_shared_io_lock(&self) -> sync::RwLockReadGuard<'_, ()> {
744 // NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
745 self.io_lock.read().unwrap_or_else(|e| e.into_inner())
746 }
747
748 #[inline]
749 fn acquire_exclusive_io_lock(&self) -> sync::RwLockWriteGuard<'_, ()> {
750 // NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
751 self.io_lock.write().unwrap_or_else(|e| e.into_inner())
752 }
753
754 #[inline(always)]
755 fn write_queued_ops(&self, queued_ops: Vec<WriteRequestInternal>) -> FrozenResult<(usize, usize, u64)> {
756 let mut max_epoch = 0;
757 let mut max_index = 0;
758 let mut min_index = usize::MAX;
759
760 for op in queued_ops {
761 let ops_len = op.request.allocation.length();
762 match ops_len {
763 1 => {
764 self.file.pwrite(op.request.allocation.first(), op.request.slot_index)?;
765 }
766 _ => {
767 let bufs: Vec<bufpool::BufferPointer> = op.request.allocation.iter().collect();
768 self.file.pwritev(&bufs, op.request.slot_index)?;
769 }
770 }
771
772 max_epoch = max_epoch.max(op.epoch);
773 min_index = min_index.min(op.request.slot_index);
774 max_index = max_index.max(op.request.slot_index + ops_len);
775 }
776
777 Ok((min_index, max_index, max_epoch))
778 }
779
780 #[inline]
781 fn increment_current_epoch(&self) -> u64 {
782 self.epoch.fetch_add(1, atomic::Ordering::AcqRel).wrapping_add(1)
783 }
784}
785
786#[derive(Debug)]
787struct WriteRequestInternal {
788 epoch: u64,
789 request: WriteRequest,
790}
791
792#[derive(Debug)]
793struct Completion {
794 durable_epoch: atomic::AtomicU64,
795 error: FlushError,
796 waker: futures::task::AtomicWaker,
797}
798
799impl Default for Completion {
800 fn default() -> Self {
801 Self {
802 durable_epoch: atomic::AtomicU64::new(0),
803 waker: futures::task::AtomicWaker::new(),
804 error: FlushError::default(),
805 }
806 }
807}
808
809impl Completion {
810 fn mark_epoch_as_durable(&self, epoch: u64) {
811 self.durable_epoch.store(epoch, atomic::Ordering::Release);
812 self.waker.wake();
813 }
814}
815
816#[derive(Debug)]
817struct FlushError(atomic::AtomicPtr<FrozenError>);
818
819impl Default for FlushError {
820 fn default() -> Self {
821 Self(atomic::AtomicPtr::new(ptr::null_mut()))
822 }
823}
824
825impl Drop for FlushError {
826 fn drop(&mut self) {
827 let err_ptr = self.0.load(atomic::Ordering::Acquire);
828 if !err_ptr.is_null() {
829 let _ = unsafe { Box::from_raw(err_ptr) };
830 }
831 }
832}
833
834impl FlushError {
835 #[inline]
836 fn get(&self) -> Option<FrozenError> {
837 let curr_err = self.0.load(atomic::Ordering::Acquire);
838 if hints::unlikely(!curr_err.is_null()) {
839 let frozen_error = unsafe { (*curr_err).clone() };
840 return Some(frozen_error);
841 }
842
843 None
844 }
845
846 #[inline]
847 fn set(&self, new_error: FrozenError) {
848 let boxed_error = Box::into_raw(Box::new(new_error));
849 let old_err = self.0.swap(boxed_error, atomic::Ordering::AcqRel);
850
851 if hints::unlikely(!old_err.is_null()) {
852 let _ = unsafe { Box::from_raw(old_err) };
853 }
854 }
855
856 #[inline]
857 fn del(&self) {
858 let old_err = self.0.swap(ptr::null_mut(), atomic::Ordering::AcqRel);
859 if hints::unlikely(!old_err.is_null()) {
860 let _ = unsafe { Box::from_raw(old_err) };
861 }
862 }
863}
864
865mod err {
866 use crate::error::{ErrCode, FrozenError};
867
868 /// Domain ID for [`wpipe`] module is `0x02` used while propagating errors
869 const DOMAIN_ID: u8 = 0x02;
870
871 #[inline]
872 pub fn new_error<E: std::fmt::Display>(module_id: u8, code: ErrCode, observed_error: E) -> FrozenError {
873 FrozenError::new_raw(module_id, DOMAIN_ID, code, observed_error)
874 }
875
876 pub const FXE: ErrCode = ErrCode::new(0x10, "unable to spawn background flush thread");
877}
878
879#[cfg(test)]
880mod tests {
881 use super::*;
882 use crate::utils::BufferSize;
883
884 const MODULE_ID: u8 = 0x00;
885 const BUFFER_SIZE: BufferSize = BufferSize::S128;
886 const INITIAL_BUFFER_AMOUT: usize = 0x200;
887 const FLUSH_DURATION: time::Duration = time::Duration::from_millis(1);
888
889 fn new_objects<P: AsRef<std::path::Path>>(path: P) -> (sync::Arc<ffile::FrozenFile>, bufpool::BufPool, WritePipe) {
890 let file_cfg = ffile::FrozenFileCfg {
891 module_id: MODULE_ID,
892 path: path.as_ref().to_path_buf(),
893 buffer_size: BUFFER_SIZE as usize,
894 initial_available_buffers: INITIAL_BUFFER_AMOUT,
895 };
896 let file = sync::Arc::new(ffile::FrozenFile::new(file_cfg).unwrap());
897
898 let pool_cfg =
899 bufpool::BufPoolCfg { buffer_size: BUFFER_SIZE, max_memory: INITIAL_BUFFER_AMOUT * BUFFER_SIZE as usize };
900 let pool = bufpool::BufPool::new(pool_cfg);
901
902 let pipe_cfg = WritePipeCfg { module_id: MODULE_ID, flush_duration: FLUSH_DURATION };
903 let pipe = WritePipe::new(pipe_cfg, file.clone()).unwrap();
904
905 (file, pool, pipe)
906 }
907
908 fn prep_write(buf_ptr: *const u8, n: usize, pool: &bufpool::BufPool) -> bufpool::BufPoolAllocation {
909 let allocation = pool.allocate(n);
910 for allocated_buf in allocation.iter() {
911 unsafe { ptr::copy_nonoverlapping(buf_ptr, allocated_buf, BUFFER_SIZE as usize) };
912 }
913
914 allocation
915 }
916
917 fn compare_with_readback(
918 buf: &[u8],
919 read_index: usize,
920 required: usize,
921 pool: &bufpool::BufPool,
922 file: &ffile::FrozenFile,
923 ) {
924 let read_allocation = pool.allocate(required);
925 let read_bufs: Vec<bufpool::BufferPointer> = read_allocation.iter().collect();
926
927 file.preadv(&read_bufs, read_index).unwrap();
928
929 for read_buf in read_allocation.iter() {
930 let observed = unsafe { std::slice::from_raw_parts(read_buf, BUFFER_SIZE as usize) };
931 assert_eq!(buf, observed);
932 }
933 }
934
935 mod lifecycle {
936 use super::*;
937
938 #[test]
939 fn ok_new() {
940 let dir = tempfile::tempdir().unwrap();
941 let path = dir.path().join("write_single");
942
943 let file_cfg = ffile::FrozenFileCfg {
944 path,
945 module_id: MODULE_ID,
946 buffer_size: BUFFER_SIZE as usize,
947 initial_available_buffers: INITIAL_BUFFER_AMOUT,
948 };
949 let file = sync::Arc::new(ffile::FrozenFile::new(file_cfg).unwrap());
950
951 let pipe_cfg = WritePipeCfg { module_id: MODULE_ID, flush_duration: FLUSH_DURATION };
952 assert!(WritePipe::new(pipe_cfg, file).is_ok());
953 }
954
955 #[test]
956 fn ok_drop() {
957 let dir = tempfile::tempdir().unwrap();
958 let path = dir.path().join("write_single");
959 let (_file, _, pipe) = new_objects(path);
960
961 drop(pipe);
962 }
963 }
964
965 mod shutdown {
966 use super::*;
967
968 #[test]
969 fn ok_drop_before_pending_write_call() {
970 let dir = tempfile::tempdir().unwrap();
971 let path = dir.path().join("write_single");
972 let (_file, pool, pipe) = new_objects(path);
973
974 const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];
975
976 let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
977 let request = WriteRequest { allocation, slot_index: 0 };
978
979 assert!(pipe.write(request).is_ok());
980 drop(pipe);
981 }
982
983 #[test]
984 fn ok_drop_waits_for_pending_write_call() {
985 let dir = tempfile::tempdir().unwrap();
986 let path = dir.path().join("write_single");
987
988 const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];
989
990 // new + write + drop
991 {
992 let (_file, pool, pipe) = new_objects(path.clone());
993
994 let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
995 let request = WriteRequest { allocation, slot_index: 0 };
996
997 assert!(pipe.write(request).is_ok());
998 drop(pipe);
999 }
1000
1001 // open + readback
1002 {
1003 let (file, pool, _) = new_objects(path);
1004 compare_with_readback(&BUFFER, 0, 1, &pool, &file);
1005 }
1006 }
1007
1008 #[test]
1009 fn ok_drop_does_not_deadlock_when_multiple_pending_writes() {
1010 let dir = tempfile::tempdir().unwrap();
1011 let path = dir.path().join("write_single");
1012 let (_file, pool, pipe) = new_objects(path);
1013
1014 for i in 0..INITIAL_BUFFER_AMOUT {
1015 let buffer = vec![i as u8; BUFFER_SIZE as usize];
1016 let allocation = prep_write(buffer.as_ptr(), 1, &pool);
1017 let request = WriteRequest { allocation, slot_index: 0 };
1018
1019 assert!(pipe.write(request).is_ok());
1020 }
1021
1022 drop(pipe);
1023 }
1024
/// Shares the pipe between the test thread and a writer thread; the test thread releases
/// its handle while the writer submits a single write, and both sides must finish cleanly.
#[test]
fn ok_drop_correctly_waits_for_pending_write_with_multi_threads() {
    const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];

    let tmp = tempfile::tempdir().unwrap();
    let (_file, pool, pipe) = new_objects(tmp.path().join("write_single"));

    let main_handle = sync::Arc::new(pipe);
    let worker_handle = sync::Arc::clone(&main_handle);

    // The pool moves into the worker, which owns it for the rest of the test.
    let writer = thread::spawn(move || {
        let request = WriteRequest { allocation: prep_write(BUFFER.as_ptr(), 1, &pool), slot_index: 0 };
        assert!(worker_handle.write(request).is_ok());
    });

    // Release the test thread's reference; the pipe itself lives on while the worker holds one.
    drop(main_handle);
    writer.join().unwrap();
}
1046 }
1047
mod pipe_writes {
    use super::*;

    /// A single request covering 0x0A buffers is accepted by the pipe.
    #[test]
    fn ok_write() {
        const FILL: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];

        let tmp = tempfile::tempdir().unwrap();
        let (_file, pool, pipe) = new_objects(tmp.path().join("write_single"));

        let request = WriteRequest { allocation: prep_write(FILL.as_ptr(), 0x0A, &pool), slot_index: 0 };
        let accepted = pipe.write(request).is_ok();
        assert!(accepted);
    }

    /// Tickets issued for consecutive submissions carry strictly increasing epochs.
    #[test]
    fn ok_write_epoch_is_monotonic() {
        const FILL: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];

        let tmp = tempfile::tempdir().unwrap();
        let (_file, pool, pipe) = new_objects(tmp.path().join("write_single"));

        // One single-buffer write to each of slots 0, 1 and 2, submitted in order; the
        // tickets are kept alive until the end of the test.
        let tickets: Vec<_> = (0..3)
            .map(|slot_index| {
                let allocation = prep_write(FILL.as_ptr(), 1, &pool);
                pipe.write(WriteRequest { allocation, slot_index }).unwrap()
            })
            .collect();

        for pair in tickets.windows(2) {
            assert!(pair[1].epoch() > pair[0].epoch());
        }
    }
}
1085
mod write_ticket {
    use super::*;

    /// Awaiting a ticket makes its write durable, after which the data reads back intact.
    #[test]
    fn ok_readback_after_write_with_await() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("write_single");
        let (file, pool, pipe) = new_objects(path);

        const REQUIRED: usize = 0x0A;
        const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];

        let write_allocation = prep_write(BUFFER.as_ptr(), REQUIRED, &pool);
        let request = WriteRequest { allocation: write_allocation, slot_index: 0 };

        let ticket = pipe.write(request).unwrap();
        let ticket_epoch = ticket.epoch();

        // The resolved epoch is only required to be at or beyond the ticket's own epoch.
        let durable_epoch = futures::executor::block_on(ticket).unwrap();
        assert!(durable_epoch >= ticket_epoch);

        compare_with_readback(&BUFFER, 0, REQUIRED, &pool, &file);
    }

    /// Writes several multi-buffer runs back to back, awaits only the last ticket and
    /// verifies every run reads back at its expected slot offset.
    #[test]
    fn ok_readback_after_batch_write() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("write_single");
        let (file, pool, pipe) = new_objects(path);

        // (fill pattern, number of buffers the run occupies)
        const BUFFERS: [([u8; BUFFER_SIZE as usize], usize); 5] = [
            ([0x0Au8; BUFFER_SIZE as usize], 0x1A),
            ([0x0Bu8; BUFFER_SIZE as usize], 0x1B),
            ([0x0Cu8; BUFFER_SIZE as usize], 0x1C),
            ([0x0Du8; BUFFER_SIZE as usize], 0x1D),
            ([0x0Eu8; BUFFER_SIZE as usize], 0x1E),
        ];

        let mut slot_index = 0;
        let mut latest_ticket = None;

        for (buf, required) in BUFFERS {
            let allocation = prep_write(buf.as_ptr(), required, &pool);
            let request = WriteRequest { allocation, slot_index };
            let ticket = pipe.write(request).unwrap();

            slot_index += required;
            latest_ticket = Some(ticket);
        }

        let ticket = latest_ticket.expect("BUFFERS is non-empty, so at least one ticket was issued");
        let ticket_epoch = ticket.epoch();
        let durable_epoch = futures::executor::block_on(ticket).unwrap();
        assert!(durable_epoch >= ticket_epoch);

        let mut read_index = 0;
        for (buf, required) in BUFFERS {
            compare_with_readback(&buf, read_index, required, &pool, &file);
            read_index += required;
        }
    }

    /// Two tickets awaited concurrently both resolve successfully and in a consistent order.
    #[test]
    fn ok_multiple_concurrent_awaits() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("write_single");
        let (_file, pool, pipe) = new_objects(path);

        const REQUIRED: usize = 0x0A;
        const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];

        let allocation1 = prep_write(BUFFER.as_ptr(), REQUIRED, &pool);
        let ticket1 = pipe.write(WriteRequest { allocation: allocation1, slot_index: 0 }).unwrap();

        let allocation2 = prep_write(BUFFER.as_ptr(), REQUIRED, &pool);
        let ticket2 = pipe.write(WriteRequest { allocation: allocation2, slot_index: 0 }).unwrap();

        // Submission order is reflected strictly in the tickets' own epochs.
        let (epoch1, epoch2) = (ticket1.epoch(), ticket2.epoch());
        assert!(epoch2 > epoch1);

        let (e1, e2) = futures::executor::block_on(async { futures::join!(ticket1, ticket2) });
        let e1 = e1.expect("first ticket should resolve successfully");
        let e2 = e2.expect("second ticket should resolve successfully");

        assert!(e1 >= epoch1);
        assert!(e2 >= epoch2);
        // Both writes may be made durable by the same hard sync and then resolve to the
        // same durable epoch, so only non-decreasing order can be required here.
        assert!(e2 >= e1);
    }

    /// Awaiting only the newest ticket is enough to make every earlier write durable.
    #[test]
    fn ok_awaiting_last_ticket_implies_previous_writes_are_durable() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("durability_boundary");

        let (file, pool, pipe) = new_objects(path);

        const BUFFER_A: [u8; BUFFER_SIZE as usize] = [0xAA; BUFFER_SIZE as usize];
        const BUFFER_B: [u8; BUFFER_SIZE as usize] = [0xBB; BUFFER_SIZE as usize];
        const BUFFER_C: [u8; BUFFER_SIZE as usize] = [0xCC; BUFFER_SIZE as usize];

        let alloc_a = prep_write(BUFFER_A.as_ptr(), 1, &pool);
        let ticket_a = pipe.write(WriteRequest { allocation: alloc_a, slot_index: 0 }).unwrap();

        let alloc_b = prep_write(BUFFER_B.as_ptr(), 1, &pool);
        let ticket_b = pipe.write(WriteRequest { allocation: alloc_b, slot_index: 1 }).unwrap();

        let alloc_c = prep_write(BUFFER_C.as_ptr(), 1, &pool);
        let ticket_c = pipe.write(WriteRequest { allocation: alloc_c, slot_index: 2 }).unwrap();

        let epoch_a = ticket_a.epoch();
        let epoch_b = ticket_b.epoch();
        let epoch_c = ticket_c.epoch();

        let durable_epoch = futures::executor::block_on(ticket_c).unwrap();
        assert!(durable_epoch >= epoch_c);
        assert!(durable_epoch >= epoch_b);
        assert!(durable_epoch >= epoch_a);

        compare_with_readback(&BUFFER_A, 0, 1, &pool, &file);
        compare_with_readback(&BUFFER_B, 1, 1, &pool, &file);
        compare_with_readback(&BUFFER_C, 2, 1, &pool, &file);
    }
}
1208
mod concurrency {
    use super::*;

    /// Several threads write disjoint slot ranges concurrently. The combined epochs must be
    /// gap-free and duplicate-free, and once the highest-epoch ticket resolves every slot
    /// must read back the byte pattern its thread wrote.
    #[test]
    fn ok_multi_threaded_writers() {
        const THREADS: usize = 4;
        const WRITES_PER_THREAD: usize = 0x40;
        const TOTAL_WRITES: usize = THREADS * WRITES_PER_THREAD;
        const _: () = assert!(TOTAL_WRITES < INITIAL_BUFFER_AMOUT);

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("multi_threaded_writers");

        let (file, pool, pipe) = new_objects(path);

        let pipe = sync::Arc::new(pipe);
        let pool = sync::Arc::new(pool);

        let mut handles = Vec::with_capacity(THREADS);
        for tid in 0..THREADS {
            let pipe = sync::Arc::clone(&pipe);
            let pool = sync::Arc::clone(&pool);

            // Thread `tid` owns slots [tid * WRITES_PER_THREAD, (tid + 1) * WRITES_PER_THREAD)
            // and fills each of them with the byte `tid`.
            handles.push(thread::spawn(move || {
                let mut tickets = Vec::with_capacity(WRITES_PER_THREAD);

                for i in 0..WRITES_PER_THREAD {
                    let buffer = vec![tid as u8; BUFFER_SIZE as usize];
                    let allocation = prep_write(buffer.as_ptr(), 1, &pool);
                    let slot_index = tid * WRITES_PER_THREAD + i;
                    let ticket = pipe.write(WriteRequest { allocation, slot_index }).unwrap();

                    tickets.push(ticket);
                }

                tickets
            }));
        }

        let mut tickets = Vec::with_capacity(TOTAL_WRITES);
        for handle in handles {
            tickets.extend(handle.join().unwrap());
        }
        assert_eq!(tickets.len(), TOTAL_WRITES);

        // On a fresh pipe the sorted epochs must be exactly 1..=TOTAL_WRITES; comparing whole
        // vectors reports every gap or duplicate at once.
        let mut epochs: Vec<u64> = tickets.iter().map(WriteTicket::epoch).collect();
        epochs.sort_unstable();
        let expected_epochs: Vec<u64> = (1..=TOTAL_WRITES as u64).collect();
        assert_eq!(epochs, expected_epochs);

        let latest_ticket = tickets.into_iter().max_by_key(WriteTicket::epoch).unwrap();
        let durable_epoch = futures::executor::block_on(latest_ticket).unwrap();
        assert_eq!(durable_epoch, TOTAL_WRITES as u64);

        // Every write is durable now, so each thread's slot range must hold its pattern.
        for tid in 0..THREADS {
            let expected_buf = [tid as u8; BUFFER_SIZE as usize];
            for i in 0..WRITES_PER_THREAD {
                compare_with_readback(&expected_buf, tid * WRITES_PER_THREAD + i, 1, &pool, &file);
            }
        }
    }
}
1265}