Skip to main content

room_cli/tui/
mod.rs

1use std::collections::HashMap;
2
3use tokio::sync::mpsc;
4
5use room_protocol::SubscriptionTier;
6
7use crate::message::Message;
8use input::{
9    parse_kick_broadcast, parse_status_broadcast, parse_subscription_broadcast,
10    seed_online_users_from_who,
11};
12use render::{assign_color, ColorMap};
13
14mod colors;
15mod display;
16mod dm;
17mod event_loop;
18mod frame;
19mod input;
20mod markdown;
21mod panel;
22mod parse;
23mod render;
24mod render_bots;
25mod widgets;
26
27pub use event_loop::run;
28
29/// Maximum visible content lines in the input box before it stops growing.
30const MAX_INPUT_LINES: usize = 6;
31
32/// Per-room state for the tabbed TUI. Each tab owns its message buffer,
33/// online user list, status map, and inbound message channel.
34struct RoomTab {
35    room_id: String,
36    messages: Vec<Message>,
37    online_users: Vec<String>,
38    user_statuses: HashMap<String, String>,
39    subscription_tiers: HashMap<String, SubscriptionTier>,
40    unread_count: usize,
41    scroll_offset: usize,
42    msg_rx: mpsc::UnboundedReceiver<Message>,
43    write_half: tokio::net::unix::OwnedWriteHalf,
44}
45
46/// Result of draining messages from a tab's channel.
47enum DrainResult {
48    /// Channel still open, messages drained.
49    Ok,
50    /// Channel closed — broker disconnected.
51    Disconnected,
52}
53
54impl RoomTab {
55    /// Process a single inbound message, updating online_users, statuses, and
56    /// the color map. Pushes the message into the buffer and increments unread
57    /// if `is_active` is false.
58    fn process_message(&mut self, msg: Message, color_map: &mut ColorMap, is_active: bool) {
59        match &msg {
60            Message::Join { user, .. } if !self.online_users.contains(user) => {
61                assign_color(user, color_map);
62                self.online_users.push(user.clone());
63            }
64            Message::Leave { user, .. } => {
65                self.online_users.retain(|u| u != user);
66                self.user_statuses.remove(user);
67                self.subscription_tiers.remove(user);
68            }
69            Message::Message { user, .. } if !self.online_users.contains(user) => {
70                assign_color(user, color_map);
71                self.online_users.push(user.clone());
72            }
73            Message::Message { user, .. } => {
74                assign_color(user, color_map);
75            }
76            Message::System { user, content, .. } if user == "broker" => {
77                seed_online_users_from_who(
78                    content,
79                    &mut self.online_users,
80                    &mut self.user_statuses,
81                );
82                if let Some((name, status)) = parse_status_broadcast(content) {
83                    self.user_statuses.insert(name, status);
84                }
85                if let Some(kicked) = parse_kick_broadcast(content) {
86                    self.online_users.retain(|u| u != kicked);
87                    self.user_statuses.remove(kicked);
88                    self.subscription_tiers.remove(kicked);
89                }
90                if let Some((name, tier)) = parse_subscription_broadcast(content) {
91                    self.subscription_tiers.insert(name, tier);
92                }
93                for u in &self.online_users {
94                    assign_color(u, color_map);
95                }
96            }
97            _ => {}
98        }
99        if !is_active {
100            self.unread_count += 1;
101        }
102        self.messages.push(msg);
103    }
104
105    /// Drain all pending messages from the channel into the message buffer.
106    fn drain_messages(&mut self, color_map: &mut ColorMap, is_active: bool) -> DrainResult {
107        loop {
108            match self.msg_rx.try_recv() {
109                Ok(msg) => self.process_message(msg, color_map, is_active),
110                Err(mpsc::error::TryRecvError::Empty) => return DrainResult::Ok,
111                Err(mpsc::error::TryRecvError::Disconnected) => return DrainResult::Disconnected,
112            }
113        }
114    }
115}
116
117// ── Tests ─────────────────────────────────────────────────────────────────────
118
119#[cfg(test)]
120mod tests {
121    use super::*;
122    use chrono::Utc;
123
124    fn make_msg(user: &str, content: &str) -> Message {
125        Message::Message {
126            id: "test-id".into(),
127            room: "test-room".into(),
128            user: user.into(),
129            ts: Utc::now(),
130            content: content.into(),
131            seq: None,
132        }
133    }
134
135    fn make_join(user: &str) -> Message {
136        Message::Join {
137            id: "test-id".into(),
138            room: "test-room".into(),
139            user: user.into(),
140            ts: Utc::now(),
141            seq: None,
142        }
143    }
144
145    fn make_leave(user: &str) -> Message {
146        Message::Leave {
147            id: "test-id".into(),
148            room: "test-room".into(),
149            user: user.into(),
150            ts: Utc::now(),
151            seq: None,
152        }
153    }
154
155    fn make_system(content: &str) -> Message {
156        Message::System {
157            id: "test-id".into(),
158            room: "test-room".into(),
159            user: "broker".into(),
160            ts: Utc::now(),
161            content: content.into(),
162            seq: None,
163            data: None,
164        }
165    }
166
167    // ── RoomTab::process_message tests ────────────────────────────────────
168
169    #[tokio::test]
170    async fn process_message_adds_user_on_join() {
171        let (_, rx) = mpsc::unbounded_channel();
172        let (_, wh) = tokio::net::UnixStream::pair().unwrap().1.into_split();
173        let mut tab = RoomTab {
174            room_id: "test".into(),
175            messages: Vec::new(),
176            online_users: Vec::new(),
177            user_statuses: HashMap::new(),
178            subscription_tiers: HashMap::new(),
179            unread_count: 0,
180            scroll_offset: 0,
181            msg_rx: rx,
182            write_half: wh,
183        };
184        let mut cm = ColorMap::new();
185
186        tab.process_message(make_join("alice"), &mut cm, true);
187        assert_eq!(tab.online_users, vec!["alice"]);
188        assert_eq!(tab.messages.len(), 1);
189    }
190
191    #[tokio::test]
192    async fn process_message_removes_user_on_leave() {
193        let (_, rx) = mpsc::unbounded_channel();
194        let (_, wh) = tokio::net::UnixStream::pair().unwrap().1.into_split();
195        let mut tab = RoomTab {
196            room_id: "test".into(),
197            messages: Vec::new(),
198            online_users: vec!["alice".into()],
199            user_statuses: HashMap::new(),
200            subscription_tiers: HashMap::new(),
201            unread_count: 0,
202            scroll_offset: 0,
203            msg_rx: rx,
204            write_half: wh,
205        };
206        let mut cm = ColorMap::new();
207
208        tab.process_message(make_leave("alice"), &mut cm, true);
209        assert!(tab.online_users.is_empty());
210    }
211
212    #[tokio::test]
213    async fn process_message_increments_unread_when_inactive() {
214        let (_, rx) = mpsc::unbounded_channel();
215        let (_, wh) = tokio::net::UnixStream::pair().unwrap().1.into_split();
216        let mut tab = RoomTab {
217            room_id: "test".into(),
218            messages: Vec::new(),
219            online_users: Vec::new(),
220            user_statuses: HashMap::new(),
221            subscription_tiers: HashMap::new(),
222            unread_count: 0,
223            scroll_offset: 0,
224            msg_rx: rx,
225            write_half: wh,
226        };
227        let mut cm = ColorMap::new();
228
229        tab.process_message(make_msg("bob", "hello"), &mut cm, false);
230        assert_eq!(tab.unread_count, 1);
231
232        tab.process_message(make_msg("bob", "world"), &mut cm, false);
233        assert_eq!(tab.unread_count, 2);
234    }
235
236    #[tokio::test]
237    async fn process_message_no_unread_when_active() {
238        let (_, rx) = mpsc::unbounded_channel();
239        let (_, wh) = tokio::net::UnixStream::pair().unwrap().1.into_split();
240        let mut tab = RoomTab {
241            room_id: "test".into(),
242            messages: Vec::new(),
243            online_users: Vec::new(),
244            user_statuses: HashMap::new(),
245            subscription_tiers: HashMap::new(),
246            unread_count: 0,
247            scroll_offset: 0,
248            msg_rx: rx,
249            write_half: wh,
250        };
251        let mut cm = ColorMap::new();
252
253        tab.process_message(make_msg("bob", "hello"), &mut cm, true);
254        assert_eq!(tab.unread_count, 0);
255    }
256
257    #[tokio::test]
258    async fn process_message_seeds_user_from_message_sender() {
259        let (_, rx) = mpsc::unbounded_channel();
260        let (_, wh) = tokio::net::UnixStream::pair().unwrap().1.into_split();
261        let mut tab = RoomTab {
262            room_id: "test".into(),
263            messages: Vec::new(),
264            online_users: Vec::new(),
265            user_statuses: HashMap::new(),
266            subscription_tiers: HashMap::new(),
267            unread_count: 0,
268            scroll_offset: 0,
269            msg_rx: rx,
270            write_half: wh,
271        };
272        let mut cm = ColorMap::new();
273
274        tab.process_message(make_msg("charlie", "hi"), &mut cm, true);
275        assert_eq!(tab.online_users, vec!["charlie"]);
276        assert!(cm.contains_key("charlie"));
277    }
278
279    #[tokio::test]
280    async fn process_message_does_not_duplicate_existing_user() {
281        let (_, rx) = mpsc::unbounded_channel();
282        let (_, wh) = tokio::net::UnixStream::pair().unwrap().1.into_split();
283        let mut tab = RoomTab {
284            room_id: "test".into(),
285            messages: Vec::new(),
286            online_users: vec!["alice".into()],
287            user_statuses: HashMap::new(),
288            subscription_tiers: HashMap::new(),
289            unread_count: 0,
290            scroll_offset: 0,
291            msg_rx: rx,
292            write_half: wh,
293        };
294        let mut cm = ColorMap::new();
295
296        tab.process_message(make_msg("alice", "hi"), &mut cm, true);
297        assert_eq!(tab.online_users.len(), 1);
298    }
299
300    // ── drain_messages tests ──────────────────────────────────────────────
301
302    #[tokio::test]
303    async fn drain_messages_processes_pending() {
304        let (tx, rx) = mpsc::unbounded_channel();
305        let (_, wh) = tokio::net::UnixStream::pair().unwrap().1.into_split();
306        let mut tab = RoomTab {
307            room_id: "test".into(),
308            messages: Vec::new(),
309            online_users: Vec::new(),
310            user_statuses: HashMap::new(),
311            subscription_tiers: HashMap::new(),
312            unread_count: 0,
313            scroll_offset: 0,
314            msg_rx: rx,
315            write_half: wh,
316        };
317        let mut cm = ColorMap::new();
318
319        tx.send(make_msg("bob", "one")).unwrap();
320        tx.send(make_msg("bob", "two")).unwrap();
321
322        let result = tab.drain_messages(&mut cm, true);
323        assert!(matches!(result, DrainResult::Ok));
324        assert_eq!(tab.messages.len(), 2);
325    }
326
327    #[tokio::test]
328    async fn drain_messages_detects_disconnect() {
329        let (tx, rx) = mpsc::unbounded_channel();
330        let (_, wh) = tokio::net::UnixStream::pair().unwrap().1.into_split();
331        let mut tab = RoomTab {
332            room_id: "test".into(),
333            messages: Vec::new(),
334            online_users: Vec::new(),
335            user_statuses: HashMap::new(),
336            subscription_tiers: HashMap::new(),
337            unread_count: 0,
338            scroll_offset: 0,
339            msg_rx: rx,
340            write_half: wh,
341        };
342        let mut cm = ColorMap::new();
343
344        drop(tx);
345        let result = tab.drain_messages(&mut cm, true);
346        assert!(matches!(result, DrainResult::Disconnected));
347    }
348
349    #[tokio::test]
350    async fn drain_messages_empty_returns_ok() {
351        let (_tx, rx) = mpsc::unbounded_channel::<Message>();
352        let (_, wh) = tokio::net::UnixStream::pair().unwrap().1.into_split();
353        let mut tab = RoomTab {
354            room_id: "test".into(),
355            messages: Vec::new(),
356            online_users: Vec::new(),
357            user_statuses: HashMap::new(),
358            subscription_tiers: HashMap::new(),
359            unread_count: 0,
360            scroll_offset: 0,
361            msg_rx: rx,
362            write_half: wh,
363        };
364        let mut cm = ColorMap::new();
365
366        let result = tab.drain_messages(&mut cm, true);
367        assert!(matches!(result, DrainResult::Ok));
368        assert!(tab.messages.is_empty());
369    }
370
371    #[tokio::test]
372    async fn process_system_message_parses_status() {
373        let (_, rx) = mpsc::unbounded_channel();
374        let (_, wh) = tokio::net::UnixStream::pair().unwrap().1.into_split();
375        let mut tab = RoomTab {
376            room_id: "test".into(),
377            messages: Vec::new(),
378            online_users: vec!["alice".into()],
379            user_statuses: HashMap::new(),
380            subscription_tiers: HashMap::new(),
381            unread_count: 0,
382            scroll_offset: 0,
383            msg_rx: rx,
384            write_half: wh,
385        };
386        let mut cm = ColorMap::new();
387
388        tab.process_message(make_system("alice set status: coding"), &mut cm, true);
389        assert_eq!(tab.user_statuses.get("alice").unwrap(), "coding");
390    }
391
392    #[tokio::test]
393    async fn process_subscription_broadcast_sets_tier() {
394        let (_, rx) = mpsc::unbounded_channel();
395        let (_, wh) = tokio::net::UnixStream::pair().unwrap().1.into_split();
396        let mut tab = RoomTab {
397            room_id: "test".into(),
398            messages: Vec::new(),
399            online_users: vec!["alice".into()],
400            user_statuses: HashMap::new(),
401            subscription_tiers: HashMap::new(),
402            unread_count: 0,
403            scroll_offset: 0,
404            msg_rx: rx,
405            write_half: wh,
406        };
407        let mut cm = ColorMap::new();
408
409        tab.process_message(
410            make_system("alice subscribed to test (tier: mentions_only)"),
411            &mut cm,
412            true,
413        );
414        assert_eq!(
415            tab.subscription_tiers.get("alice").copied(),
416            Some(SubscriptionTier::MentionsOnly),
417        );
418
419        // Upgrading to Full clears non-Full indicator.
420        tab.process_message(
421            make_system("alice subscribed to test (tier: full)"),
422            &mut cm,
423            true,
424        );
425        assert_eq!(
426            tab.subscription_tiers.get("alice").copied(),
427            Some(SubscriptionTier::Full),
428        );
429    }
430
431    #[tokio::test]
432    async fn process_leave_clears_subscription_tier() {
433        let (_, rx) = mpsc::unbounded_channel();
434        let (_, wh) = tokio::net::UnixStream::pair().unwrap().1.into_split();
435        let mut tab = RoomTab {
436            room_id: "test".into(),
437            messages: Vec::new(),
438            online_users: vec!["alice".into()],
439            user_statuses: HashMap::new(),
440            subscription_tiers: HashMap::from([(
441                "alice".to_owned(),
442                SubscriptionTier::MentionsOnly,
443            )]),
444            unread_count: 0,
445            scroll_offset: 0,
446            msg_rx: rx,
447            write_half: wh,
448        };
449        let mut cm = ColorMap::new();
450
451        tab.process_message(make_leave("alice"), &mut cm, true);
452        assert!(tab.subscription_tiers.get("alice").is_none());
453    }
454
455    // ── kick removes user from member panel (#505) ───────────────────────
456
457    #[tokio::test]
458    async fn process_kick_broadcast_removes_user() {
459        let (_, rx) = mpsc::unbounded_channel();
460        let (_, wh) = tokio::net::UnixStream::pair().unwrap().1.into_split();
461        let mut tab = RoomTab {
462            room_id: "test".into(),
463            messages: Vec::new(),
464            online_users: vec!["alice".into(), "bob".into()],
465            user_statuses: HashMap::from([("bob".to_owned(), "working".to_owned())]),
466            subscription_tiers: HashMap::from([("bob".to_owned(), SubscriptionTier::Full)]),
467            unread_count: 0,
468            scroll_offset: 0,
469            msg_rx: rx,
470            write_half: wh,
471        };
472        let mut cm = ColorMap::new();
473
474        tab.process_message(
475            make_system("alice kicked bob (token invalidated)"),
476            &mut cm,
477            true,
478        );
479        assert!(
480            !tab.online_users.contains(&"bob".to_owned()),
481            "kicked user must be removed from online_users"
482        );
483        assert!(
484            tab.user_statuses.get("bob").is_none(),
485            "kicked user's status must be cleared"
486        );
487        assert!(
488            tab.subscription_tiers.get("bob").is_none(),
489            "kicked user's subscription tier must be cleared"
490        );
491        // alice should still be online
492        assert!(tab.online_users.contains(&"alice".to_owned()));
493    }
494
495    // ── #656 regression: status commas must not leak fake users ────────
496
497    #[tokio::test]
498    async fn process_message_who_with_comma_status_no_fake_users() {
499        let (_, rx) = mpsc::unbounded_channel();
500        let (_, wh) = tokio::net::UnixStream::pair().unwrap().1.into_split();
501        let mut tab = RoomTab {
502            room_id: "test".into(),
503            messages: Vec::new(),
504            online_users: Vec::new(),
505            user_statuses: HashMap::new(),
506            subscription_tiers: HashMap::new(),
507            unread_count: 0,
508            scroll_offset: 0,
509            msg_rx: rx,
510            write_half: wh,
511        };
512        let mut cm = ColorMap::new();
513
514        // Simulate a /who response where alice's status contains a comma.
515        // The broker should sanitize this, but even if it doesn't, the
516        // parser must not treat "#636 filed" as a username.
517        tab.process_message(
518            make_system("online \u{2014} alice: PR #630 merged, #636 filed, bob"),
519            &mut cm,
520            true,
521        );
522        assert_eq!(
523            tab.online_users.len(),
524            2,
525            "only alice and bob are real users"
526        );
527        assert!(tab.online_users.contains(&"alice".to_owned()));
528        assert!(tab.online_users.contains(&"bob".to_owned()));
529        assert!(
530            !tab.online_users.contains(&"#636 filed".to_owned()),
531            "status fragment must not appear as a user"
532        );
533    }
534}