use anyhow::{Context, Result};
use lance::Dataset;
/// Opens the Lance dataset located at `uri`.
///
/// # Errors
/// Returns the Lance open error wrapped with the dataset URI as context.
pub async fn open_dataset(uri: &str) -> Result<Dataset> {
    let opened = Dataset::open(uri).await;
    opened.with_context(|| format!("open lance dataset at {uri}"))
}
/// Opens the dataset at `uri` and checks out `branch`.
///
/// # Errors
/// Fails if the dataset cannot be opened, or if the checkout fails (the
/// checkout error is annotated with the branch name and URI).
pub async fn open_branch(uri: &str, branch: &str) -> Result<Dataset> {
    let root = open_dataset(uri).await?;
    let checked_out = root.checkout_branch(branch).await;
    checked_out.with_context(|| format!("checkout branch {branch} on {uri}"))
}
/// Returns the version number the dataset at `uri` opens at.
///
/// # Errors
/// Propagates any failure from [`open_dataset`].
pub async fn current_version(uri: &str) -> Result<u64> {
    open_dataset(uri)
        .await
        .map(|root| root.version().version)
}
/// Creates `branch` on the dataset at `uri`, forked at `parent_version`.
///
/// The fault-injection hook (`UNI_FORK_INJECT_FAIL_AFTER`) is consulted
/// before the dataset is opened, so an injected failure performs no I/O.
///
/// # Errors
/// Fails on an injected fault, if the dataset cannot be opened, or if Lance
/// refuses to create the branch (e.g. the tests show a second create of the
/// same name fails).
pub async fn create_branch(uri: &str, branch: &str, parent_version: u64) -> Result<()> {
    inject_fault_create_branch()?;

    let mut root = open_dataset(uri).await?;
    let created = root.create_branch(branch, parent_version, None).await;
    created.with_context(|| format!("create branch {branch} on {uri} at v{parent_version}"))?;

    Ok(())
}
/// Returns the version number at the tip of `branch` on the dataset at `uri`.
///
/// # Errors
/// Propagates any failure from [`open_branch`].
pub async fn current_version_on_branch(uri: &str, branch: &str) -> Result<u64> {
    let tip = open_branch(uri, branch).await?;
    let version = tip.version().version;
    Ok(version)
}
/// Creates `new_branch` by forking `parent_branch` at `parent_version`.
///
/// The branch is created from a dataset handle checked out on
/// `parent_branch`. `parent_version` is presumably a version number on that
/// branch (the tests pass a value read via `current_version_on_branch`) —
/// TODO confirm against Lance's branch semantics.
///
/// # Errors
/// Fails on an injected fault (`UNI_FORK_INJECT_FAIL_AFTER`), if the parent
/// branch cannot be opened, or if Lance refuses to create the new branch.
pub async fn create_branch_from(
    uri: &str,
    new_branch: &str,
    parent_branch: &str,
    parent_version: u64,
) -> Result<()> {
    inject_fault_create_branch()?;

    let mut parent = open_branch(uri, parent_branch).await?;
    let created = parent.create_branch(new_branch, parent_version, None).await;
    created.with_context(|| {
        format!("create branch {new_branch} off {parent_branch} on {uri} at v{parent_version}")
    })?;

    Ok(())
}
/// Test-only counters backing the branch create/delete fault-injection hooks.
///
/// Every create-branch and delete-branch fault check bumps its counter,
/// whether or not a failure threshold is configured. Tests can therefore
/// reset the counters and then observe how many calls were made.
#[doc(hidden)]
pub mod fault_injection {
    use std::sync::atomic::{AtomicI64, Ordering::SeqCst};

    /// Number of create-branch fault checks since the last [`reset`].
    pub(super) static CALL_COUNT: AtomicI64 = AtomicI64::new(0);
    /// Number of delete-branch fault checks since the last [`reset_delete`].
    pub(super) static DELETE_CALL_COUNT: AtomicI64 = AtomicI64::new(0);

    /// Zeroes the create-branch counter (the delete counter is untouched).
    pub fn reset() {
        CALL_COUNT.store(0, SeqCst)
    }

    /// Current value of the create-branch counter.
    pub fn calls_so_far() -> i64 {
        CALL_COUNT.load(SeqCst)
    }

    /// Zeroes the delete-branch counter (the create counter is untouched).
    pub fn reset_delete() {
        DELETE_CALL_COUNT.store(0, SeqCst)
    }

    /// Current value of the delete-branch counter.
    pub fn delete_calls_so_far() -> i64 {
        DELETE_CALL_COUNT.load(SeqCst)
    }
}
/// Shared fault-injection check behind the create/delete branch hooks.
///
/// Always increments `counter`, so tests can count calls even when no fault
/// is configured. It then reads `env_var`. If that variable holds an `i64`
/// threshold and the pre-increment count is `>= threshold`, an error is
/// returned. An unset, non-UTF-8 or unparsable variable makes this a no-op.
fn inject_fault_after_threshold(
    counter: &std::sync::atomic::AtomicI64,
    env_var: &str,
) -> Result<()> {
    let cur = counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    // `env::var` yields Err for both "not present" and "not unicode", matching
    // the previous `var_os` + `into_string` handling: both mean "no fault".
    let Some(threshold) = std::env::var(env_var)
        .ok()
        .and_then(|raw| raw.parse::<i64>().ok())
    else {
        return Ok(());
    };
    if cur >= threshold {
        anyhow::bail!("{env_var} triggered at call #{cur} (threshold {threshold})");
    }
    Ok(())
}

/// Fault hook for branch creation. It is controlled by
/// `UNI_FORK_INJECT_FAIL_AFTER` and counted in `fault_injection::CALL_COUNT`.
fn inject_fault_create_branch() -> Result<()> {
    inject_fault_after_threshold(&fault_injection::CALL_COUNT, "UNI_FORK_INJECT_FAIL_AFTER")
}

/// Fault hook for branch deletion. It is controlled by
/// `UNI_FORK_INJECT_FAIL_DELETE_AFTER` and counted in
/// `fault_injection::DELETE_CALL_COUNT`.
fn inject_fault_delete_branch() -> Result<()> {
    inject_fault_after_threshold(
        &fault_injection::DELETE_CALL_COUNT,
        "UNI_FORK_INJECT_FAIL_DELETE_AFTER",
    )
}
/// Builds a scalar index named `index_name` on `column` of `branch`.
///
/// Uses default `ScalarIndexParams`. Because the builder is called with
/// `replace(true)`, an existing index of the same name is replaced.
///
/// # Errors
/// Fails if the branch cannot be opened or the index build fails.
pub async fn create_scalar_index_on_branch(
    uri: &str,
    branch: &str,
    column: &str,
    index_name: &str,
) -> Result<()> {
    use lance::index::DatasetIndexExt;
    use lance_index::{IndexType, scalar::ScalarIndexParams};

    let params = ScalarIndexParams::default();
    let mut dataset = open_branch(uri, branch).await?;
    dataset
        .create_index_builder(&[column], IndexType::Scalar, &params)
        .name(index_name.to_string())
        .replace(true)
        .await
        .with_context(|| format!("create_scalar_index_on_branch({uri}@{branch}, column={column})"))?;
    Ok(())
}
/// Returns the `k` rows of `branch` nearest to `query` in vector `column`.
///
/// Uses Lance's `Scanner::nearest` and returns the matching rows as Arrow
/// record batches.
///
/// # Errors
/// Fails if the branch cannot be opened, or if query setup, stream creation
/// or stream collection fails. Each message is prefixed with
/// `vector_search_on_branch`.
pub async fn vector_search_on_branch(
    uri: &str,
    branch: &str,
    column: &str,
    query: &[f32],
    k: usize,
) -> Result<Vec<arrow_array::RecordBatch>> {
    use arrow_array::Float32Array;
    use futures::TryStreamExt;

    let dataset = open_branch(uri, branch).await?;
    let needle = Float32Array::from(Vec::from(query));

    let mut scan = dataset.scan();
    scan.nearest(column, &needle, k)
        .map_err(|e| anyhow::anyhow!("vector_search_on_branch nearest({column}, k={k}): {e}"))?;

    let batch_stream = scan
        .try_into_stream()
        .await
        .map_err(|e| anyhow::anyhow!("vector_search_on_branch stream: {e}"))?;
    let collected: std::result::Result<Vec<_>, _> = batch_stream.try_collect().await;
    collected.map_err(|e| anyhow::anyhow!("vector_search_on_branch collect: {e}"))
}
/// Runs a full-text match of `query` against `column` on `branch`.
///
/// Returns at most `k` hits as Arrow record batches. The column presumably
/// needs an inverted (FTS) index, such as one built by
/// `create_fts_index_on_branch` — TODO confirm whether Lance can search
/// without one.
///
/// # Errors
/// Fails if the branch cannot be opened, or if query setup, stream creation
/// or stream collection fails. Each message is prefixed with
/// `full_text_search_on_branch`.
pub async fn full_text_search_on_branch(
    uri: &str,
    branch: &str,
    column: &str,
    query: &str,
    k: usize,
) -> Result<Vec<arrow_array::RecordBatch>> {
    use futures::TryStreamExt;
    use lance_index::scalar::FullTextSearchQuery;
    use lance_index::scalar::inverted::query::MatchQuery;
    let on_branch = open_branch(uri, branch).await?;
    let match_query = MatchQuery::new(query.to_string()).with_column(Some(column.to_string()));
    // `k as i64` would wrap to a negative limit for k > i64::MAX. Saturate
    // instead, so an oversized k still means "as many as possible".
    let limit = i64::try_from(k).unwrap_or(i64::MAX);
    let fts_query = FullTextSearchQuery {
        query: match_query.into(),
        limit: Some(limit),
        wand_factor: None,
    };
    let mut scanner = on_branch.scan();
    scanner
        .full_text_search(fts_query)
        .map_err(|e| anyhow::anyhow!("full_text_search_on_branch({column}, k={k}): {e}"))?;
    let stream = scanner
        .try_into_stream()
        .await
        .map_err(|e| anyhow::anyhow!("full_text_search_on_branch stream: {e}"))?;
    stream
        .try_collect::<Vec<_>>()
        .await
        .map_err(|e| anyhow::anyhow!("full_text_search_on_branch collect: {e}"))
}
/// Builds a vector index named `index_name` on `column` of `branch`.
///
/// The index parameters are hard-coded to IVF_FLAT with a single partition
/// and the L2 distance metric. The final `true` argument to `create_index` is
/// presumably the "replace existing" flag — TODO confirm against the
/// `DatasetIndexExt::create_index` signature.
///
/// # Errors
/// Fails if the branch cannot be opened or the index build fails.
pub async fn create_vector_index_on_branch(
    uri: &str,
    branch: &str,
    column: &str,
    index_name: &str,
) -> Result<()> {
    use lance::index::DatasetIndexExt;
    use lance::index::vector::VectorIndexParams;
    use lance_index::IndexType;
    use lance_linalg::distance::MetricType;
    let mut on_branch = open_branch(uri, branch).await?;
    let params = VectorIndexParams::ivf_flat(1, MetricType::L2);
    on_branch
        .create_index(
            &[column],
            IndexType::Vector,
            Some(index_name.to_string()),
            &params,
            true,
        )
        .await
        .with_context(|| {
            format!("create_vector_index_on_branch({uri}@{branch}, column={column})")
        })?;
    Ok(())
}
/// Builds an inverted (full-text) index named `index_name` on `column`.
///
/// The index is built on `branch` with default `InvertedIndexParams`. An
/// existing index of the same name is replaced (`replace(true)`).
///
/// # Errors
/// Fails if the branch cannot be opened or the index build fails.
pub async fn create_fts_index_on_branch(
    uri: &str,
    branch: &str,
    column: &str,
    index_name: &str,
) -> Result<()> {
    use lance::index::DatasetIndexExt;
    use lance_index::{IndexType, scalar::InvertedIndexParams};

    let inverted = InvertedIndexParams::default();
    let mut dataset = open_branch(uri, branch).await?;
    dataset
        .create_index_builder(&[column], IndexType::Inverted, &inverted)
        .name(index_name.to_string())
        .replace(true)
        .await
        .with_context(|| format!("create_fts_index_on_branch({uri}@{branch}, column={column})"))?;
    Ok(())
}
/// Tags the current tip of `branch` under the name `tag`.
///
/// The tip version is resolved once, at call time, and passed to Lance as
/// `(branch, version)`. Later writes to the branch therefore do not move the
/// tag.
///
/// # Errors
/// Fails if the branch or dataset cannot be opened, or tag creation fails.
pub async fn create_tag(uri: &str, tag: &str, branch: &str) -> Result<()> {
    let version = open_branch(uri, branch).await?.version().version;
    let root = open_dataset(uri).await?;
    root.tags()
        .create(tag, (branch, version))
        .await
        .with_context(|| format!("create tag {tag} on {uri} -> {branch}@v{version}"))?;
    Ok(())
}
/// Deletes `tag` from the dataset at `uri`. A missing tag is a no-op.
///
/// # Errors
/// Fails if the dataset cannot be opened, or if listing or deleting tags
/// fails.
pub async fn delete_tag(uri: &str, tag: &str) -> Result<()> {
    let root = open_dataset(uri).await?;
    let known_tags = root
        .tags()
        .list()
        .await
        .with_context(|| format!("list tags on {uri}"))?;

    if known_tags.contains_key(tag) {
        root.tags()
            .delete(tag)
            .await
            .with_context(|| format!("delete tag {tag} on {uri}"))
    } else {
        // An absent tag is treated as already deleted.
        Ok(())
    }
}
/// Lists every tag on the dataset at `uri` as `(name, pinned_version)` pairs.
///
/// Order follows Lance's tag map iteration and should be treated as
/// unspecified.
///
/// # Errors
/// Fails if the dataset cannot be opened or tags cannot be listed.
pub async fn list_tags(uri: &str) -> Result<Vec<(String, u64)>> {
    let root = open_dataset(uri).await?;
    let tag_map = root
        .tags()
        .list()
        .await
        .with_context(|| format!("list tags on {uri}"))?;

    let mut pairs = Vec::with_capacity(tag_map.len());
    for (name, contents) in tag_map {
        pairs.push((name, contents.version));
    }
    Ok(pairs)
}
/// Force-deletes `branch` from the dataset at `uri`.
///
/// Deletion proceeds in two cases: the branch is registered, or a leftover
/// ("zombie") dataset still opens at `{uri}/tree/{branch}`. In that second
/// case force-delete is still issued to clean it up. If neither holds, this
/// returns `Ok(())` without doing anything.
///
/// # Errors
/// Fails on an injected fault (`UNI_FORK_INJECT_FAIL_DELETE_AFTER`), if the
/// dataset cannot be opened or its branches listed, or if force-delete fails.
pub async fn delete_branch(uri: &str, branch: &str) -> Result<()> {
    inject_fault_delete_branch()?;

    let mut root = open_dataset(uri).await?;
    let known = root
        .list_branches()
        .await
        .with_context(|| format!("list branches on {uri}"))?;

    let registered = known.contains_key(branch);
    if !registered {
        let zombie_uri = format!("{uri}/tree/{branch}");
        let zombie_present = Dataset::open(&zombie_uri).await.is_ok();
        if !zombie_present {
            return Ok(());
        }
    }

    root.force_delete_branch(branch)
        .await
        .with_context(|| format!("force-delete branch {branch} on {uri}"))?;
    Ok(())
}
/// Lists the names of all branches registered on the dataset at `uri`.
///
/// Order follows Lance's branch map iteration and should be treated as
/// unspecified.
///
/// # Errors
/// Fails if the dataset cannot be opened or branches cannot be listed.
pub async fn list_branches(uri: &str) -> Result<Vec<String>> {
    let root = open_dataset(uri).await?;
    root.list_branches()
        .await
        .with_context(|| format!("list branches on {uri}"))
        .map(|registry| registry.into_keys().collect())
}
/// Appends `batches` to the tip of `branch` on the dataset at `uri`.
///
/// # Errors
/// Fails if the branch cannot be opened or the append fails. Both errors
/// carry the branch and URI as context.
pub async fn write_to_branch<R>(uri: &str, branch: &str, batches: R) -> Result<()>
where
    R: arrow_array::RecordBatchReader + Send + 'static,
{
    let opened = open_branch(uri, branch).await;
    let mut target =
        opened.with_context(|| format!("open branch {branch} on {uri} for append"))?;

    let appended = target.append(batches, None).await;
    appended.with_context(|| format!("append to branch {branch} on {uri}"))?;
    Ok(())
}
/// Deletes the rows of `branch` that match `predicate`.
///
/// `predicate` is handed to Lance's `delete` verbatim as a filter expression
/// (e.g. `"true"` matches every row). Do not build it from untrusted input
/// without escaping.
///
/// # Errors
/// Fails if the branch cannot be opened or the delete fails.
pub async fn delete_from_branch(uri: &str, branch: &str, predicate: &str) -> Result<()> {
    let opened = open_branch(uri, branch).await;
    let mut target =
        opened.with_context(|| format!("open branch {branch} on {uri} for delete"))?;

    let outcome = target.delete(predicate).await;
    outcome.with_context(|| {
        format!("delete on branch {branch} on {uri} with predicate `{predicate}`")
    })?;
    Ok(())
}
/// Replaces the contents of `branch` with `batches`.
///
/// This runs two separate Lance operations: delete every row
/// (`delete("true")`), then append. It is therefore not atomic. If the
/// append fails, the delete has presumably already been committed and the
/// branch is left empty. NOTE(review): consider a single overwrite commit.
///
/// # Errors
/// Fails if the branch cannot be opened, or if the delete or the append
/// fails.
pub async fn replace_branch_tip<R>(uri: &str, branch: &str, batches: R) -> Result<()>
where
    R: arrow_array::RecordBatchReader + Send + 'static,
{
    let opened = open_branch(uri, branch).await;
    let mut tip = opened.with_context(|| format!("open branch {branch} on {uri} for replace"))?;

    // Step 1: clear every existing row on the branch.
    tip.delete("true")
        .await
        .with_context(|| format!("delete-all on branch {branch} on {uri}"))?;

    // Step 2: write the replacement rows.
    tip.append(batches, None)
        .await
        .with_context(|| format!("append on branch {branch} on {uri} after delete-all"))?;

    Ok(())
}
/// Creates a new Lance dataset at `uri` from `initial_batches`, then forks
/// `branch` from exactly the version that write produced.
///
/// # Errors
/// Fails if the dataset cannot be written (whether an existing dataset at
/// `uri` is an error depends on Lance's default write mode — TODO confirm),
/// or if the branch cannot be created (including an injected fault).
pub async fn create_dataset_then_branch<R>(
    uri: &str,
    branch: &str,
    initial_batches: R,
) -> Result<()>
where
    R: arrow_array::RecordBatchReader + Send + 'static,
{
    let created = Dataset::write(initial_batches, uri, None)
        .await
        .with_context(|| format!("create lance dataset at {uri}"))?;
    // Take the version from the dataset handle the write returned rather than
    // re-opening and reading "latest". A re-open costs another round-trip and
    // could observe a commit from a concurrent writer made after ours, which
    // would fork the branch from the wrong version.
    let parent_v = created.version().version;
    create_branch(uri, branch, parent_v)
        .await
        .with_context(|| format!("branch {branch} off newly-created dataset at {uri}"))?;
    Ok(())
}
/// Integration tests for the branch/tag helpers. They run against real Lance
/// datasets created in temporary directories.
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, RecordBatch, UInt64Array};
    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
    use std::sync::Arc;
    use tempfile::TempDir;

    /// Schema shared by most tests: non-nullable `id: UInt64`, `value: Int64`.
    fn test_schema() -> Arc<ArrowSchema> {
        Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::UInt64, false),
            Field::new("value", DataType::Int64, false),
        ]))
    }

    /// Builds a batch matching `test_schema`. Panics (via `unwrap`) if `ids`
    /// and `values` have different lengths.
    fn test_batch(ids: Vec<u64>, values: Vec<i64>) -> RecordBatch {
        RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(UInt64Array::from(ids)),
                Arc::new(Int64Array::from(values)),
            ],
        )
        .unwrap()
    }

    /// Writes a fresh 3-row dataset (ids 1, 2, 3) to `<tmp>/ds.lance`.
    ///
    /// The `TempDir` is returned so the caller keeps the directory alive for
    /// the duration of the test (it is removed when dropped).
    async fn seed_dataset() -> (TempDir, String) {
        let dir = TempDir::new().unwrap();
        let uri = format!("{}/ds.lance", dir.path().display());
        let batch = test_batch(vec![1, 2, 3], vec![100, 200, 300]);
        let reader =
            arrow_array::RecordBatchIterator::new(vec![Ok(batch)].into_iter(), test_schema());
        Dataset::write(reader, &uri, None).await.unwrap();
        (dir, uri)
    }

    /// A freshly written dataset reports a version of at least 1.
    #[tokio::test]
    async fn current_version_returns_positive_value() {
        let (_dir, uri) = seed_dataset().await;
        let v = current_version(&uri).await.unwrap();
        assert!(v >= 1, "fresh dataset should have version >= 1, got {v}");
    }

    /// Create → list → read → delete → list round trip for a single branch.
    #[tokio::test]
    async fn create_open_list_delete_roundtrip() {
        let (_dir, uri) = seed_dataset().await;
        let v = current_version(&uri).await.unwrap();
        create_branch(&uri, "fork-a", v).await.unwrap();
        let branches = list_branches(&uri).await.unwrap();
        assert!(
            branches.iter().any(|b| b == "fork-a"),
            "expected fork-a in {branches:?}"
        );
        let branched = open_branch(&uri, "fork-a").await.unwrap();
        let count = branched.count_rows(None).await.unwrap();
        assert_eq!(count, 3, "branch reads should chain to parent");
        delete_branch(&uri, "fork-a").await.unwrap();
        let branches = list_branches(&uri).await.unwrap();
        assert!(!branches.iter().any(|b| b == "fork-a"));
    }

    /// Creating the same branch twice fails; it can be re-created after a
    /// delete.
    #[tokio::test]
    async fn create_branch_not_idempotent_at_same_version() {
        let (_dir, uri) = seed_dataset().await;
        let v = current_version(&uri).await.unwrap();
        create_branch(&uri, "fork-x", v).await.unwrap();
        let second = create_branch(&uri, "fork-x", v).await;
        assert!(second.is_err(), "second create_branch should fail");
        delete_branch(&uri, "fork-x").await.unwrap();
        create_branch(&uri, "fork-x", v).await.unwrap();
    }

    /// Deleting a branch that never existed succeeds as a no-op.
    #[tokio::test]
    async fn delete_missing_branch_is_force_safe() {
        let (_dir, uri) = seed_dataset().await;
        delete_branch(&uri, "never-existed").await.unwrap();
    }

    /// Appending to a branch advances its tip without moving main.
    #[tokio::test]
    async fn current_version_on_branch_tracks_branch_tip() {
        let (_dir, uri) = seed_dataset().await;
        let v_main = current_version(&uri).await.unwrap();
        create_branch(&uri, "child", v_main).await.unwrap();
        let v_branch_initial = current_version_on_branch(&uri, "child").await.unwrap();
        assert!(v_branch_initial >= v_main);
        let batch = test_batch(vec![10, 11], vec![1000, 1100]);
        let reader =
            arrow_array::RecordBatchIterator::new(vec![Ok(batch)].into_iter(), test_schema());
        write_to_branch(&uri, "child", reader).await.unwrap();
        let v_branch_after = current_version_on_branch(&uri, "child").await.unwrap();
        let v_main_after = current_version(&uri).await.unwrap();
        assert!(
            v_branch_after > v_branch_initial,
            "branch tip should advance after append"
        );
        assert_eq!(
            v_main_after, v_main,
            "main version must not move when branch is written"
        );
    }

    /// A grandchild branch sees its parent's rows up to the fork point, but
    /// not writes made to the parent afterwards.
    #[tokio::test]
    async fn create_branch_from_chains_through_parent() {
        let (_dir, uri) = seed_dataset().await;
        let v_main = current_version(&uri).await.unwrap();
        create_branch(&uri, "level1", v_main).await.unwrap();
        let batch = test_batch(vec![10, 11], vec![1000, 1100]);
        let reader =
            arrow_array::RecordBatchIterator::new(vec![Ok(batch)].into_iter(), test_schema());
        write_to_branch(&uri, "level1", reader).await.unwrap();
        let v_l1 = current_version_on_branch(&uri, "level1").await.unwrap();
        create_branch_from(&uri, "level2", "level1", v_l1)
            .await
            .unwrap();
        let on_l2 = open_branch(&uri, "level2").await.unwrap();
        assert_eq!(on_l2.count_rows(None).await.unwrap(), 5);
        let batch2 = test_batch(vec![20], vec![2000]);
        let reader2 =
            arrow_array::RecordBatchIterator::new(vec![Ok(batch2)].into_iter(), test_schema());
        write_to_branch(&uri, "level1", reader2).await.unwrap();
        let on_l2_again = open_branch(&uri, "level2").await.unwrap();
        assert_eq!(
            on_l2_again.count_rows(None).await.unwrap(),
            5,
            "level2 must not see writes to level1 that happened after its creation"
        );
    }

    /// Tag create → list → delete round trip; a second delete is a no-op.
    #[tokio::test]
    async fn create_list_delete_tag_roundtrip() {
        let (_dir, uri) = seed_dataset().await;
        let v = current_version(&uri).await.unwrap();
        create_branch(&uri, "to-tag", v).await.unwrap();
        create_tag(&uri, "audit-2026", "to-tag").await.unwrap();
        let tags = list_tags(&uri).await.unwrap();
        assert!(
            tags.iter().any(|(n, _)| n == "audit-2026"),
            "tags = {tags:?}"
        );
        delete_tag(&uri, "audit-2026").await.unwrap();
        let tags2 = list_tags(&uri).await.unwrap();
        assert!(!tags2.iter().any(|(n, _)| n == "audit-2026"));
        delete_tag(&uri, "audit-2026").await.unwrap();
    }

    /// A tag keeps the version it was created at, even after the branch
    /// advances.
    #[tokio::test]
    async fn create_tag_pins_version_at_call_time() {
        let (_dir, uri) = seed_dataset().await;
        let v = current_version(&uri).await.unwrap();
        create_branch(&uri, "snap-branch", v).await.unwrap();
        let batch = test_batch(vec![10], vec![1000]);
        let reader =
            arrow_array::RecordBatchIterator::new(vec![Ok(batch)].into_iter(), test_schema());
        write_to_branch(&uri, "snap-branch", reader).await.unwrap();
        create_tag(&uri, "v1", "snap-branch").await.unwrap();
        let tags = list_tags(&uri).await.unwrap();
        let (_, pinned_v) = tags.iter().find(|(n, _)| n == "v1").unwrap();
        let batch2 = test_batch(vec![11], vec![1100]);
        let reader2 =
            arrow_array::RecordBatchIterator::new(vec![Ok(batch2)].into_iter(), test_schema());
        write_to_branch(&uri, "snap-branch", reader2).await.unwrap();
        let tags_after = list_tags(&uri).await.unwrap();
        let (_, pinned_after) = tags_after.iter().find(|(n, _)| n == "v1").unwrap();
        assert_eq!(
            pinned_v, pinned_after,
            "tag must pin to fork-time version, not follow branch tip"
        );
    }

    /// Writes to main after a fork are invisible on the branch.
    #[tokio::test]
    async fn parent_writes_after_branch_invisible_to_branch() {
        let (_dir, uri) = seed_dataset().await;
        let v = current_version(&uri).await.unwrap();
        create_branch(&uri, "snap", v).await.unwrap();
        let batch = test_batch(vec![4, 5], vec![400, 500]);
        let reader =
            arrow_array::RecordBatchIterator::new(vec![Ok(batch)].into_iter(), test_schema());
        let mut primary = Dataset::open(&uri).await.unwrap();
        primary.append(reader, None).await.unwrap();
        let branched = open_branch(&uri, "snap").await.unwrap();
        let count = branched.count_rows(None).await.unwrap();
        assert_eq!(count, 3, "branch must not see post-fork primary writes");
        let primary = Dataset::open(&uri).await.unwrap();
        assert_eq!(primary.count_rows(None).await.unwrap(), 5);
    }

    /// Exploratory spike (ignored by default). It reports, via stderr, how
    /// Lance vector search and vector-index builds behave on a branch. It
    /// asserts nothing beyond the setup steps succeeding.
    #[tokio::test]
    #[ignore = "phase-5b spike: documents Lance per-branch vector index behavior; run with --run-ignored ignored-only"]
    async fn phase5b_spike_per_branch_vector() {
        use arrow_array::{Float32Array, RecordBatch as Batch, UInt64Array};
        use arrow_schema::{DataType, Field, Schema as ArrowSchema};
        use lance::index::DatasetIndexExt;
        use lance::index::vector::VectorIndexParams;
        use lance_index::IndexType;
        use lance_linalg::distance::MetricType;
        let dir = TempDir::new().unwrap();
        let uri = format!("{}/vec_ds.lance", dir.path().display());
        let vec_field = Field::new("item", DataType::Float32, false);
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::UInt64, false),
            Field::new(
                "vector",
                DataType::FixedSizeList(Arc::new(vec_field.clone()), 4),
                false,
            ),
        ]));
        let make_batch = |ids: Vec<u64>, vecs: Vec<[f32; 4]>| -> Batch {
            let flat: Vec<f32> = vecs.into_iter().flatten().collect();
            let values = Float32Array::from(flat);
            let list = arrow_array::FixedSizeListArray::new(
                Arc::new(vec_field.clone()),
                4,
                Arc::new(values),
                None,
            );
            Batch::try_new(
                schema.clone(),
                vec![Arc::new(UInt64Array::from(ids)), Arc::new(list)],
            )
            .unwrap()
        };
        let primary_batch = {
            let ids: Vec<u64> = (0..100).collect();
            let vecs: Vec<[f32; 4]> = (0..100)
                .map(|i| [(i as f32) * 0.001, 0.0, 0.0, 0.0])
                .collect();
            make_batch(ids, vecs)
        };
        let reader = arrow_array::RecordBatchIterator::new(
            vec![Ok(primary_batch)].into_iter(),
            schema.clone(),
        );
        Dataset::write(reader, &uri, None).await.unwrap();
        let mut main_ds = Dataset::open(&uri).await.unwrap();
        let params = VectorIndexParams::ivf_flat(1, MetricType::L2);
        main_ds
            .create_index(
                &["vector"],
                IndexType::Vector,
                Some("primary_vec".into()),
                &params,
                true,
            )
            .await
            .unwrap();
        let v_main = current_version(&uri).await.unwrap();
        create_branch(&uri, "fork-vec", v_main).await.unwrap();
        let fork_batch = {
            let ids: Vec<u64> = (1000..1005).collect();
            let vecs: Vec<[f32; 4]> = (0..5)
                .map(|i| [100.0 + (i as f32), 0.0, 0.0, 0.0])
                .collect();
            make_batch(ids, vecs)
        };
        let reader2 =
            arrow_array::RecordBatchIterator::new(vec![Ok(fork_batch)].into_iter(), schema.clone());
        write_to_branch(&uri, "fork-vec", reader2).await.unwrap();
        let on_branch = open_branch(&uri, "fork-vec").await.unwrap();
        let query = Float32Array::from(vec![100.5_f32, 0.0, 0.0, 0.0]);
        let mut scanner = on_branch.scan();
        scanner.nearest("vector", &query, 5).unwrap();
        let stream = scanner.try_into_stream().await.unwrap();
        let batches = futures::TryStreamExt::try_collect::<Vec<_>>(stream)
            .await
            .unwrap();
        let total: usize = batches.iter().map(|b| b.num_rows()).sum();
        let mut ids: Vec<u64> = Vec::new();
        for b in &batches {
            let id_col = b
                .column_by_name("id")
                .unwrap()
                .as_any()
                .downcast_ref::<UInt64Array>()
                .unwrap();
            for i in 0..b.num_rows() {
                ids.push(id_col.value(i));
            }
        }
        eprintln!("SPIKE: branch nearest-5 to [100.5,0,0,0]: {total} rows, ids={ids:?}");
        let saw_fork_id = ids.iter().any(|i| *i >= 1000);
        let saw_primary_id = ids.iter().any(|i| *i < 1000);
        eprintln!(
            "SPIKE VERDICT: branch sees fork rows={saw_fork_id} parent rows={saw_primary_id}"
        );
        let mut on_branch_mut = open_branch(&uri, "fork-vec").await.unwrap();
        let result = on_branch_mut
            .create_index(
                &["vector"],
                IndexType::Vector,
                Some("fork_vec".into()),
                &params,
                true,
            )
            .await;
        match result {
            Ok(_) => {
                let main_after = Dataset::open(&uri).await.unwrap();
                let main_indices = main_after.load_indices().await.unwrap();
                let branch_after = open_branch(&uri, "fork-vec").await.unwrap();
                let branch_indices = branch_after.load_indices().await.unwrap();
                let main_has_fork = main_indices
                    .iter()
                    .any(|i: &lance::table::format::IndexMetadata| i.name == "fork_vec");
                let branch_has_fork = branch_indices
                    .iter()
                    .any(|i: &lance::table::format::IndexMetadata| i.name == "fork_vec");
                eprintln!(
                    "SPIKE: vector index branch-local? {} leaked-to-main? {}",
                    branch_has_fork && !main_has_fork,
                    main_has_fork
                );
            }
            Err(e) => {
                eprintln!("SPIKE: vector index build on branch refused: {e}");
            }
        }
    }

    /// Exploratory spike (ignored by default). It reports, via stderr,
    /// whether a scalar index built on a branch stays branch-local or shows
    /// up on main.
    #[tokio::test]
    #[ignore = "phase-5a spike: documents Lance per-branch index semantics; run with --run-ignored ignored-only"]
    async fn phase5a_spike_per_branch_index() {
        use lance::index::DatasetIndexExt;
        use lance_index::{IndexType, scalar::ScalarIndexParams};
        let (_dir, uri) = seed_dataset().await;
        let v_main = current_version(&uri).await.unwrap();
        create_branch(&uri, "fork-spike", v_main).await.unwrap();
        let batch = test_batch(vec![100, 101, 102], vec![1000, 1100, 1200]);
        let reader =
            arrow_array::RecordBatchIterator::new(vec![Ok(batch)].into_iter(), test_schema());
        write_to_branch(&uri, "fork-spike", reader).await.unwrap();
        let mut on_branch = open_branch(&uri, "fork-spike").await.unwrap();
        let scalar_params = ScalarIndexParams::default();
        let result = on_branch
            .create_index_builder(&["id"], IndexType::Scalar, &scalar_params)
            .name("phase5a_spike".to_string())
            .replace(true)
            .await;
        match result {
            Ok(metadata) => {
                eprintln!(
                    "SPIKE OUTCOME 1 OR 2: index created. name={} uuid={} dataset_version={}",
                    metadata.name, metadata.uuid, metadata.dataset_version
                );
                let main_after = Dataset::open(&uri).await.unwrap();
                let main_indices = main_after.load_indices().await.unwrap();
                let branch_after = open_branch(&uri, "fork-spike").await.unwrap();
                let branch_indices = branch_after.load_indices().await.unwrap();
                eprintln!(
                    "main has {} index(es) after branch build; branch has {}",
                    main_indices.len(),
                    branch_indices.len()
                );
                for idx in main_indices.iter() {
                    eprintln!("  main index: name={} uuid={}", idx.name, idx.uuid);
                }
                for idx in branch_indices.iter() {
                    eprintln!("  branch index: name={} uuid={}", idx.name, idx.uuid);
                }
                let leaked_to_main = main_indices
                    .iter()
                    .any(|i: &lance::table::format::IndexMetadata| i.name == "phase5a_spike");
                let on_branch_only = branch_indices
                    .iter()
                    .any(|i: &lance::table::format::IndexMetadata| i.name == "phase5a_spike");
                eprintln!(
                    "SPIKE VERDICT: branch-local={} leaked-to-main={}",
                    on_branch_only && !leaked_to_main,
                    leaked_to_main
                );
            }
            Err(e) => {
                eprintln!("SPIKE OUTCOME 3: Lance refused per-branch index: {e}");
            }
        }
    }
}