use super::Header;
use crate::{Buf, BufferPool, Error, IoBufs, IoBufsMut};
use commonware_utils::hex;
use std::{
fs::File,
io::IoSlice,
os::{fd::AsRawFd, unix::fs::FileExt},
sync::Arc,
};
use tokio::task;
const IOVEC_BATCH_SIZE: usize = 32;
#[derive(Clone)]
pub struct Blob {
partition: String,
name: Vec<u8>,
file: Arc<File>,
pool: BufferPool,
}
impl Blob {
pub fn new(partition: String, name: &[u8], file: File, pool: BufferPool) -> Self {
Self {
partition,
name: name.into(),
file: Arc::new(file),
pool,
}
}
fn write_single_at(file: &File, offset: u64, buf: &[u8]) -> Result<(), Error> {
file.write_all_at(buf, offset)?;
Ok(())
}
fn write_vectored_at(file: &File, mut offset: u64, mut bufs: IoBufs) -> Result<(), Error> {
while bufs.has_remaining() {
let mut io_slices = [IoSlice::new(&[]); IOVEC_BATCH_SIZE];
let io_slices_len = bufs.chunks_vectored(&mut io_slices);
assert!(
io_slices_len > 0,
"chunks_vectored should produce at least one slice when bufs has remaining"
);
let ret = unsafe {
libc::pwritev(
file.as_raw_fd(),
io_slices.as_ptr().cast::<libc::iovec>(),
io_slices_len as i32,
offset.try_into().map_err(|_| Error::OffsetOverflow)?,
)
};
if ret < 0 {
let err = std::io::Error::last_os_error();
if err.kind() == std::io::ErrorKind::Interrupted {
continue;
}
return Err(err.into());
}
let bytes_written = ret as usize;
if bytes_written == 0 {
return Err(Error::WriteFailed);
}
bufs.advance(bytes_written);
offset += bytes_written as u64;
}
Ok(())
}
}
impl crate::Blob for Blob {
async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
self.read_at_buf(offset, len, self.pool.alloc(len)).await
}
async fn read_at_buf(
&self,
offset: u64,
len: usize,
bufs: impl Into<IoBufsMut> + Send,
) -> Result<IoBufsMut, Error> {
let mut bufs = bufs.into();
unsafe { bufs.set_len(len) };
let file = self.file.clone();
let pool = self.pool.clone();
let offset = offset
.checked_add(Header::SIZE_U64)
.ok_or(Error::OffsetOverflow)?;
task::spawn_blocking(move || {
if let Some(buf) = bufs.as_single_mut() {
file.read_exact_at(buf.as_mut(), offset)?;
} else {
let mut temp = unsafe { pool.alloc_len(len) };
file.read_exact_at(temp.as_mut(), offset)?;
bufs.copy_from_slice(temp.as_ref());
}
Ok(bufs)
})
.await
.map_err(|_| Error::ReadFailed)?
}
async fn write_at(&self, offset: u64, bufs: impl Into<IoBufs> + Send) -> Result<(), Error> {
let bufs = bufs.into();
let file = self.file.clone();
let offset = offset
.checked_add(Header::SIZE_U64)
.ok_or(Error::OffsetOverflow)?;
task::spawn_blocking(move || match bufs.try_into_single() {
Ok(buf) => Self::write_single_at(&file, offset, buf.as_ref()),
Err(bufs) => Self::write_vectored_at(&file, offset, bufs),
})
.await
.map_err(|_| Error::WriteFailed)?
}
async fn resize(&self, len: u64) -> Result<(), Error> {
let file = self.file.clone();
let len = len
.checked_add(Header::SIZE_U64)
.ok_or(Error::OffsetOverflow)?;
task::spawn_blocking(move || file.set_len(len))
.await
.map_err(|e| e.into())
.and_then(|r| r)
.map_err(|e| Error::BlobResizeFailed(self.partition.clone(), hex(&self.name), e))?;
Ok(())
}
async fn sync(&self) -> Result<(), Error> {
let file = self.file.clone();
task::spawn_blocking(move || file.sync_all())
.await
.map_err(|e| e.into())
.and_then(|r| r)
.map_err(|e| Error::BlobSyncFailed(self.partition.clone(), hex(&self.name), e))?;
Ok(())
}
}