// bee_tui/pubsub.rs

//! Pubsub watch — the receiver side of v1.3's `:gsoc-mine` and
//! `:pss-target` writer verbs. PSS topic subscriptions and GSOC
//! `(owner, identifier)` subscriptions both connect to a Bee
//! WebSocket and emit message frames whenever a publisher hits the
//! corresponding `/pss/send` / SOC-upload endpoint.
//!
//! The S15 Pubsub screen renders a merged timeline of all active
//! subscriptions; this module owns the subscription metadata + the
//! [`PubsubMessage`] shape that flows between the background watcher
//! tasks and the screen via an mpsc channel in `App`.

use std::collections::VecDeque;
use std::fmt::Write as _;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use bee::swarm::{EthAddress, Identifier, Topic};
use tokio::sync::Mutex;
use tokio::sync::mpsc::UnboundedSender;
use tokio_util::sync::CancellationToken;

use crate::api::ApiClient;

/// Maximum number of messages the screen retains. Older messages
/// are evicted from the front (`VecDeque`-style) so the cockpit's
/// memory stays bounded even on a noisy topic. Also caps how many
/// lines [`replay_history_file`] returns.
pub const MAX_MESSAGES: usize = 500;

/// One delivered message — what the screen renders as a row.
/// Encodes both PSS (topic-keyed) and GSOC (address-keyed) variants
/// in a single shape so the merged timeline can render them
/// together, distinguished by [`PubsubKind`] in the row glyph.
#[derive(Debug, Clone)]
pub struct PubsubMessage {
    /// Wall-clock time the cockpit received the frame from Bee
    /// (not the publisher's timestamp; PSS / GSOC frames don't
    /// carry one of their own).
    pub received_at: SystemTime,
    /// Which subscription family (PSS or GSOC) delivered the frame.
    pub kind: PubsubKind,
    /// `topic` for PSS, the SOC address (hex) for GSOC. The screen
    /// renders this short; the detail line shows the full hex.
    pub channel: String,
    /// Raw payload bytes. Bounded by Bee's chunk-size cap (4 KiB)
    /// for both PSS and GSOC.
    pub payload: Vec<u8>,
}

/// Which subscription family delivered a [`PubsubMessage`].
/// Serialised into the history file's `kind` field as `"PSS"` /
/// `"GSOC"` via [`PubsubKind::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PubsubKind {
    Pss,
    Gsoc,
}

54impl PubsubKind {
55    pub fn as_str(self) -> &'static str {
56        match self {
57            Self::Pss => "PSS",
58            Self::Gsoc => "GSOC",
59        }
60    }
61}
62
63/// Identity for an active subscription. Used as the key in `App`'s
64/// `pubsub_subs` hashmap so re-issuing `:pubsub-pss <topic>` for an
65/// already-subscribed topic is detectable + cancellable.
66pub fn pss_sub_id(topic: &Topic) -> String {
67    format!("pss:{}", topic.to_hex())
68}
69
70pub fn gsoc_sub_id(owner: &EthAddress, identifier: &Identifier) -> String {
71    format!("gsoc:{}:{}", owner.to_hex(), identifier.to_hex())
72}
73
/// State backing the optional pubsub-history JSONL file. Tracks the
/// current write offset so the writer can rotate at a configured
/// size threshold without an extra `metadata()` syscall per append.
/// Held inside an `Arc<Mutex<_>>` so every watcher task shares the
/// same file handle + offset and serialises through the mutex.
pub struct HistoryFile {
    /// Active append handle; swapped for a fresh one after a rotation.
    file: tokio::fs::File,
    /// Path of the active file; rotations live at `path.1`..`path.N`.
    path: PathBuf,
    /// Running byte offset — seeded from the file size on open,
    /// bumped after every successful append.
    bytes_written: u64,
    /// Rotation threshold. When `bytes_written >= rotate_size_bytes`
    /// after an append, the active file is renamed to `path.1` and
    /// a fresh file takes its place. Zero disables rotation (file
    /// grows unbounded).
    rotate_size_bytes: u64,
    /// How many rotated files (`.1` .. `.N`) to retain. The oldest
    /// rotation is dropped by being overwritten in the shift rename.
    /// Zero disables rotation entirely (any rotation would discard
    /// data).
    keep_files: u32,
}

/// Optional history-file handle shared by every active subscription.
/// `None` when `[pubsub].history_file` is unset; `Some(handle)` when
/// the operator opted in. JSONL appends and rotation are serialised
/// through the async mutex so concurrent watchers don't tear lines.
pub type HistoryWriter = Option<Arc<Mutex<HistoryFile>>>;

99/// Open `path` for append (`O_CREATE | O_APPEND`), creating it if
100/// it doesn't exist. `rotate_size_bytes` and `keep_files` configure
101/// rotation: when a fresh append makes the file cross the size
102/// threshold, the active file is renamed to `path.1` (older
103/// rotations shift to `.2`, `.3`, ..., `.keep_files`; oldest is
104/// dropped) and a new empty file takes its place. Pass
105/// `rotate_size_bytes = 0` to disable rotation. Errors are surfaced
106/// to the caller so cockpit startup can log a clear "history file
107/// disabled: <reason>" message and keep the live tail going.
108pub async fn open_history_writer(
109    path: &Path,
110    rotate_size_bytes: u64,
111    keep_files: u32,
112) -> Result<HistoryWriter, String> {
113    let mut opts = tokio::fs::OpenOptions::new();
114    opts.create(true).append(true);
115    #[cfg(unix)]
116    {
117        // tokio::fs::OpenOptions exposes mode() directly on Unix
118        // (no OpenOptionsExt import needed). Owner-only because
119        // pubsub payloads can be sensitive on shared hosts.
120        opts.mode(0o600);
121    }
122    let file = opts
123        .open(path)
124        .await
125        .map_err(|e| format!("open {}: {e}", path.display()))?;
126    // Seed the byte counter from the file's current size — re-opening
127    // an existing JSONL must not reset rotation tracking back to zero.
128    let bytes_written = file.metadata().await.map(|m| m.len()).unwrap_or(0);
129    Ok(Some(Arc::new(Mutex::new(HistoryFile {
130        file,
131        path: path.to_path_buf(),
132        bytes_written,
133        rotate_size_bytes,
134        keep_files,
135    }))))
136}
137
138/// Append one JSONL line for `msg`. Failures are logged but never
139/// fatal — losing a history line shouldn't kill the live tail.
140/// Rotates the file in place once `bytes_written` crosses the
141/// configured threshold (post-append, so the rotated file may be
142/// slightly above the threshold by one line).
143async fn append_history(writer: &HistoryWriter, msg: &PubsubMessage) {
144    let Some(handle) = writer.as_ref() else {
145        return;
146    };
147    let received_unix = msg
148        .received_at
149        .duration_since(UNIX_EPOCH)
150        .map(|d| d.as_secs())
151        .unwrap_or(0);
152    let line = serde_json::json!({
153        "received_unix": received_unix,
154        "kind": msg.kind.as_str(),
155        "channel": msg.channel,
156        "size": msg.payload.len(),
157        "payload_hex": hex_preview(&msg.payload, msg.payload.len() * 2),
158    });
159    let mut bytes = match serde_json::to_vec(&line) {
160        Ok(b) => b,
161        Err(e) => {
162            tracing::warn!(target: "bee_tui::pubsub", "history serialise failed: {e}");
163            return;
164        }
165    };
166    bytes.push(b'\n');
167    let mut guard = handle.lock().await;
168    use tokio::io::AsyncWriteExt;
169    if let Err(e) = guard.file.write_all(&bytes).await {
170        tracing::warn!(target: "bee_tui::pubsub", "history append failed: {e}");
171        return;
172    }
173    guard.bytes_written += bytes.len() as u64;
174    if guard.rotate_size_bytes > 0
175        && guard.keep_files > 0
176        && guard.bytes_written >= guard.rotate_size_bytes
177    {
178        if let Err(e) = rotate_history(&mut guard).await {
179            tracing::warn!(target: "bee_tui::pubsub", "history rotate failed: {e}");
180        }
181    }
182}
183
/// Roll the active history file: rename `path.{N-1}` → `path.{N}`
/// in descending order (dropping `path.{keep_files}`), then rename
/// `path` → `path.1`, then re-open `path` fresh. The caller holds
/// the mutex guard, so no concurrent appends race with the renames.
async fn rotate_history(guard: &mut HistoryFile) -> Result<(), String> {
    use tokio::fs;
    use tokio::io::AsyncWriteExt;

    // Flush + close the current file so the rename observes a fully
    // persisted file. tokio::fs::File doesn't expose explicit close;
    // dropping the handle is enough, but we want to keep `guard.file`
    // populated for type-safety, so we replace it after opening the
    // new one (below).
    if let Err(e) = guard.file.flush().await {
        return Err(format!("flush before rotate: {e}"));
    }

    let base = guard.path.clone();
    let keep = guard.keep_files;

    // Shift rotations: i = keep..=2 → rename .{i-1} → .{i}.
    // Skip missing files (an empty rotation slot is normal on the
    // first rotation). The first iteration overwrites the oldest
    // rotation (`.{keep}`), which is how it gets dropped.
    // NOTE(review): rename over an existing target replaces it on
    // Unix, but can fail on Windows — confirm target platforms.
    for i in (2..=keep).rev() {
        let from = rotation_path(&base, i - 1);
        let to = rotation_path(&base, i);
        match fs::rename(&from, &to).await {
            Ok(()) => {}
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => {
                return Err(format!(
                    "rename {} -> {}: {e}",
                    from.display(),
                    to.display()
                ));
            }
        }
    }
    // Rename current → .1.
    let dot1 = rotation_path(&base, 1);
    fs::rename(&base, &dot1)
        .await
        .map_err(|e| format!("rename {} -> {}: {e}", base.display(), dot1.display()))?;

    // Open a fresh active file with the same options as
    // open_history_writer (append mode; owner-only perms on Unix).
    let mut opts = fs::OpenOptions::new();
    opts.create(true).append(true);
    #[cfg(unix)]
    {
        opts.mode(0o600);
    }
    let new_file = opts
        .open(&base)
        .await
        .map_err(|e| format!("re-open {}: {e}", base.display()))?;
    // Replacing the handle drops (closes) the old one; the counter
    // restarts for the fresh, empty file.
    guard.file = new_file;
    guard.bytes_written = 0;
    Ok(())
}

/// Build the `.N` rotation sibling of `base` (e.g. `h.jsonl` →
/// `h.jsonl.3`). Appends to the whole file name, so any existing
/// extension is preserved.
fn rotation_path(base: &Path, n: u32) -> PathBuf {
    let mut name = base.as_os_str().to_os_string();
    name.push(".");
    name.push(n.to_string());
    PathBuf::from(name)
}

252/// Read a previously-written pubsub history file back into a vector
253/// of [`PubsubMessage`]s for replay onto the S15 timeline. Lines
254/// that fail to parse are logged + skipped so a single corrupt line
255/// doesn't kill the whole replay. Caps the return at [`MAX_MESSAGES`]
256/// (the most recent N lines), since the screen can only render that
257/// many anyway. Returned in chronological order (oldest → newest).
258pub async fn replay_history_file(path: &Path) -> Result<Vec<PubsubMessage>, String> {
259    use tokio::io::{AsyncBufReadExt, BufReader};
260    let file = tokio::fs::File::open(path)
261        .await
262        .map_err(|e| format!("open {}: {e}", path.display()))?;
263    let mut reader = BufReader::new(file).lines();
264    let mut buf: VecDeque<PubsubMessage> = VecDeque::with_capacity(MAX_MESSAGES);
265    let mut total_lines: u64 = 0;
266    let mut bad_lines: u64 = 0;
267    loop {
268        let line = match reader.next_line().await {
269            Ok(Some(l)) => l,
270            Ok(None) => break,
271            Err(e) => return Err(format!("read {}: {e}", path.display())),
272        };
273        if line.trim().is_empty() {
274            continue;
275        }
276        total_lines += 1;
277        match parse_history_line(&line) {
278            Ok(m) => {
279                if buf.len() == MAX_MESSAGES {
280                    buf.pop_front();
281                }
282                buf.push_back(m);
283            }
284            Err(e) => {
285                bad_lines += 1;
286                tracing::warn!(target: "bee_tui::pubsub", "replay: skip bad line: {e}");
287            }
288        }
289    }
290    if bad_lines > 0 {
291        tracing::info!(
292            target: "bee_tui::pubsub",
293            "replay: parsed {ok}/{total} lines ({bad} skipped)",
294            ok = total_lines - bad_lines,
295            total = total_lines,
296            bad = bad_lines,
297        );
298    }
299    Ok(buf.into_iter().collect())
300}
301
302/// Parse one JSONL history line into a [`PubsubMessage`]. Public
303/// only for tests; the writer's format is the contract.
304pub fn parse_history_line(line: &str) -> Result<PubsubMessage, String> {
305    let v: serde_json::Value = serde_json::from_str(line).map_err(|e| format!("json: {e}"))?;
306    let received_unix = v
307        .get("received_unix")
308        .and_then(|x| x.as_u64())
309        .ok_or_else(|| "missing received_unix".to_string())?;
310    let kind_str = v
311        .get("kind")
312        .and_then(|x| x.as_str())
313        .ok_or_else(|| "missing kind".to_string())?;
314    let kind = match kind_str {
315        "PSS" => PubsubKind::Pss,
316        "GSOC" => PubsubKind::Gsoc,
317        other => return Err(format!("bad kind {other:?}")),
318    };
319    let channel = v
320        .get("channel")
321        .and_then(|x| x.as_str())
322        .ok_or_else(|| "missing channel".to_string())?
323        .to_string();
324    let payload_hex = v
325        .get("payload_hex")
326        .and_then(|x| x.as_str())
327        .ok_or_else(|| "missing payload_hex".to_string())?;
328    let payload = decode_hex(payload_hex.trim_end_matches('…'))?;
329    Ok(PubsubMessage {
330        received_at: UNIX_EPOCH + Duration::from_secs(received_unix),
331        kind,
332        channel,
333        payload,
334    })
335}
336
337fn decode_hex(s: &str) -> Result<Vec<u8>, String> {
338    if s.len() % 2 != 0 {
339        return Err(format!("odd hex length: {}", s.len()));
340    }
341    let bytes = s.as_bytes();
342    let mut out = Vec::with_capacity(s.len() / 2);
343    for chunk in bytes.chunks_exact(2) {
344        let hi = nybble(chunk[0])?;
345        let lo = nybble(chunk[1])?;
346        out.push((hi << 4) | lo);
347    }
348    Ok(out)
349}
350
/// Decode one ASCII hex digit (`0-9a-fA-F`) to its 0–15 value.
fn nybble(b: u8) -> Result<u8, String> {
    (b as char)
        .to_digit(16)
        .map(|d| d as u8)
        .ok_or_else(|| format!("bad hex char: {:?}", b as char))
}

360/// Spawn a background task that drives a PSS subscription's
361/// `recv()` loop and forwards every delivered message into `tx` as
362/// a [`PubsubMessage`] until `cancel` fires. Returns immediately
363/// after `subscribe` succeeds; the actual recv loop runs on tokio.
364/// `history` is the optional shared writer that appends every
365/// frame to a JSONL file on arrival.
366pub async fn spawn_pss_watcher(
367    api: Arc<ApiClient>,
368    topic: Topic,
369    cancel: CancellationToken,
370    tx: UnboundedSender<PubsubMessage>,
371    history: HistoryWriter,
372) -> Result<(), String> {
373    let mut sub = api
374        .bee()
375        .pss()
376        .subscribe(&topic)
377        .await
378        .map_err(|e| format!("/pss/subscribe failed: {e}"))?;
379    let channel = topic.to_hex();
380    tokio::spawn(async move {
381        loop {
382            tokio::select! {
383                msg = sub.recv() => {
384                    match msg {
385                        Some(payload) => {
386                            let m = PubsubMessage {
387                                received_at: SystemTime::now(),
388                                kind: PubsubKind::Pss,
389                                channel: channel.clone(),
390                                payload: payload.to_vec(),
391                            };
392                            append_history(&history, &m).await;
393                            let _ = tx.send(m);
394                        }
395                        None => return, // ws closed by Bee
396                    }
397                }
398                _ = cancel.cancelled() => {
399                    sub.cancel();
400                    return;
401                }
402            }
403        }
404    });
405    Ok(())
406}
407
408/// Spawn a background task that drives a GSOC subscription's
409/// `recv()` loop. Same shape as [`spawn_pss_watcher`].
410pub async fn spawn_gsoc_watcher(
411    api: Arc<ApiClient>,
412    owner: EthAddress,
413    identifier: Identifier,
414    cancel: CancellationToken,
415    tx: UnboundedSender<PubsubMessage>,
416    history: HistoryWriter,
417) -> Result<(), String> {
418    let mut sub = api
419        .bee()
420        .gsoc()
421        .subscribe(&owner, &identifier)
422        .await
423        .map_err(|e| format!("/gsoc/subscribe failed: {e}"))?;
424    // Channel = SOC address (the value Bee uses to route).
425    let channel = match bee::swarm::soc::calculate_single_owner_chunk_address(&identifier, &owner) {
426        Ok(r) => r.to_hex(),
427        Err(e) => return Err(format!("calculate soc address: {e}")),
428    };
429    tokio::spawn(async move {
430        loop {
431            tokio::select! {
432                msg = sub.recv() => {
433                    match msg {
434                        Some(payload) => {
435                            let m = PubsubMessage {
436                                received_at: SystemTime::now(),
437                                kind: PubsubKind::Gsoc,
438                                channel: channel.clone(),
439                                payload: payload.to_vec(),
440                            };
441                            append_history(&history, &m).await;
442                            let _ = tx.send(m);
443                        }
444                        None => return,
445                    }
446                }
447                _ = cancel.cancelled() => {
448                    sub.cancel();
449                    return;
450                }
451            }
452        }
453    });
454    Ok(())
455}
456
/// Convert a small byte payload to a printable-ASCII preview.
/// Replaces non-printable bytes with `.`. Caps at `cap` characters
/// (plus a trailing `…` when truncated). Used by the screen
/// renderer for the inline content peek.
pub fn ascii_preview(bytes: &[u8], cap: usize) -> String {
    // Printable = space through tilde (0x20..=0x7e).
    let mut out: String = bytes
        .iter()
        .take(cap)
        .map(|&b| if b.is_ascii_graphic() || b == b' ' { b as char } else { '.' })
        .collect();
    if bytes.len() > cap {
        out.push('…');
    }
    out
}

/// Hex-prefix preview for binary payloads (when ASCII is mostly
/// dots). Caps at `cap` characters of hex (so `cap/2` bytes), and
/// appends `…` when the full payload needs more than `cap` chars.
pub fn hex_preview(bytes: &[u8], cap: usize) -> String {
    let mut s = String::with_capacity(cap.min(bytes.len() * 2));
    for b in bytes.iter().take(cap / 2) {
        // write! appends into the existing buffer; the old
        // `push_str(&format!(..))` allocated a fresh String per byte.
        // Writing to a String is infallible, so the Result is ignored.
        let _ = write!(s, "{b:02x}");
    }
    if bytes.len() * 2 > cap {
        s.push('…');
    }
    s
}

489/// Pick the smarter of [`ascii_preview`] / [`hex_preview`] for a
490/// payload — ASCII when ≥ 75% of the bytes are printable, hex
491/// otherwise. Pure for testability.
492pub fn smart_preview(bytes: &[u8], cap: usize) -> String {
493    if bytes.is_empty() {
494        return "(empty)".to_string();
495    }
496    let printable = bytes.iter().filter(|&&b| (0x20..0x7f).contains(&b)).count();
497    let ratio = printable as f64 / bytes.len() as f64;
498    if ratio >= 0.75 {
499        ascii_preview(bytes, cap)
500    } else {
501        hex_preview(bytes, cap)
502    }
503}
504
#[cfg(test)]
mod tests {
    // Pure helpers (previews, hex codec, sub ids, rotation paths)
    // are covered synchronously; the file-backed append / rotate /
    // replay paths use #[tokio::test] against a tempdir.
    use super::*;

    #[test]
    fn ascii_preview_replaces_nonprintable() {
        let p = ascii_preview(&[b'h', b'i', 0x00, b'!', 0xff], 16);
        assert_eq!(p, "hi.!.");
    }

    #[test]
    fn ascii_preview_caps_with_ellipsis() {
        let p = ascii_preview(b"abcdefghij", 5);
        assert_eq!(p, "abcde…");
    }

    #[test]
    fn hex_preview_renders_two_hex_chars_per_byte() {
        let p = hex_preview(&[0xde, 0xad, 0xbe, 0xef], 8);
        assert_eq!(p, "deadbeef");
    }

    #[test]
    fn hex_preview_caps_with_ellipsis() {
        let p = hex_preview(&[0xde, 0xad, 0xbe, 0xef], 4);
        assert_eq!(p, "dead…");
    }

    #[test]
    fn smart_preview_picks_ascii_for_text() {
        let p = smart_preview(b"hello world!", 32);
        assert_eq!(p, "hello world!");
    }

    #[test]
    fn smart_preview_picks_hex_for_binary() {
        let p = smart_preview(&[0xff, 0xfe, 0xfd, 0x00], 16);
        // Binary → hex form.
        assert_eq!(p, "fffefd00");
    }

    #[test]
    fn smart_preview_handles_empty() {
        assert_eq!(smart_preview(&[], 16), "(empty)");
    }

    #[test]
    fn pss_sub_id_uses_topic_hex() {
        let topic = Topic::from_string("test-topic");
        let id = pss_sub_id(&topic);
        assert!(id.starts_with("pss:"));
        assert_eq!(&id[4..], &topic.to_hex());
    }

    #[test]
    fn gsoc_sub_id_combines_owner_and_identifier() {
        let owner = EthAddress::from_hex("0x1234567890123456789012345678901234567890").unwrap();
        let id = Identifier::new(&[0xab; 32]).unwrap();
        let sub_id = gsoc_sub_id(&owner, &id);
        assert!(sub_id.starts_with("gsoc:"));
        assert!(sub_id.contains(&owner.to_hex()));
        assert!(sub_id.contains(&id.to_hex()));
    }

    #[test]
    fn decode_hex_roundtrips_preview_output() {
        // The history writer stores payloads via hex_preview with an
        // exact-length cap, so decode must round-trip it losslessly.
        let bytes = vec![0x00, 0xde, 0xad, 0xbe, 0xef, 0x7f];
        let hex = hex_preview(&bytes, bytes.len() * 2);
        assert_eq!(decode_hex(&hex).unwrap(), bytes);
    }

    #[test]
    fn decode_hex_rejects_odd_length() {
        assert!(decode_hex("abc").is_err());
    }

    #[test]
    fn decode_hex_rejects_non_hex_char() {
        assert!(decode_hex("0g").is_err());
    }

    #[test]
    fn parse_history_line_round_trips_message() {
        let line = r#"{"received_unix":1730000000,"kind":"PSS","channel":"deadbeef","size":4,"payload_hex":"01020304"}"#;
        let m = parse_history_line(line).unwrap();
        assert_eq!(m.kind, PubsubKind::Pss);
        assert_eq!(m.channel, "deadbeef");
        assert_eq!(m.payload, vec![1, 2, 3, 4]);
        let secs = m.received_at.duration_since(UNIX_EPOCH).unwrap().as_secs();
        assert_eq!(secs, 1_730_000_000);
    }

    #[test]
    fn parse_history_line_handles_gsoc_kind() {
        let line = r#"{"received_unix":1,"kind":"GSOC","channel":"abc","size":0,"payload_hex":""}"#;
        let m = parse_history_line(line).unwrap();
        assert_eq!(m.kind, PubsubKind::Gsoc);
        assert!(m.payload.is_empty());
    }

    #[test]
    fn parse_history_line_rejects_unknown_kind() {
        let line = r#"{"received_unix":1,"kind":"WAT","channel":"x","size":0,"payload_hex":""}"#;
        assert!(parse_history_line(line).is_err());
    }

    #[test]
    fn rotation_path_appends_dot_n() {
        let p = rotation_path(Path::new("/tmp/pubsub.jsonl"), 3);
        assert_eq!(p, PathBuf::from("/tmp/pubsub.jsonl.3"));
    }

    #[tokio::test]
    async fn rotation_rolls_over_at_threshold() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("h.jsonl");
        // Tiny rotate threshold so two short messages already trip it.
        let writer = open_history_writer(&path, 50, 3).await.unwrap();
        let now = SystemTime::now();
        for i in 0..6 {
            let m = PubsubMessage {
                received_at: now,
                kind: PubsubKind::Pss,
                channel: format!("ch{i}"),
                payload: vec![i as u8; 4],
            };
            append_history(&writer, &m).await;
        }
        // Active file exists.
        assert!(path.is_file(), "active file kept after rotation");
        // .1 exists (at minimum one rotation happened).
        // (with_extension on "h.jsonl" yields "h.jsonl.1" here
        // because it replaces the "jsonl" extension with "jsonl.1".)
        assert!(path.with_extension("jsonl.1").is_file(), ".1 exists");
        // Never goes beyond .keep_files.
        assert!(
            !path.with_extension("jsonl.4").exists(),
            ".4 should not exist with keep_files=3"
        );
    }

    #[tokio::test]
    async fn replay_round_trips_messages_in_order() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("h.jsonl");
        // No rotation — single file.
        let writer = open_history_writer(&path, 0, 0).await.unwrap();
        let now = SystemTime::now();
        let msgs = vec![
            PubsubMessage {
                received_at: now,
                kind: PubsubKind::Pss,
                channel: "topic-a".into(),
                payload: b"one".to_vec(),
            },
            PubsubMessage {
                received_at: now,
                kind: PubsubKind::Gsoc,
                channel: "abcdef".into(),
                payload: vec![0xff, 0xfe, 0xfd],
            },
        ];
        for m in &msgs {
            append_history(&writer, m).await;
        }
        // Drop the writer so the file flushes / closes before replay.
        drop(writer);
        let replayed = replay_history_file(&path).await.unwrap();
        assert_eq!(replayed.len(), 2);
        assert_eq!(replayed[0].channel, "topic-a");
        assert_eq!(replayed[0].payload, b"one");
        assert_eq!(replayed[1].kind, PubsubKind::Gsoc);
        assert_eq!(replayed[1].payload, vec![0xff, 0xfe, 0xfd]);
    }

    #[tokio::test]
    async fn replay_caps_at_max_messages() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("h.jsonl");
        let writer = open_history_writer(&path, 0, 0).await.unwrap();
        let now = SystemTime::now();
        for i in 0..(MAX_MESSAGES + 7) {
            let m = PubsubMessage {
                received_at: now,
                kind: PubsubKind::Pss,
                channel: "t".into(),
                payload: format!("p{i}").into_bytes(),
            };
            append_history(&writer, &m).await;
        }
        drop(writer);
        let replayed = replay_history_file(&path).await.unwrap();
        assert_eq!(replayed.len(), MAX_MESSAGES);
        // First retained line should be index 7 (we dropped 0..=6).
        assert_eq!(replayed[0].payload, b"p7");
    }

    #[tokio::test]
    async fn replay_skips_bad_lines() {
        use tokio::io::AsyncWriteExt;
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("h.jsonl");
        let mut f = tokio::fs::File::create(&path).await.unwrap();
        // One malformed line, one valid line, one valid-JSON line
        // missing required fields — only the middle one survives.
        f.write_all(b"{not json\n").await.unwrap();
        f.write_all(b"{\"received_unix\":1,\"kind\":\"PSS\",\"channel\":\"c\",\"size\":1,\"payload_hex\":\"ab\"}\n").await.unwrap();
        f.write_all(b"{\"missing\":\"fields\"}\n").await.unwrap();
        f.flush().await.unwrap();
        drop(f);
        let replayed = replay_history_file(&path).await.unwrap();
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0].payload, vec![0xab]);
    }
}
715}