1use std::collections::HashMap;
2
3use tokio::sync::mpsc;
4
5use room_protocol::SubscriptionTier;
6
7use crate::message::Message;
8use input::{
9 parse_kick_broadcast, parse_status_broadcast, parse_subscription_broadcast,
10 seed_online_users_from_who,
11};
12use render::{assign_color, ColorMap};
13
14mod colors;
15mod display;
16mod dm;
17mod event_loop;
18mod frame;
19mod input;
20mod markdown;
21mod panel;
22mod parse;
23mod render;
24mod render_bots;
25mod widgets;
26
27pub use event_loop::run;
28
/// Line-count limit (6) for the input area.
///
/// NOTE(review): no use of this constant is visible in this part of the file;
/// presumably it caps how many lines tall the input box may grow — confirm
/// against the rendering/input code.
const MAX_INPUT_LINES: usize = 6;
31
32struct RoomTab {
35 room_id: String,
36 messages: Vec<Message>,
37 online_users: Vec<String>,
38 user_statuses: HashMap<String, String>,
39 subscription_tiers: HashMap<String, SubscriptionTier>,
40 unread_count: usize,
41 scroll_offset: usize,
42 msg_rx: mpsc::UnboundedReceiver<Message>,
43 write_half: tokio::net::unix::OwnedWriteHalf,
44}
45
/// Outcome of `RoomTab::drain_messages`.
///
/// Derives `Debug`, `Clone`, `Copy`, `PartialEq` and `Eq` so results can be
/// compared with `==`/`assert_eq!` and printed in diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DrainResult {
    /// The receiver reported `Empty`: all pending messages were processed
    /// and the channel is still open.
    Ok,
    /// The receiver reported `Disconnected`: the sending side is gone and no
    /// further messages will arrive.
    Disconnected,
}
53
54impl RoomTab {
55 fn process_message(&mut self, msg: Message, color_map: &mut ColorMap, is_active: bool) {
59 match &msg {
60 Message::Join { user, .. } if !self.online_users.contains(user) => {
61 assign_color(user, color_map);
62 self.online_users.push(user.clone());
63 }
64 Message::Leave { user, .. } => {
65 self.online_users.retain(|u| u != user);
66 self.user_statuses.remove(user);
67 self.subscription_tiers.remove(user);
68 }
69 Message::Message { user, .. } if !self.online_users.contains(user) => {
70 assign_color(user, color_map);
71 self.online_users.push(user.clone());
72 }
73 Message::Message { user, .. } => {
74 assign_color(user, color_map);
75 }
76 Message::System { user, content, .. } if user == "broker" => {
77 seed_online_users_from_who(
78 content,
79 &mut self.online_users,
80 &mut self.user_statuses,
81 );
82 if let Some((name, status)) = parse_status_broadcast(content) {
83 self.user_statuses.insert(name, status);
84 }
85 if let Some(kicked) = parse_kick_broadcast(content) {
86 self.online_users.retain(|u| u != kicked);
87 self.user_statuses.remove(kicked);
88 self.subscription_tiers.remove(kicked);
89 }
90 if let Some((name, tier)) = parse_subscription_broadcast(content) {
91 self.subscription_tiers.insert(name, tier);
92 }
93 for u in &self.online_users {
94 assign_color(u, color_map);
95 }
96 }
97 _ => {}
98 }
99 if !is_active {
100 self.unread_count += 1;
101 }
102 self.messages.push(msg);
103 }
104
105 fn drain_messages(&mut self, color_map: &mut ColorMap, is_active: bool) -> DrainResult {
107 loop {
108 match self.msg_rx.try_recv() {
109 Ok(msg) => self.process_message(msg, color_map, is_active),
110 Err(mpsc::error::TryRecvError::Empty) => return DrainResult::Ok,
111 Err(mpsc::error::TryRecvError::Disconnected) => return DrainResult::Disconnected,
112 }
113 }
114 }
115}
116
/// Unit tests for `RoomTab::process_message` and `RoomTab::drain_messages`.
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;

    /// Builds a chat message from `user` with body `content`.
    fn make_msg(user: &str, content: &str) -> Message {
        Message::Message {
            id: "test-id".into(),
            room: "test-room".into(),
            user: user.into(),
            ts: Utc::now(),
            content: content.into(),
            seq: None,
        }
    }

    /// Builds a join event for `user`.
    fn make_join(user: &str) -> Message {
        Message::Join {
            id: "test-id".into(),
            room: "test-room".into(),
            user: user.into(),
            ts: Utc::now(),
            seq: None,
        }
    }

    /// Builds a leave event for `user`.
    fn make_leave(user: &str) -> Message {
        Message::Leave {
            id: "test-id".into(),
            room: "test-room".into(),
            user: user.into(),
            ts: Utc::now(),
            seq: None,
        }
    }

    /// Builds a system message sent by `"broker"` with the given `content`.
    fn make_system(content: &str) -> Message {
        Message::System {
            id: "test-id".into(),
            room: "test-room".into(),
            user: "broker".into(),
            ts: Utc::now(),
            content: content.into(),
            seq: None,
            data: None,
        }
    }

    /// Builds a `RoomTab` whose online list is `online` and whose other state
    /// is empty, together with the sender feeding its `msg_rx`.
    ///
    /// Keep the returned sender alive for as long as the channel should stay
    /// open; drop it to simulate a disconnect. Intended to be called from
    /// inside `#[tokio::test]` functions because it creates Tokio socket
    /// halves.
    fn make_tab(online: &[&str]) -> (mpsc::UnboundedSender<Message>, RoomTab) {
        let (tx, rx) = mpsc::unbounded_channel();
        // Only a write half is needed to populate the struct; the peer stream
        // and the read half are dropped immediately.
        let (_, write_half) = tokio::net::UnixStream::pair()
            .expect("creating a unix socket pair should succeed")
            .1
            .into_split();
        let tab = RoomTab {
            room_id: "test".into(),
            messages: Vec::new(),
            online_users: online.iter().map(|&name| name.to_owned()).collect(),
            user_statuses: HashMap::new(),
            subscription_tiers: HashMap::new(),
            unread_count: 0,
            scroll_offset: 0,
            msg_rx: rx,
            write_half,
        };
        (tx, tab)
    }

    /// Returns whether `name` is in the tab's online list.
    fn is_online(tab: &RoomTab, name: &str) -> bool {
        tab.online_users.iter().any(|listed| listed == name)
    }

    // ── process_message ────────────────────────────────────────────────────

    #[tokio::test]
    async fn process_message_adds_user_on_join() {
        let (_tx, mut tab) = make_tab(&[]);
        let mut cm = ColorMap::new();

        tab.process_message(make_join("alice"), &mut cm, true);
        assert_eq!(tab.online_users, vec!["alice"]);
        assert_eq!(tab.messages.len(), 1);
    }

    #[tokio::test]
    async fn process_message_removes_user_on_leave() {
        let (_tx, mut tab) = make_tab(&["alice"]);
        let mut cm = ColorMap::new();

        tab.process_message(make_leave("alice"), &mut cm, true);
        assert!(tab.online_users.is_empty());
    }

    #[tokio::test]
    async fn process_message_increments_unread_when_inactive() {
        let (_tx, mut tab) = make_tab(&[]);
        let mut cm = ColorMap::new();

        tab.process_message(make_msg("bob", "hello"), &mut cm, false);
        assert_eq!(tab.unread_count, 1);

        tab.process_message(make_msg("bob", "world"), &mut cm, false);
        assert_eq!(tab.unread_count, 2);
    }

    #[tokio::test]
    async fn process_message_no_unread_when_active() {
        let (_tx, mut tab) = make_tab(&[]);
        let mut cm = ColorMap::new();

        tab.process_message(make_msg("bob", "hello"), &mut cm, true);
        assert_eq!(tab.unread_count, 0);
    }

    #[tokio::test]
    async fn process_message_seeds_user_from_message_sender() {
        let (_tx, mut tab) = make_tab(&[]);
        let mut cm = ColorMap::new();

        tab.process_message(make_msg("charlie", "hi"), &mut cm, true);
        assert_eq!(tab.online_users, vec!["charlie"]);
        assert!(cm.contains_key("charlie"));
    }

    #[tokio::test]
    async fn process_message_does_not_duplicate_existing_user() {
        let (_tx, mut tab) = make_tab(&["alice"]);
        let mut cm = ColorMap::new();

        tab.process_message(make_msg("alice", "hi"), &mut cm, true);
        assert_eq!(tab.online_users.len(), 1);
    }

    // ── drain_messages ─────────────────────────────────────────────────────

    #[tokio::test]
    async fn drain_messages_processes_pending() {
        let (tx, mut tab) = make_tab(&[]);
        let mut cm = ColorMap::new();

        tx.send(make_msg("bob", "one")).unwrap();
        tx.send(make_msg("bob", "two")).unwrap();

        let result = tab.drain_messages(&mut cm, true);
        assert!(matches!(result, DrainResult::Ok));
        assert_eq!(tab.messages.len(), 2);
    }

    #[tokio::test]
    async fn drain_messages_detects_disconnect() {
        let (tx, mut tab) = make_tab(&[]);
        let mut cm = ColorMap::new();

        // Dropping the only sender closes the channel.
        drop(tx);
        let result = tab.drain_messages(&mut cm, true);
        assert!(matches!(result, DrainResult::Disconnected));
    }

    #[tokio::test]
    async fn drain_messages_empty_returns_ok() {
        // The sender must stay alive so the channel reads as empty rather
        // than disconnected.
        let (_tx, mut tab) = make_tab(&[]);
        let mut cm = ColorMap::new();

        let result = tab.drain_messages(&mut cm, true);
        assert!(matches!(result, DrainResult::Ok));
        assert!(tab.messages.is_empty());
    }

    // ── broker system messages ─────────────────────────────────────────────

    #[tokio::test]
    async fn process_system_message_parses_status() {
        let (_tx, mut tab) = make_tab(&["alice"]);
        let mut cm = ColorMap::new();

        tab.process_message(make_system("alice set status: coding"), &mut cm, true);
        assert_eq!(tab.user_statuses.get("alice").unwrap(), "coding");
    }

    #[tokio::test]
    async fn process_subscription_broadcast_sets_tier() {
        let (_tx, mut tab) = make_tab(&["alice"]);
        let mut cm = ColorMap::new();

        tab.process_message(
            make_system("alice subscribed to test (tier: mentions_only)"),
            &mut cm,
            true,
        );
        assert_eq!(
            tab.subscription_tiers.get("alice").copied(),
            Some(SubscriptionTier::MentionsOnly),
        );

        // A later broadcast for the same user replaces the stored tier.
        tab.process_message(
            make_system("alice subscribed to test (tier: full)"),
            &mut cm,
            true,
        );
        assert_eq!(
            tab.subscription_tiers.get("alice").copied(),
            Some(SubscriptionTier::Full),
        );
    }

    #[tokio::test]
    async fn process_leave_clears_subscription_tier() {
        let (_tx, mut tab) = make_tab(&["alice"]);
        tab.subscription_tiers
            .insert("alice".to_owned(), SubscriptionTier::MentionsOnly);
        let mut cm = ColorMap::new();

        tab.process_message(make_leave("alice"), &mut cm, true);
        assert!(!tab.subscription_tiers.contains_key("alice"));
    }

    #[tokio::test]
    async fn process_kick_broadcast_removes_user() {
        let (_tx, mut tab) = make_tab(&["alice", "bob"]);
        tab.user_statuses
            .insert("bob".to_owned(), "working".to_owned());
        tab.subscription_tiers
            .insert("bob".to_owned(), SubscriptionTier::Full);
        let mut cm = ColorMap::new();

        tab.process_message(
            make_system("alice kicked bob (token invalidated)"),
            &mut cm,
            true,
        );
        assert!(
            !is_online(&tab, "bob"),
            "kicked user must be removed from online_users"
        );
        assert!(
            !tab.user_statuses.contains_key("bob"),
            "kicked user's status must be cleared"
        );
        assert!(
            !tab.subscription_tiers.contains_key("bob"),
            "kicked user's subscription tier must be cleared"
        );
        // The user who issued the kick stays online.
        assert!(is_online(&tab, "alice"));
    }

    #[tokio::test]
    async fn process_message_who_with_comma_status_no_fake_users() {
        let (_tx, mut tab) = make_tab(&[]);
        let mut cm = ColorMap::new();

        // alice's status itself contains a comma; the fragment after it must
        // not be mistaken for a username.
        tab.process_message(
            make_system("online \u{2014} alice: PR #630 merged, #636 filed, bob"),
            &mut cm,
            true,
        );
        assert_eq!(
            tab.online_users.len(),
            2,
            "only alice and bob are real users"
        );
        assert!(is_online(&tab, "alice"));
        assert!(is_online(&tab, "bob"));
        assert!(
            !is_online(&tab, "#636 filed"),
            "status fragment must not appear as a user"
        );
    }
}