use std::collections::VecDeque;
use std::fs::File;
#[cfg(not(unix))]
use std::io::Read;
#[cfg(unix)]
use std::os::unix::fs::FileExt;
use std::path::Path;
use anyhow::{Context, Result};
use crate::segment::segment_path;
/// Default capacity value exported by this module.
// NOTE(review): not referenced in the visible part of this file; presumably the
// value callers pass to `FdPool::with_capacity` — confirm at the call sites.
pub const DEFAULT_CAPACITY: usize = 16;
/// Pool of open segment files keyed by segment id, kept in recency order so
/// the least-recently used file is the one dropped when the pool is full.
#[derive(Debug)]
pub(crate) struct FdPool {
    /// Maximum number of files kept open. A value of 0 is handled specially by
    /// `touch_or_open`: the pool is cleared before each newly opened file is stored.
    capacity: usize,
    /// `(segment id, open file)` pairs. The back is the most recently used
    /// entry; eviction removes from the front.
    entries: VecDeque<(u32, File)>,
}
impl FdPool {
    /// Creates an empty pool that keeps at most `capacity` segment files open.
    ///
    /// Backing storage is reserved for at least one entry, because even a
    /// pool with capacity 0 stores the file it has just opened.
    pub fn with_capacity(capacity: usize) -> Self {
        let entries = VecDeque::with_capacity(capacity.max(1));
        Self { capacity, entries }
    }

    /// Number of files currently held by the pool (test-only helper).
    #[cfg(test)]
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns the open file for `segment_id`, opening it from `dir` on a miss.
    ///
    /// On a hit the entry is moved to the back of the queue, marking it as
    /// most recently used. On a miss the segment is opened first, so a failed
    /// open leaves the pool untouched; only then is room made for it.
    ///
    /// # Errors
    ///
    /// Returns an error if the segment file cannot be opened.
    fn touch_or_open(&mut self, dir: &Path, segment_id: u32) -> Result<&mut File> {
        let cached = self.entries.iter().position(|(id, _)| *id == segment_id);
        match cached {
            Some(index) => {
                // Hit: rotate the entry to the most-recently-used end.
                let hit = self.entries.remove(index).expect("position just found");
                self.entries.push_back(hit);
            }
            None => {
                let path = segment_path(dir, segment_id);
                let opened = File::open(&path)
                    .with_context(|| format!("datawal: open segment {}", path.display()))?;
                if self.capacity == 0 {
                    // Capacity 0: keep nothing but the file about to be returned.
                    self.entries.clear();
                } else if self.entries.len() >= self.capacity {
                    // Full: drop the least-recently-used entry at the front.
                    self.entries.pop_front();
                }
                self.entries.push_back((segment_id, opened));
            }
        }
        // Either arm leaves the requested file at the back of the queue.
        let (_, file) = self.entries.back_mut().expect("just pushed");
        Ok(file)
    }

    /// Reads exactly `len` bytes starting at byte `offset` of segment
    /// `segment_id` located in `dir`, going through the pooled file handle.
    ///
    /// On Unix the read is positional (`read_exact_at`); on other platforms
    /// the handle is seeked to `offset` and then read.
    ///
    /// # Errors
    ///
    /// Returns an error if the segment cannot be opened, if seeking fails
    /// (non-Unix only), or if `len` bytes cannot be read in full.
    pub fn read_at(
        &mut self,
        dir: &Path,
        segment_id: u32,
        offset: u64,
        len: usize,
    ) -> Result<Vec<u8>> {
        let handle = self.touch_or_open(dir, segment_id)?;
        let mut bytes = vec![0u8; len];

        #[cfg(unix)]
        {
            handle
                .read_exact_at(&mut bytes, offset)
                .with_context(|| {
                    format!(
                        "datawal: pread {len} bytes at offset {offset} of segment {segment_id:08}.dwal"
                    )
                })?;
        }

        #[cfg(not(unix))]
        {
            use std::io::{Seek, SeekFrom};

            handle.seek(SeekFrom::Start(offset)).with_context(|| {
                format!("datawal: seek to offset {offset} of segment {segment_id:08}.dwal")
            })?;
            handle.read_exact(&mut bytes).with_context(|| {
                format!(
                    "datawal: read {len} bytes at offset {offset} of segment {segment_id:08}.dwal"
                )
            })?;
        }

        Ok(bytes)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::tempdir;

    /// Writes `bytes` to the file for segment `id` under `dir` and syncs it.
    fn seed_segment(dir: &Path, id: u32, bytes: &[u8]) {
        let p = segment_path(dir, id);
        let mut f = File::create(&p).unwrap();
        f.write_all(bytes).unwrap();
        f.sync_all().unwrap();
    }

    /// Segment ids currently held by `pool`, least-recently used first.
    ///
    /// Reads the private `entries` field directly so tests can check *which*
    /// files are cached, not merely how many.
    fn cached_ids(pool: &FdPool) -> Vec<u32> {
        pool.entries.iter().map(|(id, _)| *id).collect()
    }

    #[test]
    fn read_at_returns_requested_bytes() {
        let dir = tempdir().unwrap();
        seed_segment(dir.path(), 1, b"hello world");
        let mut pool = FdPool::with_capacity(4);
        let got = pool.read_at(dir.path(), 1, 6, 5).unwrap();
        assert_eq!(got, b"world");
    }

    #[test]
    fn second_read_reuses_cached_fd() {
        let dir = tempdir().unwrap();
        seed_segment(dir.path(), 1, b"abcdefgh");
        let mut pool = FdPool::with_capacity(2);
        assert_eq!(pool.read_at(dir.path(), 1, 0, 4).unwrap(), b"abcd");
        assert_eq!(pool.read_at(dir.path(), 1, 4, 4).unwrap(), b"efgh");
        assert_eq!(pool.len(), 1);
        assert_eq!(cached_ids(&pool), [1]);
    }

    #[test]
    fn lru_evicts_oldest_at_capacity() {
        let dir = tempdir().unwrap();
        for id in 1..=3u32 {
            seed_segment(dir.path(), id, b"xxxx");
        }
        let mut pool = FdPool::with_capacity(2);
        pool.read_at(dir.path(), 1, 0, 1).unwrap();
        pool.read_at(dir.path(), 2, 0, 1).unwrap();
        assert_eq!(cached_ids(&pool), [1, 2]);
        // Segment 1 is the oldest entry, so it must be the one evicted.
        pool.read_at(dir.path(), 3, 0, 1).unwrap();
        assert_eq!(cached_ids(&pool), [2, 3]);
        // Re-reading the evicted segment reopens it and evicts segment 2.
        pool.read_at(dir.path(), 1, 0, 1).unwrap();
        assert_eq!(cached_ids(&pool), [3, 1]);
    }

    #[test]
    fn cache_hit_refreshes_recency() {
        let dir = tempdir().unwrap();
        for id in 1..=3u32 {
            seed_segment(dir.path(), id, b"xxxx");
        }
        let mut pool = FdPool::with_capacity(2);
        pool.read_at(dir.path(), 1, 0, 1).unwrap();
        pool.read_at(dir.path(), 2, 0, 1).unwrap();
        // Touching segment 1 makes segment 2 the least-recently used entry.
        pool.read_at(dir.path(), 1, 0, 1).unwrap();
        assert_eq!(cached_ids(&pool), [2, 1]);
        pool.read_at(dir.path(), 3, 0, 1).unwrap();
        assert_eq!(cached_ids(&pool), [1, 3]);
    }

    #[test]
    fn capacity_zero_disables_cache() {
        let dir = tempdir().unwrap();
        seed_segment(dir.path(), 1, b"abcd");
        let mut pool = FdPool::with_capacity(0);
        assert_eq!(pool.read_at(dir.path(), 1, 0, 4).unwrap(), b"abcd");
        assert_eq!(pool.read_at(dir.path(), 1, 0, 4).unwrap(), b"abcd");
        // The pool never grows beyond the single file it has just handed out.
        assert!(pool.len() <= 1);
    }

    #[test]
    fn missing_segment_returns_error() {
        let dir = tempdir().unwrap();
        let mut pool = FdPool::with_capacity(2);
        let err = pool.read_at(dir.path(), 7, 0, 1).unwrap_err();
        assert!(format!("{err:#}").contains("open segment"));
        // A failed open must not leave anything behind in the pool.
        assert_eq!(pool.len(), 0);
    }

    #[test]
    fn out_of_bounds_read_errors() {
        let dir = tempdir().unwrap();
        seed_segment(dir.path(), 1, b"abc");
        let mut pool = FdPool::with_capacity(2);
        let err = pool.read_at(dir.path(), 1, 0, 16).unwrap_err();
        // Render the chain once; "read" also matches the Unix "pread" wording.
        let message = format!("{err:#}").to_lowercase();
        assert!(
            message.contains("eof") || message.contains("read"),
            "unexpected error message: {message}"
        );
        // The underlying cause must be a short read, not some other I/O failure.
        let kind = err.downcast_ref::<std::io::Error>().map(|e| e.kind());
        assert_eq!(kind, Some(std::io::ErrorKind::UnexpectedEof));
    }
}