#![deny(clippy::unwrap_used)]
use std::{
sync::{Arc, Barrier},
thread,
time::{Duration, Instant},
};
use arrow_array::{LargeStringArray, RecordBatch};
use infino::{
supertable::{Supertable, SupertableOptions},
test_helpers::{default_supertable_options, schema_id_title},
};
const CONTENDER_THREADS: usize = 4;
const STRESS_COMMIT_COUNT: u64 = 50;
const STRESS_READER_THREADS: usize = 16;
const STRESS_PINS_PER_READER: usize = 200;
fn options() -> SupertableOptions {
default_supertable_options()
}
fn build_batch(start: u64, n: usize) -> RecordBatch {
let titles = LargeStringArray::from(
(0..n)
.map(|i| format!("doc {} title", start + i as u64))
.collect::<Vec<_>>(),
);
RecordBatch::try_new(schema_id_title(), vec![Arc::new(titles)]).expect("batch")
}
#[test]
fn reader_pinned_before_writer_starts_never_sees_commits() {
let st = Supertable::create(options()).expect("create");
let pinned = st.reader();
assert_eq!(pinned.manifest_id(), 0);
assert_eq!(pinned.n_superfiles(), 0);
let st_for_writer = st.clone();
let writer_handle = thread::spawn(move || {
let mut w = st_for_writer.writer().expect("writer");
for i in 0..5u64 {
w.append(&build_batch(i * 10, 3)).expect("append");
w.commit().expect("commit");
thread::sleep(Duration::from_millis(2));
}
drop(w);
});
let deadline = Instant::now() + Duration::from_millis(200);
while Instant::now() < deadline {
assert_eq!(
pinned.manifest_id(),
0,
"pinned reader's manifest_id moved while writer ran",
);
assert_eq!(
pinned.n_superfiles(),
0,
"pinned reader's superfile count grew while writer ran",
);
}
writer_handle.join().expect("writer thread joined");
assert_eq!(pinned.manifest_id(), 0);
assert_eq!(pinned.n_superfiles(), 0);
let fresh = st.reader();
assert_eq!(fresh.manifest_id(), 5);
assert_eq!(fresh.n_superfiles(), 5);
assert_eq!(fresh.n_docs_total(), 5 * 3);
}
#[test]
fn reader_obtained_after_writer_finishes_sees_full_state() {
let st = Supertable::create(options()).expect("create");
let mut w = st.writer().expect("writer");
for i in 0..3u64 {
w.append(&build_batch(i * 10, 4)).expect("append");
w.commit().expect("commit");
}
drop(w);
let r = st.reader();
assert_eq!(r.manifest_id(), 3);
assert_eq!(r.n_superfiles(), 3);
assert_eq!(r.n_docs_total(), 12);
}
#[test]
fn pinned_reader_holds_arc_across_subsequent_commits() {
let st = Supertable::create(options()).expect("create");
let mut w = st.writer().expect("writer");
w.append(&build_batch(0, 2)).expect("a1");
w.commit().expect("c1");
let r1 = st.reader();
let r1_arc = Arc::clone(r1.manifest());
assert_eq!(r1.manifest_id(), 1);
w.append(&build_batch(10, 2)).expect("a2");
w.commit().expect("c2");
w.append(&build_batch(20, 2)).expect("a3");
w.commit().expect("commit");
assert_eq!(r1.manifest_id(), 1);
assert!(
Arc::ptr_eq(&r1_arc, r1.manifest()),
"pinned reader's manifest Arc must retain identity",
);
let r2 = st.reader();
assert_eq!(r2.manifest_id(), 3);
assert!(!Arc::ptr_eq(r1.manifest(), r2.manifest()));
}
#[test]
fn concurrent_readers_at_same_commit_share_arc_pointer() {
let st = Supertable::create(options()).expect("create");
let mut w = st.writer().expect("writer");
w.append(&build_batch(0, 5)).expect("a1");
w.commit().expect("c1");
drop(w);
let barrier = Arc::new(Barrier::new(CONTENDER_THREADS));
let st = Arc::new(st);
let mut handles = Vec::new();
for _ in 0..CONTENDER_THREADS {
let st = Arc::clone(&st);
let bar = Arc::clone(&barrier);
handles.push(thread::spawn(move || {
bar.wait();
let r = st.reader();
Arc::clone(r.manifest())
}));
}
let manifests: Vec<Arc<_>> = handles
.into_iter()
.map(|h| h.join().expect("thread"))
.collect();
let head = &manifests[0];
for m in &manifests[1..] {
assert!(
Arc::ptr_eq(head, m),
"readers pinned without an interleaving commit must share Arc"
);
}
}
#[test]
fn manifest_id_monotonic_across_serially_pinned_readers() {
let st = Supertable::create(options()).expect("create");
let mut w = st.writer().expect("writer");
let mut observed: Vec<u64> = Vec::new();
observed.push(st.reader().manifest_id());
for i in 0..5u64 {
w.append(&build_batch(i * 10, 2)).expect("append");
w.commit().expect("commit");
observed.push(st.reader().manifest_id());
}
drop(w);
for w in observed.windows(2) {
assert!(w[0] <= w[1], "manifest_id regressed: {observed:?}");
}
assert_eq!(observed.first(), Some(&0));
assert_eq!(observed.last(), Some(&5));
}
#[test]
fn many_concurrent_readers_during_writer_commits_no_inconsistencies() {
let st = Supertable::create(options()).expect("create");
let n_commits = STRESS_COMMIT_COUNT;
let n_readers = STRESS_READER_THREADS;
let pins_per_reader = STRESS_PINS_PER_READER;
let st_for_writer = st.clone();
let writer = thread::spawn(move || {
let mut w = st_for_writer.writer().expect("writer");
for i in 0..n_commits {
w.append(&build_batch(i * 10, 2)).expect("append");
w.commit().expect("commit");
}
drop(w);
});
let st_arc = Arc::new(st);
let mut reader_handles = Vec::with_capacity(n_readers);
for _ in 0..n_readers {
let st = Arc::clone(&st_arc);
reader_handles.push(thread::spawn(move || {
let mut max_seen: u64 = 0;
for _ in 0..pins_per_reader {
let r = st.reader();
let id_before = r.manifest_id();
let n_before = r.n_superfiles();
std::hint::black_box(&r);
let id_after = r.manifest_id();
let n_after = r.n_superfiles();
assert_eq!(
id_before, id_after,
"pinned reader observed manifest_id change mid-hold",
);
assert_eq!(
n_before, n_after,
"pinned reader observed n_superfiles change mid-hold",
);
if id_before > max_seen {
max_seen = id_before;
}
}
max_seen
}));
}
writer.join().expect("writer joined");
let final_r = st_arc.reader();
assert_eq!(final_r.manifest_id(), n_commits);
assert_eq!(final_r.n_superfiles(), n_commits as usize);
for h in reader_handles {
let max_seen = h.join().expect("reader joined");
assert!(max_seen <= n_commits, "reader saw impossible manifest_id");
}
}
#[test]
fn fresh_reader_sequence_taken_during_concurrent_commits_is_monotonic() {
let st = Supertable::create(options()).expect("create");
let n_commits = 30u64;
let st_for_writer = st.clone();
let writer = thread::spawn(move || {
let mut w = st_for_writer.writer().expect("writer");
for i in 0..n_commits {
w.append(&build_batch(i * 10, 2)).expect("append");
w.commit().expect("commit");
}
drop(w);
});
let mut samples: Vec<u64> = Vec::new();
let deadline = Instant::now() + Duration::from_millis(500);
while Instant::now() < deadline {
samples.push(st.reader().manifest_id());
if samples.last() == Some(&n_commits) {
break;
}
}
writer.join().expect("writer joined");
samples.push(st.reader().manifest_id());
for w in samples.windows(2) {
assert!(
w[0] <= w[1],
"fresh-reader sequence regressed: {} -> {}",
w[0],
w[1],
);
}
assert_eq!(*samples.last().expect("≥1 sample"), n_commits);
}