Skip to main content

armdb/
compaction.rs

1use std::fs;
2use std::sync::Arc;
3use std::time::Duration;
4
5use zerocopy::FromBytes;
6
7use crate::Key;
8use crate::disk_loc::DiskLoc;
9use crate::entry::{EntryHeader, entry_size};
10use crate::error::DbResult;
11use crate::hint;
12use crate::io::direct;
13use crate::shard::{ImmutableFile, Shard};
14
/// Positional read routine used by the compaction scan.
///
/// It is invoked as `read_fn(file, offset, len)` and yields the bytes read or
/// an error. Boxing it lets the scan pick the plain or the encrypted reader
/// per source file.
type ReadFn = dyn Fn(&std::fs::File, u64, usize) -> DbResult<Vec<u8>>;
16
17#[cfg(feature = "encryption")]
18use crate::crypto::PageCipher;
19#[cfg(feature = "encryption")]
20use crate::io::tags::{self, TagFile};
21
22fn dir_fsync(dir: &std::path::Path) -> DbResult<()> {
23    let d = fs::File::open(dir)?;
24    d.sync_all()?;
25    Ok(())
26}
27
/// Index operations that compaction relies on to relocate entries.
///
/// `compact_shard_inner` calls these methods while it holds the shard lock.
pub trait CompactionIndex<K: Key>: Send + Sync {
    /// If the index entry for `key` currently points at `old_loc`, repoint it
    /// to `new_loc` and return `true`; otherwise return `false`.
    ///
    /// Compaction treats a `false` result as "this copy is superseded" and
    /// accounts the copied entry as dead bytes in the output file.
    fn update_if_match(&self, key: &K, old_loc: DiskLoc, new_loc: DiskLoc) -> bool;

    /// Invalidate cached blocks for a file after compaction replaces its contents.
    ///
    /// The default implementation does nothing.
    fn invalidate_blocks(&self, _shard_id: u8, _file_id: u32, _total_bytes: u64) {}

    /// Returns true if the key currently exists in the index (i.e. has a live Put).
    ///
    /// Compaction uses this to decide whether a copied tombstone is still
    /// needed: a tombstone whose key is present in the index is counted as dead.
    fn contains_key(&self, key: &K) -> bool;
}
38
/// Guard that prevents compaction from removing files with unreplicated entries.
#[cfg(feature = "replication")]
pub trait CompactionGuard: Send + Sync {
    /// Minimum GSN replicated to all followers for this shard.
    ///
    /// Compaction only processes a file when the largest GSN found in its hint
    /// file is strictly below this value. Files whose max GSN is at or above
    /// it, and files without a readable hint file, are skipped. Returning
    /// `u64::MAX` turns the filter off.
    fn min_replicated_gsn(&self, shard_id: u8) -> u64;
}
46
/// No-op guard — allows compaction of all files.
#[cfg(feature = "replication")]
pub struct NoReplicationGuard;

#[cfg(feature = "replication")]
impl CompactionGuard for NoReplicationGuard {
    /// Always reports `u64::MAX`, the value for which `compact_shard_inner`
    /// skips the replication filter altogether.
    fn min_replicated_gsn(&self, _shard_id: u8) -> u64 {
        u64::MAX
    }
}
57
/// Returns true when the next write of `needed` bytes at `write_offset` would
/// exceed `max_file_size`. Extracted as a pure function for unit-testability.
///
/// The sum is computed with `checked_add`: if `write_offset + needed` does not
/// fit in a `u64` the write cannot possibly fit in the file, so the answer is
/// `true` rather than a debug-build panic or a wrapped (and wrong) comparison.
fn should_rotate_output(write_offset: u64, needed: u64, max_file_size: u64) -> bool {
    match write_offset.checked_add(needed) {
        Some(end_offset) => end_offset > max_file_size,
        // Overflow: the end offset is beyond any representable file size.
        None => true,
    }
}
63
64pub fn compact_shard<K: Key, I: CompactionIndex<K>>(
65    shard: &Shard,
66    index: &I,
67    threshold: f64,
68) -> DbResult<usize> {
69    compact_shard_inner::<K, I>(shard, index, threshold, u64::MAX)
70}
71
/// Compacts `shard` while honouring a replication guard.
///
/// The guard is asked once for the shard's minimum replicated GSN; files whose
/// max GSN is `>=` that value are left alone by the inner routine.
#[cfg(feature = "replication")]
pub fn compact_shard_guarded<K: Key, I: CompactionIndex<K>>(
    shard: &Shard,
    index: &I,
    threshold: f64,
    guard: &dyn CompactionGuard,
) -> DbResult<usize> {
    compact_shard_inner::<K, I>(
        shard,
        index,
        threshold,
        guard.min_replicated_gsn(shard.id),
    )
}
83
/// State for the currently open compaction output file.
struct OutputState {
    /// Id of the output file; it determines the `{file_id:06}.data.tmp` name.
    file_id: u32,
    /// Write handle for the temporary data file.
    tmp_file: fs::File,
    /// Sum of the sizes of all entries written so far. Used as the write
    /// position for unencrypted output and to derive each entry's new location.
    write_offset: u64,
    /// Page-encryption state; `Some` when the shard has a cipher configured.
    #[cfg(feature = "encryption")]
    enc_state: Option<EncryptionState>,
}
92
/// Streaming page-encryption state for one compaction output file.
///
/// Entry bytes are staged in `page_buf`; each time the buffer fills up it is
/// encrypted in place and written to the output as one page.
#[cfg(feature = "encryption")]
struct EncryptionState {
    /// Cipher applied to every page.
    cipher: Arc<PageCipher>,
    /// Staging buffer for the page currently being filled.
    page_buf: Box<[u8; 4096]>,
    /// Number of bytes of `page_buf` that are already occupied.
    page_offset: usize,
    /// Pages flushed so far; doubles as the index of the next page.
    pages_written: u64,
    /// Tags returned by `encrypt_page`, one per flushed page, in page order.
    tag_list: Vec<[u8; 16]>,
}

#[cfg(feature = "encryption")]
impl EncryptionState {
    /// Encrypts the staged page, writes it at its page-aligned position in
    /// `file`, records the tag, and resets the buffer to all zeroes.
    fn flush_page(&mut self, file: &fs::File, file_id: u32) -> DbResult<()> {
        let page_index = self.pages_written;
        let tag = self
            .cipher
            .encrypt_page(file_id, page_index, &mut *self.page_buf)?;
        let page_len = self.page_buf.len() as u64;
        direct::pwrite_at(file, &*self.page_buf, page_index * page_len)?;
        self.tag_list.push(tag);
        self.pages_written = page_index + 1;
        // Start the next page from a clean, zero-filled buffer.
        self.page_buf.fill(0);
        self.page_offset = 0;
        Ok(())
    }

    /// Appends `data` to the page stream, flushing every page that becomes full.
    ///
    /// A trailing partial page stays buffered until a later call fills it or
    /// the caller flushes it explicitly.
    fn write_bytes(&mut self, file: &fs::File, file_id: u32, data: &[u8]) -> DbResult<()> {
        let page_len = self.page_buf.len();
        let mut consumed = 0;
        while consumed < data.len() {
            let start = self.page_offset;
            let take = (data.len() - consumed).min(page_len - start);
            self.page_buf[start..start + take].copy_from_slice(&data[consumed..consumed + take]);
            self.page_offset = start + take;
            consumed += take;
            if self.page_offset == page_len {
                self.flush_page(file, file_id)?;
            }
        }
        Ok(())
    }
}
132
/// A finalized compaction output ready to be registered as immutable.
struct FinalizedOutput {
    /// Id of the output file.
    file_id: u32,
    /// Size recorded for the file: the logical write offset for plain output,
    /// or `pages_written * 4096` for encrypted output.
    total_bytes: u64,
    /// Path of the data file after the rename from `.data.tmp` to `.data`.
    final_data_path: std::path::PathBuf,
    /// Path of the renamed `.tags` file; `None` when the output was not encrypted.
    #[cfg(feature = "encryption")]
    final_tag_path: Option<std::path::PathBuf>,
}
141
/// Core compaction routine shared by `compact_shard` and `compact_shard_guarded`.
///
/// Steps, in order:
/// 1. Under the shard lock, select immutable files whose `dead / total` byte
///    ratio is strictly greater than `threshold`, skipping file ids produced
///    by the previous run (`last_compaction_output_ids`, which is taken and
///    thereby cleared here).
/// 2. When `min_replicated_gsn < u64::MAX`, drop candidates whose hint file
///    cannot be read or whose max GSN is `>= min_replicated_gsn`.
/// 3. Keep at most four candidates, lowest file id first.
/// 4. Copy every entry of those files, after verifying its CRC, into
///    `.data.tmp` output files, rotating to a fresh output whenever the next
///    entry would exceed the shard's `max_file_size`.
/// 5. Register the finalized outputs as immutable files, then walk the copied
///    entries in chunks of `BATCH_SIZE` under the shard lock: entries whose
///    index slot still matches are repointed and recorded in the hint data,
///    the rest are accounted as dead bytes of the output file.
/// 6. Unregister the old files, invalidate their cached blocks, write the new
///    hint files, and delete the old data/hint/tag files.
///
/// Returns the number of entries kept (index relocations plus retained
/// tombstones), or `0` when no file qualified.
///
/// # Errors
///
/// Propagates I/O errors from reading, writing, renaming and syncing, errors
/// from file-id allocation, and `DbError::CrcMismatch` when a copied entry
/// fails CRC verification.
fn compact_shard_inner<K: Key, I: CompactionIndex<K>>(
    shard: &Shard,
    index: &I,
    threshold: f64,
    min_replicated_gsn: u64,
) -> DbResult<usize> {
    let mut files_to_compact = Vec::new();

    // 1. Find immutable files exceeding the garbage ratio threshold
    #[cfg(feature = "encryption")]
    let cipher_opt: Option<Arc<PageCipher>>;
    let max_file_size: u64;
    let cooldown_ids: Vec<u32>;
    {
        let mut inner = shard.lock();
        max_file_size = inner.max_file_size;
        // Taking the list clears it: outputs of the previous run are exempt
        // from this run only.
        cooldown_ids = std::mem::take(&mut inner.last_compaction_output_ids);
        #[cfg(feature = "encryption")]
        {
            cipher_opt = inner.cipher.clone();
        }
        for file in &inner.immutable {
            let total = file.total_bytes;
            let dead = inner.dead_bytes.get(&file.file_id).copied().unwrap_or(0);
            if total > 0
                && (dead as f64 / total as f64) > threshold
                && !cooldown_ids.contains(&file.file_id)
            {
                files_to_compact.push(file.clone());
            }
        }
    }

    if files_to_compact.is_empty() {
        return Ok(0);
    }

    // Filter out files that haven't been fully replicated to all followers
    if min_replicated_gsn < u64::MAX {
        files_to_compact.retain(|file| {
            let hint_path = shard.dir().join(format!("{:06}.hint", file.file_id));
            match file_max_gsn(&hint_path, file.file_id, size_of::<K>()) {
                Some(max_gsn) => max_gsn < min_replicated_gsn,
                None => false, // No hint → conservative, skip
            }
        });
        if files_to_compact.is_empty() {
            return Ok(0);
        }
    }

    // Bound the work per run: oldest (lowest id) files first, at most four.
    files_to_compact.sort_by_key(|f| f.file_id);
    files_to_compact.truncate(4);
    let compact_start = std::time::Instant::now();

    let old_file_ids: Vec<u32> = files_to_compact.iter().map(|f| f.file_id).collect();

    // Allocate the first output file_id under the lock.
    let first_file_id = {
        let mut inner = shard.lock();
        inner.allocate_file_id()?
    };

    let mut output = open_output(shard, first_file_id)?;

    // Chunk size for the deferred index update: the shard lock is re-acquired
    // once per chunk of this many entries.
    const BATCH_SIZE: usize = 256;

    // One record per entry copied into an output file.
    struct BatchEntry<K> {
        key: K,
        gsn: u64,
        old_loc: DiskLoc,
        new_loc: DiskLoc,
        is_tombstone: bool,
    }

    // Holds every copied entry of the run (not just one chunk); the initial
    // capacity is only a starting size.
    let mut batch: Vec<BatchEntry<K>> = Vec::with_capacity(BATCH_SIZE);
    // Outputs that have been written and fsynced but not yet registered as immutable.
    let mut pending_outputs: Vec<FinalizedOutput> = Vec::new();

    // Scan old files
    for old_arc in &files_to_compact {
        let file = &old_arc.file;
        let file_len = old_arc.total_bytes;
        let mut offset: u64 = 0;

        // Build reader for this file (encrypted or plain)
        #[cfg(feature = "encryption")]
        let read_fn: Box<ReadFn> =
            if let (Some(cipher), Some(_tag_file)) = (&cipher_opt, &old_arc.tag_file) {
                let c = cipher.clone();
                let fid = old_arc.file_id;
                let tp = tags::tags_path_for_data(&old_arc.path);
                let tf = Arc::new(TagFile::open_read(&tp)?);
                Box::new(move |f, o, l| direct::pread_value_encrypted(f, &tf, &c, fid, o, l))
            } else {
                Box::new(direct::pread_value)
            };
        #[cfg(not(feature = "encryption"))]
        let read_fn: Box<ReadFn> = Box::new(direct::pread_value);

        while offset + size_of::<EntryHeader>() as u64 <= file_len {
            // NOTE(review): a failed header read or parse ends the scan of this
            // file silently; entries after this point are not copied, yet the
            // file is still unregistered and deleted further down — confirm
            // this is intended.
            let header_bytes = match read_fn(file, offset, size_of::<EntryHeader>()) {
                Ok(b) => b,
                Err(_) => break,
            };
            let header = match EntryHeader::read_from_bytes(&header_bytes) {
                Ok(h) => h,
                Err(_) => break,
            };

            // Detect zeroed padding at the end of encrypted pages.
            if header.gsn == 0 && header.crc32 == 0 && header.value_len == 0 {
                break;
            }

            // An entry that claims to extend past the recorded file length is
            // treated as the end of the file.
            let total = entry_size(size_of::<K>(), header.value_len);
            if offset + total > file_len {
                break;
            }

            // Rotate output file if this entry would push past max_file_size.
            // Skip rotation when output is empty: an oversized entry writes into
            // an empty file regardless (defensive guard, not reachable today due
            // to Config::validate invariants).
            if output.write_offset > 0
                && should_rotate_output(output.write_offset, total, max_file_size)
            {
                let next_file_id = {
                    let mut inner = shard.lock();
                    inner.allocate_file_id()?
                };
                let finished = finalize_output(shard, output)?;
                pending_outputs.push(finished);
                output = open_output(shard, next_file_id)?;
            }

            // Location of the value inside the source file: the offset points
            // past the header and the key.
            let old_loc = DiskLoc::new(
                shard.id,
                old_arc.file_id,
                (offset + size_of::<EntryHeader>() as u64 + size_of::<K>() as u64) as u32,
                header.value_len,
            );

            // NOTE(review): unlike the CRC-mismatch path below, an error here
            // returns without removing the tmp file or already-finalized outputs.
            let entry_bytes = read_fn(file, offset, total as usize)?;

            // Verify CRC integrity before copying to compacted output (V-10 fix).
            let key_start = size_of::<EntryHeader>();
            let key_end = key_start + size_of::<K>();
            let val_end = key_end + header.value_len as usize;
            let computed_crc = crate::entry::compute_crc32(
                header.gsn,
                header.value_len,
                &entry_bytes[key_start..key_end],
                &entry_bytes[key_end..val_end],
            );
            if computed_crc != header.crc32 {
                // Abort: clean up already-finalized outputs (renamed to final .data).
                for out in &pending_outputs {
                    let _ = fs::remove_file(&out.final_data_path);
                    #[cfg(feature = "encryption")]
                    if let Some(ref tp) = out.final_tag_path {
                        let _ = fs::remove_file(tp);
                    }
                }
                // Clean up current in-progress output tmp file.
                let cur_tmp = shard.dir().join(format!("{:06}.data.tmp", output.file_id));
                let _ = fs::remove_file(&cur_tmp);
                #[cfg(feature = "encryption")]
                {
                    let cur_tags_tmp = shard.dir().join(format!("{:06}.tags.tmp", output.file_id));
                    let _ = fs::remove_file(&cur_tags_tmp);
                }
                return Err(crate::error::DbError::CrcMismatch {
                    expected: header.crc32,
                    actual: computed_crc,
                });
            }

            // Write to output (streaming encryption or direct).
            #[cfg(feature = "encryption")]
            if let Some(ref mut enc) = output.enc_state {
                enc.write_bytes(&output.tmp_file, output.file_id, &entry_bytes)?;
            } else {
                direct::pwrite_at(&output.tmp_file, &entry_bytes, output.write_offset)?;
            }
            #[cfg(not(feature = "encryption"))]
            direct::pwrite_at(&output.tmp_file, &entry_bytes, output.write_offset)?;

            // Location of the value inside the output file, derived from the
            // logical write offset. The u32 range is only checked in debug builds.
            let value_offset =
                output.write_offset + size_of::<EntryHeader>() as u64 + size_of::<K>() as u64;
            debug_assert!(value_offset <= u32::MAX as u64);
            let new_loc = DiskLoc::new(
                shard.id,
                output.file_id,
                value_offset as u32,
                header.value_len,
            );

            // NOTE(review): the literal 16 presumably equals
            // size_of::<EntryHeader>() (i.e. `key_start` above) — confirm.
            let key_bytes = &entry_bytes[16..16 + size_of::<K>()];
            let key: K = K::from_bytes(key_bytes);

            batch.push(BatchEntry {
                key,
                gsn: header.gsn,
                old_loc,
                new_loc,
                is_tombstone: header.is_tombstone(),
            });

            output.write_offset += total;
            offset += total;
        }
    }

    // Finalize the last output file only if something was written to it.
    if output.write_offset > 0 {
        let last_output = finalize_output(shard, output)?;
        pending_outputs.push(last_output);
    } else {
        // Empty output (nothing was copied into it). Remove the
        // {file_id}.data.tmp (and tags.tmp if encrypted) that open_output
        // already created so no orphans are left on disk.
        let tmp_data_path = shard.dir().join(format!("{:06}.data.tmp", output.file_id));
        let _ = std::fs::remove_file(&tmp_data_path);
        #[cfg(feature = "encryption")]
        {
            let tmp_tags_path = shard.dir().join(format!("{:06}.tags.tmp", output.file_id));
            let _ = std::fs::remove_file(&tmp_tags_path);
        }
    }

    // Critical section 1: Expose all new output files to readers.
    // Open all read handles outside the lock so that a failure cannot leave
    // the shard with partially registered outputs.
    #[cfg(feature = "encryption")]
    let new_immutables: Vec<Arc<ImmutableFile>> = pending_outputs
        .iter()
        .map(|out| {
            let final_file = direct::open_read(&out.final_data_path)?;
            let final_tag_file = if let Some(ref tp) = out.final_tag_path {
                Some(Arc::new(TagFile::open_read(tp)?))
            } else {
                None
            };
            Ok(Arc::new(ImmutableFile {
                file: final_file,
                file_id: out.file_id,
                path: out.final_data_path.clone(),
                total_bytes: out.total_bytes,
                tag_file: final_tag_file,
            }))
        })
        .collect::<DbResult<Vec<_>>>()?;

    #[cfg(not(feature = "encryption"))]
    let new_immutables: Vec<Arc<ImmutableFile>> = pending_outputs
        .iter()
        .map(|out| {
            let final_file = direct::open_read(&out.final_data_path)?;
            Ok(Arc::new(ImmutableFile {
                file: final_file,
                file_id: out.file_id,
                total_bytes: out.total_bytes,
            }))
        })
        .collect::<DbResult<Vec<_>>>()?;

    {
        let mut inner = shard.lock();
        inner.immutable.extend(new_immutables);
        inner.immutable.sort_by_key(|f| f.file_id);
        // Remember this run's outputs so the next run leaves them alone.
        inner.last_compaction_output_ids = pending_outputs.iter().map(|o| o.file_id).collect();
    }

    let mut compacted_entries = 0;
    let key_len = size_of::<K>();

    // Per-output-file hint buffers, filled during the index pass below: only
    // entries that survive the index update get a hint record; superseded
    // entries are accounted as dead bytes of the output file instead.
    let mut live_hint_data: std::collections::HashMap<u32, Vec<u8>> = pending_outputs
        .iter()
        .map(|o| (o.file_id, Vec::new()))
        .collect();

    // The shard lock is taken once per chunk and released between chunks.
    for chunk in batch.chunks(BATCH_SIZE) {
        let mut inner = shard.lock();
        for entry in chunk {
            if entry.is_tombstone {
                if index.contains_key(&entry.key) {
                    // The key has a live Put in the index, so this copied
                    // tombstone is dead weight in the output file.
                    inner.add_dead_bytes(
                        entry.new_loc.file_id,
                        entry_size(size_of::<K>(), entry.new_loc.len),
                    );
                } else {
                    // Key absent from the index: keep the tombstone and hint it.
                    compacted_entries += 1;
                    if let Some(buf) = live_hint_data.get_mut(&entry.new_loc.file_id) {
                        append_hint_entry(
                            buf,
                            entry.gsn,
                            &entry.key,
                            entry.new_loc.offset as u64,
                            entry.new_loc.len,
                            key_len,
                        );
                    }
                }
            } else if index.update_if_match(&entry.key, entry.old_loc, entry.new_loc) {
                // The index still pointed at the old copy and now points at the new one.
                compacted_entries += 1;
                if let Some(buf) = live_hint_data.get_mut(&entry.new_loc.file_id) {
                    append_hint_entry(
                        buf,
                        entry.gsn,
                        &entry.key,
                        entry.new_loc.offset as u64,
                        entry.new_loc.len,
                        key_len,
                    );
                }
            } else {
                // The index no longer pointed at the old copy: the new copy is dead.
                inner.add_dead_bytes(
                    entry.new_loc.file_id,
                    entry_size(size_of::<K>(), entry.new_loc.len),
                );
            }
        }
    }

    // Critical section 2: Remove old files and invalidate cache entries atomically (V-11 fix).
    {
        let mut inner = shard.lock();
        inner
            .immutable
            .retain(|f| !old_file_ids.contains(&f.file_id));
        for fid in &old_file_ids {
            inner.dead_bytes.remove(fid);
        }
        for old_arc in &files_to_compact {
            index.invalidate_blocks(shard.id, old_arc.file_id, old_arc.total_bytes);
        }
    }

    // Write hint files for all output files.
    for out in &pending_outputs {
        let hint_data = live_hint_data.remove(&out.file_id).unwrap_or_default();
        let hint_path = shard.dir().join(format!("{:06}.hint", out.file_id));
        hint::write_hint_file(&hint_path, &hint_data)?;
    }
    dir_fsync(shard.dir())?;

    // Delete old data, hint, and tag files (best effort: removal errors are ignored)
    for fid in &old_file_ids {
        let _ = fs::remove_file(shard.dir().join(format!("{fid:06}.data")));
        let _ = fs::remove_file(shard.dir().join(format!("{fid:06}.hint")));
        #[cfg(feature = "encryption")]
        let _ = fs::remove_file(shard.dir().join(format!("{fid:06}.tags")));
    }
    dir_fsync(shard.dir())?;

    let elapsed = compact_start.elapsed().as_secs_f64();
    metrics::counter!("armdb.compaction.runs").increment(1);
    metrics::counter!("armdb.compaction.entries").increment(compacted_entries as u64);
    metrics::histogram!("armdb.compaction.duration_seconds").record(elapsed);
    tracing::info!(
        entries = compacted_entries,
        files = old_file_ids.len(),
        elapsed_ms = (elapsed * 1000.0) as u64,
        "compaction complete"
    );
    Ok(compacted_entries)
}
515
516/// Open a new compaction output file, returning an `OutputState` ready for writing.
517fn open_output(shard: &Shard, file_id: u32) -> DbResult<OutputState> {
518    let tmp_path = shard.dir().join(format!("{file_id:06}.data.tmp"));
519    match fs::remove_file(&tmp_path) {
520        Ok(()) => {}
521        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
522        Err(e) => return Err(e.into()),
523    }
524    let tmp_file = direct::open_write(&tmp_path)?;
525    Ok(OutputState {
526        file_id,
527        tmp_file,
528        write_offset: 0,
529        #[cfg(feature = "encryption")]
530        enc_state: shard.lock().cipher.clone().map(|cipher| EncryptionState {
531            cipher,
532            page_buf: Box::new([0u8; 4096]),
533            page_offset: 0,
534            pages_written: 0,
535            tag_list: Vec::new(),
536        }),
537    })
538}
539
540/// Fsync, rename tmp → final, and return a `FinalizedOutput` descriptor.
541/// Does NOT register the file as immutable — that happens later under the lock.
542fn finalize_output(shard: &Shard, output: OutputState) -> DbResult<FinalizedOutput> {
543    let file_id = output.file_id;
544    let tmp_path = shard.dir().join(format!("{file_id:06}.data.tmp"));
545    let final_data_path = shard.dir().join(format!("{file_id:06}.data"));
546
547    #[cfg(feature = "encryption")]
548    let (total_bytes, final_tag_path) = match output.enc_state {
549        Some(mut enc) => {
550            // Flush the final partial page if data remains.
551            if enc.page_offset > 0 {
552                enc.flush_page(&output.tmp_file, file_id)?;
553            }
554
555            let tp = shard.dir().join(format!("{file_id:06}.tags.tmp"));
556            match fs::remove_file(&tp) {
557                Ok(()) => {}
558                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
559                Err(e) => return Err(e.into()),
560            }
561            let tf = TagFile::open_write(&tp)?;
562            tf.write_tags(0, &enc.tag_list)?;
563            tf.sync()?;
564
565            let final_tags_path = shard.dir().join(format!("{file_id:06}.tags"));
566            direct::fsync(&output.tmp_file)?;
567            fs::rename(&tmp_path, &final_data_path)?;
568            fs::rename(&tp, &final_tags_path)?;
569            dir_fsync(shard.dir())?;
570
571            (enc.pages_written * 4096, Some(final_tags_path))
572        }
573        None => {
574            direct::fsync(&output.tmp_file)?;
575            fs::rename(&tmp_path, &final_data_path)?;
576            dir_fsync(shard.dir())?;
577            (output.write_offset, None)
578        }
579    };
580
581    #[cfg(not(feature = "encryption"))]
582    {
583        direct::fsync(&output.tmp_file)?;
584        fs::rename(&tmp_path, &final_data_path)?;
585        dir_fsync(shard.dir())?;
586    }
587
588    Ok(FinalizedOutput {
589        file_id,
590        #[cfg(feature = "encryption")]
591        total_bytes,
592        #[cfg(not(feature = "encryption"))]
593        total_bytes: output.write_offset,
594        final_data_path,
595        #[cfg(feature = "encryption")]
596        final_tag_path,
597    })
598}
599
600/// Background compaction handle.
601pub struct Compactor {
602    stop: crate::shutdown::ShutdownSignal,
603    handle: Option<std::thread::JoinHandle<()>>,
604}
605
606impl Compactor {
607    /// Start a background compaction thread with its own shutdown signal.
608    pub fn start(
609        compact_fn: impl Fn() -> DbResult<usize> + Send + 'static,
610        interval: Duration,
611    ) -> Self {
612        Self::start_with_signal(compact_fn, interval, crate::shutdown::ShutdownSignal::new())
613    }
614
615    /// Start a background compaction thread controlled by an external shutdown
616    /// signal. When the signal fires the thread wakes up immediately instead of
617    /// waiting for the full sleep interval.
618    pub fn start_with_signal(
619        compact_fn: impl Fn() -> DbResult<usize> + Send + 'static,
620        interval: Duration,
621        signal: crate::shutdown::ShutdownSignal,
622    ) -> Self {
623        let stop = signal.clone();
624        let handle = std::thread::spawn(move || {
625            while !stop.is_shutdown() {
626                if stop.wait_timeout(interval) {
627                    break;
628                }
629                match compact_fn() {
630                    Ok(n) if n > 0 => tracing::info!(entries = n, "compaction cycle"),
631                    Err(e) => tracing::error!(error = %e, "compaction error"),
632                    _ => {}
633                }
634            }
635        });
636        Self {
637            stop: signal,
638            handle: Some(handle),
639        }
640    }
641
642    pub fn stop(&mut self) {
643        self.stop.shutdown();
644        if let Some(h) = self.handle.take() {
645            let _ = h.join();
646        }
647    }
648}
649
650impl Drop for Compactor {
651    fn drop(&mut self) {
652        self.stop();
653    }
654}
655
656/// Append a hint entry (GSN | Key | Offset | Len) to the buffer.
657/// Same binary format as `hint::generate_hint_data_dyn`.
658fn append_hint_entry<K: Key>(
659    buf: &mut Vec<u8>,
660    gsn: u64,
661    key: &K,
662    value_offset: u64,
663    value_len: u32,
664    _key_len: usize,
665) {
666    buf.extend_from_slice(&gsn.to_ne_bytes());
667    buf.extend_from_slice(key.as_bytes());
668    buf.extend_from_slice(&value_offset.to_ne_bytes());
669    buf.extend_from_slice(&value_len.to_ne_bytes());
670}
671
672/// Read the maximum GSN from a hint file. Returns None if the hint file
673/// doesn't exist or can't be parsed.
674///
675/// Scans ALL entries because compacted hint files concatenate entries from
676/// multiple source files and are NOT sorted by GSN — the last entry does not
677/// necessarily hold the maximum GSN.
678fn file_max_gsn(hint_path: &std::path::Path, _file_id: u32, key_len: usize) -> Option<u64> {
679    let data = hint::read_hint_file(hint_path).ok()??;
680    let entry_size = hint::hint_entry_size(key_len);
681    if data.is_empty() || data.len() % entry_size != 0 {
682        return None;
683    }
684    let mut max_gsn = 0u64;
685    for chunk in data.chunks_exact(entry_size) {
686        let gsn_bytes: [u8; 8] = chunk[..8].try_into().ok()?;
687        let gsn = u64::from_ne_bytes(gsn_bytes) & !crate::entry::TOMBSTONE_BIT;
688        if gsn > max_gsn {
689            max_gsn = gsn;
690        }
691    }
692    Some(max_gsn)
693}
694
#[cfg(test)]
mod compaction_output_rotation_tests {
    use super::*;

    /// Rotation is requested only when the write would end past the limit.
    #[test]
    fn output_should_rotate_when_offset_plus_needed_exceeds_limit() {
        const PAGE: u64 = 4096;
        const LIMIT: u64 = 64 * PAGE;

        // Writes that end at or below the limit stay in the current file.
        assert!(!should_rotate_output(0, PAGE, LIMIT));
        assert!(!should_rotate_output(60 * PAGE, PAGE, LIMIT));

        // One byte past the limit triggers a rotation.
        assert!(should_rotate_output(63 * PAGE, PAGE + 1, LIMIT));

        // Same check near the u32 offset ceiling.
        let ceiling = u32::MAX as u64;
        assert!(should_rotate_output(ceiling - 10, 100, ceiling));
    }
}
711
#[cfg(test)]
mod compaction_file_max_gsn_tests {
    use super::*;
    use tempfile::tempdir;

    /// Key width used by every fixture in this module.
    const KEY_LEN: usize = 8;

    /// Builds hint-file bytes with one zero-filled record per GSN, storing the
    /// GSN in the first eight bytes of its record.
    fn hint_bytes(gsns: &[u64]) -> Vec<u8> {
        let stride = crate::hint::hint_entry_size(KEY_LEN);
        let mut bytes = vec![0u8; stride * gsns.len()];
        for (record, gsn) in bytes.chunks_exact_mut(stride).zip(gsns) {
            record[..8].copy_from_slice(&gsn.to_ne_bytes());
        }
        bytes
    }

    #[test]
    fn file_max_gsn_returns_true_max_not_last_entry() {
        let dir = tempdir().unwrap();
        let hint_path = dir.path().join("000001.hint");

        // Three records: the maximum (300) sits in the middle, not at the end.
        std::fs::write(&hint_path, hint_bytes(&[100, 300, 150])).unwrap();

        assert_eq!(file_max_gsn(&hint_path, 1, KEY_LEN), Some(300));
    }

    #[test]
    fn file_max_gsn_strips_tombstone_bit() {
        let dir = tempdir().unwrap();
        let hint_path = dir.path().join("000002.hint");

        // The second record is a tombstone; its flag bit must not count as GSN.
        let tombstone_gsn = 200u64 | crate::entry::TOMBSTONE_BIT;
        std::fs::write(&hint_path, hint_bytes(&[100, tombstone_gsn])).unwrap();

        assert_eq!(file_max_gsn(&hint_path, 2, KEY_LEN), Some(200));
    }
}
754}