1use super::Database;
4use crate::config::{AutoAdvanceConfig, DependenciesConfig, DependencyDisplay, StatesConfig};
5use crate::types::{Dependency, Task};
6use anyhow::{Result, anyhow};
7use rusqlite::{Connection, OptionalExtension, params};
8use std::collections::{HashSet, VecDeque};
9
10#[derive(Debug)]
12pub struct RelinkResult {
13 pub unlinked: Vec<(String, String)>,
15 pub linked: Vec<(String, String)>,
17}
18
19fn would_create_cycle_in_tx(
21 tx: &rusqlite::Transaction,
22 from_task_id: &str,
23 to_task_id: &str,
24 dep_type: &str,
25 deps_config: &DependenciesConfig,
26) -> Result<bool> {
27 let def = deps_config
28 .get_definition(dep_type)
29 .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
30
31 let mut visited: HashSet<String> = HashSet::new();
34 let mut queue: VecDeque<String> = VecDeque::new();
35 queue.push_back(to_task_id.to_string());
36
37 while let Some(current) = queue.pop_front() {
38 if current == from_task_id {
39 return Ok(true); }
41
42 if visited.contains(¤t) {
43 continue;
44 }
45 visited.insert(current.clone());
46
47 let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
49 let mut stmt = tx.prepare(
51 "SELECT to_task_id FROM dependencies d
52 JOIN (SELECT value FROM json_each(?1)) types
53 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
54 )?;
55 let vertical_types: Vec<&str> = deps_config.vertical_types();
56 let types_json = serde_json::to_string(&vertical_types)?;
57 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
58 .filter_map(|r| r.ok())
59 .collect()
60 } else {
61 let mut stmt = tx.prepare(
63 "SELECT to_task_id FROM dependencies d
64 JOIN (SELECT value FROM json_each(?1)) types
65 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
66 )?;
67 let start_blocking: Vec<&str> = deps_config.start_blocking_types();
68 let types_json = serde_json::to_string(&start_blocking)?;
69 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
70 .filter_map(|r| r.ok())
71 .collect()
72 };
73
74 for dep in deps {
75 if !visited.contains(&dep) {
76 queue.push_back(dep);
77 }
78 }
79 }
80
81 Ok(false)
82}
83
84fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
87 let field = match sort_by {
88 Some("priority") => "CAST(t.priority AS INTEGER)",
89 Some("created_at") => "t.created_at",
90 Some("updated_at") => "t.updated_at",
91 _ => "t.created_at", };
93
94 let order = match sort_order {
95 Some("asc") => "ASC",
96 Some("desc") => "DESC",
97 _ => {
98 "DESC"
100 }
101 };
102
103 format!("{} {}", field, order)
104}
105
106#[derive(Debug)]
108pub enum AddDependencyResult {
109 Created,
111 AlreadyExists,
113 FromTaskNotFound,
115 ToTaskNotFound,
117}
118
119impl Database {
120 pub fn task_exists(&self, task_id: &str) -> Result<bool> {
122 self.with_conn(|conn| {
123 let count: i64 = conn.query_row(
124 "SELECT COUNT(*) FROM tasks WHERE id = ?1",
125 params![task_id],
126 |row| row.get(0),
127 )?;
128 Ok(count > 0)
129 })
130 }
131
132 pub fn add_dependency(
135 &self,
136 from_task_id: &str,
137 to_task_id: &str,
138 dep_type: &str,
139 deps_config: &DependenciesConfig,
140 ) -> Result<()> {
141 match self.add_dependency_soft(from_task_id, to_task_id, dep_type, deps_config)? {
142 AddDependencyResult::Created | AddDependencyResult::AlreadyExists => Ok(()),
143 AddDependencyResult::FromTaskNotFound => {
144 Err(anyhow!("Source task '{}' not found", from_task_id))
145 }
146 AddDependencyResult::ToTaskNotFound => {
147 Err(anyhow!("Target task '{}' not found", to_task_id))
148 }
149 }
150 }
151
152 pub fn add_dependency_soft(
155 &self,
156 from_task_id: &str,
157 to_task_id: &str,
158 dep_type: &str,
159 deps_config: &DependenciesConfig,
160 ) -> Result<AddDependencyResult> {
161 if !deps_config.is_valid_dep_type(dep_type) {
163 return Err(anyhow!(
164 "Invalid dependency type '{}'. Valid types: {:?}",
165 dep_type,
166 deps_config.dep_type_names()
167 ));
168 }
169
170 if !self.task_exists(from_task_id)? {
172 return Ok(AddDependencyResult::FromTaskNotFound);
173 }
174 if !self.task_exists(to_task_id)? {
175 return Ok(AddDependencyResult::ToTaskNotFound);
176 }
177
178 let def = deps_config
180 .get_definition(dep_type)
181 .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
182 if def.display == DependencyDisplay::Vertical
183 && let Some(existing_parent) = self.get_parent(to_task_id)?
184 && existing_parent != from_task_id
185 {
186 return Err(anyhow!(
187 "Task {} already has parent {}",
188 to_task_id,
189 existing_parent
190 ));
191 }
192
193 if self.would_create_cycle(from_task_id, to_task_id, dep_type, deps_config)? {
195 return Err(anyhow!("Adding this dependency would create a cycle"));
196 }
197
198 self.with_conn(|conn| {
199 let changes = conn.execute(
200 "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
201 params![from_task_id, to_task_id, dep_type],
202 )?;
203 if changes == 0 {
204 Ok(AddDependencyResult::AlreadyExists)
205 } else {
206 Ok(AddDependencyResult::Created)
207 }
208 })
209 }
210
211 pub fn would_create_cycle(
215 &self,
216 from_task_id: &str,
217 to_task_id: &str,
218 dep_type: &str,
219 deps_config: &DependenciesConfig,
220 ) -> Result<bool> {
221 let def = deps_config
222 .get_definition(dep_type)
223 .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
224
225 self.with_conn(|conn| {
226 let mut visited: HashSet<String> = HashSet::new();
229 let mut queue: VecDeque<String> = VecDeque::new();
230 queue.push_back(to_task_id.to_string());
231
232 while let Some(current) = queue.pop_front() {
233 if current == from_task_id {
234 return Ok(true); }
236
237 if visited.contains(¤t) {
238 continue;
239 }
240 visited.insert(current.clone());
241
242 let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
244 let mut stmt = conn.prepare(
246 "SELECT to_task_id FROM dependencies d
247 JOIN (SELECT value FROM json_each(?1)) types
248 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
249 )?;
250 let vertical_types: Vec<&str> = deps_config.vertical_types();
251 let types_json = serde_json::to_string(&vertical_types)?;
252 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
253 .filter_map(|r| r.ok())
254 .collect()
255 } else {
256 let mut stmt = conn.prepare(
258 "SELECT to_task_id FROM dependencies d
259 JOIN (SELECT value FROM json_each(?1)) types
260 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
261 )?;
262 let start_blocking: Vec<&str> = deps_config.start_blocking_types();
263 let types_json = serde_json::to_string(&start_blocking)?;
264 stmt.query_map(params![&types_json, ¤t], |row| row.get(0))?
265 .filter_map(|r| r.ok())
266 .collect()
267 };
268
269 for dep in deps {
270 if !visited.contains(&dep) {
271 queue.push_back(dep);
272 }
273 }
274 }
275
276 Ok(false)
277 })
278 }
279
280 pub fn remove_dependency(
282 &self,
283 from_task_id: &str,
284 to_task_id: &str,
285 dep_type: &str,
286 ) -> Result<bool> {
287 self.with_conn(|conn| {
288 let rows = conn.execute(
289 "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
290 params![from_task_id, to_task_id, dep_type],
291 )?;
292 Ok(rows > 0)
293 })
294 }
295
296 pub fn remove_all_outgoing_dependencies(
299 &self,
300 from_task_id: &str,
301 dep_type: &str,
302 ) -> Result<Vec<Dependency>> {
303 self.with_conn_mut(|conn| {
304 let tx = conn.transaction()?;
305
306 let deps: Vec<Dependency> = {
308 let mut stmt = tx.prepare(
309 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
310 )?;
311 stmt
312 .query_map(params![from_task_id, dep_type], |row| {
313 Ok(Dependency {
314 from_task_id: row.get(0)?,
315 to_task_id: row.get(1)?,
316 dep_type: row.get(2)?,
317 })
318 })?
319 .filter_map(|r| r.ok())
320 .collect()
321 };
322
323 tx.execute(
325 "DELETE FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2",
326 params![from_task_id, dep_type],
327 )?;
328
329 tx.commit()?;
330 Ok(deps)
331 })
332 }
333
334 pub fn remove_all_incoming_dependencies(
337 &self,
338 to_task_id: &str,
339 dep_type: &str,
340 ) -> Result<Vec<Dependency>> {
341 self.with_conn_mut(|conn| {
342 let tx = conn.transaction()?;
343
344 let deps: Vec<Dependency> = {
346 let mut stmt = tx.prepare(
347 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
348 )?;
349 stmt
350 .query_map(params![to_task_id, dep_type], |row| {
351 Ok(Dependency {
352 from_task_id: row.get(0)?,
353 to_task_id: row.get(1)?,
354 dep_type: row.get(2)?,
355 })
356 })?
357 .filter_map(|r| r.ok())
358 .collect()
359 };
360
361 tx.execute(
363 "DELETE FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2",
364 params![to_task_id, dep_type],
365 )?;
366
367 tx.commit()?;
368 Ok(deps)
369 })
370 }
371
372 pub fn get_all_dependencies(&self) -> Result<Vec<Dependency>> {
374 self.with_conn(|conn| {
375 let mut stmt =
376 conn.prepare("SELECT from_task_id, to_task_id, dep_type FROM dependencies")?;
377
378 let deps = stmt
379 .query_map([], |row| {
380 let from: String = row.get(0)?;
381 let to: String = row.get(1)?;
382 let dep_type: String = row.get(2)?;
383 Ok(Dependency {
384 from_task_id: from,
385 to_task_id: to,
386 dep_type,
387 })
388 })?
389 .filter_map(|r| r.ok())
390 .collect();
391
392 Ok(deps)
393 })
394 }
395
396 pub fn get_dependencies_by_type(
398 &self,
399 task_id: &str,
400 dep_type: &str,
401 direction: &str,
402 ) -> Result<Vec<Dependency>> {
403 self.with_conn(|conn| {
404 let sql = if direction == "incoming" {
405 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
406 } else {
407 "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
408 };
409
410 let mut stmt = conn.prepare(sql)?;
411
412 let deps = stmt
413 .query_map(params![task_id, dep_type], |row| {
414 let from: String = row.get(0)?;
415 let to: String = row.get(1)?;
416 let dep_type: String = row.get(2)?;
417 Ok(Dependency {
418 from_task_id: from,
419 to_task_id: to,
420 dep_type,
421 })
422 })?
423 .filter_map(|r| r.ok())
424 .collect();
425
426 Ok(deps)
427 })
428 }
429
430 pub fn get_start_blockers(
432 &self,
433 task_id: &str,
434 deps_config: &DependenciesConfig,
435 ) -> Result<Vec<String>> {
436 let start_blocking_types = deps_config.start_blocking_types();
437 if start_blocking_types.is_empty() {
438 return Ok(vec![]);
439 }
440
441 self.with_conn(|conn| {
442 let placeholders: String = start_blocking_types
443 .iter()
444 .enumerate()
445 .map(|(i, _)| format!("?{}", i + 2))
446 .collect::<Vec<_>>()
447 .join(", ");
448
449 let sql = format!(
450 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type IN ({})",
451 placeholders
452 );
453
454 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
455 params_vec.push(Box::new(task_id.to_string()));
456 for t in &start_blocking_types {
457 params_vec.push(Box::new(t.to_string()));
458 }
459 let params_refs: Vec<&dyn rusqlite::ToSql> =
460 params_vec.iter().map(|b| b.as_ref()).collect();
461
462 let mut stmt = conn.prepare(&sql)?;
463 let blockers = stmt
464 .query_map(params_refs.as_slice(), |row| {
465 let id: String = row.get(0)?;
466 Ok(id)
467 })?
468 .filter_map(|r| r.ok())
469 .collect();
470
471 Ok(blockers)
472 })
473 }
474
475 pub fn get_completion_blockers(
478 &self,
479 task_id: &str,
480 deps_config: &DependenciesConfig,
481 ) -> Result<Vec<String>> {
482 let completion_blocking_types = deps_config.completion_blocking_types();
483 if completion_blocking_types.is_empty() {
484 return Ok(vec![]);
485 }
486
487 self.with_conn(|conn| {
488 let placeholders: String = completion_blocking_types
489 .iter()
490 .enumerate()
491 .map(|(i, _)| format!("?{}", i + 2))
492 .collect::<Vec<_>>()
493 .join(", ");
494
495 let sql = format!(
498 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
499 placeholders
500 );
501
502 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
503 params_vec.push(Box::new(task_id.to_string()));
504 for t in &completion_blocking_types {
505 params_vec.push(Box::new(t.to_string()));
506 }
507 let params_refs: Vec<&dyn rusqlite::ToSql> =
508 params_vec.iter().map(|b| b.as_ref()).collect();
509
510 let mut stmt = conn.prepare(&sql)?;
511 let blockers = stmt
512 .query_map(params_refs.as_slice(), |row| {
513 let id: String = row.get(0)?;
514 Ok(id)
515 })?
516 .filter_map(|r| r.ok())
517 .collect();
518
519 Ok(blockers)
520 })
521 }
522
523 pub fn get_parent(&self, task_id: &str) -> Result<Option<String>> {
525 self.with_conn(|conn| {
526 let result: Result<String, rusqlite::Error> = conn.query_row(
527 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
528 params![task_id],
529 |row| row.get(0),
530 );
531
532 match result {
533 Ok(parent_id) => Ok(Some(parent_id)),
534 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
535 Err(e) => Err(e.into()),
536 }
537 })
538 }
539
540 pub fn get_children_ids(&self, task_id: &str) -> Result<Vec<String>> {
542 self.with_conn(|conn| {
543 let mut stmt = conn.prepare(
544 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
545 )?;
546
547 let children = stmt
548 .query_map(params![task_id], |row| {
549 let id: String = row.get(0)?;
550 Ok(id)
551 })?
552 .filter_map(|r| r.ok())
553 .collect();
554
555 Ok(children)
556 })
557 }
558
559 pub fn get_blockers(&self, task_id: &str) -> Result<Vec<String>> {
562 self.with_conn(|conn| {
563 let mut stmt = conn.prepare(
564 "SELECT from_task_id FROM dependencies
565 WHERE to_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
566 )?;
567
568 let blockers = stmt
569 .query_map(params![task_id], |row| {
570 let id: String = row.get(0)?;
571 Ok(id)
572 })?
573 .filter_map(|r| r.ok())
574 .collect();
575
576 Ok(blockers)
577 })
578 }
579
580 #[allow(dead_code)]
582 pub fn get_blocking(&self, task_id: &str) -> Result<Vec<String>> {
583 self.with_conn(|conn| {
584 let mut stmt = conn.prepare(
585 "SELECT to_task_id FROM dependencies
586 WHERE from_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
587 )?;
588
589 let blocking = stmt
590 .query_map(params![task_id], |row| {
591 let id: String = row.get(0)?;
592 Ok(id)
593 })?
594 .filter_map(|r| r.ok())
595 .collect();
596
597 Ok(blocking)
598 })
599 }
600
601 pub fn get_blocked_tasks(
605 &self,
606 states_config: &StatesConfig,
607 deps_config: &DependenciesConfig,
608 sort_by: Option<&str>,
609 sort_order: Option<&str>,
610 ) -> Result<Vec<Task>> {
611 let start_blocking_types = deps_config.start_blocking_types();
612 if start_blocking_types.is_empty() {
613 return Ok(vec![]);
614 }
615
616 self.with_conn(|conn| {
617 let state_placeholders: Vec<String> = states_config
619 .blocking_states
620 .iter()
621 .enumerate()
622 .map(|(i, _)| format!("?{}", i + 2))
623 .collect();
624 let state_clause = state_placeholders.join(", ");
625
626 let type_start = states_config.blocking_states.len() + 2;
628 let type_placeholders: Vec<String> = start_blocking_types
629 .iter()
630 .enumerate()
631 .map(|(i, _)| format!("?{}", type_start + i))
632 .collect();
633 let type_clause = type_placeholders.join(", ");
634
635 let order_clause = build_order_clause(sort_by, sort_order);
637
638 let sql = format!(
639 "SELECT DISTINCT t.*
640 FROM tasks t
641 INNER JOIN dependencies d ON t.id = d.to_task_id
642 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
643 WHERE d.dep_type IN ({})
644 AND blocker.status IN ({})
645 AND t.status = ?1
646 AND t.deleted_at IS NULL
647 ORDER BY {}",
648 type_clause, state_clause, order_clause
649 );
650
651 let mut stmt = conn.prepare(&sql)?;
652
653 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
655 params_vec.push(Box::new(states_config.initial.clone()));
656 for state in &states_config.blocking_states {
657 params_vec.push(Box::new(state.clone()));
658 }
659 for t in &start_blocking_types {
660 params_vec.push(Box::new(t.to_string()));
661 }
662 let params_refs: Vec<&dyn rusqlite::ToSql> =
663 params_vec.iter().map(|b| b.as_ref()).collect();
664
665 let tasks = stmt
666 .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
667 .filter_map(|r| r.ok())
668 .collect();
669
670 Ok(tasks)
671 })
672 }
673
674 pub fn get_ready_tasks(
679 &self,
680 agent_id: Option<&str>,
681 states_config: &StatesConfig,
682 deps_config: &DependenciesConfig,
683 sort_by: Option<&str>,
684 sort_order: Option<&str>,
685 ) -> Result<Vec<Task>> {
686 let start_blocking_types = deps_config.start_blocking_types();
687
688 let agent_tags: Option<Vec<String>> = if let Some(aid) = agent_id {
690 Some(self.get_agent_tags(aid)?)
691 } else {
692 None
693 };
694
695 self.with_conn(|conn| {
696 let state_placeholders: Vec<String> = states_config
698 .blocking_states
699 .iter()
700 .enumerate()
701 .map(|(i, _)| format!("?{}", i + 2))
702 .collect();
703 let state_clause = state_placeholders.join(", ");
704
705 let type_start = states_config.blocking_states.len() + 2;
707 let type_placeholders: Vec<String> = start_blocking_types
708 .iter()
709 .enumerate()
710 .map(|(i, _)| format!("?{}", type_start + i))
711 .collect();
712 let type_clause = type_placeholders.join(", ");
713
714 let order_clause = if sort_by.is_some() {
716 build_order_clause(sort_by, sort_order)
717 } else {
718 "CAST(t.priority AS INTEGER) DESC, t.created_at DESC".to_string()
720 };
721
722 let mut param_idx = type_start + start_blocking_types.len();
724
725 let (agent_needed_clause, agent_wanted_clause) = if let Some(ref tags) = agent_tags {
727 let needed_placeholders: Vec<String> = tags
731 .iter()
732 .enumerate()
733 .map(|(i, _)| format!("?{}", param_idx + i))
734 .collect();
735 param_idx += tags.len();
736
737 let needed_clause = if needed_placeholders.is_empty() {
738 "AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)"
740 .to_string()
741 } else {
742 format!(
744 "AND (
745 NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
746 OR (
747 SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
748 ) = (
749 SELECT COUNT(*) FROM task_needed_tags
750 WHERE task_id = t.id AND tag IN ({})
751 )
752 )",
753 needed_placeholders.join(", ")
754 )
755 };
756
757 let wanted_placeholders: Vec<String> = tags
759 .iter()
760 .enumerate()
761 .map(|(i, _)| format!("?{}", param_idx + i))
762 .collect();
763
764 let wanted_clause = if wanted_placeholders.is_empty() {
765 "AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)"
767 .to_string()
768 } else {
769 format!(
771 "AND (
772 NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
773 OR EXISTS (
774 SELECT 1 FROM task_wanted_tags
775 WHERE task_id = t.id AND tag IN ({})
776 )
777 )",
778 wanted_placeholders.join(", ")
779 )
780 };
781
782 (needed_clause, wanted_clause)
783 } else {
784 (String::new(), String::new())
785 };
786
787 let sql = format!(
788 "SELECT t.*
789 FROM tasks t
790 WHERE t.status = ?1
791 AND t.worker_id IS NULL
792 AND t.deleted_at IS NULL
793 AND NOT EXISTS (
794 SELECT 1 FROM dependencies d
795 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
796 WHERE d.to_task_id = t.id
797 AND d.dep_type IN ({})
798 AND blocker.status IN ({})
799 )
800 {}
801 {}
802 ORDER BY {}",
803 type_clause, state_clause, agent_needed_clause, agent_wanted_clause, order_clause
804 );
805
806 let mut stmt = conn.prepare(&sql)?;
807
808 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
810 params_vec.push(Box::new(states_config.initial.clone()));
811 for state in &states_config.blocking_states {
812 params_vec.push(Box::new(state.clone()));
813 }
814 for t in &start_blocking_types {
815 params_vec.push(Box::new(t.to_string()));
816 }
817 if let Some(ref tags) = agent_tags {
819 for tag in tags {
820 params_vec.push(Box::new(tag.clone()));
821 }
822 for tag in tags {
823 params_vec.push(Box::new(tag.clone()));
824 }
825 }
826 let params_refs: Vec<&dyn rusqlite::ToSql> =
827 params_vec.iter().map(|b| b.as_ref()).collect();
828
829 let tasks: Vec<Task> = stmt
830 .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
831 .filter_map(|r| r.ok())
832 .collect();
833
834 Ok(tasks)
835 })
836 }
837
838 #[allow(dead_code)]
840 pub fn has_unmet_start_dependencies(
841 &self,
842 task_id: &str,
843 states_config: &StatesConfig,
844 deps_config: &DependenciesConfig,
845 ) -> Result<bool> {
846 let start_blocking_types = deps_config.start_blocking_types();
847 if start_blocking_types.is_empty() {
848 return Ok(false);
849 }
850
851 self.with_conn(|conn| {
852 let state_placeholders: Vec<String> = states_config
854 .blocking_states
855 .iter()
856 .enumerate()
857 .map(|(i, _)| format!("?{}", i + 2))
858 .collect();
859 let state_clause = state_placeholders.join(", ");
860
861 let type_start = states_config.blocking_states.len() + 2;
863 let type_placeholders: Vec<String> = start_blocking_types
864 .iter()
865 .enumerate()
866 .map(|(i, _)| format!("?{}", type_start + i))
867 .collect();
868 let type_clause = type_placeholders.join(", ");
869
870 let sql = format!(
871 "SELECT COUNT(*) FROM dependencies d
872 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
873 WHERE d.to_task_id = ?1
874 AND d.dep_type IN ({})
875 AND blocker.status IN ({})",
876 type_clause, state_clause
877 );
878
879 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
880 params_vec.push(Box::new(task_id.to_string()));
881 for state in &states_config.blocking_states {
882 params_vec.push(Box::new(state.clone()));
883 }
884 for t in &start_blocking_types {
885 params_vec.push(Box::new(t.to_string()));
886 }
887 let params_refs: Vec<&dyn rusqlite::ToSql> =
888 params_vec.iter().map(|b| b.as_ref()).collect();
889
890 let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
891
892 Ok(count > 0)
893 })
894 }
895
896 pub fn has_incomplete_children(
898 &self,
899 task_id: &str,
900 states_config: &StatesConfig,
901 ) -> Result<bool> {
902 self.with_conn(|conn| {
903 let state_placeholders: Vec<String> = states_config
905 .blocking_states
906 .iter()
907 .enumerate()
908 .map(|(i, _)| format!("?{}", i + 2))
909 .collect();
910 let state_clause = state_placeholders.join(", ");
911
912 let sql = format!(
913 "SELECT COUNT(*) FROM dependencies d
914 INNER JOIN tasks child ON d.to_task_id = child.id
915 WHERE d.from_task_id = ?1
916 AND d.dep_type = 'contains'
917 AND child.status IN ({})",
918 state_clause
919 );
920
921 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
922 params_vec.push(Box::new(task_id.to_string()));
923 for state in &states_config.blocking_states {
924 params_vec.push(Box::new(state.clone()));
925 }
926 let params_refs: Vec<&dyn rusqlite::ToSql> =
927 params_vec.iter().map(|b| b.as_ref()).collect();
928
929 let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
930
931 Ok(count > 0)
932 })
933 }
934
935 #[allow(clippy::too_many_arguments)]
942 pub fn list_tasks_with_tag_filters(
943 &self,
944 status: Option<Vec<String>>,
945 owner: Option<&str>,
946 parent_id: Option<Option<&str>>,
947 tags_any: Option<Vec<String>>,
948 tags_all: Option<Vec<String>>,
949 qualified_for_agent_tags: Option<Vec<String>>,
950 limit: Option<i32>,
951 offset: i32,
952 sort_by: Option<&str>,
953 sort_order: Option<&str>,
954 ) -> Result<Vec<Task>> {
955 self.with_conn(|conn| {
956 let mut sql = String::from("SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL");
957 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
958 let mut param_idx = 1;
959
960 if let Some(ref statuses) = status {
962 if statuses.len() == 1 {
963 sql.push_str(&format!(" AND t.status = ?{}", param_idx));
964 params_vec.push(Box::new(statuses[0].clone()));
965 param_idx += 1;
966 } else if statuses.len() > 1 {
967 let placeholders: Vec<String> = statuses
968 .iter()
969 .enumerate()
970 .map(|(i, _)| format!("?{}", param_idx + i))
971 .collect();
972 sql.push_str(&format!(" AND t.status IN ({})", placeholders.join(", ")));
973 for s in statuses {
974 params_vec.push(Box::new(s.clone()));
975 }
976 param_idx += statuses.len();
977 }
978 }
979
980 if let Some(o) = owner {
982 sql.push_str(&format!(" AND t.worker_id = ?{}", param_idx));
983 params_vec.push(Box::new(o.to_string()));
984 param_idx += 1;
985 }
986
987 if let Some(p) = parent_id {
989 match p {
990 Some(pid) => {
991 sql.push_str(&format!(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ?{} AND dep_type = 'contains')", param_idx));
992 params_vec.push(Box::new(pid.to_string()));
993 param_idx += 1;
994 }
995 None => {
996 sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
998 }
999 }
1000 }
1001
1002 if let Some(ref any_tags) = tags_any
1004 && !any_tags.is_empty() {
1005 let placeholders: Vec<String> = any_tags
1006 .iter()
1007 .enumerate()
1008 .map(|(i, _)| format!("?{}", param_idx + i))
1009 .collect();
1010 sql.push_str(&format!(
1011 " AND EXISTS (SELECT 1 FROM task_tags WHERE task_id = t.id AND tag IN ({}))",
1012 placeholders.join(", ")
1013 ));
1014 for tag in any_tags {
1015 params_vec.push(Box::new(tag.clone()));
1016 }
1017 param_idx += any_tags.len();
1018 }
1019
1020 if let Some(ref all_tags) = tags_all
1022 && !all_tags.is_empty() {
1023 let placeholders: Vec<String> = all_tags
1024 .iter()
1025 .enumerate()
1026 .map(|(i, _)| format!("?{}", param_idx + i))
1027 .collect();
1028 sql.push_str(&format!(
1030 " AND (SELECT COUNT(*) FROM task_tags WHERE task_id = t.id AND tag IN ({})) = {}",
1031 placeholders.join(", "),
1032 all_tags.len()
1033 ));
1034 for tag in all_tags {
1035 params_vec.push(Box::new(tag.clone()));
1036 }
1037 param_idx += all_tags.len();
1038 }
1039
1040 if let Some(ref agent_tags) = qualified_for_agent_tags {
1042 if agent_tags.is_empty() {
1044 sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)");
1046 sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)");
1048 } else {
1049 let needed_placeholders: Vec<String> = agent_tags
1051 .iter()
1052 .enumerate()
1053 .map(|(i, _)| format!("?{}", param_idx + i))
1054 .collect();
1055 sql.push_str(&format!(
1056 " AND (
1057 NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
1058 OR (
1059 SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
1060 ) = (
1061 SELECT COUNT(*) FROM task_needed_tags
1062 WHERE task_id = t.id AND tag IN ({})
1063 )
1064 )",
1065 needed_placeholders.join(", ")
1066 ));
1067 for tag in agent_tags {
1068 params_vec.push(Box::new(tag.clone()));
1069 }
1070 param_idx += agent_tags.len();
1071
1072 let wanted_placeholders: Vec<String> = agent_tags
1074 .iter()
1075 .enumerate()
1076 .map(|(i, _)| format!("?{}", param_idx + i))
1077 .collect();
1078 sql.push_str(&format!(
1079 " AND (
1080 NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
1081 OR EXISTS (
1082 SELECT 1 FROM task_wanted_tags
1083 WHERE task_id = t.id AND tag IN ({})
1084 )
1085 )",
1086 wanted_placeholders.join(", ")
1087 ));
1088 for tag in agent_tags {
1089 params_vec.push(Box::new(tag.clone()));
1090 }
1091 }
1093 }
1094
1095 let order_clause = build_order_clause(sort_by, sort_order);
1097 sql.push_str(&format!(" ORDER BY {}", order_clause));
1098
1099 if let Some(l) = limit {
1101 sql.push_str(&format!(" LIMIT {}", l));
1102 }
1103
1104 if offset > 0 {
1105 sql.push_str(&format!(" OFFSET {}", offset));
1106 }
1107
1108 let params_refs: Vec<&dyn rusqlite::ToSql> =
1109 params_vec.iter().map(|b| b.as_ref()).collect();
1110
1111 let mut stmt = conn.prepare(&sql)?;
1112 let tasks: Vec<Task> = stmt
1113 .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
1114 .filter_map(|r| r.ok())
1115 .collect();
1116
1117 Ok(tasks)
1118 })
1119 }
1120
1121 pub fn get_agent_tags(&self, agent_id: &str) -> Result<Vec<String>> {
1123 self.with_conn(|conn| {
1124 let result: Result<String, rusqlite::Error> = conn.query_row(
1125 "SELECT tags FROM workers WHERE id = ?1",
1126 params![agent_id],
1127 |row| row.get(0),
1128 );
1129
1130 match result {
1131 Ok(tags_json) => {
1132 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
1133 Ok(tags)
1134 }
1135 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(vec![]),
1136 Err(e) => Err(e.into()),
1137 }
1138 })
1139 }
1140
1141 pub(super) fn add_dependency_internal(
1143 conn: &Connection,
1144 from_task_id: &str,
1145 to_task_id: &str,
1146 dep_type: &str,
1147 ) -> Result<()> {
1148 conn.execute(
1149 "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1150 params![from_task_id, to_task_id, dep_type],
1151 )?;
1152 Ok(())
1153 }
1154
1155 pub fn relink(
1159 &self,
1160 prev_from_ids: &[String],
1161 prev_to_ids: &[String],
1162 from_ids: &[String],
1163 to_ids: &[String],
1164 dep_type: &str,
1165 deps_config: &DependenciesConfig,
1166 ) -> Result<RelinkResult> {
1167 if !deps_config.is_valid_dep_type(dep_type) {
1169 return Err(anyhow!(
1170 "Invalid dependency type '{}'. Valid types: {:?}",
1171 dep_type,
1172 deps_config.dep_type_names()
1173 ));
1174 }
1175
1176 let def = deps_config
1177 .get_definition(dep_type)
1178 .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
1179 let is_vertical = def.display == DependencyDisplay::Vertical;
1180
1181 self.with_conn_mut(|conn| {
1182 let tx = conn.transaction()?;
1183
1184 let mut unlinked = Vec::new();
1185 let mut linked = Vec::new();
1186 let mut errors = Vec::new();
1187
1188 for prev_from in prev_from_ids {
1190 for prev_to in prev_to_ids {
1191 let rows = tx.execute(
1192 "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
1193 params![prev_from, prev_to, dep_type],
1194 )?;
1195 if rows > 0 {
1196 unlinked.push((prev_from.clone(), prev_to.clone()));
1197 }
1198 }
1199 }
1200
1201 for from_id in from_ids {
1203 for to_id in to_ids {
1204 if is_vertical {
1206 let existing_parent: Option<String> = tx.query_row(
1207 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1208 params![to_id],
1209 |row| row.get(0),
1210 ).optional()?;
1211
1212 if let Some(ref parent) = existing_parent
1213 && parent != from_id {
1214 errors.push(format!(
1215 "Task {} already has parent {}",
1216 to_id, parent
1217 ));
1218 continue;
1219 }
1220 }
1221
1222 if would_create_cycle_in_tx(&tx, from_id, to_id, dep_type, deps_config)? {
1224 errors.push(format!(
1225 "Adding dependency {}→{} would create a cycle",
1226 from_id, to_id
1227 ));
1228 continue;
1229 }
1230
1231 tx.execute(
1232 "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1233 params![from_id, to_id, dep_type],
1234 )?;
1235 linked.push((from_id.clone(), to_id.clone()));
1236 }
1237 }
1238
1239 if !errors.is_empty() {
1240 tx.rollback()?;
1242 return Err(anyhow!("Relink failed: {}", errors.join("; ")));
1243 }
1244
1245 tx.commit()?;
1246 Ok(RelinkResult { unlinked, linked })
1247 })
1248 }
1249
1250 pub fn get_predecessors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1257 if depth == 0 {
1258 return Ok(vec![]);
1259 }
1260
1261 self.with_conn(|conn| {
1262 let mut visited: HashSet<String> = HashSet::new();
1263 let mut result: Vec<Task> = Vec::new();
1264 let mut current_level: Vec<String> = vec![task_id.to_string()];
1265 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1266
1267 while !current_level.is_empty() && levels_remaining > 0 {
1268 let mut next_level: Vec<String> = Vec::new();
1269
1270 for tid in ¤t_level {
1271 let mut stmt = conn.prepare(
1273 "SELECT DISTINCT d.from_task_id FROM dependencies d
1274 WHERE d.to_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')",
1275 )?;
1276
1277 let predecessors: Vec<String> = stmt
1278 .query_map(params![tid], |row| row.get(0))?
1279 .filter_map(|r| r.ok())
1280 .collect();
1281
1282 for pred_id in predecessors {
1283 if !visited.contains(&pred_id) {
1284 visited.insert(pred_id.clone());
1285 if let Some(task) = get_task_by_id_internal(conn, &pred_id)? {
1286 result.push(task);
1287 }
1288 next_level.push(pred_id);
1289 }
1290 }
1291 }
1292
1293 current_level = next_level;
1294 levels_remaining -= 1;
1295 }
1296
1297 Ok(result)
1298 })
1299 }
1300
1301 pub fn get_successors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1304 if depth == 0 {
1305 return Ok(vec![]);
1306 }
1307
1308 self.with_conn(|conn| {
1309 let mut visited: HashSet<String> = HashSet::new();
1310 let mut result: Vec<Task> = Vec::new();
1311 let mut current_level: Vec<String> = vec![task_id.to_string()];
1312 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1313
1314 while !current_level.is_empty() && levels_remaining > 0 {
1315 let mut next_level: Vec<String> = Vec::new();
1316
1317 for tid in ¤t_level {
1318 let mut stmt = conn.prepare(
1320 "SELECT DISTINCT d.to_task_id FROM dependencies d
1321 WHERE d.from_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')",
1322 )?;
1323
1324 let successors: Vec<String> = stmt
1325 .query_map(params![tid], |row| row.get(0))?
1326 .filter_map(|r| r.ok())
1327 .collect();
1328
1329 for succ_id in successors {
1330 if !visited.contains(&succ_id) {
1331 visited.insert(succ_id.clone());
1332 if let Some(task) = get_task_by_id_internal(conn, &succ_id)? {
1333 result.push(task);
1334 }
1335 next_level.push(succ_id);
1336 }
1337 }
1338 }
1339
1340 current_level = next_level;
1341 levels_remaining -= 1;
1342 }
1343
1344 Ok(result)
1345 })
1346 }
1347
1348 pub fn get_ancestors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1351 if depth == 0 {
1352 return Ok(vec![]);
1353 }
1354
1355 self.with_conn(|conn| {
1356 let mut result: Vec<Task> = Vec::new();
1357 let mut current_id = task_id.to_string();
1358 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1359
1360 while levels_remaining > 0 {
1361 let parent_result: Result<String, rusqlite::Error> = conn.query_row(
1363 "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1364 params![¤t_id],
1365 |row| row.get(0),
1366 );
1367
1368 match parent_result {
1369 Ok(parent_id) => {
1370 if let Some(task) = get_task_by_id_internal(conn, &parent_id)? {
1371 result.push(task);
1372 }
1373 current_id = parent_id;
1374 levels_remaining -= 1;
1375 }
1376 Err(rusqlite::Error::QueryReturnedNoRows) => break,
1377 Err(e) => return Err(e.into()),
1378 }
1379 }
1380
1381 Ok(result)
1382 })
1383 }
1384
1385 pub fn get_descendants(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1388 if depth == 0 {
1389 return Ok(vec![]);
1390 }
1391
1392 self.with_conn(|conn| {
1393 let mut visited: HashSet<String> = HashSet::new();
1394 let mut result: Vec<Task> = Vec::new();
1395 let mut current_level: Vec<String> = vec![task_id.to_string()];
1396 let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1397
1398 while !current_level.is_empty() && levels_remaining > 0 {
1399 let mut next_level: Vec<String> = Vec::new();
1400
1401 for tid in ¤t_level {
1402 let mut stmt = conn.prepare(
1404 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
1405 )?;
1406
1407 let children: Vec<String> = stmt
1408 .query_map(params![tid], |row| row.get(0))?
1409 .filter_map(|r| r.ok())
1410 .collect();
1411
1412 for child_id in children {
1413 if !visited.contains(&child_id) {
1414 visited.insert(child_id.clone());
1415 if let Some(task) = get_task_by_id_internal(conn, &child_id)? {
1416 result.push(task);
1417 }
1418 next_level.push(child_id);
1419 }
1420 }
1421 }
1422
1423 current_level = next_level;
1424 levels_remaining -= 1;
1425 }
1426
1427 Ok(result)
1428 })
1429 }
1430}
1431
1432fn get_task_by_id_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
1434 let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1435 let task = stmt
1436 .query_row(params![task_id], super::tasks::parse_task_row)
1437 .optional()?;
1438 Ok(task)
1439}
1440
1441pub(crate) fn get_unsatisfied_start_blockers_in_tx(
1445 conn: &Connection,
1446 task_id: &str,
1447 states_config: &StatesConfig,
1448 deps_config: &DependenciesConfig,
1449) -> Result<Vec<String>> {
1450 let start_blocking_types = deps_config.start_blocking_types();
1451 if start_blocking_types.is_empty() {
1452 return Ok(vec![]);
1453 }
1454
1455 let state_placeholders: Vec<String> = states_config
1457 .blocking_states
1458 .iter()
1459 .enumerate()
1460 .map(|(i, _)| format!("?{}", i + 2))
1461 .collect();
1462 let state_clause = state_placeholders.join(", ");
1463
1464 let type_start = states_config.blocking_states.len() + 2;
1466 let type_placeholders: Vec<String> = start_blocking_types
1467 .iter()
1468 .enumerate()
1469 .map(|(i, _)| format!("?{}", type_start + i))
1470 .collect();
1471 let type_clause = type_placeholders.join(", ");
1472
1473 let sql = format!(
1474 "SELECT blocker.id FROM dependencies d
1475 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1476 WHERE d.to_task_id = ?1
1477 AND d.dep_type IN ({})
1478 AND blocker.status IN ({})",
1479 type_clause, state_clause
1480 );
1481
1482 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1483 params_vec.push(Box::new(task_id.to_string()));
1484 for state in &states_config.blocking_states {
1485 params_vec.push(Box::new(state.clone()));
1486 }
1487 for t in &start_blocking_types {
1488 params_vec.push(Box::new(t.to_string()));
1489 }
1490 let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
1491
1492 let mut stmt = conn.prepare(&sql)?;
1493 let blockers = stmt
1494 .query_map(params_refs.as_slice(), |row| {
1495 let id: String = row.get(0)?;
1496 Ok(id)
1497 })?
1498 .filter_map(|r| r.ok())
1499 .collect();
1500
1501 Ok(blockers)
1502}
1503
1504pub(crate) fn propagate_unblock_effects(
1521 conn: &Connection,
1522 completed_task_id: &str,
1523 agent_id: Option<&str>,
1524 states_config: &StatesConfig,
1525 deps_config: &DependenciesConfig,
1526 auto_advance: &AutoAdvanceConfig,
1527) -> Result<(Vec<String>, Vec<String>)> {
1528 let start_blocking_types = deps_config.start_blocking_types();
1530 if start_blocking_types.is_empty() {
1531 return Ok((vec![], vec![]));
1532 }
1533
1534 let type_placeholders: Vec<String> = start_blocking_types
1536 .iter()
1537 .enumerate()
1538 .map(|(i, _)| format!("?{}", i + 2))
1539 .collect();
1540 let type_clause = type_placeholders.join(", ");
1541
1542 let sql = format!(
1543 "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
1544 type_clause
1545 );
1546
1547 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1548 params_vec.push(Box::new(completed_task_id.to_string()));
1549 for t in &start_blocking_types {
1550 params_vec.push(Box::new(t.to_string()));
1551 }
1552 let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
1553
1554 let mut stmt = conn.prepare(&sql)?;
1555 let dependent_task_ids: Vec<String> = stmt
1556 .query_map(params_refs.as_slice(), |row| row.get(0))?
1557 .filter_map(|r| r.ok())
1558 .collect();
1559
1560 let mut unblocked = Vec::new();
1561 let mut auto_advanced = Vec::new();
1562 let now = super::now_ms();
1563
1564 let should_auto_advance = auto_advance.enabled && auto_advance.target_state.is_some();
1566 let target_state = auto_advance.target_state.clone();
1567
1568 if should_auto_advance {
1570 let ts = target_state.as_ref().unwrap();
1571 if !states_config.is_valid_state(ts) {
1572 return Err(anyhow!(
1573 "Auto-advance target state '{}' is not a valid state",
1574 ts
1575 ));
1576 }
1577 }
1578
1579 for task_id in dependent_task_ids {
1580 let task = match get_task_by_id_internal(conn, &task_id)? {
1582 Some(t) => t,
1583 None => continue,
1584 };
1585
1586 if task.status != states_config.initial {
1588 continue;
1589 }
1590
1591 if task.worker_id.is_some() {
1593 continue;
1594 }
1595
1596 let state_placeholders: Vec<String> = states_config
1599 .blocking_states
1600 .iter()
1601 .enumerate()
1602 .map(|(i, _)| format!("?{}", i + 3))
1603 .collect();
1604 let state_clause = state_placeholders.join(", ");
1605
1606 let type_start = states_config.blocking_states.len() + 3;
1608 let type_placeholders2: Vec<String> = start_blocking_types
1609 .iter()
1610 .enumerate()
1611 .map(|(i, _)| format!("?{}", type_start + i))
1612 .collect();
1613 let type_clause2 = type_placeholders2.join(", ");
1614
1615 let blocker_sql = format!(
1616 "SELECT COUNT(*) FROM dependencies d
1617 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1618 WHERE d.to_task_id = ?1
1619 AND d.from_task_id != ?2
1620 AND d.dep_type IN ({})
1621 AND blocker.status IN ({})",
1622 type_clause2, state_clause
1623 );
1624
1625 let mut blocker_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1626 blocker_params.push(Box::new(task_id.clone()));
1627 blocker_params.push(Box::new(completed_task_id.to_string()));
1628 for state in &states_config.blocking_states {
1629 blocker_params.push(Box::new(state.clone()));
1630 }
1631 for t in &start_blocking_types {
1632 blocker_params.push(Box::new(t.to_string()));
1633 }
1634 let blocker_refs: Vec<&dyn rusqlite::ToSql> =
1635 blocker_params.iter().map(|b| b.as_ref()).collect();
1636
1637 let remaining_blockers: i32 =
1638 conn.query_row(&blocker_sql, blocker_refs.as_slice(), |row| row.get(0))?;
1639
1640 if remaining_blockers > 0 {
1641 continue; }
1643
1644 unblocked.push(task_id.clone());
1646
1647 if should_auto_advance {
1649 let ts = target_state.as_ref().unwrap();
1650
1651 if !states_config.is_valid_transition(&states_config.initial, ts) {
1653 continue;
1655 }
1656
1657 conn.execute(
1659 "UPDATE tasks SET status = ?1, updated_at = ?2 WHERE id = ?3",
1660 params![ts, now, &task_id],
1661 )?;
1662
1663 let reason = format!("auto-advanced: blocker '{}' completed", completed_task_id);
1665 super::state_transitions::record_state_transition(
1666 conn,
1667 &task_id,
1668 ts,
1669 agent_id,
1670 Some(&reason),
1671 states_config,
1672 )?;
1673
1674 auto_advanced.push(task_id);
1675 }
1676 }
1677
1678 Ok((unblocked, auto_advanced))
1679}