use std::{
collections::VecDeque,
fs::File,
io::Write,
path::{Path, PathBuf},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use tokio::sync::RwLock;
use crate::{disk_chan_page, ChanPage, IdxUsize};
pub struct DiskChan {
path: PathBuf,
_lock: File,
count: AtomicUsize,
max_pages: usize,
page_size: IdxUsize,
pages: RwLock<VecDeque<Arc<ChanPage>>>,
}
impl std::fmt::Debug for DiskChan {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DiskChan")
.field("path", &self.path)
.field("count", &self.count)
.field("max_pages", &self.max_pages)
.field("page_size", &self.page_size)
.finish_non_exhaustive()
}
}
impl Drop for DiskChan {
fn drop(&mut self) {
let _ = std::fs::remove_file(self.path.join(".pid.lock"));
}
}
impl DiskChan {
fn get_lock<P: AsRef<Path>>(path: P) -> Result<File, std::io::Error> {
let mut lock = match std::fs::OpenOptions::new()
.create_new(true)
.read(true)
.truncate(true)
.write(true)
.open(path.as_ref().join(".pid.lock"))
{
Ok(lock) => lock,
Err(e) => {
eprintln!("only one DiskChan process should own a path at any time");
return Err(e);
}
};
lock.write_all(std::process::id().to_string().as_bytes())?;
Ok(lock)
}
fn parse_page_no(name: String) -> Option<usize> {
let mut name_itr = name.split('.');
if name_itr.next()? != "data" {
return None;
}
let i = name_itr.next().and_then(|s| base62::decode(s).ok())?;
if name_itr.next()? != "bin" {
return None;
}
if name_itr.next().is_some() {
return None;
}
i.try_into().ok()
}
async fn load_pages_from_path<P: AsRef<Path>>(path: P) -> (usize, VecDeque<Arc<ChanPage>>) {
let mut pages_unordered = Vec::new();
for entry in std::fs::read_dir(path).expect("path exists") {
let Ok(entry) = entry else { continue };
let Ok(meta) = entry.metadata() else { continue };
let Ok(name) = entry.file_name().into_string() else {
continue;
};
let path = entry.path();
let Some(page_no) = Self::parse_page_no(name) else {
continue;
};
let header_size: u32 = size_of::<disk_chan_page::ChanPagePersist<[u8; 0]>>()
.try_into()
.expect("to be optimized out");
let file_size: u32 = meta.len().try_into().expect("to be optimized out");
let Ok(mut page) = (unsafe { ChanPage::new(path, file_size - header_size).await })
else {
continue;
};
unsafe {
page.reset_all_waiters();
page.reset_read_count_groups();
}
pages_unordered.push((page_no, Arc::new(page)));
}
pages_unordered.sort_by(|a, b| a.0.cmp(&b.0));
let count = pages_unordered.last().map(|(c, _)| *c + 1).unwrap_or(0);
let pages = pages_unordered.into_iter().map(|(_, page)| page).collect();
(count, pages)
}
pub(super) async fn new<P: AsRef<Path>>(
path: P,
page_size: u32,
max_pages: usize,
) -> Result<Self, std::io::Error> {
let _ = std::fs::create_dir_all(path.as_ref());
let lock = Self::get_lock(path.as_ref())?;
let (count, mut pages) = Self::load_pages_from_path(&path).await;
if pages.len() > max_pages {
for i in (count - (pages.len() - 1))..=(count - max_pages) {
let _ = pages.pop_front();
let num: u64 = i.try_into().expect("to be optimized out");
if let Err(e) = std::fs::remove_file(
path.as_ref()
.join(format!("data.{}.bin", base62::encode_fmt(num))),
) {
eprintln!("something went wrong with page cleanup... channel may be corrupted");
return Err(e);
}
}
}
let pages = RwLock::new(pages);
Ok(DiskChan {
path: path.as_ref().into(),
_lock: lock,
count: AtomicUsize::new(count),
max_pages,
page_size,
pages,
})
}
fn get_page_path(&self, num: usize) -> PathBuf {
let num: u64 = num.try_into().expect("to be optimized out");
self.path
.join(format!("data.{}.bin", base62::encode_fmt(num)))
}
pub async fn get_page(&self, page_no: usize) -> Result<(usize, Arc<ChanPage>), std::io::Error> {
if page_no >= self.count.load(Ordering::Relaxed) {
let mut pages = self.pages.write().await;
if page_no >= self.count.load(Ordering::Relaxed) {
let count = self.count.fetch_add(1, Ordering::Relaxed);
if count >= self.max_pages {
tokio::fs::remove_file(self.get_page_path(count - self.max_pages)).await?;
let _ = pages.pop_front();
}
let new_page =
unsafe { ChanPage::new(self.get_page_path(count), self.page_size).await? };
let new_page = Arc::new(new_page);
pages.push_back(new_page.clone());
pages.make_contiguous();
return Ok((count, new_page));
}
drop(pages);
}
let pages = self.pages.read().await;
let count = self.count.load(Ordering::Relaxed);
let min_page = match count.cmp(&self.max_pages) {
std::cmp::Ordering::Less => 0,
std::cmp::Ordering::Equal => 0,
std::cmp::Ordering::Greater => count - self.max_pages,
};
let page_no = min_page.max(page_no);
let (pages_slice, _) = pages.as_slices();
Ok((page_no, pages_slice[page_no - min_page].clone()))
}
}