lance_table/format/
index.rs1use std::collections::HashMap;
7use std::sync::Arc;
8
9use chrono::{DateTime, Utc};
10use deepsize::DeepSizeOf;
11use futures::StreamExt;
12use lance_io::object_store::ObjectStore;
13use object_store::path::Path;
14use roaring::RoaringBitmap;
15use uuid::Uuid;
16
17use super::pb;
18use lance_core::{Error, Result};
19
/// A single file belonging to an index, together with its size.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct IndexFile {
    /// Path of the file. `list_index_files_with_sizes` in this module fills
    /// this with the path relative to the index directory.
    pub path: String,
    /// Size of the file in bytes.
    pub size_bytes: u64,
}
28
/// In-memory description of an index, convertible to and from
/// `pb::IndexMetadata`.
#[derive(Debug, Clone, PartialEq)]
pub struct IndexMetadata {
    /// Identifier of the index.
    pub uuid: Uuid,

    /// Fields associated with the index.
    // NOTE(review): presumably the ids of the indexed fields — confirm.
    pub fields: Vec<i32>,

    /// Name of the index.
    pub name: String,

    /// Dataset version recorded for this index.
    // NOTE(review): presumably the version the index was built against — confirm.
    pub dataset_version: u64,

    /// Fragments associated with this index, if recorded.
    ///
    /// Decoded as `None` when the serialized bitmap is empty.
    pub fragment_bitmap: Option<RoaringBitmap>,

    /// Opaque, type-erased details about the index, if any.
    pub index_details: Option<Arc<prost_types::Any>>,

    /// Version of the index; decoded as `0` when absent from the protobuf message.
    pub index_version: i32,

    /// Creation time of the index, if recorded.
    ///
    /// Serialized as milliseconds since the Unix epoch.
    pub created_at: Option<DateTime<Utc>>,

    /// Optional base identifier, copied verbatim to and from the protobuf message.
    // NOTE(review): meaning of the id is not visible in this file — confirm.
    pub base_id: Option<u32>,

    /// Files that make up the index, with their sizes, if recorded.
    ///
    /// Decoded as `None` when the protobuf message lists no files.
    pub files: Option<Vec<IndexFile>>,
}
80
81impl IndexMetadata {
82 pub fn effective_fragment_bitmap(
83 &self,
84 existing_fragments: &RoaringBitmap,
85 ) -> Option<RoaringBitmap> {
86 let fragment_bitmap = self.fragment_bitmap.as_ref()?;
87 Some(fragment_bitmap & existing_fragments)
88 }
89
90 pub fn file_size_map(&self) -> HashMap<String, u64> {
93 self.files
94 .as_ref()
95 .map(|files| {
96 files
97 .iter()
98 .map(|f| (f.path.clone(), f.size_bytes))
99 .collect()
100 })
101 .unwrap_or_default()
102 }
103
104 pub fn total_size_bytes(&self) -> Option<u64> {
107 self.files
108 .as_ref()
109 .map(|files| files.iter().map(|f| f.size_bytes).sum())
110 }
111
112 pub fn deleted_fragment_bitmap(
115 &self,
116 existing_fragments: &RoaringBitmap,
117 ) -> Option<RoaringBitmap> {
118 let fragment_bitmap = self.fragment_bitmap.as_ref()?;
119 Some(fragment_bitmap - existing_fragments)
120 }
121}
122
123impl DeepSizeOf for IndexMetadata {
124 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
125 self.uuid.as_bytes().deep_size_of_children(context)
126 + self.fields.deep_size_of_children(context)
127 + self.name.deep_size_of_children(context)
128 + self.dataset_version.deep_size_of_children(context)
129 + self
130 .fragment_bitmap
131 .as_ref()
132 .map(|fragment_bitmap| fragment_bitmap.serialized_size())
133 .unwrap_or(0)
134 + self.files.deep_size_of_children(context)
135 }
136}
137
138impl TryFrom<pb::IndexMetadata> for IndexMetadata {
139 type Error = Error;
140
141 fn try_from(proto: pb::IndexMetadata) -> Result<Self> {
142 let fragment_bitmap = if proto.fragment_bitmap.is_empty() {
143 None
144 } else {
145 Some(RoaringBitmap::deserialize_from(
146 &mut proto.fragment_bitmap.as_slice(),
147 )?)
148 };
149
150 let files = if proto.files.is_empty() {
151 None
152 } else {
153 Some(
154 proto
155 .files
156 .into_iter()
157 .map(|f| IndexFile {
158 path: f.path,
159 size_bytes: f.size_bytes,
160 })
161 .collect(),
162 )
163 };
164
165 Ok(Self {
166 uuid: proto.uuid.as_ref().map(Uuid::try_from).ok_or_else(|| {
167 Error::invalid_input("uuid field does not exist in Index metadata".to_string())
168 })??,
169 name: proto.name,
170 fields: proto.fields,
171 dataset_version: proto.dataset_version,
172 fragment_bitmap,
173 index_details: proto.index_details.map(Arc::new),
174 index_version: proto.index_version.unwrap_or_default(),
175 created_at: proto.created_at.map(|ts| {
176 DateTime::from_timestamp_millis(ts as i64)
177 .expect("Invalid timestamp in index metadata")
178 }),
179 base_id: proto.base_id,
180 files,
181 })
182 }
183}
184
185impl From<&IndexMetadata> for pb::IndexMetadata {
186 fn from(idx: &IndexMetadata) -> Self {
187 let mut fragment_bitmap = Vec::new();
188 if let Some(bitmap) = &idx.fragment_bitmap
189 && let Err(e) = bitmap.serialize_into(&mut fragment_bitmap)
190 {
191 log::error!("Failed to serialize fragment bitmap: {}", e);
194 fragment_bitmap.clear();
195 }
196
197 let files = idx
198 .files
199 .as_ref()
200 .map(|files| {
201 files
202 .iter()
203 .map(|f| pb::IndexFile {
204 path: f.path.clone(),
205 size_bytes: f.size_bytes,
206 })
207 .collect()
208 })
209 .unwrap_or_default();
210
211 Self {
212 uuid: Some((&idx.uuid).into()),
213 name: idx.name.clone(),
214 fields: idx.fields.clone(),
215 dataset_version: idx.dataset_version,
216 fragment_bitmap,
217 index_details: idx
218 .index_details
219 .as_ref()
220 .map(|details| details.as_ref().clone()),
221 index_version: Some(idx.index_version),
222 created_at: idx.created_at.map(|dt| dt.timestamp_millis() as u64),
223 base_id: idx.base_id,
224 files,
225 }
226 }
227}
228
229pub async fn list_index_files_with_sizes(
234 object_store: &ObjectStore,
235 index_dir: &Path,
236) -> Result<Vec<IndexFile>> {
237 let mut files = Vec::new();
238 let mut stream = object_store.read_dir_all(index_dir, None);
239 while let Some(meta) = stream.next().await {
240 let meta = meta?;
241 let relative_path = meta
243 .location
244 .as_ref()
245 .strip_prefix(index_dir.as_ref())
246 .map(|s| s.trim_start_matches('/').to_string())
247 .unwrap_or_else(|| meta.location.filename().unwrap_or("").to_string());
248 files.push(IndexFile {
249 path: relative_path,
250 size_bytes: meta.size,
251 });
252 }
253 Ok(files)
254}