1use std::{
2 io::{BufRead, Write},
3 path::{Path, PathBuf},
4 sync::Arc,
5};
6
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use tokio::sync::Mutex;
10use uuid::Uuid;
11
12use super::{BoxFuture, CommandContext, CommandInfo, ParamSchema, ParamType, Plugin, PluginResult};
13use crate::broker::state::{ClaimEntry, ClaimMap};
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct QueueItem {
18 pub id: String,
20 pub description: String,
22 pub added_by: String,
24 pub added_at: DateTime<Utc>,
26 pub status: String,
28}
29
30pub struct QueuePlugin {
37 queue: Arc<Mutex<Vec<QueueItem>>>,
39 queue_path: PathBuf,
41 claim_map: ClaimMap,
43}
44
45impl QueuePlugin {
46 pub(crate) fn new(queue_path: PathBuf, claim_map: ClaimMap) -> anyhow::Result<Self> {
52 let items = load_queue(&queue_path)?;
53 Ok(Self {
54 queue: Arc::new(Mutex::new(items)),
55 queue_path,
56 claim_map,
57 })
58 }
59
60 pub fn queue_path_from_chat(chat_path: &Path) -> PathBuf {
62 chat_path.with_extension("queue")
63 }
64
65 pub fn default_commands() -> Vec<CommandInfo> {
68 vec![CommandInfo {
69 name: "queue".to_owned(),
70 description: "Manage the task backlog".to_owned(),
71 usage: "/queue <add|list|remove|pop> [args]".to_owned(),
72 params: vec![
73 ParamSchema {
74 name: "action".to_owned(),
75 param_type: ParamType::Choice(vec![
76 "add".to_owned(),
77 "list".to_owned(),
78 "remove".to_owned(),
79 "pop".to_owned(),
80 ]),
81 required: true,
82 description: "Action to perform".to_owned(),
83 },
84 ParamSchema {
85 name: "args".to_owned(),
86 param_type: ParamType::Text,
87 required: false,
88 description: "Task description (add) or index (remove)".to_owned(),
89 },
90 ],
91 }]
92 }
93}
94
95impl Plugin for QueuePlugin {
96 fn name(&self) -> &str {
97 "queue"
98 }
99
100 fn commands(&self) -> Vec<CommandInfo> {
101 Self::default_commands()
102 }
103
104 fn handle(&self, ctx: CommandContext) -> BoxFuture<'_, anyhow::Result<PluginResult>> {
105 Box::pin(async move {
106 let action = ctx.params.first().map(String::as_str).unwrap_or("");
107 let rest: Vec<&str> = ctx.params.iter().skip(1).map(String::as_str).collect();
108
109 match action {
110 "add" => self.handle_add(&ctx.sender, &rest).await,
111 "list" => self.handle_list(&ctx).await,
112 "remove" => self.handle_remove(&rest).await,
113 "pop" => self.handle_pop(&ctx.sender).await,
114 _ => Ok(PluginResult::Reply(format!(
115 "queue: unknown action '{action}'. use add, list, remove, or pop"
116 ))),
117 }
118 })
119 }
120}
121
122impl QueuePlugin {
123 async fn handle_add(&self, sender: &str, rest: &[&str]) -> anyhow::Result<PluginResult> {
124 if rest.is_empty() {
125 return Ok(PluginResult::Reply(
126 "queue add: missing task description".to_owned(),
127 ));
128 }
129
130 let description = rest.join(" ");
131 let item = QueueItem {
132 id: Uuid::new_v4().to_string(),
133 description: description.clone(),
134 added_by: sender.to_owned(),
135 added_at: Utc::now(),
136 status: "queued".to_owned(),
137 };
138
139 {
140 let mut queue = self.queue.lock().await;
141 queue.push(item.clone());
142 }
143
144 append_item(&self.queue_path, &item)?;
145 let count = self.queue.lock().await.len();
146
147 Ok(PluginResult::Broadcast(format!(
148 "queue: {sender} added \"{description}\" (#{count} in backlog)"
149 )))
150 }
151
152 async fn handle_list(&self, _ctx: &CommandContext) -> anyhow::Result<PluginResult> {
153 let queue = self.queue.lock().await;
154
155 if queue.is_empty() {
156 return Ok(PluginResult::Reply("queue: backlog is empty".to_owned()));
157 }
158
159 let mut lines = Vec::with_capacity(queue.len() + 1);
160 lines.push(format!("queue: {} item(s) in backlog", queue.len()));
161 for (i, item) in queue.iter().enumerate() {
162 lines.push(format!(
163 " {}. {} (added by {} at {})",
164 i + 1,
165 item.description,
166 item.added_by,
167 item.added_at.format("%Y-%m-%d %H:%M UTC")
168 ));
169 }
170
171 Ok(PluginResult::Reply(lines.join("\n")))
172 }
173
174 async fn handle_remove(&self, rest: &[&str]) -> anyhow::Result<PluginResult> {
175 let index_str = rest.first().copied().unwrap_or("");
176 let index: usize = match index_str.parse::<usize>() {
177 Ok(n) if n >= 1 => n,
178 _ => {
179 return Ok(PluginResult::Reply(
180 "queue remove: provide a valid 1-based index".to_owned(),
181 ));
182 }
183 };
184
185 let removed = {
186 let mut queue = self.queue.lock().await;
187 if index > queue.len() {
188 return Ok(PluginResult::Reply(format!(
189 "queue remove: index {index} out of range (queue has {} item(s))",
190 queue.len()
191 )));
192 }
193 queue.remove(index - 1)
194 };
195
196 self.rewrite_queue().await?;
197
198 Ok(PluginResult::Broadcast(format!(
199 "queue: removed \"{}\" (was #{index})",
200 removed.description
201 )))
202 }
203
204 async fn handle_pop(&self, sender: &str) -> anyhow::Result<PluginResult> {
205 let popped = {
206 let mut queue = self.queue.lock().await;
207 if queue.is_empty() {
208 return Ok(PluginResult::Reply(
209 "queue pop: backlog is empty, nothing to claim".to_owned(),
210 ));
211 }
212 queue.remove(0)
213 };
214
215 self.rewrite_queue().await?;
216
217 {
219 let mut claims = self.claim_map.lock().await;
220 claims.insert(
221 sender.to_owned(),
222 ClaimEntry {
223 task: popped.description.clone(),
224 claimed_at: std::time::Instant::now(),
225 },
226 );
227 }
228
229 Ok(PluginResult::Broadcast(format!(
230 "{sender} claimed from queue: \"{}\"",
231 popped.description
232 )))
233 }
234
235 async fn rewrite_queue(&self) -> anyhow::Result<()> {
237 let queue = self.queue.lock().await;
238 rewrite_queue_file(&self.queue_path, &queue)
239 }
240}
241
242fn load_queue(path: &Path) -> anyhow::Result<Vec<QueueItem>> {
247 if !path.exists() {
248 return Ok(Vec::new());
249 }
250
251 let file = std::fs::File::open(path)?;
252 let reader = std::io::BufReader::new(file);
253 let mut items = Vec::new();
254
255 for line in reader.lines() {
256 let line = line?;
257 let trimmed = line.trim();
258 if trimmed.is_empty() {
259 continue;
260 }
261 match serde_json::from_str::<QueueItem>(trimmed) {
262 Ok(item) => items.push(item),
263 Err(e) => {
264 eprintln!("queue: skipping malformed line in {}: {e}", path.display());
265 }
266 }
267 }
268
269 Ok(items)
270}
271
272fn append_item(path: &Path, item: &QueueItem) -> anyhow::Result<()> {
274 let mut file = std::fs::OpenOptions::new()
275 .create(true)
276 .append(true)
277 .open(path)?;
278 let line = serde_json::to_string(item)?;
279 writeln!(file, "{line}")?;
280 Ok(())
281}
282
283fn rewrite_queue_file(path: &Path, items: &[QueueItem]) -> anyhow::Result<()> {
285 let mut file = std::fs::File::create(path)?;
286 for item in items {
287 let line = serde_json::to_string(item)?;
288 writeln!(file, "{line}")?;
289 }
290 Ok(())
291}
292
293#[cfg(test)]
296mod tests {
297 use super::*;
298 use crate::plugin::{ChatWriter, HistoryReader, RoomMetadata, UserInfo};
299 use std::collections::HashMap;
300 use std::sync::atomic::AtomicU64;
301 use tempfile::TempDir;
302 use tokio::sync::broadcast;
303
304 fn make_test_plugin(dir: &TempDir) -> QueuePlugin {
306 let queue_path = dir.path().join("test-room.queue");
307 let claim_map: ClaimMap = Arc::new(Mutex::new(HashMap::new()));
308 QueuePlugin::new(queue_path, claim_map).unwrap()
309 }
310
311 fn make_test_ctx(
313 command: &str,
314 params: Vec<&str>,
315 sender: &str,
316 chat_path: &Path,
317 clients: &Arc<Mutex<HashMap<u64, (String, broadcast::Sender<String>)>>>,
318 seq: &Arc<AtomicU64>,
319 ) -> CommandContext {
320 let chat_arc = Arc::new(chat_path.to_path_buf());
321 let room_id = Arc::new("test-room".to_owned());
322
323 CommandContext {
324 command: command.to_owned(),
325 params: params.into_iter().map(|s| s.to_owned()).collect(),
326 sender: sender.to_owned(),
327 room_id: "test-room".to_owned(),
328 message_id: Uuid::new_v4().to_string(),
329 timestamp: Utc::now(),
330 history: HistoryReader::new(chat_path, sender),
331 writer: ChatWriter::new(clients, &chat_arc, &room_id, seq, "queue"),
332 metadata: RoomMetadata {
333 online_users: vec![UserInfo {
334 username: sender.to_owned(),
335 status: String::new(),
336 }],
337 host: None,
338 message_count: 0,
339 },
340 available_commands: vec![],
341 }
342 }
343
344 fn make_test_clients() -> (
346 Arc<Mutex<HashMap<u64, (String, broadcast::Sender<String>)>>>,
347 broadcast::Receiver<String>,
348 Arc<AtomicU64>,
349 ) {
350 let (tx, rx) = broadcast::channel::<String>(64);
351 let mut map = HashMap::new();
352 map.insert(1u64, ("alice".to_owned(), tx));
353 let clients = Arc::new(Mutex::new(map));
354 let seq = Arc::new(AtomicU64::new(0));
355 (clients, rx, seq)
356 }
357
358 #[test]
361 fn load_queue_nonexistent_file_returns_empty() {
362 let items = load_queue(Path::new("/nonexistent/path.queue")).unwrap();
363 assert!(items.is_empty());
364 }
365
366 #[test]
367 fn append_and_load_round_trips() {
368 let dir = TempDir::new().unwrap();
369 let path = dir.path().join("test.queue");
370
371 let item = QueueItem {
372 id: "id-1".to_owned(),
373 description: "fix the bug".to_owned(),
374 added_by: "alice".to_owned(),
375 added_at: Utc::now(),
376 status: "queued".to_owned(),
377 };
378
379 append_item(&path, &item).unwrap();
380 let loaded = load_queue(&path).unwrap();
381 assert_eq!(loaded.len(), 1);
382 assert_eq!(loaded[0].id, "id-1");
383 assert_eq!(loaded[0].description, "fix the bug");
384 assert_eq!(loaded[0].added_by, "alice");
385 assert_eq!(loaded[0].status, "queued");
386 }
387
388 #[test]
389 fn rewrite_replaces_file_contents() {
390 let dir = TempDir::new().unwrap();
391 let path = dir.path().join("test.queue");
392
393 let items: Vec<QueueItem> = (0..3)
395 .map(|i| QueueItem {
396 id: format!("id-{i}"),
397 description: format!("task {i}"),
398 added_by: "bob".to_owned(),
399 added_at: Utc::now(),
400 status: "queued".to_owned(),
401 })
402 .collect();
403
404 for item in &items {
405 append_item(&path, item).unwrap();
406 }
407 assert_eq!(load_queue(&path).unwrap().len(), 3);
408
409 rewrite_queue_file(&path, &items[1..2]).unwrap();
411 let reloaded = load_queue(&path).unwrap();
412 assert_eq!(reloaded.len(), 1);
413 assert_eq!(reloaded[0].id, "id-1");
414 }
415
416 #[test]
417 fn load_skips_malformed_lines() {
418 let dir = TempDir::new().unwrap();
419 let path = dir.path().join("test.queue");
420
421 let good = QueueItem {
422 id: "good".to_owned(),
423 description: "valid".to_owned(),
424 added_by: "alice".to_owned(),
425 added_at: Utc::now(),
426 status: "queued".to_owned(),
427 };
428
429 let mut file = std::fs::File::create(&path).unwrap();
430 writeln!(file, "{}", serde_json::to_string(&good).unwrap()).unwrap();
431 writeln!(file, "not valid json").unwrap();
432 writeln!(file, "{}", serde_json::to_string(&good).unwrap()).unwrap();
433 writeln!(file).unwrap(); let loaded = load_queue(&path).unwrap();
436 assert_eq!(loaded.len(), 2);
437 }
438
439 #[test]
440 fn queue_path_from_chat_replaces_extension() {
441 let chat = PathBuf::from("/data/room-dev.chat");
442 let queue = QueuePlugin::queue_path_from_chat(&chat);
443 assert_eq!(queue, PathBuf::from("/data/room-dev.queue"));
444 }
445
446 #[test]
449 fn new_plugin_loads_existing_queue() {
450 let dir = TempDir::new().unwrap();
451 let path = dir.path().join("test.queue");
452
453 let item = QueueItem {
454 id: "pre-existing".to_owned(),
455 description: "already there".to_owned(),
456 added_by: "ba".to_owned(),
457 added_at: Utc::now(),
458 status: "queued".to_owned(),
459 };
460 append_item(&path, &item).unwrap();
461
462 let claim_map: ClaimMap = Arc::new(Mutex::new(HashMap::new()));
463 let plugin = QueuePlugin::new(path, claim_map).unwrap();
464
465 let rt = tokio::runtime::Runtime::new().unwrap();
466 rt.block_on(async {
467 let queue = plugin.queue.lock().await;
468 assert_eq!(queue.len(), 1);
469 assert_eq!(queue[0].description, "already there");
470 });
471 }
472
473 #[tokio::test]
476 async fn add_appends_to_queue_and_persists() {
477 let dir = TempDir::new().unwrap();
478 let plugin = make_test_plugin(&dir);
479 let chat_tmp = tempfile::NamedTempFile::new().unwrap();
480 let (clients, _rx, seq) = make_test_clients();
481 let ctx = make_test_ctx(
482 "queue",
483 vec!["add", "fix", "the", "bug"],
484 "alice",
485 chat_tmp.path(),
486 &clients,
487 &seq,
488 );
489
490 let result = plugin.handle(ctx).await.unwrap();
491 match &result {
492 PluginResult::Broadcast(msg) => {
493 assert!(msg.contains("fix the bug"), "broadcast should mention task");
494 assert!(msg.contains("alice"), "broadcast should mention sender");
495 assert!(msg.contains("#1"), "broadcast should show queue position");
496 }
497 _ => panic!("expected Broadcast"),
498 }
499
500 let queue = plugin.queue.lock().await;
502 assert_eq!(queue.len(), 1);
503 assert_eq!(queue[0].description, "fix the bug");
504 assert_eq!(queue[0].added_by, "alice");
505 drop(queue);
506
507 let persisted = load_queue(&dir.path().join("test-room.queue")).unwrap();
509 assert_eq!(persisted.len(), 1);
510 assert_eq!(persisted[0].description, "fix the bug");
511 }
512
513 #[tokio::test]
514 async fn add_without_description_returns_error() {
515 let dir = TempDir::new().unwrap();
516 let plugin = make_test_plugin(&dir);
517 let chat_tmp = tempfile::NamedTempFile::new().unwrap();
518 let (clients, _rx, seq) = make_test_clients();
519 let ctx = make_test_ctx(
520 "queue",
521 vec!["add"],
522 "alice",
523 chat_tmp.path(),
524 &clients,
525 &seq,
526 );
527
528 let result = plugin.handle(ctx).await.unwrap();
529 match result {
530 PluginResult::Reply(msg) => assert!(msg.contains("missing task description")),
531 _ => panic!("expected Reply"),
532 }
533 }
534
535 #[tokio::test]
536 async fn list_empty_queue() {
537 let dir = TempDir::new().unwrap();
538 let plugin = make_test_plugin(&dir);
539 let chat_tmp = tempfile::NamedTempFile::new().unwrap();
540 let (clients, _rx, seq) = make_test_clients();
541 let ctx = make_test_ctx(
542 "queue",
543 vec!["list"],
544 "alice",
545 chat_tmp.path(),
546 &clients,
547 &seq,
548 );
549
550 let result = plugin.handle(ctx).await.unwrap();
551 match result {
552 PluginResult::Reply(msg) => assert!(msg.contains("empty")),
553 _ => panic!("expected Reply"),
554 }
555 }
556
557 #[tokio::test]
558 async fn list_shows_indexed_items() {
559 let dir = TempDir::new().unwrap();
560 let plugin = make_test_plugin(&dir);
561 let chat_tmp = tempfile::NamedTempFile::new().unwrap();
562 let (clients, _rx, seq) = make_test_clients();
563
564 for desc in ["first task", "second task"] {
566 let words: Vec<&str> = std::iter::once("add")
567 .chain(desc.split_whitespace())
568 .collect();
569 let ctx = make_test_ctx("queue", words, "alice", chat_tmp.path(), &clients, &seq);
570 plugin.handle(ctx).await.unwrap();
571 }
572
573 let ctx = make_test_ctx(
574 "queue",
575 vec!["list"],
576 "alice",
577 chat_tmp.path(),
578 &clients,
579 &seq,
580 );
581 let result = plugin.handle(ctx).await.unwrap();
582 match result {
583 PluginResult::Reply(msg) => {
584 assert!(msg.contains("2 item(s)"));
585 assert!(msg.contains("1. first task"));
586 assert!(msg.contains("2. second task"));
587 }
588 _ => panic!("expected Reply"),
589 }
590 }
591
592 #[tokio::test]
593 async fn remove_by_index() {
594 let dir = TempDir::new().unwrap();
595 let plugin = make_test_plugin(&dir);
596 let chat_tmp = tempfile::NamedTempFile::new().unwrap();
597 let (clients, _rx, seq) = make_test_clients();
598
599 for desc in ["first", "second"] {
601 let ctx = make_test_ctx(
602 "queue",
603 vec!["add", desc],
604 "alice",
605 chat_tmp.path(),
606 &clients,
607 &seq,
608 );
609 plugin.handle(ctx).await.unwrap();
610 }
611
612 let ctx = make_test_ctx(
614 "queue",
615 vec!["remove", "1"],
616 "alice",
617 chat_tmp.path(),
618 &clients,
619 &seq,
620 );
621 let result = plugin.handle(ctx).await.unwrap();
622 match &result {
623 PluginResult::Broadcast(msg) => {
624 assert!(
625 msg.contains("first"),
626 "broadcast should mention removed item"
627 );
628 assert!(msg.contains("#1"), "broadcast should mention index");
629 }
630 _ => panic!("expected Broadcast"),
631 }
632
633 let queue = plugin.queue.lock().await;
635 assert_eq!(queue.len(), 1);
636 assert_eq!(queue[0].description, "second");
637 drop(queue);
638
639 let persisted = load_queue(&dir.path().join("test-room.queue")).unwrap();
641 assert_eq!(persisted.len(), 1);
642 assert_eq!(persisted[0].description, "second");
643 }
644
645 #[tokio::test]
646 async fn remove_out_of_range() {
647 let dir = TempDir::new().unwrap();
648 let plugin = make_test_plugin(&dir);
649 let chat_tmp = tempfile::NamedTempFile::new().unwrap();
650 let (clients, _rx, seq) = make_test_clients();
651
652 let ctx = make_test_ctx(
653 "queue",
654 vec!["remove", "5"],
655 "alice",
656 chat_tmp.path(),
657 &clients,
658 &seq,
659 );
660 let result = plugin.handle(ctx).await.unwrap();
661 match result {
662 PluginResult::Reply(msg) => assert!(msg.contains("out of range")),
663 _ => panic!("expected Reply"),
664 }
665 }
666
667 #[tokio::test]
668 async fn remove_invalid_index() {
669 let dir = TempDir::new().unwrap();
670 let plugin = make_test_plugin(&dir);
671 let chat_tmp = tempfile::NamedTempFile::new().unwrap();
672 let (clients, _rx, seq) = make_test_clients();
673
674 let ctx = make_test_ctx(
675 "queue",
676 vec!["remove", "abc"],
677 "alice",
678 chat_tmp.path(),
679 &clients,
680 &seq,
681 );
682 let result = plugin.handle(ctx).await.unwrap();
683 match result {
684 PluginResult::Reply(msg) => assert!(msg.contains("valid 1-based index")),
685 _ => panic!("expected Reply"),
686 }
687 }
688
689 #[tokio::test]
690 async fn remove_zero_index() {
691 let dir = TempDir::new().unwrap();
692 let plugin = make_test_plugin(&dir);
693 let chat_tmp = tempfile::NamedTempFile::new().unwrap();
694 let (clients, _rx, seq) = make_test_clients();
695
696 let ctx = make_test_ctx(
697 "queue",
698 vec!["remove", "0"],
699 "alice",
700 chat_tmp.path(),
701 &clients,
702 &seq,
703 );
704 let result = plugin.handle(ctx).await.unwrap();
705 match result {
706 PluginResult::Reply(msg) => assert!(msg.contains("valid 1-based index")),
707 _ => panic!("expected Reply"),
708 }
709 }
710
711 #[tokio::test]
712 async fn pop_claims_first_item() {
713 let dir = TempDir::new().unwrap();
714 let queue_path = dir.path().join("test-room.queue");
715 let claim_map: ClaimMap = Arc::new(Mutex::new(HashMap::new()));
716 let plugin = QueuePlugin::new(queue_path, claim_map.clone()).unwrap();
717
718 let chat_tmp = tempfile::NamedTempFile::new().unwrap();
719 let (clients, _rx, seq) = make_test_clients();
720
721 for desc in ["urgent fix", "nice to have"] {
723 let words: Vec<&str> = std::iter::once("add")
724 .chain(desc.split_whitespace())
725 .collect();
726 let ctx = make_test_ctx("queue", words, "ba", chat_tmp.path(), &clients, &seq);
727 plugin.handle(ctx).await.unwrap();
728 }
729
730 let ctx = make_test_ctx(
732 "queue",
733 vec!["pop"],
734 "alice",
735 chat_tmp.path(),
736 &clients,
737 &seq,
738 );
739 let result = plugin.handle(ctx).await.unwrap();
740 match &result {
741 PluginResult::Broadcast(msg) => {
742 assert!(msg.contains("alice"), "broadcast should mention claimer");
743 assert!(msg.contains("urgent fix"), "broadcast should mention task");
744 }
745 _ => panic!("expected Broadcast"),
746 }
747
748 let claims = claim_map.lock().await;
750 assert_eq!(claims.get("alice").unwrap().task, "urgent fix");
751 drop(claims);
752
753 let queue = plugin.queue.lock().await;
755 assert_eq!(queue.len(), 1);
756 assert_eq!(queue[0].description, "nice to have");
757 drop(queue);
758
759 let persisted = load_queue(&dir.path().join("test-room.queue")).unwrap();
761 assert_eq!(persisted.len(), 1);
762 }
763
764 #[tokio::test]
765 async fn pop_empty_queue_returns_error() {
766 let dir = TempDir::new().unwrap();
767 let plugin = make_test_plugin(&dir);
768 let chat_tmp = tempfile::NamedTempFile::new().unwrap();
769 let (clients, _rx, seq) = make_test_clients();
770
771 let ctx = make_test_ctx(
772 "queue",
773 vec!["pop"],
774 "alice",
775 chat_tmp.path(),
776 &clients,
777 &seq,
778 );
779 let result = plugin.handle(ctx).await.unwrap();
780 match result {
781 PluginResult::Reply(msg) => assert!(msg.contains("empty")),
782 _ => panic!("expected Reply"),
783 }
784 }
785
786 #[tokio::test]
787 async fn pop_replaces_existing_claim() {
788 let dir = TempDir::new().unwrap();
789 let queue_path = dir.path().join("test-room.queue");
790 let claim_map: ClaimMap = Arc::new(Mutex::new(HashMap::new()));
791
792 {
794 let mut claims = claim_map.lock().await;
795 claims.insert(
796 "alice".to_owned(),
797 ClaimEntry {
798 task: "old task".to_owned(),
799 claimed_at: std::time::Instant::now(),
800 },
801 );
802 }
803
804 let plugin = QueuePlugin::new(queue_path, claim_map.clone()).unwrap();
805 let chat_tmp = tempfile::NamedTempFile::new().unwrap();
806 let (clients, _rx, seq) = make_test_clients();
807
808 let ctx = make_test_ctx(
810 "queue",
811 vec!["add", "new", "task"],
812 "ba",
813 chat_tmp.path(),
814 &clients,
815 &seq,
816 );
817 plugin.handle(ctx).await.unwrap();
818
819 let ctx = make_test_ctx(
820 "queue",
821 vec!["pop"],
822 "alice",
823 chat_tmp.path(),
824 &clients,
825 &seq,
826 );
827 plugin.handle(ctx).await.unwrap();
828
829 let claims = claim_map.lock().await;
831 assert_eq!(claims.get("alice").unwrap().task, "new task");
832 }
833
834 #[tokio::test]
835 async fn unknown_action_returns_error() {
836 let dir = TempDir::new().unwrap();
837 let plugin = make_test_plugin(&dir);
838 let chat_tmp = tempfile::NamedTempFile::new().unwrap();
839 let (clients, _rx, seq) = make_test_clients();
840
841 let ctx = make_test_ctx(
842 "queue",
843 vec!["foobar"],
844 "alice",
845 chat_tmp.path(),
846 &clients,
847 &seq,
848 );
849 let result = plugin.handle(ctx).await.unwrap();
850 match result {
851 PluginResult::Reply(msg) => assert!(msg.contains("unknown action")),
852 _ => panic!("expected Reply"),
853 }
854 }
855
856 #[tokio::test]
857 async fn queue_survives_reload() {
858 let dir = TempDir::new().unwrap();
859 let queue_path = dir.path().join("test-room.queue");
860 let claim_map: ClaimMap = Arc::new(Mutex::new(HashMap::new()));
861
862 {
864 let plugin = QueuePlugin::new(queue_path.clone(), claim_map.clone()).unwrap();
865 let chat_tmp = tempfile::NamedTempFile::new().unwrap();
866 let (clients, _rx, seq) = make_test_clients();
867
868 for desc in ["task a", "task b", "task c"] {
869 let words: Vec<&str> = std::iter::once("add")
870 .chain(desc.split_whitespace())
871 .collect();
872 let ctx = make_test_ctx("queue", words, "alice", chat_tmp.path(), &clients, &seq);
873 plugin.handle(ctx).await.unwrap();
874 }
875 }
876
877 let plugin2 = QueuePlugin::new(queue_path, claim_map).unwrap();
879 let queue = plugin2.queue.lock().await;
880 assert_eq!(queue.len(), 3);
881 assert_eq!(queue[0].description, "task a");
882 assert_eq!(queue[1].description, "task b");
883 assert_eq!(queue[2].description, "task c");
884 }
885
886 #[test]
889 fn plugin_name_is_queue() {
890 let dir = TempDir::new().unwrap();
891 let plugin = make_test_plugin(&dir);
892 assert_eq!(plugin.name(), "queue");
893 }
894
895 #[test]
896 fn plugin_registers_queue_command() {
897 let dir = TempDir::new().unwrap();
898 let plugin = make_test_plugin(&dir);
899 let cmds = plugin.commands();
900 assert_eq!(cmds.len(), 1);
901 assert_eq!(cmds[0].name, "queue");
902 assert_eq!(cmds[0].params.len(), 2);
903 assert!(cmds[0].params[0].required);
904 assert!(!cmds[0].params[1].required);
905 }
906}