1use std::fs;
2use std::sync::Arc;
3use std::time::Duration;
4
5use zerocopy::FromBytes;
6
7use crate::Key;
8use crate::disk_loc::DiskLoc;
9use crate::entry::{EntryHeader, entry_size};
10use crate::error::DbResult;
11use crate::hint;
12use crate::io::direct;
13use crate::shard::{ImmutableFile, Shard};
14
15type ReadFn = dyn Fn(&std::fs::File, u64, usize) -> DbResult<Vec<u8>>;
16
17#[cfg(feature = "encryption")]
18use crate::crypto::PageCipher;
19#[cfg(feature = "encryption")]
20use crate::io::tags::{self, TagFile};
21
22fn dir_fsync(dir: &std::path::Path) -> DbResult<()> {
23 let d = fs::File::open(dir)?;
24 d.sync_all()?;
25 Ok(())
26}
27
28pub trait CompactionIndex<K: Key>: Send + Sync {
29 fn update_if_match(&self, key: &K, old_loc: DiskLoc, new_loc: DiskLoc) -> bool;
31
32 fn invalidate_blocks(&self, _shard_id: u8, _file_id: u32, _total_bytes: u64) {}
34
35 fn contains_key(&self, key: &K) -> bool;
37}
38
/// Supplies the replication watermark that limits which files compaction may rewrite.
#[cfg(feature = "replication")]
pub trait CompactionGuard: Send + Sync {
    /// GSN bound for `shard_id`. Only files whose highest recorded GSN is
    /// strictly below it are compacted. `u64::MAX` disables the check.
    fn min_replicated_gsn(&self, shard_id: u8) -> u64;
}

/// Guard that never holds compaction back (no replication in use).
#[cfg(feature = "replication")]
pub struct NoReplicationGuard;

#[cfg(feature = "replication")]
impl CompactionGuard for NoReplicationGuard {
    fn min_replicated_gsn(&self, _: u8) -> u64 {
        // The maximum value switches the GSN filter off entirely.
        u64::MAX
    }
}
57
/// Returns `true` when appending `needed` bytes at `write_offset` would push the
/// current compaction output past `max_file_size`.
///
/// The sum is computed exactly. If `write_offset + needed` does not fit in a
/// `u64`, it certainly exceeds any limit, so the answer is "rotate" rather than
/// an overflow panic (debug) or a wrapped, wrong comparison (release).
fn should_rotate_output(write_offset: u64, needed: u64, max_file_size: u64) -> bool {
    match write_offset.checked_add(needed) {
        Some(end) => end > max_file_size,
        None => true,
    }
}
63
64pub fn compact_shard<K: Key, I: CompactionIndex<K>>(
65 shard: &Shard,
66 index: &I,
67 threshold: f64,
68) -> DbResult<usize> {
69 compact_shard_inner::<K, I>(shard, index, threshold, u64::MAX)
70}
71
/// Like `compact_shard`, but only rewrites files whose entries all have a GSN
/// below `guard.min_replicated_gsn(shard.id)`.
#[cfg(feature = "replication")]
pub fn compact_shard_guarded<K: Key, I: CompactionIndex<K>>(
    shard: &Shard,
    index: &I,
    threshold: f64,
    guard: &dyn CompactionGuard,
) -> DbResult<usize> {
    compact_shard_inner::<K, I>(shard, index, threshold, guard.min_replicated_gsn(shard.id))
}
83
/// A compaction output still being written to `{file_id:06}.data.tmp`.
struct OutputState {
    /// Logical bytes appended so far, which is also where the next entry starts.
    write_offset: u64,
    /// File id reserved for this output.
    file_id: u32,
    /// Handle to the temporary data file.
    tmp_file: fs::File,
    /// Page-encryption staging state. `open_output` sets it only when the shard has a cipher.
    #[cfg(feature = "encryption")]
    enc_state: Option<EncryptionState>,
}
92
/// Stages output bytes into 4096-byte pages that are encrypted and written one at a time.
#[cfg(feature = "encryption")]
struct EncryptionState {
    /// Plaintext buffer for the page currently being filled.
    page_buf: Box<[u8; 4096]>,
    /// Number of bytes already staged in `page_buf`.
    page_offset: usize,
    /// Pages written so far. The next page goes to `pages_written * 4096`.
    pages_written: u64,
    /// Tag returned by the cipher for each written page, in page order.
    tag_list: Vec<[u8; 16]>,
    /// Cipher applied to each page before it is written.
    cipher: Arc<PageCipher>,
}
101
#[cfg(feature = "encryption")]
impl EncryptionState {
    /// Encrypts the staged page in place, writes it at `pages_written * 4096`,
    /// records its tag, and resets the buffer to an empty, zero-filled page.
    fn flush_page(&mut self, file: &fs::File, file_id: u32) -> DbResult<()> {
        let page_index = self.pages_written;
        let tag = self
            .cipher
            .encrypt_page(file_id, page_index, &mut *self.page_buf)?;
        direct::pwrite_at(file, &*self.page_buf, page_index * 4096)?;
        self.tag_list.push(tag);
        self.pages_written = page_index + 1;
        self.page_buf.fill(0);
        self.page_offset = 0;
        Ok(())
    }

    /// Copies `data` into the staging page, flushing every page that fills up.
    /// A trailing partial page stays buffered for a later flush.
    fn write_bytes(&mut self, file: &fs::File, file_id: u32, data: &[u8]) -> DbResult<()> {
        let mut cursor = 0;
        while cursor < data.len() {
            let room = 4096 - self.page_offset;
            let take = room.min(data.len() - cursor);
            let end = self.page_offset + take;
            self.page_buf[self.page_offset..end].copy_from_slice(&data[cursor..cursor + take]);
            self.page_offset = end;
            cursor += take;
            if end == 4096 {
                self.flush_page(file, file_id)?;
            }
        }
        Ok(())
    }
}
132
/// A compaction output that `finalize_output` has synced and renamed to its
/// final name, but that is not yet registered with the shard.
struct FinalizedOutput {
    /// Final `{file_id:06}.data` path.
    final_data_path: std::path::PathBuf,
    /// Final `{file_id:06}.tags` path, present only for encrypted outputs.
    #[cfg(feature = "encryption")]
    final_tag_path: Option<std::path::PathBuf>,
    /// File id of the output.
    file_id: u32,
    /// On-disk length of the data (whole pages when encrypted).
    total_bytes: u64,
}
141
142fn compact_shard_inner<K: Key, I: CompactionIndex<K>>(
143 shard: &Shard,
144 index: &I,
145 threshold: f64,
146 min_replicated_gsn: u64,
147) -> DbResult<usize> {
148 let mut files_to_compact = Vec::new();
149
150 #[cfg(feature = "encryption")]
152 let cipher_opt: Option<Arc<PageCipher>>;
153 let max_file_size: u64;
154 let cooldown_ids: Vec<u32>;
155 {
156 let mut inner = shard.lock();
157 max_file_size = inner.max_file_size;
158 cooldown_ids = std::mem::take(&mut inner.last_compaction_output_ids);
159 #[cfg(feature = "encryption")]
160 {
161 cipher_opt = inner.cipher.clone();
162 }
163 for file in &inner.immutable {
164 let total = file.total_bytes;
165 let dead = inner.dead_bytes.get(&file.file_id).copied().unwrap_or(0);
166 if total > 0
167 && (dead as f64 / total as f64) > threshold
168 && !cooldown_ids.contains(&file.file_id)
169 {
170 files_to_compact.push(file.clone());
171 }
172 }
173 }
174
175 if files_to_compact.is_empty() {
176 return Ok(0);
177 }
178
179 if min_replicated_gsn < u64::MAX {
181 files_to_compact.retain(|file| {
182 let hint_path = shard.dir().join(format!("{:06}.hint", file.file_id));
183 match file_max_gsn(&hint_path, file.file_id, size_of::<K>()) {
184 Some(max_gsn) => max_gsn < min_replicated_gsn,
185 None => false, }
187 });
188 if files_to_compact.is_empty() {
189 return Ok(0);
190 }
191 }
192
193 files_to_compact.sort_by_key(|f| f.file_id);
194 files_to_compact.truncate(4);
195 let compact_start = std::time::Instant::now();
196
197 let old_file_ids: Vec<u32> = files_to_compact.iter().map(|f| f.file_id).collect();
198
199 let first_file_id = {
201 let mut inner = shard.lock();
202 inner.allocate_file_id()?
203 };
204
205 let mut output = open_output(shard, first_file_id)?;
206
207 const BATCH_SIZE: usize = 256;
209
210 struct BatchEntry<K> {
211 key: K,
212 gsn: u64,
213 old_loc: DiskLoc,
214 new_loc: DiskLoc,
215 is_tombstone: bool,
216 }
217
218 let mut batch: Vec<BatchEntry<K>> = Vec::with_capacity(BATCH_SIZE);
219 let mut pending_outputs: Vec<FinalizedOutput> = Vec::new();
221
222 for old_arc in &files_to_compact {
224 let file = &old_arc.file;
225 let file_len = old_arc.total_bytes;
226 let mut offset: u64 = 0;
227
228 #[cfg(feature = "encryption")]
230 let read_fn: Box<ReadFn> =
231 if let (Some(cipher), Some(_tag_file)) = (&cipher_opt, &old_arc.tag_file) {
232 let c = cipher.clone();
233 let fid = old_arc.file_id;
234 let tp = tags::tags_path_for_data(&old_arc.path);
235 let tf = Arc::new(TagFile::open_read(&tp)?);
236 Box::new(move |f, o, l| direct::pread_value_encrypted(f, &tf, &c, fid, o, l))
237 } else {
238 Box::new(direct::pread_value)
239 };
240 #[cfg(not(feature = "encryption"))]
241 let read_fn: Box<ReadFn> = Box::new(direct::pread_value);
242
243 while offset + size_of::<EntryHeader>() as u64 <= file_len {
244 let header_bytes = match read_fn(file, offset, size_of::<EntryHeader>()) {
245 Ok(b) => b,
246 Err(_) => break,
247 };
248 let header = match EntryHeader::read_from_bytes(&header_bytes) {
249 Ok(h) => h,
250 Err(_) => break,
251 };
252
253 if header.gsn == 0 && header.crc32 == 0 && header.value_len == 0 {
255 break;
256 }
257
258 let total = entry_size(size_of::<K>(), header.value_len);
259 if offset + total > file_len {
260 break;
261 }
262
263 if output.write_offset > 0
268 && should_rotate_output(output.write_offset, total, max_file_size)
269 {
270 let next_file_id = {
271 let mut inner = shard.lock();
272 inner.allocate_file_id()?
273 };
274 let finished = finalize_output(shard, output)?;
275 pending_outputs.push(finished);
276 output = open_output(shard, next_file_id)?;
277 }
278
279 let old_loc = DiskLoc::new(
280 shard.id,
281 old_arc.file_id,
282 (offset + size_of::<EntryHeader>() as u64 + size_of::<K>() as u64) as u32,
283 header.value_len,
284 );
285
286 let entry_bytes = read_fn(file, offset, total as usize)?;
287
288 let key_start = size_of::<EntryHeader>();
290 let key_end = key_start + size_of::<K>();
291 let val_end = key_end + header.value_len as usize;
292 let computed_crc = crate::entry::compute_crc32(
293 header.gsn,
294 header.value_len,
295 &entry_bytes[key_start..key_end],
296 &entry_bytes[key_end..val_end],
297 );
298 if computed_crc != header.crc32 {
299 for out in &pending_outputs {
301 let _ = fs::remove_file(&out.final_data_path);
302 #[cfg(feature = "encryption")]
303 if let Some(ref tp) = out.final_tag_path {
304 let _ = fs::remove_file(tp);
305 }
306 }
307 let cur_tmp = shard.dir().join(format!("{:06}.data.tmp", output.file_id));
309 let _ = fs::remove_file(&cur_tmp);
310 #[cfg(feature = "encryption")]
311 {
312 let cur_tags_tmp = shard.dir().join(format!("{:06}.tags.tmp", output.file_id));
313 let _ = fs::remove_file(&cur_tags_tmp);
314 }
315 return Err(crate::error::DbError::CrcMismatch {
316 expected: header.crc32,
317 actual: computed_crc,
318 });
319 }
320
321 #[cfg(feature = "encryption")]
323 if let Some(ref mut enc) = output.enc_state {
324 enc.write_bytes(&output.tmp_file, output.file_id, &entry_bytes)?;
325 } else {
326 direct::pwrite_at(&output.tmp_file, &entry_bytes, output.write_offset)?;
327 }
328 #[cfg(not(feature = "encryption"))]
329 direct::pwrite_at(&output.tmp_file, &entry_bytes, output.write_offset)?;
330
331 let value_offset =
332 output.write_offset + size_of::<EntryHeader>() as u64 + size_of::<K>() as u64;
333 debug_assert!(value_offset <= u32::MAX as u64);
334 let new_loc = DiskLoc::new(
335 shard.id,
336 output.file_id,
337 value_offset as u32,
338 header.value_len,
339 );
340
341 let key_bytes = &entry_bytes[16..16 + size_of::<K>()];
342 let key: K = K::from_bytes(key_bytes);
343
344 batch.push(BatchEntry {
345 key,
346 gsn: header.gsn,
347 old_loc,
348 new_loc,
349 is_tombstone: header.is_tombstone(),
350 });
351
352 output.write_offset += total;
353 offset += total;
354 }
355 }
356
357 if output.write_offset > 0 {
359 let last_output = finalize_output(shard, output)?;
360 pending_outputs.push(last_output);
361 } else {
362 let tmp_data_path = shard.dir().join(format!("{:06}.data.tmp", output.file_id));
366 let _ = std::fs::remove_file(&tmp_data_path);
367 #[cfg(feature = "encryption")]
368 {
369 let tmp_tags_path = shard.dir().join(format!("{:06}.tags.tmp", output.file_id));
370 let _ = std::fs::remove_file(&tmp_tags_path);
371 }
372 }
373
374 #[cfg(feature = "encryption")]
378 let new_immutables: Vec<Arc<ImmutableFile>> = pending_outputs
379 .iter()
380 .map(|out| {
381 let final_file = direct::open_read(&out.final_data_path)?;
382 let final_tag_file = if let Some(ref tp) = out.final_tag_path {
383 Some(Arc::new(TagFile::open_read(tp)?))
384 } else {
385 None
386 };
387 Ok(Arc::new(ImmutableFile {
388 file: final_file,
389 file_id: out.file_id,
390 path: out.final_data_path.clone(),
391 total_bytes: out.total_bytes,
392 tag_file: final_tag_file,
393 }))
394 })
395 .collect::<DbResult<Vec<_>>>()?;
396
397 #[cfg(not(feature = "encryption"))]
398 let new_immutables: Vec<Arc<ImmutableFile>> = pending_outputs
399 .iter()
400 .map(|out| {
401 let final_file = direct::open_read(&out.final_data_path)?;
402 Ok(Arc::new(ImmutableFile {
403 file: final_file,
404 file_id: out.file_id,
405 total_bytes: out.total_bytes,
406 }))
407 })
408 .collect::<DbResult<Vec<_>>>()?;
409
410 {
411 let mut inner = shard.lock();
412 inner.immutable.extend(new_immutables);
413 inner.immutable.sort_by_key(|f| f.file_id);
414 inner.last_compaction_output_ids = pending_outputs.iter().map(|o| o.file_id).collect();
415 }
416
417 let mut compacted_entries = 0;
418 let key_len = size_of::<K>();
419
420 let mut live_hint_data: std::collections::HashMap<u32, Vec<u8>> = pending_outputs
425 .iter()
426 .map(|o| (o.file_id, Vec::new()))
427 .collect();
428
429 for chunk in batch.chunks(BATCH_SIZE) {
430 let mut inner = shard.lock();
431 for entry in chunk {
432 if entry.is_tombstone {
433 if index.contains_key(&entry.key) {
434 inner.add_dead_bytes(
435 entry.new_loc.file_id,
436 entry_size(size_of::<K>(), entry.new_loc.len),
437 );
438 } else {
439 compacted_entries += 1;
440 if let Some(buf) = live_hint_data.get_mut(&entry.new_loc.file_id) {
441 append_hint_entry(
442 buf,
443 entry.gsn,
444 &entry.key,
445 entry.new_loc.offset as u64,
446 entry.new_loc.len,
447 key_len,
448 );
449 }
450 }
451 } else if index.update_if_match(&entry.key, entry.old_loc, entry.new_loc) {
452 compacted_entries += 1;
453 if let Some(buf) = live_hint_data.get_mut(&entry.new_loc.file_id) {
454 append_hint_entry(
455 buf,
456 entry.gsn,
457 &entry.key,
458 entry.new_loc.offset as u64,
459 entry.new_loc.len,
460 key_len,
461 );
462 }
463 } else {
464 inner.add_dead_bytes(
465 entry.new_loc.file_id,
466 entry_size(size_of::<K>(), entry.new_loc.len),
467 );
468 }
469 }
470 }
471
472 {
474 let mut inner = shard.lock();
475 inner
476 .immutable
477 .retain(|f| !old_file_ids.contains(&f.file_id));
478 for fid in &old_file_ids {
479 inner.dead_bytes.remove(fid);
480 }
481 for old_arc in &files_to_compact {
482 index.invalidate_blocks(shard.id, old_arc.file_id, old_arc.total_bytes);
483 }
484 }
485
486 for out in &pending_outputs {
488 let hint_data = live_hint_data.remove(&out.file_id).unwrap_or_default();
489 let hint_path = shard.dir().join(format!("{:06}.hint", out.file_id));
490 hint::write_hint_file(&hint_path, &hint_data)?;
491 }
492 dir_fsync(shard.dir())?;
493
494 for fid in &old_file_ids {
496 let _ = fs::remove_file(shard.dir().join(format!("{fid:06}.data")));
497 let _ = fs::remove_file(shard.dir().join(format!("{fid:06}.hint")));
498 #[cfg(feature = "encryption")]
499 let _ = fs::remove_file(shard.dir().join(format!("{fid:06}.tags")));
500 }
501 dir_fsync(shard.dir())?;
502
503 let elapsed = compact_start.elapsed().as_secs_f64();
504 metrics::counter!("armdb.compaction.runs").increment(1);
505 metrics::counter!("armdb.compaction.entries").increment(compacted_entries as u64);
506 metrics::histogram!("armdb.compaction.duration_seconds").record(elapsed);
507 tracing::info!(
508 entries = compacted_entries,
509 files = old_file_ids.len(),
510 elapsed_ms = (elapsed * 1000.0) as u64,
511 "compaction complete"
512 );
513 Ok(compacted_entries)
514}
515
516fn open_output(shard: &Shard, file_id: u32) -> DbResult<OutputState> {
518 let tmp_path = shard.dir().join(format!("{file_id:06}.data.tmp"));
519 match fs::remove_file(&tmp_path) {
520 Ok(()) => {}
521 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
522 Err(e) => return Err(e.into()),
523 }
524 let tmp_file = direct::open_write(&tmp_path)?;
525 Ok(OutputState {
526 file_id,
527 tmp_file,
528 write_offset: 0,
529 #[cfg(feature = "encryption")]
530 enc_state: shard.lock().cipher.clone().map(|cipher| EncryptionState {
531 cipher,
532 page_buf: Box::new([0u8; 4096]),
533 page_offset: 0,
534 pages_written: 0,
535 tag_list: Vec::new(),
536 }),
537 })
538}
539
/// Seals a compaction output and publishes it under its final name.
///
/// Steps, in order:
/// - flush the last partially filled encrypted page (if any);
/// - write and sync the tag file (encryption only);
/// - fsync the data file;
/// - rename `.data.tmp` (and `.tags.tmp`) to their final names;
/// - fsync the shard directory.
///
/// The returned `total_bytes` is the logical `write_offset` for plaintext
/// outputs. For encrypted outputs it is `pages_written * 4096`, which includes
/// the zero padding of the last page.
///
/// NOTE(review): in the encrypted branch the data file is renamed before the
/// tag file, so a crash between the two renames can leave a final `.data`
/// without its `.tags`. Confirm recovery tolerates this; renaming the tags
/// first may be safer.
fn finalize_output(shard: &Shard, output: OutputState) -> DbResult<FinalizedOutput> {
    let file_id = output.file_id;
    let tmp_path = shard.dir().join(format!("{file_id:06}.data.tmp"));
    let final_data_path = shard.dir().join(format!("{file_id:06}.data"));

    #[cfg(feature = "encryption")]
    let (total_bytes, final_tag_path) = match output.enc_state {
        Some(mut enc) => {
            // Write the last, partially filled page. Its unused tail is zero.
            if enc.page_offset > 0 {
                enc.flush_page(&output.tmp_file, file_id)?;
            }

            // Replace any stale temporary tag file; a missing one is fine.
            let tp = shard.dir().join(format!("{file_id:06}.tags.tmp"));
            match fs::remove_file(&tp) {
                Ok(()) => {}
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
                Err(e) => return Err(e.into()),
            }
            let tf = TagFile::open_write(&tp)?;
            tf.write_tags(0, &enc.tag_list)?;
            tf.sync()?;

            // fsync the data before publishing it under its final name.
            let final_tags_path = shard.dir().join(format!("{file_id:06}.tags"));
            direct::fsync(&output.tmp_file)?;
            fs::rename(&tmp_path, &final_data_path)?;
            fs::rename(&tp, &final_tags_path)?;
            dir_fsync(shard.dir())?;

            (enc.pages_written * 4096, Some(final_tags_path))
        }
        None => {
            // Shard has no cipher: plain fsync + rename.
            direct::fsync(&output.tmp_file)?;
            fs::rename(&tmp_path, &final_data_path)?;
            dir_fsync(shard.dir())?;
            (output.write_offset, None)
        }
    };

    #[cfg(not(feature = "encryption"))]
    {
        direct::fsync(&output.tmp_file)?;
        fs::rename(&tmp_path, &final_data_path)?;
        dir_fsync(shard.dir())?;
    }

    Ok(FinalizedOutput {
        file_id,
        #[cfg(feature = "encryption")]
        total_bytes,
        #[cfg(not(feature = "encryption"))]
        total_bytes: output.write_offset,
        final_data_path,
        #[cfg(feature = "encryption")]
        final_tag_path,
    })
}
599
600pub struct Compactor {
602 stop: crate::shutdown::ShutdownSignal,
603 handle: Option<std::thread::JoinHandle<()>>,
604}
605
606impl Compactor {
607 pub fn start(
609 compact_fn: impl Fn() -> DbResult<usize> + Send + 'static,
610 interval: Duration,
611 ) -> Self {
612 Self::start_with_signal(compact_fn, interval, crate::shutdown::ShutdownSignal::new())
613 }
614
615 pub fn start_with_signal(
619 compact_fn: impl Fn() -> DbResult<usize> + Send + 'static,
620 interval: Duration,
621 signal: crate::shutdown::ShutdownSignal,
622 ) -> Self {
623 let stop = signal.clone();
624 let handle = std::thread::spawn(move || {
625 while !stop.is_shutdown() {
626 if stop.wait_timeout(interval) {
627 break;
628 }
629 match compact_fn() {
630 Ok(n) if n > 0 => tracing::info!(entries = n, "compaction cycle"),
631 Err(e) => tracing::error!(error = %e, "compaction error"),
632 _ => {}
633 }
634 }
635 });
636 Self {
637 stop: signal,
638 handle: Some(handle),
639 }
640 }
641
642 pub fn stop(&mut self) {
643 self.stop.shutdown();
644 if let Some(h) = self.handle.take() {
645 let _ = h.join();
646 }
647 }
648}
649
650impl Drop for Compactor {
651 fn drop(&mut self) {
652 self.stop();
653 }
654}
655
656fn append_hint_entry<K: Key>(
659 buf: &mut Vec<u8>,
660 gsn: u64,
661 key: &K,
662 value_offset: u64,
663 value_len: u32,
664 _key_len: usize,
665) {
666 buf.extend_from_slice(&gsn.to_ne_bytes());
667 buf.extend_from_slice(key.as_bytes());
668 buf.extend_from_slice(&value_offset.to_ne_bytes());
669 buf.extend_from_slice(&value_len.to_ne_bytes());
670}
671
672fn file_max_gsn(hint_path: &std::path::Path, _file_id: u32, key_len: usize) -> Option<u64> {
679 let data = hint::read_hint_file(hint_path).ok()??;
680 let entry_size = hint::hint_entry_size(key_len);
681 if data.is_empty() || data.len() % entry_size != 0 {
682 return None;
683 }
684 let mut max_gsn = 0u64;
685 for chunk in data.chunks_exact(entry_size) {
686 let gsn_bytes: [u8; 8] = chunk[..8].try_into().ok()?;
687 let gsn = u64::from_ne_bytes(gsn_bytes) & !crate::entry::TOMBSTONE_BIT;
688 if gsn > max_gsn {
689 max_gsn = gsn;
690 }
691 }
692 Some(max_gsn)
693}
694
#[cfg(test)]
mod compaction_output_rotation_tests {
    use super::*;

    /// Table-driven check of `(write_offset, needed, max_file_size) -> rotate?`.
    #[test]
    fn output_should_rotate_when_offset_plus_needed_exceeds_limit() {
        let page = 4096u64;
        let cases = [
            (0, page, 64 * page, false),
            (60 * page, page, 64 * page, false),
            (63 * page, page + 1, 64 * page, true),
            (u32::MAX as u64 - 10, 100, u32::MAX as u64, true),
        ];
        for (write_offset, needed, limit, expected) in cases {
            assert_eq!(should_rotate_output(write_offset, needed, limit), expected);
        }
    }
}
711
#[cfg(test)]
mod compaction_file_max_gsn_tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds raw hint bytes holding one zeroed record per GSN, with only the
    /// leading 8-byte GSN field filled in.
    fn hint_bytes(gsns: &[u64], key_len: usize) -> Vec<u8> {
        let record_len = crate::hint::hint_entry_size(key_len);
        let mut data = vec![0u8; record_len * gsns.len()];
        for (record, gsn) in data.chunks_exact_mut(record_len).zip(gsns) {
            record[..8].copy_from_slice(&gsn.to_ne_bytes());
        }
        data
    }

    #[test]
    fn file_max_gsn_returns_true_max_not_last_entry() {
        let dir = tempdir().unwrap();
        let hint_path = dir.path().join("000001.hint");
        std::fs::write(&hint_path, hint_bytes(&[100, 300, 150], 8)).unwrap();

        assert_eq!(file_max_gsn(&hint_path, 1, 8), Some(300));
    }

    #[test]
    fn file_max_gsn_strips_tombstone_bit() {
        let dir = tempdir().unwrap();
        let hint_path = dir.path().join("000002.hint");
        let gsns = [100u64, 200u64 | crate::entry::TOMBSTONE_BIT];
        std::fs::write(&hint_path, hint_bytes(&gsns, 8)).unwrap();

        assert_eq!(file_max_gsn(&hint_path, 2, 8), Some(200));
    }
}