1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team
//! ID allocation for vertices and edges using pure auto-increment counters.
//!
//! VIDs and EIDs are simple auto-incrementing u64 values. Unlike the previous
//! design, they no longer embed label/type information - that's now handled
//! by the VidLabelsIndex and edge tables.
use crate::store_utils::{DEFAULT_TIMEOUT, get_with_timeout, put_with_timeout};
use anyhow::Result;
use bytes::Bytes;
use object_store::path::Path;
use object_store::{ObjectStore, PutMode, PutOptions, UpdateVersion};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::Mutex;
use uni_common::core::id::{Eid, Vid};
/// Counter manifest as it is stored in the object store (JSON via serde).
///
/// Each field is the exclusive upper bound of the ID range that has been
/// reserved so far: the allocator hands out IDs strictly below the bound and
/// resumes from the bound itself after a reload.
#[derive(Clone, Default, Serialize, Deserialize)]
struct CounterManifest {
    /// End (exclusive) of the VID range reserved so far.
    next_vid_batch: u64,
    /// End (exclusive) of the EID range reserved so far.
    next_eid_batch: u64,
}
/// Mutable allocator state guarded by the allocator's mutex.
///
/// Pairs the last manifest this process wrote (or loaded) with the cursors
/// that walk through the reserved ranges.
struct AllocatorState {
    /// Reserved-range bounds as last loaded or persisted by this allocator.
    manifest: CounterManifest,
    /// ETag of the stored manifest, used for optimistic locking on update.
    /// `None` until a manifest has been loaded or written with an ETag.
    manifest_version: Option<String>,
    /// Next VID to hand out.
    current_vid: u64,
    /// Next EID to hand out.
    current_eid: u64,
}
/// Hands out auto-incrementing VIDs and EIDs backed by a persisted manifest.
///
/// To keep object-store writes rare, IDs are reserved in batches:
/// - a range of IDs (e.g., 1000) is reserved by writing the manifest,
/// - allocations are served from that in-memory range until it runs out,
/// - then the next range is reserved.
pub struct IdAllocator {
    /// Object store holding the counter manifest.
    store: Arc<dyn ObjectStore>,
    /// Location of the manifest object inside `store`.
    path: Path,
    /// Cursors and manifest, serialized behind an async mutex.
    state: Mutex<AllocatorState>,
    /// Number of IDs reserved per manifest write.
    batch_size: u64,
}
impl IdAllocator {
/// Creates a new ID allocator, loading existing state from the object store.
///
/// If a manifest exists at `path` it is parsed and its ETag remembered for
/// later conditional writes; if the store reports the object as "not found"
/// the allocator starts from a default (all-zero) manifest. Allocation
/// resumes at the persisted batch bounds, so any IDs left unused in the
/// previous batch are skipped rather than reused.
///
/// `batch_size` is clamped to at least 1. A batch size of 0 would make the
/// single-ID allocation paths persist a bound equal to the ID they hand
/// out, so a reload would issue that ID a second time.
///
/// # Errors
///
/// Returns an error if the manifest cannot be fetched (for any reason other
/// than "not found"), its body cannot be read, or it is not valid JSON for
/// the manifest schema.
pub async fn new(store: Arc<dyn ObjectStore>, path: Path, batch_size: u64) -> Result<Self> {
    let (manifest, version) = match get_with_timeout(&store, &path, DEFAULT_TIMEOUT).await {
        Ok(get_result) => {
            // Capture the ETag before `bytes()` consumes the result.
            let version = get_result.meta.e_tag.clone();
            let bytes = get_result.bytes().await?;
            let manifest: CounterManifest = serde_json::from_slice(&bytes)?;
            (manifest, version)
        }
        // A missing manifest means a fresh store: start from zero.
        Err(e) if e.to_string().contains("not found") => (CounterManifest::default(), None),
        Err(e) => return Err(e),
    };

    // Start allocating from where the last reserved batch ended.
    let current_vid = manifest.next_vid_batch;
    let current_eid = manifest.next_eid_batch;

    Ok(Self {
        store,
        path,
        state: Mutex::new(AllocatorState {
            manifest,
            manifest_version: version,
            current_vid,
            current_eid,
        }),
        // Never reserve empty batches (see the doc comment above).
        batch_size: batch_size.max(1),
    })
}
/// Allocates a new VID.
///
/// Returns the next auto-incrementing vertex ID. When the locally reserved
/// range is used up, a new range is reserved by persisting the manifest
/// *before* the ID is handed out.
///
/// # Errors
///
/// Returns an error if the VID space would overflow `u64`, or if the
/// manifest cannot be persisted. In the latter case the in-memory
/// reservation is rolled back, so no ID from an unpersisted range is ever
/// returned by a later call.
pub async fn allocate_vid(&self) -> Result<Vid> {
    let mut state = self.state.lock().await;

    // Reserve a new batch once the current one is exhausted.
    if state.current_vid >= state.manifest.next_vid_batch {
        // `checked_add` guards against u64 exhaustion (defense-in-depth;
        // wrapping silently would be a correctness disaster). L13.
        // `.max(1)` keeps the reservation non-empty even if the allocator
        // was configured with a batch size of 0.
        let reserved_until = state
            .current_vid
            .checked_add(self.batch_size.max(1))
            .ok_or_else(|| anyhow::anyhow!("VID space exhausted"))?;

        let previous = std::mem::replace(&mut state.manifest.next_vid_batch, reserved_until);
        if let Err(err) = self.persist_manifest(&mut state).await {
            // The reservation never reached the store: undo it so the next
            // call retries instead of allocating from an unreserved range.
            // NOTE(review): a future dropped mid-await skips this rollback.
            state.manifest.next_vid_batch = previous;
            return Err(err);
        }
    }

    let vid = Vid::new(state.current_vid);
    state.current_vid += 1;
    Ok(vid)
}
/// Allocates `count` consecutive VIDs at once.
///
/// The returned IDs are contiguous and in ascending order. If the request
/// does not fit in the currently reserved range, the reservation is extended
/// to cover the request plus one further full batch and persisted before any
/// ID is returned. A `count` of 0 returns an empty vector without touching
/// the store.
///
/// # Errors
///
/// Returns an error if the VID space would overflow `u64`, or if the
/// manifest cannot be persisted. In the latter case the in-memory
/// reservation is rolled back and no IDs are consumed.
pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
    let mut state = self.state.lock().await;
    // usize is at most 64 bits wide on supported targets, so this is lossless.
    let needed = count as u64;

    let first = state.current_vid;
    // Exclusive end of the requested range (L13: checked).
    let end = first
        .checked_add(needed)
        .ok_or_else(|| anyhow::anyhow!("VID space exhausted"))?;

    if end > state.manifest.next_vid_batch {
        // Reserve enough for the request plus a full batch.
        let reserved_until = end
            .checked_add(self.batch_size)
            .ok_or_else(|| anyhow::anyhow!("VID space exhausted"))?;

        let previous = std::mem::replace(&mut state.manifest.next_vid_batch, reserved_until);
        if let Err(err) = self.persist_manifest(&mut state).await {
            // The reservation never reached the store: undo it so later
            // calls cannot allocate from an unreserved range.
            // NOTE(review): a future dropped mid-await skips this rollback.
            state.manifest.next_vid_batch = previous;
            return Err(err);
        }
    }

    let vids: Vec<Vid> = (first..end).map(Vid::new).collect();
    state.current_vid = end;
    Ok(vids)
}
/// Allocates a new EID.
///
/// Returns the next auto-incrementing edge ID. When the locally reserved
/// range is used up, a new range is reserved by persisting the manifest
/// *before* the ID is handed out.
///
/// # Errors
///
/// Returns an error if the EID space would overflow `u64`, or if the
/// manifest cannot be persisted. In the latter case the in-memory
/// reservation is rolled back, so no ID from an unpersisted range is ever
/// returned by a later call.
pub async fn allocate_eid(&self) -> Result<Eid> {
    let mut state = self.state.lock().await;

    // Reserve a new batch once the current one is exhausted (L13: checked).
    if state.current_eid >= state.manifest.next_eid_batch {
        // `.max(1)` keeps the reservation non-empty even if the allocator
        // was configured with a batch size of 0.
        let reserved_until = state
            .current_eid
            .checked_add(self.batch_size.max(1))
            .ok_or_else(|| anyhow::anyhow!("EID space exhausted"))?;

        let previous = std::mem::replace(&mut state.manifest.next_eid_batch, reserved_until);
        if let Err(err) = self.persist_manifest(&mut state).await {
            // The reservation never reached the store: undo it so the next
            // call retries instead of allocating from an unreserved range.
            // NOTE(review): a future dropped mid-await skips this rollback.
            state.manifest.next_eid_batch = previous;
            return Err(err);
        }
    }

    let eid = Eid::new(state.current_eid);
    state.current_eid += 1;
    Ok(eid)
}
/// Allocates `count` consecutive EIDs at once.
///
/// The returned IDs are contiguous and in ascending order. If the request
/// does not fit in the currently reserved range, the reservation is extended
/// to cover the request plus one further full batch and persisted before any
/// ID is returned. A `count` of 0 returns an empty vector without touching
/// the store.
///
/// # Errors
///
/// Returns an error if the EID space would overflow `u64`, or if the
/// manifest cannot be persisted. In the latter case the in-memory
/// reservation is rolled back and no IDs are consumed.
pub async fn allocate_eids(&self, count: usize) -> Result<Vec<Eid>> {
    let mut state = self.state.lock().await;
    // usize is at most 64 bits wide on supported targets, so this is lossless.
    let needed = count as u64;

    let first = state.current_eid;
    // Exclusive end of the requested range (L13: checked).
    let end = first
        .checked_add(needed)
        .ok_or_else(|| anyhow::anyhow!("EID space exhausted"))?;

    if end > state.manifest.next_eid_batch {
        // Reserve enough for the request plus a full batch.
        let reserved_until = end
            .checked_add(self.batch_size)
            .ok_or_else(|| anyhow::anyhow!("EID space exhausted"))?;

        let previous = std::mem::replace(&mut state.manifest.next_eid_batch, reserved_until);
        if let Err(err) = self.persist_manifest(&mut state).await {
            // The reservation never reached the store: undo it so later
            // calls cannot allocate from an unreserved range.
            // NOTE(review): a future dropped mid-await skips this rollback.
            state.manifest.next_eid_batch = previous;
            return Err(err);
        }
    }

    let eids: Vec<Eid> = (first..end).map(Eid::new).collect();
    state.current_eid = end;
    Ok(eids)
}
/// Reports the VID cursor, i.e. the value the next VID allocation would use.
///
/// Takes the state lock briefly; nothing is allocated or persisted.
pub async fn current_vid(&self) -> u64 {
    let guard = self.state.lock().await;
    guard.current_vid
}
/// Reports the EID cursor, i.e. the value the next EID allocation would use.
///
/// Takes the state lock briefly; nothing is allocated or persisted.
pub async fn current_eid(&self) -> u64 {
    let guard = self.state.lock().await;
    guard.current_eid
}
/// Takes a consistent snapshot of both allocation cursors.
///
/// Returns `(next_vid, next_eid)`, read under a single lock acquisition:
/// the values the next allocations would produce if not constrained by
/// batch reservation.
///
/// Phase 2 fork creation uses this to seed a fork's allocator above the
/// primary's range without a round trip through disk, since the primary and
/// the fork may sit on different `ObjectStore` instances and copying the
/// manifest file between them is fragile.
pub async fn current_hwm(&self) -> (u64, u64) {
    let guard = self.state.lock().await;
    let next_vid = guard.current_vid;
    let next_eid = guard.current_eid;
    (next_vid, next_eid)
}
/// Writes the in-memory allocator state to the object store right away.
///
/// Normally the manifest is only written when a batch boundary is crossed.
/// This method forces a write, first raising each persisted bound to at
/// least the matching allocation cursor so that a reload starts above every
/// ID already handed out.
///
/// Phase 2 fork creation calls this so that a fork bootstrapped from the
/// stored manifest starts from the primary's *current* high-water mark
/// instead of a stale one (which would collide with primary rows visible
/// through the `base_paths` chain).
///
/// Calling it repeatedly with no allocations in between rewrites the same
/// manifest contents.
///
/// # Errors
///
/// Returns the underlying [`anyhow::Error`] from `persist_manifest`
/// (object-store put failure).
pub async fn checkpoint(&self) -> Result<()> {
    let mut state = self.state.lock().await;

    let vid_floor = state.current_vid;
    let eid_floor = state.current_eid;

    // Bounds only ever move up: keep whichever of bound/cursor is larger.
    let manifest = &mut state.manifest;
    manifest.next_vid_batch = manifest.next_vid_batch.max(vid_floor);
    manifest.next_eid_batch = manifest.next_eid_batch.max(eid_floor);

    self.persist_manifest(&mut state).await
}
/// Persists the counter manifest to object store with optimistic locking.
async fn persist_manifest(&self, state: &mut AllocatorState) -> Result<()> {
let json = serde_json::to_vec_pretty(&state.manifest)?;
let bytes = Bytes::from(json);
// Try conditional put first, fall back to unconditional if not supported
// (LocalFileSystem doesn't support ETag-based conditional puts)
let put_result = if let Some(version) = &state.manifest_version {
let opts: PutOptions = PutMode::Update(UpdateVersion {
e_tag: Some(version.clone()),
version: None,
})
.into();
match tokio::time::timeout(
DEFAULT_TIMEOUT,
self.store.put_opts(&self.path, bytes.clone().into(), opts),
)
.await
{
Ok(Ok(result)) => result,
Ok(Err(e))
if e.to_string().contains("not yet implemented")
|| e.to_string().contains("not supported") =>
{
// LocalFileSystem doesn't support conditional puts, use regular put
put_with_timeout(&self.store, &self.path, bytes, DEFAULT_TIMEOUT).await?
}
Ok(Err(e)) => return Err(e.into()),
Err(_) => {
return Err(anyhow::anyhow!(
"Object store put_opts timed out after {:?}",
DEFAULT_TIMEOUT
));
}
}
} else {
// No version yet, try create mode, fall back to regular put
let opts: PutOptions = PutMode::Create.into();
match tokio::time::timeout(
DEFAULT_TIMEOUT,
self.store.put_opts(&self.path, bytes.clone().into(), opts),
)
.await
{
Ok(Ok(result)) => result,
Ok(Err(object_store::Error::AlreadyExists { .. })) => {
// Another process created it, just overwrite
put_with_timeout(&self.store, &self.path, bytes, DEFAULT_TIMEOUT).await?
}
Ok(Err(e)) if e.to_string().contains("not yet implemented") => {
put_with_timeout(&self.store, &self.path, bytes, DEFAULT_TIMEOUT).await?
}
Ok(Err(e)) => return Err(e.into()),
Err(_) => {
return Err(anyhow::anyhow!(
"Object store put_opts timed out after {:?}",
DEFAULT_TIMEOUT
));
}
}
};
state.manifest_version = put_result.e_tag;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use object_store::memory::InMemory;
// A fresh allocator hands out VIDs 0, 1, 2 in order.
#[tokio::test]
async fn test_allocate_vid() {
    let backing = Arc::new(InMemory::new());
    let manifest_path = Path::from("id_counters.json");
    let allocator = IdAllocator::new(backing, manifest_path, 100)
        .await
        .unwrap();

    for expected in 0..3u64 {
        let vid = allocator.allocate_vid().await.unwrap();
        assert_eq!(vid.as_u64(), expected);
    }
}
// A fresh allocator hands out EIDs 0, 1 in order.
#[tokio::test]
async fn test_allocate_eid() {
    let backing = Arc::new(InMemory::new());
    let manifest_path = Path::from("id_counters.json");
    let allocator = IdAllocator::new(backing, manifest_path, 100)
        .await
        .unwrap();

    for expected in 0..2u64 {
        let eid = allocator.allocate_eid().await.unwrap();
        assert_eq!(eid.as_u64(), expected);
    }
}
// Bulk allocation returns a contiguous run and advances the single-ID cursor.
#[tokio::test]
async fn test_allocate_many() {
    let backing = Arc::new(InMemory::new());
    let manifest_path = Path::from("id_counters.json");
    let allocator = IdAllocator::new(backing, manifest_path, 100)
        .await
        .unwrap();

    let batch = allocator.allocate_vids(5).await.unwrap();
    assert_eq!(batch.len(), 5);
    let mut expected = 0u64;
    for vid in &batch {
        assert_eq!(vid.as_u64(), expected);
        expected += 1;
    }

    // The single-ID path picks up right after the bulk run, at 5.
    let following = allocator.allocate_vid().await.unwrap();
    assert_eq!(following.as_u64(), 5);
}
// A reopened allocator resumes at the persisted batch bound, not at the
// last ID actually handed out.
#[tokio::test]
async fn test_persistence() {
    let store = Arc::new(InMemory::new());
    let path = Path::from("id_counters.json");

    // First lifetime: hand out 15 IDs with a batch size of 10.
    let first_run = IdAllocator::new(store.clone(), path.clone(), 10)
        .await
        .unwrap();
    for _ in 0..15 {
        first_run.allocate_vid().await.unwrap();
    }
    drop(first_run);

    // Second lifetime: 15 allocations crossed into the second batch, so the
    // stored bound is 20 and that is where allocation resumes.
    let reopened = IdAllocator::new(store, path, 10).await.unwrap();
    let vid = reopened.allocate_vid().await.unwrap();
    assert_eq!(vid.as_u64(), 20);
}
}