ankurah_core/
context.rs

1use crate::{
2    changes::EntityChange,
3    entity::Entity,
4    error::{MutationError, RetrievalError},
5    livequery::{EntityLiveQuery, LiveQuery},
6    model::View,
7    node::{MatchArgs, Node},
8    policy::{AccessDenied, PolicyAgent},
9    storage::{StorageCollectionWrapper, StorageEngine},
10    transaction::Transaction,
11};
12use ankurah_proto::{self as proto, Attested, Clock, CollectionId, EntityState};
13use async_trait::async_trait;
14use std::sync::{atomic::AtomicBool, Arc};
15use tracing::debug;
16#[cfg(feature = "wasm")]
17use wasm_bindgen::prelude::*;
18
19/// Context is used to provide a local interface to fetch and subscribe to entities
20/// with a specific ContextData. Generally this means your auth token for a specific user,
21/// but ContextData is abstracted so you can use what you want.
22#[cfg_attr(feature = "wasm", wasm_bindgen)]
23pub struct Context(Arc<dyn TContext + Send + Sync + 'static>);
24impl Clone for Context {
25    fn clone(&self) -> Self { Self(self.0.clone()) }
26}
27
28pub struct NodeAndContext<SE, PA: PolicyAgent>
29where
30    SE: StorageEngine + Send + Sync + 'static,
31    PA: PolicyAgent + Send + Sync + 'static,
32{
33    pub node: Node<SE, PA>,
34    pub cdata: PA::ContextData,
35}
36
37#[async_trait]
38pub trait TContext {
39    fn node_id(&self) -> proto::EntityId;
40    /// Create a brand new entity for a transaction, and add it to the WeakEntitySet
41    /// Note that this does not actually persist the entity to the storage engine
42    /// It merely ensures that there are no duplicate entities with the same ID (except forked entities)
43    fn create_entity(&self, collection: proto::CollectionId, trx_alive: Arc<AtomicBool>) -> Entity;
44    fn check_write(&self, entity: &Entity) -> Result<(), AccessDenied>;
45    async fn get_entity(&self, id: proto::EntityId, collection: &proto::CollectionId, cached: bool) -> Result<Entity, RetrievalError>;
46    fn get_resident_entity(&self, id: proto::EntityId) -> Option<Entity>;
47    async fn fetch_entities(&self, collection: &proto::CollectionId, args: MatchArgs) -> Result<Vec<Entity>, RetrievalError>;
48    async fn commit_local_trx(&self, trx: Transaction) -> Result<(), MutationError>;
49    fn query(&self, collection_id: proto::CollectionId, args: MatchArgs) -> Result<EntityLiveQuery, RetrievalError>;
50    async fn collection(&self, id: &proto::CollectionId) -> Result<StorageCollectionWrapper, RetrievalError>;
51}
52
53#[async_trait]
54impl<SE: StorageEngine + Send + Sync + 'static, PA: PolicyAgent + Send + Sync + 'static> TContext for NodeAndContext<SE, PA> {
55    fn node_id(&self) -> proto::EntityId { self.node.id }
56    fn create_entity(&self, collection: proto::CollectionId, trx_alive: Arc<AtomicBool>) -> Entity {
57        let primary_entity = self.node.entities.create(collection);
58        primary_entity.snapshot(trx_alive)
59    }
60    fn check_write(&self, entity: &Entity) -> Result<(), AccessDenied> { self.node.policy_agent.check_write(&self.cdata, entity, None) }
61    async fn get_entity(&self, id: proto::EntityId, collection: &proto::CollectionId, cached: bool) -> Result<Entity, RetrievalError> {
62        self.get_entity(collection, id, cached).await
63    }
64    fn get_resident_entity(&self, id: proto::EntityId) -> Option<Entity> { self.node.entities.get(&id) }
65    async fn fetch_entities(&self, collection: &proto::CollectionId, args: MatchArgs) -> Result<Vec<Entity>, RetrievalError> {
66        self.fetch_entities(collection, args).await
67    }
68    async fn commit_local_trx(&self, trx: Transaction) -> Result<(), MutationError> { self.commit_local_trx(trx).await }
69    fn query(&self, collection_id: proto::CollectionId, args: MatchArgs) -> Result<EntityLiveQuery, RetrievalError> {
70        EntityLiveQuery::new(&self.node, collection_id, args, self.cdata.clone())
71    }
72    async fn collection(&self, id: &proto::CollectionId) -> Result<StorageCollectionWrapper, RetrievalError> {
73        self.node.system.collection(id).await
74    }
75}
76
77// This whole impl is conditionalized by the wasm feature flag
78#[cfg(feature = "wasm")]
79#[wasm_bindgen]
80impl Context {
81    #[wasm_bindgen(js_name = "node_id")]
82    pub fn js_node_id(&self) -> proto::EntityId { self.0.node_id() }
83}
84
85// This impl may or may not have the wasm_bindgen attribute but the functions will always be defined
86#[cfg_attr(feature = "wasm", wasm_bindgen)]
87impl Context {
88    /// Begin a transaction.
89    pub fn begin(&self) -> Transaction { Transaction::new(self.0.clone()) }
90}
91
92impl Context {
93    pub fn new<SE: StorageEngine + Send + Sync + 'static, PA: PolicyAgent + Send + Sync + 'static>(
94        node: Node<SE, PA>,
95        data: PA::ContextData,
96    ) -> Self {
97        Self(Arc::new(NodeAndContext { node, cdata: data }))
98    }
99
100    pub fn node_id(&self) -> proto::EntityId { self.0.node_id() }
101
102    // TODO: Fix this - arghhh async lifetimes
103    // pub async fn trx<T, F, Fut>(self: &Arc<Self>, f: F) -> anyhow::Result<T>
104    // where
105    //     F: for<'a> FnOnce(&'a Transaction) -> Fut,
106    //     Fut: std::future::Future<Output = anyhow::Result<T>>,
107    // {
108    //     let trx = self.begin();
109    //     let result = f(&trx).await?;
110    //     trx.commit().await?;
111    //     Ok(result)
112    // }
113
114    pub async fn get<R: View>(&self, id: proto::EntityId) -> Result<R, RetrievalError> {
115        let entity = self.0.get_entity(id, &R::collection(), false).await?;
116        Ok(R::from_entity(entity))
117    }
118
119    /// Get an entity, but its ok to return early if the entity is already in the local node storage
120    pub async fn get_cached<R: View>(&self, id: proto::EntityId) -> Result<R, RetrievalError> {
121        let entity = self.0.get_entity(id, &R::collection(), true).await?;
122        Ok(R::from_entity(entity))
123    }
124
125    pub async fn fetch<R: View>(&self, args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>) -> Result<Vec<R>, RetrievalError> {
126        let args: MatchArgs = args.try_into().map_err(|e| e.into())?;
127        use crate::model::Model;
128        let collection_id = R::Model::collection();
129
130        let entities = self.0.fetch_entities(&collection_id, args).await?;
131
132        Ok(entities.into_iter().map(|e| R::from_entity(e)).collect())
133    }
134
135    pub async fn fetch_one<R: View + Clone + 'static>(
136        &self,
137        args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>,
138    ) -> Result<Option<R>, RetrievalError> {
139        let views = self.fetch::<R>(args).await?;
140        Ok(views.into_iter().next())
141    }
142    /// Subscribe to changes in entities matching a selection
143    pub fn query<R>(&self, args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>) -> Result<LiveQuery<R>, RetrievalError>
144    where R: View {
145        let args: MatchArgs = args.try_into().map_err(|e| e.into())?;
146        use crate::model::Model;
147        Ok(self.0.query(R::Model::collection(), args)?.map::<R>())
148    }
149
150    /// Subscribe to changes in entities matching a selection and wait for initialization
151    pub async fn query_wait<R>(
152        &self,
153        args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>,
154    ) -> Result<LiveQuery<R>, RetrievalError>
155    where
156        R: View,
157    {
158        let livequery = self.query::<R>(args)?;
159        livequery.wait_initialized().await;
160        Ok(livequery)
161    }
162    pub async fn collection(&self, id: &proto::CollectionId) -> Result<StorageCollectionWrapper, RetrievalError> {
163        self.0.collection(id).await
164    }
165}
166
167impl<SE, PA> NodeAndContext<SE, PA>
168where
169    SE: StorageEngine + Send + Sync + 'static,
170    PA: PolicyAgent + Send + Sync + 'static,
171{
172    /// Retrieve a single entity, either by cloning the resident Entity from the Node's WeakEntitySet or fetching from storage
173    pub(crate) async fn get_entity(
174        &self,
175        collection_id: &CollectionId,
176        id: proto::EntityId,
177        cached: bool,
178    ) -> Result<Entity, RetrievalError> {
179        debug!("Node({}).get_entity {:?}-{:?}", self.node.id, id, collection_id);
180
181        if !self.node.durable {
182            // Fetch from peers and commit first response
183            match self.node.get_from_peer(collection_id, vec![id], &self.cdata).await {
184                Ok(_) => (),
185                Err(RetrievalError::NoDurablePeers) if cached => (),
186                Err(e) => {
187                    return Err(e);
188                }
189            }
190        }
191
192        if let Some(local) = self.node.entities.get(&id) {
193            debug!("Node({}).get_entity found local entity - returning", self.node.id);
194            return Ok(local);
195        }
196        debug!("{}.get_entity fetching from storage", self.node);
197
198        let collection = self.node.collections.get(collection_id).await?;
199        match collection.get_state(id).await {
200            Ok(entity_state) => {
201                let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
202                let (_changed, entity) =
203                    self.node.entities.with_state(&retriever, id, collection_id.clone(), entity_state.payload.state).await?;
204                Ok(entity)
205            }
206            Err(RetrievalError::EntityNotFound(id)) => {
207                let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
208                let (_, entity) = self.node.entities.with_state(&retriever, id, collection_id.clone(), proto::State::default()).await?;
209                Ok(entity)
210            }
211            Err(e) => Err(e),
212        }
213    }
214    /// Fetch a list of entities based on a selection
215    pub async fn fetch_entities(&self, collection_id: &CollectionId, mut args: MatchArgs) -> Result<Vec<Entity>, RetrievalError> {
216        self.node.policy_agent.can_access_collection(&self.cdata, collection_id)?;
217        // Fetch raw states from storage
218
219        args.selection.predicate = self.node.policy_agent.filter_predicate(&self.cdata, collection_id, args.selection.predicate)?;
220
221        // TODO implement cached: true
222        if !self.node.durable {
223            // Fetch from peers and commit first response
224            Ok(self.fetch_from_peer(collection_id, args.selection).await?)
225        } else {
226            let storage_collection = self.node.collections.get(collection_id).await?;
227            let states = storage_collection.fetch_states(&args.selection).await?;
228
229            // Convert states to entities
230            let mut entities = Vec::new();
231            for state in states {
232                let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
233                let (_, entity) =
234                    self.node.entities.with_state(&retriever, state.payload.entity_id, collection_id.clone(), state.payload.state).await?;
235                entities.push(entity);
236            }
237            Ok(entities)
238        }
239    }
240
241    /// Does all the things necessary to commit a local transaction
242    /// notably, the application of events to Entities works differently versus remote transactions
243    pub async fn commit_local_trx(&self, trx: Transaction) -> Result<(), MutationError> {
244        let (trx_id, entity_events) = trx.into_parts()?;
245        let mut attested_events = Vec::new();
246        let mut entity_attested_events = Vec::new();
247
248        // Check policy and collect attestations
249        for (entity, event) in entity_events {
250            let attestation = self.node.policy_agent.check_event(&self.node, &self.cdata, &entity, &event)?;
251            let attested = Attested::opt(event.clone(), attestation);
252            attested_events.push(attested.clone());
253            entity_attested_events.push((entity, attested));
254        }
255
256        // Relay to peers and wait for confirmation
257        self.node.relay_to_required_peers(&self.cdata, trx_id, &attested_events).await?;
258
259        // All peers confirmed, now we can update local state
260        let mut changes: Vec<EntityChange> = Vec::new();
261        for (entity, attested_event) in entity_attested_events {
262            let collection = self.node.collections.get(&attested_event.payload.collection).await?;
263            collection.add_event(&attested_event).await?;
264            entity.commit_head(Clock::new([attested_event.payload.id()]));
265
266            let collection_id = &attested_event.payload.collection;
267            // If this entity has an upstream, propagate the changes
268            if let crate::entity::EntityKind::Transacted { upstream, .. } = &entity.kind {
269                let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
270                upstream.apply_event(&retriever, &attested_event.payload).await?;
271            }
272
273            // Persist
274
275            let state = entity.to_state()?;
276
277            let entity_state = EntityState { entity_id: entity.id(), collection: entity.collection().clone(), state };
278            let attestation = self.node.policy_agent.attest_state(&self.node, &entity_state);
279            let attested = Attested::opt(entity_state, attestation);
280            collection.set_state(attested).await?;
281
282            changes.push(EntityChange::new(entity.clone(), vec![attested_event])?);
283        }
284
285        // Notify reactor of ALL changes
286        self.node.reactor.notify_change(changes).await;
287        Ok(())
288    }
289
290    /// Fetch entities from the first available durable peer with known_matches support
291    async fn fetch_from_peer(
292        &self,
293        collection_id: &proto::CollectionId,
294        selection: ankql::ast::Selection,
295    ) -> Result<Vec<crate::entity::Entity>, RetrievalError> {
296        let peer_id = self.node.get_durable_peer_random().ok_or(RetrievalError::NoDurablePeers)?;
297
298        // 1. Pre-fetch known_matches from local storage
299        let known_matched_entities = self.node.fetch_entities_from_local(collection_id, &selection).await?;
300
301        let known_matches = known_matched_entities
302            .iter()
303            .map(|entity| proto::KnownEntity { entity_id: entity.id(), head: entity.head().clone() })
304            .collect();
305
306        // 2. Send fetch request with known_matches
307        let selection_clone = selection.clone();
308        match self
309            .node
310            .request(peer_id, &self.cdata, proto::NodeRequestBody::Fetch { collection: collection_id.clone(), selection, known_matches })
311            .await?
312        {
313            proto::NodeResponseBody::Fetch(deltas) => {
314                // TASK: Clarify retriever semantics for durable vs ephemeral nodes https://github.com/ankurah/ankurah/issues/144
315                let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
316
317                // 3. Apply deltas to local storage using NodeApplier
318                crate::node_applier::NodeApplier::apply_deltas(&self.node, &peer_id, deltas, &retriever).await?;
319                // ARCHITECTURAL QUESTION: Optimize in-place mutation vs re-fetching for remote-peer-assisted operations https://github.com/ankurah/ankurah/issues/145
320
321                // 4. Re-fetch entities from local storage after applying deltas
322                self.node.fetch_entities_from_local(collection_id, &selection_clone).await
323            }
324            proto::NodeResponseBody::Error(e) => {
325                tracing::debug!("Error from peer fetch: {}", e);
326                Err(RetrievalError::Other(format!("{:?}", e)))
327            }
328            _ => {
329                tracing::debug!("Unexpected response type from peer fetch");
330                Err(RetrievalError::Other("Unexpected response type".to_string()))
331            }
332        }
333    }
334}