Skip to main content

lance_io/
traits.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::ops::Range;
5
6use async_trait::async_trait;
7use bytes::Bytes;
8use deepsize::DeepSizeOf;
9use futures::future::BoxFuture;
10use object_store::path::Path;
11use prost::Message;
12use tokio::io::{AsyncWrite, AsyncWriteExt};
13
14use lance_core::Result;
15
16use crate::object_writer::WriteResult;
17
18pub trait ProtoStruct {
19    type Proto: Message;
20}
21
22/// A trait for writing to a file on local file system or object store.
23#[async_trait]
24pub trait Writer: AsyncWrite + Unpin + Send {
25    /// Tell the current offset.
26    async fn tell(&mut self) -> Result<usize>;
27
28    /// Flush all buffered data and finalize the write, returning metadata about
29    /// the written object.
30    async fn shutdown(&mut self) -> Result<WriteResult>;
31}
32
33#[async_trait]
34impl Writer for Box<dyn Writer> {
35    async fn tell(&mut self) -> Result<usize> {
36        self.as_mut().tell().await
37    }
38
39    async fn shutdown(&mut self) -> Result<WriteResult> {
40        self.as_mut().shutdown().await
41    }
42}
43
44/// Lance Write Extension.
45#[async_trait]
46pub trait WriteExt {
47    /// Write a Protobuf message to the [Writer], and returns the file position
48    /// where the protobuf is written.
49    async fn write_protobuf(&mut self, msg: &impl Message) -> Result<usize>;
50
51    async fn write_struct<
52        'b,
53        M: Message + From<&'b T>,
54        T: ProtoStruct<Proto = M> + Send + Sync + 'b,
55    >(
56        &mut self,
57        obj: &'b T,
58    ) -> Result<usize> {
59        let msg: M = M::from(obj);
60        self.write_protobuf(&msg).await
61    }
62    /// Write magics to the tail of a file before closing the file.
63    async fn write_magics(
64        &mut self,
65        pos: usize,
66        major_version: i16,
67        minor_version: i16,
68        magic: &[u8],
69    ) -> Result<()>;
70}
71
72#[async_trait]
73impl<W: Writer + ?Sized> WriteExt for W {
74    async fn write_protobuf(&mut self, msg: &impl Message) -> Result<usize> {
75        let offset = self.tell().await?;
76
77        let len = msg.encoded_len();
78
79        self.write_u32_le(len as u32).await?;
80        self.write_all(&msg.encode_to_vec()).await?;
81
82        Ok(offset)
83    }
84
85    async fn write_magics(
86        &mut self,
87        pos: usize,
88        major_version: i16,
89        minor_version: i16,
90        magic: &[u8],
91    ) -> Result<()> {
92        self.write_i64_le(pos as i64).await?;
93        self.write_i16_le(major_version).await?;
94        self.write_i16_le(minor_version).await?;
95        self.write_all(magic).await?;
96        Ok(())
97    }
98}
99
100pub trait Reader: std::fmt::Debug + Send + Sync + DeepSizeOf {
101    fn path(&self) -> &Path;
102
103    /// Suggest optimal I/O size per storage device.
104    fn block_size(&self) -> usize;
105
106    /// Suggest optimal I/O parallelism per storage device.
107    fn io_parallelism(&self) -> usize;
108
109    /// Object/File Size.
110    fn size(&self) -> BoxFuture<'_, object_store::Result<usize>>;
111
112    /// Read a range of bytes from the object.
113    ///
114    /// TODO: change to read_at()?
115    fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, object_store::Result<Bytes>>;
116
117    /// Read all bytes from the object.
118    ///
119    /// By default this reads the size in a separate IOP but some implementations
120    /// may not need the size beforehand.
121    fn get_all(&self) -> BoxFuture<'_, object_store::Result<Bytes>>;
122}