bee-tui 1.7.0

Production-grade k9s-style terminal cockpit for Ethereum Swarm Bee node operators.
Documentation
//! Pubsub watch — the receiver side of v1.3's `:gsoc-mine` and
//! `:pss-target` writer verbs. PSS topic subscriptions and GSOC
//! `(owner, identifier)` subscriptions both connect to a Bee
//! WebSocket and emit message frames whenever a publisher hits the
//! corresponding `/pss/send` / SOC-upload endpoint.
//!
//! The S15 Pubsub screen renders a merged timeline of all active
//! subscriptions; this module owns the subscription metadata + the
//! [`PubsubMessage`] shape that flows between the background watcher
//! tasks and the screen via an mpsc channel in `App`.

use std::sync::Arc;
use std::time::SystemTime;

use bee::swarm::{EthAddress, Identifier, Topic};
use tokio::sync::mpsc::UnboundedSender;
use tokio_util::sync::CancellationToken;

use crate::api::ApiClient;

/// Maximum number of messages the screen retains. Older messages
/// are evicted from the front (`VecDeque`-style) so the cockpit's
/// memory stays bounded even on a noisy topic.
pub const MAX_MESSAGES: usize = 500;

/// One delivered message — what the screen renders as a row.
/// Encodes both PSS (topic-keyed) and GSOC (address-keyed) variants
/// in a single shape so the merged timeline can render them
/// together, distinguished by [`PubsubKind`] in the row glyph.
#[derive(Debug, Clone)]
pub struct PubsubMessage {
    /// Wall-clock time the cockpit received the frame from Bee
    /// (not the publisher's timestamp; PSS / GSOC frames don't
    /// carry one of their own).
    pub received_at: SystemTime,
    pub kind: PubsubKind,
    /// `topic` for PSS, the SOC address (hex) for GSOC. The screen
    /// renders this short; the detail line shows the full hex.
    pub channel: String,
    /// Raw payload bytes. Bounded by Bee's chunk-size cap (4 KiB)
    /// for both PSS and GSOC.
    pub payload: Vec<u8>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PubsubKind {
    Pss,
    Gsoc,
}

impl PubsubKind {
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Pss => "PSS",
            Self::Gsoc => "GSOC",
        }
    }
}

/// Identity for an active subscription. Used as the key in `App`'s
/// `pubsub_subs` hashmap so re-issuing `:pubsub-pss <topic>` for an
/// already-subscribed topic is detectable + cancellable.
pub fn pss_sub_id(topic: &Topic) -> String {
    format!("pss:{}", topic.to_hex())
}

pub fn gsoc_sub_id(owner: &EthAddress, identifier: &Identifier) -> String {
    format!("gsoc:{}:{}", owner.to_hex(), identifier.to_hex())
}

/// Spawn a background task that drives a PSS subscription's
/// `recv()` loop and forwards every delivered message into `tx` as
/// a [`PubsubMessage`] until `cancel` fires. Returns immediately
/// after `subscribe` succeeds; the actual recv loop runs on tokio.
pub async fn spawn_pss_watcher(
    api: Arc<ApiClient>,
    topic: Topic,
    cancel: CancellationToken,
    tx: UnboundedSender<PubsubMessage>,
) -> Result<(), String> {
    let mut sub = api
        .bee()
        .pss()
        .subscribe(&topic)
        .await
        .map_err(|e| format!("/pss/subscribe failed: {e}"))?;
    let channel = topic.to_hex();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                msg = sub.recv() => {
                    match msg {
                        Some(payload) => {
                            let _ = tx.send(PubsubMessage {
                                received_at: SystemTime::now(),
                                kind: PubsubKind::Pss,
                                channel: channel.clone(),
                                payload: payload.to_vec(),
                            });
                        }
                        None => return, // ws closed by Bee
                    }
                }
                _ = cancel.cancelled() => {
                    sub.cancel();
                    return;
                }
            }
        }
    });
    Ok(())
}

/// Spawn a background task that drives a GSOC subscription's
/// `recv()` loop. Same shape as [`spawn_pss_watcher`].
pub async fn spawn_gsoc_watcher(
    api: Arc<ApiClient>,
    owner: EthAddress,
    identifier: Identifier,
    cancel: CancellationToken,
    tx: UnboundedSender<PubsubMessage>,
) -> Result<(), String> {
    let mut sub = api
        .bee()
        .gsoc()
        .subscribe(&owner, &identifier)
        .await
        .map_err(|e| format!("/gsoc/subscribe failed: {e}"))?;
    // Channel = SOC address (the value Bee uses to route).
    let channel = match bee::swarm::soc::calculate_single_owner_chunk_address(&identifier, &owner) {
        Ok(r) => r.to_hex(),
        Err(e) => return Err(format!("calculate soc address: {e}")),
    };
    tokio::spawn(async move {
        loop {
            tokio::select! {
                msg = sub.recv() => {
                    match msg {
                        Some(payload) => {
                            let _ = tx.send(PubsubMessage {
                                received_at: SystemTime::now(),
                                kind: PubsubKind::Gsoc,
                                channel: channel.clone(),
                                payload: payload.to_vec(),
                            });
                        }
                        None => return,
                    }
                }
                _ = cancel.cancelled() => {
                    sub.cancel();
                    return;
                }
            }
        }
    });
    Ok(())
}

/// Convert a small byte payload to a printable-ASCII preview.
/// Replaces non-printable bytes with `.`. Caps at `cap` characters.
/// Used by the screen renderer for the inline content peek.
pub fn ascii_preview(bytes: &[u8], cap: usize) -> String {
    let mut s = String::with_capacity(cap.min(bytes.len()));
    for &b in bytes.iter().take(cap) {
        if (0x20..0x7f).contains(&b) {
            s.push(b as char);
        } else {
            s.push('.');
        }
    }
    if bytes.len() > cap {
        s.push('');
    }
    s
}

/// Hex-prefix preview for binary payloads (when ASCII is mostly
/// dots). Caps at `cap` characters of hex (so `cap/2` bytes).
pub fn hex_preview(bytes: &[u8], cap: usize) -> String {
    let bytes_to_show = bytes.iter().take(cap / 2);
    let mut s = String::with_capacity(cap.min(bytes.len() * 2));
    for b in bytes_to_show {
        s.push_str(&format!("{b:02x}"));
    }
    if bytes.len() * 2 > cap {
        s.push('');
    }
    s
}

/// Pick the smarter of [`ascii_preview`] / [`hex_preview`] for a
/// payload — ASCII when ≥ 75% of the bytes are printable, hex
/// otherwise. Pure for testability.
pub fn smart_preview(bytes: &[u8], cap: usize) -> String {
    if bytes.is_empty() {
        return "(empty)".to_string();
    }
    let printable = bytes.iter().filter(|&&b| (0x20..0x7f).contains(&b)).count();
    let ratio = printable as f64 / bytes.len() as f64;
    if ratio >= 0.75 {
        ascii_preview(bytes, cap)
    } else {
        hex_preview(bytes, cap)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn ascii_preview_replaces_nonprintable() {
        let p = ascii_preview(&[b'h', b'i', 0x00, b'!', 0xff], 16);
        assert_eq!(p, "hi.!.");
    }

    #[test]
    fn ascii_preview_caps_with_ellipsis() {
        let p = ascii_preview(b"abcdefghij", 5);
        assert_eq!(p, "abcde…");
    }

    #[test]
    fn hex_preview_renders_two_hex_chars_per_byte() {
        let p = hex_preview(&[0xde, 0xad, 0xbe, 0xef], 8);
        assert_eq!(p, "deadbeef");
    }

    #[test]
    fn hex_preview_caps_with_ellipsis() {
        let p = hex_preview(&[0xde, 0xad, 0xbe, 0xef], 4);
        assert_eq!(p, "dead…");
    }

    #[test]
    fn smart_preview_picks_ascii_for_text() {
        let p = smart_preview(b"hello world!", 32);
        assert_eq!(p, "hello world!");
    }

    #[test]
    fn smart_preview_picks_hex_for_binary() {
        let p = smart_preview(&[0xff, 0xfe, 0xfd, 0x00], 16);
        // Binary → hex form.
        assert_eq!(p, "fffefd00");
    }

    #[test]
    fn smart_preview_handles_empty() {
        assert_eq!(smart_preview(&[], 16), "(empty)");
    }

    #[test]
    fn pss_sub_id_uses_topic_hex() {
        let topic = Topic::from_string("test-topic");
        let id = pss_sub_id(&topic);
        assert!(id.starts_with("pss:"));
        assert_eq!(&id[4..], &topic.to_hex());
    }

    #[test]
    fn gsoc_sub_id_combines_owner_and_identifier() {
        let owner = EthAddress::from_hex("0x1234567890123456789012345678901234567890").unwrap();
        let id = Identifier::new(&[0xab; 32]).unwrap();
        let sub_id = gsoc_sub_id(&owner, &id);
        assert!(sub_id.starts_with("gsoc:"));
        assert!(sub_id.contains(&owner.to_hex()));
        assert!(sub_id.contains(&id.to_hex()));
    }
}