1use super::Database;
4use crate::config::{AutoAdvanceConfig, DependenciesConfig, DependencyDisplay, StatesConfig};
5use crate::types::{Dependency, Task};
6use anyhow::{Result, anyhow};
7use rusqlite::{Connection, OptionalExtension, params};
8use std::collections::{HashSet, VecDeque};
9
10#[derive(Debug)]
12pub struct RelinkResult {
13 pub unlinked: Vec<(String, String)>,
15 pub linked: Vec<(String, String)>,
17}
18
19fn would_create_cycle_in_tx(
21 tx: &rusqlite::Transaction,
22 from_task_id: &str,
23 to_task_id: &str,
24 dep_type: &str,
25 deps_config: &DependenciesConfig,
26) -> Result<bool> {
27 let def = deps_config.get_definition(dep_type).unwrap();
28
29 let mut visited: HashSet<String> = HashSet::new();
32 let mut queue: VecDeque<String> = VecDeque::new();
33 queue.push_back(to_task_id.to_string());
34
35 while let Some(current) = queue.pop_front() {
36 if current == from_task_id {
37 return Ok(true); }
39
40 if visited.contains(¤t) {
41 continue;
42 }
43 visited.insert(current.clone());
44
45 let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
47 let mut stmt = tx.prepare(
49 "SELECT to_task_id FROM dependencies d
50 JOIN (SELECT value FROM json_each(?1)) types
51 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
52 )?;
53 let vertical_types: Vec<&str> = deps_config.vertical_types();
54 let types_json = serde_json::to_string(&vertical_types)?;
55 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
56 .filter_map(|r| r.ok())
57 .collect()
58 } else {
59 let mut stmt = tx.prepare(
61 "SELECT to_task_id FROM dependencies d
62 JOIN (SELECT value FROM json_each(?1)) types
63 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
64 )?;
65 let start_blocking: Vec<&str> = deps_config.start_blocking_types();
66 let types_json = serde_json::to_string(&start_blocking)?;
67 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
68 .filter_map(|r| r.ok())
69 .collect()
70 };
71
72 for dep in deps {
73 if !visited.contains(&dep) {
74 queue.push_back(dep);
75 }
76 }
77 }
78
79 Ok(false)
80}
81
82fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
85 let field = match sort_by {
86 Some("priority") => "CAST(t.priority AS INTEGER)",
87 Some("created_at") => "t.created_at",
88 Some("updated_at") => "t.updated_at",
89 _ => "t.created_at", };
91
92 let order = match sort_order {
93 Some("asc") => "ASC",
94 Some("desc") => "DESC",
95 _ => {
96 "DESC"
98 }
99 };
100
101 format!("{} {}", field, order)
102}
103
104impl Database {
105 pub fn add_dependency(
107 &self,
108 from_task_id: &str,
109 to_task_id: &str,
110 dep_type: &str,
111 deps_config: &DependenciesConfig,
112 ) -> Result<()> {
113 if !deps_config.is_valid_dep_type(dep_type) {
115 return Err(anyhow!(
116 "Invalid dependency type '{}'. Valid types: {:?}",
117 dep_type,
118 deps_config.dep_type_names()
119 ));
120 }
121
122 let def = deps_config.get_definition(dep_type).unwrap();
124 if def.display == DependencyDisplay::Vertical
125 && let Some(existing_parent) = self.get_parent(to_task_id)?
126 && existing_parent != from_task_id {
127 return Err(anyhow!(
128 "Task {} already has parent {}",
129 to_task_id,
130 existing_parent
131 ));
132 }
133
134 if self.would_create_cycle(from_task_id, to_task_id, dep_type, deps_config)? {
136 return Err(anyhow!("Adding this dependency would create a cycle"));
137 }
138
139 self.with_conn(|conn| {
140 conn.execute(
141 "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
142 params![from_task_id, to_task_id, dep_type],
143 )?;
144 Ok(())
145 })
146 }
147
148 pub fn would_create_cycle(
152 &self,
153 from_task_id: &str,
154 to_task_id: &str,
155 dep_type: &str,
156 deps_config: &DependenciesConfig,
157 ) -> Result<bool> {
158 let def = deps_config.get_definition(dep_type).unwrap();
159
160 self.with_conn(|conn| {
161 let mut visited: HashSet<String> = HashSet::new();
164 let mut queue: VecDeque<String> = VecDeque::new();
165 queue.push_back(to_task_id.to_string());
166
167 while let Some(current) = queue.pop_front() {
168 if current == from_task_id {
169 return Ok(true); }
171
172 if visited.contains(¤t) {
173 continue;
174 }
175 visited.insert(current.clone());
176
177 let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
179 let mut stmt = conn.prepare(
181 "SELECT to_task_id FROM dependencies d
182 JOIN (SELECT value FROM json_each(?1)) types
183 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
184 )?;
185 let vertical_types: Vec<&str> = deps_config.vertical_types();
186 let types_json = serde_json::to_string(&vertical_types)?;
187 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
188 .filter_map(|r| r.ok())
189 .collect()
190 } else {
191 let mut stmt = conn.prepare(
193 "SELECT to_task_id FROM dependencies d
194 JOIN (SELECT value FROM json_each(?1)) types
195 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
196 )?;
197 let start_blocking: Vec<&str> = deps_config.start_blocking_types();
198 let types_json = serde_json::to_string(&start_blocking)?;
199 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
200 .filter_map(|r| r.ok())
201 .collect()
202 };
203
204 for dep in deps {
205 if !visited.contains(&dep) {
206 queue.push_back(dep);
207 }
208 }
209 }
210
211 Ok(false)
212 })
213 }
214
215 pub fn remove_dependency(
217 &self,
218 from_task_id: &str,
219 to_task_id: &str,
220 dep_type: &str,
221 ) -> Result<bool> {
222 self.with_conn(|conn| {
223 let rows = conn.execute(
224 "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
225 params![from_task_id, to_task_id, dep_type],
226 )?;
227 Ok(rows > 0)
228 })
229 }
230
231 pub fn remove_all_outgoing_dependencies(
234 &self,
235 from_task_id: &str,
236 dep_type: &str,
237 ) -> Result<Vec<Dependency>> {
238 self.with_conn_mut(|conn| {
239 let tx = conn.transaction()?;
240
241 let deps: Vec<Dependency> = {
243 let mut stmt = tx.prepare(
244 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
245 )?;
246 stmt
247 .query_map(params![from_task_id, dep_type], |row| {
248 Ok(Dependency {
249 from_task_id: row.get(0)?,
250 to_task_id: row.get(1)?,
251 dep_type: row.get(2)?,
252 })
253 })?
254 .filter_map(|r| r.ok())
255 .collect()
256 };
257
258 tx.execute(
260 "DELETE FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2",
261 params![from_task_id, dep_type],
262 )?;
263
264 tx.commit()?;
265 Ok(deps)
266 })
267 }
268
269 pub fn remove_all_incoming_dependencies(
272 &self,
273 to_task_id: &str,
274 dep_type: &str,
275 ) -> Result<Vec<Dependency>> {
276 self.with_conn_mut(|conn| {
277 let tx = conn.transaction()?;
278
279 let deps: Vec<Dependency> = {
281 let mut stmt = tx.prepare(
282 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
283 )?;
284 stmt
285 .query_map(params![to_task_id, dep_type], |row| {
286 Ok(Dependency {
287 from_task_id: row.get(0)?,
288 to_task_id: row.get(1)?,
289 dep_type: row.get(2)?,
290 })
291 })?
292 .filter_map(|r| r.ok())
293 .collect()
294 };
295
296 tx.execute(
298 "DELETE FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2",
299 params![to_task_id, dep_type],
300 )?;
301
302 tx.commit()?;
303 Ok(deps)
304 })
305 }
306
307 pub fn get_all_dependencies(&self) -> Result<Vec<Dependency>> {
309 self.with_conn(|conn| {
310 let mut stmt =
311 conn.prepare("SELECT from_task_id, to_task_id, dep_type FROM dependencies")?;
312
313 let deps = stmt
314 .query_map([], |row| {
315 let from: String = row.get(0)?;
316 let to: String = row.get(1)?;
317 let dep_type: String = row.get(2)?;
318 Ok(Dependency {
319 from_task_id: from,
320 to_task_id: to,
321 dep_type,
322 })
323 })?
324 .filter_map(|r| r.ok())
325 .collect();
326
327 Ok(deps)
328 })
329 }
330
331 pub fn get_dependencies_by_type(
333 &self,
334 task_id: &str,
335 dep_type: &str,
336 direction: &str,
337 ) -> Result<Vec<Dependency>> {
338 self.with_conn(|conn| {
339 let sql = if direction == "incoming" {
340 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
341 } else {
342 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
343 };
344
345 let mut stmt = conn.prepare(sql)?;
346
347 let deps = stmt
348 .query_map(params![task_id, dep_type], |row| {
349 let from: String = row.get(0)?;
350 let to: String = row.get(1)?;
351 let dep_type: String = row.get(2)?;
352 Ok(Dependency {
353 from_task_id: from,
354 to_task_id: to,
355 dep_type,
356 })
357 })?
358 .filter_map(|r| r.ok())
359 .collect();
360
361 Ok(deps)
362 })
363 }
364
365 pub fn get_start_blockers(
367 &self,
368 task_id: &str,
369 deps_config: &DependenciesConfig,
370 ) -> Result<Vec<String>> {
371 let start_blocking_types = deps_config.start_blocking_types();
372 if start_blocking_types.is_empty() {
373 return Ok(vec![]);
374 }
375
376 self.with_conn(|conn| {
377 let placeholders: String = start_blocking_types
378 .iter()
379 .enumerate()
380 .map(|(i, _)| format!("?{}", i + 2))
381 .collect::<Vec<_>>()
382 .join(", ");
383
384 let sql = format!(
385 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type IN ({})",
386 placeholders
387 );
388
389 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
390 params_vec.push(Box::new(task_id.to_string()));
391 for t in &start_blocking_types {
392 params_vec.push(Box::new(t.to_string()));
393 }
394 let params_refs: Vec<&dyn rusqlite::ToSql> =
395 params_vec.iter().map(|b| b.as_ref()).collect();
396
397 let mut stmt = conn.prepare(&sql)?;
398 let blockers = stmt
399 .query_map(params_refs.as_slice(), |row| {
400 let id: String = row.get(0)?;
401 Ok(id)
402 })?
403 .filter_map(|r| r.ok())
404 .collect();
405
406 Ok(blockers)
407 })
408 }
409
410 pub fn get_completion_blockers(
413 &self,
414 task_id: &str,
415 deps_config: &DependenciesConfig,
416 ) -> Result<Vec<String>> {
417 let completion_blocking_types = deps_config.completion_blocking_types();
418 if completion_blocking_types.is_empty() {
419 return Ok(vec![]);
420 }
421
422 self.with_conn(|conn| {
423 let placeholders: String = completion_blocking_types
424 .iter()
425 .enumerate()
426 .map(|(i, _)| format!("?{}", i + 2))
427 .collect::<Vec<_>>()
428 .join(", ");
429
430 let sql = format!(
433 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
434 placeholders
435 );
436
437 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
438 params_vec.push(Box::new(task_id.to_string()));
439 for t in &completion_blocking_types {
440 params_vec.push(Box::new(t.to_string()));
441 }
442 let params_refs: Vec<&dyn rusqlite::ToSql> =
443 params_vec.iter().map(|b| b.as_ref()).collect();
444
445 let mut stmt = conn.prepare(&sql)?;
446 let blockers = stmt
447 .query_map(params_refs.as_slice(), |row| {
448 let id: String = row.get(0)?;
449 Ok(id)
450 })?
451 .filter_map(|r| r.ok())
452 .collect();
453
454 Ok(blockers)
455 })
456 }
457
458 pub fn get_parent(&self, task_id: &str) -> Result<Option<String>> {
460 self.with_conn(|conn| {
461 let result: Result<String, rusqlite::Error> = conn.query_row(
462 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
463 params![task_id],
464 |row| row.get(0),
465 );
466
467 match result {
468 Ok(parent_id) => Ok(Some(parent_id)),
469 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
470 Err(e) => Err(e.into()),
471 }
472 })
473 }
474
475 pub fn get_children_ids(&self, task_id: &str) -> Result<Vec<String>> {
477 self.with_conn(|conn| {
478 let mut stmt = conn.prepare(
479 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
480 )?;
481
482 let children = stmt
483 .query_map(params![task_id], |row| {
484 let id: String = row.get(0)?;
485 Ok(id)
486 })?
487 .filter_map(|r| r.ok())
488 .collect();
489
490 Ok(children)
491 })
492 }
493
494 pub fn get_blockers(&self, task_id: &str) -> Result<Vec<String>> {
497 self.with_conn(|conn| {
498 let mut stmt = conn.prepare(
499 "SELECT from_task_id FROM dependencies
500 WHERE to_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
501 )?;
502
503 let blockers = stmt
504 .query_map(params![task_id], |row| {
505 let id: String = row.get(0)?;
506 Ok(id)
507 })?
508 .filter_map(|r| r.ok())
509 .collect();
510
511 Ok(blockers)
512 })
513 }
514
515 #[allow(dead_code)]
517 pub fn get_blocking(&self, task_id: &str) -> Result<Vec<String>> {
518 self.with_conn(|conn| {
519 let mut stmt = conn.prepare(
520 "SELECT to_task_id FROM dependencies
521 WHERE from_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
522 )?;
523
524 let blocking = stmt
525 .query_map(params![task_id], |row| {
526 let id: String = row.get(0)?;
527 Ok(id)
528 })?
529 .filter_map(|r| r.ok())
530 .collect();
531
532 Ok(blocking)
533 })
534 }
535
536 pub fn get_blocked_tasks(
540 &self,
541 states_config: &StatesConfig,
542 deps_config: &DependenciesConfig,
543 sort_by: Option<&str>,
544 sort_order: Option<&str>,
545 ) -> Result<Vec<Task>> {
546 let start_blocking_types = deps_config.start_blocking_types();
547 if start_blocking_types.is_empty() {
548 return Ok(vec![]);
549 }
550
551 self.with_conn(|conn| {
552 let state_placeholders: Vec<String> = states_config
554 .blocking_states
555 .iter()
556 .enumerate()
557 .map(|(i, _)| format!("?{}", i + 2))
558 .collect();
559 let state_clause = state_placeholders.join(", ");
560
561 let type_start = states_config.blocking_states.len() + 2;
563 let type_placeholders: Vec<String> = start_blocking_types
564 .iter()
565 .enumerate()
566 .map(|(i, _)| format!("?{}", type_start + i))
567 .collect();
568 let type_clause = type_placeholders.join(", ");
569
570 let order_clause = build_order_clause(sort_by, sort_order);
572
573 let sql = format!(
574 "SELECT DISTINCT t.*
575 FROM tasks t
576 INNER JOIN dependencies d ON t.id = d.to_task_id
577 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
578 WHERE d.dep_type IN ({})
579 AND blocker.status IN ({})
580 AND t.status = ?1
581 AND t.deleted_at IS NULL
582 ORDER BY {}",
583 type_clause, state_clause, order_clause
584 );
585
586 let mut stmt = conn.prepare(&sql)?;
587
588 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
590 params_vec.push(Box::new(states_config.initial.clone()));
591 for state in &states_config.blocking_states {
592 params_vec.push(Box::new(state.clone()));
593 }
594 for t in &start_blocking_types {
595 params_vec.push(Box::new(t.to_string()));
596 }
597 let params_refs: Vec<&dyn rusqlite::ToSql> =
598 params_vec.iter().map(|b| b.as_ref()).collect();
599
600 let tasks = stmt
601 .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
602 .filter_map(|r| r.ok())
603 .collect();
604
605 Ok(tasks)
606 })
607 }
608
609 pub fn get_ready_tasks(
614 &self,
615 agent_id: Option<&str>,
616 states_config: &StatesConfig,
617 deps_config: &DependenciesConfig,
618 sort_by: Option<&str>,
619 sort_order: Option<&str>,
620 ) -> Result<Vec<Task>> {
621 let start_blocking_types = deps_config.start_blocking_types();
622
623 let agent_tags: Option<Vec<String>> = if let Some(aid) = agent_id {
625 Some(self.get_agent_tags(aid)?)
626 } else {
627 None
628 };
629
630 self.with_conn(|conn| {
631 let state_placeholders: Vec<String> = states_config
633 .blocking_states
634 .iter()
635 .enumerate()
636 .map(|(i, _)| format!("?{}", i + 2))
637 .collect();
638 let state_clause = state_placeholders.join(", ");
639
640 let type_start = states_config.blocking_states.len() + 2;
642 let type_placeholders: Vec<String> = start_blocking_types
643 .iter()
644 .enumerate()
645 .map(|(i, _)| format!("?{}", type_start + i))
646 .collect();
647 let type_clause = type_placeholders.join(", ");
648
649 let order_clause = if sort_by.is_some() {
651 build_order_clause(sort_by, sort_order)
652 } else {
653 "CAST(t.priority AS INTEGER) DESC, t.created_at DESC".to_string()
655 };
656
657 let mut param_idx = type_start + start_blocking_types.len();
659
660 let (agent_needed_clause, agent_wanted_clause) = if let Some(ref tags) = agent_tags {
662 let needed_placeholders: Vec<String> = tags
666 .iter()
667 .enumerate()
668 .map(|(i, _)| format!("?{}", param_idx + i))
669 .collect();
670 param_idx += tags.len();
671
672 let needed_clause = if needed_placeholders.is_empty() {
673 "AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)"
675 .to_string()
676 } else {
677 format!(
679 "AND (
680 NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
681 OR (
682 SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
683 ) = (
684 SELECT COUNT(*) FROM task_needed_tags
685 WHERE task_id = t.id AND tag IN ({})
686 )
687 )",
688 needed_placeholders.join(", ")
689 )
690 };
691
692 let wanted_placeholders: Vec<String> = tags
694 .iter()
695 .enumerate()
696 .map(|(i, _)| format!("?{}", param_idx + i))
697 .collect();
698
699 let wanted_clause = if wanted_placeholders.is_empty() {
700 "AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)"
702 .to_string()
703 } else {
704 format!(
706 "AND (
707 NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
708 OR EXISTS (
709 SELECT 1 FROM task_wanted_tags
710 WHERE task_id = t.id AND tag IN ({})
711 )
712 )",
713 wanted_placeholders.join(", ")
714 )
715 };
716
717 (needed_clause, wanted_clause)
718 } else {
719 (String::new(), String::new())
720 };
721
722 let sql = format!(
723 "SELECT t.*
724 FROM tasks t
725 WHERE t.status = ?1
726 AND t.worker_id IS NULL
727 AND t.deleted_at IS NULL
728 AND NOT EXISTS (
729 SELECT 1 FROM dependencies d
730 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
731 WHERE d.to_task_id = t.id
732 AND d.dep_type IN ({})
733 AND blocker.status IN ({})
734 )
735 {}
736 {}
737 ORDER BY {}",
738 type_clause, state_clause, agent_needed_clause, agent_wanted_clause, order_clause
739 );
740
741 let mut stmt = conn.prepare(&sql)?;
742
743 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
745 params_vec.push(Box::new(states_config.initial.clone()));
746 for state in &states_config.blocking_states {
747 params_vec.push(Box::new(state.clone()));
748 }
749 for t in &start_blocking_types {
750 params_vec.push(Box::new(t.to_string()));
751 }
752 if let Some(ref tags) = agent_tags {
754 for tag in tags {
755 params_vec.push(Box::new(tag.clone()));
756 }
757 for tag in tags {
758 params_vec.push(Box::new(tag.clone()));
759 }
760 }
761 let params_refs: Vec<&dyn rusqlite::ToSql> =
762 params_vec.iter().map(|b| b.as_ref()).collect();
763
764 let tasks: Vec<Task> = stmt
765 .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
766 .filter_map(|r| r.ok())
767 .collect();
768
769 Ok(tasks)
770 })
771 }
772
773 #[allow(dead_code)]
775 pub fn has_unmet_start_dependencies(
776 &self,
777 task_id: &str,
778 states_config: &StatesConfig,
779 deps_config: &DependenciesConfig,
780 ) -> Result<bool> {
781 let start_blocking_types = deps_config.start_blocking_types();
782 if start_blocking_types.is_empty() {
783 return Ok(false);
784 }
785
786 self.with_conn(|conn| {
787 let state_placeholders: Vec<String> = states_config
789 .blocking_states
790 .iter()
791 .enumerate()
792 .map(|(i, _)| format!("?{}", i + 2))
793 .collect();
794 let state_clause = state_placeholders.join(", ");
795
796 let type_start = states_config.blocking_states.len() + 2;
798 let type_placeholders: Vec<String> = start_blocking_types
799 .iter()
800 .enumerate()
801 .map(|(i, _)| format!("?{}", type_start + i))
802 .collect();
803 let type_clause = type_placeholders.join(", ");
804
805 let sql = format!(
806 "SELECT COUNT(*) FROM dependencies d
807 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
808 WHERE d.to_task_id = ?1
809 AND d.dep_type IN ({})
810 AND blocker.status IN ({})",
811 type_clause, state_clause
812 );
813
814 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
815 params_vec.push(Box::new(task_id.to_string()));
816 for state in &states_config.blocking_states {
817 params_vec.push(Box::new(state.clone()));
818 }
819 for t in &start_blocking_types {
820 params_vec.push(Box::new(t.to_string()));
821 }
822 let params_refs: Vec<&dyn rusqlite::ToSql> =
823 params_vec.iter().map(|b| b.as_ref()).collect();
824
825 let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
826
827 Ok(count > 0)
828 })
829 }
830
831 pub fn has_incomplete_children(
833 &self,
834 task_id: &str,
835 states_config: &StatesConfig,
836 ) -> Result<bool> {
837 self.with_conn(|conn| {
838 let state_placeholders: Vec<String> = states_config
840 .blocking_states
841 .iter()
842 .enumerate()
843 .map(|(i, _)| format!("?{}", i + 2))
844 .collect();
845 let state_clause = state_placeholders.join(", ");
846
847 let sql = format!(
848 "SELECT COUNT(*) FROM dependencies d
849 INNER JOIN tasks child ON d.to_task_id = child.id
850 WHERE d.from_task_id = ?1
851 AND d.dep_type = 'contains'
852 AND child.status IN ({})",
853 state_clause
854 );
855
856 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
857 params_vec.push(Box::new(task_id.to_string()));
858 for state in &states_config.blocking_states {
859 params_vec.push(Box::new(state.clone()));
860 }
861 let params_refs: Vec<&dyn rusqlite::ToSql> =
862 params_vec.iter().map(|b| b.as_ref()).collect();
863
864 let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
865
866 Ok(count > 0)
867 })
868 }
869
870 #[allow(clippy::too_many_arguments)]
877 pub fn list_tasks_with_tag_filters(
878 &self,
879 status: Option<Vec<String>>,
880 owner: Option<&str>,
881 parent_id: Option<Option<&str>>,
882 tags_any: Option<Vec<String>>,
883 tags_all: Option<Vec<String>>,
884 qualified_for_agent_tags: Option<Vec<String>>,
885 limit: Option<i32>,
886 sort_by: Option<&str>,
887 sort_order: Option<&str>,
888 ) -> Result<Vec<Task>> {
889 self.with_conn(|conn| {
890 let mut sql = String::from("SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL");
891 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
892 let mut param_idx = 1;
893
894 if let Some(ref statuses) = status {
896 if statuses.len() == 1 {
897 sql.push_str(&format!(" AND t.status = ?{}", param_idx));
898 params_vec.push(Box::new(statuses[0].clone()));
899 param_idx += 1;
900 } else if statuses.len() > 1 {
901 let placeholders: Vec<String> = statuses
902 .iter()
903 .enumerate()
904 .map(|(i, _)| format!("?{}", param_idx + i))
905 .collect();
906 sql.push_str(&format!(" AND t.status IN ({})", placeholders.join(", ")));
907 for s in statuses {
908 params_vec.push(Box::new(s.clone()));
909 }
910 param_idx += statuses.len();
911 }
912 }
913
914 if let Some(o) = owner {
916 sql.push_str(&format!(" AND t.worker_id = ?{}", param_idx));
917 params_vec.push(Box::new(o.to_string()));
918 param_idx += 1;
919 }
920
921 if let Some(p) = parent_id {
923 match p {
924 Some(pid) => {
925 sql.push_str(&format!(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ?{} AND dep_type = 'contains')", param_idx));
926 params_vec.push(Box::new(pid.to_string()));
927 param_idx += 1;
928 }
929 None => {
930 sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
932 }
933 }
934 }
935
936 if let Some(ref any_tags) = tags_any
938 && !any_tags.is_empty() {
939 let placeholders: Vec<String> = any_tags
940 .iter()
941 .enumerate()
942 .map(|(i, _)| format!("?{}", param_idx + i))
943 .collect();
944 sql.push_str(&format!(
945 " AND EXISTS (SELECT 1 FROM task_tags WHERE task_id = t.id AND tag IN ({}))",
946 placeholders.join(", ")
947 ));
948 for tag in any_tags {
949 params_vec.push(Box::new(tag.clone()));
950 }
951 param_idx += any_tags.len();
952 }
953
954 if let Some(ref all_tags) = tags_all
956 && !all_tags.is_empty() {
957 let placeholders: Vec<String> = all_tags
958 .iter()
959 .enumerate()
960 .map(|(i, _)| format!("?{}", param_idx + i))
961 .collect();
962 sql.push_str(&format!(
964 " AND (SELECT COUNT(*) FROM task_tags WHERE task_id = t.id AND tag IN ({})) = {}",
965 placeholders.join(", "),
966 all_tags.len()
967 ));
968 for tag in all_tags {
969 params_vec.push(Box::new(tag.clone()));
970 }
971 param_idx += all_tags.len();
972 }
973
974 if let Some(ref agent_tags) = qualified_for_agent_tags {
976 if agent_tags.is_empty() {
978 sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)");
980 sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)");
982 } else {
983 let needed_placeholders: Vec<String> = agent_tags
985 .iter()
986 .enumerate()
987 .map(|(i, _)| format!("?{}", param_idx + i))
988 .collect();
989 sql.push_str(&format!(
990 " AND (
991 NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
992 OR (
993 SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
994 ) = (
995 SELECT COUNT(*) FROM task_needed_tags
996 WHERE task_id = t.id AND tag IN ({})
997 )
998 )",
999 needed_placeholders.join(", ")
1000 ));
1001 for tag in agent_tags {
1002 params_vec.push(Box::new(tag.clone()));
1003 }
1004 param_idx += agent_tags.len();
1005
1006 let wanted_placeholders: Vec<String> = agent_tags
1008 .iter()
1009 .enumerate()
1010 .map(|(i, _)| format!("?{}", param_idx + i))
1011 .collect();
1012 sql.push_str(&format!(
1013 " AND (
1014 NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
1015 OR EXISTS (
1016 SELECT 1 FROM task_wanted_tags
1017 WHERE task_id = t.id AND tag IN ({})
1018 )
1019 )",
1020 wanted_placeholders.join(", ")
1021 ));
1022 for tag in agent_tags {
1023 params_vec.push(Box::new(tag.clone()));
1024 }
1025 }
1027 }
1028
1029 let order_clause = build_order_clause(sort_by, sort_order);
1031 sql.push_str(&format!(" ORDER BY {}", order_clause));
1032
1033 if let Some(l) = limit {
1035 sql.push_str(&format!(" LIMIT {}", l));
1036 }
1037
1038 let params_refs: Vec<&dyn rusqlite::ToSql> =
1039 params_vec.iter().map(|b| b.as_ref()).collect();
1040
1041 let mut stmt = conn.prepare(&sql)?;
1042 let tasks: Vec<Task> = stmt
1043 .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
1044 .filter_map(|r| r.ok())
1045 .collect();
1046
1047 Ok(tasks)
1048 })
1049 }
1050
1051 pub fn get_agent_tags(&self, agent_id: &str) -> Result<Vec<String>> {
1053 self.with_conn(|conn| {
1054 let result: Result<String, rusqlite::Error> = conn.query_row(
1055 "SELECT tags FROM workers WHERE id = ?1",
1056 params![agent_id],
1057 |row| row.get(0),
1058 );
1059
1060 match result {
1061 Ok(tags_json) => {
1062 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
1063 Ok(tags)
1064 }
1065 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(vec![]),
1066 Err(e) => Err(e.into()),
1067 }
1068 })
1069 }
1070
1071 pub(super) fn add_dependency_internal(
1073 conn: &Connection,
1074 from_task_id: &str,
1075 to_task_id: &str,
1076 dep_type: &str,
1077 ) -> Result<()> {
1078 conn.execute(
1079 "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1080 params![from_task_id, to_task_id, dep_type],
1081 )?;
1082 Ok(())
1083 }
1084
1085 pub fn relink(
1089 &self,
1090 prev_from_ids: &[String],
1091 prev_to_ids: &[String],
1092 from_ids: &[String],
1093 to_ids: &[String],
1094 dep_type: &str,
1095 deps_config: &DependenciesConfig,
1096 ) -> Result<RelinkResult> {
1097 if !deps_config.is_valid_dep_type(dep_type) {
1099 return Err(anyhow!(
1100 "Invalid dependency type '{}'. Valid types: {:?}",
1101 dep_type,
1102 deps_config.dep_type_names()
1103 ));
1104 }
1105
1106 let def = deps_config.get_definition(dep_type).unwrap();
1107 let is_vertical = def.display == DependencyDisplay::Vertical;
1108
1109 self.with_conn_mut(|conn| {
1110 let tx = conn.transaction()?;
1111
1112 let mut unlinked = Vec::new();
1113 let mut linked = Vec::new();
1114 let mut errors = Vec::new();
1115
1116 for prev_from in prev_from_ids {
1118 for prev_to in prev_to_ids {
1119 let rows = tx.execute(
1120 "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
1121 params![prev_from, prev_to, dep_type],
1122 )?;
1123 if rows > 0 {
1124 unlinked.push((prev_from.clone(), prev_to.clone()));
1125 }
1126 }
1127 }
1128
1129 for from_id in from_ids {
1131 for to_id in to_ids {
1132 if is_vertical {
1134 let existing_parent: Option<String> = tx.query_row(
1135 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1136 params![to_id],
1137 |row| row.get(0),
1138 ).optional()?;
1139
1140 if let Some(ref parent) = existing_parent
1141 && parent != from_id {
1142 errors.push(format!(
1143 "Task {} already has parent {}",
1144 to_id, parent
1145 ));
1146 continue;
1147 }
1148 }
1149
1150 if would_create_cycle_in_tx(&tx, from_id, to_id, dep_type, deps_config)? {
1152 errors.push(format!(
1153 "Adding dependency {}→{} would create a cycle",
1154 from_id, to_id
1155 ));
1156 continue;
1157 }
1158
1159 tx.execute(
1160 "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1161 params![from_id, to_id, dep_type],
1162 )?;
1163 linked.push((from_id.clone(), to_id.clone()));
1164 }
1165 }
1166
1167 if !errors.is_empty() {
1168 tx.rollback()?;
1170 return Err(anyhow!("Relink failed: {}", errors.join("; ")));
1171 }
1172
1173 tx.commit()?;
1174 Ok(RelinkResult { unlinked, linked })
1175 })
1176 }
1177
1178 pub fn get_predecessors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1185 if depth == 0 {
1186 return Ok(vec![]);
1187 }
1188
1189 self.with_conn(|conn| {
1190 let mut visited: HashSet<String> = HashSet::new();
1191 let mut result: Vec<Task> = Vec::new();
1192 let mut current_level: Vec<String> = vec![task_id.to_string()];
1193 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1194
1195 while !current_level.is_empty() && levels_remaining > 0 {
1196 let mut next_level: Vec<String> = Vec::new();
1197
1198 for tid in ¤t_level {
1199 let mut stmt = conn.prepare(
1201 "SELECT DISTINCT d.from_task_id FROM dependencies d
1202 WHERE d.to_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')",
1203 )?;
1204
1205 let predecessors: Vec<String> = stmt
1206 .query_map(params![tid], |row| row.get(0))?
1207 .filter_map(|r| r.ok())
1208 .collect();
1209
1210 for pred_id in predecessors {
1211 if !visited.contains(&pred_id) {
1212 visited.insert(pred_id.clone());
1213 if let Some(task) = get_task_by_id_internal(conn, &pred_id)? {
1214 result.push(task);
1215 }
1216 next_level.push(pred_id);
1217 }
1218 }
1219 }
1220
1221 current_level = next_level;
1222 levels_remaining -= 1;
1223 }
1224
1225 Ok(result)
1226 })
1227 }
1228
1229 pub fn get_successors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1232 if depth == 0 {
1233 return Ok(vec![]);
1234 }
1235
1236 self.with_conn(|conn| {
1237 let mut visited: HashSet<String> = HashSet::new();
1238 let mut result: Vec<Task> = Vec::new();
1239 let mut current_level: Vec<String> = vec![task_id.to_string()];
1240 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1241
1242 while !current_level.is_empty() && levels_remaining > 0 {
1243 let mut next_level: Vec<String> = Vec::new();
1244
1245 for tid in ¤t_level {
1246 let mut stmt = conn.prepare(
1248 "SELECT DISTINCT d.to_task_id FROM dependencies d
1249 WHERE d.from_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')",
1250 )?;
1251
1252 let successors: Vec<String> = stmt
1253 .query_map(params![tid], |row| row.get(0))?
1254 .filter_map(|r| r.ok())
1255 .collect();
1256
1257 for succ_id in successors {
1258 if !visited.contains(&succ_id) {
1259 visited.insert(succ_id.clone());
1260 if let Some(task) = get_task_by_id_internal(conn, &succ_id)? {
1261 result.push(task);
1262 }
1263 next_level.push(succ_id);
1264 }
1265 }
1266 }
1267
1268 current_level = next_level;
1269 levels_remaining -= 1;
1270 }
1271
1272 Ok(result)
1273 })
1274 }
1275
1276 pub fn get_ancestors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1279 if depth == 0 {
1280 return Ok(vec![]);
1281 }
1282
1283 self.with_conn(|conn| {
1284 let mut result: Vec<Task> = Vec::new();
1285 let mut current_id = task_id.to_string();
1286 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1287
1288 while levels_remaining > 0 {
1289 let parent_result: Result<String, rusqlite::Error> = conn.query_row(
1291 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1292 params![¤t_id],
1293 |row| row.get(0),
1294 );
1295
1296 match parent_result {
1297 Ok(parent_id) => {
1298 if let Some(task) = get_task_by_id_internal(conn, &parent_id)? {
1299 result.push(task);
1300 }
1301 current_id = parent_id;
1302 levels_remaining -= 1;
1303 }
1304 Err(rusqlite::Error::QueryReturnedNoRows) => break,
1305 Err(e) => return Err(e.into()),
1306 }
1307 }
1308
1309 Ok(result)
1310 })
1311 }
1312
1313 pub fn get_descendants(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1316 if depth == 0 {
1317 return Ok(vec![]);
1318 }
1319
1320 self.with_conn(|conn| {
1321 let mut visited: HashSet<String> = HashSet::new();
1322 let mut result: Vec<Task> = Vec::new();
1323 let mut current_level: Vec<String> = vec![task_id.to_string()];
1324 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1325
1326 while !current_level.is_empty() && levels_remaining > 0 {
1327 let mut next_level: Vec<String> = Vec::new();
1328
1329 for tid in ¤t_level {
1330 let mut stmt = conn.prepare(
1332 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
1333 )?;
1334
1335 let children: Vec<String> = stmt
1336 .query_map(params![tid], |row| row.get(0))?
1337 .filter_map(|r| r.ok())
1338 .collect();
1339
1340 for child_id in children {
1341 if !visited.contains(&child_id) {
1342 visited.insert(child_id.clone());
1343 if let Some(task) = get_task_by_id_internal(conn, &child_id)? {
1344 result.push(task);
1345 }
1346 next_level.push(child_id);
1347 }
1348 }
1349 }
1350
1351 current_level = next_level;
1352 levels_remaining -= 1;
1353 }
1354
1355 Ok(result)
1356 })
1357 }
1358}
1359
1360fn get_task_by_id_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
1362 let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1363 let task = stmt
1364 .query_row(params![task_id], super::tasks::parse_task_row)
1365 .optional()?;
1366 Ok(task)
1367}
1368
1369pub(crate) fn get_unsatisfied_start_blockers_in_tx(
1373 conn: &Connection,
1374 task_id: &str,
1375 states_config: &StatesConfig,
1376 deps_config: &DependenciesConfig,
1377) -> Result<Vec<String>> {
1378 let start_blocking_types = deps_config.start_blocking_types();
1379 if start_blocking_types.is_empty() {
1380 return Ok(vec![]);
1381 }
1382
1383 let state_placeholders: Vec<String> = states_config
1385 .blocking_states
1386 .iter()
1387 .enumerate()
1388 .map(|(i, _)| format!("?{}", i + 2))
1389 .collect();
1390 let state_clause = state_placeholders.join(", ");
1391
1392 let type_start = states_config.blocking_states.len() + 2;
1394 let type_placeholders: Vec<String> = start_blocking_types
1395 .iter()
1396 .enumerate()
1397 .map(|(i, _)| format!("?{}", type_start + i))
1398 .collect();
1399 let type_clause = type_placeholders.join(", ");
1400
1401 let sql = format!(
1402 "SELECT blocker.id FROM dependencies d
1403 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1404 WHERE d.to_task_id = ?1
1405 AND d.dep_type IN ({})
1406 AND blocker.status IN ({})",
1407 type_clause, state_clause
1408 );
1409
1410 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1411 params_vec.push(Box::new(task_id.to_string()));
1412 for state in &states_config.blocking_states {
1413 params_vec.push(Box::new(state.clone()));
1414 }
1415 for t in &start_blocking_types {
1416 params_vec.push(Box::new(t.to_string()));
1417 }
1418 let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
1419
1420 let mut stmt = conn.prepare(&sql)?;
1421 let blockers = stmt
1422 .query_map(params_refs.as_slice(), |row| {
1423 let id: String = row.get(0)?;
1424 Ok(id)
1425 })?
1426 .filter_map(|r| r.ok())
1427 .collect();
1428
1429 Ok(blockers)
1430}
1431
1432pub(crate) fn propagate_unblock_effects(
1449 conn: &Connection,
1450 completed_task_id: &str,
1451 agent_id: Option<&str>,
1452 states_config: &StatesConfig,
1453 deps_config: &DependenciesConfig,
1454 auto_advance: &AutoAdvanceConfig,
1455) -> Result<(Vec<String>, Vec<String>)> {
1456 let start_blocking_types = deps_config.start_blocking_types();
1458 if start_blocking_types.is_empty() {
1459 return Ok((vec![], vec![]));
1460 }
1461
1462 let type_placeholders: Vec<String> = start_blocking_types
1464 .iter()
1465 .enumerate()
1466 .map(|(i, _)| format!("?{}", i + 2))
1467 .collect();
1468 let type_clause = type_placeholders.join(", ");
1469
1470 let sql = format!(
1471 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
1472 type_clause
1473 );
1474
1475 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1476 params_vec.push(Box::new(completed_task_id.to_string()));
1477 for t in &start_blocking_types {
1478 params_vec.push(Box::new(t.to_string()));
1479 }
1480 let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
1481
1482 let mut stmt = conn.prepare(&sql)?;
1483 let dependent_task_ids: Vec<String> = stmt
1484 .query_map(params_refs.as_slice(), |row| row.get(0))?
1485 .filter_map(|r| r.ok())
1486 .collect();
1487
1488 let mut unblocked = Vec::new();
1489 let mut auto_advanced = Vec::new();
1490 let now = super::now_ms();
1491
1492 let should_auto_advance = auto_advance.enabled && auto_advance.target_state.is_some();
1494 let target_state = auto_advance.target_state.clone();
1495
1496 if should_auto_advance {
1498 let ts = target_state.as_ref().unwrap();
1499 if !states_config.is_valid_state(ts) {
1500 return Err(anyhow!(
1501 "Auto-advance target state '{}' is not a valid state",
1502 ts
1503 ));
1504 }
1505 }
1506
1507 for task_id in dependent_task_ids {
1508 let task = match get_task_by_id_internal(conn, &task_id)? {
1510 Some(t) => t,
1511 None => continue,
1512 };
1513
1514 if task.status != states_config.initial {
1516 continue;
1517 }
1518
1519 if task.worker_id.is_some() {
1521 continue;
1522 }
1523
1524 let state_placeholders: Vec<String> = states_config
1527 .blocking_states
1528 .iter()
1529 .enumerate()
1530 .map(|(i, _)| format!("?{}", i + 3))
1531 .collect();
1532 let state_clause = state_placeholders.join(", ");
1533
1534 let type_start = states_config.blocking_states.len() + 3;
1536 let type_placeholders2: Vec<String> = start_blocking_types
1537 .iter()
1538 .enumerate()
1539 .map(|(i, _)| format!("?{}", type_start + i))
1540 .collect();
1541 let type_clause2 = type_placeholders2.join(", ");
1542
1543 let blocker_sql = format!(
1544 "SELECT COUNT(*) FROM dependencies d
1545 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1546 WHERE d.to_task_id = ?1
1547 AND d.from_task_id != ?2
1548 AND d.dep_type IN ({})
1549 AND blocker.status IN ({})",
1550 type_clause2, state_clause
1551 );
1552
1553 let mut blocker_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1554 blocker_params.push(Box::new(task_id.clone()));
1555 blocker_params.push(Box::new(completed_task_id.to_string()));
1556 for state in &states_config.blocking_states {
1557 blocker_params.push(Box::new(state.clone()));
1558 }
1559 for t in &start_blocking_types {
1560 blocker_params.push(Box::new(t.to_string()));
1561 }
1562 let blocker_refs: Vec<&dyn rusqlite::ToSql> =
1563 blocker_params.iter().map(|b| b.as_ref()).collect();
1564
1565 let remaining_blockers: i32 =
1566 conn.query_row(&blocker_sql, blocker_refs.as_slice(), |row| row.get(0))?;
1567
1568 if remaining_blockers > 0 {
1569 continue; }
1571
1572 unblocked.push(task_id.clone());
1574
1575 if should_auto_advance {
1577 let ts = target_state.as_ref().unwrap();
1578
1579 if !states_config.is_valid_transition(&states_config.initial, ts) {
1581 continue;
1583 }
1584
1585 conn.execute(
1587 "UPDATE tasks SET status = ?1, updated_at = ?2 WHERE id = ?3",
1588 params![ts, now, &task_id],
1589 )?;
1590
1591 let reason = format!("auto-advanced: blocker '{}' completed", completed_task_id);
1593 super::state_transitions::record_state_transition(
1594 conn,
1595 &task_id,
1596 ts,
1597 agent_id,
1598 Some(&reason),
1599 states_config,
1600 )?;
1601
1602 auto_advanced.push(task_id);
1603 }
1604 }
1605
1606 Ok((unblocked, auto_advanced))
1607}