aidaemon 0.9.33

A personal AI agent that runs as a background daemon, accessible via Telegram, Slack, or Discord, with tool use, MCP integration, and persistent memory
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};

use chrono::{DateTime, Utc};
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;

/// Window in seconds during which identical messages to the same session
/// are treated as duplicates, even after the original was popped from the queue.
const DEDUP_WINDOW_SECS: i64 = 120;

/// A queued message waiting to be processed.
#[derive(Clone, Debug)]
pub struct QueuedMessage {
    pub text: String,
    #[allow(dead_code)]
    pub queued_at: DateTime<Utc>,
}

#[derive(Clone, Debug)]
pub enum TaskStatus {
    Running,
    Completed,
    Failed(String),
    Cancelled,
}

impl std::fmt::Display for TaskStatus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TaskStatus::Running => write!(f, "Running"),
            TaskStatus::Completed => write!(f, "Completed"),
            TaskStatus::Failed(e) => write!(f, "Failed: {}", e),
            TaskStatus::Cancelled => write!(f, "Cancelled"),
        }
    }
}

#[derive(Clone, Debug)]
pub struct TaskEntry {
    pub id: u64,
    pub session_id: String,
    pub description: String,
    pub status: TaskStatus,
    pub started_at: DateTime<Utc>,
    pub finished_at: Option<DateTime<Utc>>,
}

struct TaskHandle {
    entry: TaskEntry,
    cancel_token: CancellationToken,
    /// Optional token to cancel the channel's typing indicator for this task.
    /// Set by the channel after task registration; cancelled alongside the task.
    typing_cancel: Option<CancellationToken>,
}

pub struct TaskRegistry {
    tasks: RwLock<HashMap<u64, TaskHandle>>,
    next_id: AtomicU64,
    max_completed: usize,
    /// Message queues per session - messages wait here when a task is running.
    queues: RwLock<HashMap<String, VecDeque<QueuedMessage>>>,
    /// Recently seen message fingerprints per session for deduplication.
    /// Key: (session_id, text_hash), Value: timestamp when first seen.
    /// This survives message pops so webhook retries are caught.
    recently_seen: RwLock<HashMap<(String, u64), DateTime<Utc>>>,
}

impl TaskRegistry {
    pub fn new(max_completed: usize) -> Self {
        Self {
            tasks: RwLock::new(HashMap::new()),
            next_id: AtomicU64::new(1),
            max_completed,
            queues: RwLock::new(HashMap::new()),
            recently_seen: RwLock::new(HashMap::new()),
        }
    }

    /// Simple hash for dedup fingerprinting.
    fn text_hash(text: &str) -> u64 {
        use std::hash::{Hash, Hasher};
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        text.hash(&mut hasher);
        hasher.finish()
    }

    /// Register a new task. Returns the task ID and a cancellation token.
    pub async fn register(&self, session_id: &str, description: &str) -> (u64, CancellationToken) {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let cancel_token = CancellationToken::new();
        let handle = TaskHandle {
            entry: TaskEntry {
                id,
                session_id: session_id.to_string(),
                description: description.to_string(),
                status: TaskStatus::Running,
                started_at: Utc::now(),
                finished_at: None,
            },
            cancel_token: cancel_token.clone(),
            typing_cancel: None,
        };
        let mut tasks = self.tasks.write().await;
        tasks.insert(id, handle);
        (id, cancel_token)
    }

    /// Mark a task as completed.
    pub async fn complete(&self, task_id: u64) {
        let mut tasks = self.tasks.write().await;
        if let Some(handle) = tasks.get_mut(&task_id) {
            handle.entry.status = TaskStatus::Completed;
            handle.entry.finished_at = Some(Utc::now());
        }
        Self::cleanup_locked(&mut tasks, self.max_completed);
    }

    /// Mark a task as failed.
    pub async fn fail(&self, task_id: u64, error: &str) {
        let mut tasks = self.tasks.write().await;
        if let Some(handle) = tasks.get_mut(&task_id) {
            handle.entry.status = TaskStatus::Failed(error.to_string());
            handle.entry.finished_at = Some(Utc::now());
        }
        Self::cleanup_locked(&mut tasks, self.max_completed);
    }

    /// Associate a typing indicator cancel token with a running task.
    /// When the task is cancelled, the typing indicator will also be stopped.
    pub async fn set_typing_cancel(&self, task_id: u64, token: CancellationToken) {
        let mut tasks = self.tasks.write().await;
        if let Some(handle) = tasks.get_mut(&task_id) {
            handle.typing_cancel = Some(token);
        }
    }

    /// Cancel a running task. Returns true if the task was found and cancelled.
    pub async fn cancel(&self, task_id: u64) -> bool {
        let mut tasks = self.tasks.write().await;
        if let Some(handle) = tasks.get_mut(&task_id) {
            if matches!(handle.entry.status, TaskStatus::Running) {
                handle.cancel_token.cancel();
                if let Some(ref typing) = handle.typing_cancel {
                    typing.cancel();
                }
                handle.entry.status = TaskStatus::Cancelled;
                handle.entry.finished_at = Some(Utc::now());
                return true;
            }
        }
        false
    }

    /// Cancel all running tasks for a session. Returns cancelled task (ID, description) pairs.
    pub async fn cancel_running_for_session(&self, session_id: &str) -> Vec<(u64, String)> {
        let mut tasks = self.tasks.write().await;
        let mut cancelled = Vec::new();
        for (id, handle) in tasks.iter_mut() {
            if handle.entry.session_id == session_id
                && matches!(handle.entry.status, TaskStatus::Running)
            {
                handle.cancel_token.cancel();
                if let Some(ref typing) = handle.typing_cancel {
                    typing.cancel();
                }
                handle.entry.status = TaskStatus::Cancelled;
                handle.entry.finished_at = Some(Utc::now());
                cancelled.push((*id, handle.entry.description.clone()));
            }
        }
        cancelled
    }

    /// List all tasks for a given session, sorted by ID.
    pub async fn list_for_session(&self, session_id: &str) -> Vec<TaskEntry> {
        let tasks = self.tasks.read().await;
        let mut entries: Vec<TaskEntry> = tasks
            .values()
            .filter(|h| h.entry.session_id == session_id)
            .map(|h| h.entry.clone())
            .collect();
        entries.sort_by_key(|e| e.id);
        entries
    }

    /// Remove oldest finished tasks when count exceeds max_completed.
    fn cleanup_locked(tasks: &mut HashMap<u64, TaskHandle>, max_completed: usize) {
        let mut finished: Vec<u64> = tasks
            .iter()
            .filter(|(_, h)| !matches!(h.entry.status, TaskStatus::Running))
            .map(|(&id, _)| id)
            .collect();

        if finished.len() <= max_completed {
            return;
        }

        // Sort ascending by ID (oldest first) and remove excess
        finished.sort();
        let to_remove = finished.len() - max_completed;
        for &id in finished.iter().take(to_remove) {
            tasks.remove(&id);
        }
    }

    /// Check if a session has a running task.
    pub async fn has_running_task(&self, session_id: &str) -> bool {
        let tasks = self.tasks.read().await;
        tasks.values().any(|h| {
            h.entry.session_id == session_id && matches!(h.entry.status, TaskStatus::Running)
        })
    }

    /// Get the description of the currently running task for a session (if any).
    pub async fn get_running_task_description(&self, session_id: &str) -> Option<String> {
        let tasks = self.tasks.read().await;
        tasks
            .values()
            .find(|h| {
                h.entry.session_id == session_id && matches!(h.entry.status, TaskStatus::Running)
            })
            .map(|h| h.entry.description.clone())
    }

    /// Atomically mark a message as "seen" for dedup purposes.
    /// Returns `true` if this is the first time (caller should proceed),
    /// or `false` if a duplicate (caller should drop the message).
    ///
    /// Call this at the start of **direct** message processing (when no task
    /// is already running) to prevent concurrent webhook handlers from all
    /// bypassing `queue_message()` and starting separate processing flows.
    pub async fn mark_seen(&self, session_id: &str, text: &str) -> bool {
        let now = Utc::now();
        let hash = Self::text_hash(text);
        let key = (session_id.to_string(), hash);
        let mut seen = self.recently_seen.write().await;
        seen.retain(|_, ts| (now - *ts).num_seconds() < DEDUP_WINDOW_SECS);
        use std::collections::hash_map::Entry;
        match seen.entry(key) {
            Entry::Occupied(_) => false,
            Entry::Vacant(e) => {
                e.insert(now);
                true
            }
        }
    }

    /// Queue a message for later processing.
    /// Returns `Some(position)` if queued, or `None` if deduplicated (identical message
    /// already exists in the queue or was recently processed).
    pub async fn queue_message(&self, session_id: &str, text: &str) -> Option<usize> {
        let now = Utc::now();
        let hash = Self::text_hash(text);
        let key = (session_id.to_string(), hash);

        // Check recently_seen first — this survives queue pops and catches
        // webhook retries even after the original message was processed.
        {
            let mut seen = self.recently_seen.write().await;
            // Prune stale entries while we hold the lock.
            seen.retain(|_, ts| (now - *ts).num_seconds() < DEDUP_WINDOW_SECS);

            if let Some(first_seen) = seen.get(&key) {
                if (now - *first_seen).num_seconds() < DEDUP_WINDOW_SECS {
                    return None; // Duplicate — silently drop
                }
            }
            seen.insert(key, now);
        }

        let mut queues = self.queues.write().await;
        let queue = queues.entry(session_id.to_string()).or_default();

        queue.push_back(QueuedMessage {
            text: text.to_string(),
            queued_at: now,
        });
        Some(queue.len())
    }

    /// Pop the next queued message for a session.
    #[allow(dead_code)] // Retained for compatibility with single-pop queue consumers.
    pub async fn pop_queued_message(&self, session_id: &str) -> Option<QueuedMessage> {
        let mut queues = self.queues.write().await;
        queues.get_mut(session_id).and_then(|q| q.pop_front())
    }

    /// Drain all queued messages for a session and coalesce into one.
    /// When a long message is fragmented by the client (e.g. Telegram Web),
    /// each fragment lands as a separate queued message. This method pops
    /// them all and concatenates their text with newlines so the agent
    /// sees the full message as a single prompt.
    pub async fn pop_all_queued_messages(&self, session_id: &str) -> Option<QueuedMessage> {
        let mut queues = self.queues.write().await;
        let queue = queues.get_mut(session_id)?;
        if queue.is_empty() {
            return None;
        }
        let first = queue.pop_front().unwrap();
        if queue.is_empty() {
            return Some(first);
        }
        // Coalesce remaining messages into the first
        let mut combined = first.text;
        while let Some(msg) = queue.pop_front() {
            combined.push('\n');
            combined.push_str(&msg.text);
        }
        Some(QueuedMessage {
            text: combined,
            queued_at: first.queued_at,
        })
    }

    /// Get the number of queued messages for a session.
    pub async fn queue_len(&self, session_id: &str) -> usize {
        let queues = self.queues.read().await;
        queues.get(session_id).map(|q| q.len()).unwrap_or(0)
    }

    /// Clear all queued messages for a session.
    pub async fn clear_queue(&self, session_id: &str) {
        let mut queues = self.queues.write().await;
        if let Some(queue) = queues.get_mut(session_id) {
            queue.clear();
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_queue_deduplicates_identical_messages() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";

        // First message queues normally
        let result = registry.queue_message(session, "hello world").await;
        assert_eq!(result, Some(1));

        // Identical message within dedup window is deduplicated
        let result = registry.queue_message(session, "hello world").await;
        assert_eq!(result, None);

        // Different message queues normally
        let result = registry.queue_message(session, "different message").await;
        assert_eq!(result, Some(2));

        // Only 2 messages in queue (not 3)
        assert_eq!(registry.queue_len(session).await, 2);
    }

    #[tokio::test]
    async fn test_queue_allows_same_message_different_sessions() {
        let registry = TaskRegistry::new(10);

        let result = registry.queue_message("session-a", "hello").await;
        assert_eq!(result, Some(1));

        // Same message but different session — should queue
        let result = registry.queue_message("session-b", "hello").await;
        assert_eq!(result, Some(1));
    }

    #[tokio::test]
    async fn test_queue_deduplicates_after_pop() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";

        let result = registry.queue_message(session, "hello").await;
        assert_eq!(result, Some(1));

        // Pop the message
        let popped = registry.pop_queued_message(session).await;
        assert!(popped.is_some());
        assert_eq!(popped.unwrap().text, "hello");

        // Same message should STILL be deduplicated after pop (within dedup window).
        // This prevents webhook retry duplicates from re-entering the queue.
        let result = registry.queue_message(session, "hello").await;
        assert_eq!(result, None);
    }

    #[tokio::test]
    async fn test_mark_seen_prevents_duplicates() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";

        // First call returns true (proceed)
        assert!(registry.mark_seen(session, "hello world").await);

        // Second call with same text returns false (duplicate)
        assert!(!registry.mark_seen(session, "hello world").await);

        // Different text returns true
        assert!(registry.mark_seen(session, "different text").await);

        // Different session with same text returns true
        assert!(registry.mark_seen("other-session", "hello world").await);
    }

    #[tokio::test]
    async fn test_mark_seen_blocks_subsequent_queue() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";

        // mark_seen first (simulates direct processing)
        assert!(registry.mark_seen(session, "hello").await);

        // queue_message for the same text should be deduplicated
        let result = registry.queue_message(session, "hello").await;
        assert_eq!(result, None);
    }

    #[tokio::test]
    async fn test_pop_all_coalesces_fragments() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";

        // Simulate a long message fragmented into 4 parts
        registry.queue_message(session, "Part 1: Hello").await;
        registry.queue_message(session, "Part 2: World").await;
        registry.queue_message(session, "Part 3: How are").await;
        registry.queue_message(session, "Part 4: you?").await;
        assert_eq!(registry.queue_len(session).await, 4);

        // pop_all should coalesce into a single message
        let coalesced = registry.pop_all_queued_messages(session).await;
        assert!(coalesced.is_some());
        let msg = coalesced.unwrap();
        assert_eq!(
            msg.text,
            "Part 1: Hello\nPart 2: World\nPart 3: How are\nPart 4: you?"
        );

        // Queue should now be empty
        assert_eq!(registry.queue_len(session).await, 0);
        assert!(registry.pop_all_queued_messages(session).await.is_none());
    }

    #[tokio::test]
    async fn test_pop_all_single_message() {
        let registry = TaskRegistry::new(10);
        let session = "test-session";

        registry.queue_message(session, "only one").await;
        let result = registry.pop_all_queued_messages(session).await;
        assert!(result.is_some());
        assert_eq!(result.unwrap().text, "only one");
    }
}