use std::collections::BTreeMap;
use crate::infinitedb_core::{
address::{RevisionId, SpaceId},
block::{Block, BlockId, Record},
checksum::Checksum,
snapshot::SnapshotId,
};
use super::gc::{apply_retention, RetentionPolicy};
/// Tuning knobs for a compaction pass.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Upper bound on the number of records written into each output block.
    pub max_records_per_block: usize,
    /// `true` keeps every record revision; `false` keeps only the last record
    /// per coordinate vector once records are sorted by coordinates and revision.
    pub retain_history: bool,
}

impl Default for CompactionConfig {
    /// Returns the stock configuration: 4096 records per block, history retained.
    fn default() -> Self {
        let max_records_per_block = 4096;
        let retain_history = true;
        Self {
            max_records_per_block,
            retain_history,
        }
    }
}
/// Outcome of a [`compact`] call.
pub struct CompactionResult {
    /// Blocks produced by the compaction, in output order.
    pub new_blocks: Vec<Block>,
    /// Ids of every input block, in the order the blocks were supplied.
    pub superseded: Vec<BlockId>,
}
/// Merges `input_blocks` into freshly numbered blocks of bounded size.
///
/// The pass works as follows:
///
/// 1. The id of every input block is recorded in `superseded`.
/// 2. All records are flattened and stably sorted by coordinates, then by
///    revision.
/// 3. When `retention` is `Some`, the sorted records are passed through
///    `apply_retention`.
/// 4. When `config.retain_history` is `false`, only the last record for each
///    coordinate vector is kept. That is the highest revision as long as the
///    sort order from step 2 is still intact.
///    NOTE(review): assumes `apply_retention` preserves record order — confirm.
/// 5. The surviving records are moved into blocks holding at most
///    `config.max_records_per_block` records each. Every block gets an id from
///    `next_block_id`, revision bounds computed from its own records, and a
///    `Checksum::ZERO` checksum.
///
/// The output blocks inherit the `space` of the first input block
/// (`SpaceId(0)` if there are no input blocks, in which case no blocks are
/// produced either). `_snapshot` is accepted but not used by this function.
///
/// A `max_records_per_block` of `0` is treated as `1` rather than panicking.
pub fn compact<F>(
    input_blocks: Vec<Block>,
    config: &CompactionConfig,
    retention: Option<&RetentionPolicy>,
    _snapshot: SnapshotId,
    mut next_block_id: F,
) -> CompactionResult
where
    F: FnMut() -> BlockId,
{
    let superseded: Vec<BlockId> = input_blocks.iter().map(|b| b.id).collect();
    let space = input_blocks
        .first()
        .map(|b| b.space)
        .unwrap_or(SpaceId(0));

    let mut all: Vec<Record> = input_blocks
        .into_iter()
        .flat_map(|b| b.records)
        .collect();

    // Stable sort: records that tie on (coords, revision) keep their input
    // order, which decides the winner in the dedup step below.
    all.sort_by(|a, b| {
        a.address
            .point
            .coords
            .cmp(&b.address.point.coords)
            .then_with(|| a.revision.cmp(&b.revision))
    });

    if let Some(policy) = retention {
        all = apply_retention(all, policy);
    }

    let records: Vec<Record> = if config.retain_history {
        all
    } else {
        // Later inserts overwrite earlier ones, so the last record seen for a
        // coordinate vector survives; the map also yields coordinate order.
        let mut latest: BTreeMap<Vec<u32>, Record> = BTreeMap::new();
        for rec in all {
            latest.insert(rec.address.point.coords.clone(), rec);
        }
        latest.into_values().collect()
    };

    // A zero chunk size can never make progress (and `slice::chunks` panics on
    // it), so fall back to one record per block.
    let per_block = config.max_records_per_block.max(1);

    // Move records into their blocks instead of cloning each chunk.
    let mut new_blocks: Vec<Block> = Vec::new();
    let mut remaining = records.into_iter();
    loop {
        let chunk: Vec<Record> = remaining.by_ref().take(per_block).collect();
        if chunk.is_empty() {
            break;
        }
        let min_revision = chunk
            .iter()
            .map(|r| r.revision)
            .min()
            .unwrap_or(RevisionId::ZERO);
        let max_revision = chunk
            .iter()
            .map(|r| r.revision)
            .max()
            .unwrap_or(RevisionId::ZERO);
        new_blocks.push(Block {
            id: next_block_id(),
            space,
            records: chunk,
            min_revision,
            max_revision,
            checksum: Checksum::ZERO,
        });
    }

    CompactionResult { new_blocks, superseded }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::infinitedb_core::{
        address::{Address, DimensionVector, RevisionId, SpaceId},
        block::Record,
        hilbert_key::CachedHilbertKey,
        snapshot::SnapshotId,
    };

    /// Builds an empty-payload record at coordinates `[x, 0]` in space 1.
    fn make_record(x: u32, rev: u64, tombstone: bool) -> Record {
        let point = DimensionVector::new(vec![x, 0]);
        Record {
            address: Address::new(SpaceId(1), point),
            revision: RevisionId::legacy(rev),
            data: vec![],
            tombstone,
            hilbert_key: CachedHilbertKey::UNSET,
        }
    }

    /// Wraps `records` in a block in space 1. The revision bounds are fixed
    /// placeholders (0 and 99), not derived from the records.
    fn make_block(id: u64, records: Vec<Record>) -> Block {
        Block {
            id: BlockId(id),
            space: SpaceId(1),
            records,
            min_revision: RevisionId::legacy(0),
            max_revision: RevisionId::legacy(99),
            checksum: Checksum::ZERO,
        }
    }

    /// Returns a generator handing out consecutive block ids beginning at `start`.
    fn id_allocator(start: u64) -> impl FnMut() -> BlockId {
        let mut upcoming = start;
        move || {
            let id = BlockId(upcoming);
            upcoming += 1;
            id
        }
    }

    #[test]
    fn compacts_two_blocks_into_one() {
        let first = make_block(1, vec![make_record(1, 1, false), make_record(2, 1, false)]);
        let second = make_block(2, vec![make_record(3, 1, false), make_record(4, 1, false)]);
        let config = CompactionConfig::default();

        let result = compact(
            vec![first, second],
            &config,
            None,
            SnapshotId(1),
            id_allocator(10),
        );

        assert_eq!(result.superseded, vec![BlockId(1), BlockId(2)]);
        let merged = &result.new_blocks;
        assert_eq!(merged.len(), 1);
        assert_eq!(merged[0].records.len(), 4);
    }

    #[test]
    fn dedup_keeps_latest_revision() {
        let older = make_record(1, 1, false);
        let newer = make_record(1, 2, false);
        let input = vec![make_block(1, vec![older, newer])];
        let config = CompactionConfig {
            retain_history: false,
            ..Default::default()
        };

        let result = compact(input, &config, None, SnapshotId(1), id_allocator(10));

        let survivors = &result.new_blocks[0].records;
        assert_eq!(survivors.len(), 1);
        assert_eq!(survivors[0].revision, RevisionId::legacy(2));
    }
}