1use std::sync::Arc;
13use std::time::SystemTime;
14
15use bee::swarm::{EthAddress, Identifier, Topic};
16use tokio::sync::mpsc::UnboundedSender;
17use tokio_util::sync::CancellationToken;
18
19use crate::api::ApiClient;
20
/// Upper bound of 500 messages.
///
/// NOTE(review): presumably the cap on how many [`PubsubMessage`]s a consumer
/// retains; nothing in this section reads the constant, so confirm against
/// the code that uses it.
pub const MAX_MESSAGES: usize = 500;
25
/// One message forwarded by a watcher task.
///
/// Values are built by the tasks spawned in [`spawn_pss_watcher`] and
/// [`spawn_gsoc_watcher`] and sent over an unbounded channel.
#[derive(Debug, Clone)]
pub struct PubsubMessage {
    /// Wall-clock time (`SystemTime::now()`) taken when the watcher task
    /// received the payload.
    pub received_at: SystemTime,
    /// Which kind of subscription produced this message.
    pub kind: PubsubKind,
    /// Hex label of the source: the topic hex for PSS, the single-owner-chunk
    /// address hex for GSOC.
    pub channel: String,
    /// Payload bytes copied out of the subscription's message.
    pub payload: Vec<u8>,
}
44
/// Tags a message with the kind of subscription it arrived on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PubsubKind {
    /// Delivered by a PSS subscription.
    Pss,
    /// Delivered by a GSOC subscription.
    Gsoc,
}

impl PubsubKind {
    /// Returns the fixed upper-case label for this kind: `"PSS"` or `"GSOC"`.
    pub fn as_str(self) -> &'static str {
        match self {
            PubsubKind::Gsoc => "GSOC",
            PubsubKind::Pss => "PSS",
        }
    }
}
59
60pub fn pss_sub_id(topic: &Topic) -> String {
64 format!("pss:{}", topic.to_hex())
65}
66
67pub fn gsoc_sub_id(owner: &EthAddress, identifier: &Identifier) -> String {
68 format!("gsoc:{}:{}", owner.to_hex(), identifier.to_hex())
69}
70
71pub async fn spawn_pss_watcher(
76 api: Arc<ApiClient>,
77 topic: Topic,
78 cancel: CancellationToken,
79 tx: UnboundedSender<PubsubMessage>,
80) -> Result<(), String> {
81 let mut sub = api
82 .bee()
83 .pss()
84 .subscribe(&topic)
85 .await
86 .map_err(|e| format!("/pss/subscribe failed: {e}"))?;
87 let channel = topic.to_hex();
88 tokio::spawn(async move {
89 loop {
90 tokio::select! {
91 msg = sub.recv() => {
92 match msg {
93 Some(payload) => {
94 let _ = tx.send(PubsubMessage {
95 received_at: SystemTime::now(),
96 kind: PubsubKind::Pss,
97 channel: channel.clone(),
98 payload: payload.to_vec(),
99 });
100 }
101 None => return, }
103 }
104 _ = cancel.cancelled() => {
105 sub.cancel();
106 return;
107 }
108 }
109 }
110 });
111 Ok(())
112}
113
114pub async fn spawn_gsoc_watcher(
117 api: Arc<ApiClient>,
118 owner: EthAddress,
119 identifier: Identifier,
120 cancel: CancellationToken,
121 tx: UnboundedSender<PubsubMessage>,
122) -> Result<(), String> {
123 let mut sub = api
124 .bee()
125 .gsoc()
126 .subscribe(&owner, &identifier)
127 .await
128 .map_err(|e| format!("/gsoc/subscribe failed: {e}"))?;
129 let channel = match bee::swarm::soc::calculate_single_owner_chunk_address(&identifier, &owner) {
131 Ok(r) => r.to_hex(),
132 Err(e) => return Err(format!("calculate soc address: {e}")),
133 };
134 tokio::spawn(async move {
135 loop {
136 tokio::select! {
137 msg = sub.recv() => {
138 match msg {
139 Some(payload) => {
140 let _ = tx.send(PubsubMessage {
141 received_at: SystemTime::now(),
142 kind: PubsubKind::Gsoc,
143 channel: channel.clone(),
144 payload: payload.to_vec(),
145 });
146 }
147 None => return,
148 }
149 }
150 _ = cancel.cancelled() => {
151 sub.cancel();
152 return;
153 }
154 }
155 }
156 });
157 Ok(())
158}
159
/// Renders at most `cap` leading bytes as text.
///
/// Bytes in the printable ASCII range (space through `~`) are shown as-is and
/// every other byte becomes `.`. When `bytes` is longer than `cap`, a single
/// `…` is appended to mark the truncation.
pub fn ascii_preview(bytes: &[u8], cap: usize) -> String {
    let shown = &bytes[..cap.min(bytes.len())];
    let mut preview: String = shown
        .iter()
        .map(|&byte| {
            // Space plus the graphic characters is exactly 0x20..=0x7e.
            if byte == b' ' || byte.is_ascii_graphic() {
                char::from(byte)
            } else {
                '.'
            }
        })
        .collect();
    if shown.len() < bytes.len() {
        preview.push('…');
    }
    preview
}
177
/// Renders the leading bytes as lower-case hex, two characters per byte.
///
/// `cap` is a budget in hex characters, so at most `cap / 2` bytes are shown.
/// When not every byte fits, a single `…` is appended to mark the truncation
/// (the ellipsis does not count against `cap`).
pub fn hex_preview(bytes: &[u8], cap: usize) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";
    const ELLIPSIS: char = '…';

    let shown = &bytes[..bytes.len().min(cap / 2)];
    let truncated = shown.len() < bytes.len();

    // Reserve the exact output size, including the multi-byte ellipsis, so the
    // string never has to grow.
    let ellipsis_len = if truncated { ELLIPSIS.len_utf8() } else { 0 };
    let mut out = String::with_capacity(shown.len() * 2 + ellipsis_len);

    // A nibble lookup avoids building a temporary `String` per byte.
    for &byte in shown {
        out.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    if truncated {
        out.push(ELLIPSIS);
    }
    out
}
191
192pub fn smart_preview(bytes: &[u8], cap: usize) -> String {
196 if bytes.is_empty() {
197 return "(empty)".to_string();
198 }
199 let printable = bytes.iter().filter(|&&b| (0x20..0x7f).contains(&b)).count();
200 let ratio = printable as f64 / bytes.len() as f64;
201 if ratio >= 0.75 {
202 ascii_preview(bytes, cap)
203 } else {
204 hex_preview(bytes, cap)
205 }
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    /// Bytes outside the printable ASCII range are shown as `.`.
    #[test]
    fn ascii_preview_replaces_nonprintable() {
        let input = [b'h', b'i', 0x00, b'!', 0xff];
        assert_eq!(ascii_preview(&input, 16), "hi.!.");
    }

    /// Input longer than the cap is cut and marked with an ellipsis.
    #[test]
    fn ascii_preview_caps_with_ellipsis() {
        let rendered = ascii_preview(b"abcdefghij", 5);
        assert_eq!(rendered, "abcde…");
    }

    /// Each byte becomes exactly two lower-case hex characters.
    #[test]
    fn hex_preview_renders_two_hex_chars_per_byte() {
        let input = [0xde, 0xad, 0xbe, 0xef];
        assert_eq!(hex_preview(&input, 8), "deadbeef");
    }

    /// The cap counts hex characters, so a cap of 4 shows two bytes.
    #[test]
    fn hex_preview_caps_with_ellipsis() {
        let input = [0xde, 0xad, 0xbe, 0xef];
        assert_eq!(hex_preview(&input, 4), "dead…");
    }

    /// Fully printable input is rendered as text.
    #[test]
    fn smart_preview_picks_ascii_for_text() {
        let rendered = smart_preview(b"hello world!", 32);
        assert_eq!(rendered, "hello world!");
    }

    /// Input with no printable bytes falls back to hex.
    #[test]
    fn smart_preview_picks_hex_for_binary() {
        let input = [0xff, 0xfe, 0xfd, 0x00];
        assert_eq!(smart_preview(&input, 16), "fffefd00");
    }

    /// Empty input gets the dedicated placeholder.
    #[test]
    fn smart_preview_handles_empty() {
        let rendered = smart_preview(&[], 16);
        assert_eq!(rendered, "(empty)");
    }

    /// The PSS id is the `pss:` prefix followed by the topic hex.
    #[test]
    fn pss_sub_id_uses_topic_hex() {
        let topic = Topic::from_string("test-topic");
        let sub_id = pss_sub_id(&topic);
        assert!(sub_id.starts_with("pss:"));
        let expected_hex = topic.to_hex();
        assert_eq!(&sub_id[4..], &expected_hex);
    }

    /// The GSOC id contains both the owner hex and the identifier hex.
    #[test]
    fn gsoc_sub_id_combines_owner_and_identifier() {
        let owner = EthAddress::from_hex("0x1234567890123456789012345678901234567890").unwrap();
        let identifier = Identifier::new(&[0xab; 32]).unwrap();
        let combined = gsoc_sub_id(&owner, &identifier);
        assert!(combined.starts_with("gsoc:"));
        assert!(combined.contains(&owner.to_hex()));
        assert!(combined.contains(&identifier.to_hex()));
    }
}