Skip to main content

task_graph_mcp/db/
deps.rs

1//! Dependency operations and cycle detection with typed dependencies.
2
3use super::Database;
4use crate::config::{AutoAdvanceConfig, DependenciesConfig, DependencyDisplay, StatesConfig};
5use crate::types::{Dependency, Task};
6use anyhow::{Result, anyhow};
7use rusqlite::{Connection, OptionalExtension, params};
8use std::collections::{HashSet, VecDeque};
9
/// Outcome of a relink operation, split into the edges that were dropped and
/// the edges that were created.
#[derive(Debug)]
pub struct RelinkResult {
    /// Dependencies that were removed, each stored as a `(from, to)` pair.
    pub unlinked: Vec<(String, String)>,
    /// Dependencies that were added, each stored as a `(from, to)` pair.
    pub linked: Vec<(String, String)>,
}
18
19/// Check if adding a dependency would create a cycle (transaction-safe version).
20fn would_create_cycle_in_tx(
21    tx: &rusqlite::Transaction,
22    from_task_id: &str,
23    to_task_id: &str,
24    dep_type: &str,
25    deps_config: &DependenciesConfig,
26) -> Result<bool> {
27    let def = deps_config
28        .get_definition(dep_type)
29        .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
30
31    // A cycle would occur if to_task can already reach from_task
32    // through the same "graph" (horizontal or vertical)
33    let mut visited: HashSet<String> = HashSet::new();
34    let mut queue: VecDeque<String> = VecDeque::new();
35    queue.push_back(to_task_id.to_string());
36
37    while let Some(current) = queue.pop_front() {
38        if current == from_task_id {
39            return Ok(true); // Would create a cycle
40        }
41
42        if visited.contains(&current) {
43            continue;
44        }
45        visited.insert(current.clone());
46
47        // Get all tasks that current points to (in the relevant graph)
48        let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
49            // For vertical deps, only check containment relationships
50            let mut stmt = tx.prepare(
51                "SELECT to_task_id FROM dependencies d
52                 JOIN (SELECT value FROM json_each(?1)) types
53                 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
54            )?;
55            let vertical_types: Vec<&str> = deps_config.vertical_types();
56            let types_json = serde_json::to_string(&vertical_types)?;
57            stmt.query_map(params![&types_json, &current], |row| row.get(0))?
58                .filter_map(|r| r.ok())
59                .collect()
60        } else {
61            // For horizontal deps, check all start-blocking relationships
62            let mut stmt = tx.prepare(
63                "SELECT to_task_id FROM dependencies d
64                 JOIN (SELECT value FROM json_each(?1)) types
65                 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
66            )?;
67            let start_blocking: Vec<&str> = deps_config.start_blocking_types();
68            let types_json = serde_json::to_string(&start_blocking)?;
69            stmt.query_map(params![&types_json, &current], |row| row.get(0))?
70                .filter_map(|r| r.ok())
71                .collect()
72        };
73
74        for dep in deps {
75            if !visited.contains(&dep) {
76                queue.push_back(dep);
77            }
78        }
79    }
80
81    Ok(false)
82}
83
/// Translate optional `sort_by` / `sort_order` inputs into an SQL `ORDER BY`
/// expression.
///
/// Only fixed fragments are ever emitted, so caller-supplied text never
/// reaches the returned string:
/// - `sort_by`: `"priority"` and `"updated_at"` select their columns; every
///   other value (including `"created_at"` and `None`) sorts by `t.created_at`.
/// - `sort_order`: `"asc"` yields `ASC`; every other value yields `DESC`.
fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
    let column = if sort_by == Some("priority") {
        "CAST(t.priority AS INTEGER)"
    } else if sort_by == Some("updated_at") {
        "t.updated_at"
    } else {
        // "created_at", unknown names and a missing value share the default.
        "t.created_at"
    };

    // Descending is the default; only an explicit "asc" flips the direction.
    let direction = if matches!(sort_order, Some("asc")) {
        "ASC"
    } else {
        "DESC"
    };

    [column, direction].join(" ")
}
105
/// Status reported by an attempt to add a dependency.
#[derive(Debug)]
pub enum AddDependencyResult {
    /// A new dependency row was inserted.
    Created,
    /// The dependency was already present, so nothing changed.
    AlreadyExists,
    /// The source (`from`) task does not exist.
    FromTaskNotFound,
    /// The target (`to`) task does not exist.
    ToTaskNotFound,
}
118
119impl Database {
120    /// Check if a task exists by ID.
121    pub fn task_exists(&self, task_id: &str) -> Result<bool> {
122        self.with_conn(|conn| {
123            let count: i64 = conn.query_row(
124                "SELECT COUNT(*) FROM tasks WHERE id = ?1",
125                params![task_id],
126                |row| row.get(0),
127            )?;
128            Ok(count > 0)
129        })
130    }
131
132    /// Add a typed dependency (from blocks/contains to).
133    /// Returns Ok(()) on success. For soft linking with warnings, use add_dependency_soft.
134    pub fn add_dependency(
135        &self,
136        from_task_id: &str,
137        to_task_id: &str,
138        dep_type: &str,
139        deps_config: &DependenciesConfig,
140    ) -> Result<()> {
141        match self.add_dependency_soft(from_task_id, to_task_id, dep_type, deps_config)? {
142            AddDependencyResult::Created | AddDependencyResult::AlreadyExists => Ok(()),
143            AddDependencyResult::FromTaskNotFound => {
144                Err(anyhow!("Source task '{}' not found", from_task_id))
145            }
146            AddDependencyResult::ToTaskNotFound => {
147                Err(anyhow!("Target task '{}' not found", to_task_id))
148            }
149        }
150    }
151
152    /// Add a typed dependency with soft failure (returns result status instead of error).
153    /// Use this when you want to handle missing tasks as warnings.
154    pub fn add_dependency_soft(
155        &self,
156        from_task_id: &str,
157        to_task_id: &str,
158        dep_type: &str,
159        deps_config: &DependenciesConfig,
160    ) -> Result<AddDependencyResult> {
161        // Validate dependency type
162        if !deps_config.is_valid_dep_type(dep_type) {
163            return Err(anyhow!(
164                "Invalid dependency type '{}'. Valid types: {:?}",
165                dep_type,
166                deps_config.dep_type_names()
167            ));
168        }
169
170        // Check if tasks exist first
171        if !self.task_exists(from_task_id)? {
172            return Ok(AddDependencyResult::FromTaskNotFound);
173        }
174        if !self.task_exists(to_task_id)? {
175            return Ok(AddDependencyResult::ToTaskNotFound);
176        }
177
178        // For vertical (contains) dependencies, check single-parent constraint
179        let def = deps_config
180            .get_definition(dep_type)
181            .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
182        if def.display == DependencyDisplay::Vertical
183            && let Some(existing_parent) = self.get_parent(to_task_id)?
184            && existing_parent != from_task_id
185        {
186            return Err(anyhow!(
187                "Task {} already has parent {}",
188                to_task_id,
189                existing_parent
190            ));
191        }
192
193        // Check for cycle in the appropriate graph
194        if self.would_create_cycle(from_task_id, to_task_id, dep_type, deps_config)? {
195            return Err(anyhow!("Adding this dependency would create a cycle"));
196        }
197
198        self.with_conn(|conn| {
199            let changes = conn.execute(
200                "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
201                params![from_task_id, to_task_id, dep_type],
202            )?;
203            if changes == 0 {
204                Ok(AddDependencyResult::AlreadyExists)
205            } else {
206                Ok(AddDependencyResult::Created)
207            }
208        })
209    }
210
211    /// Check if adding a dependency would create a cycle.
212    /// For horizontal deps: check cycle in the start-blocking graph.
213    /// For vertical deps: check containment cycle.
214    pub fn would_create_cycle(
215        &self,
216        from_task_id: &str,
217        to_task_id: &str,
218        dep_type: &str,
219        deps_config: &DependenciesConfig,
220    ) -> Result<bool> {
221        let def = deps_config
222            .get_definition(dep_type)
223            .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
224
225        self.with_conn(|conn| {
226            // A cycle would occur if to_task can already reach from_task
227            // through the same "graph" (horizontal or vertical)
228            let mut visited: HashSet<String> = HashSet::new();
229            let mut queue: VecDeque<String> = VecDeque::new();
230            queue.push_back(to_task_id.to_string());
231
232            while let Some(current) = queue.pop_front() {
233                if current == from_task_id {
234                    return Ok(true); // Would create a cycle
235                }
236
237                if visited.contains(&current) {
238                    continue;
239                }
240                visited.insert(current.clone());
241
242                // Get all tasks that current points to (in the relevant graph)
243                let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
244                    // For vertical deps, only check containment relationships
245                    let mut stmt = conn.prepare(
246                        "SELECT to_task_id FROM dependencies d
247                         JOIN (SELECT value FROM json_each(?1)) types
248                         WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
249                    )?;
250                    let vertical_types: Vec<&str> = deps_config.vertical_types();
251                    let types_json = serde_json::to_string(&vertical_types)?;
252                    stmt.query_map(params![&types_json, &current], |row| row.get(0))?
253                        .filter_map(|r| r.ok())
254                        .collect()
255                } else {
256                    // For horizontal deps, check all start-blocking relationships
257                    let mut stmt = conn.prepare(
258                        "SELECT to_task_id FROM dependencies d
259                         JOIN (SELECT value FROM json_each(?1)) types
260                         WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
261                    )?;
262                    let start_blocking: Vec<&str> = deps_config.start_blocking_types();
263                    let types_json = serde_json::to_string(&start_blocking)?;
264                    stmt.query_map(params![&types_json, &current], |row| row.get(0))?
265                        .filter_map(|r| r.ok())
266                        .collect()
267                };
268
269                for dep in deps {
270                    if !visited.contains(&dep) {
271                        queue.push_back(dep);
272                    }
273                }
274            }
275
276            Ok(false)
277        })
278    }
279
280    /// Remove a typed dependency. Returns true if a row was deleted.
281    pub fn remove_dependency(
282        &self,
283        from_task_id: &str,
284        to_task_id: &str,
285        dep_type: &str,
286    ) -> Result<bool> {
287        self.with_conn(|conn| {
288            let rows = conn.execute(
289                "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
290                params![from_task_id, to_task_id, dep_type],
291            )?;
292            Ok(rows > 0)
293        })
294    }
295
296    /// Remove all dependencies of a given type from a task (outgoing edges).
297    /// Returns the list of removed dependencies.
298    pub fn remove_all_outgoing_dependencies(
299        &self,
300        from_task_id: &str,
301        dep_type: &str,
302    ) -> Result<Vec<Dependency>> {
303        self.with_conn_mut(|conn| {
304            let tx = conn.transaction()?;
305
306            // First get the dependencies that will be removed
307            let deps: Vec<Dependency> = {
308                let mut stmt = tx.prepare(
309                    "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
310                )?;
311                stmt
312                    .query_map(params![from_task_id, dep_type], |row| {
313                        Ok(Dependency {
314                            from_task_id: row.get(0)?,
315                            to_task_id: row.get(1)?,
316                            dep_type: row.get(2)?,
317                        })
318                    })?
319                    .filter_map(|r| r.ok())
320                    .collect()
321            };
322
323            // Then delete them
324            tx.execute(
325                "DELETE FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2",
326                params![from_task_id, dep_type],
327            )?;
328
329            tx.commit()?;
330            Ok(deps)
331        })
332    }
333
334    /// Remove all dependencies of a given type to a task (incoming edges).
335    /// Returns the list of removed dependencies.
336    pub fn remove_all_incoming_dependencies(
337        &self,
338        to_task_id: &str,
339        dep_type: &str,
340    ) -> Result<Vec<Dependency>> {
341        self.with_conn_mut(|conn| {
342            let tx = conn.transaction()?;
343
344            // First get the dependencies that will be removed
345            let deps: Vec<Dependency> = {
346                let mut stmt = tx.prepare(
347                    "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
348                )?;
349                stmt
350                    .query_map(params![to_task_id, dep_type], |row| {
351                        Ok(Dependency {
352                            from_task_id: row.get(0)?,
353                            to_task_id: row.get(1)?,
354                            dep_type: row.get(2)?,
355                        })
356                    })?
357                    .filter_map(|r| r.ok())
358                    .collect()
359            };
360
361            // Then delete them
362            tx.execute(
363                "DELETE FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2",
364                params![to_task_id, dep_type],
365            )?;
366
367            tx.commit()?;
368            Ok(deps)
369        })
370    }
371
372    /// Get all dependencies.
373    pub fn get_all_dependencies(&self) -> Result<Vec<Dependency>> {
374        self.with_conn(|conn| {
375            let mut stmt =
376                conn.prepare("SELECT from_task_id, to_task_id, dep_type FROM dependencies")?;
377
378            let deps = stmt
379                .query_map([], |row| {
380                    let from: String = row.get(0)?;
381                    let to: String = row.get(1)?;
382                    let dep_type: String = row.get(2)?;
383                    Ok(Dependency {
384                        from_task_id: from,
385                        to_task_id: to,
386                        dep_type,
387                    })
388                })?
389                .filter_map(|r| r.ok())
390                .collect();
391
392            Ok(deps)
393        })
394    }
395
396    /// Get dependencies of a specific type for a task.
397    pub fn get_dependencies_by_type(
398        &self,
399        task_id: &str,
400        dep_type: &str,
401        direction: &str,
402    ) -> Result<Vec<Dependency>> {
403        self.with_conn(|conn| {
404            let sql = if direction == "incoming" {
405                "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
406            } else {
407                "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
408            };
409
410            let mut stmt = conn.prepare(sql)?;
411
412            let deps = stmt
413                .query_map(params![task_id, dep_type], |row| {
414                    let from: String = row.get(0)?;
415                    let to: String = row.get(1)?;
416                    let dep_type: String = row.get(2)?;
417                    Ok(Dependency {
418                        from_task_id: from,
419                        to_task_id: to,
420                        dep_type,
421                    })
422                })?
423                .filter_map(|r| r.ok())
424                .collect();
425
426            Ok(deps)
427        })
428    }
429
430    /// Get tasks that block a given task from starting (dep_type with blocks: start).
431    pub fn get_start_blockers(
432        &self,
433        task_id: &str,
434        deps_config: &DependenciesConfig,
435    ) -> Result<Vec<String>> {
436        let start_blocking_types = deps_config.start_blocking_types();
437        if start_blocking_types.is_empty() {
438            return Ok(vec![]);
439        }
440
441        self.with_conn(|conn| {
442            let placeholders: String = start_blocking_types
443                .iter()
444                .enumerate()
445                .map(|(i, _)| format!("?{}", i + 2))
446                .collect::<Vec<_>>()
447                .join(", ");
448
449            let sql = format!(
450                "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type IN ({})",
451                placeholders
452            );
453
454            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
455            params_vec.push(Box::new(task_id.to_string()));
456            for t in &start_blocking_types {
457                params_vec.push(Box::new(t.to_string()));
458            }
459            let params_refs: Vec<&dyn rusqlite::ToSql> =
460                params_vec.iter().map(|b| b.as_ref()).collect();
461
462            let mut stmt = conn.prepare(&sql)?;
463            let blockers = stmt
464                .query_map(params_refs.as_slice(), |row| {
465                    let id: String = row.get(0)?;
466                    Ok(id)
467                })?
468                .filter_map(|r| r.ok())
469                .collect();
470
471            Ok(blockers)
472        })
473    }
474
475    /// Get tasks that block a given task from completing (dep_type with blocks: completion).
476    /// For a parent task, this returns children that must complete first.
477    pub fn get_completion_blockers(
478        &self,
479        task_id: &str,
480        deps_config: &DependenciesConfig,
481    ) -> Result<Vec<String>> {
482        let completion_blocking_types = deps_config.completion_blocking_types();
483        if completion_blocking_types.is_empty() {
484            return Ok(vec![]);
485        }
486
487        self.with_conn(|conn| {
488            let placeholders: String = completion_blocking_types
489                .iter()
490                .enumerate()
491                .map(|(i, _)| format!("?{}", i + 2))
492                .collect::<Vec<_>>()
493                .join(", ");
494
495            // For completion blockers, we look at outgoing edges (from_task_id = parent)
496            // because "contains" means parent -> child, and child blocks parent completion
497            let sql = format!(
498                "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
499                placeholders
500            );
501
502            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
503            params_vec.push(Box::new(task_id.to_string()));
504            for t in &completion_blocking_types {
505                params_vec.push(Box::new(t.to_string()));
506            }
507            let params_refs: Vec<&dyn rusqlite::ToSql> =
508                params_vec.iter().map(|b| b.as_ref()).collect();
509
510            let mut stmt = conn.prepare(&sql)?;
511            let blockers = stmt
512                .query_map(params_refs.as_slice(), |row| {
513                    let id: String = row.get(0)?;
514                    Ok(id)
515                })?
516                .filter_map(|r| r.ok())
517                .collect();
518
519            Ok(blockers)
520        })
521    }
522
523    /// Get the parent of a task (via 'contains' dependency).
524    pub fn get_parent(&self, task_id: &str) -> Result<Option<String>> {
525        self.with_conn(|conn| {
526            let result: Result<String, rusqlite::Error> = conn.query_row(
527                "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
528                params![task_id],
529                |row| row.get(0),
530            );
531
532            match result {
533                Ok(parent_id) => Ok(Some(parent_id)),
534                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
535                Err(e) => Err(e.into()),
536            }
537        })
538    }
539
540    /// Get children of a task (via 'contains' dependency).
541    pub fn get_children_ids(&self, task_id: &str) -> Result<Vec<String>> {
542        self.with_conn(|conn| {
543            let mut stmt = conn.prepare(
544                "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
545            )?;
546
547            let children = stmt
548                .query_map(params![task_id], |row| {
549                    let id: String = row.get(0)?;
550                    Ok(id)
551                })?
552                .filter_map(|r| r.ok())
553                .collect();
554
555            Ok(children)
556        })
557    }
558
559    /// Get all tasks that block a given task (backwards compatible).
560    /// Returns tasks from both 'blocks' and 'follows' dependencies.
561    pub fn get_blockers(&self, task_id: &str) -> Result<Vec<String>> {
562        self.with_conn(|conn| {
563            let mut stmt = conn.prepare(
564                "SELECT from_task_id FROM dependencies 
565                 WHERE to_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
566            )?;
567
568            let blockers = stmt
569                .query_map(params![task_id], |row| {
570                    let id: String = row.get(0)?;
571                    Ok(id)
572                })?
573                .filter_map(|r| r.ok())
574                .collect();
575
576            Ok(blockers)
577        })
578    }
579
580    /// Get tasks that a given task blocks.
581    #[allow(dead_code)]
582    pub fn get_blocking(&self, task_id: &str) -> Result<Vec<String>> {
583        self.with_conn(|conn| {
584            let mut stmt = conn.prepare(
585                "SELECT to_task_id FROM dependencies 
586                 WHERE from_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
587            )?;
588
589            let blocking = stmt
590                .query_map(params![task_id], |row| {
591                    let id: String = row.get(0)?;
592                    Ok(id)
593                })?
594                .filter_map(|r| r.ok())
595                .collect();
596
597            Ok(blocking)
598        })
599    }
600
601    /// Get tasks that are blocked by incomplete start dependencies.
602    /// A task is blocked if any of its start-blocking dependencies are in a blocking state.
603    /// Excludes soft-deleted tasks.
604    pub fn get_blocked_tasks(
605        &self,
606        states_config: &StatesConfig,
607        deps_config: &DependenciesConfig,
608        sort_by: Option<&str>,
609        sort_order: Option<&str>,
610    ) -> Result<Vec<Task>> {
611        let start_blocking_types = deps_config.start_blocking_types();
612        if start_blocking_types.is_empty() {
613            return Ok(vec![]);
614        }
615
616        self.with_conn(|conn| {
617            // Build IN clause from blocking_states
618            let state_placeholders: Vec<String> = states_config
619                .blocking_states
620                .iter()
621                .enumerate()
622                .map(|(i, _)| format!("?{}", i + 2))
623                .collect();
624            let state_clause = state_placeholders.join(", ");
625
626            // Build IN clause from start_blocking_types
627            let type_start = states_config.blocking_states.len() + 2;
628            let type_placeholders: Vec<String> = start_blocking_types
629                .iter()
630                .enumerate()
631                .map(|(i, _)| format!("?{}", type_start + i))
632                .collect();
633            let type_clause = type_placeholders.join(", ");
634
635            // Build ORDER BY clause
636            let order_clause = build_order_clause(sort_by, sort_order);
637
638            let sql = format!(
639                "SELECT DISTINCT t.*
640                 FROM tasks t
641                 INNER JOIN dependencies d ON t.id = d.to_task_id
642                 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
643                 WHERE d.dep_type IN ({})
644                 AND blocker.status IN ({})
645                 AND t.status = ?1
646                 AND t.deleted_at IS NULL
647                 ORDER BY {}",
648                type_clause, state_clause, order_clause
649            );
650
651            let mut stmt = conn.prepare(&sql)?;
652
653            // Build params: initial state + blocking states + start_blocking_types
654            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
655            params_vec.push(Box::new(states_config.initial.clone()));
656            for state in &states_config.blocking_states {
657                params_vec.push(Box::new(state.clone()));
658            }
659            for t in &start_blocking_types {
660                params_vec.push(Box::new(t.to_string()));
661            }
662            let params_refs: Vec<&dyn rusqlite::ToSql> =
663                params_vec.iter().map(|b| b.as_ref()).collect();
664
665            let tasks = stmt
666                .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
667                .filter_map(|r| r.ok())
668                .collect();
669
670            Ok(tasks)
671        })
672    }
673
674    /// Get tasks that are ready to be claimed (all start dependencies satisfied).
675    /// A task is ready if it's in the initial state, unclaimed, and all start-blocking deps are not blocking.
676    /// When agent_id is provided, also filters by agent's tag qualifications using junction tables.
677    /// Excludes soft-deleted tasks.
678    pub fn get_ready_tasks(
679        &self,
680        agent_id: Option<&str>,
681        states_config: &StatesConfig,
682        deps_config: &DependenciesConfig,
683        sort_by: Option<&str>,
684        sort_order: Option<&str>,
685    ) -> Result<Vec<Task>> {
686        let start_blocking_types = deps_config.start_blocking_types();
687
688        // Get agent tags if agent_id is provided (for junction table filtering)
689        let agent_tags: Option<Vec<String>> = if let Some(aid) = agent_id {
690            Some(self.get_agent_tags(aid)?)
691        } else {
692            None
693        };
694
695        self.with_conn(|conn| {
696            // Build IN clause from blocking_states
697            let state_placeholders: Vec<String> = states_config
698                .blocking_states
699                .iter()
700                .enumerate()
701                .map(|(i, _)| format!("?{}", i + 2))
702                .collect();
703            let state_clause = state_placeholders.join(", ");
704
705            // Build IN clause from start_blocking_types
706            let type_start = states_config.blocking_states.len() + 2;
707            let type_placeholders: Vec<String> = start_blocking_types
708                .iter()
709                .enumerate()
710                .map(|(i, _)| format!("?{}", type_start + i))
711                .collect();
712            let type_clause = type_placeholders.join(", ");
713
714            // Build ORDER BY clause - for ready tasks, default is priority then created_at
715            let order_clause = if sort_by.is_some() {
716                build_order_clause(sort_by, sort_order)
717            } else {
718                // Default for ready: priority (high first), then created_at
719                "CAST(t.priority AS INTEGER) DESC, t.created_at DESC".to_string()
720            };
721
722            // Track param index for agent tag filters
723            let mut param_idx = type_start + start_blocking_types.len();
724
725            // Build agent qualification filters using junction tables
726            let (agent_needed_clause, agent_wanted_clause) = if let Some(ref tags) = agent_tags {
727                // For agent_tags_all (AND): agent must have ALL needed tags
728                // Count how many of the task's needed_tags match agent's tags
729                // Either the task has no needed_tags, or all must match
730                let needed_placeholders: Vec<String> = tags
731                    .iter()
732                    .enumerate()
733                    .map(|(i, _)| format!("?{}", param_idx + i))
734                    .collect();
735                param_idx += tags.len();
736
737                let needed_clause = if needed_placeholders.is_empty() {
738                    // Agent has no tags - only match tasks with no needed_tags
739                    "AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)"
740                        .to_string()
741                } else {
742                    // Task must have no needed_tags OR agent must have all of them
743                    format!(
744                        "AND (
745                            NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
746                            OR (
747                                SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
748                            ) = (
749                                SELECT COUNT(*) FROM task_needed_tags 
750                                WHERE task_id = t.id AND tag IN ({})
751                            )
752                        )",
753                        needed_placeholders.join(", ")
754                    )
755                };
756
757                // For agent_tags_any (OR): agent must have at least ONE wanted tag
758                let wanted_placeholders: Vec<String> = tags
759                    .iter()
760                    .enumerate()
761                    .map(|(i, _)| format!("?{}", param_idx + i))
762                    .collect();
763
764                let wanted_clause = if wanted_placeholders.is_empty() {
765                    // Agent has no tags - only match tasks with no wanted_tags
766                    "AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)"
767                        .to_string()
768                } else {
769                    // Task must have no wanted_tags OR agent must have at least one
770                    format!(
771                        "AND (
772                            NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
773                            OR EXISTS (
774                                SELECT 1 FROM task_wanted_tags 
775                                WHERE task_id = t.id AND tag IN ({})
776                            )
777                        )",
778                        wanted_placeholders.join(", ")
779                    )
780                };
781
782                (needed_clause, wanted_clause)
783            } else {
784                (String::new(), String::new())
785            };
786
787            let sql = format!(
788                "SELECT t.*
789                 FROM tasks t
790                 WHERE t.status = ?1
791                 AND t.worker_id IS NULL
792                 AND t.deleted_at IS NULL
793                 AND NOT EXISTS (
794                     SELECT 1 FROM dependencies d
795                     INNER JOIN tasks blocker ON d.from_task_id = blocker.id
796                     WHERE d.to_task_id = t.id 
797                     AND d.dep_type IN ({})
798                     AND blocker.status IN ({})
799                 )
800                 {}
801                 {}
802                 ORDER BY {}",
803                type_clause, state_clause, agent_needed_clause, agent_wanted_clause, order_clause
804            );
805
806            let mut stmt = conn.prepare(&sql)?;
807
808            // Build params: initial state + blocking states + types + agent tags (twice if present)
809            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
810            params_vec.push(Box::new(states_config.initial.clone()));
811            for state in &states_config.blocking_states {
812                params_vec.push(Box::new(state.clone()));
813            }
814            for t in &start_blocking_types {
815                params_vec.push(Box::new(t.to_string()));
816            }
817            // Add agent tags twice (once for needed_tags check, once for wanted_tags check)
818            if let Some(ref tags) = agent_tags {
819                for tag in tags {
820                    params_vec.push(Box::new(tag.clone()));
821                }
822                for tag in tags {
823                    params_vec.push(Box::new(tag.clone()));
824                }
825            }
826            let params_refs: Vec<&dyn rusqlite::ToSql> =
827                params_vec.iter().map(|b| b.as_ref()).collect();
828
829            let tasks: Vec<Task> = stmt
830                .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
831                .filter_map(|r| r.ok())
832                .collect();
833
834            Ok(tasks)
835        })
836    }
837
838    /// Check if a task has unmet start dependencies.
839    #[allow(dead_code)]
840    pub fn has_unmet_start_dependencies(
841        &self,
842        task_id: &str,
843        states_config: &StatesConfig,
844        deps_config: &DependenciesConfig,
845    ) -> Result<bool> {
846        let start_blocking_types = deps_config.start_blocking_types();
847        if start_blocking_types.is_empty() {
848            return Ok(false);
849        }
850
851        self.with_conn(|conn| {
852            // Build IN clause from blocking_states
853            let state_placeholders: Vec<String> = states_config
854                .blocking_states
855                .iter()
856                .enumerate()
857                .map(|(i, _)| format!("?{}", i + 2))
858                .collect();
859            let state_clause = state_placeholders.join(", ");
860
861            // Build IN clause from types
862            let type_start = states_config.blocking_states.len() + 2;
863            let type_placeholders: Vec<String> = start_blocking_types
864                .iter()
865                .enumerate()
866                .map(|(i, _)| format!("?{}", type_start + i))
867                .collect();
868            let type_clause = type_placeholders.join(", ");
869
870            let sql = format!(
871                "SELECT COUNT(*) FROM dependencies d
872                 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
873                 WHERE d.to_task_id = ?1 
874                 AND d.dep_type IN ({})
875                 AND blocker.status IN ({})",
876                type_clause, state_clause
877            );
878
879            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
880            params_vec.push(Box::new(task_id.to_string()));
881            for state in &states_config.blocking_states {
882                params_vec.push(Box::new(state.clone()));
883            }
884            for t in &start_blocking_types {
885                params_vec.push(Box::new(t.to_string()));
886            }
887            let params_refs: Vec<&dyn rusqlite::ToSql> =
888                params_vec.iter().map(|b| b.as_ref()).collect();
889
890            let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
891
892            Ok(count > 0)
893        })
894    }
895
896    /// Check if a task has incomplete children (blocking completion).
897    pub fn has_incomplete_children(
898        &self,
899        task_id: &str,
900        states_config: &StatesConfig,
901    ) -> Result<bool> {
902        self.with_conn(|conn| {
903            // Build IN clause from blocking_states
904            let state_placeholders: Vec<String> = states_config
905                .blocking_states
906                .iter()
907                .enumerate()
908                .map(|(i, _)| format!("?{}", i + 2))
909                .collect();
910            let state_clause = state_placeholders.join(", ");
911
912            let sql = format!(
913                "SELECT COUNT(*) FROM dependencies d
914                 INNER JOIN tasks child ON d.to_task_id = child.id
915                 WHERE d.from_task_id = ?1 
916                 AND d.dep_type = 'contains'
917                 AND child.status IN ({})",
918                state_clause
919            );
920
921            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
922            params_vec.push(Box::new(task_id.to_string()));
923            for state in &states_config.blocking_states {
924                params_vec.push(Box::new(state.clone()));
925            }
926            let params_refs: Vec<&dyn rusqlite::ToSql> =
927                params_vec.iter().map(|b| b.as_ref()).collect();
928
929            let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
930
931            Ok(count > 0)
932        })
933    }
934
935    /// Get tasks with tag-based filtering using junction tables for indexed lookups.
936    /// - `tags_any`: Task must have at least one of these tags (OR)
937    /// - `tags_all`: Task must have all of these tags (AND)
938    /// - `qualified_for_agent_tags`: If provided, only return tasks where these tags satisfy the task's agent_tags_all/agent_tags_any
939    ///
940    /// Excludes soft-deleted tasks.
941    #[allow(clippy::too_many_arguments)]
942    pub fn list_tasks_with_tag_filters(
943        &self,
944        status: Option<Vec<String>>,
945        owner: Option<&str>,
946        parent_id: Option<Option<&str>>,
947        tags_any: Option<Vec<String>>,
948        tags_all: Option<Vec<String>>,
949        qualified_for_agent_tags: Option<Vec<String>>,
950        limit: Option<i32>,
951        sort_by: Option<&str>,
952        sort_order: Option<&str>,
953    ) -> Result<Vec<Task>> {
954        self.with_conn(|conn| {
955            let mut sql = String::from("SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL");
956            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
957            let mut param_idx = 1;
958
959            // Status filter (can be single or multiple)
960            if let Some(ref statuses) = status {
961                if statuses.len() == 1 {
962                    sql.push_str(&format!(" AND t.status = ?{}", param_idx));
963                    params_vec.push(Box::new(statuses[0].clone()));
964                    param_idx += 1;
965                } else if statuses.len() > 1 {
966                    let placeholders: Vec<String> = statuses
967                        .iter()
968                        .enumerate()
969                        .map(|(i, _)| format!("?{}", param_idx + i))
970                        .collect();
971                    sql.push_str(&format!(" AND t.status IN ({})", placeholders.join(", ")));
972                    for s in statuses {
973                        params_vec.push(Box::new(s.clone()));
974                    }
975                    param_idx += statuses.len();
976                }
977            }
978
979            // Owner filter
980            if let Some(o) = owner {
981                sql.push_str(&format!(" AND t.worker_id = ?{}", param_idx));
982                params_vec.push(Box::new(o.to_string()));
983                param_idx += 1;
984            }
985
986            // Parent filter via dependencies table
987            if let Some(p) = parent_id {
988                match p {
989                    Some(pid) => {
990                        sql.push_str(&format!(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ?{} AND dep_type = 'contains')", param_idx));
991                        params_vec.push(Box::new(pid.to_string()));
992                        param_idx += 1;
993                    }
994                    None => {
995                        // Root tasks: not contained by any other task
996                        sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
997                    }
998                }
999            }
1000
1001            // tags_any: Task must have at least one of these tags (OR) - uses task_tags junction
1002            if let Some(ref any_tags) = tags_any
1003                && !any_tags.is_empty() {
1004                    let placeholders: Vec<String> = any_tags
1005                        .iter()
1006                        .enumerate()
1007                        .map(|(i, _)| format!("?{}", param_idx + i))
1008                        .collect();
1009                    sql.push_str(&format!(
1010                        " AND EXISTS (SELECT 1 FROM task_tags WHERE task_id = t.id AND tag IN ({}))",
1011                        placeholders.join(", ")
1012                    ));
1013                    for tag in any_tags {
1014                        params_vec.push(Box::new(tag.clone()));
1015                    }
1016                    param_idx += any_tags.len();
1017                }
1018
1019            // tags_all: Task must have all of these tags (AND) - uses task_tags junction
1020            if let Some(ref all_tags) = tags_all
1021                && !all_tags.is_empty() {
1022                    let placeholders: Vec<String> = all_tags
1023                        .iter()
1024                        .enumerate()
1025                        .map(|(i, _)| format!("?{}", param_idx + i))
1026                        .collect();
1027                    // Count matching tags must equal total required tags
1028                    sql.push_str(&format!(
1029                        " AND (SELECT COUNT(*) FROM task_tags WHERE task_id = t.id AND tag IN ({})) = {}",
1030                        placeholders.join(", "),
1031                        all_tags.len()
1032                    ));
1033                    for tag in all_tags {
1034                        params_vec.push(Box::new(tag.clone()));
1035                    }
1036                    param_idx += all_tags.len();
1037                }
1038
1039            // qualified_for: Agent's tags must satisfy task's requirements - uses junction tables
1040            if let Some(ref agent_tags) = qualified_for_agent_tags {
1041                // Agent must have ALL of task's agent_tags_all
1042                if agent_tags.is_empty() {
1043                    // Agent has no tags - only match tasks with no needed_tags
1044                    sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)");
1045                    // And no wanted_tags
1046                    sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)");
1047                } else {
1048                    // For needed_tags (AND): task must have no needed_tags OR agent has all
1049                    let needed_placeholders: Vec<String> = agent_tags
1050                        .iter()
1051                        .enumerate()
1052                        .map(|(i, _)| format!("?{}", param_idx + i))
1053                        .collect();
1054                    sql.push_str(&format!(
1055                        " AND (
1056                            NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
1057                            OR (
1058                                SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
1059                            ) = (
1060                                SELECT COUNT(*) FROM task_needed_tags 
1061                                WHERE task_id = t.id AND tag IN ({})
1062                            )
1063                        )",
1064                        needed_placeholders.join(", ")
1065                    ));
1066                    for tag in agent_tags {
1067                        params_vec.push(Box::new(tag.clone()));
1068                    }
1069                    param_idx += agent_tags.len();
1070
1071                    // For wanted_tags (OR): task must have no wanted_tags OR agent has at least one
1072                    let wanted_placeholders: Vec<String> = agent_tags
1073                        .iter()
1074                        .enumerate()
1075                        .map(|(i, _)| format!("?{}", param_idx + i))
1076                        .collect();
1077                    sql.push_str(&format!(
1078                        " AND (
1079                            NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
1080                            OR EXISTS (
1081                                SELECT 1 FROM task_wanted_tags 
1082                                WHERE task_id = t.id AND tag IN ({})
1083                            )
1084                        )",
1085                        wanted_placeholders.join(", ")
1086                    ));
1087                    for tag in agent_tags {
1088                        params_vec.push(Box::new(tag.clone()));
1089                    }
1090                    // param_idx += agent_tags.len(); // not needed, last param set
1091                }
1092            }
1093
1094            // Build ORDER BY clause
1095            let order_clause = build_order_clause(sort_by, sort_order);
1096            sql.push_str(&format!(" ORDER BY {}", order_clause));
1097
1098            // Apply limit in SQL
1099            if let Some(l) = limit {
1100                sql.push_str(&format!(" LIMIT {}", l));
1101            }
1102
1103            let params_refs: Vec<&dyn rusqlite::ToSql> =
1104                params_vec.iter().map(|b| b.as_ref()).collect();
1105
1106            let mut stmt = conn.prepare(&sql)?;
1107            let tasks: Vec<Task> = stmt
1108                .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
1109                .filter_map(|r| r.ok())
1110                .collect();
1111
1112            Ok(tasks)
1113        })
1114    }
1115
1116    /// Get agent tags by agent ID.
1117    pub fn get_agent_tags(&self, agent_id: &str) -> Result<Vec<String>> {
1118        self.with_conn(|conn| {
1119            let result: Result<String, rusqlite::Error> = conn.query_row(
1120                "SELECT tags FROM workers WHERE id = ?1",
1121                params![agent_id],
1122                |row| row.get(0),
1123            );
1124
1125            match result {
1126                Ok(tags_json) => {
1127                    let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
1128                    Ok(tags)
1129                }
1130                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(vec![]),
1131                Err(e) => Err(e.into()),
1132            }
1133        })
1134    }
1135
1136    /// Internal helper: add dependency within a transaction (used by tasks.rs).
1137    pub(super) fn add_dependency_internal(
1138        conn: &Connection,
1139        from_task_id: &str,
1140        to_task_id: &str,
1141        dep_type: &str,
1142    ) -> Result<()> {
1143        conn.execute(
1144            "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1145            params![from_task_id, to_task_id, dep_type],
1146        )?;
1147        Ok(())
1148    }
1149
1150    /// Atomically relink dependencies: unlink all prev_from→prev_to, then link all from→to.
1151    /// This is a transaction-safe operation for moving children between parents.
1152    /// Returns a result with unlinked and linked pairs.
1153    pub fn relink(
1154        &self,
1155        prev_from_ids: &[String],
1156        prev_to_ids: &[String],
1157        from_ids: &[String],
1158        to_ids: &[String],
1159        dep_type: &str,
1160        deps_config: &DependenciesConfig,
1161    ) -> Result<RelinkResult> {
1162        // Validate dependency type upfront
1163        if !deps_config.is_valid_dep_type(dep_type) {
1164            return Err(anyhow!(
1165                "Invalid dependency type '{}'. Valid types: {:?}",
1166                dep_type,
1167                deps_config.dep_type_names()
1168            ));
1169        }
1170
1171        let def = deps_config
1172            .get_definition(dep_type)
1173            .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
1174        let is_vertical = def.display == DependencyDisplay::Vertical;
1175
1176        self.with_conn_mut(|conn| {
1177            let tx = conn.transaction()?;
1178
1179            let mut unlinked = Vec::new();
1180            let mut linked = Vec::new();
1181            let mut errors = Vec::new();
1182
1183            // Phase 1: Unlink all prev_from × prev_to
1184            for prev_from in prev_from_ids {
1185                for prev_to in prev_to_ids {
1186                    let rows = tx.execute(
1187                        "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
1188                        params![prev_from, prev_to, dep_type],
1189                    )?;
1190                    if rows > 0 {
1191                        unlinked.push((prev_from.clone(), prev_to.clone()));
1192                    }
1193                }
1194            }
1195
1196            // Phase 2: Link all from × to (with validation)
1197            for from_id in from_ids {
1198                for to_id in to_ids {
1199                    // For vertical deps, check single-parent constraint
1200                    if is_vertical {
1201                        let existing_parent: Option<String> = tx.query_row(
1202                            "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1203                            params![to_id],
1204                            |row| row.get(0),
1205                        ).optional()?;
1206
1207                        if let Some(ref parent) = existing_parent
1208                            && parent != from_id {
1209                                errors.push(format!(
1210                                    "Task {} already has parent {}",
1211                                    to_id, parent
1212                                ));
1213                                continue;
1214                            }
1215                    }
1216
1217                    // Check for cycles using temporary view within transaction
1218                    if would_create_cycle_in_tx(&tx, from_id, to_id, dep_type, deps_config)? {
1219                        errors.push(format!(
1220                            "Adding dependency {}→{} would create a cycle",
1221                            from_id, to_id
1222                        ));
1223                        continue;
1224                    }
1225
1226                    tx.execute(
1227                        "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1228                        params![from_id, to_id, dep_type],
1229                    )?;
1230                    linked.push((from_id.clone(), to_id.clone()));
1231                }
1232            }
1233
1234            if !errors.is_empty() {
1235                // Rollback on validation errors
1236                tx.rollback()?;
1237                return Err(anyhow!("Relink failed: {}", errors.join("; ")));
1238            }
1239
1240            tx.commit()?;
1241            Ok(RelinkResult { unlinked, linked })
1242        })
1243    }
1244
1245    // ============================================================================
1246    // Graph Traversal Methods for scan tool
1247    // ============================================================================
1248
1249    /// Get predecessors (tasks that block this task) via blocks/follows dependencies.
1250    /// depth: 0 = none, N = N levels, -1 = all
1251    pub fn get_predecessors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1252        if depth == 0 {
1253            return Ok(vec![]);
1254        }
1255
1256        self.with_conn(|conn| {
1257            let mut visited: HashSet<String> = HashSet::new();
1258            let mut result: Vec<Task> = Vec::new();
1259            let mut current_level: Vec<String> = vec![task_id.to_string()];
1260            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1261
1262            while !current_level.is_empty() && levels_remaining > 0 {
1263                let mut next_level: Vec<String> = Vec::new();
1264
1265                for tid in &current_level {
1266                    // Get tasks that block this one (from_task_id blocks to_task_id)
1267                    let mut stmt = conn.prepare(
1268                        "SELECT DISTINCT d.from_task_id FROM dependencies d
1269                         WHERE d.to_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')",
1270                    )?;
1271
1272                    let predecessors: Vec<String> = stmt
1273                        .query_map(params![tid], |row| row.get(0))?
1274                        .filter_map(|r| r.ok())
1275                        .collect();
1276
1277                    for pred_id in predecessors {
1278                        if !visited.contains(&pred_id) {
1279                            visited.insert(pred_id.clone());
1280                            if let Some(task) = get_task_by_id_internal(conn, &pred_id)? {
1281                                result.push(task);
1282                            }
1283                            next_level.push(pred_id);
1284                        }
1285                    }
1286                }
1287
1288                current_level = next_level;
1289                levels_remaining -= 1;
1290            }
1291
1292            Ok(result)
1293        })
1294    }
1295
1296    /// Get successors (tasks that this task blocks) via blocks/follows dependencies.
1297    /// depth: 0 = none, N = N levels, -1 = all
1298    pub fn get_successors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1299        if depth == 0 {
1300            return Ok(vec![]);
1301        }
1302
1303        self.with_conn(|conn| {
1304            let mut visited: HashSet<String> = HashSet::new();
1305            let mut result: Vec<Task> = Vec::new();
1306            let mut current_level: Vec<String> = vec![task_id.to_string()];
1307            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1308
1309            while !current_level.is_empty() && levels_remaining > 0 {
1310                let mut next_level: Vec<String> = Vec::new();
1311
1312                for tid in &current_level {
1313                    // Get tasks that this one blocks (from_task_id blocks to_task_id)
1314                    let mut stmt = conn.prepare(
1315                        "SELECT DISTINCT d.to_task_id FROM dependencies d
1316                         WHERE d.from_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')",
1317                    )?;
1318
1319                    let successors: Vec<String> = stmt
1320                        .query_map(params![tid], |row| row.get(0))?
1321                        .filter_map(|r| r.ok())
1322                        .collect();
1323
1324                    for succ_id in successors {
1325                        if !visited.contains(&succ_id) {
1326                            visited.insert(succ_id.clone());
1327                            if let Some(task) = get_task_by_id_internal(conn, &succ_id)? {
1328                                result.push(task);
1329                            }
1330                            next_level.push(succ_id);
1331                        }
1332                    }
1333                }
1334
1335                current_level = next_level;
1336                levels_remaining -= 1;
1337            }
1338
1339            Ok(result)
1340        })
1341    }
1342
1343    /// Get ancestors (parent chain) via contains dependency.
1344    /// depth: 0 = none, N = N levels up, -1 = all
1345    pub fn get_ancestors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1346        if depth == 0 {
1347            return Ok(vec![]);
1348        }
1349
1350        self.with_conn(|conn| {
1351            let mut result: Vec<Task> = Vec::new();
1352            let mut current_id = task_id.to_string();
1353            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1354
1355            while levels_remaining > 0 {
1356                // Get parent (from_task_id contains to_task_id)
1357                let parent_result: Result<String, rusqlite::Error> = conn.query_row(
1358                    "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1359                    params![&current_id],
1360                    |row| row.get(0),
1361                );
1362
1363                match parent_result {
1364                    Ok(parent_id) => {
1365                        if let Some(task) = get_task_by_id_internal(conn, &parent_id)? {
1366                            result.push(task);
1367                        }
1368                        current_id = parent_id;
1369                        levels_remaining -= 1;
1370                    }
1371                    Err(rusqlite::Error::QueryReturnedNoRows) => break,
1372                    Err(e) => return Err(e.into()),
1373                }
1374            }
1375
1376            Ok(result)
1377        })
1378    }
1379
1380    /// Get descendants (children tree) via contains dependency.
1381    /// depth: 0 = none, N = N levels down, -1 = all
1382    pub fn get_descendants(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1383        if depth == 0 {
1384            return Ok(vec![]);
1385        }
1386
1387        self.with_conn(|conn| {
1388            let mut visited: HashSet<String> = HashSet::new();
1389            let mut result: Vec<Task> = Vec::new();
1390            let mut current_level: Vec<String> = vec![task_id.to_string()];
1391            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1392
1393            while !current_level.is_empty() && levels_remaining > 0 {
1394                let mut next_level: Vec<String> = Vec::new();
1395
1396                for tid in &current_level {
1397                    // Get children (from_task_id contains to_task_id)
1398                    let mut stmt = conn.prepare(
1399                        "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
1400                    )?;
1401
1402                    let children: Vec<String> = stmt
1403                        .query_map(params![tid], |row| row.get(0))?
1404                        .filter_map(|r| r.ok())
1405                        .collect();
1406
1407                    for child_id in children {
1408                        if !visited.contains(&child_id) {
1409                            visited.insert(child_id.clone());
1410                            if let Some(task) = get_task_by_id_internal(conn, &child_id)? {
1411                                result.push(task);
1412                            }
1413                            next_level.push(child_id);
1414                        }
1415                    }
1416                }
1417
1418                current_level = next_level;
1419                levels_remaining -= 1;
1420            }
1421
1422            Ok(result)
1423        })
1424    }
1425}
1426
1427/// Helper to get a task by ID within a connection context.
1428fn get_task_by_id_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
1429    let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1430    let task = stmt
1431        .query_row(params![task_id], super::tasks::parse_task_row)
1432        .optional()?;
1433    Ok(task)
1434}
1435
1436/// Get the IDs of tasks that block a given task from starting (unsatisfied dependencies).
1437/// A task blocks starting if it has a start-blocking dependency type and is in a blocking state.
1438/// This is a transaction-safe version for use within existing transactions.
1439pub(crate) fn get_unsatisfied_start_blockers_in_tx(
1440    conn: &Connection,
1441    task_id: &str,
1442    states_config: &StatesConfig,
1443    deps_config: &DependenciesConfig,
1444) -> Result<Vec<String>> {
1445    let start_blocking_types = deps_config.start_blocking_types();
1446    if start_blocking_types.is_empty() {
1447        return Ok(vec![]);
1448    }
1449
1450    // Build IN clause from blocking_states
1451    let state_placeholders: Vec<String> = states_config
1452        .blocking_states
1453        .iter()
1454        .enumerate()
1455        .map(|(i, _)| format!("?{}", i + 2))
1456        .collect();
1457    let state_clause = state_placeholders.join(", ");
1458
1459    // Build IN clause from types
1460    let type_start = states_config.blocking_states.len() + 2;
1461    let type_placeholders: Vec<String> = start_blocking_types
1462        .iter()
1463        .enumerate()
1464        .map(|(i, _)| format!("?{}", type_start + i))
1465        .collect();
1466    let type_clause = type_placeholders.join(", ");
1467
1468    let sql = format!(
1469        "SELECT blocker.id FROM dependencies d
1470         INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1471         WHERE d.to_task_id = ?1 
1472         AND d.dep_type IN ({})
1473         AND blocker.status IN ({})",
1474        type_clause, state_clause
1475    );
1476
1477    let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1478    params_vec.push(Box::new(task_id.to_string()));
1479    for state in &states_config.blocking_states {
1480        params_vec.push(Box::new(state.clone()));
1481    }
1482    for t in &start_blocking_types {
1483        params_vec.push(Box::new(t.to_string()));
1484    }
1485    let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
1486
1487    let mut stmt = conn.prepare(&sql)?;
1488    let blockers = stmt
1489        .query_map(params_refs.as_slice(), |row| {
1490            let id: String = row.get(0)?;
1491            Ok(id)
1492        })?
1493        .filter_map(|r| r.ok())
1494        .collect();
1495
1496    Ok(blockers)
1497}
1498
1499/// Propagate unblock effects when a task transitions out of a blocking state.
1500/// This is called after a task completes to find newly unblocked tasks and
1501/// optionally auto-advance them.
1502///
1503/// Returns (unblocked, auto_advanced):
1504/// - unblocked: All task IDs that are now ready (all dependencies satisfied)
1505/// - auto_advanced: Subset of unblocked that were actually transitioned (when enabled)
1506///
1507/// Algorithm:
1508/// 1. Find all tasks that have a start-blocking dependency on the completed task
1509/// 2. For each candidate:
1510///    - Skip if not in initial state or already claimed
1511///    - Check if ALL other start-blockers are also satisfied
1512///    - If fully unblocked → add to unblocked list
1513///    - If auto_advance enabled → also transition to target_state
1514/// 3. Return both lists
1515pub(crate) fn propagate_unblock_effects(
1516    conn: &Connection,
1517    completed_task_id: &str,
1518    agent_id: Option<&str>,
1519    states_config: &StatesConfig,
1520    deps_config: &DependenciesConfig,
1521    auto_advance: &AutoAdvanceConfig,
1522) -> Result<(Vec<String>, Vec<String>)> {
1523    // Get start-blocking dependency types
1524    let start_blocking_types = deps_config.start_blocking_types();
1525    if start_blocking_types.is_empty() {
1526        return Ok((vec![], vec![]));
1527    }
1528
1529    // Find all tasks that depend on the completed task via start-blocking dependencies
1530    let type_placeholders: Vec<String> = start_blocking_types
1531        .iter()
1532        .enumerate()
1533        .map(|(i, _)| format!("?{}", i + 2))
1534        .collect();
1535    let type_clause = type_placeholders.join(", ");
1536
1537    let sql = format!(
1538        "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
1539        type_clause
1540    );
1541
1542    let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1543    params_vec.push(Box::new(completed_task_id.to_string()));
1544    for t in &start_blocking_types {
1545        params_vec.push(Box::new(t.to_string()));
1546    }
1547    let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
1548
1549    let mut stmt = conn.prepare(&sql)?;
1550    let dependent_task_ids: Vec<String> = stmt
1551        .query_map(params_refs.as_slice(), |row| row.get(0))?
1552        .filter_map(|r| r.ok())
1553        .collect();
1554
1555    let mut unblocked = Vec::new();
1556    let mut auto_advanced = Vec::new();
1557    let now = super::now_ms();
1558
1559    // Determine if we should auto-advance
1560    let should_auto_advance = auto_advance.enabled && auto_advance.target_state.is_some();
1561    let target_state = auto_advance.target_state.clone();
1562
1563    // Validate target state if auto-advance is enabled
1564    if should_auto_advance {
1565        let ts = target_state.as_ref().unwrap();
1566        if !states_config.is_valid_state(ts) {
1567            return Err(anyhow!(
1568                "Auto-advance target state '{}' is not a valid state",
1569                ts
1570            ));
1571        }
1572    }
1573
1574    for task_id in dependent_task_ids {
1575        // Get the task
1576        let task = match get_task_by_id_internal(conn, &task_id)? {
1577            Some(t) => t,
1578            None => continue,
1579        };
1580
1581        // Skip if not in initial state
1582        if task.status != states_config.initial {
1583            continue;
1584        }
1585
1586        // Skip if task is already claimed
1587        if task.worker_id.is_some() {
1588            continue;
1589        }
1590
1591        // Check if ALL start-blockers are now satisfied (not in blocking states)
1592        // Build query to count remaining blockers that are still blocking
1593        let state_placeholders: Vec<String> = states_config
1594            .blocking_states
1595            .iter()
1596            .enumerate()
1597            .map(|(i, _)| format!("?{}", i + 3))
1598            .collect();
1599        let state_clause = state_placeholders.join(", ");
1600
1601        // Reuse type_placeholders from above
1602        let type_start = states_config.blocking_states.len() + 3;
1603        let type_placeholders2: Vec<String> = start_blocking_types
1604            .iter()
1605            .enumerate()
1606            .map(|(i, _)| format!("?{}", type_start + i))
1607            .collect();
1608        let type_clause2 = type_placeholders2.join(", ");
1609
1610        let blocker_sql = format!(
1611            "SELECT COUNT(*) FROM dependencies d
1612             INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1613             WHERE d.to_task_id = ?1
1614             AND d.from_task_id != ?2
1615             AND d.dep_type IN ({})
1616             AND blocker.status IN ({})",
1617            type_clause2, state_clause
1618        );
1619
1620        let mut blocker_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1621        blocker_params.push(Box::new(task_id.clone()));
1622        blocker_params.push(Box::new(completed_task_id.to_string()));
1623        for state in &states_config.blocking_states {
1624            blocker_params.push(Box::new(state.clone()));
1625        }
1626        for t in &start_blocking_types {
1627            blocker_params.push(Box::new(t.to_string()));
1628        }
1629        let blocker_refs: Vec<&dyn rusqlite::ToSql> =
1630            blocker_params.iter().map(|b| b.as_ref()).collect();
1631
1632        let remaining_blockers: i32 =
1633            conn.query_row(&blocker_sql, blocker_refs.as_slice(), |row| row.get(0))?;
1634
1635        if remaining_blockers > 0 {
1636            continue; // Still blocked by other tasks
1637        }
1638
1639        // Task is now fully unblocked - add to unblocked list
1640        unblocked.push(task_id.clone());
1641
1642        // Auto-advance if enabled and transition is valid
1643        if should_auto_advance {
1644            let ts = target_state.as_ref().unwrap();
1645
1646            // Validate transition from initial to target_state
1647            if !states_config.is_valid_transition(&states_config.initial, ts) {
1648                // Skip auto-advance for this task - transition not allowed
1649                continue;
1650            }
1651
1652            // Auto-advance: update the task's status
1653            conn.execute(
1654                "UPDATE tasks SET status = ?1, updated_at = ?2 WHERE id = ?3",
1655                params![ts, now, &task_id],
1656            )?;
1657
1658            // Record state transition
1659            let reason = format!("auto-advanced: blocker '{}' completed", completed_task_id);
1660            super::state_transitions::record_state_transition(
1661                conn,
1662                &task_id,
1663                ts,
1664                agent_id,
1665                Some(&reason),
1666                states_config,
1667            )?;
1668
1669            auto_advanced.push(task_id);
1670        }
1671    }
1672
1673    Ok((unblocked, auto_advanced))
1674}