metadata_store/
lib.rs

use std::collections::BTreeSet;
use std::fs;
use std::io::{self, Read, Write};
use std::num::NonZeroU64;
use std::path::{Path, PathBuf};
use std::sync::{
    atomic::{AtomicPtr, AtomicU64, Ordering},
    Arc,
};

use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use fault_injection::{annotate, fallible, maybe};
use fnv::FnvHashMap;
use inline_array::InlineArray;
use parking_lot::Mutex;
use rayon::prelude::*;
use zstd::stream::read::Decoder as ZstdDecoder;
use zstd::stream::write::Encoder as ZstdEncoder;

const WARN: &str = "DO_NOT_PUT_YOUR_FILES_HERE";
const TMP_SUFFIX: &str = ".tmp";
const LOG_PREFIX: &str = "log";
const SNAPSHOT_PREFIX: &str = "snapshot";

const ZSTD_LEVEL: i32 = 3;
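
// On-disk layout, as implemented below: write-ahead logs are named
// `log_<16 hex digit LSN>` and snapshots are named `snapshot_<16 hex digit LSN>`,
// both zero-padded; an in-progress snapshot rewrite carries the `.tmp` suffix
// until it has been synced and renamed into place.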

pub struct MetadataStore {
    inner: Inner,
    is_shut_down: bool,
}

impl Drop for MetadataStore {
    fn drop(&mut self) {
        if self.is_shut_down {
            return;
        }

        self.shutdown_inner();
        self.is_shut_down = true;
    }
}

struct Recovery {
    recovered: Vec<(u64, NonZeroU64, InlineArray)>,
    id_for_next_log: u64,
    snapshot_size: u64,
}

struct LogAndStats {
    file: fs::File,
    bytes_written: u64,
    log_sequence_number: u64,
}

enum WorkerMessage {
    Shutdown(Sender<()>),
    LogReadyToCompact { log_and_stats: LogAndStats },
}

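// Blocks until at least one message arrives, then drains any additionally queued
// messages so that a single snapshot rewrite can cover several retired logs at once.
// Returns Err(Some(ack_sender)) when a shutdown was requested, and Err(None) when
// the channel was disconnected unexpectedly.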
fn get_compactions(rx: &mut Receiver<WorkerMessage>) -> Result<Vec<u64>, Option<Sender<()>>> {
    let mut ret = vec![];

    match rx.recv() {
        Ok(WorkerMessage::Shutdown(tx)) => {
            return Err(Some(tx));
        }
        Ok(WorkerMessage::LogReadyToCompact { log_and_stats }) => {
            ret.push(log_and_stats.log_sequence_number);
        }
        Err(e) => {
            log::error!(
                "metadata store worker thread unable to receive message, unexpected shutdown: {e:?}"
            );
            return Err(None);
        }
    }

    // scoop up any additional logs that have built up while we were busy compacting
    loop {
        match rx.try_recv() {
            Ok(WorkerMessage::Shutdown(tx)) => {
                tx.send(()).unwrap();
                return Err(Some(tx));
            }
            Ok(WorkerMessage::LogReadyToCompact { log_and_stats }) => {
                ret.push(log_and_stats.log_sequence_number);
            }
            Err(_empty_or_disconnected) => return Ok(ret),
        }
    }
}

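// Background compaction thread: takes logs that the writer has retired, folds
// them into the previous snapshot via read_snapshot_and_apply_logs, and
// publishes the new snapshot size. It exits when a global error has been
// installed or when a shutdown message arrives.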
fn worker(mut rx: Receiver<WorkerMessage>, mut last_snapshot_lsn: u64, inner: Inner) {
    loop {
        let err_ptr: *const (io::ErrorKind, String) = inner.global_error.load(Ordering::Acquire);

        if !err_ptr.is_null() {
            log::error!("compaction thread prematurely terminating after global error set");
            return;
        }

        match get_compactions(&mut rx) {
            Ok(log_ids) => {
                assert_eq!(log_ids[0], last_snapshot_lsn + 1);

                let write_res = read_snapshot_and_apply_logs(
                    &inner.storage_directory,
                    log_ids.into_iter().collect(),
                    Some(last_snapshot_lsn),
                );
                match write_res {
                    Err(e) => {
                        set_error(&inner.global_error, &e);
                        log::error!("log compactor thread encountered error - setting global fatal error and shutting down compactions");
                        return;
                    }
                    Ok(recovery) => {
                        inner
                            .snapshot_size
                            .store(recovery.snapshot_size, Ordering::Release);
                        last_snapshot_lsn = recovery.id_for_next_log.checked_sub(1).unwrap();
                    }
                }
            }
            Err(Some(tx)) => {
                // drop this thread's clone of Inner before acking shutdown, so the
                // worker does not keep its file handles alive after the ack
                drop(inner);
                if let Err(e) = tx.send(()) {
                    log::error!("log compactor failed to send shutdown ack to system: {e:?}");
                }
                return;
            }
            Err(None) => {
                return;
            }
        }
    }
}

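// Installs the first fatal error for the store. Later errors lose the
// compare-and-swap against the already-installed pointer and are dropped.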
fn set_error(global_error: &AtomicPtr<(io::ErrorKind, String)>, error: &io::Error) {
    let kind = error.kind();
    let reason = error.to_string();

    let boxed = Box::new((kind, reason));
    let ptr = Box::into_raw(boxed);

    if global_error
        .compare_exchange(
            std::ptr::null_mut(),
            ptr,
            Ordering::SeqCst,
            Ordering::SeqCst,
        )
        .is_err()
    {
        // global fatal error already installed, drop this one
        unsafe {
            drop(Box::from_raw(ptr));
        }
    }
}

#[derive(Clone)]
struct Inner {
    global_error: Arc<AtomicPtr<(io::ErrorKind, String)>>,
    active_log: Arc<Mutex<LogAndStats>>,
    snapshot_size: Arc<AtomicU64>,
    storage_directory: PathBuf,
    #[allow(unused)]
    directory_lock: Arc<fs::File>,
    worker_outbox: Sender<WorkerMessage>,
}

impl Drop for Inner {
    fn drop(&mut self) {
        // Inner is Clone, and a clone lives on the worker thread, so only the
        // final clone may free an installed global error. Otherwise an early
        // drop of one clone could free memory that another clone still reads
        // in check_error, or the error could be freed twice.
        if Arc::strong_count(&self.global_error) == 1 {
            let error_ptr = self.global_error.load(Ordering::Acquire);
            if !error_ptr.is_null() {
                unsafe {
                    drop(Box::from_raw(error_ptr));
                }
            }
        }
    }
}

impl MetadataStore {
    pub fn shutdown(mut self) {
        self.shutdown_inner();
    }

    fn shutdown_inner(&mut self) {
        let (tx, rx) = bounded(1);
        if self
            .inner
            .worker_outbox
            .send(WorkerMessage::Shutdown(tx))
            .is_ok()
        {
            let _ = rx.recv();
        }

        self.set_error(&io::Error::new(
            io::ErrorKind::Other,
            "system has been shut down".to_string(),
        ));

        self.is_shut_down = true;
    }

    fn check_error(&self) -> io::Result<()> {
        let err_ptr: *const (io::ErrorKind, String) =
            self.inner.global_error.load(Ordering::Acquire);

        if err_ptr.is_null() {
            Ok(())
        } else {
            let deref: &(io::ErrorKind, String) = unsafe { &*err_ptr };
            Err(io::Error::new(deref.0, deref.1.clone()))
        }
    }

    fn set_error(&self, error: &io::Error) {
        set_error(&self.inner.global_error, error);
    }

    /// Returns the writer handle `MetadataStore` together with a sorted array of
    /// recovered metadata: `(key, value, user data)` triples.
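    ///
    /// A minimal usage sketch (the crate name `metadata_store` and the path are
    /// illustrative assumptions):
    ///
    /// ```no_run
    /// use metadata_store::MetadataStore;
    ///
    /// fn main() -> std::io::Result<()> {
    ///     let (store, recovered) = MetadataStore::recover("path/to/metadata")?;
    ///     for (key, value, _user_data) in &recovered {
    ///         // rebuild in-memory state from the recovered mappings here
    ///         println!("{key} -> {value}");
    ///     }
    ///     store.shutdown();
    ///     Ok(())
    /// }
    /// ```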
    pub fn recover<P: AsRef<Path>>(
        storage_directory: P,
    ) -> io::Result<(
        // Metadata writer
        MetadataStore,
        // Metadata - key, value, user data
        Vec<(u64, NonZeroU64, InlineArray)>,
    )> {
        use fs2::FileExt;

        let path = storage_directory.as_ref();

        // initialize directories if not present
        if let Err(e) = fs::read_dir(&path) {
            if e.kind() == io::ErrorKind::NotFound {
                fallible!(fs::create_dir_all(&path));
            }
        }

        let _ = fs::File::create(path.join(WARN));

        let directory_lock = fallible!(fs::File::open(path));
        fallible!(directory_lock.try_lock_exclusive());

        let recovery = MetadataStore::recover_inner(&storage_directory)?;

        let new_log = LogAndStats {
            log_sequence_number: recovery.id_for_next_log,
            bytes_written: 0,
            file: fallible!(fs::File::create(log_path(path, recovery.id_for_next_log))),
        };

        let (tx, rx) = unbounded();

        let inner = Inner {
            snapshot_size: Arc::new(recovery.snapshot_size.into()),
            storage_directory: path.into(),
            directory_lock: Arc::new(directory_lock),
            global_error: Default::default(),
            active_log: Arc::new(Mutex::new(new_log)),
            worker_outbox: tx,
        };

        let worker_inner = inner.clone();
        std::thread::spawn(move || {
            worker(
                rx,
                recovery.id_for_next_log.checked_sub(1).unwrap(),
                worker_inner,
            )
        });

        Ok((
            MetadataStore {
                inner,
                is_shut_down: false,
            },
            recovery.recovered,
        ))
    }

    /// Returns the recovered mappings, the id to use for the next log file, and
    /// the size of the freshly written snapshot.
    fn recover_inner<P: AsRef<Path>>(storage_directory: P) -> io::Result<Recovery> {
        let path = storage_directory.as_ref();
        log::debug!("opening u64db at {:?}", path);

        let (log_ids, snapshot_id_opt) = enumerate_logs_and_snapshot(&path)?;

        read_snapshot_and_apply_logs(path, log_ids, snapshot_id_opt)
    }

    /// Write a batch of metadata. A `None` value in the second position of a
    /// tuple represents a deletion of that key.
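    ///
    /// A minimal sketch of a batch that upserts one key and deletes another
    /// (the keys, value, and user data below are purely illustrative):
    ///
    /// ```no_run
    /// # use std::num::NonZeroU64;
    /// # use inline_array::InlineArray;
    /// # fn example(store: &metadata_store::MetadataStore) -> std::io::Result<()> {
    /// store.insert_batch([
    ///     (1_u64, Some((NonZeroU64::new(42).unwrap(), InlineArray::from(&b"user data"[..])))),
    ///     (2_u64, None),
    /// ])?;
    /// # Ok(())
    /// # }
    /// ```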
    pub fn insert_batch<I: IntoIterator<Item = (u64, Option<(NonZeroU64, InlineArray)>)>>(
        &self,
        batch: I,
    ) -> io::Result<()> {
        self.check_error()?;

        let batch_bytes = serialize_batch(batch);

        let mut log = self.inner.active_log.lock();

        if let Err(e) = maybe!(log.file.write_all(&batch_bytes)) {
            self.set_error(&e);
            return Err(e);
        }

        if let Err(e) = maybe!(log.file.sync_all()) {
            self.set_error(&e);
            return Err(e);
        }

        log.bytes_written += batch_bytes.len() as u64;

        if log.bytes_written
            > self
                .inner
                .snapshot_size
                .load(Ordering::Acquire)
                .max(64 * 1024)
        {
            let next_offset = log.log_sequence_number + 1;
            let next_path = log_path(&self.inner.storage_directory, next_offset);

            // open new log
            let mut next_log_file_opts = fs::OpenOptions::new();
            next_log_file_opts.create(true).read(true).write(true);

            let next_log_file = match maybe!(next_log_file_opts.open(next_path)) {
                Ok(nlf) => nlf,
                Err(e) => {
                    self.set_error(&e);
                    return Err(e);
                }
            };

            let next_log_and_stats = LogAndStats {
                file: next_log_file,
                log_sequence_number: next_offset,
                bytes_written: 0,
            };

            // replace log
            let old_log_and_stats = std::mem::replace(&mut *log, next_log_and_stats);

            // send to snapshot writer
            self.inner
                .worker_outbox
                .send(WorkerMessage::LogReadyToCompact {
                    log_and_stats: old_log_and_stats,
                })
                .expect("unable to send log to compact to worker");
        }

        Ok(())
    }
}

fn serialize_batch<I: IntoIterator<Item = (u64, Option<(NonZeroU64, InlineArray)>)>>(
    batch: I,
) -> Vec<u8> {
    // we initialize the vector to contain placeholder bytes for the frame length
    let batch_bytes = 0_u64.to_le_bytes().to_vec();

    // write format:
    //  8 byte LE frame length (the length in bytes of the compressed payload, not the item count)
    //  zstd-compressed payload, containing for each kv pair:
    //      8 byte LE key
    //      8 byte LE value (0 encodes a deletion)
    //      8 byte LE user data length
    //      user data bytes
    //  4 byte LE crc32 of the length header + compressed payload,
    //  XOR 0xAF to make it non-zero in the empty case
    let mut batch_encoder = ZstdEncoder::new(batch_bytes, ZSTD_LEVEL).unwrap();

    for (k, v_opt) in batch {
        batch_encoder.write_all(&k.to_le_bytes()).unwrap();
        if let Some((v, user_data)) = v_opt {
            batch_encoder.write_all(&v.get().to_le_bytes()).unwrap();

            let user_data_len: u64 = user_data.len() as u64;
            batch_encoder
                .write_all(&user_data_len.to_le_bytes())
                .unwrap();
            batch_encoder.write_all(&user_data).unwrap();
        } else {
            // v
            batch_encoder.write_all(&0_u64.to_le_bytes()).unwrap();
            // user data len
            batch_encoder.write_all(&0_u64.to_le_bytes()).unwrap();
        }
    }

    let mut batch_bytes = batch_encoder.finish().unwrap();

    let batch_len = batch_bytes.len().checked_sub(8).unwrap();
    batch_bytes[..8].copy_from_slice(&(batch_len as u64).to_le_bytes());

    let hash: u32 = crc32fast::hash(&batch_bytes) ^ 0xAF;
    let hash_bytes: [u8; 4] = hash.to_le_bytes();
    batch_bytes.extend_from_slice(&hash_bytes);

    batch_bytes
}

fn read_frame(
    file: &mut fs::File,
    reusable_frame_buffer: &mut Vec<u8>,
) -> io::Result<Vec<(u64, (u64, InlineArray))>> {
    let mut frame_size_buf: [u8; 8] = [0; 8];
    // TODO only break if UnexpectedEof, otherwise propagate
    fallible!(file.read_exact(&mut frame_size_buf));

    let len_u64: u64 = u64::from_le_bytes(frame_size_buf);
    // TODO make sure len < max len
    let len: usize = usize::try_from(len_u64).unwrap();

    reusable_frame_buffer.clear();
    reusable_frame_buffer.reserve(len + 12);
    unsafe {
        reusable_frame_buffer.set_len(len + 12);
    }
    reusable_frame_buffer[..8].copy_from_slice(&frame_size_buf);

    fallible!(file.read_exact(&mut reusable_frame_buffer[8..]));

    let crc_actual = crc32fast::hash(&reusable_frame_buffer[..len + 8]) ^ 0xAF;
    let crc_recorded = u32::from_le_bytes([
        reusable_frame_buffer[len + 8],
        reusable_frame_buffer[len + 9],
        reusable_frame_buffer[len + 10],
        reusable_frame_buffer[len + 11],
    ]);

    if crc_actual != crc_recorded {
        log::warn!("encountered incorrect crc for batch in log");
        return Err(annotate!(io::Error::new(
            io::ErrorKind::InvalidData,
            "crc mismatch for read of batch frame",
        )));
    }

    let mut ret = vec![];

    let mut decoder = ZstdDecoder::new(&reusable_frame_buffer[8..len + 8])
        .expect("failed to create zstd decoder");

    let mut k_buf: [u8; 8] = [0; 8];
    let mut v_buf: [u8; 8] = [0; 8];
    let mut user_data_len_buf: [u8; 8] = [0; 8];
    let mut user_data_buf = vec![];
    loop {
        let first_read_res = decoder.read_exact(&mut k_buf);
        if let Err(e) = first_read_res {
            if e.kind() != io::ErrorKind::UnexpectedEof {
                return Err(e);
            } else {
                break;
            }
        }
        decoder
            .read_exact(&mut v_buf)
            .expect("we expect reads from crc-verified buffers to succeed");
        decoder
            .read_exact(&mut user_data_len_buf)
            .expect("we expect reads from crc-verified buffers to succeed");

        let k = u64::from_le_bytes(k_buf);
        let v = u64::from_le_bytes(v_buf);

        let user_data_len_raw = u64::from_le_bytes(user_data_len_buf);
        let user_data_len = usize::try_from(user_data_len_raw).unwrap();
        user_data_buf.reserve(user_data_len);
        unsafe {
            user_data_buf.set_len(user_data_len);
        }

        decoder
            .read_exact(&mut user_data_buf)
            .expect("we expect reads from crc-verified buffers to succeed");

        let user_data = InlineArray::from(&*user_data_buf);

        ret.push((k, (v, user_data)));
    }

    Ok(ret)
}

// returns the deduplicated data in this log; reading stops at the first frame
// that fails to parse, which covers a torn write at the end of the log
fn read_log(directory_path: &Path, lsn: u64) -> io::Result<FnvHashMap<u64, (u64, InlineArray)>> {
    log::info!("reading log {lsn}");
    let mut ret = FnvHashMap::default();

    let mut file = fallible!(fs::File::open(log_path(directory_path, lsn)));

    let mut reusable_frame_buffer: Vec<u8> = vec![];

    while let Ok(frame) = read_frame(&mut file, &mut reusable_frame_buffer) {
        for (k, v) in frame.into_iter() {
            ret.insert(k, v);
        }
    }

    log::info!("recovered {} items in log {}", ret.len(), lsn);

    Ok(ret)
}

/// returns the data from the snapshot as well as the size of the snapshot
fn read_snapshot(
    directory_path: &Path,
    lsn: u64,
) -> io::Result<(FnvHashMap<u64, (NonZeroU64, InlineArray)>, u64)> {
    log::info!("reading snapshot {lsn}");
    let mut reusable_frame_buffer: Vec<u8> = vec![];
    let mut file = fallible!(fs::File::open(snapshot_path(directory_path, lsn, false)));
    let size = fallible!(file.metadata()).len();
    let raw_frame = read_frame(&mut file, &mut reusable_frame_buffer)?;

    let frame: FnvHashMap<u64, (NonZeroU64, InlineArray)> = raw_frame
        .into_iter()
        .map(|(k, (v, user_data))| (k, (NonZeroU64::new(v).unwrap(), user_data)))
        .collect();

    log::info!("recovered {} items in snapshot {}", frame.len(), lsn);

    Ok((frame, size))
}

fn log_path(directory_path: &Path, id: u64) -> PathBuf {
    directory_path.join(format!("{LOG_PREFIX}_{:016x}", id))
}

fn snapshot_path(directory_path: &Path, id: u64, temporary: bool) -> PathBuf {
    if temporary {
        directory_path.join(format!("{SNAPSHOT_PREFIX}_{:016x}{TMP_SUFFIX}", id))
    } else {
        directory_path.join(format!("{SNAPSHOT_PREFIX}_{:016x}", id))
    }
}

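// Scans the storage directory, removing incomplete snapshot rewrites and any
// artifacts superseded by the newest snapshot, and returns the log ids that
// still need to be applied along with that snapshot's id, if any.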
fn enumerate_logs_and_snapshot(directory_path: &Path) -> io::Result<(BTreeSet<u64>, Option<u64>)> {
    let mut logs = BTreeSet::new();
    let mut snapshot: Option<u64> = None;

    for dir_entry_res in fallible!(fs::read_dir(directory_path)) {
        let dir_entry = fallible!(dir_entry_res);
        let file_name = if let Ok(f) = dir_entry.file_name().into_string() {
            f
        } else {
            log::warn!(
                "skipping unexpected file with non-unicode name {:?}",
                dir_entry.file_name()
            );
            continue;
        };

        if file_name.ends_with(TMP_SUFFIX) {
            log::warn!("removing incomplete snapshot rewrite {file_name:?}");
            fallible!(fs::remove_file(directory_path.join(file_name)));
        } else if file_name.starts_with(LOG_PREFIX) {
            let start = LOG_PREFIX.len() + 1;
            let stop = start + 16;

            if let Ok(id) = u64::from_str_radix(&file_name[start..stop], 16) {
                logs.insert(id);
            } else {
                todo!()
            }
        } else if file_name.starts_with(SNAPSHOT_PREFIX) {
            let start = SNAPSHOT_PREFIX.len() + 1;
            let stop = start + 16;

            if let Ok(id) = u64::from_str_radix(&file_name[start..stop], 16) {
                if let Some(snap_id) = snapshot {
                    if snap_id < id {
                        log::warn!(
                            "removing stale snapshot {snap_id} that is superseded by snapshot {id}"
                        );

                        fallible!(fs::remove_file(snapshot_path(directory_path, snap_id, false)));

                        snapshot = Some(id);
                    } else {
                        log::warn!(
                            "removing stale snapshot {id} that is superseded by snapshot {snap_id}"
                        );

                        fallible!(fs::remove_file(directory_path.join(file_name)));
                    }
                } else {
                    snapshot = Some(id);
                }
            } else {
                todo!()
            }
        }
    }

    let snap_id = snapshot.unwrap_or(0);
    for stale_log_id in logs.range(..=snap_id) {
        let file_name = log_path(directory_path, *stale_log_id);

        log::warn!("removing stale log {file_name:?} that is contained within snapshot {snap_id}");

        fallible!(fs::remove_file(file_name));
    }
    logs.retain(|l| *l > snap_id);

    Ok((logs, snapshot))
}

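// Loads the previous snapshot (on a rayon thread) while the logs are read in
// parallel, merges the log entries over the snapshot contents, writes the
// merged result as a new snapshot at the highest applied log id (tmp file,
// fsync, rename, directory fsync), and finally removes the applied logs and
// the superseded snapshot.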
fn read_snapshot_and_apply_logs(
    path: &Path,
    log_ids: BTreeSet<u64>,
    snapshot_id_opt: Option<u64>,
) -> io::Result<Recovery> {
    let (snapshot_tx, snapshot_rx) = bounded(1);
    if let Some(snapshot_id) = snapshot_id_opt {
        let path: PathBuf = path.into();
        rayon::spawn(move || {
            let snap_res =
                read_snapshot(&path, snapshot_id).map(|(snapshot, _snapshot_len)| snapshot);
            snapshot_tx.send(snap_res).unwrap();
        });
    } else {
        snapshot_tx.send(Ok(Default::default())).unwrap();
    }

    let mut max_log_id = snapshot_id_opt.unwrap_or(0);

    let log_data_res: io::Result<Vec<(u64, FnvHashMap<u64, (u64, InlineArray)>)>> = (&log_ids)
        .into_par_iter()
        .map(move |log_id| {
            if let Some(snapshot_id) = snapshot_id_opt {
                assert!(*log_id > snapshot_id);
            }

            let log_datum = read_log(&path, *log_id)?;

            Ok((*log_id, log_datum))
        })
        .collect();

    let mut recovered: FnvHashMap<u64, (NonZeroU64, InlineArray)> = snapshot_rx.recv().unwrap()?;

    for (log_id, log_datum) in log_data_res? {
        max_log_id = max_log_id.max(log_id);

        for (k, (v, user_data)) in log_datum {
            if v == 0 {
                recovered.remove(&k);
            } else {
                recovered.insert(k, (NonZeroU64::new(v).unwrap(), user_data));
            }
        }
    }

    let mut recovered: Vec<(u64, NonZeroU64, InlineArray)> = recovered
        .into_iter()
        .map(|(k, (v, user_data))| (k, v, user_data))
        .collect();

    recovered.par_sort_unstable();

    // write fresh snapshot with recovered data
    let new_snapshot_data = serialize_batch(
        recovered
            .iter()
            .map(|(k, v, user_data)| (*k, Some((*v, user_data.clone())))),
    );
    let snapshot_size = new_snapshot_data.len() as u64;

    let new_snapshot_tmp_path = snapshot_path(path, max_log_id, true);
    log::info!("writing snapshot to {new_snapshot_tmp_path:?}");

    let mut snapshot_file_opts = fs::OpenOptions::new();
    snapshot_file_opts.create(true).read(false).write(true);

    let mut snapshot_file = fallible!(snapshot_file_opts.open(&new_snapshot_tmp_path));

    fallible!(snapshot_file.write_all(&new_snapshot_data));
    drop(new_snapshot_data);

    fallible!(snapshot_file.sync_all());

    let new_snapshot_path = snapshot_path(path, max_log_id, false);
    log::info!("renaming written snapshot to {new_snapshot_path:?}");
    fallible!(fs::rename(new_snapshot_tmp_path, new_snapshot_path));
    fallible!(fs::File::open(path).and_then(|directory| directory.sync_all()));

    for log_id in &log_ids {
        let log_path = log_path(path, *log_id);
        fallible!(fs::remove_file(log_path));
    }

    if let Some(old_snapshot_id) = snapshot_id_opt {
        // if no logs were applied, the "new" snapshot shares the old snapshot's
        // id and has just replaced it via rename, so it must not be deleted
        if old_snapshot_id != max_log_id {
            let old_snapshot_path = snapshot_path(path, old_snapshot_id, false);
            fallible!(fs::remove_file(old_snapshot_path));
        }
    }

    Ok(Recovery {
        recovered,
        id_for_next_log: max_log_id + 1,
        snapshot_size,
    })
}
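
// A minimal round-trip sketch of the framing above: it only checks that
// `serialize_batch` output can be read back by `read_frame`. It writes to a
// file in the OS temp directory to avoid an extra temp-file dev-dependency,
// and the keys, value, and user data are purely illustrative.
#[cfg(test)]
mod framing_round_trip {
    use super::*;

    #[test]
    fn serialize_then_read_frame() -> io::Result<()> {
        let path = std::env::temp_dir()
            .join(format!("metadata_store_frame_round_trip_{}", std::process::id()));

        let batch = [
            (
                1_u64,
                Some((NonZeroU64::new(10).unwrap(), InlineArray::from(&b"a"[..]))),
            ),
            (2_u64, None),
        ];
        let bytes = serialize_batch(batch);

        fs::write(&path, &bytes)?;
        let mut file = fs::File::open(&path)?;

        let mut reusable_frame_buffer = vec![];
        let frame = read_frame(&mut file, &mut reusable_frame_buffer)?;

        // a deletion is encoded as a zero value with empty user data
        assert_eq!(frame[0], (1, (10, InlineArray::from(&b"a"[..]))));
        assert_eq!(frame[1], (2, (0, InlineArray::from(&b""[..]))));

        fs::remove_file(path)
    }
}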