use std::collections::BTreeSet;
use std::fs;
use std::io::{self, Read, Write};
use std::num::NonZeroU64;
use std::path::{Path, PathBuf};
use std::sync::{
    atomic::{AtomicPtr, AtomicU64, Ordering},
    Arc,
};

use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use fault_injection::{annotate, fallible, maybe};
use fnv::FnvHashMap;
use inline_array::InlineArray;
use parking_lot::Mutex;
use rayon::prelude::*;
use zstd::stream::read::Decoder as ZstdDecoder;
use zstd::stream::write::Encoder as ZstdEncoder;
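// On-disk layout, as implemented below:
//
// * `log_<lsn as 16 hex digits>`: an append-only sequence of batch frames,
//   each framed as an 8-byte little-endian length, a zstd-compressed body,
//   and a 4-byte crc32 trailer covering both.
// * `snapshot_<lsn as 16 hex digits>`: a single frame containing the merged
//   contents of all logs up to and including `lsn`.
// * `*.tmp` files are incomplete snapshot rewrites, deleted during recovery.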
const WARN: &str = "DO_NOT_PUT_YOUR_FILES_HERE";
const TMP_SUFFIX: &str = ".tmp";
const LOG_PREFIX: &str = "log";
const SNAPSHOT_PREFIX: &str = "snapshot";

const ZSTD_LEVEL: i32 = 3;
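/// A crash-safe store for `(u64, NonZeroU64, InlineArray)` metadata entries,
/// built on an append-only write-ahead log that a background thread
/// periodically compacts into snapshots.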
pub struct MetadataStore {
    inner: Inner,
    is_shut_down: bool,
}

impl Drop for MetadataStore {
    fn drop(&mut self) {
        if self.is_shut_down {
            return;
        }

        self.shutdown_inner();
        self.is_shut_down = true;
    }
}
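/// The result of reading a snapshot and applying any newer logs on top of
/// it: the merged items, the lsn to use for the next log file, and the size
/// in bytes of the snapshot written during the merge.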
struct Recovery {
    recovered: Vec<(u64, NonZeroU64, InlineArray)>,
    id_for_next_log: u64,
    snapshot_size: u64,
}

/// The active log file along with its write statistics.
struct LogAndStats {
    file: fs::File,
    bytes_written: u64,
    log_sequence_number: u64,
}

enum WorkerMessage {
    Shutdown(Sender<()>),
    LogReadyToCompact { log_and_stats: LogAndStats },
}
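/// Wait for work on the worker channel, then drain anything else that is
/// queued. Returns the lsns of logs that are ready to be compacted,
/// `Err(Some(ack_sender))` if an orderly shutdown was requested, or
/// `Err(None)` if the channel disconnected unexpectedly.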
fn get_compactions(rx: &mut Receiver<WorkerMessage>) -> Result<Vec<u64>, Option<Sender<()>>> {
    let mut ret = vec![];

    match rx.recv() {
        Ok(WorkerMessage::Shutdown(tx)) => {
            return Err(Some(tx));
        }
        Ok(WorkerMessage::LogReadyToCompact { log_and_stats }) => {
            ret.push(log_and_stats.log_sequence_number);
        }
        Err(e) => {
            log::error!(
                "metadata store worker thread unable to receive message, unexpected shutdown: {e:?}"
            );
            return Err(None);
        }
    }

    loop {
        match rx.try_recv() {
            Ok(WorkerMessage::Shutdown(tx)) => {
                // hand the ack channel back to the caller instead of acking
                // here, so the shutdown ack is only sent once the worker has
                // released its shared state
                return Err(Some(tx));
            }
            Ok(WorkerMessage::LogReadyToCompact { log_and_stats }) => {
                ret.push(log_and_stats.log_sequence_number);
            }
            Err(_empty_or_disconnected) => return Ok(ret),
        }
    }
}
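/// Body of the background compaction thread: repeatedly collects rotated
/// logs from `insert_batch`, folds them into a new snapshot, and stops on
/// shutdown or after a global error has been set.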
fn worker(mut rx: Receiver<WorkerMessage>, mut last_snapshot_lsn: u64, inner: Inner) {
    loop {
        let err_ptr: *const (io::ErrorKind, String) = inner.global_error.load(Ordering::Acquire);

        if !err_ptr.is_null() {
            log::error!("compaction thread prematurely terminating after global error set");
            return;
        }

        match get_compactions(&mut rx) {
            Ok(log_ids) => {
                // logs must be compacted contiguously, starting at the log
                // that directly follows the last snapshot
                assert_eq!(log_ids[0], last_snapshot_lsn + 1);

                let write_res = read_snapshot_and_apply_logs(
                    &inner.storage_directory,
                    log_ids.into_iter().collect(),
                    Some(last_snapshot_lsn),
                );
                match write_res {
                    Err(e) => {
                        set_error(&inner.global_error, &e);
                        log::error!("log compactor thread encountered error - setting global fatal error and shutting down compactions: {e:?}");
                        return;
                    }
                    Ok(recovery) => {
                        inner
                            .snapshot_size
                            .store(recovery.snapshot_size, Ordering::Release);
                        last_snapshot_lsn = recovery.id_for_next_log.checked_sub(1).unwrap();
                    }
                }
            }
            Err(Some(tx)) => {
                // release our handle on shared state before acking shutdown,
                // so the ack guarantees the worker no longer holds resources
                drop(inner);
                if let Err(e) = tx.send(()) {
                    log::error!("log compactor failed to send shutdown ack to system: {e:?}");
                }
                return;
            }
            Err(None) => {
                return;
            }
        }
    }
}
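/// Publish a fatal error to the shared error slot. Only the first error
/// wins; a later error loses the race and its allocation is freed here.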
fn set_error(global_error: &AtomicPtr<(io::ErrorKind, String)>, error: &io::Error) {
    let kind = error.kind();
    let reason = error.to_string();

    let boxed = Box::new((kind, reason));
    let ptr = Box::into_raw(boxed);

    if global_error
        .compare_exchange(
            std::ptr::null_mut(),
            ptr,
            Ordering::SeqCst,
            Ordering::SeqCst,
        )
        .is_err()
    {
        // another thread already published the first error; keep that one
        // and reclaim our allocation
        unsafe {
            drop(Box::from_raw(ptr));
        }
    }
}
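/// State shared between the public `MetadataStore` handle and the
/// background compaction worker.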
#[derive(Clone)]
struct Inner {
    global_error: Arc<AtomicPtr<(io::ErrorKind, String)>>,
    active_log: Arc<Mutex<LogAndStats>>,
    snapshot_size: Arc<AtomicU64>,
    storage_directory: PathBuf,
    #[allow(unused)]
    directory_lock: Arc<fs::File>,
    worker_outbox: Sender<WorkerMessage>,
}

impl Drop for Inner {
    fn drop(&mut self) {
        // `Inner` is cloned for the worker thread, so this Drop runs once
        // per clone. Only the last clone may free the shared error
        // allocation; otherwise it would be freed more than once.
        if Arc::strong_count(&self.global_error) != 1 {
            return;
        }

        let error_ptr = self.global_error.load(Ordering::Acquire);
        if !error_ptr.is_null() {
            unsafe {
                drop(Box::from_raw(error_ptr));
            }
        }
    }
}

impl MetadataStore {
    pub fn shutdown(mut self) {
        self.shutdown_inner();
    }

    fn shutdown_inner(&mut self) {
        let (tx, rx) = bounded(1);
        if self
            .inner
            .worker_outbox
            .send(WorkerMessage::Shutdown(tx))
            .is_ok()
        {
            // wait for the worker's ack, guaranteeing it has terminated
            let _ = rx.recv();
        }

        self.set_error(&io::Error::new(
            io::ErrorKind::Other,
            "system has been shut down".to_string(),
        ));

        self.is_shut_down = true;
    }

    fn check_error(&self) -> io::Result<()> {
        let err_ptr: *const (io::ErrorKind, String) =
            self.inner.global_error.load(Ordering::Acquire);

        if err_ptr.is_null() {
            Ok(())
        } else {
            let deref: &(io::ErrorKind, String) = unsafe { &*err_ptr };
            Err(io::Error::new(deref.0, deref.1.clone()))
        }
    }

    fn set_error(&self, error: &io::Error) {
        set_error(&self.inner.global_error, error);
    }
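    /// Recover a `MetadataStore` from `storage_directory`, creating the
    /// directory if it does not exist, and return the store along with all
    /// recovered `(key, location, user_data)` items in ascending key order.
    ///
    /// A minimal usage sketch (the directory name and values are
    /// illustrative, not part of the API):
    ///
    /// ```ignore
    /// use std::num::NonZeroU64;
    ///
    /// let (store, recovered) = MetadataStore::recover("example_dir")?;
    ///
    /// // on first open, nothing has been recovered yet
    /// assert!(recovered.is_empty());
    ///
    /// // 42 is an arbitrary example location; `None` would delete the key
    /// let location = NonZeroU64::new(42).unwrap();
    /// store.insert_batch([(1, Some((location, b"user data".as_ref().into())))])?;
    /// store.shutdown();
    /// ```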
    pub fn recover<P: AsRef<Path>>(
        storage_directory: P,
    ) -> io::Result<(MetadataStore, Vec<(u64, NonZeroU64, InlineArray)>)> {
        use fs2::FileExt;

        let path = storage_directory.as_ref();

        // create the directory if it does not exist yet; any other read_dir
        // error will surface when the directory is opened below
        if let Err(e) = fs::read_dir(path) {
            if e.kind() == io::ErrorKind::NotFound {
                fallible!(fs::create_dir_all(path));
            }
        }

        let _ = fs::File::create(path.join(WARN));

        // take an exclusive advisory lock on the directory to prevent
        // concurrent opens of the same store
        let directory_lock = fallible!(fs::File::open(path));
        fallible!(directory_lock.try_lock_exclusive());

        let recovery = MetadataStore::recover_inner(&storage_directory)?;

        let new_log = LogAndStats {
            log_sequence_number: recovery.id_for_next_log,
            bytes_written: 0,
            file: fallible!(fs::File::create(log_path(path, recovery.id_for_next_log))),
        };

        let (tx, rx) = unbounded();

        let inner = Inner {
            snapshot_size: Arc::new(recovery.snapshot_size.into()),
            storage_directory: path.into(),
            directory_lock: Arc::new(directory_lock),
            global_error: Default::default(),
            active_log: Arc::new(Mutex::new(new_log)),
            worker_outbox: tx,
        };

        let worker_inner = inner.clone();
        std::thread::spawn(move || {
            worker(
                rx,
                recovery.id_for_next_log.checked_sub(1).unwrap(),
                worker_inner,
            )
        });

        Ok((
            MetadataStore {
                inner,
                is_shut_down: false,
            },
            recovery.recovered,
        ))
    }

    fn recover_inner<P: AsRef<Path>>(storage_directory: P) -> io::Result<Recovery> {
        let path = storage_directory.as_ref();
        log::debug!("opening u64db at {:?}", path);

        let (log_ids, snapshot_id_opt) = enumerate_logs_and_snapshot(path)?;

        read_snapshot_and_apply_logs(path, log_ids, snapshot_id_opt)
    }
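    /// Durably write a batch of updates to the active log, fsyncing it
    /// before returning. A `None` value is a deletion, encoded as zeroed
    /// location and user-data-length fields. When the active log outgrows
    /// the previous snapshot (with a 64k floor), it is rotated out and
    /// handed to the background worker for compaction.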
    pub fn insert_batch<I: IntoIterator<Item = (u64, Option<(NonZeroU64, InlineArray)>)>>(
        &self,
        batch: I,
    ) -> io::Result<()> {
        self.check_error()?;

        let batch_bytes = serialize_batch(batch);

        let mut log = self.inner.active_log.lock();

        if let Err(e) = maybe!(log.file.write_all(&batch_bytes)) {
            self.set_error(&e);
            return Err(e);
        }

        if let Err(e) = maybe!(log.file.sync_all()) {
            self.set_error(&e);
            return Err(e);
        }

        log.bytes_written += batch_bytes.len() as u64;

        // rotate the log once it outgrows the last snapshot (64k minimum)
        if log.bytes_written
            > self
                .inner
                .snapshot_size
                .load(Ordering::Acquire)
                .max(64 * 1024)
        {
            let next_offset = log.log_sequence_number + 1;
            let next_path = log_path(&self.inner.storage_directory, next_offset);

            let mut next_log_file_opts = fs::OpenOptions::new();
            next_log_file_opts.create(true).read(true).write(true);

            let next_log_file = match maybe!(next_log_file_opts.open(next_path)) {
                Ok(nlf) => nlf,
                Err(e) => {
                    self.set_error(&e);
                    return Err(e);
                }
            };

            let next_log_and_stats = LogAndStats {
                file: next_log_file,
                log_sequence_number: next_offset,
                bytes_written: 0,
            };

            let old_log_and_stats = std::mem::replace(&mut *log, next_log_and_stats);

            self.inner
                .worker_outbox
                .send(WorkerMessage::LogReadyToCompact {
                    log_and_stats: old_log_and_stats,
                })
                .expect("unable to send log to compact to worker");
        }

        Ok(())
    }
}
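/// Serialize a batch as one frame: an 8-byte little-endian length of the
/// zstd-compressed body, the body itself (a sequence of key, location,
/// user-data-length, and user-data fields), and a trailing 4-byte crc32
/// covering both the length header and the compressed body.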
fn serialize_batch<I: IntoIterator<Item = (u64, Option<(NonZeroU64, InlineArray)>)>>(
    batch: I,
) -> Vec<u8> {
    // reserve space for the 8-byte length header, filled in below once the
    // compressed length is known
    let batch_bytes = 0_u64.to_le_bytes().to_vec();

    let mut batch_encoder = ZstdEncoder::new(batch_bytes, ZSTD_LEVEL).unwrap();

    for (k, v_opt) in batch {
        batch_encoder.write_all(&k.to_le_bytes()).unwrap();
        if let Some((v, user_data)) = v_opt {
            batch_encoder.write_all(&v.get().to_le_bytes()).unwrap();

            let user_data_len: u64 = user_data.len() as u64;
            batch_encoder
                .write_all(&user_data_len.to_le_bytes())
                .unwrap();
            batch_encoder.write_all(&user_data).unwrap();
        } else {
            // a zeroed location and user-data length encode a deletion
            batch_encoder.write_all(&0_u64.to_le_bytes()).unwrap();
            batch_encoder.write_all(&0_u64.to_le_bytes()).unwrap();
        }
    }

    let mut batch_bytes = batch_encoder.finish().unwrap();

    // write the length as a fixed-width u64 so the header is 8 bytes on
    // every platform
    let batch_len = batch_bytes.len().checked_sub(8).unwrap();
    batch_bytes[..8].copy_from_slice(&(batch_len as u64).to_le_bytes());

    // the crc is xor'd with a constant so that an all-zero region on disk
    // can not masquerade as a valid frame
    let hash: u32 = crc32fast::hash(&batch_bytes) ^ 0xAF;
    let hash_bytes: [u8; 4] = hash.to_le_bytes();
    batch_bytes.extend_from_slice(&hash_bytes);

    batch_bytes
}
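/// Read a single frame written by `serialize_batch` from `file`, verifying
/// its crc before decompressing, and decode it into
/// `(key, (location, user_data))` records.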
fn read_frame(
    file: &mut fs::File,
    reusable_frame_buffer: &mut Vec<u8>,
) -> io::Result<Vec<(u64, (u64, InlineArray))>> {
    let mut frame_size_buf: [u8; 8] = [0; 8];
    fallible!(file.read_exact(&mut frame_size_buf));

    let len_u64: u64 = u64::from_le_bytes(frame_size_buf);
    let len: usize = usize::try_from(len_u64).unwrap();

    // hold the full frame in the reusable buffer: the 8-byte length header,
    // `len` compressed bytes, and the 4-byte crc trailer
    reusable_frame_buffer.clear();
    reusable_frame_buffer.resize(len + 12, 0);
    reusable_frame_buffer[..8].copy_from_slice(&frame_size_buf);

    fallible!(file.read_exact(&mut reusable_frame_buffer[8..]));

    let crc_actual = crc32fast::hash(&reusable_frame_buffer[..len + 8]) ^ 0xAF;
    let crc_recorded = u32::from_le_bytes([
        reusable_frame_buffer[len + 8],
        reusable_frame_buffer[len + 9],
        reusable_frame_buffer[len + 10],
        reusable_frame_buffer[len + 11],
    ]);

    if crc_actual != crc_recorded {
        log::warn!("encountered incorrect crc for batch in log");
        return Err(annotate!(io::Error::new(
            io::ErrorKind::InvalidData,
            "crc mismatch for read of batch frame",
        )));
    }

    let mut ret = vec![];

    let mut decoder = ZstdDecoder::new(&reusable_frame_buffer[8..len + 8])
        .expect("failed to create zstd decoder");

    let mut k_buf: [u8; 8] = [0; 8];
    let mut v_buf: [u8; 8] = [0; 8];
    let mut user_data_len_buf: [u8; 8] = [0; 8];
    let mut user_data_buf = vec![];
    loop {
        // a clean EOF on the key field means the frame is fully consumed
        let first_read_res = decoder.read_exact(&mut k_buf);
        if let Err(e) = first_read_res {
            if e.kind() != io::ErrorKind::UnexpectedEof {
                return Err(e);
            } else {
                break;
            }
        }
        decoder
            .read_exact(&mut v_buf)
            .expect("we expect reads from crc-verified buffers to succeed");
        decoder
            .read_exact(&mut user_data_len_buf)
            .expect("we expect reads from crc-verified buffers to succeed");

        let k = u64::from_le_bytes(k_buf);
        let v = u64::from_le_bytes(v_buf);

        let user_data_len_raw = u64::from_le_bytes(user_data_len_buf);
        let user_data_len = usize::try_from(user_data_len_raw).unwrap();
        user_data_buf.resize(user_data_len, 0);

        decoder
            .read_exact(&mut user_data_buf)
            .expect("we expect reads from crc-verified buffers to succeed");

        let user_data = InlineArray::from(&*user_data_buf);

        ret.push((k, (v, user_data)));
    }

    Ok(ret)
}
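/// Read all decodable frames from a log file, applying them in order so
/// that the last write to each key wins. The first unreadable frame, such
/// as a torn write at the tail after a crash, is treated as the end of the
/// log.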
fn read_log(directory_path: &Path, lsn: u64) -> io::Result<FnvHashMap<u64, (u64, InlineArray)>> {
    log::info!("reading log {lsn}");
    let mut ret = FnvHashMap::default();

    let mut file = fallible!(fs::File::open(log_path(directory_path, lsn)));

    let mut reusable_frame_buffer: Vec<u8> = vec![];

    while let Ok(frame) = read_frame(&mut file, &mut reusable_frame_buffer) {
        for (k, v) in frame.into_iter() {
            ret.insert(k, v);
        }
    }

    log::info!("recovered {} items in log {}", ret.len(), lsn);

    Ok(ret)
}
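/// Read a snapshot file (a single frame) and return its contents along
/// with its on-disk size, which seeds the log rotation threshold.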
fn read_snapshot(
    directory_path: &Path,
    lsn: u64,
) -> io::Result<(FnvHashMap<u64, (NonZeroU64, InlineArray)>, u64)> {
    log::info!("reading snapshot {lsn}");
    let mut reusable_frame_buffer: Vec<u8> = vec![];
    let mut file = fallible!(fs::File::open(snapshot_path(directory_path, lsn, false)));
    let size = fallible!(file.metadata()).len();
    let raw_frame = read_frame(&mut file, &mut reusable_frame_buffer)?;

    // snapshots are written from fully merged data, so they never contain
    // tombstones and every location is non-zero
    let frame: FnvHashMap<u64, (NonZeroU64, InlineArray)> = raw_frame
        .into_iter()
        .map(|(k, (v, user_data))| (k, (NonZeroU64::new(v).unwrap(), user_data)))
        .collect();

    log::info!("recovered {} items in snapshot {}", frame.len(), lsn);

    Ok((frame, size))
}

fn log_path(directory_path: &Path, id: u64) -> PathBuf {
    directory_path.join(format!("{LOG_PREFIX}_{:016x}", id))
}

fn snapshot_path(directory_path: &Path, id: u64, temporary: bool) -> PathBuf {
    if temporary {
        directory_path.join(format!("{SNAPSHOT_PREFIX}_{:016x}{TMP_SUFFIX}", id))
    } else {
        directory_path.join(format!("{SNAPSHOT_PREFIX}_{:016x}", id))
    }
}
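/// Scan the storage directory and return the lsns of logs newer than the
/// most recent snapshot, along with that snapshot's lsn. Superseded files
/// (incomplete `.tmp` rewrites, stale snapshots, and logs already covered
/// by the snapshot) are deleted as they are encountered.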
fn enumerate_logs_and_snapshot(directory_path: &Path) -> io::Result<(BTreeSet<u64>, Option<u64>)> {
    let mut logs = BTreeSet::new();
    let mut snapshot: Option<u64> = None;

    for dir_entry_res in fallible!(fs::read_dir(directory_path)) {
        let dir_entry = fallible!(dir_entry_res);
        let file_name = if let Ok(f) = dir_entry.file_name().into_string() {
            f
        } else {
            log::warn!(
                "skipping unexpected file with non-unicode name {:?}",
                dir_entry.file_name()
            );
            continue;
        };

        if file_name.ends_with(TMP_SUFFIX) {
            log::warn!("removing incomplete snapshot rewrite {file_name:?}");
            fallible!(fs::remove_file(directory_path.join(file_name)));
        } else if file_name.starts_with(LOG_PREFIX) {
            let start = LOG_PREFIX.len() + 1;
            let stop = start + 16;

            let id_res = file_name
                .get(start..stop)
                .map(|s| u64::from_str_radix(s, 16));

            if let Some(Ok(id)) = id_res {
                logs.insert(id);
            } else {
                log::warn!("skipping log file with unexpected name {file_name:?}");
            }
        } else if file_name.starts_with(SNAPSHOT_PREFIX) {
            let start = SNAPSHOT_PREFIX.len() + 1;
            let stop = start + 16;

            let id_res = file_name
                .get(start..stop)
                .map(|s| u64::from_str_radix(s, 16));

            if let Some(Ok(id)) = id_res {
                if let Some(snap_id) = snapshot {
                    if snap_id < id {
                        log::warn!(
                            "removing stale snapshot {snap_id} that is superseded by snapshot {id}"
                        );

                        fallible!(fs::remove_file(snapshot_path(
                            directory_path,
                            snap_id,
                            false
                        )));

                        snapshot = Some(id);
                    } else {
                        log::warn!(
                            "removing stale snapshot {id} that is superseded by snapshot {snap_id}"
                        );

                        fallible!(fs::remove_file(directory_path.join(file_name)));
                    }
                } else {
                    snapshot = Some(id);
                }
            } else {
                log::warn!("skipping snapshot file with unexpected name {file_name:?}");
            }
        }
    }

    let snap_id = snapshot.unwrap_or(0);
    for stale_log_id in logs.range(..=snap_id) {
        let file_name = log_path(directory_path, *stale_log_id);

        log::warn!("removing stale log {file_name:?} that is contained within snapshot {snap_id}");

        fallible!(fs::remove_file(file_name));
    }
    logs.retain(|l| *l > snap_id);

    Ok((logs, snapshot))
}
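/// Shared recovery and compaction path: read the previous snapshot (if any)
/// concurrently with all logs, merge the log updates into it, write the
/// result as a new snapshot named after the highest lsn involved, and only
/// after the new snapshot and directory are fsynced, delete the consumed
/// logs and the superseded snapshot.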
fn read_snapshot_and_apply_logs(
    path: &Path,
    log_ids: BTreeSet<u64>,
    snapshot_id_opt: Option<u64>,
) -> io::Result<Recovery> {
    // read the snapshot in the background while the logs are read in parallel
    let (snapshot_tx, snapshot_rx) = bounded(1);
    if let Some(snapshot_id) = snapshot_id_opt {
        let path: PathBuf = path.into();
        rayon::spawn(move || {
            let snap_res =
                read_snapshot(&path, snapshot_id).map(|(snapshot, _snapshot_len)| snapshot);
            snapshot_tx.send(snap_res).unwrap();
        });
    } else {
        snapshot_tx.send(Ok(Default::default())).unwrap();
    }

    let mut max_log_id = snapshot_id_opt.unwrap_or(0);

    let log_data_res: io::Result<Vec<(u64, FnvHashMap<u64, (u64, InlineArray)>)>> = (&log_ids)
        .into_par_iter()
        .map(move |log_id| {
            if let Some(snapshot_id) = snapshot_id_opt {
                assert!(*log_id > snapshot_id);
            }

            let log_datum = read_log(path, *log_id)?;

            Ok((*log_id, log_datum))
        })
        .collect();

    let mut recovered: FnvHashMap<u64, (NonZeroU64, InlineArray)> = snapshot_rx.recv().unwrap()?;

    // apply logs in ascending lsn order so that later writes win
    for (log_id, log_datum) in log_data_res? {
        max_log_id = max_log_id.max(log_id);

        for (k, (v, user_data)) in log_datum {
            if v == 0 {
                // a zeroed location is a tombstone
                recovered.remove(&k);
            } else {
                recovered.insert(k, (NonZeroU64::new(v).unwrap(), user_data));
            }
        }
    }

    let mut recovered: Vec<(u64, NonZeroU64, InlineArray)> = recovered
        .into_iter()
        .map(|(k, (v, user_data))| (k, v, user_data))
        .collect();

    recovered.par_sort_unstable();

    let new_snapshot_data = serialize_batch(
        recovered
            .iter()
            .map(|(k, v, user_data)| (*k, Some((*v, user_data.clone())))),
    );
    let snapshot_size = new_snapshot_data.len() as u64;

    // write the new snapshot to a temporary file first, so that a crash
    // mid-write can never destroy the previous snapshot
    let new_snapshot_tmp_path = snapshot_path(path, max_log_id, true);
    log::info!("writing snapshot to {new_snapshot_tmp_path:?}");

    let mut snapshot_file_opts = fs::OpenOptions::new();
    snapshot_file_opts.create(true).read(false).write(true);

    let mut snapshot_file = fallible!(snapshot_file_opts.open(&new_snapshot_tmp_path));

    fallible!(snapshot_file.write_all(&new_snapshot_data));
    drop(new_snapshot_data);

    fallible!(snapshot_file.sync_all());

    let new_snapshot_path = snapshot_path(path, max_log_id, false);
    log::info!("renaming written snapshot to {new_snapshot_path:?}");
    fallible!(fs::rename(new_snapshot_tmp_path, new_snapshot_path));
    fallible!(fs::File::open(path).and_then(|directory| directory.sync_all()));

    // the new snapshot is durable, so the files it subsumes can be removed
    for log_id in &log_ids {
        let log_path = log_path(path, *log_id);
        fallible!(fs::remove_file(log_path));
    }

    if let Some(old_snapshot_id) = snapshot_id_opt {
        // if no logs were applied, the new snapshot reused the old lsn and
        // must not be deleted out from under ourselves
        if old_snapshot_id != max_log_id {
            let old_snapshot_path = snapshot_path(path, old_snapshot_id, false);
            fallible!(fs::remove_file(old_snapshot_path));
        }
    }

    Ok(Recovery {
        recovered,
        id_for_next_log: max_log_id + 1,
        snapshot_size,
    })
}