use std::sync::Arc;
use anyhow::Result;
use object_store::ObjectStore;
use object_store::path::Path as ObjectStorePath;
use uni_common::core::fork::ForkId;
use crate::runtime::id_allocator::IdAllocator;
/// Default `batch_size` to pass when opening a fork-scoped allocator.
///
/// NOTE(review): presumably the number of IDs reserved per persisted batch;
/// the exact meaning is defined by `IdAllocator` — confirm there.
pub const DEFAULT_FORK_BATCH_SIZE: u64 = 1000;
/// Returns the object-store location of the allocator manifest for `fork_id`.
///
/// The location has the shape `catalog/forks/<fork_id>/id_allocator.json`,
/// where `<fork_id>` is the fork's `Display` rendering.
#[must_use]
pub fn id_allocator_path(fork_id: &ForkId) -> ObjectStorePath {
    let location = format!("catalog/forks/{fork_id}/id_allocator.json");
    ObjectStorePath::from(location)
}
/// Opens the [`IdAllocator`] whose manifest lives at the fork's own path.
///
/// The manifest location is derived from `fork_id` via
/// [`id_allocator_path`]; `store` and `batch_size` are forwarded unchanged
/// to [`IdAllocator::new`].
///
/// # Errors
///
/// Returns whatever error [`IdAllocator::new`] reports.
pub async fn new_for_fork(
    store: Arc<dyn ObjectStore>,
    fork_id: &ForkId,
    batch_size: u64,
) -> Result<IdAllocator> {
    IdAllocator::new(store, id_allocator_path(fork_id), batch_size).await
}
/// Same as [`new_for_fork`], but hands the allocator back wrapped in an
/// [`Arc`].
///
/// # Errors
///
/// Returns the error from [`new_for_fork`] unchanged.
pub async fn new_for_fork_arc(
    store: Arc<dyn ObjectStore>,
    fork_id: &ForkId,
    batch_size: u64,
) -> Result<Arc<IdAllocator>> {
    new_for_fork(store, fork_id, batch_size)
        .await
        .map(Arc::new)
}
/// Seeds a fork's allocator manifest from the given high-water marks.
///
/// If reading the fork's allocator path succeeds, the function returns
/// `Ok(())` without writing anything. Otherwise it writes a pretty-printed
/// JSON object with `next_vid_batch = vid_hwm` and `next_eid_batch = eid_hwm`
/// to that path.
///
/// # Errors
///
/// Returns an error if JSON serialization fails or if the write to the
/// object store fails.
pub async fn bootstrap_fork_from_primary_hwm(
    store: Arc<dyn ObjectStore>,
    fork_id: &ForkId,
    vid_hwm: u64,
    eid_hwm: u64,
) -> Result<()> {
    use crate::store_utils::{DEFAULT_TIMEOUT, get_with_timeout, put_with_timeout};

    // Shape of the JSON document written for the fork. Field order is the
    // order the keys appear in the serialized output.
    // NOTE(review): presumably mirrors the manifest `IdAllocator` reads back —
    // confirm the field names against its on-disk format.
    #[derive(serde::Serialize)]
    struct CounterManifestSnapshot {
        next_vid_batch: u64,
        next_eid_batch: u64,
    }

    let target = id_allocator_path(fork_id);

    // A successful read means something is already stored here: leave it alone.
    // NOTE(review): every read failure (not only "not found") falls through to
    // the write below, so a timeout or transient error on an existing manifest
    // would overwrite it — confirm whether that is intended.
    // NOTE(review): the read-then-write pair does not look atomic; two
    // concurrent bootstraps could both write — verify against callers.
    let already_present = get_with_timeout(&store, &target, DEFAULT_TIMEOUT)
        .await
        .is_ok();
    if already_present {
        return Ok(());
    }

    let payload = serde_json::to_vec_pretty(&CounterManifestSnapshot {
        next_vid_batch: vid_hwm,
        next_eid_batch: eid_hwm,
    })?;
    put_with_timeout(&store, &target, bytes::Bytes::from(payload), DEFAULT_TIMEOUT).await?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::local::LocalFileSystem;
    use tempfile::TempDir;

    /// Builds a local-filesystem object store rooted in a new temporary
    /// directory.
    ///
    /// The `TempDir` handle is returned with the store so the caller can
    /// keep it alive for the duration of the test.
    async fn fresh_store() -> (TempDir, Arc<dyn ObjectStore>) {
        let dir = TempDir::new().unwrap();
        let local_fs = LocalFileSystem::new_with_prefix(dir.path()).unwrap();
        let store: Arc<dyn ObjectStore> = Arc::new(local_fs);
        (dir, store)
    }

    /// The manifest path sits under `catalog/forks/`, contains the fork id,
    /// and ends with the manifest file name.
    #[tokio::test]
    async fn path_includes_fork_id_under_catalog_forks() {
        let fork = ForkId::new();
        let rendered = id_allocator_path(&fork).to_string();
        assert!(rendered.starts_with("catalog/forks/"));
        assert!(rendered.ends_with("/id_allocator.json"));
        assert!(rendered.contains(&fork.to_string()));
    }

    /// An allocator opened on an empty store hands out VID 0 first.
    #[tokio::test]
    async fn fresh_allocator_starts_from_zero() {
        let (_dir, store) = fresh_store().await;
        let fork = ForkId::new();
        let allocator = new_for_fork(store, &fork, DEFAULT_FORK_BATCH_SIZE)
            .await
            .unwrap();
        let first_vid = allocator.allocate_vid().await.unwrap();
        assert_eq!(
            u64::from(first_vid),
            0,
            "fresh fork allocator starts at VID 0"
        );
    }

    /// Two forks sharing one store each count up from 0 on their own.
    #[tokio::test]
    async fn two_forks_have_independent_vid_streams() {
        let (_dir, store) = fresh_store().await;
        let fork_a = ForkId::new();
        let fork_b = ForkId::new();
        let alloc_a = new_for_fork(Arc::clone(&store), &fork_a, DEFAULT_FORK_BATCH_SIZE)
            .await
            .unwrap();
        let alloc_b = new_for_fork(Arc::clone(&store), &fork_b, DEFAULT_FORK_BATCH_SIZE)
            .await
            .unwrap();

        // Interleave the allocations: A, B, then A, B again.
        for expected in 0..2u64 {
            let from_a = alloc_a.allocate_vid().await.unwrap();
            let from_b = alloc_b.allocate_vid().await.unwrap();
            assert_eq!(u64::from(from_a), expected);
            assert_eq!(u64::from(from_b), expected);
        }
    }

    /// Reopening an allocator on the same fork must not hand out an ID below
    /// the first batch boundary.
    #[tokio::test]
    async fn allocator_persists_across_reopen() {
        let (_dir, store) = fresh_store().await;
        let fork = ForkId::new();

        let first_session = new_for_fork(Arc::clone(&store), &fork, DEFAULT_FORK_BATCH_SIZE)
            .await
            .unwrap();
        for _ in 0..3 {
            first_session.allocate_vid().await.unwrap();
        }
        // Release the first allocator before opening the second one.
        drop(first_session);

        let reopened = new_for_fork(store, &fork, DEFAULT_FORK_BATCH_SIZE)
            .await
            .unwrap();
        let next = reopened.allocate_vid().await.unwrap();
        assert!(
            u64::from(next) >= DEFAULT_FORK_BATCH_SIZE,
            "reopened allocator must skip past previous batch's HWM; got {}",
            u64::from(next)
        );
    }

    /// Allocating on a fork leaves an allocator at the root-level
    /// `id_allocator.json` path starting from 0.
    #[tokio::test]
    async fn primary_id_allocator_unaffected_by_fork_allocator() {
        let (_dir, store) = fresh_store().await;
        let fork = ForkId::new();
        let fork_alloc = new_for_fork(Arc::clone(&store), &fork, DEFAULT_FORK_BATCH_SIZE)
            .await
            .unwrap();
        for _ in 0..5 {
            fork_alloc.allocate_vid().await.unwrap();
        }

        let primary_path = ObjectStorePath::from("id_allocator.json");
        let primary = IdAllocator::new(store, primary_path, DEFAULT_FORK_BATCH_SIZE)
            .await
            .unwrap();
        let primary_first = primary.allocate_vid().await.unwrap();
        assert_eq!(
            u64::from(primary_first),
            0,
            "primary allocator must not see any fork-side allocations"
        );
    }
}