1use super::state_transitions::record_state_transition;
4use super::{now_ms, Database};
5use crate::config::{AutoAdvanceConfig, DependenciesConfig, StatesConfig};
6use crate::types::{clamp_priority, parse_priority, Priority, Task, TaskTree, TaskTreeInput, Worker, PRIORITY_DEFAULT};
7use anyhow::{anyhow, Result};
8use rusqlite::{params, Connection, Row};
9use uuid::Uuid;
10
/// Builds the `ORDER BY` expression (column plus direction) used by task listings.
///
/// Only fixed, whitelisted fragments are ever returned, so the result can be spliced
/// directly into SQL text. An unknown or missing `sort_by` falls back to `t.created_at`;
/// an unknown or missing `sort_order` falls back to `DESC`. Matching is case-sensitive.
fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
    // The priority column is cast so that it sorts numerically.
    let column = if sort_by == Some("priority") {
        "CAST(t.priority AS INTEGER)"
    } else if sort_by == Some("updated_at") {
        "t.updated_at"
    } else {
        "t.created_at"
    };

    let direction = if sort_order == Some("asc") {
        "ASC"
    } else {
        "DESC"
    };

    [column, direction].join(" ")
}
32
33fn sync_task_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
40 conn.execute("DELETE FROM task_tags WHERE task_id = ?1", params![task_id])?;
41 for tag in tags {
42 conn.execute(
43 "INSERT INTO task_tags (task_id, tag) VALUES (?1, ?2)",
44 params![task_id, tag],
45 )?;
46 }
47 Ok(())
48}
49
50fn sync_needed_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
52 conn.execute("DELETE FROM task_needed_tags WHERE task_id = ?1", params![task_id])?;
53 for tag in tags {
54 conn.execute(
55 "INSERT INTO task_needed_tags (task_id, tag) VALUES (?1, ?2)",
56 params![task_id, tag],
57 )?;
58 }
59 Ok(())
60}
61
62fn sync_wanted_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
64 conn.execute("DELETE FROM task_wanted_tags WHERE task_id = ?1", params![task_id])?;
65 for tag in tags {
66 conn.execute(
67 "INSERT INTO task_wanted_tags (task_id, tag) VALUES (?1, ?2)",
68 params![task_id, tag],
69 )?;
70 }
71 Ok(())
72}
73
74pub fn parse_task_row(row: &Row) -> rusqlite::Result<Task> {
75 let id: String = row.get("id")?;
76 let title: String = row.get("title")?;
77 let description: Option<String> = row.get("description")?;
78 let status: String = row.get("status")?;
79 let priority: String = row.get("priority")?;
80 let worker_id: Option<String> = row.get("worker_id")?;
81 let claimed_at: Option<i64> = row.get("claimed_at")?;
82
83 let needed_tags_json: Option<String> = row.get("needed_tags")?;
84 let wanted_tags_json: Option<String> = row.get("wanted_tags")?;
85 let tags_json: Option<String> = row.get("tags")?;
86
87 let points: Option<i32> = row.get("points")?;
88 let time_estimate_ms: Option<i64> = row.get("time_estimate_ms")?;
89 let time_actual_ms: Option<i64> = row.get("time_actual_ms")?;
90 let started_at: Option<i64> = row.get("started_at")?;
91 let completed_at: Option<i64> = row.get("completed_at")?;
92
93 let current_thought: Option<String> = row.get("current_thought")?;
94
95 let cost_usd: f64 = row.get("cost_usd")?;
96 let metric_0: i64 = row.get("metric_0")?;
97 let metric_1: i64 = row.get("metric_1")?;
98 let metric_2: i64 = row.get("metric_2")?;
99 let metric_3: i64 = row.get("metric_3")?;
100 let metric_4: i64 = row.get("metric_4")?;
101 let metric_5: i64 = row.get("metric_5")?;
102 let metric_6: i64 = row.get("metric_6")?;
103 let metric_7: i64 = row.get("metric_7")?;
104
105 let created_at: i64 = row.get("created_at")?;
106 let updated_at: i64 = row.get("updated_at")?;
107
108 Ok(Task {
109 id,
110 title,
111 description,
112 status,
113 priority: parse_priority(&priority),
114 worker_id,
115 claimed_at,
116 needed_tags: needed_tags_json
117 .map(|s| serde_json::from_str(&s).unwrap_or_default())
118 .unwrap_or_default(),
119 wanted_tags: wanted_tags_json
120 .map(|s| serde_json::from_str(&s).unwrap_or_default())
121 .unwrap_or_default(),
122 tags: tags_json
123 .map(|s| serde_json::from_str(&s).unwrap_or_default())
124 .unwrap_or_default(),
125 points,
126 time_estimate_ms,
127 time_actual_ms,
128 started_at,
129 completed_at,
130 current_thought,
131 cost_usd,
132 metrics: [metric_0, metric_1, metric_2, metric_3, metric_4, metric_5, metric_6, metric_7],
133 created_at,
134 updated_at,
135 })
136}
137
138fn get_task_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
140 let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
141
142 let result = stmt.query_row(params![task_id], parse_task_row);
143
144 match result {
145 Ok(task) => Ok(Some(task)),
146 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
147 Err(e) => Err(e.into()),
148 }
149}
150
151fn get_worker_internal(conn: &Connection, worker_id: &str) -> Result<Option<Worker>> {
153 let mut stmt = conn.prepare(
154 "SELECT id, tags, max_claims, registered_at, last_heartbeat
155 FROM workers WHERE id = ?1",
156 )?;
157
158 let result = stmt.query_row(params![worker_id], |row| {
159 let id: String = row.get(0)?;
160 let tags_json: String = row.get(1)?;
161 let max_claims: i32 = row.get(2)?;
162 let registered_at: i64 = row.get(3)?;
163 let last_heartbeat: i64 = row.get(4)?;
164
165 Ok((id, tags_json, max_claims, registered_at, last_heartbeat))
166 });
167
168 match result {
169 Ok((id, tags_json, max_claims, registered_at, last_heartbeat)) => {
170 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
171 Ok(Some(Worker {
172 id,
173 tags,
174 max_claims,
175 registered_at,
176 last_heartbeat,
177 }))
178 }
179 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
180 Err(e) => Err(e.into()),
181 }
182}
183
184impl Database {
185 pub fn create_task(
189 &self,
190 id: Option<String>,
191 description: String,
192 parent_id: Option<String>,
193 priority: Option<Priority>,
194 points: Option<i32>,
195 time_estimate_ms: Option<i64>,
196 agent_tags_all: Option<Vec<String>>,
197 agent_tags_any: Option<Vec<String>>,
198 tags: Option<Vec<String>>,
199 states_config: &StatesConfig,
200 ) -> Result<Task> {
201 let task_id = id.unwrap_or_else(|| Uuid::now_v7().to_string());
202 let now = now_ms();
203 let priority = clamp_priority(priority.unwrap_or(PRIORITY_DEFAULT));
204 let initial_status = &states_config.initial;
205
206 let needed_tags = agent_tags_all.unwrap_or_default();
207 let wanted_tags = agent_tags_any.unwrap_or_default();
208 let tags = tags.unwrap_or_default();
209 let needed_tags_json = serde_json::to_string(&needed_tags)?;
210 let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
211 let tags_json = serde_json::to_string(&tags)?;
212
213 self.with_conn_mut(|conn| {
214 let tx = conn.transaction()?;
215
216 tx.execute(
217 "INSERT INTO tasks (
218 id, title, description, status, priority,
219 needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
220 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
221 params![
222 &task_id,
223 &description, &description, initial_status,
226 priority.to_string(),
227 needed_tags_json,
228 wanted_tags_json,
229 tags_json,
230 points,
231 time_estimate_ms,
232 now,
233 now,
234 ],
235 )?;
236
237 sync_task_tags(&tx, &task_id, &tags)?;
239 sync_needed_tags(&tx, &task_id, &needed_tags)?;
240 sync_wanted_tags(&tx, &task_id, &wanted_tags)?;
241
242 if let Some(ref pid) = parent_id {
244 Database::add_dependency_internal(&tx, pid, &task_id, "contains")?;
245 }
246
247 record_state_transition(&tx, &task_id, initial_status, None, None, states_config)?;
249
250 tx.commit()?;
251
252 Ok(Task {
253 id: task_id,
254 title: description.clone(),
255 description: Some(description),
256 status: initial_status.clone(),
257 priority,
258 worker_id: None,
259 claimed_at: None,
260 needed_tags,
261 wanted_tags,
262 tags,
263 points,
264 time_estimate_ms,
265 time_actual_ms: None,
266 started_at: None,
267 completed_at: None,
268 current_thought: None,
269 cost_usd: 0.0,
270 metrics: [0; 8],
271 created_at: now,
272 updated_at: now,
273 })
274 })
275 }
276
277 pub fn create_task_tree(
281 &self,
282 input: TaskTreeInput,
283 parent_id: Option<String>,
284 child_type: Option<String>,
285 sibling_type: Option<String>,
286 states_config: &StatesConfig,
287 ) -> Result<(String, Vec<String>)> {
288 let mut all_ids = Vec::new();
289 let child_type = child_type.or_else(|| Some("contains".to_string()));
291
292 self.with_conn_mut(|conn| {
293 let tx = conn.transaction()?;
294 let root_id = create_tree_recursive(
295 &tx,
296 &input,
297 parent_id.as_deref(),
298 None, child_type.as_deref(),
300 sibling_type.as_deref(),
301 &mut all_ids,
302 states_config,
303 )?;
304 tx.commit()?;
305 Ok((root_id, all_ids))
306 })
307 }
308
309 pub fn get_task(&self, task_id: &str) -> Result<Option<Task>> {
311 self.with_conn(|conn| {
312 let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
313
314 let result = stmt.query_row(params![task_id], parse_task_row);
315
316 match result {
317 Ok(task) => Ok(Some(task)),
318 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
319 Err(e) => Err(e.into()),
320 }
321 })
322 }
323
324 pub fn get_task_tree(&self, task_id: &str) -> Result<Option<TaskTree>> {
326 let task = self.get_task(task_id)?;
327 match task {
328 None => Ok(None),
329 Some(task) => {
330 let children = self.get_children_recursive(&task.id)?;
331 Ok(Some(TaskTree { task, children }))
332 }
333 }
334 }
335
336 fn get_children_recursive(&self, parent_id: &str) -> Result<Vec<TaskTree>> {
338 let children = self.get_children(parent_id)?;
339 let mut result = Vec::new();
340
341 for child in children {
342 let child_children = self.get_children_recursive(&child.id)?;
343 result.push(TaskTree {
344 task: child,
345 children: child_children,
346 });
347 }
348
349 Ok(result)
350 }
351
352 pub fn get_children(&self, parent_id: &str) -> Result<Vec<Task>> {
354 self.with_conn(|conn| {
355 let mut stmt = conn.prepare(
356 "SELECT t.* FROM tasks t
357 INNER JOIN dependencies d ON t.id = d.to_task_id
358 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
359 ORDER BY t.created_at",
360 )?;
361
362 let tasks = stmt
363 .query_map(params![parent_id], parse_task_row)?
364 .filter_map(|r| r.ok())
365 .collect();
366
367 Ok(tasks)
368 })
369 }
370
371 pub fn update_task(
373 &self,
374 task_id: &str,
375 title: Option<String>,
376 description: Option<Option<String>>,
377 status: Option<String>,
378 priority: Option<Priority>,
379 points: Option<Option<i32>>,
380 tags: Option<Vec<String>>,
381 states_config: &StatesConfig,
382 ) -> Result<Task> {
383 let now = now_ms();
384
385 self.with_conn(|conn| {
386 let task =
387 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
388
389 let new_title = title.unwrap_or(task.title.clone());
390 let new_description = description.unwrap_or(task.description.clone());
391 let new_status = status.unwrap_or(task.status.clone());
392 let new_priority = priority.unwrap_or(task.priority);
393 let new_points = points.unwrap_or(task.points);
394 let new_tags = tags.unwrap_or(task.tags.clone());
395
396 if !states_config.is_valid_state(&new_status) {
398 return Err(anyhow!(
399 "Invalid state '{}'. Valid states: {:?}",
400 new_status,
401 states_config.state_names()
402 ));
403 }
404
405 if task.status != new_status {
407 if !states_config.is_valid_transition(&task.status, &new_status) {
408 let exits = states_config.get_exits(&task.status);
409 return Err(anyhow!(
410 "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
411 task.status,
412 new_status,
413 exits
414 ));
415 }
416 }
417
418 let started_at =
421 if task.started_at.is_none() && states_config.is_timed_state(&new_status) {
422 Some(now)
423 } else {
424 task.started_at
425 };
426
427 let completed_at = if states_config.is_terminal_state(&new_status) {
429 Some(now)
430 } else {
431 task.completed_at
432 };
433
434 if task.status != new_status {
436 record_state_transition(
437 conn,
438 task_id,
439 &new_status,
440 task.worker_id.as_deref(),
441 None,
442 states_config,
443 )?;
444 }
445
446 conn.execute(
447 "UPDATE tasks SET
448 title = ?1, description = ?2, status = ?3, priority = ?4,
449 points = ?5, started_at = ?6, completed_at = ?7, updated_at = ?8,
450 tags = ?9
451 WHERE id = ?10",
452 params![
453 new_title,
454 new_description,
455 new_status,
456 new_priority.to_string(),
457 new_points,
458 started_at,
459 completed_at,
460 now,
461 serde_json::to_string(&new_tags)?,
462 task_id,
463 ],
464 )?;
465
466 Ok(Task {
467 id: task_id.to_string(),
468 title: new_title,
469 description: new_description,
470 status: new_status,
471 priority: new_priority,
472 points: new_points,
473 tags: new_tags,
474 started_at,
475 completed_at,
476 updated_at: now,
477 ..task
478 })
479 })
480 }
481
482
/// Applies a combined field, status and ownership update to one task inside a single
/// transaction.
///
/// `agent_id` identifies the caller; `assignee`, when given, is the worker the task is
/// handed to. Field arguments left as `None` keep the stored value (`Some(None)` clears
/// `description` / `points`). Passing `assignee` without `status` moves the task to the
/// hard-coded `"assigned"` state.
///
/// Ownership is decided by the steps below, evaluated in this order; a later step
/// overwrites the result of an earlier one:
/// 1. `assignee` given: the assignee becomes owner after its tags are checked against the
///    task's stored needed / wanted tags.
/// 2. Target state is timed and the caller is not the current owner: the caller becomes
///    owner (start blockers are checked unless `force`) and the caller's heartbeat is
///    refreshed.
/// 3. Target state is neither timed nor terminal and the task had an owner: ownership is
///    cleared.
/// 4. Target state is terminal: ownership is cleared and the task's `file_locks` rows are
///    deleted; this fails if any `contains` child is in one of
///    `states_config.blocking_states`.
///
/// Returns the updated task plus the two lists produced by `propagate_unblock_effects`
/// (both empty unless the status changed from a blocking to a non-blocking state).
///
/// # Errors
/// Returns an error (and commits nothing) when the task, assignee or agent is not found,
/// when the task is owned by another agent and `force` is false, when the target state or
/// transition is invalid, when tag requirements are not met, when start blockers are
/// unsatisfied, when children are still in a blocking state, or on any database or
/// serialization failure.
#[allow(clippy::too_many_arguments)]
pub fn update_task_unified(
    &self,
    task_id: &str,
    agent_id: &str,
    assignee: Option<&str>,
    title: Option<String>,
    description: Option<Option<String>>,
    status: Option<String>,
    priority: Option<Priority>,
    points: Option<Option<i32>>,
    tags: Option<Vec<String>>,
    needed_tags: Option<Vec<String>>,
    wanted_tags: Option<Vec<String>>,
    time_estimate_ms: Option<i64>,
    reason: Option<String>,
    force: bool,
    states_config: &StatesConfig,
    deps_config: &DependenciesConfig,
    auto_advance: &AutoAdvanceConfig,
) -> Result<(Task, Vec<String>, Vec<String>)> {
    let now = now_ms();

    self.with_conn_mut(|conn| {
        // Every early `return Err` below drops `tx` without committing.
        let tx = conn.transaction()?;

        let task =
            get_task_internal(&tx, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;

        // Gate: a task owned by someone else may only be touched with `force`.
        if let Some(ref current_owner) = task.worker_id {
            if current_owner != agent_id && !force {
                return Err(anyhow!(
                    "Task is claimed by agent '{}'. Only the owner can update claimed tasks (use force=true to override)",
                    current_owner
                ));
            }
        }

        // Resolve each field: explicit argument wins, otherwise keep the stored value.
        let new_title = title.unwrap_or(task.title.clone());
        let new_description = description.unwrap_or(task.description.clone());
        // An assignment without an explicit status implies the "assigned" state.
        let new_status = if assignee.is_some() && status.is_none() {
            "assigned".to_string()
        } else {
            status.unwrap_or(task.status.clone())
        };
        let new_priority = priority.unwrap_or(task.priority);
        let new_points = points.unwrap_or(task.points);
        let new_tags = tags.unwrap_or(task.tags.clone());
        let new_needed_tags = needed_tags.unwrap_or(task.needed_tags.clone());
        let new_wanted_tags = wanted_tags.unwrap_or(task.wanted_tags.clone());
        // `None` keeps the stored estimate; this argument cannot clear it.
        let new_time_estimate_ms = time_estimate_ms.or(task.time_estimate_ms);

        if !states_config.is_valid_state(&new_status) {
            return Err(anyhow!(
                "Invalid state '{}'. Valid states: {:?}",
                new_status,
                states_config.state_names()
            ));
        }

        // Transition rules only apply when the status actually changes.
        if task.status != new_status {
            if !states_config.is_valid_transition(&task.status, &new_status) {
                let exits = states_config.get_exits(&task.status);
                return Err(anyhow!(
                    "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
                    task.status,
                    new_status,
                    exits
                ));
            }
        }

        let new_is_timed = states_config.is_timed_state(&new_status);
        let new_is_terminal = states_config.is_terminal_state(&new_status);
        let current_owner = task.worker_id.as_deref();
        let is_owned_by_agent = current_owner == Some(agent_id);
        let is_owned_by_other = current_owner.is_some() && !is_owned_by_agent;

        // Start from the stored ownership; the steps below may overwrite it.
        let mut new_owner: Option<String> = task.worker_id.clone();
        let mut new_claimed_at: Option<i64> = task.claimed_at;

        // Step 1: explicit assignment to another worker.
        if let Some(target_agent) = assignee {
            if is_owned_by_other && !force {
                return Err(anyhow!(
                    "Task is already claimed by agent '{}'. Use force=true to reassign.",
                    // `is_owned_by_other` implies `current_owner` is `Some`.
                    current_owner.unwrap()
                ));
            }

            let target = get_worker_internal(&tx, target_agent)?
                .ok_or_else(|| anyhow!("Assignee agent '{}' not found", target_agent))?;

            // Tag checks use the task's stored tags, not the new values from this call.
            // The assignee must carry every needed tag.
            if !task.needed_tags.is_empty() {
                for needed in &task.needed_tags {
                    if !target.tags.contains(needed) {
                        return Err(anyhow!(
                            "Assignee '{}' missing required tag: {}",
                            target_agent,
                            needed
                        ));
                    }
                }
            }

            // The assignee must carry at least one wanted tag, if any are listed.
            if !task.wanted_tags.is_empty() {
                let has_any = task
                    .wanted_tags
                    .iter()
                    .any(|wanted| target.tags.contains(wanted));
                if !has_any {
                    return Err(anyhow!(
                        "Assignee '{}' has none of the wanted tags: {:?}",
                        target_agent,
                        task.wanted_tags
                    ));
                }
            }

            new_owner = Some(target_agent.to_string());
            new_claimed_at = Some(now);
        }

        // Step 2: moving into a timed state claims the task for the caller,
        // unless the caller already owns it.
        if new_is_timed && !is_owned_by_agent {
            if is_owned_by_other && !force {
                return Err(anyhow!(
                    "Task is already claimed by agent '{}'",
                    current_owner.unwrap()
                ));
            }

            // `force` skips the dependency check.
            if !force {
                let unsatisfied_blockers = super::deps::get_unsatisfied_start_blockers_in_tx(
                    &tx,
                    task_id,
                    states_config,
                    deps_config,
                )?;
                if !unsatisfied_blockers.is_empty() {
                    return Err(anyhow!(
                        "Task has unsatisfied dependencies: [{}]",
                        unsatisfied_blockers.join(", ")
                    ));
                }
            }

            let agent = get_worker_internal(&tx, agent_id)?
                .ok_or_else(|| anyhow!("Agent not found"))?;

            // Same tag rules as for an assignee, applied to the caller.
            if !task.needed_tags.is_empty() {
                for needed in &task.needed_tags {
                    if !agent.tags.contains(needed) {
                        return Err(anyhow!("Agent missing required tag: {}", needed));
                    }
                }
            }

            if !task.wanted_tags.is_empty() {
                let has_any = task
                    .wanted_tags
                    .iter()
                    .any(|wanted| agent.tags.contains(wanted));
                if !has_any {
                    return Err(anyhow!("Agent has none of the wanted tags"));
                }
            }

            // This overwrites any owner chosen in step 1.
            new_owner = Some(agent_id.to_string());
            new_claimed_at = Some(now);

            // Claiming counts as a sign of life for the caller.
            tx.execute(
                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
                params![now, agent_id],
            )?;
        }

        // Step 3: a previously owned task entering a state that is neither timed nor
        // terminal is released.
        if !new_is_timed && !new_is_terminal && task.worker_id.is_some() {
            if is_owned_by_other && !force {
                return Err(anyhow!("Task is not owned by this agent"));
            }

            new_owner = None;
            new_claimed_at = None;
        }

        // Step 4: completing the task.
        if new_is_terminal {
            if let Some(ref current_owner) = task.worker_id {
                if current_owner != agent_id && !force {
                    return Err(anyhow!("Task is not owned by this agent"));
                }
            }

            // Count direct `contains` children whose status is one of the blocking states.
            let incomplete_children: i32 = tx.query_row(
                "SELECT COUNT(*) FROM dependencies d
                 INNER JOIN tasks child ON d.to_task_id = child.id
                 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
                 AND child.status IN (SELECT value FROM json_each(?2))",
                params![
                    task_id,
                    serde_json::to_string(&states_config.blocking_states)?
                ],
                |row| row.get(0),
            )?;

            if incomplete_children > 0 {
                return Err(anyhow!(
                    "Cannot complete task: {} child task(s) are not complete",
                    incomplete_children
                ));
            }

            new_owner = None;
            new_claimed_at = None;

            // The task's file locks are removed together with its ownership.
            tx.execute(
                "DELETE FROM file_locks WHERE task_id = ?1",
                params![task_id],
            )?;
        }

        // `started_at` is only stamped the first time a timed state is entered.
        let started_at =
            if task.started_at.is_none() && new_is_timed {
                Some(now)
            } else {
                task.started_at
            };

        // `completed_at` is stamped whenever the target state is terminal.
        let completed_at = if new_is_terminal {
            Some(now)
        } else {
            task.completed_at
        };

        let status_changed = task.status != new_status;
        if status_changed {
            // The transition is recorded against the owner resolved above.
            record_state_transition(
                &tx,
                task_id,
                &new_status,
                new_owner.as_deref(),
                reason.as_deref(),
                states_config,
            )?;
        }

        tx.execute(
            "UPDATE tasks SET
                title = ?1, description = ?2, status = ?3, priority = ?4,
                points = ?5, started_at = ?6, completed_at = ?7, updated_at = ?8,
                tags = ?9, worker_id = ?10, claimed_at = ?11,
                needed_tags = ?12, wanted_tags = ?13, time_estimate_ms = ?14
             WHERE id = ?15",
            params![
                new_title,
                new_description,
                new_status,
                new_priority.to_string(),
                new_points,
                started_at,
                completed_at,
                now,
                serde_json::to_string(&new_tags)?,
                new_owner,
                new_claimed_at,
                serde_json::to_string(&new_needed_tags)?,
                serde_json::to_string(&new_wanted_tags)?,
                new_time_estimate_ms,
                task_id,
            ],
        )?;

        // Only rewrite the tag tables whose list actually changed.
        if new_tags != task.tags {
            sync_task_tags(&tx, task_id, &new_tags)?;
        }
        if new_needed_tags != task.needed_tags {
            sync_needed_tags(&tx, task_id, &new_needed_tags)?;
        }
        if new_wanted_tags != task.wanted_tags {
            sync_wanted_tags(&tx, task_id, &new_wanted_tags)?;
        }

        // Leaving a blocking state may unblock or auto-advance other tasks.
        let (unblocked, auto_advanced) = if status_changed {
            let was_blocking = states_config.is_blocking_state(&task.status);
            let is_blocking = states_config.is_blocking_state(&new_status);

            if was_blocking && !is_blocking {
                super::deps::propagate_unblock_effects(
                    &tx,
                    task_id,
                    Some(agent_id),
                    states_config,
                    deps_config,
                    auto_advance,
                )?
            } else {
                (vec![], vec![])
            }
        } else {
            (vec![], vec![])
        };

        tx.commit()?;

        // Fields not listed here are carried over from the row read at the start.
        Ok((Task {
            id: task_id.to_string(),
            title: new_title,
            description: new_description,
            status: new_status,
            priority: new_priority,
            points: new_points,
            tags: new_tags,
            needed_tags: new_needed_tags,
            wanted_tags: new_wanted_tags,
            time_estimate_ms: new_time_estimate_ms,
            started_at,
            completed_at,
            updated_at: now,
            worker_id: new_owner,
            claimed_at: new_claimed_at,
            ..task
        }, unblocked, auto_advanced))
    })
}
848
849 pub fn delete_task(
857 &self,
858 task_id: &str,
859 worker_id: &str,
860 cascade: bool,
861 reason: Option<String>,
862 obliterate: bool,
863 force: bool,
864 ) -> Result<()> {
865 let now = now_ms();
866
867 self.with_conn_mut(|conn| {
868 let tx = conn.transaction()?;
869
870 let task = get_task_internal(&tx, task_id)?
872 .ok_or_else(|| anyhow!("Task not found"))?;
873
874 if let Some(ref owner) = task.worker_id {
876 if owner != worker_id && !force {
877 return Err(anyhow!(
878 "Task is claimed by worker '{}'. Use force=true to override.",
879 owner
880 ));
881 }
882 }
883
884 if obliterate {
885 if cascade {
887 tx.execute(
890 "WITH RECURSIVE descendants AS (
891 SELECT ?1 AS id
892 UNION ALL
893 SELECT dep.to_task_id FROM dependencies dep
894 INNER JOIN descendants d ON dep.from_task_id = d.id
895 WHERE dep.dep_type = 'contains'
896 )
897 DELETE FROM tasks WHERE id IN (SELECT id FROM descendants)",
898 params![task_id],
899 )?;
900 } else {
901 let child_count: i32 = tx.query_row(
903 "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
904 params![task_id],
905 |row| row.get(0),
906 )?;
907
908 if child_count > 0 {
909 return Err(anyhow!("Task has children; use cascade=true to delete"));
910 }
911
912 tx.execute("DELETE FROM tasks WHERE id = ?1", params![task_id])?;
913 }
914 } else {
915 if cascade {
917 tx.execute(
919 "WITH RECURSIVE descendants AS (
920 SELECT ?1 AS id
921 UNION ALL
922 SELECT dep.to_task_id FROM dependencies dep
923 INNER JOIN descendants d ON dep.from_task_id = d.id
924 WHERE dep.dep_type = 'contains'
925 )
926 UPDATE tasks SET deleted_at = ?2, deleted_by = ?3, deleted_reason = ?4, updated_at = ?2
927 WHERE id IN (SELECT id FROM descendants) AND deleted_at IS NULL",
928 params![task_id, now, worker_id, reason],
929 )?;
930 } else {
931 let child_count: i32 = tx.query_row(
933 "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
934 params![task_id],
935 |row| row.get(0),
936 )?;
937
938 if child_count > 0 {
939 return Err(anyhow!("Task has children; use cascade=true to delete"));
940 }
941
942 tx.execute(
943 "UPDATE tasks SET deleted_at = ?1, deleted_by = ?2, deleted_reason = ?3, updated_at = ?1 WHERE id = ?4",
944 params![now, worker_id, reason, task_id],
945 )?;
946 }
947 }
948
949 tx.commit()?;
950 Ok(())
951 })
952 }
953
954 pub fn list_tasks(
957 &self,
958 status: Option<&str>,
959 owner: Option<&str>,
960 parent_id: Option<Option<&str>>,
961 limit: Option<i32>,
962 sort_by: Option<&str>,
963 sort_order: Option<&str>,
964 ) -> Result<Vec<Task>> {
965 self.with_conn(|conn| {
966 let mut sql = String::from(
967 "SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL",
968 );
969 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
970
971 if let Some(s) = status {
972 sql.push_str(" AND t.status = ?");
973 params_vec.push(Box::new(s.to_string()));
974 }
975
976 if let Some(o) = owner {
977 sql.push_str(" AND t.worker_id = ?");
978 params_vec.push(Box::new(o.to_string()));
979 }
980
981 if let Some(p) = parent_id {
983 match p {
984 Some(pid) => {
985 sql.push_str(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ? AND dep_type = 'contains')");
986 params_vec.push(Box::new(pid.to_string()));
987 }
988 None => {
989 sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
991 }
992 }
993 }
994
995 let order_clause = build_order_clause(sort_by, sort_order);
997 sql.push_str(&format!(" ORDER BY {}", order_clause));
998
999 if let Some(l) = limit {
1000 sql.push_str(&format!(" LIMIT {}", l));
1001 }
1002
1003 let params_refs: Vec<&dyn rusqlite::ToSql> =
1004 params_vec.iter().map(|b| b.as_ref()).collect();
1005
1006 let mut stmt = conn.prepare(&sql)?;
1007 let tasks = stmt
1008 .query_map(params_refs.as_slice(), parse_task_row)?
1009 .filter_map(|r| r.ok())
1010 .collect();
1011
1012 Ok(tasks)
1013 })
1014 }
1015
1016 pub fn set_thought(
1018 &self,
1019 agent_id: &str,
1020 thought: Option<String>,
1021 task_ids: Option<Vec<String>>,
1022 ) -> Result<i32> {
1023 let now = now_ms();
1024
1025 self.with_conn(|conn| {
1026 let updated = if let Some(ids) = task_ids {
1027 let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
1028 let sql = format!(
1029 "UPDATE tasks SET current_thought = ?, updated_at = ?
1030 WHERE worker_id = ? AND id IN ({})",
1031 placeholders.join(", ")
1032 );
1033
1034 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1035 params_vec.push(Box::new(thought.clone()));
1036 params_vec.push(Box::new(now));
1037 params_vec.push(Box::new(agent_id.to_string()));
1038 for id in &ids {
1039 params_vec.push(Box::new(id.clone()));
1040 }
1041
1042 let params_refs: Vec<&dyn rusqlite::ToSql> =
1043 params_vec.iter().map(|b| b.as_ref()).collect();
1044 conn.execute(&sql, params_refs.as_slice())?
1045 } else {
1046 conn.execute(
1047 "UPDATE tasks SET current_thought = ?, updated_at = ? WHERE worker_id = ?",
1048 params![thought, now, agent_id],
1049 )?
1050 };
1051
1052 Ok(updated as i32)
1053 })
1054 }
1055
1056 pub fn log_time(&self, task_id: &str, duration_ms: i64) -> Result<i64> {
1058 let now = now_ms();
1059
1060 self.with_conn(|conn| {
1061 conn.execute(
1062 "UPDATE tasks SET time_actual_ms = COALESCE(time_actual_ms, 0) + ?1, updated_at = ?2
1063 WHERE id = ?3",
1064 params![duration_ms, now, task_id],
1065 )?;
1066
1067 let total: i64 = conn.query_row(
1068 "SELECT COALESCE(time_actual_ms, 0) FROM tasks WHERE id = ?1",
1069 params![task_id],
1070 |row| row.get(0),
1071 )?;
1072
1073 Ok(total)
1074 })
1075 }
1076
1077 pub fn log_metrics(
1080 &self,
1081 task_id: &str,
1082 cost_usd: Option<f64>,
1083 values: &[i64],
1084 ) -> Result<Task> {
1085 let now = now_ms();
1086
1087 self.with_conn(|conn| {
1088 let task =
1089 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1090
1091 let mut new_metrics = task.metrics;
1093 for (i, &val) in values.iter().take(8).enumerate() {
1094 new_metrics[i] += val;
1095 }
1096
1097 let new_cost_usd = task.cost_usd + cost_usd.unwrap_or(0.0);
1098
1099 conn.execute(
1100 "UPDATE tasks SET
1101 metric_0 = ?1, metric_1 = ?2, metric_2 = ?3, metric_3 = ?4,
1102 metric_4 = ?5, metric_5 = ?6, metric_6 = ?7, metric_7 = ?8,
1103 cost_usd = ?9, updated_at = ?10
1104 WHERE id = ?11",
1105 params![
1106 new_metrics[0],
1107 new_metrics[1],
1108 new_metrics[2],
1109 new_metrics[3],
1110 new_metrics[4],
1111 new_metrics[5],
1112 new_metrics[6],
1113 new_metrics[7],
1114 new_cost_usd,
1115 now,
1116 task_id,
1117 ],
1118 )?;
1119
1120 Ok(Task {
1121 cost_usd: new_cost_usd,
1122 metrics: new_metrics,
1123 updated_at: now,
1124 ..task
1125 })
1126 })
1127 }
1128
1129 pub fn claim_task(
1132 &self,
1133 task_id: &str,
1134 agent_id: &str,
1135 states_config: &StatesConfig,
1136 ) -> Result<Task> {
1137 let now = now_ms();
1138
1139 let claim_status = states_config
1141 .definitions
1142 .iter()
1143 .find(|(_, def)| def.timed)
1144 .map(|(name, _)| name.as_str())
1145 .unwrap_or("in_progress");
1146
1147 self.with_conn(|conn| {
1148 let task =
1150 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1151
1152 if task.worker_id.is_some() {
1154 return Err(anyhow!("Task is already claimed"));
1155 }
1156
1157 if !states_config.is_valid_transition(&task.status, claim_status) {
1159 let exits = states_config.get_exits(&task.status);
1160 return Err(anyhow!(
1161 "Cannot claim task in state '{}'. Allowed transitions: {:?}",
1162 task.status,
1163 exits
1164 ));
1165 }
1166
1167 let agent =
1169 get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1170
1171 if !task.needed_tags.is_empty() {
1173 for needed in &task.needed_tags {
1174 if !agent.tags.contains(needed) {
1175 return Err(anyhow!("Agent missing required tag: {}", needed));
1176 }
1177 }
1178 }
1179
1180 if !task.wanted_tags.is_empty() {
1182 let has_any = task
1183 .wanted_tags
1184 .iter()
1185 .any(|wanted| agent.tags.contains(wanted));
1186 if !has_any {
1187 return Err(anyhow!("Agent has none of the wanted tags"));
1188 }
1189 }
1190
1191 conn.execute(
1192 "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = ?4, updated_at = ?5
1193 WHERE id = ?6",
1194 params![agent_id, now, claim_status, now, now, task_id,],
1195 )?;
1196
1197 record_state_transition(
1199 conn,
1200 task_id,
1201 claim_status,
1202 Some(agent_id),
1203 None,
1204 states_config,
1205 )?;
1206
1207 conn.execute(
1209 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1210 params![now, agent_id],
1211 )?;
1212
1213 Ok(Task {
1214 worker_id: Some(agent_id.to_string()),
1215 claimed_at: Some(now),
1216 status: claim_status.to_string(),
1217 started_at: Some(now),
1218 updated_at: now,
1219 ..task
1220 })
1221 })
1222 }
1223
1224 pub fn release_task(
1226 &self,
1227 task_id: &str,
1228 agent_id: &str,
1229 states_config: &StatesConfig,
1230 ) -> Result<()> {
1231 let now = now_ms();
1232 let release_status = &states_config.initial;
1233
1234 self.with_conn(|conn| {
1235 let task =
1236 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1237
1238 if task.worker_id.as_deref() != Some(agent_id) {
1239 return Err(anyhow!("Task is not owned by this agent"));
1240 }
1241
1242 record_state_transition(
1244 conn,
1245 task_id,
1246 release_status,
1247 Some(agent_id),
1248 None,
1249 states_config,
1250 )?;
1251
1252 conn.execute(
1253 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1254 WHERE id = ?3",
1255 params![release_status, now, task_id],
1256 )?;
1257
1258 Ok(())
1259 })
1260 }
1261
1262 pub fn force_release(&self, task_id: &str, states_config: &StatesConfig) -> Result<()> {
1264 let now = now_ms();
1265 let release_status = &states_config.initial;
1266
1267 self.with_conn(|conn| {
1268 let task =
1269 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1270
1271 record_state_transition(
1273 conn,
1274 task_id,
1275 release_status,
1276 task.worker_id.as_deref(),
1277 None,
1278 states_config,
1279 )?;
1280
1281 conn.execute(
1282 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1283 WHERE id = ?3",
1284 params![release_status, now, task_id],
1285 )?;
1286
1287 Ok(())
1288 })
1289 }
1290
1291 pub fn force_claim_task(
1293 &self,
1294 task_id: &str,
1295 agent_id: &str,
1296 states_config: &StatesConfig,
1297 ) -> Result<Task> {
1298 let now = now_ms();
1299
1300 let claim_status = states_config
1302 .definitions
1303 .iter()
1304 .find(|(_, def)| def.timed)
1305 .map(|(name, _)| name.as_str())
1306 .unwrap_or("in_progress");
1307
1308 self.with_conn(|conn| {
1309 let task =
1311 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1312
1313 let agent =
1315 get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1316
1317 if !task.needed_tags.is_empty() {
1319 for needed in &task.needed_tags {
1320 if !agent.tags.contains(needed) {
1321 return Err(anyhow!("Agent missing required tag: {}", needed));
1322 }
1323 }
1324 }
1325
1326 if !task.wanted_tags.is_empty() {
1328 let has_any = task
1329 .wanted_tags
1330 .iter()
1331 .any(|wanted| agent.tags.contains(wanted));
1332 if !has_any {
1333 return Err(anyhow!("Agent has none of the wanted tags"));
1334 }
1335 }
1336
1337 conn.execute(
1338 "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = COALESCE(started_at, ?4), updated_at = ?5
1339 WHERE id = ?6",
1340 params![agent_id, now, claim_status, now, now, task_id,],
1341 )?;
1342
1343 record_state_transition(
1345 conn,
1346 task_id,
1347 claim_status,
1348 Some(agent_id),
1349 None,
1350 states_config,
1351 )?;
1352
1353 conn.execute(
1355 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1356 params![now, agent_id],
1357 )?;
1358
1359 Ok(Task {
1360 worker_id: Some(agent_id.to_string()),
1361 claimed_at: Some(now),
1362 status: claim_status.to_string(),
1363 started_at: task.started_at.or(Some(now)),
1364 updated_at: now,
1365 ..task
1366 })
1367 })
1368 }
1369
1370 pub fn release_task_with_state(
1372 &self,
1373 task_id: &str,
1374 agent_id: &str,
1375 state: &str,
1376 states_config: &StatesConfig,
1377 ) -> Result<()> {
1378 let now = now_ms();
1379
1380 self.with_conn(|conn| {
1381 let task =
1382 get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1383
1384 if task.worker_id.as_deref() != Some(agent_id) {
1385 return Err(anyhow!("Task is not owned by this agent"));
1386 }
1387
1388 if !states_config.is_valid_state(state) {
1390 return Err(anyhow!(
1391 "Invalid state '{}'. Valid states: {:?}",
1392 state,
1393 states_config.state_names()
1394 ));
1395 }
1396
1397 if !states_config.is_valid_transition(&task.status, state) {
1399 let exits = states_config.get_exits(&task.status);
1400 return Err(anyhow!(
1401 "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
1402 task.status,
1403 state,
1404 exits
1405 ));
1406 }
1407
1408 let completed_at = if states_config.is_terminal_state(state) {
1410 Some(now)
1411 } else {
1412 None
1413 };
1414
1415 record_state_transition(conn, task_id, state, Some(agent_id), None, states_config)?;
1417
1418 conn.execute(
1419 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, completed_at = COALESCE(?2, completed_at), updated_at = ?3
1420 WHERE id = ?4",
1421 params![state, completed_at, now, task_id],
1422 )?;
1423
1424 Ok(())
1425 })
1426 }
1427
1428 pub fn force_release_stale(
1430 &self,
1431 timeout_seconds: i64,
1432 states_config: &StatesConfig,
1433 ) -> Result<i32> {
1434 let now = now_ms();
1435 let cutoff = now - (timeout_seconds * 1000);
1436 let release_status = &states_config.initial;
1437
1438 self.with_conn(|conn| {
1439 let updated = conn.execute(
1440 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1441 WHERE claimed_at < ?3 AND worker_id IS NOT NULL",
1442 params![release_status, now, cutoff],
1443 )?;
1444
1445 Ok(updated as i32)
1446 })
1447 }
1448
1449 pub fn complete_task(
1453 &self,
1454 task_id: &str,
1455 agent_id: &str,
1456 states_config: &StatesConfig,
1457 ) -> Result<Task> {
1458 let now = now_ms();
1459
1460 let complete_status = if states_config.definitions.contains_key("completed") {
1462 "completed"
1463 } else {
1464 states_config
1466 .definitions
1467 .iter()
1468 .find(|(_, def)| def.exits.is_empty())
1469 .map(|(name, _)| name.as_str())
1470 .unwrap_or("completed")
1471 };
1472
1473 self.with_conn_mut(|conn| {
1474 let tx = conn.transaction()?;
1475
1476 let mut stmt = tx.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1478 let task = stmt
1479 .query_row(params![task_id], parse_task_row)
1480 .map_err(|_| anyhow!("Task not found"))?;
1481 drop(stmt);
1482
1483 if task.worker_id.as_deref() != Some(agent_id) {
1485 return Err(anyhow!("Task is not owned by this agent"));
1486 }
1487
1488 let incomplete_children: i32 = tx.query_row(
1490 "SELECT COUNT(*) FROM dependencies d
1491 INNER JOIN tasks child ON d.to_task_id = child.id
1492 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
1493 AND child.status IN (SELECT value FROM json_each(?2))",
1494 params![
1495 task_id,
1496 serde_json::to_string(&states_config.blocking_states)?
1497 ],
1498 |row| row.get(0),
1499 )?;
1500
1501 if incomplete_children > 0 {
1502 return Err(anyhow!(
1503 "Cannot complete task: {} child task(s) are not complete",
1504 incomplete_children
1505 ));
1506 }
1507
1508 if !states_config.is_valid_transition(&task.status, complete_status) {
1510 let exits = states_config.get_exits(&task.status);
1511 return Err(anyhow!(
1512 "Cannot complete task in state '{}'. Allowed transitions: {:?}",
1513 task.status,
1514 exits
1515 ));
1516 }
1517
1518 record_state_transition(
1520 &tx,
1521 task_id,
1522 complete_status,
1523 Some(agent_id),
1524 None,
1525 states_config,
1526 )?;
1527
1528 tx.execute(
1530 "UPDATE tasks SET status = ?1, completed_at = ?2, updated_at = ?3,
1531 worker_id = NULL, claimed_at = NULL
1532 WHERE id = ?4",
1533 params![complete_status, now, now, task_id],
1534 )?;
1535
1536 tx.execute(
1538 "DELETE FROM file_locks WHERE task_id = ?1",
1539 params![task_id],
1540 )?;
1541
1542 tx.execute(
1544 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1545 params![now, agent_id],
1546 )?;
1547
1548 tx.commit()?;
1549
1550 Ok(Task {
1551 status: complete_status.to_string(),
1552 completed_at: Some(now),
1553 updated_at: now,
1554 worker_id: None,
1555 claimed_at: None,
1556 ..task
1557 })
1558 })
1559 }
1560
1561 pub fn get_all_tasks(&self) -> Result<Vec<Task>> {
1563 self.with_conn(|conn| {
1564 let mut stmt = conn.prepare("SELECT * FROM tasks WHERE deleted_at IS NULL ORDER BY created_at")?;
1565 let tasks = stmt
1566 .query_map([], parse_task_row)?
1567 .filter_map(|r| r.ok())
1568 .collect();
1569 Ok(tasks)
1570 })
1571 }
1572
1573 #[allow(dead_code)]
1575 pub fn get_tasks_by_status(&self, status: &str) -> Result<Vec<Task>> {
1576 self.with_conn(|conn| {
1577 let mut stmt =
1578 conn.prepare("SELECT * FROM tasks WHERE status = ?1 ORDER BY created_at")?;
1579 let tasks = stmt
1580 .query_map(params![status], parse_task_row)?
1581 .filter_map(|r| r.ok())
1582 .collect();
1583 Ok(tasks)
1584 })
1585 }
1586
1587 pub fn get_claimed_tasks(&self, agent_id: Option<&str>) -> Result<Vec<Task>> {
1589 self.with_conn(|conn| {
1590 let tasks = if let Some(aid) = agent_id {
1591 let mut stmt = conn
1592 .prepare("SELECT * FROM tasks WHERE worker_id = ?1 AND deleted_at IS NULL ORDER BY claimed_at")?;
1593 stmt.query_map(params![aid], parse_task_row)?
1594 .filter_map(|r| r.ok())
1595 .collect()
1596 } else {
1597 let mut stmt = conn.prepare(
1598 "SELECT * FROM tasks WHERE worker_id IS NOT NULL AND deleted_at IS NULL ORDER BY claimed_at",
1599 )?;
1600 stmt.query_map([], parse_task_row)?
1601 .filter_map(|r| r.ok())
1602 .collect()
1603 };
1604
1605 Ok(tasks)
1606 })
1607 }
1608}
1609
1610fn create_tree_recursive(
1615 conn: &Connection,
1616 input: &TaskTreeInput,
1617 parent_id: Option<&str>,
1618 prev_sibling_id: Option<&str>,
1619 child_type: Option<&str>,
1620 sibling_type: Option<&str>,
1621 all_ids: &mut Vec<String>,
1622 states_config: &StatesConfig,
1623) -> Result<String> {
1624 let task_id = if let Some(ref ref_id) = input.ref_id {
1626 let exists: bool = conn.query_row(
1628 "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
1629 params![ref_id],
1630 |row| row.get(0),
1631 )?;
1632 if !exists {
1633 return Err(anyhow::anyhow!("Referenced task '{}' not found", ref_id));
1634 }
1635 ref_id.clone()
1636 } else {
1637 let generated_id = Uuid::now_v7().to_string();
1639 let task_id = input.id.clone().unwrap_or(generated_id);
1640 let now = now_ms();
1641 let priority = clamp_priority(input.priority.unwrap_or(PRIORITY_DEFAULT));
1642 let initial_status = &states_config.initial;
1643
1644 let needed_tags = input.needed_tags.clone().unwrap_or_default();
1645 let wanted_tags = input.wanted_tags.clone().unwrap_or_default();
1646 let tags = input.tags.clone().unwrap_or_default();
1647 let needed_tags_json = serde_json::to_string(&needed_tags)?;
1648 let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
1649 let tags_json = serde_json::to_string(&tags)?;
1650
1651 conn.execute(
1652 "INSERT INTO tasks (
1653 id, title, description, status, priority,
1654 needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
1655 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
1656 params![
1657 &task_id,
1658 &input.title,
1659 &input.description,
1660 initial_status,
1661 priority.to_string(),
1662 needed_tags_json,
1663 wanted_tags_json,
1664 tags_json,
1665 input.points,
1666 input.time_estimate_ms,
1667 now,
1668 now,
1669 ],
1670 )?;
1671
1672 record_state_transition(conn, &task_id, initial_status, None, None, states_config)?;
1674
1675 sync_task_tags(conn, &task_id, &tags)?;
1677 sync_needed_tags(conn, &task_id, &needed_tags)?;
1678 sync_wanted_tags(conn, &task_id, &wanted_tags)?;
1679
1680 task_id
1681 };
1682
1683 if let (Some(pid), Some(ct)) = (parent_id, child_type) {
1685 Database::add_dependency_internal(conn, pid, &task_id, ct)?;
1686 }
1687
1688 if let (Some(prev_id), Some(st)) = (prev_sibling_id, sibling_type) {
1690 Database::add_dependency_internal(conn, prev_id, &task_id, st)?;
1691 }
1692
1693 all_ids.push(task_id.clone());
1694
1695 let mut prev_child_id: Option<String> = None;
1697 for child in input.children.iter() {
1698 let child_id = create_tree_recursive(
1699 conn,
1700 child,
1701 Some(&task_id),
1702 prev_child_id.as_deref(),
1703 child_type,
1704 sibling_type,
1705 all_ids,
1706 states_config,
1707 )?;
1708 prev_child_id = Some(child_id);
1709 }
1710
1711 Ok(task_id)
1712}