lance_io/
utils.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{cmp::min, num::NonZero, sync::atomic::AtomicU64};
5
6use arrow_array::{
7    types::{BinaryType, LargeBinaryType, LargeUtf8Type, Utf8Type},
8    ArrayRef,
9};
10use arrow_schema::DataType;
11use byteorder::{ByteOrder, LittleEndian};
12use bytes::Bytes;
13use deepsize::DeepSizeOf;
14use lance_arrow::*;
15use prost::Message;
16use serde::{Deserialize, Serialize};
17use snafu::location;
18
19use crate::{
20    encodings::{binary::BinaryDecoder, plain::PlainDecoder, AsyncIndex, Decoder},
21    traits::ProtoStruct,
22};
23use crate::{traits::Reader, ReadBatchParams};
24use lance_core::{Error, Result};
25
26pub mod tracking_store;
27
28/// Read a binary array from a [Reader].
29///
30pub async fn read_binary_array(
31    reader: &dyn Reader,
32    data_type: &DataType,
33    nullable: bool,
34    position: usize,
35    length: usize,
36    params: impl Into<ReadBatchParams>,
37) -> Result<ArrayRef> {
38    use arrow_schema::DataType::*;
39    let decoder: Box<dyn Decoder<Output = Result<ArrayRef>> + Send> = match data_type {
40        Utf8 => Box::new(BinaryDecoder::<Utf8Type>::new(
41            reader, position, length, nullable,
42        )),
43        Binary => Box::new(BinaryDecoder::<BinaryType>::new(
44            reader, position, length, nullable,
45        )),
46        LargeUtf8 => Box::new(BinaryDecoder::<LargeUtf8Type>::new(
47            reader, position, length, nullable,
48        )),
49        LargeBinary => Box::new(BinaryDecoder::<LargeBinaryType>::new(
50            reader, position, length, nullable,
51        )),
52        _ => {
53            return Err(Error::io(
54                format!("Unsupported binary type: {}", data_type),
55                location!(),
56            ));
57        }
58    };
59    let fut = decoder.as_ref().get(params.into());
60    fut.await
61}
62
63/// Read a fixed stride array from disk.
64///
65pub async fn read_fixed_stride_array(
66    reader: &dyn Reader,
67    data_type: &DataType,
68    position: usize,
69    length: usize,
70    params: impl Into<ReadBatchParams>,
71) -> Result<ArrayRef> {
72    if !data_type.is_fixed_stride() {
73        return Err(Error::Schema {
74            message: format!("{data_type} is not a fixed stride type"),
75            location: location!(),
76        });
77    }
78    // TODO: support more than plain encoding here.
79    let decoder = PlainDecoder::new(reader, data_type, position, length)?;
80    decoder.get(params.into()).await
81}
82
83/// Read a protobuf message at file position 'pos'.
84///
85/// We write protobuf by first writing the length of the message as a u32,
86/// followed by the message itself.
87pub async fn read_message<M: Message + Default>(reader: &dyn Reader, pos: usize) -> Result<M> {
88    let file_size = reader.size().await?;
89    if pos > file_size {
90        return Err(Error::io("file size is too small".to_string(), location!()));
91    }
92
93    let range = pos..min(pos + reader.block_size(), file_size);
94    let buf = reader.get_range(range.clone()).await?;
95    let msg_len = LittleEndian::read_u32(&buf) as usize;
96
97    if msg_len + 4 > buf.len() {
98        let remaining_range = range.end..min(4 + pos + msg_len, file_size);
99        let remaining_bytes = reader.get_range(remaining_range).await?;
100        let buf = [buf, remaining_bytes].concat();
101        assert!(buf.len() >= msg_len + 4);
102        Ok(M::decode(&buf[4..4 + msg_len])?)
103    } else {
104        Ok(M::decode(&buf[4..4 + msg_len])?)
105    }
106}
107
108/// Read a Protobuf-backed struct at file position: `pos`.
109// TODO: pub(crate)
110pub async fn read_struct<
111    M: Message + Default + 'static,
112    T: ProtoStruct<Proto = M> + TryFrom<M, Error = Error>,
113>(
114    reader: &dyn Reader,
115    pos: usize,
116) -> Result<T> {
117    let msg = read_message::<M>(reader, pos).await?;
118    T::try_from(msg)
119}
120
121pub async fn read_last_block(reader: &dyn Reader) -> object_store::Result<Bytes> {
122    let file_size = reader.size().await?;
123    let block_size = reader.block_size();
124    let begin = file_size.saturating_sub(block_size);
125    reader.get_range(begin..file_size).await
126}
127
128pub fn read_metadata_offset(bytes: &Bytes) -> Result<usize> {
129    let len = bytes.len();
130    if len < 16 {
131        return Err(Error::io(
132            format!(
133                "does not have sufficient data, len: {}, bytes: {:?}",
134                len, bytes
135            ),
136            location!(),
137        ));
138    }
139    let offset_bytes = bytes.slice(len - 16..len - 8);
140    Ok(LittleEndian::read_u64(offset_bytes.as_ref()) as usize)
141}
142
143/// Read the version from the footer bytes
144pub fn read_version(bytes: &Bytes) -> Result<(u16, u16)> {
145    let len = bytes.len();
146    if len < 8 {
147        return Err(Error::io(
148            format!(
149                "does not have sufficient data, len: {}, bytes: {:?}",
150                len, bytes
151            ),
152            location!(),
153        ));
154    }
155
156    let major_version = LittleEndian::read_u16(bytes.slice(len - 8..len - 6).as_ref());
157    let minor_version = LittleEndian::read_u16(bytes.slice(len - 6..len - 4).as_ref());
158    Ok((major_version, minor_version))
159}
160
161/// Read protobuf from a buffer.
162pub fn read_message_from_buf<M: Message + Default>(buf: &Bytes) -> Result<M> {
163    let msg_len = LittleEndian::read_u32(buf) as usize;
164    Ok(M::decode(&buf[4..4 + msg_len])?)
165}
166
167/// Read a Protobuf-backed struct from a buffer.
168pub fn read_struct_from_buf<
169    M: Message + Default,
170    T: ProtoStruct<Proto = M> + TryFrom<M, Error = Error>,
171>(
172    buf: &Bytes,
173) -> Result<T> {
174    let msg: M = read_message_from_buf(buf)?;
175    T::try_from(msg)
176}
177
178/// A cached file size.
179///
180/// This wraps an atomic u64 to allow setting the cached file size without
181/// needed a mutable reference.
182///
183/// Zero is interpreted as unknown.
184#[derive(Debug, DeepSizeOf)]
185pub struct CachedFileSize(AtomicU64);
186
187impl<'de> Deserialize<'de> for CachedFileSize {
188    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
189    where
190        D: serde::Deserializer<'de>,
191    {
192        let size = Option::<u64>::deserialize(deserializer)?.unwrap_or(0);
193        Ok(Self::new(size))
194    }
195}
196
197impl Serialize for CachedFileSize {
198    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
199    where
200        S: serde::Serializer,
201    {
202        let size = self.0.load(std::sync::atomic::Ordering::Relaxed);
203        if size == 0 {
204            serializer.serialize_none()
205        } else {
206            serializer.serialize_u64(size)
207        }
208    }
209}
210
211impl From<Option<NonZero<u64>>> for CachedFileSize {
212    fn from(size: Option<NonZero<u64>>) -> Self {
213        match size {
214            Some(size) => Self(AtomicU64::new(size.into())),
215            None => Self(AtomicU64::new(0)),
216        }
217    }
218}
219
220impl Default for CachedFileSize {
221    fn default() -> Self {
222        Self(AtomicU64::new(0))
223    }
224}
225
226impl Clone for CachedFileSize {
227    fn clone(&self) -> Self {
228        Self(AtomicU64::new(
229            self.0.load(std::sync::atomic::Ordering::Relaxed),
230        ))
231    }
232}
233
234impl PartialEq for CachedFileSize {
235    fn eq(&self, other: &Self) -> bool {
236        self.0.load(std::sync::atomic::Ordering::Relaxed)
237            == other.0.load(std::sync::atomic::Ordering::Relaxed)
238    }
239}
240
241impl Eq for CachedFileSize {}
242
243impl CachedFileSize {
244    pub fn new(size: u64) -> Self {
245        Self(AtomicU64::new(size))
246    }
247
248    pub fn unknown() -> Self {
249        Self(AtomicU64::new(0))
250    }
251
252    pub fn get(&self) -> Option<NonZero<u64>> {
253        NonZero::new(self.0.load(std::sync::atomic::Ordering::Relaxed))
254    }
255
256    pub fn set(&self, size: NonZero<u64>) {
257        self.0
258            .store(size.into(), std::sync::atomic::Ordering::Relaxed);
259    }
260}
261
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use object_store::path::Path;

    use crate::{
        object_reader::CloudObjectReader,
        object_store::{ObjectStore, DEFAULT_DOWNLOAD_RETRY_COUNT},
        object_writer::ObjectWriter,
        traits::{ProtoStruct, WriteExt, Writer},
        utils::read_struct,
        Error, Result,
    };

    /// Test-only struct whose "protobuf" representation is raw `Bytes`.
    ///
    /// `Bytes` implements `prost::Message`. This crate has no `.proto` files,
    /// so `Bytes` stands in for a real generated message type.
    #[derive(Debug, PartialEq)]
    struct BytesWrapper(Bytes);

    impl ProtoStruct for BytesWrapper {
        type Proto = Bytes;
    }

    impl From<&BytesWrapper> for Bytes {
        fn from(wrapper: &BytesWrapper) -> Self {
            Self::clone(&wrapper.0)
        }
    }

    impl TryFrom<Bytes> for BytesWrapper {
        type Error = Error;

        fn try_from(proto: Bytes) -> Result<Self> {
            Ok(Self(proto))
        }
    }

    /// Round-trips a struct through `write_struct` and `read_struct` on an
    /// in-memory store.
    #[tokio::test]
    async fn test_write_proto_structs() {
        let store = ObjectStore::memory();
        let path = Path::from("/foo");

        let mut writer = ObjectWriter::new(&store, &path).await.unwrap();
        assert_eq!(writer.tell().await.unwrap(), 0);

        let expected = BytesWrapper(Bytes::from(vec![10, 20, 30]));

        // The first struct written to an empty object lands at offset 0.
        let pos = writer.write_struct(&expected).await.unwrap();
        assert_eq!(pos, 0);
        writer.shutdown().await.unwrap();

        let reader =
            CloudObjectReader::new(store.inner, path, 1024, None, DEFAULT_DOWNLOAD_RETRY_COUNT)
                .unwrap();
        let actual: BytesWrapper = read_struct(&reader, pos).await.unwrap();
        assert_eq!(expected, actual);
    }
}