Skip to main content

bee_tui/components/
pubsub.rs

1//! S15 Pubsub watch screen. Renders the merged timeline of PSS +
2//! GSOC subscriptions managed by [`crate::pubsub`]. Newest message
3//! at the top; the cursor lets the operator inspect a specific
4//! row's full hex/ASCII payload via the detail line.
5
6use std::any::Any;
7use std::collections::VecDeque;
8
9use color_eyre::Result;
10use crossterm::event::{KeyCode, KeyEvent};
11use ratatui::{
12    Frame,
13    layout::{Constraint, Layout, Rect},
14    style::{Color, Modifier, Style},
15    text::{Line, Span},
16    widgets::{Block, Borders, Paragraph},
17};
18
19use super::Component;
20use crate::action::Action;
21use crate::pubsub::{MAX_MESSAGES, PubsubKind, PubsubMessage, smart_preview};
22use crate::theme;
23
24pub struct Pubsub {
25    /// Newest at the front; capped at [`MAX_MESSAGES`].
26    rows: VecDeque<PubsubMessage>,
27    selected: usize,
28    /// Number of currently active subscriptions, displayed in the
29    /// header. Updated by `App` via [`Self::set_active_count`] when
30    /// subscriptions start / stop.
31    active_subs: usize,
32}
33
34impl Default for Pubsub {
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40impl Pubsub {
41    pub fn new() -> Self {
42        Self {
43            rows: VecDeque::with_capacity(MAX_MESSAGES),
44            selected: 0,
45            active_subs: 0,
46        }
47    }
48
49    /// Push a freshly-received message onto the front of the
50    /// timeline. Bounded — when the ring is full the oldest entry
51    /// is evicted. Cursor stays anchored on the row the operator
52    /// was looking at unless the eviction pushed it off the end.
53    pub fn record(&mut self, msg: PubsubMessage) {
54        if self.rows.len() == MAX_MESSAGES {
55            self.rows.pop_back();
56        }
57        self.rows.push_front(msg);
58        if self.selected >= self.rows.len() && !self.rows.is_empty() {
59            self.selected = self.rows.len() - 1;
60        }
61    }
62
63    pub fn set_active_count(&mut self, n: usize) {
64        self.active_subs = n;
65    }
66}
67
68impl Component for Pubsub {
69    fn as_any_mut(&mut self) -> Option<&mut dyn Any> {
70        Some(self)
71    }
72
73    fn handle_key_event(&mut self, key: KeyEvent) -> Result<Option<Action>> {
74        let len = self.rows.len();
75        match key.code {
76            KeyCode::Up | KeyCode::Char('k') => {
77                self.selected = self.selected.saturating_sub(1);
78            }
79            KeyCode::Down | KeyCode::Char('j') if len > 0 && self.selected + 1 < len => {
80                self.selected += 1;
81            }
82            KeyCode::PageUp => {
83                self.selected = self.selected.saturating_sub(10);
84            }
85            KeyCode::PageDown if len > 0 => {
86                self.selected = (self.selected + 10).min(len.saturating_sub(1));
87            }
88            KeyCode::Char('c') => {
89                self.rows.clear();
90                self.selected = 0;
91            }
92            _ => {}
93        }
94        Ok(None)
95    }
96
97    fn draw(&mut self, frame: &mut Frame, area: Rect) -> Result<()> {
98        let t = theme::active();
99        let chunks = Layout::vertical([
100            Constraint::Length(2),
101            Constraint::Min(0),
102            Constraint::Length(2),
103            Constraint::Length(1),
104        ])
105        .split(area);
106
107        // Header
108        let header_line = Line::from(vec![
109            Span::styled(
110                "PUBSUB WATCH",
111                Style::default().add_modifier(Modifier::BOLD),
112            ),
113            Span::raw(format!(
114                "  · {} active subs · {} messages",
115                self.active_subs,
116                self.rows.len(),
117            )),
118        ]);
119        frame.render_widget(
120            Paragraph::new(header_line).block(Block::default().borders(Borders::BOTTOM)),
121            chunks[0],
122        );
123
124        // Body — most-recent-first message timeline.
125        let mut body: Vec<Line> = Vec::with_capacity(self.rows.len() + 1);
126        body.push(Line::from(Span::styled(
127            "  TIME      KIND   CHANNEL      SIZE   PREVIEW",
128            Style::default().fg(t.dim).add_modifier(Modifier::BOLD),
129        )));
130        if self.rows.is_empty() {
131            body.push(Line::from(Span::styled(
132                "  (no messages yet — start a subscription with :pubsub-pss <topic> or :pubsub-gsoc <owner> <id>)",
133                Style::default().fg(t.dim).add_modifier(Modifier::ITALIC),
134            )));
135        } else {
136            for (i, msg) in self.rows.iter().enumerate() {
137                body.push(render_row(msg, i == self.selected, t));
138            }
139        }
140        frame.render_widget(Paragraph::new(body), chunks[1]);
141
142        // Detail — full preview of the cursored row's payload + channel.
143        let detail = match self.rows.get(self.selected) {
144            Some(msg) => {
145                let preview_long = smart_preview(&msg.payload, 200);
146                vec![
147                    Line::from(Span::styled(
148                        format!("  channel: {} · {} bytes", msg.channel, msg.payload.len(),),
149                        Style::default().fg(t.dim),
150                    )),
151                    Line::from(Span::styled(
152                        format!("  data: {preview_long}"),
153                        Style::default().fg(t.dim),
154                    )),
155                ]
156            }
157            None => vec![Line::from(""), Line::from("")],
158        };
159        frame.render_widget(Paragraph::new(detail), chunks[2]);
160
161        // Footer — keymap.
162        frame.render_widget(
163            Paragraph::new(Line::from(vec![
164                Span::styled(
165                    " ↑↓/jk ",
166                    Style::default().fg(Color::Black).bg(Color::White),
167                ),
168                Span::raw(" select  "),
169                Span::styled(" c ", Style::default().fg(Color::Black).bg(Color::White)),
170                Span::raw(" clear timeline  "),
171                Span::styled(" Tab ", Style::default().fg(Color::Black).bg(Color::White)),
172                Span::raw(" switch screen  "),
173                Span::styled(" : ", Style::default().fg(Color::Black).bg(Color::White)),
174                Span::raw(" command  "),
175                Span::styled(" q ", Style::default().fg(Color::Black).bg(Color::White)),
176                Span::raw(" quit "),
177            ])),
178            chunks[3],
179        );
180        Ok(())
181    }
182}
183
184fn render_row(msg: &PubsubMessage, is_selected: bool, t: &theme::Theme) -> Line<'static> {
185    let time_str = format_clock(msg.received_at);
186    let kind_str = match msg.kind {
187        PubsubKind::Pss => "PSS ",
188        PubsubKind::Gsoc => "GSOC",
189    };
190    let chan_short = short_hex(&msg.channel, 12);
191    let preview = smart_preview(&msg.payload, 50);
192    let row_style = if is_selected {
193        Style::default().add_modifier(Modifier::REVERSED)
194    } else {
195        match msg.kind {
196            PubsubKind::Pss => Style::default(),
197            PubsubKind::Gsoc => Style::default().fg(t.info),
198        }
199    };
200    Line::from(vec![Span::styled(
201        format!(
202            "  {time_str}   {kind_str}  {chan_short:<12}  {:>4}   {preview}",
203            msg.payload.len(),
204        ),
205        row_style,
206    )])
207}
208
/// Shortens a channel identifier for the timeline's CHANNEL column.
///
/// Leading `0x` prefixes are stripped. If what remains is longer than
/// `len` characters, it is cut to the first `len` characters and `…`
/// is appended; otherwise it is returned unchanged.
///
/// Truncation counts `char`s rather than bytes, so a channel containing
/// multi-byte UTF-8 (for example a non-ASCII topic name) can never be
/// sliced in the middle of a character, which would panic.
fn short_hex(hex: &str, len: usize) -> String {
    let s = hex.trim_start_matches("0x");
    // `nth(len)` is `Some` only when there are more than `len` chars;
    // its byte offset is then a valid char boundary to cut at.
    match s.char_indices().nth(len) {
        Some((cut, _)) => format!("{}…", &s[..cut]),
        None => s.to_string(),
    }
}
217
/// Formats `t` as a zero-padded `HH:MM:SS` wall-clock string.
///
/// The value is derived directly from seconds since the Unix epoch, so
/// no time-zone offset is applied. Times before the epoch render as
/// `00:00:00`.
fn format_clock(t: std::time::SystemTime) -> String {
    let since_epoch = t
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs());
    let s = since_epoch % 60;
    let m = since_epoch / 60 % 60;
    let h = since_epoch / 3600 % 24;
    format!("{h:02}:{m:02}:{s:02}")
}
229
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::SystemTime;

    /// Builds a message stamped with the current time.
    fn sample(kind: PubsubKind, channel: &str, payload: &[u8]) -> PubsubMessage {
        PubsubMessage {
            received_at: SystemTime::now(),
            kind,
            channel: channel.to_owned(),
            payload: Vec::from(payload),
        }
    }

    /// Feeds a single unmodified key press to the screen.
    fn press(screen: &mut Pubsub, code: KeyCode) {
        screen.handle_key_event(KeyEvent::from(code)).unwrap();
    }

    #[test]
    fn record_pushes_newest_to_front() {
        let mut screen = Pubsub::new();
        for payload in [b"first".as_slice(), b"second".as_slice()] {
            screen.record(sample(PubsubKind::Pss, "topic-1", payload));
        }
        assert_eq!(screen.rows[0].payload, b"second");
        assert_eq!(screen.rows[1].payload, b"first");
    }

    #[test]
    fn record_evicts_oldest_when_full() {
        let mut screen = Pubsub::new();
        let total = MAX_MESSAGES + 5;
        for i in 0..total {
            let payload = format!("msg-{i}");
            screen.record(sample(PubsubKind::Pss, "topic", payload.as_bytes()));
        }
        assert_eq!(screen.rows.len(), MAX_MESSAGES);
        // The most recently recorded message sits at the front.
        let head = std::str::from_utf8(&screen.rows[0].payload).unwrap();
        assert_eq!(head, format!("msg-{}", total - 1));
    }

    #[test]
    fn clear_key_empties_timeline() {
        let mut screen = Pubsub::new();
        screen.record(sample(PubsubKind::Pss, "topic", b"data"));
        assert_eq!(screen.rows.len(), 1);
        press(&mut screen, KeyCode::Char('c'));
        assert!(screen.rows.is_empty());
        assert_eq!(screen.selected, 0);
    }

    #[test]
    fn cursor_clamps_at_last_row() {
        let mut screen = Pubsub::new();
        screen.record(sample(PubsubKind::Pss, "topic", b"a"));
        screen.record(sample(PubsubKind::Pss, "topic", b"b"));
        // Far more presses than rows: the cursor must stop on the last one.
        (0..10).for_each(|_| press(&mut screen, KeyCode::Down));
        assert_eq!(screen.selected, 1);
    }

    #[test]
    fn set_active_count_updates_header_state() {
        let mut screen = Pubsub::new();
        screen.set_active_count(3);
        assert_eq!(screen.active_subs, 3);
    }
}