use std::sync::Arc;
use std::time::SystemTime;
use bee::swarm::{EthAddress, Identifier, Topic};
use tokio::sync::mpsc::UnboundedSender;
use tokio_util::sync::CancellationToken;
use crate::api::ApiClient;
/// Upper bound on a number of messages.
///
/// NOTE(review): this constant is not referenced in the visible part of the
/// file; presumably it caps how many [`PubsubMessage`]s the consumer retains.
/// Confirm against the code that uses it.
pub const MAX_MESSAGES: usize = 500;
/// A single message delivered by one of the pub-sub watchers in this file.
#[derive(Debug, Clone)]
pub struct PubsubMessage {
/// Wall-clock time of receipt; the watchers set this to `SystemTime::now()`
/// at the moment a payload comes out of the subscription.
pub received_at: SystemTime,
/// Which protocol the message arrived on.
pub kind: PubsubKind,
/// Hex string naming the source: the topic hex for PSS messages, the
/// single-owner-chunk address hex for GSOC messages.
pub channel: String,
/// Raw message bytes, copied out of the subscription's payload.
pub payload: Vec<u8>,
}
/// Protocol a [`PubsubMessage`] was received on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PubsubKind {
    /// Message received through a PSS subscription.
    Pss,
    /// Message received through a GSOC subscription.
    Gsoc,
}

impl PubsubKind {
    /// Returns the fixed upper-case label for this kind: `"PSS"` or `"GSOC"`.
    pub fn as_str(self) -> &'static str {
        // Exhaustive on purpose: a new variant must pick its label here.
        match self {
            PubsubKind::Gsoc => "GSOC",
            PubsubKind::Pss => "PSS",
        }
    }
}
/// Builds the subscription id for a PSS topic: `pss:` followed by the
/// topic's hex representation.
pub fn pss_sub_id(topic: &Topic) -> String {
    let topic_hex = topic.to_hex();
    format!("pss:{topic_hex}")
}
/// Builds the subscription id for a GSOC feed: `gsoc:`, the owner's hex
/// representation, a `:` separator, then the identifier's hex representation.
pub fn gsoc_sub_id(owner: &EthAddress, identifier: &Identifier) -> String {
    let owner_hex = owner.to_hex();
    let identifier_hex = identifier.to_hex();
    format!("gsoc:{owner_hex}:{identifier_hex}")
}
/// Opens a PSS subscription for `topic` and spawns a task that forwards
/// every received payload to `tx`.
///
/// The subscription is established before this function returns. The
/// spawned Tokio task then wraps each payload in a [`PubsubMessage`] tagged
/// [`PubsubKind::Pss`], using the topic's hex string as the channel and the
/// current system time as the receipt time.
///
/// The task ends when any of the following happens:
/// * the subscription's `recv()` yields `None`;
/// * `cancel` is triggered (the subscription is cancelled first);
/// * sending on `tx` fails, which means the receiving half is gone (the
///   subscription is cancelled first, so it is not kept open with nobody
///   listening).
///
/// # Errors
///
/// Returns a string starting with `/pss/subscribe failed:` when the
/// subscription cannot be opened. No task is spawned in that case.
pub async fn spawn_pss_watcher(
    api: Arc<ApiClient>,
    topic: Topic,
    cancel: CancellationToken,
    tx: UnboundedSender<PubsubMessage>,
) -> Result<(), String> {
    let mut sub = api
        .bee()
        .pss()
        .subscribe(&topic)
        .await
        .map_err(|e| format!("/pss/subscribe failed: {e}"))?;
    let channel = topic.to_hex();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                msg = sub.recv() => {
                    match msg {
                        Some(payload) => {
                            let delivered = tx
                                .send(PubsubMessage {
                                    received_at: SystemTime::now(),
                                    kind: PubsubKind::Pss,
                                    channel: channel.clone(),
                                    payload: payload.to_vec(),
                                })
                                .is_ok();
                            if !delivered {
                                // The receiver was dropped: every later send
                                // would fail too, so release the subscription
                                // instead of idling until cancellation.
                                sub.cancel();
                                return;
                            }
                        }
                        // Subscription finished on its own.
                        None => return,
                    }
                }
                _ = cancel.cancelled() => {
                    sub.cancel();
                    return;
                }
            }
        }
    });
    Ok(())
}
/// Opens a GSOC subscription for `owner` / `identifier` and spawns a task
/// that forwards every received payload to `tx`.
///
/// The single-owner-chunk address derived from `identifier` and `owner` is
/// used (as hex) for the channel of each forwarded [`PubsubMessage`]. It is
/// computed *before* subscribing so that a failure there never leaves an
/// already-open subscription behind.
///
/// The spawned Tokio task tags each message [`PubsubKind::Gsoc`] and stamps
/// it with the current system time. It ends when any of the following
/// happens:
/// * the subscription's `recv()` yields `None`;
/// * `cancel` is triggered (the subscription is cancelled first);
/// * sending on `tx` fails, which means the receiving half is gone (the
///   subscription is cancelled first).
///
/// # Errors
///
/// Returns a string starting with `calculate soc address:` when the chunk
/// address cannot be derived, or with `/gsoc/subscribe failed:` when the
/// subscription cannot be opened. No task is spawned in either case.
pub async fn spawn_gsoc_watcher(
    api: Arc<ApiClient>,
    owner: EthAddress,
    identifier: Identifier,
    cancel: CancellationToken,
    tx: UnboundedSender<PubsubMessage>,
) -> Result<(), String> {
    // Do the local, fallible computation first; only then touch the network.
    let channel = bee::swarm::soc::calculate_single_owner_chunk_address(&identifier, &owner)
        .map_err(|e| format!("calculate soc address: {e}"))?
        .to_hex();
    let mut sub = api
        .bee()
        .gsoc()
        .subscribe(&owner, &identifier)
        .await
        .map_err(|e| format!("/gsoc/subscribe failed: {e}"))?;
    tokio::spawn(async move {
        loop {
            tokio::select! {
                msg = sub.recv() => {
                    match msg {
                        Some(payload) => {
                            let delivered = tx
                                .send(PubsubMessage {
                                    received_at: SystemTime::now(),
                                    kind: PubsubKind::Gsoc,
                                    channel: channel.clone(),
                                    payload: payload.to_vec(),
                                })
                                .is_ok();
                            if !delivered {
                                // The receiver was dropped: every later send
                                // would fail too, so release the subscription
                                // instead of idling until cancellation.
                                sub.cancel();
                                return;
                            }
                        }
                        // Subscription finished on its own.
                        None => return,
                    }
                }
                _ = cancel.cancelled() => {
                    sub.cancel();
                    return;
                }
            }
        }
    });
    Ok(())
}
/// Renders at most `cap` leading bytes of `bytes` as ASCII text.
///
/// Bytes in the printable range `0x20..=0x7e` are emitted as-is; every other
/// byte becomes `'.'`. When `bytes` is longer than `cap`, a trailing `'…'`
/// marks the truncation (so the result may be one character longer than
/// `cap`).
pub fn ascii_preview(bytes: &[u8], cap: usize) -> String {
    let shown = &bytes[..bytes.len().min(cap)];
    let mut out = String::with_capacity(shown.len());
    out.extend(shown.iter().map(|&byte| {
        if matches!(byte, 0x20..=0x7e) {
            char::from(byte)
        } else {
            '.'
        }
    }));
    if shown.len() < bytes.len() {
        out.push('…');
    }
    out
}
/// Renders the leading bytes of `bytes` as lowercase hex, two digits per
/// byte, using at most `cap` hex characters.
///
/// Only whole bytes are shown, so an odd `cap` is effectively rounded down.
/// When not every byte fits, a trailing `'…'` marks the truncation (it is
/// not counted against `cap`).
pub fn hex_preview(bytes: &[u8], cap: usize) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let shown = bytes.len().min(cap / 2);
    // Equivalent to `bytes.len() * 2 > cap` for integer lengths.
    let truncated = shown < bytes.len();

    // Reserve the exact output size, including the multi-byte ellipsis, so
    // the string never reallocates.
    let ellipsis_len = if truncated { '…'.len_utf8() } else { 0 };
    let mut out = String::with_capacity(shown * 2 + ellipsis_len);

    // Push the two nibbles directly instead of building a temporary
    // `String` through `format!` for every byte.
    for &byte in &bytes[..shown] {
        out.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    if truncated {
        out.push('…');
    }
    out
}
/// Picks a human-friendly preview for `bytes`.
///
/// * Empty input yields the literal `"(empty)"`.
/// * If at least 75% of *all* bytes are printable ASCII (`0x20..=0x7e`), the
///   result is [`ascii_preview`] of the input.
/// * Otherwise the result is [`hex_preview`] of the input.
///
/// `cap` is passed through unchanged to whichever renderer is chosen.
pub fn smart_preview(bytes: &[u8], cap: usize) -> String {
    let total = bytes.len();
    if total == 0 {
        return String::from("(empty)");
    }

    let printable = bytes
        .iter()
        .filter(|byte| matches!(**byte, 0x20..=0x7e))
        .count();
    let mostly_text = printable as f64 / total as f64 >= 0.75;

    if mostly_text {
        ascii_preview(bytes, cap)
    } else {
        hex_preview(bytes, cap)
    }
}
/// Unit tests for the preview helpers and the subscription-id builders.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn ascii_preview_replaces_nonprintable() {
        let p = ascii_preview(&[b'h', b'i', 0x00, b'!', 0xff], 16);
        assert_eq!(p, "hi.!.");
    }

    #[test]
    fn ascii_preview_caps_with_ellipsis() {
        let p = ascii_preview(b"abcdefghij", 5);
        assert_eq!(p, "abcde…");
    }

    // 0x20 (space) and 0x7e ('~') are the edges of the printable range;
    // their neighbours 0x1f and 0x7f must be masked.
    #[test]
    fn ascii_preview_printable_range_boundaries() {
        let p = ascii_preview(&[0x1f, 0x20, 0x7e, 0x7f], 8);
        assert_eq!(p, ". ~.");
    }

    #[test]
    fn ascii_preview_zero_cap() {
        assert_eq!(ascii_preview(b"abc", 0), "…");
        assert_eq!(ascii_preview(b"", 0), "");
    }

    #[test]
    fn hex_preview_renders_two_hex_chars_per_byte() {
        let p = hex_preview(&[0xde, 0xad, 0xbe, 0xef], 8);
        assert_eq!(p, "deadbeef");
    }

    #[test]
    fn hex_preview_caps_with_ellipsis() {
        let p = hex_preview(&[0xde, 0xad, 0xbe, 0xef], 4);
        assert_eq!(p, "dead…");
    }

    #[test]
    fn hex_preview_zero_pads_and_uses_lowercase() {
        assert_eq!(hex_preview(&[0x00, 0x0a, 0xf0], 8), "000af0");
    }

    // An odd cap cannot fit half a byte, so only whole bytes are rendered.
    #[test]
    fn hex_preview_odd_cap_rounds_down_to_whole_bytes() {
        assert_eq!(hex_preview(&[0xde, 0xad], 3), "de…");
    }

    #[test]
    fn hex_preview_handles_empty() {
        assert_eq!(hex_preview(&[], 8), "");
    }

    #[test]
    fn smart_preview_picks_ascii_for_text() {
        let p = smart_preview(b"hello world!", 32);
        assert_eq!(p, "hello world!");
    }

    #[test]
    fn smart_preview_picks_hex_for_binary() {
        let p = smart_preview(&[0xff, 0xfe, 0xfd, 0x00], 16);
        assert_eq!(p, "fffefd00");
    }

    // Exactly 75% printable still counts as text; 50% does not.
    #[test]
    fn smart_preview_threshold_is_inclusive() {
        assert_eq!(smart_preview(&[b'a', b'b', b'c', 0x00], 16), "abc.");
        assert_eq!(smart_preview(&[b'a', b'b', 0x00, 0x01], 16), "61620001");
    }

    #[test]
    fn smart_preview_handles_empty() {
        assert_eq!(smart_preview(&[], 16), "(empty)");
    }

    #[test]
    fn pss_sub_id_uses_topic_hex() {
        let topic = Topic::from_string("test-topic");
        let id = pss_sub_id(&topic);
        assert!(id.starts_with("pss:"));
        assert_eq!(&id[4..], &topic.to_hex());
    }

    #[test]
    fn gsoc_sub_id_combines_owner_and_identifier() {
        let owner = EthAddress::from_hex("0x1234567890123456789012345678901234567890").unwrap();
        let id = Identifier::new(&[0xab; 32]).unwrap();
        let sub_id = gsoc_sub_id(&owner, &id);
        assert!(sub_id.starts_with("gsoc:"));
        assert!(sub_id.contains(&owner.to_hex()));
        assert!(sub_id.contains(&id.to_hex()));
    }
}