Skip to main content

FrozenPipe

Struct FrozenPipe 

Source
pub struct FrozenPipe { /* private fields */ }
Expand description

A high-throughput asynchronous IO pipeline for chunk-based storage. It batches write requests and flushes them in the background, while providing durability guarantees via epochs.

§Example

use frozen_core::fpipe::FrozenPipe;
use frozen_core::ffile::{FrozenFile, FFCfg};
use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
use std::time::Duration;

let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("tmp_pipe_write");

let file = FrozenFile::new(FFCfg {
    mid: 0,
    path,
    chunk_size: 0x20,
    initial_chunk_amount: 2,
}).unwrap();

let pool = BufPool::new(BPCfg {
    mid: 0,
    chunk_size: 0x20,
    backend: BPBackend::Dynamic,
});

let pipe = FrozenPipe::new(file, pool, Duration::from_micros(0x0A)).unwrap();

let buf = vec![0x3Bu8; 0x40];
let epoch = pipe.write(&buf, 0).unwrap();

pipe.wait_for_durability(epoch).unwrap();

let read = pipe.read(0, 2).unwrap();
assert_eq!(read, buf);

Implementations§

Source§

impl FrozenPipe

Source

pub fn new( file: FrozenFile, pool: BufPool, flush_duration: Duration, ) -> FrozenRes<Self>

Create a new instance of FrozenPipe

Source

pub fn write(&self, buf: &[u8], index: usize) -> FrozenRes<u64>

Submit a write request

Returns the epoch representing the durability window of the write

§Working

The buffer is split into chunk_size-sized segments and staged using [BufPool] before being written by the background flusher

§Example
use frozen_core::fpipe::FrozenPipe;
use frozen_core::ffile::{FrozenFile, FFCfg};
use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
use std::time::Duration;

let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("tmp_pipe_write");

let file = FrozenFile::new(FFCfg {
    mid: 0,
    path,
    chunk_size: 0x20,
    initial_chunk_amount: 2,
}).unwrap();

let pool = BufPool::new(BPCfg {
    mid: 0,
    chunk_size: 0x20,
    backend: BPBackend::Dynamic,
});

let pipe = FrozenPipe::new(file, pool, Duration::from_micros(0x0A)).unwrap();

let buf = vec![0x3Bu8; 0x40];
let epoch = pipe.write(&buf, 0).unwrap();

pipe.wait_for_durability(epoch).unwrap();

let read = pipe.read(0, 2).unwrap();
assert_eq!(read, buf);
Source

pub fn read_single(&self, index: usize) -> FrozenRes<Vec<u8>>

Read a single chunk from the given index

This function performs a blocking read operation

§Example
use frozen_core::fpipe::FrozenPipe;
use frozen_core::ffile::{FrozenFile, FFCfg};
use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
use std::time::Duration;

let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("tmp_read_single");

let file = FrozenFile::new(FFCfg {
    path,
    mid: 0,
    chunk_size: 0x20,
    initial_chunk_amount: 2,
}).unwrap();

let pool = BufPool::new(BPCfg {
    mid: 0,
    chunk_size: 0x20,
    backend: BPBackend::Dynamic,
});

let pipe = FrozenPipe::new(file, pool, Duration::from_micros(10)).unwrap();

let data = vec![0xAAu8; 0x20];
let epoch = pipe.write(&data, 0).unwrap();
pipe.wait_for_durability(epoch).unwrap();

let read = pipe.read_single(0).unwrap();
assert_eq!(read, data);
Source

pub fn read(&self, index: usize, count: usize) -> FrozenRes<Vec<u8>>

Read count chunks starting at the given index

This function performs a blocking read operation

§Example
use frozen_core::fpipe::FrozenPipe;
use frozen_core::ffile::{FrozenFile, FFCfg};
use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
use std::time::Duration;

let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("tmp_read_multi");

let file = FrozenFile::new(FFCfg {
    path,
    mid: 0,
    chunk_size: 0x20,
    initial_chunk_amount: 8,
}).unwrap();

let pool = BufPool::new(BPCfg {
    mid: 0,
    chunk_size: 0x20,
    backend: BPBackend::Dynamic,
});

let pipe = FrozenPipe::new(file, pool, Duration::from_micros(10)).unwrap();

let buf = vec![0xBBu8; 0x20 * 2];
let epoch = pipe.write(&buf, 0).unwrap();
pipe.wait_for_durability(epoch).unwrap();

let read = pipe.read(0, 2).unwrap();
assert_eq!(read, buf);
Source

pub fn wait_for_durability(&self, epoch: u64) -> FrozenRes<()>

Blocks until the given epoch becomes durable

Durability epochs increase when the background flusher successfully syncs the underlying [FrozenFile]

§Example
use frozen_core::fpipe::FrozenPipe;
use frozen_core::ffile::{FrozenFile, FFCfg};
use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
use std::time::Duration;

let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("tmp_wait");

let file = FrozenFile::new(FFCfg {
    mid: 0,
    path,
    chunk_size: 0x20,
    initial_chunk_amount: 2,
}).unwrap();

let pool = BufPool::new(BPCfg {
    mid: 0,
    chunk_size: 0x20,
    backend: BPBackend::Dynamic,
});

let pipe = FrozenPipe::new(file, pool, Duration::from_micros(10)).unwrap();

let buf = vec![1u8; 0x20];
let epoch = pipe.write(&buf, 0).unwrap();

pipe.wait_for_durability(epoch).unwrap();
Source

pub fn force_durability(&self, epoch: u64) -> FrozenRes<()>

Force instant durability for the current batch

This wakes the flusher thread and waits until the given epoch becomes durable

§Example
use frozen_core::fpipe::FrozenPipe;
use frozen_core::ffile::{FrozenFile, FFCfg};
use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
use std::time::Duration;

let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("tmp_force");

let file = FrozenFile::new(FFCfg {
    mid: 0,
    path,
    chunk_size: 0x20,
    initial_chunk_amount: 2,
}).unwrap();

let pool = BufPool::new(BPCfg {
    mid: 0,
    chunk_size: 0x20,
    backend: BPBackend::Dynamic,
});

let pipe = FrozenPipe::new(file, pool, Duration::from_micros(10)).unwrap();

let buf = vec![0x0Au8; 0x20];
let epoch = pipe.write(&buf, 0).unwrap();

pipe.force_durability(epoch).unwrap();
Source

pub fn grow(&self, count: usize) -> FrozenRes<()>

Grow the underlying [FrozenFile] by the given count

The pipeline waits until all pending writes are flushed before extending the file

§Example
use frozen_core::fpipe::FrozenPipe;
use frozen_core::ffile::{FrozenFile, FFCfg};
use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
use std::time::Duration;

let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("tmp_grow");

let file = FrozenFile::new(FFCfg {
    mid: 0,
    path,
    chunk_size: 0x20,
    initial_chunk_amount: 2,
}).unwrap();

let pool = BufPool::new(BPCfg {
    mid: 0,
    chunk_size: 0x20,
    backend: BPBackend::Dynamic,
});

let pipe = FrozenPipe::new(file, pool, Duration::from_micros(10)).unwrap();
pipe.grow(4).unwrap();

Trait Implementations§

Source§

impl Debug for FrozenPipe

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl Drop for FrozenPipe

Source§

fn drop(&mut self)

Executes the destructor for this type. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.