frozen_core/wpipe.rs
1//! A low latency asynchronous write pipeline for buffer based storage
2//!
3//! ## Design
4//!
//! By design, every write call is fire-and-forget, i.e. the call returns immediately after
//! pushing the bytes to be written into the MPSC queue.
//!
//! The background thread pulls from the MPSC queue, performs individual pwrite/pwritev calls, and
//! issues a common hard sync right after. This provides durability for all the writes submitted
//! within the same [`WritePipeCfg::flush_duration`] batching window.
11//!
12//! ## Benchmarks
13//!
14//! Observed measurements for latency (both single and multi threaded),
15//!
16//! | Metric | 1 Thread (µs) | 4 Threads (µs) |
17//! |:-------|:--------------|:---------------|
18//! | P50 | 0.091 | 0.275 |
19//! | P90 | 0.092 | 0.458 |
20//! | P99 | 0.825 | 0.917 |
21//! | Mean | 1.185 | 3.857 |
22//!
23//! Environment used for benching,
24//!
25//! * OS: NixOS (WSL2)
26//! * Architecture: x86_64
27//! * Memory: 8 GiB RAM (DDR4)
28//! * Rust: rustc 1.86.0 w/ cargo 1.86.0
29//! * Kernel: Linux 6.6.87.2-microsoft-standard-WSL2
30//! * CPU: Intel® Core™ i5-10300H @ 2.50GHz (4C / 8T)
31//!
32//! ## Example
33//!
34//! ```
35//! use frozen_core::{bufpool, ffile, utils, wpipe};
36//! use std::{ptr, sync, time};
37//!
38//! const MODULE_ID: u8 = 0x00;
39//! const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
40//!
41//! let dir = tempfile::tempdir().expect("tempdir creation should succeed");
42//! let path = dir.path().join("wpipe_example");
43//!
44//! let file_cfg = ffile::FrozenFileCfg {
45//! path,
46//! module_id: MODULE_ID,
47//! initial_available_buffers: 0x400,
48//! buffer_size: BUFFER_SIZE as usize,
49//! };
50//! let file = sync::Arc::new(
51//! ffile::FrozenFile::new(file_cfg)
52//! .expect("file creation should succeed"),
53//! );
54//!
55//! let pool_cfg = bufpool::BufPoolCfg {
56//! buffer_size: utils::BufferSize::S128,
57//! max_memory: 0x400 * BUFFER_SIZE as usize,
58//! };
59//! let pool = bufpool::BufPool::new(pool_cfg);
60//!
61//! let pipe_cfg = wpipe::WritePipeCfg {
62//! module_id: MODULE_ID,
63//! flush_duration: time::Duration::from_millis(1),
64//! };
65//! let pipe = wpipe::WritePipe::new(pipe_cfg, file)
66//! .expect("pipe creation should succeed");
67//!
68//! let payload = [0xAAu8; BUFFER_SIZE as usize];
69//!
70//! let mut latest_ticket = None;
71//! for slot_index in 0..3 {
72//! let allocation = pool.allocate(1);
73//!
74//! unsafe {
75//! ptr::copy_nonoverlapping(
76//! payload.as_ptr(),
77//! allocation.first(),
78//! payload.len(),
79//! );
80//! }
81//!
82//! let ticket = pipe
83//! .write(wpipe::WriteRequest {
84//! allocation,
85//! slot_index,
86//! })
87//! .expect("write should succeed");
88//!
89//! latest_ticket = Some(ticket);
90//! }
91//!
92//! let durable_epoch = futures::executor::block_on(
93//! latest_ticket.expect("ticket should exist"),
94//! )
95//! .expect("writes should become durable");
96//!
97//! assert!(durable_epoch >= 3);
98//! ```
99
100use crate::{ack, bufpool, error::FrozenResult, ffile, mpscq};
101use std::{
102 sync::{self, atomic},
103 thread, time,
104};
105
/// Configuration options accepted by [`WritePipe`]
///
/// ## Example
///
/// ```
/// use frozen_core::wpipe::WritePipeCfg;
/// use std::time::Duration;
///
/// let cfg = WritePipeCfg {
///     module_id: 2,
///     flush_duration: Duration::from_millis(0x0A),
/// };
///
/// assert_eq!(cfg.module_id, 2);
/// assert_eq!(cfg.flush_duration, Duration::from_millis(0x0A));
/// ```
#[derive(Debug, Clone)]
pub struct WritePipeCfg {
    /// Module identifier, used to name the background flush thread and attached to the
    /// [`crate::error::FrozenError`] raised when that thread cannot be spawned
    pub module_id: u8,

    /// Maximum time the background thread sleeps between two flush rounds; all the write
    /// operations submitted within the same window are hard synced together
    pub flush_duration: time::Duration,
}
130
131/// A low latency asynchronous write pipeline for buffer based storage
132///
133/// ## Design
134///
135/// By design, every write call is fire-and-forget, i.e. the call is immediately returned after
136/// pushing the bytes to be written into the MPSC queue.
137///
138/// The background thread pulls from MPSC queue and performs indivisual pwrite/v calls and a common
139/// hard sync right after. This provides durability for all the writes submitted within the same
140/// [`WritePipeCfg::flush_duration`] batching window.
141///
142/// ## Example
143///
144/// ```
145/// use frozen_core::{bufpool, ffile, utils, wpipe};
146/// use std::{ptr, sync, time};
147///
148/// const MODULE_ID: u8 = 0x00;
149/// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
150///
151/// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
152/// let path = dir.path().join("wpipe_example");
153///
154/// let file_cfg = ffile::FrozenFileCfg {
155/// path,
156/// module_id: MODULE_ID,
157/// initial_available_buffers: 0x400,
158/// buffer_size: BUFFER_SIZE as usize,
159/// };
160/// let file = sync::Arc::new(
161/// ffile::FrozenFile::new(file_cfg)
162/// .expect("file creation should succeed"),
163/// );
164///
165/// let pool_cfg = bufpool::BufPoolCfg {
166/// buffer_size: utils::BufferSize::S128,
167/// max_memory: 0x400 * BUFFER_SIZE as usize,
168/// };
169/// let pool = bufpool::BufPool::new(pool_cfg);
170///
171/// let pipe_cfg = wpipe::WritePipeCfg {
172/// module_id: MODULE_ID,
173/// flush_duration: time::Duration::from_millis(1),
174/// };
175/// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
176/// .expect("pipe creation should succeed");
177///
178/// let payload = [0xAAu8; BUFFER_SIZE as usize];
179///
180/// let mut latest_ticket = None;
181/// for slot_index in 0..3 {
182/// let allocation = pool.allocate(1);
183///
184/// unsafe {
185/// ptr::copy_nonoverlapping(
186/// payload.as_ptr(),
187/// allocation.first(),
188/// payload.len(),
189/// );
190/// }
191///
192/// let ticket = pipe
193/// .write(wpipe::WriteRequest {
194/// allocation,
195/// slot_index,
196/// })
197/// .expect("write should succeed");
198///
199/// latest_ticket = Some(ticket);
200/// }
201///
202/// let durable_epoch = futures::executor::block_on(
203/// latest_ticket.expect("ticket should exist"),
204/// )
205/// .expect("writes should become durable");
206///
207/// assert!(durable_epoch >= 3);
208/// ```
209#[derive(Debug)]
210pub struct WritePipe {
211 core: sync::Arc<Core>,
212 flush_tx_handle: Option<thread::JoinHandle<()>>,
213}
214
215unsafe impl Send for WritePipe {}
216unsafe impl Sync for WritePipe {}
217
218impl WritePipe {
219 /// Create a new instance of [`WritePipe`]
220 ///
221 /// ## Example
222 ///
223 /// ```
224 /// use frozen_core::{bufpool, ffile, utils, wpipe};
225 /// use std::{ptr, sync, time};
226 ///
227 /// const MODULE_ID: u8 = 0x00;
228 /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
229 ///
230 /// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
231 /// let path = dir.path().join("wpipe_example");
232 ///
233 /// let file_cfg = ffile::FrozenFileCfg {
234 /// path,
235 /// module_id: MODULE_ID,
236 /// initial_available_buffers: 0x400,
237 /// buffer_size: BUFFER_SIZE as usize,
238 /// };
239 /// let file = sync::Arc::new(
240 /// ffile::FrozenFile::new(file_cfg)
241 /// .expect("file creation should succeed"),
242 /// );
243 ///
244 /// let pool_cfg = bufpool::BufPoolCfg {
245 /// buffer_size: utils::BufferSize::S128,
246 /// max_memory: 0x400 * BUFFER_SIZE as usize,
247 /// };
248 /// let pool = bufpool::BufPool::new(pool_cfg);
249 ///
250 /// let pipe_cfg = wpipe::WritePipeCfg {
251 /// module_id: MODULE_ID,
252 /// flush_duration: time::Duration::from_millis(1),
253 /// };
254 /// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
255 /// .expect("pipe creation should succeed");
256 ///
257 /// let payload = vec![0x0A; BUFFER_SIZE as usize];
258 /// let allocation = pool.allocate(1);
259 ///
260 /// unsafe {ptr::copy_nonoverlapping(
261 /// payload.as_ptr(),
262 /// allocation.first(),
263 /// payload.len()
264 /// )};
265 ///
266 /// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0});
267 ///
268 /// assert!(
269 /// futures::executor::block_on(ticket.expect("ticket should exist"))
270 /// .is_ok()
271 /// );
272 /// ```
273 #[inline]
274 pub fn new(cfg: WritePipeCfg, file: sync::Arc<ffile::FrozenFile>) -> FrozenResult<Self> {
275 let core = sync::Arc::new(Core::new(file));
276 let cloned_core = core.clone();
277 let flush_tx_handle = match thread::Builder::new()
278 .name(format!("mod{}_wpipe_flush_tx", cfg.module_id))
279 .spawn(move || bg_flush_thread(cloned_core, cfg.flush_duration))
280 {
281 Ok(handle) => Some(handle),
282 Err(observed_error) => {
283 return Err(err::new_error(cfg.module_id, err::FXE, observed_error));
284 }
285 };
286
287 Ok(Self { core: core, flush_tx_handle })
288 }
289
290 /// Push a write into [`WritePipe`]
291 ///
292 /// Every write call is fire-and-forget for the caller by default, unless the caller choose to
293 /// wait for durability using the manual `await` on [`WriteTicket`].
294 ///
295 /// ## Example
296 ///
297 /// ```
298 /// use frozen_core::{bufpool, ffile, utils, wpipe};
299 /// use std::{ptr, sync, time};
300 ///
301 /// const MODULE_ID: u8 = 0x00;
302 /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
303 ///
304 /// let dir = tempfile::tempdir().expect("tempdir creation should succeed");
305 /// let path = dir.path().join("wpipe_example");
306 ///
307 /// let file_cfg = ffile::FrozenFileCfg {
308 /// path,
309 /// module_id: MODULE_ID,
310 /// initial_available_buffers: 0x400,
311 /// buffer_size: BUFFER_SIZE as usize,
312 /// };
313 /// let file = sync::Arc::new(
314 /// ffile::FrozenFile::new(file_cfg)
315 /// .expect("file creation should succeed"),
316 /// );
317 ///
318 /// let pool_cfg = bufpool::BufPoolCfg {
319 /// buffer_size: utils::BufferSize::S128,
320 /// max_memory: 0x400 * BUFFER_SIZE as usize,
321 /// };
322 /// let pool = bufpool::BufPool::new(pool_cfg);
323 ///
324 /// let pipe_cfg = wpipe::WritePipeCfg {
325 /// module_id: MODULE_ID,
326 /// flush_duration: time::Duration::from_millis(1),
327 /// };
328 /// let pipe = wpipe::WritePipe::new(pipe_cfg, file)
329 /// .expect("pipe creation should succeed");
330 ///
331 /// let payload = vec![0x0A; BUFFER_SIZE as usize];
332 /// let allocation = pool.allocate(1);
333 ///
334 /// unsafe {ptr::copy_nonoverlapping(
335 /// payload.as_ptr(),
336 /// allocation.first(),
337 /// payload.len()
338 /// )};
339 ///
340 /// let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0});
341 ///
342 /// assert!(
343 /// futures::executor::block_on(ticket.expect("ticket should exist"))
344 /// .is_ok()
345 /// );
346 /// ```
347 #[inline]
348 pub fn write(&self, request: WriteRequest) -> FrozenResult<ack::AckTicket> {
349 let _io_lock = self.core.acquire_shared_io_lock();
350 if let Some(frozen_error) = self.core.completion.get_err() {
351 return Err(frozen_error);
352 }
353
354 let epoch = self.core.completion.increment_current_epoch();
355 let internal_req = WriteRequestInternal { request, epoch };
356 self.core.queue.push(internal_req);
357
358 Ok(ack::AckTicket::new(epoch, self.core.completion.clone()))
359 }
360}
361
362impl Drop for WritePipe {
363 fn drop(&mut self) {
364 self.core.closed.store(true, atomic::Ordering::Release);
365 self.core.flush_cv.notify_one();
366
367 if let Some(handle) = self.flush_tx_handle.take() {
368 let _ = handle.join();
369 }
370 }
371}
372
373/// A write operation submitted to [`WritePipe`]
374///
375/// The request contains the buffers to persist along with the destination slot index in the
376/// underlying [`FrozenFile`].
377///
378/// ## Example
379///
380/// ```
381/// use frozen_core::{bufpool, utils, wpipe};
382/// use std::ptr;
383///
384/// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
385///
386/// let pool_cfg = bufpool::BufPoolCfg {
387/// buffer_size: utils::BufferSize::S128,
388/// max_memory: 0x400 * BUFFER_SIZE as usize,
389/// };
390/// let pool = bufpool::BufPool::new(pool_cfg);
391///
392/// let payload = vec![0x0A; BUFFER_SIZE as usize];
393/// let allocation = pool.allocate(1);
394///
395/// unsafe {ptr::copy_nonoverlapping(
396/// payload.as_ptr(),
397/// allocation.first(),
398/// payload.len()
399/// )};
400///
401/// let request = wpipe::WriteRequest {allocation, slot_index: 0};
402/// assert!(request.slot_index >= 0);
403/// ```
404#[derive(Debug)]
405pub struct WriteRequest {
406 /// Buffer allocation containing the pages to be written allocated using [`bufpool::BufPool`]
407 ///
408 /// ## Example
409 ///
410 /// ```
411 /// use frozen_core::{bufpool, utils, wpipe};
412 /// use std::ptr;
413 ///
414 /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
415 ///
416 /// let pool_cfg = bufpool::BufPoolCfg {
417 /// buffer_size: utils::BufferSize::S128,
418 /// max_memory: 0x400 * BUFFER_SIZE as usize,
419 /// };
420 /// let pool = bufpool::BufPool::new(pool_cfg);
421 ///
422 /// let payload = vec![0x0A; BUFFER_SIZE as usize];
423 /// let allocation = pool.allocate(1);
424 ///
425 /// unsafe {ptr::copy_nonoverlapping(
426 /// payload.as_ptr(),
427 /// allocation.first(),
428 /// payload.len()
429 /// )};
430 ///
431 /// let request = wpipe::WriteRequest {allocation, slot_index: 0};
432 /// assert!(!request.allocation.first().is_null());
433 /// ```
434 pub allocation: bufpool::BufPoolAllocation,
435
436 /// Destination slot index where the pages of the allocation will be written from
437 ///
438 /// ## Example
439 ///
440 /// ```
441 /// use frozen_core::{bufpool, utils, wpipe};
442 /// use std::ptr;
443 ///
444 /// const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
445 ///
446 /// let pool_cfg = bufpool::BufPoolCfg {
447 /// buffer_size: utils::BufferSize::S128,
448 /// max_memory: 0x400 * BUFFER_SIZE as usize,
449 /// };
450 /// let pool = bufpool::BufPool::new(pool_cfg);
451 ///
452 /// let payload = vec![0x0A; BUFFER_SIZE as usize];
453 /// let allocation = pool.allocate(1);
454 ///
455 /// unsafe {ptr::copy_nonoverlapping(
456 /// payload.as_ptr(),
457 /// allocation.first(),
458 /// payload.len()
459 /// )};
460 ///
461 /// let request = wpipe::WriteRequest {allocation, slot_index: 0};
462 /// assert!(request.slot_index >= 0);
463 /// ```
464 pub slot_index: usize,
465}
466
467/// ## Why we ignore [`std::sync::PoisonError`]?
468///
469/// The mutex used for lock, is solely used as a parking primitive for [`Condvar`] and does not
470/// protect any mutable state. All the pool invariants and accounting are maintained via atomics
471/// and are completely seperated from the mutex.
472///
473/// A poisoned mutex only indicates that another tx panicked while holding the lock, and indicates
474/// an inconsistent state of the protected value. Since no state can be left partially modified
475/// under this lock, there is no possible consistency risk to recover from and propagating the
476/// poison error would only introduce unnecessary failures into the allocation path.
477///
478/// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating
479/// with the recovered guard.
480fn bg_flush_thread(core: sync::Arc<Core>, flush_duration: time::Duration) {
481 let mut guard = core.flush_guard.lock().unwrap_or_else(|e| e.into_inner());
482 loop {
483 (guard, _) =
484 core.flush_cv.wait_timeout(guard, flush_duration).unwrap_or_else(|e| e.into_inner());
485
486 // NOTE: we must read values of close brodcast before acquire exclusive lock, if done
487 // otherwise, we impose serious deadlock sort of situation for the the flusher tx
488
489 let queued_ops = core.queue.drain();
490 let closed = core.closed.load(atomic::Ordering::Acquire);
491
492 if queued_ops.is_empty() {
493 if closed {
494 return;
495 }
496
497 continue;
498 }
499
500 // INFO: we must acquire an exclusive IO lock for sync, hence no write/read ops are allowed
501 // while sync is in progress
502 let _io_lock = core.acquire_exclusive_io_lock();
503
504 let (_min_index, _max_index, max_epoch) = match core.write_queued_ops(queued_ops) {
505 Ok(res) => res,
506 Err(new_error) => {
507 core.completion.set_err(new_error);
508 core.completion.notify_all_listeners();
509 drop(_io_lock);
510
511 continue;
512 }
513 };
514
515 // NOTE: On linux, we can initiate writeback (best-effort only) for a given range
516 #[cfg(target_os = "linux")]
517 if let Err(new_error) = core.file.sync_range(_min_index, _max_index - _min_index) {
518 core.completion.set_err(new_error);
519 core.completion.notify_all_listeners();
520 drop(_io_lock);
521
522 continue;
523 }
524
525 // NOTE: If the sync fails, we update the Core::error w/ the received error object. We
526 // clear it up when another call succeeds.
527 //
528 // This is valid as the underlying sync flushes entire batch all at once, hence even if the
529 // last call failed, and the new one succeeded, we do get the durability guarantee for the
530 // old data as well.
531
532 if let Err(new_error) = core.file.sync() {
533 core.completion.set_err(new_error);
534 drop(_io_lock);
535
536 continue;
537 } else {
538 core.completion.mark_epoch_as_durable(max_epoch);
539 core.completion.del_err();
540 }
541
542 core.completion.notify_all_listeners();
543 }
544}
545
546#[derive(Debug)]
547struct Core {
548 completion: sync::Arc<ack::Completion>,
549 closed: atomic::AtomicBool,
550 file: sync::Arc<ffile::FrozenFile>,
551 flush_cv: sync::Condvar,
552 flush_guard: sync::Mutex<()>,
553 io_lock: sync::RwLock<()>,
554 queue: mpscq::MPSCQueue<WriteRequestInternal>,
555}
556
557impl Core {
558 fn new(file: sync::Arc<ffile::FrozenFile>) -> Self {
559 Self {
560 file,
561 completion: sync::Arc::new(ack::Completion::default()),
562 closed: atomic::AtomicBool::new(false),
563 flush_cv: sync::Condvar::new(),
564 flush_guard: sync::Mutex::new(()),
565 io_lock: sync::RwLock::new(()),
566 queue: mpscq::MPSCQueue::default(),
567 }
568 }
569
570 #[inline]
571 fn acquire_shared_io_lock(&self) -> sync::RwLockReadGuard<'_, ()> {
572 // NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
573 self.io_lock.read().unwrap_or_else(|e| e.into_inner())
574 }
575
576 #[inline]
577 fn acquire_exclusive_io_lock(&self) -> sync::RwLockWriteGuard<'_, ()> {
578 // NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
579 self.io_lock.write().unwrap_or_else(|e| e.into_inner())
580 }
581
582 #[inline(always)]
583 fn write_queued_ops(
584 &self,
585 queued_ops: Vec<WriteRequestInternal>,
586 ) -> FrozenResult<(usize, usize, u64)> {
587 let mut max_epoch = 0;
588 let mut max_index = 0;
589 let mut min_index = usize::MAX;
590
591 for op in queued_ops {
592 let ops_len = op.request.allocation.length();
593 match ops_len {
594 1 => {
595 self.file.pwrite(op.request.allocation.first(), op.request.slot_index)?;
596 }
597 _ => {
598 let bufs: Vec<bufpool::BufferPointer> = op.request.allocation.iter().collect();
599 self.file.pwritev(&bufs, op.request.slot_index)?;
600 }
601 }
602
603 max_epoch = max_epoch.max(op.epoch);
604 min_index = min_index.min(op.request.slot_index);
605 max_index = max_index.max(op.request.slot_index + ops_len);
606 }
607
608 Ok((min_index, max_index, max_epoch))
609 }
610}
611
612#[derive(Debug)]
613struct WriteRequestInternal {
614 epoch: u64,
615 request: WriteRequest,
616}
617
618mod err {
619 use crate::error::{ErrCode, FrozenError};
620
621 /// Domain ID for [`wpipe`] module is `0x02` used while propagating errors
622 const DOMAIN_ID: u8 = 0x02;
623
624 #[inline]
625 pub fn new_error<E: std::fmt::Display>(
626 module_id: u8,
627 code: ErrCode,
628 observed_error: E,
629 ) -> FrozenError {
630 FrozenError::new_raw(module_id, DOMAIN_ID, code, observed_error)
631 }
632
633 pub const FXE: ErrCode = ErrCode::new(0x10, "unable to spawn background flush thread");
634}
635
#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::BufferSize;

    const MODULE_ID: u8 = 0x00;
    const BUFFER_SIZE: BufferSize = BufferSize::S128;
    const PAGE_LEN: usize = BUFFER_SIZE as usize;
    const INITIAL_BUFFERS: usize = 0x200;
    const FLUSH_DURATION: time::Duration = time::Duration::from_millis(1);

    /// A payload that fills exactly one buffer
    type Page = [u8; PAGE_LEN];

    /// Creates a temporary directory and a file path inside of it; the directory guard must be
    /// kept alive for as long as the path is in use
    fn scratch(file_name: &str) -> (tempfile::TempDir, std::path::PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join(file_name);

        (dir, path)
    }

    /// Opens the file at `path` and builds a buffer pool and a write pipe on top of it
    fn new_objects<P: AsRef<std::path::Path>>(
        path: P,
    ) -> (sync::Arc<ffile::FrozenFile>, bufpool::BufPool, WritePipe) {
        let file = ffile::FrozenFile::new(ffile::FrozenFileCfg {
            module_id: MODULE_ID,
            path: path.as_ref().to_path_buf(),
            buffer_size: PAGE_LEN,
            initial_available_buffers: INITIAL_BUFFERS,
        })
        .map(sync::Arc::new)
        .unwrap();

        let pool = bufpool::BufPool::new(bufpool::BufPoolCfg {
            buffer_size: BUFFER_SIZE,
            max_memory: INITIAL_BUFFERS * PAGE_LEN,
        });

        let pipe = WritePipe::new(
            WritePipeCfg { module_id: MODULE_ID, flush_duration: FLUSH_DURATION },
            sync::Arc::clone(&file),
        )
        .unwrap();

        (file, pool, pipe)
    }

    /// Allocates `n` buffers and fills each of them with the `PAGE_LEN` bytes behind `buf_ptr`
    fn prep_write(
        buf_ptr: *const u8,
        n: usize,
        pool: &bufpool::BufPool,
    ) -> bufpool::BufPoolAllocation {
        let allocation = pool.allocate(n);
        allocation.iter().for_each(|page| {
            // SAFETY: every caller passes a pointer to at least `PAGE_LEN` readable bytes, and
            // the pool is configured with `BUFFER_SIZE` sized buffers
            unsafe { std::ptr::copy_nonoverlapping(buf_ptr, page, PAGE_LEN) }
        });

        allocation
    }

    /// Fills `pages` buffers with `payload` and submits them to the pipe at `slot_index`
    fn submit(
        pipe: &WritePipe,
        pool: &bufpool::BufPool,
        payload: &[u8],
        pages: usize,
        slot_index: usize,
    ) -> FrozenResult<ack::AckTicket> {
        let allocation = prep_write(payload.as_ptr(), pages, pool);
        pipe.write(WriteRequest { allocation, slot_index })
    }

    /// Reads `required` buffers starting at `read_index` and checks each one against `buf`
    fn compare_with_readback(
        buf: &[u8],
        read_index: usize,
        required: usize,
        pool: &bufpool::BufPool,
        file: &ffile::FrozenFile,
    ) {
        let read_allocation = pool.allocate(required);
        let pages: Vec<bufpool::BufferPointer> = read_allocation.iter().collect();

        file.preadv(&pages, read_index).unwrap();

        for page in read_allocation.iter() {
            // SAFETY: each buffer of the allocation is `PAGE_LEN` bytes long
            let observed = unsafe { std::slice::from_raw_parts(page, PAGE_LEN) };
            assert_eq!(buf, observed);
        }
    }

    mod lifecycle {
        use super::*;

        #[test]
        fn ok_new() {
            let (_dir, path) = scratch("write_single");

            let file = ffile::FrozenFile::new(ffile::FrozenFileCfg {
                path,
                module_id: MODULE_ID,
                buffer_size: PAGE_LEN,
                initial_available_buffers: INITIAL_BUFFERS,
            })
            .map(sync::Arc::new)
            .unwrap();

            let pipe_cfg = WritePipeCfg { module_id: MODULE_ID, flush_duration: FLUSH_DURATION };
            let pipe = WritePipe::new(pipe_cfg, file);

            assert!(pipe.is_ok());
        }

        #[test]
        fn ok_drop() {
            let (_dir, path) = scratch("write_single");
            let (_file, _, pipe) = new_objects(path);

            drop(pipe);
        }
    }

    mod shutdown {
        use super::*;

        const BUFFER: Page = [0x20; PAGE_LEN];

        #[test]
        fn ok_drop_before_pending_write_call() {
            let (_dir, path) = scratch("write_single");
            let (_file, pool, pipe) = new_objects(path);

            assert!(submit(&pipe, &pool, &BUFFER, 1, 0).is_ok());
            drop(pipe);
        }

        #[test]
        fn ok_drop_waits_for_pending_write_call() {
            let (_dir, path) = scratch("write_single");

            // new + write + drop
            {
                let (_file, pool, pipe) = new_objects(path.clone());

                assert!(submit(&pipe, &pool, &BUFFER, 1, 0).is_ok());
                drop(pipe);
            }

            // open + readback
            {
                let (file, pool, _) = new_objects(path);
                compare_with_readback(&BUFFER, 0, 1, &pool, &file);
            }
        }

        #[test]
        fn ok_drop_does_not_deadlock_when_multiple_pending_writes() {
            let (_dir, path) = scratch("write_single");
            let (_file, pool, pipe) = new_objects(path);

            for i in 0..INITIAL_BUFFERS {
                let payload = vec![i as u8; PAGE_LEN];
                assert!(submit(&pipe, &pool, &payload, 1, 0).is_ok());
            }

            drop(pipe);
        }

        #[test]
        fn ok_drop_correctly_waits_for_pending_write_with_multi_threads() {
            let (_dir, path) = scratch("write_single");
            let (_file, pool, pipe) = new_objects(path);

            let pipe = sync::Arc::new(pipe);
            let writer_pipe = sync::Arc::clone(&pipe);

            let writer = thread::spawn(move || {
                assert!(submit(&writer_pipe, &pool, &BUFFER, 1, 0).is_ok());
            });

            drop(pipe);
            writer.join().unwrap();
        }
    }

    mod pipe_writes {
        use super::*;

        const BUFFER: Page = [0x0Au8; PAGE_LEN];

        #[test]
        fn ok_write() {
            let (_dir, path) = scratch("write_single");
            let (_file, pool, pipe) = new_objects(path);

            assert!(submit(&pipe, &pool, &BUFFER, 0x0A, 0).is_ok());
        }

        #[test]
        fn ok_write_epoch_is_monotonic() {
            let (_dir, path) = scratch("write_single");
            let (_file, pool, pipe) = new_objects(path);

            let first = submit(&pipe, &pool, &BUFFER, 1, 0).unwrap();
            let second = submit(&pipe, &pool, &BUFFER, 1, 1).unwrap();
            let third = submit(&pipe, &pool, &BUFFER, 1, 2).unwrap();

            assert!(third.epoch() > second.epoch());
            assert!(second.epoch() > first.epoch());
        }
    }

    mod write_ticket {
        use super::*;

        #[test]
        fn ok_readback_after_write_with_await() {
            const REQUIRED: usize = 0x0A;
            const BUFFER: Page = [0x0Au8; PAGE_LEN];

            let (_dir, path) = scratch("write_single");
            let (file, pool, pipe) = new_objects(path);

            let ticket = submit(&pipe, &pool, &BUFFER, REQUIRED, 0).unwrap();
            let ticket_epoch = ticket.epoch();

            let durable_epoch = futures::executor::block_on(ticket).unwrap();
            assert!(durable_epoch >= ticket_epoch);

            compare_with_readback(&BUFFER, 0, REQUIRED, &pool, &file);
        }

        #[test]
        fn ok_readback_after_batch_write() {
            const BUFFERS: [(Page, usize); 5] = [
                ([0x0Au8; PAGE_LEN], 0x1A),
                ([0x0Bu8; PAGE_LEN], 0x1B),
                ([0x0Cu8; PAGE_LEN], 0x1C),
                ([0x0Du8; PAGE_LEN], 0x1D),
                ([0x0Eu8; PAGE_LEN], 0x1E),
            ];

            let (_dir, path) = scratch("write_single");
            let (file, pool, pipe) = new_objects(path);

            let mut next_slot = 0;
            let mut latest_ticket = None;

            for (payload, required) in BUFFERS {
                let ticket = submit(&pipe, &pool, &payload, required, next_slot).unwrap();

                next_slot += required;
                latest_ticket = Some(ticket);
            }

            assert!(latest_ticket.is_some());

            if let Some(ticket) = latest_ticket {
                let ticket_epoch = ticket.epoch();
                let durable_epoch = futures::executor::block_on(ticket).unwrap();

                assert!(durable_epoch >= ticket_epoch);
            }

            let mut read_index = 0;
            for (payload, required) in BUFFERS {
                compare_with_readback(&payload, read_index, required, &pool, &file);
                read_index += required;
            }
        }

        #[test]
        fn ok_multiple_concurrent_awaits() {
            const REQUIRED: usize = 0x0A;
            const BUFFER: Page = [0x0Au8; PAGE_LEN];

            let (_dir, path) = scratch("write_single");
            let (_file, pool, pipe) = new_objects(path);

            let ticket1 = submit(&pipe, &pool, &BUFFER, REQUIRED, 0).unwrap();
            let ticket2 = submit(&pipe, &pool, &BUFFER, REQUIRED, 0).unwrap();

            let (e1, e2) = futures::executor::block_on(async { futures::join!(ticket1, ticket2) });

            assert!(e1.is_ok());
            assert!(e2.is_ok());
            assert!(e2.unwrap() > e1.unwrap());
        }

        #[test]
        fn ok_awaiting_last_ticket_implies_previous_writes_are_durable() {
            const BUFFER_A: Page = [0xAA; PAGE_LEN];
            const BUFFER_B: Page = [0xBB; PAGE_LEN];
            const BUFFER_C: Page = [0xCC; PAGE_LEN];

            let (_dir, path) = scratch("durability_boundary");
            let (file, pool, pipe) = new_objects(path);

            let ticket_a = submit(&pipe, &pool, &BUFFER_A, 1, 0).unwrap();
            let ticket_b = submit(&pipe, &pool, &BUFFER_B, 1, 1).unwrap();
            let ticket_c = submit(&pipe, &pool, &BUFFER_C, 1, 2).unwrap();

            let epoch_a = ticket_a.epoch();
            let epoch_b = ticket_b.epoch();
            let epoch_c = ticket_c.epoch();

            let durable_epoch = futures::executor::block_on(ticket_c).unwrap();
            assert!(durable_epoch >= epoch_c);
            assert!(durable_epoch >= epoch_b);
            assert!(durable_epoch >= epoch_a);

            compare_with_readback(&BUFFER_A, 0, 1, &pool, &file);
            compare_with_readback(&BUFFER_B, 1, 1, &pool, &file);
            compare_with_readback(&BUFFER_C, 2, 1, &pool, &file);
        }
    }

    mod concurrency {
        use super::*;

        #[test]
        fn ok_multi_threaded_writers() {
            const THREADS: usize = 4;
            const WRITES_PER_THREAD: usize = 0x40;
            const TOTAL_WRITES: usize = THREADS * WRITES_PER_THREAD;
            const _: () = assert!(TOTAL_WRITES < INITIAL_BUFFERS);

            let (_dir, path) = scratch("multi_threaded_writers");
            let (_file, pool, pipe) = new_objects(path);

            let pipe = sync::Arc::new(pipe);
            let pool = sync::Arc::new(pool);

            // All the writers are spawned before any of them is joined
            let writers: Vec<_> = (0..THREADS)
                .map(|tid| {
                    let pipe = sync::Arc::clone(&pipe);
                    let pool = sync::Arc::clone(&pool);

                    thread::spawn(move || {
                        let mut tickets = Vec::with_capacity(WRITES_PER_THREAD);

                        for i in 0..WRITES_PER_THREAD {
                            let payload = vec![tid as u8; PAGE_LEN];
                            let slot_index = tid * WRITES_PER_THREAD + i;

                            tickets.push(submit(&pipe, &pool, &payload, 1, slot_index).unwrap());
                        }

                        tickets
                    })
                })
                .collect();

            let mut tickets = Vec::new();
            for writer in writers {
                tickets.extend(writer.join().unwrap());
            }
            assert_eq!(tickets.len(), TOTAL_WRITES);

            // Epochs must form the gapless sequence 1..=TOTAL_WRITES
            let mut epochs: Vec<u64> = tickets.iter().map(ack::AckTicket::epoch).collect();
            epochs.sort_unstable();

            let expected: Vec<u64> = (1u64..=epochs.len() as u64).collect();
            assert_eq!(expected, epochs);

            let latest_ticket = tickets.into_iter().max_by_key(ack::AckTicket::epoch).unwrap();
            let durable_epoch = futures::executor::block_on(latest_ticket).unwrap();
            assert_eq!(durable_epoch, TOTAL_WRITES as u64);
        }
    }

    mod parallel_listeners {
        use super::*;

        #[test]
        fn ok_many_parallel_waiters_same_durability_window() {
            const WAITERS: usize = 0x20;
            const BUFFER: Page = [0xAA; PAGE_LEN];

            let (_dir, path) = scratch("parallel_waiters");
            let (_file, pool, pipe) = new_objects(path);

            let tickets: Vec<ack::AckTicket> =
                (0..WAITERS).map(|i| submit(&pipe, &pool, &BUFFER, 1, i).unwrap()).collect();

            let waiters: Vec<_> = tickets
                .into_iter()
                .map(|ticket| {
                    thread::spawn(move || {
                        let epoch = ticket.epoch();
                        let durable_epoch = futures::executor::block_on(ticket).unwrap();

                        assert!(durable_epoch >= epoch);
                    })
                })
                .collect();

            for waiter in waiters {
                waiter.join().unwrap();
            }
        }

        #[test]
        fn ok_parallel_waiters_multiple_batches() {
            const THREADS: usize = 0x0A;
            const WRITES: usize = 0x10;

            let (_dir, path) = scratch("parallel_batches");
            let (_file, pool, pipe) = new_objects(path);

            let pipe = sync::Arc::new(pipe);
            let pool = sync::Arc::new(pool);

            let workers: Vec<_> = (0..THREADS)
                .map(|tid| {
                    let pipe = sync::Arc::clone(&pipe);
                    let pool = sync::Arc::clone(&pool);

                    thread::spawn(move || {
                        let payload = [tid as u8; PAGE_LEN];

                        for i in 0..WRITES {
                            let ticket =
                                submit(&pipe, &pool, &payload, 1, tid * WRITES + i).unwrap();

                            futures::executor::block_on(ticket).unwrap();
                        }
                    })
                })
                .collect();

            for worker in workers {
                worker.join().unwrap();
            }
        }
    }
}