#![deny(clippy::unwrap_used)]
use std::{collections::HashSet, sync::Arc, thread};
use arrow_array::{LargeStringArray, RecordBatch};
use infino::{
supertable::{
Supertable,
reader_cache::{ColdFetchMode, DiskCacheConfig, DiskCacheStore, LruPolicy},
storage::{LocalFsStorageProvider, StorageProvider},
utils::idgen::IdGenerator,
},
test_helpers::{default_supertable_options, schema_id_title},
};
const SHARED_WORKER_ID: u64 = 0xABCD;
const SAME_WORKER_MINT_COUNT: usize = 10_000;
use tempfile::TempDir;
const STRESS_N_WORKERS: usize = 16;
const STRESS_IDS_PER_WORKER: usize = 100_000;
#[test]
fn stress_16_generators_each_100k_ids_all_globally_unique() {
let handles: Vec<thread::JoinHandle<Vec<i128>>> = (0..STRESS_N_WORKERS)
.map(|_| {
thread::spawn(|| {
let g = IdGenerator::new();
(0..STRESS_IDS_PER_WORKER).map(|_| g.next_id()).collect()
})
})
.collect();
let mut all: HashSet<i128> = HashSet::with_capacity(STRESS_N_WORKERS * STRESS_IDS_PER_WORKER);
for h in handles {
let ids = h.join().expect("worker thread panicked");
assert_eq!(ids.len(), STRESS_IDS_PER_WORKER);
for id in ids {
assert!(
all.insert(id),
"duplicate id across workers: {id} — birthday collision \
on the 40-bit random worker_id would be the most likely \
cause, expected probability ~1.1e-10 at 16 workers"
);
}
}
assert_eq!(all.len(), STRESS_N_WORKERS * STRESS_IDS_PER_WORKER);
}
#[test]
fn stress_two_generators_with_explicit_same_worker_id_still_unique_within_one_run() {
let g1 = IdGenerator::with_worker_id(SHARED_WORKER_ID);
let g2 = IdGenerator::with_worker_id(SHARED_WORKER_ID);
let n = SAME_WORKER_MINT_COUNT;
let ids1: Vec<i128> = (0..n).map(|_| g1.next_id()).collect();
let ids2: Vec<i128> = (0..n).map(|_| g2.next_id()).collect();
for w in ids1.windows(2) {
assert!(w[0] < w[1]);
}
for w in ids2.windows(2) {
assert!(w[0] < w[1]);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn four_handles_to_shared_storage_produce_globally_unique_ids() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
const N_HANDLES: usize = 4;
const ROWS_PER_HANDLE: u64 = 100;
let mut tasks = Vec::with_capacity(N_HANDLES);
for handle_idx in 0..N_HANDLES {
let storage = Arc::clone(&storage);
tasks.push(tokio::task::spawn_blocking(move || {
let st = Supertable::create(default_supertable_options().with_storage(storage))
.expect("create");
let mut w = st.writer().expect("writer");
let titles: Vec<String> = (0..ROWS_PER_HANDLE)
.map(|i| format!("h{handle_idx}_doc{i}"))
.collect();
let titles_refs: Vec<&str> = titles.iter().map(String::as_str).collect();
let batch = RecordBatch::try_new(
schema_id_title(),
vec![Arc::new(LargeStringArray::from(titles_refs))],
)
.expect("batch");
w.append(&batch).expect("append");
w.commit().expect("commit");
}));
}
for t in tasks {
t.await.expect("task");
}
let cache_dir = TempDir::new().expect("cache tempdir");
let cfg = DiskCacheConfig {
cache_root: cache_dir.path().to_path_buf(),
disk_budget_bytes: 1 << 30,
cold_fetch_mode: ColdFetchMode::HybridWithPrefetch,
cold_fetch_streams: 4,
cold_fetch_chunk_bytes: 1 << 20,
mmap_cold_threshold_secs: 0,
mmap_sweep_interval_secs: 0,
eviction: Box::new(LruPolicy::new()),
verify_crc_on_open: true,
..Default::default()
};
let pinned_fn: Arc<dyn Fn() -> HashSet<_> + Send + Sync> = Arc::new(HashSet::new);
let cache = DiskCacheStore::new(Arc::clone(&storage), cfg, pinned_fn).expect("cache");
let consumer = Supertable::open(
default_supertable_options()
.with_storage(Arc::clone(&storage))
.with_disk_cache(Arc::clone(&cache)),
)
.expect("open");
let reader = consumer.reader();
let segs = reader.manifest().get_all_superfiles();
assert_eq!(
segs.len(),
N_HANDLES,
"expected one superfile per handle; got {}",
segs.len()
);
let batches = consumer
.reader()
.query_sql("SELECT _id FROM supertable")
.expect("query _id");
let mut all: HashSet<i128> = HashSet::with_capacity(N_HANDLES * ROWS_PER_HANDLE as usize);
for b in &batches {
let col = b
.column(0)
.as_any()
.downcast_ref::<arrow_array::Decimal128Array>()
.expect("_id is Decimal128");
for i in 0..col.len() {
let id = col.value(i);
assert!(all.insert(id), "duplicate _id minted across handles: {id}");
}
}
let expected = N_HANDLES * ROWS_PER_HANDLE as usize;
assert_eq!(
all.len(),
expected,
"expected {expected} distinct ids, got {}",
all.len()
);
let total_rows: u64 = segs.iter().map(|s| s.n_docs).sum();
assert_eq!(total_rows, N_HANDLES as u64 * ROWS_PER_HANDLE);
}