Skip to main content

uni_store/runtime/
id_allocator.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! ID allocation for vertices and edges using pure auto-increment counters.
5//!
6//! VIDs and EIDs are simple auto-incrementing u64 values. Unlike the previous
7//! design, they no longer embed label/type information - that's now handled
8//! by the VidLabelsIndex and edge tables.
9
10use crate::store_utils::{DEFAULT_TIMEOUT, get_with_timeout, put_with_timeout};
11use anyhow::Result;
12use bytes::Bytes;
13use object_store::path::Path;
14use object_store::{ObjectStore, PutMode, PutOptions, UpdateVersion};
15use serde::{Deserialize, Serialize};
16use std::sync::Arc;
17use tokio::sync::Mutex;
18use uni_common::core::id::{Eid, Vid};
19
20/// Persisted counter manifest - stores the reserved counter ranges.
21#[derive(Serialize, Deserialize, Default, Clone)]
22struct CounterManifest {
23    /// Next VID value that needs to be reserved (end of current batch)
24    next_vid_batch: u64,
25    /// Next EID value that needs to be reserved (end of current batch)
26    next_eid_batch: u64,
27}
28
29/// Internal allocator state - tracks current position within reserved batch.
30struct AllocatorState {
31    manifest: CounterManifest,
32    manifest_version: Option<String>, // ETag for optimistic locking
33    current_vid: u64,
34    current_eid: u64,
35}
36
37/// Allocates globally unique VIDs and EIDs using auto-increment counters.
38///
39/// This allocator uses batch reservation to minimize object store writes:
40/// - Reserves a batch of IDs (e.g., 1000) from the object store
41/// - Allocates from the local batch until exhausted
42/// - Reserves a new batch when needed
43pub struct IdAllocator {
44    store: Arc<dyn ObjectStore>,
45    path: Path,
46    state: Mutex<AllocatorState>,
47    batch_size: u64,
48}
49
50impl IdAllocator {
51    /// Creates a new ID allocator, loading existing state from object store.
52    pub async fn new(store: Arc<dyn ObjectStore>, path: Path, batch_size: u64) -> Result<Self> {
53        let (manifest, version) = match get_with_timeout(&store, &path, DEFAULT_TIMEOUT).await {
54            Ok(get_result) => {
55                let version = get_result.meta.e_tag.clone();
56                let bytes = get_result.bytes().await?;
57                let manifest: CounterManifest = serde_json::from_slice(&bytes)?;
58                (manifest, version)
59            }
60            Err(e) if e.to_string().contains("not found") => (CounterManifest::default(), None),
61            Err(e) => return Err(e),
62        };
63
64        // Start allocating from where the last batch ended
65        let current_vid = manifest.next_vid_batch;
66        let current_eid = manifest.next_eid_batch;
67
68        Ok(Self {
69            store,
70            path,
71            state: Mutex::new(AllocatorState {
72                manifest,
73                manifest_version: version,
74                current_vid,
75                current_eid,
76            }),
77            batch_size,
78        })
79    }
80
81    /// Allocates a new VID.
82    ///
83    /// Returns a globally unique, auto-incrementing vertex ID.
84    pub async fn allocate_vid(&self) -> Result<Vid> {
85        let mut state = self.state.lock().await;
86
87        // Check if we've exhausted our current batch
88        if state.current_vid >= state.manifest.next_vid_batch {
89            // Reserve a new batch. `checked_add` guards against u64
90            // exhaustion (defense-in-depth; ~1.8e19 ids — physically
91            // unreachable, but wrapping silently would be a correctness
92            // disaster). L13.
93            state.manifest.next_vid_batch = state
94                .current_vid
95                .checked_add(self.batch_size)
96                .ok_or_else(|| anyhow::anyhow!("VID space exhausted"))?;
97            self.persist_manifest(&mut state).await?;
98        }
99
100        let vid = Vid::new(state.current_vid);
101        state.current_vid += 1;
102        Ok(vid)
103    }
104
105    /// Allocates multiple VIDs at once.
106    pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
107        let mut state = self.state.lock().await;
108        let needed = count as u64;
109
110        // Check if we need to expand our batch (L13: checked).
111        let want = state
112            .current_vid
113            .checked_add(needed)
114            .ok_or_else(|| anyhow::anyhow!("VID space exhausted"))?;
115        if want > state.manifest.next_vid_batch {
116            // Reserve enough for the request plus a full batch
117            state.manifest.next_vid_batch = want
118                .checked_add(self.batch_size)
119                .ok_or_else(|| anyhow::anyhow!("VID space exhausted"))?;
120            self.persist_manifest(&mut state).await?;
121        }
122
123        let vids: Vec<Vid> = (0..count)
124            .map(|i| Vid::new(state.current_vid + i as u64))
125            .collect();
126        state.current_vid += needed;
127        Ok(vids)
128    }
129
130    /// Allocates a new EID.
131    ///
132    /// Returns a globally unique, auto-incrementing edge ID.
133    pub async fn allocate_eid(&self) -> Result<Eid> {
134        let mut state = self.state.lock().await;
135
136        // Check if we've exhausted our current batch (L13: checked).
137        if state.current_eid >= state.manifest.next_eid_batch {
138            // Reserve a new batch
139            state.manifest.next_eid_batch = state
140                .current_eid
141                .checked_add(self.batch_size)
142                .ok_or_else(|| anyhow::anyhow!("EID space exhausted"))?;
143            self.persist_manifest(&mut state).await?;
144        }
145
146        let eid = Eid::new(state.current_eid);
147        state.current_eid += 1;
148        Ok(eid)
149    }
150
151    /// Allocates multiple EIDs at once.
152    pub async fn allocate_eids(&self, count: usize) -> Result<Vec<Eid>> {
153        let mut state = self.state.lock().await;
154        let needed = count as u64;
155
156        // Check if we need to expand our batch (L13: checked).
157        let want = state
158            .current_eid
159            .checked_add(needed)
160            .ok_or_else(|| anyhow::anyhow!("EID space exhausted"))?;
161        if want > state.manifest.next_eid_batch {
162            // Reserve enough for the request plus a full batch
163            state.manifest.next_eid_batch = want
164                .checked_add(self.batch_size)
165                .ok_or_else(|| anyhow::anyhow!("EID space exhausted"))?;
166            self.persist_manifest(&mut state).await?;
167        }
168
169        let eids: Vec<Eid> = (0..count)
170            .map(|i| Eid::new(state.current_eid + i as u64))
171            .collect();
172        state.current_eid += needed;
173        Ok(eids)
174    }
175
176    /// Returns the current VID counter value (next VID that would be allocated).
177    pub async fn current_vid(&self) -> u64 {
178        self.state.lock().await.current_vid
179    }
180
181    /// Returns the current EID counter value (next EID that would be allocated).
182    pub async fn current_eid(&self) -> u64 {
183        self.state.lock().await.current_eid
184    }
185
186    /// Snapshot the current high-water-marks for VID and EID.
187    ///
188    /// Returns `(next_vid, next_eid)` — the values the next
189    /// allocations would produce if not constrained by batch
190    /// reservation. Used by Phase 2 fork-creation to bootstrap a
191    /// fork's allocator above primary's range without going through
192    /// disk (the primary and fork allocators may live on different
193    /// `ObjectStore` instances, making file-copy bootstrap fragile).
194    pub async fn current_hwm(&self) -> (u64, u64) {
195        let state = self.state.lock().await;
196        (state.current_vid, state.current_eid)
197    }
198
199    /// Force a checkpoint of the in-memory state to the underlying
200    /// object store.
201    ///
202    /// Used by Phase 2 fork creation to bootstrap a fork's allocator
203    /// from primary's *current* HWM. Without this, primary's allocator
204    /// has an in-memory state that the on-disk manifest doesn't yet
205    /// reflect (the disk file is only updated on batch-boundary
206    /// crossings), and the fork would start at VID 0 — colliding with
207    /// primary rows visible through the `base_paths` chain.
208    ///
209    /// Idempotent and safe to call frequently; the persisted manifest
210    /// reflects the same state if nothing has changed.
211    ///
212    /// # Errors
213    ///
214    /// Returns the underlying [`anyhow::Error`] from `persist_manifest`
215    /// (object-store put failure).
216    pub async fn checkpoint(&self) -> Result<()> {
217        let mut state = self.state.lock().await;
218        // Advance the persisted batch HWM to at least the current
219        // allocation cursor so reloads start above any allocated VIDs.
220        if state.manifest.next_vid_batch < state.current_vid {
221            state.manifest.next_vid_batch = state.current_vid;
222        }
223        if state.manifest.next_eid_batch < state.current_eid {
224            state.manifest.next_eid_batch = state.current_eid;
225        }
226        self.persist_manifest(&mut state).await
227    }
228
229    /// Persists the counter manifest to object store with optimistic locking.
230    async fn persist_manifest(&self, state: &mut AllocatorState) -> Result<()> {
231        let json = serde_json::to_vec_pretty(&state.manifest)?;
232        let bytes = Bytes::from(json);
233
234        // Try conditional put first, fall back to unconditional if not supported
235        // (LocalFileSystem doesn't support ETag-based conditional puts)
236        let put_result = if let Some(version) = &state.manifest_version {
237            let opts: PutOptions = PutMode::Update(UpdateVersion {
238                e_tag: Some(version.clone()),
239                version: None,
240            })
241            .into();
242            match tokio::time::timeout(
243                DEFAULT_TIMEOUT,
244                self.store.put_opts(&self.path, bytes.clone().into(), opts),
245            )
246            .await
247            {
248                Ok(Ok(result)) => result,
249                Ok(Err(e))
250                    if e.to_string().contains("not yet implemented")
251                        || e.to_string().contains("not supported") =>
252                {
253                    // LocalFileSystem doesn't support conditional puts, use regular put
254                    put_with_timeout(&self.store, &self.path, bytes, DEFAULT_TIMEOUT).await?
255                }
256                Ok(Err(e)) => return Err(e.into()),
257                Err(_) => {
258                    return Err(anyhow::anyhow!(
259                        "Object store put_opts timed out after {:?}",
260                        DEFAULT_TIMEOUT
261                    ));
262                }
263            }
264        } else {
265            // No version yet, try create mode, fall back to regular put
266            let opts: PutOptions = PutMode::Create.into();
267            match tokio::time::timeout(
268                DEFAULT_TIMEOUT,
269                self.store.put_opts(&self.path, bytes.clone().into(), opts),
270            )
271            .await
272            {
273                Ok(Ok(result)) => result,
274                Ok(Err(object_store::Error::AlreadyExists { .. })) => {
275                    // Another process created it, just overwrite
276                    put_with_timeout(&self.store, &self.path, bytes, DEFAULT_TIMEOUT).await?
277                }
278                Ok(Err(e)) if e.to_string().contains("not yet implemented") => {
279                    put_with_timeout(&self.store, &self.path, bytes, DEFAULT_TIMEOUT).await?
280                }
281                Ok(Err(e)) => return Err(e.into()),
282                Err(_) => {
283                    return Err(anyhow::anyhow!(
284                        "Object store put_opts timed out after {:?}",
285                        DEFAULT_TIMEOUT
286                    ));
287                }
288            }
289        };
290
291        state.manifest_version = put_result.e_tag;
292        Ok(())
293    }
294}
295
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::memory::InMemory;

    /// Location of the counter manifest used by every test.
    const MANIFEST: &str = "id_counters.json";

    /// Builds an allocator backed by a brand-new in-memory store.
    async fn fresh_allocator(batch_size: u64) -> IdAllocator {
        IdAllocator::new(Arc::new(InMemory::new()), Path::from(MANIFEST), batch_size)
            .await
            .unwrap()
    }

    /// Single VID allocations count up from zero.
    #[tokio::test]
    async fn test_allocate_vid() {
        let allocator = fresh_allocator(100).await;

        for expected in 0..3u64 {
            let vid = allocator.allocate_vid().await.unwrap();
            assert_eq!(vid.as_u64(), expected);
        }
    }

    /// Single EID allocations count up from zero.
    #[tokio::test]
    async fn test_allocate_eid() {
        let allocator = fresh_allocator(100).await;

        for expected in 0..2u64 {
            let eid = allocator.allocate_eid().await.unwrap();
            assert_eq!(eid.as_u64(), expected);
        }
    }

    /// A bulk allocation is consecutive and moves the cursor past itself.
    #[tokio::test]
    async fn test_allocate_many() {
        let allocator = fresh_allocator(100).await;

        let vids = allocator.allocate_vids(5).await.unwrap();
        assert_eq!(vids.len(), 5);
        let mut expected = 0u64;
        for vid in &vids {
            assert_eq!(vid.as_u64(), expected);
            expected += 1;
        }

        // The single allocation that follows picks up right after the bulk one.
        let next = allocator.allocate_vid().await.unwrap();
        assert_eq!(next.as_u64(), 5);
    }

    /// A re-opened allocator resumes at the persisted reservation bound.
    #[tokio::test]
    async fn test_persistence() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let path = Path::from(MANIFEST);

        // First lifetime: consume 15 IDs with a batch size of 10.
        {
            let first = IdAllocator::new(Arc::clone(&store), path.clone(), 10)
                .await
                .unwrap();
            for _ in 0..15 {
                first.allocate_vid().await.unwrap();
            }
        }

        // Second lifetime: two batches were reserved (up to 20), so the
        // reloaded allocator starts at 20 and skips the unused 15..20.
        let reopened = IdAllocator::new(store, path, 10).await.unwrap();
        let vid = reopened.allocate_vid().await.unwrap();
        assert_eq!(vid.as_u64(), 20);
    }
}