use crate::io::{
bulk_io::{MergedBufferLimit, ReadAmplificationLimit, ReadManyArgs},
open_options::OpenOptions,
DmaStreamReaderBuilder,
DmaStreamWriter,
DmaStreamWriterBuilder,
IoVec,
ReadManyResult,
ReadResult,
ScheduledSource,
};
use futures_lite::{future::poll_fn, io::AsyncWrite, Stream};
use std::{
cell::Ref,
io,
path::Path,
pin::Pin,
rc::Rc,
task::{Context, Poll},
};
// Module-local shorthand: the crate-wide `Result` with its second type
// parameter fixed to `()`.
type Result<T> = crate::Result<T, ()>;
/// Builder that collects the settings used to either create a new file
/// (`build_sink`) or open an already existing one (`build_existing`).
#[derive(Debug)]
pub struct ImmutableFileBuilder<P: AsRef<Path>> {
    /// Forwarded as the write-behind depth when building a sink and as the
    /// read-ahead depth when opening an existing file.
    concurrency: usize,
    /// Buffer size handed to the stream writer / stream reader builders.
    buffer_size: usize,
    /// Forwarded to the stream writer's `with_sync_on_close_disabled`.
    flush_disabled: bool,
    /// When set, `build_sink` asks the file to pre-allocate this many bytes.
    pre_allocate: Option<u64>,
    /// When set, `build_sink` passes this value to the file's
    /// `hint_extent_size`.
    hint_extent_size: Option<usize>,
    /// Location of the file to create or open.
    path: P,
}
/// Writable, not-yet-sealed stage of an immutable file.
///
/// Produced by `ImmutableFileBuilder::build_sink`; data is written through
/// the `AsyncWrite` implementation and `seal` consumes the sink to obtain an
/// `ImmutableFile`.
#[derive(Debug)]
pub struct ImmutableFilePreSealSink {
// Stream writer that every write, flush, sync and the final seal is
// delegated to.
writer: DmaStreamWriter,
}
/// Read-only handle to a sealed (or pre-existing) file.
///
/// Cloning the handle clones the stored stream-reader builder and copies the
/// cached size.
#[derive(Debug, Clone)]
pub struct ImmutableFile {
// Pre-configured reader builder; its `file` field is also used for all
// direct file operations (random reads, rename, remove, close).
stream_builder: DmaStreamReaderBuilder,
// File size captured once when this handle was created.
size: u64,
}
impl<P> ImmutableFileBuilder<P>
where
P: AsRef<Path>,
{
#[must_use = "The builder must be built to be useful"]
pub fn new(fname: P) -> Self {
Self {
path: fname,
buffer_size: 128 << 10,
concurrency: 10,
flush_disabled: false,
pre_allocate: None,
hint_extent_size: None,
}
}
#[must_use = "The builder must be built to be useful"]
pub fn with_sequential_concurrency(mut self, concurrency: usize) -> Self {
self.concurrency = concurrency;
self
}
#[must_use = "The builder must be built to be useful"]
pub fn with_sync_on_close_disabled(mut self, flush_disabled: bool) -> Self {
self.flush_disabled = flush_disabled;
self
}
#[must_use = "The builder must be built to be useful"]
pub fn with_buffer_size(mut self, buffer_size: usize) -> Self {
self.buffer_size = buffer_size;
self
}
#[must_use = "The builder must be built to be useful"]
pub fn with_pre_allocation(mut self, size: Option<u64>) -> Self {
self.pre_allocate = size;
self
}
#[must_use = "The builder must be built to be useful"]
pub fn with_hint_extent_size(mut self, size: Option<usize>) -> Self {
self.hint_extent_size = size;
self
}
pub async fn build_sink(self) -> Result<ImmutableFilePreSealSink> {
let file = OpenOptions::new()
.read(true)
.write(true)
.create_new(true)
.dma_open(self.path)
.await?;
if let Some(size) = self.pre_allocate {
let _ = file.pre_allocate(size).await;
}
if let Some(size) = self.hint_extent_size {
let _ = file.hint_extent_size(size).await;
}
let writer = DmaStreamWriterBuilder::new(file)
.with_sync_on_close_disabled(self.flush_disabled)
.with_buffer_size(self.buffer_size)
.with_write_behind(self.concurrency)
.build();
Ok(ImmutableFilePreSealSink { writer })
}
pub async fn build_existing(self) -> Result<ImmutableFile> {
let file = Rc::new(
OpenOptions::new()
.read(true)
.write(false)
.dma_open(self.path)
.await?,
);
file.attach_scheduler();
let size = file.file_size().await?;
let stream_builder = DmaStreamReaderBuilder::from_rc(file)
.with_buffer_size(self.buffer_size)
.with_read_ahead(self.concurrency);
Ok(ImmutableFile {
stream_builder,
size,
})
}
}
impl ImmutableFilePreSealSink {
    /// Seals the sink, consuming it and returning a read-only handle.
    ///
    /// Drives the writer's `poll_seal` to completion, attaches the scheduler
    /// to the resulting file and caches the file size in the new handle.
    ///
    /// # Errors
    ///
    /// Returns the error reported by sealing the writer or by querying the
    /// file size.
    pub async fn seal(mut self) -> Result<ImmutableFile> {
        let sealing = poll_fn(|cx| self.writer.poll_seal(cx));
        let stream_builder = sealing.await?;

        let file = &stream_builder.file;
        file.attach_scheduler();
        let size = file.file_size().await?;

        Ok(ImmutableFile {
            stream_builder,
            size,
        })
    }

    /// Delegates to the writer's `flush_aligned` and returns its result.
    pub async fn flush_aligned(&self) -> Result<u64> {
        self.writer.flush_aligned().await
    }

    /// Delegates to the writer's `sync_aligned` and returns its result.
    pub async fn sync_aligned(&self) -> Result<u64> {
        self.writer.sync_aligned().await
    }

    /// Delegates to the writer's `sync` and returns its result.
    pub async fn sync(&self) -> Result<u64> {
        self.writer.sync().await
    }

    /// Current position as reported by the writer.
    pub fn current_pos(&self) -> u64 {
        self.writer.current_pos()
    }

    /// Current flushed position as reported by the writer.
    pub fn current_flushed_pos(&self) -> u64 {
        self.writer.current_flushed_pos()
    }
}
/// Every `AsyncWrite` operation is forwarded unchanged to the wrapped stream
/// writer.
impl AsyncWrite for ImmutableFilePreSealSink {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let sink = self.get_mut();
        Pin::new(&mut sink.writer).poll_write(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let sink = self.get_mut();
        Pin::new(&mut sink.writer).poll_flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let sink = self.get_mut();
        Pin::new(&mut sink.writer).poll_close(cx)
    }
}
impl ImmutableFile {
pub fn path(&self) -> Option<Ref<'_, Path>> {
self.stream_builder.file.path()
}
pub fn file_size(&self) -> u64 {
self.size
}
pub fn is_same(&self, other: &ImmutableFile) -> bool {
self.stream_builder.file.is_same(&other.stream_builder.file)
}
pub async fn read_at(&self, pos: u64, size: usize) -> Result<ReadResult> {
self.stream_builder.file.read_at(pos, size).await
}
pub fn read_many<V, S>(
&self,
iovs: S,
buffer_limit: MergedBufferLimit,
read_amp_limit: ReadAmplificationLimit,
) -> ReadManyResult<V, impl Stream<Item = (ScheduledSource, ReadManyArgs<V>)>>
where
V: IoVec + Unpin,
S: Stream<Item = V> + Unpin,
{
self.stream_builder
.file
.read_many(iovs, buffer_limit, read_amp_limit)
}
pub async fn rename<P: AsRef<Path>>(&self, new_path: P) -> Result<()> {
self.stream_builder.file.rename(new_path).await
}
pub async fn remove(&self) -> Result<()> {
self.stream_builder.file.remove().await
}
pub async fn close(self) -> Result<()> {
self.stream_builder.file.close_rc().await?;
Ok(())
}
pub fn stream_reader(&self) -> DmaStreamReaderBuilder {
self.stream_builder.clone()
}
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::{enclose, io::DmaFile, test_utils::make_tmp_test_directory};
    use futures::{AsyncReadExt, AsyncWriteExt};
    use futures_lite::stream::{self, StreamExt};

    // Declares a test that runs `$code` inside the test executor with `$dir`
    // bound to a fresh temporary directory. The `panic:` form additionally
    // marks the test as `#[should_panic]`.
    macro_rules! immutable_file_test {
        ( $name:ident, $dir:ident, $code:block) => {
            #[test]
            fn $name() {
                let tmpdir = make_tmp_test_directory(stringify!($name));
                let $dir = tmpdir.path.clone();
                test_executor!(async move { $code });
            }
        };

        ( panic: $name:ident, $dir:ident, $code:block) => {
            #[test]
            #[should_panic]
            fn $name() {
                let tmpdir = make_tmp_test_directory(stringify!($name));
                let $dir = tmpdir.path.clone();
                test_executor!(async move { $code });
            }
        };
    }

    // Building a sink over a path that already exists must fail, so the
    // `unwrap` is expected to panic.
    immutable_file_test!(panic: fail_on_already_existent, path, {
        let fname = path.join("testfile");
        DmaFile::create(&fname).await.unwrap();

        ImmutableFileBuilder::new(fname).build_sink().await.unwrap();
    });

    // Opening a path that was never created must return an error. The file is
    // deliberately NOT created here, and the error is asserted directly with
    // `unwrap_err` instead of relying on `#[should_panic]`, so the test fails
    // if `build_existing` ever succeeds on a missing file.
    immutable_file_test!(fail_reader_on_non_existent, path, {
        let fname = path.join("testfile");

        ImmutableFileBuilder::new(fname)
            .build_existing()
            .await
            .unwrap_err();
    });

    // Data written to the sink is readable through a stream reader after
    // sealing.
    immutable_file_test!(seal_and_stream, path, {
        let fname = path.join("testfile");
        let mut immutable = ImmutableFileBuilder::new(fname).build_sink().await.unwrap();
        immutable.write(&[0, 1, 2, 3, 4, 5]).await.unwrap();
        let stream = immutable.seal().await.unwrap();

        let mut reader = stream.stream_reader().build();
        let mut buf = [0u8; 128];
        let x = reader.read(&mut buf).await.unwrap();
        assert_eq!(x, 6);

        reader.close().await.unwrap();
        stream.close().await.unwrap();
    });

    // The sink reports the written position immediately, while the flushed
    // position stays at zero until data is actually flushed.
    immutable_file_test!(stream_pos, path, {
        let fname = path.join("testfile");
        let mut immutable = ImmutableFileBuilder::new(fname).build_sink().await.unwrap();
        assert_eq!(immutable.current_pos(), 0);
        assert_eq!(immutable.current_flushed_pos(), 0);

        immutable.write(&[0, 1, 2, 3, 4, 5]).await.unwrap();
        assert_eq!(immutable.current_pos(), 6);
        assert_eq!(immutable.current_flushed_pos(), 0);

        immutable.write(&[6, 7, 8, 9]).await.unwrap();
        let stream = immutable.seal().await.unwrap();

        let mut reader = stream.stream_reader().build();
        let mut buf = [0u8; 128];
        let x = reader.read(&mut buf).await.unwrap();
        assert_eq!(x, 10);

        reader.close().await.unwrap();
        stream.close().await.unwrap();
    });

    // Clones of a sealed handle can serve random reads from separate tasks.
    immutable_file_test!(seal_and_random, path, {
        let fname = path.join("testfile");
        let mut immutable = ImmutableFileBuilder::new(fname).build_sink().await.unwrap();
        immutable.write(&[0, 1, 2, 3, 4, 5]).await.unwrap();
        let stream = immutable.seal().await.unwrap();

        let task1 = crate::spawn_local(enclose! { (stream) async move {
            let buf = stream.read_at(0, 6).await.unwrap();
            assert_eq!(&*buf, &[0, 1, 2, 3, 4, 5]);
        }});

        let task2 = crate::spawn_local(enclose! { (stream) async move {
            let buf = stream.read_at(0, 6).await.unwrap();
            assert_eq!(&*buf, &[0, 1, 2, 3, 4, 5]);
        }});

        assert_eq!(
            2,
            stream::iter(vec![task1, task2]).then(|x| x).count().await
        );

        stream.close().await.unwrap();
    });

    // `read_many` yields one buffer of the requested length per io vector.
    immutable_file_test!(seal_ready_many, path, {
        let fname = path.join("testfile");
        let mut immutable = ImmutableFileBuilder::new(fname).build_sink().await.unwrap();
        immutable.write(&[0, 1, 2, 3, 4, 5]).await.unwrap();
        let stream = immutable.seal().await.unwrap();

        {
            let iovs = vec![(0, 1), (3, 1)];
            let mut bufs = stream.read_many(
                stream::iter(iovs),
                MergedBufferLimit::NoMerging,
                ReadAmplificationLimit::NoAmplification,
            );
            let next_buffer = bufs.next().await.unwrap();
            assert_eq!(next_buffer.unwrap().1.len(), 1);
            let next_buffer = bufs.next().await.unwrap();
            assert_eq!(next_buffer.unwrap().1.len(), 1);
        } // `bufs` is dropped here, before the handle is consumed by `close`.

        stream.close().await.unwrap();
    });
}