1use super::state_transitions::record_state_transition;
4use super::{Database, now_ms};
5use crate::config::{
6 AutoAdvanceConfig, DependenciesConfig, IdsConfig, PhasesConfig, StatesConfig, TagsConfig,
7};
8use crate::error::ToolError;
9use crate::types::{
10 PRIORITY_DEFAULT, Priority, Task, TaskTree, TaskTreeInput, Worker, clamp_priority,
11 parse_priority,
12};
13use anyhow::{Result, anyhow};
14use petname::{Generator, Petnames};
15use rusqlite::{Connection, Row, params};
16
/// Arguments for `Database::create_task_tree`, bundled into a single value.
///
/// The four `*_config` fields are borrowed for `'a`; the remaining fields are owned.
#[derive(Debug)]
pub struct CreateTreeOptions<'a> {
    /// Tree description handed to the recursive creation helper.
    pub input: TaskTreeInput,
    /// Optional id forwarded to the creation helper as the parent of the new root.
    pub parent_id: Option<String>,
    /// Dependency type forwarded for child links; `create_task_tree` substitutes
    /// `"contains"` when this is `None`.
    pub child_type: Option<String>,
    /// Dependency type forwarded for sibling links; passed through unchanged.
    pub sibling_type: Option<String>,
    /// State configuration forwarded to the creation helper.
    pub states_config: &'a StatesConfig,
    /// Phase configuration forwarded to the creation helper.
    pub phases_config: &'a PhasesConfig,
    /// Tag configuration forwarded to the creation helper.
    pub tags_config: &'a TagsConfig,
    /// Id-generation configuration forwarded to the creation helper.
    pub ids_config: &'a IdsConfig,
}
29
/// Filter, paging and ordering options consumed by `Database::list_tasks`.
///
/// Every filter is optional; `Default` yields a query with no filters, no limit
/// and offset 0.
#[derive(Debug, Default)]
pub struct ListTasksQuery<'a> {
    /// Keep only tasks whose `status` column equals this value.
    pub status: Option<&'a str>,
    /// Keep only tasks whose `phase` column equals this value.
    pub phase: Option<&'a str>,
    /// Keep only tasks whose `worker_id` column equals this value.
    pub owner: Option<&'a str>,
    /// `None`: no parent filter. `Some(Some(id))`: only tasks that are the target of a
    /// `'contains'` dependency from `id`. `Some(None)`: only tasks that are not the
    /// target of any `'contains'` dependency.
    pub parent_id: Option<Option<&'a str>>,
    /// Value for the SQL `LIMIT` clause, when present.
    pub limit: Option<i32>,
    /// Value for the SQL `OFFSET` clause; only emitted when greater than zero.
    pub offset: i32,
    /// Sort column: `"priority"`, `"created_at"` or `"updated_at"`; anything else
    /// sorts by creation time.
    pub sort_by: Option<&'a str>,
    /// Sort direction: `"asc"` or `"desc"`; anything else sorts descending.
    pub sort_order: Option<&'a str>,
}
42
43fn generate_task_id(ids_config: &IdsConfig) -> String {
46 let words = ids_config.task_id_words;
47 let case = ids_config.id_case;
48
49 let base = Petnames::medium()
51 .generate_one(words, "-")
52 .unwrap_or_else(|| format!("task-{}", super::now_ms()));
53
54 case.convert(&base)
56}
57
/// Translates the caller-supplied sort options into the body of an `ORDER BY` clause.
///
/// Only a fixed set of column expressions and directions can be produced, so the
/// result is safe to splice into SQL text. Unrecognised or missing values fall back
/// to `t.created_at` and `DESC` respectively.
fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
    let column = match sort_by {
        // Priority is stored as text, so it is cast for numeric ordering.
        Some("priority") => "CAST(t.priority AS INTEGER)",
        Some("updated_at") => "t.updated_at",
        // Covers "created_at", unknown names and `None`.
        _ => "t.created_at",
    };

    // Only the exact lowercase string "asc" selects ascending order.
    let direction = if sort_order == Some("asc") {
        "ASC"
    } else {
        "DESC"
    };

    format!("{column} {direction}")
}
79
80fn sync_task_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
87 conn.execute("DELETE FROM task_tags WHERE task_id = ?1", params![task_id])?;
88 for tag in tags {
89 conn.execute(
90 "INSERT INTO task_tags (task_id, tag) VALUES (?1, ?2)",
91 params![task_id, tag],
92 )?;
93 }
94 Ok(())
95}
96
97fn sync_needed_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
99 conn.execute(
100 "DELETE FROM task_needed_tags WHERE task_id = ?1",
101 params![task_id],
102 )?;
103 for tag in tags {
104 conn.execute(
105 "INSERT INTO task_needed_tags (task_id, tag) VALUES (?1, ?2)",
106 params![task_id, tag],
107 )?;
108 }
109 Ok(())
110}
111
112fn sync_wanted_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
114 conn.execute(
115 "DELETE FROM task_wanted_tags WHERE task_id = ?1",
116 params![task_id],
117 )?;
118 for tag in tags {
119 conn.execute(
120 "INSERT INTO task_wanted_tags (task_id, tag) VALUES (?1, ?2)",
121 params![task_id, tag],
122 )?;
123 }
124 Ok(())
125}
126
127pub fn parse_task_row(row: &Row) -> rusqlite::Result<Task> {
128 let id: String = row.get("id")?;
129 let title: String = row.get("title")?;
130 let description: Option<String> = row.get("description")?;
131 let status: String = row.get("status")?;
132 let phase: Option<String> = row.get("phase")?;
133 let priority: String = row.get("priority")?;
134 let worker_id: Option<String> = row.get("worker_id")?;
135 let claimed_at: Option<i64> = row.get("claimed_at")?;
136
137 let needed_tags_json: Option<String> = row.get("needed_tags")?;
138 let wanted_tags_json: Option<String> = row.get("wanted_tags")?;
139 let tags_json: Option<String> = row.get("tags")?;
140
141 let points: Option<i32> = row.get("points")?;
142 let time_estimate_ms: Option<i64> = row.get("time_estimate_ms")?;
143 let time_actual_ms: Option<i64> = row.get("time_actual_ms")?;
144 let started_at: Option<i64> = row.get("started_at")?;
145 let completed_at: Option<i64> = row.get("completed_at")?;
146
147 let current_thought: Option<String> = row.get("current_thought")?;
148
149 let cost_usd: f64 = row.get("cost_usd")?;
150 let metric_0: i64 = row.get("metric_0")?;
151 let metric_1: i64 = row.get("metric_1")?;
152 let metric_2: i64 = row.get("metric_2")?;
153 let metric_3: i64 = row.get("metric_3")?;
154 let metric_4: i64 = row.get("metric_4")?;
155 let metric_5: i64 = row.get("metric_5")?;
156 let metric_6: i64 = row.get("metric_6")?;
157 let metric_7: i64 = row.get("metric_7")?;
158
159 let created_at: i64 = row.get("created_at")?;
160 let updated_at: i64 = row.get("updated_at")?;
161
162 Ok(Task {
163 id,
164 title,
165 description,
166 status,
167 phase,
168 priority: parse_priority(&priority),
169 worker_id,
170 claimed_at,
171 needed_tags: needed_tags_json
172 .map(|s| serde_json::from_str(&s).unwrap_or_default())
173 .unwrap_or_default(),
174 wanted_tags: wanted_tags_json
175 .map(|s| serde_json::from_str(&s).unwrap_or_default())
176 .unwrap_or_default(),
177 tags: tags_json
178 .map(|s| serde_json::from_str(&s).unwrap_or_default())
179 .unwrap_or_default(),
180 points,
181 time_estimate_ms,
182 time_actual_ms,
183 started_at,
184 completed_at,
185 current_thought,
186 cost_usd,
187 metrics: [
188 metric_0, metric_1, metric_2, metric_3, metric_4, metric_5, metric_6, metric_7,
189 ],
190 created_at,
191 updated_at,
192 })
193}
194
195fn get_task_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
197 let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
198
199 let result = stmt.query_row(params![task_id], parse_task_row);
200
201 match result {
202 Ok(task) => Ok(Some(task)),
203 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
204 Err(e) => Err(e.into()),
205 }
206}
207
208fn get_worker_internal(conn: &Connection, worker_id: &str) -> Result<Option<Worker>> {
210 let mut stmt = conn.prepare(
211 "SELECT id, tags, max_claims, registered_at, last_heartbeat, last_status, last_phase, workflow, overlays
212 FROM workers WHERE id = ?1",
213 )?;
214
215 let result = stmt.query_row(params![worker_id], |row| {
216 let id: String = row.get(0)?;
217 let tags_json: String = row.get(1)?;
218 let max_claims: i32 = row.get(2)?;
219 let registered_at: i64 = row.get(3)?;
220 let last_heartbeat: i64 = row.get(4)?;
221 let last_status: Option<String> = row.get(5)?;
222 let last_phase: Option<String> = row.get(6)?;
223 let workflow: Option<String> = row.get(7)?;
224 let overlays_json: Option<String> = row.get(8)?;
225
226 Ok((
227 id,
228 tags_json,
229 max_claims,
230 registered_at,
231 last_heartbeat,
232 last_status,
233 last_phase,
234 workflow,
235 overlays_json,
236 ))
237 });
238
239 match result {
240 Ok((
241 id,
242 tags_json,
243 max_claims,
244 registered_at,
245 last_heartbeat,
246 last_status,
247 last_phase,
248 workflow,
249 overlays_json,
250 )) => {
251 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
252 let overlays: Vec<String> = overlays_json
253 .as_deref()
254 .and_then(|s| serde_json::from_str(s).ok())
255 .unwrap_or_default();
256 Ok(Some(Worker {
257 id,
258 tags,
259 max_claims,
260 registered_at,
261 last_heartbeat,
262 last_status,
263 last_phase,
264 workflow,
265 overlays,
266 }))
267 }
268 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
269 Err(e) => Err(e.into()),
270 }
271}
272
273impl Database {
274 #[allow(clippy::too_many_arguments)]
279 pub fn create_task(
280 &self,
281 id: Option<String>,
282 title: String,
283 description: Option<String>,
284 parent_id: Option<String>,
285 phase: Option<String>,
286 priority: Option<Priority>,
287 points: Option<i32>,
288 time_estimate_ms: Option<i64>,
289 agent_tags_all: Option<Vec<String>>,
290 agent_tags_any: Option<Vec<String>>,
291 tags: Option<Vec<String>>,
292 states_config: &StatesConfig,
293 ids_config: &IdsConfig,
294 ) -> Result<Task> {
295 let task_id = id.unwrap_or_else(|| generate_task_id(ids_config));
296 let now = now_ms();
297 let priority = clamp_priority(priority.unwrap_or(PRIORITY_DEFAULT));
298 let initial_status = &states_config.initial;
299
300 let needed_tags = agent_tags_all.unwrap_or_default();
301 let wanted_tags = agent_tags_any.unwrap_or_default();
302 let tags = tags.unwrap_or_default();
303 let needed_tags_json = serde_json::to_string(&needed_tags)?;
304 let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
305 let tags_json = serde_json::to_string(&tags)?;
306
307 self.with_conn_mut(|conn| {
308 let tx = conn.transaction()?;
309
310 tx.execute(
311 "INSERT INTO tasks (
312 id, title, description, status, phase, priority,
313 needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
314 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
315 params![
316 &task_id,
317 &title,
318 &description,
319 initial_status,
320 &phase,
321 priority.to_string(),
322 needed_tags_json,
323 wanted_tags_json,
324 tags_json,
325 points,
326 time_estimate_ms,
327 now,
328 now,
329 ],
330 )?;
331
332 sync_task_tags(&tx, &task_id, &tags)?;
334 sync_needed_tags(&tx, &task_id, &needed_tags)?;
335 sync_wanted_tags(&tx, &task_id, &wanted_tags)?;
336
337 if let Some(ref pid) = parent_id {
339 Database::add_dependency_internal(&tx, pid, &task_id, "contains")?;
340 }
341
342 record_state_transition(&tx, &task_id, initial_status, None, None, states_config)?;
344
345 tx.commit()?;
346
347 Ok(Task {
348 id: task_id,
349 title,
350 description,
351 status: initial_status.clone(),
352 phase,
353 priority,
354 worker_id: None,
355 claimed_at: None,
356 needed_tags,
357 wanted_tags,
358 tags,
359 points,
360 time_estimate_ms,
361 time_actual_ms: None,
362 started_at: None,
363 completed_at: None,
364 current_thought: None,
365 cost_usd: 0.0,
366 metrics: [0; 8],
367 created_at: now,
368 updated_at: now,
369 })
370 })
371 }
372
373 pub fn create_task_simple(
376 &self,
377 description: impl Into<String>,
378 states_config: &StatesConfig,
379 ids_config: &IdsConfig,
380 ) -> Result<Task> {
381 let desc = description.into();
382 self.create_task(
383 None,
384 desc.clone(),
385 Some(desc),
386 None,
387 None,
388 None,
389 None,
390 None,
391 None,
392 None,
393 None,
394 states_config,
395 ids_config,
396 )
397 }
398
399 #[allow(clippy::type_complexity)]
403 pub fn create_task_tree(
404 &self,
405 opts: CreateTreeOptions<'_>,
406 ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>)> {
407 let mut all_ids = Vec::new();
408 let mut phase_warnings = Vec::new();
409 let mut tag_warnings = Vec::new();
410 let child_type = opts.child_type.or_else(|| Some("contains".to_string()));
412
413 self.with_conn_mut(|conn| {
414 let tx = conn.transaction()?;
415 let root_id = create_tree_recursive(
416 &tx,
417 &opts.input,
418 opts.parent_id.as_deref(),
419 None, child_type.as_deref(),
421 opts.sibling_type.as_deref(),
422 &mut all_ids,
423 &mut phase_warnings,
424 &mut tag_warnings,
425 opts.states_config,
426 opts.phases_config,
427 opts.tags_config,
428 opts.ids_config,
429 )?;
430 tx.commit()?;
431 Ok((root_id, all_ids, phase_warnings, tag_warnings))
432 })
433 }
434
435 pub fn get_task(&self, task_id: &str) -> Result<Option<Task>> {
437 self.with_conn(|conn| {
438 let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
439
440 let result = stmt.query_row(params![task_id], parse_task_row);
441
442 match result {
443 Ok(task) => Ok(Some(task)),
444 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
445 Err(e) => Err(e.into()),
446 }
447 })
448 }
449
450 pub fn rename_task(&self, old_id: &str, new_id: &str) -> Result<()> {
456 if new_id.is_empty() {
458 return Err(anyhow!("new_id must not be empty"));
459 }
460 if new_id.len() > 64 {
461 return Err(anyhow!("new_id must not exceed 64 characters"));
462 }
463
464 self.with_conn_mut(|conn| {
465 let exists: bool = conn.query_row(
467 "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
468 params![old_id],
469 |row| row.get(0),
470 )?;
471 if !exists {
472 return Err(anyhow!("Task '{}' not found", old_id));
473 }
474
475 let conflict: bool = conn.query_row(
477 "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
478 params![new_id],
479 |row| row.get(0),
480 )?;
481 if conflict {
482 return Err(anyhow!("Task '{}' already exists", new_id));
483 }
484
485 conn.execute_batch("PRAGMA foreign_keys = OFF")?;
487
488 let result = (|| -> Result<()> {
489 let tx = conn.transaction()?;
490
491 tx.execute(
493 "UPDATE tasks SET id = ?1 WHERE id = ?2",
494 params![new_id, old_id],
495 )?;
496
497 tx.execute(
499 "UPDATE attachments SET task_id = ?1 WHERE task_id = ?2",
500 params![new_id, old_id],
501 )?;
502
503 tx.execute(
505 "UPDATE dependencies SET from_task_id = ?1 WHERE from_task_id = ?2",
506 params![new_id, old_id],
507 )?;
508 tx.execute(
509 "UPDATE dependencies SET to_task_id = ?1 WHERE to_task_id = ?2",
510 params![new_id, old_id],
511 )?;
512
513 tx.execute(
515 "UPDATE file_locks SET task_id = ?1 WHERE task_id = ?2",
516 params![new_id, old_id],
517 )?;
518
519 tx.execute(
521 "UPDATE task_tags SET task_id = ?1 WHERE task_id = ?2",
522 params![new_id, old_id],
523 )?;
524 tx.execute(
525 "UPDATE task_needed_tags SET task_id = ?1 WHERE task_id = ?2",
526 params![new_id, old_id],
527 )?;
528 tx.execute(
529 "UPDATE task_wanted_tags SET task_id = ?1 WHERE task_id = ?2",
530 params![new_id, old_id],
531 )?;
532
533 tx.execute(
535 "UPDATE task_sequence SET task_id = ?1 WHERE task_id = ?2",
536 params![new_id, old_id],
537 )?;
538
539 tx.commit()?;
540 Ok(())
541 })();
542
543 conn.execute_batch("PRAGMA foreign_keys = ON")?;
545
546 result?;
548
549 let mut stmt = conn.prepare("PRAGMA foreign_key_check")?;
551 let violations: Vec<String> = stmt
552 .query_map([], |row| {
553 let table: String = row.get(0)?;
554 Ok(table)
555 })?
556 .filter_map(|r| r.ok())
557 .collect();
558
559 if !violations.is_empty() {
560 return Err(anyhow!(
561 "Foreign key violations after rename in tables: {:?}",
562 violations
563 ));
564 }
565
566 Ok(())
567 })
568 }
569
570 pub fn get_task_tree(&self, task_id: &str) -> Result<Option<TaskTree>> {
572 let task = self.get_task(task_id)?;
573 match task {
574 None => Ok(None),
575 Some(task) => {
576 let children = self.get_children_recursive(&task.id)?;
577 Ok(Some(TaskTree { task, children }))
578 }
579 }
580 }
581
582 fn get_children_recursive(&self, parent_id: &str) -> Result<Vec<TaskTree>> {
584 let children = self.get_children(parent_id)?;
585 let mut result = Vec::new();
586
587 for child in children {
588 let child_children = self.get_children_recursive(&child.id)?;
589 result.push(TaskTree {
590 task: child,
591 children: child_children,
592 });
593 }
594
595 Ok(result)
596 }
597
598 pub fn get_children(&self, parent_id: &str) -> Result<Vec<Task>> {
600 self.with_conn(|conn| {
601 let mut stmt = conn.prepare(
602 "SELECT t.* FROM tasks t
603 INNER JOIN dependencies d ON t.id = d.to_task_id
604 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
605 ORDER BY t.created_at",
606 )?;
607
608 let tasks = stmt
609 .query_map(params![parent_id], parse_task_row)?
610 .filter_map(|r| r.ok())
611 .collect();
612
613 Ok(tasks)
614 })
615 }
616
617 #[allow(clippy::too_many_arguments)]
619 pub fn update_task(
620 &self,
621 task_id: &str,
622 title: Option<String>,
623 description: Option<Option<String>>,
624 status: Option<String>,
625 priority: Option<Priority>,
626 points: Option<Option<i32>>,
627 tags: Option<Vec<String>>,
628 states_config: &StatesConfig,
629 ) -> Result<Task> {
630 let now = now_ms();
631
632 self.with_conn(|conn| {
633 let task =
634 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
635
636 let new_title = title.unwrap_or(task.title.clone());
637 let new_description = description.unwrap_or(task.description.clone());
638 let new_status = status.unwrap_or(task.status.clone());
639 let new_priority = priority.unwrap_or(task.priority);
640 let new_points = points.unwrap_or(task.points);
641 let new_tags = tags.unwrap_or(task.tags.clone());
642
643 if !states_config.is_valid_state(&new_status) {
645 return Err(anyhow!(
646 "Invalid state '{}'. Valid states: {:?}",
647 new_status,
648 states_config.state_names()
649 ));
650 }
651
652 if task.status != new_status
654 && !states_config.is_valid_transition(&task.status, &new_status)
655 {
656 let exits = states_config.get_exits(&task.status);
657 return Err(anyhow!(
658 "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
659 task.status,
660 new_status,
661 exits
662 ));
663 }
664
665 let started_at =
668 if task.started_at.is_none() && states_config.is_timed_state(&new_status) {
669 Some(now)
670 } else {
671 task.started_at
672 };
673
674 let completed_at = if new_status == "completed" {
676 Some(now)
677 } else {
678 task.completed_at
679 };
680
681 if task.status != new_status {
683 record_state_transition(
684 conn,
685 task_id,
686 &new_status,
687 task.worker_id.as_deref(),
688 None,
689 states_config,
690 )?;
691 }
692
693 conn.execute(
694 "UPDATE tasks SET
695 title = ?1, description = ?2, status = ?3, priority = ?4,
696 points = ?5, started_at = ?6, completed_at = ?7, updated_at = ?8,
697 tags = ?9
698 WHERE id = ?10",
699 params![
700 new_title,
701 new_description,
702 new_status,
703 new_priority.to_string(),
704 new_points,
705 started_at,
706 completed_at,
707 now,
708 serde_json::to_string(&new_tags)?,
709 task_id,
710 ],
711 )?;
712
713 Ok(Task {
714 id: task_id.to_string(),
715 title: new_title,
716 description: new_description,
717 status: new_status,
718 priority: new_priority,
719 points: new_points,
720 tags: new_tags,
721 started_at,
722 completed_at,
723 updated_at: now,
724 ..task
725 })
726 })
727 }
728
    /// Updates a task's fields, status and ownership in one transaction.
    ///
    /// `agent_id` is the caller; `assignee` optionally names another worker to hand the
    /// task to. An argument of `None` keeps the stored value (for `description` and
    /// `points` the inner `Option` is the new value). When `assignee` is given without
    /// `status`, the status becomes `"assigned"`.
    ///
    /// Ownership follows the target status:
    /// * timed state, task not owned by the caller — the caller claims it (start
    ///   blockers and tag requirements are checked; `force` skips the blocker check);
    /// * neither timed nor terminal, task currently owned — ownership is released;
    /// * terminal state — rejected while children are in a blocking state; otherwise
    ///   ownership is released and the task's file locks are deleted.
    ///
    /// Returns the updated task plus the two lists produced by
    /// `propagate_unblock_effects` (empty unless the status left a blocking state).
    ///
    /// # Errors
    ///
    /// Fails when the task, the assignee or the claiming agent is not found, when the
    /// task is claimed by someone else and `force` is false, on an invalid state or
    /// transition, on unmet tag requirements or start blockers, on incomplete children,
    /// or on any database error. Nothing is committed in those cases.
    #[allow(clippy::too_many_arguments)]
    pub fn update_task_unified(
        &self,
        task_id: &str,
        agent_id: &str,
        assignee: Option<&str>,
        title: Option<String>,
        description: Option<Option<String>>,
        status: Option<String>,
        phase: Option<String>,
        priority: Option<Priority>,
        points: Option<Option<i32>>,
        tags: Option<Vec<String>>,
        needed_tags: Option<Vec<String>>,
        wanted_tags: Option<Vec<String>>,
        time_estimate_ms: Option<i64>,
        reason: Option<String>,
        force: bool,
        states_config: &StatesConfig,
        deps_config: &DependenciesConfig,
        auto_advance: &AutoAdvanceConfig,
    ) -> Result<(Task, Vec<String>, Vec<String>)> {
        let now = now_ms();

        self.with_conn_mut(|conn| {
            let tx = conn.transaction()?;

            let task =
                get_task_internal(&tx, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;

            // Ownership guard: a task claimed by someone else can only be touched with
            // `force`. Past this point "owned by another agent" implies `force == true`.
            if let Some(ref current_owner) = task.worker_id
                && current_owner != agent_id
                && !force
            {
                return Err(anyhow!(
                    "Task is claimed by agent '{}'. Only the owner can update claimed tasks (use force=true to override)",
                    current_owner
                ));
            }

            // Resolve the new field values, falling back to what is stored.
            let new_title = title.unwrap_or(task.title.clone());
            let new_description = description.unwrap_or(task.description.clone());
            // An assignment without an explicit status implies the "assigned" state.
            let new_status = if assignee.is_some() && status.is_none() {
                "assigned".to_string()
            } else {
                status.unwrap_or(task.status.clone())
            };
            let new_priority = priority.unwrap_or(task.priority);
            let new_points = points.unwrap_or(task.points);
            let new_tags = tags.unwrap_or(task.tags.clone());
            let new_needed_tags = needed_tags.unwrap_or(task.needed_tags.clone());
            let new_wanted_tags = wanted_tags.unwrap_or(task.wanted_tags.clone());
            // Note: these two use `or`, so an existing estimate/phase cannot be cleared here.
            let new_time_estimate_ms = time_estimate_ms.or(task.time_estimate_ms);
            let new_phase = phase.or(task.phase.clone());

            if !states_config.is_valid_state(&new_status) {
                return Err(anyhow!(
                    "Invalid state '{}'. Valid states: {:?}",
                    new_status,
                    states_config.state_names()
                ));
            }

            // Transitions are validated only when the status actually changes.
            if task.status != new_status
                && !states_config.is_valid_transition(&task.status, &new_status)
            {
                let exits = states_config.get_exits(&task.status);
                return Err(anyhow!(
                    "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
                    task.status,
                    new_status,
                    exits
                ));
            }

            let new_is_timed = states_config.is_timed_state(&new_status);
            let new_is_terminal = states_config.is_terminal_state(&new_status);
            let current_owner = task.worker_id.as_deref();
            let is_owned_by_agent = current_owner == Some(agent_id);
            let is_owned_by_other = current_owner.is_some() && !is_owned_by_agent;

            // Ownership starts out unchanged; the blocks below may reassign or clear it.
            let mut new_owner: Option<String> = task.worker_id.clone();
            let mut new_claimed_at: Option<i64> = task.claimed_at;

            // --- Explicit assignment to another worker ---
            if let Some(target_agent) = assignee {
                // Cannot trigger: the ownership guard at the top already returned for
                // this combination. Kept as a defensive check.
                if is_owned_by_other && !force {
                    return Err(anyhow!(
                        "Task is already claimed by agent '{}'. Use force=true to reassign.",
                        current_owner.unwrap()
                    ));
                }

                let target = get_worker_internal(&tx, target_agent)?
                    .ok_or_else(|| anyhow!("Assignee agent '{}' not found", target_agent))?;

                // The assignee must hold ALL needed tags. The check uses the tags
                // stored on the task, not any new lists passed in this call.
                if !task.needed_tags.is_empty() {
                    for needed in &task.needed_tags {
                        if !target.tags.contains(needed) {
                            return Err(anyhow!(
                                "Assignee '{}' missing required tag: {}",
                                target_agent,
                                needed
                            ));
                        }
                    }
                }

                // ...and at least ONE wanted tag, when any are listed.
                if !task.wanted_tags.is_empty() {
                    let has_any = task
                        .wanted_tags
                        .iter()
                        .any(|wanted| target.tags.contains(wanted));
                    if !has_any {
                        return Err(anyhow!(
                            "Assignee '{}' has none of the wanted tags: {:?}",
                            target_agent,
                            task.wanted_tags
                        ));
                    }
                }

                new_owner = Some(target_agent.to_string());
                new_claimed_at = Some(now);
            }

            // --- Claim: entering a timed state on a task the caller does not own ---
            // NOTE(review): if `assignee` is set and the status is timed, this block
            // overwrites the owner chosen above with `agent_id` — confirm intent.
            if new_is_timed && !is_owned_by_agent {
                // Cannot trigger; see the ownership guard at the top.
                if is_owned_by_other && !force {
                    return Err(anyhow!(
                        "Task is already claimed by agent '{}'",
                        current_owner.unwrap()
                    ));
                }

                // `force` skips the start-blocker check.
                if !force {
                    let unsatisfied_blockers = super::deps::get_unsatisfied_start_blockers_in_tx(
                        &tx,
                        task_id,
                        states_config,
                        deps_config,
                    )?;
                    if !unsatisfied_blockers.is_empty() {
                        return Err(ToolError::deps_not_satisfied(&unsatisfied_blockers).into());
                    }
                }

                let agent = get_worker_internal(&tx, agent_id)?
                    .ok_or_else(|| anyhow!("Agent not found"))?;

                // Same tag rules as for an assignee: all needed tags, any wanted tag.
                if !task.needed_tags.is_empty() {
                    for needed in &task.needed_tags {
                        if !agent.tags.contains(needed) {
                            return Err(anyhow!("Agent missing required tag: {}", needed));
                        }
                    }
                }

                if !task.wanted_tags.is_empty() {
                    let has_any = task
                        .wanted_tags
                        .iter()
                        .any(|wanted| agent.tags.contains(wanted));
                    if !has_any {
                        return Err(anyhow!("Agent has none of the wanted tags"));
                    }
                }

                new_owner = Some(agent_id.to_string());
                new_claimed_at = Some(now);

                // Claiming also refreshes the claiming worker's heartbeat.
                tx.execute(
                    "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
                    params![now, agent_id],
                )?;
            }

            // --- Release: a non-timed, non-terminal state on a task that had an owner ---
            // NOTE(review): this also runs when `assignee` was just applied to a task
            // that already had an owner, clearing the new assignment — confirm intent.
            if !new_is_timed && !new_is_terminal && task.worker_id.is_some() {
                // Cannot trigger; see the ownership guard at the top.
                if is_owned_by_other && !force {
                    return Err(anyhow!("Task is not owned by this agent"));
                }

                new_owner = None;
                new_claimed_at = None;
            }

            // --- Finish: entering (or staying in) a terminal state ---
            if new_is_terminal {
                // Cannot trigger; see the ownership guard at the top.
                if let Some(ref current_owner) = task.worker_id
                    && current_owner != agent_id
                    && !force
                {
                    return Err(anyhow!("Task is not owned by this agent"));
                }

                // Count direct 'contains' children whose status is one of the
                // configured blocking states (passed to SQL as a JSON array).
                let incomplete_children: i32 = tx.query_row(
                    "SELECT COUNT(*) FROM dependencies d
                     INNER JOIN tasks child ON d.to_task_id = child.id
                     WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
                     AND child.status IN (SELECT value FROM json_each(?2))",
                    params![
                        task_id,
                        serde_json::to_string(&states_config.blocking_states)?
                    ],
                    |row| row.get(0),
                )?;

                if incomplete_children > 0 {
                    return Err(anyhow!(
                        "Cannot complete task: {} child task(s) are not complete",
                        incomplete_children
                    ));
                }

                new_owner = None;
                new_claimed_at = None;

                // A finished task gives up its file locks.
                tx.execute(
                    "DELETE FROM file_locks WHERE task_id = ?1",
                    params![task_id],
                )?;
            }

            // The start time is stamped once, the first time a timed state is entered.
            let started_at = if task.started_at.is_none() && new_is_timed {
                Some(now)
            } else {
                task.started_at
            };

            // NOTE(review): keyed on the literal "completed" rather than on terminal
            // states, and re-stamped on every update while the status stays
            // "completed" — confirm intent.
            let completed_at = if new_status == "completed" {
                Some(now)
            } else {
                task.completed_at
            };

            let status_changed = task.status != new_status;
            if status_changed {
                record_state_transition(
                    &tx,
                    task_id,
                    &new_status,
                    new_owner.as_deref(),
                    reason.as_deref(),
                    states_config,
                )?;
            }

            let phase_changed = task.phase != new_phase;
            if phase_changed {
                super::state_transitions::record_phase_transition(
                    &tx,
                    task_id,
                    new_phase.as_deref().unwrap_or(""),
                    Some(agent_id),
                    reason.as_deref(),
                )?;
            }

            tx.execute(
                "UPDATE tasks SET
                    title = ?1, description = ?2, status = ?3, phase = ?4, priority = ?5,
                    points = ?6, started_at = ?7, completed_at = ?8, updated_at = ?9,
                    tags = ?10, worker_id = ?11, claimed_at = ?12,
                    needed_tags = ?13, wanted_tags = ?14, time_estimate_ms = ?15
                 WHERE id = ?16",
                params![
                    new_title,
                    new_description,
                    new_status,
                    new_phase,
                    new_priority.to_string(),
                    new_points,
                    started_at,
                    completed_at,
                    now,
                    serde_json::to_string(&new_tags)?,
                    new_owner,
                    new_claimed_at,
                    serde_json::to_string(&new_needed_tags)?,
                    serde_json::to_string(&new_wanted_tags)?,
                    new_time_estimate_ms,
                    task_id,
                ],
            )?;

            // Rewrite a tag table only when the corresponding list changed.
            if new_tags != task.tags {
                sync_task_tags(&tx, task_id, &new_tags)?;
            }
            if new_needed_tags != task.needed_tags {
                sync_needed_tags(&tx, task_id, &new_needed_tags)?;
            }
            if new_wanted_tags != task.wanted_tags {
                sync_wanted_tags(&tx, task_id, &new_wanted_tags)?;
            }

            // Unblock effects are propagated only when the status moved from a
            // blocking state to a non-blocking one.
            let (unblocked, auto_advanced) = if status_changed {
                let was_blocking = states_config.is_blocking_state(&task.status);
                let is_blocking = states_config.is_blocking_state(&new_status);

                if was_blocking && !is_blocking {
                    super::deps::propagate_unblock_effects(
                        &tx,
                        task_id,
                        Some(agent_id),
                        states_config,
                        deps_config,
                        auto_advance,
                    )?
                } else {
                    (vec![], vec![])
                }
            } else {
                (vec![], vec![])
            };

            tx.commit()?;

            Ok((
                Task {
                    id: task_id.to_string(),
                    title: new_title,
                    description: new_description,
                    status: new_status,
                    phase: new_phase,
                    priority: new_priority,
                    points: new_points,
                    tags: new_tags,
                    needed_tags: new_needed_tags,
                    wanted_tags: new_wanted_tags,
                    time_estimate_ms: new_time_estimate_ms,
                    started_at,
                    completed_at,
                    updated_at: now,
                    worker_id: new_owner,
                    claimed_at: new_claimed_at,
                    ..task
                },
                unblocked,
                auto_advanced,
            ))
        })
    }
1107
1108 pub fn delete_task(
1116 &self,
1117 task_id: &str,
1118 worker_id: &str,
1119 cascade: bool,
1120 reason: Option<String>,
1121 obliterate: bool,
1122 force: bool,
1123 ) -> Result<()> {
1124 let now = now_ms();
1125
1126 self.with_conn_mut(|conn| {
1127 let tx = conn.transaction()?;
1128
1129 let task = get_task_internal(&tx, task_id)?
1131 .ok_or_else(|| anyhow!("Task not found"))?;
1132
1133 if let Some(ref owner) = task.worker_id
1135 && owner != worker_id && !force {
1136 return Err(anyhow!(
1137 "Task is claimed by worker '{}'. Use force=true to override.",
1138 owner
1139 ));
1140 }
1141
1142 if obliterate {
1143 if cascade {
1145 tx.execute(
1148 "WITH RECURSIVE descendants AS (
1149 SELECT ?1 AS id
1150 UNION ALL
1151 SELECT dep.to_task_id FROM dependencies dep
1152 INNER JOIN descendants d ON dep.from_task_id = d.id
1153 WHERE dep.dep_type = 'contains'
1154 )
1155 DELETE FROM tasks WHERE id IN (SELECT id FROM descendants)",
1156 params![task_id],
1157 )?;
1158 } else {
1159 let child_count: i32 = tx.query_row(
1161 "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
1162 params![task_id],
1163 |row| row.get(0),
1164 )?;
1165
1166 if child_count > 0 {
1167 return Err(anyhow!("Task has children; use cascade=true to delete"));
1168 }
1169
1170 tx.execute("DELETE FROM tasks WHERE id = ?1", params![task_id])?;
1171 }
1172 } else {
1173 if cascade {
1175 tx.execute(
1177 "WITH RECURSIVE descendants AS (
1178 SELECT ?1 AS id
1179 UNION ALL
1180 SELECT dep.to_task_id FROM dependencies dep
1181 INNER JOIN descendants d ON dep.from_task_id = d.id
1182 WHERE dep.dep_type = 'contains'
1183 )
1184 UPDATE tasks SET deleted_at = ?2, deleted_by = ?3, deleted_reason = ?4, updated_at = ?2
1185 WHERE id IN (SELECT id FROM descendants) AND deleted_at IS NULL",
1186 params![task_id, now, worker_id, reason],
1187 )?;
1188 } else {
1189 let child_count: i32 = tx.query_row(
1191 "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
1192 params![task_id],
1193 |row| row.get(0),
1194 )?;
1195
1196 if child_count > 0 {
1197 return Err(anyhow!("Task has children; use cascade=true to delete"));
1198 }
1199
1200 tx.execute(
1201 "UPDATE tasks SET deleted_at = ?1, deleted_by = ?2, deleted_reason = ?3, updated_at = ?1 WHERE id = ?4",
1202 params![now, worker_id, reason, task_id],
1203 )?;
1204 }
1205 }
1206
1207 tx.commit()?;
1208 Ok(())
1209 })
1210 }
1211
1212 pub fn list_tasks(&self, query: ListTasksQuery<'_>) -> Result<Vec<Task>> {
1215 let ListTasksQuery {
1216 status,
1217 phase,
1218 owner,
1219 parent_id,
1220 limit,
1221 offset,
1222 sort_by,
1223 sort_order,
1224 } = query;
1225 self.with_conn(|conn| {
1226 let mut sql = String::from(
1227 "SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL",
1228 );
1229 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1230
1231 if let Some(s) = status {
1232 sql.push_str(" AND t.status = ?");
1233 params_vec.push(Box::new(s.to_string()));
1234 }
1235
1236 if let Some(p) = phase {
1237 sql.push_str(" AND t.phase = ?");
1238 params_vec.push(Box::new(p.to_string()));
1239 }
1240
1241 if let Some(o) = owner {
1242 sql.push_str(" AND t.worker_id = ?");
1243 params_vec.push(Box::new(o.to_string()));
1244 }
1245
1246 if let Some(p) = parent_id {
1248 match p {
1249 Some(pid) => {
1250 sql.push_str(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ? AND dep_type = 'contains')");
1251 params_vec.push(Box::new(pid.to_string()));
1252 }
1253 None => {
1254 sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
1256 }
1257 }
1258 }
1259
1260 let order_clause = build_order_clause(sort_by, sort_order);
1262 sql.push_str(&format!(" ORDER BY {}", order_clause));
1263
1264 if let Some(l) = limit {
1265 sql.push_str(&format!(" LIMIT {}", l));
1266 }
1267
1268 if offset > 0 {
1269 sql.push_str(&format!(" OFFSET {}", offset));
1270 }
1271
1272 let params_refs: Vec<&dyn rusqlite::ToSql> =
1273 params_vec.iter().map(|b| b.as_ref()).collect();
1274
1275 let mut stmt = conn.prepare(&sql)?;
1276 let tasks = stmt
1277 .query_map(params_refs.as_slice(), parse_task_row)?
1278 .filter_map(|r| r.ok())
1279 .collect();
1280
1281 Ok(tasks)
1282 })
1283 }
1284
1285 pub fn set_thought(
1287 &self,
1288 agent_id: &str,
1289 thought: Option<String>,
1290 task_ids: Option<Vec<String>>,
1291 ) -> Result<i32> {
1292 let now = now_ms();
1293
1294 self.with_conn(|conn| {
1295 let updated = if let Some(ids) = task_ids {
1296 let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
1297 let sql = format!(
1298 "UPDATE tasks SET current_thought = ?, updated_at = ?
1299 WHERE worker_id = ? AND id IN ({})",
1300 placeholders.join(", ")
1301 );
1302
1303 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1304 params_vec.push(Box::new(thought.clone()));
1305 params_vec.push(Box::new(now));
1306 params_vec.push(Box::new(agent_id.to_string()));
1307 for id in &ids {
1308 params_vec.push(Box::new(id.clone()));
1309 }
1310
1311 let params_refs: Vec<&dyn rusqlite::ToSql> =
1312 params_vec.iter().map(|b| b.as_ref()).collect();
1313 conn.execute(&sql, params_refs.as_slice())?
1314 } else {
1315 conn.execute(
1316 "UPDATE tasks SET current_thought = ?, updated_at = ? WHERE worker_id = ?",
1317 params![thought, now, agent_id],
1318 )?
1319 };
1320
1321 Ok(updated as i32)
1322 })
1323 }
1324
1325 pub fn log_time(&self, task_id: &str, duration_ms: i64) -> Result<i64> {
1327 let now = now_ms();
1328
1329 self.with_conn(|conn| {
1330 conn.execute(
1331 "UPDATE tasks SET time_actual_ms = COALESCE(time_actual_ms, 0) + ?1, updated_at = ?2
1332 WHERE id = ?3",
1333 params![duration_ms, now, task_id],
1334 )?;
1335
1336 let total: i64 = conn.query_row(
1337 "SELECT COALESCE(time_actual_ms, 0) FROM tasks WHERE id = ?1",
1338 params![task_id],
1339 |row| row.get(0),
1340 )?;
1341
1342 Ok(total)
1343 })
1344 }
1345
1346 pub fn log_metrics(
1349 &self,
1350 task_id: &str,
1351 cost_usd: Option<f64>,
1352 values: &[i64],
1353 ) -> Result<Task> {
1354 let now = now_ms();
1355
1356 self.with_conn(|conn| {
1357 let task =
1358 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1359
1360 let mut new_metrics = task.metrics;
1362 for (i, &val) in values.iter().take(8).enumerate() {
1363 new_metrics[i] += val;
1364 }
1365
1366 let new_cost_usd = task.cost_usd + cost_usd.unwrap_or(0.0);
1367
1368 conn.execute(
1369 "UPDATE tasks SET
1370 metric_0 = ?1, metric_1 = ?2, metric_2 = ?3, metric_3 = ?4,
1371 metric_4 = ?5, metric_5 = ?6, metric_6 = ?7, metric_7 = ?8,
1372 cost_usd = ?9, updated_at = ?10
1373 WHERE id = ?11",
1374 params![
1375 new_metrics[0],
1376 new_metrics[1],
1377 new_metrics[2],
1378 new_metrics[3],
1379 new_metrics[4],
1380 new_metrics[5],
1381 new_metrics[6],
1382 new_metrics[7],
1383 new_cost_usd,
1384 now,
1385 task_id,
1386 ],
1387 )?;
1388
1389 Ok(Task {
1390 cost_usd: new_cost_usd,
1391 metrics: new_metrics,
1392 updated_at: now,
1393 ..task
1394 })
1395 })
1396 }
1397
1398 pub fn claim_task(
1401 &self,
1402 task_id: &str,
1403 agent_id: &str,
1404 states_config: &StatesConfig,
1405 ) -> Result<Task> {
1406 let now = now_ms();
1407
1408 let claim_status = states_config
1410 .definitions
1411 .iter()
1412 .find(|(_, def)| def.timed)
1413 .map(|(name, _)| name.as_str())
1414 .unwrap_or("working");
1415
1416 self.with_conn(|conn| {
1417 let task =
1419 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1420
1421 if task.worker_id.is_some() {
1423 return Err(anyhow!("Task is already claimed"));
1424 }
1425
1426 if !states_config.is_valid_transition(&task.status, claim_status) {
1428 let exits = states_config.get_exits(&task.status);
1429 return Err(anyhow!(
1430 "Cannot claim task in state '{}'. Allowed transitions: {:?}",
1431 task.status,
1432 exits
1433 ));
1434 }
1435
1436 let agent =
1438 get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1439
1440 if !task.needed_tags.is_empty() {
1442 for needed in &task.needed_tags {
1443 if !agent.tags.contains(needed) {
1444 return Err(anyhow!("Agent missing required tag: {}", needed));
1445 }
1446 }
1447 }
1448
1449 if !task.wanted_tags.is_empty() {
1451 let has_any = task
1452 .wanted_tags
1453 .iter()
1454 .any(|wanted| agent.tags.contains(wanted));
1455 if !has_any {
1456 return Err(anyhow!("Agent has none of the wanted tags"));
1457 }
1458 }
1459
1460 conn.execute(
1461 "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = ?4, updated_at = ?5
1462 WHERE id = ?6",
1463 params![agent_id, now, claim_status, now, now, task_id,],
1464 )?;
1465
1466 record_state_transition(
1468 conn,
1469 task_id,
1470 claim_status,
1471 Some(agent_id),
1472 None,
1473 states_config,
1474 )?;
1475
1476 conn.execute(
1478 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1479 params![now, agent_id],
1480 )?;
1481
1482 Ok(Task {
1483 worker_id: Some(agent_id.to_string()),
1484 claimed_at: Some(now),
1485 status: claim_status.to_string(),
1486 started_at: Some(now),
1487 updated_at: now,
1488 ..task
1489 })
1490 })
1491 }
1492
1493 pub fn release_task(
1495 &self,
1496 task_id: &str,
1497 agent_id: &str,
1498 states_config: &StatesConfig,
1499 ) -> Result<()> {
1500 let now = now_ms();
1501 let release_status = &states_config.initial;
1502
1503 self.with_conn(|conn| {
1504 let task =
1505 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1506
1507 if task.worker_id.as_deref() != Some(agent_id) {
1508 return Err(anyhow!("Task is not owned by this agent"));
1509 }
1510
1511 record_state_transition(
1513 conn,
1514 task_id,
1515 release_status,
1516 Some(agent_id),
1517 None,
1518 states_config,
1519 )?;
1520
1521 conn.execute(
1522 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1523 WHERE id = ?3",
1524 params![release_status, now, task_id],
1525 )?;
1526
1527 Ok(())
1528 })
1529 }
1530
1531 pub fn force_release(&self, task_id: &str, states_config: &StatesConfig) -> Result<()> {
1533 let now = now_ms();
1534 let release_status = &states_config.initial;
1535
1536 self.with_conn(|conn| {
1537 let task =
1538 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1539
1540 record_state_transition(
1542 conn,
1543 task_id,
1544 release_status,
1545 task.worker_id.as_deref(),
1546 None,
1547 states_config,
1548 )?;
1549
1550 conn.execute(
1551 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1552 WHERE id = ?3",
1553 params![release_status, now, task_id],
1554 )?;
1555
1556 Ok(())
1557 })
1558 }
1559
1560 pub fn force_claim_task(
1562 &self,
1563 task_id: &str,
1564 agent_id: &str,
1565 states_config: &StatesConfig,
1566 ) -> Result<Task> {
1567 let now = now_ms();
1568
1569 let claim_status = states_config
1571 .definitions
1572 .iter()
1573 .find(|(_, def)| def.timed)
1574 .map(|(name, _)| name.as_str())
1575 .unwrap_or("working");
1576
1577 self.with_conn(|conn| {
1578 let task =
1580 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1581
1582 let agent =
1584 get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1585
1586 if !task.needed_tags.is_empty() {
1588 for needed in &task.needed_tags {
1589 if !agent.tags.contains(needed) {
1590 return Err(anyhow!("Agent missing required tag: {}", needed));
1591 }
1592 }
1593 }
1594
1595 if !task.wanted_tags.is_empty() {
1597 let has_any = task
1598 .wanted_tags
1599 .iter()
1600 .any(|wanted| agent.tags.contains(wanted));
1601 if !has_any {
1602 return Err(anyhow!("Agent has none of the wanted tags"));
1603 }
1604 }
1605
1606 conn.execute(
1607 "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = COALESCE(started_at, ?4), updated_at = ?5
1608 WHERE id = ?6",
1609 params![agent_id, now, claim_status, now, now, task_id,],
1610 )?;
1611
1612 record_state_transition(
1614 conn,
1615 task_id,
1616 claim_status,
1617 Some(agent_id),
1618 None,
1619 states_config,
1620 )?;
1621
1622 conn.execute(
1624 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1625 params![now, agent_id],
1626 )?;
1627
1628 Ok(Task {
1629 worker_id: Some(agent_id.to_string()),
1630 claimed_at: Some(now),
1631 status: claim_status.to_string(),
1632 started_at: task.started_at.or(Some(now)),
1633 updated_at: now,
1634 ..task
1635 })
1636 })
1637 }
1638
1639 pub fn release_task_with_state(
1641 &self,
1642 task_id: &str,
1643 agent_id: &str,
1644 state: &str,
1645 states_config: &StatesConfig,
1646 ) -> Result<()> {
1647 let now = now_ms();
1648
1649 self.with_conn(|conn| {
1650 let task =
1651 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1652
1653 if task.worker_id.as_deref() != Some(agent_id) {
1654 return Err(anyhow!("Task is not owned by this agent"));
1655 }
1656
1657 if !states_config.is_valid_state(state) {
1659 return Err(anyhow!(
1660 "Invalid state '{}'. Valid states: {:?}",
1661 state,
1662 states_config.state_names()
1663 ));
1664 }
1665
1666 if !states_config.is_valid_transition(&task.status, state) {
1668 let exits = states_config.get_exits(&task.status);
1669 return Err(anyhow!(
1670 "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
1671 task.status,
1672 state,
1673 exits
1674 ));
1675 }
1676
1677 let completed_at = if state == "completed" {
1679 Some(now)
1680 } else {
1681 None
1682 };
1683
1684 record_state_transition(conn, task_id, state, Some(agent_id), None, states_config)?;
1686
1687 conn.execute(
1688 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, completed_at = COALESCE(?2, completed_at), updated_at = ?3
1689 WHERE id = ?4",
1690 params![state, completed_at, now, task_id],
1691 )?;
1692
1693 Ok(())
1694 })
1695 }
1696
1697 pub fn force_release_stale(
1699 &self,
1700 timeout_seconds: i64,
1701 states_config: &StatesConfig,
1702 ) -> Result<i32> {
1703 let now = now_ms();
1704 let cutoff = now - (timeout_seconds * 1000);
1705 let release_status = &states_config.initial;
1706
1707 self.with_conn(|conn| {
1708 let updated = conn.execute(
1709 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1710 WHERE claimed_at < ?3 AND worker_id IS NOT NULL",
1711 params![release_status, now, cutoff],
1712 )?;
1713
1714 Ok(updated as i32)
1715 })
1716 }
1717
1718 pub fn complete_task(
1722 &self,
1723 task_id: &str,
1724 agent_id: &str,
1725 states_config: &StatesConfig,
1726 ) -> Result<Task> {
1727 let now = now_ms();
1728
1729 let complete_status = if states_config.definitions.contains_key("completed") {
1731 "completed"
1732 } else {
1733 states_config
1735 .definitions
1736 .iter()
1737 .find(|(_, def)| def.exits.is_empty())
1738 .map(|(name, _)| name.as_str())
1739 .unwrap_or("completed")
1740 };
1741
1742 self.with_conn_mut(|conn| {
1743 let tx = conn.transaction()?;
1744
1745 let mut stmt = tx.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1747 let task = stmt
1748 .query_row(params![task_id], parse_task_row)
1749 .map_err(|_| anyhow!("Task not found"))?;
1750 drop(stmt);
1751
1752 if task.worker_id.as_deref() != Some(agent_id) {
1754 return Err(anyhow!("Task is not owned by this agent"));
1755 }
1756
1757 let incomplete_children: i32 = tx.query_row(
1759 "SELECT COUNT(*) FROM dependencies d
1760 INNER JOIN tasks child ON d.to_task_id = child.id
1761 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
1762 AND child.status IN (SELECT value FROM json_each(?2))",
1763 params![
1764 task_id,
1765 serde_json::to_string(&states_config.blocking_states)?
1766 ],
1767 |row| row.get(0),
1768 )?;
1769
1770 if incomplete_children > 0 {
1771 return Err(anyhow!(
1772 "Cannot complete task: {} child task(s) are not complete",
1773 incomplete_children
1774 ));
1775 }
1776
1777 if !states_config.is_valid_transition(&task.status, complete_status) {
1779 let exits = states_config.get_exits(&task.status);
1780 return Err(anyhow!(
1781 "Cannot complete task in state '{}'. Allowed transitions: {:?}",
1782 task.status,
1783 exits
1784 ));
1785 }
1786
1787 record_state_transition(
1789 &tx,
1790 task_id,
1791 complete_status,
1792 Some(agent_id),
1793 None,
1794 states_config,
1795 )?;
1796
1797 tx.execute(
1799 "UPDATE tasks SET status = ?1, completed_at = ?2, updated_at = ?3,
1800 worker_id = NULL, claimed_at = NULL
1801 WHERE id = ?4",
1802 params![complete_status, now, now, task_id],
1803 )?;
1804
1805 tx.execute(
1807 "DELETE FROM file_locks WHERE task_id = ?1",
1808 params![task_id],
1809 )?;
1810
1811 tx.execute(
1813 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1814 params![now, agent_id],
1815 )?;
1816
1817 tx.commit()?;
1818
1819 Ok(Task {
1820 status: complete_status.to_string(),
1821 completed_at: Some(now),
1822 updated_at: now,
1823 worker_id: None,
1824 claimed_at: None,
1825 ..task
1826 })
1827 })
1828 }
1829
1830 pub fn get_all_tasks(&self) -> Result<Vec<Task>> {
1832 self.with_conn(|conn| {
1833 let mut stmt =
1834 conn.prepare("SELECT * FROM tasks WHERE deleted_at IS NULL ORDER BY created_at")?;
1835 let tasks = stmt
1836 .query_map([], parse_task_row)?
1837 .filter_map(|r| r.ok())
1838 .collect();
1839 Ok(tasks)
1840 })
1841 }
1842
1843 #[allow(dead_code)]
1845 pub fn get_tasks_by_status(&self, status: &str) -> Result<Vec<Task>> {
1846 self.with_conn(|conn| {
1847 let mut stmt =
1848 conn.prepare("SELECT * FROM tasks WHERE status = ?1 ORDER BY created_at")?;
1849 let tasks = stmt
1850 .query_map(params![status], parse_task_row)?
1851 .filter_map(|r| r.ok())
1852 .collect();
1853 Ok(tasks)
1854 })
1855 }
1856
1857 pub fn get_claimed_tasks(&self, agent_id: Option<&str>) -> Result<Vec<Task>> {
1859 self.with_conn(|conn| {
1860 let tasks = if let Some(aid) = agent_id {
1861 let mut stmt = conn
1862 .prepare("SELECT * FROM tasks WHERE worker_id = ?1 AND deleted_at IS NULL ORDER BY claimed_at")?;
1863 stmt.query_map(params![aid], parse_task_row)?
1864 .filter_map(|r| r.ok())
1865 .collect()
1866 } else {
1867 let mut stmt = conn.prepare(
1868 "SELECT * FROM tasks WHERE worker_id IS NOT NULL AND deleted_at IS NULL ORDER BY claimed_at",
1869 )?;
1870 stmt.query_map([], parse_task_row)?
1871 .filter_map(|r| r.ok())
1872 .collect()
1873 };
1874
1875 Ok(tasks)
1876 })
1877 }
1878}
1879
1880#[allow(clippy::too_many_arguments)]
1885fn create_tree_recursive(
1886 conn: &Connection,
1887 input: &TaskTreeInput,
1888 parent_id: Option<&str>,
1889 prev_sibling_id: Option<&str>,
1890 child_type: Option<&str>,
1891 sibling_type: Option<&str>,
1892 all_ids: &mut Vec<String>,
1893 phase_warnings: &mut Vec<String>,
1894 tag_warnings: &mut Vec<String>,
1895 states_config: &StatesConfig,
1896 phases_config: &PhasesConfig,
1897 tags_config: &TagsConfig,
1898 ids_config: &IdsConfig,
1899) -> Result<String> {
1900 let task_id = if let Some(ref ref_id) = input.ref_id {
1902 let exists: bool = conn.query_row(
1904 "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
1905 params![ref_id],
1906 |row| row.get(0),
1907 )?;
1908 if !exists {
1909 return Err(anyhow::anyhow!("Referenced task '{}' not found", ref_id));
1910 }
1911 ref_id.clone()
1912 } else {
1913 let task_id = input
1915 .id
1916 .clone()
1917 .unwrap_or_else(|| generate_task_id(ids_config));
1918 let now = now_ms();
1919 let priority = clamp_priority(input.priority.unwrap_or(PRIORITY_DEFAULT));
1920 let initial_status = &states_config.initial;
1921
1922 let title = input.title.clone().unwrap_or_else(|| {
1924 input
1925 .description
1926 .as_deref()
1927 .map(|d| crate::format::truncate_title(d).into_owned())
1928 .unwrap_or_default()
1929 });
1930
1931 if let Some(ref phase) = input.phase
1933 && let Some(warning) = phases_config.check_phase(phase)?
1934 {
1935 phase_warnings.push(format!("Task '{}': {}", task_id, warning));
1936 }
1937
1938 let needed_tags = input.needed_tags.clone().unwrap_or_default();
1939 let wanted_tags = input.wanted_tags.clone().unwrap_or_default();
1940 let tags = input.tags.clone().unwrap_or_default();
1941
1942 for warning in tags_config.validate_tags(&tags)? {
1944 tag_warnings.push(format!("Task '{}': {}", task_id, warning));
1945 }
1946 for warning in tags_config.validate_tags(&needed_tags)? {
1947 tag_warnings.push(format!("Task '{}' needed_tags: {}", task_id, warning));
1948 }
1949 for warning in tags_config.validate_tags(&wanted_tags)? {
1950 tag_warnings.push(format!("Task '{}' wanted_tags: {}", task_id, warning));
1951 }
1952
1953 let needed_tags_json = serde_json::to_string(&needed_tags)?;
1954 let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
1955 let tags_json = serde_json::to_string(&tags)?;
1956
1957 conn.execute(
1958 "INSERT INTO tasks (
1959 id, title, description, status, phase, priority,
1960 needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
1961 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1962 params![
1963 &task_id,
1964 &title,
1965 &input.description,
1966 initial_status,
1967 &input.phase,
1968 priority.to_string(),
1969 needed_tags_json,
1970 wanted_tags_json,
1971 tags_json,
1972 input.points,
1973 input.time_estimate_ms,
1974 now,
1975 now,
1976 ],
1977 )?;
1978
1979 record_state_transition(conn, &task_id, initial_status, None, None, states_config)?;
1981
1982 sync_task_tags(conn, &task_id, &tags)?;
1984 sync_needed_tags(conn, &task_id, &needed_tags)?;
1985 sync_wanted_tags(conn, &task_id, &wanted_tags)?;
1986
1987 task_id
1988 };
1989
1990 if let (Some(pid), Some(ct)) = (parent_id, child_type) {
1992 Database::add_dependency_internal(conn, pid, &task_id, ct)?;
1993 }
1994
1995 if let (Some(prev_id), Some(st)) = (prev_sibling_id, sibling_type) {
1997 Database::add_dependency_internal(conn, prev_id, &task_id, st)?;
1998 }
1999
2000 all_ids.push(task_id.clone());
2001
2002 let mut prev_child_id: Option<String> = None;
2004 for child in input.children.iter() {
2005 let child_id = create_tree_recursive(
2006 conn,
2007 child,
2008 Some(&task_id),
2009 prev_child_id.as_deref(),
2010 child_type,
2011 sibling_type,
2012 all_ids,
2013 phase_warnings,
2014 tag_warnings,
2015 states_config,
2016 phases_config,
2017 tags_config,
2018 ids_config,
2019 )?;
2020 prev_child_id = Some(child_id);
2021 }
2022
2023 Ok(task_id)
2024}