use crate::wal::block::{Block, Metadata};
use crate::wal::config::{
DEFAULT_BLOCK_SIZE, FsyncSchedule, MAX_FILE_SIZE, PREFIX_META_SIZE, debug_print,
};
use crate::wal::paths::WalPathManager;
use crate::wal::storage::{SharedMmapKeeper, set_fsync_schedule};
use std::collections::{HashMap, HashSet};
use std::fs;
use std::sync::mpsc;
use std::sync::{Arc, RwLock};
use super::WalIndex;
use super::allocator::{BlockAllocator, BlockStateTracker, FileStateTracker, flush_check};
use super::background::start_background_workers;
use super::reader::Reader;
use super::writer::Writer;
use rkyv::Deserialize;
/// Read-consistency mode chosen when a [`Walrus`] is constructed.
///
/// The selected value is stored on the instance; the code that interprets it is
/// not part of this file.
#[derive(Debug, Clone, Copy)]
pub enum ReadConsistency {
    /// NOTE(review): presumably every read position is persisted immediately so
    /// an entry is never handed out twice — confirm against the reader.
    StrictlyAtOnce,
    /// NOTE(review): presumably the read position is persisted only once every
    /// `persist_every` reads, so entries may be re-delivered after a restart —
    /// confirm against the reader.
    AtLeastOnce {
        /// Persistence interval; unit assumed to be "number of reads" (TODO confirm).
        persist_every: u32,
    },
}
/// Entry point of the write-ahead log: owns the block allocator, the reader,
/// the per-column writers and the persisted read-offset index.
///
/// Instances are built through the `new*` / `with_*` constructors, all of which
/// end in `with_paths`, which also runs `startup_chore` to recover existing files.
pub struct Walrus {
/// Block allocator created from `paths`; supplies the initial block for every
/// newly created writer.
pub(super) allocator: Arc<BlockAllocator>,
/// Reader holding the per-column block chains; recovery appends blocks to it
/// and restores its read cursors.
pub(super) reader: Arc<Reader>,
/// Writers keyed by column name, created lazily by `get_or_create_writer`.
pub(super) writers: RwLock<HashMap<String, Arc<Writer>>>,
/// Sender returned by `start_background_workers`; a clone is handed to each writer.
/// NOTE(review): presumably carries paths of files that need an fsync — confirm.
pub(super) fsync_tx: Arc<mpsc::Sender<String>>,
/// Index opened under the name "read_offset_idx"; consulted at startup to
/// restore each column's read position.
pub(super) read_offset_index: Arc<RwLock<WalIndex>>,
/// Consistency mode supplied at construction time.
pub(super) read_consistency: ReadConsistency,
/// Fsync schedule supplied at construction time; also passed to every writer.
pub(super) fsync_schedule: FsyncSchedule,
/// Path manager whose `root()` directory is scanned during startup recovery.
pub(super) paths: Arc<WalPathManager>,
}
impl Walrus {
/// Creates a `Walrus` using `ReadConsistency::StrictlyAtOnce`.
///
/// Everything else is left to [`Walrus::with_consistency`].
///
/// # Errors
/// Propagates any I/O error returned by [`Walrus::with_consistency`].
pub fn new() -> std::io::Result<Self> {
    let mode = ReadConsistency::StrictlyAtOnce;
    Self::with_consistency(mode)
}
/// Creates a `Walrus` with the given read-consistency `mode` and an fsync
/// schedule of `FsyncSchedule::Milliseconds(200)`.
///
/// # Errors
/// Propagates any I/O error returned by [`Walrus::with_consistency_and_schedule`].
pub fn with_consistency(mode: ReadConsistency) -> std::io::Result<Self> {
    let default_schedule = FsyncSchedule::Milliseconds(200);
    Self::with_consistency_and_schedule(mode, default_schedule)
}
/// Creates a `Walrus` with an explicit read-consistency `mode` and
/// `fsync_schedule`, using `WalPathManager::default()` for file locations.
///
/// # Errors
/// Propagates any I/O error raised while the instance is being assembled.
pub fn with_consistency_and_schedule(
    mode: ReadConsistency,
    fsync_schedule: FsyncSchedule,
) -> std::io::Result<Self> {
    Self::with_paths(Arc::new(WalPathManager::default()), mode, fsync_schedule)
}
/// Creates a `Walrus` for `key` using `ReadConsistency::StrictlyAtOnce`.
///
/// Everything else is left to [`Walrus::with_consistency_for_key`].
///
/// # Errors
/// Propagates any I/O error returned by [`Walrus::with_consistency_for_key`].
pub fn new_for_key(key: &str) -> std::io::Result<Self> {
    let mode = ReadConsistency::StrictlyAtOnce;
    Self::with_consistency_for_key(key, mode)
}
/// Creates a `Walrus` for `key` with the given read-consistency `mode` and an
/// fsync schedule of `FsyncSchedule::Milliseconds(200)`.
///
/// # Errors
/// Propagates any I/O error returned by
/// [`Walrus::with_consistency_and_schedule_for_key`].
pub fn with_consistency_for_key(key: &str, mode: ReadConsistency) -> std::io::Result<Self> {
    let default_schedule = FsyncSchedule::Milliseconds(200);
    Self::with_consistency_and_schedule_for_key(key, mode, default_schedule)
}
/// Creates a `Walrus` whose file locations come from `WalPathManager::for_key(key)`,
/// with an explicit read-consistency `mode` and `fsync_schedule`.
///
/// # Errors
/// Propagates any I/O error raised while the instance is being assembled.
pub fn with_consistency_and_schedule_for_key(
    key: &str,
    mode: ReadConsistency,
    fsync_schedule: FsyncSchedule,
) -> std::io::Result<Self> {
    let paths = Arc::new(WalPathManager::for_key(key));
    Self::with_paths(paths, mode, fsync_schedule)
}
fn with_paths(
paths: Arc<WalPathManager>,
mode: ReadConsistency,
fsync_schedule: FsyncSchedule,
) -> std::io::Result<Self> {
debug_print!("[walrus] new");
set_fsync_schedule(fsync_schedule);
let allocator = Arc::new(BlockAllocator::new(paths.clone())?);
let reader = Arc::new(Reader::new());
let tx_arc = start_background_workers(fsync_schedule);
let idx = WalIndex::new_in(&paths, "read_offset_idx")?;
let instance = Walrus {
allocator,
reader,
writers: RwLock::new(HashMap::new()),
fsync_tx: tx_arc,
read_offset_index: Arc::new(RwLock::new(idx)),
read_consistency: mode,
fsync_schedule,
paths,
};
instance.startup_chore()?;
Ok(instance)
}
/// Returns the writer for `col_name`, creating and registering one on first use.
///
/// The lookup first runs under the shared read lock. Only when no writer exists
/// is the write lock taken, and the map is then checked a second time because
/// another thread may have inserted the writer between the two lock
/// acquisitions.
///
/// # Errors
/// Returns an `ErrorKind::Other` error when the `writers` lock is poisoned, and
/// propagates any error from `get_next_available_block`.
pub(super) fn get_or_create_writer(&self, col_name: &str) -> std::io::Result<Arc<Writer>> {
    // Fast path: shared lock only. The guard is scoped to this block so it is
    // released before the write lock below is requested.
    let existing = {
        let registry = self.writers.read().map_err(|_| {
            std::io::Error::new(std::io::ErrorKind::Other, "writers read lock poisoned")
        })?;
        registry.get(col_name).cloned()
    };
    if let Some(found) = existing {
        return Ok(found);
    }

    // Slow path: exclusive lock, then re-check before creating anything.
    let mut registry = self.writers.write().map_err(|_| {
        std::io::Error::new(std::io::ErrorKind::Other, "writers write lock poisoned")
    })?;
    if let Some(found) = registry.get(col_name) {
        return Ok(Arc::clone(found));
    }

    // NOTE(review): the safety contract of `get_next_available_block` is not
    // visible from here; presumably the returned block must be owned by exactly
    // one writer, which holds because it is moved straight into the new
    // `Writer` — confirm against the allocator.
    let first_block = unsafe { self.allocator.get_next_available_block()? };
    let created = Arc::new(Writer::new(
        Arc::clone(&self.allocator),
        first_block,
        Arc::clone(&self.reader),
        col_name.to_owned(),
        Arc::clone(&self.fsync_tx),
        self.fsync_schedule,
    ));
    registry.insert(col_name.to_owned(), Arc::clone(&created));
    Ok(created)
}
/// Rebuilds in-memory state from the files found under `self.paths.root()`.
///
/// Steps:
/// 1. List the root directory, skipping sub-directories and any path ending in
///    `_index.db`, and sort the remaining paths.
/// 2. For each file, walk fixed-size block slots (`DEFAULT_BLOCK_SIZE`) and, for
///    every slot that holds data, register the block with the state trackers
///    and append it to the owning column's chain in the reader.
/// 3. Restore each column's read cursor from `read_offset_index` and mark the
///    blocks that lie before the cursor as checkpointed.
/// 4. Run `flush_check` on every file that could be opened.
///
/// # Errors
/// The body contains no fallible propagation: unreadable directories, entries,
/// files and poisoned locks are skipped, so this currently always returns `Ok(())`.
pub(super) fn startup_chore(&self) -> std::io::Result<()> {
    // A root directory that cannot be listed means there is nothing to recover.
    let dir_entries = match fs::read_dir(self.paths.root()) {
        Ok(entries) => entries,
        Err(_) => return Ok(()),
    };

    // Entries whose type cannot be determined are kept; only confirmed
    // directories, non-UTF-8 paths and index files are dropped.
    let mut wal_files: Vec<String> = dir_entries
        .filter_map(Result::ok)
        .filter(|entry| !matches!(entry.file_type(), Ok(ft) if ft.is_dir()))
        .filter_map(|entry| entry.path().to_str().map(str::to_owned))
        .filter(|path| !path.ends_with("_index.db"))
        .collect();
    wal_files.sort();
    if !wal_files.is_empty() {
        debug_print!("[recovery] scanning files: {}", wal_files.len());
    }

    // Block ids are handed out sequentially across all files, starting at 1.
    let mut next_block_id: usize = 1;
    let mut seen_files = HashSet::new();

    for file_path in &wal_files {
        let mmap = match SharedMmapKeeper::get_mmap_arc(file_path) {
            Ok(mapped) => mapped,
            Err(e) => {
                debug_print!("[recovery] mmap open failed for {}: {}", file_path, e);
                continue;
            }
        };
        seen_files.insert(file_path.clone());
        FileStateTracker::register_file_if_absent(file_path);
        debug_print!("[recovery] file {}", file_path);

        // NOTE(review): the walk is bounded by MAX_FILE_SIZE, not by the actual
        // length of the mapping — confirm `mmap.read` is safe for shorter files.
        let mut block_offset: u64 = 0;
        while block_offset + DEFAULT_BLOCK_SIZE <= MAX_FILE_SIZE {
            // Heuristic end marker: a slot starting with eight zero bytes is
            // treated as unused, and so is everything after it.
            let mut probe = [0u8; 8];
            mmap.read(block_offset as usize, &mut probe);
            if probe == [0u8; 8] {
                break;
            }

            // The slot starts with a 2-byte little-endian length followed by
            // that many bytes of archived `Metadata`.
            let mut meta_buf = vec![0u8; PREFIX_META_SIZE];
            mmap.read(block_offset as usize, &mut meta_buf);
            let meta_len = usize::from(u16::from_le_bytes([meta_buf[0], meta_buf[1]]));
            if meta_len == 0 || meta_len > PREFIX_META_SIZE - 2 {
                break;
            }
            let mut aligned = rkyv::AlignedVec::with_capacity(meta_len);
            aligned.extend_from_slice(&meta_buf[2..2 + meta_len]);
            // NOTE(review): `archived_root` performs no validation; soundness
            // relies on these bytes having been written as a valid archived
            // `Metadata`. The zero probe and length check above are the only
            // guards visible here — confirm that is enough for corrupted files.
            let archived = unsafe { rkyv::archived_root::<Metadata>(&aligned[..]) };
            let md: Metadata = match archived.deserialize(&mut rkyv::Infallible) {
                Ok(decoded) => decoded,
                Err(_) => break,
            };
            let col_name = md.owned_by;

            // Scan with `used` still at 0, exactly as a fresh block would be,
            // and count how many bytes of entries can be read back.
            let mut block = Block {
                id: next_block_id as u64,
                file_path: file_path.clone(),
                offset: block_offset,
                limit: DEFAULT_BLOCK_SIZE,
                mmap: mmap.clone(),
                used: 0,
            };
            let mut scanned: u64 = 0;
            while let Ok((_entry, consumed)) = block.read(scanned) {
                scanned += consumed as u64;
                if scanned >= DEFAULT_BLOCK_SIZE {
                    break;
                }
            }
            if scanned == 0 {
                // No readable entry: stop scanning this file.
                break;
            }
            block.used = scanned;

            BlockStateTracker::register_block(next_block_id, file_path);
            FileStateTracker::add_block_to_file_state(file_path);
            if !col_name.is_empty() {
                // Best effort: a failed append does not abort recovery.
                let _ = self.reader.append_block_to_chain(&col_name, block.clone());
                debug_print!(
                    "[recovery] appended block: file={}, block_id={}, used={}, col={}",
                    file_path,
                    block.id,
                    block.used,
                    col_name
                );
            }

            next_block_id += 1;
            block_offset += DEFAULT_BLOCK_SIZE;
        }
    }

    // Restore read cursors. Lock order: offset index (read), reader map (read),
    // then each column's info (write). Poisoned locks are skipped.
    if let Ok(idx_guard) = self.read_offset_index.read() {
        if let Ok(columns) = self.reader.data.read() {
            for (col, info_lock) in columns.iter() {
                let pos = match idx_guard.get(col) {
                    Some(saved) => saved,
                    None => continue,
                };
                let mut info = match info_lock.write() {
                    Ok(guard) => guard,
                    Err(_) => continue,
                };

                // Clamp the persisted position to what was actually recovered.
                let chain_len = info.chain.len();
                let ib = (pos.cur_block_idx as usize).min(chain_len);
                let restored_offset = if ib < chain_len {
                    pos.cur_block_offset.min(info.chain[ib].used)
                } else {
                    0
                };
                info.cur_block_idx = ib;
                info.cur_block_offset = restored_offset;

                // Every block before the cursor has been fully consumed.
                for consumed_block in info.chain.iter().take(ib) {
                    BlockStateTracker::set_checkpointed_true(consumed_block.id as usize);
                }
                // So has the current block when the cursor sits at its end.
                if ib < chain_len && restored_offset >= info.chain[ib].used {
                    BlockStateTracker::set_checkpointed_true(info.chain[ib].id as usize);
                }
            }
        }
    }

    for recovered in seen_files {
        flush_check(recovered);
    }
    Ok(())
}
}