Skip to main content

Module wpipe

Module wpipe 

Source
Expand description

A low latency asynchronous write pipeline for buffer based storage

§Design

By design, every write call is fire-and-forget, i.e. the call is immediately returned after pushing the bytes to be written into the MPSC queue.

The background thread pulls from MPSC queue and performs indivisual pwrite/v calls and a common hard sync right after. This provides durability for all the writes submitted within the same WritePipeCfg::flush_duration batching window.

§Benchmarks

Observed measurements for latency (both single and multi threaded),

Metric1 Thread (µs)4 Threads (µs)
P500.0910.275
P900.0920.458
P990.8250.917
Mean1.1853.857

Environment used for benching,

  • OS: NixOS (WSL2)
  • Architecture: x86_64
  • Memory: 8 GiB RAM (DDR4)
  • Rust: rustc 1.86.0 w/ cargo 1.86.0
  • Kernel: Linux 6.6.87.2-microsoft-standard-WSL2
  • CPU: Intel® Core™ i5-10300H @ 2.50GHz (4C / 8T)

§Example

use frozen_core::{bufpool, ffile, utils, wpipe};
use std::{ptr, sync, time};

const MODULE_ID: u8 = 0x00;
const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;

let dir = tempfile::tempdir().expect("tempdir creation should succeed");
let path = dir.path().join("wpipe_example");

let file_cfg = ffile::FrozenFileCfg {
    path,
    module_id: MODULE_ID,
    initial_available_buffers: 0x400,
    buffer_size: BUFFER_SIZE as usize,
};
let file = sync::Arc::new(
    ffile::FrozenFile::new(file_cfg)
        .expect("file creation should succeed"),
);

let pool_cfg = bufpool::BufPoolCfg {
    buffer_size: utils::BufferSize::S128,
    max_memory: 0x400 * BUFFER_SIZE as usize,
};
let pool = bufpool::BufPool::new(pool_cfg);

let pipe_cfg = wpipe::WritePipeCfg {
    module_id: MODULE_ID,
    flush_duration: time::Duration::from_millis(1),
};
let pipe = wpipe::WritePipe::new(pipe_cfg, file)
    .expect("pipe creation should succeed");

let payload = [0xAAu8; BUFFER_SIZE as usize];

let mut latest_ticket = None;
for slot_index in 0..3 {
    let allocation = pool.allocate(1);

    unsafe {
        ptr::copy_nonoverlapping(
            payload.as_ptr(),
            allocation.first(),
            payload.len(),
        );
    }

    let ticket = pipe
        .write(wpipe::WriteRequest {
            allocation,
            slot_index,
        })
        .expect("write should succeed");

    latest_ticket = Some(ticket);
}

let durable_epoch = futures::executor::block_on(
    latest_ticket.expect("ticket should exist"),
)
.expect("writes should become durable");

assert!(durable_epoch >= 3);

Structs§

WritePipe
A low latency asynchronous write pipeline for buffer based storage
WritePipeCfg
All the available configurations for WritePipe
WriteRequest
A write operation submitted to WritePipe