1use std::collections::VecDeque;
33use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
34use std::sync::{Arc, Mutex, RwLock};
35use std::time::{Duration, Instant};
36
37pub type ShardId = u32;
39
40pub type Version = u64;
42
43#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
45pub enum CompactionPriority {
46 Background = 0,
48 Normal = 1,
50 High = 2,
52 Urgent = 3,
54}
55
56#[derive(Debug, Clone)]
58pub struct CompactionTask {
59 pub shard_id: ShardId,
61 pub priority: CompactionPriority,
63 pub work_estimate: u64,
65 pub created_at: Instant,
67 pub task_id: u64,
69}
70
71impl CompactionTask {
72 pub fn new(shard_id: ShardId, priority: CompactionPriority) -> Self {
74 static TASK_COUNTER: AtomicU64 = AtomicU64::new(0);
75 Self {
76 shard_id,
77 priority,
78 work_estimate: 0,
79 created_at: Instant::now(),
80 task_id: TASK_COUNTER.fetch_add(1, Ordering::Relaxed),
81 }
82 }
83
84 pub fn with_work_estimate(mut self, estimate: u64) -> Self {
86 self.work_estimate = estimate;
87 self
88 }
89
90 pub fn queue_time(&self) -> Duration {
92 self.created_at.elapsed()
93 }
94}
95
96pub struct ShardCompactionState {
98 shard_id: ShardId,
100 current_version: AtomicU64,
102 compacting: AtomicBool,
104 pending_tasks: Mutex<VecDeque<CompactionTask>>,
106 reader_counts: RwLock<Vec<(Version, u64)>>,
108 last_compaction: RwLock<Option<Instant>>,
110 stats: CompactionStats,
112}
113
114#[derive(Debug, Default)]
116pub struct CompactionStats {
117 pub compactions_completed: AtomicU64,
119 pub bytes_reclaimed: AtomicU64,
121 pub compaction_time_ms: AtomicU64,
123 pub max_queue_depth: AtomicU64,
125}
126
127impl ShardCompactionState {
128 pub fn new(shard_id: ShardId) -> Self {
130 Self {
131 shard_id,
132 current_version: AtomicU64::new(1),
133 compacting: AtomicBool::new(false),
134 pending_tasks: Mutex::new(VecDeque::new()),
135 reader_counts: RwLock::new(Vec::new()),
136 last_compaction: RwLock::new(None),
137 stats: CompactionStats::default(),
138 }
139 }
140
141 pub fn current_version(&self) -> Version {
143 self.current_version.load(Ordering::Acquire)
144 }
145
146 pub fn is_compacting(&self) -> bool {
148 self.compacting.load(Ordering::Acquire)
149 }
150
151 pub fn try_start_compaction(&self) -> bool {
153 self.compacting
154 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
155 .is_ok()
156 }
157
158 pub fn finish_compaction(&self, bytes_reclaimed: u64, duration: Duration) {
160 let _new_version = self.current_version.fetch_add(1, Ordering::AcqRel) + 1;
161 self.compacting.store(false, Ordering::Release);
162
163 self.stats
164 .compactions_completed
165 .fetch_add(1, Ordering::Relaxed);
166 self.stats
167 .bytes_reclaimed
168 .fetch_add(bytes_reclaimed, Ordering::Relaxed);
169 self.stats
170 .compaction_time_ms
171 .fetch_add(duration.as_millis() as u64, Ordering::Relaxed);
172
173 *self.last_compaction.write().unwrap() = Some(Instant::now());
174 }
175
176 pub fn queue_task(&self, task: CompactionTask) {
178 let mut queue = self.pending_tasks.lock().unwrap();
179 queue.push_back(task);
180
181 let depth = queue.len() as u64;
182 let max = self.stats.max_queue_depth.load(Ordering::Relaxed);
183 if depth > max {
184 self.stats.max_queue_depth.store(depth, Ordering::Relaxed);
185 }
186 }
187
188 pub fn pop_task(&self) -> Option<CompactionTask> {
190 let mut queue = self.pending_tasks.lock().unwrap();
191
192 if queue.is_empty() {
194 return None;
195 }
196
197 let mut best_idx = 0;
198 let mut best_priority = queue[0].priority;
199
200 for (i, task) in queue.iter().enumerate().skip(1) {
201 if task.priority > best_priority {
202 best_priority = task.priority;
203 best_idx = i;
204 }
205 }
206
207 Some(queue.remove(best_idx).unwrap())
208 }
209
210 pub fn pending_count(&self) -> usize {
212 self.pending_tasks.lock().unwrap().len()
213 }
214
215 pub fn register_reader(&self) -> ReaderGuard {
217 let version = self.current_version();
218
219 {
220 let mut counts = self.reader_counts.write().unwrap();
221 if let Some(entry) = counts.iter_mut().find(|(v, _)| *v == version) {
222 entry.1 += 1;
223 } else {
224 counts.push((version, 1));
225 }
226 }
227
228 ReaderGuard {
229 shard_id: self.shard_id,
230 version,
231 }
232 }
233
234 #[allow(dead_code)]
236 fn unregister_reader(&self, version: Version) {
237 let mut counts = self.reader_counts.write().unwrap();
238 if let Some(entry) = counts.iter_mut().find(|(v, _)| *v == version) {
239 entry.1 = entry.1.saturating_sub(1);
240 }
241 counts.retain(|(_, count)| *count > 0);
243 }
244
245 pub fn has_old_readers(&self) -> bool {
247 let current = self.current_version();
248 let counts = self.reader_counts.read().unwrap();
249 counts.iter().any(|(v, count)| *v < current && *count > 0)
250 }
251
252 pub fn time_since_compaction(&self) -> Option<Duration> {
254 self.last_compaction.read().unwrap().map(|t| t.elapsed())
255 }
256
257 pub fn stats(&self) -> &CompactionStats {
259 &self.stats
260 }
261}
262
263pub struct ReaderGuard {
265 shard_id: ShardId,
266 version: Version,
267}
268
269impl ReaderGuard {
270 pub fn version(&self) -> Version {
272 self.version
273 }
274
275 pub fn shard_id(&self) -> ShardId {
277 self.shard_id
278 }
279}
280
281pub struct CompactionQueue {
286 shards: Vec<Arc<ShardCompactionState>>,
288 shutdown: AtomicBool,
290 config: CompactionConfig,
292}
293
294#[derive(Debug, Clone)]
296pub struct CompactionConfig {
297 pub max_concurrent: usize,
299 pub min_interval: Duration,
301 pub work_threshold: u64,
303 pub background_enabled: bool,
305}
306
307impl Default for CompactionConfig {
308 fn default() -> Self {
309 Self {
310 max_concurrent: 4,
311 min_interval: Duration::from_secs(60),
312 work_threshold: 10000,
313 background_enabled: true,
314 }
315 }
316}
317
318impl CompactionQueue {
319 pub fn new(num_shards: usize, config: CompactionConfig) -> Self {
321 let shards = (0..num_shards)
322 .map(|i| Arc::new(ShardCompactionState::new(i as ShardId)))
323 .collect();
324
325 Self {
326 shards,
327 shutdown: AtomicBool::new(false),
328 config,
329 }
330 }
331
332 pub fn shard(&self, shard_id: ShardId) -> Option<&Arc<ShardCompactionState>> {
334 self.shards.get(shard_id as usize)
335 }
336
337 pub fn schedule(&self, shard_id: ShardId, priority: CompactionPriority) -> bool {
339 if let Some(shard) = self.shard(shard_id) {
340 if let Some(elapsed) = shard.time_since_compaction() {
342 if elapsed < self.config.min_interval && priority < CompactionPriority::Urgent {
343 return false;
344 }
345 }
346
347 let task = CompactionTask::new(shard_id, priority);
348 shard.queue_task(task);
349 true
350 } else {
351 false
352 }
353 }
354
355 pub fn next_task(&self) -> Option<(ShardId, CompactionTask)> {
357 let mut best: Option<(ShardId, CompactionTask)> = None;
359
360 for shard in &self.shards {
361 if shard.is_compacting() {
362 continue;
363 }
364
365 let queue = shard.pending_tasks.lock().unwrap();
367 if let Some(task) = queue.front() {
368 let dominated = best
369 .as_ref()
370 .map_or(false, |(_, best_task)| task.priority <= best_task.priority);
371
372 if !dominated {
373 drop(queue);
374 if let Some(task) = shard.pop_task() {
375 best = Some((shard.shard_id, task));
376 }
377 }
378 }
379 }
380
381 best
382 }
383
384 pub fn active_compactions(&self) -> usize {
386 self.shards.iter().filter(|s| s.is_compacting()).count()
387 }
388
389 pub fn can_start_compaction(&self) -> bool {
391 self.active_compactions() < self.config.max_concurrent
392 }
393
394 pub fn total_pending(&self) -> usize {
396 self.shards.iter().map(|s| s.pending_count()).sum()
397 }
398
399 pub fn shutdown(&self) {
401 self.shutdown.store(true, Ordering::Release);
402 }
403
404 pub fn is_shutdown(&self) -> bool {
406 self.shutdown.load(Ordering::Acquire)
407 }
408
409 pub fn num_shards(&self) -> usize {
411 self.shards.len()
412 }
413}
414
415#[derive(Debug, Clone)]
417pub struct CompactionResult {
418 pub shard_id: ShardId,
420 pub new_version: Version,
422 pub bytes_reclaimed: u64,
424 pub duration: Duration,
426 pub entries_merged: u64,
428 pub success: bool,
430}
431
432pub trait CompactionExecutor: Send + Sync {
434 fn compact(&self, shard_id: ShardId) -> CompactionResult;
436
437 fn estimate_work(&self, shard_id: ShardId) -> u64;
439}
440
441pub struct MockCompactionExecutor {
443 compact_time: Duration,
445 bytes_per_compact: u64,
447}
448
449impl MockCompactionExecutor {
450 pub fn new(compact_time: Duration, bytes_per_compact: u64) -> Self {
452 Self {
453 compact_time,
454 bytes_per_compact,
455 }
456 }
457}
458
459impl CompactionExecutor for MockCompactionExecutor {
460 fn compact(&self, shard_id: ShardId) -> CompactionResult {
461 std::thread::sleep(self.compact_time);
463
464 CompactionResult {
465 shard_id,
466 new_version: 0, bytes_reclaimed: self.bytes_per_compact,
468 duration: self.compact_time,
469 entries_merged: 100,
470 success: true,
471 }
472 }
473
474 fn estimate_work(&self, _shard_id: ShardId) -> u64 {
475 1000
476 }
477}
478
479pub struct CompactionWorker {
481 queue: Arc<CompactionQueue>,
483 executor: Arc<dyn CompactionExecutor>,
485 worker_id: usize,
487}
488
489impl CompactionWorker {
490 pub fn new(
492 queue: Arc<CompactionQueue>,
493 executor: Arc<dyn CompactionExecutor>,
494 worker_id: usize,
495 ) -> Self {
496 Self {
497 queue,
498 executor,
499 worker_id,
500 }
501 }
502
503 pub fn run_once(&self) -> Option<CompactionResult> {
505 if self.queue.is_shutdown() {
506 return None;
507 }
508
509 if !self.queue.can_start_compaction() {
510 return None;
511 }
512
513 let (shard_id, _task) = self.queue.next_task()?;
514 let shard = self.queue.shard(shard_id)?;
515
516 if !shard.try_start_compaction() {
517 return None;
518 }
519
520 let start = Instant::now();
521 let mut result = self.executor.compact(shard_id);
522 result.duration = start.elapsed();
523
524 shard.finish_compaction(result.bytes_reclaimed, result.duration);
525 result.new_version = shard.current_version();
526
527 Some(result)
528 }
529
530 pub fn worker_id(&self) -> usize {
532 self.worker_id
533 }
534}
535
536#[cfg(test)]
537mod tests {
538 use super::*;
539
540 #[test]
541 fn test_shard_state_version() {
542 let state = ShardCompactionState::new(0);
543 assert_eq!(state.current_version(), 1);
544
545 state.try_start_compaction();
546 state.finish_compaction(1000, Duration::from_millis(10));
547
548 assert_eq!(state.current_version(), 2);
549 }
550
551 #[test]
552 fn test_compaction_lock() {
553 let state = ShardCompactionState::new(0);
554
555 assert!(!state.is_compacting());
556 assert!(state.try_start_compaction());
557 assert!(state.is_compacting());
558 assert!(!state.try_start_compaction()); state.finish_compaction(0, Duration::ZERO);
561 assert!(!state.is_compacting());
562 }
563
564 #[test]
565 fn test_task_queue() {
566 let state = ShardCompactionState::new(0);
567
568 state.queue_task(CompactionTask::new(0, CompactionPriority::Background));
569 state.queue_task(CompactionTask::new(0, CompactionPriority::High));
570 state.queue_task(CompactionTask::new(0, CompactionPriority::Normal));
571
572 assert_eq!(state.pending_count(), 3);
573
574 let task = state.pop_task().unwrap();
576 assert_eq!(task.priority, CompactionPriority::High);
577 }
578
579 #[test]
580 fn test_compaction_queue() {
581 let config = CompactionConfig {
582 max_concurrent: 2,
583 min_interval: Duration::ZERO,
584 ..Default::default()
585 };
586 let queue = CompactionQueue::new(4, config);
587
588 assert_eq!(queue.num_shards(), 4);
589 assert_eq!(queue.active_compactions(), 0);
590
591 queue.schedule(0, CompactionPriority::Normal);
592 queue.schedule(1, CompactionPriority::High);
593
594 assert_eq!(queue.total_pending(), 2);
595 }
596
597 #[test]
598 fn test_next_task_priority() {
599 let config = CompactionConfig {
600 min_interval: Duration::ZERO,
601 ..Default::default()
602 };
603 let queue = CompactionQueue::new(4, config);
604
605 queue.schedule(0, CompactionPriority::Background);
606 queue.schedule(1, CompactionPriority::Urgent);
607 queue.schedule(2, CompactionPriority::Normal);
608
609 let (shard_id, task) = queue.next_task().unwrap();
611 assert_eq!(shard_id, 1);
612 assert_eq!(task.priority, CompactionPriority::Urgent);
613 }
614
615 #[test]
616 fn test_concurrent_limit() {
617 let config = CompactionConfig {
618 max_concurrent: 2,
619 min_interval: Duration::ZERO,
620 ..Default::default()
621 };
622 let queue = CompactionQueue::new(4, config);
623
624 queue.shard(0).unwrap().try_start_compaction();
626 queue.shard(1).unwrap().try_start_compaction();
627
628 assert_eq!(queue.active_compactions(), 2);
629 assert!(!queue.can_start_compaction());
630
631 queue.shard(0).unwrap().finish_compaction(0, Duration::ZERO);
633 assert!(queue.can_start_compaction());
634 }
635
636 #[test]
637 fn test_reader_guard() {
638 let state = ShardCompactionState::new(0);
639
640 let guard = state.register_reader();
641 assert_eq!(guard.version(), 1);
642 assert_eq!(guard.shard_id(), 0);
643 }
644
645 #[test]
646 fn test_stats_tracking() {
647 let state = ShardCompactionState::new(0);
648
649 state.try_start_compaction();
650 state.finish_compaction(5000, Duration::from_millis(100));
651
652 let stats = state.stats();
653 assert_eq!(stats.compactions_completed.load(Ordering::Relaxed), 1);
654 assert_eq!(stats.bytes_reclaimed.load(Ordering::Relaxed), 5000);
655 assert!(stats.compaction_time_ms.load(Ordering::Relaxed) >= 100);
656 }
657
658 #[test]
659 fn test_mock_executor() {
660 let executor = MockCompactionExecutor::new(Duration::from_millis(10), 1000);
661
662 let result = executor.compact(0);
663 assert!(result.success);
664 assert_eq!(result.bytes_reclaimed, 1000);
665 }
666
667 #[test]
668 fn test_worker_run_once() {
669 let config = CompactionConfig {
670 min_interval: Duration::ZERO,
671 ..Default::default()
672 };
673 let queue = Arc::new(CompactionQueue::new(4, config));
674 let executor = Arc::new(MockCompactionExecutor::new(Duration::from_millis(1), 500));
675 let worker = CompactionWorker::new(queue.clone(), executor, 0);
676
677 assert!(worker.run_once().is_none());
679
680 queue.schedule(0, CompactionPriority::Normal);
682
683 let result = worker.run_once();
685 assert!(result.is_some());
686
687 let result = result.unwrap();
688 assert_eq!(result.shard_id, 0);
689 assert!(result.success);
690 }
691
692 #[test]
693 fn test_shutdown() {
694 let queue = CompactionQueue::new(4, CompactionConfig::default());
695
696 assert!(!queue.is_shutdown());
697 queue.shutdown();
698 assert!(queue.is_shutdown());
699 }
700
701 #[test]
702 fn test_time_since_compaction() {
703 let state = ShardCompactionState::new(0);
704
705 assert!(state.time_since_compaction().is_none());
707
708 state.try_start_compaction();
709 state.finish_compaction(0, Duration::ZERO);
710
711 let elapsed = state.time_since_compaction().unwrap();
713 assert!(elapsed < Duration::from_secs(1));
714 }
715}