1use std::any::Any;
5use std::collections::HashMap;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use deepsize::DeepSizeOf;
10use lance_core::Error;
11use lance_table::format::pb;
12use roaring::RoaringBitmap;
13use serde::{Deserialize, Serialize};
14use snafu::location;
15use uuid::Uuid;
16
17use crate::{Index, IndexType};
18
/// Index name used for the MemWAL index.
pub const MEM_WAL_INDEX_NAME: &str = "__lance_mem_wal";

/// Identifier of a MemWAL region (an alias for [`Uuid`]).
pub type RegionId = Uuid;
23
24#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
26pub struct FlushedGeneration {
27 pub generation: u64,
28 pub path: String,
29}
30
31impl From<&FlushedGeneration> for pb::FlushedGeneration {
32 fn from(fg: &FlushedGeneration) -> Self {
33 Self {
34 generation: fg.generation,
35 path: fg.path.clone(),
36 }
37 }
38}
39
40impl From<pb::FlushedGeneration> for FlushedGeneration {
41 fn from(fg: pb::FlushedGeneration) -> Self {
42 Self {
43 generation: fg.generation,
44 path: fg.path,
45 }
46 }
47}
48
49#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash, Serialize, Deserialize)]
51pub struct MergedGeneration {
52 pub region_id: Uuid,
53 pub generation: u64,
54}
55
56impl DeepSizeOf for MergedGeneration {
57 fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
58 0 }
60}
61
62impl MergedGeneration {
63 pub fn new(region_id: Uuid, generation: u64) -> Self {
64 Self {
65 region_id,
66 generation,
67 }
68 }
69}
70
71impl From<&MergedGeneration> for pb::MergedGeneration {
72 fn from(mg: &MergedGeneration) -> Self {
73 Self {
74 region_id: Some((&mg.region_id).into()),
75 generation: mg.generation,
76 }
77 }
78}
79
80impl TryFrom<pb::MergedGeneration> for MergedGeneration {
81 type Error = Error;
82
83 fn try_from(mg: pb::MergedGeneration) -> lance_core::Result<Self> {
84 let region_id = mg.region_id.as_ref().map(Uuid::try_from).ok_or_else(|| {
85 Error::invalid_input("Missing region_id in MergedGeneration", location!())
86 })??;
87 Ok(Self {
88 region_id,
89 generation: mg.generation,
90 })
91 }
92}
93
94#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
97pub struct IndexCatchupProgress {
98 pub index_name: String,
99 pub caught_up_generations: Vec<MergedGeneration>,
100}
101
102impl IndexCatchupProgress {
103 pub fn new(index_name: String, caught_up_generations: Vec<MergedGeneration>) -> Self {
104 Self {
105 index_name,
106 caught_up_generations,
107 }
108 }
109
110 pub fn caught_up_generation_for_region(&self, region_id: &Uuid) -> Option<u64> {
113 self.caught_up_generations
114 .iter()
115 .find(|mg| &mg.region_id == region_id)
116 .map(|mg| mg.generation)
117 }
118}
119
120impl From<&IndexCatchupProgress> for pb::IndexCatchupProgress {
121 fn from(icp: &IndexCatchupProgress) -> Self {
122 Self {
123 index_name: icp.index_name.clone(),
124 caught_up_generations: icp
125 .caught_up_generations
126 .iter()
127 .map(|mg| mg.into())
128 .collect(),
129 }
130 }
131}
132
133impl TryFrom<pb::IndexCatchupProgress> for IndexCatchupProgress {
134 type Error = Error;
135
136 fn try_from(icp: pb::IndexCatchupProgress) -> lance_core::Result<Self> {
137 Ok(Self {
138 index_name: icp.index_name,
139 caught_up_generations: icp
140 .caught_up_generations
141 .into_iter()
142 .map(MergedGeneration::try_from)
143 .collect::<lance_core::Result<_>>()?,
144 })
145 }
146}
147
148#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
151pub struct RegionManifest {
152 pub region_id: Uuid,
153 pub version: u64,
154 pub region_spec_id: u32,
155 pub writer_epoch: u64,
156 pub replay_after_wal_id: u64,
157 pub wal_id_last_seen: u64,
158 pub current_generation: u64,
159 pub flushed_generations: Vec<FlushedGeneration>,
160}
161
162impl DeepSizeOf for RegionManifest {
163 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
164 self.flushed_generations.deep_size_of_children(context)
165 }
166}
167
168impl From<&RegionManifest> for pb::RegionManifest {
169 fn from(rm: &RegionManifest) -> Self {
170 Self {
171 region_id: Some((&rm.region_id).into()),
172 version: rm.version,
173 region_spec_id: rm.region_spec_id,
174 writer_epoch: rm.writer_epoch,
175 replay_after_wal_id: rm.replay_after_wal_id,
176 wal_id_last_seen: rm.wal_id_last_seen,
177 current_generation: rm.current_generation,
178 flushed_generations: rm.flushed_generations.iter().map(|fg| fg.into()).collect(),
179 }
180 }
181}
182
183impl TryFrom<pb::RegionManifest> for RegionManifest {
184 type Error = Error;
185
186 fn try_from(rm: pb::RegionManifest) -> lance_core::Result<Self> {
187 let region_id = rm.region_id.as_ref().map(Uuid::try_from).ok_or_else(|| {
188 Error::invalid_input("Missing region_id in RegionManifest", location!())
189 })??;
190 Ok(Self {
191 region_id,
192 version: rm.version,
193 region_spec_id: rm.region_spec_id,
194 writer_epoch: rm.writer_epoch,
195 replay_after_wal_id: rm.replay_after_wal_id,
196 wal_id_last_seen: rm.wal_id_last_seen,
197 current_generation: rm.current_generation,
198 flushed_generations: rm
199 .flushed_generations
200 .into_iter()
201 .map(FlushedGeneration::from)
202 .collect(),
203 })
204 }
205}
206
207#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
209pub struct RegionField {
210 pub field_id: String,
211 pub source_ids: Vec<i32>,
212 pub transform: Option<String>,
213 pub expression: Option<String>,
214 pub result_type: String,
215 pub parameters: HashMap<String, String>,
216}
217
218impl From<&RegionField> for pb::RegionField {
219 fn from(rf: &RegionField) -> Self {
220 Self {
221 field_id: rf.field_id.clone(),
222 source_ids: rf.source_ids.clone(),
223 transform: rf.transform.clone(),
224 expression: rf.expression.clone(),
225 result_type: rf.result_type.clone(),
226 parameters: rf.parameters.clone(),
227 }
228 }
229}
230
231impl From<pb::RegionField> for RegionField {
232 fn from(rf: pb::RegionField) -> Self {
233 Self {
234 field_id: rf.field_id,
235 source_ids: rf.source_ids,
236 transform: rf.transform,
237 expression: rf.expression,
238 result_type: rf.result_type,
239 parameters: rf.parameters,
240 }
241 }
242}
243
244#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
246pub struct RegionSpec {
247 pub spec_id: u32,
248 pub fields: Vec<RegionField>,
249}
250
251impl From<&RegionSpec> for pb::RegionSpec {
252 fn from(rs: &RegionSpec) -> Self {
253 Self {
254 spec_id: rs.spec_id,
255 fields: rs.fields.iter().map(|f| f.into()).collect(),
256 }
257 }
258}
259
260impl From<pb::RegionSpec> for RegionSpec {
261 fn from(rs: pb::RegionSpec) -> Self {
262 Self {
263 spec_id: rs.spec_id,
264 fields: rs.fields.into_iter().map(RegionField::from).collect(),
265 }
266 }
267}
268
269#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
271pub struct MemWalIndexDetails {
272 pub snapshot_ts_millis: i64,
273 pub num_regions: u32,
274 pub inline_snapshots: Option<Vec<u8>>,
275 pub region_specs: Vec<RegionSpec>,
276 pub maintained_indexes: Vec<String>,
277 pub merged_generations: Vec<MergedGeneration>,
278 pub index_catchup: Vec<IndexCatchupProgress>,
279}
280
281impl From<&MemWalIndexDetails> for pb::MemWalIndexDetails {
282 fn from(details: &MemWalIndexDetails) -> Self {
283 Self {
284 snapshot_ts_millis: details.snapshot_ts_millis,
285 num_regions: details.num_regions,
286 inline_snapshots: details.inline_snapshots.clone(),
287 region_specs: details.region_specs.iter().map(|rs| rs.into()).collect(),
288 maintained_indexes: details.maintained_indexes.clone(),
289 merged_generations: details
290 .merged_generations
291 .iter()
292 .map(|mg| mg.into())
293 .collect(),
294 index_catchup: details.index_catchup.iter().map(|icp| icp.into()).collect(),
295 }
296 }
297}
298
299impl TryFrom<pb::MemWalIndexDetails> for MemWalIndexDetails {
300 type Error = Error;
301
302 fn try_from(details: pb::MemWalIndexDetails) -> lance_core::Result<Self> {
303 Ok(Self {
304 snapshot_ts_millis: details.snapshot_ts_millis,
305 num_regions: details.num_regions,
306 inline_snapshots: details.inline_snapshots,
307 region_specs: details
308 .region_specs
309 .into_iter()
310 .map(RegionSpec::from)
311 .collect(),
312 maintained_indexes: details.maintained_indexes,
313 merged_generations: details
314 .merged_generations
315 .into_iter()
316 .map(MergedGeneration::try_from)
317 .collect::<lance_core::Result<_>>()?,
318 index_catchup: details
319 .index_catchup
320 .into_iter()
321 .map(IndexCatchupProgress::try_from)
322 .collect::<lance_core::Result<_>>()?,
323 })
324 }
325}
326
327#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
329pub struct MemWalIndex {
330 pub details: MemWalIndexDetails,
331}
332
333impl MemWalIndex {
334 pub fn new(details: MemWalIndexDetails) -> Self {
335 Self { details }
336 }
337
338 pub fn merged_generation_for_region(&self, region_id: &Uuid) -> Option<u64> {
339 self.details
340 .merged_generations
341 .iter()
342 .find(|mg| &mg.region_id == region_id)
343 .map(|mg| mg.generation)
344 }
345
346 pub fn index_caught_up_generation(&self, index_name: &str, region_id: &Uuid) -> Option<u64> {
349 self.details
350 .index_catchup
351 .iter()
352 .find(|icp| icp.index_name == index_name)
353 .and_then(|icp| icp.caught_up_generation_for_region(region_id))
354 }
355
356 pub fn is_index_caught_up(&self, index_name: &str, region_id: &Uuid) -> bool {
359 let merged_gen = self.merged_generation_for_region(region_id).unwrap_or(0);
360 let caught_up_gen = self.index_caught_up_generation(index_name, region_id);
361
362 caught_up_gen.is_none_or(|gen| gen >= merged_gen)
364 }
365}
366
367#[derive(Serialize)]
368struct MemWalStatistics {
369 num_regions: u32,
370 num_merged_generations: usize,
371 num_region_specs: usize,
372 num_maintained_indexes: usize,
373 num_index_catchup_entries: usize,
374}
375
376#[async_trait]
377impl Index for MemWalIndex {
378 fn as_any(&self) -> &dyn Any {
379 self
380 }
381
382 fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
383 self
384 }
385
386 fn as_vector_index(self: Arc<Self>) -> lance_core::Result<Arc<dyn crate::vector::VectorIndex>> {
387 Err(Error::NotSupported {
388 source: "MemWalIndex is not a vector index".into(),
389 location: location!(),
390 })
391 }
392
393 fn statistics(&self) -> lance_core::Result<serde_json::Value> {
394 let stats = MemWalStatistics {
395 num_regions: self.details.num_regions,
396 num_merged_generations: self.details.merged_generations.len(),
397 num_region_specs: self.details.region_specs.len(),
398 num_maintained_indexes: self.details.maintained_indexes.len(),
399 num_index_catchup_entries: self.details.index_catchup.len(),
400 };
401 serde_json::to_value(stats).map_err(|e| Error::Internal {
402 message: format!("failed to serialize MemWAL index statistics: {}", e),
403 location: location!(),
404 })
405 }
406
407 async fn prewarm(&self) -> lance_core::Result<()> {
408 Ok(())
409 }
410
411 fn index_type(&self) -> IndexType {
412 IndexType::MemWal
413 }
414
415 async fn calculate_included_frags(&self) -> lance_core::Result<RoaringBitmap> {
416 Ok(RoaringBitmap::new())
417 }
418}