Skip to main content

bee_tui/components/
pubsub.rs

1//! S15 Pubsub watch screen. Renders the merged timeline of PSS +
2//! GSOC subscriptions managed by [`crate::pubsub`]. Newest message
3//! at the top; the cursor lets the operator inspect a specific
4//! row's full hex/ASCII payload via the detail line.
5
6use std::any::Any;
7use std::collections::VecDeque;
8
9use color_eyre::Result;
10use crossterm::event::{KeyCode, KeyEvent};
11use ratatui::{
12    Frame,
13    layout::{Constraint, Layout, Rect},
14    style::{Color, Modifier, Style},
15    text::{Line, Span},
16    widgets::{Block, Borders, Paragraph},
17};
18
19use super::Component;
20use crate::action::Action;
21use crate::pubsub::{MAX_MESSAGES, PubsubKind, PubsubMessage, smart_preview};
22use crate::theme;
23
24pub struct Pubsub {
25    /// Newest at the front; capped at [`MAX_MESSAGES`].
26    rows: VecDeque<PubsubMessage>,
27    selected: usize,
28    /// Number of currently active subscriptions, displayed in the
29    /// header. Updated by `App` via [`Self::set_active_count`] when
30    /// subscriptions start / stop.
31    active_subs: usize,
32    /// Optional case-insensitive substring filter. When `Some`,
33    /// only rows whose channel hex or smart-preview contains the
34    /// substring are rendered; the underlying ring still receives
35    /// every message (filtering is presentation-only).
36    filter: Option<String>,
37}
38
39impl Default for Pubsub {
40    fn default() -> Self {
41        Self::new()
42    }
43}
44
45impl Pubsub {
46    pub fn new() -> Self {
47        Self {
48            rows: VecDeque::with_capacity(MAX_MESSAGES),
49            selected: 0,
50            active_subs: 0,
51            filter: None,
52        }
53    }
54
55    /// Set or clear the substring filter. `None` clears it.
56    pub fn set_filter(&mut self, substring: Option<String>) {
57        self.filter = substring.map(|s| s.to_ascii_lowercase());
58        self.selected = 0;
59    }
60
61    /// True iff `msg` matches the active filter (or no filter is
62    /// set). Pure for testability.
63    pub fn matches_filter(&self, msg: &PubsubMessage) -> bool {
64        let Some(needle) = self.filter.as_deref() else {
65            return true;
66        };
67        if msg.channel.to_ascii_lowercase().contains(needle) {
68            return true;
69        }
70        let preview = smart_preview(&msg.payload, 200).to_ascii_lowercase();
71        preview.contains(needle)
72    }
73
74    /// Push a freshly-received message onto the front of the
75    /// timeline. Bounded — when the ring is full the oldest entry
76    /// is evicted. Cursor stays anchored on the row the operator
77    /// was looking at unless the eviction pushed it off the end.
78    pub fn record(&mut self, msg: PubsubMessage) {
79        if self.rows.len() == MAX_MESSAGES {
80            self.rows.pop_back();
81        }
82        self.rows.push_front(msg);
83        if self.selected >= self.rows.len() && !self.rows.is_empty() {
84            self.selected = self.rows.len() - 1;
85        }
86    }
87
88    pub fn set_active_count(&mut self, n: usize) {
89        self.active_subs = n;
90    }
91}
92
93impl Component for Pubsub {
94    fn as_any_mut(&mut self) -> Option<&mut dyn Any> {
95        Some(self)
96    }
97
98    fn handle_key_event(&mut self, key: KeyEvent) -> Result<Option<Action>> {
99        let len = self.rows.len();
100        match key.code {
101            KeyCode::Up | KeyCode::Char('k') => {
102                self.selected = self.selected.saturating_sub(1);
103            }
104            KeyCode::Down | KeyCode::Char('j') if len > 0 && self.selected + 1 < len => {
105                self.selected += 1;
106            }
107            KeyCode::PageUp => {
108                self.selected = self.selected.saturating_sub(10);
109            }
110            KeyCode::PageDown if len > 0 => {
111                self.selected = (self.selected + 10).min(len.saturating_sub(1));
112            }
113            KeyCode::Char('c') => {
114                self.rows.clear();
115                self.selected = 0;
116            }
117            _ => {}
118        }
119        Ok(None)
120    }
121
122    fn draw(&mut self, frame: &mut Frame, area: Rect) -> Result<()> {
123        let t = theme::active();
124        let chunks = Layout::vertical([
125            Constraint::Length(2),
126            Constraint::Min(0),
127            Constraint::Length(2),
128            Constraint::Length(1),
129        ])
130        .split(area);
131
132        // Header
133        let mut header_spans = vec![
134            Span::styled(
135                "PUBSUB WATCH",
136                Style::default().add_modifier(Modifier::BOLD),
137            ),
138            Span::raw(format!(
139                "  · {} active subs · {} messages",
140                self.active_subs,
141                self.rows.len(),
142            )),
143        ];
144        if let Some(f) = &self.filter {
145            header_spans.push(Span::styled(
146                format!("  · filter: {f:?}"),
147                Style::default().fg(t.warn).add_modifier(Modifier::BOLD),
148            ));
149        }
150        let header_line = Line::from(header_spans);
151        frame.render_widget(
152            Paragraph::new(header_line).block(Block::default().borders(Borders::BOTTOM)),
153            chunks[0],
154        );
155
156        // Body — most-recent-first message timeline.
157        let mut body: Vec<Line> = Vec::with_capacity(self.rows.len() + 1);
158        body.push(Line::from(Span::styled(
159            "  TIME      KIND   CHANNEL      SIZE   PREVIEW",
160            Style::default().fg(t.dim).add_modifier(Modifier::BOLD),
161        )));
162        if self.rows.is_empty() {
163            body.push(Line::from(Span::styled(
164                "  (no messages yet — start a subscription with :pubsub-pss <topic> or :pubsub-gsoc <owner> <id>)",
165                Style::default().fg(t.dim).add_modifier(Modifier::ITALIC),
166            )));
167        } else {
168            // Pre-collect filtered rows so the cursor index lines up
169            // with what's actually displayed.
170            let visible: Vec<(usize, &PubsubMessage)> = self
171                .rows
172                .iter()
173                .enumerate()
174                .filter(|(_, m)| self.matches_filter(m))
175                .collect();
176            if visible.is_empty() {
177                body.push(Line::from(Span::styled(
178                    "  (filter matches no messages — :pubsub-filter-clear to clear)",
179                    Style::default().fg(t.dim).add_modifier(Modifier::ITALIC),
180                )));
181            } else {
182                for (i, msg) in &visible {
183                    body.push(render_row(msg, *i == self.selected, t));
184                }
185            }
186        }
187        frame.render_widget(Paragraph::new(body), chunks[1]);
188
189        // Detail — full preview of the cursored row's payload + channel.
190        let detail = match self.rows.get(self.selected) {
191            Some(msg) => {
192                let preview_long = smart_preview(&msg.payload, 200);
193                vec![
194                    Line::from(Span::styled(
195                        format!("  channel: {} · {} bytes", msg.channel, msg.payload.len(),),
196                        Style::default().fg(t.dim),
197                    )),
198                    Line::from(Span::styled(
199                        format!("  data: {preview_long}"),
200                        Style::default().fg(t.dim),
201                    )),
202                ]
203            }
204            None => vec![Line::from(""), Line::from("")],
205        };
206        frame.render_widget(Paragraph::new(detail), chunks[2]);
207
208        // Footer — keymap.
209        frame.render_widget(
210            Paragraph::new(Line::from(vec![
211                Span::styled(
212                    " ↑↓/jk ",
213                    Style::default().fg(Color::Black).bg(Color::White),
214                ),
215                Span::raw(" select  "),
216                Span::styled(" c ", Style::default().fg(Color::Black).bg(Color::White)),
217                Span::raw(" clear timeline  "),
218                Span::styled(" Tab ", Style::default().fg(Color::Black).bg(Color::White)),
219                Span::raw(" switch screen  "),
220                Span::styled(" : ", Style::default().fg(Color::Black).bg(Color::White)),
221                Span::raw(" command  "),
222                Span::styled(" q ", Style::default().fg(Color::Black).bg(Color::White)),
223                Span::raw(" quit "),
224            ])),
225            chunks[3],
226        );
227        Ok(())
228    }
229}
230
231fn render_row(msg: &PubsubMessage, is_selected: bool, t: &theme::Theme) -> Line<'static> {
232    let time_str = format_clock(msg.received_at);
233    let kind_str = match msg.kind {
234        PubsubKind::Pss => "PSS ",
235        PubsubKind::Gsoc => "GSOC",
236    };
237    let chan_short = short_hex(&msg.channel, 12);
238    let preview = smart_preview(&msg.payload, 50);
239    let row_style = if is_selected {
240        Style::default().add_modifier(Modifier::REVERSED)
241    } else {
242        match msg.kind {
243            PubsubKind::Pss => Style::default(),
244            PubsubKind::Gsoc => Style::default().fg(t.info),
245        }
246    };
247    Line::from(vec![Span::styled(
248        format!(
249            "  {time_str}   {kind_str}  {chan_short:<12}  {:>4}   {preview}",
250            msg.payload.len(),
251        ),
252        row_style,
253    )])
254}
255
/// Shorten a channel identifier for the timeline column.
///
/// Every leading `0x` is stripped. If what remains is longer than
/// `len` characters, the first `len` characters are returned
/// followed by `…`; otherwise the remainder is returned whole.
///
/// The cut is made on a character boundary, so identifiers that are
/// not plain ASCII hex (for example a free-form topic name) can
/// never trigger a slicing panic.
fn short_hex(hex: &str, len: usize) -> String {
    let s = hex.trim_start_matches("0x");
    // `nth(len)` yields the byte offset of the first character that
    // does not fit; `None` means the whole string fits.
    match s.char_indices().nth(len) {
        Some((cut, _)) => format!("{}…", &s[..cut]),
        None => s.to_string(),
    }
}
264
/// Format the time-of-day part of `t` as `HH:MM:SS`.
///
/// The value is derived from whole seconds since the Unix epoch
/// with no timezone adjustment. Times before the epoch are treated
/// as the epoch itself and render as `00:00:00`.
fn format_clock(t: std::time::SystemTime) -> String {
    let since_epoch = t
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs());
    // Drop whole days first; 86 400 is a multiple of both 3 600 and
    // 60, so the fields come out the same as taking them directly.
    let second_of_day = since_epoch % 86_400;
    format!(
        "{:02}:{:02}:{:02}",
        second_of_day / 3_600,
        (second_of_day / 60) % 60,
        second_of_day % 60,
    )
}
276
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    /// Build a message stamped with the current time.
    fn msg(kind: PubsubKind, channel: &str, payload: &[u8]) -> PubsubMessage {
        PubsubMessage {
            received_at: SystemTime::now(),
            kind,
            channel: channel.to_string(),
            payload: payload.to_vec(),
        }
    }

    #[test]
    fn record_pushes_newest_to_front() {
        let mut s = Pubsub::new();
        s.record(msg(PubsubKind::Pss, "topic-1", b"first"));
        s.record(msg(PubsubKind::Pss, "topic-1", b"second"));
        assert_eq!(s.rows[0].payload, b"second");
        assert_eq!(s.rows[1].payload, b"first");
    }

    #[test]
    fn record_evicts_oldest_when_full() {
        let mut s = Pubsub::new();
        for i in 0..(MAX_MESSAGES + 5) {
            s.record(msg(PubsubKind::Pss, "topic", format!("msg-{i}").as_bytes()));
        }
        assert_eq!(s.rows.len(), MAX_MESSAGES);
        // Newest at the front: msg-(MAX_MESSAGES + 4).
        let head = std::str::from_utf8(&s.rows[0].payload).unwrap();
        assert_eq!(head, format!("msg-{}", MAX_MESSAGES + 4));
    }

    #[test]
    fn clear_key_empties_timeline() {
        let mut s = Pubsub::new();
        s.record(msg(PubsubKind::Pss, "topic", b"data"));
        assert_eq!(s.rows.len(), 1);
        s.handle_key_event(KeyEvent::from(KeyCode::Char('c')))
            .unwrap();
        assert!(s.rows.is_empty());
        assert_eq!(s.selected, 0);
    }

    #[test]
    fn cursor_clamps_at_last_row() {
        let mut s = Pubsub::new();
        s.record(msg(PubsubKind::Pss, "topic", b"a"));
        s.record(msg(PubsubKind::Pss, "topic", b"b"));
        for _ in 0..10 {
            s.handle_key_event(KeyEvent::from(KeyCode::Down)).unwrap();
        }
        assert_eq!(s.selected, 1);
    }

    #[test]
    fn cursor_clamps_at_first_row() {
        let mut s = Pubsub::new();
        s.record(msg(PubsubKind::Pss, "topic", b"a"));
        s.handle_key_event(KeyEvent::from(KeyCode::Up)).unwrap();
        assert_eq!(s.selected, 0);
    }

    #[test]
    fn page_keys_clamp_to_timeline_bounds() {
        let mut s = Pubsub::new();
        s.record(msg(PubsubKind::Pss, "topic", b"a"));
        s.record(msg(PubsubKind::Pss, "topic", b"b"));
        s.handle_key_event(KeyEvent::from(KeyCode::PageDown))
            .unwrap();
        assert_eq!(s.selected, 1);
        s.handle_key_event(KeyEvent::from(KeyCode::PageUp)).unwrap();
        assert_eq!(s.selected, 0);
    }

    #[test]
    fn movement_keys_are_noops_on_empty_timeline() {
        let mut s = Pubsub::new();
        s.handle_key_event(KeyEvent::from(KeyCode::Down)).unwrap();
        s.handle_key_event(KeyEvent::from(KeyCode::PageDown))
            .unwrap();
        assert_eq!(s.selected, 0);
    }

    #[test]
    fn set_active_count_updates_header_state() {
        let mut s = Pubsub::new();
        s.set_active_count(3);
        assert_eq!(s.active_subs, 3);
    }

    #[test]
    fn set_filter_lowercases_and_resets_cursor() {
        let mut s = Pubsub::new();
        s.record(msg(PubsubKind::Pss, "topic", b"a"));
        s.record(msg(PubsubKind::Pss, "topic", b"b"));
        s.handle_key_event(KeyEvent::from(KeyCode::Down)).unwrap();
        assert_eq!(s.selected, 1);
        s.set_filter(Some("ToPiC".to_string()));
        assert_eq!(s.filter.as_deref(), Some("topic"));
        assert_eq!(s.selected, 0);
        s.set_filter(None);
        assert!(s.filter.is_none());
    }

    #[test]
    fn matches_filter_no_filter_set_passes_everything() {
        let s = Pubsub::new();
        let m = msg(PubsubKind::Pss, "abc123", b"hello");
        assert!(s.matches_filter(&m));
    }

    #[test]
    fn matches_filter_substring_in_channel() {
        let mut s = Pubsub::new();
        s.set_filter(Some("CAFE".to_string()));
        let m = msg(PubsubKind::Pss, "cafebabe1234", b"unrelated");
        assert!(s.matches_filter(&m), "channel match (case-insensitive)");
    }

    #[test]
    fn matches_filter_substring_in_preview() {
        let mut s = Pubsub::new();
        s.set_filter(Some("ping".to_string()));
        let m = msg(PubsubKind::Pss, "topic", b"{\"event\":\"ping\"}");
        assert!(s.matches_filter(&m), "preview match");
    }

    #[test]
    fn matches_filter_no_match_drops() {
        let mut s = Pubsub::new();
        s.set_filter(Some("xyz".to_string()));
        let m = msg(PubsubKind::Pss, "topic", b"hello");
        assert!(!s.matches_filter(&m));
    }

    #[test]
    fn short_hex_strips_prefix_and_truncates() {
        assert_eq!(short_hex("0xdeadbeefcafe", 4), "dead…");
        assert_eq!(short_hex("0xabcdef", 6), "abcdef");
        assert_eq!(short_hex("abc", 12), "abc");
        assert_eq!(short_hex("", 3), "");
    }

    #[test]
    fn format_clock_renders_time_of_day() {
        assert_eq!(format_clock(UNIX_EPOCH), "00:00:00");
        assert_eq!(
            format_clock(UNIX_EPOCH + Duration::from_secs(3_661)),
            "01:01:01"
        );
        // Whole days are discarded.
        assert_eq!(
            format_clock(UNIX_EPOCH + Duration::from_secs(86_400 + 86_399)),
            "23:59:59"
        );
    }
}