1use super::Database;
4use crate::config::{AutoAdvanceConfig, DependenciesConfig, DependencyDisplay, StatesConfig};
5use crate::types::{Dependency, Task};
6use anyhow::{Result, anyhow};
7use rusqlite::{Connection, OptionalExtension, params};
8use std::collections::{HashSet, VecDeque};
9
/// Outcome of a successful [`Database::relink`] call.
///
/// Every entry is a `(from_task_id, to_task_id)` pair.
#[derive(Debug)]
pub struct RelinkResult {
    /// Pairs for which a dependency row was actually deleted.
    pub unlinked: Vec<(String, String)>,
    /// Pairs that passed validation and were submitted for insertion.
    pub linked: Vec<(String, String)>,
}
18
19fn would_create_cycle_in_tx(
21 tx: &rusqlite::Transaction,
22 from_task_id: &str,
23 to_task_id: &str,
24 dep_type: &str,
25 deps_config: &DependenciesConfig,
26) -> Result<bool> {
27 let def = deps_config
28 .get_definition(dep_type)
29 .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
30
31 let mut visited: HashSet<String> = HashSet::new();
34 let mut queue: VecDeque<String> = VecDeque::new();
35 queue.push_back(to_task_id.to_string());
36
37 while let Some(current) = queue.pop_front() {
38 if current == from_task_id {
39 return Ok(true); }
41
42 if visited.contains(¤t) {
43 continue;
44 }
45 visited.insert(current.clone());
46
47 let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
49 let mut stmt = tx.prepare(
51 "SELECT to_task_id FROM dependencies d
52 JOIN (SELECT value FROM json_each(?1)) types
53 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
54 )?;
55 let vertical_types: Vec<&str> = deps_config.vertical_types();
56 let types_json = serde_json::to_string(&vertical_types)?;
57 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
58 .filter_map(|r| r.ok())
59 .collect()
60 } else {
61 let mut stmt = tx.prepare(
63 "SELECT to_task_id FROM dependencies d
64 JOIN (SELECT value FROM json_each(?1)) types
65 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
66 )?;
67 let start_blocking: Vec<&str> = deps_config.start_blocking_types();
68 let types_json = serde_json::to_string(&start_blocking)?;
69 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
70 .filter_map(|r| r.ok())
71 .collect()
72 };
73
74 for dep in deps {
75 if !visited.contains(&dep) {
76 queue.push_back(dep);
77 }
78 }
79 }
80
81 Ok(false)
82}
83
/// Builds the body of an `ORDER BY` clause for queries that alias `tasks` as `t`.
///
/// `sort_by` recognises `"priority"`, `"created_at"` and `"updated_at"`; any
/// other value, including `None`, sorts by `t.created_at`. `sort_order`
/// recognises `"asc"` and `"desc"` (exact lowercase match); anything else
/// sorts descending.
///
/// Only fixed string fragments are ever emitted, never the caller's input.
fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
    let column = match sort_by {
        Some("priority") => "CAST(t.priority AS INTEGER)",
        Some("updated_at") => "t.updated_at",
        // "created_at" is both an explicit choice and the fallback.
        _ => "t.created_at",
    };

    let direction = if sort_order == Some("asc") {
        "ASC"
    } else {
        "DESC"
    };

    [column, direction].join(" ")
}
105
/// Non-error outcomes of [`Database::add_dependency_soft`].
#[derive(Debug)]
pub enum AddDependencyResult {
    /// A new dependency row was inserted.
    Created,
    /// The insert changed no rows (`INSERT OR IGNORE` skipped it).
    AlreadyExists,
    /// No task with the source id exists.
    FromTaskNotFound,
    /// No task with the target id exists.
    ToTaskNotFound,
}
118
119impl Database {
120 pub fn task_exists(&self, task_id: &str) -> Result<bool> {
122 self.with_conn(|conn| {
123 let count: i64 = conn.query_row(
124 "SELECT COUNT(*) FROM tasks WHERE id = ?1",
125 params![task_id],
126 |row| row.get(0),
127 )?;
128 Ok(count > 0)
129 })
130 }
131
132 pub fn add_dependency(
135 &self,
136 from_task_id: &str,
137 to_task_id: &str,
138 dep_type: &str,
139 deps_config: &DependenciesConfig,
140 ) -> Result<()> {
141 match self.add_dependency_soft(from_task_id, to_task_id, dep_type, deps_config)? {
142 AddDependencyResult::Created | AddDependencyResult::AlreadyExists => Ok(()),
143 AddDependencyResult::FromTaskNotFound => {
144 Err(anyhow!("Source task '{}' not found", from_task_id))
145 }
146 AddDependencyResult::ToTaskNotFound => {
147 Err(anyhow!("Target task '{}' not found", to_task_id))
148 }
149 }
150 }
151
152 pub fn add_dependency_soft(
155 &self,
156 from_task_id: &str,
157 to_task_id: &str,
158 dep_type: &str,
159 deps_config: &DependenciesConfig,
160 ) -> Result<AddDependencyResult> {
161 if !deps_config.is_valid_dep_type(dep_type) {
163 return Err(anyhow!(
164 "Invalid dependency type '{}'. Valid types: {:?}",
165 dep_type,
166 deps_config.dep_type_names()
167 ));
168 }
169
170 if !self.task_exists(from_task_id)? {
172 return Ok(AddDependencyResult::FromTaskNotFound);
173 }
174 if !self.task_exists(to_task_id)? {
175 return Ok(AddDependencyResult::ToTaskNotFound);
176 }
177
178 let def = deps_config
180 .get_definition(dep_type)
181 .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
182 if def.display == DependencyDisplay::Vertical
183 && let Some(existing_parent) = self.get_parent(to_task_id)?
184 && existing_parent != from_task_id
185 {
186 return Err(anyhow!(
187 "Task {} already has parent {}",
188 to_task_id,
189 existing_parent
190 ));
191 }
192
193 if self.would_create_cycle(from_task_id, to_task_id, dep_type, deps_config)? {
195 return Err(anyhow!("Adding this dependency would create a cycle"));
196 }
197
198 self.with_conn(|conn| {
199 let changes = conn.execute(
200 "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
201 params![from_task_id, to_task_id, dep_type],
202 )?;
203 if changes == 0 {
204 Ok(AddDependencyResult::AlreadyExists)
205 } else {
206 Ok(AddDependencyResult::Created)
207 }
208 })
209 }
210
211 pub fn would_create_cycle(
215 &self,
216 from_task_id: &str,
217 to_task_id: &str,
218 dep_type: &str,
219 deps_config: &DependenciesConfig,
220 ) -> Result<bool> {
221 let def = deps_config
222 .get_definition(dep_type)
223 .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
224
225 self.with_conn(|conn| {
226 let mut visited: HashSet<String> = HashSet::new();
229 let mut queue: VecDeque<String> = VecDeque::new();
230 queue.push_back(to_task_id.to_string());
231
232 while let Some(current) = queue.pop_front() {
233 if current == from_task_id {
234 return Ok(true); }
236
237 if visited.contains(¤t) {
238 continue;
239 }
240 visited.insert(current.clone());
241
242 let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
244 let mut stmt = conn.prepare(
246 "SELECT to_task_id FROM dependencies d
247 JOIN (SELECT value FROM json_each(?1)) types
248 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
249 )?;
250 let vertical_types: Vec<&str> = deps_config.vertical_types();
251 let types_json = serde_json::to_string(&vertical_types)?;
252 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
253 .filter_map(|r| r.ok())
254 .collect()
255 } else {
256 let mut stmt = conn.prepare(
258 "SELECT to_task_id FROM dependencies d
259 JOIN (SELECT value FROM json_each(?1)) types
260 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
261 )?;
262 let start_blocking: Vec<&str> = deps_config.start_blocking_types();
263 let types_json = serde_json::to_string(&start_blocking)?;
264 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
265 .filter_map(|r| r.ok())
266 .collect()
267 };
268
269 for dep in deps {
270 if !visited.contains(&dep) {
271 queue.push_back(dep);
272 }
273 }
274 }
275
276 Ok(false)
277 })
278 }
279
280 pub fn remove_dependency(
282 &self,
283 from_task_id: &str,
284 to_task_id: &str,
285 dep_type: &str,
286 ) -> Result<bool> {
287 self.with_conn(|conn| {
288 let rows = conn.execute(
289 "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
290 params![from_task_id, to_task_id, dep_type],
291 )?;
292 Ok(rows > 0)
293 })
294 }
295
296 pub fn remove_all_outgoing_dependencies(
299 &self,
300 from_task_id: &str,
301 dep_type: &str,
302 ) -> Result<Vec<Dependency>> {
303 self.with_conn_mut(|conn| {
304 let tx = conn.transaction()?;
305
306 let deps: Vec<Dependency> = {
308 let mut stmt = tx.prepare(
309 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
310 )?;
311 stmt
312 .query_map(params![from_task_id, dep_type], |row| {
313 Ok(Dependency {
314 from_task_id: row.get(0)?,
315 to_task_id: row.get(1)?,
316 dep_type: row.get(2)?,
317 })
318 })?
319 .filter_map(|r| r.ok())
320 .collect()
321 };
322
323 tx.execute(
325 "DELETE FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2",
326 params![from_task_id, dep_type],
327 )?;
328
329 tx.commit()?;
330 Ok(deps)
331 })
332 }
333
334 pub fn remove_all_incoming_dependencies(
337 &self,
338 to_task_id: &str,
339 dep_type: &str,
340 ) -> Result<Vec<Dependency>> {
341 self.with_conn_mut(|conn| {
342 let tx = conn.transaction()?;
343
344 let deps: Vec<Dependency> = {
346 let mut stmt = tx.prepare(
347 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
348 )?;
349 stmt
350 .query_map(params![to_task_id, dep_type], |row| {
351 Ok(Dependency {
352 from_task_id: row.get(0)?,
353 to_task_id: row.get(1)?,
354 dep_type: row.get(2)?,
355 })
356 })?
357 .filter_map(|r| r.ok())
358 .collect()
359 };
360
361 tx.execute(
363 "DELETE FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2",
364 params![to_task_id, dep_type],
365 )?;
366
367 tx.commit()?;
368 Ok(deps)
369 })
370 }
371
372 pub fn get_all_dependencies(&self) -> Result<Vec<Dependency>> {
374 self.with_conn(|conn| {
375 let mut stmt =
376 conn.prepare("SELECT from_task_id, to_task_id, dep_type FROM dependencies")?;
377
378 let deps = stmt
379 .query_map([], |row| {
380 let from: String = row.get(0)?;
381 let to: String = row.get(1)?;
382 let dep_type: String = row.get(2)?;
383 Ok(Dependency {
384 from_task_id: from,
385 to_task_id: to,
386 dep_type,
387 })
388 })?
389 .filter_map(|r| r.ok())
390 .collect();
391
392 Ok(deps)
393 })
394 }
395
396 pub fn get_dependencies_by_type(
398 &self,
399 task_id: &str,
400 dep_type: &str,
401 direction: &str,
402 ) -> Result<Vec<Dependency>> {
403 self.with_conn(|conn| {
404 let sql = if direction == "incoming" {
405 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
406 } else {
407 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
408 };
409
410 let mut stmt = conn.prepare(sql)?;
411
412 let deps = stmt
413 .query_map(params![task_id, dep_type], |row| {
414 let from: String = row.get(0)?;
415 let to: String = row.get(1)?;
416 let dep_type: String = row.get(2)?;
417 Ok(Dependency {
418 from_task_id: from,
419 to_task_id: to,
420 dep_type,
421 })
422 })?
423 .filter_map(|r| r.ok())
424 .collect();
425
426 Ok(deps)
427 })
428 }
429
430 pub fn get_start_blockers(
432 &self,
433 task_id: &str,
434 deps_config: &DependenciesConfig,
435 ) -> Result<Vec<String>> {
436 let start_blocking_types = deps_config.start_blocking_types();
437 if start_blocking_types.is_empty() {
438 return Ok(vec![]);
439 }
440
441 self.with_conn(|conn| {
442 let placeholders: String = start_blocking_types
443 .iter()
444 .enumerate()
445 .map(|(i, _)| format!("?{}", i + 2))
446 .collect::<Vec<_>>()
447 .join(", ");
448
449 let sql = format!(
450 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type IN ({})",
451 placeholders
452 );
453
454 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
455 params_vec.push(Box::new(task_id.to_string()));
456 for t in &start_blocking_types {
457 params_vec.push(Box::new(t.to_string()));
458 }
459 let params_refs: Vec<&dyn rusqlite::ToSql> =
460 params_vec.iter().map(|b| b.as_ref()).collect();
461
462 let mut stmt = conn.prepare(&sql)?;
463 let blockers = stmt
464 .query_map(params_refs.as_slice(), |row| {
465 let id: String = row.get(0)?;
466 Ok(id)
467 })?
468 .filter_map(|r| r.ok())
469 .collect();
470
471 Ok(blockers)
472 })
473 }
474
475 pub fn get_completion_blockers(
478 &self,
479 task_id: &str,
480 deps_config: &DependenciesConfig,
481 ) -> Result<Vec<String>> {
482 let completion_blocking_types = deps_config.completion_blocking_types();
483 if completion_blocking_types.is_empty() {
484 return Ok(vec![]);
485 }
486
487 self.with_conn(|conn| {
488 let placeholders: String = completion_blocking_types
489 .iter()
490 .enumerate()
491 .map(|(i, _)| format!("?{}", i + 2))
492 .collect::<Vec<_>>()
493 .join(", ");
494
495 let sql = format!(
498 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
499 placeholders
500 );
501
502 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
503 params_vec.push(Box::new(task_id.to_string()));
504 for t in &completion_blocking_types {
505 params_vec.push(Box::new(t.to_string()));
506 }
507 let params_refs: Vec<&dyn rusqlite::ToSql> =
508 params_vec.iter().map(|b| b.as_ref()).collect();
509
510 let mut stmt = conn.prepare(&sql)?;
511 let blockers = stmt
512 .query_map(params_refs.as_slice(), |row| {
513 let id: String = row.get(0)?;
514 Ok(id)
515 })?
516 .filter_map(|r| r.ok())
517 .collect();
518
519 Ok(blockers)
520 })
521 }
522
523 pub fn get_parent(&self, task_id: &str) -> Result<Option<String>> {
525 self.with_conn(|conn| {
526 let result: Result<String, rusqlite::Error> = conn.query_row(
527 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
528 params![task_id],
529 |row| row.get(0),
530 );
531
532 match result {
533 Ok(parent_id) => Ok(Some(parent_id)),
534 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
535 Err(e) => Err(e.into()),
536 }
537 })
538 }
539
540 pub fn get_children_ids(&self, task_id: &str) -> Result<Vec<String>> {
542 self.with_conn(|conn| {
543 let mut stmt = conn.prepare(
544 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
545 )?;
546
547 let children = stmt
548 .query_map(params![task_id], |row| {
549 let id: String = row.get(0)?;
550 Ok(id)
551 })?
552 .filter_map(|r| r.ok())
553 .collect();
554
555 Ok(children)
556 })
557 }
558
559 pub fn get_blockers(&self, task_id: &str) -> Result<Vec<String>> {
562 self.with_conn(|conn| {
563 let mut stmt = conn.prepare(
564 "SELECT from_task_id FROM dependencies
565 WHERE to_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
566 )?;
567
568 let blockers = stmt
569 .query_map(params![task_id], |row| {
570 let id: String = row.get(0)?;
571 Ok(id)
572 })?
573 .filter_map(|r| r.ok())
574 .collect();
575
576 Ok(blockers)
577 })
578 }
579
580 #[allow(dead_code)]
582 pub fn get_blocking(&self, task_id: &str) -> Result<Vec<String>> {
583 self.with_conn(|conn| {
584 let mut stmt = conn.prepare(
585 "SELECT to_task_id FROM dependencies
586 WHERE from_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
587 )?;
588
589 let blocking = stmt
590 .query_map(params![task_id], |row| {
591 let id: String = row.get(0)?;
592 Ok(id)
593 })?
594 .filter_map(|r| r.ok())
595 .collect();
596
597 Ok(blocking)
598 })
599 }
600
601 pub fn get_blocked_tasks(
605 &self,
606 states_config: &StatesConfig,
607 deps_config: &DependenciesConfig,
608 sort_by: Option<&str>,
609 sort_order: Option<&str>,
610 ) -> Result<Vec<Task>> {
611 let start_blocking_types = deps_config.start_blocking_types();
612 if start_blocking_types.is_empty() {
613 return Ok(vec![]);
614 }
615
616 self.with_conn(|conn| {
617 let state_placeholders: Vec<String> = states_config
619 .blocking_states
620 .iter()
621 .enumerate()
622 .map(|(i, _)| format!("?{}", i + 2))
623 .collect();
624 let state_clause = state_placeholders.join(", ");
625
626 let type_start = states_config.blocking_states.len() + 2;
628 let type_placeholders: Vec<String> = start_blocking_types
629 .iter()
630 .enumerate()
631 .map(|(i, _)| format!("?{}", type_start + i))
632 .collect();
633 let type_clause = type_placeholders.join(", ");
634
635 let order_clause = build_order_clause(sort_by, sort_order);
637
638 let sql = format!(
639 "SELECT DISTINCT t.*
640 FROM tasks t
641 INNER JOIN dependencies d ON t.id = d.to_task_id
642 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
643 WHERE d.dep_type IN ({})
644 AND blocker.status IN ({})
645 AND t.status = ?1
646 AND t.deleted_at IS NULL
647 ORDER BY {}",
648 type_clause, state_clause, order_clause
649 );
650
651 let mut stmt = conn.prepare(&sql)?;
652
653 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
655 params_vec.push(Box::new(states_config.initial.clone()));
656 for state in &states_config.blocking_states {
657 params_vec.push(Box::new(state.clone()));
658 }
659 for t in &start_blocking_types {
660 params_vec.push(Box::new(t.to_string()));
661 }
662 let params_refs: Vec<&dyn rusqlite::ToSql> =
663 params_vec.iter().map(|b| b.as_ref()).collect();
664
665 let tasks = stmt
666 .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
667 .filter_map(|r| r.ok())
668 .collect();
669
670 Ok(tasks)
671 })
672 }
673
674 pub fn get_ready_tasks(
679 &self,
680 agent_id: Option<&str>,
681 states_config: &StatesConfig,
682 deps_config: &DependenciesConfig,
683 sort_by: Option<&str>,
684 sort_order: Option<&str>,
685 ) -> Result<Vec<Task>> {
686 let start_blocking_types = deps_config.start_blocking_types();
687
688 let agent_tags: Option<Vec<String>> = if let Some(aid) = agent_id {
690 Some(self.get_agent_tags(aid)?)
691 } else {
692 None
693 };
694
695 self.with_conn(|conn| {
696 let state_placeholders: Vec<String> = states_config
698 .blocking_states
699 .iter()
700 .enumerate()
701 .map(|(i, _)| format!("?{}", i + 2))
702 .collect();
703 let state_clause = state_placeholders.join(", ");
704
705 let type_start = states_config.blocking_states.len() + 2;
707 let type_placeholders: Vec<String> = start_blocking_types
708 .iter()
709 .enumerate()
710 .map(|(i, _)| format!("?{}", type_start + i))
711 .collect();
712 let type_clause = type_placeholders.join(", ");
713
714 let order_clause = if sort_by.is_some() {
716 build_order_clause(sort_by, sort_order)
717 } else {
718 "CAST(t.priority AS INTEGER) DESC, t.created_at DESC".to_string()
720 };
721
722 let mut param_idx = type_start + start_blocking_types.len();
724
725 let (agent_needed_clause, agent_wanted_clause) = if let Some(ref tags) = agent_tags {
727 let needed_placeholders: Vec<String> = tags
731 .iter()
732 .enumerate()
733 .map(|(i, _)| format!("?{}", param_idx + i))
734 .collect();
735 param_idx += tags.len();
736
737 let needed_clause = if needed_placeholders.is_empty() {
738 "AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)"
740 .to_string()
741 } else {
742 format!(
744 "AND (
745 NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
746 OR (
747 SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
748 ) = (
749 SELECT COUNT(*) FROM task_needed_tags
750 WHERE task_id = t.id AND tag IN ({})
751 )
752 )",
753 needed_placeholders.join(", ")
754 )
755 };
756
757 let wanted_placeholders: Vec<String> = tags
759 .iter()
760 .enumerate()
761 .map(|(i, _)| format!("?{}", param_idx + i))
762 .collect();
763
764 let wanted_clause = if wanted_placeholders.is_empty() {
765 "AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)"
767 .to_string()
768 } else {
769 format!(
771 "AND (
772 NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
773 OR EXISTS (
774 SELECT 1 FROM task_wanted_tags
775 WHERE task_id = t.id AND tag IN ({})
776 )
777 )",
778 wanted_placeholders.join(", ")
779 )
780 };
781
782 (needed_clause, wanted_clause)
783 } else {
784 (String::new(), String::new())
785 };
786
787 let sql = format!(
788 "SELECT t.*
789 FROM tasks t
790 WHERE t.status = ?1
791 AND t.worker_id IS NULL
792 AND t.deleted_at IS NULL
793 AND NOT EXISTS (
794 SELECT 1 FROM dependencies d
795 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
796 WHERE d.to_task_id = t.id
797 AND d.dep_type IN ({})
798 AND blocker.status IN ({})
799 )
800 {}
801 {}
802 ORDER BY {}",
803 type_clause, state_clause, agent_needed_clause, agent_wanted_clause, order_clause
804 );
805
806 let mut stmt = conn.prepare(&sql)?;
807
808 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
810 params_vec.push(Box::new(states_config.initial.clone()));
811 for state in &states_config.blocking_states {
812 params_vec.push(Box::new(state.clone()));
813 }
814 for t in &start_blocking_types {
815 params_vec.push(Box::new(t.to_string()));
816 }
817 if let Some(ref tags) = agent_tags {
819 for tag in tags {
820 params_vec.push(Box::new(tag.clone()));
821 }
822 for tag in tags {
823 params_vec.push(Box::new(tag.clone()));
824 }
825 }
826 let params_refs: Vec<&dyn rusqlite::ToSql> =
827 params_vec.iter().map(|b| b.as_ref()).collect();
828
829 let tasks: Vec<Task> = stmt
830 .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
831 .filter_map(|r| r.ok())
832 .collect();
833
834 Ok(tasks)
835 })
836 }
837
838 #[allow(dead_code)]
840 pub fn has_unmet_start_dependencies(
841 &self,
842 task_id: &str,
843 states_config: &StatesConfig,
844 deps_config: &DependenciesConfig,
845 ) -> Result<bool> {
846 let start_blocking_types = deps_config.start_blocking_types();
847 if start_blocking_types.is_empty() {
848 return Ok(false);
849 }
850
851 self.with_conn(|conn| {
852 let state_placeholders: Vec<String> = states_config
854 .blocking_states
855 .iter()
856 .enumerate()
857 .map(|(i, _)| format!("?{}", i + 2))
858 .collect();
859 let state_clause = state_placeholders.join(", ");
860
861 let type_start = states_config.blocking_states.len() + 2;
863 let type_placeholders: Vec<String> = start_blocking_types
864 .iter()
865 .enumerate()
866 .map(|(i, _)| format!("?{}", type_start + i))
867 .collect();
868 let type_clause = type_placeholders.join(", ");
869
870 let sql = format!(
871 "SELECT COUNT(*) FROM dependencies d
872 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
873 WHERE d.to_task_id = ?1
874 AND d.dep_type IN ({})
875 AND blocker.status IN ({})",
876 type_clause, state_clause
877 );
878
879 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
880 params_vec.push(Box::new(task_id.to_string()));
881 for state in &states_config.blocking_states {
882 params_vec.push(Box::new(state.clone()));
883 }
884 for t in &start_blocking_types {
885 params_vec.push(Box::new(t.to_string()));
886 }
887 let params_refs: Vec<&dyn rusqlite::ToSql> =
888 params_vec.iter().map(|b| b.as_ref()).collect();
889
890 let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
891
892 Ok(count > 0)
893 })
894 }
895
896 pub fn has_incomplete_children(
898 &self,
899 task_id: &str,
900 states_config: &StatesConfig,
901 ) -> Result<bool> {
902 self.with_conn(|conn| {
903 let state_placeholders: Vec<String> = states_config
905 .blocking_states
906 .iter()
907 .enumerate()
908 .map(|(i, _)| format!("?{}", i + 2))
909 .collect();
910 let state_clause = state_placeholders.join(", ");
911
912 let sql = format!(
913 "SELECT COUNT(*) FROM dependencies d
914 INNER JOIN tasks child ON d.to_task_id = child.id
915 WHERE d.from_task_id = ?1
916 AND d.dep_type = 'contains'
917 AND child.status IN ({})",
918 state_clause
919 );
920
921 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
922 params_vec.push(Box::new(task_id.to_string()));
923 for state in &states_config.blocking_states {
924 params_vec.push(Box::new(state.clone()));
925 }
926 let params_refs: Vec<&dyn rusqlite::ToSql> =
927 params_vec.iter().map(|b| b.as_ref()).collect();
928
929 let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
930
931 Ok(count > 0)
932 })
933 }
934
935 #[allow(clippy::too_many_arguments)]
942 pub fn list_tasks_with_tag_filters(
943 &self,
944 status: Option<Vec<String>>,
945 owner: Option<&str>,
946 parent_id: Option<Option<&str>>,
947 tags_any: Option<Vec<String>>,
948 tags_all: Option<Vec<String>>,
949 qualified_for_agent_tags: Option<Vec<String>>,
950 limit: Option<i32>,
951 sort_by: Option<&str>,
952 sort_order: Option<&str>,
953 ) -> Result<Vec<Task>> {
954 self.with_conn(|conn| {
955 let mut sql = String::from("SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL");
956 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
957 let mut param_idx = 1;
958
959 if let Some(ref statuses) = status {
961 if statuses.len() == 1 {
962 sql.push_str(&format!(" AND t.status = ?{}", param_idx));
963 params_vec.push(Box::new(statuses[0].clone()));
964 param_idx += 1;
965 } else if statuses.len() > 1 {
966 let placeholders: Vec<String> = statuses
967 .iter()
968 .enumerate()
969 .map(|(i, _)| format!("?{}", param_idx + i))
970 .collect();
971 sql.push_str(&format!(" AND t.status IN ({})", placeholders.join(", ")));
972 for s in statuses {
973 params_vec.push(Box::new(s.clone()));
974 }
975 param_idx += statuses.len();
976 }
977 }
978
979 if let Some(o) = owner {
981 sql.push_str(&format!(" AND t.worker_id = ?{}", param_idx));
982 params_vec.push(Box::new(o.to_string()));
983 param_idx += 1;
984 }
985
986 if let Some(p) = parent_id {
988 match p {
989 Some(pid) => {
990 sql.push_str(&format!(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ?{} AND dep_type = 'contains')", param_idx));
991 params_vec.push(Box::new(pid.to_string()));
992 param_idx += 1;
993 }
994 None => {
995 sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
997 }
998 }
999 }
1000
1001 if let Some(ref any_tags) = tags_any
1003 && !any_tags.is_empty() {
1004 let placeholders: Vec<String> = any_tags
1005 .iter()
1006 .enumerate()
1007 .map(|(i, _)| format!("?{}", param_idx + i))
1008 .collect();
1009 sql.push_str(&format!(
1010 " AND EXISTS (SELECT 1 FROM task_tags WHERE task_id = t.id AND tag IN ({}))",
1011 placeholders.join(", ")
1012 ));
1013 for tag in any_tags {
1014 params_vec.push(Box::new(tag.clone()));
1015 }
1016 param_idx += any_tags.len();
1017 }
1018
1019 if let Some(ref all_tags) = tags_all
1021 && !all_tags.is_empty() {
1022 let placeholders: Vec<String> = all_tags
1023 .iter()
1024 .enumerate()
1025 .map(|(i, _)| format!("?{}", param_idx + i))
1026 .collect();
1027 sql.push_str(&format!(
1029 " AND (SELECT COUNT(*) FROM task_tags WHERE task_id = t.id AND tag IN ({})) = {}",
1030 placeholders.join(", "),
1031 all_tags.len()
1032 ));
1033 for tag in all_tags {
1034 params_vec.push(Box::new(tag.clone()));
1035 }
1036 param_idx += all_tags.len();
1037 }
1038
1039 if let Some(ref agent_tags) = qualified_for_agent_tags {
1041 if agent_tags.is_empty() {
1043 sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)");
1045 sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)");
1047 } else {
1048 let needed_placeholders: Vec<String> = agent_tags
1050 .iter()
1051 .enumerate()
1052 .map(|(i, _)| format!("?{}", param_idx + i))
1053 .collect();
1054 sql.push_str(&format!(
1055 " AND (
1056 NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
1057 OR (
1058 SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
1059 ) = (
1060 SELECT COUNT(*) FROM task_needed_tags
1061 WHERE task_id = t.id AND tag IN ({})
1062 )
1063 )",
1064 needed_placeholders.join(", ")
1065 ));
1066 for tag in agent_tags {
1067 params_vec.push(Box::new(tag.clone()));
1068 }
1069 param_idx += agent_tags.len();
1070
1071 let wanted_placeholders: Vec<String> = agent_tags
1073 .iter()
1074 .enumerate()
1075 .map(|(i, _)| format!("?{}", param_idx + i))
1076 .collect();
1077 sql.push_str(&format!(
1078 " AND (
1079 NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
1080 OR EXISTS (
1081 SELECT 1 FROM task_wanted_tags
1082 WHERE task_id = t.id AND tag IN ({})
1083 )
1084 )",
1085 wanted_placeholders.join(", ")
1086 ));
1087 for tag in agent_tags {
1088 params_vec.push(Box::new(tag.clone()));
1089 }
1090 }
1092 }
1093
1094 let order_clause = build_order_clause(sort_by, sort_order);
1096 sql.push_str(&format!(" ORDER BY {}", order_clause));
1097
1098 if let Some(l) = limit {
1100 sql.push_str(&format!(" LIMIT {}", l));
1101 }
1102
1103 let params_refs: Vec<&dyn rusqlite::ToSql> =
1104 params_vec.iter().map(|b| b.as_ref()).collect();
1105
1106 let mut stmt = conn.prepare(&sql)?;
1107 let tasks: Vec<Task> = stmt
1108 .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
1109 .filter_map(|r| r.ok())
1110 .collect();
1111
1112 Ok(tasks)
1113 })
1114 }
1115
1116 pub fn get_agent_tags(&self, agent_id: &str) -> Result<Vec<String>> {
1118 self.with_conn(|conn| {
1119 let result: Result<String, rusqlite::Error> = conn.query_row(
1120 "SELECT tags FROM workers WHERE id = ?1",
1121 params![agent_id],
1122 |row| row.get(0),
1123 );
1124
1125 match result {
1126 Ok(tags_json) => {
1127 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
1128 Ok(tags)
1129 }
1130 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(vec![]),
1131 Err(e) => Err(e.into()),
1132 }
1133 })
1134 }
1135
1136 pub(super) fn add_dependency_internal(
1138 conn: &Connection,
1139 from_task_id: &str,
1140 to_task_id: &str,
1141 dep_type: &str,
1142 ) -> Result<()> {
1143 conn.execute(
1144 "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1145 params![from_task_id, to_task_id, dep_type],
1146 )?;
1147 Ok(())
1148 }
1149
1150 pub fn relink(
1154 &self,
1155 prev_from_ids: &[String],
1156 prev_to_ids: &[String],
1157 from_ids: &[String],
1158 to_ids: &[String],
1159 dep_type: &str,
1160 deps_config: &DependenciesConfig,
1161 ) -> Result<RelinkResult> {
1162 if !deps_config.is_valid_dep_type(dep_type) {
1164 return Err(anyhow!(
1165 "Invalid dependency type '{}'. Valid types: {:?}",
1166 dep_type,
1167 deps_config.dep_type_names()
1168 ));
1169 }
1170
1171 let def = deps_config
1172 .get_definition(dep_type)
1173 .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
1174 let is_vertical = def.display == DependencyDisplay::Vertical;
1175
1176 self.with_conn_mut(|conn| {
1177 let tx = conn.transaction()?;
1178
1179 let mut unlinked = Vec::new();
1180 let mut linked = Vec::new();
1181 let mut errors = Vec::new();
1182
1183 for prev_from in prev_from_ids {
1185 for prev_to in prev_to_ids {
1186 let rows = tx.execute(
1187 "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
1188 params![prev_from, prev_to, dep_type],
1189 )?;
1190 if rows > 0 {
1191 unlinked.push((prev_from.clone(), prev_to.clone()));
1192 }
1193 }
1194 }
1195
1196 for from_id in from_ids {
1198 for to_id in to_ids {
1199 if is_vertical {
1201 let existing_parent: Option<String> = tx.query_row(
1202 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1203 params![to_id],
1204 |row| row.get(0),
1205 ).optional()?;
1206
1207 if let Some(ref parent) = existing_parent
1208 && parent != from_id {
1209 errors.push(format!(
1210 "Task {} already has parent {}",
1211 to_id, parent
1212 ));
1213 continue;
1214 }
1215 }
1216
1217 if would_create_cycle_in_tx(&tx, from_id, to_id, dep_type, deps_config)? {
1219 errors.push(format!(
1220 "Adding dependency {}→{} would create a cycle",
1221 from_id, to_id
1222 ));
1223 continue;
1224 }
1225
1226 tx.execute(
1227 "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1228 params![from_id, to_id, dep_type],
1229 )?;
1230 linked.push((from_id.clone(), to_id.clone()));
1231 }
1232 }
1233
1234 if !errors.is_empty() {
1235 tx.rollback()?;
1237 return Err(anyhow!("Relink failed: {}", errors.join("; ")));
1238 }
1239
1240 tx.commit()?;
1241 Ok(RelinkResult { unlinked, linked })
1242 })
1243 }
1244
1245 pub fn get_predecessors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1252 if depth == 0 {
1253 return Ok(vec![]);
1254 }
1255
1256 self.with_conn(|conn| {
1257 let mut visited: HashSet<String> = HashSet::new();
1258 let mut result: Vec<Task> = Vec::new();
1259 let mut current_level: Vec<String> = vec![task_id.to_string()];
1260 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1261
1262 while !current_level.is_empty() && levels_remaining > 0 {
1263 let mut next_level: Vec<String> = Vec::new();
1264
1265 for tid in ¤t_level {
1266 let mut stmt = conn.prepare(
1268 "SELECT DISTINCT d.from_task_id FROM dependencies d
1269 WHERE d.to_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')",
1270 )?;
1271
1272 let predecessors: Vec<String> = stmt
1273 .query_map(params![tid], |row| row.get(0))?
1274 .filter_map(|r| r.ok())
1275 .collect();
1276
1277 for pred_id in predecessors {
1278 if !visited.contains(&pred_id) {
1279 visited.insert(pred_id.clone());
1280 if let Some(task) = get_task_by_id_internal(conn, &pred_id)? {
1281 result.push(task);
1282 }
1283 next_level.push(pred_id);
1284 }
1285 }
1286 }
1287
1288 current_level = next_level;
1289 levels_remaining -= 1;
1290 }
1291
1292 Ok(result)
1293 })
1294 }
1295
1296 pub fn get_successors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1299 if depth == 0 {
1300 return Ok(vec![]);
1301 }
1302
1303 self.with_conn(|conn| {
1304 let mut visited: HashSet<String> = HashSet::new();
1305 let mut result: Vec<Task> = Vec::new();
1306 let mut current_level: Vec<String> = vec![task_id.to_string()];
1307 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1308
1309 while !current_level.is_empty() && levels_remaining > 0 {
1310 let mut next_level: Vec<String> = Vec::new();
1311
1312 for tid in ¤t_level {
1313 let mut stmt = conn.prepare(
1315 "SELECT DISTINCT d.to_task_id FROM dependencies d
1316 WHERE d.from_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')",
1317 )?;
1318
1319 let successors: Vec<String> = stmt
1320 .query_map(params![tid], |row| row.get(0))?
1321 .filter_map(|r| r.ok())
1322 .collect();
1323
1324 for succ_id in successors {
1325 if !visited.contains(&succ_id) {
1326 visited.insert(succ_id.clone());
1327 if let Some(task) = get_task_by_id_internal(conn, &succ_id)? {
1328 result.push(task);
1329 }
1330 next_level.push(succ_id);
1331 }
1332 }
1333 }
1334
1335 current_level = next_level;
1336 levels_remaining -= 1;
1337 }
1338
1339 Ok(result)
1340 })
1341 }
1342
1343 pub fn get_ancestors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1346 if depth == 0 {
1347 return Ok(vec![]);
1348 }
1349
1350 self.with_conn(|conn| {
1351 let mut result: Vec<Task> = Vec::new();
1352 let mut current_id = task_id.to_string();
1353 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1354
1355 while levels_remaining > 0 {
1356 let parent_result: Result<String, rusqlite::Error> = conn.query_row(
1358 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1359 params![¤t_id],
1360 |row| row.get(0),
1361 );
1362
1363 match parent_result {
1364 Ok(parent_id) => {
1365 if let Some(task) = get_task_by_id_internal(conn, &parent_id)? {
1366 result.push(task);
1367 }
1368 current_id = parent_id;
1369 levels_remaining -= 1;
1370 }
1371 Err(rusqlite::Error::QueryReturnedNoRows) => break,
1372 Err(e) => return Err(e.into()),
1373 }
1374 }
1375
1376 Ok(result)
1377 })
1378 }
1379
1380 pub fn get_descendants(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1383 if depth == 0 {
1384 return Ok(vec![]);
1385 }
1386
1387 self.with_conn(|conn| {
1388 let mut visited: HashSet<String> = HashSet::new();
1389 let mut result: Vec<Task> = Vec::new();
1390 let mut current_level: Vec<String> = vec![task_id.to_string()];
1391 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1392
1393 while !current_level.is_empty() && levels_remaining > 0 {
1394 let mut next_level: Vec<String> = Vec::new();
1395
1396 for tid in ¤t_level {
1397 let mut stmt = conn.prepare(
1399 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
1400 )?;
1401
1402 let children: Vec<String> = stmt
1403 .query_map(params![tid], |row| row.get(0))?
1404 .filter_map(|r| r.ok())
1405 .collect();
1406
1407 for child_id in children {
1408 if !visited.contains(&child_id) {
1409 visited.insert(child_id.clone());
1410 if let Some(task) = get_task_by_id_internal(conn, &child_id)? {
1411 result.push(task);
1412 }
1413 next_level.push(child_id);
1414 }
1415 }
1416 }
1417
1418 current_level = next_level;
1419 levels_remaining -= 1;
1420 }
1421
1422 Ok(result)
1423 })
1424 }
1425}
1426
1427fn get_task_by_id_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
1429 let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1430 let task = stmt
1431 .query_row(params![task_id], super::tasks::parse_task_row)
1432 .optional()?;
1433 Ok(task)
1434}
1435
1436pub(crate) fn get_unsatisfied_start_blockers_in_tx(
1440 conn: &Connection,
1441 task_id: &str,
1442 states_config: &StatesConfig,
1443 deps_config: &DependenciesConfig,
1444) -> Result<Vec<String>> {
1445 let start_blocking_types = deps_config.start_blocking_types();
1446 if start_blocking_types.is_empty() {
1447 return Ok(vec![]);
1448 }
1449
1450 let state_placeholders: Vec<String> = states_config
1452 .blocking_states
1453 .iter()
1454 .enumerate()
1455 .map(|(i, _)| format!("?{}", i + 2))
1456 .collect();
1457 let state_clause = state_placeholders.join(", ");
1458
1459 let type_start = states_config.blocking_states.len() + 2;
1461 let type_placeholders: Vec<String> = start_blocking_types
1462 .iter()
1463 .enumerate()
1464 .map(|(i, _)| format!("?{}", type_start + i))
1465 .collect();
1466 let type_clause = type_placeholders.join(", ");
1467
1468 let sql = format!(
1469 "SELECT blocker.id FROM dependencies d
1470 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1471 WHERE d.to_task_id = ?1
1472 AND d.dep_type IN ({})
1473 AND blocker.status IN ({})",
1474 type_clause, state_clause
1475 );
1476
1477 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1478 params_vec.push(Box::new(task_id.to_string()));
1479 for state in &states_config.blocking_states {
1480 params_vec.push(Box::new(state.clone()));
1481 }
1482 for t in &start_blocking_types {
1483 params_vec.push(Box::new(t.to_string()));
1484 }
1485 let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
1486
1487 let mut stmt = conn.prepare(&sql)?;
1488 let blockers = stmt
1489 .query_map(params_refs.as_slice(), |row| {
1490 let id: String = row.get(0)?;
1491 Ok(id)
1492 })?
1493 .filter_map(|r| r.ok())
1494 .collect();
1495
1496 Ok(blockers)
1497}
1498
1499pub(crate) fn propagate_unblock_effects(
1516 conn: &Connection,
1517 completed_task_id: &str,
1518 agent_id: Option<&str>,
1519 states_config: &StatesConfig,
1520 deps_config: &DependenciesConfig,
1521 auto_advance: &AutoAdvanceConfig,
1522) -> Result<(Vec<String>, Vec<String>)> {
1523 let start_blocking_types = deps_config.start_blocking_types();
1525 if start_blocking_types.is_empty() {
1526 return Ok((vec![], vec![]));
1527 }
1528
1529 let type_placeholders: Vec<String> = start_blocking_types
1531 .iter()
1532 .enumerate()
1533 .map(|(i, _)| format!("?{}", i + 2))
1534 .collect();
1535 let type_clause = type_placeholders.join(", ");
1536
1537 let sql = format!(
1538 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
1539 type_clause
1540 );
1541
1542 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1543 params_vec.push(Box::new(completed_task_id.to_string()));
1544 for t in &start_blocking_types {
1545 params_vec.push(Box::new(t.to_string()));
1546 }
1547 let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
1548
1549 let mut stmt = conn.prepare(&sql)?;
1550 let dependent_task_ids: Vec<String> = stmt
1551 .query_map(params_refs.as_slice(), |row| row.get(0))?
1552 .filter_map(|r| r.ok())
1553 .collect();
1554
1555 let mut unblocked = Vec::new();
1556 let mut auto_advanced = Vec::new();
1557 let now = super::now_ms();
1558
1559 let should_auto_advance = auto_advance.enabled && auto_advance.target_state.is_some();
1561 let target_state = auto_advance.target_state.clone();
1562
1563 if should_auto_advance {
1565 let ts = target_state.as_ref().unwrap();
1566 if !states_config.is_valid_state(ts) {
1567 return Err(anyhow!(
1568 "Auto-advance target state '{}' is not a valid state",
1569 ts
1570 ));
1571 }
1572 }
1573
1574 for task_id in dependent_task_ids {
1575 let task = match get_task_by_id_internal(conn, &task_id)? {
1577 Some(t) => t,
1578 None => continue,
1579 };
1580
1581 if task.status != states_config.initial {
1583 continue;
1584 }
1585
1586 if task.worker_id.is_some() {
1588 continue;
1589 }
1590
1591 let state_placeholders: Vec<String> = states_config
1594 .blocking_states
1595 .iter()
1596 .enumerate()
1597 .map(|(i, _)| format!("?{}", i + 3))
1598 .collect();
1599 let state_clause = state_placeholders.join(", ");
1600
1601 let type_start = states_config.blocking_states.len() + 3;
1603 let type_placeholders2: Vec<String> = start_blocking_types
1604 .iter()
1605 .enumerate()
1606 .map(|(i, _)| format!("?{}", type_start + i))
1607 .collect();
1608 let type_clause2 = type_placeholders2.join(", ");
1609
1610 let blocker_sql = format!(
1611 "SELECT COUNT(*) FROM dependencies d
1612 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1613 WHERE d.to_task_id = ?1
1614 AND d.from_task_id != ?2
1615 AND d.dep_type IN ({})
1616 AND blocker.status IN ({})",
1617 type_clause2, state_clause
1618 );
1619
1620 let mut blocker_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1621 blocker_params.push(Box::new(task_id.clone()));
1622 blocker_params.push(Box::new(completed_task_id.to_string()));
1623 for state in &states_config.blocking_states {
1624 blocker_params.push(Box::new(state.clone()));
1625 }
1626 for t in &start_blocking_types {
1627 blocker_params.push(Box::new(t.to_string()));
1628 }
1629 let blocker_refs: Vec<&dyn rusqlite::ToSql> =
1630 blocker_params.iter().map(|b| b.as_ref()).collect();
1631
1632 let remaining_blockers: i32 =
1633 conn.query_row(&blocker_sql, blocker_refs.as_slice(), |row| row.get(0))?;
1634
1635 if remaining_blockers > 0 {
1636 continue; }
1638
1639 unblocked.push(task_id.clone());
1641
1642 if should_auto_advance {
1644 let ts = target_state.as_ref().unwrap();
1645
1646 if !states_config.is_valid_transition(&states_config.initial, ts) {
1648 continue;
1650 }
1651
1652 conn.execute(
1654 "UPDATE tasks SET status = ?1, updated_at = ?2 WHERE id = ?3",
1655 params![ts, now, &task_id],
1656 )?;
1657
1658 let reason = format!("auto-advanced: blocker '{}' completed", completed_task_id);
1660 super::state_transitions::record_state_transition(
1661 conn,
1662 &task_id,
1663 ts,
1664 agent_id,
1665 Some(&reason),
1666 states_config,
1667 )?;
1668
1669 auto_advanced.push(task_id);
1670 }
1671 }
1672
1673 Ok((unblocked, auto_advanced))
1674}