1use std::ops::Range;
5
6use async_trait::async_trait;
7use bytes::Bytes;
8use deepsize::DeepSizeOf;
9use futures::{StreamExt, future::BoxFuture, stream::BoxStream};
10use object_store::path::Path;
11use prost::Message;
12use tokio::io::{AsyncWrite, AsyncWriteExt};
13
14use lance_core::Result;
15
16use crate::object_writer::WriteResult;
17
18pub trait ProtoStruct {
19 type Proto: Message;
20}
21
22pub type ByteStream = BoxStream<'static, object_store::Result<Bytes>>;
23
24#[async_trait]
26pub trait Writer: AsyncWrite + Unpin + Send {
27 async fn tell(&mut self) -> Result<usize>;
29
30 async fn shutdown(&mut self) -> Result<WriteResult>;
33}
34
35#[async_trait]
36impl Writer for Box<dyn Writer> {
37 async fn tell(&mut self) -> Result<usize> {
38 self.as_mut().tell().await
39 }
40
41 async fn shutdown(&mut self) -> Result<WriteResult> {
42 self.as_mut().shutdown().await
43 }
44}
45
46#[async_trait]
48pub trait WriteExt {
49 async fn write_protobuf(&mut self, msg: &impl Message) -> Result<usize>;
52
53 async fn write_struct<
54 'b,
55 M: Message + From<&'b T>,
56 T: ProtoStruct<Proto = M> + Send + Sync + 'b,
57 >(
58 &mut self,
59 obj: &'b T,
60 ) -> Result<usize> {
61 let msg: M = M::from(obj);
62 self.write_protobuf(&msg).await
63 }
64 async fn write_magics(
66 &mut self,
67 pos: usize,
68 major_version: i16,
69 minor_version: i16,
70 magic: &[u8],
71 ) -> Result<()>;
72
73 async fn copy_from_reader(&mut self, reader: &dyn Reader) -> Result<usize>;
74
75 async fn copy_range_from_reader(
76 &mut self,
77 reader: &dyn Reader,
78 range: Range<usize>,
79 ) -> Result<usize>;
80}
81
82#[async_trait]
83impl<W: Writer + ?Sized> WriteExt for W {
84 async fn write_protobuf(&mut self, msg: &impl Message) -> Result<usize> {
85 let offset = self.tell().await?;
86
87 let len = msg.encoded_len();
88
89 self.write_u32_le(len as u32).await?;
90 self.write_all(&msg.encode_to_vec()).await?;
91
92 Ok(offset)
93 }
94
95 async fn write_magics(
96 &mut self,
97 pos: usize,
98 major_version: i16,
99 minor_version: i16,
100 magic: &[u8],
101 ) -> Result<()> {
102 self.write_i64_le(pos as i64).await?;
103 self.write_i16_le(major_version).await?;
104 self.write_i16_le(minor_version).await?;
105 self.write_all(magic).await?;
106 Ok(())
107 }
108
109 async fn copy_from_reader(&mut self, reader: &dyn Reader) -> Result<usize> {
110 let mut stream = reader.get_stream().await?;
111 let mut copied = 0usize;
112 while let Some(chunk) = stream.next().await {
113 let bytes = chunk?;
114 copied += bytes.len();
115 self.write_all(&bytes).await?;
116 }
117 Ok(copied)
118 }
119
120 async fn copy_range_from_reader(
121 &mut self,
122 reader: &dyn Reader,
123 range: Range<usize>,
124 ) -> Result<usize> {
125 let mut stream = reader.get_range_stream(range).await?;
126 let mut copied = 0usize;
127 while let Some(chunk) = stream.next().await {
128 let bytes = chunk?;
129 copied += bytes.len();
130 self.write_all(&bytes).await?;
131 }
132 Ok(copied)
133 }
134}
135
136pub trait Reader: std::fmt::Debug + Send + Sync + DeepSizeOf {
137 fn path(&self) -> &Path;
138
139 fn block_size(&self) -> usize;
141
142 fn io_parallelism(&self) -> usize;
144
145 fn size(&self) -> BoxFuture<'_, object_store::Result<usize>>;
147
148 fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, object_store::Result<Bytes>>;
152
153 fn get_all(&self) -> BoxFuture<'_, object_store::Result<Bytes>>;
158
159 fn get_stream(&self) -> BoxFuture<'_, object_store::Result<ByteStream>> {
161 Box::pin(async move {
162 let bytes = self.get_all().await?;
163 Ok(futures::stream::once(async move { Ok(bytes) }).boxed())
164 })
165 }
166
167 fn get_range_stream(
169 &self,
170 range: Range<usize>,
171 ) -> BoxFuture<'_, object_store::Result<ByteStream>> {
172 Box::pin(async move {
173 let bytes = self.get_range(range).await?;
174 Ok(futures::stream::once(async move { Ok(bytes) }).boxed())
175 })
176 }
177}