pub struct ShardedLog { /* private fields */ }Expand description
Allows batches of bytes to be written with low contention and read back in a linearized order.
Implementations§
Source§impl ShardedLog
impl ShardedLog
Sourcepub fn write_batch<B: AsRef<[u8]>>(&self, write_batch: &[B]) -> Result<u64>
pub fn write_batch<B: AsRef<[u8]>>(&self, write_batch: &[B]) -> Result<u64>
Write a batch of buffers to the sharded log.
Returns the logical sequence number at which
the batch will be recoverable after your next
call to flush. The batch is written into a BufWriter
in front of the sharded log file, which is
flushed on Drop but can also be flushed explicitly
using the flush method.
Examples found in repository?
3fn main() {
4 let config = Config {
5 path: "path/to/logs".into(),
6 ..Default::default()
7 };
8
9 config.purge().unwrap();
10
11 let sharded_log = config.create().unwrap();
12
13 sharded_log
14 .write_batch(&[b"a", b"b", b"c", b"d"])
15 .unwrap();
16
17 sharded_log.flush().unwrap();
18
19 for write_batch in config.recover().unwrap() {
20 println!("got batch: {:?}", write_batch);
21 assert_eq!(
22 write_batch,
23 vec![b"a", b"b", b"c", b"d"]
24 );
25 }
26
27 let _ = std::fs::remove_dir_all("path");
28}More examples
4fn main() {
5 let number_of_threads = 16;
6 let buffers_per_batch = 8;
7 let buffer_size = 512;
8 let batches_per_thread = 128 * 1024;
9
10 let config = Config {
11 path: "write_performance".into(),
12 ..Default::default()
13 };
14
15 config.purge().unwrap();
16
17 let log = Arc::new(config.create().unwrap());
18
19 let barrier =
20 Arc::new(Barrier::new(number_of_threads + 1));
21
22 let mut threads = vec![];
23
24 let before = std::time::Instant::now();
25
26 for _ in 0..number_of_threads {
27 let barrier = barrier.clone();
28 let log = log.clone();
29
30 let thread = std::thread::spawn(move || {
31 barrier.wait();
32
33 for _ in 0..batches_per_thread {
34 let mut batch =
35 Vec::with_capacity(buffers_per_batch);
36 for _ in 0..buffers_per_batch {
37 let mut buffer =
38 Vec::with_capacity(buffer_size);
39 unsafe {
40 buffer.set_len(buffer_size);
41 }
42 batch.push(buffer);
43 }
44 log.write_batch(&batch).unwrap();
45 }
46 });
47
48 threads.push(thread);
49 }
50
51 barrier.wait();
52
53 for thread in threads.into_iter() {
54 thread.join().unwrap();
55 }
56
57 log.flush().unwrap();
58
59 let bytes_written = number_of_threads
60 * batches_per_thread
61 * buffers_per_batch
62 * buffer_size;
63 let throughput = bytes_written as u128
64 / before.elapsed().as_micros();
65
66 println!("wrote {} megabytes per second, {} threads, {} batches per thread, {} buffers per batch, {} bytes per buffer",
67 throughput,
68 number_of_threads,
69 batches_per_thread,
70 buffers_per_batch,
71 buffer_size
72 );
73
74 let _ = std::fs::remove_dir_all("write_performance");
75}Sourcepub fn reservation(&self) -> Reservation<'_>
pub fn reservation(&self) -> Reservation<'_>
Reserve a log slot at a particular index,
which can then be completed using
Reservation::write_batch or aborted using
Reservation::abort. This is particularly
useful for logging fallible lock-free operations
to disk in an order-preserving manner.
Sourcepub fn flush(&self) -> Result<()>
pub fn flush(&self) -> Result<()>
Flushes and fsyncs the in-memory data of shards that
have been written to directly from this instance
of ShardedLog, skipping shards that have only
been written to by others. To flush all shards,
use flush_all.
Examples found in repository?
3fn main() {
4 let config = Config {
5 path: "path/to/logs".into(),
6 ..Default::default()
7 };
8
9 config.purge().unwrap();
10
11 let sharded_log = config.create().unwrap();
12
13 sharded_log
14 .write_batch(&[b"a", b"b", b"c", b"d"])
15 .unwrap();
16
17 sharded_log.flush().unwrap();
18
19 for write_batch in config.recover().unwrap() {
20 println!("got batch: {:?}", write_batch);
21 assert_eq!(
22 write_batch,
23 vec![b"a", b"b", b"c", b"d"]
24 );
25 }
26
27 let _ = std::fs::remove_dir_all("path");
28}More examples
4fn main() {
5 let number_of_threads = 16;
6 let buffers_per_batch = 8;
7 let buffer_size = 512;
8 let batches_per_thread = 128 * 1024;
9
10 let config = Config {
11 path: "write_performance".into(),
12 ..Default::default()
13 };
14
15 config.purge().unwrap();
16
17 let log = Arc::new(config.create().unwrap());
18
19 let barrier =
20 Arc::new(Barrier::new(number_of_threads + 1));
21
22 let mut threads = vec![];
23
24 let before = std::time::Instant::now();
25
26 for _ in 0..number_of_threads {
27 let barrier = barrier.clone();
28 let log = log.clone();
29
30 let thread = std::thread::spawn(move || {
31 barrier.wait();
32
33 for _ in 0..batches_per_thread {
34 let mut batch =
35 Vec::with_capacity(buffers_per_batch);
36 for _ in 0..buffers_per_batch {
37 let mut buffer =
38 Vec::with_capacity(buffer_size);
39 unsafe {
40 buffer.set_len(buffer_size);
41 }
42 batch.push(buffer);
43 }
44 log.write_batch(&batch).unwrap();
45 }
46 });
47
48 threads.push(thread);
49 }
50
51 barrier.wait();
52
53 for thread in threads.into_iter() {
54 thread.join().unwrap();
55 }
56
57 log.flush().unwrap();
58
59 let bytes_written = number_of_threads
60 * batches_per_thread
61 * buffers_per_batch
62 * buffer_size;
63 let throughput = bytes_written as u128
64 / before.elapsed().as_micros();
65
66 println!("wrote {} megabytes per second, {} threads, {} batches per thread, {} buffers per batch, {} bytes per buffer",
67 throughput,
68 number_of_threads,
69 batches_per_thread,
70 buffers_per_batch,
71 buffer_size
72 );
73
74 let _ = std::fs::remove_dir_all("write_performance");
75}Trait Implementations§
Source§impl Clone for ShardedLog
impl Clone for ShardedLog
Source§fn clone(&self) -> ShardedLog
fn clone(&self) -> ShardedLog
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source. Read more