Skip to main content

lance_index/
mem_wal.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::any::Any;
5use std::collections::HashMap;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use deepsize::DeepSizeOf;
10use lance_core::Error;
11use lance_table::format::pb;
12use roaring::RoaringBitmap;
13use serde::{Deserialize, Serialize};
14use snafu::location;
15use uuid::Uuid;
16
17use crate::{Index, IndexType};
18
/// Name string associated with the MemWAL index.
// NOTE(review): presumably the name under which this index is stored among the
// dataset's indices — confirm against callers.
pub const MEM_WAL_INDEX_NAME: &str = "__lance_mem_wal";

/// Type alias for region identifier (UUID v4).
pub type RegionId = Uuid;
23
/// A flushed MemTable generation and its storage location.
///
/// Converts to and from `pb::FlushedGeneration` field-for-field.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct FlushedGeneration {
    /// Generation number of the flushed MemTable.
    pub generation: u64,
    /// Storage location of the flushed generation.
    // NOTE(review): unclear from here whether this is absolute or relative to the
    // dataset root — confirm.
    pub path: String,
}
30
31impl From<&FlushedGeneration> for pb::FlushedGeneration {
32    fn from(fg: &FlushedGeneration) -> Self {
33        Self {
34            generation: fg.generation,
35            path: fg.path.clone(),
36        }
37    }
38}
39
40impl From<pb::FlushedGeneration> for FlushedGeneration {
41    fn from(fg: pb::FlushedGeneration) -> Self {
42        Self {
43            generation: fg.generation,
44            path: fg.path,
45        }
46    }
47}
48
/// A region's merged generation, used in MemWalIndexDetails.
///
/// Field order matters: the derived `PartialOrd` compares `region_id` first and
/// only then `generation`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash, Serialize, Deserialize)]
pub struct MergedGeneration {
    /// Region this entry belongs to.
    pub region_id: Uuid,
    /// Generation recorded as merged for the region.
    pub generation: u64,
}
55
56impl DeepSizeOf for MergedGeneration {
57    fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
58        0 // UUID is 16 bytes fixed size, no heap allocations
59    }
60}
61
62impl MergedGeneration {
63    pub fn new(region_id: Uuid, generation: u64) -> Self {
64        Self {
65            region_id,
66            generation,
67        }
68    }
69}
70
71impl From<&MergedGeneration> for pb::MergedGeneration {
72    fn from(mg: &MergedGeneration) -> Self {
73        Self {
74            region_id: Some((&mg.region_id).into()),
75            generation: mg.generation,
76        }
77    }
78}
79
80impl TryFrom<pb::MergedGeneration> for MergedGeneration {
81    type Error = Error;
82
83    fn try_from(mg: pb::MergedGeneration) -> lance_core::Result<Self> {
84        let region_id = mg.region_id.as_ref().map(Uuid::try_from).ok_or_else(|| {
85            Error::invalid_input("Missing region_id in MergedGeneration", location!())
86        })??;
87        Ok(Self {
88            region_id,
89            generation: mg.generation,
90        })
91    }
92}
93
/// Tracks which merged generation a base table index has been rebuilt to cover.
/// Used to determine whether to read from flushed MemTable indexes or base table.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct IndexCatchupProgress {
    /// Name of the base table index whose progress is tracked.
    pub index_name: String,
    /// Per-region caught-up generations.
    ///
    /// A region missing from this list is assumed to be fully caught up.
    pub caught_up_generations: Vec<MergedGeneration>,
}
101
102impl IndexCatchupProgress {
103    pub fn new(index_name: String, caught_up_generations: Vec<MergedGeneration>) -> Self {
104        Self {
105            index_name,
106            caught_up_generations,
107        }
108    }
109
110    /// Get the caught up generation for a specific region.
111    /// Returns None if the region is not present (assumed fully caught up).
112    pub fn caught_up_generation_for_region(&self, region_id: &Uuid) -> Option<u64> {
113        self.caught_up_generations
114            .iter()
115            .find(|mg| &mg.region_id == region_id)
116            .map(|mg| mg.generation)
117    }
118}
119
120impl From<&IndexCatchupProgress> for pb::IndexCatchupProgress {
121    fn from(icp: &IndexCatchupProgress) -> Self {
122        Self {
123            index_name: icp.index_name.clone(),
124            caught_up_generations: icp
125                .caught_up_generations
126                .iter()
127                .map(|mg| mg.into())
128                .collect(),
129        }
130    }
131}
132
133impl TryFrom<pb::IndexCatchupProgress> for IndexCatchupProgress {
134    type Error = Error;
135
136    fn try_from(icp: pb::IndexCatchupProgress) -> lance_core::Result<Self> {
137        Ok(Self {
138            index_name: icp.index_name,
139            caught_up_generations: icp
140                .caught_up_generations
141                .into_iter()
142                .map(MergedGeneration::try_from)
143                .collect::<lance_core::Result<_>>()?,
144        })
145    }
146}
147
/// Region manifest containing epoch-based fencing and WAL state.
/// Each region has exactly one active writer at any time.
///
/// `DeepSizeOf` is implemented by hand because only `flushed_generations` owns
/// heap memory.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RegionManifest {
    /// Region this manifest describes.
    pub region_id: Uuid,
    /// Manifest version.
    // NOTE(review): presumably bumped on every manifest update — confirm.
    pub version: u64,
    /// Id of the region spec this region uses.
    // NOTE(review): presumably refers to `RegionSpec::spec_id` — confirm.
    pub region_spec_id: u32,
    /// Epoch of the region's writer, used for epoch-based fencing.
    pub writer_epoch: u64,
    // NOTE(review): presumably WAL entries with ids greater than this are replayed
    // on recovery — confirm.
    pub replay_after_wal_id: u64,
    // NOTE(review): presumably the highest WAL id observed for this region — confirm.
    pub wal_id_last_seen: u64,
    // NOTE(review): presumably the generation of the active (unflushed) MemTable — confirm.
    pub current_generation: u64,
    /// Flushed MemTable generations and their storage locations.
    pub flushed_generations: Vec<FlushedGeneration>,
}
161
162impl DeepSizeOf for RegionManifest {
163    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
164        self.flushed_generations.deep_size_of_children(context)
165    }
166}
167
168impl From<&RegionManifest> for pb::RegionManifest {
169    fn from(rm: &RegionManifest) -> Self {
170        Self {
171            region_id: Some((&rm.region_id).into()),
172            version: rm.version,
173            region_spec_id: rm.region_spec_id,
174            writer_epoch: rm.writer_epoch,
175            replay_after_wal_id: rm.replay_after_wal_id,
176            wal_id_last_seen: rm.wal_id_last_seen,
177            current_generation: rm.current_generation,
178            flushed_generations: rm.flushed_generations.iter().map(|fg| fg.into()).collect(),
179        }
180    }
181}
182
183impl TryFrom<pb::RegionManifest> for RegionManifest {
184    type Error = Error;
185
186    fn try_from(rm: pb::RegionManifest) -> lance_core::Result<Self> {
187        let region_id = rm.region_id.as_ref().map(Uuid::try_from).ok_or_else(|| {
188            Error::invalid_input("Missing region_id in RegionManifest", location!())
189        })??;
190        Ok(Self {
191            region_id,
192            version: rm.version,
193            region_spec_id: rm.region_spec_id,
194            writer_epoch: rm.writer_epoch,
195            replay_after_wal_id: rm.replay_after_wal_id,
196            wal_id_last_seen: rm.wal_id_last_seen,
197            current_generation: rm.current_generation,
198            flushed_generations: rm
199                .flushed_generations
200                .into_iter()
201                .map(FlushedGeneration::from)
202                .collect(),
203        })
204    }
205}
206
/// Region field definition.
///
/// Converts to and from `pb::RegionField` field-for-field.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct RegionField {
    /// Identifier of this region field.
    pub field_id: String,
    // NOTE(review): presumably ids of the source schema fields — confirm.
    pub source_ids: Vec<i32>,
    // NOTE(review): transform name; valid values are not visible here.
    pub transform: Option<String>,
    // NOTE(review): expression text; its syntax is not visible here.
    pub expression: Option<String>,
    // NOTE(review): textual name of the result type; encoding not visible here.
    pub result_type: String,
    /// String key/value parameters.
    pub parameters: HashMap<String, String>,
}
217
218impl From<&RegionField> for pb::RegionField {
219    fn from(rf: &RegionField) -> Self {
220        Self {
221            field_id: rf.field_id.clone(),
222            source_ids: rf.source_ids.clone(),
223            transform: rf.transform.clone(),
224            expression: rf.expression.clone(),
225            result_type: rf.result_type.clone(),
226            parameters: rf.parameters.clone(),
227        }
228    }
229}
230
231impl From<pb::RegionField> for RegionField {
232    fn from(rf: pb::RegionField) -> Self {
233        Self {
234            field_id: rf.field_id,
235            source_ids: rf.source_ids,
236            transform: rf.transform,
237            expression: rf.expression,
238            result_type: rf.result_type,
239            parameters: rf.parameters,
240        }
241    }
242}
243
/// Region spec definition.
// NOTE(review): presumably `spec_id` is what `RegionManifest::region_spec_id`
// refers to — confirm.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct RegionSpec {
    /// Identifier of this spec.
    pub spec_id: u32,
    /// Field definitions that make up the spec.
    pub fields: Vec<RegionField>,
}
250
251impl From<&RegionSpec> for pb::RegionSpec {
252    fn from(rs: &RegionSpec) -> Self {
253        Self {
254            spec_id: rs.spec_id,
255            fields: rs.fields.iter().map(|f| f.into()).collect(),
256        }
257    }
258}
259
260impl From<pb::RegionSpec> for RegionSpec {
261    fn from(rs: pb::RegionSpec) -> Self {
262        Self {
263            spec_id: rs.spec_id,
264            fields: rs.fields.into_iter().map(RegionField::from).collect(),
265        }
266    }
267}
268
/// Index details for MemWAL Index, stored in IndexMetadata.index_details.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct MemWalIndexDetails {
    // NOTE(review): millisecond timestamp of the snapshot; the epoch/timezone is
    // not visible here — confirm.
    pub snapshot_ts_millis: i64,
    /// Number of regions.
    pub num_regions: u32,
    // NOTE(review): opaque bytes; their encoding is not visible here — confirm.
    pub inline_snapshots: Option<Vec<u8>>,
    /// Region spec definitions.
    pub region_specs: Vec<RegionSpec>,
    /// Names of indexes maintained alongside the MemWAL.
    // NOTE(review): presumably indexes kept up to date on MemTables — confirm.
    pub maintained_indexes: Vec<String>,
    /// Per-region merged generations.
    pub merged_generations: Vec<MergedGeneration>,
    /// Per-index catch-up progress.
    ///
    /// An index absent from this list is assumed to be fully caught up.
    pub index_catchup: Vec<IndexCatchupProgress>,
}
280
281impl From<&MemWalIndexDetails> for pb::MemWalIndexDetails {
282    fn from(details: &MemWalIndexDetails) -> Self {
283        Self {
284            snapshot_ts_millis: details.snapshot_ts_millis,
285            num_regions: details.num_regions,
286            inline_snapshots: details.inline_snapshots.clone(),
287            region_specs: details.region_specs.iter().map(|rs| rs.into()).collect(),
288            maintained_indexes: details.maintained_indexes.clone(),
289            merged_generations: details
290                .merged_generations
291                .iter()
292                .map(|mg| mg.into())
293                .collect(),
294            index_catchup: details.index_catchup.iter().map(|icp| icp.into()).collect(),
295        }
296    }
297}
298
299impl TryFrom<pb::MemWalIndexDetails> for MemWalIndexDetails {
300    type Error = Error;
301
302    fn try_from(details: pb::MemWalIndexDetails) -> lance_core::Result<Self> {
303        Ok(Self {
304            snapshot_ts_millis: details.snapshot_ts_millis,
305            num_regions: details.num_regions,
306            inline_snapshots: details.inline_snapshots,
307            region_specs: details
308                .region_specs
309                .into_iter()
310                .map(RegionSpec::from)
311                .collect(),
312            maintained_indexes: details.maintained_indexes,
313            merged_generations: details
314                .merged_generations
315                .into_iter()
316                .map(MergedGeneration::try_from)
317                .collect::<lance_core::Result<_>>()?,
318            index_catchup: details
319                .index_catchup
320                .into_iter()
321                .map(IndexCatchupProgress::try_from)
322                .collect::<lance_core::Result<_>>()?,
323        })
324    }
325}
326
/// MemWAL Index provides access to MemWAL configuration and state.
///
/// A thin wrapper around the decoded [`MemWalIndexDetails`]; every lookup reads
/// from `details`.
#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
pub struct MemWalIndex {
    /// Decoded index details.
    pub details: MemWalIndexDetails,
}
332
333impl MemWalIndex {
334    pub fn new(details: MemWalIndexDetails) -> Self {
335        Self { details }
336    }
337
338    pub fn merged_generation_for_region(&self, region_id: &Uuid) -> Option<u64> {
339        self.details
340            .merged_generations
341            .iter()
342            .find(|mg| &mg.region_id == region_id)
343            .map(|mg| mg.generation)
344    }
345
346    /// Get the caught up generation for a specific index and region.
347    /// Returns None if the index is not tracked (assumed fully caught up).
348    pub fn index_caught_up_generation(&self, index_name: &str, region_id: &Uuid) -> Option<u64> {
349        self.details
350            .index_catchup
351            .iter()
352            .find(|icp| icp.index_name == index_name)
353            .and_then(|icp| icp.caught_up_generation_for_region(region_id))
354    }
355
356    /// Check if an index is fully caught up for a region.
357    /// Returns true if the index covers all merged data for the region.
358    pub fn is_index_caught_up(&self, index_name: &str, region_id: &Uuid) -> bool {
359        let merged_gen = self.merged_generation_for_region(region_id).unwrap_or(0);
360        let caught_up_gen = self.index_caught_up_generation(index_name, region_id);
361
362        // If not tracked in index_catchup, assumed fully caught up
363        caught_up_gen.is_none_or(|gen| gen >= merged_gen)
364    }
365}
366
/// Summary counters that `MemWalIndex` reports from `Index::statistics` as JSON.
// Field names are serialized verbatim as the JSON keys.
#[derive(Serialize)]
struct MemWalStatistics {
    /// Copied from `MemWalIndexDetails::num_regions`.
    num_regions: u32,
    /// Length of `MemWalIndexDetails::merged_generations`.
    num_merged_generations: usize,
    /// Length of `MemWalIndexDetails::region_specs`.
    num_region_specs: usize,
    /// Length of `MemWalIndexDetails::maintained_indexes`.
    num_maintained_indexes: usize,
    /// Length of `MemWalIndexDetails::index_catchup`.
    num_index_catchup_entries: usize,
}
375
376#[async_trait]
377impl Index for MemWalIndex {
378    fn as_any(&self) -> &dyn Any {
379        self
380    }
381
382    fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
383        self
384    }
385
386    fn as_vector_index(self: Arc<Self>) -> lance_core::Result<Arc<dyn crate::vector::VectorIndex>> {
387        Err(Error::NotSupported {
388            source: "MemWalIndex is not a vector index".into(),
389            location: location!(),
390        })
391    }
392
393    fn statistics(&self) -> lance_core::Result<serde_json::Value> {
394        let stats = MemWalStatistics {
395            num_regions: self.details.num_regions,
396            num_merged_generations: self.details.merged_generations.len(),
397            num_region_specs: self.details.region_specs.len(),
398            num_maintained_indexes: self.details.maintained_indexes.len(),
399            num_index_catchup_entries: self.details.index_catchup.len(),
400        };
401        serde_json::to_value(stats).map_err(|e| Error::Internal {
402            message: format!("failed to serialize MemWAL index statistics: {}", e),
403            location: location!(),
404        })
405    }
406
407    async fn prewarm(&self) -> lance_core::Result<()> {
408        Ok(())
409    }
410
411    fn index_type(&self) -> IndexType {
412        IndexType::MemWal
413    }
414
415    async fn calculate_included_frags(&self) -> lance_core::Result<RoaringBitmap> {
416        Ok(RoaringBitmap::new())
417    }
418}