use std::{
fmt::{self, Display, Formatter},
ops::Range,
};
use futures::prelude::*;
use opendal::{ErrorKind, Operator};
use crate::{Error, LinkId, Result};
/// The kind of file stored for a link.
///
/// The lowercase name of the variant is used as the file extension when a
/// storage path is built.
#[derive(Debug, Clone, Copy)]
pub enum Kind {
    /// A delta file (`.delta`).
    Delta,
    /// A snapshot file (`.snapshot`).
    Snapshot,
}
/// Handle used to open and create per-link files through an OpenDAL
/// [`Operator`].
#[derive(Clone)]
pub struct Storage {
// Optional prefix; when set, paths are built as `{base}/{id}.{kind}`,
// otherwise as `{id}.{kind}`.
base: Option<String>,
// Operator through which every stat, read and write is issued.
operator: Operator,
}
/// Positioned reader over a single stored file.
pub struct Reader {
// Byte position at which reads and `Writer::copy_from` start.
offset: usize,
// Size in bytes treated as the end of the file; initialised from the
// stat'ed content length and replaceable via `set_file_size`.
file_size: usize,
// Underlying OpenDAL reader that performs the ranged reads.
reader: opendal::Reader,
}
/// Writer for a single stored file that keeps count of the bytes written.
pub struct Writer {
// Underlying OpenDAL writer that receives all data.
writer: opendal::Writer,
// Running total of bytes written through this writer; starts at 0.
file_size: usize,
}
/// Crate-wide version number.
// NOTE(review): presumably the on-disk format version stored in file headers
// (it is a `u16`, matching `read_u16`/`write_u16`) — confirm against its users.
pub(crate) const VERSION: u16 = 0;
impl Storage {
pub fn new(operator: Operator) -> Self {
Self {
base: None,
operator,
}
}
pub fn new_in(base: impl Into<String>, operator: Operator) -> Self {
Self {
base: Some(base.into()),
operator,
}
}
pub(crate) async fn open(&self, id: LinkId, kind: Kind) -> Result<Reader> {
self.open_maybe(id, kind)
.await?
.ok_or_else(|| Error::DoesNotExist { link: id, kind })
}
pub(crate) async fn open_maybe(&self, id: LinkId, kind: Kind) -> Result<Option<Reader>> {
let path = self.path(id, kind);
let metadata = match self.operator.stat(&path).await {
Ok(metadata) => metadata,
Err(error) if error.kind() == ErrorKind::NotFound => return Ok(None),
Err(error) => return Err(error.into()),
};
let file_size = metadata.content_length() as usize;
let reader = self.operator.reader(&path).await?;
Ok(Some(Reader {
offset: 0,
file_size,
reader,
}))
}
pub(crate) async fn create(&self, id: LinkId, kind: Kind) -> Result<Writer> {
let path = self.path(id, kind);
let writer = self.operator.writer(&path).await?;
Ok(Writer {
writer,
file_size: 0,
})
}
#[inline]
fn path(&self, id: LinkId, kind: Kind) -> String {
if let Some(base) = &self.base {
format!("{base}/{id}.{kind}")
} else {
format!("{id}.{kind}")
}
}
}
impl Reader {
#[inline]
pub(crate) fn file_size(&self) -> usize {
self.file_size
}
#[inline]
pub async fn read_u16(&mut self) -> Result<u16> {
let bytes = self.read_bytes().await?;
Ok(u16::from_be_bytes(bytes))
}
#[inline]
pub async fn read_u32(&mut self) -> Result<u32> {
let bytes = self.read_bytes().await?;
Ok(u32::from_be_bytes(bytes))
}
#[inline]
pub async fn read_u64(&mut self) -> Result<u64> {
let bytes = self.read_bytes().await?;
Ok(u64::from_be_bytes(bytes))
}
#[inline]
pub async fn read_u128(&mut self) -> Result<u128> {
let bytes = self.read_bytes().await?;
Ok(u128::from_be_bytes(bytes))
}
pub async fn read_bytes<const N: usize>(&mut self) -> Result<[u8; N]> {
let mut bytes = [0u8; N];
let range = self.range(N)?;
self.reader
.read_into(&mut bytes.as_mut_slice(), range)
.await?;
Ok(bytes)
}
#[inline]
pub(crate) fn range(&self, len: usize) -> Result<Range<u64>> {
if self.offset + len > self.file_size {
return Err(Error::FileSize {
expected: self.offset + len,
got: self.file_size,
});
}
Ok((self.offset as u64)..(self.offset + len) as u64)
}
pub(crate) fn goto(&mut self, offset: isize) -> Result<()> {
if offset.is_positive() {
self.offset = offset as usize;
} else {
let Some(offset) = self.file_size.checked_add_signed(offset) else {
todo!()
};
self.offset = offset;
}
Ok(())
}
pub(crate) fn set_file_size(&mut self, file_size: usize) {
self.file_size = file_size;
}
}
impl Writer {
#[inline]
pub(crate) fn file_size(&self) -> usize {
self.file_size
}
pub(crate) async fn copy_from(&mut self, reader: Reader) -> Result<()> {
let range = (reader.offset as u64)..(reader.file_size as u64);
let mut stream = reader.reader.into_stream(range).await?;
while let Some(buffer) = stream.try_next().await? {
let num_bytes = buffer.len();
self.writer.write(buffer).await?;
self.file_size += num_bytes;
}
Ok(())
}
#[inline]
pub async fn write_u16(&mut self, value: u16) -> Result<()> {
let bytes = value.to_be_bytes();
self.write_bytes(bytes).await
}
#[inline]
pub async fn write_u32(&mut self, value: u32) -> Result<()> {
let bytes = value.to_be_bytes();
self.write_bytes(bytes).await
}
#[inline]
pub async fn write_u64(&mut self, value: u64) -> Result<()> {
let bytes = value.to_be_bytes();
self.write_bytes(bytes).await
}
#[inline]
pub async fn write_u128(&mut self, value: u128) -> Result<()> {
let bytes = value.to_be_bytes();
self.write_bytes(bytes).await
}
pub async fn write_bytes<const N: usize>(&mut self, bytes: [u8; N]) -> Result<()> {
self.writer.write_from(bytes.as_slice()).await?;
self.file_size += N;
Ok(())
}
#[inline]
pub(crate) async fn finish(mut self) -> Result<()> {
self.writer.close().await?;
Ok(())
}
}
/// Formats a [`Kind`] as its lowercase name (`delta` / `snapshot`).
impl Display for Kind {
    #[inline]
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let name = match self {
            Self::Delta => "delta",
            Self::Snapshot => "snapshot",
        };
        // Written verbatim: width/fill flags are not applied.
        f.write_str(name)
    }
}