use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::sync::{self, Mutex, MutexGuard};
use crate::disk_loc::DiskLoc;
use crate::entry::{self, make_tombstone_gsn, serialize_entry};
use crate::error::DbResult;
use crate::io::aligned_buf::AlignedBuf;
use crate::io::direct;
#[cfg(feature = "encryption")]
use crate::crypto::PageCipher;
#[cfg(feature = "encryption")]
use crate::io::tags::{self, TagFile};
/// In-memory staging area for appends to the active data file.
///
/// Bytes `buf[..len]` belong at file offsets `base_offset..base_offset + len`
/// and have not been handed to the writer yet.
pub(crate) struct WriteBuffer {
    buf: AlignedBuf,
    len: usize,
    base_offset: u64,
}
impl WriteBuffer {
    /// Creates an empty buffer with `capacity` zeroed bytes whose first byte
    /// maps to file offset `base_offset`.
    fn new(capacity: usize, base_offset: u64) -> Self {
        let buf = AlignedBuf::zeroed(capacity);
        Self {
            buf,
            len: 0,
            base_offset,
        }
    }

    /// Copies `data` after the already-buffered bytes and returns the file
    /// offset at which `data` starts.
    ///
    /// Panics (slice index out of range) if `data` does not fit; callers are
    /// expected to check `is_full` first.
    fn append(&mut self, data: &[u8]) -> u64 {
        let start = self.len;
        let end = start + data.len();
        self.buf[start..end].copy_from_slice(data);
        self.len = end;
        self.base_offset + start as u64
    }

    /// Returns the `len` bytes at `file_offset` if the whole range is still
    /// buffered, otherwise `None`.
    pub(crate) fn read(&self, file_offset: u64, len: usize) -> Option<&[u8]> {
        let start = file_offset.checked_sub(self.base_offset)? as usize;
        let end = start + len;
        (end <= self.len).then(|| &self.buf[start..end])
    }

    /// True when fewer than `needed` bytes of capacity remain.
    fn is_full(&self, needed: usize) -> bool {
        let free = self.buf.len() - self.len;
        needed > free
    }

    /// The buffered (not yet written) bytes.
    fn data(&self) -> &[u8] {
        &self.buf[0..self.len]
    }

    /// Empties the buffer and re-anchors it at file offset `new_base`.
    fn reset(&mut self, new_base: u64) {
        self.base_offset = new_base;
        self.len = 0;
    }

    /// Drops the first `flushed_bytes` bytes (already written to disk), moving
    /// the remainder to the front and advancing `base_offset` to match.
    #[cfg(feature = "encryption")]
    fn compact(&mut self, flushed_bytes: usize) {
        let kept = self.len - flushed_bytes;
        if kept != 0 {
            self.buf.copy_within(flushed_bytes..self.len, 0);
        }
        self.len = kept;
        self.base_offset += flushed_bytes as u64;
    }
}
/// One storage shard: a directory of numbered `<id>.data` files, one of which
/// is active (appended to) while the rest are immutable.
///
/// All mutable state lives in `inner`, behind a mutex (see `Shard::lock`).
pub struct Shard {
/// Shard number (also used as the `shard_id` field in drop-time log messages).
pub id: u8,
/// Directory scanned for `<id>.data` files when the shard is opened.
dir: PathBuf,
/// Sequence-number counter supplied by the caller at open time; the same
/// `Arc` is also stored in `ShardInner::gsn`.
gsn: Arc<AtomicU64>,
/// Mutable shard state, guarded by a mutex.
inner: Mutex<ShardInner>,
}
/// Mutable state of a [`Shard`], accessed through the shard's mutex.
///
/// NOTE: fields are dropped in declaration order (e.g. `active`'s file
/// handles close before `uring_writer` is dropped); keep that in mind before
/// reordering.
pub struct ShardInner {
/// The data file currently receiving appends.
pub(crate) active: ActiveFile,
/// Appended bytes of the active file that have not been written out yet.
pub(crate) write_buf: WriteBuffer,
/// Sealed data files; `Arc` so readers can keep using a handle after
/// releasing the shard lock.
pub(crate) immutable: Vec<std::sync::Arc<ImmutableFile>>,
/// Per-file-id byte counter accumulated by `add_dead_bytes` (presumably bytes
/// of superseded entries — confirm with callers).
pub(crate) dead_bytes: std::collections::HashMap<u32, u64>,
/// Key length recorded by `Shard::set_key_len`; used by `Drop` to write the
/// active file's hint.
pub(crate) key_len: Option<usize>,
/// Whether hint files are written (on rotation and on drop).
pub(crate) hints: bool,
/// Id given to the next data file created by rotation.
pub(crate) next_file_id: u32,
/// Rotation threshold, compared against `active.write_offset` after each append.
max_file_size: u64,
/// Shared sequence counter; `append_entry` takes one value per entry.
gsn: Arc<AtomicU64>,
/// Linux write path for the active file; re-pointed at the new fd on rotation.
#[cfg(target_os = "linux")]
uring_writer: crate::io::uring::UringWriter,
/// When `Some`, buffered data is encrypted page-by-page on flush.
#[cfg(feature = "encryption")]
pub(crate) cipher: Option<Arc<PageCipher>>,
/// Optional sink that `append_entry` pushes each serialized entry into.
#[cfg(feature = "replication")]
pub(crate) replication_tx: Option<rtrb::Producer<crate::replication::ReplicationEntry>>,
}
/// The data file currently being appended to.
pub(crate) struct ActiveFile {
/// Write handle (from `direct::open_write`); on Linux its raw fd is handed to
/// the uring writer.
pub(crate) file: std::fs::File,
/// Separate read handle, shared via `Arc` so readers can use it after the
/// shard lock is released.
pub(crate) read_file: Arc<std::fs::File>,
/// Numeric id; the file is named `{file_id:06}.data`.
pub(crate) file_id: u32,
/// Logical end of appended data, including bytes still in the write buffer
/// (set to `base_offset + len` after each append).
pub(crate) write_offset: u64,
/// Path of the data file.
pub(crate) path: PathBuf,
/// Per-page tag file, present when the shard has a cipher.
#[cfg(feature = "encryption")]
pub(crate) tag_file: Option<TagFile>,
}
/// A sealed, read-only data file.
pub(crate) struct ImmutableFile {
/// Read-only handle (from `direct::open_read`).
pub(crate) file: std::fs::File,
/// Numeric id; the file is named `{file_id:06}.data`.
pub(crate) file_id: u32,
/// Path of the data file.
#[cfg(feature = "encryption")]
pub(crate) path: PathBuf,
/// File length taken from metadata when the handle was opened.
pub(crate) total_bytes: u64,
/// Read-only tag file; `Some` only when a cipher is configured and a tag file
/// exists next to the data file.
#[cfg(feature = "encryption")]
pub(crate) tag_file: Option<TagFile>,
}
impl Shard {
/// Opens (creating if necessary) shard `id` under `dir` without a cipher.
///
/// Delegates to `open_inner`; when the `encryption` feature is compiled in,
/// the cipher slot is `None`, so flushed data goes through the plain path.
pub fn open(
    id: u8,
    dir: &Path,
    max_file_size: u64,
    write_buffer_size: usize,
    hints: bool,
    gsn: Arc<AtomicU64>,
) -> DbResult<Self> {
    #[cfg(feature = "encryption")]
    let no_cipher: Option<Arc<PageCipher>> = None;
    Self::open_inner(
        id,
        dir,
        max_file_size,
        write_buffer_size,
        hints,
        #[cfg(feature = "encryption")]
        no_cipher,
        gsn,
    )
}
/// Opens shard `id` under `dir`, encrypting data pages with `cipher` when it
/// is `Some`; passing `None` is equivalent to [`Shard::open`].
#[cfg(feature = "encryption")]
pub fn open_encrypted(
    id: u8,
    dir: &Path,
    max_file_size: u64,
    write_buffer_size: usize,
    hints: bool,
    cipher: Option<Arc<PageCipher>>,
    gsn: Arc<AtomicU64>,
) -> DbResult<Self> {
    let shard = Self::open_inner(id, dir, max_file_size, write_buffer_size, hints, cipher, gsn)?;
    Ok(shard)
}
/// Opens the shard directory, creating it and a first data file if needed.
///
/// Every `<u32>.data` name in `dir` is collected. The highest id becomes the
/// active (appendable) file and resumes at its current on-disk length. All
/// lower ids are opened read-only as immutable files. With no data files,
/// `000001.data` is created as the active file.
///
/// When a cipher is supplied:
/// - the active file always gets a writable tag file;
/// - an immutable file gets a read-only tag file only if one already exists.
///
/// # Errors
/// Any I/O error from creating or scanning the directory or from opening files.
fn open_inner(
    id: u8,
    dir: &Path,
    max_file_size: u64,
    write_buffer_size: usize,
    hints: bool,
    #[cfg(feature = "encryption")] cipher: Option<Arc<PageCipher>>,
    gsn: Arc<AtomicU64>,
) -> DbResult<Self> {
    fs::create_dir_all(dir)?;
    let mut file_ids: Vec<u32> = Vec::new();
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        let name = entry.file_name();
        let name = name.to_string_lossy();
        // `strip_suffix` removes exactly one ".data"; `trim_end_matches` would
        // strip repeats and accept e.g. "000001.data.data" as file 1.
        if let Some(file_id) = name
            .strip_suffix(".data")
            .and_then(|stem| stem.parse::<u32>().ok())
        {
            file_ids.push(file_id);
        }
    }
    file_ids.sort_unstable();
    // Different spellings of one id (e.g. "1.data" and "000001.data") must not
    // open the same `{id:06}.data` file twice.
    file_ids.dedup();
    #[cfg(feature = "encryption")]
    let has_cipher = cipher.is_some();

    // The highest id is the active file; an empty directory starts at id 1.
    let (active_id, is_new) = match file_ids.pop() {
        Some(last) => (last, false),
        None => (1u32, true),
    };

    let mut immutable = Vec::with_capacity(file_ids.len());
    for &fid in &file_ids {
        let path = dir.join(format!("{fid:06}.data"));
        let file = direct::open_read(&path)?;
        let total_bytes = file.metadata()?.len();
        #[cfg(feature = "encryption")]
        let tag_file = if has_cipher {
            let tp = tags::tags_path_for_data(&path);
            if tp.exists() {
                Some(TagFile::open_read(&tp)?)
            } else {
                None
            }
        } else {
            None
        };
        immutable.push(Arc::new(ImmutableFile {
            file,
            file_id: fid,
            #[cfg(feature = "encryption")]
            path,
            total_bytes,
            #[cfg(feature = "encryption")]
            tag_file,
        }));
    }

    let active_path = dir.join(format!("{active_id:06}.data"));
    let active_file = direct::open_write(&active_path)?;
    // A freshly created file starts empty; an existing one resumes at its end.
    let write_offset = if is_new {
        0
    } else {
        active_file.metadata()?.len()
    };
    let active_read = Arc::new(direct::open_read(&active_path)?);
    #[cfg(feature = "encryption")]
    let tag_file = if has_cipher {
        Some(TagFile::open_write(&tags::tags_path_for_data(
            &active_path,
        ))?)
    } else {
        None
    };
    let active = ActiveFile {
        file: active_file,
        read_file: active_read,
        file_id: active_id,
        write_offset,
        path: active_path,
        #[cfg(feature = "encryption")]
        tag_file,
    };
    #[cfg(target_os = "linux")]
    let mut uring_writer = crate::io::uring::UringWriter::new()?;
    #[cfg(target_os = "linux")]
    {
        use std::os::unix::io::AsRawFd;
        uring_writer.set_file(active.file.as_raw_fd());
    }
    Ok(Self {
        id,
        dir: dir.to_path_buf(),
        gsn: Arc::clone(&gsn),
        inner: Mutex::new(ShardInner {
            active,
            write_buf: WriteBuffer::new(write_buffer_size, write_offset),
            immutable,
            dead_bytes: std::collections::HashMap::new(),
            key_len: None,
            hints,
            next_file_id: active_id + 1,
            max_file_size,
            gsn,
            #[cfg(target_os = "linux")]
            uring_writer,
            #[cfg(feature = "encryption")]
            cipher,
            #[cfg(feature = "replication")]
            replication_tx: None,
        }),
    })
}
pub fn gsn(&self) -> &AtomicU64 {
&self.gsn
}
pub fn lock(&self) -> MutexGuard<'_, ShardInner> {
sync::lock(&self.inner)
}
pub fn read_value(&self, loc: &DiskLoc) -> DbResult<Vec<u8>> {
let (active_file, immutable_arc) = {
let inner = sync::lock(&self.inner);
if inner.active.file_id == loc.file_id as u32 {
if let Some(bytes) = inner.write_buf.read(loc.offset as u64, loc.len as usize) {
return Ok(bytes.to_vec());
}
(Some(inner.active.read_file.clone()), None)
} else {
let arc = inner
.immutable
.iter()
.find(|f| f.file_id == loc.file_id as u32)
.ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::NotFound, "data file not found")
})?
.clone();
(None, Some(arc))
}
};
if let Some(file) = active_file {
direct::pread_value(&file, loc.offset as u64, loc.len as usize)
} else if let Some(arc) = immutable_arc {
direct::pread_value(&arc.file, loc.offset as u64, loc.len as usize)
} else {
unreachable!()
}
}
/// Reads the 4096-byte block at `block_offset` of data file `file_id`.
///
/// Returns the block together with a flag that is `true` only for immutable
/// files whose `total_bytes` covers the whole block. The file handle is pinned
/// under the shard lock and the read happens without it.
///
/// With encryption, the block is decrypted in place when the file has a tag
/// file. Files without one are returned as read (presumably plaintext files
/// written before encryption was enabled — confirm).
///
/// For an immutable file, the tag file comes from the `Arc` pinned during the
/// first lookup. Previously it was looked up again under a second lock, so a
/// file removed from `immutable` in between was silently returned undecrypted.
///
/// NOTE(review): for the active file, bytes still in the write buffer are not
/// visible here.
///
/// # Errors
/// `NotFound` ("data file not found") when `file_id` is unknown; otherwise I/O,
/// tag-read or decryption errors.
#[cfg(feature = "var-collections")]
pub fn read_block(
    &self,
    file_id: u32,
    block_offset: u64,
) -> DbResult<(crate::io::aligned_buf::AlignedBuf, bool)> {
    #[cfg(feature = "encryption")]
    let cipher_ref;
    let (active_read, immutable_arc) = {
        let inner = sync::lock(&self.inner);
        #[cfg(feature = "encryption")]
        {
            cipher_ref = inner.cipher.clone();
        }
        if inner.active.file_id == file_id {
            (Some(Arc::clone(&inner.active.read_file)), None)
        } else {
            let arc = inner
                .immutable
                .iter()
                .find(|f| f.file_id == file_id)
                .cloned()
                .ok_or_else(|| {
                    std::io::Error::new(std::io::ErrorKind::NotFound, "data file not found")
                })?;
            (None, Some(arc))
        }
    };
    // `mut` is only needed when the `encryption` feature decrypts `buf` in place.
    #[allow(unused_mut)]
    let (mut buf, _) = match (&active_read, &immutable_arc) {
        (Some(file), _) => direct::pread_block(file, block_offset)?,
        (None, Some(arc)) => direct::pread_block(&arc.file, block_offset)?,
        (None, None) => unreachable!("exactly one file handle is pinned above"),
    };
    let is_full_block = immutable_arc
        .as_ref()
        .is_some_and(|f| block_offset + 4096 <= f.total_bytes);
    #[cfg(feature = "encryption")]
    if let Some(cipher) = &cipher_ref {
        let page_number = block_offset / 4096;
        let relocked;
        let tag_file = match &immutable_arc {
            // The pinned Arc already owns the tag file.
            Some(arc) => arc.tag_file.as_ref(),
            // Active at pin time; it may have rotated into `immutable` since.
            None => {
                relocked = sync::lock(&self.inner);
                if relocked.active.file_id == file_id {
                    relocked.active.tag_file.as_ref()
                } else {
                    relocked
                        .immutable
                        .iter()
                        .find(|f| f.file_id == file_id)
                        .and_then(|f| f.tag_file.as_ref())
                }
            }
        };
        if let Some(tf) = tag_file {
            let tag = tf.read_tag(page_number)?;
            cipher.decrypt_page(file_id, page_number, &mut buf, &tag)?;
        }
    }
    Ok((buf, is_full_block))
}
/// The directory this shard was opened from.
pub fn dir(&self) -> &Path {
    self.dir.as_path()
}
/// Ids of all data files: immutable files in list order, then the active file.
pub fn file_ids(&self) -> Vec<u32> {
    let inner = sync::lock(&self.inner);
    let mut out = Vec::with_capacity(inner.immutable.len() + 1);
    out.extend(inner.immutable.iter().map(|sealed| sealed.file_id));
    out.push(inner.active.file_id);
    out
}
/// `(file_id, size)` for every immutable file, followed by the active file.
///
/// Immutable sizes are `total_bytes` (length when the handle was opened). The
/// active size is `write_offset`, which also counts bytes still sitting in the
/// write buffer.
pub fn file_sizes(&self) -> Vec<(u32, u64)> {
    let inner = sync::lock(&self.inner);
    let mut out = Vec::with_capacity(inner.immutable.len() + 1);
    out.extend(
        inner
            .immutable
            .iter()
            .map(|sealed| (sealed.file_id, sealed.total_bytes)),
    );
    out.push((inner.active.file_id, inner.active.write_offset));
    out
}
/// Makes the active file durable and writes its hint file.
///
/// Returns immediately when nothing has been appended (`write_offset == 0`).
/// Otherwise, with the shard lock held for the whole call:
/// - force-flushes the write buffer (with a cipher this zero-pads a trailing
///   partial page);
/// - fsyncs the data file and, when present, the tag file;
/// - scans the file through a freshly opened read-only handle to build hint
///   data for `key_len`-byte keys;
/// - writes that data to the hint path derived from the data file's path.
///
/// The scan length is `active.write_offset`. The forced flush does not advance
/// it, so any padding it wrote is not included.
pub fn write_active_hint(&self, key_len: usize) -> DbResult<()> {
    let mut guard = sync::lock(&self.inner);
    let nothing_written = guard.active.write_offset == 0;
    if nothing_written {
        return Ok(());
    }

    guard.flush_write_buf_final()?;
    #[cfg(target_os = "linux")]
    guard.uring_writer.fsync()?;
    #[cfg(not(target_os = "linux"))]
    direct::fsync(&guard.active.file)?;
    #[cfg(feature = "encryption")]
    if let Some(tf) = guard.active.tag_file.as_ref() {
        tf.sync()?;
    }

    let inner = &*guard;
    // A dedicated handle, separate from the shared `active.read_file`.
    let scan_handle = direct::open_read(&inner.active.path)?;
    let scan_len = inner.active.write_offset;
    #[cfg(feature = "encryption")]
    let hint_data = match (inner.cipher.as_ref(), inner.active.tag_file.as_ref()) {
        (Some(cipher), Some(tf)) => crate::hint::generate_hint_data_dyn_encrypted(
            &scan_handle,
            scan_len,
            key_len,
            cipher,
            tf,
            inner.active.file_id,
        )?,
        _ => crate::hint::generate_hint_data_dyn(&scan_handle, scan_len, key_len)?,
    };
    #[cfg(not(feature = "encryption"))]
    let hint_data = crate::hint::generate_hint_data_dyn(&scan_handle, scan_len, key_len)?;
    let hint_path = crate::hint::hint_path_for_data(&inner.active.path);
    crate::hint::write_hint_file(&hint_path, &hint_data)?;
    Ok(())
}
/// Writes buffered appends to the active file without fsyncing.
///
/// With a cipher configured, only whole 4096-byte pages are written; a
/// trailing partial page stays buffered.
pub fn flush_buf(&self) -> DbResult<()> {
    sync::lock(&self.inner).flush_write_buf()
}
/// Writes out buffered data and fsyncs the active data file (and its tag file
/// when one exists).
///
/// Uses the regular, non-forced buffer flush. With a cipher configured, only
/// whole 4096-byte pages are written, so a trailing partial page stays in
/// memory and is *not* made durable by this call. `write_active_hint` and file
/// rotation use the forced variant.
pub fn flush(&self) -> DbResult<()> {
    let mut guard = sync::lock(&self.inner);
    guard.flush_write_buf()?;
    #[cfg(target_os = "linux")]
    guard.uring_writer.fsync()?;
    #[cfg(not(target_os = "linux"))]
    direct::fsync(&guard.active.file)?;
    #[cfg(feature = "encryption")]
    if let Some(tf) = guard.active.tag_file.as_ref() {
        tf.sync()?;
    }
    Ok(())
}
}
impl Shard {
    /// Id of the data file currently receiving appends.
    pub fn active_file_id(&self) -> u32 {
        let inner = sync::lock(&self.inner);
        inner.active.file_id
    }

    /// Installs `producer` as the replication sink that `append_entry` pushes
    /// each serialized entry into; a previously installed producer is dropped.
    #[cfg(feature = "replication")]
    pub fn set_replication_producer(
        &self,
        producer: rtrb::Producer<crate::replication::ReplicationEntry>,
    ) {
        let mut inner = sync::lock(&self.inner);
        inner.replication_tx = Some(producer);
    }
}
impl Shard {
    /// Records the key length that `Drop` uses to write the active file's hint.
    pub(crate) fn set_key_len(&self, key_len: usize) {
        let mut inner = sync::lock(&self.inner);
        inner.key_len = Some(key_len);
    }
}
impl Drop for Shard {
fn drop(&mut self) {
let inner = sync::lock(&self.inner);
let key_len = inner.key_len;
let hints = inner.hints;
drop(inner);
if hints && let Some(kl) = key_len {
if let Err(e) = self.write_active_hint(kl) {
tracing::error!(shard_id = self.id, "failed to write hint on drop: {e}");
if let Err(e2) = self.flush() {
tracing::error!(shard_id = self.id, "fallback flush also failed: {e2}");
}
}
return;
}
if let Err(e) = self.flush() {
tracing::error!(shard_id = self.id, "failed to flush shard on drop: {e}");
}
}
}
impl ShardInner {
/// Writes buffered data if there is any.
///
/// With a cipher this is the non-forced encrypted flush: whole pages only, and
/// a trailing partial page stays buffered. Without one, the whole buffer is
/// written.
pub(crate) fn flush_write_buf(&mut self) -> DbResult<()> {
    if self.write_buf.len != 0 {
        #[cfg(feature = "encryption")]
        if self.cipher.is_some() {
            return self.flush_write_buf_encrypted(false);
        }
        return self.flush_write_buf_plain();
    }
    Ok(())
}
/// Writes *all* buffered data if there is any.
///
/// With a cipher, a trailing partial page is zero-padded to a full page and
/// written. Without one, this is identical to `flush_write_buf`.
pub(crate) fn flush_write_buf_final(&mut self) -> DbResult<()> {
    if self.write_buf.len != 0 {
        #[cfg(feature = "encryption")]
        if self.cipher.is_some() {
            return self.flush_write_buf_encrypted(true);
        }
        return self.flush_write_buf_plain();
    }
    Ok(())
}
/// Writes the whole buffered range unencrypted at its file offset, then
/// re-anchors the empty buffer just past it and bumps the flush metrics.
fn flush_write_buf_plain(&mut self) -> DbResult<()> {
    let start = self.write_buf.base_offset;
    let pending = self.write_buf.data();
    let written = pending.len() as u64;
    #[cfg(target_os = "linux")]
    self.uring_writer.write_at(pending, start)?;
    #[cfg(not(target_os = "linux"))]
    direct::pwrite_at(&self.active.file, pending, start)?;
    self.write_buf.reset(start + written);
    metrics::counter!("armdb.flush.count").increment(1);
    metrics::counter!("armdb.flush.bytes").increment(written);
    Ok(())
}
/// Encrypts buffered data in 4096-byte pages and writes it.
///
/// - `force == false`: only complete pages are written. A trailing partial
///   page stays in the buffer (compacted to the front) until later appends
///   complete it. With no complete page, nothing happens.
/// - `force == true`: a trailing partial page is zero-padded to a full page
///   and written too. The buffer is re-anchored just past the padding.
///
/// Each page is encrypted with `(file_id, page_number)`. The resulting tags are
/// written to the active tag file (if any) starting at the first page number.
///
/// Ciphertext is staged in a freshly zeroed `AlignedBuf`:
/// - padding never requires the write buffer to hold more than its capacity
///   (the old in-buffer padding panicked when the capacity was not a multiple
///   of 4096);
/// - the writer receives the same aligned buffer type as on the plain path.
///   Presumably the direct-I/O writer needs this — confirm.
#[cfg(feature = "encryption")]
fn flush_write_buf_encrypted(&mut self, force: bool) -> DbResult<()> {
    const PAGE: usize = 4096;
    let cipher = self
        .cipher
        .as_ref()
        .expect("caller checked cipher.is_some()");
    let data_len = self.write_buf.len;
    let complete_bytes = (data_len / PAGE) * PAGE;
    let flush_bytes = if force && data_len % PAGE != 0 {
        complete_bytes + PAGE
    } else if complete_bytes == 0 {
        // Not forced and no complete page yet: keep buffering.
        return Ok(());
    } else {
        complete_bytes
    };
    let num_pages = flush_bytes / PAGE;
    let base_offset = self.write_buf.base_offset;
    let start_page = base_offset / PAGE as u64;
    let file_id = self.active.file_id;

    // Bytes past `data_len` (forced padding) stay zero in the staging buffer.
    let copy_len = data_len.min(flush_bytes);
    let mut encrypted = AlignedBuf::zeroed(flush_bytes);
    encrypted[0..copy_len].copy_from_slice(&self.write_buf.buf[0..copy_len]);

    let mut tag_list = Vec::with_capacity(num_pages);
    for page_idx in 0..num_pages {
        let page_start = page_idx * PAGE;
        let page = &mut encrypted[page_start..page_start + PAGE];
        let tag = cipher.encrypt_page(file_id, start_page + page_idx as u64, page)?;
        tag_list.push(tag);
    }

    // Write exactly `flush_bytes`, regardless of how AlignedBuf sizes its allocation.
    let ciphertext = &encrypted[0..flush_bytes];
    #[cfg(target_os = "linux")]
    self.uring_writer.write_at(ciphertext, base_offset)?;
    #[cfg(not(target_os = "linux"))]
    direct::pwrite_at(&self.active.file, ciphertext, base_offset)?;
    if let Some(tag_file) = self.active.tag_file.as_ref() {
        tag_file.write_tags(start_page, &tag_list)?;
    }
    if force {
        self.write_buf.reset(base_offset + flush_bytes as u64);
    } else {
        self.write_buf.compact(complete_bytes);
    }
    metrics::counter!("armdb.flush.count").increment(1);
    metrics::counter!("armdb.flush.bytes").increment(flush_bytes as u64);
    Ok(())
}
/// Appends one key/value entry (or tombstone) to the active file.
///
/// In order:
/// 1. takes the next GSN from the shared counter and serializes the entry;
/// 2. makes room in the write buffer — a regular flush, then a forced flush if
///    an encrypted partial page still blocks;
/// 3. buffers the entry and, if a replication producer is installed, pushes
///    the serialized bytes to it (a failed push is ignored);
/// 4. rotates the file once `write_offset` reaches `max_file_size`.
///
/// Returns the value's location and the GSN as stored (tombstone-tagged via
/// `make_tombstone_gsn` for deletes).
///
/// # Errors
/// - the active file id no longer fits the 16-bit `DiskLoc` file field
///   (previously it silently wrapped, pointing reads at the wrong file);
/// - the serialized entry is larger than the write buffer (previously a
///   slice-index panic; the GSN is still consumed);
/// - any flush or rotation I/O error.
pub fn append_entry(
    &mut self,
    shard_id: u8,
    key: &[u8],
    value: &[u8],
    tombstone: bool,
) -> DbResult<(DiskLoc, u64)> {
    let loc_file_id = u16::try_from(self.active.file_id).map_err(|_| {
        std::io::Error::other("active data file id exceeds the DiskLoc file id range")
    })?;
    let gsn = self.gsn.fetch_add(1, Ordering::Relaxed);
    let buf = serialize_entry(gsn, key, value, tombstone);
    if self.write_buf.is_full(buf.len()) {
        self.flush_write_buf()?;
        // A regular encrypted flush keeps the trailing partial page buffered
        // (or writes nothing without a complete page); pad it out if needed.
        if self.write_buf.is_full(buf.len()) {
            self.flush_write_buf_final()?;
        }
        if self.write_buf.is_full(buf.len()) {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "entry larger than the write buffer",
            )
            .into());
        }
    }
    let entry_offset = self.write_buf.append(&buf);
    #[cfg(feature = "replication")]
    if let Some(tx) = &mut self.replication_tx {
        // Best effort: if the push fails (e.g. the ring is full) the entry is
        // not forwarded.
        let _ = tx.push(crate::replication::ReplicationEntry {
            data: buf,
            key_len: key.len() as u16,
        });
    }
    let header_and_key = size_of::<entry::EntryHeader>() + key.len();
    let value_offset = entry_offset + header_and_key as u64;
    let loc = DiskLoc::new(
        shard_id,
        loc_file_id,
        value_offset as u32,
        value.len() as u32,
    );
    self.active.write_offset = self.write_buf.base_offset + self.write_buf.len as u64;
    if self.active.write_offset >= self.max_file_size {
        self.rotate(shard_id, key.len())?;
    }
    let actual_gsn = if tombstone {
        make_tombstone_gsn(gsn)
    } else {
        gsn
    };
    Ok((loc, actual_gsn))
}
/// Seals the active data file and makes a fresh one active.
///
/// Steps:
/// 1. force-flush the write buffer (zero-padding a trailing partial page when
///    encrypted) and fsync the data file and, if present, its tag file;
/// 2. open read-only handles (data + existing tag file) for the file being sealed;
/// 3. create the next `<id>.data` file, plus a tag file when a cipher is set;
/// 4. swap it in as active, re-point the uring writer (Linux), and close the
///    old write handles;
/// 5. register the sealed file in `immutable`;
/// 6. if hints are enabled, scan the sealed file and write its hint file.
///
/// Every fallible step that does not depend on the swap runs before it, so a
/// failure in steps 1–3 leaves the shard writing to the old file. The sealed
/// file is registered before hint generation, so a hint failure (still
/// returned) no longer leaves its data unreachable through
/// `read_value`/`read_block`.
///
/// `key_len` is forwarded to hint generation.
fn rotate(&mut self, _shard_id: u8, key_len: usize) -> DbResult<()> {
    metrics::counter!("armdb.rotation").increment(1);
    tracing::debug!(shard_id = _shard_id, "shard file rotation");

    // 1. Make the outgoing file and its tags durable.
    self.flush_write_buf_final()?;
    #[cfg(target_os = "linux")]
    self.uring_writer.fsync()?;
    #[cfg(not(target_os = "linux"))]
    direct::fsync(&self.active.file)?;
    #[cfg(feature = "encryption")]
    if let Some(ref tf) = self.active.tag_file {
        tf.sync()?;
    }

    // 2. Read-only handles for the file being sealed.
    let imm_file = direct::open_read(&self.active.path)?;
    let file_len = imm_file.metadata()?.len();
    #[cfg(feature = "encryption")]
    let imm_tag_file = if self.cipher.is_some() {
        let tp = tags::tags_path_for_data(&self.active.path);
        if tp.exists() {
            Some(TagFile::open_read(&tp)?)
        } else {
            None
        }
    } else {
        None
    };

    // 3. Create the next data file.
    let new_file_id = self.next_file_id;
    self.next_file_id += 1;
    let dir = self
        .active
        .path
        .parent()
        .expect("active file has parent dir");
    let new_path = dir.join(format!("{new_file_id:06}.data"));
    let new_file = direct::open_write(&new_path)?;
    let new_read = Arc::new(direct::open_read(&new_path)?);
    #[cfg(feature = "encryption")]
    let new_tag_file = if self.cipher.is_some() {
        Some(TagFile::open_write(&tags::tags_path_for_data(&new_path))?)
    } else {
        None
    };

    // 4. Swap the new file in as active.
    let old_path = std::mem::replace(&mut self.active.path, new_path);
    let old_file = std::mem::replace(&mut self.active.file, new_file);
    let old_file_id = std::mem::replace(&mut self.active.file_id, new_file_id);
    self.active.write_offset = 0;
    self.active.read_file = new_read;
    self.write_buf.reset(0);
    #[cfg(feature = "encryption")]
    let old_tag_file = std::mem::replace(&mut self.active.tag_file, new_tag_file);
    #[cfg(target_os = "linux")]
    {
        use std::os::unix::io::AsRawFd;
        self.uring_writer.set_file(self.active.file.as_raw_fd());
    }
    // The uring writer now targets the new fd, so the old write handles can close.
    drop(old_file);
    #[cfg(feature = "encryption")]
    drop(old_tag_file);

    // 5. Register the sealed file before anything else can fail.
    let hint_path = self
        .hints
        .then(|| crate::hint::hint_path_for_data(&old_path));
    let sealed = Arc::new(ImmutableFile {
        file: imm_file,
        file_id: old_file_id,
        #[cfg(feature = "encryption")]
        path: old_path,
        total_bytes: file_len,
        #[cfg(feature = "encryption")]
        tag_file: imm_tag_file,
    });
    self.immutable.push(Arc::clone(&sealed));

    // 6. Optional hint file for the sealed file.
    if let Some(hint_path) = hint_path {
        #[cfg(feature = "encryption")]
        let hint_data = if let (Some(cipher), Some(tag_file)) = (&self.cipher, &sealed.tag_file) {
            crate::hint::generate_hint_data_dyn_encrypted(
                &sealed.file,
                file_len,
                key_len,
                cipher,
                tag_file,
                old_file_id,
            )?
        } else {
            crate::hint::generate_hint_data_dyn(&sealed.file, file_len, key_len)?
        };
        #[cfg(not(feature = "encryption"))]
        let hint_data = crate::hint::generate_hint_data_dyn(&sealed.file, file_len, key_len)?;
        crate::hint::write_hint_file(&hint_path, &hint_data)?;
    }
    Ok(())
}
/// Adds `size` to the dead-byte counter of `file_id`, starting it at `size`
/// if the file has no entry yet.
pub fn add_dead_bytes(&mut self, file_id: u32, size: u64) {
    self.dead_bytes
        .entry(file_id)
        .and_modify(|total| *total += size)
        .or_insert(size);
}
/// Appends an already-serialized entry (e.g. received via replication) to the
/// active file and returns the file offset where it starts.
///
/// Buffer space is made exactly as in `append_entry`: a regular flush, then a
/// forced flush if an encrypted partial page still blocks. An entry larger than
/// the write buffer yields an error instead of a slice-index panic.
///
/// When the file reaches `max_file_size` it is rotated. The hint for the
/// sealed file uses the key length recorded by `set_key_len`, the same one
/// `Drop` uses, falling back to the previous `0` when none is recorded.
#[cfg(feature = "replication")]
pub fn append_raw_entry(&mut self, shard_id: u8, data: &[u8]) -> DbResult<u64> {
    if self.write_buf.is_full(data.len()) {
        self.flush_write_buf()?;
        // A regular encrypted flush may leave a partial page buffered.
        if self.write_buf.is_full(data.len()) {
            self.flush_write_buf_final()?;
        }
        if self.write_buf.is_full(data.len()) {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "replicated entry larger than the write buffer",
            )
            .into());
        }
    }
    let entry_offset = self.write_buf.append(data);
    self.active.write_offset = self.write_buf.base_offset + self.write_buf.len as u64;
    if self.active.write_offset >= self.max_file_size {
        let key_len = self.key_len.unwrap_or(0);
        self.rotate(shard_id, key_len)?;
    }
    Ok(entry_offset)
}
}