1use super::state_transitions::record_state_transition;
4use super::{Database, now_ms};
5use crate::config::{
6 AutoAdvanceConfig, DependenciesConfig, IdsConfig, PhasesConfig, StatesConfig, TagsConfig,
7};
8use crate::error::ToolError;
9use crate::types::{
10 PRIORITY_DEFAULT, Priority, Task, TaskTree, TaskTreeInput, Worker, clamp_priority,
11 parse_priority,
12};
13use anyhow::{Result, anyhow};
14use petname::{Generator, Petnames};
15use rusqlite::{Connection, Row, params};
16
17fn generate_task_id(ids_config: &IdsConfig) -> String {
20 let words = ids_config.task_id_words;
21 let case = ids_config.id_case;
22
23 let base = Petnames::medium()
25 .generate_one(words, "-")
26 .unwrap_or_else(|| format!("task-{}", super::now_ms()));
27
28 case.convert(&base)
30}
31
32fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
35 let field = match sort_by {
36 Some("priority") => "CAST(t.priority AS INTEGER)",
37 Some("created_at") => "t.created_at",
38 Some("updated_at") => "t.updated_at",
39 _ => "t.created_at", };
41
42 let order = match sort_order {
43 Some("asc") => "ASC",
44 Some("desc") => "DESC",
45 _ => {
46 "DESC"
48 }
49 };
50
51 format!("{} {}", field, order)
52}
53
54fn sync_task_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
61 conn.execute("DELETE FROM task_tags WHERE task_id = ?1", params![task_id])?;
62 for tag in tags {
63 conn.execute(
64 "INSERT INTO task_tags (task_id, tag) VALUES (?1, ?2)",
65 params![task_id, tag],
66 )?;
67 }
68 Ok(())
69}
70
71fn sync_needed_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
73 conn.execute(
74 "DELETE FROM task_needed_tags WHERE task_id = ?1",
75 params![task_id],
76 )?;
77 for tag in tags {
78 conn.execute(
79 "INSERT INTO task_needed_tags (task_id, tag) VALUES (?1, ?2)",
80 params![task_id, tag],
81 )?;
82 }
83 Ok(())
84}
85
86fn sync_wanted_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
88 conn.execute(
89 "DELETE FROM task_wanted_tags WHERE task_id = ?1",
90 params![task_id],
91 )?;
92 for tag in tags {
93 conn.execute(
94 "INSERT INTO task_wanted_tags (task_id, tag) VALUES (?1, ?2)",
95 params![task_id, tag],
96 )?;
97 }
98 Ok(())
99}
100
101pub fn parse_task_row(row: &Row) -> rusqlite::Result<Task> {
102 let id: String = row.get("id")?;
103 let title: String = row.get("title")?;
104 let description: Option<String> = row.get("description")?;
105 let status: String = row.get("status")?;
106 let phase: Option<String> = row.get("phase")?;
107 let priority: String = row.get("priority")?;
108 let worker_id: Option<String> = row.get("worker_id")?;
109 let claimed_at: Option<i64> = row.get("claimed_at")?;
110
111 let needed_tags_json: Option<String> = row.get("needed_tags")?;
112 let wanted_tags_json: Option<String> = row.get("wanted_tags")?;
113 let tags_json: Option<String> = row.get("tags")?;
114
115 let points: Option<i32> = row.get("points")?;
116 let time_estimate_ms: Option<i64> = row.get("time_estimate_ms")?;
117 let time_actual_ms: Option<i64> = row.get("time_actual_ms")?;
118 let started_at: Option<i64> = row.get("started_at")?;
119 let completed_at: Option<i64> = row.get("completed_at")?;
120
121 let current_thought: Option<String> = row.get("current_thought")?;
122
123 let cost_usd: f64 = row.get("cost_usd")?;
124 let metric_0: i64 = row.get("metric_0")?;
125 let metric_1: i64 = row.get("metric_1")?;
126 let metric_2: i64 = row.get("metric_2")?;
127 let metric_3: i64 = row.get("metric_3")?;
128 let metric_4: i64 = row.get("metric_4")?;
129 let metric_5: i64 = row.get("metric_5")?;
130 let metric_6: i64 = row.get("metric_6")?;
131 let metric_7: i64 = row.get("metric_7")?;
132
133 let created_at: i64 = row.get("created_at")?;
134 let updated_at: i64 = row.get("updated_at")?;
135
136 Ok(Task {
137 id,
138 title,
139 description,
140 status,
141 phase,
142 priority: parse_priority(&priority),
143 worker_id,
144 claimed_at,
145 needed_tags: needed_tags_json
146 .map(|s| serde_json::from_str(&s).unwrap_or_default())
147 .unwrap_or_default(),
148 wanted_tags: wanted_tags_json
149 .map(|s| serde_json::from_str(&s).unwrap_or_default())
150 .unwrap_or_default(),
151 tags: tags_json
152 .map(|s| serde_json::from_str(&s).unwrap_or_default())
153 .unwrap_or_default(),
154 points,
155 time_estimate_ms,
156 time_actual_ms,
157 started_at,
158 completed_at,
159 current_thought,
160 cost_usd,
161 metrics: [
162 metric_0, metric_1, metric_2, metric_3, metric_4, metric_5, metric_6, metric_7,
163 ],
164 created_at,
165 updated_at,
166 })
167}
168
169fn get_task_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
171 let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
172
173 let result = stmt.query_row(params![task_id], parse_task_row);
174
175 match result {
176 Ok(task) => Ok(Some(task)),
177 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
178 Err(e) => Err(e.into()),
179 }
180}
181
182fn get_worker_internal(conn: &Connection, worker_id: &str) -> Result<Option<Worker>> {
184 let mut stmt = conn.prepare(
185 "SELECT id, tags, max_claims, registered_at, last_heartbeat, last_status, last_phase, workflow
186 FROM workers WHERE id = ?1",
187 )?;
188
189 let result = stmt.query_row(params![worker_id], |row| {
190 let id: String = row.get(0)?;
191 let tags_json: String = row.get(1)?;
192 let max_claims: i32 = row.get(2)?;
193 let registered_at: i64 = row.get(3)?;
194 let last_heartbeat: i64 = row.get(4)?;
195 let last_status: Option<String> = row.get(5)?;
196 let last_phase: Option<String> = row.get(6)?;
197 let workflow: Option<String> = row.get(7)?;
198
199 Ok((
200 id,
201 tags_json,
202 max_claims,
203 registered_at,
204 last_heartbeat,
205 last_status,
206 last_phase,
207 workflow,
208 ))
209 });
210
211 match result {
212 Ok((
213 id,
214 tags_json,
215 max_claims,
216 registered_at,
217 last_heartbeat,
218 last_status,
219 last_phase,
220 workflow,
221 )) => {
222 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
223 Ok(Some(Worker {
224 id,
225 tags,
226 max_claims,
227 registered_at,
228 last_heartbeat,
229 last_status,
230 last_phase,
231 workflow,
232 }))
233 }
234 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
235 Err(e) => Err(e.into()),
236 }
237}
238
239impl Database {
240 #[allow(clippy::too_many_arguments)]
244 pub fn create_task(
245 &self,
246 id: Option<String>,
247 description: String,
248 parent_id: Option<String>,
249 phase: Option<String>,
250 priority: Option<Priority>,
251 points: Option<i32>,
252 time_estimate_ms: Option<i64>,
253 agent_tags_all: Option<Vec<String>>,
254 agent_tags_any: Option<Vec<String>>,
255 tags: Option<Vec<String>>,
256 states_config: &StatesConfig,
257 ids_config: &IdsConfig,
258 ) -> Result<Task> {
259 let task_id = id.unwrap_or_else(|| generate_task_id(ids_config));
260 let now = now_ms();
261 let priority = clamp_priority(priority.unwrap_or(PRIORITY_DEFAULT));
262 let initial_status = &states_config.initial;
263
264 let needed_tags = agent_tags_all.unwrap_or_default();
265 let wanted_tags = agent_tags_any.unwrap_or_default();
266 let tags = tags.unwrap_or_default();
267 let needed_tags_json = serde_json::to_string(&needed_tags)?;
268 let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
269 let tags_json = serde_json::to_string(&tags)?;
270
271 self.with_conn_mut(|conn| {
272 let tx = conn.transaction()?;
273
274 tx.execute(
275 "INSERT INTO tasks (
276 id, title, description, status, phase, priority,
277 needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
278 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
279 params![
280 &task_id,
281 &description, &description, initial_status,
284 &phase,
285 priority.to_string(),
286 needed_tags_json,
287 wanted_tags_json,
288 tags_json,
289 points,
290 time_estimate_ms,
291 now,
292 now,
293 ],
294 )?;
295
296 sync_task_tags(&tx, &task_id, &tags)?;
298 sync_needed_tags(&tx, &task_id, &needed_tags)?;
299 sync_wanted_tags(&tx, &task_id, &wanted_tags)?;
300
301 if let Some(ref pid) = parent_id {
303 Database::add_dependency_internal(&tx, pid, &task_id, "contains")?;
304 }
305
306 record_state_transition(&tx, &task_id, initial_status, None, None, states_config)?;
308
309 tx.commit()?;
310
311 Ok(Task {
312 id: task_id,
313 title: description.clone(),
314 description: Some(description),
315 status: initial_status.clone(),
316 phase,
317 priority,
318 worker_id: None,
319 claimed_at: None,
320 needed_tags,
321 wanted_tags,
322 tags,
323 points,
324 time_estimate_ms,
325 time_actual_ms: None,
326 started_at: None,
327 completed_at: None,
328 current_thought: None,
329 cost_usd: 0.0,
330 metrics: [0; 8],
331 created_at: now,
332 updated_at: now,
333 })
334 })
335 }
336
337 pub fn create_task_simple(
340 &self,
341 description: impl Into<String>,
342 states_config: &StatesConfig,
343 ids_config: &IdsConfig,
344 ) -> Result<Task> {
345 self.create_task(
346 None,
347 description.into(),
348 None,
349 None,
350 None,
351 None,
352 None,
353 None,
354 None,
355 None,
356 states_config,
357 ids_config,
358 )
359 }
360
361 pub fn create_task_tree(
365 &self,
366 input: TaskTreeInput,
367 parent_id: Option<String>,
368 child_type: Option<String>,
369 sibling_type: Option<String>,
370 states_config: &StatesConfig,
371 phases_config: &PhasesConfig,
372 tags_config: &TagsConfig,
373 ids_config: &IdsConfig,
374 ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>)> {
375 let mut all_ids = Vec::new();
376 let mut phase_warnings = Vec::new();
377 let mut tag_warnings = Vec::new();
378 let child_type = child_type.or_else(|| Some("contains".to_string()));
380
381 self.with_conn_mut(|conn| {
382 let tx = conn.transaction()?;
383 let root_id = create_tree_recursive(
384 &tx,
385 &input,
386 parent_id.as_deref(),
387 None, child_type.as_deref(),
389 sibling_type.as_deref(),
390 &mut all_ids,
391 &mut phase_warnings,
392 &mut tag_warnings,
393 states_config,
394 phases_config,
395 tags_config,
396 ids_config,
397 )?;
398 tx.commit()?;
399 Ok((root_id, all_ids, phase_warnings, tag_warnings))
400 })
401 }
402
403 pub fn get_task(&self, task_id: &str) -> Result<Option<Task>> {
405 self.with_conn(|conn| {
406 let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
407
408 let result = stmt.query_row(params![task_id], parse_task_row);
409
410 match result {
411 Ok(task) => Ok(Some(task)),
412 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
413 Err(e) => Err(e.into()),
414 }
415 })
416 }
417
418 pub fn get_task_tree(&self, task_id: &str) -> Result<Option<TaskTree>> {
420 let task = self.get_task(task_id)?;
421 match task {
422 None => Ok(None),
423 Some(task) => {
424 let children = self.get_children_recursive(&task.id)?;
425 Ok(Some(TaskTree { task, children }))
426 }
427 }
428 }
429
430 fn get_children_recursive(&self, parent_id: &str) -> Result<Vec<TaskTree>> {
432 let children = self.get_children(parent_id)?;
433 let mut result = Vec::new();
434
435 for child in children {
436 let child_children = self.get_children_recursive(&child.id)?;
437 result.push(TaskTree {
438 task: child,
439 children: child_children,
440 });
441 }
442
443 Ok(result)
444 }
445
446 pub fn get_children(&self, parent_id: &str) -> Result<Vec<Task>> {
448 self.with_conn(|conn| {
449 let mut stmt = conn.prepare(
450 "SELECT t.* FROM tasks t
451 INNER JOIN dependencies d ON t.id = d.to_task_id
452 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
453 ORDER BY t.created_at",
454 )?;
455
456 let tasks = stmt
457 .query_map(params![parent_id], parse_task_row)?
458 .filter_map(|r| r.ok())
459 .collect();
460
461 Ok(tasks)
462 })
463 }
464
465 #[allow(clippy::too_many_arguments)]
467 pub fn update_task(
468 &self,
469 task_id: &str,
470 title: Option<String>,
471 description: Option<Option<String>>,
472 status: Option<String>,
473 priority: Option<Priority>,
474 points: Option<Option<i32>>,
475 tags: Option<Vec<String>>,
476 states_config: &StatesConfig,
477 ) -> Result<Task> {
478 let now = now_ms();
479
480 self.with_conn(|conn| {
481 let task =
482 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
483
484 let new_title = title.unwrap_or(task.title.clone());
485 let new_description = description.unwrap_or(task.description.clone());
486 let new_status = status.unwrap_or(task.status.clone());
487 let new_priority = priority.unwrap_or(task.priority);
488 let new_points = points.unwrap_or(task.points);
489 let new_tags = tags.unwrap_or(task.tags.clone());
490
491 if !states_config.is_valid_state(&new_status) {
493 return Err(anyhow!(
494 "Invalid state '{}'. Valid states: {:?}",
495 new_status,
496 states_config.state_names()
497 ));
498 }
499
500 if task.status != new_status
502 && !states_config.is_valid_transition(&task.status, &new_status)
503 {
504 let exits = states_config.get_exits(&task.status);
505 return Err(anyhow!(
506 "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
507 task.status,
508 new_status,
509 exits
510 ));
511 }
512
513 let started_at =
516 if task.started_at.is_none() && states_config.is_timed_state(&new_status) {
517 Some(now)
518 } else {
519 task.started_at
520 };
521
522 let completed_at = if new_status == "completed" {
524 Some(now)
525 } else {
526 task.completed_at
527 };
528
529 if task.status != new_status {
531 record_state_transition(
532 conn,
533 task_id,
534 &new_status,
535 task.worker_id.as_deref(),
536 None,
537 states_config,
538 )?;
539 }
540
541 conn.execute(
542 "UPDATE tasks SET
543 title = ?1, description = ?2, status = ?3, priority = ?4,
544 points = ?5, started_at = ?6, completed_at = ?7, updated_at = ?8,
545 tags = ?9
546 WHERE id = ?10",
547 params![
548 new_title,
549 new_description,
550 new_status,
551 new_priority.to_string(),
552 new_points,
553 started_at,
554 completed_at,
555 now,
556 serde_json::to_string(&new_tags)?,
557 task_id,
558 ],
559 )?;
560
561 Ok(Task {
562 id: task_id.to_string(),
563 title: new_title,
564 description: new_description,
565 status: new_status,
566 priority: new_priority,
567 points: new_points,
568 tags: new_tags,
569 started_at,
570 completed_at,
571 updated_at: now,
572 ..task
573 })
574 })
575 }
576
577 #[allow(clippy::too_many_arguments)]
589 pub fn update_task_unified(
590 &self,
591 task_id: &str,
592 agent_id: &str,
593 assignee: Option<&str>,
594 title: Option<String>,
595 description: Option<Option<String>>,
596 status: Option<String>,
597 phase: Option<String>,
598 priority: Option<Priority>,
599 points: Option<Option<i32>>,
600 tags: Option<Vec<String>>,
601 needed_tags: Option<Vec<String>>,
602 wanted_tags: Option<Vec<String>>,
603 time_estimate_ms: Option<i64>,
604 reason: Option<String>,
605 force: bool,
606 states_config: &StatesConfig,
607 deps_config: &DependenciesConfig,
608 auto_advance: &AutoAdvanceConfig,
609 ) -> Result<(Task, Vec<String>, Vec<String>)> {
610 let now = now_ms();
611
612 self.with_conn_mut(|conn| {
613 let tx = conn.transaction()?;
614
615 let task =
616 get_task_internal(&tx, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
617
618 if let Some(ref current_owner) = task.worker_id
620 && current_owner != agent_id && !force {
621 return Err(anyhow!(
622 "Task is claimed by agent '{}'. Only the owner can update claimed tasks (use force=true to override)",
623 current_owner
624 ));
625 }
626
627 let new_title = title.unwrap_or(task.title.clone());
628 let new_description = description.unwrap_or(task.description.clone());
629 let new_status = if assignee.is_some() && status.is_none() {
631 "assigned".to_string()
632 } else {
633 status.unwrap_or(task.status.clone())
634 };
635 let new_priority = priority.unwrap_or(task.priority);
636 let new_points = points.unwrap_or(task.points);
637 let new_tags = tags.unwrap_or(task.tags.clone());
638 let new_needed_tags = needed_tags.unwrap_or(task.needed_tags.clone());
639 let new_wanted_tags = wanted_tags.unwrap_or(task.wanted_tags.clone());
640 let new_time_estimate_ms = time_estimate_ms.or(task.time_estimate_ms);
641 let new_phase = phase.or(task.phase.clone());
642
643 if !states_config.is_valid_state(&new_status) {
645 return Err(anyhow!(
646 "Invalid state '{}'. Valid states: {:?}",
647 new_status,
648 states_config.state_names()
649 ));
650 }
651
652 if task.status != new_status
654 && !states_config.is_valid_transition(&task.status, &new_status) {
655 let exits = states_config.get_exits(&task.status);
656 return Err(anyhow!(
657 "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
658 task.status,
659 new_status,
660 exits
661 ));
662 }
663
664 let new_is_timed = states_config.is_timed_state(&new_status);
666 let new_is_terminal = states_config.is_terminal_state(&new_status);
667 let current_owner = task.worker_id.as_deref();
668 let is_owned_by_agent = current_owner == Some(agent_id);
669 let is_owned_by_other = current_owner.is_some() && !is_owned_by_agent;
670
671 let mut new_owner: Option<String> = task.worker_id.clone();
672 let mut new_claimed_at: Option<i64> = task.claimed_at;
673
674 if let Some(target_agent) = assignee {
677 if is_owned_by_other && !force {
679 return Err(anyhow!(
680 "Task is already claimed by agent '{}'. Use force=true to reassign.",
681 current_owner.unwrap()
682 ));
683 }
684
685 let target = get_worker_internal(&tx, target_agent)?
687 .ok_or_else(|| anyhow!("Assignee agent '{}' not found", target_agent))?;
688
689 if !task.needed_tags.is_empty() {
691 for needed in &task.needed_tags {
692 if !target.tags.contains(needed) {
693 return Err(anyhow!(
694 "Assignee '{}' missing required tag: {}",
695 target_agent,
696 needed
697 ));
698 }
699 }
700 }
701
702 if !task.wanted_tags.is_empty() {
703 let has_any = task
704 .wanted_tags
705 .iter()
706 .any(|wanted| target.tags.contains(wanted));
707 if !has_any {
708 return Err(anyhow!(
709 "Assignee '{}' has none of the wanted tags: {:?}",
710 target_agent,
711 task.wanted_tags
712 ));
713 }
714 }
715
716 new_owner = Some(target_agent.to_string());
718 new_claimed_at = Some(now);
719 }
720
721 if new_is_timed && !is_owned_by_agent {
724 if is_owned_by_other && !force {
726 return Err(anyhow!(
727 "Task is already claimed by agent '{}'",
728 current_owner.unwrap()
729 ));
730 }
731
732 if !force {
734 let unsatisfied_blockers = super::deps::get_unsatisfied_start_blockers_in_tx(
735 &tx,
736 task_id,
737 states_config,
738 deps_config,
739 )?;
740 if !unsatisfied_blockers.is_empty() {
741 return Err(ToolError::deps_not_satisfied(&unsatisfied_blockers).into());
744 }
745 }
746
747 let agent = get_worker_internal(&tx, agent_id)?
749 .ok_or_else(|| anyhow!("Agent not found"))?;
750
751 if !task.needed_tags.is_empty() {
753 for needed in &task.needed_tags {
754 if !agent.tags.contains(needed) {
755 return Err(anyhow!("Agent missing required tag: {}", needed));
756 }
757 }
758 }
759
760 if !task.wanted_tags.is_empty() {
762 let has_any = task
763 .wanted_tags
764 .iter()
765 .any(|wanted| agent.tags.contains(wanted));
766 if !has_any {
767 return Err(anyhow!("Agent has none of the wanted tags"));
768 }
769 }
770
771 new_owner = Some(agent_id.to_string());
773 new_claimed_at = Some(now);
774
775 tx.execute(
777 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
778 params![now, agent_id],
779 )?;
780 }
781
782 if !new_is_timed && !new_is_terminal && task.worker_id.is_some() {
784 if is_owned_by_other && !force {
786 return Err(anyhow!("Task is not owned by this agent"));
787 }
788
789 new_owner = None;
791 new_claimed_at = None;
792 }
793
794 if new_is_terminal {
796 if let Some(ref current_owner) = task.worker_id
798 && current_owner != agent_id && !force {
799 return Err(anyhow!("Task is not owned by this agent"));
800 }
801
802 let incomplete_children: i32 = tx.query_row(
804 "SELECT COUNT(*) FROM dependencies d
805 INNER JOIN tasks child ON d.to_task_id = child.id
806 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
807 AND child.status IN (SELECT value FROM json_each(?2))",
808 params![
809 task_id,
810 serde_json::to_string(&states_config.blocking_states)?
811 ],
812 |row| row.get(0),
813 )?;
814
815 if incomplete_children > 0 {
816 return Err(anyhow!(
817 "Cannot complete task: {} child task(s) are not complete",
818 incomplete_children
819 ));
820 }
821
822 new_owner = None;
824 new_claimed_at = None;
825
826 tx.execute(
828 "DELETE FROM file_locks WHERE task_id = ?1",
829 params![task_id],
830 )?;
831 }
832
833 let started_at =
835 if task.started_at.is_none() && new_is_timed {
836 Some(now)
837 } else {
838 task.started_at
839 };
840
841 let completed_at = if new_status == "completed" {
843 Some(now)
844 } else {
845 task.completed_at
846 };
847
848 let status_changed = task.status != new_status;
850 if status_changed {
851 record_state_transition(
852 &tx,
853 task_id,
854 &new_status,
855 new_owner.as_deref(),
856 reason.as_deref(),
857 states_config,
858 )?;
859 }
860
861 let phase_changed = task.phase != new_phase;
863 if phase_changed {
864 super::state_transitions::record_phase_transition(
865 &tx,
866 task_id,
867 new_phase.as_deref().unwrap_or(""),
868 Some(agent_id),
869 reason.as_deref(),
870 )?;
871 }
872
873 tx.execute(
874 "UPDATE tasks SET
875 title = ?1, description = ?2, status = ?3, phase = ?4, priority = ?5,
876 points = ?6, started_at = ?7, completed_at = ?8, updated_at = ?9,
877 tags = ?10, worker_id = ?11, claimed_at = ?12,
878 needed_tags = ?13, wanted_tags = ?14, time_estimate_ms = ?15
879 WHERE id = ?16",
880 params![
881 new_title,
882 new_description,
883 new_status,
884 new_phase,
885 new_priority.to_string(),
886 new_points,
887 started_at,
888 completed_at,
889 now,
890 serde_json::to_string(&new_tags)?,
891 new_owner,
892 new_claimed_at,
893 serde_json::to_string(&new_needed_tags)?,
894 serde_json::to_string(&new_wanted_tags)?,
895 new_time_estimate_ms,
896 task_id,
897 ],
898 )?;
899
900 if new_tags != task.tags {
902 sync_task_tags(&tx, task_id, &new_tags)?;
903 }
904 if new_needed_tags != task.needed_tags {
905 sync_needed_tags(&tx, task_id, &new_needed_tags)?;
906 }
907 if new_wanted_tags != task.wanted_tags {
908 sync_wanted_tags(&tx, task_id, &new_wanted_tags)?;
909 }
910
911 let (unblocked, auto_advanced) = if status_changed {
913 let was_blocking = states_config.is_blocking_state(&task.status);
914 let is_blocking = states_config.is_blocking_state(&new_status);
915
916 if was_blocking && !is_blocking {
917 super::deps::propagate_unblock_effects(
918 &tx,
919 task_id,
920 Some(agent_id),
921 states_config,
922 deps_config,
923 auto_advance,
924 )?
925 } else {
926 (vec![], vec![])
927 }
928 } else {
929 (vec![], vec![])
930 };
931
932 tx.commit()?;
933
934 Ok((Task {
935 id: task_id.to_string(),
936 title: new_title,
937 description: new_description,
938 status: new_status,
939 phase: new_phase,
940 priority: new_priority,
941 points: new_points,
942 tags: new_tags,
943 needed_tags: new_needed_tags,
944 wanted_tags: new_wanted_tags,
945 time_estimate_ms: new_time_estimate_ms,
946 started_at,
947 completed_at,
948 updated_at: now,
949 worker_id: new_owner,
950 claimed_at: new_claimed_at,
951 ..task
952 }, unblocked, auto_advanced))
953 })
954 }
955
956 pub fn delete_task(
964 &self,
965 task_id: &str,
966 worker_id: &str,
967 cascade: bool,
968 reason: Option<String>,
969 obliterate: bool,
970 force: bool,
971 ) -> Result<()> {
972 let now = now_ms();
973
974 self.with_conn_mut(|conn| {
975 let tx = conn.transaction()?;
976
977 let task = get_task_internal(&tx, task_id)?
979 .ok_or_else(|| anyhow!("Task not found"))?;
980
981 if let Some(ref owner) = task.worker_id
983 && owner != worker_id && !force {
984 return Err(anyhow!(
985 "Task is claimed by worker '{}'. Use force=true to override.",
986 owner
987 ));
988 }
989
990 if obliterate {
991 if cascade {
993 tx.execute(
996 "WITH RECURSIVE descendants AS (
997 SELECT ?1 AS id
998 UNION ALL
999 SELECT dep.to_task_id FROM dependencies dep
1000 INNER JOIN descendants d ON dep.from_task_id = d.id
1001 WHERE dep.dep_type = 'contains'
1002 )
1003 DELETE FROM tasks WHERE id IN (SELECT id FROM descendants)",
1004 params![task_id],
1005 )?;
1006 } else {
1007 let child_count: i32 = tx.query_row(
1009 "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
1010 params![task_id],
1011 |row| row.get(0),
1012 )?;
1013
1014 if child_count > 0 {
1015 return Err(anyhow!("Task has children; use cascade=true to delete"));
1016 }
1017
1018 tx.execute("DELETE FROM tasks WHERE id = ?1", params![task_id])?;
1019 }
1020 } else {
1021 if cascade {
1023 tx.execute(
1025 "WITH RECURSIVE descendants AS (
1026 SELECT ?1 AS id
1027 UNION ALL
1028 SELECT dep.to_task_id FROM dependencies dep
1029 INNER JOIN descendants d ON dep.from_task_id = d.id
1030 WHERE dep.dep_type = 'contains'
1031 )
1032 UPDATE tasks SET deleted_at = ?2, deleted_by = ?3, deleted_reason = ?4, updated_at = ?2
1033 WHERE id IN (SELECT id FROM descendants) AND deleted_at IS NULL",
1034 params![task_id, now, worker_id, reason],
1035 )?;
1036 } else {
1037 let child_count: i32 = tx.query_row(
1039 "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
1040 params![task_id],
1041 |row| row.get(0),
1042 )?;
1043
1044 if child_count > 0 {
1045 return Err(anyhow!("Task has children; use cascade=true to delete"));
1046 }
1047
1048 tx.execute(
1049 "UPDATE tasks SET deleted_at = ?1, deleted_by = ?2, deleted_reason = ?3, updated_at = ?1 WHERE id = ?4",
1050 params![now, worker_id, reason, task_id],
1051 )?;
1052 }
1053 }
1054
1055 tx.commit()?;
1056 Ok(())
1057 })
1058 }
1059
1060 pub fn list_tasks(
1063 &self,
1064 status: Option<&str>,
1065 phase: Option<&str>,
1066 owner: Option<&str>,
1067 parent_id: Option<Option<&str>>,
1068 limit: Option<i32>,
1069 offset: i32,
1070 sort_by: Option<&str>,
1071 sort_order: Option<&str>,
1072 ) -> Result<Vec<Task>> {
1073 self.with_conn(|conn| {
1074 let mut sql = String::from(
1075 "SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL",
1076 );
1077 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1078
1079 if let Some(s) = status {
1080 sql.push_str(" AND t.status = ?");
1081 params_vec.push(Box::new(s.to_string()));
1082 }
1083
1084 if let Some(p) = phase {
1085 sql.push_str(" AND t.phase = ?");
1086 params_vec.push(Box::new(p.to_string()));
1087 }
1088
1089 if let Some(o) = owner {
1090 sql.push_str(" AND t.worker_id = ?");
1091 params_vec.push(Box::new(o.to_string()));
1092 }
1093
1094 if let Some(p) = parent_id {
1096 match p {
1097 Some(pid) => {
1098 sql.push_str(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ? AND dep_type = 'contains')");
1099 params_vec.push(Box::new(pid.to_string()));
1100 }
1101 None => {
1102 sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
1104 }
1105 }
1106 }
1107
1108 let order_clause = build_order_clause(sort_by, sort_order);
1110 sql.push_str(&format!(" ORDER BY {}", order_clause));
1111
1112 if let Some(l) = limit {
1113 sql.push_str(&format!(" LIMIT {}", l));
1114 }
1115
1116 if offset > 0 {
1117 sql.push_str(&format!(" OFFSET {}", offset));
1118 }
1119
1120 let params_refs: Vec<&dyn rusqlite::ToSql> =
1121 params_vec.iter().map(|b| b.as_ref()).collect();
1122
1123 let mut stmt = conn.prepare(&sql)?;
1124 let tasks = stmt
1125 .query_map(params_refs.as_slice(), parse_task_row)?
1126 .filter_map(|r| r.ok())
1127 .collect();
1128
1129 Ok(tasks)
1130 })
1131 }
1132
1133 pub fn set_thought(
1135 &self,
1136 agent_id: &str,
1137 thought: Option<String>,
1138 task_ids: Option<Vec<String>>,
1139 ) -> Result<i32> {
1140 let now = now_ms();
1141
1142 self.with_conn(|conn| {
1143 let updated = if let Some(ids) = task_ids {
1144 let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
1145 let sql = format!(
1146 "UPDATE tasks SET current_thought = ?, updated_at = ?
1147 WHERE worker_id = ? AND id IN ({})",
1148 placeholders.join(", ")
1149 );
1150
1151 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1152 params_vec.push(Box::new(thought.clone()));
1153 params_vec.push(Box::new(now));
1154 params_vec.push(Box::new(agent_id.to_string()));
1155 for id in &ids {
1156 params_vec.push(Box::new(id.clone()));
1157 }
1158
1159 let params_refs: Vec<&dyn rusqlite::ToSql> =
1160 params_vec.iter().map(|b| b.as_ref()).collect();
1161 conn.execute(&sql, params_refs.as_slice())?
1162 } else {
1163 conn.execute(
1164 "UPDATE tasks SET current_thought = ?, updated_at = ? WHERE worker_id = ?",
1165 params![thought, now, agent_id],
1166 )?
1167 };
1168
1169 Ok(updated as i32)
1170 })
1171 }
1172
1173 pub fn log_time(&self, task_id: &str, duration_ms: i64) -> Result<i64> {
1175 let now = now_ms();
1176
1177 self.with_conn(|conn| {
1178 conn.execute(
1179 "UPDATE tasks SET time_actual_ms = COALESCE(time_actual_ms, 0) + ?1, updated_at = ?2
1180 WHERE id = ?3",
1181 params![duration_ms, now, task_id],
1182 )?;
1183
1184 let total: i64 = conn.query_row(
1185 "SELECT COALESCE(time_actual_ms, 0) FROM tasks WHERE id = ?1",
1186 params![task_id],
1187 |row| row.get(0),
1188 )?;
1189
1190 Ok(total)
1191 })
1192 }
1193
1194 pub fn log_metrics(
1197 &self,
1198 task_id: &str,
1199 cost_usd: Option<f64>,
1200 values: &[i64],
1201 ) -> Result<Task> {
1202 let now = now_ms();
1203
1204 self.with_conn(|conn| {
1205 let task =
1206 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1207
1208 let mut new_metrics = task.metrics;
1210 for (i, &val) in values.iter().take(8).enumerate() {
1211 new_metrics[i] += val;
1212 }
1213
1214 let new_cost_usd = task.cost_usd + cost_usd.unwrap_or(0.0);
1215
1216 conn.execute(
1217 "UPDATE tasks SET
1218 metric_0 = ?1, metric_1 = ?2, metric_2 = ?3, metric_3 = ?4,
1219 metric_4 = ?5, metric_5 = ?6, metric_6 = ?7, metric_7 = ?8,
1220 cost_usd = ?9, updated_at = ?10
1221 WHERE id = ?11",
1222 params![
1223 new_metrics[0],
1224 new_metrics[1],
1225 new_metrics[2],
1226 new_metrics[3],
1227 new_metrics[4],
1228 new_metrics[5],
1229 new_metrics[6],
1230 new_metrics[7],
1231 new_cost_usd,
1232 now,
1233 task_id,
1234 ],
1235 )?;
1236
1237 Ok(Task {
1238 cost_usd: new_cost_usd,
1239 metrics: new_metrics,
1240 updated_at: now,
1241 ..task
1242 })
1243 })
1244 }
1245
1246 pub fn claim_task(
1249 &self,
1250 task_id: &str,
1251 agent_id: &str,
1252 states_config: &StatesConfig,
1253 ) -> Result<Task> {
1254 let now = now_ms();
1255
1256 let claim_status = states_config
1258 .definitions
1259 .iter()
1260 .find(|(_, def)| def.timed)
1261 .map(|(name, _)| name.as_str())
1262 .unwrap_or("working");
1263
1264 self.with_conn(|conn| {
1265 let task =
1267 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1268
1269 if task.worker_id.is_some() {
1271 return Err(anyhow!("Task is already claimed"));
1272 }
1273
1274 if !states_config.is_valid_transition(&task.status, claim_status) {
1276 let exits = states_config.get_exits(&task.status);
1277 return Err(anyhow!(
1278 "Cannot claim task in state '{}'. Allowed transitions: {:?}",
1279 task.status,
1280 exits
1281 ));
1282 }
1283
1284 let agent =
1286 get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1287
1288 if !task.needed_tags.is_empty() {
1290 for needed in &task.needed_tags {
1291 if !agent.tags.contains(needed) {
1292 return Err(anyhow!("Agent missing required tag: {}", needed));
1293 }
1294 }
1295 }
1296
1297 if !task.wanted_tags.is_empty() {
1299 let has_any = task
1300 .wanted_tags
1301 .iter()
1302 .any(|wanted| agent.tags.contains(wanted));
1303 if !has_any {
1304 return Err(anyhow!("Agent has none of the wanted tags"));
1305 }
1306 }
1307
1308 conn.execute(
1309 "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = ?4, updated_at = ?5
1310 WHERE id = ?6",
1311 params![agent_id, now, claim_status, now, now, task_id,],
1312 )?;
1313
1314 record_state_transition(
1316 conn,
1317 task_id,
1318 claim_status,
1319 Some(agent_id),
1320 None,
1321 states_config,
1322 )?;
1323
1324 conn.execute(
1326 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1327 params![now, agent_id],
1328 )?;
1329
1330 Ok(Task {
1331 worker_id: Some(agent_id.to_string()),
1332 claimed_at: Some(now),
1333 status: claim_status.to_string(),
1334 started_at: Some(now),
1335 updated_at: now,
1336 ..task
1337 })
1338 })
1339 }
1340
1341 pub fn release_task(
1343 &self,
1344 task_id: &str,
1345 agent_id: &str,
1346 states_config: &StatesConfig,
1347 ) -> Result<()> {
1348 let now = now_ms();
1349 let release_status = &states_config.initial;
1350
1351 self.with_conn(|conn| {
1352 let task =
1353 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1354
1355 if task.worker_id.as_deref() != Some(agent_id) {
1356 return Err(anyhow!("Task is not owned by this agent"));
1357 }
1358
1359 record_state_transition(
1361 conn,
1362 task_id,
1363 release_status,
1364 Some(agent_id),
1365 None,
1366 states_config,
1367 )?;
1368
1369 conn.execute(
1370 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1371 WHERE id = ?3",
1372 params![release_status, now, task_id],
1373 )?;
1374
1375 Ok(())
1376 })
1377 }
1378
1379 pub fn force_release(&self, task_id: &str, states_config: &StatesConfig) -> Result<()> {
1381 let now = now_ms();
1382 let release_status = &states_config.initial;
1383
1384 self.with_conn(|conn| {
1385 let task =
1386 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1387
1388 record_state_transition(
1390 conn,
1391 task_id,
1392 release_status,
1393 task.worker_id.as_deref(),
1394 None,
1395 states_config,
1396 )?;
1397
1398 conn.execute(
1399 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1400 WHERE id = ?3",
1401 params![release_status, now, task_id],
1402 )?;
1403
1404 Ok(())
1405 })
1406 }
1407
1408 pub fn force_claim_task(
1410 &self,
1411 task_id: &str,
1412 agent_id: &str,
1413 states_config: &StatesConfig,
1414 ) -> Result<Task> {
1415 let now = now_ms();
1416
1417 let claim_status = states_config
1419 .definitions
1420 .iter()
1421 .find(|(_, def)| def.timed)
1422 .map(|(name, _)| name.as_str())
1423 .unwrap_or("working");
1424
1425 self.with_conn(|conn| {
1426 let task =
1428 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1429
1430 let agent =
1432 get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1433
1434 if !task.needed_tags.is_empty() {
1436 for needed in &task.needed_tags {
1437 if !agent.tags.contains(needed) {
1438 return Err(anyhow!("Agent missing required tag: {}", needed));
1439 }
1440 }
1441 }
1442
1443 if !task.wanted_tags.is_empty() {
1445 let has_any = task
1446 .wanted_tags
1447 .iter()
1448 .any(|wanted| agent.tags.contains(wanted));
1449 if !has_any {
1450 return Err(anyhow!("Agent has none of the wanted tags"));
1451 }
1452 }
1453
1454 conn.execute(
1455 "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = COALESCE(started_at, ?4), updated_at = ?5
1456 WHERE id = ?6",
1457 params![agent_id, now, claim_status, now, now, task_id,],
1458 )?;
1459
1460 record_state_transition(
1462 conn,
1463 task_id,
1464 claim_status,
1465 Some(agent_id),
1466 None,
1467 states_config,
1468 )?;
1469
1470 conn.execute(
1472 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1473 params![now, agent_id],
1474 )?;
1475
1476 Ok(Task {
1477 worker_id: Some(agent_id.to_string()),
1478 claimed_at: Some(now),
1479 status: claim_status.to_string(),
1480 started_at: task.started_at.or(Some(now)),
1481 updated_at: now,
1482 ..task
1483 })
1484 })
1485 }
1486
1487 pub fn release_task_with_state(
1489 &self,
1490 task_id: &str,
1491 agent_id: &str,
1492 state: &str,
1493 states_config: &StatesConfig,
1494 ) -> Result<()> {
1495 let now = now_ms();
1496
1497 self.with_conn(|conn| {
1498 let task =
1499 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1500
1501 if task.worker_id.as_deref() != Some(agent_id) {
1502 return Err(anyhow!("Task is not owned by this agent"));
1503 }
1504
1505 if !states_config.is_valid_state(state) {
1507 return Err(anyhow!(
1508 "Invalid state '{}'. Valid states: {:?}",
1509 state,
1510 states_config.state_names()
1511 ));
1512 }
1513
1514 if !states_config.is_valid_transition(&task.status, state) {
1516 let exits = states_config.get_exits(&task.status);
1517 return Err(anyhow!(
1518 "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
1519 task.status,
1520 state,
1521 exits
1522 ));
1523 }
1524
1525 let completed_at = if state == "completed" {
1527 Some(now)
1528 } else {
1529 None
1530 };
1531
1532 record_state_transition(conn, task_id, state, Some(agent_id), None, states_config)?;
1534
1535 conn.execute(
1536 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, completed_at = COALESCE(?2, completed_at), updated_at = ?3
1537 WHERE id = ?4",
1538 params![state, completed_at, now, task_id],
1539 )?;
1540
1541 Ok(())
1542 })
1543 }
1544
1545 pub fn force_release_stale(
1547 &self,
1548 timeout_seconds: i64,
1549 states_config: &StatesConfig,
1550 ) -> Result<i32> {
1551 let now = now_ms();
1552 let cutoff = now - (timeout_seconds * 1000);
1553 let release_status = &states_config.initial;
1554
1555 self.with_conn(|conn| {
1556 let updated = conn.execute(
1557 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1558 WHERE claimed_at < ?3 AND worker_id IS NOT NULL",
1559 params![release_status, now, cutoff],
1560 )?;
1561
1562 Ok(updated as i32)
1563 })
1564 }
1565
1566 pub fn complete_task(
1570 &self,
1571 task_id: &str,
1572 agent_id: &str,
1573 states_config: &StatesConfig,
1574 ) -> Result<Task> {
1575 let now = now_ms();
1576
1577 let complete_status = if states_config.definitions.contains_key("completed") {
1579 "completed"
1580 } else {
1581 states_config
1583 .definitions
1584 .iter()
1585 .find(|(_, def)| def.exits.is_empty())
1586 .map(|(name, _)| name.as_str())
1587 .unwrap_or("completed")
1588 };
1589
1590 self.with_conn_mut(|conn| {
1591 let tx = conn.transaction()?;
1592
1593 let mut stmt = tx.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1595 let task = stmt
1596 .query_row(params![task_id], parse_task_row)
1597 .map_err(|_| anyhow!("Task not found"))?;
1598 drop(stmt);
1599
1600 if task.worker_id.as_deref() != Some(agent_id) {
1602 return Err(anyhow!("Task is not owned by this agent"));
1603 }
1604
1605 let incomplete_children: i32 = tx.query_row(
1607 "SELECT COUNT(*) FROM dependencies d
1608 INNER JOIN tasks child ON d.to_task_id = child.id
1609 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
1610 AND child.status IN (SELECT value FROM json_each(?2))",
1611 params![
1612 task_id,
1613 serde_json::to_string(&states_config.blocking_states)?
1614 ],
1615 |row| row.get(0),
1616 )?;
1617
1618 if incomplete_children > 0 {
1619 return Err(anyhow!(
1620 "Cannot complete task: {} child task(s) are not complete",
1621 incomplete_children
1622 ));
1623 }
1624
1625 if !states_config.is_valid_transition(&task.status, complete_status) {
1627 let exits = states_config.get_exits(&task.status);
1628 return Err(anyhow!(
1629 "Cannot complete task in state '{}'. Allowed transitions: {:?}",
1630 task.status,
1631 exits
1632 ));
1633 }
1634
1635 record_state_transition(
1637 &tx,
1638 task_id,
1639 complete_status,
1640 Some(agent_id),
1641 None,
1642 states_config,
1643 )?;
1644
1645 tx.execute(
1647 "UPDATE tasks SET status = ?1, completed_at = ?2, updated_at = ?3,
1648 worker_id = NULL, claimed_at = NULL
1649 WHERE id = ?4",
1650 params![complete_status, now, now, task_id],
1651 )?;
1652
1653 tx.execute(
1655 "DELETE FROM file_locks WHERE task_id = ?1",
1656 params![task_id],
1657 )?;
1658
1659 tx.execute(
1661 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1662 params![now, agent_id],
1663 )?;
1664
1665 tx.commit()?;
1666
1667 Ok(Task {
1668 status: complete_status.to_string(),
1669 completed_at: Some(now),
1670 updated_at: now,
1671 worker_id: None,
1672 claimed_at: None,
1673 ..task
1674 })
1675 })
1676 }
1677
1678 pub fn get_all_tasks(&self) -> Result<Vec<Task>> {
1680 self.with_conn(|conn| {
1681 let mut stmt =
1682 conn.prepare("SELECT * FROM tasks WHERE deleted_at IS NULL ORDER BY created_at")?;
1683 let tasks = stmt
1684 .query_map([], parse_task_row)?
1685 .filter_map(|r| r.ok())
1686 .collect();
1687 Ok(tasks)
1688 })
1689 }
1690
1691 #[allow(dead_code)]
1693 pub fn get_tasks_by_status(&self, status: &str) -> Result<Vec<Task>> {
1694 self.with_conn(|conn| {
1695 let mut stmt =
1696 conn.prepare("SELECT * FROM tasks WHERE status = ?1 ORDER BY created_at")?;
1697 let tasks = stmt
1698 .query_map(params![status], parse_task_row)?
1699 .filter_map(|r| r.ok())
1700 .collect();
1701 Ok(tasks)
1702 })
1703 }
1704
1705 pub fn get_claimed_tasks(&self, agent_id: Option<&str>) -> Result<Vec<Task>> {
1707 self.with_conn(|conn| {
1708 let tasks = if let Some(aid) = agent_id {
1709 let mut stmt = conn
1710 .prepare("SELECT * FROM tasks WHERE worker_id = ?1 AND deleted_at IS NULL ORDER BY claimed_at")?;
1711 stmt.query_map(params![aid], parse_task_row)?
1712 .filter_map(|r| r.ok())
1713 .collect()
1714 } else {
1715 let mut stmt = conn.prepare(
1716 "SELECT * FROM tasks WHERE worker_id IS NOT NULL AND deleted_at IS NULL ORDER BY claimed_at",
1717 )?;
1718 stmt.query_map([], parse_task_row)?
1719 .filter_map(|r| r.ok())
1720 .collect()
1721 };
1722
1723 Ok(tasks)
1724 })
1725 }
1726}
1727
1728#[allow(clippy::too_many_arguments)]
1733fn create_tree_recursive(
1734 conn: &Connection,
1735 input: &TaskTreeInput,
1736 parent_id: Option<&str>,
1737 prev_sibling_id: Option<&str>,
1738 child_type: Option<&str>,
1739 sibling_type: Option<&str>,
1740 all_ids: &mut Vec<String>,
1741 phase_warnings: &mut Vec<String>,
1742 tag_warnings: &mut Vec<String>,
1743 states_config: &StatesConfig,
1744 phases_config: &PhasesConfig,
1745 tags_config: &TagsConfig,
1746 ids_config: &IdsConfig,
1747) -> Result<String> {
1748 let task_id = if let Some(ref ref_id) = input.ref_id {
1750 let exists: bool = conn.query_row(
1752 "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
1753 params![ref_id],
1754 |row| row.get(0),
1755 )?;
1756 if !exists {
1757 return Err(anyhow::anyhow!("Referenced task '{}' not found", ref_id));
1758 }
1759 ref_id.clone()
1760 } else {
1761 let task_id = input
1763 .id
1764 .clone()
1765 .unwrap_or_else(|| generate_task_id(ids_config));
1766 let now = now_ms();
1767 let priority = clamp_priority(input.priority.unwrap_or(PRIORITY_DEFAULT));
1768 let initial_status = &states_config.initial;
1769
1770 if let Some(ref phase) = input.phase
1772 && let Some(warning) = phases_config.check_phase(phase)?
1773 {
1774 phase_warnings.push(format!("Task '{}': {}", task_id, warning));
1775 }
1776
1777 let needed_tags = input.needed_tags.clone().unwrap_or_default();
1778 let wanted_tags = input.wanted_tags.clone().unwrap_or_default();
1779 let tags = input.tags.clone().unwrap_or_default();
1780
1781 for warning in tags_config.validate_tags(&tags)? {
1783 tag_warnings.push(format!("Task '{}': {}", task_id, warning));
1784 }
1785 for warning in tags_config.validate_tags(&needed_tags)? {
1786 tag_warnings.push(format!("Task '{}' needed_tags: {}", task_id, warning));
1787 }
1788 for warning in tags_config.validate_tags(&wanted_tags)? {
1789 tag_warnings.push(format!("Task '{}' wanted_tags: {}", task_id, warning));
1790 }
1791
1792 let needed_tags_json = serde_json::to_string(&needed_tags)?;
1793 let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
1794 let tags_json = serde_json::to_string(&tags)?;
1795
1796 conn.execute(
1797 "INSERT INTO tasks (
1798 id, title, description, status, phase, priority,
1799 needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
1800 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1801 params![
1802 &task_id,
1803 &input.title,
1804 &input.description,
1805 initial_status,
1806 &input.phase,
1807 priority.to_string(),
1808 needed_tags_json,
1809 wanted_tags_json,
1810 tags_json,
1811 input.points,
1812 input.time_estimate_ms,
1813 now,
1814 now,
1815 ],
1816 )?;
1817
1818 record_state_transition(conn, &task_id, initial_status, None, None, states_config)?;
1820
1821 sync_task_tags(conn, &task_id, &tags)?;
1823 sync_needed_tags(conn, &task_id, &needed_tags)?;
1824 sync_wanted_tags(conn, &task_id, &wanted_tags)?;
1825
1826 task_id
1827 };
1828
1829 if let (Some(pid), Some(ct)) = (parent_id, child_type) {
1831 Database::add_dependency_internal(conn, pid, &task_id, ct)?;
1832 }
1833
1834 if let (Some(prev_id), Some(st)) = (prev_sibling_id, sibling_type) {
1836 Database::add_dependency_internal(conn, prev_id, &task_id, st)?;
1837 }
1838
1839 all_ids.push(task_id.clone());
1840
1841 let mut prev_child_id: Option<String> = None;
1843 for child in input.children.iter() {
1844 let child_id = create_tree_recursive(
1845 conn,
1846 child,
1847 Some(&task_id),
1848 prev_child_id.as_deref(),
1849 child_type,
1850 sibling_type,
1851 all_ids,
1852 phase_warnings,
1853 tag_warnings,
1854 states_config,
1855 phases_config,
1856 tags_config,
1857 ids_config,
1858 )?;
1859 prev_child_id = Some(child_id);
1860 }
1861
1862 Ok(task_id)
1863}