1use std::sync::Arc;
2
3use crate::{
4 error::RetrievalError,
5 model::{Entity, View},
6 node::{MatchArgs, Node},
7 policy::PolicyAgent,
8 resultset::ResultSet,
9 storage::{StorageCollectionWrapper, StorageEngine},
10 transaction::Transaction,
11};
12use ankurah_proto as proto;
13use async_trait::async_trait;
14use tracing::info;
15
16#[cfg_attr(feature = "wasm", wasm_bindgen::prelude::wasm_bindgen)]
18pub struct Context(Arc<dyn TContext + Send + Sync + 'static>);
19impl Clone for Context {
20 fn clone(&self) -> Self { Self(self.0.clone()) }
21}
22
23pub struct NodeAndContext<SE, PA: PolicyAgent> {
24 node: Node<SE, PA>,
25 cdata: PA::ContextData,
26}
27
28#[async_trait]
29pub trait TContext {
30 fn node_id(&self) -> proto::NodeId;
31 fn next_entity_id(&self) -> proto::ID;
32 async fn get_entity(&self, id: proto::ID, collection: &proto::CollectionId) -> Result<Arc<Entity>, RetrievalError>;
33 async fn fetch_entities(&self, collection: &proto::CollectionId, args: MatchArgs) -> Result<Vec<Arc<Entity>>, RetrievalError>;
34 async fn insert_entity(&self, entity: Arc<Entity>) -> anyhow::Result<()>;
35 async fn commit_events(&self, events: &Vec<proto::Event>) -> anyhow::Result<()>;
36 async fn subscribe(
37 &self,
38 sub_id: proto::SubscriptionId,
39 collection: &proto::CollectionId,
40 args: MatchArgs,
41 callback: Box<dyn Fn(crate::changes::ChangeSet<Arc<Entity>>) + Send + Sync + 'static>,
42 ) -> Result<crate::subscription::SubscriptionHandle, RetrievalError>;
43 fn cloned(&self) -> Box<dyn TContext + Send + Sync + 'static>;
44 async fn collection(&self, id: &proto::CollectionId) -> StorageCollectionWrapper;
45}
46
47#[async_trait]
48impl<SE: StorageEngine + Send + Sync + 'static, PA: PolicyAgent + Send + Sync + 'static> TContext for NodeAndContext<SE, PA> {
49 fn node_id(&self) -> proto::NodeId { self.node.id.clone() }
50 fn next_entity_id(&self) -> proto::ID { self.node.next_entity_id() }
51 async fn get_entity(&self, id: proto::ID, collection: &proto::CollectionId) -> Result<Arc<Entity>, RetrievalError> {
52 self.node.get_entity(collection, id ).await
53 }
54 async fn fetch_entities(&self, collection: &proto::CollectionId, args: MatchArgs) -> Result<Vec<Arc<Entity>>, RetrievalError> {
55 self.node.fetch_entities(collection, args, &self.cdata).await
56 }
57 async fn insert_entity(&self, entity: Arc<Entity>) -> anyhow::Result<()> { self.node.insert_entity(entity).await }
58 async fn commit_events(&self, events: &Vec<proto::Event>) -> anyhow::Result<()> { self.node.commit_events(events).await }
59 async fn subscribe(
60 &self,
61 sub_id: proto::SubscriptionId,
62 collection: &proto::CollectionId,
63 args: MatchArgs,
64 callback: Box<dyn Fn(crate::changes::ChangeSet<Arc<Entity>>) + Send + Sync + 'static>,
65 ) -> Result<crate::subscription::SubscriptionHandle, RetrievalError> {
66 self.node.subscribe(sub_id, collection, args, callback).await
67 }
68 fn cloned(&self) -> Box<dyn TContext + Send + Sync + 'static> {
69 Box::new(NodeAndContext { node: self.node.clone(), cdata: self.cdata.clone() })
70 }
71 async fn collection(&self, id: &proto::CollectionId) -> StorageCollectionWrapper { self.node.collection(id).await }
72}
73
74impl Context {
75 pub fn new<SE: StorageEngine + Send + Sync + 'static, PA: PolicyAgent + Send + Sync + 'static>(
76 node: Node<SE, PA>,
77 data: PA::ContextData,
78 ) -> Self {
79 Self(Arc::new(NodeAndContext { node, cdata: data }))
80 }
81
82 pub fn node_id(&self) -> proto::NodeId { self.0.node_id() }
83
84 pub fn begin(&self) -> Transaction { Transaction::new(self.0.cloned()) }
86 pub async fn get<R: View>(&self, id: proto::ID) -> Result<R, RetrievalError> {
99 let entity = self.0.get_entity(id, &R::collection()).await?;
100 Ok(R::from_entity(entity))
101 }
102
103 pub async fn fetch<R: View>(
104 &self,
105 args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>,
106 ) -> Result<ResultSet<R>, RetrievalError> {
107 let args: MatchArgs = args.try_into().map_err(|e| e.into())?;
108 use crate::model::Model;
109 let collection_id = R::Model::collection();
110
111 let entities = self.0.fetch_entities(&collection_id, args).await?;
112
113 let views = entities.into_iter().map(|entity| R::from_entity(entity)).collect();
114
115 Ok(ResultSet { items: views })
116 }
117
118 pub async fn subscribe<F, R>(
120 &self,
121 args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>,
122 callback: F,
123 ) -> Result<crate::subscription::SubscriptionHandle, RetrievalError>
124 where
125 F: Fn(crate::changes::ChangeSet<R>) + Send + Sync + 'static,
126 R: View,
127 {
128 let args: MatchArgs = args.try_into().map_err(|e| e.into())?;
129
130 use crate::model::Model;
131 let collection_id = R::Model::collection();
132
133 let sub_id = proto::SubscriptionId::new();
135 let handle = self
137 .0
138 .subscribe(
139 sub_id,
140 &collection_id,
141 args,
142 Box::new(move |changeset| {
143 info!("Node notified");
144 callback(changeset.into());
145 }),
146 )
147 .await?;
148 Ok(handle)
149 }
150 pub async fn collection(&self, id: &proto::CollectionId) -> StorageCollectionWrapper { self.0.collection(id).await }
151}