1use std::collections::HashMap;
7use std::sync::Arc;
8
9use chrono::{DateTime, Utc};
10use deepsize::DeepSizeOf;
11use futures::StreamExt;
12use lance_io::object_store::ObjectStore;
13use object_store::path::Path;
14use roaring::RoaringBitmap;
15use uuid::Uuid;
16
17use super::pb;
18use lance_core::{Error, Result};
19
20#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
22pub struct IndexFile {
23 pub path: String,
25 pub size_bytes: u64,
27}
28
29#[derive(Debug, Clone, PartialEq)]
31pub struct IndexMetadata {
32 pub uuid: Uuid,
34
35 pub fields: Vec<i32>,
37
38 pub name: String,
40
41 pub dataset_version: u64,
46
47 pub fragment_bitmap: Option<RoaringBitmap>,
53
54 pub index_details: Option<Arc<prost_types::Any>>,
59
60 pub index_version: i32,
62
63 pub created_at: Option<DateTime<Utc>>,
68
69 pub base_id: Option<u32>,
72
73 pub files: Option<Vec<IndexFile>>,
79}
80
81impl IndexMetadata {
82 pub fn effective_fragment_bitmap(
83 &self,
84 existing_fragments: &RoaringBitmap,
85 ) -> Option<RoaringBitmap> {
86 let fragment_bitmap = self.fragment_bitmap.as_ref()?;
87 Some(fragment_bitmap & existing_fragments)
88 }
89
90 pub fn file_size_map(&self) -> HashMap<String, u64> {
93 self.files
94 .as_ref()
95 .map(|files| {
96 files
97 .iter()
98 .map(|f| (f.path.clone(), f.size_bytes))
99 .collect()
100 })
101 .unwrap_or_default()
102 }
103
104 pub fn total_size_bytes(&self) -> Option<u64> {
107 self.files
108 .as_ref()
109 .map(|files| files.iter().map(|f| f.size_bytes).sum())
110 }
111
112 pub fn deleted_fragment_bitmap(
115 &self,
116 existing_fragments: &RoaringBitmap,
117 ) -> Option<RoaringBitmap> {
118 let fragment_bitmap = self.fragment_bitmap.as_ref()?;
119 Some(fragment_bitmap - existing_fragments)
120 }
121}
122
123impl DeepSizeOf for IndexMetadata {
124 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
125 self.uuid.as_bytes().deep_size_of_children(context)
126 + self.fields.deep_size_of_children(context)
127 + self.name.deep_size_of_children(context)
128 + self.dataset_version.deep_size_of_children(context)
129 + self
130 .fragment_bitmap
131 .as_ref()
132 .map(|fragment_bitmap| fragment_bitmap.serialized_size())
133 .unwrap_or(0)
134 + self.files.deep_size_of_children(context)
135 }
136}
137
138impl TryFrom<pb::IndexMetadata> for IndexMetadata {
139 type Error = Error;
140
141 fn try_from(proto: pb::IndexMetadata) -> Result<Self> {
142 let fragment_bitmap = if proto.fragment_bitmap.is_empty() {
143 None
144 } else {
145 Some(RoaringBitmap::deserialize_from(
146 &mut proto.fragment_bitmap.as_slice(),
147 )?)
148 };
149
150 let files = if proto.files.is_empty() {
151 None
152 } else {
153 Some(
154 proto
155 .files
156 .into_iter()
157 .map(|f| IndexFile {
158 path: f.path,
159 size_bytes: f.size_bytes,
160 })
161 .collect(),
162 )
163 };
164
165 Ok(Self {
166 uuid: proto.uuid.as_ref().map(Uuid::try_from).ok_or_else(|| {
167 Error::invalid_input("uuid field does not exist in Index metadata".to_string())
168 })??,
169 name: proto.name,
170 fields: proto.fields,
171 dataset_version: proto.dataset_version,
172 fragment_bitmap,
173 index_details: proto.index_details.map(Arc::new),
174 index_version: proto.index_version.unwrap_or_default(),
175 created_at: proto.created_at.map(|ts| {
176 DateTime::from_timestamp_millis(ts as i64)
177 .expect("Invalid timestamp in index metadata")
178 }),
179 base_id: proto.base_id,
180 files,
181 })
182 }
183}
184
185impl From<&IndexMetadata> for pb::IndexMetadata {
186 fn from(idx: &IndexMetadata) -> Self {
187 let mut fragment_bitmap = Vec::new();
188 if let Some(bitmap) = &idx.fragment_bitmap
189 && let Err(e) = bitmap.serialize_into(&mut fragment_bitmap)
190 {
191 log::error!("Failed to serialize fragment bitmap: {}", e);
194 fragment_bitmap.clear();
195 }
196
197 let files = idx
198 .files
199 .as_ref()
200 .map(|files| {
201 files
202 .iter()
203 .map(|f| pb::IndexFile {
204 path: f.path.clone(),
205 size_bytes: f.size_bytes,
206 })
207 .collect()
208 })
209 .unwrap_or_default();
210
211 Self {
212 uuid: Some((&idx.uuid).into()),
213 name: idx.name.clone(),
214 fields: idx.fields.clone(),
215 dataset_version: idx.dataset_version,
216 fragment_bitmap,
217 index_details: idx
218 .index_details
219 .as_ref()
220 .map(|details| details.as_ref().clone()),
221 index_version: Some(idx.index_version),
222 created_at: idx.created_at.map(|dt| dt.timestamp_millis() as u64),
223 base_id: idx.base_id,
224 files,
225 }
226 }
227}
228
/// Type-erased, thread-shareable cache entry handled by the cache codec functions.
type ArcAny = Arc<dyn core::any::Any + Send + Sync>;
237
238fn serialize_index_metadata(
239 any: &ArcAny,
240 writer: &mut dyn std::io::Write,
241) -> lance_core::Result<()> {
242 use prost::Message;
243 let vec = any
244 .downcast_ref::<Vec<IndexMetadata>>()
245 .expect("index_metadata_codec: wrong type (this is a bug in the cache layer)");
246 let section = pb::IndexSection {
247 indices: vec.iter().map(pb::IndexMetadata::from).collect(),
248 };
249 writer.write_all(§ion.encode_to_vec())?;
250 Ok(())
251}
252
253fn deserialize_index_metadata(data: &bytes::Bytes) -> lance_core::Result<ArcAny> {
254 use prost::Message;
255 let section = pb::IndexSection::decode(data.as_ref())?;
256 let indices: Vec<IndexMetadata> = section
257 .indices
258 .into_iter()
259 .map(IndexMetadata::try_from)
260 .collect::<lance_core::Result<_>>()?;
261 Ok(Arc::new(indices))
262}
263
264pub fn index_metadata_codec() -> lance_core::cache::CacheCodec {
265 lance_core::cache::CacheCodec::new(serialize_index_metadata, deserialize_index_metadata)
266}
267
268pub async fn list_index_files_with_sizes(
273 object_store: &ObjectStore,
274 index_dir: &Path,
275) -> Result<Vec<IndexFile>> {
276 let mut files = Vec::new();
277 let mut stream = object_store.read_dir_all(index_dir, None);
278 while let Some(meta) = stream.next().await {
279 let meta = meta?;
280 let relative_path = meta
282 .location
283 .as_ref()
284 .strip_prefix(index_dir.as_ref())
285 .map(|s| s.trim_start_matches('/').to_string())
286 .unwrap_or_else(|| meta.location.filename().unwrap_or("").to_string());
287 files.push(IndexFile {
288 path: relative_path,
289 size_bytes: meta.size,
290 });
291 }
292 Ok(files)
293}
294
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Builds index metadata with every optional field unset, for tests to customize via
    /// struct-update syntax.
    fn bare_index(name: &str) -> IndexMetadata {
        IndexMetadata {
            uuid: Uuid::new_v4(),
            name: name.to_string(),
            fields: vec![],
            dataset_version: 0,
            fragment_bitmap: None,
            index_details: None,
            index_version: 0,
            created_at: None,
            base_id: None,
            files: None,
        }
    }

    /// Round-trips a `Vec<IndexMetadata>` through the cache codec and checks that every
    /// field survives, including `index_details` and `created_at`.
    #[test]
    fn test_index_metadata_codec_roundtrip() {
        let codec = index_metadata_codec();

        let original = vec![
            IndexMetadata {
                fields: vec![0, 1],
                dataset_version: 42,
                fragment_bitmap: Some(RoaringBitmap::from_iter([1, 2, 3])),
                index_details: Some(Arc::new(prost_types::Any {
                    type_url: "test/details".to_string(),
                    value: vec![1, 2, 3],
                })),
                index_version: 1,
                // Whole milliseconds only: created_at is persisted as epoch millis.
                created_at: DateTime::from_timestamp_millis(1_700_000_000_123),
                files: Some(vec![IndexFile {
                    path: "index.idx".to_string(),
                    size_bytes: 1024,
                }]),
                ..bare_index("my_index")
            },
            IndexMetadata {
                fields: vec![2],
                dataset_version: 43,
                index_version: 2,
                base_id: Some(7),
                ..bare_index("second_index")
            },
        ];

        // Simulate a byte-oriented cache backend keyed by string.
        let mut store: HashMap<String, Vec<u8>> = HashMap::new();
        let key = "dataset/v42/Vec<IndexMetadata>".to_string();

        let mut buf = Vec::new();
        let entry: Arc<dyn std::any::Any + Send + Sync> = Arc::new(original.clone());
        codec.serialize(&entry, &mut buf).unwrap();
        store.insert(key.clone(), buf);

        let bytes = store.get(&key).unwrap();
        let recovered = codec
            .deserialize(&bytes::Bytes::copy_from_slice(bytes))
            .unwrap();
        let recovered = recovered
            .downcast::<Vec<IndexMetadata>>()
            .expect("downcast should succeed");

        // `IndexMetadata` derives `PartialEq`, so this compares every field.
        assert_eq!(original, *recovered);
    }

    /// Exercises the fragment-bitmap and file-size helpers, with and without data.
    #[test]
    fn test_fragment_and_file_helpers() {
        let index = IndexMetadata {
            fragment_bitmap: Some(RoaringBitmap::from_iter([1, 2, 3])),
            files: Some(vec![
                IndexFile {
                    path: "a.idx".to_string(),
                    size_bytes: 10,
                },
                IndexFile {
                    path: "dir/b.idx".to_string(),
                    size_bytes: 20,
                },
            ]),
            ..bare_index("helpers")
        };
        let existing = RoaringBitmap::from_iter([2, 3, 4]);

        assert_eq!(
            index.effective_fragment_bitmap(&existing),
            Some(RoaringBitmap::from_iter([2, 3]))
        );
        assert_eq!(
            index.deleted_fragment_bitmap(&existing),
            Some(RoaringBitmap::from_iter([1]))
        );
        assert_eq!(index.total_size_bytes(), Some(30));
        let sizes = index.file_size_map();
        assert_eq!(sizes.len(), 2);
        assert_eq!(sizes.get("dir/b.idx"), Some(&20));

        let empty = bare_index("empty");
        assert_eq!(empty.effective_fragment_bitmap(&existing), None);
        assert_eq!(empty.deleted_fragment_bitmap(&existing), None);
        assert_eq!(empty.total_size_bytes(), None);
        assert!(empty.file_size_map().is_empty());
    }
}