1use crate::store_utils::{DEFAULT_TIMEOUT, get_with_timeout, put_with_timeout};
11use anyhow::Result;
12use bytes::Bytes;
13use object_store::path::Path;
14use object_store::{ObjectStore, PutMode, PutOptions, UpdateVersion};
15use serde::{Deserialize, Serialize};
16use std::sync::Arc;
17use tokio::sync::Mutex;
18use uni_common::core::id::{Eid, Vid};
19
20#[derive(Serialize, Deserialize, Default, Clone)]
22struct CounterManifest {
23 next_vid_batch: u64,
25 next_eid_batch: u64,
27}
28
29struct AllocatorState {
31 manifest: CounterManifest,
32 manifest_version: Option<String>, current_vid: u64,
34 current_eid: u64,
35}
36
37pub struct IdAllocator {
44 store: Arc<dyn ObjectStore>,
45 path: Path,
46 state: Mutex<AllocatorState>,
47 batch_size: u64,
48}
49
50impl IdAllocator {
51 pub async fn new(store: Arc<dyn ObjectStore>, path: Path, batch_size: u64) -> Result<Self> {
53 let (manifest, version) = match get_with_timeout(&store, &path, DEFAULT_TIMEOUT).await {
54 Ok(get_result) => {
55 let version = get_result.meta.e_tag.clone();
56 let bytes = get_result.bytes().await?;
57 let manifest: CounterManifest = serde_json::from_slice(&bytes)?;
58 (manifest, version)
59 }
60 Err(e) if e.to_string().contains("not found") => (CounterManifest::default(), None),
61 Err(e) => return Err(e),
62 };
63
64 let current_vid = manifest.next_vid_batch;
66 let current_eid = manifest.next_eid_batch;
67
68 Ok(Self {
69 store,
70 path,
71 state: Mutex::new(AllocatorState {
72 manifest,
73 manifest_version: version,
74 current_vid,
75 current_eid,
76 }),
77 batch_size,
78 })
79 }
80
81 pub async fn allocate_vid(&self) -> Result<Vid> {
85 let mut state = self.state.lock().await;
86
87 if state.current_vid >= state.manifest.next_vid_batch {
89 state.manifest.next_vid_batch = state
94 .current_vid
95 .checked_add(self.batch_size)
96 .ok_or_else(|| anyhow::anyhow!("VID space exhausted"))?;
97 self.persist_manifest(&mut state).await?;
98 }
99
100 let vid = Vid::new(state.current_vid);
101 state.current_vid += 1;
102 Ok(vid)
103 }
104
105 pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
107 let mut state = self.state.lock().await;
108 let needed = count as u64;
109
110 let want = state
112 .current_vid
113 .checked_add(needed)
114 .ok_or_else(|| anyhow::anyhow!("VID space exhausted"))?;
115 if want > state.manifest.next_vid_batch {
116 state.manifest.next_vid_batch = want
118 .checked_add(self.batch_size)
119 .ok_or_else(|| anyhow::anyhow!("VID space exhausted"))?;
120 self.persist_manifest(&mut state).await?;
121 }
122
123 let vids: Vec<Vid> = (0..count)
124 .map(|i| Vid::new(state.current_vid + i as u64))
125 .collect();
126 state.current_vid += needed;
127 Ok(vids)
128 }
129
130 pub async fn allocate_eid(&self) -> Result<Eid> {
134 let mut state = self.state.lock().await;
135
136 if state.current_eid >= state.manifest.next_eid_batch {
138 state.manifest.next_eid_batch = state
140 .current_eid
141 .checked_add(self.batch_size)
142 .ok_or_else(|| anyhow::anyhow!("EID space exhausted"))?;
143 self.persist_manifest(&mut state).await?;
144 }
145
146 let eid = Eid::new(state.current_eid);
147 state.current_eid += 1;
148 Ok(eid)
149 }
150
151 pub async fn allocate_eids(&self, count: usize) -> Result<Vec<Eid>> {
153 let mut state = self.state.lock().await;
154 let needed = count as u64;
155
156 let want = state
158 .current_eid
159 .checked_add(needed)
160 .ok_or_else(|| anyhow::anyhow!("EID space exhausted"))?;
161 if want > state.manifest.next_eid_batch {
162 state.manifest.next_eid_batch = want
164 .checked_add(self.batch_size)
165 .ok_or_else(|| anyhow::anyhow!("EID space exhausted"))?;
166 self.persist_manifest(&mut state).await?;
167 }
168
169 let eids: Vec<Eid> = (0..count)
170 .map(|i| Eid::new(state.current_eid + i as u64))
171 .collect();
172 state.current_eid += needed;
173 Ok(eids)
174 }
175
176 pub async fn current_vid(&self) -> u64 {
178 self.state.lock().await.current_vid
179 }
180
181 pub async fn current_eid(&self) -> u64 {
183 self.state.lock().await.current_eid
184 }
185
186 pub async fn current_hwm(&self) -> (u64, u64) {
195 let state = self.state.lock().await;
196 (state.current_vid, state.current_eid)
197 }
198
199 pub async fn checkpoint(&self) -> Result<()> {
217 let mut state = self.state.lock().await;
218 if state.manifest.next_vid_batch < state.current_vid {
221 state.manifest.next_vid_batch = state.current_vid;
222 }
223 if state.manifest.next_eid_batch < state.current_eid {
224 state.manifest.next_eid_batch = state.current_eid;
225 }
226 self.persist_manifest(&mut state).await
227 }
228
229 async fn persist_manifest(&self, state: &mut AllocatorState) -> Result<()> {
231 let json = serde_json::to_vec_pretty(&state.manifest)?;
232 let bytes = Bytes::from(json);
233
234 let put_result = if let Some(version) = &state.manifest_version {
237 let opts: PutOptions = PutMode::Update(UpdateVersion {
238 e_tag: Some(version.clone()),
239 version: None,
240 })
241 .into();
242 match tokio::time::timeout(
243 DEFAULT_TIMEOUT,
244 self.store.put_opts(&self.path, bytes.clone().into(), opts),
245 )
246 .await
247 {
248 Ok(Ok(result)) => result,
249 Ok(Err(e))
250 if e.to_string().contains("not yet implemented")
251 || e.to_string().contains("not supported") =>
252 {
253 put_with_timeout(&self.store, &self.path, bytes, DEFAULT_TIMEOUT).await?
255 }
256 Ok(Err(e)) => return Err(e.into()),
257 Err(_) => {
258 return Err(anyhow::anyhow!(
259 "Object store put_opts timed out after {:?}",
260 DEFAULT_TIMEOUT
261 ));
262 }
263 }
264 } else {
265 let opts: PutOptions = PutMode::Create.into();
267 match tokio::time::timeout(
268 DEFAULT_TIMEOUT,
269 self.store.put_opts(&self.path, bytes.clone().into(), opts),
270 )
271 .await
272 {
273 Ok(Ok(result)) => result,
274 Ok(Err(object_store::Error::AlreadyExists { .. })) => {
275 put_with_timeout(&self.store, &self.path, bytes, DEFAULT_TIMEOUT).await?
277 }
278 Ok(Err(e)) if e.to_string().contains("not yet implemented") => {
279 put_with_timeout(&self.store, &self.path, bytes, DEFAULT_TIMEOUT).await?
280 }
281 Ok(Err(e)) => return Err(e.into()),
282 Err(_) => {
283 return Err(anyhow::anyhow!(
284 "Object store put_opts timed out after {:?}",
285 DEFAULT_TIMEOUT
286 ));
287 }
288 }
289 };
290
291 state.manifest_version = put_result.e_tag;
292 Ok(())
293 }
294}
295
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::memory::InMemory;

    /// Name of the manifest object used by every test.
    const MANIFEST: &str = "id_counters.json";

    /// Builds an allocator over a brand-new in-memory store.
    async fn fresh_allocator(batch_size: u64) -> IdAllocator {
        IdAllocator::new(Arc::new(InMemory::new()), Path::from(MANIFEST), batch_size)
            .await
            .unwrap()
    }

    /// Single VID allocations count up from zero.
    #[tokio::test]
    async fn test_allocate_vid() {
        let allocator = fresh_allocator(100).await;

        for expected in 0..3u64 {
            let vid = allocator.allocate_vid().await.unwrap();
            assert_eq!(vid.as_u64(), expected);
        }
    }

    /// Single EID allocations count up from zero.
    #[tokio::test]
    async fn test_allocate_eid() {
        let allocator = fresh_allocator(100).await;

        for expected in 0..2u64 {
            let eid = allocator.allocate_eid().await.unwrap();
            assert_eq!(eid.as_u64(), expected);
        }
    }

    /// A bulk allocation is contiguous and the next single ID follows it.
    #[tokio::test]
    async fn test_allocate_many() {
        let allocator = fresh_allocator(100).await;

        let batch = allocator.allocate_vids(5).await.unwrap();
        assert_eq!(batch.len(), 5);
        for (expected, vid) in (0u64..).zip(batch.iter()) {
            assert_eq!(vid.as_u64(), expected);
        }

        let follow_up = allocator.allocate_vid().await.unwrap();
        assert_eq!(follow_up.as_u64(), 5);
    }

    /// A reopened allocator resumes at the persisted batch ceiling.
    #[tokio::test]
    async fn test_persistence() {
        let store = Arc::new(InMemory::new());
        let path = Path::from(MANIFEST);

        // First session: 15 allocations with batch size 10 reserve up to 20.
        let first = IdAllocator::new(store.clone(), path.clone(), 10)
            .await
            .unwrap();
        for _ in 0..15 {
            first.allocate_vid().await.unwrap();
        }
        drop(first);

        // Second session: the unused tail of the batch (15..20) is skipped.
        let second = IdAllocator::new(store, path, 10).await.unwrap();
        let vid = second.allocate_vid().await.unwrap();
        assert_eq!(vid.as_u64(), 20);
    }
}