Skip to main content

room_cli/plugin/
queue.rs

1use std::{
2    io::{BufRead, Write},
3    path::{Path, PathBuf},
4    sync::Arc,
5};
6
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use tokio::sync::Mutex;
10use uuid::Uuid;
11
12use super::{BoxFuture, CommandContext, CommandInfo, ParamSchema, ParamType, Plugin, PluginResult};
13use crate::broker::state::{ClaimEntry, ClaimMap};
14
15/// A single item in the task queue backlog.
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct QueueItem {
18    /// Unique identifier for this queue entry.
19    pub id: String,
20    /// Human-readable task description.
21    pub description: String,
22    /// Username of the agent who added this item.
23    pub added_by: String,
24    /// Timestamp when the item was added.
25    pub added_at: DateTime<Utc>,
26    /// Current status — always `"queued"` while in the backlog.
27    pub status: String,
28}
29
30/// Plugin that manages a persistent task backlog.
31///
32/// Agents can add tasks to the queue and self-assign from it using `/queue pop`,
33/// which integrates with the built-in `/claim` system.
34///
35/// Queue state is persisted to an NDJSON file alongside the room's `.chat` file.
36pub struct QueuePlugin {
37    /// In-memory queue items, protected by a mutex for concurrent access.
38    queue: Arc<Mutex<Vec<QueueItem>>>,
39    /// Path to the NDJSON persistence file (`<room-data-dir>/<room-id>.queue`).
40    queue_path: PathBuf,
41    /// Shared reference to the broker's claim map — same Arc the broker uses.
42    claim_map: ClaimMap,
43}
44
45impl QueuePlugin {
46    /// Create a new `QueuePlugin`, loading any existing queue from disk.
47    ///
48    /// # Arguments
49    /// * `queue_path` — path to the `.queue` NDJSON file
50    /// * `claim_map` — the broker's `ClaimMap` Arc (not a data clone)
51    pub(crate) fn new(queue_path: PathBuf, claim_map: ClaimMap) -> anyhow::Result<Self> {
52        let items = load_queue(&queue_path)?;
53        Ok(Self {
54            queue: Arc::new(Mutex::new(items)),
55            queue_path,
56            claim_map,
57        })
58    }
59
60    /// Derive the `.queue` file path from a `.chat` file path.
61    pub fn queue_path_from_chat(chat_path: &Path) -> PathBuf {
62        chat_path.with_extension("queue")
63    }
64
65    /// Returns the command info for the TUI command palette without needing
66    /// an instantiated plugin. Used by `all_known_commands()`.
67    pub fn default_commands() -> Vec<CommandInfo> {
68        vec![CommandInfo {
69            name: "queue".to_owned(),
70            description: "Manage the task backlog".to_owned(),
71            usage: "/queue <add|list|remove|pop> [args]".to_owned(),
72            params: vec![
73                ParamSchema {
74                    name: "action".to_owned(),
75                    param_type: ParamType::Choice(vec![
76                        "add".to_owned(),
77                        "list".to_owned(),
78                        "remove".to_owned(),
79                        "pop".to_owned(),
80                    ]),
81                    required: true,
82                    description: "Action to perform".to_owned(),
83                },
84                ParamSchema {
85                    name: "args".to_owned(),
86                    param_type: ParamType::Text,
87                    required: false,
88                    description: "Task description (add) or index (remove)".to_owned(),
89                },
90            ],
91        }]
92    }
93}
94
95impl Plugin for QueuePlugin {
96    fn name(&self) -> &str {
97        "queue"
98    }
99
100    fn commands(&self) -> Vec<CommandInfo> {
101        Self::default_commands()
102    }
103
104    fn handle(&self, ctx: CommandContext) -> BoxFuture<'_, anyhow::Result<PluginResult>> {
105        Box::pin(async move {
106            let action = ctx.params.first().map(String::as_str).unwrap_or("");
107            let rest: Vec<&str> = ctx.params.iter().skip(1).map(String::as_str).collect();
108
109            match action {
110                "add" => self.handle_add(&ctx.sender, &rest).await,
111                "list" => self.handle_list(&ctx).await,
112                "remove" => self.handle_remove(&rest).await,
113                "pop" => self.handle_pop(&ctx.sender).await,
114                _ => Ok(PluginResult::Reply(format!(
115                    "queue: unknown action '{action}'. use add, list, remove, or pop"
116                ))),
117            }
118        })
119    }
120}
121
122impl QueuePlugin {
123    async fn handle_add(&self, sender: &str, rest: &[&str]) -> anyhow::Result<PluginResult> {
124        if rest.is_empty() {
125            return Ok(PluginResult::Reply(
126                "queue add: missing task description".to_owned(),
127            ));
128        }
129
130        let description = rest.join(" ");
131        let item = QueueItem {
132            id: Uuid::new_v4().to_string(),
133            description: description.clone(),
134            added_by: sender.to_owned(),
135            added_at: Utc::now(),
136            status: "queued".to_owned(),
137        };
138
139        {
140            let mut queue = self.queue.lock().await;
141            queue.push(item.clone());
142        }
143
144        append_item(&self.queue_path, &item)?;
145        let count = self.queue.lock().await.len();
146
147        Ok(PluginResult::Broadcast(format!(
148            "queue: {sender} added \"{description}\" (#{count} in backlog)"
149        )))
150    }
151
152    async fn handle_list(&self, _ctx: &CommandContext) -> anyhow::Result<PluginResult> {
153        let queue = self.queue.lock().await;
154
155        if queue.is_empty() {
156            return Ok(PluginResult::Reply("queue: backlog is empty".to_owned()));
157        }
158
159        let mut lines = Vec::with_capacity(queue.len() + 1);
160        lines.push(format!("queue: {} item(s) in backlog", queue.len()));
161        for (i, item) in queue.iter().enumerate() {
162            lines.push(format!(
163                "  {}. {} (added by {} at {})",
164                i + 1,
165                item.description,
166                item.added_by,
167                item.added_at.format("%Y-%m-%d %H:%M UTC")
168            ));
169        }
170
171        Ok(PluginResult::Reply(lines.join("\n")))
172    }
173
174    async fn handle_remove(&self, rest: &[&str]) -> anyhow::Result<PluginResult> {
175        let index_str = rest.first().copied().unwrap_or("");
176        let index: usize = match index_str.parse::<usize>() {
177            Ok(n) if n >= 1 => n,
178            _ => {
179                return Ok(PluginResult::Reply(
180                    "queue remove: provide a valid 1-based index".to_owned(),
181                ));
182            }
183        };
184
185        let removed = {
186            let mut queue = self.queue.lock().await;
187            if index > queue.len() {
188                return Ok(PluginResult::Reply(format!(
189                    "queue remove: index {index} out of range (queue has {} item(s))",
190                    queue.len()
191                )));
192            }
193            queue.remove(index - 1)
194        };
195
196        self.rewrite_queue().await?;
197
198        Ok(PluginResult::Broadcast(format!(
199            "queue: removed \"{}\" (was #{index})",
200            removed.description
201        )))
202    }
203
204    async fn handle_pop(&self, sender: &str) -> anyhow::Result<PluginResult> {
205        let popped = {
206            let mut queue = self.queue.lock().await;
207            if queue.is_empty() {
208                return Ok(PluginResult::Reply(
209                    "queue pop: backlog is empty, nothing to claim".to_owned(),
210                ));
211            }
212            queue.remove(0)
213        };
214
215        self.rewrite_queue().await?;
216
217        // Integrate with /claim — set the claim on the invoker
218        {
219            let mut claims = self.claim_map.lock().await;
220            claims.insert(
221                sender.to_owned(),
222                ClaimEntry {
223                    task: popped.description.clone(),
224                    claimed_at: std::time::Instant::now(),
225                },
226            );
227        }
228
229        Ok(PluginResult::Broadcast(format!(
230            "{sender} claimed from queue: \"{}\"",
231            popped.description
232        )))
233    }
234
235    /// Rewrite the entire queue file from the in-memory state.
236    async fn rewrite_queue(&self) -> anyhow::Result<()> {
237        let queue = self.queue.lock().await;
238        rewrite_queue_file(&self.queue_path, &queue)
239    }
240}
241
242// ── Persistence helpers ─────────────────────────────────────────────────────
243
244/// Load queue items from an NDJSON file. Returns an empty vec if the file
245/// does not exist.
246fn load_queue(path: &Path) -> anyhow::Result<Vec<QueueItem>> {
247    if !path.exists() {
248        return Ok(Vec::new());
249    }
250
251    let file = std::fs::File::open(path)?;
252    let reader = std::io::BufReader::new(file);
253    let mut items = Vec::new();
254
255    for line in reader.lines() {
256        let line = line?;
257        let trimmed = line.trim();
258        if trimmed.is_empty() {
259            continue;
260        }
261        match serde_json::from_str::<QueueItem>(trimmed) {
262            Ok(item) => items.push(item),
263            Err(e) => {
264                eprintln!("queue: skipping malformed line in {}: {e}", path.display());
265            }
266        }
267    }
268
269    Ok(items)
270}
271
272/// Append a single item to the NDJSON queue file.
273fn append_item(path: &Path, item: &QueueItem) -> anyhow::Result<()> {
274    let mut file = std::fs::OpenOptions::new()
275        .create(true)
276        .append(true)
277        .open(path)?;
278    let line = serde_json::to_string(item)?;
279    writeln!(file, "{line}")?;
280    Ok(())
281}
282
283/// Rewrite the entire queue file from a slice of items.
284fn rewrite_queue_file(path: &Path, items: &[QueueItem]) -> anyhow::Result<()> {
285    let mut file = std::fs::File::create(path)?;
286    for item in items {
287        let line = serde_json::to_string(item)?;
288        writeln!(file, "{line}")?;
289    }
290    Ok(())
291}
292
293// ── Tests ───────────────────────────────────────────────────────────────────
294
295#[cfg(test)]
296mod tests {
297    use super::*;
298    use crate::plugin::{ChatWriter, HistoryReader, RoomMetadata, UserInfo};
299    use std::collections::HashMap;
300    use std::sync::atomic::AtomicU64;
301    use tempfile::TempDir;
302    use tokio::sync::broadcast;
303
304    /// Helper: create a QueuePlugin with a temp directory and empty claim map.
305    fn make_test_plugin(dir: &TempDir) -> QueuePlugin {
306        let queue_path = dir.path().join("test-room.queue");
307        let claim_map: ClaimMap = Arc::new(Mutex::new(HashMap::new()));
308        QueuePlugin::new(queue_path, claim_map).unwrap()
309    }
310
311    /// Helper: create a CommandContext wired to a test plugin.
312    fn make_test_ctx(
313        command: &str,
314        params: Vec<&str>,
315        sender: &str,
316        chat_path: &Path,
317        clients: &Arc<Mutex<HashMap<u64, (String, broadcast::Sender<String>)>>>,
318        seq: &Arc<AtomicU64>,
319    ) -> CommandContext {
320        let chat_arc = Arc::new(chat_path.to_path_buf());
321        let room_id = Arc::new("test-room".to_owned());
322
323        CommandContext {
324            command: command.to_owned(),
325            params: params.into_iter().map(|s| s.to_owned()).collect(),
326            sender: sender.to_owned(),
327            room_id: "test-room".to_owned(),
328            message_id: Uuid::new_v4().to_string(),
329            timestamp: Utc::now(),
330            history: HistoryReader::new(chat_path, sender),
331            writer: ChatWriter::new(clients, &chat_arc, &room_id, seq, "queue"),
332            metadata: RoomMetadata {
333                online_users: vec![UserInfo {
334                    username: sender.to_owned(),
335                    status: String::new(),
336                }],
337                host: None,
338                message_count: 0,
339            },
340            available_commands: vec![],
341        }
342    }
343
344    /// Helper: set up broadcast channel and client map for tests.
345    fn make_test_clients() -> (
346        Arc<Mutex<HashMap<u64, (String, broadcast::Sender<String>)>>>,
347        broadcast::Receiver<String>,
348        Arc<AtomicU64>,
349    ) {
350        let (tx, rx) = broadcast::channel::<String>(64);
351        let mut map = HashMap::new();
352        map.insert(1u64, ("alice".to_owned(), tx));
353        let clients = Arc::new(Mutex::new(map));
354        let seq = Arc::new(AtomicU64::new(0));
355        (clients, rx, seq)
356    }
357
358    // ── load/save persistence tests ─────────────────────────────────────────
359
360    #[test]
361    fn load_queue_nonexistent_file_returns_empty() {
362        let items = load_queue(Path::new("/nonexistent/path.queue")).unwrap();
363        assert!(items.is_empty());
364    }
365
366    #[test]
367    fn append_and_load_round_trips() {
368        let dir = TempDir::new().unwrap();
369        let path = dir.path().join("test.queue");
370
371        let item = QueueItem {
372            id: "id-1".to_owned(),
373            description: "fix the bug".to_owned(),
374            added_by: "alice".to_owned(),
375            added_at: Utc::now(),
376            status: "queued".to_owned(),
377        };
378
379        append_item(&path, &item).unwrap();
380        let loaded = load_queue(&path).unwrap();
381        assert_eq!(loaded.len(), 1);
382        assert_eq!(loaded[0].id, "id-1");
383        assert_eq!(loaded[0].description, "fix the bug");
384        assert_eq!(loaded[0].added_by, "alice");
385        assert_eq!(loaded[0].status, "queued");
386    }
387
388    #[test]
389    fn rewrite_replaces_file_contents() {
390        let dir = TempDir::new().unwrap();
391        let path = dir.path().join("test.queue");
392
393        // Write 3 items
394        let items: Vec<QueueItem> = (0..3)
395            .map(|i| QueueItem {
396                id: format!("id-{i}"),
397                description: format!("task {i}"),
398                added_by: "bob".to_owned(),
399                added_at: Utc::now(),
400                status: "queued".to_owned(),
401            })
402            .collect();
403
404        for item in &items {
405            append_item(&path, item).unwrap();
406        }
407        assert_eq!(load_queue(&path).unwrap().len(), 3);
408
409        // Rewrite with only 1 item
410        rewrite_queue_file(&path, &items[1..2]).unwrap();
411        let reloaded = load_queue(&path).unwrap();
412        assert_eq!(reloaded.len(), 1);
413        assert_eq!(reloaded[0].id, "id-1");
414    }
415
416    #[test]
417    fn load_skips_malformed_lines() {
418        let dir = TempDir::new().unwrap();
419        let path = dir.path().join("test.queue");
420
421        let good = QueueItem {
422            id: "good".to_owned(),
423            description: "valid".to_owned(),
424            added_by: "alice".to_owned(),
425            added_at: Utc::now(),
426            status: "queued".to_owned(),
427        };
428
429        let mut file = std::fs::File::create(&path).unwrap();
430        writeln!(file, "{}", serde_json::to_string(&good).unwrap()).unwrap();
431        writeln!(file, "not valid json").unwrap();
432        writeln!(file, "{}", serde_json::to_string(&good).unwrap()).unwrap();
433        writeln!(file).unwrap(); // blank line
434
435        let loaded = load_queue(&path).unwrap();
436        assert_eq!(loaded.len(), 2);
437    }
438
439    #[test]
440    fn queue_path_from_chat_replaces_extension() {
441        let chat = PathBuf::from("/data/room-dev.chat");
442        let queue = QueuePlugin::queue_path_from_chat(&chat);
443        assert_eq!(queue, PathBuf::from("/data/room-dev.queue"));
444    }
445
446    // ── plugin construction tests ───────────────────────────────────────────
447
448    #[test]
449    fn new_plugin_loads_existing_queue() {
450        let dir = TempDir::new().unwrap();
451        let path = dir.path().join("test.queue");
452
453        let item = QueueItem {
454            id: "pre-existing".to_owned(),
455            description: "already there".to_owned(),
456            added_by: "ba".to_owned(),
457            added_at: Utc::now(),
458            status: "queued".to_owned(),
459        };
460        append_item(&path, &item).unwrap();
461
462        let claim_map: ClaimMap = Arc::new(Mutex::new(HashMap::new()));
463        let plugin = QueuePlugin::new(path, claim_map).unwrap();
464
465        let rt = tokio::runtime::Runtime::new().unwrap();
466        rt.block_on(async {
467            let queue = plugin.queue.lock().await;
468            assert_eq!(queue.len(), 1);
469            assert_eq!(queue[0].description, "already there");
470        });
471    }
472
473    // ── handle tests ────────────────────────────────────────────────────────
474
475    #[tokio::test]
476    async fn add_appends_to_queue_and_persists() {
477        let dir = TempDir::new().unwrap();
478        let plugin = make_test_plugin(&dir);
479        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
480        let (clients, _rx, seq) = make_test_clients();
481        let ctx = make_test_ctx(
482            "queue",
483            vec!["add", "fix", "the", "bug"],
484            "alice",
485            chat_tmp.path(),
486            &clients,
487            &seq,
488        );
489
490        let result = plugin.handle(ctx).await.unwrap();
491        match &result {
492            PluginResult::Broadcast(msg) => {
493                assert!(msg.contains("fix the bug"), "broadcast should mention task");
494                assert!(msg.contains("alice"), "broadcast should mention sender");
495                assert!(msg.contains("#1"), "broadcast should show queue position");
496            }
497            _ => panic!("expected Broadcast"),
498        }
499
500        // In-memory
501        let queue = plugin.queue.lock().await;
502        assert_eq!(queue.len(), 1);
503        assert_eq!(queue[0].description, "fix the bug");
504        assert_eq!(queue[0].added_by, "alice");
505        drop(queue);
506
507        // On disk
508        let persisted = load_queue(&dir.path().join("test-room.queue")).unwrap();
509        assert_eq!(persisted.len(), 1);
510        assert_eq!(persisted[0].description, "fix the bug");
511    }
512
513    #[tokio::test]
514    async fn add_without_description_returns_error() {
515        let dir = TempDir::new().unwrap();
516        let plugin = make_test_plugin(&dir);
517        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
518        let (clients, _rx, seq) = make_test_clients();
519        let ctx = make_test_ctx(
520            "queue",
521            vec!["add"],
522            "alice",
523            chat_tmp.path(),
524            &clients,
525            &seq,
526        );
527
528        let result = plugin.handle(ctx).await.unwrap();
529        match result {
530            PluginResult::Reply(msg) => assert!(msg.contains("missing task description")),
531            _ => panic!("expected Reply"),
532        }
533    }
534
535    #[tokio::test]
536    async fn list_empty_queue() {
537        let dir = TempDir::new().unwrap();
538        let plugin = make_test_plugin(&dir);
539        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
540        let (clients, _rx, seq) = make_test_clients();
541        let ctx = make_test_ctx(
542            "queue",
543            vec!["list"],
544            "alice",
545            chat_tmp.path(),
546            &clients,
547            &seq,
548        );
549
550        let result = plugin.handle(ctx).await.unwrap();
551        match result {
552            PluginResult::Reply(msg) => assert!(msg.contains("empty")),
553            _ => panic!("expected Reply"),
554        }
555    }
556
557    #[tokio::test]
558    async fn list_shows_indexed_items() {
559        let dir = TempDir::new().unwrap();
560        let plugin = make_test_plugin(&dir);
561        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
562        let (clients, _rx, seq) = make_test_clients();
563
564        // Add two items
565        for desc in ["first task", "second task"] {
566            let words: Vec<&str> = std::iter::once("add")
567                .chain(desc.split_whitespace())
568                .collect();
569            let ctx = make_test_ctx("queue", words, "alice", chat_tmp.path(), &clients, &seq);
570            plugin.handle(ctx).await.unwrap();
571        }
572
573        let ctx = make_test_ctx(
574            "queue",
575            vec!["list"],
576            "alice",
577            chat_tmp.path(),
578            &clients,
579            &seq,
580        );
581        let result = plugin.handle(ctx).await.unwrap();
582        match result {
583            PluginResult::Reply(msg) => {
584                assert!(msg.contains("2 item(s)"));
585                assert!(msg.contains("1. first task"));
586                assert!(msg.contains("2. second task"));
587            }
588            _ => panic!("expected Reply"),
589        }
590    }
591
592    #[tokio::test]
593    async fn remove_by_index() {
594        let dir = TempDir::new().unwrap();
595        let plugin = make_test_plugin(&dir);
596        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
597        let (clients, _rx, seq) = make_test_clients();
598
599        // Add two items
600        for desc in ["first", "second"] {
601            let ctx = make_test_ctx(
602                "queue",
603                vec!["add", desc],
604                "alice",
605                chat_tmp.path(),
606                &clients,
607                &seq,
608            );
609            plugin.handle(ctx).await.unwrap();
610        }
611
612        // Remove item 1
613        let ctx = make_test_ctx(
614            "queue",
615            vec!["remove", "1"],
616            "alice",
617            chat_tmp.path(),
618            &clients,
619            &seq,
620        );
621        let result = plugin.handle(ctx).await.unwrap();
622        match &result {
623            PluginResult::Broadcast(msg) => {
624                assert!(
625                    msg.contains("first"),
626                    "broadcast should mention removed item"
627                );
628                assert!(msg.contains("#1"), "broadcast should mention index");
629            }
630            _ => panic!("expected Broadcast"),
631        }
632
633        // Verify only "second" remains
634        let queue = plugin.queue.lock().await;
635        assert_eq!(queue.len(), 1);
636        assert_eq!(queue[0].description, "second");
637        drop(queue);
638
639        // Verify persistence
640        let persisted = load_queue(&dir.path().join("test-room.queue")).unwrap();
641        assert_eq!(persisted.len(), 1);
642        assert_eq!(persisted[0].description, "second");
643    }
644
645    #[tokio::test]
646    async fn remove_out_of_range() {
647        let dir = TempDir::new().unwrap();
648        let plugin = make_test_plugin(&dir);
649        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
650        let (clients, _rx, seq) = make_test_clients();
651
652        let ctx = make_test_ctx(
653            "queue",
654            vec!["remove", "5"],
655            "alice",
656            chat_tmp.path(),
657            &clients,
658            &seq,
659        );
660        let result = plugin.handle(ctx).await.unwrap();
661        match result {
662            PluginResult::Reply(msg) => assert!(msg.contains("out of range")),
663            _ => panic!("expected Reply"),
664        }
665    }
666
667    #[tokio::test]
668    async fn remove_invalid_index() {
669        let dir = TempDir::new().unwrap();
670        let plugin = make_test_plugin(&dir);
671        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
672        let (clients, _rx, seq) = make_test_clients();
673
674        let ctx = make_test_ctx(
675            "queue",
676            vec!["remove", "abc"],
677            "alice",
678            chat_tmp.path(),
679            &clients,
680            &seq,
681        );
682        let result = plugin.handle(ctx).await.unwrap();
683        match result {
684            PluginResult::Reply(msg) => assert!(msg.contains("valid 1-based index")),
685            _ => panic!("expected Reply"),
686        }
687    }
688
689    #[tokio::test]
690    async fn remove_zero_index() {
691        let dir = TempDir::new().unwrap();
692        let plugin = make_test_plugin(&dir);
693        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
694        let (clients, _rx, seq) = make_test_clients();
695
696        let ctx = make_test_ctx(
697            "queue",
698            vec!["remove", "0"],
699            "alice",
700            chat_tmp.path(),
701            &clients,
702            &seq,
703        );
704        let result = plugin.handle(ctx).await.unwrap();
705        match result {
706            PluginResult::Reply(msg) => assert!(msg.contains("valid 1-based index")),
707            _ => panic!("expected Reply"),
708        }
709    }
710
711    #[tokio::test]
712    async fn pop_claims_first_item() {
713        let dir = TempDir::new().unwrap();
714        let queue_path = dir.path().join("test-room.queue");
715        let claim_map: ClaimMap = Arc::new(Mutex::new(HashMap::new()));
716        let plugin = QueuePlugin::new(queue_path, claim_map.clone()).unwrap();
717
718        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
719        let (clients, _rx, seq) = make_test_clients();
720
721        // Add two items
722        for desc in ["urgent fix", "nice to have"] {
723            let words: Vec<&str> = std::iter::once("add")
724                .chain(desc.split_whitespace())
725                .collect();
726            let ctx = make_test_ctx("queue", words, "ba", chat_tmp.path(), &clients, &seq);
727            plugin.handle(ctx).await.unwrap();
728        }
729
730        // Pop as alice
731        let ctx = make_test_ctx(
732            "queue",
733            vec!["pop"],
734            "alice",
735            chat_tmp.path(),
736            &clients,
737            &seq,
738        );
739        let result = plugin.handle(ctx).await.unwrap();
740        match &result {
741            PluginResult::Broadcast(msg) => {
742                assert!(msg.contains("alice"), "broadcast should mention claimer");
743                assert!(msg.contains("urgent fix"), "broadcast should mention task");
744            }
745            _ => panic!("expected Broadcast"),
746        }
747
748        // Verify claim was set
749        let claims = claim_map.lock().await;
750        assert_eq!(claims.get("alice").unwrap().task, "urgent fix");
751        drop(claims);
752
753        // Verify queue has only 1 item left
754        let queue = plugin.queue.lock().await;
755        assert_eq!(queue.len(), 1);
756        assert_eq!(queue[0].description, "nice to have");
757        drop(queue);
758
759        // Verify persistence
760        let persisted = load_queue(&dir.path().join("test-room.queue")).unwrap();
761        assert_eq!(persisted.len(), 1);
762    }
763
764    #[tokio::test]
765    async fn pop_empty_queue_returns_error() {
766        let dir = TempDir::new().unwrap();
767        let plugin = make_test_plugin(&dir);
768        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
769        let (clients, _rx, seq) = make_test_clients();
770
771        let ctx = make_test_ctx(
772            "queue",
773            vec!["pop"],
774            "alice",
775            chat_tmp.path(),
776            &clients,
777            &seq,
778        );
779        let result = plugin.handle(ctx).await.unwrap();
780        match result {
781            PluginResult::Reply(msg) => assert!(msg.contains("empty")),
782            _ => panic!("expected Reply"),
783        }
784    }
785
786    #[tokio::test]
787    async fn pop_replaces_existing_claim() {
788        let dir = TempDir::new().unwrap();
789        let queue_path = dir.path().join("test-room.queue");
790        let claim_map: ClaimMap = Arc::new(Mutex::new(HashMap::new()));
791
792        // Pre-existing claim
793        {
794            let mut claims = claim_map.lock().await;
795            claims.insert(
796                "alice".to_owned(),
797                ClaimEntry {
798                    task: "old task".to_owned(),
799                    claimed_at: std::time::Instant::now(),
800                },
801            );
802        }
803
804        let plugin = QueuePlugin::new(queue_path, claim_map.clone()).unwrap();
805        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
806        let (clients, _rx, seq) = make_test_clients();
807
808        // Add an item and pop it
809        let ctx = make_test_ctx(
810            "queue",
811            vec!["add", "new", "task"],
812            "ba",
813            chat_tmp.path(),
814            &clients,
815            &seq,
816        );
817        plugin.handle(ctx).await.unwrap();
818
819        let ctx = make_test_ctx(
820            "queue",
821            vec!["pop"],
822            "alice",
823            chat_tmp.path(),
824            &clients,
825            &seq,
826        );
827        plugin.handle(ctx).await.unwrap();
828
829        // Verify claim was replaced
830        let claims = claim_map.lock().await;
831        assert_eq!(claims.get("alice").unwrap().task, "new task");
832    }
833
834    #[tokio::test]
835    async fn unknown_action_returns_error() {
836        let dir = TempDir::new().unwrap();
837        let plugin = make_test_plugin(&dir);
838        let chat_tmp = tempfile::NamedTempFile::new().unwrap();
839        let (clients, _rx, seq) = make_test_clients();
840
841        let ctx = make_test_ctx(
842            "queue",
843            vec!["foobar"],
844            "alice",
845            chat_tmp.path(),
846            &clients,
847            &seq,
848        );
849        let result = plugin.handle(ctx).await.unwrap();
850        match result {
851            PluginResult::Reply(msg) => assert!(msg.contains("unknown action")),
852            _ => panic!("expected Reply"),
853        }
854    }
855
856    #[tokio::test]
857    async fn queue_survives_reload() {
858        let dir = TempDir::new().unwrap();
859        let queue_path = dir.path().join("test-room.queue");
860        let claim_map: ClaimMap = Arc::new(Mutex::new(HashMap::new()));
861
862        // First plugin instance — add items
863        {
864            let plugin = QueuePlugin::new(queue_path.clone(), claim_map.clone()).unwrap();
865            let chat_tmp = tempfile::NamedTempFile::new().unwrap();
866            let (clients, _rx, seq) = make_test_clients();
867
868            for desc in ["task a", "task b", "task c"] {
869                let words: Vec<&str> = std::iter::once("add")
870                    .chain(desc.split_whitespace())
871                    .collect();
872                let ctx = make_test_ctx("queue", words, "alice", chat_tmp.path(), &clients, &seq);
873                plugin.handle(ctx).await.unwrap();
874            }
875        }
876
877        // Second plugin instance — simulates broker restart
878        let plugin2 = QueuePlugin::new(queue_path, claim_map).unwrap();
879        let queue = plugin2.queue.lock().await;
880        assert_eq!(queue.len(), 3);
881        assert_eq!(queue[0].description, "task a");
882        assert_eq!(queue[1].description, "task b");
883        assert_eq!(queue[2].description, "task c");
884    }
885
886    // ── Plugin trait tests ──────────────────────────────────────────────────
887
888    #[test]
889    fn plugin_name_is_queue() {
890        let dir = TempDir::new().unwrap();
891        let plugin = make_test_plugin(&dir);
892        assert_eq!(plugin.name(), "queue");
893    }
894
895    #[test]
896    fn plugin_registers_queue_command() {
897        let dir = TempDir::new().unwrap();
898        let plugin = make_test_plugin(&dir);
899        let cmds = plugin.commands();
900        assert_eq!(cmds.len(), 1);
901        assert_eq!(cmds[0].name, "queue");
902        assert_eq!(cmds[0].params.len(), 2);
903        assert!(cmds[0].params[0].required);
904        assert!(!cmds[0].params[1].required);
905    }
906}