1#![cfg_attr(test, allow(unused_crate_dependencies))]
2use base64::Engine;
3use base64::engine::general_purpose::STANDARD_NO_PAD;
4pub use bootstrap::init_csk;
5use bootstrap::{CertInit, SigningInit, init_cert, init_signing, setup_quic, setup_tls_acceptor};
6pub use bootstrap::{join_secret, load_or_generate_cert};
7pub use config::ServerConfigOpts;
8use config::parse_nets;
9use connection::{ManagerContext, accept_loop, join, spawn_mesh_runner};
10use eyre::Report;
11pub use join::JoinConfig;
12pub use keys::{
13 ManagerProfileBuilder, ManagerProfileExport, add_peer, bootstrap_keypair, default_secret_dir,
14 delete_profile, export_profile, import_profile, list_profiles, load_bind_host, load_bootstrap,
15 load_manager_name, load_manager_whitelist, load_max_workers, load_peer_version, load_peers,
16 load_peers_for_gossip, load_profile_host, load_quic_port, load_signing_key, load_tcp_port,
17 load_verifying_key, load_worker_whitelist, profile_exists, remove_peer, rename_profile,
18 save_bind_host, save_bootstrap, save_manager_name, save_manager_whitelist, save_max_workers,
19 save_peer_version, save_peers, save_profile_host, save_quic_port, save_tcp_port,
20 save_worker_whitelist, secret_dir, update_profile,
21};
22use nu_ansi_term::Color;
23use peers::{AliveTable, sweep_dead};
24use std::collections::HashMap;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
27#[cfg(unix)]
28use tokio::signal::unix::{SignalKind, signal};
29use tokio::sync::{Mutex, RwLock};
30use tokio::{net::TcpListener, sync::broadcast};
31use tracing::{error, info, warn};
32#[cfg(unix)]
33use util::short_id;
34pub use util::sleep_backoff;
/// Toggles the test-only "ignore sleep notify" switch.
///
/// In test builds the flag is forwarded to
/// `util::set_ignore_test_sleep_notify`. In every other build the argument is
/// discarded and the call does nothing.
pub fn ignore_sleep_notify(ignore: bool) {
    #[cfg(test)]
    util::set_ignore_test_sleep_notify(ignore);

    // Outside of tests the parameter is intentionally unused.
    #[cfg(not(test))]
    let _ = ignore;
}
44use volli_commands::CommandDistributor;
45pub use volli_core::{ConnectionState, ManagerPeerEntry, Message};
46pub use volli_transport::MessageTransportExt;
47
48pub use connection::mesh::try_connect_with_fallback;
50
51mod bootstrap;
52mod config;
53mod connection;
54mod control;
55pub mod health;
56mod peers;
57pub mod test_harness;
59mod util;
60pub mod workers;
61
62pub use connection::rate_limiter::RateLimiter;
64
65#[cfg(test)]
66mod tests;
67
/// Public entry point that delegates to `join::join_as_client`.
///
/// All four arguments are forwarded unchanged and the callee's result is
/// returned as-is, so the success value and every error come from
/// `join_as_client`.
pub async fn join(
    join_peer: &ManagerPeerEntry,
    token: &str,
    profile: &str,
    config: &JoinConfig<'_>,
) -> Result<ManagerPeerEntry, Report> {
    join::join_as_client(join_peer, token, profile, config).await
}
76
77pub mod keys;
78pub mod mesh;
79
80pub async fn run(
81 mut cfg: ServerConfigOpts,
82 ready_signal: Option<tokio::sync::oneshot::Sender<()>>,
83) -> Result<(), Report> {
84 if std::env::var("VOLLI_FAST_TESTS")
87 .map(|v| v != "0")
88 .unwrap_or_else(|_| std::env::var("NEXTEST").is_ok())
89 && cfg.handshake_timeout_ms > 200
90 {
91 cfg.handshake_timeout_ms = 200;
92 }
93 if cfg.test_disable_listeners {
95 if let Some(signal) = ready_signal {
96 let _ = signal.send(());
97 }
98 return Ok(());
99 }
100 let mut show_bootstrap = false;
101 let secret_base = cfg
102 .secret_dir
103 .as_deref()
104 .map(std::path::PathBuf::from)
105 .unwrap_or_else(default_secret_dir);
106 let secret_dir: &std::path::Path = &secret_base;
107
108 let SigningInit {
109 key: signing,
110 newly_generated: persist_keys,
111 sk_path,
112 pk_path,
113 id: manager_id,
114 fingerprint: pub_fp,
115 } = init_signing(secret_dir, &mut show_bootstrap)?;
116 let (csk, csk_ver, persist_csk) = bootstrap::init_csk(&cfg.profile)?;
117 let csk_shared = Arc::new(RwLock::new(csk));
118 let csk_ver_shared = Arc::new(AtomicU32::new(csk_ver));
119 let CertInit {
120 chain: cert_chain,
121 key,
122 fingerprint,
123 cert_der,
124 key_der,
125 cert_path,
126 key_path,
127 newly_generated: persist_cert,
128 } = init_cert(&cfg, secret_dir)?;
129
130 let quic_endpoint = setup_quic(&cfg.bind, cfg.quic_port, &cert_chain, &key)?;
131 let quic_port = quic_endpoint.local_addr()?.port();
132 let tls_acceptor = setup_tls_acceptor(&cert_chain, &key)?;
133 let tcp_listener = TcpListener::bind((cfg.bind.as_str(), cfg.tcp_port)).await?;
134 let tcp_port = tcp_listener.local_addr()?.port();
135 cfg.quic_port = quic_port;
136 cfg.tcp_port = tcp_port;
137
138 if let Err(e) = crate::keys::save_quic_port(&cfg.profile, quic_port) {
140 tracing::warn!("Failed to save QUIC port to profile: {}", e);
141 }
142 if let Err(e) = crate::keys::save_tcp_port(&cfg.profile, tcp_port) {
143 tracing::warn!("Failed to save TCP port to profile: {}", e);
144 }
145
146 let manager_name = cfg
147 .manager_name
148 .clone()
149 .or_else(|| load_manager_name(&cfg.profile).ok().flatten())
150 .unwrap_or_else(|| cfg.profile.clone());
151
152 if let Err(e) = crate::keys::save_manager_name(&cfg.profile, &manager_name) {
154 tracing::warn!("Failed to save manager name to profile: {}", e);
155 }
156
157 let self_meta = ManagerPeerEntry {
158 manager_id: manager_id.clone(),
159 manager_name,
160 tenant: "self".into(),
161 cluster: "default".into(),
162 host: cfg.advertise_host.clone(),
163 tcp_port: cfg.tcp_port,
164 quic_port: cfg.quic_port,
165 pub_fp: hex::encode(pub_fp),
166 csk_ver,
167 tls_cert: STANDARD_NO_PAD.encode(&cert_der),
168 tls_fp: fingerprint.clone(),
169 health: None, };
171 let peers: AliveTable = Arc::new(Mutex::new(HashMap::new()));
172 let workers: workers::WorkerTable = Arc::new(Mutex::new(HashMap::new()));
173 let peer_version = Arc::new(AtomicU64::new(1));
174 if let Ok(ver) = keys::load_peer_version(&cfg.profile)
175 && ver > 0
176 {
177 peer_version.store(ver, Ordering::SeqCst);
178 }
179 let (alive_tx, _) = broadcast::channel(256);
181 tokio::spawn(sweep_dead(
182 peers.clone(),
183 cfg.profile.clone(),
184 alive_tx.clone(),
185 peer_version.clone(),
186 ));
187
188 let health_context = connection::HealthContext::new(crate::health::HealthConfig {
190 max_workers: cfg.max_workers,
191 ..Default::default()
192 });
193
194 let dial_tx = spawn_mesh_runner(
195 &cfg,
196 self_meta.clone(),
197 peers.clone(),
198 peer_version.clone(),
199 alive_tx.clone(),
200 csk_shared.clone(),
201 csk_ver_shared.clone(),
202 workers.clone(),
203 health_context.clone(),
204 );
205
206 setup_panic_handler(&cfg.profile);
208
209 log_system_info(&cfg);
211
212 #[cfg(unix)]
213 spawn_state_signal_handler(
214 peers.clone(),
215 workers.clone(),
216 health_context.clone(),
217 cfg.profile.clone(),
218 );
219
220 #[cfg(unix)]
222 spawn_shutdown_signal_handlers(cfg.profile.clone());
223
224 spawn_health_monitor(
226 cfg.profile.clone(),
227 peers.clone(),
228 workers.clone(),
229 health_context.clone(),
230 peer_version.clone(),
231 alive_tx.clone(),
232 );
233
234 let control_sock =
236 volli_core::util::runtime_socket_path_for_dir(secret_dir, "manager", &cfg.profile);
237 control::maybe_spawn_control_server(
238 control_sock,
239 control::ControlContext {
240 health: health_context.clone(),
241 peers: peers.clone(),
242 workers: workers.clone(),
243 profile: cfg.profile.clone(),
244 self_id: manager_id.clone(),
245 peer_version: peer_version.clone(),
246 alive_tx: alive_tx.clone(),
247 },
248 );
249
250 let active_connections = Arc::new(AtomicUsize::new(0));
251 let worker_nets = parse_nets(&cfg.worker_whitelist);
252 let manager_nets = parse_nets(&cfg.manager_whitelist);
253 let token = if let Some(t) = cfg.token.take() {
254 volli_core::token::decode_token(&t)?
255 } else {
256 volli_core::token::issue_bootstrap_token(
257 &csk,
258 "self",
259 "default",
260 "*",
261 86_400,
262 &cfg.advertise_host,
263 cfg.quic_port,
264 cfg.tcp_port,
265 cert_der.clone(),
266 )?
267 };
268 let secret_encoded = volli_core::token::encode_token(&token)?;
269 info!(
270 "Server listening on {} TCP:{} QUIC:{}",
271 cfg.bind, cfg.tcp_port, cfg.quic_port
272 );
273
274 if show_bootstrap {
276 println!("Worker bootstrap command:\n volli worker --join {secret_encoded}");
277 println!(
278 "Hint: run 'volli admin manager-token --manager {}' to get a manager join command",
279 cfg.profile
280 );
281 }
282
283 let csk_final = *csk_shared.read().await;
285 let ver_final = csk_ver_shared.load(Ordering::SeqCst);
286 update_profile(&cfg.profile)?
287 .signing_key(&signing, persist_keys, sk_path.clone(), pk_path.clone())
288 .certificate(
289 &cert_der,
290 &key_der,
291 persist_cert,
292 cert_path.clone(),
293 key_path.clone(),
294 )
295 .cluster_key(&csk_final, ver_final, persist_csk)
296 .secret_dir(secret_dir.to_path_buf())
297 .save_all()?;
298
299 if let Some(signal) = ready_signal {
301 let _ = signal.send(());
302 }
303
304 let command_distributor = Arc::new(CommandDistributor::new(manager_id.clone()));
306
307 let ctx = ManagerContext::new_server(
308 connection::SecurityContext {
309 signing: Some(signing),
310 csk: csk_shared.clone(),
311 csk_ver: csk_ver_shared.clone(),
312 },
313 connection::NetworkContext {
314 worker_nets,
315 manager_nets,
316 },
317 connection::StateContext {
318 peers,
319 workers,
320 self_meta,
321 peer_version,
322 command_distributor: Some(command_distributor),
323 },
324 connection::CommunicationContext {
325 alive_tx,
326 dial_tx,
327 profile: cfg.profile.clone(),
328 },
329 health_context.clone(),
330 manager_id,
331 );
332
333 let profile_for_logging = cfg.profile.clone();
335 let quic_port_for_logging = cfg.quic_port;
336
337 let result = accept_loop(
339 ctx,
340 cfg,
341 tcp_listener,
342 tls_acceptor,
343 quic_endpoint,
344 active_connections,
345 )
346 .await;
347
348 match &result {
350 Ok(_) => info!(
351 "Manager {} server loop exited normally",
352 profile_for_logging
353 ),
354 Err(e) => {
355 error!("Manager {} server loop crashed: {}", profile_for_logging, e);
356 error!("Error chain: {:?}", e);
357
358 if format!("{}", e).contains("Permission denied") {
360 error!(
361 "Hint: Check if port {} is already in use or requires elevated privileges",
362 quic_port_for_logging
363 );
364 } else if format!("{}", e).contains("Address already in use") {
365 error!(
366 "Hint: Another process is already using port {}",
367 quic_port_for_logging
368 );
369 } else if format!("{}", e).contains("No such file or directory") {
370 error!("Hint: Check if certificates or key files exist and are readable");
371 }
372 }
373 }
374
375 result
376}
377
378fn setup_panic_handler(profile: &str) {
380 let profile = profile.to_string();
381 std::panic::set_hook(Box::new(move |panic_info| {
382 let backtrace = std::backtrace::Backtrace::force_capture();
383
384 if let Some(location) = panic_info.location() {
385 let message = if let Some(msg) = panic_info.payload().downcast_ref::<&str>() {
386 msg.to_string()
387 } else if let Some(msg) = panic_info.payload().downcast_ref::<String>() {
388 msg.clone()
389 } else {
390 "unknown panic".to_string()
391 };
392 error!(
393 "PANIC in manager {}: {} at {}:{}:{}",
394 profile,
395 message,
396 location.file(),
397 location.line(),
398 location.column()
399 );
400 } else {
401 let message = if let Some(msg) = panic_info.payload().downcast_ref::<&str>() {
402 msg.to_string()
403 } else if let Some(msg) = panic_info.payload().downcast_ref::<String>() {
404 msg.clone()
405 } else {
406 "unknown panic".to_string()
407 };
408 error!("PANIC in manager {}: {}", profile, message);
409 }
410
411 error!("Backtrace:\n{}", backtrace);
412
413 std::thread::sleep(std::time::Duration::from_millis(100));
415 }));
416}
417
418fn log_system_info(cfg: &ServerConfigOpts) {
420 info!("Manager configuration:");
421 info!(" Profile: {}", cfg.profile);
422 info!(" Bind: {}:{}+{}", cfg.bind, cfg.tcp_port, cfg.quic_port);
423 info!(" Advertise: {}", cfg.advertise_host);
424
425 #[cfg(unix)]
427 {
428 if let Ok(limit) = std::process::Command::new("ulimit").arg("-n").output()
429 && let Ok(output) = String::from_utf8(limit.stdout)
430 {
431 info!("File descriptor limit: {}", output.trim());
432 }
433 }
434
435 let pid = std::process::id();
437 info!("Process ID: {}", pid);
438}
439
440#[cfg(unix)]
441fn spawn_shutdown_signal_handlers(profile: String) {
442 tokio::spawn(async move {
443 let mut sigterm = signal(SignalKind::terminate()).expect("Failed to setup SIGTERM handler");
444 let mut sigint = signal(SignalKind::interrupt()).expect("Failed to setup SIGINT handler");
445
446 tokio::select! {
447 _ = sigterm.recv() => {
448 warn!("Received SIGTERM, manager {} shutting down gracefully", profile);
449 std::process::exit(0);
450 }
451 _ = sigint.recv() => {
452 warn!("Received SIGINT, manager {} shutting down gracefully", profile);
453 std::process::exit(0);
454 }
455 }
456 });
457}
458
459fn spawn_health_monitor(
461 profile: String,
462 peers: AliveTable,
463 workers: crate::workers::WorkerTable,
464 health: connection::HealthContext,
465 peer_version: Arc<AtomicU64>,
466 alive_tx: broadcast::Sender<u64>,
467) {
468 tokio::spawn(async move {
469 let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
470 let mut last_peer_count = 0;
471 let mut last_worker_count = 0;
472 let mut check_count = 0;
473 let mut last_health_snapshot: Option<volli_core::HealthMetrics> = None;
474
475 loop {
476 interval.tick().await;
477 check_count += 1;
478
479 let peer_count = peers
481 .lock()
482 .await
483 .values()
484 .filter(|p| p.conn_state != ConnectionState::Inactive)
485 .count();
486 let workers_list = crate::workers::list_workers(&workers).await;
487 let worker_total = workers_list.len();
488 let worker_active = workers_list
489 .iter()
490 .filter(|w| w.disconnected_at.is_none())
491 .count();
492
493 if check_count % 4 == 0 {
495 info!(
497 "Manager {} health check #{}: {} peers, {} workers ({} active), uptime: {} checks",
498 profile, check_count, peer_count, worker_total, worker_active, check_count
499 );
500 }
501
502 if peer_count != last_peer_count {
505 last_peer_count = peer_count;
506 }
507 if worker_active != last_worker_count {
508 last_worker_count = worker_active;
509 }
510
511 if check_count % 12 == 0 {
513 check_system_resources(&profile).await;
515 }
516
517 if check_count % 20 == 0 {
519 check_for_potential_issues(&profile, &peers, &workers).await;
521 }
522
523 let cur = health.collector.lock().await.collect_metrics().await;
525 let now = cur.last_health_update;
526 let cur_core = volli_core::HealthMetrics {
528 health_score: cur.health_score,
529 load_percentage: cur.load_percentage,
530 max_workers: cur.max_workers,
531 current_workers: cur.current_workers,
532 avg_cpu: cur.avg_cpu,
533 avg_memory: cur.avg_memory,
534 last_health_update: now,
535 };
536
537 let should_bump = match &last_health_snapshot {
538 None => true,
539 Some(prev) => {
540 let hs_delta = (prev.health_score - cur_core.health_score).abs();
541 let cpu_delta = match (prev.avg_cpu, cur_core.avg_cpu) {
542 (Some(a), Some(b)) => (a - b).abs(),
543 _ => 0.0,
544 };
545 let mem_delta = match (prev.avg_memory, cur_core.avg_memory) {
546 (Some(a), Some(b)) => (a - b).abs(),
547 _ => 0.0,
548 };
549 let workers_changed = prev.current_workers != cur_core.current_workers;
550 let load_delta = (prev.load_percentage - cur_core.load_percentage).abs();
551
552 hs_delta >= 0.05
553 || cpu_delta >= 10.0
554 || mem_delta >= 10.0
555 || workers_changed
556 || load_delta >= 10.0
557 }
558 };
559
560 if should_bump {
561 last_health_snapshot = Some(cur_core);
562 let new_ver = peer_version.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
563 let _ = crate::keys::save_peer_version(&profile, new_ver);
564 let _ = alive_tx.send(new_ver);
565 tracing::debug!(target: "health", profile=%profile, version=new_ver, "bumped peer version due to health change");
566 }
567 }
568 });
569}
570
571async fn check_system_resources(profile: &str) {
573 let pid = std::process::id();
574
575 #[cfg(target_os = "macos")]
577 {
578 if let Ok(output) = tokio::process::Command::new("ps")
579 .args(["-o", "rss,vsz", "-p", &pid.to_string()])
580 .output()
581 .await
582 && let Ok(result) = String::from_utf8(output.stdout)
583 {
584 let lines: Vec<&str> = result.trim().lines().collect();
585 if lines.len() > 1
586 && let Some(memory_line) = lines.get(1)
587 {
588 let parts: Vec<&str> = memory_line.split_whitespace().collect();
589 if parts.len() >= 2
590 && let (Ok(rss), Ok(vsz)) = (parts[0].parse::<u64>(), parts[1].parse::<u64>())
591 {
592 let rss_mb = rss / 1024;
593 let vsz_mb = vsz / 1024;
594 info!(
595 "Manager {} memory: {}MB RSS, {}MB VSZ",
596 profile, rss_mb, vsz_mb
597 );
598
599 if rss_mb > 500 {
601 warn!("Manager {} high memory usage: {}MB RSS", profile, rss_mb);
602 }
603 }
604 }
605 }
606 }
607
608 #[cfg(unix)]
610 {
611 if let Ok(output) = tokio::process::Command::new("lsof")
612 .args(["-p", &pid.to_string()])
613 .output()
614 .await
615 {
616 let fd_count = String::from_utf8_lossy(&output.stdout)
617 .lines()
618 .count()
619 .saturating_sub(1);
620 if fd_count > 100 {
621 info!("Manager {} file descriptors: {}", profile, fd_count);
622 }
623 if fd_count > 800 {
624 warn!(
625 "Manager {} high file descriptor usage: {}",
626 profile, fd_count
627 );
628 }
629 }
630 }
631}
632
633async fn check_for_potential_issues(
635 profile: &str,
636 peers: &AliveTable,
637 workers: &crate::workers::WorkerTable,
638) {
639 match tokio::time::timeout(std::time::Duration::from_secs(5), peers.lock()).await {
641 Ok(_) => {
642 }
644 Err(_) => {
645 error!(
646 "🔒 Manager {} potential deadlock: peer table lock timeout",
647 profile
648 );
649 }
650 }
651
652 match tokio::time::timeout(std::time::Duration::from_secs(5), workers.lock()).await {
654 Ok(_) => {
655 }
657 Err(_) => {
658 error!(
659 "🔒 Manager {} potential deadlock: worker table lock timeout",
660 profile
661 );
662 }
663 }
664
665 let test_start = std::time::Instant::now();
667 tokio::task::yield_now().await;
668 let yield_time = test_start.elapsed();
669
670 if yield_time > std::time::Duration::from_millis(100) {
671 warn!(
672 "🐌 Manager {} slow task scheduling: {:?}",
673 profile, yield_time
674 );
675 }
676}
677
678#[cfg(unix)]
679fn spawn_state_signal_handler(
680 peers: AliveTable,
681 workers: crate::workers::WorkerTable,
682 health: connection::HealthContext,
683 profile: String,
684) {
685 tokio::spawn(async move {
686 if let Ok(mut sig) = signal(SignalKind::user_defined1()) {
687 while sig.recv().await.is_some() {
688 info!(
689 "📡 Received SIGUSR1 signal, printing state for manager: {}",
690 profile
691 );
692 print_peer_states(&peers, &workers, &health, &profile).await;
693 }
694 } else {
695 error!(
696 "Failed to setup SIGUSR1 signal handler for manager: {}",
697 profile
698 );
699 }
700 });
701}
702
703#[cfg(unix)]
704async fn print_peer_states(
705 peers: &AliveTable,
706 workers: &crate::workers::WorkerTable,
707 health: &connection::HealthContext,
708 profile: &str,
709) {
710 let map = peers.lock().await;
711
712 let current_health = if let Ok(mut collector) = health.collector.try_lock() {
714 Some(collector.collect_metrics().await)
715 } else {
716 None
717 };
718
719 let worker_count = workers.lock().await.len();
721
722 let peer_entries = load_peers_for_gossip(profile).unwrap_or_default();
724 tracing::debug!(target: "peer_states", profile=%profile, peer_count=%peer_entries.len(), "loaded peer entries for status display");
725 let manager_data: std::collections::HashMap<
726 String,
727 (String, Option<volli_core::HealthMetrics>),
728 > = peer_entries
729 .iter()
730 .map(|p| {
731 (
732 p.manager_id.clone(),
733 (p.manager_name.clone(), p.health.clone()),
734 )
735 })
736 .collect();
737 tracing::debug!(target: "peer_states", profile=%profile, name_count=%manager_data.len(), "built manager data map");
738
739 let mut parts = Vec::new();
740 for (full_id, state) in map.iter() {
741 let manager_id = match full_id.split(':').collect::<Vec<_>>() {
742 segments if segments.len() == 3 => segments[2],
743 _ => full_id,
744 };
745
746 let (name_disp, peer_health) = manager_data
747 .get(manager_id)
748 .cloned()
749 .unwrap_or_else(|| {
750 tracing::debug!(target: "peer_states", manager_id=%manager_id, "no data found, using short_id");
751 (short_id(manager_id).to_string(), None)
752 });
753
754 let (sym, color) = match state.conn_state {
755 ConnectionState::Client => ("⇢", Color::Green),
756 ConnectionState::Server => ("⇠", Color::Green),
757 ConnectionState::Inactive => ("×", Color::Red),
758 };
759
760 let health_info = if let Some(health) = peer_health {
762 format!(
763 " (workers:{} health:{:.0}% cpu:{:.0}% mem:{:.0}%)",
764 health.current_workers,
765 health.health_score * 100.0,
766 health.avg_cpu.unwrap_or(0.0),
767 health.avg_memory.unwrap_or(0.0)
768 )
769 } else {
770 String::new()
771 };
772
773 parts.push((
774 full_id.clone(),
775 format!("{}{}{}", color.paint(sym), name_disp, health_info),
776 ));
777 }
778 parts.sort_by(|a, b| a.0.cmp(&b.0));
779 let sorted_parts: Vec<String> = parts.into_iter().map(|(_, part)| part).collect();
780
781 if let Some(health) = current_health {
783 println!(
784 "[{}] workers:{} health:{:.0}% cpu:{:.0}% mem:{:.0}% load:{:.0}%",
785 profile,
786 worker_count,
787 health.health_score * 100.0,
788 health.avg_cpu.unwrap_or(0.0),
789 health.avg_memory.unwrap_or(0.0),
790 health.load_percentage
791 );
792 } else {
793 println!(
794 "[{}] workers:{} (health data unavailable)",
795 profile, worker_count
796 );
797 }
798
799 if sorted_parts.is_empty() {
801 println!("peers: none");
802 } else {
803 println!("peers: {}", sorted_parts.join(" "));
804 }
805}