1use std::collections::HashMap;
5
6use lance_core::Error;
7use lance_core::deepsize::DeepSizeOf;
8use serde::{Deserialize, Serialize};
9use uuid::Uuid;
10
11use crate::format::pb;
12
/// The string `"__lance_mem_wal"`.
// NOTE(review): judging by the identifier, this is the name under which the MemWAL
// index is registered — confirm against the call sites.
pub const MEM_WAL_INDEX_NAME: &str = "__lance_mem_wal";
14
/// Alias for [`Uuid`], naming the type used to identify a shard.
pub type ShardId = Uuid;
17
/// A generation number paired with a path.
///
/// Converts to and from `pb::FlushedGeneration` field-for-field.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct FlushedGeneration {
    /// Generation number.
    pub generation: u64,
    /// Path associated with this generation.
    // NOTE(review): presumably the location of the flushed data; whether it is absolute
    // or relative is not visible here — confirm.
    pub path: String,
}
24
25impl From<&FlushedGeneration> for pb::FlushedGeneration {
26 fn from(fg: &FlushedGeneration) -> Self {
27 Self {
28 generation: fg.generation,
29 path: fg.path.clone(),
30 }
31 }
32}
33
34impl From<pb::FlushedGeneration> for FlushedGeneration {
35 fn from(fg: pb::FlushedGeneration) -> Self {
36 Self {
37 generation: fg.generation,
38 path: fg.path,
39 }
40 }
41}
42
/// A `(shard_id, generation)` pair.
///
/// The derived `PartialOrd` compares fields in declaration order: `shard_id` first,
/// then `generation`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash, Serialize, Deserialize)]
pub struct MergedGeneration {
    /// Shard this entry refers to.
    pub shard_id: Uuid,
    /// Generation number recorded for the shard.
    // NOTE(review): presumably the latest generation merged for this shard — confirm.
    pub generation: u64,
}
49
impl DeepSizeOf for MergedGeneration {
    /// Reports zero bytes of child (out-of-line) memory for this value.
    fn deep_size_of_children(&self, _context: &mut lance_core::deepsize::Context) -> usize {
        // `generation` is a plain `u64`; `shard_id` is presumably a fixed-size inline
        // `Uuid`, so nothing is owned on the heap — confirm if `Uuid` ever changes.
        0
    }
}
55
impl MergedGeneration {
    /// Creates a [`MergedGeneration`] from a shard id and a generation number.
    pub fn new(shard_id: Uuid, generation: u64) -> Self {
        Self {
            shard_id,
            generation,
        }
    }
}
64
65impl From<&MergedGeneration> for pb::MergedGeneration {
66 fn from(mg: &MergedGeneration) -> Self {
67 Self {
68 shard_id: Some((&mg.shard_id).into()),
69 generation: mg.generation,
70 }
71 }
72}
73
74impl TryFrom<pb::MergedGeneration> for MergedGeneration {
75 type Error = Error;
76
77 fn try_from(mg: pb::MergedGeneration) -> lance_core::Result<Self> {
78 let shard_id = mg
79 .shard_id
80 .as_ref()
81 .map(Uuid::try_from)
82 .ok_or_else(|| Error::invalid_input("Missing shard_id in MergedGeneration"))??;
83 Ok(Self {
84 shard_id,
85 generation: mg.generation,
86 })
87 }
88}
89
/// Per-index list of `(shard, generation)` entries.
///
/// `Default` yields an empty index name and an empty list.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct IndexCatchupProgress {
    /// Name of the index this progress record belongs to.
    pub index_name: String,
    /// One entry per shard, giving a generation number for that shard.
    // NOTE(review): presumably the generation up to which the index has caught up for
    // each shard — confirm.
    pub caught_up_generations: Vec<MergedGeneration>,
}
97
98impl IndexCatchupProgress {
99 pub fn new(index_name: String, caught_up_generations: Vec<MergedGeneration>) -> Self {
100 Self {
101 index_name,
102 caught_up_generations,
103 }
104 }
105
106 pub fn caught_up_generation_for_shard(&self, shard_id: &Uuid) -> Option<u64> {
109 self.caught_up_generations
110 .iter()
111 .find(|mg| &mg.shard_id == shard_id)
112 .map(|mg| mg.generation)
113 }
114}
115
116impl From<&IndexCatchupProgress> for pb::IndexCatchupProgress {
117 fn from(icp: &IndexCatchupProgress) -> Self {
118 Self {
119 index_name: icp.index_name.clone(),
120 caught_up_generations: icp
121 .caught_up_generations
122 .iter()
123 .map(|mg| mg.into())
124 .collect(),
125 }
126 }
127}
128
129impl TryFrom<pb::IndexCatchupProgress> for IndexCatchupProgress {
130 type Error = Error;
131
132 fn try_from(icp: pb::IndexCatchupProgress) -> lance_core::Result<Self> {
133 Ok(Self {
134 index_name: icp.index_name,
135 caught_up_generations: icp
136 .caught_up_generations
137 .into_iter()
138 .map(MergedGeneration::try_from)
139 .collect::<lance_core::Result<_>>()?,
140 })
141 }
142}
143
/// Manifest record for a single shard.
///
/// Converts to and from `pb::ShardManifest`; in the protobuf form the
/// `shard_field_values` map is represented as a list of `pb::ShardFieldEntry`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ShardManifest {
    /// Identifier of the shard.
    pub shard_id: Uuid,
    /// Version number of this manifest.
    // NOTE(review): presumably bumped on every manifest update — confirm.
    pub version: u64,
    /// Numeric id of a sharding spec.
    // NOTE(review): presumably matches `ShardingSpec::spec_id` — confirm.
    pub shard_spec_id: u32,
    /// Raw byte values keyed by field id (the key becomes `field_id` in the protobuf entry).
    pub shard_field_values: HashMap<String, Vec<u8>>,
    /// Writer epoch counter.
    // NOTE(review): presumably used to fence stale writers — confirm.
    pub writer_epoch: u64,
    /// A WAL entry position.
    // NOTE(review): the name suggests replay starts after this position — confirm.
    pub replay_after_wal_entry_position: u64,
    /// A WAL entry position.
    // NOTE(review): the name suggests the most recent position observed — confirm.
    pub wal_entry_position_last_seen: u64,
    /// Current generation number of the shard.
    pub current_generation: u64,
    /// Flushed generations recorded for the shard.
    pub flushed_generations: Vec<FlushedGeneration>,
}
170
171impl DeepSizeOf for ShardManifest {
172 fn deep_size_of_children(&self, context: &mut lance_core::deepsize::Context) -> usize {
173 self.shard_field_values.deep_size_of_children(context)
174 + self.flushed_generations.deep_size_of_children(context)
175 }
176}
177
178impl From<&ShardManifest> for pb::ShardManifest {
179 fn from(rm: &ShardManifest) -> Self {
180 Self {
181 shard_id: Some((&rm.shard_id).into()),
182 version: rm.version,
183 shard_spec_id: rm.shard_spec_id,
184 shard_field_entries: rm
185 .shard_field_values
186 .iter()
187 .map(|(k, v)| pb::ShardFieldEntry {
188 field_id: k.clone(),
189 value: v.clone(),
190 })
191 .collect(),
192 writer_epoch: rm.writer_epoch,
193 replay_after_wal_entry_position: rm.replay_after_wal_entry_position,
194 wal_entry_position_last_seen: rm.wal_entry_position_last_seen,
195 current_generation: rm.current_generation,
196 flushed_generations: rm.flushed_generations.iter().map(|fg| fg.into()).collect(),
197 }
198 }
199}
200
201impl TryFrom<pb::ShardManifest> for ShardManifest {
202 type Error = Error;
203
204 fn try_from(rm: pb::ShardManifest) -> lance_core::Result<Self> {
205 let shard_id = rm
206 .shard_id
207 .as_ref()
208 .map(Uuid::try_from)
209 .ok_or_else(|| Error::invalid_input("Missing shard_id in ShardManifest"))??;
210 let shard_field_values = rm
211 .shard_field_entries
212 .into_iter()
213 .map(|e| (e.field_id, e.value))
214 .collect();
215 Ok(Self {
216 shard_id,
217 version: rm.version,
218 shard_spec_id: rm.shard_spec_id,
219 shard_field_values,
220 writer_epoch: rm.writer_epoch,
221 replay_after_wal_entry_position: rm.replay_after_wal_entry_position,
222 wal_entry_position_last_seen: rm.wal_entry_position_last_seen,
223 current_generation: rm.current_generation,
224 flushed_generations: rm
225 .flushed_generations
226 .into_iter()
227 .map(FlushedGeneration::from)
228 .collect(),
229 })
230 }
231}
232
/// Description of a single sharding field.
///
/// Converts to and from `pb::ShardingField` field-for-field.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct ShardingField {
    /// Identifier of this sharding field.
    pub field_id: String,
    /// Integer ids of the sources this field is derived from.
    // NOTE(review): presumably schema field ids of the source columns — confirm.
    pub source_ids: Vec<i32>,
    /// Optional transform, given as a string.
    // NOTE(review): the set of accepted transform names is not visible here — confirm.
    pub transform: Option<String>,
    /// Optional expression, given as a string.
    // NOTE(review): presumably an alternative to `transform`; the relationship between
    // the two is not enforced in this file — confirm.
    pub expression: Option<String>,
    /// Result type, given as a string.
    pub result_type: String,
    /// Free-form string parameters.
    pub parameters: HashMap<String, String>,
}
243
244impl From<&ShardingField> for pb::ShardingField {
245 fn from(rf: &ShardingField) -> Self {
246 Self {
247 field_id: rf.field_id.clone(),
248 source_ids: rf.source_ids.clone(),
249 transform: rf.transform.clone(),
250 expression: rf.expression.clone(),
251 result_type: rf.result_type.clone(),
252 parameters: rf.parameters.clone(),
253 }
254 }
255}
256
257impl From<pb::ShardingField> for ShardingField {
258 fn from(rf: pb::ShardingField) -> Self {
259 Self {
260 field_id: rf.field_id,
261 source_ids: rf.source_ids,
262 transform: rf.transform,
263 expression: rf.expression,
264 result_type: rf.result_type,
265 parameters: rf.parameters,
266 }
267 }
268}
269
/// A sharding specification: a numeric id plus an ordered list of sharding fields.
///
/// Converts to and from `pb::ShardingSpec`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct ShardingSpec {
    /// Numeric id of this spec.
    pub spec_id: u32,
    /// Fields that make up the spec, in order.
    pub fields: Vec<ShardingField>,
}
276
277impl From<&ShardingSpec> for pb::ShardingSpec {
278 fn from(rs: &ShardingSpec) -> Self {
279 Self {
280 spec_id: rs.spec_id,
281 fields: rs.fields.iter().map(|f| f.into()).collect(),
282 }
283 }
284}
285
286impl From<pb::ShardingSpec> for ShardingSpec {
287 fn from(rs: pb::ShardingSpec) -> Self {
288 Self {
289 spec_id: rs.spec_id,
290 fields: rs.fields.into_iter().map(ShardingField::from).collect(),
291 }
292 }
293}
294
/// Details payload of the MemWAL index.
///
/// Converts to and from `pb::MemWalIndexDetails`. `Default` yields zeroed numbers,
/// `None` for `inline_snapshots`, and empty collections.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct MemWalIndexDetails {
    /// Snapshot timestamp in milliseconds.
    // NOTE(review): presumably milliseconds since the Unix epoch — confirm.
    pub snapshot_ts_millis: i64,
    /// Number of shards.
    pub num_shards: u32,
    /// Optional opaque bytes holding inline snapshots.
    // NOTE(review): the encoding of these bytes is not visible here — confirm.
    pub inline_snapshots: Option<Vec<u8>>,
    /// Sharding specifications.
    pub sharding_specs: Vec<ShardingSpec>,
    /// Names of the maintained indexes.
    pub maintained_indexes: Vec<String>,
    /// Per-shard merged generations; looked up by `MemWalIndex::merged_generation_for_shard`.
    pub merged_generations: Vec<MergedGeneration>,
    /// Per-index catch-up progress; looked up by `MemWalIndex::index_caught_up_generation`.
    pub index_catchup: Vec<IndexCatchupProgress>,
    /// String key/value defaults for writer configuration.
    // NOTE(review): the recognised keys are not visible here — confirm.
    pub writer_config_defaults: HashMap<String, String>,
}
313
314impl From<&MemWalIndexDetails> for pb::MemWalIndexDetails {
315 fn from(details: &MemWalIndexDetails) -> Self {
316 Self {
317 snapshot_ts_millis: details.snapshot_ts_millis,
318 num_shards: details.num_shards,
319 inline_snapshots: details.inline_snapshots.clone(),
320 sharding_specs: details.sharding_specs.iter().map(|rs| rs.into()).collect(),
321 maintained_indexes: details.maintained_indexes.clone(),
322 merged_generations: details
323 .merged_generations
324 .iter()
325 .map(|mg| mg.into())
326 .collect(),
327 index_catchup: details.index_catchup.iter().map(|icp| icp.into()).collect(),
328 writer_config_defaults: details.writer_config_defaults.clone(),
329 }
330 }
331}
332
333impl TryFrom<pb::MemWalIndexDetails> for MemWalIndexDetails {
334 type Error = Error;
335
336 fn try_from(details: pb::MemWalIndexDetails) -> lance_core::Result<Self> {
337 Ok(Self {
338 snapshot_ts_millis: details.snapshot_ts_millis,
339 num_shards: details.num_shards,
340 inline_snapshots: details.inline_snapshots,
341 sharding_specs: details
342 .sharding_specs
343 .into_iter()
344 .map(ShardingSpec::from)
345 .collect(),
346 maintained_indexes: details.maintained_indexes,
347 merged_generations: details
348 .merged_generations
349 .into_iter()
350 .map(MergedGeneration::try_from)
351 .collect::<lance_core::Result<_>>()?,
352 index_catchup: details
353 .index_catchup
354 .into_iter()
355 .map(IndexCatchupProgress::try_from)
356 .collect::<lance_core::Result<_>>()?,
357 writer_config_defaults: details.writer_config_defaults,
358 })
359 }
360}
361
/// Wrapper around [`MemWalIndexDetails`] that provides lookup helpers.
#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
pub struct MemWalIndex {
    /// The underlying index details.
    pub details: MemWalIndexDetails,
}
367
368impl MemWalIndex {
369 pub fn new(details: MemWalIndexDetails) -> Self {
370 Self { details }
371 }
372
373 pub fn merged_generation_for_shard(&self, shard_id: &Uuid) -> Option<u64> {
374 self.details
375 .merged_generations
376 .iter()
377 .find(|mg| &mg.shard_id == shard_id)
378 .map(|mg| mg.generation)
379 }
380
381 pub fn index_caught_up_generation(&self, index_name: &str, shard_id: &Uuid) -> Option<u64> {
384 self.details
385 .index_catchup
386 .iter()
387 .find(|icp| icp.index_name == index_name)
388 .and_then(|icp| icp.caught_up_generation_for_shard(shard_id))
389 }
390
391 pub fn is_index_caught_up(&self, index_name: &str, shard_id: &Uuid) -> bool {
394 let merged_gen = self.merged_generation_for_shard(shard_id).unwrap_or(0);
395 let caught_up_gen = self.index_caught_up_generation(index_name, shard_id);
396
397 caught_up_gen.is_none_or(|generation| generation >= merged_gen)
399 }
400}