lance_file/previous/format/
metadata.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::collections::BTreeMap;
5use std::ops::Range;
6
7use crate::datatypes::{Fields, FieldsWithMeta};
8use crate::format::pb;
9use deepsize::DeepSizeOf;
10use lance_core::datatypes::Schema;
11use lance_core::{Error, Result};
12use lance_io::traits::ProtoStruct;
13use snafu::location;
14
15/// Data File Metadata
16#[derive(Debug, Default, DeepSizeOf, PartialEq)]
17pub struct Metadata {
18    /// Offset of each record batch.
19    pub batch_offsets: Vec<i32>,
20
21    /// The file position of the page table in the file.
22    pub page_table_position: usize,
23
24    /// The file position of the manifest block in the file.
25    pub manifest_position: Option<usize>,
26
27    /// Metadata about statistics.
28    pub stats_metadata: Option<StatisticsMetadata>,
29}
30
31impl ProtoStruct for Metadata {
32    type Proto = pb::Metadata;
33}
34
35impl From<&Metadata> for pb::Metadata {
36    fn from(m: &Metadata) -> Self {
37        let statistics = if let Some(stats_meta) = &m.stats_metadata {
38            let fields_with_meta: FieldsWithMeta = (&stats_meta.schema).into();
39            Some(pb::metadata::StatisticsMetadata {
40                schema: fields_with_meta.fields.0,
41                fields: stats_meta.leaf_field_ids.clone(),
42                page_table_position: stats_meta.page_table_position as u64,
43            })
44        } else {
45            None
46        };
47
48        Self {
49            batch_offsets: m.batch_offsets.clone(),
50            page_table_position: m.page_table_position as u64,
51            manifest_position: m.manifest_position.unwrap_or(0) as u64,
52            statistics,
53        }
54    }
55}
56
57impl TryFrom<pb::Metadata> for Metadata {
58    type Error = Error;
59    fn try_from(m: pb::Metadata) -> Result<Self> {
60        Ok(Self {
61            batch_offsets: m.batch_offsets.clone(),
62            page_table_position: m.page_table_position as usize,
63            manifest_position: Some(m.manifest_position as usize),
64            stats_metadata: if let Some(stats_meta) = m.statistics {
65                Some(StatisticsMetadata {
66                    schema: Schema::from(FieldsWithMeta {
67                        fields: Fields(stats_meta.schema),
68                        metadata: Default::default(),
69                    }),
70                    leaf_field_ids: stats_meta.fields,
71                    page_table_position: stats_meta.page_table_position as usize,
72                })
73            } else {
74                None
75            },
76        })
77    }
78}
79
/// Row selections that fall inside one record batch.
#[derive(Debug, PartialEq)]
pub struct BatchOffsets {
    /// Id of the batch the offsets refer to.
    pub batch_id: i32,
    /// Row offsets, relative to the first row of that batch.
    pub offsets: Vec<u32>,
}
85
86impl Metadata {
87    /// Get the number of batches in this file.
88    pub fn num_batches(&self) -> usize {
89        if self.batch_offsets.is_empty() {
90            0
91        } else {
92            self.batch_offsets.len() - 1
93        }
94    }
95
96    /// Get the number of records in this file
97    pub fn len(&self) -> usize {
98        *self.batch_offsets.last().unwrap_or(&0) as usize
99    }
100
101    pub fn is_empty(&self) -> bool {
102        self.len() == 0
103    }
104
105    /// Push the length of the batch.
106    pub fn push_batch_length(&mut self, batch_len: i32) {
107        if self.batch_offsets.is_empty() {
108            self.batch_offsets.push(0)
109        }
110        self.batch_offsets
111            .push(batch_len + self.batch_offsets.last().unwrap())
112    }
113
114    /// Get the starting offset of the batch.
115    pub fn get_offset(&self, batch_id: i32) -> Option<i32> {
116        self.batch_offsets.get(batch_id as usize).copied()
117    }
118
119    /// Get the length of the batch.
120    pub fn get_batch_length(&self, batch_id: i32) -> Option<i32> {
121        self.get_offset(batch_id + 1)
122            .map(|o| o - self.get_offset(batch_id).unwrap_or_default())
123    }
124
125    /// Group row indices into each batch.
126    ///
127    /// The indices must be sorted.
128    // TODO: pub(crate)
129    pub fn group_indices_to_batches(&self, indices: &[u32]) -> Vec<BatchOffsets> {
130        let mut batch_id: i32 = 0;
131        let num_batches = self.num_batches() as i32;
132        let mut indices_per_batch: BTreeMap<i32, Vec<u32>> = BTreeMap::new();
133
134        let mut indices = Vec::from(indices);
135        // sort unstable is quick sort and is almost always faster than sort
136        indices.sort_unstable();
137
138        for idx in indices.iter() {
139            while batch_id < num_batches && *idx >= self.batch_offsets[batch_id as usize + 1] as u32
140            {
141                batch_id += 1;
142            }
143            indices_per_batch
144                .entry(batch_id)
145                .and_modify(|v| v.push(*idx))
146                .or_insert(vec![*idx]);
147        }
148
149        indices_per_batch
150            .iter()
151            .map(|(batch_id, indices)| {
152                let batch_offset = self.batch_offsets[*batch_id as usize];
153                // Adjust indices to be the in-batch offsets.
154                let in_batch_offsets = indices
155                    .iter()
156                    .map(|i| i - batch_offset as u32)
157                    .collect::<Vec<_>>();
158                BatchOffsets {
159                    batch_id: *batch_id,
160                    offsets: in_batch_offsets,
161                }
162            })
163            .collect()
164    }
165
166    /// Map the range of row indices to the corresponding batches.
167    ///
168    /// It returns a list of (batch_id, in_batch_range) tuples.
169    // TODO: pub(crate)
170    pub fn range_to_batches(&self, range: Range<usize>) -> Result<Vec<(i32, Range<usize>)>> {
171        if range.end > *(self.batch_offsets.last().unwrap()) as usize {
172            return Err(Error::io(
173                format!(
174                    "Range {:?} is out of bounds {}",
175                    range,
176                    self.batch_offsets.last().unwrap()
177                ),
178                location!(),
179            ));
180        }
181        let offsets = self.batch_offsets.as_slice();
182        let mut batch_id = offsets
183            .binary_search(&(range.start as i32))
184            .unwrap_or_else(|x| x - 1);
185        let mut batches = vec![];
186
187        while batch_id < self.num_batches() {
188            let batch_start = offsets[batch_id] as usize;
189            if batch_start >= range.end {
190                break;
191            }
192            let start = std::cmp::max(range.start, batch_start) - batch_start;
193            let end = std::cmp::min(range.end, offsets[batch_id + 1] as usize) - batch_start;
194            batches.push((batch_id as i32, start..end));
195            batch_id += 1;
196        }
197        Ok(batches)
198    }
199}
200
201/// Metadata about the statistics
202#[derive(Debug, PartialEq, DeepSizeOf)]
203pub struct StatisticsMetadata {
204    /// Schema of the page-level statistics.
205    ///
206    /// For a given field with id `i`, the statistics are stored in the field
207    /// `i.null_count`, `i.min_value`, and `i.max_value`.
208    pub schema: Schema,
209    pub leaf_field_ids: Vec<i32>,
210    pub page_table_position: usize,
211}
212
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds metadata whose batches have the given lengths, in order.
    fn metadata_with_batches(lengths: &[i32]) -> Metadata {
        let mut metadata = Metadata::default();
        for &len in lengths {
            metadata.push_batch_length(len);
        }
        metadata
    }

    #[test]
    fn test_group_indices_to_batch() {
        let metadata = metadata_with_batches(&[20, 20]);

        let expected = vec![
            BatchOffsets {
                batch_id: 0,
                offsets: vec![6],
            },
            BatchOffsets {
                batch_id: 1,
                offsets: vec![4],
            },
        ];

        let batches = metadata.group_indices_to_batches(&[6, 24]);
        assert_eq!(batches.len(), 2);
        assert_eq!(batches, expected);
    }

    #[test]
    fn test_range_to_batches() {
        let metadata = metadata_with_batches(&[5, 10, 15, 20]);

        // (requested row range, expected (batch_id, in-batch range) pairs)
        let cases: Vec<(std::ops::Range<usize>, Vec<(i32, std::ops::Range<usize>)>)> = vec![
            (0..10, vec![(0, 0..5), (1, 0..5)]),
            (2..10, vec![(0, 2..5), (1, 0..5)]),
            (15..33, vec![(2, 0..15), (3, 0..3)]),
            (14..33, vec![(1, 9..10), (2, 0..15), (3, 0..3)]),
        ];

        for (range, expected) in cases {
            let batches = metadata.range_to_batches(range).unwrap();
            assert_eq!(batches, expected);
        }
    }
}