1use super::Database;
4use crate::config::{AutoAdvanceConfig, DependenciesConfig, DependencyDisplay, StatesConfig};
5use crate::types::{Dependency, Task};
6use anyhow::{anyhow, Result};
7use rusqlite::{params, Connection, OptionalExtension};
8use std::collections::{HashSet, VecDeque};
9
/// Outcome of a `Database::relink` call.
///
/// Every entry is a `(from_task_id, to_task_id)` pair.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RelinkResult {
    /// Pairs whose dependency row was actually deleted.
    pub unlinked: Vec<(String, String)>,
    /// Pairs for which an `INSERT OR IGNORE` was issued.
    pub linked: Vec<(String, String)>,
}
18
19fn would_create_cycle_in_tx(
21 tx: &rusqlite::Transaction,
22 from_task_id: &str,
23 to_task_id: &str,
24 dep_type: &str,
25 deps_config: &DependenciesConfig,
26) -> Result<bool> {
27 let def = deps_config.get_definition(dep_type).unwrap();
28
29 let mut visited: HashSet<String> = HashSet::new();
32 let mut queue: VecDeque<String> = VecDeque::new();
33 queue.push_back(to_task_id.to_string());
34
35 while let Some(current) = queue.pop_front() {
36 if current == from_task_id {
37 return Ok(true); }
39
40 if visited.contains(¤t) {
41 continue;
42 }
43 visited.insert(current.clone());
44
45 let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
47 let mut stmt = tx.prepare(
49 "SELECT to_task_id FROM dependencies d
50 JOIN (SELECT value FROM json_each(?1)) types
51 WHERE d.from_task_id = ?2 AND d.dep_type = types.value"
52 )?;
53 let vertical_types: Vec<&str> = deps_config.vertical_types();
54 let types_json = serde_json::to_string(&vertical_types)?;
55 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
56 .filter_map(|r| r.ok())
57 .collect()
58 } else {
59 let mut stmt = tx.prepare(
61 "SELECT to_task_id FROM dependencies d
62 JOIN (SELECT value FROM json_each(?1)) types
63 WHERE d.from_task_id = ?2 AND d.dep_type = types.value"
64 )?;
65 let start_blocking: Vec<&str> = deps_config.start_blocking_types();
66 let types_json = serde_json::to_string(&start_blocking)?;
67 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
68 .filter_map(|r| r.ok())
69 .collect()
70 };
71
72 for dep in deps {
73 if !visited.contains(&dep) {
74 queue.push_back(dep);
75 }
76 }
77 }
78
79 Ok(false)
80}
81
/// Builds the `ORDER BY` expression (without the keyword) for task queries.
///
/// `sort_by` accepts `"priority"`, `"created_at"` or `"updated_at"`; anything
/// else, including `None`, sorts by `t.created_at`. `sort_order` is ascending
/// only for exactly `"asc"`; every other value sorts descending.
fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
    // Whitelist of sortable keys and the SQL expression each one maps to.
    const SORTABLE: [(&str, &str); 3] = [
        ("priority", "CAST(t.priority AS INTEGER)"),
        ("created_at", "t.created_at"),
        ("updated_at", "t.updated_at"),
    ];

    let column = sort_by
        .and_then(|key| SORTABLE.iter().find(|(name, _)| *name == key))
        .map(|(_, expr)| *expr)
        .unwrap_or("t.created_at");

    let direction = if sort_order == Some("asc") {
        "ASC"
    } else {
        "DESC"
    };

    [column, direction].join(" ")
}
103
104impl Database {
105 pub fn add_dependency(
107 &self,
108 from_task_id: &str,
109 to_task_id: &str,
110 dep_type: &str,
111 deps_config: &DependenciesConfig,
112 ) -> Result<()> {
113 if !deps_config.is_valid_dep_type(dep_type) {
115 return Err(anyhow!(
116 "Invalid dependency type '{}'. Valid types: {:?}",
117 dep_type,
118 deps_config.dep_type_names()
119 ));
120 }
121
122 let def = deps_config.get_definition(dep_type).unwrap();
124 if def.display == DependencyDisplay::Vertical {
125 if let Some(existing_parent) = self.get_parent(to_task_id)? {
126 if existing_parent != from_task_id {
127 return Err(anyhow!(
128 "Task {} already has parent {}",
129 to_task_id,
130 existing_parent
131 ));
132 }
133 }
134 }
135
136 if self.would_create_cycle(from_task_id, to_task_id, dep_type, deps_config)? {
138 return Err(anyhow!("Adding this dependency would create a cycle"));
139 }
140
141 self.with_conn(|conn| {
142 conn.execute(
143 "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
144 params![from_task_id, to_task_id, dep_type],
145 )?;
146 Ok(())
147 })
148 }
149
150 pub fn would_create_cycle(
154 &self,
155 from_task_id: &str,
156 to_task_id: &str,
157 dep_type: &str,
158 deps_config: &DependenciesConfig,
159 ) -> Result<bool> {
160 let def = deps_config.get_definition(dep_type).unwrap();
161
162 self.with_conn(|conn| {
163 let mut visited: HashSet<String> = HashSet::new();
166 let mut queue: VecDeque<String> = VecDeque::new();
167 queue.push_back(to_task_id.to_string());
168
169 while let Some(current) = queue.pop_front() {
170 if current == from_task_id {
171 return Ok(true); }
173
174 if visited.contains(¤t) {
175 continue;
176 }
177 visited.insert(current.clone());
178
179 let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
181 let mut stmt = conn.prepare(
183 "SELECT to_task_id FROM dependencies d
184 JOIN (SELECT value FROM json_each(?1)) types
185 WHERE d.from_task_id = ?2 AND d.dep_type = types.value"
186 )?;
187 let vertical_types: Vec<&str> = deps_config.vertical_types();
188 let types_json = serde_json::to_string(&vertical_types)?;
189 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
190 .filter_map(|r| r.ok())
191 .collect()
192 } else {
193 let mut stmt = conn.prepare(
195 "SELECT to_task_id FROM dependencies d
196 JOIN (SELECT value FROM json_each(?1)) types
197 WHERE d.from_task_id = ?2 AND d.dep_type = types.value"
198 )?;
199 let start_blocking: Vec<&str> = deps_config.start_blocking_types();
200 let types_json = serde_json::to_string(&start_blocking)?;
201 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
202 .filter_map(|r| r.ok())
203 .collect()
204 };
205
206 for dep in deps {
207 if !visited.contains(&dep) {
208 queue.push_back(dep);
209 }
210 }
211 }
212
213 Ok(false)
214 })
215 }
216
217 pub fn remove_dependency(
219 &self,
220 from_task_id: &str,
221 to_task_id: &str,
222 dep_type: &str,
223 ) -> Result<bool> {
224 self.with_conn(|conn| {
225 let rows = conn.execute(
226 "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
227 params![from_task_id, to_task_id, dep_type],
228 )?;
229 Ok(rows > 0)
230 })
231 }
232
233 pub fn remove_all_outgoing_dependencies(
236 &self,
237 from_task_id: &str,
238 dep_type: &str,
239 ) -> Result<Vec<Dependency>> {
240 self.with_conn_mut(|conn| {
241 let tx = conn.transaction()?;
242
243 let deps: Vec<Dependency> = {
245 let mut stmt = tx.prepare(
246 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
247 )?;
248 stmt
249 .query_map(params![from_task_id, dep_type], |row| {
250 Ok(Dependency {
251 from_task_id: row.get(0)?,
252 to_task_id: row.get(1)?,
253 dep_type: row.get(2)?,
254 })
255 })?
256 .filter_map(|r| r.ok())
257 .collect()
258 };
259
260 tx.execute(
262 "DELETE FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2",
263 params![from_task_id, dep_type],
264 )?;
265
266 tx.commit()?;
267 Ok(deps)
268 })
269 }
270
271 pub fn remove_all_incoming_dependencies(
274 &self,
275 to_task_id: &str,
276 dep_type: &str,
277 ) -> Result<Vec<Dependency>> {
278 self.with_conn_mut(|conn| {
279 let tx = conn.transaction()?;
280
281 let deps: Vec<Dependency> = {
283 let mut stmt = tx.prepare(
284 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
285 )?;
286 stmt
287 .query_map(params![to_task_id, dep_type], |row| {
288 Ok(Dependency {
289 from_task_id: row.get(0)?,
290 to_task_id: row.get(1)?,
291 dep_type: row.get(2)?,
292 })
293 })?
294 .filter_map(|r| r.ok())
295 .collect()
296 };
297
298 tx.execute(
300 "DELETE FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2",
301 params![to_task_id, dep_type],
302 )?;
303
304 tx.commit()?;
305 Ok(deps)
306 })
307 }
308
309 pub fn get_all_dependencies(&self) -> Result<Vec<Dependency>> {
311 self.with_conn(|conn| {
312 let mut stmt =
313 conn.prepare("SELECT from_task_id, to_task_id, dep_type FROM dependencies")?;
314
315 let deps = stmt
316 .query_map([], |row| {
317 let from: String = row.get(0)?;
318 let to: String = row.get(1)?;
319 let dep_type: String = row.get(2)?;
320 Ok(Dependency {
321 from_task_id: from,
322 to_task_id: to,
323 dep_type,
324 })
325 })?
326 .filter_map(|r| r.ok())
327 .collect();
328
329 Ok(deps)
330 })
331 }
332
333 pub fn get_dependencies_by_type(
335 &self,
336 task_id: &str,
337 dep_type: &str,
338 direction: &str,
339 ) -> Result<Vec<Dependency>> {
340 self.with_conn(|conn| {
341 let sql = if direction == "incoming" {
342 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
343 } else {
344 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
345 };
346
347 let mut stmt = conn.prepare(sql)?;
348
349 let deps = stmt
350 .query_map(params![task_id, dep_type], |row| {
351 let from: String = row.get(0)?;
352 let to: String = row.get(1)?;
353 let dep_type: String = row.get(2)?;
354 Ok(Dependency {
355 from_task_id: from,
356 to_task_id: to,
357 dep_type,
358 })
359 })?
360 .filter_map(|r| r.ok())
361 .collect();
362
363 Ok(deps)
364 })
365 }
366
367 pub fn get_start_blockers(
369 &self,
370 task_id: &str,
371 deps_config: &DependenciesConfig,
372 ) -> Result<Vec<String>> {
373 let start_blocking_types = deps_config.start_blocking_types();
374 if start_blocking_types.is_empty() {
375 return Ok(vec![]);
376 }
377
378 self.with_conn(|conn| {
379 let placeholders: String = start_blocking_types
380 .iter()
381 .enumerate()
382 .map(|(i, _)| format!("?{}", i + 2))
383 .collect::<Vec<_>>()
384 .join(", ");
385
386 let sql = format!(
387 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type IN ({})",
388 placeholders
389 );
390
391 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
392 params_vec.push(Box::new(task_id.to_string()));
393 for t in &start_blocking_types {
394 params_vec.push(Box::new(t.to_string()));
395 }
396 let params_refs: Vec<&dyn rusqlite::ToSql> =
397 params_vec.iter().map(|b| b.as_ref()).collect();
398
399 let mut stmt = conn.prepare(&sql)?;
400 let blockers = stmt
401 .query_map(params_refs.as_slice(), |row| {
402 let id: String = row.get(0)?;
403 Ok(id)
404 })?
405 .filter_map(|r| r.ok())
406 .collect();
407
408 Ok(blockers)
409 })
410 }
411
412 pub fn get_completion_blockers(
415 &self,
416 task_id: &str,
417 deps_config: &DependenciesConfig,
418 ) -> Result<Vec<String>> {
419 let completion_blocking_types = deps_config.completion_blocking_types();
420 if completion_blocking_types.is_empty() {
421 return Ok(vec![]);
422 }
423
424 self.with_conn(|conn| {
425 let placeholders: String = completion_blocking_types
426 .iter()
427 .enumerate()
428 .map(|(i, _)| format!("?{}", i + 2))
429 .collect::<Vec<_>>()
430 .join(", ");
431
432 let sql = format!(
435 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
436 placeholders
437 );
438
439 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
440 params_vec.push(Box::new(task_id.to_string()));
441 for t in &completion_blocking_types {
442 params_vec.push(Box::new(t.to_string()));
443 }
444 let params_refs: Vec<&dyn rusqlite::ToSql> =
445 params_vec.iter().map(|b| b.as_ref()).collect();
446
447 let mut stmt = conn.prepare(&sql)?;
448 let blockers = stmt
449 .query_map(params_refs.as_slice(), |row| {
450 let id: String = row.get(0)?;
451 Ok(id)
452 })?
453 .filter_map(|r| r.ok())
454 .collect();
455
456 Ok(blockers)
457 })
458 }
459
460 pub fn get_parent(&self, task_id: &str) -> Result<Option<String>> {
462 self.with_conn(|conn| {
463 let result: Result<String, rusqlite::Error> = conn.query_row(
464 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
465 params![task_id],
466 |row| row.get(0),
467 );
468
469 match result {
470 Ok(parent_id) => Ok(Some(parent_id)),
471 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
472 Err(e) => Err(e.into()),
473 }
474 })
475 }
476
477 pub fn get_children_ids(&self, task_id: &str) -> Result<Vec<String>> {
479 self.with_conn(|conn| {
480 let mut stmt = conn.prepare(
481 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
482 )?;
483
484 let children = stmt
485 .query_map(params![task_id], |row| {
486 let id: String = row.get(0)?;
487 Ok(id)
488 })?
489 .filter_map(|r| r.ok())
490 .collect();
491
492 Ok(children)
493 })
494 }
495
496 pub fn get_blockers(&self, task_id: &str) -> Result<Vec<String>> {
499 self.with_conn(|conn| {
500 let mut stmt = conn.prepare(
501 "SELECT from_task_id FROM dependencies
502 WHERE to_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
503 )?;
504
505 let blockers = stmt
506 .query_map(params![task_id], |row| {
507 let id: String = row.get(0)?;
508 Ok(id)
509 })?
510 .filter_map(|r| r.ok())
511 .collect();
512
513 Ok(blockers)
514 })
515 }
516
517 #[allow(dead_code)]
519 pub fn get_blocking(&self, task_id: &str) -> Result<Vec<String>> {
520 self.with_conn(|conn| {
521 let mut stmt = conn.prepare(
522 "SELECT to_task_id FROM dependencies
523 WHERE from_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
524 )?;
525
526 let blocking = stmt
527 .query_map(params![task_id], |row| {
528 let id: String = row.get(0)?;
529 Ok(id)
530 })?
531 .filter_map(|r| r.ok())
532 .collect();
533
534 Ok(blocking)
535 })
536 }
537
538 pub fn get_blocked_tasks(
542 &self,
543 states_config: &StatesConfig,
544 deps_config: &DependenciesConfig,
545 sort_by: Option<&str>,
546 sort_order: Option<&str>,
547 ) -> Result<Vec<Task>> {
548 let start_blocking_types = deps_config.start_blocking_types();
549 if start_blocking_types.is_empty() {
550 return Ok(vec![]);
551 }
552
553 self.with_conn(|conn| {
554 let state_placeholders: Vec<String> = states_config
556 .blocking_states
557 .iter()
558 .enumerate()
559 .map(|(i, _)| format!("?{}", i + 2))
560 .collect();
561 let state_clause = state_placeholders.join(", ");
562
563 let type_start = states_config.blocking_states.len() + 2;
565 let type_placeholders: Vec<String> = start_blocking_types
566 .iter()
567 .enumerate()
568 .map(|(i, _)| format!("?{}", type_start + i))
569 .collect();
570 let type_clause = type_placeholders.join(", ");
571
572 let order_clause = build_order_clause(sort_by, sort_order);
574
575 let sql = format!(
576 "SELECT DISTINCT t.*
577 FROM tasks t
578 INNER JOIN dependencies d ON t.id = d.to_task_id
579 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
580 WHERE d.dep_type IN ({})
581 AND blocker.status IN ({})
582 AND t.status = ?1
583 AND t.deleted_at IS NULL
584 ORDER BY {}",
585 type_clause, state_clause, order_clause
586 );
587
588 let mut stmt = conn.prepare(&sql)?;
589
590 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
592 params_vec.push(Box::new(states_config.initial.clone()));
593 for state in &states_config.blocking_states {
594 params_vec.push(Box::new(state.clone()));
595 }
596 for t in &start_blocking_types {
597 params_vec.push(Box::new(t.to_string()));
598 }
599 let params_refs: Vec<&dyn rusqlite::ToSql> =
600 params_vec.iter().map(|b| b.as_ref()).collect();
601
602 let tasks = stmt
603 .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
604 .filter_map(|r| r.ok())
605 .collect();
606
607 Ok(tasks)
608 })
609 }
610
611 pub fn get_ready_tasks(
616 &self,
617 agent_id: Option<&str>,
618 states_config: &StatesConfig,
619 deps_config: &DependenciesConfig,
620 sort_by: Option<&str>,
621 sort_order: Option<&str>,
622 ) -> Result<Vec<Task>> {
623 let start_blocking_types = deps_config.start_blocking_types();
624
625 let agent_tags: Option<Vec<String>> = if let Some(aid) = agent_id {
627 Some(self.get_agent_tags(aid)?)
628 } else {
629 None
630 };
631
632 self.with_conn(|conn| {
633 let state_placeholders: Vec<String> = states_config
635 .blocking_states
636 .iter()
637 .enumerate()
638 .map(|(i, _)| format!("?{}", i + 2))
639 .collect();
640 let state_clause = state_placeholders.join(", ");
641
642 let type_start = states_config.blocking_states.len() + 2;
644 let type_placeholders: Vec<String> = start_blocking_types
645 .iter()
646 .enumerate()
647 .map(|(i, _)| format!("?{}", type_start + i))
648 .collect();
649 let type_clause = type_placeholders.join(", ");
650
651 let order_clause = if sort_by.is_some() {
653 build_order_clause(sort_by, sort_order)
654 } else {
655 "CAST(t.priority AS INTEGER) DESC, t.created_at DESC".to_string()
657 };
658
659 let mut param_idx = type_start + start_blocking_types.len();
661
662 let (agent_needed_clause, agent_wanted_clause) = if let Some(ref tags) = agent_tags {
664 let needed_placeholders: Vec<String> = tags
668 .iter()
669 .enumerate()
670 .map(|(i, _)| format!("?{}", param_idx + i))
671 .collect();
672 param_idx += tags.len();
673
674 let needed_clause = if needed_placeholders.is_empty() {
675 "AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)".to_string()
677 } else {
678 format!(
680 "AND (
681 NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
682 OR (
683 SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
684 ) = (
685 SELECT COUNT(*) FROM task_needed_tags
686 WHERE task_id = t.id AND tag IN ({})
687 )
688 )",
689 needed_placeholders.join(", ")
690 )
691 };
692
693 let wanted_placeholders: Vec<String> = tags
695 .iter()
696 .enumerate()
697 .map(|(i, _)| format!("?{}", param_idx + i))
698 .collect();
699
700 let wanted_clause = if wanted_placeholders.is_empty() {
701 "AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)".to_string()
703 } else {
704 format!(
706 "AND (
707 NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
708 OR EXISTS (
709 SELECT 1 FROM task_wanted_tags
710 WHERE task_id = t.id AND tag IN ({})
711 )
712 )",
713 wanted_placeholders.join(", ")
714 )
715 };
716
717 (needed_clause, wanted_clause)
718 } else {
719 (String::new(), String::new())
720 };
721
722 let sql = format!(
723 "SELECT t.*
724 FROM tasks t
725 WHERE t.status = ?1
726 AND t.worker_id IS NULL
727 AND t.deleted_at IS NULL
728 AND NOT EXISTS (
729 SELECT 1 FROM dependencies d
730 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
731 WHERE d.to_task_id = t.id
732 AND d.dep_type IN ({})
733 AND blocker.status IN ({})
734 )
735 {}
736 {}
737 ORDER BY {}",
738 type_clause, state_clause, agent_needed_clause, agent_wanted_clause, order_clause
739 );
740
741 let mut stmt = conn.prepare(&sql)?;
742
743 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
745 params_vec.push(Box::new(states_config.initial.clone()));
746 for state in &states_config.blocking_states {
747 params_vec.push(Box::new(state.clone()));
748 }
749 for t in &start_blocking_types {
750 params_vec.push(Box::new(t.to_string()));
751 }
752 if let Some(ref tags) = agent_tags {
754 for tag in tags {
755 params_vec.push(Box::new(tag.clone()));
756 }
757 for tag in tags {
758 params_vec.push(Box::new(tag.clone()));
759 }
760 }
761 let params_refs: Vec<&dyn rusqlite::ToSql> =
762 params_vec.iter().map(|b| b.as_ref()).collect();
763
764 let tasks: Vec<Task> = stmt
765 .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
766 .filter_map(|r| r.ok())
767 .collect();
768
769 Ok(tasks)
770 })
771 }
772
773 #[allow(dead_code)]
775 pub fn has_unmet_start_dependencies(
776 &self,
777 task_id: &str,
778 states_config: &StatesConfig,
779 deps_config: &DependenciesConfig,
780 ) -> Result<bool> {
781 let start_blocking_types = deps_config.start_blocking_types();
782 if start_blocking_types.is_empty() {
783 return Ok(false);
784 }
785
786 self.with_conn(|conn| {
787 let state_placeholders: Vec<String> = states_config
789 .blocking_states
790 .iter()
791 .enumerate()
792 .map(|(i, _)| format!("?{}", i + 2))
793 .collect();
794 let state_clause = state_placeholders.join(", ");
795
796 let type_start = states_config.blocking_states.len() + 2;
798 let type_placeholders: Vec<String> = start_blocking_types
799 .iter()
800 .enumerate()
801 .map(|(i, _)| format!("?{}", type_start + i))
802 .collect();
803 let type_clause = type_placeholders.join(", ");
804
805 let sql = format!(
806 "SELECT COUNT(*) FROM dependencies d
807 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
808 WHERE d.to_task_id = ?1
809 AND d.dep_type IN ({})
810 AND blocker.status IN ({})",
811 type_clause, state_clause
812 );
813
814 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
815 params_vec.push(Box::new(task_id.to_string()));
816 for state in &states_config.blocking_states {
817 params_vec.push(Box::new(state.clone()));
818 }
819 for t in &start_blocking_types {
820 params_vec.push(Box::new(t.to_string()));
821 }
822 let params_refs: Vec<&dyn rusqlite::ToSql> =
823 params_vec.iter().map(|b| b.as_ref()).collect();
824
825 let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
826
827 Ok(count > 0)
828 })
829 }
830
831 pub fn has_incomplete_children(
833 &self,
834 task_id: &str,
835 states_config: &StatesConfig,
836 ) -> Result<bool> {
837 self.with_conn(|conn| {
838 let state_placeholders: Vec<String> = states_config
840 .blocking_states
841 .iter()
842 .enumerate()
843 .map(|(i, _)| format!("?{}", i + 2))
844 .collect();
845 let state_clause = state_placeholders.join(", ");
846
847 let sql = format!(
848 "SELECT COUNT(*) FROM dependencies d
849 INNER JOIN tasks child ON d.to_task_id = child.id
850 WHERE d.from_task_id = ?1
851 AND d.dep_type = 'contains'
852 AND child.status IN ({})",
853 state_clause
854 );
855
856 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
857 params_vec.push(Box::new(task_id.to_string()));
858 for state in &states_config.blocking_states {
859 params_vec.push(Box::new(state.clone()));
860 }
861 let params_refs: Vec<&dyn rusqlite::ToSql> =
862 params_vec.iter().map(|b| b.as_ref()).collect();
863
864 let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
865
866 Ok(count > 0)
867 })
868 }
869
870 pub fn list_tasks_with_tag_filters(
876 &self,
877 status: Option<Vec<String>>,
878 owner: Option<&str>,
879 parent_id: Option<Option<&str>>,
880 tags_any: Option<Vec<String>>,
881 tags_all: Option<Vec<String>>,
882 qualified_for_agent_tags: Option<Vec<String>>,
883 limit: Option<i32>,
884 sort_by: Option<&str>,
885 sort_order: Option<&str>,
886 ) -> Result<Vec<Task>> {
887 self.with_conn(|conn| {
888 let mut sql = String::from("SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL");
889 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
890 let mut param_idx = 1;
891
892 if let Some(ref statuses) = status {
894 if statuses.len() == 1 {
895 sql.push_str(&format!(" AND t.status = ?{}", param_idx));
896 params_vec.push(Box::new(statuses[0].clone()));
897 param_idx += 1;
898 } else if statuses.len() > 1 {
899 let placeholders: Vec<String> = statuses
900 .iter()
901 .enumerate()
902 .map(|(i, _)| format!("?{}", param_idx + i))
903 .collect();
904 sql.push_str(&format!(" AND t.status IN ({})", placeholders.join(", ")));
905 for s in statuses {
906 params_vec.push(Box::new(s.clone()));
907 }
908 param_idx += statuses.len();
909 }
910 }
911
912 if let Some(o) = owner {
914 sql.push_str(&format!(" AND t.worker_id = ?{}", param_idx));
915 params_vec.push(Box::new(o.to_string()));
916 param_idx += 1;
917 }
918
919 if let Some(p) = parent_id {
921 match p {
922 Some(pid) => {
923 sql.push_str(&format!(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ?{} AND dep_type = 'contains')", param_idx));
924 params_vec.push(Box::new(pid.to_string()));
925 param_idx += 1;
926 }
927 None => {
928 sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
930 }
931 }
932 }
933
934 if let Some(ref any_tags) = tags_any {
936 if !any_tags.is_empty() {
937 let placeholders: Vec<String> = any_tags
938 .iter()
939 .enumerate()
940 .map(|(i, _)| format!("?{}", param_idx + i))
941 .collect();
942 sql.push_str(&format!(
943 " AND EXISTS (SELECT 1 FROM task_tags WHERE task_id = t.id AND tag IN ({}))",
944 placeholders.join(", ")
945 ));
946 for tag in any_tags {
947 params_vec.push(Box::new(tag.clone()));
948 }
949 param_idx += any_tags.len();
950 }
951 }
952
953 if let Some(ref all_tags) = tags_all {
955 if !all_tags.is_empty() {
956 let placeholders: Vec<String> = all_tags
957 .iter()
958 .enumerate()
959 .map(|(i, _)| format!("?{}", param_idx + i))
960 .collect();
961 sql.push_str(&format!(
963 " AND (SELECT COUNT(*) FROM task_tags WHERE task_id = t.id AND tag IN ({})) = {}",
964 placeholders.join(", "),
965 all_tags.len()
966 ));
967 for tag in all_tags {
968 params_vec.push(Box::new(tag.clone()));
969 }
970 param_idx += all_tags.len();
971 }
972 }
973
974 if let Some(ref agent_tags) = qualified_for_agent_tags {
976 if agent_tags.is_empty() {
978 sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)");
980 sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)");
982 } else {
983 let needed_placeholders: Vec<String> = agent_tags
985 .iter()
986 .enumerate()
987 .map(|(i, _)| format!("?{}", param_idx + i))
988 .collect();
989 sql.push_str(&format!(
990 " AND (
991 NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
992 OR (
993 SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
994 ) = (
995 SELECT COUNT(*) FROM task_needed_tags
996 WHERE task_id = t.id AND tag IN ({})
997 )
998 )",
999 needed_placeholders.join(", ")
1000 ));
1001 for tag in agent_tags {
1002 params_vec.push(Box::new(tag.clone()));
1003 }
1004 param_idx += agent_tags.len();
1005
1006 let wanted_placeholders: Vec<String> = agent_tags
1008 .iter()
1009 .enumerate()
1010 .map(|(i, _)| format!("?{}", param_idx + i))
1011 .collect();
1012 sql.push_str(&format!(
1013 " AND (
1014 NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
1015 OR EXISTS (
1016 SELECT 1 FROM task_wanted_tags
1017 WHERE task_id = t.id AND tag IN ({})
1018 )
1019 )",
1020 wanted_placeholders.join(", ")
1021 ));
1022 for tag in agent_tags {
1023 params_vec.push(Box::new(tag.clone()));
1024 }
1025 }
1027 }
1028
1029 let order_clause = build_order_clause(sort_by, sort_order);
1031 sql.push_str(&format!(" ORDER BY {}", order_clause));
1032
1033 if let Some(l) = limit {
1035 sql.push_str(&format!(" LIMIT {}", l));
1036 }
1037
1038 let params_refs: Vec<&dyn rusqlite::ToSql> =
1039 params_vec.iter().map(|b| b.as_ref()).collect();
1040
1041 let mut stmt = conn.prepare(&sql)?;
1042 let tasks: Vec<Task> = stmt
1043 .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
1044 .filter_map(|r| r.ok())
1045 .collect();
1046
1047 Ok(tasks)
1048 })
1049 }
1050
1051 pub fn get_agent_tags(&self, agent_id: &str) -> Result<Vec<String>> {
1053 self.with_conn(|conn| {
1054 let result: Result<String, rusqlite::Error> = conn.query_row(
1055 "SELECT tags FROM workers WHERE id = ?1",
1056 params![agent_id],
1057 |row| row.get(0),
1058 );
1059
1060 match result {
1061 Ok(tags_json) => {
1062 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
1063 Ok(tags)
1064 }
1065 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(vec![]),
1066 Err(e) => Err(e.into()),
1067 }
1068 })
1069 }
1070
1071 pub(super) fn add_dependency_internal(
1073 conn: &Connection,
1074 from_task_id: &str,
1075 to_task_id: &str,
1076 dep_type: &str,
1077 ) -> Result<()> {
1078 conn.execute(
1079 "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1080 params![from_task_id, to_task_id, dep_type],
1081 )?;
1082 Ok(())
1083 }
1084
1085
1086 pub fn relink(
1090 &self,
1091 prev_from_ids: &[String],
1092 prev_to_ids: &[String],
1093 from_ids: &[String],
1094 to_ids: &[String],
1095 dep_type: &str,
1096 deps_config: &DependenciesConfig,
1097 ) -> Result<RelinkResult> {
1098 if !deps_config.is_valid_dep_type(dep_type) {
1100 return Err(anyhow!(
1101 "Invalid dependency type '{}'. Valid types: {:?}",
1102 dep_type,
1103 deps_config.dep_type_names()
1104 ));
1105 }
1106
1107 let def = deps_config.get_definition(dep_type).unwrap();
1108 let is_vertical = def.display == DependencyDisplay::Vertical;
1109
1110 self.with_conn_mut(|conn| {
1111 let tx = conn.transaction()?;
1112
1113 let mut unlinked = Vec::new();
1114 let mut linked = Vec::new();
1115 let mut errors = Vec::new();
1116
1117 for prev_from in prev_from_ids {
1119 for prev_to in prev_to_ids {
1120 let rows = tx.execute(
1121 "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
1122 params![prev_from, prev_to, dep_type],
1123 )?;
1124 if rows > 0 {
1125 unlinked.push((prev_from.clone(), prev_to.clone()));
1126 }
1127 }
1128 }
1129
1130 for from_id in from_ids {
1132 for to_id in to_ids {
1133 if is_vertical {
1135 let existing_parent: Option<String> = tx.query_row(
1136 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1137 params![to_id],
1138 |row| row.get(0),
1139 ).optional()?;
1140
1141 if let Some(ref parent) = existing_parent {
1142 if parent != from_id {
1143 errors.push(format!(
1144 "Task {} already has parent {}",
1145 to_id, parent
1146 ));
1147 continue;
1148 }
1149 }
1150 }
1151
1152 if would_create_cycle_in_tx(&tx, from_id, to_id, dep_type, deps_config)? {
1154 errors.push(format!(
1155 "Adding dependency {}→{} would create a cycle",
1156 from_id, to_id
1157 ));
1158 continue;
1159 }
1160
1161 tx.execute(
1162 "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1163 params![from_id, to_id, dep_type],
1164 )?;
1165 linked.push((from_id.clone(), to_id.clone()));
1166 }
1167 }
1168
1169 if !errors.is_empty() {
1170 tx.rollback()?;
1172 return Err(anyhow!("Relink failed: {}", errors.join("; ")));
1173 }
1174
1175 tx.commit()?;
1176 Ok(RelinkResult { unlinked, linked })
1177 })
1178 }
1179
1180 pub fn get_predecessors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1187 if depth == 0 {
1188 return Ok(vec![]);
1189 }
1190
1191 self.with_conn(|conn| {
1192 let mut visited: HashSet<String> = HashSet::new();
1193 let mut result: Vec<Task> = Vec::new();
1194 let mut current_level: Vec<String> = vec![task_id.to_string()];
1195 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1196
1197 while !current_level.is_empty() && levels_remaining > 0 {
1198 let mut next_level: Vec<String> = Vec::new();
1199
1200 for tid in ¤t_level {
1201 let mut stmt = conn.prepare(
1203 "SELECT DISTINCT d.from_task_id FROM dependencies d
1204 WHERE d.to_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')"
1205 )?;
1206
1207 let predecessors: Vec<String> = stmt
1208 .query_map(params![tid], |row| row.get(0))?
1209 .filter_map(|r| r.ok())
1210 .collect();
1211
1212 for pred_id in predecessors {
1213 if !visited.contains(&pred_id) {
1214 visited.insert(pred_id.clone());
1215 if let Some(task) = get_task_by_id_internal(conn, &pred_id)? {
1216 result.push(task);
1217 }
1218 next_level.push(pred_id);
1219 }
1220 }
1221 }
1222
1223 current_level = next_level;
1224 levels_remaining -= 1;
1225 }
1226
1227 Ok(result)
1228 })
1229 }
1230
1231 pub fn get_successors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1234 if depth == 0 {
1235 return Ok(vec![]);
1236 }
1237
1238 self.with_conn(|conn| {
1239 let mut visited: HashSet<String> = HashSet::new();
1240 let mut result: Vec<Task> = Vec::new();
1241 let mut current_level: Vec<String> = vec![task_id.to_string()];
1242 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1243
1244 while !current_level.is_empty() && levels_remaining > 0 {
1245 let mut next_level: Vec<String> = Vec::new();
1246
1247 for tid in ¤t_level {
1248 let mut stmt = conn.prepare(
1250 "SELECT DISTINCT d.to_task_id FROM dependencies d
1251 WHERE d.from_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')"
1252 )?;
1253
1254 let successors: Vec<String> = stmt
1255 .query_map(params![tid], |row| row.get(0))?
1256 .filter_map(|r| r.ok())
1257 .collect();
1258
1259 for succ_id in successors {
1260 if !visited.contains(&succ_id) {
1261 visited.insert(succ_id.clone());
1262 if let Some(task) = get_task_by_id_internal(conn, &succ_id)? {
1263 result.push(task);
1264 }
1265 next_level.push(succ_id);
1266 }
1267 }
1268 }
1269
1270 current_level = next_level;
1271 levels_remaining -= 1;
1272 }
1273
1274 Ok(result)
1275 })
1276 }
1277
1278 pub fn get_ancestors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1281 if depth == 0 {
1282 return Ok(vec![]);
1283 }
1284
1285 self.with_conn(|conn| {
1286 let mut result: Vec<Task> = Vec::new();
1287 let mut current_id = task_id.to_string();
1288 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1289
1290 while levels_remaining > 0 {
1291 let parent_result: Result<String, rusqlite::Error> = conn.query_row(
1293 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1294 params![¤t_id],
1295 |row| row.get(0),
1296 );
1297
1298 match parent_result {
1299 Ok(parent_id) => {
1300 if let Some(task) = get_task_by_id_internal(conn, &parent_id)? {
1301 result.push(task);
1302 }
1303 current_id = parent_id;
1304 levels_remaining -= 1;
1305 }
1306 Err(rusqlite::Error::QueryReturnedNoRows) => break,
1307 Err(e) => return Err(e.into()),
1308 }
1309 }
1310
1311 Ok(result)
1312 })
1313 }
1314
1315 pub fn get_descendants(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1318 if depth == 0 {
1319 return Ok(vec![]);
1320 }
1321
1322 self.with_conn(|conn| {
1323 let mut visited: HashSet<String> = HashSet::new();
1324 let mut result: Vec<Task> = Vec::new();
1325 let mut current_level: Vec<String> = vec![task_id.to_string()];
1326 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1327
1328 while !current_level.is_empty() && levels_remaining > 0 {
1329 let mut next_level: Vec<String> = Vec::new();
1330
1331 for tid in ¤t_level {
1332 let mut stmt = conn.prepare(
1334 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
1335 )?;
1336
1337 let children: Vec<String> = stmt
1338 .query_map(params![tid], |row| row.get(0))?
1339 .filter_map(|r| r.ok())
1340 .collect();
1341
1342 for child_id in children {
1343 if !visited.contains(&child_id) {
1344 visited.insert(child_id.clone());
1345 if let Some(task) = get_task_by_id_internal(conn, &child_id)? {
1346 result.push(task);
1347 }
1348 next_level.push(child_id);
1349 }
1350 }
1351 }
1352
1353 current_level = next_level;
1354 levels_remaining -= 1;
1355 }
1356
1357 Ok(result)
1358 })
1359 }
1360
1361}
1362
1363fn get_task_by_id_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
1365 let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1366 let task = stmt
1367 .query_row(params![task_id], super::tasks::parse_task_row)
1368 .optional()?;
1369 Ok(task)
1370}
1371
1372
1373pub(crate) fn get_unsatisfied_start_blockers_in_tx(
1377 conn: &Connection,
1378 task_id: &str,
1379 states_config: &StatesConfig,
1380 deps_config: &DependenciesConfig,
1381) -> Result<Vec<String>> {
1382 let start_blocking_types = deps_config.start_blocking_types();
1383 if start_blocking_types.is_empty() {
1384 return Ok(vec![]);
1385 }
1386
1387 let state_placeholders: Vec<String> = states_config
1389 .blocking_states
1390 .iter()
1391 .enumerate()
1392 .map(|(i, _)| format!("?{}", i + 2))
1393 .collect();
1394 let state_clause = state_placeholders.join(", ");
1395
1396 let type_start = states_config.blocking_states.len() + 2;
1398 let type_placeholders: Vec<String> = start_blocking_types
1399 .iter()
1400 .enumerate()
1401 .map(|(i, _)| format!("?{}", type_start + i))
1402 .collect();
1403 let type_clause = type_placeholders.join(", ");
1404
1405 let sql = format!(
1406 "SELECT blocker.id FROM dependencies d
1407 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1408 WHERE d.to_task_id = ?1
1409 AND d.dep_type IN ({})
1410 AND blocker.status IN ({})",
1411 type_clause, state_clause
1412 );
1413
1414 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1415 params_vec.push(Box::new(task_id.to_string()));
1416 for state in &states_config.blocking_states {
1417 params_vec.push(Box::new(state.clone()));
1418 }
1419 for t in &start_blocking_types {
1420 params_vec.push(Box::new(t.to_string()));
1421 }
1422 let params_refs: Vec<&dyn rusqlite::ToSql> =
1423 params_vec.iter().map(|b| b.as_ref()).collect();
1424
1425 let mut stmt = conn.prepare(&sql)?;
1426 let blockers = stmt
1427 .query_map(params_refs.as_slice(), |row| {
1428 let id: String = row.get(0)?;
1429 Ok(id)
1430 })?
1431 .filter_map(|r| r.ok())
1432 .collect();
1433
1434 Ok(blockers)
1435}
1436
1437pub(crate) fn propagate_unblock_effects(
1454 conn: &Connection,
1455 completed_task_id: &str,
1456 agent_id: Option<&str>,
1457 states_config: &StatesConfig,
1458 deps_config: &DependenciesConfig,
1459 auto_advance: &AutoAdvanceConfig,
1460) -> Result<(Vec<String>, Vec<String>)> {
1461 let start_blocking_types = deps_config.start_blocking_types();
1463 if start_blocking_types.is_empty() {
1464 return Ok((vec![], vec![]));
1465 }
1466
1467 let type_placeholders: Vec<String> = start_blocking_types
1469 .iter()
1470 .enumerate()
1471 .map(|(i, _)| format!("?{}", i + 2))
1472 .collect();
1473 let type_clause = type_placeholders.join(", ");
1474
1475 let sql = format!(
1476 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
1477 type_clause
1478 );
1479
1480 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1481 params_vec.push(Box::new(completed_task_id.to_string()));
1482 for t in &start_blocking_types {
1483 params_vec.push(Box::new(t.to_string()));
1484 }
1485 let params_refs: Vec<&dyn rusqlite::ToSql> =
1486 params_vec.iter().map(|b| b.as_ref()).collect();
1487
1488 let mut stmt = conn.prepare(&sql)?;
1489 let dependent_task_ids: Vec<String> = stmt
1490 .query_map(params_refs.as_slice(), |row| row.get(0))?
1491 .filter_map(|r| r.ok())
1492 .collect();
1493
1494 let mut unblocked = Vec::new();
1495 let mut auto_advanced = Vec::new();
1496 let now = super::now_ms();
1497
1498 let should_auto_advance = auto_advance.enabled && auto_advance.target_state.is_some();
1500 let target_state = auto_advance.target_state.clone();
1501
1502 if should_auto_advance {
1504 let ts = target_state.as_ref().unwrap();
1505 if !states_config.is_valid_state(ts) {
1506 return Err(anyhow!(
1507 "Auto-advance target state '{}' is not a valid state",
1508 ts
1509 ));
1510 }
1511 }
1512
1513 for task_id in dependent_task_ids {
1514 let task = match get_task_by_id_internal(conn, &task_id)? {
1516 Some(t) => t,
1517 None => continue,
1518 };
1519
1520 if task.status != states_config.initial {
1522 continue;
1523 }
1524
1525 if task.worker_id.is_some() {
1527 continue;
1528 }
1529
1530 let state_placeholders: Vec<String> = states_config
1533 .blocking_states
1534 .iter()
1535 .enumerate()
1536 .map(|(i, _)| format!("?{}", i + 3))
1537 .collect();
1538 let state_clause = state_placeholders.join(", ");
1539
1540 let type_start = states_config.blocking_states.len() + 3;
1542 let type_placeholders2: Vec<String> = start_blocking_types
1543 .iter()
1544 .enumerate()
1545 .map(|(i, _)| format!("?{}", type_start + i))
1546 .collect();
1547 let type_clause2 = type_placeholders2.join(", ");
1548
1549 let blocker_sql = format!(
1550 "SELECT COUNT(*) FROM dependencies d
1551 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1552 WHERE d.to_task_id = ?1
1553 AND d.from_task_id != ?2
1554 AND d.dep_type IN ({})
1555 AND blocker.status IN ({})",
1556 type_clause2, state_clause
1557 );
1558
1559 let mut blocker_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1560 blocker_params.push(Box::new(task_id.clone()));
1561 blocker_params.push(Box::new(completed_task_id.to_string()));
1562 for state in &states_config.blocking_states {
1563 blocker_params.push(Box::new(state.clone()));
1564 }
1565 for t in &start_blocking_types {
1566 blocker_params.push(Box::new(t.to_string()));
1567 }
1568 let blocker_refs: Vec<&dyn rusqlite::ToSql> =
1569 blocker_params.iter().map(|b| b.as_ref()).collect();
1570
1571 let remaining_blockers: i32 =
1572 conn.query_row(&blocker_sql, blocker_refs.as_slice(), |row| row.get(0))?;
1573
1574 if remaining_blockers > 0 {
1575 continue; }
1577
1578 unblocked.push(task_id.clone());
1580
1581 if should_auto_advance {
1583 let ts = target_state.as_ref().unwrap();
1584
1585 if !states_config.is_valid_transition(&states_config.initial, ts) {
1587 continue;
1589 }
1590
1591 conn.execute(
1593 "UPDATE tasks SET status = ?1, updated_at = ?2 WHERE id = ?3",
1594 params![ts, now, &task_id],
1595 )?;
1596
1597 let reason = format!(
1599 "auto-advanced: blocker '{}' completed",
1600 completed_task_id
1601 );
1602 super::state_transitions::record_state_transition(
1603 conn,
1604 &task_id,
1605 ts,
1606 agent_id,
1607 Some(&reason),
1608 states_config,
1609 )?;
1610
1611 auto_advanced.push(task_id);
1612 }
1613 }
1614
1615 Ok((unblocked, auto_advanced))
1616}
1617