use std::time::Duration;
use bytes::Bytes;
use net_sdk::capabilities::CapabilityFilter;
use net_sdk::compute::{
CausalEvent, DaemonHostConfig, DaemonRuntime, MeshDaemon as ComputeMeshDaemon,
};
use net_sdk::identity::Identity;
use net_sdk::meshos::DaemonError as TraitDaemonError;
use net_sdk::meshos::NodeId;
use net_sdk::testing::ClusterHarness;
use net_sdk::ComputeDaemonError;
const MIGRATABLE_KIND: &str = "demo.migratable";
const MIGRATION_CYCLE_INTERVAL: Duration = Duration::from_secs(30);
const MIGRATION_COMPLETION_WAIT: Duration = Duration::from_secs(8);
struct MigratableDaemon;
impl ComputeMeshDaemon for MigratableDaemon {
fn name(&self) -> &str {
"demo_migratable"
}
fn requirements(&self) -> CapabilityFilter {
CapabilityFilter::default()
}
fn process(&mut self, _event: &CausalEvent) -> Result<Vec<Bytes>, TraitDaemonError> {
Ok(vec![])
}
fn is_stateful(&self) -> bool {
true
}
fn snapshot(&self) -> Option<Bytes> {
Some(Bytes::new())
}
fn restore(&mut self, _state: Bytes) -> Result<(), TraitDaemonError> {
Ok(())
}
}
pub fn install_factories(harness: &ClusterHarness) -> Result<(), color_eyre::Report> {
for (i, node) in harness.nodes().iter().enumerate() {
let rt = node
.daemon_runtime()
.ok_or_else(|| color_eyre::eyre::eyre!("node[{i}] daemon_runtime missing"))?;
rt.register_factory(MIGRATABLE_KIND, || Box::new(MigratableDaemon))
.map_err(|e| color_eyre::eyre::eyre!("register_factory on node[{i}]: {e:?}"))?;
}
Ok(())
}
pub fn spawn_loop(harness: &ClusterHarness) -> tokio::task::JoinHandle<()> {
let runtimes: Vec<DaemonRuntime> = harness
.nodes()
.iter()
.filter_map(|n| n.daemon_runtime().cloned())
.collect();
let node_ids: Vec<NodeId> = harness.nodes().iter().map(|n| n.node_id()).collect();
let total = node_ids.len();
tokio::spawn(async move {
run_loop(runtimes, node_ids, total).await;
})
}
async fn run_loop(runtimes: Vec<DaemonRuntime>, node_ids: Vec<NodeId>, total: usize) {
if total < 2 {
return;
}
let source_rt = runtimes[0].clone();
let source_node_id = node_ids[0];
let mut cycle = 0u64;
loop {
let target_idx = 1 + (cycle as usize % (total - 1));
let target_node_id = node_ids[target_idx];
let identity = Identity::generate();
let origin_hash = identity.keypair().origin_hash();
match source_rt
.spawn(MIGRATABLE_KIND, identity, DaemonHostConfig::default())
.await
{
Ok(_handle) => {
let migrate_result = source_rt
.start_migration(origin_hash, source_node_id, target_node_id)
.await;
match migrate_result {
Ok(_h) => {
tokio::time::sleep(MIGRATION_COMPLETION_WAIT).await;
}
Err(e @ ComputeDaemonError::Migration(_))
| Err(e @ ComputeDaemonError::MigrationFailed(_)) => {
eprintln!(
"[deck demo] migration cycle {cycle} \
node[0]->node[{target_idx}] failed: {e}"
);
}
Err(e) => {
eprintln!(
"[deck demo] migration cycle {cycle} unexpected error: \
{e:?}"
);
}
}
}
Err(e) => {
eprintln!("[deck demo] migration cycle {cycle} spawn failed: {e:?}");
}
}
tokio::time::sleep(MIGRATION_CYCLE_INTERVAL).await;
cycle = cycle.wrapping_add(1);
}
}