1use std::fs;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::time::Duration;
5
6use zerocopy::FromBytes;
7
8use crate::Key;
9use crate::disk_loc::DiskLoc;
10use crate::entry::{EntryHeader, entry_size};
11use crate::error::DbResult;
12use crate::hint;
13use crate::io::direct;
14use crate::shard::{ImmutableFile, Shard};
15
/// Positional-read callback: `(file, offset, len) -> bytes`. Boxed as `dyn Fn`
/// so the plain reader and the (feature-gated) decrypting reader can share a
/// single call site in the compaction scan loop.
type ReadFn = dyn Fn(&std::fs::File, u64, usize) -> DbResult<Vec<u8>>;
17
18#[cfg(feature = "encryption")]
19use crate::crypto::PageCipher;
20#[cfg(feature = "encryption")]
21use crate::io::tags::{self, TagFile};
22
/// The index operations compaction needs from the shard-owning index.
pub trait CompactionIndex<K: Key>: Send + Sync {
    /// Re-points `key` from `old_loc` to `new_loc` only if the index still
    /// maps `key` to `old_loc`; returns `false` when the entry was
    /// overwritten or deleted while compaction was copying it.
    fn update_if_match(&self, key: &K, old_loc: DiskLoc, new_loc: DiskLoc) -> bool;

    /// Invalidates any cached blocks for a retired data file. Default: no-op.
    fn invalidate_blocks(&self, _shard_id: u8, _file_id: u32, _total_bytes: u64) {}

    /// Whether `key` currently has a live entry in the index.
    fn contains_key(&self, key: &K) -> bool;
}
33
/// Supplies the replication high-water mark that fences guarded compaction.
#[cfg(feature = "replication")]
pub trait CompactionGuard: Send + Sync {
    /// GSN fence for `shard_id`: files whose newest hint GSN is at or above
    /// this value are skipped by `compact_shard_guarded`.
    fn min_replicated_gsn(&self, shard_id: u8) -> u64;
}
41
/// Guard for deployments without replication: always reports `u64::MAX`,
/// which makes every file eligible for compaction (no fence).
#[cfg(feature = "replication")]
pub struct NoReplicationGuard;

#[cfg(feature = "replication")]
impl CompactionGuard for NoReplicationGuard {
    fn min_replicated_gsn(&self, _shard_id: u8) -> u64 {
        // u64::MAX disables the `min_replicated_gsn < u64::MAX` fence check
        // inside the compaction pass.
        u64::MAX
    }
}
52
/// Compacts `shard` without a replication fence: every immutable file whose
/// dead-byte ratio exceeds `threshold` is a candidate. Returns the number of
/// entries moved into the newly written file.
pub fn compact_shard<K: Key, I: CompactionIndex<K>>(
    shard: &Shard,
    index: &I,
    threshold: f64,
) -> DbResult<usize> {
    // u64::MAX disables the replicated-GSN fence in the inner pass.
    compact_shard_inner::<K, I>(shard, index, threshold, u64::MAX)
}
60
/// Replication-aware compaction: files containing entries that have not yet
/// been replicated (per `guard`) are excluded so un-replicated data is never
/// destroyed. Returns the number of entries moved into the new file.
#[cfg(feature = "replication")]
pub fn compact_shard_guarded<K: Key, I: CompactionIndex<K>>(
    shard: &Shard,
    index: &I,
    threshold: f64,
    guard: &dyn CompactionGuard,
) -> DbResult<usize> {
    // Query the guard for this shard's replication fence and run the normal
    // pass capped at that GSN.
    compact_shard_inner::<K, I>(shard, index, threshold, guard.min_replicated_gsn(shard.id))
}
72
73fn compact_shard_inner<K: Key, I: CompactionIndex<K>>(
74 shard: &Shard,
75 index: &I,
76 threshold: f64,
77 min_replicated_gsn: u64,
78) -> DbResult<usize> {
79 let mut files_to_compact = Vec::new();
80
81 #[cfg(feature = "encryption")]
83 let cipher_opt: Option<Arc<PageCipher>>;
84 {
85 let inner = shard.lock();
86 #[cfg(feature = "encryption")]
87 {
88 cipher_opt = inner.cipher.clone();
89 }
90 for file in &inner.immutable {
91 let total = file.total_bytes;
92 let dead = inner.dead_bytes.get(&file.file_id).copied().unwrap_or(0);
93 if total > 0 && (dead as f64 / total as f64) > threshold {
94 files_to_compact.push(file.clone());
95 }
96 }
97 }
98
99 if files_to_compact.is_empty() {
100 return Ok(0);
101 }
102
103 if min_replicated_gsn < u64::MAX {
105 files_to_compact.retain(|file| {
106 let hint_path = shard.dir().join(format!("{:06}.hint", file.file_id));
107 match file_max_gsn(&hint_path, file.file_id, size_of::<K>()) {
108 Some(max_gsn) => max_gsn < min_replicated_gsn,
109 None => false, }
111 });
112 if files_to_compact.is_empty() {
113 return Ok(0);
114 }
115 }
116
117 files_to_compact.sort_by_key(|f| f.file_id);
118 files_to_compact.truncate(4);
119 let compact_start = std::time::Instant::now();
120
121 let new_file_id = {
122 let mut inner = shard.lock();
123 let id = inner.next_file_id;
124 inner.next_file_id += 1;
125 id
126 };
127 let old_file_ids: Vec<u32> = files_to_compact.iter().map(|f| f.file_id).collect();
128
129 let tmp_path = shard.dir().join(format!("{new_file_id:06}.data.tmp"));
131 let tmp_file = direct::open_write(&tmp_path)?;
132 let mut write_offset: u64 = 0;
133
134 const BATCH_SIZE: usize = 256;
135
136 struct BatchEntry<K> {
137 key: K,
138 gsn: u64,
139 old_loc: DiskLoc,
140 new_loc: DiskLoc,
141 is_tombstone: bool,
142 }
143
144 let mut batch: Vec<BatchEntry<K>> = Vec::with_capacity(BATCH_SIZE);
145
146 #[cfg(feature = "encryption")]
148 let mut plaintext_buf: Option<Vec<u8>> = if cipher_opt.is_some() {
149 Some(Vec::new())
150 } else {
151 None
152 };
153
154 for old_arc in &files_to_compact {
156 let file = &old_arc.file;
157 let file_len = old_arc.total_bytes;
158 let mut offset: u64 = 0;
159
160 #[cfg(feature = "encryption")]
162 let read_fn: Box<ReadFn> =
163 if let (Some(cipher), Some(_tag_file)) = (&cipher_opt, &old_arc.tag_file) {
164 let c = cipher.clone();
165 let fid = old_arc.file_id;
166 let tp = tags::tags_path_for_data(&old_arc.path);
167 let tf = Arc::new(TagFile::open_read(&tp)?);
168 Box::new(move |f, o, l| direct::pread_value_encrypted(f, &tf, &c, fid, o, l))
169 } else {
170 Box::new(direct::pread_value)
171 };
172 #[cfg(not(feature = "encryption"))]
173 let read_fn: Box<ReadFn> = Box::new(direct::pread_value);
174
175 while offset + size_of::<EntryHeader>() as u64 <= file_len {
176 let header_bytes = match read_fn(file, offset, size_of::<EntryHeader>()) {
177 Ok(b) => b,
178 Err(_) => break,
179 };
180 let header = match EntryHeader::read_from_bytes(&header_bytes) {
181 Ok(h) => h,
182 Err(_) => break,
183 };
184
185 let total = entry_size(size_of::<K>(), header.value_len);
186 if offset + total > file_len {
187 break;
188 }
189
190 let old_loc = DiskLoc::new(
191 shard.id,
192 old_arc.file_id as u16,
193 (offset + size_of::<EntryHeader>() as u64 + size_of::<K>() as u64) as u32,
194 header.value_len,
195 );
196
197 let entry_bytes = read_fn(file, offset, total as usize)?;
198
199 let key_bytes = &entry_bytes[16..16 + size_of::<K>()];
200 let key: K = K::from_bytes(key_bytes);
201
202 #[cfg(feature = "encryption")]
204 if let Some(ref mut buf) = plaintext_buf {
205 if buf.len() < (write_offset + total) as usize {
206 buf.resize((write_offset + total) as usize, 0);
207 }
208 buf[write_offset as usize..(write_offset + total) as usize]
209 .copy_from_slice(&entry_bytes);
210 } else {
211 direct::pwrite_at(&tmp_file, &entry_bytes, write_offset)?;
212 }
213 #[cfg(not(feature = "encryption"))]
214 direct::pwrite_at(&tmp_file, &entry_bytes, write_offset)?;
215
216 let new_loc = DiskLoc::new(
217 shard.id,
218 new_file_id as u16,
219 (write_offset + size_of::<EntryHeader>() as u64 + size_of::<K>() as u64) as u32,
220 header.value_len,
221 );
222
223 batch.push(BatchEntry {
224 key,
225 gsn: header.gsn,
226 old_loc,
227 new_loc,
228 is_tombstone: header.is_tombstone(),
229 });
230
231 write_offset += total;
232
233 offset += total;
234 }
235 }
236
237 #[cfg(feature = "encryption")]
239 let tmp_tags_path = if let (Some(cipher), Some(mut buf)) = (&cipher_opt, plaintext_buf.take()) {
240 let padded_len = (buf.len() + 4095) & !4095;
242 buf.resize(padded_len, 0);
243 let num_pages = padded_len / 4096;
244 let mut tag_list = Vec::with_capacity(num_pages);
245 for i in 0..num_pages {
246 let page = &mut buf[i * 4096..(i + 1) * 4096];
247 let tag = cipher.encrypt_page(new_file_id, i as u64, page)?;
248 tag_list.push(tag);
249 }
250 direct::pwrite_at(&tmp_file, &buf, 0)?;
251
252 let tp = shard.dir().join(format!("{new_file_id:06}.tags.tmp"));
254 let tf = TagFile::open_write(&tp)?;
255 tf.write_tags(0, &tag_list)?;
256 tf.sync()?;
257 Some(tp)
258 } else {
259 None
260 };
261
262 direct::fsync(&tmp_file)?;
263
264 let final_data_path = shard.dir().join(format!("{new_file_id:06}.data"));
265
266 {
268 let mut inner = shard.lock();
269 fs::rename(&tmp_path, &final_data_path)?;
270
271 #[cfg(feature = "encryption")]
272 let final_tag_file = if let Some(ref tp) = tmp_tags_path {
273 let final_tags_path = shard.dir().join(format!("{new_file_id:06}.tags"));
274 fs::rename(tp, &final_tags_path)?;
275 Some(TagFile::open_read(&final_tags_path)?)
276 } else {
277 None
278 };
279
280 let final_file = direct::open_read(&final_data_path)?;
281 inner.immutable.push(Arc::new(ImmutableFile {
282 file: final_file,
283 file_id: new_file_id,
284 #[cfg(feature = "encryption")]
285 path: final_data_path,
286 total_bytes: write_offset,
287 #[cfg(feature = "encryption")]
288 tag_file: final_tag_file,
289 }));
290 inner.immutable.sort_by_key(|f| f.file_id);
291 }
292
293 let mut compacted_entries = 0;
294 let mut live_hint_data: Vec<u8> = Vec::new();
295 let key_len = size_of::<K>();
296
297 for chunk in batch.chunks(BATCH_SIZE) {
298 let mut inner = shard.lock();
299 for entry in chunk {
300 if entry.is_tombstone {
301 if index.contains_key(&entry.key) {
302 inner.add_dead_bytes(
304 entry.new_loc.file_id as u32,
305 entry_size(size_of::<K>(), entry.new_loc.len),
306 );
307 } else {
308 compacted_entries += 1;
310 append_hint_entry(
311 &mut live_hint_data,
312 entry.gsn,
313 &entry.key,
314 entry.new_loc.offset as u64,
315 entry.new_loc.len,
316 key_len,
317 );
318 }
319 } else {
320 if index.update_if_match(&entry.key, entry.old_loc, entry.new_loc) {
321 compacted_entries += 1;
322 append_hint_entry(
323 &mut live_hint_data,
324 entry.gsn,
325 &entry.key,
326 entry.new_loc.offset as u64,
327 entry.new_loc.len,
328 key_len,
329 );
330 } else {
331 inner.add_dead_bytes(
332 entry.new_loc.file_id as u32,
333 entry_size(size_of::<K>(), entry.new_loc.len),
334 );
335 }
336 }
337 }
338 }
339
340 {
342 let mut inner = shard.lock();
343 inner
344 .immutable
345 .retain(|f| !old_file_ids.contains(&f.file_id));
346 for fid in &old_file_ids {
347 inner.dead_bytes.remove(fid);
348 }
349 }
350
351 let hint_data = live_hint_data;
352 let tmp_hint_path = shard.dir().join(format!("{new_file_id:06}.hint.tmp"));
353 hint::write_hint_file(&tmp_hint_path, &hint_data)?;
354 let final_hint_path = shard.dir().join(format!("{new_file_id:06}.hint"));
355 fs::rename(&tmp_hint_path, &final_hint_path)?;
356
357 for old_arc in &files_to_compact {
359 index.invalidate_blocks(shard.id, old_arc.file_id, old_arc.total_bytes);
360 }
361
362 for fid in &old_file_ids {
364 let _ = fs::remove_file(shard.dir().join(format!("{fid:06}.data")));
365 let _ = fs::remove_file(shard.dir().join(format!("{fid:06}.hint")));
366 #[cfg(feature = "encryption")]
367 let _ = fs::remove_file(shard.dir().join(format!("{fid:06}.tags")));
368 }
369
370 let elapsed = compact_start.elapsed().as_secs_f64();
371 metrics::counter!("armdb.compaction.runs").increment(1);
372 metrics::counter!("armdb.compaction.entries").increment(compacted_entries as u64);
373 metrics::histogram!("armdb.compaction.duration_seconds").record(elapsed);
374 tracing::info!(
375 entries = compacted_entries,
376 files = old_file_ids.len(),
377 elapsed_ms = (elapsed * 1000.0) as u64,
378 "compaction complete"
379 );
380 Ok(compacted_entries)
381}
382
/// Handle to the background compaction thread; signals shutdown via `stop`
/// and joins `handle` in `stop()` / `Drop`.
pub struct Compactor {
    // Set to true to ask the worker loop to exit.
    stop: Arc<AtomicBool>,
    // Join handle; taken (left as None) once the thread has been joined.
    handle: Option<std::thread::JoinHandle<()>>,
}
388
389impl Compactor {
390 pub fn start(
391 compact_fn: impl Fn() -> DbResult<usize> + Send + 'static,
392 interval: Duration,
393 ) -> Self {
394 let stop = Arc::new(AtomicBool::new(false));
395 let stop_flag = stop.clone();
396 let handle = std::thread::spawn(move || {
397 while !stop_flag.load(Ordering::Relaxed) {
398 std::thread::sleep(interval);
399 if stop_flag.load(Ordering::Relaxed) {
400 break;
401 }
402 match compact_fn() {
403 Ok(n) if n > 0 => tracing::info!(entries = n, "compaction cycle"),
404 Err(e) => tracing::error!(error = %e, "compaction error"),
405 _ => {}
406 }
407 }
408 });
409 Self {
410 stop,
411 handle: Some(handle),
412 }
413 }
414
415 pub fn stop(&mut self) {
416 self.stop.store(true, Ordering::Relaxed);
417 if let Some(h) = self.handle.take() {
418 let _ = h.join();
419 }
420 }
421}
422
impl Drop for Compactor {
    /// Stops and joins the worker thread when the handle is dropped;
    /// `stop()` is idempotent, so an explicit earlier call is fine.
    fn drop(&mut self) {
        self.stop();
    }
}
428
429fn append_hint_entry<K: Key>(
432 buf: &mut Vec<u8>,
433 gsn: u64,
434 key: &K,
435 value_offset: u64,
436 value_len: u32,
437 _key_len: usize,
438) {
439 buf.extend_from_slice(&gsn.to_ne_bytes());
440 buf.extend_from_slice(key.as_bytes());
441 buf.extend_from_slice(&value_offset.to_ne_bytes());
442 buf.extend_from_slice(&value_len.to_ne_bytes());
443}
444
445fn file_max_gsn(hint_path: &std::path::Path, _file_id: u32, key_len: usize) -> Option<u64> {
448 let data = hint::read_hint_file(hint_path).ok()??;
449 let entry_size = hint::hint_entry_size(key_len);
450 if data.is_empty() || data.len() % entry_size != 0 {
451 return None;
452 }
453 let entry_count = data.len() / entry_size;
454 let last_start = (entry_count - 1) * entry_size;
455 let gsn_bytes: [u8; 8] = data[last_start..last_start + 8].try_into().ok()?;
456 let gsn = u64::from_ne_bytes(gsn_bytes);
457 Some(gsn & !crate::entry::TOMBSTONE_BIT)
458}