use std::collections::HashMap;
use lance_core::Error;
use lance_core::deepsize::DeepSizeOf;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::format::pb;
/// Name string used for the MemWAL index.
/// NOTE(review): the `__` prefix suggests an internal/system index name — confirm with callers.
pub const MEM_WAL_INDEX_NAME: &str = "__lance_mem_wal";
/// Identifier of a shard. This is a plain alias for [`Uuid`], so the two types are interchangeable.
pub type ShardId = Uuid;
/// A flushed generation: its generation number plus the path string recorded for it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct FlushedGeneration {
    /// Generation number that was flushed.
    pub generation: u64,
    /// Path associated with the flushed generation, stored verbatim.
    pub path: String,
}

impl From<&FlushedGeneration> for pb::FlushedGeneration {
    /// Encodes into the protobuf message; the path string is cloned.
    fn from(value: &FlushedGeneration) -> Self {
        let FlushedGeneration { generation, path } = value;
        Self {
            generation: *generation,
            path: path.clone(),
        }
    }
}

impl From<pb::FlushedGeneration> for FlushedGeneration {
    /// Decodes the protobuf message, moving both fields over unchanged.
    fn from(msg: pb::FlushedGeneration) -> Self {
        let pb::FlushedGeneration { generation, path } = msg;
        Self { generation, path }
    }
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash, Serialize, Deserialize)]
pub struct MergedGeneration {
pub shard_id: Uuid,
pub generation: u64,
}
impl DeepSizeOf for MergedGeneration {
fn deep_size_of_children(&self, _context: &mut lance_core::deepsize::Context) -> usize {
0 }
}
impl MergedGeneration {
pub fn new(shard_id: Uuid, generation: u64) -> Self {
Self {
shard_id,
generation,
}
}
}
impl From<&MergedGeneration> for pb::MergedGeneration {
fn from(mg: &MergedGeneration) -> Self {
Self {
shard_id: Some((&mg.shard_id).into()),
generation: mg.generation,
}
}
}
impl TryFrom<pb::MergedGeneration> for MergedGeneration {
type Error = Error;
fn try_from(mg: pb::MergedGeneration) -> lance_core::Result<Self> {
let shard_id = mg
.shard_id
.as_ref()
.map(Uuid::try_from)
.ok_or_else(|| Error::invalid_input("Missing shard_id in MergedGeneration"))??;
Ok(Self {
shard_id,
generation: mg.generation,
})
}
}
/// Catch-up progress for a single index, as a list of per-shard generations.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct IndexCatchupProgress {
    /// Name of the index this progress belongs to.
    pub index_name: String,
    /// Per-shard generation the index has caught up to.
    pub caught_up_generations: Vec<MergedGeneration>,
}

impl IndexCatchupProgress {
    /// Creates progress for `index_name` from the given per-shard entries.
    pub fn new(index_name: String, caught_up_generations: Vec<MergedGeneration>) -> Self {
        Self {
            index_name,
            caught_up_generations,
        }
    }

    /// Returns the caught-up generation recorded for `shard_id`.
    ///
    /// Returns `None` if there is no entry for that shard. If several entries
    /// share the shard id, the first one in the list is used.
    pub fn caught_up_generation_for_shard(&self, shard_id: &Uuid) -> Option<u64> {
        self.caught_up_generations
            .iter()
            .find_map(|entry| (entry.shard_id == *shard_id).then_some(entry.generation))
    }
}

impl From<&IndexCatchupProgress> for pb::IndexCatchupProgress {
    /// Encodes into the protobuf message, converting each per-shard entry.
    fn from(progress: &IndexCatchupProgress) -> Self {
        let caught_up_generations = progress
            .caught_up_generations
            .iter()
            .map(Into::into)
            .collect();
        Self {
            index_name: progress.index_name.clone(),
            caught_up_generations,
        }
    }
}

impl TryFrom<pb::IndexCatchupProgress> for IndexCatchupProgress {
    type Error = Error;

    /// Decodes the protobuf message.
    ///
    /// # Errors
    /// Fails on the first per-shard entry that cannot be decoded
    /// (see `TryFrom<pb::MergedGeneration>`).
    fn try_from(msg: pb::IndexCatchupProgress) -> lance_core::Result<Self> {
        let pb::IndexCatchupProgress {
            index_name,
            caught_up_generations,
        } = msg;
        let caught_up_generations = caught_up_generations
            .into_iter()
            .map(MergedGeneration::try_from)
            .collect::<lance_core::Result<Vec<_>>>()?;
        Ok(Self {
            index_name,
            caught_up_generations,
        })
    }
}
/// Manifest state for a single shard.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ShardManifest {
    /// Shard this manifest describes.
    pub shard_id: Uuid,
    /// Manifest version number.
    pub version: u64,
    /// Sharding spec id. Presumably refers to [`ShardingSpec::spec_id`] — confirm.
    pub shard_spec_id: u32,
    /// Raw value bytes keyed by sharding field id.
    pub shard_field_values: HashMap<String, Vec<u8>>,
    /// Writer epoch recorded in the manifest.
    pub writer_epoch: u64,
    /// WAL entry position after which replay starts (semantics inferred from the name — confirm).
    pub replay_after_wal_entry_position: u64,
    /// Last WAL entry position observed (semantics inferred from the name — confirm).
    pub wal_entry_position_last_seen: u64,
    /// Current generation number of the shard.
    pub current_generation: u64,
    /// Generations that have been flushed, with their paths.
    pub flushed_generations: Vec<FlushedGeneration>,
}

impl DeepSizeOf for ShardManifest {
    /// Counts the heap memory owned by the map and vector fields.
    ///
    /// The remaining fields are scalars or an inline `Uuid`.
    fn deep_size_of_children(&self, context: &mut lance_core::deepsize::Context) -> usize {
        let field_value_bytes = self.shard_field_values.deep_size_of_children(context);
        let flushed_bytes = self.flushed_generations.deep_size_of_children(context);
        field_value_bytes + flushed_bytes
    }
}
impl From<&ShardManifest> for pb::ShardManifest {
    /// Encodes a [`ShardManifest`] into its protobuf form.
    ///
    /// `shard_field_values` is a `HashMap`, whose iteration order is unspecified.
    /// With the default randomly seeded hasher, that order can differ between
    /// processes even for equal maps. The entries are therefore sorted by
    /// `field_id`, so equal manifests always produce the same repeated-field
    /// order. Decoding collects the entries back into a map, so the order has no
    /// effect on round-trips.
    fn from(rm: &ShardManifest) -> Self {
        let mut shard_field_entries: Vec<pb::ShardFieldEntry> = rm
            .shard_field_values
            .iter()
            .map(|(field_id, value)| pb::ShardFieldEntry {
                field_id: field_id.clone(),
                value: value.clone(),
            })
            .collect();
        // Map keys are unique, so no two entries compare equal and an unstable
        // sort still yields a fully deterministic order.
        shard_field_entries.sort_unstable_by(|a, b| a.field_id.cmp(&b.field_id));

        Self {
            shard_id: Some((&rm.shard_id).into()),
            version: rm.version,
            shard_spec_id: rm.shard_spec_id,
            shard_field_entries,
            writer_epoch: rm.writer_epoch,
            replay_after_wal_entry_position: rm.replay_after_wal_entry_position,
            wal_entry_position_last_seen: rm.wal_entry_position_last_seen,
            current_generation: rm.current_generation,
            flushed_generations: rm.flushed_generations.iter().map(|fg| fg.into()).collect(),
        }
    }
}
impl TryFrom<pb::ShardManifest> for ShardManifest {
type Error = Error;
fn try_from(rm: pb::ShardManifest) -> lance_core::Result<Self> {
let shard_id = rm
.shard_id
.as_ref()
.map(Uuid::try_from)
.ok_or_else(|| Error::invalid_input("Missing shard_id in ShardManifest"))??;
let shard_field_values = rm
.shard_field_entries
.into_iter()
.map(|e| (e.field_id, e.value))
.collect();
Ok(Self {
shard_id,
version: rm.version,
shard_spec_id: rm.shard_spec_id,
shard_field_values,
writer_epoch: rm.writer_epoch,
replay_after_wal_entry_position: rm.replay_after_wal_entry_position,
wal_entry_position_last_seen: rm.wal_entry_position_last_seen,
current_generation: rm.current_generation,
flushed_generations: rm
.flushed_generations
.into_iter()
.map(FlushedGeneration::from)
.collect(),
})
}
}
/// One field of a [`ShardingSpec`].
///
/// NOTE(review): presumably the shard value is derived from the `source_ids`
/// columns via `transform` or `expression` — confirm which combinations are valid.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct ShardingField {
    /// Identifier of this sharding field.
    pub field_id: String,
    /// Source ids this field is computed from.
    pub source_ids: Vec<i32>,
    /// Optional transform name.
    pub transform: Option<String>,
    /// Optional expression string.
    pub expression: Option<String>,
    /// Result type, as a string.
    pub result_type: String,
    /// Additional string parameters.
    pub parameters: HashMap<String, String>,
}

impl From<&ShardingField> for pb::ShardingField {
    /// Encodes into the protobuf message; every field is cloned as-is.
    fn from(rf: &ShardingField) -> Self {
        let ShardingField {
            field_id,
            source_ids,
            transform,
            expression,
            result_type,
            parameters,
        } = rf.clone();
        Self {
            field_id,
            source_ids,
            transform,
            expression,
            result_type,
            parameters,
        }
    }
}

impl From<pb::ShardingField> for ShardingField {
    /// Decodes the protobuf message, moving every field over unchanged.
    fn from(rf: pb::ShardingField) -> Self {
        let pb::ShardingField {
            field_id,
            source_ids,
            transform,
            expression,
            result_type,
            parameters,
        } = rf;
        Self {
            field_id,
            source_ids,
            transform,
            expression,
            result_type,
            parameters,
        }
    }
}
/// A sharding spec: an id plus the list of [`ShardingField`]s it consists of.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct ShardingSpec {
    /// Identifier of this spec.
    pub spec_id: u32,
    /// Fields making up the spec, in order.
    pub fields: Vec<ShardingField>,
}

impl From<&ShardingSpec> for pb::ShardingSpec {
    /// Encodes into the protobuf message, converting each field in order.
    fn from(spec: &ShardingSpec) -> Self {
        let fields = spec.fields.iter().map(Into::into).collect();
        Self {
            spec_id: spec.spec_id,
            fields,
        }
    }
}

impl From<pb::ShardingSpec> for ShardingSpec {
    /// Decodes the protobuf message, converting each field in order.
    fn from(msg: pb::ShardingSpec) -> Self {
        let pb::ShardingSpec { spec_id, fields } = msg;
        Self {
            spec_id,
            fields: fields.into_iter().map(ShardingField::from).collect(),
        }
    }
}
/// Persisted details of the MemWAL index.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct MemWalIndexDetails {
    /// Snapshot timestamp. Milliseconds, judging by the name; epoch/base is not shown here — confirm.
    pub snapshot_ts_millis: i64,
    /// Number of shards.
    pub num_shards: u32,
    /// Optional raw snapshot bytes stored inline; format is not determined here.
    pub inline_snapshots: Option<Vec<u8>>,
    /// Sharding specs known to this index.
    pub sharding_specs: Vec<ShardingSpec>,
    /// Names of maintained indexes.
    pub maintained_indexes: Vec<String>,
    /// Per-shard merged generation, looked up by [`MemWalIndex::merged_generation_for_shard`].
    pub merged_generations: Vec<MergedGeneration>,
    /// Per-index catch-up progress, looked up by [`MemWalIndex::index_caught_up_generation`].
    pub index_catchup: Vec<IndexCatchupProgress>,
    /// Default writer configuration as string key/value pairs.
    pub writer_config_defaults: HashMap<String, String>,
}

impl From<&MemWalIndexDetails> for pb::MemWalIndexDetails {
    /// Encodes into the protobuf message, converting nested records element-wise.
    fn from(details: &MemWalIndexDetails) -> Self {
        let MemWalIndexDetails {
            snapshot_ts_millis,
            num_shards,
            inline_snapshots,
            sharding_specs,
            maintained_indexes,
            merged_generations,
            index_catchup,
            writer_config_defaults,
        } = details;
        Self {
            snapshot_ts_millis: *snapshot_ts_millis,
            num_shards: *num_shards,
            inline_snapshots: inline_snapshots.clone(),
            sharding_specs: sharding_specs.iter().map(Into::into).collect(),
            maintained_indexes: maintained_indexes.clone(),
            merged_generations: merged_generations.iter().map(Into::into).collect(),
            index_catchup: index_catchup.iter().map(Into::into).collect(),
            writer_config_defaults: writer_config_defaults.clone(),
        }
    }
}

impl TryFrom<pb::MemWalIndexDetails> for MemWalIndexDetails {
    type Error = Error;

    /// Decodes the protobuf message.
    ///
    /// # Errors
    /// Fails on the first merged-generation entry, then the first catch-up
    /// entry, that cannot be decoded.
    fn try_from(details: pb::MemWalIndexDetails) -> lance_core::Result<Self> {
        let pb::MemWalIndexDetails {
            snapshot_ts_millis,
            num_shards,
            inline_snapshots,
            sharding_specs,
            maintained_indexes,
            merged_generations,
            index_catchup,
            writer_config_defaults,
        } = details;

        let sharding_specs = sharding_specs.into_iter().map(ShardingSpec::from).collect();
        // Decoded in the same order as the struct fields so the first error reported is unchanged.
        let merged_generations = merged_generations
            .into_iter()
            .map(MergedGeneration::try_from)
            .collect::<lance_core::Result<Vec<_>>>()?;
        let index_catchup = index_catchup
            .into_iter()
            .map(IndexCatchupProgress::try_from)
            .collect::<lance_core::Result<Vec<_>>>()?;

        Ok(Self {
            snapshot_ts_millis,
            num_shards,
            inline_snapshots,
            sharding_specs,
            maintained_indexes,
            merged_generations,
            index_catchup,
            writer_config_defaults,
        })
    }
}
/// Wrapper around [`MemWalIndexDetails`] providing per-shard lookup helpers.
#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
pub struct MemWalIndex {
    /// The underlying index details.
    pub details: MemWalIndexDetails,
}

impl MemWalIndex {
    /// Wraps the given details.
    pub fn new(details: MemWalIndexDetails) -> Self {
        Self { details }
    }

    /// Returns the merged generation recorded for `shard_id`, or `None` if there is none.
    ///
    /// The first matching entry wins.
    pub fn merged_generation_for_shard(&self, shard_id: &Uuid) -> Option<u64> {
        self.details
            .merged_generations
            .iter()
            .find_map(|merged| (merged.shard_id == *shard_id).then_some(merged.generation))
    }

    /// Returns the generation that `index_name` has caught up to on `shard_id`.
    ///
    /// Only the first progress record with a matching index name is consulted.
    /// Returns `None` if there is no such record or it has no entry for the shard.
    pub fn index_caught_up_generation(&self, index_name: &str, shard_id: &Uuid) -> Option<u64> {
        let progress = self
            .details
            .index_catchup
            .iter()
            .find(|progress| progress.index_name == index_name)?;
        progress.caught_up_generation_for_shard(shard_id)
    }

    /// Reports whether `index_name` has caught up with the merged generation of `shard_id`.
    ///
    /// Returns `true` when no catch-up generation is recorded for the index/shard pair.
    /// Otherwise, compares the caught-up generation against the merged generation,
    /// treating a missing merged generation as 0.
    pub fn is_index_caught_up(&self, index_name: &str, shard_id: &Uuid) -> bool {
        match self.index_caught_up_generation(index_name, shard_id) {
            None => true,
            Some(caught_up) => {
                let merged = self.merged_generation_for_shard(shard_id).unwrap_or(0);
                caught_up >= merged
            }
        }
    }
}