Skip to main content

ankurah_core/
entity.rs

1use crate::lineage::{self, GetEvents, Retrieve};
2use crate::selection::filter::Filterable;
3use crate::{
4    error::{LineageError, MutationError, RetrievalError, StateError},
5    model::View,
6    property::backend::{backend_from_string, PropertyBackend},
7    reactor::AbstractEntity,
8    value::Value,
9};
10use ankurah_proto::{Clock, CollectionId, EntityId, EntityState, Event, EventId, OperationSet, State};
11use std::collections::BTreeMap;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::{Arc, Weak};
14use tracing::{debug, warn};
15
/// An entity represents a unique thing within a collection. Entity can only be constructed via a WeakEntitySet
/// which provides duplication guarantees.
///
/// `Entity` is a reference-counted handle: cloning it clones the inner `Arc`, so every clone shares the
/// same id, collection, state and broadcast. Equality is pointer identity of that shared allocation
/// (see the `PartialEq` impl), not structural equality.
///
/// NOTE(review): `Entity::create` and `Entity::snapshot` are `pub`, so "only via WeakEntitySet" is a
/// convention callers must follow rather than something the type system enforces.
#[derive(Debug, Clone)]
pub struct Entity(Arc<EntityInner>);
20
// TODO optimize this to be faster for scanning over entries in a collection
/// Used only for reconstituting state to filter database results. No duplication guarantees are provided
///
/// A `TemporaryEntity` is built directly from a `State` (see `TemporaryEntity::new`) and is never
/// registered in a `WeakEntitySet`, so several instances for the same id may exist at once.
pub struct TemporaryEntity(Arc<EntityInner>);
24
/// Combined state for atomic updates of head and backends
///
/// Both fields sit behind the single `RwLock` in `EntityInner::state`, so the mutation paths in this
/// module can update the head and the backends under one write lock.
///
/// NOTE(review): backends use interior mutability and `Entity::get_backend` hands out `Arc`s to them,
/// so backend contents can change without this lock being held.
#[derive(Debug)]
struct EntityInnerState {
    /// Head clock; replaced together with the backends whenever an event or state is applied.
    head: Clock,
    // TODO: remove interior mutability from backends; make mutation methods take &mut self
    /// Property backends keyed by backend name (the name passed to `backend_from_string`).
    backends: BTreeMap<String, Arc<dyn PropertyBackend>>,
}
32
33impl EntityInnerState {
34    /// Apply operations to a specific backend within this state
35    /// TODO: backends currently rely on interior mutability; refactor to externalize mutability
36    fn apply_operations(&mut self, backend_name: String, operations: &Vec<ankurah_proto::Operation>) -> Result<(), MutationError> {
37        if let Some(backend) = self.backends.get(&backend_name) {
38            backend.apply_operations(operations)?;
39        } else {
40            let backend = backend_from_string(&backend_name, None)?;
41            backend.apply_operations(operations)?;
42            self.backends.insert(backend_name, backend);
43        }
44        Ok(())
45    }
46}
47
/// Shared interior of an `Entity` (also used by `TemporaryEntity`).
#[derive(Debug)]
pub struct EntityInner {
    /// Identifier of the entity.
    pub id: EntityId,
    /// Collection the entity belongs to.
    pub collection: CollectionId,
    /// Combined state RwLock for atomic head/backends updates
    state: std::sync::RwLock<EntityInnerState>,
    /// Whether this is a primary entity or a transaction fork (see `EntityKind`).
    pub(crate) kind: EntityKind,
    /// Broadcast for notifying Signal subscribers about entity changes
    pub(crate) broadcast: ankurah_signals::broadcast::Broadcast,
}
58
/// Distinguishes a primary entity from a transaction-local fork of one.
#[derive(Debug)]
pub enum EntityKind {
    /// New or resident entity - TODO delineate these.
    /// Primary entities report `is_writable() == false`.
    Primary,
    /// Transaction fork with liveness tracking.
    /// `trx_alive` is read by `Entity::is_writable`; `upstream` is the entity this fork was
    /// snapshotted from (see `Entity::snapshot`).
    Transacted { trx_alive: Arc<AtomicBool>, upstream: Entity },
}
64
65impl std::ops::Deref for Entity {
66    type Target = EntityInner;
67
68    fn deref(&self) -> &Self::Target { &self.0 }
69}
70
71impl std::ops::Deref for TemporaryEntity {
72    type Target = EntityInner;
73
74    fn deref(&self) -> &Self::Target { &self.0 }
75}
76
77impl PartialEq for Entity {
78    fn eq(&self, other: &Self) -> bool { Arc::ptr_eq(&self.0, &other.0) }
79}
80
/// A weak reference to an entity
///
/// Holding a `WeakEntity` does not keep the entity alive; call `WeakEntity::upgrade` to obtain a
/// strong `Entity` if one still exists.
pub struct WeakEntity(Weak<EntityInner>);
83
84impl WeakEntity {
85    pub fn upgrade(&self) -> Option<Entity> { self.0.upgrade().map(Entity) }
86}
87
88impl Entity {
89    pub fn id(&self) -> EntityId { self.id }
90
91    // This is intentionally private - only WeakEntitySet should be constructing Entities
92    fn weak(&self) -> WeakEntity { WeakEntity(Arc::downgrade(&self.0)) }
93
94    pub fn collection(&self) -> &CollectionId { &self.collection }
95
96    pub fn head(&self) -> Clock { self.state.read().unwrap().head.clone() }
97
98    /// Check if this entity is writable (i.e., it's a transaction fork that's still alive)
99    pub fn is_writable(&self) -> bool {
100        match &self.kind {
101            EntityKind::Primary => false, // Primary entities are read-only
102            EntityKind::Transacted { trx_alive, .. } => trx_alive.load(Ordering::Acquire),
103        }
104    }
105
106    pub fn to_state(&self) -> Result<State, StateError> {
107        let state = self.state.read().expect("other thread panicked, panic here too");
108        let mut state_buffers = BTreeMap::default();
109        for (name, backend) in &state.backends {
110            let state_buffer = backend.to_state_buffer()?;
111            state_buffers.insert(name.clone(), state_buffer);
112        }
113        let state_buffers = ankurah_proto::StateBuffers(state_buffers);
114        Ok(State { state_buffers, head: state.head.clone() })
115    }
116
117    pub fn to_entity_state(&self) -> Result<EntityState, StateError> {
118        let state = self.to_state()?;
119        Ok(EntityState { entity_id: self.id(), collection: self.collection.clone(), state })
120    }
121
122    // used by the Model macro
123    pub fn create(id: EntityId, collection: CollectionId) -> Self {
124        Self(Arc::new(EntityInner {
125            id,
126            collection,
127            state: std::sync::RwLock::new(EntityInnerState { head: Clock::default(), backends: BTreeMap::default() }),
128            kind: EntityKind::Primary,
129            broadcast: ankurah_signals::broadcast::Broadcast::new(),
130        }))
131    }
132
133    /// This must remain private - ONLY WeakEntitySet should be constructing Entities
134    fn from_state(id: EntityId, collection: CollectionId, state: &State) -> Result<Self, RetrievalError> {
135        let mut backends = BTreeMap::new();
136        for (name, state_buffer) in state.state_buffers.iter() {
137            let backend = backend_from_string(name, Some(state_buffer))?;
138            backends.insert(name.to_owned(), backend);
139        }
140
141        Ok(Self(Arc::new(EntityInner {
142            id,
143            collection,
144            state: std::sync::RwLock::new(EntityInnerState { head: state.head.clone(), backends }),
145            kind: EntityKind::Primary,
146            broadcast: ankurah_signals::broadcast::Broadcast::new(),
147        })))
148    }
149
150    /// Generate an event which contains all operations for all backends since the last time they were collected
151    /// Used for transaction commit. Notably this does not apply the head to the entity, which must be done
152    /// using commit_head
153    pub(crate) fn generate_commit_event(&self) -> Result<Option<Event>, MutationError> {
154        let state = self.state.read().expect("other thread panicked, panic here too");
155        let mut operations = BTreeMap::<String, Vec<ankurah_proto::Operation>>::new();
156        for (name, backend) in &state.backends {
157            if let Some(ops) = backend.to_operations()? {
158                operations.insert(name.clone(), ops);
159            }
160        }
161
162        if operations.is_empty() {
163            Ok(None)
164        } else {
165            let operations = OperationSet(operations);
166            let event = Event { entity_id: self.id, collection: self.collection.clone(), operations, parent: state.head.clone() };
167            Ok(Some(event))
168        }
169    }
170
171    /// Updates the head of the entity to the given clock, which should come exclusively from generate_commit_event
172    pub(crate) fn commit_head(&self, new_head: Clock) {
173        // TODO figure out how to implement CAS with the backend state
174        // probably need an increment for local edits
175        self.state.write().unwrap().head = new_head;
176    }
177
178    /// Attempts to mutate the entity state if the head matches the expected value.
179    ///
180    /// This provides TOCTOU protection: grabs the write lock, checks that `state.head == expected_head`,
181    /// and only then runs the closure. If the head changed, updates `expected_head` to the current value
182    /// and returns `Ok(false)` so the caller can retry with fresh lineage info.
183    ///
184    /// Returns `Ok(true)` if the mutation succeeded, `Ok(false)` if the head moved (retry needed),
185    /// or `Err` if the closure returned an error.
186    fn try_mutate<F, E>(&self, expected_head: &mut Clock, body: F) -> Result<bool, E>
187    where F: FnOnce(&mut EntityInnerState) -> Result<(), E> {
188        let mut state = self.state.write().unwrap();
189        if &state.head != expected_head {
190            *expected_head = state.head.clone();
191            return Ok(false);
192        }
193        body(&mut state)?;
194        Ok(true)
195    }
196
197    pub fn view<V: View>(&self) -> Option<V> {
198        if self.collection() != &V::collection() {
199            None
200        } else {
201            Some(V::from_entity(self.clone()))
202        }
203    }
204
205    /// Attempt to apply an event to the entity
206    #[cfg_attr(feature = "instrument", tracing::instrument(level="debug", skip_all, fields(entity = %self, event = %event)))]
207    pub async fn apply_event<G>(&self, getter: &G, event: &Event) -> Result<bool, MutationError>
208    where G: GetEvents<Id = EventId, Event = Event> {
209        debug!("apply_event head: {event} to {self}");
210
211        // Check for entity creation under the mutex to avoid TOCTOU race
212        if event.is_entity_create() {
213            let mut state = self.state.write().unwrap();
214            // Re-check if head is still empty now that we hold the lock
215            if state.head.is_empty() {
216                // this is the creation event for a new entity, so we simply accept it
217                for (backend_name, operations) in event.operations.iter() {
218                    state.apply_operations(backend_name.clone(), operations)?;
219                }
220                state.head = event.id().into();
221                drop(state); // Release lock before broadcast
222                             // Notify Signal subscribers about the change
223                self.broadcast.send(());
224                return Ok(true);
225            }
226            // If head is no longer empty, fall through to normal lineage comparison
227        }
228
229        let mut head = self.head();
230        // Retry loop to handle head changes between lineage comparison and mutation
231        const MAX_RETRIES: usize = 5;
232        let budget = 100;
233
234        for attempt in 0..MAX_RETRIES {
235            let new_head: Clock = match crate::lineage::compare_unstored_event(getter, event, &head, budget).await? {
236                lineage::Ordering::Equal => return Ok(false),
237                lineage::Ordering::Descends => event.id().into(),
238                lineage::Ordering::NotDescends { meet: _ } => {
239                    warn!("NotDescends - HACK - applying (attempt {})", attempt + 1);
240                    head.with_event(event.id())
241                }
242                lineage::Ordering::Incomparable => {
243                    return Err(LineageError::Incomparable.into());
244                }
245                lineage::Ordering::PartiallyDescends { meet } => {
246                    return Err(LineageError::PartiallyDescends { meet }.into());
247                }
248                lineage::Ordering::BudgetExceeded { subject_frontier, other_frontier } => {
249                    warn!(
250                        "apply_event budget exhausted after {budget} events. Assuming Descends. subject_frontier: {}, other_frontier: {}",
251                        subject_frontier.iter().map(|id| id.to_base64_short()).collect::<Vec<String>>().join(", "),
252                        other_frontier.iter().map(|id| id.to_base64_short()).collect::<Vec<String>>().join(", ")
253                    );
254                    event.id().into()
255                }
256            };
257
258            if self.try_mutate(&mut head, move |state| -> Result<(), MutationError> {
259                for (backend_name, operations) in event.operations.iter() {
260                    state.apply_operations(backend_name.clone(), operations)?;
261                }
262                state.head = new_head;
263                Ok(())
264            })? {
265                self.broadcast.send(());
266                return Ok(true);
267            }
268            continue;
269        }
270
271        warn!("apply_event retries exhausted while chasing moving head; applying event as Descends");
272        Err(MutationError::TOCTOUAttemptsExhausted)
273    }
274
275    pub async fn apply_state<G>(&self, getter: &G, state: &State) -> Result<bool, MutationError>
276    where G: GetEvents<Id = EventId, Event = Event> {
277        let mut head = self.head();
278        let new_head = state.head.clone();
279
280        debug!("{self} apply_state - new head: {new_head}");
281        let budget = 100;
282        const MAX_RETRIES: usize = 5;
283
284        for _attempt in 0..MAX_RETRIES {
285            let apply = match crate::lineage::compare(getter, &new_head, &head, budget).await? {
286                lineage::Ordering::Equal => return Ok(false),
287                lineage::Ordering::Descends => true,
288                lineage::Ordering::NotDescends { meet: _ } => return Ok(false),
289                lineage::Ordering::Incomparable => return Err(LineageError::Incomparable.into()),
290                lineage::Ordering::PartiallyDescends { meet } => return Err(LineageError::PartiallyDescends { meet }.into()),
291                lineage::Ordering::BudgetExceeded { subject_frontier, other_frontier } => {
292                    warn!(
293                        "{self} apply_state - budget exhausted after {budget} events. Assuming Descends. subject: {subject_frontier:?}, other: {other_frontier:?}"
294                    );
295                    true
296                }
297            };
298
299            if apply {
300                if self.try_mutate::<_, MutationError>(&mut head, |es| -> Result<(), MutationError> {
301                    for (name, state_buffer) in state.state_buffers.iter() {
302                        let backend = backend_from_string(name, Some(state_buffer))?;
303                        es.backends.insert(name.to_owned(), backend);
304                    }
305                    es.head = state.head.clone();
306                    Ok(())
307                })? {
308                    self.broadcast.send(());
309                    return Ok(true);
310                }
311                continue;
312            }
313        }
314
315        warn!("{self} apply_state retries exhausted while chasing moving head");
316        Err(MutationError::TOCTOUAttemptsExhausted)
317    }
318
319    /// Create a snapshot of the Entity which is detached from this one, and will not receive the updates this one does
320    /// The trx_alive parameter tracks whether the transaction that owns this snapshot is still alive
321    pub fn snapshot(&self, trx_alive: Arc<AtomicBool>) -> Self {
322        // Inline fork logic
323        let state = self.state.read().expect("other thread panicked, panic here too");
324        let mut forked = BTreeMap::new();
325        for (name, backend) in &state.backends {
326            forked.insert(name.clone(), backend.fork());
327        }
328
329        Self(Arc::new(EntityInner {
330            id: self.id,
331            collection: self.collection.clone(),
332            state: std::sync::RwLock::new(EntityInnerState { head: state.head.clone(), backends: forked }),
333            kind: EntityKind::Transacted { trx_alive, upstream: self.clone() },
334            broadcast: ankurah_signals::broadcast::Broadcast::new(),
335        }))
336    }
337
338    /// Get a reference to the entity's broadcast for Signal implementations
339    pub fn broadcast(&self) -> &ankurah_signals::broadcast::Broadcast { &self.broadcast }
340
341    /// Get a specific backend, creating it if it doesn't exist
342    pub fn get_backend<P: PropertyBackend>(&self) -> Result<Arc<P>, RetrievalError> {
343        let backend_name = P::property_backend_name();
344        let mut state = self.state.write().expect("other thread panicked, panic here too");
345        if let Some(backend) = state.backends.get(&backend_name) {
346            let upcasted = backend.clone().as_arc_dyn_any();
347            Ok(upcasted.downcast::<P>().unwrap()) // TODO: handle downcast error
348        } else {
349            let backend = backend_from_string(&backend_name, None)?;
350            let upcasted = backend.clone().as_arc_dyn_any();
351            let typed_backend = upcasted.downcast::<P>().unwrap(); // TODO handle downcast error
352            state.backends.insert(backend_name, backend);
353            Ok(typed_backend)
354        }
355    }
356
357    pub fn values(&self) -> Vec<(String, Option<Value>)> {
358        let state = self.state.read().expect("other thread panicked, panic here too");
359        state
360            .backends
361            .values()
362            .flat_map(|backend| {
363                backend
364                    .property_values()
365                    .iter()
366                    .map(|(name, value)| (name.to_string(), value.clone()))
367                    .collect::<Vec<(String, Option<Value>)>>()
368            })
369            .collect()
370    }
371}
372
373// Implement AbstractEntity for Entity (used by reactor)
374impl AbstractEntity for Entity {
375    fn collection(&self) -> ankurah_proto::CollectionId { self.collection.clone() }
376
377    fn id(&self) -> &ankurah_proto::EntityId { &self.id }
378
379    fn value(&self, field: &str) -> Option<crate::value::Value> {
380        if field == "id" {
381            Some(crate::value::Value::EntityId(self.id))
382        } else {
383            // Iterate through backends to find one that has this property
384            let state = self.state.read().expect("other thread panicked, panic here too");
385            state.backends.values().find_map(|backend| backend.property_value(&field.into()))
386        }
387    }
388}
389
390impl std::fmt::Display for Entity {
391    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
392        write!(f, "Entity({}/{} {:#})", self.collection, self.id.to_base64_short(), self.head())
393    }
394}
395
396impl Filterable for Entity {
397    fn collection(&self) -> &str { self.collection.as_str() }
398
399    fn value(&self, name: &str) -> Option<Value> {
400        if name == "id" {
401            Some(Value::EntityId(self.id))
402        } else {
403            // Iterate through backends to find one that has this property
404            let state = self.state.read().expect("other thread panicked, panic here too");
405            state.backends.values().find_map(|backend| backend.property_value(&name.to_owned()))
406        }
407    }
408}
409
410impl TemporaryEntity {
411    pub fn new(id: EntityId, collection: CollectionId, state: &State) -> Result<Self, RetrievalError> {
412        // Inline from_state_buffers logic
413        let mut backends = BTreeMap::new();
414        for (name, state_buffer) in state.state_buffers.iter() {
415            let backend = backend_from_string(name, Some(state_buffer))?;
416            backends.insert(name.to_owned(), backend);
417        }
418
419        Ok(Self(Arc::new(EntityInner {
420            id,
421            collection,
422            state: std::sync::RwLock::new(EntityInnerState { head: state.head.clone(), backends }),
423            kind: EntityKind::Primary,
424            // slightly annoying that we need to populate this, given that it won't be used
425            broadcast: ankurah_signals::broadcast::Broadcast::new(),
426        })))
427    }
428    pub fn values(&self) -> Vec<(String, Option<Value>)> {
429        let state = self.0.state.read().expect("other thread panicked, panic here too");
430        state.backends.values().flat_map(|backend| backend.property_values()).collect()
431    }
432}
433
434// TODO - clean this up and consolidate with Entity somehow, while still preventing anyone from creating unregistered (non-temporary) Entities
435impl Filterable for TemporaryEntity {
436    fn collection(&self) -> &str { self.0.collection.as_str() }
437
438    fn value(&self, name: &str) -> Option<Value> {
439        if name == "id" {
440            Some(Value::EntityId(self.0.id))
441        } else {
442            // Iterate through backends to find one that has this property
443            let state = self.0.state.read().expect("other thread panicked, panic here too");
444            state.backends.values().find_map(|backend| backend.property_value(&name.to_owned()))
445        }
446    }
447}
448
449impl std::fmt::Display for TemporaryEntity {
450    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
451        write!(f, "TemporaryEntity({}/{}) = {}", &self.collection, self.id, self.0.state.read().unwrap().head)
452    }
453}
454
// TODO - Implement TOCTOU Race condition tests. Require real backend state mutations to be meaningful. punting that for now
/// A set of entities held weakly
///
/// Maps each `EntityId` to a `WeakEntity`, so the set by itself does not keep entities alive.
/// Cloning the set clones the outer `Arc`, so all clones share the same map.
///
/// NOTE(review): entries whose entity has been dropped are only overwritten when the same id is
/// inserted again; nothing in this module removes them.
#[derive(Clone, Default)]
pub struct WeakEntitySet(Arc<std::sync::RwLock<BTreeMap<EntityId, WeakEntity>>>);
459impl WeakEntitySet {
460    pub fn get(&self, id: &EntityId) -> Option<Entity> {
461        let entities = self.0.read().unwrap();
462        // TODO: call policy agent with cdata
463        if let Some(entity) = entities.get(id) {
464            entity.upgrade()
465        } else {
466            None
467        }
468    }
469
470    pub async fn get_or_retrieve<R>(
471        &self,
472        retriever: &R,
473        collection_id: &CollectionId,
474        id: &EntityId,
475    ) -> Result<Option<Entity>, RetrievalError>
476    where
477        R: Retrieve<Id = EventId, Event = Event> + Send + Sync,
478    {
479        // do it in two phases to avoid holding the lock while waiting for the collection
480        match self.get(id) {
481            Some(entity) => Ok(Some(entity)),
482            None => match retriever.get_state(*id).await {
483                Ok(None) => Ok(None),
484                Ok(Some(state)) => {
485                    // technically someone could have added the entity since we last checked, so it's better to use the
486                    // with_state method to re-check
487                    let (_, entity) = self.with_state(retriever, *id, collection_id.to_owned(), state.payload.state).await?;
488                    Ok(Some(entity))
489                }
490                Err(e) => Err(e),
491            },
492        }
493    }
494    /// Returns a resident entity, or fetches it from storage, or finally creates if neither of the two are found
495    pub async fn get_retrieve_or_create<R>(
496        &self,
497        retriever: &R,
498        collection_id: &CollectionId,
499        id: &EntityId,
500    ) -> Result<Entity, RetrievalError>
501    where
502        R: Retrieve<Id = EventId, Event = Event> + Send + Sync,
503    {
504        match self.get_or_retrieve(retriever, collection_id, id).await? {
505            Some(entity) => Ok(entity),
506            None => {
507                let mut entities = self.0.write().unwrap();
508                // TODO: call policy agent with cdata
509                if let Some(entity) = entities.get(id) {
510                    if let Some(entity) = entity.upgrade() {
511                        return Ok(entity);
512                    }
513                }
514                let entity = Entity::create(*id, collection_id.to_owned());
515                entities.insert(*id, entity.weak());
516                Ok(entity)
517            }
518        }
519    }
520    /// Create a brand new entity, and add it to the set
521    pub fn create(&self, collection: CollectionId) -> Entity {
522        let mut entities = self.0.write().unwrap();
523        let id = EntityId::new();
524        let entity = Entity::create(id, collection);
525        entities.insert(id, entity.weak());
526        entity
527    }
528
529    /// TEST ONLY: Create a phantom entity with a specific ID.
530    ///
531    /// This creates an entity that was never properly created via Transaction::create(),
532    /// has no creation event, and has an empty state. Used for adversarial testing to
533    /// verify that commit paths properly reject such entities.
534    ///
535    /// WARNING: This bypasses all normal entity creation validation. Only use in tests
536    /// to verify security properties.
537    ///
538    /// Requires the `test-helpers` feature to be enabled.
539    #[cfg(feature = "test-helpers")]
540    pub fn conjure_evil_phantom(&self, id: EntityId, collection: CollectionId) -> Entity {
541        let mut entities = self.0.write().unwrap();
542        let entity = Entity::create(id, collection);
543        entities.insert(id, entity.weak());
544        entity
545    }
546
547    /// Get or create entity after async operations, checking for race conditions
548    /// Returns (existed, entity) where existed is true if the entity was already present
549    fn private_get_or_create(&self, id: EntityId, collection_id: &CollectionId, state: &State) -> Result<(bool, Entity), RetrievalError> {
550        let mut entities = self.0.write().unwrap();
551        if let Some(existing_weak) = entities.get(&id) {
552            if let Some(existing_entity) = existing_weak.upgrade() {
553                debug!("Entity {id} was created by another thread during async work, using that one");
554                return Ok((true, existing_entity));
555            }
556        }
557        let entity = Entity::from_state(id, collection_id.to_owned(), state)?;
558        entities.insert(id, entity.weak());
559        Ok((false, entity))
560    }
561
562    /// Returns a tuple of (changed, entity)
563    /// changed is Some(true) if the entity was changed, Some(false) if it already exists and the state was not applied
564    /// None if the entity was not previously on the local node (either in the WeakEntitySet or in storage)
565    pub async fn with_state<R>(
566        &self,
567        retriever: &R,
568        id: EntityId,
569        collection_id: CollectionId,
570        state: State,
571    ) -> Result<(Option<bool>, Entity), RetrievalError>
572    where
573        R: Retrieve<Id = EventId, Event = Event>,
574    {
575        let entity = match self.get(&id) {
576            Some(entity) => entity, // already resident
577            None => {
578                // not yet resident. We have to retrieve our baseline state before applying the new state
579                if let Some(stored_state) = retriever.get_state(id).await? {
580                    // get a resident entity for this retrieved state. It's possible somebody frontran us to create it
581                    // but we don't actually care, so we ignore the created flag
582                    self.private_get_or_create(id, &collection_id, &stored_state.payload.state)?.1
583                } else {
584                    // no stored state, so we can use the given state directly
585                    match self.private_get_or_create(id, &collection_id, &state)? {
586                        (true, entity) => entity, // some body frontran us to create it, so we have to apply the new state
587                        (false, entity) => {
588                            // we just created it with the given state, so there's nothing to apply. early return
589                            return Ok((None, entity));
590                        }
591                    }
592                }
593            }
594        };
595
596        // if we're here, we've retrieved the entity from the set and need to apply the state
597        let changed = entity.apply_state(retriever, &state).await?;
598        Ok((Some(changed), entity))
599    }
600}