#![deny(clippy::unwrap_used)]
use std::sync::Arc;
use infino::{
supertable::{
Supertable,
storage::{LocalFsStorageProvider, StorageProvider},
},
test_helpers::{build_title_batch, default_supertable_options},
};
const MAX_RETRIES_SENTINEL: u32 = 42;
const CONTENTION_MAX_RETRIES: u32 = 20;
use tempfile::TempDir;
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn two_handles_concurrent_commits_both_succeed_via_occ_retry() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st_a = Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
let st_b = Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
let t_a = tokio::task::spawn_blocking({
let st = st_a.clone();
move || {
let mut w = st.writer().expect("writer A");
w.append(&build_title_batch(&["from_a alpha"]))
.expect("append A");
w.commit().expect("commit A");
}
});
let t_b = tokio::task::spawn_blocking({
let st = st_b.clone();
move || {
let mut w = st.writer().expect("writer B");
w.append(&build_title_batch(&["from_b beta"]))
.expect("append B");
w.commit().expect("commit B");
}
});
t_a.await.expect("task A");
t_b.await.expect("task B");
let final_ids = [st_a.manifest_id(), st_b.manifest_id()];
let max_id = final_ids.iter().copied().max().expect("non-empty");
let min_id = final_ids.iter().copied().min().expect("non-empty");
assert_eq!(
max_id, 2,
"one handle must have committed at v2 after retry; got {final_ids:?}"
);
assert_eq!(
min_id, 1,
"both commits succeeded so both handles advanced past v0; got {final_ids:?}"
);
drop(st_a);
drop(st_b);
let consumer =
Supertable::open(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("open");
assert_eq!(consumer.manifest_id(), 2);
assert_eq!(
consumer.reader().n_superfiles(),
2,
"post-open consumer sees both writers' superfiles"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn three_handles_concurrent_commits_all_succeed() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st_a = Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
let st_b = Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
let st_c = Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
let t_a = tokio::task::spawn_blocking({
let st = st_a.clone();
move || {
let mut w = st.writer().expect("writer A");
w.append(&build_title_batch(&["from_a"])).expect("append A");
w.commit().expect("commit A");
}
});
let t_b = tokio::task::spawn_blocking({
let st = st_b.clone();
move || {
let mut w = st.writer().expect("writer B");
w.append(&build_title_batch(&["from_b"])).expect("append B");
w.commit().expect("commit B");
}
});
let t_c = tokio::task::spawn_blocking({
let st = st_c.clone();
move || {
let mut w = st.writer().expect("writer C");
w.append(&build_title_batch(&["from_c"])).expect("append C");
w.commit().expect("commit C");
}
});
t_a.await.expect("task A");
t_b.await.expect("task B");
t_c.await.expect("task C");
drop(st_a);
drop(st_b);
drop(st_c);
let consumer =
Supertable::open(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("open");
assert_eq!(
consumer.manifest_id(),
3,
"three concurrent commits must result in manifest_id = 3"
);
assert_eq!(consumer.reader().n_superfiles(), 3);
let reader = consumer.reader();
let segs = reader.manifest().get_all_superfiles();
let uris: std::collections::HashSet<_> = segs.iter().map(|s| s.uri.0).collect();
assert_eq!(
uris.len(),
segs.len(),
"every superfile carries a distinct URI"
);
assert!(
segs.len() >= 3,
"expected ≥ 3 superfiles from three writers; got {}",
segs.len()
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn retry_winner_sees_loser_superfiles_in_final_manifest() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st_a = Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
let st_b = Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
let t_a = tokio::task::spawn_blocking({
let st = st_a.clone();
move || {
let mut w = st.writer().expect("writer A");
w.append(&build_title_batch(&["alpha"])).expect("append A");
w.commit().expect("commit A");
}
});
let t_b = tokio::task::spawn_blocking({
let st = st_b.clone();
move || {
let mut w = st.writer().expect("writer B");
w.append(&build_title_batch(&["beta"])).expect("append B");
w.commit().expect("commit B");
}
});
t_a.await.expect("task A");
t_b.await.expect("task B");
for st in [&st_a, &st_b] {
let r = st.reader();
if r.manifest_id() == 2 {
assert_eq!(
r.n_superfiles(),
2,
"v2 handle must see both superfiles (its own + winner's)"
);
} else if r.manifest_id() == 1 {
assert_eq!(
r.n_superfiles(),
1,
"v1 handle (winner) sees only its own superfile"
);
} else {
panic!("unexpected manifest_id: {}", r.manifest_id());
}
}
}
#[test]
fn sequential_commits_across_handles_no_retry_needed() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st_a = Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
{
let mut w = st_a.writer().expect("writer A");
w.append(&build_title_batch(&["first"])).expect("append A");
w.commit().expect("commit A");
}
assert_eq!(st_a.manifest_id(), 1);
drop(st_a);
let st_b = Supertable::open(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("open B");
assert_eq!(st_b.manifest_id(), 1, "B opens at A's last manifest_id");
{
let mut w = st_b.writer().expect("writer B");
w.append(&build_title_batch(&["second"])).expect("append B");
w.commit().expect("commit B");
}
assert_eq!(st_b.manifest_id(), 2);
assert_eq!(
st_b.reader().n_superfiles(),
2,
"B sees both A's and B's superfiles"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn max_commit_retries_is_plumbed_through_options() {
{
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let opts = default_supertable_options()
.with_storage(Arc::clone(&storage))
.with_max_commit_retries(MAX_RETRIES_SENTINEL);
assert_eq!(opts.max_commit_retries, MAX_RETRIES_SENTINEL);
}
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st_a = Supertable::create(
default_supertable_options()
.with_storage(Arc::clone(&storage))
.with_max_commit_retries(CONTENTION_MAX_RETRIES),
)
.expect("create");
let st_b = Supertable::create(
default_supertable_options()
.with_storage(Arc::clone(&storage))
.with_max_commit_retries(CONTENTION_MAX_RETRIES),
)
.expect("create");
let t_a = tokio::task::spawn_blocking({
let st = st_a.clone();
move || {
let mut w = st.writer().expect("writer A");
w.append(&build_title_batch(&["alpha"])).expect("append A");
w.commit().expect("commit A");
}
});
let t_b = tokio::task::spawn_blocking({
let st = st_b.clone();
move || {
let mut w = st.writer().expect("writer B");
w.append(&build_title_batch(&["beta"])).expect("append B");
w.commit().expect("commit B");
}
});
t_a.await.expect("task A");
t_b.await.expect("task B");
drop(st_a);
drop(st_b);
let consumer =
Supertable::open(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("open");
assert_eq!(consumer.manifest_id(), 2);
}