1use super::Database;
4use crate::config::{AutoAdvanceConfig, DependenciesConfig, DependencyDisplay, StatesConfig};
5use crate::types::{Dependency, Task};
6use anyhow::{Result, anyhow};
7use rusqlite::{Connection, OptionalExtension, params};
8use std::collections::{HashSet, VecDeque};
9
/// Outcome of a successful [`Database::relink`] call.
///
/// Every entry is a `(from_task_id, to_task_id)` pair.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RelinkResult {
    /// Pairs whose dependency row was actually deleted (the `DELETE` affected
    /// at least one row).
    pub unlinked: Vec<(String, String)>,
    /// Pairs for which an `INSERT OR IGNORE` was issued. A pair is listed even
    /// when the row already existed and the insert was a no-op.
    pub linked: Vec<(String, String)>,
}
18
19fn would_create_cycle_in_tx(
21 tx: &rusqlite::Transaction,
22 from_task_id: &str,
23 to_task_id: &str,
24 dep_type: &str,
25 deps_config: &DependenciesConfig,
26) -> Result<bool> {
27 let def = deps_config
28 .get_definition(dep_type)
29 .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
30
31 let mut visited: HashSet<String> = HashSet::new();
34 let mut queue: VecDeque<String> = VecDeque::new();
35 queue.push_back(to_task_id.to_string());
36
37 while let Some(current) = queue.pop_front() {
38 if current == from_task_id {
39 return Ok(true); }
41
42 if visited.contains(¤t) {
43 continue;
44 }
45 visited.insert(current.clone());
46
47 let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
49 let mut stmt = tx.prepare(
51 "SELECT to_task_id FROM dependencies d
52 JOIN (SELECT value FROM json_each(?1)) types
53 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
54 )?;
55 let vertical_types: Vec<&str> = deps_config.vertical_types();
56 let types_json = serde_json::to_string(&vertical_types)?;
57 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
58 .filter_map(|r| r.ok())
59 .collect()
60 } else {
61 let mut stmt = tx.prepare(
63 "SELECT to_task_id FROM dependencies d
64 JOIN (SELECT value FROM json_each(?1)) types
65 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
66 )?;
67 let start_blocking: Vec<&str> = deps_config.start_blocking_types();
68 let types_json = serde_json::to_string(&start_blocking)?;
69 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
70 .filter_map(|r| r.ok())
71 .collect()
72 };
73
74 for dep in deps {
75 if !visited.contains(&dep) {
76 queue.push_back(dep);
77 }
78 }
79 }
80
81 Ok(false)
82}
83
/// Builds the `<column> <direction>` text that follows `ORDER BY` in task
/// queries aliased as `t`.
///
/// `sort_by` accepts `"priority"`, `"created_at"` and `"updated_at"`; anything
/// else (including `None`) sorts by `t.created_at`. `sort_order` yields `ASC`
/// only for exactly `"asc"`; every other value yields `DESC`. Because the
/// output is assembled solely from these fixed literals, it is safe to splice
/// into SQL text.
fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
    let column = match sort_by.unwrap_or("created_at") {
        "priority" => "CAST(t.priority AS INTEGER)",
        "updated_at" => "t.updated_at",
        _ => "t.created_at",
    };

    let direction = if sort_order == Some("asc") {
        "ASC"
    } else {
        "DESC"
    };

    let mut clause = String::with_capacity(column.len() + 1 + direction.len());
    clause.push_str(column);
    clause.push(' ');
    clause.push_str(direction);
    clause
}
105
/// Non-error outcomes of [`Database::add_dependency_soft`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AddDependencyResult {
    /// A new dependency row was inserted.
    Created,
    /// The identical dependency row was already present; nothing changed.
    AlreadyExists,
    /// No task with the source (`from`) id exists.
    FromTaskNotFound,
    /// No task with the target (`to`) id exists.
    ToTaskNotFound,
}
118
119impl Database {
120 pub fn task_exists(&self, task_id: &str) -> Result<bool> {
122 self.with_conn(|conn| {
123 let count: i64 = conn.query_row(
124 "SELECT COUNT(*) FROM tasks WHERE id = ?1",
125 params![task_id],
126 |row| row.get(0),
127 )?;
128 Ok(count > 0)
129 })
130 }
131
132 pub fn add_dependency(
135 &self,
136 from_task_id: &str,
137 to_task_id: &str,
138 dep_type: &str,
139 deps_config: &DependenciesConfig,
140 ) -> Result<()> {
141 match self.add_dependency_soft(from_task_id, to_task_id, dep_type, deps_config)? {
142 AddDependencyResult::Created | AddDependencyResult::AlreadyExists => Ok(()),
143 AddDependencyResult::FromTaskNotFound => {
144 Err(anyhow!("Source task '{}' not found", from_task_id))
145 }
146 AddDependencyResult::ToTaskNotFound => {
147 Err(anyhow!("Target task '{}' not found", to_task_id))
148 }
149 }
150 }
151
152 pub fn add_dependency_soft(
155 &self,
156 from_task_id: &str,
157 to_task_id: &str,
158 dep_type: &str,
159 deps_config: &DependenciesConfig,
160 ) -> Result<AddDependencyResult> {
161 if !deps_config.is_valid_dep_type(dep_type) {
163 return Err(anyhow!(
164 "Invalid dependency type '{}'. Valid types: {:?}",
165 dep_type,
166 deps_config.dep_type_names()
167 ));
168 }
169
170 if !self.task_exists(from_task_id)? {
172 return Ok(AddDependencyResult::FromTaskNotFound);
173 }
174 if !self.task_exists(to_task_id)? {
175 return Ok(AddDependencyResult::ToTaskNotFound);
176 }
177
178 let def = deps_config
180 .get_definition(dep_type)
181 .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
182 if def.display == DependencyDisplay::Vertical
183 && let Some(existing_parent) = self.get_parent(to_task_id)?
184 && existing_parent != from_task_id
185 {
186 return Err(anyhow!(
187 "Task {} already has parent {}",
188 to_task_id,
189 existing_parent
190 ));
191 }
192
193 if self.would_create_cycle(from_task_id, to_task_id, dep_type, deps_config)? {
195 return Err(anyhow!("Adding this dependency would create a cycle"));
196 }
197
198 self.with_conn(|conn| {
199 let changes = conn.execute(
200 "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
201 params![from_task_id, to_task_id, dep_type],
202 )?;
203 if changes == 0 {
204 Ok(AddDependencyResult::AlreadyExists)
205 } else {
206 Ok(AddDependencyResult::Created)
207 }
208 })
209 }
210
211 pub fn would_create_cycle(
215 &self,
216 from_task_id: &str,
217 to_task_id: &str,
218 dep_type: &str,
219 deps_config: &DependenciesConfig,
220 ) -> Result<bool> {
221 let def = deps_config
222 .get_definition(dep_type)
223 .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
224
225 self.with_conn(|conn| {
226 let mut visited: HashSet<String> = HashSet::new();
229 let mut queue: VecDeque<String> = VecDeque::new();
230 queue.push_back(to_task_id.to_string());
231
232 while let Some(current) = queue.pop_front() {
233 if current == from_task_id {
234 return Ok(true); }
236
237 if visited.contains(¤t) {
238 continue;
239 }
240 visited.insert(current.clone());
241
242 let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
244 let mut stmt = conn.prepare(
246 "SELECT to_task_id FROM dependencies d
247 JOIN (SELECT value FROM json_each(?1)) types
248 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
249 )?;
250 let vertical_types: Vec<&str> = deps_config.vertical_types();
251 let types_json = serde_json::to_string(&vertical_types)?;
252 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
253 .filter_map(|r| r.ok())
254 .collect()
255 } else {
256 let mut stmt = conn.prepare(
258 "SELECT to_task_id FROM dependencies d
259 JOIN (SELECT value FROM json_each(?1)) types
260 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
261 )?;
262 let start_blocking: Vec<&str> = deps_config.start_blocking_types();
263 let types_json = serde_json::to_string(&start_blocking)?;
264 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
265 .filter_map(|r| r.ok())
266 .collect()
267 };
268
269 for dep in deps {
270 if !visited.contains(&dep) {
271 queue.push_back(dep);
272 }
273 }
274 }
275
276 Ok(false)
277 })
278 }
279
280 pub fn remove_dependency(
282 &self,
283 from_task_id: &str,
284 to_task_id: &str,
285 dep_type: &str,
286 ) -> Result<bool> {
287 self.with_conn(|conn| {
288 let rows = conn.execute(
289 "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
290 params![from_task_id, to_task_id, dep_type],
291 )?;
292 Ok(rows > 0)
293 })
294 }
295
296 pub fn remove_all_outgoing_dependencies(
299 &self,
300 from_task_id: &str,
301 dep_type: &str,
302 ) -> Result<Vec<Dependency>> {
303 self.with_conn_mut(|conn| {
304 let tx = conn.transaction()?;
305
306 let deps: Vec<Dependency> = {
308 let mut stmt = tx.prepare(
309 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
310 )?;
311 stmt
312 .query_map(params![from_task_id, dep_type], |row| {
313 Ok(Dependency {
314 from_task_id: row.get(0)?,
315 to_task_id: row.get(1)?,
316 dep_type: row.get(2)?,
317 })
318 })?
319 .filter_map(|r| r.ok())
320 .collect()
321 };
322
323 tx.execute(
325 "DELETE FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2",
326 params![from_task_id, dep_type],
327 )?;
328
329 tx.commit()?;
330 Ok(deps)
331 })
332 }
333
334 pub fn remove_all_incoming_dependencies(
337 &self,
338 to_task_id: &str,
339 dep_type: &str,
340 ) -> Result<Vec<Dependency>> {
341 self.with_conn_mut(|conn| {
342 let tx = conn.transaction()?;
343
344 let deps: Vec<Dependency> = {
346 let mut stmt = tx.prepare(
347 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
348 )?;
349 stmt
350 .query_map(params![to_task_id, dep_type], |row| {
351 Ok(Dependency {
352 from_task_id: row.get(0)?,
353 to_task_id: row.get(1)?,
354 dep_type: row.get(2)?,
355 })
356 })?
357 .filter_map(|r| r.ok())
358 .collect()
359 };
360
361 tx.execute(
363 "DELETE FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2",
364 params![to_task_id, dep_type],
365 )?;
366
367 tx.commit()?;
368 Ok(deps)
369 })
370 }
371
372 pub fn get_all_dependencies(&self) -> Result<Vec<Dependency>> {
374 self.with_conn(|conn| {
375 let mut stmt =
376 conn.prepare("SELECT from_task_id, to_task_id, dep_type FROM dependencies")?;
377
378 let deps = stmt
379 .query_map([], |row| {
380 let from: String = row.get(0)?;
381 let to: String = row.get(1)?;
382 let dep_type: String = row.get(2)?;
383 Ok(Dependency {
384 from_task_id: from,
385 to_task_id: to,
386 dep_type,
387 })
388 })?
389 .filter_map(|r| r.ok())
390 .collect();
391
392 Ok(deps)
393 })
394 }
395
396 pub fn get_dependencies_by_type(
398 &self,
399 task_id: &str,
400 dep_type: &str,
401 direction: &str,
402 ) -> Result<Vec<Dependency>> {
403 self.with_conn(|conn| {
404 let sql = if direction == "incoming" {
405 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
406 } else {
407 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
408 };
409
410 let mut stmt = conn.prepare(sql)?;
411
412 let deps = stmt
413 .query_map(params![task_id, dep_type], |row| {
414 let from: String = row.get(0)?;
415 let to: String = row.get(1)?;
416 let dep_type: String = row.get(2)?;
417 Ok(Dependency {
418 from_task_id: from,
419 to_task_id: to,
420 dep_type,
421 })
422 })?
423 .filter_map(|r| r.ok())
424 .collect();
425
426 Ok(deps)
427 })
428 }
429
430 pub fn get_start_blockers(
432 &self,
433 task_id: &str,
434 deps_config: &DependenciesConfig,
435 ) -> Result<Vec<String>> {
436 let start_blocking_types = deps_config.start_blocking_types();
437 if start_blocking_types.is_empty() {
438 return Ok(vec![]);
439 }
440
441 self.with_conn(|conn| {
442 let placeholders: String = start_blocking_types
443 .iter()
444 .enumerate()
445 .map(|(i, _)| format!("?{}", i + 2))
446 .collect::<Vec<_>>()
447 .join(", ");
448
449 let sql = format!(
450 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type IN ({})",
451 placeholders
452 );
453
454 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
455 params_vec.push(Box::new(task_id.to_string()));
456 for t in &start_blocking_types {
457 params_vec.push(Box::new(t.to_string()));
458 }
459 let params_refs: Vec<&dyn rusqlite::ToSql> =
460 params_vec.iter().map(|b| b.as_ref()).collect();
461
462 let mut stmt = conn.prepare(&sql)?;
463 let blockers = stmt
464 .query_map(params_refs.as_slice(), |row| {
465 let id: String = row.get(0)?;
466 Ok(id)
467 })?
468 .filter_map(|r| r.ok())
469 .collect();
470
471 Ok(blockers)
472 })
473 }
474
475 pub fn get_completion_blockers(
478 &self,
479 task_id: &str,
480 deps_config: &DependenciesConfig,
481 ) -> Result<Vec<String>> {
482 let completion_blocking_types = deps_config.completion_blocking_types();
483 if completion_blocking_types.is_empty() {
484 return Ok(vec![]);
485 }
486
487 self.with_conn(|conn| {
488 let placeholders: String = completion_blocking_types
489 .iter()
490 .enumerate()
491 .map(|(i, _)| format!("?{}", i + 2))
492 .collect::<Vec<_>>()
493 .join(", ");
494
495 let sql = format!(
498 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
499 placeholders
500 );
501
502 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
503 params_vec.push(Box::new(task_id.to_string()));
504 for t in &completion_blocking_types {
505 params_vec.push(Box::new(t.to_string()));
506 }
507 let params_refs: Vec<&dyn rusqlite::ToSql> =
508 params_vec.iter().map(|b| b.as_ref()).collect();
509
510 let mut stmt = conn.prepare(&sql)?;
511 let blockers = stmt
512 .query_map(params_refs.as_slice(), |row| {
513 let id: String = row.get(0)?;
514 Ok(id)
515 })?
516 .filter_map(|r| r.ok())
517 .collect();
518
519 Ok(blockers)
520 })
521 }
522
523 pub fn get_parent(&self, task_id: &str) -> Result<Option<String>> {
525 self.with_conn(|conn| {
526 let result: Result<String, rusqlite::Error> = conn.query_row(
527 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
528 params![task_id],
529 |row| row.get(0),
530 );
531
532 match result {
533 Ok(parent_id) => Ok(Some(parent_id)),
534 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
535 Err(e) => Err(e.into()),
536 }
537 })
538 }
539
540 pub fn get_children_ids(&self, task_id: &str) -> Result<Vec<String>> {
542 self.with_conn(|conn| {
543 let mut stmt = conn.prepare(
544 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
545 )?;
546
547 let children = stmt
548 .query_map(params![task_id], |row| {
549 let id: String = row.get(0)?;
550 Ok(id)
551 })?
552 .filter_map(|r| r.ok())
553 .collect();
554
555 Ok(children)
556 })
557 }
558
559 pub fn get_blockers(&self, task_id: &str) -> Result<Vec<String>> {
562 self.with_conn(|conn| {
563 let mut stmt = conn.prepare(
564 "SELECT from_task_id FROM dependencies
565 WHERE to_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
566 )?;
567
568 let blockers = stmt
569 .query_map(params![task_id], |row| {
570 let id: String = row.get(0)?;
571 Ok(id)
572 })?
573 .filter_map(|r| r.ok())
574 .collect();
575
576 Ok(blockers)
577 })
578 }
579
580 pub fn get_unsatisfied_blockers(
585 &self,
586 task_id: &str,
587 states_config: &StatesConfig,
588 ) -> Result<Vec<String>> {
589 self.with_conn(|conn| {
590 let state_placeholders: Vec<String> = states_config
592 .blocking_states
593 .iter()
594 .enumerate()
595 .map(|(i, _)| format!("?{}", i + 2))
596 .collect();
597 let state_clause = state_placeholders.join(", ");
598
599 let sql = format!(
600 "SELECT d.from_task_id FROM dependencies d
601 INNER JOIN tasks t ON d.from_task_id = t.id
602 WHERE d.to_task_id = ?1
603 AND d.dep_type IN ('blocks', 'follows')
604 AND t.status IN ({})",
605 state_clause
606 );
607
608 let mut stmt = conn.prepare(&sql)?;
609
610 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
612 params_vec.push(Box::new(task_id.to_string()));
613 for state in &states_config.blocking_states {
614 params_vec.push(Box::new(state.clone()));
615 }
616 let params_refs: Vec<&dyn rusqlite::ToSql> =
617 params_vec.iter().map(|b| b.as_ref()).collect();
618
619 let blockers = stmt
620 .query_map(params_refs.as_slice(), |row| {
621 let id: String = row.get(0)?;
622 Ok(id)
623 })?
624 .filter_map(|r| r.ok())
625 .collect();
626
627 Ok(blockers)
628 })
629 }
630
631 #[allow(dead_code)]
633 pub fn get_blocking(&self, task_id: &str) -> Result<Vec<String>> {
634 self.with_conn(|conn| {
635 let mut stmt = conn.prepare(
636 "SELECT to_task_id FROM dependencies
637 WHERE from_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
638 )?;
639
640 let blocking = stmt
641 .query_map(params![task_id], |row| {
642 let id: String = row.get(0)?;
643 Ok(id)
644 })?
645 .filter_map(|r| r.ok())
646 .collect();
647
648 Ok(blocking)
649 })
650 }
651
652 pub fn get_blocked_tasks(
656 &self,
657 states_config: &StatesConfig,
658 deps_config: &DependenciesConfig,
659 sort_by: Option<&str>,
660 sort_order: Option<&str>,
661 ) -> Result<Vec<Task>> {
662 let start_blocking_types = deps_config.start_blocking_types();
663 if start_blocking_types.is_empty() {
664 return Ok(vec![]);
665 }
666
667 self.with_conn(|conn| {
668 let state_placeholders: Vec<String> = states_config
670 .blocking_states
671 .iter()
672 .enumerate()
673 .map(|(i, _)| format!("?{}", i + 2))
674 .collect();
675 let state_clause = state_placeholders.join(", ");
676
677 let type_start = states_config.blocking_states.len() + 2;
679 let type_placeholders: Vec<String> = start_blocking_types
680 .iter()
681 .enumerate()
682 .map(|(i, _)| format!("?{}", type_start + i))
683 .collect();
684 let type_clause = type_placeholders.join(", ");
685
686 let order_clause = build_order_clause(sort_by, sort_order);
688
689 let sql = format!(
690 "SELECT DISTINCT t.*
691 FROM tasks t
692 INNER JOIN dependencies d ON t.id = d.to_task_id
693 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
694 WHERE d.dep_type IN ({})
695 AND blocker.status IN ({})
696 AND t.status = ?1
697 AND t.deleted_at IS NULL
698 ORDER BY {}",
699 type_clause, state_clause, order_clause
700 );
701
702 let mut stmt = conn.prepare(&sql)?;
703
704 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
706 params_vec.push(Box::new(states_config.initial.clone()));
707 for state in &states_config.blocking_states {
708 params_vec.push(Box::new(state.clone()));
709 }
710 for t in &start_blocking_types {
711 params_vec.push(Box::new(t.to_string()));
712 }
713 let params_refs: Vec<&dyn rusqlite::ToSql> =
714 params_vec.iter().map(|b| b.as_ref()).collect();
715
716 let tasks = stmt
717 .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
718 .filter_map(|r| r.ok())
719 .collect();
720
721 Ok(tasks)
722 })
723 }
724
725 pub fn get_ready_tasks(
730 &self,
731 agent_id: Option<&str>,
732 states_config: &StatesConfig,
733 deps_config: &DependenciesConfig,
734 sort_by: Option<&str>,
735 sort_order: Option<&str>,
736 ) -> Result<Vec<Task>> {
737 let start_blocking_types = deps_config.start_blocking_types();
738
739 let agent_tags: Option<Vec<String>> = if let Some(aid) = agent_id {
741 Some(self.get_agent_tags(aid)?)
742 } else {
743 None
744 };
745
746 self.with_conn(|conn| {
747 let state_placeholders: Vec<String> = states_config
749 .blocking_states
750 .iter()
751 .enumerate()
752 .map(|(i, _)| format!("?{}", i + 2))
753 .collect();
754 let state_clause = state_placeholders.join(", ");
755
756 let type_start = states_config.blocking_states.len() + 2;
758 let type_placeholders: Vec<String> = start_blocking_types
759 .iter()
760 .enumerate()
761 .map(|(i, _)| format!("?{}", type_start + i))
762 .collect();
763 let type_clause = type_placeholders.join(", ");
764
765 let order_clause = if sort_by.is_some() {
767 build_order_clause(sort_by, sort_order)
768 } else {
769 "CAST(t.priority AS INTEGER) DESC, t.created_at DESC".to_string()
771 };
772
773 let mut param_idx = type_start + start_blocking_types.len();
775
776 let (agent_needed_clause, agent_wanted_clause) = if let Some(ref tags) = agent_tags {
778 let needed_placeholders: Vec<String> = tags
782 .iter()
783 .enumerate()
784 .map(|(i, _)| format!("?{}", param_idx + i))
785 .collect();
786 param_idx += tags.len();
787
788 let needed_clause = if needed_placeholders.is_empty() {
789 "AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)"
791 .to_string()
792 } else {
793 format!(
795 "AND (
796 NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
797 OR (
798 SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
799 ) = (
800 SELECT COUNT(*) FROM task_needed_tags
801 WHERE task_id = t.id AND tag IN ({})
802 )
803 )",
804 needed_placeholders.join(", ")
805 )
806 };
807
808 let wanted_placeholders: Vec<String> = tags
810 .iter()
811 .enumerate()
812 .map(|(i, _)| format!("?{}", param_idx + i))
813 .collect();
814
815 let wanted_clause = if wanted_placeholders.is_empty() {
816 "AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)"
818 .to_string()
819 } else {
820 format!(
822 "AND (
823 NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
824 OR EXISTS (
825 SELECT 1 FROM task_wanted_tags
826 WHERE task_id = t.id AND tag IN ({})
827 )
828 )",
829 wanted_placeholders.join(", ")
830 )
831 };
832
833 (needed_clause, wanted_clause)
834 } else {
835 (String::new(), String::new())
836 };
837
838 let sql = format!(
839 "SELECT t.*
840 FROM tasks t
841 WHERE t.status = ?1
842 AND t.worker_id IS NULL
843 AND t.deleted_at IS NULL
844 AND NOT EXISTS (
845 SELECT 1 FROM dependencies d
846 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
847 WHERE d.to_task_id = t.id
848 AND d.dep_type IN ({})
849 AND blocker.status IN ({})
850 )
851 AND t.id NOT IN (
852 SELECT from_task_id FROM dependencies
853 WHERE dep_type = 'contains'
854 )
855 {}
856 {}
857 ORDER BY {}",
858 type_clause, state_clause, agent_needed_clause, agent_wanted_clause, order_clause
859 );
860
861 let mut stmt = conn.prepare(&sql)?;
862
863 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
865 params_vec.push(Box::new(states_config.initial.clone()));
866 for state in &states_config.blocking_states {
867 params_vec.push(Box::new(state.clone()));
868 }
869 for t in &start_blocking_types {
870 params_vec.push(Box::new(t.to_string()));
871 }
872 if let Some(ref tags) = agent_tags {
874 for tag in tags {
875 params_vec.push(Box::new(tag.clone()));
876 }
877 for tag in tags {
878 params_vec.push(Box::new(tag.clone()));
879 }
880 }
881 let params_refs: Vec<&dyn rusqlite::ToSql> =
882 params_vec.iter().map(|b| b.as_ref()).collect();
883
884 let tasks: Vec<Task> = stmt
885 .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
886 .filter_map(|r| r.ok())
887 .collect();
888
889 Ok(tasks)
890 })
891 }
892
893 #[allow(dead_code)]
895 pub fn has_unmet_start_dependencies(
896 &self,
897 task_id: &str,
898 states_config: &StatesConfig,
899 deps_config: &DependenciesConfig,
900 ) -> Result<bool> {
901 let start_blocking_types = deps_config.start_blocking_types();
902 if start_blocking_types.is_empty() {
903 return Ok(false);
904 }
905
906 self.with_conn(|conn| {
907 let state_placeholders: Vec<String> = states_config
909 .blocking_states
910 .iter()
911 .enumerate()
912 .map(|(i, _)| format!("?{}", i + 2))
913 .collect();
914 let state_clause = state_placeholders.join(", ");
915
916 let type_start = states_config.blocking_states.len() + 2;
918 let type_placeholders: Vec<String> = start_blocking_types
919 .iter()
920 .enumerate()
921 .map(|(i, _)| format!("?{}", type_start + i))
922 .collect();
923 let type_clause = type_placeholders.join(", ");
924
925 let sql = format!(
926 "SELECT COUNT(*) FROM dependencies d
927 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
928 WHERE d.to_task_id = ?1
929 AND d.dep_type IN ({})
930 AND blocker.status IN ({})",
931 type_clause, state_clause
932 );
933
934 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
935 params_vec.push(Box::new(task_id.to_string()));
936 for state in &states_config.blocking_states {
937 params_vec.push(Box::new(state.clone()));
938 }
939 for t in &start_blocking_types {
940 params_vec.push(Box::new(t.to_string()));
941 }
942 let params_refs: Vec<&dyn rusqlite::ToSql> =
943 params_vec.iter().map(|b| b.as_ref()).collect();
944
945 let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
946
947 Ok(count > 0)
948 })
949 }
950
951 pub fn has_incomplete_children(
953 &self,
954 task_id: &str,
955 states_config: &StatesConfig,
956 ) -> Result<bool> {
957 self.with_conn(|conn| {
958 let state_placeholders: Vec<String> = states_config
960 .blocking_states
961 .iter()
962 .enumerate()
963 .map(|(i, _)| format!("?{}", i + 2))
964 .collect();
965 let state_clause = state_placeholders.join(", ");
966
967 let sql = format!(
968 "SELECT COUNT(*) FROM dependencies d
969 INNER JOIN tasks child ON d.to_task_id = child.id
970 WHERE d.from_task_id = ?1
971 AND d.dep_type = 'contains'
972 AND child.status IN ({})",
973 state_clause
974 );
975
976 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
977 params_vec.push(Box::new(task_id.to_string()));
978 for state in &states_config.blocking_states {
979 params_vec.push(Box::new(state.clone()));
980 }
981 let params_refs: Vec<&dyn rusqlite::ToSql> =
982 params_vec.iter().map(|b| b.as_ref()).collect();
983
984 let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
985
986 Ok(count > 0)
987 })
988 }
989
990 #[allow(clippy::too_many_arguments)]
997 pub fn list_tasks_with_tag_filters(
998 &self,
999 status: Option<Vec<String>>,
1000 owner: Option<&str>,
1001 parent_id: Option<Option<&str>>,
1002 tags_any: Option<Vec<String>>,
1003 tags_all: Option<Vec<String>>,
1004 qualified_for_agent_tags: Option<Vec<String>>,
1005 limit: Option<i32>,
1006 offset: i32,
1007 sort_by: Option<&str>,
1008 sort_order: Option<&str>,
1009 ) -> Result<Vec<Task>> {
1010 self.with_conn(|conn| {
1011 let mut sql = String::from("SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL");
1012 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1013 let mut param_idx = 1;
1014
1015 if let Some(ref statuses) = status {
1017 if statuses.len() == 1 {
1018 sql.push_str(&format!(" AND t.status = ?{}", param_idx));
1019 params_vec.push(Box::new(statuses[0].clone()));
1020 param_idx += 1;
1021 } else if statuses.len() > 1 {
1022 let placeholders: Vec<String> = statuses
1023 .iter()
1024 .enumerate()
1025 .map(|(i, _)| format!("?{}", param_idx + i))
1026 .collect();
1027 sql.push_str(&format!(" AND t.status IN ({})", placeholders.join(", ")));
1028 for s in statuses {
1029 params_vec.push(Box::new(s.clone()));
1030 }
1031 param_idx += statuses.len();
1032 }
1033 }
1034
1035 if let Some(o) = owner {
1037 sql.push_str(&format!(" AND t.worker_id = ?{}", param_idx));
1038 params_vec.push(Box::new(o.to_string()));
1039 param_idx += 1;
1040 }
1041
1042 if let Some(p) = parent_id {
1044 match p {
1045 Some(pid) => {
1046 sql.push_str(&format!(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ?{} AND dep_type = 'contains')", param_idx));
1047 params_vec.push(Box::new(pid.to_string()));
1048 param_idx += 1;
1049 }
1050 None => {
1051 sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
1053 }
1054 }
1055 }
1056
1057 if let Some(ref any_tags) = tags_any
1059 && !any_tags.is_empty() {
1060 let placeholders: Vec<String> = any_tags
1061 .iter()
1062 .enumerate()
1063 .map(|(i, _)| format!("?{}", param_idx + i))
1064 .collect();
1065 sql.push_str(&format!(
1066 " AND EXISTS (SELECT 1 FROM task_tags WHERE task_id = t.id AND tag IN ({}))",
1067 placeholders.join(", ")
1068 ));
1069 for tag in any_tags {
1070 params_vec.push(Box::new(tag.clone()));
1071 }
1072 param_idx += any_tags.len();
1073 }
1074
1075 if let Some(ref all_tags) = tags_all
1077 && !all_tags.is_empty() {
1078 let placeholders: Vec<String> = all_tags
1079 .iter()
1080 .enumerate()
1081 .map(|(i, _)| format!("?{}", param_idx + i))
1082 .collect();
1083 sql.push_str(&format!(
1085 " AND (SELECT COUNT(*) FROM task_tags WHERE task_id = t.id AND tag IN ({})) = {}",
1086 placeholders.join(", "),
1087 all_tags.len()
1088 ));
1089 for tag in all_tags {
1090 params_vec.push(Box::new(tag.clone()));
1091 }
1092 param_idx += all_tags.len();
1093 }
1094
1095 if let Some(ref agent_tags) = qualified_for_agent_tags {
1097 if agent_tags.is_empty() {
1099 sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)");
1101 sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)");
1103 } else {
1104 let needed_placeholders: Vec<String> = agent_tags
1106 .iter()
1107 .enumerate()
1108 .map(|(i, _)| format!("?{}", param_idx + i))
1109 .collect();
1110 sql.push_str(&format!(
1111 " AND (
1112 NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
1113 OR (
1114 SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
1115 ) = (
1116 SELECT COUNT(*) FROM task_needed_tags
1117 WHERE task_id = t.id AND tag IN ({})
1118 )
1119 )",
1120 needed_placeholders.join(", ")
1121 ));
1122 for tag in agent_tags {
1123 params_vec.push(Box::new(tag.clone()));
1124 }
1125 param_idx += agent_tags.len();
1126
1127 let wanted_placeholders: Vec<String> = agent_tags
1129 .iter()
1130 .enumerate()
1131 .map(|(i, _)| format!("?{}", param_idx + i))
1132 .collect();
1133 sql.push_str(&format!(
1134 " AND (
1135 NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
1136 OR EXISTS (
1137 SELECT 1 FROM task_wanted_tags
1138 WHERE task_id = t.id AND tag IN ({})
1139 )
1140 )",
1141 wanted_placeholders.join(", ")
1142 ));
1143 for tag in agent_tags {
1144 params_vec.push(Box::new(tag.clone()));
1145 }
1146 }
1148 }
1149
1150 let order_clause = build_order_clause(sort_by, sort_order);
1152 sql.push_str(&format!(" ORDER BY {}", order_clause));
1153
1154 if let Some(l) = limit {
1156 sql.push_str(&format!(" LIMIT {}", l));
1157 }
1158
1159 if offset > 0 {
1160 sql.push_str(&format!(" OFFSET {}", offset));
1161 }
1162
1163 let params_refs: Vec<&dyn rusqlite::ToSql> =
1164 params_vec.iter().map(|b| b.as_ref()).collect();
1165
1166 let mut stmt = conn.prepare(&sql)?;
1167 let tasks: Vec<Task> = stmt
1168 .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
1169 .filter_map(|r| r.ok())
1170 .collect();
1171
1172 Ok(tasks)
1173 })
1174 }
1175
1176 pub fn get_agent_tags(&self, agent_id: &str) -> Result<Vec<String>> {
1178 self.with_conn(|conn| {
1179 let result: Result<String, rusqlite::Error> = conn.query_row(
1180 "SELECT tags FROM workers WHERE id = ?1",
1181 params![agent_id],
1182 |row| row.get(0),
1183 );
1184
1185 match result {
1186 Ok(tags_json) => {
1187 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
1188 Ok(tags)
1189 }
1190 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(vec![]),
1191 Err(e) => Err(e.into()),
1192 }
1193 })
1194 }
1195
1196 pub(super) fn add_dependency_internal(
1198 conn: &Connection,
1199 from_task_id: &str,
1200 to_task_id: &str,
1201 dep_type: &str,
1202 ) -> Result<()> {
1203 conn.execute(
1204 "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1205 params![from_task_id, to_task_id, dep_type],
1206 )?;
1207 Ok(())
1208 }
1209
1210 pub fn relink(
1214 &self,
1215 prev_from_ids: &[String],
1216 prev_to_ids: &[String],
1217 from_ids: &[String],
1218 to_ids: &[String],
1219 dep_type: &str,
1220 deps_config: &DependenciesConfig,
1221 ) -> Result<RelinkResult> {
1222 if !deps_config.is_valid_dep_type(dep_type) {
1224 return Err(anyhow!(
1225 "Invalid dependency type '{}'. Valid types: {:?}",
1226 dep_type,
1227 deps_config.dep_type_names()
1228 ));
1229 }
1230
1231 let def = deps_config
1232 .get_definition(dep_type)
1233 .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
1234 let is_vertical = def.display == DependencyDisplay::Vertical;
1235
1236 self.with_conn_mut(|conn| {
1237 let tx = conn.transaction()?;
1238
1239 let mut unlinked = Vec::new();
1240 let mut linked = Vec::new();
1241 let mut errors = Vec::new();
1242
1243 for prev_from in prev_from_ids {
1245 for prev_to in prev_to_ids {
1246 let rows = tx.execute(
1247 "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
1248 params![prev_from, prev_to, dep_type],
1249 )?;
1250 if rows > 0 {
1251 unlinked.push((prev_from.clone(), prev_to.clone()));
1252 }
1253 }
1254 }
1255
1256 for from_id in from_ids {
1258 for to_id in to_ids {
1259 if is_vertical {
1261 let existing_parent: Option<String> = tx.query_row(
1262 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1263 params![to_id],
1264 |row| row.get(0),
1265 ).optional()?;
1266
1267 if let Some(ref parent) = existing_parent
1268 && parent != from_id {
1269 errors.push(format!(
1270 "Task {} already has parent {}",
1271 to_id, parent
1272 ));
1273 continue;
1274 }
1275 }
1276
1277 if would_create_cycle_in_tx(&tx, from_id, to_id, dep_type, deps_config)? {
1279 errors.push(format!(
1280 "Adding dependency {}→{} would create a cycle",
1281 from_id, to_id
1282 ));
1283 continue;
1284 }
1285
1286 tx.execute(
1287 "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1288 params![from_id, to_id, dep_type],
1289 )?;
1290 linked.push((from_id.clone(), to_id.clone()));
1291 }
1292 }
1293
1294 if !errors.is_empty() {
1295 tx.rollback()?;
1297 return Err(anyhow!("Relink failed: {}", errors.join("; ")));
1298 }
1299
1300 tx.commit()?;
1301 Ok(RelinkResult { unlinked, linked })
1302 })
1303 }
1304
1305 pub fn get_predecessors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1312 if depth == 0 {
1313 return Ok(vec![]);
1314 }
1315
1316 self.with_conn(|conn| {
1317 let mut visited: HashSet<String> = HashSet::new();
1318 let mut result: Vec<Task> = Vec::new();
1319 let mut current_level: Vec<String> = vec![task_id.to_string()];
1320 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1321
1322 while !current_level.is_empty() && levels_remaining > 0 {
1323 let mut next_level: Vec<String> = Vec::new();
1324
1325 for tid in ¤t_level {
1326 let mut stmt = conn.prepare(
1328 "SELECT DISTINCT d.from_task_id FROM dependencies d
1329 WHERE d.to_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')",
1330 )?;
1331
1332 let predecessors: Vec<String> = stmt
1333 .query_map(params![tid], |row| row.get(0))?
1334 .filter_map(|r| r.ok())
1335 .collect();
1336
1337 for pred_id in predecessors {
1338 if !visited.contains(&pred_id) {
1339 visited.insert(pred_id.clone());
1340 if let Some(task) = get_task_by_id_internal(conn, &pred_id)? {
1341 result.push(task);
1342 }
1343 next_level.push(pred_id);
1344 }
1345 }
1346 }
1347
1348 current_level = next_level;
1349 levels_remaining -= 1;
1350 }
1351
1352 Ok(result)
1353 })
1354 }
1355
1356 pub fn get_successors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1359 if depth == 0 {
1360 return Ok(vec![]);
1361 }
1362
1363 self.with_conn(|conn| {
1364 let mut visited: HashSet<String> = HashSet::new();
1365 let mut result: Vec<Task> = Vec::new();
1366 let mut current_level: Vec<String> = vec![task_id.to_string()];
1367 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1368
1369 while !current_level.is_empty() && levels_remaining > 0 {
1370 let mut next_level: Vec<String> = Vec::new();
1371
1372 for tid in ¤t_level {
1373 let mut stmt = conn.prepare(
1375 "SELECT DISTINCT d.to_task_id FROM dependencies d
1376 WHERE d.from_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')",
1377 )?;
1378
1379 let successors: Vec<String> = stmt
1380 .query_map(params![tid], |row| row.get(0))?
1381 .filter_map(|r| r.ok())
1382 .collect();
1383
1384 for succ_id in successors {
1385 if !visited.contains(&succ_id) {
1386 visited.insert(succ_id.clone());
1387 if let Some(task) = get_task_by_id_internal(conn, &succ_id)? {
1388 result.push(task);
1389 }
1390 next_level.push(succ_id);
1391 }
1392 }
1393 }
1394
1395 current_level = next_level;
1396 levels_remaining -= 1;
1397 }
1398
1399 Ok(result)
1400 })
1401 }
1402
1403 pub fn get_ancestors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1406 if depth == 0 {
1407 return Ok(vec![]);
1408 }
1409
1410 self.with_conn(|conn| {
1411 let mut result: Vec<Task> = Vec::new();
1412 let mut current_id = task_id.to_string();
1413 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1414
1415 while levels_remaining > 0 {
1416 let parent_result: Result<String, rusqlite::Error> = conn.query_row(
1418 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1419 params![¤t_id],
1420 |row| row.get(0),
1421 );
1422
1423 match parent_result {
1424 Ok(parent_id) => {
1425 if let Some(task) = get_task_by_id_internal(conn, &parent_id)? {
1426 result.push(task);
1427 }
1428 current_id = parent_id;
1429 levels_remaining -= 1;
1430 }
1431 Err(rusqlite::Error::QueryReturnedNoRows) => break,
1432 Err(e) => return Err(e.into()),
1433 }
1434 }
1435
1436 Ok(result)
1437 })
1438 }
1439
1440 pub fn get_descendants(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1443 if depth == 0 {
1444 return Ok(vec![]);
1445 }
1446
1447 self.with_conn(|conn| {
1448 let mut visited: HashSet<String> = HashSet::new();
1449 let mut result: Vec<Task> = Vec::new();
1450 let mut current_level: Vec<String> = vec![task_id.to_string()];
1451 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1452
1453 while !current_level.is_empty() && levels_remaining > 0 {
1454 let mut next_level: Vec<String> = Vec::new();
1455
1456 for tid in ¤t_level {
1457 let mut stmt = conn.prepare(
1459 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
1460 )?;
1461
1462 let children: Vec<String> = stmt
1463 .query_map(params![tid], |row| row.get(0))?
1464 .filter_map(|r| r.ok())
1465 .collect();
1466
1467 for child_id in children {
1468 if !visited.contains(&child_id) {
1469 visited.insert(child_id.clone());
1470 if let Some(task) = get_task_by_id_internal(conn, &child_id)? {
1471 result.push(task);
1472 }
1473 next_level.push(child_id);
1474 }
1475 }
1476 }
1477
1478 current_level = next_level;
1479 levels_remaining -= 1;
1480 }
1481
1482 Ok(result)
1483 })
1484 }
1485}
1486
1487fn get_task_by_id_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
1489 let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1490 let task = stmt
1491 .query_row(params![task_id], super::tasks::parse_task_row)
1492 .optional()?;
1493 Ok(task)
1494}
1495
1496pub(crate) fn get_unsatisfied_start_blockers_in_tx(
1500 conn: &Connection,
1501 task_id: &str,
1502 states_config: &StatesConfig,
1503 deps_config: &DependenciesConfig,
1504) -> Result<Vec<String>> {
1505 let start_blocking_types = deps_config.start_blocking_types();
1506 if start_blocking_types.is_empty() {
1507 return Ok(vec![]);
1508 }
1509
1510 let state_placeholders: Vec<String> = states_config
1512 .blocking_states
1513 .iter()
1514 .enumerate()
1515 .map(|(i, _)| format!("?{}", i + 2))
1516 .collect();
1517 let state_clause = state_placeholders.join(", ");
1518
1519 let type_start = states_config.blocking_states.len() + 2;
1521 let type_placeholders: Vec<String> = start_blocking_types
1522 .iter()
1523 .enumerate()
1524 .map(|(i, _)| format!("?{}", type_start + i))
1525 .collect();
1526 let type_clause = type_placeholders.join(", ");
1527
1528 let sql = format!(
1529 "SELECT blocker.id FROM dependencies d
1530 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1531 WHERE d.to_task_id = ?1
1532 AND d.dep_type IN ({})
1533 AND blocker.status IN ({})",
1534 type_clause, state_clause
1535 );
1536
1537 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1538 params_vec.push(Box::new(task_id.to_string()));
1539 for state in &states_config.blocking_states {
1540 params_vec.push(Box::new(state.clone()));
1541 }
1542 for t in &start_blocking_types {
1543 params_vec.push(Box::new(t.to_string()));
1544 }
1545 let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
1546
1547 let mut stmt = conn.prepare(&sql)?;
1548 let blockers = stmt
1549 .query_map(params_refs.as_slice(), |row| {
1550 let id: String = row.get(0)?;
1551 Ok(id)
1552 })?
1553 .filter_map(|r| r.ok())
1554 .collect();
1555
1556 Ok(blockers)
1557}
1558
1559pub(crate) fn propagate_unblock_effects(
1576 conn: &Connection,
1577 completed_task_id: &str,
1578 agent_id: Option<&str>,
1579 states_config: &StatesConfig,
1580 deps_config: &DependenciesConfig,
1581 auto_advance: &AutoAdvanceConfig,
1582) -> Result<(Vec<String>, Vec<String>)> {
1583 let start_blocking_types = deps_config.start_blocking_types();
1585 if start_blocking_types.is_empty() {
1586 return Ok((vec![], vec![]));
1587 }
1588
1589 let type_placeholders: Vec<String> = start_blocking_types
1591 .iter()
1592 .enumerate()
1593 .map(|(i, _)| format!("?{}", i + 2))
1594 .collect();
1595 let type_clause = type_placeholders.join(", ");
1596
1597 let sql = format!(
1598 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
1599 type_clause
1600 );
1601
1602 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1603 params_vec.push(Box::new(completed_task_id.to_string()));
1604 for t in &start_blocking_types {
1605 params_vec.push(Box::new(t.to_string()));
1606 }
1607 let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
1608
1609 let mut stmt = conn.prepare(&sql)?;
1610 let dependent_task_ids: Vec<String> = stmt
1611 .query_map(params_refs.as_slice(), |row| row.get(0))?
1612 .filter_map(|r| r.ok())
1613 .collect();
1614
1615 let mut unblocked = Vec::new();
1616 let mut auto_advanced = Vec::new();
1617 let now = super::now_ms();
1618
1619 let should_auto_advance = auto_advance.enabled && auto_advance.target_state.is_some();
1621 let target_state = auto_advance.target_state.clone();
1622
1623 if should_auto_advance {
1625 let ts = target_state.as_ref().unwrap();
1626 if !states_config.is_valid_state(ts) {
1627 return Err(anyhow!(
1628 "Auto-advance target state '{}' is not a valid state",
1629 ts
1630 ));
1631 }
1632 }
1633
1634 for task_id in dependent_task_ids {
1635 let task = match get_task_by_id_internal(conn, &task_id)? {
1637 Some(t) => t,
1638 None => continue,
1639 };
1640
1641 if task.status != states_config.initial {
1643 continue;
1644 }
1645
1646 if task.worker_id.is_some() {
1648 continue;
1649 }
1650
1651 let state_placeholders: Vec<String> = states_config
1654 .blocking_states
1655 .iter()
1656 .enumerate()
1657 .map(|(i, _)| format!("?{}", i + 3))
1658 .collect();
1659 let state_clause = state_placeholders.join(", ");
1660
1661 let type_start = states_config.blocking_states.len() + 3;
1663 let type_placeholders2: Vec<String> = start_blocking_types
1664 .iter()
1665 .enumerate()
1666 .map(|(i, _)| format!("?{}", type_start + i))
1667 .collect();
1668 let type_clause2 = type_placeholders2.join(", ");
1669
1670 let blocker_sql = format!(
1671 "SELECT COUNT(*) FROM dependencies d
1672 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1673 WHERE d.to_task_id = ?1
1674 AND d.from_task_id != ?2
1675 AND d.dep_type IN ({})
1676 AND blocker.status IN ({})",
1677 type_clause2, state_clause
1678 );
1679
1680 let mut blocker_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1681 blocker_params.push(Box::new(task_id.clone()));
1682 blocker_params.push(Box::new(completed_task_id.to_string()));
1683 for state in &states_config.blocking_states {
1684 blocker_params.push(Box::new(state.clone()));
1685 }
1686 for t in &start_blocking_types {
1687 blocker_params.push(Box::new(t.to_string()));
1688 }
1689 let blocker_refs: Vec<&dyn rusqlite::ToSql> =
1690 blocker_params.iter().map(|b| b.as_ref()).collect();
1691
1692 let remaining_blockers: i32 =
1693 conn.query_row(&blocker_sql, blocker_refs.as_slice(), |row| row.get(0))?;
1694
1695 if remaining_blockers > 0 {
1696 continue; }
1698
1699 unblocked.push(task_id.clone());
1701
1702 if should_auto_advance {
1704 let ts = target_state.as_ref().unwrap();
1705
1706 if !states_config.is_valid_transition(&states_config.initial, ts) {
1708 continue;
1710 }
1711
1712 conn.execute(
1714 "UPDATE tasks SET status = ?1, updated_at = ?2 WHERE id = ?3",
1715 params![ts, now, &task_id],
1716 )?;
1717
1718 let reason = format!("auto-advanced: blocker '{}' completed", completed_task_id);
1720 super::state_transitions::record_state_transition(
1721 conn,
1722 &task_id,
1723 ts,
1724 agent_id,
1725 Some(&reason),
1726 states_config,
1727 )?;
1728
1729 auto_advanced.push(task_id);
1730 }
1731 }
1732
1733 Ok((unblocked, auto_advanced))
1734}