use super::*;
use crate::core::workflow::publish_dht_ops_workflow::publish_dht_ops_workflow;
use ::fixt::*;
use holo_hash::fixt::ActionHashFixturator;
use holo_hash::fixt::AgentPubKeyFixturator;
use holo_hash::fixt::EntryHashFixturator;
use holochain_conductor_api::conductor::ConductorTuningParams;
use holochain_state::mutations;
use holochain_state::prelude::StateMutationResult;
/// A listener must not complete while nothing has been triggered.
#[tokio::test]
async fn test_trigger_receiver_waits_for_sender() {
    // Bound to a name (rather than `_`) so the sender lives until the end of
    // the test instead of being dropped immediately.
    let (_sender, mut receiver) = TriggerSender::new();

    let listener = tokio::spawn(async move { receiver.listen().await.unwrap() });

    // Nothing is ever triggered, so joining the task is expected to time out.
    let joined = tokio::time::timeout(Duration::from_millis(10), listener).await;
    assert!(joined.is_err());
}
/// A single `trigger` call lets a spawned listener run to completion.
#[tokio::test]
async fn test_trigger_send() {
    let (sender, mut receiver) = TriggerSender::new();
    let listener = tokio::spawn(async move { receiver.listen().await.unwrap() });

    sender.trigger(&"");

    // The task must join without panicking, i.e. `listen` returned `Ok`.
    assert!(listener.await.is_ok());
}
/// Two triggers sent before the receiver starts listening must wake it only
/// once: the second `listen` call is expected to still be pending when the
/// timeout fires.
#[tokio::test]
async fn test_trigger_only_permits_single_trigger() {
    holochain_trace::test_run();
    let (sender, mut receiver) = TriggerSender::new();

    let listener = tokio::spawn(async move {
        // Give the test body time to send both triggers before listening.
        tokio::time::sleep(Duration::from_millis(50)).await;
        receiver.listen().await.unwrap();
        tokio::time::sleep(Duration::from_millis(10)).await;
        // This second wait should never be satisfied.
        receiver.listen().await.unwrap()
    });

    for _ in 0..2 {
        sender.trigger(&"");
    }

    let joined = tokio::time::timeout(Duration::from_millis(100), listener).await;
    assert!(joined.is_err());
}
/// With a 1..5 minute loop range the test expects successive wake-ups after
/// 1, 2, 4, 5 and 5 minutes, and a wake-up after 1 minute again once
/// `reset_back_off` has been called.
///
/// The clock is paused, so the waits are measured in virtual time.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_trigger_back_off() {
    let shortest = Duration::from_secs(60);
    let longest = Duration::from_secs(60 * 5);
    let tolerance = Duration::from_secs(1);
    let (sender, mut receiver) = TriggerSender::new_with_loop(shortest..longest, false);

    // Expected wait, in minutes, for each consecutive `listen` call.
    for minutes in [1_u64, 2, 4, 5, 5] {
        let expected = Duration::from_secs(60 * minutes);
        let started = tokio::time::Instant::now();
        receiver.listen().await.unwrap();
        let waited = started.elapsed();
        assert!(waited >= expected && waited < expected + tolerance);
    }

    // After a reset the next wait is back at the lower bound.
    sender.reset_back_off();
    let started = tokio::time::Instant::now();
    receiver.listen().await.unwrap();
    let waited = started.elapsed();
    assert!(waited >= shortest && waited < shortest + tolerance);
}
/// When both ends of the loop range are 60 seconds, every one of 100
/// consecutive wake-ups is expected to arrive after 60 seconds of (paused,
/// virtual) time.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_trigger_loop() {
    let interval = Duration::from_secs(60);
    // Half-open acceptance window: [60s, 61s).
    let window = interval..interval + Duration::from_secs(1);

    // The sender is kept bound for the whole test even though it is unused.
    let (_sender, mut receiver) = TriggerSender::new_with_loop(interval..interval, false);

    for _round in 0..100 {
        let started = tokio::time::Instant::now();
        receiver.listen().await.unwrap();
        let waited = started.elapsed();
        assert!(window.contains(&waited));
    }
}
/// With the final `new_with_loop` argument set to `true`, the test expects an
/// explicit `trigger` to wake the listener immediately and to put the
/// following loop wait back at the 60 second lower bound.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_reset_on_trigger() {
    let (sender, mut receiver) =
        TriggerSender::new_with_loop(Duration::from_secs(60)..Duration::from_secs(60 * 5), true);

    // The first two loop wake-ups arrive after 60 and then 120 seconds.
    for secs in [60_u64, 60 * 2] {
        let started = tokio::time::Instant::now();
        receiver.listen().await.unwrap();
        let waited = started.elapsed();
        assert!((Duration::from_secs(secs)..Duration::from_secs(secs + 1)).contains(&waited));
    }

    // A manual trigger is picked up without waiting for the loop.
    sender.trigger(&"");
    let started = tokio::time::Instant::now();
    receiver.listen().await.unwrap();
    let waited = started.elapsed();
    assert!(waited < Duration::from_secs(1));

    // The next loop wake-up is back at 60 seconds rather than 240.
    let started = tokio::time::Instant::now();
    receiver.listen().await.unwrap();
    let waited = started.elapsed();
    assert!((Duration::from_secs(60)..Duration::from_secs(61)).contains(&waited));
}
/// `pause_loop` must stop the periodic wake-ups (no wake-up within an hour of
/// virtual time) and `resume_loop` must bring back the 60 second cadence.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_pause_resume() {
    let tick = Duration::from_secs(60);
    // Half-open acceptance window: [60s, 61s).
    let tick_window = tick..tick + Duration::from_secs(1);
    let (sender, mut receiver) = TriggerSender::new_with_loop(tick..tick, false);

    // Baseline: ten regular wake-ups.
    for _round in 0..10 {
        let started = tokio::time::Instant::now();
        receiver.listen().await.unwrap();
        let waited = started.elapsed();
        assert!(tick_window.contains(&waited));
    }

    // While paused, listening for a full hour must time out.
    sender.pause_loop();
    let stalled = tokio::time::timeout(Duration::from_secs(60 * 60), receiver.listen()).await;
    assert!(stalled.is_err());

    // Once resumed, the next wake-up arrives one tick later.
    sender.resume_loop();
    let started = tokio::time::Instant::now();
    receiver.listen().await.unwrap();
    let waited = started.elapsed();
    assert!(tick_window.contains(&waited));
}
/// Wall-clock checks of how `trigger` and `resume_loop_now` interact with a
/// listener that is already waiting on a 60 ms loop.
///
/// Runs against the real clock, which is why it is ignored as flaky.
#[tokio::test]
#[ignore = "flaky due to dependence on timing"]
async fn test_concurrency() {
    let period = Duration::from_millis(60);
    // How long the listener task is given to start waiting.
    let head_start = Duration::from_millis(10);
    // Upper bound for a wake-up that counts as "immediate".
    let prompt = Duration::from_millis(20);

    // Scenario 1: an explicit trigger is expected to wake the waiting
    // listener well before the loop period has passed.
    let (sender, mut receiver) = TriggerSender::new_with_loop(period..period, false);
    let started = tokio::time::Instant::now();
    let listener = tokio::spawn(async move { receiver.listen().await.unwrap() });
    tokio::time::sleep(head_start).await;
    sender.trigger(&"");
    listener.await.unwrap();
    assert!(started.elapsed() < prompt);

    // Scenario 2: `resume_loop_now` on a loop that was never paused is
    // expected not to cut the wait short; the full period still elapses.
    let (sender, mut receiver) = TriggerSender::new_with_loop(period..period, false);
    let started = tokio::time::Instant::now();
    let listener = tokio::spawn(async move { receiver.listen().await.unwrap() });
    tokio::time::sleep(head_start).await;
    sender.resume_loop_now();
    listener.await.unwrap();
    assert!(started.elapsed() >= period);

    // Scenario 3: `resume_loop_now` on a paused loop is expected to wake the
    // listener promptly.
    let (sender, mut receiver) = TriggerSender::new_with_loop(period..period, false);
    sender.pause_loop();
    let started = tokio::time::Instant::now();
    let listener = tokio::spawn(async move { receiver.listen().await.unwrap() });
    tokio::time::sleep(head_start).await;
    sender.resume_loop_now();
    listener.await.unwrap();
    assert!(started.elapsed() < prompt);
}
/// Scenario test that alternates between waiting on a looping trigger and
/// running `publish_dht_ops_workflow`, checking after each run whether the
/// mocked network saw a publish call.
///
/// The tokio clock is paused, so the trigger waits are measured in virtual
/// time. The database state (last publish time, receipts-complete flag) is
/// edited between steps to steer whether the workflow should publish.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn publish_loop() {
// Authored test database in a temporary directory.
let kind = DbKindAuthored(Arc::new(fixt!(CellId)));
let tmpdir = tempfile::Builder::new()
.prefix("holochain-test-environments")
.tempdir()
.unwrap();
let db = DbWrite::test(tmpdir.path(), kind).expect("Couldn't create test database");
// Build a single `Create` action with fixture hashes and wrap it in a
// `RegisterAgentActivity` op; this is the only op the test stores.
let action = Action::Create(Create {
author: fixt!(AgentPubKey),
timestamp: Timestamp::now(),
action_seq: 5,
prev_action: fixt!(ActionHash),
entry_type: EntryType::App(AppEntryDef::new(
0.into(),
0.into(),
EntryVisibility::Public,
)),
entry_hash: fixt!(EntryHash),
weight: EntryRateWeight::default(),
});
let author = action.author().clone();
let signature = Signature(vec![3; SIGNATURE_BYTES].try_into().unwrap());
let op = ChainOp::RegisterAgentActivity(signature, action);
let op = DhtOpHashed::from_content_sync(op);
let op_hash = op.to_hash();
// Store the op as authored and mark it as integrated "now".
db.write_async({
let op = op.clone();
move |txn| -> StateMutationResult<()> {
mutations::insert_op_authored(txn, &op)?;
mutations::set_when_integrated(txn, &op.to_hash(), Timestamp::now())?;
Ok(())
}
})
.await
.unwrap();
// Mock network: every `publish` call pushes a unit value into a channel so
// the test can observe whether (and how often) the workflow published.
let mut dna_network = MockHolochainP2pDnaT::new();
let (tx, mut op_published) = tokio::sync::mpsc::channel(100);
dna_network
.expect_publish()
.returning(move |_, _, _, _, _| {
tx.try_send(()).unwrap();
Ok(())
});
let dna_network = Arc::new(dna_network);
// Looping trigger with a 1..5 minute range. The final `true` is the same flag
// `test_reset_on_trigger` passes — presumably "reset back-off on trigger";
// confirm against `new_with_loop`.
let (ts, mut trigger_recv) =
TriggerSender::new_with_loop(Duration::from_secs(60)..Duration::from_secs(60 * 5), true);
// Step 1: first loop wake-up after 60s; the workflow is expected to publish.
let timer = tokio::time::Instant::now();
trigger_recv.listen().await.unwrap();
assert!(
timer.elapsed() >= Duration::from_secs(60) && timer.elapsed() < Duration::from_secs(61)
);
publish_dht_ops_workflow(
db.clone(),
dna_network.clone(),
ts.clone(),
author.clone(),
ConductorTuningParams::default().min_publish_interval(),
)
.await
.unwrap();
op_published.recv().await.unwrap();
// Step 2: next wake-up after 120s; this run must NOT publish.
// NOTE(review): presumably because the op was published less than
// `min_publish_interval` ago — confirm against the workflow.
let timer = tokio::time::Instant::now();
trigger_recv.listen().await.unwrap();
assert!(
timer.elapsed() >= Duration::from_secs(60 * 2)
&& timer.elapsed() < Duration::from_secs(60 * 2 + 1)
);
publish_dht_ops_workflow(
db.clone(),
dna_network.clone(),
ts.clone(),
author.clone(),
ConductorTuningParams::default().min_publish_interval(),
)
.await
.unwrap();
assert_eq!(
op_published.try_recv(),
Err(tokio::sync::mpsc::error::TryRecvError::Empty)
);
// Step 3: a manual trigger wakes the listener immediately; the workflow still
// must not publish.
ts.trigger(&"");
let timer = tokio::time::Instant::now();
trigger_recv.listen().await.unwrap();
assert!(timer.elapsed() < Duration::from_secs(1));
publish_dht_ops_workflow(
db.clone(),
dna_network.clone(),
ts.clone(),
author.clone(),
ConductorTuningParams::default().min_publish_interval(),
)
.await
.unwrap();
assert_eq!(
op_published.try_recv(),
Err(tokio::sync::mpsc::error::TryRecvError::Empty)
);
// Wall-clock time since the Unix epoch minus the default minimum publish
// interval. The name suggests that interval is five minutes — TODO confirm.
let five_mins_ago = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.ok()
.and_then(|epoch| {
epoch.checked_sub(ConductorTuningParams::default().min_publish_interval())
})
.unwrap();
// Back-date the op's last publish time to that instant.
db.write_async({
let query_op_hash = op_hash.clone();
move |txn| -> StateMutationResult<()> {
mutations::set_last_publish_time(txn, &query_op_hash, five_mins_ago)
}
})
.await
.unwrap();
// Step 4: the loop wait is back at 60s (after the manual trigger in step 3),
// and with the back-dated publish time the workflow publishes again.
let timer = tokio::time::Instant::now();
trigger_recv.listen().await.unwrap();
assert!(
timer.elapsed() >= Duration::from_secs(60) && timer.elapsed() < Duration::from_secs(61)
);
publish_dht_ops_workflow(
db.clone(),
dna_network.clone(),
ts.clone(),
author.clone(),
ConductorTuningParams::default().min_publish_interval(),
)
.await
.unwrap();
op_published.recv().await.unwrap();
// Flag the op as having complete receipts.
db.write_async({
let query_op_hash = op_hash.clone();
move |txn| -> StateMutationResult<()> {
mutations::set_receipts_complete(txn, &query_op_hash, true)
}
})
.await
.unwrap();
// Step 5: wake-up after 120s; with receipts complete nothing is published.
let timer = tokio::time::Instant::now();
trigger_recv.listen().await.unwrap();
assert!(
timer.elapsed() >= Duration::from_secs(60 * 2)
&& timer.elapsed() < Duration::from_secs(60 * 2 + 1)
);
publish_dht_ops_workflow(
db.clone(),
dna_network.clone(),
ts.clone(),
author.clone(),
ConductorTuningParams::default().min_publish_interval(),
)
.await
.unwrap();
assert_eq!(
op_published.try_recv(),
Err(tokio::sync::mpsc::error::TryRecvError::Empty)
);
// Step 6: the loop is now expected to be silent — no wake-up within 100
// minutes of virtual time, and still no publish.
// NOTE(review): presumably the workflow paused the loop because nothing was
// left to publish — confirm against `publish_dht_ops_workflow`.
let r = tokio::time::timeout(Duration::from_secs(60 * 100), trigger_recv.listen()).await;
assert!(r.is_err());
assert_eq!(
op_published.try_recv(),
Err(tokio::sync::mpsc::error::TryRecvError::Empty)
);
// Make the op publishable again: back-date the last publish time and clear
// the receipts-complete flag.
db.write_async({
let query_op_hash = op_hash.clone();
move |txn| -> StateMutationResult<()> {
mutations::set_last_publish_time(txn, &query_op_hash, five_mins_ago)?;
mutations::set_receipts_complete(txn, &query_op_hash, false)?;
Ok(())
}
})
.await
.unwrap();
// Step 7: a manual trigger wakes the listener immediately and the workflow
// publishes.
ts.trigger(&"");
let timer = tokio::time::Instant::now();
trigger_recv.listen().await.unwrap();
assert!(timer.elapsed() < Duration::from_secs(1));
publish_dht_ops_workflow(
db.clone(),
dna_network.clone(),
ts.clone(),
author.clone(),
ConductorTuningParams::default().min_publish_interval(),
)
.await
.unwrap();
op_published.recv().await.unwrap();
// Step 8: the loop is running again with a 60s wait; the op was just
// published, so this run must not publish.
let timer = tokio::time::Instant::now();
trigger_recv.listen().await.unwrap();
assert!(
timer.elapsed() >= Duration::from_secs(60) && timer.elapsed() < Duration::from_secs(61)
);
publish_dht_ops_workflow(
db.clone(),
dna_network.clone(),
ts.clone(),
author.clone(),
ConductorTuningParams::default().min_publish_interval(),
)
.await
.unwrap();
assert_eq!(
op_published.try_recv(),
Err(tokio::sync::mpsc::error::TryRecvError::Empty)
);
}