use std::sync::Arc;
use std::time::Duration;
use net_sdk::dataforts::MeshBlobAdapter;
use net_sdk::deck::{AdminVerifier, DeckClient, OperatorIdentity, OperatorRegistry};
use net_sdk::meshos::{
EntityKeypair, LogLevel, LogLine, MeshOsDaemonHandle, MeshOsEvent, MeshOsHandle, NodeId,
};
use net_sdk::testing::ClusterHarness;
use tokio::task::JoinHandle;
use super::cluster::{build_cluster, DEMO_NODE_COUNT};
use super::daemons::{
InferenceWorkerDaemon, NodeAgentDaemon, RolloutForgeDaemon, TrainerCanaryDaemon,
};
use super::dataforts::build_adapters;
use super::migrator;
use super::rpc_chatter;
use crate::streams::NrpcTail;
/// Nominal delay between consecutive heartbeat log lines emitted by each node's
/// heartbeat loop. `run_heartbeat_loop` perturbs it every tick by a deterministic
/// jitter in the range -150..=+149 ms.
const HEARTBEAT_BASE_INTERVAL: Duration = Duration::from_millis(800);
/// Everything `spawn` booted for the deck demo.
///
/// Most fields exist only to keep resources owned for the lifetime of the harness.
/// Fields are dropped in declaration order, so their order here is significant.
/// Note that the `JoinHandle` fields do not keep their tasks alive: dropping a tokio
/// `JoinHandle` detaches the task; it does not cancel it.
pub struct Harness {
    // NOTE(review): `cluster` is read by `into_shutdown`; the allow presumably covers
    // builds where that method is otherwise unused — confirm before removing.
    #[allow(dead_code)]
    cluster: Option<ClusterHarness>,
    // Handles for the `NodeAgentDaemon` registered on each cluster node.
    _heartbeat_handles: Vec<MeshOsDaemonHandle>,
    // Handles for the inference / rollout / trainer daemons registered on node[0].
    _group_handles: Vec<MeshOsDaemonHandle>,
    // One heartbeat log loop per node.
    _heartbeat_tasks: Vec<JoinHandle<()>>,
    // Loop returned by `migrator::spawn_loop`.
    _migration_task: JoinHandle<()>,
    // Responders returned by `rpc_chatter::install_responders`.
    _rpc_responder_handles: Vec<net_sdk::mesh_rpc::ServeHandle>,
    // Loops returned by `rpc_chatter::spawn_requester_loops`.
    _rpc_requester_tasks: Vec<JoinHandle<()>>,
    // Deck client built from node[0]'s runtime.
    deck: Arc<DeckClient>,
    // Adapters produced by `build_adapters(DEMO_NODE_COUNT)`.
    blob_adapters: Vec<Arc<MeshBlobAdapter>>,
    // `NodeId` of node[0].
    this_node: NodeId,
}
impl Harness {
    /// Returns a shared handle to the deck client built from node[0]'s runtime.
    pub fn deck(&self) -> Arc<DeckClient> {
        Arc::clone(&self.deck)
    }

    /// Returns the blob adapters wired for the demo nodes (cloned `Arc`s; the
    /// harness keeps its own references).
    pub fn blob_adapters(&self) -> Vec<Arc<MeshBlobAdapter>> {
        self.blob_adapters.clone()
    }

    /// The `NodeId` of node[0], the node the deck client is attached to.
    pub fn this_node(&self) -> NodeId {
        self.this_node
    }

    /// Tears the demo down: cancels the background loops, then shuts down the cluster.
    ///
    /// Dropping a tokio `JoinHandle` only detaches its task, so without this the
    /// heartbeat, migration and RPC-requester loops would keep running after the
    /// cluster they talk to is gone. They are aborted and awaited first, so none of
    /// them races against cluster teardown.
    ///
    /// # Errors
    /// Returns an error if the cluster shutdown fails.
    pub async fn into_shutdown(mut self) -> color_eyre::Result<()> {
        let mut tasks = std::mem::take(&mut self._heartbeat_tasks);
        tasks.append(&mut self._rpc_requester_tasks);

        // Request cancellation of everything first, then wait, so that all loops
        // wind down concurrently rather than one at a time.
        for task in &tasks {
            task.abort();
        }
        self._migration_task.abort();

        for task in tasks {
            // An aborted (or panicked) task resolves to a `JoinError`; that is the
            // expected outcome here, so the result is intentionally ignored.
            let _ = task.await;
        }
        let _ = (&mut self._migration_task).await;

        if let Some(cluster) = self.cluster.take() {
            cluster
                .shutdown()
                .await
                .map_err(|e| color_eyre::eyre::eyre!("cluster shutdown: {e}"))?;
        }
        Ok(())
    }
}
/// Boots the demo cluster and wires every background actor the deck UI observes.
///
/// In order, this:
/// 1. registers a fresh operator keypair in an `OperatorRegistry`, wraps it in an
///    `AdminVerifier`, and boots the cluster via `build_cluster`;
/// 2. on every node, registers a `NodeAgentDaemon` and spawns a heartbeat loop that
///    publishes synthetic `LogLine`s through that node's runtime;
/// 3. on node[0], registers `GROUP_REPLICAS` instances each of the
///    inference-worker, rollout-forge and trainer-canary daemons;
/// 4. builds the `DeckClient` on node[0], the blob adapters, the migration loop,
///    and the RPC observers / responders / requester loops.
///
/// # Errors
/// Fails if the cluster does not boot, a node has no SDK, any daemon registration
/// fails, or the migrator factories / RPC responders cannot be installed.
pub async fn spawn(nrpc_tail: NrpcTail) -> color_eyre::Result<Harness> {
    /// Replicas registered per daemon group (inference / rollout / trainer) on node[0].
    const GROUP_REPLICAS: usize = 3;

    eprintln!(
        "[deck demo] booting {}-node cluster on 127.0.0.1:<ephemeral>",
        DEMO_NODE_COUNT
    );

    let operator_keypair = EntityKeypair::generate();
    let mut registry = OperatorRegistry::new();
    registry.register(&operator_keypair);
    // NOTE(review): the `1` is presumably an approval threshold — confirm in net_sdk::deck.
    let verifier = Arc::new(AdminVerifier::new(Arc::new(registry), 1));
    let cluster = build_cluster(Arc::clone(&verifier))
        .await
        .map_err(|e| color_eyre::eyre::eyre!("cluster boot: {e}"))?;
    eprintln!(
        "[deck demo] cluster up — {} nodes peered + snapshot folds converged",
        cluster.len()
    );

    // One NodeAgentDaemon plus one heartbeat log loop per node.
    let mut heartbeat_handles: Vec<MeshOsDaemonHandle> = Vec::with_capacity(cluster.len());
    let mut heartbeat_tasks: Vec<JoinHandle<()>> = Vec::with_capacity(cluster.len());
    for (i, node) in cluster.nodes().iter().enumerate() {
        let sdk = node
            .sdk()
            .ok_or_else(|| color_eyre::eyre::eyre!("node[{i}] sdk missing"))?;
        let kp = EntityKeypair::generate();
        // Capture the id before `kp` is moved into the registration call.
        let daemon_id = kp.origin_hash();
        let handle = sdk
            .register_daemon(Box::new(NodeAgentDaemon), kp)
            .map_err(|e| color_eyre::eyre::eyre!("register NodeAgentDaemon on node[{i}]: {e}"))?;
        heartbeat_handles.push(handle);
        heartbeat_tasks.push(tokio::spawn(run_heartbeat_loop(
            i,
            node.node_id(),
            daemon_id,
            sdk.runtime().handle_clone(),
        )));
    }

    // All group daemons live on node[0], the node the deck client is attached to.
    let node0 = cluster.nth(0);
    let sdk0 = node0
        .sdk()
        .ok_or_else(|| color_eyre::eyre::eyre!("node[0] sdk missing"))?;
    let mut group_handles: Vec<MeshOsDaemonHandle> = Vec::with_capacity(3 * GROUP_REPLICAS);
    for replica_idx in 0..GROUP_REPLICAS {
        let kp = EntityKeypair::generate();
        let h = sdk0
            .register_daemon(Box::new(InferenceWorkerDaemon), kp)
            .map_err(|e| {
                color_eyre::eyre::eyre!("register InferenceWorkerDaemon[{replica_idx}]: {e}")
            })?;
        group_handles.push(h);
    }
    for fork_idx in 0..GROUP_REPLICAS {
        let kp = EntityKeypair::generate();
        let h = sdk0
            .register_daemon(Box::new(RolloutForgeDaemon), kp)
            .map_err(|e| color_eyre::eyre::eyre!("register RolloutForgeDaemon[{fork_idx}]: {e}"))?;
        group_handles.push(h);
    }
    for standby_idx in 0..GROUP_REPLICAS {
        let kp = EntityKeypair::generate();
        let h = sdk0
            .register_daemon(Box::new(TrainerCanaryDaemon), kp)
            .map_err(|e| {
                color_eyre::eyre::eyre!("register TrainerCanaryDaemon[{standby_idx}]: {e}")
            })?;
        group_handles.push(h);
    }

    let identity = OperatorIdentity::from_keypair(operator_keypair);
    let deck = Arc::new(DeckClient::from_runtime(sdk0.runtime(), identity));
    let this_node = node0.node_id();
    let blob_adapters = build_adapters(DEMO_NODE_COUNT).await;

    migrator::install_factories(&cluster)?;
    let migration_task = migrator::spawn_loop(&cluster);

    rpc_chatter::install_observers(&cluster, nrpc_tail);
    let rpc_responder_handles = rpc_chatter::install_responders(&cluster)?;
    let rpc_requester_tasks = rpc_chatter::spawn_requester_loops(&cluster);

    Ok(Harness {
        cluster: Some(cluster),
        _heartbeat_handles: heartbeat_handles,
        _group_handles: group_handles,
        _heartbeat_tasks: heartbeat_tasks,
        _migration_task: migration_task,
        _rpc_responder_handles: rpc_responder_handles,
        _rpc_requester_tasks: rpc_requester_tasks,
        deck,
        blob_adapters,
        this_node,
    })
}
/// Publishes one synthetic inference log line per tick until the runtime rejects a
/// publish.
///
/// Each tick sleeps `HEARTBEAT_BASE_INTERVAL` adjusted by a deterministic jitter of
/// -150..=+149 ms. It then picks a corpus template, fills it with pseudo-random
/// numbers, and publishes the result as a `MeshOsEvent::LogLine`.
///
/// * `node_index` — prefixes every line as `gpu-{node_index}` and offsets the
///   template rotation so nodes do not print the same template at the same tick.
/// * `node_id` — mixed into the jitter and the filler numbers.
/// * `daemon_id` — recorded as the `LogLine`'s `daemon_id`.
/// * `handle` — runtime handle the events are published through; the loop exits on
///   the first publish error.
async fn run_heartbeat_loop(
    node_index: usize,
    node_id: NodeId,
    daemon_id: u64,
    handle: MeshOsHandle,
) {
    let corpus = inference_corpus();
    let mut tick: u64 = 0;
    loop {
        // Offset in [0, 300): below 150 shortens the base interval, 150 and above
        // lengthens it — i.e. a jitter of -150..=+149 ms around the base.
        let offset_ms = (tick.wrapping_mul(11) ^ node_id) % 300;
        let interval = match offset_ms.checked_sub(150) {
            Some(extra_ms) => HEARTBEAT_BASE_INTERVAL + Duration::from_millis(extra_ms),
            None => {
                HEARTBEAT_BASE_INTERVAL.saturating_sub(Duration::from_millis(150 - offset_ms))
            }
        };
        tokio::time::sleep(interval).await;

        let slot = (tick as usize + node_index) % corpus.len();
        let filler_a = (tick.wrapping_mul(37) ^ node_id) % 9_999;
        let filler_b = (tick.wrapping_mul(53) ^ (node_id >> 8)) % 480 + 20;
        let body = fill_template(corpus[slot], filler_a, filler_b);

        let event = MeshOsEvent::LogLine(LogLine {
            level: LogLevel::Info,
            daemon_id: Some(daemon_id),
            message: format!("gpu-{node_index} :: {body}"),
        });
        if handle.publish(event).await.is_err() {
            break;
        }
        tick = tick.wrapping_add(1);
    }
}
/// Message templates the heartbeat loop rotates through.
///
/// Each `{}` is a positional placeholder that `fill_template` replaces; no template
/// carries more than two of them.
fn inference_corpus() -> &'static [&'static str] {
    static TEMPLATES: [&str; 14] = [
        "dispatched batch {} to gpu-0 in {}ms",
        "prefill batch {} completed — tokens/s {}",
        "cache hit rate {}% on prompt-id {}",
        "decode step {} :: ttft {}ms",
        "embedding shard {} flushed to redex ({}KB)",
        "rollout cohort {} — A/B split p99 delta {}ms",
        "tokenizer queue depth {} → 4 :: backpressure clear",
        "kv cache eviction batch {} freed {}MB",
        "scheduler tick — queue depth {}",
        "inference completed for trace {} in {}ms",
        "weights sync verified :: epoch {}",
        "trainer canary sync_through advanced to {}",
        "speculative draft accepted ratio {}% :: batch {}",
        "telemetry flush {} records",
    ];
    &TEMPLATES
}
/// Substitutes `a` then `b` for the first two `{}` placeholders in `tpl`.
///
/// Missing placeholders are simply skipped: a template with one `{}` uses only `a`,
/// and one with none is returned unchanged. In debug builds, a template with more
/// than two placeholders trips a `debug_assert!`.
fn fill_template(tpl: &str, a: u64, b: u64) -> String {
    let mut pieces = tpl.split("{}");
    let mut rendered = String::with_capacity(tpl.len() + 12);
    rendered.push_str(pieces.next().unwrap_or(""));
    for value in [a, b] {
        // `Split` is fused, so stopping early leaves later `next()` calls returning None.
        let Some(tail) = pieces.next() else { break };
        rendered.push_str(&value.to_string());
        rendered.push_str(tail);
    }
    debug_assert!(
        pieces.next().is_none(),
        "fill_template: corpus template has > 2 `{{}}` placeholders: {tpl:?}"
    );
    rendered
}
#[cfg(test)]
mod tests {
    use super::*;

    /// How long the demo runs before node[0]'s snapshot is inspected.
    const SETTLE_TIME: Duration = Duration::from_secs(3);

    /// End-to-end smoke test: boots the full demo, lets it run for `SETTLE_TIME`, and
    /// checks that peering, daemon registration, heartbeat logging, blob adapters and
    /// RPC observation are all visible. Expected counts are derived from
    /// `DEMO_NODE_COUNT` so the test tracks the cluster size.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn demo_boots_and_logs_appear() {
        let nrpc_tail = NrpcTail::new(1024);
        let harness = spawn(nrpc_tail.clone())
            .await
            .expect("demo spawn must succeed");
        tokio::time::sleep(SETTLE_TIME).await;

        let snap = harness.deck.status();

        // node[0] should see every other node as a remote peer.
        let expected_peers = DEMO_NODE_COUNT - 1;
        assert_eq!(
            snap.peers.len(),
            expected_peers,
            "node[0] snapshot must see {} remote peers",
            expected_peers
        );
        assert_eq!(
            snap.daemons.len(),
            10,
            "node[0] should show 1 heartbeat + 9 group daemons; got {}",
            snap.daemons.len()
        );
        let count_with_name =
            |name: &str| -> usize { snap.daemons.values().filter(|d| d.name == name).count() };
        assert_eq!(count_with_name("node_agent"), 1);
        assert_eq!(count_with_name("inference_worker#replica"), 3);
        assert_eq!(count_with_name("rollout_forge#fork@7"), 3);
        assert_eq!(count_with_name("trainer_canary#standby"), 3);
        assert!(
            snap.log_ring.len() >= 2,
            "log_ring should carry heartbeat lines (got {})",
            snap.log_ring.len()
        );
        assert_eq!(
            harness.blob_adapters.len(),
            DEMO_NODE_COUNT,
            "demo should wire {} blob adapters (one per node)",
            DEMO_NODE_COUNT
        );

        let nrpc_records = nrpc_tail.snapshot();
        assert!(
            !nrpc_records.is_empty(),
            "Phase 4: NrpcTail should carry observer-recorded calls within 3 s"
        );

        harness.into_shutdown().await.expect("demo shutdown clean");
    }
}