Skip to main content

WritePipe

Struct WritePipe 

Source
pub struct WritePipe { /* private fields */ }
Expand description

A low-latency asynchronous write pipeline for buffer-based storage

§Design

By design, every write call is fire-and-forget, i.e. the call returns immediately after pushing the bytes to be written into the MPSC queue.

The background thread pulls from the MPSC queue and performs individual pwrite/pwritev calls, followed by a single common hard sync. This provides durability for all the writes submitted within the same WritePipeCfg::flush_duration batching window.

§Example

use frozen_core::{bufpool, ffile, utils, wpipe};
use std::{ptr, sync, time};

const MODULE_ID: u8 = 0x00;
const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;

let dir = tempfile::tempdir().expect("tempdir creation should succeed");
let path = dir.path().join("wpipe_example");

let file_cfg = ffile::FrozenFileCfg {
    path,
    module_id: MODULE_ID,
    initial_available_buffers: 0x400,
    buffer_size: BUFFER_SIZE as usize,
};
let file = sync::Arc::new(
    ffile::FrozenFile::new(file_cfg)
        .expect("file creation should succeed"),
);

let pool_cfg = bufpool::BufPoolCfg {
    buffer_size: utils::BufferSize::S128,
    max_memory: 0x400 * BUFFER_SIZE as usize,
};
let pool = bufpool::BufPool::new(pool_cfg);

let pipe_cfg = wpipe::WritePipeCfg {
    module_id: MODULE_ID,
    flush_duration: time::Duration::from_millis(1),
};
let pipe = wpipe::WritePipe::new(pipe_cfg, file)
    .expect("pipe creation should succeed");

let payload = [0xAAu8; BUFFER_SIZE as usize];

let mut latest_ticket = None;
for slot_index in 0..3 {
    let allocation = pool.allocate(1);

    unsafe {
        ptr::copy_nonoverlapping(
            payload.as_ptr(),
            allocation.first(),
            payload.len(),
        );
    }

    let ticket = pipe
        .write(wpipe::WriteRequest {
            allocation,
            slot_index,
        })
        .expect("write should succeed");

    latest_ticket = Some(ticket);
}

let durable_epoch = futures::executor::block_on(
    latest_ticket.expect("ticket should exist"),
)
.expect("writes should become durable");

assert!(durable_epoch >= 3);

Implementations§

Source§

impl WritePipe

Source

pub fn new(cfg: WritePipeCfg, file: Arc<FrozenFile>) -> FrozenResult<Self>

Create a new instance of WritePipe

§Example
use frozen_core::{bufpool, ffile, utils, wpipe};
use std::{ptr, sync, time};

const MODULE_ID: u8 = 0x00;
const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;

let dir = tempfile::tempdir().expect("tempdir creation should succeed");
let path = dir.path().join("wpipe_example");

let file_cfg = ffile::FrozenFileCfg {
    path,
    module_id: MODULE_ID,
    initial_available_buffers: 0x400,
    buffer_size: BUFFER_SIZE as usize,
};
let file = sync::Arc::new(
    ffile::FrozenFile::new(file_cfg)
        .expect("file creation should succeed"),
);

let pool_cfg = bufpool::BufPoolCfg {
    buffer_size: utils::BufferSize::S128,
    max_memory: 0x400 * BUFFER_SIZE as usize,
};
let pool = bufpool::BufPool::new(pool_cfg);

let pipe_cfg = wpipe::WritePipeCfg {
    module_id: MODULE_ID,
    flush_duration: time::Duration::from_millis(1),
};
let pipe = wpipe::WritePipe::new(pipe_cfg, file)
    .expect("pipe creation should succeed");

let payload = vec![0x0A; BUFFER_SIZE as usize];
let allocation = pool.allocate(1);

unsafe {ptr::copy_nonoverlapping(
    payload.as_ptr(),
    allocation.first(),
    payload.len()
)};

let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0});

assert!(
    futures::executor::block_on(ticket.expect("ticket should exist"))
    .is_ok()
);
Source

pub fn write(&self, request: WriteRequest) -> FrozenResult<AckTicket>

Push a write into WritePipe

Every write call is fire-and-forget for the caller by default, unless the caller chooses to wait for durability by manually awaiting the returned AckTicket.

§Example
use frozen_core::{bufpool, ffile, utils, wpipe};
use std::{ptr, sync, time};

const MODULE_ID: u8 = 0x00;
const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;

let dir = tempfile::tempdir().expect("tempdir creation should succeed");
let path = dir.path().join("wpipe_example");

let file_cfg = ffile::FrozenFileCfg {
    path,
    module_id: MODULE_ID,
    initial_available_buffers: 0x400,
    buffer_size: BUFFER_SIZE as usize,
};
let file = sync::Arc::new(
    ffile::FrozenFile::new(file_cfg)
        .expect("file creation should succeed"),
);

let pool_cfg = bufpool::BufPoolCfg {
    buffer_size: utils::BufferSize::S128,
    max_memory: 0x400 * BUFFER_SIZE as usize,
};
let pool = bufpool::BufPool::new(pool_cfg);

let pipe_cfg = wpipe::WritePipeCfg {
    module_id: MODULE_ID,
    flush_duration: time::Duration::from_millis(1),
};
let pipe = wpipe::WritePipe::new(pipe_cfg, file)
    .expect("pipe creation should succeed");

let payload = vec![0x0A; BUFFER_SIZE as usize];
let allocation = pool.allocate(1);

unsafe {ptr::copy_nonoverlapping(
    payload.as_ptr(),
    allocation.first(),
    payload.len()
)};

let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0});

assert!(
    futures::executor::block_on(ticket.expect("ticket should exist"))
    .is_ok()
);

Trait Implementations§

Source§

impl Debug for WritePipe

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl Drop for WritePipe

Source§

fn drop(&mut self)

Executes the destructor for this type. Read more
Source§

fn pin_drop(self: Pin<&mut Self>)

🔬This is a nightly-only experimental API. (pin_ergonomics)
Execute the destructor for this type, but different to Drop::drop, it requires self to be pinned. Read more
Source§

impl Send for WritePipe

Source§

impl Sync for WritePipe

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.