1use super::lazy_transaction::Transactionable;
5use crate::{
6 library::lsm_tree_map::Root, AnyBlockStorage, LazyTransaction, LsmTreeMap, OptionLink, StorageError, Streamable,
7};
8use async_trait::async_trait;
9use cid::Cid;
10use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
11use serde::{de::DeserializeOwned, Deserialize, Serialize};
12use std::{fmt::Debug, future::Future, hash::Hash};
13
14#[derive(Debug, Clone, Serialize, Deserialize, Hash, PartialEq, Eq, PartialOrd, Ord)]
15#[serde(transparent)]
16pub struct CoSet<K>(OptionLink<Root<K, SetValZST>>)
17where
18 K: Hash + Ord + Clone + Send + Sync + 'static;
19impl<K> CoSet<K>
20where
21 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
22{
23 pub async fn from_iter<S>(storage: &S, iter: impl IntoIterator<Item = K>) -> Result<Self, StorageError>
25 where
26 S: AnyBlockStorage,
27 {
28 let mut transaction = Self::default().open(storage).await?;
29 for key in iter.into_iter() {
30 transaction.insert(key).await?;
31 }
32 transaction.store().await
33 }
34
35 pub fn is_empty(&self) -> bool {
37 self.0.is_none()
38 }
39
40 pub async fn contains<S>(&self, storage: &S, key: &K) -> Result<bool, StorageError>
41 where
42 S: AnyBlockStorage,
43 {
44 self.open(storage).await?.contains(key).await
45 }
46
47 pub fn stream<S>(&self, storage: &S) -> impl Stream<Item = Result<K, StorageError>> + '_
48 where
49 S: AnyBlockStorage,
50 {
51 let storage = storage.clone();
52 async_stream::try_stream! {
53 let transaction = self.open(&storage).await?;
54 let stream = transaction.stream();
55 for await item in stream {
56 yield item?;
57 }
58 }
59 }
60
61 pub fn into_stream<S>(self, storage: S) -> impl Stream<Item = Result<K, StorageError>> + 'static
62 where
63 S: AnyBlockStorage,
64 {
65 let storage = storage.clone();
66 async_stream::try_stream! {
67 let transaction = self.open(&storage).await?;
68 let stream = transaction.stream();
69 for await item in stream {
70 yield item?;
71 }
72 }
73 }
74
75 pub async fn insert<S>(&mut self, storage: &S, key: K) -> Result<(), StorageError>
76 where
77 S: AnyBlockStorage,
78 {
79 let mut transaction = self.open(storage).await?;
80 transaction.insert(key).await?;
81 self.commit(transaction).await?;
82 Ok(())
83 }
84
85 pub async fn remove<S>(&mut self, storage: &S, key: K) -> Result<bool, StorageError>
87 where
88 S: AnyBlockStorage,
89 {
90 let mut transaction = self.open(storage).await?;
91 let result = transaction.remove(key).await?;
92 if result {
93 self.commit(transaction).await?;
94 }
95 Ok(result)
96 }
97
98 pub async fn open<S>(&self, storage: &S) -> Result<CoSetTransaction<S, K>, StorageError>
99 where
100 S: AnyBlockStorage,
101 {
102 Ok(CoSetTransaction {
103 tree: match self.0.link() {
104 Some(root) => LsmTreeMap::load(storage.clone(), root).await?,
105 None => LsmTreeMap::new(storage.clone(), Default::default()),
106 },
107 })
108 }
109
110 pub async fn open_lazy<S>(&self, storage: &S) -> Result<LazyTransaction<S, Self>, StorageError>
111 where
112 S: AnyBlockStorage,
113 {
114 Ok(LazyTransaction::new(storage.clone(), self.clone()))
115 }
116
117 pub async fn commit<S>(&mut self, mut transaction: CoSetTransaction<S, K>) -> Result<(), StorageError>
119 where
120 S: AnyBlockStorage,
121 {
122 self.0 = transaction.tree.store().await?;
123 Ok(())
124 }
125
126 pub async fn with_transaction<S, F, Fut>(&mut self, storage: &S, update: F) -> Result<(), StorageError>
128 where
129 S: AnyBlockStorage,
130 F: FnOnce(CoSetTransaction<S, K>) -> Fut + Send,
131 Fut: Future<Output = Result<CoSetTransaction<S, K>, StorageError>> + Send,
132 {
133 let transaction = self.open(storage).await?;
134 let mut result = update(transaction).await?;
135 self.0 = result.tree.store().await?;
136 Ok(())
137 }
138}
139impl<K> Default for CoSet<K>
140where
141 K: Hash + Ord + Clone + Send + Sync + 'static,
142{
143 fn default() -> Self {
144 Self(Default::default())
145 }
146}
147impl<K> From<Option<Cid>> for CoSet<K>
148where
149 K: Hash + Ord + Clone + Send + Sync + 'static,
150{
151 fn from(value: Option<Cid>) -> Self {
152 Self(value.into())
153 }
154}
155impl<K> From<&CoSet<K>> for Option<Cid>
156where
157 K: Hash + Ord + Clone + Send + Sync + 'static,
158{
159 fn from(value: &CoSet<K>) -> Self {
160 *value.0.cid()
161 }
162}
163#[async_trait]
164impl<S, K> Transactionable<S> for CoSet<K>
165where
166 S: AnyBlockStorage,
167 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
168{
169 type Transaction = CoSetTransaction<S, K>;
170
171 async fn open(&self, storage: &S) -> Result<Self::Transaction, StorageError> {
172 CoSet::open(self, storage).await
173 }
174}
175impl<S, K> Streamable<S> for CoSet<K>
176where
177 S: AnyBlockStorage,
178 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
179{
180 type Item = Result<K, StorageError>;
181 type Stream = BoxStream<'static, Self::Item>;
182
183 fn stream(&self, storage: S) -> Self::Stream {
184 let collection = self.clone();
185 async_stream::try_stream! {
186 let transaction = collection.open(&storage).await?;
187 let stream = transaction.stream();
188 for await item in stream {
189 yield item?;
190 }
191 }
192 .boxed()
193 }
194}
195
196#[derive(Clone)]
197pub struct CoSetTransaction<S, K>
198where
199 S: AnyBlockStorage,
200 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
201{
202 tree: LsmTreeMap<S, K, SetValZST>,
203}
204impl<S, K> CoSetTransaction<S, K>
205where
206 S: AnyBlockStorage,
207 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
208{
209 pub async fn contains(&self, key: &K) -> Result<bool, StorageError> {
210 self.tree.contains_key(key).await
211 }
212
213 pub fn stream(&self) -> impl Stream<Item = Result<K, StorageError>> + '_ {
214 self.tree.stream().map_ok(|(key, _)| key)
215 }
216
217 pub async fn insert(&mut self, key: K) -> Result<(), StorageError> {
218 self.tree.insert(key, SetValZST).await
219 }
220
221 pub async fn remove(&mut self, key: K) -> Result<bool, StorageError> {
223 if (self.tree.get(&key).await?).is_some() {
224 self.tree.remove(key).await?;
225 Ok(true)
226 } else {
227 Ok(false)
228 }
229 }
230
231 pub async fn store(&mut self) -> Result<CoSet<K>, StorageError> {
233 let link = self.tree.store().await?;
234 Ok(CoSet(link))
235 }
236}
237
238#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Default, Serialize, Deserialize)]
243struct SetValZST;
244
#[cfg(test)]
mod tests {
    use crate::{library::test::TestStorage, CoSet};
    use futures::TryStreamExt;

    /// Collects every key currently streamed by `set`, panicking on storage errors.
    async fn keys(set: &CoSet<i32>, storage: &TestStorage) -> Vec<i32> {
        set.stream(storage).try_collect::<Vec<i32>>().await.unwrap()
    }

    /// Keys inserted in one transaction are streamed back after the commit.
    #[tokio::test]
    async fn smoke() {
        let storage = TestStorage::default();
        let mut set = CoSet::<i32>::default();
        let mut transaction = set.open(&storage).await.unwrap();
        transaction.insert(1).await.unwrap();
        transaction.insert(2).await.unwrap();
        set.commit(transaction).await.unwrap();
        assert_eq!(keys(&set, &storage).await, vec![1, 2]);
    }

    /// Removal works within the inserting transaction and in a later one, and the
    /// returned flag tells whether the key was present.
    #[tokio::test]
    async fn test_remove() {
        let storage = TestStorage::default();
        let mut set = CoSet::<i32>::default();
        let mut transaction = set.open(&storage).await.unwrap();
        transaction.insert(1).await.unwrap();
        transaction.insert(2).await.unwrap();
        transaction.insert(3).await.unwrap();
        assert!(transaction.remove(1).await.unwrap());
        // A key that was never inserted must be reported as absent.
        assert!(!transaction.remove(42).await.unwrap());
        set.commit(transaction).await.unwrap();
        assert_eq!(keys(&set, &storage).await, vec![2, 3]);

        let mut transaction = set.open(&storage).await.unwrap();
        assert!(transaction.remove(3).await.unwrap());
        set.commit(transaction).await.unwrap();
        assert_eq!(keys(&set, &storage).await, vec![2]);

        // Removing an absent key through the set itself must not commit anything.
        let before = set.clone();
        assert!(!set.remove(&storage, 42).await.unwrap());
        assert_eq!(set, before);
    }

    /// Removing a single key from a large, already committed set leaves all others intact.
    #[tokio::test]
    async fn test_remove_large() {
        let storage = TestStorage::default();
        let mut set = CoSet::<i32>::default();
        let mut transaction = set.open(&storage).await.unwrap();
        let range = 0..131072;
        for i in range.clone() {
            transaction.insert(i).await.unwrap();
        }
        set.commit(transaction).await.unwrap();
        let mut expect = range.collect::<Vec<i32>>();
        assert_eq!(keys(&set, &storage).await, expect);

        let mut transaction = set.open(&storage).await.unwrap();
        assert!(transaction.remove(10).await.unwrap());
        set.commit(transaction).await.unwrap();
        // Drop the key by value instead of relying on index == value.
        expect.retain(|&key| key != 10);
        assert_eq!(keys(&set, &storage).await, expect);
    }
}