ankurah_core/
collectionset.rs

1use std::{
2    collections::{btree_map::Entry, BTreeMap},
3    sync::Arc,
4};
5
6use ankurah_proto::CollectionId;
7use tokio::sync::RwLock;
8
9use crate::{
10    error::{MutationError, RetrievalError},
11    storage::{StorageCollectionWrapper, StorageEngine},
12};
13
14pub struct CollectionSet<SE>(Arc<Inner<SE>>);
15
16impl<SE> Clone for CollectionSet<SE> {
17    fn clone(&self) -> Self { Self(self.0.clone()) }
18}
19
20pub struct Inner<SE> {
21    storage_engine: Arc<SE>,
22    collections: RwLock<BTreeMap<CollectionId, StorageCollectionWrapper>>,
23}
24
25impl<SE: StorageEngine> CollectionSet<SE> {
26    pub fn new(storage_engine: Arc<SE>) -> Self { Self(Arc::new(Inner { storage_engine, collections: RwLock::new(BTreeMap::new()) })) }
27
28    pub async fn get(&self, id: &CollectionId) -> Result<StorageCollectionWrapper, RetrievalError> {
29        let collections = self.0.collections.read().await;
30        if let Some(store) = collections.get(id) {
31            return Ok(store.clone());
32        }
33        drop(collections);
34
35        let collection = StorageCollectionWrapper::new(self.0.storage_engine.collection(id).await?);
36
37        let mut collections = self.0.collections.write().await;
38
39        // We might have raced with another node to create this collection
40        if let Entry::Vacant(entry) = collections.entry(id.clone()) {
41            entry.insert(collection.clone());
42        }
43        drop(collections);
44
45        Ok(collection)
46    }
47
48    pub async fn list_collections(&self) -> Result<Vec<CollectionId>, RetrievalError> {
49        // Just return collections we have in memory
50        let memory_collections = self.0.collections.read().await;
51        Ok(memory_collections.keys().cloned().collect())
52    }
53
54    pub async fn delete_all_collections(&self) -> Result<bool, MutationError> {
55        // Clear in-memory collections first
56        {
57            let mut collections = self.0.collections.write().await;
58            collections.clear();
59        }
60
61        // Then delete all collections from storage
62        self.0.storage_engine.delete_all_collections().await
63    }
64}