Skip to main content

lance_io/
traits.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

4use std::ops::Range;
5
6use async_trait::async_trait;
7use bytes::Bytes;
8use deepsize::DeepSizeOf;
9use futures::{StreamExt, future::BoxFuture, stream::BoxStream};
10use object_store::path::Path;
11use prost::Message;
12use tokio::io::{AsyncWrite, AsyncWriteExt};
13
14use lance_core::Result;
15
16use crate::object_writer::WriteResult;
17
18pub trait ProtoStruct {
19    type Proto: Message;
20}
21
22pub type ByteStream = BoxStream<'static, object_store::Result<Bytes>>;
23
24/// A trait for writing to a file on local file system or object store.
25#[async_trait]
26pub trait Writer: AsyncWrite + Unpin + Send {
27    /// Tell the current offset.
28    async fn tell(&mut self) -> Result<usize>;
29
30    /// Flush all buffered data and finalize the write, returning metadata about
31    /// the written object.
32    async fn shutdown(&mut self) -> Result<WriteResult>;
33}
34
35#[async_trait]
36impl Writer for Box<dyn Writer> {
37    async fn tell(&mut self) -> Result<usize> {
38        self.as_mut().tell().await
39    }
40
41    async fn shutdown(&mut self) -> Result<WriteResult> {
42        self.as_mut().shutdown().await
43    }
44}
45
46/// Lance Write Extension.
47#[async_trait]
48pub trait WriteExt {
49    /// Write a Protobuf message to the [Writer], and returns the file position
50    /// where the protobuf is written.
51    async fn write_protobuf(&mut self, msg: &impl Message) -> Result<usize>;
52
53    async fn write_struct<
54        'b,
55        M: Message + From<&'b T>,
56        T: ProtoStruct<Proto = M> + Send + Sync + 'b,
57    >(
58        &mut self,
59        obj: &'b T,
60    ) -> Result<usize> {
61        let msg: M = M::from(obj);
62        self.write_protobuf(&msg).await
63    }
64    /// Write magics to the tail of a file before closing the file.
65    async fn write_magics(
66        &mut self,
67        pos: usize,
68        major_version: i16,
69        minor_version: i16,
70        magic: &[u8],
71    ) -> Result<()>;
72
73    async fn copy_from_reader(&mut self, reader: &dyn Reader) -> Result<usize>;
74
75    async fn copy_range_from_reader(
76        &mut self,
77        reader: &dyn Reader,
78        range: Range<usize>,
79    ) -> Result<usize>;
80}
81
82#[async_trait]
83impl<W: Writer + ?Sized> WriteExt for W {
84    async fn write_protobuf(&mut self, msg: &impl Message) -> Result<usize> {
85        let offset = self.tell().await?;
86
87        let len = msg.encoded_len();
88
89        self.write_u32_le(len as u32).await?;
90        self.write_all(&msg.encode_to_vec()).await?;
91
92        Ok(offset)
93    }
94
95    async fn write_magics(
96        &mut self,
97        pos: usize,
98        major_version: i16,
99        minor_version: i16,
100        magic: &[u8],
101    ) -> Result<()> {
102        self.write_i64_le(pos as i64).await?;
103        self.write_i16_le(major_version).await?;
104        self.write_i16_le(minor_version).await?;
105        self.write_all(magic).await?;
106        Ok(())
107    }
108
109    async fn copy_from_reader(&mut self, reader: &dyn Reader) -> Result<usize> {
110        let mut stream = reader.get_stream().await?;
111        let mut copied = 0usize;
112        while let Some(chunk) = stream.next().await {
113            let bytes = chunk?;
114            copied += bytes.len();
115            self.write_all(&bytes).await?;
116        }
117        Ok(copied)
118    }
119
120    async fn copy_range_from_reader(
121        &mut self,
122        reader: &dyn Reader,
123        range: Range<usize>,
124    ) -> Result<usize> {
125        let mut stream = reader.get_range_stream(range).await?;
126        let mut copied = 0usize;
127        while let Some(chunk) = stream.next().await {
128            let bytes = chunk?;
129            copied += bytes.len();
130            self.write_all(&bytes).await?;
131        }
132        Ok(copied)
133    }
134}
135
136pub trait Reader: std::fmt::Debug + Send + Sync + DeepSizeOf {
137    fn path(&self) -> &Path;
138
139    /// Suggest optimal I/O size per storage device.
140    fn block_size(&self) -> usize;
141
142    /// Suggest optimal I/O parallelism per storage device.
143    fn io_parallelism(&self) -> usize;
144
145    /// Object/File Size.
146    fn size(&self) -> BoxFuture<'_, object_store::Result<usize>>;
147
148    /// Read a range of bytes from the object.
149    ///
150    /// TODO: change to read_at()?
151    fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, object_store::Result<Bytes>>;
152
153    /// Read all bytes from the object.
154    ///
155    /// By default this reads the size in a separate IOP but some implementations
156    /// may not need the size beforehand.
157    fn get_all(&self) -> BoxFuture<'_, object_store::Result<Bytes>>;
158
159    /// Read the entire object as a byte stream.
160    fn get_stream(&self) -> BoxFuture<'_, object_store::Result<ByteStream>> {
161        Box::pin(async move {
162            let bytes = self.get_all().await?;
163            Ok(futures::stream::once(async move { Ok(bytes) }).boxed())
164        })
165    }
166
167    /// Read a byte range as a byte stream.
168    fn get_range_stream(
169        &self,
170        range: Range<usize>,
171    ) -> BoxFuture<'_, object_store::Result<ByteStream>> {
172        Box::pin(async move {
173            let bytes = self.get_range(range).await?;
174            Ok(futures::stream::once(async move { Ok(bytes) }).boxed())
175        })
176    }
177}