1use crate::{
2 changes::EntityChange,
3 entity::Entity,
4 error::{MutationError, RetrievalError},
5 livequery::{EntityLiveQuery, LiveQuery},
6 model::View,
7 node::{MatchArgs, Node},
8 policy::{AccessDenied, PolicyAgent},
9 storage::{StorageCollectionWrapper, StorageEngine},
10 transaction::Transaction,
11};
12use ankurah_proto::{self as proto, Attested, Clock, CollectionId, EntityState};
13use async_trait::async_trait;
14use std::sync::{atomic::AtomicBool, Arc};
15use tracing::debug;
16#[cfg(feature = "wasm")]
17use wasm_bindgen::prelude::*;
18
19#[cfg_attr(feature = "wasm", wasm_bindgen)]
23#[cfg_attr(feature = "uniffi", derive(uniffi::Object))]
24pub struct Context(Arc<dyn TContext + Send + Sync + 'static>);
25impl Clone for Context {
26 fn clone(&self) -> Self { Self(self.0.clone()) }
27}
28
29pub struct NodeAndContext<SE, PA: PolicyAgent>
30where
31 SE: StorageEngine + Send + Sync + 'static,
32 PA: PolicyAgent + Send + Sync + 'static,
33{
34 pub node: Node<SE, PA>,
35 pub cdata: PA::ContextData,
36}
37
38#[async_trait]
39pub trait TContext {
40 fn node_id(&self) -> proto::EntityId;
41 fn create_entity(&self, collection: proto::CollectionId, trx_alive: Arc<AtomicBool>) -> Entity;
45 fn check_write(&self, entity: &Entity) -> Result<(), AccessDenied>;
46 async fn get_entity(&self, id: proto::EntityId, collection: &proto::CollectionId, cached: bool) -> Result<Entity, RetrievalError>;
47 fn get_resident_entity(&self, id: proto::EntityId) -> Option<Entity>;
48 async fn fetch_entities(&self, collection: &proto::CollectionId, args: MatchArgs) -> Result<Vec<Entity>, RetrievalError>;
49 async fn commit_local_trx(&self, trx: &Transaction) -> Result<(), MutationError>;
50 fn query(&self, collection_id: proto::CollectionId, args: MatchArgs) -> Result<EntityLiveQuery, RetrievalError>;
51 async fn collection(&self, id: &proto::CollectionId) -> Result<StorageCollectionWrapper, RetrievalError>;
52}
53
54#[async_trait]
55impl<SE: StorageEngine + Send + Sync + 'static, PA: PolicyAgent + Send + Sync + 'static> TContext for NodeAndContext<SE, PA> {
56 fn node_id(&self) -> proto::EntityId { self.node.id }
57 fn create_entity(&self, collection: proto::CollectionId, trx_alive: Arc<AtomicBool>) -> Entity {
58 let primary_entity = self.node.entities.create(collection);
59 primary_entity.snapshot(trx_alive)
60 }
61 fn check_write(&self, entity: &Entity) -> Result<(), AccessDenied> { self.node.policy_agent.check_write(&self.cdata, entity, None) }
62 async fn get_entity(&self, id: proto::EntityId, collection: &proto::CollectionId, cached: bool) -> Result<Entity, RetrievalError> {
63 self.get_entity(collection, id, cached).await
64 }
65 fn get_resident_entity(&self, id: proto::EntityId) -> Option<Entity> { self.node.entities.get(&id) }
66 async fn fetch_entities(&self, collection: &proto::CollectionId, args: MatchArgs) -> Result<Vec<Entity>, RetrievalError> {
67 self.fetch_entities(collection, args).await
68 }
69 async fn commit_local_trx(&self, trx: &Transaction) -> Result<(), MutationError> { self.commit_local_trx(trx).await }
70 fn query(&self, collection_id: proto::CollectionId, args: MatchArgs) -> Result<EntityLiveQuery, RetrievalError> {
71 EntityLiveQuery::new(&self.node, collection_id, args, self.cdata.clone())
72 }
73 async fn collection(&self, id: &proto::CollectionId) -> Result<StorageCollectionWrapper, RetrievalError> {
74 self.node.system.collection(id).await
75 }
76}
77
78#[cfg(feature = "wasm")]
80#[wasm_bindgen]
81impl Context {
82 #[wasm_bindgen(js_name = "node_id")]
83 pub fn js_node_id(&self) -> proto::EntityId { self.0.node_id() }
84}
85
86#[cfg_attr(feature = "wasm", wasm_bindgen)]
88#[cfg_attr(feature = "uniffi", uniffi::export)]
89impl Context {
90 pub fn begin(&self) -> Transaction { Transaction::new(self.0.clone()) }
92}
93
94impl Context {
95 pub fn new<SE: StorageEngine + Send + Sync + 'static, PA: PolicyAgent + Send + Sync + 'static>(
96 node: Node<SE, PA>,
97 data: PA::ContextData,
98 ) -> Self {
99 Self(Arc::new(NodeAndContext { node, cdata: data }))
100 }
101
102 pub fn node_id(&self) -> proto::EntityId { self.0.node_id() }
103
104 pub async fn get<R: View>(&self, id: proto::EntityId) -> Result<R, RetrievalError> {
117 let entity = self.0.get_entity(id, &R::collection(), false).await?;
118 Ok(R::from_entity(entity))
119 }
120
121 pub async fn get_cached<R: View>(&self, id: proto::EntityId) -> Result<R, RetrievalError> {
123 let entity = self.0.get_entity(id, &R::collection(), true).await?;
124 Ok(R::from_entity(entity))
125 }
126
127 pub async fn fetch<R: View>(&self, args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>) -> Result<Vec<R>, RetrievalError> {
128 let args: MatchArgs = args.try_into().map_err(|e| e.into())?;
129 use crate::model::Model;
130 let collection_id = R::Model::collection();
131
132 let entities = self.0.fetch_entities(&collection_id, args).await?;
133
134 Ok(entities.into_iter().map(|e| R::from_entity(e)).collect())
135 }
136
137 pub async fn fetch_one<R: View + Clone + 'static>(
138 &self,
139 args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>,
140 ) -> Result<Option<R>, RetrievalError> {
141 let views = self.fetch::<R>(args).await?;
142 Ok(views.into_iter().next())
143 }
144 pub fn query<R>(&self, args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>) -> Result<LiveQuery<R>, RetrievalError>
146 where R: View {
147 let args: MatchArgs = args.try_into().map_err(|e| e.into())?;
148 use crate::model::Model;
149 Ok(self.0.query(R::Model::collection(), args)?.map::<R>())
150 }
151
152 pub async fn query_wait<R>(
154 &self,
155 args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>,
156 ) -> Result<LiveQuery<R>, RetrievalError>
157 where
158 R: View,
159 {
160 let livequery = self.query::<R>(args)?;
161 livequery.wait_initialized().await;
162 Ok(livequery)
163 }
164 pub async fn collection(&self, id: &proto::CollectionId) -> Result<StorageCollectionWrapper, RetrievalError> {
165 self.0.collection(id).await
166 }
167}
168
169impl<SE, PA> NodeAndContext<SE, PA>
170where
171 SE: StorageEngine + Send + Sync + 'static,
172 PA: PolicyAgent + Send + Sync + 'static,
173{
174 pub(crate) async fn get_entity(
176 &self,
177 collection_id: &CollectionId,
178 id: proto::EntityId,
179 cached: bool,
180 ) -> Result<Entity, RetrievalError> {
181 debug!("Node({}).get_entity {:?}-{:?}", self.node.id, id, collection_id);
182
183 if !self.node.durable {
184 match self.node.get_from_peer(collection_id, vec![id], &self.cdata).await {
186 Ok(_) => (),
187 Err(RetrievalError::NoDurablePeers) if cached => (),
188 Err(e) => {
189 return Err(e);
190 }
191 }
192 }
193
194 if let Some(local) = self.node.entities.get(&id) {
195 debug!("Node({}).get_entity found local entity - returning", self.node.id);
196 return Ok(local);
197 }
198 debug!("{}.get_entity fetching from storage", self.node);
199
200 let collection = self.node.collections.get(collection_id).await?;
201 match collection.get_state(id).await {
202 Ok(entity_state) => {
203 let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
204 let (_changed, entity) =
205 self.node.entities.with_state(&retriever, id, collection_id.clone(), entity_state.payload.state).await?;
206 Ok(entity)
207 }
208 Err(RetrievalError::EntityNotFound(id)) => {
209 let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
210 let (_, entity) = self.node.entities.with_state(&retriever, id, collection_id.clone(), proto::State::default()).await?;
211 Ok(entity)
212 }
213 Err(e) => Err(e),
214 }
215 }
216 pub async fn fetch_entities(&self, collection_id: &CollectionId, mut args: MatchArgs) -> Result<Vec<Entity>, RetrievalError> {
218 self.node.policy_agent.can_access_collection(&self.cdata, collection_id)?;
219 args.selection.predicate = self.node.policy_agent.filter_predicate(&self.cdata, collection_id, args.selection.predicate)?;
222
223 args.selection = self.node.type_resolver.resolve_selection_types(args.selection);
225
226 if !self.node.durable {
228 Ok(self.fetch_from_peer(collection_id, args.selection).await?)
230 } else {
231 let storage_collection = self.node.collections.get(collection_id).await?;
232 let states = storage_collection.fetch_states(&args.selection).await?;
233
234 let mut entities = Vec::new();
236 for state in states {
237 let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
238 let (_, entity) =
239 self.node.entities.with_state(&retriever, state.payload.entity_id, collection_id.clone(), state.payload.state).await?;
240 entities.push(entity);
241 }
242 Ok(entities)
243 }
244 }
245
246 pub async fn commit_local_trx(&self, trx: &Transaction) -> Result<(), MutationError> {
249 use std::sync::atomic::Ordering;
250
251 if trx.alive.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire).is_err() {
254 return Err(MutationError::General("Transaction already committed or rolled back".into()));
255 }
256
257 let trx_id = trx.id.clone();
259 let mut entity_events = Vec::new();
260 for entity in trx.entities.iter() {
261 if let Some(event) = entity.generate_commit_event()? {
262 entity_events.push((entity.clone(), event));
263 }
264 }
265
266 let mut attested_events = Vec::new();
268 let mut entity_attested_events = Vec::new();
269
270 for (entity, event) in entity_events {
272 use std::sync::atomic::AtomicBool;
274 let trx_alive = Arc::new(AtomicBool::new(true));
275 let forked = entity.snapshot(trx_alive);
276
277 let entity_before = match &entity.kind {
279 crate::entity::EntityKind::Transacted { upstream, .. } => upstream.clone(),
280 crate::entity::EntityKind::Primary => entity.clone(),
281 };
282
283 let collection_id = &event.collection;
285 let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
286 forked.apply_event(&retriever, &event).await?;
287
288 let attestation = self.node.policy_agent.check_event(&self.node, &self.cdata, &entity_before, &forked, &event)?;
289 let attested = Attested::opt(event.clone(), attestation);
290 attested_events.push(attested.clone());
291 entity_attested_events.push((entity, attested));
292 }
293
294 for (entity, attested_event) in &entity_attested_events {
296 let collection = self.node.collections.get(&attested_event.payload.collection).await?;
297 collection.add_event(&attested_event).await?;
298 entity.commit_head(Clock::new([attested_event.payload.id()]));
299 }
300
301 self.node.relay_to_required_peers(&self.cdata, trx_id, &attested_events).await?;
303
304 let mut changes: Vec<EntityChange> = Vec::new();
306 for (entity, attested_event) in entity_attested_events {
307 let collection_id = &attested_event.payload.collection;
308 let collection = self.node.collections.get(collection_id).await?;
309
310 let canonical_entity = match &entity.kind {
312 crate::entity::EntityKind::Transacted { upstream, .. } => {
313 let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
314 upstream.apply_event(&retriever, &attested_event.payload).await?;
315 upstream.clone()
316 }
317 crate::entity::EntityKind::Primary => entity,
318 };
319
320 let state = canonical_entity.to_state()?;
321
322 let entity_state = EntityState { entity_id: canonical_entity.id(), collection: canonical_entity.collection().clone(), state };
323 let attestation = self.node.policy_agent.attest_state(&self.node, &entity_state);
324 let attested = Attested::opt(entity_state, attestation);
325 collection.set_state(attested).await?;
326
327 changes.push(EntityChange::new(canonical_entity, vec![attested_event])?);
328 }
329
330 self.node.reactor.notify_change(changes).await;
332 Ok(())
333 }
334
335 async fn fetch_from_peer(
337 &self,
338 collection_id: &proto::CollectionId,
339 selection: ankql::ast::Selection,
340 ) -> Result<Vec<crate::entity::Entity>, RetrievalError> {
341 let peer_id = self.node.get_durable_peer_random().ok_or(RetrievalError::NoDurablePeers)?;
342
343 let known_matched_entities = self.node.fetch_entities_from_local(collection_id, &selection).await?;
345
346 let known_matches = known_matched_entities
347 .iter()
348 .map(|entity| proto::KnownEntity { entity_id: entity.id(), head: entity.head().clone() })
349 .collect();
350
351 let selection_clone = selection.clone();
353 match self
354 .node
355 .request(peer_id, &self.cdata, proto::NodeRequestBody::Fetch { collection: collection_id.clone(), selection, known_matches })
356 .await?
357 {
358 proto::NodeResponseBody::Fetch(deltas) => {
359 let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
361
362 crate::node_applier::NodeApplier::apply_deltas(&self.node, &peer_id, deltas, &retriever).await?;
364 self.node.fetch_entities_from_local(collection_id, &selection_clone).await
368 }
369 proto::NodeResponseBody::Error(e) => {
370 tracing::debug!("Error from peer fetch: {}", e);
371 Err(RetrievalError::Other(format!("{:?}", e)))
372 }
373 _ => {
374 tracing::debug!("Unexpected response type from peer fetch");
375 Err(RetrievalError::Other("Unexpected response type".to_string()))
376 }
377 }
378 }
379}