A low-latency asynchronous write pipeline for buffer-based storage
§Design
By design, every write call is fire-and-forget: it returns immediately after pushing the bytes to be written onto the MPSC queue.
A background thread pulls from the MPSC queue and issues individual pwrite/pwritev calls, followed by a single
shared hard sync. This provides durability for all writes submitted within the same
WritePipeCfg::flush_duration batching window.
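The batching behaviour described above can be illustrated with a minimal, standard-library-only sketch. This is not the crate's implementation: `SketchPipe`, `SketchWrite`, and their methods are hypothetical names, the queue is `std::sync::mpsc`, the positional write is `FileExt::write_all_at` (Unix only), and the hard sync is assumed to be `File::sync_all`.

```rust
use std::fs::OpenOptions;
use std::os::unix::fs::FileExt; // positional writes (pwrite-style), Unix only
use std::path::Path;
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

/// Hypothetical request: bytes plus the file offset to write them at.
struct SketchWrite {
    offset: u64,
    bytes: Vec<u8>,
}

/// Hypothetical pipeline handle (an illustration, not `WritePipe`).
struct SketchPipe {
    tx: Option<Sender<SketchWrite>>,
    worker: Option<JoinHandle<std::io::Result<u64>>>,
}

impl SketchPipe {
    fn new(path: &Path, flush_duration: Duration) -> std::io::Result<Self> {
        let file = OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(path)?;
        let (tx, rx) = mpsc::channel::<SketchWrite>();

        // Background thread: one positional write per request,
        // one sync per batching window. Returns the number of syncs issued.
        let worker = thread::spawn(move || -> std::io::Result<u64> {
            let mut syncs = 0u64;
            loop {
                // Block until a batch starts; exit once every sender is gone.
                let first = match rx.recv() {
                    Ok(w) => w,
                    Err(_) => return Ok(syncs),
                };
                file.write_all_at(&first.bytes, first.offset)?;

                // Keep draining the queue until the window closes.
                let deadline = Instant::now() + flush_duration;
                let mut disconnected = false;
                loop {
                    let remaining = deadline.saturating_duration_since(Instant::now());
                    match rx.recv_timeout(remaining) {
                        Ok(w) => file.write_all_at(&w.bytes, w.offset)?,
                        Err(RecvTimeoutError::Timeout) => break,
                        Err(RecvTimeoutError::Disconnected) => {
                            disconnected = true;
                            break;
                        }
                    }
                }

                // A single sync covers every write in this window.
                file.sync_all()?;
                syncs += 1;
                if disconnected {
                    return Ok(syncs);
                }
            }
        });

        Ok(Self { tx: Some(tx), worker: Some(worker) })
    }

    /// Fire-and-forget: enqueue the request and return immediately.
    fn write(&self, offset: u64, bytes: Vec<u8>) {
        self.tx
            .as_ref()
            .expect("pipe should be open")
            .send(SketchWrite { offset, bytes })
            .expect("worker should be alive");
    }

    /// Closes the queue, waits for the worker, and returns the sync count.
    fn close(mut self) -> std::io::Result<u64> {
        drop(self.tx.take());
        self.worker
            .take()
            .expect("worker should exist")
            .join()
            .expect("worker should not panic")
    }
}
```

In this sketch the caller pays only for the channel send, while the cost of the sync is amortised over every request that arrives within one `flush_duration` window. Unlike the example further down, it returns no ticket, so a caller cannot wait for a specific write to become durable.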
§Benchmarks
Observed latency measurements (single-threaded and multi-threaded):
| Metric | 1 Thread (µs) | 4 Threads (µs) |
|---|---|---|
| P50 | 0.091 | 0.275 |
| P90 | 0.092 | 0.458 |
| P99 | 0.825 | 0.917 |
| Mean | 1.185 | 3.857 |
Environment used for benchmarking:
- OS: NixOS (WSL2)
- Architecture: x86_64
- Memory: 8 GiB RAM (DDR4)
- Rust: rustc 1.86.0 w/ cargo 1.86.0
- Kernel: Linux 6.6.87.2-microsoft-standard-WSL2
- CPU: Intel® Core™ i5-10300H @ 2.50GHz (4C / 8T)
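In both columns of the table above the mean is larger than P99, which can only happen when a small fraction of calls is far slower than the rest. The sketch below shows one way such figures could be collected and summarised. It is not the harness used for the numbers above: `measure_us`, `percentile`, and `mean` are hypothetical helpers, and the nearest-rank percentile definition is an assumption.

```rust
use std::time::Instant;

/// Nearest-rank percentile over an ascending-sorted slice, with `p` in 0..=100.
fn percentile(sorted: &[f64], p: f64) -> f64 {
    assert!(!sorted.is_empty(), "at least one sample is required");
    let rank = (p * sorted.len() as f64 / 100.0).ceil() as usize;
    sorted[rank.clamp(1, sorted.len()) - 1]
}

/// Arithmetic mean of the samples.
fn mean(samples: &[f64]) -> f64 {
    samples.iter().sum::<f64>() / samples.len() as f64
}

/// Times `op` once per iteration and returns the per-call latencies
/// in microseconds, sorted ascending.
fn measure_us<F: FnMut()>(iters: usize, mut op: F) -> Vec<f64> {
    let mut samples = Vec::with_capacity(iters);
    for _ in 0..iters {
        let start = Instant::now();
        op();
        samples.push(start.elapsed().as_secs_f64() * 1e6);
    }
    samples.sort_by(|a, b| a.total_cmp(b));
    samples
}
```

Passing the write call as `op` and then calling `percentile(&samples, 50.0)`, `percentile(&samples, 99.0)`, and `mean(&samples)` would yield the same kind of rows as the table. A single outlier is enough to pull the mean above P99, so the percentiles describe typical latency better than the mean does.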
§Example
use frozen_core::{bufpool, ffile, utils, wpipe};
use std::{ptr, sync, time};

const MODULE_ID: u8 = 0x00;
const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;

// Back the pipeline with a file inside a temporary directory.
let dir = tempfile::tempdir().expect("tempdir creation should succeed");
let path = dir.path().join("wpipe_example");

let file_cfg = ffile::FrozenFileCfg {
    path,
    module_id: MODULE_ID,
    initial_available_buffers: 0x400,
    buffer_size: BUFFER_SIZE as usize,
};
let file = sync::Arc::new(
    ffile::FrozenFile::new(file_cfg)
        .expect("file creation should succeed"),
);

// Buffer pool sized to hold 0x400 buffers of BUFFER_SIZE bytes each.
let pool_cfg = bufpool::BufPoolCfg {
    buffer_size: utils::BufferSize::S128,
    max_memory: 0x400 * BUFFER_SIZE as usize,
};
let pool = bufpool::BufPool::new(pool_cfg);

let pipe_cfg = wpipe::WritePipeCfg {
    module_id: MODULE_ID,
    flush_duration: time::Duration::from_millis(1),
};
let pipe = wpipe::WritePipe::new(pipe_cfg, file)
    .expect("pipe creation should succeed");

let payload = [0xAAu8; BUFFER_SIZE as usize];
let mut latest_ticket = None;

// Submit three writes, one per slot, keeping only the most recent ticket.
for slot_index in 0..3 {
    let allocation = pool.allocate(1);

    // Copy the payload into the buffer obtained from the pool.
    unsafe {
        ptr::copy_nonoverlapping(
            payload.as_ptr(),
            allocation.first(),
            payload.len(),
        );
    }

    let ticket = pipe
        .write(wpipe::WriteRequest {
            allocation,
            slot_index,
        })
        .expect("write should succeed");
    latest_ticket = Some(ticket);
}

// Block on the latest ticket until the submitted writes are durable.
let durable_epoch = futures::executor::block_on(
    latest_ticket.expect("ticket should exist"),
)
.expect("writes should become durable");
assert!(durable_epoch >= 3);
§Structs
- WritePipe - A low-latency asynchronous write pipeline for buffer-based storage
- WritePipeCfg - All the available configurations for WritePipe
- WriteRequest - A write operation submitted to WritePipe