//! armdb — compaction.rs
//!
//! Shard compaction: merges garbage-heavy immutable data files into a fresh
//! file, repoints the in-memory index, and deletes the superseded files.

1use std::fs;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::time::Duration;
5
6use zerocopy::FromBytes;
7
8use crate::Key;
9use crate::disk_loc::DiskLoc;
10use crate::entry::{EntryHeader, entry_size};
11use crate::error::DbResult;
12use crate::hint;
13use crate::io::direct;
14use crate::shard::{ImmutableFile, Shard};
15
/// Positional-read callback: `(file, offset, len)` → raw bytes.
/// Boxed per source file so the plain and encrypted read paths share one scan loop.
type ReadFn = dyn Fn(&std::fs::File, u64, usize) -> DbResult<Vec<u8>>;
17
18#[cfg(feature = "encryption")]
19use crate::crypto::PageCipher;
20#[cfg(feature = "encryption")]
21use crate::io::tags::{self, TagFile};
22
/// Index operations compaction needs: compare-and-swap relocation of entries,
/// cache invalidation, and key-liveness checks.
pub trait CompactionIndex<K: Key>: Send + Sync {
    /// If the current index points to `old_loc`, it is updated to `new_loc` and returns true.
    /// Returns false when a concurrent write has already moved the key elsewhere.
    fn update_if_match(&self, key: &K, old_loc: DiskLoc, new_loc: DiskLoc) -> bool;

    /// Invalidate cached blocks for a file after compaction replaces its contents.
    /// Default is a no-op for index implementations without a block cache.
    fn invalidate_blocks(&self, _shard_id: u8, _file_id: u32, _total_bytes: u64) {}

    /// Returns true if the key currently exists in the index (i.e. has a live Put).
    fn contains_key(&self, key: &K) -> bool;
}
33
/// Guard that prevents compaction from removing files with unreplicated entries.
#[cfg(feature = "replication")]
pub trait CompactionGuard: Send + Sync {
    /// Minimum GSN replicated to all followers for this shard.
    /// Files containing entries with GSN above this value must not be compacted.
    /// Return `u64::MAX` to allow every file (see [`NoReplicationGuard`]).
    fn min_replicated_gsn(&self, shard_id: u8) -> u64;
}
41
/// No-op guard — allows compaction of all files.
#[cfg(feature = "replication")]
pub struct NoReplicationGuard;

#[cfg(feature = "replication")]
impl CompactionGuard for NoReplicationGuard {
    /// `u64::MAX` means every file's GSN counts as "already replicated".
    fn min_replicated_gsn(&self, _shard_id: u8) -> u64 {
        u64::MAX
    }
}
52
/// Compact a shard with no replication guard: every immutable file whose
/// garbage ratio exceeds `threshold` is eligible.
/// Returns the number of entries carried live into the new file.
pub fn compact_shard<K: Key, I: CompactionIndex<K>>(
    shard: &Shard,
    index: &I,
    threshold: f64,
) -> DbResult<usize> {
    // u64::MAX disables the GSN guard inside compact_shard_inner.
    compact_shard_inner::<K, I>(shard, index, threshold, u64::MAX)
}
60
/// Compact with replication guard — skip files whose max GSN >= min_replicated_gsn.
///
/// The guard is sampled once per call; files that become fully replicated
/// afterwards are simply picked up on a later compaction cycle.
#[cfg(feature = "replication")]
pub fn compact_shard_guarded<K: Key, I: CompactionIndex<K>>(
    shard: &Shard,
    index: &I,
    threshold: f64,
    guard: &dyn CompactionGuard,
) -> DbResult<usize> {
    let min_gsn = guard.min_replicated_gsn(shard.id);
    compact_shard_inner::<K, I>(shard, index, threshold, min_gsn)
}
72
/// Core compaction: merge up to four garbage-heavy immutable files into one
/// new file, atomically publish it, repoint the index, then delete the
/// source files.
///
/// `min_replicated_gsn` is the replication guard: files whose hint max-GSN is
/// not strictly below it are skipped (`u64::MAX` disables the guard).
/// Returns the number of entries that survived into the new file.
fn compact_shard_inner<K: Key, I: CompactionIndex<K>>(
    shard: &Shard,
    index: &I,
    threshold: f64,
    min_replicated_gsn: u64,
) -> DbResult<usize> {
    let mut files_to_compact = Vec::new();

    // 1. Find immutable files exceeding the garbage ratio threshold
    #[cfg(feature = "encryption")]
    let cipher_opt: Option<Arc<PageCipher>>;
    {
        let inner = shard.lock();
        #[cfg(feature = "encryption")]
        {
            cipher_opt = inner.cipher.clone();
        }
        for file in &inner.immutable {
            let total = file.total_bytes;
            // Bytes known to be dead (superseded or deleted) in this file.
            let dead = inner.dead_bytes.get(&file.file_id).copied().unwrap_or(0);
            if total > 0 && (dead as f64 / total as f64) > threshold {
                files_to_compact.push(file.clone());
            }
        }
    }

    if files_to_compact.is_empty() {
        return Ok(0);
    }

    // Filter out files that haven't been fully replicated to all followers
    if min_replicated_gsn < u64::MAX {
        files_to_compact.retain(|file| {
            let hint_path = shard.dir().join(format!("{:06}.hint", file.file_id));
            match file_max_gsn(&hint_path, file.file_id, size_of::<K>()) {
                Some(max_gsn) => max_gsn < min_replicated_gsn,
                None => false, // No hint → conservative, skip
            }
        });
        if files_to_compact.is_empty() {
            return Ok(0);
        }
    }

    // Bound the work per cycle: compact the oldest (lowest-id) four files.
    files_to_compact.sort_by_key(|f| f.file_id);
    files_to_compact.truncate(4);
    let compact_start = std::time::Instant::now();

    // Reserve a fresh file id for the output file under the shard lock.
    let new_file_id = {
        let mut inner = shard.lock();
        let id = inner.next_file_id;
        inner.next_file_id += 1;
        id
    };
    let old_file_ids: Vec<u32> = files_to_compact.iter().map(|f| f.file_id).collect();

    // Open temporary file
    let tmp_path = shard.dir().join(format!("{new_file_id:06}.data.tmp"));
    let tmp_file = direct::open_write(&tmp_path)?;
    let mut write_offset: u64 = 0;

    // Index updates are applied in chunks of this size so the shard lock is
    // only held briefly per chunk (see the `batch.chunks` loop below).
    const BATCH_SIZE: usize = 256;

    // One record per entry copied into the new file; the index is repointed
    // from `old_loc` to `new_loc` after the new file is made visible.
    struct BatchEntry<K> {
        key: K,
        gsn: u64,
        old_loc: DiskLoc,
        new_loc: DiskLoc,
        is_tombstone: bool,
    }

    let mut batch: Vec<BatchEntry<K>> = Vec::with_capacity(BATCH_SIZE);

    // For encrypted compaction: buffer all plaintext entries to encrypt at the end
    #[cfg(feature = "encryption")]
    let mut plaintext_buf: Option<Vec<u8>> = if cipher_opt.is_some() {
        Some(Vec::new())
    } else {
        None
    };

    // Scan old files
    for old_arc in &files_to_compact {
        let file = &old_arc.file;
        let file_len = old_arc.total_bytes;
        let mut offset: u64 = 0;

        // Build reader for this file (encrypted or plain)
        #[cfg(feature = "encryption")]
        let read_fn: Box<ReadFn> =
            if let (Some(cipher), Some(_tag_file)) = (&cipher_opt, &old_arc.tag_file) {
                let c = cipher.clone();
                let fid = old_arc.file_id;
                let tp = tags::tags_path_for_data(&old_arc.path);
                let tf = Arc::new(TagFile::open_read(&tp)?);
                Box::new(move |f, o, l| direct::pread_value_encrypted(f, &tf, &c, fid, o, l))
            } else {
                Box::new(direct::pread_value)
            };
        #[cfg(not(feature = "encryption"))]
        let read_fn: Box<ReadFn> = Box::new(direct::pread_value);

        // Walk entries sequentially; the first short or unparsable read is
        // treated as end-of-file (trailing partial data is dropped).
        while offset + size_of::<EntryHeader>() as u64 <= file_len {
            let header_bytes = match read_fn(file, offset, size_of::<EntryHeader>()) {
                Ok(b) => b,
                Err(_) => break,
            };
            let header = match EntryHeader::read_from_bytes(&header_bytes) {
                Ok(h) => h,
                Err(_) => break,
            };

            let total = entry_size(size_of::<K>(), header.value_len);
            if offset + total > file_len {
                break;
            }

            // Location the index currently holds for this entry — it points
            // at the value (past header and key), per the offset arithmetic.
            let old_loc = DiskLoc::new(
                shard.id,
                old_arc.file_id as u16,
                (offset + size_of::<EntryHeader>() as u64 + size_of::<K>() as u64) as u32,
                header.value_len,
            );

            let entry_bytes = read_fn(file, offset, total as usize)?;

            // NOTE(review): the hardcoded 16 assumes size_of::<EntryHeader>()
            // == 16 — confirm and keep in sync with the EntryHeader layout.
            let key_bytes = &entry_bytes[16..16 + size_of::<K>()];
            let key: K = K::from_bytes(key_bytes);

            // Write to temp file (or buffer for later encryption)
            #[cfg(feature = "encryption")]
            if let Some(ref mut buf) = plaintext_buf {
                if buf.len() < (write_offset + total) as usize {
                    buf.resize((write_offset + total) as usize, 0);
                }
                buf[write_offset as usize..(write_offset + total) as usize]
                    .copy_from_slice(&entry_bytes);
            } else {
                direct::pwrite_at(&tmp_file, &entry_bytes, write_offset)?;
            }
            #[cfg(not(feature = "encryption"))]
            direct::pwrite_at(&tmp_file, &entry_bytes, write_offset)?;

            // Where this entry's value will live in the new file.
            let new_loc = DiskLoc::new(
                shard.id,
                new_file_id as u16,
                (write_offset + size_of::<EntryHeader>() as u64 + size_of::<K>() as u64) as u32,
                header.value_len,
            );

            batch.push(BatchEntry {
                key,
                gsn: header.gsn,
                old_loc,
                new_loc,
                is_tombstone: header.is_tombstone(),
            });

            write_offset += total;

            offset += total;
        }
    }

    // Encrypt and write the buffered plaintext if encryption is enabled
    #[cfg(feature = "encryption")]
    let tmp_tags_path = if let (Some(cipher), Some(mut buf)) = (&cipher_opt, plaintext_buf.take()) {
        // Pad to page boundary
        let padded_len = (buf.len() + 4095) & !4095;
        buf.resize(padded_len, 0);
        let num_pages = padded_len / 4096;
        let mut tag_list = Vec::with_capacity(num_pages);
        for i in 0..num_pages {
            let page = &mut buf[i * 4096..(i + 1) * 4096];
            // Encrypts the page in place; the auth tag is stored separately.
            let tag = cipher.encrypt_page(new_file_id, i as u64, page)?;
            tag_list.push(tag);
        }
        direct::pwrite_at(&tmp_file, &buf, 0)?;

        // Write tags
        let tp = shard.dir().join(format!("{new_file_id:06}.tags.tmp"));
        let tf = TagFile::open_write(&tp)?;
        tf.write_tags(0, &tag_list)?;
        tf.sync()?;
        Some(tp)
    } else {
        None
    };

    // Durability point: data must hit disk before the rename makes it live.
    direct::fsync(&tmp_file)?;

    let final_data_path = shard.dir().join(format!("{new_file_id:06}.data"));

    // Critical section 1: Expose new file to readers
    {
        let mut inner = shard.lock();
        // Rename under the lock so readers never see a half-published file.
        fs::rename(&tmp_path, &final_data_path)?;

        #[cfg(feature = "encryption")]
        let final_tag_file = if let Some(ref tp) = tmp_tags_path {
            let final_tags_path = shard.dir().join(format!("{new_file_id:06}.tags"));
            fs::rename(tp, &final_tags_path)?;
            Some(TagFile::open_read(&final_tags_path)?)
        } else {
            None
        };

        let final_file = direct::open_read(&final_data_path)?;
        inner.immutable.push(Arc::new(ImmutableFile {
            file: final_file,
            file_id: new_file_id,
            #[cfg(feature = "encryption")]
            path: final_data_path,
            total_bytes: write_offset,
            #[cfg(feature = "encryption")]
            tag_file: final_tag_file,
        }));
        inner.immutable.sort_by_key(|f| f.file_id);
    }

    let mut compacted_entries = 0;
    let mut live_hint_data: Vec<u8> = Vec::new();
    let key_len = size_of::<K>();

    // Repoint the index at the new file, one chunk per lock acquisition.
    // Entries that lost a race to a newer write are counted as dead bytes
    // in the new file immediately.
    for chunk in batch.chunks(BATCH_SIZE) {
        let mut inner = shard.lock();
        for entry in chunk {
            if entry.is_tombstone {
                if index.contains_key(&entry.key) {
                    // A live Put exists, this tombstone is superseded and can be dropped.
                    inner.add_dead_bytes(
                        entry.new_loc.file_id as u32,
                        entry_size(size_of::<K>(), entry.new_loc.len),
                    );
                } else {
                    // No live Put, we must preserve this tombstone to shadow older files.
                    compacted_entries += 1;
                    append_hint_entry(
                        &mut live_hint_data,
                        entry.gsn,
                        &entry.key,
                        entry.new_loc.offset as u64,
                        entry.new_loc.len,
                        key_len,
                    );
                }
            } else {
                if index.update_if_match(&entry.key, entry.old_loc, entry.new_loc) {
                    compacted_entries += 1;
                    append_hint_entry(
                        &mut live_hint_data,
                        entry.gsn,
                        &entry.key,
                        entry.new_loc.offset as u64,
                        entry.new_loc.len,
                        key_len,
                    );
                } else {
                    inner.add_dead_bytes(
                        entry.new_loc.file_id as u32,
                        entry_size(size_of::<K>(), entry.new_loc.len),
                    );
                }
            }
        }
    }

    // Critical section 2: Remove old files
    {
        let mut inner = shard.lock();
        inner
            .immutable
            .retain(|f| !old_file_ids.contains(&f.file_id));
        for fid in &old_file_ids {
            inner.dead_bytes.remove(fid);
        }
    }

    // Persist the new file's hint via tmp-write-then-rename.
    let hint_data = live_hint_data;
    let tmp_hint_path = shard.dir().join(format!("{new_file_id:06}.hint.tmp"));
    hint::write_hint_file(&tmp_hint_path, &hint_data)?;
    let final_hint_path = shard.dir().join(format!("{new_file_id:06}.hint"));
    fs::rename(&tmp_hint_path, &final_hint_path)?;

    // Invalidate cached blocks for compacted files
    for old_arc in &files_to_compact {
        index.invalidate_blocks(shard.id, old_arc.file_id, old_arc.total_bytes);
    }

    // Delete old data, hint, and tag files
    for fid in &old_file_ids {
        let _ = fs::remove_file(shard.dir().join(format!("{fid:06}.data")));
        let _ = fs::remove_file(shard.dir().join(format!("{fid:06}.hint")));
        #[cfg(feature = "encryption")]
        let _ = fs::remove_file(shard.dir().join(format!("{fid:06}.tags")));
    }

    let elapsed = compact_start.elapsed().as_secs_f64();
    metrics::counter!("armdb.compaction.runs").increment(1);
    metrics::counter!("armdb.compaction.entries").increment(compacted_entries as u64);
    metrics::histogram!("armdb.compaction.duration_seconds").record(elapsed);
    tracing::info!(
        entries = compacted_entries,
        files = old_file_ids.len(),
        elapsed_ms = (elapsed * 1000.0) as u64,
        "compaction complete"
    );
    Ok(compacted_entries)
}
382
/// Background compaction handle.
///
/// Owns a worker thread that periodically invokes a compaction closure.
/// The thread is stopped and joined via [`Compactor::stop`] or on drop.
pub struct Compactor {
    // Cooperative shutdown flag shared with the worker thread.
    stop: Arc<AtomicBool>,
    // Worker thread handle; `None` once stopped and joined.
    handle: Option<std::thread::JoinHandle<()>>,
}
388
389impl Compactor {
390    pub fn start(
391        compact_fn: impl Fn() -> DbResult<usize> + Send + 'static,
392        interval: Duration,
393    ) -> Self {
394        let stop = Arc::new(AtomicBool::new(false));
395        let stop_flag = stop.clone();
396        let handle = std::thread::spawn(move || {
397            while !stop_flag.load(Ordering::Relaxed) {
398                std::thread::sleep(interval);
399                if stop_flag.load(Ordering::Relaxed) {
400                    break;
401                }
402                match compact_fn() {
403                    Ok(n) if n > 0 => tracing::info!(entries = n, "compaction cycle"),
404                    Err(e) => tracing::error!(error = %e, "compaction error"),
405                    _ => {}
406                }
407            }
408        });
409        Self {
410            stop,
411            handle: Some(handle),
412        }
413    }
414
415    pub fn stop(&mut self) {
416        self.stop.store(true, Ordering::Relaxed);
417        if let Some(h) = self.handle.take() {
418            let _ = h.join();
419        }
420    }
421}
422
impl Drop for Compactor {
    /// Stop and join the worker thread when the handle is dropped.
    fn drop(&mut self) {
        self.stop();
    }
}
428
429/// Append a hint entry (GSN | Key | Offset | Len) to the buffer.
430/// Same binary format as `hint::generate_hint_data_dyn`.
431fn append_hint_entry<K: Key>(
432    buf: &mut Vec<u8>,
433    gsn: u64,
434    key: &K,
435    value_offset: u64,
436    value_len: u32,
437    _key_len: usize,
438) {
439    buf.extend_from_slice(&gsn.to_ne_bytes());
440    buf.extend_from_slice(key.as_bytes());
441    buf.extend_from_slice(&value_offset.to_ne_bytes());
442    buf.extend_from_slice(&value_len.to_ne_bytes());
443}
444
445/// Read the maximum GSN from a hint file. Returns None if the hint file
446/// doesn't exist or can't be parsed.
447fn file_max_gsn(hint_path: &std::path::Path, _file_id: u32, key_len: usize) -> Option<u64> {
448    let data = hint::read_hint_file(hint_path).ok()??;
449    let entry_size = hint::hint_entry_size(key_len);
450    if data.is_empty() || data.len() % entry_size != 0 {
451        return None;
452    }
453    let entry_count = data.len() / entry_size;
454    let last_start = (entry_count - 1) * entry_size;
455    let gsn_bytes: [u8; 8] = data[last_start..last_start + 8].try_into().ok()?;
456    let gsn = u64::from_ne_bytes(gsn_bytes);
457    Some(gsn & !crate::entry::TOMBSTONE_BIT)
458}