use crate::runtime::id_allocator::IdAllocator;
use anyhow::Result;
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::sync::Arc;
use uni_common::core::id::{Eid, Vid};
/// Batch size that [`TxIdReservoir::new`] substitutes when it is given `0`.
pub const DEFAULT_RESERVOIR_BATCH: usize = 16;
/// Local buffer of `Vid`s and `Eid`s obtained in batches from a shared [`IdAllocator`].
///
/// `next_vid` / `next_eid` hand out buffered IDs first and only call the
/// allocator (requesting `batch_size` IDs) when the matching buffer is empty.
pub struct TxIdReservoir {
/// Allocator that refill batches are requested from.
allocator: Arc<IdAllocator>,
/// Buffered, not-yet-handed-out IDs, guarded by a `parking_lot` mutex.
state: Mutex<ReservoirState>,
/// Number of IDs requested per refill; `new` replaces `0` with
/// `DEFAULT_RESERVOIR_BATCH`.
batch_size: usize,
}
/// Mutable contents of a [`TxIdReservoir`]: one FIFO queue per ID kind.
struct ReservoirState {
/// Buffered `Vid`s; consumed from the front, refilled at the back.
vids: VecDeque<Vid>,
/// Buffered `Eid`s; consumed from the front, refilled at the back.
eids: VecDeque<Eid>,
}
impl TxIdReservoir {
    /// Creates a reservoir that refills from `allocator` in batches of `batch_size` IDs.
    ///
    /// A `batch_size` of `0` is replaced by [`DEFAULT_RESERVOIR_BATCH`], so a refill
    /// never asks the allocator for zero IDs.
    pub fn new(allocator: Arc<IdAllocator>, batch_size: usize) -> Self {
        let batch_size = if batch_size == 0 {
            DEFAULT_RESERVOIR_BATCH
        } else {
            batch_size
        };
        Self {
            allocator,
            state: Mutex::new(ReservoirState {
                vids: VecDeque::with_capacity(batch_size),
                eids: VecDeque::with_capacity(batch_size),
            }),
            batch_size,
        }
    }

    /// Returns the next `Vid`, refilling the buffer from the allocator when it is empty.
    ///
    /// The state lock is released before the allocator is awaited, so concurrent
    /// callers that both observe an empty buffer may each fetch their own batch;
    /// every fetched ID is either returned or buffered.
    ///
    /// # Errors
    ///
    /// Propagates any error from `IdAllocator::allocate_vids`, and fails if the
    /// allocator returns an empty batch.
    pub async fn next_vid(&self) -> Result<Vid> {
        // Fast path: the guard is a temporary of this statement and is dropped
        // before the `.await` below.
        if let Some(vid) = self.state.lock().vids.pop_front() {
            return Ok(vid);
        }

        let batch = self.allocator.allocate_vids(self.batch_size).await?;
        // Consume the batch front-to-back instead of `remove(0)`: no O(n) shift,
        // and an empty batch becomes an error rather than a panic.
        let mut rest = batch.into_iter();
        let first = rest.next().ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "ID allocator returned an empty VID batch",
            )
        })?;
        self.state.lock().vids.extend(rest);
        Ok(first)
    }

    /// Returns the next `Eid`, refilling the buffer from the allocator when it is empty.
    ///
    /// The state lock is released before the allocator is awaited, so concurrent
    /// callers that both observe an empty buffer may each fetch their own batch;
    /// every fetched ID is either returned or buffered.
    ///
    /// # Errors
    ///
    /// Propagates any error from `IdAllocator::allocate_eids`, and fails if the
    /// allocator returns an empty batch.
    pub async fn next_eid(&self) -> Result<Eid> {
        // Fast path: the guard is a temporary of this statement and is dropped
        // before the `.await` below.
        if let Some(eid) = self.state.lock().eids.pop_front() {
            return Ok(eid);
        }

        let batch = self.allocator.allocate_eids(self.batch_size).await?;
        // Consume the batch front-to-back instead of `remove(0)`: no O(n) shift,
        // and an empty batch becomes an error rather than a panic.
        let mut rest = batch.into_iter();
        let first = rest.next().ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "ID allocator returned an empty EID batch",
            )
        })?;
        self.state.lock().eids.extend(rest);
        Ok(first)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds an [`IdAllocator`] backed by a local object store rooted in a fresh
    /// temporary directory.
    ///
    /// The `TempDir` is returned alongside the allocator so the caller keeps the
    /// directory alive for the duration of the test.
    async fn make_allocator() -> (tempfile::TempDir, Arc<IdAllocator>) {
        use object_store::local::LocalFileSystem;
        use object_store::path::Path;

        let dir = tempdir().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path()).unwrap());
        let path = Path::from("id_allocator.json");
        let alloc = Arc::new(IdAllocator::new(store, path, 10_000).await.unwrap());
        (dir, alloc)
    }

    /// Drawing 100 VIDs through a batch-16 reservoir yields unique, strictly
    /// increasing IDs.
    #[tokio::test]
    async fn next_vid_amortizes_global_mutex() -> Result<()> {
        let (_dir, alloc) = make_allocator().await;
        let r = TxIdReservoir::new(alloc, 16);

        let mut vids = Vec::with_capacity(100);
        for _ in 0..100 {
            vids.push(r.next_vid().await?);
        }

        let unique: std::collections::HashSet<_> = vids.iter().copied().collect();
        assert_eq!(unique.len(), 100, "all VIDs must be unique");
        assert!(
            vids.windows(2).all(|w| w[1].as_u64() > w[0].as_u64()),
            "VIDs must be monotonically increasing"
        );
        Ok(())
    }

    /// Drawing 100 EIDs through a batch-16 reservoir yields unique, strictly
    /// increasing IDs.
    #[tokio::test]
    async fn next_eid_amortizes_global_mutex() -> Result<()> {
        let (_dir, alloc) = make_allocator().await;
        let r = TxIdReservoir::new(alloc, 16);

        let mut eids = Vec::with_capacity(100);
        for _ in 0..100 {
            eids.push(r.next_eid().await?);
        }

        let unique: std::collections::HashSet<_> = eids.iter().copied().collect();
        assert_eq!(unique.len(), 100, "all EIDs must be unique");
        assert!(
            eids.windows(2).all(|w| w[1].as_u64() > w[0].as_u64()),
            "EIDs must be monotonically increasing"
        );
        Ok(())
    }

    /// A reservoir constructed with batch size `0` must still hand out IDs
    /// (it falls back to `DEFAULT_RESERVOIR_BATCH`) without erroring.
    #[tokio::test]
    async fn zero_batch_size_falls_back_to_default() -> Result<()> {
        let (_dir, alloc) = make_allocator().await;
        let r = TxIdReservoir::new(alloc, 0);
        let _vid = r.next_vid().await?;
        let _eid = r.next_eid().await?;
        Ok(())
    }
}