use std::sync::Arc;
use lance_core::{Error, Result};
use lance_index::mem_wal::{MEM_WAL_INDEX_NAME, MemWalIndex, MemWalIndexDetails, MergedGeneration};
use lance_table::format::{IndexMetadata, pb};
use uuid::Uuid;
pub(crate) fn load_mem_wal_index_details(index: IndexMetadata) -> Result<MemWalIndexDetails> {
if let Some(details_any) = index.index_details.as_ref() {
if !details_any.type_url.ends_with("MemWalIndexDetails") {
return Err(Error::index(format!(
"Index details is not for the MemWAL index, but {}",
details_any.type_url
)));
}
Ok(MemWalIndexDetails::try_from(
details_any.to_msg::<pb::MemWalIndexDetails>()?,
)?)
} else {
Err(Error::index("Index details not found for the MemWAL index"))
}
}
/// Builds a shared [`MemWalIndex`] from the details stored in `index`.
///
/// # Errors
///
/// Propagates any error from [`load_mem_wal_index_details`].
pub(crate) fn open_mem_wal_index(index: IndexMetadata) -> Result<Arc<MemWalIndex>> {
    let details = load_mem_wal_index_details(index)?;
    let mem_wal_index = MemWalIndex::new(details);
    Ok(Arc::new(mem_wal_index))
}
pub(crate) fn update_mem_wal_index_merged_generations(
indices: &mut Vec<IndexMetadata>,
dataset_version: u64,
new_merged_generations: Vec<MergedGeneration>,
) -> Result<()> {
if new_merged_generations.is_empty() {
return Ok(());
}
let pos = indices
.iter()
.position(|idx| idx.name == MEM_WAL_INDEX_NAME);
let new_meta = if let Some(pos) = pos {
let current_meta = indices.remove(pos);
let mut details = load_mem_wal_index_details(current_meta)?;
for new_mg in new_merged_generations {
if let Some(existing) = details
.merged_generations
.iter_mut()
.find(|mg| mg.region_id == new_mg.region_id)
{
if new_mg.generation > existing.generation {
existing.generation = new_mg.generation;
}
} else {
details.merged_generations.push(new_mg);
}
}
new_mem_wal_index_meta(dataset_version, details)?
} else {
let details = MemWalIndexDetails {
merged_generations: new_merged_generations,
..Default::default()
};
new_mem_wal_index_meta(dataset_version, details)?
};
indices.push(new_meta);
Ok(())
}
/// Creates a fresh [`IndexMetadata`] entry for the MemWAL index.
///
/// The entry gets a random v4 UUID, the name [`MEM_WAL_INDEX_NAME`], the given
/// `dataset_version`, no fields, no fragment bitmap, index version `0` and the
/// current UTC time as creation timestamp. `details` is stored as a protobuf
/// `Any` wrapping `pb::MemWalIndexDetails`.
///
/// # Errors
///
/// Propagates the error if `details` cannot be packed into a protobuf `Any`.
pub(crate) fn new_mem_wal_index_meta(
    dataset_version: u64,
    details: MemWalIndexDetails,
) -> Result<IndexMetadata> {
    let proto = pb::MemWalIndexDetails::from(&details);
    let packed = prost_types::Any::from_msg(&proto)?;

    let meta = IndexMetadata {
        uuid: Uuid::new_v4(),
        name: MEM_WAL_INDEX_NAME.to_string(),
        fields: vec![],
        dataset_version,
        fragment_bitmap: None,
        index_details: Some(Arc::new(packed)),
        index_version: 0,
        created_at: Some(chrono::Utc::now()),
        base_id: None,
        files: None,
    };
    Ok(meta)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use arrow_array::{Int32Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use lance_index::DatasetIndexExt;

    use crate::dataset::transaction::{Operation, Transaction};
    use crate::dataset::{CommitBuilder, InsertBuilder, WriteParams};

    /// Outcome of committing a transaction in these tests.
    type CommitResult = std::result::Result<crate::Dataset, crate::Error>;

    /// Creates a small in-memory dataset with two `Int32` columns and ten rows.
    async fn test_dataset() -> crate::Dataset {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, true),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from_iter_values(0..10_i32)),
                Arc::new(Int32Array::from_iter_values(std::iter::repeat_n(0, 10))),
            ],
        )
        .unwrap();

        let params = WriteParams {
            max_rows_per_file: 10,
            ..Default::default()
        };
        InsertBuilder::new("memory://test_mem_wal")
            .with_params(&params)
            .execute(vec![batch])
            .await
            .unwrap()
    }

    /// Builds an `UpdateMemWalState` transaction carrying a single merged generation.
    fn update_state_txn(read_version: u64, merged: MergedGeneration) -> Transaction {
        Transaction::new(
            read_version,
            Operation::UpdateMemWalState {
                merged_generations: vec![merged],
            },
            None,
        )
    }

    /// Builds a `CreateIndex` transaction that adds `index` and removes nothing.
    fn create_index_txn(read_version: u64, index: IndexMetadata) -> Transaction {
        Transaction::new(
            read_version,
            Operation::CreateIndex {
                new_indices: vec![index],
                removed_indices: vec![],
            },
            None,
        )
    }

    /// Commits `txn` on top of `dataset`.
    async fn commit(dataset: crate::Dataset, txn: Transaction) -> CommitResult {
        CommitBuilder::new(Arc::new(dataset)).execute(txn).await
    }

    /// Commits `first` on a fresh dataset, then attempts to commit `second` with a
    /// read version one behind the resulting dataset version, and returns the
    /// outcome of that second commit.
    async fn commit_then_stale_update(
        first: MergedGeneration,
        second: MergedGeneration,
    ) -> CommitResult {
        let dataset = test_dataset().await;
        let first_txn = update_state_txn(dataset.manifest.version, first);
        let dataset = commit(dataset, first_txn).await.unwrap();

        let second_txn = update_state_txn(dataset.manifest.version - 1, second);
        commit(dataset, second_txn).await
    }

    /// Loads the MemWAL index details currently stored in `dataset`.
    async fn stored_details(dataset: &crate::Dataset) -> MemWalIndexDetails {
        let indices = dataset.load_indices().await.unwrap();
        let meta = indices
            .iter()
            .find(|idx| idx.name == MEM_WAL_INDEX_NAME)
            .unwrap()
            .clone();
        load_mem_wal_index_details(meta).unwrap()
    }

    /// Decodes the MemWAL details from the first entry of `indices`.
    fn first_details(indices: &[IndexMetadata]) -> MemWalIndexDetails {
        load_mem_wal_index_details(indices[0].clone()).unwrap()
    }

    #[tokio::test]
    async fn test_update_mem_wal_state_conflict_lower_generation_no_retry() {
        let region = Uuid::new_v4();

        // Generation 10 is committed first; the stale writer then offers 5.
        let result = commit_then_stale_update(
            MergedGeneration::new(region, 10),
            MergedGeneration::new(region, 5),
        )
        .await;

        assert!(
            matches!(result, Err(crate::Error::IncompatibleTransaction { .. })),
            "Expected non-retryable IncompatibleTransaction for lower generation, got {:?}",
            result
        );
    }

    #[tokio::test]
    async fn test_update_mem_wal_state_conflict_equal_generation_no_retry() {
        let region = Uuid::new_v4();

        // Both writers offer generation 10 for the same region.
        let result = commit_then_stale_update(
            MergedGeneration::new(region, 10),
            MergedGeneration::new(region, 10),
        )
        .await;

        assert!(
            matches!(result, Err(crate::Error::IncompatibleTransaction { .. })),
            "Expected non-retryable IncompatibleTransaction for equal generation, got {:?}",
            result
        );
    }

    #[tokio::test]
    async fn test_update_mem_wal_state_conflict_higher_generation_retryable() {
        let region = Uuid::new_v4();

        // Generation 5 is committed first; the stale writer then offers 10.
        let result = commit_then_stale_update(
            MergedGeneration::new(region, 5),
            MergedGeneration::new(region, 10),
        )
        .await;

        assert!(
            matches!(result, Err(crate::Error::RetryableCommitConflict { .. })),
            "Expected retryable conflict for higher generation, got {:?}",
            result
        );
    }

    #[tokio::test]
    async fn test_update_mem_wal_state_different_regions_no_conflict() {
        let region1 = Uuid::new_v4();
        let region2 = Uuid::new_v4();

        // The two writers touch distinct regions.
        let result = commit_then_stale_update(
            MergedGeneration::new(region1, 10),
            MergedGeneration::new(region2, 5),
        )
        .await;

        assert!(
            result.is_ok(),
            "Expected success for different regions, got {:?}",
            result
        );

        let dataset = result.unwrap();
        let details = stored_details(&dataset).await;
        assert_eq!(details.merged_generations.len(), 2);
    }

    #[tokio::test]
    async fn test_create_index_rebase_against_update_mem_wal_state() {
        let dataset = test_dataset().await;
        let region = Uuid::new_v4();

        let update = update_state_txn(
            dataset.manifest.version,
            MergedGeneration::new(region, 10),
        );
        let dataset = commit(dataset, update).await.unwrap();

        // The CreateIndex transaction reads from one version behind the update.
        let stale_version = dataset.manifest.version - 1;
        let mem_wal_index = new_mem_wal_index_meta(
            stale_version,
            MemWalIndexDetails {
                num_regions: 1,
                ..Default::default()
            },
        )
        .unwrap();
        let create = create_index_txn(stale_version, mem_wal_index);

        let result = commit(dataset, create).await;
        assert!(
            result.is_ok(),
            "Expected CreateIndex to succeed with rebase, got {:?}",
            result
        );

        // Both the committed generation and the CreateIndex settings are expected.
        let dataset = result.unwrap();
        let details = stored_details(&dataset).await;
        assert_eq!(details.merged_generations.len(), 1);
        assert_eq!(details.merged_generations[0].region_id, region);
        assert_eq!(details.merged_generations[0].generation, 10);
        assert_eq!(details.num_regions, 1);
    }

    #[tokio::test]
    async fn test_update_mem_wal_state_against_create_index_lower_generation() {
        let dataset = test_dataset().await;
        let region = Uuid::new_v4();

        // First commit: a CreateIndex whose details already record generation 10.
        let base_version = dataset.manifest.version;
        let mem_wal_index = new_mem_wal_index_meta(
            base_version,
            MemWalIndexDetails {
                merged_generations: vec![MergedGeneration::new(region, 10)],
                ..Default::default()
            },
        )
        .unwrap();
        let create = create_index_txn(base_version, mem_wal_index);
        let dataset = commit(dataset, create).await.unwrap();

        // Second commit: a stale update offering the lower generation 5.
        let update = update_state_txn(
            dataset.manifest.version - 1,
            MergedGeneration::new(region, 5),
        );
        let result = commit(dataset, update).await;

        assert!(
            matches!(result, Err(crate::Error::IncompatibleTransaction { .. })),
            "Expected non-retryable IncompatibleTransaction when UpdateMemWalState generation is lower than CreateIndex, got {:?}",
            result
        );
    }

    #[test]
    fn test_update_merged_generations() {
        let mut indices: Vec<IndexMetadata> = Vec::new();
        let region1 = Uuid::new_v4();
        let region2 = Uuid::new_v4();

        // No index yet: one is created with the given generation.
        update_mem_wal_index_merged_generations(
            &mut indices,
            1,
            vec![MergedGeneration::new(region1, 5)],
        )
        .unwrap();
        assert_eq!(indices.len(), 1);
        let details = first_details(&indices);
        assert_eq!(details.merged_generations.len(), 1);
        assert_eq!(details.merged_generations[0].region_id, region1);
        assert_eq!(details.merged_generations[0].generation, 5);

        // A higher generation for the same region replaces the recorded one.
        update_mem_wal_index_merged_generations(
            &mut indices,
            2,
            vec![MergedGeneration::new(region1, 10)],
        )
        .unwrap();
        assert_eq!(indices.len(), 1);
        let details = first_details(&indices);
        assert_eq!(details.merged_generations.len(), 1);
        assert_eq!(details.merged_generations[0].generation, 10);

        // A new region is added next to the existing one.
        update_mem_wal_index_merged_generations(
            &mut indices,
            3,
            vec![MergedGeneration::new(region2, 3)],
        )
        .unwrap();
        assert_eq!(indices.len(), 1);
        let details = first_details(&indices);
        assert_eq!(details.merged_generations.len(), 2);

        // A lower generation must not overwrite the recorded one.
        update_mem_wal_index_merged_generations(
            &mut indices,
            4,
            vec![MergedGeneration::new(region1, 8)],
        )
        .unwrap();
        let details = first_details(&indices);
        let region1_entry = details
            .merged_generations
            .iter()
            .find(|mg| mg.region_id == region1)
            .unwrap();
        assert_eq!(region1_entry.generation, 10);
    }

    #[test]
    fn test_empty_merged_generations_noop() {
        let mut indices: Vec<IndexMetadata> = Vec::new();
        update_mem_wal_index_merged_generations(&mut indices, 1, vec![]).unwrap();
        assert!(indices.is_empty());
    }
}