use crate::common::{CdcEvent, CdcOp};
use std::collections::{BTreeMap, HashMap};
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::info;
#[derive(Clone)]
pub enum CompactionKeyStrategy {
PrimaryKey(Vec<String>),
FullRow,
Custom(Arc<dyn Fn(&CdcEvent) -> String + Send + Sync>),
}
impl std::fmt::Debug for CompactionKeyStrategy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
CompactionKeyStrategy::PrimaryKey(cols) => {
f.debug_tuple("PrimaryKey").field(cols).finish()
}
CompactionKeyStrategy::FullRow => write!(f, "FullRow"),
CompactionKeyStrategy::Custom(_) => write!(f, "Custom(<fn>)"),
}
}
}
impl CompactionKeyStrategy {
pub fn extract(&self, event: &CdcEvent) -> String {
match self {
CompactionKeyStrategy::PrimaryKey(columns) => {
let values: Vec<String> = columns
.iter()
.filter_map(|col| {
event
.after
.as_ref()
.or(event.before.as_ref())
.and_then(|v| v.get(col))
.map(|v| v.to_string())
})
.collect();
format!("{}:{}:{}", event.schema, event.table, values.join(":"))
}
CompactionKeyStrategy::FullRow => {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
event.schema.hash(&mut hasher);
event.table.hash(&mut hasher);
if let Some(after) = &event.after {
after.to_string().hash(&mut hasher);
}
hasher.finish().to_string()
}
CompactionKeyStrategy::Custom(f) => f(event),
}
}
}
impl Default for CompactionKeyStrategy {
fn default() -> Self {
CompactionKeyStrategy::PrimaryKey(vec!["id".to_string()])
}
}
#[derive(Debug, Clone)]
pub struct CompactionConfig {
pub key_strategy: CompactionKeyStrategy,
pub min_cleanable_ratio: f64,
pub segment_size: usize,
pub tombstone_retention: Duration,
pub max_compaction_interval: Duration,
pub background_compaction: bool,
pub batch_size: usize,
}
impl Default for CompactionConfig {
fn default() -> Self {
Self {
key_strategy: CompactionKeyStrategy::default(),
min_cleanable_ratio: 0.5,
segment_size: 1_000_000,
tombstone_retention: Duration::from_secs(86400), max_compaction_interval: Duration::from_secs(3600), background_compaction: true,
batch_size: 10_000,
}
}
}
impl CompactionConfig {
pub fn builder() -> CompactionConfigBuilder {
CompactionConfigBuilder::default()
}
pub fn aggressive() -> Self {
Self {
min_cleanable_ratio: 0.3,
segment_size: 100_000,
tombstone_retention: Duration::from_secs(3600),
max_compaction_interval: Duration::from_secs(600),
..Default::default()
}
}
pub fn conservative() -> Self {
Self {
min_cleanable_ratio: 0.7,
segment_size: 10_000_000,
tombstone_retention: Duration::from_secs(604800), max_compaction_interval: Duration::from_secs(86400),
..Default::default()
}
}
}
#[derive(Default)]
pub struct CompactionConfigBuilder {
config: CompactionConfig,
}
impl CompactionConfigBuilder {
pub fn key_strategy(mut self, strategy: CompactionKeyStrategy) -> Self {
self.config.key_strategy = strategy;
self
}
pub fn min_cleanable_ratio(mut self, ratio: f64) -> Self {
self.config.min_cleanable_ratio = ratio.clamp(0.0, 1.0);
self
}
pub fn segment_size(mut self, size: usize) -> Self {
self.config.segment_size = size;
self
}
pub fn tombstone_retention(mut self, retention: Duration) -> Self {
self.config.tombstone_retention = retention;
self
}
pub fn max_compaction_interval(mut self, interval: Duration) -> Self {
self.config.max_compaction_interval = interval;
self
}
pub fn background_compaction(mut self, enabled: bool) -> Self {
self.config.background_compaction = enabled;
self
}
pub fn batch_size(mut self, size: usize) -> Self {
self.config.batch_size = size;
self
}
pub fn build(self) -> CompactionConfig {
self.config
}
}
#[derive(Debug, Clone)]
pub struct CompactedEntry {
pub key: String,
pub event: Option<CdcEvent>,
pub offset: u64,
pub timestamp: i64,
pub is_tombstone: bool,
pub tombstone_created_at: Option<Instant>,
}
impl CompactedEntry {
pub fn from_event(key: String, event: CdcEvent, offset: u64) -> Self {
let is_tombstone = event.op == CdcOp::Delete;
Self {
key,
timestamp: event.timestamp,
is_tombstone,
tombstone_created_at: if is_tombstone {
Some(Instant::now())
} else {
None
},
event: if is_tombstone { None } else { Some(event) },
offset,
}
}
pub fn tombstone(key: String, offset: u64, timestamp: i64) -> Self {
Self {
key,
event: None,
offset,
timestamp,
is_tombstone: true,
tombstone_created_at: Some(Instant::now()),
}
}
}
#[derive(Debug, Default)]
pub struct CompactionStats {
events_processed: AtomicU64,
events_compacted: AtomicU64,
events_removed: AtomicU64,
tombstones_created: AtomicU64,
tombstones_expired: AtomicU64,
compaction_runs: AtomicU64,
total_compaction_time_ms: AtomicU64,
}
impl CompactionStats {
pub fn new() -> Self {
Self::default()
}
pub fn record_processed(&self, count: u64) {
self.events_processed.fetch_add(count, Ordering::Relaxed);
}
pub fn record_compacted(&self, count: u64) {
self.events_compacted.fetch_add(count, Ordering::Relaxed);
}
pub fn record_removed(&self, count: u64) {
self.events_removed.fetch_add(count, Ordering::Relaxed);
}
pub fn record_tombstone(&self) {
self.tombstones_created.fetch_add(1, Ordering::Relaxed);
}
pub fn record_tombstone_expired(&self, count: u64) {
self.tombstones_expired.fetch_add(count, Ordering::Relaxed);
}
pub fn record_compaction_run(&self, duration: Duration) {
self.compaction_runs.fetch_add(1, Ordering::Relaxed);
self.total_compaction_time_ms
.fetch_add(duration.as_millis() as u64, Ordering::Relaxed);
}
pub fn snapshot(&self) -> CompactionStatsSnapshot {
let runs = self.compaction_runs.load(Ordering::Relaxed);
let total_time = self.total_compaction_time_ms.load(Ordering::Relaxed);
let processed = self.events_processed.load(Ordering::Relaxed);
let compacted = self.events_compacted.load(Ordering::Relaxed);
CompactionStatsSnapshot {
events_processed: processed,
events_compacted: compacted,
events_removed: self.events_removed.load(Ordering::Relaxed),
tombstones_created: self.tombstones_created.load(Ordering::Relaxed),
tombstones_expired: self.tombstones_expired.load(Ordering::Relaxed),
compaction_runs: runs,
avg_compaction_time_ms: if runs > 0 { total_time / runs } else { 0 },
compaction_ratio: if processed > 0 {
compacted as f64 / processed as f64
} else {
0.0
},
}
}
}
#[derive(Debug, Clone)]
pub struct CompactionStatsSnapshot {
pub events_processed: u64,
pub events_compacted: u64,
pub events_removed: u64,
pub tombstones_created: u64,
pub tombstones_expired: u64,
pub compaction_runs: u64,
pub avg_compaction_time_ms: u64,
pub compaction_ratio: f64,
}
pub struct Compactor {
config: CompactionConfig,
log: RwLock<BTreeMap<String, CompactedEntry>>,
dirty_count: AtomicU64,
total_count: AtomicU64,
stats: CompactionStats,
last_compaction: RwLock<Option<Instant>>,
current_offset: AtomicU64,
}
impl Compactor {
pub fn new(config: CompactionConfig) -> Self {
Self {
config,
log: RwLock::new(BTreeMap::new()),
dirty_count: AtomicU64::new(0),
total_count: AtomicU64::new(0),
stats: CompactionStats::new(),
last_compaction: RwLock::new(None),
current_offset: AtomicU64::new(0),
}
}
pub async fn append(&self, event: CdcEvent) -> u64 {
let key = self.config.key_strategy.extract(&event);
let offset = self.current_offset.fetch_add(1, Ordering::SeqCst);
let entry = CompactedEntry::from_event(key.clone(), event, offset);
let mut log = self.log.write().await;
if log.contains_key(&key) {
self.dirty_count.fetch_add(1, Ordering::Relaxed);
} else {
self.total_count.fetch_add(1, Ordering::Relaxed);
}
if entry.is_tombstone {
self.stats.record_tombstone();
}
log.insert(key, entry);
self.stats.record_processed(1);
offset
}
pub async fn append_batch(&self, events: Vec<CdcEvent>) -> Vec<u64> {
let mut offsets = Vec::with_capacity(events.len());
let mut log = self.log.write().await;
for event in events {
let key = self.config.key_strategy.extract(&event);
let offset = self.current_offset.fetch_add(1, Ordering::SeqCst);
let entry = CompactedEntry::from_event(key.clone(), event, offset);
if log.contains_key(&key) {
self.dirty_count.fetch_add(1, Ordering::Relaxed);
} else {
self.total_count.fetch_add(1, Ordering::Relaxed);
}
if entry.is_tombstone {
self.stats.record_tombstone();
}
log.insert(key, entry);
offsets.push(offset);
}
self.stats.record_processed(offsets.len() as u64);
offsets
}
pub async fn get(&self, key: &str) -> Option<CdcEvent> {
let log = self.log.read().await;
log.get(key).and_then(|e| e.event.clone())
}
pub async fn snapshot(&self) -> Vec<CompactedEntry> {
let log = self.log.read().await;
log.values().filter(|e| !e.is_tombstone).cloned().collect()
}
pub async fn snapshot_with_tombstones(&self) -> Vec<CompactedEntry> {
let log = self.log.read().await;
log.values().cloned().collect()
}
pub async fn needs_compaction(&self) -> bool {
let dirty = self.dirty_count.load(Ordering::Relaxed);
let total = self.total_count.load(Ordering::Relaxed);
if total == 0 {
return false;
}
let dirty_ratio = dirty as f64 / total as f64;
if dirty_ratio >= self.config.min_cleanable_ratio {
return true;
}
if let Some(last) = *self.last_compaction.read().await {
if last.elapsed() >= self.config.max_compaction_interval {
return true;
}
}
if total >= self.config.segment_size as u64 {
return true;
}
false
}
pub async fn compact(&self) -> CompactionResult {
let start = Instant::now();
let mut log = self.log.write().await;
let before_count = log.len();
let mut removed = 0u64;
let mut tombstones_expired = 0u64;
let retention = self.config.tombstone_retention;
log.retain(|_, entry| {
if entry.is_tombstone {
if let Some(created) = entry.tombstone_created_at {
if created.elapsed() >= retention {
tombstones_expired += 1;
removed += 1;
return false;
}
}
}
true
});
let after_count = log.len();
let duration = start.elapsed();
self.dirty_count.store(0, Ordering::Relaxed);
self.total_count
.store(after_count as u64, Ordering::Relaxed);
*self.last_compaction.write().await = Some(Instant::now());
self.stats
.record_compacted((before_count - after_count) as u64);
self.stats.record_removed(removed);
self.stats.record_tombstone_expired(tombstones_expired);
self.stats.record_compaction_run(duration);
info!(
"Compaction complete: {} -> {} entries ({} removed, {} tombstones expired) in {:?}",
before_count, after_count, removed, tombstones_expired, duration
);
CompactionResult {
entries_before: before_count,
entries_after: after_count,
entries_removed: removed as usize,
tombstones_expired: tombstones_expired as usize,
duration,
}
}
pub fn compact_batch(
events: &[CdcEvent],
key_strategy: &CompactionKeyStrategy,
) -> Vec<CdcEvent> {
let mut latest: HashMap<String, (usize, &CdcEvent)> = HashMap::new();
for (i, event) in events.iter().enumerate() {
let key = key_strategy.extract(event);
latest.insert(key, (i, event));
}
let mut result: Vec<_> = latest
.into_values()
.filter(|(_, e)| e.op != CdcOp::Delete)
.collect();
result.sort_by_key(|(i, _)| *i);
result.into_iter().map(|(_, e)| e.clone()).collect()
}
pub fn stats(&self) -> CompactionStatsSnapshot {
self.stats.snapshot()
}
pub async fn len(&self) -> usize {
self.log.read().await.len()
}
pub async fn is_empty(&self) -> bool {
self.log.read().await.is_empty()
}
pub async fn clear(&self) {
let mut log = self.log.write().await;
log.clear();
self.dirty_count.store(0, Ordering::Relaxed);
self.total_count.store(0, Ordering::Relaxed);
self.current_offset.store(0, Ordering::Relaxed);
}
pub async fn get_by_table(&self, schema: &str, table: &str) -> Vec<CdcEvent> {
let log = self.log.read().await;
log.values()
.filter_map(|entry| {
entry.event.as_ref().and_then(|e| {
if e.schema == schema && e.table == table {
Some(e.clone())
} else {
None
}
})
})
.collect()
}
pub async fn get_by_prefix(&self, prefix: &str) -> Vec<CompactedEntry> {
let log = self.log.read().await;
log.range(prefix.to_string()..)
.take_while(|(k, _)| k.starts_with(prefix))
.map(|(_, v)| v.clone())
.collect()
}
}
#[derive(Debug, Clone)]
pub struct CompactionResult {
pub entries_before: usize,
pub entries_after: usize,
pub entries_removed: usize,
pub tombstones_expired: usize,
pub duration: Duration,
}
impl CompactionResult {
pub fn ratio(&self) -> f64 {
if self.entries_before == 0 {
1.0
} else {
self.entries_after as f64 / self.entries_before as f64
}
}
}
pub struct SegmentedCompactor {
config: CompactionConfig,
active_segment: RwLock<Compactor>,
sealed_segments: RwLock<Vec<Arc<Compactor>>>,
#[allow(dead_code)]
stats: CompactionStats,
}
impl SegmentedCompactor {
pub fn new(config: CompactionConfig) -> Self {
Self {
active_segment: RwLock::new(Compactor::new(config.clone())),
sealed_segments: RwLock::new(Vec::new()),
stats: CompactionStats::new(),
config,
}
}
pub async fn append(&self, event: CdcEvent) -> u64 {
let active = self.active_segment.read().await;
let offset = active.append(event).await;
if active.len().await >= self.config.segment_size {
drop(active);
self.roll_segment().await;
}
offset
}
async fn roll_segment(&self) {
let mut active = self.active_segment.write().await;
let mut sealed = self.sealed_segments.write().await;
let old_active = std::mem::replace(&mut *active, Compactor::new(self.config.clone()));
old_active.compact().await;
sealed.push(Arc::new(old_active));
info!("Rolled to new segment, {} sealed segments", sealed.len());
}
pub async fn get(&self, key: &str) -> Option<CdcEvent> {
let active = self.active_segment.read().await;
if let Some(event) = active.get(key).await {
return Some(event);
}
drop(active);
let sealed = self.sealed_segments.read().await;
for segment in sealed.iter().rev() {
if let Some(event) = segment.get(key).await {
return Some(event);
}
}
None
}
pub async fn snapshot(&self) -> Vec<CdcEvent> {
let mut result: HashMap<String, CdcEvent> = HashMap::new();
let sealed = self.sealed_segments.read().await;
for segment in sealed.iter() {
for entry in segment.snapshot().await {
if let Some(event) = entry.event {
let key = self.config.key_strategy.extract(&event);
result.insert(key, event);
}
}
}
drop(sealed);
let active = self.active_segment.read().await;
for entry in active.snapshot().await {
if let Some(event) = entry.event {
let key = self.config.key_strategy.extract(&event);
result.insert(key, event);
}
}
result.into_values().collect()
}
pub async fn compact_all(&self) {
let active = self.active_segment.read().await;
active.compact().await;
drop(active);
let sealed = self.sealed_segments.read().await;
for segment in sealed.iter() {
segment.compact().await;
}
}
pub async fn len(&self) -> usize {
let active = self.active_segment.read().await;
let mut total = active.len().await;
drop(active);
let sealed = self.sealed_segments.read().await;
for segment in sealed.iter() {
total += segment.len().await;
}
total
}
pub async fn is_empty(&self) -> bool {
self.len().await == 0
}
pub async fn segment_count(&self) -> usize {
self.sealed_segments.read().await.len() + 1 }
}
#[cfg(test)]
mod tests {
use super::*;
fn make_event(table: &str, id: i64, op: CdcOp) -> CdcEvent {
CdcEvent {
source_type: "test".to_string(),
database: "testdb".to_string(),
schema: "public".to_string(),
table: table.to_string(),
op,
before: if op == CdcOp::Delete || op == CdcOp::Update {
Some(serde_json::json!({"id": id}))
} else {
None
},
after: if op != CdcOp::Delete {
Some(serde_json::json!({"id": id, "value": format!("v{}", id)}))
} else {
None
},
timestamp: chrono::Utc::now().timestamp(),
transaction: None,
}
}
#[test]
fn test_key_strategy_primary_key() {
let strategy = CompactionKeyStrategy::PrimaryKey(vec!["id".to_string()]);
let event = make_event("users", 1, CdcOp::Insert);
let key = strategy.extract(&event);
assert!(key.contains("public"));
assert!(key.contains("users"));
assert!(key.contains("1"));
}
#[test]
fn test_key_strategy_full_row() {
let strategy = CompactionKeyStrategy::FullRow;
let event1 = make_event("users", 1, CdcOp::Insert);
let event2 = make_event("users", 2, CdcOp::Insert);
let key1 = strategy.extract(&event1);
let key2 = strategy.extract(&event2);
assert_ne!(key1, key2);
}
#[test]
fn test_config_defaults() {
let config = CompactionConfig::default();
assert_eq!(config.min_cleanable_ratio, 0.5);
assert_eq!(config.segment_size, 1_000_000);
}
#[test]
fn test_config_presets() {
let aggressive = CompactionConfig::aggressive();
assert_eq!(aggressive.min_cleanable_ratio, 0.3);
let conservative = CompactionConfig::conservative();
assert_eq!(conservative.min_cleanable_ratio, 0.7);
}
#[test]
fn test_config_builder() {
let config = CompactionConfig::builder()
.min_cleanable_ratio(0.4)
.segment_size(500_000)
.tombstone_retention(Duration::from_secs(3600))
.build();
assert_eq!(config.min_cleanable_ratio, 0.4);
assert_eq!(config.segment_size, 500_000);
assert_eq!(config.tombstone_retention, Duration::from_secs(3600));
}
#[test]
fn test_compacted_entry_from_event() {
let event = make_event("users", 1, CdcOp::Insert);
let entry = CompactedEntry::from_event("key1".to_string(), event.clone(), 0);
assert_eq!(entry.key, "key1");
assert!(!entry.is_tombstone);
assert!(entry.event.is_some());
}
#[test]
fn test_compacted_entry_tombstone() {
let event = make_event("users", 1, CdcOp::Delete);
let entry = CompactedEntry::from_event("key1".to_string(), event, 0);
assert!(entry.is_tombstone);
assert!(entry.event.is_none());
assert!(entry.tombstone_created_at.is_some());
}
#[tokio::test]
async fn test_compactor_append() {
let compactor = Compactor::new(CompactionConfig::default());
let event = make_event("users", 1, CdcOp::Insert);
let offset = compactor.append(event).await;
assert_eq!(offset, 0);
assert_eq!(compactor.len().await, 1);
}
#[tokio::test]
async fn test_compactor_get() {
let config = CompactionConfig::builder()
.key_strategy(CompactionKeyStrategy::PrimaryKey(vec!["id".to_string()]))
.build();
let compactor = Compactor::new(config);
let event = make_event("users", 1, CdcOp::Insert);
compactor.append(event.clone()).await;
let key = "public:users:1";
let result = compactor.get(key).await;
assert!(result.is_some());
}
#[tokio::test]
async fn test_compactor_keeps_latest() {
let config = CompactionConfig::builder()
.key_strategy(CompactionKeyStrategy::PrimaryKey(vec!["id".to_string()]))
.build();
let compactor = Compactor::new(config);
for i in 0..5 {
let mut event = make_event("users", 1, CdcOp::Update);
event.after = Some(serde_json::json!({"id": 1, "value": format!("v{}", i)}));
compactor.append(event).await;
}
assert_eq!(compactor.len().await, 1);
let key = "public:users:1";
let result = compactor.get(key).await.unwrap();
let after_value = result.after.clone().unwrap();
let value = after_value.get("value").unwrap().as_str().unwrap();
assert_eq!(value, "v4");
}
#[tokio::test]
async fn test_compactor_snapshot() {
let compactor = Compactor::new(CompactionConfig::default());
for i in 0..10 {
let event = make_event("users", i, CdcOp::Insert);
compactor.append(event).await;
}
let snapshot = compactor.snapshot().await;
assert_eq!(snapshot.len(), 10);
}
#[tokio::test]
async fn test_compactor_delete_creates_tombstone() {
let config = CompactionConfig::builder()
.key_strategy(CompactionKeyStrategy::PrimaryKey(vec!["id".to_string()]))
.build();
let compactor = Compactor::new(config);
let insert = make_event("users", 1, CdcOp::Insert);
compactor.append(insert).await;
let delete = make_event("users", 1, CdcOp::Delete);
compactor.append(delete).await;
assert_eq!(compactor.len().await, 1);
let snapshot = compactor.snapshot().await;
assert_eq!(snapshot.len(), 0);
let with_tombstones = compactor.snapshot_with_tombstones().await;
assert_eq!(with_tombstones.len(), 1);
assert!(with_tombstones[0].is_tombstone);
}
#[tokio::test]
async fn test_compactor_needs_compaction() {
let config = CompactionConfig::builder()
.min_cleanable_ratio(0.5)
.segment_size(100)
.build();
let compactor = Compactor::new(config);
assert!(!compactor.needs_compaction().await);
for _ in 0..10 {
let event = make_event("users", 1, CdcOp::Update);
compactor.append(event).await;
}
}
#[tokio::test]
async fn test_compactor_compact() {
let config = CompactionConfig::builder()
.tombstone_retention(Duration::ZERO)
.build();
let compactor = Compactor::new(config);
let insert = make_event("users", 1, CdcOp::Insert);
compactor.append(insert).await;
let delete = make_event("users", 1, CdcOp::Delete);
compactor.append(delete).await;
let result = compactor.compact().await;
assert!(result.tombstones_expired > 0 || result.entries_removed > 0);
}
#[test]
fn test_compact_batch_stateless() {
let events = vec![
make_event("users", 1, CdcOp::Insert),
make_event("users", 2, CdcOp::Insert),
make_event("users", 1, CdcOp::Update), make_event("users", 3, CdcOp::Insert),
make_event("users", 2, CdcOp::Delete), ];
let strategy = CompactionKeyStrategy::PrimaryKey(vec!["id".to_string()]);
let compacted = Compactor::compact_batch(&events, &strategy);
assert_eq!(compacted.len(), 2);
}
#[tokio::test]
async fn test_compactor_clear() {
let compactor = Compactor::new(CompactionConfig::default());
for i in 0..10 {
let event = make_event("users", i, CdcOp::Insert);
compactor.append(event).await;
}
assert!(!compactor.is_empty().await);
compactor.clear().await;
assert!(compactor.is_empty().await);
}
#[tokio::test]
async fn test_compactor_get_by_table() {
let compactor = Compactor::new(CompactionConfig::default());
for i in 0..5 {
compactor
.append(make_event("users", i, CdcOp::Insert))
.await;
compactor
.append(make_event("orders", i, CdcOp::Insert))
.await;
}
let users = compactor.get_by_table("public", "users").await;
assert_eq!(users.len(), 5);
assert!(users.iter().all(|e| e.table == "users"));
let orders = compactor.get_by_table("public", "orders").await;
assert_eq!(orders.len(), 5);
}
#[test]
fn test_stats_snapshot() {
let stats = CompactionStats::new();
stats.record_processed(100);
stats.record_compacted(20);
stats.record_removed(10);
stats.record_tombstone();
stats.record_tombstone_expired(5);
stats.record_compaction_run(Duration::from_millis(100));
let snapshot = stats.snapshot();
assert_eq!(snapshot.events_processed, 100);
assert_eq!(snapshot.events_compacted, 20);
assert_eq!(snapshot.events_removed, 10);
assert_eq!(snapshot.tombstones_created, 1);
assert_eq!(snapshot.tombstones_expired, 5);
assert_eq!(snapshot.compaction_runs, 1);
assert!((snapshot.compaction_ratio - 0.2).abs() < 0.001);
}
#[test]
fn test_compaction_result_ratio() {
let result = CompactionResult {
entries_before: 1000,
entries_after: 800,
entries_removed: 200,
tombstones_expired: 50,
duration: Duration::from_millis(100),
};
assert!((result.ratio() - 0.8).abs() < 0.001);
}
#[tokio::test]
async fn test_segmented_compactor_basic() {
let config = CompactionConfig::builder().segment_size(10).build();
let compactor = SegmentedCompactor::new(config);
for i in 0..5 {
let event = make_event("users", i, CdcOp::Insert);
compactor.append(event).await;
}
assert_eq!(compactor.len().await, 5);
}
#[tokio::test]
async fn test_segmented_compactor_snapshot() {
let config = CompactionConfig::builder().segment_size(100).build();
let compactor = SegmentedCompactor::new(config);
for i in 0..20 {
let event = make_event("users", i, CdcOp::Insert);
compactor.append(event).await;
}
let snapshot = compactor.snapshot().await;
assert_eq!(snapshot.len(), 20);
}
}