// Source: kernex_memory/store/tasks.rs

1//! Scheduled task CRUD, deduplication, and retry logic.
2
3use super::Store;
4use crate::error::MemoryError;
5use uuid::Uuid;
6
/// A scheduled task that is due for delivery, as returned by
/// [`Store::get_due_tasks`] and [`Store::claim_due_tasks`].
#[derive(Debug, Clone)]
pub struct DueTask {
    /// Task id (a UUID v4 string assigned by [`Store::create_task`]).
    pub id: String,
    /// Channel the task was created on.
    pub channel: String,
    /// Id of the sender who owns the task.
    pub sender_id: String,
    /// Reply target recorded when the task was created.
    pub reply_target: String,
    /// Free-text task description.
    pub description: String,
    /// Recurrence: `None` or `"once"` for one-shot tasks, otherwise an
    /// interval such as `"daily"`, `"weekdays"`, `"weekly"` or `"monthly"`.
    pub repeat: Option<String>,
    /// Task kind as passed to [`Store::create_task`].
    pub task_type: String,
    /// Project the task belongs to.
    pub project: String,
}
18
/// A single logged execution of a scheduled task, as read back from the
/// `task_runs` table.
#[derive(Debug, Clone)]
pub struct TaskRunRecord {
    /// Id of this run.
    pub id: String,
    /// Id of the scheduled task that was executed.
    pub task_id: String,
    /// When the run began.
    pub started_at: String,
    /// When the run ended.
    pub finished_at: String,
    /// Outcome of the run: either `"completed"` or `"failed"`; the database
    /// rejects any other value through a CHECK constraint.
    pub status: String,
    /// Response text, present when the run succeeded.
    pub result: Option<String>,
    /// Error text, present when the run failed.
    pub error: Option<String>,
    /// Tokens billed for the run, if the provider reported a count.
    pub tokens_used: Option<i64>,
}
35
/// Age, in minutes, after which a `'claimed'` task is assumed to belong to a
/// poller that died mid-run; `Store::claim_due_tasks` will then hand it out
/// again.
const CLAIM_STALE_MINUTES: u32 = 10;
39
40impl Store {
41    /// Create a scheduled task. Deduplicates on two levels:
42    /// 1. Exact match: same sender + description + normalized due_at.
43    /// 2. Fuzzy match: same sender + similar description + due_at within 30 min.
44    #[allow(clippy::too_many_arguments)]
45    pub async fn create_task(
46        &self,
47        channel: &str,
48        sender_id: &str,
49        reply_target: &str,
50        description: &str,
51        due_at: &str,
52        repeat: Option<&str>,
53        task_type: &str,
54        project: &str,
55    ) -> Result<String, MemoryError> {
56        let normalized_due = normalize_due_at(due_at);
57
58        // Level 1: exact dedup on (sender, description, normalized due_at).
59        let existing: Option<(String,)> = sqlx::query_as(
60            "SELECT id FROM scheduled_tasks \
61             WHERE sender_id = ? AND description = ? AND due_at = ? AND status = 'pending' \
62             LIMIT 1",
63        )
64        .bind(sender_id)
65        .bind(description)
66        .bind(&normalized_due)
67        .fetch_optional(&self.pool)
68        .await
69        .map_err(|e| MemoryError::sqlite("dedup check failed", e))?;
70
71        if let Some((id,)) = existing {
72            tracing::info!("scheduled task dedup: reusing existing {id}");
73            return Ok(id);
74        }
75
76        // Level 2: fuzzy dedup — same sender, similar description, due_at within 30 min.
77        let nearby: Vec<(String, String, String)> = sqlx::query_as(
78            "SELECT id, description, due_at FROM scheduled_tasks \
79             WHERE sender_id = ? AND status = 'pending' \
80             AND abs(strftime('%s', ?) - strftime('%s', due_at)) <= 1800",
81        )
82        .bind(sender_id)
83        .bind(&normalized_due)
84        .fetch_all(&self.pool)
85        .await
86        .map_err(|e| MemoryError::sqlite("fuzzy dedup check failed", e))?;
87
88        for (existing_id, existing_desc, _) in &nearby {
89            if descriptions_are_similar(description, existing_desc) {
90                tracing::info!(
91                    "scheduled task fuzzy dedup: reusing {existing_id} (similar to new)"
92                );
93                return Ok(existing_id.clone());
94            }
95        }
96
97        let id = Uuid::new_v4().to_string();
98        sqlx::query(
99            "INSERT INTO scheduled_tasks (id, channel, sender_id, reply_target, description, due_at, repeat, task_type, project) \
100             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
101        )
102        .bind(&id)
103        .bind(channel)
104        .bind(sender_id)
105        .bind(reply_target)
106        .bind(description)
107        .bind(&normalized_due)
108        .bind(repeat)
109        .bind(task_type)
110        .bind(project)
111        .execute(&self.pool)
112        .await
113        .map_err(|e| MemoryError::sqlite("create task failed", e))?;
114
115        Ok(id)
116    }
117
118    /// Get tasks that are due for delivery.
119    #[allow(clippy::type_complexity)]
120    pub async fn get_due_tasks(&self) -> Result<Vec<DueTask>, MemoryError> {
121        let rows: Vec<(
122            String,
123            String,
124            String,
125            String,
126            String,
127            Option<String>,
128            String,
129            String,
130        )> = sqlx::query_as(
131            "SELECT id, channel, sender_id, reply_target, description, repeat, task_type, project \
132                 FROM scheduled_tasks \
133                 WHERE status = 'pending' AND datetime(due_at) <= datetime('now')",
134        )
135        .fetch_all(&self.pool)
136        .await
137        .map_err(|e| MemoryError::sqlite("get due tasks failed", e))?;
138
139        Ok(rows
140            .into_iter()
141            .map(
142                |(
143                    id,
144                    channel,
145                    sender_id,
146                    reply_target,
147                    description,
148                    repeat,
149                    task_type,
150                    project,
151                )| {
152                    DueTask {
153                        id,
154                        channel,
155                        sender_id,
156                        reply_target,
157                        description,
158                        repeat,
159                        task_type,
160                        project,
161                    }
162                },
163            )
164            .collect())
165    }
166
167    /// Atomically claim every task that is due: status flips
168    /// 'pending' -> 'claimed' and the claimed rows are returned in the same
169    /// statement, so when several pollers (an HTTP server, an open REPL, a
170    /// one-shot drain) share a store, each due task is handed to exactly one
171    /// of them. Claims left behind by a dead claimer become reclaimable
172    /// after [`CLAIM_STALE_MINUTES`].
173    ///
174    /// The claim is released by [`Store::complete_task`] (recurring tasks
175    /// return to 'pending' at the next due time) or [`Store::fail_task`]
176    /// (retries return to 'pending').
177    pub async fn claim_due_tasks(&self) -> Result<Vec<DueTask>, MemoryError> {
178        #[allow(clippy::type_complexity)]
179        let rows: Vec<(
180            String,
181            String,
182            String,
183            String,
184            String,
185            Option<String>,
186            String,
187            String,
188        )> = sqlx::query_as(
189            "UPDATE scheduled_tasks \
190             SET status = 'claimed', claimed_at = datetime('now') \
191             WHERE (status = 'pending' AND datetime(due_at) <= datetime('now')) \
192                OR (status = 'claimed' AND datetime(claimed_at) <= datetime('now', ?)) \
193             RETURNING id, channel, sender_id, reply_target, description, repeat, task_type, project",
194        )
195        .bind(format!("-{CLAIM_STALE_MINUTES} minutes"))
196        .fetch_all(&self.pool)
197        .await
198        .map_err(|e| MemoryError::sqlite("claim due tasks failed", e))?;
199
200        Ok(rows
201            .into_iter()
202            .map(
203                |(
204                    id,
205                    channel,
206                    sender_id,
207                    reply_target,
208                    description,
209                    repeat,
210                    task_type,
211                    project,
212                )| {
213                    DueTask {
214                        id,
215                        channel,
216                        sender_id,
217                        reply_target,
218                        description,
219                        repeat,
220                        task_type,
221                        project,
222                    }
223                },
224            )
225            .collect())
226    }
227
228    /// Record one execution of a scheduled task. `status` must be
229    /// `"completed"` or `"failed"` (DB CHECK); `finished_at` is stamped
230    /// server-side. Returns the new run id.
231    pub async fn record_task_run(
232        &self,
233        task_id: &str,
234        started_at: &str,
235        status: &str,
236        result: Option<&str>,
237        error: Option<&str>,
238        tokens_used: Option<u64>,
239    ) -> Result<String, MemoryError> {
240        let id = Uuid::new_v4().to_string();
241        sqlx::query(
242            "INSERT INTO task_runs (id, task_id, started_at, status, result, error, tokens_used) \
243             VALUES (?, ?, ?, ?, ?, ?, ?)",
244        )
245        .bind(&id)
246        .bind(task_id)
247        .bind(started_at)
248        .bind(status)
249        .bind(result)
250        .bind(error)
251        .bind(tokens_used.map(|t| t as i64))
252        .execute(&self.pool)
253        .await
254        .map_err(|e| MemoryError::sqlite("record task run failed", e))?;
255        Ok(id)
256    }
257
258    /// Recorded runs for tasks whose id starts with `task_id_prefix`,
259    /// newest first, capped at `limit`.
260    pub async fn list_task_runs(
261        &self,
262        task_id_prefix: &str,
263        limit: u32,
264    ) -> Result<Vec<TaskRunRecord>, MemoryError> {
265        #[allow(clippy::type_complexity)]
266        let rows: Vec<(
267            String,
268            String,
269            String,
270            String,
271            String,
272            Option<String>,
273            Option<String>,
274            Option<i64>,
275        )> = sqlx::query_as(
276            "SELECT id, task_id, started_at, finished_at, status, result, error, tokens_used \
277             FROM task_runs WHERE task_id LIKE ? \
278             ORDER BY started_at DESC LIMIT ?",
279        )
280        .bind(format!("{task_id_prefix}%"))
281        .bind(i64::from(limit))
282        .fetch_all(&self.pool)
283        .await
284        .map_err(|e| MemoryError::sqlite("list task runs failed", e))?;
285
286        Ok(rows
287            .into_iter()
288            .map(
289                |(id, task_id, started_at, finished_at, status, result, error, tokens_used)| {
290                    TaskRunRecord {
291                        id,
292                        task_id,
293                        started_at,
294                        finished_at,
295                        status,
296                        result,
297                        error,
298                        tokens_used,
299                    }
300                },
301            )
302            .collect())
303    }
304
305    /// Complete a task: one-shot tasks become 'delivered', recurring tasks advance due_at.
306    pub async fn complete_task(&self, id: &str, repeat: Option<&str>) -> Result<(), MemoryError> {
307        match repeat {
308            None | Some("once") => {
309                sqlx::query(
310                    "UPDATE scheduled_tasks SET status = 'delivered', delivered_at = datetime('now') WHERE id = ?",
311                )
312                .bind(id)
313                .execute(&self.pool)
314                .await
315                .map_err(|e| MemoryError::sqlite("complete task failed", e))?;
316            }
317            Some(interval) => {
318                let offset = match interval {
319                    "daily" | "weekdays" => "+1 day",
320                    "weekly" => "+7 days",
321                    "monthly" => "+1 month",
322                    _ => "+1 day",
323                };
324
325                // Advancing a recurring task also releases any claim so the
326                // next occurrence is visible to pollers again.
327                sqlx::query(
328                    "UPDATE scheduled_tasks \
329                     SET due_at = datetime(due_at, ?), status = 'pending', claimed_at = NULL \
330                     WHERE id = ?",
331                )
332                .bind(offset)
333                .bind(id)
334                .execute(&self.pool)
335                .await
336                .map_err(|e| MemoryError::sqlite("advance task failed", e))?;
337
338                if interval == "weekdays" {
339                    sqlx::query(
340                        "UPDATE scheduled_tasks SET due_at = datetime(due_at, '+2 days') \
341                         WHERE id = ? AND CAST(strftime('%w', due_at) AS INTEGER) = 6",
342                    )
343                    .bind(id)
344                    .execute(&self.pool)
345                    .await
346                    .map_err(|e| MemoryError::sqlite("weekday skip sat failed", e))?;
347
348                    sqlx::query(
349                        "UPDATE scheduled_tasks SET due_at = datetime(due_at, '+1 day') \
350                         WHERE id = ? AND CAST(strftime('%w', due_at) AS INTEGER) = 0",
351                    )
352                    .bind(id)
353                    .execute(&self.pool)
354                    .await
355                    .map_err(|e| MemoryError::sqlite("weekday skip sun failed", e))?;
356                }
357            }
358        }
359        Ok(())
360    }
361
362    /// Fail an action task: increment retry count and either reschedule or permanently fail.
363    ///
364    /// Returns `true` if the task will be retried, `false` if permanently failed.
365    pub async fn fail_task(
366        &self,
367        id: &str,
368        error: &str,
369        max_retries: u32,
370    ) -> Result<bool, MemoryError> {
371        let row: Option<(i64,)> =
372            sqlx::query_as("SELECT retry_count FROM scheduled_tasks WHERE id = ?")
373                .bind(id)
374                .fetch_optional(&self.pool)
375                .await
376                .map_err(|e| MemoryError::sqlite("fail_task fetch failed", e))?;
377
378        let current_count = row.map(|r| r.0).unwrap_or(0) as u32;
379        let new_count = current_count + 1;
380
381        if new_count < max_retries {
382            // The retry also releases any claim so the rescheduled attempt
383            // is visible to pollers again.
384            sqlx::query(
385                "UPDATE scheduled_tasks \
386                 SET retry_count = ?, last_error = ?, \
387                     due_at = datetime('now', '+2 minutes'), \
388                     status = 'pending', claimed_at = NULL \
389                 WHERE id = ?",
390            )
391            .bind(new_count as i64)
392            .bind(error)
393            .bind(id)
394            .execute(&self.pool)
395            .await
396            .map_err(|e| MemoryError::sqlite("fail_task retry update failed", e))?;
397            Ok(true)
398        } else {
399            sqlx::query(
400                "UPDATE scheduled_tasks \
401                 SET status = 'failed', retry_count = ?, last_error = ? \
402                 WHERE id = ?",
403            )
404            .bind(new_count as i64)
405            .bind(error)
406            .bind(id)
407            .execute(&self.pool)
408            .await
409            .map_err(|e| MemoryError::sqlite("fail_task final update failed", e))?;
410            Ok(false)
411        }
412    }
413
414    /// Get pending tasks for a sender.
415    pub async fn get_tasks_for_sender(
416        &self,
417        sender_id: &str,
418    ) -> Result<Vec<(String, String, String, Option<String>, String, String)>, MemoryError> {
419        let rows: Vec<(String, String, String, Option<String>, String, String)> = sqlx::query_as(
420            "SELECT id, description, due_at, repeat, task_type, project \
421             FROM scheduled_tasks \
422             WHERE sender_id = ? AND status = 'pending' \
423             ORDER BY due_at ASC",
424        )
425        .bind(sender_id)
426        .fetch_all(&self.pool)
427        .await
428        .map_err(|e| MemoryError::sqlite("get tasks failed", e))?;
429
430        Ok(rows)
431    }
432
433    /// Cancel a task by ID prefix (must match sender).
434    pub async fn cancel_task(&self, id_prefix: &str, sender_id: &str) -> Result<bool, MemoryError> {
435        let prefix = format!("{id_prefix}%");
436
437        let result = sqlx::query(
438            "UPDATE scheduled_tasks SET status = 'cancelled' \
439             WHERE id LIKE ? AND sender_id = ? AND status = 'pending'",
440        )
441        .bind(&prefix)
442        .bind(sender_id)
443        .execute(&self.pool)
444        .await
445        .map_err(|e| MemoryError::sqlite("cancel task failed", e))?;
446
447        if result.rows_affected() > 0 {
448            return Ok(true);
449        }
450
451        let already: (i64,) = sqlx::query_as(
452            "SELECT COUNT(*) FROM scheduled_tasks \
453             WHERE id LIKE ? AND sender_id = ? AND status = 'cancelled'",
454        )
455        .bind(&prefix)
456        .bind(sender_id)
457        .fetch_one(&self.pool)
458        .await
459        .map_err(|e| MemoryError::sqlite("cancel task check failed", e))?;
460
461        Ok(already.0 > 0)
462    }
463
464    /// Update fields of a pending task by ID prefix (must match sender).
465    pub async fn update_task(
466        &self,
467        id_prefix: &str,
468        sender_id: &str,
469        description: Option<&str>,
470        due_at: Option<&str>,
471        repeat: Option<&str>,
472    ) -> Result<bool, MemoryError> {
473        let mut sets = Vec::new();
474        let mut values: Vec<String> = Vec::new();
475
476        if let Some(d) = description {
477            sets.push("description = ?");
478            values.push(d.to_string());
479        }
480        if let Some(d) = due_at {
481            sets.push("due_at = ?");
482            values.push(d.to_string());
483        }
484        if let Some(r) = repeat {
485            sets.push("repeat = ?");
486            values.push(r.to_string());
487        }
488
489        if sets.is_empty() {
490            return Ok(false);
491        }
492
493        let sql = format!(
494            "UPDATE scheduled_tasks SET {} WHERE id LIKE ? AND sender_id = ? AND status = 'pending'",
495            sets.join(", ")
496        );
497
498        let mut query = sqlx::query(&sql);
499        for v in &values {
500            query = query.bind(v);
501        }
502        query = query.bind(format!("{id_prefix}%"));
503        query = query.bind(sender_id);
504
505        let result = query
506            .execute(&self.pool)
507            .await
508            .map_err(|e| MemoryError::sqlite("update task failed", e))?;
509
510        Ok(result.rows_affected() > 0)
511    }
512
513    /// Defer a pending task to a new due_at time (by exact ID).
514    pub async fn defer_task(&self, id: &str, new_due_at: &str) -> Result<(), MemoryError> {
515        sqlx::query("UPDATE scheduled_tasks SET due_at = ? WHERE id = ? AND status = 'pending'")
516            .bind(new_due_at)
517            .bind(id)
518            .execute(&self.pool)
519            .await
520            .map_err(|e| MemoryError::sqlite("defer task failed", e))?;
521        Ok(())
522    }
523}
524
525/// Normalize a datetime string to a consistent format for dedup comparison.
526pub(super) fn normalize_due_at(due_at: &str) -> String {
527    let s = due_at.trim_end_matches('Z');
528    s.replacen('T', " ", 1)
529}
530
531/// Check if two task descriptions are semantically similar via word overlap.
532pub(super) fn descriptions_are_similar(a: &str, b: &str) -> bool {
533    let words_a = significant_words(a);
534    let words_b = significant_words(b);
535
536    if words_a.len() < 3 || words_b.len() < 3 {
537        return false;
538    }
539
540    let (smaller, larger) = if words_a.len() <= words_b.len() {
541        (&words_a, &words_b)
542    } else {
543        (&words_b, &words_a)
544    };
545
546    let overlap = smaller.iter().filter(|w| larger.contains(w)).count();
547    let threshold = smaller.len().div_ceil(2);
548    overlap >= threshold
549}
550
/// Split `text` into lowercase "significant" words for similarity checks.
///
/// Tokens are maximal runs of alphanumeric characters (Unicode-aware). A
/// token is kept only if it is not in the English/Spanish stop-word list and
/// has at least three characters. Length is counted in `char`s, not UTF-8
/// bytes, so a two-letter accented word such as "él" is not mistaken for a
/// three-letter one. Duplicates are kept, in input order.
fn significant_words(text: &str) -> Vec<String> {
    const STOP_WORDS: &[&str] = &[
        "the", "and", "for", "that", "this", "with", "from", "are", "was", "were", "been", "have",
        "has", "had", "will", "would", "could", "should", "may", "might", "can", "about", "into",
        "over", "after", "before", "between", "under", "again", "then", "once", "daily", "weekly",
        "monthly", "cada", "diario", "escribir", "enviar", "usar", "nunca", "siempre",
    ];
    text.split(|c: char| !c.is_alphanumeric())
        .map(str::to_lowercase)
        .filter(|w| w.chars().count() >= 3 && !STOP_WORDS.contains(&w.as_str()))
        .collect()
}