1use crate::{
2 changes::EntityChange,
3 entity::Entity,
4 error::{MutationError, RetrievalError},
5 livequery::{EntityLiveQuery, LiveQuery},
6 model::View,
7 node::{MatchArgs, Node},
8 policy::{AccessDenied, PolicyAgent},
9 storage::{StorageCollectionWrapper, StorageEngine},
10 transaction::Transaction,
11};
12use ankurah_proto::{self as proto, Attested, Clock, CollectionId, EntityState};
13use async_trait::async_trait;
14use std::sync::{atomic::AtomicBool, Arc};
15use tracing::debug;
16#[cfg(feature = "wasm")]
17use wasm_bindgen::prelude::*;
18
19#[cfg_attr(feature = "wasm", wasm_bindgen)]
23pub struct Context(Arc<dyn TContext + Send + Sync + 'static>);
24impl Clone for Context {
25 fn clone(&self) -> Self { Self(self.0.clone()) }
26}
27
28pub struct NodeAndContext<SE, PA: PolicyAgent>
29where
30 SE: StorageEngine + Send + Sync + 'static,
31 PA: PolicyAgent + Send + Sync + 'static,
32{
33 pub node: Node<SE, PA>,
34 pub cdata: PA::ContextData,
35}
36
37#[async_trait]
38pub trait TContext {
39 fn node_id(&self) -> proto::EntityId;
40 fn create_entity(&self, collection: proto::CollectionId, trx_alive: Arc<AtomicBool>) -> Entity;
44 fn check_write(&self, entity: &Entity) -> Result<(), AccessDenied>;
45 async fn get_entity(&self, id: proto::EntityId, collection: &proto::CollectionId, cached: bool) -> Result<Entity, RetrievalError>;
46 fn get_resident_entity(&self, id: proto::EntityId) -> Option<Entity>;
47 async fn fetch_entities(&self, collection: &proto::CollectionId, args: MatchArgs) -> Result<Vec<Entity>, RetrievalError>;
48 async fn commit_local_trx(&self, trx: Transaction) -> Result<(), MutationError>;
49 fn query(&self, collection_id: proto::CollectionId, args: MatchArgs) -> Result<EntityLiveQuery, RetrievalError>;
50 async fn collection(&self, id: &proto::CollectionId) -> Result<StorageCollectionWrapper, RetrievalError>;
51}
52
53#[async_trait]
54impl<SE: StorageEngine + Send + Sync + 'static, PA: PolicyAgent + Send + Sync + 'static> TContext for NodeAndContext<SE, PA> {
55 fn node_id(&self) -> proto::EntityId { self.node.id }
56 fn create_entity(&self, collection: proto::CollectionId, trx_alive: Arc<AtomicBool>) -> Entity {
57 let primary_entity = self.node.entities.create(collection);
58 primary_entity.snapshot(trx_alive)
59 }
60 fn check_write(&self, entity: &Entity) -> Result<(), AccessDenied> { self.node.policy_agent.check_write(&self.cdata, entity, None) }
61 async fn get_entity(&self, id: proto::EntityId, collection: &proto::CollectionId, cached: bool) -> Result<Entity, RetrievalError> {
62 self.get_entity(collection, id, cached).await
63 }
64 fn get_resident_entity(&self, id: proto::EntityId) -> Option<Entity> { self.node.entities.get(&id) }
65 async fn fetch_entities(&self, collection: &proto::CollectionId, args: MatchArgs) -> Result<Vec<Entity>, RetrievalError> {
66 self.fetch_entities(collection, args).await
67 }
68 async fn commit_local_trx(&self, trx: Transaction) -> Result<(), MutationError> { self.commit_local_trx(trx).await }
69 fn query(&self, collection_id: proto::CollectionId, args: MatchArgs) -> Result<EntityLiveQuery, RetrievalError> {
70 EntityLiveQuery::new(&self.node, collection_id, args, self.cdata.clone())
71 }
72 async fn collection(&self, id: &proto::CollectionId) -> Result<StorageCollectionWrapper, RetrievalError> {
73 self.node.system.collection(id).await
74 }
75}
76
/// JavaScript-facing methods, only compiled with the `wasm` feature.
#[cfg(feature = "wasm")]
#[wasm_bindgen]
impl Context {
    /// Id of the node behind this context; exported to JavaScript as `node_id`.
    #[wasm_bindgen(js_name = "node_id")]
    pub fn js_node_id(&self) -> proto::EntityId {
        let Self(inner) = self;
        inner.node_id()
    }
}
84
85#[cfg_attr(feature = "wasm", wasm_bindgen)]
87impl Context {
88 pub fn begin(&self) -> Transaction { Transaction::new(self.0.clone()) }
90}
91
92impl Context {
93 pub fn new<SE: StorageEngine + Send + Sync + 'static, PA: PolicyAgent + Send + Sync + 'static>(
94 node: Node<SE, PA>,
95 data: PA::ContextData,
96 ) -> Self {
97 Self(Arc::new(NodeAndContext { node, cdata: data }))
98 }
99
100 pub fn node_id(&self) -> proto::EntityId { self.0.node_id() }
101
102 pub async fn get<R: View>(&self, id: proto::EntityId) -> Result<R, RetrievalError> {
115 let entity = self.0.get_entity(id, &R::collection(), false).await?;
116 Ok(R::from_entity(entity))
117 }
118
119 pub async fn get_cached<R: View>(&self, id: proto::EntityId) -> Result<R, RetrievalError> {
121 let entity = self.0.get_entity(id, &R::collection(), true).await?;
122 Ok(R::from_entity(entity))
123 }
124
125 pub async fn fetch<R: View>(&self, args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>) -> Result<Vec<R>, RetrievalError> {
126 let args: MatchArgs = args.try_into().map_err(|e| e.into())?;
127 use crate::model::Model;
128 let collection_id = R::Model::collection();
129
130 let entities = self.0.fetch_entities(&collection_id, args).await?;
131
132 Ok(entities.into_iter().map(|e| R::from_entity(e)).collect())
133 }
134
135 pub async fn fetch_one<R: View + Clone + 'static>(
136 &self,
137 args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>,
138 ) -> Result<Option<R>, RetrievalError> {
139 let views = self.fetch::<R>(args).await?;
140 Ok(views.into_iter().next())
141 }
142 pub fn query<R>(&self, args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>) -> Result<LiveQuery<R>, RetrievalError>
144 where R: View {
145 let args: MatchArgs = args.try_into().map_err(|e| e.into())?;
146 use crate::model::Model;
147 Ok(self.0.query(R::Model::collection(), args)?.map::<R>())
148 }
149
150 pub async fn query_wait<R>(
152 &self,
153 args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>,
154 ) -> Result<LiveQuery<R>, RetrievalError>
155 where
156 R: View,
157 {
158 let livequery = self.query::<R>(args)?;
159 livequery.wait_initialized().await;
160 Ok(livequery)
161 }
162 pub async fn collection(&self, id: &proto::CollectionId) -> Result<StorageCollectionWrapper, RetrievalError> {
163 self.0.collection(id).await
164 }
165}
166
167impl<SE, PA> NodeAndContext<SE, PA>
168where
169 SE: StorageEngine + Send + Sync + 'static,
170 PA: PolicyAgent + Send + Sync + 'static,
171{
172 pub(crate) async fn get_entity(
174 &self,
175 collection_id: &CollectionId,
176 id: proto::EntityId,
177 cached: bool,
178 ) -> Result<Entity, RetrievalError> {
179 debug!("Node({}).get_entity {:?}-{:?}", self.node.id, id, collection_id);
180
181 if !self.node.durable {
182 match self.node.get_from_peer(collection_id, vec![id], &self.cdata).await {
184 Ok(_) => (),
185 Err(RetrievalError::NoDurablePeers) if cached => (),
186 Err(e) => {
187 return Err(e);
188 }
189 }
190 }
191
192 if let Some(local) = self.node.entities.get(&id) {
193 debug!("Node({}).get_entity found local entity - returning", self.node.id);
194 return Ok(local);
195 }
196 debug!("{}.get_entity fetching from storage", self.node);
197
198 let collection = self.node.collections.get(collection_id).await?;
199 match collection.get_state(id).await {
200 Ok(entity_state) => {
201 let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
202 let (_changed, entity) =
203 self.node.entities.with_state(&retriever, id, collection_id.clone(), entity_state.payload.state).await?;
204 Ok(entity)
205 }
206 Err(RetrievalError::EntityNotFound(id)) => {
207 let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
208 let (_, entity) = self.node.entities.with_state(&retriever, id, collection_id.clone(), proto::State::default()).await?;
209 Ok(entity)
210 }
211 Err(e) => Err(e),
212 }
213 }
214 pub async fn fetch_entities(&self, collection_id: &CollectionId, mut args: MatchArgs) -> Result<Vec<Entity>, RetrievalError> {
216 self.node.policy_agent.can_access_collection(&self.cdata, collection_id)?;
217 args.selection.predicate = self.node.policy_agent.filter_predicate(&self.cdata, collection_id, args.selection.predicate)?;
220
221 if !self.node.durable {
223 Ok(self.fetch_from_peer(collection_id, args.selection).await?)
225 } else {
226 let storage_collection = self.node.collections.get(collection_id).await?;
227 let states = storage_collection.fetch_states(&args.selection).await?;
228
229 let mut entities = Vec::new();
231 for state in states {
232 let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
233 let (_, entity) =
234 self.node.entities.with_state(&retriever, state.payload.entity_id, collection_id.clone(), state.payload.state).await?;
235 entities.push(entity);
236 }
237 Ok(entities)
238 }
239 }
240
241 pub async fn commit_local_trx(&self, trx: Transaction) -> Result<(), MutationError> {
244 let (trx_id, entity_events) = trx.into_parts()?;
245 let mut attested_events = Vec::new();
246 let mut entity_attested_events = Vec::new();
247
248 for (entity, event) in entity_events {
250 use std::sync::atomic::AtomicBool;
252 let trx_alive = Arc::new(AtomicBool::new(true));
253 let forked = entity.snapshot(trx_alive);
254
255 let entity_before = match &entity.kind {
257 crate::entity::EntityKind::Transacted { upstream, .. } => upstream.clone(),
258 crate::entity::EntityKind::Primary => entity.clone(),
259 };
260
261 let collection_id = &event.collection;
263 let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
264 forked.apply_event(&retriever, &event).await?;
265
266 let attestation = self.node.policy_agent.check_event(&self.node, &self.cdata, &entity_before, &forked, &event)?;
267 let attested = Attested::opt(event.clone(), attestation);
268 attested_events.push(attested.clone());
269 entity_attested_events.push((entity, attested));
270 }
271
272 for (entity, attested_event) in &entity_attested_events {
274 let collection = self.node.collections.get(&attested_event.payload.collection).await?;
275 collection.add_event(&attested_event).await?;
276 entity.commit_head(Clock::new([attested_event.payload.id()]));
277 }
278
279 self.node.relay_to_required_peers(&self.cdata, trx_id, &attested_events).await?;
281
282 let mut changes: Vec<EntityChange> = Vec::new();
284 for (entity, attested_event) in entity_attested_events {
285 let collection_id = &attested_event.payload.collection;
286 let collection = self.node.collections.get(collection_id).await?;
287
288 let canonical_entity = match &entity.kind {
290 crate::entity::EntityKind::Transacted { upstream, .. } => {
291 let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
292 upstream.apply_event(&retriever, &attested_event.payload).await?;
293 upstream.clone()
294 }
295 crate::entity::EntityKind::Primary => entity,
296 };
297
298 let state = canonical_entity.to_state()?;
299
300 let entity_state = EntityState { entity_id: canonical_entity.id(), collection: canonical_entity.collection().clone(), state };
301 let attestation = self.node.policy_agent.attest_state(&self.node, &entity_state);
302 let attested = Attested::opt(entity_state, attestation);
303 collection.set_state(attested).await?;
304
305 changes.push(EntityChange::new(canonical_entity, vec![attested_event])?);
306 }
307
308 self.node.reactor.notify_change(changes).await;
310 Ok(())
311 }
312
313 async fn fetch_from_peer(
315 &self,
316 collection_id: &proto::CollectionId,
317 selection: ankql::ast::Selection,
318 ) -> Result<Vec<crate::entity::Entity>, RetrievalError> {
319 let peer_id = self.node.get_durable_peer_random().ok_or(RetrievalError::NoDurablePeers)?;
320
321 let known_matched_entities = self.node.fetch_entities_from_local(collection_id, &selection).await?;
323
324 let known_matches = known_matched_entities
325 .iter()
326 .map(|entity| proto::KnownEntity { entity_id: entity.id(), head: entity.head().clone() })
327 .collect();
328
329 let selection_clone = selection.clone();
331 match self
332 .node
333 .request(peer_id, &self.cdata, proto::NodeRequestBody::Fetch { collection: collection_id.clone(), selection, known_matches })
334 .await?
335 {
336 proto::NodeResponseBody::Fetch(deltas) => {
337 let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
339
340 crate::node_applier::NodeApplier::apply_deltas(&self.node, &peer_id, deltas, &retriever).await?;
342 self.node.fetch_entities_from_local(collection_id, &selection_clone).await
346 }
347 proto::NodeResponseBody::Error(e) => {
348 tracing::debug!("Error from peer fetch: {}", e);
349 Err(RetrievalError::Other(format!("{:?}", e)))
350 }
351 _ => {
352 tracing::debug!("Unexpected response type from peer fetch");
353 Err(RetrievalError::Other("Unexpected response type".to_string()))
354 }
355 }
356 }
357}