Skip to main content

bee_tui/
pubsub.rs

1//! Pubsub watch — the receiver side of v1.3's `:gsoc-mine` and
2//! `:pss-target` writer verbs. PSS topic subscriptions and GSOC
3//! `(owner, identifier)` subscriptions both connect to a Bee
4//! WebSocket and emit message frames whenever a publisher hits the
5//! corresponding `/pss/send` / SOC-upload endpoint.
6//!
7//! The S15 Pubsub screen renders a merged timeline of all active
8//! subscriptions; this module owns the subscription metadata + the
9//! [`PubsubMessage`] shape that flows between the background watcher
10//! tasks and the screen via an mpsc channel in `App`.
11
12use std::path::Path;
13use std::sync::Arc;
14use std::time::SystemTime;
15
16use bee::swarm::{EthAddress, Identifier, Topic};
17use tokio::sync::Mutex;
18use tokio::sync::mpsc::UnboundedSender;
19use tokio_util::sync::CancellationToken;
20
21use crate::api::ApiClient;
22
/// Upper bound on the number of messages the screen retains.
///
/// Older messages are evicted from the front (`VecDeque`-style) so the
/// cockpit's memory stays bounded even on a noisy topic.
pub const MAX_MESSAGES: usize = 500;
27
/// One delivered message — what the screen renders as a row.
///
/// Encodes both PSS (topic-keyed) and GSOC (address-keyed) variants
/// in a single shape so the merged timeline can render them
/// together, distinguished by [`PubsubKind`] in the row glyph.
#[derive(Debug, Clone)]
pub struct PubsubMessage {
    /// Wall-clock time the cockpit received the frame from Bee
    /// (not the publisher's timestamp; PSS / GSOC frames don't
    /// carry one of their own).
    pub received_at: SystemTime,
    /// Which subscription family delivered the frame (PSS or GSOC).
    pub kind: PubsubKind,
    /// `topic` for PSS, the SOC address (hex) for GSOC. The screen
    /// renders this short; the detail line shows the full hex.
    pub channel: String,
    /// Raw payload bytes. Bounded by Bee's chunk-size cap (4 KiB)
    /// for both PSS and GSOC.
    pub payload: Vec<u8>,
}
46
/// Which kind of subscription produced a [`PubsubMessage`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PubsubKind {
    /// Topic-keyed PSS subscription.
    Pss,
    /// Address-keyed GSOC subscription.
    Gsoc,
}

impl PubsubKind {
    /// Short upper-case label for this kind: `"PSS"` or `"GSOC"`.
    pub fn as_str(self) -> &'static str {
        match self {
            PubsubKind::Gsoc => "GSOC",
            PubsubKind::Pss => "PSS",
        }
    }
}
61
62/// Identity for an active subscription. Used as the key in `App`'s
63/// `pubsub_subs` hashmap so re-issuing `:pubsub-pss <topic>` for an
64/// already-subscribed topic is detectable + cancellable.
65pub fn pss_sub_id(topic: &Topic) -> String {
66    format!("pss:{}", topic.to_hex())
67}
68
69pub fn gsoc_sub_id(owner: &EthAddress, identifier: &Identifier) -> String {
70    format!("gsoc:{}:{}", owner.to_hex(), identifier.to_hex())
71}
72
/// Optional history-file writer shared by every active subscription.
///
/// `None` when `[pubsub].history_file` is unset; `Some(file)` when
/// the operator opted in. The file handle sits behind an `Arc` so each
/// watcher can hold its own reference, and behind a
/// `tokio::sync::Mutex` so concurrent watchers serialise their
/// appends — JSONL stays well-formed even when two subscriptions
/// deliver simultaneously.
pub type HistoryWriter = Option<Arc<Mutex<tokio::fs::File>>>;
79
80/// Open `path` for append (`O_CREATE | O_APPEND`), creating it if
81/// it doesn't exist. Returns `Some(writer)` ready to share across
82/// watchers. Errors are surfaced to the caller so cockpit startup
83/// can log a clear "history file disabled: <reason>" message and
84/// keep the live tail going without persistence.
85pub async fn open_history_writer(path: &Path) -> Result<HistoryWriter, String> {
86    let mut opts = tokio::fs::OpenOptions::new();
87    opts.create(true).append(true);
88    #[cfg(unix)]
89    {
90        // tokio::fs::OpenOptions exposes mode() directly on Unix
91        // (no OpenOptionsExt import needed). Owner-only because
92        // pubsub payloads can be sensitive on shared hosts.
93        opts.mode(0o600);
94    }
95    let file = opts
96        .open(path)
97        .await
98        .map_err(|e| format!("open {}: {e}", path.display()))?;
99    Ok(Some(Arc::new(Mutex::new(file))))
100}
101
102/// Append one JSONL line for `msg`. Failures are logged but never
103/// fatal — losing a history line shouldn't kill the live tail.
104async fn append_history(writer: &HistoryWriter, msg: &PubsubMessage) {
105    let Some(file) = writer.as_ref() else {
106        return;
107    };
108    let received_unix = msg
109        .received_at
110        .duration_since(SystemTime::UNIX_EPOCH)
111        .map(|d| d.as_secs())
112        .unwrap_or(0);
113    let line = serde_json::json!({
114        "received_unix": received_unix,
115        "kind": msg.kind.as_str(),
116        "channel": msg.channel,
117        "size": msg.payload.len(),
118        "payload_hex": hex_preview(&msg.payload, msg.payload.len() * 2),
119    });
120    let mut bytes = match serde_json::to_vec(&line) {
121        Ok(b) => b,
122        Err(e) => {
123            tracing::warn!(target: "bee_tui::pubsub", "history serialise failed: {e}");
124            return;
125        }
126    };
127    bytes.push(b'\n');
128    let mut guard = file.lock().await;
129    use tokio::io::AsyncWriteExt;
130    if let Err(e) = guard.write_all(&bytes).await {
131        tracing::warn!(target: "bee_tui::pubsub", "history append failed: {e}");
132    }
133}
134
135/// Spawn a background task that drives a PSS subscription's
136/// `recv()` loop and forwards every delivered message into `tx` as
137/// a [`PubsubMessage`] until `cancel` fires. Returns immediately
138/// after `subscribe` succeeds; the actual recv loop runs on tokio.
139/// `history` is the optional shared writer that appends every
140/// frame to a JSONL file on arrival.
141pub async fn spawn_pss_watcher(
142    api: Arc<ApiClient>,
143    topic: Topic,
144    cancel: CancellationToken,
145    tx: UnboundedSender<PubsubMessage>,
146    history: HistoryWriter,
147) -> Result<(), String> {
148    let mut sub = api
149        .bee()
150        .pss()
151        .subscribe(&topic)
152        .await
153        .map_err(|e| format!("/pss/subscribe failed: {e}"))?;
154    let channel = topic.to_hex();
155    tokio::spawn(async move {
156        loop {
157            tokio::select! {
158                msg = sub.recv() => {
159                    match msg {
160                        Some(payload) => {
161                            let m = PubsubMessage {
162                                received_at: SystemTime::now(),
163                                kind: PubsubKind::Pss,
164                                channel: channel.clone(),
165                                payload: payload.to_vec(),
166                            };
167                            append_history(&history, &m).await;
168                            let _ = tx.send(m);
169                        }
170                        None => return, // ws closed by Bee
171                    }
172                }
173                _ = cancel.cancelled() => {
174                    sub.cancel();
175                    return;
176                }
177            }
178        }
179    });
180    Ok(())
181}
182
183/// Spawn a background task that drives a GSOC subscription's
184/// `recv()` loop. Same shape as [`spawn_pss_watcher`].
185pub async fn spawn_gsoc_watcher(
186    api: Arc<ApiClient>,
187    owner: EthAddress,
188    identifier: Identifier,
189    cancel: CancellationToken,
190    tx: UnboundedSender<PubsubMessage>,
191    history: HistoryWriter,
192) -> Result<(), String> {
193    let mut sub = api
194        .bee()
195        .gsoc()
196        .subscribe(&owner, &identifier)
197        .await
198        .map_err(|e| format!("/gsoc/subscribe failed: {e}"))?;
199    // Channel = SOC address (the value Bee uses to route).
200    let channel = match bee::swarm::soc::calculate_single_owner_chunk_address(&identifier, &owner) {
201        Ok(r) => r.to_hex(),
202        Err(e) => return Err(format!("calculate soc address: {e}")),
203    };
204    tokio::spawn(async move {
205        loop {
206            tokio::select! {
207                msg = sub.recv() => {
208                    match msg {
209                        Some(payload) => {
210                            let m = PubsubMessage {
211                                received_at: SystemTime::now(),
212                                kind: PubsubKind::Gsoc,
213                                channel: channel.clone(),
214                                payload: payload.to_vec(),
215                            };
216                            append_history(&history, &m).await;
217                            let _ = tx.send(m);
218                        }
219                        None => return,
220                    }
221                }
222                _ = cancel.cancelled() => {
223                    sub.cancel();
224                    return;
225                }
226            }
227        }
228    });
229    Ok(())
230}
231
/// Render a byte payload as a printable-ASCII preview.
///
/// At most `cap` bytes are shown. Bytes in the printable range
/// `0x20..=0x7e` are kept as-is; everything else becomes `.`. When the
/// payload is longer than `cap`, a trailing `…` marks the truncation
/// (so the result can be `cap + 1` characters long). Used by the screen
/// renderer for the inline content peek.
pub fn ascii_preview(bytes: &[u8], cap: usize) -> String {
    let truncated = bytes.len() > cap;
    let mut preview: String = bytes
        .iter()
        .take(cap)
        .map(|&byte| match byte {
            0x20..=0x7e => char::from(byte),
            _ => '.',
        })
        .collect();
    if truncated {
        preview.push('…');
    }
    preview
}
249
/// Hex-prefix preview for binary payloads (when ASCII is mostly
/// dots).
///
/// Emits two lowercase hex digits per byte for at most `cap / 2`
/// bytes, i.e. at most `cap` hex characters. When not every byte fits,
/// a trailing `…` marks the truncation.
pub fn hex_preview(bytes: &[u8], cap: usize) -> String {
    use std::fmt::Write as _;

    let shown = bytes.len().min(cap / 2);
    // Truncated exactly when some byte is left out; equivalent to
    // `bytes.len() * 2 > cap` but without the multiplication.
    let truncated = bytes.len() > shown;
    let ellipsis_len = if truncated { '…'.len_utf8() } else { 0 };

    let mut s = String::with_capacity(shown * 2 + ellipsis_len);
    for b in &bytes[..shown] {
        // Format straight into the buffer instead of allocating a
        // temporary `String` per byte; writing to a `String` cannot fail.
        let _ = write!(s, "{b:02x}");
    }
    if truncated {
        s.push('…');
    }
    s
}
263
264/// Pick the smarter of [`ascii_preview`] / [`hex_preview`] for a
265/// payload — ASCII when ≥ 75% of the bytes are printable, hex
266/// otherwise. Pure for testability.
267pub fn smart_preview(bytes: &[u8], cap: usize) -> String {
268    if bytes.is_empty() {
269        return "(empty)".to_string();
270    }
271    let printable = bytes.iter().filter(|&&b| (0x20..0x7f).contains(&b)).count();
272    let ratio = printable as f64 / bytes.len() as f64;
273    if ratio >= 0.75 {
274        ascii_preview(bytes, cap)
275    } else {
276        hex_preview(bytes, cap)
277    }
278}
279
// Unit tests for the pure helpers in this module: the preview
// formatters and the subscription-id builders. The watcher and
// history-file paths are not exercised here.
#[cfg(test)]
mod tests {
    use super::*;

    // Bytes outside the printable ASCII range render as `.`.
    #[test]
    fn ascii_preview_replaces_nonprintable() {
        let p = ascii_preview(&[b'h', b'i', 0x00, b'!', 0xff], 16);
        assert_eq!(p, "hi.!.");
    }

    // Input longer than `cap` is cut and marked with an ellipsis.
    #[test]
    fn ascii_preview_caps_with_ellipsis() {
        let p = ascii_preview(b"abcdefghij", 5);
        assert_eq!(p, "abcde…");
    }

    // A cap of exactly two characters per byte shows everything.
    #[test]
    fn hex_preview_renders_two_hex_chars_per_byte() {
        let p = hex_preview(&[0xde, 0xad, 0xbe, 0xef], 8);
        assert_eq!(p, "deadbeef");
    }

    // A cap of 4 hex characters shows 2 bytes, then the ellipsis.
    #[test]
    fn hex_preview_caps_with_ellipsis() {
        let p = hex_preview(&[0xde, 0xad, 0xbe, 0xef], 4);
        assert_eq!(p, "dead…");
    }

    // Fully printable input stays in ASCII form.
    #[test]
    fn smart_preview_picks_ascii_for_text() {
        let p = smart_preview(b"hello world!", 32);
        assert_eq!(p, "hello world!");
    }

    // No printable bytes at all, so the hex form is chosen.
    #[test]
    fn smart_preview_picks_hex_for_binary() {
        let p = smart_preview(&[0xff, 0xfe, 0xfd, 0x00], 16);
        // Binary → hex form.
        assert_eq!(p, "fffefd00");
    }

    // The empty payload has its own placeholder text.
    #[test]
    fn smart_preview_handles_empty() {
        assert_eq!(smart_preview(&[], 16), "(empty)");
    }

    // The id is the `pss:` prefix followed by the topic hex.
    #[test]
    fn pss_sub_id_uses_topic_hex() {
        let topic = Topic::from_string("test-topic");
        let id = pss_sub_id(&topic);
        assert!(id.starts_with("pss:"));
        assert_eq!(&id[4..], &topic.to_hex());
    }

    // The id carries the `gsoc:` prefix plus both hex components.
    #[test]
    fn gsoc_sub_id_combines_owner_and_identifier() {
        let owner = EthAddress::from_hex("0x1234567890123456789012345678901234567890").unwrap();
        let id = Identifier::new(&[0xab; 32]).unwrap();
        let sub_id = gsoc_sub_id(&owner, &id);
        assert!(sub_id.starts_with("gsoc:"));
        assert!(sub_id.contains(&owner.to_hex()));
        assert!(sub_id.contains(&id.to_hex()));
    }
}