sharded_log/
lib.rs

//! Shards batches of writes across multiple log files
//! for high throughput and low contention. The general
//! usage pattern is:
//!
//! 1. recover, pass recovered data to downstream storage
//! 2. delete old logs
//! 3. begin writing to the sharded logs
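//!
//! A minimal end-to-end sketch of that pattern (the
//! directory name is illustrative):
//!
//! ```no_run
//! # fn main() -> std::io::Result<()> {
//! let config = sharded_log::Config {
//!     path: "example_sharded_logs".into(),
//!     ..Default::default()
//! };
//!
//! // 1. recover, feeding batches to downstream storage
//! for write_batch in config.recover()? {
//!     for _buf in write_batch {
//!         // apply each recovered buffer here
//!     }
//! }
//!
//! // 2. delete old logs
//! config.purge()?;
//!
//! // 3. begin writing to the sharded logs
//! let log = config.create()?;
//! log.write_batch(&[b"hello", b"world"])?;
//! log.flush()?;
//! # Ok(())
//! # }
//! ```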

const SUBDIR: &str = "sharded_logs";
const WARN: &str = "DO_NOT_PUT_YOUR_FILES_HERE";

use std::{
    collections::BTreeMap,
    fs::{self, File, OpenOptions},
    io::{self, BufReader, BufWriter, Read, Seek, Write},
    path::PathBuf,
    sync::{
        atomic::{
            AtomicBool, AtomicU64, AtomicUsize, Ordering,
        },
        Arc, Mutex, MutexGuard,
    },
};

use crc32fast::Hasher;
use fault_injection::{fallible, maybe};
use fs2::FileExt;

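/// Configuration for creating and recovering a set of
/// sharded log files.
///
/// A minimal construction sketch (the path is
/// illustrative):
///
/// ```no_run
/// let config = sharded_log::Config {
///     path: "example_sharded_logs".into(),
///     ..Default::default()
/// };
/// ```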
#[derive(Debug, Clone)]
pub struct Config {
    /// Where to open the log.
    pub path: PathBuf,
    /// Number of sharded log files. Future calls to `recover` must always use at least this
    /// many shards, otherwise the system will not be able to recover. Defaults to 8.
    pub shards: u8,
    /// The in-memory buffer size in front of each log file. Defaults to 512k.
    pub in_memory_buffer_per_log: usize,
}

impl Default for Config {
    fn default() -> Config {
        Config {
            path: Default::default(),
            shards: 8,
            in_memory_buffer_per_log: 512 * 1024,
        }
    }
}

impl Config {
    /// Iterate over all log shards, returning batches in the
    /// order that they were written in, even if they landed
    /// in different shards. This method does not take out an
    /// exclusive file lock on the shards in the way that
    /// `Config::purge` and `Config::create` do, so it can
    /// be used on logs that are actively being written.
    /// HOWEVER: keep in mind that while writing logs,
    /// significant data may remain in the in-memory
    /// `BufWriter` instances until you call
    /// `ShardedLog::flush`!
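    ///
    /// A recovery loop sketch (what "downstream storage"
    /// means is left to the caller):
    ///
    /// ```no_run
    /// # fn main() -> std::io::Result<()> {
    /// let config = sharded_log::Config::default();
    /// for write_batch in config.recover()? {
    ///     for buf in write_batch {
    ///         // apply each recovered buffer, in write order
    ///         let _ = buf;
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```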
    pub fn recover(&self) -> io::Result<RecoveryIterator> {
        let mut file_opts = OpenOptions::new();
        file_opts.read(true);

        let mut readers = vec![];

        for idx in 0..self.shards {
            let path = self
                .path
                .join(SUBDIR)
                .join(idx.to_string());

            let file = fallible!(file_opts.open(path));
            readers.push(BufReader::new(file));
        }

        let mut ret = RecoveryIterator {
            done: false,
            next_expected_lsn: 0,
            readers,
            last_shard: None,
            read_buffer: BTreeMap::new(),
        };

        for idx in 0..self.shards {
            ret.tick(idx as usize);
        }

        Ok(ret)
    }

    fn lock(&self) -> io::Result<File> {
        fallible!(fs::create_dir_all(
            self.path.join(SUBDIR)
        ));
        let mut lock_options = OpenOptions::new();
        lock_options.read(true).create(true).write(true);
        let lock_file = fallible!(lock_options
            .open(self.path.join(SUBDIR).join(WARN)));
        fallible!(lock_file.try_lock_exclusive());
        Ok(lock_file)
    }

    /// Clear the previous contents of a log directory.
    /// Requires an exclusive lock over the log directory,
    /// and may not be performed concurrently with an
    /// active `ShardedLog`.
    pub fn purge(&self) -> io::Result<()> {
        let _lock_file = fallible!(self.lock());
        fs::remove_dir_all(&self.path)
    }

    /// Exclusively create a new log directory that
    /// may be used for writing sharded logs to.
    pub fn create(&self) -> io::Result<ShardedLog> {
        fallible!(fs::create_dir_all(
            self.path.join(SUBDIR)
        ));

        let lock_file = Arc::new(fallible!(self.lock()));

        let mut file_opts = OpenOptions::new();
        file_opts.create_new(true).write(true);

        let mut shards = vec![];
        for idx in 0..self.shards {
            let path = self
                .path
                .join(SUBDIR)
                .join(idx.to_string());

            let file = fallible!(file_opts.open(path));

            shards.push(Shard {
                file_mu: Mutex::new(
                    BufWriter::with_capacity(
                        self.in_memory_buffer_per_log,
                        file,
                    ),
                ),
                dirty: false.into(),
            })
        }

        fallible!(
            File::open(self.path.join(SUBDIR))?.sync_all()
        );

        Ok(ShardedLog {
            shards: shards.into(),
            idx: 0,
            idx_counter: Arc::new(AtomicUsize::new(1)),
            next_lsn: Arc::new(0.into()),
            config: self.clone(),
            lock_file,
        })
    }
}

/// Allows batches of bytes to be written
/// with low contention, and read back
/// in a linearized order.
pub struct ShardedLog {
    shards: Arc<[Shard]>,
    idx: usize,
    idx_counter: Arc<AtomicUsize>,
    next_lsn: Arc<AtomicU64>,
    config: Config,
    lock_file: Arc<File>,
}

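/// A reserved slot in the log, tied to a locked shard
/// and a fixed lsn. Dropping a `Reservation` without
/// completing it writes an empty batch at its lsn, so
/// that recovery never stalls on a sequence gap.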
pub struct Reservation<'a> {
    shard: MutexGuard<'a, BufWriter<File>>,
    completed: bool,
    lsn: u64,
}

impl<'a> Drop for Reservation<'a> {
    fn drop(&mut self) {
        if !self.completed {
            if let Err(e) = write_batch_inner::<&[u8]>(
                &mut self.shard,
                self.lsn,
                &[],
            ) {
                eprintln!(
                    "error while writing empty batch on \
                    Reservation Drop: {:?}",
                    e
                );
            }
            self.completed = true;
        }
    }
}

impl<'a> Reservation<'a> {
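    /// Complete the reservation, writing `write_batch`
    /// at the reserved lsn and returning that lsn.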
    pub fn write_batch<B: AsRef<[u8]>>(
        mut self,
        write_batch: &[B],
    ) -> io::Result<u64> {
        self.completed = true;
        write_batch_inner(
            &mut self.shard,
            self.lsn,
            write_batch,
        )
    }

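    /// Cancel the reservation, writing an empty batch
    /// at the reserved lsn to keep the lsn sequence
    /// contiguous for recovery.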
    pub fn abort(mut self) -> io::Result<u64> {
        self.completed = true;
        write_batch_inner::<&[u8]>(
            &mut self.shard,
            self.lsn,
            &[],
        )
    }
}

impl Clone for ShardedLog {
    fn clone(&self) -> ShardedLog {
        ShardedLog {
            shards: self.shards.clone(),
            idx_counter: self.idx_counter.clone(),
            idx: self
                .idx_counter
                .fetch_add(1, Ordering::SeqCst),
            next_lsn: self.next_lsn.clone(),
            config: self.config.clone(),
            lock_file: self.lock_file.clone(),
        }
    }
}

struct Shard {
    file_mu: Mutex<BufWriter<File>>,
    dirty: AtomicBool,
}

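/// Iterates over recovered write batches in lsn order,
/// merging frames from all shards through a read buffer
/// keyed by lsn. Iteration stops at the first gap or
/// corruption, truncating recovery at the last fully
/// intact batch.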
pub struct RecoveryIterator {
    next_expected_lsn: u64,
    readers: Vec<BufReader<File>>,
    read_buffer: BTreeMap<u64, (usize, Vec<Vec<u8>>)>,
    last_shard: Option<usize>,
    done: bool,
}

impl Iterator for RecoveryIterator {
    type Item = Vec<Vec<u8>>;

    fn next(&mut self) -> Option<Self::Item> {
        let ret = self.next_inner();
        if ret.is_none() {
            self.done = true;
        }
        ret
    }
}

impl RecoveryIterator {
    fn next_inner(&mut self) -> Option<Vec<Vec<u8>>> {
        if let Some(last_idx) = self.last_shard {
            self.tick(last_idx);
        }

        let (idx, buf) = self
            .read_buffer
            .remove(&self.next_expected_lsn)?;

        self.next_expected_lsn += 1;
        self.last_shard = Some(idx);
        Some(buf)
    }

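    /// Attempt to read one frame from shard `idx`,
    /// verifying both the header crc and each buffer's
    /// crc, and buffer the batch keyed by its lsn.
    /// Returns early, buffering nothing, on EOF or
    /// corruption.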
    fn tick(&mut self, idx: usize) {
        macro_rules! weak_try {
            ($e:expr) => {{
                match $e {
                    Ok(ok) => ok,
                    _ => return,
                }
            }};
        }

        let mut reader = &mut self.readers[idx];

        let crc_expected: [u8; 4] =
            weak_try!(read_array(&mut reader));
        let size_bytes: [u8; 8] =
            weak_try!(read_array(&mut reader));
        let lsn_bytes: [u8; 8] =
            weak_try!(read_array(&mut reader));

        let mut hasher = Hasher::new();
        hasher.update(&size_bytes);
        hasher.update(&lsn_bytes);
        let crc_actual =
            (hasher.finalize() ^ 0xFF).to_le_bytes();

        if crc_actual != crc_expected {
            eprintln!("encountered corrupted crc in log");
            return;
        }

        let size =
            usize::try_from(u64::from_le_bytes(size_bytes))
                .unwrap();
        let lsn = u64::from_le_bytes(lsn_bytes);

        let mut write_batch = vec![];
        for _ in 0..size {
            let crc_expected: [u8; 4] =
                weak_try!(read_array(&mut reader));
            let len_bytes: [u8; 8] =
                weak_try!(read_array(&mut reader));

            let len = usize::try_from(u64::from_le_bytes(
                len_bytes,
            ))
            .unwrap();

            // zero-initialize instead of calling set_len
            // on uninitialized memory, which is undefined
            // behavior if the following read fails partway
            let mut buf = vec![0u8; len];

            weak_try!(maybe!(reader.read_exact(&mut buf)));

            let mut hasher = Hasher::new();
            hasher.update(&len_bytes);
            hasher.update(&buf);
            let crc_actual =
                (hasher.finalize() ^ 0xFF).to_le_bytes();

            if crc_actual != crc_expected {
                return;
            }

            write_batch.push(buf);
        }

        self.read_buffer.insert(lsn, (idx, write_batch));
    }
}

fn read_array<const LEN: usize>(
    mut reader: impl io::Read,
) -> io::Result<[u8; LEN]> {
    // a zeroed buffer instead of
    // `MaybeUninit::uninit().assume_init()`, which is
    // undefined behavior for integer arrays
    let mut buf = [0u8; LEN];
    reader.read_exact(&mut buf)?;
    Ok(buf)
}

impl ShardedLog {
    /// Write a batch of buffers to the sharded log.
    /// Returns the logical sequence number that they
    /// will be recoverable at after your next call
    /// to `flush`. Writes the batch into a `BufWriter`
    /// in front of the sharded log file, which will
    /// be flushed on Drop, but can also be flushed
    /// using the `flush` method.
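    ///
    /// A sketch of writing a batch and making it durable:
    ///
    /// ```no_run
    /// # fn main() -> std::io::Result<()> {
    /// let log = sharded_log::Config::default().create()?;
    /// let lsn = log.write_batch(&[b"some bytes"])?;
    /// log.flush()?;
    /// // the batch is now recoverable at `lsn`
    /// # Ok(())
    /// # }
    /// ```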
    pub fn write_batch<B: AsRef<[u8]>>(
        &self,
        write_batch: &[B],
    ) -> io::Result<u64> {
        self.reservation().write_batch(write_batch)
    }

    /// Reserve a log slot at a particular index,
    /// which can then be completed using
    /// `Reservation::write_batch` or aborted using
    /// `Reservation::abort`. This is particularly
    /// useful for logging fallible lock-free operations
    /// to disk in an order-preserving manner.
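    ///
    /// A sketch of guarding a fallible operation with a
    /// reservation (the `succeeded` condition stands in
    /// for e.g. a compare-and-swap):
    ///
    /// ```no_run
    /// # fn main() -> std::io::Result<()> {
    /// let log = sharded_log::Config::default().create()?;
    /// let reservation = log.reservation();
    /// let succeeded = true;
    /// if succeeded {
    ///     reservation.write_batch(&[b"committed"])?;
    /// } else {
    ///     reservation.abort()?;
    /// }
    /// # Ok(())
    /// # }
    /// ```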
    pub fn reservation(&self) -> Reservation<'_> {
        let shard = self.get_shard();

        // NB lsn has to be created after acquiring the shard.
        let lsn =
            self.next_lsn.fetch_add(1, Ordering::Release);

        Reservation {
            shard,
            lsn,
            completed: false,
        }
    }

    /// Flushes and fsyncs all in-memory shard data
    /// that has been written by any clone of this
    /// `ShardedLog`, skipping shards whose buffers
    /// are already clean.
    pub fn flush(&self) -> io::Result<()> {
        for shard in &*self.shards {
            if shard.dirty.load(Ordering::Acquire) {
                let mut file =
                    shard.file_mu.lock().unwrap();
                // re-check after acquiring the lock, since
                // another thread may have flushed already
                if shard.dirty.load(Ordering::Acquire) {
                    fallible!(file.flush());
                    fallible!(file.get_mut().sync_all());
                    shard
                        .dirty
                        .store(false, Ordering::Release);
                }
            }
        }
        Ok(())
    }

    /// Delete all logs in the system.
    pub fn purge(&self) -> io::Result<()> {
        let mut buffers = vec![];
        for shard in &*self.shards {
            let buffer = shard.file_mu.lock().unwrap();
            buffers.push(buffer);
        }

        for buffer in &mut buffers {
            fallible!(buffer.flush());

            let file = buffer.get_mut();
            fallible!(file.seek(io::SeekFrom::Start(0)));
            fallible!(file.set_len(0));
            fallible!(file.sync_all());
        }

        for shard in &*self.shards {
            shard.dirty.store(false, Ordering::Release);
        }

        // NB: buffers holds mutex guards which must be
        // held until after all dirty flags are cleared
        drop(buffers);

        Ok(())
    }

    fn get_shard(&self) -> MutexGuard<'_, BufWriter<File>> {
        let len = self.shards.len();
        for i in 0..len {
            // walk backwards to avoid creating
            // contention for the very next write
            let idx = self.idx.wrapping_sub(i) % len;

            if let Ok(shard) =
                self.shards[idx].file_mu.try_lock()
            {
                self.shards[idx]
                    .dirty
                    .store(true, Ordering::Release);
                return shard;
            }
        }

        let ret =
            self.shards[self.idx].file_mu.lock().unwrap();

        // NB: dirty must be set after locking shard
        // to properly synchronize with flush's unset
        self.shards[self.idx]
            .dirty
            .store(true, Ordering::Release);

        ret
    }
}

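// On-disk frame format, written once per batch:
//
//   [crc: 4 bytes][buffer count: 8 bytes][lsn: 8 bytes]
//   then, for each buffer:
//   [crc: 4 bytes][length: 8 bytes][data: length bytes]
//
// All integers are little-endian. Each crc32 is xored
// with 0xFF, presumably so that zeroed regions of a
// file are less likely to checksum as a valid frame.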
fn write_batch_inner<B: AsRef<[u8]>>(
    shard: &mut BufWriter<File>,
    lsn: u64,
    write_batch: &[B],
) -> io::Result<u64> {
    let size_bytes: [u8; 8] =
        (write_batch.len() as u64).to_le_bytes();
    let lsn_bytes: [u8; 8] = lsn.to_le_bytes();
    let mut hasher = Hasher::new();
    hasher.update(&size_bytes);
    hasher.update(&lsn_bytes);
    let crc: [u8; 4] =
        (hasher.finalize() ^ 0xFF).to_le_bytes();

    fallible!(shard.write_all(&crc));
    fallible!(shard.write_all(&size_bytes));
    fallible!(shard.write_all(&lsn_bytes));

    for buf_i in write_batch {
        let buf = buf_i.as_ref();
        let len_bytes: [u8; 8] =
            (buf.len() as u64).to_le_bytes();

        let mut hasher = Hasher::new();
        hasher.update(&len_bytes);
        hasher.update(buf);
        let crc_bytes: [u8; 4] =
            (hasher.finalize() ^ 0xFF).to_le_bytes();

        fallible!(shard.write_all(&crc_bytes));
        fallible!(shard.write_all(&len_bytes));
        fallible!(shard.write_all(buf));
    }

    Ok(lsn)
}

#[test]
fn concurrency() {
    let n_threads = 16_usize;
    let n_ops_per_thread = 10 * 1024;

    static CONCURRENCY_TEST_COUNTER: AtomicU64 =
        AtomicU64::new(0);

    let config = Config {
        shards: (n_threads / 4).max(1) as u8,
        path: "test_concurrency".into(),
        ..Default::default()
    };

    config.purge().unwrap();

    let log = Arc::new(config.create().unwrap());

    let barrier =
        Arc::new(std::sync::Barrier::new(n_threads + 1));
    let mut threads = vec![];
    for _ in 0..n_threads {
        let barrier = barrier.clone();
        let log = log.clone();

        let thread = std::thread::spawn(move || {
            barrier.wait();

            let mut successes = 0;
            while successes < n_ops_per_thread {
                let old_value = CONCURRENCY_TEST_COUNTER
                    .load(Ordering::Acquire);

                std::thread::yield_now();

                let reservation = log.reservation();

                std::thread::yield_now();

                let cas_res = CONCURRENCY_TEST_COUNTER
                    .compare_exchange(
                        old_value,
                        old_value + 1,
                        Ordering::AcqRel,
                        Ordering::Relaxed,
                    );

                if cas_res.is_ok() {
                    let value = old_value.to_le_bytes();
                    reservation
                        .write_batch(&[value])
                        .unwrap();
                    successes += 1;
                } else {
                    reservation.abort().unwrap();
                }
            }
        });

        threads.push(thread);
    }

    barrier.wait();

    for thread in threads.into_iter() {
        thread.join().unwrap();
    }

    drop(log);

    let mut iter = config.recover().unwrap();

    let mut successes = 0;
    while successes < n_threads * n_ops_per_thread {
        if let Some(next) = iter.next().unwrap().pop() {
            let value = u64::from_le_bytes(
                next.try_into().unwrap(),
            );
            assert_eq!(value, successes as u64);
            successes += 1;
        }
    }

    drop(iter);

    let _ = std::fs::remove_dir_all(&config.path);
}