1use std::path::Path;
13use std::sync::Arc;
14use std::time::SystemTime;
15
16use bee::swarm::{EthAddress, Identifier, Topic};
17use tokio::sync::Mutex;
18use tokio::sync::mpsc::UnboundedSender;
19use tokio_util::sync::CancellationToken;
20
21use crate::api::ApiClient;
22
/// Message-count limit exported for the rest of the crate.
///
/// NOTE(review): nothing in this part of the file reads the constant;
/// presumably whoever drains the watcher channel uses it to cap the number of
/// retained messages — confirm against the consumer.
pub const MAX_MESSAGES: usize = 500;
27
28#[derive(Debug, Clone)]
33pub struct PubsubMessage {
34 pub received_at: SystemTime,
38 pub kind: PubsubKind,
39 pub channel: String,
42 pub payload: Vec<u8>,
45}
46
/// Kind of subscription a pubsub message arrived on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PubsubKind {
    /// Message from a PSS topic subscription.
    Pss,
    /// Message from a GSOC (owner + identifier) subscription.
    Gsoc,
}

impl PubsubKind {
    /// Returns the upper-case label for the kind: `"PSS"` or `"GSOC"`.
    ///
    /// The same label is what ends up in the `kind` field of history records.
    /// Declared `const` so it can also be evaluated in constant contexts.
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Pss => "PSS",
            Self::Gsoc => "GSOC",
        }
    }
}
61
62pub fn pss_sub_id(topic: &Topic) -> String {
66 format!("pss:{}", topic.to_hex())
67}
68
69pub fn gsoc_sub_id(owner: &EthAddress, identifier: &Identifier) -> String {
70 format!("gsoc:{}:{}", owner.to_hex(), identifier.to_hex())
71}
72
73pub type HistoryWriter = Option<Arc<Mutex<tokio::fs::File>>>;
79
80pub async fn open_history_writer(path: &Path) -> Result<HistoryWriter, String> {
86 let mut opts = tokio::fs::OpenOptions::new();
87 opts.create(true).append(true);
88 #[cfg(unix)]
89 {
90 opts.mode(0o600);
94 }
95 let file = opts
96 .open(path)
97 .await
98 .map_err(|e| format!("open {}: {e}", path.display()))?;
99 Ok(Some(Arc::new(Mutex::new(file))))
100}
101
102async fn append_history(writer: &HistoryWriter, msg: &PubsubMessage) {
105 let Some(file) = writer.as_ref() else {
106 return;
107 };
108 let received_unix = msg
109 .received_at
110 .duration_since(SystemTime::UNIX_EPOCH)
111 .map(|d| d.as_secs())
112 .unwrap_or(0);
113 let line = serde_json::json!({
114 "received_unix": received_unix,
115 "kind": msg.kind.as_str(),
116 "channel": msg.channel,
117 "size": msg.payload.len(),
118 "payload_hex": hex_preview(&msg.payload, msg.payload.len() * 2),
119 });
120 let mut bytes = match serde_json::to_vec(&line) {
121 Ok(b) => b,
122 Err(e) => {
123 tracing::warn!(target: "bee_tui::pubsub", "history serialise failed: {e}");
124 return;
125 }
126 };
127 bytes.push(b'\n');
128 let mut guard = file.lock().await;
129 use tokio::io::AsyncWriteExt;
130 if let Err(e) = guard.write_all(&bytes).await {
131 tracing::warn!(target: "bee_tui::pubsub", "history append failed: {e}");
132 }
133}
134
135pub async fn spawn_pss_watcher(
142 api: Arc<ApiClient>,
143 topic: Topic,
144 cancel: CancellationToken,
145 tx: UnboundedSender<PubsubMessage>,
146 history: HistoryWriter,
147) -> Result<(), String> {
148 let mut sub = api
149 .bee()
150 .pss()
151 .subscribe(&topic)
152 .await
153 .map_err(|e| format!("/pss/subscribe failed: {e}"))?;
154 let channel = topic.to_hex();
155 tokio::spawn(async move {
156 loop {
157 tokio::select! {
158 msg = sub.recv() => {
159 match msg {
160 Some(payload) => {
161 let m = PubsubMessage {
162 received_at: SystemTime::now(),
163 kind: PubsubKind::Pss,
164 channel: channel.clone(),
165 payload: payload.to_vec(),
166 };
167 append_history(&history, &m).await;
168 let _ = tx.send(m);
169 }
170 None => return, }
172 }
173 _ = cancel.cancelled() => {
174 sub.cancel();
175 return;
176 }
177 }
178 }
179 });
180 Ok(())
181}
182
183pub async fn spawn_gsoc_watcher(
186 api: Arc<ApiClient>,
187 owner: EthAddress,
188 identifier: Identifier,
189 cancel: CancellationToken,
190 tx: UnboundedSender<PubsubMessage>,
191 history: HistoryWriter,
192) -> Result<(), String> {
193 let mut sub = api
194 .bee()
195 .gsoc()
196 .subscribe(&owner, &identifier)
197 .await
198 .map_err(|e| format!("/gsoc/subscribe failed: {e}"))?;
199 let channel = match bee::swarm::soc::calculate_single_owner_chunk_address(&identifier, &owner) {
201 Ok(r) => r.to_hex(),
202 Err(e) => return Err(format!("calculate soc address: {e}")),
203 };
204 tokio::spawn(async move {
205 loop {
206 tokio::select! {
207 msg = sub.recv() => {
208 match msg {
209 Some(payload) => {
210 let m = PubsubMessage {
211 received_at: SystemTime::now(),
212 kind: PubsubKind::Gsoc,
213 channel: channel.clone(),
214 payload: payload.to_vec(),
215 };
216 append_history(&history, &m).await;
217 let _ = tx.send(m);
218 }
219 None => return,
220 }
221 }
222 _ = cancel.cancelled() => {
223 sub.cancel();
224 return;
225 }
226 }
227 }
228 });
229 Ok(())
230}
231
/// Renders at most `cap` bytes of `bytes` as text.
///
/// Printable ASCII (`0x20..=0x7e`) is kept as-is and every other byte becomes
/// `.`. If `bytes` is longer than `cap`, a single `…` is appended.
pub fn ascii_preview(bytes: &[u8], cap: usize) -> String {
    let shown = &bytes[..cap.min(bytes.len())];
    let truncated = bytes.len() > cap;

    // One byte per shown character, plus room for the multi-byte ellipsis so
    // the final push never reallocates.
    let ellipsis_len = if truncated { '…'.len_utf8() } else { 0 };
    let mut out = String::with_capacity(shown.len() + ellipsis_len);

    out.extend(shown.iter().map(|&b| {
        if b == b' ' || b.is_ascii_graphic() {
            char::from(b)
        } else {
            '.'
        }
    }));
    if truncated {
        out.push('…');
    }
    out
}
249
/// Renders `bytes` as lower-case hex, limited to `cap` hex characters.
///
/// Only whole bytes are shown, so `cap / 2` bytes at most (an odd `cap`
/// rounds down). If not every byte fits, a single `…` is appended; the
/// ellipsis is not counted against `cap`.
pub fn hex_preview(bytes: &[u8], cap: usize) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let shown = &bytes[..(cap / 2).min(bytes.len())];
    // Equivalent to `bytes.len() * 2 > cap`: truncated exactly when some byte
    // was left out.
    let truncated = shown.len() < bytes.len();

    let ellipsis_len = if truncated { '…'.len_utf8() } else { 0 };
    let mut out = String::with_capacity(shown.len() * 2 + ellipsis_len);

    // A nibble lookup avoids allocating a temporary `String` per byte.
    for &b in shown {
        out.push(char::from(HEX_DIGITS[usize::from(b >> 4)]));
        out.push(char::from(HEX_DIGITS[usize::from(b & 0x0f)]));
    }
    if truncated {
        out.push('…');
    }
    out
}
263
264pub fn smart_preview(bytes: &[u8], cap: usize) -> String {
268 if bytes.is_empty() {
269 return "(empty)".to_string();
270 }
271 let printable = bytes.iter().filter(|&&b| (0x20..0x7f).contains(&b)).count();
272 let ratio = printable as f64 / bytes.len() as f64;
273 if ratio >= 0.75 {
274 ascii_preview(bytes, cap)
275 } else {
276 hex_preview(bytes, cap)
277 }
278}
279
/// Unit tests for the preview helpers and the subscription-id builders.
#[cfg(test)]
mod tests {
    use super::*;

    // --- ascii_preview ---------------------------------------------------

    #[test]
    fn ascii_preview_replaces_nonprintable() {
        let p = ascii_preview(&[b'h', b'i', 0x00, b'!', 0xff], 16);
        assert_eq!(p, "hi.!.");
    }

    #[test]
    fn ascii_preview_caps_with_ellipsis() {
        let p = ascii_preview(b"abcdefghij", 5);
        assert_eq!(p, "abcde…");
    }

    // --- hex_preview -----------------------------------------------------

    #[test]
    fn hex_preview_renders_two_hex_chars_per_byte() {
        let p = hex_preview(&[0xde, 0xad, 0xbe, 0xef], 8);
        assert_eq!(p, "deadbeef");
    }

    #[test]
    fn hex_preview_caps_with_ellipsis() {
        // A cap of 4 hex characters leaves room for two whole bytes.
        let p = hex_preview(&[0xde, 0xad, 0xbe, 0xef], 4);
        assert_eq!(p, "dead…");
    }

    // --- smart_preview ---------------------------------------------------

    #[test]
    fn smart_preview_picks_ascii_for_text() {
        let p = smart_preview(b"hello world!", 32);
        assert_eq!(p, "hello world!");
    }

    #[test]
    fn smart_preview_picks_hex_for_binary() {
        // No byte is printable, so the hex rendering is chosen.
        let p = smart_preview(&[0xff, 0xfe, 0xfd, 0x00], 16);
        assert_eq!(p, "fffefd00");
    }

    #[test]
    fn smart_preview_handles_empty() {
        assert_eq!(smart_preview(&[], 16), "(empty)");
    }

    // --- subscription ids ------------------------------------------------

    #[test]
    fn pss_sub_id_uses_topic_hex() {
        let topic = Topic::from_string("test-topic");
        let id = pss_sub_id(&topic);
        assert!(id.starts_with("pss:"));
        // Everything after the 4-character prefix is the topic hex.
        assert_eq!(&id[4..], &topic.to_hex());
    }

    #[test]
    fn gsoc_sub_id_combines_owner_and_identifier() {
        let owner = EthAddress::from_hex("0x1234567890123456789012345678901234567890").unwrap();
        let id = Identifier::new(&[0xab; 32]).unwrap();
        let sub_id = gsoc_sub_id(&owner, &id);
        assert!(sub_id.starts_with("gsoc:"));
        assert!(sub_id.contains(&owner.to_hex()));
        assert!(sub_id.contains(&id.to_hex()));
    }
}