Skip to main content

lance_index/
mem_wal.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::any::Any;
5use std::collections::HashMap;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use deepsize::DeepSizeOf;
10use lance_core::Error;
11use lance_table::format::pb;
12use roaring::RoaringBitmap;
13use serde::{Deserialize, Serialize};
14use uuid::Uuid;
15
16use crate::{Index, IndexType};
17
/// Name of the MemWAL index (see [`MemWalIndex`]).
pub const MEM_WAL_INDEX_NAME: &str = "__lance_mem_wal";
19
20/// Type alias for shard identifier (UUID v4).
21pub type ShardId = Uuid;
22
23/// A flushed MemTable generation and its storage location.
24#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
25pub struct FlushedGeneration {
26    pub generation: u64,
27    pub path: String,
28}
29
30impl From<&FlushedGeneration> for pb::FlushedGeneration {
31    fn from(fg: &FlushedGeneration) -> Self {
32        Self {
33            generation: fg.generation,
34            path: fg.path.clone(),
35        }
36    }
37}
38
39impl From<pb::FlushedGeneration> for FlushedGeneration {
40    fn from(fg: pb::FlushedGeneration) -> Self {
41        Self {
42            generation: fg.generation,
43            path: fg.path,
44        }
45    }
46}
47
48/// A shard's merged generation, used in MemWalIndexDetails.
49#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash, Serialize, Deserialize)]
50pub struct MergedGeneration {
51    pub shard_id: Uuid,
52    pub generation: u64,
53}
54
55impl DeepSizeOf for MergedGeneration {
56    fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
57        0 // UUID is 16 bytes fixed size, no heap allocations
58    }
59}
60
61impl MergedGeneration {
62    pub fn new(shard_id: Uuid, generation: u64) -> Self {
63        Self {
64            shard_id,
65            generation,
66        }
67    }
68}
69
70impl From<&MergedGeneration> for pb::MergedGeneration {
71    fn from(mg: &MergedGeneration) -> Self {
72        Self {
73            shard_id: Some((&mg.shard_id).into()),
74            generation: mg.generation,
75        }
76    }
77}
78
79impl TryFrom<pb::MergedGeneration> for MergedGeneration {
80    type Error = Error;
81
82    fn try_from(mg: pb::MergedGeneration) -> lance_core::Result<Self> {
83        let shard_id = mg
84            .shard_id
85            .as_ref()
86            .map(Uuid::try_from)
87            .ok_or_else(|| Error::invalid_input("Missing shard_id in MergedGeneration"))??;
88        Ok(Self {
89            shard_id,
90            generation: mg.generation,
91        })
92    }
93}
94
95/// Tracks which merged generation a base table index has been rebuilt to cover.
96/// Used to determine whether to read from flushed MemTable indexes or base table.
97#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
98pub struct IndexCatchupProgress {
99    pub index_name: String,
100    pub caught_up_generations: Vec<MergedGeneration>,
101}
102
103impl IndexCatchupProgress {
104    pub fn new(index_name: String, caught_up_generations: Vec<MergedGeneration>) -> Self {
105        Self {
106            index_name,
107            caught_up_generations,
108        }
109    }
110
111    /// Get the caught up generation for a specific shard.
112    /// Returns None if the shard is not present (assumed fully caught up).
113    pub fn caught_up_generation_for_shard(&self, shard_id: &Uuid) -> Option<u64> {
114        self.caught_up_generations
115            .iter()
116            .find(|mg| &mg.shard_id == shard_id)
117            .map(|mg| mg.generation)
118    }
119}
120
121impl From<&IndexCatchupProgress> for pb::IndexCatchupProgress {
122    fn from(icp: &IndexCatchupProgress) -> Self {
123        Self {
124            index_name: icp.index_name.clone(),
125            caught_up_generations: icp
126                .caught_up_generations
127                .iter()
128                .map(|mg| mg.into())
129                .collect(),
130        }
131    }
132}
133
134impl TryFrom<pb::IndexCatchupProgress> for IndexCatchupProgress {
135    type Error = Error;
136
137    fn try_from(icp: pb::IndexCatchupProgress) -> lance_core::Result<Self> {
138        Ok(Self {
139            index_name: icp.index_name,
140            caught_up_generations: icp
141                .caught_up_generations
142                .into_iter()
143                .map(MergedGeneration::try_from)
144                .collect::<lance_core::Result<_>>()?,
145        })
146    }
147}
148
149/// Shard manifest containing epoch-based fencing and WAL state.
150/// Each shard has exactly one active writer at any time.
151#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
152pub struct ShardManifest {
153    pub shard_id: Uuid,
154    pub version: u64,
155    pub shard_spec_id: u32,
156    pub writer_epoch: u64,
157    /// The most recent WAL entry position (0-based) flushed to a MemTable.
158    /// Recovery replays from `replay_after_wal_entry_position + 1`.
159    pub replay_after_wal_entry_position: u64,
160    /// The most recent WAL entry position (0-based) when manifest was updated.
161    pub wal_entry_position_last_seen: u64,
162    pub current_generation: u64,
163    pub flushed_generations: Vec<FlushedGeneration>,
164}
165
166impl DeepSizeOf for ShardManifest {
167    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
168        self.flushed_generations.deep_size_of_children(context)
169    }
170}
171
172impl From<&ShardManifest> for pb::ShardManifest {
173    fn from(rm: &ShardManifest) -> Self {
174        Self {
175            shard_id: Some((&rm.shard_id).into()),
176            version: rm.version,
177            shard_spec_id: rm.shard_spec_id,
178            writer_epoch: rm.writer_epoch,
179            replay_after_wal_entry_position: rm.replay_after_wal_entry_position,
180            wal_entry_position_last_seen: rm.wal_entry_position_last_seen,
181            current_generation: rm.current_generation,
182            flushed_generations: rm.flushed_generations.iter().map(|fg| fg.into()).collect(),
183        }
184    }
185}
186
187impl TryFrom<pb::ShardManifest> for ShardManifest {
188    type Error = Error;
189
190    fn try_from(rm: pb::ShardManifest) -> lance_core::Result<Self> {
191        let shard_id = rm
192            .shard_id
193            .as_ref()
194            .map(Uuid::try_from)
195            .ok_or_else(|| Error::invalid_input("Missing shard_id in ShardManifest"))??;
196        Ok(Self {
197            shard_id,
198            version: rm.version,
199            shard_spec_id: rm.shard_spec_id,
200            writer_epoch: rm.writer_epoch,
201            replay_after_wal_entry_position: rm.replay_after_wal_entry_position,
202            wal_entry_position_last_seen: rm.wal_entry_position_last_seen,
203            current_generation: rm.current_generation,
204            flushed_generations: rm
205                .flushed_generations
206                .into_iter()
207                .map(FlushedGeneration::from)
208                .collect(),
209        })
210    }
211}
212
213/// Shard field definition.
214#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
215pub struct ShardField {
216    pub field_id: String,
217    pub source_ids: Vec<i32>,
218    pub transform: Option<String>,
219    pub expression: Option<String>,
220    pub result_type: String,
221    pub parameters: HashMap<String, String>,
222}
223
224impl From<&ShardField> for pb::ShardField {
225    fn from(rf: &ShardField) -> Self {
226        Self {
227            field_id: rf.field_id.clone(),
228            source_ids: rf.source_ids.clone(),
229            transform: rf.transform.clone(),
230            expression: rf.expression.clone(),
231            result_type: rf.result_type.clone(),
232            parameters: rf.parameters.clone(),
233        }
234    }
235}
236
237impl From<pb::ShardField> for ShardField {
238    fn from(rf: pb::ShardField) -> Self {
239        Self {
240            field_id: rf.field_id,
241            source_ids: rf.source_ids,
242            transform: rf.transform,
243            expression: rf.expression,
244            result_type: rf.result_type,
245            parameters: rf.parameters,
246        }
247    }
248}
249
250/// Shard spec definition.
251#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
252pub struct ShardSpec {
253    pub spec_id: u32,
254    pub fields: Vec<ShardField>,
255}
256
257impl From<&ShardSpec> for pb::ShardSpec {
258    fn from(rs: &ShardSpec) -> Self {
259        Self {
260            spec_id: rs.spec_id,
261            fields: rs.fields.iter().map(|f| f.into()).collect(),
262        }
263    }
264}
265
266impl From<pb::ShardSpec> for ShardSpec {
267    fn from(rs: pb::ShardSpec) -> Self {
268        Self {
269            spec_id: rs.spec_id,
270            fields: rs.fields.into_iter().map(ShardField::from).collect(),
271        }
272    }
273}
274
275/// Index details for MemWAL Index, stored in IndexMetadata.index_details.
276#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
277pub struct MemWalIndexDetails {
278    pub snapshot_ts_millis: i64,
279    pub num_shards: u32,
280    pub inline_snapshots: Option<Vec<u8>>,
281    pub shard_specs: Vec<ShardSpec>,
282    pub maintained_indexes: Vec<String>,
283    pub merged_generations: Vec<MergedGeneration>,
284    pub index_catchup: Vec<IndexCatchupProgress>,
285}
286
287impl From<&MemWalIndexDetails> for pb::MemWalIndexDetails {
288    fn from(details: &MemWalIndexDetails) -> Self {
289        Self {
290            snapshot_ts_millis: details.snapshot_ts_millis,
291            num_shards: details.num_shards,
292            inline_snapshots: details.inline_snapshots.clone(),
293            shard_specs: details.shard_specs.iter().map(|rs| rs.into()).collect(),
294            maintained_indexes: details.maintained_indexes.clone(),
295            merged_generations: details
296                .merged_generations
297                .iter()
298                .map(|mg| mg.into())
299                .collect(),
300            index_catchup: details.index_catchup.iter().map(|icp| icp.into()).collect(),
301        }
302    }
303}
304
305impl TryFrom<pb::MemWalIndexDetails> for MemWalIndexDetails {
306    type Error = Error;
307
308    fn try_from(details: pb::MemWalIndexDetails) -> lance_core::Result<Self> {
309        Ok(Self {
310            snapshot_ts_millis: details.snapshot_ts_millis,
311            num_shards: details.num_shards,
312            inline_snapshots: details.inline_snapshots,
313            shard_specs: details
314                .shard_specs
315                .into_iter()
316                .map(ShardSpec::from)
317                .collect(),
318            maintained_indexes: details.maintained_indexes,
319            merged_generations: details
320                .merged_generations
321                .into_iter()
322                .map(MergedGeneration::try_from)
323                .collect::<lance_core::Result<_>>()?,
324            index_catchup: details
325                .index_catchup
326                .into_iter()
327                .map(IndexCatchupProgress::try_from)
328                .collect::<lance_core::Result<_>>()?,
329        })
330    }
331}
332
333/// MemWAL Index provides access to MemWAL configuration and state.
334#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
335pub struct MemWalIndex {
336    pub details: MemWalIndexDetails,
337}
338
339impl MemWalIndex {
340    pub fn new(details: MemWalIndexDetails) -> Self {
341        Self { details }
342    }
343
344    pub fn merged_generation_for_shard(&self, shard_id: &Uuid) -> Option<u64> {
345        self.details
346            .merged_generations
347            .iter()
348            .find(|mg| &mg.shard_id == shard_id)
349            .map(|mg| mg.generation)
350    }
351
352    /// Get the caught up generation for a specific index and shard.
353    /// Returns None if the index is not tracked (assumed fully caught up).
354    pub fn index_caught_up_generation(&self, index_name: &str, shard_id: &Uuid) -> Option<u64> {
355        self.details
356            .index_catchup
357            .iter()
358            .find(|icp| icp.index_name == index_name)
359            .and_then(|icp| icp.caught_up_generation_for_shard(shard_id))
360    }
361
362    /// Check if an index is fully caught up for a shard.
363    /// Returns true if the index covers all merged data for the shard.
364    pub fn is_index_caught_up(&self, index_name: &str, shard_id: &Uuid) -> bool {
365        let merged_gen = self.merged_generation_for_shard(shard_id).unwrap_or(0);
366        let caught_up_gen = self.index_caught_up_generation(index_name, shard_id);
367
368        // If not tracked in index_catchup, assumed fully caught up
369        caught_up_gen.is_none_or(|generation| generation >= merged_gen)
370    }
371}
372
373#[derive(Serialize)]
374struct MemWalStatistics {
375    num_shards: u32,
376    num_merged_generations: usize,
377    num_shard_specs: usize,
378    num_maintained_indexes: usize,
379    num_index_catchup_entries: usize,
380}
381
382#[async_trait]
383impl Index for MemWalIndex {
384    fn as_any(&self) -> &dyn Any {
385        self
386    }
387
388    fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
389        self
390    }
391
392    fn as_vector_index(self: Arc<Self>) -> lance_core::Result<Arc<dyn crate::vector::VectorIndex>> {
393        Err(Error::not_supported_source(
394            "MemWalIndex is not a vector index".into(),
395        ))
396    }
397
398    fn statistics(&self) -> lance_core::Result<serde_json::Value> {
399        let stats = MemWalStatistics {
400            num_shards: self.details.num_shards,
401            num_merged_generations: self.details.merged_generations.len(),
402            num_shard_specs: self.details.shard_specs.len(),
403            num_maintained_indexes: self.details.maintained_indexes.len(),
404            num_index_catchup_entries: self.details.index_catchup.len(),
405        };
406        serde_json::to_value(stats).map_err(|e| {
407            Error::internal(format!(
408                "failed to serialize MemWAL index statistics: {}",
409                e
410            ))
411        })
412    }
413
414    async fn prewarm(&self) -> lance_core::Result<()> {
415        Ok(())
416    }
417
418    fn index_type(&self) -> IndexType {
419        IndexType::MemWal
420    }
421
422    async fn calculate_included_frags(&self) -> lance_core::Result<RoaringBitmap> {
423        Ok(RoaringBitmap::new())
424    }
425}