#![cfg_attr(test, feature(no_coverage))]

use std::collections::BTreeMap;
use std::fs;
use std::io::{self, prelude::*, BufReader, BufWriter, Result};
use std::path::{Path, PathBuf};
use std::sync::{
    atomic::{AtomicU64, Ordering},
    mpsc, Arc,
};

const SSTABLE_DIR: &str = "sstables";
const U64_SZ: usize = std::mem::size_of::<u64>();

#[derive(Debug, Clone, Copy)]
#[cfg_attr(
    test,
    derive(serde::Serialize, serde::Deserialize, fuzzcheck::DefaultMutator)
)]
pub struct Config {
    /// If the estimated (decompressed) size of all sstables exceeds this
    /// multiple of the in-memory database size, all sstables are merged
    /// into a single sstable.
    pub max_space_amp: u8,
    /// Once this many bytes have been written to the log since the last
    /// flush, the memtable is written out as a new sstable and the log is
    /// truncated.
    pub max_log_length: usize,
    /// When scanning for a window of sstables to merge, every sstable
    /// after the first must be larger than roughly 1/merge_ratio of the
    /// first sstable's size for the window to qualify.
    pub merge_ratio: u8,
    /// The number of contiguous sstables considered at a time when
    /// looking for a merge candidate window (a minimum of 2 is enforced).
    pub merge_window: u8,
    /// Capacity in bytes of the `BufWriter` wrapping the log file.
    pub log_bufwriter_size: u32,
    /// zstd compression level used when writing sstables, clamped to the
    /// maximum level supported by the linked zstd library.
    pub zstd_sstable_compression_level: u8,
}

impl Default for Config {
    fn default() -> Config {
        Config {
            max_space_amp: 2,
            max_log_length: 32 * 1024 * 1024,
            merge_ratio: 3,
            merge_window: 10,
            log_bufwriter_size: 32 * 1024,
            zstd_sstable_compression_level: 3,
        }
    }
}

// IO counters updated by the background compaction worker and drained
// into `Stats` by `Lsm::stats`.
struct WorkerStats {
    read_bytes: AtomicU64,
    written_bytes: AtomicU64,
}

#[derive(Debug, Clone, Copy)]
pub struct Stats {
    pub resident_bytes: u64,
    pub on_disk_bytes: u64,
    pub logged_bytes: u64,
    pub written_bytes: u64,
    pub read_bytes: u64,
    pub space_amp: f64,
    pub write_amp: f64,
}

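// Both the log and sstables store each record with a little-endian CRC32
// computed over the presence discriminant, the key bytes, and the value
// bytes (tombstones are hashed as V zero bytes). The result is XOR'd with
// 0xFF before being written, matching the check on the read paths below.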
fn hash<const K: usize, const V: usize>(k: &[u8; K], v: &Option<[u8; V]>) -> u32 {
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(&[v.is_some() as u8]);
    hasher.update(&*k);

    if let Some(v) = v {
        hasher.update(v);
    } else {
        hasher.update(&[0; V]);
    }

    hasher.finalize() ^ 0xFF
}

#[inline]
fn hash_batch_len(len: usize) -> u32 {
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(&(len as u64).to_le_bytes());

    hasher.finalize() ^ 0xFF
}

enum WorkerMessage {
    NewSST { id: u64, sst_sz: u64, db_sz: u64 },
    Stop(mpsc::Sender<()>),
    Heartbeat(mpsc::Sender<()>),
}

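// Background compaction worker. It owns the authoritative map of sstable
// ids to their approximate sizes, receives `WorkerMessage`s from the `Lsm`
// handle, and attempts sstable merges after every message it processes.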
struct Worker<const K: usize, const V: usize> {
    sstable_directory: BTreeMap<u64, u64>,
    inbox: mpsc::Receiver<WorkerMessage>,
    db_sz: u64,
    path: PathBuf,
    config: Config,
    stats: Arc<WorkerStats>,
}

impl<const K: usize, const V: usize> Worker<K, V> {
    #[cfg(not(test))]
    fn run(mut self) {
        while self.tick() {}
        log::info!("tiny-lsm compaction worker quitting");
    }

    fn tick(&mut self) -> bool {
        match self.inbox.recv() {
            Ok(message) => {
                if !self.handle_message(message) {
                    return false;
                }
            }
            Err(mpsc::RecvError) => {
                return false;
            }
        }

        if let Err(e) = self.sstable_maintenance() {
            log::error!(
                "error while compacting sstables \
                in the background: {:?}",
                e
            );
        }

        true
    }

    fn handle_message(&mut self, message: WorkerMessage) -> bool {
        match message {
            WorkerMessage::NewSST { id, sst_sz, db_sz } => {
                self.db_sz = db_sz;
                self.sstable_directory.insert(id, sst_sz);
                true
            }
            WorkerMessage::Stop(dropper) => {
                drop(dropper);
                false
            }
            WorkerMessage::Heartbeat(dropper) => {
                drop(dropper);
                true
            }
        }
    }

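    // Two compaction triggers:
    // 1. if the summed sstable sizes exceed `max_space_amp` times the
    //    in-memory size, compact everything into a single sstable;
    // 2. otherwise, look for a window of `merge_window` contiguous
    //    sstables in which no sstable after the first is smaller than
    //    roughly 1/`merge_ratio` of the first one, and merge that window.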
    fn sstable_maintenance(&mut self) -> Result<()> {
        let on_disk_size: u64 = self.sstable_directory.values().sum();

        log::debug!("disk size: {} mem size: {}", on_disk_size, self.db_sz);
        if self.sstable_directory.len() > 1
            && on_disk_size / (self.db_sz + 1) > self.config.max_space_amp as u64
        {
            log::debug!(
                "performing full compaction, decompressed on-disk \
                database size has grown beyond {}x the in-memory size",
                self.config.max_space_amp
            );
            let run_to_compact: Vec<u64> = self.sstable_directory.keys().copied().collect();

            self.compact_sstable_run(&run_to_compact)?;
            return Ok(());
        }

        if self.sstable_directory.len() < self.config.merge_window.max(2) as usize {
            return Ok(());
        }

        for window in self
            .sstable_directory
            .iter()
            .collect::<Vec<_>>()
            .windows(self.config.merge_window.max(2) as usize)
        {
            if window
                .iter()
                .skip(1)
                .all(|w| *w.1 * self.config.merge_ratio as u64 > *window[0].1)
            {
                let run_to_compact: Vec<u64> = window.iter().map(|(id, _sum)| **id).collect();

                self.compact_sstable_run(&run_to_compact)?;
                return Ok(());
            }
        }

        Ok(())
    }

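    // Merges a run of sstables: reads them in ascending id order into a
    // single map (later tables win on key collisions, and tombstones are
    // preserved), writes the result under the largest input id via a
    // "-tmp" file that is renamed into place, then deletes the inputs.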
    fn compact_sstable_run(&mut self, sstable_ids: &[u64]) -> Result<()> {
        log::debug!(
            "trying to compact sstable_ids {:?}",
            sstable_ids
                .iter()
                .map(|id| id_format(*id))
                .collect::<Vec<_>>()
        );

        let mut map = BTreeMap::new();

        let mut read_pairs = 0;

        for sstable_id in sstable_ids {
            for (k, v) in read_sstable::<K, V>(&self.path, *sstable_id)? {
                map.insert(k, v);
                read_pairs += 1;
            }
        }

        self.stats
            .read_bytes
            .fetch_add(read_pairs * (4 + 1 + K + V) as u64, Ordering::Relaxed);

        let sst_id = sstable_ids
            .iter()
            .max()
            .expect("compact_sstable_run called with empty set of sst ids");

        write_sstable(&self.path, *sst_id, &map, true, &self.config)?;

        self.stats
            .written_bytes
            .fetch_add(map.len() as u64 * (4 + 1 + K + V) as u64, Ordering::Relaxed);

        let sst_sz = map.len() as u64 * (4 + K + V) as u64;
        self.sstable_directory.insert(*sst_id, sst_sz);

        log::debug!("compacted range into sstable {}", id_format(*sst_id));

        for sstable_id in sstable_ids {
            if sstable_id == sst_id {
                continue;
            }
            fs::remove_file(self.path.join(SSTABLE_DIR).join(id_format(*sstable_id)))?;
            self.sstable_directory
                .remove(sstable_id)
                .expect("compacted sst not present in sstable_directory");
        }
        fs::File::open(self.path.join(SSTABLE_DIR))?.sync_all()?;

        Ok(())
    }
}

fn id_format(id: u64) -> String {
    format!("{:016x}", id)
}

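// Scans the sstable directory, mapping each sstable's hex-encoded id to
// its on-disk (compressed) length. When `remove_tmp` is set, leftover
// "-tmp" files from interrupted compactions are deleted.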
fn list_sstables(path: &Path, remove_tmp: bool) -> Result<BTreeMap<u64, u64>> {
    let mut sstable_map = BTreeMap::new();

    for dir_entry_res in fs::read_dir(path.join(SSTABLE_DIR))? {
        let dir_entry = dir_entry_res?;
        let file_name = if let Ok(f) = dir_entry.file_name().into_string() {
            f
        } else {
            continue;
        };

        if let Ok(id) = u64::from_str_radix(&file_name, 16) {
            let metadata = dir_entry.metadata()?;

            sstable_map.insert(id, metadata.len());
        } else if remove_tmp && file_name.ends_with("-tmp") {
            log::warn!("removing incomplete sstable rewrite {}", file_name);
            fs::remove_file(path.join(SSTABLE_DIR).join(file_name))?;
        }
    }

    Ok(sstable_map)
}

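// Sstable layout, zstd-compressed as a single stream:
//   * a u64 little-endian count of items
//   * per item: a 4-byte CRC32 (see `hash`), a 1-byte presence
//     discriminant, K key bytes, then V value bytes (zeroed for
//     tombstones).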
fn write_sstable<const K: usize, const V: usize>(
    path: &Path,
    id: u64,
    items: &BTreeMap<[u8; K], Option<[u8; V]>>,
    tmp_mv: bool,
    config: &Config,
) -> Result<()> {
    let sst_dir_path = path.join(SSTABLE_DIR);
    let sst_path = if tmp_mv {
        sst_dir_path.join(format!("{:x}-tmp", id))
    } else {
        sst_dir_path.join(id_format(id))
    };

    let file = fs::OpenOptions::new()
        .create(true)
        .write(true)
        .open(&sst_path)?;

    let max_zstd_level = zstd::compression_level_range();
    let zstd_level = config
        .zstd_sstable_compression_level
        .min(*max_zstd_level.end() as u8);

    let mut bw =
        BufWriter::new(zstd::Encoder::new(file, zstd_level as _).expect("zstd encoder failure"));

    bw.write_all(&(items.len() as u64).to_le_bytes())?;

    for (k, v) in items {
        let crc: u32 = hash(k, v);
        bw.write_all(&crc.to_le_bytes())?;
        bw.write_all(&[v.is_some() as u8])?;
        bw.write_all(k)?;

        if let Some(v) = v {
            bw.write_all(v)?;
        } else {
            bw.write_all(&[0; V])?;
        }
    }

    bw.flush()?;

    bw.get_mut().get_mut().sync_all()?;
    fs::File::open(path.join(SSTABLE_DIR))?.sync_all()?;

    if tmp_mv {
        let new_path = sst_dir_path.join(id_format(id));
        fs::rename(sst_path, new_path)?;
    }

    Ok(())
}

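// Reads an sstable back, verifying each record's CRC. Reading stops at the
// first torn or corrupt record, and a warning is logged if fewer records
// were recovered than the header promised.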
fn read_sstable<const K: usize, const V: usize>(
    path: &Path,
    id: u64,
) -> Result<Vec<([u8; K], Option<[u8; V]>)>> {
    let file = fs::OpenOptions::new()
        .read(true)
        .open(path.join(SSTABLE_DIR).join(id_format(id)))?;

    let mut reader = zstd::Decoder::new(BufReader::with_capacity(16 * 1024 * 1024, file))?;

    let mut buf = vec![0; 4 + 1 + K + V];

    let len_buf = &mut [0; 8];

    reader.read_exact(len_buf)?;

    let expected_len: u64 = u64::from_le_bytes(*len_buf);
    let mut sstable = Vec::with_capacity(expected_len as usize);

    while let Ok(()) = reader.read_exact(&mut buf) {
        let crc_expected: u32 = u32::from_le_bytes(buf[0..4].try_into().unwrap());
        let d: bool = match buf[4] {
            0 => false,
            1 => true,
            _ => {
                log::warn!("detected torn-write while reading sstable {:016x}", id);
                break;
            }
        };
        let k: [u8; K] = buf[5..K + 5].try_into().unwrap();
        let v: Option<[u8; V]> = if d {
            Some(buf[K + 5..5 + K + V].try_into().unwrap())
        } else {
            None
        };
        let crc_actual: u32 = hash(&k, &v);

        if crc_expected != crc_actual {
            log::warn!("detected torn-write while reading sstable {:016x}", id);
            break;
        }

        sstable.push((k, v));
    }

    if sstable.len() as u64 != expected_len {
        log::warn!(
            "sstable {:016x} tear detected - process probably crashed \
            before full sstable could be written out",
            id
        );
    }

    Ok(sstable)
}

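/// A simple LSM tree for fixed-size keys and values.
///
/// Reads are served from an in-memory `BTreeMap` exposed through `Deref`,
/// so they never touch disk. Writes are appended to a log and applied to
/// the in-memory state; once the log grows past `Config::max_log_length`,
/// the memtable is flushed to a compressed sstable and compaction happens
/// on a background thread.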
pub struct Lsm<const K: usize, const V: usize> {
    // Entries written to the log since the last sstable flush.
    memtable: BTreeMap<[u8; K], Option<[u8; V]>>,
    // The fully materialized view that reads are served from via `Deref`.
    db: BTreeMap<[u8; K], [u8; V]>,
    worker_outbox: mpsc::Sender<WorkerMessage>,
    next_sstable_id: u64,
    dirty_bytes: usize,
    #[cfg(test)]
    worker: Worker<K, V>,
    #[cfg(test)]
    pub log: tearable::Tearable<fs::File>,
    #[cfg(not(test))]
    log: BufWriter<fs::File>,
    path: PathBuf,
    config: Config,
    stats: Stats,
    worker_stats: Arc<WorkerStats>,
}

impl<const K: usize, const V: usize> Drop for Lsm<K, V> {
    fn drop(&mut self) {
        let (tx, rx) = mpsc::channel();

        if self.worker_outbox.send(WorkerMessage::Stop(tx)).is_err() {
            log::error!("failed to shut down compaction worker on Lsm drop");
            return;
        }

        #[cfg(test)]
        assert!(!self.worker.tick());

        for _ in rx {}
    }
}

impl<const K: usize, const V: usize> std::ops::Deref for Lsm<K, V> {
    type Target = BTreeMap<[u8; K], [u8; V]>;

    fn deref(&self) -> &Self::Target {
        &self.db
    }
}

impl<const K: usize, const V: usize> Lsm<K, V> {
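    /// Recovers the database at the given path with `Config::default()`,
    /// creating the directory structure if it does not exist yet.
    ///
    /// A minimal usage sketch (illustrative only: the crate name
    /// `tiny_lsm`, the path, and the key/value sizes below are assumptions,
    /// not taken from the original docs):
    ///
    /// ```ignore
    /// use tiny_lsm::Lsm;
    ///
    /// fn main() -> std::io::Result<()> {
    ///     // 8-byte keys and 8-byte values; any fixed sizes work.
    ///     let mut lsm: Lsm<8, 8> = Lsm::recover("/tmp/tiny_lsm_example")?;
    ///     lsm.insert([0; 8], [42; 8])?;
    ///     // Reads go through `Deref<Target = BTreeMap<..>>`.
    ///     assert_eq!(lsm.get(&[0; 8]), Some(&[42; 8]));
    ///     Ok(())
    /// }
    /// ```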
    pub fn recover<P: AsRef<Path>>(p: P) -> Result<Lsm<K, V>> {
        Lsm::recover_with_config(p, Config::default())
    }

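    /// Like `recover`, but with an explicit `Config`.
    ///
    /// A sketch of overriding a single knob (the values shown are
    /// illustrative assumptions, not recommendations):
    ///
    /// ```ignore
    /// use tiny_lsm::{Config, Lsm};
    ///
    /// fn main() -> std::io::Result<()> {
    ///     let config = Config {
    ///         // flush to an sstable after ~4 MiB of logged writes
    ///         max_log_length: 4 * 1024 * 1024,
    ///         ..Config::default()
    ///     };
    ///     let mut lsm: Lsm<16, 8> = Lsm::recover_with_config("/tmp/tiny_lsm_cfg", config)?;
    ///     lsm.insert([1; 16], [7; 8])?;
    ///     Ok(())
    /// }
    /// ```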
    pub fn recover_with_config<P: AsRef<Path>>(p: P, config: Config) -> Result<Lsm<K, V>> {
        let path = p.as_ref();
        if !path.exists() {
            fs::create_dir_all(path)?;
            fs::create_dir(path.join(SSTABLE_DIR))?;
            fs::File::open(path.join(SSTABLE_DIR))?.sync_all()?;
            fs::File::open(path)?.sync_all()?;
            let mut parent_opt = path.parent();

            while let Some(parent) = parent_opt {
                if parent.file_name().is_none() {
                    break;
                }
                if fs::File::open(parent).and_then(|f| f.sync_all()).is_err() {
                    break;
                }
                parent_opt = parent.parent();
            }
        }

        let sstable_directory = list_sstables(path, true)?;

        let mut db = BTreeMap::new();
        for sstable_id in sstable_directory.keys() {
            for (k, v) in read_sstable::<K, V>(path, *sstable_id)? {
                if let Some(v) = v {
                    db.insert(k, v);
                } else {
                    db.remove(&k);
                }
            }
        }

        let max_sstable_id = sstable_directory.keys().next_back().copied();

        let log = fs::OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            .open(path.join("log"))?;

        let mut reader = BufReader::new(log);

        // Every log record is 5 header bytes (4-byte CRC + 1-byte
        // discriminant) followed by a fixed-size, zero-padded tuple.
        let tuple_sz = U64_SZ.max(K + V);
        let header_sz = 5;
        let header_tuple_sz = header_sz + tuple_sz;
        let mut buf = vec![0; header_tuple_sz];

        let mut memtable = BTreeMap::new();
        let mut recovered = 0;

        // While recovering a batch, tracks (entries so far, entries still
        // expected, bytes consumed by the batch so far).
        let mut write_batch: Option<(_, usize, u64)> = None;
        while let Ok(()) = reader.read_exact(&mut buf) {
            let crc_expected: u32 = u32::from_le_bytes(buf[0..4].try_into().unwrap());
            let d: bool = match buf[4] {
                0 => false,
                1 => true,
                2 if write_batch.is_none() => {
                    // A batch header: the tuple holds the number of entries
                    // that follow instead of a kv pair.
                    let batch_sz_buf: [u8; 8] = buf[5..5 + 8].try_into().unwrap();
                    let batch_sz: u64 = u64::from_le_bytes(batch_sz_buf);
                    log::debug!("processing batch of len {}", batch_sz);

                    let crc_actual = hash_batch_len(usize::try_from(batch_sz).unwrap());
                    if crc_expected != crc_actual {
                        log::warn!("crc mismatch for batch size marker");
                        break;
                    }

                    if !buf[5 + U64_SZ..].iter().all(|e| *e == 0) {
                        log::warn!(
                            "expected all pad bytes after logged \
                            batch manifests to be zero, but some \
                            corruption was detected"
                        );
                        break;
                    }

                    if batch_sz > usize::MAX as u64 {
                        return Err(io::Error::new(
                            io::ErrorKind::InvalidInput,
                            "recovering a batch size over usize::MAX is not supported",
                        ));
                    }

                    let wb_remaining = batch_sz as usize;
                    let wb_recovered = buf.len() as u64;

                    if wb_remaining > 0 {
                        write_batch = Some((
                            Vec::with_capacity(batch_sz as usize),
                            wb_remaining,
                            wb_recovered,
                        ));
                    } else {
                        recovered += buf.len() as u64;
                    }

                    continue;
                }
                _ => {
                    log::warn!("invalid log message discriminant detected: {}", buf[4]);
                    break;
                }
            };
            let k: [u8; K] = buf[5..5 + K].try_into().unwrap();
            let v: Option<[u8; V]> = if d {
                Some(buf[5 + K..5 + K + V].try_into().unwrap())
            } else {
                None
            };

            let crc_actual: u32 = hash(&k, &v);

            if crc_expected != crc_actual {
                log::warn!(
                    "crc mismatch for kv pair {:?}-{:?}: expected {} actual {}, torn log detected",
                    k,
                    v,
                    crc_expected,
                    crc_actual
                );
                break;
            }

            let pad_start = if v.is_some() { 5 + K + V } else { 5 + K };

            if !buf[pad_start..].iter().all(|e| *e == 0) {
                log::warn!(
                    "expected all pad bytes for logged kv entries \
                    to be zero, but some corruption was detected"
                );
                break;
            }

            if let Some((mut wb, mut wb_remaining, mut wb_recovered)) = write_batch.take() {
                wb.push((k, v));
                wb_remaining = wb_remaining.checked_sub(1).unwrap();
                wb_recovered = wb_recovered.checked_add(buf.len() as u64).unwrap();

                if wb_remaining == 0 {
                    // The batch is complete, so it can be applied atomically
                    // during recovery.
                    for (k, v) in wb {
                        memtable.insert(k, v);

                        if let Some(v) = v {
                            db.insert(k, v);
                        } else {
                            db.remove(&k);
                        }
                    }
                    recovered += wb_recovered;
                } else {
                    write_batch = Some((wb, wb_remaining, wb_recovered));
                }
            } else {
                memtable.insert(k, v);

                if let Some(v) = v {
                    db.insert(k, v);
                } else {
                    db.remove(&k);
                }

                recovered += buf.len() as u64;
            }
        }

        log::debug!("recovered {} kv pairs", db.len());
        log::debug!("rewinding log down to length {}", recovered);
        // Truncate away any torn or partially applied suffix of the log.
        let log_file = reader.get_mut();
        log_file.seek(io::SeekFrom::Start(recovered))?;
        log_file.set_len(recovered)?;
        log_file.sync_all()?;
        fs::File::open(path.join(SSTABLE_DIR))?.sync_all()?;

        let (tx, rx) = mpsc::channel();

        let worker_stats = Arc::new(WorkerStats {
            read_bytes: 0.into(),
            written_bytes: 0.into(),
        });

        let worker: Worker<K, V> = Worker {
            path: path.into(),
            sstable_directory,
            inbox: rx,
            db_sz: db.len() as u64 * (K + V) as u64,
            config,
            stats: worker_stats.clone(),
        };

        #[cfg(not(test))]
        std::thread::spawn(move || worker.run());

        let (hb_tx, hb_rx) = mpsc::channel();
        tx.send(WorkerMessage::Heartbeat(hb_tx)).unwrap();

        #[cfg(test)]
        let mut worker = worker;

        #[cfg(test)]
        assert!(worker.tick());

        for _ in hb_rx {}

        let lsm = Lsm {
            #[cfg(not(test))]
            log: BufWriter::with_capacity(config.log_bufwriter_size as usize, reader.into_inner()),
            #[cfg(test)]
            log: tearable::Tearable::new(reader.into_inner()),
            #[cfg(test)]
            worker,
            path: path.into(),
            next_sstable_id: max_sstable_id.unwrap_or(0) + 1,
            dirty_bytes: recovered as usize,
            worker_outbox: tx,
            config,
            stats: Stats {
                logged_bytes: recovered,
                on_disk_bytes: 0,
                read_bytes: 0,
                written_bytes: 0,
                resident_bytes: db.len() as u64 * (K + V) as u64,
                space_amp: 0.,
                write_amp: 0.,
            },
            worker_stats,
            db,
            memtable,
        };

        Ok(lsm)
    }

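    /// Writes a key-value pair to the log and the in-memory state,
    /// returning the previously stored value if there was one. May trigger
    /// a blocking flush to a new sstable once the log exceeds
    /// `Config::max_log_length`.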
    pub fn insert(&mut self, k: [u8; K], v: [u8; V]) -> Result<Option<[u8; V]>> {
        self.log_mutation(k, Some(v))?;

        if self.dirty_bytes > self.config.max_log_length {
            self.flush()?;
        }

        Ok(self.db.insert(k, v))
    }

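    /// Logs a tombstone for the key and removes it from the in-memory
    /// state, returning the previously stored value if there was one.
    /// May trigger a blocking flush, like `insert`.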
    pub fn remove(&mut self, k: &[u8; K]) -> Result<Option<[u8; V]>> {
        self.log_mutation(*k, None)?;

        if self.dirty_bytes > self.config.max_log_length {
            self.flush()?;
        }

        Ok(self.db.remove(k))
    }

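    /// Applies a batch of inserts (`Some`) and removes (`None`) as a single
    /// logged unit: recovery either replays the entire batch or discards
    /// it, so partially written batches are never observed.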
    pub fn write_batch(&mut self, write_batch: &[([u8; K], Option<[u8; V]>)]) -> Result<()> {
        let batch_len: [u8; 8] = (write_batch.len() as u64).to_le_bytes();
        let crc = hash_batch_len(write_batch.len());

        self.log.write_all(&crc.to_le_bytes())?;
        self.log.write_all(&[2_u8])?;
        self.log.write_all(&batch_len)?;

        // Zero-pad the batch header so that every log record has the same
        // length, whether it stores a kv tuple or a batch length.
        let tuple_sz = U64_SZ.max(K + V);
        let pad_sz = tuple_sz - U64_SZ;
        let pad = vec![0u8; pad_sz];
        self.log.write_all(&pad)?;

        for (k, v_opt) in write_batch {
            if let Some(v) = v_opt {
                self.db.insert(*k, *v);
            } else {
                self.db.remove(k);
            }

            self.log_mutation(*k, *v_opt)?;
            self.memtable.insert(*k, *v_opt);
        }

        if self.dirty_bytes > self.config.max_log_length {
            self.flush()?;
        }

        Ok(())
    }

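    // Appends one kv record to the log: CRC, presence byte, key, value (or
    // V zero bytes for a tombstone), then zero padding up to the fixed
    // record length. Shared by `insert`, `remove`, and `write_batch`.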
    fn log_mutation(&mut self, k: [u8; K], v: Option<[u8; V]>) -> Result<()> {
        let crc: u32 = hash(&k, &v);
        self.log.write_all(&crc.to_le_bytes())?;
        self.log.write_all(&[v.is_some() as u8])?;
        self.log.write_all(&k)?;

        if let Some(v) = v {
            self.log.write_all(&v)?;
        } else {
            self.log.write_all(&[0; V])?;
        };

        // Zero-pad the tuple so kv records and batch headers share a fixed
        // record length of 5 + max(8, K + V) bytes.
        let min_tuple_sz = U64_SZ.max(K + V);
        let pad_sz = min_tuple_sz - (K + V);
        let pad = [0; U64_SZ];
        self.log.write_all(&pad[..pad_sz])?;

        let logged_bytes = 4 + 1 + min_tuple_sz;

        self.memtable.insert(k, v);

        self.dirty_bytes += logged_bytes;
        self.stats.logged_bytes += logged_bytes as u64;
        self.stats.written_bytes += logged_bytes as u64;

        Ok(())
    }

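    /// Flushes and fsyncs the write-ahead log. If the log has grown past
    /// `Config::max_log_length`, the memtable is also written out as a new
    /// sstable, the compaction worker is notified, and the log is truncated
    /// back to zero length.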
    pub fn flush(&mut self) -> Result<()> {
        #[cfg(test)]
        {
            if self.log.tearing {
                return Ok(());
            }
        }

        self.log.flush()?;
        self.log.get_mut().sync_all()?;

        if self.dirty_bytes > self.config.max_log_length {
            log::debug!("compacting log to sstable");
            let memtable = std::mem::take(&mut self.memtable);
            let sst_id = self.next_sstable_id;
            if let Err(e) = write_sstable(&self.path, sst_id, &memtable, false, &self.config) {
                // Put the memtable back so a retry is possible.
                self.memtable = memtable;
                log::error!("failed to flush lsm log to sstable: {:?}", e);
                return Err(e);
            }

            // Approximate decompressed sizes used by the compaction heuristics.
            let sst_sz = 8 + (memtable.len() as u64 * (4 + K + V) as u64);
            let db_sz = self.db.len() as u64 * (K + V) as u64;

            if let Err(e) = self.worker_outbox.send(WorkerMessage::NewSST {
                id: sst_id,
                sst_sz,
                db_sz,
            }) {
                log::error!("failed to send message to worker: {:?}", e);
                log::logger().flush();
                panic!("failed to send message to worker: {:?}", e);
            }

            #[cfg(test)]
            assert!(self.worker.tick());

            self.next_sstable_id += 1;

            let log_file: &mut fs::File = self.log.get_mut();
            log_file.seek(io::SeekFrom::Start(0))?;
            log_file.set_len(0)?;
            log_file.sync_all()?;
            fs::File::open(self.path.join(SSTABLE_DIR))?.sync_all()?;

            self.dirty_bytes = 0;
        }

        Ok(())
    }

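    /// Returns current statistics, folding in the background worker's IO
    /// counters. `write_amp` is total written bytes over current on-disk
    /// bytes, and `space_amp` is on-disk bytes over resident in-memory
    /// bytes.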
    pub fn stats(&mut self) -> Result<Stats> {
        self.stats.written_bytes += self.worker_stats.written_bytes.swap(0, Ordering::Relaxed);
        self.stats.read_bytes += self.worker_stats.read_bytes.swap(0, Ordering::Relaxed);
        self.stats.resident_bytes = self.db.len() as u64 * (K + V) as u64;

        let mut on_disk_bytes: u64 = std::fs::metadata(self.path.join("log"))?.len();

        on_disk_bytes += list_sstables(&self.path, false)?
            .into_iter()
            .map(|(_, len)| len)
            .sum::<u64>();

        self.stats.on_disk_bytes = on_disk_bytes;

        self.stats.write_amp =
            self.stats.written_bytes as f64 / self.stats.on_disk_bytes.max(1) as f64;
        self.stats.space_amp =
            self.stats.on_disk_bytes as f64 / self.stats.resident_bytes.max(1) as f64;
        Ok(self.stats)
    }
}

#[cfg(test)]
mod tearable;

#[cfg(test)]
mod fuzz;