// lance_file/previous/format/metadata.rs

use std::collections::BTreeMap;
5use std::ops::Range;
6
7use crate::datatypes::{Fields, FieldsWithMeta};
8use crate::format::pb;
9use deepsize::DeepSizeOf;
10use lance_core::datatypes::Schema;
11use lance_core::{Error, Result};
12use lance_io::traits::ProtoStruct;
13
/// Data file metadata for the previous (v1) Lance file format.
#[derive(Debug, Default, DeepSizeOf, PartialEq)]
pub struct Metadata {
    /// Fencepost offsets of each record batch: N+1 entries for N batches,
    /// starting at 0, with the last entry being the total row count.
    pub batch_offsets: Vec<i32>,

    /// Position of the page table in the file (byte offset — presumably;
    /// written as u64 in the protobuf).
    pub page_table_position: usize,

    /// Position of the manifest in the file, if any. `None` round-trips as 0
    /// through the protobuf encoding.
    pub manifest_position: Option<usize>,

    /// Metadata describing the statistics stored in the file, if present.
    pub stats_metadata: Option<StatisticsMetadata>,
}
29
/// `Metadata` serializes to and from the `pb::Metadata` protobuf message.
impl ProtoStruct for Metadata {
    type Proto = pb::Metadata;
}
33
34impl From<&Metadata> for pb::Metadata {
35 fn from(m: &Metadata) -> Self {
36 let statistics = if let Some(stats_meta) = &m.stats_metadata {
37 let fields_with_meta: FieldsWithMeta = (&stats_meta.schema).into();
38 Some(pb::metadata::StatisticsMetadata {
39 schema: fields_with_meta.fields.0,
40 fields: stats_meta.leaf_field_ids.clone(),
41 page_table_position: stats_meta.page_table_position as u64,
42 })
43 } else {
44 None
45 };
46
47 Self {
48 batch_offsets: m.batch_offsets.clone(),
49 page_table_position: m.page_table_position as u64,
50 manifest_position: m.manifest_position.unwrap_or(0) as u64,
51 statistics,
52 }
53 }
54}
55
56impl TryFrom<pb::Metadata> for Metadata {
57 type Error = Error;
58 fn try_from(m: pb::Metadata) -> Result<Self> {
59 Ok(Self {
60 batch_offsets: m.batch_offsets.clone(),
61 page_table_position: m.page_table_position as usize,
62 manifest_position: Some(m.manifest_position as usize),
63 stats_metadata: if let Some(stats_meta) = m.statistics {
64 Some(StatisticsMetadata {
65 schema: Schema::from(FieldsWithMeta {
66 fields: Fields(stats_meta.schema),
67 metadata: Default::default(),
68 }),
69 leaf_field_ids: stats_meta.fields,
70 page_table_position: stats_meta.page_table_position as usize,
71 })
72 } else {
73 None
74 },
75 })
76 }
77}
78
/// A group of row offsets, relative to the start of one batch.
#[derive(Debug, PartialEq)]
pub struct BatchOffsets {
    /// Id of the batch these offsets belong to.
    pub batch_id: i32,
    /// Row offsets within that batch (batch-relative, not file-absolute).
    pub offsets: Vec<u32>,
}
84
85impl Metadata {
86 pub fn num_batches(&self) -> usize {
88 if self.batch_offsets.is_empty() {
89 0
90 } else {
91 self.batch_offsets.len() - 1
92 }
93 }
94
95 pub fn len(&self) -> usize {
97 *self.batch_offsets.last().unwrap_or(&0) as usize
98 }
99
100 pub fn is_empty(&self) -> bool {
101 self.len() == 0
102 }
103
104 pub fn push_batch_length(&mut self, batch_len: i32) {
106 if self.batch_offsets.is_empty() {
107 self.batch_offsets.push(0)
108 }
109 self.batch_offsets
110 .push(batch_len + self.batch_offsets.last().unwrap())
111 }
112
113 pub fn get_offset(&self, batch_id: i32) -> Option<i32> {
115 self.batch_offsets.get(batch_id as usize).copied()
116 }
117
118 pub fn get_batch_length(&self, batch_id: i32) -> Option<i32> {
120 self.get_offset(batch_id + 1)
121 .map(|o| o - self.get_offset(batch_id).unwrap_or_default())
122 }
123
124 pub fn group_indices_to_batches(&self, indices: &[u32]) -> Vec<BatchOffsets> {
129 let mut batch_id: i32 = 0;
130 let num_batches = self.num_batches() as i32;
131 let mut indices_per_batch: BTreeMap<i32, Vec<u32>> = BTreeMap::new();
132
133 let mut indices = Vec::from(indices);
134 indices.sort_unstable();
136
137 for idx in indices.iter() {
138 while batch_id < num_batches && *idx >= self.batch_offsets[batch_id as usize + 1] as u32
139 {
140 batch_id += 1;
141 }
142 indices_per_batch
143 .entry(batch_id)
144 .and_modify(|v| v.push(*idx))
145 .or_insert(vec![*idx]);
146 }
147
148 indices_per_batch
149 .iter()
150 .map(|(batch_id, indices)| {
151 let batch_offset = self.batch_offsets[*batch_id as usize];
152 let in_batch_offsets = indices
154 .iter()
155 .map(|i| i - batch_offset as u32)
156 .collect::<Vec<_>>();
157 BatchOffsets {
158 batch_id: *batch_id,
159 offsets: in_batch_offsets,
160 }
161 })
162 .collect()
163 }
164
165 pub fn range_to_batches(&self, range: Range<usize>) -> Result<Vec<(i32, Range<usize>)>> {
170 if range.end > *(self.batch_offsets.last().unwrap()) as usize {
171 return Err(Error::invalid_input(format!(
172 "Range {:?} is out of bounds {}",
173 range,
174 self.batch_offsets.last().unwrap()
175 )));
176 }
177 let offsets = self.batch_offsets.as_slice();
178 let mut batch_id = offsets
179 .binary_search(&(range.start as i32))
180 .unwrap_or_else(|x| x - 1);
181 let mut batches = vec![];
182
183 while batch_id < self.num_batches() {
184 let batch_start = offsets[batch_id] as usize;
185 if batch_start >= range.end {
186 break;
187 }
188 let start = std::cmp::max(range.start, batch_start) - batch_start;
189 let end = std::cmp::min(range.end, offsets[batch_id + 1] as usize) - batch_start;
190 batches.push((batch_id as i32, start..end));
191 batch_id += 1;
192 }
193 Ok(batches)
194 }
195}
196
/// Metadata describing the statistics stored in the file.
#[derive(Debug, PartialEq, DeepSizeOf)]
pub struct StatisticsMetadata {
    /// Schema of the stored statistics.
    pub schema: Schema,
    /// Ids of the leaf fields the statistics cover (serialized as the proto
    /// `fields` list).
    pub leaf_field_ids: Vec<i32>,
    /// Position of the statistics page table in the file.
    pub page_table_position: usize,
}
208
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_group_indices_to_batch() {
        // Two batches of 20 rows each: offsets become [0, 20, 40].
        let mut metadata = Metadata::default();
        for _ in 0..2 {
            metadata.push_batch_length(20);
        }

        let batches = metadata.group_indices_to_batches(&[6, 24]);
        assert_eq!(batches.len(), 2);

        // 6 falls in batch 0 at offset 6; 24 falls in batch 1 at offset 4.
        let expected = vec![
            BatchOffsets {
                batch_id: 0,
                offsets: vec![6],
            },
            BatchOffsets {
                batch_id: 1,
                offsets: vec![4],
            },
        ];
        assert_eq!(batches, expected);
    }

    #[test]
    fn test_range_to_batches() {
        // Batch lengths 5, 10, 15, 20: offsets become [0, 5, 15, 30, 50].
        let mut metadata = Metadata::default();
        for l in [5, 10, 15, 20] {
            metadata.push_batch_length(l);
        }

        let cases = [
            (0..10, vec![(0, 0..5), (1, 0..5)]),
            (2..10, vec![(0, 2..5), (1, 0..5)]),
            (15..33, vec![(2, 0..15), (3, 0..3)]),
            (14..33, vec![(1, 9..10), (2, 0..15), (3, 0..3)]),
        ];
        for (range, expected) in cases {
            assert_eq!(metadata.range_to_batches(range).unwrap(), expected);
        }
    }
}
255}