use std::collections::HashMap;
use std::fs::OpenOptions;
use std::io::Write;
use std::path::{Path, PathBuf};
use bytes::{Bytes, BytesMut};
use crabka_protocol::records::RecordBatch;
use crate::error::LogError;
use crate::name;
use crate::segment::Segment;
/// Reads the whole `.log` file of `seg` and decodes it into record batches, in file order.
///
/// Decoding stops at the first batch that fails to decode; that batch and everything after it
/// are not returned, and no error is reported for it.
/// NOTE(review): this presumably exists to tolerate a torn tail, but during compaction it would
/// also silently drop data after mid-file corruption — confirm this is intended.
///
/// # Errors
///
/// Returns an error only if the log file itself cannot be read.
fn read_all_batches(seg: &Segment) -> Result<Vec<RecordBatch>, LogError> {
    let log_file = name::log_path(seg.dir(), seg.base_offset());
    let contents = std::fs::read(&log_file)?;
    let mut remaining: &[u8] = contents.as_slice();
    let mut batches = Vec::new();
    while !remaining.is_empty() {
        match RecordBatch::decode(&mut remaining) {
            Ok(decoded) => batches.push(decoded),
            Err(_) => break,
        }
    }
    Ok(batches)
}
/// Maps every record key found in `segments` to the absolute offset of its last occurrence.
///
/// Segments, batches and records are visited in the order given, and a later occurrence of a
/// key overwrites an earlier one. Callers should therefore pass segments oldest-first.
/// Records without a key are skipped. A record's absolute offset is
/// `batch.base_offset + offset_delta`.
///
/// # Errors
///
/// Propagates any error from reading a segment's log file.
pub fn build_offset_map(segments: &[&Segment]) -> Result<HashMap<Bytes, i64>, LogError> {
    let mut latest: HashMap<Bytes, i64> = HashMap::new();
    for &segment in segments {
        for batch in read_all_batches(segment)? {
            let base = batch.base_offset;
            // `extend` inserts in iteration order, so later duplicates win just like `insert`.
            latest.extend(batch.records.into_iter().filter_map(|record| {
                let delta = i64::from(record.offset_delta);
                record.key.map(|key| (key, base + delta))
            }));
        }
    }
    Ok(latest)
}
#[cfg(test)]
#[allow(
    clippy::similar_names,
    clippy::redundant_closure,
    clippy::cast_possible_truncation,
    clippy::cast_possible_wrap
)]
mod build_map_tests {
    use super::*;
    use assert2::assert;
    use bytes::Bytes;
    use crabka_protocol::records::{Attributes, Record};
    use tempfile::tempdir;

    /// Test helper: a record at `offset_delta` holding copies of the optional `key` and
    /// `value`. Every other field takes its `Default` value.
    pub(super) fn make_record(
        offset_delta: i32,
        key: Option<&[u8]>,
        value: Option<&[u8]>,
    ) -> Record {
        let key = key.map(Bytes::copy_from_slice);
        let value = value.map(Bytes::copy_from_slice);
        Record {
            offset_delta,
            key,
            value,
            ..Default::default()
        }
    }

    /// Test helper: creates a segment at `base_offset` under `dir`, appends `records` as a
    /// single batch, and seals it.
    ///
    /// `last_offset_delta` is set to `len - 1`, so the records' deltas are expected to be
    /// `0..len`.
    pub(super) fn write_sealed_segment(
        dir: &Path,
        base_offset: i64,
        records: Vec<Record>,
    ) -> Segment {
        let mut segment = Segment::create(dir, base_offset).unwrap();
        let record_count = i32::try_from(records.len()).expect("record count fits i32");
        let max_timestamp = records
            .iter()
            .map(|record| record.timestamp_delta)
            .max()
            .unwrap_or(0);
        let batch = RecordBatch {
            base_offset,
            last_offset_delta: record_count - 1,
            max_timestamp,
            records,
            attributes: Attributes::default(),
            ..RecordBatch::default()
        };
        segment.append(&batch, 4096).unwrap();
        segment.seal();
        segment
    }

    #[test]
    fn build_offset_map_keeps_newest_offset_per_key() {
        let dir = tempdir().unwrap();
        let records = vec![
            make_record(0, Some(b"k1"), Some(b"v1")),
            make_record(1, Some(b"k2"), Some(b"v2")),
            make_record(2, Some(b"k1"), Some(b"v3")),
        ];
        let seg0 = write_sealed_segment(dir.path(), 0, records);

        let map = build_offset_map(&[&seg0]).unwrap();

        assert!(map.get(b"k1".as_ref()) == Some(&2));
        assert!(map.get(b"k2".as_ref()) == Some(&1));
    }

    #[test]
    fn build_offset_map_drops_null_key_records() {
        let dir = tempdir().unwrap();
        let records = vec![
            make_record(0, None, Some(b"no-key-1")),
            make_record(1, Some(b"k1"), Some(b"v1")),
            make_record(2, None, Some(b"no-key-2")),
        ];
        let seg0 = write_sealed_segment(dir.path(), 0, records);

        let map = build_offset_map(&[&seg0]).unwrap();

        assert!(map.len() == 1);
        assert!(map.get(b"k1".as_ref()) == Some(&1));
    }

    #[test]
    fn build_offset_map_across_segments_uses_newest() {
        let dir = tempdir().unwrap();
        let older = vec![make_record(0, Some(b"k1"), Some(b"v1"))];
        let newer = vec![make_record(0, Some(b"k1"), Some(b"v2"))];
        let seg0 = write_sealed_segment(dir.path(), 0, older);
        let seg1 = write_sealed_segment(dir.path(), 10, newer);

        let map = build_offset_map(&[&seg0, &seg1]).unwrap();

        assert!(map.get(b"k1".as_ref()) == Some(&10));
    }
}
/// Result of a compaction rewrite: the swap files produced and the offset range they cover.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RewriteOutput {
    /// Path of the rewritten `.log.swap` file.
    pub log_swap: PathBuf,
    /// Path of the `.index.swap` file (created empty by `rewrite_segments`).
    pub index_swap: PathBuf,
    /// Path of the `.timeindex.swap` file (created empty by `rewrite_segments`).
    pub timeindex_swap: PathBuf,
    /// Base offset of the compacted segment: the base offset of the first input segment.
    pub new_base_offset: i64,
    /// Absolute offset of the last retained record, or `new_base_offset - 1` when nothing
    /// was retained.
    // NOTE(review): within this file the field is only read by tests, so the lint is silenced
    // for non-test builds; derived impls do not count as uses for dead-code analysis.
    #[allow(dead_code)]
    pub new_last_offset: i64,
}
pub fn rewrite_segments(
dir: &Path,
segments: &[&Segment],
offset_map: &HashMap<Bytes, i64>,
_index_interval_bytes: u32,
) -> Result<RewriteOutput, LogError> {
let first = segments
.first()
.ok_or_else(|| LogError::Io(std::io::Error::other("rewrite_segments: empty input")))?;
let new_base = first.base_offset();
let log_swap = swap_path(dir, new_base, "log");
let index_swap = swap_path(dir, new_base, "index");
let timeindex_swap = swap_path(dir, new_base, "timeindex");
let mut log_file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&log_swap)?;
OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&index_swap)?;
OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&timeindex_swap)?;
let mut last_kept_offset = new_base - 1;
for seg in segments {
for batch in read_all_batches(seg)? {
let mut kept: Vec<crabka_protocol::records::Record> =
Vec::with_capacity(batch.records.len());
for record in &batch.records {
let Some(key_bytes) = record.key.as_ref() else {
continue;
};
let absolute = batch.base_offset + i64::from(record.offset_delta);
if offset_map.get(key_bytes.as_ref()).copied() == Some(absolute) {
kept.push(record.clone());
}
}
if kept.is_empty() {
continue;
}
let last_delta = kept
.iter()
.map(|r| r.offset_delta)
.max()
.expect("kept non-empty");
let out_batch = RecordBatch {
base_offset: batch.base_offset,
last_offset_delta: last_delta,
max_timestamp: batch.max_timestamp,
attributes: batch.attributes,
records: kept,
..batch.clone()
};
let mut buf = BytesMut::with_capacity(out_batch.encoded_len());
out_batch.encode(&mut buf)?;
log_file.write_all(&buf)?;
let batch_last = out_batch.base_offset + i64::from(out_batch.last_offset_delta);
if batch_last > last_kept_offset {
last_kept_offset = batch_last;
}
}
}
log_file.sync_all()?;
Ok(RewriteOutput {
log_swap,
index_swap,
timeindex_swap,
new_base_offset: new_base,
new_last_offset: last_kept_offset,
})
}
/// Builds `<dir>/<formatted base offset>.<ext>.swap`, the temporary name used during compaction.
fn swap_path(dir: &Path, base_offset: i64, ext: &str) -> PathBuf {
    let stem = name::format_base_offset(base_offset);
    let file_name = format!("{stem}.{ext}.swap");
    dir.join(file_name)
}
#[cfg(test)]
#[allow(clippy::similar_names)]
mod rewrite_tests {
    use super::build_map_tests::{make_record, write_sealed_segment};
    use super::*;
    use assert2::assert;
    use std::fs;

    /// Builds the offset map for `segs` and rewrites them into `dir`.
    fn compact(dir: &Path, segs: &[&Segment]) -> RewriteOutput {
        let map = build_offset_map(segs).unwrap();
        rewrite_segments(dir, segs, &map, 4096).unwrap()
    }

    /// Decodes the first batch stored in the rewritten log swap file.
    fn first_batch(out: &RewriteOutput) -> RecordBatch {
        let bytes = fs::read(&out.log_swap).unwrap();
        RecordBatch::decode(&mut bytes.as_slice()).unwrap()
    }

    #[test]
    fn rewrite_drops_superseded_records() {
        let dir = tempfile::tempdir().unwrap();
        let records = vec![
            make_record(0, Some(b"k1"), Some(b"v1")),
            make_record(1, Some(b"k2"), Some(b"v2")),
            make_record(2, Some(b"k1"), Some(b"v3")),
        ];
        let seg0 = write_sealed_segment(dir.path(), 0, records);

        let out = compact(dir.path(), &[&seg0]);
        assert!(out.new_base_offset == 0);

        let batch = first_batch(&out);
        assert!(batch.records.len() == 2);
        let keys: Vec<Vec<u8>> = batch
            .records
            .iter()
            .map(|record| record.key.as_ref().unwrap().to_vec())
            .collect();
        assert!(keys == vec![b"k2".to_vec(), b"k1".to_vec()]);
    }

    #[test]
    fn rewrite_keeps_tombstone_as_latest() {
        let dir = tempfile::tempdir().unwrap();
        let records = vec![
            make_record(0, Some(b"k1"), Some(b"v1")),
            make_record(1, Some(b"k1"), None),
        ];
        let seg0 = write_sealed_segment(dir.path(), 0, records);

        let out = compact(dir.path(), &[&seg0]);
        let batch = first_batch(&out);

        assert!(batch.records.len() == 1);
        let survivor = &batch.records[0];
        assert!(survivor.value.is_none());
        assert!(survivor.key.as_ref().unwrap().as_ref() == b"k1");
    }

    #[test]
    fn rewrite_preserves_absolute_offsets() {
        let dir = tempfile::tempdir().unwrap();
        let records = vec![
            make_record(0, Some(b"k1"), Some(b"v1")),
            make_record(1, Some(b"k2"), Some(b"v2")),
            make_record(2, Some(b"k1"), Some(b"v3")),
        ];
        let seg0 = write_sealed_segment(dir.path(), 100, records);

        let out = compact(dir.path(), &[&seg0]);
        assert!(out.new_base_offset == 100);
        assert!(out.new_last_offset == 102);

        let batch = first_batch(&out);
        assert!(batch.base_offset == 100);
        assert!(batch.last_offset_delta == 2);
        let abs_offsets: Vec<i64> = batch
            .records
            .iter()
            .map(|record| batch.base_offset + i64::from(record.offset_delta))
            .collect();
        assert!(abs_offsets == vec![101, 102]);
    }
}
/// Replaces the consumed segments in `dir` with the compacted segment described by `rewrite`.
///
/// The steps run in this order:
/// 1. fsync each of the three swap files;
/// 2. remove the `.log`, `.index` and `.timeindex` files of every base offset in
///    `consumed_base_offsets`;
/// 3. rename the swap files to their final names at `rewrite.new_base_offset`;
/// 4. on Unix only, fsync the directory.
///
/// The steps are separate filesystem operations, so a crash part-way can leave an intermediate
/// state. Recovery presumably relies on the leftover `.swap` files — confirm against the
/// segment-loading code.
///
/// File removal is best-effort: every `remove_file` error is ignored.
/// NOTE(review): that includes errors other than "not found", which could leave an old segment
/// overlapping the compacted one.
///
/// # Errors
///
/// Returns an error if a swap file cannot be opened or synced, or if a rename fails.
/// Failures to open or sync the directory are ignored.
pub fn atomic_swap(
    dir: &Path,
    consumed_base_offsets: &[i64],
    rewrite: &RewriteOutput,
) -> Result<(), LogError> {
    for swap_file in [&rewrite.log_swap, &rewrite.index_swap, &rewrite.timeindex_swap] {
        OpenOptions::new().write(true).open(swap_file)?.sync_all()?;
    }

    for &base in consumed_base_offsets {
        let _ = std::fs::remove_file(name::log_path(dir, base));
        let _ = std::fs::remove_file(name::index_path(dir, base));
        let _ = std::fs::remove_file(name::timeindex_path(dir, base));
    }

    let target = rewrite.new_base_offset;
    std::fs::rename(&rewrite.log_swap, name::log_path(dir, target))?;
    std::fs::rename(&rewrite.index_swap, name::index_path(dir, target))?;
    std::fs::rename(&rewrite.timeindex_swap, name::timeindex_path(dir, target))?;

    // Persist the directory entries touched above; best-effort and Unix-only.
    #[cfg(unix)]
    {
        if let Ok(dir_handle) = std::fs::File::open(dir) {
            let _ = dir_handle.sync_all();
        }
    }

    Ok(())
}
#[cfg(test)]
#[allow(clippy::similar_names)]
mod swap_tests {
    use super::build_map_tests::{make_record, write_sealed_segment};
    use super::*;
    use assert2::assert;

    #[test]
    fn atomic_swap_replaces_two_segments_with_one() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        // The segments are dropped at the end of this scope, before the swap runs
        // (presumably so any handles they hold on their files are released).
        let rewrite = {
            let seg0 = write_sealed_segment(
                root,
                0,
                vec![make_record(0, Some(b"k1"), Some(b"v1"))],
            );
            let seg1 = write_sealed_segment(
                root,
                10,
                vec![make_record(0, Some(b"k1"), Some(b"v2"))],
            );
            let segs = [&seg0, &seg1];
            let map = build_offset_map(&segs).unwrap();
            rewrite_segments(root, &segs, &map, 4096).unwrap()
        };

        atomic_swap(root, &[0, 10], &rewrite).unwrap();

        assert!(name::log_path(root, 0).exists());
        assert!(!name::log_path(root, 10).exists());
        assert!(!root.join("00000000000000000000.log.swap").exists());
    }
}