1use std::{cmp::min, num::NonZero, sync::atomic::AtomicU64};
5
6use arrow_array::{
7 ArrayRef,
8 types::{BinaryType, LargeBinaryType, LargeUtf8Type, Utf8Type},
9};
10use arrow_schema::DataType;
11use byteorder::{ByteOrder, LittleEndian};
12use bytes::Bytes;
13use deepsize::DeepSizeOf;
14use lance_arrow::*;
15use prost::Message;
16use serde::{Deserialize, Serialize};
17
18use crate::{ReadBatchParams, traits::Reader};
19use crate::{
20 encodings::{AsyncIndex, Decoder, binary::BinaryDecoder, plain::PlainDecoder},
21 traits::ProtoStruct,
22};
23use lance_core::{Error, Result};
24
25pub mod tracking_store;
26
27pub async fn read_binary_array(
30 reader: &dyn Reader,
31 data_type: &DataType,
32 nullable: bool,
33 position: usize,
34 length: usize,
35 params: impl Into<ReadBatchParams>,
36) -> Result<ArrayRef> {
37 use arrow_schema::DataType::*;
38 let decoder: Box<dyn Decoder<Output = Result<ArrayRef>> + Send> = match data_type {
39 Utf8 => Box::new(BinaryDecoder::<Utf8Type>::new(
40 reader, position, length, nullable,
41 )),
42 Binary => Box::new(BinaryDecoder::<BinaryType>::new(
43 reader, position, length, nullable,
44 )),
45 LargeUtf8 => Box::new(BinaryDecoder::<LargeUtf8Type>::new(
46 reader, position, length, nullable,
47 )),
48 LargeBinary => Box::new(BinaryDecoder::<LargeBinaryType>::new(
49 reader, position, length, nullable,
50 )),
51 _ => {
52 return Err(Error::invalid_input(format!(
53 "Unsupported binary type: {}",
54 data_type
55 )));
56 }
57 };
58 let fut = decoder.as_ref().get(params.into());
59 fut.await
60}
61
62pub async fn read_fixed_stride_array(
65 reader: &dyn Reader,
66 data_type: &DataType,
67 position: usize,
68 length: usize,
69 params: impl Into<ReadBatchParams>,
70) -> Result<ArrayRef> {
71 if !data_type.is_fixed_stride() {
72 return Err(Error::schema(format!(
73 "{data_type} is not a fixed stride type"
74 )));
75 }
76 let decoder = PlainDecoder::new(reader, data_type, position, length)?;
78 decoder.get(params.into()).await
79}
80
81pub async fn read_message<M: Message + Default>(reader: &dyn Reader, pos: usize) -> Result<M> {
86 let file_size = reader.size().await?;
87 if pos > file_size {
88 return Err(Error::io("file size is too small".to_string()));
89 }
90
91 let range = pos..min(pos + reader.block_size(), file_size);
92 let buf = reader.get_range(range.clone()).await?;
93 let msg_len = LittleEndian::read_u32(&buf) as usize;
94
95 if msg_len + 4 > buf.len() {
96 let remaining_range = range.end..min(4 + pos + msg_len, file_size);
97 let remaining_bytes = reader.get_range(remaining_range).await?;
98 let buf = [buf, remaining_bytes].concat();
99 assert!(buf.len() >= msg_len + 4);
100 Ok(M::decode(&buf[4..4 + msg_len])?)
101 } else {
102 Ok(M::decode(&buf[4..4 + msg_len])?)
103 }
104}
105
106pub async fn read_struct<
109 M: Message + Default + 'static,
110 T: ProtoStruct<Proto = M> + TryFrom<M, Error = Error>,
111>(
112 reader: &dyn Reader,
113 pos: usize,
114) -> Result<T> {
115 let msg = read_message::<M>(reader, pos).await?;
116 T::try_from(msg)
117}
118
119pub async fn read_last_block(reader: &dyn Reader) -> object_store::Result<Bytes> {
120 let file_size = reader.size().await?;
121 let block_size = reader.block_size();
122 let begin = file_size.saturating_sub(block_size);
123 reader.get_range(begin..file_size).await
124}
125
126pub fn read_metadata_offset(bytes: &Bytes) -> Result<usize> {
127 let len = bytes.len();
128 if len < 16 {
129 return Err(Error::io(format!(
130 "does not have sufficient data, len: {}, bytes: {:?}",
131 len, bytes
132 )));
133 }
134 let offset_bytes = bytes.slice(len - 16..len - 8);
135 Ok(LittleEndian::read_u64(offset_bytes.as_ref()) as usize)
136}
137
138pub fn read_version(bytes: &Bytes) -> Result<(u16, u16)> {
140 let len = bytes.len();
141 if len < 8 {
142 return Err(Error::io(format!(
143 "does not have sufficient data, len: {}, bytes: {:?}",
144 len, bytes
145 )));
146 }
147
148 let major_version = LittleEndian::read_u16(bytes.slice(len - 8..len - 6).as_ref());
149 let minor_version = LittleEndian::read_u16(bytes.slice(len - 6..len - 4).as_ref());
150 Ok((major_version, minor_version))
151}
152
153pub fn read_message_from_buf<M: Message + Default>(buf: &Bytes) -> Result<M> {
155 let msg_len = LittleEndian::read_u32(buf) as usize;
156 Ok(M::decode(&buf[4..4 + msg_len])?)
157}
158
159pub fn read_struct_from_buf<
161 M: Message + Default,
162 T: ProtoStruct<Proto = M> + TryFrom<M, Error = Error>,
163>(
164 buf: &Bytes,
165) -> Result<T> {
166 let msg: M = read_message_from_buf(buf)?;
167 T::try_from(msg)
168}
169
/// A file size held in an [`AtomicU64`], so it can be filled in through a
/// shared reference.
///
/// A stored value of `0` stands for "size not known": [`CachedFileSize::get`]
/// returns `None` for it and [`CachedFileSize::unknown`] creates it.
#[derive(Debug, DeepSizeOf)]
pub struct CachedFileSize(AtomicU64);
178
179impl<'de> Deserialize<'de> for CachedFileSize {
180 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
181 where
182 D: serde::Deserializer<'de>,
183 {
184 let size = Option::<u64>::deserialize(deserializer)?.unwrap_or(0);
185 Ok(Self::new(size))
186 }
187}
188
189impl Serialize for CachedFileSize {
190 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
191 where
192 S: serde::Serializer,
193 {
194 let size = self.0.load(std::sync::atomic::Ordering::Relaxed);
195 if size == 0 {
196 serializer.serialize_none()
197 } else {
198 serializer.serialize_u64(size)
199 }
200 }
201}
202
203impl From<Option<NonZero<u64>>> for CachedFileSize {
204 fn from(size: Option<NonZero<u64>>) -> Self {
205 match size {
206 Some(size) => Self(AtomicU64::new(size.into())),
207 None => Self(AtomicU64::new(0)),
208 }
209 }
210}
211
212impl Default for CachedFileSize {
213 fn default() -> Self {
214 Self(AtomicU64::new(0))
215 }
216}
217
218impl Clone for CachedFileSize {
219 fn clone(&self) -> Self {
220 Self(AtomicU64::new(
221 self.0.load(std::sync::atomic::Ordering::Relaxed),
222 ))
223 }
224}
225
226impl PartialEq for CachedFileSize {
227 fn eq(&self, other: &Self) -> bool {
228 self.0.load(std::sync::atomic::Ordering::Relaxed)
229 == other.0.load(std::sync::atomic::Ordering::Relaxed)
230 }
231}
232
// Equality compares plain `u64` values loaded from the atomics, so marking
// the relation as total is sound.
impl Eq for CachedFileSize {}
234
235impl CachedFileSize {
236 pub fn new(size: u64) -> Self {
241 Self(AtomicU64::new(size))
242 }
243
244 pub fn unknown() -> Self {
245 Self(AtomicU64::new(0))
246 }
247
248 pub fn get(&self) -> Option<NonZero<u64>> {
249 NonZero::new(self.0.load(std::sync::atomic::Ordering::Relaxed))
250 }
251
252 pub fn set(&self, size: NonZero<u64>) {
253 self.0
254 .store(size.into(), std::sync::atomic::Ordering::Relaxed);
255 }
256}
257
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use object_store::path::Path;

    use crate::{
        Error, Result,
        object_reader::CloudObjectReader,
        object_store::{DEFAULT_DOWNLOAD_RETRY_COUNT, ObjectStore},
        object_writer::ObjectWriter,
        traits::{ProtoStruct, WriteExt, Writer},
        utils::read_struct,
    };

    /// Test-only wrapper that uses `Bytes` itself as its proto type, so the
    /// tests can exercise struct round-trips without a generated message.
    #[derive(Debug, PartialEq)]
    struct BytesWrapper(Bytes);

    impl ProtoStruct for BytesWrapper {
        type Proto = Bytes;
    }

    impl From<&BytesWrapper> for Bytes {
        fn from(wrapper: &BytesWrapper) -> Self {
            wrapper.0.clone()
        }
    }

    impl TryFrom<Bytes> for BytesWrapper {
        type Error = Error;

        fn try_from(raw: Bytes) -> Result<Self> {
            Ok(Self(raw))
        }
    }

    /// A struct written with `write_struct` is read back unchanged by
    /// `read_struct` from the position the writer reported.
    #[tokio::test]
    async fn test_write_proto_structs() {
        let store = ObjectStore::memory();
        let path = Path::from("/foo");

        let mut writer = ObjectWriter::new(&store, &path).await.unwrap();
        assert_eq!(writer.tell().await.unwrap(), 0);

        let expected = BytesWrapper(Bytes::from(vec![10, 20, 30]));

        let offset = writer.write_struct(&expected).await.unwrap();
        assert_eq!(offset, 0);
        writer.shutdown().await.unwrap();

        let reader =
            CloudObjectReader::new(store.inner, path, 1024, None, DEFAULT_DOWNLOAD_RETRY_COUNT)
                .unwrap();
        let decoded: BytesWrapper = read_struct(&reader, offset).await.unwrap();
        assert_eq!(expected, decoded);
    }

    /// Copying a whole reader into a writer reports the byte count and
    /// reproduces the source contents.
    #[tokio::test]
    async fn test_copy_reader_to_writer() {
        let store = ObjectStore::memory();
        let source_path = Path::from("/src");
        let target_path = Path::from("/dst");
        store.put(&source_path, b"abcdef").await.unwrap();

        let source = store.open(&source_path).await.unwrap();
        let mut target = store.create(&target_path).await.unwrap();
        let bytes_copied = target.copy_from_reader(source.as_ref()).await.unwrap();
        target.shutdown().await.unwrap();

        assert_eq!(bytes_copied, 6);
        assert_eq!(
            store.read_one_all(&target_path).await.unwrap().as_ref(),
            b"abcdef"
        );
    }

    /// Copying the range `2..5` of a reader writes exactly those three bytes.
    #[tokio::test]
    async fn test_copy_reader_range_to_writer() {
        let store = ObjectStore::memory();
        let source_path = Path::from("/src-range");
        let target_path = Path::from("/dst-range");
        store.put(&source_path, b"abcdef").await.unwrap();

        let source = store.open(&source_path).await.unwrap();
        let mut target = store.create(&target_path).await.unwrap();
        let bytes_copied = target
            .copy_range_from_reader(source.as_ref(), 2..5)
            .await
            .unwrap();
        target.shutdown().await.unwrap();

        assert_eq!(bytes_copied, 3);
        assert_eq!(
            store.read_one_all(&target_path).await.unwrap().as_ref(),
            b"cde"
        );
    }
}