1use super::Store;
4use kernex_core::error::KernexError;
5use uuid::Uuid;
6
/// One row of `scheduled_tasks` as returned by `Store::get_due_tasks`.
///
/// Every field mirrors the column of the same name; the struct carries no
/// due time or status because the query that produces it does not select them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DueTask {
    /// Primary key of the task row.
    pub id: String,
    /// Value of the `channel` column.
    pub channel: String,
    /// Value of the `sender_id` column (the task's owner in every per-sender query).
    pub sender_id: String,
    /// Value of the `reply_target` column.
    pub reply_target: String,
    /// Free-text description of the task.
    pub description: String,
    /// Repeat interval, if any. `Store::complete_task` treats `None` and
    /// `"once"` as one-shot and recognises `"daily"`, `"weekdays"`,
    /// `"weekly"` and `"monthly"`.
    pub repeat: Option<String>,
    /// Value of the `task_type` column.
    pub task_type: String,
    /// Value of the `project` column.
    pub project: String,
}
18
19impl Store {
20 #[allow(clippy::too_many_arguments)]
24 pub async fn create_task(
25 &self,
26 channel: &str,
27 sender_id: &str,
28 reply_target: &str,
29 description: &str,
30 due_at: &str,
31 repeat: Option<&str>,
32 task_type: &str,
33 project: &str,
34 ) -> Result<String, KernexError> {
35 let normalized_due = normalize_due_at(due_at);
36
37 let existing: Option<(String,)> = sqlx::query_as(
39 "SELECT id FROM scheduled_tasks \
40 WHERE sender_id = ? AND description = ? AND due_at = ? AND status = 'pending' \
41 LIMIT 1",
42 )
43 .bind(sender_id)
44 .bind(description)
45 .bind(&normalized_due)
46 .fetch_optional(&self.pool)
47 .await
48 .map_err(|e| KernexError::Store(format!("dedup check failed: {e}")))?;
49
50 if let Some((id,)) = existing {
51 tracing::info!("scheduled task dedup: reusing existing {id}");
52 return Ok(id);
53 }
54
55 let nearby: Vec<(String, String, String)> = sqlx::query_as(
57 "SELECT id, description, due_at FROM scheduled_tasks \
58 WHERE sender_id = ? AND status = 'pending' \
59 AND abs(strftime('%s', ?) - strftime('%s', due_at)) <= 1800",
60 )
61 .bind(sender_id)
62 .bind(&normalized_due)
63 .fetch_all(&self.pool)
64 .await
65 .map_err(|e| KernexError::Store(format!("fuzzy dedup check failed: {e}")))?;
66
67 for (existing_id, existing_desc, _) in &nearby {
68 if descriptions_are_similar(description, existing_desc) {
69 tracing::info!(
70 "scheduled task fuzzy dedup: reusing {existing_id} (similar to new)"
71 );
72 return Ok(existing_id.clone());
73 }
74 }
75
76 let id = Uuid::new_v4().to_string();
77 sqlx::query(
78 "INSERT INTO scheduled_tasks (id, channel, sender_id, reply_target, description, due_at, repeat, task_type, project) \
79 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
80 )
81 .bind(&id)
82 .bind(channel)
83 .bind(sender_id)
84 .bind(reply_target)
85 .bind(description)
86 .bind(&normalized_due)
87 .bind(repeat)
88 .bind(task_type)
89 .bind(project)
90 .execute(&self.pool)
91 .await
92 .map_err(|e| KernexError::Store(format!("create task failed: {e}")))?;
93
94 Ok(id)
95 }
96
97 #[allow(clippy::type_complexity)]
99 pub async fn get_due_tasks(&self) -> Result<Vec<DueTask>, KernexError> {
100 let rows: Vec<(
101 String,
102 String,
103 String,
104 String,
105 String,
106 Option<String>,
107 String,
108 String,
109 )> = sqlx::query_as(
110 "SELECT id, channel, sender_id, reply_target, description, repeat, task_type, project \
111 FROM scheduled_tasks \
112 WHERE status = 'pending' AND datetime(due_at) <= datetime('now')",
113 )
114 .fetch_all(&self.pool)
115 .await
116 .map_err(|e| KernexError::Store(format!("get due tasks failed: {e}")))?;
117
118 Ok(rows
119 .into_iter()
120 .map(
121 |(
122 id,
123 channel,
124 sender_id,
125 reply_target,
126 description,
127 repeat,
128 task_type,
129 project,
130 )| {
131 DueTask {
132 id,
133 channel,
134 sender_id,
135 reply_target,
136 description,
137 repeat,
138 task_type,
139 project,
140 }
141 },
142 )
143 .collect())
144 }
145
146 pub async fn complete_task(&self, id: &str, repeat: Option<&str>) -> Result<(), KernexError> {
148 match repeat {
149 None | Some("once") => {
150 sqlx::query(
151 "UPDATE scheduled_tasks SET status = 'delivered', delivered_at = datetime('now') WHERE id = ?",
152 )
153 .bind(id)
154 .execute(&self.pool)
155 .await
156 .map_err(|e| KernexError::Store(format!("complete task failed: {e}")))?;
157 }
158 Some(interval) => {
159 let offset = match interval {
160 "daily" | "weekdays" => "+1 day",
161 "weekly" => "+7 days",
162 "monthly" => "+1 month",
163 _ => "+1 day",
164 };
165
166 sqlx::query("UPDATE scheduled_tasks SET due_at = datetime(due_at, ?) WHERE id = ?")
167 .bind(offset)
168 .bind(id)
169 .execute(&self.pool)
170 .await
171 .map_err(|e| KernexError::Store(format!("advance task failed: {e}")))?;
172
173 if interval == "weekdays" {
174 sqlx::query(
175 "UPDATE scheduled_tasks SET due_at = datetime(due_at, '+2 days') \
176 WHERE id = ? AND CAST(strftime('%w', due_at) AS INTEGER) = 6",
177 )
178 .bind(id)
179 .execute(&self.pool)
180 .await
181 .map_err(|e| KernexError::Store(format!("weekday skip sat failed: {e}")))?;
182
183 sqlx::query(
184 "UPDATE scheduled_tasks SET due_at = datetime(due_at, '+1 day') \
185 WHERE id = ? AND CAST(strftime('%w', due_at) AS INTEGER) = 0",
186 )
187 .bind(id)
188 .execute(&self.pool)
189 .await
190 .map_err(|e| KernexError::Store(format!("weekday skip sun failed: {e}")))?;
191 }
192 }
193 }
194 Ok(())
195 }
196
197 pub async fn fail_task(
201 &self,
202 id: &str,
203 error: &str,
204 max_retries: u32,
205 ) -> Result<bool, KernexError> {
206 let row: Option<(i64,)> =
207 sqlx::query_as("SELECT retry_count FROM scheduled_tasks WHERE id = ?")
208 .bind(id)
209 .fetch_optional(&self.pool)
210 .await
211 .map_err(|e| KernexError::Store(format!("fail_task fetch failed: {e}")))?;
212
213 let current_count = row.map(|r| r.0).unwrap_or(0) as u32;
214 let new_count = current_count + 1;
215
216 if new_count < max_retries {
217 sqlx::query(
218 "UPDATE scheduled_tasks \
219 SET retry_count = ?, last_error = ?, \
220 due_at = datetime('now', '+2 minutes') \
221 WHERE id = ?",
222 )
223 .bind(new_count as i64)
224 .bind(error)
225 .bind(id)
226 .execute(&self.pool)
227 .await
228 .map_err(|e| KernexError::Store(format!("fail_task retry update failed: {e}")))?;
229 Ok(true)
230 } else {
231 sqlx::query(
232 "UPDATE scheduled_tasks \
233 SET status = 'failed', retry_count = ?, last_error = ? \
234 WHERE id = ?",
235 )
236 .bind(new_count as i64)
237 .bind(error)
238 .bind(id)
239 .execute(&self.pool)
240 .await
241 .map_err(|e| KernexError::Store(format!("fail_task final update failed: {e}")))?;
242 Ok(false)
243 }
244 }
245
246 pub async fn get_tasks_for_sender(
248 &self,
249 sender_id: &str,
250 ) -> Result<Vec<(String, String, String, Option<String>, String, String)>, KernexError> {
251 let rows: Vec<(String, String, String, Option<String>, String, String)> = sqlx::query_as(
252 "SELECT id, description, due_at, repeat, task_type, project \
253 FROM scheduled_tasks \
254 WHERE sender_id = ? AND status = 'pending' \
255 ORDER BY due_at ASC",
256 )
257 .bind(sender_id)
258 .fetch_all(&self.pool)
259 .await
260 .map_err(|e| KernexError::Store(format!("get tasks failed: {e}")))?;
261
262 Ok(rows)
263 }
264
265 pub async fn cancel_task(&self, id_prefix: &str, sender_id: &str) -> Result<bool, KernexError> {
267 let prefix = format!("{id_prefix}%");
268
269 let result = sqlx::query(
270 "UPDATE scheduled_tasks SET status = 'cancelled' \
271 WHERE id LIKE ? AND sender_id = ? AND status = 'pending'",
272 )
273 .bind(&prefix)
274 .bind(sender_id)
275 .execute(&self.pool)
276 .await
277 .map_err(|e| KernexError::Store(format!("cancel task failed: {e}")))?;
278
279 if result.rows_affected() > 0 {
280 return Ok(true);
281 }
282
283 let already: (i64,) = sqlx::query_as(
284 "SELECT COUNT(*) FROM scheduled_tasks \
285 WHERE id LIKE ? AND sender_id = ? AND status = 'cancelled'",
286 )
287 .bind(&prefix)
288 .bind(sender_id)
289 .fetch_one(&self.pool)
290 .await
291 .map_err(|e| KernexError::Store(format!("cancel task check failed: {e}")))?;
292
293 Ok(already.0 > 0)
294 }
295
296 pub async fn update_task(
298 &self,
299 id_prefix: &str,
300 sender_id: &str,
301 description: Option<&str>,
302 due_at: Option<&str>,
303 repeat: Option<&str>,
304 ) -> Result<bool, KernexError> {
305 let mut sets = Vec::new();
306 let mut values: Vec<String> = Vec::new();
307
308 if let Some(d) = description {
309 sets.push("description = ?");
310 values.push(d.to_string());
311 }
312 if let Some(d) = due_at {
313 sets.push("due_at = ?");
314 values.push(d.to_string());
315 }
316 if let Some(r) = repeat {
317 sets.push("repeat = ?");
318 values.push(r.to_string());
319 }
320
321 if sets.is_empty() {
322 return Ok(false);
323 }
324
325 let sql = format!(
326 "UPDATE scheduled_tasks SET {} WHERE id LIKE ? AND sender_id = ? AND status = 'pending'",
327 sets.join(", ")
328 );
329
330 let mut query = sqlx::query(&sql);
331 for v in &values {
332 query = query.bind(v);
333 }
334 query = query.bind(format!("{id_prefix}%"));
335 query = query.bind(sender_id);
336
337 let result = query
338 .execute(&self.pool)
339 .await
340 .map_err(|e| KernexError::Store(format!("update task failed: {e}")))?;
341
342 Ok(result.rows_affected() > 0)
343 }
344
345 pub async fn defer_task(&self, id: &str, new_due_at: &str) -> Result<(), KernexError> {
347 sqlx::query("UPDATE scheduled_tasks SET due_at = ? WHERE id = ? AND status = 'pending'")
348 .bind(new_due_at)
349 .bind(id)
350 .execute(&self.pool)
351 .await
352 .map_err(|e| KernexError::Store(format!("defer task failed: {e}")))?;
353 Ok(())
354 }
355}
356
357pub(super) fn normalize_due_at(due_at: &str) -> String {
359 let s = due_at.trim_end_matches('Z');
360 s.replacen('T', " ", 1)
361}
362
363pub(super) fn descriptions_are_similar(a: &str, b: &str) -> bool {
365 let words_a = significant_words(a);
366 let words_b = significant_words(b);
367
368 if words_a.len() < 3 || words_b.len() < 3 {
369 return false;
370 }
371
372 let (smaller, larger) = if words_a.len() <= words_b.len() {
373 (&words_a, &words_b)
374 } else {
375 (&words_b, &words_a)
376 };
377
378 let overlap = smaller.iter().filter(|w| larger.contains(w)).count();
379 let threshold = smaller.len().div_ceil(2);
380 overlap >= threshold
381}
382
/// Splits `text` into lowercase words that are useful for comparing
/// descriptions.
///
/// Words are maximal runs of alphanumeric characters. A word is kept when it
/// has at least three characters after lowercasing and is not one of the
/// stop words below. Order and duplicates of the input are preserved.
fn significant_words(text: &str) -> Vec<String> {
    /// Minimum length of a kept word, in characters (not bytes).
    const MIN_CHARS: usize = 3;
    const STOP_WORDS: &[&str] = &[
        "the", "and", "for", "that", "this", "with", "from", "are", "was", "were", "been", "have",
        "has", "had", "will", "would", "could", "should", "may", "might", "can", "about", "into",
        "over", "after", "before", "between", "under", "again", "then", "once", "daily", "weekly",
        "monthly", "cada", "diario", "escribir", "enviar", "usar", "nunca", "siempre",
    ];

    text.split(|c: char| !c.is_alphanumeric())
        // Consecutive separators produce empty pieces; skip them before allocating.
        .filter(|piece| !piece.is_empty())
        .map(str::to_lowercase)
        // Count characters: byte length would let two-letter accented words
        // such as "sí" (3 bytes) pass as three-letter words.
        .filter(|word| word.chars().count() >= MIN_CHARS && !STOP_WORDS.contains(&word.as_str()))
        .collect()
}
394}