volli_manager/
lib.rs

1#![cfg_attr(test, allow(unused_crate_dependencies))]
2use base64::Engine;
3use base64::engine::general_purpose::STANDARD_NO_PAD;
4pub use bootstrap::init_csk;
5use bootstrap::{CertInit, SigningInit, init_cert, init_signing, setup_quic, setup_tls_acceptor};
6pub use bootstrap::{join_secret, load_or_generate_cert};
7pub use config::ServerConfigOpts;
8use config::parse_nets;
9use connection::{ManagerContext, accept_loop, join, spawn_mesh_runner};
10use eyre::Report;
11pub use join::JoinConfig;
12pub use keys::{
13    ManagerProfileBuilder, ManagerProfileExport, add_peer, bootstrap_keypair, default_secret_dir,
14    delete_profile, export_profile, import_profile, list_profiles, load_bind_host, load_bootstrap,
15    load_manager_name, load_manager_whitelist, load_max_workers, load_peer_version, load_peers,
16    load_peers_for_gossip, load_profile_host, load_quic_port, load_signing_key, load_tcp_port,
17    load_verifying_key, load_worker_whitelist, profile_exists, remove_peer, rename_profile,
18    save_bind_host, save_bootstrap, save_manager_name, save_manager_whitelist, save_max_workers,
19    save_peer_version, save_peers, save_profile_host, save_quic_port, save_tcp_port,
20    save_worker_whitelist, secret_dir, update_profile,
21};
22use nu_ansi_term::Color;
23use peers::{AliveTable, sweep_dead};
24use std::collections::HashMap;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
27#[cfg(unix)]
28use tokio::signal::unix::{SignalKind, signal};
29use tokio::sync::{Mutex, RwLock};
30use tokio::{net::TcpListener, sync::broadcast};
31use tracing::{error, info, warn};
32#[cfg(unix)]
33use util::short_id;
34pub use util::sleep_backoff;
/// Test helper: instructs sleep_backoff to ignore the test sleep notifier.
/// No-op in non-test builds.
pub fn ignore_sleep_notify(ignore: bool) {
    // Consume the flag unconditionally so non-test builds, where the
    // forwarding call below is compiled out, don't warn about it.
    let _ = ignore;
    #[cfg(test)]
    util::set_ignore_test_sleep_notify(ignore);
}
44use volli_commands::CommandDistributor;
45pub use volli_core::{ConnectionState, ManagerPeerEntry, Message};
46pub use volli_transport::MessageTransportExt;
47
48// Export mesh connection functions for external clients
49pub use connection::mesh::try_connect_with_fallback;
50
51mod bootstrap;
52mod config;
53mod connection;
54mod control;
55pub mod health;
56mod peers;
57// transport message extensions are centralized in volli-transport
58pub mod test_harness;
59mod util;
60pub mod workers;
61
62// Export rate limiter for white-box testing from integration tests
63pub use connection::rate_limiter::RateLimiter;
64
65#[cfg(test)]
66mod tests;
67
/// Join an existing mesh as a client via `join_peer`, authenticating with
/// `token` and persisting resulting state under the named `profile`.
///
/// Thin public re-export wrapper around `join::join_as_client`; returns the
/// `ManagerPeerEntry` produced by the join handshake on success.
pub async fn join(
    join_peer: &ManagerPeerEntry,
    token: &str,
    profile: &str,
    config: &JoinConfig<'_>,
) -> Result<ManagerPeerEntry, Report> {
    join::join_as_client(join_peer, token, profile, config).await
}
76
77pub mod keys;
78pub mod mesh;
79
80pub async fn run(
81    mut cfg: ServerConfigOpts,
82    ready_signal: Option<tokio::sync::oneshot::Sender<()>>,
83) -> Result<(), Report> {
84    // In fast test environments (Nextest or VOLLI_FAST_TESTS), cap handshake timeouts
85    // to keep integration tests snappy even when not explicitly overridden.
86    if std::env::var("VOLLI_FAST_TESTS")
87        .map(|v| v != "0")
88        .unwrap_or_else(|_| std::env::var("NEXTEST").is_ok())
89        && cfg.handshake_timeout_ms > 200
90    {
91        cfg.handshake_timeout_ms = 200;
92    }
93    // Fast path for tests that don't require network listeners
94    if cfg.test_disable_listeners {
95        if let Some(signal) = ready_signal {
96            let _ = signal.send(());
97        }
98        return Ok(());
99    }
100    let mut show_bootstrap = false;
101    let secret_base = cfg
102        .secret_dir
103        .as_deref()
104        .map(std::path::PathBuf::from)
105        .unwrap_or_else(default_secret_dir);
106    let secret_dir: &std::path::Path = &secret_base;
107
108    let SigningInit {
109        key: signing,
110        newly_generated: persist_keys,
111        sk_path,
112        pk_path,
113        id: manager_id,
114        fingerprint: pub_fp,
115    } = init_signing(secret_dir, &mut show_bootstrap)?;
116    let (csk, csk_ver, persist_csk) = bootstrap::init_csk(&cfg.profile)?;
117    let csk_shared = Arc::new(RwLock::new(csk));
118    let csk_ver_shared = Arc::new(AtomicU32::new(csk_ver));
119    let CertInit {
120        chain: cert_chain,
121        key,
122        fingerprint,
123        cert_der,
124        key_der,
125        cert_path,
126        key_path,
127        newly_generated: persist_cert,
128    } = init_cert(&cfg, secret_dir)?;
129
130    let quic_endpoint = setup_quic(&cfg.bind, cfg.quic_port, &cert_chain, &key)?;
131    let quic_port = quic_endpoint.local_addr()?.port();
132    let tls_acceptor = setup_tls_acceptor(&cert_chain, &key)?;
133    let tcp_listener = TcpListener::bind((cfg.bind.as_str(), cfg.tcp_port)).await?;
134    let tcp_port = tcp_listener.local_addr()?.port();
135    cfg.quic_port = quic_port;
136    cfg.tcp_port = tcp_port;
137
138    // Save the actual bound ports to the profile
139    if let Err(e) = crate::keys::save_quic_port(&cfg.profile, quic_port) {
140        tracing::warn!("Failed to save QUIC port to profile: {}", e);
141    }
142    if let Err(e) = crate::keys::save_tcp_port(&cfg.profile, tcp_port) {
143        tracing::warn!("Failed to save TCP port to profile: {}", e);
144    }
145
146    let manager_name = cfg
147        .manager_name
148        .clone()
149        .or_else(|| load_manager_name(&cfg.profile).ok().flatten())
150        .unwrap_or_else(|| cfg.profile.clone());
151
152    // Save the manager name to the profile
153    if let Err(e) = crate::keys::save_manager_name(&cfg.profile, &manager_name) {
154        tracing::warn!("Failed to save manager name to profile: {}", e);
155    }
156
157    let self_meta = ManagerPeerEntry {
158        manager_id: manager_id.clone(),
159        manager_name,
160        tenant: "self".into(),
161        cluster: "default".into(),
162        host: cfg.advertise_host.clone(),
163        tcp_port: cfg.tcp_port,
164        quic_port: cfg.quic_port,
165        pub_fp: hex::encode(pub_fp),
166        csk_ver,
167        tls_cert: STANDARD_NO_PAD.encode(&cert_der),
168        tls_fp: fingerprint.clone(),
169        health: None, // Health metrics will be populated later
170    };
171    let peers: AliveTable = Arc::new(Mutex::new(HashMap::new()));
172    let workers: workers::WorkerTable = Arc::new(Mutex::new(HashMap::new()));
173    let peer_version = Arc::new(AtomicU64::new(1));
174    if let Ok(ver) = keys::load_peer_version(&cfg.profile)
175        && ver > 0
176    {
177        peer_version.store(ver, Ordering::SeqCst);
178    }
179    // Use a larger channel to reduce lag under bursty updates; lag is handled gracefully too.
180    let (alive_tx, _) = broadcast::channel(256);
181    tokio::spawn(sweep_dead(
182        peers.clone(),
183        cfg.profile.clone(),
184        alive_tx.clone(),
185        peer_version.clone(),
186    ));
187
188    // Create health context with soft worker cap
189    let health_context = connection::HealthContext::new(crate::health::HealthConfig {
190        max_workers: cfg.max_workers,
191        ..Default::default()
192    });
193
194    let dial_tx = spawn_mesh_runner(
195        &cfg,
196        self_meta.clone(),
197        peers.clone(),
198        peer_version.clone(),
199        alive_tx.clone(),
200        csk_shared.clone(),
201        csk_ver_shared.clone(),
202        workers.clone(),
203        health_context.clone(),
204    );
205
206    // Set up panic handler for better crash debugging
207    setup_panic_handler(&cfg.profile);
208
209    // Log system information for debugging
210    log_system_info(&cfg);
211
212    #[cfg(unix)]
213    spawn_state_signal_handler(
214        peers.clone(),
215        workers.clone(),
216        health_context.clone(),
217        cfg.profile.clone(),
218    );
219
220    // Set up shutdown signal handlers
221    #[cfg(unix)]
222    spawn_shutdown_signal_handlers(cfg.profile.clone());
223
224    // Start periodic health monitoring and health-driven gossip
225    spawn_health_monitor(
226        cfg.profile.clone(),
227        peers.clone(),
228        workers.clone(),
229        health_context.clone(),
230        peer_version.clone(),
231        alive_tx.clone(),
232    );
233
234    // Start control server (for overriding health, etc.) – runtime socket in /tmp to avoid SUN_LEN limits
235    let control_sock =
236        volli_core::util::runtime_socket_path_for_dir(secret_dir, "manager", &cfg.profile);
237    control::maybe_spawn_control_server(
238        control_sock,
239        control::ControlContext {
240            health: health_context.clone(),
241            peers: peers.clone(),
242            workers: workers.clone(),
243            profile: cfg.profile.clone(),
244            self_id: manager_id.clone(),
245            peer_version: peer_version.clone(),
246            alive_tx: alive_tx.clone(),
247        },
248    );
249
250    let active_connections = Arc::new(AtomicUsize::new(0));
251    let worker_nets = parse_nets(&cfg.worker_whitelist);
252    let manager_nets = parse_nets(&cfg.manager_whitelist);
253    let token = if let Some(t) = cfg.token.take() {
254        volli_core::token::decode_token(&t)?
255    } else {
256        volli_core::token::issue_bootstrap_token(
257            &csk,
258            "self",
259            "default",
260            "*",
261            86_400,
262            &cfg.advertise_host,
263            cfg.quic_port,
264            cfg.tcp_port,
265            cert_der.clone(),
266        )?
267    };
268    let secret_encoded = volli_core::token::encode_token(&token)?;
269    info!(
270        "Server listening on {} TCP:{} QUIC:{}",
271        cfg.bind, cfg.tcp_port, cfg.quic_port
272    );
273
274    // Show bootstrap info if needed
275    if show_bootstrap {
276        println!("Worker bootstrap command:\n  volli worker --join {secret_encoded}");
277        println!(
278            "Hint: run 'volli admin manager-token --manager {}' to get a manager join command",
279            cfg.profile
280        );
281    }
282
283    // Save profile data using unified builder pattern
284    let csk_final = *csk_shared.read().await;
285    let ver_final = csk_ver_shared.load(Ordering::SeqCst);
286    update_profile(&cfg.profile)?
287        .signing_key(&signing, persist_keys, sk_path.clone(), pk_path.clone())
288        .certificate(
289            &cert_der,
290            &key_der,
291            persist_cert,
292            cert_path.clone(),
293            key_path.clone(),
294        )
295        .cluster_key(&csk_final, ver_final, persist_csk)
296        .secret_dir(secret_dir.to_path_buf())
297        .save_all()?;
298
299    // Send ready signal to indicate server is ready
300    if let Some(signal) = ready_signal {
301        let _ = signal.send(());
302    }
303
304    // Create command distributor for distributed command execution
305    let command_distributor = Arc::new(CommandDistributor::new(manager_id.clone()));
306
307    let ctx = ManagerContext::new_server(
308        connection::SecurityContext {
309            signing: Some(signing),
310            csk: csk_shared.clone(),
311            csk_ver: csk_ver_shared.clone(),
312        },
313        connection::NetworkContext {
314            worker_nets,
315            manager_nets,
316        },
317        connection::StateContext {
318            peers,
319            workers,
320            self_meta,
321            peer_version,
322            command_distributor: Some(command_distributor),
323        },
324        connection::CommunicationContext {
325            alive_tx,
326            dial_tx,
327            profile: cfg.profile.clone(),
328        },
329        health_context.clone(),
330        manager_id,
331    );
332
333    // Save values before moving cfg
334    let profile_for_logging = cfg.profile.clone();
335    let quic_port_for_logging = cfg.quic_port;
336
337    // Wrap main server loop with error handling
338    let result = accept_loop(
339        ctx,
340        cfg,
341        tcp_listener,
342        tls_acceptor,
343        quic_endpoint,
344        active_connections,
345    )
346    .await;
347
348    // Log any errors that caused the server to exit
349    match &result {
350        Ok(_) => info!(
351            "Manager {} server loop exited normally",
352            profile_for_logging
353        ),
354        Err(e) => {
355            error!("Manager {} server loop crashed: {}", profile_for_logging, e);
356            error!("Error chain: {:?}", e);
357
358            // Try to give some context about what might have caused this
359            if format!("{}", e).contains("Permission denied") {
360                error!(
361                    "Hint: Check if port {} is already in use or requires elevated privileges",
362                    quic_port_for_logging
363                );
364            } else if format!("{}", e).contains("Address already in use") {
365                error!(
366                    "Hint: Another process is already using port {}",
367                    quic_port_for_logging
368                );
369            } else if format!("{}", e).contains("No such file or directory") {
370                error!("Hint: Check if certificates or key files exist and are readable");
371            }
372        }
373    }
374
375    result
376}
377
378/// Set up panic handler to log panics before process death
379fn setup_panic_handler(profile: &str) {
380    let profile = profile.to_string();
381    std::panic::set_hook(Box::new(move |panic_info| {
382        let backtrace = std::backtrace::Backtrace::force_capture();
383
384        if let Some(location) = panic_info.location() {
385            let message = if let Some(msg) = panic_info.payload().downcast_ref::<&str>() {
386                msg.to_string()
387            } else if let Some(msg) = panic_info.payload().downcast_ref::<String>() {
388                msg.clone()
389            } else {
390                "unknown panic".to_string()
391            };
392            error!(
393                "PANIC in manager {}: {} at {}:{}:{}",
394                profile,
395                message,
396                location.file(),
397                location.line(),
398                location.column()
399            );
400        } else {
401            let message = if let Some(msg) = panic_info.payload().downcast_ref::<&str>() {
402                msg.to_string()
403            } else if let Some(msg) = panic_info.payload().downcast_ref::<String>() {
404                msg.clone()
405            } else {
406                "unknown panic".to_string()
407            };
408            error!("PANIC in manager {}: {}", profile, message);
409        }
410
411        error!("Backtrace:\n{}", backtrace);
412
413        // Give logger time to flush
414        std::thread::sleep(std::time::Duration::from_millis(100));
415    }));
416}
417
418/// Log system information for debugging
419fn log_system_info(cfg: &ServerConfigOpts) {
420    info!("Manager configuration:");
421    info!("  Profile: {}", cfg.profile);
422    info!("  Bind: {}:{}+{}", cfg.bind, cfg.tcp_port, cfg.quic_port);
423    info!("  Advertise: {}", cfg.advertise_host);
424
425    // Log system limits that might cause issues
426    #[cfg(unix)]
427    {
428        if let Ok(limit) = std::process::Command::new("ulimit").arg("-n").output()
429            && let Ok(output) = String::from_utf8(limit.stdout)
430        {
431            info!("File descriptor limit: {}", output.trim());
432        }
433    }
434
435    // Log memory info
436    let pid = std::process::id();
437    info!("Process ID: {}", pid);
438}
439
440#[cfg(unix)]
441fn spawn_shutdown_signal_handlers(profile: String) {
442    tokio::spawn(async move {
443        let mut sigterm = signal(SignalKind::terminate()).expect("Failed to setup SIGTERM handler");
444        let mut sigint = signal(SignalKind::interrupt()).expect("Failed to setup SIGINT handler");
445
446        tokio::select! {
447            _ = sigterm.recv() => {
448                warn!("Received SIGTERM, manager {} shutting down gracefully", profile);
449                std::process::exit(0);
450            }
451            _ = sigint.recv() => {
452                warn!("Received SIGINT, manager {} shutting down gracefully", profile);
453                std::process::exit(0);
454            }
455        }
456    });
457}
458
/// Spawn a background task for periodic health monitoring and diagnostics
///
/// Every 30 seconds the task samples peer/worker counts, periodically logs
/// a health summary, probes system resources and lock liveness, and — when
/// local health metrics change significantly — bumps the peer version and
/// broadcasts it on `alive_tx` so fresh metrics propagate via gossip.
fn spawn_health_monitor(
    profile: String,
    peers: AliveTable,
    workers: crate::workers::WorkerTable,
    health: connection::HealthContext,
    peer_version: Arc<AtomicU64>,
    alive_tx: broadcast::Sender<u64>,
) {
    tokio::spawn(async move {
        // Base tick of 30s; the modulo checks below derive their cadence
        // (2/6/10 minutes) from this interval.
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
        let mut last_peer_count = 0;
        let mut last_worker_count = 0;
        let mut check_count = 0;
        // Last metrics snapshot used to decide whether a gossip bump is due.
        let mut last_health_snapshot: Option<volli_core::HealthMetrics> = None;

        loop {
            interval.tick().await;
            check_count += 1;

            // Check basic metrics
            let peer_count = peers
                .lock()
                .await
                .values()
                .filter(|p| p.conn_state != ConnectionState::Inactive)
                .count();
            let workers_list = crate::workers::list_workers(&workers).await;
            let worker_total = workers_list.len();
            // Active = workers without a recorded disconnect time.
            let worker_active = workers_list
                .iter()
                .filter(|w| w.disconnected_at.is_none())
                .count();

            // Log periodic health summary
            if check_count % 4 == 0 {
                // Every 2 minutes
                info!(
                    "Manager {} health check #{}: {} peers, {} workers ({} active), uptime: {} checks",
                    profile, check_count, peer_count, worker_total, worker_active, check_count
                );
            }

            // Detect significant changes (suppress periodic gained/lost logs to avoid duplication).
            // Immediate connect/disconnect paths now emit precise logs; keep counters updated silently.
            if peer_count != last_peer_count {
                last_peer_count = peer_count;
            }
            if worker_active != last_worker_count {
                last_worker_count = worker_active;
            }

            // Check system resources
            if check_count % 12 == 0 {
                // Every 6 minutes
                check_system_resources(&profile).await;
            }

            // Check for potential deadlocks (peers/workers that haven't changed for too long)
            if check_count % 20 == 0 {
                // Every 10 minutes
                check_for_potential_issues(&profile, &peers, &workers).await;
            }

            // Health-driven peer version bump to propagate fresh metrics
            let cur = health.collector.lock().await.collect_metrics().await;
            let now = cur.last_health_update;
            // Convert manager::health::HealthMetrics to core::HealthMetrics for comparison
            let cur_core = volli_core::HealthMetrics {
                health_score: cur.health_score,
                load_percentage: cur.load_percentage,
                max_workers: cur.max_workers,
                current_workers: cur.current_workers,
                avg_cpu: cur.avg_cpu,
                avg_memory: cur.avg_memory,
                last_health_update: now,
            };

            // Bump when any metric moved past its threshold: health score
            // by 5 points (0-1 scale), cpu/mem/load by 10 percentage
            // points, or any change in worker count. First sample always
            // bumps so peers get metrics promptly after startup.
            let should_bump = match &last_health_snapshot {
                None => true,
                Some(prev) => {
                    let hs_delta = (prev.health_score - cur_core.health_score).abs();
                    // cpu/mem deltas only count when both samples are present.
                    let cpu_delta = match (prev.avg_cpu, cur_core.avg_cpu) {
                        (Some(a), Some(b)) => (a - b).abs(),
                        _ => 0.0,
                    };
                    let mem_delta = match (prev.avg_memory, cur_core.avg_memory) {
                        (Some(a), Some(b)) => (a - b).abs(),
                        _ => 0.0,
                    };
                    let workers_changed = prev.current_workers != cur_core.current_workers;
                    let load_delta = (prev.load_percentage - cur_core.load_percentage).abs();

                    hs_delta >= 0.05
                        || cpu_delta >= 10.0
                        || mem_delta >= 10.0
                        || workers_changed
                        || load_delta >= 10.0
                }
            };

            if should_bump {
                last_health_snapshot = Some(cur_core);
                // fetch_add returns the previous value, so +1 yields the
                // version peers will observe; persist it then broadcast.
                let new_ver = peer_version.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
                let _ = crate::keys::save_peer_version(&profile, new_ver);
                let _ = alive_tx.send(new_ver);
                tracing::debug!(target: "health", profile=%profile, version=new_ver, "bumped peer version due to health change");
            }
        }
    });
}
570
/// Check system resources that might cause process death
///
/// Best-effort diagnostics: on macOS, parse `ps` output for this process's
/// RSS/VSZ and warn above 500MB RSS; on unix, count open file descriptors
/// via `lsof` and warn above 800. All failures are silently ignored.
async fn check_system_resources(profile: &str) {
    let pid = std::process::id();

    // Check memory usage
    #[cfg(target_os = "macos")]
    {
        if let Ok(output) = tokio::process::Command::new("ps")
            .args(["-o", "rss,vsz", "-p", &pid.to_string()])
            .output()
            .await
            && let Ok(result) = String::from_utf8(output.stdout)
        {
            // First line is the ps header; the data row follows.
            let lines: Vec<&str> = result.trim().lines().collect();
            if lines.len() > 1
                && let Some(memory_line) = lines.get(1)
            {
                let parts: Vec<&str> = memory_line.split_whitespace().collect();
                if parts.len() >= 2
                    && let (Ok(rss), Ok(vsz)) = (parts[0].parse::<u64>(), parts[1].parse::<u64>())
                {
                    // ps reports sizes in KB; convert to MB for logging.
                    let rss_mb = rss / 1024;
                    let vsz_mb = vsz / 1024;
                    info!(
                        "Manager {} memory: {}MB RSS, {}MB VSZ",
                        profile, rss_mb, vsz_mb
                    );

                    // Warn if memory usage is getting high
                    if rss_mb > 500 {
                        warn!("Manager {} high memory usage: {}MB RSS", profile, rss_mb);
                    }
                }
            }
        }
    }

    // Check file descriptor usage
    #[cfg(unix)]
    {
        if let Ok(output) = tokio::process::Command::new("lsof")
            .args(["-p", &pid.to_string()])
            .output()
            .await
        {
            // Subtract 1 for the lsof header line (saturating in case the
            // output is empty).
            let fd_count = String::from_utf8_lossy(&output.stdout)
                .lines()
                .count()
                .saturating_sub(1);
            if fd_count > 100 {
                info!("Manager {} file descriptors: {}", profile, fd_count);
            }
            if fd_count > 800 {
                warn!(
                    "Manager {} high file descriptor usage: {}",
                    profile, fd_count
                );
            }
        }
    }
}
632
633/// Check for potential issues like stuck connections or deadlocks
634async fn check_for_potential_issues(
635    profile: &str,
636    peers: &AliveTable,
637    workers: &crate::workers::WorkerTable,
638) {
639    // Check if peer map is accessible (not deadlocked)
640    match tokio::time::timeout(std::time::Duration::from_secs(5), peers.lock()).await {
641        Ok(_) => {
642            // Successfully acquired lock
643        }
644        Err(_) => {
645            error!(
646                "🔒 Manager {} potential deadlock: peer table lock timeout",
647                profile
648            );
649        }
650    }
651
652    // Check if worker map is accessible
653    match tokio::time::timeout(std::time::Duration::from_secs(5), workers.lock()).await {
654        Ok(_) => {
655            // Successfully acquired lock
656        }
657        Err(_) => {
658            error!(
659                "🔒 Manager {} potential deadlock: worker table lock timeout",
660                profile
661            );
662        }
663    }
664
665    // Check if we're still responsive
666    let test_start = std::time::Instant::now();
667    tokio::task::yield_now().await;
668    let yield_time = test_start.elapsed();
669
670    if yield_time > std::time::Duration::from_millis(100) {
671        warn!(
672            "🐌 Manager {} slow task scheduling: {:?}",
673            profile, yield_time
674        );
675    }
676}
677
678#[cfg(unix)]
679fn spawn_state_signal_handler(
680    peers: AliveTable,
681    workers: crate::workers::WorkerTable,
682    health: connection::HealthContext,
683    profile: String,
684) {
685    tokio::spawn(async move {
686        if let Ok(mut sig) = signal(SignalKind::user_defined1()) {
687            while sig.recv().await.is_some() {
688                info!(
689                    "📡 Received SIGUSR1 signal, printing state for manager: {}",
690                    profile
691                );
692                print_peer_states(&peers, &workers, &health, &profile).await;
693            }
694        } else {
695            error!(
696                "Failed to setup SIGUSR1 signal handler for manager: {}",
697                profile
698            );
699        }
700    });
701}
702
/// Print a human-readable status dump for this manager and its peers to
/// stdout (triggered by SIGUSR1).
///
/// Shows the local worker count and health metrics (when the collector
/// lock is free), then one line listing each known peer with a connection
/// symbol, display name, and that peer's gossiped health data.
#[cfg(unix)]
async fn print_peer_states(
    peers: &AliveTable,
    workers: &crate::workers::WorkerTable,
    health: &connection::HealthContext,
    profile: &str,
) {
    let map = peers.lock().await;

    // Get current health metrics for this manager
    // try_lock: skip self-health rather than block if the collector is busy.
    let current_health = if let Ok(mut collector) = health.collector.try_lock() {
        Some(collector.collect_metrics().await)
    } else {
        None
    };

    // Count connected workers
    let worker_count = workers.lock().await.len();

    // Load full peer information to get manager names and health data
    let peer_entries = load_peers_for_gossip(profile).unwrap_or_default();
    tracing::debug!(target: "peer_states", profile=%profile, peer_count=%peer_entries.len(), "loaded peer entries for status display");
    // manager_id -> (display name, gossiped health) lookup table.
    let manager_data: std::collections::HashMap<
        String,
        (String, Option<volli_core::HealthMetrics>),
    > = peer_entries
        .iter()
        .map(|p| {
            (
                p.manager_id.clone(),
                (p.manager_name.clone(), p.health.clone()),
            )
        })
        .collect();
    tracing::debug!(target: "peer_states", profile=%profile, name_count=%manager_data.len(), "built manager data map");

    let mut parts = Vec::new();
    for (full_id, state) in map.iter() {
        // Keys look like "tenant:cluster:manager_id" (3 segments); take the
        // last segment, otherwise fall back to the raw key.
        let manager_id = match full_id.split(':').collect::<Vec<_>>() {
            segments if segments.len() == 3 => segments[2],
            _ => full_id,
        };

        let (name_disp, peer_health) = manager_data
            .get(manager_id)
            .cloned()
            .unwrap_or_else(|| {
                tracing::debug!(target: "peer_states", manager_id=%manager_id, "no data found, using short_id");
                (short_id(manager_id).to_string(), None)
            });

        // Connection direction glyph: ⇢ we dialed them, ⇠ they dialed us,
        // × inactive.
        let (sym, color) = match state.conn_state {
            ConnectionState::Client => ("⇢", Color::Green),
            ConnectionState::Server => ("⇠", Color::Green),
            ConnectionState::Inactive => ("×", Color::Red),
        };

        // Add health info if available
        let health_info = if let Some(health) = peer_health {
            format!(
                " (workers:{} health:{:.0}% cpu:{:.0}% mem:{:.0}%)",
                health.current_workers,
                health.health_score * 100.0,
                health.avg_cpu.unwrap_or(0.0),
                health.avg_memory.unwrap_or(0.0)
            )
        } else {
            String::new()
        };

        parts.push((
            full_id.clone(),
            format!("{}{}{}", color.paint(sym), name_disp, health_info),
        ));
    }
    // Sort by full id for a stable, deterministic listing.
    parts.sort_by(|a, b| a.0.cmp(&b.0));
    let sorted_parts: Vec<String> = parts.into_iter().map(|(_, part)| part).collect();

    // Print self info first
    if let Some(health) = current_health {
        println!(
            "[{}] workers:{} health:{:.0}% cpu:{:.0}% mem:{:.0}% load:{:.0}%",
            profile,
            worker_count,
            health.health_score * 100.0,
            health.avg_cpu.unwrap_or(0.0),
            health.avg_memory.unwrap_or(0.0),
            health.load_percentage
        );
    } else {
        println!(
            "[{}] workers:{} (health data unavailable)",
            profile, worker_count
        );
    }

    // Print peer connections
    if sorted_parts.is_empty() {
        println!("peers: none");
    } else {
        println!("peers: {}", sorted_parts.join(" "));
    }
}