Skip to main content

kernex_memory/store/
tasks.rs

1//! Scheduled task CRUD, deduplication, and retry logic.
2
3use super::Store;
4use kernex_core::error::KernexError;
5use uuid::Uuid;
6
/// A scheduled task that is due for delivery.
///
/// Each value mirrors one row of the `scheduled_tasks` table as selected by
/// `Store::get_due_tasks`; every field holds the column of the same name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DueTask {
    /// Task identifier (the `id` column).
    pub id: String,
    /// Value of the `channel` column the task was created with.
    pub channel: String,
    /// Identifier of the sender that owns the task.
    pub sender_id: String,
    /// Value of the `reply_target` column (presumably where the delivery is sent — confirm
    /// with the caller).
    pub reply_target: String,
    /// Free-text description of the task.
    pub description: String,
    /// Recurrence value, if any. `Store::complete_task` treats `None` and `"once"` as one-shot
    /// and recognises `"daily"`, `"weekdays"`, `"weekly"` and `"monthly"`.
    pub repeat: Option<String>,
    /// Value of the `task_type` column.
    pub task_type: String,
    /// Value of the `project` column.
    pub project: String,
}
18
19impl Store {
20    /// Create a scheduled task. Deduplicates on two levels:
21    /// 1. Exact match: same sender + description + normalized due_at.
22    /// 2. Fuzzy match: same sender + similar description + due_at within 30 min.
23    #[allow(clippy::too_many_arguments)]
24    pub async fn create_task(
25        &self,
26        channel: &str,
27        sender_id: &str,
28        reply_target: &str,
29        description: &str,
30        due_at: &str,
31        repeat: Option<&str>,
32        task_type: &str,
33        project: &str,
34    ) -> Result<String, KernexError> {
35        let normalized_due = normalize_due_at(due_at);
36
37        // Level 1: exact dedup on (sender, description, normalized due_at).
38        let existing: Option<(String,)> = sqlx::query_as(
39            "SELECT id FROM scheduled_tasks \
40             WHERE sender_id = ? AND description = ? AND due_at = ? AND status = 'pending' \
41             LIMIT 1",
42        )
43        .bind(sender_id)
44        .bind(description)
45        .bind(&normalized_due)
46        .fetch_optional(&self.pool)
47        .await
48        .map_err(|e| KernexError::Store(format!("dedup check failed: {e}")))?;
49
50        if let Some((id,)) = existing {
51            tracing::info!("scheduled task dedup: reusing existing {id}");
52            return Ok(id);
53        }
54
55        // Level 2: fuzzy dedup — same sender, similar description, due_at within 30 min.
56        let nearby: Vec<(String, String, String)> = sqlx::query_as(
57            "SELECT id, description, due_at FROM scheduled_tasks \
58             WHERE sender_id = ? AND status = 'pending' \
59             AND abs(strftime('%s', ?) - strftime('%s', due_at)) <= 1800",
60        )
61        .bind(sender_id)
62        .bind(&normalized_due)
63        .fetch_all(&self.pool)
64        .await
65        .map_err(|e| KernexError::Store(format!("fuzzy dedup check failed: {e}")))?;
66
67        for (existing_id, existing_desc, _) in &nearby {
68            if descriptions_are_similar(description, existing_desc) {
69                tracing::info!(
70                    "scheduled task fuzzy dedup: reusing {existing_id} (similar to new)"
71                );
72                return Ok(existing_id.clone());
73            }
74        }
75
76        let id = Uuid::new_v4().to_string();
77        sqlx::query(
78            "INSERT INTO scheduled_tasks (id, channel, sender_id, reply_target, description, due_at, repeat, task_type, project) \
79             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
80        )
81        .bind(&id)
82        .bind(channel)
83        .bind(sender_id)
84        .bind(reply_target)
85        .bind(description)
86        .bind(&normalized_due)
87        .bind(repeat)
88        .bind(task_type)
89        .bind(project)
90        .execute(&self.pool)
91        .await
92        .map_err(|e| KernexError::Store(format!("create task failed: {e}")))?;
93
94        Ok(id)
95    }
96
97    /// Get tasks that are due for delivery.
98    #[allow(clippy::type_complexity)]
99    pub async fn get_due_tasks(&self) -> Result<Vec<DueTask>, KernexError> {
100        let rows: Vec<(
101            String,
102            String,
103            String,
104            String,
105            String,
106            Option<String>,
107            String,
108            String,
109        )> = sqlx::query_as(
110            "SELECT id, channel, sender_id, reply_target, description, repeat, task_type, project \
111                 FROM scheduled_tasks \
112                 WHERE status = 'pending' AND datetime(due_at) <= datetime('now')",
113        )
114        .fetch_all(&self.pool)
115        .await
116        .map_err(|e| KernexError::Store(format!("get due tasks failed: {e}")))?;
117
118        Ok(rows
119            .into_iter()
120            .map(
121                |(
122                    id,
123                    channel,
124                    sender_id,
125                    reply_target,
126                    description,
127                    repeat,
128                    task_type,
129                    project,
130                )| {
131                    DueTask {
132                        id,
133                        channel,
134                        sender_id,
135                        reply_target,
136                        description,
137                        repeat,
138                        task_type,
139                        project,
140                    }
141                },
142            )
143            .collect())
144    }
145
146    /// Complete a task: one-shot tasks become 'delivered', recurring tasks advance due_at.
147    pub async fn complete_task(&self, id: &str, repeat: Option<&str>) -> Result<(), KernexError> {
148        match repeat {
149            None | Some("once") => {
150                sqlx::query(
151                    "UPDATE scheduled_tasks SET status = 'delivered', delivered_at = datetime('now') WHERE id = ?",
152                )
153                .bind(id)
154                .execute(&self.pool)
155                .await
156                .map_err(|e| KernexError::Store(format!("complete task failed: {e}")))?;
157            }
158            Some(interval) => {
159                let offset = match interval {
160                    "daily" | "weekdays" => "+1 day",
161                    "weekly" => "+7 days",
162                    "monthly" => "+1 month",
163                    _ => "+1 day",
164                };
165
166                sqlx::query("UPDATE scheduled_tasks SET due_at = datetime(due_at, ?) WHERE id = ?")
167                    .bind(offset)
168                    .bind(id)
169                    .execute(&self.pool)
170                    .await
171                    .map_err(|e| KernexError::Store(format!("advance task failed: {e}")))?;
172
173                if interval == "weekdays" {
174                    sqlx::query(
175                        "UPDATE scheduled_tasks SET due_at = datetime(due_at, '+2 days') \
176                         WHERE id = ? AND CAST(strftime('%w', due_at) AS INTEGER) = 6",
177                    )
178                    .bind(id)
179                    .execute(&self.pool)
180                    .await
181                    .map_err(|e| KernexError::Store(format!("weekday skip sat failed: {e}")))?;
182
183                    sqlx::query(
184                        "UPDATE scheduled_tasks SET due_at = datetime(due_at, '+1 day') \
185                         WHERE id = ? AND CAST(strftime('%w', due_at) AS INTEGER) = 0",
186                    )
187                    .bind(id)
188                    .execute(&self.pool)
189                    .await
190                    .map_err(|e| KernexError::Store(format!("weekday skip sun failed: {e}")))?;
191                }
192            }
193        }
194        Ok(())
195    }
196
197    /// Fail an action task: increment retry count and either reschedule or permanently fail.
198    ///
199    /// Returns `true` if the task will be retried, `false` if permanently failed.
200    pub async fn fail_task(
201        &self,
202        id: &str,
203        error: &str,
204        max_retries: u32,
205    ) -> Result<bool, KernexError> {
206        let row: Option<(i64,)> =
207            sqlx::query_as("SELECT retry_count FROM scheduled_tasks WHERE id = ?")
208                .bind(id)
209                .fetch_optional(&self.pool)
210                .await
211                .map_err(|e| KernexError::Store(format!("fail_task fetch failed: {e}")))?;
212
213        let current_count = row.map(|r| r.0).unwrap_or(0) as u32;
214        let new_count = current_count + 1;
215
216        if new_count < max_retries {
217            sqlx::query(
218                "UPDATE scheduled_tasks \
219                 SET retry_count = ?, last_error = ?, \
220                     due_at = datetime('now', '+2 minutes') \
221                 WHERE id = ?",
222            )
223            .bind(new_count as i64)
224            .bind(error)
225            .bind(id)
226            .execute(&self.pool)
227            .await
228            .map_err(|e| KernexError::Store(format!("fail_task retry update failed: {e}")))?;
229            Ok(true)
230        } else {
231            sqlx::query(
232                "UPDATE scheduled_tasks \
233                 SET status = 'failed', retry_count = ?, last_error = ? \
234                 WHERE id = ?",
235            )
236            .bind(new_count as i64)
237            .bind(error)
238            .bind(id)
239            .execute(&self.pool)
240            .await
241            .map_err(|e| KernexError::Store(format!("fail_task final update failed: {e}")))?;
242            Ok(false)
243        }
244    }
245
246    /// Get pending tasks for a sender.
247    pub async fn get_tasks_for_sender(
248        &self,
249        sender_id: &str,
250    ) -> Result<Vec<(String, String, String, Option<String>, String, String)>, KernexError> {
251        let rows: Vec<(String, String, String, Option<String>, String, String)> = sqlx::query_as(
252            "SELECT id, description, due_at, repeat, task_type, project \
253             FROM scheduled_tasks \
254             WHERE sender_id = ? AND status = 'pending' \
255             ORDER BY due_at ASC",
256        )
257        .bind(sender_id)
258        .fetch_all(&self.pool)
259        .await
260        .map_err(|e| KernexError::Store(format!("get tasks failed: {e}")))?;
261
262        Ok(rows)
263    }
264
265    /// Cancel a task by ID prefix (must match sender).
266    pub async fn cancel_task(&self, id_prefix: &str, sender_id: &str) -> Result<bool, KernexError> {
267        let prefix = format!("{id_prefix}%");
268
269        let result = sqlx::query(
270            "UPDATE scheduled_tasks SET status = 'cancelled' \
271             WHERE id LIKE ? AND sender_id = ? AND status = 'pending'",
272        )
273        .bind(&prefix)
274        .bind(sender_id)
275        .execute(&self.pool)
276        .await
277        .map_err(|e| KernexError::Store(format!("cancel task failed: {e}")))?;
278
279        if result.rows_affected() > 0 {
280            return Ok(true);
281        }
282
283        let already: (i64,) = sqlx::query_as(
284            "SELECT COUNT(*) FROM scheduled_tasks \
285             WHERE id LIKE ? AND sender_id = ? AND status = 'cancelled'",
286        )
287        .bind(&prefix)
288        .bind(sender_id)
289        .fetch_one(&self.pool)
290        .await
291        .map_err(|e| KernexError::Store(format!("cancel task check failed: {e}")))?;
292
293        Ok(already.0 > 0)
294    }
295
296    /// Update fields of a pending task by ID prefix (must match sender).
297    pub async fn update_task(
298        &self,
299        id_prefix: &str,
300        sender_id: &str,
301        description: Option<&str>,
302        due_at: Option<&str>,
303        repeat: Option<&str>,
304    ) -> Result<bool, KernexError> {
305        let mut sets = Vec::new();
306        let mut values: Vec<String> = Vec::new();
307
308        if let Some(d) = description {
309            sets.push("description = ?");
310            values.push(d.to_string());
311        }
312        if let Some(d) = due_at {
313            sets.push("due_at = ?");
314            values.push(d.to_string());
315        }
316        if let Some(r) = repeat {
317            sets.push("repeat = ?");
318            values.push(r.to_string());
319        }
320
321        if sets.is_empty() {
322            return Ok(false);
323        }
324
325        let sql = format!(
326            "UPDATE scheduled_tasks SET {} WHERE id LIKE ? AND sender_id = ? AND status = 'pending'",
327            sets.join(", ")
328        );
329
330        let mut query = sqlx::query(&sql);
331        for v in &values {
332            query = query.bind(v);
333        }
334        query = query.bind(format!("{id_prefix}%"));
335        query = query.bind(sender_id);
336
337        let result = query
338            .execute(&self.pool)
339            .await
340            .map_err(|e| KernexError::Store(format!("update task failed: {e}")))?;
341
342        Ok(result.rows_affected() > 0)
343    }
344
345    /// Defer a pending task to a new due_at time (by exact ID).
346    pub async fn defer_task(&self, id: &str, new_due_at: &str) -> Result<(), KernexError> {
347        sqlx::query("UPDATE scheduled_tasks SET due_at = ? WHERE id = ? AND status = 'pending'")
348            .bind(new_due_at)
349            .bind(id)
350            .execute(&self.pool)
351            .await
352            .map_err(|e| KernexError::Store(format!("defer task failed: {e}")))?;
353        Ok(())
354    }
355}
356
357/// Normalize a datetime string to a consistent format for dedup comparison.
358pub(super) fn normalize_due_at(due_at: &str) -> String {
359    let s = due_at.trim_end_matches('Z');
360    s.replacen('T', " ", 1)
361}
362
363/// Check if two task descriptions are semantically similar via word overlap.
364pub(super) fn descriptions_are_similar(a: &str, b: &str) -> bool {
365    let words_a = significant_words(a);
366    let words_b = significant_words(b);
367
368    if words_a.len() < 3 || words_b.len() < 3 {
369        return false;
370    }
371
372    let (smaller, larger) = if words_a.len() <= words_b.len() {
373        (&words_a, &words_b)
374    } else {
375        (&words_b, &words_a)
376    };
377
378    let overlap = smaller.iter().filter(|w| larger.contains(w)).count();
379    let threshold = smaller.len().div_ceil(2);
380    overlap >= threshold
381}
382
/// Extract the lowercase "significant" words of `text`.
///
/// The text is split on every non-alphanumeric character and each piece is lowercased.
/// A piece is kept when it has at least three characters and is not in the stop-word
/// list. Length is measured in characters rather than bytes, so short accented words
/// (for example a two-letter word containing `é`) are not mistaken for three-letter ones.
/// Order and duplicates of the input are preserved.
fn significant_words(text: &str) -> Vec<String> {
    // Common English and Spanish filler and scheduling words that carry no task identity.
    const STOP_WORDS: &[&str] = &[
        "the", "and", "for", "that", "this", "with", "from", "are", "was", "were", "been", "have",
        "has", "had", "will", "would", "could", "should", "may", "might", "can", "about", "into",
        "over", "after", "before", "between", "under", "again", "then", "once", "daily", "weekly",
        "monthly", "cada", "diario", "escribir", "enviar", "usar", "nunca", "siempre",
    ];
    // Minimum number of characters for a word to be considered.
    const MIN_CHARS: usize = 3;

    text.split(|c: char| !c.is_alphanumeric())
        .map(str::to_lowercase)
        .filter(|w| w.chars().count() >= MIN_CHARS && !STOP_WORDS.contains(&w.as_str()))
        .collect()
}