Skip to main content

room_daemon/plugin/
queue.rs

1use std::{
2    io::{BufRead, Write},
3    path::{Path, PathBuf},
4    sync::Arc,
5};
6
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use tokio::sync::Mutex;
10use uuid::Uuid;
11
12use super::{BoxFuture, CommandContext, CommandInfo, ParamSchema, ParamType, Plugin, PluginResult};
13
/// A single item in the task queue backlog.
///
/// Each item is serialized as one JSON object per line in the queue's NDJSON
/// persistence file, and deserialized again when the plugin is constructed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueItem {
    /// Unique identifier for this queue entry (a UUID v4 string when created
    /// through `/queue add`).
    pub id: String,
    /// Human-readable task description (the words after `/queue add`, joined
    /// with single spaces).
    pub description: String,
    /// Username of the agent who added this item.
    pub added_by: String,
    /// Timestamp (UTC) when the item was added.
    pub added_at: DateTime<Utc>,
    /// Current status — always `"queued"` while in the backlog.
    pub status: String,
}
28
/// Plugin that manages a persistent task backlog.
///
/// Agents can add tasks to the queue and self-assign from it using `/queue pop`.
/// Supported actions are `add`, `list`, `remove` and `pop`.
///
/// Queue state is persisted to an NDJSON file alongside the room's `.chat` file.
pub struct QueuePlugin {
    /// In-memory queue items, protected by a mutex for concurrent access.
    /// Items are kept in insertion order; index 0 is the next item to pop.
    queue: Arc<Mutex<Vec<QueueItem>>>,
    /// Path to the NDJSON persistence file (`<room-data-dir>/<room-id>.queue`).
    queue_path: PathBuf,
}
40
41impl QueuePlugin {
42    /// Create a new `QueuePlugin`, loading any existing queue from disk.
43    ///
44    /// # Arguments
45    /// * `queue_path` — path to the `.queue` NDJSON file
46    pub(crate) fn new(queue_path: PathBuf) -> anyhow::Result<Self> {
47        let items = load_queue(&queue_path)?;
48        Ok(Self {
49            queue: Arc::new(Mutex::new(items)),
50            queue_path,
51        })
52    }
53
54    /// Derive the `.queue` file path from a `.chat` file path.
55    pub fn queue_path_from_chat(chat_path: &Path) -> PathBuf {
56        chat_path.with_extension("queue")
57    }
58
59    /// Returns the command info for the TUI command palette without needing
60    /// an instantiated plugin. Used by `all_known_commands()`.
61    pub fn default_commands() -> Vec<CommandInfo> {
62        vec![CommandInfo {
63            name: "queue".to_owned(),
64            description: "Manage the task backlog".to_owned(),
65            usage: "/queue <add|list|remove|pop> [args]".to_owned(),
66            params: vec![
67                ParamSchema {
68                    name: "action".to_owned(),
69                    param_type: ParamType::Choice(vec![
70                        "add".to_owned(),
71                        "list".to_owned(),
72                        "remove".to_owned(),
73                        "pop".to_owned(),
74                    ]),
75                    required: true,
76                    description: "Action to perform".to_owned(),
77                },
78                ParamSchema {
79                    name: "args".to_owned(),
80                    param_type: ParamType::Text,
81                    required: false,
82                    description: "Task description (add) or index (remove)".to_owned(),
83                },
84            ],
85            subcommands: vec![
86                CommandInfo {
87                    name: "add".to_owned(),
88                    description: "Add an item to the queue".to_owned(),
89                    usage: "/queue add <description>".to_owned(),
90                    params: vec![ParamSchema {
91                        name: "description".to_owned(),
92                        param_type: ParamType::Text,
93                        required: true,
94                        description: "Item description".to_owned(),
95                    }],
96                    subcommands: vec![],
97                },
98                CommandInfo {
99                    name: "list".to_owned(),
100                    description: "List all items in the queue".to_owned(),
101                    usage: "/queue list".to_owned(),
102                    params: vec![],
103                    subcommands: vec![],
104                },
105                CommandInfo {
106                    name: "remove".to_owned(),
107                    description: "Remove an item by index".to_owned(),
108                    usage: "/queue remove <index>".to_owned(),
109                    params: vec![ParamSchema {
110                        name: "index".to_owned(),
111                        param_type: ParamType::Number {
112                            min: Some(1),
113                            max: None,
114                        },
115                        required: true,
116                        description: "1-based index of the item to remove".to_owned(),
117                    }],
118                    subcommands: vec![],
119                },
120                CommandInfo {
121                    name: "pop".to_owned(),
122                    description: "Remove and return the first item".to_owned(),
123                    usage: "/queue pop".to_owned(),
124                    params: vec![],
125                    subcommands: vec![],
126                },
127            ],
128        }]
129    }
130}
131
132impl Plugin for QueuePlugin {
133    fn name(&self) -> &str {
134        "queue"
135    }
136
137    fn version(&self) -> &str {
138        env!("CARGO_PKG_VERSION")
139    }
140
141    fn commands(&self) -> Vec<CommandInfo> {
142        Self::default_commands()
143    }
144
145    fn handle(&self, ctx: CommandContext) -> BoxFuture<'_, anyhow::Result<PluginResult>> {
146        Box::pin(async move {
147            let action = ctx.params.first().map(String::as_str).unwrap_or("");
148            let rest: Vec<&str> = ctx.params.iter().skip(1).map(String::as_str).collect();
149
150            match action {
151                "add" => self.handle_add(&ctx.sender, &rest).await,
152                "list" => self.handle_list(&ctx).await,
153                "remove" => self.handle_remove(&rest).await,
154                "pop" => self.handle_pop(&ctx.sender).await,
155                _ => Ok(PluginResult::Reply(
156                    format!("queue: unknown action '{action}'. use add, list, remove, or pop"),
157                    None,
158                )),
159            }
160        })
161    }
162}
163
164impl QueuePlugin {
165    async fn handle_add(&self, sender: &str, rest: &[&str]) -> anyhow::Result<PluginResult> {
166        if rest.is_empty() {
167            return Ok(PluginResult::Reply(
168                "queue add: missing task description".to_owned(),
169                None,
170            ));
171        }
172
173        let description = rest.join(" ");
174        let item = QueueItem {
175            id: Uuid::new_v4().to_string(),
176            description: description.clone(),
177            added_by: sender.to_owned(),
178            added_at: Utc::now(),
179            status: "queued".to_owned(),
180        };
181
182        {
183            let mut queue = self.queue.lock().await;
184            queue.push(item.clone());
185        }
186
187        append_item(&self.queue_path, &item)?;
188        let count = self.queue.lock().await.len();
189
190        Ok(PluginResult::Broadcast(
191            format!("queue: {sender} added \"{description}\" (#{count} in backlog)"),
192            None,
193        ))
194    }
195
196    async fn handle_list(&self, _ctx: &CommandContext) -> anyhow::Result<PluginResult> {
197        let queue = self.queue.lock().await;
198
199        if queue.is_empty() {
200            return Ok(PluginResult::Reply(
201                "queue: backlog is empty".to_owned(),
202                None,
203            ));
204        }
205
206        let mut lines = Vec::with_capacity(queue.len() + 1);
207        lines.push(format!("queue: {} item(s) in backlog", queue.len()));
208        for (i, item) in queue.iter().enumerate() {
209            lines.push(format!(
210                "  {}. {} (added by {} at {})",
211                i + 1,
212                item.description,
213                item.added_by,
214                item.added_at.format("%Y-%m-%d %H:%M UTC")
215            ));
216        }
217
218        Ok(PluginResult::Reply(lines.join("\n"), None))
219    }
220
221    async fn handle_remove(&self, rest: &[&str]) -> anyhow::Result<PluginResult> {
222        let index_str = rest.first().copied().unwrap_or("");
223        let index: usize = match index_str.parse::<usize>() {
224            Ok(n) if n >= 1 => n,
225            _ => {
226                return Ok(PluginResult::Reply(
227                    "queue remove: provide a valid 1-based index".to_owned(),
228                    None,
229                ));
230            }
231        };
232
233        let removed = {
234            let mut queue = self.queue.lock().await;
235            if index > queue.len() {
236                return Ok(PluginResult::Reply(
237                    format!(
238                        "queue remove: index {index} out of range (queue has {} item(s))",
239                        queue.len()
240                    ),
241                    None,
242                ));
243            }
244            queue.remove(index - 1)
245        };
246
247        self.rewrite_queue().await?;
248
249        Ok(PluginResult::Broadcast(
250            format!("queue: removed \"{}\" (was #{index})", removed.description),
251            None,
252        ))
253    }
254
255    async fn handle_pop(&self, sender: &str) -> anyhow::Result<PluginResult> {
256        let popped = {
257            let mut queue = self.queue.lock().await;
258            if queue.is_empty() {
259                return Ok(PluginResult::Reply(
260                    "queue pop: backlog is empty, nothing to pop".to_owned(),
261                    None,
262                ));
263            }
264            queue.remove(0)
265        };
266
267        self.rewrite_queue().await?;
268
269        Ok(PluginResult::Broadcast(
270            format!("{sender} popped from queue: \"{}\"", popped.description),
271            None,
272        ))
273    }
274
275    /// Rewrite the entire queue file from the in-memory state.
276    async fn rewrite_queue(&self) -> anyhow::Result<()> {
277        let queue = self.queue.lock().await;
278        rewrite_queue_file(&self.queue_path, &queue)
279    }
280}
281
282// ── Persistence helpers ─────────────────────────────────────────────────────
283
284/// Load queue items from an NDJSON file. Returns an empty vec if the file
285/// does not exist.
286fn load_queue(path: &Path) -> anyhow::Result<Vec<QueueItem>> {
287    if !path.exists() {
288        return Ok(Vec::new());
289    }
290
291    let file = std::fs::File::open(path)?;
292    let reader = std::io::BufReader::new(file);
293    let mut items = Vec::new();
294
295    for line in reader.lines() {
296        let line = line?;
297        let trimmed = line.trim();
298        if trimmed.is_empty() {
299            continue;
300        }
301        match serde_json::from_str::<QueueItem>(trimmed) {
302            Ok(item) => items.push(item),
303            Err(e) => {
304                eprintln!("queue: skipping malformed line in {}: {e}", path.display());
305            }
306        }
307    }
308
309    Ok(items)
310}
311
312/// Append a single item to the NDJSON queue file.
313fn append_item(path: &Path, item: &QueueItem) -> anyhow::Result<()> {
314    let mut file = std::fs::OpenOptions::new()
315        .create(true)
316        .append(true)
317        .open(path)?;
318    let line = serde_json::to_string(item)?;
319    writeln!(file, "{line}")?;
320    Ok(())
321}
322
323/// Rewrite the entire queue file from a slice of items.
324fn rewrite_queue_file(path: &Path, items: &[QueueItem]) -> anyhow::Result<()> {
325    let mut file = std::fs::File::create(path)?;
326    for item in items {
327        let line = serde_json::to_string(item)?;
328        writeln!(file, "{line}")?;
329    }
330    Ok(())
331}
332
333// ── Tests ───────────────────────────────────────────────────────────────────
334
#[cfg(test)]
mod tests {
    use super::*;
    use crate::plugin::{ChatWriter, HistoryReader, RoomMetadata, UserInfo};
    use std::collections::HashMap;
    use std::sync::atomic::AtomicU64;
    use tempfile::TempDir;
    use tokio::sync::broadcast;

    /// Shared client map in the shape `ChatWriter::new` expects.
    type ClientMap = Arc<Mutex<HashMap<u64, (String, broadcast::Sender<String>)>>>;

    /// File name of the queue file used by every plugin-level test.
    const QUEUE_FILE: &str = "test-room.queue";

    /// Helper: create a QueuePlugin whose queue file lives in `dir`.
    fn make_test_plugin(dir: &TempDir) -> QueuePlugin {
        QueuePlugin::new(dir.path().join(QUEUE_FILE)).unwrap()
    }

    /// Helper: create a CommandContext wired to a test plugin.
    fn make_test_ctx(
        command: &str,
        params: Vec<&str>,
        sender: &str,
        chat_path: &Path,
        clients: &ClientMap,
        seq: &Arc<AtomicU64>,
    ) -> CommandContext {
        let chat_arc = Arc::new(chat_path.to_path_buf());
        let room_id = Arc::new("test-room".to_owned());
        let online_users = vec![UserInfo {
            username: sender.to_owned(),
            status: String::new(),
        }];

        CommandContext {
            command: command.to_owned(),
            params: params.into_iter().map(|s| s.to_owned()).collect(),
            sender: sender.to_owned(),
            room_id: "test-room".to_owned(),
            message_id: Uuid::new_v4().to_string(),
            timestamp: Utc::now(),
            history: Box::new(HistoryReader::new(chat_path, sender)),
            writer: Box::new(ChatWriter::new(clients, &chat_arc, &room_id, seq, "queue")),
            metadata: RoomMetadata {
                online_users,
                host: None,
                message_count: 0,
            },
            available_commands: vec![],
            team_access: None,
        }
    }

    /// Helper: set up broadcast channel and client map for tests.
    fn make_test_clients() -> (ClientMap, broadcast::Receiver<String>, Arc<AtomicU64>) {
        let (tx, rx) = broadcast::channel::<String>(64);
        let mut map = HashMap::new();
        map.insert(1u64, ("alice".to_owned(), tx));
        (
            Arc::new(Mutex::new(map)),
            rx,
            Arc::new(AtomicU64::new(0)),
        )
    }

    /// Helper: build a queued item with the given identity fields.
    fn sample_item(id: &str, description: &str, added_by: &str) -> QueueItem {
        QueueItem {
            id: id.to_owned(),
            description: description.to_owned(),
            added_by: added_by.to_owned(),
            added_at: Utc::now(),
            status: "queued".to_owned(),
        }
    }

    /// Helper: unwrap a `Reply` with no extra payload, returning its message.
    fn expect_reply(result: PluginResult) -> String {
        match result {
            PluginResult::Reply(msg, None) => msg,
            _ => panic!("expected Reply"),
        }
    }

    /// Helper: unwrap a `Broadcast` with no extra payload, returning its message.
    fn expect_broadcast(result: PluginResult) -> String {
        match result {
            PluginResult::Broadcast(msg, None) => msg,
            _ => panic!("expected Broadcast"),
        }
    }

    /// Everything a handler test needs: a plugin backed by a temp directory
    /// plus the chat file, client map and sequence counter for the context.
    struct Fixture {
        dir: TempDir,
        plugin: QueuePlugin,
        chat: tempfile::NamedTempFile,
        clients: ClientMap,
        /// Kept so the broadcast channel has a receiver for the whole test.
        _rx: broadcast::Receiver<String>,
        seq: Arc<AtomicU64>,
    }

    impl Fixture {
        fn new() -> Self {
            let dir = TempDir::new().unwrap();
            let plugin = make_test_plugin(&dir);
            let chat = tempfile::NamedTempFile::new().unwrap();
            let (clients, _rx, seq) = make_test_clients();
            Self {
                dir,
                plugin,
                chat,
                clients,
                _rx,
                seq,
            }
        }

        /// Run `/queue <params...>` as `sender` and return the plugin result.
        async fn run(&self, sender: &str, params: &[&str]) -> PluginResult {
            let ctx = make_test_ctx(
                "queue",
                params.to_vec(),
                sender,
                self.chat.path(),
                &self.clients,
                &self.seq,
            );
            self.plugin.handle(ctx).await.unwrap()
        }

        /// Run `/queue add <description>` as `sender`, one param per word.
        async fn add(&self, sender: &str, description: &str) {
            let words: Vec<&str> = std::iter::once("add")
                .chain(description.split_whitespace())
                .collect();
            self.run(sender, &words).await;
        }

        /// Load whatever is currently persisted in the queue file.
        fn persisted(&self) -> Vec<QueueItem> {
            load_queue(&self.dir.path().join(QUEUE_FILE)).unwrap()
        }
    }

    // ── load/save persistence tests ─────────────────────────────────────────

    #[test]
    fn load_queue_nonexistent_file_returns_empty() {
        let items = load_queue(Path::new("/nonexistent/path.queue")).unwrap();
        assert!(items.is_empty());
    }

    #[test]
    fn append_and_load_round_trips() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.queue");

        append_item(&path, &sample_item("id-1", "fix the bug", "alice")).unwrap();

        let loaded = load_queue(&path).unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].id, "id-1");
        assert_eq!(loaded[0].description, "fix the bug");
        assert_eq!(loaded[0].added_by, "alice");
        assert_eq!(loaded[0].status, "queued");
    }

    #[test]
    fn rewrite_replaces_file_contents() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.queue");

        // Write 3 items
        let items: Vec<QueueItem> = (0..3)
            .map(|i| sample_item(&format!("id-{i}"), &format!("task {i}"), "bob"))
            .collect();
        items
            .iter()
            .for_each(|item| append_item(&path, item).unwrap());
        assert_eq!(load_queue(&path).unwrap().len(), 3);

        // Rewrite with only 1 item
        rewrite_queue_file(&path, &items[1..2]).unwrap();
        let reloaded = load_queue(&path).unwrap();
        assert_eq!(reloaded.len(), 1);
        assert_eq!(reloaded[0].id, "id-1");
    }

    #[test]
    fn load_skips_malformed_lines() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.queue");

        let good_json = serde_json::to_string(&sample_item("good", "valid", "alice")).unwrap();

        let mut file = std::fs::File::create(&path).unwrap();
        writeln!(file, "{}", good_json).unwrap();
        writeln!(file, "not valid json").unwrap();
        writeln!(file, "{}", good_json).unwrap();
        writeln!(file).unwrap(); // blank line

        let loaded = load_queue(&path).unwrap();
        assert_eq!(loaded.len(), 2);
    }

    #[test]
    fn queue_path_from_chat_replaces_extension() {
        let chat = PathBuf::from("/data/room-dev.chat");
        let queue = QueuePlugin::queue_path_from_chat(&chat);
        assert_eq!(queue, PathBuf::from("/data/room-dev.queue"));
    }

    // ── plugin construction tests ───────────────────────────────────────────

    #[test]
    fn new_plugin_loads_existing_queue() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.queue");
        append_item(&path, &sample_item("pre-existing", "already there", "ba")).unwrap();

        let plugin = QueuePlugin::new(path).unwrap();

        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async {
            let queue = plugin.queue.lock().await;
            assert_eq!(queue.len(), 1);
            assert_eq!(queue[0].description, "already there");
        });
    }

    // ── handle tests ────────────────────────────────────────────────────────

    #[tokio::test]
    async fn add_appends_to_queue_and_persists() {
        let fx = Fixture::new();

        let msg = expect_broadcast(fx.run("alice", &["add", "fix", "the", "bug"]).await);
        assert!(msg.contains("fix the bug"), "broadcast should mention task");
        assert!(msg.contains("alice"), "broadcast should mention sender");
        assert!(msg.contains("#1"), "broadcast should show queue position");

        // In-memory
        {
            let queue = fx.plugin.queue.lock().await;
            assert_eq!(queue.len(), 1);
            assert_eq!(queue[0].description, "fix the bug");
            assert_eq!(queue[0].added_by, "alice");
        }

        // On disk
        let persisted = fx.persisted();
        assert_eq!(persisted.len(), 1);
        assert_eq!(persisted[0].description, "fix the bug");
    }

    #[tokio::test]
    async fn add_without_description_returns_error() {
        let fx = Fixture::new();
        let msg = expect_reply(fx.run("alice", &["add"]).await);
        assert!(msg.contains("missing task description"));
    }

    #[tokio::test]
    async fn list_empty_queue() {
        let fx = Fixture::new();
        let msg = expect_reply(fx.run("alice", &["list"]).await);
        assert!(msg.contains("empty"));
    }

    #[tokio::test]
    async fn list_shows_indexed_items() {
        let fx = Fixture::new();

        // Add two items
        for desc in ["first task", "second task"] {
            fx.add("alice", desc).await;
        }

        let msg = expect_reply(fx.run("alice", &["list"]).await);
        assert!(msg.contains("2 item(s)"));
        assert!(msg.contains("1. first task"));
        assert!(msg.contains("2. second task"));
    }

    #[tokio::test]
    async fn remove_by_index() {
        let fx = Fixture::new();

        // Add two items
        for desc in ["first", "second"] {
            fx.add("alice", desc).await;
        }

        // Remove item 1
        let msg = expect_broadcast(fx.run("alice", &["remove", "1"]).await);
        assert!(
            msg.contains("first"),
            "broadcast should mention removed item"
        );
        assert!(msg.contains("#1"), "broadcast should mention index");

        // Verify only "second" remains
        {
            let queue = fx.plugin.queue.lock().await;
            assert_eq!(queue.len(), 1);
            assert_eq!(queue[0].description, "second");
        }

        // Verify persistence
        let persisted = fx.persisted();
        assert_eq!(persisted.len(), 1);
        assert_eq!(persisted[0].description, "second");
    }

    #[tokio::test]
    async fn remove_out_of_range() {
        let fx = Fixture::new();
        let msg = expect_reply(fx.run("alice", &["remove", "5"]).await);
        assert!(msg.contains("out of range"));
    }

    #[tokio::test]
    async fn remove_invalid_index() {
        let fx = Fixture::new();
        let msg = expect_reply(fx.run("alice", &["remove", "abc"]).await);
        assert!(msg.contains("valid 1-based index"));
    }

    #[tokio::test]
    async fn remove_zero_index() {
        let fx = Fixture::new();
        let msg = expect_reply(fx.run("alice", &["remove", "0"]).await);
        assert!(msg.contains("valid 1-based index"));
    }

    #[tokio::test]
    async fn pop_removes_first_item() {
        let fx = Fixture::new();

        // Add two items
        for desc in ["urgent fix", "nice to have"] {
            fx.add("ba", desc).await;
        }

        // Pop as alice
        let msg = expect_broadcast(fx.run("alice", &["pop"]).await);
        assert!(msg.contains("alice"), "broadcast should mention popper");
        assert!(msg.contains("urgent fix"), "broadcast should mention task");

        // Verify queue has only 1 item left
        {
            let queue = fx.plugin.queue.lock().await;
            assert_eq!(queue.len(), 1);
            assert_eq!(queue[0].description, "nice to have");
        }

        // Verify persistence
        assert_eq!(fx.persisted().len(), 1);
    }

    #[tokio::test]
    async fn pop_empty_queue_returns_error() {
        let fx = Fixture::new();
        let msg = expect_reply(fx.run("alice", &["pop"]).await);
        assert!(msg.contains("empty"));
    }

    #[tokio::test]
    async fn pop_fifo_order() {
        let fx = Fixture::new();

        // Add two items
        fx.add("ba", "first").await;
        fx.add("ba", "second").await;

        // Pop should return first item
        let msg = expect_broadcast(fx.run("alice", &["pop"]).await);
        assert!(msg.contains("first"));

        // Pop again should return second item
        let msg = expect_broadcast(fx.run("bob", &["pop"]).await);
        assert!(msg.contains("second"));

        // Queue is now empty
        assert!(fx.plugin.queue.lock().await.is_empty());
    }

    #[tokio::test]
    async fn unknown_action_returns_error() {
        let fx = Fixture::new();
        let msg = expect_reply(fx.run("alice", &["foobar"]).await);
        assert!(msg.contains("unknown action"));
    }

    #[tokio::test]
    async fn queue_survives_reload() {
        // First plugin instance — add items
        let fx = Fixture::new();
        for desc in ["task a", "task b", "task c"] {
            fx.add("alice", desc).await;
        }

        // Keep the directory, let the first plugin instance go away.
        let Fixture { dir, plugin, .. } = fx;
        drop(plugin);

        // Second plugin instance — simulates broker restart
        let plugin2 = QueuePlugin::new(dir.path().join(QUEUE_FILE)).unwrap();
        let queue = plugin2.queue.lock().await;
        assert_eq!(queue.len(), 3);
        assert_eq!(queue[0].description, "task a");
        assert_eq!(queue[1].description, "task b");
        assert_eq!(queue[2].description, "task c");
    }

    // ── Plugin trait tests ──────────────────────────────────────────────────

    #[test]
    fn plugin_name_is_queue() {
        let dir = TempDir::new().unwrap();
        let plugin = make_test_plugin(&dir);
        assert_eq!(plugin.name(), "queue");
    }

    #[test]
    fn plugin_registers_queue_command() {
        let dir = TempDir::new().unwrap();
        let plugin = make_test_plugin(&dir);
        let cmds = plugin.commands();
        assert_eq!(cmds.len(), 1);

        let queue_cmd = &cmds[0];
        assert_eq!(queue_cmd.name, "queue");
        assert_eq!(queue_cmd.params.len(), 2);
        assert!(queue_cmd.params[0].required);
        assert!(!queue_cmd.params[1].required);
    }
}
943}