//! A minimal Bitcask-style embedded key/value store: append-only log files
//! on disk plus an in-memory index (the keydir) that maps each key to the
//! on-disk position of its latest value.
use std::{
collections::{BTreeMap, HashMap},
fs::{self, File, OpenOptions},
io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write},
path::{Path, PathBuf},
};
use fs2::FileExt;
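/// Errors produced by [`Bitask`] operations.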
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Invalid log file name '{filename}'")]
InvalidLogFileName { filename: String },
#[error("Failed to parse timestamp '{value}'")]
TimestampParse {
value: String,
#[source]
source: std::num::ParseIntError,
},
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("Only one writer allowed at a time")]
WriterLock,
#[error("Key not found")]
KeyNotFound,
#[error("File {0} not found")]
FileNotFound(String),
#[error("Value size must be greater than 0")]
InvalidEmptyValue,
#[error("Key size must be greater than 0")]
InvalidEmptyKey,
#[error("Timestamp error: {0}")]
TimestampError(#[from] std::time::SystemTimeError),
#[error("Timestamp overflow, converting to u64: {0}")]
TimestampOverflow(#[from] std::num::TryFromIntError),
#[error("Active file not found in non empty path")]
ActiveFileNotFound,
}
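/// Name of the lock file enforcing a single writer per database directory.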
const FILE_LOCK_PATH: &str = "db.lock";
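/// Size threshold (4 MiB) past which the active log file is rotated.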
pub const MAX_ACTIVE_FILE_SIZE: u64 = 4 * 1024 * 1024;
/// Whether rotation automatically triggers compaction of immutable files.
/// Hard-coded off for now; call [`Bitask::compact`] manually to reclaim space.
const AUTO_COMPACTION: bool = false;
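/// A Bitcask-style append-only key/value store.
///
/// All writes append to a single active log file; an in-memory index
/// (`keydir`) maps each key to the file, offset, and size of its latest
/// value. A lock file enforces a single writer per database directory.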
#[derive(Debug)]
pub struct Bitask {
path: PathBuf,
_file_lock: File,
writer_id: u64,
writer: BufWriter<File>,
readers: HashMap<u64, BufReader<File>>,
keydir: BTreeMap<Vec<u8>, KeyDirEntry>,
}
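/// In-memory index entry locating the latest value for a key on disk.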
#[derive(Debug)]
struct KeyDirEntry {
/// Timestamp id of the log file that holds the value.
file_id: u64,
/// Length of the value in bytes.
value_size: u32,
/// Byte offset of the value itself (not the record header) in the file.
value_position: u64,
/// Write timestamp in milliseconds, used to keep only the newest entry.
timestamp: u64,
}
impl Bitask {
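/// Opens the database at `path`, creating the directory if needed, and
/// takes an exclusive lock. Fails with [`Error::WriterLock`] if another
/// instance already holds the database.
///
/// A minimal usage sketch; the crate path `bitask` is assumed here:
///
/// ```no_run
/// # use bitask::Bitask;
/// let mut db = Bitask::open("/tmp/bitask-demo")?;
/// db.put(b"hello".to_vec(), b"world".to_vec())?;
/// assert_eq!(db.ask(b"hello")?, b"world".to_vec());
/// db.remove(b"hello".to_vec())?;
/// # Ok::<(), bitask::Error>(())
/// ```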
pub fn open(path: impl AsRef<Path>) -> Result<Self, Error> {
fs::create_dir_all(&path)?;
let lock_path = path.as_ref().join(FILE_LOCK_PATH);
let lock_file = OpenOptions::new()
.create(true)
.read(true)
.write(true)
.truncate(false)
.append(false)
.open(lock_path)?;
lock_file
.try_lock_exclusive()
.map_err(|_| Error::WriterLock)?;
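// The directory always contains at least the lock file created above, so
// the database is empty only when that lock file is the sole entry.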
let is_empty = match fs::read_dir(&path)?.next() {
None => true,
Some(Ok(entry)) if entry.file_name() == FILE_LOCK_PATH => {
fs::read_dir(&path)?.nth(1).is_none()
}
Some(_) => false,
};
if is_empty {
Self::open_new(path, lock_file)
} else {
Self::open_existing(path, lock_file)
}
}
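/// Creates a fresh database with a single active log file named after the
/// current timestamp in milliseconds.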
fn open_new(path: impl AsRef<Path>, lock_file: File) -> Result<Self, Error> {
let timestamp = timestamp_as_u64()?;
let writer_file = OpenOptions::new()
.create(true)
.read(true)
.truncate(false)
.append(true)
.open(file_active_log_path(path.as_ref(), timestamp))?;
let reader_file = OpenOptions::new()
.read(true)
.open(file_active_log_path(path.as_ref(), timestamp))?;
let writer = BufWriter::new(writer_file);
let mut readers = HashMap::new();
let reader = BufReader::new(reader_file);
readers.insert(timestamp, reader);
Ok(Self {
path: path.as_ref().to_path_buf(),
_file_lock: lock_file,
writer_id: timestamp,
writer,
readers,
keydir: BTreeMap::new(),
})
}
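/// Re-opens an existing database: scans the directory for log files,
/// re-opens the active file for appending, and rebuilds the keydir by
/// replaying every log.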
fn open_existing(path: impl AsRef<Path>, lock_file: File) -> Result<Self, Error> {
let mut active_timestamp = None;
let mut active_file = None;
let mut files: BTreeMap<u64, PathBuf> = BTreeMap::new();
for entry in fs::read_dir(&path)? {
let entry = entry?;
let name = entry.file_name().to_string_lossy().to_string();
if name == FILE_LOCK_PATH {
continue;
}
let timestamp_str = name
.split('.')
.next()
.ok_or_else(|| Error::InvalidLogFileName {
filename: name.clone(),
})?;
let timestamp = timestamp_str
.parse()
.map_err(|e| Error::TimestampParse {
value: timestamp_str.to_string(),
source: e,
})?;
if name.ends_with(".active.log") {
active_file = Some(entry.path());
active_timestamp = Some(timestamp);
} else if name.ends_with(".log") {
files.insert(timestamp, entry.path());
}
}
let active_timestamp = active_timestamp.ok_or(Error::ActiveFileNotFound)?;
let writer = {
let active_file = active_file.clone().ok_or(Error::ActiveFileNotFound)?;
let writer_file = OpenOptions::new()
.create(true)
.read(true)
.truncate(false)
.append(true)
.open(active_file)?;
BufWriter::new(writer_file)
};
let mut reader = {
let active_file = active_file.ok_or(Error::ActiveFileNotFound)?;
let reader_file = OpenOptions::new().read(true).open(active_file)?;
BufReader::new(reader_file)
};
// Replay the immutable files oldest-first, then the active file, so that
// newer entries overwrite older ones in the keydir.
let mut keydir = BTreeMap::new();
let mut readers = HashMap::new();
for (file_id, file_path) in &files {
let mut file_reader = BufReader::new(File::open(file_path)?);
Self::rebuild_keydir(&mut file_reader, *file_id, &mut keydir)?;
readers.insert(*file_id, file_reader);
}
Self::rebuild_keydir(&mut reader, active_timestamp, &mut keydir)?;
readers.insert(active_timestamp, reader);
Ok(Self {
path: path.as_ref().to_path_buf(),
_file_lock: lock_file,
writer_id: active_timestamp,
writer,
readers,
keydir,
})
}
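/// Replays one log file into `keydir`, one record at a time. The entry with
/// the newest timestamp wins; a zero-length value is a tombstone and
/// removes the key.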
fn rebuild_keydir(
reader: &mut BufReader<File>,
file_id: u64,
keydir: &mut BTreeMap<Vec<u8>, KeyDirEntry>,
) -> Result<(), Error> {
let mut position = 0u64;
loop {
let mut header_buf = vec![0u8; CommandHeader::SIZE];
match reader.read_exact(&mut header_buf) {
Ok(_) => (),
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
Err(e) => return Err(e.into()),
}
let header = CommandHeader::deserialize(&header_buf)?;
let mut key = vec![0u8; header.key_len as usize];
reader.read_exact(&mut key)?;
// Skip over the value bytes; only their position is indexed.
reader.seek(SeekFrom::Current(header.value_size as i64))?;
if header.value_size == 0 {
// Tombstone: the key was removed.
keydir.remove(&key);
} else {
let is_newest = keydir
.get(&key)
.map_or(true, |existing| header.timestamp >= existing.timestamp);
if is_newest {
let value_position =
position + CommandHeader::SIZE as u64 + header.key_len as u64;
keydir.insert(
key,
KeyDirEntry {
file_id,
value_size: header.value_size,
value_position,
timestamp: header.timestamp,
},
);
}
}
// Advance past the whole record even when the entry was stale, so the
// offsets of later records stay correct.
position +=
CommandHeader::SIZE as u64 + header.key_len as u64 + header.value_size as u64;
}
Ok(())
}
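/// Seals the current active file by renaming it to `<id>.log` and starts a
/// fresh `<timestamp>.active.log` for subsequent writes.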
fn rotate_active_file(&mut self) -> Result<(), Error> {
let timestamp = timestamp_as_u64()?;
let old_path = file_active_log_path(&self.path, self.writer_id);
let new_path = file_log_path(&self.path, self.writer_id);
fs::rename(old_path, new_path)?;
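// Open handles to the renamed file stay valid (on POSIX), so the cached
// reader under the old id keeps working, and keydir entries pointing at
// the old id now resolve to the sealed `<id>.log` path.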
let writer_file = OpenOptions::new()
.create(true)
.read(true)
.append(true)
.open(file_active_log_path(&self.path, timestamp))?;
let reader_file = OpenOptions::new()
.read(true)
.open(file_active_log_path(&self.path, timestamp))?;
self.writer = BufWriter::new(writer_file);
self.readers.insert(timestamp, BufReader::new(reader_file));
self.writer_id = timestamp;
Ok(())
}
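/// Returns the latest value stored for `key`. Readers for immutable log
/// files are opened lazily and cached per file id.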
pub fn ask(&mut self, key: &[u8]) -> Result<Vec<u8>, Error> {
if key.is_empty() {
return Err(Error::InvalidEmptyKey);
}
if let Some(entry) = self.keydir.get(key) {
if let std::collections::hash_map::Entry::Vacant(e) = self.readers.entry(entry.file_id)
{
let file = OpenOptions::new()
.read(true)
.open(file_log_path(&self.path, entry.file_id))?;
e.insert(BufReader::new(file));
}
let reader = self
.readers
.get_mut(&entry.file_id)
.ok_or_else(|| Error::FileNotFound(entry.file_id.to_string()))?;
reader.seek(SeekFrom::Start(entry.value_position))?;
let mut value = vec![0; entry.value_size as usize];
reader.read_exact(&mut value)?;
return Ok(value);
}
Err(Error::KeyNotFound)
}
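/// Appends a set record for `key`/`value` to the active log, rotating the
/// file first when it exceeds [`MAX_ACTIVE_FILE_SIZE`], then points the
/// keydir at the newly written value.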
pub fn put(&mut self, key: Vec<u8>, value: Vec<u8>) -> Result<(), Error> {
if key.is_empty() {
return Err(Error::InvalidEmptyKey);
}
if value.is_empty() {
return Err(Error::InvalidEmptyValue);
}
let file_size = self.writer.get_ref().metadata()?.len();
if file_size > MAX_ACTIVE_FILE_SIZE {
log::debug!("File size {} exceeded limit, rotating", file_size);
self.rotate_active_file()?;
if AUTO_COMPACTION {
log::debug!("Auto-compaction is enabled, checking file count");
let immutable_files = std::fs::read_dir(&self.path)?
.filter_map(Result::ok)
.filter(|entry| {
let name = entry.file_name().to_string_lossy().to_string();
name.ends_with(".log") && !name.ends_with(".active.log")
})
.count();
log::debug!("Found {} immutable files", immutable_files);
if immutable_files >= 2 {
log::debug!(
"Auto-triggering compaction with {} immutable files",
immutable_files
);
self.compact()?;
}
} else {
log::debug!("Auto-compaction is disabled");
}
}
let command = CommandSet::new(key.clone(), value.clone())?;
let mut buffer = Vec::new();
command.serialize(&mut buffer)?;
let position = self.writer.seek(SeekFrom::End(0))?;
self.writer.write_all(&buffer)?;
self.writer.flush()?;
let value_position = position + CommandHeader::SIZE as u64 + key.len() as u64;
self.keydir.insert(
key,
KeyDirEntry {
file_id: self.writer_id,
value_size: value.len() as u32,
value_position,
timestamp: command.timestamp,
},
);
Ok(())
}
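/// Appends a tombstone (a record with an empty value) for `key` and drops
/// the key from the keydir.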
pub fn remove(&mut self, key: Vec<u8>) -> Result<(), Error> {
if key.is_empty() {
return Err(Error::InvalidEmptyKey);
}
let command = CommandRemove::new(key.clone())?;
let mut buffer = Vec::new();
command.serialize(&mut buffer)?;
self.writer.write_all(&buffer)?;
self.writer.flush()?;
self.keydir.remove(&key);
Ok(())
}
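/// Merges the latest record of every key from the immutable log files into
/// one new log file and deletes the old immutable files. Entries living in
/// the active file are left in place.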
pub fn compact(&mut self) -> Result<(), Error> {
let timestamp = timestamp_as_u64()?;
let mut compaction_writer = BufWriter::new(
OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(file_log_path(&self.path, timestamp))?,
);
let mut new_pos = 0;
for (key, entry) in self.keydir.iter_mut() {
if entry.file_id == self.writer_id {
continue;
}
let mut reader = BufReader::new(File::open(file_log_path(&self.path, entry.file_id))?);
let header_pos = entry.value_position - key.len() as u64 - CommandHeader::SIZE as u64;
reader.seek(SeekFrom::Start(header_pos))?;
let entry_size =
CommandHeader::SIZE as u64 + key.len() as u64 + entry.value_size as u64;
io::copy(&mut reader.take(entry_size), &mut compaction_writer)?;
entry.file_id = timestamp;
entry.value_position = new_pos + CommandHeader::SIZE as u64 + key.len() as u64;
new_pos += entry_size;
}
compaction_writer.flush()?;
for file in std::fs::read_dir(&self.path)? {
let file = file?;
let name = file.file_name().to_string_lossy().to_string();
if name.ends_with(".log")
&& !name.ends_with(".active.log")
&& !name.starts_with(×tamp.to_string())
{
std::fs::remove_file(file.path())?;
}
}
// Drop cached readers for the files just deleted; the keydir no longer
// references them.
let writer_id = self.writer_id;
self.readers
.retain(|id, _| *id == writer_id || *id == timestamp);
Ok(())
}
}
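/// Fixed-size header that prefixes every log record.
///
/// On-disk record layout (all integers little-endian):
/// `crc: u32 | timestamp: u64 | key_len: u32 | value_size: u32 | key bytes | value bytes`
/// A `value_size` of 0 marks a tombstone.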
#[derive(Debug)]
struct CommandHeader {
crc: u32,
timestamp: u64,
key_len: u32,
value_size: u32,
}
impl CommandHeader {
const SIZE: usize = std::mem::size_of::<u32>()
+ std::mem::size_of::<u64>()
+ std::mem::size_of::<u32>()
+ std::mem::size_of::<u32>();
fn new(crc: u32, timestamp: u64, key_len: u32, value_len: u32) -> Self {
Self {
crc,
timestamp,
key_len,
value_size: value_len,
}
}
fn serialize(&self, buffer: &mut Vec<u8>) -> Result<(), Error> {
buffer.write_all(&self.crc.to_le_bytes())?;
buffer.write_all(&self.timestamp.to_le_bytes())?;
buffer.write_all(&self.key_len.to_le_bytes())?;
buffer.write_all(&self.value_size.to_le_bytes())?;
Ok(())
}
fn deserialize(buf: &[u8]) -> Result<Self, Error> {
if buf.len() < Self::SIZE {
return Err(Error::Io(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"buffer too small for header",
)));
}
Ok(Self {
crc: u32::from_le_bytes(buf[0..4].try_into().unwrap()),
timestamp: u64::from_le_bytes(buf[4..12].try_into().unwrap()),
key_len: u32::from_le_bytes(buf[12..16].try_into().unwrap()),
value_size: u32::from_le_bytes(buf[16..20].try_into().unwrap()),
})
}
}
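/// A set record: stores `value` under `key`.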
#[derive(Debug)]
struct CommandSet {
crc: u32,
timestamp: u64,
key: Vec<u8>,
value: Vec<u8>,
}
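/// A remove record: a tombstone carrying only the key.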
#[derive(Debug)]
struct CommandRemove {
crc: u32,
timestamp: u64,
key: Vec<u8>,
}
impl CommandSet {
pub fn new(key: Vec<u8>, value: Vec<u8>) -> Result<Self, Error> {
let timestamp = timestamp_as_u64()?;
let mut hasher = crc32fast::Hasher::new();
hasher.update(key.as_slice());
hasher.update(value.as_slice());
let crc = hasher.finalize();
Ok(Self {
crc,
timestamp,
key,
value,
})
}
fn serialize(&self, buffer: &mut Vec<u8>) -> Result<(), Error> {
CommandHeader::new(
self.crc,
self.timestamp,
self.key.len() as u32,
self.value.len() as u32,
)
.serialize(buffer)?;
buffer.write_all(&self.key)?;
buffer.write_all(&self.value)?;
Ok(())
}
}
impl CommandRemove {
pub fn new(key: Vec<u8>) -> Result<Self, Error> {
let timestamp = timestamp_as_u64()?;
let mut hasher = crc32fast::Hasher::new();
hasher.update(key.as_slice());
let crc = hasher.finalize();
Ok(Self {
crc,
timestamp,
key,
})
}
fn serialize(&self, buffer: &mut Vec<u8>) -> Result<(), Error> {
CommandHeader::new(self.crc, self.timestamp, self.key.len() as u32, 0).serialize(buffer)?;
buffer.write_all(&self.key)?;
Ok(())
}
}
fn file_active_log_path(path: impl AsRef<Path>, timestamp: u64) -> PathBuf {
path.as_ref().join(format!("{}.active.log", timestamp))
}
fn file_log_path(path: impl AsRef<Path>, timestamp: u64) -> PathBuf {
path.as_ref().join(format!("{}.log", timestamp))
}
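/// Current time as milliseconds since the Unix epoch.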
fn timestamp_as_u64() -> Result<u64, Error> {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(Error::TimestampError)?
.as_millis()
.try_into()
.map_err(Error::TimestampOverflow)
}
impl Drop for Bitask {
fn drop(&mut self) {
if let Ok(path) = self.path.join("db.lock").canonicalize() {
let _ = std::fs::remove_file(path);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_set_command_serialization() {
let key = b"key".to_vec();
let value = b"value".to_vec();
let command = CommandSet::new(key.clone(), value.clone()).unwrap();
let mut buffer = Vec::new();
command.serialize(&mut buffer).unwrap();
let header = CommandHeader::deserialize(&buffer[..CommandHeader::SIZE]).unwrap();
assert_eq!(header.key_len, key.len() as u32);
assert_eq!(header.value_size, value.len() as u32);
assert_eq!(
&buffer[CommandHeader::SIZE..CommandHeader::SIZE + key.len()],
key
);
assert_eq!(&buffer[CommandHeader::SIZE + key.len()..], value);
let mut hasher = crc32fast::Hasher::new();
hasher.update(&key);
hasher.update(&value);
assert_eq!(header.crc, hasher.finalize());
}
#[test]
fn test_remove_command_serialization() {
let key = b"key".to_vec();
let command = CommandRemove::new(key.clone()).unwrap();
let mut buffer = Vec::new();
command.serialize(&mut buffer).unwrap();
let header = CommandHeader::deserialize(&buffer[..CommandHeader::SIZE]).unwrap();
assert_eq!(header.key_len, key.len() as u32);
assert_eq!(header.value_size, 0);
assert_eq!(&buffer[CommandHeader::SIZE..], key);
let mut hasher = crc32fast::Hasher::new();
hasher.update(&key);
assert_eq!(header.crc, hasher.finalize());
}
#[test]
fn test_automatic_compaction_disabled() {
let dir = tempfile::tempdir().unwrap();
let mut db = Bitask::open(dir.path()).unwrap();
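// Write ~24 MiB so the 4 MiB active file rotates several times without
// any automatic compaction kicking in.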
for i in 0..3000 {
let key = format!("key{}", i).into_bytes();
let value = vec![0; 8 * 1024];
db.put(key, value).unwrap();
}
let log_files = std::fs::read_dir(dir.path())
.unwrap()
.filter_map(Result::ok)
.filter(|entry| {
let name = entry.file_name().to_string_lossy().to_string();
name.ends_with(".log") && !name.ends_with(".active.log")
})
.count();
assert!(
log_files >= 2,
"Expected 2 or more log files since auto-compaction is disabled"
);
}
}