1use super::Store;
4use crate::error::MemoryError;
5use uuid::Uuid;
6
/// A scheduled task that is due for execution, as returned by
/// [`Store::get_due_tasks`] and [`Store::claim_due_tasks`].
///
/// Each field carries the value of the `scheduled_tasks` column of the same
/// name.
#[derive(Debug, Clone)]
pub struct DueTask {
    /// Task identifier (a UUID string when created through [`Store::create_task`]).
    pub id: String,
    /// Value of the `channel` column — presumably the messaging channel the
    /// task was created on; confirm against the caller.
    pub channel: String,
    /// Identifier of the sender that owns the task.
    pub sender_id: String,
    /// Value of the `reply_target` column — presumably where the result should
    /// be delivered; confirm against the caller.
    pub reply_target: String,
    /// Free-text description of the task.
    pub description: String,
    /// Repeat interval (e.g. `"once"`, `"daily"`, `"weekdays"`, `"weekly"`,
    /// `"monthly"`), or `None` for a one-shot task.
    pub repeat: Option<String>,
    /// Value of the `task_type` column.
    pub task_type: String,
    /// Value of the `project` column.
    pub project: String,
}
18
/// One row of the `task_runs` table, as returned by [`Store::list_task_runs`].
#[derive(Debug, Clone)]
pub struct TaskRunRecord {
    /// Identifier of this run.
    pub id: String,
    /// Identifier of the task this run belongs to.
    pub task_id: String,
    /// Value of the `started_at` column, as supplied by the caller of
    /// `record_task_run`.
    pub started_at: String,
    /// Value of the `finished_at` column.
    /// NOTE(review): `record_task_run` does not write this column, so it
    /// presumably comes from a schema default — confirm.
    pub finished_at: String,
    /// Status string stored for the run.
    pub status: String,
    /// Result text of the run, if one was recorded.
    pub result: Option<String>,
    /// Error text of the run, if one was recorded.
    pub error: Option<String>,
    /// Number of tokens used by the run, if one was recorded.
    pub tokens_used: Option<i64>,
}
35
/// Age, in minutes, after which a task still in the `claimed` state is treated
/// as stale and may be claimed again by `Store::claim_due_tasks`.
const CLAIM_STALE_MINUTES: u32 = 10;
39
40impl Store {
41 #[allow(clippy::too_many_arguments)]
45 pub async fn create_task(
46 &self,
47 channel: &str,
48 sender_id: &str,
49 reply_target: &str,
50 description: &str,
51 due_at: &str,
52 repeat: Option<&str>,
53 task_type: &str,
54 project: &str,
55 ) -> Result<String, MemoryError> {
56 let normalized_due = normalize_due_at(due_at);
57
58 let existing: Option<(String,)> = sqlx::query_as(
60 "SELECT id FROM scheduled_tasks \
61 WHERE sender_id = ? AND description = ? AND due_at = ? AND status = 'pending' \
62 LIMIT 1",
63 )
64 .bind(sender_id)
65 .bind(description)
66 .bind(&normalized_due)
67 .fetch_optional(&self.pool)
68 .await
69 .map_err(|e| MemoryError::sqlite("dedup check failed", e))?;
70
71 if let Some((id,)) = existing {
72 tracing::info!("scheduled task dedup: reusing existing {id}");
73 return Ok(id);
74 }
75
76 let nearby: Vec<(String, String, String)> = sqlx::query_as(
78 "SELECT id, description, due_at FROM scheduled_tasks \
79 WHERE sender_id = ? AND status = 'pending' \
80 AND abs(strftime('%s', ?) - strftime('%s', due_at)) <= 1800",
81 )
82 .bind(sender_id)
83 .bind(&normalized_due)
84 .fetch_all(&self.pool)
85 .await
86 .map_err(|e| MemoryError::sqlite("fuzzy dedup check failed", e))?;
87
88 for (existing_id, existing_desc, _) in &nearby {
89 if descriptions_are_similar(description, existing_desc) {
90 tracing::info!(
91 "scheduled task fuzzy dedup: reusing {existing_id} (similar to new)"
92 );
93 return Ok(existing_id.clone());
94 }
95 }
96
97 let id = Uuid::new_v4().to_string();
98 sqlx::query(
99 "INSERT INTO scheduled_tasks (id, channel, sender_id, reply_target, description, due_at, repeat, task_type, project) \
100 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
101 )
102 .bind(&id)
103 .bind(channel)
104 .bind(sender_id)
105 .bind(reply_target)
106 .bind(description)
107 .bind(&normalized_due)
108 .bind(repeat)
109 .bind(task_type)
110 .bind(project)
111 .execute(&self.pool)
112 .await
113 .map_err(|e| MemoryError::sqlite("create task failed", e))?;
114
115 Ok(id)
116 }
117
118 #[allow(clippy::type_complexity)]
120 pub async fn get_due_tasks(&self) -> Result<Vec<DueTask>, MemoryError> {
121 let rows: Vec<(
122 String,
123 String,
124 String,
125 String,
126 String,
127 Option<String>,
128 String,
129 String,
130 )> = sqlx::query_as(
131 "SELECT id, channel, sender_id, reply_target, description, repeat, task_type, project \
132 FROM scheduled_tasks \
133 WHERE status = 'pending' AND datetime(due_at) <= datetime('now')",
134 )
135 .fetch_all(&self.pool)
136 .await
137 .map_err(|e| MemoryError::sqlite("get due tasks failed", e))?;
138
139 Ok(rows
140 .into_iter()
141 .map(
142 |(
143 id,
144 channel,
145 sender_id,
146 reply_target,
147 description,
148 repeat,
149 task_type,
150 project,
151 )| {
152 DueTask {
153 id,
154 channel,
155 sender_id,
156 reply_target,
157 description,
158 repeat,
159 task_type,
160 project,
161 }
162 },
163 )
164 .collect())
165 }
166
167 pub async fn claim_due_tasks(&self) -> Result<Vec<DueTask>, MemoryError> {
178 #[allow(clippy::type_complexity)]
179 let rows: Vec<(
180 String,
181 String,
182 String,
183 String,
184 String,
185 Option<String>,
186 String,
187 String,
188 )> = sqlx::query_as(
189 "UPDATE scheduled_tasks \
190 SET status = 'claimed', claimed_at = datetime('now') \
191 WHERE (status = 'pending' AND datetime(due_at) <= datetime('now')) \
192 OR (status = 'claimed' AND datetime(claimed_at) <= datetime('now', ?)) \
193 RETURNING id, channel, sender_id, reply_target, description, repeat, task_type, project",
194 )
195 .bind(format!("-{CLAIM_STALE_MINUTES} minutes"))
196 .fetch_all(&self.pool)
197 .await
198 .map_err(|e| MemoryError::sqlite("claim due tasks failed", e))?;
199
200 Ok(rows
201 .into_iter()
202 .map(
203 |(
204 id,
205 channel,
206 sender_id,
207 reply_target,
208 description,
209 repeat,
210 task_type,
211 project,
212 )| {
213 DueTask {
214 id,
215 channel,
216 sender_id,
217 reply_target,
218 description,
219 repeat,
220 task_type,
221 project,
222 }
223 },
224 )
225 .collect())
226 }
227
228 pub async fn record_task_run(
232 &self,
233 task_id: &str,
234 started_at: &str,
235 status: &str,
236 result: Option<&str>,
237 error: Option<&str>,
238 tokens_used: Option<u64>,
239 ) -> Result<String, MemoryError> {
240 let id = Uuid::new_v4().to_string();
241 sqlx::query(
242 "INSERT INTO task_runs (id, task_id, started_at, status, result, error, tokens_used) \
243 VALUES (?, ?, ?, ?, ?, ?, ?)",
244 )
245 .bind(&id)
246 .bind(task_id)
247 .bind(started_at)
248 .bind(status)
249 .bind(result)
250 .bind(error)
251 .bind(tokens_used.map(|t| t as i64))
252 .execute(&self.pool)
253 .await
254 .map_err(|e| MemoryError::sqlite("record task run failed", e))?;
255 Ok(id)
256 }
257
258 pub async fn list_task_runs(
261 &self,
262 task_id_prefix: &str,
263 limit: u32,
264 ) -> Result<Vec<TaskRunRecord>, MemoryError> {
265 #[allow(clippy::type_complexity)]
266 let rows: Vec<(
267 String,
268 String,
269 String,
270 String,
271 String,
272 Option<String>,
273 Option<String>,
274 Option<i64>,
275 )> = sqlx::query_as(
276 "SELECT id, task_id, started_at, finished_at, status, result, error, tokens_used \
277 FROM task_runs WHERE task_id LIKE ? \
278 ORDER BY started_at DESC LIMIT ?",
279 )
280 .bind(format!("{task_id_prefix}%"))
281 .bind(i64::from(limit))
282 .fetch_all(&self.pool)
283 .await
284 .map_err(|e| MemoryError::sqlite("list task runs failed", e))?;
285
286 Ok(rows
287 .into_iter()
288 .map(
289 |(id, task_id, started_at, finished_at, status, result, error, tokens_used)| {
290 TaskRunRecord {
291 id,
292 task_id,
293 started_at,
294 finished_at,
295 status,
296 result,
297 error,
298 tokens_used,
299 }
300 },
301 )
302 .collect())
303 }
304
305 pub async fn complete_task(&self, id: &str, repeat: Option<&str>) -> Result<(), MemoryError> {
307 match repeat {
308 None | Some("once") => {
309 sqlx::query(
310 "UPDATE scheduled_tasks SET status = 'delivered', delivered_at = datetime('now') WHERE id = ?",
311 )
312 .bind(id)
313 .execute(&self.pool)
314 .await
315 .map_err(|e| MemoryError::sqlite("complete task failed", e))?;
316 }
317 Some(interval) => {
318 let offset = match interval {
319 "daily" | "weekdays" => "+1 day",
320 "weekly" => "+7 days",
321 "monthly" => "+1 month",
322 _ => "+1 day",
323 };
324
325 sqlx::query(
328 "UPDATE scheduled_tasks \
329 SET due_at = datetime(due_at, ?), status = 'pending', claimed_at = NULL \
330 WHERE id = ?",
331 )
332 .bind(offset)
333 .bind(id)
334 .execute(&self.pool)
335 .await
336 .map_err(|e| MemoryError::sqlite("advance task failed", e))?;
337
338 if interval == "weekdays" {
339 sqlx::query(
340 "UPDATE scheduled_tasks SET due_at = datetime(due_at, '+2 days') \
341 WHERE id = ? AND CAST(strftime('%w', due_at) AS INTEGER) = 6",
342 )
343 .bind(id)
344 .execute(&self.pool)
345 .await
346 .map_err(|e| MemoryError::sqlite("weekday skip sat failed", e))?;
347
348 sqlx::query(
349 "UPDATE scheduled_tasks SET due_at = datetime(due_at, '+1 day') \
350 WHERE id = ? AND CAST(strftime('%w', due_at) AS INTEGER) = 0",
351 )
352 .bind(id)
353 .execute(&self.pool)
354 .await
355 .map_err(|e| MemoryError::sqlite("weekday skip sun failed", e))?;
356 }
357 }
358 }
359 Ok(())
360 }
361
362 pub async fn fail_task(
366 &self,
367 id: &str,
368 error: &str,
369 max_retries: u32,
370 ) -> Result<bool, MemoryError> {
371 let row: Option<(i64,)> =
372 sqlx::query_as("SELECT retry_count FROM scheduled_tasks WHERE id = ?")
373 .bind(id)
374 .fetch_optional(&self.pool)
375 .await
376 .map_err(|e| MemoryError::sqlite("fail_task fetch failed", e))?;
377
378 let current_count = row.map(|r| r.0).unwrap_or(0) as u32;
379 let new_count = current_count + 1;
380
381 if new_count < max_retries {
382 sqlx::query(
385 "UPDATE scheduled_tasks \
386 SET retry_count = ?, last_error = ?, \
387 due_at = datetime('now', '+2 minutes'), \
388 status = 'pending', claimed_at = NULL \
389 WHERE id = ?",
390 )
391 .bind(new_count as i64)
392 .bind(error)
393 .bind(id)
394 .execute(&self.pool)
395 .await
396 .map_err(|e| MemoryError::sqlite("fail_task retry update failed", e))?;
397 Ok(true)
398 } else {
399 sqlx::query(
400 "UPDATE scheduled_tasks \
401 SET status = 'failed', retry_count = ?, last_error = ? \
402 WHERE id = ?",
403 )
404 .bind(new_count as i64)
405 .bind(error)
406 .bind(id)
407 .execute(&self.pool)
408 .await
409 .map_err(|e| MemoryError::sqlite("fail_task final update failed", e))?;
410 Ok(false)
411 }
412 }
413
414 pub async fn get_tasks_for_sender(
416 &self,
417 sender_id: &str,
418 ) -> Result<Vec<(String, String, String, Option<String>, String, String)>, MemoryError> {
419 let rows: Vec<(String, String, String, Option<String>, String, String)> = sqlx::query_as(
420 "SELECT id, description, due_at, repeat, task_type, project \
421 FROM scheduled_tasks \
422 WHERE sender_id = ? AND status = 'pending' \
423 ORDER BY due_at ASC",
424 )
425 .bind(sender_id)
426 .fetch_all(&self.pool)
427 .await
428 .map_err(|e| MemoryError::sqlite("get tasks failed", e))?;
429
430 Ok(rows)
431 }
432
433 pub async fn cancel_task(&self, id_prefix: &str, sender_id: &str) -> Result<bool, MemoryError> {
435 let prefix = format!("{id_prefix}%");
436
437 let result = sqlx::query(
438 "UPDATE scheduled_tasks SET status = 'cancelled' \
439 WHERE id LIKE ? AND sender_id = ? AND status = 'pending'",
440 )
441 .bind(&prefix)
442 .bind(sender_id)
443 .execute(&self.pool)
444 .await
445 .map_err(|e| MemoryError::sqlite("cancel task failed", e))?;
446
447 if result.rows_affected() > 0 {
448 return Ok(true);
449 }
450
451 let already: (i64,) = sqlx::query_as(
452 "SELECT COUNT(*) FROM scheduled_tasks \
453 WHERE id LIKE ? AND sender_id = ? AND status = 'cancelled'",
454 )
455 .bind(&prefix)
456 .bind(sender_id)
457 .fetch_one(&self.pool)
458 .await
459 .map_err(|e| MemoryError::sqlite("cancel task check failed", e))?;
460
461 Ok(already.0 > 0)
462 }
463
464 pub async fn update_task(
466 &self,
467 id_prefix: &str,
468 sender_id: &str,
469 description: Option<&str>,
470 due_at: Option<&str>,
471 repeat: Option<&str>,
472 ) -> Result<bool, MemoryError> {
473 let mut sets = Vec::new();
474 let mut values: Vec<String> = Vec::new();
475
476 if let Some(d) = description {
477 sets.push("description = ?");
478 values.push(d.to_string());
479 }
480 if let Some(d) = due_at {
481 sets.push("due_at = ?");
482 values.push(d.to_string());
483 }
484 if let Some(r) = repeat {
485 sets.push("repeat = ?");
486 values.push(r.to_string());
487 }
488
489 if sets.is_empty() {
490 return Ok(false);
491 }
492
493 let sql = format!(
494 "UPDATE scheduled_tasks SET {} WHERE id LIKE ? AND sender_id = ? AND status = 'pending'",
495 sets.join(", ")
496 );
497
498 let mut query = sqlx::query(&sql);
499 for v in &values {
500 query = query.bind(v);
501 }
502 query = query.bind(format!("{id_prefix}%"));
503 query = query.bind(sender_id);
504
505 let result = query
506 .execute(&self.pool)
507 .await
508 .map_err(|e| MemoryError::sqlite("update task failed", e))?;
509
510 Ok(result.rows_affected() > 0)
511 }
512
513 pub async fn defer_task(&self, id: &str, new_due_at: &str) -> Result<(), MemoryError> {
515 sqlx::query("UPDATE scheduled_tasks SET due_at = ? WHERE id = ? AND status = 'pending'")
516 .bind(new_due_at)
517 .bind(id)
518 .execute(&self.pool)
519 .await
520 .map_err(|e| MemoryError::sqlite("defer task failed", e))?;
521 Ok(())
522 }
523}
524
525pub(super) fn normalize_due_at(due_at: &str) -> String {
527 let s = due_at.trim_end_matches('Z');
528 s.replacen('T', " ", 1)
529}
530
531pub(super) fn descriptions_are_similar(a: &str, b: &str) -> bool {
533 let words_a = significant_words(a);
534 let words_b = significant_words(b);
535
536 if words_a.len() < 3 || words_b.len() < 3 {
537 return false;
538 }
539
540 let (smaller, larger) = if words_a.len() <= words_b.len() {
541 (&words_a, &words_b)
542 } else {
543 (&words_b, &words_a)
544 };
545
546 let overlap = smaller.iter().filter(|w| larger.contains(w)).count();
547 let threshold = smaller.len().div_ceil(2);
548 overlap >= threshold
549}
550
/// Extracts the lowercase "significant" words of `text`, in order of
/// appearance, keeping duplicates.
///
/// The text is split on every non-alphanumeric character. A word is kept when
/// it is at least three characters long and is not in the stop-word list.
/// Length is counted in characters, not bytes, so short non-ASCII words such
/// as "él" are dropped like their ASCII counterparts.
fn significant_words(text: &str) -> Vec<String> {
    const STOP_WORDS: &[&str] = &[
        "the", "and", "for", "that", "this", "with", "from", "are", "was", "were", "been", "have",
        "has", "had", "will", "would", "could", "should", "may", "might", "can", "about", "into",
        "over", "after", "before", "between", "under", "again", "then", "once", "daily", "weekly",
        "monthly", "cada", "diario", "escribir", "enviar", "usar", "nunca", "siempre",
    ];
    text.split(|c: char| !c.is_alphanumeric())
        .map(|w| w.to_lowercase())
        .filter(|w| w.chars().count() >= 3 && !STOP_WORDS.contains(&w.as_str()))
        .collect()
}