1use std::collections::HashMap;
7use std::sync::Arc;
8
9use chrono::{DateTime, Utc};
10use futures::StreamExt;
11use lance_core::deepsize::DeepSizeOf;
12use lance_io::object_store::ObjectStore;
13use object_store::path::Path;
14use roaring::RoaringBitmap;
15use uuid::Uuid;
16
17use super::pb;
18use lance_core::cache::{CacheEntryReader, CacheEntryWriter};
19use lance_core::{Error, Result};
20
21#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
23pub struct IndexFile {
24 pub path: String,
26 pub size_bytes: u64,
28}
29
30#[derive(Debug, Clone, PartialEq)]
32pub struct IndexMetadata {
33 pub uuid: Uuid,
35
36 pub fields: Vec<i32>,
38
39 pub name: String,
41
42 pub dataset_version: u64,
47
48 pub fragment_bitmap: Option<RoaringBitmap>,
54
55 pub index_details: Option<Arc<prost_types::Any>>,
60
61 pub index_version: i32,
63
64 pub created_at: Option<DateTime<Utc>>,
69
70 pub base_id: Option<u32>,
73
74 pub files: Option<Vec<IndexFile>>,
80}
81
82impl IndexMetadata {
83 pub fn effective_fragment_bitmap(
84 &self,
85 existing_fragments: &RoaringBitmap,
86 ) -> Option<RoaringBitmap> {
87 let fragment_bitmap = self.fragment_bitmap.as_ref()?;
88 Some(fragment_bitmap & existing_fragments)
89 }
90
91 pub fn file_size_map(&self) -> HashMap<String, u64> {
94 self.files
95 .as_ref()
96 .map(|files| {
97 files
98 .iter()
99 .map(|f| (f.path.clone(), f.size_bytes))
100 .collect()
101 })
102 .unwrap_or_default()
103 }
104
105 pub fn total_size_bytes(&self) -> Option<u64> {
108 self.files
109 .as_ref()
110 .map(|files| files.iter().map(|f| f.size_bytes).sum())
111 }
112
113 pub fn deleted_fragment_bitmap(
116 &self,
117 existing_fragments: &RoaringBitmap,
118 ) -> Option<RoaringBitmap> {
119 let fragment_bitmap = self.fragment_bitmap.as_ref()?;
120 Some(fragment_bitmap - existing_fragments)
121 }
122}
123
124impl DeepSizeOf for IndexMetadata {
125 fn deep_size_of_children(&self, context: &mut lance_core::deepsize::Context) -> usize {
126 self.uuid.as_bytes().deep_size_of_children(context)
127 + self.fields.deep_size_of_children(context)
128 + self.name.deep_size_of_children(context)
129 + self.dataset_version.deep_size_of_children(context)
130 + self
131 .fragment_bitmap
132 .as_ref()
133 .map(|fragment_bitmap| fragment_bitmap.serialized_size())
134 .unwrap_or(0)
135 + self.files.deep_size_of_children(context)
136 }
137}
138
139impl TryFrom<pb::IndexMetadata> for IndexMetadata {
140 type Error = Error;
141
142 fn try_from(proto: pb::IndexMetadata) -> Result<Self> {
143 let fragment_bitmap = if proto.fragment_bitmap.is_empty() {
144 None
145 } else {
146 Some(RoaringBitmap::deserialize_from(
147 &mut proto.fragment_bitmap.as_slice(),
148 )?)
149 };
150
151 let files = if proto.files.is_empty() {
152 None
153 } else {
154 Some(
155 proto
156 .files
157 .into_iter()
158 .map(|f| IndexFile {
159 path: f.path,
160 size_bytes: f.size_bytes,
161 })
162 .collect(),
163 )
164 };
165
166 Ok(Self {
167 uuid: proto.uuid.as_ref().map(Uuid::try_from).ok_or_else(|| {
168 Error::invalid_input("uuid field does not exist in Index metadata".to_string())
169 })??,
170 name: proto.name,
171 fields: proto.fields,
172 dataset_version: proto.dataset_version,
173 fragment_bitmap,
174 index_details: proto.index_details.map(Arc::new),
175 index_version: proto.index_version.unwrap_or_default(),
176 created_at: proto.created_at.map(|ts| {
177 DateTime::from_timestamp_millis(ts as i64)
178 .expect("Invalid timestamp in index metadata")
179 }),
180 base_id: proto.base_id,
181 files,
182 })
183 }
184}
185
186impl From<&IndexMetadata> for pb::IndexMetadata {
187 fn from(idx: &IndexMetadata) -> Self {
188 let mut fragment_bitmap = Vec::new();
189 if let Some(bitmap) = &idx.fragment_bitmap
190 && let Err(e) = bitmap.serialize_into(&mut fragment_bitmap)
191 {
192 log::error!("Failed to serialize fragment bitmap: {}", e);
195 fragment_bitmap.clear();
196 }
197
198 let files = idx
199 .files
200 .as_ref()
201 .map(|files| {
202 files
203 .iter()
204 .map(|f| pb::IndexFile {
205 path: f.path.clone(),
206 size_bytes: f.size_bytes,
207 })
208 .collect()
209 })
210 .unwrap_or_default();
211
212 Self {
213 uuid: Some((&idx.uuid).into()),
214 name: idx.name.clone(),
215 fields: idx.fields.clone(),
216 dataset_version: idx.dataset_version,
217 fragment_bitmap,
218 index_details: idx
219 .index_details
220 .as_ref()
221 .map(|details| details.as_ref().clone()),
222 index_version: Some(idx.index_version),
223 created_at: idx.created_at.map(|dt| dt.timestamp_millis() as u64),
224 base_id: idx.base_id,
225 files,
226 }
227 }
228}
229
/// Type-erased, shareable value handled by the index metadata codec functions.
type ArcAny = Arc<dyn std::any::Any + Send + Sync>;

/// Type identifier passed to `CacheCodec::new` for index metadata entries.
const INDEX_METADATA_TYPE_ID: &str = "lance.table.IndexMetadataList";
/// Version number passed to `CacheCodec::new` for index metadata entries.
const INDEX_METADATA_VERSION: u32 = 1;
243
244fn serialize_index_metadata(
245 any: &ArcAny,
246 writer: &mut CacheEntryWriter<'_>,
247) -> lance_core::Result<()> {
248 let vec = any
249 .downcast_ref::<Vec<IndexMetadata>>()
250 .expect("index_metadata_codec: wrong type (this is a bug in the cache layer)");
251 let section = pb::IndexSection {
252 indices: vec.iter().map(pb::IndexMetadata::from).collect(),
253 };
254 writer.write_header(§ion)
255}
256
257fn deserialize_index_metadata(reader: &mut CacheEntryReader<'_>) -> lance_core::Result<ArcAny> {
258 let section: pb::IndexSection = reader.read_header()?;
259 let indices: Vec<IndexMetadata> = section
260 .indices
261 .into_iter()
262 .map(IndexMetadata::try_from)
263 .collect::<lance_core::Result<_>>()?;
264 Ok(Arc::new(indices))
265}
266
267pub fn index_metadata_codec() -> lance_core::cache::CacheCodec {
268 lance_core::cache::CacheCodec::new(
269 INDEX_METADATA_TYPE_ID,
270 INDEX_METADATA_VERSION,
271 serialize_index_metadata,
272 deserialize_index_metadata,
273 )
274}
275
276pub async fn list_index_files_with_sizes(
281 object_store: &ObjectStore,
282 index_dir: &Path,
283) -> Result<Vec<IndexFile>> {
284 let mut files = Vec::new();
285 let mut stream = object_store.read_dir_all(index_dir, None);
286 while let Some(meta) = stream.next().await {
287 let meta = meta?;
288 let relative_path = meta
290 .location
291 .as_ref()
292 .strip_prefix(index_dir.as_ref())
293 .map(|s| s.trim_start_matches('/').to_string())
294 .unwrap_or_else(|| meta.location.filename().unwrap_or("").to_string());
295 files.push(IndexFile {
296 path: relative_path,
297 size_bytes: meta.size,
298 });
299 }
300 Ok(files)
301}
302
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Encodes a list of `IndexMetadata` with the codec, parks the bytes in an
    /// in-memory map, decodes them again, and checks the fields survive.
    #[test]
    fn test_index_metadata_codec_roundtrip() {
        let codec = index_metadata_codec();

        // One entry with a bitmap and a file list, one with neither.
        let populated = IndexMetadata {
            uuid: Uuid::new_v4(),
            name: "my_index".to_string(),
            fields: vec![0, 1],
            dataset_version: 42,
            fragment_bitmap: Some(RoaringBitmap::from_iter([1, 2, 3])),
            index_details: None,
            index_version: 1,
            created_at: None,
            base_id: None,
            files: Some(vec![IndexFile {
                path: "index.idx".to_string(),
                size_bytes: 1024,
            }]),
        };
        let sparse = IndexMetadata {
            uuid: Uuid::new_v4(),
            name: "second_index".to_string(),
            fields: vec![2],
            dataset_version: 43,
            fragment_bitmap: None,
            index_details: None,
            index_version: 2,
            created_at: None,
            base_id: Some(7),
            files: None,
        };
        let original = vec![populated, sparse];

        // The map stands in for a byte store keyed by string.
        let mut store: HashMap<String, Vec<u8>> = HashMap::new();
        let key = "dataset/v42/Vec<IndexMetadata>".to_string();

        let entry: Arc<dyn std::any::Any + Send + Sync> = Arc::new(original.clone());
        let mut encoded = Vec::new();
        codec.serialize(&entry, &mut encoded).unwrap();
        store.insert(key.clone(), encoded);

        let stored = store.get(&key).unwrap();
        let recovered = codec
            .deserialize(&bytes::Bytes::copy_from_slice(stored))
            .hit()
            .expect("entry should decode as a hit")
            .downcast::<Vec<IndexMetadata>>()
            .expect("downcast should succeed");

        assert_eq!(original.len(), recovered.len());
        for (expected, actual) in original.iter().zip(recovered.iter()) {
            assert_eq!(expected.uuid, actual.uuid);
            assert_eq!(expected.name, actual.name);
            assert_eq!(expected.fields, actual.fields);
            assert_eq!(expected.dataset_version, actual.dataset_version);
            assert_eq!(expected.fragment_bitmap, actual.fragment_bitmap);
            assert_eq!(expected.index_version, actual.index_version);
            assert_eq!(expected.base_id, actual.base_id);
            assert_eq!(expected.files, actual.files);
        }
    }
}