Skip to main content

co_primitives/types/
co_map.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2026 1io BRANDGUARDIAN GmbH
3
4use super::lazy_transaction::Transactionable;
5use crate::{
6	library::lsm_tree_map::Root, AnyBlockStorage, LazyTransaction, LsmTreeMap, OptionLink, StorageError, Streamable,
7};
8use async_trait::async_trait;
9use cid::Cid;
10use futures::{pin_mut, stream::BoxStream, Stream, StreamExt, TryStreamExt};
11use serde::{de::DeserializeOwned, Deserialize, Serialize};
12use std::{
13	future::{ready, Future},
14	hash::Hash,
15};
16
17#[derive(Debug, Clone, Serialize, Deserialize, Hash, PartialEq, Eq, PartialOrd, Ord)]
18#[serde(transparent)]
19pub struct CoMap<K, V>(OptionLink<Root<K, V>>)
20where
21	K: Hash + Ord + Clone + Send + Sync + 'static,
22	V: Clone + Send + Sync + 'static;
23impl<K, V> CoMap<K, V>
24where
25	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
26	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
27{
28	/// Create collection from iterator.
29	pub async fn from_iter<S>(storage: &S, iter: impl IntoIterator<Item = (K, V)>) -> Result<Self, StorageError>
30	where
31		S: AnyBlockStorage,
32	{
33		let mut transaction = Self::default().open(storage).await?;
34		for (key, value) in iter.into_iter() {
35			transaction.insert(key, value).await?;
36		}
37		transaction.store().await
38	}
39
40	/// Whether this collection is empty.
41	pub fn is_empty(&self) -> bool {
42		self.0.is_none()
43	}
44
45	pub async fn get<S>(&self, storage: &S, key: &K) -> Result<Option<V>, StorageError>
46	where
47		S: AnyBlockStorage,
48	{
49		self.open(storage).await?.get(key).await
50	}
51
52	pub async fn contains<S>(&self, storage: &S, key: &K) -> Result<bool, StorageError>
53	where
54		S: AnyBlockStorage,
55	{
56		self.open(storage).await?.contains_key(key).await
57	}
58
59	pub fn stream<S>(&self, storage: &S) -> impl Stream<Item = Result<(K, V), StorageError>> + '_
60	where
61		S: AnyBlockStorage,
62	{
63		let storage = storage.clone();
64		async_stream::try_stream! {
65			let tree = self.open(&storage).await?;
66			let stream = tree.stream();
67			for await item in stream {
68				yield item?;
69			}
70		}
71	}
72
73	pub async fn insert<S>(&mut self, storage: &S, key: K, value: V) -> Result<(), StorageError>
74	where
75		S: AnyBlockStorage,
76	{
77		self.with_transaction(storage, |mut transaction| async move {
78			transaction.insert(key, value).await?;
79			Ok(transaction)
80		})
81		.await
82	}
83
84	pub async fn remove<S>(&mut self, storage: &S, key: K) -> Result<Option<V>, StorageError>
85	where
86		S: AnyBlockStorage,
87	{
88		let mut transaction = self.open(storage).await?;
89		let result = transaction.remove(key).await?;
90		self.commit(transaction).await?;
91		Ok(result)
92	}
93
94	/// Update (or insert default) value.
95	pub async fn update_or_insert<S, F>(&mut self, storage: &S, key: K, update: F) -> Result<(), StorageError>
96	where
97		V: Default,
98		F: FnOnce(&mut V) + Send,
99		S: AnyBlockStorage,
100	{
101		self.with_transaction(storage, |mut transaction| async move {
102			transaction.update_or_insert(key, update).await?;
103			Ok(transaction)
104		})
105		.await
106	}
107
108	/// Update (or insert default) value.
109	pub async fn try_update_or_insert_async<S, F, Fut>(
110		&mut self,
111		storage: &S,
112		key: K,
113		update: F,
114	) -> Result<(), StorageError>
115	where
116		V: Default,
117		F: FnOnce(V) -> Fut + Send,
118		Fut: Future<Output = Result<V, StorageError>> + Send,
119		S: AnyBlockStorage,
120	{
121		self.with_transaction(storage, |mut transaction| async move {
122			transaction.try_update_or_insert_async(key, update).await?;
123			Ok(transaction)
124		})
125		.await
126	}
127
128	/// Update value ignore if key not exists.
129	pub async fn update<S, F>(&mut self, storage: &S, key: K, update: F) -> Result<(), StorageError>
130	where
131		F: FnOnce(&mut V) + Send,
132		S: AnyBlockStorage,
133	{
134		self.with_transaction(storage, |mut transaction| async move {
135			transaction.update(key, update).await?;
136			Ok(transaction)
137		})
138		.await
139	}
140
141	/// Update (or insert default) value.
142	pub async fn try_update_async<S, F, Fut>(&mut self, storage: &S, key: K, update: F) -> Result<(), StorageError>
143	where
144		F: FnOnce(V) -> Fut + Send,
145		Fut: Future<Output = Result<V, StorageError>> + Send,
146		S: AnyBlockStorage,
147	{
148		self.with_transaction(storage, |mut transaction| async move {
149			transaction.try_update_async(key, update).await?;
150			Ok(transaction)
151		})
152		.await
153	}
154
155	pub async fn open_mut<'m, S>(&'m mut self, storage: &S) -> Result<CoMapMutTransaction<'m, S, K, V>, StorageError>
156	where
157		S: AnyBlockStorage,
158	{
159		Ok(CoMapMutTransaction { transaction: self.open(storage).await?, container: self })
160	}
161
162	pub async fn open<S>(&self, storage: &S) -> Result<CoMapTransaction<S, K, V>, StorageError>
163	where
164		S: AnyBlockStorage,
165	{
166		Ok(CoMapTransaction {
167			tree: match self.0.link() {
168				Some(root) => LsmTreeMap::load(storage.clone(), root).await?,
169				None => LsmTreeMap::new(storage.clone(), Default::default()),
170			},
171		})
172	}
173
174	pub async fn open_lazy<S>(&self, storage: &S) -> Result<LazyTransaction<S, Self>, StorageError>
175	where
176		S: AnyBlockStorage,
177	{
178		Ok(LazyTransaction::new(storage.clone(), self.clone()))
179	}
180
181	/// Commit transaction to this map.
182	pub async fn commit<S>(&mut self, mut transaction: CoMapTransaction<S, K, V>) -> Result<(), StorageError>
183	where
184		S: AnyBlockStorage,
185	{
186		self.0 = transaction.tree.store().await?;
187		Ok(())
188	}
189
190	/// Open transaction, apply `update` and store it.
191	pub async fn with_transaction<S, F, Fut>(&mut self, storage: &S, update: F) -> Result<(), StorageError>
192	where
193		S: AnyBlockStorage,
194		F: FnOnce(CoMapTransaction<S, K, V>) -> Fut + Send,
195		Fut: Future<Output = Result<CoMapTransaction<S, K, V>, StorageError>> + Send,
196	{
197		let transaction = self.open(storage).await?;
198		let mut result = update(transaction).await?;
199		self.0 = result.tree.store().await?;
200		Ok(())
201	}
202}
203impl<K, V> Default for CoMap<K, V>
204where
205	K: Hash + Ord + Clone + Send + Sync + 'static,
206	V: Clone + Send + Sync + 'static,
207{
208	fn default() -> Self {
209		Self(Default::default())
210	}
211}
212impl<K, V> From<Option<Cid>> for CoMap<K, V>
213where
214	K: Hash + Ord + Clone + Send + Sync + 'static,
215	V: Clone + Send + Sync + 'static,
216{
217	fn from(cid: Option<Cid>) -> Self {
218		Self(cid.into())
219	}
220}
221impl<K, V> From<&CoMap<K, V>> for Option<Cid>
222where
223	K: Hash + Ord + Clone + Send + Sync + 'static,
224	V: Clone + Send + Sync + 'static,
225{
226	fn from(value: &CoMap<K, V>) -> Self {
227		*value.0.cid()
228	}
229}
230#[async_trait]
231impl<S, K, V> Transactionable<S> for CoMap<K, V>
232where
233	S: AnyBlockStorage,
234	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
235	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
236{
237	type Transaction = CoMapTransaction<S, K, V>;
238
239	async fn open(&self, storage: &S) -> Result<Self::Transaction, StorageError> {
240		CoMap::open(self, storage).await
241	}
242}
243impl<S, K, V> Streamable<S> for CoMap<K, V>
244where
245	S: AnyBlockStorage,
246	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
247	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
248{
249	type Item = Result<(K, V), StorageError>;
250	type Stream = BoxStream<'static, Self::Item>;
251
252	fn stream(&self, storage: S) -> Self::Stream {
253		let collection = self.clone();
254		async_stream::try_stream! {
255			let transaction = collection.open(&storage).await?;
256			let stream = transaction.stream();
257			for await item in stream {
258				yield item?;
259			}
260		}
261		.boxed()
262	}
263}
264
265pub struct CoMapMutTransaction<'m, S, K, V>
266where
267	S: AnyBlockStorage,
268	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
269	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
270{
271	container: &'m mut CoMap<K, V>,
272	transaction: CoMapTransaction<S, K, V>,
273}
274impl<'m, S, K, V> CoMapMutTransaction<'m, S, K, V>
275where
276	S: AnyBlockStorage,
277	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
278	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
279{
280	pub async fn commit(mut self) -> Result<(), StorageError> {
281		self.container.0 = self.transaction.tree.store().await?;
282		Ok(())
283	}
284}
285impl<'m, S, K, V> CoMapMutTransaction<'m, S, K, V>
286where
287	S: AnyBlockStorage,
288	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
289	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
290{
291	pub async fn get(&self, key: &K) -> Result<Option<V>, StorageError> {
292		self.transaction.get(key).await
293	}
294
295	pub async fn contains_key(&self, key: &K) -> Result<bool, StorageError> {
296		self.transaction.contains_key(key).await
297	}
298
299	pub fn stream(&self) -> impl Stream<Item = Result<(K, V), StorageError>> + '_ {
300		self.transaction.stream()
301	}
302
303	pub async fn insert(&mut self, key: K, value: V) -> Result<(), StorageError> {
304		self.transaction.insert(key, value).await
305	}
306
307	pub async fn remove(&mut self, key: K) -> Result<Option<V>, StorageError> {
308		self.transaction.remove(key).await
309	}
310
311	/// Update (or insert default) value.
312	pub async fn try_update_or_insert_async<F, Fut>(&mut self, key: K, update: F) -> Result<(), StorageError>
313	where
314		V: Default,
315		F: FnOnce(V) -> Fut + Send,
316		Fut: Future<Output = Result<V, StorageError>> + Send,
317	{
318		self.transaction.try_update_or_insert_async(key, update).await
319	}
320
321	/// Store as new CoMap
322	pub async fn store(&mut self) -> Result<CoMap<K, V>, StorageError> {
323		self.transaction.store().await
324	}
325}
326
327#[derive(Clone)]
328pub struct CoMapTransaction<S, K, V>
329where
330	S: AnyBlockStorage,
331	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
332	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
333{
334	tree: LsmTreeMap<S, K, V>,
335}
336impl<S, K, V> CoMapTransaction<S, K, V>
337where
338	S: AnyBlockStorage,
339	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
340	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
341{
342	pub async fn get(&self, key: &K) -> Result<Option<V>, StorageError> {
343		self.tree.get(key).await
344	}
345
346	pub async fn contains_key(&self, key: &K) -> Result<bool, StorageError> {
347		self.tree.contains_key(key).await
348	}
349
350	pub fn stream(&self) -> impl Stream<Item = Result<(K, V), StorageError>> + use<S, K, V> {
351		self.tree.stream()
352	}
353
354	pub fn stream_filter<F: FnMut(&V) -> bool>(
355		&self,
356		mut predicate: F,
357	) -> impl Stream<Item = Result<K, StorageError>> + use<S, K, V, F> {
358		self.stream()
359			.try_filter_map(move |(key, value)| ready(Ok(if predicate(&value) { Some(key) } else { None })))
360	}
361
362	pub async fn insert(&mut self, key: K, value: V) -> Result<(), StorageError> {
363		self.tree.insert(key, value).await
364	}
365
366	pub async fn remove(&mut self, key: K) -> Result<Option<V>, StorageError> {
367		if let Some(value) = self.tree.get(&key).await? {
368			self.tree.remove(key).await?;
369			Ok(Some(value))
370		} else {
371			Ok(None)
372		}
373	}
374
375	/// Update (or insert default) value.
376	pub async fn update_or_insert<F>(&mut self, key: K, update: F) -> Result<(), StorageError>
377	where
378		V: Default,
379		F: FnOnce(&mut V) + Send,
380	{
381		let mut item = self.get(&key).await?.unwrap_or_default();
382		update(&mut item);
383		self.insert(key, item).await?;
384		Ok(())
385	}
386
387	/// Update (or insert default) value.
388	pub async fn try_update_or_insert_async<F, Fut>(&mut self, key: K, update: F) -> Result<(), StorageError>
389	where
390		V: Default,
391		F: FnOnce(V) -> Fut + Send,
392		Fut: Future<Output = Result<V, StorageError>> + Send,
393	{
394		let item = self.get(&key).await?.unwrap_or_default();
395		let next_item = update(item).await?;
396		self.insert(key, next_item).await?;
397		Ok(())
398	}
399
400	/// Update value, ignore if key not exists.
401	pub async fn update<F>(&mut self, key: K, update: F) -> Result<(), StorageError>
402	where
403		F: FnOnce(&mut V) + Send,
404	{
405		if let Some(mut item) = self.get(&key).await? {
406			update(&mut item);
407			self.insert(key, item).await?;
408		}
409		Ok(())
410	}
411
412	/// Update value, ignore if key not exists.
413	pub async fn try_update_async<F, Fut>(&mut self, key: K, update: F) -> Result<(), StorageError>
414	where
415		F: FnOnce(V) -> Fut + Send,
416		Fut: Future<Output = Result<V, StorageError>> + Send,
417	{
418		if let Some(item) = self.get(&key).await? {
419			let next_item = update(item).await?;
420			self.insert(key, next_item).await?;
421		}
422		Ok(())
423	}
424
425	pub async fn update_stream(
426		&mut self,
427		keys_to_update: impl Stream<Item = Result<K, StorageError>>,
428		mut update: impl FnMut(&K, &mut V) + Send,
429	) -> Result<(), StorageError> {
430		pin_mut!(keys_to_update);
431		while let Some(key) = keys_to_update.try_next().await? {
432			if let Some(mut value) = self.get(&key).await? {
433				(update)(&key, &mut value);
434				self.insert(key, value).await?;
435			}
436		}
437		Ok(())
438	}
439
440	pub async fn remove_stream(
441		&mut self,
442		keys_to_remove: impl Stream<Item = Result<K, StorageError>>,
443	) -> Result<(), StorageError> {
444		pin_mut!(keys_to_remove);
445		while let Some(key) = keys_to_remove.try_next().await? {
446			self.remove(key).await?;
447		}
448		Ok(())
449	}
450
451	/// Store as new CoMap
452	pub async fn store(&mut self) -> Result<CoMap<K, V>, StorageError> {
453		let link = self.tree.store().await?;
454		Ok(CoMap(link))
455	}
456}
457
458#[cfg(test)]
459mod tests {
460	use crate::{library::test::TestStorage, CoMap};
461	use futures::TryStreamExt;
462	use std::time::SystemTime;
463
464	#[tokio::test]
465	async fn smoke() {
466		let storage = TestStorage::default();
467		let mut map = CoMap::<i32, i32>::default();
468		let mut transaction = map.open(&storage).await.unwrap();
469		transaction.insert(1, 1).await.unwrap();
470		transaction.insert(2, 2).await.unwrap();
471		map.commit(transaction).await.unwrap();
472		assert_eq!(map.stream(&storage).try_collect::<Vec<(i32, i32)>>().await.unwrap(), vec![(1, 1), (2, 2)]);
473	}
474
475	const BENCHMARK_REPEATS: i32 = 1000;
476	#[tokio::test]
477	async fn benchmark_transactional() {
478		let ts = SystemTime::now();
479		let storage = TestStorage::default();
480		let mut map = CoMap::<i32, i32>::default();
481		let mut transaction = map.open(&storage).await.unwrap();
482		for i in 0..BENCHMARK_REPEATS {
483			transaction.insert(i, i).await.unwrap();
484		}
485		map.commit(transaction).await.unwrap();
486		println!(
487			"{} insert transactions done in: {:?} seconds",
488			BENCHMARK_REPEATS,
489			SystemTime::now().duration_since(ts).unwrap().as_secs_f32()
490		);
491	}
492
493	#[tokio::test]
494	async fn benchmark_pure() {
495		let ts = SystemTime::now();
496		let storage = TestStorage::default();
497		let mut map = CoMap::<i32, i32>::default();
498		for i in 0..BENCHMARK_REPEATS {
499			map.insert(&storage, i, i).await.unwrap();
500		}
501		println!(
502			"{} pure inserts done in: {:?} seconds",
503			BENCHMARK_REPEATS,
504			SystemTime::now().duration_since(ts).unwrap().as_secs_f32()
505		);
506	}
507}