1use super::state_transitions::record_state_transition;
4use super::{Database, now_ms};
5use crate::config::{
6 AutoAdvanceConfig, DependenciesConfig, IdsConfig, PhasesConfig, StatesConfig, TagsConfig,
7};
8use crate::error::ToolError;
9use crate::types::{
10 PRIORITY_DEFAULT, Priority, Task, TaskTree, TaskTreeInput, Worker, clamp_priority,
11 parse_priority,
12};
13use anyhow::{Result, anyhow};
14use petname::{Generator, Petnames};
15use rusqlite::{Connection, Row, params};
16
/// Bundled arguments for `Database::create_task_tree`.
#[derive(Debug)]
pub struct CreateTreeOptions<'a> {
    /// Description of the tree to insert; passed by reference to the recursive tree builder.
    pub input: TaskTreeInput,
    /// Optional id forwarded to the tree builder as the parent of the new root.
    pub parent_id: Option<String>,
    /// Dependency type forwarded to the tree builder; `create_task_tree` substitutes
    /// `"contains"` when this is `None`.
    pub child_type: Option<String>,
    /// Optional dependency type forwarded to the tree builder
    /// (presumably used to link siblings — confirm in the tree builder).
    pub sibling_type: Option<String>,
    /// State-machine configuration forwarded to the tree builder.
    pub states_config: &'a StatesConfig,
    /// Phase configuration forwarded to the tree builder.
    pub phases_config: &'a PhasesConfig,
    /// Tag configuration forwarded to the tree builder.
    pub tags_config: &'a TagsConfig,
    /// Id-generation configuration forwarded to the tree builder.
    pub ids_config: &'a IdsConfig,
}
29
/// Filter, paging and ordering options consumed by `Database::list_tasks`.
///
/// Every filter is optional; the `Default` value applies no filters.
#[derive(Debug, Default)]
pub struct ListTasksQuery<'a> {
    /// Keep only tasks whose `status` column equals this value.
    pub status: Option<&'a str>,
    /// Keep only tasks whose `phase` column equals this value.
    pub phase: Option<&'a str>,
    /// Keep only tasks whose `worker_id` column equals this value.
    pub owner: Option<&'a str>,
    /// Parent filter: `None` applies no filter, `Some(Some(id))` keeps tasks that are the
    /// target of a `contains` dependency from `id`, and `Some(None)` keeps tasks that are
    /// not the target of any `contains` dependency.
    pub parent_id: Option<Option<&'a str>>,
    /// When set, appended to the query as `LIMIT <n>`.
    pub limit: Option<i32>,
    /// Appended to the query as `OFFSET <n>` when greater than zero.
    pub offset: i32,
    /// Sort column: `"priority"`, `"created_at"` or `"updated_at"`; anything else
    /// (including `None`) sorts by `created_at`.
    pub sort_by: Option<&'a str>,
    /// Sort direction: `"asc"` or `"desc"`; anything else (including `None`) is descending.
    pub sort_order: Option<&'a str>,
}
42
43fn generate_task_id(ids_config: &IdsConfig) -> String {
46 let words = ids_config.task_id_words;
47 let case = ids_config.id_case;
48
49 let base = Petnames::medium()
51 .generate_one(words, "-")
52 .unwrap_or_else(|| format!("task-{}", super::now_ms()));
53
54 case.convert(&base)
56}
57
/// Builds the body of an `ORDER BY` clause from user-supplied sort options.
///
/// Only whitelisted values are ever turned into SQL, so the result is safe to splice into
/// a query string:
/// - `sort_by`: `"priority"` (compared numerically), `"updated_at"`, otherwise `created_at`.
/// - `sort_order`: `"asc"` gives `ASC`; everything else (including `None`) gives `DESC`.
///   Matching is case-sensitive.
fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
    let column = match sort_by.unwrap_or("created_at") {
        "priority" => "CAST(t.priority AS INTEGER)",
        "updated_at" => "t.updated_at",
        _ => "t.created_at",
    };

    let direction = if sort_order == Some("asc") {
        "ASC"
    } else {
        "DESC"
    };

    [column, direction].join(" ")
}
79
80fn sync_task_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
87 conn.execute("DELETE FROM task_tags WHERE task_id = ?1", params![task_id])?;
88 for tag in tags {
89 conn.execute(
90 "INSERT INTO task_tags (task_id, tag) VALUES (?1, ?2)",
91 params![task_id, tag],
92 )?;
93 }
94 Ok(())
95}
96
97fn sync_needed_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
99 conn.execute(
100 "DELETE FROM task_needed_tags WHERE task_id = ?1",
101 params![task_id],
102 )?;
103 for tag in tags {
104 conn.execute(
105 "INSERT INTO task_needed_tags (task_id, tag) VALUES (?1, ?2)",
106 params![task_id, tag],
107 )?;
108 }
109 Ok(())
110}
111
112fn sync_wanted_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
114 conn.execute(
115 "DELETE FROM task_wanted_tags WHERE task_id = ?1",
116 params![task_id],
117 )?;
118 for tag in tags {
119 conn.execute(
120 "INSERT INTO task_wanted_tags (task_id, tag) VALUES (?1, ?2)",
121 params![task_id, tag],
122 )?;
123 }
124 Ok(())
125}
126
127pub fn parse_task_row(row: &Row) -> rusqlite::Result<Task> {
128 let id: String = row.get("id")?;
129 let title: String = row.get("title")?;
130 let description: Option<String> = row.get("description")?;
131 let status: String = row.get("status")?;
132 let phase: Option<String> = row.get("phase")?;
133 let priority: String = row.get("priority")?;
134 let worker_id: Option<String> = row.get("worker_id")?;
135 let claimed_at: Option<i64> = row.get("claimed_at")?;
136
137 let needed_tags_json: Option<String> = row.get("needed_tags")?;
138 let wanted_tags_json: Option<String> = row.get("wanted_tags")?;
139 let tags_json: Option<String> = row.get("tags")?;
140
141 let points: Option<i32> = row.get("points")?;
142 let time_estimate_ms: Option<i64> = row.get("time_estimate_ms")?;
143 let time_actual_ms: Option<i64> = row.get("time_actual_ms")?;
144 let started_at: Option<i64> = row.get("started_at")?;
145 let completed_at: Option<i64> = row.get("completed_at")?;
146
147 let current_thought: Option<String> = row.get("current_thought")?;
148
149 let cost_usd: f64 = row.get("cost_usd")?;
150 let metric_0: i64 = row.get("metric_0")?;
151 let metric_1: i64 = row.get("metric_1")?;
152 let metric_2: i64 = row.get("metric_2")?;
153 let metric_3: i64 = row.get("metric_3")?;
154 let metric_4: i64 = row.get("metric_4")?;
155 let metric_5: i64 = row.get("metric_5")?;
156 let metric_6: i64 = row.get("metric_6")?;
157 let metric_7: i64 = row.get("metric_7")?;
158
159 let created_at: i64 = row.get("created_at")?;
160 let updated_at: i64 = row.get("updated_at")?;
161
162 Ok(Task {
163 id,
164 title,
165 description,
166 status,
167 phase,
168 priority: parse_priority(&priority),
169 worker_id,
170 claimed_at,
171 needed_tags: needed_tags_json
172 .map(|s| serde_json::from_str(&s).unwrap_or_default())
173 .unwrap_or_default(),
174 wanted_tags: wanted_tags_json
175 .map(|s| serde_json::from_str(&s).unwrap_or_default())
176 .unwrap_or_default(),
177 tags: tags_json
178 .map(|s| serde_json::from_str(&s).unwrap_or_default())
179 .unwrap_or_default(),
180 points,
181 time_estimate_ms,
182 time_actual_ms,
183 started_at,
184 completed_at,
185 current_thought,
186 cost_usd,
187 metrics: [
188 metric_0, metric_1, metric_2, metric_3, metric_4, metric_5, metric_6, metric_7,
189 ],
190 created_at,
191 updated_at,
192 })
193}
194
195fn get_task_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
197 let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
198
199 let result = stmt.query_row(params![task_id], parse_task_row);
200
201 match result {
202 Ok(task) => Ok(Some(task)),
203 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
204 Err(e) => Err(e.into()),
205 }
206}
207
208fn get_worker_internal(conn: &Connection, worker_id: &str) -> Result<Option<Worker>> {
210 let mut stmt = conn.prepare(
211 "SELECT id, tags, max_claims, registered_at, last_heartbeat, last_status, last_phase, workflow
212 FROM workers WHERE id = ?1",
213 )?;
214
215 let result = stmt.query_row(params![worker_id], |row| {
216 let id: String = row.get(0)?;
217 let tags_json: String = row.get(1)?;
218 let max_claims: i32 = row.get(2)?;
219 let registered_at: i64 = row.get(3)?;
220 let last_heartbeat: i64 = row.get(4)?;
221 let last_status: Option<String> = row.get(5)?;
222 let last_phase: Option<String> = row.get(6)?;
223 let workflow: Option<String> = row.get(7)?;
224
225 Ok((
226 id,
227 tags_json,
228 max_claims,
229 registered_at,
230 last_heartbeat,
231 last_status,
232 last_phase,
233 workflow,
234 ))
235 });
236
237 match result {
238 Ok((
239 id,
240 tags_json,
241 max_claims,
242 registered_at,
243 last_heartbeat,
244 last_status,
245 last_phase,
246 workflow,
247 )) => {
248 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
249 Ok(Some(Worker {
250 id,
251 tags,
252 max_claims,
253 registered_at,
254 last_heartbeat,
255 last_status,
256 last_phase,
257 workflow,
258 }))
259 }
260 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
261 Err(e) => Err(e.into()),
262 }
263}
264
265impl Database {
266 #[allow(clippy::too_many_arguments)]
271 pub fn create_task(
272 &self,
273 id: Option<String>,
274 title: String,
275 description: Option<String>,
276 parent_id: Option<String>,
277 phase: Option<String>,
278 priority: Option<Priority>,
279 points: Option<i32>,
280 time_estimate_ms: Option<i64>,
281 agent_tags_all: Option<Vec<String>>,
282 agent_tags_any: Option<Vec<String>>,
283 tags: Option<Vec<String>>,
284 states_config: &StatesConfig,
285 ids_config: &IdsConfig,
286 ) -> Result<Task> {
287 let task_id = id.unwrap_or_else(|| generate_task_id(ids_config));
288 let now = now_ms();
289 let priority = clamp_priority(priority.unwrap_or(PRIORITY_DEFAULT));
290 let initial_status = &states_config.initial;
291
292 let needed_tags = agent_tags_all.unwrap_or_default();
293 let wanted_tags = agent_tags_any.unwrap_or_default();
294 let tags = tags.unwrap_or_default();
295 let needed_tags_json = serde_json::to_string(&needed_tags)?;
296 let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
297 let tags_json = serde_json::to_string(&tags)?;
298
299 self.with_conn_mut(|conn| {
300 let tx = conn.transaction()?;
301
302 tx.execute(
303 "INSERT INTO tasks (
304 id, title, description, status, phase, priority,
305 needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
306 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
307 params![
308 &task_id,
309 &title,
310 &description,
311 initial_status,
312 &phase,
313 priority.to_string(),
314 needed_tags_json,
315 wanted_tags_json,
316 tags_json,
317 points,
318 time_estimate_ms,
319 now,
320 now,
321 ],
322 )?;
323
324 sync_task_tags(&tx, &task_id, &tags)?;
326 sync_needed_tags(&tx, &task_id, &needed_tags)?;
327 sync_wanted_tags(&tx, &task_id, &wanted_tags)?;
328
329 if let Some(ref pid) = parent_id {
331 Database::add_dependency_internal(&tx, pid, &task_id, "contains")?;
332 }
333
334 record_state_transition(&tx, &task_id, initial_status, None, None, states_config)?;
336
337 tx.commit()?;
338
339 Ok(Task {
340 id: task_id,
341 title,
342 description,
343 status: initial_status.clone(),
344 phase,
345 priority,
346 worker_id: None,
347 claimed_at: None,
348 needed_tags,
349 wanted_tags,
350 tags,
351 points,
352 time_estimate_ms,
353 time_actual_ms: None,
354 started_at: None,
355 completed_at: None,
356 current_thought: None,
357 cost_usd: 0.0,
358 metrics: [0; 8],
359 created_at: now,
360 updated_at: now,
361 })
362 })
363 }
364
365 pub fn create_task_simple(
368 &self,
369 description: impl Into<String>,
370 states_config: &StatesConfig,
371 ids_config: &IdsConfig,
372 ) -> Result<Task> {
373 let desc = description.into();
374 self.create_task(
375 None,
376 desc.clone(),
377 Some(desc),
378 None,
379 None,
380 None,
381 None,
382 None,
383 None,
384 None,
385 None,
386 states_config,
387 ids_config,
388 )
389 }
390
391 #[allow(clippy::type_complexity)]
395 pub fn create_task_tree(
396 &self,
397 opts: CreateTreeOptions<'_>,
398 ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>)> {
399 let mut all_ids = Vec::new();
400 let mut phase_warnings = Vec::new();
401 let mut tag_warnings = Vec::new();
402 let child_type = opts.child_type.or_else(|| Some("contains".to_string()));
404
405 self.with_conn_mut(|conn| {
406 let tx = conn.transaction()?;
407 let root_id = create_tree_recursive(
408 &tx,
409 &opts.input,
410 opts.parent_id.as_deref(),
411 None, child_type.as_deref(),
413 opts.sibling_type.as_deref(),
414 &mut all_ids,
415 &mut phase_warnings,
416 &mut tag_warnings,
417 opts.states_config,
418 opts.phases_config,
419 opts.tags_config,
420 opts.ids_config,
421 )?;
422 tx.commit()?;
423 Ok((root_id, all_ids, phase_warnings, tag_warnings))
424 })
425 }
426
427 pub fn get_task(&self, task_id: &str) -> Result<Option<Task>> {
429 self.with_conn(|conn| {
430 let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
431
432 let result = stmt.query_row(params![task_id], parse_task_row);
433
434 match result {
435 Ok(task) => Ok(Some(task)),
436 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
437 Err(e) => Err(e.into()),
438 }
439 })
440 }
441
442 pub fn rename_task(&self, old_id: &str, new_id: &str) -> Result<()> {
448 if new_id.is_empty() {
450 return Err(anyhow!("new_id must not be empty"));
451 }
452 if new_id.len() > 64 {
453 return Err(anyhow!("new_id must not exceed 64 characters"));
454 }
455
456 self.with_conn_mut(|conn| {
457 let exists: bool = conn.query_row(
459 "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
460 params![old_id],
461 |row| row.get(0),
462 )?;
463 if !exists {
464 return Err(anyhow!("Task '{}' not found", old_id));
465 }
466
467 let conflict: bool = conn.query_row(
469 "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
470 params![new_id],
471 |row| row.get(0),
472 )?;
473 if conflict {
474 return Err(anyhow!("Task '{}' already exists", new_id));
475 }
476
477 conn.execute_batch("PRAGMA foreign_keys = OFF")?;
479
480 let result = (|| -> Result<()> {
481 let tx = conn.transaction()?;
482
483 tx.execute(
485 "UPDATE tasks SET id = ?1 WHERE id = ?2",
486 params![new_id, old_id],
487 )?;
488
489 tx.execute(
491 "UPDATE attachments SET task_id = ?1 WHERE task_id = ?2",
492 params![new_id, old_id],
493 )?;
494
495 tx.execute(
497 "UPDATE dependencies SET from_task_id = ?1 WHERE from_task_id = ?2",
498 params![new_id, old_id],
499 )?;
500 tx.execute(
501 "UPDATE dependencies SET to_task_id = ?1 WHERE to_task_id = ?2",
502 params![new_id, old_id],
503 )?;
504
505 tx.execute(
507 "UPDATE file_locks SET task_id = ?1 WHERE task_id = ?2",
508 params![new_id, old_id],
509 )?;
510
511 tx.execute(
513 "UPDATE task_tags SET task_id = ?1 WHERE task_id = ?2",
514 params![new_id, old_id],
515 )?;
516 tx.execute(
517 "UPDATE task_needed_tags SET task_id = ?1 WHERE task_id = ?2",
518 params![new_id, old_id],
519 )?;
520 tx.execute(
521 "UPDATE task_wanted_tags SET task_id = ?1 WHERE task_id = ?2",
522 params![new_id, old_id],
523 )?;
524
525 tx.execute(
527 "UPDATE task_sequence SET task_id = ?1 WHERE task_id = ?2",
528 params![new_id, old_id],
529 )?;
530
531 tx.commit()?;
532 Ok(())
533 })();
534
535 conn.execute_batch("PRAGMA foreign_keys = ON")?;
537
538 result?;
540
541 let mut stmt = conn.prepare("PRAGMA foreign_key_check")?;
543 let violations: Vec<String> = stmt
544 .query_map([], |row| {
545 let table: String = row.get(0)?;
546 Ok(table)
547 })?
548 .filter_map(|r| r.ok())
549 .collect();
550
551 if !violations.is_empty() {
552 return Err(anyhow!(
553 "Foreign key violations after rename in tables: {:?}",
554 violations
555 ));
556 }
557
558 Ok(())
559 })
560 }
561
562 pub fn get_task_tree(&self, task_id: &str) -> Result<Option<TaskTree>> {
564 let task = self.get_task(task_id)?;
565 match task {
566 None => Ok(None),
567 Some(task) => {
568 let children = self.get_children_recursive(&task.id)?;
569 Ok(Some(TaskTree { task, children }))
570 }
571 }
572 }
573
574 fn get_children_recursive(&self, parent_id: &str) -> Result<Vec<TaskTree>> {
576 let children = self.get_children(parent_id)?;
577 let mut result = Vec::new();
578
579 for child in children {
580 let child_children = self.get_children_recursive(&child.id)?;
581 result.push(TaskTree {
582 task: child,
583 children: child_children,
584 });
585 }
586
587 Ok(result)
588 }
589
590 pub fn get_children(&self, parent_id: &str) -> Result<Vec<Task>> {
592 self.with_conn(|conn| {
593 let mut stmt = conn.prepare(
594 "SELECT t.* FROM tasks t
595 INNER JOIN dependencies d ON t.id = d.to_task_id
596 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
597 ORDER BY t.created_at",
598 )?;
599
600 let tasks = stmt
601 .query_map(params![parent_id], parse_task_row)?
602 .filter_map(|r| r.ok())
603 .collect();
604
605 Ok(tasks)
606 })
607 }
608
609 #[allow(clippy::too_many_arguments)]
611 pub fn update_task(
612 &self,
613 task_id: &str,
614 title: Option<String>,
615 description: Option<Option<String>>,
616 status: Option<String>,
617 priority: Option<Priority>,
618 points: Option<Option<i32>>,
619 tags: Option<Vec<String>>,
620 states_config: &StatesConfig,
621 ) -> Result<Task> {
622 let now = now_ms();
623
624 self.with_conn(|conn| {
625 let task =
626 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
627
628 let new_title = title.unwrap_or(task.title.clone());
629 let new_description = description.unwrap_or(task.description.clone());
630 let new_status = status.unwrap_or(task.status.clone());
631 let new_priority = priority.unwrap_or(task.priority);
632 let new_points = points.unwrap_or(task.points);
633 let new_tags = tags.unwrap_or(task.tags.clone());
634
635 if !states_config.is_valid_state(&new_status) {
637 return Err(anyhow!(
638 "Invalid state '{}'. Valid states: {:?}",
639 new_status,
640 states_config.state_names()
641 ));
642 }
643
644 if task.status != new_status
646 && !states_config.is_valid_transition(&task.status, &new_status)
647 {
648 let exits = states_config.get_exits(&task.status);
649 return Err(anyhow!(
650 "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
651 task.status,
652 new_status,
653 exits
654 ));
655 }
656
657 let started_at =
660 if task.started_at.is_none() && states_config.is_timed_state(&new_status) {
661 Some(now)
662 } else {
663 task.started_at
664 };
665
666 let completed_at = if new_status == "completed" {
668 Some(now)
669 } else {
670 task.completed_at
671 };
672
673 if task.status != new_status {
675 record_state_transition(
676 conn,
677 task_id,
678 &new_status,
679 task.worker_id.as_deref(),
680 None,
681 states_config,
682 )?;
683 }
684
685 conn.execute(
686 "UPDATE tasks SET
687 title = ?1, description = ?2, status = ?3, priority = ?4,
688 points = ?5, started_at = ?6, completed_at = ?7, updated_at = ?8,
689 tags = ?9
690 WHERE id = ?10",
691 params![
692 new_title,
693 new_description,
694 new_status,
695 new_priority.to_string(),
696 new_points,
697 started_at,
698 completed_at,
699 now,
700 serde_json::to_string(&new_tags)?,
701 task_id,
702 ],
703 )?;
704
705 Ok(Task {
706 id: task_id.to_string(),
707 title: new_title,
708 description: new_description,
709 status: new_status,
710 priority: new_priority,
711 points: new_points,
712 tags: new_tags,
713 started_at,
714 completed_at,
715 updated_at: now,
716 ..task
717 })
718 })
719 }
720
    /// Updates a task's fields, status, phase and ownership in a single transaction.
    ///
    /// `agent_id` is the caller. Besides plain field edits, the status that results from
    /// the call drives ownership:
    /// - `assignee` given: the task is handed to that worker, and when no `status` is
    ///   supplied the new status becomes `"assigned"`.
    /// - new status is a timed state and the caller does not own the task: the caller
    ///   claims it (start blockers and tag requirements are checked).
    /// - new status is neither timed nor terminal and the task had an owner: the owner is
    ///   cleared.
    /// - new status is terminal: refused while children are in a blocking state; otherwise
    ///   the owner is cleared and the task's file locks are deleted.
    ///
    /// `None` means "leave unchanged" for every optional argument. For `description` and
    /// `points`, `Some(None)` clears the value. `phase` and `time_estimate_ms` are merged
    /// with `Option::or`, so they can be replaced but not cleared through this method.
    ///
    /// `force` bypasses the ownership checks and the start-blocker check.
    ///
    /// Returns the updated task plus the two lists produced by
    /// `deps::propagate_unblock_effects` (both empty unless the status moved from a
    /// blocking state to a non-blocking one).
    ///
    /// # Errors
    /// Task / assignee / agent not found, task claimed by another agent without `force`,
    /// invalid state or transition, unsatisfied start blockers, missing tags, children
    /// still in a blocking state, or any database/serialization error. Nothing is
    /// committed on error.
    #[allow(clippy::too_many_arguments)]
    pub fn update_task_unified(
        &self,
        task_id: &str,
        agent_id: &str,
        assignee: Option<&str>,
        title: Option<String>,
        description: Option<Option<String>>,
        status: Option<String>,
        phase: Option<String>,
        priority: Option<Priority>,
        points: Option<Option<i32>>,
        tags: Option<Vec<String>>,
        needed_tags: Option<Vec<String>>,
        wanted_tags: Option<Vec<String>>,
        time_estimate_ms: Option<i64>,
        reason: Option<String>,
        force: bool,
        states_config: &StatesConfig,
        deps_config: &DependenciesConfig,
        auto_advance: &AutoAdvanceConfig,
    ) -> Result<(Task, Vec<String>, Vec<String>)> {
        let now = now_ms();

        self.with_conn_mut(|conn| {
            let tx = conn.transaction()?;

            let task =
                get_task_internal(&tx, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;

            // Ownership gate: a task claimed by someone else may only be touched with force.
            if let Some(ref current_owner) = task.worker_id
                && current_owner != agent_id && !force {
                    return Err(anyhow!(
                        "Task is claimed by agent '{}'. Only the owner can update claimed tasks (use force=true to override)",
                        current_owner
                    ));
            }

            // Resolve the post-update value of every field (argument wins over stored value).
            let new_title = title.unwrap_or(task.title.clone());
            let new_description = description.unwrap_or(task.description.clone());
            // Assigning without an explicit status implies the "assigned" state.
            let new_status = if assignee.is_some() && status.is_none() {
                "assigned".to_string()
            } else {
                status.unwrap_or(task.status.clone())
            };
            let new_priority = priority.unwrap_or(task.priority);
            let new_points = points.unwrap_or(task.points);
            let new_tags = tags.unwrap_or(task.tags.clone());
            let new_needed_tags = needed_tags.unwrap_or(task.needed_tags.clone());
            let new_wanted_tags = wanted_tags.unwrap_or(task.wanted_tags.clone());
            let new_time_estimate_ms = time_estimate_ms.or(task.time_estimate_ms);
            let new_phase = phase.or(task.phase.clone());

            if !states_config.is_valid_state(&new_status) {
                return Err(anyhow!(
                    "Invalid state '{}'. Valid states: {:?}",
                    new_status,
                    states_config.state_names()
                ));
            }

            if task.status != new_status
                && !states_config.is_valid_transition(&task.status, &new_status) {
                let exits = states_config.get_exits(&task.status);
                return Err(anyhow!(
                    "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
                    task.status,
                    new_status,
                    exits
                ));
            }

            let new_is_timed = states_config.is_timed_state(&new_status);
            let new_is_terminal = states_config.is_terminal_state(&new_status);
            let current_owner = task.worker_id.as_deref();
            let is_owned_by_agent = current_owner == Some(agent_id);
            let is_owned_by_other = current_owner.is_some() && !is_owned_by_agent;

            // Ownership after the update; starts as the stored value and is adjusted below.
            let mut new_owner: Option<String> = task.worker_id.clone();
            let mut new_claimed_at: Option<i64> = task.claimed_at;

            // --- Explicit assignment to another worker ---
            if let Some(target_agent) = assignee {
                // The ownership gate above already returned for "owned by other && !force",
                // so this check cannot fire; `unwrap` is safe because `is_owned_by_other`
                // implies `current_owner.is_some()`.
                if is_owned_by_other && !force {
                    return Err(anyhow!(
                        "Task is already claimed by agent '{}'. Use force=true to reassign.",
                        current_owner.unwrap()
                    ));
                }

                let target = get_worker_internal(&tx, target_agent)?
                    .ok_or_else(|| anyhow!("Assignee agent '{}' not found", target_agent))?;

                // The assignee must carry every needed tag ...
                // NOTE(review): the checks read the task's stored tags, not
                // `new_needed_tags` / `new_wanted_tags` from this call — confirm intended.
                if !task.needed_tags.is_empty() {
                    for needed in &task.needed_tags {
                        if !target.tags.contains(needed) {
                            return Err(anyhow!(
                                "Assignee '{}' missing required tag: {}",
                                target_agent,
                                needed
                            ));
                        }
                    }
                }

                // ... and at least one wanted tag, when any are listed.
                if !task.wanted_tags.is_empty() {
                    let has_any = task
                        .wanted_tags
                        .iter()
                        .any(|wanted| target.tags.contains(wanted));
                    if !has_any {
                        return Err(anyhow!(
                            "Assignee '{}' has none of the wanted tags: {:?}",
                            target_agent,
                            task.wanted_tags
                        ));
                    }
                }

                new_owner = Some(target_agent.to_string());
                new_claimed_at = Some(now);
            }

            // --- Entering a timed state without owning the task: the caller claims it ---
            if new_is_timed && !is_owned_by_agent {
                // Unreachable after the ownership gate above (same condition).
                if is_owned_by_other && !force {
                    return Err(anyhow!(
                        "Task is already claimed by agent '{}'",
                        current_owner.unwrap()
                    ));
                }

                if !force {
                    let unsatisfied_blockers = super::deps::get_unsatisfied_start_blockers_in_tx(
                        &tx,
                        task_id,
                        states_config,
                        deps_config,
                    )?;
                    if !unsatisfied_blockers.is_empty() {
                        return Err(ToolError::deps_not_satisfied(&unsatisfied_blockers).into());
                    }
                }

                let agent = get_worker_internal(&tx, agent_id)?
                    .ok_or_else(|| anyhow!("Agent not found"))?;

                if !task.needed_tags.is_empty() {
                    for needed in &task.needed_tags {
                        if !agent.tags.contains(needed) {
                            return Err(anyhow!("Agent missing required tag: {}", needed));
                        }
                    }
                }

                if !task.wanted_tags.is_empty() {
                    let has_any = task
                        .wanted_tags
                        .iter()
                        .any(|wanted| agent.tags.contains(wanted));
                    if !has_any {
                        return Err(anyhow!("Agent has none of the wanted tags"));
                    }
                }

                // This overrides any owner set by the assignment block above.
                new_owner = Some(agent_id.to_string());
                new_claimed_at = Some(now);

                // Claiming counts as a sign of life from the caller.
                tx.execute(
                    "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
                    params![now, agent_id],
                )?;
            }

            // --- Leaving for a non-timed, non-terminal state: release the task ---
            // NOTE(review): this also runs when `assignee` was given; if the resulting
            // status (e.g. "assigned") is neither timed nor terminal and the task already
            // had an owner, the owner set by the assignment block is cleared here —
            // confirm that reassignment of an owned task is meant to end up unowned.
            if !new_is_timed && !new_is_terminal && task.worker_id.is_some() {
                // Unreachable after the ownership gate above (same condition).
                if is_owned_by_other && !force {
                    return Err(anyhow!("Task is not owned by this agent"));
                }

                new_owner = None;
                new_claimed_at = None;
            }

            // --- Entering a terminal state ---
            if new_is_terminal {
                // Unreachable after the ownership gate above (same condition).
                if let Some(ref current_owner) = task.worker_id
                    && current_owner != agent_id && !force {
                        return Err(anyhow!("Task is not owned by this agent"));
                }

                // Count direct `contains` children whose status is one of the configured
                // blocking states; any such child prevents finishing the parent.
                let incomplete_children: i32 = tx.query_row(
                    "SELECT COUNT(*) FROM dependencies d
                     INNER JOIN tasks child ON d.to_task_id = child.id
                     WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
                     AND child.status IN (SELECT value FROM json_each(?2))",
                    params![
                        task_id,
                        serde_json::to_string(&states_config.blocking_states)?
                    ],
                    |row| row.get(0),
                )?;

                if incomplete_children > 0 {
                    return Err(anyhow!(
                        "Cannot complete task: {} child task(s) are not complete",
                        incomplete_children
                    ));
                }

                new_owner = None;
                new_claimed_at = None;

                // A finished task no longer holds any file locks.
                tx.execute(
                    "DELETE FROM file_locks WHERE task_id = ?1",
                    params![task_id],
                )?;
            }

            // Only the first entry into a timed state sets the start time.
            let started_at =
                if task.started_at.is_none() && new_is_timed {
                    Some(now)
                } else {
                    task.started_at
                };

            // NOTE(review): this compares against the literal "completed" and does not
            // look at whether the status changed, so editing an already-completed task
            // re-stamps `completed_at` — confirm intended.
            let completed_at = if new_status == "completed" {
                Some(now)
            } else {
                task.completed_at
            };

            let status_changed = task.status != new_status;
            if status_changed {
                record_state_transition(
                    &tx,
                    task_id,
                    &new_status,
                    new_owner.as_deref(),
                    reason.as_deref(),
                    states_config,
                )?;
            }

            let phase_changed = task.phase != new_phase;
            if phase_changed {
                super::state_transitions::record_phase_transition(
                    &tx,
                    task_id,
                    new_phase.as_deref().unwrap_or(""),
                    Some(agent_id),
                    reason.as_deref(),
                )?;
            }

            tx.execute(
                "UPDATE tasks SET
                    title = ?1, description = ?2, status = ?3, phase = ?4, priority = ?5,
                    points = ?6, started_at = ?7, completed_at = ?8, updated_at = ?9,
                    tags = ?10, worker_id = ?11, claimed_at = ?12,
                    needed_tags = ?13, wanted_tags = ?14, time_estimate_ms = ?15
                 WHERE id = ?16",
                params![
                    new_title,
                    new_description,
                    new_status,
                    new_phase,
                    new_priority.to_string(),
                    new_points,
                    started_at,
                    completed_at,
                    now,
                    serde_json::to_string(&new_tags)?,
                    new_owner,
                    new_claimed_at,
                    serde_json::to_string(&new_needed_tags)?,
                    serde_json::to_string(&new_wanted_tags)?,
                    new_time_estimate_ms,
                    task_id,
                ],
            )?;

            // Mirror changed tag lists into their per-tag tables.
            if new_tags != task.tags {
                sync_task_tags(&tx, task_id, &new_tags)?;
            }
            if new_needed_tags != task.needed_tags {
                sync_needed_tags(&tx, task_id, &new_needed_tags)?;
            }
            if new_wanted_tags != task.wanted_tags {
                sync_wanted_tags(&tx, task_id, &new_wanted_tags)?;
            }

            // Leaving a blocking state may unblock (and auto-advance) dependent tasks.
            let (unblocked, auto_advanced) = if status_changed {
                let was_blocking = states_config.is_blocking_state(&task.status);
                let is_blocking = states_config.is_blocking_state(&new_status);

                if was_blocking && !is_blocking {
                    super::deps::propagate_unblock_effects(
                        &tx,
                        task_id,
                        Some(agent_id),
                        states_config,
                        deps_config,
                        auto_advance,
                    )?
                } else {
                    (vec![], vec![])
                }
            } else {
                (vec![], vec![])
            };

            tx.commit()?;

            Ok((Task {
                id: task_id.to_string(),
                title: new_title,
                description: new_description,
                status: new_status,
                phase: new_phase,
                priority: new_priority,
                points: new_points,
                tags: new_tags,
                needed_tags: new_needed_tags,
                wanted_tags: new_wanted_tags,
                time_estimate_ms: new_time_estimate_ms,
                started_at,
                completed_at,
                updated_at: now,
                worker_id: new_owner,
                claimed_at: new_claimed_at,
                ..task
            }, unblocked, auto_advanced))
        })
    }
1099
1100 pub fn delete_task(
1108 &self,
1109 task_id: &str,
1110 worker_id: &str,
1111 cascade: bool,
1112 reason: Option<String>,
1113 obliterate: bool,
1114 force: bool,
1115 ) -> Result<()> {
1116 let now = now_ms();
1117
1118 self.with_conn_mut(|conn| {
1119 let tx = conn.transaction()?;
1120
1121 let task = get_task_internal(&tx, task_id)?
1123 .ok_or_else(|| anyhow!("Task not found"))?;
1124
1125 if let Some(ref owner) = task.worker_id
1127 && owner != worker_id && !force {
1128 return Err(anyhow!(
1129 "Task is claimed by worker '{}'. Use force=true to override.",
1130 owner
1131 ));
1132 }
1133
1134 if obliterate {
1135 if cascade {
1137 tx.execute(
1140 "WITH RECURSIVE descendants AS (
1141 SELECT ?1 AS id
1142 UNION ALL
1143 SELECT dep.to_task_id FROM dependencies dep
1144 INNER JOIN descendants d ON dep.from_task_id = d.id
1145 WHERE dep.dep_type = 'contains'
1146 )
1147 DELETE FROM tasks WHERE id IN (SELECT id FROM descendants)",
1148 params![task_id],
1149 )?;
1150 } else {
1151 let child_count: i32 = tx.query_row(
1153 "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
1154 params![task_id],
1155 |row| row.get(0),
1156 )?;
1157
1158 if child_count > 0 {
1159 return Err(anyhow!("Task has children; use cascade=true to delete"));
1160 }
1161
1162 tx.execute("DELETE FROM tasks WHERE id = ?1", params![task_id])?;
1163 }
1164 } else {
1165 if cascade {
1167 tx.execute(
1169 "WITH RECURSIVE descendants AS (
1170 SELECT ?1 AS id
1171 UNION ALL
1172 SELECT dep.to_task_id FROM dependencies dep
1173 INNER JOIN descendants d ON dep.from_task_id = d.id
1174 WHERE dep.dep_type = 'contains'
1175 )
1176 UPDATE tasks SET deleted_at = ?2, deleted_by = ?3, deleted_reason = ?4, updated_at = ?2
1177 WHERE id IN (SELECT id FROM descendants) AND deleted_at IS NULL",
1178 params![task_id, now, worker_id, reason],
1179 )?;
1180 } else {
1181 let child_count: i32 = tx.query_row(
1183 "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
1184 params![task_id],
1185 |row| row.get(0),
1186 )?;
1187
1188 if child_count > 0 {
1189 return Err(anyhow!("Task has children; use cascade=true to delete"));
1190 }
1191
1192 tx.execute(
1193 "UPDATE tasks SET deleted_at = ?1, deleted_by = ?2, deleted_reason = ?3, updated_at = ?1 WHERE id = ?4",
1194 params![now, worker_id, reason, task_id],
1195 )?;
1196 }
1197 }
1198
1199 tx.commit()?;
1200 Ok(())
1201 })
1202 }
1203
1204 pub fn list_tasks(&self, query: ListTasksQuery<'_>) -> Result<Vec<Task>> {
1207 let ListTasksQuery {
1208 status,
1209 phase,
1210 owner,
1211 parent_id,
1212 limit,
1213 offset,
1214 sort_by,
1215 sort_order,
1216 } = query;
1217 self.with_conn(|conn| {
1218 let mut sql = String::from(
1219 "SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL",
1220 );
1221 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1222
1223 if let Some(s) = status {
1224 sql.push_str(" AND t.status = ?");
1225 params_vec.push(Box::new(s.to_string()));
1226 }
1227
1228 if let Some(p) = phase {
1229 sql.push_str(" AND t.phase = ?");
1230 params_vec.push(Box::new(p.to_string()));
1231 }
1232
1233 if let Some(o) = owner {
1234 sql.push_str(" AND t.worker_id = ?");
1235 params_vec.push(Box::new(o.to_string()));
1236 }
1237
1238 if let Some(p) = parent_id {
1240 match p {
1241 Some(pid) => {
1242 sql.push_str(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ? AND dep_type = 'contains')");
1243 params_vec.push(Box::new(pid.to_string()));
1244 }
1245 None => {
1246 sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
1248 }
1249 }
1250 }
1251
1252 let order_clause = build_order_clause(sort_by, sort_order);
1254 sql.push_str(&format!(" ORDER BY {}", order_clause));
1255
1256 if let Some(l) = limit {
1257 sql.push_str(&format!(" LIMIT {}", l));
1258 }
1259
1260 if offset > 0 {
1261 sql.push_str(&format!(" OFFSET {}", offset));
1262 }
1263
1264 let params_refs: Vec<&dyn rusqlite::ToSql> =
1265 params_vec.iter().map(|b| b.as_ref()).collect();
1266
1267 let mut stmt = conn.prepare(&sql)?;
1268 let tasks = stmt
1269 .query_map(params_refs.as_slice(), parse_task_row)?
1270 .filter_map(|r| r.ok())
1271 .collect();
1272
1273 Ok(tasks)
1274 })
1275 }
1276
1277 pub fn set_thought(
1279 &self,
1280 agent_id: &str,
1281 thought: Option<String>,
1282 task_ids: Option<Vec<String>>,
1283 ) -> Result<i32> {
1284 let now = now_ms();
1285
1286 self.with_conn(|conn| {
1287 let updated = if let Some(ids) = task_ids {
1288 let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
1289 let sql = format!(
1290 "UPDATE tasks SET current_thought = ?, updated_at = ?
1291 WHERE worker_id = ? AND id IN ({})",
1292 placeholders.join(", ")
1293 );
1294
1295 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1296 params_vec.push(Box::new(thought.clone()));
1297 params_vec.push(Box::new(now));
1298 params_vec.push(Box::new(agent_id.to_string()));
1299 for id in &ids {
1300 params_vec.push(Box::new(id.clone()));
1301 }
1302
1303 let params_refs: Vec<&dyn rusqlite::ToSql> =
1304 params_vec.iter().map(|b| b.as_ref()).collect();
1305 conn.execute(&sql, params_refs.as_slice())?
1306 } else {
1307 conn.execute(
1308 "UPDATE tasks SET current_thought = ?, updated_at = ? WHERE worker_id = ?",
1309 params![thought, now, agent_id],
1310 )?
1311 };
1312
1313 Ok(updated as i32)
1314 })
1315 }
1316
1317 pub fn log_time(&self, task_id: &str, duration_ms: i64) -> Result<i64> {
1319 let now = now_ms();
1320
1321 self.with_conn(|conn| {
1322 conn.execute(
1323 "UPDATE tasks SET time_actual_ms = COALESCE(time_actual_ms, 0) + ?1, updated_at = ?2
1324 WHERE id = ?3",
1325 params![duration_ms, now, task_id],
1326 )?;
1327
1328 let total: i64 = conn.query_row(
1329 "SELECT COALESCE(time_actual_ms, 0) FROM tasks WHERE id = ?1",
1330 params![task_id],
1331 |row| row.get(0),
1332 )?;
1333
1334 Ok(total)
1335 })
1336 }
1337
1338 pub fn log_metrics(
1341 &self,
1342 task_id: &str,
1343 cost_usd: Option<f64>,
1344 values: &[i64],
1345 ) -> Result<Task> {
1346 let now = now_ms();
1347
1348 self.with_conn(|conn| {
1349 let task =
1350 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1351
1352 let mut new_metrics = task.metrics;
1354 for (i, &val) in values.iter().take(8).enumerate() {
1355 new_metrics[i] += val;
1356 }
1357
1358 let new_cost_usd = task.cost_usd + cost_usd.unwrap_or(0.0);
1359
1360 conn.execute(
1361 "UPDATE tasks SET
1362 metric_0 = ?1, metric_1 = ?2, metric_2 = ?3, metric_3 = ?4,
1363 metric_4 = ?5, metric_5 = ?6, metric_6 = ?7, metric_7 = ?8,
1364 cost_usd = ?9, updated_at = ?10
1365 WHERE id = ?11",
1366 params![
1367 new_metrics[0],
1368 new_metrics[1],
1369 new_metrics[2],
1370 new_metrics[3],
1371 new_metrics[4],
1372 new_metrics[5],
1373 new_metrics[6],
1374 new_metrics[7],
1375 new_cost_usd,
1376 now,
1377 task_id,
1378 ],
1379 )?;
1380
1381 Ok(Task {
1382 cost_usd: new_cost_usd,
1383 metrics: new_metrics,
1384 updated_at: now,
1385 ..task
1386 })
1387 })
1388 }
1389
1390 pub fn claim_task(
1393 &self,
1394 task_id: &str,
1395 agent_id: &str,
1396 states_config: &StatesConfig,
1397 ) -> Result<Task> {
1398 let now = now_ms();
1399
1400 let claim_status = states_config
1402 .definitions
1403 .iter()
1404 .find(|(_, def)| def.timed)
1405 .map(|(name, _)| name.as_str())
1406 .unwrap_or("working");
1407
1408 self.with_conn(|conn| {
1409 let task =
1411 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1412
1413 if task.worker_id.is_some() {
1415 return Err(anyhow!("Task is already claimed"));
1416 }
1417
1418 if !states_config.is_valid_transition(&task.status, claim_status) {
1420 let exits = states_config.get_exits(&task.status);
1421 return Err(anyhow!(
1422 "Cannot claim task in state '{}'. Allowed transitions: {:?}",
1423 task.status,
1424 exits
1425 ));
1426 }
1427
1428 let agent =
1430 get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1431
1432 if !task.needed_tags.is_empty() {
1434 for needed in &task.needed_tags {
1435 if !agent.tags.contains(needed) {
1436 return Err(anyhow!("Agent missing required tag: {}", needed));
1437 }
1438 }
1439 }
1440
1441 if !task.wanted_tags.is_empty() {
1443 let has_any = task
1444 .wanted_tags
1445 .iter()
1446 .any(|wanted| agent.tags.contains(wanted));
1447 if !has_any {
1448 return Err(anyhow!("Agent has none of the wanted tags"));
1449 }
1450 }
1451
1452 conn.execute(
1453 "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = ?4, updated_at = ?5
1454 WHERE id = ?6",
1455 params![agent_id, now, claim_status, now, now, task_id,],
1456 )?;
1457
1458 record_state_transition(
1460 conn,
1461 task_id,
1462 claim_status,
1463 Some(agent_id),
1464 None,
1465 states_config,
1466 )?;
1467
1468 conn.execute(
1470 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1471 params![now, agent_id],
1472 )?;
1473
1474 Ok(Task {
1475 worker_id: Some(agent_id.to_string()),
1476 claimed_at: Some(now),
1477 status: claim_status.to_string(),
1478 started_at: Some(now),
1479 updated_at: now,
1480 ..task
1481 })
1482 })
1483 }
1484
1485 pub fn release_task(
1487 &self,
1488 task_id: &str,
1489 agent_id: &str,
1490 states_config: &StatesConfig,
1491 ) -> Result<()> {
1492 let now = now_ms();
1493 let release_status = &states_config.initial;
1494
1495 self.with_conn(|conn| {
1496 let task =
1497 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1498
1499 if task.worker_id.as_deref() != Some(agent_id) {
1500 return Err(anyhow!("Task is not owned by this agent"));
1501 }
1502
1503 record_state_transition(
1505 conn,
1506 task_id,
1507 release_status,
1508 Some(agent_id),
1509 None,
1510 states_config,
1511 )?;
1512
1513 conn.execute(
1514 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1515 WHERE id = ?3",
1516 params![release_status, now, task_id],
1517 )?;
1518
1519 Ok(())
1520 })
1521 }
1522
1523 pub fn force_release(&self, task_id: &str, states_config: &StatesConfig) -> Result<()> {
1525 let now = now_ms();
1526 let release_status = &states_config.initial;
1527
1528 self.with_conn(|conn| {
1529 let task =
1530 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1531
1532 record_state_transition(
1534 conn,
1535 task_id,
1536 release_status,
1537 task.worker_id.as_deref(),
1538 None,
1539 states_config,
1540 )?;
1541
1542 conn.execute(
1543 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1544 WHERE id = ?3",
1545 params![release_status, now, task_id],
1546 )?;
1547
1548 Ok(())
1549 })
1550 }
1551
1552 pub fn force_claim_task(
1554 &self,
1555 task_id: &str,
1556 agent_id: &str,
1557 states_config: &StatesConfig,
1558 ) -> Result<Task> {
1559 let now = now_ms();
1560
1561 let claim_status = states_config
1563 .definitions
1564 .iter()
1565 .find(|(_, def)| def.timed)
1566 .map(|(name, _)| name.as_str())
1567 .unwrap_or("working");
1568
1569 self.with_conn(|conn| {
1570 let task =
1572 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1573
1574 let agent =
1576 get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1577
1578 if !task.needed_tags.is_empty() {
1580 for needed in &task.needed_tags {
1581 if !agent.tags.contains(needed) {
1582 return Err(anyhow!("Agent missing required tag: {}", needed));
1583 }
1584 }
1585 }
1586
1587 if !task.wanted_tags.is_empty() {
1589 let has_any = task
1590 .wanted_tags
1591 .iter()
1592 .any(|wanted| agent.tags.contains(wanted));
1593 if !has_any {
1594 return Err(anyhow!("Agent has none of the wanted tags"));
1595 }
1596 }
1597
1598 conn.execute(
1599 "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = COALESCE(started_at, ?4), updated_at = ?5
1600 WHERE id = ?6",
1601 params![agent_id, now, claim_status, now, now, task_id,],
1602 )?;
1603
1604 record_state_transition(
1606 conn,
1607 task_id,
1608 claim_status,
1609 Some(agent_id),
1610 None,
1611 states_config,
1612 )?;
1613
1614 conn.execute(
1616 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1617 params![now, agent_id],
1618 )?;
1619
1620 Ok(Task {
1621 worker_id: Some(agent_id.to_string()),
1622 claimed_at: Some(now),
1623 status: claim_status.to_string(),
1624 started_at: task.started_at.or(Some(now)),
1625 updated_at: now,
1626 ..task
1627 })
1628 })
1629 }
1630
1631 pub fn release_task_with_state(
1633 &self,
1634 task_id: &str,
1635 agent_id: &str,
1636 state: &str,
1637 states_config: &StatesConfig,
1638 ) -> Result<()> {
1639 let now = now_ms();
1640
1641 self.with_conn(|conn| {
1642 let task =
1643 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1644
1645 if task.worker_id.as_deref() != Some(agent_id) {
1646 return Err(anyhow!("Task is not owned by this agent"));
1647 }
1648
1649 if !states_config.is_valid_state(state) {
1651 return Err(anyhow!(
1652 "Invalid state '{}'. Valid states: {:?}",
1653 state,
1654 states_config.state_names()
1655 ));
1656 }
1657
1658 if !states_config.is_valid_transition(&task.status, state) {
1660 let exits = states_config.get_exits(&task.status);
1661 return Err(anyhow!(
1662 "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
1663 task.status,
1664 state,
1665 exits
1666 ));
1667 }
1668
1669 let completed_at = if state == "completed" {
1671 Some(now)
1672 } else {
1673 None
1674 };
1675
1676 record_state_transition(conn, task_id, state, Some(agent_id), None, states_config)?;
1678
1679 conn.execute(
1680 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, completed_at = COALESCE(?2, completed_at), updated_at = ?3
1681 WHERE id = ?4",
1682 params![state, completed_at, now, task_id],
1683 )?;
1684
1685 Ok(())
1686 })
1687 }
1688
1689 pub fn force_release_stale(
1691 &self,
1692 timeout_seconds: i64,
1693 states_config: &StatesConfig,
1694 ) -> Result<i32> {
1695 let now = now_ms();
1696 let cutoff = now - (timeout_seconds * 1000);
1697 let release_status = &states_config.initial;
1698
1699 self.with_conn(|conn| {
1700 let updated = conn.execute(
1701 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1702 WHERE claimed_at < ?3 AND worker_id IS NOT NULL",
1703 params![release_status, now, cutoff],
1704 )?;
1705
1706 Ok(updated as i32)
1707 })
1708 }
1709
1710 pub fn complete_task(
1714 &self,
1715 task_id: &str,
1716 agent_id: &str,
1717 states_config: &StatesConfig,
1718 ) -> Result<Task> {
1719 let now = now_ms();
1720
1721 let complete_status = if states_config.definitions.contains_key("completed") {
1723 "completed"
1724 } else {
1725 states_config
1727 .definitions
1728 .iter()
1729 .find(|(_, def)| def.exits.is_empty())
1730 .map(|(name, _)| name.as_str())
1731 .unwrap_or("completed")
1732 };
1733
1734 self.with_conn_mut(|conn| {
1735 let tx = conn.transaction()?;
1736
1737 let mut stmt = tx.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1739 let task = stmt
1740 .query_row(params![task_id], parse_task_row)
1741 .map_err(|_| anyhow!("Task not found"))?;
1742 drop(stmt);
1743
1744 if task.worker_id.as_deref() != Some(agent_id) {
1746 return Err(anyhow!("Task is not owned by this agent"));
1747 }
1748
1749 let incomplete_children: i32 = tx.query_row(
1751 "SELECT COUNT(*) FROM dependencies d
1752 INNER JOIN tasks child ON d.to_task_id = child.id
1753 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
1754 AND child.status IN (SELECT value FROM json_each(?2))",
1755 params![
1756 task_id,
1757 serde_json::to_string(&states_config.blocking_states)?
1758 ],
1759 |row| row.get(0),
1760 )?;
1761
1762 if incomplete_children > 0 {
1763 return Err(anyhow!(
1764 "Cannot complete task: {} child task(s) are not complete",
1765 incomplete_children
1766 ));
1767 }
1768
1769 if !states_config.is_valid_transition(&task.status, complete_status) {
1771 let exits = states_config.get_exits(&task.status);
1772 return Err(anyhow!(
1773 "Cannot complete task in state '{}'. Allowed transitions: {:?}",
1774 task.status,
1775 exits
1776 ));
1777 }
1778
1779 record_state_transition(
1781 &tx,
1782 task_id,
1783 complete_status,
1784 Some(agent_id),
1785 None,
1786 states_config,
1787 )?;
1788
1789 tx.execute(
1791 "UPDATE tasks SET status = ?1, completed_at = ?2, updated_at = ?3,
1792 worker_id = NULL, claimed_at = NULL
1793 WHERE id = ?4",
1794 params![complete_status, now, now, task_id],
1795 )?;
1796
1797 tx.execute(
1799 "DELETE FROM file_locks WHERE task_id = ?1",
1800 params![task_id],
1801 )?;
1802
1803 tx.execute(
1805 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1806 params![now, agent_id],
1807 )?;
1808
1809 tx.commit()?;
1810
1811 Ok(Task {
1812 status: complete_status.to_string(),
1813 completed_at: Some(now),
1814 updated_at: now,
1815 worker_id: None,
1816 claimed_at: None,
1817 ..task
1818 })
1819 })
1820 }
1821
1822 pub fn get_all_tasks(&self) -> Result<Vec<Task>> {
1824 self.with_conn(|conn| {
1825 let mut stmt =
1826 conn.prepare("SELECT * FROM tasks WHERE deleted_at IS NULL ORDER BY created_at")?;
1827 let tasks = stmt
1828 .query_map([], parse_task_row)?
1829 .filter_map(|r| r.ok())
1830 .collect();
1831 Ok(tasks)
1832 })
1833 }
1834
1835 #[allow(dead_code)]
1837 pub fn get_tasks_by_status(&self, status: &str) -> Result<Vec<Task>> {
1838 self.with_conn(|conn| {
1839 let mut stmt =
1840 conn.prepare("SELECT * FROM tasks WHERE status = ?1 ORDER BY created_at")?;
1841 let tasks = stmt
1842 .query_map(params![status], parse_task_row)?
1843 .filter_map(|r| r.ok())
1844 .collect();
1845 Ok(tasks)
1846 })
1847 }
1848
1849 pub fn get_claimed_tasks(&self, agent_id: Option<&str>) -> Result<Vec<Task>> {
1851 self.with_conn(|conn| {
1852 let tasks = if let Some(aid) = agent_id {
1853 let mut stmt = conn
1854 .prepare("SELECT * FROM tasks WHERE worker_id = ?1 AND deleted_at IS NULL ORDER BY claimed_at")?;
1855 stmt.query_map(params![aid], parse_task_row)?
1856 .filter_map(|r| r.ok())
1857 .collect()
1858 } else {
1859 let mut stmt = conn.prepare(
1860 "SELECT * FROM tasks WHERE worker_id IS NOT NULL AND deleted_at IS NULL ORDER BY claimed_at",
1861 )?;
1862 stmt.query_map([], parse_task_row)?
1863 .filter_map(|r| r.ok())
1864 .collect()
1865 };
1866
1867 Ok(tasks)
1868 })
1869 }
1870}
1871
1872#[allow(clippy::too_many_arguments)]
1877fn create_tree_recursive(
1878 conn: &Connection,
1879 input: &TaskTreeInput,
1880 parent_id: Option<&str>,
1881 prev_sibling_id: Option<&str>,
1882 child_type: Option<&str>,
1883 sibling_type: Option<&str>,
1884 all_ids: &mut Vec<String>,
1885 phase_warnings: &mut Vec<String>,
1886 tag_warnings: &mut Vec<String>,
1887 states_config: &StatesConfig,
1888 phases_config: &PhasesConfig,
1889 tags_config: &TagsConfig,
1890 ids_config: &IdsConfig,
1891) -> Result<String> {
1892 let task_id = if let Some(ref ref_id) = input.ref_id {
1894 let exists: bool = conn.query_row(
1896 "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
1897 params![ref_id],
1898 |row| row.get(0),
1899 )?;
1900 if !exists {
1901 return Err(anyhow::anyhow!("Referenced task '{}' not found", ref_id));
1902 }
1903 ref_id.clone()
1904 } else {
1905 let task_id = input
1907 .id
1908 .clone()
1909 .unwrap_or_else(|| generate_task_id(ids_config));
1910 let now = now_ms();
1911 let priority = clamp_priority(input.priority.unwrap_or(PRIORITY_DEFAULT));
1912 let initial_status = &states_config.initial;
1913
1914 let title = input.title.clone().unwrap_or_else(|| {
1916 input
1917 .description
1918 .as_deref()
1919 .map(|d| crate::format::truncate_title(d).into_owned())
1920 .unwrap_or_default()
1921 });
1922
1923 if let Some(ref phase) = input.phase
1925 && let Some(warning) = phases_config.check_phase(phase)?
1926 {
1927 phase_warnings.push(format!("Task '{}': {}", task_id, warning));
1928 }
1929
1930 let needed_tags = input.needed_tags.clone().unwrap_or_default();
1931 let wanted_tags = input.wanted_tags.clone().unwrap_or_default();
1932 let tags = input.tags.clone().unwrap_or_default();
1933
1934 for warning in tags_config.validate_tags(&tags)? {
1936 tag_warnings.push(format!("Task '{}': {}", task_id, warning));
1937 }
1938 for warning in tags_config.validate_tags(&needed_tags)? {
1939 tag_warnings.push(format!("Task '{}' needed_tags: {}", task_id, warning));
1940 }
1941 for warning in tags_config.validate_tags(&wanted_tags)? {
1942 tag_warnings.push(format!("Task '{}' wanted_tags: {}", task_id, warning));
1943 }
1944
1945 let needed_tags_json = serde_json::to_string(&needed_tags)?;
1946 let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
1947 let tags_json = serde_json::to_string(&tags)?;
1948
1949 conn.execute(
1950 "INSERT INTO tasks (
1951 id, title, description, status, phase, priority,
1952 needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
1953 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1954 params![
1955 &task_id,
1956 &title,
1957 &input.description,
1958 initial_status,
1959 &input.phase,
1960 priority.to_string(),
1961 needed_tags_json,
1962 wanted_tags_json,
1963 tags_json,
1964 input.points,
1965 input.time_estimate_ms,
1966 now,
1967 now,
1968 ],
1969 )?;
1970
1971 record_state_transition(conn, &task_id, initial_status, None, None, states_config)?;
1973
1974 sync_task_tags(conn, &task_id, &tags)?;
1976 sync_needed_tags(conn, &task_id, &needed_tags)?;
1977 sync_wanted_tags(conn, &task_id, &wanted_tags)?;
1978
1979 task_id
1980 };
1981
1982 if let (Some(pid), Some(ct)) = (parent_id, child_type) {
1984 Database::add_dependency_internal(conn, pid, &task_id, ct)?;
1985 }
1986
1987 if let (Some(prev_id), Some(st)) = (prev_sibling_id, sibling_type) {
1989 Database::add_dependency_internal(conn, prev_id, &task_id, st)?;
1990 }
1991
1992 all_ids.push(task_id.clone());
1993
1994 let mut prev_child_id: Option<String> = None;
1996 for child in input.children.iter() {
1997 let child_id = create_tree_recursive(
1998 conn,
1999 child,
2000 Some(&task_id),
2001 prev_child_id.as_deref(),
2002 child_type,
2003 sibling_type,
2004 all_ids,
2005 phase_warnings,
2006 tag_warnings,
2007 states_config,
2008 phases_config,
2009 tags_config,
2010 ids_config,
2011 )?;
2012 prev_child_id = Some(child_id);
2013 }
2014
2015 Ok(task_id)
2016}