Skip to main content

sochdb_vector/
compaction.rs

1// Copyright 2025 SochDB Authors
2//
3// Licensed under the Apache License, Version 2.0
4
//! Compaction Isolation
//!
//! This module provides per-shard compaction queues that let indexes be rebuilt
//! in the background without blocking readers.
//!
//! # Problem
//!
//! A single global compaction blocks all shards:
//! - Lock contention during the CSR rebuild
//! - Readers stall while waiting for compaction
//! - Unpredictable latency spikes
//!
//! # Solution
//!
//! Per-shard compaction with version-based isolation:
//! 1. Each shard has its own independent compaction queue.
//! 2. The new CSR/AoSoA layout is built in the background.
//! 3. An atomic pointer swap publishes it when ready.
//! 4. The old version is kept until its readers drain (epoch-based reclamation).
//!
//! # Invariants
//!
//! - Readers never block on compaction.
//! - Writers queue updates for the next compaction cycle.
//! - At most one compaction runs per shard at a time.
//! - Compaction preserves search correctness.
31
32use std::collections::VecDeque;
33use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
34use std::sync::{Arc, Mutex, RwLock};
35use std::time::{Duration, Instant};
36
/// Shard identifier for compaction.
///
/// `CompactionQueue` assigns IDs `0..num_shards` and uses the ID directly as an
/// index into its shard list.
pub type ShardId = u32;

/// Version number for compacted data.
///
/// Each shard starts at version 1, and `finish_compaction` increments it by one
/// for every completed compaction.
pub type Version = u64;
42
/// Compaction priority levels.
///
/// The derived `Ord` follows declaration order, so
/// `Background < Normal < High < Urgent`; `ShardCompactionState::pop_task`
/// selects the greatest priority first. `Urgent` is also the only level that
/// bypasses `CompactionConfig::min_interval` in `CompactionQueue::schedule`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum CompactionPriority {
    /// Background compaction (lowest priority).
    Background = 0,
    /// Normal compaction.
    Normal = 1,
    /// High priority (e.g., after many deletes).
    High = 2,
    /// Urgent (e.g., space pressure). Ignores the per-shard minimum interval.
    Urgent = 3,
}
55
56/// Compaction task for a shard.
57#[derive(Debug, Clone)]
58pub struct CompactionTask {
59    /// Target shard.
60    pub shard_id: ShardId,
61    /// Priority level.
62    pub priority: CompactionPriority,
63    /// Estimated work units.
64    pub work_estimate: u64,
65    /// Creation time.
66    pub created_at: Instant,
67    /// Task ID.
68    pub task_id: u64,
69}
70
71impl CompactionTask {
72    /// Create a new compaction task.
73    pub fn new(shard_id: ShardId, priority: CompactionPriority) -> Self {
74        static TASK_COUNTER: AtomicU64 = AtomicU64::new(0);
75        Self {
76            shard_id,
77            priority,
78            work_estimate: 0,
79            created_at: Instant::now(),
80            task_id: TASK_COUNTER.fetch_add(1, Ordering::Relaxed),
81        }
82    }
83
84    /// Set work estimate.
85    pub fn with_work_estimate(mut self, estimate: u64) -> Self {
86        self.work_estimate = estimate;
87        self
88    }
89
90    /// Time waiting in queue.
91    pub fn queue_time(&self) -> Duration {
92        self.created_at.elapsed()
93    }
94}
95
/// Per-shard compaction state.
///
/// Tracks the shard's data version, a single-flight "compaction in progress"
/// flag, the queue of pending tasks, per-version reader counts, and statistics.
pub struct ShardCompactionState {
    /// Shard ID.
    shard_id: ShardId,
    /// Current version; starts at 1 and is bumped by every finished compaction.
    current_version: AtomicU64,
    /// Is compaction in progress? Claimed via compare-exchange so at most one
    /// compaction runs on this shard at a time.
    compacting: AtomicBool,
    /// Pending tasks queue (appended at the back, popped by priority).
    pending_tasks: Mutex<VecDeque<CompactionTask>>,
    /// Active readers on each version, as `(version, reader_count)` pairs.
    /// NOTE(review): counts are only decremented by `unregister_reader`, which
    /// nothing in this file calls — confirm where readers are released.
    reader_counts: RwLock<Vec<(Version, u64)>>,
    /// Last compaction time (`None` until the first compaction finishes).
    last_compaction: RwLock<Option<Instant>>,
    /// Statistics.
    stats: CompactionStats,
}
113
/// Compaction statistics, kept separately for each shard.
///
/// Counters are updated with `Ordering::Relaxed` and carry no synchronization
/// guarantees relative to other shard state.
#[derive(Debug, Default)]
pub struct CompactionStats {
    /// Total compactions completed.
    pub compactions_completed: AtomicU64,
    /// Total bytes reclaimed, as reported to `finish_compaction`.
    pub bytes_reclaimed: AtomicU64,
    /// Total time spent compacting, in whole milliseconds (each compaction's
    /// sub-millisecond remainder is truncated).
    pub compaction_time_ms: AtomicU64,
    /// Maximum queue depth seen (high-water mark of pending tasks).
    pub max_queue_depth: AtomicU64,
}
126
127impl ShardCompactionState {
128    /// Create new shard state.
129    pub fn new(shard_id: ShardId) -> Self {
130        Self {
131            shard_id,
132            current_version: AtomicU64::new(1),
133            compacting: AtomicBool::new(false),
134            pending_tasks: Mutex::new(VecDeque::new()),
135            reader_counts: RwLock::new(Vec::new()),
136            last_compaction: RwLock::new(None),
137            stats: CompactionStats::default(),
138        }
139    }
140
141    /// Get current version.
142    pub fn current_version(&self) -> Version {
143        self.current_version.load(Ordering::Acquire)
144    }
145
146    /// Check if compaction is in progress.
147    pub fn is_compacting(&self) -> bool {
148        self.compacting.load(Ordering::Acquire)
149    }
150
151    /// Try to start compaction (returns false if already compacting).
152    pub fn try_start_compaction(&self) -> bool {
153        self.compacting
154            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
155            .is_ok()
156    }
157
158    /// Finish compaction and bump version.
159    pub fn finish_compaction(&self, bytes_reclaimed: u64, duration: Duration) {
160        let _new_version = self.current_version.fetch_add(1, Ordering::AcqRel) + 1;
161        self.compacting.store(false, Ordering::Release);
162
163        self.stats
164            .compactions_completed
165            .fetch_add(1, Ordering::Relaxed);
166        self.stats
167            .bytes_reclaimed
168            .fetch_add(bytes_reclaimed, Ordering::Relaxed);
169        self.stats
170            .compaction_time_ms
171            .fetch_add(duration.as_millis() as u64, Ordering::Relaxed);
172
173        *self.last_compaction.write().unwrap() = Some(Instant::now());
174    }
175
176    /// Queue a compaction task.
177    pub fn queue_task(&self, task: CompactionTask) {
178        let mut queue = self.pending_tasks.lock().unwrap();
179        queue.push_back(task);
180
181        let depth = queue.len() as u64;
182        let max = self.stats.max_queue_depth.load(Ordering::Relaxed);
183        if depth > max {
184            self.stats.max_queue_depth.store(depth, Ordering::Relaxed);
185        }
186    }
187
188    /// Pop highest priority task.
189    pub fn pop_task(&self) -> Option<CompactionTask> {
190        let mut queue = self.pending_tasks.lock().unwrap();
191
192        // Find highest priority task
193        if queue.is_empty() {
194            return None;
195        }
196
197        let mut best_idx = 0;
198        let mut best_priority = queue[0].priority;
199
200        for (i, task) in queue.iter().enumerate().skip(1) {
201            if task.priority > best_priority {
202                best_priority = task.priority;
203                best_idx = i;
204            }
205        }
206
207        Some(queue.remove(best_idx).unwrap())
208    }
209
210    /// Get pending task count.
211    pub fn pending_count(&self) -> usize {
212        self.pending_tasks.lock().unwrap().len()
213    }
214
215    /// Register a reader on current version.
216    pub fn register_reader(&self) -> ReaderGuard {
217        let version = self.current_version();
218
219        {
220            let mut counts = self.reader_counts.write().unwrap();
221            if let Some(entry) = counts.iter_mut().find(|(v, _)| *v == version) {
222                entry.1 += 1;
223            } else {
224                counts.push((version, 1));
225            }
226        }
227
228        ReaderGuard {
229            shard_id: self.shard_id,
230            version,
231        }
232    }
233
234    /// Unregister a reader (called by ReaderGuard drop).
235    #[allow(dead_code)]
236    fn unregister_reader(&self, version: Version) {
237        let mut counts = self.reader_counts.write().unwrap();
238        if let Some(entry) = counts.iter_mut().find(|(v, _)| *v == version) {
239            entry.1 = entry.1.saturating_sub(1);
240        }
241        // Clean up old versions with no readers
242        counts.retain(|(_, count)| *count > 0);
243    }
244
245    /// Check if any readers on old versions.
246    pub fn has_old_readers(&self) -> bool {
247        let current = self.current_version();
248        let counts = self.reader_counts.read().unwrap();
249        counts.iter().any(|(v, count)| *v < current && *count > 0)
250    }
251
252    /// Get time since last compaction.
253    pub fn time_since_compaction(&self) -> Option<Duration> {
254        self.last_compaction.read().unwrap().map(|t| t.elapsed())
255    }
256
257    /// Get statistics.
258    pub fn stats(&self) -> &CompactionStats {
259        &self.stats
260    }
261}
262
263/// Guard that tracks reader lifetime.
264pub struct ReaderGuard {
265    shard_id: ShardId,
266    version: Version,
267}
268
269impl ReaderGuard {
270    /// Get version this reader is using.
271    pub fn version(&self) -> Version {
272        self.version
273    }
274
275    /// Get shard ID.
276    pub fn shard_id(&self) -> ShardId {
277        self.shard_id
278    }
279}
280
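// NOTE: `ReaderGuard` has no `Drop` impl and nothing in this file calls
// `unregister_reader`, so reader counts are never decremented. A production
// version would hold a reference to the owning `ShardCompactionState` and call
// `unregister_reader` from `Drop`; this file only sketches the pattern.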
283
/// Compaction queue manager for multiple shards.
///
/// Owns one `ShardCompactionState` per shard, a global shutdown flag, and the
/// scheduling configuration.
pub struct CompactionQueue {
    /// Per-shard state, indexed by `ShardId`.
    shards: Vec<Arc<ShardCompactionState>>,
    /// Global shutdown flag.
    shutdown: AtomicBool,
    /// Configuration.
    config: CompactionConfig,
}
293
/// Tunables for `CompactionQueue` scheduling.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Upper bound on shards compacting at once (checked by
    /// `CompactionQueue::can_start_compaction`).
    pub max_concurrent: usize,
    /// Minimum interval between compactions of the same shard; requests below
    /// `Urgent` priority that arrive sooner are rejected by `schedule`.
    pub min_interval: Duration,
    /// Work threshold to trigger compaction.
    /// NOTE(review): not read anywhere in this file — confirm its consumer.
    pub work_threshold: u64,
    /// Enable background compaction.
    /// NOTE(review): not read anywhere in this file — confirm its consumer.
    pub background_enabled: bool,
}

impl Default for CompactionConfig {
    /// Defaults: 4 concurrent compactions, a 60-second per-shard interval, a
    /// 10,000-unit work threshold, and background compaction enabled.
    fn default() -> Self {
        const DEFAULT_MIN_INTERVAL_SECS: u64 = 60;
        const DEFAULT_WORK_THRESHOLD: u64 = 10_000;

        CompactionConfig {
            background_enabled: true,
            work_threshold: DEFAULT_WORK_THRESHOLD,
            min_interval: Duration::from_secs(DEFAULT_MIN_INTERVAL_SECS),
            max_concurrent: 4,
        }
    }
}
317
318impl CompactionQueue {
319    /// Create a new compaction queue.
320    pub fn new(num_shards: usize, config: CompactionConfig) -> Self {
321        let shards = (0..num_shards)
322            .map(|i| Arc::new(ShardCompactionState::new(i as ShardId)))
323            .collect();
324
325        Self {
326            shards,
327            shutdown: AtomicBool::new(false),
328            config,
329        }
330    }
331
332    /// Get shard state.
333    pub fn shard(&self, shard_id: ShardId) -> Option<&Arc<ShardCompactionState>> {
334        self.shards.get(shard_id as usize)
335    }
336
337    /// Schedule compaction for a shard.
338    pub fn schedule(&self, shard_id: ShardId, priority: CompactionPriority) -> bool {
339        if let Some(shard) = self.shard(shard_id) {
340            // Check minimum interval
341            if let Some(elapsed) = shard.time_since_compaction() {
342                if elapsed < self.config.min_interval && priority < CompactionPriority::Urgent {
343                    return false;
344                }
345            }
346
347            let task = CompactionTask::new(shard_id, priority);
348            shard.queue_task(task);
349            true
350        } else {
351            false
352        }
353    }
354
355    /// Get next task to process (from any shard).
356    pub fn next_task(&self) -> Option<(ShardId, CompactionTask)> {
357        // Find shard with highest priority task that isn't already compacting
358        let mut best: Option<(ShardId, CompactionTask)> = None;
359
360        for shard in &self.shards {
361            if shard.is_compacting() {
362                continue;
363            }
364
365            // Peek at next task
366            let queue = shard.pending_tasks.lock().unwrap();
367            if let Some(task) = queue.front() {
368                let dominated = best
369                    .as_ref()
370                    .map_or(false, |(_, best_task)| task.priority <= best_task.priority);
371
372                if !dominated {
373                    drop(queue);
374                    if let Some(task) = shard.pop_task() {
375                        best = Some((shard.shard_id, task));
376                    }
377                }
378            }
379        }
380
381        best
382    }
383
384    /// Count active compactions.
385    pub fn active_compactions(&self) -> usize {
386        self.shards.iter().filter(|s| s.is_compacting()).count()
387    }
388
389    /// Check if more compactions can start.
390    pub fn can_start_compaction(&self) -> bool {
391        self.active_compactions() < self.config.max_concurrent
392    }
393
394    /// Total pending tasks across all shards.
395    pub fn total_pending(&self) -> usize {
396        self.shards.iter().map(|s| s.pending_count()).sum()
397    }
398
399    /// Signal shutdown.
400    pub fn shutdown(&self) {
401        self.shutdown.store(true, Ordering::Release);
402    }
403
404    /// Check if shutdown requested.
405    pub fn is_shutdown(&self) -> bool {
406        self.shutdown.load(Ordering::Acquire)
407    }
408
409    /// Number of shards.
410    pub fn num_shards(&self) -> usize {
411        self.shards.len()
412    }
413}
414
/// Result of a compaction operation.
#[derive(Debug, Clone)]
pub struct CompactionResult {
    /// Shard that was compacted.
    pub shard_id: ShardId,
    /// New version after compaction. Executors may leave this as a placeholder
    /// (the mock returns 0); `CompactionWorker::run_once` overwrites it with the
    /// shard's version after `finish_compaction`.
    pub new_version: Version,
    /// Bytes reclaimed.
    pub bytes_reclaimed: u64,
    /// Duration of compaction. `CompactionWorker::run_once` replaces the
    /// executor's value with its own wall-clock measurement.
    pub duration: Duration,
    /// Number of entries merged.
    pub entries_merged: u64,
    /// Success or failure.
    /// NOTE(review): `CompactionWorker::run_once` does not inspect this flag;
    /// the shard version is bumped even when it is `false`.
    pub success: bool,
}
431
/// Trait for compaction executor.
///
/// Implementors perform the actual rebuild for one shard. The `Send + Sync`
/// bound allows one executor to be shared behind `Arc<dyn CompactionExecutor>`,
/// as `CompactionWorker` does.
pub trait CompactionExecutor: Send + Sync {
    /// Execute compaction for a shard.
    ///
    /// `CompactionWorker::run_once` calls this synchronously after claiming the
    /// shard with `try_start_compaction`.
    fn compact(&self, shard_id: ShardId) -> CompactionResult;

    /// Estimate work for compaction.
    /// NOTE(review): not called anywhere in this file.
    fn estimate_work(&self, shard_id: ShardId) -> u64;
}
440
441/// Simple in-memory compaction executor for testing.
442pub struct MockCompactionExecutor {
443    /// Simulated compaction time.
444    compact_time: Duration,
445    /// Simulated bytes reclaimed.
446    bytes_per_compact: u64,
447}
448
449impl MockCompactionExecutor {
450    /// Create a new mock executor.
451    pub fn new(compact_time: Duration, bytes_per_compact: u64) -> Self {
452        Self {
453            compact_time,
454            bytes_per_compact,
455        }
456    }
457}
458
459impl CompactionExecutor for MockCompactionExecutor {
460    fn compact(&self, shard_id: ShardId) -> CompactionResult {
461        // Simulate work
462        std::thread::sleep(self.compact_time);
463
464        CompactionResult {
465            shard_id,
466            new_version: 0, // Caller should update
467            bytes_reclaimed: self.bytes_per_compact,
468            duration: self.compact_time,
469            entries_merged: 100,
470            success: true,
471        }
472    }
473
474    fn estimate_work(&self, _shard_id: ShardId) -> u64 {
475        1000
476    }
477}
478
479/// Background compaction worker.
480pub struct CompactionWorker {
481    /// Queue to process.
482    queue: Arc<CompactionQueue>,
483    /// Executor.
484    executor: Arc<dyn CompactionExecutor>,
485    /// Worker ID.
486    worker_id: usize,
487}
488
489impl CompactionWorker {
490    /// Create a new worker.
491    pub fn new(
492        queue: Arc<CompactionQueue>,
493        executor: Arc<dyn CompactionExecutor>,
494        worker_id: usize,
495    ) -> Self {
496        Self {
497            queue,
498            executor,
499            worker_id,
500        }
501    }
502
503    /// Run one compaction cycle.
504    pub fn run_once(&self) -> Option<CompactionResult> {
505        if self.queue.is_shutdown() {
506            return None;
507        }
508
509        if !self.queue.can_start_compaction() {
510            return None;
511        }
512
513        let (shard_id, _task) = self.queue.next_task()?;
514        let shard = self.queue.shard(shard_id)?;
515
516        if !shard.try_start_compaction() {
517            return None;
518        }
519
520        let start = Instant::now();
521        let mut result = self.executor.compact(shard_id);
522        result.duration = start.elapsed();
523
524        shard.finish_compaction(result.bytes_reclaimed, result.duration);
525        result.new_version = shard.current_version();
526
527        Some(result)
528    }
529
530    /// Get worker ID.
531    pub fn worker_id(&self) -> usize {
532        self.worker_id
533    }
534}
535
#[cfg(test)]
mod tests {
    //! Unit tests for per-shard state, the multi-shard queue, the mock
    //! executor, and the worker loop.
    use super::*;

    #[test]
    fn test_shard_state_version() {
        let state = ShardCompactionState::new(0);
        assert_eq!(state.current_version(), 1);

        state.try_start_compaction();
        state.finish_compaction(1000, Duration::from_millis(10));

        // Each finished compaction bumps the version by exactly one.
        assert_eq!(state.current_version(), 2);
    }

    #[test]
    fn test_compaction_lock() {
        let state = ShardCompactionState::new(0);

        assert!(!state.is_compacting());
        assert!(state.try_start_compaction());
        assert!(state.is_compacting());
        assert!(!state.try_start_compaction()); // Already compacting

        state.finish_compaction(0, Duration::ZERO);
        assert!(!state.is_compacting());
    }

    #[test]
    fn test_task_queue() {
        let state = ShardCompactionState::new(0);

        state.queue_task(CompactionTask::new(0, CompactionPriority::Background));
        state.queue_task(CompactionTask::new(0, CompactionPriority::High));
        state.queue_task(CompactionTask::new(0, CompactionPriority::Normal));

        assert_eq!(state.pending_count(), 3);

        // Should get highest priority first
        let task = state.pop_task().unwrap();
        assert_eq!(task.priority, CompactionPriority::High);
    }

    #[test]
    fn test_compaction_queue() {
        let config = CompactionConfig {
            max_concurrent: 2,
            min_interval: Duration::ZERO,
            ..Default::default()
        };
        let queue = CompactionQueue::new(4, config);

        assert_eq!(queue.num_shards(), 4);
        assert_eq!(queue.active_compactions(), 0);

        queue.schedule(0, CompactionPriority::Normal);
        queue.schedule(1, CompactionPriority::High);

        assert_eq!(queue.total_pending(), 2);
    }

    #[test]
    fn test_next_task_priority() {
        let config = CompactionConfig {
            min_interval: Duration::ZERO,
            ..Default::default()
        };
        let queue = CompactionQueue::new(4, config);

        queue.schedule(0, CompactionPriority::Background);
        queue.schedule(1, CompactionPriority::Urgent);
        queue.schedule(2, CompactionPriority::Normal);

        // Should get urgent first.
        // Only the returned task is checked; the other shards' queues are not
        // inspected here.
        let (shard_id, task) = queue.next_task().unwrap();
        assert_eq!(shard_id, 1);
        assert_eq!(task.priority, CompactionPriority::Urgent);
    }

    #[test]
    fn test_concurrent_limit() {
        let config = CompactionConfig {
            max_concurrent: 2,
            min_interval: Duration::ZERO,
            ..Default::default()
        };
        let queue = CompactionQueue::new(4, config);

        // Start two compactions (directly, bypassing the task queue)
        queue.shard(0).unwrap().try_start_compaction();
        queue.shard(1).unwrap().try_start_compaction();

        assert_eq!(queue.active_compactions(), 2);
        assert!(!queue.can_start_compaction());

        // Finish one
        queue.shard(0).unwrap().finish_compaction(0, Duration::ZERO);
        assert!(queue.can_start_compaction());
    }

    #[test]
    fn test_reader_guard() {
        let state = ShardCompactionState::new(0);

        // Registers on the initial version (1). The guard is never released
        // because ReaderGuard has no Drop impl in this file.
        let guard = state.register_reader();
        assert_eq!(guard.version(), 1);
        assert_eq!(guard.shard_id(), 0);
    }

    #[test]
    fn test_stats_tracking() {
        let state = ShardCompactionState::new(0);

        state.try_start_compaction();
        // The recorded time comes from the supplied duration, not wall time.
        state.finish_compaction(5000, Duration::from_millis(100));

        let stats = state.stats();
        assert_eq!(stats.compactions_completed.load(Ordering::Relaxed), 1);
        assert_eq!(stats.bytes_reclaimed.load(Ordering::Relaxed), 5000);
        assert!(stats.compaction_time_ms.load(Ordering::Relaxed) >= 100);
    }

    #[test]
    fn test_mock_executor() {
        let executor = MockCompactionExecutor::new(Duration::from_millis(10), 1000);

        let result = executor.compact(0);
        assert!(result.success);
        assert_eq!(result.bytes_reclaimed, 1000);
    }

    #[test]
    fn test_worker_run_once() {
        let config = CompactionConfig {
            min_interval: Duration::ZERO,
            ..Default::default()
        };
        let queue = Arc::new(CompactionQueue::new(4, config));
        // The mock sleeps for 1 ms per compaction.
        let executor = Arc::new(MockCompactionExecutor::new(Duration::from_millis(1), 500));
        let worker = CompactionWorker::new(queue.clone(), executor, 0);

        // No tasks - should return None
        assert!(worker.run_once().is_none());

        // Add task
        queue.schedule(0, CompactionPriority::Normal);

        // Should run compaction
        let result = worker.run_once();
        assert!(result.is_some());

        let result = result.unwrap();
        assert_eq!(result.shard_id, 0);
        assert!(result.success);
    }

    #[test]
    fn test_shutdown() {
        let queue = CompactionQueue::new(4, CompactionConfig::default());

        assert!(!queue.is_shutdown());
        queue.shutdown();
        assert!(queue.is_shutdown());
    }

    #[test]
    fn test_time_since_compaction() {
        let state = ShardCompactionState::new(0);

        // No compaction yet
        assert!(state.time_since_compaction().is_none());

        state.try_start_compaction();
        state.finish_compaction(0, Duration::ZERO);

        // Now should have a time
        let elapsed = state.time_since_compaction().unwrap();
        assert!(elapsed < Duration::from_secs(1));
    }
}