use crate::{action::Action, context::Context, entry::CanPublish, nucleus::ZomeFnCall};
use holochain_core_types::{
entry::Entry, link::link_data::LinkData, network::entry_aspect::EntryAspect,
};
use holochain_persistence_api::cas::content::{Address, AddressableContent};
use serde::Serialize;
use std::{collections::HashMap, sync::Arc};
/// A consistency signal: one event plus the list of events that are still
/// pending as a consequence of it.
///
/// A signal whose `pending` list is empty is built by `new_terminal`; one with
/// follow-up events is built by `new_pending`.
#[derive(Clone, Debug, Serialize)]
pub struct ConsistencySignal<E>
where
    E: Serialize,
{
    /// The event this signal reports.
    event: E,
    /// Follow-up events, each tagged with the group it is associated with.
    pending: Vec<PendingConsistency<E>>,
}
impl<E: Serialize> ConsistencySignal<E> {
pub fn new_terminal(event: E) -> Self {
Self {
event,
pending: Vec::new(),
}
}
pub fn new_pending(event: E, group: ConsistencyGroup, pending_events: Vec<E>) -> Self {
let pending = pending_events
.into_iter()
.map(|event| PendingConsistency {
event,
group: group.clone(),
})
.collect();
Self { event, pending }
}
}
impl From<ConsistencySignalE> for ConsistencySignal<String> {
fn from(signal: ConsistencySignalE) -> ConsistencySignal<String> {
let ConsistencySignalE { event, pending } = signal;
ConsistencySignal {
event: serde_json::to_string(&event)
.expect("ConsistencySignal serialization cannot fail"),
pending: pending
.into_iter()
.map(|p| PendingConsistency {
event: serde_json::to_string(&p.event)
.expect("ConsistencySignal serialization cannot fail"),
group: p.group,
})
.collect(),
}
}
}
/// Shorthand for a consistency signal whose events are typed `ConsistencyEvent` values.
type ConsistencySignalE = ConsistencySignal<ConsistencyEvent>;
/// The kinds of events carried by consistency signals.
// NOTE(review): the lint is presumably silenced because `AddLink(LinkData)` is
// much larger than the other variants — confirm before boxing or removing.
#[derive(Clone, Debug, Serialize)]
#[allow(clippy::large_enum_variant)]
pub enum ConsistencyEvent {
    /// An entry at the given address is published.
    Publish(Address),
    /// Network initialization.
    InitializeNetwork,
    /// Chain initialization.
    InitializeChain,
    /// A zome function call was queued: `"zome/function"` label and call id.
    SignalZomeFunctionCall(String, snowflake::ProcessUniqueId),

    /// The entry at the given address is held.
    Hold(Address),
    /// An entry update: (address being updated, address of the new entry).
    UpdateEntry(Address, Address),
    /// An entry removal: (address being removed, address of the deletion entry).
    RemoveEntry(Address, Address),
    /// A link described by the given link data is added.
    AddLink(LinkData),
    /// A link removal, identified by the address of the removal entry.
    RemoveLink(Address),
    /// A zome function call returned: `"zome/function"` label and call id.
    ReturnZomeFunctionResult(String, snowflake::ProcessUniqueId),
}
/// One pending follow-up event of a `ConsistencySignal`, tagged with the group
/// it is associated with.
#[derive(Clone, Debug, Serialize)]
struct PendingConsistency<E>
where
    E: Serialize,
{
    /// The event that is still pending.
    event: E,
    /// The group this pending event is associated with.
    group: ConsistencyGroup,
}
/// The group a pending consistency event is associated with.
#[derive(Clone, Debug, Serialize)]
pub enum ConsistencyGroup {
    /// Used for the pending result of a queued zome function call.
    // NOTE(review): presumably "the agent that originated the event" — confirm.
    Source,
    /// Used for the pending `Hold`/metadata events of a publish and of chain
    /// initialization.
    // NOTE(review): presumably "the agents validating/holding the data" — confirm.
    Validators,
}
/// Stateful translator from `Action`s to consistency signals
/// (see `process_action`).
#[derive(Clone)]
pub struct ConsistencyModel {
    // Signals prepared when an entry is committed, keyed by entry address;
    // an entry is removed and emitted when the matching `Publish` action arrives.
    commit_cache: HashMap<Address, ConsistencySignalE>,
    // Set to true once an `InitializeChain` action has been processed;
    // gates the signal emitted for `InitNetwork`.
    chain_initialized: bool,
    // Used to query state/DNA availability, to decide whether an entry type
    // can be published, and as the logging context.
    context: Arc<Context>,
}
#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
impl ConsistencyModel {
    /// Creates a model with an empty commit cache and the chain marked as not
    /// yet initialized.
    pub fn new(context: Arc<Context>) -> Self {
        Self {
            context,
            chain_initialized: false,
            commit_cache: HashMap::new(),
        }
    }

    /// Translates `action` into the consistency signal it gives rise to, if any.
    ///
    /// * `Commit` never emits directly. When the state or DNA is unavailable,
    ///   or the entry type can be published, a `Publish` signal is cached
    ///   under the entry address.
    /// * `Publish` emits (and removes) the signal cached for that address. It
    ///   logs a warning and returns `None` when nothing was cached.
    /// * `HoldAspect` emits a terminal signal matching the aspect. It logs an
    ///   error and returns `None` for headers lacking `link_update_delete`
    ///   and for `EntryAspect::Header`.
    /// * `QueueZomeFunctionCall` emits a signal whose pending event is the
    ///   corresponding function result.
    /// * `ReturnZomeFunctionResult` emits a terminal signal.
    /// * `InitNetwork` emits `InitializeChain` only if the chain has already
    ///   been initialized.
    /// * `InitializeChain` only records that the chain is initialized.
    /// * Every other action yields `None`.
    pub fn process_action(&mut self, action: &Action) -> Option<ConsistencySignalE> {
        use ConsistencyEvent::*;
        use ConsistencyGroup::*;

        match action {
            Action::Commit((entry, crud_link, _)) => {
                let should_cache = self.context.state().is_none()
                    || self.context.get_dna().is_none()
                    || entry.entry_type().can_publish(&self.context);

                if should_cache {
                    let address = entry.address();

                    // Optional metadata event that accompanies the hold,
                    // depending on the kind of entry being committed.
                    let follow_up = match entry {
                        Entry::App(_, _) => crud_link
                            .clone()
                            .map(|previous| UpdateEntry(previous, address.clone())),
                        Entry::Deletion(_) => crud_link
                            .clone()
                            .map(|previous| RemoveEntry(previous, address.clone())),
                        Entry::LinkAdd(link_data) => Some(AddLink(link_data.clone())),
                        Entry::LinkRemove(_) => Some(RemoveLink(address.clone())),
                        _ => None,
                    };

                    let mut awaited = vec![Hold(address.clone())];
                    awaited.extend(follow_up);

                    let publish_signal = ConsistencySignal::new_pending(
                        Publish(address.clone()),
                        Validators,
                        awaited,
                    );
                    self.commit_cache.insert(address, publish_signal);
                }

                // The signal is only emitted once the entry is published.
                None
            }

            Action::Publish(address) => {
                let cached = self.commit_cache.remove(address);
                if cached.is_none() {
                    log_warn!(
                        self.context,
                        "consistency: Publishing address that was not previously committed"
                    );
                }
                cached
            }

            Action::HoldAspect((aspect, _)) => match aspect {
                EntryAspect::Content(entry, _) => {
                    Some(ConsistencySignal::new_terminal(Hold(entry.address())))
                }
                EntryAspect::Update(_, header) => match header.link_update_delete() {
                    Some(old) => {
                        let new = header.entry_address().clone();
                        Some(ConsistencySignal::new_terminal(UpdateEntry(old, new)))
                    }
                    None => {
                        error!("Got header without link_update_delete associated with EntryAspect::Update");
                        None
                    }
                },
                EntryAspect::Deletion(header) => match header.link_update_delete() {
                    Some(old) => {
                        let new = header.entry_address().clone();
                        Some(ConsistencySignal::new_terminal(RemoveEntry(old, new)))
                    }
                    None => {
                        error!("Got header without link_update_delete associated with EntryAspect::Deletion");
                        None
                    }
                },
                EntryAspect::LinkAdd(data, _) => {
                    Some(ConsistencySignal::new_terminal(AddLink(data.clone())))
                }
                EntryAspect::LinkRemove(_, header) => {
                    let removal_address = header.entry_address().clone();
                    Some(ConsistencySignal::new_terminal(RemoveLink(removal_address)))
                }
                EntryAspect::Header(_) => {
                    error!("Got EntryAspect::Header type, unexpectedly");
                    None
                }
            },

            Action::QueueZomeFunctionCall(call) => {
                // Same "zome/function" label for the call and its pending result.
                let label = display_zome_fn_call(call);
                let awaited_result = ReturnZomeFunctionResult(label.clone(), call.id());
                Some(ConsistencySignal::new_pending(
                    SignalZomeFunctionCall(label, call.id()),
                    Source,
                    vec![awaited_result],
                ))
            }

            Action::ReturnZomeFunctionResult(result) => {
                let call = result.call();
                let label = display_zome_fn_call(&call);
                Some(ConsistencySignal::new_terminal(ReturnZomeFunctionResult(
                    label,
                    call.id(),
                )))
            }

            Action::InitNetwork(settings) => {
                if self.chain_initialized {
                    let agent_address = Address::from(settings.agent_id.clone());
                    Some(ConsistencySignal::new_pending(
                        InitializeChain,
                        Validators,
                        vec![Hold(agent_address)],
                    ))
                } else {
                    None
                }
            }

            Action::InitializeChain(_) => {
                self.chain_initialized = true;
                None
            }

            _ => None,
        }
    }
}
/// Renders a zome function call as `"<zome_name>/<fn_name>"`.
fn display_zome_fn_call(call: &ZomeFnCall) -> String {
    format!(
        "{zome}/{function}",
        zome = call.zome_name,
        function = call.fn_name
    )
}