use std::io::{Error, ErrorKind, Result};
use async_trait::async_trait;
use byteorder::{ByteOrder, LittleEndian};
use prost::bytes::Bytes;
use prost::Message;
use tokio::io::{AsyncWrite, AsyncWriteExt};
pub mod commit;
pub(crate) mod deletion;
pub(crate) mod exec;
pub mod object_reader;
pub mod object_store;
pub mod object_writer;
pub(crate) mod reader;
mod writer;
use crate::format::{ProtoStruct, INDEX_MAGIC, MAGIC};
pub use self::object_store::ObjectStore;
pub use deletion::deletion_file_path;
pub use lance_core::io::local;
pub use lance_core::io::RecordBatchStream;
pub use reader::{read_manifest, FileReader};
pub use writer::*;
#[async_trait]
pub trait AsyncWriteProtoExt {
async fn write_footer(&mut self, offset: u64) -> Result<()>;
}
#[async_trait]
impl<T: AsyncWrite + Unpin + std::marker::Send> AsyncWriteProtoExt for T {
async fn write_footer(&mut self, offset: u64) -> Result<()> {
self.write_u64_le(offset).await?;
self.write_all(INDEX_MAGIC).await?;
Ok(())
}
}
pub fn read_metadata_offset(bytes: &Bytes) -> Result<usize> {
let len = bytes.len();
if len < 16 {
return Err(Error::new(
ErrorKind::Interrupted,
"does not have sufficient data",
));
}
let offset_bytes = bytes.slice(len - 16..len - 8);
Ok(LittleEndian::read_u64(offset_bytes.as_ref()) as usize)
}
pub fn read_message_from_buf<M: Message + Default>(buf: &Bytes) -> Result<M> {
let msg_len = LittleEndian::read_u32(buf) as usize;
Ok(M::decode(&buf[4..4 + msg_len])?)
}
pub fn read_struct_from_buf<M: Message + Default, T: ProtoStruct<Proto = M> + From<M>>(
buf: &Bytes,
) -> Result<T> {
let msg: M = read_message_from_buf(buf)?;
Ok(T::from(msg))
}