Skip to main content

room_cli/plugin/
queue.rs

1use std::{
2    io::{BufRead, Write},
3    path::{Path, PathBuf},
4    sync::Arc,
5};
6
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use tokio::sync::Mutex;
10use uuid::Uuid;
11
12use super::{BoxFuture, CommandContext, CommandInfo, ParamSchema, ParamType, Plugin, PluginResult};
13use crate::broker::state::{ClaimEntry, ClaimMap};
14
15/// A single item in the task queue backlog.
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct QueueItem {
18    /// Unique identifier for this queue entry.
19    pub id: String,
20    /// Human-readable task description.
21    pub description: String,
22    /// Username of the agent who added this item.
23    pub added_by: String,
24    /// Timestamp when the item was added.
25    pub added_at: DateTime<Utc>,
26    /// Current status — always `"queued"` while in the backlog.
27    pub status: String,
28}
29
30/// Plugin that manages a persistent task backlog.
31///
32/// Agents can add tasks to the queue and self-assign from it using `/queue pop`,
33/// which integrates with the built-in `/claim` system.
34///
35/// Queue state is persisted to an NDJSON file alongside the room's `.chat` file.
36pub struct QueuePlugin {
37    /// In-memory queue items, protected by a mutex for concurrent access.
38    queue: Arc<Mutex<Vec<QueueItem>>>,
39    /// Path to the NDJSON persistence file (`<room-data-dir>/<room-id>.queue`).
40    queue_path: PathBuf,
41    /// Shared reference to the broker's claim map — same Arc the broker uses.
42    claim_map: ClaimMap,
43}
44
45impl QueuePlugin {
46    /// Create a new `QueuePlugin`, loading any existing queue from disk.
47    ///
48    /// # Arguments
49    /// * `queue_path` — path to the `.queue` NDJSON file
50    /// * `claim_map` — the broker's `ClaimMap` Arc (not a data clone)
51    pub(crate) fn new(queue_path: PathBuf, claim_map: ClaimMap) -> anyhow::Result<Self> {
52        let items = load_queue(&queue_path)?;
53        Ok(Self {
54            queue: Arc::new(Mutex::new(items)),
55            queue_path,
56            claim_map,
57        })
58    }
59
60    /// Derive the `.queue` file path from a `.chat` file path.
61    pub fn queue_path_from_chat(chat_path: &Path) -> PathBuf {
62        chat_path.with_extension("queue")
63    }
64
65    /// Returns the command info for the TUI command palette without needing
66    /// an instantiated plugin. Used by `all_known_commands()`.
67    pub fn default_commands() -> Vec<CommandInfo> {
68        vec![CommandInfo {
69            name: "queue".to_owned(),
70            description: "Manage the task backlog".to_owned(),
71            usage: "/queue <add|list|remove|pop> [args]".to_owned(),
72            params: vec![
73                ParamSchema {
74                    name: "action".to_owned(),
75                    param_type: ParamType::Choice(vec![
76                        "add".to_owned(),
77                        "list".to_owned(),
78                        "remove".to_owned(),
79                        "pop".to_owned(),
80                    ]),
81                    required: true,
82                    description: "Action to perform".to_owned(),
83                },
84                ParamSchema {
85                    name: "args".to_owned(),
86                    param_type: ParamType::Text,
87                    required: false,
88                    description: "Task description (add) or index (remove)".to_owned(),
89                },
90            ],
91        }]
92    }
93}
94
95impl Plugin for QueuePlugin {
96    fn name(&self) -> &str {
97        "queue"
98    }
99
100    fn commands(&self) -> Vec<CommandInfo> {
101        Self::default_commands()
102    }
103
104    fn handle(&self, ctx: CommandContext) -> BoxFuture<'_, anyhow::Result<PluginResult>> {
105        Box::pin(async move {
106            let action = ctx.params.first().map(String::as_str).unwrap_or("");
107            let rest: Vec<&str> = ctx.params.iter().skip(1).map(String::as_str).collect();
108
109            match action {
110                "add" => self.handle_add(&ctx.sender, &rest, &ctx).await,
111                "list" => self.handle_list(&ctx).await,
112                "remove" => self.handle_remove(&rest, &ctx).await,
113                "pop" => self.handle_pop(&ctx.sender, &ctx).await,
114                _ => Ok(PluginResult::Reply(format!(
115                    "queue: unknown action '{action}'. use add, list, remove, or pop"
116                ))),
117            }
118        })
119    }
120}
121
122impl QueuePlugin {
123    async fn handle_add(
124        &self,
125        sender: &str,
126        rest: &[&str],
127        ctx: &CommandContext,
128    ) -> anyhow::Result<PluginResult> {
129        if rest.is_empty() {
130            return Ok(PluginResult::Reply(
131                "queue add: missing task description".to_owned(),
132            ));
133        }
134
135        let description = rest.join(" ");
136        let item = QueueItem {
137            id: Uuid::new_v4().to_string(),
138            description: description.clone(),
139            added_by: sender.to_owned(),
140            added_at: Utc::now(),
141            status: "queued".to_owned(),
142        };
143
144        {
145            let mut queue = self.queue.lock().await;
146            queue.push(item.clone());
147        }
148
149        append_item(&self.queue_path, &item)?;
150        ctx.writer
151            .broadcast(&format!(
152                "queue: {sender} added \"{description}\" (#{} in backlog)",
153                self.queue.lock().await.len()
154            ))
155            .await?;
156
157        Ok(PluginResult::Handled)
158    }
159
160    async fn handle_list(&self, _ctx: &CommandContext) -> anyhow::Result<PluginResult> {
161        let queue = self.queue.lock().await;
162
163        if queue.is_empty() {
164            return Ok(PluginResult::Reply("queue: backlog is empty".to_owned()));
165        }
166
167        let mut lines = Vec::with_capacity(queue.len() + 1);
168        lines.push(format!("queue: {} item(s) in backlog", queue.len()));
169        for (i, item) in queue.iter().enumerate() {
170            lines.push(format!(
171                "  {}. {} (added by {} at {})",
172                i + 1,
173                item.description,
174                item.added_by,
175                item.added_at.format("%Y-%m-%d %H:%M UTC")
176            ));
177        }
178
179        Ok(PluginResult::Reply(lines.join("\n")))
180    }
181
182    async fn handle_remove(
183        &self,
184        rest: &[&str],
185        ctx: &CommandContext,
186    ) -> anyhow::Result<PluginResult> {
187        let index_str = rest.first().copied().unwrap_or("");
188        let index: usize = match index_str.parse::<usize>() {
189            Ok(n) if n >= 1 => n,
190            _ => {
191                return Ok(PluginResult::Reply(
192                    "queue remove: provide a valid 1-based index".to_owned(),
193                ));
194            }
195        };
196
197        let removed = {
198            let mut queue = self.queue.lock().await;
199            if index > queue.len() {
200                return Ok(PluginResult::Reply(format!(
201                    "queue remove: index {index} out of range (queue has {} item(s))",
202                    queue.len()
203                )));
204            }
205            queue.remove(index - 1)
206        };
207
208        self.rewrite_queue().await?;
209        ctx.writer
210            .broadcast(&format!(
211                "queue: removed \"{}\" (was #{})",
212                removed.description, index
213            ))
214            .await?;
215
216        Ok(PluginResult::Handled)
217    }
218
219    async fn handle_pop(&self, sender: &str, ctx: &CommandContext) -> anyhow::Result<PluginResult> {
220        let popped = {
221            let mut queue = self.queue.lock().await;
222            if queue.is_empty() {
223                return Ok(PluginResult::Reply(
224                    "queue pop: backlog is empty, nothing to claim".to_owned(),
225                ));
226            }
227            queue.remove(0)
228        };
229
230        self.rewrite_queue().await?;
231
232        // Integrate with /claim — set the claim on the invoker
233        {
234            let mut claims = self.claim_map.lock().await;
235            claims.insert(
236                sender.to_owned(),
237                ClaimEntry {
238                    task: popped.description.clone(),
239                    claimed_at: std::time::Instant::now(),
240                },
241            );
242        }
243
244        ctx.writer
245            .broadcast(&format!(
246                "{sender} claimed from queue: \"{}\"",
247                popped.description
248            ))
249            .await?;
250
251        Ok(PluginResult::Handled)
252    }
253
254    /// Rewrite the entire queue file from the in-memory state.
255    async fn rewrite_queue(&self) -> anyhow::Result<()> {
256        let queue = self.queue.lock().await;
257        rewrite_queue_file(&self.queue_path, &queue)
258    }
259}
260
261// ── Persistence helpers ─────────────────────────────────────────────────────
262
263/// Load queue items from an NDJSON file. Returns an empty vec if the file
264/// does not exist.
265fn load_queue(path: &Path) -> anyhow::Result<Vec<QueueItem>> {
266    if !path.exists() {
267        return Ok(Vec::new());
268    }
269
270    let file = std::fs::File::open(path)?;
271    let reader = std::io::BufReader::new(file);
272    let mut items = Vec::new();
273
274    for line in reader.lines() {
275        let line = line?;
276        let trimmed = line.trim();
277        if trimmed.is_empty() {
278            continue;
279        }
280        match serde_json::from_str::<QueueItem>(trimmed) {
281            Ok(item) => items.push(item),
282            Err(e) => {
283                eprintln!("queue: skipping malformed line in {}: {e}", path.display());
284            }
285        }
286    }
287
288    Ok(items)
289}
290
291/// Append a single item to the NDJSON queue file.
292fn append_item(path: &Path, item: &QueueItem) -> anyhow::Result<()> {
293    let mut file = std::fs::OpenOptions::new()
294        .create(true)
295        .append(true)
296        .open(path)?;
297    let line = serde_json::to_string(item)?;
298    writeln!(file, "{line}")?;
299    Ok(())
300}
301
302/// Rewrite the entire queue file from a slice of items.
303fn rewrite_queue_file(path: &Path, items: &[QueueItem]) -> anyhow::Result<()> {
304    let mut file = std::fs::File::create(path)?;
305    for item in items {
306        let line = serde_json::to_string(item)?;
307        writeln!(file, "{line}")?;
308    }
309    Ok(())
310}
311
312// ── Tests ───────────────────────────────────────────────────────────────────
313
314#[cfg(test)]
315mod tests {
316    use super::*;
317    use crate::plugin::{ChatWriter, HistoryReader, RoomMetadata, UserInfo};
318    use std::collections::HashMap;
319    use std::sync::atomic::AtomicU64;
320    use tempfile::TempDir;
321    use tokio::sync::broadcast;
322
323    /// Helper: create a QueuePlugin with a temp directory and empty claim map.
324    fn make_test_plugin(dir: &TempDir) -> QueuePlugin {
325        let queue_path = dir.path().join("test-room.queue");
326        let claim_map: ClaimMap = Arc::new(Mutex::new(HashMap::new()));
327        QueuePlugin::new(queue_path, claim_map).unwrap()
328    }
329
330    /// Helper: create a CommandContext wired to a test plugin.
331    fn make_test_ctx(
332        command: &str,
333        params: Vec<&str>,
334        sender: &str,
335        chat_path: &Path,
336        clients: &Arc<Mutex<HashMap<u64, (String, broadcast::Sender<String>)>>>,
337        seq: &Arc<AtomicU64>,
338    ) -> CommandContext {
339        let chat_arc = Arc::new(chat_path.to_path_buf());
340        let room_id = Arc::new("test-room".to_owned());
341
342        CommandContext {
343            command: command.to_owned(),
344            params: params.into_iter().map(|s| s.to_owned()).collect(),
345            sender: sender.to_owned(),
346            room_id: "test-room".to_owned(),
347            message_id: Uuid::new_v4().to_string(),
348            timestamp: Utc::now(),
349            history: HistoryReader::new(chat_path, sender),
350            writer: ChatWriter::new(clients, &chat_arc, &room_id, seq, "queue"),
351            metadata: RoomMetadata {
352                online_users: vec![UserInfo {
353                    username: sender.to_owned(),
354                    status: String::new(),
355                }],
356                host: None,
357                message_count: 0,
358            },
359            available_commands: vec![],
360        }
361    }
362
363    /// Helper: set up broadcast channel and client map for tests.
364    fn make_test_clients() -> (
365        Arc<Mutex<HashMap<u64, (String, broadcast::Sender<String>)>>>,
366        broadcast::Receiver<String>,
367        Arc<AtomicU64>,
368    ) {
369        let (tx, rx) = broadcast::channel::<String>(64);
370        let mut map = HashMap::new();
371        map.insert(1u64, ("alice".to_owned(), tx));
372        let clients = Arc::new(Mutex::new(map));
373        let seq = Arc::new(AtomicU64::new(0));
374        (clients, rx, seq)
375    }
376
377    // ── load/save persistence tests ─────────────────────────────────────────
378
379    #[test]
380    fn load_queue_nonexistent_file_returns_empty() {
381        let items = load_queue(Path::new("/nonexistent/path.queue")).unwrap();
382        assert!(items.is_empty());
383    }
384
385    #[test]
386    fn append_and_load_round_trips() {
387        let dir = TempDir::new().unwrap();
388        let path = dir.path().join("test.queue");
389
390        let item = QueueItem {
391            id: "id-1".to_owned(),
392            description: "fix the bug".to_owned(),
393            added_by: "alice".to_owned(),
394            added_at: Utc::now(),
395            status: "queued".to_owned(),
396        };
397
398        append_item(&path, &item).unwrap();
399        let loaded = load_queue(&path).unwrap();
400        assert_eq!(loaded.len(), 1);
401        assert_eq!(loaded[0].id, "id-1");
402        assert_eq!(loaded[0].description, "fix the bug");
403        assert_eq!(loaded[0].added_by, "alice");
404        assert_eq!(loaded[0].status, "queued");
405    }
406
407    #[test]
408    fn rewrite_replaces_file_contents() {
409        let dir = TempDir::new().unwrap();
410        let path = dir.path().join("test.queue");
411
412        // Write 3 items
413        let items: Vec<QueueItem> = (0..3)
414            .map(|i| QueueItem {
415                id: format!("id-{i}"),
416                description: format!("task {i}"),
417                added_by: "bob".to_owned(),
418                added_at: Utc::now(),
419                status: "queued".to_owned(),
420            })
421            .collect();
422
423        for item in &items {
424            append_item(&path, item).unwrap();
425        }
426        assert_eq!(load_queue(&path).unwrap().len(), 3);
427
428        // Rewrite with only 1 item
429        rewrite_queue_file(&path, &items[1..2]).unwrap();
430        let reloaded = load_queue(&path).unwrap();
431        assert_eq!(reloaded.len(), 1);
432        assert_eq!(reloaded[0].id, "id-1");
433    }
434
435    #[test]
436    fn load_skips_malformed_lines() {
437        let dir = TempDir::new().unwrap();
438        let path = dir.path().join("test.queue");
439
440        let good = QueueItem {
441            id: "good".to_owned(),
442            description: "valid".to_owned(),
443            added_by: "alice".to_owned(),
444            added_at: Utc::now(),
445            status: "queued".to_owned(),
446        };
447
448        let mut file = std::fs::File::create(&path).unwrap();
449        writeln!(file, "{}", serde_json::to_string(&good).unwrap()).unwrap();
450        writeln!(file, "not valid json").unwrap();
451        writeln!(file, "{}", serde_json::to_string(&good).unwrap()).unwrap();
452        writeln!(file).unwrap(); // blank line
453
454        let loaded = load_queue(&path).unwrap();
455        assert_eq!(loaded.len(), 2);
456    }
457
458    #[test]
459    fn queue_path_from_chat_replaces_extension() {
460        let chat = PathBuf::from("/data/room-dev.chat");
461        let queue = QueuePlugin::queue_path_from_chat(&chat);
462        assert_eq!(queue, PathBuf::from("/data/room-dev.queue"));
463    }
464
465    // ── plugin construction tests ───────────────────────────────────────────
466
467    #[test]
468    fn new_plugin_loads_existing_queue() {
469        let dir = TempDir::new().unwrap();
470        let path = dir.path().join("test.queue");
471
472        let item = QueueItem {
473            id: "pre-existing".to_owned(),
474            description: "already there".to_owned(),
475            added_by: "ba".to_owned(),
476            added_at: Utc::now(),
477            status: "queued".to_owned(),
478        };
479        append_item(&path, &item).unwrap();
480
481        let claim_map: ClaimMap = Arc::new(Mutex::new(HashMap::new()));
482        let plugin = QueuePlugin::new(path, claim_map).unwrap();
483
484        let rt = tokio::runtime::Runtime::new().unwrap();
485        rt.block_on(async {
486            let queue = plugin.queue.lock().await;
487            assert_eq!(queue.len(), 1);
488            assert_eq!(queue[0].description, "already there");
489        });
490    }
491
492    // ── handle tests ────────────────────────────────────────────────────────
493
494    #[tokio::test]
495    async fn add_appends_to_queue_and_persists() {
496        let dir = TempDir::new().unwrap();
497        let plugin = make_test_plugin(&dir);
498        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
499        let (clients, _rx, seq) = make_test_clients();
500        let ctx = make_test_ctx(
501            "queue",
502            vec!["add", "fix", "the", "bug"],
503            "alice",
504            chat_tmp.path(),
505            &clients,
506            &seq,
507        );
508
509        let result = plugin.handle(ctx).await.unwrap();
510        assert!(matches!(result, PluginResult::Handled));
511
512        // In-memory
513        let queue = plugin.queue.lock().await;
514        assert_eq!(queue.len(), 1);
515        assert_eq!(queue[0].description, "fix the bug");
516        assert_eq!(queue[0].added_by, "alice");
517        drop(queue);
518
519        // On disk
520        let persisted = load_queue(&dir.path().join("test-room.queue")).unwrap();
521        assert_eq!(persisted.len(), 1);
522        assert_eq!(persisted[0].description, "fix the bug");
523    }
524
525    #[tokio::test]
526    async fn add_without_description_returns_error() {
527        let dir = TempDir::new().unwrap();
528        let plugin = make_test_plugin(&dir);
529        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
530        let (clients, _rx, seq) = make_test_clients();
531        let ctx = make_test_ctx(
532            "queue",
533            vec!["add"],
534            "alice",
535            chat_tmp.path(),
536            &clients,
537            &seq,
538        );
539
540        let result = plugin.handle(ctx).await.unwrap();
541        match result {
542            PluginResult::Reply(msg) => assert!(msg.contains("missing task description")),
543            _ => panic!("expected Reply"),
544        }
545    }
546
547    #[tokio::test]
548    async fn list_empty_queue() {
549        let dir = TempDir::new().unwrap();
550        let plugin = make_test_plugin(&dir);
551        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
552        let (clients, _rx, seq) = make_test_clients();
553        let ctx = make_test_ctx(
554            "queue",
555            vec!["list"],
556            "alice",
557            chat_tmp.path(),
558            &clients,
559            &seq,
560        );
561
562        let result = plugin.handle(ctx).await.unwrap();
563        match result {
564            PluginResult::Reply(msg) => assert!(msg.contains("empty")),
565            _ => panic!("expected Reply"),
566        }
567    }
568
569    #[tokio::test]
570    async fn list_shows_indexed_items() {
571        let dir = TempDir::new().unwrap();
572        let plugin = make_test_plugin(&dir);
573        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
574        let (clients, _rx, seq) = make_test_clients();
575
576        // Add two items
577        for desc in ["first task", "second task"] {
578            let words: Vec<&str> = std::iter::once("add")
579                .chain(desc.split_whitespace())
580                .collect();
581            let ctx = make_test_ctx("queue", words, "alice", chat_tmp.path(), &clients, &seq);
582            plugin.handle(ctx).await.unwrap();
583        }
584
585        let ctx = make_test_ctx(
586            "queue",
587            vec!["list"],
588            "alice",
589            chat_tmp.path(),
590            &clients,
591            &seq,
592        );
593        let result = plugin.handle(ctx).await.unwrap();
594        match result {
595            PluginResult::Reply(msg) => {
596                assert!(msg.contains("2 item(s)"));
597                assert!(msg.contains("1. first task"));
598                assert!(msg.contains("2. second task"));
599            }
600            _ => panic!("expected Reply"),
601        }
602    }
603
604    #[tokio::test]
605    async fn remove_by_index() {
606        let dir = TempDir::new().unwrap();
607        let plugin = make_test_plugin(&dir);
608        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
609        let (clients, _rx, seq) = make_test_clients();
610
611        // Add two items
612        for desc in ["first", "second"] {
613            let ctx = make_test_ctx(
614                "queue",
615                vec!["add", desc],
616                "alice",
617                chat_tmp.path(),
618                &clients,
619                &seq,
620            );
621            plugin.handle(ctx).await.unwrap();
622        }
623
624        // Remove item 1
625        let ctx = make_test_ctx(
626            "queue",
627            vec!["remove", "1"],
628            "alice",
629            chat_tmp.path(),
630            &clients,
631            &seq,
632        );
633        let result = plugin.handle(ctx).await.unwrap();
634        assert!(matches!(result, PluginResult::Handled));
635
636        // Verify only "second" remains
637        let queue = plugin.queue.lock().await;
638        assert_eq!(queue.len(), 1);
639        assert_eq!(queue[0].description, "second");
640        drop(queue);
641
642        // Verify persistence
643        let persisted = load_queue(&dir.path().join("test-room.queue")).unwrap();
644        assert_eq!(persisted.len(), 1);
645        assert_eq!(persisted[0].description, "second");
646    }
647
648    #[tokio::test]
649    async fn remove_out_of_range() {
650        let dir = TempDir::new().unwrap();
651        let plugin = make_test_plugin(&dir);
652        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
653        let (clients, _rx, seq) = make_test_clients();
654
655        let ctx = make_test_ctx(
656            "queue",
657            vec!["remove", "5"],
658            "alice",
659            chat_tmp.path(),
660            &clients,
661            &seq,
662        );
663        let result = plugin.handle(ctx).await.unwrap();
664        match result {
665            PluginResult::Reply(msg) => assert!(msg.contains("out of range")),
666            _ => panic!("expected Reply"),
667        }
668    }
669
670    #[tokio::test]
671    async fn remove_invalid_index() {
672        let dir = TempDir::new().unwrap();
673        let plugin = make_test_plugin(&dir);
674        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
675        let (clients, _rx, seq) = make_test_clients();
676
677        let ctx = make_test_ctx(
678            "queue",
679            vec!["remove", "abc"],
680            "alice",
681            chat_tmp.path(),
682            &clients,
683            &seq,
684        );
685        let result = plugin.handle(ctx).await.unwrap();
686        match result {
687            PluginResult::Reply(msg) => assert!(msg.contains("valid 1-based index")),
688            _ => panic!("expected Reply"),
689        }
690    }
691
692    #[tokio::test]
693    async fn remove_zero_index() {
694        let dir = TempDir::new().unwrap();
695        let plugin = make_test_plugin(&dir);
696        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
697        let (clients, _rx, seq) = make_test_clients();
698
699        let ctx = make_test_ctx(
700            "queue",
701            vec!["remove", "0"],
702            "alice",
703            chat_tmp.path(),
704            &clients,
705            &seq,
706        );
707        let result = plugin.handle(ctx).await.unwrap();
708        match result {
709            PluginResult::Reply(msg) => assert!(msg.contains("valid 1-based index")),
710            _ => panic!("expected Reply"),
711        }
712    }
713
714    #[tokio::test]
715    async fn pop_claims_first_item() {
716        let dir = TempDir::new().unwrap();
717        let queue_path = dir.path().join("test-room.queue");
718        let claim_map: ClaimMap = Arc::new(Mutex::new(HashMap::new()));
719        let plugin = QueuePlugin::new(queue_path, claim_map.clone()).unwrap();
720
721        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
722        let (clients, _rx, seq) = make_test_clients();
723
724        // Add two items
725        for desc in ["urgent fix", "nice to have"] {
726            let words: Vec<&str> = std::iter::once("add")
727                .chain(desc.split_whitespace())
728                .collect();
729            let ctx = make_test_ctx("queue", words, "ba", chat_tmp.path(), &clients, &seq);
730            plugin.handle(ctx).await.unwrap();
731        }
732
733        // Pop as alice
734        let ctx = make_test_ctx(
735            "queue",
736            vec!["pop"],
737            "alice",
738            chat_tmp.path(),
739            &clients,
740            &seq,
741        );
742        let result = plugin.handle(ctx).await.unwrap();
743        assert!(matches!(result, PluginResult::Handled));
744
745        // Verify claim was set
746        let claims = claim_map.lock().await;
747        assert_eq!(claims.get("alice").unwrap().task, "urgent fix");
748        drop(claims);
749
750        // Verify queue has only 1 item left
751        let queue = plugin.queue.lock().await;
752        assert_eq!(queue.len(), 1);
753        assert_eq!(queue[0].description, "nice to have");
754        drop(queue);
755
756        // Verify persistence
757        let persisted = load_queue(&dir.path().join("test-room.queue")).unwrap();
758        assert_eq!(persisted.len(), 1);
759    }
760
761    #[tokio::test]
762    async fn pop_empty_queue_returns_error() {
763        let dir = TempDir::new().unwrap();
764        let plugin = make_test_plugin(&dir);
765        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
766        let (clients, _rx, seq) = make_test_clients();
767
768        let ctx = make_test_ctx(
769            "queue",
770            vec!["pop"],
771            "alice",
772            chat_tmp.path(),
773            &clients,
774            &seq,
775        );
776        let result = plugin.handle(ctx).await.unwrap();
777        match result {
778            PluginResult::Reply(msg) => assert!(msg.contains("empty")),
779            _ => panic!("expected Reply"),
780        }
781    }
782
783    #[tokio::test]
784    async fn pop_replaces_existing_claim() {
785        let dir = TempDir::new().unwrap();
786        let queue_path = dir.path().join("test-room.queue");
787        let claim_map: ClaimMap = Arc::new(Mutex::new(HashMap::new()));
788
789        // Pre-existing claim
790        {
791            let mut claims = claim_map.lock().await;
792            claims.insert(
793                "alice".to_owned(),
794                ClaimEntry {
795                    task: "old task".to_owned(),
796                    claimed_at: std::time::Instant::now(),
797                },
798            );
799        }
800
801        let plugin = QueuePlugin::new(queue_path, claim_map.clone()).unwrap();
802        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
803        let (clients, _rx, seq) = make_test_clients();
804
805        // Add an item and pop it
806        let ctx = make_test_ctx(
807            "queue",
808            vec!["add", "new", "task"],
809            "ba",
810            chat_tmp.path(),
811            &clients,
812            &seq,
813        );
814        plugin.handle(ctx).await.unwrap();
815
816        let ctx = make_test_ctx(
817            "queue",
818            vec!["pop"],
819            "alice",
820            chat_tmp.path(),
821            &clients,
822            &seq,
823        );
824        plugin.handle(ctx).await.unwrap();
825
826        // Verify claim was replaced
827        let claims = claim_map.lock().await;
828        assert_eq!(claims.get("alice").unwrap().task, "new task");
829    }
830
831    #[tokio::test]
832    async fn unknown_action_returns_error() {
833        let dir = TempDir::new().unwrap();
834        let plugin = make_test_plugin(&dir);
835        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
836        let (clients, _rx, seq) = make_test_clients();
837
838        let ctx = make_test_ctx(
839            "queue",
840            vec!["foobar"],
841            "alice",
842            chat_tmp.path(),
843            &clients,
844            &seq,
845        );
846        let result = plugin.handle(ctx).await.unwrap();
847        match result {
848            PluginResult::Reply(msg) => assert!(msg.contains("unknown action")),
849            _ => panic!("expected Reply"),
850        }
851    }
852
853    #[tokio::test]
854    async fn queue_survives_reload() {
855        let dir = TempDir::new().unwrap();
856        let queue_path = dir.path().join("test-room.queue");
857        let claim_map: ClaimMap = Arc::new(Mutex::new(HashMap::new()));
858
859        // First plugin instance — add items
860        {
861            let plugin = QueuePlugin::new(queue_path.clone(), claim_map.clone()).unwrap();
862            let chat_tmp = tempfile::NamedTempFile::new().unwrap();
863            let (clients, _rx, seq) = make_test_clients();
864
865            for desc in ["task a", "task b", "task c"] {
866                let words: Vec<&str> = std::iter::once("add")
867                    .chain(desc.split_whitespace())
868                    .collect();
869                let ctx = make_test_ctx("queue", words, "alice", chat_tmp.path(), &clients, &seq);
870                plugin.handle(ctx).await.unwrap();
871            }
872        }
873
874        // Second plugin instance — simulates broker restart
875        let plugin2 = QueuePlugin::new(queue_path, claim_map).unwrap();
876        let queue = plugin2.queue.lock().await;
877        assert_eq!(queue.len(), 3);
878        assert_eq!(queue[0].description, "task a");
879        assert_eq!(queue[1].description, "task b");
880        assert_eq!(queue[2].description, "task c");
881    }
882
883    // ── Plugin trait tests ──────────────────────────────────────────────────
884
885    #[test]
886    fn plugin_name_is_queue() {
887        let dir = TempDir::new().unwrap();
888        let plugin = make_test_plugin(&dir);
889        assert_eq!(plugin.name(), "queue");
890    }
891
892    #[test]
893    fn plugin_registers_queue_command() {
894        let dir = TempDir::new().unwrap();
895        let plugin = make_test_plugin(&dir);
896        let cmds = plugin.commands();
897        assert_eq!(cmds.len(), 1);
898        assert_eq!(cmds[0].name, "queue");
899        assert_eq!(cmds[0].params.len(), 2);
900        assert!(cmds[0].params[0].required);
901        assert!(!cmds[0].params[1].required);
902    }
903}