ankurah_core/
context.rs

1use crate::{
2    changes::EntityChange,
3    entity::Entity,
4    error::{MutationError, RetrievalError},
5    livequery::{EntityLiveQuery, LiveQuery},
6    model::View,
7    node::{MatchArgs, Node},
8    policy::{AccessDenied, PolicyAgent},
9    storage::{StorageCollectionWrapper, StorageEngine},
10    transaction::Transaction,
11};
12use ankurah_proto::{self as proto, Attested, Clock, CollectionId, EntityState};
13use async_trait::async_trait;
14use std::sync::{atomic::AtomicBool, Arc};
15use tracing::debug;
16#[cfg(feature = "wasm")]
17use wasm_bindgen::prelude::*;
18
19/// Context is used to provide a local interface to fetch and subscribe to entities
20/// with a specific ContextData. Generally this means your auth token for a specific user,
21/// but ContextData is abstracted so you can use what you want.
22#[cfg_attr(feature = "wasm", wasm_bindgen)]
23#[cfg_attr(feature = "uniffi", derive(uniffi::Object))]
24pub struct Context(Arc<dyn TContext + Send + Sync + 'static>);
25impl Clone for Context {
26    fn clone(&self) -> Self { Self(self.0.clone()) }
27}
28
29pub struct NodeAndContext<SE, PA: PolicyAgent>
30where
31    SE: StorageEngine + Send + Sync + 'static,
32    PA: PolicyAgent + Send + Sync + 'static,
33{
34    pub node: Node<SE, PA>,
35    pub cdata: PA::ContextData,
36}
37
38#[async_trait]
39pub trait TContext {
40    fn node_id(&self) -> proto::EntityId;
41    /// Create a brand new entity for a transaction, and add it to the WeakEntitySet
42    /// Note that this does not actually persist the entity to the storage engine
43    /// It merely ensures that there are no duplicate entities with the same ID (except forked entities)
44    fn create_entity(&self, collection: proto::CollectionId, trx_alive: Arc<AtomicBool>) -> Entity;
45    fn check_write(&self, entity: &Entity) -> Result<(), AccessDenied>;
46    async fn get_entity(&self, id: proto::EntityId, collection: &proto::CollectionId, cached: bool) -> Result<Entity, RetrievalError>;
47    fn get_resident_entity(&self, id: proto::EntityId) -> Option<Entity>;
48    async fn fetch_entities(&self, collection: &proto::CollectionId, args: MatchArgs) -> Result<Vec<Entity>, RetrievalError>;
49    async fn commit_local_trx(&self, trx: &Transaction) -> Result<(), MutationError>;
50    fn query(&self, collection_id: proto::CollectionId, args: MatchArgs) -> Result<EntityLiveQuery, RetrievalError>;
51    async fn collection(&self, id: &proto::CollectionId) -> Result<StorageCollectionWrapper, RetrievalError>;
52}
53
54#[async_trait]
55impl<SE: StorageEngine + Send + Sync + 'static, PA: PolicyAgent + Send + Sync + 'static> TContext for NodeAndContext<SE, PA> {
56    fn node_id(&self) -> proto::EntityId { self.node.id }
57    fn create_entity(&self, collection: proto::CollectionId, trx_alive: Arc<AtomicBool>) -> Entity {
58        let primary_entity = self.node.entities.create(collection);
59        primary_entity.snapshot(trx_alive)
60    }
61    fn check_write(&self, entity: &Entity) -> Result<(), AccessDenied> { self.node.policy_agent.check_write(&self.cdata, entity, None) }
62    async fn get_entity(&self, id: proto::EntityId, collection: &proto::CollectionId, cached: bool) -> Result<Entity, RetrievalError> {
63        self.get_entity(collection, id, cached).await
64    }
65    fn get_resident_entity(&self, id: proto::EntityId) -> Option<Entity> { self.node.entities.get(&id) }
66    async fn fetch_entities(&self, collection: &proto::CollectionId, args: MatchArgs) -> Result<Vec<Entity>, RetrievalError> {
67        self.fetch_entities(collection, args).await
68    }
69    async fn commit_local_trx(&self, trx: &Transaction) -> Result<(), MutationError> { self.commit_local_trx(trx).await }
70    fn query(&self, collection_id: proto::CollectionId, args: MatchArgs) -> Result<EntityLiveQuery, RetrievalError> {
71        EntityLiveQuery::new(&self.node, collection_id, args, self.cdata.clone())
72    }
73    async fn collection(&self, id: &proto::CollectionId) -> Result<StorageCollectionWrapper, RetrievalError> {
74        self.node.system.collection(id).await
75    }
76}
77
// This whole impl only exists when the `wasm` feature is enabled.
#[cfg(feature = "wasm")]
#[wasm_bindgen]
impl Context {
    /// Accessor for the id of the underlying node, exported to JavaScript as `node_id`.
    #[wasm_bindgen(js_name = "node_id")]
    pub fn js_node_id(&self) -> proto::EntityId {
        Context::node_id(self)
    }
}
85
86// This impl may or may not have the wasm_bindgen attribute but the functions will always be defined
87#[cfg_attr(feature = "wasm", wasm_bindgen)]
88#[cfg_attr(feature = "uniffi", uniffi::export)]
89impl Context {
90    /// Begin a transaction.
91    pub fn begin(&self) -> Transaction { Transaction::new(self.0.clone()) }
92}
93
94impl Context {
95    pub fn new<SE: StorageEngine + Send + Sync + 'static, PA: PolicyAgent + Send + Sync + 'static>(
96        node: Node<SE, PA>,
97        data: PA::ContextData,
98    ) -> Self {
99        Self(Arc::new(NodeAndContext { node, cdata: data }))
100    }
101
102    pub fn node_id(&self) -> proto::EntityId { self.0.node_id() }
103
104    // TODO: Fix this - arghhh async lifetimes
105    // pub async fn trx<T, F, Fut>(self: &Arc<Self>, f: F) -> anyhow::Result<T>
106    // where
107    //     F: for<'a> FnOnce(&'a Transaction) -> Fut,
108    //     Fut: std::future::Future<Output = anyhow::Result<T>>,
109    // {
110    //     let trx = self.begin();
111    //     let result = f(&trx).await?;
112    //     trx.commit().await?;
113    //     Ok(result)
114    // }
115
116    pub async fn get<R: View>(&self, id: proto::EntityId) -> Result<R, RetrievalError> {
117        let entity = self.0.get_entity(id, &R::collection(), false).await?;
118        Ok(R::from_entity(entity))
119    }
120
121    /// Get an entity, but its ok to return early if the entity is already in the local node storage
122    pub async fn get_cached<R: View>(&self, id: proto::EntityId) -> Result<R, RetrievalError> {
123        let entity = self.0.get_entity(id, &R::collection(), true).await?;
124        Ok(R::from_entity(entity))
125    }
126
127    pub async fn fetch<R: View>(&self, args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>) -> Result<Vec<R>, RetrievalError> {
128        let args: MatchArgs = args.try_into().map_err(|e| e.into())?;
129        use crate::model::Model;
130        let collection_id = R::Model::collection();
131
132        let entities = self.0.fetch_entities(&collection_id, args).await?;
133
134        Ok(entities.into_iter().map(|e| R::from_entity(e)).collect())
135    }
136
137    pub async fn fetch_one<R: View + Clone + 'static>(
138        &self,
139        args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>,
140    ) -> Result<Option<R>, RetrievalError> {
141        let views = self.fetch::<R>(args).await?;
142        Ok(views.into_iter().next())
143    }
144    /// Subscribe to changes in entities matching a selection
145    pub fn query<R>(&self, args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>) -> Result<LiveQuery<R>, RetrievalError>
146    where R: View {
147        let args: MatchArgs = args.try_into().map_err(|e| e.into())?;
148        use crate::model::Model;
149        Ok(self.0.query(R::Model::collection(), args)?.map::<R>())
150    }
151
152    /// Subscribe to changes in entities matching a selection and wait for initialization
153    pub async fn query_wait<R>(
154        &self,
155        args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>,
156    ) -> Result<LiveQuery<R>, RetrievalError>
157    where
158        R: View,
159    {
160        let livequery = self.query::<R>(args)?;
161        livequery.wait_initialized().await;
162        Ok(livequery)
163    }
164    pub async fn collection(&self, id: &proto::CollectionId) -> Result<StorageCollectionWrapper, RetrievalError> {
165        self.0.collection(id).await
166    }
167}
168
169impl<SE, PA> NodeAndContext<SE, PA>
170where
171    SE: StorageEngine + Send + Sync + 'static,
172    PA: PolicyAgent + Send + Sync + 'static,
173{
174    /// Retrieve a single entity, either by cloning the resident Entity from the Node's WeakEntitySet or fetching from storage
175    pub(crate) async fn get_entity(
176        &self,
177        collection_id: &CollectionId,
178        id: proto::EntityId,
179        cached: bool,
180    ) -> Result<Entity, RetrievalError> {
181        debug!("Node({}).get_entity {:?}-{:?}", self.node.id, id, collection_id);
182
183        if !self.node.durable {
184            // Fetch from peers and commit first response
185            match self.node.get_from_peer(collection_id, vec![id], &self.cdata).await {
186                Ok(_) => (),
187                Err(RetrievalError::NoDurablePeers) if cached => (),
188                Err(e) => {
189                    return Err(e);
190                }
191            }
192        }
193
194        if let Some(local) = self.node.entities.get(&id) {
195            debug!("Node({}).get_entity found local entity - returning", self.node.id);
196            return Ok(local);
197        }
198        debug!("{}.get_entity fetching from storage", self.node);
199
200        let collection = self.node.collections.get(collection_id).await?;
201        match collection.get_state(id).await {
202            Ok(entity_state) => {
203                let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
204                let (_changed, entity) =
205                    self.node.entities.with_state(&retriever, id, collection_id.clone(), entity_state.payload.state).await?;
206                Ok(entity)
207            }
208            Err(RetrievalError::EntityNotFound(id)) => {
209                let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
210                let (_, entity) = self.node.entities.with_state(&retriever, id, collection_id.clone(), proto::State::default()).await?;
211                Ok(entity)
212            }
213            Err(e) => Err(e),
214        }
215    }
216    /// Fetch a list of entities based on a selection
217    pub async fn fetch_entities(&self, collection_id: &CollectionId, mut args: MatchArgs) -> Result<Vec<Entity>, RetrievalError> {
218        self.node.policy_agent.can_access_collection(&self.cdata, collection_id)?;
219        // Fetch raw states from storage
220
221        args.selection.predicate = self.node.policy_agent.filter_predicate(&self.cdata, collection_id, args.selection.predicate)?;
222
223        // Resolve types in the AST (converts literals for JSON path comparisons)
224        args.selection = self.node.type_resolver.resolve_selection_types(args.selection);
225
226        // TODO implement cached: true
227        if !self.node.durable {
228            // Fetch from peers and commit first response
229            Ok(self.fetch_from_peer(collection_id, args.selection).await?)
230        } else {
231            let storage_collection = self.node.collections.get(collection_id).await?;
232            let states = storage_collection.fetch_states(&args.selection).await?;
233
234            // Convert states to entities
235            let mut entities = Vec::new();
236            for state in states {
237                let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
238                let (_, entity) =
239                    self.node.entities.with_state(&retriever, state.payload.entity_id, collection_id.clone(), state.payload.state).await?;
240                entities.push(entity);
241            }
242            Ok(entities)
243        }
244    }
245
246    /// Does all the things necessary to commit a local transaction
247    /// notably, the application of events to Entities works differently versus remote transactions
248    pub async fn commit_local_trx(&self, trx: &Transaction) -> Result<(), MutationError> {
249        use std::sync::atomic::Ordering;
250
251        // Atomically mark transaction as no longer alive, preventing double-commit.
252        // compare_exchange returns Err if the value was already false (already committed/rolled back).
253        if trx.alive.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire).is_err() {
254            return Err(MutationError::General("Transaction already committed or rolled back".into()));
255        }
256
257        // Generate events from the transaction entities
258        let trx_id = trx.id.clone();
259        let mut entity_events = Vec::new();
260        for entity in trx.entities.iter() {
261            if let Some(event) = entity.generate_commit_event()? {
262                entity_events.push((entity.clone(), event));
263            }
264        }
265
266        // Now commit the events
267        let mut attested_events = Vec::new();
268        let mut entity_attested_events = Vec::new();
269
270        // Check policy and collect attestations
271        for (entity, event) in entity_events {
272            // Create a temporary fork to apply the event for validation
273            use std::sync::atomic::AtomicBool;
274            let trx_alive = Arc::new(AtomicBool::new(true));
275            let forked = entity.snapshot(trx_alive);
276
277            // Get the canonical (upstream) entity for before state
278            let entity_before = match &entity.kind {
279                crate::entity::EntityKind::Transacted { upstream, .. } => upstream.clone(),
280                crate::entity::EntityKind::Primary => entity.clone(),
281            };
282
283            // Apply event to fork for after state
284            let collection_id = &event.collection;
285            let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
286            forked.apply_event(&retriever, &event).await?;
287
288            let attestation = self.node.policy_agent.check_event(&self.node, &self.cdata, &entity_before, &forked, &event)?;
289            let attested = Attested::opt(event.clone(), attestation);
290            attested_events.push(attested.clone());
291            entity_attested_events.push((entity, attested));
292        }
293
294        // Store events and update heads BEFORE relaying (makes entities visible to server echo)
295        for (entity, attested_event) in &entity_attested_events {
296            let collection = self.node.collections.get(&attested_event.payload.collection).await?;
297            collection.add_event(&attested_event).await?;
298            entity.commit_head(Clock::new([attested_event.payload.id()]));
299        }
300
301        // Relay to peers and wait for confirmation
302        self.node.relay_to_required_peers(&self.cdata, trx_id, &attested_events).await?;
303
304        // All peers confirmed, persist state to storage
305        let mut changes: Vec<EntityChange> = Vec::new();
306        for (entity, attested_event) in entity_attested_events {
307            let collection_id = &attested_event.payload.collection;
308            let collection = self.node.collections.get(collection_id).await?;
309
310            // Persist canonical entity (upstream for transactional forks, entity itself for primary)
311            let canonical_entity = match &entity.kind {
312                crate::entity::EntityKind::Transacted { upstream, .. } => {
313                    let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
314                    upstream.apply_event(&retriever, &attested_event.payload).await?;
315                    upstream.clone()
316                }
317                crate::entity::EntityKind::Primary => entity,
318            };
319
320            let state = canonical_entity.to_state()?;
321
322            let entity_state = EntityState { entity_id: canonical_entity.id(), collection: canonical_entity.collection().clone(), state };
323            let attestation = self.node.policy_agent.attest_state(&self.node, &entity_state);
324            let attested = Attested::opt(entity_state, attestation);
325            collection.set_state(attested).await?;
326
327            changes.push(EntityChange::new(canonical_entity, vec![attested_event])?);
328        }
329
330        // Notify reactor of ALL changes
331        self.node.reactor.notify_change(changes).await;
332        Ok(())
333    }
334
335    /// Fetch entities from the first available durable peer with known_matches support
336    async fn fetch_from_peer(
337        &self,
338        collection_id: &proto::CollectionId,
339        selection: ankql::ast::Selection,
340    ) -> Result<Vec<crate::entity::Entity>, RetrievalError> {
341        let peer_id = self.node.get_durable_peer_random().ok_or(RetrievalError::NoDurablePeers)?;
342
343        // 1. Pre-fetch known_matches from local storage
344        let known_matched_entities = self.node.fetch_entities_from_local(collection_id, &selection).await?;
345
346        let known_matches = known_matched_entities
347            .iter()
348            .map(|entity| proto::KnownEntity { entity_id: entity.id(), head: entity.head().clone() })
349            .collect();
350
351        // 2. Send fetch request with known_matches
352        let selection_clone = selection.clone();
353        match self
354            .node
355            .request(peer_id, &self.cdata, proto::NodeRequestBody::Fetch { collection: collection_id.clone(), selection, known_matches })
356            .await?
357        {
358            proto::NodeResponseBody::Fetch(deltas) => {
359                // TASK: Clarify retriever semantics for durable vs ephemeral nodes https://github.com/ankurah/ankurah/issues/144
360                let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
361
362                // 3. Apply deltas to local storage using NodeApplier
363                crate::node_applier::NodeApplier::apply_deltas(&self.node, &peer_id, deltas, &retriever).await?;
364                // ARCHITECTURAL QUESTION: Optimize in-place mutation vs re-fetching for remote-peer-assisted operations https://github.com/ankurah/ankurah/issues/145
365
366                // 4. Re-fetch entities from local storage after applying deltas
367                self.node.fetch_entities_from_local(collection_id, &selection_clone).await
368            }
369            proto::NodeResponseBody::Error(e) => {
370                tracing::debug!("Error from peer fetch: {}", e);
371                Err(RetrievalError::Other(format!("{:?}", e)))
372            }
373            _ => {
374                tracing::debug!("Unexpected response type from peer fetch");
375                Err(RetrievalError::Other("Unexpected response type".to_string()))
376            }
377        }
378    }
379}