Skip to main content

task_graph_mcp/db/
deps.rs

1//! Dependency operations and cycle detection with typed dependencies.
2
3use super::Database;
4use crate::config::{AutoAdvanceConfig, DependenciesConfig, DependencyDisplay, StatesConfig};
5use crate::types::{Dependency, Task};
6use anyhow::{Result, anyhow};
7use rusqlite::{Connection, OptionalExtension, params};
8use std::collections::{HashSet, VecDeque};
9
/// Result of a relink operation.
///
/// Both lists hold `(from, to)` pairs. The type derives `Clone`, `Default`,
/// `PartialEq` and `Eq` so callers can build an empty result, copy it, and
/// compare results directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RelinkResult {
    /// Dependencies that were unlinked (from, to).
    pub unlinked: Vec<(String, String)>,
    /// Dependencies that were linked (from, to).
    pub linked: Vec<(String, String)>,
}
18
19/// Check if adding a dependency would create a cycle (transaction-safe version).
20fn would_create_cycle_in_tx(
21    tx: &rusqlite::Transaction,
22    from_task_id: &str,
23    to_task_id: &str,
24    dep_type: &str,
25    deps_config: &DependenciesConfig,
26) -> Result<bool> {
27    let def = deps_config.get_definition(dep_type).unwrap();
28
29    // A cycle would occur if to_task can already reach from_task
30    // through the same "graph" (horizontal or vertical)
31    let mut visited: HashSet<String> = HashSet::new();
32    let mut queue: VecDeque<String> = VecDeque::new();
33    queue.push_back(to_task_id.to_string());
34
35    while let Some(current) = queue.pop_front() {
36        if current == from_task_id {
37            return Ok(true); // Would create a cycle
38        }
39
40        if visited.contains(&current) {
41            continue;
42        }
43        visited.insert(current.clone());
44
45        // Get all tasks that current points to (in the relevant graph)
46        let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
47            // For vertical deps, only check containment relationships
48            let mut stmt = tx.prepare(
49                "SELECT to_task_id FROM dependencies d
50                 JOIN (SELECT value FROM json_each(?1)) types
51                 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
52            )?;
53            let vertical_types: Vec<&str> = deps_config.vertical_types();
54            let types_json = serde_json::to_string(&vertical_types)?;
55            stmt.query_map(params![&types_json, &current], |row| row.get(0))?
56                .filter_map(|r| r.ok())
57                .collect()
58        } else {
59            // For horizontal deps, check all start-blocking relationships
60            let mut stmt = tx.prepare(
61                "SELECT to_task_id FROM dependencies d
62                 JOIN (SELECT value FROM json_each(?1)) types
63                 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
64            )?;
65            let start_blocking: Vec<&str> = deps_config.start_blocking_types();
66            let types_json = serde_json::to_string(&start_blocking)?;
67            stmt.query_map(params![&types_json, &current], |row| row.get(0))?
68                .filter_map(|r| r.ok())
69                .collect()
70        };
71
72        for dep in deps {
73            if !visited.contains(&dep) {
74                queue.push_back(dep);
75            }
76        }
77    }
78
79    Ok(false)
80}
81
/// Build an ORDER BY clause from sort_by and sort_order parameters.
///
/// Returns a safe SQL ORDER BY expression: the output is always assembled from
/// a fixed set of string literals, never from the caller's text.
///
/// * `sort_by` - `"priority"`, `"created_at"` or `"updated_at"`; anything else
///   (including `None`) sorts by `t.created_at`.
/// * `sort_order` - `"asc"` or `"desc"`; anything else (including `None`)
///   sorts descending.
///
/// Both values are matched case-insensitively and with surrounding whitespace
/// ignored, so `"ASC"` no longer silently falls back to `DESC`.
fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
    fn normalize(raw: &str) -> String {
        raw.trim().to_ascii_lowercase()
    }

    let sort_by = sort_by.map(normalize);
    let sort_order = sort_order.map(normalize);

    let field = match sort_by.as_deref() {
        Some("priority") => "CAST(t.priority AS INTEGER)",
        Some("updated_at") => "t.updated_at",
        // "created_at" and every unrecognised value share the default column.
        _ => "t.created_at",
    };

    let order = match sort_order.as_deref() {
        Some("asc") => "ASC",
        // Default: priority is descending (higher number = more important),
        // dates are descending.
        _ => "DESC",
    };

    format!("{} {}", field, order)
}
103
104impl Database {
105    /// Add a typed dependency (from blocks/contains to).
106    pub fn add_dependency(
107        &self,
108        from_task_id: &str,
109        to_task_id: &str,
110        dep_type: &str,
111        deps_config: &DependenciesConfig,
112    ) -> Result<()> {
113        // Validate dependency type
114        if !deps_config.is_valid_dep_type(dep_type) {
115            return Err(anyhow!(
116                "Invalid dependency type '{}'. Valid types: {:?}",
117                dep_type,
118                deps_config.dep_type_names()
119            ));
120        }
121
122        // For vertical (contains) dependencies, check single-parent constraint
123        let def = deps_config.get_definition(dep_type).unwrap();
124        if def.display == DependencyDisplay::Vertical
125            && let Some(existing_parent) = self.get_parent(to_task_id)?
126                && existing_parent != from_task_id {
127                    return Err(anyhow!(
128                        "Task {} already has parent {}",
129                        to_task_id,
130                        existing_parent
131                    ));
132                }
133
134        // Check for cycle in the appropriate graph
135        if self.would_create_cycle(from_task_id, to_task_id, dep_type, deps_config)? {
136            return Err(anyhow!("Adding this dependency would create a cycle"));
137        }
138
139        self.with_conn(|conn| {
140            conn.execute(
141                "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
142                params![from_task_id, to_task_id, dep_type],
143            )?;
144            Ok(())
145        })
146    }
147
148    /// Check if adding a dependency would create a cycle.
149    /// For horizontal deps: check cycle in the start-blocking graph.
150    /// For vertical deps: check containment cycle.
151    pub fn would_create_cycle(
152        &self,
153        from_task_id: &str,
154        to_task_id: &str,
155        dep_type: &str,
156        deps_config: &DependenciesConfig,
157    ) -> Result<bool> {
158        let def = deps_config.get_definition(dep_type).unwrap();
159
160        self.with_conn(|conn| {
161            // A cycle would occur if to_task can already reach from_task
162            // through the same "graph" (horizontal or vertical)
163            let mut visited: HashSet<String> = HashSet::new();
164            let mut queue: VecDeque<String> = VecDeque::new();
165            queue.push_back(to_task_id.to_string());
166
167            while let Some(current) = queue.pop_front() {
168                if current == from_task_id {
169                    return Ok(true); // Would create a cycle
170                }
171
172                if visited.contains(&current) {
173                    continue;
174                }
175                visited.insert(current.clone());
176
177                // Get all tasks that current points to (in the relevant graph)
178                let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
179                    // For vertical deps, only check containment relationships
180                    let mut stmt = conn.prepare(
181                        "SELECT to_task_id FROM dependencies d
182                         JOIN (SELECT value FROM json_each(?1)) types
183                         WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
184                    )?;
185                    let vertical_types: Vec<&str> = deps_config.vertical_types();
186                    let types_json = serde_json::to_string(&vertical_types)?;
187                    stmt.query_map(params![&types_json, &current], |row| row.get(0))?
188                        .filter_map(|r| r.ok())
189                        .collect()
190                } else {
191                    // For horizontal deps, check all start-blocking relationships
192                    let mut stmt = conn.prepare(
193                        "SELECT to_task_id FROM dependencies d
194                         JOIN (SELECT value FROM json_each(?1)) types
195                         WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
196                    )?;
197                    let start_blocking: Vec<&str> = deps_config.start_blocking_types();
198                    let types_json = serde_json::to_string(&start_blocking)?;
199                    stmt.query_map(params![&types_json, &current], |row| row.get(0))?
200                        .filter_map(|r| r.ok())
201                        .collect()
202                };
203
204                for dep in deps {
205                    if !visited.contains(&dep) {
206                        queue.push_back(dep);
207                    }
208                }
209            }
210
211            Ok(false)
212        })
213    }
214
215    /// Remove a typed dependency. Returns true if a row was deleted.
216    pub fn remove_dependency(
217        &self,
218        from_task_id: &str,
219        to_task_id: &str,
220        dep_type: &str,
221    ) -> Result<bool> {
222        self.with_conn(|conn| {
223            let rows = conn.execute(
224                "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
225                params![from_task_id, to_task_id, dep_type],
226            )?;
227            Ok(rows > 0)
228        })
229    }
230
231    /// Remove all dependencies of a given type from a task (outgoing edges).
232    /// Returns the list of removed dependencies.
233    pub fn remove_all_outgoing_dependencies(
234        &self,
235        from_task_id: &str,
236        dep_type: &str,
237    ) -> Result<Vec<Dependency>> {
238        self.with_conn_mut(|conn| {
239            let tx = conn.transaction()?;
240
241            // First get the dependencies that will be removed
242            let deps: Vec<Dependency> = {
243                let mut stmt = tx.prepare(
244                    "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
245                )?;
246                stmt
247                    .query_map(params![from_task_id, dep_type], |row| {
248                        Ok(Dependency {
249                            from_task_id: row.get(0)?,
250                            to_task_id: row.get(1)?,
251                            dep_type: row.get(2)?,
252                        })
253                    })?
254                    .filter_map(|r| r.ok())
255                    .collect()
256            };
257
258            // Then delete them
259            tx.execute(
260                "DELETE FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2",
261                params![from_task_id, dep_type],
262            )?;
263
264            tx.commit()?;
265            Ok(deps)
266        })
267    }
268
269    /// Remove all dependencies of a given type to a task (incoming edges).
270    /// Returns the list of removed dependencies.
271    pub fn remove_all_incoming_dependencies(
272        &self,
273        to_task_id: &str,
274        dep_type: &str,
275    ) -> Result<Vec<Dependency>> {
276        self.with_conn_mut(|conn| {
277            let tx = conn.transaction()?;
278
279            // First get the dependencies that will be removed
280            let deps: Vec<Dependency> = {
281                let mut stmt = tx.prepare(
282                    "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
283                )?;
284                stmt
285                    .query_map(params![to_task_id, dep_type], |row| {
286                        Ok(Dependency {
287                            from_task_id: row.get(0)?,
288                            to_task_id: row.get(1)?,
289                            dep_type: row.get(2)?,
290                        })
291                    })?
292                    .filter_map(|r| r.ok())
293                    .collect()
294            };
295
296            // Then delete them
297            tx.execute(
298                "DELETE FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2",
299                params![to_task_id, dep_type],
300            )?;
301
302            tx.commit()?;
303            Ok(deps)
304        })
305    }
306
307    /// Get all dependencies.
308    pub fn get_all_dependencies(&self) -> Result<Vec<Dependency>> {
309        self.with_conn(|conn| {
310            let mut stmt =
311                conn.prepare("SELECT from_task_id, to_task_id, dep_type FROM dependencies")?;
312
313            let deps = stmt
314                .query_map([], |row| {
315                    let from: String = row.get(0)?;
316                    let to: String = row.get(1)?;
317                    let dep_type: String = row.get(2)?;
318                    Ok(Dependency {
319                        from_task_id: from,
320                        to_task_id: to,
321                        dep_type,
322                    })
323                })?
324                .filter_map(|r| r.ok())
325                .collect();
326
327            Ok(deps)
328        })
329    }
330
331    /// Get dependencies of a specific type for a task.
332    pub fn get_dependencies_by_type(
333        &self,
334        task_id: &str,
335        dep_type: &str,
336        direction: &str,
337    ) -> Result<Vec<Dependency>> {
338        self.with_conn(|conn| {
339            let sql = if direction == "incoming" {
340                "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
341            } else {
342                "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
343            };
344
345            let mut stmt = conn.prepare(sql)?;
346
347            let deps = stmt
348                .query_map(params![task_id, dep_type], |row| {
349                    let from: String = row.get(0)?;
350                    let to: String = row.get(1)?;
351                    let dep_type: String = row.get(2)?;
352                    Ok(Dependency {
353                        from_task_id: from,
354                        to_task_id: to,
355                        dep_type,
356                    })
357                })?
358                .filter_map(|r| r.ok())
359                .collect();
360
361            Ok(deps)
362        })
363    }
364
365    /// Get tasks that block a given task from starting (dep_type with blocks: start).
366    pub fn get_start_blockers(
367        &self,
368        task_id: &str,
369        deps_config: &DependenciesConfig,
370    ) -> Result<Vec<String>> {
371        let start_blocking_types = deps_config.start_blocking_types();
372        if start_blocking_types.is_empty() {
373            return Ok(vec![]);
374        }
375
376        self.with_conn(|conn| {
377            let placeholders: String = start_blocking_types
378                .iter()
379                .enumerate()
380                .map(|(i, _)| format!("?{}", i + 2))
381                .collect::<Vec<_>>()
382                .join(", ");
383
384            let sql = format!(
385                "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type IN ({})",
386                placeholders
387            );
388
389            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
390            params_vec.push(Box::new(task_id.to_string()));
391            for t in &start_blocking_types {
392                params_vec.push(Box::new(t.to_string()));
393            }
394            let params_refs: Vec<&dyn rusqlite::ToSql> =
395                params_vec.iter().map(|b| b.as_ref()).collect();
396
397            let mut stmt = conn.prepare(&sql)?;
398            let blockers = stmt
399                .query_map(params_refs.as_slice(), |row| {
400                    let id: String = row.get(0)?;
401                    Ok(id)
402                })?
403                .filter_map(|r| r.ok())
404                .collect();
405
406            Ok(blockers)
407        })
408    }
409
410    /// Get tasks that block a given task from completing (dep_type with blocks: completion).
411    /// For a parent task, this returns children that must complete first.
412    pub fn get_completion_blockers(
413        &self,
414        task_id: &str,
415        deps_config: &DependenciesConfig,
416    ) -> Result<Vec<String>> {
417        let completion_blocking_types = deps_config.completion_blocking_types();
418        if completion_blocking_types.is_empty() {
419            return Ok(vec![]);
420        }
421
422        self.with_conn(|conn| {
423            let placeholders: String = completion_blocking_types
424                .iter()
425                .enumerate()
426                .map(|(i, _)| format!("?{}", i + 2))
427                .collect::<Vec<_>>()
428                .join(", ");
429
430            // For completion blockers, we look at outgoing edges (from_task_id = parent)
431            // because "contains" means parent -> child, and child blocks parent completion
432            let sql = format!(
433                "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
434                placeholders
435            );
436
437            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
438            params_vec.push(Box::new(task_id.to_string()));
439            for t in &completion_blocking_types {
440                params_vec.push(Box::new(t.to_string()));
441            }
442            let params_refs: Vec<&dyn rusqlite::ToSql> =
443                params_vec.iter().map(|b| b.as_ref()).collect();
444
445            let mut stmt = conn.prepare(&sql)?;
446            let blockers = stmt
447                .query_map(params_refs.as_slice(), |row| {
448                    let id: String = row.get(0)?;
449                    Ok(id)
450                })?
451                .filter_map(|r| r.ok())
452                .collect();
453
454            Ok(blockers)
455        })
456    }
457
458    /// Get the parent of a task (via 'contains' dependency).
459    pub fn get_parent(&self, task_id: &str) -> Result<Option<String>> {
460        self.with_conn(|conn| {
461            let result: Result<String, rusqlite::Error> = conn.query_row(
462                "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
463                params![task_id],
464                |row| row.get(0),
465            );
466
467            match result {
468                Ok(parent_id) => Ok(Some(parent_id)),
469                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
470                Err(e) => Err(e.into()),
471            }
472        })
473    }
474
475    /// Get children of a task (via 'contains' dependency).
476    pub fn get_children_ids(&self, task_id: &str) -> Result<Vec<String>> {
477        self.with_conn(|conn| {
478            let mut stmt = conn.prepare(
479                "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
480            )?;
481
482            let children = stmt
483                .query_map(params![task_id], |row| {
484                    let id: String = row.get(0)?;
485                    Ok(id)
486                })?
487                .filter_map(|r| r.ok())
488                .collect();
489
490            Ok(children)
491        })
492    }
493
494    /// Get all tasks that block a given task (backwards compatible).
495    /// Returns tasks from both 'blocks' and 'follows' dependencies.
496    pub fn get_blockers(&self, task_id: &str) -> Result<Vec<String>> {
497        self.with_conn(|conn| {
498            let mut stmt = conn.prepare(
499                "SELECT from_task_id FROM dependencies 
500                 WHERE to_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
501            )?;
502
503            let blockers = stmt
504                .query_map(params![task_id], |row| {
505                    let id: String = row.get(0)?;
506                    Ok(id)
507                })?
508                .filter_map(|r| r.ok())
509                .collect();
510
511            Ok(blockers)
512        })
513    }
514
515    /// Get tasks that a given task blocks.
516    #[allow(dead_code)]
517    pub fn get_blocking(&self, task_id: &str) -> Result<Vec<String>> {
518        self.with_conn(|conn| {
519            let mut stmt = conn.prepare(
520                "SELECT to_task_id FROM dependencies 
521                 WHERE from_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
522            )?;
523
524            let blocking = stmt
525                .query_map(params![task_id], |row| {
526                    let id: String = row.get(0)?;
527                    Ok(id)
528                })?
529                .filter_map(|r| r.ok())
530                .collect();
531
532            Ok(blocking)
533        })
534    }
535
536    /// Get tasks that are blocked by incomplete start dependencies.
537    /// A task is blocked if any of its start-blocking dependencies are in a blocking state.
538    /// Excludes soft-deleted tasks.
539    pub fn get_blocked_tasks(
540        &self,
541        states_config: &StatesConfig,
542        deps_config: &DependenciesConfig,
543        sort_by: Option<&str>,
544        sort_order: Option<&str>,
545    ) -> Result<Vec<Task>> {
546        let start_blocking_types = deps_config.start_blocking_types();
547        if start_blocking_types.is_empty() {
548            return Ok(vec![]);
549        }
550
551        self.with_conn(|conn| {
552            // Build IN clause from blocking_states
553            let state_placeholders: Vec<String> = states_config
554                .blocking_states
555                .iter()
556                .enumerate()
557                .map(|(i, _)| format!("?{}", i + 2))
558                .collect();
559            let state_clause = state_placeholders.join(", ");
560
561            // Build IN clause from start_blocking_types
562            let type_start = states_config.blocking_states.len() + 2;
563            let type_placeholders: Vec<String> = start_blocking_types
564                .iter()
565                .enumerate()
566                .map(|(i, _)| format!("?{}", type_start + i))
567                .collect();
568            let type_clause = type_placeholders.join(", ");
569
570            // Build ORDER BY clause
571            let order_clause = build_order_clause(sort_by, sort_order);
572
573            let sql = format!(
574                "SELECT DISTINCT t.*
575                 FROM tasks t
576                 INNER JOIN dependencies d ON t.id = d.to_task_id
577                 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
578                 WHERE d.dep_type IN ({})
579                 AND blocker.status IN ({})
580                 AND t.status = ?1
581                 AND t.deleted_at IS NULL
582                 ORDER BY {}",
583                type_clause, state_clause, order_clause
584            );
585
586            let mut stmt = conn.prepare(&sql)?;
587
588            // Build params: initial state + blocking states + start_blocking_types
589            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
590            params_vec.push(Box::new(states_config.initial.clone()));
591            for state in &states_config.blocking_states {
592                params_vec.push(Box::new(state.clone()));
593            }
594            for t in &start_blocking_types {
595                params_vec.push(Box::new(t.to_string()));
596            }
597            let params_refs: Vec<&dyn rusqlite::ToSql> =
598                params_vec.iter().map(|b| b.as_ref()).collect();
599
600            let tasks = stmt
601                .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
602                .filter_map(|r| r.ok())
603                .collect();
604
605            Ok(tasks)
606        })
607    }
608
609    /// Get tasks that are ready to be claimed (all start dependencies satisfied).
610    /// A task is ready if it's in the initial state, unclaimed, and all start-blocking deps are not blocking.
611    /// When agent_id is provided, also filters by agent's tag qualifications using junction tables.
612    /// Excludes soft-deleted tasks.
613    pub fn get_ready_tasks(
614        &self,
615        agent_id: Option<&str>,
616        states_config: &StatesConfig,
617        deps_config: &DependenciesConfig,
618        sort_by: Option<&str>,
619        sort_order: Option<&str>,
620    ) -> Result<Vec<Task>> {
621        let start_blocking_types = deps_config.start_blocking_types();
622
623        // Get agent tags if agent_id is provided (for junction table filtering)
624        let agent_tags: Option<Vec<String>> = if let Some(aid) = agent_id {
625            Some(self.get_agent_tags(aid)?)
626        } else {
627            None
628        };
629
630        self.with_conn(|conn| {
631            // Build IN clause from blocking_states
632            let state_placeholders: Vec<String> = states_config
633                .blocking_states
634                .iter()
635                .enumerate()
636                .map(|(i, _)| format!("?{}", i + 2))
637                .collect();
638            let state_clause = state_placeholders.join(", ");
639
640            // Build IN clause from start_blocking_types
641            let type_start = states_config.blocking_states.len() + 2;
642            let type_placeholders: Vec<String> = start_blocking_types
643                .iter()
644                .enumerate()
645                .map(|(i, _)| format!("?{}", type_start + i))
646                .collect();
647            let type_clause = type_placeholders.join(", ");
648
649            // Build ORDER BY clause - for ready tasks, default is priority then created_at
650            let order_clause = if sort_by.is_some() {
651                build_order_clause(sort_by, sort_order)
652            } else {
653                // Default for ready: priority (high first), then created_at
654                "CAST(t.priority AS INTEGER) DESC, t.created_at DESC".to_string()
655            };
656
657            // Track param index for agent tag filters
658            let mut param_idx = type_start + start_blocking_types.len();
659
660            // Build agent qualification filters using junction tables
661            let (agent_needed_clause, agent_wanted_clause) = if let Some(ref tags) = agent_tags {
662                // For agent_tags_all (AND): agent must have ALL needed tags
663                // Count how many of the task's needed_tags match agent's tags
664                // Either the task has no needed_tags, or all must match
665                let needed_placeholders: Vec<String> = tags
666                    .iter()
667                    .enumerate()
668                    .map(|(i, _)| format!("?{}", param_idx + i))
669                    .collect();
670                param_idx += tags.len();
671
672                let needed_clause = if needed_placeholders.is_empty() {
673                    // Agent has no tags - only match tasks with no needed_tags
674                    "AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)"
675                        .to_string()
676                } else {
677                    // Task must have no needed_tags OR agent must have all of them
678                    format!(
679                        "AND (
680                            NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
681                            OR (
682                                SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
683                            ) = (
684                                SELECT COUNT(*) FROM task_needed_tags 
685                                WHERE task_id = t.id AND tag IN ({})
686                            )
687                        )",
688                        needed_placeholders.join(", ")
689                    )
690                };
691
692                // For agent_tags_any (OR): agent must have at least ONE wanted tag
693                let wanted_placeholders: Vec<String> = tags
694                    .iter()
695                    .enumerate()
696                    .map(|(i, _)| format!("?{}", param_idx + i))
697                    .collect();
698
699                let wanted_clause = if wanted_placeholders.is_empty() {
700                    // Agent has no tags - only match tasks with no wanted_tags
701                    "AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)"
702                        .to_string()
703                } else {
704                    // Task must have no wanted_tags OR agent must have at least one
705                    format!(
706                        "AND (
707                            NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
708                            OR EXISTS (
709                                SELECT 1 FROM task_wanted_tags 
710                                WHERE task_id = t.id AND tag IN ({})
711                            )
712                        )",
713                        wanted_placeholders.join(", ")
714                    )
715                };
716
717                (needed_clause, wanted_clause)
718            } else {
719                (String::new(), String::new())
720            };
721
722            let sql = format!(
723                "SELECT t.*
724                 FROM tasks t
725                 WHERE t.status = ?1
726                 AND t.worker_id IS NULL
727                 AND t.deleted_at IS NULL
728                 AND NOT EXISTS (
729                     SELECT 1 FROM dependencies d
730                     INNER JOIN tasks blocker ON d.from_task_id = blocker.id
731                     WHERE d.to_task_id = t.id 
732                     AND d.dep_type IN ({})
733                     AND blocker.status IN ({})
734                 )
735                 {}
736                 {}
737                 ORDER BY {}",
738                type_clause, state_clause, agent_needed_clause, agent_wanted_clause, order_clause
739            );
740
741            let mut stmt = conn.prepare(&sql)?;
742
743            // Build params: initial state + blocking states + types + agent tags (twice if present)
744            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
745            params_vec.push(Box::new(states_config.initial.clone()));
746            for state in &states_config.blocking_states {
747                params_vec.push(Box::new(state.clone()));
748            }
749            for t in &start_blocking_types {
750                params_vec.push(Box::new(t.to_string()));
751            }
752            // Add agent tags twice (once for needed_tags check, once for wanted_tags check)
753            if let Some(ref tags) = agent_tags {
754                for tag in tags {
755                    params_vec.push(Box::new(tag.clone()));
756                }
757                for tag in tags {
758                    params_vec.push(Box::new(tag.clone()));
759                }
760            }
761            let params_refs: Vec<&dyn rusqlite::ToSql> =
762                params_vec.iter().map(|b| b.as_ref()).collect();
763
764            let tasks: Vec<Task> = stmt
765                .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
766                .filter_map(|r| r.ok())
767                .collect();
768
769            Ok(tasks)
770        })
771    }
772
773    /// Check if a task has unmet start dependencies.
774    #[allow(dead_code)]
775    pub fn has_unmet_start_dependencies(
776        &self,
777        task_id: &str,
778        states_config: &StatesConfig,
779        deps_config: &DependenciesConfig,
780    ) -> Result<bool> {
781        let start_blocking_types = deps_config.start_blocking_types();
782        if start_blocking_types.is_empty() {
783            return Ok(false);
784        }
785
786        self.with_conn(|conn| {
787            // Build IN clause from blocking_states
788            let state_placeholders: Vec<String> = states_config
789                .blocking_states
790                .iter()
791                .enumerate()
792                .map(|(i, _)| format!("?{}", i + 2))
793                .collect();
794            let state_clause = state_placeholders.join(", ");
795
796            // Build IN clause from types
797            let type_start = states_config.blocking_states.len() + 2;
798            let type_placeholders: Vec<String> = start_blocking_types
799                .iter()
800                .enumerate()
801                .map(|(i, _)| format!("?{}", type_start + i))
802                .collect();
803            let type_clause = type_placeholders.join(", ");
804
805            let sql = format!(
806                "SELECT COUNT(*) FROM dependencies d
807                 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
808                 WHERE d.to_task_id = ?1 
809                 AND d.dep_type IN ({})
810                 AND blocker.status IN ({})",
811                type_clause, state_clause
812            );
813
814            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
815            params_vec.push(Box::new(task_id.to_string()));
816            for state in &states_config.blocking_states {
817                params_vec.push(Box::new(state.clone()));
818            }
819            for t in &start_blocking_types {
820                params_vec.push(Box::new(t.to_string()));
821            }
822            let params_refs: Vec<&dyn rusqlite::ToSql> =
823                params_vec.iter().map(|b| b.as_ref()).collect();
824
825            let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
826
827            Ok(count > 0)
828        })
829    }
830
831    /// Check if a task has incomplete children (blocking completion).
832    pub fn has_incomplete_children(
833        &self,
834        task_id: &str,
835        states_config: &StatesConfig,
836    ) -> Result<bool> {
837        self.with_conn(|conn| {
838            // Build IN clause from blocking_states
839            let state_placeholders: Vec<String> = states_config
840                .blocking_states
841                .iter()
842                .enumerate()
843                .map(|(i, _)| format!("?{}", i + 2))
844                .collect();
845            let state_clause = state_placeholders.join(", ");
846
847            let sql = format!(
848                "SELECT COUNT(*) FROM dependencies d
849                 INNER JOIN tasks child ON d.to_task_id = child.id
850                 WHERE d.from_task_id = ?1 
851                 AND d.dep_type = 'contains'
852                 AND child.status IN ({})",
853                state_clause
854            );
855
856            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
857            params_vec.push(Box::new(task_id.to_string()));
858            for state in &states_config.blocking_states {
859                params_vec.push(Box::new(state.clone()));
860            }
861            let params_refs: Vec<&dyn rusqlite::ToSql> =
862                params_vec.iter().map(|b| b.as_ref()).collect();
863
864            let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
865
866            Ok(count > 0)
867        })
868    }
869
870    /// Get tasks with tag-based filtering using junction tables for indexed lookups.
871    /// - `tags_any`: Task must have at least one of these tags (OR)
872    /// - `tags_all`: Task must have all of these tags (AND)
873    /// - `qualified_for_agent_tags`: If provided, only return tasks where these tags satisfy the task's agent_tags_all/agent_tags_any
874    ///
875    /// Excludes soft-deleted tasks.
876    #[allow(clippy::too_many_arguments)]
877    pub fn list_tasks_with_tag_filters(
878        &self,
879        status: Option<Vec<String>>,
880        owner: Option<&str>,
881        parent_id: Option<Option<&str>>,
882        tags_any: Option<Vec<String>>,
883        tags_all: Option<Vec<String>>,
884        qualified_for_agent_tags: Option<Vec<String>>,
885        limit: Option<i32>,
886        sort_by: Option<&str>,
887        sort_order: Option<&str>,
888    ) -> Result<Vec<Task>> {
889        self.with_conn(|conn| {
890            let mut sql = String::from("SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL");
891            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
892            let mut param_idx = 1;
893
894            // Status filter (can be single or multiple)
895            if let Some(ref statuses) = status {
896                if statuses.len() == 1 {
897                    sql.push_str(&format!(" AND t.status = ?{}", param_idx));
898                    params_vec.push(Box::new(statuses[0].clone()));
899                    param_idx += 1;
900                } else if statuses.len() > 1 {
901                    let placeholders: Vec<String> = statuses
902                        .iter()
903                        .enumerate()
904                        .map(|(i, _)| format!("?{}", param_idx + i))
905                        .collect();
906                    sql.push_str(&format!(" AND t.status IN ({})", placeholders.join(", ")));
907                    for s in statuses {
908                        params_vec.push(Box::new(s.clone()));
909                    }
910                    param_idx += statuses.len();
911                }
912            }
913
914            // Owner filter
915            if let Some(o) = owner {
916                sql.push_str(&format!(" AND t.worker_id = ?{}", param_idx));
917                params_vec.push(Box::new(o.to_string()));
918                param_idx += 1;
919            }
920
921            // Parent filter via dependencies table
922            if let Some(p) = parent_id {
923                match p {
924                    Some(pid) => {
925                        sql.push_str(&format!(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ?{} AND dep_type = 'contains')", param_idx));
926                        params_vec.push(Box::new(pid.to_string()));
927                        param_idx += 1;
928                    }
929                    None => {
930                        // Root tasks: not contained by any other task
931                        sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
932                    }
933                }
934            }
935
936            // tags_any: Task must have at least one of these tags (OR) - uses task_tags junction
937            if let Some(ref any_tags) = tags_any
938                && !any_tags.is_empty() {
939                    let placeholders: Vec<String> = any_tags
940                        .iter()
941                        .enumerate()
942                        .map(|(i, _)| format!("?{}", param_idx + i))
943                        .collect();
944                    sql.push_str(&format!(
945                        " AND EXISTS (SELECT 1 FROM task_tags WHERE task_id = t.id AND tag IN ({}))",
946                        placeholders.join(", ")
947                    ));
948                    for tag in any_tags {
949                        params_vec.push(Box::new(tag.clone()));
950                    }
951                    param_idx += any_tags.len();
952                }
953
954            // tags_all: Task must have all of these tags (AND) - uses task_tags junction
955            if let Some(ref all_tags) = tags_all
956                && !all_tags.is_empty() {
957                    let placeholders: Vec<String> = all_tags
958                        .iter()
959                        .enumerate()
960                        .map(|(i, _)| format!("?{}", param_idx + i))
961                        .collect();
962                    // Count matching tags must equal total required tags
963                    sql.push_str(&format!(
964                        " AND (SELECT COUNT(*) FROM task_tags WHERE task_id = t.id AND tag IN ({})) = {}",
965                        placeholders.join(", "),
966                        all_tags.len()
967                    ));
968                    for tag in all_tags {
969                        params_vec.push(Box::new(tag.clone()));
970                    }
971                    param_idx += all_tags.len();
972                }
973
974            // qualified_for: Agent's tags must satisfy task's requirements - uses junction tables
975            if let Some(ref agent_tags) = qualified_for_agent_tags {
976                // Agent must have ALL of task's agent_tags_all
977                if agent_tags.is_empty() {
978                    // Agent has no tags - only match tasks with no needed_tags
979                    sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)");
980                    // And no wanted_tags
981                    sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)");
982                } else {
983                    // For needed_tags (AND): task must have no needed_tags OR agent has all
984                    let needed_placeholders: Vec<String> = agent_tags
985                        .iter()
986                        .enumerate()
987                        .map(|(i, _)| format!("?{}", param_idx + i))
988                        .collect();
989                    sql.push_str(&format!(
990                        " AND (
991                            NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
992                            OR (
993                                SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
994                            ) = (
995                                SELECT COUNT(*) FROM task_needed_tags 
996                                WHERE task_id = t.id AND tag IN ({})
997                            )
998                        )",
999                        needed_placeholders.join(", ")
1000                    ));
1001                    for tag in agent_tags {
1002                        params_vec.push(Box::new(tag.clone()));
1003                    }
1004                    param_idx += agent_tags.len();
1005
1006                    // For wanted_tags (OR): task must have no wanted_tags OR agent has at least one
1007                    let wanted_placeholders: Vec<String> = agent_tags
1008                        .iter()
1009                        .enumerate()
1010                        .map(|(i, _)| format!("?{}", param_idx + i))
1011                        .collect();
1012                    sql.push_str(&format!(
1013                        " AND (
1014                            NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
1015                            OR EXISTS (
1016                                SELECT 1 FROM task_wanted_tags 
1017                                WHERE task_id = t.id AND tag IN ({})
1018                            )
1019                        )",
1020                        wanted_placeholders.join(", ")
1021                    ));
1022                    for tag in agent_tags {
1023                        params_vec.push(Box::new(tag.clone()));
1024                    }
1025                    // param_idx += agent_tags.len(); // not needed, last param set
1026                }
1027            }
1028
1029            // Build ORDER BY clause
1030            let order_clause = build_order_clause(sort_by, sort_order);
1031            sql.push_str(&format!(" ORDER BY {}", order_clause));
1032
1033            // Apply limit in SQL
1034            if let Some(l) = limit {
1035                sql.push_str(&format!(" LIMIT {}", l));
1036            }
1037
1038            let params_refs: Vec<&dyn rusqlite::ToSql> =
1039                params_vec.iter().map(|b| b.as_ref()).collect();
1040
1041            let mut stmt = conn.prepare(&sql)?;
1042            let tasks: Vec<Task> = stmt
1043                .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
1044                .filter_map(|r| r.ok())
1045                .collect();
1046
1047            Ok(tasks)
1048        })
1049    }
1050
1051    /// Get agent tags by agent ID.
1052    pub fn get_agent_tags(&self, agent_id: &str) -> Result<Vec<String>> {
1053        self.with_conn(|conn| {
1054            let result: Result<String, rusqlite::Error> = conn.query_row(
1055                "SELECT tags FROM workers WHERE id = ?1",
1056                params![agent_id],
1057                |row| row.get(0),
1058            );
1059
1060            match result {
1061                Ok(tags_json) => {
1062                    let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
1063                    Ok(tags)
1064                }
1065                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(vec![]),
1066                Err(e) => Err(e.into()),
1067            }
1068        })
1069    }
1070
1071    /// Internal helper: add dependency within a transaction (used by tasks.rs).
1072    pub(super) fn add_dependency_internal(
1073        conn: &Connection,
1074        from_task_id: &str,
1075        to_task_id: &str,
1076        dep_type: &str,
1077    ) -> Result<()> {
1078        conn.execute(
1079            "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1080            params![from_task_id, to_task_id, dep_type],
1081        )?;
1082        Ok(())
1083    }
1084
1085    /// Atomically relink dependencies: unlink all prev_from→prev_to, then link all from→to.
1086    /// This is a transaction-safe operation for moving children between parents.
1087    /// Returns a result with unlinked and linked pairs.
1088    pub fn relink(
1089        &self,
1090        prev_from_ids: &[String],
1091        prev_to_ids: &[String],
1092        from_ids: &[String],
1093        to_ids: &[String],
1094        dep_type: &str,
1095        deps_config: &DependenciesConfig,
1096    ) -> Result<RelinkResult> {
1097        // Validate dependency type upfront
1098        if !deps_config.is_valid_dep_type(dep_type) {
1099            return Err(anyhow!(
1100                "Invalid dependency type '{}'. Valid types: {:?}",
1101                dep_type,
1102                deps_config.dep_type_names()
1103            ));
1104        }
1105
1106        let def = deps_config.get_definition(dep_type).unwrap();
1107        let is_vertical = def.display == DependencyDisplay::Vertical;
1108
1109        self.with_conn_mut(|conn| {
1110            let tx = conn.transaction()?;
1111
1112            let mut unlinked = Vec::new();
1113            let mut linked = Vec::new();
1114            let mut errors = Vec::new();
1115
1116            // Phase 1: Unlink all prev_from × prev_to
1117            for prev_from in prev_from_ids {
1118                for prev_to in prev_to_ids {
1119                    let rows = tx.execute(
1120                        "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
1121                        params![prev_from, prev_to, dep_type],
1122                    )?;
1123                    if rows > 0 {
1124                        unlinked.push((prev_from.clone(), prev_to.clone()));
1125                    }
1126                }
1127            }
1128
1129            // Phase 2: Link all from × to (with validation)
1130            for from_id in from_ids {
1131                for to_id in to_ids {
1132                    // For vertical deps, check single-parent constraint
1133                    if is_vertical {
1134                        let existing_parent: Option<String> = tx.query_row(
1135                            "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1136                            params![to_id],
1137                            |row| row.get(0),
1138                        ).optional()?;
1139
1140                        if let Some(ref parent) = existing_parent
1141                            && parent != from_id {
1142                                errors.push(format!(
1143                                    "Task {} already has parent {}",
1144                                    to_id, parent
1145                                ));
1146                                continue;
1147                            }
1148                    }
1149
1150                    // Check for cycles using temporary view within transaction
1151                    if would_create_cycle_in_tx(&tx, from_id, to_id, dep_type, deps_config)? {
1152                        errors.push(format!(
1153                            "Adding dependency {}→{} would create a cycle",
1154                            from_id, to_id
1155                        ));
1156                        continue;
1157                    }
1158
1159                    tx.execute(
1160                        "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1161                        params![from_id, to_id, dep_type],
1162                    )?;
1163                    linked.push((from_id.clone(), to_id.clone()));
1164                }
1165            }
1166
1167            if !errors.is_empty() {
1168                // Rollback on validation errors
1169                tx.rollback()?;
1170                return Err(anyhow!("Relink failed: {}", errors.join("; ")));
1171            }
1172
1173            tx.commit()?;
1174            Ok(RelinkResult { unlinked, linked })
1175        })
1176    }
1177
1178    // ============================================================================
1179    // Graph Traversal Methods for scan tool
1180    // ============================================================================
1181
1182    /// Get predecessors (tasks that block this task) via blocks/follows dependencies.
1183    /// depth: 0 = none, N = N levels, -1 = all
1184    pub fn get_predecessors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1185        if depth == 0 {
1186            return Ok(vec![]);
1187        }
1188
1189        self.with_conn(|conn| {
1190            let mut visited: HashSet<String> = HashSet::new();
1191            let mut result: Vec<Task> = Vec::new();
1192            let mut current_level: Vec<String> = vec![task_id.to_string()];
1193            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1194
1195            while !current_level.is_empty() && levels_remaining > 0 {
1196                let mut next_level: Vec<String> = Vec::new();
1197
1198                for tid in &current_level {
1199                    // Get tasks that block this one (from_task_id blocks to_task_id)
1200                    let mut stmt = conn.prepare(
1201                        "SELECT DISTINCT d.from_task_id FROM dependencies d
1202                         WHERE d.to_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')",
1203                    )?;
1204
1205                    let predecessors: Vec<String> = stmt
1206                        .query_map(params![tid], |row| row.get(0))?
1207                        .filter_map(|r| r.ok())
1208                        .collect();
1209
1210                    for pred_id in predecessors {
1211                        if !visited.contains(&pred_id) {
1212                            visited.insert(pred_id.clone());
1213                            if let Some(task) = get_task_by_id_internal(conn, &pred_id)? {
1214                                result.push(task);
1215                            }
1216                            next_level.push(pred_id);
1217                        }
1218                    }
1219                }
1220
1221                current_level = next_level;
1222                levels_remaining -= 1;
1223            }
1224
1225            Ok(result)
1226        })
1227    }
1228
1229    /// Get successors (tasks that this task blocks) via blocks/follows dependencies.
1230    /// depth: 0 = none, N = N levels, -1 = all
1231    pub fn get_successors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1232        if depth == 0 {
1233            return Ok(vec![]);
1234        }
1235
1236        self.with_conn(|conn| {
1237            let mut visited: HashSet<String> = HashSet::new();
1238            let mut result: Vec<Task> = Vec::new();
1239            let mut current_level: Vec<String> = vec![task_id.to_string()];
1240            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1241
1242            while !current_level.is_empty() && levels_remaining > 0 {
1243                let mut next_level: Vec<String> = Vec::new();
1244
1245                for tid in &current_level {
1246                    // Get tasks that this one blocks (from_task_id blocks to_task_id)
1247                    let mut stmt = conn.prepare(
1248                        "SELECT DISTINCT d.to_task_id FROM dependencies d
1249                         WHERE d.from_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')",
1250                    )?;
1251
1252                    let successors: Vec<String> = stmt
1253                        .query_map(params![tid], |row| row.get(0))?
1254                        .filter_map(|r| r.ok())
1255                        .collect();
1256
1257                    for succ_id in successors {
1258                        if !visited.contains(&succ_id) {
1259                            visited.insert(succ_id.clone());
1260                            if let Some(task) = get_task_by_id_internal(conn, &succ_id)? {
1261                                result.push(task);
1262                            }
1263                            next_level.push(succ_id);
1264                        }
1265                    }
1266                }
1267
1268                current_level = next_level;
1269                levels_remaining -= 1;
1270            }
1271
1272            Ok(result)
1273        })
1274    }
1275
1276    /// Get ancestors (parent chain) via contains dependency.
1277    /// depth: 0 = none, N = N levels up, -1 = all
1278    pub fn get_ancestors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1279        if depth == 0 {
1280            return Ok(vec![]);
1281        }
1282
1283        self.with_conn(|conn| {
1284            let mut result: Vec<Task> = Vec::new();
1285            let mut current_id = task_id.to_string();
1286            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1287
1288            while levels_remaining > 0 {
1289                // Get parent (from_task_id contains to_task_id)
1290                let parent_result: Result<String, rusqlite::Error> = conn.query_row(
1291                    "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1292                    params![&current_id],
1293                    |row| row.get(0),
1294                );
1295
1296                match parent_result {
1297                    Ok(parent_id) => {
1298                        if let Some(task) = get_task_by_id_internal(conn, &parent_id)? {
1299                            result.push(task);
1300                        }
1301                        current_id = parent_id;
1302                        levels_remaining -= 1;
1303                    }
1304                    Err(rusqlite::Error::QueryReturnedNoRows) => break,
1305                    Err(e) => return Err(e.into()),
1306                }
1307            }
1308
1309            Ok(result)
1310        })
1311    }
1312
1313    /// Get descendants (children tree) via contains dependency.
1314    /// depth: 0 = none, N = N levels down, -1 = all
1315    pub fn get_descendants(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1316        if depth == 0 {
1317            return Ok(vec![]);
1318        }
1319
1320        self.with_conn(|conn| {
1321            let mut visited: HashSet<String> = HashSet::new();
1322            let mut result: Vec<Task> = Vec::new();
1323            let mut current_level: Vec<String> = vec![task_id.to_string()];
1324            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1325
1326            while !current_level.is_empty() && levels_remaining > 0 {
1327                let mut next_level: Vec<String> = Vec::new();
1328
1329                for tid in &current_level {
1330                    // Get children (from_task_id contains to_task_id)
1331                    let mut stmt = conn.prepare(
1332                        "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
1333                    )?;
1334
1335                    let children: Vec<String> = stmt
1336                        .query_map(params![tid], |row| row.get(0))?
1337                        .filter_map(|r| r.ok())
1338                        .collect();
1339
1340                    for child_id in children {
1341                        if !visited.contains(&child_id) {
1342                            visited.insert(child_id.clone());
1343                            if let Some(task) = get_task_by_id_internal(conn, &child_id)? {
1344                                result.push(task);
1345                            }
1346                            next_level.push(child_id);
1347                        }
1348                    }
1349                }
1350
1351                current_level = next_level;
1352                levels_remaining -= 1;
1353            }
1354
1355            Ok(result)
1356        })
1357    }
1358}
1359
1360/// Helper to get a task by ID within a connection context.
1361fn get_task_by_id_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
1362    let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1363    let task = stmt
1364        .query_row(params![task_id], super::tasks::parse_task_row)
1365        .optional()?;
1366    Ok(task)
1367}
1368
1369/// Get the IDs of tasks that block a given task from starting (unsatisfied dependencies).
1370/// A task blocks starting if it has a start-blocking dependency type and is in a blocking state.
1371/// This is a transaction-safe version for use within existing transactions.
1372pub(crate) fn get_unsatisfied_start_blockers_in_tx(
1373    conn: &Connection,
1374    task_id: &str,
1375    states_config: &StatesConfig,
1376    deps_config: &DependenciesConfig,
1377) -> Result<Vec<String>> {
1378    let start_blocking_types = deps_config.start_blocking_types();
1379    if start_blocking_types.is_empty() {
1380        return Ok(vec![]);
1381    }
1382
1383    // Build IN clause from blocking_states
1384    let state_placeholders: Vec<String> = states_config
1385        .blocking_states
1386        .iter()
1387        .enumerate()
1388        .map(|(i, _)| format!("?{}", i + 2))
1389        .collect();
1390    let state_clause = state_placeholders.join(", ");
1391
1392    // Build IN clause from types
1393    let type_start = states_config.blocking_states.len() + 2;
1394    let type_placeholders: Vec<String> = start_blocking_types
1395        .iter()
1396        .enumerate()
1397        .map(|(i, _)| format!("?{}", type_start + i))
1398        .collect();
1399    let type_clause = type_placeholders.join(", ");
1400
1401    let sql = format!(
1402        "SELECT blocker.id FROM dependencies d
1403         INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1404         WHERE d.to_task_id = ?1 
1405         AND d.dep_type IN ({})
1406         AND blocker.status IN ({})",
1407        type_clause, state_clause
1408    );
1409
1410    let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1411    params_vec.push(Box::new(task_id.to_string()));
1412    for state in &states_config.blocking_states {
1413        params_vec.push(Box::new(state.clone()));
1414    }
1415    for t in &start_blocking_types {
1416        params_vec.push(Box::new(t.to_string()));
1417    }
1418    let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
1419
1420    let mut stmt = conn.prepare(&sql)?;
1421    let blockers = stmt
1422        .query_map(params_refs.as_slice(), |row| {
1423            let id: String = row.get(0)?;
1424            Ok(id)
1425        })?
1426        .filter_map(|r| r.ok())
1427        .collect();
1428
1429    Ok(blockers)
1430}
1431
1432/// Propagate unblock effects when a task transitions out of a blocking state.
1433/// This is called after a task completes to find newly unblocked tasks and
1434/// optionally auto-advance them.
1435///
1436/// Returns (unblocked, auto_advanced):
1437/// - unblocked: All task IDs that are now ready (all dependencies satisfied)
1438/// - auto_advanced: Subset of unblocked that were actually transitioned (when enabled)
1439///
1440/// Algorithm:
1441/// 1. Find all tasks that have a start-blocking dependency on the completed task
1442/// 2. For each candidate:
1443///    - Skip if not in initial state or already claimed
1444///    - Check if ALL other start-blockers are also satisfied
1445///    - If fully unblocked → add to unblocked list
1446///    - If auto_advance enabled → also transition to target_state
1447/// 3. Return both lists
1448pub(crate) fn propagate_unblock_effects(
1449    conn: &Connection,
1450    completed_task_id: &str,
1451    agent_id: Option<&str>,
1452    states_config: &StatesConfig,
1453    deps_config: &DependenciesConfig,
1454    auto_advance: &AutoAdvanceConfig,
1455) -> Result<(Vec<String>, Vec<String>)> {
1456    // Get start-blocking dependency types
1457    let start_blocking_types = deps_config.start_blocking_types();
1458    if start_blocking_types.is_empty() {
1459        return Ok((vec![], vec![]));
1460    }
1461
1462    // Find all tasks that depend on the completed task via start-blocking dependencies
1463    let type_placeholders: Vec<String> = start_blocking_types
1464        .iter()
1465        .enumerate()
1466        .map(|(i, _)| format!("?{}", i + 2))
1467        .collect();
1468    let type_clause = type_placeholders.join(", ");
1469
1470    let sql = format!(
1471        "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
1472        type_clause
1473    );
1474
1475    let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1476    params_vec.push(Box::new(completed_task_id.to_string()));
1477    for t in &start_blocking_types {
1478        params_vec.push(Box::new(t.to_string()));
1479    }
1480    let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
1481
1482    let mut stmt = conn.prepare(&sql)?;
1483    let dependent_task_ids: Vec<String> = stmt
1484        .query_map(params_refs.as_slice(), |row| row.get(0))?
1485        .filter_map(|r| r.ok())
1486        .collect();
1487
1488    let mut unblocked = Vec::new();
1489    let mut auto_advanced = Vec::new();
1490    let now = super::now_ms();
1491
1492    // Determine if we should auto-advance
1493    let should_auto_advance = auto_advance.enabled && auto_advance.target_state.is_some();
1494    let target_state = auto_advance.target_state.clone();
1495
1496    // Validate target state if auto-advance is enabled
1497    if should_auto_advance {
1498        let ts = target_state.as_ref().unwrap();
1499        if !states_config.is_valid_state(ts) {
1500            return Err(anyhow!(
1501                "Auto-advance target state '{}' is not a valid state",
1502                ts
1503            ));
1504        }
1505    }
1506
1507    for task_id in dependent_task_ids {
1508        // Get the task
1509        let task = match get_task_by_id_internal(conn, &task_id)? {
1510            Some(t) => t,
1511            None => continue,
1512        };
1513
1514        // Skip if not in initial state
1515        if task.status != states_config.initial {
1516            continue;
1517        }
1518
1519        // Skip if task is already claimed
1520        if task.worker_id.is_some() {
1521            continue;
1522        }
1523
1524        // Check if ALL start-blockers are now satisfied (not in blocking states)
1525        // Build query to count remaining blockers that are still blocking
1526        let state_placeholders: Vec<String> = states_config
1527            .blocking_states
1528            .iter()
1529            .enumerate()
1530            .map(|(i, _)| format!("?{}", i + 3))
1531            .collect();
1532        let state_clause = state_placeholders.join(", ");
1533
1534        // Re-number the dep-type placeholders so they follow the state placeholders (?1/?2 are the task ids)
1535        let type_start = states_config.blocking_states.len() + 3;
1536        let type_placeholders2: Vec<String> = start_blocking_types
1537            .iter()
1538            .enumerate()
1539            .map(|(i, _)| format!("?{}", type_start + i))
1540            .collect();
1541        let type_clause2 = type_placeholders2.join(", ");
1542
1543        let blocker_sql = format!(
1544            "SELECT COUNT(*) FROM dependencies d
1545             INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1546             WHERE d.to_task_id = ?1
1547             AND d.from_task_id != ?2
1548             AND d.dep_type IN ({})
1549             AND blocker.status IN ({})",
1550            type_clause2, state_clause
1551        );
1552
1553        let mut blocker_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1554        blocker_params.push(Box::new(task_id.clone()));
1555        blocker_params.push(Box::new(completed_task_id.to_string()));
1556        for state in &states_config.blocking_states {
1557            blocker_params.push(Box::new(state.clone()));
1558        }
1559        for t in &start_blocking_types {
1560            blocker_params.push(Box::new(t.to_string()));
1561        }
1562        let blocker_refs: Vec<&dyn rusqlite::ToSql> =
1563            blocker_params.iter().map(|b| b.as_ref()).collect();
1564
1565        let remaining_blockers: i32 =
1566            conn.query_row(&blocker_sql, blocker_refs.as_slice(), |row| row.get(0))?;
1567
1568        if remaining_blockers > 0 {
1569            continue; // Still blocked by other tasks
1570        }
1571
1572        // Task is now fully unblocked - add to unblocked list
1573        unblocked.push(task_id.clone());
1574
1575        // Auto-advance if enabled and transition is valid
1576        if should_auto_advance {
1577            let ts = target_state.as_ref().unwrap();
1578
1579            // Validate transition from initial to target_state
1580            if !states_config.is_valid_transition(&states_config.initial, ts) {
1581                // Skip auto-advance for this task - transition not allowed
1582                continue;
1583            }
1584
1585            // Auto-advance: update the task's status
1586            conn.execute(
1587                "UPDATE tasks SET status = ?1, updated_at = ?2 WHERE id = ?3",
1588                params![ts, now, &task_id],
1589            )?;
1590
1591            // Record state transition
1592            let reason = format!("auto-advanced: blocker '{}' completed", completed_task_id);
1593            super::state_transitions::record_state_transition(
1594                conn,
1595                &task_id,
1596                ts,
1597                agent_id,
1598                Some(&reason),
1599                states_config,
1600            )?;
1601
1602            auto_advanced.push(task_id);
1603        }
1604    }
1605
1606    Ok((unblocked, auto_advanced))
1607}