1use super::state_transitions::record_state_transition;
4use super::{Database, now_ms};
5use crate::config::{
6 AutoAdvanceConfig, DependenciesConfig, IdsConfig, PhasesConfig, StatesConfig, TagsConfig,
7};
8use crate::types::{
9 PRIORITY_DEFAULT, Priority, Task, TaskTree, TaskTreeInput, Worker, clamp_priority,
10 parse_priority,
11};
12use anyhow::{Result, anyhow};
13use petname::{Generator, Petnames};
14use rusqlite::{Connection, Row, params};
15
16fn generate_task_id(ids_config: &IdsConfig) -> String {
19 let words = ids_config.task_id_words;
20 let case = ids_config.id_case;
21
22 let base = Petnames::large()
24 .generate_one(words, "-")
25 .unwrap_or_else(|| format!("task-{}", super::now_ms()));
26
27 case.convert(&base)
29}
30
31fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
34 let field = match sort_by {
35 Some("priority") => "CAST(t.priority AS INTEGER)",
36 Some("created_at") => "t.created_at",
37 Some("updated_at") => "t.updated_at",
38 _ => "t.created_at", };
40
41 let order = match sort_order {
42 Some("asc") => "ASC",
43 Some("desc") => "DESC",
44 _ => {
45 "DESC"
47 }
48 };
49
50 format!("{} {}", field, order)
51}
52
53fn sync_task_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
60 conn.execute("DELETE FROM task_tags WHERE task_id = ?1", params![task_id])?;
61 for tag in tags {
62 conn.execute(
63 "INSERT INTO task_tags (task_id, tag) VALUES (?1, ?2)",
64 params![task_id, tag],
65 )?;
66 }
67 Ok(())
68}
69
70fn sync_needed_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
72 conn.execute(
73 "DELETE FROM task_needed_tags WHERE task_id = ?1",
74 params![task_id],
75 )?;
76 for tag in tags {
77 conn.execute(
78 "INSERT INTO task_needed_tags (task_id, tag) VALUES (?1, ?2)",
79 params![task_id, tag],
80 )?;
81 }
82 Ok(())
83}
84
85fn sync_wanted_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
87 conn.execute(
88 "DELETE FROM task_wanted_tags WHERE task_id = ?1",
89 params![task_id],
90 )?;
91 for tag in tags {
92 conn.execute(
93 "INSERT INTO task_wanted_tags (task_id, tag) VALUES (?1, ?2)",
94 params![task_id, tag],
95 )?;
96 }
97 Ok(())
98}
99
100pub fn parse_task_row(row: &Row) -> rusqlite::Result<Task> {
101 let id: String = row.get("id")?;
102 let title: String = row.get("title")?;
103 let description: Option<String> = row.get("description")?;
104 let status: String = row.get("status")?;
105 let phase: Option<String> = row.get("phase")?;
106 let priority: String = row.get("priority")?;
107 let worker_id: Option<String> = row.get("worker_id")?;
108 let claimed_at: Option<i64> = row.get("claimed_at")?;
109
110 let needed_tags_json: Option<String> = row.get("needed_tags")?;
111 let wanted_tags_json: Option<String> = row.get("wanted_tags")?;
112 let tags_json: Option<String> = row.get("tags")?;
113
114 let points: Option<i32> = row.get("points")?;
115 let time_estimate_ms: Option<i64> = row.get("time_estimate_ms")?;
116 let time_actual_ms: Option<i64> = row.get("time_actual_ms")?;
117 let started_at: Option<i64> = row.get("started_at")?;
118 let completed_at: Option<i64> = row.get("completed_at")?;
119
120 let current_thought: Option<String> = row.get("current_thought")?;
121
122 let cost_usd: f64 = row.get("cost_usd")?;
123 let metric_0: i64 = row.get("metric_0")?;
124 let metric_1: i64 = row.get("metric_1")?;
125 let metric_2: i64 = row.get("metric_2")?;
126 let metric_3: i64 = row.get("metric_3")?;
127 let metric_4: i64 = row.get("metric_4")?;
128 let metric_5: i64 = row.get("metric_5")?;
129 let metric_6: i64 = row.get("metric_6")?;
130 let metric_7: i64 = row.get("metric_7")?;
131
132 let created_at: i64 = row.get("created_at")?;
133 let updated_at: i64 = row.get("updated_at")?;
134
135 Ok(Task {
136 id,
137 title,
138 description,
139 status,
140 phase,
141 priority: parse_priority(&priority),
142 worker_id,
143 claimed_at,
144 needed_tags: needed_tags_json
145 .map(|s| serde_json::from_str(&s).unwrap_or_default())
146 .unwrap_or_default(),
147 wanted_tags: wanted_tags_json
148 .map(|s| serde_json::from_str(&s).unwrap_or_default())
149 .unwrap_or_default(),
150 tags: tags_json
151 .map(|s| serde_json::from_str(&s).unwrap_or_default())
152 .unwrap_or_default(),
153 points,
154 time_estimate_ms,
155 time_actual_ms,
156 started_at,
157 completed_at,
158 current_thought,
159 cost_usd,
160 metrics: [
161 metric_0, metric_1, metric_2, metric_3, metric_4, metric_5, metric_6, metric_7,
162 ],
163 created_at,
164 updated_at,
165 })
166}
167
168fn get_task_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
170 let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
171
172 let result = stmt.query_row(params![task_id], parse_task_row);
173
174 match result {
175 Ok(task) => Ok(Some(task)),
176 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
177 Err(e) => Err(e.into()),
178 }
179}
180
181fn get_worker_internal(conn: &Connection, worker_id: &str) -> Result<Option<Worker>> {
183 let mut stmt = conn.prepare(
184 "SELECT id, tags, max_claims, registered_at, last_heartbeat, last_status, last_phase, workflow
185 FROM workers WHERE id = ?1",
186 )?;
187
188 let result = stmt.query_row(params![worker_id], |row| {
189 let id: String = row.get(0)?;
190 let tags_json: String = row.get(1)?;
191 let max_claims: i32 = row.get(2)?;
192 let registered_at: i64 = row.get(3)?;
193 let last_heartbeat: i64 = row.get(4)?;
194 let last_status: Option<String> = row.get(5)?;
195 let last_phase: Option<String> = row.get(6)?;
196 let workflow: Option<String> = row.get(7)?;
197
198 Ok((
199 id,
200 tags_json,
201 max_claims,
202 registered_at,
203 last_heartbeat,
204 last_status,
205 last_phase,
206 workflow,
207 ))
208 });
209
210 match result {
211 Ok((
212 id,
213 tags_json,
214 max_claims,
215 registered_at,
216 last_heartbeat,
217 last_status,
218 last_phase,
219 workflow,
220 )) => {
221 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
222 Ok(Some(Worker {
223 id,
224 tags,
225 max_claims,
226 registered_at,
227 last_heartbeat,
228 last_status,
229 last_phase,
230 workflow,
231 }))
232 }
233 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
234 Err(e) => Err(e.into()),
235 }
236}
237
238impl Database {
239 #[allow(clippy::too_many_arguments)]
243 pub fn create_task(
244 &self,
245 id: Option<String>,
246 description: String,
247 parent_id: Option<String>,
248 phase: Option<String>,
249 priority: Option<Priority>,
250 points: Option<i32>,
251 time_estimate_ms: Option<i64>,
252 agent_tags_all: Option<Vec<String>>,
253 agent_tags_any: Option<Vec<String>>,
254 tags: Option<Vec<String>>,
255 states_config: &StatesConfig,
256 ids_config: &IdsConfig,
257 ) -> Result<Task> {
258 let task_id = id.unwrap_or_else(|| generate_task_id(ids_config));
259 let now = now_ms();
260 let priority = clamp_priority(priority.unwrap_or(PRIORITY_DEFAULT));
261 let initial_status = &states_config.initial;
262
263 let needed_tags = agent_tags_all.unwrap_or_default();
264 let wanted_tags = agent_tags_any.unwrap_or_default();
265 let tags = tags.unwrap_or_default();
266 let needed_tags_json = serde_json::to_string(&needed_tags)?;
267 let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
268 let tags_json = serde_json::to_string(&tags)?;
269
270 self.with_conn_mut(|conn| {
271 let tx = conn.transaction()?;
272
273 tx.execute(
274 "INSERT INTO tasks (
275 id, title, description, status, phase, priority,
276 needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
277 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
278 params![
279 &task_id,
280 &description, &description, initial_status,
283 &phase,
284 priority.to_string(),
285 needed_tags_json,
286 wanted_tags_json,
287 tags_json,
288 points,
289 time_estimate_ms,
290 now,
291 now,
292 ],
293 )?;
294
295 sync_task_tags(&tx, &task_id, &tags)?;
297 sync_needed_tags(&tx, &task_id, &needed_tags)?;
298 sync_wanted_tags(&tx, &task_id, &wanted_tags)?;
299
300 if let Some(ref pid) = parent_id {
302 Database::add_dependency_internal(&tx, pid, &task_id, "contains")?;
303 }
304
305 record_state_transition(&tx, &task_id, initial_status, None, None, states_config)?;
307
308 tx.commit()?;
309
310 Ok(Task {
311 id: task_id,
312 title: description.clone(),
313 description: Some(description),
314 status: initial_status.clone(),
315 phase,
316 priority,
317 worker_id: None,
318 claimed_at: None,
319 needed_tags,
320 wanted_tags,
321 tags,
322 points,
323 time_estimate_ms,
324 time_actual_ms: None,
325 started_at: None,
326 completed_at: None,
327 current_thought: None,
328 cost_usd: 0.0,
329 metrics: [0; 8],
330 created_at: now,
331 updated_at: now,
332 })
333 })
334 }
335
336 pub fn create_task_simple(
339 &self,
340 description: impl Into<String>,
341 states_config: &StatesConfig,
342 ids_config: &IdsConfig,
343 ) -> Result<Task> {
344 self.create_task(
345 None,
346 description.into(),
347 None,
348 None,
349 None,
350 None,
351 None,
352 None,
353 None,
354 None,
355 states_config,
356 ids_config,
357 )
358 }
359
360 pub fn create_task_tree(
364 &self,
365 input: TaskTreeInput,
366 parent_id: Option<String>,
367 child_type: Option<String>,
368 sibling_type: Option<String>,
369 states_config: &StatesConfig,
370 phases_config: &PhasesConfig,
371 tags_config: &TagsConfig,
372 ids_config: &IdsConfig,
373 ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>)> {
374 let mut all_ids = Vec::new();
375 let mut phase_warnings = Vec::new();
376 let mut tag_warnings = Vec::new();
377 let child_type = child_type.or_else(|| Some("contains".to_string()));
379
380 self.with_conn_mut(|conn| {
381 let tx = conn.transaction()?;
382 let root_id = create_tree_recursive(
383 &tx,
384 &input,
385 parent_id.as_deref(),
386 None, child_type.as_deref(),
388 sibling_type.as_deref(),
389 &mut all_ids,
390 &mut phase_warnings,
391 &mut tag_warnings,
392 states_config,
393 phases_config,
394 tags_config,
395 ids_config,
396 )?;
397 tx.commit()?;
398 Ok((root_id, all_ids, phase_warnings, tag_warnings))
399 })
400 }
401
402 pub fn get_task(&self, task_id: &str) -> Result<Option<Task>> {
404 self.with_conn(|conn| {
405 let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
406
407 let result = stmt.query_row(params![task_id], parse_task_row);
408
409 match result {
410 Ok(task) => Ok(Some(task)),
411 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
412 Err(e) => Err(e.into()),
413 }
414 })
415 }
416
417 pub fn get_task_tree(&self, task_id: &str) -> Result<Option<TaskTree>> {
419 let task = self.get_task(task_id)?;
420 match task {
421 None => Ok(None),
422 Some(task) => {
423 let children = self.get_children_recursive(&task.id)?;
424 Ok(Some(TaskTree { task, children }))
425 }
426 }
427 }
428
429 fn get_children_recursive(&self, parent_id: &str) -> Result<Vec<TaskTree>> {
431 let children = self.get_children(parent_id)?;
432 let mut result = Vec::new();
433
434 for child in children {
435 let child_children = self.get_children_recursive(&child.id)?;
436 result.push(TaskTree {
437 task: child,
438 children: child_children,
439 });
440 }
441
442 Ok(result)
443 }
444
445 pub fn get_children(&self, parent_id: &str) -> Result<Vec<Task>> {
447 self.with_conn(|conn| {
448 let mut stmt = conn.prepare(
449 "SELECT t.* FROM tasks t
450 INNER JOIN dependencies d ON t.id = d.to_task_id
451 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
452 ORDER BY t.created_at",
453 )?;
454
455 let tasks = stmt
456 .query_map(params![parent_id], parse_task_row)?
457 .filter_map(|r| r.ok())
458 .collect();
459
460 Ok(tasks)
461 })
462 }
463
464 #[allow(clippy::too_many_arguments)]
466 pub fn update_task(
467 &self,
468 task_id: &str,
469 title: Option<String>,
470 description: Option<Option<String>>,
471 status: Option<String>,
472 priority: Option<Priority>,
473 points: Option<Option<i32>>,
474 tags: Option<Vec<String>>,
475 states_config: &StatesConfig,
476 ) -> Result<Task> {
477 let now = now_ms();
478
479 self.with_conn(|conn| {
480 let task =
481 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
482
483 let new_title = title.unwrap_or(task.title.clone());
484 let new_description = description.unwrap_or(task.description.clone());
485 let new_status = status.unwrap_or(task.status.clone());
486 let new_priority = priority.unwrap_or(task.priority);
487 let new_points = points.unwrap_or(task.points);
488 let new_tags = tags.unwrap_or(task.tags.clone());
489
490 if !states_config.is_valid_state(&new_status) {
492 return Err(anyhow!(
493 "Invalid state '{}'. Valid states: {:?}",
494 new_status,
495 states_config.state_names()
496 ));
497 }
498
499 if task.status != new_status
501 && !states_config.is_valid_transition(&task.status, &new_status)
502 {
503 let exits = states_config.get_exits(&task.status);
504 return Err(anyhow!(
505 "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
506 task.status,
507 new_status,
508 exits
509 ));
510 }
511
512 let started_at =
515 if task.started_at.is_none() && states_config.is_timed_state(&new_status) {
516 Some(now)
517 } else {
518 task.started_at
519 };
520
521 let completed_at = if new_status == "completed" {
523 Some(now)
524 } else {
525 task.completed_at
526 };
527
528 if task.status != new_status {
530 record_state_transition(
531 conn,
532 task_id,
533 &new_status,
534 task.worker_id.as_deref(),
535 None,
536 states_config,
537 )?;
538 }
539
540 conn.execute(
541 "UPDATE tasks SET
542 title = ?1, description = ?2, status = ?3, priority = ?4,
543 points = ?5, started_at = ?6, completed_at = ?7, updated_at = ?8,
544 tags = ?9
545 WHERE id = ?10",
546 params![
547 new_title,
548 new_description,
549 new_status,
550 new_priority.to_string(),
551 new_points,
552 started_at,
553 completed_at,
554 now,
555 serde_json::to_string(&new_tags)?,
556 task_id,
557 ],
558 )?;
559
560 Ok(Task {
561 id: task_id.to_string(),
562 title: new_title,
563 description: new_description,
564 status: new_status,
565 priority: new_priority,
566 points: new_points,
567 tags: new_tags,
568 started_at,
569 completed_at,
570 updated_at: now,
571 ..task
572 })
573 })
574 }
575
576 #[allow(clippy::too_many_arguments)]
588 pub fn update_task_unified(
589 &self,
590 task_id: &str,
591 agent_id: &str,
592 assignee: Option<&str>,
593 title: Option<String>,
594 description: Option<Option<String>>,
595 status: Option<String>,
596 phase: Option<String>,
597 priority: Option<Priority>,
598 points: Option<Option<i32>>,
599 tags: Option<Vec<String>>,
600 needed_tags: Option<Vec<String>>,
601 wanted_tags: Option<Vec<String>>,
602 time_estimate_ms: Option<i64>,
603 reason: Option<String>,
604 force: bool,
605 states_config: &StatesConfig,
606 deps_config: &DependenciesConfig,
607 auto_advance: &AutoAdvanceConfig,
608 ) -> Result<(Task, Vec<String>, Vec<String>)> {
609 let now = now_ms();
610
611 self.with_conn_mut(|conn| {
612 let tx = conn.transaction()?;
613
614 let task =
615 get_task_internal(&tx, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
616
617 if let Some(ref current_owner) = task.worker_id
619 && current_owner != agent_id && !force {
620 return Err(anyhow!(
621 "Task is claimed by agent '{}'. Only the owner can update claimed tasks (use force=true to override)",
622 current_owner
623 ));
624 }
625
626 let new_title = title.unwrap_or(task.title.clone());
627 let new_description = description.unwrap_or(task.description.clone());
628 let new_status = if assignee.is_some() && status.is_none() {
630 "assigned".to_string()
631 } else {
632 status.unwrap_or(task.status.clone())
633 };
634 let new_priority = priority.unwrap_or(task.priority);
635 let new_points = points.unwrap_or(task.points);
636 let new_tags = tags.unwrap_or(task.tags.clone());
637 let new_needed_tags = needed_tags.unwrap_or(task.needed_tags.clone());
638 let new_wanted_tags = wanted_tags.unwrap_or(task.wanted_tags.clone());
639 let new_time_estimate_ms = time_estimate_ms.or(task.time_estimate_ms);
640 let new_phase = phase.or(task.phase.clone());
641
642 if !states_config.is_valid_state(&new_status) {
644 return Err(anyhow!(
645 "Invalid state '{}'. Valid states: {:?}",
646 new_status,
647 states_config.state_names()
648 ));
649 }
650
651 if task.status != new_status
653 && !states_config.is_valid_transition(&task.status, &new_status) {
654 let exits = states_config.get_exits(&task.status);
655 return Err(anyhow!(
656 "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
657 task.status,
658 new_status,
659 exits
660 ));
661 }
662
663 let new_is_timed = states_config.is_timed_state(&new_status);
665 let new_is_terminal = states_config.is_terminal_state(&new_status);
666 let current_owner = task.worker_id.as_deref();
667 let is_owned_by_agent = current_owner == Some(agent_id);
668 let is_owned_by_other = current_owner.is_some() && !is_owned_by_agent;
669
670 let mut new_owner: Option<String> = task.worker_id.clone();
671 let mut new_claimed_at: Option<i64> = task.claimed_at;
672
673 if let Some(target_agent) = assignee {
676 if is_owned_by_other && !force {
678 return Err(anyhow!(
679 "Task is already claimed by agent '{}'. Use force=true to reassign.",
680 current_owner.unwrap()
681 ));
682 }
683
684 let target = get_worker_internal(&tx, target_agent)?
686 .ok_or_else(|| anyhow!("Assignee agent '{}' not found", target_agent))?;
687
688 if !task.needed_tags.is_empty() {
690 for needed in &task.needed_tags {
691 if !target.tags.contains(needed) {
692 return Err(anyhow!(
693 "Assignee '{}' missing required tag: {}",
694 target_agent,
695 needed
696 ));
697 }
698 }
699 }
700
701 if !task.wanted_tags.is_empty() {
702 let has_any = task
703 .wanted_tags
704 .iter()
705 .any(|wanted| target.tags.contains(wanted));
706 if !has_any {
707 return Err(anyhow!(
708 "Assignee '{}' has none of the wanted tags: {:?}",
709 target_agent,
710 task.wanted_tags
711 ));
712 }
713 }
714
715 new_owner = Some(target_agent.to_string());
717 new_claimed_at = Some(now);
718 }
719
720 if new_is_timed && !is_owned_by_agent {
723 if is_owned_by_other && !force {
725 return Err(anyhow!(
726 "Task is already claimed by agent '{}'",
727 current_owner.unwrap()
728 ));
729 }
730
731 if !force {
733 let unsatisfied_blockers = super::deps::get_unsatisfied_start_blockers_in_tx(
734 &tx,
735 task_id,
736 states_config,
737 deps_config,
738 )?;
739 if !unsatisfied_blockers.is_empty() {
740 return Err(anyhow!(
741 "Task has unsatisfied dependencies: [{}]",
742 unsatisfied_blockers.join(", ")
743 ));
744 }
745 }
746
747 let agent = get_worker_internal(&tx, agent_id)?
749 .ok_or_else(|| anyhow!("Agent not found"))?;
750
751 if !task.needed_tags.is_empty() {
753 for needed in &task.needed_tags {
754 if !agent.tags.contains(needed) {
755 return Err(anyhow!("Agent missing required tag: {}", needed));
756 }
757 }
758 }
759
760 if !task.wanted_tags.is_empty() {
762 let has_any = task
763 .wanted_tags
764 .iter()
765 .any(|wanted| agent.tags.contains(wanted));
766 if !has_any {
767 return Err(anyhow!("Agent has none of the wanted tags"));
768 }
769 }
770
771 new_owner = Some(agent_id.to_string());
773 new_claimed_at = Some(now);
774
775 tx.execute(
777 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
778 params![now, agent_id],
779 )?;
780 }
781
782 if !new_is_timed && !new_is_terminal && task.worker_id.is_some() {
784 if is_owned_by_other && !force {
786 return Err(anyhow!("Task is not owned by this agent"));
787 }
788
789 new_owner = None;
791 new_claimed_at = None;
792 }
793
794 if new_is_terminal {
796 if let Some(ref current_owner) = task.worker_id
798 && current_owner != agent_id && !force {
799 return Err(anyhow!("Task is not owned by this agent"));
800 }
801
802 let incomplete_children: i32 = tx.query_row(
804 "SELECT COUNT(*) FROM dependencies d
805 INNER JOIN tasks child ON d.to_task_id = child.id
806 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
807 AND child.status IN (SELECT value FROM json_each(?2))",
808 params![
809 task_id,
810 serde_json::to_string(&states_config.blocking_states)?
811 ],
812 |row| row.get(0),
813 )?;
814
815 if incomplete_children > 0 {
816 return Err(anyhow!(
817 "Cannot complete task: {} child task(s) are not complete",
818 incomplete_children
819 ));
820 }
821
822 new_owner = None;
824 new_claimed_at = None;
825
826 tx.execute(
828 "DELETE FROM file_locks WHERE task_id = ?1",
829 params![task_id],
830 )?;
831 }
832
833 let started_at =
835 if task.started_at.is_none() && new_is_timed {
836 Some(now)
837 } else {
838 task.started_at
839 };
840
841 let completed_at = if new_status == "completed" {
843 Some(now)
844 } else {
845 task.completed_at
846 };
847
848 let status_changed = task.status != new_status;
850 if status_changed {
851 record_state_transition(
852 &tx,
853 task_id,
854 &new_status,
855 new_owner.as_deref(),
856 reason.as_deref(),
857 states_config,
858 )?;
859 }
860
861 let phase_changed = task.phase != new_phase;
863 if phase_changed {
864 super::state_transitions::record_phase_transition(
865 &tx,
866 task_id,
867 new_phase.as_deref().unwrap_or(""),
868 Some(agent_id),
869 reason.as_deref(),
870 )?;
871 }
872
873 tx.execute(
874 "UPDATE tasks SET
875 title = ?1, description = ?2, status = ?3, phase = ?4, priority = ?5,
876 points = ?6, started_at = ?7, completed_at = ?8, updated_at = ?9,
877 tags = ?10, worker_id = ?11, claimed_at = ?12,
878 needed_tags = ?13, wanted_tags = ?14, time_estimate_ms = ?15
879 WHERE id = ?16",
880 params![
881 new_title,
882 new_description,
883 new_status,
884 new_phase,
885 new_priority.to_string(),
886 new_points,
887 started_at,
888 completed_at,
889 now,
890 serde_json::to_string(&new_tags)?,
891 new_owner,
892 new_claimed_at,
893 serde_json::to_string(&new_needed_tags)?,
894 serde_json::to_string(&new_wanted_tags)?,
895 new_time_estimate_ms,
896 task_id,
897 ],
898 )?;
899
900 if new_tags != task.tags {
902 sync_task_tags(&tx, task_id, &new_tags)?;
903 }
904 if new_needed_tags != task.needed_tags {
905 sync_needed_tags(&tx, task_id, &new_needed_tags)?;
906 }
907 if new_wanted_tags != task.wanted_tags {
908 sync_wanted_tags(&tx, task_id, &new_wanted_tags)?;
909 }
910
911 let (unblocked, auto_advanced) = if status_changed {
913 let was_blocking = states_config.is_blocking_state(&task.status);
914 let is_blocking = states_config.is_blocking_state(&new_status);
915
916 if was_blocking && !is_blocking {
917 super::deps::propagate_unblock_effects(
918 &tx,
919 task_id,
920 Some(agent_id),
921 states_config,
922 deps_config,
923 auto_advance,
924 )?
925 } else {
926 (vec![], vec![])
927 }
928 } else {
929 (vec![], vec![])
930 };
931
932 tx.commit()?;
933
934 Ok((Task {
935 id: task_id.to_string(),
936 title: new_title,
937 description: new_description,
938 status: new_status,
939 phase: new_phase,
940 priority: new_priority,
941 points: new_points,
942 tags: new_tags,
943 needed_tags: new_needed_tags,
944 wanted_tags: new_wanted_tags,
945 time_estimate_ms: new_time_estimate_ms,
946 started_at,
947 completed_at,
948 updated_at: now,
949 worker_id: new_owner,
950 claimed_at: new_claimed_at,
951 ..task
952 }, unblocked, auto_advanced))
953 })
954 }
955
956 pub fn delete_task(
964 &self,
965 task_id: &str,
966 worker_id: &str,
967 cascade: bool,
968 reason: Option<String>,
969 obliterate: bool,
970 force: bool,
971 ) -> Result<()> {
972 let now = now_ms();
973
974 self.with_conn_mut(|conn| {
975 let tx = conn.transaction()?;
976
977 let task = get_task_internal(&tx, task_id)?
979 .ok_or_else(|| anyhow!("Task not found"))?;
980
981 if let Some(ref owner) = task.worker_id
983 && owner != worker_id && !force {
984 return Err(anyhow!(
985 "Task is claimed by worker '{}'. Use force=true to override.",
986 owner
987 ));
988 }
989
990 if obliterate {
991 if cascade {
993 tx.execute(
996 "WITH RECURSIVE descendants AS (
997 SELECT ?1 AS id
998 UNION ALL
999 SELECT dep.to_task_id FROM dependencies dep
1000 INNER JOIN descendants d ON dep.from_task_id = d.id
1001 WHERE dep.dep_type = 'contains'
1002 )
1003 DELETE FROM tasks WHERE id IN (SELECT id FROM descendants)",
1004 params![task_id],
1005 )?;
1006 } else {
1007 let child_count: i32 = tx.query_row(
1009 "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
1010 params![task_id],
1011 |row| row.get(0),
1012 )?;
1013
1014 if child_count > 0 {
1015 return Err(anyhow!("Task has children; use cascade=true to delete"));
1016 }
1017
1018 tx.execute("DELETE FROM tasks WHERE id = ?1", params![task_id])?;
1019 }
1020 } else {
1021 if cascade {
1023 tx.execute(
1025 "WITH RECURSIVE descendants AS (
1026 SELECT ?1 AS id
1027 UNION ALL
1028 SELECT dep.to_task_id FROM dependencies dep
1029 INNER JOIN descendants d ON dep.from_task_id = d.id
1030 WHERE dep.dep_type = 'contains'
1031 )
1032 UPDATE tasks SET deleted_at = ?2, deleted_by = ?3, deleted_reason = ?4, updated_at = ?2
1033 WHERE id IN (SELECT id FROM descendants) AND deleted_at IS NULL",
1034 params![task_id, now, worker_id, reason],
1035 )?;
1036 } else {
1037 let child_count: i32 = tx.query_row(
1039 "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
1040 params![task_id],
1041 |row| row.get(0),
1042 )?;
1043
1044 if child_count > 0 {
1045 return Err(anyhow!("Task has children; use cascade=true to delete"));
1046 }
1047
1048 tx.execute(
1049 "UPDATE tasks SET deleted_at = ?1, deleted_by = ?2, deleted_reason = ?3, updated_at = ?1 WHERE id = ?4",
1050 params![now, worker_id, reason, task_id],
1051 )?;
1052 }
1053 }
1054
1055 tx.commit()?;
1056 Ok(())
1057 })
1058 }
1059
1060 pub fn list_tasks(
1063 &self,
1064 status: Option<&str>,
1065 phase: Option<&str>,
1066 owner: Option<&str>,
1067 parent_id: Option<Option<&str>>,
1068 limit: Option<i32>,
1069 sort_by: Option<&str>,
1070 sort_order: Option<&str>,
1071 ) -> Result<Vec<Task>> {
1072 self.with_conn(|conn| {
1073 let mut sql = String::from(
1074 "SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL",
1075 );
1076 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1077
1078 if let Some(s) = status {
1079 sql.push_str(" AND t.status = ?");
1080 params_vec.push(Box::new(s.to_string()));
1081 }
1082
1083 if let Some(p) = phase {
1084 sql.push_str(" AND t.phase = ?");
1085 params_vec.push(Box::new(p.to_string()));
1086 }
1087
1088 if let Some(o) = owner {
1089 sql.push_str(" AND t.worker_id = ?");
1090 params_vec.push(Box::new(o.to_string()));
1091 }
1092
1093 if let Some(p) = parent_id {
1095 match p {
1096 Some(pid) => {
1097 sql.push_str(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ? AND dep_type = 'contains')");
1098 params_vec.push(Box::new(pid.to_string()));
1099 }
1100 None => {
1101 sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
1103 }
1104 }
1105 }
1106
1107 let order_clause = build_order_clause(sort_by, sort_order);
1109 sql.push_str(&format!(" ORDER BY {}", order_clause));
1110
1111 if let Some(l) = limit {
1112 sql.push_str(&format!(" LIMIT {}", l));
1113 }
1114
1115 let params_refs: Vec<&dyn rusqlite::ToSql> =
1116 params_vec.iter().map(|b| b.as_ref()).collect();
1117
1118 let mut stmt = conn.prepare(&sql)?;
1119 let tasks = stmt
1120 .query_map(params_refs.as_slice(), parse_task_row)?
1121 .filter_map(|r| r.ok())
1122 .collect();
1123
1124 Ok(tasks)
1125 })
1126 }
1127
1128 pub fn set_thought(
1130 &self,
1131 agent_id: &str,
1132 thought: Option<String>,
1133 task_ids: Option<Vec<String>>,
1134 ) -> Result<i32> {
1135 let now = now_ms();
1136
1137 self.with_conn(|conn| {
1138 let updated = if let Some(ids) = task_ids {
1139 let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
1140 let sql = format!(
1141 "UPDATE tasks SET current_thought = ?, updated_at = ?
1142 WHERE worker_id = ? AND id IN ({})",
1143 placeholders.join(", ")
1144 );
1145
1146 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1147 params_vec.push(Box::new(thought.clone()));
1148 params_vec.push(Box::new(now));
1149 params_vec.push(Box::new(agent_id.to_string()));
1150 for id in &ids {
1151 params_vec.push(Box::new(id.clone()));
1152 }
1153
1154 let params_refs: Vec<&dyn rusqlite::ToSql> =
1155 params_vec.iter().map(|b| b.as_ref()).collect();
1156 conn.execute(&sql, params_refs.as_slice())?
1157 } else {
1158 conn.execute(
1159 "UPDATE tasks SET current_thought = ?, updated_at = ? WHERE worker_id = ?",
1160 params![thought, now, agent_id],
1161 )?
1162 };
1163
1164 Ok(updated as i32)
1165 })
1166 }
1167
1168 pub fn log_time(&self, task_id: &str, duration_ms: i64) -> Result<i64> {
1170 let now = now_ms();
1171
1172 self.with_conn(|conn| {
1173 conn.execute(
1174 "UPDATE tasks SET time_actual_ms = COALESCE(time_actual_ms, 0) + ?1, updated_at = ?2
1175 WHERE id = ?3",
1176 params![duration_ms, now, task_id],
1177 )?;
1178
1179 let total: i64 = conn.query_row(
1180 "SELECT COALESCE(time_actual_ms, 0) FROM tasks WHERE id = ?1",
1181 params![task_id],
1182 |row| row.get(0),
1183 )?;
1184
1185 Ok(total)
1186 })
1187 }
1188
1189 pub fn log_metrics(
1192 &self,
1193 task_id: &str,
1194 cost_usd: Option<f64>,
1195 values: &[i64],
1196 ) -> Result<Task> {
1197 let now = now_ms();
1198
1199 self.with_conn(|conn| {
1200 let task =
1201 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1202
1203 let mut new_metrics = task.metrics;
1205 for (i, &val) in values.iter().take(8).enumerate() {
1206 new_metrics[i] += val;
1207 }
1208
1209 let new_cost_usd = task.cost_usd + cost_usd.unwrap_or(0.0);
1210
1211 conn.execute(
1212 "UPDATE tasks SET
1213 metric_0 = ?1, metric_1 = ?2, metric_2 = ?3, metric_3 = ?4,
1214 metric_4 = ?5, metric_5 = ?6, metric_6 = ?7, metric_7 = ?8,
1215 cost_usd = ?9, updated_at = ?10
1216 WHERE id = ?11",
1217 params![
1218 new_metrics[0],
1219 new_metrics[1],
1220 new_metrics[2],
1221 new_metrics[3],
1222 new_metrics[4],
1223 new_metrics[5],
1224 new_metrics[6],
1225 new_metrics[7],
1226 new_cost_usd,
1227 now,
1228 task_id,
1229 ],
1230 )?;
1231
1232 Ok(Task {
1233 cost_usd: new_cost_usd,
1234 metrics: new_metrics,
1235 updated_at: now,
1236 ..task
1237 })
1238 })
1239 }
1240
1241 pub fn claim_task(
1244 &self,
1245 task_id: &str,
1246 agent_id: &str,
1247 states_config: &StatesConfig,
1248 ) -> Result<Task> {
1249 let now = now_ms();
1250
1251 let claim_status = states_config
1253 .definitions
1254 .iter()
1255 .find(|(_, def)| def.timed)
1256 .map(|(name, _)| name.as_str())
1257 .unwrap_or("working");
1258
1259 self.with_conn(|conn| {
1260 let task =
1262 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1263
1264 if task.worker_id.is_some() {
1266 return Err(anyhow!("Task is already claimed"));
1267 }
1268
1269 if !states_config.is_valid_transition(&task.status, claim_status) {
1271 let exits = states_config.get_exits(&task.status);
1272 return Err(anyhow!(
1273 "Cannot claim task in state '{}'. Allowed transitions: {:?}",
1274 task.status,
1275 exits
1276 ));
1277 }
1278
1279 let agent =
1281 get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1282
1283 if !task.needed_tags.is_empty() {
1285 for needed in &task.needed_tags {
1286 if !agent.tags.contains(needed) {
1287 return Err(anyhow!("Agent missing required tag: {}", needed));
1288 }
1289 }
1290 }
1291
1292 if !task.wanted_tags.is_empty() {
1294 let has_any = task
1295 .wanted_tags
1296 .iter()
1297 .any(|wanted| agent.tags.contains(wanted));
1298 if !has_any {
1299 return Err(anyhow!("Agent has none of the wanted tags"));
1300 }
1301 }
1302
1303 conn.execute(
1304 "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = ?4, updated_at = ?5
1305 WHERE id = ?6",
1306 params![agent_id, now, claim_status, now, now, task_id,],
1307 )?;
1308
1309 record_state_transition(
1311 conn,
1312 task_id,
1313 claim_status,
1314 Some(agent_id),
1315 None,
1316 states_config,
1317 )?;
1318
1319 conn.execute(
1321 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1322 params![now, agent_id],
1323 )?;
1324
1325 Ok(Task {
1326 worker_id: Some(agent_id.to_string()),
1327 claimed_at: Some(now),
1328 status: claim_status.to_string(),
1329 started_at: Some(now),
1330 updated_at: now,
1331 ..task
1332 })
1333 })
1334 }
1335
1336 pub fn release_task(
1338 &self,
1339 task_id: &str,
1340 agent_id: &str,
1341 states_config: &StatesConfig,
1342 ) -> Result<()> {
1343 let now = now_ms();
1344 let release_status = &states_config.initial;
1345
1346 self.with_conn(|conn| {
1347 let task =
1348 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1349
1350 if task.worker_id.as_deref() != Some(agent_id) {
1351 return Err(anyhow!("Task is not owned by this agent"));
1352 }
1353
1354 record_state_transition(
1356 conn,
1357 task_id,
1358 release_status,
1359 Some(agent_id),
1360 None,
1361 states_config,
1362 )?;
1363
1364 conn.execute(
1365 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1366 WHERE id = ?3",
1367 params![release_status, now, task_id],
1368 )?;
1369
1370 Ok(())
1371 })
1372 }
1373
1374 pub fn force_release(&self, task_id: &str, states_config: &StatesConfig) -> Result<()> {
1376 let now = now_ms();
1377 let release_status = &states_config.initial;
1378
1379 self.with_conn(|conn| {
1380 let task =
1381 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1382
1383 record_state_transition(
1385 conn,
1386 task_id,
1387 release_status,
1388 task.worker_id.as_deref(),
1389 None,
1390 states_config,
1391 )?;
1392
1393 conn.execute(
1394 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1395 WHERE id = ?3",
1396 params![release_status, now, task_id],
1397 )?;
1398
1399 Ok(())
1400 })
1401 }
1402
1403 pub fn force_claim_task(
1405 &self,
1406 task_id: &str,
1407 agent_id: &str,
1408 states_config: &StatesConfig,
1409 ) -> Result<Task> {
1410 let now = now_ms();
1411
1412 let claim_status = states_config
1414 .definitions
1415 .iter()
1416 .find(|(_, def)| def.timed)
1417 .map(|(name, _)| name.as_str())
1418 .unwrap_or("working");
1419
1420 self.with_conn(|conn| {
1421 let task =
1423 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1424
1425 let agent =
1427 get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1428
1429 if !task.needed_tags.is_empty() {
1431 for needed in &task.needed_tags {
1432 if !agent.tags.contains(needed) {
1433 return Err(anyhow!("Agent missing required tag: {}", needed));
1434 }
1435 }
1436 }
1437
1438 if !task.wanted_tags.is_empty() {
1440 let has_any = task
1441 .wanted_tags
1442 .iter()
1443 .any(|wanted| agent.tags.contains(wanted));
1444 if !has_any {
1445 return Err(anyhow!("Agent has none of the wanted tags"));
1446 }
1447 }
1448
1449 conn.execute(
1450 "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = COALESCE(started_at, ?4), updated_at = ?5
1451 WHERE id = ?6",
1452 params![agent_id, now, claim_status, now, now, task_id,],
1453 )?;
1454
1455 record_state_transition(
1457 conn,
1458 task_id,
1459 claim_status,
1460 Some(agent_id),
1461 None,
1462 states_config,
1463 )?;
1464
1465 conn.execute(
1467 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1468 params![now, agent_id],
1469 )?;
1470
1471 Ok(Task {
1472 worker_id: Some(agent_id.to_string()),
1473 claimed_at: Some(now),
1474 status: claim_status.to_string(),
1475 started_at: task.started_at.or(Some(now)),
1476 updated_at: now,
1477 ..task
1478 })
1479 })
1480 }
1481
1482 pub fn release_task_with_state(
1484 &self,
1485 task_id: &str,
1486 agent_id: &str,
1487 state: &str,
1488 states_config: &StatesConfig,
1489 ) -> Result<()> {
1490 let now = now_ms();
1491
1492 self.with_conn(|conn| {
1493 let task =
1494 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1495
1496 if task.worker_id.as_deref() != Some(agent_id) {
1497 return Err(anyhow!("Task is not owned by this agent"));
1498 }
1499
1500 if !states_config.is_valid_state(state) {
1502 return Err(anyhow!(
1503 "Invalid state '{}'. Valid states: {:?}",
1504 state,
1505 states_config.state_names()
1506 ));
1507 }
1508
1509 if !states_config.is_valid_transition(&task.status, state) {
1511 let exits = states_config.get_exits(&task.status);
1512 return Err(anyhow!(
1513 "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
1514 task.status,
1515 state,
1516 exits
1517 ));
1518 }
1519
1520 let completed_at = if state == "completed" {
1522 Some(now)
1523 } else {
1524 None
1525 };
1526
1527 record_state_transition(conn, task_id, state, Some(agent_id), None, states_config)?;
1529
1530 conn.execute(
1531 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, completed_at = COALESCE(?2, completed_at), updated_at = ?3
1532 WHERE id = ?4",
1533 params![state, completed_at, now, task_id],
1534 )?;
1535
1536 Ok(())
1537 })
1538 }
1539
1540 pub fn force_release_stale(
1542 &self,
1543 timeout_seconds: i64,
1544 states_config: &StatesConfig,
1545 ) -> Result<i32> {
1546 let now = now_ms();
1547 let cutoff = now - (timeout_seconds * 1000);
1548 let release_status = &states_config.initial;
1549
1550 self.with_conn(|conn| {
1551 let updated = conn.execute(
1552 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1553 WHERE claimed_at < ?3 AND worker_id IS NOT NULL",
1554 params![release_status, now, cutoff],
1555 )?;
1556
1557 Ok(updated as i32)
1558 })
1559 }
1560
1561 pub fn complete_task(
1565 &self,
1566 task_id: &str,
1567 agent_id: &str,
1568 states_config: &StatesConfig,
1569 ) -> Result<Task> {
1570 let now = now_ms();
1571
1572 let complete_status = if states_config.definitions.contains_key("completed") {
1574 "completed"
1575 } else {
1576 states_config
1578 .definitions
1579 .iter()
1580 .find(|(_, def)| def.exits.is_empty())
1581 .map(|(name, _)| name.as_str())
1582 .unwrap_or("completed")
1583 };
1584
1585 self.with_conn_mut(|conn| {
1586 let tx = conn.transaction()?;
1587
1588 let mut stmt = tx.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1590 let task = stmt
1591 .query_row(params![task_id], parse_task_row)
1592 .map_err(|_| anyhow!("Task not found"))?;
1593 drop(stmt);
1594
1595 if task.worker_id.as_deref() != Some(agent_id) {
1597 return Err(anyhow!("Task is not owned by this agent"));
1598 }
1599
1600 let incomplete_children: i32 = tx.query_row(
1602 "SELECT COUNT(*) FROM dependencies d
1603 INNER JOIN tasks child ON d.to_task_id = child.id
1604 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
1605 AND child.status IN (SELECT value FROM json_each(?2))",
1606 params![
1607 task_id,
1608 serde_json::to_string(&states_config.blocking_states)?
1609 ],
1610 |row| row.get(0),
1611 )?;
1612
1613 if incomplete_children > 0 {
1614 return Err(anyhow!(
1615 "Cannot complete task: {} child task(s) are not complete",
1616 incomplete_children
1617 ));
1618 }
1619
1620 if !states_config.is_valid_transition(&task.status, complete_status) {
1622 let exits = states_config.get_exits(&task.status);
1623 return Err(anyhow!(
1624 "Cannot complete task in state '{}'. Allowed transitions: {:?}",
1625 task.status,
1626 exits
1627 ));
1628 }
1629
1630 record_state_transition(
1632 &tx,
1633 task_id,
1634 complete_status,
1635 Some(agent_id),
1636 None,
1637 states_config,
1638 )?;
1639
1640 tx.execute(
1642 "UPDATE tasks SET status = ?1, completed_at = ?2, updated_at = ?3,
1643 worker_id = NULL, claimed_at = NULL
1644 WHERE id = ?4",
1645 params![complete_status, now, now, task_id],
1646 )?;
1647
1648 tx.execute(
1650 "DELETE FROM file_locks WHERE task_id = ?1",
1651 params![task_id],
1652 )?;
1653
1654 tx.execute(
1656 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1657 params![now, agent_id],
1658 )?;
1659
1660 tx.commit()?;
1661
1662 Ok(Task {
1663 status: complete_status.to_string(),
1664 completed_at: Some(now),
1665 updated_at: now,
1666 worker_id: None,
1667 claimed_at: None,
1668 ..task
1669 })
1670 })
1671 }
1672
1673 pub fn get_all_tasks(&self) -> Result<Vec<Task>> {
1675 self.with_conn(|conn| {
1676 let mut stmt =
1677 conn.prepare("SELECT * FROM tasks WHERE deleted_at IS NULL ORDER BY created_at")?;
1678 let tasks = stmt
1679 .query_map([], parse_task_row)?
1680 .filter_map(|r| r.ok())
1681 .collect();
1682 Ok(tasks)
1683 })
1684 }
1685
1686 #[allow(dead_code)]
1688 pub fn get_tasks_by_status(&self, status: &str) -> Result<Vec<Task>> {
1689 self.with_conn(|conn| {
1690 let mut stmt =
1691 conn.prepare("SELECT * FROM tasks WHERE status = ?1 ORDER BY created_at")?;
1692 let tasks = stmt
1693 .query_map(params![status], parse_task_row)?
1694 .filter_map(|r| r.ok())
1695 .collect();
1696 Ok(tasks)
1697 })
1698 }
1699
1700 pub fn get_claimed_tasks(&self, agent_id: Option<&str>) -> Result<Vec<Task>> {
1702 self.with_conn(|conn| {
1703 let tasks = if let Some(aid) = agent_id {
1704 let mut stmt = conn
1705 .prepare("SELECT * FROM tasks WHERE worker_id = ?1 AND deleted_at IS NULL ORDER BY claimed_at")?;
1706 stmt.query_map(params![aid], parse_task_row)?
1707 .filter_map(|r| r.ok())
1708 .collect()
1709 } else {
1710 let mut stmt = conn.prepare(
1711 "SELECT * FROM tasks WHERE worker_id IS NOT NULL AND deleted_at IS NULL ORDER BY claimed_at",
1712 )?;
1713 stmt.query_map([], parse_task_row)?
1714 .filter_map(|r| r.ok())
1715 .collect()
1716 };
1717
1718 Ok(tasks)
1719 })
1720 }
1721}
1722
1723#[allow(clippy::too_many_arguments)]
1728fn create_tree_recursive(
1729 conn: &Connection,
1730 input: &TaskTreeInput,
1731 parent_id: Option<&str>,
1732 prev_sibling_id: Option<&str>,
1733 child_type: Option<&str>,
1734 sibling_type: Option<&str>,
1735 all_ids: &mut Vec<String>,
1736 phase_warnings: &mut Vec<String>,
1737 tag_warnings: &mut Vec<String>,
1738 states_config: &StatesConfig,
1739 phases_config: &PhasesConfig,
1740 tags_config: &TagsConfig,
1741 ids_config: &IdsConfig,
1742) -> Result<String> {
1743 let task_id = if let Some(ref ref_id) = input.ref_id {
1745 let exists: bool = conn.query_row(
1747 "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
1748 params![ref_id],
1749 |row| row.get(0),
1750 )?;
1751 if !exists {
1752 return Err(anyhow::anyhow!("Referenced task '{}' not found", ref_id));
1753 }
1754 ref_id.clone()
1755 } else {
1756 let task_id = input
1758 .id
1759 .clone()
1760 .unwrap_or_else(|| generate_task_id(ids_config));
1761 let now = now_ms();
1762 let priority = clamp_priority(input.priority.unwrap_or(PRIORITY_DEFAULT));
1763 let initial_status = &states_config.initial;
1764
1765 if let Some(ref phase) = input.phase {
1767 if let Some(warning) = phases_config.check_phase(phase)? {
1768 phase_warnings.push(format!("Task '{}': {}", task_id, warning));
1769 }
1770 }
1771
1772 let needed_tags = input.needed_tags.clone().unwrap_or_default();
1773 let wanted_tags = input.wanted_tags.clone().unwrap_or_default();
1774 let tags = input.tags.clone().unwrap_or_default();
1775
1776 for warning in tags_config.validate_tags(&tags)? {
1778 tag_warnings.push(format!("Task '{}': {}", task_id, warning));
1779 }
1780 for warning in tags_config.validate_tags(&needed_tags)? {
1781 tag_warnings.push(format!("Task '{}' needed_tags: {}", task_id, warning));
1782 }
1783 for warning in tags_config.validate_tags(&wanted_tags)? {
1784 tag_warnings.push(format!("Task '{}' wanted_tags: {}", task_id, warning));
1785 }
1786
1787 let needed_tags_json = serde_json::to_string(&needed_tags)?;
1788 let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
1789 let tags_json = serde_json::to_string(&tags)?;
1790
1791 conn.execute(
1792 "INSERT INTO tasks (
1793 id, title, description, status, phase, priority,
1794 needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
1795 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1796 params![
1797 &task_id,
1798 &input.title,
1799 &input.description,
1800 initial_status,
1801 &input.phase,
1802 priority.to_string(),
1803 needed_tags_json,
1804 wanted_tags_json,
1805 tags_json,
1806 input.points,
1807 input.time_estimate_ms,
1808 now,
1809 now,
1810 ],
1811 )?;
1812
1813 record_state_transition(conn, &task_id, initial_status, None, None, states_config)?;
1815
1816 sync_task_tags(conn, &task_id, &tags)?;
1818 sync_needed_tags(conn, &task_id, &needed_tags)?;
1819 sync_wanted_tags(conn, &task_id, &wanted_tags)?;
1820
1821 task_id
1822 };
1823
1824 if let (Some(pid), Some(ct)) = (parent_id, child_type) {
1826 Database::add_dependency_internal(conn, pid, &task_id, ct)?;
1827 }
1828
1829 if let (Some(prev_id), Some(st)) = (prev_sibling_id, sibling_type) {
1831 Database::add_dependency_internal(conn, prev_id, &task_id, st)?;
1832 }
1833
1834 all_ids.push(task_id.clone());
1835
1836 let mut prev_child_id: Option<String> = None;
1838 for child in input.children.iter() {
1839 let child_id = create_tree_recursive(
1840 conn,
1841 child,
1842 Some(&task_id),
1843 prev_child_id.as_deref(),
1844 child_type,
1845 sibling_type,
1846 all_ids,
1847 phase_warnings,
1848 tag_warnings,
1849 states_config,
1850 phases_config,
1851 tags_config,
1852 ids_config,
1853 )?;
1854 prev_child_id = Some(child_id);
1855 }
1856
1857 Ok(task_id)
1858}