use crate::{
changes::EntityChange,
error::{ApplyError, ApplyErrorItem, MutationError},
lineage::Retrieve,
node::Node,
policy::PolicyAgent,
storage::StorageEngine,
util::ready_chunks::ReadyChunks,
};
use ankurah_proto::{self as proto, Event, EventId};
use futures::stream::StreamExt;
use proto::Attested;
pub struct NodeApplier;
impl NodeApplier {
pub(crate) async fn apply_updates<SE, PA>(
node: &Node<SE, PA>,
from_peer_id: &proto::EntityId,
items: Vec<proto::SubscriptionUpdateItem>,
) -> Result<(), MutationError>
where
SE: StorageEngine + Send + Sync + 'static,
PA: PolicyAgent + Send + Sync + 'static,
{
tracing::debug!("received subscription update for {} items", items.len());
let Some(relay) = &node.subscription_relay else {
return Err(MutationError::InvalidUpdate("Should not be receiving updates without a subscription relay"));
};
let cdata = relay.get_contexts_for_peer(from_peer_id);
if cdata.is_empty() {
return Err(MutationError::InvalidUpdate("Should not be receiving updates without at least predicate context"));
}
let mut changes = Vec::new();
for update in items {
let retriever = crate::retrieval::EphemeralNodeRetriever::new(update.collection.clone(), node, &cdata);
Self::apply_update(node, from_peer_id, update, &retriever, &mut changes, &mut ()).await?;
retriever.store_used_events().await?;
}
node.reactor.notify_change(changes).await;
Ok(())
}
async fn apply_update<SE, PA, R>(
node: &Node<SE, PA>,
from_peer_id: &proto::EntityId,
update: proto::SubscriptionUpdateItem,
retriever: &R,
changes: &mut Vec<EntityChange>,
entities: &mut impl Pushable<crate::entity::Entity>,
) -> Result<(), MutationError>
where
SE: StorageEngine + Send + Sync + 'static,
PA: PolicyAgent + Send + Sync + 'static,
R: Retrieve<Id = EventId, Event = Event> + Send + Sync,
{
let proto::SubscriptionUpdateItem { entity_id, collection: collection_id, content, predicate_relevance: _ } = update;
let collection = node.collections.get(&collection_id).await?;
match content {
proto::UpdateContent::EventOnly(event_fragments) => {
let events = Self::save_events(node, from_peer_id, entity_id, &collection_id, event_fragments, &collection).await?;
let entity = node.entities.get_retrieve_or_create(retriever, &collection_id, &entity_id).await?;
entities.push(entity.clone());
let mut applied_events = Vec::new();
for event in events {
if entity.apply_event(retriever, &event.payload).await? {
applied_events.push(event);
}
}
if !applied_events.is_empty() {
changes.push(EntityChange::new(entity, applied_events)?);
}
}
proto::UpdateContent::StateAndEvent(state_fragment, event_fragments) => {
let events = Self::save_events(node, from_peer_id, entity_id, &collection_id, event_fragments, &collection).await?;
let state = (entity_id, collection_id.clone(), state_fragment.clone()).into();
node.policy_agent.validate_received_state(node, from_peer_id, &state)?;
let (changed, entity) = node.entities.with_state(retriever, entity_id, collection_id, state.payload.state).await?;
entities.push(entity.clone());
if matches!(changed, Some(true) | None) {
Self::save_state(node, &entity, &collection).await?;
changes.push(EntityChange::new(entity, events)?);
}
}
}
Ok(())
}
async fn save_events<SE, PA>(
node: &Node<SE, PA>,
from_peer_id: &proto::EntityId,
entity_id: proto::EntityId,
collection_id: &proto::CollectionId,
fragments: Vec<proto::EventFragment>,
collection: &crate::storage::StorageCollectionWrapper,
) -> Result<Vec<Attested<proto::Event>>, MutationError>
where
SE: StorageEngine + Send + Sync + 'static,
PA: PolicyAgent + Send + Sync + 'static,
{
let mut attested_events = Vec::new();
for fragment in fragments {
let attested_event = (entity_id, collection_id.clone(), fragment).into();
node.policy_agent.validate_received_event(node, from_peer_id, &attested_event)?;
collection.add_event(&attested_event).await?;
attested_events.push(attested_event);
}
Ok(attested_events)
}
async fn save_state<SE, PA>(
node: &Node<SE, PA>,
entity: &crate::entity::Entity,
collection_wrapper: &crate::storage::StorageCollectionWrapper,
) -> Result<(), MutationError>
where
SE: StorageEngine + Send + Sync + 'static,
PA: PolicyAgent + Send + Sync + 'static,
{
let state = entity.to_state()?;
let entity_state = proto::EntityState { entity_id: entity.id(), collection: entity.collection().clone(), state };
let attestation = node.policy_agent.attest_state(node, &entity_state);
let attested = Attested::opt(entity_state, attestation);
collection_wrapper.set_state(attested).await?;
Ok(())
}
pub(crate) async fn apply_deltas<SE, PA, R>(
node: &Node<SE, PA>,
from_peer_id: &proto::EntityId,
deltas: Vec<proto::EntityDelta>,
retriever: &R,
) -> Result<(), ApplyError>
where
SE: StorageEngine + Send + Sync + 'static,
PA: PolicyAgent + Send + Sync + 'static,
R: Retrieve<Id = EventId, Event = Event> + Send + Sync,
{
let mut ready_chunks = ReadyChunks::new(deltas.into_iter().map(|delta| Self::apply_delta(node, from_peer_id, delta, retriever)));
let mut all_errors = Vec::new();
while let Some(results) = ready_chunks.next().await {
let mut batch = Vec::new();
for result in results {
match result {
Ok(Some(change)) => batch.push(change),
Ok(None) => {} Err(error_item) => {
all_errors.push(error_item);
}
}
}
if !batch.is_empty() {
node.reactor.notify_change(batch).await;
}
}
if !all_errors.is_empty() {
return Err(ApplyError::Items(all_errors));
}
Ok(())
}
async fn apply_delta<SE, PA, R>(
node: &Node<SE, PA>,
from_peer_id: &proto::EntityId,
delta: proto::EntityDelta,
retriever: &R,
) -> Result<Option<EntityChange>, ApplyErrorItem>
where
SE: StorageEngine + Send + Sync + 'static,
PA: PolicyAgent + Send + Sync + 'static,
R: Retrieve<Id = EventId, Event = Event> + Send + Sync,
{
let entity_id = delta.entity_id;
let collection = delta.collection.clone();
let result = Self::apply_delta_inner(node, from_peer_id, delta, retriever).await;
result.map_err(|cause| ApplyErrorItem { entity_id, collection, cause })
}
async fn apply_delta_inner<SE, PA, R>(
node: &Node<SE, PA>,
from_peer_id: &proto::EntityId,
delta: proto::EntityDelta,
retriever: &R,
) -> Result<Option<EntityChange>, MutationError>
where
SE: StorageEngine + Send + Sync + 'static,
PA: PolicyAgent + Send + Sync + 'static,
R: Retrieve<Id = EventId, Event = Event> + Send + Sync,
{
let collection = node.collections.get(&delta.collection).await?;
match delta.content {
proto::DeltaContent::StateSnapshot { state } => {
let attested_state = (delta.entity_id, delta.collection.clone(), state).into();
node.policy_agent.validate_received_state(node, from_peer_id, &attested_state)?;
let (_, entity) =
node.entities.with_state(retriever, delta.entity_id, delta.collection, attested_state.payload.state).await?;
Self::save_state(node, &entity, &collection).await?;
Ok(Some(EntityChange::new(entity, Vec::new())?))
}
proto::DeltaContent::EventBridge { events } => {
let attested_events: Vec<Attested<proto::Event>> =
events.into_iter().map(|f| (delta.entity_id, delta.collection.clone(), f).into()).collect();
retriever.stage_events(attested_events.clone());
let entity = node.entities.get_retrieve_or_create(retriever, &delta.collection, &delta.entity_id).await?;
for event in attested_events.into_iter().rev() {
entity.apply_event(retriever, &event.payload).await?;
retriever.mark_event_used(&event.payload.id());
}
Self::save_state(node, &entity, &collection).await?;
Ok(Some(EntityChange::new(entity, Vec::new())?))
}
proto::DeltaContent::StateAndRelation { state: _, relation: _ } => {
unimplemented!("StateAndRelation not yet implemented in Phase 1")
}
}
}
}
trait Pushable<T> {
fn push(&mut self, value: T);
}
impl<T> Pushable<T> for Vec<T> {
fn push(&mut self, value: T) { self.push(value); }
}
impl<T> Pushable<T> for &mut Vec<T> {
fn push(&mut self, value: T) { (*self).push(value); }
}
impl<T> Pushable<T> for () {
fn push(&mut self, _: T) {
}
}