#![allow(dead_code)]
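//! Flushes frozen memtables to immutable on-disk segments, writing an
//! index sidecar next to each segment so keys can be located by offset
//! without scanning the segment itself.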
use std::fs::{self, File};
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use crate::storage::{self, CompressionCodec, FileSegment, Segment, SegmentWriter};
use super::memtable::{FrozenMemTable, MemTableEntry};

/// Errors surfaced while flushing a frozen memtable to disk.
#[derive(Debug, thiserror::Error)]
pub enum FlushError {
    #[error("storage error: {0}")]
    Storage(#[from] storage::StorageError),
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
}

/// One index record: a key together with the byte offset of its entry
/// inside the flushed segment.
#[derive(Clone, Debug)]
pub struct IndexEntry {
    pub key: MemTableKey,
    pub offset: u64,
}

/// An immutable, cheaply cloneable key as held by the memtable.
#[derive(Clone, Debug)]
pub struct MemTableKey(pub std::sync::Arc<[u8]>);

impl AsRef<[u8]> for MemTableKey {
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

/// The index file written next to a segment. On disk it is a flat
/// sequence of records, each encoded as
/// `key_len: u32 LE | key bytes | offset: u64 LE`.
#[derive(Clone, Debug)]
pub struct IndexSidecar {
    pub path: PathBuf,
    pub entries: Vec<IndexEntry>,
}

impl IndexSidecar {
    fn write_to_disk(&self) -> Result<(), std::io::Error> {
        if let Some(parent) = self.path.parent() {
            fs::create_dir_all(parent)?;
        }
        // Size the buffer to the exact encoded length: a 4-byte key
        // length, the key itself, and an 8-byte offset per record.
        let estimated: usize = self
            .entries
            .iter()
            .map(|e| 4 + e.key.as_ref().len() + 8)
            .sum();
        let mut writer =
            BufWriter::with_capacity(estimated.max(8192), File::create(&self.path)?);
        for entry in &self.entries {
            let key = entry.key.as_ref();
            let len = key.len() as u32;
            writer.write_all(&len.to_le_bytes())?;
            writer.write_all(key)?;
            writer.write_all(&entry.offset.to_le_bytes())?;
        }
        writer.flush()?;
        Ok(())
    }
}

/// The result of a successful flush: the sealed segment and the index
/// sidecar written alongside it.
#[derive(Clone, Debug)]
pub struct FlushOutcome {
    pub segment: FileSegment,
    pub index: IndexSidecar,
}

/// Turns frozen memtables into on-disk segments plus index sidecars,
/// assigning each flush a monotonically increasing id.
pub struct FlushCoordinator {
    base_dir: PathBuf,
    compression: CompressionCodec,
    next_id: AtomicU64,
}

impl FlushCoordinator {
    pub fn new<P: AsRef<Path>>(base_dir: P, compression: CompressionCodec) -> Self {
        Self {
            base_dir: base_dir.as_ref().to_path_buf(),
            compression,
            next_id: AtomicU64::new(0),
        }
    }

    fn segments_dir(&self) -> PathBuf {
        self.base_dir.join("segments")
    }

    fn index_dir(&self) -> PathBuf {
        self.base_dir.join("indexes")
    }

    /// Encodes one entry into `buffer` using the segment record layout:
    /// `key_len: u32 LE | value_len: u32 LE | sequence: u64 LE |
    /// tombstone: u8 | key bytes | value bytes`. Currently unused by
    /// `flush`, which delegates framing to `SegmentWriter::append_entry`.
    fn encode_entry(entry: &MemTableEntry, buffer: &mut Vec<u8>) {
        buffer.clear();
        buffer.reserve(entry.key.len() + entry.value.len() + 32);
        buffer.extend_from_slice(&(entry.key.len() as u32).to_le_bytes());
        buffer.extend_from_slice(&(entry.value.len() as u32).to_le_bytes());
        buffer.extend_from_slice(&entry.sequence.to_le_bytes());
        buffer.push(entry.tombstone as u8);
        buffer.extend_from_slice(entry.key.as_ref());
        buffer.extend_from_slice(entry.value.as_ref());
    }

    pub fn flush(&self, frozen: FrozenMemTable) -> Result<FlushOutcome, FlushError> {
        let entries = frozen.into_entries();
        if entries.is_empty() {
            // Nothing to write: return a sentinel outcome with empty
            // metadata and an empty sidecar that is never persisted.
            let empty_path = self.index_dir().join("empty.idx");
            let sidecar = IndexSidecar {
                path: empty_path,
                entries: Vec::new(),
            };
            return Ok(FlushOutcome {
                segment: FileSegment::new(storage::SegmentMetadata {
                    id: String::new(),
                    path: PathBuf::new(),
                    size_bytes: 0,
                    compressed_size_bytes: 0,
                    compression: self.compression.clone(),
                    pages: Vec::new(),
                    sha256: String::new(),
                    created_at_ms: 0,
                }),
                index: sidecar,
            });
        }
        let flush_id = self.next_id.fetch_add(1, Ordering::SeqCst);
        let segment_path = self
            .segments_dir()
            .join(format!("flush-{flush_id:016x}.seg"));
        // The sidecar creates its parent directory lazily; make sure the
        // segments directory exists as well before the writer opens it.
        fs::create_dir_all(self.segments_dir())?;
        // Pre-size the writer to the exact payload: two u32 lengths, a
        // u64 sequence, and a tombstone byte per entry, plus key/value.
        let total_payload: usize = entries
            .iter()
            .map(|e| 4 + 4 + 8 + 1 + e.key.len() + e.value.len())
            .sum();
        let mut writer =
            SegmentWriter::with_capacity(&segment_path, self.compression.clone(), total_payload);
        let mut index_entries = Vec::with_capacity(entries.len());
        for entry in &entries {
            // Capture the offset before appending so the index points at
            // the start of the record.
            let offset = writer.bytes_written() as u64;
            writer.append_entry(
                entry.key.as_ref(),
                entry.value.as_ref(),
                entry.sequence,
                entry.tombstone,
            );
            index_entries.push(IndexEntry {
                key: MemTableKey(entry.key.clone()),
                offset,
            });
        }
        let segment = writer.finish()?;
        let sidecar_path = self
            .index_dir()
            .join(segment.metadata().id.clone())
            .with_extension("idx");
        let sidecar = IndexSidecar {
            path: sidecar_path,
            entries: index_entries,
        };
        sidecar.write_to_disk()?;
        Ok(FlushOutcome {
            segment,
            index: sidecar,
        })
    }
}
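
// Usage sketch (hypothetical caller, not part of this module): a store
// would typically keep one coordinator per data directory and flush each
// memtable as it freezes, e.g.
//
//     let coordinator = FlushCoordinator::new(data_dir, CompressionCodec::None);
//     let outcome = coordinator.flush(table.freeze())?;
//     // register `outcome.segment` and `outcome.index` with the read path
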
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::CompressionCodec;
    use crate::write::memtable::MemTable;

    // Mirrors the segment record layout used by `encode_entry` and the
    // flush path: two u32 lengths, a u64 sequence, a tombstone byte,
    // then the key and value bytes.
    fn encoded_len(entry: &MemTableEntry) -> usize {
        4 + 4 + 8 + 1 + entry.key.len() + entry.value.len()
    }
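
    // Added check (sketch): exercises the private `encode_entry` helper
    // against `encoded_len`, asserting the field positions documented on
    // that helper; the entry is produced through the same `MemTable` API
    // the flush test below uses.
    #[test]
    fn encode_entry_matches_documented_layout() {
        let table = MemTable::new();
        table.put(b"key", b"value".to_vec()).expect("put should succeed");
        let frozen = table.freeze();
        let entry = &frozen.entries()[0];
        let mut buffer = Vec::new();
        FlushCoordinator::encode_entry(entry, &mut buffer);
        assert_eq!(buffer.len(), encoded_len(entry));
        // Fixed-width header fields come first, in little-endian order.
        assert_eq!(&buffer[0..4], (entry.key.len() as u32).to_le_bytes());
        assert_eq!(&buffer[4..8], (entry.value.len() as u32).to_le_bytes());
        assert_eq!(&buffer[8..16], entry.sequence.to_le_bytes());
        assert_eq!(buffer[16], entry.tombstone as u8);
        // The variable-length key and value bytes follow the header.
        assert_eq!(&buffer[17..17 + entry.key.len()], entry.key.as_ref());
        assert_eq!(&buffer[17 + entry.key.len()..], entry.value.as_ref());
    }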

    // Rounds a payload length up to the next 4 KiB page boundary.
    fn align_to_page(len: usize) -> usize {
        const PAGE_SIZE: usize = 4096;
        if len == 0 {
            0
        } else {
            ((len + PAGE_SIZE - 1) / PAGE_SIZE) * PAGE_SIZE
        }
    }
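
    // Added check (sketch): writes a sidecar through
    // `IndexSidecar::write_to_disk` and decodes the file with a local
    // reader that mirrors the documented record layout
    // (`key_len: u32 LE | key bytes | offset: u64 LE`). The nested path
    // also exercises the lazy parent-directory creation.
    #[test]
    fn index_sidecar_round_trips_on_disk() {
        let tempdir = tempfile::tempdir().expect("tempdir");
        let entries = vec![
            IndexEntry {
                key: MemTableKey(std::sync::Arc::from(&b"alpha"[..])),
                offset: 0,
            },
            IndexEntry {
                key: MemTableKey(std::sync::Arc::from(&b"beta"[..])),
                offset: 42,
            },
        ];
        let sidecar = IndexSidecar {
            path: tempdir.path().join("nested").join("sidecar.idx"),
            entries: entries.clone(),
        };
        sidecar.write_to_disk().expect("write should succeed");
        let bytes = std::fs::read(&sidecar.path).expect("sidecar file should exist");
        let mut decoded = Vec::new();
        let mut cursor = 0;
        while cursor < bytes.len() {
            let len = u32::from_le_bytes(bytes[cursor..cursor + 4].try_into().unwrap()) as usize;
            cursor += 4;
            let key = bytes[cursor..cursor + len].to_vec();
            cursor += len;
            let offset = u64::from_le_bytes(bytes[cursor..cursor + 8].try_into().unwrap());
            cursor += 8;
            decoded.push((key, offset));
        }
        assert_eq!(decoded.len(), entries.len());
        for (entry, (key, offset)) in entries.iter().zip(&decoded) {
            assert_eq!(entry.key.as_ref(), key.as_slice());
            assert_eq!(entry.offset, *offset);
        }
    }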
    #[test]
    fn flush_segment_matches_payload_size() {
        let tempdir = tempfile::tempdir().expect("tempdir");
        let table = MemTable::new();
        table
            .put(b"key-a", b"value-a".to_vec())
            .expect("put should succeed");
        table
            .put(b"key-b", b"value-b-with-some-extra-bytes".to_vec())
            .expect("put should succeed");
        let frozen = table.freeze();
        let expected_payload: usize = frozen.entries().iter().map(encoded_len).sum();
        // Guard against a payload that lands exactly on a page boundary,
        // which would make the padding assertion below vacuous.
        assert_ne!(expected_payload % 4096, 0);
        let coordinator = FlushCoordinator::new(tempdir.path(), CompressionCodec::None);
        let outcome = coordinator.flush(frozen).expect("flush should succeed");
        let aligned_expected = align_to_page(expected_payload);
        assert_eq!(
            outcome.segment.metadata().size_bytes as usize,
            aligned_expected,
            "segment should be padded only to the next page boundary"
        );
        // The per-page lengths must account for every padded byte, and no
        // page may exceed the 4 KiB page size.
        let total_page_bytes: usize = outcome
            .segment
            .metadata()
            .pages
            .iter()
            .map(|page| page.length as usize)
            .sum();
        assert_eq!(total_page_bytes, aligned_expected);
        assert!(
            outcome
                .segment
                .metadata()
                .pages
                .iter()
                .all(|page| page.length as usize <= 4096)
        );
    }
}
}