// uni_store/runtime/id_allocator.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! ID allocation for vertices and edges using pure auto-increment counters.
5//!
6//! VIDs and EIDs are simple auto-incrementing u64 values. Unlike the previous
7//! design, they no longer embed label/type information - that's now handled
8//! by the VidLabelsIndex and edge tables.
9
10use crate::store_utils::{DEFAULT_TIMEOUT, get_with_timeout, put_with_timeout};
11use anyhow::Result;
12use bytes::Bytes;
13use object_store::path::Path;
14use object_store::{ObjectStore, PutMode, PutOptions, UpdateVersion};
15use serde::{Deserialize, Serialize};
16use std::sync::Arc;
17use tokio::sync::Mutex;
18use uni_common::core::id::{Eid, Vid};
19
/// Persisted counter manifest - stores the reserved counter ranges.
///
/// Serialized as JSON into the object store; the field names are part of the
/// on-disk format, so renaming them would break existing manifests.
#[derive(Serialize, Deserialize, Default, Clone)]
struct CounterManifest {
    /// Next VID value that needs to be reserved (end of current batch).
    /// On reload, allocation resumes from this value, skipping any unused
    /// IDs from the previous batch.
    next_vid_batch: u64,
    /// Next EID value that needs to be reserved (end of current batch).
    /// Same resume-and-skip semantics as `next_vid_batch`.
    next_eid_batch: u64,
}
28
/// Internal allocator state - tracks current position within reserved batch.
///
/// Always accessed through the `Mutex` in `IdAllocator`, so only one task
/// mutates the counters or persists the manifest at a time.
struct AllocatorState {
    /// Last known persisted manifest (the ends of the reserved batches).
    manifest: CounterManifest,
    /// ETag of the persisted manifest, used for optimistic locking;
    /// `None` until the manifest has been read or written at least once.
    manifest_version: Option<String>,
    /// Next VID to hand out from the locally reserved batch.
    current_vid: u64,
    /// Next EID to hand out from the locally reserved batch.
    current_eid: u64,
}
36
/// Allocates globally unique VIDs and EIDs using auto-increment counters.
///
/// This allocator uses batch reservation to minimize object store writes:
/// - Reserves a batch of IDs (e.g., 1000) from the object store
/// - Allocates from the local batch until exhausted
/// - Reserves a new batch when needed
///
/// IDs reserved but never handed out (e.g. left over at shutdown) are skipped
/// on restart; uniqueness is preserved at the cost of gaps in the sequence.
pub struct IdAllocator {
    /// Backing object store holding the persisted counter manifest.
    store: Arc<dyn ObjectStore>,
    /// Location of the counter manifest within the store.
    path: Path,
    /// Mutable counter state; the mutex serializes allocation and persistence.
    state: Mutex<AllocatorState>,
    /// Number of IDs reserved per object-store write.
    batch_size: u64,
}
49
50impl IdAllocator {
51    /// Creates a new ID allocator, loading existing state from object store.
52    pub async fn new(store: Arc<dyn ObjectStore>, path: Path, batch_size: u64) -> Result<Self> {
53        let (manifest, version) = match get_with_timeout(&store, &path, DEFAULT_TIMEOUT).await {
54            Ok(get_result) => {
55                let version = get_result.meta.e_tag.clone();
56                let bytes = get_result.bytes().await?;
57                let manifest: CounterManifest = serde_json::from_slice(&bytes)?;
58                (manifest, version)
59            }
60            Err(e) if e.to_string().contains("not found") => (CounterManifest::default(), None),
61            Err(e) => return Err(e),
62        };
63
64        // Start allocating from where the last batch ended
65        let current_vid = manifest.next_vid_batch;
66        let current_eid = manifest.next_eid_batch;
67
68        Ok(Self {
69            store,
70            path,
71            state: Mutex::new(AllocatorState {
72                manifest,
73                manifest_version: version,
74                current_vid,
75                current_eid,
76            }),
77            batch_size,
78        })
79    }
80
81    /// Allocates a new VID.
82    ///
83    /// Returns a globally unique, auto-incrementing vertex ID.
84    pub async fn allocate_vid(&self) -> Result<Vid> {
85        let mut state = self.state.lock().await;
86
87        // Check if we've exhausted our current batch
88        if state.current_vid >= state.manifest.next_vid_batch {
89            // Reserve a new batch
90            state.manifest.next_vid_batch = state.current_vid + self.batch_size;
91            self.persist_manifest(&mut state).await?;
92        }
93
94        let vid = Vid::new(state.current_vid);
95        state.current_vid += 1;
96        Ok(vid)
97    }
98
99    /// Allocates multiple VIDs at once.
100    pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
101        let mut state = self.state.lock().await;
102        let needed = count as u64;
103
104        // Check if we need to expand our batch
105        if state.current_vid + needed > state.manifest.next_vid_batch {
106            // Reserve enough for the request plus a full batch
107            state.manifest.next_vid_batch = state.current_vid + needed + self.batch_size;
108            self.persist_manifest(&mut state).await?;
109        }
110
111        let vids: Vec<Vid> = (0..count)
112            .map(|i| Vid::new(state.current_vid + i as u64))
113            .collect();
114        state.current_vid += needed;
115        Ok(vids)
116    }
117
118    /// Allocates a new EID.
119    ///
120    /// Returns a globally unique, auto-incrementing edge ID.
121    pub async fn allocate_eid(&self) -> Result<Eid> {
122        let mut state = self.state.lock().await;
123
124        // Check if we've exhausted our current batch
125        if state.current_eid >= state.manifest.next_eid_batch {
126            // Reserve a new batch
127            state.manifest.next_eid_batch = state.current_eid + self.batch_size;
128            self.persist_manifest(&mut state).await?;
129        }
130
131        let eid = Eid::new(state.current_eid);
132        state.current_eid += 1;
133        Ok(eid)
134    }
135
136    /// Allocates multiple EIDs at once.
137    pub async fn allocate_eids(&self, count: usize) -> Result<Vec<Eid>> {
138        let mut state = self.state.lock().await;
139        let needed = count as u64;
140
141        // Check if we need to expand our batch
142        if state.current_eid + needed > state.manifest.next_eid_batch {
143            // Reserve enough for the request plus a full batch
144            state.manifest.next_eid_batch = state.current_eid + needed + self.batch_size;
145            self.persist_manifest(&mut state).await?;
146        }
147
148        let eids: Vec<Eid> = (0..count)
149            .map(|i| Eid::new(state.current_eid + i as u64))
150            .collect();
151        state.current_eid += needed;
152        Ok(eids)
153    }
154
155    /// Returns the current VID counter value (next VID that would be allocated).
156    pub async fn current_vid(&self) -> u64 {
157        self.state.lock().await.current_vid
158    }
159
160    /// Returns the current EID counter value (next EID that would be allocated).
161    pub async fn current_eid(&self) -> u64 {
162        self.state.lock().await.current_eid
163    }
164
165    /// Persists the counter manifest to object store with optimistic locking.
166    async fn persist_manifest(&self, state: &mut AllocatorState) -> Result<()> {
167        let json = serde_json::to_vec_pretty(&state.manifest)?;
168        let bytes = Bytes::from(json);
169
170        // Try conditional put first, fall back to unconditional if not supported
171        // (LocalFileSystem doesn't support ETag-based conditional puts)
172        let put_result = if let Some(version) = &state.manifest_version {
173            let opts: PutOptions = PutMode::Update(UpdateVersion {
174                e_tag: Some(version.clone()),
175                version: None,
176            })
177            .into();
178            match tokio::time::timeout(
179                DEFAULT_TIMEOUT,
180                self.store.put_opts(&self.path, bytes.clone().into(), opts),
181            )
182            .await
183            {
184                Ok(Ok(result)) => result,
185                Ok(Err(e))
186                    if e.to_string().contains("not yet implemented")
187                        || e.to_string().contains("not supported") =>
188                {
189                    // LocalFileSystem doesn't support conditional puts, use regular put
190                    put_with_timeout(&self.store, &self.path, bytes, DEFAULT_TIMEOUT).await?
191                }
192                Ok(Err(e)) => return Err(e.into()),
193                Err(_) => {
194                    return Err(anyhow::anyhow!(
195                        "Object store put_opts timed out after {:?}",
196                        DEFAULT_TIMEOUT
197                    ));
198                }
199            }
200        } else {
201            // No version yet, try create mode, fall back to regular put
202            let opts: PutOptions = PutMode::Create.into();
203            match tokio::time::timeout(
204                DEFAULT_TIMEOUT,
205                self.store.put_opts(&self.path, bytes.clone().into(), opts),
206            )
207            .await
208            {
209                Ok(Ok(result)) => result,
210                Ok(Err(object_store::Error::AlreadyExists { .. })) => {
211                    // Another process created it, just overwrite
212                    put_with_timeout(&self.store, &self.path, bytes, DEFAULT_TIMEOUT).await?
213                }
214                Ok(Err(e)) if e.to_string().contains("not yet implemented") => {
215                    put_with_timeout(&self.store, &self.path, bytes, DEFAULT_TIMEOUT).await?
216                }
217                Ok(Err(e)) => return Err(e.into()),
218                Err(_) => {
219                    return Err(anyhow::anyhow!(
220                        "Object store put_opts timed out after {:?}",
221                        DEFAULT_TIMEOUT
222                    ));
223                }
224            }
225        };
226
227        state.manifest_version = put_result.e_tag;
228        Ok(())
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::memory::InMemory;

    /// Opens an allocator over the given in-memory store with the standard
    /// manifest path used by all tests.
    async fn open_allocator(store: Arc<InMemory>, batch_size: u64) -> IdAllocator {
        IdAllocator::new(store, Path::from("id_counters.json"), batch_size)
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_allocate_vid() {
        let allocator = open_allocator(Arc::new(InMemory::new()), 100).await;

        // Three sequential allocations must yield 0, 1, 2.
        for expected in 0u64..3 {
            assert_eq!(allocator.allocate_vid().await.unwrap().as_u64(), expected);
        }
    }

    #[tokio::test]
    async fn test_allocate_eid() {
        let allocator = open_allocator(Arc::new(InMemory::new()), 100).await;

        // Two sequential allocations must yield 0, 1.
        for expected in 0u64..2 {
            assert_eq!(allocator.allocate_eid().await.unwrap().as_u64(), expected);
        }
    }

    #[tokio::test]
    async fn test_allocate_many() {
        let allocator = open_allocator(Arc::new(InMemory::new()), 100).await;

        // A bulk request of 5 returns the consecutive IDs 0..5.
        let vids = allocator.allocate_vids(5).await.unwrap();
        let values: Vec<u64> = vids.iter().map(|vid| vid.as_u64()).collect();
        assert_eq!(values, vec![0, 1, 2, 3, 4]);

        // A subsequent single allocation continues from 5.
        assert_eq!(allocator.allocate_vid().await.unwrap().as_u64(), 5);
    }

    #[tokio::test]
    async fn test_persistence() {
        let store = Arc::new(InMemory::new());

        // First session: allocating 15 VIDs with batch size 10 reserves two
        // batches, so the persisted manifest records a reservation up to 20.
        {
            let allocator = open_allocator(store.clone(), 10).await;
            for _ in 0..15 {
                allocator.allocate_vid().await.unwrap();
            }
        }

        // Second session resumes at the end of the persisted reservation.
        let allocator = open_allocator(store, 10).await;
        assert_eq!(allocator.allocate_vid().await.unwrap().as_u64(), 20);
    }
}