ankurah_core/
context.rs

1use std::sync::Arc;
2
3use crate::{
4    error::RetrievalError,
5    model::{Entity, View},
6    node::{MatchArgs, Node},
7    policy::PolicyAgent,
8    resultset::ResultSet,
9    storage::{StorageCollectionWrapper, StorageEngine},
10    transaction::Transaction,
11};
12use ankurah_proto as proto;
13use async_trait::async_trait;
14use tracing::info;
15
16/// Type-erased context wrapper
17#[cfg_attr(feature = "wasm", wasm_bindgen::prelude::wasm_bindgen)]
18pub struct Context(Arc<dyn TContext + Send + Sync + 'static>);
19impl Clone for Context {
20    fn clone(&self) -> Self { Self(self.0.clone()) }
21}
22
23pub struct NodeAndContext<SE, PA: PolicyAgent> {
24    node: Node<SE, PA>,
25    cdata: PA::ContextData,
26}
27
28#[async_trait]
29pub trait TContext {
30    fn node_id(&self) -> proto::NodeId;
31    fn next_entity_id(&self) -> proto::ID;
32    async fn get_entity(&self, id: proto::ID, collection: &proto::CollectionId) -> Result<Arc<Entity>, RetrievalError>;
33    async fn fetch_entities(&self, collection: &proto::CollectionId, args: MatchArgs) -> Result<Vec<Arc<Entity>>, RetrievalError>;
34    async fn insert_entity(&self, entity: Arc<Entity>) -> anyhow::Result<()>;
35    async fn commit_events(&self, events: &Vec<proto::Event>) -> anyhow::Result<()>;
36    async fn subscribe(
37        &self,
38        sub_id: proto::SubscriptionId,
39        collection: &proto::CollectionId,
40        args: MatchArgs,
41        callback: Box<dyn Fn(crate::changes::ChangeSet<Arc<Entity>>) + Send + Sync + 'static>,
42    ) -> Result<crate::subscription::SubscriptionHandle, RetrievalError>;
43    fn cloned(&self) -> Box<dyn TContext + Send + Sync + 'static>;
44    async fn collection(&self, id: &proto::CollectionId) -> StorageCollectionWrapper;
45}
46
47#[async_trait]
48impl<SE: StorageEngine + Send + Sync + 'static, PA: PolicyAgent + Send + Sync + 'static> TContext for NodeAndContext<SE, PA> {
49    fn node_id(&self) -> proto::NodeId { self.node.id.clone() }
50    fn next_entity_id(&self) -> proto::ID { self.node.next_entity_id() }
51    async fn get_entity(&self, id: proto::ID, collection: &proto::CollectionId) -> Result<Arc<Entity>, RetrievalError> {
52        self.node.get_entity(collection, id /*&self.cdata*/).await
53    }
54    async fn fetch_entities(&self, collection: &proto::CollectionId, args: MatchArgs) -> Result<Vec<Arc<Entity>>, RetrievalError> {
55        self.node.fetch_entities(collection, args, &self.cdata).await
56    }
57    async fn insert_entity(&self, entity: Arc<Entity>) -> anyhow::Result<()> { self.node.insert_entity(entity).await }
58    async fn commit_events(&self, events: &Vec<proto::Event>) -> anyhow::Result<()> { self.node.commit_events(events).await }
59    async fn subscribe(
60        &self,
61        sub_id: proto::SubscriptionId,
62        collection: &proto::CollectionId,
63        args: MatchArgs,
64        callback: Box<dyn Fn(crate::changes::ChangeSet<Arc<Entity>>) + Send + Sync + 'static>,
65    ) -> Result<crate::subscription::SubscriptionHandle, RetrievalError> {
66        self.node.subscribe(sub_id, collection, args, callback).await
67    }
68    fn cloned(&self) -> Box<dyn TContext + Send + Sync + 'static> {
69        Box::new(NodeAndContext { node: self.node.clone(), cdata: self.cdata.clone() })
70    }
71    async fn collection(&self, id: &proto::CollectionId) -> StorageCollectionWrapper { self.node.collection(id).await }
72}
73
74impl Context {
75    pub fn new<SE: StorageEngine + Send + Sync + 'static, PA: PolicyAgent + Send + Sync + 'static>(
76        node: Node<SE, PA>,
77        data: PA::ContextData,
78    ) -> Self {
79        Self(Arc::new(NodeAndContext { node, cdata: data }))
80    }
81
82    pub fn node_id(&self) -> proto::NodeId { self.0.node_id() }
83
84    /// Begin a transaction.
85    pub fn begin(&self) -> Transaction { Transaction::new(self.0.cloned()) }
86    // TODO: Fix this - arghhh async lifetimes
87    // pub async fn trx<T, F, Fut>(self: &Arc<Self>, f: F) -> anyhow::Result<T>
88    // where
89    //     F: for<'a> FnOnce(&'a Transaction) -> Fut,
90    //     Fut: std::future::Future<Output = anyhow::Result<T>>,
91    // {
92    //     let trx = self.begin();
93    //     let result = f(&trx).await?;
94    //     trx.commit().await?;
95    //     Ok(result)
96    // }
97
98    pub async fn get<R: View>(&self, id: proto::ID) -> Result<R, RetrievalError> {
99        let entity = self.0.get_entity(id, &R::collection()).await?;
100        Ok(R::from_entity(entity))
101    }
102
103    pub async fn fetch<R: View>(
104        &self,
105        args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>,
106    ) -> Result<ResultSet<R>, RetrievalError> {
107        let args: MatchArgs = args.try_into().map_err(|e| e.into())?;
108        use crate::model::Model;
109        let collection_id = R::Model::collection();
110
111        let entities = self.0.fetch_entities(&collection_id, args).await?;
112
113        let views = entities.into_iter().map(|entity| R::from_entity(entity)).collect();
114
115        Ok(ResultSet { items: views })
116    }
117
118    /// Subscribe to changes in entities matching a predicate
119    pub async fn subscribe<F, R>(
120        &self,
121        args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>,
122        callback: F,
123    ) -> Result<crate::subscription::SubscriptionHandle, RetrievalError>
124    where
125        F: Fn(crate::changes::ChangeSet<R>) + Send + Sync + 'static,
126        R: View,
127    {
128        let args: MatchArgs = args.try_into().map_err(|e| e.into())?;
129
130        use crate::model::Model;
131        let collection_id = R::Model::collection();
132
133        // Using one subscription id for local and remote subscriptions
134        let sub_id = proto::SubscriptionId::new();
135        // Now set up our local subscription
136        let handle = self
137            .0
138            .subscribe(
139                sub_id,
140                &collection_id,
141                args,
142                Box::new(move |changeset| {
143                    info!("Node notified");
144                    callback(changeset.into());
145                }),
146            )
147            .await?;
148        Ok(handle)
149    }
150    pub async fn collection(&self, id: &proto::CollectionId) -> StorageCollectionWrapper { self.0.collection(id).await }
151}