1use super::lazy_transaction::Transactionable;
5use crate::{
6 library::lsm_tree_map::Root, AnyBlockStorage, LazyTransaction, LsmTreeMap, OptionLink, StorageError, Streamable,
7};
8use async_trait::async_trait;
9use cid::Cid;
10use futures::{pin_mut, stream::BoxStream, Stream, StreamExt, TryStreamExt};
11use serde::{de::DeserializeOwned, Deserialize, Serialize};
12use std::{
13 future::{ready, Future},
14 hash::Hash,
15};
16
17#[derive(Debug, Clone, Serialize, Deserialize, Hash, PartialEq, Eq, PartialOrd, Ord)]
18#[serde(transparent)]
19pub struct CoMap<K, V>(OptionLink<Root<K, V>>)
20where
21 K: Hash + Ord + Clone + Send + Sync + 'static,
22 V: Clone + Send + Sync + 'static;
23impl<K, V> CoMap<K, V>
24where
25 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
26 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
27{
28 pub async fn from_iter<S>(storage: &S, iter: impl IntoIterator<Item = (K, V)>) -> Result<Self, StorageError>
30 where
31 S: AnyBlockStorage,
32 {
33 let mut transaction = Self::default().open(storage).await?;
34 for (key, value) in iter.into_iter() {
35 transaction.insert(key, value).await?;
36 }
37 transaction.store().await
38 }
39
40 pub fn is_empty(&self) -> bool {
42 self.0.is_none()
43 }
44
45 pub async fn get<S>(&self, storage: &S, key: &K) -> Result<Option<V>, StorageError>
46 where
47 S: AnyBlockStorage,
48 {
49 self.open(storage).await?.get(key).await
50 }
51
52 pub async fn contains<S>(&self, storage: &S, key: &K) -> Result<bool, StorageError>
53 where
54 S: AnyBlockStorage,
55 {
56 self.open(storage).await?.contains_key(key).await
57 }
58
59 pub fn stream<S>(&self, storage: &S) -> impl Stream<Item = Result<(K, V), StorageError>> + '_
60 where
61 S: AnyBlockStorage,
62 {
63 let storage = storage.clone();
64 async_stream::try_stream! {
65 let tree = self.open(&storage).await?;
66 let stream = tree.stream();
67 for await item in stream {
68 yield item?;
69 }
70 }
71 }
72
73 pub async fn insert<S>(&mut self, storage: &S, key: K, value: V) -> Result<(), StorageError>
74 where
75 S: AnyBlockStorage,
76 {
77 self.with_transaction(storage, |mut transaction| async move {
78 transaction.insert(key, value).await?;
79 Ok(transaction)
80 })
81 .await
82 }
83
84 pub async fn remove<S>(&mut self, storage: &S, key: K) -> Result<Option<V>, StorageError>
85 where
86 S: AnyBlockStorage,
87 {
88 let mut transaction = self.open(storage).await?;
89 let result = transaction.remove(key).await?;
90 self.commit(transaction).await?;
91 Ok(result)
92 }
93
94 pub async fn update_or_insert<S, F>(&mut self, storage: &S, key: K, update: F) -> Result<(), StorageError>
96 where
97 V: Default,
98 F: FnOnce(&mut V) + Send,
99 S: AnyBlockStorage,
100 {
101 self.with_transaction(storage, |mut transaction| async move {
102 transaction.update_or_insert(key, update).await?;
103 Ok(transaction)
104 })
105 .await
106 }
107
108 pub async fn try_update_or_insert_async<S, F, Fut>(
110 &mut self,
111 storage: &S,
112 key: K,
113 update: F,
114 ) -> Result<(), StorageError>
115 where
116 V: Default,
117 F: FnOnce(V) -> Fut + Send,
118 Fut: Future<Output = Result<V, StorageError>> + Send,
119 S: AnyBlockStorage,
120 {
121 self.with_transaction(storage, |mut transaction| async move {
122 transaction.try_update_or_insert_async(key, update).await?;
123 Ok(transaction)
124 })
125 .await
126 }
127
128 pub async fn update<S, F>(&mut self, storage: &S, key: K, update: F) -> Result<(), StorageError>
130 where
131 F: FnOnce(&mut V) + Send,
132 S: AnyBlockStorage,
133 {
134 self.with_transaction(storage, |mut transaction| async move {
135 transaction.update(key, update).await?;
136 Ok(transaction)
137 })
138 .await
139 }
140
141 pub async fn try_update_async<S, F, Fut>(&mut self, storage: &S, key: K, update: F) -> Result<(), StorageError>
143 where
144 F: FnOnce(V) -> Fut + Send,
145 Fut: Future<Output = Result<V, StorageError>> + Send,
146 S: AnyBlockStorage,
147 {
148 self.with_transaction(storage, |mut transaction| async move {
149 transaction.try_update_async(key, update).await?;
150 Ok(transaction)
151 })
152 .await
153 }
154
155 pub async fn open_mut<'m, S>(&'m mut self, storage: &S) -> Result<CoMapMutTransaction<'m, S, K, V>, StorageError>
156 where
157 S: AnyBlockStorage,
158 {
159 Ok(CoMapMutTransaction { transaction: self.open(storage).await?, container: self })
160 }
161
162 pub async fn open<S>(&self, storage: &S) -> Result<CoMapTransaction<S, K, V>, StorageError>
163 where
164 S: AnyBlockStorage,
165 {
166 Ok(CoMapTransaction {
167 tree: match self.0.link() {
168 Some(root) => LsmTreeMap::load(storage.clone(), root).await?,
169 None => LsmTreeMap::new(storage.clone(), Default::default()),
170 },
171 })
172 }
173
174 pub async fn open_lazy<S>(&self, storage: &S) -> Result<LazyTransaction<S, Self>, StorageError>
175 where
176 S: AnyBlockStorage,
177 {
178 Ok(LazyTransaction::new(storage.clone(), self.clone()))
179 }
180
181 pub async fn commit<S>(&mut self, mut transaction: CoMapTransaction<S, K, V>) -> Result<(), StorageError>
183 where
184 S: AnyBlockStorage,
185 {
186 self.0 = transaction.tree.store().await?;
187 Ok(())
188 }
189
190 pub async fn with_transaction<S, F, Fut>(&mut self, storage: &S, update: F) -> Result<(), StorageError>
192 where
193 S: AnyBlockStorage,
194 F: FnOnce(CoMapTransaction<S, K, V>) -> Fut + Send,
195 Fut: Future<Output = Result<CoMapTransaction<S, K, V>, StorageError>> + Send,
196 {
197 let transaction = self.open(storage).await?;
198 let mut result = update(transaction).await?;
199 self.0 = result.tree.store().await?;
200 Ok(())
201 }
202}
203impl<K, V> Default for CoMap<K, V>
204where
205 K: Hash + Ord + Clone + Send + Sync + 'static,
206 V: Clone + Send + Sync + 'static,
207{
208 fn default() -> Self {
209 Self(Default::default())
210 }
211}
212impl<K, V> From<Option<Cid>> for CoMap<K, V>
213where
214 K: Hash + Ord + Clone + Send + Sync + 'static,
215 V: Clone + Send + Sync + 'static,
216{
217 fn from(cid: Option<Cid>) -> Self {
218 Self(cid.into())
219 }
220}
221impl<K, V> From<&CoMap<K, V>> for Option<Cid>
222where
223 K: Hash + Ord + Clone + Send + Sync + 'static,
224 V: Clone + Send + Sync + 'static,
225{
226 fn from(value: &CoMap<K, V>) -> Self {
227 *value.0.cid()
228 }
229}
230#[async_trait]
231impl<S, K, V> Transactionable<S> for CoMap<K, V>
232where
233 S: AnyBlockStorage,
234 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
235 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
236{
237 type Transaction = CoMapTransaction<S, K, V>;
238
239 async fn open(&self, storage: &S) -> Result<Self::Transaction, StorageError> {
240 CoMap::open(self, storage).await
241 }
242}
243impl<S, K, V> Streamable<S> for CoMap<K, V>
244where
245 S: AnyBlockStorage,
246 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
247 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
248{
249 type Item = Result<(K, V), StorageError>;
250 type Stream = BoxStream<'static, Self::Item>;
251
252 fn stream(&self, storage: S) -> Self::Stream {
253 let collection = self.clone();
254 async_stream::try_stream! {
255 let transaction = collection.open(&storage).await?;
256 let stream = transaction.stream();
257 for await item in stream {
258 yield item?;
259 }
260 }
261 .boxed()
262 }
263}
264
265pub struct CoMapMutTransaction<'m, S, K, V>
266where
267 S: AnyBlockStorage,
268 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
269 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
270{
271 container: &'m mut CoMap<K, V>,
272 transaction: CoMapTransaction<S, K, V>,
273}
274impl<'m, S, K, V> CoMapMutTransaction<'m, S, K, V>
275where
276 S: AnyBlockStorage,
277 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
278 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
279{
280 pub async fn commit(mut self) -> Result<(), StorageError> {
281 self.container.0 = self.transaction.tree.store().await?;
282 Ok(())
283 }
284}
285impl<'m, S, K, V> CoMapMutTransaction<'m, S, K, V>
286where
287 S: AnyBlockStorage,
288 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
289 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
290{
291 pub async fn get(&self, key: &K) -> Result<Option<V>, StorageError> {
292 self.transaction.get(key).await
293 }
294
295 pub async fn contains_key(&self, key: &K) -> Result<bool, StorageError> {
296 self.transaction.contains_key(key).await
297 }
298
299 pub fn stream(&self) -> impl Stream<Item = Result<(K, V), StorageError>> + '_ {
300 self.transaction.stream()
301 }
302
303 pub async fn insert(&mut self, key: K, value: V) -> Result<(), StorageError> {
304 self.transaction.insert(key, value).await
305 }
306
307 pub async fn remove(&mut self, key: K) -> Result<Option<V>, StorageError> {
308 self.transaction.remove(key).await
309 }
310
311 pub async fn try_update_or_insert_async<F, Fut>(&mut self, key: K, update: F) -> Result<(), StorageError>
313 where
314 V: Default,
315 F: FnOnce(V) -> Fut + Send,
316 Fut: Future<Output = Result<V, StorageError>> + Send,
317 {
318 self.transaction.try_update_or_insert_async(key, update).await
319 }
320
321 pub async fn store(&mut self) -> Result<CoMap<K, V>, StorageError> {
323 self.transaction.store().await
324 }
325}
326
327#[derive(Clone)]
328pub struct CoMapTransaction<S, K, V>
329where
330 S: AnyBlockStorage,
331 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
332 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
333{
334 tree: LsmTreeMap<S, K, V>,
335}
336impl<S, K, V> CoMapTransaction<S, K, V>
337where
338 S: AnyBlockStorage,
339 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
340 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
341{
342 pub async fn get(&self, key: &K) -> Result<Option<V>, StorageError> {
343 self.tree.get(key).await
344 }
345
346 pub async fn contains_key(&self, key: &K) -> Result<bool, StorageError> {
347 self.tree.contains_key(key).await
348 }
349
350 pub fn stream(&self) -> impl Stream<Item = Result<(K, V), StorageError>> + use<S, K, V> {
351 self.tree.stream()
352 }
353
354 pub fn stream_filter<F: FnMut(&V) -> bool>(
355 &self,
356 mut predicate: F,
357 ) -> impl Stream<Item = Result<K, StorageError>> + use<S, K, V, F> {
358 self.stream()
359 .try_filter_map(move |(key, value)| ready(Ok(if predicate(&value) { Some(key) } else { None })))
360 }
361
362 pub async fn insert(&mut self, key: K, value: V) -> Result<(), StorageError> {
363 self.tree.insert(key, value).await
364 }
365
366 pub async fn remove(&mut self, key: K) -> Result<Option<V>, StorageError> {
367 if let Some(value) = self.tree.get(&key).await? {
368 self.tree.remove(key).await?;
369 Ok(Some(value))
370 } else {
371 Ok(None)
372 }
373 }
374
375 pub async fn update_or_insert<F>(&mut self, key: K, update: F) -> Result<(), StorageError>
377 where
378 V: Default,
379 F: FnOnce(&mut V) + Send,
380 {
381 let mut item = self.get(&key).await?.unwrap_or_default();
382 update(&mut item);
383 self.insert(key, item).await?;
384 Ok(())
385 }
386
387 pub async fn try_update_or_insert_async<F, Fut>(&mut self, key: K, update: F) -> Result<(), StorageError>
389 where
390 V: Default,
391 F: FnOnce(V) -> Fut + Send,
392 Fut: Future<Output = Result<V, StorageError>> + Send,
393 {
394 let item = self.get(&key).await?.unwrap_or_default();
395 let next_item = update(item).await?;
396 self.insert(key, next_item).await?;
397 Ok(())
398 }
399
400 pub async fn update<F>(&mut self, key: K, update: F) -> Result<(), StorageError>
402 where
403 F: FnOnce(&mut V) + Send,
404 {
405 if let Some(mut item) = self.get(&key).await? {
406 update(&mut item);
407 self.insert(key, item).await?;
408 }
409 Ok(())
410 }
411
412 pub async fn try_update_async<F, Fut>(&mut self, key: K, update: F) -> Result<(), StorageError>
414 where
415 F: FnOnce(V) -> Fut + Send,
416 Fut: Future<Output = Result<V, StorageError>> + Send,
417 {
418 if let Some(item) = self.get(&key).await? {
419 let next_item = update(item).await?;
420 self.insert(key, next_item).await?;
421 }
422 Ok(())
423 }
424
425 pub async fn update_stream(
426 &mut self,
427 keys_to_update: impl Stream<Item = Result<K, StorageError>>,
428 mut update: impl FnMut(&K, &mut V) + Send,
429 ) -> Result<(), StorageError> {
430 pin_mut!(keys_to_update);
431 while let Some(key) = keys_to_update.try_next().await? {
432 if let Some(mut value) = self.get(&key).await? {
433 (update)(&key, &mut value);
434 self.insert(key, value).await?;
435 }
436 }
437 Ok(())
438 }
439
440 pub async fn remove_stream(
441 &mut self,
442 keys_to_remove: impl Stream<Item = Result<K, StorageError>>,
443 ) -> Result<(), StorageError> {
444 pin_mut!(keys_to_remove);
445 while let Some(key) = keys_to_remove.try_next().await? {
446 self.remove(key).await?;
447 }
448 Ok(())
449 }
450
451 pub async fn store(&mut self) -> Result<CoMap<K, V>, StorageError> {
453 let link = self.tree.store().await?;
454 Ok(CoMap(link))
455 }
456}
457
458#[cfg(test)]
459mod tests {
460 use crate::{library::test::TestStorage, CoMap};
461 use futures::TryStreamExt;
462 use std::time::SystemTime;
463
464 #[tokio::test]
465 async fn smoke() {
466 let storage = TestStorage::default();
467 let mut map = CoMap::<i32, i32>::default();
468 let mut transaction = map.open(&storage).await.unwrap();
469 transaction.insert(1, 1).await.unwrap();
470 transaction.insert(2, 2).await.unwrap();
471 map.commit(transaction).await.unwrap();
472 assert_eq!(map.stream(&storage).try_collect::<Vec<(i32, i32)>>().await.unwrap(), vec![(1, 1), (2, 2)]);
473 }
474
475 const BENCHMARK_REPEATS: i32 = 1000;
476 #[tokio::test]
477 async fn benchmark_transactional() {
478 let ts = SystemTime::now();
479 let storage = TestStorage::default();
480 let mut map = CoMap::<i32, i32>::default();
481 let mut transaction = map.open(&storage).await.unwrap();
482 for i in 0..BENCHMARK_REPEATS {
483 transaction.insert(i, i).await.unwrap();
484 }
485 map.commit(transaction).await.unwrap();
486 println!(
487 "{} insert transactions done in: {:?} seconds",
488 BENCHMARK_REPEATS,
489 SystemTime::now().duration_since(ts).unwrap().as_secs_f32()
490 );
491 }
492
493 #[tokio::test]
494 async fn benchmark_pure() {
495 let ts = SystemTime::now();
496 let storage = TestStorage::default();
497 let mut map = CoMap::<i32, i32>::default();
498 for i in 0..BENCHMARK_REPEATS {
499 map.insert(&storage, i, i).await.unwrap();
500 }
501 println!(
502 "{} pure inserts done in: {:?} seconds",
503 BENCHMARK_REPEATS,
504 SystemTime::now().duration_since(ts).unwrap().as_secs_f32()
505 );
506 }
507}