Skip to main content

task_graph_mcp/db/
deps.rs

1//! Dependency operations and cycle detection with typed dependencies.
2
3use super::Database;
4use crate::config::{AutoAdvanceConfig, DependenciesConfig, DependencyDisplay, StatesConfig};
5use crate::types::{Dependency, Task};
6use anyhow::{Result, anyhow};
7use rusqlite::{Connection, OptionalExtension, params};
8use std::collections::{HashSet, VecDeque};
9
10/// Result of a relink operation.
11#[derive(Debug)]
12pub struct RelinkResult {
13    /// Dependencies that were unlinked (from, to).
14    pub unlinked: Vec<(String, String)>,
15    /// Dependencies that were linked (from, to).
16    pub linked: Vec<(String, String)>,
17}
18
19/// Check if adding a dependency would create a cycle (transaction-safe version).
20fn would_create_cycle_in_tx(
21    tx: &rusqlite::Transaction,
22    from_task_id: &str,
23    to_task_id: &str,
24    dep_type: &str,
25    deps_config: &DependenciesConfig,
26) -> Result<bool> {
27    let def = deps_config
28        .get_definition(dep_type)
29        .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
30
31    // A cycle would occur if to_task can already reach from_task
32    // through the same "graph" (horizontal or vertical)
33    let mut visited: HashSet<String> = HashSet::new();
34    let mut queue: VecDeque<String> = VecDeque::new();
35    queue.push_back(to_task_id.to_string());
36
37    while let Some(current) = queue.pop_front() {
38        if current == from_task_id {
39            return Ok(true); // Would create a cycle
40        }
41
42        if visited.contains(&current) {
43            continue;
44        }
45        visited.insert(current.clone());
46
47        // Get all tasks that current points to (in the relevant graph)
48        let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
49            // For vertical deps, only check containment relationships
50            let mut stmt = tx.prepare(
51                "SELECT to_task_id FROM dependencies d
52                 JOIN (SELECT value FROM json_each(?1)) types
53                 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
54            )?;
55            let vertical_types: Vec<&str> = deps_config.vertical_types();
56            let types_json = serde_json::to_string(&vertical_types)?;
57            stmt.query_map(params![&types_json, &current], |row| row.get(0))?
58                .filter_map(|r| r.ok())
59                .collect()
60        } else {
61            // For horizontal deps, check all start-blocking relationships
62            let mut stmt = tx.prepare(
63                "SELECT to_task_id FROM dependencies d
64                 JOIN (SELECT value FROM json_each(?1)) types
65                 WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
66            )?;
67            let start_blocking: Vec<&str> = deps_config.start_blocking_types();
68            let types_json = serde_json::to_string(&start_blocking)?;
69            stmt.query_map(params![&types_json, &current], |row| row.get(0))?
70                .filter_map(|r| r.ok())
71                .collect()
72        };
73
74        for dep in deps {
75            if !visited.contains(&dep) {
76                queue.push_back(dep);
77            }
78        }
79    }
80
81    Ok(false)
82}
83
84/// Build an ORDER BY clause from sort_by and sort_order parameters.
85/// Returns a safe SQL ORDER BY expression.
86fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
87    let field = match sort_by {
88        Some("priority") => "CAST(t.priority AS INTEGER)",
89        Some("created_at") => "t.created_at",
90        Some("updated_at") => "t.updated_at",
91        _ => "t.created_at", // default
92    };
93
94    let order = match sort_order {
95        Some("asc") => "ASC",
96        Some("desc") => "DESC",
97        _ => {
98            // Default: priority is descending (higher number = more important), dates are descending
99            "DESC"
100        }
101    };
102
103    format!("{} {}", field, order)
104}
105
106/// Result of attempting to add a dependency.
107#[derive(Debug)]
108pub enum AddDependencyResult {
109    /// Dependency was created successfully.
110    Created,
111    /// Dependency already existed (no change).
112    AlreadyExists,
113    /// Source task does not exist.
114    FromTaskNotFound,
115    /// Target task does not exist.
116    ToTaskNotFound,
117}
118
119impl Database {
120    /// Check if a task exists by ID.
121    pub fn task_exists(&self, task_id: &str) -> Result<bool> {
122        self.with_conn(|conn| {
123            let count: i64 = conn.query_row(
124                "SELECT COUNT(*) FROM tasks WHERE id = ?1",
125                params![task_id],
126                |row| row.get(0),
127            )?;
128            Ok(count > 0)
129        })
130    }
131
132    /// Add a typed dependency (from blocks/contains to).
133    /// Returns Ok(()) on success. For soft linking with warnings, use add_dependency_soft.
134    pub fn add_dependency(
135        &self,
136        from_task_id: &str,
137        to_task_id: &str,
138        dep_type: &str,
139        deps_config: &DependenciesConfig,
140    ) -> Result<()> {
141        match self.add_dependency_soft(from_task_id, to_task_id, dep_type, deps_config)? {
142            AddDependencyResult::Created | AddDependencyResult::AlreadyExists => Ok(()),
143            AddDependencyResult::FromTaskNotFound => {
144                Err(anyhow!("Source task '{}' not found", from_task_id))
145            }
146            AddDependencyResult::ToTaskNotFound => {
147                Err(anyhow!("Target task '{}' not found", to_task_id))
148            }
149        }
150    }
151
152    /// Add a typed dependency with soft failure (returns result status instead of error).
153    /// Use this when you want to handle missing tasks as warnings.
154    pub fn add_dependency_soft(
155        &self,
156        from_task_id: &str,
157        to_task_id: &str,
158        dep_type: &str,
159        deps_config: &DependenciesConfig,
160    ) -> Result<AddDependencyResult> {
161        // Validate dependency type
162        if !deps_config.is_valid_dep_type(dep_type) {
163            return Err(anyhow!(
164                "Invalid dependency type '{}'. Valid types: {:?}",
165                dep_type,
166                deps_config.dep_type_names()
167            ));
168        }
169
170        // Check if tasks exist first
171        if !self.task_exists(from_task_id)? {
172            return Ok(AddDependencyResult::FromTaskNotFound);
173        }
174        if !self.task_exists(to_task_id)? {
175            return Ok(AddDependencyResult::ToTaskNotFound);
176        }
177
178        // For vertical (contains) dependencies, check single-parent constraint
179        let def = deps_config
180            .get_definition(dep_type)
181            .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
182        if def.display == DependencyDisplay::Vertical
183            && let Some(existing_parent) = self.get_parent(to_task_id)?
184            && existing_parent != from_task_id
185        {
186            return Err(anyhow!(
187                "Task {} already has parent {}",
188                to_task_id,
189                existing_parent
190            ));
191        }
192
193        // Check for cycle in the appropriate graph
194        if self.would_create_cycle(from_task_id, to_task_id, dep_type, deps_config)? {
195            return Err(anyhow!("Adding this dependency would create a cycle"));
196        }
197
198        self.with_conn(|conn| {
199            let changes = conn.execute(
200                "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
201                params![from_task_id, to_task_id, dep_type],
202            )?;
203            if changes == 0 {
204                Ok(AddDependencyResult::AlreadyExists)
205            } else {
206                Ok(AddDependencyResult::Created)
207            }
208        })
209    }
210
211    /// Check if adding a dependency would create a cycle.
212    /// For horizontal deps: check cycle in the start-blocking graph.
213    /// For vertical deps: check containment cycle.
214    pub fn would_create_cycle(
215        &self,
216        from_task_id: &str,
217        to_task_id: &str,
218        dep_type: &str,
219        deps_config: &DependenciesConfig,
220    ) -> Result<bool> {
221        let def = deps_config
222            .get_definition(dep_type)
223            .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
224
225        self.with_conn(|conn| {
226            // A cycle would occur if to_task can already reach from_task
227            // through the same "graph" (horizontal or vertical)
228            let mut visited: HashSet<String> = HashSet::new();
229            let mut queue: VecDeque<String> = VecDeque::new();
230            queue.push_back(to_task_id.to_string());
231
232            while let Some(current) = queue.pop_front() {
233                if current == from_task_id {
234                    return Ok(true); // Would create a cycle
235                }
236
237                if visited.contains(&current) {
238                    continue;
239                }
240                visited.insert(current.clone());
241
242                // Get all tasks that current points to (in the relevant graph)
243                let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
244                    // For vertical deps, only check containment relationships
245                    let mut stmt = conn.prepare(
246                        "SELECT to_task_id FROM dependencies d
247                         JOIN (SELECT value FROM json_each(?1)) types
248                         WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
249                    )?;
250                    let vertical_types: Vec<&str> = deps_config.vertical_types();
251                    let types_json = serde_json::to_string(&vertical_types)?;
252                    stmt.query_map(params![&types_json, &current], |row| row.get(0))?
253                        .filter_map(|r| r.ok())
254                        .collect()
255                } else {
256                    // For horizontal deps, check all start-blocking relationships
257                    let mut stmt = conn.prepare(
258                        "SELECT to_task_id FROM dependencies d
259                         JOIN (SELECT value FROM json_each(?1)) types
260                         WHERE d.from_task_id = ?2 AND d.dep_type = types.value",
261                    )?;
262                    let start_blocking: Vec<&str> = deps_config.start_blocking_types();
263                    let types_json = serde_json::to_string(&start_blocking)?;
264                    stmt.query_map(params![&types_json, &current], |row| row.get(0))?
265                        .filter_map(|r| r.ok())
266                        .collect()
267                };
268
269                for dep in deps {
270                    if !visited.contains(&dep) {
271                        queue.push_back(dep);
272                    }
273                }
274            }
275
276            Ok(false)
277        })
278    }
279
280    /// Remove a typed dependency. Returns true if a row was deleted.
281    pub fn remove_dependency(
282        &self,
283        from_task_id: &str,
284        to_task_id: &str,
285        dep_type: &str,
286    ) -> Result<bool> {
287        self.with_conn(|conn| {
288            let rows = conn.execute(
289                "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
290                params![from_task_id, to_task_id, dep_type],
291            )?;
292            Ok(rows > 0)
293        })
294    }
295
296    /// Remove all dependencies of a given type from a task (outgoing edges).
297    /// Returns the list of removed dependencies.
298    pub fn remove_all_outgoing_dependencies(
299        &self,
300        from_task_id: &str,
301        dep_type: &str,
302    ) -> Result<Vec<Dependency>> {
303        self.with_conn_mut(|conn| {
304            let tx = conn.transaction()?;
305
306            // First get the dependencies that will be removed
307            let deps: Vec<Dependency> = {
308                let mut stmt = tx.prepare(
309                    "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
310                )?;
311                stmt
312                    .query_map(params![from_task_id, dep_type], |row| {
313                        Ok(Dependency {
314                            from_task_id: row.get(0)?,
315                            to_task_id: row.get(1)?,
316                            dep_type: row.get(2)?,
317                        })
318                    })?
319                    .filter_map(|r| r.ok())
320                    .collect()
321            };
322
323            // Then delete them
324            tx.execute(
325                "DELETE FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2",
326                params![from_task_id, dep_type],
327            )?;
328
329            tx.commit()?;
330            Ok(deps)
331        })
332    }
333
334    /// Remove all dependencies of a given type to a task (incoming edges).
335    /// Returns the list of removed dependencies.
336    pub fn remove_all_incoming_dependencies(
337        &self,
338        to_task_id: &str,
339        dep_type: &str,
340    ) -> Result<Vec<Dependency>> {
341        self.with_conn_mut(|conn| {
342            let tx = conn.transaction()?;
343
344            // First get the dependencies that will be removed
345            let deps: Vec<Dependency> = {
346                let mut stmt = tx.prepare(
347                    "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
348                )?;
349                stmt
350                    .query_map(params![to_task_id, dep_type], |row| {
351                        Ok(Dependency {
352                            from_task_id: row.get(0)?,
353                            to_task_id: row.get(1)?,
354                            dep_type: row.get(2)?,
355                        })
356                    })?
357                    .filter_map(|r| r.ok())
358                    .collect()
359            };
360
361            // Then delete them
362            tx.execute(
363                "DELETE FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2",
364                params![to_task_id, dep_type],
365            )?;
366
367            tx.commit()?;
368            Ok(deps)
369        })
370    }
371
372    /// Get all dependencies.
373    pub fn get_all_dependencies(&self) -> Result<Vec<Dependency>> {
374        self.with_conn(|conn| {
375            let mut stmt =
376                conn.prepare("SELECT from_task_id, to_task_id, dep_type FROM dependencies")?;
377
378            let deps = stmt
379                .query_map([], |row| {
380                    let from: String = row.get(0)?;
381                    let to: String = row.get(1)?;
382                    let dep_type: String = row.get(2)?;
383                    Ok(Dependency {
384                        from_task_id: from,
385                        to_task_id: to,
386                        dep_type,
387                    })
388                })?
389                .filter_map(|r| r.ok())
390                .collect();
391
392            Ok(deps)
393        })
394    }
395
396    /// Get dependencies of a specific type for a task.
397    pub fn get_dependencies_by_type(
398        &self,
399        task_id: &str,
400        dep_type: &str,
401        direction: &str,
402    ) -> Result<Vec<Dependency>> {
403        self.with_conn(|conn| {
404            let sql = if direction == "incoming" {
405                "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
406            } else {
407                "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
408            };
409
410            let mut stmt = conn.prepare(sql)?;
411
412            let deps = stmt
413                .query_map(params![task_id, dep_type], |row| {
414                    let from: String = row.get(0)?;
415                    let to: String = row.get(1)?;
416                    let dep_type: String = row.get(2)?;
417                    Ok(Dependency {
418                        from_task_id: from,
419                        to_task_id: to,
420                        dep_type,
421                    })
422                })?
423                .filter_map(|r| r.ok())
424                .collect();
425
426            Ok(deps)
427        })
428    }
429
430    /// Get tasks that block a given task from starting (dep_type with blocks: start).
431    pub fn get_start_blockers(
432        &self,
433        task_id: &str,
434        deps_config: &DependenciesConfig,
435    ) -> Result<Vec<String>> {
436        let start_blocking_types = deps_config.start_blocking_types();
437        if start_blocking_types.is_empty() {
438            return Ok(vec![]);
439        }
440
441        self.with_conn(|conn| {
442            let placeholders: String = start_blocking_types
443                .iter()
444                .enumerate()
445                .map(|(i, _)| format!("?{}", i + 2))
446                .collect::<Vec<_>>()
447                .join(", ");
448
449            let sql = format!(
450                "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type IN ({})",
451                placeholders
452            );
453
454            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
455            params_vec.push(Box::new(task_id.to_string()));
456            for t in &start_blocking_types {
457                params_vec.push(Box::new(t.to_string()));
458            }
459            let params_refs: Vec<&dyn rusqlite::ToSql> =
460                params_vec.iter().map(|b| b.as_ref()).collect();
461
462            let mut stmt = conn.prepare(&sql)?;
463            let blockers = stmt
464                .query_map(params_refs.as_slice(), |row| {
465                    let id: String = row.get(0)?;
466                    Ok(id)
467                })?
468                .filter_map(|r| r.ok())
469                .collect();
470
471            Ok(blockers)
472        })
473    }
474
475    /// Get tasks that block a given task from completing (dep_type with blocks: completion).
476    /// For a parent task, this returns children that must complete first.
477    pub fn get_completion_blockers(
478        &self,
479        task_id: &str,
480        deps_config: &DependenciesConfig,
481    ) -> Result<Vec<String>> {
482        let completion_blocking_types = deps_config.completion_blocking_types();
483        if completion_blocking_types.is_empty() {
484            return Ok(vec![]);
485        }
486
487        self.with_conn(|conn| {
488            let placeholders: String = completion_blocking_types
489                .iter()
490                .enumerate()
491                .map(|(i, _)| format!("?{}", i + 2))
492                .collect::<Vec<_>>()
493                .join(", ");
494
495            // For completion blockers, we look at outgoing edges (from_task_id = parent)
496            // because "contains" means parent -> child, and child blocks parent completion
497            let sql = format!(
498                "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
499                placeholders
500            );
501
502            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
503            params_vec.push(Box::new(task_id.to_string()));
504            for t in &completion_blocking_types {
505                params_vec.push(Box::new(t.to_string()));
506            }
507            let params_refs: Vec<&dyn rusqlite::ToSql> =
508                params_vec.iter().map(|b| b.as_ref()).collect();
509
510            let mut stmt = conn.prepare(&sql)?;
511            let blockers = stmt
512                .query_map(params_refs.as_slice(), |row| {
513                    let id: String = row.get(0)?;
514                    Ok(id)
515                })?
516                .filter_map(|r| r.ok())
517                .collect();
518
519            Ok(blockers)
520        })
521    }
522
523    /// Get the parent of a task (via 'contains' dependency).
524    pub fn get_parent(&self, task_id: &str) -> Result<Option<String>> {
525        self.with_conn(|conn| {
526            let result: Result<String, rusqlite::Error> = conn.query_row(
527                "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
528                params![task_id],
529                |row| row.get(0),
530            );
531
532            match result {
533                Ok(parent_id) => Ok(Some(parent_id)),
534                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
535                Err(e) => Err(e.into()),
536            }
537        })
538    }
539
540    /// Get children of a task (via 'contains' dependency).
541    pub fn get_children_ids(&self, task_id: &str) -> Result<Vec<String>> {
542        self.with_conn(|conn| {
543            let mut stmt = conn.prepare(
544                "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
545            )?;
546
547            let children = stmt
548                .query_map(params![task_id], |row| {
549                    let id: String = row.get(0)?;
550                    Ok(id)
551                })?
552                .filter_map(|r| r.ok())
553                .collect();
554
555            Ok(children)
556        })
557    }
558
559    /// Get all tasks that block a given task (backwards compatible).
560    /// Returns tasks from both 'blocks' and 'follows' dependencies.
561    pub fn get_blockers(&self, task_id: &str) -> Result<Vec<String>> {
562        self.with_conn(|conn| {
563            let mut stmt = conn.prepare(
564                "SELECT from_task_id FROM dependencies 
565                 WHERE to_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
566            )?;
567
568            let blockers = stmt
569                .query_map(params![task_id], |row| {
570                    let id: String = row.get(0)?;
571                    Ok(id)
572                })?
573                .filter_map(|r| r.ok())
574                .collect();
575
576            Ok(blockers)
577        })
578    }
579
580    /// Get tasks that a given task blocks.
581    #[allow(dead_code)]
582    pub fn get_blocking(&self, task_id: &str) -> Result<Vec<String>> {
583        self.with_conn(|conn| {
584            let mut stmt = conn.prepare(
585                "SELECT to_task_id FROM dependencies 
586                 WHERE from_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
587            )?;
588
589            let blocking = stmt
590                .query_map(params![task_id], |row| {
591                    let id: String = row.get(0)?;
592                    Ok(id)
593                })?
594                .filter_map(|r| r.ok())
595                .collect();
596
597            Ok(blocking)
598        })
599    }
600
601    /// Get tasks that are blocked by incomplete start dependencies.
602    /// A task is blocked if any of its start-blocking dependencies are in a blocking state.
603    /// Excludes soft-deleted tasks.
604    pub fn get_blocked_tasks(
605        &self,
606        states_config: &StatesConfig,
607        deps_config: &DependenciesConfig,
608        sort_by: Option<&str>,
609        sort_order: Option<&str>,
610    ) -> Result<Vec<Task>> {
611        let start_blocking_types = deps_config.start_blocking_types();
612        if start_blocking_types.is_empty() {
613            return Ok(vec![]);
614        }
615
616        self.with_conn(|conn| {
617            // Build IN clause from blocking_states
618            let state_placeholders: Vec<String> = states_config
619                .blocking_states
620                .iter()
621                .enumerate()
622                .map(|(i, _)| format!("?{}", i + 2))
623                .collect();
624            let state_clause = state_placeholders.join(", ");
625
626            // Build IN clause from start_blocking_types
627            let type_start = states_config.blocking_states.len() + 2;
628            let type_placeholders: Vec<String> = start_blocking_types
629                .iter()
630                .enumerate()
631                .map(|(i, _)| format!("?{}", type_start + i))
632                .collect();
633            let type_clause = type_placeholders.join(", ");
634
635            // Build ORDER BY clause
636            let order_clause = build_order_clause(sort_by, sort_order);
637
638            let sql = format!(
639                "SELECT DISTINCT t.*
640                 FROM tasks t
641                 INNER JOIN dependencies d ON t.id = d.to_task_id
642                 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
643                 WHERE d.dep_type IN ({})
644                 AND blocker.status IN ({})
645                 AND t.status = ?1
646                 AND t.deleted_at IS NULL
647                 ORDER BY {}",
648                type_clause, state_clause, order_clause
649            );
650
651            let mut stmt = conn.prepare(&sql)?;
652
653            // Build params: initial state + blocking states + start_blocking_types
654            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
655            params_vec.push(Box::new(states_config.initial.clone()));
656            for state in &states_config.blocking_states {
657                params_vec.push(Box::new(state.clone()));
658            }
659            for t in &start_blocking_types {
660                params_vec.push(Box::new(t.to_string()));
661            }
662            let params_refs: Vec<&dyn rusqlite::ToSql> =
663                params_vec.iter().map(|b| b.as_ref()).collect();
664
665            let tasks = stmt
666                .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
667                .filter_map(|r| r.ok())
668                .collect();
669
670            Ok(tasks)
671        })
672    }
673
674    /// Get tasks that are ready to be claimed (all start dependencies satisfied).
675    /// A task is ready if it's in the initial state, unclaimed, and all start-blocking deps are not blocking.
676    /// When agent_id is provided, also filters by agent's tag qualifications using junction tables.
677    /// Excludes soft-deleted tasks.
678    pub fn get_ready_tasks(
679        &self,
680        agent_id: Option<&str>,
681        states_config: &StatesConfig,
682        deps_config: &DependenciesConfig,
683        sort_by: Option<&str>,
684        sort_order: Option<&str>,
685    ) -> Result<Vec<Task>> {
686        let start_blocking_types = deps_config.start_blocking_types();
687
688        // Get agent tags if agent_id is provided (for junction table filtering)
689        let agent_tags: Option<Vec<String>> = if let Some(aid) = agent_id {
690            Some(self.get_agent_tags(aid)?)
691        } else {
692            None
693        };
694
695        self.with_conn(|conn| {
696            // Build IN clause from blocking_states
697            let state_placeholders: Vec<String> = states_config
698                .blocking_states
699                .iter()
700                .enumerate()
701                .map(|(i, _)| format!("?{}", i + 2))
702                .collect();
703            let state_clause = state_placeholders.join(", ");
704
705            // Build IN clause from start_blocking_types
706            let type_start = states_config.blocking_states.len() + 2;
707            let type_placeholders: Vec<String> = start_blocking_types
708                .iter()
709                .enumerate()
710                .map(|(i, _)| format!("?{}", type_start + i))
711                .collect();
712            let type_clause = type_placeholders.join(", ");
713
714            // Build ORDER BY clause - for ready tasks, default is priority then created_at
715            let order_clause = if sort_by.is_some() {
716                build_order_clause(sort_by, sort_order)
717            } else {
718                // Default for ready: priority (high first), then created_at
719                "CAST(t.priority AS INTEGER) DESC, t.created_at DESC".to_string()
720            };
721
722            // Track param index for agent tag filters
723            let mut param_idx = type_start + start_blocking_types.len();
724
725            // Build agent qualification filters using junction tables
726            let (agent_needed_clause, agent_wanted_clause) = if let Some(ref tags) = agent_tags {
727                // For agent_tags_all (AND): agent must have ALL needed tags
728                // Count how many of the task's needed_tags match agent's tags
729                // Either the task has no needed_tags, or all must match
730                let needed_placeholders: Vec<String> = tags
731                    .iter()
732                    .enumerate()
733                    .map(|(i, _)| format!("?{}", param_idx + i))
734                    .collect();
735                param_idx += tags.len();
736
737                let needed_clause = if needed_placeholders.is_empty() {
738                    // Agent has no tags - only match tasks with no needed_tags
739                    "AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)"
740                        .to_string()
741                } else {
742                    // Task must have no needed_tags OR agent must have all of them
743                    format!(
744                        "AND (
745                            NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
746                            OR (
747                                SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
748                            ) = (
749                                SELECT COUNT(*) FROM task_needed_tags 
750                                WHERE task_id = t.id AND tag IN ({})
751                            )
752                        )",
753                        needed_placeholders.join(", ")
754                    )
755                };
756
757                // For agent_tags_any (OR): agent must have at least ONE wanted tag
758                let wanted_placeholders: Vec<String> = tags
759                    .iter()
760                    .enumerate()
761                    .map(|(i, _)| format!("?{}", param_idx + i))
762                    .collect();
763
764                let wanted_clause = if wanted_placeholders.is_empty() {
765                    // Agent has no tags - only match tasks with no wanted_tags
766                    "AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)"
767                        .to_string()
768                } else {
769                    // Task must have no wanted_tags OR agent must have at least one
770                    format!(
771                        "AND (
772                            NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
773                            OR EXISTS (
774                                SELECT 1 FROM task_wanted_tags 
775                                WHERE task_id = t.id AND tag IN ({})
776                            )
777                        )",
778                        wanted_placeholders.join(", ")
779                    )
780                };
781
782                (needed_clause, wanted_clause)
783            } else {
784                (String::new(), String::new())
785            };
786
787            let sql = format!(
788                "SELECT t.*
789                 FROM tasks t
790                 WHERE t.status = ?1
791                 AND t.worker_id IS NULL
792                 AND t.deleted_at IS NULL
793                 AND NOT EXISTS (
794                     SELECT 1 FROM dependencies d
795                     INNER JOIN tasks blocker ON d.from_task_id = blocker.id
796                     WHERE d.to_task_id = t.id 
797                     AND d.dep_type IN ({})
798                     AND blocker.status IN ({})
799                 )
800                 {}
801                 {}
802                 ORDER BY {}",
803                type_clause, state_clause, agent_needed_clause, agent_wanted_clause, order_clause
804            );
805
806            let mut stmt = conn.prepare(&sql)?;
807
808            // Build params: initial state + blocking states + types + agent tags (twice if present)
809            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
810            params_vec.push(Box::new(states_config.initial.clone()));
811            for state in &states_config.blocking_states {
812                params_vec.push(Box::new(state.clone()));
813            }
814            for t in &start_blocking_types {
815                params_vec.push(Box::new(t.to_string()));
816            }
817            // Add agent tags twice (once for needed_tags check, once for wanted_tags check)
818            if let Some(ref tags) = agent_tags {
819                for tag in tags {
820                    params_vec.push(Box::new(tag.clone()));
821                }
822                for tag in tags {
823                    params_vec.push(Box::new(tag.clone()));
824                }
825            }
826            let params_refs: Vec<&dyn rusqlite::ToSql> =
827                params_vec.iter().map(|b| b.as_ref()).collect();
828
829            let tasks: Vec<Task> = stmt
830                .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
831                .filter_map(|r| r.ok())
832                .collect();
833
834            Ok(tasks)
835        })
836    }
837
838    /// Check if a task has unmet start dependencies.
839    #[allow(dead_code)]
840    pub fn has_unmet_start_dependencies(
841        &self,
842        task_id: &str,
843        states_config: &StatesConfig,
844        deps_config: &DependenciesConfig,
845    ) -> Result<bool> {
846        let start_blocking_types = deps_config.start_blocking_types();
847        if start_blocking_types.is_empty() {
848            return Ok(false);
849        }
850
851        self.with_conn(|conn| {
852            // Build IN clause from blocking_states
853            let state_placeholders: Vec<String> = states_config
854                .blocking_states
855                .iter()
856                .enumerate()
857                .map(|(i, _)| format!("?{}", i + 2))
858                .collect();
859            let state_clause = state_placeholders.join(", ");
860
861            // Build IN clause from types
862            let type_start = states_config.blocking_states.len() + 2;
863            let type_placeholders: Vec<String> = start_blocking_types
864                .iter()
865                .enumerate()
866                .map(|(i, _)| format!("?{}", type_start + i))
867                .collect();
868            let type_clause = type_placeholders.join(", ");
869
870            let sql = format!(
871                "SELECT COUNT(*) FROM dependencies d
872                 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
873                 WHERE d.to_task_id = ?1 
874                 AND d.dep_type IN ({})
875                 AND blocker.status IN ({})",
876                type_clause, state_clause
877            );
878
879            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
880            params_vec.push(Box::new(task_id.to_string()));
881            for state in &states_config.blocking_states {
882                params_vec.push(Box::new(state.clone()));
883            }
884            for t in &start_blocking_types {
885                params_vec.push(Box::new(t.to_string()));
886            }
887            let params_refs: Vec<&dyn rusqlite::ToSql> =
888                params_vec.iter().map(|b| b.as_ref()).collect();
889
890            let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
891
892            Ok(count > 0)
893        })
894    }
895
896    /// Check if a task has incomplete children (blocking completion).
897    pub fn has_incomplete_children(
898        &self,
899        task_id: &str,
900        states_config: &StatesConfig,
901    ) -> Result<bool> {
902        self.with_conn(|conn| {
903            // Build IN clause from blocking_states
904            let state_placeholders: Vec<String> = states_config
905                .blocking_states
906                .iter()
907                .enumerate()
908                .map(|(i, _)| format!("?{}", i + 2))
909                .collect();
910            let state_clause = state_placeholders.join(", ");
911
912            let sql = format!(
913                "SELECT COUNT(*) FROM dependencies d
914                 INNER JOIN tasks child ON d.to_task_id = child.id
915                 WHERE d.from_task_id = ?1 
916                 AND d.dep_type = 'contains'
917                 AND child.status IN ({})",
918                state_clause
919            );
920
921            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
922            params_vec.push(Box::new(task_id.to_string()));
923            for state in &states_config.blocking_states {
924                params_vec.push(Box::new(state.clone()));
925            }
926            let params_refs: Vec<&dyn rusqlite::ToSql> =
927                params_vec.iter().map(|b| b.as_ref()).collect();
928
929            let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
930
931            Ok(count > 0)
932        })
933    }
934
935    /// Get tasks with tag-based filtering using junction tables for indexed lookups.
936    /// - `tags_any`: Task must have at least one of these tags (OR)
937    /// - `tags_all`: Task must have all of these tags (AND)
938    /// - `qualified_for_agent_tags`: If provided, only return tasks where these tags satisfy the task's agent_tags_all/agent_tags_any
939    ///
940    /// Excludes soft-deleted tasks.
941    #[allow(clippy::too_many_arguments)]
942    pub fn list_tasks_with_tag_filters(
943        &self,
944        status: Option<Vec<String>>,
945        owner: Option<&str>,
946        parent_id: Option<Option<&str>>,
947        tags_any: Option<Vec<String>>,
948        tags_all: Option<Vec<String>>,
949        qualified_for_agent_tags: Option<Vec<String>>,
950        limit: Option<i32>,
951        offset: i32,
952        sort_by: Option<&str>,
953        sort_order: Option<&str>,
954    ) -> Result<Vec<Task>> {
955        self.with_conn(|conn| {
956            let mut sql = String::from("SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL");
957            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
958            let mut param_idx = 1;
959
960            // Status filter (can be single or multiple)
961            if let Some(ref statuses) = status {
962                if statuses.len() == 1 {
963                    sql.push_str(&format!(" AND t.status = ?{}", param_idx));
964                    params_vec.push(Box::new(statuses[0].clone()));
965                    param_idx += 1;
966                } else if statuses.len() > 1 {
967                    let placeholders: Vec<String> = statuses
968                        .iter()
969                        .enumerate()
970                        .map(|(i, _)| format!("?{}", param_idx + i))
971                        .collect();
972                    sql.push_str(&format!(" AND t.status IN ({})", placeholders.join(", ")));
973                    for s in statuses {
974                        params_vec.push(Box::new(s.clone()));
975                    }
976                    param_idx += statuses.len();
977                }
978            }
979
980            // Owner filter
981            if let Some(o) = owner {
982                sql.push_str(&format!(" AND t.worker_id = ?{}", param_idx));
983                params_vec.push(Box::new(o.to_string()));
984                param_idx += 1;
985            }
986
987            // Parent filter via dependencies table
988            if let Some(p) = parent_id {
989                match p {
990                    Some(pid) => {
991                        sql.push_str(&format!(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ?{} AND dep_type = 'contains')", param_idx));
992                        params_vec.push(Box::new(pid.to_string()));
993                        param_idx += 1;
994                    }
995                    None => {
996                        // Root tasks: not contained by any other task
997                        sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
998                    }
999                }
1000            }
1001
1002            // tags_any: Task must have at least one of these tags (OR) - uses task_tags junction
1003            if let Some(ref any_tags) = tags_any
1004                && !any_tags.is_empty() {
1005                    let placeholders: Vec<String> = any_tags
1006                        .iter()
1007                        .enumerate()
1008                        .map(|(i, _)| format!("?{}", param_idx + i))
1009                        .collect();
1010                    sql.push_str(&format!(
1011                        " AND EXISTS (SELECT 1 FROM task_tags WHERE task_id = t.id AND tag IN ({}))",
1012                        placeholders.join(", ")
1013                    ));
1014                    for tag in any_tags {
1015                        params_vec.push(Box::new(tag.clone()));
1016                    }
1017                    param_idx += any_tags.len();
1018                }
1019
1020            // tags_all: Task must have all of these tags (AND) - uses task_tags junction
1021            if let Some(ref all_tags) = tags_all
1022                && !all_tags.is_empty() {
1023                    let placeholders: Vec<String> = all_tags
1024                        .iter()
1025                        .enumerate()
1026                        .map(|(i, _)| format!("?{}", param_idx + i))
1027                        .collect();
1028                    // Count matching tags must equal total required tags
1029                    sql.push_str(&format!(
1030                        " AND (SELECT COUNT(*) FROM task_tags WHERE task_id = t.id AND tag IN ({})) = {}",
1031                        placeholders.join(", "),
1032                        all_tags.len()
1033                    ));
1034                    for tag in all_tags {
1035                        params_vec.push(Box::new(tag.clone()));
1036                    }
1037                    param_idx += all_tags.len();
1038                }
1039
1040            // qualified_for: Agent's tags must satisfy task's requirements - uses junction tables
1041            if let Some(ref agent_tags) = qualified_for_agent_tags {
1042                // Agent must have ALL of task's agent_tags_all
1043                if agent_tags.is_empty() {
1044                    // Agent has no tags - only match tasks with no needed_tags
1045                    sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)");
1046                    // And no wanted_tags
1047                    sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)");
1048                } else {
1049                    // For needed_tags (AND): task must have no needed_tags OR agent has all
1050                    let needed_placeholders: Vec<String> = agent_tags
1051                        .iter()
1052                        .enumerate()
1053                        .map(|(i, _)| format!("?{}", param_idx + i))
1054                        .collect();
1055                    sql.push_str(&format!(
1056                        " AND (
1057                            NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
1058                            OR (
1059                                SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
1060                            ) = (
1061                                SELECT COUNT(*) FROM task_needed_tags 
1062                                WHERE task_id = t.id AND tag IN ({})
1063                            )
1064                        )",
1065                        needed_placeholders.join(", ")
1066                    ));
1067                    for tag in agent_tags {
1068                        params_vec.push(Box::new(tag.clone()));
1069                    }
1070                    param_idx += agent_tags.len();
1071
1072                    // For wanted_tags (OR): task must have no wanted_tags OR agent has at least one
1073                    let wanted_placeholders: Vec<String> = agent_tags
1074                        .iter()
1075                        .enumerate()
1076                        .map(|(i, _)| format!("?{}", param_idx + i))
1077                        .collect();
1078                    sql.push_str(&format!(
1079                        " AND (
1080                            NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
1081                            OR EXISTS (
1082                                SELECT 1 FROM task_wanted_tags 
1083                                WHERE task_id = t.id AND tag IN ({})
1084                            )
1085                        )",
1086                        wanted_placeholders.join(", ")
1087                    ));
1088                    for tag in agent_tags {
1089                        params_vec.push(Box::new(tag.clone()));
1090                    }
1091                    // param_idx += agent_tags.len(); // not needed, last param set
1092                }
1093            }
1094
1095            // Build ORDER BY clause
1096            let order_clause = build_order_clause(sort_by, sort_order);
1097            sql.push_str(&format!(" ORDER BY {}", order_clause));
1098
1099            // Apply limit in SQL
1100            if let Some(l) = limit {
1101                sql.push_str(&format!(" LIMIT {}", l));
1102            }
1103
1104            if offset > 0 {
1105                sql.push_str(&format!(" OFFSET {}", offset));
1106            }
1107
1108            let params_refs: Vec<&dyn rusqlite::ToSql> =
1109                params_vec.iter().map(|b| b.as_ref()).collect();
1110
1111            let mut stmt = conn.prepare(&sql)?;
1112            let tasks: Vec<Task> = stmt
1113                .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
1114                .filter_map(|r| r.ok())
1115                .collect();
1116
1117            Ok(tasks)
1118        })
1119    }
1120
1121    /// Get agent tags by agent ID.
1122    pub fn get_agent_tags(&self, agent_id: &str) -> Result<Vec<String>> {
1123        self.with_conn(|conn| {
1124            let result: Result<String, rusqlite::Error> = conn.query_row(
1125                "SELECT tags FROM workers WHERE id = ?1",
1126                params![agent_id],
1127                |row| row.get(0),
1128            );
1129
1130            match result {
1131                Ok(tags_json) => {
1132                    let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
1133                    Ok(tags)
1134                }
1135                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(vec![]),
1136                Err(e) => Err(e.into()),
1137            }
1138        })
1139    }
1140
1141    /// Internal helper: add dependency within a transaction (used by tasks.rs).
1142    pub(super) fn add_dependency_internal(
1143        conn: &Connection,
1144        from_task_id: &str,
1145        to_task_id: &str,
1146        dep_type: &str,
1147    ) -> Result<()> {
1148        conn.execute(
1149            "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1150            params![from_task_id, to_task_id, dep_type],
1151        )?;
1152        Ok(())
1153    }
1154
1155    /// Atomically relink dependencies: unlink all prev_from→prev_to, then link all from→to.
1156    /// This is a transaction-safe operation for moving children between parents.
1157    /// Returns a result with unlinked and linked pairs.
1158    pub fn relink(
1159        &self,
1160        prev_from_ids: &[String],
1161        prev_to_ids: &[String],
1162        from_ids: &[String],
1163        to_ids: &[String],
1164        dep_type: &str,
1165        deps_config: &DependenciesConfig,
1166    ) -> Result<RelinkResult> {
1167        // Validate dependency type upfront
1168        if !deps_config.is_valid_dep_type(dep_type) {
1169            return Err(anyhow!(
1170                "Invalid dependency type '{}'. Valid types: {:?}",
1171                dep_type,
1172                deps_config.dep_type_names()
1173            ));
1174        }
1175
1176        let def = deps_config
1177            .get_definition(dep_type)
1178            .ok_or_else(|| anyhow!("Unknown dependency type: {}", dep_type))?;
1179        let is_vertical = def.display == DependencyDisplay::Vertical;
1180
1181        self.with_conn_mut(|conn| {
1182            let tx = conn.transaction()?;
1183
1184            let mut unlinked = Vec::new();
1185            let mut linked = Vec::new();
1186            let mut errors = Vec::new();
1187
1188            // Phase 1: Unlink all prev_from × prev_to
1189            for prev_from in prev_from_ids {
1190                for prev_to in prev_to_ids {
1191                    let rows = tx.execute(
1192                        "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
1193                        params![prev_from, prev_to, dep_type],
1194                    )?;
1195                    if rows > 0 {
1196                        unlinked.push((prev_from.clone(), prev_to.clone()));
1197                    }
1198                }
1199            }
1200
1201            // Phase 2: Link all from × to (with validation)
1202            for from_id in from_ids {
1203                for to_id in to_ids {
1204                    // For vertical deps, check single-parent constraint
1205                    if is_vertical {
1206                        let existing_parent: Option<String> = tx.query_row(
1207                            "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1208                            params![to_id],
1209                            |row| row.get(0),
1210                        ).optional()?;
1211
1212                        if let Some(ref parent) = existing_parent
1213                            && parent != from_id {
1214                                errors.push(format!(
1215                                    "Task {} already has parent {}",
1216                                    to_id, parent
1217                                ));
1218                                continue;
1219                            }
1220                    }
1221
1222                    // Check for cycles using temporary view within transaction
1223                    if would_create_cycle_in_tx(&tx, from_id, to_id, dep_type, deps_config)? {
1224                        errors.push(format!(
1225                            "Adding dependency {}→{} would create a cycle",
1226                            from_id, to_id
1227                        ));
1228                        continue;
1229                    }
1230
1231                    tx.execute(
1232                        "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1233                        params![from_id, to_id, dep_type],
1234                    )?;
1235                    linked.push((from_id.clone(), to_id.clone()));
1236                }
1237            }
1238
1239            if !errors.is_empty() {
1240                // Rollback on validation errors
1241                tx.rollback()?;
1242                return Err(anyhow!("Relink failed: {}", errors.join("; ")));
1243            }
1244
1245            tx.commit()?;
1246            Ok(RelinkResult { unlinked, linked })
1247        })
1248    }
1249
1250    // ============================================================================
1251    // Graph Traversal Methods for scan tool
1252    // ============================================================================
1253
1254    /// Get predecessors (tasks that block this task) via blocks/follows dependencies.
1255    /// depth: 0 = none, N = N levels, -1 = all
1256    pub fn get_predecessors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1257        if depth == 0 {
1258            return Ok(vec![]);
1259        }
1260
1261        self.with_conn(|conn| {
1262            let mut visited: HashSet<String> = HashSet::new();
1263            let mut result: Vec<Task> = Vec::new();
1264            let mut current_level: Vec<String> = vec![task_id.to_string()];
1265            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1266
1267            while !current_level.is_empty() && levels_remaining > 0 {
1268                let mut next_level: Vec<String> = Vec::new();
1269
1270                for tid in &current_level {
1271                    // Get tasks that block this one (from_task_id blocks to_task_id)
1272                    let mut stmt = conn.prepare(
1273                        "SELECT DISTINCT d.from_task_id FROM dependencies d
1274                         WHERE d.to_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')",
1275                    )?;
1276
1277                    let predecessors: Vec<String> = stmt
1278                        .query_map(params![tid], |row| row.get(0))?
1279                        .filter_map(|r| r.ok())
1280                        .collect();
1281
1282                    for pred_id in predecessors {
1283                        if !visited.contains(&pred_id) {
1284                            visited.insert(pred_id.clone());
1285                            if let Some(task) = get_task_by_id_internal(conn, &pred_id)? {
1286                                result.push(task);
1287                            }
1288                            next_level.push(pred_id);
1289                        }
1290                    }
1291                }
1292
1293                current_level = next_level;
1294                levels_remaining -= 1;
1295            }
1296
1297            Ok(result)
1298        })
1299    }
1300
1301    /// Get successors (tasks that this task blocks) via blocks/follows dependencies.
1302    /// depth: 0 = none, N = N levels, -1 = all
1303    pub fn get_successors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1304        if depth == 0 {
1305            return Ok(vec![]);
1306        }
1307
1308        self.with_conn(|conn| {
1309            let mut visited: HashSet<String> = HashSet::new();
1310            let mut result: Vec<Task> = Vec::new();
1311            let mut current_level: Vec<String> = vec![task_id.to_string()];
1312            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1313
1314            while !current_level.is_empty() && levels_remaining > 0 {
1315                let mut next_level: Vec<String> = Vec::new();
1316
1317                for tid in &current_level {
1318                    // Get tasks that this one blocks (from_task_id blocks to_task_id)
1319                    let mut stmt = conn.prepare(
1320                        "SELECT DISTINCT d.to_task_id FROM dependencies d
1321                         WHERE d.from_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')",
1322                    )?;
1323
1324                    let successors: Vec<String> = stmt
1325                        .query_map(params![tid], |row| row.get(0))?
1326                        .filter_map(|r| r.ok())
1327                        .collect();
1328
1329                    for succ_id in successors {
1330                        if !visited.contains(&succ_id) {
1331                            visited.insert(succ_id.clone());
1332                            if let Some(task) = get_task_by_id_internal(conn, &succ_id)? {
1333                                result.push(task);
1334                            }
1335                            next_level.push(succ_id);
1336                        }
1337                    }
1338                }
1339
1340                current_level = next_level;
1341                levels_remaining -= 1;
1342            }
1343
1344            Ok(result)
1345        })
1346    }
1347
1348    /// Get ancestors (parent chain) via contains dependency.
1349    /// depth: 0 = none, N = N levels up, -1 = all
1350    pub fn get_ancestors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1351        if depth == 0 {
1352            return Ok(vec![]);
1353        }
1354
1355        self.with_conn(|conn| {
1356            let mut result: Vec<Task> = Vec::new();
1357            let mut current_id = task_id.to_string();
1358            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1359
1360            while levels_remaining > 0 {
1361                // Get parent (from_task_id contains to_task_id)
1362                let parent_result: Result<String, rusqlite::Error> = conn.query_row(
1363                    "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1364                    params![&current_id],
1365                    |row| row.get(0),
1366                );
1367
1368                match parent_result {
1369                    Ok(parent_id) => {
1370                        if let Some(task) = get_task_by_id_internal(conn, &parent_id)? {
1371                            result.push(task);
1372                        }
1373                        current_id = parent_id;
1374                        levels_remaining -= 1;
1375                    }
1376                    Err(rusqlite::Error::QueryReturnedNoRows) => break,
1377                    Err(e) => return Err(e.into()),
1378                }
1379            }
1380
1381            Ok(result)
1382        })
1383    }
1384
1385    /// Get descendants (children tree) via contains dependency.
1386    /// depth: 0 = none, N = N levels down, -1 = all
1387    pub fn get_descendants(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1388        if depth == 0 {
1389            return Ok(vec![]);
1390        }
1391
1392        self.with_conn(|conn| {
1393            let mut visited: HashSet<String> = HashSet::new();
1394            let mut result: Vec<Task> = Vec::new();
1395            let mut current_level: Vec<String> = vec![task_id.to_string()];
1396            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1397
1398            while !current_level.is_empty() && levels_remaining > 0 {
1399                let mut next_level: Vec<String> = Vec::new();
1400
1401                for tid in &current_level {
1402                    // Get children (from_task_id contains to_task_id)
1403                    let mut stmt = conn.prepare(
1404                        "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
1405                    )?;
1406
1407                    let children: Vec<String> = stmt
1408                        .query_map(params![tid], |row| row.get(0))?
1409                        .filter_map(|r| r.ok())
1410                        .collect();
1411
1412                    for child_id in children {
1413                        if !visited.contains(&child_id) {
1414                            visited.insert(child_id.clone());
1415                            if let Some(task) = get_task_by_id_internal(conn, &child_id)? {
1416                                result.push(task);
1417                            }
1418                            next_level.push(child_id);
1419                        }
1420                    }
1421                }
1422
1423                current_level = next_level;
1424                levels_remaining -= 1;
1425            }
1426
1427            Ok(result)
1428        })
1429    }
1430}
1431
1432/// Helper to get a task by ID within a connection context.
1433fn get_task_by_id_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
1434    let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1435    let task = stmt
1436        .query_row(params![task_id], super::tasks::parse_task_row)
1437        .optional()?;
1438    Ok(task)
1439}
1440
1441/// Get the IDs of tasks that block a given task from starting (unsatisfied dependencies).
1442/// A task blocks starting if it has a start-blocking dependency type and is in a blocking state.
1443/// This is a transaction-safe version for use within existing transactions.
1444pub(crate) fn get_unsatisfied_start_blockers_in_tx(
1445    conn: &Connection,
1446    task_id: &str,
1447    states_config: &StatesConfig,
1448    deps_config: &DependenciesConfig,
1449) -> Result<Vec<String>> {
1450    let start_blocking_types = deps_config.start_blocking_types();
1451    if start_blocking_types.is_empty() {
1452        return Ok(vec![]);
1453    }
1454
1455    // Build IN clause from blocking_states
1456    let state_placeholders: Vec<String> = states_config
1457        .blocking_states
1458        .iter()
1459        .enumerate()
1460        .map(|(i, _)| format!("?{}", i + 2))
1461        .collect();
1462    let state_clause = state_placeholders.join(", ");
1463
1464    // Build IN clause from types
1465    let type_start = states_config.blocking_states.len() + 2;
1466    let type_placeholders: Vec<String> = start_blocking_types
1467        .iter()
1468        .enumerate()
1469        .map(|(i, _)| format!("?{}", type_start + i))
1470        .collect();
1471    let type_clause = type_placeholders.join(", ");
1472
1473    let sql = format!(
1474        "SELECT blocker.id FROM dependencies d
1475         INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1476         WHERE d.to_task_id = ?1 
1477         AND d.dep_type IN ({})
1478         AND blocker.status IN ({})",
1479        type_clause, state_clause
1480    );
1481
1482    let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1483    params_vec.push(Box::new(task_id.to_string()));
1484    for state in &states_config.blocking_states {
1485        params_vec.push(Box::new(state.clone()));
1486    }
1487    for t in &start_blocking_types {
1488        params_vec.push(Box::new(t.to_string()));
1489    }
1490    let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
1491
1492    let mut stmt = conn.prepare(&sql)?;
1493    let blockers = stmt
1494        .query_map(params_refs.as_slice(), |row| {
1495            let id: String = row.get(0)?;
1496            Ok(id)
1497        })?
1498        .filter_map(|r| r.ok())
1499        .collect();
1500
1501    Ok(blockers)
1502}
1503
1504/// Propagate unblock effects when a task transitions out of a blocking state.
1505/// This is called after a task completes to find newly unblocked tasks and
1506/// optionally auto-advance them.
1507///
1508/// Returns (unblocked, auto_advanced):
1509/// - unblocked: All task IDs that are now ready (all dependencies satisfied)
1510/// - auto_advanced: Subset of unblocked that were actually transitioned (when enabled)
1511///
1512/// Algorithm:
1513/// 1. Find all tasks that have a start-blocking dependency on the completed task
1514/// 2. For each candidate:
1515///    - Skip if not in initial state or already claimed
1516///    - Check if ALL other start-blockers are also satisfied
1517///    - If fully unblocked → add to unblocked list
1518///    - If auto_advance enabled → also transition to target_state
1519/// 3. Return both lists
1520pub(crate) fn propagate_unblock_effects(
1521    conn: &Connection,
1522    completed_task_id: &str,
1523    agent_id: Option<&str>,
1524    states_config: &StatesConfig,
1525    deps_config: &DependenciesConfig,
1526    auto_advance: &AutoAdvanceConfig,
1527) -> Result<(Vec<String>, Vec<String>)> {
1528    // Get start-blocking dependency types
1529    let start_blocking_types = deps_config.start_blocking_types();
1530    if start_blocking_types.is_empty() {
1531        return Ok((vec![], vec![]));
1532    }
1533
1534    // Find all tasks that depend on the completed task via start-blocking dependencies
1535    let type_placeholders: Vec<String> = start_blocking_types
1536        .iter()
1537        .enumerate()
1538        .map(|(i, _)| format!("?{}", i + 2))
1539        .collect();
1540    let type_clause = type_placeholders.join(", ");
1541
1542    let sql = format!(
1543        "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
1544        type_clause
1545    );
1546
1547    let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1548    params_vec.push(Box::new(completed_task_id.to_string()));
1549    for t in &start_blocking_types {
1550        params_vec.push(Box::new(t.to_string()));
1551    }
1552    let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
1553
1554    let mut stmt = conn.prepare(&sql)?;
1555    let dependent_task_ids: Vec<String> = stmt
1556        .query_map(params_refs.as_slice(), |row| row.get(0))?
1557        .filter_map(|r| r.ok())
1558        .collect();
1559
1560    let mut unblocked = Vec::new();
1561    let mut auto_advanced = Vec::new();
1562    let now = super::now_ms();
1563
1564    // Determine if we should auto-advance
1565    let should_auto_advance = auto_advance.enabled && auto_advance.target_state.is_some();
1566    let target_state = auto_advance.target_state.clone();
1567
1568    // Validate target state if auto-advance is enabled
1569    if should_auto_advance {
1570        let ts = target_state.as_ref().unwrap();
1571        if !states_config.is_valid_state(ts) {
1572            return Err(anyhow!(
1573                "Auto-advance target state '{}' is not a valid state",
1574                ts
1575            ));
1576        }
1577    }
1578
1579    for task_id in dependent_task_ids {
1580        // Get the task
1581        let task = match get_task_by_id_internal(conn, &task_id)? {
1582            Some(t) => t,
1583            None => continue,
1584        };
1585
1586        // Skip if not in initial state
1587        if task.status != states_config.initial {
1588            continue;
1589        }
1590
1591        // Skip if task is already claimed
1592        if task.worker_id.is_some() {
1593            continue;
1594        }
1595
1596        // Check if ALL start-blockers are now satisfied (not in blocking states)
1597        // Build query to count remaining blockers that are still blocking
1598        let state_placeholders: Vec<String> = states_config
1599            .blocking_states
1600            .iter()
1601            .enumerate()
1602            .map(|(i, _)| format!("?{}", i + 3))
1603            .collect();
1604        let state_clause = state_placeholders.join(", ");
1605
1606        // Reuse type_placeholders from above
1607        let type_start = states_config.blocking_states.len() + 3;
1608        let type_placeholders2: Vec<String> = start_blocking_types
1609            .iter()
1610            .enumerate()
1611            .map(|(i, _)| format!("?{}", type_start + i))
1612            .collect();
1613        let type_clause2 = type_placeholders2.join(", ");
1614
1615        let blocker_sql = format!(
1616            "SELECT COUNT(*) FROM dependencies d
1617             INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1618             WHERE d.to_task_id = ?1
1619             AND d.from_task_id != ?2
1620             AND d.dep_type IN ({})
1621             AND blocker.status IN ({})",
1622            type_clause2, state_clause
1623        );
1624
1625        let mut blocker_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1626        blocker_params.push(Box::new(task_id.clone()));
1627        blocker_params.push(Box::new(completed_task_id.to_string()));
1628        for state in &states_config.blocking_states {
1629            blocker_params.push(Box::new(state.clone()));
1630        }
1631        for t in &start_blocking_types {
1632            blocker_params.push(Box::new(t.to_string()));
1633        }
1634        let blocker_refs: Vec<&dyn rusqlite::ToSql> =
1635            blocker_params.iter().map(|b| b.as_ref()).collect();
1636
1637        let remaining_blockers: i32 =
1638            conn.query_row(&blocker_sql, blocker_refs.as_slice(), |row| row.get(0))?;
1639
1640        if remaining_blockers > 0 {
1641            continue; // Still blocked by other tasks
1642        }
1643
1644        // Task is now fully unblocked - add to unblocked list
1645        unblocked.push(task_id.clone());
1646
1647        // Auto-advance if enabled and transition is valid
1648        if should_auto_advance {
1649            let ts = target_state.as_ref().unwrap();
1650
1651            // Validate transition from initial to target_state
1652            if !states_config.is_valid_transition(&states_config.initial, ts) {
1653                // Skip auto-advance for this task - transition not allowed
1654                continue;
1655            }
1656
1657            // Auto-advance: update the task's status
1658            conn.execute(
1659                "UPDATE tasks SET status = ?1, updated_at = ?2 WHERE id = ?3",
1660                params![ts, now, &task_id],
1661            )?;
1662
1663            // Record state transition
1664            let reason = format!("auto-advanced: blocker '{}' completed", completed_task_id);
1665            super::state_transitions::record_state_transition(
1666                conn,
1667                &task_id,
1668                ts,
1669                agent_id,
1670                Some(&reason),
1671                states_config,
1672            )?;
1673
1674            auto_advanced.push(task_id);
1675        }
1676    }
1677
1678    Ok((unblocked, auto_advanced))
1679}