1use super::state_transitions::record_state_transition;
4use super::{Database, now_ms};
5use crate::config::{AutoAdvanceConfig, DependenciesConfig, StatesConfig};
6use crate::types::{
7 PRIORITY_DEFAULT, Priority, Task, TaskTree, TaskTreeInput, Worker, clamp_priority,
8 parse_priority,
9};
10use anyhow::{Result, anyhow};
11use rusqlite::{Connection, Row, params};
12use uuid::Uuid;
13
14fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
17 let field = match sort_by {
18 Some("priority") => "CAST(t.priority AS INTEGER)",
19 Some("created_at") => "t.created_at",
20 Some("updated_at") => "t.updated_at",
21 _ => "t.created_at", };
23
24 let order = match sort_order {
25 Some("asc") => "ASC",
26 Some("desc") => "DESC",
27 _ => {
28 "DESC"
30 }
31 };
32
33 format!("{} {}", field, order)
34}
35
36fn sync_task_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
43 conn.execute("DELETE FROM task_tags WHERE task_id = ?1", params![task_id])?;
44 for tag in tags {
45 conn.execute(
46 "INSERT INTO task_tags (task_id, tag) VALUES (?1, ?2)",
47 params![task_id, tag],
48 )?;
49 }
50 Ok(())
51}
52
53fn sync_needed_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
55 conn.execute(
56 "DELETE FROM task_needed_tags WHERE task_id = ?1",
57 params![task_id],
58 )?;
59 for tag in tags {
60 conn.execute(
61 "INSERT INTO task_needed_tags (task_id, tag) VALUES (?1, ?2)",
62 params![task_id, tag],
63 )?;
64 }
65 Ok(())
66}
67
68fn sync_wanted_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
70 conn.execute(
71 "DELETE FROM task_wanted_tags WHERE task_id = ?1",
72 params![task_id],
73 )?;
74 for tag in tags {
75 conn.execute(
76 "INSERT INTO task_wanted_tags (task_id, tag) VALUES (?1, ?2)",
77 params![task_id, tag],
78 )?;
79 }
80 Ok(())
81}
82
83pub fn parse_task_row(row: &Row) -> rusqlite::Result<Task> {
84 let id: String = row.get("id")?;
85 let title: String = row.get("title")?;
86 let description: Option<String> = row.get("description")?;
87 let status: String = row.get("status")?;
88 let priority: String = row.get("priority")?;
89 let worker_id: Option<String> = row.get("worker_id")?;
90 let claimed_at: Option<i64> = row.get("claimed_at")?;
91
92 let needed_tags_json: Option<String> = row.get("needed_tags")?;
93 let wanted_tags_json: Option<String> = row.get("wanted_tags")?;
94 let tags_json: Option<String> = row.get("tags")?;
95
96 let points: Option<i32> = row.get("points")?;
97 let time_estimate_ms: Option<i64> = row.get("time_estimate_ms")?;
98 let time_actual_ms: Option<i64> = row.get("time_actual_ms")?;
99 let started_at: Option<i64> = row.get("started_at")?;
100 let completed_at: Option<i64> = row.get("completed_at")?;
101
102 let current_thought: Option<String> = row.get("current_thought")?;
103
104 let cost_usd: f64 = row.get("cost_usd")?;
105 let metric_0: i64 = row.get("metric_0")?;
106 let metric_1: i64 = row.get("metric_1")?;
107 let metric_2: i64 = row.get("metric_2")?;
108 let metric_3: i64 = row.get("metric_3")?;
109 let metric_4: i64 = row.get("metric_4")?;
110 let metric_5: i64 = row.get("metric_5")?;
111 let metric_6: i64 = row.get("metric_6")?;
112 let metric_7: i64 = row.get("metric_7")?;
113
114 let created_at: i64 = row.get("created_at")?;
115 let updated_at: i64 = row.get("updated_at")?;
116
117 Ok(Task {
118 id,
119 title,
120 description,
121 status,
122 priority: parse_priority(&priority),
123 worker_id,
124 claimed_at,
125 needed_tags: needed_tags_json
126 .map(|s| serde_json::from_str(&s).unwrap_or_default())
127 .unwrap_or_default(),
128 wanted_tags: wanted_tags_json
129 .map(|s| serde_json::from_str(&s).unwrap_or_default())
130 .unwrap_or_default(),
131 tags: tags_json
132 .map(|s| serde_json::from_str(&s).unwrap_or_default())
133 .unwrap_or_default(),
134 points,
135 time_estimate_ms,
136 time_actual_ms,
137 started_at,
138 completed_at,
139 current_thought,
140 cost_usd,
141 metrics: [
142 metric_0, metric_1, metric_2, metric_3, metric_4, metric_5, metric_6, metric_7,
143 ],
144 created_at,
145 updated_at,
146 })
147}
148
149fn get_task_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
151 let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
152
153 let result = stmt.query_row(params![task_id], parse_task_row);
154
155 match result {
156 Ok(task) => Ok(Some(task)),
157 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
158 Err(e) => Err(e.into()),
159 }
160}
161
162fn get_worker_internal(conn: &Connection, worker_id: &str) -> Result<Option<Worker>> {
164 let mut stmt = conn.prepare(
165 "SELECT id, tags, max_claims, registered_at, last_heartbeat
166 FROM workers WHERE id = ?1",
167 )?;
168
169 let result = stmt.query_row(params![worker_id], |row| {
170 let id: String = row.get(0)?;
171 let tags_json: String = row.get(1)?;
172 let max_claims: i32 = row.get(2)?;
173 let registered_at: i64 = row.get(3)?;
174 let last_heartbeat: i64 = row.get(4)?;
175
176 Ok((id, tags_json, max_claims, registered_at, last_heartbeat))
177 });
178
179 match result {
180 Ok((id, tags_json, max_claims, registered_at, last_heartbeat)) => {
181 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
182 Ok(Some(Worker {
183 id,
184 tags,
185 max_claims,
186 registered_at,
187 last_heartbeat,
188 }))
189 }
190 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
191 Err(e) => Err(e.into()),
192 }
193}
194
195impl Database {
196 #[allow(clippy::too_many_arguments)]
200 pub fn create_task(
201 &self,
202 id: Option<String>,
203 description: String,
204 parent_id: Option<String>,
205 priority: Option<Priority>,
206 points: Option<i32>,
207 time_estimate_ms: Option<i64>,
208 agent_tags_all: Option<Vec<String>>,
209 agent_tags_any: Option<Vec<String>>,
210 tags: Option<Vec<String>>,
211 states_config: &StatesConfig,
212 ) -> Result<Task> {
213 let task_id = id.unwrap_or_else(|| Uuid::now_v7().to_string());
214 let now = now_ms();
215 let priority = clamp_priority(priority.unwrap_or(PRIORITY_DEFAULT));
216 let initial_status = &states_config.initial;
217
218 let needed_tags = agent_tags_all.unwrap_or_default();
219 let wanted_tags = agent_tags_any.unwrap_or_default();
220 let tags = tags.unwrap_or_default();
221 let needed_tags_json = serde_json::to_string(&needed_tags)?;
222 let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
223 let tags_json = serde_json::to_string(&tags)?;
224
225 self.with_conn_mut(|conn| {
226 let tx = conn.transaction()?;
227
228 tx.execute(
229 "INSERT INTO tasks (
230 id, title, description, status, priority,
231 needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
232 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
233 params![
234 &task_id,
235 &description, &description, initial_status,
238 priority.to_string(),
239 needed_tags_json,
240 wanted_tags_json,
241 tags_json,
242 points,
243 time_estimate_ms,
244 now,
245 now,
246 ],
247 )?;
248
249 sync_task_tags(&tx, &task_id, &tags)?;
251 sync_needed_tags(&tx, &task_id, &needed_tags)?;
252 sync_wanted_tags(&tx, &task_id, &wanted_tags)?;
253
254 if let Some(ref pid) = parent_id {
256 Database::add_dependency_internal(&tx, pid, &task_id, "contains")?;
257 }
258
259 record_state_transition(&tx, &task_id, initial_status, None, None, states_config)?;
261
262 tx.commit()?;
263
264 Ok(Task {
265 id: task_id,
266 title: description.clone(),
267 description: Some(description),
268 status: initial_status.clone(),
269 priority,
270 worker_id: None,
271 claimed_at: None,
272 needed_tags,
273 wanted_tags,
274 tags,
275 points,
276 time_estimate_ms,
277 time_actual_ms: None,
278 started_at: None,
279 completed_at: None,
280 current_thought: None,
281 cost_usd: 0.0,
282 metrics: [0; 8],
283 created_at: now,
284 updated_at: now,
285 })
286 })
287 }
288
289 pub fn create_task_tree(
293 &self,
294 input: TaskTreeInput,
295 parent_id: Option<String>,
296 child_type: Option<String>,
297 sibling_type: Option<String>,
298 states_config: &StatesConfig,
299 ) -> Result<(String, Vec<String>)> {
300 let mut all_ids = Vec::new();
301 let child_type = child_type.or_else(|| Some("contains".to_string()));
303
304 self.with_conn_mut(|conn| {
305 let tx = conn.transaction()?;
306 let root_id = create_tree_recursive(
307 &tx,
308 &input,
309 parent_id.as_deref(),
310 None, child_type.as_deref(),
312 sibling_type.as_deref(),
313 &mut all_ids,
314 states_config,
315 )?;
316 tx.commit()?;
317 Ok((root_id, all_ids))
318 })
319 }
320
321 pub fn get_task(&self, task_id: &str) -> Result<Option<Task>> {
323 self.with_conn(|conn| {
324 let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
325
326 let result = stmt.query_row(params![task_id], parse_task_row);
327
328 match result {
329 Ok(task) => Ok(Some(task)),
330 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
331 Err(e) => Err(e.into()),
332 }
333 })
334 }
335
336 pub fn get_task_tree(&self, task_id: &str) -> Result<Option<TaskTree>> {
338 let task = self.get_task(task_id)?;
339 match task {
340 None => Ok(None),
341 Some(task) => {
342 let children = self.get_children_recursive(&task.id)?;
343 Ok(Some(TaskTree { task, children }))
344 }
345 }
346 }
347
348 fn get_children_recursive(&self, parent_id: &str) -> Result<Vec<TaskTree>> {
350 let children = self.get_children(parent_id)?;
351 let mut result = Vec::new();
352
353 for child in children {
354 let child_children = self.get_children_recursive(&child.id)?;
355 result.push(TaskTree {
356 task: child,
357 children: child_children,
358 });
359 }
360
361 Ok(result)
362 }
363
364 pub fn get_children(&self, parent_id: &str) -> Result<Vec<Task>> {
366 self.with_conn(|conn| {
367 let mut stmt = conn.prepare(
368 "SELECT t.* FROM tasks t
369 INNER JOIN dependencies d ON t.id = d.to_task_id
370 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
371 ORDER BY t.created_at",
372 )?;
373
374 let tasks = stmt
375 .query_map(params![parent_id], parse_task_row)?
376 .filter_map(|r| r.ok())
377 .collect();
378
379 Ok(tasks)
380 })
381 }
382
383 #[allow(clippy::too_many_arguments)]
385 pub fn update_task(
386 &self,
387 task_id: &str,
388 title: Option<String>,
389 description: Option<Option<String>>,
390 status: Option<String>,
391 priority: Option<Priority>,
392 points: Option<Option<i32>>,
393 tags: Option<Vec<String>>,
394 states_config: &StatesConfig,
395 ) -> Result<Task> {
396 let now = now_ms();
397
398 self.with_conn(|conn| {
399 let task =
400 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
401
402 let new_title = title.unwrap_or(task.title.clone());
403 let new_description = description.unwrap_or(task.description.clone());
404 let new_status = status.unwrap_or(task.status.clone());
405 let new_priority = priority.unwrap_or(task.priority);
406 let new_points = points.unwrap_or(task.points);
407 let new_tags = tags.unwrap_or(task.tags.clone());
408
409 if !states_config.is_valid_state(&new_status) {
411 return Err(anyhow!(
412 "Invalid state '{}'. Valid states: {:?}",
413 new_status,
414 states_config.state_names()
415 ));
416 }
417
418 if task.status != new_status
420 && !states_config.is_valid_transition(&task.status, &new_status) {
421 let exits = states_config.get_exits(&task.status);
422 return Err(anyhow!(
423 "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
424 task.status,
425 new_status,
426 exits
427 ));
428 }
429
430 let started_at =
433 if task.started_at.is_none() && states_config.is_timed_state(&new_status) {
434 Some(now)
435 } else {
436 task.started_at
437 };
438
439 let completed_at = if states_config.is_terminal_state(&new_status) {
441 Some(now)
442 } else {
443 task.completed_at
444 };
445
446 if task.status != new_status {
448 record_state_transition(
449 conn,
450 task_id,
451 &new_status,
452 task.worker_id.as_deref(),
453 None,
454 states_config,
455 )?;
456 }
457
458 conn.execute(
459 "UPDATE tasks SET
460 title = ?1, description = ?2, status = ?3, priority = ?4,
461 points = ?5, started_at = ?6, completed_at = ?7, updated_at = ?8,
462 tags = ?9
463 WHERE id = ?10",
464 params![
465 new_title,
466 new_description,
467 new_status,
468 new_priority.to_string(),
469 new_points,
470 started_at,
471 completed_at,
472 now,
473 serde_json::to_string(&new_tags)?,
474 task_id,
475 ],
476 )?;
477
478 Ok(Task {
479 id: task_id.to_string(),
480 title: new_title,
481 description: new_description,
482 status: new_status,
483 priority: new_priority,
484 points: new_points,
485 tags: new_tags,
486 started_at,
487 completed_at,
488 updated_at: now,
489 ..task
490 })
491 })
492 }
493
494 #[allow(clippy::too_many_arguments)]
506 pub fn update_task_unified(
507 &self,
508 task_id: &str,
509 agent_id: &str,
510 assignee: Option<&str>,
511 title: Option<String>,
512 description: Option<Option<String>>,
513 status: Option<String>,
514 priority: Option<Priority>,
515 points: Option<Option<i32>>,
516 tags: Option<Vec<String>>,
517 needed_tags: Option<Vec<String>>,
518 wanted_tags: Option<Vec<String>>,
519 time_estimate_ms: Option<i64>,
520 reason: Option<String>,
521 force: bool,
522 states_config: &StatesConfig,
523 deps_config: &DependenciesConfig,
524 auto_advance: &AutoAdvanceConfig,
525 ) -> Result<(Task, Vec<String>, Vec<String>)> {
526 let now = now_ms();
527
528 self.with_conn_mut(|conn| {
529 let tx = conn.transaction()?;
530
531 let task =
532 get_task_internal(&tx, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
533
534 if let Some(ref current_owner) = task.worker_id
536 && current_owner != agent_id && !force {
537 return Err(anyhow!(
538 "Task is claimed by agent '{}'. Only the owner can update claimed tasks (use force=true to override)",
539 current_owner
540 ));
541 }
542
543 let new_title = title.unwrap_or(task.title.clone());
544 let new_description = description.unwrap_or(task.description.clone());
545 let new_status = if assignee.is_some() && status.is_none() {
547 "assigned".to_string()
548 } else {
549 status.unwrap_or(task.status.clone())
550 };
551 let new_priority = priority.unwrap_or(task.priority);
552 let new_points = points.unwrap_or(task.points);
553 let new_tags = tags.unwrap_or(task.tags.clone());
554 let new_needed_tags = needed_tags.unwrap_or(task.needed_tags.clone());
555 let new_wanted_tags = wanted_tags.unwrap_or(task.wanted_tags.clone());
556 let new_time_estimate_ms = time_estimate_ms.or(task.time_estimate_ms);
557
558 if !states_config.is_valid_state(&new_status) {
560 return Err(anyhow!(
561 "Invalid state '{}'. Valid states: {:?}",
562 new_status,
563 states_config.state_names()
564 ));
565 }
566
567 if task.status != new_status
569 && !states_config.is_valid_transition(&task.status, &new_status) {
570 let exits = states_config.get_exits(&task.status);
571 return Err(anyhow!(
572 "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
573 task.status,
574 new_status,
575 exits
576 ));
577 }
578
579 let new_is_timed = states_config.is_timed_state(&new_status);
581 let new_is_terminal = states_config.is_terminal_state(&new_status);
582 let current_owner = task.worker_id.as_deref();
583 let is_owned_by_agent = current_owner == Some(agent_id);
584 let is_owned_by_other = current_owner.is_some() && !is_owned_by_agent;
585
586 let mut new_owner: Option<String> = task.worker_id.clone();
587 let mut new_claimed_at: Option<i64> = task.claimed_at;
588
589 if let Some(target_agent) = assignee {
592 if is_owned_by_other && !force {
594 return Err(anyhow!(
595 "Task is already claimed by agent '{}'. Use force=true to reassign.",
596 current_owner.unwrap()
597 ));
598 }
599
600 let target = get_worker_internal(&tx, target_agent)?
602 .ok_or_else(|| anyhow!("Assignee agent '{}' not found", target_agent))?;
603
604 if !task.needed_tags.is_empty() {
606 for needed in &task.needed_tags {
607 if !target.tags.contains(needed) {
608 return Err(anyhow!(
609 "Assignee '{}' missing required tag: {}",
610 target_agent,
611 needed
612 ));
613 }
614 }
615 }
616
617 if !task.wanted_tags.is_empty() {
618 let has_any = task
619 .wanted_tags
620 .iter()
621 .any(|wanted| target.tags.contains(wanted));
622 if !has_any {
623 return Err(anyhow!(
624 "Assignee '{}' has none of the wanted tags: {:?}",
625 target_agent,
626 task.wanted_tags
627 ));
628 }
629 }
630
631 new_owner = Some(target_agent.to_string());
633 new_claimed_at = Some(now);
634 }
635
636 if new_is_timed && !is_owned_by_agent {
639 if is_owned_by_other && !force {
641 return Err(anyhow!(
642 "Task is already claimed by agent '{}'",
643 current_owner.unwrap()
644 ));
645 }
646
647 if !force {
649 let unsatisfied_blockers = super::deps::get_unsatisfied_start_blockers_in_tx(
650 &tx,
651 task_id,
652 states_config,
653 deps_config,
654 )?;
655 if !unsatisfied_blockers.is_empty() {
656 return Err(anyhow!(
657 "Task has unsatisfied dependencies: [{}]",
658 unsatisfied_blockers.join(", ")
659 ));
660 }
661 }
662
663 let agent = get_worker_internal(&tx, agent_id)?
665 .ok_or_else(|| anyhow!("Agent not found"))?;
666
667 if !task.needed_tags.is_empty() {
669 for needed in &task.needed_tags {
670 if !agent.tags.contains(needed) {
671 return Err(anyhow!("Agent missing required tag: {}", needed));
672 }
673 }
674 }
675
676 if !task.wanted_tags.is_empty() {
678 let has_any = task
679 .wanted_tags
680 .iter()
681 .any(|wanted| agent.tags.contains(wanted));
682 if !has_any {
683 return Err(anyhow!("Agent has none of the wanted tags"));
684 }
685 }
686
687 new_owner = Some(agent_id.to_string());
689 new_claimed_at = Some(now);
690
691 tx.execute(
693 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
694 params![now, agent_id],
695 )?;
696 }
697
698 if !new_is_timed && !new_is_terminal && task.worker_id.is_some() {
700 if is_owned_by_other && !force {
702 return Err(anyhow!("Task is not owned by this agent"));
703 }
704
705 new_owner = None;
707 new_claimed_at = None;
708 }
709
710 if new_is_terminal {
712 if let Some(ref current_owner) = task.worker_id
714 && current_owner != agent_id && !force {
715 return Err(anyhow!("Task is not owned by this agent"));
716 }
717
718 let incomplete_children: i32 = tx.query_row(
720 "SELECT COUNT(*) FROM dependencies d
721 INNER JOIN tasks child ON d.to_task_id = child.id
722 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
723 AND child.status IN (SELECT value FROM json_each(?2))",
724 params![
725 task_id,
726 serde_json::to_string(&states_config.blocking_states)?
727 ],
728 |row| row.get(0),
729 )?;
730
731 if incomplete_children > 0 {
732 return Err(anyhow!(
733 "Cannot complete task: {} child task(s) are not complete",
734 incomplete_children
735 ));
736 }
737
738 new_owner = None;
740 new_claimed_at = None;
741
742 tx.execute(
744 "DELETE FROM file_locks WHERE task_id = ?1",
745 params![task_id],
746 )?;
747 }
748
749 let started_at =
751 if task.started_at.is_none() && new_is_timed {
752 Some(now)
753 } else {
754 task.started_at
755 };
756
757 let completed_at = if new_is_terminal {
758 Some(now)
759 } else {
760 task.completed_at
761 };
762
763 let status_changed = task.status != new_status;
765 if status_changed {
766 record_state_transition(
767 &tx,
768 task_id,
769 &new_status,
770 new_owner.as_deref(),
771 reason.as_deref(),
772 states_config,
773 )?;
774 }
775
776 tx.execute(
777 "UPDATE tasks SET
778 title = ?1, description = ?2, status = ?3, priority = ?4,
779 points = ?5, started_at = ?6, completed_at = ?7, updated_at = ?8,
780 tags = ?9, worker_id = ?10, claimed_at = ?11,
781 needed_tags = ?12, wanted_tags = ?13, time_estimate_ms = ?14
782 WHERE id = ?15",
783 params![
784 new_title,
785 new_description,
786 new_status,
787 new_priority.to_string(),
788 new_points,
789 started_at,
790 completed_at,
791 now,
792 serde_json::to_string(&new_tags)?,
793 new_owner,
794 new_claimed_at,
795 serde_json::to_string(&new_needed_tags)?,
796 serde_json::to_string(&new_wanted_tags)?,
797 new_time_estimate_ms,
798 task_id,
799 ],
800 )?;
801
802 if new_tags != task.tags {
804 sync_task_tags(&tx, task_id, &new_tags)?;
805 }
806 if new_needed_tags != task.needed_tags {
807 sync_needed_tags(&tx, task_id, &new_needed_tags)?;
808 }
809 if new_wanted_tags != task.wanted_tags {
810 sync_wanted_tags(&tx, task_id, &new_wanted_tags)?;
811 }
812
813 let (unblocked, auto_advanced) = if status_changed {
815 let was_blocking = states_config.is_blocking_state(&task.status);
816 let is_blocking = states_config.is_blocking_state(&new_status);
817
818 if was_blocking && !is_blocking {
819 super::deps::propagate_unblock_effects(
820 &tx,
821 task_id,
822 Some(agent_id),
823 states_config,
824 deps_config,
825 auto_advance,
826 )?
827 } else {
828 (vec![], vec![])
829 }
830 } else {
831 (vec![], vec![])
832 };
833
834 tx.commit()?;
835
836 Ok((Task {
837 id: task_id.to_string(),
838 title: new_title,
839 description: new_description,
840 status: new_status,
841 priority: new_priority,
842 points: new_points,
843 tags: new_tags,
844 needed_tags: new_needed_tags,
845 wanted_tags: new_wanted_tags,
846 time_estimate_ms: new_time_estimate_ms,
847 started_at,
848 completed_at,
849 updated_at: now,
850 worker_id: new_owner,
851 claimed_at: new_claimed_at,
852 ..task
853 }, unblocked, auto_advanced))
854 })
855 }
856
857 pub fn delete_task(
865 &self,
866 task_id: &str,
867 worker_id: &str,
868 cascade: bool,
869 reason: Option<String>,
870 obliterate: bool,
871 force: bool,
872 ) -> Result<()> {
873 let now = now_ms();
874
875 self.with_conn_mut(|conn| {
876 let tx = conn.transaction()?;
877
878 let task = get_task_internal(&tx, task_id)?
880 .ok_or_else(|| anyhow!("Task not found"))?;
881
882 if let Some(ref owner) = task.worker_id
884 && owner != worker_id && !force {
885 return Err(anyhow!(
886 "Task is claimed by worker '{}'. Use force=true to override.",
887 owner
888 ));
889 }
890
891 if obliterate {
892 if cascade {
894 tx.execute(
897 "WITH RECURSIVE descendants AS (
898 SELECT ?1 AS id
899 UNION ALL
900 SELECT dep.to_task_id FROM dependencies dep
901 INNER JOIN descendants d ON dep.from_task_id = d.id
902 WHERE dep.dep_type = 'contains'
903 )
904 DELETE FROM tasks WHERE id IN (SELECT id FROM descendants)",
905 params![task_id],
906 )?;
907 } else {
908 let child_count: i32 = tx.query_row(
910 "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
911 params![task_id],
912 |row| row.get(0),
913 )?;
914
915 if child_count > 0 {
916 return Err(anyhow!("Task has children; use cascade=true to delete"));
917 }
918
919 tx.execute("DELETE FROM tasks WHERE id = ?1", params![task_id])?;
920 }
921 } else {
922 if cascade {
924 tx.execute(
926 "WITH RECURSIVE descendants AS (
927 SELECT ?1 AS id
928 UNION ALL
929 SELECT dep.to_task_id FROM dependencies dep
930 INNER JOIN descendants d ON dep.from_task_id = d.id
931 WHERE dep.dep_type = 'contains'
932 )
933 UPDATE tasks SET deleted_at = ?2, deleted_by = ?3, deleted_reason = ?4, updated_at = ?2
934 WHERE id IN (SELECT id FROM descendants) AND deleted_at IS NULL",
935 params![task_id, now, worker_id, reason],
936 )?;
937 } else {
938 let child_count: i32 = tx.query_row(
940 "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
941 params![task_id],
942 |row| row.get(0),
943 )?;
944
945 if child_count > 0 {
946 return Err(anyhow!("Task has children; use cascade=true to delete"));
947 }
948
949 tx.execute(
950 "UPDATE tasks SET deleted_at = ?1, deleted_by = ?2, deleted_reason = ?3, updated_at = ?1 WHERE id = ?4",
951 params![now, worker_id, reason, task_id],
952 )?;
953 }
954 }
955
956 tx.commit()?;
957 Ok(())
958 })
959 }
960
961 pub fn list_tasks(
964 &self,
965 status: Option<&str>,
966 owner: Option<&str>,
967 parent_id: Option<Option<&str>>,
968 limit: Option<i32>,
969 sort_by: Option<&str>,
970 sort_order: Option<&str>,
971 ) -> Result<Vec<Task>> {
972 self.with_conn(|conn| {
973 let mut sql = String::from(
974 "SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL",
975 );
976 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
977
978 if let Some(s) = status {
979 sql.push_str(" AND t.status = ?");
980 params_vec.push(Box::new(s.to_string()));
981 }
982
983 if let Some(o) = owner {
984 sql.push_str(" AND t.worker_id = ?");
985 params_vec.push(Box::new(o.to_string()));
986 }
987
988 if let Some(p) = parent_id {
990 match p {
991 Some(pid) => {
992 sql.push_str(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ? AND dep_type = 'contains')");
993 params_vec.push(Box::new(pid.to_string()));
994 }
995 None => {
996 sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
998 }
999 }
1000 }
1001
1002 let order_clause = build_order_clause(sort_by, sort_order);
1004 sql.push_str(&format!(" ORDER BY {}", order_clause));
1005
1006 if let Some(l) = limit {
1007 sql.push_str(&format!(" LIMIT {}", l));
1008 }
1009
1010 let params_refs: Vec<&dyn rusqlite::ToSql> =
1011 params_vec.iter().map(|b| b.as_ref()).collect();
1012
1013 let mut stmt = conn.prepare(&sql)?;
1014 let tasks = stmt
1015 .query_map(params_refs.as_slice(), parse_task_row)?
1016 .filter_map(|r| r.ok())
1017 .collect();
1018
1019 Ok(tasks)
1020 })
1021 }
1022
1023 pub fn set_thought(
1025 &self,
1026 agent_id: &str,
1027 thought: Option<String>,
1028 task_ids: Option<Vec<String>>,
1029 ) -> Result<i32> {
1030 let now = now_ms();
1031
1032 self.with_conn(|conn| {
1033 let updated = if let Some(ids) = task_ids {
1034 let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
1035 let sql = format!(
1036 "UPDATE tasks SET current_thought = ?, updated_at = ?
1037 WHERE worker_id = ? AND id IN ({})",
1038 placeholders.join(", ")
1039 );
1040
1041 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1042 params_vec.push(Box::new(thought.clone()));
1043 params_vec.push(Box::new(now));
1044 params_vec.push(Box::new(agent_id.to_string()));
1045 for id in &ids {
1046 params_vec.push(Box::new(id.clone()));
1047 }
1048
1049 let params_refs: Vec<&dyn rusqlite::ToSql> =
1050 params_vec.iter().map(|b| b.as_ref()).collect();
1051 conn.execute(&sql, params_refs.as_slice())?
1052 } else {
1053 conn.execute(
1054 "UPDATE tasks SET current_thought = ?, updated_at = ? WHERE worker_id = ?",
1055 params![thought, now, agent_id],
1056 )?
1057 };
1058
1059 Ok(updated as i32)
1060 })
1061 }
1062
1063 pub fn log_time(&self, task_id: &str, duration_ms: i64) -> Result<i64> {
1065 let now = now_ms();
1066
1067 self.with_conn(|conn| {
1068 conn.execute(
1069 "UPDATE tasks SET time_actual_ms = COALESCE(time_actual_ms, 0) + ?1, updated_at = ?2
1070 WHERE id = ?3",
1071 params![duration_ms, now, task_id],
1072 )?;
1073
1074 let total: i64 = conn.query_row(
1075 "SELECT COALESCE(time_actual_ms, 0) FROM tasks WHERE id = ?1",
1076 params![task_id],
1077 |row| row.get(0),
1078 )?;
1079
1080 Ok(total)
1081 })
1082 }
1083
1084 pub fn log_metrics(
1087 &self,
1088 task_id: &str,
1089 cost_usd: Option<f64>,
1090 values: &[i64],
1091 ) -> Result<Task> {
1092 let now = now_ms();
1093
1094 self.with_conn(|conn| {
1095 let task =
1096 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1097
1098 let mut new_metrics = task.metrics;
1100 for (i, &val) in values.iter().take(8).enumerate() {
1101 new_metrics[i] += val;
1102 }
1103
1104 let new_cost_usd = task.cost_usd + cost_usd.unwrap_or(0.0);
1105
1106 conn.execute(
1107 "UPDATE tasks SET
1108 metric_0 = ?1, metric_1 = ?2, metric_2 = ?3, metric_3 = ?4,
1109 metric_4 = ?5, metric_5 = ?6, metric_6 = ?7, metric_7 = ?8,
1110 cost_usd = ?9, updated_at = ?10
1111 WHERE id = ?11",
1112 params![
1113 new_metrics[0],
1114 new_metrics[1],
1115 new_metrics[2],
1116 new_metrics[3],
1117 new_metrics[4],
1118 new_metrics[5],
1119 new_metrics[6],
1120 new_metrics[7],
1121 new_cost_usd,
1122 now,
1123 task_id,
1124 ],
1125 )?;
1126
1127 Ok(Task {
1128 cost_usd: new_cost_usd,
1129 metrics: new_metrics,
1130 updated_at: now,
1131 ..task
1132 })
1133 })
1134 }
1135
1136 pub fn claim_task(
1139 &self,
1140 task_id: &str,
1141 agent_id: &str,
1142 states_config: &StatesConfig,
1143 ) -> Result<Task> {
1144 let now = now_ms();
1145
1146 let claim_status = states_config
1148 .definitions
1149 .iter()
1150 .find(|(_, def)| def.timed)
1151 .map(|(name, _)| name.as_str())
1152 .unwrap_or("in_progress");
1153
1154 self.with_conn(|conn| {
1155 let task =
1157 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1158
1159 if task.worker_id.is_some() {
1161 return Err(anyhow!("Task is already claimed"));
1162 }
1163
1164 if !states_config.is_valid_transition(&task.status, claim_status) {
1166 let exits = states_config.get_exits(&task.status);
1167 return Err(anyhow!(
1168 "Cannot claim task in state '{}'. Allowed transitions: {:?}",
1169 task.status,
1170 exits
1171 ));
1172 }
1173
1174 let agent =
1176 get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1177
1178 if !task.needed_tags.is_empty() {
1180 for needed in &task.needed_tags {
1181 if !agent.tags.contains(needed) {
1182 return Err(anyhow!("Agent missing required tag: {}", needed));
1183 }
1184 }
1185 }
1186
1187 if !task.wanted_tags.is_empty() {
1189 let has_any = task
1190 .wanted_tags
1191 .iter()
1192 .any(|wanted| agent.tags.contains(wanted));
1193 if !has_any {
1194 return Err(anyhow!("Agent has none of the wanted tags"));
1195 }
1196 }
1197
1198 conn.execute(
1199 "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = ?4, updated_at = ?5
1200 WHERE id = ?6",
1201 params![agent_id, now, claim_status, now, now, task_id,],
1202 )?;
1203
1204 record_state_transition(
1206 conn,
1207 task_id,
1208 claim_status,
1209 Some(agent_id),
1210 None,
1211 states_config,
1212 )?;
1213
1214 conn.execute(
1216 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1217 params![now, agent_id],
1218 )?;
1219
1220 Ok(Task {
1221 worker_id: Some(agent_id.to_string()),
1222 claimed_at: Some(now),
1223 status: claim_status.to_string(),
1224 started_at: Some(now),
1225 updated_at: now,
1226 ..task
1227 })
1228 })
1229 }
1230
1231 pub fn release_task(
1233 &self,
1234 task_id: &str,
1235 agent_id: &str,
1236 states_config: &StatesConfig,
1237 ) -> Result<()> {
1238 let now = now_ms();
1239 let release_status = &states_config.initial;
1240
1241 self.with_conn(|conn| {
1242 let task =
1243 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1244
1245 if task.worker_id.as_deref() != Some(agent_id) {
1246 return Err(anyhow!("Task is not owned by this agent"));
1247 }
1248
1249 record_state_transition(
1251 conn,
1252 task_id,
1253 release_status,
1254 Some(agent_id),
1255 None,
1256 states_config,
1257 )?;
1258
1259 conn.execute(
1260 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1261 WHERE id = ?3",
1262 params![release_status, now, task_id],
1263 )?;
1264
1265 Ok(())
1266 })
1267 }
1268
1269 pub fn force_release(&self, task_id: &str, states_config: &StatesConfig) -> Result<()> {
1271 let now = now_ms();
1272 let release_status = &states_config.initial;
1273
1274 self.with_conn(|conn| {
1275 let task =
1276 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1277
1278 record_state_transition(
1280 conn,
1281 task_id,
1282 release_status,
1283 task.worker_id.as_deref(),
1284 None,
1285 states_config,
1286 )?;
1287
1288 conn.execute(
1289 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1290 WHERE id = ?3",
1291 params![release_status, now, task_id],
1292 )?;
1293
1294 Ok(())
1295 })
1296 }
1297
1298 pub fn force_claim_task(
1300 &self,
1301 task_id: &str,
1302 agent_id: &str,
1303 states_config: &StatesConfig,
1304 ) -> Result<Task> {
1305 let now = now_ms();
1306
1307 let claim_status = states_config
1309 .definitions
1310 .iter()
1311 .find(|(_, def)| def.timed)
1312 .map(|(name, _)| name.as_str())
1313 .unwrap_or("in_progress");
1314
1315 self.with_conn(|conn| {
1316 let task =
1318 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1319
1320 let agent =
1322 get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1323
1324 if !task.needed_tags.is_empty() {
1326 for needed in &task.needed_tags {
1327 if !agent.tags.contains(needed) {
1328 return Err(anyhow!("Agent missing required tag: {}", needed));
1329 }
1330 }
1331 }
1332
1333 if !task.wanted_tags.is_empty() {
1335 let has_any = task
1336 .wanted_tags
1337 .iter()
1338 .any(|wanted| agent.tags.contains(wanted));
1339 if !has_any {
1340 return Err(anyhow!("Agent has none of the wanted tags"));
1341 }
1342 }
1343
1344 conn.execute(
1345 "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = COALESCE(started_at, ?4), updated_at = ?5
1346 WHERE id = ?6",
1347 params![agent_id, now, claim_status, now, now, task_id,],
1348 )?;
1349
1350 record_state_transition(
1352 conn,
1353 task_id,
1354 claim_status,
1355 Some(agent_id),
1356 None,
1357 states_config,
1358 )?;
1359
1360 conn.execute(
1362 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1363 params![now, agent_id],
1364 )?;
1365
1366 Ok(Task {
1367 worker_id: Some(agent_id.to_string()),
1368 claimed_at: Some(now),
1369 status: claim_status.to_string(),
1370 started_at: task.started_at.or(Some(now)),
1371 updated_at: now,
1372 ..task
1373 })
1374 })
1375 }
1376
1377 pub fn release_task_with_state(
1379 &self,
1380 task_id: &str,
1381 agent_id: &str,
1382 state: &str,
1383 states_config: &StatesConfig,
1384 ) -> Result<()> {
1385 let now = now_ms();
1386
1387 self.with_conn(|conn| {
1388 let task =
1389 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1390
1391 if task.worker_id.as_deref() != Some(agent_id) {
1392 return Err(anyhow!("Task is not owned by this agent"));
1393 }
1394
1395 if !states_config.is_valid_state(state) {
1397 return Err(anyhow!(
1398 "Invalid state '{}'. Valid states: {:?}",
1399 state,
1400 states_config.state_names()
1401 ));
1402 }
1403
1404 if !states_config.is_valid_transition(&task.status, state) {
1406 let exits = states_config.get_exits(&task.status);
1407 return Err(anyhow!(
1408 "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
1409 task.status,
1410 state,
1411 exits
1412 ));
1413 }
1414
1415 let completed_at = if states_config.is_terminal_state(state) {
1417 Some(now)
1418 } else {
1419 None
1420 };
1421
1422 record_state_transition(conn, task_id, state, Some(agent_id), None, states_config)?;
1424
1425 conn.execute(
1426 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, completed_at = COALESCE(?2, completed_at), updated_at = ?3
1427 WHERE id = ?4",
1428 params![state, completed_at, now, task_id],
1429 )?;
1430
1431 Ok(())
1432 })
1433 }
1434
1435 pub fn force_release_stale(
1437 &self,
1438 timeout_seconds: i64,
1439 states_config: &StatesConfig,
1440 ) -> Result<i32> {
1441 let now = now_ms();
1442 let cutoff = now - (timeout_seconds * 1000);
1443 let release_status = &states_config.initial;
1444
1445 self.with_conn(|conn| {
1446 let updated = conn.execute(
1447 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1448 WHERE claimed_at < ?3 AND worker_id IS NOT NULL",
1449 params![release_status, now, cutoff],
1450 )?;
1451
1452 Ok(updated as i32)
1453 })
1454 }
1455
1456 pub fn complete_task(
1460 &self,
1461 task_id: &str,
1462 agent_id: &str,
1463 states_config: &StatesConfig,
1464 ) -> Result<Task> {
1465 let now = now_ms();
1466
1467 let complete_status = if states_config.definitions.contains_key("completed") {
1469 "completed"
1470 } else {
1471 states_config
1473 .definitions
1474 .iter()
1475 .find(|(_, def)| def.exits.is_empty())
1476 .map(|(name, _)| name.as_str())
1477 .unwrap_or("completed")
1478 };
1479
1480 self.with_conn_mut(|conn| {
1481 let tx = conn.transaction()?;
1482
1483 let mut stmt = tx.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1485 let task = stmt
1486 .query_row(params![task_id], parse_task_row)
1487 .map_err(|_| anyhow!("Task not found"))?;
1488 drop(stmt);
1489
1490 if task.worker_id.as_deref() != Some(agent_id) {
1492 return Err(anyhow!("Task is not owned by this agent"));
1493 }
1494
1495 let incomplete_children: i32 = tx.query_row(
1497 "SELECT COUNT(*) FROM dependencies d
1498 INNER JOIN tasks child ON d.to_task_id = child.id
1499 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
1500 AND child.status IN (SELECT value FROM json_each(?2))",
1501 params![
1502 task_id,
1503 serde_json::to_string(&states_config.blocking_states)?
1504 ],
1505 |row| row.get(0),
1506 )?;
1507
1508 if incomplete_children > 0 {
1509 return Err(anyhow!(
1510 "Cannot complete task: {} child task(s) are not complete",
1511 incomplete_children
1512 ));
1513 }
1514
1515 if !states_config.is_valid_transition(&task.status, complete_status) {
1517 let exits = states_config.get_exits(&task.status);
1518 return Err(anyhow!(
1519 "Cannot complete task in state '{}'. Allowed transitions: {:?}",
1520 task.status,
1521 exits
1522 ));
1523 }
1524
1525 record_state_transition(
1527 &tx,
1528 task_id,
1529 complete_status,
1530 Some(agent_id),
1531 None,
1532 states_config,
1533 )?;
1534
1535 tx.execute(
1537 "UPDATE tasks SET status = ?1, completed_at = ?2, updated_at = ?3,
1538 worker_id = NULL, claimed_at = NULL
1539 WHERE id = ?4",
1540 params![complete_status, now, now, task_id],
1541 )?;
1542
1543 tx.execute(
1545 "DELETE FROM file_locks WHERE task_id = ?1",
1546 params![task_id],
1547 )?;
1548
1549 tx.execute(
1551 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1552 params![now, agent_id],
1553 )?;
1554
1555 tx.commit()?;
1556
1557 Ok(Task {
1558 status: complete_status.to_string(),
1559 completed_at: Some(now),
1560 updated_at: now,
1561 worker_id: None,
1562 claimed_at: None,
1563 ..task
1564 })
1565 })
1566 }
1567
1568 pub fn get_all_tasks(&self) -> Result<Vec<Task>> {
1570 self.with_conn(|conn| {
1571 let mut stmt =
1572 conn.prepare("SELECT * FROM tasks WHERE deleted_at IS NULL ORDER BY created_at")?;
1573 let tasks = stmt
1574 .query_map([], parse_task_row)?
1575 .filter_map(|r| r.ok())
1576 .collect();
1577 Ok(tasks)
1578 })
1579 }
1580
1581 #[allow(dead_code)]
1583 pub fn get_tasks_by_status(&self, status: &str) -> Result<Vec<Task>> {
1584 self.with_conn(|conn| {
1585 let mut stmt =
1586 conn.prepare("SELECT * FROM tasks WHERE status = ?1 ORDER BY created_at")?;
1587 let tasks = stmt
1588 .query_map(params![status], parse_task_row)?
1589 .filter_map(|r| r.ok())
1590 .collect();
1591 Ok(tasks)
1592 })
1593 }
1594
1595 pub fn get_claimed_tasks(&self, agent_id: Option<&str>) -> Result<Vec<Task>> {
1597 self.with_conn(|conn| {
1598 let tasks = if let Some(aid) = agent_id {
1599 let mut stmt = conn
1600 .prepare("SELECT * FROM tasks WHERE worker_id = ?1 AND deleted_at IS NULL ORDER BY claimed_at")?;
1601 stmt.query_map(params![aid], parse_task_row)?
1602 .filter_map(|r| r.ok())
1603 .collect()
1604 } else {
1605 let mut stmt = conn.prepare(
1606 "SELECT * FROM tasks WHERE worker_id IS NOT NULL AND deleted_at IS NULL ORDER BY claimed_at",
1607 )?;
1608 stmt.query_map([], parse_task_row)?
1609 .filter_map(|r| r.ok())
1610 .collect()
1611 };
1612
1613 Ok(tasks)
1614 })
1615 }
1616}
1617
1618#[allow(clippy::too_many_arguments)]
1623fn create_tree_recursive(
1624 conn: &Connection,
1625 input: &TaskTreeInput,
1626 parent_id: Option<&str>,
1627 prev_sibling_id: Option<&str>,
1628 child_type: Option<&str>,
1629 sibling_type: Option<&str>,
1630 all_ids: &mut Vec<String>,
1631 states_config: &StatesConfig,
1632) -> Result<String> {
1633 let task_id = if let Some(ref ref_id) = input.ref_id {
1635 let exists: bool = conn.query_row(
1637 "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
1638 params![ref_id],
1639 |row| row.get(0),
1640 )?;
1641 if !exists {
1642 return Err(anyhow::anyhow!("Referenced task '{}' not found", ref_id));
1643 }
1644 ref_id.clone()
1645 } else {
1646 let generated_id = Uuid::now_v7().to_string();
1648 let task_id = input.id.clone().unwrap_or(generated_id);
1649 let now = now_ms();
1650 let priority = clamp_priority(input.priority.unwrap_or(PRIORITY_DEFAULT));
1651 let initial_status = &states_config.initial;
1652
1653 let needed_tags = input.needed_tags.clone().unwrap_or_default();
1654 let wanted_tags = input.wanted_tags.clone().unwrap_or_default();
1655 let tags = input.tags.clone().unwrap_or_default();
1656 let needed_tags_json = serde_json::to_string(&needed_tags)?;
1657 let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
1658 let tags_json = serde_json::to_string(&tags)?;
1659
1660 conn.execute(
1661 "INSERT INTO tasks (
1662 id, title, description, status, priority,
1663 needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
1664 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
1665 params![
1666 &task_id,
1667 &input.title,
1668 &input.description,
1669 initial_status,
1670 priority.to_string(),
1671 needed_tags_json,
1672 wanted_tags_json,
1673 tags_json,
1674 input.points,
1675 input.time_estimate_ms,
1676 now,
1677 now,
1678 ],
1679 )?;
1680
1681 record_state_transition(conn, &task_id, initial_status, None, None, states_config)?;
1683
1684 sync_task_tags(conn, &task_id, &tags)?;
1686 sync_needed_tags(conn, &task_id, &needed_tags)?;
1687 sync_wanted_tags(conn, &task_id, &wanted_tags)?;
1688
1689 task_id
1690 };
1691
1692 if let (Some(pid), Some(ct)) = (parent_id, child_type) {
1694 Database::add_dependency_internal(conn, pid, &task_id, ct)?;
1695 }
1696
1697 if let (Some(prev_id), Some(st)) = (prev_sibling_id, sibling_type) {
1699 Database::add_dependency_internal(conn, prev_id, &task_id, st)?;
1700 }
1701
1702 all_ids.push(task_id.clone());
1703
1704 let mut prev_child_id: Option<String> = None;
1706 for child in input.children.iter() {
1707 let child_id = create_tree_recursive(
1708 conn,
1709 child,
1710 Some(&task_id),
1711 prev_child_id.as_deref(),
1712 child_type,
1713 sibling_type,
1714 all_ids,
1715 states_config,
1716 )?;
1717 prev_child_id = Some(child_id);
1718 }
1719
1720 Ok(task_id)
1721}