#![allow(clippy::await_holding_lock)]
#[cfg(not(feature = "async-io"))]
use std::fs::{File, OpenOptions};
#[cfg(not(feature = "async-io"))]
use std::io::{Read, Seek};
use std::sync::Arc;
use parking_lot::RwLock;
use tokio::sync::{oneshot, Notify};
#[cfg(feature = "async-io")]
use tokio_uring_executor as executor;
#[cfg(feature = "async-io")]
use tokio_uring::fs::{File, OpenOptions};
use cfg_if::cfg_if;
use crate::memtable::Memtable;
use crate::{Error, Params, WriteOp};
mod writer;
use writer::WalWriter;
#[cfg(test)]
mod tests;
/// Number of bytes covered by a single log file ("page").
///
/// A log position `p` is resolved to the file with index `p / PAGE_SIZE`
/// and to the byte offset `p % PAGE_SIZE` inside that file.
const PAGE_SIZE: u64 = 4096;
/// Mutable bookkeeping of the write-ahead log; lives behind the lock in `LogInner`.
///
/// All `*_pos` fields are absolute byte positions in the log.
struct LogStatus {
    /// Serialized operations handed over by `store` that still wait to be
    /// picked up (one buffer per operation).
    queue: Vec<Vec<u8>>,
    /// Position just past the last byte that has been queued; `store`
    /// advances it by the length of every buffer it enqueues.
    queue_pos: u64,
    /// `store` blocks until this reaches the end of its batch.
    /// NOTE(review): presumably advanced by the writer task — confirm in `writer`.
    write_pos: u64,
    /// `sync` blocks until this moves past the value it observed.
    /// NOTE(review): presumably advanced by the writer task — confirm in `writer`.
    sync_pos: u64,
    /// `set_offset` blocks until this reaches the requested offset.
    flush_pos: u64,
    /// Offset most recently requested through `set_offset`; only ever grows.
    offset_pos: u64,
    /// Raised by `sync` to request a sync from the writer.
    sync_flag: bool,
    /// Raised by `stop` to ask the writer task to terminate.
    stop_flag: bool,
}
/// State shared (via `Arc`) between the `WriteAheadLog` handle and the
/// background writer task.
struct LogInner {
    /// Signalled by `store`, `sync`, `set_offset` and `stop` after they have
    /// changed `status`, so that the writer re-examines it.
    queue_cond: Notify,
    /// Awaited by `store`, `sync` and `set_offset` while they wait for progress.
    /// NOTE(review): presumably signalled by the writer task — confirm in `writer`.
    write_cond: Notify,
    /// Positions, pending queue and request flags.
    status: RwLock<LogStatus>,
}
/// Handle to the write-ahead log.
///
/// The actual file writes are carried out by a background task that is
/// spawned when the log is created or opened; this handle only queues work
/// and waits for the task to make progress.
pub struct WriteAheadLog {
    /// Fires once the writer task has left its loop. `stop` takes the
    /// receiver out, which is why it is wrapped in an `Option`.
    finish_receiver: parking_lot::Mutex<Option<oneshot::Receiver<()>>>,
    /// State shared with the writer task.
    inner: Arc<LogInner>,
}
impl WriteAheadLog {
pub async fn new(params: Arc<Params>) -> Result<Self, std::io::Error> {
let status = LogStatus {
queue_pos: 0,
write_pos: 0,
sync_pos: 0,
flush_pos: 0,
offset_pos: 0,
queue: vec![],
stop_flag: false,
sync_flag: false,
};
let inner = Arc::new(LogInner {
status: RwLock::new(status),
queue_cond: Default::default(),
write_cond: Default::default(),
});
let finish_receiver = Self::start_writer(inner.clone(), params);
Ok(Self {
inner,
finish_receiver: parking_lot::Mutex::new(Some(finish_receiver)),
})
}
fn start_writer(inner: Arc<LogInner>, params: Arc<Params>) -> oneshot::Receiver<()> {
let (finish_sender, finish_receiver) = oneshot::channel();
cfg_if::cfg_if! {
if #[cfg(feature = "async-io")] {
unsafe {
executor::unsafe_spawn(async move {
let mut writer = WalWriter::new(params).await;
loop {
let done = writer.update_log(&inner).await;
if done {
break;
}
}
let _ = finish_sender.send(());
});
}
} else {
{
tokio::spawn(async move {
let mut writer = WalWriter::new(params).await;
loop {
let done = writer.update_log(&inner).await;
if done {
break;
}
}
let _ = finish_sender.send(());
});
}
}
}
finish_receiver
}
/// Re-opens an existing write-ahead log and replays it into `memtable`.
///
/// Replay starts at `start_position` and reads records until no further
/// record header can be read. Each record is laid out as
/// `op type (1 byte) | key length (u64 LE) | key`, followed for put
/// operations by `value length (u64 LE) | value`. Puts and deletes are
/// applied to `memtable` in log order. Afterwards a writer task is spawned
/// that continues the log at the position where replay ended.
///
/// # Errors
///
/// Returns the underlying I/O error if the log file cannot be opened or
/// positioned, or if a record is cut short after its header. Returns an
/// error of kind `InvalidData` if a record carries an unknown op type.
pub async fn open(
    params: Arc<Params>,
    start_position: u64,
    memtable: &mut Memtable,
) -> Result<Self, std::io::Error> {
    const KEY_LEN_SIZE: usize = std::mem::size_of::<u64>();
    const HEADER_SIZE: usize = std::mem::size_of::<u8>() + KEY_LEN_SIZE;

    let mut position = start_position;
    let mut count: usize = 0;
    let fpos = position / PAGE_SIZE;

    cfg_if! {
        if #[cfg(feature="async-io")] {
            // Reads use explicit offsets in this mode, so no seek is needed.
            let mut log_file = WalWriter::open_file(&params, fpos).await?;
        } else {
            let file_offset = position % PAGE_SIZE;
            let mut log_file = WalWriter::open_file(&params, fpos).await?;
            log_file.seek(std::io::SeekFrom::Start(file_offset))?;
        }
    }

    loop {
        let mut op_header = [0u8; HEADER_SIZE];

        // A failure to read the header marks the end of the log.
        let success = Self::read_from_log(
            &mut log_file,
            &mut position,
            &mut op_header[..],
            &params,
            true,
        )
        .await?;

        if !success {
            break;
        }

        let op_type = op_header[0];
        let key_len_data: [u8; KEY_LEN_SIZE] = op_header[1..]
            .try_into()
            .expect("header holds exactly KEY_LEN_SIZE bytes after the op type");
        let key_len = u64::from_le_bytes(key_len_data);

        // `read_from_log` asserts a non-empty buffer, so zero-length fields
        // (which occupy no bytes in the log) must not be read.
        let mut key = vec![0u8; key_len as usize];
        if !key.is_empty() {
            Self::read_from_log(&mut log_file, &mut position, &mut key, &params, false)
                .await?;
        }

        if op_type == WriteOp::PUT_OP {
            let mut val_len = [0u8; 8];
            Self::read_from_log(&mut log_file, &mut position, &mut val_len, &params, false)
                .await?;
            let val_len = u64::from_le_bytes(val_len);

            let mut value = vec![0u8; val_len as usize];
            if !value.is_empty() {
                Self::read_from_log(&mut log_file, &mut position, &mut value, &params, false)
                    .await?;
            }

            memtable.put(key, value);
        } else if op_type == WriteOp::DELETE_OP {
            memtable.delete(key);
        } else {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("Unexpected op type {op_type} in write-ahead log"),
            ));
        }

        count += 1;
    }

    log::debug!("Found {count} entries in write-ahead log");

    let status = LogStatus {
        queue_pos: position,
        write_pos: position,
        sync_pos: position,
        flush_pos: start_position,
        offset_pos: start_position,
        queue: vec![],
        sync_flag: false,
        stop_flag: false,
    };

    let inner = Arc::new(LogInner {
        status: RwLock::new(status),
        queue_cond: Default::default(),
        write_cond: Default::default(),
    });

    let finish_receiver = Self::continue_writer(inner.clone(), position, params);

    Ok(Self {
        inner,
        finish_receiver: parking_lot::Mutex::new(Some(finish_receiver)),
    })
}
fn continue_writer(
inner: Arc<LogInner>,
position: u64,
params: Arc<Params>,
) -> oneshot::Receiver<()> {
let (finish_sender, finish_receiver) = oneshot::channel();
cfg_if::cfg_if! {
if #[cfg(feature = "async-io")] {
unsafe {
executor::unsafe_spawn(async move {
let mut writer = WalWriter::continue_from(position, params).await;
loop {
let done = writer.update_log(&inner).await;
if done {
break;
}
}
let _ = finish_sender.send(());
});
}
} else {
{
tokio::spawn(async move {
let mut writer = WalWriter::continue_from(position, params).await;
loop {
let done = writer.update_log(&inner).await;
if done {
break;
}
}
let _ = finish_sender.send(());
});
}
}
}
finish_receiver
}
/// Fills `out` with log bytes starting at `*position`, following the log
/// across file boundaries.
///
/// `*position` is advanced by every chunk that was read successfully. Without
/// the `async-io` feature the bytes are read from the file's current cursor,
/// so the caller must have positioned `log_file` accordingly; with
/// `async-io` each read names its offset explicitly.
///
/// Returns `Ok(true)` when `out` was filled completely. If `maybe` is set,
/// a failed read yields `Ok(false)` instead of an error, and a missing
/// follow-up file is created, in which case the result tells whether `out`
/// had already been filled. If `maybe` is not set, both failures are
/// returned as errors.
///
/// # Panics
///
/// Panics if `out` is empty.
async fn read_from_log(
log_file: &mut File,
position: &mut u64,
out: &mut [u8],
params: &Params,
maybe: bool,
) -> Result<bool, std::io::Error> {
let start_pos = *position;
let buffer_len = out.len() as u64;
let mut buffer_pos = 0;
assert!(buffer_len > 0);
while buffer_pos < buffer_len {
// Never read past the end of the current file: clamp the chunk to
// whatever is left of this page.
let mut file_offset = *position % PAGE_SIZE;
let file_remaining = PAGE_SIZE - file_offset;
assert!(file_remaining > 0);
let read_len = file_remaining.min(buffer_len - buffer_pos);
let read_start = buffer_pos as usize;
let read_end = (read_len + buffer_pos) as usize;
let read_slice = &mut out[read_start..read_end];
// Both branches leave the outcome in `read_result`. The async variant
// reads into a temporary buffer and copies it into `read_slice`
// regardless of the outcome.
cfg_if! {
if #[cfg(feature="async-io")] {
let buf = vec![0u8; read_slice.len()];
let (read_result, buf) = log_file.read_exact_at(buf, file_offset).await;
read_slice.copy_from_slice(&buf);
} else {
let read_result = log_file.read_exact(read_slice);
}
}
match read_result {
Ok(_) => {
*position += read_len;
file_offset += read_len;
}
Err(err) => {
// With `maybe`, any read error is reported as "nothing more to read".
if maybe {
return Ok(false);
} else {
return Err(err);
}
}
}
assert!(file_offset <= PAGE_SIZE);
buffer_pos = *position - start_pos;
// The current file is exhausted: switch to the file that holds `*position`.
if file_offset == PAGE_SIZE {
let fpos = *position / PAGE_SIZE;
*log_file = match WalWriter::open_file(params, fpos).await {
Ok(file) => file,
Err(err) => {
if maybe {
// The next file cannot be opened: create it and report
// whether the requested bytes were already complete.
*log_file = WalWriter::create_file(params, fpos).await?;
return Ok(buffer_pos == buffer_len);
} else {
return Err(err);
}
}
}
}
}
Ok(true)
}
/// Appends `batch` to the log and waits until it has been written.
///
/// Every operation is serialized as
/// `op type | key length | key`, followed for puts by
/// `value length | value` (all integers little-endian). The serialized
/// operations are queued under the status lock, the writer is notified, and
/// the call then waits until `write_pos` has reached the end of the batch.
///
/// Returns the log position just past the last byte of the batch.
#[tracing::instrument(skip(self, batch))]
pub async fn store(&self, batch: &[WriteOp]) -> Result<u64, std::io::Error> {
    // Serialize outside of the lock to keep the critical section short.
    let writes: Vec<Vec<u8>> = batch
        .iter()
        .map(|op| {
            let op_type = op.get_type().to_le_bytes();
            let key = op.get_key();
            let klen = op.get_key_length().to_le_bytes();
            let vlen = op.get_value_length().to_le_bytes();

            let mut record = Vec::new();
            record.extend_from_slice(op_type.as_slice());
            record.extend_from_slice(klen.as_slice());
            record.extend_from_slice(key);

            // Only puts carry a value section.
            if let WriteOp::Put(_, value) = op {
                record.extend_from_slice(vlen.as_slice());
                record.extend_from_slice(value);
            }

            record
        })
        .collect();

    let end_pos = {
        let mut status = self.inner.status.write();

        for record in writes {
            status.queue_pos += record.len() as u64;
            status.queue.push(record);
        }

        self.inner.queue_cond.notify_waiters();
        status.queue_pos
    };

    loop {
        let progress = self.inner.write_cond.notified();
        tokio::pin!(progress);

        // Register for the notification while the lock is still held so
        // that no wake-up between the check and the await is missed.
        let written = {
            let status = self.inner.status.read();
            let written = status.write_pos >= end_pos;
            if !written {
                progress.as_mut().enable();
            }
            written
        };

        if written {
            break;
        }

        progress.await;
    }

    Ok(end_pos)
}
/// Asks the writer task to terminate and waits until it has done so.
///
/// Sets the stop flag, notifies the writer and then awaits the completion
/// signal of the writer task.
///
/// # Panics
///
/// Panics if `stop` has already been called on this log, or if the writer
/// task went away without sending its completion signal.
pub async fn stop(&self) -> Result<(), Error> {
    log::trace!("Shutting down write-ahead log. Waiting for writer to terminate.");

    self.inner.status.write().stop_flag = true;
    self.inner.queue_cond.notify_waiters();

    // Take the receiver out in a statement of its own: the mutex guard is a
    // temporary that lives until the end of its statement, so chaining the
    // `.await` onto it would keep the (blocking) mutex locked while waiting.
    let finish_receiver = self
        .finish_receiver
        .lock()
        .take()
        .expect("Already stopped?");

    finish_receiver
        .await
        .expect("write-ahead log writer terminated without signalling completion");

    log::debug!("Write-ahead log shut down");
    Ok(())
}
/// Requests a sync of everything written so far and waits for it.
///
/// Returns immediately if `sync_pos` already equals `write_pos`. Otherwise
/// the sync flag is raised, the writer is notified, and the call waits until
/// `sync_pos` has moved past the value observed when the request was made.
///
/// # Panics
///
/// Panics if `sync_pos` is found to be ahead of `write_pos`.
#[tracing::instrument(skip(self))]
pub async fn sync(&self) -> Result<(), std::io::Error> {
    let last_pos = {
        let mut lock = self.inner.status.write();

        if lock.sync_pos == lock.write_pos {
            // Nothing has been written since the last sync.
            return Ok(());
        }
        assert!(lock.sync_pos < lock.write_pos);

        lock.sync_flag = true;
        self.inner.queue_cond.notify_waiters();

        lock.sync_pos
    };

    loop {
        let progress = self.inner.write_cond.notified();
        tokio::pin!(progress);

        // Register for the notification while the lock is still held so
        // that no wake-up between the check and the await is missed.
        let advanced = {
            let status = self.inner.status.read();
            let advanced = status.sync_pos > last_pos;
            if !advanced {
                progress.as_mut().enable();
            }
            advanced
        };

        if advanced {
            return Ok(());
        }

        progress.await;
    }
}
/// Raises the log offset to `new_offset` and waits until `flush_pos` has
/// caught up with it.
///
/// The new offset is recorded under the status lock and the writer is
/// notified before waiting.
///
/// # Panics
///
/// Panics if `new_offset` is not strictly greater than the current offset.
#[tracing::instrument(skip(self))]
pub async fn set_offset(&self, new_offset: u64) {
    {
        let mut status = self.inner.status.write();

        assert!(
            new_offset > status.offset_pos,
            "Offset can only be increased! Requested {new_offset}, but was {}",
            status.offset_pos
        );

        status.offset_pos = new_offset;
        self.inner.queue_cond.notify_waiters();
    }

    loop {
        let progress = self.inner.write_cond.notified();
        tokio::pin!(progress);

        // Register for the notification while the lock is still held so
        // that no wake-up between the check and the await is missed.
        let flushed = {
            let status = self.inner.status.read();
            let flushed = status.flush_pos >= new_offset;
            if !flushed {
                progress.as_mut().enable();
            }
            flushed
        };

        if flushed {
            return;
        }

        progress.await;
    }
}
}