ShardedLog

Struct ShardedLog 

Source
pub struct ShardedLog { /* private fields */ }
Expand description

Allows batches of bytes to be written with low contention, and read-back in a linearized order.

Implementations§

Source§

impl ShardedLog

Source

pub fn write_batch<B: AsRef<[u8]>>(&self, write_batch: &[B]) -> Result<u64>

Write a batch of buffers to the sharded log. Returns the logical sequence number that they will be recoverable at after your next call to flush. Writes the batch into a BufWriter in front of the sharded log file, which will be flushed on Drop, but can also be flushed using the flush method.

Examples found in repository?
examples/recovery.rs (line 14)
3fn main() {
4    let config = Config {
5        path: "path/to/logs".into(),
6        ..Default::default()
7    };
8
9    config.purge().unwrap();
10
11    let sharded_log = config.create().unwrap();
12
13    sharded_log
14        .write_batch(&[b"a", b"b", b"c", b"d"])
15        .unwrap();
16
17    sharded_log.flush().unwrap();
18
19    for write_batch in config.recover().unwrap() {
20        println!("got batch: {:?}", write_batch);
21        assert_eq!(
22            write_batch,
23            vec![b"a", b"b", b"c", b"d"]
24        );
25    }
26
27    let _ = std::fs::remove_dir_all("path");
28}
More examples
Hide additional examples
examples/write_performance.rs (line 44)
4fn main() {
5    let number_of_threads = 16;
6    let buffers_per_batch = 8;
7    let buffer_size = 512;
8    let batches_per_thread = 128 * 1024;
9
10    let config = Config {
11        path: "write_performance".into(),
12        ..Default::default()
13    };
14
15    config.purge().unwrap();
16
17    let log = Arc::new(config.create().unwrap());
18
19    let barrier =
20        Arc::new(Barrier::new(number_of_threads + 1));
21
22    let mut threads = vec![];
23
24    let before = std::time::Instant::now();
25
26    for _ in 0..number_of_threads {
27        let barrier = barrier.clone();
28        let log = log.clone();
29
30        let thread = std::thread::spawn(move || {
31            barrier.wait();
32
33            for _ in 0..batches_per_thread {
34                let mut batch =
35                    Vec::with_capacity(buffers_per_batch);
36                for _ in 0..buffers_per_batch {
37                    let mut buffer =
38                        Vec::with_capacity(buffer_size);
39                    unsafe {
40                        buffer.set_len(buffer_size);
41                    }
42                    batch.push(buffer);
43                }
44                log.write_batch(&batch).unwrap();
45            }
46        });
47
48        threads.push(thread);
49    }
50
51    barrier.wait();
52
53    for thread in threads.into_iter() {
54        thread.join().unwrap();
55    }
56
57    log.flush().unwrap();
58
59    let bytes_written = number_of_threads
60        * batches_per_thread
61        * buffers_per_batch
62        * buffer_size;
63    let throughput = bytes_written as u128
64        / before.elapsed().as_micros();
65
66    println!("wrote {} megabytes per second, {} threads, {} batches per thread, {} buffers per batch, {} bytes per buffer",
67        throughput,
68        number_of_threads,
69        batches_per_thread,
70        buffers_per_batch,
71        buffer_size
72    );
73
74    let _ = std::fs::remove_dir_all("write_performance");
75}
Source

pub fn reservation(&self) -> Reservation<'_>

Reserve a log slot at a particular index, which can then be completed using Reservation::write_batch or aborted using Reservation::abort. This is particularly useful for logging fallible lock-free operations to disk in an order-preserving manner.

Source

pub fn flush(&self) -> Result<()>

Flushes and fsyncs in-memory shard data that have been written to directly from this instance of ShardedLog, but skipping shards that have been written to by others. To flush all shards, use flush_all.

Examples found in repository?
examples/recovery.rs (line 17)
3fn main() {
4    let config = Config {
5        path: "path/to/logs".into(),
6        ..Default::default()
7    };
8
9    config.purge().unwrap();
10
11    let sharded_log = config.create().unwrap();
12
13    sharded_log
14        .write_batch(&[b"a", b"b", b"c", b"d"])
15        .unwrap();
16
17    sharded_log.flush().unwrap();
18
19    for write_batch in config.recover().unwrap() {
20        println!("got batch: {:?}", write_batch);
21        assert_eq!(
22            write_batch,
23            vec![b"a", b"b", b"c", b"d"]
24        );
25    }
26
27    let _ = std::fs::remove_dir_all("path");
28}
More examples
Hide additional examples
examples/write_performance.rs (line 57)
4fn main() {
5    let number_of_threads = 16;
6    let buffers_per_batch = 8;
7    let buffer_size = 512;
8    let batches_per_thread = 128 * 1024;
9
10    let config = Config {
11        path: "write_performance".into(),
12        ..Default::default()
13    };
14
15    config.purge().unwrap();
16
17    let log = Arc::new(config.create().unwrap());
18
19    let barrier =
20        Arc::new(Barrier::new(number_of_threads + 1));
21
22    let mut threads = vec![];
23
24    let before = std::time::Instant::now();
25
26    for _ in 0..number_of_threads {
27        let barrier = barrier.clone();
28        let log = log.clone();
29
30        let thread = std::thread::spawn(move || {
31            barrier.wait();
32
33            for _ in 0..batches_per_thread {
34                let mut batch =
35                    Vec::with_capacity(buffers_per_batch);
36                for _ in 0..buffers_per_batch {
37                    let mut buffer =
38                        Vec::with_capacity(buffer_size);
39                    unsafe {
40                        buffer.set_len(buffer_size);
41                    }
42                    batch.push(buffer);
43                }
44                log.write_batch(&batch).unwrap();
45            }
46        });
47
48        threads.push(thread);
49    }
50
51    barrier.wait();
52
53    for thread in threads.into_iter() {
54        thread.join().unwrap();
55    }
56
57    log.flush().unwrap();
58
59    let bytes_written = number_of_threads
60        * batches_per_thread
61        * buffers_per_batch
62        * buffer_size;
63    let throughput = bytes_written as u128
64        / before.elapsed().as_micros();
65
66    println!("wrote {} megabytes per second, {} threads, {} batches per thread, {} buffers per batch, {} bytes per buffer",
67        throughput,
68        number_of_threads,
69        batches_per_thread,
70        buffers_per_batch,
71        buffer_size
72    );
73
74    let _ = std::fs::remove_dir_all("write_performance");
75}
Source

pub fn purge(&self) -> Result<()>

Delete all logs in the system

Trait Implementations§

Source§

impl Clone for ShardedLog

Source§

fn clone(&self) -> ShardedLog

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.