aidaemon 0.11.4

A personal AI agent that runs as a background daemon, accessible via Telegram, Slack, or Discord, with tool use, MCP integration, and persistent memory
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};

use chrono::{DateTime, Utc};
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;

/// Window in seconds during which identical messages to the same session
/// are treated as duplicates, even after the original was popped from the queue.
const DEDUP_WINDOW_SECS: i64 = 120;

/// A task still marked `Running` after this many seconds is treated as leaked:
/// its channel handler never finalized it (an abnormal turn exit, a dropped
/// future, or a panic), so without this it would block the session's message
/// queue forever — every new message stranded behind a phantom "running" task.
/// Set far above any legitimate task wall-clock (agent turns and `computer_use`
/// GUI flows are minutes, not hours), so this only ever reaps genuine leaks.
const MAX_RUNNING_TASK_AGE_SECS: i64 = 3600;

/// A queued message waiting to be processed.
#[derive(Clone, Debug)]
pub struct QueuedMessage {
    pub text: String,
    #[allow(dead_code)]
    pub queued_at: DateTime<Utc>,
}

/// Outcome of an atomic check-and-queue attempt.
#[derive(Debug)]
pub enum QueueOutcome {
    /// Message queued at this 1-based position.
    Queued(usize),
    /// Identical message recently seen — silently dropped.
    Duplicate,
    /// No running task for the session: queueing would strand the message
    /// (nothing will drain it). The caller must process it directly.
    NoRunningTask,
}

#[derive(Clone, Debug)]
pub enum TaskStatus {
    Running,
    Completed,
    Failed(String),
    Cancelled,
}

impl std::fmt::Display for TaskStatus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TaskStatus::Running => write!(f, "Running"),
            TaskStatus::Completed => write!(f, "Completed"),
            TaskStatus::Failed(e) => write!(f, "Failed: {}", e),
            TaskStatus::Cancelled => write!(f, "Cancelled"),
        }
    }
}

#[derive(Clone, Debug)]
pub struct TaskEntry {
    pub id: u64,
    pub session_id: String,
    pub description: String,
    pub status: TaskStatus,
    pub started_at: DateTime<Utc>,
    pub finished_at: Option<DateTime<Utc>>,
}

struct TaskHandle {
    entry: TaskEntry,
    cancel_token: CancellationToken,
    /// Optional token to cancel the channel's typing indicator for this task.
    /// Set by the channel after task registration; cancelled alongside the task.
    typing_cancel: Option<CancellationToken>,
}

pub struct TaskRegistry {
    tasks: RwLock<HashMap<u64, TaskHandle>>,
    next_id: AtomicU64,
    max_completed: usize,
    /// Message queues per session - messages wait here when a task is running.
    queues: RwLock<HashMap<String, VecDeque<QueuedMessage>>>,
    /// Recently seen message fingerprints per session for deduplication.
    /// Key: (session_id, text_hash), Value: timestamp when first seen.
    /// This survives message pops so webhook retries are caught.
    recently_seen: RwLock<HashMap<(String, u64), DateTime<Utc>>>,
}

impl TaskRegistry {
    pub fn new(max_completed: usize) -> Self {
        Self {
            tasks: RwLock::new(HashMap::new()),
            next_id: AtomicU64::new(1),
            max_completed,
            queues: RwLock::new(HashMap::new()),
            recently_seen: RwLock::new(HashMap::new()),
        }
    }

    /// Simple hash for dedup fingerprinting.
    fn text_hash(text: &str) -> u64 {
        use std::hash::{Hash, Hasher};
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        text.hash(&mut hasher);
        hasher.finish()
    }

    /// Register a new task. Returns the task ID and a cancellation token.
    pub async fn register(&self, session_id: &str, description: &str) -> (u64, CancellationToken) {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let cancel_token = CancellationToken::new();
        let handle = TaskHandle {
            entry: TaskEntry {
                id,
                session_id: session_id.to_string(),
                description: description.to_string(),
                status: TaskStatus::Running,
                started_at: Utc::now(),
                finished_at: None,
            },
            cancel_token: cancel_token.clone(),
            typing_cancel: None,
        };
        let mut tasks = self.tasks.write().await;
        tasks.insert(id, handle);
        (id, cancel_token)
    }

    /// Mark a task as completed.
    /// Channel loops now use [`Self::finalize_and_drain`]; retained for
    /// callers that finalize without a session queue (and for tests).
    #[allow(dead_code)]
    pub async fn complete(&self, task_id: u64) {
        let mut tasks = self.tasks.write().await;
        if let Some(handle) = tasks.get_mut(&task_id) {
            handle.entry.status = TaskStatus::Completed;
            handle.entry.finished_at = Some(Utc::now());
        }
        Self::cleanup_locked(&mut tasks, self.max_completed);
    }

    /// Mark a task as failed.
    pub async fn fail(&self, task_id: u64, error: &str) {
        let mut tasks = self.tasks.write().await;
        if let Some(handle) = tasks.get_mut(&task_id) {
            handle.entry.status = TaskStatus::Failed(error.to_string());
            handle.entry.finished_at = Some(Utc::now());
        }
        Self::cleanup_locked(&mut tasks, self.max_completed);
    }

    /// Associate a typing indicator cancel token with a running task.
    /// When the task is cancelled, the typing indicator will also be stopped.
    pub async fn set_typing_cancel(&self, task_id: u64, token: CancellationToken) {
        let mut tasks = self.tasks.write().await;
        if let Some(handle) = tasks.get_mut(&task_id) {
            handle.typing_cancel = Some(token);
        }
    }

    /// Cancel a running task. Returns true if the task was found and cancelled.
    pub async fn cancel(&self, task_id: u64) -> bool {
        let mut tasks = self.tasks.write().await;
        if let Some(handle) = tasks.get_mut(&task_id) {
            if matches!(handle.entry.status, TaskStatus::Running) {
                handle.cancel_token.cancel();
                if let Some(ref typing) = handle.typing_cancel {
                    typing.cancel();
                }
                handle.entry.status = TaskStatus::Cancelled;
                handle.entry.finished_at = Some(Utc::now());
                return true;
            }
        }
        false
    }

    /// Cancel all running tasks for a session. Returns cancelled task (ID, description) pairs.
    pub async fn cancel_running_for_session(&self, session_id: &str) -> Vec<(u64, String)> {
        let mut tasks = self.tasks.write().await;
        let mut cancelled = Vec::new();
        for (id, handle) in tasks.iter_mut() {
            if handle.entry.session_id == session_id
                && matches!(handle.entry.status, TaskStatus::Running)
            {
                handle.cancel_token.cancel();
                if let Some(ref typing) = handle.typing_cancel {
                    typing.cancel();
                }
                handle.entry.status = TaskStatus::Cancelled;
                handle.entry.finished_at = Some(Utc::now());
                cancelled.push((*id, handle.entry.description.clone()));
            }
        }
        cancelled
    }

    /// List all tasks for a given session, sorted by ID.
    pub async fn list_for_session(&self, session_id: &str) -> Vec<TaskEntry> {
        let tasks = self.tasks.read().await;
        let mut entries: Vec<TaskEntry> = tasks
            .values()
            .filter(|h| h.entry.session_id == session_id)
            .map(|h| h.entry.clone())
            .collect();
        entries.sort_by_key(|e| e.id);
        entries
    }

    /// Mark leaked `Running` tasks (older than [`MAX_RUNNING_TASK_AGE_SECS`]) as
    /// `Failed` so they stop blocking the session queue and surface as resolved
    /// in `/tasks`. Cancels each reaped task's token to nudge any still-alive
    /// hung future to exit. Returns the reaped task ids for logging.
    fn reap_stale_running_locked(tasks: &mut HashMap<u64, TaskHandle>) -> Vec<u64> {
        let now = Utc::now();
        let mut reaped = Vec::new();
        for handle in tasks.values_mut() {
            let stale = matches!(handle.entry.status, TaskStatus::Running)
                && (now - handle.entry.started_at).num_seconds() > MAX_RUNNING_TASK_AGE_SECS;
            if stale {
                handle.cancel_token.cancel();
                if let Some(ref typing) = handle.typing_cancel {
                    typing.cancel();
                }
                handle.entry.status =
                    TaskStatus::Failed("stale: task was never finalized".to_string());
                handle.entry.finished_at = Some(now);
                reaped.push(handle.entry.id);
            }
        }
        reaped
    }

    /// Remove oldest finished tasks when count exceeds max_completed.
    fn cleanup_locked(tasks: &mut HashMap<u64, TaskHandle>, max_completed: usize) {
        let mut finished: Vec<u64> = tasks
            .iter()
            .filter(|(_, h)| !matches!(h.entry.status, TaskStatus::Running))
            .map(|(&id, _)| id)
            .collect();

        if finished.len() <= max_completed {
            return;
        }

        // Sort ascending by ID (oldest first) and remove excess
        finished.sort();
        let to_remove = finished.len() - max_completed;
        for &id in finished.iter().take(to_remove) {
            tasks.remove(&id);
        }
    }

    /// Check if a session has a running task.
    ///
    /// Reaps leaked (stale) running tasks first, so a phantom task that was
    /// never finalized can't make this return `true` forever and block the
    /// session's queue.
    pub async fn has_running_task(&self, session_id: &str) -> bool {
        let mut tasks = self.tasks.write().await;
        let reaped = Self::reap_stale_running_locked(&mut tasks);
        if !reaped.is_empty() {
            tracing::warn!(
                ?reaped,
                session_id,
                "Reaped stale running task(s) that were never finalized"
            );
        }
        tasks.values().any(|h| {
            h.entry.session_id == session_id && matches!(h.entry.status, TaskStatus::Running)
        })
    }

    /// Get the description of the currently running task for a session (if any).
    pub async fn get_running_task_description(&self, session_id: &str) -> Option<String> {
        let tasks = self.tasks.read().await;
        tasks
            .values()
            .find(|h| {
                h.entry.session_id == session_id && matches!(h.entry.status, TaskStatus::Running)
            })
            .map(|h| h.entry.description.clone())
    }

    /// Atomically mark a message as "seen" for dedup purposes.
    /// Returns `true` if this is the first time (caller should proceed),
    /// or `false` if a duplicate (caller should drop the message).
    ///
    /// Call this at the start of **direct** message processing (when no task
    /// is already running) to prevent concurrent webhook handlers from all
    /// bypassing `queue_message()` and starting separate processing flows.
    ///
    /// `dedup_key` is the channel's stable per-message identity (e.g. Telegram
    /// `message_id`, Slack `ts`, Discord message id) — the correct idempotency
    /// key for webhook/poll redeliveries. When `Some`, dedup is keyed on it so a
    /// user *re-typing the same text* (which gets a NEW message id) is treated
    /// as a fresh request rather than a duplicate. Falls back to hashing `text`
    /// when `None`, preserving the legacy content-based behavior.
    pub async fn mark_seen(&self, session_id: &str, text: &str, dedup_key: Option<&str>) -> bool {
        let now = Utc::now();
        let hash = Self::text_hash(dedup_key.unwrap_or(text));
        let key = (session_id.to_string(), hash);
        let mut seen = self.recently_seen.write().await;
        seen.retain(|_, ts| (now - *ts).num_seconds() < DEDUP_WINDOW_SECS);
        use std::collections::hash_map::Entry;
        match seen.entry(key) {
            Entry::Occupied(_) => false,
            Entry::Vacant(e) => {
                e.insert(now);
                true
            }
        }
    }

    /// Queue a message for later processing.
    /// Returns `Some(position)` if queued, or `None` if deduplicated (identical message
    /// already exists in the queue or was recently processed).
    /// Channel loops now use [`Self::queue_message_if_running`]; retained
    /// for queue-only callers and tests.
    #[allow(dead_code)]
    pub async fn queue_message(
        &self,
        session_id: &str,
        text: &str,
        dedup_key: Option<&str>,
    ) -> Option<usize> {
        let now = Utc::now();
        let hash = Self::text_hash(dedup_key.unwrap_or(text));
        let key = (session_id.to_string(), hash);

        // Check recently_seen first — this survives queue pops and catches
        // webhook retries even after the original message was processed.
        {
            let mut seen = self.recently_seen.write().await;
            // Prune stale entries while we hold the lock.
            seen.retain(|_, ts| (now - *ts).num_seconds() < DEDUP_WINDOW_SECS);

            if let Some(first_seen) = seen.get(&key) {
                if (now - *first_seen).num_seconds() < DEDUP_WINDOW_SECS {
                    return None; // Duplicate — silently drop
                }
            }
            seen.insert(key, now);
        }

        let mut queues = self.queues.write().await;
        let queue = queues.entry(session_id.to_string()).or_default();

        queue.push_back(QueuedMessage {
            text: text.to_string(),
            queued_at: now,
        });
        Some(queue.len())
    }

    /// Pop the next queued message for a session.
    #[allow(dead_code)] // Retained for compatibility with single-pop queue consumers.
    pub async fn pop_queued_message(&self, session_id: &str) -> Option<QueuedMessage> {
        let mut queues = self.queues.write().await;
        queues.get_mut(session_id).and_then(|q| q.pop_front())
    }

    /// Drain a queue and coalesce its messages into one. When a long message
    /// is fragmented by the client (e.g. Telegram Web), each fragment lands
    /// as a separate queued message — concatenate with newlines so the agent
    /// sees the full message as a single prompt.
    fn drain_and_coalesce(queue: &mut VecDeque<QueuedMessage>) -> Option<QueuedMessage> {
        let first = queue.pop_front()?;
        if queue.is_empty() {
            return Some(first);
        }
        let mut combined = first.text;
        while let Some(msg) = queue.pop_front() {
            combined.push('\n');
            combined.push_str(&msg.text);
        }
        Some(QueuedMessage {
            text: combined,
            queued_at: first.queued_at,
        })
    }

    /// Drain all queued messages for a session and coalesce into one.
    /// Channel loops now use [`Self::finalize_and_drain`]; retained for
    /// drain-only callers and tests.
    #[allow(dead_code)]
    pub async fn pop_all_queued_messages(&self, session_id: &str) -> Option<QueuedMessage> {
        let mut queues = self.queues.write().await;
        let queue = queues.get_mut(session_id)?;
        Self::drain_and_coalesce(queue)
    }

    /// Atomically check for a running task and queue the message behind it.
    ///
    /// Closes the race where a session task finishes between the caller's
    /// `has_running_task()` check and `queue_message()`: a message queued
    /// after the drain in [`finalize_and_drain`] would otherwise sit in the
    /// queue with nothing left to drain it (observed in the 2026-06-06
    /// attribution run, where a queued message was silently stranded).
    ///
    /// Lock order: `tasks` → `recently_seen` → `queues` (same as
    /// [`finalize_and_drain`]), so the two operations serialize and the
    /// caller can trust the [`QueueOutcome`].
    pub async fn queue_message_if_running(
        &self,
        session_id: &str,
        text: &str,
        dedup_key: Option<&str>,
    ) -> QueueOutcome {
        let mut tasks = self.tasks.write().await;
        // Reap leaked running tasks first: a phantom that was never finalized
        // must not strand this message in a queue nobody will drain.
        let reaped = Self::reap_stale_running_locked(&mut tasks);
        if !reaped.is_empty() {
            tracing::warn!(
                ?reaped,
                session_id,
                "Reaped stale running task(s) before queue decision"
            );
        }
        let running = tasks.values().any(|h| {
            h.entry.session_id == session_id && matches!(h.entry.status, TaskStatus::Running)
        });
        if !running {
            return QueueOutcome::NoRunningTask;
        }

        // Keep holding the task-map write lock while queueing so
        // finalize_and_drain (which also takes the write lock first) cannot
        // finalize-and-drain between our check and our push.
        let now = Utc::now();
        let hash = Self::text_hash(dedup_key.unwrap_or(text));
        let key = (session_id.to_string(), hash);
        {
            let mut seen = self.recently_seen.write().await;
            seen.retain(|_, ts| (now - *ts).num_seconds() < DEDUP_WINDOW_SECS);
            if let Some(first_seen) = seen.get(&key) {
                if (now - *first_seen).num_seconds() < DEDUP_WINDOW_SECS {
                    return QueueOutcome::Duplicate;
                }
            }
            seen.insert(key, now);
        }
        let mut queues = self.queues.write().await;
        let queue = queues.entry(session_id.to_string()).or_default();
        queue.push_back(QueuedMessage {
            text: text.to_string(),
            queued_at: now,
        });
        QueueOutcome::Queued(queue.len())
    }

    /// Atomically finalize a task (complete, or fail with `error`) and drain
    /// the session's message queue in one critical section.
    ///
    /// Holding the task-map write lock across both steps means no concurrent
    /// [`queue_message_if_running`] can observe the task as running after the
    /// drain — so a message is either drained here or rejected with
    /// `NoRunningTask` (and processed directly by its own handler). Replaces
    /// the racy `complete()`/`fail()` → gap → `pop_all_queued_messages()`
    /// sequence in the channel loops.
    pub async fn finalize_and_drain(
        &self,
        task_id: u64,
        session_id: &str,
        error: Option<&str>,
    ) -> Option<QueuedMessage> {
        let mut tasks = self.tasks.write().await;
        if let Some(handle) = tasks.get_mut(&task_id) {
            handle.entry.status = match error {
                Some(e) => TaskStatus::Failed(e.to_string()),
                None => TaskStatus::Completed,
            };
            handle.entry.finished_at = Some(Utc::now());
        }
        Self::cleanup_locked(&mut tasks, self.max_completed);

        let mut queues = self.queues.write().await;
        let queue = queues.get_mut(session_id)?;
        Self::drain_and_coalesce(queue)
    }

    /// Get the number of queued messages for a session.
    pub async fn queue_len(&self, session_id: &str) -> usize {
        let queues = self.queues.read().await;
        queues.get(session_id).map(|q| q.len()).unwrap_or(0)
    }

    /// Clear all queued messages for a session.
    pub async fn clear_queue(&self, session_id: &str) {
        let mut queues = self.queues.write().await;
        if let Some(queue) = queues.get_mut(session_id) {
            queue.clear();
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_queue_deduplicates_identical_messages() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";

        // First message queues normally
        let result = registry.queue_message(session, "hello world", None).await;
        assert_eq!(result, Some(1));

        // Identical message within dedup window is deduplicated
        let result = registry.queue_message(session, "hello world", None).await;
        assert_eq!(result, None);

        // Different message queues normally
        let result = registry
            .queue_message(session, "different message", None)
            .await;
        assert_eq!(result, Some(2));

        // Only 2 messages in queue (not 3)
        assert_eq!(registry.queue_len(session).await, 2);
    }

    #[tokio::test]
    async fn test_queue_allows_same_message_different_sessions() {
        let registry = TaskRegistry::new(10);

        let result = registry.queue_message("session-a", "hello", None).await;
        assert_eq!(result, Some(1));

        // Same message but different session — should queue
        let result = registry.queue_message("session-b", "hello", None).await;
        assert_eq!(result, Some(1));
    }

    #[tokio::test]
    async fn test_queue_deduplicates_after_pop() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";

        let result = registry.queue_message(session, "hello", None).await;
        assert_eq!(result, Some(1));

        // Pop the message
        let popped = registry.pop_queued_message(session).await;
        assert!(popped.is_some());
        assert_eq!(popped.unwrap().text, "hello");

        // Same message should STILL be deduplicated after pop (within dedup window).
        // This prevents webhook retry duplicates from re-entering the queue.
        let result = registry.queue_message(session, "hello", None).await;
        assert_eq!(result, None);
    }

    #[tokio::test]
    async fn test_mark_seen_prevents_duplicates() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";

        // First call returns true (proceed)
        assert!(registry.mark_seen(session, "hello world", None).await);

        // Second call with same text returns false (duplicate)
        assert!(!registry.mark_seen(session, "hello world", None).await);

        // Different text returns true
        assert!(registry.mark_seen(session, "different text", None).await);

        // Different session with same text returns true
        assert!(
            registry
                .mark_seen("other-session", "hello world", None)
                .await
        );
    }

    #[tokio::test]
    async fn test_mark_seen_blocks_subsequent_queue() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";

        // mark_seen first (simulates direct processing)
        assert!(registry.mark_seen(session, "hello", None).await);

        // queue_message for the same text should be deduplicated
        let result = registry.queue_message(session, "hello", None).await;
        assert_eq!(result, None);
    }

    #[tokio::test]
    async fn test_pop_all_coalesces_fragments() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";

        // Simulate a long message fragmented into 4 parts
        registry.queue_message(session, "Part 1: Hello", None).await;
        registry.queue_message(session, "Part 2: World", None).await;
        registry
            .queue_message(session, "Part 3: How are", None)
            .await;
        registry.queue_message(session, "Part 4: you?", None).await;
        assert_eq!(registry.queue_len(session).await, 4);

        // pop_all should coalesce into a single message
        let coalesced = registry.pop_all_queued_messages(session).await;
        assert!(coalesced.is_some());
        let msg = coalesced.unwrap();
        assert_eq!(
            msg.text,
            "Part 1: Hello\nPart 2: World\nPart 3: How are\nPart 4: you?"
        );

        // Queue should now be empty
        assert_eq!(registry.queue_len(session).await, 0);
        assert!(registry.pop_all_queued_messages(session).await.is_none());
    }

    #[tokio::test]
    async fn test_pop_all_single_message() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";

        registry.queue_message(session, "only one", None).await;
        let result = registry.pop_all_queued_messages(session).await;
        assert!(result.is_some());
        assert_eq!(result.unwrap().text, "only one");
    }

    #[tokio::test]
    async fn test_queue_message_if_running_queues_while_task_runs() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";
        let (_task_id, _token) = registry.register(session, "long task").await;

        match registry
            .queue_message_if_running(session, "follow-up", None)
            .await
        {
            QueueOutcome::Queued(pos) => assert_eq!(pos, 1),
            other => panic!("expected Queued(1), got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_queue_message_if_running_rejects_when_no_task() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";

        // No running task: the caller must process directly instead of
        // queueing into a queue nobody will ever drain.
        assert!(matches!(
            registry
                .queue_message_if_running(session, "orphan", None)
                .await,
            QueueOutcome::NoRunningTask
        ));
        assert_eq!(registry.queue_len(session).await, 0);
    }

    #[tokio::test]
    async fn test_queue_message_if_running_detects_finished_task_race() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";
        let (task_id, _token) = registry.register(session, "task").await;
        // Task finishes between the caller's has_running_task check and the
        // queue attempt — the registry must report NoRunningTask instead of
        // stranding the message.
        registry.complete(task_id).await;

        assert!(matches!(
            registry
                .queue_message_if_running(session, "late message", None)
                .await,
            QueueOutcome::NoRunningTask
        ));
    }

    #[tokio::test]
    async fn test_queue_message_if_running_deduplicates() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";
        let (_task_id, _token) = registry.register(session, "task").await;

        assert!(matches!(
            registry
                .queue_message_if_running(session, "hello", None)
                .await,
            QueueOutcome::Queued(1)
        ));
        assert!(matches!(
            registry
                .queue_message_if_running(session, "hello", None)
                .await,
            QueueOutcome::Duplicate
        ));
    }

    #[tokio::test]
    async fn test_finalize_and_drain_completes_and_returns_queued_work() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";
        let (task_id, _token) = registry.register(session, "task").await;
        registry
            .queue_message_if_running(session, "queued A", None)
            .await;
        registry
            .queue_message_if_running(session, "queued B", None)
            .await;

        let drained = registry.finalize_and_drain(task_id, session, None).await;
        assert_eq!(drained.unwrap().text, "queued A\nqueued B");
        assert!(!registry.has_running_task(session).await);
        assert_eq!(registry.queue_len(session).await, 0);
    }

    #[tokio::test]
    async fn test_finalize_and_drain_with_error_marks_failed() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";
        let (task_id, _token) = registry.register(session, "task").await;

        let drained = registry
            .finalize_and_drain(task_id, session, Some("boom"))
            .await;
        assert!(drained.is_none());
        assert!(!registry.has_running_task(session).await);
        let entries = registry.list_for_session(session).await;
        assert!(matches!(entries[0].status, TaskStatus::Failed(_)));
    }

    // Regression: a user re-typing the SAME text gets a new per-message
    // identity (Telegram message_id / Slack ts / Discord id). With a distinct
    // dedup_key, the re-ask must be treated as a fresh request — not silently
    // swallowed by the 120s text-dedup window. (Screenshot bug: "who is my
    // wife?" sent twice, second got no reply.)
    #[tokio::test]
    async fn test_mark_seen_allows_reask_with_distinct_message_id() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";

        // First send of "who is my wife?" with message_id 100.
        assert!(
            registry
                .mark_seen(session, "who is my wife?", Some("100"))
                .await
        );

        // Deliberate re-ask: identical text, NEW message_id 101 → proceed.
        assert!(
            registry
                .mark_seen(session, "who is my wife?", Some("101"))
                .await
        );

        // True redelivery of message_id 100 (poll/webhook retry) → dropped.
        assert!(
            !registry
                .mark_seen(session, "who is my wife?", Some("100"))
                .await
        );
    }

    // Regression: the queued path (task already running) must also key on the
    // per-message identity so a re-ask queues behind the running task instead
    // of being dropped as a Duplicate.
    #[tokio::test]
    async fn test_queue_if_running_allows_reask_with_distinct_message_id() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";
        let (_task_id, _token) = registry.register(session, "task").await;

        assert!(matches!(
            registry
                .queue_message_if_running(session, "status?", Some("200"))
                .await,
            QueueOutcome::Queued(1)
        ));
        // Same text, new message_id → fresh request, queues again.
        assert!(matches!(
            registry
                .queue_message_if_running(session, "status?", Some("201"))
                .await,
            QueueOutcome::Queued(2)
        ));
        // Redelivery of message_id 200 → dropped as Duplicate.
        assert!(matches!(
            registry
                .queue_message_if_running(session, "status?", Some("200"))
                .await,
            QueueOutcome::Duplicate
        ));
    }

    #[tokio::test]
    async fn test_stale_running_task_is_reaped_and_unblocks_queue() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";
        let (task_id, _token) = registry.register(session, "leaked task").await;

        // Force the task to look leaked: started long before the staleness cap
        // and never finalized (simulates an abnormal channel-handler exit).
        {
            let mut tasks = registry.tasks.write().await;
            let handle = tasks.get_mut(&task_id).unwrap();
            handle.entry.started_at =
                Utc::now() - chrono::Duration::seconds(MAX_RUNNING_TASK_AGE_SECS + 60);
        }

        // The phantom must not report as running...
        assert!(!registry.has_running_task(session).await);
        // ...and a new message must process directly, not queue behind it.
        assert!(matches!(
            registry
                .queue_message_if_running(session, "new message", Some("m1"))
                .await,
            QueueOutcome::NoRunningTask
        ));
        // The reaped task is surfaced as Failed (resolved) in listings.
        let entries = registry.list_for_session(session).await;
        assert!(matches!(entries[0].status, TaskStatus::Failed(_)));
    }

    #[tokio::test]
    async fn test_recent_running_task_is_not_reaped() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";
        let (_task_id, _token) = registry.register(session, "active task").await;

        // A freshly-registered task is genuinely running and must still gate
        // the queue (no false-positive reaping of legitimate work).
        assert!(registry.has_running_task(session).await);
        assert!(matches!(
            registry
                .queue_message_if_running(session, "follow-up", Some("m1"))
                .await,
            QueueOutcome::Queued(1)
        ));
    }

    #[tokio::test]
    async fn test_finalize_and_drain_empty_queue_returns_none() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";
        let (task_id, _token) = registry.register(session, "task").await;

        assert!(registry
            .finalize_and_drain(task_id, session, None)
            .await
            .is_none());
        assert!(!registry.has_running_task(session).await);
    }
}