1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
// Copyright 2023 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Range;
use async_trait::async_trait;
use bytes::Bytes;
use object_store::path::Path;
use prost::Message;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use lance_core::Result;
pub trait ProtoStruct {
type Proto: Message;
}
/// A trait for writing to a file on local file system or object store.
#[async_trait]
pub trait Writer: AsyncWrite + Unpin + Send {
/// Tell the current offset.
async fn tell(&mut self) -> Result<usize>;
}
/// Lance Write Extension.
#[async_trait]
pub trait WriteExt {
/// Write a Protobuf message to the [Writer], and returns the file position
/// where the protobuf is written.
async fn write_protobuf(&mut self, msg: &impl Message) -> Result<usize>;
async fn write_struct<
'b,
M: Message + From<&'b T>,
T: ProtoStruct<Proto = M> + Send + Sync + 'b,
>(
&mut self,
obj: &'b T,
) -> Result<usize> {
let msg: M = M::from(obj);
self.write_protobuf(&msg).await
}
/// Write magics to the tail of a file before closing the file.
async fn write_magics(
&mut self,
pos: usize,
major_version: i16,
minor_version: i16,
magic: &[u8],
) -> Result<()>;
}
#[async_trait]
impl<W: Writer + ?Sized> WriteExt for W {
async fn write_protobuf(&mut self, msg: &impl Message) -> Result<usize> {
let offset = self.tell().await?;
let len = msg.encoded_len();
self.write_u32_le(len as u32).await?;
self.write_all(&msg.encode_to_vec()).await?;
Ok(offset)
}
async fn write_magics(
&mut self,
pos: usize,
major_version: i16,
minor_version: i16,
magic: &[u8],
) -> Result<()> {
self.write_i64_le(pos as i64).await?;
self.write_i16_le(major_version).await?;
self.write_i16_le(minor_version).await?;
self.write_all(magic).await?;
Ok(())
}
}
#[async_trait]
pub trait Reader: Send + Sync {
fn path(&self) -> &Path;
/// Suggest optimal I/O size per storage device.
fn block_size(&self) -> usize;
/// Object/File Size.
async fn size(&self) -> Result<usize>;
/// Read a range of bytes from the object.
///
/// TODO: change to read_at()?
async fn get_range(&self, range: Range<usize>) -> Result<Bytes>;
}