use crate::error::Error;
use crate::metadata::Metadata;
use crate::wal::WalFileSet;
use std::sync::{Arc, RwLock};
use std::thread;
use tokio::sync::oneshot;
use tracing::{debug, error};
#[allow(clippy::type_complexity)]
pub enum Action {
Logs {
from: u64,
until: u64,
ack: flume::Sender<Option<Result<Vec<u8>, Error>>>,
},
LogState(oneshot::Sender<Result<LogState, Error>>),
Vote(oneshot::Sender<Result<Option<Vec<u8>>, Error>>),
Shutdown,
}
#[derive(Debug)]
pub struct LogState {
pub last_purged_log_id: Option<Vec<u8>>,
pub last_log: Option<Vec<u8>>,
}
#[derive(Debug)]
pub struct LogReadMemo {
pub last_wal_no: u64,
pub last_log_id: u64,
pub data_end: u32,
}
pub fn spawn(
meta: Arc<RwLock<Metadata>>,
wal_locked: Arc<RwLock<WalFileSet>>,
) -> Result<flume::Sender<Action>, Error> {
let (tx, rx) = flume::bounded::<Action>(1);
thread::spawn(move || run(meta, wal_locked, rx));
Ok(tx)
}
fn run(
meta: Arc<RwLock<Metadata>>,
wal_locked: Arc<RwLock<WalFileSet>>,
rx: flume::Receiver<Action>,
) {
let mut wal = wal_locked.read().unwrap().clone_no_map();
let mut buf = Vec::with_capacity(64);
let mut memo: Option<LogReadMemo> = None;
while let Ok(action) = rx.recv() {
match action {
Action::Logs { from, until, ack } => {
debug!("WAL Reader - Action::Logs - read from {from} until {until}");
{
let wal_upd = wal_locked.read().unwrap();
wal.active = wal_upd.active;
wal.clone_files_from_no_mmap(&wal_upd.files);
}
let mut from_next = from;
for log in wal.files.iter_mut() {
if log.id_until < from_next {
debug!(
"log.id_until < from_next -> {} < {}",
log.id_until, from_next
);
continue;
}
log.mmap().unwrap();
buf.clear();
if log.id_until < until {
debug!("log.id_until < until -> {} < {}", log.id_until, until);
log.read_logs(from_next, log.id_until, &mut memo, &mut buf)
.unwrap();
for (_id, data) in buf.drain(..) {
debug_assert!(_id >= from_next && _id <= until);
ack.send(Some(Ok(data))).unwrap()
}
log.mmap_drop();
from_next = log.id_until + 1;
} else {
debug!("log contains end of read request");
match log.read_logs(from_next, until, &mut memo, &mut buf) {
Ok(_) => {
for (_, data) in buf.drain(..) {
ack.send(Some(Ok(data))).unwrap();
}
}
Err(err) => {
error!("Error reading logs: {:?}", err);
}
}
break;
};
}
ack.send(None).unwrap();
}
Action::LogState(ack) => {
debug!("WAL Reader - Action::LogState");
{
let wal_upd = wal_locked.read().unwrap();
wal.active = wal_upd.active;
wal.clone_files_from_no_mmap(&wal_upd.files);
}
let latest_log_id = {
let file = &wal.files[wal.files.len() - 1];
if file.data_start.is_some() {
Some(file.id_until)
} else if wal.files.len() > 1 {
Some(wal.files[wal.files.len() - 2].id_until)
} else {
None
}
};
let last_log = if let Some(latest_log_id) = latest_log_id {
buf.clear();
let active = wal.active();
if active.data_start.is_some() {
active.mmap().unwrap();
active
.read_logs(latest_log_id, latest_log_id, &mut memo, &mut buf)
.unwrap();
} else if wal.files.len() > 1 {
let file = wal.files.get_mut(wal.files.len() - 2).unwrap();
file.mmap().unwrap();
file.read_logs(latest_log_id, latest_log_id, &mut memo, &mut buf)
.unwrap();
file.mmap_drop();
}
let (_, data) = buf.swap_remove(0);
Some(data)
} else {
None
};
let st = LogState {
last_purged_log_id: meta.read().unwrap().last_purged_log_id.clone(),
last_log,
};
debug!(
"WAL Reader - Action::LogState -> latest_log_id: {:?}\n{:?}",
latest_log_id, st
);
ack.send(Ok(st)).unwrap();
}
Action::Vote(ack) => {
debug!("WAL Reader - Action::Vote");
let vote = meta.read().unwrap().vote.clone();
ack.send(Ok(vote)).unwrap();
}
Action::Shutdown => {
debug!("Raft logs store reader is being shut down");
break;
}
}
}
debug!("Logs Reader exiting");
}