1use super::Database;
4use crate::config::{AutoAdvanceConfig, DependenciesConfig, DependencyDisplay, StatesConfig};
5use crate::types::{Dependency, Task};
6use anyhow::{Result, anyhow};
7use rusqlite::{Connection, OptionalExtension, params};
8use std::collections::{HashSet, VecDeque};
9
/// Outcome of a successful [`Database::relink`] call.
#[derive(Debug)]
pub struct RelinkResult {
    /// `(from_task_id, to_task_id)` pairs whose dependency row was actually deleted.
    pub unlinked: Vec<(String, String)>,
    /// `(from_task_id, to_task_id)` pairs for which an insert was issued.
    pub linked: Vec<(String, String)>,
}
18
19fn would_create_cycle_in_tx(
21 tx: &rusqlite::Transaction,
22 from_task_id: &str,
23 to_task_id: &str,
24 dep_type: &str,
25 deps_config: &DependenciesConfig,
26) -> Result<bool> {
27 let def = deps_config
28 .get_definition(dep_type)
29 .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
30
31 let mut visited: HashSet<String> = HashSet::new();
34 let mut queue: VecDeque<String> = VecDeque::new();
35 queue.push_back(to_task_id.to_string());
36
37 while let Some(current) = queue.pop_front() {
38 if current == from_task_id {
39 return Ok(true); }
41
42 if visited.contains(¤t) {
43 continue;
44 }
45 visited.insert(current.clone());
46
47 let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
49 let mut stmt = tx.prepare(
51 "SELECT to_task_id FROM dependencies d
52 JOIN (SELECT value FROM json_each(?1)) types
53 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
54 )?;
55 let vertical_types: Vec<&str> = deps_config.vertical_types();
56 let types_json = serde_json::to_string(&vertical_types)?;
57 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
58 .filter_map(|r| r.ok())
59 .collect()
60 } else {
61 let mut stmt = tx.prepare(
63 "SELECT to_task_id FROM dependencies d
64 JOIN (SELECT value FROM json_each(?1)) types
65 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
66 )?;
67 let start_blocking: Vec<&str> = deps_config.start_blocking_types();
68 let types_json = serde_json::to_string(&start_blocking)?;
69 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
70 .filter_map(|r| r.ok())
71 .collect()
72 };
73
74 for dep in deps {
75 if !visited.contains(&dep) {
76 queue.push_back(dep);
77 }
78 }
79 }
80
81 Ok(false)
82}
83
/// Builds the body of an `ORDER BY` clause from caller-supplied sort options.
///
/// Only fixed SQL fragments are ever emitted, so the returned string is safe
/// to splice into a query. Unrecognised or missing `sort_by` values sort on
/// `t.created_at`; anything other than exactly `"asc"` sorts descending.
fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
    let column = match sort_by.unwrap_or("created_at") {
        "priority" => "CAST(t.priority AS INTEGER)",
        "updated_at" => "t.updated_at",
        _ => "t.created_at",
    };

    let direction = if sort_order == Some("asc") {
        "ASC"
    } else {
        "DESC"
    };

    [column, direction].join(" ")
}
105
/// Non-error outcomes of [`Database::add_dependency_soft`].
#[derive(Debug)]
pub enum AddDependencyResult {
    /// The insert added a new dependency row.
    Created,
    /// The insert changed no rows (`INSERT OR IGNORE` skipped it).
    AlreadyExists,
    /// No task with the source id exists.
    FromTaskNotFound,
    /// No task with the target id exists.
    ToTaskNotFound,
}
118
119impl Database {
120 pub fn task_exists(&self, task_id: &str) -> Result<bool> {
122 self.with_conn(|conn| {
123 let count: i64 = conn.query_row(
124 "SELECT COUNT(*) FROM tasks WHERE id = ?1",
125 params![task_id],
126 |row| row.get(0),
127 )?;
128 Ok(count > 0)
129 })
130 }
131
132 pub fn add_dependency(
135 &self,
136 from_task_id: &str,
137 to_task_id: &str,
138 dep_type: &str,
139 deps_config: &DependenciesConfig,
140 ) -> Result<()> {
141 match self.add_dependency_soft(from_task_id, to_task_id, dep_type, deps_config)? {
142 AddDependencyResult::Created | AddDependencyResult::AlreadyExists => Ok(()),
143 AddDependencyResult::FromTaskNotFound => {
144 Err(anyhow!("Source task '{}' not found", from_task_id))
145 }
146 AddDependencyResult::ToTaskNotFound => {
147 Err(anyhow!("Target task '{}' not found", to_task_id))
148 }
149 }
150 }
151
152 pub fn add_dependency_soft(
155 &self,
156 from_task_id: &str,
157 to_task_id: &str,
158 dep_type: &str,
159 deps_config: &DependenciesConfig,
160 ) -> Result<AddDependencyResult> {
161 if !deps_config.is_valid_dep_type(dep_type) {
163 return Err(anyhow!(
164 "Invalid dependency type '{}'. Valid types: {:?}",
165 dep_type,
166 deps_config.dep_type_names()
167 ));
168 }
169
170 if !self.task_exists(from_task_id)? {
172 return Ok(AddDependencyResult::FromTaskNotFound);
173 }
174 if !self.task_exists(to_task_id)? {
175 return Ok(AddDependencyResult::ToTaskNotFound);
176 }
177
178 let def = deps_config
180 .get_definition(dep_type)
181 .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
182 if def.display == DependencyDisplay::Vertical
183 && let Some(existing_parent) = self.get_parent(to_task_id)?
184 && existing_parent != from_task_id
185 {
186 return Err(anyhow!(
187 "Task {} already has parent {}",
188 to_task_id,
189 existing_parent
190 ));
191 }
192
193 if self.would_create_cycle(from_task_id, to_task_id, dep_type, deps_config)? {
195 return Err(anyhow!("Adding this dependency would create a cycle"));
196 }
197
198 self.with_conn(|conn| {
199 let changes = conn.execute(
200 "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
201 params![from_task_id, to_task_id, dep_type],
202 )?;
203 if changes == 0 {
204 Ok(AddDependencyResult::AlreadyExists)
205 } else {
206 Ok(AddDependencyResult::Created)
207 }
208 })
209 }
210
211 pub fn would_create_cycle(
215 &self,
216 from_task_id: &str,
217 to_task_id: &str,
218 dep_type: &str,
219 deps_config: &DependenciesConfig,
220 ) -> Result<bool> {
221 let def = deps_config
222 .get_definition(dep_type)
223 .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
224
225 self.with_conn(|conn| {
226 let mut visited: HashSet<String> = HashSet::new();
229 let mut queue: VecDeque<String> = VecDeque::new();
230 queue.push_back(to_task_id.to_string());
231
232 while let Some(current) = queue.pop_front() {
233 if current == from_task_id {
234 return Ok(true); }
236
237 if visited.contains(¤t) {
238 continue;
239 }
240 visited.insert(current.clone());
241
242 let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
244 let mut stmt = conn.prepare(
246 "SELECT to_task_id FROM dependencies d
247 JOIN (SELECT value FROM json_each(?1)) types
248 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
249 )?;
250 let vertical_types: Vec<&str> = deps_config.vertical_types();
251 let types_json = serde_json::to_string(&vertical_types)?;
252 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
253 .filter_map(|r| r.ok())
254 .collect()
255 } else {
256 let mut stmt = conn.prepare(
258 "SELECT to_task_id FROM dependencies d
259 JOIN (SELECT value FROM json_each(?1)) types
260 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
261 )?;
262 let start_blocking: Vec<&str> = deps_config.start_blocking_types();
263 let types_json = serde_json::to_string(&start_blocking)?;
264 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
265 .filter_map(|r| r.ok())
266 .collect()
267 };
268
269 for dep in deps {
270 if !visited.contains(&dep) {
271 queue.push_back(dep);
272 }
273 }
274 }
275
276 Ok(false)
277 })
278 }
279
280 pub fn remove_dependency(
282 &self,
283 from_task_id: &str,
284 to_task_id: &str,
285 dep_type: &str,
286 ) -> Result<bool> {
287 self.with_conn(|conn| {
288 let rows = conn.execute(
289 "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
290 params![from_task_id, to_task_id, dep_type],
291 )?;
292 Ok(rows > 0)
293 })
294 }
295
296 pub fn remove_all_outgoing_dependencies(
299 &self,
300 from_task_id: &str,
301 dep_type: &str,
302 ) -> Result<Vec<Dependency>> {
303 self.with_conn_mut(|conn| {
304 let tx = conn.transaction()?;
305
306 let deps: Vec<Dependency> = {
308 let mut stmt = tx.prepare(
309 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
310 )?;
311 stmt
312 .query_map(params![from_task_id, dep_type], |row| {
313 Ok(Dependency {
314 from_task_id: row.get(0)?,
315 to_task_id: row.get(1)?,
316 dep_type: row.get(2)?,
317 })
318 })?
319 .filter_map(|r| r.ok())
320 .collect()
321 };
322
323 tx.execute(
325 "DELETE FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2",
326 params![from_task_id, dep_type],
327 )?;
328
329 tx.commit()?;
330 Ok(deps)
331 })
332 }
333
334 pub fn remove_all_incoming_dependencies(
337 &self,
338 to_task_id: &str,
339 dep_type: &str,
340 ) -> Result<Vec<Dependency>> {
341 self.with_conn_mut(|conn| {
342 let tx = conn.transaction()?;
343
344 let deps: Vec<Dependency> = {
346 let mut stmt = tx.prepare(
347 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
348 )?;
349 stmt
350 .query_map(params![to_task_id, dep_type], |row| {
351 Ok(Dependency {
352 from_task_id: row.get(0)?,
353 to_task_id: row.get(1)?,
354 dep_type: row.get(2)?,
355 })
356 })?
357 .filter_map(|r| r.ok())
358 .collect()
359 };
360
361 tx.execute(
363 "DELETE FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2",
364 params![to_task_id, dep_type],
365 )?;
366
367 tx.commit()?;
368 Ok(deps)
369 })
370 }
371
372 pub fn get_all_dependencies(&self) -> Result<Vec<Dependency>> {
374 self.with_conn(|conn| {
375 let mut stmt =
376 conn.prepare("SELECT from_task_id, to_task_id, dep_type FROM dependencies")?;
377
378 let deps = stmt
379 .query_map([], |row| {
380 let from: String = row.get(0)?;
381 let to: String = row.get(1)?;
382 let dep_type: String = row.get(2)?;
383 Ok(Dependency {
384 from_task_id: from,
385 to_task_id: to,
386 dep_type,
387 })
388 })?
389 .filter_map(|r| r.ok())
390 .collect();
391
392 Ok(deps)
393 })
394 }
395
396 pub fn get_dependencies_by_type(
398 &self,
399 task_id: &str,
400 dep_type: &str,
401 direction: &str,
402 ) -> Result<Vec<Dependency>> {
403 self.with_conn(|conn| {
404 let sql = if direction == "incoming" {
405 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
406 } else {
407 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
408 };
409
410 let mut stmt = conn.prepare(sql)?;
411
412 let deps = stmt
413 .query_map(params![task_id, dep_type], |row| {
414 let from: String = row.get(0)?;
415 let to: String = row.get(1)?;
416 let dep_type: String = row.get(2)?;
417 Ok(Dependency {
418 from_task_id: from,
419 to_task_id: to,
420 dep_type,
421 })
422 })?
423 .filter_map(|r| r.ok())
424 .collect();
425
426 Ok(deps)
427 })
428 }
429
430 pub fn get_start_blockers(
432 &self,
433 task_id: &str,
434 deps_config: &DependenciesConfig,
435 ) -> Result<Vec<String>> {
436 let start_blocking_types = deps_config.start_blocking_types();
437 if start_blocking_types.is_empty() {
438 return Ok(vec![]);
439 }
440
441 self.with_conn(|conn| {
442 let placeholders: String = start_blocking_types
443 .iter()
444 .enumerate()
445 .map(|(i, _)| format!("?{}", i + 2))
446 .collect::<Vec<_>>()
447 .join(", ");
448
449 let sql = format!(
450 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type IN ({})",
451 placeholders
452 );
453
454 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
455 params_vec.push(Box::new(task_id.to_string()));
456 for t in &start_blocking_types {
457 params_vec.push(Box::new(t.to_string()));
458 }
459 let params_refs: Vec<&dyn rusqlite::ToSql> =
460 params_vec.iter().map(|b| b.as_ref()).collect();
461
462 let mut stmt = conn.prepare(&sql)?;
463 let blockers = stmt
464 .query_map(params_refs.as_slice(), |row| {
465 let id: String = row.get(0)?;
466 Ok(id)
467 })?
468 .filter_map(|r| r.ok())
469 .collect();
470
471 Ok(blockers)
472 })
473 }
474
475 pub fn get_completion_blockers(
478 &self,
479 task_id: &str,
480 deps_config: &DependenciesConfig,
481 ) -> Result<Vec<String>> {
482 let completion_blocking_types = deps_config.completion_blocking_types();
483 if completion_blocking_types.is_empty() {
484 return Ok(vec![]);
485 }
486
487 self.with_conn(|conn| {
488 let placeholders: String = completion_blocking_types
489 .iter()
490 .enumerate()
491 .map(|(i, _)| format!("?{}", i + 2))
492 .collect::<Vec<_>>()
493 .join(", ");
494
495 let sql = format!(
498 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
499 placeholders
500 );
501
502 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
503 params_vec.push(Box::new(task_id.to_string()));
504 for t in &completion_blocking_types {
505 params_vec.push(Box::new(t.to_string()));
506 }
507 let params_refs: Vec<&dyn rusqlite::ToSql> =
508 params_vec.iter().map(|b| b.as_ref()).collect();
509
510 let mut stmt = conn.prepare(&sql)?;
511 let blockers = stmt
512 .query_map(params_refs.as_slice(), |row| {
513 let id: String = row.get(0)?;
514 Ok(id)
515 })?
516 .filter_map(|r| r.ok())
517 .collect();
518
519 Ok(blockers)
520 })
521 }
522
523 pub fn get_parent(&self, task_id: &str) -> Result<Option<String>> {
525 self.with_conn(|conn| {
526 let result: Result<String, rusqlite::Error> = conn.query_row(
527 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
528 params![task_id],
529 |row| row.get(0),
530 );
531
532 match result {
533 Ok(parent_id) => Ok(Some(parent_id)),
534 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
535 Err(e) => Err(e.into()),
536 }
537 })
538 }
539
540 pub fn get_children_ids(&self, task_id: &str) -> Result<Vec<String>> {
542 self.with_conn(|conn| {
543 let mut stmt = conn.prepare(
544 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
545 )?;
546
547 let children = stmt
548 .query_map(params![task_id], |row| {
549 let id: String = row.get(0)?;
550 Ok(id)
551 })?
552 .filter_map(|r| r.ok())
553 .collect();
554
555 Ok(children)
556 })
557 }
558
559 pub fn get_blockers(&self, task_id: &str) -> Result<Vec<String>> {
562 self.with_conn(|conn| {
563 let mut stmt = conn.prepare(
564 "SELECT from_task_id FROM dependencies
565 WHERE to_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
566 )?;
567
568 let blockers = stmt
569 .query_map(params![task_id], |row| {
570 let id: String = row.get(0)?;
571 Ok(id)
572 })?
573 .filter_map(|r| r.ok())
574 .collect();
575
576 Ok(blockers)
577 })
578 }
579
580 #[allow(dead_code)]
582 pub fn get_blocking(&self, task_id: &str) -> Result<Vec<String>> {
583 self.with_conn(|conn| {
584 let mut stmt = conn.prepare(
585 "SELECT to_task_id FROM dependencies
586 WHERE from_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
587 )?;
588
589 let blocking = stmt
590 .query_map(params![task_id], |row| {
591 let id: String = row.get(0)?;
592 Ok(id)
593 })?
594 .filter_map(|r| r.ok())
595 .collect();
596
597 Ok(blocking)
598 })
599 }
600
601 pub fn get_blocked_tasks(
605 &self,
606 states_config: &StatesConfig,
607 deps_config: &DependenciesConfig,
608 sort_by: Option<&str>,
609 sort_order: Option<&str>,
610 ) -> Result<Vec<Task>> {
611 let start_blocking_types = deps_config.start_blocking_types();
612 if start_blocking_types.is_empty() {
613 return Ok(vec![]);
614 }
615
616 self.with_conn(|conn| {
617 let state_placeholders: Vec<String> = states_config
619 .blocking_states
620 .iter()
621 .enumerate()
622 .map(|(i, _)| format!("?{}", i + 2))
623 .collect();
624 let state_clause = state_placeholders.join(", ");
625
626 let type_start = states_config.blocking_states.len() + 2;
628 let type_placeholders: Vec<String> = start_blocking_types
629 .iter()
630 .enumerate()
631 .map(|(i, _)| format!("?{}", type_start + i))
632 .collect();
633 let type_clause = type_placeholders.join(", ");
634
635 let order_clause = build_order_clause(sort_by, sort_order);
637
638 let sql = format!(
639 "SELECT DISTINCT t.*
640 FROM tasks t
641 INNER JOIN dependencies d ON t.id = d.to_task_id
642 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
643 WHERE d.dep_type IN ({})
644 AND blocker.status IN ({})
645 AND t.status = ?1
646 AND t.deleted_at IS NULL
647 ORDER BY {}",
648 type_clause, state_clause, order_clause
649 );
650
651 let mut stmt = conn.prepare(&sql)?;
652
653 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
655 params_vec.push(Box::new(states_config.initial.clone()));
656 for state in &states_config.blocking_states {
657 params_vec.push(Box::new(state.clone()));
658 }
659 for t in &start_blocking_types {
660 params_vec.push(Box::new(t.to_string()));
661 }
662 let params_refs: Vec<&dyn rusqlite::ToSql> =
663 params_vec.iter().map(|b| b.as_ref()).collect();
664
665 let tasks = stmt
666 .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
667 .filter_map(|r| r.ok())
668 .collect();
669
670 Ok(tasks)
671 })
672 }
673
674 pub fn get_ready_tasks(
679 &self,
680 agent_id: Option<&str>,
681 states_config: &StatesConfig,
682 deps_config: &DependenciesConfig,
683 sort_by: Option<&str>,
684 sort_order: Option<&str>,
685 ) -> Result<Vec<Task>> {
686 let start_blocking_types = deps_config.start_blocking_types();
687
688 let agent_tags: Option<Vec<String>> = if let Some(aid) = agent_id {
690 Some(self.get_agent_tags(aid)?)
691 } else {
692 None
693 };
694
695 self.with_conn(|conn| {
696 let state_placeholders: Vec<String> = states_config
698 .blocking_states
699 .iter()
700 .enumerate()
701 .map(|(i, _)| format!("?{}", i + 2))
702 .collect();
703 let state_clause = state_placeholders.join(", ");
704
705 let type_start = states_config.blocking_states.len() + 2;
707 let type_placeholders: Vec<String> = start_blocking_types
708 .iter()
709 .enumerate()
710 .map(|(i, _)| format!("?{}", type_start + i))
711 .collect();
712 let type_clause = type_placeholders.join(", ");
713
714 let order_clause = if sort_by.is_some() {
716 build_order_clause(sort_by, sort_order)
717 } else {
718 "CAST(t.priority AS INTEGER) DESC, t.created_at DESC".to_string()
720 };
721
722 let mut param_idx = type_start + start_blocking_types.len();
724
725 let (agent_needed_clause, agent_wanted_clause) = if let Some(ref tags) = agent_tags {
727 let needed_placeholders: Vec<String> = tags
731 .iter()
732 .enumerate()
733 .map(|(i, _)| format!("?{}", param_idx + i))
734 .collect();
735 param_idx += tags.len();
736
737 let needed_clause = if needed_placeholders.is_empty() {
738 "AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)"
740 .to_string()
741 } else {
742 format!(
744 "AND (
745 NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
746 OR (
747 SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
748 ) = (
749 SELECT COUNT(*) FROM task_needed_tags
750 WHERE task_id = t.id AND tag IN ({})
751 )
752 )",
753 needed_placeholders.join(", ")
754 )
755 };
756
757 let wanted_placeholders: Vec<String> = tags
759 .iter()
760 .enumerate()
761 .map(|(i, _)| format!("?{}", param_idx + i))
762 .collect();
763
764 let wanted_clause = if wanted_placeholders.is_empty() {
765 "AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)"
767 .to_string()
768 } else {
769 format!(
771 "AND (
772 NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
773 OR EXISTS (
774 SELECT 1 FROM task_wanted_tags
775 WHERE task_id = t.id AND tag IN ({})
776 )
777 )",
778 wanted_placeholders.join(", ")
779 )
780 };
781
782 (needed_clause, wanted_clause)
783 } else {
784 (String::new(), String::new())
785 };
786
787 let sql = format!(
788 "SELECT t.*
789 FROM tasks t
790 WHERE t.status = ?1
791 AND t.worker_id IS NULL
792 AND t.deleted_at IS NULL
793 AND NOT EXISTS (
794 SELECT 1 FROM dependencies d
795 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
796 WHERE d.to_task_id = t.id
797 AND d.dep_type IN ({})
798 AND blocker.status IN ({})
799 )
800 AND t.id NOT IN (
801 SELECT from_task_id FROM dependencies
802 WHERE dep_type = 'contains'
803 )
804 {}
805 {}
806 ORDER BY {}",
807 type_clause, state_clause, agent_needed_clause, agent_wanted_clause, order_clause
808 );
809
810 let mut stmt = conn.prepare(&sql)?;
811
812 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
814 params_vec.push(Box::new(states_config.initial.clone()));
815 for state in &states_config.blocking_states {
816 params_vec.push(Box::new(state.clone()));
817 }
818 for t in &start_blocking_types {
819 params_vec.push(Box::new(t.to_string()));
820 }
821 if let Some(ref tags) = agent_tags {
823 for tag in tags {
824 params_vec.push(Box::new(tag.clone()));
825 }
826 for tag in tags {
827 params_vec.push(Box::new(tag.clone()));
828 }
829 }
830 let params_refs: Vec<&dyn rusqlite::ToSql> =
831 params_vec.iter().map(|b| b.as_ref()).collect();
832
833 let tasks: Vec<Task> = stmt
834 .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
835 .filter_map(|r| r.ok())
836 .collect();
837
838 Ok(tasks)
839 })
840 }
841
842 #[allow(dead_code)]
844 pub fn has_unmet_start_dependencies(
845 &self,
846 task_id: &str,
847 states_config: &StatesConfig,
848 deps_config: &DependenciesConfig,
849 ) -> Result<bool> {
850 let start_blocking_types = deps_config.start_blocking_types();
851 if start_blocking_types.is_empty() {
852 return Ok(false);
853 }
854
855 self.with_conn(|conn| {
856 let state_placeholders: Vec<String> = states_config
858 .blocking_states
859 .iter()
860 .enumerate()
861 .map(|(i, _)| format!("?{}", i + 2))
862 .collect();
863 let state_clause = state_placeholders.join(", ");
864
865 let type_start = states_config.blocking_states.len() + 2;
867 let type_placeholders: Vec<String> = start_blocking_types
868 .iter()
869 .enumerate()
870 .map(|(i, _)| format!("?{}", type_start + i))
871 .collect();
872 let type_clause = type_placeholders.join(", ");
873
874 let sql = format!(
875 "SELECT COUNT(*) FROM dependencies d
876 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
877 WHERE d.to_task_id = ?1
878 AND d.dep_type IN ({})
879 AND blocker.status IN ({})",
880 type_clause, state_clause
881 );
882
883 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
884 params_vec.push(Box::new(task_id.to_string()));
885 for state in &states_config.blocking_states {
886 params_vec.push(Box::new(state.clone()));
887 }
888 for t in &start_blocking_types {
889 params_vec.push(Box::new(t.to_string()));
890 }
891 let params_refs: Vec<&dyn rusqlite::ToSql> =
892 params_vec.iter().map(|b| b.as_ref()).collect();
893
894 let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
895
896 Ok(count > 0)
897 })
898 }
899
900 pub fn has_incomplete_children(
902 &self,
903 task_id: &str,
904 states_config: &StatesConfig,
905 ) -> Result<bool> {
906 self.with_conn(|conn| {
907 let state_placeholders: Vec<String> = states_config
909 .blocking_states
910 .iter()
911 .enumerate()
912 .map(|(i, _)| format!("?{}", i + 2))
913 .collect();
914 let state_clause = state_placeholders.join(", ");
915
916 let sql = format!(
917 "SELECT COUNT(*) FROM dependencies d
918 INNER JOIN tasks child ON d.to_task_id = child.id
919 WHERE d.from_task_id = ?1
920 AND d.dep_type = 'contains'
921 AND child.status IN ({})",
922 state_clause
923 );
924
925 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
926 params_vec.push(Box::new(task_id.to_string()));
927 for state in &states_config.blocking_states {
928 params_vec.push(Box::new(state.clone()));
929 }
930 let params_refs: Vec<&dyn rusqlite::ToSql> =
931 params_vec.iter().map(|b| b.as_ref()).collect();
932
933 let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
934
935 Ok(count > 0)
936 })
937 }
938
939 #[allow(clippy::too_many_arguments)]
946 pub fn list_tasks_with_tag_filters(
947 &self,
948 status: Option<Vec<String>>,
949 owner: Option<&str>,
950 parent_id: Option<Option<&str>>,
951 tags_any: Option<Vec<String>>,
952 tags_all: Option<Vec<String>>,
953 qualified_for_agent_tags: Option<Vec<String>>,
954 limit: Option<i32>,
955 offset: i32,
956 sort_by: Option<&str>,
957 sort_order: Option<&str>,
958 ) -> Result<Vec<Task>> {
959 self.with_conn(|conn| {
960 let mut sql = String::from("SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL");
961 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
962 let mut param_idx = 1;
963
964 if let Some(ref statuses) = status {
966 if statuses.len() == 1 {
967 sql.push_str(&format!(" AND t.status = ?{}", param_idx));
968 params_vec.push(Box::new(statuses[0].clone()));
969 param_idx += 1;
970 } else if statuses.len() > 1 {
971 let placeholders: Vec<String> = statuses
972 .iter()
973 .enumerate()
974 .map(|(i, _)| format!("?{}", param_idx + i))
975 .collect();
976 sql.push_str(&format!(" AND t.status IN ({})", placeholders.join(", ")));
977 for s in statuses {
978 params_vec.push(Box::new(s.clone()));
979 }
980 param_idx += statuses.len();
981 }
982 }
983
984 if let Some(o) = owner {
986 sql.push_str(&format!(" AND t.worker_id = ?{}", param_idx));
987 params_vec.push(Box::new(o.to_string()));
988 param_idx += 1;
989 }
990
991 if let Some(p) = parent_id {
993 match p {
994 Some(pid) => {
995 sql.push_str(&format!(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ?{} AND dep_type = 'contains')", param_idx));
996 params_vec.push(Box::new(pid.to_string()));
997 param_idx += 1;
998 }
999 None => {
1000 sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
1002 }
1003 }
1004 }
1005
1006 if let Some(ref any_tags) = tags_any
1008 && !any_tags.is_empty() {
1009 let placeholders: Vec<String> = any_tags
1010 .iter()
1011 .enumerate()
1012 .map(|(i, _)| format!("?{}", param_idx + i))
1013 .collect();
1014 sql.push_str(&format!(
1015 " AND EXISTS (SELECT 1 FROM task_tags WHERE task_id = t.id AND tag IN ({}))",
1016 placeholders.join(", ")
1017 ));
1018 for tag in any_tags {
1019 params_vec.push(Box::new(tag.clone()));
1020 }
1021 param_idx += any_tags.len();
1022 }
1023
1024 if let Some(ref all_tags) = tags_all
1026 && !all_tags.is_empty() {
1027 let placeholders: Vec<String> = all_tags
1028 .iter()
1029 .enumerate()
1030 .map(|(i, _)| format!("?{}", param_idx + i))
1031 .collect();
1032 sql.push_str(&format!(
1034 " AND (SELECT COUNT(*) FROM task_tags WHERE task_id = t.id AND tag IN ({})) = {}",
1035 placeholders.join(", "),
1036 all_tags.len()
1037 ));
1038 for tag in all_tags {
1039 params_vec.push(Box::new(tag.clone()));
1040 }
1041 param_idx += all_tags.len();
1042 }
1043
1044 if let Some(ref agent_tags) = qualified_for_agent_tags {
1046 if agent_tags.is_empty() {
1048 sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)");
1050 sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)");
1052 } else {
1053 let needed_placeholders: Vec<String> = agent_tags
1055 .iter()
1056 .enumerate()
1057 .map(|(i, _)| format!("?{}", param_idx + i))
1058 .collect();
1059 sql.push_str(&format!(
1060 " AND (
1061 NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
1062 OR (
1063 SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
1064 ) = (
1065 SELECT COUNT(*) FROM task_needed_tags
1066 WHERE task_id = t.id AND tag IN ({})
1067 )
1068 )",
1069 needed_placeholders.join(", ")
1070 ));
1071 for tag in agent_tags {
1072 params_vec.push(Box::new(tag.clone()));
1073 }
1074 param_idx += agent_tags.len();
1075
1076 let wanted_placeholders: Vec<String> = agent_tags
1078 .iter()
1079 .enumerate()
1080 .map(|(i, _)| format!("?{}", param_idx + i))
1081 .collect();
1082 sql.push_str(&format!(
1083 " AND (
1084 NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
1085 OR EXISTS (
1086 SELECT 1 FROM task_wanted_tags
1087 WHERE task_id = t.id AND tag IN ({})
1088 )
1089 )",
1090 wanted_placeholders.join(", ")
1091 ));
1092 for tag in agent_tags {
1093 params_vec.push(Box::new(tag.clone()));
1094 }
1095 }
1097 }
1098
1099 let order_clause = build_order_clause(sort_by, sort_order);
1101 sql.push_str(&format!(" ORDER BY {}", order_clause));
1102
1103 if let Some(l) = limit {
1105 sql.push_str(&format!(" LIMIT {}", l));
1106 }
1107
1108 if offset > 0 {
1109 sql.push_str(&format!(" OFFSET {}", offset));
1110 }
1111
1112 let params_refs: Vec<&dyn rusqlite::ToSql> =
1113 params_vec.iter().map(|b| b.as_ref()).collect();
1114
1115 let mut stmt = conn.prepare(&sql)?;
1116 let tasks: Vec<Task> = stmt
1117 .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
1118 .filter_map(|r| r.ok())
1119 .collect();
1120
1121 Ok(tasks)
1122 })
1123 }
1124
1125 pub fn get_agent_tags(&self, agent_id: &str) -> Result<Vec<String>> {
1127 self.with_conn(|conn| {
1128 let result: Result<String, rusqlite::Error> = conn.query_row(
1129 "SELECT tags FROM workers WHERE id = ?1",
1130 params![agent_id],
1131 |row| row.get(0),
1132 );
1133
1134 match result {
1135 Ok(tags_json) => {
1136 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
1137 Ok(tags)
1138 }
1139 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(vec![]),
1140 Err(e) => Err(e.into()),
1141 }
1142 })
1143 }
1144
1145 pub(super) fn add_dependency_internal(
1147 conn: &Connection,
1148 from_task_id: &str,
1149 to_task_id: &str,
1150 dep_type: &str,
1151 ) -> Result<()> {
1152 conn.execute(
1153 "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1154 params![from_task_id, to_task_id, dep_type],
1155 )?;
1156 Ok(())
1157 }
1158
1159 pub fn relink(
1163 &self,
1164 prev_from_ids: &[String],
1165 prev_to_ids: &[String],
1166 from_ids: &[String],
1167 to_ids: &[String],
1168 dep_type: &str,
1169 deps_config: &DependenciesConfig,
1170 ) -> Result<RelinkResult> {
1171 if !deps_config.is_valid_dep_type(dep_type) {
1173 return Err(anyhow!(
1174 "Invalid dependency type '{}'. Valid types: {:?}",
1175 dep_type,
1176 deps_config.dep_type_names()
1177 ));
1178 }
1179
1180 let def = deps_config
1181 .get_definition(dep_type)
1182 .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
1183 let is_vertical = def.display == DependencyDisplay::Vertical;
1184
1185 self.with_conn_mut(|conn| {
1186 let tx = conn.transaction()?;
1187
1188 let mut unlinked = Vec::new();
1189 let mut linked = Vec::new();
1190 let mut errors = Vec::new();
1191
1192 for prev_from in prev_from_ids {
1194 for prev_to in prev_to_ids {
1195 let rows = tx.execute(
1196 "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
1197 params![prev_from, prev_to, dep_type],
1198 )?;
1199 if rows > 0 {
1200 unlinked.push((prev_from.clone(), prev_to.clone()));
1201 }
1202 }
1203 }
1204
1205 for from_id in from_ids {
1207 for to_id in to_ids {
1208 if is_vertical {
1210 let existing_parent: Option<String> = tx.query_row(
1211 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1212 params![to_id],
1213 |row| row.get(0),
1214 ).optional()?;
1215
1216 if let Some(ref parent) = existing_parent
1217 && parent != from_id {
1218 errors.push(format!(
1219 "Task {} already has parent {}",
1220 to_id, parent
1221 ));
1222 continue;
1223 }
1224 }
1225
1226 if would_create_cycle_in_tx(&tx, from_id, to_id, dep_type, deps_config)? {
1228 errors.push(format!(
1229 "Adding dependency {}→{} would create a cycle",
1230 from_id, to_id
1231 ));
1232 continue;
1233 }
1234
1235 tx.execute(
1236 "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1237 params![from_id, to_id, dep_type],
1238 )?;
1239 linked.push((from_id.clone(), to_id.clone()));
1240 }
1241 }
1242
1243 if !errors.is_empty() {
1244 tx.rollback()?;
1246 return Err(anyhow!("Relink failed: {}", errors.join("; ")));
1247 }
1248
1249 tx.commit()?;
1250 Ok(RelinkResult { unlinked, linked })
1251 })
1252 }
1253
1254 pub fn get_predecessors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1261 if depth == 0 {
1262 return Ok(vec![]);
1263 }
1264
1265 self.with_conn(|conn| {
1266 let mut visited: HashSet<String> = HashSet::new();
1267 let mut result: Vec<Task> = Vec::new();
1268 let mut current_level: Vec<String> = vec![task_id.to_string()];
1269 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1270
1271 while !current_level.is_empty() && levels_remaining > 0 {
1272 let mut next_level: Vec<String> = Vec::new();
1273
1274 for tid in ¤t_level {
1275 let mut stmt = conn.prepare(
1277 "SELECT DISTINCT d.from_task_id FROM dependencies d
1278 WHERE d.to_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')",
1279 )?;
1280
1281 let predecessors: Vec<String> = stmt
1282 .query_map(params![tid], |row| row.get(0))?
1283 .filter_map(|r| r.ok())
1284 .collect();
1285
1286 for pred_id in predecessors {
1287 if !visited.contains(&pred_id) {
1288 visited.insert(pred_id.clone());
1289 if let Some(task) = get_task_by_id_internal(conn, &pred_id)? {
1290 result.push(task);
1291 }
1292 next_level.push(pred_id);
1293 }
1294 }
1295 }
1296
1297 current_level = next_level;
1298 levels_remaining -= 1;
1299 }
1300
1301 Ok(result)
1302 })
1303 }
1304
1305 pub fn get_successors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1308 if depth == 0 {
1309 return Ok(vec![]);
1310 }
1311
1312 self.with_conn(|conn| {
1313 let mut visited: HashSet<String> = HashSet::new();
1314 let mut result: Vec<Task> = Vec::new();
1315 let mut current_level: Vec<String> = vec![task_id.to_string()];
1316 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1317
1318 while !current_level.is_empty() && levels_remaining > 0 {
1319 let mut next_level: Vec<String> = Vec::new();
1320
1321 for tid in ¤t_level {
1322 let mut stmt = conn.prepare(
1324 "SELECT DISTINCT d.to_task_id FROM dependencies d
1325 WHERE d.from_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')",
1326 )?;
1327
1328 let successors: Vec<String> = stmt
1329 .query_map(params![tid], |row| row.get(0))?
1330 .filter_map(|r| r.ok())
1331 .collect();
1332
1333 for succ_id in successors {
1334 if !visited.contains(&succ_id) {
1335 visited.insert(succ_id.clone());
1336 if let Some(task) = get_task_by_id_internal(conn, &succ_id)? {
1337 result.push(task);
1338 }
1339 next_level.push(succ_id);
1340 }
1341 }
1342 }
1343
1344 current_level = next_level;
1345 levels_remaining -= 1;
1346 }
1347
1348 Ok(result)
1349 })
1350 }
1351
1352 pub fn get_ancestors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1355 if depth == 0 {
1356 return Ok(vec![]);
1357 }
1358
1359 self.with_conn(|conn| {
1360 let mut result: Vec<Task> = Vec::new();
1361 let mut current_id = task_id.to_string();
1362 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1363
1364 while levels_remaining > 0 {
1365 let parent_result: Result<String, rusqlite::Error> = conn.query_row(
1367 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1368 params![¤t_id],
1369 |row| row.get(0),
1370 );
1371
1372 match parent_result {
1373 Ok(parent_id) => {
1374 if let Some(task) = get_task_by_id_internal(conn, &parent_id)? {
1375 result.push(task);
1376 }
1377 current_id = parent_id;
1378 levels_remaining -= 1;
1379 }
1380 Err(rusqlite::Error::QueryReturnedNoRows) => break,
1381 Err(e) => return Err(e.into()),
1382 }
1383 }
1384
1385 Ok(result)
1386 })
1387 }
1388
1389 pub fn get_descendants(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1392 if depth == 0 {
1393 return Ok(vec![]);
1394 }
1395
1396 self.with_conn(|conn| {
1397 let mut visited: HashSet<String> = HashSet::new();
1398 let mut result: Vec<Task> = Vec::new();
1399 let mut current_level: Vec<String> = vec![task_id.to_string()];
1400 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1401
1402 while !current_level.is_empty() && levels_remaining > 0 {
1403 let mut next_level: Vec<String> = Vec::new();
1404
1405 for tid in ¤t_level {
1406 let mut stmt = conn.prepare(
1408 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
1409 )?;
1410
1411 let children: Vec<String> = stmt
1412 .query_map(params![tid], |row| row.get(0))?
1413 .filter_map(|r| r.ok())
1414 .collect();
1415
1416 for child_id in children {
1417 if !visited.contains(&child_id) {
1418 visited.insert(child_id.clone());
1419 if let Some(task) = get_task_by_id_internal(conn, &child_id)? {
1420 result.push(task);
1421 }
1422 next_level.push(child_id);
1423 }
1424 }
1425 }
1426
1427 current_level = next_level;
1428 levels_remaining -= 1;
1429 }
1430
1431 Ok(result)
1432 })
1433 }
1434}
1435
1436fn get_task_by_id_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
1438 let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1439 let task = stmt
1440 .query_row(params![task_id], super::tasks::parse_task_row)
1441 .optional()?;
1442 Ok(task)
1443}
1444
1445pub(crate) fn get_unsatisfied_start_blockers_in_tx(
1449 conn: &Connection,
1450 task_id: &str,
1451 states_config: &StatesConfig,
1452 deps_config: &DependenciesConfig,
1453) -> Result<Vec<String>> {
1454 let start_blocking_types = deps_config.start_blocking_types();
1455 if start_blocking_types.is_empty() {
1456 return Ok(vec![]);
1457 }
1458
1459 let state_placeholders: Vec<String> = states_config
1461 .blocking_states
1462 .iter()
1463 .enumerate()
1464 .map(|(i, _)| format!("?{}", i + 2))
1465 .collect();
1466 let state_clause = state_placeholders.join(", ");
1467
1468 let type_start = states_config.blocking_states.len() + 2;
1470 let type_placeholders: Vec<String> = start_blocking_types
1471 .iter()
1472 .enumerate()
1473 .map(|(i, _)| format!("?{}", type_start + i))
1474 .collect();
1475 let type_clause = type_placeholders.join(", ");
1476
1477 let sql = format!(
1478 "SELECT blocker.id FROM dependencies d
1479 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1480 WHERE d.to_task_id = ?1
1481 AND d.dep_type IN ({})
1482 AND blocker.status IN ({})",
1483 type_clause, state_clause
1484 );
1485
1486 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1487 params_vec.push(Box::new(task_id.to_string()));
1488 for state in &states_config.blocking_states {
1489 params_vec.push(Box::new(state.clone()));
1490 }
1491 for t in &start_blocking_types {
1492 params_vec.push(Box::new(t.to_string()));
1493 }
1494 let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
1495
1496 let mut stmt = conn.prepare(&sql)?;
1497 let blockers = stmt
1498 .query_map(params_refs.as_slice(), |row| {
1499 let id: String = row.get(0)?;
1500 Ok(id)
1501 })?
1502 .filter_map(|r| r.ok())
1503 .collect();
1504
1505 Ok(blockers)
1506}
1507
1508pub(crate) fn propagate_unblock_effects(
1525 conn: &Connection,
1526 completed_task_id: &str,
1527 agent_id: Option<&str>,
1528 states_config: &StatesConfig,
1529 deps_config: &DependenciesConfig,
1530 auto_advance: &AutoAdvanceConfig,
1531) -> Result<(Vec<String>, Vec<String>)> {
1532 let start_blocking_types = deps_config.start_blocking_types();
1534 if start_blocking_types.is_empty() {
1535 return Ok((vec![], vec![]));
1536 }
1537
1538 let type_placeholders: Vec<String> = start_blocking_types
1540 .iter()
1541 .enumerate()
1542 .map(|(i, _)| format!("?{}", i + 2))
1543 .collect();
1544 let type_clause = type_placeholders.join(", ");
1545
1546 let sql = format!(
1547 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
1548 type_clause
1549 );
1550
1551 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1552 params_vec.push(Box::new(completed_task_id.to_string()));
1553 for t in &start_blocking_types {
1554 params_vec.push(Box::new(t.to_string()));
1555 }
1556 let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
1557
1558 let mut stmt = conn.prepare(&sql)?;
1559 let dependent_task_ids: Vec<String> = stmt
1560 .query_map(params_refs.as_slice(), |row| row.get(0))?
1561 .filter_map(|r| r.ok())
1562 .collect();
1563
1564 let mut unblocked = Vec::new();
1565 let mut auto_advanced = Vec::new();
1566 let now = super::now_ms();
1567
1568 let should_auto_advance = auto_advance.enabled && auto_advance.target_state.is_some();
1570 let target_state = auto_advance.target_state.clone();
1571
1572 if should_auto_advance {
1574 let ts = target_state.as_ref().unwrap();
1575 if !states_config.is_valid_state(ts) {
1576 return Err(anyhow!(
1577 "Auto-advance target state '{}' is not a valid state",
1578 ts
1579 ));
1580 }
1581 }
1582
1583 for task_id in dependent_task_ids {
1584 let task = match get_task_by_id_internal(conn, &task_id)? {
1586 Some(t) => t,
1587 None => continue,
1588 };
1589
1590 if task.status != states_config.initial {
1592 continue;
1593 }
1594
1595 if task.worker_id.is_some() {
1597 continue;
1598 }
1599
1600 let state_placeholders: Vec<String> = states_config
1603 .blocking_states
1604 .iter()
1605 .enumerate()
1606 .map(|(i, _)| format!("?{}", i + 3))
1607 .collect();
1608 let state_clause = state_placeholders.join(", ");
1609
1610 let type_start = states_config.blocking_states.len() + 3;
1612 let type_placeholders2: Vec<String> = start_blocking_types
1613 .iter()
1614 .enumerate()
1615 .map(|(i, _)| format!("?{}", type_start + i))
1616 .collect();
1617 let type_clause2 = type_placeholders2.join(", ");
1618
1619 let blocker_sql = format!(
1620 "SELECT COUNT(*) FROM dependencies d
1621 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1622 WHERE d.to_task_id = ?1
1623 AND d.from_task_id != ?2
1624 AND d.dep_type IN ({})
1625 AND blocker.status IN ({})",
1626 type_clause2, state_clause
1627 );
1628
1629 let mut blocker_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1630 blocker_params.push(Box::new(task_id.clone()));
1631 blocker_params.push(Box::new(completed_task_id.to_string()));
1632 for state in &states_config.blocking_states {
1633 blocker_params.push(Box::new(state.clone()));
1634 }
1635 for t in &start_blocking_types {
1636 blocker_params.push(Box::new(t.to_string()));
1637 }
1638 let blocker_refs: Vec<&dyn rusqlite::ToSql> =
1639 blocker_params.iter().map(|b| b.as_ref()).collect();
1640
1641 let remaining_blockers: i32 =
1642 conn.query_row(&blocker_sql, blocker_refs.as_slice(), |row| row.get(0))?;
1643
1644 if remaining_blockers > 0 {
1645 continue; }
1647
1648 unblocked.push(task_id.clone());
1650
1651 if should_auto_advance {
1653 let ts = target_state.as_ref().unwrap();
1654
1655 if !states_config.is_valid_transition(&states_config.initial, ts) {
1657 continue;
1659 }
1660
1661 conn.execute(
1663 "UPDATE tasks SET status = ?1, updated_at = ?2 WHERE id = ?3",
1664 params![ts, now, &task_id],
1665 )?;
1666
1667 let reason = format!("auto-advanced: blocker '{}' completed", completed_task_id);
1669 super::state_transitions::record_state_transition(
1670 conn,
1671 &task_id,
1672 ts,
1673 agent_id,
1674 Some(&reason),
1675 states_config,
1676 )?;
1677
1678 auto_advanced.push(task_id);
1679 }
1680 }
1681
1682 Ok((unblocked, auto_advanced))
1683}