pub struct FrozenPipe { /* private fields */ }Expand description
An high throughput asynchronous IO pipeline for chunk based storage, it uses batches to write requests and flushes them in the background, while providing durability guarantees via epochs
§Example
use frozen_core::fpipe::FrozenPipe;
use frozen_core::ffile::{FrozenFile, FFCfg};
use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
use std::time::Duration;
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("tmp_pipe_write");
let file = FrozenFile::new(FFCfg {
mid: 0,
path,
chunk_size: 0x20,
initial_chunk_amount: 2,
}).unwrap();
let pool = BufPool::new(BPCfg {
mid: 0,
chunk_size: 0x20,
backend: BPBackend::Dynamic,
});
let pipe = FrozenPipe::new(file, pool, Duration::from_micros(0x0A)).unwrap();
let buf = vec![0x3Bu8; 0x40];
let epoch = pipe.write(&buf, 0).unwrap();
pipe.wait_for_durability(epoch).unwrap();
let read = pipe.read(0, 2).unwrap();
assert_eq!(read, buf);Implementations§
Source§impl FrozenPipe
impl FrozenPipe
Sourcepub fn new(
file: FrozenFile,
pool: BufPool,
flush_duration: Duration,
) -> FrozenRes<Self>
pub fn new( file: FrozenFile, pool: BufPool, flush_duration: Duration, ) -> FrozenRes<Self>
Create a new instance of FrozenPipe
Sourcepub fn write(&self, buf: &[u8], index: usize) -> FrozenRes<u64>
pub fn write(&self, buf: &[u8], index: usize) -> FrozenRes<u64>
Submit a write request
Returns the epoch representing the durability window of the write
§Working
The buffer is split into chunk_size sized segments and staged using [BufPool] before being
written by the background flusher
§Example
use frozen_core::fpipe::FrozenPipe;
use frozen_core::ffile::{FrozenFile, FFCfg};
use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
use std::time::Duration;
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("tmp_pipe_write");
let file = FrozenFile::new(FFCfg {
mid: 0,
path,
chunk_size: 0x20,
initial_chunk_amount: 2,
}).unwrap();
let pool = BufPool::new(BPCfg {
mid: 0,
chunk_size: 0x20,
backend: BPBackend::Dynamic,
});
let pipe = FrozenPipe::new(file, pool, Duration::from_micros(0x0A)).unwrap();
let buf = vec![0x3Bu8; 0x40];
let epoch = pipe.write(&buf, 0).unwrap();
pipe.wait_for_durability(epoch).unwrap();
let read = pipe.read(0, 2).unwrap();
assert_eq!(read, buf);Sourcepub fn read_single(&self, index: usize) -> FrozenRes<Vec<u8>>
pub fn read_single(&self, index: usize) -> FrozenRes<Vec<u8>>
Read a single chunk from the given index
This function performs a blocking read operation
§Example
use frozen_core::fpipe::FrozenPipe;
use frozen_core::ffile::{FrozenFile, FFCfg};
use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
use std::time::Duration;
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("tmp_read_single");
let file = FrozenFile::new(FFCfg {
path,
mid: 0,
chunk_size: 0x20,
initial_chunk_amount: 2,
}).unwrap();
let pool = BufPool::new(BPCfg {
mid: 0,
chunk_size: 0x20,
backend: BPBackend::Dynamic,
});
let pipe = FrozenPipe::new(file, pool, Duration::from_micros(10)).unwrap();
let data = vec![0xAAu8; 0x20];
let epoch = pipe.write(&data, 0).unwrap();
pipe.wait_for_durability(epoch).unwrap();
let read = pipe.read_single(0).unwrap();
assert_eq!(read, data);Sourcepub fn read(&self, index: usize, count: usize) -> FrozenRes<Vec<u8>>
pub fn read(&self, index: usize, count: usize) -> FrozenRes<Vec<u8>>
Read count chunks starting from at the given index
This function performs a blocking read operation
§Example
use frozen_core::fpipe::FrozenPipe;
use frozen_core::ffile::{FrozenFile, FFCfg};
use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
use std::time::Duration;
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("tmp_read_multi");
let file = FrozenFile::new(FFCfg {
path,
mid: 0,
chunk_size: 0x20,
initial_chunk_amount: 8,
}).unwrap();
let pool = BufPool::new(BPCfg {
mid: 0,
chunk_size: 0x20,
backend: BPBackend::Dynamic,
});
let pipe = FrozenPipe::new(file, pool, Duration::from_micros(10)).unwrap();
let buf = vec![0xBBu8; 0x20 * 2];
let epoch = pipe.write(&buf, 0).unwrap();
pipe.wait_for_durability(epoch).unwrap();
let read = pipe.read(0, 2).unwrap();
assert_eq!(read, buf);Sourcepub fn wait_for_durability(&self, epoch: u64) -> FrozenRes<()>
pub fn wait_for_durability(&self, epoch: u64) -> FrozenRes<()>
Blocks until given epoch becomes durable
Durability epochs increase when the background flusher successfully syncs the underlying [FrozenFile]
§Example
use frozen_core::fpipe::FrozenPipe;
use frozen_core::ffile::{FrozenFile, FFCfg};
use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
use std::time::Duration;
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("tmp_wait");
let file = FrozenFile::new(FFCfg {
mid: 0,
path,
chunk_size: 0x20,
initial_chunk_amount: 2,
}).unwrap();
let pool = BufPool::new(BPCfg {
mid: 0,
chunk_size: 0x20,
backend: BPBackend::Dynamic,
});
let pipe = FrozenPipe::new(file, pool, Duration::from_micros(10)).unwrap();
let buf = vec![1u8; 0x20];
let epoch = pipe.write(&buf, 0).unwrap();
pipe.wait_for_durability(epoch).unwrap();Sourcepub fn force_durability(&self, epoch: u64) -> FrozenRes<()>
pub fn force_durability(&self, epoch: u64) -> FrozenRes<()>
Force instant durability for the current batch
This wakes the flusher thread and waits for the durability epoch
§Example
use frozen_core::fpipe::FrozenPipe;
use frozen_core::ffile::{FrozenFile, FFCfg};
use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
use std::time::Duration;
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("tmp_force");
let file = FrozenFile::new(FFCfg {
mid: 0,
path,
chunk_size: 0x20,
initial_chunk_amount: 2,
}).unwrap();
let pool = BufPool::new(BPCfg {
mid: 0,
chunk_size: 0x20,
backend: BPBackend::Dynamic,
});
let pipe = FrozenPipe::new(file, pool, Duration::from_micros(10)).unwrap();
let buf = vec![0x0Au8; 0x20];
let epoch = pipe.write(&buf, 0).unwrap();
pipe.force_durability(epoch).unwrap();Sourcepub fn grow(&self, count: usize) -> FrozenRes<()>
pub fn grow(&self, count: usize) -> FrozenRes<()>
Grow the underlying [FrozenFile] by given count
The pipeline waits until all pending writes are flushed before extending the file
§Example
use frozen_core::fpipe::FrozenPipe;
use frozen_core::ffile::{FrozenFile, FFCfg};
use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
use std::time::Duration;
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("tmp_grow");
let file = FrozenFile::new(FFCfg {
mid: 0,
path,
chunk_size: 0x20,
initial_chunk_amount: 2,
}).unwrap();
let pool = BufPool::new(BPCfg {
mid: 0,
chunk_size: 0x20,
backend: BPBackend::Dynamic,
});
let pipe = FrozenPipe::new(file, pool, Duration::from_micros(10)).unwrap();
pipe.grow(4).unwrap();