Skip to main content

task_graph_mcp/db/
deps.rs

1//! Dependency operations and cycle detection with typed dependencies.
2
3use super::Database;
4use crate::config::{AutoAdvanceConfig, DependenciesConfig, DependencyDisplay, StatesConfig};
5use crate::types::{Dependency, Task};
6use anyhow::{Result, anyhow};
7use rusqlite::{Connection, OptionalExtension, params};
8use std::collections::{HashSet, VecDeque};
9
10/// Result of a relink operation.
11#[derive(Debug)]
12pub struct RelinkResult {
13    /// Dependencies that were unlinked (from, to).
14    pub unlinked: Vec<(String, String)>,
15    /// Dependencies that were linked (from, to).
16    pub linked: Vec<(String, String)>,
17}
18
19/// Check if adding a dependency would create a cycle (transaction-safe version).
20fn would_create_cycle_in_tx(
21    tx: &rusqlite::Transaction,
22    from_task_id: &str,
23    to_task_id: &str,
24    dep_type: &str,
25    deps_config: &DependenciesConfig,
26) -> Result<bool> {
27    let def = deps_config
28        .get_definition(dep_type)
29        .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
30
31    // A cycle would occur if to_task can already reach from_task
32    // through the same "graph" (horizontal or vertical)
33    let mut visited: HashSet<String> = HashSet::new();
34    let mut queue: VecDeque<String> = VecDeque::new();
35    queue.push_back(to_task_id.to_string());
36
37    while let Some(current) = queue.pop_front() {
38        if current == from_task_id {
39            return Ok(true); // Would create a cycle
40        }
41
42        if visited.contains(&current) {
43            continue;
44        }
45        visited.insert(current.clone());
46
47        // Get all tasks that current points to (in the relevant graph)
48        let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
49            // For vertical deps, only check containment relationships
50            let mut stmt = tx.prepare(
51                "SELECT to_task_id FROM dependencies d
52                 JOIN (SELECT value FROM json_each(?1)) types
53                 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
54            )?;
55            let vertical_types: Vec<&str> = deps_config.vertical_types();
56            let types_json = serde_json::to_string(&vertical_types)?;
57            stmt.query_map(params![&types_json, &current], |row| row.get(0))?
58                .filter_map(|r| r.ok())
59                .collect()
60        } else {
61            // For horizontal deps, check all start-blocking relationships
62            let mut stmt = tx.prepare(
63                "SELECT to_task_id FROM dependencies d
64                 JOIN (SELECT value FROM json_each(?1)) types
65                 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
66            )?;
67            let start_blocking: Vec<&str> = deps_config.start_blocking_types();
68            let types_json = serde_json::to_string(&start_blocking)?;
69            stmt.query_map(params![&types_json, &current], |row| row.get(0))?
70                .filter_map(|r| r.ok())
71                .collect()
72        };
73
74        for dep in deps {
75            if !visited.contains(&dep) {
76                queue.push_back(dep);
77            }
78        }
79    }
80
81    Ok(false)
82}
83
84/// Build an ORDER BY clause from sort_by and sort_order parameters.
85/// Returns a safe SQL ORDER BY expression.
86fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
87    let field = match sort_by {
88        Some("priority") => "CAST(t.priority AS INTEGER)",
89        Some("created_at") => "t.created_at",
90        Some("updated_at") => "t.updated_at",
91        _ => "t.created_at", // default
92    };
93
94    let order = match sort_order {
95        Some("asc") => "ASC",
96        Some("desc") => "DESC",
97        _ => {
98            // Default: priority is descending (higher number = more important), dates are descending
99            "DESC"
100        }
101    };
102
103    format!("{} {}", field, order)
104}
105
106/// Result of attempting to add a dependency.
107#[derive(Debug)]
108pub enum AddDependencyResult {
109    /// Dependency was created successfully.
110    Created,
111    /// Dependency already existed (no change).
112    AlreadyExists,
113    /// Source task does not exist.
114    FromTaskNotFound,
115    /// Target task does not exist.
116    ToTaskNotFound,
117}
118
119impl Database {
120    /// Check if a task exists by ID.
121    pub fn task_exists(&self, task_id: &str) -> Result<bool> {
122        self.with_conn(|conn| {
123            let count: i64 = conn.query_row(
124                "SELECT COUNT(*) FROM tasks WHERE id = ?1",
125                params![task_id],
126                |row| row.get(0),
127            )?;
128            Ok(count > 0)
129        })
130    }
131
132    /// Add a typed dependency (from blocks/contains to).
133    /// Returns Ok(()) on success. For soft linking with warnings, use add_dependency_soft.
134    pub fn add_dependency(
135        &self,
136        from_task_id: &str,
137        to_task_id: &str,
138        dep_type: &str,
139        deps_config: &DependenciesConfig,
140    ) -> Result<()> {
141        match self.add_dependency_soft(from_task_id, to_task_id, dep_type, deps_config)? {
142            AddDependencyResult::Created | AddDependencyResult::AlreadyExists => Ok(()),
143            AddDependencyResult::FromTaskNotFound => {
144                Err(anyhow!("Source task '{}' not found", from_task_id))
145            }
146            AddDependencyResult::ToTaskNotFound => {
147                Err(anyhow!("Target task '{}' not found", to_task_id))
148            }
149        }
150    }
151
152    /// Add a typed dependency with soft failure (returns result status instead of error).
153    /// Use this when you want to handle missing tasks as warnings.
154    pub fn add_dependency_soft(
155        &self,
156        from_task_id: &str,
157        to_task_id: &str,
158        dep_type: &str,
159        deps_config: &DependenciesConfig,
160    ) -> Result<AddDependencyResult> {
161        // Validate dependency type
162        if !deps_config.is_valid_dep_type(dep_type) {
163            return Err(anyhow!(
164                "Invalid dependency type '{}'. Valid types: {:?}",
165                dep_type,
166                deps_config.dep_type_names()
167            ));
168        }
169
170        // Check if tasks exist first
171        if !self.task_exists(from_task_id)? {
172            return Ok(AddDependencyResult::FromTaskNotFound);
173        }
174        if !self.task_exists(to_task_id)? {
175            return Ok(AddDependencyResult::ToTaskNotFound);
176        }
177
178        // For vertical (contains) dependencies, check single-parent constraint
179        let def = deps_config
180            .get_definition(dep_type)
181            .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
182        if def.display == DependencyDisplay::Vertical
183            && let Some(existing_parent) = self.get_parent(to_task_id)?
184            && existing_parent != from_task_id
185        {
186            return Err(anyhow!(
187                "Task {} already has parent {}",
188                to_task_id,
189                existing_parent
190            ));
191        }
192
193        // Check for cycle in the appropriate graph
194        if self.would_create_cycle(from_task_id, to_task_id, dep_type, deps_config)? {
195            return Err(anyhow!("Adding this dependency would create a cycle"));
196        }
197
198        self.with_conn(|conn| {
199            let changes = conn.execute(
200                "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
201                params![from_task_id, to_task_id, dep_type],
202            )?;
203            if changes == 0 {
204                Ok(AddDependencyResult::AlreadyExists)
205            } else {
206                Ok(AddDependencyResult::Created)
207            }
208        })
209    }
210
211    /// Check if adding a dependency would create a cycle.
212    /// For horizontal deps: check cycle in the start-blocking graph.
213    /// For vertical deps: check containment cycle.
214    pub fn would_create_cycle(
215        &self,
216        from_task_id: &str,
217        to_task_id: &str,
218        dep_type: &str,
219        deps_config: &DependenciesConfig,
220    ) -> Result<bool> {
221        let def = deps_config
222            .get_definition(dep_type)
223            .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
224
225        self.with_conn(|conn| {
226            // A cycle would occur if to_task can already reach from_task
227            // through the same "graph" (horizontal or vertical)
228            let mut visited: HashSet<String> = HashSet::new();
229            let mut queue: VecDeque<String> = VecDeque::new();
230            queue.push_back(to_task_id.to_string());
231
232            while let Some(current) = queue.pop_front() {
233                if current == from_task_id {
234                    return Ok(true); // Would create a cycle
235                }
236
237                if visited.contains(&current) {
238                    continue;
239                }
240                visited.insert(current.clone());
241
242                // Get all tasks that current points to (in the relevant graph)
243                let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
244                    // For vertical deps, only check containment relationships
245                    let mut stmt = conn.prepare(
246                        "SELECT to_task_id FROM dependencies d
247                         JOIN (SELECT value FROM json_each(?1)) types
248                         WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
249                    )?;
250                    let vertical_types: Vec<&str> = deps_config.vertical_types();
251                    let types_json = serde_json::to_string(&vertical_types)?;
252                    stmt.query_map(params![&types_json, &current], |row| row.get(0))?
253                        .filter_map(|r| r.ok())
254                        .collect()
255                } else {
256                    // For horizontal deps, check all start-blocking relationships
257                    let mut stmt = conn.prepare(
258                        "SELECT to_task_id FROM dependencies d
259                         JOIN (SELECT value FROM json_each(?1)) types
260                         WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
261                    )?;
262                    let start_blocking: Vec<&str> = deps_config.start_blocking_types();
263                    let types_json = serde_json::to_string(&start_blocking)?;
264                    stmt.query_map(params![&types_json, &current], |row| row.get(0))?
265                        .filter_map(|r| r.ok())
266                        .collect()
267                };
268
269                for dep in deps {
270                    if !visited.contains(&dep) {
271                        queue.push_back(dep);
272                    }
273                }
274            }
275
276            Ok(false)
277        })
278    }
279
280    /// Remove a typed dependency. Returns true if a row was deleted.
281    pub fn remove_dependency(
282        &self,
283        from_task_id: &str,
284        to_task_id: &str,
285        dep_type: &str,
286    ) -> Result<bool> {
287        self.with_conn(|conn| {
288            let rows = conn.execute(
289                "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
290                params![from_task_id, to_task_id, dep_type],
291            )?;
292            Ok(rows > 0)
293        })
294    }
295
296    /// Remove all dependencies of a given type from a task (outgoing edges).
297    /// Returns the list of removed dependencies.
298    pub fn remove_all_outgoing_dependencies(
299        &self,
300        from_task_id: &str,
301        dep_type: &str,
302    ) -> Result<Vec<Dependency>> {
303        self.with_conn_mut(|conn| {
304            let tx = conn.transaction()?;
305
306            // First get the dependencies that will be removed
307            let deps: Vec<Dependency> = {
308                let mut stmt = tx.prepare(
309                    "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
310                )?;
311                stmt
312                    .query_map(params![from_task_id, dep_type], |row| {
313                        Ok(Dependency {
314                            from_task_id: row.get(0)?,
315                            to_task_id: row.get(1)?,
316                            dep_type: row.get(2)?,
317                        })
318                    })?
319                    .filter_map(|r| r.ok())
320                    .collect()
321            };
322
323            // Then delete them
324            tx.execute(
325                "DELETE FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2",
326                params![from_task_id, dep_type],
327            )?;
328
329            tx.commit()?;
330            Ok(deps)
331        })
332    }
333
334    /// Remove all dependencies of a given type to a task (incoming edges).
335    /// Returns the list of removed dependencies.
336    pub fn remove_all_incoming_dependencies(
337        &self,
338        to_task_id: &str,
339        dep_type: &str,
340    ) -> Result<Vec<Dependency>> {
341        self.with_conn_mut(|conn| {
342            let tx = conn.transaction()?;
343
344            // First get the dependencies that will be removed
345            let deps: Vec<Dependency> = {
346                let mut stmt = tx.prepare(
347                    "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
348                )?;
349                stmt
350                    .query_map(params![to_task_id, dep_type], |row| {
351                        Ok(Dependency {
352                            from_task_id: row.get(0)?,
353                            to_task_id: row.get(1)?,
354                            dep_type: row.get(2)?,
355                        })
356                    })?
357                    .filter_map(|r| r.ok())
358                    .collect()
359            };
360
361            // Then delete them
362            tx.execute(
363                "DELETE FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2",
364                params![to_task_id, dep_type],
365            )?;
366
367            tx.commit()?;
368            Ok(deps)
369        })
370    }
371
372    /// Get all dependencies.
373    pub fn get_all_dependencies(&self) -> Result<Vec<Dependency>> {
374        self.with_conn(|conn| {
375            let mut stmt =
376                conn.prepare("SELECT from_task_id, to_task_id, dep_type FROM dependencies")?;
377
378            let deps = stmt
379                .query_map([], |row| {
380                    let from: String = row.get(0)?;
381                    let to: String = row.get(1)?;
382                    let dep_type: String = row.get(2)?;
383                    Ok(Dependency {
384                        from_task_id: from,
385                        to_task_id: to,
386                        dep_type,
387                    })
388                })?
389                .filter_map(|r| r.ok())
390                .collect();
391
392            Ok(deps)
393        })
394    }
395
396    /// Get dependencies of a specific type for a task.
397    pub fn get_dependencies_by_type(
398        &self,
399        task_id: &str,
400        dep_type: &str,
401        direction: &str,
402    ) -> Result<Vec<Dependency>> {
403        self.with_conn(|conn| {
404            let sql = if direction == "incoming" {
405                "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
406            } else {
407                "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
408            };
409
410            let mut stmt = conn.prepare(sql)?;
411
412            let deps = stmt
413                .query_map(params![task_id, dep_type], |row| {
414                    let from: String = row.get(0)?;
415                    let to: String = row.get(1)?;
416                    let dep_type: String = row.get(2)?;
417                    Ok(Dependency {
418                        from_task_id: from,
419                        to_task_id: to,
420                        dep_type,
421                    })
422                })?
423                .filter_map(|r| r.ok())
424                .collect();
425
426            Ok(deps)
427        })
428    }
429
430    /// Get tasks that block a given task from starting (dep_type with blocks: start).
431    pub fn get_start_blockers(
432        &self,
433        task_id: &str,
434        deps_config: &DependenciesConfig,
435    ) -> Result<Vec<String>> {
436        let start_blocking_types = deps_config.start_blocking_types();
437        if start_blocking_types.is_empty() {
438            return Ok(vec![]);
439        }
440
441        self.with_conn(|conn| {
442            let placeholders: String = start_blocking_types
443                .iter()
444                .enumerate()
445                .map(|(i, _)| format!("?{}", i + 2))
446                .collect::<Vec<_>>()
447                .join(", ");
448
449            let sql = format!(
450                "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type IN ({})",
451                placeholders
452            );
453
454            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
455            params_vec.push(Box::new(task_id.to_string()));
456            for t in &start_blocking_types {
457                params_vec.push(Box::new(t.to_string()));
458            }
459            let params_refs: Vec<&dyn rusqlite::ToSql> =
460                params_vec.iter().map(|b| b.as_ref()).collect();
461
462            let mut stmt = conn.prepare(&sql)?;
463            let blockers = stmt
464                .query_map(params_refs.as_slice(), |row| {
465                    let id: String = row.get(0)?;
466                    Ok(id)
467                })?
468                .filter_map(|r| r.ok())
469                .collect();
470
471            Ok(blockers)
472        })
473    }
474
475    /// Get tasks that block a given task from completing (dep_type with blocks: completion).
476    /// For a parent task, this returns children that must complete first.
477    pub fn get_completion_blockers(
478        &self,
479        task_id: &str,
480        deps_config: &DependenciesConfig,
481    ) -> Result<Vec<String>> {
482        let completion_blocking_types = deps_config.completion_blocking_types();
483        if completion_blocking_types.is_empty() {
484            return Ok(vec![]);
485        }
486
487        self.with_conn(|conn| {
488            let placeholders: String = completion_blocking_types
489                .iter()
490                .enumerate()
491                .map(|(i, _)| format!("?{}", i + 2))
492                .collect::<Vec<_>>()
493                .join(", ");
494
495            // For completion blockers, we look at outgoing edges (from_task_id = parent)
496            // because "contains" means parent -> child, and child blocks parent completion
497            let sql = format!(
498                "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
499                placeholders
500            );
501
502            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
503            params_vec.push(Box::new(task_id.to_string()));
504            for t in &completion_blocking_types {
505                params_vec.push(Box::new(t.to_string()));
506            }
507            let params_refs: Vec<&dyn rusqlite::ToSql> =
508                params_vec.iter().map(|b| b.as_ref()).collect();
509
510            let mut stmt = conn.prepare(&sql)?;
511            let blockers = stmt
512                .query_map(params_refs.as_slice(), |row| {
513                    let id: String = row.get(0)?;
514                    Ok(id)
515                })?
516                .filter_map(|r| r.ok())
517                .collect();
518
519            Ok(blockers)
520        })
521    }
522
523    /// Get the parent of a task (via 'contains' dependency).
524    pub fn get_parent(&self, task_id: &str) -> Result<Option<String>> {
525        self.with_conn(|conn| {
526            let result: Result<String, rusqlite::Error> = conn.query_row(
527                "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
528                params![task_id],
529                |row| row.get(0),
530            );
531
532            match result {
533                Ok(parent_id) => Ok(Some(parent_id)),
534                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
535                Err(e) => Err(e.into()),
536            }
537        })
538    }
539
540    /// Get children of a task (via 'contains' dependency).
541    pub fn get_children_ids(&self, task_id: &str) -> Result<Vec<String>> {
542        self.with_conn(|conn| {
543            let mut stmt = conn.prepare(
544                "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
545            )?;
546
547            let children = stmt
548                .query_map(params![task_id], |row| {
549                    let id: String = row.get(0)?;
550                    Ok(id)
551                })?
552                .filter_map(|r| r.ok())
553                .collect();
554
555            Ok(children)
556        })
557    }
558
559    /// Get all tasks that block a given task (backwards compatible).
560    /// Returns tasks from both 'blocks' and 'follows' dependencies.
561    pub fn get_blockers(&self, task_id: &str) -> Result<Vec<String>> {
562        self.with_conn(|conn| {
563            let mut stmt = conn.prepare(
564                "SELECT from_task_id FROM dependencies 
565                 WHERE to_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
566            )?;
567
568            let blockers = stmt
569                .query_map(params![task_id], |row| {
570                    let id: String = row.get(0)?;
571                    Ok(id)
572                })?
573                .filter_map(|r| r.ok())
574                .collect();
575
576            Ok(blockers)
577        })
578    }
579
580    /// Get tasks that a given task blocks.
581    #[allow(dead_code)]
582    pub fn get_blocking(&self, task_id: &str) -> Result<Vec<String>> {
583        self.with_conn(|conn| {
584            let mut stmt = conn.prepare(
585                "SELECT to_task_id FROM dependencies 
586                 WHERE from_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
587            )?;
588
589            let blocking = stmt
590                .query_map(params![task_id], |row| {
591                    let id: String = row.get(0)?;
592                    Ok(id)
593                })?
594                .filter_map(|r| r.ok())
595                .collect();
596
597            Ok(blocking)
598        })
599    }
600
601    /// Get tasks that are blocked by incomplete start dependencies.
602    /// A task is blocked if any of its start-blocking dependencies are in a blocking state.
603    /// Excludes soft-deleted tasks.
604    pub fn get_blocked_tasks(
605        &self,
606        states_config: &StatesConfig,
607        deps_config: &DependenciesConfig,
608        sort_by: Option<&str>,
609        sort_order: Option<&str>,
610    ) -> Result<Vec<Task>> {
611        let start_blocking_types = deps_config.start_blocking_types();
612        if start_blocking_types.is_empty() {
613            return Ok(vec![]);
614        }
615
616        self.with_conn(|conn| {
617            // Build IN clause from blocking_states
618            let state_placeholders: Vec<String> = states_config
619                .blocking_states
620                .iter()
621                .enumerate()
622                .map(|(i, _)| format!("?{}", i + 2))
623                .collect();
624            let state_clause = state_placeholders.join(", ");
625
626            // Build IN clause from start_blocking_types
627            let type_start = states_config.blocking_states.len() + 2;
628            let type_placeholders: Vec<String> = start_blocking_types
629                .iter()
630                .enumerate()
631                .map(|(i, _)| format!("?{}", type_start + i))
632                .collect();
633            let type_clause = type_placeholders.join(", ");
634
635            // Build ORDER BY clause
636            let order_clause = build_order_clause(sort_by, sort_order);
637
638            let sql = format!(
639                "SELECT DISTINCT t.*
640                 FROM tasks t
641                 INNER JOIN dependencies d ON t.id = d.to_task_id
642                 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
643                 WHERE d.dep_type IN ({})
644                 AND blocker.status IN ({})
645                 AND t.status = ?1
646                 AND t.deleted_at IS NULL
647                 ORDER BY {}",
648                type_clause, state_clause, order_clause
649            );
650
651            let mut stmt = conn.prepare(&sql)?;
652
653            // Build params: initial state + blocking states + start_blocking_types
654            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
655            params_vec.push(Box::new(states_config.initial.clone()));
656            for state in &states_config.blocking_states {
657                params_vec.push(Box::new(state.clone()));
658            }
659            for t in &start_blocking_types {
660                params_vec.push(Box::new(t.to_string()));
661            }
662            let params_refs: Vec<&dyn rusqlite::ToSql> =
663                params_vec.iter().map(|b| b.as_ref()).collect();
664
665            let tasks = stmt
666                .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
667                .filter_map(|r| r.ok())
668                .collect();
669
670            Ok(tasks)
671        })
672    }
673
674    /// Get tasks that are ready to be claimed (all start dependencies satisfied).
675    /// A task is ready if it's in the initial state, unclaimed, and all start-blocking deps are not blocking.
676    /// When agent_id is provided, also filters by agent's tag qualifications using junction tables.
677    /// Excludes soft-deleted tasks.
678    pub fn get_ready_tasks(
679        &self,
680        agent_id: Option<&str>,
681        states_config: &StatesConfig,
682        deps_config: &DependenciesConfig,
683        sort_by: Option<&str>,
684        sort_order: Option<&str>,
685    ) -> Result<Vec<Task>> {
686        let start_blocking_types = deps_config.start_blocking_types();
687
688        // Get agent tags if agent_id is provided (for junction table filtering)
689        let agent_tags: Option<Vec<String>> = if let Some(aid) = agent_id {
690            Some(self.get_agent_tags(aid)?)
691        } else {
692            None
693        };
694
695        self.with_conn(|conn| {
696            // Build IN clause from blocking_states
697            let state_placeholders: Vec<String> = states_config
698                .blocking_states
699                .iter()
700                .enumerate()
701                .map(|(i, _)| format!("?{}", i + 2))
702                .collect();
703            let state_clause = state_placeholders.join(", ");
704
705            // Build IN clause from start_blocking_types
706            let type_start = states_config.blocking_states.len() + 2;
707            let type_placeholders: Vec<String> = start_blocking_types
708                .iter()
709                .enumerate()
710                .map(|(i, _)| format!("?{}", type_start + i))
711                .collect();
712            let type_clause = type_placeholders.join(", ");
713
714            // Build ORDER BY clause - for ready tasks, default is priority then created_at
715            let order_clause = if sort_by.is_some() {
716                build_order_clause(sort_by, sort_order)
717            } else {
718                // Default for ready: priority (high first), then created_at
719                "CAST(t.priority AS INTEGER) DESC, t.created_at DESC".to_string()
720            };
721
722            // Track param index for agent tag filters
723            let mut param_idx = type_start + start_blocking_types.len();
724
725            // Build agent qualification filters using junction tables
726            let (agent_needed_clause, agent_wanted_clause) = if let Some(ref tags) = agent_tags {
727                // For agent_tags_all (AND): agent must have ALL needed tags
728                // Count how many of the task's needed_tags match agent's tags
729                // Either the task has no needed_tags, or all must match
730                let needed_placeholders: Vec<String> = tags
731                    .iter()
732                    .enumerate()
733                    .map(|(i, _)| format!("?{}", param_idx + i))
734                    .collect();
735                param_idx += tags.len();
736
737                let needed_clause = if needed_placeholders.is_empty() {
738                    // Agent has no tags - only match tasks with no needed_tags
739                    "AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)"
740                        .to_string()
741                } else {
742                    // Task must have no needed_tags OR agent must have all of them
743                    format!(
744                        "AND (
745                            NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
746                            OR (
747                                SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
748                            ) = (
749                                SELECT COUNT(*) FROM task_needed_tags 
750                                WHERE task_id = t.id AND tag IN ({})
751                            )
752                        )",
753                        needed_placeholders.join(", ")
754                    )
755                };
756
757                // For agent_tags_any (OR): agent must have at least ONE wanted tag
758                let wanted_placeholders: Vec<String> = tags
759                    .iter()
760                    .enumerate()
761                    .map(|(i, _)| format!("?{}", param_idx + i))
762                    .collect();
763
764                let wanted_clause = if wanted_placeholders.is_empty() {
765                    // Agent has no tags - only match tasks with no wanted_tags
766                    "AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)"
767                        .to_string()
768                } else {
769                    // Task must have no wanted_tags OR agent must have at least one
770                    format!(
771                        "AND (
772                            NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
773                            OR EXISTS (
774                                SELECT 1 FROM task_wanted_tags 
775                                WHERE task_id = t.id AND tag IN ({})
776                            )
777                        )",
778                        wanted_placeholders.join(", ")
779                    )
780                };
781
782                (needed_clause, wanted_clause)
783            } else {
784                (String::new(), String::new())
785            };
786
787            let sql = format!(
788                "SELECT t.*
789                 FROM tasks t
790                 WHERE t.status = ?1
791                 AND t.worker_id IS NULL
792                 AND t.deleted_at IS NULL
793                 AND NOT EXISTS (
794                     SELECT 1 FROM dependencies d
795                     INNER JOIN tasks blocker ON d.from_task_id = blocker.id
796                     WHERE d.to_task_id = t.id
797                     AND d.dep_type IN ({})
798                     AND blocker.status IN ({})
799                 )
800                 AND t.id NOT IN (
801                     SELECT from_task_id FROM dependencies
802                     WHERE dep_type = 'contains'
803                 )
804                 {}
805                 {}
806                 ORDER BY {}",
807                type_clause, state_clause, agent_needed_clause, agent_wanted_clause, order_clause
808            );
809
810            let mut stmt = conn.prepare(&sql)?;
811
812            // Build params: initial state + blocking states + types + agent tags (twice if present)
813            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
814            params_vec.push(Box::new(states_config.initial.clone()));
815            for state in &states_config.blocking_states {
816                params_vec.push(Box::new(state.clone()));
817            }
818            for t in &start_blocking_types {
819                params_vec.push(Box::new(t.to_string()));
820            }
821            // Add agent tags twice (once for needed_tags check, once for wanted_tags check)
822            if let Some(ref tags) = agent_tags {
823                for tag in tags {
824                    params_vec.push(Box::new(tag.clone()));
825                }
826                for tag in tags {
827                    params_vec.push(Box::new(tag.clone()));
828                }
829            }
830            let params_refs: Vec<&dyn rusqlite::ToSql> =
831                params_vec.iter().map(|b| b.as_ref()).collect();
832
833            let tasks: Vec<Task> = stmt
834                .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
835                .filter_map(|r| r.ok())
836                .collect();
837
838            Ok(tasks)
839        })
840    }
841
842    /// Check if a task has unmet start dependencies.
843    #[allow(dead_code)]
844    pub fn has_unmet_start_dependencies(
845        &self,
846        task_id: &str,
847        states_config: &StatesConfig,
848        deps_config: &DependenciesConfig,
849    ) -> Result<bool> {
850        let start_blocking_types = deps_config.start_blocking_types();
851        if start_blocking_types.is_empty() {
852            return Ok(false);
853        }
854
855        self.with_conn(|conn| {
856            // Build IN clause from blocking_states
857            let state_placeholders: Vec<String> = states_config
858                .blocking_states
859                .iter()
860                .enumerate()
861                .map(|(i, _)| format!("?{}", i + 2))
862                .collect();
863            let state_clause = state_placeholders.join(", ");
864
865            // Build IN clause from types
866            let type_start = states_config.blocking_states.len() + 2;
867            let type_placeholders: Vec<String> = start_blocking_types
868                .iter()
869                .enumerate()
870                .map(|(i, _)| format!("?{}", type_start + i))
871                .collect();
872            let type_clause = type_placeholders.join(", ");
873
874            let sql = format!(
875                "SELECT COUNT(*) FROM dependencies d
876                 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
877                 WHERE d.to_task_id = ?1 
878                 AND d.dep_type IN ({})
879                 AND blocker.status IN ({})",
880                type_clause, state_clause
881            );
882
883            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
884            params_vec.push(Box::new(task_id.to_string()));
885            for state in &states_config.blocking_states {
886                params_vec.push(Box::new(state.clone()));
887            }
888            for t in &start_blocking_types {
889                params_vec.push(Box::new(t.to_string()));
890            }
891            let params_refs: Vec<&dyn rusqlite::ToSql> =
892                params_vec.iter().map(|b| b.as_ref()).collect();
893
894            let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
895
896            Ok(count > 0)
897        })
898    }
899
900    /// Check if a task has incomplete children (blocking completion).
901    pub fn has_incomplete_children(
902        &self,
903        task_id: &str,
904        states_config: &StatesConfig,
905    ) -> Result<bool> {
906        self.with_conn(|conn| {
907            // Build IN clause from blocking_states
908            let state_placeholders: Vec<String> = states_config
909                .blocking_states
910                .iter()
911                .enumerate()
912                .map(|(i, _)| format!("?{}", i + 2))
913                .collect();
914            let state_clause = state_placeholders.join(", ");
915
916            let sql = format!(
917                "SELECT COUNT(*) FROM dependencies d
918                 INNER JOIN tasks child ON d.to_task_id = child.id
919                 WHERE d.from_task_id = ?1 
920                 AND d.dep_type = 'contains'
921                 AND child.status IN ({})",
922                state_clause
923            );
924
925            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
926            params_vec.push(Box::new(task_id.to_string()));
927            for state in &states_config.blocking_states {
928                params_vec.push(Box::new(state.clone()));
929            }
930            let params_refs: Vec<&dyn rusqlite::ToSql> =
931                params_vec.iter().map(|b| b.as_ref()).collect();
932
933            let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
934
935            Ok(count > 0)
936        })
937    }
938
939    /// Get tasks with tag-based filtering using junction tables for indexed lookups.
940    /// - `tags_any`: Task must have at least one of these tags (OR)
941    /// - `tags_all`: Task must have all of these tags (AND)
942    /// - `qualified_for_agent_tags`: If provided, only return tasks where these tags satisfy the task's agent_tags_all/agent_tags_any
943    ///
944    /// Excludes soft-deleted tasks.
945    #[allow(clippy::too_many_arguments)]
946    pub fn list_tasks_with_tag_filters(
947        &self,
948        status: Option<Vec<String>>,
949        owner: Option<&str>,
950        parent_id: Option<Option<&str>>,
951        tags_any: Option<Vec<String>>,
952        tags_all: Option<Vec<String>>,
953        qualified_for_agent_tags: Option<Vec<String>>,
954        limit: Option<i32>,
955        offset: i32,
956        sort_by: Option<&str>,
957        sort_order: Option<&str>,
958    ) -> Result<Vec<Task>> {
959        self.with_conn(|conn| {
960            let mut sql = String::from("SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL");
961            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
962            let mut param_idx = 1;
963
964            // Status filter (can be single or multiple)
965            if let Some(ref statuses) = status {
966                if statuses.len() == 1 {
967                    sql.push_str(&format!(" AND t.status = ?{}", param_idx));
968                    params_vec.push(Box::new(statuses[0].clone()));
969                    param_idx += 1;
970                } else if statuses.len() > 1 {
971                    let placeholders: Vec<String> = statuses
972                        .iter()
973                        .enumerate()
974                        .map(|(i, _)| format!("?{}", param_idx + i))
975                        .collect();
976                    sql.push_str(&format!(" AND t.status IN ({})", placeholders.join(", ")));
977                    for s in statuses {
978                        params_vec.push(Box::new(s.clone()));
979                    }
980                    param_idx += statuses.len();
981                }
982            }
983
984            // Owner filter
985            if let Some(o) = owner {
986                sql.push_str(&format!(" AND t.worker_id = ?{}", param_idx));
987                params_vec.push(Box::new(o.to_string()));
988                param_idx += 1;
989            }
990
991            // Parent filter via dependencies table
992            if let Some(p) = parent_id {
993                match p {
994                    Some(pid) => {
995                        sql.push_str(&format!(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ?{} AND dep_type = 'contains')", param_idx));
996                        params_vec.push(Box::new(pid.to_string()));
997                        param_idx += 1;
998                    }
999                    None => {
1000                        // Root tasks: not contained by any other task
1001                        sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
1002                    }
1003                }
1004            }
1005
1006            // tags_any: Task must have at least one of these tags (OR) - uses task_tags junction
1007            if let Some(ref any_tags) = tags_any
1008                && !any_tags.is_empty() {
1009                    let placeholders: Vec<String> = any_tags
1010                        .iter()
1011                        .enumerate()
1012                        .map(|(i, _)| format!("?{}", param_idx + i))
1013                        .collect();
1014                    sql.push_str(&format!(
1015                        " AND EXISTS (SELECT 1 FROM task_tags WHERE task_id = t.id AND tag IN ({}))",
1016                        placeholders.join(", ")
1017                    ));
1018                    for tag in any_tags {
1019                        params_vec.push(Box::new(tag.clone()));
1020                    }
1021                    param_idx += any_tags.len();
1022                }
1023
1024            // tags_all: Task must have all of these tags (AND) - uses task_tags junction
1025            if let Some(ref all_tags) = tags_all
1026                && !all_tags.is_empty() {
1027                    let placeholders: Vec<String> = all_tags
1028                        .iter()
1029                        .enumerate()
1030                        .map(|(i, _)| format!("?{}", param_idx + i))
1031                        .collect();
1032                    // Count matching tags must equal total required tags
1033                    sql.push_str(&format!(
1034                        " AND (SELECT COUNT(*) FROM task_tags WHERE task_id = t.id AND tag IN ({})) = {}",
1035                        placeholders.join(", "),
1036                        all_tags.len()
1037                    ));
1038                    for tag in all_tags {
1039                        params_vec.push(Box::new(tag.clone()));
1040                    }
1041                    param_idx += all_tags.len();
1042                }
1043
1044            // qualified_for: Agent's tags must satisfy task's requirements - uses junction tables
1045            if let Some(ref agent_tags) = qualified_for_agent_tags {
1046                // Agent must have ALL of task's agent_tags_all
1047                if agent_tags.is_empty() {
1048                    // Agent has no tags - only match tasks with no needed_tags
1049                    sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)");
1050                    // And no wanted_tags
1051                    sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)");
1052                } else {
1053                    // For needed_tags (AND): task must have no needed_tags OR agent has all
1054                    let needed_placeholders: Vec<String> = agent_tags
1055                        .iter()
1056                        .enumerate()
1057                        .map(|(i, _)| format!("?{}", param_idx + i))
1058                        .collect();
1059                    sql.push_str(&format!(
1060                        " AND (
1061                            NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
1062                            OR (
1063                                SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
1064                            ) = (
1065                                SELECT COUNT(*) FROM task_needed_tags 
1066                                WHERE task_id = t.id AND tag IN ({})
1067                            )
1068                        )",
1069                        needed_placeholders.join(", ")
1070                    ));
1071                    for tag in agent_tags {
1072                        params_vec.push(Box::new(tag.clone()));
1073                    }
1074                    param_idx += agent_tags.len();
1075
1076                    // For wanted_tags (OR): task must have no wanted_tags OR agent has at least one
1077                    let wanted_placeholders: Vec<String> = agent_tags
1078                        .iter()
1079                        .enumerate()
1080                        .map(|(i, _)| format!("?{}", param_idx + i))
1081                        .collect();
1082                    sql.push_str(&format!(
1083                        " AND (
1084                            NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
1085                            OR EXISTS (
1086                                SELECT 1 FROM task_wanted_tags 
1087                                WHERE task_id = t.id AND tag IN ({})
1088                            )
1089                        )",
1090                        wanted_placeholders.join(", ")
1091                    ));
1092                    for tag in agent_tags {
1093                        params_vec.push(Box::new(tag.clone()));
1094                    }
1095                    // param_idx += agent_tags.len(); // not needed, last param set
1096                }
1097            }
1098
1099            // Build ORDER BY clause
1100            let order_clause = build_order_clause(sort_by, sort_order);
1101            sql.push_str(&format!(" ORDER BY {}", order_clause));
1102
1103            // Apply limit in SQL
1104            if let Some(l) = limit {
1105                sql.push_str(&format!(" LIMIT {}", l));
1106            }
1107
1108            if offset > 0 {
1109                sql.push_str(&format!(" OFFSET {}", offset));
1110            }
1111
1112            let params_refs: Vec<&dyn rusqlite::ToSql> =
1113                params_vec.iter().map(|b| b.as_ref()).collect();
1114
1115            let mut stmt = conn.prepare(&sql)?;
1116            let tasks: Vec<Task> = stmt
1117                .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
1118                .filter_map(|r| r.ok())
1119                .collect();
1120
1121            Ok(tasks)
1122        })
1123    }
1124
1125    /// Get agent tags by agent ID.
1126    pub fn get_agent_tags(&self, agent_id: &str) -> Result<Vec<String>> {
1127        self.with_conn(|conn| {
1128            let result: Result<String, rusqlite::Error> = conn.query_row(
1129                "SELECT tags FROM workers WHERE id = ?1",
1130                params![agent_id],
1131                |row| row.get(0),
1132            );
1133
1134            match result {
1135                Ok(tags_json) => {
1136                    let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
1137                    Ok(tags)
1138                }
1139                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(vec![]),
1140                Err(e) => Err(e.into()),
1141            }
1142        })
1143    }
1144
1145    /// Internal helper: add dependency within a transaction (used by tasks.rs).
1146    pub(super) fn add_dependency_internal(
1147        conn: &Connection,
1148        from_task_id: &str,
1149        to_task_id: &str,
1150        dep_type: &str,
1151    ) -> Result<()> {
1152        conn.execute(
1153            "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1154            params![from_task_id, to_task_id, dep_type],
1155        )?;
1156        Ok(())
1157    }
1158
1159    /// Atomically relink dependencies: unlink all prev_from→prev_to, then link all from→to.
1160    /// This is a transaction-safe operation for moving children between parents.
1161    /// Returns a result with unlinked and linked pairs.
1162    pub fn relink(
1163        &self,
1164        prev_from_ids: &[String],
1165        prev_to_ids: &[String],
1166        from_ids: &[String],
1167        to_ids: &[String],
1168        dep_type: &str,
1169        deps_config: &DependenciesConfig,
1170    ) -> Result<RelinkResult> {
1171        // Validate dependency type upfront
1172        if !deps_config.is_valid_dep_type(dep_type) {
1173            return Err(anyhow!(
1174                "Invalid dependency type '{}'. Valid types: {:?}",
1175                dep_type,
1176                deps_config.dep_type_names()
1177            ));
1178        }
1179
1180        let def = deps_config
1181            .get_definition(dep_type)
1182            .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
1183        let is_vertical = def.display == DependencyDisplay::Vertical;
1184
1185        self.with_conn_mut(|conn| {
1186            let tx = conn.transaction()?;
1187
1188            let mut unlinked = Vec::new();
1189            let mut linked = Vec::new();
1190            let mut errors = Vec::new();
1191
1192            // Phase 1: Unlink all prev_from × prev_to
1193            for prev_from in prev_from_ids {
1194                for prev_to in prev_to_ids {
1195                    let rows = tx.execute(
1196                        "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
1197                        params![prev_from, prev_to, dep_type],
1198                    )?;
1199                    if rows > 0 {
1200                        unlinked.push((prev_from.clone(), prev_to.clone()));
1201                    }
1202                }
1203            }
1204
1205            // Phase 2: Link all from × to (with validation)
1206            for from_id in from_ids {
1207                for to_id in to_ids {
1208                    // For vertical deps, check single-parent constraint
1209                    if is_vertical {
1210                        let existing_parent: Option<String> = tx.query_row(
1211                            "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1212                            params![to_id],
1213                            |row| row.get(0),
1214                        ).optional()?;
1215
1216                        if let Some(ref parent) = existing_parent
1217                            && parent != from_id {
1218                                errors.push(format!(
1219                                    "Task {} already has parent {}",
1220                                    to_id, parent
1221                                ));
1222                                continue;
1223                            }
1224                    }
1225
1226                    // Check for cycles using temporary view within transaction
1227                    if would_create_cycle_in_tx(&tx, from_id, to_id, dep_type, deps_config)? {
1228                        errors.push(format!(
1229                            "Adding dependency {}→{} would create a cycle",
1230                            from_id, to_id
1231                        ));
1232                        continue;
1233                    }
1234
1235                    tx.execute(
1236                        "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1237                        params![from_id, to_id, dep_type],
1238                    )?;
1239                    linked.push((from_id.clone(), to_id.clone()));
1240                }
1241            }
1242
1243            if !errors.is_empty() {
1244                // Rollback on validation errors
1245                tx.rollback()?;
1246                return Err(anyhow!("Relink failed: {}", errors.join("; ")));
1247            }
1248
1249            tx.commit()?;
1250            Ok(RelinkResult { unlinked, linked })
1251        })
1252    }
1253
1254    // ============================================================================
1255    // Graph Traversal Methods for scan tool
1256    // ============================================================================
1257
1258    /// Get predecessors (tasks that block this task) via blocks/follows dependencies.
1259    /// depth: 0 = none, N = N levels, -1 = all
1260    pub fn get_predecessors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1261        if depth == 0 {
1262            return Ok(vec![]);
1263        }
1264
1265        self.with_conn(|conn| {
1266            let mut visited: HashSet<String> = HashSet::new();
1267            let mut result: Vec<Task> = Vec::new();
1268            let mut current_level: Vec<String> = vec![task_id.to_string()];
1269            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1270
1271            while !current_level.is_empty() && levels_remaining > 0 {
1272                let mut next_level: Vec<String> = Vec::new();
1273
1274                for tid in &current_level {
1275                    // Get tasks that block this one (from_task_id blocks to_task_id)
1276                    let mut stmt = conn.prepare(
1277                        "SELECT DISTINCT d.from_task_id FROM dependencies d
1278                         WHERE d.to_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')",
1279                    )?;
1280
1281                    let predecessors: Vec<String> = stmt
1282                        .query_map(params![tid], |row| row.get(0))?
1283                        .filter_map(|r| r.ok())
1284                        .collect();
1285
1286                    for pred_id in predecessors {
1287                        if !visited.contains(&pred_id) {
1288                            visited.insert(pred_id.clone());
1289                            if let Some(task) = get_task_by_id_internal(conn, &pred_id)? {
1290                                result.push(task);
1291                            }
1292                            next_level.push(pred_id);
1293                        }
1294                    }
1295                }
1296
1297                current_level = next_level;
1298                levels_remaining -= 1;
1299            }
1300
1301            Ok(result)
1302        })
1303    }
1304
1305    /// Get successors (tasks that this task blocks) via blocks/follows dependencies.
1306    /// depth: 0 = none, N = N levels, -1 = all
1307    pub fn get_successors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1308        if depth == 0 {
1309            return Ok(vec![]);
1310        }
1311
1312        self.with_conn(|conn| {
1313            let mut visited: HashSet<String> = HashSet::new();
1314            let mut result: Vec<Task> = Vec::new();
1315            let mut current_level: Vec<String> = vec![task_id.to_string()];
1316            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1317
1318            while !current_level.is_empty() && levels_remaining > 0 {
1319                let mut next_level: Vec<String> = Vec::new();
1320
1321                for tid in &current_level {
1322                    // Get tasks that this one blocks (from_task_id blocks to_task_id)
1323                    let mut stmt = conn.prepare(
1324                        "SELECT DISTINCT d.to_task_id FROM dependencies d
1325                         WHERE d.from_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')",
1326                    )?;
1327
1328                    let successors: Vec<String> = stmt
1329                        .query_map(params![tid], |row| row.get(0))?
1330                        .filter_map(|r| r.ok())
1331                        .collect();
1332
1333                    for succ_id in successors {
1334                        if !visited.contains(&succ_id) {
1335                            visited.insert(succ_id.clone());
1336                            if let Some(task) = get_task_by_id_internal(conn, &succ_id)? {
1337                                result.push(task);
1338                            }
1339                            next_level.push(succ_id);
1340                        }
1341                    }
1342                }
1343
1344                current_level = next_level;
1345                levels_remaining -= 1;
1346            }
1347
1348            Ok(result)
1349        })
1350    }
1351
1352    /// Get ancestors (parent chain) via contains dependency.
1353    /// depth: 0 = none, N = N levels up, -1 = all
1354    pub fn get_ancestors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1355        if depth == 0 {
1356            return Ok(vec![]);
1357        }
1358
1359        self.with_conn(|conn| {
1360            let mut result: Vec<Task> = Vec::new();
1361            let mut current_id = task_id.to_string();
1362            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1363
1364            while levels_remaining > 0 {
1365                // Get parent (from_task_id contains to_task_id)
1366                let parent_result: Result<String, rusqlite::Error> = conn.query_row(
1367                    "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1368                    params![&current_id],
1369                    |row| row.get(0),
1370                );
1371
1372                match parent_result {
1373                    Ok(parent_id) => {
1374                        if let Some(task) = get_task_by_id_internal(conn, &parent_id)? {
1375                            result.push(task);
1376                        }
1377                        current_id = parent_id;
1378                        levels_remaining -= 1;
1379                    }
1380                    Err(rusqlite::Error::QueryReturnedNoRows) => break,
1381                    Err(e) => return Err(e.into()),
1382                }
1383            }
1384
1385            Ok(result)
1386        })
1387    }
1388
1389    /// Get descendants (children tree) via contains dependency.
1390    /// depth: 0 = none, N = N levels down, -1 = all
1391    pub fn get_descendants(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1392        if depth == 0 {
1393            return Ok(vec![]);
1394        }
1395
1396        self.with_conn(|conn| {
1397            let mut visited: HashSet<String> = HashSet::new();
1398            let mut result: Vec<Task> = Vec::new();
1399            let mut current_level: Vec<String> = vec![task_id.to_string()];
1400            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1401
1402            while !current_level.is_empty() && levels_remaining > 0 {
1403                let mut next_level: Vec<String> = Vec::new();
1404
1405                for tid in &current_level {
1406                    // Get children (from_task_id contains to_task_id)
1407                    let mut stmt = conn.prepare(
1408                        "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
1409                    )?;
1410
1411                    let children: Vec<String> = stmt
1412                        .query_map(params![tid], |row| row.get(0))?
1413                        .filter_map(|r| r.ok())
1414                        .collect();
1415
1416                    for child_id in children {
1417                        if !visited.contains(&child_id) {
1418                            visited.insert(child_id.clone());
1419                            if let Some(task) = get_task_by_id_internal(conn, &child_id)? {
1420                                result.push(task);
1421                            }
1422                            next_level.push(child_id);
1423                        }
1424                    }
1425                }
1426
1427                current_level = next_level;
1428                levels_remaining -= 1;
1429            }
1430
1431            Ok(result)
1432        })
1433    }
1434}
1435
1436/// Helper to get a task by ID within a connection context.
1437fn get_task_by_id_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
1438    let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1439    let task = stmt
1440        .query_row(params![task_id], super::tasks::parse_task_row)
1441        .optional()?;
1442    Ok(task)
1443}
1444
1445/// Get the IDs of tasks that block a given task from starting (unsatisfied dependencies).
1446/// A task blocks starting if it has a start-blocking dependency type and is in a blocking state.
1447/// This is a transaction-safe version for use within existing transactions.
1448pub(crate) fn get_unsatisfied_start_blockers_in_tx(
1449    conn: &Connection,
1450    task_id: &str,
1451    states_config: &StatesConfig,
1452    deps_config: &DependenciesConfig,
1453) -> Result<Vec<String>> {
1454    let start_blocking_types = deps_config.start_blocking_types();
1455    if start_blocking_types.is_empty() {
1456        return Ok(vec![]);
1457    }
1458
1459    // Build IN clause from blocking_states
1460    let state_placeholders: Vec<String> = states_config
1461        .blocking_states
1462        .iter()
1463        .enumerate()
1464        .map(|(i, _)| format!("?{}", i + 2))
1465        .collect();
1466    let state_clause = state_placeholders.join(", ");
1467
1468    // Build IN clause from types
1469    let type_start = states_config.blocking_states.len() + 2;
1470    let type_placeholders: Vec<String> = start_blocking_types
1471        .iter()
1472        .enumerate()
1473        .map(|(i, _)| format!("?{}", type_start + i))
1474        .collect();
1475    let type_clause = type_placeholders.join(", ");
1476
1477    let sql = format!(
1478        "SELECT blocker.id FROM dependencies d
1479         INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1480         WHERE d.to_task_id = ?1 
1481         AND d.dep_type IN ({})
1482         AND blocker.status IN ({})",
1483        type_clause, state_clause
1484    );
1485
1486    let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1487    params_vec.push(Box::new(task_id.to_string()));
1488    for state in &states_config.blocking_states {
1489        params_vec.push(Box::new(state.clone()));
1490    }
1491    for t in &start_blocking_types {
1492        params_vec.push(Box::new(t.to_string()));
1493    }
1494    let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
1495
1496    let mut stmt = conn.prepare(&sql)?;
1497    let blockers = stmt
1498        .query_map(params_refs.as_slice(), |row| {
1499            let id: String = row.get(0)?;
1500            Ok(id)
1501        })?
1502        .filter_map(|r| r.ok())
1503        .collect();
1504
1505    Ok(blockers)
1506}
1507
1508/// Propagate unblock effects when a task transitions out of a blocking state.
1509/// This is called after a task completes to find newly unblocked tasks and
1510/// optionally auto-advance them.
1511///
1512/// Returns (unblocked, auto_advanced):
1513/// - unblocked: All task IDs that are now ready (all dependencies satisfied)
1514/// - auto_advanced: Subset of unblocked that were actually transitioned (when enabled)
1515///
1516/// Algorithm:
1517/// 1. Find all tasks that have a start-blocking dependency on the completed task
1518/// 2. For each candidate:
1519///    - Skip if not in initial state or already claimed
1520///    - Check if ALL other start-blockers are also satisfied
1521///    - If fully unblocked → add to unblocked list
1522///    - If auto_advance enabled → also transition to target_state
1523/// 3. Return both lists
1524pub(crate) fn propagate_unblock_effects(
1525    conn: &Connection,
1526    completed_task_id: &str,
1527    agent_id: Option<&str>,
1528    states_config: &StatesConfig,
1529    deps_config: &DependenciesConfig,
1530    auto_advance: &AutoAdvanceConfig,
1531) -> Result<(Vec<String>, Vec<String>)> {
1532    // Get start-blocking dependency types
1533    let start_blocking_types = deps_config.start_blocking_types();
1534    if start_blocking_types.is_empty() {
1535        return Ok((vec![], vec![]));
1536    }
1537
1538    // Find all tasks that depend on the completed task via start-blocking dependencies
1539    let type_placeholders: Vec<String> = start_blocking_types
1540        .iter()
1541        .enumerate()
1542        .map(|(i, _)| format!("?{}", i + 2))
1543        .collect();
1544    let type_clause = type_placeholders.join(", ");
1545
1546    let sql = format!(
1547        "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
1548        type_clause
1549    );
1550
1551    let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1552    params_vec.push(Box::new(completed_task_id.to_string()));
1553    for t in &start_blocking_types {
1554        params_vec.push(Box::new(t.to_string()));
1555    }
1556    let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
1557
1558    let mut stmt = conn.prepare(&sql)?;
1559    let dependent_task_ids: Vec<String> = stmt
1560        .query_map(params_refs.as_slice(), |row| row.get(0))?
1561        .filter_map(|r| r.ok())
1562        .collect();
1563
1564    let mut unblocked = Vec::new();
1565    let mut auto_advanced = Vec::new();
1566    let now = super::now_ms();
1567
1568    // Determine if we should auto-advance
1569    let should_auto_advance = auto_advance.enabled && auto_advance.target_state.is_some();
1570    let target_state = auto_advance.target_state.clone();
1571
1572    // Validate target state if auto-advance is enabled
1573    if should_auto_advance {
1574        let ts = target_state.as_ref().unwrap();
1575        if !states_config.is_valid_state(ts) {
1576            return Err(anyhow!(
1577                "Auto-advance target state '{}' is not a valid state",
1578                ts
1579            ));
1580        }
1581    }
1582
1583    for task_id in dependent_task_ids {
1584        // Get the task
1585        let task = match get_task_by_id_internal(conn, &task_id)? {
1586            Some(t) => t,
1587            None => continue,
1588        };
1589
1590        // Skip if not in initial state
1591        if task.status != states_config.initial {
1592            continue;
1593        }
1594
1595        // Skip if task is already claimed
1596        if task.worker_id.is_some() {
1597            continue;
1598        }
1599
1600        // Check if ALL start-blockers are now satisfied (not in blocking states)
1601        // Build query to count remaining blockers that are still blocking
1602        let state_placeholders: Vec<String> = states_config
1603            .blocking_states
1604            .iter()
1605            .enumerate()
1606            .map(|(i, _)| format!("?{}", i + 3))
1607            .collect();
1608        let state_clause = state_placeholders.join(", ");
1609
1610        // Reuse type_placeholders from above
1611        let type_start = states_config.blocking_states.len() + 3;
1612        let type_placeholders2: Vec<String> = start_blocking_types
1613            .iter()
1614            .enumerate()
1615            .map(|(i, _)| format!("?{}", type_start + i))
1616            .collect();
1617        let type_clause2 = type_placeholders2.join(", ");
1618
1619        let blocker_sql = format!(
1620            "SELECT COUNT(*) FROM dependencies d
1621             INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1622             WHERE d.to_task_id = ?1
1623             AND d.from_task_id != ?2
1624             AND d.dep_type IN ({})
1625             AND blocker.status IN ({})",
1626            type_clause2, state_clause
1627        );
1628
1629        let mut blocker_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1630        blocker_params.push(Box::new(task_id.clone()));
1631        blocker_params.push(Box::new(completed_task_id.to_string()));
1632        for state in &states_config.blocking_states {
1633            blocker_params.push(Box::new(state.clone()));
1634        }
1635        for t in &start_blocking_types {
1636            blocker_params.push(Box::new(t.to_string()));
1637        }
1638        let blocker_refs: Vec<&dyn rusqlite::ToSql> =
1639            blocker_params.iter().map(|b| b.as_ref()).collect();
1640
1641        let remaining_blockers: i32 =
1642            conn.query_row(&blocker_sql, blocker_refs.as_slice(), |row| row.get(0))?;
1643
1644        if remaining_blockers > 0 {
1645            continue; // Still blocked by other tasks
1646        }
1647
1648        // Task is now fully unblocked - add to unblocked list
1649        unblocked.push(task_id.clone());
1650
1651        // Auto-advance if enabled and transition is valid
1652        if should_auto_advance {
1653            let ts = target_state.as_ref().unwrap();
1654
1655            // Validate transition from initial to target_state
1656            if !states_config.is_valid_transition(&states_config.initial, ts) {
1657                // Skip auto-advance for this task - transition not allowed
1658                continue;
1659            }
1660
1661            // Auto-advance: update the task's status
1662            conn.execute(
1663                "UPDATE tasks SET status = ?1, updated_at = ?2 WHERE id = ?3",
1664                params![ts, now, &task_id],
1665            )?;
1666
1667            // Record state transition
1668            let reason = format!("auto-advanced: blocker '{}' completed", completed_task_id);
1669            super::state_transitions::record_state_transition(
1670                conn,
1671                &task_id,
1672                ts,
1673                agent_id,
1674                Some(&reason),
1675                states_config,
1676            )?;
1677
1678            auto_advanced.push(task_id);
1679        }
1680    }
1681
1682    Ok((unblocked, auto_advanced))
1683}