use crate::{
changes::EntityChange,
entity::Entity,
error::{MutationError, RetrievalError},
livequery::{EntityLiveQuery, LiveQuery},
model::View,
node::{MatchArgs, Node},
policy::{AccessDenied, PolicyAgent},
storage::{StorageCollectionWrapper, StorageEngine},
transaction::Transaction,
};
use ankurah_proto::{self as proto, Attested, Clock, CollectionId, EntityState};
use async_trait::async_trait;
use std::sync::{atomic::AtomicBool, Arc};
use tracing::debug;
#[cfg(feature = "wasm")]
use wasm_bindgen::prelude::*;
/// Type-erased, reference-counted handle to a node paired with its policy context data.
///
/// The inner value is an `Arc<dyn TContext>`, so users of `Context` do not need to name the
/// node's storage-engine or policy-agent type parameters.
#[cfg_attr(feature = "wasm", wasm_bindgen)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Object))]
pub struct Context(Arc<dyn TContext + Send + Sync + 'static>);
impl Clone for Context {
fn clone(&self) -> Self { Self(self.0.clone()) }
}
/// Concrete pairing of a [`Node`] with the policy agent's context data.
///
/// This is the value that gets type-erased behind the `TContext` trait object.
pub struct NodeAndContext<SE, PA>
where
    SE: StorageEngine + Send + Sync + 'static,
    PA: PolicyAgent + Send + Sync + 'static,
{
    /// Node that all operations are executed against.
    pub node: Node<SE, PA>,
    /// Policy-agent-defined context data handed to the policy agent on every check.
    pub cdata: PA::ContextData,
}
/// Interface over a node plus its context data, used through `dyn TContext` so that callers
/// do not have to carry the storage-engine / policy-agent type parameters.
///
/// The per-method notes below describe what the `NodeAndContext` implementation in this file does.
#[async_trait]
pub trait TContext {
    /// Returns the id of the underlying node.
    fn node_id(&self) -> proto::EntityId;
    /// Creates a new entity in `collection` and returns a snapshot of it taken with the
    /// `trx_alive` flag.
    fn create_entity(&self, collection: proto::CollectionId, trx_alive: Arc<AtomicBool>) -> Entity;
    /// Asks the policy agent whether this context is allowed to write `entity`.
    fn check_write(&self, entity: &Entity) -> Result<(), AccessDenied>;
    /// Retrieves entity `id` from `collection`.
    ///
    /// Note the argument order: `id` comes first here, whereas the inherent
    /// `NodeAndContext::get_entity` takes the collection first.
    async fn get_entity(&self, id: proto::EntityId, collection: &proto::CollectionId, cached: bool) -> Result<Entity, RetrievalError>;
    /// Looks `id` up in the node's entity set only; returns `None` when it is not there.
    fn get_resident_entity(&self, id: proto::EntityId) -> Option<Entity>;
    /// Fetches the entities of `collection` that match `args`.
    async fn fetch_entities(&self, collection: &proto::CollectionId, args: MatchArgs) -> Result<Vec<Entity>, RetrievalError>;
    /// Commits the changes accumulated in `trx`.
    async fn commit_local_trx(&self, trx: &Transaction) -> Result<(), MutationError>;
    /// Creates a live query over `collection_id` for `args`.
    fn query(&self, collection_id: proto::CollectionId, args: MatchArgs) -> Result<EntityLiveQuery, RetrievalError>;
    /// Returns the storage collection wrapper for collection `id`.
    async fn collection(&self, id: &proto::CollectionId) -> Result<StorageCollectionWrapper, RetrievalError>;
}
#[async_trait]
impl<SE: StorageEngine + Send + Sync + 'static, PA: PolicyAgent + Send + Sync + 'static> TContext for NodeAndContext<SE, PA> {
fn node_id(&self) -> proto::EntityId { self.node.id }
fn create_entity(&self, collection: proto::CollectionId, trx_alive: Arc<AtomicBool>) -> Entity {
let primary_entity = self.node.entities.create(collection);
primary_entity.snapshot(trx_alive)
}
fn check_write(&self, entity: &Entity) -> Result<(), AccessDenied> { self.node.policy_agent.check_write(&self.cdata, entity, None) }
async fn get_entity(&self, id: proto::EntityId, collection: &proto::CollectionId, cached: bool) -> Result<Entity, RetrievalError> {
self.get_entity(collection, id, cached).await
}
fn get_resident_entity(&self, id: proto::EntityId) -> Option<Entity> { self.node.entities.get(&id) }
async fn fetch_entities(&self, collection: &proto::CollectionId, args: MatchArgs) -> Result<Vec<Entity>, RetrievalError> {
self.fetch_entities(collection, args).await
}
async fn commit_local_trx(&self, trx: &Transaction) -> Result<(), MutationError> { self.commit_local_trx(trx).await }
fn query(&self, collection_id: proto::CollectionId, args: MatchArgs) -> Result<EntityLiveQuery, RetrievalError> {
EntityLiveQuery::new(&self.node, collection_id, args, self.cdata.clone())
}
async fn collection(&self, id: &proto::CollectionId) -> Result<StorageCollectionWrapper, RetrievalError> {
self.node.system.collection(id).await
}
}
#[cfg(feature = "wasm")]
#[wasm_bindgen]
impl Context {
    /// Returns the id of the underlying node; exported to JavaScript under the name `node_id`.
    #[wasm_bindgen(js_name = "node_id")]
    pub fn js_node_id(&self) -> proto::EntityId {
        // Delegate to the inherent accessor, which forwards to the inner `TContext`.
        self.node_id()
    }
}
#[cfg_attr(feature = "wasm", wasm_bindgen)]
#[cfg_attr(feature = "uniffi", uniffi::export)]
impl Context {
pub fn begin(&self) -> Transaction { Transaction::new(self.0.clone()) }
}
impl Context {
    /// Builds a `Context` from `node` and the policy context `data`, erasing the `SE` / `PA`
    /// type parameters behind the inner trait object.
    pub fn new<SE, PA>(node: Node<SE, PA>, data: PA::ContextData) -> Self
    where
        SE: StorageEngine + Send + Sync + 'static,
        PA: PolicyAgent + Send + Sync + 'static,
    {
        let inner = NodeAndContext { node, cdata: data };
        Self(Arc::new(inner))
    }

    /// Returns the id of the underlying node.
    pub fn node_id(&self) -> proto::EntityId {
        self.0.node_id()
    }

    /// Retrieves entity `id` from `R`'s collection (with `cached = false`) and wraps it in the
    /// view type `R`.
    pub async fn get<R: View>(&self, id: proto::EntityId) -> Result<R, RetrievalError> {
        self.0.get_entity(id, &R::collection(), false).await.map(R::from_entity)
    }

    /// Same as [`Context::get`], but passes `cached = true` to the underlying lookup.
    pub async fn get_cached<R: View>(&self, id: proto::EntityId) -> Result<R, RetrievalError> {
        self.0.get_entity(id, &R::collection(), true).await.map(R::from_entity)
    }

    /// Fetches every entity of `R`'s model collection that matches `args`, as views of type `R`.
    ///
    /// # Errors
    /// Returns the conversion error if `args` cannot be turned into [`MatchArgs`], or any
    /// error produced by the underlying fetch.
    pub async fn fetch<R: View>(&self, args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>) -> Result<Vec<R>, RetrievalError> {
        use crate::model::Model;

        let match_args: MatchArgs = args.try_into().map_err(|err| err.into())?;
        let collection_id = R::Model::collection();
        let entities = self.0.fetch_entities(&collection_id, match_args).await?;
        Ok(entities.into_iter().map(R::from_entity).collect())
    }

    /// Runs [`Context::fetch`] and returns only the first result, or `None` when nothing matched.
    pub async fn fetch_one<R: View + Clone + 'static>(
        &self,
        args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>,
    ) -> Result<Option<R>, RetrievalError> {
        let mut matches = self.fetch::<R>(args).await?.into_iter();
        Ok(matches.next())
    }

    /// Creates a live query over `R`'s model collection for `args`.
    ///
    /// The query is returned immediately; use [`Context::query_wait`] to also wait for it to
    /// be initialized.
    pub fn query<R>(&self, args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>) -> Result<LiveQuery<R>, RetrievalError>
    where
        R: View,
    {
        use crate::model::Model;

        let match_args: MatchArgs = args.try_into().map_err(|err| err.into())?;
        let entity_query = self.0.query(R::Model::collection(), match_args)?;
        Ok(entity_query.map::<R>())
    }

    /// Creates a live query like [`Context::query`] and awaits `wait_initialized()` on it
    /// before returning it.
    pub async fn query_wait<R>(
        &self,
        args: impl TryInto<MatchArgs, Error = impl Into<RetrievalError>>,
    ) -> Result<LiveQuery<R>, RetrievalError>
    where
        R: View,
    {
        let live = self.query::<R>(args)?;
        live.wait_initialized().await;
        Ok(live)
    }

    /// Returns the storage collection wrapper for collection `id`.
    pub async fn collection(&self, id: &proto::CollectionId) -> Result<StorageCollectionWrapper, RetrievalError> {
        self.0.collection(id).await
    }
}
impl<SE, PA> NodeAndContext<SE, PA>
where
SE: StorageEngine + Send + Sync + 'static,
PA: PolicyAgent + Send + Sync + 'static,
{
/// Retrieves a single entity, preferring the node's resident entity set and falling back to
/// local storage.
///
/// Steps, in order:
/// 1. On a non-durable node, ask a peer for the entity first. If that fails with
///    `RetrievalError::NoDurablePeers` and `cached` is `true`, the failure is ignored;
///    every other error is returned.
/// 2. If the entity is present in `node.entities`, run the policy agent's read check on its
///    current state and return it.
/// 3. Otherwise load the state from the storage collection, run the read check on it, and
///    materialize the entity from that state.
///
/// # Errors
/// Propagates peer, storage, state-serialization and policy (`check_read`) failures.
pub(crate) async fn get_entity(
    &self,
    collection_id: &CollectionId,
    id: proto::EntityId,
    cached: bool,
) -> Result<Entity, RetrievalError> {
    debug!("Node({}).get_entity {:?}-{:?}", self.node.id, id, collection_id);

    if !self.node.durable {
        if let Err(err) = self.node.get_from_peer(collection_id, vec![id], &self.cdata).await {
            // Only a missing durable peer is tolerated, and only for cached lookups.
            let tolerated = cached && matches!(err, RetrievalError::NoDurablePeers);
            if !tolerated {
                return Err(err);
            }
        }
    }

    if let Some(resident) = self.node.entities.get(&id) {
        debug!("Node({}).get_entity found local entity - returning", self.node.id);
        let resident_state = resident.to_state()?;
        let resident_id = resident.id();
        self.node.policy_agent.check_read(&self.cdata, &resident_id, collection_id, &resident_state)?;
        return Ok(resident);
    }

    debug!("{}.get_entity fetching from storage", self.node);
    let collection = self.node.collections.get(collection_id).await?;
    let stored = collection.get_state(id).await?;

    // Enforce read policy on the stored state before turning it into an entity.
    self.node.policy_agent.check_read(&self.cdata, &stored.payload.entity_id, collection_id, &stored.payload.state)?;

    let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
    let (_changed, entity) = self.node.entities.with_state(&retriever, id, collection_id.clone(), stored.payload.state).await?;
    Ok(entity)
}
/// Fetches all entities of `collection_id` that match `args`.
///
/// Before anything is fetched, the policy agent checks collection access and gets a chance
/// to rewrite the predicate, and the selection is passed through the node's type resolver.
/// A durable node then answers from its own storage; a non-durable node delegates to
/// `fetch_from_peer`.
///
/// # Errors
/// Propagates policy, storage, peer and entity-materialization failures.
pub async fn fetch_entities(&self, collection_id: &CollectionId, mut args: MatchArgs) -> Result<Vec<Entity>, RetrievalError> {
    self.node.policy_agent.can_access_collection(&self.cdata, collection_id)?;
    args.selection.predicate = self.node.policy_agent.filter_predicate(&self.cdata, collection_id, args.selection.predicate)?;
    args.selection = self.node.type_resolver.resolve_selection_types(args.selection);

    if self.node.durable {
        let storage = self.node.collections.get(collection_id).await?;
        let matching_states = storage.fetch_states(&args.selection).await?;

        let mut entities = Vec::new();
        for attested_state in matching_states {
            let payload = attested_state.payload;
            let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
            let (_changed, entity) =
                self.node.entities.with_state(&retriever, payload.entity_id, collection_id.clone(), payload.state).await?;
            entities.push(entity);
        }
        return Ok(entities);
    }

    // Non-durable node: the authoritative data lives on a durable peer.
    self.fetch_from_peer(collection_id, args.selection).await
}
/// Commits the pending changes of `trx` against this node.
///
/// Phases, in order:
/// 1. Atomically flip `trx.alive` from `true` to `false`; a transaction that is already
///    dead is rejected.
/// 2. Generate one commit event per changed entity. A creation event is only accepted for
///    entities whose id is recorded in `trx.created_entity_ids`.
/// 3. Apply each event to a fresh snapshot of its entity and ask the policy agent to check
///    (and optionally attest) the before/after pair.
/// 4. Add the attested events to their storage collections and commit each entity's head.
/// 5. Relay the attested events to the required peers.
/// 6. Apply the events to the upstream entities (for transacted entities), store the
///    resulting states, and notify the reactor of the changes.
///
/// # Errors
/// Returns `MutationError::General` if the transaction was already committed or rolled
/// back, or if a creation event is found for an entity that was not created in this
/// transaction. Event generation, policy, storage and relay failures are propagated.
///
/// The alive flag is cleared in phase 1, so `trx` stays dead when a later phase fails.
/// Events are stored (phase 4) before they are relayed (phase 5), so a relay failure is
/// reported after the events have already been added to local storage.
///
/// # Panics
/// Panics if the `trx.created_entity_ids` lock is poisoned.
pub async fn commit_local_trx(&self, trx: &Transaction) -> Result<(), MutationError> {
    use std::sync::atomic::{AtomicBool, Ordering};

    // Phase 1: claim the transaction; only one caller can flip the flag.
    if trx.alive.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire).is_err() {
        return Err(MutationError::General("Transaction already committed or rolled back".into()));
    }
    let trx_id = trx.id.clone();

    // Phase 2: collect commit events, rejecting creation events for foreign entities.
    let mut entity_events = Vec::new();
    for entity in trx.entities.iter() {
        if let Some(event) = entity.generate_commit_event()? {
            if event.is_entity_create() {
                let created_ids = trx.created_entity_ids.read().unwrap();
                if !created_ids.contains(&entity.id) {
                    return Err(MutationError::General(
                        format!(
                            "Cannot commit phantom entity {}: entity has empty parent (creation event) \
                             but was not created in this transaction via create()",
                            entity.id
                        )
                        .into(),
                    ));
                }
            }
            entity_events.push((entity.clone(), event));
        }
    }

    // Phase 3: validate every event on a throwaway snapshot before touching storage.
    let mut attested_events = Vec::new();
    let mut entity_attested_events = Vec::new();
    for (entity, event) in entity_events {
        let trx_alive = Arc::new(AtomicBool::new(true));
        let forked = entity.snapshot(trx_alive);
        let entity_before = match &entity.kind {
            crate::entity::EntityKind::Transacted { upstream, .. } => upstream.clone(),
            crate::entity::EntityKind::Primary => entity.clone(),
        };
        let collection_id = &event.collection;
        let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
        forked.apply_event(&retriever, &event).await?;
        let attestation = self.node.policy_agent.check_event(&self.node, &self.cdata, &entity_before, &forked, &event)?;
        // `event` is owned and not used again in this iteration, so move it rather than clone it.
        let attested = Attested::opt(event, attestation);
        attested_events.push(attested.clone());
        entity_attested_events.push((entity, attested));
    }

    // Phase 4: store the events and advance each entity's head to its new event.
    for (entity, attested_event) in &entity_attested_events {
        let collection = self.node.collections.get(&attested_event.payload.collection).await?;
        collection.add_event(attested_event).await?;
        entity.commit_head(Clock::new([attested_event.payload.id()]));
    }

    // Phase 5: relay to peers.
    self.node.relay_to_required_peers(&self.cdata, trx_id, &attested_events).await?;

    // Phase 6: update canonical entities, store their states, and notify the reactor.
    let mut changes: Vec<EntityChange> = Vec::new();
    for (entity, attested_event) in entity_attested_events {
        let collection_id = &attested_event.payload.collection;
        let collection = self.node.collections.get(collection_id).await?;
        let canonical_entity = match &entity.kind {
            crate::entity::EntityKind::Transacted { upstream, .. } => {
                let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
                upstream.apply_event(&retriever, &attested_event.payload).await?;
                upstream.clone()
            }
            crate::entity::EntityKind::Primary => entity,
        };
        let state = canonical_entity.to_state()?;
        let entity_state = EntityState { entity_id: canonical_entity.id(), collection: canonical_entity.collection().clone(), state };
        let attestation = self.node.policy_agent.attest_state(&self.node, &entity_state);
        let attested = Attested::opt(entity_state, attestation);
        collection.set_state(attested).await?;
        changes.push(EntityChange::new(canonical_entity, vec![attested_event])?);
    }
    self.node.reactor.notify_change(changes).await;
    Ok(())
}
/// Fetches the entities matching `selection` via a randomly chosen durable peer.
///
/// The local matches (id + head) are sent along as `known_matches`. The deltas in the
/// peer's response are applied to this node, and the selection is then evaluated locally
/// again to produce the result.
///
/// # Errors
/// Returns `RetrievalError::NoDurablePeers` when no durable peer is available, and
/// `RetrievalError::Other` when the peer answers with an error or an unexpected response
/// type. Local fetch, request and delta-application failures are propagated.
async fn fetch_from_peer(
    &self,
    collection_id: &proto::CollectionId,
    selection: ankql::ast::Selection,
) -> Result<Vec<crate::entity::Entity>, RetrievalError> {
    let peer_id = self.node.get_durable_peer_random().ok_or(RetrievalError::NoDurablePeers)?;

    // Tell the peer which matching entities (and heads) this node already has.
    let locally_known = self.node.fetch_entities_from_local(collection_id, &selection).await?;
    let known_matches =
        locally_known.iter().map(|known| proto::KnownEntity { entity_id: known.id(), head: known.head().clone() }).collect();

    let request_body = proto::NodeRequestBody::Fetch { collection: collection_id.clone(), selection: selection.clone(), known_matches };
    let response = self.node.request(peer_id, &self.cdata, request_body).await?;

    let deltas = match response {
        proto::NodeResponseBody::Fetch(deltas) => deltas,
        proto::NodeResponseBody::Error(e) => {
            tracing::debug!("Error from peer fetch: {}", e);
            return Err(RetrievalError::Other(format!("{:?}", e)));
        }
        _ => {
            tracing::debug!("Unexpected response type from peer fetch");
            return Err(RetrievalError::Other("Unexpected response type".to_string()));
        }
    };

    let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id.clone(), &self.node, &self.cdata);
    crate::node_applier::NodeApplier::apply_deltas(&self.node, &peer_id, deltas, &retriever).await?;

    // Re-evaluate locally now that the peer's deltas have been applied.
    self.node.fetch_entities_from_local(collection_id, &selection).await
}
}