use crate::{
action::{Action, ActionWrapper},
consistency::ConsistencyModel,
context::{ActionReceiver, ActionSender, Context},
dht::actions::remove_queued_holding_workflow::{
remove_queued_holding_workflow, HoldingWorkflowQueueing,
},
network,
persister::Persister,
scheduled_jobs,
signal::Signal,
state::{State, StateWrapper},
state_dump::DumpOptions,
workflows::{application, run_holding_workflow},
};
#[cfg(test)]
use crate::{
network::actions::initialize_network::initialize_network_with_spoofed_dna,
nucleus::actions::initialize::initialize_chain,
};
use clokwerk::{ScheduleHandle, Scheduler, TimeUnits};
use crossbeam_channel::{unbounded, Receiver, Sender};
use futures::task::Waker;
use holochain_core_types::{
dna::Dna,
error::{HcResult, HolochainError},
};
use holochain_locksmith::RwLock;
#[cfg(test)]
use holochain_persistence_api::cas::content::Address;
use holochain_tracing::{self as ht, channel::lax_send_wrapped};
use snowflake::ProcessUniqueId;
use std::{
collections::HashMap,
sync::{
atomic::Ordering::{self, Relaxed},
Arc,
},
thread,
time::{Duration, Instant},
};
/// Initial delay before re-queueing a holding workflow that returned
/// `ValidationPending` (15 seconds; was written as `from_millis(15000)`).
pub const RETRY_VALIDATION_DURATION_MIN: Duration = Duration::from_secs(15);
/// Upper bound for the exponential retry back-off of pending validations (one hour).
pub const RETRY_VALIDATION_DURATION_MAX: Duration = Duration::from_secs(60 * 60);
/// Requests sent to the action loop to manage the set of task `Waker`s
/// that must be woken after every processed action.
pub enum WakerRequest {
    /// Register a waker under a process-unique id.
    Add(ProcessUniqueId, Waker),
    /// Deregister the waker previously registered under this id.
    Remove(ProcessUniqueId),
}
/// An instance: holds the redux state plus the channels and background
/// threads (action loop, holding loop, scheduler) that drive it.
/// Channel fields are `None` until `initialize_channels` has run.
#[derive(Clone)]
pub struct Instance {
    /// The redux state, shared with every derived `Context`.
    state: Arc<RwLock<StateWrapper>>,
    /// Sender for dispatching actions into the action loop.
    action_channel: Option<ActionSender>,
    /// Sender for registering state observers with the action loop.
    observer_channel: Option<Sender<Observer>>,
    /// Sender for registering/deregistering task wakers with the action loop.
    waker_channel: Option<Sender<WakerRequest>>,
    /// Keeps the periodic-jobs scheduler thread alive while the instance lives.
    scheduler_handle: Option<Arc<ScheduleHandle>>,
    /// Persister used to save the state after each reduction.
    persister: Option<Arc<RwLock<dyn Persister>>>,
    consistency_model: ConsistencyModel,
    /// Signals the action loop to terminate.
    kill_switch: Option<Sender<()>>,
    /// Signals the holding loop to terminate.
    kill_switch_holding: Option<Sender<()>>,
}
/// A state observer: the action loop sends `()` on `ticker` after each
/// processed action so the observer can re-inspect the state. Observers
/// whose receiver has hung up are dropped by the loop.
pub struct Observer {
    pub ticker: Sender<()>,
}
// Error message for dispatching before the channels are open.
// NOTE(review): not referenced anywhere in this file — presumably used by
// callers elsewhere in the crate; verify before removing.
pub static DISPATCH_WITHOUT_CHANNELS: &str = "dispatch called without channels open";
impl Instance {
/// Shared setup for all `initialize*` entry points: creates the action /
/// observer / waker channels, derives a sub-context wired to them, starts
/// the periodic maintenance jobs and spawns the action and holding loops.
/// Returns the fully wired context.
pub(in crate::instance) fn inner_setup(&mut self, context: Arc<Context>) -> Arc<Context> {
    // Channels must exist before `initialize_context` runs, since the
    // sub-context copies the senders out of `self`.
    let (rx_action, rx_observer, rx_waker) = self.initialize_channels();
    let context = self.initialize_context(context);
    let mut scheduler = Scheduler::new();
    // Periodic state dump for debugging (EAVs excluded to keep dumps small).
    scheduler
        .every(10.seconds())
        .run(scheduled_jobs::create_state_dump_callback(
            context.clone(),
            DumpOptions {
                include_eavis: false,
            },
        ));
    // Time out stale operations once per second.
    scheduler
        .every(1.second())
        .run(scheduled_jobs::create_timeout_callback(context.clone()));
    // Prune state every 30 seconds.
    scheduler
        .every(30.seconds())
        .run(scheduled_jobs::create_state_pruning_callback(
            context.clone(),
        ));
    // Keep the handle so the scheduler thread is not dropped immediately.
    self.scheduler_handle = Some(Arc::new(scheduler.watch_thread(Duration::from_millis(10))));
    self.persister = Some(context.persister.clone());
    self.start_action_loop(context.clone(), rx_action, rx_observer, rx_waker);
    self.start_holding_loop(context.clone());
    context
}
/// Wires up the instance (channels, scheduler, background loops) and then
/// blocks on the application initialization workflow with the optional DNA.
/// Returns the fully initialized context on success.
pub fn initialize(
    &mut self,
    dna: Option<Dna>,
    context: Arc<Context>,
) -> HcResult<Arc<Context>> {
    let ctx = self.inner_setup(context);
    ctx.block_on(application::initialize(self, dna, ctx.clone()))
}
/// Test-only variant of `initialize` that registers the DNA on the network
/// under a spoofed address instead of its real one.
#[cfg(test)]
pub fn initialize_with_spoofed_dna(
    &mut self,
    dna: Dna,
    spoofed_dna_address: Address,
    context: Arc<Context>,
) -> HcResult<Arc<Context>> {
    let context = self.inner_setup(context);
    context.block_on(async {
        // Chain genesis first, then join the network under the spoofed address.
        initialize_chain(dna.clone(), &context).await?;
        initialize_network_with_spoofed_dna(spoofed_dna_address, &context).await
    })?;
    Ok(context)
}
/// Test-only setup: wires channels and loops but runs no initialization
/// workflow (no chain genesis, no network join).
#[cfg(test)]
pub fn initialize_without_dna(&mut self, context: Arc<Context>) -> Arc<Context> {
    self.inner_setup(context)
}
/// Borrows the action channel.
///
/// # Panics
/// Panics if `initialize_channels` has not been called yet.
fn action_channel(&self) -> &ActionSender {
    match self.action_channel {
        Some(ref sender) => sender,
        None => panic!("Action channel not initialized"),
    }
}
/// Borrows the observer channel.
///
/// # Panics
/// Panics if `initialize_channels` has not been called yet.
pub fn observer_channel(&self) -> &Sender<Observer> {
    match self.observer_channel {
        Some(ref sender) => sender,
        None => panic!("Observer channel not initialized"),
    }
}
/// Dispatches the given action wrapper onto this instance's action channel.
/// Panics (via `action_channel()`) if the channels are not initialized.
pub fn dispatch(&mut self, action_wrapper: ActionWrapper) {
    dispatch_action(self.action_channel(), action_wrapper)
}
/// Creates the action, observer and waker channels, stores the sending
/// halves on `self`, and returns the receiving halves for the action loop.
fn initialize_channels(
    &mut self,
) -> (ActionReceiver, Receiver<Observer>, Receiver<WakerRequest>) {
    let (action_tx, action_rx) = unbounded::<ht::SpanWrap<ActionWrapper>>();
    self.action_channel = Some(action_tx.into());

    let (observer_tx, observer_rx) = unbounded::<Observer>();
    self.observer_channel = Some(observer_tx);

    let (waker_tx, waker_rx) = unbounded::<WakerRequest>();
    self.waker_channel = Some(waker_tx);

    (action_rx, observer_rx, waker_rx)
}
/// Derives a new context from `context` that shares this instance's state
/// and carries clones of its action/observer/waker senders.
pub fn initialize_context(&self, context: Arc<Context>) -> Arc<Context> {
    let mut wired = (*context).clone();
    wired.set_state(self.state.clone());
    wired.action_channel = self.action_channel.clone();
    wired.observer_channel = self.observer_channel.clone();
    wired.waker_channel = self.waker_channel.clone();
    Arc::new(wired)
}
pub fn start_action_loop(
&mut self,
context: Arc<Context>,
rx_action: ActionReceiver,
rx_observer: Receiver<Observer>,
rx_waker: Receiver<WakerRequest>,
) {
self.stop_action_loop();
let mut sync_self = self.clone();
let sub_context = self.initialize_context(context);
let (kill_sender, kill_receiver) = crossbeam_channel::unbounded();
self.kill_switch = Some(kill_sender);
let instance_is_alive = sub_context.instance_is_alive.clone();
instance_is_alive.store(true, Ordering::Relaxed);
let _ = thread::Builder::new()
.name(format!(
"action_loop/{}",
ProcessUniqueId::new().to_string()
))
.spawn(move || {
let mut state_observers: Vec<Observer> = Vec::new();
let mut unprocessed_action: Option<ht::SpanWrap<ActionWrapper>> = None;
let mut wakers: HashMap::<ProcessUniqueId, Waker> = HashMap::new();
while kill_receiver.try_recv().is_err() {
if let Some(action_wrapper) = unprocessed_action.take().or_else(|| rx_action.recv_timeout(Duration::from_secs(1)).ok()) {
state_observers.extend(rx_observer.try_iter());
for waker_request in rx_waker.try_iter() {
match waker_request {
WakerRequest::Add(id, waker) => wakers.insert(id, waker),
WakerRequest::Remove(id) => wakers.remove(&id),
};
}
let action = action_wrapper.action();
let should_process = *action != Action::Ping;
if should_process {
match sync_self.process_action(&action_wrapper, &sub_context) {
Ok(()) => {
let tag = ht::Tag::new("action", format!("{:?}", action));
let _guard = action_wrapper.follower_(&sub_context.tracer, "action_loop thread", |s| s.tag(tag).start()).map(|span| {
ht::push_span(span)
});
sync_self.emit_signals(&sub_context, &action_wrapper);
state_observers= state_observers
.into_iter()
.filter(|observer| observer.ticker.send(()).is_ok())
.collect();
for waker in wakers.values() {
waker.clone().wake();
}
},
Err(HolochainError::Timeout(s)) => {
warn!("Instance::process_action() couldn't get lock on state. Trying again next loop. Timeout string: {}", s);
unprocessed_action = Some(action_wrapper);
},
Err(e) => {
error!("Instance::process_action() returned unexpected error: {:?}", e);
unprocessed_action = Some(action_wrapper);
}
};
}
}
}
instance_is_alive.store(false, Relaxed);
});
}
pub fn stop_action_loop(&self) {
if let Some(ref kill_switch) = self.kill_switch {
let _ = kill_switch.send(());
}
if let Some(ref kill_switch) = self.kill_switch_holding {
let _ = kill_switch.send(());
}
}
/// Applies a single action to the instance state by running the reducer
/// under a write lock, then persists the resulting state.
///
/// # Errors
/// Returns `HolochainError::Timeout` if the state write lock could not be
/// acquired within 10 seconds; the action loop re-queues the action in
/// that case and retries.
pub(crate) fn process_action(
    &self,
    action_wrapper: &ht::SpanWrap<ActionWrapper>,
    context: &Arc<Context>,
) -> Result<(), HolochainError> {
    // Continue the trace span carried by the wrapper, or start a fresh
    // root span if there is none.
    let span = action_wrapper
        .follower(&context.tracer, "begin process_action")
        .unwrap_or_else(|| {
            context
                .tracer
                .span("ROOT: process_action")
                .tag(ht::debug_tag("action_wrapper", action_wrapper))
                .start()
                .into()
        });
    let _trace_guard = ht::push_span(span);
    // Flag that a writer is waiting so readers can back off.
    context.redux_wants_write.store(true, Relaxed);
    {
        let new_state: StateWrapper;
        // Bounded wait on the write lock so a stuck reader cannot stall
        // the action loop forever.
        let mut state = self
            .state
            .try_write_until(Instant::now().checked_add(Duration::from_secs(10)).unwrap())
            .ok_or_else(|| {
                HolochainError::Timeout(format!("timeout src: {}:{}", file!(), line!()))
            })?;
        new_state = state.reduce(action_wrapper.data.clone());
        *state = new_state;
        // A persistence failure is logged but does not fail the reduction:
        // the in-memory state has already been updated at this point.
        if let Err(e) = self.save(&state) {
            log_error!(
                context,
                "instance/process_action: could not save state: {:?}",
                e
            );
        } else {
            log_trace!(
                context,
                "reduce/process_actions: reducing {:?}",
                action_wrapper
            );
        }
    }
    context.redux_wants_write.store(false, Relaxed);
    Ok(())
}
/// Spawns the holding loop: a thread that drains the DHT's queued holding
/// workflows, spawning an async task per entry. Workflows that come back
/// with `ValidationPending` are re-queued with an exponentially growing
/// delay, capped at `RETRY_VALIDATION_DURATION_MAX`.
fn start_holding_loop(&mut self, context: Arc<Context>) {
    let (kill_sender, kill_receiver) = crossbeam_channel::unbounded();
    self.kill_switch_holding = Some(kill_sender);
    thread::Builder::new()
        .name(format!(
            "holding_loop/{}",
            ProcessUniqueId::new().to_string()
        ))
        .spawn(move || {
            while kill_receiver.try_recv().is_err() {
                log_trace!(context, "Checking holding queue...");
                // Inner loop: drain every currently due queue entry, then
                // fall through to the 50ms sleep below.
                loop {
                    let dht_store = context
                        .state()
                        .expect("Couldn't get state in run_pending_validations")
                        .dht();
                    let maybe_holding_workflow = dht_store.next_queued_holding_workflow();
                    if let Some((pending, maybe_delay)) = maybe_holding_workflow {
                        log_debug!(context, "Found queued validation: {:?}", pending);
                        // Mark the entry as in-flight so it is not picked
                        // up again while its task runs.
                        context.block_on(remove_queued_holding_workflow(
                            HoldingWorkflowQueueing::Processing,
                            pending.clone(),
                            context.clone(),
                        ));
                        let c = context.clone();
                        let pending = pending.clone();
                        let closure = async move || {
                            let queuing = match run_holding_workflow(pending.clone(), c.clone()).await {
                                Err(HolochainError::ValidationPending) => {
                                    // Exponential back-off: double the previous
                                    // delay, starting at the configured minimum.
                                    let mut delay = maybe_delay
                                        .map(|old_delay| {
                                            old_delay * 2
                                        })
                                        .unwrap_or(RETRY_VALIDATION_DURATION_MIN);
                                    if delay > RETRY_VALIDATION_DURATION_MAX {
                                        delay = RETRY_VALIDATION_DURATION_MAX
                                    }
                                    log_debug!(c, "re-queuing pending validation for {:?} with a delay of {:?}", pending, delay);
                                    HoldingWorkflowQueueing::Waiting(delay)
                                }
                                Err(e) => {
                                    // Any other error is terminal for this entry.
                                    log_error!(
                                        c,
                                        "Error running holding workflow for {:?}: {:?}",
                                        pending,
                                        e,
                                    );
                                    HoldingWorkflowQueueing::Done
                                }
                                Ok(()) => {
                                    log_debug!(c, "Successfully processed: {:?}", pending);
                                    HoldingWorkflowQueueing::Done
                                }
                            };
                            // Record the outcome (done, or waiting with delay).
                            remove_queued_holding_workflow(
                                queuing,
                                pending.clone(),
                                c.clone(),
                            ).await
                        };
                        let future = closure();
                        context.spawn_task(future);
                    } else {
                        break;
                    }
                }
                std::thread::sleep(Duration::from_millis(50));
            }
        })
        .expect("Could not spawn holding thread");
}
/// Emits a `Trace` signal for every processed action and, when the
/// consistency model produces one, a `Consistency` signal as well.
/// A closed signal channel is logged as a warning, never an error.
pub(crate) fn emit_signals(&mut self, context: &Context, action_wrapper: &ActionWrapper) {
    if let Some(tx) = context.signal_tx() {
        // One shared handler for a closed channel — previously this warn
        // block was duplicated verbatim for both sends.
        let warn_closed = |e| {
            log_warn!(
                context,
                "reduce: Signal channel is closed! No signals can be sent ({:?}).",
                e
            );
        };
        let trace_signal = Signal::Trace(action_wrapper.clone());
        tx.send(trace_signal).unwrap_or_else(warn_closed);
        if let Some(signal) = self
            .consistency_model
            .process_action(action_wrapper.action())
        {
            tx.send(Signal::Consistency(signal.into()))
                .unwrap_or_else(warn_closed);
        }
    }
}
/// Creates a fresh instance with an empty state. Channels and background
/// loops stay unset until one of the `initialize*` methods runs.
pub fn new(context: Arc<Context>) -> Self {
    Instance {
        state: Arc::new(RwLock::new(StateWrapper::new(context.clone()))),
        action_channel: None,
        observer_channel: None,
        waker_channel: None,
        scheduler_handle: None,
        persister: None,
        consistency_model: ConsistencyModel::new(context),
        kill_switch: None,
        kill_switch_holding: None,
    }
}
/// Creates an instance seeded from a previously persisted `State`
/// (e.g. when restarting a conductor). Otherwise identical to `new`.
pub fn from_state(state: State, context: Arc<Context>) -> Self {
    Instance {
        state: Arc::new(RwLock::new(StateWrapper::from(state))),
        action_channel: None,
        observer_channel: None,
        waker_channel: None,
        scheduler_handle: None,
        persister: None,
        consistency_model: ConsistencyModel::new(context),
        kill_switch: None,
        kill_switch_holding: None,
    }
}
/// Returns a snapshot (clone) of the current state.
/// Blocks on the state read lock; panics only if the lock is poisoned.
pub fn state(&self) -> StateWrapper {
    self.state
        .read()
        .expect("owners of the state RwLock shouldn't panic")
        .clone()
}
/// Persists the given state via the configured persister.
///
/// # Errors
/// Fails if no persister has been set (i.e. `inner_setup` never ran),
/// if the persister lock cannot be taken, or if persisting itself fails.
pub fn save(&self, state: &StateWrapper) -> HcResult<()> {
    self.persister
        .as_ref()
        .ok_or_else(|| HolochainError::new("Instance::save() called without persister set."))?
        .try_write()
        .ok_or_else(|| HolochainError::new("Could not get lock on persister"))?
        // `state` is already a reference; the original `&state` passed a
        // needless double reference that auto-deref collapsed anyway.
        .save(state)
}
/// Dispatches a network shutdown action and awaits its completion.
/// Panics if the action channel has not been initialized.
#[allow(clippy::needless_lifetimes)]
#[no_autotrace]
pub async fn shutdown_network(&self) -> HcResult<()> {
    network::actions::shutdown::shutdown(
        self.state.clone(),
        self.action_channel.as_ref().unwrap(),
    )
    .await
}
}
impl Drop for Instance {
    fn drop(&mut self) {
        // NOTE(review): `shutdown_network()` is async — the returned future
        // is dropped here without ever being polled, so the network shutdown
        // does NOT actually run from `drop`. Fixing this needs an executor
        // (or a synchronous shutdown path); flagged rather than changed.
        let _ = self.shutdown_network();
        self.stop_action_loop();
        self.state.write().unwrap().drop_inner_state();
    }
}
/// Sends an `ActionWrapper` over the given action channel, wrapping it in
/// the current tracing span. Delivery is best-effort via `lax_send_wrapped`
/// — presumably failures are swallowed/logged there rather than propagated;
/// verify against its implementation.
pub fn dispatch_action(action_channel: &ActionSender, action_wrapper: ActionWrapper) {
    lax_send_wrapped(action_channel.clone(), action_wrapper, "dispatch_action");
}
#[cfg(test)]
pub mod tests {
use self::tempfile::tempdir;
use super::*;
use crate::{
action::{tests::test_action_wrapper_commit, Action, ActionWrapper},
agent::{
chain_store::ChainStore,
state::{AgentActionResponse, AgentState},
},
context::{test_memory_network_config, Context},
logger::{test_logger, TestLogger},
};
use holochain_core_types::{
agent::AgentId,
chain_header::test_chain_header,
dna::{zome::Zome, Dna},
entry::{entry_type::EntryType, test_entry},
};
use holochain_locksmith::{Mutex, RwLock};
use holochain_persistence_api::cas::content::AddressableContent;
use holochain_persistence_file::{cas::file::FilesystemStorage, eav::file::EavFileStorage};
use tempfile;
use test_utils;
use crate::persister::SimplePersister;
use std::{sync::Arc, thread::sleep, time::Duration};
use test_utils::mock_signing::registered_test_agent;
use crate::nucleus::state::NucleusStatus;
use holochain_core_types::entry::Entry;
use holochain_json_api::json::JsonString;
use holochain_persistence_lmdb::{cas::lmdb::LmdbStorage, eav::lmdb::EavLmdbStorage};
use holochain_persistence_mem::{cas::memory::MemoryStorage, eav::memory::EavMemoryStorage};
/// Default test context + logger, backed by the in-memory network and
/// in-memory storage (delegates to the `_with_in_memory_network` variant).
#[cfg_attr(tarpaulin, skip)]
pub fn test_context_and_logger(
    agent_name: &str,
    network_name: Option<&str>,
) -> (Arc<Context>, Arc<Mutex<TestLogger>>) {
    test_context_and_logger_with_in_memory_network(agent_name, network_name)
}
/// Builds a test context with in-memory CAS/EAV storage, a simple persister
/// sharing the CAS, a mock in-memory network, and a capturing test logger.
#[cfg_attr(tarpaulin, skip)]
pub fn test_context_and_logger_with_in_memory_network(
    agent_name: &str,
    network_name: Option<&str>,
) -> (Arc<Context>, Arc<Mutex<TestLogger>>) {
    let agent = registered_test_agent(agent_name);
    let content_storage = Arc::new(RwLock::new(MemoryStorage::new()));
    let meta_storage = Arc::new(RwLock::new(EavMemoryStorage::new()));
    let logger = test_logger();
    (
        Arc::new(Context::new(
            "Test-context-and-logger-instance",
            agent,
            Arc::new(RwLock::new(SimplePersister::new(content_storage.clone()))),
            content_storage.clone(),
            // Same storage serves as both chain and DHT CAS in tests.
            content_storage.clone(),
            meta_storage,
            test_memory_network_config(network_name),
            None,
            None,
            false,
            holochain_metrics::config::MetricPublisherConfig::default()
                .create_metric_publisher(),
            Arc::new(ht::null_tracer()),
        )),
        logger,
    )
}
/// Like `test_context_and_logger` but discards the logger handle.
#[cfg_attr(tarpaulin, skip)]
pub fn test_context(agent_name: &str, network_name: Option<&str>) -> Arc<Context> {
    let (context, _) = test_context_and_logger(agent_name, network_name);
    context
}
/// In-memory-network test context without the logger handle.
#[cfg_attr(tarpaulin, skip)]
pub fn test_context_with_memory_network(
    agent_name: &str,
    network_name: Option<&str>,
) -> Arc<Context> {
    let (context, _) = test_context_and_logger_with_in_memory_network(agent_name, network_name);
    context
}
/// Builds a test context wired to externally supplied action/observer
/// channels, with filesystem-backed CAS/EAV storage in temp directories.
#[cfg_attr(tarpaulin, skip)]
pub fn test_context_with_channels(
    agent_name: &str,
    action_channel: &ActionSender,
    observer_channel: &Sender<Observer>,
    network_name: Option<&str>,
) -> Arc<Context> {
    let agent = AgentId::generate_fake(agent_name);
    let file_storage = Arc::new(RwLock::new(
        FilesystemStorage::new(tempdir().unwrap().path().to_str().unwrap()).unwrap(),
    ));
    Arc::new(
        Context::new_with_channels(
            "Test-context-with-channels-instance",
            agent,
            Arc::new(RwLock::new(SimplePersister::new(file_storage.clone()))),
            Some(action_channel.clone()),
            None,
            Some(observer_channel.clone()),
            file_storage.clone(),
            Arc::new(RwLock::new(
                EavFileStorage::new(tempdir().unwrap().path().to_str().unwrap().to_string())
                    .unwrap(),
            )),
            test_memory_network_config(network_name),
            false,
            Arc::new(RwLock::new(
                holochain_metrics::DefaultMetricPublisher::default(),
            )),
            Arc::new(ht::null_tracer()),
        )
        .unwrap(),
    )
}
/// Test context that already carries a fresh global `StateWrapper`
/// (filesystem storage, registered "Florence" agent).
#[cfg_attr(tarpaulin, skip)]
pub fn test_context_with_state(network_name: Option<&str>) -> Arc<Context> {
    let file_storage = Arc::new(RwLock::new(
        FilesystemStorage::new(tempdir().unwrap().path().to_str().unwrap()).unwrap(),
    ));
    let mut context = Context::new(
        "test-context-with-state-instance",
        registered_test_agent("Florence"),
        Arc::new(RwLock::new(SimplePersister::new(file_storage.clone()))),
        file_storage.clone(),
        file_storage.clone(),
        Arc::new(RwLock::new(
            EavFileStorage::new(tempdir().unwrap().path().to_str().unwrap().to_string())
                .unwrap(),
        )),
        test_memory_network_config(network_name),
        None,
        None,
        false,
        Arc::new(RwLock::new(
            holochain_metrics::DefaultMetricPublisher::default(),
        )),
        Arc::new(ht::null_tracer()),
    );
    // Attach a state built from a clone of the context itself.
    let global_state = Arc::new(RwLock::new(StateWrapper::new(Arc::new(context.clone()))));
    context.set_state(global_state.clone());
    Arc::new(context)
}
/// Test context whose state carries a pre-seeded agent state with an
/// existing top chain header (useful for tests that need a non-empty chain).
#[cfg_attr(tarpaulin, skip)]
pub fn test_context_with_agent_state(network_name: Option<&str>) -> Arc<Context> {
    let file_system =
        FilesystemStorage::new(tempdir().unwrap().path().to_str().unwrap()).unwrap();
    let cas = Arc::new(RwLock::new(file_system.clone()));
    let mut context = Context::new(
        "test-context-with-agent-state-instance",
        registered_test_agent("Florence"),
        Arc::new(RwLock::new(SimplePersister::new(cas.clone()))),
        cas.clone(),
        cas.clone(),
        Arc::new(RwLock::new(
            EavFileStorage::new(tempdir().unwrap().path().to_str().unwrap().to_string())
                .unwrap(),
        )),
        test_memory_network_config(network_name),
        None,
        None,
        false,
        Arc::new(RwLock::new(
            holochain_metrics::DefaultMetricPublisher::default(),
        )),
        Arc::new(ht::null_tracer()),
    );
    let chain_store = ChainStore::new(cas.clone());
    let chain_header = test_chain_header();
    let agent_state = AgentState::new_with_top_chain_header(
        chain_store,
        Some(chain_header),
        context.agent_id.address(),
    );
    let state = StateWrapper::new_with_agent(Arc::new(context.clone()), agent_state);
    let global_state = Arc::new(RwLock::new(state));
    context.set_state(global_state.clone());
    Arc::new(context)
}
/// Builds and initializes a test instance for the given DNA, discarding
/// the context.
#[cfg_attr(tarpaulin, skip)]
pub fn test_instance(dna: Dna, network_name: Option<&str>) -> Result<Instance, String> {
    test_instance_and_context(dna, network_name).map(|tuple| tuple.0)
}
/// Builds and initializes a test instance under the default agent "jane".
#[cfg_attr(tarpaulin, skip)]
pub fn test_instance_and_context(
    dna: Dna,
    network_name: Option<&str>,
) -> Result<(Instance, Arc<Context>), String> {
    test_instance_and_context_by_name(dna, "jane", network_name)
}
/// Builds and initializes a test instance for a named agent on the
/// in-memory mock network.
#[cfg_attr(tarpaulin, skip)]
pub fn test_instance_and_context_by_name(
    dna: Dna,
    name: &str,
    network_name: Option<&str>,
) -> Result<(Instance, Arc<Context>), String> {
    test_instance_and_context_with_memory_network_nodes(dna, name, network_name)
}
/// Creates an instance, runs full initialization for `dna`, waits for the
/// nucleus to reach `Initialized`, and sanity-checks the genesis chain.
///
/// # Errors
/// Returns `Err` if initialization fails or if the nucleus does not reach
/// `Initialized` within 60 seconds (previously this waited forever).
#[cfg_attr(tarpaulin, skip)]
pub fn test_instance_and_context_with_memory_network_nodes(
    dna: Dna,
    name: &str,
    network_name: Option<&str>,
) -> Result<(Instance, Arc<Context>), String> {
    let context = test_context_with_memory_network(name, network_name);
    let mut instance = Instance::new(context.clone());
    let context = instance.initialize(Some(dna.clone()), context.clone())?;
    assert_eq!(instance.state().nucleus().dna(), Some(dna.clone()));
    assert!(instance.state().nucleus().has_initialized());
    assert!(
        !dna.zomes.clone().is_empty(),
        "Empty zomes = No init = infinite loops below!"
    );
    // Wait for init, but bail out with an error instead of spinning forever
    // if the instance never reaches Initialized.
    let deadline = std::time::Instant::now() + Duration::from_secs(60);
    loop {
        if let NucleusStatus::Initialized(_) = instance.state().nucleus().status {
            break;
        }
        if std::time::Instant::now() > deadline {
            return Err("Timed out waiting for initialized status".to_string());
        }
        println!("Waiting for initialized status");
        sleep(Duration::from_millis(10))
    }
    // Genesis must have committed both the DNA and the agent entry.
    assert!(instance
        .state()
        .agent()
        .iter_chain()
        .any(|header| *header.entry_type() == EntryType::Dna));
    assert!(instance
        .state()
        .agent()
        .iter_chain()
        .any(|header| *header.entry_type() == EntryType::AgentId));
    Ok((instance, context))
}
/// A minimal initialized instance: one empty zome (so init terminates)
/// and a fixed UUID for deterministic addresses.
#[cfg_attr(tarpaulin, skip)]
pub fn test_instance_blank() -> Instance {
    let mut dna = Dna::new();
    dna.zomes.insert("".to_string(), Zome::empty());
    dna.uuid = "2297b5bc-ef75-4702-8e15-66e0545f3482".into();
    test_instance(dna, None).expect("Blank instance could not be initialized!")
}
/// `process_action` must reduce a commit into the agent state without
/// dispatching any further actions or observer notifications itself.
#[test]
pub fn can_process_action() {
    let netname = Some("can_process_action");
    let mut instance = Instance::new(test_context("jason", netname));
    let context = instance.initialize_context(test_context("jane", netname));
    // Fresh channels so we can assert nothing was sent as a side effect.
    let (rx_action, rx_observer, _) = instance.initialize_channels();
    let action_wrapper = test_action_wrapper_commit();
    instance
        .process_action(&action_wrapper, &context)
        .expect("process_action should run without error");
    // `matches!` replaces the verbose match-to-bool blocks.
    let rx_action_is_empty = matches!(
        rx_action.try_recv(),
        Err(crossbeam_channel::TryRecvError::Empty)
    );
    assert!(rx_action_is_empty);
    let rx_observer_is_empty = matches!(
        rx_observer.try_recv(),
        Err(crossbeam_channel::TryRecvError::Empty)
    );
    assert!(rx_observer_is_empty);
    let state = instance.state();
    let actions = state.agent().actions();
    let response = actions
        .get(&action_wrapper)
        .expect("action and response should be added after Get action dispatch");
    assert_eq!(
        response.response(),
        &AgentActionResponse::Commit(Ok(test_entry().address()))
    );
}
/// A DNA with no init callback must still initialize successfully
/// (init is optional).
#[test]
fn test_missing_init() {
    let dna = test_utils::create_test_dna_with_wat("test_zome", None);
    let instance = test_instance(dna, None);
    assert!(instance.is_ok());
    let instance = instance.unwrap();
    assert!(instance.state().nucleus().has_initialized());
}
/// An init callback returning 0 (success encoding) must let the instance
/// reach the initialized state.
#[test]
fn test_init_ok() {
    let dna = test_utils::create_test_dna_with_wat(
        "test_zome",
        Some(
            r#"
            (module
                (memory (;0;) 1)
                (func (export "init") (param $p0 i64) (result i64)
                    i64.const 0
                )
                (data (i32.const 0)
                    ""
                )
                (export "memory" (memory 0))
            )
        "#,
        ),
    );
    let maybe_instance = test_instance(dna, Some("test_init_ok"));
    assert!(maybe_instance.is_ok());
    let instance = maybe_instance.unwrap();
    assert!(instance.state().nucleus().has_initialized());
}
/// An init callback returning a non-zero value (an error pointing at the
/// "1337.0" payload) must make initialization fail with the zome error.
#[test]
fn test_init_err() {
    let dna = test_utils::create_test_dna_with_wat(
        "test_zome",
        Some(
            r#"
            (module
                (memory (;0;) 1)
                (func (export "init") (param $p0 i64) (result i64)
                    i64.const 9
                )
                (data (i32.const 0)
                    "1337.0"
                )
                (export "memory" (memory 0))
            )
        "#,
        ),
    );
    let instance = test_instance(dna, None);
    assert!(instance.is_err());
    assert_eq!(
        instance.err().unwrap(),
        String::from(
            "At least one zome init returned error: [(\"test_zome\", \"\\\"Init\\\"\")]"
        )
    );
}
/// Committing a DNA entry via `process_action` must append a header with
/// the entry's address to the agent chain.
#[test]
fn can_commit_dna() {
    let netname = Some("can_commit_dna");
    let context = test_context("alex", netname);
    let dna = test_utils::create_test_dna_with_wat("test_zome", None);
    let dna_entry = Entry::Dna(Box::new(dna));
    let commit_action = ht::test_wrap(ActionWrapper::new(Action::Commit((
        dna_entry.clone(),
        None,
        vec![],
    ))));
    let instance = Instance::new(test_context("jason", netname));
    let context = instance.initialize_context(context);
    instance
        .process_action(&commit_action, &context)
        .expect("process_action should run without error");
    assert!(instance
        .state()
        .agent()
        .iter_chain()
        .any(|header| *header.entry_address() == dna_entry.address()))
}
/// Committing an AgentId entry via `process_action` must append a header
/// with the entry's address to the agent chain.
#[test]
fn can_commit_agent() {
    let netname = Some("can_commit_agent");
    let context = test_context("alex", netname);
    let agent_entry = Entry::AgentId(context.agent_id.clone());
    let commit_agent_action = ht::test_wrap(ActionWrapper::new(Action::Commit((
        agent_entry.clone(),
        None,
        vec![],
    ))));
    let instance = Instance::new(context.clone());
    let context = instance.initialize_context(context);
    instance
        .process_action(&commit_agent_action, &context)
        .expect("process_action should run without error");
    assert!(instance
        .state()
        .agent()
        .iter_chain()
        .any(|header| *header.entry_address() == agent_entry.address()))
}
/// Test context backed by LMDB storage in temp dirs; `cas_initial_mmap`
/// lets tests force a small initial map size to exercise growth.
fn test_context_lmdb(
    agent_name: &str,
    network_name: Option<&str>,
    cas_initial_mmap: Option<usize>,
) -> (Arc<Context>, Arc<Mutex<TestLogger>>) {
    let agent = registered_test_agent(agent_name);
    let cas_dir = tempdir().expect("Could not create a tempdir for CAS testing");
    let eav_dir = tempdir().expect("Could not create a tempdir for CAS testing");
    let content_storage = Arc::new(RwLock::new(LmdbStorage::new(
        cas_dir.path(),
        cas_initial_mmap,
    )));
    let meta_storage = Arc::new(RwLock::new(EavLmdbStorage::new(eav_dir.path(), None)));
    let logger = test_logger();
    (
        Arc::new(Context::new(
            "Test-context-lmdb",
            agent,
            Arc::new(RwLock::new(SimplePersister::new(content_storage.clone()))),
            content_storage.clone(),
            content_storage.clone(),
            meta_storage,
            test_memory_network_config(network_name),
            None,
            None,
            false,
            Arc::new(RwLock::new(
                holochain_metrics::DefaultMetricPublisher::default(),
            )),
            Arc::new(ht::null_tracer()),
        )),
        logger,
    )
}
/// Commits an entry roughly 3x larger than the initial LMDB mmap to
/// exercise the map-growth path, then checks it landed in DHT storage.
#[test]
fn lmdb_large_entry_test() {
    let megabytes = 1024 * 1024;
    // Was `1 * megabytes` (identity multiplication).
    let initial_mmap_size = megabytes;
    let (context, _) =
        test_context_lmdb("alice", Some("lmdb_stress_test"), Some(initial_mmap_size));
    let instance = Instance::new(context.clone());
    let context = instance.initialize_context(context);
    // Renamed from `test_entry` to avoid shadowing the imported fixture.
    fn make_entry(i: u32, reps: usize) -> Entry {
        let data: String = std::iter::repeat(format!("{}", i)).take(reps).collect();
        Entry::App("test-entry".into(), JsonString::from_json(&data))
    }
    let entry = make_entry(0, 3 * initial_mmap_size);
    let commit_agent_action = ht::test_wrap(ActionWrapper::new(Action::Commit((
        entry.clone(),
        None,
        vec![],
    ))));
    instance
        .process_action(&commit_agent_action, &context)
        .unwrap();
    let dht = context.dht_storage.read().unwrap();
    assert!(dht.contains(&entry.address()).unwrap());
}
}