1use crate::lineage::{self, GetEvents, Retrieve};
2use crate::selection::filter::Filterable;
3use crate::{
4 error::{LineageError, MutationError, RetrievalError, StateError},
5 model::View,
6 property::backend::{backend_from_string, PropertyBackend},
7 reactor::AbstractEntity,
8 value::Value,
9};
10use ankurah_proto::{Clock, CollectionId, EntityId, EntityState, Event, EventId, OperationSet, State};
11use std::collections::BTreeMap;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::{Arc, Weak};
14use tracing::{debug, warn};
15
16#[derive(Debug, Clone)]
19pub struct Entity(Arc<EntityInner>);
20
21pub struct TemporaryEntity(Arc<EntityInner>);
24
25#[derive(Debug)]
27struct EntityInnerState {
28 head: Clock,
29 backends: BTreeMap<String, Arc<dyn PropertyBackend>>,
31}
32
33impl EntityInnerState {
34 fn apply_operations(&mut self, backend_name: String, operations: &Vec<ankurah_proto::Operation>) -> Result<(), MutationError> {
37 if let Some(backend) = self.backends.get(&backend_name) {
38 backend.apply_operations(operations)?;
39 } else {
40 let backend = backend_from_string(&backend_name, None)?;
41 backend.apply_operations(operations)?;
42 self.backends.insert(backend_name, backend);
43 }
44 Ok(())
45 }
46}
47
48#[derive(Debug)]
49pub struct EntityInner {
50 pub id: EntityId,
51 pub collection: CollectionId,
52 state: std::sync::RwLock<EntityInnerState>,
54 pub(crate) kind: EntityKind,
55 pub(crate) broadcast: ankurah_signals::broadcast::Broadcast,
57}
58
59#[derive(Debug)]
60pub enum EntityKind {
61 Primary, Transacted { trx_alive: Arc<AtomicBool>, upstream: Entity }, }
64
65impl std::ops::Deref for Entity {
66 type Target = EntityInner;
67
68 fn deref(&self) -> &Self::Target { &self.0 }
69}
70
71impl std::ops::Deref for TemporaryEntity {
72 type Target = EntityInner;
73
74 fn deref(&self) -> &Self::Target { &self.0 }
75}
76
77impl PartialEq for Entity {
78 fn eq(&self, other: &Self) -> bool { Arc::ptr_eq(&self.0, &other.0) }
79}
80
81pub struct WeakEntity(Weak<EntityInner>);
83
84impl WeakEntity {
85 pub fn upgrade(&self) -> Option<Entity> { self.0.upgrade().map(Entity) }
86}
87
88impl Entity {
89 pub fn id(&self) -> EntityId { self.id }
90
91 fn weak(&self) -> WeakEntity { WeakEntity(Arc::downgrade(&self.0)) }
93
94 pub fn collection(&self) -> &CollectionId { &self.collection }
95
96 pub fn head(&self) -> Clock { self.state.read().unwrap().head.clone() }
97
98 pub fn is_writable(&self) -> bool {
100 match &self.kind {
101 EntityKind::Primary => false, EntityKind::Transacted { trx_alive, .. } => trx_alive.load(Ordering::Acquire),
103 }
104 }
105
106 pub fn to_state(&self) -> Result<State, StateError> {
107 let state = self.state.read().expect("other thread panicked, panic here too");
108 let mut state_buffers = BTreeMap::default();
109 for (name, backend) in &state.backends {
110 let state_buffer = backend.to_state_buffer()?;
111 state_buffers.insert(name.clone(), state_buffer);
112 }
113 let state_buffers = ankurah_proto::StateBuffers(state_buffers);
114 Ok(State { state_buffers, head: state.head.clone() })
115 }
116
117 pub fn to_entity_state(&self) -> Result<EntityState, StateError> {
118 let state = self.to_state()?;
119 Ok(EntityState { entity_id: self.id(), collection: self.collection.clone(), state })
120 }
121
122 pub fn create(id: EntityId, collection: CollectionId) -> Self {
124 Self(Arc::new(EntityInner {
125 id,
126 collection,
127 state: std::sync::RwLock::new(EntityInnerState { head: Clock::default(), backends: BTreeMap::default() }),
128 kind: EntityKind::Primary,
129 broadcast: ankurah_signals::broadcast::Broadcast::new(),
130 }))
131 }
132
133 fn from_state(id: EntityId, collection: CollectionId, state: &State) -> Result<Self, RetrievalError> {
135 let mut backends = BTreeMap::new();
136 for (name, state_buffer) in state.state_buffers.iter() {
137 let backend = backend_from_string(name, Some(state_buffer))?;
138 backends.insert(name.to_owned(), backend);
139 }
140
141 Ok(Self(Arc::new(EntityInner {
142 id,
143 collection,
144 state: std::sync::RwLock::new(EntityInnerState { head: state.head.clone(), backends }),
145 kind: EntityKind::Primary,
146 broadcast: ankurah_signals::broadcast::Broadcast::new(),
147 })))
148 }
149
150 pub(crate) fn generate_commit_event(&self) -> Result<Option<Event>, MutationError> {
154 let state = self.state.read().expect("other thread panicked, panic here too");
155 let mut operations = BTreeMap::<String, Vec<ankurah_proto::Operation>>::new();
156 for (name, backend) in &state.backends {
157 if let Some(ops) = backend.to_operations()? {
158 operations.insert(name.clone(), ops);
159 }
160 }
161
162 if operations.is_empty() {
163 Ok(None)
164 } else {
165 let operations = OperationSet(operations);
166 let event = Event { entity_id: self.id, collection: self.collection.clone(), operations, parent: state.head.clone() };
167 Ok(Some(event))
168 }
169 }
170
171 pub(crate) fn commit_head(&self, new_head: Clock) {
173 self.state.write().unwrap().head = new_head;
176 }
177
178 fn try_mutate<F, E>(&self, expected_head: &mut Clock, body: F) -> Result<bool, E>
187 where F: FnOnce(&mut EntityInnerState) -> Result<(), E> {
188 let mut state = self.state.write().unwrap();
189 if &state.head != expected_head {
190 *expected_head = state.head.clone();
191 return Ok(false);
192 }
193 body(&mut state)?;
194 Ok(true)
195 }
196
197 pub fn view<V: View>(&self) -> Option<V> {
198 if self.collection() != &V::collection() {
199 None
200 } else {
201 Some(V::from_entity(self.clone()))
202 }
203 }
204
205 #[cfg_attr(feature = "instrument", tracing::instrument(level="debug", skip_all, fields(entity = %self, event = %event)))]
207 pub async fn apply_event<G>(&self, getter: &G, event: &Event) -> Result<bool, MutationError>
208 where G: GetEvents<Id = EventId, Event = Event> {
209 debug!("apply_event head: {event} to {self}");
210
211 if event.is_entity_create() {
213 let mut state = self.state.write().unwrap();
214 if state.head.is_empty() {
216 for (backend_name, operations) in event.operations.iter() {
218 state.apply_operations(backend_name.clone(), operations)?;
219 }
220 state.head = event.id().into();
221 drop(state); self.broadcast.send(());
224 return Ok(true);
225 }
226 }
228
229 let mut head = self.head();
230 const MAX_RETRIES: usize = 5;
232 let budget = 100;
233
234 for attempt in 0..MAX_RETRIES {
235 let new_head: Clock = match crate::lineage::compare_unstored_event(getter, event, &head, budget).await? {
236 lineage::Ordering::Equal => return Ok(false),
237 lineage::Ordering::Descends => event.id().into(),
238 lineage::Ordering::NotDescends { meet: _ } => {
239 warn!("NotDescends - HACK - applying (attempt {})", attempt + 1);
240 head.with_event(event.id())
241 }
242 lineage::Ordering::Incomparable => {
243 return Err(LineageError::Incomparable.into());
244 }
245 lineage::Ordering::PartiallyDescends { meet } => {
246 return Err(LineageError::PartiallyDescends { meet }.into());
247 }
248 lineage::Ordering::BudgetExceeded { subject_frontier, other_frontier } => {
249 warn!(
250 "apply_event budget exhausted after {budget} events. Assuming Descends. subject_frontier: {}, other_frontier: {}",
251 subject_frontier.iter().map(|id| id.to_base64_short()).collect::<Vec<String>>().join(", "),
252 other_frontier.iter().map(|id| id.to_base64_short()).collect::<Vec<String>>().join(", ")
253 );
254 event.id().into()
255 }
256 };
257
258 if self.try_mutate(&mut head, move |state| -> Result<(), MutationError> {
259 for (backend_name, operations) in event.operations.iter() {
260 state.apply_operations(backend_name.clone(), operations)?;
261 }
262 state.head = new_head;
263 Ok(())
264 })? {
265 self.broadcast.send(());
266 return Ok(true);
267 }
268 continue;
269 }
270
271 warn!("apply_event retries exhausted while chasing moving head; applying event as Descends");
272 Err(MutationError::TOCTOUAttemptsExhausted)
273 }
274
275 pub async fn apply_state<G>(&self, getter: &G, state: &State) -> Result<bool, MutationError>
276 where G: GetEvents<Id = EventId, Event = Event> {
277 let mut head = self.head();
278 let new_head = state.head.clone();
279
280 debug!("{self} apply_state - new head: {new_head}");
281 let budget = 100;
282 const MAX_RETRIES: usize = 5;
283
284 for _attempt in 0..MAX_RETRIES {
285 let apply = match crate::lineage::compare(getter, &new_head, &head, budget).await? {
286 lineage::Ordering::Equal => return Ok(false),
287 lineage::Ordering::Descends => true,
288 lineage::Ordering::NotDescends { meet: _ } => return Ok(false),
289 lineage::Ordering::Incomparable => return Err(LineageError::Incomparable.into()),
290 lineage::Ordering::PartiallyDescends { meet } => return Err(LineageError::PartiallyDescends { meet }.into()),
291 lineage::Ordering::BudgetExceeded { subject_frontier, other_frontier } => {
292 warn!(
293 "{self} apply_state - budget exhausted after {budget} events. Assuming Descends. subject: {subject_frontier:?}, other: {other_frontier:?}"
294 );
295 true
296 }
297 };
298
299 if apply {
300 if self.try_mutate::<_, MutationError>(&mut head, |es| -> Result<(), MutationError> {
301 for (name, state_buffer) in state.state_buffers.iter() {
302 let backend = backend_from_string(name, Some(state_buffer))?;
303 es.backends.insert(name.to_owned(), backend);
304 }
305 es.head = state.head.clone();
306 Ok(())
307 })? {
308 self.broadcast.send(());
309 return Ok(true);
310 }
311 continue;
312 }
313 }
314
315 warn!("{self} apply_state retries exhausted while chasing moving head");
316 Err(MutationError::TOCTOUAttemptsExhausted)
317 }
318
319 pub fn snapshot(&self, trx_alive: Arc<AtomicBool>) -> Self {
322 let state = self.state.read().expect("other thread panicked, panic here too");
324 let mut forked = BTreeMap::new();
325 for (name, backend) in &state.backends {
326 forked.insert(name.clone(), backend.fork());
327 }
328
329 Self(Arc::new(EntityInner {
330 id: self.id,
331 collection: self.collection.clone(),
332 state: std::sync::RwLock::new(EntityInnerState { head: state.head.clone(), backends: forked }),
333 kind: EntityKind::Transacted { trx_alive, upstream: self.clone() },
334 broadcast: ankurah_signals::broadcast::Broadcast::new(),
335 }))
336 }
337
338 pub fn broadcast(&self) -> &ankurah_signals::broadcast::Broadcast { &self.broadcast }
340
341 pub fn get_backend<P: PropertyBackend>(&self) -> Result<Arc<P>, RetrievalError> {
343 let backend_name = P::property_backend_name();
344 let mut state = self.state.write().expect("other thread panicked, panic here too");
345 if let Some(backend) = state.backends.get(&backend_name) {
346 let upcasted = backend.clone().as_arc_dyn_any();
347 Ok(upcasted.downcast::<P>().unwrap()) } else {
349 let backend = backend_from_string(&backend_name, None)?;
350 let upcasted = backend.clone().as_arc_dyn_any();
351 let typed_backend = upcasted.downcast::<P>().unwrap(); state.backends.insert(backend_name, backend);
353 Ok(typed_backend)
354 }
355 }
356
357 pub fn values(&self) -> Vec<(String, Option<Value>)> {
358 let state = self.state.read().expect("other thread panicked, panic here too");
359 state
360 .backends
361 .values()
362 .flat_map(|backend| {
363 backend
364 .property_values()
365 .iter()
366 .map(|(name, value)| (name.to_string(), value.clone()))
367 .collect::<Vec<(String, Option<Value>)>>()
368 })
369 .collect()
370 }
371}
372
373impl AbstractEntity for Entity {
375 fn collection(&self) -> ankurah_proto::CollectionId { self.collection.clone() }
376
377 fn id(&self) -> &ankurah_proto::EntityId { &self.id }
378
379 fn value(&self, field: &str) -> Option<crate::value::Value> {
380 if field == "id" {
381 Some(crate::value::Value::EntityId(self.id))
382 } else {
383 let state = self.state.read().expect("other thread panicked, panic here too");
385 state.backends.values().find_map(|backend| backend.property_value(&field.into()))
386 }
387 }
388}
389
390impl std::fmt::Display for Entity {
391 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
392 write!(f, "Entity({}/{} {:#})", self.collection, self.id.to_base64_short(), self.head())
393 }
394}
395
396impl Filterable for Entity {
397 fn collection(&self) -> &str { self.collection.as_str() }
398
399 fn value(&self, name: &str) -> Option<Value> {
400 if name == "id" {
401 Some(Value::EntityId(self.id))
402 } else {
403 let state = self.state.read().expect("other thread panicked, panic here too");
405 state.backends.values().find_map(|backend| backend.property_value(&name.to_owned()))
406 }
407 }
408}
409
410impl TemporaryEntity {
411 pub fn new(id: EntityId, collection: CollectionId, state: &State) -> Result<Self, RetrievalError> {
412 let mut backends = BTreeMap::new();
414 for (name, state_buffer) in state.state_buffers.iter() {
415 let backend = backend_from_string(name, Some(state_buffer))?;
416 backends.insert(name.to_owned(), backend);
417 }
418
419 Ok(Self(Arc::new(EntityInner {
420 id,
421 collection,
422 state: std::sync::RwLock::new(EntityInnerState { head: state.head.clone(), backends }),
423 kind: EntityKind::Primary,
424 broadcast: ankurah_signals::broadcast::Broadcast::new(),
426 })))
427 }
428 pub fn values(&self) -> Vec<(String, Option<Value>)> {
429 let state = self.0.state.read().expect("other thread panicked, panic here too");
430 state.backends.values().flat_map(|backend| backend.property_values()).collect()
431 }
432}
433
434impl Filterable for TemporaryEntity {
436 fn collection(&self) -> &str { self.0.collection.as_str() }
437
438 fn value(&self, name: &str) -> Option<Value> {
439 if name == "id" {
440 Some(Value::EntityId(self.0.id))
441 } else {
442 let state = self.0.state.read().expect("other thread panicked, panic here too");
444 state.backends.values().find_map(|backend| backend.property_value(&name.to_owned()))
445 }
446 }
447}
448
449impl std::fmt::Display for TemporaryEntity {
450 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
451 write!(f, "TemporaryEntity({}/{}) = {}", &self.collection, self.id, self.0.state.read().unwrap().head)
452 }
453}
454
455#[derive(Clone, Default)]
458pub struct WeakEntitySet(Arc<std::sync::RwLock<BTreeMap<EntityId, WeakEntity>>>);
459impl WeakEntitySet {
460 pub fn get(&self, id: &EntityId) -> Option<Entity> {
461 let entities = self.0.read().unwrap();
462 if let Some(entity) = entities.get(id) {
464 entity.upgrade()
465 } else {
466 None
467 }
468 }
469
470 pub async fn get_or_retrieve<R>(
471 &self,
472 retriever: &R,
473 collection_id: &CollectionId,
474 id: &EntityId,
475 ) -> Result<Option<Entity>, RetrievalError>
476 where
477 R: Retrieve<Id = EventId, Event = Event> + Send + Sync,
478 {
479 match self.get(id) {
481 Some(entity) => Ok(Some(entity)),
482 None => match retriever.get_state(*id).await {
483 Ok(None) => Ok(None),
484 Ok(Some(state)) => {
485 let (_, entity) = self.with_state(retriever, *id, collection_id.to_owned(), state.payload.state).await?;
488 Ok(Some(entity))
489 }
490 Err(e) => Err(e),
491 },
492 }
493 }
494 pub async fn get_retrieve_or_create<R>(
496 &self,
497 retriever: &R,
498 collection_id: &CollectionId,
499 id: &EntityId,
500 ) -> Result<Entity, RetrievalError>
501 where
502 R: Retrieve<Id = EventId, Event = Event> + Send + Sync,
503 {
504 match self.get_or_retrieve(retriever, collection_id, id).await? {
505 Some(entity) => Ok(entity),
506 None => {
507 let mut entities = self.0.write().unwrap();
508 if let Some(entity) = entities.get(id) {
510 if let Some(entity) = entity.upgrade() {
511 return Ok(entity);
512 }
513 }
514 let entity = Entity::create(*id, collection_id.to_owned());
515 entities.insert(*id, entity.weak());
516 Ok(entity)
517 }
518 }
519 }
520 pub fn create(&self, collection: CollectionId) -> Entity {
522 let mut entities = self.0.write().unwrap();
523 let id = EntityId::new();
524 let entity = Entity::create(id, collection);
525 entities.insert(id, entity.weak());
526 entity
527 }
528
529 #[cfg(feature = "test-helpers")]
540 pub fn conjure_evil_phantom(&self, id: EntityId, collection: CollectionId) -> Entity {
541 let mut entities = self.0.write().unwrap();
542 let entity = Entity::create(id, collection);
543 entities.insert(id, entity.weak());
544 entity
545 }
546
547 fn private_get_or_create(&self, id: EntityId, collection_id: &CollectionId, state: &State) -> Result<(bool, Entity), RetrievalError> {
550 let mut entities = self.0.write().unwrap();
551 if let Some(existing_weak) = entities.get(&id) {
552 if let Some(existing_entity) = existing_weak.upgrade() {
553 debug!("Entity {id} was created by another thread during async work, using that one");
554 return Ok((true, existing_entity));
555 }
556 }
557 let entity = Entity::from_state(id, collection_id.to_owned(), state)?;
558 entities.insert(id, entity.weak());
559 Ok((false, entity))
560 }
561
562 pub async fn with_state<R>(
566 &self,
567 retriever: &R,
568 id: EntityId,
569 collection_id: CollectionId,
570 state: State,
571 ) -> Result<(Option<bool>, Entity), RetrievalError>
572 where
573 R: Retrieve<Id = EventId, Event = Event>,
574 {
575 let entity = match self.get(&id) {
576 Some(entity) => entity, None => {
578 if let Some(stored_state) = retriever.get_state(id).await? {
580 self.private_get_or_create(id, &collection_id, &stored_state.payload.state)?.1
583 } else {
584 match self.private_get_or_create(id, &collection_id, &state)? {
586 (true, entity) => entity, (false, entity) => {
588 return Ok((None, entity));
590 }
591 }
592 }
593 }
594 };
595
596 let changed = entity.apply_state(retriever, &state).await?;
598 Ok((Some(changed), entity))
599 }
600}