Skip to main content

room_daemon/plugin/
queue.rs

1use std::{
2    io::{BufRead, Write},
3    path::{Path, PathBuf},
4    sync::Arc,
5};
6
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use tokio::sync::Mutex;
10use uuid::Uuid;
11
12use super::{BoxFuture, CommandContext, CommandInfo, ParamSchema, ParamType, Plugin, PluginResult};
13
14/// A single item in the task queue backlog.
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct QueueItem {
17    /// Unique identifier for this queue entry.
18    pub id: String,
19    /// Human-readable task description.
20    pub description: String,
21    /// Username of the agent who added this item.
22    pub added_by: String,
23    /// Timestamp when the item was added.
24    pub added_at: DateTime<Utc>,
25    /// Current status — always `"queued"` while in the backlog.
26    pub status: String,
27}
28
29/// Plugin that manages a persistent task backlog.
30///
31/// Agents can add tasks to the queue and self-assign from it using `/queue pop`.
32///
33/// Queue state is persisted to an NDJSON file alongside the room's `.chat` file.
34pub struct QueuePlugin {
35    /// In-memory queue items, protected by a mutex for concurrent access.
36    queue: Arc<Mutex<Vec<QueueItem>>>,
37    /// Path to the NDJSON persistence file (`<room-data-dir>/<room-id>.queue`).
38    queue_path: PathBuf,
39}
40
41impl QueuePlugin {
42    /// Create a new `QueuePlugin`, loading any existing queue from disk.
43    ///
44    /// # Arguments
45    /// * `queue_path` — path to the `.queue` NDJSON file
46    pub(crate) fn new(queue_path: PathBuf) -> anyhow::Result<Self> {
47        let items = load_queue(&queue_path)?;
48        Ok(Self {
49            queue: Arc::new(Mutex::new(items)),
50            queue_path,
51        })
52    }
53
54    /// Derive the `.queue` file path from a `.chat` file path.
55    pub fn queue_path_from_chat(chat_path: &Path) -> PathBuf {
56        chat_path.with_extension("queue")
57    }
58
59    /// Returns the command info for the TUI command palette without needing
60    /// an instantiated plugin. Used by `all_known_commands()`.
61    pub fn default_commands() -> Vec<CommandInfo> {
62        vec![CommandInfo {
63            name: "queue".to_owned(),
64            description: "Manage the task backlog".to_owned(),
65            usage: "/queue <add|list|remove|pop> [args]".to_owned(),
66            params: vec![
67                ParamSchema {
68                    name: "action".to_owned(),
69                    param_type: ParamType::Choice(vec![
70                        "add".to_owned(),
71                        "list".to_owned(),
72                        "remove".to_owned(),
73                        "pop".to_owned(),
74                    ]),
75                    required: true,
76                    description: "Action to perform".to_owned(),
77                },
78                ParamSchema {
79                    name: "args".to_owned(),
80                    param_type: ParamType::Text,
81                    required: false,
82                    description: "Task description (add) or index (remove)".to_owned(),
83                },
84            ],
85        }]
86    }
87}
88
89impl Plugin for QueuePlugin {
90    fn name(&self) -> &str {
91        "queue"
92    }
93
94    fn version(&self) -> &str {
95        env!("CARGO_PKG_VERSION")
96    }
97
98    fn commands(&self) -> Vec<CommandInfo> {
99        Self::default_commands()
100    }
101
102    fn handle(&self, ctx: CommandContext) -> BoxFuture<'_, anyhow::Result<PluginResult>> {
103        Box::pin(async move {
104            let action = ctx.params.first().map(String::as_str).unwrap_or("");
105            let rest: Vec<&str> = ctx.params.iter().skip(1).map(String::as_str).collect();
106
107            match action {
108                "add" => self.handle_add(&ctx.sender, &rest).await,
109                "list" => self.handle_list(&ctx).await,
110                "remove" => self.handle_remove(&rest).await,
111                "pop" => self.handle_pop(&ctx.sender).await,
112                _ => Ok(PluginResult::Reply(
113                    format!("queue: unknown action '{action}'. use add, list, remove, or pop"),
114                    None,
115                )),
116            }
117        })
118    }
119}
120
121impl QueuePlugin {
122    async fn handle_add(&self, sender: &str, rest: &[&str]) -> anyhow::Result<PluginResult> {
123        if rest.is_empty() {
124            return Ok(PluginResult::Reply(
125                "queue add: missing task description".to_owned(),
126                None,
127            ));
128        }
129
130        let description = rest.join(" ");
131        let item = QueueItem {
132            id: Uuid::new_v4().to_string(),
133            description: description.clone(),
134            added_by: sender.to_owned(),
135            added_at: Utc::now(),
136            status: "queued".to_owned(),
137        };
138
139        {
140            let mut queue = self.queue.lock().await;
141            queue.push(item.clone());
142        }
143
144        append_item(&self.queue_path, &item)?;
145        let count = self.queue.lock().await.len();
146
147        Ok(PluginResult::Broadcast(
148            format!("queue: {sender} added \"{description}\" (#{count} in backlog)"),
149            None,
150        ))
151    }
152
153    async fn handle_list(&self, _ctx: &CommandContext) -> anyhow::Result<PluginResult> {
154        let queue = self.queue.lock().await;
155
156        if queue.is_empty() {
157            return Ok(PluginResult::Reply(
158                "queue: backlog is empty".to_owned(),
159                None,
160            ));
161        }
162
163        let mut lines = Vec::with_capacity(queue.len() + 1);
164        lines.push(format!("queue: {} item(s) in backlog", queue.len()));
165        for (i, item) in queue.iter().enumerate() {
166            lines.push(format!(
167                "  {}. {} (added by {} at {})",
168                i + 1,
169                item.description,
170                item.added_by,
171                item.added_at.format("%Y-%m-%d %H:%M UTC")
172            ));
173        }
174
175        Ok(PluginResult::Reply(lines.join("\n"), None))
176    }
177
178    async fn handle_remove(&self, rest: &[&str]) -> anyhow::Result<PluginResult> {
179        let index_str = rest.first().copied().unwrap_or("");
180        let index: usize = match index_str.parse::<usize>() {
181            Ok(n) if n >= 1 => n,
182            _ => {
183                return Ok(PluginResult::Reply(
184                    "queue remove: provide a valid 1-based index".to_owned(),
185                    None,
186                ));
187            }
188        };
189
190        let removed = {
191            let mut queue = self.queue.lock().await;
192            if index > queue.len() {
193                return Ok(PluginResult::Reply(
194                    format!(
195                        "queue remove: index {index} out of range (queue has {} item(s))",
196                        queue.len()
197                    ),
198                    None,
199                ));
200            }
201            queue.remove(index - 1)
202        };
203
204        self.rewrite_queue().await?;
205
206        Ok(PluginResult::Broadcast(
207            format!("queue: removed \"{}\" (was #{index})", removed.description),
208            None,
209        ))
210    }
211
212    async fn handle_pop(&self, sender: &str) -> anyhow::Result<PluginResult> {
213        let popped = {
214            let mut queue = self.queue.lock().await;
215            if queue.is_empty() {
216                return Ok(PluginResult::Reply(
217                    "queue pop: backlog is empty, nothing to pop".to_owned(),
218                    None,
219                ));
220            }
221            queue.remove(0)
222        };
223
224        self.rewrite_queue().await?;
225
226        Ok(PluginResult::Broadcast(
227            format!("{sender} popped from queue: \"{}\"", popped.description),
228            None,
229        ))
230    }
231
232    /// Rewrite the entire queue file from the in-memory state.
233    async fn rewrite_queue(&self) -> anyhow::Result<()> {
234        let queue = self.queue.lock().await;
235        rewrite_queue_file(&self.queue_path, &queue)
236    }
237}
238
239// ── Persistence helpers ─────────────────────────────────────────────────────
240
241/// Load queue items from an NDJSON file. Returns an empty vec if the file
242/// does not exist.
243fn load_queue(path: &Path) -> anyhow::Result<Vec<QueueItem>> {
244    if !path.exists() {
245        return Ok(Vec::new());
246    }
247
248    let file = std::fs::File::open(path)?;
249    let reader = std::io::BufReader::new(file);
250    let mut items = Vec::new();
251
252    for line in reader.lines() {
253        let line = line?;
254        let trimmed = line.trim();
255        if trimmed.is_empty() {
256            continue;
257        }
258        match serde_json::from_str::<QueueItem>(trimmed) {
259            Ok(item) => items.push(item),
260            Err(e) => {
261                eprintln!("queue: skipping malformed line in {}: {e}", path.display());
262            }
263        }
264    }
265
266    Ok(items)
267}
268
269/// Append a single item to the NDJSON queue file.
270fn append_item(path: &Path, item: &QueueItem) -> anyhow::Result<()> {
271    let mut file = std::fs::OpenOptions::new()
272        .create(true)
273        .append(true)
274        .open(path)?;
275    let line = serde_json::to_string(item)?;
276    writeln!(file, "{line}")?;
277    Ok(())
278}
279
280/// Rewrite the entire queue file from a slice of items.
281fn rewrite_queue_file(path: &Path, items: &[QueueItem]) -> anyhow::Result<()> {
282    let mut file = std::fs::File::create(path)?;
283    for item in items {
284        let line = serde_json::to_string(item)?;
285        writeln!(file, "{line}")?;
286    }
287    Ok(())
288}
289
290// ── Tests ───────────────────────────────────────────────────────────────────
291
292#[cfg(test)]
293mod tests {
294    use super::*;
295    use crate::plugin::{ChatWriter, HistoryReader, RoomMetadata, UserInfo};
296    use std::collections::HashMap;
297    use std::sync::atomic::AtomicU64;
298    use tempfile::TempDir;
299    use tokio::sync::broadcast;
300
301    /// Helper: create a QueuePlugin with a temp directory.
302    fn make_test_plugin(dir: &TempDir) -> QueuePlugin {
303        let queue_path = dir.path().join("test-room.queue");
304        QueuePlugin::new(queue_path).unwrap()
305    }
306
307    /// Helper: create a CommandContext wired to a test plugin.
308    fn make_test_ctx(
309        command: &str,
310        params: Vec<&str>,
311        sender: &str,
312        chat_path: &Path,
313        clients: &Arc<Mutex<HashMap<u64, (String, broadcast::Sender<String>)>>>,
314        seq: &Arc<AtomicU64>,
315    ) -> CommandContext {
316        let chat_arc = Arc::new(chat_path.to_path_buf());
317        let room_id = Arc::new("test-room".to_owned());
318
319        CommandContext {
320            command: command.to_owned(),
321            params: params.into_iter().map(|s| s.to_owned()).collect(),
322            sender: sender.to_owned(),
323            room_id: "test-room".to_owned(),
324            message_id: Uuid::new_v4().to_string(),
325            timestamp: Utc::now(),
326            history: Box::new(HistoryReader::new(chat_path, sender)),
327            writer: Box::new(ChatWriter::new(clients, &chat_arc, &room_id, seq, "queue")),
328            metadata: RoomMetadata {
329                online_users: vec![UserInfo {
330                    username: sender.to_owned(),
331                    status: String::new(),
332                }],
333                host: None,
334                message_count: 0,
335            },
336            available_commands: vec![],
337            team_access: None,
338        }
339    }
340
341    /// Helper: set up broadcast channel and client map for tests.
342    fn make_test_clients() -> (
343        Arc<Mutex<HashMap<u64, (String, broadcast::Sender<String>)>>>,
344        broadcast::Receiver<String>,
345        Arc<AtomicU64>,
346    ) {
347        let (tx, rx) = broadcast::channel::<String>(64);
348        let mut map = HashMap::new();
349        map.insert(1u64, ("alice".to_owned(), tx));
350        let clients = Arc::new(Mutex::new(map));
351        let seq = Arc::new(AtomicU64::new(0));
352        (clients, rx, seq)
353    }
354
355    // ── load/save persistence tests ─────────────────────────────────────────
356
357    #[test]
358    fn load_queue_nonexistent_file_returns_empty() {
359        let items = load_queue(Path::new("/nonexistent/path.queue")).unwrap();
360        assert!(items.is_empty());
361    }
362
363    #[test]
364    fn append_and_load_round_trips() {
365        let dir = TempDir::new().unwrap();
366        let path = dir.path().join("test.queue");
367
368        let item = QueueItem {
369            id: "id-1".to_owned(),
370            description: "fix the bug".to_owned(),
371            added_by: "alice".to_owned(),
372            added_at: Utc::now(),
373            status: "queued".to_owned(),
374        };
375
376        append_item(&path, &item).unwrap();
377        let loaded = load_queue(&path).unwrap();
378        assert_eq!(loaded.len(), 1);
379        assert_eq!(loaded[0].id, "id-1");
380        assert_eq!(loaded[0].description, "fix the bug");
381        assert_eq!(loaded[0].added_by, "alice");
382        assert_eq!(loaded[0].status, "queued");
383    }
384
385    #[test]
386    fn rewrite_replaces_file_contents() {
387        let dir = TempDir::new().unwrap();
388        let path = dir.path().join("test.queue");
389
390        // Write 3 items
391        let items: Vec<QueueItem> = (0..3)
392            .map(|i| QueueItem {
393                id: format!("id-{i}"),
394                description: format!("task {i}"),
395                added_by: "bob".to_owned(),
396                added_at: Utc::now(),
397                status: "queued".to_owned(),
398            })
399            .collect();
400
401        for item in &items {
402            append_item(&path, item).unwrap();
403        }
404        assert_eq!(load_queue(&path).unwrap().len(), 3);
405
406        // Rewrite with only 1 item
407        rewrite_queue_file(&path, &items[1..2]).unwrap();
408        let reloaded = load_queue(&path).unwrap();
409        assert_eq!(reloaded.len(), 1);
410        assert_eq!(reloaded[0].id, "id-1");
411    }
412
413    #[test]
414    fn load_skips_malformed_lines() {
415        let dir = TempDir::new().unwrap();
416        let path = dir.path().join("test.queue");
417
418        let good = QueueItem {
419            id: "good".to_owned(),
420            description: "valid".to_owned(),
421            added_by: "alice".to_owned(),
422            added_at: Utc::now(),
423            status: "queued".to_owned(),
424        };
425
426        let mut file = std::fs::File::create(&path).unwrap();
427        writeln!(file, "{}", serde_json::to_string(&good).unwrap()).unwrap();
428        writeln!(file, "not valid json").unwrap();
429        writeln!(file, "{}", serde_json::to_string(&good).unwrap()).unwrap();
430        writeln!(file).unwrap(); // blank line
431
432        let loaded = load_queue(&path).unwrap();
433        assert_eq!(loaded.len(), 2);
434    }
435
436    #[test]
437    fn queue_path_from_chat_replaces_extension() {
438        let chat = PathBuf::from("/data/room-dev.chat");
439        let queue = QueuePlugin::queue_path_from_chat(&chat);
440        assert_eq!(queue, PathBuf::from("/data/room-dev.queue"));
441    }
442
443    // ── plugin construction tests ───────────────────────────────────────────
444
445    #[test]
446    fn new_plugin_loads_existing_queue() {
447        let dir = TempDir::new().unwrap();
448        let path = dir.path().join("test.queue");
449
450        let item = QueueItem {
451            id: "pre-existing".to_owned(),
452            description: "already there".to_owned(),
453            added_by: "ba".to_owned(),
454            added_at: Utc::now(),
455            status: "queued".to_owned(),
456        };
457        append_item(&path, &item).unwrap();
458
459        let plugin = QueuePlugin::new(path).unwrap();
460
461        let rt = tokio::runtime::Runtime::new().unwrap();
462        rt.block_on(async {
463            let queue = plugin.queue.lock().await;
464            assert_eq!(queue.len(), 1);
465            assert_eq!(queue[0].description, "already there");
466        });
467    }
468
469    // ── handle tests ────────────────────────────────────────────────────────
470
471    #[tokio::test]
472    async fn add_appends_to_queue_and_persists() {
473        let dir = TempDir::new().unwrap();
474        let plugin = make_test_plugin(&dir);
475        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
476        let (clients, _rx, seq) = make_test_clients();
477        let ctx = make_test_ctx(
478            "queue",
479            vec!["add", "fix", "the", "bug"],
480            "alice",
481            chat_tmp.path(),
482            &clients,
483            &seq,
484        );
485
486        let result = plugin.handle(ctx).await.unwrap();
487        match &result {
488            PluginResult::Broadcast(msg, None) => {
489                assert!(msg.contains("fix the bug"), "broadcast should mention task");
490                assert!(msg.contains("alice"), "broadcast should mention sender");
491                assert!(msg.contains("#1"), "broadcast should show queue position");
492            }
493            _ => panic!("expected Broadcast"),
494        }
495
496        // In-memory
497        let queue = plugin.queue.lock().await;
498        assert_eq!(queue.len(), 1);
499        assert_eq!(queue[0].description, "fix the bug");
500        assert_eq!(queue[0].added_by, "alice");
501        drop(queue);
502
503        // On disk
504        let persisted = load_queue(&dir.path().join("test-room.queue")).unwrap();
505        assert_eq!(persisted.len(), 1);
506        assert_eq!(persisted[0].description, "fix the bug");
507    }
508
509    #[tokio::test]
510    async fn add_without_description_returns_error() {
511        let dir = TempDir::new().unwrap();
512        let plugin = make_test_plugin(&dir);
513        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
514        let (clients, _rx, seq) = make_test_clients();
515        let ctx = make_test_ctx(
516            "queue",
517            vec!["add"],
518            "alice",
519            chat_tmp.path(),
520            &clients,
521            &seq,
522        );
523
524        let result = plugin.handle(ctx).await.unwrap();
525        match result {
526            PluginResult::Reply(msg, None) => assert!(msg.contains("missing task description")),
527            _ => panic!("expected Reply"),
528        }
529    }
530
531    #[tokio::test]
532    async fn list_empty_queue() {
533        let dir = TempDir::new().unwrap();
534        let plugin = make_test_plugin(&dir);
535        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
536        let (clients, _rx, seq) = make_test_clients();
537        let ctx = make_test_ctx(
538            "queue",
539            vec!["list"],
540            "alice",
541            chat_tmp.path(),
542            &clients,
543            &seq,
544        );
545
546        let result = plugin.handle(ctx).await.unwrap();
547        match result {
548            PluginResult::Reply(msg, None) => assert!(msg.contains("empty")),
549            _ => panic!("expected Reply"),
550        }
551    }
552
553    #[tokio::test]
554    async fn list_shows_indexed_items() {
555        let dir = TempDir::new().unwrap();
556        let plugin = make_test_plugin(&dir);
557        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
558        let (clients, _rx, seq) = make_test_clients();
559
560        // Add two items
561        for desc in ["first task", "second task"] {
562            let words: Vec<&str> = std::iter::once("add")
563                .chain(desc.split_whitespace())
564                .collect();
565            let ctx = make_test_ctx("queue", words, "alice", chat_tmp.path(), &clients, &seq);
566            plugin.handle(ctx).await.unwrap();
567        }
568
569        let ctx = make_test_ctx(
570            "queue",
571            vec!["list"],
572            "alice",
573            chat_tmp.path(),
574            &clients,
575            &seq,
576        );
577        let result = plugin.handle(ctx).await.unwrap();
578        match result {
579            PluginResult::Reply(msg, None) => {
580                assert!(msg.contains("2 item(s)"));
581                assert!(msg.contains("1. first task"));
582                assert!(msg.contains("2. second task"));
583            }
584            _ => panic!("expected Reply"),
585        }
586    }
587
588    #[tokio::test]
589    async fn remove_by_index() {
590        let dir = TempDir::new().unwrap();
591        let plugin = make_test_plugin(&dir);
592        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
593        let (clients, _rx, seq) = make_test_clients();
594
595        // Add two items
596        for desc in ["first", "second"] {
597            let ctx = make_test_ctx(
598                "queue",
599                vec!["add", desc],
600                "alice",
601                chat_tmp.path(),
602                &clients,
603                &seq,
604            );
605            plugin.handle(ctx).await.unwrap();
606        }
607
608        // Remove item 1
609        let ctx = make_test_ctx(
610            "queue",
611            vec!["remove", "1"],
612            "alice",
613            chat_tmp.path(),
614            &clients,
615            &seq,
616        );
617        let result = plugin.handle(ctx).await.unwrap();
618        match &result {
619            PluginResult::Broadcast(msg, None) => {
620                assert!(
621                    msg.contains("first"),
622                    "broadcast should mention removed item"
623                );
624                assert!(msg.contains("#1"), "broadcast should mention index");
625            }
626            _ => panic!("expected Broadcast"),
627        }
628
629        // Verify only "second" remains
630        let queue = plugin.queue.lock().await;
631        assert_eq!(queue.len(), 1);
632        assert_eq!(queue[0].description, "second");
633        drop(queue);
634
635        // Verify persistence
636        let persisted = load_queue(&dir.path().join("test-room.queue")).unwrap();
637        assert_eq!(persisted.len(), 1);
638        assert_eq!(persisted[0].description, "second");
639    }
640
641    #[tokio::test]
642    async fn remove_out_of_range() {
643        let dir = TempDir::new().unwrap();
644        let plugin = make_test_plugin(&dir);
645        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
646        let (clients, _rx, seq) = make_test_clients();
647
648        let ctx = make_test_ctx(
649            "queue",
650            vec!["remove", "5"],
651            "alice",
652            chat_tmp.path(),
653            &clients,
654            &seq,
655        );
656        let result = plugin.handle(ctx).await.unwrap();
657        match result {
658            PluginResult::Reply(msg, None) => assert!(msg.contains("out of range")),
659            _ => panic!("expected Reply"),
660        }
661    }
662
663    #[tokio::test]
664    async fn remove_invalid_index() {
665        let dir = TempDir::new().unwrap();
666        let plugin = make_test_plugin(&dir);
667        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
668        let (clients, _rx, seq) = make_test_clients();
669
670        let ctx = make_test_ctx(
671            "queue",
672            vec!["remove", "abc"],
673            "alice",
674            chat_tmp.path(),
675            &clients,
676            &seq,
677        );
678        let result = plugin.handle(ctx).await.unwrap();
679        match result {
680            PluginResult::Reply(msg, None) => assert!(msg.contains("valid 1-based index")),
681            _ => panic!("expected Reply"),
682        }
683    }
684
685    #[tokio::test]
686    async fn remove_zero_index() {
687        let dir = TempDir::new().unwrap();
688        let plugin = make_test_plugin(&dir);
689        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
690        let (clients, _rx, seq) = make_test_clients();
691
692        let ctx = make_test_ctx(
693            "queue",
694            vec!["remove", "0"],
695            "alice",
696            chat_tmp.path(),
697            &clients,
698            &seq,
699        );
700        let result = plugin.handle(ctx).await.unwrap();
701        match result {
702            PluginResult::Reply(msg, None) => assert!(msg.contains("valid 1-based index")),
703            _ => panic!("expected Reply"),
704        }
705    }
706
707    #[tokio::test]
708    async fn pop_removes_first_item() {
709        let dir = TempDir::new().unwrap();
710        let plugin = make_test_plugin(&dir);
711
712        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
713        let (clients, _rx, seq) = make_test_clients();
714
715        // Add two items
716        for desc in ["urgent fix", "nice to have"] {
717            let words: Vec<&str> = std::iter::once("add")
718                .chain(desc.split_whitespace())
719                .collect();
720            let ctx = make_test_ctx("queue", words, "ba", chat_tmp.path(), &clients, &seq);
721            plugin.handle(ctx).await.unwrap();
722        }
723
724        // Pop as alice
725        let ctx = make_test_ctx(
726            "queue",
727            vec!["pop"],
728            "alice",
729            chat_tmp.path(),
730            &clients,
731            &seq,
732        );
733        let result = plugin.handle(ctx).await.unwrap();
734        match &result {
735            PluginResult::Broadcast(msg, None) => {
736                assert!(msg.contains("alice"), "broadcast should mention popper");
737                assert!(msg.contains("urgent fix"), "broadcast should mention task");
738            }
739            _ => panic!("expected Broadcast"),
740        }
741
742        // Verify queue has only 1 item left
743        let queue = plugin.queue.lock().await;
744        assert_eq!(queue.len(), 1);
745        assert_eq!(queue[0].description, "nice to have");
746        drop(queue);
747
748        // Verify persistence
749        let persisted = load_queue(&dir.path().join("test-room.queue")).unwrap();
750        assert_eq!(persisted.len(), 1);
751    }
752
753    #[tokio::test]
754    async fn pop_empty_queue_returns_error() {
755        let dir = TempDir::new().unwrap();
756        let plugin = make_test_plugin(&dir);
757        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
758        let (clients, _rx, seq) = make_test_clients();
759
760        let ctx = make_test_ctx(
761            "queue",
762            vec!["pop"],
763            "alice",
764            chat_tmp.path(),
765            &clients,
766            &seq,
767        );
768        let result = plugin.handle(ctx).await.unwrap();
769        match result {
770            PluginResult::Reply(msg, None) => assert!(msg.contains("empty")),
771            _ => panic!("expected Reply"),
772        }
773    }
774
775    #[tokio::test]
776    async fn pop_fifo_order() {
777        let dir = TempDir::new().unwrap();
778        let plugin = make_test_plugin(&dir);
779        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
780        let (clients, _rx, seq) = make_test_clients();
781
782        // Add two items
783        let ctx = make_test_ctx(
784            "queue",
785            vec!["add", "first"],
786            "ba",
787            chat_tmp.path(),
788            &clients,
789            &seq,
790        );
791        plugin.handle(ctx).await.unwrap();
792        let ctx = make_test_ctx(
793            "queue",
794            vec!["add", "second"],
795            "ba",
796            chat_tmp.path(),
797            &clients,
798            &seq,
799        );
800        plugin.handle(ctx).await.unwrap();
801
802        // Pop should return first item
803        let ctx = make_test_ctx(
804            "queue",
805            vec!["pop"],
806            "alice",
807            chat_tmp.path(),
808            &clients,
809            &seq,
810        );
811        let result = plugin.handle(ctx).await.unwrap();
812        match &result {
813            PluginResult::Broadcast(msg, None) => assert!(msg.contains("first")),
814            _ => panic!("expected Broadcast"),
815        }
816
817        // Pop again should return second item
818        let ctx = make_test_ctx("queue", vec!["pop"], "bob", chat_tmp.path(), &clients, &seq);
819        let result = plugin.handle(ctx).await.unwrap();
820        match &result {
821            PluginResult::Broadcast(msg, None) => assert!(msg.contains("second")),
822            _ => panic!("expected Broadcast"),
823        }
824
825        // Queue is now empty
826        assert!(plugin.queue.lock().await.is_empty());
827    }
828
829    #[tokio::test]
830    async fn unknown_action_returns_error() {
831        let dir = TempDir::new().unwrap();
832        let plugin = make_test_plugin(&dir);
833        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
834        let (clients, _rx, seq) = make_test_clients();
835
836        let ctx = make_test_ctx(
837            "queue",
838            vec!["foobar"],
839            "alice",
840            chat_tmp.path(),
841            &clients,
842            &seq,
843        );
844        let result = plugin.handle(ctx).await.unwrap();
845        match result {
846            PluginResult::Reply(msg, None) => assert!(msg.contains("unknown action")),
847            _ => panic!("expected Reply"),
848        }
849    }
850
851    #[tokio::test]
852    async fn queue_survives_reload() {
853        let dir = TempDir::new().unwrap();
854        let queue_path = dir.path().join("test-room.queue");
855
856        // First plugin instance — add items
857        {
858            let plugin = QueuePlugin::new(queue_path.clone()).unwrap();
859            let chat_tmp = tempfile::NamedTempFile::new().unwrap();
860            let (clients, _rx, seq) = make_test_clients();
861
862            for desc in ["task a", "task b", "task c"] {
863                let words: Vec<&str> = std::iter::once("add")
864                    .chain(desc.split_whitespace())
865                    .collect();
866                let ctx = make_test_ctx("queue", words, "alice", chat_tmp.path(), &clients, &seq);
867                plugin.handle(ctx).await.unwrap();
868            }
869        }
870
871        // Second plugin instance — simulates broker restart
872        let plugin2 = QueuePlugin::new(queue_path).unwrap();
873        let queue = plugin2.queue.lock().await;
874        assert_eq!(queue.len(), 3);
875        assert_eq!(queue[0].description, "task a");
876        assert_eq!(queue[1].description, "task b");
877        assert_eq!(queue[2].description, "task c");
878    }
879
880    // ── Plugin trait tests ──────────────────────────────────────────────────
881
882    #[test]
883    fn plugin_name_is_queue() {
884        let dir = TempDir::new().unwrap();
885        let plugin = make_test_plugin(&dir);
886        assert_eq!(plugin.name(), "queue");
887    }
888
889    #[test]
890    fn plugin_registers_queue_command() {
891        let dir = TempDir::new().unwrap();
892        let plugin = make_test_plugin(&dir);
893        let cmds = plugin.commands();
894        assert_eq!(cmds.len(), 1);
895        assert_eq!(cmds[0].name, "queue");
896        assert_eq!(cmds[0].params.len(), 2);
897        assert!(cmds[0].params[0].required);
898        assert!(!cmds[0].params[1].required);
899    }
900}