1use std::any::Any;
5use std::collections::HashMap;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use deepsize::DeepSizeOf;
10use lance_core::Error;
11use lance_table::format::pb;
12use roaring::RoaringBitmap;
13use serde::{Deserialize, Serialize};
14use uuid::Uuid;
15
16use crate::{Index, IndexType};
17
/// Name string for the MemWAL index.
///
/// NOTE(review): presumably the reserved name under which the index is
/// stored in the dataset's index metadata; confirm at the call sites.
pub const MEM_WAL_INDEX_NAME: &str = "__lance_mem_wal";
19
/// Alias for the [`Uuid`] type used as a shard identifier.
pub type ShardId = Uuid;
22
23#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
25pub struct FlushedGeneration {
26 pub generation: u64,
27 pub path: String,
28}
29
30impl From<&FlushedGeneration> for pb::FlushedGeneration {
31 fn from(fg: &FlushedGeneration) -> Self {
32 Self {
33 generation: fg.generation,
34 path: fg.path.clone(),
35 }
36 }
37}
38
39impl From<pb::FlushedGeneration> for FlushedGeneration {
40 fn from(fg: pb::FlushedGeneration) -> Self {
41 Self {
42 generation: fg.generation,
43 path: fg.path,
44 }
45 }
46}
47
48#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash, Serialize, Deserialize)]
50pub struct MergedGeneration {
51 pub shard_id: Uuid,
52 pub generation: u64,
53}
54
55impl DeepSizeOf for MergedGeneration {
56 fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
57 0 }
59}
60
61impl MergedGeneration {
62 pub fn new(shard_id: Uuid, generation: u64) -> Self {
63 Self {
64 shard_id,
65 generation,
66 }
67 }
68}
69
70impl From<&MergedGeneration> for pb::MergedGeneration {
71 fn from(mg: &MergedGeneration) -> Self {
72 Self {
73 shard_id: Some((&mg.shard_id).into()),
74 generation: mg.generation,
75 }
76 }
77}
78
79impl TryFrom<pb::MergedGeneration> for MergedGeneration {
80 type Error = Error;
81
82 fn try_from(mg: pb::MergedGeneration) -> lance_core::Result<Self> {
83 let shard_id = mg
84 .shard_id
85 .as_ref()
86 .map(Uuid::try_from)
87 .ok_or_else(|| Error::invalid_input("Missing shard_id in MergedGeneration"))??;
88 Ok(Self {
89 shard_id,
90 generation: mg.generation,
91 })
92 }
93}
94
95#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
98pub struct IndexCatchupProgress {
99 pub index_name: String,
100 pub caught_up_generations: Vec<MergedGeneration>,
101}
102
103impl IndexCatchupProgress {
104 pub fn new(index_name: String, caught_up_generations: Vec<MergedGeneration>) -> Self {
105 Self {
106 index_name,
107 caught_up_generations,
108 }
109 }
110
111 pub fn caught_up_generation_for_shard(&self, shard_id: &Uuid) -> Option<u64> {
114 self.caught_up_generations
115 .iter()
116 .find(|mg| &mg.shard_id == shard_id)
117 .map(|mg| mg.generation)
118 }
119}
120
121impl From<&IndexCatchupProgress> for pb::IndexCatchupProgress {
122 fn from(icp: &IndexCatchupProgress) -> Self {
123 Self {
124 index_name: icp.index_name.clone(),
125 caught_up_generations: icp
126 .caught_up_generations
127 .iter()
128 .map(|mg| mg.into())
129 .collect(),
130 }
131 }
132}
133
134impl TryFrom<pb::IndexCatchupProgress> for IndexCatchupProgress {
135 type Error = Error;
136
137 fn try_from(icp: pb::IndexCatchupProgress) -> lance_core::Result<Self> {
138 Ok(Self {
139 index_name: icp.index_name,
140 caught_up_generations: icp
141 .caught_up_generations
142 .into_iter()
143 .map(MergedGeneration::try_from)
144 .collect::<lance_core::Result<_>>()?,
145 })
146 }
147}
148
149#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
152pub struct ShardManifest {
153 pub shard_id: Uuid,
154 pub version: u64,
155 pub shard_spec_id: u32,
156 pub writer_epoch: u64,
157 pub replay_after_wal_entry_position: u64,
160 pub wal_entry_position_last_seen: u64,
162 pub current_generation: u64,
163 pub flushed_generations: Vec<FlushedGeneration>,
164}
165
166impl DeepSizeOf for ShardManifest {
167 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
168 self.flushed_generations.deep_size_of_children(context)
169 }
170}
171
172impl From<&ShardManifest> for pb::ShardManifest {
173 fn from(rm: &ShardManifest) -> Self {
174 Self {
175 shard_id: Some((&rm.shard_id).into()),
176 version: rm.version,
177 shard_spec_id: rm.shard_spec_id,
178 writer_epoch: rm.writer_epoch,
179 replay_after_wal_entry_position: rm.replay_after_wal_entry_position,
180 wal_entry_position_last_seen: rm.wal_entry_position_last_seen,
181 current_generation: rm.current_generation,
182 flushed_generations: rm.flushed_generations.iter().map(|fg| fg.into()).collect(),
183 }
184 }
185}
186
187impl TryFrom<pb::ShardManifest> for ShardManifest {
188 type Error = Error;
189
190 fn try_from(rm: pb::ShardManifest) -> lance_core::Result<Self> {
191 let shard_id = rm
192 .shard_id
193 .as_ref()
194 .map(Uuid::try_from)
195 .ok_or_else(|| Error::invalid_input("Missing shard_id in ShardManifest"))??;
196 Ok(Self {
197 shard_id,
198 version: rm.version,
199 shard_spec_id: rm.shard_spec_id,
200 writer_epoch: rm.writer_epoch,
201 replay_after_wal_entry_position: rm.replay_after_wal_entry_position,
202 wal_entry_position_last_seen: rm.wal_entry_position_last_seen,
203 current_generation: rm.current_generation,
204 flushed_generations: rm
205 .flushed_generations
206 .into_iter()
207 .map(FlushedGeneration::from)
208 .collect(),
209 })
210 }
211}
212
213#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
215pub struct ShardField {
216 pub field_id: String,
217 pub source_ids: Vec<i32>,
218 pub transform: Option<String>,
219 pub expression: Option<String>,
220 pub result_type: String,
221 pub parameters: HashMap<String, String>,
222}
223
224impl From<&ShardField> for pb::ShardField {
225 fn from(rf: &ShardField) -> Self {
226 Self {
227 field_id: rf.field_id.clone(),
228 source_ids: rf.source_ids.clone(),
229 transform: rf.transform.clone(),
230 expression: rf.expression.clone(),
231 result_type: rf.result_type.clone(),
232 parameters: rf.parameters.clone(),
233 }
234 }
235}
236
237impl From<pb::ShardField> for ShardField {
238 fn from(rf: pb::ShardField) -> Self {
239 Self {
240 field_id: rf.field_id,
241 source_ids: rf.source_ids,
242 transform: rf.transform,
243 expression: rf.expression,
244 result_type: rf.result_type,
245 parameters: rf.parameters,
246 }
247 }
248}
249
250#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
252pub struct ShardSpec {
253 pub spec_id: u32,
254 pub fields: Vec<ShardField>,
255}
256
257impl From<&ShardSpec> for pb::ShardSpec {
258 fn from(rs: &ShardSpec) -> Self {
259 Self {
260 spec_id: rs.spec_id,
261 fields: rs.fields.iter().map(|f| f.into()).collect(),
262 }
263 }
264}
265
266impl From<pb::ShardSpec> for ShardSpec {
267 fn from(rs: pb::ShardSpec) -> Self {
268 Self {
269 spec_id: rs.spec_id,
270 fields: rs.fields.into_iter().map(ShardField::from).collect(),
271 }
272 }
273}
274
275#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
277pub struct MemWalIndexDetails {
278 pub snapshot_ts_millis: i64,
279 pub num_shards: u32,
280 pub inline_snapshots: Option<Vec<u8>>,
281 pub shard_specs: Vec<ShardSpec>,
282 pub maintained_indexes: Vec<String>,
283 pub merged_generations: Vec<MergedGeneration>,
284 pub index_catchup: Vec<IndexCatchupProgress>,
285}
286
287impl From<&MemWalIndexDetails> for pb::MemWalIndexDetails {
288 fn from(details: &MemWalIndexDetails) -> Self {
289 Self {
290 snapshot_ts_millis: details.snapshot_ts_millis,
291 num_shards: details.num_shards,
292 inline_snapshots: details.inline_snapshots.clone(),
293 shard_specs: details.shard_specs.iter().map(|rs| rs.into()).collect(),
294 maintained_indexes: details.maintained_indexes.clone(),
295 merged_generations: details
296 .merged_generations
297 .iter()
298 .map(|mg| mg.into())
299 .collect(),
300 index_catchup: details.index_catchup.iter().map(|icp| icp.into()).collect(),
301 }
302 }
303}
304
305impl TryFrom<pb::MemWalIndexDetails> for MemWalIndexDetails {
306 type Error = Error;
307
308 fn try_from(details: pb::MemWalIndexDetails) -> lance_core::Result<Self> {
309 Ok(Self {
310 snapshot_ts_millis: details.snapshot_ts_millis,
311 num_shards: details.num_shards,
312 inline_snapshots: details.inline_snapshots,
313 shard_specs: details
314 .shard_specs
315 .into_iter()
316 .map(ShardSpec::from)
317 .collect(),
318 maintained_indexes: details.maintained_indexes,
319 merged_generations: details
320 .merged_generations
321 .into_iter()
322 .map(MergedGeneration::try_from)
323 .collect::<lance_core::Result<_>>()?,
324 index_catchup: details
325 .index_catchup
326 .into_iter()
327 .map(IndexCatchupProgress::try_from)
328 .collect::<lance_core::Result<_>>()?,
329 })
330 }
331}
332
333#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
335pub struct MemWalIndex {
336 pub details: MemWalIndexDetails,
337}
338
339impl MemWalIndex {
340 pub fn new(details: MemWalIndexDetails) -> Self {
341 Self { details }
342 }
343
344 pub fn merged_generation_for_shard(&self, shard_id: &Uuid) -> Option<u64> {
345 self.details
346 .merged_generations
347 .iter()
348 .find(|mg| &mg.shard_id == shard_id)
349 .map(|mg| mg.generation)
350 }
351
352 pub fn index_caught_up_generation(&self, index_name: &str, shard_id: &Uuid) -> Option<u64> {
355 self.details
356 .index_catchup
357 .iter()
358 .find(|icp| icp.index_name == index_name)
359 .and_then(|icp| icp.caught_up_generation_for_shard(shard_id))
360 }
361
362 pub fn is_index_caught_up(&self, index_name: &str, shard_id: &Uuid) -> bool {
365 let merged_gen = self.merged_generation_for_shard(shard_id).unwrap_or(0);
366 let caught_up_gen = self.index_caught_up_generation(index_name, shard_id);
367
368 caught_up_gen.is_none_or(|generation| generation >= merged_gen)
370 }
371}
372
/// Serializable summary of a MemWAL index, produced by
/// `MemWalIndex::statistics`. Every count is taken from the length of the
/// corresponding collection in the index details, except `num_shards`,
/// which is copied directly.
#[derive(Serialize)]
struct MemWalStatistics {
    /// Value of `details.num_shards`.
    num_shards: u32,
    /// Number of entries in `details.merged_generations`.
    num_merged_generations: usize,
    /// Number of entries in `details.shard_specs`.
    num_shard_specs: usize,
    /// Number of entries in `details.maintained_indexes`.
    num_maintained_indexes: usize,
    /// Number of entries in `details.index_catchup`.
    num_index_catchup_entries: usize,
}
381
382#[async_trait]
383impl Index for MemWalIndex {
384 fn as_any(&self) -> &dyn Any {
385 self
386 }
387
388 fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
389 self
390 }
391
392 fn as_vector_index(self: Arc<Self>) -> lance_core::Result<Arc<dyn crate::vector::VectorIndex>> {
393 Err(Error::not_supported_source(
394 "MemWalIndex is not a vector index".into(),
395 ))
396 }
397
398 fn statistics(&self) -> lance_core::Result<serde_json::Value> {
399 let stats = MemWalStatistics {
400 num_shards: self.details.num_shards,
401 num_merged_generations: self.details.merged_generations.len(),
402 num_shard_specs: self.details.shard_specs.len(),
403 num_maintained_indexes: self.details.maintained_indexes.len(),
404 num_index_catchup_entries: self.details.index_catchup.len(),
405 };
406 serde_json::to_value(stats).map_err(|e| {
407 Error::internal(format!(
408 "failed to serialize MemWAL index statistics: {}",
409 e
410 ))
411 })
412 }
413
414 async fn prewarm(&self) -> lance_core::Result<()> {
415 Ok(())
416 }
417
418 fn index_type(&self) -> IndexType {
419 IndexType::MemWal
420 }
421
422 async fn calculate_included_frags(&self) -> lance_core::Result<RoaringBitmap> {
423 Ok(RoaringBitmap::new())
424 }
425}