pub struct Config {
pub path: PathBuf,
pub shards: u8,
pub in_memory_buffer_per_log: usize,
}

Fields

path: PathBuf
Where to open the log.

shards: u8
Number of sharded log files. Future calls to recover must always use at least this
many shards; otherwise the system will not be able to recover. Defaults to 8.

in_memory_buffer_per_log: usize
The in-memory buffer size in front of each log file. Defaults to 512k.

Implementations

impl Config

pub fn recover(&self) -> Result<RecoveryIterator>
Iterate over all log shards, returning batches in the
order in which they were written, even if they landed
in different shards. This method does not take out an
exclusive file lock on the shards in the way that
`Config::purge` and `Config::create` do, so it can
be used on logs that are actively being written.
HOWEVER: keep in mind that while logs are being written,
significant data may remain in the in-memory
`BufWriter` instances until you call
`ShardedLog::flush`.
Examples found in repository?
3fn main() {
4 let config = Config {
5 path: "path/to/logs".into(),
6 ..Default::default()
7 };
8
9 config.purge().unwrap();
10
11 let sharded_log = config.create().unwrap();
12
13 sharded_log
14 .write_batch(&[b"a", b"b", b"c", b"d"])
15 .unwrap();
16
17 sharded_log.flush().unwrap();
18
19 for write_batch in config.recover().unwrap() {
20 println!("got batch: {:?}", write_batch);
21 assert_eq!(
22 write_batch,
23 vec![b"a", b"b", b"c", b"d"]
24 );
25 }
26
27 let _ = std::fs::remove_dir_all("path");
28}

pub fn purge(&self) -> Result<()>
Clear the previous contents of a log directory.
Requires an exclusive lock over the log directory,
and may not be performed concurrently with an
active ShardedLog.
Examples found in repository?
3fn main() {
4 let config = Config {
5 path: "path/to/logs".into(),
6 ..Default::default()
7 };
8
9 config.purge().unwrap();
10
11 let sharded_log = config.create().unwrap();
12
13 sharded_log
14 .write_batch(&[b"a", b"b", b"c", b"d"])
15 .unwrap();
16
17 sharded_log.flush().unwrap();
18
19 for write_batch in config.recover().unwrap() {
20 println!("got batch: {:?}", write_batch);
21 assert_eq!(
22 write_batch,
23 vec![b"a", b"b", b"c", b"d"]
24 );
25 }
26
27 let _ = std::fs::remove_dir_all("path");
28}

More examples
4fn main() {
5 let number_of_threads = 16;
6 let buffers_per_batch = 8;
7 let buffer_size = 512;
8 let batches_per_thread = 128 * 1024;
9
10 let config = Config {
11 path: "write_performance".into(),
12 ..Default::default()
13 };
14
15 config.purge().unwrap();
16
17 let log = Arc::new(config.create().unwrap());
18
19 let barrier =
20 Arc::new(Barrier::new(number_of_threads + 1));
21
22 let mut threads = vec![];
23
24 let before = std::time::Instant::now();
25
26 for _ in 0..number_of_threads {
27 let barrier = barrier.clone();
28 let log = log.clone();
29
30 let thread = std::thread::spawn(move || {
31 barrier.wait();
32
33 for _ in 0..batches_per_thread {
34 let mut batch =
35 Vec::with_capacity(buffers_per_batch);
36 for _ in 0..buffers_per_batch {
37 let mut buffer =
38 Vec::with_capacity(buffer_size);
39 unsafe {
40 buffer.set_len(buffer_size);
41 }
42 batch.push(buffer);
43 }
44 log.write_batch(&batch).unwrap();
45 }
46 });
47
48 threads.push(thread);
49 }
50
51 barrier.wait();
52
53 for thread in threads.into_iter() {
54 thread.join().unwrap();
55 }
56
57 log.flush().unwrap();
58
59 let bytes_written = number_of_threads
60 * batches_per_thread
61 * buffers_per_batch
62 * buffer_size;
63 let throughput = bytes_written as u128
64 / before.elapsed().as_micros();
65
66 println!("wrote {} megabytes per second, {} threads, {} batches per thread, {} buffers per batch, {} bytes per buffer",
67 throughput,
68 number_of_threads,
69 batches_per_thread,
70 buffers_per_batch,
71 buffer_size
72 );
73
74 let _ = std::fs::remove_dir_all("write_performance");
75}

pub fn create(&self) -> Result<ShardedLog>
Exclusively create a new log directory to which sharded logs may be written.
Examples found in repository?
3fn main() {
4 let config = Config {
5 path: "path/to/logs".into(),
6 ..Default::default()
7 };
8
9 config.purge().unwrap();
10
11 let sharded_log = config.create().unwrap();
12
13 sharded_log
14 .write_batch(&[b"a", b"b", b"c", b"d"])
15 .unwrap();
16
17 sharded_log.flush().unwrap();
18
19 for write_batch in config.recover().unwrap() {
20 println!("got batch: {:?}", write_batch);
21 assert_eq!(
22 write_batch,
23 vec![b"a", b"b", b"c", b"d"]
24 );
25 }
26
27 let _ = std::fs::remove_dir_all("path");
28}

More examples
4fn main() {
5 let number_of_threads = 16;
6 let buffers_per_batch = 8;
7 let buffer_size = 512;
8 let batches_per_thread = 128 * 1024;
9
10 let config = Config {
11 path: "write_performance".into(),
12 ..Default::default()
13 };
14
15 config.purge().unwrap();
16
17 let log = Arc::new(config.create().unwrap());
18
19 let barrier =
20 Arc::new(Barrier::new(number_of_threads + 1));
21
22 let mut threads = vec![];
23
24 let before = std::time::Instant::now();
25
26 for _ in 0..number_of_threads {
27 let barrier = barrier.clone();
28 let log = log.clone();
29
30 let thread = std::thread::spawn(move || {
31 barrier.wait();
32
33 for _ in 0..batches_per_thread {
34 let mut batch =
35 Vec::with_capacity(buffers_per_batch);
36 for _ in 0..buffers_per_batch {
37 let mut buffer =
38 Vec::with_capacity(buffer_size);
39 unsafe {
40 buffer.set_len(buffer_size);
41 }
42 batch.push(buffer);
43 }
44 log.write_batch(&batch).unwrap();
45 }
46 });
47
48 threads.push(thread);
49 }
50
51 barrier.wait();
52
53 for thread in threads.into_iter() {
54 thread.join().unwrap();
55 }
56
57 log.flush().unwrap();
58
59 let bytes_written = number_of_threads
60 * batches_per_thread
61 * buffers_per_batch
62 * buffer_size;
63 let throughput = bytes_written as u128
64 / before.elapsed().as_micros();
65
66 println!("wrote {} megabytes per second, {} threads, {} batches per thread, {} buffers per batch, {} bytes per buffer",
67 throughput,
68 number_of_threads,
69 batches_per_thread,
70 buffers_per_batch,
71 buffer_size
72 );
73
74 let _ = std::fs::remove_dir_all("write_performance");
75}