use std::{
collections::BTreeSet,
fs::{File, OpenOptions},
io,
os::unix::fs::OpenOptionsExt,
path::Path,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use io_uring::{cqueue, IoUring};
use parking_lot::RwLock;
use crate::{
flush_behaviour::*,
flush_buffer::{BufferError, BufferMsg, FlushBuffer, FlushBufferRing, RING_SIZE},
};
/// Unit size for direct I/O: buffers and LSS slot offsets are in 4 KiB pages
/// (see `get_high_stable_offset`, which multiplies slots by this).
pub const FOUR_KB_PAGE: usize = 4096;
/// Threshold for io_uring CQE `result()`: values below 0 indicate a failed
/// flush (kernel returns a negative errno) and trigger a retry in
/// `check_async_cque`.
pub const FAILED_FLUSH: i32 = 0;
/// Append-oriented, page-aligned log store. Payloads are staged in a ring of
/// flush buffers and persisted to an `O_DIRECT` file via io_uring; completion
/// is tracked with a contiguous high-water mark plus out-of-order "islands".
pub struct LogStructuredStore {
/// Ring of in-memory flush buffers that stage payloads before submission.
pub(crate) buffer: FlushBufferRing,
/// Submission strategy (waiting vs. non-waiting appender, per `WriteMode`).
pub(crate) flusher: Arc<FlushBehavior>,
/// Backing log file, opened with `O_DIRECT` (see `open_direct`).
store: Arc<File>,
/// Highest LSS slot up to which *all* flushes have completed contiguously.
pub hi_stable: AtomicU64,
/// Slots that completed out of order, ahead of `hi_stable`; drained as the
/// watermark catches up (see `advance_high_stable`).
completed_islands: RwLock<BTreeSet<u64>>,
}
/// A claim on space inside a specific flush buffer, handed out by
/// `LogStructuredStore::reserve_space` and consumed by `write_payload`.
pub struct Reservation<'a> {
// NOTE(review): `Arc<&'a FlushBuffer>` wraps a borrow in an Arc, which adds
// no shared ownership over a plain `&'a FlushBuffer`. Changing it would
// alter the public field type (and `write_payload` derefs it with
// `*current`), so flagging rather than fixing here.
pub buffer: Arc<&'a FlushBuffer>,
/// Byte offset within `buffer` reserved for the payload.
pub(crate) offset: usize,
}
impl LogStructuredStore {
/// Assemble a store from a caller-supplied buffer ring and flush behavior,
/// opening (and creating if needed) the log file at `path` with `O_DIRECT`.
///
/// # Errors
/// Propagates any I/O error from creating parent directories or opening the
/// backing file.
pub fn open(
path: impl AsRef<Path>,
ring: FlushBufferRing,
flusher: Arc<FlushBehavior>,
) -> io::Result<Self> {
let file = open_direct(path)?;
Ok(Self {
buffer: ring,
flusher,
store: Arc::new(file),
// Nothing is stable yet; slot 0 is the initial watermark.
hi_stable: AtomicU64::new(0),
completed_islands: RwLock::new(BTreeSet::new()),
})
}
/// Convenience constructor: builds the io_uring (8 entries), the appender
/// matching `mode`, and a default buffer ring (`RING_SIZE` buffers of
/// `FOUR_KB_PAGE` bytes), then opens the store.
///
/// # Errors
/// Propagates I/O errors from opening the file or creating the io_uring.
pub fn open_with_behavior(path: impl AsRef<Path>, mode: WriteMode) -> io::Result<Self> {
let file = Arc::new(open_direct(path)?);
let io_uring = Arc::new(parking_lot::Mutex::new(IoUring::new(8)?));
// Map the write mode onto a flush behavior; both share the same ring/file.
let flusher = Arc::new(match mode {
WriteMode::TailLocalizedWrites => {
FlushBehavior::NoWaitAppender(Appender::new(io_uring, Arc::clone(&file), mode))
}
WriteMode::SerializedWrites => {
FlushBehavior::WaitAppender(Appender::new(io_uring, Arc::clone(&file), mode))
}
});
let ring = FlushBufferRing::with_flusher(RING_SIZE, FOUR_KB_PAGE, flusher.clone());
Ok(Self {
buffer: ring,
flusher,
store: file,
hi_stable: AtomicU64::new(0),
completed_islands: RwLock::new(BTreeSet::new()),
})
}
/// Copy `payload` into the buffer/offset named by `reservation`.
///
/// The reservation must come from a prior `reserve_space` call; the offset
/// is forwarded to the ring as `Ok(offset)` (the ring's `put` presumably
/// takes a `Result` so it can also carry reservation errors — TODO confirm
/// against `FlushBufferRing::put`).
pub fn write_payload<'a>(
&self,
payload: &[u8],
reservation: Reservation<'a>,
) -> Result<BufferMsg, BufferError> {
let (current, offset) = { (reservation.buffer, reservation.offset) };
self.buffer.put(*current, Ok(offset), payload)
}
/// Reserve `payload_size` bytes in the ring's current buffer, returning a
/// `Reservation` naming that buffer and the claimed offset.
///
/// # Errors
/// `BufferError::InvalidState` if the current-buffer pointer is null;
/// otherwise whatever `FlushBuffer::reserve_space` reports (e.g. no room).
pub fn reserve_space(&self, payload_size: usize) -> Result<Reservation, BufferError> {
// SAFETY: turns the ring's current-buffer raw pointer into a reference;
// relies on the ring keeping its buffers alive for the store's lifetime.
// A null pointer is handled via `as_ref()` -> None below.
let current = unsafe {
self.buffer
.current_buffer
.load(std::sync::atomic::Ordering::Acquire)
.as_ref()
}
.ok_or(BufferError::InvalidState)?;
match current.reserve_space(payload_size) {
Ok(offset) => Ok(Reservation {
buffer: Arc::new(current),
offset,
}),
Err(e) => Err(e),
}
}
/// Drain the io_uring completion queue and process each flush completion:
/// failures (negative CQE result, i.e. `< FAILED_FLUSH`) are re-submitted
/// from the SQE stored on the buffer; successes advance the stable
/// watermark and recycle the buffer.
pub fn check_async_cque(&self) {
// Snapshot completions while holding the ring lock, then drop the guard
// so the retry path below can re-acquire it without deadlocking.
let cqes: Vec<cqueue::Entry> = {
let mut ring = self.flusher.get_cqueue();
ring.completion().sync();
ring.completion().collect()
};
for cqe in cqes {
let user_data = cqe.user_data();
// user_data is assumed to carry the address of the submitting
// FlushBuffer — TODO confirm against the submission path in the
// flush behavior.
let ptr = user_data as *const FlushBuffer;
// SAFETY: valid only if that buffer is still alive; the ring owns its
// buffers for the store's lifetime, so the pointer should not dangle.
let buffer: &FlushBuffer = unsafe { &*ptr };
let lss_slot = buffer.local_lss_address_slot.load(Ordering::Acquire) as u64;
if cqe.result() < FAILED_FLUSH {
// Failed flush: re-submit the SQE cached on the buffer at submit time.
// SAFETY: reads through the buffer's UnsafeCell; assumes no concurrent
// writer of `submit_queue_entry` while the I/O was in flight.
let sqe = unsafe {
(*buffer.submit_queue_entry.get())
.as_ref()
.expect("stored SQE must be present on retry")
};
let mut ring = self.flusher.get_cqueue();
// NOTE(review): a push error (submission queue full) is silently
// discarded here, which would drop the retry entirely — consider
// surfacing or deferring it.
unsafe {
let _ = ring.submission().push(&sqe);
};
let _ = ring.submit();
} else {
// Success: fold the slot into the watermark and make the buffer reusable.
self.mark_slot_complete(lss_slot);
self.buffer.reset_buffer(buffer);
}
}
}
/// Record completion of `lss_slot`. If it extends the contiguous prefix,
/// CAS `hi_stable` forward and sweep any waiting islands; if it is ahead of
/// the prefix, park it in `completed_islands`; if it is already covered,
/// just drop any stale island entry.
pub fn mark_slot_complete(&self, lss_slot: u64) {
loop {
let current = self.hi_stable.load(Ordering::Acquire);
if lss_slot <= current {
// Already inside the stable prefix; defensive cleanup of the set.
self.completed_islands.write().remove(&lss_slot);
return;
}
if lss_slot == current + 1 {
// Exactly the next expected slot: try to advance the watermark.
match self.hi_stable.compare_exchange(
current,
lss_slot,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
self.completed_islands.write().remove(&lss_slot);
// Later slots may have completed out of order; absorb them.
self.advance_high_stable();
return;
}
// Lost the race to another completer; re-read and retry.
Err(_) => continue, }
} else {
// Out-of-order completion: park it until the gap closes.
self.completed_islands.write().insert(lss_slot);
return;
}
}
}
/// Repeatedly promote `hi_stable` while the next expected slot is already
/// present in `completed_islands`, removing each absorbed island. Returns
/// once the slot after the watermark has not yet completed.
pub fn advance_high_stable(&self) {
loop {
let current = self.hi_stable.load(Ordering::Acquire);
let next_expected = current + 1;
// NOTE(review): the contains/CAS/remove sequence is not atomic as a
// whole; correctness relies on every completer looping until stable —
// verify there is no interleaving that strands an island.
let found = self.completed_islands.read().contains(&next_expected);
if !found {
return;
}
match self.hi_stable.compare_exchange(
current,
next_expected,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
self.completed_islands.write().remove(&next_expected);
}
// Another thread advanced the watermark; re-evaluate from the top.
Err(_) => continue, }
}
}
/// Byte offset of the stable frontier: stable slot count times the 4 KiB
/// page size.
pub(crate) fn get_high_stable_offset(&self) -> u64 {
self.hi_stable.load(Ordering::Acquire) * (FOUR_KB_PAGE as u64)
}
/// Number of out-of-order completions currently parked ahead of the watermark.
pub(crate) fn get_island_count(&self) -> usize {
self.completed_islands.read().len()
}
/// Shared handle to the backing `O_DIRECT` file.
pub(crate) fn file_handle(&self) -> Arc<File> {
Arc::clone(&self.store)
}
/// Reference to the ring's current buffer.
///
/// # Panics
/// Panics if the current-buffer pointer is null (contrast `reserve_space`,
/// which maps that case to `BufferError::InvalidState`).
pub(crate) fn get_cur_buffer(&self) -> &FlushBuffer {
// SAFETY: same pointer-to-reference conversion as `reserve_space`; the
// ring keeps its buffers alive for the store's lifetime.
unsafe {
self.buffer
.current_buffer
.load(std::sync::atomic::Ordering::Acquire)
.as_ref()
.unwrap()
}
}
/// Hand the current buffer to the ring's (asynchronous) flush path.
pub(crate) fn flush_cur_buffer(&self) {
let cur = self.get_cur_buffer();
self.buffer.flush(cur);
}
/// Synchronously flush the current buffer: mark it in-progress, then submit
/// and wait for the kernel to complete the write.
pub(crate) fn flush_cur_buffer_blocking(&self) -> io::Result<()> {
let cur = self.get_cur_buffer();
cur.set_flush_in_progress();
self.flusher.submit_buffer_and_wait(cur)
}
}
/// Open (creating if necessary) the file at `path` for read/write with
/// `O_DIRECT`, bypassing the kernel page cache. Any missing parent
/// directories are created first.
///
/// # Errors
/// Returns the underlying I/O error from directory creation or `open(2)`
/// (note: `O_DIRECT` is rejected by some filesystems, e.g. tmpfs).
fn open_direct(path: impl AsRef<Path>) -> io::Result<File> {
    let path = path.as_ref();
    match path.parent() {
        Some(dir) => std::fs::create_dir_all(dir)?,
        None => {}
    }
    let mut options = OpenOptions::new();
    options.read(true).write(true).create(true);
    options.custom_flags(libc::O_DIRECT);
    options.open(path)
}