Skip to main content

room_cli/plugin/
queue.rs

1use std::{
2    io::{BufRead, Write},
3    path::{Path, PathBuf},
4    sync::Arc,
5};
6
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use tokio::sync::Mutex;
10use uuid::Uuid;
11
12use super::{BoxFuture, CommandContext, CommandInfo, ParamSchema, ParamType, Plugin, PluginResult};
13
14/// A single item in the task queue backlog.
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct QueueItem {
17    /// Unique identifier for this queue entry.
18    pub id: String,
19    /// Human-readable task description.
20    pub description: String,
21    /// Username of the agent who added this item.
22    pub added_by: String,
23    /// Timestamp when the item was added.
24    pub added_at: DateTime<Utc>,
25    /// Current status — always `"queued"` while in the backlog.
26    pub status: String,
27}
28
29/// Plugin that manages a persistent task backlog.
30///
31/// Agents can add tasks to the queue and self-assign from it using `/queue pop`.
32///
33/// Queue state is persisted to an NDJSON file alongside the room's `.chat` file.
34pub struct QueuePlugin {
35    /// In-memory queue items, protected by a mutex for concurrent access.
36    queue: Arc<Mutex<Vec<QueueItem>>>,
37    /// Path to the NDJSON persistence file (`<room-data-dir>/<room-id>.queue`).
38    queue_path: PathBuf,
39}
40
41impl QueuePlugin {
42    /// Create a new `QueuePlugin`, loading any existing queue from disk.
43    ///
44    /// # Arguments
45    /// * `queue_path` — path to the `.queue` NDJSON file
46    pub(crate) fn new(queue_path: PathBuf) -> anyhow::Result<Self> {
47        let items = load_queue(&queue_path)?;
48        Ok(Self {
49            queue: Arc::new(Mutex::new(items)),
50            queue_path,
51        })
52    }
53
54    /// Derive the `.queue` file path from a `.chat` file path.
55    pub fn queue_path_from_chat(chat_path: &Path) -> PathBuf {
56        chat_path.with_extension("queue")
57    }
58
59    /// Returns the command info for the TUI command palette without needing
60    /// an instantiated plugin. Used by `all_known_commands()`.
61    pub fn default_commands() -> Vec<CommandInfo> {
62        vec![CommandInfo {
63            name: "queue".to_owned(),
64            description: "Manage the task backlog".to_owned(),
65            usage: "/queue <add|list|remove|pop> [args]".to_owned(),
66            params: vec![
67                ParamSchema {
68                    name: "action".to_owned(),
69                    param_type: ParamType::Choice(vec![
70                        "add".to_owned(),
71                        "list".to_owned(),
72                        "remove".to_owned(),
73                        "pop".to_owned(),
74                    ]),
75                    required: true,
76                    description: "Action to perform".to_owned(),
77                },
78                ParamSchema {
79                    name: "args".to_owned(),
80                    param_type: ParamType::Text,
81                    required: false,
82                    description: "Task description (add) or index (remove)".to_owned(),
83                },
84            ],
85        }]
86    }
87}
88
89impl Plugin for QueuePlugin {
90    fn name(&self) -> &str {
91        "queue"
92    }
93
94    fn commands(&self) -> Vec<CommandInfo> {
95        Self::default_commands()
96    }
97
98    fn handle(&self, ctx: CommandContext) -> BoxFuture<'_, anyhow::Result<PluginResult>> {
99        Box::pin(async move {
100            let action = ctx.params.first().map(String::as_str).unwrap_or("");
101            let rest: Vec<&str> = ctx.params.iter().skip(1).map(String::as_str).collect();
102
103            match action {
104                "add" => self.handle_add(&ctx.sender, &rest).await,
105                "list" => self.handle_list(&ctx).await,
106                "remove" => self.handle_remove(&rest).await,
107                "pop" => self.handle_pop(&ctx.sender).await,
108                _ => Ok(PluginResult::Reply(format!(
109                    "queue: unknown action '{action}'. use add, list, remove, or pop"
110                ))),
111            }
112        })
113    }
114}
115
116impl QueuePlugin {
117    async fn handle_add(&self, sender: &str, rest: &[&str]) -> anyhow::Result<PluginResult> {
118        if rest.is_empty() {
119            return Ok(PluginResult::Reply(
120                "queue add: missing task description".to_owned(),
121            ));
122        }
123
124        let description = rest.join(" ");
125        let item = QueueItem {
126            id: Uuid::new_v4().to_string(),
127            description: description.clone(),
128            added_by: sender.to_owned(),
129            added_at: Utc::now(),
130            status: "queued".to_owned(),
131        };
132
133        {
134            let mut queue = self.queue.lock().await;
135            queue.push(item.clone());
136        }
137
138        append_item(&self.queue_path, &item)?;
139        let count = self.queue.lock().await.len();
140
141        Ok(PluginResult::Broadcast(format!(
142            "queue: {sender} added \"{description}\" (#{count} in backlog)"
143        )))
144    }
145
146    async fn handle_list(&self, _ctx: &CommandContext) -> anyhow::Result<PluginResult> {
147        let queue = self.queue.lock().await;
148
149        if queue.is_empty() {
150            return Ok(PluginResult::Reply("queue: backlog is empty".to_owned()));
151        }
152
153        let mut lines = Vec::with_capacity(queue.len() + 1);
154        lines.push(format!("queue: {} item(s) in backlog", queue.len()));
155        for (i, item) in queue.iter().enumerate() {
156            lines.push(format!(
157                "  {}. {} (added by {} at {})",
158                i + 1,
159                item.description,
160                item.added_by,
161                item.added_at.format("%Y-%m-%d %H:%M UTC")
162            ));
163        }
164
165        Ok(PluginResult::Reply(lines.join("\n")))
166    }
167
168    async fn handle_remove(&self, rest: &[&str]) -> anyhow::Result<PluginResult> {
169        let index_str = rest.first().copied().unwrap_or("");
170        let index: usize = match index_str.parse::<usize>() {
171            Ok(n) if n >= 1 => n,
172            _ => {
173                return Ok(PluginResult::Reply(
174                    "queue remove: provide a valid 1-based index".to_owned(),
175                ));
176            }
177        };
178
179        let removed = {
180            let mut queue = self.queue.lock().await;
181            if index > queue.len() {
182                return Ok(PluginResult::Reply(format!(
183                    "queue remove: index {index} out of range (queue has {} item(s))",
184                    queue.len()
185                )));
186            }
187            queue.remove(index - 1)
188        };
189
190        self.rewrite_queue().await?;
191
192        Ok(PluginResult::Broadcast(format!(
193            "queue: removed \"{}\" (was #{index})",
194            removed.description
195        )))
196    }
197
198    async fn handle_pop(&self, sender: &str) -> anyhow::Result<PluginResult> {
199        let popped = {
200            let mut queue = self.queue.lock().await;
201            if queue.is_empty() {
202                return Ok(PluginResult::Reply(
203                    "queue pop: backlog is empty, nothing to pop".to_owned(),
204                ));
205            }
206            queue.remove(0)
207        };
208
209        self.rewrite_queue().await?;
210
211        Ok(PluginResult::Broadcast(format!(
212            "{sender} popped from queue: \"{}\"",
213            popped.description
214        )))
215    }
216
217    /// Rewrite the entire queue file from the in-memory state.
218    async fn rewrite_queue(&self) -> anyhow::Result<()> {
219        let queue = self.queue.lock().await;
220        rewrite_queue_file(&self.queue_path, &queue)
221    }
222}
223
224// ── Persistence helpers ─────────────────────────────────────────────────────
225
226/// Load queue items from an NDJSON file. Returns an empty vec if the file
227/// does not exist.
228fn load_queue(path: &Path) -> anyhow::Result<Vec<QueueItem>> {
229    if !path.exists() {
230        return Ok(Vec::new());
231    }
232
233    let file = std::fs::File::open(path)?;
234    let reader = std::io::BufReader::new(file);
235    let mut items = Vec::new();
236
237    for line in reader.lines() {
238        let line = line?;
239        let trimmed = line.trim();
240        if trimmed.is_empty() {
241            continue;
242        }
243        match serde_json::from_str::<QueueItem>(trimmed) {
244            Ok(item) => items.push(item),
245            Err(e) => {
246                eprintln!("queue: skipping malformed line in {}: {e}", path.display());
247            }
248        }
249    }
250
251    Ok(items)
252}
253
254/// Append a single item to the NDJSON queue file.
255fn append_item(path: &Path, item: &QueueItem) -> anyhow::Result<()> {
256    let mut file = std::fs::OpenOptions::new()
257        .create(true)
258        .append(true)
259        .open(path)?;
260    let line = serde_json::to_string(item)?;
261    writeln!(file, "{line}")?;
262    Ok(())
263}
264
265/// Rewrite the entire queue file from a slice of items.
266fn rewrite_queue_file(path: &Path, items: &[QueueItem]) -> anyhow::Result<()> {
267    let mut file = std::fs::File::create(path)?;
268    for item in items {
269        let line = serde_json::to_string(item)?;
270        writeln!(file, "{line}")?;
271    }
272    Ok(())
273}
274
275// ── Tests ───────────────────────────────────────────────────────────────────
276
#[cfg(test)]
mod tests {
    use super::*;
    use crate::plugin::{ChatWriter, HistoryReader, RoomMetadata, UserInfo};
    use std::collections::HashMap;
    use std::sync::atomic::AtomicU64;
    use tempfile::TempDir;
    use tokio::sync::broadcast;

    /// Shape of the client registry that `ChatWriter::new` is given.
    type ClientMap = Arc<Mutex<HashMap<u64, (String, broadcast::Sender<String>)>>>;

    /// File name of the queue inside each test's temp directory.
    const QUEUE_FILE: &str = "test-room.queue";

    // ── fixtures ────────────────────────────────────────────────────────────

    /// Helper: create a QueuePlugin with a temp directory.
    fn make_test_plugin(dir: &TempDir) -> QueuePlugin {
        let queue_path = dir.path().join(QUEUE_FILE);
        QueuePlugin::new(queue_path).unwrap()
    }

    /// Helper: create a CommandContext wired to a test plugin.
    fn make_test_ctx(
        command: &str,
        params: Vec<&str>,
        sender: &str,
        chat_path: &Path,
        clients: &ClientMap,
        seq: &Arc<AtomicU64>,
    ) -> CommandContext {
        let chat_arc = Arc::new(chat_path.to_path_buf());
        let room_id = Arc::new("test-room".to_owned());

        CommandContext {
            command: command.to_owned(),
            params: params.into_iter().map(|s| s.to_owned()).collect(),
            sender: sender.to_owned(),
            room_id: "test-room".to_owned(),
            message_id: Uuid::new_v4().to_string(),
            timestamp: Utc::now(),
            history: HistoryReader::new(chat_path, sender),
            writer: ChatWriter::new(clients, &chat_arc, &room_id, seq, "queue"),
            metadata: RoomMetadata {
                online_users: vec![UserInfo {
                    username: sender.to_owned(),
                    status: String::new(),
                }],
                host: None,
                message_count: 0,
            },
            available_commands: vec![],
        }
    }

    /// Helper: set up broadcast channel and client map for tests.
    fn make_test_clients() -> (ClientMap, broadcast::Receiver<String>, Arc<AtomicU64>) {
        let (tx, rx) = broadcast::channel::<String>(64);
        let mut map = HashMap::new();
        map.insert(1u64, ("alice".to_owned(), tx));
        let clients = Arc::new(Mutex::new(map));
        let seq = Arc::new(AtomicU64::new(0));
        (clients, rx, seq)
    }

    /// Helper: build a queued item with the given identity fields.
    fn sample_item(id: &str, description: &str, added_by: &str) -> QueueItem {
        QueueItem {
            id: id.to_owned(),
            description: description.to_owned(),
            added_by: added_by.to_owned(),
            added_at: Utc::now(),
            status: "queued".to_owned(),
        }
    }

    /// Everything a handler test needs: a plugin on a temp queue file plus
    /// the chat file and client plumbing required to build a context.
    struct Harness {
        /// Owns the temp directory; dropping it deletes the queue file.
        dir: TempDir,
        plugin: QueuePlugin,
        chat: tempfile::NamedTempFile,
        clients: ClientMap,
        /// Held only so the broadcast channel keeps a live receiver.
        _rx: broadcast::Receiver<String>,
        seq: Arc<AtomicU64>,
    }

    impl Harness {
        fn new() -> Self {
            let dir = TempDir::new().unwrap();
            let plugin = make_test_plugin(&dir);
            let chat = tempfile::NamedTempFile::new().unwrap();
            let (clients, _rx, seq) = make_test_clients();
            Self {
                dir,
                plugin,
                chat,
                clients,
                _rx,
                seq,
            }
        }

        /// Path of the queue file the plugin persists to.
        fn queue_file(&self) -> PathBuf {
            self.dir.path().join(QUEUE_FILE)
        }

        /// Run `/queue <params…>` as `sender` and return the plugin's result.
        async fn run(&self, sender: &str, params: &[&str]) -> PluginResult {
            let ctx = make_test_ctx(
                "queue",
                params.to_vec(),
                sender,
                self.chat.path(),
                &self.clients,
                &self.seq,
            );
            self.plugin.handle(ctx).await.unwrap()
        }

        /// Run `/queue add` with `description` split into words, as a client would.
        async fn add(&self, sender: &str, description: &str) -> PluginResult {
            let mut params = vec!["add"];
            params.extend(description.split_whitespace());
            self.run(sender, &params).await
        }
    }

    /// Unwrap a `Reply`, failing the test on any other variant.
    fn expect_reply(result: PluginResult) -> String {
        match result {
            PluginResult::Reply(msg) => msg,
            _ => panic!("expected Reply"),
        }
    }

    /// Unwrap a `Broadcast`, failing the test on any other variant.
    fn expect_broadcast(result: PluginResult) -> String {
        match result {
            PluginResult::Broadcast(msg) => msg,
            _ => panic!("expected Broadcast"),
        }
    }

    // ── load/save persistence tests ─────────────────────────────────────────

    #[test]
    fn load_queue_nonexistent_file_returns_empty() {
        // A path inside a fresh temp dir is guaranteed not to exist.
        let dir = TempDir::new().unwrap();
        let items = load_queue(&dir.path().join("missing.queue")).unwrap();
        assert!(items.is_empty());
    }

    #[test]
    fn append_and_load_round_trips() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.queue");

        let item = sample_item("id-1", "fix the bug", "alice");

        append_item(&path, &item).unwrap();
        let loaded = load_queue(&path).unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].id, "id-1");
        assert_eq!(loaded[0].description, "fix the bug");
        assert_eq!(loaded[0].added_by, "alice");
        assert_eq!(loaded[0].status, "queued");
    }

    #[test]
    fn rewrite_replaces_file_contents() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.queue");

        // Write 3 items
        let items: Vec<QueueItem> = (0..3)
            .map(|i| sample_item(&format!("id-{i}"), &format!("task {i}"), "bob"))
            .collect();

        for item in &items {
            append_item(&path, item).unwrap();
        }
        assert_eq!(load_queue(&path).unwrap().len(), 3);

        // Rewrite with only 1 item
        rewrite_queue_file(&path, &items[1..2]).unwrap();
        let reloaded = load_queue(&path).unwrap();
        assert_eq!(reloaded.len(), 1);
        assert_eq!(reloaded[0].id, "id-1");
    }

    #[test]
    fn load_skips_malformed_lines() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.queue");

        let good = sample_item("good", "valid", "alice");

        let mut file = std::fs::File::create(&path).unwrap();
        writeln!(file, "{}", serde_json::to_string(&good).unwrap()).unwrap();
        writeln!(file, "not valid json").unwrap();
        writeln!(file, "{}", serde_json::to_string(&good).unwrap()).unwrap();
        writeln!(file).unwrap(); // blank line

        let loaded = load_queue(&path).unwrap();
        assert_eq!(loaded.len(), 2);
    }

    #[test]
    fn queue_path_from_chat_replaces_extension() {
        let chat = PathBuf::from("/data/room-dev.chat");
        let queue = QueuePlugin::queue_path_from_chat(&chat);
        assert_eq!(queue, PathBuf::from("/data/room-dev.queue"));
    }

    // ── plugin construction tests ───────────────────────────────────────────

    #[tokio::test]
    async fn new_plugin_loads_existing_queue() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.queue");

        append_item(&path, &sample_item("pre-existing", "already there", "ba")).unwrap();

        let plugin = QueuePlugin::new(path).unwrap();

        let queue = plugin.queue.lock().await;
        assert_eq!(queue.len(), 1);
        assert_eq!(queue[0].description, "already there");
    }

    // ── handle tests ────────────────────────────────────────────────────────

    #[tokio::test]
    async fn add_appends_to_queue_and_persists() {
        let h = Harness::new();

        let msg = expect_broadcast(h.run("alice", &["add", "fix", "the", "bug"]).await);
        assert!(msg.contains("fix the bug"), "broadcast should mention task");
        assert!(msg.contains("alice"), "broadcast should mention sender");
        assert!(msg.contains("#1"), "broadcast should show queue position");

        // In-memory
        {
            let queue = h.plugin.queue.lock().await;
            assert_eq!(queue.len(), 1);
            assert_eq!(queue[0].description, "fix the bug");
            assert_eq!(queue[0].added_by, "alice");
        }

        // On disk
        let persisted = load_queue(&h.queue_file()).unwrap();
        assert_eq!(persisted.len(), 1);
        assert_eq!(persisted[0].description, "fix the bug");
    }

    #[tokio::test]
    async fn add_without_description_returns_error() {
        let h = Harness::new();

        let msg = expect_reply(h.run("alice", &["add"]).await);
        assert!(msg.contains("missing task description"));
    }

    #[tokio::test]
    async fn list_empty_queue() {
        let h = Harness::new();

        let msg = expect_reply(h.run("alice", &["list"]).await);
        assert!(msg.contains("empty"));
    }

    #[tokio::test]
    async fn list_shows_indexed_items() {
        let h = Harness::new();

        // Add two items
        for desc in ["first task", "second task"] {
            h.add("alice", desc).await;
        }

        let msg = expect_reply(h.run("alice", &["list"]).await);
        assert!(msg.contains("2 item(s)"));
        assert!(msg.contains("1. first task"));
        assert!(msg.contains("2. second task"));
    }

    #[tokio::test]
    async fn remove_by_index() {
        let h = Harness::new();

        // Add two items
        for desc in ["first", "second"] {
            h.add("alice", desc).await;
        }

        // Remove item 1
        let msg = expect_broadcast(h.run("alice", &["remove", "1"]).await);
        assert!(
            msg.contains("first"),
            "broadcast should mention removed item"
        );
        assert!(msg.contains("#1"), "broadcast should mention index");

        // Verify only "second" remains
        {
            let queue = h.plugin.queue.lock().await;
            assert_eq!(queue.len(), 1);
            assert_eq!(queue[0].description, "second");
        }

        // Verify persistence
        let persisted = load_queue(&h.queue_file()).unwrap();
        assert_eq!(persisted.len(), 1);
        assert_eq!(persisted[0].description, "second");
    }

    #[tokio::test]
    async fn remove_out_of_range() {
        let h = Harness::new();

        let msg = expect_reply(h.run("alice", &["remove", "5"]).await);
        assert!(msg.contains("out of range"));
    }

    #[tokio::test]
    async fn remove_invalid_index() {
        let h = Harness::new();

        let msg = expect_reply(h.run("alice", &["remove", "abc"]).await);
        assert!(msg.contains("valid 1-based index"));
    }

    #[tokio::test]
    async fn remove_zero_index() {
        let h = Harness::new();

        let msg = expect_reply(h.run("alice", &["remove", "0"]).await);
        assert!(msg.contains("valid 1-based index"));
    }

    #[tokio::test]
    async fn pop_removes_first_item() {
        let h = Harness::new();

        // Add two items
        for desc in ["urgent fix", "nice to have"] {
            h.add("ba", desc).await;
        }

        // Pop as alice
        let msg = expect_broadcast(h.run("alice", &["pop"]).await);
        assert!(msg.contains("alice"), "broadcast should mention popper");
        assert!(msg.contains("urgent fix"), "broadcast should mention task");

        // Verify queue has only 1 item left
        {
            let queue = h.plugin.queue.lock().await;
            assert_eq!(queue.len(), 1);
            assert_eq!(queue[0].description, "nice to have");
        }

        // Verify persistence: the remaining item, not the popped one, is on disk
        let persisted = load_queue(&h.queue_file()).unwrap();
        assert_eq!(persisted.len(), 1);
        assert_eq!(persisted[0].description, "nice to have");
    }

    #[tokio::test]
    async fn pop_empty_queue_returns_error() {
        let h = Harness::new();

        let msg = expect_reply(h.run("alice", &["pop"]).await);
        assert!(msg.contains("empty"));
    }

    #[tokio::test]
    async fn pop_fifo_order() {
        let h = Harness::new();

        // Add two items
        h.add("ba", "first").await;
        h.add("ba", "second").await;

        // Pop should return first item
        let msg = expect_broadcast(h.run("alice", &["pop"]).await);
        assert!(msg.contains("first"));

        // Pop again should return second item
        let msg = expect_broadcast(h.run("bob", &["pop"]).await);
        assert!(msg.contains("second"));

        // Queue is now empty
        assert!(h.plugin.queue.lock().await.is_empty());
    }

    #[tokio::test]
    async fn unknown_action_returns_error() {
        let h = Harness::new();

        let msg = expect_reply(h.run("alice", &["foobar"]).await);
        assert!(msg.contains("unknown action"));
    }

    #[tokio::test]
    async fn queue_survives_reload() {
        // First plugin instance — add items
        let h = Harness::new();
        for desc in ["task a", "task b", "task c"] {
            h.add("alice", desc).await;
        }

        // Second plugin instance on the same file — simulates broker restart
        let plugin2 = QueuePlugin::new(h.queue_file()).unwrap();
        let queue = plugin2.queue.lock().await;
        assert_eq!(queue.len(), 3);
        assert_eq!(queue[0].description, "task a");
        assert_eq!(queue[1].description, "task b");
        assert_eq!(queue[2].description, "task c");
    }

    // ── Plugin trait tests ──────────────────────────────────────────────────

    #[test]
    fn plugin_name_is_queue() {
        let dir = TempDir::new().unwrap();
        let plugin = make_test_plugin(&dir);
        assert_eq!(plugin.name(), "queue");
    }

    #[test]
    fn plugin_registers_queue_command() {
        let dir = TempDir::new().unwrap();
        let plugin = make_test_plugin(&dir);
        let cmds = plugin.commands();
        assert_eq!(cmds.len(), 1);
        assert_eq!(cmds[0].name, "queue");
        assert_eq!(cmds[0].params.len(), 2);
        assert!(cmds[0].params[0].required);
        assert!(!cmds[0].params[1].required);
    }
}