1use super::state_transitions::record_state_transition;
4use super::{Database, now_ms};
5use crate::config::{
6 AutoAdvanceConfig, DependenciesConfig, IdsConfig, PhasesConfig, StatesConfig, TagsConfig,
7};
8use crate::error::ToolError;
9use crate::types::{
10 PRIORITY_DEFAULT, Priority, Task, TaskTree, TaskTreeInput, Worker, clamp_priority,
11 parse_priority,
12};
13use anyhow::{Result, anyhow};
14use petname::{Generator, Petnames};
15use rusqlite::{Connection, Row, params};
16
17type UpdateResult = (Task, Vec<String>, Vec<String>, Vec<(String, String)>);
21
22#[derive(Debug)]
24pub struct CreateTreeOptions<'a> {
25 pub input: TaskTreeInput,
26 pub parent_id: Option<String>,
27 pub child_type: Option<String>,
28 pub sibling_type: Option<String>,
29 pub states_config: &'a StatesConfig,
30 pub phases_config: &'a PhasesConfig,
31 pub tags_config: &'a TagsConfig,
32 pub ids_config: &'a IdsConfig,
33}
34
35#[derive(Debug, Default)]
37pub struct ListTasksQuery<'a> {
38 pub status: Option<&'a str>,
39 pub phase: Option<&'a str>,
40 pub owner: Option<&'a str>,
41 pub parent_id: Option<Option<&'a str>>,
42 pub limit: Option<i32>,
43 pub offset: i32,
44 pub sort_by: Option<&'a str>,
45 pub sort_order: Option<&'a str>,
46}
47
48fn generate_task_id(ids_config: &IdsConfig) -> String {
51 let words = ids_config.task_id_words;
52 let case = ids_config.id_case;
53
54 let base = Petnames::medium()
56 .generate_one(words, "-")
57 .unwrap_or_else(|| format!("task-{}", super::now_ms()));
58
59 case.convert(&base)
61}
62
63fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
66 let field = match sort_by {
67 Some("priority") => "CAST(t.priority AS INTEGER)",
68 Some("created_at") => "t.created_at",
69 Some("updated_at") => "t.updated_at",
70 _ => "t.created_at", };
72
73 let order = match sort_order {
74 Some("asc") => "ASC",
75 Some("desc") => "DESC",
76 _ => {
77 "DESC"
79 }
80 };
81
82 format!("{} {}", field, order)
83}
84
85fn sync_task_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
92 conn.execute("DELETE FROM task_tags WHERE task_id = ?1", params![task_id])?;
93 for tag in tags {
94 conn.execute(
95 "INSERT INTO task_tags (task_id, tag) VALUES (?1, ?2)",
96 params![task_id, tag],
97 )?;
98 }
99 Ok(())
100}
101
102fn sync_needed_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
104 conn.execute(
105 "DELETE FROM task_needed_tags WHERE task_id = ?1",
106 params![task_id],
107 )?;
108 for tag in tags {
109 conn.execute(
110 "INSERT INTO task_needed_tags (task_id, tag) VALUES (?1, ?2)",
111 params![task_id, tag],
112 )?;
113 }
114 Ok(())
115}
116
117fn sync_wanted_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
119 conn.execute(
120 "DELETE FROM task_wanted_tags WHERE task_id = ?1",
121 params![task_id],
122 )?;
123 for tag in tags {
124 conn.execute(
125 "INSERT INTO task_wanted_tags (task_id, tag) VALUES (?1, ?2)",
126 params![task_id, tag],
127 )?;
128 }
129 Ok(())
130}
131
132pub fn parse_task_row(row: &Row) -> rusqlite::Result<Task> {
133 let id: String = row.get("id")?;
134 let title: String = row.get("title")?;
135 let description: Option<String> = row.get("description")?;
136 let status: String = row.get("status")?;
137 let phase: Option<String> = row.get("phase")?;
138 let priority: String = row.get("priority")?;
139 let worker_id: Option<String> = row.get("worker_id")?;
140 let claimed_at: Option<i64> = row.get("claimed_at")?;
141
142 let needed_tags_json: Option<String> = row.get("needed_tags")?;
143 let wanted_tags_json: Option<String> = row.get("wanted_tags")?;
144 let tags_json: Option<String> = row.get("tags")?;
145
146 let points: Option<i32> = row.get("points")?;
147 let time_estimate_ms: Option<i64> = row.get("time_estimate_ms")?;
148 let time_actual_ms: Option<i64> = row.get("time_actual_ms")?;
149 let started_at: Option<i64> = row.get("started_at")?;
150 let completed_at: Option<i64> = row.get("completed_at")?;
151
152 let current_thought: Option<String> = row.get("current_thought")?;
153
154 let cost_usd: f64 = row.get("cost_usd")?;
155 let metric_0: i64 = row.get("metric_0")?;
156 let metric_1: i64 = row.get("metric_1")?;
157 let metric_2: i64 = row.get("metric_2")?;
158 let metric_3: i64 = row.get("metric_3")?;
159 let metric_4: i64 = row.get("metric_4")?;
160 let metric_5: i64 = row.get("metric_5")?;
161 let metric_6: i64 = row.get("metric_6")?;
162 let metric_7: i64 = row.get("metric_7")?;
163
164 let created_at: i64 = row.get("created_at")?;
165 let updated_at: i64 = row.get("updated_at")?;
166
167 Ok(Task {
168 id,
169 title,
170 description,
171 status,
172 phase,
173 priority: parse_priority(&priority),
174 worker_id,
175 claimed_at,
176 needed_tags: needed_tags_json
177 .map(|s| serde_json::from_str(&s).unwrap_or_default())
178 .unwrap_or_default(),
179 wanted_tags: wanted_tags_json
180 .map(|s| serde_json::from_str(&s).unwrap_or_default())
181 .unwrap_or_default(),
182 tags: tags_json
183 .map(|s| serde_json::from_str(&s).unwrap_or_default())
184 .unwrap_or_default(),
185 points,
186 time_estimate_ms,
187 time_actual_ms,
188 started_at,
189 completed_at,
190 current_thought,
191 cost_usd,
192 metrics: [
193 metric_0, metric_1, metric_2, metric_3, metric_4, metric_5, metric_6, metric_7,
194 ],
195 created_at,
196 updated_at,
197 })
198}
199
200fn get_task_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
202 let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
203
204 let result = stmt.query_row(params![task_id], parse_task_row);
205
206 match result {
207 Ok(task) => Ok(Some(task)),
208 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
209 Err(e) => Err(e.into()),
210 }
211}
212
213fn get_worker_internal(conn: &Connection, worker_id: &str) -> Result<Option<Worker>> {
215 let mut stmt = conn.prepare(
216 "SELECT id, tags, max_claims, registered_at, last_heartbeat, last_status, last_phase, last_task_id, workflow, overlays
217 FROM workers WHERE id = ?1",
218 )?;
219
220 let result = stmt.query_row(params![worker_id], |row| {
221 let id: String = row.get(0)?;
222 let tags_json: String = row.get(1)?;
223 let max_claims: i32 = row.get(2)?;
224 let registered_at: i64 = row.get(3)?;
225 let last_heartbeat: i64 = row.get(4)?;
226 let last_status: Option<String> = row.get(5)?;
227 let last_phase: Option<String> = row.get(6)?;
228 let last_task_id: Option<String> = row.get(7)?;
229 let workflow: Option<String> = row.get(8)?;
230 let overlays_json: Option<String> = row.get(9)?;
231
232 Ok((
233 id,
234 tags_json,
235 max_claims,
236 registered_at,
237 last_heartbeat,
238 last_status,
239 last_phase,
240 last_task_id,
241 workflow,
242 overlays_json,
243 ))
244 });
245
246 match result {
247 Ok((
248 id,
249 tags_json,
250 max_claims,
251 registered_at,
252 last_heartbeat,
253 last_status,
254 last_phase,
255 last_task_id,
256 workflow,
257 overlays_json,
258 )) => {
259 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
260 let overlays: Vec<String> = overlays_json
261 .as_deref()
262 .and_then(|s| serde_json::from_str(s).ok())
263 .unwrap_or_default();
264 Ok(Some(Worker {
265 id,
266 tags,
267 max_claims,
268 registered_at,
269 last_heartbeat,
270 last_status,
271 last_phase,
272 last_task_id,
273 workflow,
274 overlays,
275 }))
276 }
277 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
278 Err(e) => Err(e.into()),
279 }
280}
281
282impl Database {
283 #[allow(clippy::too_many_arguments)]
288 pub fn create_task(
289 &self,
290 id: Option<String>,
291 title: String,
292 description: Option<String>,
293 parent_id: Option<String>,
294 phase: Option<String>,
295 priority: Option<Priority>,
296 points: Option<i32>,
297 time_estimate_ms: Option<i64>,
298 agent_tags_all: Option<Vec<String>>,
299 agent_tags_any: Option<Vec<String>>,
300 tags: Option<Vec<String>>,
301 states_config: &StatesConfig,
302 ids_config: &IdsConfig,
303 ) -> Result<Task> {
304 let task_id = id.unwrap_or_else(|| generate_task_id(ids_config));
305 let now = now_ms();
306 let priority = clamp_priority(priority.unwrap_or(PRIORITY_DEFAULT));
307 let initial_status = &states_config.initial;
308
309 let needed_tags = agent_tags_all.unwrap_or_default();
310 let wanted_tags = agent_tags_any.unwrap_or_default();
311 let tags = tags.unwrap_or_default();
312 let needed_tags_json = serde_json::to_string(&needed_tags)?;
313 let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
314 let tags_json = serde_json::to_string(&tags)?;
315
316 self.with_conn_mut(|conn| {
317 let tx = conn.transaction()?;
318
319 tx.execute(
320 "INSERT INTO tasks (
321 id, title, description, status, phase, priority,
322 needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
323 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
324 params![
325 &task_id,
326 &title,
327 &description,
328 initial_status,
329 &phase,
330 priority.to_string(),
331 needed_tags_json,
332 wanted_tags_json,
333 tags_json,
334 points,
335 time_estimate_ms,
336 now,
337 now,
338 ],
339 )?;
340
341 sync_task_tags(&tx, &task_id, &tags)?;
343 sync_needed_tags(&tx, &task_id, &needed_tags)?;
344 sync_wanted_tags(&tx, &task_id, &wanted_tags)?;
345
346 if let Some(ref pid) = parent_id {
348 Database::add_dependency_internal(&tx, pid, &task_id, "contains")?;
349 }
350
351 record_state_transition(&tx, &task_id, initial_status, None, None, states_config)?;
353
354 tx.commit()?;
355
356 Ok(Task {
357 id: task_id,
358 title,
359 description,
360 status: initial_status.clone(),
361 phase,
362 priority,
363 worker_id: None,
364 claimed_at: None,
365 needed_tags,
366 wanted_tags,
367 tags,
368 points,
369 time_estimate_ms,
370 time_actual_ms: None,
371 started_at: None,
372 completed_at: None,
373 current_thought: None,
374 cost_usd: 0.0,
375 metrics: [0; 8],
376 created_at: now,
377 updated_at: now,
378 })
379 })
380 }
381
382 pub fn create_task_simple(
385 &self,
386 description: impl Into<String>,
387 states_config: &StatesConfig,
388 ids_config: &IdsConfig,
389 ) -> Result<Task> {
390 let desc = description.into();
391 self.create_task(
392 None,
393 desc.clone(),
394 Some(desc),
395 None,
396 None,
397 None,
398 None,
399 None,
400 None,
401 None,
402 None,
403 states_config,
404 ids_config,
405 )
406 }
407
408 #[allow(clippy::type_complexity)]
412 pub fn create_task_tree(
413 &self,
414 opts: CreateTreeOptions<'_>,
415 ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>)> {
416 let mut all_ids = Vec::new();
417 let mut phase_warnings = Vec::new();
418 let mut tag_warnings = Vec::new();
419 let child_type = opts.child_type.or_else(|| Some("contains".to_string()));
421
422 self.with_conn_mut(|conn| {
423 let tx = conn.transaction()?;
424 let root_id = create_tree_recursive(
425 &tx,
426 &opts.input,
427 opts.parent_id.as_deref(),
428 None, child_type.as_deref(),
430 opts.sibling_type.as_deref(),
431 &mut all_ids,
432 &mut phase_warnings,
433 &mut tag_warnings,
434 opts.states_config,
435 opts.phases_config,
436 opts.tags_config,
437 opts.ids_config,
438 )?;
439 tx.commit()?;
440 Ok((root_id, all_ids, phase_warnings, tag_warnings))
441 })
442 }
443
444 pub fn get_task(&self, task_id: &str) -> Result<Option<Task>> {
446 self.with_conn(|conn| {
447 let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
448
449 let result = stmt.query_row(params![task_id], parse_task_row);
450
451 match result {
452 Ok(task) => Ok(Some(task)),
453 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
454 Err(e) => Err(e.into()),
455 }
456 })
457 }
458
459 pub fn rename_task(&self, old_id: &str, new_id: &str) -> Result<()> {
465 if new_id.is_empty() {
467 return Err(anyhow!("new_id must not be empty"));
468 }
469 if new_id.len() > 64 {
470 return Err(anyhow!("new_id must not exceed 64 characters"));
471 }
472
473 self.with_conn_mut(|conn| {
474 let exists: bool = conn.query_row(
476 "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
477 params![old_id],
478 |row| row.get(0),
479 )?;
480 if !exists {
481 return Err(anyhow!("Task '{}' not found", old_id));
482 }
483
484 let conflict: bool = conn.query_row(
486 "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
487 params![new_id],
488 |row| row.get(0),
489 )?;
490 if conflict {
491 return Err(anyhow!("Task '{}' already exists", new_id));
492 }
493
494 conn.execute_batch("PRAGMA foreign_keys = OFF")?;
496
497 let result = (|| -> Result<()> {
498 let tx = conn.transaction()?;
499
500 tx.execute(
502 "UPDATE tasks SET id = ?1 WHERE id = ?2",
503 params![new_id, old_id],
504 )?;
505
506 tx.execute(
508 "UPDATE attachments SET task_id = ?1 WHERE task_id = ?2",
509 params![new_id, old_id],
510 )?;
511
512 tx.execute(
514 "UPDATE dependencies SET from_task_id = ?1 WHERE from_task_id = ?2",
515 params![new_id, old_id],
516 )?;
517 tx.execute(
518 "UPDATE dependencies SET to_task_id = ?1 WHERE to_task_id = ?2",
519 params![new_id, old_id],
520 )?;
521
522 tx.execute(
524 "UPDATE file_locks SET task_id = ?1 WHERE task_id = ?2",
525 params![new_id, old_id],
526 )?;
527
528 tx.execute(
530 "UPDATE task_tags SET task_id = ?1 WHERE task_id = ?2",
531 params![new_id, old_id],
532 )?;
533 tx.execute(
534 "UPDATE task_needed_tags SET task_id = ?1 WHERE task_id = ?2",
535 params![new_id, old_id],
536 )?;
537 tx.execute(
538 "UPDATE task_wanted_tags SET task_id = ?1 WHERE task_id = ?2",
539 params![new_id, old_id],
540 )?;
541
542 tx.execute(
544 "UPDATE task_sequence SET task_id = ?1 WHERE task_id = ?2",
545 params![new_id, old_id],
546 )?;
547
548 tx.commit()?;
549 Ok(())
550 })();
551
552 conn.execute_batch("PRAGMA foreign_keys = ON")?;
554
555 result?;
557
558 let mut stmt = conn.prepare("PRAGMA foreign_key_check")?;
560 let violations: Vec<String> = stmt
561 .query_map([], |row| {
562 let table: String = row.get(0)?;
563 Ok(table)
564 })?
565 .filter_map(|r| r.ok())
566 .collect();
567
568 if !violations.is_empty() {
569 return Err(anyhow!(
570 "Foreign key violations after rename in tables: {:?}",
571 violations
572 ));
573 }
574
575 Ok(())
576 })
577 }
578
579 pub fn get_task_tree(&self, task_id: &str) -> Result<Option<TaskTree>> {
581 let task = self.get_task(task_id)?;
582 match task {
583 None => Ok(None),
584 Some(task) => {
585 let children = self.get_children_recursive(&task.id)?;
586 Ok(Some(TaskTree { task, children }))
587 }
588 }
589 }
590
591 fn get_children_recursive(&self, parent_id: &str) -> Result<Vec<TaskTree>> {
593 let children = self.get_children(parent_id)?;
594 let mut result = Vec::new();
595
596 for child in children {
597 let child_children = self.get_children_recursive(&child.id)?;
598 result.push(TaskTree {
599 task: child,
600 children: child_children,
601 });
602 }
603
604 Ok(result)
605 }
606
607 pub fn get_children(&self, parent_id: &str) -> Result<Vec<Task>> {
609 self.with_conn(|conn| {
610 let mut stmt = conn.prepare(
611 "SELECT t.* FROM tasks t
612 INNER JOIN dependencies d ON t.id = d.to_task_id
613 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
614 ORDER BY t.created_at",
615 )?;
616
617 let tasks = stmt
618 .query_map(params![parent_id], parse_task_row)?
619 .filter_map(|r| r.ok())
620 .collect();
621
622 Ok(tasks)
623 })
624 }
625
626 #[allow(clippy::too_many_arguments)]
628 pub fn update_task(
629 &self,
630 task_id: &str,
631 title: Option<String>,
632 description: Option<Option<String>>,
633 status: Option<String>,
634 priority: Option<Priority>,
635 points: Option<Option<i32>>,
636 tags: Option<Vec<String>>,
637 states_config: &StatesConfig,
638 ) -> Result<Task> {
639 let now = now_ms();
640
641 self.with_conn(|conn| {
642 let task =
643 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
644
645 let new_title = title.unwrap_or(task.title.clone());
646 let new_description = description.unwrap_or(task.description.clone());
647 let new_status = status.unwrap_or(task.status.clone());
648 let new_priority = priority.unwrap_or(task.priority);
649 let new_points = points.unwrap_or(task.points);
650 let new_tags = tags.unwrap_or(task.tags.clone());
651
652 if !states_config.is_valid_state(&new_status) {
654 return Err(anyhow!(
655 "Invalid state '{}'. Valid states: {:?}",
656 new_status,
657 states_config.state_names()
658 ));
659 }
660
661 if task.status != new_status
663 && !states_config.is_valid_transition(&task.status, &new_status)
664 {
665 let exits = states_config.get_exits(&task.status);
666 return Err(anyhow!(
667 "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}. \
668Tasks must transition through a timed state (e.g. working) for time tracking; \
669skipping directly to a terminal state is not permitted.",
670 task.status,
671 new_status,
672 exits
673 ));
674 }
675
676 let started_at =
679 if task.started_at.is_none() && states_config.is_timed_state(&new_status) {
680 Some(now)
681 } else {
682 task.started_at
683 };
684
685 let completed_at = if new_status == "completed" {
687 Some(now)
688 } else {
689 task.completed_at
690 };
691
692 if task.status != new_status {
694 record_state_transition(
695 conn,
696 task_id,
697 &new_status,
698 task.worker_id.as_deref(),
699 None,
700 states_config,
701 )?;
702 }
703
704 conn.execute(
705 "UPDATE tasks SET
706 title = ?1, description = ?2, status = ?3, priority = ?4,
707 points = ?5, started_at = ?6, completed_at = ?7, updated_at = ?8,
708 tags = ?9
709 WHERE id = ?10",
710 params![
711 new_title,
712 new_description,
713 new_status,
714 new_priority.to_string(),
715 new_points,
716 started_at,
717 completed_at,
718 now,
719 serde_json::to_string(&new_tags)?,
720 task_id,
721 ],
722 )?;
723
724 Ok(Task {
725 id: task_id.to_string(),
726 title: new_title,
727 description: new_description,
728 status: new_status,
729 priority: new_priority,
730 points: new_points,
731 tags: new_tags,
732 started_at,
733 completed_at,
734 updated_at: now,
735 ..task
736 })
737 })
738 }
739
740 #[allow(clippy::too_many_arguments)]
752 pub fn update_task_unified(
753 &self,
754 task_id: &str,
755 agent_id: &str,
756 assignee: Option<&str>,
757 title: Option<String>,
758 description: Option<Option<String>>,
759 status: Option<String>,
760 phase: Option<String>,
761 priority: Option<Priority>,
762 points: Option<Option<i32>>,
763 tags: Option<Vec<String>>,
764 needed_tags: Option<Vec<String>>,
765 wanted_tags: Option<Vec<String>>,
766 time_estimate_ms: Option<i64>,
767 reason: Option<String>,
768 force: bool,
769 states_config: &StatesConfig,
770 deps_config: &DependenciesConfig,
771 auto_advance: &AutoAdvanceConfig,
772 ) -> Result<UpdateResult> {
773 let now = now_ms();
774
775 self.with_conn_mut(|conn| {
776 let tx = conn.transaction()?;
777
778 let task =
779 get_task_internal(&tx, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
780
781 if let Some(ref current_owner) = task.worker_id
783 && current_owner != agent_id && !force {
784 return Err(anyhow!(
785 "Task is claimed by agent '{}'. Only the owner can update claimed tasks (use force=true to override)",
786 current_owner
787 ));
788 }
789
790 let new_title = title.unwrap_or(task.title.clone());
791 let new_description = description.unwrap_or(task.description.clone());
792 let new_status = if assignee.is_some() && status.is_none() {
794 "assigned".to_string()
795 } else {
796 status.unwrap_or(task.status.clone())
797 };
798 let new_priority = priority.unwrap_or(task.priority);
799 let new_points = points.unwrap_or(task.points);
800 let new_tags = tags.unwrap_or(task.tags.clone());
801 let new_needed_tags = needed_tags.unwrap_or(task.needed_tags.clone());
802 let new_wanted_tags = wanted_tags.unwrap_or(task.wanted_tags.clone());
803 let new_time_estimate_ms = time_estimate_ms.or(task.time_estimate_ms);
804 let new_phase = phase.or(task.phase.clone());
805
806 if !states_config.is_valid_state(&new_status) {
808 return Err(anyhow!(
809 "Invalid state '{}'. Valid states: {:?}",
810 new_status,
811 states_config.state_names()
812 ));
813 }
814
815 if task.status != new_status
817 && !states_config.is_valid_transition(&task.status, &new_status) {
818 let exits = states_config.get_exits(&task.status);
819 return Err(anyhow!(
820 "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}. \
821Tasks must transition through a timed state (e.g. working) for time tracking; \
822skipping directly to a terminal state is not permitted.",
823 task.status,
824 new_status,
825 exits
826 ));
827 }
828
829 let new_is_timed = states_config.is_timed_state(&new_status);
831 let new_is_terminal = states_config.is_terminal_state(&new_status);
832 let current_owner = task.worker_id.as_deref();
833 let is_owned_by_agent = current_owner == Some(agent_id);
834 let is_owned_by_other = current_owner.is_some() && !is_owned_by_agent;
835
836 let mut new_owner: Option<String> = task.worker_id.clone();
837 let mut new_claimed_at: Option<i64> = task.claimed_at;
838
839 if let Some(target_agent) = assignee {
842 if is_owned_by_other && !force {
844 return Err(anyhow!(
845 "Task is already claimed by agent '{}'. Use force=true to reassign.",
846 current_owner.unwrap()
847 ));
848 }
849
850 let target = get_worker_internal(&tx, target_agent)?
852 .ok_or_else(|| anyhow!("Assignee agent '{}' not found", target_agent))?;
853
854 if !task.needed_tags.is_empty() {
856 for needed in &task.needed_tags {
857 if !target.tags.contains(needed) {
858 return Err(anyhow!(
859 "Assignee '{}' missing required tag: {}",
860 target_agent,
861 needed
862 ));
863 }
864 }
865 }
866
867 if !task.wanted_tags.is_empty() {
868 let has_any = task
869 .wanted_tags
870 .iter()
871 .any(|wanted| target.tags.contains(wanted));
872 if !has_any {
873 return Err(anyhow!(
874 "Assignee '{}' has none of the wanted tags: {:?}",
875 target_agent,
876 task.wanted_tags
877 ));
878 }
879 }
880
881 new_owner = Some(target_agent.to_string());
883 new_claimed_at = Some(now);
884 }
885
886 if new_is_timed && !is_owned_by_agent {
889 if is_owned_by_other && !force {
891 return Err(anyhow!(
892 "Task is already claimed by agent '{}'",
893 current_owner.unwrap()
894 ));
895 }
896
897 if !force {
899 let unsatisfied_blockers = super::deps::get_unsatisfied_start_blockers_in_tx(
900 &tx,
901 task_id,
902 states_config,
903 deps_config,
904 )?;
905 if !unsatisfied_blockers.is_empty() {
906 return Err(ToolError::deps_not_satisfied(&unsatisfied_blockers).into());
909 }
910 }
911
912 let agent = get_worker_internal(&tx, agent_id)?
914 .ok_or_else(|| anyhow!("Agent not found"))?;
915
916 if !task.needed_tags.is_empty() {
918 for needed in &task.needed_tags {
919 if !agent.tags.contains(needed) {
920 return Err(anyhow!("Agent missing required tag: {}", needed));
921 }
922 }
923 }
924
925 if !task.wanted_tags.is_empty() {
927 let has_any = task
928 .wanted_tags
929 .iter()
930 .any(|wanted| agent.tags.contains(wanted));
931 if !has_any {
932 return Err(anyhow!("Agent has none of the wanted tags"));
933 }
934 }
935
936 if !force {
938 let claim_count = super::agents::get_claim_count_internal(&tx, agent_id, states_config)?;
939 if claim_count >= agent.max_claims {
940 return Err(anyhow!(
941 "Agent '{}' has reached max_claims limit ({}/{}). Release a task first or use force=true to override.",
942 agent_id, claim_count, agent.max_claims
943 ));
944 }
945 }
946
947 new_owner = Some(agent_id.to_string());
949 new_claimed_at = Some(now);
950
951 tx.execute(
953 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
954 params![now, agent_id],
955 )?;
956 }
957
958 if !new_is_timed && !new_is_terminal && task.worker_id.is_some() {
960 if is_owned_by_other && !force {
962 return Err(anyhow!("Task is not owned by this agent"));
963 }
964
965 new_owner = None;
967 new_claimed_at = None;
968 }
969
970 if new_is_terminal {
972 if let Some(ref current_owner) = task.worker_id
974 && current_owner != agent_id && !force {
975 return Err(anyhow!("Task is not owned by this agent"));
976 }
977
978 new_owner = None;
980 new_claimed_at = None;
981
982 tx.execute(
984 "DELETE FROM file_locks WHERE task_id = ?1",
985 params![task_id],
986 )?;
987 }
988
989 let started_at =
991 if task.started_at.is_none() && new_is_timed {
992 Some(now)
993 } else {
994 task.started_at
995 };
996
997 let completed_at = if new_status == "completed" {
999 Some(now)
1000 } else {
1001 task.completed_at
1002 };
1003
1004 let status_changed = task.status != new_status;
1006 if status_changed {
1007 record_state_transition(
1008 &tx,
1009 task_id,
1010 &new_status,
1011 new_owner.as_deref(),
1012 reason.as_deref(),
1013 states_config,
1014 )?;
1015 }
1016
1017 let phase_changed = task.phase != new_phase;
1019 if phase_changed {
1020 super::state_transitions::record_phase_transition(
1021 &tx,
1022 task_id,
1023 new_phase.as_deref().unwrap_or(""),
1024 Some(agent_id),
1025 reason.as_deref(),
1026 )?;
1027 }
1028
1029 tx.execute(
1030 "UPDATE tasks SET
1031 title = ?1, description = ?2, status = ?3, phase = ?4, priority = ?5,
1032 points = ?6, started_at = ?7, completed_at = ?8, updated_at = ?9,
1033 tags = ?10, worker_id = ?11, claimed_at = ?12,
1034 needed_tags = ?13, wanted_tags = ?14, time_estimate_ms = ?15
1035 WHERE id = ?16",
1036 params![
1037 new_title,
1038 new_description,
1039 new_status,
1040 new_phase,
1041 new_priority.to_string(),
1042 new_points,
1043 started_at,
1044 completed_at,
1045 now,
1046 serde_json::to_string(&new_tags)?,
1047 new_owner,
1048 new_claimed_at,
1049 serde_json::to_string(&new_needed_tags)?,
1050 serde_json::to_string(&new_wanted_tags)?,
1051 new_time_estimate_ms,
1052 task_id,
1053 ],
1054 )?;
1055
1056 if new_tags != task.tags {
1058 sync_task_tags(&tx, task_id, &new_tags)?;
1059 }
1060 if new_needed_tags != task.needed_tags {
1061 sync_needed_tags(&tx, task_id, &new_needed_tags)?;
1062 }
1063 if new_wanted_tags != task.wanted_tags {
1064 sync_wanted_tags(&tx, task_id, &new_wanted_tags)?;
1065 }
1066
1067 let (unblocked, auto_advanced) = if status_changed {
1069 let was_blocking = states_config.is_blocking_state(&task.status);
1070 let is_blocking = states_config.is_blocking_state(&new_status);
1071
1072 if was_blocking && !is_blocking {
1073 super::deps::propagate_unblock_effects(
1074 &tx,
1075 task_id,
1076 Some(agent_id),
1077 states_config,
1078 deps_config,
1079 auto_advance,
1080 )?
1081 } else {
1082 (vec![], vec![])
1083 }
1084 } else {
1085 (vec![], vec![])
1086 };
1087
1088 let new_is_non_blocking = !states_config.is_blocking_state(&new_status);
1093 let auto_completed = if status_changed && new_is_non_blocking && auto_advance.auto_rollup {
1094 propagate_auto_rollup(
1095 &tx,
1096 task_id,
1097 agent_id,
1098 states_config,
1099 )?
1100 } else {
1101 vec![]
1102 };
1103
1104 tx.commit()?;
1105
1106 Ok((Task {
1107 id: task_id.to_string(),
1108 title: new_title,
1109 description: new_description,
1110 status: new_status,
1111 phase: new_phase,
1112 priority: new_priority,
1113 points: new_points,
1114 tags: new_tags,
1115 needed_tags: new_needed_tags,
1116 wanted_tags: new_wanted_tags,
1117 time_estimate_ms: new_time_estimate_ms,
1118 started_at,
1119 completed_at,
1120 updated_at: now,
1121 worker_id: new_owner,
1122 claimed_at: new_claimed_at,
1123 ..task
1124 }, unblocked, auto_advanced, auto_completed))
1125 })
1126 }
1127
1128 pub fn delete_task(
1136 &self,
1137 task_id: &str,
1138 worker_id: &str,
1139 cascade: bool,
1140 reason: Option<String>,
1141 obliterate: bool,
1142 force: bool,
1143 ) -> Result<()> {
1144 let now = now_ms();
1145
1146 self.with_conn_mut(|conn| {
1147 let tx = conn.transaction()?;
1148
1149 let task = get_task_internal(&tx, task_id)?
1151 .ok_or_else(|| anyhow!("Task not found"))?;
1152
1153 if let Some(ref owner) = task.worker_id
1155 && owner != worker_id && !force {
1156 return Err(anyhow!(
1157 "Task is claimed by worker '{}'. Use force=true to override.",
1158 owner
1159 ));
1160 }
1161
1162 if obliterate {
1163 if cascade {
1165 let mut stmt = tx.prepare(
1168 "WITH RECURSIVE descendants AS (
1169 SELECT ?1 AS id
1170 UNION ALL
1171 SELECT dep.to_task_id FROM dependencies dep
1172 INNER JOIN descendants d ON dep.from_task_id = d.id
1173 WHERE dep.dep_type = 'contains'
1174 )
1175 SELECT id FROM descendants",
1176 )?;
1177 let ids: Vec<String> = stmt
1178 .query_map(params![task_id], |row| row.get(0))?
1179 .collect::<Result<Vec<_>, _>>()?;
1180 drop(stmt);
1181
1182 if !ids.is_empty() {
1183 let placeholders: String = ids.iter().enumerate()
1184 .map(|(i, _)| format!("?{}", i + 1))
1185 .collect::<Vec<_>>()
1186 .join(",");
1187 let sql_locks = format!(
1188 "DELETE FROM file_locks WHERE task_id IN ({})", placeholders
1189 );
1190 let sql_tasks = format!(
1191 "DELETE FROM tasks WHERE id IN ({})", placeholders
1192 );
1193 let params: Vec<&dyn rusqlite::types::ToSql> =
1194 ids.iter().map(|id| id as &dyn rusqlite::types::ToSql).collect();
1195 tx.execute(&sql_locks, params.as_slice())?;
1196 tx.execute(&sql_tasks, params.as_slice())?;
1197 }
1198 } else {
1199 let child_count: i32 = tx.query_row(
1201 "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
1202 params![task_id],
1203 |row| row.get(0),
1204 )?;
1205
1206 if child_count > 0 {
1207 return Err(anyhow!("Task has children; use cascade=true to delete"));
1208 }
1209
1210 tx.execute("DELETE FROM file_locks WHERE task_id = ?1", params![task_id])?;
1212 tx.execute("DELETE FROM tasks WHERE id = ?1", params![task_id])?;
1213 }
1214 } else {
1215 if cascade {
1217 tx.execute(
1219 "WITH RECURSIVE descendants AS (
1220 SELECT ?1 AS id
1221 UNION ALL
1222 SELECT dep.to_task_id FROM dependencies dep
1223 INNER JOIN descendants d ON dep.from_task_id = d.id
1224 WHERE dep.dep_type = 'contains'
1225 )
1226 UPDATE tasks SET deleted_at = ?2, deleted_by = ?3, deleted_reason = ?4, updated_at = ?2
1227 WHERE id IN (SELECT id FROM descendants) AND deleted_at IS NULL",
1228 params![task_id, now, worker_id, reason],
1229 )?;
1230 } else {
1231 let child_count: i32 = tx.query_row(
1233 "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
1234 params![task_id],
1235 |row| row.get(0),
1236 )?;
1237
1238 if child_count > 0 {
1239 return Err(anyhow!("Task has children; use cascade=true to delete"));
1240 }
1241
1242 tx.execute(
1243 "UPDATE tasks SET deleted_at = ?1, deleted_by = ?2, deleted_reason = ?3, updated_at = ?1 WHERE id = ?4",
1244 params![now, worker_id, reason, task_id],
1245 )?;
1246 }
1247 }
1248
1249 tx.commit()?;
1250 Ok(())
1251 })
1252 }
1253
1254 pub fn list_tasks(&self, query: ListTasksQuery<'_>) -> Result<Vec<Task>> {
1257 let ListTasksQuery {
1258 status,
1259 phase,
1260 owner,
1261 parent_id,
1262 limit,
1263 offset,
1264 sort_by,
1265 sort_order,
1266 } = query;
1267 self.with_conn(|conn| {
1268 let mut sql = String::from(
1269 "SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL",
1270 );
1271 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1272
1273 if let Some(s) = status {
1274 sql.push_str(" AND t.status = ?");
1275 params_vec.push(Box::new(s.to_string()));
1276 }
1277
1278 if let Some(p) = phase {
1279 sql.push_str(" AND t.phase = ?");
1280 params_vec.push(Box::new(p.to_string()));
1281 }
1282
1283 if let Some(o) = owner {
1284 sql.push_str(" AND t.worker_id = ?");
1285 params_vec.push(Box::new(o.to_string()));
1286 }
1287
1288 if let Some(p) = parent_id {
1290 match p {
1291 Some(pid) => {
1292 sql.push_str(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ? AND dep_type = 'contains')");
1293 params_vec.push(Box::new(pid.to_string()));
1294 }
1295 None => {
1296 sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
1298 }
1299 }
1300 }
1301
1302 let order_clause = build_order_clause(sort_by, sort_order);
1304 sql.push_str(&format!(" ORDER BY {}", order_clause));
1305
1306 if let Some(l) = limit {
1307 sql.push_str(&format!(" LIMIT {}", l));
1308 }
1309
1310 if offset > 0 {
1311 sql.push_str(&format!(" OFFSET {}", offset));
1312 }
1313
1314 let params_refs: Vec<&dyn rusqlite::ToSql> =
1315 params_vec.iter().map(|b| b.as_ref()).collect();
1316
1317 let mut stmt = conn.prepare(&sql)?;
1318 let tasks = stmt
1319 .query_map(params_refs.as_slice(), parse_task_row)?
1320 .filter_map(|r| r.ok())
1321 .collect();
1322
1323 Ok(tasks)
1324 })
1325 }
1326
1327 pub fn set_thought(
1329 &self,
1330 agent_id: &str,
1331 thought: Option<String>,
1332 task_ids: Option<Vec<String>>,
1333 ) -> Result<i32> {
1334 let now = now_ms();
1335
1336 self.with_conn(|conn| {
1337 let updated = if let Some(ids) = task_ids {
1338 let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
1339 let sql = format!(
1340 "UPDATE tasks SET current_thought = ?, updated_at = ?
1341 WHERE worker_id = ? AND id IN ({})",
1342 placeholders.join(", ")
1343 );
1344
1345 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1346 params_vec.push(Box::new(thought.clone()));
1347 params_vec.push(Box::new(now));
1348 params_vec.push(Box::new(agent_id.to_string()));
1349 for id in &ids {
1350 params_vec.push(Box::new(id.clone()));
1351 }
1352
1353 let params_refs: Vec<&dyn rusqlite::ToSql> =
1354 params_vec.iter().map(|b| b.as_ref()).collect();
1355 conn.execute(&sql, params_refs.as_slice())?
1356 } else {
1357 conn.execute(
1358 "UPDATE tasks SET current_thought = ?, updated_at = ? WHERE worker_id = ?",
1359 params![thought, now, agent_id],
1360 )?
1361 };
1362
1363 Ok(updated as i32)
1364 })
1365 }
1366
1367 pub fn log_time(&self, task_id: &str, duration_ms: i64) -> Result<i64> {
1369 let now = now_ms();
1370
1371 self.with_conn(|conn| {
1372 conn.execute(
1373 "UPDATE tasks SET time_actual_ms = COALESCE(time_actual_ms, 0) + ?1, updated_at = ?2
1374 WHERE id = ?3",
1375 params![duration_ms, now, task_id],
1376 )?;
1377
1378 let total: i64 = conn.query_row(
1379 "SELECT COALESCE(time_actual_ms, 0) FROM tasks WHERE id = ?1",
1380 params![task_id],
1381 |row| row.get(0),
1382 )?;
1383
1384 Ok(total)
1385 })
1386 }
1387
1388 pub fn log_metrics(
1391 &self,
1392 task_id: &str,
1393 cost_usd: Option<f64>,
1394 values: &[i64],
1395 ) -> Result<Task> {
1396 let now = now_ms();
1397
1398 self.with_conn(|conn| {
1399 let task =
1400 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1401
1402 let mut new_metrics = task.metrics;
1404 for (i, &val) in values.iter().take(8).enumerate() {
1405 new_metrics[i] += val;
1406 }
1407
1408 let new_cost_usd = task.cost_usd + cost_usd.unwrap_or(0.0);
1409
1410 conn.execute(
1411 "UPDATE tasks SET
1412 metric_0 = ?1, metric_1 = ?2, metric_2 = ?3, metric_3 = ?4,
1413 metric_4 = ?5, metric_5 = ?6, metric_6 = ?7, metric_7 = ?8,
1414 cost_usd = ?9, updated_at = ?10
1415 WHERE id = ?11",
1416 params![
1417 new_metrics[0],
1418 new_metrics[1],
1419 new_metrics[2],
1420 new_metrics[3],
1421 new_metrics[4],
1422 new_metrics[5],
1423 new_metrics[6],
1424 new_metrics[7],
1425 new_cost_usd,
1426 now,
1427 task_id,
1428 ],
1429 )?;
1430
1431 Ok(Task {
1432 cost_usd: new_cost_usd,
1433 metrics: new_metrics,
1434 updated_at: now,
1435 ..task
1436 })
1437 })
1438 }
1439
1440 pub fn claim_task(
1443 &self,
1444 task_id: &str,
1445 agent_id: &str,
1446 states_config: &StatesConfig,
1447 ) -> Result<Task> {
1448 let now = now_ms();
1449
1450 let claim_status = states_config
1452 .definitions
1453 .iter()
1454 .find(|(_, def)| def.timed)
1455 .map(|(name, _)| name.as_str())
1456 .unwrap_or("working");
1457
1458 self.with_conn(|conn| {
1459 let task =
1461 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1462
1463 if task.worker_id.is_some() {
1465 return Err(anyhow!("Task is already claimed"));
1466 }
1467
1468 if !states_config.is_valid_transition(&task.status, claim_status) {
1470 let exits = states_config.get_exits(&task.status);
1471 return Err(anyhow!(
1472 "Cannot claim task in state '{}'. Allowed transitions: {:?}. \
1473Tasks must transition through a timed state (e.g. working) for time tracking.",
1474 task.status,
1475 exits
1476 ));
1477 }
1478
1479 let agent =
1481 get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1482
1483 if !task.needed_tags.is_empty() {
1485 for needed in &task.needed_tags {
1486 if !agent.tags.contains(needed) {
1487 return Err(anyhow!("Agent missing required tag: {}", needed));
1488 }
1489 }
1490 }
1491
1492 if !task.wanted_tags.is_empty() {
1494 let has_any = task
1495 .wanted_tags
1496 .iter()
1497 .any(|wanted| agent.tags.contains(wanted));
1498 if !has_any {
1499 return Err(anyhow!("Agent has none of the wanted tags"));
1500 }
1501 }
1502
1503 let claim_count = super::agents::get_claim_count_internal(conn, agent_id, states_config)?;
1505 if claim_count >= agent.max_claims {
1506 return Err(anyhow!(
1507 "Agent '{}' has reached max_claims limit ({}/{}). Release a task first or use force=true to override.",
1508 agent_id, claim_count, agent.max_claims
1509 ));
1510 }
1511
1512 conn.execute(
1513 "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = ?4, updated_at = ?5
1514 WHERE id = ?6",
1515 params![agent_id, now, claim_status, now, now, task_id,],
1516 )?;
1517
1518 record_state_transition(
1520 conn,
1521 task_id,
1522 claim_status,
1523 Some(agent_id),
1524 None,
1525 states_config,
1526 )?;
1527
1528 conn.execute(
1530 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1531 params![now, agent_id],
1532 )?;
1533
1534 Ok(Task {
1535 worker_id: Some(agent_id.to_string()),
1536 claimed_at: Some(now),
1537 status: claim_status.to_string(),
1538 started_at: Some(now),
1539 updated_at: now,
1540 ..task
1541 })
1542 })
1543 }
1544
1545 pub fn release_task(
1547 &self,
1548 task_id: &str,
1549 agent_id: &str,
1550 states_config: &StatesConfig,
1551 ) -> Result<()> {
1552 let now = now_ms();
1553 let release_status = &states_config.initial;
1554
1555 self.with_conn(|conn| {
1556 let task =
1557 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1558
1559 if task.worker_id.as_deref() != Some(agent_id) {
1560 return Err(anyhow!("Task is not owned by this agent"));
1561 }
1562
1563 record_state_transition(
1565 conn,
1566 task_id,
1567 release_status,
1568 Some(agent_id),
1569 None,
1570 states_config,
1571 )?;
1572
1573 conn.execute(
1574 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1575 WHERE id = ?3",
1576 params![release_status, now, task_id],
1577 )?;
1578
1579 Ok(())
1580 })
1581 }
1582
1583 pub fn force_release(&self, task_id: &str, states_config: &StatesConfig) -> Result<()> {
1585 let now = now_ms();
1586 let release_status = &states_config.initial;
1587
1588 self.with_conn(|conn| {
1589 let task =
1590 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1591
1592 record_state_transition(
1594 conn,
1595 task_id,
1596 release_status,
1597 task.worker_id.as_deref(),
1598 None,
1599 states_config,
1600 )?;
1601
1602 conn.execute(
1603 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1604 WHERE id = ?3",
1605 params![release_status, now, task_id],
1606 )?;
1607
1608 Ok(())
1609 })
1610 }
1611
1612 pub fn force_claim_task(
1614 &self,
1615 task_id: &str,
1616 agent_id: &str,
1617 states_config: &StatesConfig,
1618 ) -> Result<Task> {
1619 let now = now_ms();
1620
1621 let claim_status = states_config
1623 .definitions
1624 .iter()
1625 .find(|(_, def)| def.timed)
1626 .map(|(name, _)| name.as_str())
1627 .unwrap_or("working");
1628
1629 self.with_conn(|conn| {
1630 let task =
1632 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1633
1634 let agent =
1636 get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1637
1638 if !task.needed_tags.is_empty() {
1640 for needed in &task.needed_tags {
1641 if !agent.tags.contains(needed) {
1642 return Err(anyhow!("Agent missing required tag: {}", needed));
1643 }
1644 }
1645 }
1646
1647 if !task.wanted_tags.is_empty() {
1649 let has_any = task
1650 .wanted_tags
1651 .iter()
1652 .any(|wanted| agent.tags.contains(wanted));
1653 if !has_any {
1654 return Err(anyhow!("Agent has none of the wanted tags"));
1655 }
1656 }
1657
1658 conn.execute(
1659 "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = COALESCE(started_at, ?4), updated_at = ?5
1660 WHERE id = ?6",
1661 params![agent_id, now, claim_status, now, now, task_id,],
1662 )?;
1663
1664 record_state_transition(
1666 conn,
1667 task_id,
1668 claim_status,
1669 Some(agent_id),
1670 None,
1671 states_config,
1672 )?;
1673
1674 conn.execute(
1676 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1677 params![now, agent_id],
1678 )?;
1679
1680 Ok(Task {
1681 worker_id: Some(agent_id.to_string()),
1682 claimed_at: Some(now),
1683 status: claim_status.to_string(),
1684 started_at: task.started_at.or(Some(now)),
1685 updated_at: now,
1686 ..task
1687 })
1688 })
1689 }
1690
1691 pub fn release_task_with_state(
1693 &self,
1694 task_id: &str,
1695 agent_id: &str,
1696 state: &str,
1697 states_config: &StatesConfig,
1698 ) -> Result<()> {
1699 let now = now_ms();
1700
1701 self.with_conn(|conn| {
1702 let task =
1703 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1704
1705 if task.worker_id.as_deref() != Some(agent_id) {
1706 return Err(anyhow!("Task is not owned by this agent"));
1707 }
1708
1709 if !states_config.is_valid_state(state) {
1711 return Err(anyhow!(
1712 "Invalid state '{}'. Valid states: {:?}",
1713 state,
1714 states_config.state_names()
1715 ));
1716 }
1717
1718 if !states_config.is_valid_transition(&task.status, state) {
1720 let exits = states_config.get_exits(&task.status);
1721 return Err(anyhow!(
1722 "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}. \
1723Tasks must transition through a timed state (e.g. working) for time tracking; \
1724skipping directly to a terminal state is not permitted.",
1725 task.status,
1726 state,
1727 exits
1728 ));
1729 }
1730
1731 let completed_at = if state == "completed" {
1733 Some(now)
1734 } else {
1735 None
1736 };
1737
1738 record_state_transition(conn, task_id, state, Some(agent_id), None, states_config)?;
1740
1741 conn.execute(
1742 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, completed_at = COALESCE(?2, completed_at), updated_at = ?3
1743 WHERE id = ?4",
1744 params![state, completed_at, now, task_id],
1745 )?;
1746
1747 Ok(())
1748 })
1749 }
1750
1751 pub fn force_release_stale(
1753 &self,
1754 timeout_seconds: i64,
1755 states_config: &StatesConfig,
1756 ) -> Result<i32> {
1757 let now = now_ms();
1758 let cutoff = now - (timeout_seconds * 1000);
1759 let release_status = &states_config.initial;
1760
1761 self.with_conn(|conn| {
1762 let updated = conn.execute(
1763 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1764 WHERE claimed_at < ?3 AND worker_id IS NOT NULL",
1765 params![release_status, now, cutoff],
1766 )?;
1767
1768 Ok(updated as i32)
1769 })
1770 }
1771
1772 pub fn complete_task(
1776 &self,
1777 task_id: &str,
1778 agent_id: &str,
1779 states_config: &StatesConfig,
1780 ) -> Result<Task> {
1781 let now = now_ms();
1782
1783 let complete_status = if states_config.definitions.contains_key("completed") {
1785 "completed"
1786 } else {
1787 states_config
1789 .definitions
1790 .iter()
1791 .find(|(_, def)| def.exits.is_empty())
1792 .map(|(name, _)| name.as_str())
1793 .unwrap_or("completed")
1794 };
1795
1796 self.with_conn_mut(|conn| {
1797 let tx = conn.transaction()?;
1798
1799 let mut stmt = tx.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1801 let task = stmt
1802 .query_row(params![task_id], parse_task_row)
1803 .map_err(|_| anyhow!("Task not found"))?;
1804 drop(stmt);
1805
1806 if task.worker_id.as_deref() != Some(agent_id) {
1808 return Err(anyhow!("Task is not owned by this agent"));
1809 }
1810
1811 let incomplete_children: i32 = tx.query_row(
1813 "SELECT COUNT(*) FROM dependencies d
1814 INNER JOIN tasks child ON d.to_task_id = child.id
1815 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
1816 AND child.status IN (SELECT value FROM json_each(?2))",
1817 params![
1818 task_id,
1819 serde_json::to_string(&states_config.blocking_states)?
1820 ],
1821 |row| row.get(0),
1822 )?;
1823
1824 if incomplete_children > 0 {
1825 return Err(anyhow!(
1826 "Cannot complete task: {} child task(s) are not complete",
1827 incomplete_children
1828 ));
1829 }
1830
1831 if !states_config.is_valid_transition(&task.status, complete_status) {
1833 let exits = states_config.get_exits(&task.status);
1834 return Err(anyhow!(
1835 "Cannot complete task in state '{}'. Allowed transitions: {:?}. \
1836Tasks must transition through a timed state (e.g. working) for time tracking.",
1837 task.status,
1838 exits
1839 ));
1840 }
1841
1842 record_state_transition(
1844 &tx,
1845 task_id,
1846 complete_status,
1847 Some(agent_id),
1848 None,
1849 states_config,
1850 )?;
1851
1852 tx.execute(
1854 "UPDATE tasks SET status = ?1, completed_at = ?2, updated_at = ?3,
1855 worker_id = NULL, claimed_at = NULL
1856 WHERE id = ?4",
1857 params![complete_status, now, now, task_id],
1858 )?;
1859
1860 tx.execute(
1862 "DELETE FROM file_locks WHERE task_id = ?1",
1863 params![task_id],
1864 )?;
1865
1866 tx.execute(
1868 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1869 params![now, agent_id],
1870 )?;
1871
1872 tx.commit()?;
1873
1874 Ok(Task {
1875 status: complete_status.to_string(),
1876 completed_at: Some(now),
1877 updated_at: now,
1878 worker_id: None,
1879 claimed_at: None,
1880 ..task
1881 })
1882 })
1883 }
1884
1885 pub fn get_all_tasks(&self) -> Result<Vec<Task>> {
1887 self.with_conn(|conn| {
1888 let mut stmt =
1889 conn.prepare("SELECT * FROM tasks WHERE deleted_at IS NULL ORDER BY created_at")?;
1890 let tasks = stmt
1891 .query_map([], parse_task_row)?
1892 .filter_map(|r| r.ok())
1893 .collect();
1894 Ok(tasks)
1895 })
1896 }
1897
1898 #[allow(dead_code)]
1900 pub fn get_tasks_by_status(&self, status: &str) -> Result<Vec<Task>> {
1901 self.with_conn(|conn| {
1902 let mut stmt =
1903 conn.prepare("SELECT * FROM tasks WHERE status = ?1 ORDER BY created_at")?;
1904 let tasks = stmt
1905 .query_map(params![status], parse_task_row)?
1906 .filter_map(|r| r.ok())
1907 .collect();
1908 Ok(tasks)
1909 })
1910 }
1911
1912 pub fn get_claimed_tasks(&self, agent_id: Option<&str>) -> Result<Vec<Task>> {
1914 self.with_conn(|conn| {
1915 let tasks = if let Some(aid) = agent_id {
1916 let mut stmt = conn
1917 .prepare("SELECT * FROM tasks WHERE worker_id = ?1 AND deleted_at IS NULL ORDER BY claimed_at")?;
1918 stmt.query_map(params![aid], parse_task_row)?
1919 .filter_map(|r| r.ok())
1920 .collect()
1921 } else {
1922 let mut stmt = conn.prepare(
1923 "SELECT * FROM tasks WHERE worker_id IS NOT NULL AND deleted_at IS NULL ORDER BY claimed_at",
1924 )?;
1925 stmt.query_map([], parse_task_row)?
1926 .filter_map(|r| r.ok())
1927 .collect()
1928 };
1929
1930 Ok(tasks)
1931 })
1932 }
1933}
1934
1935fn propagate_auto_rollup(
1939 conn: &Connection,
1940 completed_task_id: &str,
1941 agent_id: &str,
1942 states_config: &StatesConfig,
1943) -> Result<Vec<(String, String)>> {
1944 let now = super::now_ms();
1945 let mut auto_completed = Vec::new();
1946
1947 let parent_id: Option<String> = conn
1949 .query_row(
1950 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1951 params![completed_task_id],
1952 |row| row.get(0),
1953 )
1954 .ok();
1955
1956 let parent_id = match parent_id {
1957 Some(id) => id,
1958 None => return Ok(auto_completed), };
1960
1961 let parent = match get_task_internal(conn, &parent_id)? {
1963 Some(t) => t,
1964 None => return Ok(auto_completed),
1965 };
1966
1967 if !states_config.is_blocking_state(&parent.status) {
1969 return Ok(auto_completed);
1970 }
1971
1972 let non_terminal_children: i32 = conn.query_row(
1976 "SELECT COUNT(*) FROM dependencies d
1977 INNER JOIN tasks child ON d.to_task_id = child.id
1978 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
1979 AND child.status IN (SELECT value FROM json_each(?2))",
1980 params![
1981 parent_id,
1982 serde_json::to_string(&states_config.blocking_states)?
1983 ],
1984 |row| row.get(0),
1985 )?;
1986
1987 if non_terminal_children > 0 {
1988 return Ok(auto_completed); }
1990
1991 let first_timed = states_config
1994 .definitions
1995 .iter()
1996 .find(|(_, def)| def.timed)
1997 .map(|(name, _)| name.clone())
1998 .unwrap_or_else(|| "working".to_string());
1999
2000 let parent_title = parent.title.clone();
2001
2002 let mut current_state = parent.status.clone();
2004
2005 if !states_config.is_timed_state(¤t_state) {
2007 if states_config.is_valid_transition(¤t_state, &first_timed) {
2009 conn.execute(
2010 "UPDATE tasks SET status = ?1, started_at = COALESCE(started_at, ?2), updated_at = ?2 WHERE id = ?3",
2011 params![&first_timed, now, &parent_id],
2012 )?;
2013
2014 record_state_transition(
2015 conn,
2016 &parent_id,
2017 &first_timed,
2018 Some(agent_id),
2019 Some("auto-rollup: transitioning to timed state before completion"),
2020 states_config,
2021 )?;
2022
2023 current_state = first_timed;
2024 } else {
2025 return Ok(auto_completed);
2027 }
2028 }
2029
2030 let completed_state = "completed";
2032 if !states_config.is_valid_transition(¤t_state, completed_state) {
2033 return Ok(auto_completed);
2035 }
2036
2037 conn.execute(
2038 "UPDATE tasks SET status = ?1, completed_at = ?2, updated_at = ?2, worker_id = NULL, claimed_at = NULL WHERE id = ?3",
2039 params![completed_state, now, &parent_id],
2040 )?;
2041
2042 conn.execute(
2044 "DELETE FROM file_locks WHERE task_id = ?1",
2045 params![&parent_id],
2046 )?;
2047
2048 record_state_transition(
2049 conn,
2050 &parent_id,
2051 completed_state,
2052 Some(agent_id),
2053 Some("auto-rollup: all children reached terminal state"),
2054 states_config,
2055 )?;
2056
2057 auto_completed.push((parent_id.clone(), parent_title));
2058
2059 let mut grandparent_completed =
2061 propagate_auto_rollup(conn, &parent_id, agent_id, states_config)?;
2062 auto_completed.append(&mut grandparent_completed);
2063
2064 Ok(auto_completed)
2065}
2066
2067#[allow(clippy::too_many_arguments)]
2072fn create_tree_recursive(
2073 conn: &Connection,
2074 input: &TaskTreeInput,
2075 parent_id: Option<&str>,
2076 prev_sibling_id: Option<&str>,
2077 child_type: Option<&str>,
2078 sibling_type: Option<&str>,
2079 all_ids: &mut Vec<String>,
2080 phase_warnings: &mut Vec<String>,
2081 tag_warnings: &mut Vec<String>,
2082 states_config: &StatesConfig,
2083 phases_config: &PhasesConfig,
2084 tags_config: &TagsConfig,
2085 ids_config: &IdsConfig,
2086) -> Result<String> {
2087 let task_id = if let Some(ref ref_id) = input.ref_id {
2089 let exists: bool = conn.query_row(
2091 "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
2092 params![ref_id],
2093 |row| row.get(0),
2094 )?;
2095 if !exists {
2096 return Err(anyhow::anyhow!("Referenced task '{}' not found", ref_id));
2097 }
2098 ref_id.clone()
2099 } else {
2100 let task_id = input
2102 .id
2103 .clone()
2104 .unwrap_or_else(|| generate_task_id(ids_config));
2105 let now = now_ms();
2106 let priority = clamp_priority(input.priority.unwrap_or(PRIORITY_DEFAULT));
2107 let initial_status = &states_config.initial;
2108
2109 let title = input.title.clone().unwrap_or_else(|| {
2111 input
2112 .description
2113 .as_deref()
2114 .map(|d| crate::format::truncate_title(d).into_owned())
2115 .unwrap_or_default()
2116 });
2117
2118 if let Some(ref phase) = input.phase
2120 && let Some(warning) = phases_config.check_phase(phase)?
2121 {
2122 phase_warnings.push(format!("Task '{}': {}", task_id, warning));
2123 }
2124
2125 let needed_tags = input.needed_tags.clone().unwrap_or_default();
2126 let wanted_tags = input.wanted_tags.clone().unwrap_or_default();
2127 let tags = input.tags.clone().unwrap_or_default();
2128
2129 for warning in tags_config.validate_tags(&tags)? {
2131 tag_warnings.push(format!("Task '{}': {}", task_id, warning));
2132 }
2133 for warning in tags_config.validate_tags(&needed_tags)? {
2134 tag_warnings.push(format!("Task '{}' needed_tags: {}", task_id, warning));
2135 }
2136 for warning in tags_config.validate_tags(&wanted_tags)? {
2137 tag_warnings.push(format!("Task '{}' wanted_tags: {}", task_id, warning));
2138 }
2139
2140 let needed_tags_json = serde_json::to_string(&needed_tags)?;
2141 let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
2142 let tags_json = serde_json::to_string(&tags)?;
2143
2144 conn.execute(
2145 "INSERT INTO tasks (
2146 id, title, description, status, phase, priority,
2147 needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
2148 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
2149 params![
2150 &task_id,
2151 &title,
2152 &input.description,
2153 initial_status,
2154 &input.phase,
2155 priority.to_string(),
2156 needed_tags_json,
2157 wanted_tags_json,
2158 tags_json,
2159 input.points,
2160 input.time_estimate_ms,
2161 now,
2162 now,
2163 ],
2164 )?;
2165
2166 record_state_transition(conn, &task_id, initial_status, None, None, states_config)?;
2168
2169 sync_task_tags(conn, &task_id, &tags)?;
2171 sync_needed_tags(conn, &task_id, &needed_tags)?;
2172 sync_wanted_tags(conn, &task_id, &wanted_tags)?;
2173
2174 task_id
2175 };
2176
2177 if let (Some(pid), Some(ct)) = (parent_id, child_type) {
2179 Database::add_dependency_internal(conn, pid, &task_id, ct)?;
2180 }
2181
2182 if let (Some(prev_id), Some(st)) = (prev_sibling_id, sibling_type) {
2184 Database::add_dependency_internal(conn, prev_id, &task_id, st)?;
2185 }
2186
2187 all_ids.push(task_id.clone());
2188
2189 for blocker_id in &input.blocked_by {
2191 let blocker_exists = all_ids.iter().any(|id| id == blocker_id) || {
2193 let exists: bool = conn.query_row(
2194 "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
2195 params![blocker_id],
2196 |row| row.get(0),
2197 )?;
2198 exists
2199 };
2200 if !blocker_exists {
2201 return Err(anyhow::anyhow!(
2202 "blocked_by task '{}' not found (not created earlier in tree and not in database)",
2203 blocker_id
2204 ));
2205 }
2206 Database::add_dependency_internal(conn, blocker_id, &task_id, "blocks")?;
2207 }
2208
2209 let mut prev_child_id: Option<String> = None;
2211 for child in input.children.iter() {
2212 let child_id = create_tree_recursive(
2213 conn,
2214 child,
2215 Some(&task_id),
2216 prev_child_id.as_deref(),
2217 child_type,
2218 sibling_type,
2219 all_ids,
2220 phase_warnings,
2221 tag_warnings,
2222 states_config,
2223 phases_config,
2224 tags_config,
2225 ids_config,
2226 )?;
2227 prev_child_id = Some(child_id);
2228 }
2229
2230 Ok(task_id)
2231}