use std::io::SeekFrom;
use bytes::Bytes;
use futures::{TryStream, TryStreamExt};
use interface::Query;
use menmos_client::Meta;
use snafu::prelude::*;
use crate::util;
use crate::{ClientRC, FileMetadata};
use super::error::*;
fn make_file_meta(m: FileMetadata) -> Meta {
Meta {
fields: m.fields,
tags: m.tags,
}
}
#[derive(Clone)]
pub struct MenmosFile {
blob_id: String,
client: ClientRC,
offset: u64,
}
impl MenmosFile {
#[doc(hidden)]
pub async fn create(client: ClientRC, metadata: FileMetadata) -> Result<Self> {
let metadata = make_file_meta(metadata);
let blob_id = client
.create_empty(metadata)
.await
.context(FileCreateSnafu)?;
Ok(Self {
blob_id,
client,
offset: 0,
})
}
pub fn open(client: ClientRC, id: &str) -> Result<Self> {
Ok(Self {
blob_id: String::from(id),
client,
offset: 0,
})
}
pub fn id(&self) -> &str {
&self.blob_id
}
pub async fn write(&mut self, buf: &[u8]) -> Result<usize> {
let buf = Bytes::copy_from_slice(buf);
let buf_len = buf.len();
self.client
.write(&self.blob_id, self.offset, buf)
.await
.context(FileWriteSnafu)?;
self.offset += buf_len as u64;
Ok(buf_len)
}
pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
match pos {
SeekFrom::Current(offset) => {
let new_offset = self.offset as i64 + offset;
ensure!(new_offset >= 0, NegativeOffsetSnafu);
self.offset = new_offset as u64;
}
SeekFrom::Start(new_offset) => {
self.offset = new_offset;
}
SeekFrom::End(relative) => {
let metadata = util::get_meta(&self.client, &self.blob_id)
.await
.context(SeekMetaSnafu)?;
let end_offset = metadata.size as i64;
let new_offset = end_offset + relative;
ensure!(new_offset >= 0, NegativeOffsetSnafu);
self.offset = new_offset as u64;
}
}
Ok(self.offset)
}
pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let r = self
.client
.read_range(
&self.blob_id,
(self.offset, (self.offset + buf.len() as u64) - 1),
)
.await
.with_context(|_| FileReadSnafu {
blob_id: self.blob_id.clone(),
})?;
buf.copy_from_slice(&r);
self.offset += r.len() as u64;
Ok(r.len())
}
pub async fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
let metadata = util::get_meta(&self.client, &self.blob_id)
.await
.context(SeekMetaSnafu)?;
let out = self
.client
.read_range(&self.blob_id, (self.offset, metadata.size))
.await
.with_context(|_| FileReadSnafu {
blob_id: self.blob_id.clone(),
})?;
*buf = out;
self.offset += buf.len() as u64;
Ok(buf.len())
}
pub async fn read_to_string(&mut self, string: &mut String) -> Result<usize> {
let mut v = Vec::new();
self.read_to_end(&mut v).await?;
let buf_read = v.len();
*string = String::from_utf8(v).context(BufferEncodingSnafu)?;
Ok(buf_read)
}
pub fn list(&self) -> impl TryStream<Ok = Self, Error = FsError> + Unpin {
let query = Query::default()
.and_field("parent", &self.blob_id)
.with_from(0)
.with_size(50);
let client = self.client.clone();
Box::pin(
util::scroll_query(query, &client)
.map_err(|source| FsError::DirQueryError { source })
.and_then(move |hit| {
let client = client.clone();
async move {
let entry = MenmosFile::open(client, &hit.id)?;
Ok(entry)
}
}),
)
}
pub async fn is_empty(&self) -> Result<bool> {
let query = Query::default()
.and_field("parent", &self.blob_id)
.with_size(0);
let results = self.client.query(query).await.context(DirListSnafu)?;
Ok(results.total == 0)
}
}