Skip to main content

bee_tui/
pubsub.rs

//! Pubsub watch — the receiver side of v1.3's `:gsoc-mine` and
//! `:pss-target` writer verbs. PSS topic subscriptions and GSOC
//! `(owner, identifier)` subscriptions both connect to a Bee
//! WebSocket and emit a message frame whenever a publisher hits the
//! corresponding `/pss/send` or SOC-upload endpoint.
//!
//! The S15 Pubsub screen renders a merged timeline of all active
//! subscriptions; this module owns the subscription metadata and the
//! [`PubsubMessage`] shape that flows from the background watcher
//! tasks to the screen via an mpsc channel in `App`.
11
use std::fmt::Write;
use std::sync::Arc;
use std::time::SystemTime;

use bee::swarm::{EthAddress, Identifier, Topic};
use tokio::sync::mpsc::UnboundedSender;
use tokio_util::sync::CancellationToken;

use crate::api::ApiClient;
20
/// Upper bound on how many messages the screen keeps around.
///
/// Once the limit is reached the oldest entries are meant to be dropped
/// from the front (`VecDeque`-style), which keeps the cockpit's memory
/// bounded even when a topic is very noisy.
pub const MAX_MESSAGES: usize = 500;
25
26/// One delivered message — what the screen renders as a row.
27/// Encodes both PSS (topic-keyed) and GSOC (address-keyed) variants
28/// in a single shape so the merged timeline can render them
29/// together, distinguished by [`PubsubKind`] in the row glyph.
30#[derive(Debug, Clone)]
31pub struct PubsubMessage {
32    /// Wall-clock time the cockpit received the frame from Bee
33    /// (not the publisher's timestamp; PSS / GSOC frames don't
34    /// carry one of their own).
35    pub received_at: SystemTime,
36    pub kind: PubsubKind,
37    /// `topic` for PSS, the SOC address (hex) for GSOC. The screen
38    /// renders this short; the detail line shows the full hex.
39    pub channel: String,
40    /// Raw payload bytes. Bounded by Bee's chunk-size cap (4 KiB)
41    /// for both PSS and GSOC.
42    pub payload: Vec<u8>,
43}
44
/// Transport that delivered a [`PubsubMessage`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PubsubKind {
    /// Delivered through a PSS topic subscription.
    Pss,
    /// Delivered through a GSOC `(owner, identifier)` subscription.
    Gsoc,
}

impl PubsubKind {
    /// Short upper-case label for this kind: `"PSS"` or `"GSOC"`.
    pub fn as_str(self) -> &'static str {
        match self {
            PubsubKind::Gsoc => "GSOC",
            PubsubKind::Pss => "PSS",
        }
    }
}
59
60/// Identity for an active subscription. Used as the key in `App`'s
61/// `pubsub_subs` hashmap so re-issuing `:pubsub-pss <topic>` for an
62/// already-subscribed topic is detectable + cancellable.
63pub fn pss_sub_id(topic: &Topic) -> String {
64    format!("pss:{}", topic.to_hex())
65}
66
67pub fn gsoc_sub_id(owner: &EthAddress, identifier: &Identifier) -> String {
68    format!("gsoc:{}:{}", owner.to_hex(), identifier.to_hex())
69}
70
71/// Spawn a background task that drives a PSS subscription's
72/// `recv()` loop and forwards every delivered message into `tx` as
73/// a [`PubsubMessage`] until `cancel` fires. Returns immediately
74/// after `subscribe` succeeds; the actual recv loop runs on tokio.
75pub async fn spawn_pss_watcher(
76    api: Arc<ApiClient>,
77    topic: Topic,
78    cancel: CancellationToken,
79    tx: UnboundedSender<PubsubMessage>,
80) -> Result<(), String> {
81    let mut sub = api
82        .bee()
83        .pss()
84        .subscribe(&topic)
85        .await
86        .map_err(|e| format!("/pss/subscribe failed: {e}"))?;
87    let channel = topic.to_hex();
88    tokio::spawn(async move {
89        loop {
90            tokio::select! {
91                msg = sub.recv() => {
92                    match msg {
93                        Some(payload) => {
94                            let _ = tx.send(PubsubMessage {
95                                received_at: SystemTime::now(),
96                                kind: PubsubKind::Pss,
97                                channel: channel.clone(),
98                                payload: payload.to_vec(),
99                            });
100                        }
101                        None => return, // ws closed by Bee
102                    }
103                }
104                _ = cancel.cancelled() => {
105                    sub.cancel();
106                    return;
107                }
108            }
109        }
110    });
111    Ok(())
112}
113
114/// Spawn a background task that drives a GSOC subscription's
115/// `recv()` loop. Same shape as [`spawn_pss_watcher`].
116pub async fn spawn_gsoc_watcher(
117    api: Arc<ApiClient>,
118    owner: EthAddress,
119    identifier: Identifier,
120    cancel: CancellationToken,
121    tx: UnboundedSender<PubsubMessage>,
122) -> Result<(), String> {
123    let mut sub = api
124        .bee()
125        .gsoc()
126        .subscribe(&owner, &identifier)
127        .await
128        .map_err(|e| format!("/gsoc/subscribe failed: {e}"))?;
129    // Channel = SOC address (the value Bee uses to route).
130    let channel = match bee::swarm::soc::calculate_single_owner_chunk_address(&identifier, &owner) {
131        Ok(r) => r.to_hex(),
132        Err(e) => return Err(format!("calculate soc address: {e}")),
133    };
134    tokio::spawn(async move {
135        loop {
136            tokio::select! {
137                msg = sub.recv() => {
138                    match msg {
139                        Some(payload) => {
140                            let _ = tx.send(PubsubMessage {
141                                received_at: SystemTime::now(),
142                                kind: PubsubKind::Gsoc,
143                                channel: channel.clone(),
144                                payload: payload.to_vec(),
145                            });
146                        }
147                        None => return,
148                    }
149                }
150                _ = cancel.cancelled() => {
151                    sub.cancel();
152                    return;
153                }
154            }
155        }
156    });
157    Ok(())
158}
159
/// Render a byte payload as a printable-ASCII preview.
///
/// At most the first `cap` bytes are shown. Bytes in the printable range
/// (space through `~`) are kept as-is; every other byte becomes `.`.
/// When the payload is longer than `cap`, a trailing `…` is appended on
/// top of the `cap` shown characters. Used by the screen renderer for
/// the inline content peek.
pub fn ascii_preview(bytes: &[u8], cap: usize) -> String {
    // `get(..cap)` is `None` exactly when the payload is shorter than
    // `cap`, in which case everything is shown and nothing is cut off.
    let (head, truncated) = match bytes.get(..cap) {
        Some(prefix) => (prefix, bytes.len() > cap),
        None => (bytes, false),
    };
    let mut preview: String = head
        .iter()
        .map(|&byte| match byte {
            b' '..=b'~' => char::from(byte),
            _ => '.',
        })
        .collect();
    if truncated {
        preview.push('…');
    }
    preview
}
177
/// Hex-prefix preview for binary payloads (when ASCII is mostly dots).
///
/// Shows at most `cap` hex characters, i.e. the first `cap / 2` bytes,
/// each rendered as two lower-case hex digits. When the payload does not
/// fit, a trailing `…` is appended on top of the hex characters.
///
/// The output buffer is sized exactly up front (including the ellipsis)
/// and the digits are formatted straight into it, so no temporary
/// `String` is allocated per byte.
pub fn hex_preview(bytes: &[u8], cap: usize) -> String {
    let shown = &bytes[..bytes.len().min(cap / 2)];
    // Equivalent to `bytes.len() * 2 > cap`: some bytes were left out.
    let truncated = shown.len() < bytes.len();
    let ellipsis_len = if truncated { '…'.len_utf8() } else { 0 };
    let mut s = String::with_capacity(shown.len() * 2 + ellipsis_len);
    for b in shown {
        // Formatting into a `String` cannot fail, so the result is ignored.
        let _ = write!(s, "{b:02x}");
    }
    if truncated {
        s.push('…');
    }
    s
}
191
192/// Pick the smarter of [`ascii_preview`] / [`hex_preview`] for a
193/// payload — ASCII when ≥ 75% of the bytes are printable, hex
194/// otherwise. Pure for testability.
195pub fn smart_preview(bytes: &[u8], cap: usize) -> String {
196    if bytes.is_empty() {
197        return "(empty)".to_string();
198    }
199    let printable = bytes.iter().filter(|&&b| (0x20..0x7f).contains(&b)).count();
200    let ratio = printable as f64 / bytes.len() as f64;
201    if ratio >= 0.75 {
202        ascii_preview(bytes, cap)
203    } else {
204        hex_preview(bytes, cap)
205    }
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    /// Bytes outside the printable range are rendered as `.`.
    #[test]
    fn ascii_preview_replaces_nonprintable() {
        let input = [b'h', b'i', 0x00, b'!', 0xff];
        assert_eq!(ascii_preview(&input, 16), "hi.!.");
    }

    /// A payload longer than the cap is cut and marked with `…`.
    #[test]
    fn ascii_preview_caps_with_ellipsis() {
        assert_eq!(ascii_preview(b"abcdefghij", 5), "abcde…");
    }

    /// Each byte becomes exactly two lower-case hex digits.
    #[test]
    fn hex_preview_renders_two_hex_chars_per_byte() {
        let input = [0xde, 0xad, 0xbe, 0xef];
        assert_eq!(hex_preview(&input, 8), "deadbeef");
    }

    /// The cap counts hex characters, so 4 characters show 2 bytes.
    #[test]
    fn hex_preview_caps_with_ellipsis() {
        let input = [0xde, 0xad, 0xbe, 0xef];
        assert_eq!(hex_preview(&input, 4), "dead…");
    }

    /// Fully printable input goes through the ASCII renderer.
    #[test]
    fn smart_preview_picks_ascii_for_text() {
        assert_eq!(smart_preview(b"hello world!", 32), "hello world!");
    }

    /// Binary input falls back to the hex form.
    #[test]
    fn smart_preview_picks_hex_for_binary() {
        let input = [0xff, 0xfe, 0xfd, 0x00];
        assert_eq!(smart_preview(&input, 16), "fffefd00");
    }

    /// An empty payload has its own placeholder.
    #[test]
    fn smart_preview_handles_empty() {
        let rendered = smart_preview(&[], 16);
        assert_eq!(rendered, "(empty)");
    }

    /// The PSS id is the `pss:` prefix followed by the topic hex.
    #[test]
    fn pss_sub_id_uses_topic_hex() {
        let topic = Topic::from_string("test-topic");
        let expected_hex = topic.to_hex();
        let id = pss_sub_id(&topic);
        assert!(id.starts_with("pss:"));
        assert_eq!(&id[4..], &expected_hex);
    }

    /// The GSOC id carries the prefix plus both hex components.
    #[test]
    fn gsoc_sub_id_combines_owner_and_identifier() {
        let owner = EthAddress::from_hex("0x1234567890123456789012345678901234567890").unwrap();
        let id = Identifier::new(&[0xab; 32]).unwrap();
        let sub_id = gsoc_sub_id(&owner, &id);
        assert!(sub_id.starts_with("gsoc:"));
        for part in [owner.to_hex(), id.to_hex()] {
            assert!(sub_id.contains(&part));
        }
    }
}