// lance_file/previous/format/metadata.rs

use std::collections::BTreeMap;
use std::ops::Range;

use crate::datatypes::{Fields, FieldsWithMeta};
use crate::format::pb;
use deepsize::DeepSizeOf;
use lance_core::datatypes::Schema;
use lance_core::{Error, Result};
use lance_io::traits::ProtoStruct;
use snafu::location;

/// Metadata of a Lance data file.
#[derive(Debug, Default, DeepSizeOf, PartialEq)]
pub struct Metadata {
    /// Cumulative row offsets of the record batches; the last entry is the
    /// total number of rows in the file.
    pub batch_offsets: Vec<i32>,

    /// File position of the page table.
    pub page_table_position: usize,

    /// File position of the manifest block, if present.
    pub manifest_position: Option<usize>,

    /// Metadata of the statistics stored in this file, if any.
    pub stats_metadata: Option<StatisticsMetadata>,
}

impl ProtoStruct for Metadata {
    type Proto = pb::Metadata;
}

impl From<&Metadata> for pb::Metadata {
    fn from(m: &Metadata) -> Self {
        let statistics = if let Some(stats_meta) = &m.stats_metadata {
            let fields_with_meta: FieldsWithMeta = (&stats_meta.schema).into();
            Some(pb::metadata::StatisticsMetadata {
                schema: fields_with_meta.fields.0,
                fields: stats_meta.leaf_field_ids.clone(),
                page_table_position: stats_meta.page_table_position as u64,
            })
        } else {
            None
        };

        Self {
            batch_offsets: m.batch_offsets.clone(),
            page_table_position: m.page_table_position as u64,
            manifest_position: m.manifest_position.unwrap_or(0) as u64,
            statistics,
        }
    }
}

impl TryFrom<pb::Metadata> for Metadata {
    type Error = Error;
    fn try_from(m: pb::Metadata) -> Result<Self> {
        Ok(Self {
            batch_offsets: m.batch_offsets.clone(),
            page_table_position: m.page_table_position as usize,
            manifest_position: Some(m.manifest_position as usize),
            stats_metadata: if let Some(stats_meta) = m.statistics {
                Some(StatisticsMetadata {
                    schema: Schema::from(FieldsWithMeta {
                        fields: Fields(stats_meta.schema),
                        metadata: Default::default(),
                    }),
                    leaf_field_ids: stats_meta.fields,
                    page_table_position: stats_meta.page_table_position as usize,
                })
            } else {
                None
            },
        })
    }
}

/// Row indices belonging to a single batch, rewritten relative to the start
/// of that batch.
#[derive(Debug, PartialEq)]
pub struct BatchOffsets {
    pub batch_id: i32,
    pub offsets: Vec<u32>,
}

impl Metadata {
    /// Returns the number of batches in this file.
    pub fn num_batches(&self) -> usize {
        if self.batch_offsets.is_empty() {
            0
        } else {
            self.batch_offsets.len() - 1
        }
    }

    /// Returns the total number of rows in this file.
    pub fn len(&self) -> usize {
        *self.batch_offsets.last().unwrap_or(&0) as usize
    }

    /// Returns true if this file contains no rows.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Appends a batch of `batch_len` rows, extending the cumulative offsets.
    pub fn push_batch_length(&mut self, batch_len: i32) {
        if self.batch_offsets.is_empty() {
            self.batch_offsets.push(0)
        }
        self.batch_offsets
            .push(batch_len + self.batch_offsets.last().unwrap())
    }

    /// Returns the starting row offset of the batch, or `None` if `batch_id`
    /// is out of range.
    pub fn get_offset(&self, batch_id: i32) -> Option<i32> {
        self.batch_offsets.get(batch_id as usize).copied()
    }

    /// Returns the number of rows in the batch, or `None` if `batch_id` is
    /// out of range.
    pub fn get_batch_length(&self, batch_id: i32) -> Option<i32> {
        self.get_offset(batch_id + 1)
            .map(|o| o - self.get_offset(batch_id).unwrap_or_default())
    }

    /// Groups row indices into the batches that contain them.
    ///
    /// The input indices are sorted, then mapped to one [`BatchOffsets`] per
    /// touched batch, with each index rewritten relative to the start of its
    /// batch.
    pub fn group_indices_to_batches(&self, indices: &[u32]) -> Vec<BatchOffsets> {
        let mut batch_id: i32 = 0;
        let num_batches = self.num_batches() as i32;
        let mut indices_per_batch: BTreeMap<i32, Vec<u32>> = BTreeMap::new();

        let mut indices = Vec::from(indices);
        indices.sort_unstable();

        for idx in indices.iter() {
            // Advance to the batch whose row range contains `idx`.
            while batch_id < num_batches && *idx >= self.batch_offsets[batch_id as usize + 1] as u32
            {
                batch_id += 1;
            }
            indices_per_batch
                .entry(batch_id)
                .and_modify(|v| v.push(*idx))
                .or_insert_with(|| vec![*idx]);
        }

        indices_per_batch
            .iter()
            .map(|(batch_id, indices)| {
                let batch_offset = self.batch_offsets[*batch_id as usize];
                // Rewrite absolute row indices as offsets within the batch.
                let in_batch_offsets = indices
                    .iter()
                    .map(|i| i - batch_offset as u32)
                    .collect::<Vec<_>>();
                BatchOffsets {
                    batch_id: *batch_id,
                    offsets: in_batch_offsets,
                }
            })
            .collect()
    }

    /// Maps a row range to per-batch ranges.
    ///
    /// Returns `(batch_id, range_within_batch)` pairs covering `range`, or an
    /// error if `range` extends past the end of the file.
    pub fn range_to_batches(&self, range: Range<usize>) -> Result<Vec<(i32, Range<usize>)>> {
        if range.end > *(self.batch_offsets.last().unwrap()) as usize {
            return Err(Error::io(
                format!(
                    "Range {:?} is out of bounds {}",
                    range,
                    self.batch_offsets.last().unwrap()
                ),
                location!(),
            ));
        }
        let offsets = self.batch_offsets.as_slice();
        // Find the batch containing `range.start`.
        let mut batch_id = offsets
            .binary_search(&(range.start as i32))
            .unwrap_or_else(|x| x - 1);
        let mut batches = vec![];

        while batch_id < self.num_batches() {
            let batch_start = offsets[batch_id] as usize;
            if batch_start >= range.end {
                break;
            }
            let start = std::cmp::max(range.start, batch_start) - batch_start;
            let end = std::cmp::min(range.end, offsets[batch_id + 1] as usize) - batch_start;
            batches.push((batch_id as i32, start..end));
            batch_id += 1;
        }
        Ok(batches)
    }
}

/// Metadata describing the statistics stored in the file.
#[derive(Debug, PartialEq, DeepSizeOf)]
pub struct StatisticsMetadata {
    /// Schema of the statistics.
    pub schema: Schema,
    /// Ids of the leaf fields that have statistics.
    pub leaf_field_ids: Vec<i32>,
    /// File position of the statistics page table.
    pub page_table_position: usize,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_group_indices_to_batch() {
        let mut metadata = Metadata::default();
        metadata.push_batch_length(20);
        metadata.push_batch_length(20);

        let batches = metadata.group_indices_to_batches(&[6, 24]);
        assert_eq!(batches.len(), 2);
        assert_eq!(
            batches,
            vec![
                BatchOffsets {
                    batch_id: 0,
                    offsets: vec![6]
                },
                BatchOffsets {
                    batch_id: 1,
                    offsets: vec![4]
                }
            ]
        );
    }
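
    // A small additional sketch (not from the original source) exercising the
    // offset accessors; the expected values follow directly from the
    // cumulative `batch_offsets` built by `push_batch_length`.
    #[test]
    fn test_offset_accessors() {
        let mut metadata = Metadata::default();
        metadata.push_batch_length(10);
        metadata.push_batch_length(20);

        // batch_offsets is now [0, 10, 30].
        assert_eq!(metadata.num_batches(), 2);
        assert_eq!(metadata.len(), 30);
        assert!(!metadata.is_empty());
        assert_eq!(metadata.get_offset(1), Some(10));
        assert_eq!(metadata.get_batch_length(1), Some(20));
        // Out-of-range batch ids yield None.
        assert_eq!(metadata.get_batch_length(2), None);
    }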

    #[test]
    fn test_range_to_batches() {
        let mut metadata = Metadata::default();
        for l in [5, 10, 15, 20] {
            metadata.push_batch_length(l);
        }

        let batches = metadata.range_to_batches(0..10).unwrap();
        assert_eq!(batches, vec![(0, 0..5), (1, 0..5)]);

        let batches = metadata.range_to_batches(2..10).unwrap();
        assert_eq!(batches, vec![(0, 2..5), (1, 0..5)]);

        let batches = metadata.range_to_batches(15..33).unwrap();
        assert_eq!(batches, vec![(2, 0..15), (3, 0..3)]);

        let batches = metadata.range_to_batches(14..33).unwrap();
        assert_eq!(batches, vec![(1, 9..10), (2, 0..15), (3, 0..3)]);
    }
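
    // Additional sketches (not from the original source). The first checks the
    // out-of-bounds error path of `range_to_batches`; the second round-trips
    // `Metadata` through its protobuf form. Note that `manifest_position: None`
    // is encoded as 0, so a non-zero position is used for an exact round trip.
    #[test]
    fn test_range_to_batches_out_of_bounds() {
        let mut metadata = Metadata::default();
        for l in [5, 10, 15, 20] {
            metadata.push_batch_length(l);
        }

        // Total rows = 50, so a range ending past 50 is rejected.
        assert!(metadata.range_to_batches(30..51).is_err());
    }

    #[test]
    fn test_metadata_proto_roundtrip() {
        let mut metadata = Metadata::default();
        metadata.push_batch_length(10);
        metadata.page_table_position = 100;
        metadata.manifest_position = Some(200);

        let proto = pb::Metadata::from(&metadata);
        let roundtripped = Metadata::try_from(proto).unwrap();
        assert_eq!(roundtripped, metadata);
    }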
}