ankurah_core/
collectionset.rs1use std::{
2 collections::{btree_map::Entry, BTreeMap},
3 sync::Arc,
4};
5
6use ankurah_proto::CollectionId;
7use tokio::sync::RwLock;
8
9use crate::{
10 error::{MutationError, RetrievalError},
11 storage::{StorageCollectionWrapper, StorageEngine},
12};
13
14pub struct CollectionSet<SE>(Arc<Inner<SE>>);
15
16impl<SE> Clone for CollectionSet<SE> {
17 fn clone(&self) -> Self { Self(self.0.clone()) }
18}
19
20pub struct Inner<SE> {
21 storage_engine: Arc<SE>,
22 collections: RwLock<BTreeMap<CollectionId, StorageCollectionWrapper>>,
23}
24
25impl<SE: StorageEngine> CollectionSet<SE> {
26 pub fn new(storage_engine: Arc<SE>) -> Self { Self(Arc::new(Inner { storage_engine, collections: RwLock::new(BTreeMap::new()) })) }
27
28 pub async fn get(&self, id: &CollectionId) -> Result<StorageCollectionWrapper, RetrievalError> {
29 let collections = self.0.collections.read().await;
30 if let Some(store) = collections.get(id) {
31 return Ok(store.clone());
32 }
33 drop(collections);
34
35 let collection = StorageCollectionWrapper::new(self.0.storage_engine.collection(id).await?);
36
37 let mut collections = self.0.collections.write().await;
38
39 if let Entry::Vacant(entry) = collections.entry(id.clone()) {
41 entry.insert(collection.clone());
42 }
43 drop(collections);
44
45 Ok(collection)
46 }
47
48 pub async fn list_collections(&self) -> Result<Vec<CollectionId>, RetrievalError> {
49 let memory_collections = self.0.collections.read().await;
51 Ok(memory_collections.keys().cloned().collect())
52 }
53
54 pub async fn delete_all_collections(&self) -> Result<bool, MutationError> {
55 {
57 let mut collections = self.0.collections.write().await;
58 collections.clear();
59 }
60
61 self.0.storage_engine.delete_all_collections().await
63 }
64}