Skip to main content

co_primitives/types/
co_set.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2026 1io BRANDGUARDIAN GmbH
3
4use super::lazy_transaction::Transactionable;
5use crate::{
6	library::lsm_tree_map::Root, AnyBlockStorage, LazyTransaction, LsmTreeMap, OptionLink, StorageError, Streamable,
7};
8use async_trait::async_trait;
9use cid::Cid;
10use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
11use serde::{de::DeserializeOwned, Deserialize, Serialize};
12use std::{fmt::Debug, future::Future, hash::Hash};
13
14#[derive(Debug, Clone, Serialize, Deserialize, Hash, PartialEq, Eq, PartialOrd, Ord)]
15#[serde(transparent)]
16pub struct CoSet<K>(OptionLink<Root<K, SetValZST>>)
17where
18	K: Hash + Ord + Clone + Send + Sync + 'static;
19impl<K> CoSet<K>
20where
21	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
22{
23	/// Create collection from iterator.
24	pub async fn from_iter<S>(storage: &S, iter: impl IntoIterator<Item = K>) -> Result<Self, StorageError>
25	where
26		S: AnyBlockStorage,
27	{
28		let mut transaction = Self::default().open(storage).await?;
29		for key in iter.into_iter() {
30			transaction.insert(key).await?;
31		}
32		transaction.store().await
33	}
34
35	/// Whether this collection is empty.
36	pub fn is_empty(&self) -> bool {
37		self.0.is_none()
38	}
39
40	pub async fn contains<S>(&self, storage: &S, key: &K) -> Result<bool, StorageError>
41	where
42		S: AnyBlockStorage,
43	{
44		self.open(storage).await?.contains(key).await
45	}
46
47	pub fn stream<S>(&self, storage: &S) -> impl Stream<Item = Result<K, StorageError>> + '_
48	where
49		S: AnyBlockStorage,
50	{
51		let storage = storage.clone();
52		async_stream::try_stream! {
53			let transaction = self.open(&storage).await?;
54			let stream = transaction.stream();
55			for await item in stream {
56				yield item?;
57			}
58		}
59	}
60
61	pub fn into_stream<S>(self, storage: S) -> impl Stream<Item = Result<K, StorageError>> + 'static
62	where
63		S: AnyBlockStorage,
64	{
65		let storage = storage.clone();
66		async_stream::try_stream! {
67			let transaction = self.open(&storage).await?;
68			let stream = transaction.stream();
69			for await item in stream {
70				yield item?;
71			}
72		}
73	}
74
75	pub async fn insert<S>(&mut self, storage: &S, key: K) -> Result<(), StorageError>
76	where
77		S: AnyBlockStorage,
78	{
79		let mut transaction = self.open(storage).await?;
80		transaction.insert(key).await?;
81		self.commit(transaction).await?;
82		Ok(())
83	}
84
85	/// Remove key from set and return `true` if it was present.
86	pub async fn remove<S>(&mut self, storage: &S, key: K) -> Result<bool, StorageError>
87	where
88		S: AnyBlockStorage,
89	{
90		let mut transaction = self.open(storage).await?;
91		let result = transaction.remove(key).await?;
92		if result {
93			self.commit(transaction).await?;
94		}
95		Ok(result)
96	}
97
98	pub async fn open<S>(&self, storage: &S) -> Result<CoSetTransaction<S, K>, StorageError>
99	where
100		S: AnyBlockStorage,
101	{
102		Ok(CoSetTransaction {
103			tree: match self.0.link() {
104				Some(root) => LsmTreeMap::load(storage.clone(), root).await?,
105				None => LsmTreeMap::new(storage.clone(), Default::default()),
106			},
107		})
108	}
109
110	pub async fn open_lazy<S>(&self, storage: &S) -> Result<LazyTransaction<S, Self>, StorageError>
111	where
112		S: AnyBlockStorage,
113	{
114		Ok(LazyTransaction::new(storage.clone(), self.clone()))
115	}
116
117	/// Commit transaction to this map.
118	pub async fn commit<S>(&mut self, mut transaction: CoSetTransaction<S, K>) -> Result<(), StorageError>
119	where
120		S: AnyBlockStorage,
121	{
122		self.0 = transaction.tree.store().await?;
123		Ok(())
124	}
125
126	/// Open transaction, apply `update` and store it.
127	pub async fn with_transaction<S, F, Fut>(&mut self, storage: &S, update: F) -> Result<(), StorageError>
128	where
129		S: AnyBlockStorage,
130		F: FnOnce(CoSetTransaction<S, K>) -> Fut + Send,
131		Fut: Future<Output = Result<CoSetTransaction<S, K>, StorageError>> + Send,
132	{
133		let transaction = self.open(storage).await?;
134		let mut result = update(transaction).await?;
135		self.0 = result.tree.store().await?;
136		Ok(())
137	}
138}
139impl<K> Default for CoSet<K>
140where
141	K: Hash + Ord + Clone + Send + Sync + 'static,
142{
143	fn default() -> Self {
144		Self(Default::default())
145	}
146}
147impl<K> From<Option<Cid>> for CoSet<K>
148where
149	K: Hash + Ord + Clone + Send + Sync + 'static,
150{
151	fn from(value: Option<Cid>) -> Self {
152		Self(value.into())
153	}
154}
155impl<K> From<&CoSet<K>> for Option<Cid>
156where
157	K: Hash + Ord + Clone + Send + Sync + 'static,
158{
159	fn from(value: &CoSet<K>) -> Self {
160		*value.0.cid()
161	}
162}
163#[async_trait]
164impl<S, K> Transactionable<S> for CoSet<K>
165where
166	S: AnyBlockStorage,
167	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
168{
169	type Transaction = CoSetTransaction<S, K>;
170
171	async fn open(&self, storage: &S) -> Result<Self::Transaction, StorageError> {
172		CoSet::open(self, storage).await
173	}
174}
175impl<S, K> Streamable<S> for CoSet<K>
176where
177	S: AnyBlockStorage,
178	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
179{
180	type Item = Result<K, StorageError>;
181	type Stream = BoxStream<'static, Self::Item>;
182
183	fn stream(&self, storage: S) -> Self::Stream {
184		let collection = self.clone();
185		async_stream::try_stream! {
186			let transaction = collection.open(&storage).await?;
187			let stream = transaction.stream();
188			for await item in stream {
189				yield item?;
190			}
191		}
192		.boxed()
193	}
194}
195
196#[derive(Clone)]
197pub struct CoSetTransaction<S, K>
198where
199	S: AnyBlockStorage,
200	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
201{
202	tree: LsmTreeMap<S, K, SetValZST>,
203}
204impl<S, K> CoSetTransaction<S, K>
205where
206	S: AnyBlockStorage,
207	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
208{
209	pub async fn contains(&self, key: &K) -> Result<bool, StorageError> {
210		self.tree.contains_key(key).await
211	}
212
213	pub fn stream(&self) -> impl Stream<Item = Result<K, StorageError>> + '_ {
214		self.tree.stream().map_ok(|(key, _)| key)
215	}
216
217	pub async fn insert(&mut self, key: K) -> Result<(), StorageError> {
218		self.tree.insert(key, SetValZST).await
219	}
220
221	/// Remove key from set and return `true` if it was present.
222	pub async fn remove(&mut self, key: K) -> Result<bool, StorageError> {
223		if (self.tree.get(&key).await?).is_some() {
224			self.tree.remove(key).await?;
225			Ok(true)
226		} else {
227			Ok(false)
228		}
229	}
230
231	/// Store as new CoSet
232	pub async fn store(&mut self) -> Result<CoSet<K>, StorageError> {
233		let link = self.tree.store().await?;
234		Ok(CoSet(link))
235	}
236}
237
238/// Zero-Sized Type (ZST) for internal `CoSet` values.
239/// Used instead of `()` to differentiate between:
240/// * `CoSet<T, ()>` (possible user-defined map)
241/// * `CoSet<T, SetValZST>` (internal set representation)
242#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Default, Serialize, Deserialize)]
243struct SetValZST;
244
245#[cfg(test)]
246mod tests {
247	use crate::{library::test::TestStorage, CoSet};
248	use futures::TryStreamExt;
249
250	#[tokio::test]
251	async fn smoke() {
252		let storage = TestStorage::default();
253		let mut set = CoSet::<i32>::default();
254		let mut transaction = set.open(&storage).await.unwrap();
255		transaction.insert(1).await.unwrap();
256		transaction.insert(2).await.unwrap();
257		set.commit(transaction).await.unwrap();
258		assert_eq!(set.stream(&storage).try_collect::<Vec<i32>>().await.unwrap(), vec![1, 2]);
259	}
260
261	#[tokio::test]
262	async fn test_remove() {
263		let storage = TestStorage::default();
264		let mut set = CoSet::<i32>::default();
265		let mut transaction = set.open(&storage).await.unwrap();
266		transaction.insert(1).await.unwrap();
267		transaction.insert(2).await.unwrap();
268		transaction.insert(3).await.unwrap();
269		transaction.remove(1).await.unwrap();
270		set.commit(transaction).await.unwrap();
271		assert_eq!(set.stream(&storage).try_collect::<Vec<i32>>().await.unwrap(), vec![2, 3]);
272
273		let mut transaction = set.open(&storage).await.unwrap();
274		transaction.remove(3).await.unwrap();
275		set.commit(transaction).await.unwrap();
276		assert_eq!(set.stream(&storage).try_collect::<Vec<i32>>().await.unwrap(), vec![2]);
277	}
278
279	#[tokio::test]
280	async fn test_remove_large() {
281		let storage = TestStorage::default();
282		let mut set = CoSet::<i32>::default();
283		let mut transaction = set.open(&storage).await.unwrap();
284		let range = 0..131072;
285		for i in range.clone() {
286			transaction.insert(i).await.unwrap();
287		}
288		set.commit(transaction).await.unwrap();
289		let mut expect = range.collect::<Vec<i32>>();
290		assert_eq!(set.stream(&storage).try_collect::<Vec<i32>>().await.unwrap(), expect);
291
292		let mut transaction = set.open(&storage).await.unwrap();
293		transaction.remove(10).await.unwrap();
294		set.commit(transaction).await.unwrap();
295		expect.remove(10);
296		assert_eq!(set.stream(&storage).try_collect::<Vec<i32>>().await.unwrap(), expect);
297	}
298}