use crate::lineage::{self, GetEvents, Retrieve};
use crate::selection::filter::Filterable;
use crate::{
error::{LineageError, MutationError, RetrievalError, StateError},
model::View,
property::backend::{backend_from_string, PropertyBackend},
reactor::AbstractEntity,
value::Value,
};
use ankurah_proto::{Clock, CollectionId, EntityId, EntityState, Event, EventId, OperationSet, State};
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
use tracing::{debug, warn};
/// Shared, reference-counted handle to an entity's in-memory state.
///
/// Every clone points at the same [`EntityInner`]; the `PartialEq` impl in this file compares
/// handles by pointer identity rather than by content.
#[derive(Debug, Clone)]
pub struct Entity(Arc<EntityInner>);
/// Handle to an [`EntityInner`] built directly from a [`State`] by [`TemporaryEntity::new`].
///
/// Unlike [`Entity`], nothing in this file registers it in a [`WeakEntitySet`].
pub struct TemporaryEntity(Arc<EntityInner>);
/// Mutable portion of an entity; stored behind the `RwLock` in [`EntityInner`].
#[derive(Debug)]
struct EntityInnerState {
/// Head clock of the entity; updated when events or states are applied and on commit.
head: Clock,
/// Property backends keyed by backend name.
backends: BTreeMap<String, Arc<dyn PropertyBackend>>,
}
impl EntityInnerState {
    /// Applies `operations` to the backend registered under `backend_name`.
    ///
    /// When no backend with that name exists yet, a fresh one is built via
    /// `backend_from_string`, the operations are applied to it, and only then is it registered.
    ///
    /// # Errors
    /// Propagates failures from backend construction or from applying the operations.
    fn apply_operations(&mut self, backend_name: String, operations: &Vec<ankurah_proto::Operation>) -> Result<(), MutationError> {
        match self.backends.get(&backend_name) {
            Some(existing) => {
                existing.apply_operations(operations)?;
            }
            None => {
                let created = backend_from_string(&backend_name, None)?;
                created.apply_operations(operations)?;
                self.backends.insert(backend_name, created);
            }
        }
        Ok(())
    }
}
/// Shared payload behind [`Entity`] and [`TemporaryEntity`] handles.
#[derive(Debug)]
pub struct EntityInner {
/// Identifier of the entity.
pub id: EntityId,
/// Collection the entity belongs to.
pub collection: CollectionId,
/// Head clock and property backends, guarded by a lock.
state: std::sync::RwLock<EntityInnerState>,
/// Whether this is a primary entity or a transaction-local fork.
pub(crate) kind: EntityKind,
/// Signalled with `send(())` by `apply_event` / `apply_state` after they change the state.
pub(crate) broadcast: ankurah_signals::broadcast::Broadcast,
}
/// Distinguishes ordinary entities from transaction-local forks.
#[derive(Debug)]
pub enum EntityKind {
    /// Kind assigned by `Entity::create`, `Entity::from_state` and `TemporaryEntity::new`.
    Primary,
    /// Fork produced by `Entity::snapshot`.
    Transacted {
        /// Flag consulted by `Entity::is_writable`; the fork is writable while it reads `true`.
        trx_alive: Arc<AtomicBool>,
        /// The entity this fork was snapshotted from.
        upstream: Entity,
    },
}
/// Lets an [`Entity`] be used wherever a `&EntityInner` is expected.
impl std::ops::Deref for Entity {
    type Target = EntityInner;

    fn deref(&self) -> &Self::Target {
        &*self.0
    }
}
/// Lets a [`TemporaryEntity`] be used wherever a `&EntityInner` is expected.
impl std::ops::Deref for TemporaryEntity {
    type Target = EntityInner;

    fn deref(&self) -> &Self::Target {
        &*self.0
    }
}
/// Two handles are equal only when they point at the same allocation.
impl PartialEq for Entity {
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.0, &other.0);
        Arc::ptr_eq(lhs, rhs)
    }
}
/// Non-owning handle to an entity; holding one does not keep the [`EntityInner`] alive.
pub struct WeakEntity(Weak<EntityInner>);
impl WeakEntity {
pub fn upgrade(&self) -> Option<Entity> { self.0.upgrade().map(Entity) }
}
impl Entity {
/// Returns this entity's identifier.
pub fn id(&self) -> EntityId {
    self.0.id
}
/// Produces a non-owning handle to this entity.
fn weak(&self) -> WeakEntity {
    let inner = Arc::downgrade(&self.0);
    WeakEntity(inner)
}
/// Returns the collection this entity belongs to.
pub fn collection(&self) -> &CollectionId {
    &self.0.collection
}
/// Returns a copy of the current head clock.
///
/// # Panics
/// Panics if the state lock is poisoned.
pub fn head(&self) -> Clock {
    let guard = self.state.read().unwrap();
    guard.head.clone()
}
/// Reports whether this entity may currently be written to.
///
/// Primary entities are never writable; a transacted fork is writable while its
/// `trx_alive` flag reads `true`.
pub fn is_writable(&self) -> bool {
    match &self.kind {
        EntityKind::Transacted { trx_alive, .. } => trx_alive.load(Ordering::Acquire),
        EntityKind::Primary => false,
    }
}
/// Serializes every backend into a [`State`] carrying the current head.
///
/// # Errors
/// Propagates the first failure reported by a backend's `to_state_buffer`.
///
/// # Panics
/// Panics if the state lock is poisoned.
pub fn to_state(&self) -> Result<State, StateError> {
    let guard = self.state.read().expect("other thread panicked, panic here too");
    let buffers = guard
        .backends
        .iter()
        .map(|(name, backend)| -> Result<_, StateError> {
            let buffer = backend.to_state_buffer()?;
            Ok((name.clone(), buffer))
        })
        .collect::<Result<BTreeMap<_, _>, StateError>>()?;
    Ok(State { state_buffers: ankurah_proto::StateBuffers(buffers), head: guard.head.clone() })
}
/// Wraps the result of [`Entity::to_state`] together with this entity's id and collection.
///
/// # Errors
/// Propagates any error from `to_state`.
pub fn to_entity_state(&self) -> Result<EntityState, StateError> {
    self.to_state().map(|state| EntityState { entity_id: self.id(), collection: self.collection.clone(), state })
}
/// Creates a primary entity with a default head clock and no backends.
pub fn create(id: EntityId, collection: CollectionId) -> Self {
    let initial = EntityInnerState { head: Clock::default(), backends: BTreeMap::default() };
    let inner = EntityInner {
        id,
        collection,
        state: std::sync::RwLock::new(initial),
        kind: EntityKind::Primary,
        broadcast: ankurah_signals::broadcast::Broadcast::new(),
    };
    Self(Arc::new(inner))
}
/// Builds a primary entity whose backends are reconstructed from `state`'s buffers and whose
/// head is `state.head`.
///
/// # Errors
/// Fails with the first error reported by `backend_from_string`.
fn from_state(id: EntityId, collection: CollectionId, state: &State) -> Result<Self, RetrievalError> {
    let backends = state
        .state_buffers
        .iter()
        .map(|(name, state_buffer)| -> Result<_, RetrievalError> {
            let backend = backend_from_string(name, Some(state_buffer))?;
            Ok((name.to_owned(), backend))
        })
        .collect::<Result<BTreeMap<_, _>, RetrievalError>>()?;
    let inner = EntityInner {
        id,
        collection,
        state: std::sync::RwLock::new(EntityInnerState { head: state.head.clone(), backends }),
        kind: EntityKind::Primary,
        broadcast: ankurah_signals::broadcast::Broadcast::new(),
    };
    Ok(Self(Arc::new(inner)))
}
/// Gathers the operations each backend reports via `to_operations` into a single [`Event`]
/// whose parent is the current head.
///
/// Returns `Ok(None)` when no backend reports any operations.
///
/// # Errors
/// Propagates the first failure from a backend's `to_operations`.
///
/// # Panics
/// Panics if the state lock is poisoned.
pub(crate) fn generate_commit_event(&self) -> Result<Option<Event>, MutationError> {
    let guard = self.state.read().expect("other thread panicked, panic here too");
    let mut pending = BTreeMap::<String, Vec<ankurah_proto::Operation>>::new();
    for (name, backend) in guard.backends.iter() {
        if let Some(ops) = backend.to_operations()? {
            pending.insert(name.clone(), ops);
        }
    }
    if pending.is_empty() {
        return Ok(None);
    }
    let event = Event {
        entity_id: self.id,
        collection: self.collection.clone(),
        operations: OperationSet(pending),
        parent: guard.head.clone(),
    };
    Ok(Some(event))
}
/// Overwrites the head clock with `new_head`.
///
/// # Panics
/// Panics if the state lock is poisoned.
pub(crate) fn commit_head(&self, new_head: Clock) {
    let mut guard = self.state.write().unwrap();
    guard.head = new_head;
}
/// Runs `body` under the write lock, but only if the head still equals `expected_head`.
///
/// * Head unchanged: `body` runs; `Ok(true)` is returned if it succeeds, its error otherwise.
/// * Head moved: `body` is skipped, `expected_head` is overwritten with the current head so the
///   caller can retry, and `Ok(false)` is returned.
///
/// # Panics
/// Panics if the state lock is poisoned.
fn try_mutate<F, E>(&self, expected_head: &mut Clock, body: F) -> Result<bool, E>
where F: FnOnce(&mut EntityInnerState) -> Result<(), E> {
    let mut guard = self.state.write().unwrap();
    if guard.head == *expected_head {
        body(&mut guard)?;
        Ok(true)
    } else {
        *expected_head = guard.head.clone();
        Ok(false)
    }
}
/// Wraps this entity in the typed view `V`, provided the entity lives in `V`'s collection.
///
/// Returns `None` when the collections differ.
pub fn view<V: View>(&self) -> Option<V> {
    if self.collection() == &V::collection() {
        Some(V::from_entity(self.clone()))
    } else {
        None
    }
}
/// Applies `event` to this entity and reports whether the entity changed.
///
/// * A creation event arriving while the head is still empty is accepted as-is and becomes the
///   new head.
/// * Otherwise the event is compared against the current head with
///   `lineage::compare_unstored_event`:
///   - `Equal` yields `Ok(false)` without touching the state;
///   - `Descends` applies the operations and makes the event the new head;
///   - `NotDescends` is currently applied as well (marked HACK), with the new head computed as
///     `head.with_event(event.id())`;
///   - `BudgetExceeded` logs a warning and is treated like `Descends`.
///
/// The comparison awaits without holding the state lock, so the head may move in the meantime.
/// `try_mutate` detects that, refreshes the observed head, and the comparison is retried up to
/// `MAX_RETRIES` times. Subscribers are notified through `broadcast` after every successful
/// application.
///
/// # Errors
/// * `LineageError::Incomparable` / `LineageError::PartiallyDescends` for those orderings.
/// * Errors propagated from the lineage comparison or from applying operations.
/// * `MutationError::TOCTOUAttemptsExhausted` when every retry lost the race; nothing is applied.
///
/// # Panics
/// Panics if the state lock is poisoned.
#[cfg_attr(feature = "instrument", tracing::instrument(level="debug", skip_all, fields(entity = %self, event = %event)))]
pub async fn apply_event<G>(&self, getter: &G, event: &Event) -> Result<bool, MutationError>
where G: GetEvents<Id = EventId, Event = Event> {
    debug!("apply_event head: {event} to {self}");

    // Decide on the creation fast path while holding the write lock, so the emptiness check
    // and the mutation cannot be separated by another writer.
    if event.is_entity_create() {
        let mut state = self.state.write().unwrap();
        if state.head.is_empty() {
            for (backend_name, operations) in event.operations.iter() {
                state.apply_operations(backend_name.clone(), operations)?;
            }
            state.head = event.id().into();
            // Release the lock before notifying subscribers.
            drop(state);
            self.broadcast.send(());
            return Ok(true);
        }
        // Head is no longer empty: fall through to the regular lineage comparison.
    }

    let mut head = self.head();
    const MAX_RETRIES: usize = 5;
    let budget = 100;
    for attempt in 0..MAX_RETRIES {
        let new_head: Clock = match crate::lineage::compare_unstored_event(getter, event, &head, budget).await? {
            lineage::Ordering::Equal => return Ok(false),
            lineage::Ordering::Descends => event.id().into(),
            lineage::Ordering::NotDescends { meet: _ } => {
                warn!("NotDescends - HACK - applying (attempt {})", attempt + 1);
                head.with_event(event.id())
            }
            lineage::Ordering::Incomparable => {
                return Err(LineageError::Incomparable.into());
            }
            lineage::Ordering::PartiallyDescends { meet } => {
                return Err(LineageError::PartiallyDescends { meet }.into());
            }
            lineage::Ordering::BudgetExceeded { subject_frontier, other_frontier } => {
                warn!(
                    "apply_event budget exhausted after {budget} events. Assuming Descends. subject_frontier: {}, other_frontier: {}",
                    subject_frontier.iter().map(|id| id.to_base64_short()).collect::<Vec<String>>().join(", "),
                    other_frontier.iter().map(|id| id.to_base64_short()).collect::<Vec<String>>().join(", ")
                );
                event.id().into()
            }
        };

        // Only mutates if the head is still the one the comparison was made against; otherwise
        // `head` is refreshed and the next iteration compares again.
        if self.try_mutate(&mut head, move |state| -> Result<(), MutationError> {
            for (backend_name, operations) in event.operations.iter() {
                state.apply_operations(backend_name.clone(), operations)?;
            }
            state.head = new_head;
            Ok(())
        })? {
            self.broadcast.send(());
            return Ok(true);
        }
    }

    // Every attempt lost the race against a concurrent writer. The event is NOT applied; the
    // caller receives an error instead.
    warn!("apply_event retries exhausted while chasing moving head");
    Err(MutationError::TOCTOUAttemptsExhausted)
}
/// Applies a full `state` snapshot to this entity when `state.head` descends from the current head.
///
/// Returns `Ok(true)` when the snapshot was installed (subscribers are then notified through
/// `broadcast`), and `Ok(false)` when the comparison yields `Equal` or `NotDescends`.
///
/// # Errors
/// Propagates errors from `lineage::compare` and `backend_from_string`, maps `Incomparable` and
/// `PartiallyDescends` to the matching `LineageError`, and yields
/// `MutationError::TOCTOUAttemptsExhausted` when the head kept moving for `MAX_RETRIES` attempts.
pub async fn apply_state<G>(&self, getter: &G, state: &State) -> Result<bool, MutationError>
where G: GetEvents<Id = EventId, Event = Event> {
// Head observed before the async comparison; `try_mutate` re-validates it under the write lock.
let mut head = self.head();
let new_head = state.head.clone();
debug!("{self} apply_state - new head: {new_head}");
// Budget handed to `lineage::compare`; the warning below reports it as an event count.
let budget = 100;
const MAX_RETRIES: usize = 5;
for _attempt in 0..MAX_RETRIES {
let apply = match crate::lineage::compare(getter, &new_head, &head, budget).await? {
lineage::Ordering::Equal => return Ok(false),
lineage::Ordering::Descends => true,
lineage::Ordering::NotDescends { meet: _ } => return Ok(false),
lineage::Ordering::Incomparable => return Err(LineageError::Incomparable.into()),
lineage::Ordering::PartiallyDescends { meet } => return Err(LineageError::PartiallyDescends { meet }.into()),
// Comparison gave up before reaching a verdict: log it and treat the state as descending.
lineage::Ordering::BudgetExceeded { subject_frontier, other_frontier } => {
warn!(
"{self} apply_state - budget exhausted after {budget} events. Assuming Descends. subject: {subject_frontier:?}, other: {other_frontier:?}"
);
true
}
};
// `apply` is always true here; every other outcome has already returned above.
if apply {
if self.try_mutate::<_, MutationError>(&mut head, |es| -> Result<(), MutationError> {
// Rebuild a backend from each buffer; backends under names absent from `state` are left in place.
for (name, state_buffer) in state.state_buffers.iter() {
let backend = backend_from_string(name, Some(state_buffer))?;
es.backends.insert(name.to_owned(), backend);
}
es.head = state.head.clone();
Ok(())
})? {
self.broadcast.send(());
return Ok(true);
}
// Head moved since it was read: `try_mutate` refreshed `head`, so compare again.
continue;
}
}
warn!("{self} apply_state retries exhausted while chasing moving head");
Err(MutationError::TOCTOUAttemptsExhausted)
}
/// Forks this entity into a transaction-local copy.
///
/// Each backend is forked individually, the head is copied, and the result is tagged
/// `EntityKind::Transacted` with this entity recorded as its upstream. The fork gets its own
/// broadcast channel.
///
/// # Panics
/// Panics if the state lock is poisoned.
pub fn snapshot(&self, trx_alive: Arc<AtomicBool>) -> Self {
    let guard = self.state.read().expect("other thread panicked, panic here too");
    let forked: BTreeMap<_, _> = guard.backends.iter().map(|(name, backend)| (name.clone(), backend.fork())).collect();
    let inner = EntityInner {
        id: self.id,
        collection: self.collection.clone(),
        state: std::sync::RwLock::new(EntityInnerState { head: guard.head.clone(), backends: forked }),
        kind: EntityKind::Transacted { trx_alive, upstream: self.clone() },
        broadcast: ankurah_signals::broadcast::Broadcast::new(),
    };
    Self(Arc::new(inner))
}
/// Returns the broadcast channel on which changes to this entity are signalled.
pub fn broadcast(&self) -> &ankurah_signals::broadcast::Broadcast {
    &self.0.broadcast
}
/// Returns the backend of concrete type `P`, creating and registering an empty one on first use.
///
/// # Errors
/// Fails when `backend_from_string` cannot construct the backend.
///
/// # Panics
/// Panics if the state lock is poisoned, or if the backend registered under `P`'s name is not
/// actually a `P`.
pub fn get_backend<P: PropertyBackend>(&self) -> Result<Arc<P>, RetrievalError> {
    let backend_name = P::property_backend_name();
    let mut guard = self.state.write().expect("other thread panicked, panic here too");
    match guard.backends.get(&backend_name) {
        Some(existing) => {
            let as_any = existing.clone().as_arc_dyn_any();
            Ok(as_any.downcast::<P>().unwrap())
        }
        None => {
            let created = backend_from_string(&backend_name, None)?;
            // Downcast before registering, so a type mismatch panics without inserting.
            let typed = created.clone().as_arc_dyn_any().downcast::<P>().unwrap();
            guard.backends.insert(backend_name, created);
            Ok(typed)
        }
    }
}
/// Collects every `(property name, value)` pair exposed by this entity's backends.
///
/// Backends are visited in map order (the backends live in a `BTreeMap` keyed by backend name),
/// and each backend contributes whatever its `property_values` yields.
///
/// # Panics
/// Panics if the state lock is poisoned.
pub fn values(&self) -> Vec<(String, Option<Value>)> {
    let state = self.state.read().expect("other thread panicked, panic here too");
    // Consume each backend's property collection directly rather than cloning it entry by
    // entry into a temporary Vec; this is the same shape `TemporaryEntity::values` uses.
    state.backends.values().flat_map(|backend| backend.property_values()).collect()
}
}
impl AbstractEntity for Entity {
    /// Returns an owned copy of the entity's collection id.
    fn collection(&self) -> ankurah_proto::CollectionId {
        self.0.collection.clone()
    }

    /// Borrows the entity's id.
    fn id(&self) -> &ankurah_proto::EntityId {
        &self.0.id
    }

    /// Looks up `field`: `"id"` resolves to the entity id, anything else to the first backend
    /// that reports a value for that property.
    fn value(&self, field: &str) -> Option<crate::value::Value> {
        if field == "id" {
            return Some(crate::value::Value::EntityId(self.id));
        }
        let property: String = field.into();
        let guard = self.state.read().expect("other thread panicked, panic here too");
        guard.backends.values().find_map(|backend| backend.property_value(&property))
    }
}
/// Renders as `Entity(<collection>/<short id> <head>)`, with the head in alternate (`{:#}`) form.
impl std::fmt::Display for Entity {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let short_id = self.id.to_base64_short();
        let head = self.head();
        write!(f, "Entity({}/{} {:#})", self.collection, short_id, head)
    }
}
impl Filterable for Entity {
    /// Returns the collection name as a string slice.
    fn collection(&self) -> &str {
        self.0.collection.as_str()
    }

    /// Looks up `name`: `"id"` resolves to the entity id, anything else to the first backend
    /// that reports a value for that property.
    fn value(&self, name: &str) -> Option<Value> {
        if name == "id" {
            return Some(Value::EntityId(self.id));
        }
        let property = name.to_owned();
        let guard = self.state.read().expect("other thread panicked, panic here too");
        guard.backends.values().find_map(|backend| backend.property_value(&property))
    }
}
impl TemporaryEntity {
    /// Builds a temporary entity whose backends are reconstructed from `state`'s buffers and
    /// whose head is `state.head`.
    ///
    /// # Errors
    /// Fails with the first error reported by `backend_from_string`.
    pub fn new(id: EntityId, collection: CollectionId, state: &State) -> Result<Self, RetrievalError> {
        let backends = state
            .state_buffers
            .iter()
            .map(|(name, state_buffer)| -> Result<_, RetrievalError> {
                let backend = backend_from_string(name, Some(state_buffer))?;
                Ok((name.to_owned(), backend))
            })
            .collect::<Result<BTreeMap<_, _>, RetrievalError>>()?;
        let inner = EntityInner {
            id,
            collection,
            state: std::sync::RwLock::new(EntityInnerState { head: state.head.clone(), backends }),
            kind: EntityKind::Primary,
            broadcast: ankurah_signals::broadcast::Broadcast::new(),
        };
        Ok(Self(Arc::new(inner)))
    }

    /// Collects every `(property name, value)` pair exposed by the backends.
    ///
    /// # Panics
    /// Panics if the state lock is poisoned.
    pub fn values(&self) -> Vec<(String, Option<Value>)> {
        let guard = self.0.state.read().expect("other thread panicked, panic here too");
        let mut collected = Vec::new();
        for backend in guard.backends.values() {
            collected.extend(backend.property_values());
        }
        collected
    }
}
impl Filterable for TemporaryEntity {
    /// Returns the collection name as a string slice.
    fn collection(&self) -> &str {
        let inner = &self.0;
        inner.collection.as_str()
    }

    /// Looks up `name`: `"id"` resolves to the entity id, anything else to the first backend
    /// that reports a value for that property.
    fn value(&self, name: &str) -> Option<Value> {
        if name == "id" {
            return Some(Value::EntityId(self.0.id));
        }
        let property = name.to_owned();
        let guard = self.0.state.read().expect("other thread panicked, panic here too");
        guard.backends.values().find_map(|backend| backend.property_value(&property))
    }
}
/// Renders as `TemporaryEntity(<collection>/<id>) = <head>`.
impl std::fmt::Display for TemporaryEntity {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let guard = self.0.state.read().unwrap();
        write!(f, "TemporaryEntity({}/{}) = {}", &self.collection, self.id, guard.head)
    }
}
/// Registry mapping entity ids to weak handles of entities.
///
/// The map lives behind an `Arc`, so clones of the set share the same registry. Entries are
/// weak: the set alone does not keep an entity alive.
#[derive(Clone, Default)]
pub struct WeakEntitySet(Arc<std::sync::RwLock<BTreeMap<EntityId, WeakEntity>>>);
impl WeakEntitySet {
/// Returns the live entity registered under `id`, if there is one.
///
/// Yields `None` both when `id` is unknown and when the registered entity has been dropped.
///
/// # Panics
/// Panics if the registry lock is poisoned.
pub fn get(&self, id: &EntityId) -> Option<Entity> {
    let registry = self.0.read().unwrap();
    registry.get(id).and_then(|weak| weak.upgrade())
}
/// Returns the resident entity for `id`, or loads it from `retriever` when it is not resident.
///
/// Yields `Ok(None)` when the entity is neither resident nor known to the retriever.
///
/// # Errors
/// Propagates failures from `retriever.get_state` and from [`WeakEntitySet::with_state`].
pub async fn get_or_retrieve<R>(
    &self,
    retriever: &R,
    collection_id: &CollectionId,
    id: &EntityId,
) -> Result<Option<Entity>, RetrievalError>
where
    R: Retrieve<Id = EventId, Event = Event> + Send + Sync,
{
    if let Some(resident) = self.get(id) {
        return Ok(Some(resident));
    }
    match retriever.get_state(*id).await? {
        None => Ok(None),
        Some(stored) => {
            let (_, entity) = self.with_state(retriever, *id, collection_id.to_owned(), stored.payload.state).await?;
            Ok(Some(entity))
        }
    }
}
/// Like [`WeakEntitySet::get_or_retrieve`], but creates and registers an empty entity when
/// neither the set nor the retriever knows `id`.
///
/// The registry is re-checked under the write lock before creating, so an entity registered
/// by someone else in the meantime is returned instead of being replaced.
///
/// # Errors
/// Propagates failures from `get_or_retrieve`.
///
/// # Panics
/// Panics if the registry lock is poisoned.
pub async fn get_retrieve_or_create<R>(
    &self,
    retriever: &R,
    collection_id: &CollectionId,
    id: &EntityId,
) -> Result<Entity, RetrievalError>
where
    R: Retrieve<Id = EventId, Event = Event> + Send + Sync,
{
    if let Some(found) = self.get_or_retrieve(retriever, collection_id, id).await? {
        return Ok(found);
    }
    let mut registry = self.0.write().unwrap();
    if let Some(live) = registry.get(id).and_then(|weak| weak.upgrade()) {
        return Ok(live);
    }
    let fresh = Entity::create(*id, collection_id.to_owned());
    registry.insert(*id, fresh.weak());
    Ok(fresh)
}
/// Creates an empty entity with a newly generated id in `collection` and registers it.
///
/// # Panics
/// Panics if the registry lock is poisoned.
pub fn create(&self, collection: CollectionId) -> Entity {
    let mut registry = self.0.write().unwrap();
    let fresh_id = EntityId::new();
    let fresh = Entity::create(fresh_id, collection);
    let handle = fresh.weak();
    registry.insert(fresh_id, handle);
    fresh
}
/// Test helper: creates an empty entity under a caller-chosen `id` and registers it,
/// overwriting whatever entry the registry held for that id.
///
/// # Panics
/// Panics if the registry lock is poisoned.
#[cfg(feature = "test-helpers")]
pub fn conjure_evil_phantom(&self, id: EntityId, collection: CollectionId) -> Entity {
    let mut registry = self.0.write().unwrap();
    let phantom = Entity::create(id, collection);
    let handle = phantom.weak();
    registry.insert(id, handle);
    phantom
}
/// Returns the live entity registered under `id`, or builds one from `state` and registers it.
///
/// The boolean is `true` when an already-registered entity was returned and `false` when a new
/// one was created from `state`. Lookup and insertion happen under a single write lock.
///
/// # Errors
/// Propagates failures from `Entity::from_state`.
///
/// # Panics
/// Panics if the registry lock is poisoned.
fn private_get_or_create(&self, id: EntityId, collection_id: &CollectionId, state: &State) -> Result<(bool, Entity), RetrievalError> {
    let mut registry = self.0.write().unwrap();
    if let Some(live) = registry.get(&id).and_then(|weak| weak.upgrade()) {
        debug!("Entity {id} was created by another thread during async work, using that one");
        return Ok((true, live));
    }
    let fresh = Entity::from_state(id, collection_id.to_owned(), state)?;
    registry.insert(id, fresh.weak());
    Ok((false, fresh))
}
/// Returns the entity for `id`, making sure `state` has been offered to it.
///
/// The entity is resolved in this order: the resident entity from this set; otherwise one built
/// from the state the retriever has stored for `id`; otherwise one built from `state` itself.
///
/// The first tuple element is `None` when the entity was freshly created from `state` (so there
/// was nothing left to apply), and `Some(changed)` carrying the result of `Entity::apply_state`
/// in every other case.
///
/// # Errors
/// Propagates failures from `retriever.get_state`, from entity construction, and from
/// `apply_state`.
pub async fn with_state<R>(
&self,
retriever: &R,
id: EntityId,
collection_id: CollectionId,
state: State,
) -> Result<(Option<bool>, Entity), RetrievalError>
where
R: Retrieve<Id = EventId, Event = Event>,
{
let entity = match self.get(&id) {
Some(entity) => entity, None => {
// Not resident: prefer the retriever's stored state as the starting point.
if let Some(stored_state) = retriever.get_state(id).await? {
self.private_get_or_create(id, &collection_id, &stored_state.payload.state)?.1
} else {
match self.private_get_or_create(id, &collection_id, &state)? {
// `true`: an entity was registered in the meantime, so `state` still has to be applied.
// `false`: the entity was just created from `state`; report `None` and skip `apply_state`.
(true, entity) => entity, (false, entity) => {
return Ok((None, entity));
}
}
}
}
};
let changed = entity.apply_state(retriever, &state).await?;
Ok((Some(changed), entity))
}
}