Skip to main content

ankurah_core/
transaction.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::Arc;
3
4use ankurah_proto::{self as proto, EntityId};
5
6use crate::error::RetrievalError;
7use crate::policy::AccessDenied;
8use crate::{
9    context::TContext,
10    entity::Entity,
11    error::MutationError,
12    model::{Model, MutableBorrow},
13};
14
15use append_only_vec::AppendOnlyVec;
16
17#[cfg(feature = "wasm")]
18use wasm_bindgen::prelude::*;
19
20// Q. When do we want unified vs individual property storage for TypeEngine operations?
21// A. When we start to care about differentiating possible recipients for different properties.
22
23#[cfg_attr(feature = "wasm", wasm_bindgen)]
24#[cfg_attr(feature = "uniffi", derive(uniffi::Object))]
25pub struct Transaction {
26    pub(crate) dyncontext: Arc<dyn TContext + Send + Sync + 'static>,
27    pub(crate) id: proto::TransactionId,
28    pub(crate) entities: AppendOnlyVec<Entity>,
29    pub(crate) alive: Arc<AtomicBool>,
30    /// Entity IDs that were created in this transaction via create().
31    /// Used to validate that creation events (empty parent) are only for entities
32    /// that were actually created in this transaction, not phantom entities.
33    pub(crate) created_entity_ids: std::sync::RwLock<std::collections::HashSet<EntityId>>,
34}
35
36#[cfg(feature = "wasm")]
37#[wasm_bindgen]
38impl Transaction {
39    #[wasm_bindgen(js_name = "commit")]
40    pub async fn js_commit(self) -> Result<(), JsValue> {
41        self.dyncontext.commit_local_trx(&self).await?;
42        Ok(())
43    }
44}
45
46impl Transaction {
47    pub(crate) fn new(dyncontext: Arc<dyn TContext + Send + Sync + 'static>) -> Self {
48        Self {
49            dyncontext,
50            id: proto::TransactionId::new(),
51            entities: AppendOnlyVec::new(),
52            alive: Arc::new(AtomicBool::new(true)),
53            created_entity_ids: std::sync::RwLock::new(std::collections::HashSet::new()),
54        }
55    }
56
57    pub(crate) fn add_entity(&self, entity: Entity) -> &Entity {
58        let index = self.entities.push(entity);
59        &self.entities[index]
60    }
61
62    pub async fn create<'rec, 'trx: 'rec, M: Model>(&'trx self, model: &M) -> Result<MutableBorrow<'rec, M::Mutable>, MutationError> {
63        let entity = self.dyncontext.create_entity(M::collection(), self.alive.clone());
64        model.initialize_new_entity(&entity);
65        self.dyncontext.check_write(&entity)?;
66
67        // Track that this entity was created in this transaction
68        self.created_entity_ids.write().unwrap().insert(entity.id);
69
70        let entity_ref = self.add_entity(entity);
71        Ok(MutableBorrow::new(entity_ref))
72    }
73    fn get_trx_entity(&self, id: &EntityId) -> Option<&Entity> { self.entities.iter().find(|e| e.id == *id) }
74    pub async fn get<'rec, 'trx: 'rec, M: Model>(&'trx self, id: &EntityId) -> Result<MutableBorrow<'rec, M::Mutable>, RetrievalError> {
75        match self.get_trx_entity(id) {
76            Some(entity) => Ok(MutableBorrow::new(entity)),
77            None => {
78                // go fetch the entity from the context
79                let retrieved_entity = self.dyncontext.get_entity(*id, &M::collection(), false).await?;
80                // double check to make sure somebody didn't add the entity to the trx during the await
81                // because we're forking the entity, we need to make sure we aren't adding the same entity twice
82                if let Some(entity) = self.get_trx_entity(&retrieved_entity.id) {
83                    // if this happens, I don't think we want to refresh the entity, because it's already snapshotted in the trx
84                    // and we should leave it that way to honor the consistency model
85                    Ok(MutableBorrow::new(entity))
86                } else {
87                    Ok(MutableBorrow::new(self.add_entity(retrieved_entity.snapshot(self.alive.clone()))))
88                }
89            }
90        }
91    }
92    pub fn edit<'rec, 'trx: 'rec, M: Model>(&'trx self, entity: &Entity) -> Result<MutableBorrow<'rec, M::Mutable>, AccessDenied> {
93        if let Some(entity) = self.get_trx_entity(&entity.id) {
94            return Ok(MutableBorrow::new(entity));
95        }
96        self.dyncontext.check_write(entity)?;
97
98        Ok(MutableBorrow::new(self.add_entity(entity.snapshot(self.alive.clone()))))
99    }
100
101    #[must_use]
102    pub async fn commit(self) -> Result<(), MutationError> { self.dyncontext.commit_local_trx(&self).await }
103
104    pub fn rollback(self) {
105        // Mark transaction as no longer alive
106        self.alive.store(false, Ordering::Release);
107        // The transaction will be dropped without committing
108    }
109
110    // TODO: Implement delete functionality after core query/edit operations are stable
111    // For now, "removal" from result sets is handled by edits that cause entities to no longer match queries
112    /*
113    pub async fn delete<'rec, 'trx: 'rec, M: Model>(
114        &'trx self,
115        id: impl Into<ID>,
116    ) -> Result<(), crate::error::RetrievalError> {
117        let id = id.into();
118        let entity = self.fetch_entity(id, M::collection()).await?;
119        let entity = Arc::new(entity.clone());
120        self.node.delete_entity(entity).await?;
121        Ok(())
122    }
123    */
124}
125
126impl Drop for Transaction {
127    fn drop(&mut self) {
128        // Mark transaction as no longer alive when dropped
129        self.alive.store(false, Ordering::Release);
130        // how do we want to do the rollback?
131    }
132}
133
134#[cfg(feature = "uniffi")]
135#[uniffi::export]
136impl Transaction {
137    /// Commit the transaction (UniFFI version - uses Arc<Self>)
138    /// Simply borrows self and calls commit_local_trx - the alive flag prevents double commits
139    #[uniffi::method(name = "commit")]
140    pub async fn uniffi_commit(self: Arc<Self>) -> Result<(), MutationError> { self.dyncontext.commit_local_trx(&self).await }
141
142    /// Rollback the transaction (UniFFI version)
143    #[uniffi::method(name = "rollback")]
144    pub fn uniffi_rollback(&self) { self.alive.store(false, Ordering::Release); }
145}