use std::{
cmp::Ordering,
io::{Error, ErrorKind, SeekFrom},
pin::Pin,
task::{Context, Poll},
};
use aliasable::boxed::AliasableBox;
use async_stream::try_stream;
use bytes::Bytes;
use futures::{ready, stream::BoxStream, Future, StreamExt};
use libipld::Cid;
use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
use crate::{
IpldStore, Layout, LayoutError, LayoutSeekable, MerkleNode, SeekableReader, StoreError,
StoreResult,
};
/// A layout that keeps every chunk as a direct child of a single root node.
///
/// The type carries no state; all behavior lives in its `Layout` and `LayoutSeekable`
/// implementations.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct FlatLayout {}
/// Reader over the content described by a flat `MerkleNode`, fetching one child block at a
/// time from the store.
///
/// NOTE: field order matters. `get_raw_block_fn` may borrow from `store` and `node` (its
/// lifetime is erased to `'static` where it is built), and Rust drops fields in declaration
/// order, so the future must stay declared before the two boxes it borrows from.
pub struct FlatLayoutReader<S>
where
S: IpldStore,
{
/// Absolute read position in the content; advanced by reads, set by seeks, and reported by
/// `poll_complete`.
byte_cursor: u64,
/// Index into `node.children` of the chunk currently being read.
chunk_index: u64,
/// Sum of the recorded lengths of the children before `chunk_index`, i.e. the byte offset at
/// which the current chunk starts.
chunk_distance: u64,
/// Pending future that yields the next bytes to hand out from `poll_read`.
get_raw_block_fn: Pin<Box<dyn Future<Output = StoreResult<Bytes>> + Send + Sync + 'static>>,
/// Store that child blocks are fetched from; boxed so the pending future can borrow it.
store: AliasableBox<S>,
/// Root node listing the children as `(cid, length)` pairs; boxed for the same reason as
/// `store`.
node: AliasableBox<MerkleNode>,
}
impl FlatLayout {
pub fn new() -> Self {
FlatLayout {}
}
}
impl<S> FlatLayoutReader<S>
where
    S: IpldStore + Sync,
{
    /// Builds a reader positioned at byte 0 of `node`, with the fetch of the first child block
    /// already queued.
    ///
    /// # Errors
    ///
    /// Returns `LayoutError::NoLeafBlock` (converted into a `StoreError`) when `node` has no
    /// children.
    fn new(node: MerkleNode, store: S) -> StoreResult<Self> {
        // Both values live behind aliasable boxes so the pending future can keep pointing at
        // them after the boxes are moved into the reader.
        let node = AliasableBox::from_unique(Box::new(node));
        let store = AliasableBox::from_unique(Box::new(store));

        let get_raw_block_fn: Pin<Box<dyn Future<Output = StoreResult<Bytes>> + Send + Sync>> =
            Box::pin(
                store.get_raw_block(
                    node.children
                        .first()
                        .map(|(cid, _)| cid)
                        .ok_or_else(|| StoreError::from(LayoutError::NoLeafBlock))?,
                ),
            );

        // SAFETY: the future only borrows data reached through `store` and `node`. Both boxes
        // are moved into the returned reader, are never reassigned, and are declared after
        // `get_raw_block_fn` in the struct, so they are dropped after the future. This relies
        // on `AliasableBox` keeping its heap allocation in place when the box is moved.
        let get_raw_block_fn: Pin<
            Box<dyn Future<Output = StoreResult<Bytes>> + Send + Sync + 'static>,
        > = unsafe { std::mem::transmute(get_raw_block_fn) };

        Ok(FlatLayoutReader {
            byte_cursor: 0,
            chunk_index: 0,
            chunk_distance: 0,
            get_raw_block_fn,
            node,
            store,
        })
    }

    /// Replaces the pending future with one that fetches the chunk at `chunk_index` and yields
    /// its bytes starting at the current cursor position inside that chunk.
    ///
    /// If `chunk_index` is past the last child, the new future resolves to
    /// `LayoutError::NoLeafBlock`.
    fn fix_future(&mut self) {
        // Capture only the boxed referents and plain copies of the cursor state. The future must
        // not point at the reader itself: the reader can be moved and is mutated while the
        // future is stored inside it.
        let store: &S = &*self.store;
        let node: &MerkleNode = &*self.node;
        let chunk_index = self.chunk_index as usize;
        let chunk_offset = (self.byte_cursor - self.chunk_distance) as usize;

        let get_raw_block_fn: Pin<Box<dyn Future<Output = StoreResult<Bytes>> + Send + Sync>> =
            Box::pin(async move {
                let bytes = store
                    .get_raw_block(
                        node.children
                            .get(chunk_index)
                            .map(|(cid, _)| cid)
                            .ok_or_else(|| StoreError::from(LayoutError::NoLeafBlock))?,
                    )
                    .await?;

                // Skip the part of the chunk that lies before the cursor.
                let bytes = Bytes::copy_from_slice(&bytes[chunk_offset..]);

                Ok(bytes)
            });

        // SAFETY: the future borrows only the contents of `self.store` and `self.node`. Those
        // boxes are never reassigned and are declared after `get_raw_block_fn` in the struct, so
        // they outlive every future stored in that field. This relies on `AliasableBox` keeping
        // its heap allocation in place when the reader is moved.
        let get_raw_block_fn: Pin<
            Box<dyn Future<Output = StoreResult<Bytes>> + Send + Sync + 'static>,
        > = unsafe { std::mem::transmute(get_raw_block_fn) };

        self.get_raw_block_fn = get_raw_block_fn;
    }

    /// Advances the reader after `consumed` bytes were handed to the caller.
    ///
    /// `left_over` holds the bytes of the current chunk that did not fit in the caller's buffer;
    /// when it is non-empty those bytes are served first on the next read.
    fn read_update(&mut self, left_over: &[u8], consumed: u64) -> StoreResult<()> {
        self.byte_cursor += consumed;

        // Part of the current chunk is still unread: serve it next without touching the store.
        if !left_over.is_empty() {
            let bytes = Bytes::copy_from_slice(left_over);
            let get_raw_block_fn = Box::pin(async { Ok(bytes) });
            self.get_raw_block_fn = get_raw_block_fn;
            return Ok(());
        }

        // Everything has been read: the next read yields no bytes.
        if self.byte_cursor >= self.node.size as u64 {
            let get_raw_block_fn = Box::pin(async { Ok(Bytes::new()) });
            self.get_raw_block_fn = get_raw_block_fn;
            return Ok(());
        }

        // The current chunk is exhausted: step to the next one and queue its fetch.
        self.chunk_distance += self.node.children[self.chunk_index as usize].1 as u64;
        self.chunk_index += 1;
        self.fix_future();

        Ok(())
    }

    /// Moves the reader to the absolute position `byte_cursor` and queues the fetch of the chunk
    /// containing it.
    ///
    /// `chunk_distance` is kept equal to the sum of the recorded lengths of the children before
    /// `chunk_index`, whichever direction the seek goes.
    fn seek_update(&mut self, byte_cursor: u64) -> StoreResult<()> {
        self.byte_cursor = byte_cursor;

        // At or past the end: the next read yields no bytes.
        if self.byte_cursor >= self.node.size as u64 {
            let get_raw_block_fn = Box::pin(async { Ok(Bytes::new()) });
            self.get_raw_block_fn = get_raw_block_fn;
            return Ok(());
        }

        loop {
            match self.chunk_distance.cmp(&byte_cursor) {
                Ordering::Less => {
                    // Stop once the target falls inside the current chunk.
                    if self.chunk_distance + self.node.children[self.chunk_index as usize].1 as u64
                        > byte_cursor
                    {
                        break;
                    }

                    self.chunk_distance += self.node.children[self.chunk_index as usize].1 as u64;
                    self.chunk_index += 1;

                    continue;
                }
                Ordering::Greater => {
                    // Step back one chunk. The current chunk starts after the *previous* chunk,
                    // so it is the previous chunk's length that has to be subtracted; using the
                    // current chunk's length is wrong whenever the two lengths differ.
                    self.chunk_index -= 1;
                    self.chunk_distance -= self.node.children[self.chunk_index as usize].1 as u64;

                    continue;
                }
                _ => break,
            }
        }

        self.fix_future();

        Ok(())
    }
}
impl Layout for FlatLayout {
    /// Stores every chunk of `stream` as a raw block and finally stores one node listing all
    /// chunks as `(cid, length)` children.
    ///
    /// The returned stream yields the CID of each chunk as it is stored, followed by the CID of
    /// the node, which is therefore the last item.
    ///
    /// # Errors
    ///
    /// An error produced by the input stream or by the store is yielded by the returned stream,
    /// which then ends without storing the node.
    async fn organize<'a>(
        &self,
        mut stream: BoxStream<'a, StoreResult<Bytes>>,
        store: impl IpldStore + Send + 'a,
    ) -> StoreResult<BoxStream<'a, StoreResult<Cid>>> {
        let s = try_stream! {
            let mut children = Vec::new();

            while let Some(chunk) = stream.next().await {
                // Propagate input errors instead of treating them as the end of the input,
                // which would publish a node for truncated content.
                let chunk = chunk?;
                let len = chunk.len();
                let cid = store.put_raw_block(chunk).await?;
                children.push((cid, len));
                yield cid;
            }

            let node = MerkleNode::new(children);
            let cid = store.put_node(&node).await?;

            yield cid;
        };

        Ok(Box::pin(s))
    }

    /// Loads the node stored under `cid` and returns a reader over the content of its children.
    ///
    /// # Errors
    ///
    /// Fails when the node cannot be loaded or when it has no children.
    async fn retrieve<'a>(
        &self,
        cid: &Cid,
        store: impl IpldStore + Send + Sync + 'a,
    ) -> StoreResult<Pin<Box<dyn AsyncRead + Send + Sync + 'a>>> {
        let node = store.get_node(cid).await?;
        let reader = FlatLayoutReader::new(node, store)?;

        Ok(Box::pin(reader))
    }
}
impl LayoutSeekable for FlatLayout {
    /// Loads the node stored under `cid` and returns a seekable reader over the content of its
    /// children.
    async fn retrieve_seekable<'a>(
        &self,
        cid: &'a Cid,
        store: impl IpldStore + Send + Sync + 'a,
    ) -> StoreResult<Pin<Box<dyn SeekableReader + Send + 'a>>> {
        let root = store.get_node(cid).await?;
        let seekable: Pin<Box<dyn SeekableReader + Send + 'a>> =
            Box::pin(FlatLayoutReader::new(root, store)?);

        Ok(seekable)
    }
}
impl<S> AsyncRead for FlatLayoutReader<S>
where
    S: IpldStore + Sync,
{
    /// Copies the next available bytes into `buf`.
    ///
    /// Completing without adding bytes to a buffer that still has room means end of content, so
    /// a chunk that turns out to be empty before the end is skipped rather than reported.
    ///
    /// # Errors
    ///
    /// A failed block fetch is returned as an `ErrorKind::Other` error. The fetch is queued
    /// again, so a later call retries from the same position.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        loop {
            let bytes = match ready!(self.get_raw_block_fn.as_mut().poll(cx)) {
                Ok(bytes) => bytes,
                Err(e) => {
                    // The future has completed and must not be polled again; re-arm the fetch
                    // for the unchanged cursor position before reporting the error.
                    self.fix_future();
                    return Poll::Ready(Err(Error::new(ErrorKind::Other, e)));
                }
            };

            // Hand out as much as fits and keep the rest for the next call.
            let (taken, left_over) = if bytes.len() > buf.remaining() {
                bytes.split_at(buf.remaining())
            } else {
                (&bytes[..], &[][..])
            };

            buf.put_slice(taken);

            self.read_update(left_over, taken.len() as u64)
                .map_err(|e| Error::new(ErrorKind::Other, e))?;

            // Returning zero bytes is only correct at the end of the content or when the caller
            // gave no room; otherwise move on to the next chunk.
            let at_end = self.byte_cursor >= self.node.size as u64;
            if !taken.is_empty() || at_end || buf.remaining() == 0 {
                return Poll::Ready(Ok(()));
            }
        }
    }
}
impl<S> AsyncSeek for FlatLayoutReader<S>
where
    S: IpldStore + Sync,
{
    /// Moves the read position to `position`.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::InvalidInput` when the resulting position is negative or not strictly
    /// less than the content size. Seeking exactly to the end is therefore rejected as well.
    fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> {
        let size = self.node.size as u64;

        // Checked arithmetic: an extreme offset must produce an error, not an overflow.
        let (target, out_of_bounds) = match position {
            SeekFrom::Start(offset) => (Some(offset), "Seek from start position out of bounds"),
            SeekFrom::Current(offset) => (
                self.byte_cursor.checked_add_signed(offset),
                "Seek from current position out of bounds",
            ),
            SeekFrom::End(offset) => (
                size.checked_add_signed(offset),
                "Seek from end position out of bounds",
            ),
        };

        let byte_cursor = target
            .filter(|&cursor| cursor < size)
            .ok_or_else(|| Error::new(ErrorKind::InvalidInput, out_of_bounds))?;

        self.seek_update(byte_cursor)
            .map_err(|e| Error::new(ErrorKind::Other, e))?;

        Ok(())
    }

    /// Reports the current position; `start_seek` does all the work, so this is always ready.
    fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
        Poll::Ready(Ok(self.byte_cursor))
    }
}
#[cfg(test)]
mod tests {
    use futures::TryStreamExt;
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    use crate::MemoryStore;

    use super::*;

    /// Organizes the fixture chunks, then reads them back in one go and in 5-byte steps.
    #[tokio::test]
    async fn test_flat_dag_layout_organize_and_retrieve() -> anyhow::Result<()> {
        let store = MemoryStore::default();
        let (data, _, chunk_stream) = fixtures::data_and_chunk_stream();
        let layout = FlatLayout::default();

        // The last CID yielded by `organize` is the root node.
        let cids = layout
            .organize(chunk_stream, store.clone())
            .await?
            .try_collect::<Vec<_>>()
            .await?;
        let root_cid = cids.last().unwrap();

        // Read everything at once.
        let mut reader = layout.retrieve(root_cid, store.clone()).await?;
        let mut whole = Vec::new();
        reader.read_to_end(&mut whole).await?;
        assert_eq!(whole, data);

        // Read through a small buffer until a read yields no bytes.
        let mut reader = layout.retrieve(root_cid, store).await?;
        let mut collected: Vec<u8> = vec![];
        let mut window = vec![0; 5];
        loop {
            let filled = reader.read(&mut window).await?;
            if filled == 0 {
                break;
            }

            collected.extend(&window[..filled]);
        }
        assert_eq!(collected, data);

        Ok(())
    }

    /// Exercises seeking relative to the start, the current position and the end, including
    /// out-of-bounds targets.
    #[tokio::test]
    async fn test_flat_dag_layout_seek() -> anyhow::Result<()> {
        let store = MemoryStore::default();
        let (_, chunks, chunk_stream) = fixtures::data_and_chunk_stream();
        let layout = FlatLayout::default();

        let cids = layout
            .organize(chunk_stream, store.clone())
            .await?
            .try_collect::<Vec<_>>()
            .await?;
        let root_cid = cids.last().unwrap();

        let mut reader = layout.retrieve_seekable(root_cid, store).await?;

        // Plain read from the start.
        let mut buf = vec![0; 5];
        reader.read(&mut buf).await?;
        assert_eq!(buf, chunks[0]);

        // Skip one chunk forward.
        let mut buf = vec![0; 5];
        reader.seek(SeekFrom::Current(5)).await?;
        reader.read(&mut buf).await?;
        assert_eq!(buf, chunks[2]);

        // Land in the middle of a chunk.
        let mut buf = vec![0; 3];
        reader.seek(SeekFrom::Current(3)).await?;
        reader.read(&mut buf).await?;
        assert_eq!(buf, chunks[3][3..]);

        // Relative to the end.
        let mut buf = vec![0; 5];
        reader.seek(SeekFrom::End(-5)).await?;
        reader.read(&mut buf).await?;
        assert_eq!(buf, chunks[9]);

        // Back towards the start.
        let mut buf = vec![0; 5];
        reader.seek(SeekFrom::Start(5)).await?;
        reader.read(&mut buf).await?;
        assert_eq!(buf, chunks[1]);

        // Every out-of-bounds target is rejected.
        for position in [
            SeekFrom::End(5),
            SeekFrom::End(0),
            SeekFrom::Start(100),
            SeekFrom::Current(100),
            SeekFrom::Current(-100),
        ] {
            let result = reader.seek(position).await;
            assert!(result.is_err());
        }

        Ok(())
    }
}
#[cfg(test)]
mod fixtures {
    use futures::{stream, Stream};

    use super::*;

    /// Returns the sample text, the same text split into ten chunks, and a stream that yields
    /// those chunks as successful results.
    pub(super) fn data_and_chunk_stream() -> (
        [u8; 56],
        Vec<Bytes>,
        Pin<Box<dyn Stream<Item = StoreResult<Bytes>> + Send + 'static>>,
    ) {
        let data = *b"Lorem ipsum dolor sit amet, consectetur adipiscing elit.";

        let pieces: [&'static str; 10] = [
            "Lorem", " ipsu", "m dol", "or sit", " amet,", " conse", "ctetur", " adipi", "scing ",
            "elit.",
        ];
        let chunks: Vec<Bytes> = pieces.iter().map(|piece| Bytes::from(*piece)).collect();

        let items: Vec<_> = chunks
            .iter()
            .map(|chunk| crate::Ok(chunk.clone()))
            .collect();
        let chunk_stream = Box::pin(stream::iter(items));

        (data, chunks, chunk_stream)
    }
}