Skip to main content

lance_table/system_index/
mem_wal.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::collections::HashMap;
5
6use lance_core::Error;
7use lance_core::deepsize::DeepSizeOf;
8use serde::{Deserialize, Serialize};
9use uuid::Uuid;
10
11use crate::format::pb;
12
13pub const MEM_WAL_INDEX_NAME: &str = "__lance_mem_wal";
14
15/// Type alias for shard identifier (UUID v4).
16pub type ShardId = Uuid;
17
18/// A flushed MemTable generation and its storage location.
19#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
20pub struct FlushedGeneration {
21    pub generation: u64,
22    pub path: String,
23}
24
25impl From<&FlushedGeneration> for pb::FlushedGeneration {
26    fn from(fg: &FlushedGeneration) -> Self {
27        Self {
28            generation: fg.generation,
29            path: fg.path.clone(),
30        }
31    }
32}
33
34impl From<pb::FlushedGeneration> for FlushedGeneration {
35    fn from(fg: pb::FlushedGeneration) -> Self {
36        Self {
37            generation: fg.generation,
38            path: fg.path,
39        }
40    }
41}
42
43/// A shard's merged generation, used in MemWalIndexDetails.
44#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash, Serialize, Deserialize)]
45pub struct MergedGeneration {
46    pub shard_id: Uuid,
47    pub generation: u64,
48}
49
50impl DeepSizeOf for MergedGeneration {
51    fn deep_size_of_children(&self, _context: &mut lance_core::deepsize::Context) -> usize {
52        0 // UUID is 16 bytes fixed size, no heap allocations
53    }
54}
55
56impl MergedGeneration {
57    pub fn new(shard_id: Uuid, generation: u64) -> Self {
58        Self {
59            shard_id,
60            generation,
61        }
62    }
63}
64
65impl From<&MergedGeneration> for pb::MergedGeneration {
66    fn from(mg: &MergedGeneration) -> Self {
67        Self {
68            shard_id: Some((&mg.shard_id).into()),
69            generation: mg.generation,
70        }
71    }
72}
73
74impl TryFrom<pb::MergedGeneration> for MergedGeneration {
75    type Error = Error;
76
77    fn try_from(mg: pb::MergedGeneration) -> lance_core::Result<Self> {
78        let shard_id = mg
79            .shard_id
80            .as_ref()
81            .map(Uuid::try_from)
82            .ok_or_else(|| Error::invalid_input("Missing shard_id in MergedGeneration"))??;
83        Ok(Self {
84            shard_id,
85            generation: mg.generation,
86        })
87    }
88}
89
90/// Tracks which merged generation a base table index has been rebuilt to cover.
91/// Used to determine whether to read from flushed MemTable indexes or base table.
92#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
93pub struct IndexCatchupProgress {
94    pub index_name: String,
95    pub caught_up_generations: Vec<MergedGeneration>,
96}
97
98impl IndexCatchupProgress {
99    pub fn new(index_name: String, caught_up_generations: Vec<MergedGeneration>) -> Self {
100        Self {
101            index_name,
102            caught_up_generations,
103        }
104    }
105
106    /// Get the caught up generation for a specific shard.
107    /// Returns None if the shard is not present (assumed fully caught up).
108    pub fn caught_up_generation_for_shard(&self, shard_id: &Uuid) -> Option<u64> {
109        self.caught_up_generations
110            .iter()
111            .find(|mg| &mg.shard_id == shard_id)
112            .map(|mg| mg.generation)
113    }
114}
115
116impl From<&IndexCatchupProgress> for pb::IndexCatchupProgress {
117    fn from(icp: &IndexCatchupProgress) -> Self {
118        Self {
119            index_name: icp.index_name.clone(),
120            caught_up_generations: icp
121                .caught_up_generations
122                .iter()
123                .map(|mg| mg.into())
124                .collect(),
125        }
126    }
127}
128
129impl TryFrom<pb::IndexCatchupProgress> for IndexCatchupProgress {
130    type Error = Error;
131
132    fn try_from(icp: pb::IndexCatchupProgress) -> lance_core::Result<Self> {
133        Ok(Self {
134            index_name: icp.index_name,
135            caught_up_generations: icp
136                .caught_up_generations
137                .into_iter()
138                .map(MergedGeneration::try_from)
139                .collect::<lance_core::Result<_>>()?,
140        })
141    }
142}
143
144/// Shard manifest containing epoch-based fencing and WAL state.
145/// Each shard has exactly one active writer at any time.
146#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
147pub struct ShardManifest {
148    pub shard_id: Uuid,
149    pub version: u64,
150    pub shard_spec_id: u32,
151    /// Computed shard field values as raw Arrow scalar bytes, keyed by field id.
152    /// The byte encoding follows Arrow's little-endian convention: int32 is 4 LE
153    /// bytes, utf8 is raw UTF-8 bytes, etc. The result_type in the corresponding
154    /// ShardingField from the ShardingSpec determines how to interpret each value.
155    pub shard_field_values: HashMap<String, Vec<u8>>,
156    pub writer_epoch: u64,
157    /// The most recent WAL entry position flushed to a MemTable.
158    /// Recovery replays from `replay_after_wal_entry_position + 1`. The
159    /// default value 0 means "no flush has ever stamped this shard" — WAL
160    /// positions themselves are 1-based, so 0 is never a valid covered
161    /// position.
162    pub replay_after_wal_entry_position: u64,
163    /// The most recent WAL entry position observed at manifest write time.
164    /// Default 0 means "no entry has been written yet"; WAL positions are
165    /// 1-based.
166    pub wal_entry_position_last_seen: u64,
167    pub current_generation: u64,
168    pub flushed_generations: Vec<FlushedGeneration>,
169}
170
171impl DeepSizeOf for ShardManifest {
172    fn deep_size_of_children(&self, context: &mut lance_core::deepsize::Context) -> usize {
173        self.shard_field_values.deep_size_of_children(context)
174            + self.flushed_generations.deep_size_of_children(context)
175    }
176}
177
178impl From<&ShardManifest> for pb::ShardManifest {
179    fn from(rm: &ShardManifest) -> Self {
180        Self {
181            shard_id: Some((&rm.shard_id).into()),
182            version: rm.version,
183            shard_spec_id: rm.shard_spec_id,
184            shard_field_entries: rm
185                .shard_field_values
186                .iter()
187                .map(|(k, v)| pb::ShardFieldEntry {
188                    field_id: k.clone(),
189                    value: v.clone(),
190                })
191                .collect(),
192            writer_epoch: rm.writer_epoch,
193            replay_after_wal_entry_position: rm.replay_after_wal_entry_position,
194            wal_entry_position_last_seen: rm.wal_entry_position_last_seen,
195            current_generation: rm.current_generation,
196            flushed_generations: rm.flushed_generations.iter().map(|fg| fg.into()).collect(),
197        }
198    }
199}
200
201impl TryFrom<pb::ShardManifest> for ShardManifest {
202    type Error = Error;
203
204    fn try_from(rm: pb::ShardManifest) -> lance_core::Result<Self> {
205        let shard_id = rm
206            .shard_id
207            .as_ref()
208            .map(Uuid::try_from)
209            .ok_or_else(|| Error::invalid_input("Missing shard_id in ShardManifest"))??;
210        let shard_field_values = rm
211            .shard_field_entries
212            .into_iter()
213            .map(|e| (e.field_id, e.value))
214            .collect();
215        Ok(Self {
216            shard_id,
217            version: rm.version,
218            shard_spec_id: rm.shard_spec_id,
219            shard_field_values,
220            writer_epoch: rm.writer_epoch,
221            replay_after_wal_entry_position: rm.replay_after_wal_entry_position,
222            wal_entry_position_last_seen: rm.wal_entry_position_last_seen,
223            current_generation: rm.current_generation,
224            flushed_generations: rm
225                .flushed_generations
226                .into_iter()
227                .map(FlushedGeneration::from)
228                .collect(),
229        })
230    }
231}
232
233/// Sharding field definition.
234#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
235pub struct ShardingField {
236    pub field_id: String,
237    pub source_ids: Vec<i32>,
238    pub transform: Option<String>,
239    pub expression: Option<String>,
240    pub result_type: String,
241    pub parameters: HashMap<String, String>,
242}
243
244impl From<&ShardingField> for pb::ShardingField {
245    fn from(rf: &ShardingField) -> Self {
246        Self {
247            field_id: rf.field_id.clone(),
248            source_ids: rf.source_ids.clone(),
249            transform: rf.transform.clone(),
250            expression: rf.expression.clone(),
251            result_type: rf.result_type.clone(),
252            parameters: rf.parameters.clone(),
253        }
254    }
255}
256
257impl From<pb::ShardingField> for ShardingField {
258    fn from(rf: pb::ShardingField) -> Self {
259        Self {
260            field_id: rf.field_id,
261            source_ids: rf.source_ids,
262            transform: rf.transform,
263            expression: rf.expression,
264            result_type: rf.result_type,
265            parameters: rf.parameters,
266        }
267    }
268}
269
270/// Sharding spec definition.
271#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
272pub struct ShardingSpec {
273    pub spec_id: u32,
274    pub fields: Vec<ShardingField>,
275}
276
277impl From<&ShardingSpec> for pb::ShardingSpec {
278    fn from(rs: &ShardingSpec) -> Self {
279        Self {
280            spec_id: rs.spec_id,
281            fields: rs.fields.iter().map(|f| f.into()).collect(),
282        }
283    }
284}
285
286impl From<pb::ShardingSpec> for ShardingSpec {
287    fn from(rs: pb::ShardingSpec) -> Self {
288        Self {
289            spec_id: rs.spec_id,
290            fields: rs.fields.into_iter().map(ShardingField::from).collect(),
291        }
292    }
293}
294
295/// Index details for MemWAL Index, stored in IndexMetadata.index_details.
296#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
297pub struct MemWalIndexDetails {
298    pub snapshot_ts_millis: i64,
299    pub num_shards: u32,
300    pub inline_snapshots: Option<Vec<u8>>,
301    pub sharding_specs: Vec<ShardingSpec>,
302    pub maintained_indexes: Vec<String>,
303    pub merged_generations: Vec<MergedGeneration>,
304    pub index_catchup: Vec<IndexCatchupProgress>,
305    /// Default `ShardWriter` configuration values for this MemWAL index.
306    ///
307    /// Persisted so every writer — across processes and restarts — starts
308    /// from the same default writer configuration. These are defaults only;
309    /// an individual writer may still override any value at runtime in its
310    /// own (non-persisted) `ShardWriterConfig`.
311    pub writer_config_defaults: HashMap<String, String>,
312}
313
314impl From<&MemWalIndexDetails> for pb::MemWalIndexDetails {
315    fn from(details: &MemWalIndexDetails) -> Self {
316        Self {
317            snapshot_ts_millis: details.snapshot_ts_millis,
318            num_shards: details.num_shards,
319            inline_snapshots: details.inline_snapshots.clone(),
320            sharding_specs: details.sharding_specs.iter().map(|rs| rs.into()).collect(),
321            maintained_indexes: details.maintained_indexes.clone(),
322            merged_generations: details
323                .merged_generations
324                .iter()
325                .map(|mg| mg.into())
326                .collect(),
327            index_catchup: details.index_catchup.iter().map(|icp| icp.into()).collect(),
328            writer_config_defaults: details.writer_config_defaults.clone(),
329        }
330    }
331}
332
333impl TryFrom<pb::MemWalIndexDetails> for MemWalIndexDetails {
334    type Error = Error;
335
336    fn try_from(details: pb::MemWalIndexDetails) -> lance_core::Result<Self> {
337        Ok(Self {
338            snapshot_ts_millis: details.snapshot_ts_millis,
339            num_shards: details.num_shards,
340            inline_snapshots: details.inline_snapshots,
341            sharding_specs: details
342                .sharding_specs
343                .into_iter()
344                .map(ShardingSpec::from)
345                .collect(),
346            maintained_indexes: details.maintained_indexes,
347            merged_generations: details
348                .merged_generations
349                .into_iter()
350                .map(MergedGeneration::try_from)
351                .collect::<lance_core::Result<_>>()?,
352            index_catchup: details
353                .index_catchup
354                .into_iter()
355                .map(IndexCatchupProgress::try_from)
356                .collect::<lance_core::Result<_>>()?,
357            writer_config_defaults: details.writer_config_defaults,
358        })
359    }
360}
361
362/// MemWAL Index provides access to MemWAL configuration and state.
363#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
364pub struct MemWalIndex {
365    pub details: MemWalIndexDetails,
366}
367
368impl MemWalIndex {
369    pub fn new(details: MemWalIndexDetails) -> Self {
370        Self { details }
371    }
372
373    pub fn merged_generation_for_shard(&self, shard_id: &Uuid) -> Option<u64> {
374        self.details
375            .merged_generations
376            .iter()
377            .find(|mg| &mg.shard_id == shard_id)
378            .map(|mg| mg.generation)
379    }
380
381    /// Get the caught up generation for a specific index and shard.
382    /// Returns None if the index is not tracked (assumed fully caught up).
383    pub fn index_caught_up_generation(&self, index_name: &str, shard_id: &Uuid) -> Option<u64> {
384        self.details
385            .index_catchup
386            .iter()
387            .find(|icp| icp.index_name == index_name)
388            .and_then(|icp| icp.caught_up_generation_for_shard(shard_id))
389    }
390
391    /// Check if an index is fully caught up for a shard.
392    /// Returns true if the index covers all merged data for the shard.
393    pub fn is_index_caught_up(&self, index_name: &str, shard_id: &Uuid) -> bool {
394        let merged_gen = self.merged_generation_for_shard(shard_id).unwrap_or(0);
395        let caught_up_gen = self.index_caught_up_generation(index_name, shard_id);
396
397        // If not tracked in index_catchup, assumed fully caught up
398        caught_up_gen.is_none_or(|generation| generation >= merged_gen)
399    }
400}