Skip to main content

task_graph_mcp/db/
deps.rs

1//! Dependency operations and cycle detection with typed dependencies.
2
3use super::Database;
4use crate::config::{AutoAdvanceConfig, DependenciesConfig, DependencyDisplay, StatesConfig};
5use crate::types::{Dependency, Task};
6use anyhow::{Result, anyhow};
7use rusqlite::{Connection, OptionalExtension, params};
8use std::collections::{HashSet, VecDeque};
9
/// Outcome of a relink operation: which edges were dropped and which were added.
///
/// Both lists hold `(from, to)` pairs. The derived `Default` is the
/// "nothing changed" result with both lists empty.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RelinkResult {
    /// Dependencies that were unlinked, as `(from, to)` pairs.
    pub unlinked: Vec<(String, String)>,
    /// Dependencies that were linked, as `(from, to)` pairs.
    pub linked: Vec<(String, String)>,
}
18
19/// Check if adding a dependency would create a cycle (transaction-safe version).
20fn would_create_cycle_in_tx(
21    tx: &rusqlite::Transaction,
22    from_task_id: &str,
23    to_task_id: &str,
24    dep_type: &str,
25    deps_config: &DependenciesConfig,
26) -> Result<bool> {
27    let def = deps_config
28        .get_definition(dep_type)
29        .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
30
31    // A cycle would occur if to_task can already reach from_task
32    // through the same "graph" (horizontal or vertical)
33    let mut visited: HashSet<String> = HashSet::new();
34    let mut queue: VecDeque<String> = VecDeque::new();
35    queue.push_back(to_task_id.to_string());
36
37    while let Some(current) = queue.pop_front() {
38        if current == from_task_id {
39            return Ok(true); // Would create a cycle
40        }
41
42        if visited.contains(&current) {
43            continue;
44        }
45        visited.insert(current.clone());
46
47        // Get all tasks that current points to (in the relevant graph)
48        let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
49            // For vertical deps, only check containment relationships
50            let mut stmt = tx.prepare(
51                "SELECT to_task_id FROM dependencies d
52                 JOIN (SELECT value FROM json_each(?1)) types
53                 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
54            )?;
55            let vertical_types: Vec<&str> = deps_config.vertical_types();
56            let types_json = serde_json::to_string(&vertical_types)?;
57            stmt.query_map(params![&types_json, &current], |row| row.get(0))?
58                .filter_map(|r| r.ok())
59                .collect()
60        } else {
61            // For horizontal deps, check all start-blocking relationships
62            let mut stmt = tx.prepare(
63                "SELECT to_task_id FROM dependencies d
64                 JOIN (SELECT value FROM json_each(?1)) types
65                 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
66            )?;
67            let start_blocking: Vec<&str> = deps_config.start_blocking_types();
68            let types_json = serde_json::to_string(&start_blocking)?;
69            stmt.query_map(params![&types_json, &current], |row| row.get(0))?
70                .filter_map(|r| r.ok())
71                .collect()
72        };
73
74        for dep in deps {
75            if !visited.contains(&dep) {
76                queue.push_back(dep);
77            }
78        }
79    }
80
81    Ok(false)
82}
83
/// Build an ORDER BY expression from `sort_by` and `sort_order`.
///
/// The result is assembled only from fixed fragments chosen by matching the
/// inputs, so caller-supplied text never reaches the SQL string.
///
/// * `sort_by`: `"priority"` sorts on `CAST(t.priority AS INTEGER)`,
///   `"updated_at"` on `t.updated_at`; anything else (including `None` and
///   `"created_at"`) sorts on `t.created_at`.
/// * `sort_order`: `"asc"` in any letter case, with surrounding whitespace
///   ignored, sorts ascending; anything else (including `None`) sorts
///   descending.
///
/// The returned expression refers to the table alias `t`.
fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
    let field = match sort_by {
        Some("priority") => "CAST(t.priority AS INTEGER)",
        Some("updated_at") => "t.updated_at",
        // "created_at", a missing key and any unrecognised key share the default.
        _ => "t.created_at",
    };

    // Descending is the default; only an explicit "asc" flips it.
    let ascending = sort_order.is_some_and(|order| order.trim().eq_ignore_ascii_case("asc"));
    let direction = if ascending { "ASC" } else { "DESC" };

    format!("{field} {direction}")
}
105
/// Result of attempting to add a dependency.
///
/// The enum carries no data, so it is `Copy` and can be compared directly
/// (for example with `assert_eq!` or `==`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AddDependencyResult {
    /// Dependency was created successfully.
    Created,
    /// Dependency already existed (no change).
    AlreadyExists,
    /// Source task does not exist.
    FromTaskNotFound,
    /// Target task does not exist.
    ToTaskNotFound,
}
118
119impl Database {
120    /// Check if a task exists by ID.
121    pub fn task_exists(&self, task_id: &str) -> Result<bool> {
122        self.with_conn(|conn| {
123            let count: i64 = conn.query_row(
124                "SELECT COUNT(*) FROM tasks WHERE id = ?1",
125                params![task_id],
126                |row| row.get(0),
127            )?;
128            Ok(count > 0)
129        })
130    }
131
132    /// Add a typed dependency (from blocks/contains to).
133    /// Returns Ok(()) on success. For soft linking with warnings, use add_dependency_soft.
134    pub fn add_dependency(
135        &self,
136        from_task_id: &str,
137        to_task_id: &str,
138        dep_type: &str,
139        deps_config: &DependenciesConfig,
140    ) -> Result<()> {
141        match self.add_dependency_soft(from_task_id, to_task_id, dep_type, deps_config)? {
142            AddDependencyResult::Created | AddDependencyResult::AlreadyExists => Ok(()),
143            AddDependencyResult::FromTaskNotFound => {
144                Err(anyhow!("Source task '{}' not found", from_task_id))
145            }
146            AddDependencyResult::ToTaskNotFound => {
147                Err(anyhow!("Target task '{}' not found", to_task_id))
148            }
149        }
150    }
151
152    /// Add a typed dependency with soft failure (returns result status instead of error).
153    /// Use this when you want to handle missing tasks as warnings.
154    pub fn add_dependency_soft(
155        &self,
156        from_task_id: &str,
157        to_task_id: &str,
158        dep_type: &str,
159        deps_config: &DependenciesConfig,
160    ) -> Result<AddDependencyResult> {
161        // Validate dependency type
162        if !deps_config.is_valid_dep_type(dep_type) {
163            return Err(anyhow!(
164                "Invalid dependency type '{}'. Valid types: {:?}",
165                dep_type,
166                deps_config.dep_type_names()
167            ));
168        }
169
170        // Check if tasks exist first
171        if !self.task_exists(from_task_id)? {
172            return Ok(AddDependencyResult::FromTaskNotFound);
173        }
174        if !self.task_exists(to_task_id)? {
175            return Ok(AddDependencyResult::ToTaskNotFound);
176        }
177
178        // For vertical (contains) dependencies, check single-parent constraint
179        let def = deps_config
180            .get_definition(dep_type)
181            .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
182        if def.display == DependencyDisplay::Vertical
183            && let Some(existing_parent) = self.get_parent(to_task_id)?
184            && existing_parent != from_task_id
185        {
186            return Err(anyhow!(
187                "Task {} already has parent {}",
188                to_task_id,
189                existing_parent
190            ));
191        }
192
193        // Check for cycle in the appropriate graph
194        if self.would_create_cycle(from_task_id, to_task_id, dep_type, deps_config)? {
195            return Err(anyhow!("Adding this dependency would create a cycle"));
196        }
197
198        self.with_conn(|conn| {
199            let changes = conn.execute(
200                "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
201                params![from_task_id, to_task_id, dep_type],
202            )?;
203            if changes == 0 {
204                Ok(AddDependencyResult::AlreadyExists)
205            } else {
206                Ok(AddDependencyResult::Created)
207            }
208        })
209    }
210
211    /// Check if adding a dependency would create a cycle.
212    /// For horizontal deps: check cycle in the start-blocking graph.
213    /// For vertical deps: check containment cycle.
214    pub fn would_create_cycle(
215        &self,
216        from_task_id: &str,
217        to_task_id: &str,
218        dep_type: &str,
219        deps_config: &DependenciesConfig,
220    ) -> Result<bool> {
221        let def = deps_config
222            .get_definition(dep_type)
223            .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
224
225        self.with_conn(|conn| {
226            // A cycle would occur if to_task can already reach from_task
227            // through the same "graph" (horizontal or vertical)
228            let mut visited: HashSet<String> = HashSet::new();
229            let mut queue: VecDeque<String> = VecDeque::new();
230            queue.push_back(to_task_id.to_string());
231
232            while let Some(current) = queue.pop_front() {
233                if current == from_task_id {
234                    return Ok(true); // Would create a cycle
235                }
236
237                if visited.contains(&current) {
238                    continue;
239                }
240                visited.insert(current.clone());
241
242                // Get all tasks that current points to (in the relevant graph)
243                let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
244                    // For vertical deps, only check containment relationships
245                    let mut stmt = conn.prepare(
246                        "SELECT to_task_id FROM dependencies d
247                         JOIN (SELECT value FROM json_each(?1)) types
248                         WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
249                    )?;
250                    let vertical_types: Vec<&str> = deps_config.vertical_types();
251                    let types_json = serde_json::to_string(&vertical_types)?;
252                    stmt.query_map(params![&types_json, &current], |row| row.get(0))?
253                        .filter_map(|r| r.ok())
254                        .collect()
255                } else {
256                    // For horizontal deps, check all start-blocking relationships
257                    let mut stmt = conn.prepare(
258                        "SELECT to_task_id FROM dependencies d
259                         JOIN (SELECT value FROM json_each(?1)) types
260                         WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
261                    )?;
262                    let start_blocking: Vec<&str> = deps_config.start_blocking_types();
263                    let types_json = serde_json::to_string(&start_blocking)?;
264                    stmt.query_map(params![&types_json, &current], |row| row.get(0))?
265                        .filter_map(|r| r.ok())
266                        .collect()
267                };
268
269                for dep in deps {
270                    if !visited.contains(&dep) {
271                        queue.push_back(dep);
272                    }
273                }
274            }
275
276            Ok(false)
277        })
278    }
279
280    /// Remove a typed dependency. Returns true if a row was deleted.
281    pub fn remove_dependency(
282        &self,
283        from_task_id: &str,
284        to_task_id: &str,
285        dep_type: &str,
286    ) -> Result<bool> {
287        self.with_conn(|conn| {
288            let rows = conn.execute(
289                "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
290                params![from_task_id, to_task_id, dep_type],
291            )?;
292            Ok(rows > 0)
293        })
294    }
295
296    /// Remove all dependencies of a given type from a task (outgoing edges).
297    /// Returns the list of removed dependencies.
298    pub fn remove_all_outgoing_dependencies(
299        &self,
300        from_task_id: &str,
301        dep_type: &str,
302    ) -> Result<Vec<Dependency>> {
303        self.with_conn_mut(|conn| {
304            let tx = conn.transaction()?;
305
306            // First get the dependencies that will be removed
307            let deps: Vec<Dependency> = {
308                let mut stmt = tx.prepare(
309                    "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
310                )?;
311                stmt
312                    .query_map(params![from_task_id, dep_type], |row| {
313                        Ok(Dependency {
314                            from_task_id: row.get(0)?,
315                            to_task_id: row.get(1)?,
316                            dep_type: row.get(2)?,
317                        })
318                    })?
319                    .filter_map(|r| r.ok())
320                    .collect()
321            };
322
323            // Then delete them
324            tx.execute(
325                "DELETE FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2",
326                params![from_task_id, dep_type],
327            )?;
328
329            tx.commit()?;
330            Ok(deps)
331        })
332    }
333
334    /// Remove all dependencies of a given type to a task (incoming edges).
335    /// Returns the list of removed dependencies.
336    pub fn remove_all_incoming_dependencies(
337        &self,
338        to_task_id: &str,
339        dep_type: &str,
340    ) -> Result<Vec<Dependency>> {
341        self.with_conn_mut(|conn| {
342            let tx = conn.transaction()?;
343
344            // First get the dependencies that will be removed
345            let deps: Vec<Dependency> = {
346                let mut stmt = tx.prepare(
347                    "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
348                )?;
349                stmt
350                    .query_map(params![to_task_id, dep_type], |row| {
351                        Ok(Dependency {
352                            from_task_id: row.get(0)?,
353                            to_task_id: row.get(1)?,
354                            dep_type: row.get(2)?,
355                        })
356                    })?
357                    .filter_map(|r| r.ok())
358                    .collect()
359            };
360
361            // Then delete them
362            tx.execute(
363                "DELETE FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2",
364                params![to_task_id, dep_type],
365            )?;
366
367            tx.commit()?;
368            Ok(deps)
369        })
370    }
371
372    /// Get all dependencies.
373    pub fn get_all_dependencies(&self) -> Result<Vec<Dependency>> {
374        self.with_conn(|conn| {
375            let mut stmt =
376                conn.prepare("SELECT from_task_id, to_task_id, dep_type FROM dependencies")?;
377
378            let deps = stmt
379                .query_map([], |row| {
380                    let from: String = row.get(0)?;
381                    let to: String = row.get(1)?;
382                    let dep_type: String = row.get(2)?;
383                    Ok(Dependency {
384                        from_task_id: from,
385                        to_task_id: to,
386                        dep_type,
387                    })
388                })?
389                .filter_map(|r| r.ok())
390                .collect();
391
392            Ok(deps)
393        })
394    }
395
396    /// Get dependencies of a specific type for a task.
397    pub fn get_dependencies_by_type(
398        &self,
399        task_id: &str,
400        dep_type: &str,
401        direction: &str,
402    ) -> Result<Vec<Dependency>> {
403        self.with_conn(|conn| {
404            let sql = if direction == "incoming" {
405                "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
406            } else {
407                "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
408            };
409
410            let mut stmt = conn.prepare(sql)?;
411
412            let deps = stmt
413                .query_map(params![task_id, dep_type], |row| {
414                    let from: String = row.get(0)?;
415                    let to: String = row.get(1)?;
416                    let dep_type: String = row.get(2)?;
417                    Ok(Dependency {
418                        from_task_id: from,
419                        to_task_id: to,
420                        dep_type,
421                    })
422                })?
423                .filter_map(|r| r.ok())
424                .collect();
425
426            Ok(deps)
427        })
428    }
429
430    /// Get tasks that block a given task from starting (dep_type with blocks: start).
431    pub fn get_start_blockers(
432        &self,
433        task_id: &str,
434        deps_config: &DependenciesConfig,
435    ) -> Result<Vec<String>> {
436        let start_blocking_types = deps_config.start_blocking_types();
437        if start_blocking_types.is_empty() {
438            return Ok(vec![]);
439        }
440
441        self.with_conn(|conn| {
442            let placeholders: String = start_blocking_types
443                .iter()
444                .enumerate()
445                .map(|(i, _)| format!("?{}", i + 2))
446                .collect::<Vec<_>>()
447                .join(", ");
448
449            let sql = format!(
450                "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type IN ({})",
451                placeholders
452            );
453
454            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
455            params_vec.push(Box::new(task_id.to_string()));
456            for t in &start_blocking_types {
457                params_vec.push(Box::new(t.to_string()));
458            }
459            let params_refs: Vec<&dyn rusqlite::ToSql> =
460                params_vec.iter().map(|b| b.as_ref()).collect();
461
462            let mut stmt = conn.prepare(&sql)?;
463            let blockers = stmt
464                .query_map(params_refs.as_slice(), |row| {
465                    let id: String = row.get(0)?;
466                    Ok(id)
467                })?
468                .filter_map(|r| r.ok())
469                .collect();
470
471            Ok(blockers)
472        })
473    }
474
475    /// Get tasks that block a given task from completing (dep_type with blocks: completion).
476    /// For a parent task, this returns children that must complete first.
477    pub fn get_completion_blockers(
478        &self,
479        task_id: &str,
480        deps_config: &DependenciesConfig,
481    ) -> Result<Vec<String>> {
482        let completion_blocking_types = deps_config.completion_blocking_types();
483        if completion_blocking_types.is_empty() {
484            return Ok(vec![]);
485        }
486
487        self.with_conn(|conn| {
488            let placeholders: String = completion_blocking_types
489                .iter()
490                .enumerate()
491                .map(|(i, _)| format!("?{}", i + 2))
492                .collect::<Vec<_>>()
493                .join(", ");
494
495            // For completion blockers, we look at outgoing edges (from_task_id = parent)
496            // because "contains" means parent -> child, and child blocks parent completion
497            let sql = format!(
498                "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
499                placeholders
500            );
501
502            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
503            params_vec.push(Box::new(task_id.to_string()));
504            for t in &completion_blocking_types {
505                params_vec.push(Box::new(t.to_string()));
506            }
507            let params_refs: Vec<&dyn rusqlite::ToSql> =
508                params_vec.iter().map(|b| b.as_ref()).collect();
509
510            let mut stmt = conn.prepare(&sql)?;
511            let blockers = stmt
512                .query_map(params_refs.as_slice(), |row| {
513                    let id: String = row.get(0)?;
514                    Ok(id)
515                })?
516                .filter_map(|r| r.ok())
517                .collect();
518
519            Ok(blockers)
520        })
521    }
522
523    /// Get the parent of a task (via 'contains' dependency).
524    pub fn get_parent(&self, task_id: &str) -> Result<Option<String>> {
525        self.with_conn(|conn| {
526            let result: Result<String, rusqlite::Error> = conn.query_row(
527                "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
528                params![task_id],
529                |row| row.get(0),
530            );
531
532            match result {
533                Ok(parent_id) => Ok(Some(parent_id)),
534                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
535                Err(e) => Err(e.into()),
536            }
537        })
538    }
539
540    /// Get children of a task (via 'contains' dependency).
541    pub fn get_children_ids(&self, task_id: &str) -> Result<Vec<String>> {
542        self.with_conn(|conn| {
543            let mut stmt = conn.prepare(
544                "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
545            )?;
546
547            let children = stmt
548                .query_map(params![task_id], |row| {
549                    let id: String = row.get(0)?;
550                    Ok(id)
551                })?
552                .filter_map(|r| r.ok())
553                .collect();
554
555            Ok(children)
556        })
557    }
558
559    /// Get all tasks that block a given task (backwards compatible).
560    /// Returns tasks from both 'blocks' and 'follows' dependencies.
561    pub fn get_blockers(&self, task_id: &str) -> Result<Vec<String>> {
562        self.with_conn(|conn| {
563            let mut stmt = conn.prepare(
564                "SELECT from_task_id FROM dependencies 
565                 WHERE to_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
566            )?;
567
568            let blockers = stmt
569                .query_map(params![task_id], |row| {
570                    let id: String = row.get(0)?;
571                    Ok(id)
572                })?
573                .filter_map(|r| r.ok())
574                .collect();
575
576            Ok(blockers)
577        })
578    }
579
580    /// Get unsatisfied blockers for a given task.
581    /// Returns only blocker task IDs whose status is still in a blocking state
582    /// (i.e., the dependency is not yet satisfied). Blockers in terminal states
583    /// like completed/cancelled/failed are excluded.
584    pub fn get_unsatisfied_blockers(
585        &self,
586        task_id: &str,
587        states_config: &StatesConfig,
588    ) -> Result<Vec<String>> {
589        self.with_conn(|conn| {
590            // Build IN clause for blocking states
591            let state_placeholders: Vec<String> = states_config
592                .blocking_states
593                .iter()
594                .enumerate()
595                .map(|(i, _)| format!("?{}", i + 2))
596                .collect();
597            let state_clause = state_placeholders.join(", ");
598
599            let sql = format!(
600                "SELECT d.from_task_id FROM dependencies d
601                 INNER JOIN tasks t ON d.from_task_id = t.id
602                 WHERE d.to_task_id = ?1
603                 AND d.dep_type IN ('blocks', 'follows')
604                 AND t.status IN ({})",
605                state_clause
606            );
607
608            let mut stmt = conn.prepare(&sql)?;
609
610            // Build params: task_id + blocking states
611            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
612            params_vec.push(Box::new(task_id.to_string()));
613            for state in &states_config.blocking_states {
614                params_vec.push(Box::new(state.clone()));
615            }
616            let params_refs: Vec<&dyn rusqlite::ToSql> =
617                params_vec.iter().map(|b| b.as_ref()).collect();
618
619            let blockers = stmt
620                .query_map(params_refs.as_slice(), |row| {
621                    let id: String = row.get(0)?;
622                    Ok(id)
623                })?
624                .filter_map(|r| r.ok())
625                .collect();
626
627            Ok(blockers)
628        })
629    }
630
631    /// Get tasks that a given task blocks.
632    #[allow(dead_code)]
633    pub fn get_blocking(&self, task_id: &str) -> Result<Vec<String>> {
634        self.with_conn(|conn| {
635            let mut stmt = conn.prepare(
636                "SELECT to_task_id FROM dependencies 
637                 WHERE from_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
638            )?;
639
640            let blocking = stmt
641                .query_map(params![task_id], |row| {
642                    let id: String = row.get(0)?;
643                    Ok(id)
644                })?
645                .filter_map(|r| r.ok())
646                .collect();
647
648            Ok(blocking)
649        })
650    }
651
652    /// Get tasks that are blocked by incomplete start dependencies.
653    /// A task is blocked if any of its start-blocking dependencies are in a blocking state.
654    /// Excludes soft-deleted tasks.
655    pub fn get_blocked_tasks(
656        &self,
657        states_config: &StatesConfig,
658        deps_config: &DependenciesConfig,
659        sort_by: Option<&str>,
660        sort_order: Option<&str>,
661    ) -> Result<Vec<Task>> {
662        let start_blocking_types = deps_config.start_blocking_types();
663        if start_blocking_types.is_empty() {
664            return Ok(vec![]);
665        }
666
667        self.with_conn(|conn| {
668            // Build IN clause from blocking_states
669            let state_placeholders: Vec<String> = states_config
670                .blocking_states
671                .iter()
672                .enumerate()
673                .map(|(i, _)| format!("?{}", i + 2))
674                .collect();
675            let state_clause = state_placeholders.join(", ");
676
677            // Build IN clause from start_blocking_types
678            let type_start = states_config.blocking_states.len() + 2;
679            let type_placeholders: Vec<String> = start_blocking_types
680                .iter()
681                .enumerate()
682                .map(|(i, _)| format!("?{}", type_start + i))
683                .collect();
684            let type_clause = type_placeholders.join(", ");
685
686            // Build ORDER BY clause
687            let order_clause = build_order_clause(sort_by, sort_order);
688
689            let sql = format!(
690                "SELECT DISTINCT t.*
691                 FROM tasks t
692                 INNER JOIN dependencies d ON t.id = d.to_task_id
693                 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
694                 WHERE d.dep_type IN ({})
695                 AND blocker.status IN ({})
696                 AND t.status = ?1
697                 AND t.deleted_at IS NULL
698                 ORDER BY {}",
699                type_clause, state_clause, order_clause
700            );
701
702            let mut stmt = conn.prepare(&sql)?;
703
704            // Build params: initial state + blocking states + start_blocking_types
705            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
706            params_vec.push(Box::new(states_config.initial.clone()));
707            for state in &states_config.blocking_states {
708                params_vec.push(Box::new(state.clone()));
709            }
710            for t in &start_blocking_types {
711                params_vec.push(Box::new(t.to_string()));
712            }
713            let params_refs: Vec<&dyn rusqlite::ToSql> =
714                params_vec.iter().map(|b| b.as_ref()).collect();
715
716            let tasks = stmt
717                .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
718                .filter_map(|r| r.ok())
719                .collect();
720
721            Ok(tasks)
722        })
723    }
724
725    /// Get tasks that are ready to be claimed (all start dependencies satisfied).
726    /// A task is ready if it's in the initial state, unclaimed, and all start-blocking deps are not blocking.
727    /// When agent_id is provided, also filters by agent's tag qualifications using junction tables.
728    /// Excludes soft-deleted tasks.
729    pub fn get_ready_tasks(
730        &self,
731        agent_id: Option<&str>,
732        states_config: &StatesConfig,
733        deps_config: &DependenciesConfig,
734        sort_by: Option<&str>,
735        sort_order: Option<&str>,
736    ) -> Result<Vec<Task>> {
737        let start_blocking_types = deps_config.start_blocking_types();
738
739        // Get agent tags if agent_id is provided (for junction table filtering)
740        let agent_tags: Option<Vec<String>> = if let Some(aid) = agent_id {
741            Some(self.get_agent_tags(aid)?)
742        } else {
743            None
744        };
745
746        self.with_conn(|conn| {
747            // Build IN clause from blocking_states
748            let state_placeholders: Vec<String> = states_config
749                .blocking_states
750                .iter()
751                .enumerate()
752                .map(|(i, _)| format!("?{}", i + 2))
753                .collect();
754            let state_clause = state_placeholders.join(", ");
755
756            // Build IN clause from start_blocking_types
757            let type_start = states_config.blocking_states.len() + 2;
758            let type_placeholders: Vec<String> = start_blocking_types
759                .iter()
760                .enumerate()
761                .map(|(i, _)| format!("?{}", type_start + i))
762                .collect();
763            let type_clause = type_placeholders.join(", ");
764
765            // Build ORDER BY clause - for ready tasks, default is priority then created_at
766            let order_clause = if sort_by.is_some() {
767                build_order_clause(sort_by, sort_order)
768            } else {
769                // Default for ready: priority (high first), then created_at
770                "CAST(t.priority AS INTEGER) DESC, t.created_at DESC".to_string()
771            };
772
773            // Track param index for agent tag filters
774            let mut param_idx = type_start + start_blocking_types.len();
775
776            // Build agent qualification filters using junction tables
777            let (agent_needed_clause, agent_wanted_clause) = if let Some(ref tags) = agent_tags {
778                // For agent_tags_all (AND): agent must have ALL needed tags
779                // Count how many of the task's needed_tags match agent's tags
780                // Either the task has no needed_tags, or all must match
781                let needed_placeholders: Vec<String> = tags
782                    .iter()
783                    .enumerate()
784                    .map(|(i, _)| format!("?{}", param_idx + i))
785                    .collect();
786                param_idx += tags.len();
787
788                let needed_clause = if needed_placeholders.is_empty() {
789                    // Agent has no tags - only match tasks with no needed_tags
790                    "AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)"
791                        .to_string()
792                } else {
793                    // Task must have no needed_tags OR agent must have all of them
794                    format!(
795                        "AND (
796                            NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
797                            OR (
798                                SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
799                            ) = (
800                                SELECT COUNT(*) FROM task_needed_tags 
801                                WHERE task_id = t.id AND tag IN ({})
802                            )
803                        )",
804                        needed_placeholders.join(", ")
805                    )
806                };
807
808                // For agent_tags_any (OR): agent must have at least ONE wanted tag
809                let wanted_placeholders: Vec<String> = tags
810                    .iter()
811                    .enumerate()
812                    .map(|(i, _)| format!("?{}", param_idx + i))
813                    .collect();
814
815                let wanted_clause = if wanted_placeholders.is_empty() {
816                    // Agent has no tags - only match tasks with no wanted_tags
817                    "AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)"
818                        .to_string()
819                } else {
820                    // Task must have no wanted_tags OR agent must have at least one
821                    format!(
822                        "AND (
823                            NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
824                            OR EXISTS (
825                                SELECT 1 FROM task_wanted_tags 
826                                WHERE task_id = t.id AND tag IN ({})
827                            )
828                        )",
829                        wanted_placeholders.join(", ")
830                    )
831                };
832
833                (needed_clause, wanted_clause)
834            } else {
835                (String::new(), String::new())
836            };
837
838            let sql = format!(
839                "SELECT t.*
840                 FROM tasks t
841                 WHERE t.status = ?1
842                 AND t.worker_id IS NULL
843                 AND t.deleted_at IS NULL
844                 AND NOT EXISTS (
845                     SELECT 1 FROM dependencies d
846                     INNER JOIN tasks blocker ON d.from_task_id = blocker.id
847                     WHERE d.to_task_id = t.id
848                     AND d.dep_type IN ({})
849                     AND blocker.status IN ({})
850                 )
851                 AND t.id NOT IN (
852                     SELECT from_task_id FROM dependencies
853                     WHERE dep_type = 'contains'
854                 )
855                 {}
856                 {}
857                 ORDER BY {}",
858                type_clause, state_clause, agent_needed_clause, agent_wanted_clause, order_clause
859            );
860
861            let mut stmt = conn.prepare(&sql)?;
862
863            // Build params: initial state + blocking states + types + agent tags (twice if present)
864            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
865            params_vec.push(Box::new(states_config.initial.clone()));
866            for state in &states_config.blocking_states {
867                params_vec.push(Box::new(state.clone()));
868            }
869            for t in &start_blocking_types {
870                params_vec.push(Box::new(t.to_string()));
871            }
872            // Add agent tags twice (once for needed_tags check, once for wanted_tags check)
873            if let Some(ref tags) = agent_tags {
874                for tag in tags {
875                    params_vec.push(Box::new(tag.clone()));
876                }
877                for tag in tags {
878                    params_vec.push(Box::new(tag.clone()));
879                }
880            }
881            let params_refs: Vec<&dyn rusqlite::ToSql> =
882                params_vec.iter().map(|b| b.as_ref()).collect();
883
884            let tasks: Vec<Task> = stmt
885                .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
886                .filter_map(|r| r.ok())
887                .collect();
888
889            Ok(tasks)
890        })
891    }
892
893    /// Check if a task has unmet start dependencies.
894    #[allow(dead_code)]
895    pub fn has_unmet_start_dependencies(
896        &self,
897        task_id: &str,
898        states_config: &StatesConfig,
899        deps_config: &DependenciesConfig,
900    ) -> Result<bool> {
901        let start_blocking_types = deps_config.start_blocking_types();
902        if start_blocking_types.is_empty() {
903            return Ok(false);
904        }
905
906        self.with_conn(|conn| {
907            // Build IN clause from blocking_states
908            let state_placeholders: Vec<String> = states_config
909                .blocking_states
910                .iter()
911                .enumerate()
912                .map(|(i, _)| format!("?{}", i + 2))
913                .collect();
914            let state_clause = state_placeholders.join(", ");
915
916            // Build IN clause from types
917            let type_start = states_config.blocking_states.len() + 2;
918            let type_placeholders: Vec<String> = start_blocking_types
919                .iter()
920                .enumerate()
921                .map(|(i, _)| format!("?{}", type_start + i))
922                .collect();
923            let type_clause = type_placeholders.join(", ");
924
925            let sql = format!(
926                "SELECT COUNT(*) FROM dependencies d
927                 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
928                 WHERE d.to_task_id = ?1 
929                 AND d.dep_type IN ({})
930                 AND blocker.status IN ({})",
931                type_clause, state_clause
932            );
933
934            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
935            params_vec.push(Box::new(task_id.to_string()));
936            for state in &states_config.blocking_states {
937                params_vec.push(Box::new(state.clone()));
938            }
939            for t in &start_blocking_types {
940                params_vec.push(Box::new(t.to_string()));
941            }
942            let params_refs: Vec<&dyn rusqlite::ToSql> =
943                params_vec.iter().map(|b| b.as_ref()).collect();
944
945            let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
946
947            Ok(count > 0)
948        })
949    }
950
951    /// Check if a task has incomplete children (blocking completion).
952    pub fn has_incomplete_children(
953        &self,
954        task_id: &str,
955        states_config: &StatesConfig,
956    ) -> Result<bool> {
957        self.with_conn(|conn| {
958            // Build IN clause from blocking_states
959            let state_placeholders: Vec<String> = states_config
960                .blocking_states
961                .iter()
962                .enumerate()
963                .map(|(i, _)| format!("?{}", i + 2))
964                .collect();
965            let state_clause = state_placeholders.join(", ");
966
967            let sql = format!(
968                "SELECT COUNT(*) FROM dependencies d
969                 INNER JOIN tasks child ON d.to_task_id = child.id
970                 WHERE d.from_task_id = ?1 
971                 AND d.dep_type = 'contains'
972                 AND child.status IN ({})",
973                state_clause
974            );
975
976            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
977            params_vec.push(Box::new(task_id.to_string()));
978            for state in &states_config.blocking_states {
979                params_vec.push(Box::new(state.clone()));
980            }
981            let params_refs: Vec<&dyn rusqlite::ToSql> =
982                params_vec.iter().map(|b| b.as_ref()).collect();
983
984            let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
985
986            Ok(count > 0)
987        })
988    }
989
990    /// Get tasks with tag-based filtering using junction tables for indexed lookups.
991    /// - `tags_any`: Task must have at least one of these tags (OR)
992    /// - `tags_all`: Task must have all of these tags (AND)
993    /// - `qualified_for_agent_tags`: If provided, only return tasks where these tags satisfy the task's agent_tags_all/agent_tags_any
994    ///
995    /// Excludes soft-deleted tasks.
996    #[allow(clippy::too_many_arguments)]
997    pub fn list_tasks_with_tag_filters(
998        &self,
999        status: Option<Vec<String>>,
1000        owner: Option<&str>,
1001        parent_id: Option<Option<&str>>,
1002        tags_any: Option<Vec<String>>,
1003        tags_all: Option<Vec<String>>,
1004        qualified_for_agent_tags: Option<Vec<String>>,
1005        limit: Option<i32>,
1006        offset: i32,
1007        sort_by: Option<&str>,
1008        sort_order: Option<&str>,
1009    ) -> Result<Vec<Task>> {
1010        self.with_conn(|conn| {
1011            let mut sql = String::from("SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL");
1012            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1013            let mut param_idx = 1;
1014
1015            // Status filter (can be single or multiple)
1016            if let Some(ref statuses) = status {
1017                if statuses.len() == 1 {
1018                    sql.push_str(&format!(" AND t.status = ?{}", param_idx));
1019                    params_vec.push(Box::new(statuses[0].clone()));
1020                    param_idx += 1;
1021                } else if statuses.len() > 1 {
1022                    let placeholders: Vec<String> = statuses
1023                        .iter()
1024                        .enumerate()
1025                        .map(|(i, _)| format!("?{}", param_idx + i))
1026                        .collect();
1027                    sql.push_str(&format!(" AND t.status IN ({})", placeholders.join(", ")));
1028                    for s in statuses {
1029                        params_vec.push(Box::new(s.clone()));
1030                    }
1031                    param_idx += statuses.len();
1032                }
1033            }
1034
1035            // Owner filter
1036            if let Some(o) = owner {
1037                sql.push_str(&format!(" AND t.worker_id = ?{}", param_idx));
1038                params_vec.push(Box::new(o.to_string()));
1039                param_idx += 1;
1040            }
1041
1042            // Parent filter via dependencies table
1043            if let Some(p) = parent_id {
1044                match p {
1045                    Some(pid) => {
1046                        sql.push_str(&format!(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ?{} AND dep_type = 'contains')", param_idx));
1047                        params_vec.push(Box::new(pid.to_string()));
1048                        param_idx += 1;
1049                    }
1050                    None => {
1051                        // Root tasks: not contained by any other task
1052                        sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
1053                    }
1054                }
1055            }
1056
1057            // tags_any: Task must have at least one of these tags (OR) - uses task_tags junction
1058            if let Some(ref any_tags) = tags_any
1059                && !any_tags.is_empty() {
1060                    let placeholders: Vec<String> = any_tags
1061                        .iter()
1062                        .enumerate()
1063                        .map(|(i, _)| format!("?{}", param_idx + i))
1064                        .collect();
1065                    sql.push_str(&format!(
1066                        " AND EXISTS (SELECT 1 FROM task_tags WHERE task_id = t.id AND tag IN ({}))",
1067                        placeholders.join(", ")
1068                    ));
1069                    for tag in any_tags {
1070                        params_vec.push(Box::new(tag.clone()));
1071                    }
1072                    param_idx += any_tags.len();
1073                }
1074
1075            // tags_all: Task must have all of these tags (AND) - uses task_tags junction
1076            if let Some(ref all_tags) = tags_all
1077                && !all_tags.is_empty() {
1078                    let placeholders: Vec<String> = all_tags
1079                        .iter()
1080                        .enumerate()
1081                        .map(|(i, _)| format!("?{}", param_idx + i))
1082                        .collect();
1083                    // Count matching tags must equal total required tags
1084                    sql.push_str(&format!(
1085                        " AND (SELECT COUNT(*) FROM task_tags WHERE task_id = t.id AND tag IN ({})) = {}",
1086                        placeholders.join(", "),
1087                        all_tags.len()
1088                    ));
1089                    for tag in all_tags {
1090                        params_vec.push(Box::new(tag.clone()));
1091                    }
1092                    param_idx += all_tags.len();
1093                }
1094
1095            // qualified_for: Agent's tags must satisfy task's requirements - uses junction tables
1096            if let Some(ref agent_tags) = qualified_for_agent_tags {
1097                // Agent must have ALL of task's agent_tags_all
1098                if agent_tags.is_empty() {
1099                    // Agent has no tags - only match tasks with no needed_tags
1100                    sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)");
1101                    // And no wanted_tags
1102                    sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)");
1103                } else {
1104                    // For needed_tags (AND): task must have no needed_tags OR agent has all
1105                    let needed_placeholders: Vec<String> = agent_tags
1106                        .iter()
1107                        .enumerate()
1108                        .map(|(i, _)| format!("?{}", param_idx + i))
1109                        .collect();
1110                    sql.push_str(&format!(
1111                        " AND (
1112                            NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
1113                            OR (
1114                                SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
1115                            ) = (
1116                                SELECT COUNT(*) FROM task_needed_tags 
1117                                WHERE task_id = t.id AND tag IN ({})
1118                            )
1119                        )",
1120                        needed_placeholders.join(", ")
1121                    ));
1122                    for tag in agent_tags {
1123                        params_vec.push(Box::new(tag.clone()));
1124                    }
1125                    param_idx += agent_tags.len();
1126
1127                    // For wanted_tags (OR): task must have no wanted_tags OR agent has at least one
1128                    let wanted_placeholders: Vec<String> = agent_tags
1129                        .iter()
1130                        .enumerate()
1131                        .map(|(i, _)| format!("?{}", param_idx + i))
1132                        .collect();
1133                    sql.push_str(&format!(
1134                        " AND (
1135                            NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
1136                            OR EXISTS (
1137                                SELECT 1 FROM task_wanted_tags 
1138                                WHERE task_id = t.id AND tag IN ({})
1139                            )
1140                        )",
1141                        wanted_placeholders.join(", ")
1142                    ));
1143                    for tag in agent_tags {
1144                        params_vec.push(Box::new(tag.clone()));
1145                    }
1146                    // param_idx += agent_tags.len(); // not needed, last param set
1147                }
1148            }
1149
1150            // Build ORDER BY clause
1151            let order_clause = build_order_clause(sort_by, sort_order);
1152            sql.push_str(&format!(" ORDER BY {}", order_clause));
1153
1154            // Apply limit in SQL
1155            if let Some(l) = limit {
1156                sql.push_str(&format!(" LIMIT {}", l));
1157            }
1158
1159            if offset > 0 {
1160                sql.push_str(&format!(" OFFSET {}", offset));
1161            }
1162
1163            let params_refs: Vec<&dyn rusqlite::ToSql> =
1164                params_vec.iter().map(|b| b.as_ref()).collect();
1165
1166            let mut stmt = conn.prepare(&sql)?;
1167            let tasks: Vec<Task> = stmt
1168                .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
1169                .filter_map(|r| r.ok())
1170                .collect();
1171
1172            Ok(tasks)
1173        })
1174    }
1175
1176    /// Get agent tags by agent ID.
1177    pub fn get_agent_tags(&self, agent_id: &str) -> Result<Vec<String>> {
1178        self.with_conn(|conn| {
1179            let result: Result<String, rusqlite::Error> = conn.query_row(
1180                "SELECT tags FROM workers WHERE id = ?1",
1181                params![agent_id],
1182                |row| row.get(0),
1183            );
1184
1185            match result {
1186                Ok(tags_json) => {
1187                    let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
1188                    Ok(tags)
1189                }
1190                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(vec![]),
1191                Err(e) => Err(e.into()),
1192            }
1193        })
1194    }
1195
1196    /// Internal helper: add dependency within a transaction (used by tasks.rs).
1197    pub(super) fn add_dependency_internal(
1198        conn: &Connection,
1199        from_task_id: &str,
1200        to_task_id: &str,
1201        dep_type: &str,
1202    ) -> Result<()> {
1203        conn.execute(
1204            "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1205            params![from_task_id, to_task_id, dep_type],
1206        )?;
1207        Ok(())
1208    }
1209
1210    /// Atomically relink dependencies: unlink all prev_from→prev_to, then link all from→to.
1211    /// This is a transaction-safe operation for moving children between parents.
1212    /// Returns a result with unlinked and linked pairs.
1213    pub fn relink(
1214        &self,
1215        prev_from_ids: &[String],
1216        prev_to_ids: &[String],
1217        from_ids: &[String],
1218        to_ids: &[String],
1219        dep_type: &str,
1220        deps_config: &DependenciesConfig,
1221    ) -> Result<RelinkResult> {
1222        // Validate dependency type upfront
1223        if !deps_config.is_valid_dep_type(dep_type) {
1224            return Err(anyhow!(
1225                "Invalid dependency type '{}'. Valid types: {:?}",
1226                dep_type,
1227                deps_config.dep_type_names()
1228            ));
1229        }
1230
1231        let def = deps_config
1232            .get_definition(dep_type)
1233            .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
1234        let is_vertical = def.display == DependencyDisplay::Vertical;
1235
1236        self.with_conn_mut(|conn| {
1237            let tx = conn.transaction()?;
1238
1239            let mut unlinked = Vec::new();
1240            let mut linked = Vec::new();
1241            let mut errors = Vec::new();
1242
1243            // Phase 1: Unlink all prev_from × prev_to
1244            for prev_from in prev_from_ids {
1245                for prev_to in prev_to_ids {
1246                    let rows = tx.execute(
1247                        "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
1248                        params![prev_from, prev_to, dep_type],
1249                    )?;
1250                    if rows > 0 {
1251                        unlinked.push((prev_from.clone(), prev_to.clone()));
1252                    }
1253                }
1254            }
1255
1256            // Phase 2: Link all from × to (with validation)
1257            for from_id in from_ids {
1258                for to_id in to_ids {
1259                    // For vertical deps, check single-parent constraint
1260                    if is_vertical {
1261                        let existing_parent: Option<String> = tx.query_row(
1262                            "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1263                            params![to_id],
1264                            |row| row.get(0),
1265                        ).optional()?;
1266
1267                        if let Some(ref parent) = existing_parent
1268                            && parent != from_id {
1269                                errors.push(format!(
1270                                    "Task {} already has parent {}",
1271                                    to_id, parent
1272                                ));
1273                                continue;
1274                            }
1275                    }
1276
1277                    // Check for cycles using temporary view within transaction
1278                    if would_create_cycle_in_tx(&tx, from_id, to_id, dep_type, deps_config)? {
1279                        errors.push(format!(
1280                            "Adding dependency {}→{} would create a cycle",
1281                            from_id, to_id
1282                        ));
1283                        continue;
1284                    }
1285
1286                    tx.execute(
1287                        "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1288                        params![from_id, to_id, dep_type],
1289                    )?;
1290                    linked.push((from_id.clone(), to_id.clone()));
1291                }
1292            }
1293
1294            if !errors.is_empty() {
1295                // Rollback on validation errors
1296                tx.rollback()?;
1297                return Err(anyhow!("Relink failed: {}", errors.join("; ")));
1298            }
1299
1300            tx.commit()?;
1301            Ok(RelinkResult { unlinked, linked })
1302        })
1303    }
1304
1305    // ============================================================================
1306    // Graph Traversal Methods for scan tool
1307    // ============================================================================
1308
1309    /// Get predecessors (tasks that block this task) via blocks/follows dependencies.
1310    /// depth: 0 = none, N = N levels, -1 = all
1311    pub fn get_predecessors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1312        if depth == 0 {
1313            return Ok(vec![]);
1314        }
1315
1316        self.with_conn(|conn| {
1317            let mut visited: HashSet<String> = HashSet::new();
1318            let mut result: Vec<Task> = Vec::new();
1319            let mut current_level: Vec<String> = vec![task_id.to_string()];
1320            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1321
1322            while !current_level.is_empty() && levels_remaining > 0 {
1323                let mut next_level: Vec<String> = Vec::new();
1324
1325                for tid in &current_level {
1326                    // Get tasks that block this one (from_task_id blocks to_task_id)
1327                    let mut stmt = conn.prepare(
1328                        "SELECT DISTINCT d.from_task_id FROM dependencies d
1329                         WHERE d.to_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')",
1330                    )?;
1331
1332                    let predecessors: Vec<String> = stmt
1333                        .query_map(params![tid], |row| row.get(0))?
1334                        .filter_map(|r| r.ok())
1335                        .collect();
1336
1337                    for pred_id in predecessors {
1338                        if !visited.contains(&pred_id) {
1339                            visited.insert(pred_id.clone());
1340                            if let Some(task) = get_task_by_id_internal(conn, &pred_id)? {
1341                                result.push(task);
1342                            }
1343                            next_level.push(pred_id);
1344                        }
1345                    }
1346                }
1347
1348                current_level = next_level;
1349                levels_remaining -= 1;
1350            }
1351
1352            Ok(result)
1353        })
1354    }
1355
1356    /// Get successors (tasks that this task blocks) via blocks/follows dependencies.
1357    /// depth: 0 = none, N = N levels, -1 = all
1358    pub fn get_successors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1359        if depth == 0 {
1360            return Ok(vec![]);
1361        }
1362
1363        self.with_conn(|conn| {
1364            let mut visited: HashSet<String> = HashSet::new();
1365            let mut result: Vec<Task> = Vec::new();
1366            let mut current_level: Vec<String> = vec![task_id.to_string()];
1367            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1368
1369            while !current_level.is_empty() && levels_remaining > 0 {
1370                let mut next_level: Vec<String> = Vec::new();
1371
1372                for tid in &current_level {
1373                    // Get tasks that this one blocks (from_task_id blocks to_task_id)
1374                    let mut stmt = conn.prepare(
1375                        "SELECT DISTINCT d.to_task_id FROM dependencies d
1376                         WHERE d.from_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')",
1377                    )?;
1378
1379                    let successors: Vec<String> = stmt
1380                        .query_map(params![tid], |row| row.get(0))?
1381                        .filter_map(|r| r.ok())
1382                        .collect();
1383
1384                    for succ_id in successors {
1385                        if !visited.contains(&succ_id) {
1386                            visited.insert(succ_id.clone());
1387                            if let Some(task) = get_task_by_id_internal(conn, &succ_id)? {
1388                                result.push(task);
1389                            }
1390                            next_level.push(succ_id);
1391                        }
1392                    }
1393                }
1394
1395                current_level = next_level;
1396                levels_remaining -= 1;
1397            }
1398
1399            Ok(result)
1400        })
1401    }
1402
1403    /// Get ancestors (parent chain) via contains dependency.
1404    /// depth: 0 = none, N = N levels up, -1 = all
1405    pub fn get_ancestors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1406        if depth == 0 {
1407            return Ok(vec![]);
1408        }
1409
1410        self.with_conn(|conn| {
1411            let mut result: Vec<Task> = Vec::new();
1412            let mut current_id = task_id.to_string();
1413            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1414
1415            while levels_remaining > 0 {
1416                // Get parent (from_task_id contains to_task_id)
1417                let parent_result: Result<String, rusqlite::Error> = conn.query_row(
1418                    "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1419                    params![&current_id],
1420                    |row| row.get(0),
1421                );
1422
1423                match parent_result {
1424                    Ok(parent_id) => {
1425                        if let Some(task) = get_task_by_id_internal(conn, &parent_id)? {
1426                            result.push(task);
1427                        }
1428                        current_id = parent_id;
1429                        levels_remaining -= 1;
1430                    }
1431                    Err(rusqlite::Error::QueryReturnedNoRows) => break,
1432                    Err(e) => return Err(e.into()),
1433                }
1434            }
1435
1436            Ok(result)
1437        })
1438    }
1439
1440    /// Get descendants (children tree) via contains dependency.
1441    /// depth: 0 = none, N = N levels down, -1 = all
1442    pub fn get_descendants(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1443        if depth == 0 {
1444            return Ok(vec![]);
1445        }
1446
1447        self.with_conn(|conn| {
1448            let mut visited: HashSet<String> = HashSet::new();
1449            let mut result: Vec<Task> = Vec::new();
1450            let mut current_level: Vec<String> = vec![task_id.to_string()];
1451            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1452
1453            while !current_level.is_empty() && levels_remaining > 0 {
1454                let mut next_level: Vec<String> = Vec::new();
1455
1456                for tid in &current_level {
1457                    // Get children (from_task_id contains to_task_id)
1458                    let mut stmt = conn.prepare(
1459                        "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
1460                    )?;
1461
1462                    let children: Vec<String> = stmt
1463                        .query_map(params![tid], |row| row.get(0))?
1464                        .filter_map(|r| r.ok())
1465                        .collect();
1466
1467                    for child_id in children {
1468                        if !visited.contains(&child_id) {
1469                            visited.insert(child_id.clone());
1470                            if let Some(task) = get_task_by_id_internal(conn, &child_id)? {
1471                                result.push(task);
1472                            }
1473                            next_level.push(child_id);
1474                        }
1475                    }
1476                }
1477
1478                current_level = next_level;
1479                levels_remaining -= 1;
1480            }
1481
1482            Ok(result)
1483        })
1484    }
1485}
1486
1487/// Helper to get a task by ID within a connection context.
1488fn get_task_by_id_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
1489    let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1490    let task = stmt
1491        .query_row(params![task_id], super::tasks::parse_task_row)
1492        .optional()?;
1493    Ok(task)
1494}
1495
1496/// Get the IDs of tasks that block a given task from starting (unsatisfied dependencies).
1497/// A task blocks starting if it has a start-blocking dependency type and is in a blocking state.
1498/// This is a transaction-safe version for use within existing transactions.
1499pub(crate) fn get_unsatisfied_start_blockers_in_tx(
1500    conn: &Connection,
1501    task_id: &str,
1502    states_config: &StatesConfig,
1503    deps_config: &DependenciesConfig,
1504) -> Result<Vec<String>> {
1505    let start_blocking_types = deps_config.start_blocking_types();
1506    if start_blocking_types.is_empty() {
1507        return Ok(vec![]);
1508    }
1509
1510    // Build IN clause from blocking_states
1511    let state_placeholders: Vec<String> = states_config
1512        .blocking_states
1513        .iter()
1514        .enumerate()
1515        .map(|(i, _)| format!("?{}", i + 2))
1516        .collect();
1517    let state_clause = state_placeholders.join(", ");
1518
1519    // Build IN clause from types
1520    let type_start = states_config.blocking_states.len() + 2;
1521    let type_placeholders: Vec<String> = start_blocking_types
1522        .iter()
1523        .enumerate()
1524        .map(|(i, _)| format!("?{}", type_start + i))
1525        .collect();
1526    let type_clause = type_placeholders.join(", ");
1527
1528    let sql = format!(
1529        "SELECT blocker.id FROM dependencies d
1530         INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1531         WHERE d.to_task_id = ?1 
1532         AND d.dep_type IN ({})
1533         AND blocker.status IN ({})",
1534        type_clause, state_clause
1535    );
1536
1537    let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1538    params_vec.push(Box::new(task_id.to_string()));
1539    for state in &states_config.blocking_states {
1540        params_vec.push(Box::new(state.clone()));
1541    }
1542    for t in &start_blocking_types {
1543        params_vec.push(Box::new(t.to_string()));
1544    }
1545    let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
1546
1547    let mut stmt = conn.prepare(&sql)?;
1548    let blockers = stmt
1549        .query_map(params_refs.as_slice(), |row| {
1550            let id: String = row.get(0)?;
1551            Ok(id)
1552        })?
1553        .filter_map(|r| r.ok())
1554        .collect();
1555
1556    Ok(blockers)
1557}
1558
1559/// Propagate unblock effects when a task transitions out of a blocking state.
1560/// This is called after a task completes to find newly unblocked tasks and
1561/// optionally auto-advance them.
1562///
1563/// Returns (unblocked, auto_advanced):
1564/// - unblocked: All task IDs that are now ready (all dependencies satisfied)
1565/// - auto_advanced: Subset of unblocked that were actually transitioned (when enabled)
1566///
1567/// Algorithm:
1568/// 1. Find all tasks that have a start-blocking dependency on the completed task
1569/// 2. For each candidate:
1570///    - Skip if not in initial state or already claimed
1571///    - Check if ALL other start-blockers are also satisfied
1572///    - If fully unblocked → add to unblocked list
1573///    - If auto_advance enabled → also transition to target_state
1574/// 3. Return both lists
1575pub(crate) fn propagate_unblock_effects(
1576    conn: &Connection,
1577    completed_task_id: &str,
1578    agent_id: Option<&str>,
1579    states_config: &StatesConfig,
1580    deps_config: &DependenciesConfig,
1581    auto_advance: &AutoAdvanceConfig,
1582) -> Result<(Vec<String>, Vec<String>)> {
1583    // Get start-blocking dependency types
1584    let start_blocking_types = deps_config.start_blocking_types();
1585    if start_blocking_types.is_empty() {
1586        return Ok((vec![], vec![]));
1587    }
1588
1589    // Find all tasks that depend on the completed task via start-blocking dependencies
1590    let type_placeholders: Vec<String> = start_blocking_types
1591        .iter()
1592        .enumerate()
1593        .map(|(i, _)| format!("?{}", i + 2))
1594        .collect();
1595    let type_clause = type_placeholders.join(", ");
1596
1597    let sql = format!(
1598        "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
1599        type_clause
1600    );
1601
1602    let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1603    params_vec.push(Box::new(completed_task_id.to_string()));
1604    for t in &start_blocking_types {
1605        params_vec.push(Box::new(t.to_string()));
1606    }
1607    let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
1608
1609    let mut stmt = conn.prepare(&sql)?;
1610    let dependent_task_ids: Vec<String> = stmt
1611        .query_map(params_refs.as_slice(), |row| row.get(0))?
1612        .filter_map(|r| r.ok())
1613        .collect();
1614
1615    let mut unblocked = Vec::new();
1616    let mut auto_advanced = Vec::new();
1617    let now = super::now_ms();
1618
1619    // Determine if we should auto-advance
1620    let should_auto_advance = auto_advance.enabled && auto_advance.target_state.is_some();
1621    let target_state = auto_advance.target_state.clone();
1622
1623    // Validate target state if auto-advance is enabled
1624    if should_auto_advance {
1625        let ts = target_state.as_ref().unwrap();
1626        if !states_config.is_valid_state(ts) {
1627            return Err(anyhow!(
1628                "Auto-advance target state '{}' is not a valid state",
1629                ts
1630            ));
1631        }
1632    }
1633
1634    for task_id in dependent_task_ids {
1635        // Get the task
1636        let task = match get_task_by_id_internal(conn, &task_id)? {
1637            Some(t) => t,
1638            None => continue,
1639        };
1640
1641        // Skip if not in initial state
1642        if task.status != states_config.initial {
1643            continue;
1644        }
1645
1646        // Skip if task is already claimed
1647        if task.worker_id.is_some() {
1648            continue;
1649        }
1650
1651        // Check if ALL start-blockers are now satisfied (not in blocking states)
1652        // Build query to count remaining blockers that are still blocking
1653        let state_placeholders: Vec<String> = states_config
1654            .blocking_states
1655            .iter()
1656            .enumerate()
1657            .map(|(i, _)| format!("?{}", i + 3))
1658            .collect();
1659        let state_clause = state_placeholders.join(", ");
1660
1661        // Reuse type_placeholders from above
1662        let type_start = states_config.blocking_states.len() + 3;
1663        let type_placeholders2: Vec<String> = start_blocking_types
1664            .iter()
1665            .enumerate()
1666            .map(|(i, _)| format!("?{}", type_start + i))
1667            .collect();
1668        let type_clause2 = type_placeholders2.join(", ");
1669
1670        let blocker_sql = format!(
1671            "SELECT COUNT(*) FROM dependencies d
1672             INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1673             WHERE d.to_task_id = ?1
1674             AND d.from_task_id != ?2
1675             AND d.dep_type IN ({})
1676             AND blocker.status IN ({})",
1677            type_clause2, state_clause
1678        );
1679
1680        let mut blocker_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1681        blocker_params.push(Box::new(task_id.clone()));
1682        blocker_params.push(Box::new(completed_task_id.to_string()));
1683        for state in &states_config.blocking_states {
1684            blocker_params.push(Box::new(state.clone()));
1685        }
1686        for t in &start_blocking_types {
1687            blocker_params.push(Box::new(t.to_string()));
1688        }
1689        let blocker_refs: Vec<&dyn rusqlite::ToSql> =
1690            blocker_params.iter().map(|b| b.as_ref()).collect();
1691
1692        let remaining_blockers: i32 =
1693            conn.query_row(&blocker_sql, blocker_refs.as_slice(), |row| row.get(0))?;
1694
1695        if remaining_blockers > 0 {
1696            continue; // Still blocked by other tasks
1697        }
1698
1699        // Task is now fully unblocked - add to unblocked list
1700        unblocked.push(task_id.clone());
1701
1702        // Auto-advance if enabled and transition is valid
1703        if should_auto_advance {
1704            let ts = target_state.as_ref().unwrap();
1705
1706            // Validate transition from initial to target_state
1707            if !states_config.is_valid_transition(&states_config.initial, ts) {
1708                // Skip auto-advance for this task - transition not allowed
1709                continue;
1710            }
1711
1712            // Auto-advance: update the task's status
1713            conn.execute(
1714                "UPDATE tasks SET status = ?1, updated_at = ?2 WHERE id = ?3",
1715                params![ts, now, &task_id],
1716            )?;
1717
1718            // Record state transition
1719            let reason = format!("auto-advanced: blocker '{}' completed", completed_task_id);
1720            super::state_transitions::record_state_transition(
1721                conn,
1722                &task_id,
1723                ts,
1724                agent_id,
1725                Some(&reason),
1726                states_config,
1727            )?;
1728
1729            auto_advanced.push(task_id);
1730        }
1731    }
1732
1733    Ok((unblocked, auto_advanced))
1734}