uni_store/runtime/
id_allocator.rs1use crate::store_utils::{DEFAULT_TIMEOUT, get_with_timeout, put_with_timeout};
11use anyhow::Result;
12use bytes::Bytes;
13use object_store::path::Path;
14use object_store::{ObjectStore, PutMode, PutOptions, UpdateVersion};
15use serde::{Deserialize, Serialize};
16use std::sync::Arc;
17use tokio::sync::Mutex;
18use uni_common::core::id::{Eid, Vid};
19
20#[derive(Serialize, Deserialize, Default, Clone)]
22struct CounterManifest {
23 next_vid_batch: u64,
25 next_eid_batch: u64,
27}
28
29struct AllocatorState {
31 manifest: CounterManifest,
32 manifest_version: Option<String>, current_vid: u64,
34 current_eid: u64,
35}
36
37pub struct IdAllocator {
44 store: Arc<dyn ObjectStore>,
45 path: Path,
46 state: Mutex<AllocatorState>,
47 batch_size: u64,
48}
49
50impl IdAllocator {
51 pub async fn new(store: Arc<dyn ObjectStore>, path: Path, batch_size: u64) -> Result<Self> {
53 let (manifest, version) = match get_with_timeout(&store, &path, DEFAULT_TIMEOUT).await {
54 Ok(get_result) => {
55 let version = get_result.meta.e_tag.clone();
56 let bytes = get_result.bytes().await?;
57 let manifest: CounterManifest = serde_json::from_slice(&bytes)?;
58 (manifest, version)
59 }
60 Err(e) if e.to_string().contains("not found") => (CounterManifest::default(), None),
61 Err(e) => return Err(e),
62 };
63
64 let current_vid = manifest.next_vid_batch;
66 let current_eid = manifest.next_eid_batch;
67
68 Ok(Self {
69 store,
70 path,
71 state: Mutex::new(AllocatorState {
72 manifest,
73 manifest_version: version,
74 current_vid,
75 current_eid,
76 }),
77 batch_size,
78 })
79 }
80
81 pub async fn allocate_vid(&self) -> Result<Vid> {
85 let mut state = self.state.lock().await;
86
87 if state.current_vid >= state.manifest.next_vid_batch {
89 state.manifest.next_vid_batch = state.current_vid + self.batch_size;
91 self.persist_manifest(&mut state).await?;
92 }
93
94 let vid = Vid::new(state.current_vid);
95 state.current_vid += 1;
96 Ok(vid)
97 }
98
99 pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
101 let mut state = self.state.lock().await;
102 let needed = count as u64;
103
104 if state.current_vid + needed > state.manifest.next_vid_batch {
106 state.manifest.next_vid_batch = state.current_vid + needed + self.batch_size;
108 self.persist_manifest(&mut state).await?;
109 }
110
111 let vids: Vec<Vid> = (0..count)
112 .map(|i| Vid::new(state.current_vid + i as u64))
113 .collect();
114 state.current_vid += needed;
115 Ok(vids)
116 }
117
118 pub async fn allocate_eid(&self) -> Result<Eid> {
122 let mut state = self.state.lock().await;
123
124 if state.current_eid >= state.manifest.next_eid_batch {
126 state.manifest.next_eid_batch = state.current_eid + self.batch_size;
128 self.persist_manifest(&mut state).await?;
129 }
130
131 let eid = Eid::new(state.current_eid);
132 state.current_eid += 1;
133 Ok(eid)
134 }
135
136 pub async fn allocate_eids(&self, count: usize) -> Result<Vec<Eid>> {
138 let mut state = self.state.lock().await;
139 let needed = count as u64;
140
141 if state.current_eid + needed > state.manifest.next_eid_batch {
143 state.manifest.next_eid_batch = state.current_eid + needed + self.batch_size;
145 self.persist_manifest(&mut state).await?;
146 }
147
148 let eids: Vec<Eid> = (0..count)
149 .map(|i| Eid::new(state.current_eid + i as u64))
150 .collect();
151 state.current_eid += needed;
152 Ok(eids)
153 }
154
155 pub async fn current_vid(&self) -> u64 {
157 self.state.lock().await.current_vid
158 }
159
160 pub async fn current_eid(&self) -> u64 {
162 self.state.lock().await.current_eid
163 }
164
165 async fn persist_manifest(&self, state: &mut AllocatorState) -> Result<()> {
167 let json = serde_json::to_vec_pretty(&state.manifest)?;
168 let bytes = Bytes::from(json);
169
170 let put_result = if let Some(version) = &state.manifest_version {
173 let opts: PutOptions = PutMode::Update(UpdateVersion {
174 e_tag: Some(version.clone()),
175 version: None,
176 })
177 .into();
178 match tokio::time::timeout(
179 DEFAULT_TIMEOUT,
180 self.store.put_opts(&self.path, bytes.clone().into(), opts),
181 )
182 .await
183 {
184 Ok(Ok(result)) => result,
185 Ok(Err(e))
186 if e.to_string().contains("not yet implemented")
187 || e.to_string().contains("not supported") =>
188 {
189 put_with_timeout(&self.store, &self.path, bytes, DEFAULT_TIMEOUT).await?
191 }
192 Ok(Err(e)) => return Err(e.into()),
193 Err(_) => {
194 return Err(anyhow::anyhow!(
195 "Object store put_opts timed out after {:?}",
196 DEFAULT_TIMEOUT
197 ));
198 }
199 }
200 } else {
201 let opts: PutOptions = PutMode::Create.into();
203 match tokio::time::timeout(
204 DEFAULT_TIMEOUT,
205 self.store.put_opts(&self.path, bytes.clone().into(), opts),
206 )
207 .await
208 {
209 Ok(Ok(result)) => result,
210 Ok(Err(object_store::Error::AlreadyExists { .. })) => {
211 put_with_timeout(&self.store, &self.path, bytes, DEFAULT_TIMEOUT).await?
213 }
214 Ok(Err(e)) if e.to_string().contains("not yet implemented") => {
215 put_with_timeout(&self.store, &self.path, bytes, DEFAULT_TIMEOUT).await?
216 }
217 Ok(Err(e)) => return Err(e.into()),
218 Err(_) => {
219 return Err(anyhow::anyhow!(
220 "Object store put_opts timed out after {:?}",
221 DEFAULT_TIMEOUT
222 ));
223 }
224 }
225 };
226
227 state.manifest_version = put_result.e_tag;
228 Ok(())
229 }
230}
231
232#[cfg(test)]
233mod tests {
234 use super::*;
235 use object_store::memory::InMemory;
236
237 #[tokio::test]
238 async fn test_allocate_vid() {
239 let store = Arc::new(InMemory::new());
240 let path = Path::from("id_counters.json");
241 let allocator = IdAllocator::new(store, path, 100).await.unwrap();
242
243 let vid1 = allocator.allocate_vid().await.unwrap();
244 let vid2 = allocator.allocate_vid().await.unwrap();
245 let vid3 = allocator.allocate_vid().await.unwrap();
246
247 assert_eq!(vid1.as_u64(), 0);
248 assert_eq!(vid2.as_u64(), 1);
249 assert_eq!(vid3.as_u64(), 2);
250 }
251
252 #[tokio::test]
253 async fn test_allocate_eid() {
254 let store = Arc::new(InMemory::new());
255 let path = Path::from("id_counters.json");
256 let allocator = IdAllocator::new(store, path, 100).await.unwrap();
257
258 let eid1 = allocator.allocate_eid().await.unwrap();
259 let eid2 = allocator.allocate_eid().await.unwrap();
260
261 assert_eq!(eid1.as_u64(), 0);
262 assert_eq!(eid2.as_u64(), 1);
263 }
264
265 #[tokio::test]
266 async fn test_allocate_many() {
267 let store = Arc::new(InMemory::new());
268 let path = Path::from("id_counters.json");
269 let allocator = IdAllocator::new(store, path, 100).await.unwrap();
270
271 let vids = allocator.allocate_vids(5).await.unwrap();
272 assert_eq!(vids.len(), 5);
273 for (i, vid) in vids.iter().enumerate() {
274 assert_eq!(vid.as_u64(), i as u64);
275 }
276
277 let next = allocator.allocate_vid().await.unwrap();
279 assert_eq!(next.as_u64(), 5);
280 }
281
282 #[tokio::test]
283 async fn test_persistence() {
284 let store = Arc::new(InMemory::new());
285 let path = Path::from("id_counters.json");
286
287 {
289 let allocator = IdAllocator::new(store.clone(), path.clone(), 10)
290 .await
291 .unwrap();
292 for _ in 0..15 {
293 allocator.allocate_vid().await.unwrap();
294 }
295 }
296
297 {
299 let allocator = IdAllocator::new(store, path, 10).await.unwrap();
300 let vid = allocator.allocate_vid().await.unwrap();
303 assert_eq!(vid.as_u64(), 20);
304 }
305 }
306}