use anyhow::Result;
use futures::{Stream, StreamExt, TryStreamExt};
use opendal::Buffer;
use std::mem;
use std::pin::pin;
use crate::Fs;
/// Chunk size in bytes (8 MiB) that `File::sink` uses when regrouping an
/// incoming stream of buffers into chunks.
const DEFAULT_CHUNK_SIZE: usize = 8 * 1024 * 1024;
/// Handle to a file stored as an ordered list of chunks.
///
/// The handle only tracks the path and the chunk ids in memory; the chunk
/// data itself is written and read through the `Fs` handle it holds.
#[derive(Debug, Clone)]
pub struct File {
// Filesystem handle used to write chunks, read chunks and commit the file.
fs: Fs,
// Path handed to `Fs::commit_file` when the file is committed.
pub(crate) path: String,
// Chunk ids in the order they were written; `stream` reads them back in
// this same order.
pub(crate) chunks: Vec<String>,
}
impl File {
    /// Creates a handle for `path` with an empty chunk list.
    pub(crate) fn new(fs: Fs, path: String) -> Self {
        Self::with_chunks(fs, path, Vec::new())
    }

    /// Creates a handle for `path` that starts out with the given chunk ids.
    pub(crate) fn with_chunks(fs: Fs, path: String, chunks: Vec<String>) -> Self {
        Self { fs, path, chunks }
    }

    /// Consumes the handle and returns its `(path, chunks)` pair.
    pub(crate) fn into_parts(self) -> (String, Vec<String>) {
        (self.path, self.chunks)
    }

    /// Returns the path of this file.
    pub fn path(&self) -> &str {
        &self.path
    }

    /// Writes `bs` as one chunk and appends the resulting chunk id to this
    /// handle's chunk list.
    ///
    /// Only the in-memory chunk list of this handle is updated here; the list
    /// is handed to the filesystem by [`File::commit`].
    ///
    /// # Errors
    ///
    /// Returns the error from `Fs::write_chunk`. The chunk list is left
    /// unchanged in that case.
    pub async fn write(&mut self, bs: Buffer) -> Result<()> {
        let chunk_id = self.fs.write_chunk(bs).await?;
        self.chunks.push(chunk_id);
        Ok(())
    }

    /// Drains `stream` and writes its content as chunks of exactly
    /// `DEFAULT_CHUNK_SIZE` bytes, followed by one shorter chunk for any
    /// remaining bytes.
    ///
    /// Incoming buffers may be of any size: small buffers are accumulated
    /// until a full chunk is available, and large buffers are split across as
    /// many chunks as needed. An empty stream writes nothing.
    ///
    /// # Errors
    ///
    /// Returns the first error yielded by `stream` or by [`File::write`].
    /// Chunks written before the error stay in this handle's chunk list.
    pub async fn sink(&mut self, stream: impl Stream<Item = Result<Buffer>>) -> Result<()> {
        let mut stream = pin!(stream);
        // Buffers collected for the chunk that is currently being assembled.
        let mut pending: Vec<Buffer> = Vec::new();
        // Number of bytes held in `pending`. It is always strictly below
        // `DEFAULT_CHUNK_SIZE` at the top of the outer loop, so the
        // subtraction below cannot underflow.
        let mut pending_len: usize = 0;

        while let Some(bs) = stream.next().await {
            let mut bs = bs?;

            // Carve off full chunks for as long as the pending bytes plus the
            // rest of this buffer can fill one. `take` is at least 1, so `bs`
            // shrinks on every pass and the loop terminates.
            while pending_len + bs.len() >= DEFAULT_CHUNK_SIZE {
                let take = DEFAULT_CHUNK_SIZE - pending_len;
                pending.push(bs.slice(0..take));
                bs = bs.slice(take..);
                self.write(Buffer::from_iter(
                    mem::take(&mut pending).into_iter().flatten(),
                ))
                .await?;
                // Everything buffered so far has been flushed.
                pending_len = 0;
            }

            // Whatever is left of this buffer is smaller than a full chunk.
            if !bs.is_empty() {
                pending_len += bs.len();
                pending.push(bs);
            }
        }

        // Flush the trailing partial chunk, if any.
        if pending_len > 0 {
            self.write(Buffer::from_iter(pending.into_iter().flatten()))
                .await?;
        }
        Ok(())
    }

    /// Hands the path and a copy of the current chunk list to
    /// `Fs::commit_file`.
    ///
    /// # Errors
    ///
    /// Returns the error from `Fs::commit_file`.
    pub async fn commit(&mut self) -> Result<()> {
        self.fs.commit_file(&self.path, self.chunks.clone()).await?;
        Ok(())
    }

    /// Reads every chunk of the file and returns the content as one buffer.
    ///
    /// # Errors
    ///
    /// Returns the first error produced while reading a chunk.
    pub async fn read(&self) -> Result<Buffer> {
        let buffers: Vec<_> = self.stream().await?.try_collect().await?;
        Ok(Buffer::from_iter(buffers.into_iter().flatten()))
    }

    /// Returns a stream that yields the content of each chunk, in chunk-list
    /// order, one `Fs::read_chunk` call per item.
    ///
    /// The chunk list is copied when this method is called, so chunks written
    /// afterwards are not part of the returned stream. This method itself
    /// always returns `Ok`; read failures surface as `Err` items of the
    /// stream.
    pub async fn stream(&self) -> Result<impl Stream<Item = Result<Buffer>>> {
        let fs = self.fs.clone();
        let stream = futures::stream::iter(self.chunks.clone()).then(move |v| {
            // Each read future needs its own handle because it may outlive
            // this closure call.
            let fs = fs.clone();
            async move { fs.read_chunk(&v).await }
        });
        Ok(stream)
    }
}