pub struct WritePipe { /* private fields */ }Expand description
A low latency asynchronous write pipeline for buffer based storage
§Design
By design, every write call is fire-and-forget, i.e. the call is immediately returned after pushing the bytes to be written into the MPSC queue.
The background thread pulls from MPSC queue and performs indivisual pwrite/v calls and a common
hard sync right after. This provides durability for all the writes submitted within the same
WritePipeCfg::flush_duration batching window.
§Example
use frozen_core::{bufpool, ffile, utils, wpipe};
use std::{ptr, sync, time};
const MODULE_ID: u8 = 0x00;
const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
let dir = tempfile::tempdir().expect("tempdir creation should succeed");
let path = dir.path().join("wpipe_example");
let file_cfg = ffile::FrozenFileCfg {
path,
module_id: MODULE_ID,
initial_available_buffers: 0x400,
buffer_size: BUFFER_SIZE as usize,
};
let file = sync::Arc::new(
ffile::FrozenFile::new(file_cfg)
.expect("file creation should succeed"),
);
let pool_cfg = bufpool::BufPoolCfg {
buffer_size: utils::BufferSize::S128,
max_memory: 0x400 * BUFFER_SIZE as usize,
};
let pool = bufpool::BufPool::new(pool_cfg);
let pipe_cfg = wpipe::WritePipeCfg {
module_id: MODULE_ID,
flush_duration: time::Duration::from_millis(1),
};
let pipe = wpipe::WritePipe::new(pipe_cfg, file)
.expect("pipe creation should succeed");
let payload = [0xAAu8; BUFFER_SIZE as usize];
let mut latest_ticket = None;
for slot_index in 0..3 {
let allocation = pool.allocate(1);
unsafe {
ptr::copy_nonoverlapping(
payload.as_ptr(),
allocation.first(),
payload.len(),
);
}
let ticket = pipe
.write(wpipe::WriteRequest {
allocation,
slot_index,
})
.expect("write should succeed");
latest_ticket = Some(ticket);
}
let durable_epoch = futures::executor::block_on(
latest_ticket.expect("ticket should exist"),
)
.expect("writes should become durable");
assert!(durable_epoch >= 3);Implementations§
Source§impl WritePipe
impl WritePipe
Sourcepub fn new(cfg: WritePipeCfg, file: Arc<FrozenFile>) -> FrozenResult<Self>
pub fn new(cfg: WritePipeCfg, file: Arc<FrozenFile>) -> FrozenResult<Self>
Create a new instance of WritePipe
§Example
use frozen_core::{bufpool, ffile, utils, wpipe};
use std::{ptr, sync, time};
const MODULE_ID: u8 = 0x00;
const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
let dir = tempfile::tempdir().expect("tempdir creation should succeed");
let path = dir.path().join("wpipe_example");
let file_cfg = ffile::FrozenFileCfg {
path,
module_id: MODULE_ID,
initial_available_buffers: 0x400,
buffer_size: BUFFER_SIZE as usize,
};
let file = sync::Arc::new(
ffile::FrozenFile::new(file_cfg)
.expect("file creation should succeed"),
);
let pool_cfg = bufpool::BufPoolCfg {
buffer_size: utils::BufferSize::S128,
max_memory: 0x400 * BUFFER_SIZE as usize,
};
let pool = bufpool::BufPool::new(pool_cfg);
let pipe_cfg = wpipe::WritePipeCfg {
module_id: MODULE_ID,
flush_duration: time::Duration::from_millis(1),
};
let pipe = wpipe::WritePipe::new(pipe_cfg, file)
.expect("pipe creation should succeed");
let payload = vec![0x0A; BUFFER_SIZE as usize];
let allocation = pool.allocate(1);
unsafe {ptr::copy_nonoverlapping(
payload.as_ptr(),
allocation.first(),
payload.len()
)};
let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0});
assert!(
futures::executor::block_on(ticket.expect("ticket should exist"))
.is_ok()
);Sourcepub fn write(&self, request: WriteRequest) -> FrozenResult<AckTicket>
pub fn write(&self, request: WriteRequest) -> FrozenResult<AckTicket>
Push a write into WritePipe
Every write call is fire-and-forget for the caller by default, unless the caller choose to
wait for durability using the manual await on [WriteTicket].
§Example
use frozen_core::{bufpool, ffile, utils, wpipe};
use std::{ptr, sync, time};
const MODULE_ID: u8 = 0x00;
const BUFFER_SIZE: utils::BufferSize = utils::BufferSize::S128;
let dir = tempfile::tempdir().expect("tempdir creation should succeed");
let path = dir.path().join("wpipe_example");
let file_cfg = ffile::FrozenFileCfg {
path,
module_id: MODULE_ID,
initial_available_buffers: 0x400,
buffer_size: BUFFER_SIZE as usize,
};
let file = sync::Arc::new(
ffile::FrozenFile::new(file_cfg)
.expect("file creation should succeed"),
);
let pool_cfg = bufpool::BufPoolCfg {
buffer_size: utils::BufferSize::S128,
max_memory: 0x400 * BUFFER_SIZE as usize,
};
let pool = bufpool::BufPool::new(pool_cfg);
let pipe_cfg = wpipe::WritePipeCfg {
module_id: MODULE_ID,
flush_duration: time::Duration::from_millis(1),
};
let pipe = wpipe::WritePipe::new(pipe_cfg, file)
.expect("pipe creation should succeed");
let payload = vec![0x0A; BUFFER_SIZE as usize];
let allocation = pool.allocate(1);
unsafe {ptr::copy_nonoverlapping(
payload.as_ptr(),
allocation.first(),
payload.len()
)};
let ticket = pipe.write(wpipe::WriteRequest {allocation, slot_index: 0});
assert!(
futures::executor::block_on(ticket.expect("ticket should exist"))
.is_ok()
);Trait Implementations§
impl Send for WritePipe
impl Sync for WritePipe
Auto Trait Implementations§
impl !RefUnwindSafe for WritePipe
impl !UnwindSafe for WritePipe
impl Freeze for WritePipe
impl Unpin for WritePipe
impl UnsafeUnpin for WritePipe
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more