Skip to main content

bee_tui/
app.rs

1use std::path::PathBuf;
2use std::sync::Arc;
3use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
4
5use color_eyre::eyre::eyre;
6use crossterm::event::KeyEvent;
7use ratatui::prelude::Rect;
8use serde::{Deserialize, Serialize};
9use tokio::sync::{mpsc, watch};
10use tokio_util::sync::CancellationToken;
11use tracing::{debug, info};
12
13use crate::{
14    action::Action,
15    api::ApiClient,
16    bee_supervisor::{BeeStatus, BeeSupervisor},
17    components::{
18        Component,
19        api_health::ApiHealth,
20        feed_timeline::FeedTimeline,
21        health::{Gate, GateStatus, Health},
22        log_pane::{BeeLogLine, LogPane, LogTab},
23        lottery::Lottery,
24        manifest::Manifest,
25        network::Network,
26        peers::Peers,
27        pins::Pins,
28        pubsub::Pubsub,
29        stamps::Stamps,
30        swap::Swap,
31        tags::Tags,
32        warmup::Warmup,
33        watchlist::Watchlist,
34    },
35    config::Config,
36    config_doctor, durability, economics_oracle, log_capture,
37    manifest_walker::{self, InspectResult},
38    pprof_bundle, stamp_preview,
39    state::State,
40    theme,
41    tui::{Event, Tui},
42    utility_verbs, version_check,
43    watch::{BeeWatch, HealthSnapshot, RefreshProfile},
44};
45
46pub struct App {
47    config: Config,
48    tick_rate: f64,
49    frame_rate: f64,
50    /// Top-level screens, in display order. Tab cycles among them.
51    /// v0.4 also wires the k9s-style `:command` switcher so users
52    /// can jump directly with `:peers`, `:stamps`, etc.
53    screens: Vec<Box<dyn Component>>,
54    /// Index into [`Self::screens`] for the currently visible screen.
55    current_screen: usize,
56    /// Always-on bottom strip; not part of `screens` because it
57    /// renders alongside whatever screen is active. Tabbed across
58    /// Errors/Warn/Info/Debug/BeeHttp/SelfHttp.
59    log_pane: LogPane,
60    /// Where the persisted UI state (tab + height) lives on disk.
61    /// Computed once at startup; rewritten on quit.
62    state_path: PathBuf,
63    should_quit: bool,
64    should_suspend: bool,
65    mode: Mode,
66    last_tick_key_events: Vec<KeyEvent>,
67    action_tx: mpsc::UnboundedSender<Action>,
68    action_rx: mpsc::UnboundedReceiver<Action>,
69    /// Root cancellation token. Children: BeeWatch hub → per-resource
70    /// pollers. Cancelling this on quit unwinds every spawned task.
71    root_cancel: CancellationToken,
72    /// Active Bee node connection; cheap to clone (`Arc<Inner>` under
73    /// the hood). Read by future header bar + multi-node switcher.
74    #[allow(dead_code)]
75    api: Arc<ApiClient>,
76    /// Watch / informer hub feeding screens.
77    watch: BeeWatch,
78    /// Top-bar reuses the health snapshot for the live ping
79    /// indicator. Cheap clone of the watch receiver.
80    health_rx: watch::Receiver<HealthSnapshot>,
81    /// `Some(buf)` while the user is typing a `:command`. The
82    /// buffer holds the characters typed *after* the leading colon.
83    command_buffer: Option<String>,
84    /// Index into the *filtered* command-suggestion list of the row
85    /// currently highlighted by the Up/Down keys. Reset to 0 on every
86    /// buffer mutation so a fresh prefix always starts at the top
87    /// match.
88    command_suggestion_index: usize,
89    /// Status / error from the most recent `:command`, persisted on
90    /// the command-bar line until the user enters command mode again.
91    /// Cleared when `command_buffer` transitions to `Some`.
92    command_status: Option<CommandStatus>,
93    /// `true` while the `?` help overlay is up. Renders on top of
94    /// the active screen; `?` toggles, `Esc` dismisses.
95    help_visible: bool,
96    /// Tracks the moment the operator pressed `q` once. A second
97    /// `q` within [`QUIT_CONFIRM_WINDOW`] commits the quit; otherwise
98    /// it expires and the cockpit keeps running. Prevents a single
99    /// stray keystroke from killing a session the operator is
100    /// actively monitoring.
101    quit_pending: Option<Instant>,
102    /// `Some` when the `[bee]` block (or `--bee-bin` / `--bee-config`)
103    /// is configured and we're acting as Bee's parent process. `None`
104    /// for the legacy "connect to a running Bee" flow.
105    supervisor: Option<BeeSupervisor>,
106    /// Last-observed status of the supervised Bee child. Refreshed
107    /// each Tick from `supervisor.status()`. Surfaced in the top bar
108    /// so a mid-session crash is visible to the operator (variant B
109    /// of the crash-handling spec — show, don't auto-restart).
110    bee_status: BeeStatus,
111    /// Receiver paired with the bee-log tailer task. `None` when
112    /// the cockpit isn't acting as the supervisor (no log file to
113    /// tail). Drained on each Tick into the LogPane.
114    bee_log_rx: Option<mpsc::UnboundedReceiver<(LogTab, BeeLogLine)>>,
115    /// Channel for async-completing `:command` results. Verbs that
116    /// can't return their answer synchronously (e.g. `:probe-upload`
117    /// which has to wait on an HTTP round-trip) hand a clone of the
118    /// sender to a tokio task and surface the outcome on completion;
119    /// the App drains this on every Tick into `command_status`.
120    cmd_status_tx: mpsc::UnboundedSender<CommandStatus>,
121    cmd_status_rx: mpsc::UnboundedReceiver<CommandStatus>,
122    /// Async-result channel for durability-check completions. Each
123    /// result is forwarded to the S13 Watchlist screen on the next
124    /// Tick. Sibling to `cmd_status_tx` rather than overloading it
125    /// because the Watchlist row carries structured data, not a
126    /// formatted `CommandStatus` string.
127    durability_tx: mpsc::UnboundedSender<crate::durability::DurabilityResult>,
128    durability_rx: mpsc::UnboundedReceiver<crate::durability::DurabilityResult>,
129    /// Async-result channel for `:feed-timeline` walks. Each
130    /// completed walk arrives as a `FeedTimelineMessage` and is
131    /// forwarded to the S14 screen on the next Tick.
132    feed_timeline_tx: mpsc::UnboundedSender<FeedTimelineMessage>,
133    feed_timeline_rx: mpsc::UnboundedReceiver<FeedTimelineMessage>,
134    /// Active `:watch-ref` daemon loops keyed by reference hex. Each
135    /// entry owns a `CancellationToken` whose `cancel()` stops the
136    /// daemon's tokio task on the next iteration boundary.
137    watch_refs: std::collections::HashMap<String, CancellationToken>,
138    /// Active pubsub subscriptions (PSS / GSOC) keyed by sub-id.
139    /// Each entry's `CancellationToken` stops both the websocket
140    /// recv loop and the forwarding task that pushes messages onto
141    /// `pubsub_msg_tx`.
142    pubsub_subs: std::collections::HashMap<String, CancellationToken>,
143    /// Optional shared history-file writer. `Some(...)` when
144    /// `[pubsub].history_file` is configured; cloned into each
145    /// watcher so JSONL appends serialise across subscriptions.
146    pubsub_history: crate::pubsub::HistoryWriter,
147    /// Async-message channel feeding the S15 Pubsub screen with
148    /// every PSS / GSOC frame the active subscriptions deliver.
149    pubsub_msg_tx: mpsc::UnboundedSender<crate::pubsub::PubsubMessage>,
150    pubsub_msg_rx: mpsc::UnboundedReceiver<crate::pubsub::PubsubMessage>,
151    /// Per-gate transition tracker for the optional webhook alerter.
152    /// On every Tick we feed it the latest gates; it returns the
153    /// transitions worth pinging on (debounced per-gate). When
154    /// `[alerts].webhook_url` is unset, [`Self::tick_alerts`] short-
155    /// circuits before touching this state so the cost is one
156    /// `Option::is_none` check per Tick.
157    alert_state: crate::alerts::AlertState,
158}
159
/// How long a first `q` press stays armed. A second `q` inside this
/// window confirms the quit; once it elapses the first press is
/// forgotten and the cockpit keeps running.
const QUIT_CONFIRM_WINDOW: Duration = Duration::from_millis(1_500);
163
/// Outcome from the most recently executed `:command`. Drives the
/// colour of the command-bar line in normal mode.
#[derive(Debug, Clone)]
pub enum CommandStatus {
    /// Informational / success message.
    Info(String),
    /// Error message.
    Err(String),
}
171
172/// Result variants that flow from `:feed-timeline`'s background
173/// walk into the S14 screen. Drained by the Tick handler the same
174/// way `cmd_status_rx` and `durability_rx` are.
175#[derive(Debug, Clone)]
176pub enum FeedTimelineMessage {
177    Loaded(crate::feed_timeline::Timeline),
178    Failed(String),
179}
180
/// Names the top-level screens. Index matches position in
/// [`App::screens`]. Used for the "switched to screen" debug log when
/// Tab / Shift+Tab cycle screens; an index with no entry here is
/// logged as `"?"`.
const SCREEN_NAMES: &[&str] = &[
    "Health",
    "Stamps",
    "Swap",
    "Lottery",
    "Peers",
    "Network",
    "Warmup",
    "API",
    "Tags",
    "Pins",
    "Manifest",
    "Watchlist",
    "FeedTimeline",
    "Pubsub",
];
199
/// Catalog of every `:command` verb with a short description. Drives
/// the suggestion popup that surfaces matches as the operator types
/// (so they don't have to memorize the whole list). Aliases stay
/// implicit — they still work when typed but only the primary name
/// shows up in the popup, to keep the list tidy.
///
/// Each entry is `(name, description)`. Names are lowercase; the
/// description starts with the argument synopsis when the verb takes
/// arguments.
///
/// Order matters: this is the order operators see, so screen jumps
/// come first (most-used), action verbs in approximate frequency
/// order, the four economics previews + buy-suggest grouped together,
/// utility verbs last. (`manifest` and `watchlist` also open screens
/// but sit further down the list.)
const KNOWN_COMMANDS: &[(&str, &str)] = &[
    ("health", "S1 Health screen"),
    ("stamps", "S2 Stamps screen"),
    ("swap", "S3 SWAP / cheques screen"),
    ("lottery", "S4 Lottery + rchash"),
    ("peers", "S6 Peers + bin saturation"),
    ("network", "S7 Network / NAT"),
    ("warmup", "S5 Warmup checklist"),
    ("api", "S8 RPC / API health"),
    ("tags", "S9 Tags / uploads"),
    ("pins", "S11 Pins screen"),
    ("topup-preview", "<batch> <amount-plur> — predict topup"),
    ("dilute-preview", "<batch> <new-depth> — predict dilute"),
    ("extend-preview", "<batch> <duration> — predict extend"),
    ("buy-preview", "<depth> <amount-plur> — predict fresh buy"),
    ("buy-suggest", "<size> <duration> — minimum (depth, amount)"),
    (
        "plan-batch",
        "<batch> [usage-thr] [ttl-thr] [extra-depth] — unified topup+dilute plan",
    ),
    (
        "check-version",
        "compare running Bee version with GitHub's latest release",
    ),
    (
        "config-doctor",
        "audit bee.yaml for deprecated keys (read-only, never modifies)",
    ),
    ("price", "fetch xBZZ → USD spot price"),
    (
        "basefee",
        "fetch Gnosis basefee + tip (requires [economics].gnosis_rpc_url)",
    ),
    (
        "probe-upload",
        "<batch> — single 4 KiB chunk, end-to-end probe",
    ),
    (
        "upload-file",
        "<path> <batch> — upload a single local file, return Swarm ref",
    ),
    (
        "upload-collection",
        "<dir> <batch> — recursive directory upload, return Swarm ref",
    ),
    (
        "feed-probe",
        "<owner> <topic> — latest update for a feed (read-only lookup)",
    ),
    (
        "feed-timeline",
        "<owner> <topic> [N] — walk a feed's history, open S14",
    ),
    (
        "manifest",
        "<ref> — open Mantaray tree browser at a reference",
    ),
    (
        "inspect",
        "<ref> — what is this? auto-detects manifest vs raw chunk",
    ),
    (
        "durability-check",
        "<ref> — walk chunk graph, report total / lost / errors",
    ),
    (
        "grantees-list",
        "<ref> — list ACT grantees on a reference (read-only)",
    ),
    (
        "watch-ref",
        "<ref> [interval] — run :durability-check every interval (default 60s)",
    ),
    (
        "watch-ref-stop",
        "[ref] — stop one :watch-ref daemon (or all if no arg)",
    ),
    (
        "pubsub-pss",
        "<topic> — subscribe to PSS messages on a topic, surface in S15",
    ),
    (
        "pubsub-gsoc",
        "<owner> <identifier> — subscribe to a GSOC SOC, surface in S15",
    ),
    (
        "pubsub-stop",
        "[sub-id] — stop one pubsub subscription (or all if no arg)",
    ),
    (
        "pubsub-filter",
        "<substring> — show only messages whose channel/preview contains substring",
    ),
    (
        "pubsub-filter-clear",
        "remove the active S15 substring filter",
    ),
    (
        "pubsub-replay",
        "<path> — load a pubsub history JSONL into the S15 timeline",
    ),
    ("watchlist", "S13 Watchlist — durability-check history"),
    (
        "hash",
        "<path> — Swarm reference of a local file/dir (offline)",
    ),
    ("cid", "<ref> [manifest|feed] — encode reference as CID"),
    ("depth-table", "Print canonical depth → capacity table"),
    (
        "gsoc-mine",
        "<overlay> <id> — mine a GSOC signer (CPU work)",
    ),
    (
        "pss-target",
        "<overlay> — first 4 hex chars (Bee's max prefix)",
    ),
    (
        "diagnose",
        "[--pprof[=N]] Export snapshot (+ optional CPU profile + trace)",
    ),
    ("pins-check", "Bulk integrity walk to a file"),
    ("loggers", "Dump live logger registry"),
    ("set-logger", "<expr> <level> — change a logger's verbosity"),
    ("context", "<name> — switch node profile"),
    ("quit", "Exit the cockpit"),
];
336
/// Pull the `--pprof[=N]` flag value out of a `:diagnose ...`
/// command line. Pure for testability.
///
/// Tokens are scanned left to right and the first usable one wins:
/// * a bare `--pprof` yields `Some(60)` (the default duration);
/// * `--pprof=N` with `N` parseable as `u32` yields `Some(N)` clamped
///   to `1..=600` seconds;
/// * `--pprof=<garbage>` (including an empty value) is skipped and the
///   scan continues with the next token.
///
/// Returns `None` when no usable flag is found, e.g. when the
/// operator just typed `:diagnose`.
fn parse_pprof_arg(line: &str) -> Option<u32> {
    line.split_whitespace()
        .find_map(|tok| match tok.strip_prefix("--pprof") {
            // Exactly `--pprof`: no explicit duration supplied.
            Some("") => Some(60),
            // `--pprof=N`; anything else after the prefix (`--pprofx`)
            // or an unparseable N is not a match.
            Some(rest) => rest
                .strip_prefix('=')?
                .parse::<u32>()
                .ok()
                .map(|secs| secs.clamp(1, 600)),
            None => None,
        })
}
354
/// Produce the filtered list of (name, description) pairs that match
/// the buffer's first whitespace token (case-insensitive prefix). An
/// empty or all-whitespace buffer matches everything. Pure for
/// testability.
///
/// The comparison ignores ASCII case on *both* sides, so a catalog
/// entry spelled with uppercase letters still matches a lowercase
/// prefix and vice versa. Catalog order is preserved in the result.
fn filter_command_suggestions<'a>(
    buffer: &str,
    catalog: &'a [(&'a str, &'a str)],
) -> Vec<&'a (&'a str, &'a str)> {
    let head = buffer.split_whitespace().next().unwrap_or("");
    catalog
        .iter()
        .filter(|(name, _)| {
            // `get` returns `None` when the name is shorter than the
            // prefix (or the cut would split a multi-byte character),
            // which is exactly the "no match" case.
            name.get(..head.len())
                .is_some_and(|prefix| prefix.eq_ignore_ascii_case(head))
        })
        .collect()
}
372
373#[derive(Default, Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
374pub enum Mode {
375    #[default]
376    Home,
377}
378
/// Configuration knobs the binary passes into [`App::with_overrides`].
/// Bundled in a struct so future flags don't churn the call site.
///
/// `Default` leaves every override off / unset.
#[derive(Debug, Default)]
pub struct AppOverrides {
    /// Force ASCII glyphs.
    pub ascii: bool,
    /// Force the mono palette.
    pub no_color: bool,
    /// `--bee-bin` CLI override. Takes precedence over `[bee].bin`.
    pub bee_bin: Option<PathBuf>,
    /// `--bee-config` CLI override. Takes precedence over `[bee].config`.
    pub bee_config: Option<PathBuf>,
}
392
/// Default timeout for waiting on `/health` after spawning Bee.
/// Bee's first start can include chain-state catch-up; a generous
/// budget here saves the operator from one false "didn't come up"
/// alarm. Not configurable yet; override later via config if needed.
const BEE_API_READY_TIMEOUT: Duration = Duration::from_secs(60);
398
399impl App {
400    pub async fn new(tick_rate: f64, frame_rate: f64) -> color_eyre::Result<Self> {
401        Self::with_overrides(tick_rate, frame_rate, AppOverrides::default()).await
402    }
403
404    /// Build an App with explicit `--ascii` / `--no-color` /
405    /// `--bee-bin` / `--bee-config` overrides. Async because, when
406    /// the bee paths are set, we spawn Bee and wait for its `/health`
407    /// before opening the TUI.
408    pub async fn with_overrides(
409        tick_rate: f64,
410        frame_rate: f64,
411        overrides: AppOverrides,
412    ) -> color_eyre::Result<Self> {
413        let (action_tx, action_rx) = mpsc::unbounded_channel();
414        let (cmd_status_tx, cmd_status_rx) = mpsc::unbounded_channel();
415        let (durability_tx, durability_rx) = mpsc::unbounded_channel();
416        let (feed_timeline_tx, feed_timeline_rx) = mpsc::unbounded_channel();
417        let (pubsub_msg_tx, pubsub_msg_rx) = mpsc::unbounded_channel();
418        let config = Config::new()?;
419
420        // Optional pubsub history-file writer. Failures here aren't
421        // fatal — the live tail keeps working without persistence —
422        // so we log a warning and keep going.
423        let pubsub_history = match config.pubsub.history_file.as_deref() {
424            Some(path) => {
425                let rotate_bytes = config.pubsub.rotate_size_mb.saturating_mul(1024 * 1024);
426                let keep = config.pubsub.keep_files;
427                match crate::pubsub::open_history_writer(path, rotate_bytes, keep).await {
428                    Ok(w) => w,
429                    Err(e) => {
430                        tracing::warn!(target: "bee_tui::pubsub", "history file disabled: {e}");
431                        None
432                    }
433                }
434            }
435            None => None,
436        };
437        // Install the theme first so any tracing emitted during the
438        // rest of `new` already reflects the operator's choice.
439        let force_no_color = overrides.no_color || theme::no_color_env();
440        theme::install_with_overrides(&config.ui, force_no_color, overrides.ascii);
441
442        // Pick the active node profile (and its URL) before spawning
443        // Bee — the supervisor's /health probe needs the URL.
444        let node = config
445            .active_node()
446            .ok_or_else(|| eyre!("no Bee node configured (config.nodes is empty)"))?;
447        let api = Arc::new(ApiClient::from_node(node)?);
448
449        // Resolve the bee paths: CLI flags > [bee] config block > unset.
450        let bee_bin = overrides
451            .bee_bin
452            .or_else(|| config.bee.as_ref().map(|b| b.bin.clone()));
453        let bee_config = overrides
454            .bee_config
455            .or_else(|| config.bee.as_ref().map(|b| b.config.clone()));
456        // [bee.logs] sub-config; defaults if [bee] is set but
457        // [bee.logs] isn't.
458        let bee_logs = config
459            .bee
460            .as_ref()
461            .map(|b| b.logs.clone())
462            .unwrap_or_default();
463        let supervisor = match (bee_bin, bee_config) {
464            (Some(bin), Some(cfg)) => {
465                eprintln!("bee-tui: spawning bee {bin:?} --config {cfg:?}");
466                let mut sup = BeeSupervisor::spawn(&bin, &cfg, bee_logs)?;
467                eprintln!(
468                    "bee-tui: log → {} (will appear in the cockpit's bottom pane)",
469                    sup.log_path().display()
470                );
471                eprintln!(
472                    "bee-tui: waiting for {} to respond on /health (up to {:?})...",
473                    api.url, BEE_API_READY_TIMEOUT
474                );
475                sup.wait_for_api(&api.url, BEE_API_READY_TIMEOUT).await?;
476                eprintln!("bee-tui: bee ready, opening cockpit");
477                Some(sup)
478            }
479            (Some(_), None) | (None, Some(_)) => {
480                return Err(eyre!(
481                    "[bee].bin and [bee].config must both be set (or both unset). \
482                     Use --bee-bin AND --bee-config, or both fields in config.toml."
483                ));
484            }
485            (None, None) => None,
486        };
487
488        // Spawn the watch / informer hub. Pollers attach to children
489        // of `root_cancel`, so quitting cancels everything in one go.
490        // The cadence preset comes from `[ui].refresh` — operators
491        // who want the original 2 s health stream can opt into
492        // `"live"`; the default is "calmer" (4 s health, 10 s
493        // topology).
494        let refresh = RefreshProfile::from_config(&config.ui.refresh);
495        let root_cancel = CancellationToken::new();
496        let watch = BeeWatch::start_with_profile(api.clone(), &root_cancel, refresh);
497        let health_rx = watch.health();
498
499        // Cost-context poller — opt-in via `[economics].enable_market_tile`.
500        // When off, no outbound traffic and S3 SWAP renders identically
501        // to v1.3 (no Market tile slot).
502        let market_rx = if config.economics.enable_market_tile {
503            Some(economics_oracle::spawn_poller(
504                config.economics.gnosis_rpc_url.clone(),
505                root_cancel.child_token(),
506            ))
507        } else {
508            None
509        };
510
511        let screens = build_screens(&api, &watch, market_rx);
512        // Bottom log pane subscribes to the bee::http capture set up
513        // by logging::init for its `bee::http` tab. The four severity
514        // tabs + "Bee HTTP" tab populate from the supervisor's log
515        // tail (increment 3+); for now they show placeholders.
516        let (persisted, state_path) = State::load();
517        let initial_tab = LogTab::from_kebab(&persisted.log_pane_active_tab);
518        let mut log_pane = LogPane::new(
519            log_capture::handle(),
520            initial_tab,
521            persisted.log_pane_height,
522        );
523        log_pane.set_spawn_active(supervisor.is_some());
524        if let Some(c) = log_capture::cockpit_handle() {
525            log_pane.set_cockpit_capture(c);
526        }
527
528        // Spawn the bee-log tailer if we own the supervisor. The
529        // tailer parses each new line of the captured Bee log and
530        // forwards `(LogTab, BeeLogLine)` pairs down an mpsc the
531        // App drains every Tick. Inherits root_cancel so quit
532        // unwinds it the same way as every other spawned task.
533        let bee_log_rx = supervisor.as_ref().map(|sup| {
534            let (tx, rx) = mpsc::unbounded_channel();
535            crate::bee_log_tailer::spawn(
536                sup.log_path().to_path_buf(),
537                tx,
538                root_cancel.child_token(),
539            );
540            rx
541        });
542
543        // Optional Prometheus `/metrics` endpoint. Off by default;
544        // when `[metrics].enabled = true` we spawn the server under
545        // `root_cancel` so it dies with the cockpit. Failures here
546        // are non-fatal — surface a tracing error and keep going,
547        // since a port-conflict shouldn't block the operator from
548        // using the cockpit itself.
549        if config.metrics.enabled {
550            match config.metrics.addr.parse::<std::net::SocketAddr>() {
551                Ok(bind_addr) => {
552                    let render_fn = build_metrics_render_fn(watch.clone(), log_capture::handle());
553                    let cancel = root_cancel.child_token();
554                    match crate::metrics_server::spawn(bind_addr, render_fn, cancel).await {
555                        Ok(actual) => {
556                            eprintln!(
557                                "bee-tui: metrics endpoint serving /metrics on http://{actual}"
558                            );
559                        }
560                        Err(e) => {
561                            tracing::error!(
562                                "metrics: failed to start endpoint on {bind_addr}: {e}"
563                            );
564                        }
565                    }
566                }
567                Err(e) => {
568                    tracing::error!(
569                        "metrics: invalid [metrics].addr {:?}: {e}",
570                        config.metrics.addr
571                    );
572                }
573            }
574        }
575
576        let config_alerts_debounce = config.alerts.debounce_secs;
577
578        Ok(Self {
579            tick_rate,
580            frame_rate,
581            screens,
582            current_screen: 0,
583            log_pane,
584            state_path,
585            should_quit: false,
586            should_suspend: false,
587            config,
588            mode: Mode::Home,
589            last_tick_key_events: Vec::new(),
590            action_tx,
591            action_rx,
592            root_cancel,
593            api,
594            watch,
595            health_rx,
596            command_buffer: None,
597            command_suggestion_index: 0,
598            command_status: None,
599            help_visible: false,
600            quit_pending: None,
601            supervisor,
602            bee_status: BeeStatus::Running,
603            bee_log_rx,
604            cmd_status_tx,
605            cmd_status_rx,
606            durability_tx,
607            durability_rx,
608            feed_timeline_tx,
609            feed_timeline_rx,
610            watch_refs: std::collections::HashMap::new(),
611            pubsub_subs: std::collections::HashMap::new(),
612            pubsub_history,
613            pubsub_msg_tx,
614            pubsub_msg_rx,
615            alert_state: crate::alerts::AlertState::new(config_alerts_debounce),
616        })
617    }
618
619    pub async fn run(&mut self) -> color_eyre::Result<()> {
620        let mut tui = Tui::new()?
621            // .mouse(true) // uncomment this line to enable mouse support
622            .tick_rate(self.tick_rate)
623            .frame_rate(self.frame_rate);
624        tui.enter()?;
625
626        let tx = self.action_tx.clone();
627        let cfg = self.config.clone();
628        let size = tui.size()?;
629        for component in self.iter_components_mut() {
630            component.register_action_handler(tx.clone())?;
631            component.register_config_handler(cfg.clone())?;
632            component.init(size)?;
633        }
634
635        let action_tx = self.action_tx.clone();
636        loop {
637            self.handle_events(&mut tui).await?;
638            self.handle_actions(&mut tui)?;
639            if self.should_suspend {
640                tui.suspend()?;
641                action_tx.send(Action::Resume)?;
642                action_tx.send(Action::ClearScreen)?;
643                // tui.mouse(true);
644                tui.enter()?;
645            } else if self.should_quit {
646                tui.stop()?;
647                break;
648            }
649        }
650        // Unwind every spawned task before tearing down the terminal.
651        self.watch.shutdown();
652        self.root_cancel.cancel();
653        // Persist UI state (last tab + height) so the next launch
654        // restores the operator's preference. Best-effort — failures
655        // log a warning but never block quit.
656        let snapshot = State {
657            log_pane_height: self.log_pane.height(),
658            log_pane_active_tab: self.log_pane.active_tab().to_kebab().to_string(),
659        };
660        snapshot.save(&self.state_path);
661        // SIGTERM Bee (pgroup) and wait for clean exit. Done before
662        // tui.exit() so any "bee shutting down" messages still land
663        // in the supervisor's log file (no race with terminal teardown).
664        if let Some(sup) = self.supervisor.take() {
665            let final_status = sup.shutdown_default().await;
666            tracing::info!("bee child exited: {}", final_status.label());
667        }
668        tui.exit()?;
669        Ok(())
670    }
671
672    async fn handle_events(&mut self, tui: &mut Tui) -> color_eyre::Result<()> {
673        let Some(event) = tui.next_event().await else {
674            return Ok(());
675        };
676        let action_tx = self.action_tx.clone();
677        // Sample modal state both before and after handling: a key
678        // that *opens* a modal (`?` → help) only flips state inside
679        // handle, but the same key shouldn't propagate to screens;
680        // a key that *closes* one (Esc on help) flips it the other
681        // way but also shouldn't propagate. Either side of the
682        // transition counts as "modal" for swallowing purposes.
683        let modal_before = self.command_buffer.is_some() || self.help_visible;
684        match event {
685            Event::Quit => action_tx.send(Action::Quit)?,
686            Event::Tick => action_tx.send(Action::Tick)?,
687            Event::Render => action_tx.send(Action::Render)?,
688            Event::Resize(x, y) => action_tx.send(Action::Resize(x, y))?,
689            Event::Key(key) => self.handle_key_event(key)?,
690            _ => {}
691        }
692        let modal_after = self.command_buffer.is_some() || self.help_visible;
693        // Non-key events (Tick / Resize / Render) always propagate
694        // so screens keep refreshing under modals.
695        let propagate = !((modal_before || modal_after) && matches!(event, Event::Key(_)));
696        if propagate {
697            for component in self.iter_components_mut() {
698                if let Some(action) = component.handle_events(Some(event.clone()))? {
699                    action_tx.send(action)?;
700                }
701            }
702        }
703        Ok(())
704    }
705
706    /// Iterate every component (screens + log pane) for uniform
707    /// lifecycle ticks. Returns trait objects so the heterogeneous
708    /// `LogPane` (a concrete type for direct method access in the
709    /// app layer) walks alongside the boxed screens.
710    fn iter_components_mut(&mut self) -> impl Iterator<Item = &mut dyn Component> {
711        self.screens
712            .iter_mut()
713            .map(|c| c.as_mut() as &mut dyn Component)
714            .chain(std::iter::once(&mut self.log_pane as &mut dyn Component))
715    }
716
717    fn handle_key_event(&mut self, key: KeyEvent) -> color_eyre::Result<()> {
718        // While a `:command` is being typed every key edits the
719        // buffer or commits / cancels the line. No other keymap
720        // applies.
721        if self.command_buffer.is_some() {
722            self.handle_command_mode_key(key)?;
723            return Ok(());
724        }
725        // While the `?` help overlay is up, only Esc / ? / q close
726        // it. Don't propagate to components or process other keys
727        // — the operator is reading reference, not driving.
728        if self.help_visible {
729            match key.code {
730                crossterm::event::KeyCode::Esc
731                | crossterm::event::KeyCode::Char('?')
732                | crossterm::event::KeyCode::Char('q') => {
733                    self.help_visible = false;
734                }
735                _ => {}
736            }
737            return Ok(());
738        }
739        // `?` opens the help overlay. We capture it at the app level
740        // so every screen gets the overlay for free without each one
741        // having to wire its own.
742        if matches!(key.code, crossterm::event::KeyCode::Char('?')) {
743            self.help_visible = true;
744            return Ok(());
745        }
746        let action_tx = self.action_tx.clone();
747        // ':' opens the command bar.
748        if matches!(key.code, crossterm::event::KeyCode::Char(':')) {
749            self.command_buffer = Some(String::new());
750            self.command_status = None;
751            return Ok(());
752        }
753        // Tab / Shift+Tab keep working as a quick screen-cycle
754        // shortcut even after the `:command` bar lands. crossterm
755        // surfaces Shift+Tab as `BackTab` (a separate KeyCode rather
756        // than Tab + the Shift modifier), so both branches are needed.
757        if matches!(key.code, crossterm::event::KeyCode::Tab) {
758            if !self.screens.is_empty() {
759                self.current_screen = (self.current_screen + 1) % self.screens.len();
760                debug!(
761                    "switched to screen {}",
762                    SCREEN_NAMES.get(self.current_screen).unwrap_or(&"?")
763                );
764            }
765            return Ok(());
766        }
767        if matches!(key.code, crossterm::event::KeyCode::BackTab) {
768            if !self.screens.is_empty() {
769                let len = self.screens.len();
770                self.current_screen = (self.current_screen + len - 1) % len;
771                debug!(
772                    "switched to screen {}",
773                    SCREEN_NAMES.get(self.current_screen).unwrap_or(&"?")
774                );
775            }
776            return Ok(());
777        }
778        // Log-pane controls. `[` / `]` cycle tabs (lazygit / k9s
779        // pattern, no conflict with screen-cycling Tab/Shift+Tab).
780        // `+` / `-` resize the pane in 1-line steps, clamped to
781        // [LOG_PANE_MIN_HEIGHT, LOG_PANE_MAX_HEIGHT]. The state is
782        // persisted on quit.
783        if matches!(key.code, crossterm::event::KeyCode::Char('['))
784            && key.modifiers == crossterm::event::KeyModifiers::NONE
785        {
786            self.log_pane.prev_tab();
787            return Ok(());
788        }
789        if matches!(key.code, crossterm::event::KeyCode::Char(']'))
790            && key.modifiers == crossterm::event::KeyModifiers::NONE
791        {
792            self.log_pane.next_tab();
793            return Ok(());
794        }
795        if matches!(key.code, crossterm::event::KeyCode::Char('+'))
796            && key.modifiers == crossterm::event::KeyModifiers::NONE
797        {
798            self.log_pane.grow();
799            return Ok(());
800        }
801        if matches!(key.code, crossterm::event::KeyCode::Char('-'))
802            && key.modifiers == crossterm::event::KeyModifiers::NONE
803        {
804            self.log_pane.shrink();
805            return Ok(());
806        }
807        // Log-pane scroll. Shift+Up/Down step one line; Shift+PgUp/PgDn
808        // step ten; Shift+End resumes tail. The Shift modifier
809        // distinguishes from in-screen scroll (j/k/PgUp/PgDn) bound
810        // by S2/S6/S9 — those keep working without conflict.
811        if key.modifiers == crossterm::event::KeyModifiers::SHIFT {
812            match key.code {
813                crossterm::event::KeyCode::Up => {
814                    self.log_pane.scroll_up(1);
815                    return Ok(());
816                }
817                crossterm::event::KeyCode::Down => {
818                    self.log_pane.scroll_down(1);
819                    return Ok(());
820                }
821                crossterm::event::KeyCode::PageUp => {
822                    self.log_pane.scroll_up(10);
823                    return Ok(());
824                }
825                crossterm::event::KeyCode::PageDown => {
826                    self.log_pane.scroll_down(10);
827                    return Ok(());
828                }
829                crossterm::event::KeyCode::End => {
830                    self.log_pane.resume_tail();
831                    return Ok(());
832                }
833                // Horizontal pan for long Bee log lines. 8 chars per
834                // keystroke feels live without making the operator
835                // hold the key; `Shift+End` resets both axes via
836                // resume_tail() so there's no separate "back to
837                // left edge" binding.
838                crossterm::event::KeyCode::Left => {
839                    self.log_pane.scroll_left(8);
840                    return Ok(());
841                }
842                crossterm::event::KeyCode::Right => {
843                    self.log_pane.scroll_right(8);
844                    return Ok(());
845                }
846                _ => {}
847            }
848        }
849        // `q` is the easy-to-misclick exit. Require a double-tap
850        // within `QUIT_CONFIRM_WINDOW` so a stray keystroke doesn't
851        // kill an active monitoring session. `Ctrl+C` / `Ctrl+D`
852        // remain wired through the keybindings system as immediate
853        // quit — escape hatches if the cockpit ever stops responding.
854        if matches!(key.code, crossterm::event::KeyCode::Char('q'))
855            && key.modifiers == crossterm::event::KeyModifiers::NONE
856        {
857            match resolve_quit_press(self.quit_pending, Instant::now(), QUIT_CONFIRM_WINDOW) {
858                QuitResolution::Confirm => {
859                    self.quit_pending = None;
860                    self.action_tx.send(Action::Quit)?;
861                }
862                QuitResolution::Pending => {
863                    self.quit_pending = Some(Instant::now());
864                    self.command_status = Some(CommandStatus::Info(
865                        "press q again to quit (Esc cancels)".into(),
866                    ));
867                }
868            }
869            return Ok(());
870        }
871        // Any other key resets the pending-quit window so the operator
872        // doesn't accidentally confirm later from a forgotten first
873        // tap.
874        if self.quit_pending.is_some() {
875            self.quit_pending = None;
876        }
877        let Some(keymap) = self.config.keybindings.0.get(&self.mode) else {
878            return Ok(());
879        };
880        match keymap.get(&vec![key]) {
881            Some(action) => {
882                info!("Got action: {action:?}");
883                action_tx.send(action.clone())?;
884            }
885            _ => {
886                // If the key was not handled as a single key action,
887                // then consider it for multi-key combinations.
888                self.last_tick_key_events.push(key);
889
890                // Check for multi-key combinations
891                if let Some(action) = keymap.get(&self.last_tick_key_events) {
892                    info!("Got action: {action:?}");
893                    action_tx.send(action.clone())?;
894                }
895            }
896        }
897        Ok(())
898    }
899
900    fn handle_command_mode_key(&mut self, key: KeyEvent) -> color_eyre::Result<()> {
901        use crossterm::event::KeyCode;
902        let buf = match self.command_buffer.as_mut() {
903            Some(b) => b,
904            None => return Ok(()),
905        };
906        match key.code {
907            KeyCode::Esc => {
908                // Cancel without dispatching.
909                self.command_buffer = None;
910                self.command_suggestion_index = 0;
911            }
912            KeyCode::Enter => {
913                let line = std::mem::take(buf);
914                self.command_buffer = None;
915                self.command_suggestion_index = 0;
916                self.execute_command(&line)?;
917            }
918            KeyCode::Up => {
919                // Walk up the filtered suggestion list. Saturates at
920                // 0 so a stray Up doesn't wrap unexpectedly.
921                self.command_suggestion_index = self.command_suggestion_index.saturating_sub(1);
922            }
923            KeyCode::Down => {
924                let n = filter_command_suggestions(buf, KNOWN_COMMANDS).len();
925                if n > 0 && self.command_suggestion_index + 1 < n {
926                    self.command_suggestion_index += 1;
927                }
928            }
929            KeyCode::Tab => {
930                // Autocomplete: replace the buffer's first token with
931                // the highlighted suggestion's name and append a
932                // space so the operator can type args immediately.
933                let matches = filter_command_suggestions(buf, KNOWN_COMMANDS);
934                if let Some((name, _)) = matches.get(self.command_suggestion_index) {
935                    let rest = buf
936                        .split_once(char::is_whitespace)
937                        .map(|(_, tail)| tail)
938                        .unwrap_or("");
939                    let new = if rest.is_empty() {
940                        format!("{name} ")
941                    } else {
942                        format!("{name} {rest}")
943                    };
944                    buf.clear();
945                    buf.push_str(&new);
946                    self.command_suggestion_index = 0;
947                }
948            }
949            KeyCode::Backspace => {
950                buf.pop();
951                self.command_suggestion_index = 0;
952            }
953            KeyCode::Char(c) => {
954                buf.push(c);
955                self.command_suggestion_index = 0;
956            }
957            _ => {}
958        }
959        Ok(())
960    }
961
962    /// Resolve a `:command` token to the action it represents.
963    /// Empty input is a silent no-op (operator typed `:` then Enter).
964    fn execute_command(&mut self, line: &str) -> color_eyre::Result<()> {
965        let trimmed = line.trim();
966        if trimmed.is_empty() {
967            return Ok(());
968        }
969        let head = trimmed.split_whitespace().next().unwrap_or("");
970        match head {
971            "q" | "quit" => {
972                self.action_tx.send(Action::Quit)?;
973                self.command_status = Some(CommandStatus::Info("quitting".into()));
974            }
975            "diagnose" | "diag" => {
976                let pprof_secs = parse_pprof_arg(trimmed);
977                if let Some(secs) = pprof_secs {
978                    self.command_status = Some(self.start_diagnose_with_pprof(secs));
979                } else {
980                    self.command_status = Some(match self.export_diagnostic_bundle() {
981                        Ok(path) => CommandStatus::Info(format!(
982                            "diagnostic bundle exported to {}",
983                            path.display()
984                        )),
985                        Err(e) => CommandStatus::Err(format!("diagnose failed: {e}")),
986                    });
987                }
988            }
989            "pins-check" => {
990                // `:pins-check` keeps the legacy bulk-check-to-file behaviour;
991                // `:pins` (without `-check`) now jumps to the S11 screen via
992                // the screen-name catch-all below. The two are deliberately
993                // distinct so an operator who types `:pins` doesn't kick off
994                // a many-minute integrity walk by accident.
995                self.command_status = Some(match self.start_pins_check() {
996                    Ok(path) => CommandStatus::Info(format!(
997                        "pins integrity check running → {} (tail to watch progress)",
998                        path.display()
999                    )),
1000                    Err(e) => CommandStatus::Err(format!("pins-check failed to start: {e}")),
1001                });
1002            }
1003            "loggers" => {
1004                self.command_status = Some(match self.start_loggers_dump() {
1005                    Ok(path) => CommandStatus::Info(format!(
1006                        "loggers snapshot writing → {} (open when ready)",
1007                        path.display()
1008                    )),
1009                    Err(e) => CommandStatus::Err(format!("loggers failed to start: {e}")),
1010                });
1011            }
1012            "set-logger" => {
1013                let mut parts = trimmed.split_whitespace();
1014                let _ = parts.next(); // command head
1015                let expr = parts.next().unwrap_or("");
1016                let level = parts.next().unwrap_or("");
1017                if expr.is_empty() || level.is_empty() {
1018                    self.command_status = Some(CommandStatus::Err(
1019                        "usage: :set-logger <expr> <level>  (level: none|error|warning|info|debug|all; expr: e.g. node/pushsync or '.' for all)"
1020                            .into(),
1021                    ));
1022                    return Ok(());
1023                }
1024                self.start_set_logger(expr.to_string(), level.to_string());
1025                self.command_status = Some(CommandStatus::Info(format!(
1026                    "set-logger {expr:?} → {level:?} (PUT in-flight; check :loggers to verify)"
1027                )));
1028            }
1029            "topup-preview" => {
1030                self.command_status = Some(self.run_topup_preview(trimmed));
1031            }
1032            "dilute-preview" => {
1033                self.command_status = Some(self.run_dilute_preview(trimmed));
1034            }
1035            "extend-preview" => {
1036                self.command_status = Some(self.run_extend_preview(trimmed));
1037            }
1038            "buy-preview" => {
1039                self.command_status = Some(self.run_buy_preview(trimmed));
1040            }
1041            "buy-suggest" => {
1042                self.command_status = Some(self.run_buy_suggest(trimmed));
1043            }
1044            "plan-batch" => {
1045                self.command_status = Some(self.run_plan_batch(trimmed));
1046            }
1047            "check-version" => {
1048                self.command_status = Some(self.run_check_version());
1049            }
1050            "config-doctor" => {
1051                self.command_status = Some(self.run_config_doctor());
1052            }
1053            "price" => {
1054                self.command_status = Some(self.run_price());
1055            }
1056            "basefee" => {
1057                self.command_status = Some(self.run_basefee());
1058            }
1059            "probe-upload" => {
1060                self.command_status = Some(self.run_probe_upload(trimmed));
1061            }
1062            "upload-file" => {
1063                self.command_status = Some(self.run_upload_file(trimmed));
1064            }
1065            "upload-collection" => {
1066                self.command_status = Some(self.run_upload_collection(trimmed));
1067            }
1068            "feed-probe" => {
1069                self.command_status = Some(self.run_feed_probe(trimmed));
1070            }
1071            "feed-timeline" => {
1072                self.command_status = Some(self.run_feed_timeline(trimmed));
1073            }
1074            "hash" => {
1075                self.command_status = Some(self.run_hash(trimmed));
1076            }
1077            "cid" => {
1078                self.command_status = Some(self.run_cid(trimmed));
1079            }
1080            "depth-table" => {
1081                self.command_status = Some(self.run_depth_table());
1082            }
1083            "gsoc-mine" => {
1084                self.command_status = Some(self.run_gsoc_mine(trimmed));
1085            }
1086            "pss-target" => {
1087                self.command_status = Some(self.run_pss_target(trimmed));
1088            }
1089            "manifest" => {
1090                self.command_status = Some(self.run_manifest(trimmed));
1091            }
1092            "inspect" => {
1093                self.command_status = Some(self.run_inspect(trimmed));
1094            }
1095            "durability-check" => {
1096                self.command_status = Some(self.run_durability_check(trimmed));
1097            }
1098            "grantees-list" => {
1099                self.command_status = Some(self.run_grantees_list(trimmed));
1100            }
1101            "watch-ref" => {
1102                self.command_status = Some(self.run_watch_ref(trimmed));
1103            }
1104            "watch-ref-stop" => {
1105                self.command_status = Some(self.run_watch_ref_stop(trimmed));
1106            }
1107            "pubsub-pss" => {
1108                self.command_status = Some(self.run_pubsub_pss(trimmed));
1109            }
1110            "pubsub-gsoc" => {
1111                self.command_status = Some(self.run_pubsub_gsoc(trimmed));
1112            }
1113            "pubsub-stop" => {
1114                self.command_status = Some(self.run_pubsub_stop(trimmed));
1115            }
1116            "pubsub-filter" => {
1117                self.command_status = Some(self.run_pubsub_filter(trimmed));
1118            }
1119            "pubsub-filter-clear" => {
1120                self.command_status = Some(self.run_pubsub_filter_clear());
1121            }
1122            "pubsub-replay" => {
1123                self.command_status = Some(self.run_pubsub_replay(trimmed));
1124            }
1125            "context" | "ctx" => {
1126                let target = trimmed.split_whitespace().nth(1).unwrap_or("");
1127                if target.is_empty() {
1128                    let known: Vec<String> =
1129                        self.config.nodes.iter().map(|n| n.name.clone()).collect();
1130                    self.command_status = Some(CommandStatus::Err(format!(
1131                        "usage: :context <name>  (known: {})",
1132                        known.join(", ")
1133                    )));
1134                    return Ok(());
1135                }
1136                self.command_status = Some(match self.switch_context(target) {
1137                    Ok(()) => CommandStatus::Info(format!(
1138                        "switched to context {target} ({})",
1139                        self.api.url
1140                    )),
1141                    Err(e) => CommandStatus::Err(format!("context switch failed: {e}")),
1142                });
1143            }
1144            screen
1145                if SCREEN_NAMES
1146                    .iter()
1147                    .any(|name| name.eq_ignore_ascii_case(screen)) =>
1148            {
1149                if let Some(idx) = SCREEN_NAMES
1150                    .iter()
1151                    .position(|name| name.eq_ignore_ascii_case(screen))
1152                {
1153                    self.current_screen = idx;
1154                    self.command_status =
1155                        Some(CommandStatus::Info(format!("→ {}", SCREEN_NAMES[idx])));
1156                }
1157            }
1158            other => {
1159                self.command_status = Some(CommandStatus::Err(format!(
1160                    "unknown command: {other:?} (try :health, :stamps, :swap, :lottery, :peers, :network, :warmup, :api, :tags, :pins, :manifest, :inspect, :diagnose, :pins-check, :loggers, :set-logger, :topup-preview, :dilute-preview, :extend-preview, :buy-preview, :buy-suggest, :plan-batch, :probe-upload, :upload-file, :upload-collection, :feed-probe, :feed-timeline, :watch-ref, :watch-ref-stop, :pubsub-pss, :pubsub-gsoc, :pubsub-stop, :pubsub-filter, :pubsub-filter-clear, :pubsub-replay, :grantees-list, :hash, :cid, :depth-table, :gsoc-mine, :pss-target, :context, :quit)"
1161                )));
1162            }
1163        }
1164        Ok(())
1165    }
1166
1167    /// Read-only "what would happen if I topped up batch X with N
1168    /// PLUR/chunk?". Pure math — no Bee calls, no writes. Args:
1169    /// `:topup-preview <batch-prefix> <amount-plur>`.
1170    fn run_topup_preview(&self, line: &str) -> CommandStatus {
1171        let parts: Vec<&str> = line.split_whitespace().collect();
1172        let (prefix, amount_str) = match parts.as_slice() {
1173            [_, prefix, amount, ..] => (*prefix, *amount),
1174            _ => {
1175                return CommandStatus::Err(
1176                    "usage: :topup-preview <batch-prefix> <amount-plur-per-chunk>".into(),
1177                );
1178            }
1179        };
1180        let chain = match self.health_rx.borrow().chain_state.clone() {
1181            Some(c) => c,
1182            None => return CommandStatus::Err("chain state not loaded yet".into()),
1183        };
1184        let stamps = self.watch.stamps().borrow().clone();
1185        let batch = match stamp_preview::match_batch_prefix(&stamps.batches, prefix) {
1186            Ok(b) => b.clone(),
1187            Err(e) => return CommandStatus::Err(e),
1188        };
1189        let amount = match stamp_preview::parse_plur_amount(amount_str) {
1190            Ok(a) => a,
1191            Err(e) => return CommandStatus::Err(e),
1192        };
1193        match stamp_preview::topup_preview(&batch, amount, &chain) {
1194            Ok(p) => CommandStatus::Info(p.summary()),
1195            Err(e) => CommandStatus::Err(e),
1196        }
1197    }
1198
1199    /// `:dilute-preview <batch-prefix> <new-depth>` — pure math:
1200    /// halves per-chunk amount and TTL for each +1 in depth, doubles
1201    /// theoretical capacity.
1202    fn run_dilute_preview(&self, line: &str) -> CommandStatus {
1203        let parts: Vec<&str> = line.split_whitespace().collect();
1204        let (prefix, depth_str) = match parts.as_slice() {
1205            [_, prefix, depth, ..] => (*prefix, *depth),
1206            _ => {
1207                return CommandStatus::Err(
1208                    "usage: :dilute-preview <batch-prefix> <new-depth>".into(),
1209                );
1210            }
1211        };
1212        let new_depth: u8 = match depth_str.parse() {
1213            Ok(d) => d,
1214            Err(_) => {
1215                return CommandStatus::Err(format!("invalid depth {depth_str:?} (expected u8)"));
1216            }
1217        };
1218        let stamps = self.watch.stamps().borrow().clone();
1219        let batch = match stamp_preview::match_batch_prefix(&stamps.batches, prefix) {
1220            Ok(b) => b.clone(),
1221            Err(e) => return CommandStatus::Err(e),
1222        };
1223        match stamp_preview::dilute_preview(&batch, new_depth) {
1224            Ok(p) => CommandStatus::Info(p.summary()),
1225            Err(e) => CommandStatus::Err(e),
1226        }
1227    }
1228
1229    /// `:extend-preview <batch-prefix> <duration>` — accepts `30d`,
1230    /// `12h`, `90m`, `45s`, or plain seconds.
1231    fn run_extend_preview(&self, line: &str) -> CommandStatus {
1232        let parts: Vec<&str> = line.split_whitespace().collect();
1233        let (prefix, duration_str) = match parts.as_slice() {
1234            [_, prefix, duration, ..] => (*prefix, *duration),
1235            _ => {
1236                return CommandStatus::Err(
1237                    "usage: :extend-preview <batch-prefix> <duration>  (e.g. 30d, 12h, 90m, 45s, or plain seconds)".into(),
1238                );
1239            }
1240        };
1241        let extension_seconds = match stamp_preview::parse_duration_seconds(duration_str) {
1242            Ok(s) => s,
1243            Err(e) => return CommandStatus::Err(e),
1244        };
1245        let chain = match self.health_rx.borrow().chain_state.clone() {
1246            Some(c) => c,
1247            None => return CommandStatus::Err("chain state not loaded yet".into()),
1248        };
1249        let stamps = self.watch.stamps().borrow().clone();
1250        let batch = match stamp_preview::match_batch_prefix(&stamps.batches, prefix) {
1251            Ok(b) => b.clone(),
1252            Err(e) => return CommandStatus::Err(e),
1253        };
1254        match stamp_preview::extend_preview(&batch, extension_seconds, &chain) {
1255            Ok(p) => CommandStatus::Info(p.summary()),
1256            Err(e) => CommandStatus::Err(e),
1257        }
1258    }
1259
1260    /// `:probe-upload <batch-prefix>` — uploads one synthetic 4 KiB
1261    /// chunk to Bee and reports end-to-end latency. The cockpit is
1262    /// otherwise read-only; this is the deliberate exception. The
1263    /// chunk's payload is timestamp-randomised so each invocation
1264    /// fully exercises the upload + stamp path (no Bee dedup).
1265    ///
1266    /// Cost: one bucket increment on the chosen batch + the BZZ for
1267    /// one stamped chunk (`current_price` PLUR, fractions of a cent
1268    /// at typical prices). Returns immediately with a "started"
1269    /// notice; the actual outcome lands on the command bar via the
1270    /// async `cmd_status_tx` channel when Bee responds.
1271    fn run_probe_upload(&self, line: &str) -> CommandStatus {
1272        let parts: Vec<&str> = line.split_whitespace().collect();
1273        let prefix = match parts.as_slice() {
1274            [_, prefix, ..] => *prefix,
1275            _ => {
1276                return CommandStatus::Err(
1277                    "usage: :probe-upload <batch-prefix>  (uploads one synthetic 4 KiB chunk)"
1278                        .into(),
1279                );
1280            }
1281        };
1282        let stamps = self.watch.stamps().borrow().clone();
1283        let batch = match stamp_preview::match_batch_prefix(&stamps.batches, prefix) {
1284            Ok(b) => b.clone(),
1285            Err(e) => return CommandStatus::Err(e),
1286        };
1287        if !batch.usable {
1288            return CommandStatus::Err(format!(
1289                "batch {} is not usable yet (waiting on chain confirmation) — pick another",
1290                short_hex(&batch.batch_id.to_hex(), 8),
1291            ));
1292        }
1293        if batch.batch_ttl <= 0 {
1294            return CommandStatus::Err(format!(
1295                "batch {} is expired — pick another",
1296                short_hex(&batch.batch_id.to_hex(), 8),
1297            ));
1298        }
1299
1300        let api = self.api.clone();
1301        let tx = self.cmd_status_tx.clone();
1302        let batch_id = batch.batch_id;
1303        let batch_short = short_hex(&batch.batch_id.to_hex(), 8);
1304        let task_short = batch_short.clone();
1305        tokio::spawn(async move {
1306            let chunk = build_synthetic_probe_chunk();
1307            let started = Instant::now();
1308            let result = api.bee().file().upload_chunk(&batch_id, chunk, None).await;
1309            let elapsed_ms = started.elapsed().as_millis();
1310            let status = match result {
1311                Ok(res) => CommandStatus::Info(format!(
1312                    "probe-upload OK in {elapsed_ms}ms — batch {task_short}, ref {}",
1313                    short_hex(&res.reference.to_hex(), 8),
1314                )),
1315                Err(e) => CommandStatus::Err(format!(
1316                    "probe-upload FAILED after {elapsed_ms}ms — batch {task_short}: {e}"
1317                )),
1318            };
1319            let _ = tx.send(status);
1320        });
1321
1322        CommandStatus::Info(format!(
1323            "probe-upload to batch {batch_short} in flight — result will replace this line"
1324        ))
1325    }
1326
1327    /// `:upload-file <path> <batch-prefix>` — upload a single local
1328    /// file via `POST /bzz` and return the resulting Swarm reference.
1329    /// Single-file scope only: directories error with a hint to use
1330    /// the (yet-to-ship) collection mode. The 256 MiB ceiling protects
1331    /// the cockpit from accidentally streaming a multi-GB file through
1332    /// the event loop; operators with bigger uploads should use
1333    /// swarm-cli where the upload runs out-of-process.
1334    fn run_upload_file(&self, line: &str) -> CommandStatus {
1335        let parts: Vec<&str> = line.split_whitespace().collect();
1336        let (path_str, prefix) = match parts.as_slice() {
1337            [_, p, b, ..] => (*p, *b),
1338            _ => {
1339                return CommandStatus::Err("usage: :upload-file <path> <batch-prefix>".into());
1340            }
1341        };
1342        let path = std::path::PathBuf::from(path_str);
1343        let meta = match std::fs::metadata(&path) {
1344            Ok(m) => m,
1345            Err(e) => return CommandStatus::Err(format!("stat {path_str}: {e}")),
1346        };
1347        if meta.is_dir() {
1348            return CommandStatus::Err(format!(
1349                "{path_str} is a directory — :upload-file is single-file only (collection upload coming in a later release)"
1350            ));
1351        }
1352        const MAX_FILE_BYTES: u64 = 256 * 1024 * 1024;
1353        if meta.len() > MAX_FILE_BYTES {
1354            return CommandStatus::Err(format!(
1355                "{path_str} is {} — over the {}-MiB cockpit ceiling; use swarm-cli for larger uploads",
1356                meta.len(),
1357                MAX_FILE_BYTES / (1024 * 1024),
1358            ));
1359        }
1360        let stamps = self.watch.stamps().borrow().clone();
1361        let batch = match stamp_preview::match_batch_prefix(&stamps.batches, prefix) {
1362            Ok(b) => b.clone(),
1363            Err(e) => return CommandStatus::Err(e),
1364        };
1365        if !batch.usable {
1366            return CommandStatus::Err(format!(
1367                "batch {} is not usable yet (waiting on chain confirmation) — pick another",
1368                short_hex(&batch.batch_id.to_hex(), 8),
1369            ));
1370        }
1371        if batch.batch_ttl <= 0 {
1372            return CommandStatus::Err(format!(
1373                "batch {} is expired — pick another",
1374                short_hex(&batch.batch_id.to_hex(), 8),
1375            ));
1376        }
1377
1378        let api = self.api.clone();
1379        let tx = self.cmd_status_tx.clone();
1380        let batch_id = batch.batch_id;
1381        let batch_short = short_hex(&batch.batch_id.to_hex(), 8);
1382        let task_short = batch_short.clone();
1383        let file_size = meta.len();
1384        let name = path
1385            .file_name()
1386            .and_then(|n| n.to_str())
1387            .unwrap_or("")
1388            .to_string();
1389        let content_type = guess_content_type(&path);
1390        tokio::spawn(async move {
1391            let data = match tokio::fs::read(&path).await {
1392                Ok(b) => b,
1393                Err(e) => {
1394                    let _ = tx.send(CommandStatus::Err(format!("read {}: {e}", path.display())));
1395                    return;
1396                }
1397            };
1398            let started = Instant::now();
1399            let result = api
1400                .bee()
1401                .file()
1402                .upload_file(&batch_id, data, &name, &content_type, None)
1403                .await;
1404            let elapsed_ms = started.elapsed().as_millis();
1405            let status = match result {
1406                Ok(res) => CommandStatus::Info(format!(
1407                    "upload-file OK in {elapsed_ms}ms — {file_size}B → ref {} (batch {task_short})",
1408                    res.reference.to_hex(),
1409                )),
1410                Err(e) => CommandStatus::Err(format!(
1411                    "upload-file FAILED after {elapsed_ms}ms — batch {task_short}: {e}"
1412                )),
1413            };
1414            let _ = tx.send(status);
1415        });
1416
1417        CommandStatus::Info(format!(
1418            "upload-file ({file_size}B) to batch {batch_short} in flight — result will replace this line"
1419        ))
1420    }
1421
1422    /// `:upload-collection <dir> <batch-prefix>` — recursive
1423    /// directory upload via `POST /bzz` (tar). Hidden files /
1424    /// dirs (`.git`, `.env`, …) and symlinks are skipped; an
1425    /// `index.html` at the root auto-becomes the collection's
1426    /// default index. Caps: 256 MiB total, 10k entries — same
1427    /// reasoning as `:upload-file`'s 256-MiB single-file ceiling
1428    /// (keeps the cockpit's event loop responsive).
1429    fn run_upload_collection(&self, line: &str) -> CommandStatus {
1430        let parts: Vec<&str> = line.split_whitespace().collect();
1431        let (dir_str, prefix) = match parts.as_slice() {
1432            [_, d, b, ..] => (*d, *b),
1433            _ => {
1434                return CommandStatus::Err("usage: :upload-collection <dir> <batch-prefix>".into());
1435            }
1436        };
1437        let dir = std::path::PathBuf::from(dir_str);
1438        let walked = match crate::uploads::walk_dir(&dir) {
1439            Ok(w) => w,
1440            Err(e) => return CommandStatus::Err(format!("walk {dir_str}: {e}")),
1441        };
1442        if walked.entries.is_empty() {
1443            return CommandStatus::Err(format!(
1444                "{dir_str} contains no uploadable files (after skipping hidden + symlinks)"
1445            ));
1446        }
1447        let stamps = self.watch.stamps().borrow().clone();
1448        let batch = match stamp_preview::match_batch_prefix(&stamps.batches, prefix) {
1449            Ok(b) => b.clone(),
1450            Err(e) => return CommandStatus::Err(e),
1451        };
1452        if !batch.usable {
1453            return CommandStatus::Err(format!(
1454                "batch {} is not usable yet (waiting on chain confirmation) — pick another",
1455                short_hex(&batch.batch_id.to_hex(), 8),
1456            ));
1457        }
1458        if batch.batch_ttl <= 0 {
1459            return CommandStatus::Err(format!(
1460                "batch {} is expired — pick another",
1461                short_hex(&batch.batch_id.to_hex(), 8),
1462            ));
1463        }
1464
1465        let api = self.api.clone();
1466        let tx = self.cmd_status_tx.clone();
1467        let batch_id = batch.batch_id;
1468        let batch_short = short_hex(&batch.batch_id.to_hex(), 8);
1469        let task_short = batch_short.clone();
1470        let total_bytes = walked.total_bytes;
1471        let entry_count = walked.entries.len();
1472        let entries = walked.entries;
1473        let default_index = walked.default_index.clone();
1474        let dir_str_owned = dir_str.to_string();
1475        let default_index_for_msg = default_index.clone();
1476        tokio::spawn(async move {
1477            let opts = bee::api::CollectionUploadOptions {
1478                index_document: default_index,
1479                ..Default::default()
1480            };
1481            let started = Instant::now();
1482            let result = api
1483                .bee()
1484                .file()
1485                .upload_collection_entries(&batch_id, &entries, Some(&opts))
1486                .await;
1487            let elapsed_ms = started.elapsed().as_millis();
1488            let status = match result {
1489                Ok(res) => {
1490                    let idx = default_index_for_msg
1491                        .as_deref()
1492                        .map(|i| format!(" · index={i}"))
1493                        .unwrap_or_default();
1494                    CommandStatus::Info(format!(
1495                        "upload-collection OK in {elapsed_ms}ms — {entry_count} files, {total_bytes}B → ref {} (batch {task_short}){idx}",
1496                        res.reference.to_hex(),
1497                    ))
1498                }
1499                Err(e) => CommandStatus::Err(format!(
1500                    "upload-collection FAILED after {elapsed_ms}ms — {dir_str_owned} → batch {task_short}: {e}"
1501                )),
1502            };
1503            let _ = tx.send(status);
1504        });
1505
1506        let idx_note = walked
1507            .default_index
1508            .as_deref()
1509            .map(|i| format!(" · default index={i}"))
1510            .unwrap_or_default();
1511        CommandStatus::Info(format!(
1512            "upload-collection {entry_count} files ({total_bytes}B){idx_note} to batch {batch_short} in flight — result will replace this line"
1513        ))
1514    }
1515
1516    /// `:feed-probe <owner> <topic>` — fetch the latest update of a
1517    /// feed and surface its index, timestamp, and (when the payload
1518    /// is reference-shaped) the embedded Swarm reference. Async via
1519    /// `cmd_status_tx` because /feeds lookups can take 30-60s on a
1520    /// fresh feed (Bee's first lookup walks epoch indices).
1521    fn run_feed_probe(&self, line: &str) -> CommandStatus {
1522        let parts: Vec<&str> = line.split_whitespace().collect();
1523        let (owner_str, topic_str) = match parts.as_slice() {
1524            [_, o, t, ..] => (*o, *t),
1525            _ => {
1526                return CommandStatus::Err(
1527                    "usage: :feed-probe <owner> <topic>  (topic = 64-hex or arbitrary string)"
1528                        .into(),
1529                );
1530            }
1531        };
1532        let parsed = match crate::feed_probe::parse_args(owner_str, topic_str) {
1533            Ok(p) => p,
1534            Err(e) => return CommandStatus::Err(e),
1535        };
1536        let owner_short = short_hex(&parsed.owner.to_hex(), 8);
1537        let api = self.api.clone();
1538        let tx = self.cmd_status_tx.clone();
1539        tokio::spawn(async move {
1540            let started = Instant::now();
1541            let status = match crate::feed_probe::probe(api, parsed).await {
1542                Ok(r) => CommandStatus::Info(format!(
1543                    "{} ({}ms)",
1544                    r.summary(),
1545                    started.elapsed().as_millis()
1546                )),
1547                Err(e) => CommandStatus::Err(format!("feed-probe failed: {e}")),
1548            };
1549            let _ = tx.send(status);
1550        });
1551        CommandStatus::Info(format!(
1552            "feed-probe owner={owner_short} in flight — result will replace this line (first lookup can take 30-60s)"
1553        ))
1554    }
1555
1556    /// `:feed-timeline <owner> <topic> [N]` — walk the feed's
1557    /// history (newest first) and surface the entries on the S14
1558    /// screen. The walk runs async (10s of seconds for a fresh feed
1559    /// before the latest-index probe completes); the screen shows a
1560    /// spinner until the result lands. The optional `[N]` clamps the
1561    /// number of entries fetched (default 50, hard max 1000).
1562    fn run_feed_timeline(&mut self, line: &str) -> CommandStatus {
1563        let parts: Vec<&str> = line.split_whitespace().collect();
1564        let (owner_str, topic_str, n_arg) = match parts.as_slice() {
1565            [_, o, t] => (*o, *t, None),
1566            [_, o, t, n, ..] => (*o, *t, Some(*n)),
1567            _ => {
1568                return CommandStatus::Err(
1569                    "usage: :feed-timeline <owner> <topic> [N]  (default 50, hard max 1000)".into(),
1570                );
1571            }
1572        };
1573        let parsed = match crate::feed_probe::parse_args(owner_str, topic_str) {
1574            Ok(p) => p,
1575            Err(e) => return CommandStatus::Err(e),
1576        };
1577        let max_entries = match n_arg {
1578            None => crate::feed_timeline::DEFAULT_MAX_ENTRIES,
1579            Some(s) => match s.parse::<u64>() {
1580                Ok(n) if n > 0 => n,
1581                _ => return CommandStatus::Err(format!("invalid N: {s:?}")),
1582            },
1583        };
1584        // Switch to S14 + reset the screen state synchronously so the
1585        // operator sees the spinner immediately. The result lands via
1586        // feed_timeline_rx on a future tick.
1587        if let Some(idx) = SCREEN_NAMES.iter().position(|n| *n == "FeedTimeline") {
1588            self.current_screen = idx;
1589            if let Some(ft) = self
1590                .screens
1591                .get_mut(idx)
1592                .and_then(|s| s.as_any_mut())
1593                .and_then(|a| a.downcast_mut::<FeedTimeline>())
1594            {
1595                let label = format!(
1596                    "owner=0x{} · topic={} · N={max_entries}",
1597                    short_hex(&parsed.owner.to_hex(), 8),
1598                    short_hex(&parsed.topic.to_hex(), 8),
1599                );
1600                ft.set_loading(label);
1601            }
1602        }
1603        let api = self.api.clone();
1604        let tx = self.feed_timeline_tx.clone();
1605        tokio::spawn(async move {
1606            let msg = match crate::feed_timeline::walk(api, parsed.owner, parsed.topic, max_entries)
1607                .await
1608            {
1609                Ok(t) => FeedTimelineMessage::Loaded(t),
1610                Err(e) => FeedTimelineMessage::Failed(e),
1611            };
1612            let _ = tx.send(msg);
1613        });
1614        CommandStatus::Info(format!(
1615            "feed-timeline N={max_entries} in flight — switching to S14 (first lookup can take 30-60s)"
1616        ))
1617    }
1618
1619    /// `:hash <path>` — Swarm reference of a local file or directory,
1620    /// computed offline. Useful before paying for an upload to confirm
1621    /// the content's address-of-record matches what the dApp already
1622    /// committed to (the swarm-cli `hash` workflow).
1623    fn run_hash(&self, line: &str) -> CommandStatus {
1624        let parts: Vec<&str> = line.split_whitespace().collect();
1625        let path = match parts.as_slice() {
1626            [_, p, ..] => *p,
1627            _ => {
1628                return CommandStatus::Err(
1629                    "usage: :hash <path>  (file or directory; computed locally)".into(),
1630                );
1631            }
1632        };
1633        match utility_verbs::hash_path(path) {
1634            Ok(r) => CommandStatus::Info(format!("hash {path}: {r}")),
1635            Err(e) => CommandStatus::Err(format!("hash failed: {e}")),
1636        }
1637    }
1638
1639    /// `:cid <ref> [manifest|feed]` — re-encode a 32-byte Swarm ref as
1640    /// a multibase CID string for ENS / IPFS-gateway integration. Kind
1641    /// defaults to manifest.
1642    fn run_cid(&self, line: &str) -> CommandStatus {
1643        let parts: Vec<&str> = line.split_whitespace().collect();
1644        let (ref_hex, kind_arg) = match parts.as_slice() {
1645            [_, r, k, ..] => (*r, Some(*k)),
1646            [_, r] => (*r, None),
1647            _ => {
1648                return CommandStatus::Err(
1649                    "usage: :cid <ref> [manifest|feed]  (default manifest)".into(),
1650                );
1651            }
1652        };
1653        let kind = match utility_verbs::parse_cid_kind(kind_arg) {
1654            Ok(k) => k,
1655            Err(e) => return CommandStatus::Err(e),
1656        };
1657        match utility_verbs::cid_for_ref(ref_hex, kind) {
1658            Ok(cid) => CommandStatus::Info(format!("cid: {cid}")),
1659            Err(e) => CommandStatus::Err(format!("cid failed: {e}")),
1660        }
1661    }
1662
1663    /// `:depth-table` — print the canonical depth → effective-bytes
1664    /// table the rest of the cockpit's economics math is anchored on.
1665    /// Result lands in the temp dir as a one-shot file because the
1666    /// command bar can't render an 18-row table.
1667    fn run_depth_table(&self) -> CommandStatus {
1668        let body = utility_verbs::depth_table();
1669        let path = std::env::temp_dir().join("bee-tui-depth-table.txt");
1670        match std::fs::write(&path, &body) {
1671            Ok(()) => CommandStatus::Info(format!("depth table → {}", path.display())),
1672            Err(e) => CommandStatus::Err(format!("depth-table write failed: {e}")),
1673        }
1674    }
1675
1676    /// `:gsoc-mine <overlay> <identifier>` — pure CPU work that finds a
1677    /// `PrivateKey` whose SOC at `(identifier, owner)` lands close to
1678    /// the supplied overlay. Blocks the event loop briefly (≤ a few
1679    /// seconds typical) — acceptable for an interactive verb.
1680    fn run_gsoc_mine(&self, line: &str) -> CommandStatus {
1681        let parts: Vec<&str> = line.split_whitespace().collect();
1682        let (overlay, ident) = match parts.as_slice() {
1683            [_, o, i, ..] => (*o, *i),
1684            _ => {
1685                return CommandStatus::Err(
1686                    "usage: :gsoc-mine <overlay-hex> <identifier>  (CPU work, no network)".into(),
1687                );
1688            }
1689        };
1690        match utility_verbs::gsoc_mine_for(overlay, ident) {
1691            Ok(out) => CommandStatus::Info(out.replace('\n', " · ")),
1692            Err(e) => CommandStatus::Err(format!("gsoc-mine failed: {e}")),
1693        }
1694    }
1695
1696    /// `:manifest <ref>` — fetch the chunk + open S12 with a tree
1697    /// browser rooted on it. Async; the load lands on the screen via
1698    /// its own mpsc fetch channel, not via `cmd_status_tx`.
1699    fn run_manifest(&mut self, line: &str) -> CommandStatus {
1700        let parts: Vec<&str> = line.split_whitespace().collect();
1701        let ref_arg = match parts.as_slice() {
1702            [_, r, ..] => *r,
1703            _ => {
1704                return CommandStatus::Err(
1705                    "usage: :manifest <ref>  (32-byte hex reference)".into(),
1706                );
1707            }
1708        };
1709        let reference = match bee::swarm::Reference::from_hex(ref_arg.trim()) {
1710            Ok(r) => r,
1711            Err(e) => return CommandStatus::Err(format!("manifest: bad ref: {e}")),
1712        };
1713        // Find the Manifest screen and ask it to load. Index lookup
1714        // by SCREEN_NAMES so future re-orders don't bit-rot.
1715        let idx = match SCREEN_NAMES.iter().position(|n| *n == "Manifest") {
1716            Some(i) => i,
1717            None => {
1718                return CommandStatus::Err("internal: Manifest screen not registered".into());
1719            }
1720        };
1721        let screen = self
1722            .screens
1723            .get_mut(idx)
1724            .and_then(|s| s.as_any_mut())
1725            .and_then(|a| a.downcast_mut::<Manifest>());
1726        let Some(manifest) = screen else {
1727            return CommandStatus::Err("internal: failed to access Manifest screen".into());
1728        };
1729        manifest.load(reference);
1730        self.current_screen = idx;
1731        CommandStatus::Info(format!("loading manifest {}", short_hex(ref_arg, 8)))
1732    }
1733
1734    /// `:inspect <ref>` — universal "what is this thing?" verb.
1735    /// Fetches one chunk and tries `MantarayNode::unmarshal` to
1736    /// distinguish manifest from raw. On manifest, jumps to S12 with
1737    /// the tree opened; on raw, prints a one-line summary to the
1738    /// command-status row. Result delivered via the async cmd-status
1739    /// channel.
1740    fn run_inspect(&self, line: &str) -> CommandStatus {
1741        let parts: Vec<&str> = line.split_whitespace().collect();
1742        let ref_arg = match parts.as_slice() {
1743            [_, r, ..] => *r,
1744            _ => {
1745                return CommandStatus::Err("usage: :inspect <ref>  (32-byte hex reference)".into());
1746            }
1747        };
1748        let reference = match bee::swarm::Reference::from_hex(ref_arg.trim()) {
1749            Ok(r) => r,
1750            Err(e) => return CommandStatus::Err(format!("inspect: bad ref: {e}")),
1751        };
1752        let api = self.api.clone();
1753        let tx = self.cmd_status_tx.clone();
1754        let label = short_hex(ref_arg, 8);
1755        let label_for_task = label.clone();
1756        tokio::spawn(async move {
1757            let result = manifest_walker::inspect(api, reference).await;
1758            let status = match result {
1759                InspectResult::Manifest { node, bytes_len } => CommandStatus::Info(format!(
1760                    "inspect {label_for_task}: manifest · {bytes_len} bytes · {} forks (jump to :manifest {label_for_task})",
1761                    node.forks.len(),
1762                )),
1763                InspectResult::RawChunk { bytes_len } => CommandStatus::Info(format!(
1764                    "inspect {label_for_task}: raw chunk · {bytes_len} bytes · not a manifest"
1765                )),
1766                InspectResult::Error(e) => {
1767                    CommandStatus::Err(format!("inspect {label_for_task} failed: {e}"))
1768                }
1769            };
1770            let _ = tx.send(status);
1771        });
1772        CommandStatus::Info(format!(
1773            "inspecting {label} — result will replace this line"
1774        ))
1775    }
1776
1777    /// `:durability-check <ref>` — walk the chunk graph rooted at
1778    /// `<ref>` and record the result on the S13 Watchlist screen.
1779    /// Async; the immediate command-status shows "in flight", the
1780    /// final summary lands when the walk completes.
1781    ///
1782    /// On manifest references the walk is recursive (root + every
1783    /// fork's `self_address`); on raw chunks it's just the single
1784    /// fetch. Either way, the cockpit jumps to S13 so the operator
1785    /// sees the running history while the new check completes.
1786    /// `:grantees-list <ref>` — fetch `GET /grantee/{ref}` and print
1787    /// the registered public key list. Read-only; pairs cleanly with
1788    /// `:inspect`. A full S16 ACT Grantees screen with create/patch
1789    /// is on the v1.8+ roadmap; this verb is the read-side
1790    /// foundation operators need today.
1791    fn run_grantees_list(&self, line: &str) -> CommandStatus {
1792        let parts: Vec<&str> = line.split_whitespace().collect();
1793        let ref_arg = match parts.as_slice() {
1794            [_, r, ..] => *r,
1795            _ => return CommandStatus::Err("usage: :grantees-list <ref>".into()),
1796        };
1797        let reference = match bee::swarm::Reference::from_hex(ref_arg.trim()) {
1798            Ok(r) => r,
1799            Err(e) => return CommandStatus::Err(format!("grantees-list: bad ref: {e}")),
1800        };
1801        let api = self.api.clone();
1802        let tx = self.cmd_status_tx.clone();
1803        let label = short_hex(ref_arg, 8);
1804        let label_for_task = label.clone();
1805        tokio::spawn(async move {
1806            let status = match api.bee().api().get_grantees(&reference).await {
1807                Ok(list) => {
1808                    if list.is_empty() {
1809                        CommandStatus::Info(format!(
1810                            "grantees-list {label_for_task}: no grantees registered"
1811                        ))
1812                    } else {
1813                        let preview: Vec<String> =
1814                            list.iter().take(3).map(|p| short_hex(p, 12)).collect();
1815                        let suffix = if list.len() > 3 {
1816                            format!(" (+{} more)", list.len() - 3)
1817                        } else {
1818                            String::new()
1819                        };
1820                        CommandStatus::Info(format!(
1821                            "grantees-list {label_for_task}: {} grantee(s) — {}{suffix}",
1822                            list.len(),
1823                            preview.join(", "),
1824                        ))
1825                    }
1826                }
1827                Err(e) => CommandStatus::Err(format!("grantees-list {label_for_task} failed: {e}")),
1828            };
1829            let _ = tx.send(status);
1830        });
1831        CommandStatus::Info(format!(
1832            "grantees-list {label} in flight — result will replace this line"
1833        ))
1834    }
1835
1836    fn run_durability_check(&mut self, line: &str) -> CommandStatus {
1837        let parts: Vec<&str> = line.split_whitespace().collect();
1838        let ref_arg = match parts.as_slice() {
1839            [_, r, ..] => *r,
1840            _ => {
1841                return CommandStatus::Err(
1842                    "usage: :durability-check <ref>  (32-byte hex reference)".into(),
1843                );
1844            }
1845        };
1846        let reference = match bee::swarm::Reference::from_hex(ref_arg.trim()) {
1847            Ok(r) => r,
1848            Err(e) => {
1849                return CommandStatus::Err(format!("durability-check: bad ref: {e}"));
1850            }
1851        };
1852        // Jump to S13 so the operator sees the existing history while
1853        // the new walk completes.
1854        if let Some(idx) = SCREEN_NAMES.iter().position(|n| *n == "Watchlist") {
1855            self.current_screen = idx;
1856        }
1857        let api = self.api.clone();
1858        let tx = self.cmd_status_tx.clone();
1859        let watchlist_tx = self.durability_tx.clone();
1860        let label = short_hex(ref_arg, 8);
1861        let label_for_task = label.clone();
1862        let opts = self.durability_check_options();
1863        tokio::spawn(async move {
1864            let result = durability::check_with_options(api, reference, opts).await;
1865            let summary = result.summary();
1866            let _ = watchlist_tx.send(result);
1867            let _ = tx.send(if summary.contains("UNHEALTHY") {
1868                CommandStatus::Err(summary)
1869            } else {
1870                CommandStatus::Info(summary)
1871            });
1872        });
1873        CommandStatus::Info(format!(
1874            "durability-check {label_for_task} in flight — see S13 Watchlist for the running history"
1875        ))
1876    }
1877
1878    /// Read `[durability]` from config and convert to a
1879    /// `CheckOptions`. Cheap; called per-walk so config edits picked
1880    /// up via a future `:context` switch take effect on the next
1881    /// check without a cockpit restart.
1882    fn durability_check_options(&self) -> durability::CheckOptions {
1883        durability::CheckOptions {
1884            bmt_verify: true,
1885            swarmscan_url: if self.config.durability.swarmscan_check {
1886                Some(self.config.durability.swarmscan_url.clone())
1887            } else {
1888                None
1889            },
1890        }
1891    }
1892
1893    /// `:watch-ref <ref> [interval-secs]` — start a daemon loop that
1894    /// runs `:durability-check` on `<ref>` every `interval-secs`
1895    /// seconds. Each run's result lands in S13 Watchlist via the
1896    /// existing `durability_tx` channel. The cockpit's `root_cancel`
1897    /// triggers shutdown on quit; `:watch-ref-stop [ref]` triggers
1898    /// shutdown earlier. Re-issuing `:watch-ref` for a ref already
1899    /// being watched cancels the prior daemon and starts a fresh one
1900    /// (so the operator can change the interval without an explicit
1901    /// stop).
1902    fn run_watch_ref(&mut self, line: &str) -> CommandStatus {
1903        let parts: Vec<&str> = line.split_whitespace().collect();
1904        let (ref_arg, interval_arg) = match parts.as_slice() {
1905            [_, r] => (*r, None),
1906            [_, r, i, ..] => (*r, Some(*i)),
1907            _ => {
1908                return CommandStatus::Err(
1909                    "usage: :watch-ref <ref> [interval-secs]  (default 60s)".into(),
1910                );
1911            }
1912        };
1913        let reference = match bee::swarm::Reference::from_hex(ref_arg.trim()) {
1914            Ok(r) => r,
1915            Err(e) => return CommandStatus::Err(format!("watch-ref: bad ref: {e}")),
1916        };
1917        let interval_secs = match interval_arg {
1918            None => 60u64,
1919            Some(s) => match s.parse::<u64>() {
1920                Ok(n) if (10..=86_400).contains(&n) => n,
1921                Ok(n) => {
1922                    return CommandStatus::Err(format!(
1923                        "watch-ref: interval {n}s out of range (10..=86400)"
1924                    ));
1925                }
1926                Err(_) => return CommandStatus::Err(format!("watch-ref: invalid interval: {s:?}")),
1927            },
1928        };
1929        let key = reference.to_hex();
1930        // If a daemon is already running for this ref, cancel it
1931        // first so we don't double-fire checks.
1932        if let Some(prev) = self.watch_refs.remove(&key) {
1933            prev.cancel();
1934        }
1935        let cancel = self.root_cancel.child_token();
1936        self.watch_refs.insert(key.clone(), cancel.clone());
1937
1938        let api = self.api.clone();
1939        let watchlist_tx = self.durability_tx.clone();
1940        let label = short_hex(ref_arg, 8);
1941        let label_for_task = label.clone();
1942        let opts = self.durability_check_options();
1943        tokio::spawn(async move {
1944            let interval = std::time::Duration::from_secs(interval_secs);
1945            loop {
1946                let result =
1947                    durability::check_with_options(api.clone(), reference.clone(), opts.clone())
1948                        .await;
1949                let _ = watchlist_tx.send(result);
1950                tokio::select! {
1951                    _ = tokio::time::sleep(interval) => {}
1952                    _ = cancel.cancelled() => return,
1953                }
1954            }
1955        });
1956
1957        CommandStatus::Info(format!(
1958            "watch-ref {label_for_task} started — re-checking every {interval_secs}s; results in S13 Watchlist"
1959        ))
1960    }
1961
1962    /// `:watch-ref-stop [ref]` — cancel a running `:watch-ref`
1963    /// daemon. With no arg, cancels every active daemon; with a
1964    /// `<ref>` arg, cancels only the matching one. The daemon's
1965    /// tokio task observes the cancel on its next iteration
1966    /// boundary (i.e. up to `interval-secs` later); the App's
1967    /// hashmap entry is removed immediately.
1968    fn run_watch_ref_stop(&mut self, line: &str) -> CommandStatus {
1969        let parts: Vec<&str> = line.split_whitespace().collect();
1970        match parts.as_slice() {
1971            [_] => {
1972                let n = self.watch_refs.len();
1973                for (_, c) in self.watch_refs.drain() {
1974                    c.cancel();
1975                }
1976                CommandStatus::Info(format!("watch-ref-stop: cancelled {n} active daemon(s)"))
1977            }
1978            [_, r, ..] => {
1979                let reference = match bee::swarm::Reference::from_hex(r.trim()) {
1980                    Ok(r) => r,
1981                    Err(e) => return CommandStatus::Err(format!("watch-ref-stop: bad ref: {e}")),
1982                };
1983                let key = reference.to_hex();
1984                match self.watch_refs.remove(&key) {
1985                    Some(c) => {
1986                        c.cancel();
1987                        CommandStatus::Info(format!(
1988                            "watch-ref-stop: cancelled daemon for {}",
1989                            short_hex(r, 8)
1990                        ))
1991                    }
1992                    None => CommandStatus::Err(format!(
1993                        "watch-ref-stop: no daemon running for {}",
1994                        short_hex(r, 8)
1995                    )),
1996                }
1997            }
1998            _ => CommandStatus::Err("usage: :watch-ref-stop [ref]  (omit ref to stop all)".into()),
1999        }
2000    }
2001
2002    /// `:pubsub-pss <topic>` — open a PSS subscription on `<topic>`
2003    /// and surface every received message on the S15 Pubsub screen.
2004    /// Topic accepts the same forms as `:feed-probe`: 64-hex literal
2005    /// or arbitrary string (keccak256-hashed via
2006    /// `Topic::from_string`). Re-issuing for an already-watched
2007    /// topic refuses with a clear error so duplicate sockets don't
2008    /// silently pile up — operator must `:pubsub-stop <sub-id>` first.
2009    fn run_pubsub_pss(&mut self, line: &str) -> CommandStatus {
2010        let parts: Vec<&str> = line.split_whitespace().collect();
2011        let topic_str = match parts.as_slice() {
2012            [_, t, ..] => *t,
2013            _ => return CommandStatus::Err("usage: :pubsub-pss <topic>".into()),
2014        };
2015        // Reuse :feed-probe's topic parser (string OR 64-hex literal).
2016        let parsed = match crate::feed_probe::parse_args(
2017            "0x0000000000000000000000000000000000000000",
2018            topic_str,
2019        ) {
2020            Ok(p) => p,
2021            Err(e) => return CommandStatus::Err(format!("pubsub-pss: {e}")),
2022        };
2023        let topic = parsed.topic;
2024        let sub_id = crate::pubsub::pss_sub_id(&topic);
2025        if self.pubsub_subs.contains_key(&sub_id) {
2026            return CommandStatus::Err(format!(
2027                "pubsub-pss: already subscribed to {sub_id} (use :pubsub-stop {sub_id} first)"
2028            ));
2029        }
2030        let cancel = self.root_cancel.child_token();
2031        self.pubsub_subs.insert(sub_id.clone(), cancel.clone());
2032        self.jump_to_pubsub_screen();
2033        let api = self.api.clone();
2034        let tx = self.pubsub_msg_tx.clone();
2035        let status_tx = self.cmd_status_tx.clone();
2036        let sub_id_for_task = sub_id.clone();
2037        let history = self.pubsub_history.clone();
2038        tokio::spawn(async move {
2039            if let Err(e) = crate::pubsub::spawn_pss_watcher(api, topic, cancel, tx, history).await
2040            {
2041                let _ = status_tx.send(CommandStatus::Err(format!(
2042                    "pubsub-pss {sub_id_for_task}: {e}"
2043                )));
2044            }
2045        });
2046        CommandStatus::Info(format!("pubsub-pss subscribed: {sub_id}"))
2047    }
2048
2049    /// `:pubsub-gsoc <owner> <identifier>` — open a GSOC subscription
2050    /// on the SOC keyed by `(owner, identifier)`. Both args accept
2051    /// `0x`-prefixed or bare hex (40 chars for owner, 64 chars for
2052    /// identifier).
2053    fn run_pubsub_gsoc(&mut self, line: &str) -> CommandStatus {
2054        let parts: Vec<&str> = line.split_whitespace().collect();
2055        let (owner_str, id_str) = match parts.as_slice() {
2056            [_, o, i, ..] => (*o, *i),
2057            _ => return CommandStatus::Err("usage: :pubsub-gsoc <owner> <identifier>".into()),
2058        };
2059        let owner = match bee::swarm::EthAddress::from_hex(owner_str.trim()) {
2060            Ok(o) => o,
2061            Err(e) => return CommandStatus::Err(format!("pubsub-gsoc: bad owner: {e}")),
2062        };
2063        let identifier = match bee::swarm::Identifier::from_hex(id_str.trim()) {
2064            Ok(i) => i,
2065            Err(e) => return CommandStatus::Err(format!("pubsub-gsoc: bad identifier: {e}")),
2066        };
2067        let sub_id = crate::pubsub::gsoc_sub_id(&owner, &identifier);
2068        if self.pubsub_subs.contains_key(&sub_id) {
2069            return CommandStatus::Err(format!(
2070                "pubsub-gsoc: already subscribed to {sub_id} (use :pubsub-stop first)"
2071            ));
2072        }
2073        let cancel = self.root_cancel.child_token();
2074        self.pubsub_subs.insert(sub_id.clone(), cancel.clone());
2075        self.jump_to_pubsub_screen();
2076        let api = self.api.clone();
2077        let tx = self.pubsub_msg_tx.clone();
2078        let status_tx = self.cmd_status_tx.clone();
2079        let sub_id_for_task = sub_id.clone();
2080        let history = self.pubsub_history.clone();
2081        tokio::spawn(async move {
2082            if let Err(e) =
2083                crate::pubsub::spawn_gsoc_watcher(api, owner, identifier, cancel, tx, history).await
2084            {
2085                let _ = status_tx.send(CommandStatus::Err(format!(
2086                    "pubsub-gsoc {sub_id_for_task}: {e}"
2087                )));
2088            }
2089        });
2090        CommandStatus::Info(format!("pubsub-gsoc subscribed: {sub_id}"))
2091    }
2092
2093    /// `:pubsub-stop [sub-id]` — cancel pubsub subscriptions. With
2094    /// no arg, cancels every active subscription; with a `<sub-id>`
2095    /// arg (`pss:...` or `gsoc:...`), cancels just that one.
2096    fn run_pubsub_stop(&mut self, line: &str) -> CommandStatus {
2097        let parts: Vec<&str> = line.split_whitespace().collect();
2098        match parts.as_slice() {
2099            [_] => {
2100                let n = self.pubsub_subs.len();
2101                for (_, c) in self.pubsub_subs.drain() {
2102                    c.cancel();
2103                }
2104                CommandStatus::Info(format!("pubsub-stop: cancelled {n} subscription(s)"))
2105            }
2106            [_, id, ..] => match self.pubsub_subs.remove(*id) {
2107                Some(c) => {
2108                    c.cancel();
2109                    CommandStatus::Info(format!("pubsub-stop: cancelled {id}"))
2110                }
2111                None => CommandStatus::Err(format!("pubsub-stop: no active subscription {id}")),
2112            },
2113            _ => CommandStatus::Err("usage: :pubsub-stop [sub-id]".into()),
2114        }
2115    }
2116
2117    /// Helper used by :pubsub-pss / :pubsub-gsoc to jump to S15 so
2118    /// the operator sees their incoming messages immediately.
2119    fn jump_to_pubsub_screen(&mut self) {
2120        if let Some(idx) = SCREEN_NAMES.iter().position(|n| *n == "Pubsub") {
2121            self.current_screen = idx;
2122        }
2123    }
2124
2125    /// `:pubsub-filter <substring>` — show only S15 rows whose
2126    /// channel hex or smart-preview contains the given substring.
2127    /// Case-insensitive; underlying ring still receives every
2128    /// message (filtering is presentation-only).
2129    fn run_pubsub_filter(&mut self, line: &str) -> CommandStatus {
2130        let parts: Vec<&str> = line.splitn(2, char::is_whitespace).collect();
2131        let needle = match parts.as_slice() {
2132            [_, rest] => rest.trim().to_string(),
2133            _ => return CommandStatus::Err("usage: :pubsub-filter <substring>".into()),
2134        };
2135        if needle.is_empty() {
2136            return CommandStatus::Err("usage: :pubsub-filter <substring>".into());
2137        }
2138        if let Some(idx) = SCREEN_NAMES.iter().position(|n| *n == "Pubsub") {
2139            if let Some(ps) = self
2140                .screens
2141                .get_mut(idx)
2142                .and_then(|s| s.as_any_mut())
2143                .and_then(|a| a.downcast_mut::<Pubsub>())
2144            {
2145                ps.set_filter(Some(needle.clone()));
2146            }
2147            self.current_screen = idx;
2148        }
2149        CommandStatus::Info(format!("pubsub-filter: showing rows containing {needle:?}"))
2150    }
2151
2152    /// `:pubsub-filter-clear` — remove the active S15 filter.
2153    fn run_pubsub_filter_clear(&mut self) -> CommandStatus {
2154        if let Some(idx) = SCREEN_NAMES.iter().position(|n| *n == "Pubsub") {
2155            if let Some(ps) = self
2156                .screens
2157                .get_mut(idx)
2158                .and_then(|s| s.as_any_mut())
2159                .and_then(|a| a.downcast_mut::<Pubsub>())
2160            {
2161                ps.set_filter(None);
2162            }
2163        }
2164        CommandStatus::Info("pubsub-filter-clear: filter removed".into())
2165    }
2166
2167    /// `:pubsub-replay <path>` — load a previously-written pubsub
2168    /// history JSONL file and push its messages onto the S15 timeline
2169    /// so the operator can browse a past session without an active
2170    /// subscription. Caps at MAX_MESSAGES; bad lines are skipped with
2171    /// a warn log. Replay does not start any watchers.
2172    fn run_pubsub_replay(&mut self, line: &str) -> CommandStatus {
2173        let parts: Vec<&str> = line.split_whitespace().collect();
2174        let path_str = match parts.as_slice() {
2175            [_, p, ..] => *p,
2176            _ => return CommandStatus::Err("usage: :pubsub-replay <path>".into()),
2177        };
2178        let path = std::path::PathBuf::from(path_str);
2179        self.jump_to_pubsub_screen();
2180        let tx = self.pubsub_msg_tx.clone();
2181        let status_tx = self.cmd_status_tx.clone();
2182        tokio::spawn(async move {
2183            match crate::pubsub::replay_history_file(&path).await {
2184                Ok(msgs) => {
2185                    let n = msgs.len();
2186                    // Push oldest → newest so record() ends with newest at front.
2187                    for m in msgs {
2188                        let _ = tx.send(m);
2189                    }
2190                    let _ = status_tx.send(CommandStatus::Info(format!(
2191                        "pubsub-replay: loaded {n} message(s)"
2192                    )));
2193                }
2194                Err(e) => {
2195                    let _ = status_tx.send(CommandStatus::Err(format!("pubsub-replay: {e}")));
2196                }
2197            }
2198        });
2199        CommandStatus::Info(format!("pubsub-replay: loading {path_str}…"))
2200    }
2201
2202    /// `:pss-target <overlay>` — Bee's `/pss/send` accepts at most a
2203    /// 4-hex-char target prefix. This verb extracts those four chars
2204    /// from a full overlay so dApp authors don't have to re-derive
2205    /// the rule.
2206    fn run_pss_target(&self, line: &str) -> CommandStatus {
2207        let parts: Vec<&str> = line.split_whitespace().collect();
2208        let overlay = match parts.as_slice() {
2209            [_, o, ..] => *o,
2210            _ => {
2211                return CommandStatus::Err(
2212                    "usage: :pss-target <overlay-hex>  (returns first 4 hex chars)".into(),
2213                );
2214            }
2215        };
2216        match utility_verbs::pss_target_for(overlay) {
2217            Ok(prefix) => CommandStatus::Info(format!("pss target prefix: {prefix}")),
2218            Err(e) => CommandStatus::Err(format!("pss-target failed: {e}")),
2219        }
2220    }
2221
2222    /// `:price` — fire a one-shot fetch of the xBZZ → USD spot
2223    /// price from Swarm's public tokenservice. Async via
2224    /// cmd_status_tx. The cockpit doesn't auto-poll the price —
2225    /// operators ask for it when they want to think about
2226    /// economics in dollars.
2227    fn run_price(&self) -> CommandStatus {
2228        let tx = self.cmd_status_tx.clone();
2229        tokio::spawn(async move {
2230            let status = match economics_oracle::fetch_xbzz_price().await {
2231                Ok(p) => CommandStatus::Info(p.summary()),
2232                Err(e) => CommandStatus::Err(format!("price: {e}")),
2233            };
2234            let _ = tx.send(status);
2235        });
2236        CommandStatus::Info("price: querying tokenservice.ethswarm.org…".into())
2237    }
2238
2239    /// `:basefee` — fire JSON-RPC calls against the configured
2240    /// Gnosis RPC endpoint (`[economics].gnosis_rpc_url`) for the
2241    /// pending block's basefee + the network's expected tip. Async.
2242    fn run_basefee(&self) -> CommandStatus {
2243        let url = match self.config.economics.gnosis_rpc_url.clone() {
2244            Some(u) => u,
2245            None => {
2246                return CommandStatus::Err(
2247                    "basefee: set [economics].gnosis_rpc_url in config.toml (typically the same URL as Bee's --blockchain-rpc-endpoint)"
2248                        .into(),
2249                );
2250            }
2251        };
2252        let tx = self.cmd_status_tx.clone();
2253        tokio::spawn(async move {
2254            let status = match economics_oracle::fetch_gnosis_gas(&url).await {
2255                Ok(g) => CommandStatus::Info(g.summary()),
2256                Err(e) => CommandStatus::Err(format!("basefee: {e}")),
2257            };
2258            let _ = tx.send(status);
2259        });
2260        CommandStatus::Info("basefee: querying gnosis RPC…".into())
2261    }
2262
2263    /// `:config-doctor` — audit the operator's `bee.yaml` against
2264    /// the deprecation list ported from swarm-desktop's
2265    /// `migration.ts`. Read-only — the cockpit never modifies the
2266    /// operator's config. Report lands as a temp file the operator
2267    /// can review and apply by hand.
2268    fn run_config_doctor(&self) -> CommandStatus {
2269        let path = match self.config.bee.as_ref().map(|b| b.config.clone()) {
2270            Some(p) => p,
2271            None => {
2272                return CommandStatus::Err(
2273                    "config-doctor: no [bee].config in config.toml (or pass --bee-config) — point bee-tui at the bee.yaml you want audited"
2274                        .into(),
2275                );
2276            }
2277        };
2278        let report = match config_doctor::audit(&path) {
2279            Ok(r) => r,
2280            Err(e) => return CommandStatus::Err(format!("config-doctor: {e}")),
2281        };
2282        let secs = SystemTime::now()
2283            .duration_since(UNIX_EPOCH)
2284            .map(|d| d.as_secs())
2285            .unwrap_or(0);
2286        let out_path = std::env::temp_dir().join(format!("bee-tui-config-doctor-{secs}.txt"));
2287        if let Err(e) = std::fs::write(&out_path, report.render()) {
2288            return CommandStatus::Err(format!("config-doctor write {}: {e}", out_path.display()));
2289        }
2290        CommandStatus::Info(format!("{} → {}", report.summary(), out_path.display()))
2291    }
2292
2293    /// `:check-version` — fire a GitHub `releases/latest` lookup for
2294    /// `ethersphere/bee` and pair the result with the version the
2295    /// local Bee reported on `/health`. Both fetches happen in the
2296    /// spawned task (`/health` for the running version, GitHub for
2297    /// the latest); the watch hub's `HealthSnapshot` carries
2298    /// `/status` data, not the structured Bee version, so we hit
2299    /// `/health` explicitly here.
2300    fn run_check_version(&self) -> CommandStatus {
2301        let api = self.api.clone();
2302        let tx = self.cmd_status_tx.clone();
2303        tokio::spawn(async move {
2304            let running = api.bee().debug().health().await.ok().map(|h| h.version);
2305            let status = match version_check::check_latest(running).await {
2306                Ok(v) => CommandStatus::Info(v.summary()),
2307                Err(e) => CommandStatus::Err(format!("check-version failed: {e}")),
2308            };
2309            let _ = tx.send(status);
2310        });
2311        CommandStatus::Info("check-version: querying github.com/ethersphere/bee…".into())
2312    }
2313
2314    /// `:plan-batch <batch-prefix> [usage-thr] [ttl-thr] [extra-depth]` —
2315    /// runs beekeeper-stamper's `Set` algorithm read-only and tells
2316    /// the operator whether the batch needs topup, dilute, both, or
2317    /// nothing — plus the BZZ cost. Defaults: usage 0.85, TTL 24h,
2318    /// extra depth +2 (cross-ecosystem convention).
2319    fn run_plan_batch(&self, line: &str) -> CommandStatus {
2320        let parts: Vec<&str> = line.split_whitespace().collect();
2321        let prefix = match parts.as_slice() {
2322            [_, prefix, ..] => *prefix,
2323            _ => {
2324                return CommandStatus::Err(
2325                    "usage: :plan-batch <batch-prefix> [usage-thr] [ttl-thr] [extra-depth]".into(),
2326                );
2327            }
2328        };
2329        let usage_thr = match parts.get(2) {
2330            Some(s) => match s.parse::<f64>() {
2331                Ok(v) => v,
2332                Err(_) => {
2333                    return CommandStatus::Err(format!(
2334                        "invalid usage-thr {s:?} (expected float in [0,1], default 0.85)"
2335                    ));
2336                }
2337            },
2338            None => stamp_preview::DEFAULT_USAGE_THRESHOLD,
2339        };
2340        let ttl_thr = match parts.get(3) {
2341            Some(s) => match stamp_preview::parse_duration_seconds(s) {
2342                Ok(v) => v,
2343                Err(e) => return CommandStatus::Err(format!("ttl-thr: {e}")),
2344            },
2345            None => stamp_preview::DEFAULT_TTL_THRESHOLD_SECONDS,
2346        };
2347        let extra_depth = match parts.get(4) {
2348            Some(s) => match s.parse::<u8>() {
2349                Ok(v) => v,
2350                Err(_) => {
2351                    return CommandStatus::Err(format!(
2352                        "invalid extra-depth {s:?} (expected u8, default 2)"
2353                    ));
2354                }
2355            },
2356            None => stamp_preview::DEFAULT_EXTRA_DEPTH,
2357        };
2358        let chain = match self.health_rx.borrow().chain_state.clone() {
2359            Some(c) => c,
2360            None => return CommandStatus::Err("chain state not loaded yet".into()),
2361        };
2362        let stamps = self.watch.stamps().borrow().clone();
2363        let batch = match stamp_preview::match_batch_prefix(&stamps.batches, prefix) {
2364            Ok(b) => b.clone(),
2365            Err(e) => return CommandStatus::Err(e),
2366        };
2367        match stamp_preview::plan_batch(&batch, &chain, usage_thr, ttl_thr, extra_depth) {
2368            Ok(p) => CommandStatus::Info(p.summary()),
2369            Err(e) => CommandStatus::Err(e),
2370        }
2371    }
2372
2373    /// `:buy-suggest <size> <duration>` — inverse of buy-preview.
2374    /// Operator says "I want X bytes for Y seconds", we return the
2375    /// minimum `(depth, amount)` that covers it. Depth rounds up
2376    /// to the next power of two so the headroom is operator-visible;
2377    /// duration rounds up in chain blocks.
2378    fn run_buy_suggest(&self, line: &str) -> CommandStatus {
2379        let parts: Vec<&str> = line.split_whitespace().collect();
2380        let (size_str, duration_str) = match parts.as_slice() {
2381            [_, size, duration, ..] => (*size, *duration),
2382            _ => {
2383                return CommandStatus::Err(
2384                    "usage: :buy-suggest <size> <duration>  (e.g. 5GiB 30d, 100MiB 12h)".into(),
2385                );
2386            }
2387        };
2388        let target_bytes = match stamp_preview::parse_size_bytes(size_str) {
2389            Ok(b) => b,
2390            Err(e) => return CommandStatus::Err(e),
2391        };
2392        let target_seconds = match stamp_preview::parse_duration_seconds(duration_str) {
2393            Ok(s) => s,
2394            Err(e) => return CommandStatus::Err(e),
2395        };
2396        let chain = match self.health_rx.borrow().chain_state.clone() {
2397            Some(c) => c,
2398            None => return CommandStatus::Err("chain state not loaded yet".into()),
2399        };
2400        match stamp_preview::buy_suggest(target_bytes, target_seconds, &chain) {
2401            Ok(s) => CommandStatus::Info(s.summary()),
2402            Err(e) => CommandStatus::Err(e),
2403        }
2404    }
2405
2406    /// `:buy-preview <depth> <amount-plur>` — hypothetical fresh
2407    /// batch; no batch lookup needed.
2408    fn run_buy_preview(&self, line: &str) -> CommandStatus {
2409        let parts: Vec<&str> = line.split_whitespace().collect();
2410        let (depth_str, amount_str) = match parts.as_slice() {
2411            [_, depth, amount, ..] => (*depth, *amount),
2412            _ => {
2413                return CommandStatus::Err(
2414                    "usage: :buy-preview <depth> <amount-plur-per-chunk>".into(),
2415                );
2416            }
2417        };
2418        let depth: u8 = match depth_str.parse() {
2419            Ok(d) => d,
2420            Err(_) => {
2421                return CommandStatus::Err(format!("invalid depth {depth_str:?} (expected u8)"));
2422            }
2423        };
2424        let amount = match stamp_preview::parse_plur_amount(amount_str) {
2425            Ok(a) => a,
2426            Err(e) => return CommandStatus::Err(e),
2427        };
2428        let chain = match self.health_rx.borrow().chain_state.clone() {
2429            Some(c) => c,
2430            None => return CommandStatus::Err("chain state not loaded yet".into()),
2431        };
2432        match stamp_preview::buy_preview(depth, amount, &chain) {
2433            Ok(p) => CommandStatus::Info(p.summary()),
2434            Err(e) => CommandStatus::Err(e),
2435        }
2436    }
2437
2438    /// Tear down the current watch hub and ApiClient, build a new
2439    /// connection against the named NodeConfig, and rebuild the
2440    /// screen list against fresh receivers. Component-internal state
2441    /// (Lottery's bench history, Network's reachability stability
2442    /// timer, etc.) is intentionally lost — a profile switch is a
2443    /// fresh slate, the same way it would be on app restart.
2444    fn switch_context(&mut self, target: &str) -> color_eyre::Result<()> {
2445        let node = self
2446            .config
2447            .nodes
2448            .iter()
2449            .find(|n| n.name == target)
2450            .ok_or_else(|| eyre!("no node configured with name {target:?}"))?
2451            .clone();
2452        let new_api = Arc::new(ApiClient::from_node(&node)?);
2453        // Cancel the current hub's children and let it drop. The new
2454        // hub spawns under the same root_cancel so quit-time teardown
2455        // still walks the whole tree in one go.
2456        self.watch.shutdown();
2457        let refresh = RefreshProfile::from_config(&self.config.ui.refresh);
2458        let new_watch = BeeWatch::start_with_profile(new_api.clone(), &self.root_cancel, refresh);
2459        let new_health_rx = new_watch.health();
2460        // Spin up a fresh cost-context poller for the new context;
2461        // the old one keeps emitting until root_cancel fires at quit
2462        // (cheap — one tokio task), but the screens consume only the
2463        // new receiver after this point.
2464        let new_market_rx = if self.config.economics.enable_market_tile {
2465            Some(economics_oracle::spawn_poller(
2466                self.config.economics.gnosis_rpc_url.clone(),
2467                self.root_cancel.child_token(),
2468            ))
2469        } else {
2470            None
2471        };
2472        let new_screens = build_screens(&new_api, &new_watch, new_market_rx);
2473        self.api = new_api;
2474        self.watch = new_watch;
2475        self.health_rx = new_health_rx;
2476        self.screens = new_screens;
2477        // Keep the same tab index so the operator stays on the
2478        // screen they were looking at — same data shape, new node.
2479        Ok(())
2480    }
2481
2482    /// Build and persist a redacted diagnostic bundle to a file in
2483    /// the system temp directory. Designed to be paste-ready into a
2484    /// support thread (Discord, GitHub issue) without leaking
2485    /// auth tokens — URLs are reduced to their path component, since
2486    /// Bearer tokens live in headers, not URLs.
2487    /// Kick off `GET /pins/check` in a background task. Returns the
2488    /// destination file path immediately so the operator can `tail -f`
2489    /// it while bee-rs streams the NDJSON response. Each pin is
2490    /// appended as a single line: `<ref>  total=N  missing=N  invalid=N
2491    /// (healthy|UNHEALTHY)`. A `# done. <n> pins checked.` trailer
2492    /// signals completion.
2493    ///
2494    /// The task captures `Arc<ApiClient>` so a `:context` switch
2495    /// mid-check still completes against the original profile — the
2496    /// destination file's name pins the profile so two parallel
2497    /// invocations against different profiles don't collide.
2498    fn start_pins_check(&self) -> std::io::Result<PathBuf> {
2499        let secs = SystemTime::now()
2500            .duration_since(UNIX_EPOCH)
2501            .map(|d| d.as_secs())
2502            .unwrap_or(0);
2503        let path = std::env::temp_dir().join(format!(
2504            "bee-tui-pins-check-{}-{secs}.txt",
2505            sanitize_for_filename(&self.api.name),
2506        ));
2507        // Pre-create with a header so the operator's `tail -f` finds
2508        // something immediately, even before the first pin lands.
2509        std::fs::write(
2510            &path,
2511            format!(
2512                "# bee-tui :pins-check\n# profile  {}\n# endpoint {}\n# started  {}\n",
2513                self.api.name,
2514                self.api.url,
2515                format_utc_now(),
2516            ),
2517        )?;
2518
2519        let api = self.api.clone();
2520        let dest = path.clone();
2521        tokio::spawn(async move {
2522            let bee = api.bee();
2523            match bee.api().check_pins(None).await {
2524                Ok(entries) => {
2525                    let mut body = String::new();
2526                    for e in &entries {
2527                        body.push_str(&format!(
2528                            "{}  total={}  missing={}  invalid={}  {}\n",
2529                            e.reference.to_hex(),
2530                            e.total,
2531                            e.missing,
2532                            e.invalid,
2533                            if e.is_healthy() {
2534                                "healthy"
2535                            } else {
2536                                "UNHEALTHY"
2537                            },
2538                        ));
2539                    }
2540                    body.push_str(&format!("# done. {} pins checked.\n", entries.len()));
2541                    if let Err(e) = append(&dest, &body) {
2542                        let _ = append(&dest, &format!("# write error: {e}\n"));
2543                    }
2544                }
2545                Err(e) => {
2546                    let _ = append(&dest, &format!("# error: {e}\n"));
2547                }
2548            }
2549        });
2550        Ok(path)
2551    }
2552
2553    /// Spawn a fire-and-forget task that calls
2554    /// `set_logger(expression, level)` against the node. The result
2555    /// (success or error) is appended to a `:loggers`-style log file
2556    /// so the operator has a paper trail of mutations made from the
2557    /// cockpit. Per-profile and per-call so multiple `:set-logger`
2558    /// invocations don't trample each other's record.
2559    ///
2560    /// Bee will validate `level` against its own enum (`none|error|
2561    /// warning|info|debug|all`); bee-rs does the same client-side, so
2562    /// a mistyped level errors out before any HTTP request goes out.
2563    fn start_set_logger(&self, expression: String, level: String) {
2564        let secs = SystemTime::now()
2565            .duration_since(UNIX_EPOCH)
2566            .map(|d| d.as_secs())
2567            .unwrap_or(0);
2568        let dest = std::env::temp_dir().join(format!(
2569            "bee-tui-set-logger-{}-{secs}.txt",
2570            sanitize_for_filename(&self.api.name),
2571        ));
2572        let _ = std::fs::write(
2573            &dest,
2574            format!(
2575                "# bee-tui :set-logger\n# profile  {}\n# endpoint {}\n# expr     {expression}\n# level    {level}\n# started  {}\n",
2576                self.api.name,
2577                self.api.url,
2578                format_utc_now(),
2579            ),
2580        );
2581
2582        let api = self.api.clone();
2583        tokio::spawn(async move {
2584            let bee = api.bee();
2585            match bee.debug().set_logger(&expression, &level).await {
2586                Ok(()) => {
2587                    let _ = append(
2588                        &dest,
2589                        &format!("# done. {expression} → {level} accepted by Bee.\n"),
2590                    );
2591                }
2592                Err(e) => {
2593                    let _ = append(&dest, &format!("# error: {e}\n"));
2594                }
2595            }
2596        });
2597    }
2598
2599    /// Snapshot Bee's logger configuration to a file. Same on-demand
2600    /// pattern as `:pins-check`: capture the registered loggers + their
2601    /// verbosity into a sortable text table so operators can answer
2602    /// "is push-sync at debug right now?" without curling the API.
2603    fn start_loggers_dump(&self) -> std::io::Result<PathBuf> {
2604        let secs = SystemTime::now()
2605            .duration_since(UNIX_EPOCH)
2606            .map(|d| d.as_secs())
2607            .unwrap_or(0);
2608        let path = std::env::temp_dir().join(format!(
2609            "bee-tui-loggers-{}-{secs}.txt",
2610            sanitize_for_filename(&self.api.name),
2611        ));
2612        std::fs::write(
2613            &path,
2614            format!(
2615                "# bee-tui :loggers\n# profile  {}\n# endpoint {}\n# started  {}\n",
2616                self.api.name,
2617                self.api.url,
2618                format_utc_now(),
2619            ),
2620        )?;
2621
2622        let api = self.api.clone();
2623        let dest = path.clone();
2624        tokio::spawn(async move {
2625            let bee = api.bee();
2626            match bee.debug().loggers().await {
2627                Ok(listing) => {
2628                    let mut rows = listing.loggers.clone();
2629                    // Stable sort: verbosity buckets first ("all"
2630                    // before "1"/"info" etc. so the loud loggers
2631                    // float to the top), then logger name.
2632                    rows.sort_by(|a, b| {
2633                        verbosity_rank(&b.verbosity)
2634                            .cmp(&verbosity_rank(&a.verbosity))
2635                            .then_with(|| a.logger.cmp(&b.logger))
2636                    });
2637                    let mut body = String::new();
2638                    body.push_str(&format!("# {} loggers registered\n", rows.len()));
2639                    body.push_str("# VERBOSITY  LOGGER\n");
2640                    for r in &rows {
2641                        body.push_str(&format!("  {:<9}  {}\n", r.verbosity, r.logger,));
2642                    }
2643                    body.push_str("# done.\n");
2644                    if let Err(e) = append(&dest, &body) {
2645                        let _ = append(&dest, &format!("# write error: {e}\n"));
2646                    }
2647                }
2648                Err(e) => {
2649                    let _ = append(&dest, &format!("# error: {e}\n"));
2650                }
2651            }
2652        });
2653        Ok(path)
2654    }
2655
2656    /// `:diagnose --pprof[=N]` — drop the existing diagnostic text into
2657    /// a fresh directory, then asynchronously fetch
2658    /// `/debug/pprof/profile?seconds=N` and `/debug/pprof/trace?seconds=N`
2659    /// and write each as a sibling file. The operator's command-status
2660    /// row gets a "running" notice immediately; the final bundle path
2661    /// (or error) lands via `cmd_status_tx` when the pprof block ends.
2662    ///
2663    /// Pprof endpoints live on Bee's debug API. When operators
2664    /// haven't enabled `--debug-api-enable=true` the endpoint 404s;
2665    /// the helper translates that into a clear "enable Bee's debug
2666    /// API" hint.
2667    fn start_diagnose_with_pprof(&self, seconds: u32) -> CommandStatus {
2668        let secs_unix = SystemTime::now()
2669            .duration_since(UNIX_EPOCH)
2670            .map(|d| d.as_secs())
2671            .unwrap_or(0);
2672        let dir = std::env::temp_dir().join(format!("bee-tui-diagnostic-{secs_unix}"));
2673        if let Err(e) = std::fs::create_dir_all(&dir) {
2674            return CommandStatus::Err(format!("diagnose --pprof: mkdir failed: {e}"));
2675        }
2676        let bundle_text = self.render_diagnostic_bundle();
2677        if let Err(e) = std::fs::write(dir.join("bundle.txt"), &bundle_text) {
2678            return CommandStatus::Err(format!("diagnose --pprof: write bundle.txt: {e}"));
2679        }
2680        // Resolve the active node's auth token so the pprof fetch
2681        // carries the same Authorization header bee-tui uses for the
2682        // regular API. Bee's debug-api-addr inherits the token when
2683        // it's served on the same listener.
2684        let auth_token = self
2685            .config
2686            .nodes
2687            .iter()
2688            .find(|n| n.name == self.api.name)
2689            .and_then(|n| n.resolved_token());
2690        let base_url = self.api.url.clone();
2691        let dir_for_task = dir.clone();
2692        let tx = self.cmd_status_tx.clone();
2693        tokio::spawn(async move {
2694            let r =
2695                pprof_bundle::fetch_and_write(&base_url, auth_token, seconds, dir_for_task).await;
2696            let status = match r {
2697                Ok(b) => CommandStatus::Info(b.summary()),
2698                Err(e) => CommandStatus::Err(format!("diagnose --pprof failed: {e}")),
2699            };
2700            let _ = tx.send(status);
2701        });
2702        CommandStatus::Info(format!(
2703            "diagnose --pprof={seconds}s in flight (bundle.txt already at {}; profile + trace will join when sampling completes)",
2704            dir.display()
2705        ))
2706    }
2707
2708    fn export_diagnostic_bundle(&self) -> std::io::Result<PathBuf> {
2709        let bundle = self.render_diagnostic_bundle();
2710        let secs = SystemTime::now()
2711            .duration_since(UNIX_EPOCH)
2712            .map(|d| d.as_secs())
2713            .unwrap_or(0);
2714        let path = std::env::temp_dir().join(format!("bee-tui-diagnostic-{secs}.txt"));
2715        std::fs::write(&path, bundle)?;
2716        Ok(path)
2717    }
2718
2719    fn render_diagnostic_bundle(&self) -> String {
2720        let now = format_utc_now();
2721        let health = self.health_rx.borrow().clone();
2722        let topology = self.watch.topology().borrow().clone();
2723        let stamps = self.watch.stamps().borrow().clone();
2724        let gates = Health::gates_for_with_stamps(&health, Some(&topology), Some(&stamps));
2725        let recent: Vec<_> = log_capture::handle()
2726            .map(|c| {
2727                let mut snap = c.snapshot();
2728                let len = snap.len();
2729                if len > 50 {
2730                    snap.drain(0..len - 50);
2731                }
2732                snap
2733            })
2734            .unwrap_or_default();
2735
2736        let mut out = String::new();
2737        out.push_str("# bee-tui diagnostic bundle\n");
2738        out.push_str(&format!("# generated UTC {now}\n\n"));
2739        out.push_str("## profile\n");
2740        out.push_str(&format!("  name      {}\n", self.api.name));
2741        out.push_str(&format!("  endpoint  {}\n\n", self.api.url));
2742        out.push_str("## health gates\n");
2743        for g in &gates {
2744            out.push_str(&format_gate_line(g));
2745        }
2746        out.push_str("\n## last API calls (path only — Bearer tokens, if any, live in headers and aren't captured)\n");
2747        for e in &recent {
2748            let status = e
2749                .status
2750                .map(|s| s.to_string())
2751                .unwrap_or_else(|| "—".into());
2752            let elapsed = e
2753                .elapsed_ms
2754                .map(|ms| format!("{ms}ms"))
2755                .unwrap_or_else(|| "—".into());
2756            out.push_str(&format!(
2757                "  {ts} {method:<5} {path:<32} {status:>4} {elapsed:>7}\n",
2758                ts = e.ts,
2759                method = e.method,
2760                path = path_only(&e.url),
2761                status = status,
2762                elapsed = elapsed,
2763            ));
2764        }
2765        out.push_str(&format!(
2766            "\n## generated by bee-tui {}\n",
2767            env!("CARGO_PKG_VERSION"),
2768        ));
2769        out
2770    }
2771
2772    /// Per-Tick webhook alerter. No-op when `[alerts].webhook_url`
2773    /// is unset — operators get the cockpit's existing visual gates
2774    /// without any outbound traffic. When configured, we compute the
2775    /// same `Health::gates_for(...)` view the cockpit renders, diff
2776    /// against the previous Tick's status, and POST one webhook per
2777    /// transition that survives the per-gate debounce.
2778    fn tick_alerts(&mut self) {
2779        let url = match self.config.alerts.webhook_url.as_deref() {
2780            Some(u) if !u.is_empty() => u.to_string(),
2781            _ => return,
2782        };
2783        let health = self.health_rx.borrow().clone();
2784        let topology = self.watch.topology().borrow().clone();
2785        let stamps = self.watch.stamps().borrow().clone();
2786        let gates = Health::gates_for_with_stamps(&health, Some(&topology), Some(&stamps));
2787        let alerts = self.alert_state.diff_and_record(&gates);
2788        for alert in alerts {
2789            let url = url.clone();
2790            tokio::spawn(async move {
2791                if let Err(e) = crate::alerts::fire(&url, &alert).await {
2792                    tracing::warn!(target: "bee_tui::alerts", "webhook fire failed: {e}");
2793                }
2794            });
2795        }
2796    }
2797
2798    fn handle_actions(&mut self, tui: &mut Tui) -> color_eyre::Result<()> {
2799        while let Ok(action) = self.action_rx.try_recv() {
2800            if action != Action::Tick && action != Action::Render {
2801                debug!("{action:?}");
2802            }
2803            match action {
2804                Action::Tick => {
2805                    self.last_tick_key_events.drain(..);
2806                    // Advance the cold-start spinner once per tick
2807                    // so every screen's "loading…" line shows
2808                    // motion at a consistent cadence.
2809                    theme::advance_spinner();
2810                    // Refresh the supervised Bee's status (cheap
2811                    // non-blocking try_wait). Surfaced in the top
2812                    // bar so a mid-session crash is visible.
2813                    if let Some(sup) = self.supervisor.as_mut() {
2814                        self.bee_status = sup.status();
2815                    }
2816                    // Drain any newly-tailed Bee log lines into the
2817                    // log pane. Bounded loop — the channel is
2818                    // unbounded but try_recv stops at the first
2819                    // empty so we don't block the tick.
2820                    if let Some(rx) = self.bee_log_rx.as_mut() {
2821                        while let Ok((tab, line)) = rx.try_recv() {
2822                            self.log_pane.push_bee(tab, line);
2823                        }
2824                    }
2825                    // Surface async command-result updates (e.g.
2826                    // `:probe-upload` finished). The latest message
2827                    // wins — earlier ones get implicitly overwritten
2828                    // because we keep the loop draining.
2829                    while let Ok(status) = self.cmd_status_rx.try_recv() {
2830                        self.command_status = Some(status);
2831                    }
2832                    // Drain durability-check completions into the
2833                    // S13 Watchlist screen. Late results are still
2834                    // recorded — operators want to see every check
2835                    // they fired, not just the most recent.
2836                    while let Ok(result) = self.durability_rx.try_recv() {
2837                        if let Some(idx) = SCREEN_NAMES.iter().position(|n| *n == "Watchlist") {
2838                            if let Some(wl) = self
2839                                .screens
2840                                .get_mut(idx)
2841                                .and_then(|s| s.as_any_mut())
2842                                .and_then(|a| a.downcast_mut::<Watchlist>())
2843                            {
2844                                wl.record(result);
2845                            }
2846                        }
2847                    }
2848                    // Drain feed-timeline walk completions into S14.
2849                    // Newest message wins — operator can fire a fresh
2850                    // :feed-timeline mid-walk and the in-flight result
2851                    // will overwrite this immediately.
2852                    while let Ok(msg) = self.feed_timeline_rx.try_recv() {
2853                        if let Some(idx) = SCREEN_NAMES.iter().position(|n| *n == "FeedTimeline") {
2854                            if let Some(ft) = self
2855                                .screens
2856                                .get_mut(idx)
2857                                .and_then(|s| s.as_any_mut())
2858                                .and_then(|a| a.downcast_mut::<FeedTimeline>())
2859                            {
2860                                match msg {
2861                                    FeedTimelineMessage::Loaded(t) => ft.set_timeline(t),
2862                                    FeedTimelineMessage::Failed(e) => ft.set_error(e),
2863                                }
2864                            }
2865                        }
2866                    }
2867                    // Drain pubsub messages into S15 + sync the
2868                    // active-subs count so the header reflects
2869                    // start/stop verb activity even on a quiet topic.
2870                    let mut buffered: Vec<crate::pubsub::PubsubMessage> = Vec::new();
2871                    while let Ok(msg) = self.pubsub_msg_rx.try_recv() {
2872                        buffered.push(msg);
2873                    }
2874                    if let Some(idx) = SCREEN_NAMES.iter().position(|n| *n == "Pubsub") {
2875                        if let Some(ps) = self
2876                            .screens
2877                            .get_mut(idx)
2878                            .and_then(|s| s.as_any_mut())
2879                            .and_then(|a| a.downcast_mut::<Pubsub>())
2880                        {
2881                            for m in buffered {
2882                                ps.record(m);
2883                            }
2884                            ps.set_active_count(self.pubsub_subs.len());
2885                        }
2886                    }
2887                    // Webhook health-gate alerts. Cheap when not
2888                    // configured (no clones, no work) — only computes
2889                    // gates and diffs when [alerts].webhook_url is set.
2890                    self.tick_alerts();
2891                }
2892                Action::Quit => self.should_quit = true,
2893                Action::Suspend => self.should_suspend = true,
2894                Action::Resume => self.should_suspend = false,
2895                Action::ClearScreen => tui.terminal.clear()?,
2896                Action::Resize(w, h) => self.handle_resize(tui, w, h)?,
2897                Action::Render => self.render(tui)?,
2898                _ => {}
2899            }
2900            let tx = self.action_tx.clone();
2901            for component in self.iter_components_mut() {
2902                if let Some(action) = component.update(action.clone())? {
2903                    tx.send(action)?
2904                };
2905            }
2906        }
2907        Ok(())
2908    }
2909
2910    fn handle_resize(&mut self, tui: &mut Tui, w: u16, h: u16) -> color_eyre::Result<()> {
2911        tui.resize(Rect::new(0, 0, w, h))?;
2912        self.render(tui)?;
2913        Ok(())
2914    }
2915
2916    fn render(&mut self, tui: &mut Tui) -> color_eyre::Result<()> {
2917        let active = self.current_screen;
2918        let tx = self.action_tx.clone();
2919        let screens = &mut self.screens;
2920        let log_pane = &mut self.log_pane;
2921        let log_pane_height = log_pane.height();
2922        let command_buffer = self.command_buffer.clone();
2923        let command_suggestion_index = self.command_suggestion_index;
2924        let command_status = self.command_status.clone();
2925        let help_visible = self.help_visible;
2926        let profile = self.api.name.clone();
2927        let endpoint = self.api.url.clone();
2928        let last_ping = self.health_rx.borrow().last_ping;
2929        let now_utc = format_utc_now();
2930        let bee_status_label = if self.supervisor.is_some() && !self.bee_status.is_running() {
2931            // Only show the status when (a) we're acting as the
2932            // supervisor and (b) something is wrong. Hiding the
2933            // happy-path label keeps the metadata line uncluttered.
2934            Some(self.bee_status.label())
2935        } else {
2936            None
2937        };
2938        tui.draw(|frame| {
2939            use ratatui::layout::{Constraint, Layout};
2940            use ratatui::style::{Color, Modifier, Style};
2941            use ratatui::text::{Line, Span};
2942            use ratatui::widgets::Paragraph;
2943
2944            let chunks = Layout::vertical([
2945                Constraint::Length(2),               // top-bar (metadata + tabs)
2946                Constraint::Min(0),                  // active screen
2947                Constraint::Length(1),               // command bar / status line
2948                Constraint::Length(log_pane_height), // tabbed log pane (operator-resizable)
2949            ])
2950            .split(frame.area());
2951
2952            let top_chunks =
2953                Layout::vertical([Constraint::Length(1), Constraint::Length(1)]).split(chunks[0]);
2954
2955            // Metadata line: profile · endpoint · ping · clock.
2956            let ping_str = match last_ping {
2957                Some(d) => format!("{}ms", d.as_millis()),
2958                None => "—".into(),
2959            };
2960            let t = theme::active();
2961            let mut metadata_spans = vec![
2962                Span::styled(
2963                    " bee-tui ",
2964                    Style::default()
2965                        .fg(Color::Black)
2966                        .bg(t.info)
2967                        .add_modifier(Modifier::BOLD),
2968                ),
2969                Span::raw("  "),
2970                Span::styled(
2971                    profile,
2972                    Style::default().fg(t.accent).add_modifier(Modifier::BOLD),
2973                ),
2974                Span::styled(format!(" @ {endpoint}"), Style::default().fg(t.dim)),
2975                Span::raw("   "),
2976                Span::styled("ping ", Style::default().fg(t.dim)),
2977                Span::styled(ping_str, Style::default().fg(t.info)),
2978                Span::raw("   "),
2979                Span::styled(format!("UTC {now_utc}"), Style::default().fg(t.dim)),
2980            ];
2981            // Append a Bee-process status chip iff the supervisor is
2982            // active AND something is wrong. Renders red so a crash
2983            // mid-session is impossible to miss in the top bar.
2984            if let Some(label) = bee_status_label.as_ref() {
2985                metadata_spans.push(Span::raw("   "));
2986                metadata_spans.push(Span::styled(
2987                    format!(" {label} "),
2988                    Style::default()
2989                        .fg(Color::Black)
2990                        .bg(t.fail)
2991                        .add_modifier(Modifier::BOLD),
2992                ));
2993            }
2994            let metadata_line = Line::from(metadata_spans);
2995            frame.render_widget(Paragraph::new(metadata_line), top_chunks[0]);
2996
2997            // Tab strip with the active screen highlighted.
2998            let theme = *theme::active();
2999            let mut tabs = Vec::with_capacity(SCREEN_NAMES.len() * 2);
3000            for (i, name) in SCREEN_NAMES.iter().enumerate() {
3001                let style = if i == active {
3002                    Style::default()
3003                        .fg(theme.tab_active_fg)
3004                        .bg(theme.tab_active_bg)
3005                        .add_modifier(Modifier::BOLD)
3006                } else {
3007                    Style::default().fg(theme.dim)
3008                };
3009                tabs.push(Span::styled(format!(" {name} "), style));
3010                tabs.push(Span::raw(" "));
3011            }
3012            tabs.push(Span::styled(
3013                ":cmd · Tab to cycle · ? help",
3014                Style::default().fg(theme.dim),
3015            ));
3016            frame.render_widget(Paragraph::new(Line::from(tabs)), top_chunks[1]);
3017
3018            // Active screen
3019            if let Some(screen) = screens.get_mut(active) {
3020                if let Err(err) = screen.draw(frame, chunks[1]) {
3021                    let _ = tx.send(Action::Error(format!("Failed to draw screen: {err:?}")));
3022                }
3023            }
3024            // Command bar / status line
3025            let prompt = if let Some(buf) = &command_buffer {
3026                Line::from(vec![
3027                    Span::styled(
3028                        ":",
3029                        Style::default().fg(t.accent).add_modifier(Modifier::BOLD),
3030                    ),
3031                    Span::styled(buf.clone(), Style::default().add_modifier(Modifier::BOLD)),
3032                    Span::styled("█", Style::default().fg(t.accent)),
3033                ])
3034            } else {
3035                match &command_status {
3036                    Some(CommandStatus::Info(msg)) => {
3037                        Line::from(Span::styled(msg.clone(), Style::default().fg(t.pass)))
3038                    }
3039                    Some(CommandStatus::Err(msg)) => {
3040                        Line::from(Span::styled(msg.clone(), Style::default().fg(t.fail)))
3041                    }
3042                    None => Line::from(""),
3043                }
3044            };
3045            frame.render_widget(Paragraph::new(prompt), chunks[2]);
3046
3047            // Command suggestion popup — floats above the command bar
3048            // while the operator is typing. Filtered list of known
3049            // verbs that prefix-match the buffer's first token; Up/Down
3050            // navigates, Tab completes. Skipped silently if the
3051            // command bar is closed or no commands match.
3052            if let Some(buf) = &command_buffer {
3053                let matches = filter_command_suggestions(buf, KNOWN_COMMANDS);
3054                if !matches.is_empty() {
3055                    draw_command_suggestions(
3056                        frame,
3057                        chunks[2],
3058                        &matches,
3059                        command_suggestion_index,
3060                        &theme,
3061                    );
3062                }
3063            }
3064
3065            // Tabbed log pane
3066            if let Err(err) = log_pane.draw(frame, chunks[3]) {
3067                let _ = tx.send(Action::Error(format!("Failed to draw log: {err:?}")));
3068            }
3069
3070            // Help overlay — drawn last so it floats above everything
3071            // else. Centred with a fixed width that fits even narrow
3072            // terminals (≥60 cols). Falls back to the full screen on
3073            // anything narrower.
3074            if help_visible {
3075                draw_help_overlay(frame, frame.area(), active, &theme);
3076            }
3077        })?;
3078        Ok(())
3079    }
3080}
3081
3082/// Render the command-suggestion popup just above the command bar.
3083/// Floats over the active screen (uses `Clear` to blank what's
3084/// underneath) and highlights the row at `selected` so Up/Down
3085/// navigation is visible. Auto-scrolls if the filtered list exceeds
3086/// the visible window — operators see at most `MAX_VISIBLE` rows at
3087/// a time.
3088fn draw_command_suggestions(
3089    frame: &mut ratatui::Frame,
3090    bar_rect: ratatui::layout::Rect,
3091    matches: &[&(&str, &str)],
3092    selected: usize,
3093    theme: &theme::Theme,
3094) {
3095    use ratatui::layout::Rect;
3096    use ratatui::style::{Modifier, Style};
3097    use ratatui::text::{Line, Span};
3098    use ratatui::widgets::{Block, Borders, Clear, Paragraph};
3099
3100    const MAX_VISIBLE: usize = 10;
3101    let visible_rows = matches.len().min(MAX_VISIBLE);
3102    if visible_rows == 0 {
3103        return;
3104    }
3105    let height = (visible_rows as u16) + 2; // +2 for top + bottom borders
3106    // Width = longest "name  description" line + borders + padding,
3107    // capped at 80% of the screen so wide descriptions don't push
3108    // the popup off the edge.
3109    let widest = matches
3110        .iter()
3111        .map(|(name, desc)| name.len() + desc.len() + 6)
3112        .max()
3113        .unwrap_or(40)
3114        .min(bar_rect.width as usize);
3115    let width = (widest as u16 + 2).min(bar_rect.width);
3116    // Anchor above the command bar; if the popup would clip the top
3117    // of the screen, fall back to as much vertical room as we have.
3118    let bottom = bar_rect.y;
3119    let y = bottom.saturating_sub(height);
3120    let popup = Rect {
3121        x: bar_rect.x,
3122        y,
3123        width,
3124        height: bottom - y,
3125    };
3126
3127    // Auto-scroll: keep `selected` inside the visible window.
3128    let scroll_start = if selected >= visible_rows {
3129        selected + 1 - visible_rows
3130    } else {
3131        0
3132    };
3133    let visible_slice = &matches[scroll_start..(scroll_start + visible_rows).min(matches.len())];
3134
3135    let mut lines: Vec<Line> = Vec::with_capacity(visible_slice.len());
3136    for (i, (name, desc)) in visible_slice.iter().enumerate() {
3137        let absolute_idx = scroll_start + i;
3138        let is_selected = absolute_idx == selected;
3139        let row_style = if is_selected {
3140            Style::default()
3141                .fg(theme.tab_active_fg)
3142                .bg(theme.tab_active_bg)
3143                .add_modifier(Modifier::BOLD)
3144        } else {
3145            Style::default()
3146        };
3147        let cursor = if is_selected { "▸ " } else { "  " };
3148        lines.push(Line::from(vec![
3149            Span::styled(format!("{cursor}:{name:<16}  "), row_style),
3150            Span::styled(
3151                desc.to_string(),
3152                if is_selected {
3153                    row_style
3154                } else {
3155                    Style::default().fg(theme.dim)
3156                },
3157            ),
3158        ]));
3159    }
3160
3161    // Title shows pagination state when the list overflows.
3162    let title = if matches.len() > MAX_VISIBLE {
3163        format!(" :commands ({}/{}) ", selected + 1, matches.len())
3164    } else {
3165        " :commands ".to_string()
3166    };
3167
3168    frame.render_widget(Clear, popup);
3169    frame.render_widget(
3170        Paragraph::new(lines).block(
3171            Block::default()
3172                .borders(Borders::ALL)
3173                .border_style(Style::default().fg(theme.accent))
3174                .title(title),
3175        ),
3176        popup,
3177    );
3178}
3179
3180/// Render the `?` help overlay. Pulls a per-screen keymap from
3181/// [`screen_keymap`] and pairs it with the global keys (Tab, `:`,
3182/// `q`). Drawn as a centred floating box; everything outside is
3183/// dimmed via a [`Clear`] underlay.
3184fn draw_help_overlay(
3185    frame: &mut ratatui::Frame,
3186    area: ratatui::layout::Rect,
3187    active_screen: usize,
3188    theme: &theme::Theme,
3189) {
3190    use ratatui::layout::Rect;
3191    use ratatui::style::{Modifier, Style};
3192    use ratatui::text::{Line, Span};
3193    use ratatui::widgets::{Block, Borders, Clear, Paragraph};
3194
3195    let screen_name = SCREEN_NAMES.get(active_screen).copied().unwrap_or("?");
3196    let screen_rows = screen_keymap(active_screen);
3197    let global_rows: &[(&str, &str)] = &[
3198        ("Tab", "next screen"),
3199        ("Shift+Tab", "previous screen"),
3200        ("[ / ]", "previous / next log-pane tab"),
3201        ("+ / -", "grow / shrink log pane"),
3202        ("Shift+↑/↓", "scroll log pane (1 line); pauses auto-tail"),
3203        ("Shift+PgUp/PgDn", "scroll log pane (10 lines)"),
3204        ("Shift+←/→", "pan log pane horizontally (8 cols)"),
3205        ("Shift+End", "resume auto-tail + reset horizontal pan"),
3206        ("?", "toggle this help"),
3207        (":", "open command bar"),
3208        ("qq", "quit (double-tap; or :q)"),
3209        ("Ctrl+C / Ctrl+D", "quit immediately"),
3210    ];
3211
3212    // Layout: pick the smaller of (screen size, 70x22) so we always
3213    // fit on small terminals.
3214    let w = area.width.min(72);
3215    let h = area.height.min(22);
3216    let x = area.x + (area.width.saturating_sub(w)) / 2;
3217    let y = area.y + (area.height.saturating_sub(h)) / 2;
3218    let rect = Rect {
3219        x,
3220        y,
3221        width: w,
3222        height: h,
3223    };
3224
3225    let mut lines: Vec<Line> = Vec::new();
3226    lines.push(Line::from(vec![
3227        Span::styled(
3228            format!(" {screen_name} "),
3229            Style::default()
3230                .fg(theme.tab_active_fg)
3231                .bg(theme.tab_active_bg)
3232                .add_modifier(Modifier::BOLD),
3233        ),
3234        Span::raw("   screen-specific keys"),
3235    ]));
3236    lines.push(Line::from(""));
3237    if screen_rows.is_empty() {
3238        lines.push(Line::from(Span::styled(
3239            "  (no extra keys for this screen — use the command bar via :)",
3240            Style::default()
3241                .fg(theme.dim)
3242                .add_modifier(Modifier::ITALIC),
3243        )));
3244    } else {
3245        for (key, desc) in screen_rows {
3246            lines.push(format_help_row(key, desc, theme));
3247        }
3248    }
3249    lines.push(Line::from(""));
3250    lines.push(Line::from(Span::styled(
3251        "  global",
3252        Style::default().fg(theme.dim).add_modifier(Modifier::BOLD),
3253    )));
3254    for (key, desc) in global_rows {
3255        lines.push(format_help_row(key, desc, theme));
3256    }
3257    lines.push(Line::from(""));
3258    lines.push(Line::from(Span::styled(
3259        "  Esc / ? / q to dismiss",
3260        Style::default()
3261            .fg(theme.dim)
3262            .add_modifier(Modifier::ITALIC),
3263    )));
3264
3265    // `Clear` blanks the underlying rendered region so the overlay
3266    // doesn't ghost over screen content.
3267    frame.render_widget(Clear, rect);
3268    frame.render_widget(
3269        Paragraph::new(lines).block(
3270            Block::default()
3271                .borders(Borders::ALL)
3272                .border_style(Style::default().fg(theme.accent))
3273                .title(" help "),
3274        ),
3275        rect,
3276    );
3277}
3278
3279fn format_help_row<'a>(
3280    key: &'a str,
3281    desc: &'a str,
3282    theme: &theme::Theme,
3283) -> ratatui::text::Line<'a> {
3284    use ratatui::style::{Modifier, Style};
3285    use ratatui::text::{Line, Span};
3286    Line::from(vec![
3287        Span::raw("  "),
3288        Span::styled(
3289            format!("{key:<16}"),
3290            Style::default()
3291                .fg(theme.accent)
3292                .add_modifier(Modifier::BOLD),
3293        ),
3294        Span::raw("  "),
3295        Span::raw(desc),
3296    ])
3297}
3298
/// Per-screen keymap rows for the help overlay, keyed by the screen's
/// position in [`SCREEN_NAMES`]. Screens without extra keys (and any
/// unknown index) yield an empty slice. Add a table and a match arm
/// here when a screen grows new keys.
fn screen_keymap(active_screen: usize) -> &'static [(&'static str, &'static str)] {
    type Rows = &'static [(&'static str, &'static str)];

    const NONE: Rows = &[];
    // 1: Stamps
    const STAMPS: Rows = &[
        ("↑↓ / j k", "move row selection"),
        ("Enter", "drill batch — bucket histogram + worst-N"),
        ("Esc", "close drill"),
    ];
    // 3: Lottery
    const LOTTERY: Rows = &[("r", "run on-demand rchash benchmark")];
    // 4: Peers
    const PEERS: Rows = &[
        ("↑↓ / j k", "move peer selection"),
        (
            "Enter",
            "drill peer — balance / cheques / settlement / ping",
        ),
        ("Esc", "close drill"),
    ];
    // 8: Tags
    const TAGS: Rows = &[
        ("↑↓ / j k", "scroll one row"),
        ("PgUp / PgDn", "scroll ten rows"),
        ("Home", "back to top"),
    ];
    // 9: Pins — selectable rows + on-demand integrity check.
    const PINS: Rows = &[
        ("↑↓ / j k", "move row selection"),
        ("Enter", "integrity-check the highlighted pin"),
        ("c", "integrity-check every unchecked pin"),
        ("s", "cycle sort: ref order / bad first / by size"),
    ];
    // 10: Manifest — Mantaray tree browser.
    const MANIFEST: Rows = &[
        ("↑↓ / j k", "move row selection"),
        ("Enter", "expand / collapse fork (loads child chunk)"),
        (":manifest <ref>", "open a manifest at a reference"),
        (":inspect <ref>", "what is this? auto-detects manifest"),
    ];
    // 11: Watchlist — durability-check history.
    const WATCHLIST: Rows = &[
        ("↑↓ / j k", "move row selection"),
        (":durability-check <ref>", "walk chunk graph + record"),
    ];
    // 12: Feed Timeline — feed history walker.
    const FEED_TIMELINE: Rows = &[
        ("↑↓ / j k", "move row selection"),
        ("PgUp / PgDn", "jump 10 rows"),
        (
            ":feed-timeline <owner> <topic> [N]",
            "load history (default 50)",
        ),
    ];
    // 13: Pubsub watch — live PSS / GSOC tail.
    const PUBSUB: Rows = &[
        ("↑↓ / j k", "move row selection"),
        ("PgUp / PgDn", "jump 10 rows"),
        ("c", "clear timeline"),
        (":pubsub-pss <topic>", "subscribe to a PSS topic"),
        (":pubsub-gsoc <owner> <id>", "subscribe to a GSOC SOC"),
        (":pubsub-stop [sub-id]", "stop one (or all) subscriptions"),
        (
            ":pubsub-filter <substr>",
            "show only rows containing substring",
        ),
        (":pubsub-filter-clear", "remove the active filter"),
    ];

    // Indices 0, 2, 5, 6 and 7 (Health, Swap, Network, Warmup, API)
    // are read-only screens and fall through to the empty table.
    match active_screen {
        1 => STAMPS,
        3 => LOTTERY,
        4 => PEERS,
        8 => TAGS,
        9 => PINS,
        10 => MANIFEST,
        11 => WATCHLIST,
        12 => FEED_TIMELINE,
        13 => PUBSUB,
        _ => NONE,
    }
}
3373
3374/// Construct every cockpit screen with receivers from the supplied
3375/// hub. Extracted so `App::new` and the `:context` profile-switcher
3376/// can share the wiring — the screen list is the same on every
3377/// connection, only the underlying watch hub changes.
3378///
3379/// Order matters — the [`SCREEN_NAMES`] table assumes index 0 is
3380/// Health, 1 is Stamps, 2 is Swap, 3 is Lottery, 4 is Peers, 5 is
3381/// Network, 6 is Warmup, 7 is API, 8 is Tags, 9 is Pins.
3382fn build_screens(
3383    api: &Arc<ApiClient>,
3384    watch: &BeeWatch,
3385    market_rx: Option<watch::Receiver<crate::economics_oracle::EconomicsSnapshot>>,
3386) -> Vec<Box<dyn Component>> {
3387    let health = Health::new(api.clone(), watch.health(), watch.topology());
3388    let stamps = Stamps::new(api.clone(), watch.stamps());
3389    let swap = match market_rx {
3390        Some(rx) => Swap::new(watch.swap()).with_market_feed(rx),
3391        None => Swap::new(watch.swap()),
3392    };
3393    let lottery = Lottery::new(api.clone(), watch.health(), watch.lottery());
3394    let peers = Peers::new(api.clone(), watch.topology());
3395    let network = Network::new(watch.network(), watch.topology());
3396    let warmup = Warmup::new(watch.health(), watch.stamps(), watch.topology());
3397    let api_health = ApiHealth::new(
3398        api.clone(),
3399        watch.health(),
3400        watch.transactions(),
3401        log_capture::handle(),
3402    );
3403    let tags = Tags::new(watch.tags());
3404    let pins = Pins::new(api.clone(), watch.pins());
3405    let manifest = Manifest::new(api.clone());
3406    let watchlist = Watchlist::new();
3407    let feed_timeline = FeedTimeline::new();
3408    let pubsub_screen = Pubsub::new();
3409    vec![
3410        Box::new(health),
3411        Box::new(stamps),
3412        Box::new(swap),
3413        Box::new(lottery),
3414        Box::new(peers),
3415        Box::new(network),
3416        Box::new(warmup),
3417        Box::new(api_health),
3418        Box::new(tags),
3419        Box::new(pins),
3420        Box::new(manifest),
3421        Box::new(watchlist),
3422        Box::new(feed_timeline),
3423        Box::new(pubsub_screen),
3424    ]
3425}
3426
/// Build the 4104-byte synthetic chunk that `:probe-upload` sends.
///
/// Layout: an 8-byte little-endian span holding the payload length
/// (4096), then a 4096-byte payload whose first 16 bytes are the
/// current time in nanoseconds since the Unix epoch (little-endian)
/// and whose remainder is zero. The timestamp makes each invocation's
/// content differ, so successive probes are not byte-identical. If the
/// clock reports a time before the epoch the timestamp bytes are zero.
fn build_synthetic_probe_chunk() -> Vec<u8> {
    const SPAN_LEN: usize = 8;
    const PAYLOAD_LEN: usize = 4096;

    let stamp: u128 = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0,
    };
    let stamp_bytes = stamp.to_le_bytes();

    // Start from an all-zero chunk and overwrite the span and the
    // timestamp prefix of the payload in place.
    let mut chunk = vec![0u8; SPAN_LEN + PAYLOAD_LEN];
    chunk[..SPAN_LEN].copy_from_slice(&(PAYLOAD_LEN as u64).to_le_bytes());
    chunk[SPAN_LEN..SPAN_LEN + stamp_bytes.len()].copy_from_slice(&stamp_bytes);
    chunk
}
3448
/// Truncate a string to its first `len` characters and append an
/// ellipsis (`…`); strings of `len` characters or fewer are returned
/// unchanged. Used by `:probe-upload` for the human-readable batch +
/// reference labels.
///
/// The cut is made on a character boundary, so input that is not
/// plain ASCII hex is shortened rather than causing a slicing panic.
fn short_hex(hex: &str, len: usize) -> String {
    // The byte offset of the character at position `len` (if any) is
    // exactly where the kept prefix ends.
    match hex.char_indices().nth(len) {
        Some((cut, _)) => format!("{}…", &hex[..cut]),
        None => hex.to_string(),
    }
}
3458
/// Best-effort MIME type guess from a path's file extension.
///
/// The extension is compared case-insensitively (ASCII). Paths with
/// no extension, a non-UTF-8 extension, or an extension not in the
/// table below yield an empty string.
fn guess_content_type(path: &std::path::Path) -> String {
    let extension = path
        .extension()
        .and_then(|raw| raw.to_str())
        .map(str::to_ascii_lowercase)
        .unwrap_or_default();

    let mime = match extension.as_str() {
        "html" | "htm" => "text/html",
        "txt" | "md" => "text/plain",
        "json" => "application/json",
        "css" => "text/css",
        "js" => "application/javascript",
        "png" => "image/png",
        "jpg" | "jpeg" => "image/jpeg",
        "gif" => "image/gif",
        "svg" => "image/svg+xml",
        "webp" => "image/webp",
        "pdf" => "application/pdf",
        "zip" => "application/zip",
        "tar" => "application/x-tar",
        "gz" | "tgz" => "application/gzip",
        "wasm" => "application/wasm",
        _ => "",
    };
    mime.to_string()
}
3489
3490/// Build the closure the metrics HTTP handler invokes on each
3491/// scrape. Captures cloned `BeeWatch` receivers (cheap — they're
3492/// `Arc`-backed) plus the log-capture handle, then re-reads the
3493/// latest snapshot of each on every call. Returns an `Arc<Fn>`
3494/// matching `metrics_server::RenderFn`.
3495fn build_metrics_render_fn(
3496    watch: BeeWatch,
3497    log_capture: Option<log_capture::LogCapture>,
3498) -> crate::metrics_server::RenderFn {
3499    use std::time::{SystemTime, UNIX_EPOCH};
3500    Arc::new(move || {
3501        let health = watch.health().borrow().clone();
3502        let stamps = watch.stamps().borrow().clone();
3503        let swap = watch.swap().borrow().clone();
3504        let lottery = watch.lottery().borrow().clone();
3505        let topology = watch.topology().borrow().clone();
3506        let network = watch.network().borrow().clone();
3507        let transactions = watch.transactions().borrow().clone();
3508        let recent = log_capture
3509            .as_ref()
3510            .map(|c| c.snapshot())
3511            .unwrap_or_default();
3512        let call_stats = crate::components::api_health::call_stats_for(&recent);
3513        let now_unix = SystemTime::now()
3514            .duration_since(UNIX_EPOCH)
3515            .map(|d| d.as_secs() as i64)
3516            .unwrap_or(0);
3517        let inputs = crate::metrics::MetricsInputs {
3518            bee_tui_version: env!("CARGO_PKG_VERSION"),
3519            health: &health,
3520            stamps: &stamps,
3521            swap: &swap,
3522            lottery: &lottery,
3523            topology: &topology,
3524            network: &network,
3525            transactions: &transactions,
3526            call_stats: &call_stats,
3527            now_unix,
3528        };
3529        crate::metrics::render(&inputs)
3530    })
3531}
3532
3533fn format_gate_line(g: &Gate) -> String {
3534    let glyphs = crate::theme::active().glyphs;
3535    let glyph = match g.status {
3536        GateStatus::Pass => glyphs.pass,
3537        GateStatus::Warn => glyphs.warn,
3538        GateStatus::Fail => glyphs.fail,
3539        GateStatus::Unknown => glyphs.bullet,
3540    };
3541    let mut s = format!(
3542        "  [{glyph}] {label:<28} {value}\n",
3543        label = g.label,
3544        value = g.value
3545    );
3546    if let Some(why) = &g.why {
3547        s.push_str(&format!("        {} {why}\n", glyphs.continuation));
3548    }
3549    s
3550}
3551
/// Strip scheme + host from a URL, leaving only the path + query.
/// Mirrors the redaction the S10 command-log pane applies on render.
///
/// Behaviour:
/// * `scheme://host/path?query` → `/path?query`
/// * `scheme://host` (no path) → `/`
/// * `//host/path` (protocol-relative) → `/path`
/// * anything else is returned unchanged.
///
/// The authority is only stripped when the input really starts with
/// a URI scheme followed by `://` (or with a bare `//`). A `//` that
/// appears later — an empty path segment such as
/// `/bzz/ref//index.html`, or a URL embedded in a query string — is
/// left alone; matching the first `//` anywhere would throw away
/// part of a relative path.
fn path_only(url: &str) -> String {
    // RFC 3986 scheme: a letter followed by letters, digits, `+`, `-` or `.`.
    let looks_like_scheme = |s: &str| {
        let mut chars = s.chars();
        chars.next().map_or(false, |c| c.is_ascii_alphabetic())
            && chars.all(|c| c.is_ascii_alphanumeric() || matches!(c, '+' | '-' | '.'))
    };

    // Everything after the `//` that introduces the authority, if any.
    let after_scheme = if let Some(rest) = url.strip_prefix("//") {
        Some(rest)
    } else {
        match url.split_once("://") {
            Some((scheme, rest)) if looks_like_scheme(scheme) => Some(rest),
            _ => None,
        }
    };

    match after_scheme {
        Some(rest) => match rest.find('/') {
            Some(slash) => rest[slash..].to_string(),
            // Host only, no path component.
            None => "/".into(),
        },
        None => url.to_string(),
    }
}
3564
// NOTE: the three lines that used to sit here documented
// `format_utc_now` (defined further down), not `append`: that helper
// formats the current wall-clock UTC time as `HH:MM:SS`, computed
// from `SystemTime::now()` directly so the binary stays free of a
// chrono / time dep just for that one display string.

/// Append-write `s` to the file at `path`. Used by the `:pins-check`
/// background task to stream NDJSON-style results into a file the
/// operator can `tail -f`.
///
/// Takes `&Path` instead of `&PathBuf`; existing callers that pass a
/// `&PathBuf` keep working through deref coercion.
///
/// # Errors
///
/// Returns the underlying `std::io::Error` if the file cannot be
/// opened or the write fails. The file is opened with `append(true)`
/// only — it is not created here, so `path` must already exist.
fn append(path: &std::path::Path, s: &str) -> std::io::Result<()> {
    use std::io::Write;
    let mut f = std::fs::OpenOptions::new().append(true).open(path)?;
    f.write_all(s.as_bytes())
}
3576
/// Map a Bee logger verbosity string to a coarse loudness rank so
/// the noisier loggers sort to the top of the `:loggers` dump.
///
/// Bee reports verbosity as a free-form string — usually `"all"`,
/// `"trace"`, `"debug"`, `"info"`, `"warning"`, `"error"`, `"none"`,
/// plus the legacy numeric forms `"1"`/`"2"`/`"3"`. Ranks run from 5
/// (loudest: `all`/`trace`) down to 1 (`error`). Anything not listed
/// in the table, `"none"` included, gets rank 0 (silent end).
/// Matching is exact and case-sensitive.
///
/// NOTE(review): the numeric aliases here pair `"1"` with `info` and
/// `"3"` with `error`. Bee's own numeric verbosity scale appears to
/// run the other way (1 = error … 3 = info) — confirm against Bee
/// before relying on the ordering of numeric values.
fn verbosity_rank(s: &str) -> u8 {
    const RANKS: &[(&str, u8)] = &[
        ("all", 5),
        ("trace", 5),
        ("debug", 4),
        ("info", 3),
        ("1", 3),
        ("warning", 2),
        ("warn", 2),
        ("2", 2),
        ("error", 1),
        ("3", 1),
    ];
    RANKS
        .iter()
        .find(|(name, _)| *name == s)
        .map_or(0, |&(_, rank)| rank)
}
3592
/// Make a string safe to embed in a filename.
///
/// ASCII letters, ASCII digits, `-` and `_` pass through untouched;
/// every other character (path separators, whitespace, punctuation,
/// non-ASCII) is replaced by a single `-`. Profile names come from
/// the user's `config.toml`, so whatever is in there is accepted, but
/// the resulting path stays well-behaved on every shell.
fn sanitize_for_filename(s: &str) -> String {
    let mut cleaned = String::with_capacity(s.len());
    for ch in s.chars() {
        let is_safe = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        cleaned.push(if is_safe { ch } else { '-' });
    }
    cleaned
}
3604
/// Outcome of a `q` keystroke under the double-tap-to-quit guard.
/// Pure data so [`resolve_quit_press`] can be unit-tested without
/// any TUI / event-loop scaffolding.
///
/// The enum carries no payload: the caller decides what to do for
/// each variant (quit vs. record the press time and show a hint).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QuitResolution {
    /// Second `q` arrived inside the confirmation window — quit.
    /// [`resolve_quit_press`] treats a press exactly at the end of
    /// the window as still inside it.
    Confirm,
    /// First `q`, or a second `q` after the window expired —
    /// remember the timestamp and surface the hint.
    Pending,
}
3616
3617/// Decide what to do with a `q` press given the previous press
3618/// timestamp (if any) and the current time. The window is supplied
3619/// rather than read from a constant so tests can use short windows
3620/// without sleeping.
3621fn resolve_quit_press(prev: Option<Instant>, now: Instant, window: Duration) -> QuitResolution {
3622    match prev {
3623        Some(t) if now.duration_since(t) <= window => QuitResolution::Confirm,
3624        _ => QuitResolution::Pending,
3625    }
3626}
3627
/// Format the current wall-clock UTC time of day as `HH:MM:SS`.
///
/// Computed straight from `SystemTime::now()` with integer
/// arithmetic, so the binary needs no chrono / time dependency for
/// this one display string. Only the time of day is kept; the date is
/// discarded. If the system clock reads earlier than the Unix epoch
/// the result is `00:00:00`.
fn format_utc_now() -> String {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    // Seconds elapsed since the most recent UTC midnight.
    let day_secs = since_epoch.as_secs() % 86_400;
    let (hours, within_hour) = (day_secs / 3_600, day_secs % 3_600);
    let (minutes, seconds) = (within_hour / 60, within_hour % 60);
    format!(
        "{h:02}:{m:02}:{s:02}",
        h = hours,
        m = minutes,
        s = seconds
    )
}
3639
// Unit tests for the free-standing helper functions of this module.
// Everything exercised here is reached through `use super::*`; none
// of the tests build an `App` or start the TUI.
#[cfg(test)]
mod tests {
    use super::*;

    // --- format_utc_now -------------------------------------------------

    #[test]
    fn format_utc_now_returns_eight_chars() {
        // Shape check only (`HH:MM:SS`): the value itself depends on
        // the wall clock, so it cannot be pinned.
        let s = format_utc_now();
        assert_eq!(s.len(), 8);
        assert_eq!(s.chars().nth(2), Some(':'));
        assert_eq!(s.chars().nth(5), Some(':'));
    }

    // --- path_only ------------------------------------------------------

    #[test]
    fn path_only_strips_scheme_and_host() {
        // The query string is part of what is kept.
        assert_eq!(path_only("http://10.0.1.5:1633/status"), "/status");
        assert_eq!(
            path_only("https://bee.example.com/stamps?limit=10"),
            "/stamps?limit=10"
        );
    }

    #[test]
    fn path_only_handles_no_path() {
        // Host with nothing after it collapses to the root path.
        assert_eq!(path_only("http://localhost:1633"), "/");
    }

    #[test]
    fn path_only_passes_relative_through() {
        // No scheme → input is returned untouched.
        assert_eq!(path_only("/already/relative"), "/already/relative");
    }

    // --- parse_pprof_arg --------------------------------------------------

    #[test]
    fn parse_pprof_arg_default_60() {
        // A bare `--pprof` (no `=N`) yields 60, for both the
        // `diagnose` and `diag` spellings.
        assert_eq!(parse_pprof_arg("diagnose --pprof"), Some(60));
        assert_eq!(parse_pprof_arg("diag --pprof some other"), Some(60));
    }

    #[test]
    fn parse_pprof_arg_with_explicit_seconds() {
        // `--pprof=N` is honoured; trailing tokens do not disturb it.
        assert_eq!(parse_pprof_arg("diagnose --pprof=120"), Some(120));
        assert_eq!(parse_pprof_arg("diagnose --pprof=15 trailing"), Some(15));
    }

    #[test]
    fn parse_pprof_arg_clamps_extreme_values() {
        // 0 → 1 (lower clamp), 9999 → 600 (upper clamp).
        assert_eq!(parse_pprof_arg("diagnose --pprof=0"), Some(1));
        assert_eq!(parse_pprof_arg("diagnose --pprof=9999"), Some(600));
    }

    #[test]
    fn parse_pprof_arg_none_when_absent() {
        // No `--pprof` flag at all, including the empty command line.
        assert_eq!(parse_pprof_arg("diagnose"), None);
        assert_eq!(parse_pprof_arg("diag"), None);
        assert_eq!(parse_pprof_arg(""), None);
    }

    #[test]
    fn parse_pprof_arg_ignores_garbage_value() {
        // Garbage after `=` falls through to None — operator gets the
        // sync diagnostic, not a panic on bad input.
        assert_eq!(parse_pprof_arg("diagnose --pprof=lol"), None);
    }

    // --- guess_content_type ---------------------------------------------

    #[test]
    fn guess_content_type_known_extensions() {
        let p = std::path::PathBuf::from;
        assert_eq!(guess_content_type(&p("/tmp/x.html")), "text/html");
        assert_eq!(guess_content_type(&p("/tmp/x.json")), "application/json");
        // Extension matching is case-insensitive.
        assert_eq!(guess_content_type(&p("/tmp/x.PNG")), "image/png");
        // Only the last extension counts: `.tar.gz` is seen as `gz`.
        assert_eq!(guess_content_type(&p("/tmp/x.tar.gz")), "application/gzip");
    }

    #[test]
    fn guess_content_type_unknown_returns_empty() {
        let p = std::path::PathBuf::from;
        // bee-rs treats empty as "use default application/octet-stream",
        // so an unknown extension shouldn't produce a misleading guess.
        assert_eq!(guess_content_type(&p("/tmp/x.unknownext")), "");
        assert_eq!(guess_content_type(&p("/tmp/no-extension")), "");
    }

    // --- sanitize_for_filename ------------------------------------------

    #[test]
    fn sanitize_for_filename_keeps_safe_chars() {
        // Letters, digits, `-` and `_` are left as they are.
        assert_eq!(sanitize_for_filename("prod-1"), "prod-1");
        assert_eq!(sanitize_for_filename("lab_node"), "lab_node");
    }

    #[test]
    fn sanitize_for_filename_replaces_unsafe_chars() {
        // Each unsafe character becomes exactly one `-`.
        assert_eq!(sanitize_for_filename("a/b\\c d"), "a-b-c-d");
        assert_eq!(sanitize_for_filename("name:colon"), "name-colon");
    }

    // --- resolve_quit_press ---------------------------------------------
    // The later instants are built by adding to `first`, so none of
    // these tests sleep.

    #[test]
    fn resolve_quit_press_first_press_is_pending() {
        let now = Instant::now();
        assert_eq!(
            resolve_quit_press(None, now, Duration::from_millis(1500)),
            QuitResolution::Pending
        );
    }

    #[test]
    fn resolve_quit_press_second_press_inside_window_confirms() {
        let first = Instant::now();
        let window = Duration::from_millis(1500);
        let second = first + Duration::from_millis(500);
        assert_eq!(
            resolve_quit_press(Some(first), second, window),
            QuitResolution::Confirm
        );
    }

    #[test]
    fn resolve_quit_press_second_press_after_window_resets_to_pending() {
        // A `q` long after the previous press should restart the
        // double-tap window — the operator hasn't really "meant it
        // twice in a row".
        let first = Instant::now();
        let window = Duration::from_millis(1500);
        let second = first + Duration::from_millis(2_000);
        assert_eq!(
            resolve_quit_press(Some(first), second, window),
            QuitResolution::Pending
        );
    }

    #[test]
    fn resolve_quit_press_at_window_boundary_confirms() {
        // Exactly at the boundary the press counts as confirm —
        // operators tapping in rhythm shouldn't be punished by jitter.
        let first = Instant::now();
        let window = Duration::from_millis(1500);
        let second = first + window;
        assert_eq!(
            resolve_quit_press(Some(first), second, window),
            QuitResolution::Confirm
        );
    }

    // --- screen_keymap --------------------------------------------------

    #[test]
    fn screen_keymap_covers_drill_screens() {
        // Stamps (1) and Peers (4) are the two screens with drill
        // panes — both must list ↑↓ / Enter / Esc in the help.
        // NOTE(review): only Enter and Esc are asserted below; the
        // ↑↓ row is not checked.
        for idx in [1usize, 4] {
            let rows = screen_keymap(idx);
            assert!(
                rows.iter().any(|(k, _)| k.contains("Enter")),
                "screen {idx} keymap must mention Enter (drill)"
            );
            assert!(
                rows.iter().any(|(k, _)| k.contains("Esc")),
                "screen {idx} keymap must mention Esc (close drill)"
            );
        }
    }

    #[test]
    fn screen_keymap_lottery_advertises_rchash() {
        // NOTE(review): `contains("r")` is satisfied by any key label
        // that merely includes the letter `r` (a label mentioning
        // `Enter` would do), so this can pass without a dedicated `r`
        // binding — consider tightening the match.
        let rows = screen_keymap(3);
        assert!(rows.iter().any(|(k, _)| k.contains("r")));
    }

    #[test]
    fn screen_keymap_unknown_index_is_empty_not_panic() {
        // Out-of-range screen index yields an empty keymap.
        assert!(screen_keymap(999).is_empty());
    }

    // --- verbosity_rank -------------------------------------------------

    #[test]
    fn verbosity_rank_orders_loud_to_silent() {
        assert!(verbosity_rank("all") > verbosity_rank("debug"));
        assert!(verbosity_rank("debug") > verbosity_rank("info"));
        assert!(verbosity_rank("info") > verbosity_rank("warning"));
        assert!(verbosity_rank("warning") > verbosity_rank("error"));
        assert!(verbosity_rank("error") > verbosity_rank("unknown"));
        // Numeric and named forms sort identically.
        assert_eq!(verbosity_rank("info"), verbosity_rank("1"));
        assert_eq!(verbosity_rank("warning"), verbosity_rank("2"));
    }

    // --- filter_command_suggestions ---------------------------------------

    #[test]
    fn filter_command_suggestions_empty_buffer_returns_all() {
        // An empty input buffer matches every known command.
        let matches = filter_command_suggestions("", KNOWN_COMMANDS);
        assert_eq!(matches.len(), KNOWN_COMMANDS.len());
    }

    #[test]
    fn filter_command_suggestions_prefix_matches_case_insensitive() {
        // Upper-case `Bu` must still find the lower-case `buy-*`
        // commands. The exact count of 2 ties this test to the
        // current contents of `KNOWN_COMMANDS`.
        let matches = filter_command_suggestions("Bu", KNOWN_COMMANDS);
        let names: Vec<&str> = matches.iter().map(|(n, _)| *n).collect();
        assert!(names.contains(&"buy-preview"));
        assert!(names.contains(&"buy-suggest"));
        assert_eq!(names.len(), 2);
    }

    #[test]
    fn filter_command_suggestions_unknown_prefix_is_empty() {
        let matches = filter_command_suggestions("xyz", KNOWN_COMMANDS);
        assert!(matches.is_empty());
    }

    #[test]
    fn filter_command_suggestions_uses_first_token_only() {
        // `:topup-preview a1b2 1000` — the prefix is the verb, not
        // any of the args.
        let matches = filter_command_suggestions("topup-preview a1b2 1000", KNOWN_COMMANDS);
        let names: Vec<&str> = matches.iter().map(|(n, _)| *n).collect();
        assert_eq!(names, vec!["topup-preview"]);
    }

    // --- build_synthetic_probe_chunk ------------------------------------

    #[test]
    fn probe_chunk_is_4104_bytes_with_correct_span() {
        // span(8) + payload(4096) = 4104, span = 4096 little-endian.
        let chunk = build_synthetic_probe_chunk();
        assert_eq!(chunk.len(), 4104);
        let span = u64::from_le_bytes(chunk[..8].try_into().unwrap());
        assert_eq!(span, 4096);
    }

    #[test]
    fn probe_chunk_payloads_are_unique_per_call() {
        // Timestamp-randomised → two consecutive builds must differ.
        // The randomness lives in payload bytes 0..16, so compare just
        // that window to keep the test deterministic against the
        // zero-padded tail.
        let a = build_synthetic_probe_chunk();
        // tiny sleep so the nanosecond clock is guaranteed to advance
        std::thread::sleep(Duration::from_micros(1));
        let b = build_synthetic_probe_chunk();
        // Chunk bytes 8..24 are payload bytes 0..16 (after the span).
        assert_ne!(&a[8..24], &b[8..24]);
    }

    // --- short_hex ------------------------------------------------------

    #[test]
    fn short_hex_truncates_with_ellipsis() {
        assert_eq!(short_hex("a1b2c3d4e5f6", 8), "a1b2c3d4…");
        assert_eq!(short_hex("short", 8), "short");
        // Exactly `len` characters: returned whole, no ellipsis.
        assert_eq!(short_hex("abcdefgh", 8), "abcdefgh");
    }
}