Config

Struct Config 

Source
pub struct Config {
    pub path: PathBuf,
    pub shards: u8,
    pub in_memory_buffer_per_log: usize,
}

Fields§

§path: PathBuf

Where to open the log.

§shards: u8

Number of sharded log files. Future calls to recover must always use at least this many shards, otherwise the system will not be able to recover. Defaults to 8.

§in_memory_buffer_per_log: usize

The in-memory buffer size in front of each log file. Defaults to 512k.

Implementations§

Source§

impl Config

Source

pub fn recover(&self) -> Result<RecoveryIterator>

Iterate over all log shards, returning batches in the order that they were written in, even if they landed in different shards. This method does not take out an exclusive file lock on the shards in the way that Config::purge and Config::create do, so it can be used on logs that are actively being written. HOWEVER: keep in mind that while writing logs, significant data may remain in the in-memory BufWriter instances until you call ShardedLog::flush!

Examples found in repository?
examples/recovery.rs (line 19)
3fn main() {
4    let config = Config {
5        path: "path/to/logs".into(),
6        ..Default::default()
7    };
8
9    config.purge().unwrap();
10
11    let sharded_log = config.create().unwrap();
12
13    sharded_log
14        .write_batch(&[b"a", b"b", b"c", b"d"])
15        .unwrap();
16
17    sharded_log.flush().unwrap();
18
19    for write_batch in config.recover().unwrap() {
20        println!("got batch: {:?}", write_batch);
21        assert_eq!(
22            write_batch,
23            vec![b"a", b"b", b"c", b"d"]
24        );
25    }
26
27    let _ = std::fs::remove_dir_all("path");
28}
Source

pub fn purge(&self) -> Result<()>

Clear the previous contents of a log directory. Requires an exclusive lock over the log directory, and may not be performed concurrently with an active ShardedLog.

Examples found in repository?
examples/recovery.rs (line 9)
3fn main() {
4    let config = Config {
5        path: "path/to/logs".into(),
6        ..Default::default()
7    };
8
9    config.purge().unwrap();
10
11    let sharded_log = config.create().unwrap();
12
13    sharded_log
14        .write_batch(&[b"a", b"b", b"c", b"d"])
15        .unwrap();
16
17    sharded_log.flush().unwrap();
18
19    for write_batch in config.recover().unwrap() {
20        println!("got batch: {:?}", write_batch);
21        assert_eq!(
22            write_batch,
23            vec![b"a", b"b", b"c", b"d"]
24        );
25    }
26
27    let _ = std::fs::remove_dir_all("path");
28}
More examples
Hide additional examples
examples/write_performance.rs (line 15)
4fn main() {
5    let number_of_threads = 16;
6    let buffers_per_batch = 8;
7    let buffer_size = 512;
8    let batches_per_thread = 128 * 1024;
9
10    let config = Config {
11        path: "write_performance".into(),
12        ..Default::default()
13    };
14
15    config.purge().unwrap();
16
17    let log = Arc::new(config.create().unwrap());
18
19    let barrier =
20        Arc::new(Barrier::new(number_of_threads + 1));
21
22    let mut threads = vec![];
23
24    let before = std::time::Instant::now();
25
26    for _ in 0..number_of_threads {
27        let barrier = barrier.clone();
28        let log = log.clone();
29
30        let thread = std::thread::spawn(move || {
31            barrier.wait();
32
33            for _ in 0..batches_per_thread {
34                let mut batch =
35                    Vec::with_capacity(buffers_per_batch);
36                for _ in 0..buffers_per_batch {
37                    let mut buffer =
38                        Vec::with_capacity(buffer_size);
39                    unsafe {
40                        buffer.set_len(buffer_size);
41                    }
42                    batch.push(buffer);
43                }
44                log.write_batch(&batch).unwrap();
45            }
46        });
47
48        threads.push(thread);
49    }
50
51    barrier.wait();
52
53    for thread in threads.into_iter() {
54        thread.join().unwrap();
55    }
56
57    log.flush().unwrap();
58
59    let bytes_written = number_of_threads
60        * batches_per_thread
61        * buffers_per_batch
62        * buffer_size;
63    let throughput = bytes_written as u128
64        / before.elapsed().as_micros();
65
66    println!("wrote {} megabytes per second, {} threads, {} batches per thread, {} buffers per batch, {} bytes per buffer",
67        throughput,
68        number_of_threads,
69        batches_per_thread,
70        buffers_per_batch,
71        buffer_size
72    );
73
74    let _ = std::fs::remove_dir_all("write_performance");
75}
Source

pub fn create(&self) -> Result<ShardedLog>

Exclusively create a new log directory that may be used for writing sharded logs to.

Examples found in repository?
examples/recovery.rs (line 11)
3fn main() {
4    let config = Config {
5        path: "path/to/logs".into(),
6        ..Default::default()
7    };
8
9    config.purge().unwrap();
10
11    let sharded_log = config.create().unwrap();
12
13    sharded_log
14        .write_batch(&[b"a", b"b", b"c", b"d"])
15        .unwrap();
16
17    sharded_log.flush().unwrap();
18
19    for write_batch in config.recover().unwrap() {
20        println!("got batch: {:?}", write_batch);
21        assert_eq!(
22            write_batch,
23            vec![b"a", b"b", b"c", b"d"]
24        );
25    }
26
27    let _ = std::fs::remove_dir_all("path");
28}
More examples
Hide additional examples
examples/write_performance.rs (line 17)
4fn main() {
5    let number_of_threads = 16;
6    let buffers_per_batch = 8;
7    let buffer_size = 512;
8    let batches_per_thread = 128 * 1024;
9
10    let config = Config {
11        path: "write_performance".into(),
12        ..Default::default()
13    };
14
15    config.purge().unwrap();
16
17    let log = Arc::new(config.create().unwrap());
18
19    let barrier =
20        Arc::new(Barrier::new(number_of_threads + 1));
21
22    let mut threads = vec![];
23
24    let before = std::time::Instant::now();
25
26    for _ in 0..number_of_threads {
27        let barrier = barrier.clone();
28        let log = log.clone();
29
30        let thread = std::thread::spawn(move || {
31            barrier.wait();
32
33            for _ in 0..batches_per_thread {
34                let mut batch =
35                    Vec::with_capacity(buffers_per_batch);
36                for _ in 0..buffers_per_batch {
37                    let mut buffer =
38                        Vec::with_capacity(buffer_size);
39                    unsafe {
40                        buffer.set_len(buffer_size);
41                    }
42                    batch.push(buffer);
43                }
44                log.write_batch(&batch).unwrap();
45            }
46        });
47
48        threads.push(thread);
49    }
50
51    barrier.wait();
52
53    for thread in threads.into_iter() {
54        thread.join().unwrap();
55    }
56
57    log.flush().unwrap();
58
59    let bytes_written = number_of_threads
60        * batches_per_thread
61        * buffers_per_batch
62        * buffer_size;
63    let throughput = bytes_written as u128
64        / before.elapsed().as_micros();
65
66    println!("wrote {} megabytes per second, {} threads, {} batches per thread, {} buffers per batch, {} bytes per buffer",
67        throughput,
68        number_of_threads,
69        batches_per_thread,
70        buffers_per_batch,
71        buffer_size
72    );
73
74    let _ = std::fs::remove_dir_all("write_performance");
75}

Trait Implementations§

Source§

impl Clone for Config

Source§

fn clone(&self) -> Config

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for Config

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl Default for Config

Source§

fn default() -> Config

Returns the “default value” for a type. Read more

Auto Trait Implementations§

§

impl Freeze for Config

§

impl RefUnwindSafe for Config

§

impl Send for Config

§

impl Sync for Config

§

impl Unpin for Config

§

impl UnwindSafe for Config

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.