use anyhow::{Context, Result};
use kwaai_hivemind_dht::{
codec::DHTRequest,
protocol::{NodeInfo, RequestAuthInfo, StoreRequest},
value::get_dht_time,
DHTStorage,
};
use kwaai_p2p::NetworkConfig;
use kwaai_p2p_daemon::{stream, P2PDaemon};
use libp2p::PeerId;
use sha1::{Digest, Sha1};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::{io::AsyncWriteExt, net::TcpListener, signal, sync::RwLock};
use tracing::{info, warn};
use crate::config::KwaaiNetConfig;
use crate::daemon::{DaemonManager, ShardManager};
use crate::identity::NodeIdentity;
type SharedStorage = Arc<RwLock<DHTStorage>>;
struct VpkInfo {
mode: String,
endpoint: String,
capacity_gb: f64,
tenant_count: u32,
vpk_version: String,
}
impl VpkInfo {
fn to_msgpack_value(&self) -> rmpv::Value {
rmpv::Value::Map(vec![
(
rmpv::Value::from("mode"),
rmpv::Value::from(self.mode.as_str()),
),
(
rmpv::Value::from("endpoint"),
rmpv::Value::from(self.endpoint.as_str()),
),
(
rmpv::Value::from("capacity_gb"),
rmpv::Value::from(self.capacity_gb),
),
(
rmpv::Value::from("tenant_count"),
rmpv::Value::from(i64::from(self.tenant_count)),
),
(
rmpv::Value::from("vpk_version"),
rmpv::Value::from(self.vpk_version.as_str()),
),
])
}
fn to_msgpack_bytes(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
rmpv::encode::write_value(&mut buf, &self.to_msgpack_value())?;
Ok(buf)
}
}
struct DHTServerInfo {
pub state: i32,
throughput: f64,
start_block: i32,
end_block: i32,
public_name: String,
version: String,
torch_dtype: String,
using_relay: bool,
cache_tokens_left: i64,
#[allow(dead_code)]
next_pings: HashMap<String, f64>,
#[allow(dead_code)]
adapters: Vec<String>,
trust_attestations: Vec<String>,
vpk_info: Option<VpkInfo>,
peer_id_b58: String,
}
impl DHTServerInfo {
#[allow(clippy::too_many_arguments)]
fn new(
start: i32,
end: i32,
name: &str,
relay: bool,
throughput: f64,
trust_attestations: Vec<String>,
vpk_info: Option<VpkInfo>,
peer_id_b58: String,
) -> Self {
Self {
state: 2, throughput,
start_block: start,
end_block: end,
public_name: name.to_string(),
version: concat!("kwaai-", env!("CARGO_PKG_VERSION")).to_string(),
torch_dtype: "float16".to_string(),
using_relay: relay,
cache_tokens_left: 100_000,
next_pings: HashMap::new(),
adapters: vec![],
trust_attestations,
vpk_info,
peer_id_b58,
}
}
fn to_msgpack(&self) -> Result<Vec<u8>> {
let mut fields: Vec<(rmpv::Value, rmpv::Value)> = vec![
(
rmpv::Value::from("start_block"),
rmpv::Value::from(self.start_block),
),
(
rmpv::Value::from("end_block"),
rmpv::Value::from(self.end_block),
),
(
rmpv::Value::from("public_name"),
rmpv::Value::from(self.public_name.as_str()),
),
(
rmpv::Value::from("version"),
rmpv::Value::from(self.version.as_str()),
),
(
rmpv::Value::from("torch_dtype"),
rmpv::Value::from(self.torch_dtype.as_str()),
),
(
rmpv::Value::from("using_relay"),
rmpv::Value::from(self.using_relay),
),
(
rmpv::Value::from("cache_tokens_left"),
rmpv::Value::from(self.cache_tokens_left),
),
(rmpv::Value::from("adapters"), rmpv::Value::Array(vec![])),
(rmpv::Value::from("next_pings"), rmpv::Value::Map(vec![])),
(
rmpv::Value::from("peer_id"),
rmpv::Value::from(self.peer_id_b58.as_str()),
),
];
if !self.trust_attestations.is_empty() {
let ta_values: Vec<rmpv::Value> = self
.trust_attestations
.iter()
.map(|s| rmpv::Value::String(rmpv::Utf8String::from(s.as_str())))
.collect();
fields.push((
rmpv::Value::from("trust_attestations"),
rmpv::Value::Array(ta_values),
));
}
if let Some(ref vpk) = self.vpk_info {
fields.push((rmpv::Value::from("vpk"), vpk.to_msgpack_value()));
}
let inner = rmpv::Value::Array(vec![
rmpv::Value::from(self.state),
rmpv::Value::from(self.throughput),
rmpv::Value::Map(fields),
]);
let mut inner_bytes = Vec::new();
rmpv::encode::write_value(&mut inner_bytes, &inner)?;
let ext = rmpv::Value::Ext(64, inner_bytes);
let mut out = Vec::new();
rmpv::encode::write_value(&mut out, &ext)?;
Ok(out)
}
}
struct ModelInfo {
num_blocks: i32,
repository: String,
}
impl ModelInfo {
fn to_msgpack(&self) -> Result<Vec<u8>> {
let map = vec![
(
rmpv::Value::from("repository"),
rmpv::Value::from(self.repository.as_str()),
),
(
rmpv::Value::from("num_blocks"),
rmpv::Value::from(self.num_blocks),
),
];
let mut buf = Vec::new();
rmpv::encode::write_value(&mut buf, &rmpv::Value::Map(map))?;
Ok(buf)
}
}
fn dht_id(raw_key: &str) -> Vec<u8> {
let packed = rmp_serde::to_vec(raw_key).expect("msgpack key");
Sha1::new().chain_update(&packed).finalize().to_vec()
}
pub async fn run_node(config: &KwaaiNetConfig) -> Result<()> {
let daemon_mgr = DaemonManager::new();
daemon_mgr
.write_pid(std::process::id())
.context("writing PID")?;
info!("KwaaiNet node starting (PID {})", std::process::id());
let node_identity = NodeIdentity::load_or_create().context("loading node identity")?;
let node_did = node_identity.did();
info!("Node DID: {}", node_did);
let trust_attestations = match kwaai_trust::CredentialStore::open_default() {
Ok(store) => {
let vcs = store.load_valid_for_subject(&node_did);
if vcs.is_empty() {
info!("Trust attestations: none (run `kwaainet identity import-vc` to add)");
} else {
info!("Trust attestations: {} valid VC(s)", vcs.len());
for vc in &vcs {
info!(
" [{}] issued by {}",
vc.kwaai_type().map(|t| t.as_str()).unwrap_or("Unknown"),
&vc.issuer_did()[..vc.issuer_did().len().min(32)]
);
}
}
vcs.iter()
.filter_map(|vc| vc.to_compact_json().ok())
.collect::<Vec<_>>()
}
Err(e) => {
warn!(
"Could not open credential store: {} — proceeding without VCs",
e
);
vec![]
}
};
let public_name = format!(
"{}/v{}",
config
.public_name
.clone()
.unwrap_or_else(|| "kwaainet-node".to_string()),
env!("CARGO_PKG_VERSION"),
);
info!(
model = %config.model,
blocks = config.blocks,
port = config.port,
name = %public_name,
"Configuring KwaaiNet node"
);
let net_cfg = NetworkConfig::with_petals_bootstrap();
let bootstrap_peers: Vec<String> = if config.initial_peers.is_empty() {
net_cfg.bootstrap_peers.clone()
} else {
config.initial_peers.clone()
};
info!("[1/5] Starting p2p daemon...");
let p2pd_path = find_p2pd_binary();
if p2pd_path.is_none() {
eprintln!(" ⚠️ p2pd not found — run `kwaainet setup --get-deps` to install it");
}
let host_addr = format!("/ip4/0.0.0.0/tcp/{}", config.port);
let announce_addr = config
.public_ip
.as_deref()
.map(|ip| format!("/ip4/{}/tcp/{}", ip, config.port));
let identity_key_path = NodeIdentity::key_file_path();
let builder = P2PDaemon::builder()
.dht(true)
.relay(!config.no_relay)
.auto_relay(true)
.auto_nat(true)
.nat_portmap(true)
.host_addrs([host_addr])
.bootstrap_peers(bootstrap_peers.clone())
.with_identity_key(&identity_key_path);
let builder = if let Some(ref addr) = announce_addr {
builder.announce_addrs([addr.as_str()])
} else {
builder
};
let builder = if let Some(ref path) = p2pd_path {
builder.with_binary_path(path)
} else {
builder
};
let builder = if let Ok(sock) = std::env::var("KWAAINET_SOCKET") {
#[cfg(unix)]
let addr = format!("/unix/{}", sock);
#[cfg(not(unix))]
let addr = sock;
builder.with_listen_addr(addr)
} else {
builder
};
let mut daemon = builder.spawn().await.context("starting p2pd")?;
let mut client = daemon.client().await.context("p2pd client")?;
let peer_id_hex = client.identify().await.context("identify peer")?;
let peer_id = PeerId::from_bytes(&hex::decode(&peer_id_hex)?).context("parse peer ID")?;
info!("Peer ID: {}", peer_id.to_base58());
info!("[2/5] Initialising DHT storage...");
let storage: SharedStorage = Arc::new(RwLock::new(DHTStorage::new(peer_id)));
info!("[3/5] Registering Hivemind RPC handlers...");
let handler_listener = TcpListener::bind("127.0.0.1:0")
.await
.context("binding RPC handler listener")?;
let handler_addr = handler_listener.local_addr()?;
client
.register_stream_handler(
&format!("/ip4/127.0.0.1/tcp/{}", handler_addr.port()),
vec![
"DHTProtocol.rpc_ping".to_string(),
"DHTProtocol.rpc_store".to_string(),
"DHTProtocol.rpc_find".to_string(),
],
)
.await
.context("registering stream handlers")?;
info!("RPC handlers ready on {}", handler_addr);
info!("[4/5] Bootstrapping...");
dial_and_wait_for_bootstrap(&mut client, &bootstrap_peers).await?;
info!("[5/5] Announcing to DHT...");
#[cfg(unix)]
let mut sighup = {
use tokio::signal::unix::{signal, SignalKind};
signal(SignalKind::hangup()).expect("SIGHUP handler")
};
let using_relay = config.public_ip.is_none() && !config.no_relay;
let dl_bps: f64 = if crate::throughput::load(&config.model).is_some() {
info!(" Measuring network bandwidth (1 MiB probe)...");
let bps = crate::throughput::measure_download_bps().await;
if bps > 0.0 {
info!(" Network: {:.1} Mbps download", bps / 1_000_000.0);
} else {
info!(" Network: measurement failed — using compute limit only");
}
bps
} else {
0.0
};
let throughput = compute_effective_tps(&config.model, dl_bps, using_relay);
if let Some(ref entry) = crate::throughput::load(&config.model) {
info!(
" Compute: {:.1} tok/s (measured, hidden_dim={})",
entry.compute_tps, entry.hidden_size
);
info!(
" Effective: {:.1} tok/s connection={} (min({:.1}, {:.1}×{}))",
throughput,
if using_relay { "relay" } else { "direct" },
entry.compute_tps,
if dl_bps > 0.0 {
dl_bps / (entry.hidden_size as f64 * 16.0)
} else {
f64::INFINITY
},
if using_relay { "0.2" } else { "1.0" },
);
} else {
info!(
" Throughput: {:.1} tok/s (default — run `kwaainet benchmark` to measure)",
throughput
);
}
let prefix = config.effective_dht_prefix();
let repository = config.model_repository.clone().unwrap_or_else(|| {
if config.model.contains('/') {
format!("https://huggingface.co/{}", config.model)
} else {
format!("https://huggingface.co/meta-llama/{}", config.model)
}
});
info!(" DHT prefix: {}", prefix);
info!(" Repository: {}", repository);
info!(" Using relay: {}", using_relay);
let vpk_info = if config.vpk_enabled {
let port = config.vpk_local_port.unwrap_or(7432);
info!("VPK enabled — checking local service on port {}", port);
match check_vpk_health(port).await {
Some(health) => {
let mode = config
.vpk_mode
.clone()
.unwrap_or_else(|| "both".to_string());
let endpoint = config
.vpk_endpoint
.clone()
.unwrap_or_else(|| format!("http://localhost:{}", port));
let capacity_gb = health["capacity_gb_available"].as_f64().unwrap_or(0.0);
let tenant_count = health["tenant_count"].as_u64().unwrap_or(0) as u32;
let vpk_version = health["version"].as_str().unwrap_or("unknown").to_string();
info!(
"VPK healthy: mode={} tenants={} capacity={:.1}GB v={}",
mode, tenant_count, capacity_gb, vpk_version
);
Some(VpkInfo {
mode,
endpoint,
capacity_gb,
tenant_count,
vpk_version,
})
}
None => {
warn!(
"VPK health check failed on port {} — skipping DHT advertisement",
port
);
None
}
}
} else {
None
};
let announce_start = config.start_block as i32;
let announce_end = config.effective_end_block() as i32;
let mut server_info = DHTServerInfo::new(
announce_start,
announce_end,
&public_name,
using_relay,
throughput,
trust_attestations,
vpk_info,
peer_id.to_base58(),
);
announce(
&mut client,
peer_id,
&storage,
&bootstrap_peers,
&prefix,
&repository,
config.model_total_blocks(),
announce_start,
announce_end,
&server_info,
)
.await
.context("initial DHT announcement")?;
info!("✅ KwaaiNet node running");
info!(" Peer ID : {}", peer_id.to_base58());
info!(" Name : {}", public_name);
info!(" Model : {}", config.model);
info!(
" Blocks : {}–{}",
config.start_block,
config.effective_end_block()
);
info!(" Map : https://map.kwaai.ai");
let mut config = config.clone();
let storage_clone = storage.clone();
let mut reannounce = tokio::time::interval(Duration::from_secs(120));
reannounce.tick().await;
loop {
tokio::select! {
result = handler_listener.accept() => {
match result {
Ok((mut stream, addr)) => {
info!("Incoming RPC from {}", addr);
let s = storage_clone.clone();
tokio::spawn(async move {
if let Err(e) = handle_rpc_stream(&mut stream, s).await {
warn!("RPC handler error: {}", e);
}
});
}
Err(e) => warn!("Accept error: {}", e),
}
}
_ = async {
#[cfg(unix)] { sighup.recv().await; }
#[cfg(not(unix))] { std::future::pending::<Option<()>>().await; }
} => {
info!("SIGHUP received — re-reading config and re-announcing");
if let Ok(fresh) = KwaaiNetConfig::load_or_create() {
if fresh.start_block != config.start_block || fresh.blocks != config.blocks {
info!(
"Block range updated: [{}–{}) → [{}–{})",
config.start_block, config.effective_end_block(),
fresh.start_block, fresh.start_block + fresh.blocks,
);
config.start_block = fresh.start_block;
config.blocks = fresh.blocks;
}
}
let sb = config.start_block as i32;
let eb = config.effective_end_block() as i32;
server_info.start_block = sb;
server_info.end_block = eb;
if let Err(e) = announce(
&mut client, peer_id, &storage, &bootstrap_peers,
&prefix, &repository, config.model_total_blocks(),
sb, eb, &server_info,
).await {
warn!("Re-announce after SIGHUP failed: {}", e);
}
}
_ = reannounce.tick() => {
if let Ok(fresh) = KwaaiNetConfig::load_or_create() {
if fresh.start_block != config.start_block || fresh.blocks != config.blocks {
info!(
"Block range updated: [{}–{}) → [{}–{})",
config.start_block, config.effective_end_block(),
fresh.start_block, fresh.start_block + fresh.blocks,
);
config.start_block = fresh.start_block;
config.blocks = fresh.blocks;
}
}
#[cfg(not(unix))]
{
let flag = crate::config::run_dir().join("reannounce.flag");
if flag.exists() {
let _ = std::fs::remove_file(&flag);
}
}
let fresh_tps = compute_effective_tps(&config.model, dl_bps, using_relay);
if (fresh_tps - server_info.throughput).abs() > 0.05 {
info!(
"Throughput updated: {:.1} → {:.1} tok/s",
server_info.throughput, fresh_tps
);
server_info.throughput = fresh_tps;
}
let sb = config.start_block as i32;
let eb = config.effective_end_block() as i32;
server_info.start_block = sb;
server_info.end_block = eb;
info!("Re-announcing to DHT (shard_ready={})...", ShardManager::shard_is_ready());
if let Err(e) = announce(
&mut client, peer_id, &storage, &bootstrap_peers,
&prefix, &repository, config.model_total_blocks(),
sb, eb, &server_info,
).await {
warn!("Re-announce failed: {}", e);
}
}
_ = shutdown_signal() => {
info!("Shutdown signal received");
break;
}
}
}
info!("Unannouncing from DHT...");
unannounce(&mut client, peer_id, &storage, &bootstrap_peers, &prefix, &server_info).await;
let _ = daemon.shutdown().await;
daemon_mgr.remove_pid();
info!("KwaaiNet node stopped");
Ok(())
}
async fn unannounce(
client: &mut kwaai_p2p_daemon::P2PClient,
peer_id: PeerId,
storage: &SharedStorage,
bootstrap_peers: &[String],
prefix: &str,
server_info: &DHTServerInfo,
) {
let offline_info = DHTServerInfo {
state: -1, throughput: 0.0,
start_block: server_info.start_block,
end_block: server_info.end_block,
public_name: server_info.public_name.clone(),
version: server_info.version.clone(),
torch_dtype: server_info.torch_dtype.clone(),
using_relay: server_info.using_relay,
cache_tokens_left: 0,
next_pings: HashMap::new(),
adapters: vec![],
trust_attestations: vec![],
vpk_info: None,
peer_id_b58: server_info.peer_id_b58.clone(),
};
let expired = get_dht_time() + 360.0;
let info_bytes = match offline_info.to_msgpack() {
Ok(b) => b,
Err(e) => { warn!("Unannounce: failed to serialise server info: {}", e); return; }
};
let subkey = match rmp_serde::to_vec(&peer_id.to_base58()) {
Ok(b) => b,
Err(e) => { warn!("Unannounce: failed to serialise subkey: {}", e); return; }
};
let node_info = NodeInfo::from_peer_id(peer_id);
let mut keys = Vec::new();
let mut subkeys = Vec::new();
let mut values = Vec::new();
let mut expirations = Vec::new();
let mut in_cache = Vec::new();
for block in server_info.start_block..server_info.end_block {
keys.push(dht_id(&format!("{}.{}", prefix, block)));
subkeys.push(subkey.clone());
values.push(info_bytes.clone());
expirations.push(expired);
in_cache.push(false);
}
let block_req = StoreRequest {
auth: Some(RequestAuthInfo::new()),
keys,
subkeys,
values,
expiration_time: expirations,
in_cache,
peer: Some(node_info.clone()),
};
{
let g = storage.read().await;
let _ = g.handle_store(block_req.clone());
}
send_to_bootstrap(client, bootstrap_peers, block_req).await;
if let Some(ref vpk) = server_info.vpk_info {
if let Ok(vpk_bytes) = vpk.to_msgpack_bytes() {
let vpk_req = StoreRequest {
auth: Some(RequestAuthInfo::new()),
keys: vec![dht_id("_kwaai.vpk.nodes")],
subkeys: vec![subkey.clone()],
values: vec![vpk_bytes],
expiration_time: vec![expired],
in_cache: vec![false],
peer: Some(node_info),
};
{
let g = storage.read().await;
let _ = g.handle_store(vpk_req.clone());
}
send_to_bootstrap(client, bootstrap_peers, vpk_req).await;
}
}
info!("Unannounced from DHT — node removed from map");
}
#[allow(clippy::too_many_arguments)]
async fn announce(
client: &mut kwaai_p2p_daemon::P2PClient,
peer_id: PeerId,
storage: &SharedStorage,
bootstrap_peers: &[String],
prefix: &str,
repository: &str,
total_blocks: i32,
start_block: i32,
end_block: i32,
server_info: &DHTServerInfo,
) -> Result<()> {
info!(
"DHT prefix: {} (blocks .{} – .{})",
prefix,
start_block,
end_block - 1
);
let info_bytes = server_info.to_msgpack()?;
let subkey = rmp_serde::to_vec(&peer_id.to_base58())?;
let node_info = NodeInfo::from_peer_id(peer_id);
{
let mut keys = Vec::new();
let mut subkeys = Vec::new();
let mut values = Vec::new();
let mut expirations = Vec::new();
let mut in_cache = Vec::new();
for block in start_block..end_block {
keys.push(dht_id(&format!("{}.{}", prefix, block)));
subkeys.push(subkey.clone());
values.push(info_bytes.clone());
expirations.push(get_dht_time() + 360.0);
in_cache.push(false);
}
let block_req = StoreRequest {
auth: Some(RequestAuthInfo::new()),
keys,
subkeys,
values,
expiration_time: expirations,
in_cache,
peer: Some(node_info.clone()),
};
{
let g = storage.read().await;
let _ = g.handle_store(block_req.clone());
}
if send_to_bootstrap(client, bootstrap_peers, block_req).await {
info!("✅ Announced {} blocks", end_block - start_block);
} else {
warn!("❌ Block announcement failed — node will not appear on map");
}
}
let model_info = ModelInfo {
num_blocks: total_blocks,
repository: repository.to_string(),
};
let registry_req = StoreRequest {
auth: Some(RequestAuthInfo::new()),
keys: vec![dht_id("_petals.models")],
subkeys: vec![rmp_serde::to_vec(&prefix)?],
values: vec![model_info.to_msgpack()?],
expiration_time: vec![get_dht_time() + 360.0],
in_cache: vec![false],
peer: Some(node_info.clone()),
};
{
let g = storage.read().await;
let _ = g.handle_store(registry_req.clone());
}
if send_to_bootstrap(client, bootstrap_peers, registry_req).await {
info!("✅ Announced model to _petals.models registry");
} else {
warn!("❌ Model registry announcement failed");
}
if let Some(ref vpk) = server_info.vpk_info {
let vpk_req = StoreRequest {
auth: Some(RequestAuthInfo::new()),
keys: vec![dht_id("_kwaai.vpk.nodes")],
subkeys: vec![subkey.clone()],
values: vec![vpk.to_msgpack_bytes()?],
expiration_time: vec![get_dht_time() + 360.0],
in_cache: vec![false],
peer: Some(node_info),
};
{
let g = storage.read().await;
let _ = g.handle_store(vpk_req.clone());
}
if send_to_bootstrap(client, bootstrap_peers, vpk_req).await {
info!("✅ Announced VPK capability to _kwaai.vpk.nodes");
} else {
warn!("❌ VPK nodes announcement failed");
}
}
Ok(())
}
async fn send_to_bootstrap(
client: &mut kwaai_p2p_daemon::P2PClient,
bootstrap_peers: &[String],
req: StoreRequest,
) -> bool {
if bootstrap_peers.is_empty() {
return false;
}
use prost::Message;
let mut bytes = Vec::new();
if let Err(e) = req.encode(&mut bytes) {
warn!("Encode STORE request failed: {}", e);
return false;
}
let mut succeeded = 0usize;
for addr in bootstrap_peers {
let Some(peer_id_str) = addr.split("/p2p/").nth(1) else {
warn!("Bootstrap peer has no /p2p/ component: {}", addr);
continue;
};
let bp = match peer_id_str.parse::<PeerId>() {
Ok(p) => p,
Err(e) => {
warn!("Invalid peer ID in {}: {}", addr, e);
continue;
}
};
match tokio::time::timeout(Duration::from_secs(20), client.connect_peer(addr)).await {
Ok(Ok(_)) => { }
Ok(Err(e)) => {
warn!("Bootstrap connect failed ({}): {}", addr, e);
continue;
}
Err(_) => {
warn!("Bootstrap connect timeout ({}): exceeded 20s", addr);
continue;
}
}
tokio::time::sleep(Duration::from_secs(2)).await;
match tokio::time::timeout(
Duration::from_secs(30),
client.call_unary_handler(&bp.to_bytes(), "DHTProtocol.rpc_store", &bytes),
)
.await
{
Ok(Ok(resp_bytes)) => {
use kwaai_hivemind_dht::protocol::StoreResponse;
if let Ok(resp) = StoreResponse::decode(&resp_bytes[..]) {
let ok = resp.store_ok.iter().filter(|&&s| s).count();
info!(
"STORE response from {}: {}/{} stored",
peer_id_str,
ok,
resp.store_ok.len()
);
if ok > 0 {
succeeded += 1;
}
}
}
Ok(Err(e)) => warn!("STORE RPC failed ({}): {}", addr, e),
Err(_) => {
warn!("STORE RPC timeout ({}): exceeded 30s", addr);
}
}
}
if succeeded > 0 {
info!(
"✅ Announced to {} of {} bootstrap peers",
succeeded,
bootstrap_peers.len()
);
} else {
warn!(
"❌ Announcement failed on all {} bootstrap peers — see warnings above",
bootstrap_peers.len()
);
}
succeeded > 0
}
async fn dial_and_wait_for_bootstrap(
client: &mut kwaai_p2p_daemon::P2PClient,
bootstrap_peers: &[String],
) -> Result<()> {
const MAX_WAIT_SECS: u64 = 30;
const POLL_INTERVAL_MS: u64 = 500;
let start = tokio::time::Instant::now();
let max_wait = Duration::from_secs(MAX_WAIT_SECS);
let mut dialed_ok = false;
for addr in bootstrap_peers {
match tokio::time::timeout(Duration::from_secs(10), client.connect_peer(addr)).await {
Ok(Ok(_)) => {
info!("Dialed bootstrap peer {}", addr);
dialed_ok = true;
}
Ok(Err(e)) => warn!("Bootstrap dial failed ({}): {}", addr, e),
Err(_) => warn!("Bootstrap dial timeout ({}): exceeded 10s", addr),
}
}
let bootstrap_peer_ids: Vec<String> = bootstrap_peers
.iter()
.filter_map(|addr| addr.split("/p2p/").nth(1))
.map(|s| s.to_string())
.collect();
loop {
match client.list_peers().await {
Ok(peers) => {
let connected_bootstrap_count = peers
.iter()
.filter(|peer_info| match PeerId::from_bytes(&peer_info.id) {
Ok(pid) => bootstrap_peer_ids.contains(&pid.to_base58()),
Err(_) => false,
})
.count();
if connected_bootstrap_count > 0 {
let elapsed = start.elapsed();
info!(
"✅ Connected to {} bootstrap peer(s) in {:.1}s",
connected_bootstrap_count,
elapsed.as_secs_f64()
);
return Ok(());
}
let elapsed = start.elapsed();
if elapsed.as_secs().is_multiple_of(5)
&& elapsed.as_millis() < POLL_INTERVAL_MS as u128 * 2
{
info!(" Waiting for bootstrap peers... ({:.0}s elapsed, {} total peers connected)",
elapsed.as_secs_f64(), peers.len());
}
}
Err(e) => {
warn!("Peer list query failed: {} — continuing to wait", e);
}
}
if start.elapsed() >= max_wait {
if dialed_ok {
info!("Bootstrap peer dialed but not yet visible in peer list — continuing");
} else {
warn!(
"⚠️ Bootstrap timeout after {}s — no bootstrap peers connected",
MAX_WAIT_SECS
);
warn!(" Node will still announce, but may not be visible on map initially");
}
return Ok(());
}
tokio::time::sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
}
}
async fn handle_rpc_stream(tcp: &mut tokio::net::TcpStream, storage: SharedStorage) -> Result<()> {
let info = stream::parse_stream_info(tcp)
.await
.map_err(|e| anyhow::anyhow!("parse stream info: {}", e))?;
info!("RPC {}", info.proto);
use prost::Message as _;
use tokio::io::AsyncReadExt as _;
let mut bytes = Vec::new();
tcp.read_to_end(&mut bytes)
.await
.map_err(|e| anyhow::anyhow!("read request: {}", e))?;
let req = match info.proto.as_str() {
"DHTProtocol.rpc_store" => {
let r = kwaai_hivemind_dht::protocol::StoreRequest::decode(bytes.as_slice())
.map_err(|e| anyhow::anyhow!("decode StoreRequest: {}", e))?;
DHTRequest::Store(r)
}
"DHTProtocol.rpc_find" => {
let r = kwaai_hivemind_dht::protocol::FindRequest::decode(bytes.as_slice())
.map_err(|e| anyhow::anyhow!("decode FindRequest: {}", e))?;
DHTRequest::Find(r)
}
_ => {
let r = kwaai_hivemind_dht::protocol::PingRequest::decode(bytes.as_slice())
.map_err(|e| anyhow::anyhow!("decode PingRequest: {}", e))?;
DHTRequest::Ping(r)
}
};
let response_bytes = {
let g = storage.read().await;
let resp = g
.handle_request(req)
.map_err(|e| anyhow::anyhow!("handle_request: {}", e))?;
use kwaai_hivemind_dht::codec::DHTResponse;
match resp {
DHTResponse::Store(r) => r.encode_to_vec(),
DHTResponse::Find(r) => r.encode_to_vec(),
DHTResponse::Ping(r) => r.encode_to_vec(),
}
};
tcp.write_all(&response_bytes).await?;
tcp.flush().await?;
Ok(())
}
async fn shutdown_signal() {
#[cfg(unix)]
{
let mut sigterm =
signal::unix::signal(signal::unix::SignalKind::terminate()).expect("SIGTERM handler");
tokio::select! {
_ = signal::ctrl_c() => { info!("Received Ctrl-C"); }
_ = sigterm.recv() => { info!("Received SIGTERM"); }
}
}
#[cfg(not(unix))]
{
signal::ctrl_c().await.expect("Ctrl-C handler");
info!("Received Ctrl-C");
}
}
async fn check_vpk_health(port: u16) -> Option<serde_json::Value> {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(3))
.build()
.ok()?;
let url = format!("http://localhost:{}/api/health", port);
let resp = client.get(&url).send().await.ok()?;
if !resp.status().is_success() {
return None;
}
resp.json::<serde_json::Value>().await.ok()
}
#[allow(dead_code)]
fn find_free_port(preferred: u16) -> Option<u16> {
if port_is_free(preferred) {
return Some(preferred);
}
((preferred + 1)..=(preferred + 100)).find(|&p| port_is_free(p))
}
#[allow(dead_code)]
fn port_is_free(port: u16) -> bool {
std::net::TcpListener::bind(("0.0.0.0", port)).is_ok()
}
fn compute_effective_tps(model: &str, dl_bps: f64, using_relay: bool) -> f64 {
match crate::throughput::load(model) {
Some(entry) => crate::throughput::effective_tps(&entry, dl_bps, using_relay),
None => 10.0, }
}
fn find_p2pd_binary() -> Option<std::path::PathBuf> {
#[cfg(windows)]
let name = "p2pd.exe";
#[cfg(not(windows))]
let name = "p2pd";
if let Ok(exe) = std::env::current_exe() {
let c = exe.parent()?.join(name);
if c.exists() {
return Some(c);
}
}
if let Ok(manifest) = std::env::var("CARGO_MANIFEST_DIR") {
#[cfg(windows)]
let c = std::path::PathBuf::from(manifest).join("../../../target/debug/p2pd.exe");
#[cfg(not(windows))]
let c = std::path::PathBuf::from(manifest).join("../../../target/debug/p2pd");
if c.exists() {
return Some(c);
}
}
let paths = std::env::var_os("PATH")?;
for dir in std::env::split_paths(&paths) {
let c = dir.join(name);
if c.exists() {
return Some(c);
}
}
None
}