1use std::ops::Range;
5
6use async_trait::async_trait;
7use bytes::Bytes;
8use deepsize::DeepSizeOf;
9use futures::future::BoxFuture;
10use object_store::path::Path;
11use prost::Message;
12use tokio::io::{AsyncWrite, AsyncWriteExt};
13
14use lance_core::Result;
15
16use crate::object_writer::WriteResult;
17
18pub trait ProtoStruct {
19 type Proto: Message;
20}
21
22#[async_trait]
24pub trait Writer: AsyncWrite + Unpin + Send {
25 async fn tell(&mut self) -> Result<usize>;
27
28 async fn shutdown(&mut self) -> Result<WriteResult>;
31}
32
33#[async_trait]
34impl Writer for Box<dyn Writer> {
35 async fn tell(&mut self) -> Result<usize> {
36 self.as_mut().tell().await
37 }
38
39 async fn shutdown(&mut self) -> Result<WriteResult> {
40 self.as_mut().shutdown().await
41 }
42}
43
44#[async_trait]
46pub trait WriteExt {
47 async fn write_protobuf(&mut self, msg: &impl Message) -> Result<usize>;
50
51 async fn write_struct<
52 'b,
53 M: Message + From<&'b T>,
54 T: ProtoStruct<Proto = M> + Send + Sync + 'b,
55 >(
56 &mut self,
57 obj: &'b T,
58 ) -> Result<usize> {
59 let msg: M = M::from(obj);
60 self.write_protobuf(&msg).await
61 }
62 async fn write_magics(
64 &mut self,
65 pos: usize,
66 major_version: i16,
67 minor_version: i16,
68 magic: &[u8],
69 ) -> Result<()>;
70}
71
72#[async_trait]
73impl<W: Writer + ?Sized> WriteExt for W {
74 async fn write_protobuf(&mut self, msg: &impl Message) -> Result<usize> {
75 let offset = self.tell().await?;
76
77 let len = msg.encoded_len();
78
79 self.write_u32_le(len as u32).await?;
80 self.write_all(&msg.encode_to_vec()).await?;
81
82 Ok(offset)
83 }
84
85 async fn write_magics(
86 &mut self,
87 pos: usize,
88 major_version: i16,
89 minor_version: i16,
90 magic: &[u8],
91 ) -> Result<()> {
92 self.write_i64_le(pos as i64).await?;
93 self.write_i16_le(major_version).await?;
94 self.write_i16_le(minor_version).await?;
95 self.write_all(magic).await?;
96 Ok(())
97 }
98}
99
100pub trait Reader: std::fmt::Debug + Send + Sync + DeepSizeOf {
101 fn path(&self) -> &Path;
102
103 fn block_size(&self) -> usize;
105
106 fn io_parallelism(&self) -> usize;
108
109 fn size(&self) -> BoxFuture<'_, object_store::Result<usize>>;
111
112 fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, object_store::Result<Bytes>>;
116
117 fn get_all(&self) -> BoxFuture<'_, object_store::Result<Bytes>>;
122}