Skip to main content

lance_encoding/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{ops::Range, sync::Arc};
5
6use bytes::Bytes;
7use futures::{FutureExt, TryFutureExt, future::BoxFuture};
8
9use lance_core::Result;
10
11pub mod buffer;
12pub mod compression;
13pub mod compression_config;
14pub mod constants;
15pub mod data;
16pub mod decoder;
17pub mod encoder;
18pub mod encodings;
19pub mod format;
20pub mod previous;
21pub mod repdef;
22pub mod statistics;
23#[cfg(test)]
24pub mod testing;
25pub mod utils;
26pub mod version;
27
// Big-endian targets could be supported one day, but doing so is not a priority: every
// encoding would first need extensive verification (most likely under emulation) to confirm
// it still produces correct results on such a machine.
#[cfg(not(target_endian = "little"))]
compile_error!("Lance encodings only support little-endian systems.");
33
34/// A trait for an I/O service
35///
36/// This represents the I/O API that the encoders and decoders need in order to operate.
37/// We specify this as a trait so that lance-encodings does not need to depend on lance-io
38///
39/// In general, it is assumed that this trait will be implemented by some kind of "file reader"
40/// or "file scheduler".  The encodings here are all limited to accessing a single file.
41pub trait EncodingsIo: std::fmt::Debug + Send + Sync {
42    /// Submit an I/O request
43    ///
44    /// The response must contain a `Bytes` object for each range requested even if the underlying
45    /// I/O was coalesced into fewer actual requests.
46    ///
47    /// # Arguments
48    ///
49    /// * `ranges` - the byte ranges to request
50    /// * `priority` - the priority of the request
51    ///
52    /// Priority should be set to the lowest row number that this request is delivering data for.
53    /// This is important in cases where indirect I/O causes high priority requests to be submitted
54    /// after low priority requests.  We want to fulfill the indirect I/O more quickly so that we
55    /// can decode as quickly as possible.
56    ///
57    /// The implementation should be able to handle empty ranges, and should return an empty
58    /// byte buffer for each empty range.
59    fn submit_request(
60        &self,
61        range: Vec<Range<u64>>,
62        priority: u64,
63    ) -> BoxFuture<'static, Result<Vec<Bytes>>>;
64
65    /// Submit an I/O request with a single range
66    ///
67    /// This is just a utitliy function that wraps [`EncodingsIo::submit_request`] for the common
68    /// case of a single range request.
69    fn submit_single(
70        &self,
71        range: std::ops::Range<u64>,
72        priority: u64,
73    ) -> BoxFuture<'static, lance_core::Result<bytes::Bytes>> {
74        self.submit_request(vec![range], priority)
75            .map_ok(|mut v| v.pop().unwrap())
76            .boxed()
77    }
78
79    /// Returns a version of this I/O service that bypasses backpressure for all requests.
80    ///
81    /// This is intended for indirect I/O (e.g. fetching items after decoding offsets) where
82    /// blocking on backpressure could cause deadlocks or excessive latency.
83    ///
84    /// Returns `None` if this implementation does not support bypass (e.g. in-memory or test
85    /// schedulers), in which case the caller should fall back to using self.
86    fn with_bypass_backpressure(&self) -> Option<Arc<dyn EncodingsIo>> {
87        None
88    }
89
90    /// Returns a version of this I/O service that additionally records the I/O it
91    /// performs into `stats`, on top of any global accounting.  This is the seam
92    /// used to measure exact per-scope (e.g. per-query) I/O without re-opening
93    /// files: wrap a reader's I/O service, perform the reads, then inspect the
94    /// recorder.
95    ///
96    /// Returns `None` if this implementation does not support per-scope I/O
97    /// statistics (e.g. in-memory or test schedulers), in which case the caller
98    /// should fall back to using self (and no statistics are recorded).
99    fn with_io_stats(
100        &self,
101        _stats: Arc<dyn lance_core::utils::io_stats::IoStatsRecorder>,
102    ) -> Option<Arc<dyn EncodingsIo>> {
103        None
104    }
105}
106
107/// An implementation of EncodingsIo that serves data from an in-memory buffer
108#[derive(Debug)]
109pub struct BufferScheduler {
110    data: Bytes,
111}
112
113impl BufferScheduler {
114    pub fn new(data: Bytes) -> Self {
115        Self { data }
116    }
117
118    fn satisfy_request(&self, req: Range<u64>) -> Bytes {
119        self.data.slice(req.start as usize..req.end as usize)
120    }
121}
122
123impl EncodingsIo for BufferScheduler {
124    fn submit_request(
125        &self,
126        ranges: Vec<Range<u64>>,
127        _priority: u64,
128    ) -> BoxFuture<'static, Result<Vec<Bytes>>> {
129        std::future::ready(Ok(ranges
130            .into_iter()
131            .map(|range| self.satisfy_request(range))
132            .collect::<Vec<_>>()))
133        .boxed()
134    }
135}