use crate::wal::block::Block;
use crate::wal::config::debug_print;
use std::collections::HashMap;
use std::io;
use std::sync::{Arc, RwLock};
/// Per-column reader state: the chain of blocks known for a column plus a
/// cursor (block index + offset) into that chain.
#[derive(Debug)]
pub(super) struct ColReaderInfo {
/// Blocks tracked for this column. `Reader::append_block_to_chain` pushes
/// each new block onto the end.
pub(super) chain: Vec<Block>,
/// Index into `chain` of the block the cursor is on. `append_block_to_chain`
/// points it at a newly appended block when that block's id equals
/// `tail_block_id`.
pub(super) cur_block_idx: usize,
/// Offset of the cursor inside the current block. When the cursor is moved
/// by `append_block_to_chain` it is set to `min(tail_offset, block.used)`.
/// NOTE(review): presumably a byte offset — confirm against the read path.
pub(super) cur_block_offset: u64,
/// Starts at 0 for entries created by `append_block_to_chain`.
/// NOTE(review): presumably counts reads since the position was last
/// persisted; the code that updates it is not visible here — confirm.
pub(super) reads_since_persist: u32,
/// Block id that `append_block_to_chain` compares against an appended
/// block's id to decide whether to move the cursor onto it.
/// NOTE(review): looks like the block of a saved tail position — confirm.
pub(super) tail_block_id: u64,
/// Offset paired with `tail_block_id`; used (clamped to the block's `used`)
/// as the cursor offset when the tail block is appended.
pub(super) tail_offset: u64,
/// `false` for entries created by `append_block_to_chain`.
/// NOTE(review): presumably set when the state is restored from a persisted
/// index; the setter is not visible here — confirm.
pub(super) hydrated_from_index: bool,
}
/// Registry of per-column reader state.
pub(super) struct Reader {
/// Maps the column string passed to `append_block_to_chain` to that column's
/// state. Each entry sits behind its own `Arc<RwLock<..>>`, so a caller can
/// clone the handle, release the map lock, and then lock only the entry.
pub(super) data: RwLock<HashMap<String, Arc<RwLock<ColReaderInfo>>>>,
}
impl Reader {
pub(super) fn new() -> Self {
Self {
data: RwLock::new(HashMap::new()),
}
}
pub(super) fn append_block_to_chain(&self, col: &str, block: Block) -> io::Result<()> {
if let Some(info_arc) = {
let map = self.data.read().map_err(|_| {
io::Error::new(io::ErrorKind::Other, "reader map read lock poisoned")
})?;
map.get(col).cloned()
} {
let mut info = info_arc.write().map_err(|_| {
io::Error::new(io::ErrorKind::Other, "col info write lock poisoned")
})?;
let before = info.chain.len();
info.chain.push(block.clone());
let new_idx = info.chain.len().saturating_sub(1);
if info.tail_block_id == block.id {
info.cur_block_idx = new_idx;
info.cur_block_offset = info.tail_offset.min(block.used);
}
debug_print!(
"[reader] chain append(fast): col={}, block_id={}, chain_len {}->{}",
col,
block.id,
before,
before + 1
);
return Ok(());
}
let info_arc = {
let mut map = self.data.write().map_err(|_| {
io::Error::new(io::ErrorKind::Other, "reader map write lock poisoned")
})?;
map.entry(col.to_string())
.or_insert_with(|| {
Arc::new(RwLock::new(ColReaderInfo {
chain: Vec::new(),
cur_block_idx: 0,
cur_block_offset: 0,
reads_since_persist: 0,
tail_block_id: 0,
tail_offset: 0,
hydrated_from_index: false,
}))
})
.clone()
};
let mut info = info_arc
.write()
.map_err(|_| io::Error::new(io::ErrorKind::Other, "col info write lock poisoned"))?;
info.chain.push(block.clone());
let new_idx = info.chain.len().saturating_sub(1);
if info.tail_block_id == block.id {
info.cur_block_idx = new_idx;
info.cur_block_offset = info.tail_offset.min(block.used);
}
debug_print!(
"[reader] chain append(slow/new): col={}, block_id={}, chain_len {}->{}",
col,
block.id,
0,
1
);
Ok(())
}
}