frozen_core/wpipe.rs
1//! A low latency asynchronous write pipeline for buffer based storage
2//!
3//! ## Design
4//!
//! By design, every write call is fire-and-forget, i.e. the call returns immediately after
//! pushing the bytes to be written into the MPSC queue.
7//!
//! The background thread pulls from the MPSC queue, performs individual pwrite/pwritev calls, and
//! then issues a single hard sync. This provides durability for all the writes submitted within the
//! same [`WritePipeCfg::flush_duration`] batching window.
11//!
12//! ## Benchmarks
13//!
//! Observed latency measurements (single and multi threaded):
15//!
16//! | Metric | 1 Thread (µs) | 4 Threads (µs) |
17//! |:-------|:--------------|:---------------|
18//! | P50 | 0.091 | 0.275 |
19//! | P90 | 0.092 | 0.458 |
20//! | P99 | 0.825 | 0.917 |
21//! | Mean | 1.185 | 3.857 |
22//!
//! Environment used for benchmarking:
24//!
25//! * OS: NixOS (WSL2)
26//! * Architecture: x86_64
27//! * Memory: 8 GiB RAM (DDR4)
28//! * Rust: rustc 1.86.0 w/ cargo 1.86.0
29//! * Kernel: Linux 6.6.87.2-microsoft-standard-WSL2
30//! * CPU: Intel® Core™ i5-10300H @ 2.50GHz (4C / 8T)
31//!
32//! ## Example
33//!
34//! ```
35//! use frozen_core::{bufpool, ffile, utils, wpipe};
36//! use std::{ptr, sync, time};
37//!
38//! const MODULE_ID: u8 = 0x00;
39//! const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
40//!
41//! let dir = tempfile::tempdir().expect("tempdir creation should succeed");
42//! let path = dir.path().join("wpipe_example");
43//!
44//! let file_cfg = ffile::FFCfg {
45//! path,
46//! initial_chunk_amount: 0x400,
47//! chunk_size: BUFFER_SIZE as usize,
48//! };
49//! let file = sync::Arc::new(
50//! ffile::FrozenFile::new::<MODULE_ID>(file_cfg)
51//! .expect("file creation should succeed"),
52//! );
53//!
54//! let pool_cfg = bufpool::BufPoolCfg {
55//! buffer_size: utils::BufferSize::S128,
56//! max_memory: 0x400 * BUFFER_SIZE as usize,
57//! };
58//! let pool = bufpool::BufPool::new(pool_cfg);
59//!
60//! let pipe_cfg = wpipe::WritePipeCfg {
61//! module_id: MODULE_ID,
62//! flush_duration: time::Duration::from_millis(1),
63//! };
64//! let pipe = wpipe::WritePipe::new(pipe_cfg, file)
65//! .expect("pipe creation should succeed");
66//!
67//! let payload = [0xAAu8; BUFFER_SIZE as usize];
68//!
69//! let mut latest_ticket = None;
70//! for slot_index in 0..3 {
71//! let allocation = pool.allocate(1);
72//!
73//! unsafe {
74//! ptr::copy_nonoverlapping(
75//! payload.as_ptr(),
76//! allocation.first(),
77//! payload.len(),
78//! );
79//! }
80//!
81//! let ticket = pipe
82//! .write(wpipe::WriteRequest {
83//! allocation,
84//! slot_index,
85//! })
86//! .expect("write should succeed");
87//!
88//! latest_ticket = Some(ticket);
89//! }
90//!
91//! let durable_epoch = futures::executor::block_on(
92//! latest_ticket.expect("ticket should exist"),
93//! )
94//! .expect("writes should become durable");
95//!
96//! assert!(durable_epoch >= 3);
97//! ```
98
99use crate::{
100 bufpool,
101 error::{FrozenError, FrozenResult},
102 ffile, hints, mpscq,
103};
104use std::{
105 ptr,
106 sync::{self, atomic},
107 thread, time,
108};
109
110/// All the available configurations for [`WritePipe`]
111///
112/// ## Example
113///
114/// ```
115/// use frozen_core::wpipe::WritePipeCfg;
116///
117/// let cfg = WritePipeCfg {
118/// module_id: 2,
119/// flush_duration: std::time::Duration::from_millis(0x0A),
120/// };
121///
122/// assert_ne!(cfg.module_id, 0);
123/// assert_ne!(cfg.flush_duration.as_millis(), 0);
124/// ```
125#[derive(Debug, Clone)]
126pub struct WritePipeCfg {
127 /// Identifier used for error propagation by [`frozen_core::error::FrozenError`]
128 pub module_id: u8,
129
130 /// Time interval used by the background thread to perform hard sync for all the write
131 /// operations submitted in the last durability window
132 pub flush_duration: time::Duration,
133}
134
135/// A low latency asynchronous write pipeline for buffer based storage
136///
137/// ## Design
138///
139/// By design, every write call is fire-and-forget, i.e. the call is immediately returned after
140/// pushing the bytes to be written into the MPSC queue.
141///
142/// The background thread pulls from MPSC queue and performs indivisual pwrite/v calls and a common
143/// hard sync right after. This provides durability for all the writes submitted within the same
144/// [`WritePipeCfg::flush_duration`] batching window.
145///
146/// ## Example
147///
148/// ```
149/// use frozen_core::{bufpool, ffile, utils, wpipe};
150/// use std::{ptr, sync, time};
151///
152/// const MODULE_ID: u8 = 0x00;
153/// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
154///
155/// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
156/// let path = dir.path().join("wpipe_example");
157///
158/// let file_cfg = ffile::FFCfg {
159/// path,
160/// initial_chunk_amount: 0x400,
161/// chunk_size: BUFFER_SIZE as usize,
162/// };
163/// let file = sync::Arc::new(
164/// ffile::FrozenFile::new::<MODULE_ID>(file_cfg)
165/// .expect("file creation should succeed"),
166/// );
167///
168/// let pool_cfg = bufpool::BufPoolCfg {
169/// buffer_size: utils::BufferSize::S128,
170/// max_memory: 0x400 * BUFFER_SIZE as usize,
171/// };
172/// let pool = bufpool::BufPool::new(pool_cfg);
173///
174/// let pipe_cfg = wpipe::WritePipeCfg {
175/// module_id: MODULE_ID,
176/// flush_duration: time::Duration::from_millis(1),
177/// };
178/// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
179/// .expect("pipe creation should succeed");
180///
181/// let payload = [0xAAu8; BUFFER_SIZE as usize];
182///
183/// let mut latest_ticket = None;
184/// for slot_index in 0..3 {
185/// let allocation = pool.allocate(1);
186///
187/// unsafe {
188/// ptr::copy_nonoverlapping(
189/// payload.as_ptr(),
190/// allocation.first(),
191/// payload.len(),
192/// );
193/// }
194///
195/// let ticket = pipe
196/// .write(wpipe::WriteRequest {
197/// allocation,
198/// slot_index,
199/// })
200/// .expect("write should succeed");
201///
202/// latest_ticket = Some(ticket);
203/// }
204///
205/// let durable_epoch = futures::executor::block_on(
206/// latest_ticket.expect("ticket should exist"),
207/// )
208/// .expect("writes should become durable");
209///
210/// assert!(durable_epoch >= 3);
211/// ```
212#[derive(Debug)]
213pub struct WritePipe {
214 core: sync::Arc<Core>,
215 flush_tx_handle: Option<thread::JoinHandle<()>>,
216}
217
218unsafe impl Send for WritePipe {}
219unsafe impl Sync for WritePipe {}
220
221impl WritePipe {
222 /// Create a new instance of [`WritePipe`]
223 ///
224 /// ## Example
225 ///
226 /// ```
227 /// use frozen_core::{bufpool, ffile, utils, wpipe};
228 /// use std::{ptr, sync, time};
229 ///
230 /// const MODULE_ID: u8 = 0x00;
231 /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
232 ///
233 /// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
234 /// let path = dir.path().join("wpipe_example");
235 ///
236 /// let file_cfg = ffile::FFCfg {
237 /// path,
238 /// initial_chunk_amount: 0x400,
239 /// chunk_size: BUFFER_SIZE as usize,
240 /// };
241 /// let file = sync::Arc::new(
242 /// ffile::FrozenFile::new::<MODULE_ID>(file_cfg)
243 /// .expect("file creation should succeed"),
244 /// );
245 ///
246 /// let pool_cfg = bufpool::BufPoolCfg {
247 /// buffer_size: utils::BufferSize::S128,
248 /// max_memory: 0x400 * BUFFER_SIZE as usize,
249 /// };
250 /// let pool = bufpool::BufPool::new(pool_cfg);
251 ///
252 /// let pipe_cfg = wpipe::WritePipeCfg {
253 /// module_id: MODULE_ID,
254 /// flush_duration: time::Duration::from_millis(1),
255 /// };
256 /// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
257 /// .expect("pipe creation should succeed");
258 ///
259 /// let payload = vec![0x0A; BUFFER_SIZE as usize];
260 /// let allocation = pool.allocate(1);
261 ///
262 /// unsafe {ptr::copy_nonoverlapping(
263 /// payload.as_ptr(),
264 /// allocation.first(),
265 /// payload.len()
266 /// )};
267 ///
268 /// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0});
269 ///
270 /// assert!(
271 /// futures::executor::block_on(ticket.expect("ticket should exist"))
272 /// .is_ok()
273 /// );
274 /// ```
275 #[inline]
276 pub fn new(cfg: WritePipeCfg, file: sync::Arc<ffile::FrozenFile>) -> FrozenResult<Self> {
277 let core = sync::Arc::new(Core::new(file));
278 let cloned_core = core.clone();
279 let flush_tx_handle = match thread::Builder::new()
280 .name(format!("mod{}_wpipe_flush_tx", cfg.module_id))
281 .spawn(move || bg_flush_thread(cloned_core, cfg.flush_duration))
282 {
283 Ok(handle) => Some(handle),
284 Err(observed_error) => return Err(err::new_error(cfg.module_id, err::FXE, observed_error)),
285 };
286
287 Ok(Self { core: core, flush_tx_handle })
288 }
289
290 /// Push a write into [`WritePipe`]
291 ///
292 /// Every write call is fire-and-forget for the caller by default, unless the caller choose to
293 /// wait for durability using the manual `await` on [`WriteTicket`].
294 ///
295 /// ## Example
296 ///
297 /// ```
298 /// use frozen_core::{bufpool, ffile, utils, wpipe};
299 /// use std::{ptr, sync, time};
300 ///
301 /// const MODULE_ID: u8 = 0x00;
302 /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
303 ///
304 /// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
305 /// let path = dir.path().join("wpipe_example");
306 ///
307 /// let file_cfg = ffile::FFCfg {
308 /// path,
309 /// initial_chunk_amount: 0x400,
310 /// chunk_size: BUFFER_SIZE as usize,
311 /// };
312 /// let file = sync::Arc::new(
313 /// ffile::FrozenFile::new::<MODULE_ID>(file_cfg)
314 /// .expect("file creation should succeed"),
315 /// );
316 ///
317 /// let pool_cfg = bufpool::BufPoolCfg {
318 /// buffer_size: utils::BufferSize::S128,
319 /// max_memory: 0x400 * BUFFER_SIZE as usize,
320 /// };
321 /// let pool = bufpool::BufPool::new(pool_cfg);
322 ///
323 /// let pipe_cfg = wpipe::WritePipeCfg {
324 /// module_id: MODULE_ID,
325 /// flush_duration: time::Duration::from_millis(1),
326 /// };
327 /// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
328 /// .expect("pipe creation should succeed");
329 ///
330 /// let payload = vec![0x0A; BUFFER_SIZE as usize];
331 /// let allocation = pool.allocate(1);
332 ///
333 /// unsafe {ptr::copy_nonoverlapping(
334 /// payload.as_ptr(),
335 /// allocation.first(),
336 /// payload.len()
337 /// )};
338 ///
339 /// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0});
340 ///
341 /// assert!(
342 /// futures::executor::block_on(ticket.expect("ticket should exist"))
343 /// .is_ok()
344 /// );
345 /// ```
346 #[inline]
347 pub fn write(&self, request: WriteRequest) -> FrozenResult<WriteTicket> {
348 let _io_lock = self.core.acquire_shared_io_lock();
349 if let Some(frozen_error) = self.core.completion.error.get() {
350 return Err(frozen_error);
351 }
352
353 let epoch = self.core.increment_current_epoch();
354 let internal_req = WriteRequestInternal { request, epoch };
355 self.core.queue.push(internal_req);
356
357 Ok(WriteTicket { epoch, completion: self.core.completion.clone() })
358 }
359}
360
361impl Drop for WritePipe {
362 fn drop(&mut self) {
363 self.core.closed.store(true, atomic::Ordering::Release);
364 self.core.flush_cv.notify_one();
365
366 if let Some(handle) = self.flush_tx_handle.take() {
367 let _ = handle.join();
368 }
369 }
370}
371
372/// A write operation submitted to [`WritePipe`]
373///
374/// The request contains the buffers to persist along with the destination slot index in the
375/// underlying [`FrozenFile`].
376///
377/// ## Example
378///
379/// ```
380/// use frozen_core::{bufpool, utils, wpipe};
381/// use std::ptr;
382///
383/// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
384///
385/// let pool_cfg = bufpool::BufPoolCfg {
386/// buffer_size: utils::BufferSize::S128,
387/// max_memory: 0x400 * BUFFER_SIZE as usize,
388/// };
389/// let pool = bufpool::BufPool::new(pool_cfg);
390///
391/// let payload = vec![0x0A; BUFFER_SIZE as usize];
392/// let allocation = pool.allocate(1);
393///
394/// unsafe {ptr::copy_nonoverlapping(
395/// payload.as_ptr(),
396/// allocation.first(),
397/// payload.len()
398/// )};
399///
400/// let request = wpipe::WriteRequest {allocation, slot_index: 0};
401/// assert!(request.slot_index >= 0);
402/// ```
403#[derive(Debug)]
404pub struct WriteRequest {
405 /// Buffer allocation containing the pages to be written allocated using [`bufpool::BufPool`]
406 ///
407 /// ## Example
408 ///
409 /// ```
410 /// use frozen_core::{bufpool, utils, wpipe};
411 /// use std::ptr;
412 ///
413 /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
414 ///
415 /// let pool_cfg = bufpool::BufPoolCfg {
416 /// buffer_size: utils::BufferSize::S128,
417 /// max_memory: 0x400 * BUFFER_SIZE as usize,
418 /// };
419 /// let pool = bufpool::BufPool::new(pool_cfg);
420 ///
421 /// let payload = vec![0x0A; BUFFER_SIZE as usize];
422 /// let allocation = pool.allocate(1);
423 ///
424 /// unsafe {ptr::copy_nonoverlapping(
425 /// payload.as_ptr(),
426 /// allocation.first(),
427 /// payload.len()
428 /// )};
429 ///
430 /// let request = wpipe::WriteRequest {allocation, slot_index: 0};
431 /// assert!(!request.allocation.first().is_null());
432 /// ```
433 pub allocation: bufpool::BufPoolAllocation,
434
435 /// Destination slot index where the pages of the allocation will be written from
436 ///
437 /// ## Example
438 ///
439 /// ```
440 /// use frozen_core::{bufpool, utils, wpipe};
441 /// use std::ptr;
442 ///
443 /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
444 ///
445 /// let pool_cfg = bufpool::BufPoolCfg {
446 /// buffer_size: utils::BufferSize::S128,
447 /// max_memory: 0x400 * BUFFER_SIZE as usize,
448 /// };
449 /// let pool = bufpool::BufPool::new(pool_cfg);
450 ///
451 /// let payload = vec![0x0A; BUFFER_SIZE as usize];
452 /// let allocation = pool.allocate(1);
453 ///
454 /// unsafe {ptr::copy_nonoverlapping(
455 /// payload.as_ptr(),
456 /// allocation.first(),
457 /// payload.len()
458 /// )};
459 ///
460 /// let request = wpipe::WriteRequest {allocation, slot_index: 0};
461 /// assert!(request.slot_index >= 0);
462 /// ```
463 pub slot_index: usize,
464}
465
466/// Durability handle associated with the submitted write operation via [`WritePipe::write`]
467///
468/// ## Epoch
469///
470/// Every ticket is assigned a monotonically increasing epoch to moniter durability
471///
472/// ## Durability Guarantee
473///
474/// If wanted, the ticket could be awaited to poll till the epoch becomes durable.
475///
476/// Once a await on ticket is completed successfully, all writes assigned to earlier epochs are
477/// also guaranteed to be durable.
478///
479/// *NOTE:* Using `await` is optional. Callers that only require fire-and-forget semantics may
480/// simply discard the returned ticket.
481///
482/// ## Example
483///
484/// ```
485/// use frozen_core::{bufpool, ffile, utils, wpipe};
486/// use std::{ptr, sync, time};
487///
488/// const MODULE_ID: u8 = 0x00;
489/// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
490///
491/// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
492/// let path = dir.path().join("wpipe_example");
493///
494/// let file_cfg = ffile::FFCfg {
495/// path,
496/// initial_chunk_amount: 0x400,
497/// chunk_size: BUFFER_SIZE as usize,
498/// };
499/// let file = sync::Arc::new(
500/// ffile::FrozenFile::new::<MODULE_ID>(file_cfg)
501/// .expect("file creation should succeed"),
502/// );
503///
504/// let pool_cfg = bufpool::BufPoolCfg {
505/// buffer_size: utils::BufferSize::S128,
506/// max_memory: 0x400 * BUFFER_SIZE as usize,
507/// };
508/// let pool = bufpool::BufPool::new(pool_cfg);
509///
510/// let pipe_cfg = wpipe::WritePipeCfg {
511/// module_id: MODULE_ID,
512/// flush_duration: time::Duration::from_millis(1),
513/// };
514/// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
515/// .expect("pipe creation should succeed");
516///
517/// let payload = vec![0x0A; BUFFER_SIZE as usize];
518/// let allocation = pool.allocate(1);
519///
520/// unsafe {ptr::copy_nonoverlapping(
521/// payload.as_ptr(),
522/// allocation.first(),
523/// payload.len()
524/// )};
525///
526/// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0})
527/// .unwrap();
528/// let epoch = ticket.epoch();
529///
530/// assert!(epoch > 0);
531/// ```
532#[derive(Debug)]
533pub struct WriteTicket {
534 epoch: u64,
535 completion: sync::Arc<Completion>,
536}
537
538impl WriteTicket {
539 /// Read assigned durability epoch for the [`WriteTicket`]
540 ///
541 /// ## Example
542 ///
543 /// ```
544 /// use frozen_core::{bufpool, ffile, utils, wpipe};
545 /// use std::{ptr, sync, time};
546 ///
547 /// const MODULE_ID: u8 = 0x00;
548 /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
549 ///
550 /// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
551 /// let path = dir.path().join("wpipe_example");
552 ///
553 /// let file_cfg = ffile::FFCfg {
554 /// path,
555 /// initial_chunk_amount: 0x400,
556 /// chunk_size: BUFFER_SIZE as usize,
557 /// };
558 /// let file = sync::Arc::new(
559 /// ffile::FrozenFile::new::<MODULE_ID>(file_cfg)
560 /// .expect("file creation should succeed"),
561 /// );
562 ///
563 /// let pool_cfg = bufpool::BufPoolCfg {
564 /// buffer_size: utils::BufferSize::S128,
565 /// max_memory: 0x400 * BUFFER_SIZE as usize,
566 /// };
567 /// let pool = bufpool::BufPool::new(pool_cfg);
568 ///
569 /// let pipe_cfg = wpipe::WritePipeCfg {
570 /// module_id: MODULE_ID,
571 /// flush_duration: time::Duration::from_millis(1),
572 /// };
573 /// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
574 /// .expect("pipe creation should succeed");
575 ///
576 /// let payload = vec![0x0A; BUFFER_SIZE as usize];
577 /// let allocation = pool.allocate(1);
578 ///
579 /// unsafe {ptr::copy_nonoverlapping(
580 /// payload.as_ptr(),
581 /// allocation.first(),
582 /// payload.len()
583 /// )};
584 ///
585 /// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0})
586 /// .unwrap();
587 /// let epoch = ticket.epoch();
588 ///
589 /// assert!(epoch > 0);
590 /// ```
591 pub const fn epoch(&self) -> u64 {
592 self.epoch
593 }
594
595 #[inline]
596 fn is_ready(&self) -> bool {
597 let durable = self.completion.durable_epoch.load(atomic::Ordering::Acquire);
598 if hints::likely(durable >= self.epoch) {
599 return true;
600 }
601
602 false
603 }
604}
605
606impl std::future::Future for WriteTicket {
607 type Output = FrozenResult<u64>;
608
609 #[inline(always)]
610 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
611 if self.is_ready() {
612 return std::task::Poll::Ready(Ok(self.epoch));
613 }
614
615 if let Some(frozen_error) = self.completion.error.get() {
616 return std::task::Poll::Ready(Err(frozen_error));
617 }
618
619 self.completion.waker.register(cx.waker());
620
621 if self.is_ready() {
622 return std::task::Poll::Ready(Ok(self.epoch));
623 }
624
625 if let Some(frozen_error) = self.completion.error.get() {
626 return std::task::Poll::Ready(Err(frozen_error));
627 }
628
629 std::task::Poll::Pending
630 }
631}
632
633/// ## Why we ignore [`std::sync::PoisonError`]?
634///
635/// The mutex used for lock, is solely used as a parking primitive for [`Condvar`] and does not protect any mutable
636/// state. All the pool invariants and accounting are maintained via atomics and are completely seperated from
637/// the mutex.
638///
639/// A poisoned mutex only indicates that another tx panicked while holding the lock, and indicates an inconsistent
640/// state of the protected value. Since no state can be left partially modified under this lock, there is no
641/// possible consistency risk to recover from and propagating the poison error would only introduce unnecessary
642/// failures into the allocation path.
643///
644/// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating with the recovered
645/// guard.
646fn bg_flush_thread(core: sync::Arc<Core>, flush_duration: time::Duration) {
647 let mut guard = core.flush_guard.lock().unwrap_or_else(|e| e.into_inner());
648 loop {
649 (guard, _) = core.flush_cv.wait_timeout(guard, flush_duration).unwrap_or_else(|e| e.into_inner());
650
651 // NOTE: we must read values of close brodcast before acquire exclusive lock, if done
652 // otherwise, we impose serious deadlock sort of situation for the the flusher tx
653
654 let queued_ops = core.queue.drain();
655 let closed = core.closed.load(atomic::Ordering::Acquire);
656
657 if queued_ops.is_empty() {
658 if closed {
659 return;
660 }
661
662 continue;
663 }
664
665 // INFO: we must acquire an exclusive IO lock for sync, hence no write/read ops are allowed
666 // while sync is in progress
667 let _io_lock = core.acquire_exclusive_io_lock();
668
669 let (_min_index, _max_index, max_epoch) = match core.write_queued_ops(queued_ops) {
670 Ok(res) => res,
671 Err(new_error) => {
672 core.completion.error.set(new_error);
673 core.completion.waker.wake();
674 drop(_io_lock);
675
676 continue;
677 }
678 };
679
680 // NOTE: On linux, we can initiate writeback (best-effort only) for a given range
681 #[cfg(target_os = "linux")]
682 if let Err(new_error) = core.file.sync_range(_min_index, _max_index - _min_index) {
683 core.completion.error.set(new_error);
684 core.completion.waker.wake();
685 drop(_io_lock);
686
687 continue;
688 }
689
690 // NOTE: If the sync fails, we update the Core::error w/ the received error object. We
691 // clear it up when another call succeeds.
692 //
693 // This is valid as the underlying sync flushes entire batch all at once, hence even if the
694 // last call failed, and the new one succeeded, we do get the durability guarantee for the
695 // old data as well.
696
697 if let Err(new_error) = core.file.sync() {
698 core.completion.error.set(new_error);
699 core.completion.waker.wake();
700 drop(_io_lock);
701
702 continue;
703 } else {
704 core.completion.mark_epoch_as_durable(max_epoch);
705 core.completion.error.del();
706 }
707 }
708}
709
710#[derive(Debug)]
711struct Core {
712 completion: sync::Arc<Completion>,
713 closed: atomic::AtomicBool,
714 epoch: atomic::AtomicU64,
715 file: sync::Arc<ffile::FrozenFile>,
716 flush_cv: sync::Condvar,
717 flush_guard: sync::Mutex<()>,
718 io_lock: sync::RwLock<()>,
719 queue: mpscq::MPSCQueue<WriteRequestInternal>,
720}
721
722impl Core {
723 fn new(file: sync::Arc<ffile::FrozenFile>) -> Self {
724 Self {
725 file,
726 completion: sync::Arc::new(Completion::default()),
727 closed: atomic::AtomicBool::new(false),
728 epoch: atomic::AtomicU64::new(0),
729 flush_cv: sync::Condvar::new(),
730 flush_guard: sync::Mutex::new(()),
731 io_lock: sync::RwLock::new(()),
732 queue: mpscq::MPSCQueue::default(),
733 }
734 }
735
736 #[inline]
737 fn acquire_shared_io_lock(&self) -> sync::RwLockReadGuard<'_, ()> {
738 // NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
739 self.io_lock.read().unwrap_or_else(|e| e.into_inner())
740 }
741
742 #[inline]
743 fn acquire_exclusive_io_lock(&self) -> sync::RwLockWriteGuard<'_, ()> {
744 // NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
745 self.io_lock.write().unwrap_or_else(|e| e.into_inner())
746 }
747
748 #[inline(always)]
749 fn write_queued_ops(&self, queued_ops: Vec<WriteRequestInternal>) -> FrozenResult<(usize, usize, u64)> {
750 let mut max_epoch = 0;
751 let mut max_index = 0;
752 let mut min_index = usize::MAX;
753
754 for op in queued_ops {
755 let ops_len = op.request.allocation.length();
756 match ops_len {
757 1 => {
758 self.file.pwrite(op.request.allocation.first(), op.request.slot_index)?;
759 }
760 _ => {
761 let bufs: Vec<bufpool::BufferPointer> = op.request.allocation.iter().collect();
762 self.file.pwritev(&bufs, op.request.slot_index)?;
763 }
764 }
765
766 max_epoch = max_epoch.max(op.epoch);
767 min_index = min_index.min(op.request.slot_index);
768 max_index = max_index.max(op.request.slot_index + ops_len);
769 }
770
771 Ok((min_index, max_index, max_epoch))
772 }
773
774 #[inline]
775 fn increment_current_epoch(&self) -> u64 {
776 self.epoch.fetch_add(1, atomic::Ordering::AcqRel).wrapping_add(1)
777 }
778}
779
780#[derive(Debug)]
781struct WriteRequestInternal {
782 epoch: u64,
783 request: WriteRequest,
784}
785
786#[derive(Debug)]
787struct Completion {
788 durable_epoch: atomic::AtomicU64,
789 error: FlushError,
790 waker: futures::task::AtomicWaker,
791}
792
793impl Default for Completion {
794 fn default() -> Self {
795 Self {
796 durable_epoch: atomic::AtomicU64::new(0),
797 waker: futures::task::AtomicWaker::new(),
798 error: FlushError::default(),
799 }
800 }
801}
802
803impl Completion {
804 fn mark_epoch_as_durable(&self, epoch: u64) {
805 self.durable_epoch.store(epoch, atomic::Ordering::Release);
806 self.waker.wake();
807 }
808}
809
810#[derive(Debug)]
811struct FlushError(atomic::AtomicPtr<FrozenError>);
812
813impl Default for FlushError {
814 fn default() -> Self {
815 Self(atomic::AtomicPtr::new(ptr::null_mut()))
816 }
817}
818
819impl Drop for FlushError {
820 fn drop(&mut self) {
821 let err_ptr = self.0.load(atomic::Ordering::Acquire);
822 if !err_ptr.is_null() {
823 let _ = unsafe { Box::from_raw(err_ptr) };
824 }
825 }
826}
827
828impl FlushError {
829 #[inline]
830 fn get(&self) -> Option<FrozenError> {
831 let curr_err = self.0.load(atomic::Ordering::Acquire);
832 if hints::unlikely(!curr_err.is_null()) {
833 let frozen_error = unsafe { (*curr_err).clone() };
834 return Some(frozen_error);
835 }
836
837 None
838 }
839
840 #[inline]
841 fn set(&self, new_error: FrozenError) {
842 let boxed_error = Box::into_raw(Box::new(new_error));
843 let old_err = self.0.swap(boxed_error, atomic::Ordering::AcqRel);
844
845 if hints::unlikely(!old_err.is_null()) {
846 let _ = unsafe { Box::from_raw(old_err) };
847 }
848 }
849
850 #[inline]
851 fn del(&self) {
852 let old_err = self.0.swap(ptr::null_mut(), atomic::Ordering::AcqRel);
853 if hints::unlikely(!old_err.is_null()) {
854 let _ = unsafe { Box::from_raw(old_err) };
855 }
856 }
857}
858
859mod err {
860 use crate::error::{ErrCode, FrozenError};
861
862 /// Domain ID for [`wpipe`] module is `0x02` used while propagating errors
863 const DOMAIN_ID: u8 = 0x02;
864
865 #[inline]
866 pub fn new_error<E: std::fmt::Display>(module_id: u8, code: ErrCode, observed_error: E) -> FrozenError {
867 FrozenError::new_raw(module_id, DOMAIN_ID, code, observed_error)
868 }
869
870 pub const FXE: ErrCode = ErrCode::new(0x10, "unable to spawn background flush thread");
871}
872
873#[cfg(test)]
874mod tests {
875 use super::*;
876 use crate::utils::BufferSize;
877
878 const MODULE_ID: u8 = 0x00;
879 const BUFFER_SIZE: BufferSize = BufferSize::S128;
880 const INITIAL_BUFFER_AMOUT: usize = 0x200;
881 const FLUSH_DURATION: time::Duration = time::Duration::from_millis(1);
882
883 fn new_objects<P: AsRef<std::path::Path>>(path: P) -> (sync::Arc<ffile::FrozenFile>, bufpool::BufPool, WritePipe) {
884 let file_cfg = ffile::FFCfg {
885 path: path.as_ref().to_path_buf(),
886 chunk_size: BUFFER_SIZE as usize,
887 initial_chunk_amount: INITIAL_BUFFER_AMOUT,
888 };
889 let file = sync::Arc::new(ffile::FrozenFile::new::<MODULE_ID>(file_cfg).unwrap());
890
891 let pool_cfg =
892 bufpool::BufPoolCfg { buffer_size: BUFFER_SIZE, max_memory: INITIAL_BUFFER_AMOUT * BUFFER_SIZE as usize };
893 let pool = bufpool::BufPool::new(pool_cfg);
894
895 let pipe_cfg = WritePipeCfg { module_id: MODULE_ID, flush_duration: FLUSH_DURATION };
896 let pipe = WritePipe::new(pipe_cfg, file.clone()).unwrap();
897
898 (file, pool, pipe)
899 }
900
901 fn prep_write(buf_ptr: *const u8, n: usize, pool: &bufpool::BufPool) -> bufpool::BufPoolAllocation {
902 let allocation = pool.allocate(n);
903 for allocated_buf in allocation.iter() {
904 unsafe { ptr::copy_nonoverlapping(buf_ptr, allocated_buf, BUFFER_SIZE as usize) };
905 }
906
907 allocation
908 }
909
910 fn compare_with_readback(
911 buf: &[u8],
912 read_index: usize,
913 required: usize,
914 pool: &bufpool::BufPool,
915 file: &ffile::FrozenFile,
916 ) {
917 let read_allocation = pool.allocate(required);
918 let read_bufs: Vec<bufpool::BufferPointer> = read_allocation.iter().collect();
919
920 file.preadv(&read_bufs, read_index).unwrap();
921
922 for read_buf in read_allocation.iter() {
923 let observed = unsafe { std::slice::from_raw_parts(read_buf, BUFFER_SIZE as usize) };
924 assert_eq!(buf, observed);
925 }
926 }
927
928 mod lifecycle {
929 use super::*;
930
931 #[test]
932 fn ok_new() {
933 let dir = tempfile::tempdir().unwrap();
934 let path = dir.path().join("write_single");
935
936 let file_cfg =
937 ffile::FFCfg { path, chunk_size: BUFFER_SIZE as usize, initial_chunk_amount: INITIAL_BUFFER_AMOUT };
938 let file = sync::Arc::new(ffile::FrozenFile::new::<MODULE_ID>(file_cfg).unwrap());
939
940 let pipe_cfg = WritePipeCfg { module_id: MODULE_ID, flush_duration: FLUSH_DURATION };
941 assert!(WritePipe::new(pipe_cfg, file).is_ok());
942 }
943
944 #[test]
945 fn ok_drop() {
946 let dir = tempfile::tempdir().unwrap();
947 let path = dir.path().join("write_single");
948 let (_file, _, pipe) = new_objects(path);
949
950 drop(pipe);
951 }
952 }
953
954 mod shutdown {
955 use super::*;
956
957 #[test]
958 fn ok_drop_before_pending_write_call() {
959 let dir = tempfile::tempdir().unwrap();
960 let path = dir.path().join("write_single");
961 let (_file, pool, pipe) = new_objects(path);
962
963 const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];
964
965 let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
966 let request = WriteRequest { allocation, slot_index: 0 };
967
968 assert!(pipe.write(request).is_ok());
969 drop(pipe);
970 }
971
972 #[test]
973 fn ok_drop_waits_for_pending_write_call() {
974 let dir = tempfile::tempdir().unwrap();
975 let path = dir.path().join("write_single");
976
977 const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];
978
979 // new + write + drop
980 {
981 let (_file, pool, pipe) = new_objects(path.clone());
982
983 let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
984 let request = WriteRequest { allocation, slot_index: 0 };
985
986 assert!(pipe.write(request).is_ok());
987 drop(pipe);
988 }
989
990 // open + readback
991 {
992 let (file, pool, _) = new_objects(path);
993 compare_with_readback(&BUFFER, 0, 1, &pool, &file);
994 }
995 }
996
997 #[test]
998 fn ok_drop_does_not_deadlock_when_multiple_pending_writes() {
999 let dir = tempfile::tempdir().unwrap();
1000 let path = dir.path().join("write_single");
1001 let (_file, pool, pipe) = new_objects(path);
1002
1003 for i in 0..INITIAL_BUFFER_AMOUT {
1004 let buffer = vec![i as u8; BUFFER_SIZE as usize];
1005 let allocation = prep_write(buffer.as_ptr(), 1, &pool);
1006 let request = WriteRequest { allocation, slot_index: 0 };
1007
1008 assert!(pipe.write(request).is_ok());
1009 }
1010
1011 drop(pipe);
1012 }
1013
1014 #[test]
1015 fn ok_drop_correctly_waits_for_pending_write_with_multi_threads() {
1016 let dir = tempfile::tempdir().unwrap();
1017 let path = dir.path().join("write_single");
1018 let (_file, pool, pipe) = new_objects(path);
1019
1020 const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];
1021
1022 let pipe = sync::Arc::new(pipe);
1023 let pipe2 = sync::Arc::clone(&pipe);
1024
1025 let handle = thread::spawn(move || {
1026 let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
1027 let request = WriteRequest { allocation, slot_index: 0 };
1028
1029 assert!(pipe2.write(request).is_ok());
1030 });
1031
1032 drop(pipe);
1033 handle.join().unwrap();
1034 }
1035 }
1036
mod pipe_writes {
    //! Submission-side behaviour of the pipe: requests are accepted and stamped with epochs.

    use super::*;

    /// A single request spanning several buffers is accepted by the pipe.
    #[test]
    fn ok_write() {
        const PAYLOAD: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];

        let tmp = tempfile::tempdir().unwrap();
        let (_file, pool, pipe) = new_objects(tmp.path().join("write_single"));

        let request = WriteRequest {
            allocation: prep_write(PAYLOAD.as_ptr(), 0x0A, &pool),
            slot_index: 0,
        };
        assert!(pipe.write(request).is_ok());
    }

    /// Writes submitted one after another receive strictly increasing epochs.
    #[test]
    fn ok_write_epoch_is_monotonic() {
        const PAYLOAD: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];

        let tmp = tempfile::tempdir().unwrap();
        let (_file, pool, pipe) = new_objects(tmp.path().join("write_single"));

        // One single-buffer write into each of slots 0, 1 and 2, issued in that order.
        let tickets: Vec<_> = (0..3)
            .map(|slot_index| {
                let allocation = prep_write(PAYLOAD.as_ptr(), 1, &pool);
                pipe.write(WriteRequest { allocation, slot_index }).unwrap()
            })
            .collect();

        // Compare neighbours, newest pair first.
        for pair in tickets.windows(2).rev() {
            assert!(pair[1].epoch() > pair[0].epoch());
        }
    }
}
1074
mod write_ticket {
    //! Completion-side behaviour: awaiting a `WriteTicket` and what it guarantees afterwards.

    use super::*;

    /// Awaiting a ticket resolves to an epoch at least as large as the ticket's own, after
    /// which the written bytes can be read back.
    #[test]
    fn ok_readback_after_write_with_await() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("write_single");
        let (file, pool, pipe) = new_objects(path);

        const REQUIRED: usize = 0x0A;
        const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];

        let write_allocation = prep_write(BUFFER.as_ptr(), REQUIRED, &pool);
        let request = WriteRequest { allocation: write_allocation, slot_index: 0 };

        let ticket = pipe.write(request).unwrap();
        let ticket_epoch = ticket.epoch();

        // This test only requires the resolved epoch to cover the ticket's own epoch.
        let durable_epoch = futures::executor::block_on(ticket).unwrap();
        assert!(durable_epoch >= ticket_epoch);

        compare_with_readback(&BUFFER, 0, REQUIRED, &pool, &file);
    }

    /// Several back-to-back multi-buffer writes into consecutive slot ranges; awaiting only the
    /// last ticket must be enough for every range to read back correctly.
    #[test]
    fn ok_readback_after_batch_write() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("write_single");
        let (file, pool, pipe) = new_objects(path);

        // (byte pattern, number of buffers written with that pattern)
        const BUFFERS: [([u8; BUFFER_SIZE as usize], usize); 5] = [
            ([0x0Au8; BUFFER_SIZE as usize], 0x1A),
            ([0x0Bu8; BUFFER_SIZE as usize], 0x1B),
            ([0x0Cu8; BUFFER_SIZE as usize], 0x1C),
            ([0x0Du8; BUFFER_SIZE as usize], 0x1D),
            ([0x0Eu8; BUFFER_SIZE as usize], 0x1E),
        ];

        let mut slot_index = 0;
        let mut latest_ticket = None;

        for (buf, required) in BUFFERS {
            let allocation = prep_write(buf.as_ptr(), required, &pool);
            let request = WriteRequest { allocation, slot_index };
            let ticket = pipe.write(request).unwrap();

            slot_index += required;
            latest_ticket = Some(ticket);
        }

        // `BUFFERS` is non-empty, so the loop always leaves a ticket behind; a single `expect`
        // makes that invariant explicit instead of asserting and then re-checking with `if let`.
        let ticket = latest_ticket.expect("at least one write must have been submitted");
        let ticket_epoch = ticket.epoch();
        let durable_epoch = futures::executor::block_on(ticket).unwrap();
        assert!(durable_epoch >= ticket_epoch);

        let mut read_index = 0;
        for (buf, required) in BUFFERS {
            compare_with_readback(&buf, read_index, required, &pool, &file);
            read_index += required;
        }
    }

    /// Two tickets awaited concurrently both resolve, the later one to a larger epoch.
    #[test]
    fn ok_multiple_concurrent_awaits() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("write_single");
        let (_file, pool, pipe) = new_objects(path);

        const REQUIRED: usize = 0x0A;
        const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];

        let allocation1 = prep_write(BUFFER.as_ptr(), REQUIRED, &pool);
        let ticket1 = pipe.write(WriteRequest { allocation: allocation1, slot_index: 0 }).unwrap();

        let allocation2 = prep_write(BUFFER.as_ptr(), REQUIRED, &pool);
        let ticket2 = pipe.write(WriteRequest { allocation: allocation2, slot_index: 0 }).unwrap();

        let (e1, e2) = futures::executor::block_on(async { futures::join!(ticket1, ticket2) });

        let e1 = e1.expect("first ticket should resolve successfully");
        let e2 = e2.expect("second ticket should resolve successfully");
        assert!(e2 > e1);
    }

    /// Awaiting only the newest ticket must make every earlier write durable as well.
    #[test]
    fn ok_awaiting_last_ticket_implies_previous_writes_are_durable() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("durability_boundary");

        let (file, pool, pipe) = new_objects(path);

        const BUFFER_A: [u8; BUFFER_SIZE as usize] = [0xAA; BUFFER_SIZE as usize];
        const BUFFER_B: [u8; BUFFER_SIZE as usize] = [0xBB; BUFFER_SIZE as usize];
        const BUFFER_C: [u8; BUFFER_SIZE as usize] = [0xCC; BUFFER_SIZE as usize];

        let alloc_a = prep_write(BUFFER_A.as_ptr(), 1, &pool);
        let ticket_a = pipe.write(WriteRequest { allocation: alloc_a, slot_index: 0 }).unwrap();

        let alloc_b = prep_write(BUFFER_B.as_ptr(), 1, &pool);
        let ticket_b = pipe.write(WriteRequest { allocation: alloc_b, slot_index: 1 }).unwrap();

        let alloc_c = prep_write(BUFFER_C.as_ptr(), 1, &pool);
        let ticket_c = pipe.write(WriteRequest { allocation: alloc_c, slot_index: 2 }).unwrap();

        let epoch_a = ticket_a.epoch();
        let epoch_b = ticket_b.epoch();
        let epoch_c = ticket_c.epoch();

        // Only `ticket_c` is awaited; `ticket_a` and `ticket_b` are never polled.
        let durable_epoch = futures::executor::block_on(ticket_c).unwrap();
        assert!(durable_epoch >= epoch_c);
        assert!(durable_epoch >= epoch_b);
        assert!(durable_epoch >= epoch_a);

        compare_with_readback(&BUFFER_A, 0, 1, &pool, &file);
        compare_with_readback(&BUFFER_B, 1, 1, &pool, &file);
        compare_with_readback(&BUFFER_C, 2, 1, &pool, &file);
    }
}
1197
mod concurrency {
    //! Many writer threads sharing one pipe.

    use super::*;

    /// Several threads write disjoint slot ranges through one shared pipe. The test checks that:
    /// - every write gets a unique epoch in `1..=TOTAL_WRITES`;
    /// - awaiting the highest epoch resolves to exactly `TOTAL_WRITES`;
    /// - every slot reads back the byte pattern of the thread that owns it.
    #[test]
    fn ok_multi_threaded_writers() {
        const THREADS: usize = 4;
        const WRITES_PER_THREAD: usize = 0x40;
        const TOTAL_WRITES: usize = THREADS * WRITES_PER_THREAD;
        // Keeps at least one pool buffer spare beyond the writes (used by the readback below).
        const _: () = assert!(TOTAL_WRITES < INITIAL_BUFFER_AMOUT);

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("multi_threaded_writers");

        let (file, pool, pipe) = new_objects(path);

        let pipe = sync::Arc::new(pipe);
        let pool = sync::Arc::new(pool);

        let mut handles = Vec::with_capacity(THREADS);
        for tid in 0..THREADS {
            let pipe = sync::Arc::clone(&pipe);
            let pool = sync::Arc::clone(&pool);

            handles.push(thread::spawn(move || {
                // The payload never changes within a thread, so build it once instead of
                // allocating a fresh buffer for every write.
                let buffer = [tid as u8; BUFFER_SIZE as usize];
                let mut tickets = Vec::with_capacity(WRITES_PER_THREAD);

                for i in 0..WRITES_PER_THREAD {
                    let allocation = prep_write(buffer.as_ptr(), 1, &pool);
                    let slot_index = tid * WRITES_PER_THREAD + i;
                    tickets.push(pipe.write(WriteRequest { allocation, slot_index }).unwrap());
                }

                tickets
            }));
        }

        let mut tickets = Vec::with_capacity(TOTAL_WRITES);
        for handle in handles {
            tickets.extend(handle.join().unwrap());
        }
        assert_eq!(tickets.len(), TOTAL_WRITES);

        // Epochs are handed out starting at 1 with no gaps and no duplicates across threads.
        let mut epochs: Vec<u64> = tickets.iter().map(WriteTicket::epoch).collect();
        epochs.sort_unstable();
        let expected_epochs: Vec<u64> = (1..=TOTAL_WRITES as u64).collect();
        assert_eq!(epochs, expected_epochs);

        // Awaiting the newest ticket is enough: every other ticket has a smaller epoch.
        let latest_ticket = tickets.into_iter().max_by_key(WriteTicket::epoch).unwrap();
        let durable_epoch = futures::executor::block_on(latest_ticket).unwrap();
        assert_eq!(durable_epoch, TOTAL_WRITES as u64);

        // Each thread owned a contiguous slot range; every slot must hold that thread's pattern.
        // Slots are checked one at a time so the readback needs only a single pool buffer.
        for tid in 0..THREADS {
            let expected = [tid as u8; BUFFER_SIZE as usize];
            for i in 0..WRITES_PER_THREAD {
                compare_with_readback(&expected, tid * WRITES_PER_THREAD + i, 1, &pool, &file);
            }
        }
    }
}
1254}