use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::sync::{self, Mutex, MutexGuard};
use crate::disk_loc::DiskLoc;
use crate::entry::{self, make_tombstone_gsn, serialize_entry};
use crate::error::DbResult;
use crate::io::aligned_buf::AlignedBuf;
use crate::io::direct;
#[cfg(feature = "encryption")]
use crate::crypto::PageCipher;
#[cfg(feature = "encryption")]
use crate::io::tags::{self, TagFile};
/// Selects how `ShardInner::flush_write_buf_encrypted` treats a trailing
/// partial 4096-byte page in the write buffer.
#[cfg(feature = "encryption")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EncryptedFlushMode {
/// Flush only whole pages; a partial trailing page stays in the buffer.
NonForce,
/// Zero-pad a partial trailing page to a full page, flush everything, and
/// advance the buffer base past the padding.
ForceAdvance,
}
/// In-memory staging area for bytes that have been appended to the active
/// file logically but not yet written to it.
pub(crate) struct WriteBuffer {
// Backing storage; its length is the buffer capacity.
buf: AlignedBuf,
// Number of valid bytes at the start of `buf`.
len: usize,
// File offset that `buf[0]` corresponds to.
base_offset: u64,
}
impl WriteBuffer {
    /// Creates an empty buffer with `capacity` zeroed bytes whose first byte
    /// maps to file offset `base_offset`.
    fn new(capacity: usize, base_offset: u64) -> Self {
        Self {
            buf: AlignedBuf::zeroed(capacity),
            len: 0,
            base_offset,
        }
    }

    /// Copies `data` to the end of the buffered bytes and returns the file
    /// offset at which it was placed.
    ///
    /// # Panics
    /// Panics if `data` does not fit in the remaining capacity; callers are
    /// expected to check [`Self::is_full`] first.
    fn append(&mut self, data: &[u8]) -> u64 {
        let offset = self.base_offset + self.len as u64;
        self.buf[self.len..self.len + data.len()].copy_from_slice(data);
        self.len += data.len();
        offset
    }

    /// Returns the `len` bytes starting at `file_offset` if that whole range
    /// is currently buffered, otherwise `None`.
    ///
    /// All arithmetic is checked: an offset below the buffer base, an offset
    /// that does not fit in `usize`, or a range whose end overflows yields
    /// `None` instead of wrapping and panicking on the slice.
    #[cfg(feature = "var-collections")]
    pub(crate) fn read(&self, file_offset: u64, len: usize) -> Option<&[u8]> {
        let relative = file_offset.checked_sub(self.base_offset)?;
        let start = usize::try_from(relative).ok()?;
        let end = start.checked_add(len)?;
        if end <= self.len {
            Some(&self.buf[start..end])
        } else {
            None
        }
    }

    /// Returns `true` when fewer than `needed` bytes of capacity remain.
    fn is_full(&self, needed: usize) -> bool {
        self.buf.len() - self.len < needed
    }

    /// Total capacity of the buffer in bytes.
    #[inline]
    fn capacity(&self) -> usize {
        self.buf.len()
    }

    /// The bytes buffered so far.
    fn data(&self) -> &[u8] {
        &self.buf[..self.len]
    }

    /// Discards all buffered bytes and re-bases the buffer at `new_base`.
    fn reset(&mut self, new_base: u64) {
        self.len = 0;
        self.base_offset = new_base;
    }

    /// Drops the first `flushed_bytes` buffered bytes, moving the remainder to
    /// the front and advancing the base offset by the same amount.
    #[cfg(feature = "encryption")]
    fn compact(&mut self, flushed_bytes: usize) {
        debug_assert!(
            flushed_bytes <= self.len,
            "cannot compact more bytes than are buffered"
        );
        let remainder = self.len - flushed_bytes;
        if remainder > 0 {
            self.buf.copy_within(flushed_bytes..self.len, 0);
        }
        self.base_offset += flushed_bytes as u64;
        self.len = remainder;
    }
}
/// One shard of the store: a directory of numbered `.data` files plus the
/// mutex-protected state needed to append to and read from them.
pub struct Shard {
/// Identifier of this shard.
pub id: u8,
// Directory holding this shard's files.
dir: PathBuf,
// Sequence counter handle; the same `Arc` is also stored in `ShardInner`.
gsn: Arc<AtomicU64>,
// All mutable shard state, guarded by the crate's mutex wrapper.
inner: Mutex<ShardInner>,
}
/// Mutable state of a shard; only accessed while holding `Shard::inner`.
pub struct ShardInner {
// File currently being appended to.
pub(crate) active: ActiveFile,
// Bytes appended but not yet written to the active file.
pub(crate) write_buf: WriteBuffer,
// Older files, opened read-only.
pub(crate) immutable: Vec<std::sync::Arc<ImmutableFile>>,
// Per-file-id byte counters accumulated by `add_dead_bytes`.
pub(crate) dead_bytes: std::collections::HashMap<u32, u64>,
// Key length recorded via `Shard::set_key_len`; needed to write a hint on drop.
pub(crate) key_len: Option<usize>,
// Whether hint files are written on rotation and on drop.
pub(crate) hints: bool,
// Next id handed out by `allocate_file_id`.
pub(crate) next_file_id: u32,
// Size threshold that triggers rotation of the active file on append.
pub(crate) max_file_size: u64,
// NOTE(review): not read or written in this file; presumably maintained by compaction — confirm.
pub(crate) last_compaction_output_ids: Vec<u32>,
// Counter incremented once per `append_entry` call to obtain the entry's sequence number.
gsn: Arc<AtomicU64>,
// Linux-only writer used for data writes and fsync of the active file.
#[cfg(target_os = "linux")]
uring_writer: crate::io::uring::UringWriter,
// When `Some`, buffered data is encrypted page by page before being written.
#[cfg(feature = "encryption")]
pub(crate) cipher: Option<Arc<PageCipher>>,
// When `Some`, every entry appended via `append_entry` is also pushed to this producer.
#[cfg(feature = "replication")]
pub(crate) replication_tx: Option<rtrb::Producer<crate::replication::ReplicationEntry>>,
}
/// The data file a shard is currently appending to.
pub(crate) struct ActiveFile {
// Handle used for writes (opened with `direct::open_write`).
pub(crate) file: std::fs::File,
// Separate handle used for reads (opened with `direct::open_read`).
pub(crate) read_file: Arc<std::fs::File>,
// Numeric id; the file is named `{file_id:06}.data`.
pub(crate) file_id: u32,
// Logical end of the file including buffered bytes; updated after each append.
pub(crate) write_offset: u64,
// Path of the data file.
pub(crate) path: PathBuf,
// Tag file paired with this data file when a cipher is configured.
#[cfg(feature = "encryption")]
pub(crate) tag_file: Option<Arc<TagFile>>,
}
/// A data file that is no longer written to and is kept open for reads.
pub(crate) struct ImmutableFile {
// Read handle (opened with `direct::open_read`).
pub(crate) file: std::fs::File,
// Numeric id of the file.
pub(crate) file_id: u32,
// Path of the data file; only stored when the encryption feature is enabled.
#[cfg(feature = "encryption")]
pub(crate) path: PathBuf,
// Size recorded when the file became immutable (on-disk length at open, or
// the logical end at rotation).
pub(crate) total_bytes: u64,
// Tag file paired with this data file, if a cipher is configured and the
// tag file exists on disk.
#[cfg(feature = "encryption")]
pub(crate) tag_file: Option<Arc<TagFile>>,
}
impl Shard {
/// Opens the shard stored in `dir` without a cipher, creating the directory
/// and a first data file if nothing exists yet.
///
/// Delegates to `open_inner`; when the `encryption` feature is compiled in,
/// the cipher argument is passed as `None`.
///
/// # Errors
/// Returns whatever error `open_inner` reports.
pub fn open(
id: u8,
dir: &Path,
max_file_size: u64,
write_buffer_size: usize,
hints: bool,
gsn: Arc<AtomicU64>,
) -> DbResult<Self> {
Self::open_inner(
id,
dir,
max_file_size,
write_buffer_size,
hints,
#[cfg(feature = "encryption")]
None,
gsn,
)
}
/// Opens the shard stored in `dir` with an optional page cipher.
///
/// Identical to `open` except that `cipher` is forwarded to `open_inner`;
/// passing `None` yields an unencrypted shard.
///
/// # Errors
/// Returns whatever error `open_inner` reports.
#[cfg(feature = "encryption")]
pub fn open_encrypted(
id: u8,
dir: &Path,
max_file_size: u64,
write_buffer_size: usize,
hints: bool,
cipher: Option<Arc<PageCipher>>,
gsn: Arc<AtomicU64>,
) -> DbResult<Self> {
Self::open_inner(
id,
dir,
max_file_size,
write_buffer_size,
hints,
cipher,
gsn,
)
}
/// Shared implementation behind `open` and `open_encrypted`.
///
/// Scans `dir` (creating it if needed), deletes leftover `*.data.tmp`,
/// `*.tags.tmp` and `*.hint.tmp` files, opens every `<id>.data` file except
/// the highest-numbered one read-only, and opens the highest-numbered file
/// (or a fresh `000001.data` when the directory holds none) as the active
/// file.
///
/// Only names of the exact form `<u32>.data` are recognised: the suffix is
/// stripped once, so something like `1.data.data` is ignored rather than
/// being mistaken for file 1. Ids that parse to the same number are
/// deduplicated so a file is never opened as both immutable and active.
///
/// # Errors
/// Propagates I/O errors from directory scanning and file opening, and
/// returns `DbError::Client` when the active id is `u32::MAX` and no further
/// file id could be allocated.
fn open_inner(
    id: u8,
    dir: &Path,
    max_file_size: u64,
    write_buffer_size: usize,
    hints: bool,
    #[cfg(feature = "encryption")] cipher: Option<Arc<PageCipher>>,
    gsn: Arc<AtomicU64>,
) -> DbResult<Self> {
    fs::create_dir_all(dir)?;

    let mut file_ids: Vec<u32> = Vec::new();
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        let name = entry.file_name();
        let name = name.to_string_lossy();
        if name.ends_with(".data.tmp")
            || name.ends_with(".tags.tmp")
            || name.ends_with(".hint.tmp")
        {
            // Leftover temporary file: remove it, tolerating a concurrent removal.
            match fs::remove_file(entry.path()) {
                Ok(()) => {}
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
                Err(e) => return Err(e.into()),
            }
        } else if let Some(stem) = name.strip_suffix(".data")
            && let Ok(file_id) = stem.parse::<u32>()
        {
            file_ids.push(file_id);
        }
    }
    file_ids.sort_unstable();
    file_ids.dedup();

    #[cfg(feature = "encryption")]
    let has_cipher = cipher.is_some();

    // The highest existing id becomes the active file; `None` means the
    // directory holds no data files yet.
    let existing_active = file_ids.pop();

    let mut immutable = Vec::with_capacity(file_ids.len());
    for &fid in &file_ids {
        let path = dir.join(format!("{fid:06}.data"));
        let file = direct::open_read(&path)?;
        let total_bytes = file.metadata()?.len();
        #[cfg(feature = "encryption")]
        let tag_file = if has_cipher {
            let tp = tags::tags_path_for_data(&path);
            if tp.exists() {
                Some(Arc::new(TagFile::open_read(&tp)?))
            } else {
                None
            }
        } else {
            None
        };
        immutable.push(std::sync::Arc::new(ImmutableFile {
            file,
            file_id: fid,
            #[cfg(feature = "encryption")]
            path,
            total_bytes,
            #[cfg(feature = "encryption")]
            tag_file,
        }));
    }

    let active_id = existing_active.unwrap_or(1);
    let active_path = dir.join(format!("{active_id:06}.data"));
    let active_file = direct::open_write(&active_path)?;
    // A freshly created file always starts at offset 0; an existing one
    // resumes at its current on-disk length.
    let write_offset = match existing_active {
        Some(_) => active_file.metadata()?.len(),
        None => 0,
    };
    let active_read = Arc::new(direct::open_read(&active_path)?);
    #[cfg(feature = "encryption")]
    let tag_file = if has_cipher {
        Some(Arc::new(TagFile::open_write(&tags::tags_path_for_data(
            &active_path,
        ))?))
    } else {
        None
    };
    let active = ActiveFile {
        file: active_file,
        read_file: active_read,
        file_id: active_id,
        write_offset,
        path: active_path,
        #[cfg(feature = "encryption")]
        tag_file,
    };

    #[cfg(target_os = "linux")]
    let mut uring_writer = crate::io::uring::UringWriter::new()?;
    #[cfg(target_os = "linux")]
    {
        use std::os::unix::io::AsRawFd;
        uring_writer.set_file(active.file.as_raw_fd());
    }

    let next_file_id = active_id
        .checked_add(1)
        .ok_or(crate::error::DbError::Client(
            "file_id space exhausted at open",
        ))?;

    Ok(Self {
        id,
        dir: dir.to_path_buf(),
        gsn: gsn.clone(),
        inner: Mutex::new(ShardInner {
            active,
            write_buf: WriteBuffer::new(write_buffer_size, write_offset),
            immutable,
            dead_bytes: std::collections::HashMap::new(),
            key_len: None,
            hints,
            next_file_id,
            max_file_size,
            last_compaction_output_ids: Vec::new(),
            gsn,
            #[cfg(target_os = "linux")]
            uring_writer,
            #[cfg(feature = "encryption")]
            cipher,
            #[cfg(feature = "replication")]
            replication_tx: None,
        }),
    })
}
/// Returns the sequence counter this shard was opened with.
pub fn gsn(&self) -> &AtomicU64 {
&self.gsn
}
/// Acquires the shard mutex through `crate::sync::lock` and returns the
/// guard giving access to the shard's mutable state.
pub fn lock(&self) -> MutexGuard<'_, ShardInner> {
sync::lock(&self.inner)
}
/// Reads the block at `block_offset` from file `file_id`.
///
/// The shard lock is held only long enough to clone the handles needed for
/// the read; the actual disk read (and decryption, if any) happens after the
/// lock has been released.
///
/// # Errors
/// Returns `DbError::StaleDiskLoc` when `file_id` is neither the active file
/// nor a known immutable file, and propagates read/decrypt errors.
#[cfg(feature = "var-collections")]
pub fn read_block(
    &self,
    file_id: u32,
    block_offset: u64,
) -> DbResult<(crate::io::aligned_buf::AlignedBuf, bool)> {
    let guard = sync::lock(&self.inner);
    let resolved = guard.resolve_block_refs(file_id);
    // Release the lock before touching the disk.
    drop(guard);
    let handles = resolved?;
    pread_decrypt_block(file_id, block_offset, &handles)
}
/// Returns the directory this shard stores its files in.
pub fn dir(&self) -> &Path {
&self.dir
}
/// Returns the ids of all immutable files, in their stored order, followed
/// by the id of the active file.
pub fn file_ids(&self) -> Vec<u32> {
    let guard = sync::lock(&self.inner);
    let ids: Vec<u32> = guard
        .immutable
        .iter()
        .map(|imm| imm.file_id)
        .chain(std::iter::once(guard.active.file_id))
        .collect();
    ids
}
/// Returns `(file_id, size)` for every immutable file, followed by the
/// active file paired with its current write offset.
pub fn file_sizes(&self) -> Vec<(u32, u64)> {
    let guard = sync::lock(&self.inner);
    let mut sizes: Vec<(u32, u64)> = Vec::with_capacity(guard.immutable.len() + 1);
    for imm in guard.immutable.iter() {
        sizes.push((imm.file_id, imm.total_bytes));
    }
    sizes.push((guard.active.file_id, guard.active.write_offset));
    sizes
}
/// Flushes and syncs the active file, then writes a hint file next to it.
///
/// Does nothing when the active file's write offset is zero. If hint
/// generation reports a corrupted entry, a warning is logged and no hint
/// file is written; the call still succeeds.
///
/// # Errors
/// Propagates flush, fsync, open, hint-generation (other than
/// `CorruptedEntry`) and hint-write errors.
pub fn write_active_hint(&self, key_len: usize) -> DbResult<()> {
    let mut guard = sync::lock(&self.inner);
    if guard.active.write_offset == 0 {
        return Ok(());
    }

    // Make everything buffered durable before scanning the file.
    guard.flush_write_buf_final()?;
    #[cfg(target_os = "linux")]
    guard.uring_writer.fsync()?;
    #[cfg(not(target_os = "linux"))]
    direct::fsync(&guard.active.file)?;
    #[cfg(feature = "encryption")]
    if let Some(tags) = guard.active.tag_file.as_ref() {
        tags.sync()?;
    }

    let reader = direct::open_read(&guard.active.path)?;
    let logical_len = guard.active.write_offset;

    #[cfg(feature = "encryption")]
    let generated = match (&guard.cipher, &guard.active.tag_file) {
        (Some(cipher), Some(tag_file)) => crate::hint::generate_hint_data_dyn_encrypted(
            &reader,
            logical_len,
            key_len,
            cipher,
            tag_file.as_ref(),
            guard.active.file_id,
        ),
        _ => crate::hint::generate_hint_data_dyn(&reader, logical_len, key_len),
    };
    #[cfg(not(feature = "encryption"))]
    let generated = crate::hint::generate_hint_data_dyn(&reader, logical_len, key_len);

    let hint_data = match generated {
        Ok(data) => data,
        Err(crate::error::DbError::CorruptedEntry { offset }) => {
            tracing::warn!(offset, "hint generation stopped early — skipping hint file");
            return Ok(());
        }
        Err(e) => return Err(e),
    };

    let hint_path = crate::hint::hint_path_for_data(&guard.active.path);
    crate::hint::write_hint_file(&hint_path, &hint_data)?;
    Ok(())
}
/// Writes buffered bytes to the active file without forcing out a partial
/// encrypted page and without calling fsync.
///
/// Delegates to `ShardInner::flush_write_buf` under the shard lock.
pub fn flush_buf(&self) -> DbResult<()> {
let mut inner = sync::lock(&self.inner);
inner.flush_write_buf()
}
/// Writes everything buffered to the active file, padding a trailing partial
/// page when a cipher is configured, but does not call fsync.
///
/// Delegates to `ShardInner::flush_write_buf_final` under the shard lock.
pub fn flush_for_replication_catchup(&self) -> DbResult<()> {
let mut inner = sync::lock(&self.inner);
inner.flush_write_buf_final()
}
/// Writes everything buffered to the active file and syncs it.
///
/// # Errors
/// Propagates errors from the buffer flush, the data-file fsync, and the
/// tag-file sync.
pub fn flush(&self) -> DbResult<()> {
let mut inner = sync::lock(&self.inner);
inner.flush_write_buf_final()?;
// On Linux the fsync goes through the uring writer; elsewhere it is issued
// directly on the active file handle.
#[cfg(target_os = "linux")]
inner.uring_writer.fsync()?;
#[cfg(not(target_os = "linux"))]
direct::fsync(&inner.active.file)?;
// The tag file, when present, is synced as well.
#[cfg(feature = "encryption")]
if let Some(ref tag_file) = inner.active.tag_file {
tag_file.sync()?;
}
Ok(())
}
}
impl Shard {
    /// Returns the id of the file currently being appended to.
    pub fn active_file_id(&self) -> u32 {
        let guard = sync::lock(&self.inner);
        guard.active.file_id
    }

    /// Returns a clone of the shard's cipher handle, if one is configured.
    #[cfg(feature = "encryption")]
    pub fn cipher(&self) -> Option<Arc<crate::crypto::PageCipher>> {
        let guard = sync::lock(&self.inner);
        let handle = guard.cipher.as_ref().map(Arc::clone);
        handle
    }

    /// Installs `producer` as the destination for entries appended from now
    /// on, replacing any previously installed producer.
    #[cfg(feature = "replication")]
    pub fn set_replication_producer(
        &self,
        producer: rtrb::Producer<crate::replication::ReplicationEntry>,
    ) {
        let mut guard = sync::lock(&self.inner);
        guard.replication_tx = Some(producer);
    }
}
impl Shard {
    /// Records the key length used later when writing a hint on drop.
    pub(crate) fn set_key_len(&self, key_len: usize) {
        let mut guard = sync::lock(&self.inner);
        guard.key_len = Some(key_len);
    }

    /// Repositions the active file's write cursor at the end of the valid
    /// data found by recovery.
    ///
    /// Without a cipher the file is truncated to `tail.last_valid_offset` and
    /// writing resumes there. With a cipher the file is left untouched and
    /// writing resumes at the next 4096-byte boundary at or after that offset.
    ///
    /// # Errors
    /// Returns `DbError::Client` when `tail` refers to a different file than
    /// the active one, and propagates truncation errors.
    pub(crate) fn apply_recovery_tail(&self, tail: &crate::recovery::ActiveTail) -> DbResult<()> {
        let mut guard = sync::lock(&self.inner);
        if guard.active.file_id != tail.file_id {
            return Err(crate::error::DbError::Client(
                "apply_recovery_tail: file_id mismatch",
            ));
        }

        #[cfg(feature = "encryption")]
        let page_aligned = guard.cipher.is_some();
        #[cfg(not(feature = "encryption"))]
        let page_aligned = false;

        let resume_at = if page_aligned {
            const PAGE_SIZE: u64 = 4096;
            tail.last_valid_offset.div_ceil(PAGE_SIZE) * PAGE_SIZE
        } else {
            guard.active.file.set_len(tail.last_valid_offset)?;
            tail.last_valid_offset
        };

        guard.write_buf.reset(resume_at);
        guard.active.write_offset = resume_at;
        Ok(())
    }

    /// Replaces the shard's dead-byte counters with `dead_bytes`.
    pub(crate) fn install_dead_bytes(&self, dead_bytes: std::collections::HashMap<u32, u64>) {
        sync::lock(&self.inner).dead_bytes = dead_bytes;
    }
}
impl Drop for Shard {
    /// Makes buffered data durable when the shard goes away.
    ///
    /// When hints are enabled and a key length is known, a hint for the
    /// active file is written (which also flushes); if that fails, a plain
    /// flush is attempted as a fallback. Otherwise the shard is simply
    /// flushed. Failures are logged, never propagated.
    fn drop(&mut self) {
        let (key_len, hints) = {
            let guard = sync::lock(&self.inner);
            (guard.key_len, guard.hints)
        };

        let hint_key_len = if hints { key_len } else { None };
        match hint_key_len {
            Some(kl) => {
                if let Err(e) = self.write_active_hint(kl) {
                    tracing::error!(shard_id = self.id, "failed to write hint on drop: {e}");
                    if let Err(e2) = self.flush() {
                        tracing::error!(shard_id = self.id, "fallback flush also failed: {e2}");
                    }
                }
            }
            None => {
                if let Err(e) = self.flush() {
                    tracing::error!(shard_id = self.id, "failed to flush shard on drop: {e}");
                }
            }
        }
    }
}
impl ShardInner {
/// Writes buffered bytes to the active file.
///
/// With a cipher configured only whole 4096-byte pages are written
/// (`EncryptedFlushMode::NonForce`); any partial trailing page stays
/// buffered. Without a cipher the whole buffer is written. An empty buffer is
/// a no-op.
pub(crate) fn flush_write_buf(&mut self) -> DbResult<()> {
if self.write_buf.len == 0 {
return Ok(());
}
#[cfg(feature = "encryption")]
if self.cipher.is_some() {
return self.flush_write_buf_encrypted(EncryptedFlushMode::NonForce);
}
self.flush_write_buf_plain()
}
/// Writes everything buffered to the active file, leaving the buffer empty.
///
/// With a cipher configured a partial trailing page is zero-padded to a full
/// page first (`EncryptedFlushMode::ForceAdvance`). Without a cipher this is
/// the same as `flush_write_buf`. An empty buffer is a no-op.
pub(crate) fn flush_write_buf_final(&mut self) -> DbResult<()> {
if self.write_buf.len == 0 {
return Ok(());
}
#[cfg(feature = "encryption")]
if self.cipher.is_some() {
return self.flush_write_buf_encrypted(EncryptedFlushMode::ForceAdvance);
}
self.flush_write_buf_plain()
}
/// Writes the buffered bytes unencrypted at the buffer's base offset, then
/// empties the buffer and advances its base past the written bytes.
///
/// On a write error the buffer is left untouched.
fn flush_write_buf_plain(&mut self) -> DbResult<()> {
    let start = self.write_buf.base_offset;
    let pending = self.write_buf.data();
    let written = pending.len() as u64;

    #[cfg(target_os = "linux")]
    self.uring_writer.write_at(pending, start)?;
    #[cfg(not(target_os = "linux"))]
    direct::pwrite_at(&self.active.file, pending, start)?;

    self.write_buf.reset(start + written);
    metrics::counter!("armdb.flush.count").increment(1);
    metrics::counter!("armdb.flush.bytes").increment(written);
    Ok(())
}
/// Encrypts buffered data page by page and writes it, together with the
/// per-page tags, to the active data file and its tag file.
///
/// * `NonForce` writes only whole 4096-byte pages and keeps a partial
///   trailing page in the buffer. If no whole page is buffered, nothing is
///   written.
/// * `ForceAdvance` zero-pads a partial trailing page to a full page, writes
///   everything, and resets the buffer to start after the padding.
///
/// The buffer itself is never encrypted in place; a copy is encrypted so the
/// plaintext remains available if the flush fails.
///
/// # Errors
/// Propagates encryption, data-write and tag-write errors. On any of these
/// the zero padding added for `ForceAdvance` is undone, so the buffer is left
/// exactly as it was before the call and the flush can be retried.
///
/// # Panics
/// Panics if no cipher is configured (callers check this first).
#[cfg(feature = "encryption")]
fn flush_write_buf_encrypted(&mut self, mode: EncryptedFlushMode) -> DbResult<()> {
    const PAGE: usize = 4096;

    /// Restores the buffer length recorded before padding and re-zeroes the
    /// padded region; a no-op when no padding was added.
    fn undo_padding(buf: &mut WriteBuffer, original_len: Option<usize>) {
        if let Some(orig) = original_len {
            buf.buf[orig..buf.len].fill(0);
            buf.len = orig;
        }
    }

    let cipher = self
        .cipher
        .as_ref()
        .expect("caller checked cipher.is_some()");

    let buffered = self.write_buf.len;
    let whole_pages_len = (buffered / PAGE) * PAGE;
    let tail_len = buffered % PAGE;
    let pad_tail = mode == EncryptedFlushMode::ForceAdvance && tail_len > 0;

    let (flush_bytes, original_len) = if pad_tail {
        let padded_len = whole_pages_len + PAGE;
        self.write_buf.buf[buffered..padded_len].fill(0);
        self.write_buf.len = padded_len;
        (padded_len, Some(buffered))
    } else if whole_pages_len == 0 {
        // Less than one page buffered and no padding requested.
        return Ok(());
    } else {
        (whole_pages_len, None)
    };

    let page_count = flush_bytes / PAGE;
    let base_offset = self.write_buf.base_offset;
    let first_page = base_offset / PAGE as u64;
    let file_id = self.active.file_id;

    // Encrypt a copy so the plaintext in the buffer survives a failed flush.
    let mut encrypted = self.write_buf.buf[..flush_bytes].to_vec();
    let mut tag_list = Vec::with_capacity(page_count);
    for (idx, page) in encrypted.chunks_exact_mut(PAGE).enumerate() {
        match cipher.encrypt_page(file_id, first_page + idx as u64, page) {
            Ok(tag) => tag_list.push(tag),
            Err(e) => {
                undo_padding(&mut self.write_buf, original_len);
                return Err(e.into());
            }
        }
    }

    #[cfg(target_os = "linux")]
    let write_result = self.uring_writer.write_at(&encrypted, base_offset);
    #[cfg(not(target_os = "linux"))]
    let write_result = direct::pwrite_at(&self.active.file, &encrypted, base_offset);
    if let Err(e) = write_result {
        undo_padding(&mut self.write_buf, original_len);
        return Err(e);
    }

    if let Some(tag_file) = self.active.tag_file.as_ref()
        && let Err(e) = tag_file.write_tags(first_page, &tag_list)
    {
        undo_padding(&mut self.write_buf, original_len);
        return Err(e);
    }

    match mode {
        EncryptedFlushMode::NonForce => {
            // Keep the partial trailing page buffered for the next flush.
            self.write_buf.compact(whole_pages_len);
        }
        EncryptedFlushMode::ForceAdvance => {
            self.write_buf.reset(base_offset + flush_bytes as u64);
        }
    }
    metrics::counter!("armdb.flush.count").increment(1);
    metrics::counter!("armdb.flush.bytes").increment(flush_bytes as u64);
    Ok(())
}
pub fn append_entry(
&mut self,
shard_id: u8,
key: &[u8],
value: &[u8],
tombstone: bool,
) -> DbResult<(DiskLoc, u64)> {
let needed = crate::entry::entry_size(key.len(), value.len() as u32) as usize;
if needed > self.write_buf.capacity() {
return Err(crate::error::DbError::Client(
"entry exceeds write_buffer_size",
));
}
if self.active.write_offset + (needed as u64) > self.max_file_size {
self.rotate(shard_id, key.len())?;
}
let gsn = self.gsn.fetch_add(1, Ordering::Relaxed);
let buf = serialize_entry(gsn, key, value, tombstone);
if self.write_buf.is_full(buf.len()) {
self.flush_write_buf()?;
if self.write_buf.is_full(buf.len()) {
self.rotate(shard_id, key.len())?;
}
}
let entry_offset = self.write_buf.append(&buf);
#[cfg(feature = "replication")]
if let Some(tx) = &mut self.replication_tx {
let _ = tx.push(crate::replication::ReplicationEntry {
data: buf,
key_len: key.len() as u16,
});
}
let header_and_key = size_of::<entry::EntryHeader>() + key.len();
let value_offset = entry_offset + header_and_key as u64;
debug_assert!(value_offset <= u32::MAX as u64);
let loc = DiskLoc::new(
shard_id,
self.active.file_id,
value_offset as u32,
value.len() as u32,
);
self.active.write_offset = self.write_buf.base_offset + self.write_buf.len as u64;
let actual_gsn = if tombstone {
make_tombstone_gsn(gsn)
} else {
gsn
};
Ok((loc, actual_gsn))
}
/// Retires the active file and starts a new one.
///
/// Steps, in order:
/// 1. Flush and sync the current active file (and its tag file, if any).
/// 2. Allocate a new file id and open the new data/tag files, plus read-only
///    handles for the file being retired.
/// 3. Swap the new file in as the active file.
/// 4. Register the retired file in `immutable`.
/// 5. If hints are enabled, generate and write a hint file for the retired
///    file.
///
/// Every fallible open happens before step 3, so an error there leaves the
/// in-memory state pointing at the old active file. The retired file is
/// registered before hint generation, so a hint problem can never make the
/// retired file unreachable. A `CorruptedEntry` during hint generation only
/// skips the hint file (with a warning).
///
/// # Errors
/// Propagates flush/sync/open errors, `DbError::Client` when file ids are
/// exhausted, and hint generation/write errors other than `CorruptedEntry`.
fn rotate(&mut self, _shard_id: u8, key_len: usize) -> DbResult<()> {
    metrics::counter!("armdb.rotation").increment(1);
    tracing::debug!(shard_id = _shard_id, "shard file rotation");

    // Logical end of valid data, captured before any page padding is added
    // by the final flush.
    let logical_end = self.write_buf.base_offset + self.write_buf.len as u64;
    self.flush_write_buf_final()?;
    #[cfg(target_os = "linux")]
    self.uring_writer.fsync()?;
    #[cfg(not(target_os = "linux"))]
    direct::fsync(&self.active.file)?;
    #[cfg(feature = "encryption")]
    if let Some(tf) = self.active.tag_file.as_ref() {
        tf.sync()?;
    }

    let new_file_id = self.allocate_file_id()?;
    let dir = self
        .active
        .path
        .parent()
        .expect("active file has parent dir");
    let new_path = dir.join(format!("{new_file_id:06}.data"));
    let new_file = direct::open_write(&new_path)?;
    let new_read = Arc::new(direct::open_read(&new_path)?);
    #[cfg(feature = "encryption")]
    let new_tag_file = if self.cipher.is_some() {
        Some(Arc::new(TagFile::open_write(&tags::tags_path_for_data(
            &new_path,
        ))?))
    } else {
        None
    };

    // Open read-only handles for the file being retired while it is still
    // the active file, so a failure here leaves the shard state untouched.
    let imm_file = direct::open_read(&self.active.path)?;
    #[cfg(feature = "encryption")]
    let imm_tag_file = if self.cipher.is_some() {
        let tp = tags::tags_path_for_data(&self.active.path);
        if tp.exists() {
            Some(Arc::new(TagFile::open_read(&tp)?))
        } else {
            None
        }
    } else {
        None
    };

    // From here until the retired file is registered nothing can fail.
    let old_path = std::mem::replace(&mut self.active.path, new_path);
    let old_file = std::mem::replace(&mut self.active.file, new_file);
    let old_file_id = std::mem::replace(&mut self.active.file_id, new_file_id);
    self.active.write_offset = 0;
    self.active.read_file = new_read;
    self.write_buf.reset(0);
    #[cfg(feature = "encryption")]
    let old_tag_file = std::mem::replace(&mut self.active.tag_file, new_tag_file);
    #[cfg(target_os = "linux")]
    {
        use std::os::unix::io::AsRawFd;
        self.uring_writer.set_file(self.active.file.as_raw_fd());
    }

    let file_len = logical_end;
    // Resolve the hint path now: `old_path` is moved into the immutable
    // record when the encryption feature is enabled.
    let hint_path = self
        .hints
        .then(|| crate::hint::hint_path_for_data(&old_path));

    let retired = Arc::new(ImmutableFile {
        file: imm_file,
        file_id: old_file_id,
        #[cfg(feature = "encryption")]
        path: old_path,
        total_bytes: file_len,
        #[cfg(feature = "encryption")]
        tag_file: imm_tag_file,
    });
    self.immutable.push(Arc::clone(&retired));
    drop(old_file);
    #[cfg(feature = "encryption")]
    drop(old_tag_file);

    if let Some(hint_path) = hint_path {
        #[cfg(feature = "encryption")]
        let generated = match (&self.cipher, &retired.tag_file) {
            (Some(cipher), Some(tag_file)) => crate::hint::generate_hint_data_dyn_encrypted(
                &retired.file,
                file_len,
                key_len,
                cipher,
                tag_file.as_ref(),
                old_file_id,
            ),
            _ => crate::hint::generate_hint_data_dyn(&retired.file, file_len, key_len),
        };
        #[cfg(not(feature = "encryption"))]
        let generated = crate::hint::generate_hint_data_dyn(&retired.file, file_len, key_len);

        match generated {
            Ok(hint_data) => {
                crate::hint::write_hint_file(&hint_path, &hint_data)?;
            }
            Err(crate::error::DbError::CorruptedEntry { offset }) => {
                // The hint is skipped; the rotation itself is complete.
                tracing::warn!(offset, "hint generation stopped early — skipping hint file");
            }
            Err(e) => return Err(e),
        }
    }
    Ok(())
}
/// Adds `size` to the dead-byte counter of `file_id`, creating the counter
/// if it does not exist yet.
pub fn add_dead_bytes(&mut self, file_id: u32, size: u64) {
    let counter = self.dead_bytes.entry(file_id).or_default();
    *counter += size;
}
pub(crate) fn allocate_file_id(&mut self) -> DbResult<u32> {
if self.next_file_id == u32::MAX {
return Err(crate::error::DbError::Client("file_id space exhausted"));
}
let id = self.next_file_id;
self.next_file_id += 1;
Ok(id)
}
/// Appends an already-serialized entry to the write buffer of the active
/// file, rotating first when required, and returns the id of the file and
/// the offset at which the entry starts.
///
/// Rotation rules mirror `append_entry`: rotate when the entry would exceed
/// `max_file_size`, or when the buffer still has no room after a flush.
///
/// # Errors
/// Returns `DbError::Client` when `data` is larger than the write buffer and
/// propagates flush/rotation errors.
#[cfg(feature = "replication")]
pub fn append_raw_entry(
    &mut self,
    shard_id: u8,
    key_len: u16,
    data: &[u8],
) -> DbResult<(u32, u64)> {
    let incoming = data.len();
    if incoming > self.write_buf.capacity() {
        return Err(crate::error::DbError::Client(
            "replicated entry exceeds write_buffer_size",
        ));
    }

    let projected_end = self.active.write_offset + (incoming as u64);
    if projected_end > self.max_file_size {
        self.rotate(shard_id, key_len as usize)?;
    }

    if self.write_buf.is_full(incoming) {
        self.flush_write_buf()?;
        let still_full = self.write_buf.is_full(incoming);
        if still_full {
            self.rotate(shard_id, key_len as usize)?;
        }
    }

    let target_file = self.active.file_id;
    let entry_offset = self.write_buf.append(data);
    self.active.write_offset = self.write_buf.base_offset + self.write_buf.len as u64;
    Ok((target_file, entry_offset))
}
}
/// Handles cloned out of `ShardInner` so a block can be read after the shard
/// lock has been released.
///
/// As built by `resolve_block_refs`, exactly one of `active_read` and
/// `immutable_file` is `Some`.
#[cfg(feature = "var-collections")]
struct BlockFileRefs {
// Read handle of the active file, when the block lives in the active file.
active_read: Option<Arc<std::fs::File>>,
// The immutable file, when the block lives in an immutable file.
immutable_file: Option<Arc<ImmutableFile>>,
// `total_bytes` of the immutable file; 0 for the active file.
immutable_total: u64,
// Cipher of the shard, if any.
#[cfg(feature = "encryption")]
cipher: Option<Arc<PageCipher>>,
// Tag file belonging to the chosen data file, if any.
#[cfg(feature = "encryption")]
tag_file: Option<Arc<TagFile>>,
}
/// Reads the block at `block_offset` through the handles in `refs` and, when
/// a cipher is present, decrypts it in place using the page's tag.
///
/// The returned flag is `true` only for immutable files whose recorded size
/// covers the whole 4096-byte block; it is always `false` for the active
/// file.
///
/// # Errors
/// Propagates read, tag-read and decrypt errors. With a cipher but no tag
/// file, returns `DbError::EncryptionError` for the active file and
/// `DbError::StaleDiskLoc` for an immutable file.
///
/// # Panics
/// Panics if `refs` holds neither an active nor an immutable handle.
#[cfg(feature = "var-collections")]
fn pread_decrypt_block(
    _file_id: u32,
    block_offset: u64,
    refs: &BlockFileRefs,
) -> DbResult<(AlignedBuf, bool)> {
    let reading_active = refs.active_read.is_some();

    #[allow(unused_mut)]
    let (mut buf, _) = match (&refs.active_read, &refs.immutable_file) {
        (Some(active), _) => direct::pread_block(active, block_offset)?,
        (None, Some(imm)) => direct::pread_block(&imm.file, block_offset)?,
        (None, None) => unreachable!(),
    };

    let is_full_block = !reading_active && block_offset + 4096 <= refs.immutable_total;

    #[cfg(feature = "encryption")]
    if let Some(cipher) = &refs.cipher {
        let page_number = block_offset / 4096;
        let Some(tags) = &refs.tag_file else {
            return Err(if reading_active {
                crate::error::DbError::EncryptionError(
                    "no tag file for active encrypted file_id".into(),
                )
            } else {
                crate::error::DbError::StaleDiskLoc
            });
        };
        let tag = tags.read_tag(page_number)?;
        cipher.decrypt_page(_file_id, page_number, &mut buf, &tag)?;
    }

    Ok((buf, is_full_block))
}
#[cfg(feature = "var-collections")]
impl ShardInner {
    /// Clones the handles needed to read a block of `file_id` so the read can
    /// happen without holding the shard lock.
    ///
    /// # Errors
    /// Returns `DbError::StaleDiskLoc` when `file_id` is neither the active
    /// file nor one of the immutable files.
    fn resolve_block_refs(&self, file_id: u32) -> DbResult<BlockFileRefs> {
        if self.active.file_id == file_id {
            return Ok(BlockFileRefs {
                active_read: Some(Arc::clone(&self.active.read_file)),
                immutable_file: None,
                immutable_total: 0,
                #[cfg(feature = "encryption")]
                cipher: self.cipher.clone(),
                #[cfg(feature = "encryption")]
                tag_file: self.active.tag_file.clone(),
            });
        }

        let Some(found) = self.immutable.iter().find(|imm| imm.file_id == file_id) else {
            return Err(crate::error::DbError::StaleDiskLoc);
        };
        let imm = Arc::clone(found);
        Ok(BlockFileRefs {
            active_read: None,
            immutable_total: imm.total_bytes,
            #[cfg(feature = "encryption")]
            cipher: self.cipher.clone(),
            #[cfg(feature = "encryption")]
            tag_file: imm.tag_file.clone(),
            // Moved last: the fields above still read from `imm`.
            immutable_file: Some(imm),
        })
    }

    /// Reads (and, if needed, decrypts) a block while the caller already
    /// holds the shard lock.
    pub(crate) fn read_block_locked(
        &self,
        file_id: u32,
        block_offset: u64,
    ) -> DbResult<(AlignedBuf, bool)> {
        let handles = self.resolve_block_refs(file_id)?;
        pread_decrypt_block(file_id, block_offset, &handles)
    }

    /// Reads the value bytes described by `loc` from disk while the caller
    /// already holds the shard lock.
    ///
    /// # Errors
    /// * `DbError::StaleDiskLoc` when `loc.file_id` is unknown, or when a
    ///   cipher is configured and the immutable file has no tag file.
    /// * `DbError::EncryptionError` when a cipher is configured and the
    ///   active file has no tag file.
    /// * Any error from the underlying read.
    pub(crate) fn read_value_from_disk_locked(&self, loc: &DiskLoc) -> DbResult<Vec<u8>> {
        let len = loc.len as usize;
        let offset = loc.offset as u64;
        let wanted_id = loc.file_id;

        if self.active.file_id == wanted_id {
            #[cfg(feature = "encryption")]
            if let Some(cipher) = &self.cipher {
                let Some(tags) = self.active.tag_file.as_ref() else {
                    return Err(crate::error::DbError::EncryptionError(
                        "tag file missing for active encrypted file".to_string(),
                    ));
                };
                return crate::io::direct::pread_value_encrypted(
                    &self.active.read_file,
                    tags.as_ref(),
                    cipher,
                    wanted_id,
                    offset,
                    len,
                );
            }
            return crate::io::direct::pread_value(&self.active.read_file, offset, len);
        }

        let Some(imm) = self.immutable.iter().find(|f| f.file_id == wanted_id) else {
            return Err(crate::error::DbError::StaleDiskLoc);
        };

        #[cfg(feature = "encryption")]
        if let Some(cipher) = &self.cipher {
            let Some(tags) = imm.tag_file.as_ref() else {
                return Err(crate::error::DbError::StaleDiskLoc);
            };
            return crate::io::direct::pread_value_encrypted(
                &imm.file,
                tags.as_ref(),
                cipher,
                wanted_id,
                offset,
                len,
            );
        }

        crate::io::direct::pread_value(&imm.file, offset, len)
    }
}
// Test-only hooks that reach into the shard's locked state.
#[cfg(test)]
#[allow(unused)]
impl Shard {
/// Overrides the next file id that rotation will allocate.
pub(crate) fn set_next_file_id(&self, id: u32) {
sync::lock(&self.inner).next_file_id = id;
}
/// Forces a rotation of the active file, as `append_entry` would do when
/// the file is full.
pub(crate) fn rotate_active_for_test(&self, key_len: usize) -> DbResult<()> {
sync::lock(&self.inner).rotate(self.id, key_len)
}
}
// Unit tests for shard opening, block/value reads, rotation limits and
// flush behaviour.
#[cfg(test)]
#[allow(unused_imports)]
mod tests {
use super::*;
use crate::error::DbError;
use std::sync::atomic::AtomicU64;
use tempfile::tempdir;
// Opens an unencrypted shard with a 1 MiB file limit, a 64 KiB write buffer
// and hints disabled.
#[cfg(feature = "var-collections")]
fn open_test_shard(dir: &std::path::Path) -> Shard {
let gsn = Arc::new(AtomicU64::new(0));
Shard::open(0, dir, 1 << 20, 64 * 1024, false, gsn).expect("open test shard")
}
// `read_block` on a file id the shard has never seen must report
// `StaleDiskLoc`.
#[cfg(feature = "var-collections")]
#[test]
fn read_block_returns_stale_for_unknown_file_id() {
let dir = tempdir().unwrap();
let shard = open_test_shard(dir.path());
match shard.read_block(9999, 0) {
Err(DbError::StaleDiskLoc) => {}
Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
Ok(_) => panic!("expected StaleDiskLoc, got Ok"),
}
}
// Same as above, but with a cipher configured.
#[cfg(all(feature = "encryption", feature = "var-collections"))]
#[test]
fn read_block_returns_stale_when_immutable_missing_under_encryption() {
use crate::crypto::PageCipher;
let dir = tempdir().unwrap();
let gsn = Arc::new(AtomicU64::new(0));
let cipher = Some(Arc::new(
PageCipher::new(&[0x42; 32]).expect("create cipher"),
));
let shard = Shard::open_encrypted(0, dir.path(), 1 << 20, 64 * 1024, false, cipher, gsn)
.expect("open encrypted shard");
match shard.read_block(42, 0) {
Err(DbError::StaleDiskLoc) => {}
Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
Ok(_) => panic!("expected StaleDiskLoc, got Ok"),
}
}
// `read_value_from_disk_locked` must report `StaleDiskLoc` for a location
// in an unknown file.
#[cfg(feature = "var-collections")]
#[test]
fn read_value_from_disk_locked_returns_stale_for_unknown_file_id() {
let dir = tempdir().unwrap();
let shard = open_test_shard(dir.path());
let fake = DiskLoc::new(0, 9999, 0, 0);
let inner = shard.lock();
match inner.read_value_from_disk_locked(&fake) {
Err(DbError::StaleDiskLoc) => {}
Ok(_) => panic!("expected StaleDiskLoc, got Ok"),
Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
}
}
// Writes a value into a file whose id is above `u16::MAX`, rotates so the
// file becomes immutable, and checks the value can be read back by block.
#[cfg(feature = "var-collections")]
#[test]
fn read_block_with_file_id_above_u16() {
let dir = tempdir().unwrap();
let shard = open_test_shard(dir.path());
shard.set_next_file_id(70_000);
shard.rotate_active_for_test(3).expect("first rotate");
assert!(
shard.active_file_id() >= 70_000,
"active_file_id should be >= 70_000 after rotation"
);
let key = b"abc";
let value = vec![0xABu8; 256];
let (disk, _gsn) = shard
.lock()
.append_entry(0, key, &value, false)
.expect("append entry");
assert!(
disk.file_id > u16::MAX as u32,
"DiskLoc.file_id must be above u16::MAX"
);
shard.flush().expect("flush");
shard.rotate_active_for_test(3).expect("second rotate");
// Read the 4096-byte block containing the value and compare the bytes at
// the value's position inside that block.
let block_offset = disk.offset as u64 & !4095;
let (block, _) = shard
.read_block(disk.file_id, block_offset)
.expect("read_block");
let start = (disk.offset & 4095) as usize;
let end = start + value.len();
assert_eq!(&block[start..end], &value[..]);
}
// An unflushed value in a file with id above `u16::MAX` must be readable
// straight from the write buffer.
#[cfg(feature = "var-collections")]
#[test]
fn write_buffer_read_with_file_id_above_u16() {
let dir = tempdir().unwrap();
let shard = open_test_shard(dir.path());
shard.set_next_file_id(70_000);
shard.rotate_active_for_test(1).expect("rotate");
assert!(shard.active_file_id() >= 70_000);
let value = vec![0x5Au8; 200];
let (disk, _gsn) = shard
.lock()
.append_entry(0, b"k", &value, false)
.expect("append");
assert!(
disk.file_id > u16::MAX as u32,
"DiskLoc.file_id must be above u16::MAX"
);
let inner = shard.lock();
let bytes = inner
.write_buf
.read(disk.offset as u64, disk.len as usize)
.expect("write-buf read must succeed for unflushed entry");
assert_eq!(bytes, &value[..]);
}
// Rotation must fail with a `Client` error mentioning `file_id` once the
// id counter sits at `u32::MAX`.
#[test]
fn rotate_errors_when_file_id_exhausted() {
let tmp = tempdir().unwrap();
let gsn = Arc::new(AtomicU64::new(1));
let shard = Shard::open(0, tmp.path(), 16 * 4096, 8192, false, gsn).unwrap();
shard.set_next_file_id(u32::MAX);
let res = shard.rotate_active_for_test(4);
match res {
Err(DbError::Client(msg)) => {
assert!(
msg.contains("file_id"),
"expected file_id error, got: {msg}"
);
}
other => panic!("expected DbError::Client, got {other:?}"),
}
}
// Opening a directory whose highest data file already has id `u32::MAX`
// must fail with a `Client` error mentioning `file_id`.
#[test]
fn open_errors_when_active_id_is_max() {
let tmp = tempdir().unwrap();
let path = tmp.path().join(format!("{}.data", u32::MAX));
std::fs::write(&path, b"").unwrap();
let gsn = Arc::new(AtomicU64::new(1));
let res = Shard::open(0, tmp.path(), 16 * 4096, 8192, false, gsn);
match res {
Err(DbError::Client(msg)) => {
assert!(
msg.contains("file_id"),
"expected file_id error, got: {msg}"
);
}
Ok(_) => panic!("expected DbError::Client, got Ok"),
Err(e) => panic!("expected DbError::Client, got Err({e})"),
}
}
// With a cipher, `flush_buf` must leave a sub-page entry buffered (file
// stays empty) while `flush_for_replication_catchup` must write exactly one
// padded 4096-byte page.
#[cfg(all(feature = "encryption", feature = "var-collections"))]
#[test]
fn flush_for_replication_catchup_pads_encrypted_trailing_page() {
use crate::crypto::PageCipher;
let dir = tempdir().unwrap();
let gsn = Arc::new(AtomicU64::new(0));
let cipher = Some(Arc::new(
PageCipher::new(&[0xAB; 32]).expect("create cipher"),
));
let shard = Shard::open_encrypted(0, dir.path(), 1 << 20, 64 * 1024, false, cipher, gsn)
.expect("open encrypted shard");
{
let mut inner = sync::lock(&shard.inner);
inner
.append_entry(0, b"key", b"value", false)
.expect("append_entry");
}
let data_path = dir.path().join("000001.data");
shard.flush_buf().expect("flush_buf");
let len_after_nonforce = std::fs::metadata(&data_path).expect("metadata").len();
assert_eq!(
len_after_nonforce, 0,
"NonForce encrypted flush must leave partial page in buffer (no whole page yet), got {len_after_nonforce}"
);
shard
.flush_for_replication_catchup()
.expect("flush_for_replication_catchup");
let len_after_force = std::fs::metadata(&data_path).expect("metadata").len();
assert!(
len_after_force > 0,
"ForceAdvance must flush the padded page to disk, got 0 bytes"
);
assert_eq!(
len_after_force % 4096,
0,
"encrypted file length must be 4096-aligned after ForceAdvance, got {len_after_force}"
);
assert_eq!(
len_after_force, 4096,
"expected exactly one 4096-byte padded page, got {len_after_force}"
);
}
// Without a cipher, `flush_for_replication_catchup` must write exactly the
// entry's bytes with no padding.
#[cfg(feature = "var-collections")]
#[test]
fn flush_for_replication_catchup_plain_mode_flushes() {
let dir = tempdir().unwrap();
let shard = open_test_shard(dir.path());
let expected_entry_size = crate::entry::entry_size(b"k".len(), b"v".len() as u32);
assert_eq!(expected_entry_size, 24);
{
let mut inner = sync::lock(&shard.inner);
inner
.append_entry(0, b"k", b"v", false)
.expect("append_entry");
}
shard
.flush_for_replication_catchup()
.expect("flush_for_replication_catchup plain");
let data_path = dir.path().join("000001.data");
let file_len = std::fs::metadata(&data_path).expect("metadata").len();
assert_eq!(
file_len, expected_entry_size,
"plain mode must write exactly entry bytes (no padding); expected {expected_entry_size}, got {file_len}"
);
}
}
// Tests for offset handling in `append_entry` near the 32-bit boundary.
#[cfg(test)]
mod append_offset_tests {
use super::*;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
// With `max_file_size` set to `u32::MAX` and the write cursor placed 100
// bytes below it, appending a 200-byte value must rotate to file 2 and
// place the value near the start of the new file.
#[test]
fn rotate_before_append_at_u32_max_boundary() {
let tmp = tempfile::tempdir().unwrap();
let gsn = Arc::new(AtomicU64::new(1));
let shard = Shard::open(0, tmp.path(), u32::MAX as u64, 8192, false, gsn).unwrap();
{
// Fake an almost-full active file without writing any data.
let mut inner = sync::lock(&shard.inner);
inner.active.write_offset = u32::MAX as u64 - 100;
inner.write_buf.reset(u32::MAX as u64 - 100);
}
let key = b"k";
let value = vec![0u8; 200];
let (loc, _gsn) = {
let mut inner = sync::lock(&shard.inner);
inner.append_entry(0, key, &value, false).unwrap()
};
assert_eq!(loc.file_id, 2, "expected rotation to new file");
assert!(loc.offset < 4096, "expected offset near start of new file");
}
}