1use std::collections::VecDeque;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use std::time::{Duration, SystemTime, UNIX_EPOCH};
16
17use bee::swarm::{EthAddress, Identifier, Topic};
18use tokio::sync::Mutex;
19use tokio::sync::mpsc::UnboundedSender;
20use tokio_util::sync::CancellationToken;
21
22use crate::api::ApiClient;
23
24pub const MAX_MESSAGES: usize = 500;
28
29#[derive(Debug, Clone)]
34pub struct PubsubMessage {
35 pub received_at: SystemTime,
39 pub kind: PubsubKind,
40 pub channel: String,
43 pub payload: Vec<u8>,
46}
47
48#[derive(Debug, Clone, Copy, PartialEq, Eq)]
49pub enum PubsubKind {
50 Pss,
51 Gsoc,
52}
53
54impl PubsubKind {
55 pub fn as_str(self) -> &'static str {
56 match self {
57 Self::Pss => "PSS",
58 Self::Gsoc => "GSOC",
59 }
60 }
61}
62
63pub fn pss_sub_id(topic: &Topic) -> String {
67 format!("pss:{}", topic.to_hex())
68}
69
70pub fn gsoc_sub_id(owner: &EthAddress, identifier: &Identifier) -> String {
71 format!("gsoc:{}:{}", owner.to_hex(), identifier.to_hex())
72}
73
74pub struct HistoryFile {
80 file: tokio::fs::File,
81 path: PathBuf,
82 bytes_written: u64,
83 rotate_size_bytes: u64,
87 keep_files: u32,
91}
92
93pub type HistoryWriter = Option<Arc<Mutex<HistoryFile>>>;
98
99pub async fn open_history_writer(
109 path: &Path,
110 rotate_size_bytes: u64,
111 keep_files: u32,
112) -> Result<HistoryWriter, String> {
113 let mut opts = tokio::fs::OpenOptions::new();
114 opts.create(true).append(true);
115 #[cfg(unix)]
116 {
117 opts.mode(0o600);
121 }
122 let file = opts
123 .open(path)
124 .await
125 .map_err(|e| format!("open {}: {e}", path.display()))?;
126 let bytes_written = file.metadata().await.map(|m| m.len()).unwrap_or(0);
129 Ok(Some(Arc::new(Mutex::new(HistoryFile {
130 file,
131 path: path.to_path_buf(),
132 bytes_written,
133 rotate_size_bytes,
134 keep_files,
135 }))))
136}
137
138async fn append_history(writer: &HistoryWriter, msg: &PubsubMessage) {
144 let Some(handle) = writer.as_ref() else {
145 return;
146 };
147 let received_unix = msg
148 .received_at
149 .duration_since(UNIX_EPOCH)
150 .map(|d| d.as_secs())
151 .unwrap_or(0);
152 let line = serde_json::json!({
153 "received_unix": received_unix,
154 "kind": msg.kind.as_str(),
155 "channel": msg.channel,
156 "size": msg.payload.len(),
157 "payload_hex": hex_preview(&msg.payload, msg.payload.len() * 2),
158 });
159 let mut bytes = match serde_json::to_vec(&line) {
160 Ok(b) => b,
161 Err(e) => {
162 tracing::warn!(target: "bee_tui::pubsub", "history serialise failed: {e}");
163 return;
164 }
165 };
166 bytes.push(b'\n');
167 let mut guard = handle.lock().await;
168 use tokio::io::AsyncWriteExt;
169 if let Err(e) = guard.file.write_all(&bytes).await {
170 tracing::warn!(target: "bee_tui::pubsub", "history append failed: {e}");
171 return;
172 }
173 guard.bytes_written += bytes.len() as u64;
174 if guard.rotate_size_bytes > 0
175 && guard.keep_files > 0
176 && guard.bytes_written >= guard.rotate_size_bytes
177 {
178 if let Err(e) = rotate_history(&mut guard).await {
179 tracing::warn!(target: "bee_tui::pubsub", "history rotate failed: {e}");
180 }
181 }
182}
183
184async fn rotate_history(guard: &mut HistoryFile) -> Result<(), String> {
189 use tokio::fs;
190 use tokio::io::AsyncWriteExt;
191
192 if let Err(e) = guard.file.flush().await {
198 return Err(format!("flush before rotate: {e}"));
199 }
200
201 let base = guard.path.clone();
202 let keep = guard.keep_files;
203
204 for i in (2..=keep).rev() {
208 let from = rotation_path(&base, i - 1);
209 let to = rotation_path(&base, i);
210 match fs::rename(&from, &to).await {
211 Ok(()) => {}
212 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
213 Err(e) => {
214 return Err(format!(
215 "rename {} -> {}: {e}",
216 from.display(),
217 to.display()
218 ));
219 }
220 }
221 }
222 let dot1 = rotation_path(&base, 1);
224 fs::rename(&base, &dot1)
225 .await
226 .map_err(|e| format!("rename {} -> {}: {e}", base.display(), dot1.display()))?;
227
228 let mut opts = fs::OpenOptions::new();
231 opts.create(true).append(true);
232 #[cfg(unix)]
233 {
234 opts.mode(0o600);
235 }
236 let new_file = opts
237 .open(&base)
238 .await
239 .map_err(|e| format!("re-open {}: {e}", base.display()))?;
240 guard.file = new_file;
241 guard.bytes_written = 0;
242 Ok(())
243}
244
245fn rotation_path(base: &Path, n: u32) -> PathBuf {
246 let mut s = base.as_os_str().to_os_string();
248 s.push(format!(".{n}"));
249 PathBuf::from(s)
250}
251
252pub async fn replay_history_file(path: &Path) -> Result<Vec<PubsubMessage>, String> {
259 use tokio::io::{AsyncBufReadExt, BufReader};
260 let file = tokio::fs::File::open(path)
261 .await
262 .map_err(|e| format!("open {}: {e}", path.display()))?;
263 let mut reader = BufReader::new(file).lines();
264 let mut buf: VecDeque<PubsubMessage> = VecDeque::with_capacity(MAX_MESSAGES);
265 let mut total_lines: u64 = 0;
266 let mut bad_lines: u64 = 0;
267 loop {
268 let line = match reader.next_line().await {
269 Ok(Some(l)) => l,
270 Ok(None) => break,
271 Err(e) => return Err(format!("read {}: {e}", path.display())),
272 };
273 if line.trim().is_empty() {
274 continue;
275 }
276 total_lines += 1;
277 match parse_history_line(&line) {
278 Ok(m) => {
279 if buf.len() == MAX_MESSAGES {
280 buf.pop_front();
281 }
282 buf.push_back(m);
283 }
284 Err(e) => {
285 bad_lines += 1;
286 tracing::warn!(target: "bee_tui::pubsub", "replay: skip bad line: {e}");
287 }
288 }
289 }
290 if bad_lines > 0 {
291 tracing::info!(
292 target: "bee_tui::pubsub",
293 "replay: parsed {ok}/{total} lines ({bad} skipped)",
294 ok = total_lines - bad_lines,
295 total = total_lines,
296 bad = bad_lines,
297 );
298 }
299 Ok(buf.into_iter().collect())
300}
301
302pub fn parse_history_line(line: &str) -> Result<PubsubMessage, String> {
305 let v: serde_json::Value = serde_json::from_str(line).map_err(|e| format!("json: {e}"))?;
306 let received_unix = v
307 .get("received_unix")
308 .and_then(|x| x.as_u64())
309 .ok_or_else(|| "missing received_unix".to_string())?;
310 let kind_str = v
311 .get("kind")
312 .and_then(|x| x.as_str())
313 .ok_or_else(|| "missing kind".to_string())?;
314 let kind = match kind_str {
315 "PSS" => PubsubKind::Pss,
316 "GSOC" => PubsubKind::Gsoc,
317 other => return Err(format!("bad kind {other:?}")),
318 };
319 let channel = v
320 .get("channel")
321 .and_then(|x| x.as_str())
322 .ok_or_else(|| "missing channel".to_string())?
323 .to_string();
324 let payload_hex = v
325 .get("payload_hex")
326 .and_then(|x| x.as_str())
327 .ok_or_else(|| "missing payload_hex".to_string())?;
328 let payload = decode_hex(payload_hex.trim_end_matches('…'))?;
329 Ok(PubsubMessage {
330 received_at: UNIX_EPOCH + Duration::from_secs(received_unix),
331 kind,
332 channel,
333 payload,
334 })
335}
336
337fn decode_hex(s: &str) -> Result<Vec<u8>, String> {
338 if s.len() % 2 != 0 {
339 return Err(format!("odd hex length: {}", s.len()));
340 }
341 let bytes = s.as_bytes();
342 let mut out = Vec::with_capacity(s.len() / 2);
343 for chunk in bytes.chunks_exact(2) {
344 let hi = nybble(chunk[0])?;
345 let lo = nybble(chunk[1])?;
346 out.push((hi << 4) | lo);
347 }
348 Ok(out)
349}
350
351fn nybble(b: u8) -> Result<u8, String> {
352 match b {
353 b'0'..=b'9' => Ok(b - b'0'),
354 b'a'..=b'f' => Ok(b - b'a' + 10),
355 b'A'..=b'F' => Ok(b - b'A' + 10),
356 _ => Err(format!("bad hex char: {:?}", b as char)),
357 }
358}
359
360pub async fn spawn_pss_watcher(
367 api: Arc<ApiClient>,
368 topic: Topic,
369 cancel: CancellationToken,
370 tx: UnboundedSender<PubsubMessage>,
371 history: HistoryWriter,
372) -> Result<(), String> {
373 let mut sub = api
374 .bee()
375 .pss()
376 .subscribe(&topic)
377 .await
378 .map_err(|e| format!("/pss/subscribe failed: {e}"))?;
379 let channel = topic.to_hex();
380 tokio::spawn(async move {
381 loop {
382 tokio::select! {
383 msg = sub.recv() => {
384 match msg {
385 Some(payload) => {
386 let m = PubsubMessage {
387 received_at: SystemTime::now(),
388 kind: PubsubKind::Pss,
389 channel: channel.clone(),
390 payload: payload.to_vec(),
391 };
392 append_history(&history, &m).await;
393 let _ = tx.send(m);
394 }
395 None => return, }
397 }
398 _ = cancel.cancelled() => {
399 sub.cancel();
400 return;
401 }
402 }
403 }
404 });
405 Ok(())
406}
407
408pub async fn spawn_gsoc_watcher(
411 api: Arc<ApiClient>,
412 owner: EthAddress,
413 identifier: Identifier,
414 cancel: CancellationToken,
415 tx: UnboundedSender<PubsubMessage>,
416 history: HistoryWriter,
417) -> Result<(), String> {
418 let mut sub = api
419 .bee()
420 .gsoc()
421 .subscribe(&owner, &identifier)
422 .await
423 .map_err(|e| format!("/gsoc/subscribe failed: {e}"))?;
424 let channel = match bee::swarm::soc::calculate_single_owner_chunk_address(&identifier, &owner) {
426 Ok(r) => r.to_hex(),
427 Err(e) => return Err(format!("calculate soc address: {e}")),
428 };
429 tokio::spawn(async move {
430 loop {
431 tokio::select! {
432 msg = sub.recv() => {
433 match msg {
434 Some(payload) => {
435 let m = PubsubMessage {
436 received_at: SystemTime::now(),
437 kind: PubsubKind::Gsoc,
438 channel: channel.clone(),
439 payload: payload.to_vec(),
440 };
441 append_history(&history, &m).await;
442 let _ = tx.send(m);
443 }
444 None => return,
445 }
446 }
447 _ = cancel.cancelled() => {
448 sub.cancel();
449 return;
450 }
451 }
452 }
453 });
454 Ok(())
455}
456
457pub fn ascii_preview(bytes: &[u8], cap: usize) -> String {
461 let mut s = String::with_capacity(cap.min(bytes.len()));
462 for &b in bytes.iter().take(cap) {
463 if (0x20..0x7f).contains(&b) {
464 s.push(b as char);
465 } else {
466 s.push('.');
467 }
468 }
469 if bytes.len() > cap {
470 s.push('…');
471 }
472 s
473}
474
475pub fn hex_preview(bytes: &[u8], cap: usize) -> String {
478 let bytes_to_show = bytes.iter().take(cap / 2);
479 let mut s = String::with_capacity(cap.min(bytes.len() * 2));
480 for b in bytes_to_show {
481 s.push_str(&format!("{b:02x}"));
482 }
483 if bytes.len() * 2 > cap {
484 s.push('…');
485 }
486 s
487}
488
489pub fn smart_preview(bytes: &[u8], cap: usize) -> String {
493 if bytes.is_empty() {
494 return "(empty)".to_string();
495 }
496 let printable = bytes.iter().filter(|&&b| (0x20..0x7f).contains(&b)).count();
497 let ratio = printable as f64 / bytes.len() as f64;
498 if ratio >= 0.75 {
499 ascii_preview(bytes, cap)
500 } else {
501 hex_preview(bytes, cap)
502 }
503}
504
505#[cfg(test)]
506mod tests {
507 use super::*;
508
509 #[test]
510 fn ascii_preview_replaces_nonprintable() {
511 let p = ascii_preview(&[b'h', b'i', 0x00, b'!', 0xff], 16);
512 assert_eq!(p, "hi.!.");
513 }
514
515 #[test]
516 fn ascii_preview_caps_with_ellipsis() {
517 let p = ascii_preview(b"abcdefghij", 5);
518 assert_eq!(p, "abcde…");
519 }
520
521 #[test]
522 fn hex_preview_renders_two_hex_chars_per_byte() {
523 let p = hex_preview(&[0xde, 0xad, 0xbe, 0xef], 8);
524 assert_eq!(p, "deadbeef");
525 }
526
527 #[test]
528 fn hex_preview_caps_with_ellipsis() {
529 let p = hex_preview(&[0xde, 0xad, 0xbe, 0xef], 4);
530 assert_eq!(p, "dead…");
531 }
532
533 #[test]
534 fn smart_preview_picks_ascii_for_text() {
535 let p = smart_preview(b"hello world!", 32);
536 assert_eq!(p, "hello world!");
537 }
538
539 #[test]
540 fn smart_preview_picks_hex_for_binary() {
541 let p = smart_preview(&[0xff, 0xfe, 0xfd, 0x00], 16);
542 assert_eq!(p, "fffefd00");
544 }
545
546 #[test]
547 fn smart_preview_handles_empty() {
548 assert_eq!(smart_preview(&[], 16), "(empty)");
549 }
550
551 #[test]
552 fn pss_sub_id_uses_topic_hex() {
553 let topic = Topic::from_string("test-topic");
554 let id = pss_sub_id(&topic);
555 assert!(id.starts_with("pss:"));
556 assert_eq!(&id[4..], &topic.to_hex());
557 }
558
559 #[test]
560 fn gsoc_sub_id_combines_owner_and_identifier() {
561 let owner = EthAddress::from_hex("0x1234567890123456789012345678901234567890").unwrap();
562 let id = Identifier::new(&[0xab; 32]).unwrap();
563 let sub_id = gsoc_sub_id(&owner, &id);
564 assert!(sub_id.starts_with("gsoc:"));
565 assert!(sub_id.contains(&owner.to_hex()));
566 assert!(sub_id.contains(&id.to_hex()));
567 }
568
569 #[test]
570 fn decode_hex_roundtrips_preview_output() {
571 let bytes = vec![0x00, 0xde, 0xad, 0xbe, 0xef, 0x7f];
572 let hex = hex_preview(&bytes, bytes.len() * 2);
573 assert_eq!(decode_hex(&hex).unwrap(), bytes);
574 }
575
576 #[test]
577 fn decode_hex_rejects_odd_length() {
578 assert!(decode_hex("abc").is_err());
579 }
580
581 #[test]
582 fn decode_hex_rejects_non_hex_char() {
583 assert!(decode_hex("0g").is_err());
584 }
585
586 #[test]
587 fn parse_history_line_round_trips_message() {
588 let line = r#"{"received_unix":1730000000,"kind":"PSS","channel":"deadbeef","size":4,"payload_hex":"01020304"}"#;
589 let m = parse_history_line(line).unwrap();
590 assert_eq!(m.kind, PubsubKind::Pss);
591 assert_eq!(m.channel, "deadbeef");
592 assert_eq!(m.payload, vec![1, 2, 3, 4]);
593 let secs = m.received_at.duration_since(UNIX_EPOCH).unwrap().as_secs();
594 assert_eq!(secs, 1_730_000_000);
595 }
596
597 #[test]
598 fn parse_history_line_handles_gsoc_kind() {
599 let line = r#"{"received_unix":1,"kind":"GSOC","channel":"abc","size":0,"payload_hex":""}"#;
600 let m = parse_history_line(line).unwrap();
601 assert_eq!(m.kind, PubsubKind::Gsoc);
602 assert!(m.payload.is_empty());
603 }
604
605 #[test]
606 fn parse_history_line_rejects_unknown_kind() {
607 let line = r#"{"received_unix":1,"kind":"WAT","channel":"x","size":0,"payload_hex":""}"#;
608 assert!(parse_history_line(line).is_err());
609 }
610
611 #[test]
612 fn rotation_path_appends_dot_n() {
613 let p = rotation_path(Path::new("/tmp/pubsub.jsonl"), 3);
614 assert_eq!(p, PathBuf::from("/tmp/pubsub.jsonl.3"));
615 }
616
617 #[tokio::test]
618 async fn rotation_rolls_over_at_threshold() {
619 let dir = tempfile::tempdir().unwrap();
620 let path = dir.path().join("h.jsonl");
621 let writer = open_history_writer(&path, 50, 3).await.unwrap();
623 let now = SystemTime::now();
624 for i in 0..6 {
625 let m = PubsubMessage {
626 received_at: now,
627 kind: PubsubKind::Pss,
628 channel: format!("ch{i}"),
629 payload: vec![i as u8; 4],
630 };
631 append_history(&writer, &m).await;
632 }
633 assert!(path.is_file(), "active file kept after rotation");
635 assert!(path.with_extension("jsonl.1").is_file(), ".1 exists");
637 assert!(
639 !path.with_extension("jsonl.4").exists(),
640 ".4 should not exist with keep_files=3"
641 );
642 }
643
644 #[tokio::test]
645 async fn replay_round_trips_messages_in_order() {
646 let dir = tempfile::tempdir().unwrap();
647 let path = dir.path().join("h.jsonl");
648 let writer = open_history_writer(&path, 0, 0).await.unwrap();
650 let now = SystemTime::now();
651 let msgs = vec![
652 PubsubMessage {
653 received_at: now,
654 kind: PubsubKind::Pss,
655 channel: "topic-a".into(),
656 payload: b"one".to_vec(),
657 },
658 PubsubMessage {
659 received_at: now,
660 kind: PubsubKind::Gsoc,
661 channel: "abcdef".into(),
662 payload: vec![0xff, 0xfe, 0xfd],
663 },
664 ];
665 for m in &msgs {
666 append_history(&writer, m).await;
667 }
668 drop(writer);
670 let replayed = replay_history_file(&path).await.unwrap();
671 assert_eq!(replayed.len(), 2);
672 assert_eq!(replayed[0].channel, "topic-a");
673 assert_eq!(replayed[0].payload, b"one");
674 assert_eq!(replayed[1].kind, PubsubKind::Gsoc);
675 assert_eq!(replayed[1].payload, vec![0xff, 0xfe, 0xfd]);
676 }
677
678 #[tokio::test]
679 async fn replay_caps_at_max_messages() {
680 let dir = tempfile::tempdir().unwrap();
681 let path = dir.path().join("h.jsonl");
682 let writer = open_history_writer(&path, 0, 0).await.unwrap();
683 let now = SystemTime::now();
684 for i in 0..(MAX_MESSAGES + 7) {
685 let m = PubsubMessage {
686 received_at: now,
687 kind: PubsubKind::Pss,
688 channel: "t".into(),
689 payload: format!("p{i}").into_bytes(),
690 };
691 append_history(&writer, &m).await;
692 }
693 drop(writer);
694 let replayed = replay_history_file(&path).await.unwrap();
695 assert_eq!(replayed.len(), MAX_MESSAGES);
696 assert_eq!(replayed[0].payload, b"p7");
698 }
699
700 #[tokio::test]
701 async fn replay_skips_bad_lines() {
702 use tokio::io::AsyncWriteExt;
703 let dir = tempfile::tempdir().unwrap();
704 let path = dir.path().join("h.jsonl");
705 let mut f = tokio::fs::File::create(&path).await.unwrap();
706 f.write_all(b"{not json\n").await.unwrap();
707 f.write_all(b"{\"received_unix\":1,\"kind\":\"PSS\",\"channel\":\"c\",\"size\":1,\"payload_hex\":\"ab\"}\n").await.unwrap();
708 f.write_all(b"{\"missing\":\"fields\"}\n").await.unwrap();
709 f.flush().await.unwrap();
710 drop(f);
711 let replayed = replay_history_file(&path).await.unwrap();
712 assert_eq!(replayed.len(), 1);
713 assert_eq!(replayed[0].payload, vec![0xab]);
714 }
715}