Skip to main content

task_graph_mcp/db/
deps.rs

1//! Dependency operations and cycle detection with typed dependencies.
2
3use super::Database;
4use crate::config::{AutoAdvanceConfig, DependenciesConfig, DependencyDisplay, StatesConfig};
5use crate::types::{Dependency, Task};
6use anyhow::{anyhow, Result};
7use rusqlite::{params, Connection, OptionalExtension};
8use std::collections::{HashSet, VecDeque};
9
10/// Result of a relink operation.
11#[derive(Debug)]
12pub struct RelinkResult {
13    /// Dependencies that were unlinked (from, to).
14    pub unlinked: Vec<(String, String)>,
15    /// Dependencies that were linked (from, to).
16    pub linked: Vec<(String, String)>,
17}
18
19/// Check if adding a dependency would create a cycle (transaction-safe version).
20fn would_create_cycle_in_tx(
21    tx: &rusqlite::Transaction,
22    from_task_id: &str,
23    to_task_id: &str,
24    dep_type: &str,
25    deps_config: &DependenciesConfig,
26) -> Result<bool> {
27    let def = deps_config.get_definition(dep_type).unwrap();
28
29    // A cycle would occur if to_task can already reach from_task
30    // through the same "graph" (horizontal or vertical)
31    let mut visited: HashSet<String> = HashSet::new();
32    let mut queue: VecDeque<String> = VecDeque::new();
33    queue.push_back(to_task_id.to_string());
34
35    while let Some(current) = queue.pop_front() {
36        if current == from_task_id {
37            return Ok(true); // Would create a cycle
38        }
39
40        if visited.contains(&current) {
41            continue;
42        }
43        visited.insert(current.clone());
44
45        // Get all tasks that current points to (in the relevant graph)
46        let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
47            // For vertical deps, only check containment relationships
48            let mut stmt = tx.prepare(
49                "SELECT to_task_id FROM dependencies d
50                 JOIN (SELECT value FROM json_each(?1)) types
51                 WHERE d.from_task_id = ?2 AND d.dep_type = types.value"
52            )?;
53            let vertical_types: Vec<&str> = deps_config.vertical_types();
54            let types_json = serde_json::to_string(&vertical_types)?;
55            stmt.query_map(params![&types_json, &current], |row| row.get(0))?
56                .filter_map(|r| r.ok())
57                .collect()
58        } else {
59            // For horizontal deps, check all start-blocking relationships
60            let mut stmt = tx.prepare(
61                "SELECT to_task_id FROM dependencies d
62                 JOIN (SELECT value FROM json_each(?1)) types
63                 WHERE d.from_task_id = ?2 AND d.dep_type = types.value"
64            )?;
65            let start_blocking: Vec<&str> = deps_config.start_blocking_types();
66            let types_json = serde_json::to_string(&start_blocking)?;
67            stmt.query_map(params![&types_json, &current], |row| row.get(0))?
68                .filter_map(|r| r.ok())
69                .collect()
70        };
71
72        for dep in deps {
73            if !visited.contains(&dep) {
74                queue.push_back(dep);
75            }
76        }
77    }
78
79    Ok(false)
80}
81
82/// Build an ORDER BY clause from sort_by and sort_order parameters.
83/// Returns a safe SQL ORDER BY expression.
84fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
85    let field = match sort_by {
86        Some("priority") => "CAST(t.priority AS INTEGER)",
87        Some("created_at") => "t.created_at",
88        Some("updated_at") => "t.updated_at",
89        _ => "t.created_at", // default
90    };
91
92    let order = match sort_order {
93        Some("asc") => "ASC",
94        Some("desc") => "DESC",
95        _ => {
96            // Default: priority is descending (higher number = more important), dates are descending
97            "DESC"
98        }
99    };
100
101    format!("{} {}", field, order)
102}
103
104impl Database {
105    /// Add a typed dependency (from blocks/contains to).
106    pub fn add_dependency(
107        &self,
108        from_task_id: &str,
109        to_task_id: &str,
110        dep_type: &str,
111        deps_config: &DependenciesConfig,
112    ) -> Result<()> {
113        // Validate dependency type
114        if !deps_config.is_valid_dep_type(dep_type) {
115            return Err(anyhow!(
116                "Invalid dependency type '{}'. Valid types: {:?}",
117                dep_type,
118                deps_config.dep_type_names()
119            ));
120        }
121
122        // For vertical (contains) dependencies, check single-parent constraint
123        let def = deps_config.get_definition(dep_type).unwrap();
124        if def.display == DependencyDisplay::Vertical {
125            if let Some(existing_parent) = self.get_parent(to_task_id)? {
126                if existing_parent != from_task_id {
127                    return Err(anyhow!(
128                        "Task {} already has parent {}",
129                        to_task_id,
130                        existing_parent
131                    ));
132                }
133            }
134        }
135
136        // Check for cycle in the appropriate graph
137        if self.would_create_cycle(from_task_id, to_task_id, dep_type, deps_config)? {
138            return Err(anyhow!("Adding this dependency would create a cycle"));
139        }
140
141        self.with_conn(|conn| {
142            conn.execute(
143                "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
144                params![from_task_id, to_task_id, dep_type],
145            )?;
146            Ok(())
147        })
148    }
149
150    /// Check if adding a dependency would create a cycle.
151    /// For horizontal deps: check cycle in the start-blocking graph.
152    /// For vertical deps: check containment cycle.
153    pub fn would_create_cycle(
154        &self,
155        from_task_id: &str,
156        to_task_id: &str,
157        dep_type: &str,
158        deps_config: &DependenciesConfig,
159    ) -> Result<bool> {
160        let def = deps_config.get_definition(dep_type).unwrap();
161
162        self.with_conn(|conn| {
163            // A cycle would occur if to_task can already reach from_task
164            // through the same "graph" (horizontal or vertical)
165            let mut visited: HashSet<String> = HashSet::new();
166            let mut queue: VecDeque<String> = VecDeque::new();
167            queue.push_back(to_task_id.to_string());
168
169            while let Some(current) = queue.pop_front() {
170                if current == from_task_id {
171                    return Ok(true); // Would create a cycle
172                }
173
174                if visited.contains(&current) {
175                    continue;
176                }
177                visited.insert(current.clone());
178
179                // Get all tasks that current points to (in the relevant graph)
180                let deps: Vec<String> = if def.display == DependencyDisplay::Vertical {
181                    // For vertical deps, only check containment relationships
182                    let mut stmt = conn.prepare(
183                        "SELECT to_task_id FROM dependencies d
184                         JOIN (SELECT value FROM json_each(?1)) types
185                         WHERE d.from_task_id = ?2 AND d.dep_type = types.value"
186                    )?;
187                    let vertical_types: Vec<&str> = deps_config.vertical_types();
188                    let types_json = serde_json::to_string(&vertical_types)?;
189                    stmt.query_map(params![&types_json, &current], |row| row.get(0))?
190                        .filter_map(|r| r.ok())
191                        .collect()
192                } else {
193                    // For horizontal deps, check all start-blocking relationships
194                    let mut stmt = conn.prepare(
195                        "SELECT to_task_id FROM dependencies d
196                         JOIN (SELECT value FROM json_each(?1)) types
197                         WHERE d.from_task_id = ?2 AND d.dep_type = types.value"
198                    )?;
199                    let start_blocking: Vec<&str> = deps_config.start_blocking_types();
200                    let types_json = serde_json::to_string(&start_blocking)?;
201                    stmt.query_map(params![&types_json, &current], |row| row.get(0))?
202                        .filter_map(|r| r.ok())
203                        .collect()
204                };
205
206                for dep in deps {
207                    if !visited.contains(&dep) {
208                        queue.push_back(dep);
209                    }
210                }
211            }
212
213            Ok(false)
214        })
215    }
216
217    /// Remove a typed dependency. Returns true if a row was deleted.
218    pub fn remove_dependency(
219        &self,
220        from_task_id: &str,
221        to_task_id: &str,
222        dep_type: &str,
223    ) -> Result<bool> {
224        self.with_conn(|conn| {
225            let rows = conn.execute(
226                "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
227                params![from_task_id, to_task_id, dep_type],
228            )?;
229            Ok(rows > 0)
230        })
231    }
232
233    /// Remove all dependencies of a given type from a task (outgoing edges).
234    /// Returns the list of removed dependencies.
235    pub fn remove_all_outgoing_dependencies(
236        &self,
237        from_task_id: &str,
238        dep_type: &str,
239    ) -> Result<Vec<Dependency>> {
240        self.with_conn_mut(|conn| {
241            let tx = conn.transaction()?;
242
243            // First get the dependencies that will be removed
244            let deps: Vec<Dependency> = {
245                let mut stmt = tx.prepare(
246                    "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
247                )?;
248                stmt
249                    .query_map(params![from_task_id, dep_type], |row| {
250                        Ok(Dependency {
251                            from_task_id: row.get(0)?,
252                            to_task_id: row.get(1)?,
253                            dep_type: row.get(2)?,
254                        })
255                    })?
256                    .filter_map(|r| r.ok())
257                    .collect()
258            };
259
260            // Then delete them
261            tx.execute(
262                "DELETE FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2",
263                params![from_task_id, dep_type],
264            )?;
265
266            tx.commit()?;
267            Ok(deps)
268        })
269    }
270
271    /// Remove all dependencies of a given type to a task (incoming edges).
272    /// Returns the list of removed dependencies.
273    pub fn remove_all_incoming_dependencies(
274        &self,
275        to_task_id: &str,
276        dep_type: &str,
277    ) -> Result<Vec<Dependency>> {
278        self.with_conn_mut(|conn| {
279            let tx = conn.transaction()?;
280
281            // First get the dependencies that will be removed
282            let deps: Vec<Dependency> = {
283                let mut stmt = tx.prepare(
284                    "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
285                )?;
286                stmt
287                    .query_map(params![to_task_id, dep_type], |row| {
288                        Ok(Dependency {
289                            from_task_id: row.get(0)?,
290                            to_task_id: row.get(1)?,
291                            dep_type: row.get(2)?,
292                        })
293                    })?
294                    .filter_map(|r| r.ok())
295                    .collect()
296            };
297
298            // Then delete them
299            tx.execute(
300                "DELETE FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2",
301                params![to_task_id, dep_type],
302            )?;
303
304            tx.commit()?;
305            Ok(deps)
306        })
307    }
308
309    /// Get all dependencies.
310    pub fn get_all_dependencies(&self) -> Result<Vec<Dependency>> {
311        self.with_conn(|conn| {
312            let mut stmt =
313                conn.prepare("SELECT from_task_id, to_task_id, dep_type FROM dependencies")?;
314
315            let deps = stmt
316                .query_map([], |row| {
317                    let from: String = row.get(0)?;
318                    let to: String = row.get(1)?;
319                    let dep_type: String = row.get(2)?;
320                    Ok(Dependency {
321                        from_task_id: from,
322                        to_task_id: to,
323                        dep_type,
324                    })
325                })?
326                .filter_map(|r| r.ok())
327                .collect();
328
329            Ok(deps)
330        })
331    }
332
333    /// Get dependencies of a specific type for a task.
334    pub fn get_dependencies_by_type(
335        &self,
336        task_id: &str,
337        dep_type: &str,
338        direction: &str,
339    ) -> Result<Vec<Dependency>> {
340        self.with_conn(|conn| {
341            let sql = if direction == "incoming" {
342                "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE to_task_id = ?1 AND dep_type = ?2"
343            } else {
344                "SELECT from_task_id, to_task_id, dep_type FROM dependencies WHERE from_task_id = ?1 AND dep_type = ?2"
345            };
346
347            let mut stmt = conn.prepare(sql)?;
348
349            let deps = stmt
350                .query_map(params![task_id, dep_type], |row| {
351                    let from: String = row.get(0)?;
352                    let to: String = row.get(1)?;
353                    let dep_type: String = row.get(2)?;
354                    Ok(Dependency {
355                        from_task_id: from,
356                        to_task_id: to,
357                        dep_type,
358                    })
359                })?
360                .filter_map(|r| r.ok())
361                .collect();
362
363            Ok(deps)
364        })
365    }
366
367    /// Get tasks that block a given task from starting (dep_type with blocks: start).
368    pub fn get_start_blockers(
369        &self,
370        task_id: &str,
371        deps_config: &DependenciesConfig,
372    ) -> Result<Vec<String>> {
373        let start_blocking_types = deps_config.start_blocking_types();
374        if start_blocking_types.is_empty() {
375            return Ok(vec![]);
376        }
377
378        self.with_conn(|conn| {
379            let placeholders: String = start_blocking_types
380                .iter()
381                .enumerate()
382                .map(|(i, _)| format!("?{}", i + 2))
383                .collect::<Vec<_>>()
384                .join(", ");
385
386            let sql = format!(
387                "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type IN ({})",
388                placeholders
389            );
390
391            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
392            params_vec.push(Box::new(task_id.to_string()));
393            for t in &start_blocking_types {
394                params_vec.push(Box::new(t.to_string()));
395            }
396            let params_refs: Vec<&dyn rusqlite::ToSql> =
397                params_vec.iter().map(|b| b.as_ref()).collect();
398
399            let mut stmt = conn.prepare(&sql)?;
400            let blockers = stmt
401                .query_map(params_refs.as_slice(), |row| {
402                    let id: String = row.get(0)?;
403                    Ok(id)
404                })?
405                .filter_map(|r| r.ok())
406                .collect();
407
408            Ok(blockers)
409        })
410    }
411
412    /// Get tasks that block a given task from completing (dep_type with blocks: completion).
413    /// For a parent task, this returns children that must complete first.
414    pub fn get_completion_blockers(
415        &self,
416        task_id: &str,
417        deps_config: &DependenciesConfig,
418    ) -> Result<Vec<String>> {
419        let completion_blocking_types = deps_config.completion_blocking_types();
420        if completion_blocking_types.is_empty() {
421            return Ok(vec![]);
422        }
423
424        self.with_conn(|conn| {
425            let placeholders: String = completion_blocking_types
426                .iter()
427                .enumerate()
428                .map(|(i, _)| format!("?{}", i + 2))
429                .collect::<Vec<_>>()
430                .join(", ");
431
432            // For completion blockers, we look at outgoing edges (from_task_id = parent)
433            // because "contains" means parent -> child, and child blocks parent completion
434            let sql = format!(
435                "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
436                placeholders
437            );
438
439            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
440            params_vec.push(Box::new(task_id.to_string()));
441            for t in &completion_blocking_types {
442                params_vec.push(Box::new(t.to_string()));
443            }
444            let params_refs: Vec<&dyn rusqlite::ToSql> =
445                params_vec.iter().map(|b| b.as_ref()).collect();
446
447            let mut stmt = conn.prepare(&sql)?;
448            let blockers = stmt
449                .query_map(params_refs.as_slice(), |row| {
450                    let id: String = row.get(0)?;
451                    Ok(id)
452                })?
453                .filter_map(|r| r.ok())
454                .collect();
455
456            Ok(blockers)
457        })
458    }
459
460    /// Get the parent of a task (via 'contains' dependency).
461    pub fn get_parent(&self, task_id: &str) -> Result<Option<String>> {
462        self.with_conn(|conn| {
463            let result: Result<String, rusqlite::Error> = conn.query_row(
464                "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
465                params![task_id],
466                |row| row.get(0),
467            );
468
469            match result {
470                Ok(parent_id) => Ok(Some(parent_id)),
471                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
472                Err(e) => Err(e.into()),
473            }
474        })
475    }
476
477    /// Get children of a task (via 'contains' dependency).
478    pub fn get_children_ids(&self, task_id: &str) -> Result<Vec<String>> {
479        self.with_conn(|conn| {
480            let mut stmt = conn.prepare(
481                "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
482            )?;
483
484            let children = stmt
485                .query_map(params![task_id], |row| {
486                    let id: String = row.get(0)?;
487                    Ok(id)
488                })?
489                .filter_map(|r| r.ok())
490                .collect();
491
492            Ok(children)
493        })
494    }
495
496    /// Get all tasks that block a given task (backwards compatible).
497    /// Returns tasks from both 'blocks' and 'follows' dependencies.
498    pub fn get_blockers(&self, task_id: &str) -> Result<Vec<String>> {
499        self.with_conn(|conn| {
500            let mut stmt = conn.prepare(
501                "SELECT from_task_id FROM dependencies 
502                 WHERE to_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
503            )?;
504
505            let blockers = stmt
506                .query_map(params![task_id], |row| {
507                    let id: String = row.get(0)?;
508                    Ok(id)
509                })?
510                .filter_map(|r| r.ok())
511                .collect();
512
513            Ok(blockers)
514        })
515    }
516
517    /// Get tasks that a given task blocks.
518    #[allow(dead_code)]
519    pub fn get_blocking(&self, task_id: &str) -> Result<Vec<String>> {
520        self.with_conn(|conn| {
521            let mut stmt = conn.prepare(
522                "SELECT to_task_id FROM dependencies 
523                 WHERE from_task_id = ?1 AND dep_type IN ('blocks', 'follows')",
524            )?;
525
526            let blocking = stmt
527                .query_map(params![task_id], |row| {
528                    let id: String = row.get(0)?;
529                    Ok(id)
530                })?
531                .filter_map(|r| r.ok())
532                .collect();
533
534            Ok(blocking)
535        })
536    }
537
538    /// Get tasks that are blocked by incomplete start dependencies.
539    /// A task is blocked if any of its start-blocking dependencies are in a blocking state.
540    /// Excludes soft-deleted tasks.
541    pub fn get_blocked_tasks(
542        &self,
543        states_config: &StatesConfig,
544        deps_config: &DependenciesConfig,
545        sort_by: Option<&str>,
546        sort_order: Option<&str>,
547    ) -> Result<Vec<Task>> {
548        let start_blocking_types = deps_config.start_blocking_types();
549        if start_blocking_types.is_empty() {
550            return Ok(vec![]);
551        }
552
553        self.with_conn(|conn| {
554            // Build IN clause from blocking_states
555            let state_placeholders: Vec<String> = states_config
556                .blocking_states
557                .iter()
558                .enumerate()
559                .map(|(i, _)| format!("?{}", i + 2))
560                .collect();
561            let state_clause = state_placeholders.join(", ");
562
563            // Build IN clause from start_blocking_types
564            let type_start = states_config.blocking_states.len() + 2;
565            let type_placeholders: Vec<String> = start_blocking_types
566                .iter()
567                .enumerate()
568                .map(|(i, _)| format!("?{}", type_start + i))
569                .collect();
570            let type_clause = type_placeholders.join(", ");
571
572            // Build ORDER BY clause
573            let order_clause = build_order_clause(sort_by, sort_order);
574
575            let sql = format!(
576                "SELECT DISTINCT t.*
577                 FROM tasks t
578                 INNER JOIN dependencies d ON t.id = d.to_task_id
579                 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
580                 WHERE d.dep_type IN ({})
581                 AND blocker.status IN ({})
582                 AND t.status = ?1
583                 AND t.deleted_at IS NULL
584                 ORDER BY {}",
585                type_clause, state_clause, order_clause
586            );
587
588            let mut stmt = conn.prepare(&sql)?;
589
590            // Build params: initial state + blocking states + start_blocking_types
591            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
592            params_vec.push(Box::new(states_config.initial.clone()));
593            for state in &states_config.blocking_states {
594                params_vec.push(Box::new(state.clone()));
595            }
596            for t in &start_blocking_types {
597                params_vec.push(Box::new(t.to_string()));
598            }
599            let params_refs: Vec<&dyn rusqlite::ToSql> =
600                params_vec.iter().map(|b| b.as_ref()).collect();
601
602            let tasks = stmt
603                .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
604                .filter_map(|r| r.ok())
605                .collect();
606
607            Ok(tasks)
608        })
609    }
610
611    /// Get tasks that are ready to be claimed (all start dependencies satisfied).
612    /// A task is ready if it's in the initial state, unclaimed, and all start-blocking deps are not blocking.
613    /// When agent_id is provided, also filters by agent's tag qualifications using junction tables.
614    /// Excludes soft-deleted tasks.
615    pub fn get_ready_tasks(
616        &self,
617        agent_id: Option<&str>,
618        states_config: &StatesConfig,
619        deps_config: &DependenciesConfig,
620        sort_by: Option<&str>,
621        sort_order: Option<&str>,
622    ) -> Result<Vec<Task>> {
623        let start_blocking_types = deps_config.start_blocking_types();
624
625        // Get agent tags if agent_id is provided (for junction table filtering)
626        let agent_tags: Option<Vec<String>> = if let Some(aid) = agent_id {
627            Some(self.get_agent_tags(aid)?)
628        } else {
629            None
630        };
631
632        self.with_conn(|conn| {
633            // Build IN clause from blocking_states
634            let state_placeholders: Vec<String> = states_config
635                .blocking_states
636                .iter()
637                .enumerate()
638                .map(|(i, _)| format!("?{}", i + 2))
639                .collect();
640            let state_clause = state_placeholders.join(", ");
641
642            // Build IN clause from start_blocking_types
643            let type_start = states_config.blocking_states.len() + 2;
644            let type_placeholders: Vec<String> = start_blocking_types
645                .iter()
646                .enumerate()
647                .map(|(i, _)| format!("?{}", type_start + i))
648                .collect();
649            let type_clause = type_placeholders.join(", ");
650
651            // Build ORDER BY clause - for ready tasks, default is priority then created_at
652            let order_clause = if sort_by.is_some() {
653                build_order_clause(sort_by, sort_order)
654            } else {
655                // Default for ready: priority (high first), then created_at
656                "CAST(t.priority AS INTEGER) DESC, t.created_at DESC".to_string()
657            };
658
659            // Track param index for agent tag filters
660            let mut param_idx = type_start + start_blocking_types.len();
661
662            // Build agent qualification filters using junction tables
663            let (agent_needed_clause, agent_wanted_clause) = if let Some(ref tags) = agent_tags {
664                // For agent_tags_all (AND): agent must have ALL needed tags
665                // Count how many of the task's needed_tags match agent's tags
666                // Either the task has no needed_tags, or all must match
667                let needed_placeholders: Vec<String> = tags
668                    .iter()
669                    .enumerate()
670                    .map(|(i, _)| format!("?{}", param_idx + i))
671                    .collect();
672                param_idx += tags.len();
673
674                let needed_clause = if needed_placeholders.is_empty() {
675                    // Agent has no tags - only match tasks with no needed_tags
676                    "AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)".to_string()
677                } else {
678                    // Task must have no needed_tags OR agent must have all of them
679                    format!(
680                        "AND (
681                            NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
682                            OR (
683                                SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
684                            ) = (
685                                SELECT COUNT(*) FROM task_needed_tags 
686                                WHERE task_id = t.id AND tag IN ({})
687                            )
688                        )",
689                        needed_placeholders.join(", ")
690                    )
691                };
692
693                // For agent_tags_any (OR): agent must have at least ONE wanted tag
694                let wanted_placeholders: Vec<String> = tags
695                    .iter()
696                    .enumerate()
697                    .map(|(i, _)| format!("?{}", param_idx + i))
698                    .collect();
699
700                let wanted_clause = if wanted_placeholders.is_empty() {
701                    // Agent has no tags - only match tasks with no wanted_tags
702                    "AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)".to_string()
703                } else {
704                    // Task must have no wanted_tags OR agent must have at least one
705                    format!(
706                        "AND (
707                            NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
708                            OR EXISTS (
709                                SELECT 1 FROM task_wanted_tags 
710                                WHERE task_id = t.id AND tag IN ({})
711                            )
712                        )",
713                        wanted_placeholders.join(", ")
714                    )
715                };
716
717                (needed_clause, wanted_clause)
718            } else {
719                (String::new(), String::new())
720            };
721
722            let sql = format!(
723                "SELECT t.*
724                 FROM tasks t
725                 WHERE t.status = ?1
726                 AND t.worker_id IS NULL
727                 AND t.deleted_at IS NULL
728                 AND NOT EXISTS (
729                     SELECT 1 FROM dependencies d
730                     INNER JOIN tasks blocker ON d.from_task_id = blocker.id
731                     WHERE d.to_task_id = t.id 
732                     AND d.dep_type IN ({})
733                     AND blocker.status IN ({})
734                 )
735                 {}
736                 {}
737                 ORDER BY {}",
738                type_clause, state_clause, agent_needed_clause, agent_wanted_clause, order_clause
739            );
740
741            let mut stmt = conn.prepare(&sql)?;
742
743            // Build params: initial state + blocking states + types + agent tags (twice if present)
744            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
745            params_vec.push(Box::new(states_config.initial.clone()));
746            for state in &states_config.blocking_states {
747                params_vec.push(Box::new(state.clone()));
748            }
749            for t in &start_blocking_types {
750                params_vec.push(Box::new(t.to_string()));
751            }
752            // Add agent tags twice (once for needed_tags check, once for wanted_tags check)
753            if let Some(ref tags) = agent_tags {
754                for tag in tags {
755                    params_vec.push(Box::new(tag.clone()));
756                }
757                for tag in tags {
758                    params_vec.push(Box::new(tag.clone()));
759                }
760            }
761            let params_refs: Vec<&dyn rusqlite::ToSql> =
762                params_vec.iter().map(|b| b.as_ref()).collect();
763
764            let tasks: Vec<Task> = stmt
765                .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
766                .filter_map(|r| r.ok())
767                .collect();
768
769            Ok(tasks)
770        })
771    }
772
773    /// Check if a task has unmet start dependencies.
774    #[allow(dead_code)]
775    pub fn has_unmet_start_dependencies(
776        &self,
777        task_id: &str,
778        states_config: &StatesConfig,
779        deps_config: &DependenciesConfig,
780    ) -> Result<bool> {
781        let start_blocking_types = deps_config.start_blocking_types();
782        if start_blocking_types.is_empty() {
783            return Ok(false);
784        }
785
786        self.with_conn(|conn| {
787            // Build IN clause from blocking_states
788            let state_placeholders: Vec<String> = states_config
789                .blocking_states
790                .iter()
791                .enumerate()
792                .map(|(i, _)| format!("?{}", i + 2))
793                .collect();
794            let state_clause = state_placeholders.join(", ");
795
796            // Build IN clause from types
797            let type_start = states_config.blocking_states.len() + 2;
798            let type_placeholders: Vec<String> = start_blocking_types
799                .iter()
800                .enumerate()
801                .map(|(i, _)| format!("?{}", type_start + i))
802                .collect();
803            let type_clause = type_placeholders.join(", ");
804
805            let sql = format!(
806                "SELECT COUNT(*) FROM dependencies d
807                 INNER JOIN tasks blocker ON d.from_task_id = blocker.id
808                 WHERE d.to_task_id = ?1 
809                 AND d.dep_type IN ({})
810                 AND blocker.status IN ({})",
811                type_clause, state_clause
812            );
813
814            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
815            params_vec.push(Box::new(task_id.to_string()));
816            for state in &states_config.blocking_states {
817                params_vec.push(Box::new(state.clone()));
818            }
819            for t in &start_blocking_types {
820                params_vec.push(Box::new(t.to_string()));
821            }
822            let params_refs: Vec<&dyn rusqlite::ToSql> =
823                params_vec.iter().map(|b| b.as_ref()).collect();
824
825            let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
826
827            Ok(count > 0)
828        })
829    }
830
831    /// Check if a task has incomplete children (blocking completion).
832    pub fn has_incomplete_children(
833        &self,
834        task_id: &str,
835        states_config: &StatesConfig,
836    ) -> Result<bool> {
837        self.with_conn(|conn| {
838            // Build IN clause from blocking_states
839            let state_placeholders: Vec<String> = states_config
840                .blocking_states
841                .iter()
842                .enumerate()
843                .map(|(i, _)| format!("?{}", i + 2))
844                .collect();
845            let state_clause = state_placeholders.join(", ");
846
847            let sql = format!(
848                "SELECT COUNT(*) FROM dependencies d
849                 INNER JOIN tasks child ON d.to_task_id = child.id
850                 WHERE d.from_task_id = ?1 
851                 AND d.dep_type = 'contains'
852                 AND child.status IN ({})",
853                state_clause
854            );
855
856            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
857            params_vec.push(Box::new(task_id.to_string()));
858            for state in &states_config.blocking_states {
859                params_vec.push(Box::new(state.clone()));
860            }
861            let params_refs: Vec<&dyn rusqlite::ToSql> =
862                params_vec.iter().map(|b| b.as_ref()).collect();
863
864            let count: i32 = conn.query_row(&sql, params_refs.as_slice(), |row| row.get(0))?;
865
866            Ok(count > 0)
867        })
868    }
869
870    /// Get tasks with tag-based filtering using junction tables for indexed lookups.
871    /// - `tags_any`: Task must have at least one of these tags (OR)
872    /// - `tags_all`: Task must have all of these tags (AND)
873    /// - `qualified_for_agent_tags`: If provided, only return tasks where these tags satisfy the task's agent_tags_all/agent_tags_any
874    /// Excludes soft-deleted tasks.
875    pub fn list_tasks_with_tag_filters(
876        &self,
877        status: Option<Vec<String>>,
878        owner: Option<&str>,
879        parent_id: Option<Option<&str>>,
880        tags_any: Option<Vec<String>>,
881        tags_all: Option<Vec<String>>,
882        qualified_for_agent_tags: Option<Vec<String>>,
883        limit: Option<i32>,
884        sort_by: Option<&str>,
885        sort_order: Option<&str>,
886    ) -> Result<Vec<Task>> {
887        self.with_conn(|conn| {
888            let mut sql = String::from("SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL");
889            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
890            let mut param_idx = 1;
891
892            // Status filter (can be single or multiple)
893            if let Some(ref statuses) = status {
894                if statuses.len() == 1 {
895                    sql.push_str(&format!(" AND t.status = ?{}", param_idx));
896                    params_vec.push(Box::new(statuses[0].clone()));
897                    param_idx += 1;
898                } else if statuses.len() > 1 {
899                    let placeholders: Vec<String> = statuses
900                        .iter()
901                        .enumerate()
902                        .map(|(i, _)| format!("?{}", param_idx + i))
903                        .collect();
904                    sql.push_str(&format!(" AND t.status IN ({})", placeholders.join(", ")));
905                    for s in statuses {
906                        params_vec.push(Box::new(s.clone()));
907                    }
908                    param_idx += statuses.len();
909                }
910            }
911
912            // Owner filter
913            if let Some(o) = owner {
914                sql.push_str(&format!(" AND t.worker_id = ?{}", param_idx));
915                params_vec.push(Box::new(o.to_string()));
916                param_idx += 1;
917            }
918
919            // Parent filter via dependencies table
920            if let Some(p) = parent_id {
921                match p {
922                    Some(pid) => {
923                        sql.push_str(&format!(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ?{} AND dep_type = 'contains')", param_idx));
924                        params_vec.push(Box::new(pid.to_string()));
925                        param_idx += 1;
926                    }
927                    None => {
928                        // Root tasks: not contained by any other task
929                        sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
930                    }
931                }
932            }
933
934            // tags_any: Task must have at least one of these tags (OR) - uses task_tags junction
935            if let Some(ref any_tags) = tags_any {
936                if !any_tags.is_empty() {
937                    let placeholders: Vec<String> = any_tags
938                        .iter()
939                        .enumerate()
940                        .map(|(i, _)| format!("?{}", param_idx + i))
941                        .collect();
942                    sql.push_str(&format!(
943                        " AND EXISTS (SELECT 1 FROM task_tags WHERE task_id = t.id AND tag IN ({}))",
944                        placeholders.join(", ")
945                    ));
946                    for tag in any_tags {
947                        params_vec.push(Box::new(tag.clone()));
948                    }
949                    param_idx += any_tags.len();
950                }
951            }
952
953            // tags_all: Task must have all of these tags (AND) - uses task_tags junction
954            if let Some(ref all_tags) = tags_all {
955                if !all_tags.is_empty() {
956                    let placeholders: Vec<String> = all_tags
957                        .iter()
958                        .enumerate()
959                        .map(|(i, _)| format!("?{}", param_idx + i))
960                        .collect();
961                    // Count matching tags must equal total required tags
962                    sql.push_str(&format!(
963                        " AND (SELECT COUNT(*) FROM task_tags WHERE task_id = t.id AND tag IN ({})) = {}",
964                        placeholders.join(", "),
965                        all_tags.len()
966                    ));
967                    for tag in all_tags {
968                        params_vec.push(Box::new(tag.clone()));
969                    }
970                    param_idx += all_tags.len();
971                }
972            }
973
974            // qualified_for: Agent's tags must satisfy task's requirements - uses junction tables
975            if let Some(ref agent_tags) = qualified_for_agent_tags {
976                // Agent must have ALL of task's agent_tags_all
977                if agent_tags.is_empty() {
978                    // Agent has no tags - only match tasks with no needed_tags
979                    sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)");
980                    // And no wanted_tags
981                    sql.push_str(" AND NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)");
982                } else {
983                    // For needed_tags (AND): task must have no needed_tags OR agent has all
984                    let needed_placeholders: Vec<String> = agent_tags
985                        .iter()
986                        .enumerate()
987                        .map(|(i, _)| format!("?{}", param_idx + i))
988                        .collect();
989                    sql.push_str(&format!(
990                        " AND (
991                            NOT EXISTS (SELECT 1 FROM task_needed_tags WHERE task_id = t.id)
992                            OR (
993                                SELECT COUNT(*) FROM task_needed_tags WHERE task_id = t.id
994                            ) = (
995                                SELECT COUNT(*) FROM task_needed_tags 
996                                WHERE task_id = t.id AND tag IN ({})
997                            )
998                        )",
999                        needed_placeholders.join(", ")
1000                    ));
1001                    for tag in agent_tags {
1002                        params_vec.push(Box::new(tag.clone()));
1003                    }
1004                    param_idx += agent_tags.len();
1005
1006                    // For wanted_tags (OR): task must have no wanted_tags OR agent has at least one
1007                    let wanted_placeholders: Vec<String> = agent_tags
1008                        .iter()
1009                        .enumerate()
1010                        .map(|(i, _)| format!("?{}", param_idx + i))
1011                        .collect();
1012                    sql.push_str(&format!(
1013                        " AND (
1014                            NOT EXISTS (SELECT 1 FROM task_wanted_tags WHERE task_id = t.id)
1015                            OR EXISTS (
1016                                SELECT 1 FROM task_wanted_tags 
1017                                WHERE task_id = t.id AND tag IN ({})
1018                            )
1019                        )",
1020                        wanted_placeholders.join(", ")
1021                    ));
1022                    for tag in agent_tags {
1023                        params_vec.push(Box::new(tag.clone()));
1024                    }
1025                    // param_idx += agent_tags.len(); // not needed, last param set
1026                }
1027            }
1028
1029            // Build ORDER BY clause
1030            let order_clause = build_order_clause(sort_by, sort_order);
1031            sql.push_str(&format!(" ORDER BY {}", order_clause));
1032
1033            // Apply limit in SQL
1034            if let Some(l) = limit {
1035                sql.push_str(&format!(" LIMIT {}", l));
1036            }
1037
1038            let params_refs: Vec<&dyn rusqlite::ToSql> =
1039                params_vec.iter().map(|b| b.as_ref()).collect();
1040
1041            let mut stmt = conn.prepare(&sql)?;
1042            let tasks: Vec<Task> = stmt
1043                .query_map(params_refs.as_slice(), super::tasks::parse_task_row)?
1044                .filter_map(|r| r.ok())
1045                .collect();
1046
1047            Ok(tasks)
1048        })
1049    }
1050
1051    /// Get agent tags by agent ID.
1052    pub fn get_agent_tags(&self, agent_id: &str) -> Result<Vec<String>> {
1053        self.with_conn(|conn| {
1054            let result: Result<String, rusqlite::Error> = conn.query_row(
1055                "SELECT tags FROM workers WHERE id = ?1",
1056                params![agent_id],
1057                |row| row.get(0),
1058            );
1059
1060            match result {
1061                Ok(tags_json) => {
1062                    let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
1063                    Ok(tags)
1064                }
1065                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(vec![]),
1066                Err(e) => Err(e.into()),
1067            }
1068        })
1069    }
1070
1071    /// Internal helper: add dependency within a transaction (used by tasks.rs).
1072    pub(super) fn add_dependency_internal(
1073        conn: &Connection,
1074        from_task_id: &str,
1075        to_task_id: &str,
1076        dep_type: &str,
1077    ) -> Result<()> {
1078        conn.execute(
1079            "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1080            params![from_task_id, to_task_id, dep_type],
1081        )?;
1082        Ok(())
1083    }
1084
1085
1086    /// Atomically relink dependencies: unlink all prev_from→prev_to, then link all from→to.
1087    /// This is a transaction-safe operation for moving children between parents.
1088    /// Returns a result with unlinked and linked pairs.
1089    pub fn relink(
1090        &self,
1091        prev_from_ids: &[String],
1092        prev_to_ids: &[String],
1093        from_ids: &[String],
1094        to_ids: &[String],
1095        dep_type: &str,
1096        deps_config: &DependenciesConfig,
1097    ) -> Result<RelinkResult> {
1098        // Validate dependency type upfront
1099        if !deps_config.is_valid_dep_type(dep_type) {
1100            return Err(anyhow!(
1101                "Invalid dependency type '{}'. Valid types: {:?}",
1102                dep_type,
1103                deps_config.dep_type_names()
1104            ));
1105        }
1106
1107        let def = deps_config.get_definition(dep_type).unwrap();
1108        let is_vertical = def.display == DependencyDisplay::Vertical;
1109
1110        self.with_conn_mut(|conn| {
1111            let tx = conn.transaction()?;
1112
1113            let mut unlinked = Vec::new();
1114            let mut linked = Vec::new();
1115            let mut errors = Vec::new();
1116
1117            // Phase 1: Unlink all prev_from × prev_to
1118            for prev_from in prev_from_ids {
1119                for prev_to in prev_to_ids {
1120                    let rows = tx.execute(
1121                        "DELETE FROM dependencies WHERE from_task_id = ?1 AND to_task_id = ?2 AND dep_type = ?3",
1122                        params![prev_from, prev_to, dep_type],
1123                    )?;
1124                    if rows > 0 {
1125                        unlinked.push((prev_from.clone(), prev_to.clone()));
1126                    }
1127                }
1128            }
1129
1130            // Phase 2: Link all from × to (with validation)
1131            for from_id in from_ids {
1132                for to_id in to_ids {
1133                    // For vertical deps, check single-parent constraint
1134                    if is_vertical {
1135                        let existing_parent: Option<String> = tx.query_row(
1136                            "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1137                            params![to_id],
1138                            |row| row.get(0),
1139                        ).optional()?;
1140
1141                        if let Some(ref parent) = existing_parent {
1142                            if parent != from_id {
1143                                errors.push(format!(
1144                                    "Task {} already has parent {}",
1145                                    to_id, parent
1146                                ));
1147                                continue;
1148                            }
1149                        }
1150                    }
1151
1152                    // Check for cycles using temporary view within transaction
1153                    if would_create_cycle_in_tx(&tx, from_id, to_id, dep_type, deps_config)? {
1154                        errors.push(format!(
1155                            "Adding dependency {}→{} would create a cycle",
1156                            from_id, to_id
1157                        ));
1158                        continue;
1159                    }
1160
1161                    tx.execute(
1162                        "INSERT OR IGNORE INTO dependencies (from_task_id, to_task_id, dep_type) VALUES (?1, ?2, ?3)",
1163                        params![from_id, to_id, dep_type],
1164                    )?;
1165                    linked.push((from_id.clone(), to_id.clone()));
1166                }
1167            }
1168
1169            if !errors.is_empty() {
1170                // Rollback on validation errors
1171                tx.rollback()?;
1172                return Err(anyhow!("Relink failed: {}", errors.join("; ")));
1173            }
1174
1175            tx.commit()?;
1176            Ok(RelinkResult { unlinked, linked })
1177        })
1178    }
1179
1180    // ============================================================================
1181    // Graph Traversal Methods for scan tool
1182    // ============================================================================
1183
1184    /// Get predecessors (tasks that block this task) via blocks/follows dependencies.
1185    /// depth: 0 = none, N = N levels, -1 = all
1186    pub fn get_predecessors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1187        if depth == 0 {
1188            return Ok(vec![]);
1189        }
1190
1191        self.with_conn(|conn| {
1192            let mut visited: HashSet<String> = HashSet::new();
1193            let mut result: Vec<Task> = Vec::new();
1194            let mut current_level: Vec<String> = vec![task_id.to_string()];
1195            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1196
1197            while !current_level.is_empty() && levels_remaining > 0 {
1198                let mut next_level: Vec<String> = Vec::new();
1199
1200                for tid in &current_level {
1201                    // Get tasks that block this one (from_task_id blocks to_task_id)
1202                    let mut stmt = conn.prepare(
1203                        "SELECT DISTINCT d.from_task_id FROM dependencies d
1204                         WHERE d.to_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')"
1205                    )?;
1206
1207                    let predecessors: Vec<String> = stmt
1208                        .query_map(params![tid], |row| row.get(0))?
1209                        .filter_map(|r| r.ok())
1210                        .collect();
1211
1212                    for pred_id in predecessors {
1213                        if !visited.contains(&pred_id) {
1214                            visited.insert(pred_id.clone());
1215                            if let Some(task) = get_task_by_id_internal(conn, &pred_id)? {
1216                                result.push(task);
1217                            }
1218                            next_level.push(pred_id);
1219                        }
1220                    }
1221                }
1222
1223                current_level = next_level;
1224                levels_remaining -= 1;
1225            }
1226
1227            Ok(result)
1228        })
1229    }
1230
1231    /// Get successors (tasks that this task blocks) via blocks/follows dependencies.
1232    /// depth: 0 = none, N = N levels, -1 = all
1233    pub fn get_successors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1234        if depth == 0 {
1235            return Ok(vec![]);
1236        }
1237
1238        self.with_conn(|conn| {
1239            let mut visited: HashSet<String> = HashSet::new();
1240            let mut result: Vec<Task> = Vec::new();
1241            let mut current_level: Vec<String> = vec![task_id.to_string()];
1242            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1243
1244            while !current_level.is_empty() && levels_remaining > 0 {
1245                let mut next_level: Vec<String> = Vec::new();
1246
1247                for tid in &current_level {
1248                    // Get tasks that this one blocks (from_task_id blocks to_task_id)
1249                    let mut stmt = conn.prepare(
1250                        "SELECT DISTINCT d.to_task_id FROM dependencies d
1251                         WHERE d.from_task_id = ?1 AND d.dep_type IN ('blocks', 'follows')"
1252                    )?;
1253
1254                    let successors: Vec<String> = stmt
1255                        .query_map(params![tid], |row| row.get(0))?
1256                        .filter_map(|r| r.ok())
1257                        .collect();
1258
1259                    for succ_id in successors {
1260                        if !visited.contains(&succ_id) {
1261                            visited.insert(succ_id.clone());
1262                            if let Some(task) = get_task_by_id_internal(conn, &succ_id)? {
1263                                result.push(task);
1264                            }
1265                            next_level.push(succ_id);
1266                        }
1267                    }
1268                }
1269
1270                current_level = next_level;
1271                levels_remaining -= 1;
1272            }
1273
1274            Ok(result)
1275        })
1276    }
1277
1278    /// Get ancestors (parent chain) via contains dependency.
1279    /// depth: 0 = none, N = N levels up, -1 = all
1280    pub fn get_ancestors(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1281        if depth == 0 {
1282            return Ok(vec![]);
1283        }
1284
1285        self.with_conn(|conn| {
1286            let mut result: Vec<Task> = Vec::new();
1287            let mut current_id = task_id.to_string();
1288            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1289
1290            while levels_remaining > 0 {
1291                // Get parent (from_task_id contains to_task_id)
1292                let parent_result: Result<String, rusqlite::Error> = conn.query_row(
1293                    "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1294                    params![&current_id],
1295                    |row| row.get(0),
1296                );
1297
1298                match parent_result {
1299                    Ok(parent_id) => {
1300                        if let Some(task) = get_task_by_id_internal(conn, &parent_id)? {
1301                            result.push(task);
1302                        }
1303                        current_id = parent_id;
1304                        levels_remaining -= 1;
1305                    }
1306                    Err(rusqlite::Error::QueryReturnedNoRows) => break,
1307                    Err(e) => return Err(e.into()),
1308                }
1309            }
1310
1311            Ok(result)
1312        })
1313    }
1314
1315    /// Get descendants (children tree) via contains dependency.
1316    /// depth: 0 = none, N = N levels down, -1 = all
1317    pub fn get_descendants(&self, task_id: &str, depth: i32) -> Result<Vec<Task>> {
1318        if depth == 0 {
1319            return Ok(vec![]);
1320        }
1321
1322        self.with_conn(|conn| {
1323            let mut visited: HashSet<String> = HashSet::new();
1324            let mut result: Vec<Task> = Vec::new();
1325            let mut current_level: Vec<String> = vec![task_id.to_string()];
1326            let mut levels_remaining = if depth < 0 { i32::MAX } else { depth };
1327
1328            while !current_level.is_empty() && levels_remaining > 0 {
1329                let mut next_level: Vec<String> = Vec::new();
1330
1331                for tid in &current_level {
1332                    // Get children (from_task_id contains to_task_id)
1333                    let mut stmt = conn.prepare(
1334                        "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'"
1335                    )?;
1336
1337                    let children: Vec<String> = stmt
1338                        .query_map(params![tid], |row| row.get(0))?
1339                        .filter_map(|r| r.ok())
1340                        .collect();
1341
1342                    for child_id in children {
1343                        if !visited.contains(&child_id) {
1344                            visited.insert(child_id.clone());
1345                            if let Some(task) = get_task_by_id_internal(conn, &child_id)? {
1346                                result.push(task);
1347                            }
1348                            next_level.push(child_id);
1349                        }
1350                    }
1351                }
1352
1353                current_level = next_level;
1354                levels_remaining -= 1;
1355            }
1356
1357            Ok(result)
1358        })
1359    }
1360
1361}
1362
1363/// Helper to get a task by ID within a connection context.
1364fn get_task_by_id_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
1365    let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1366    let task = stmt
1367        .query_row(params![task_id], super::tasks::parse_task_row)
1368        .optional()?;
1369    Ok(task)
1370}
1371
1372
1373/// Get the IDs of tasks that block a given task from starting (unsatisfied dependencies).
1374/// A task blocks starting if it has a start-blocking dependency type and is in a blocking state.
1375/// This is a transaction-safe version for use within existing transactions.
1376pub(crate) fn get_unsatisfied_start_blockers_in_tx(
1377    conn: &Connection,
1378    task_id: &str,
1379    states_config: &StatesConfig,
1380    deps_config: &DependenciesConfig,
1381) -> Result<Vec<String>> {
1382    let start_blocking_types = deps_config.start_blocking_types();
1383    if start_blocking_types.is_empty() {
1384        return Ok(vec![]);
1385    }
1386
1387    // Build IN clause from blocking_states
1388    let state_placeholders: Vec<String> = states_config
1389        .blocking_states
1390        .iter()
1391        .enumerate()
1392        .map(|(i, _)| format!("?{}", i + 2))
1393        .collect();
1394    let state_clause = state_placeholders.join(", ");
1395
1396    // Build IN clause from types
1397    let type_start = states_config.blocking_states.len() + 2;
1398    let type_placeholders: Vec<String> = start_blocking_types
1399        .iter()
1400        .enumerate()
1401        .map(|(i, _)| format!("?{}", type_start + i))
1402        .collect();
1403    let type_clause = type_placeholders.join(", ");
1404
1405    let sql = format!(
1406        "SELECT blocker.id FROM dependencies d
1407         INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1408         WHERE d.to_task_id = ?1 
1409         AND d.dep_type IN ({})
1410         AND blocker.status IN ({})",
1411        type_clause, state_clause
1412    );
1413
1414    let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1415    params_vec.push(Box::new(task_id.to_string()));
1416    for state in &states_config.blocking_states {
1417        params_vec.push(Box::new(state.clone()));
1418    }
1419    for t in &start_blocking_types {
1420        params_vec.push(Box::new(t.to_string()));
1421    }
1422    let params_refs: Vec<&dyn rusqlite::ToSql> =
1423        params_vec.iter().map(|b| b.as_ref()).collect();
1424
1425    let mut stmt = conn.prepare(&sql)?;
1426    let blockers = stmt
1427        .query_map(params_refs.as_slice(), |row| {
1428            let id: String = row.get(0)?;
1429            Ok(id)
1430        })?
1431        .filter_map(|r| r.ok())
1432        .collect();
1433
1434    Ok(blockers)
1435}
1436
1437/// Propagate unblock effects when a task transitions out of a blocking state.
1438/// This is called after a task completes to find newly unblocked tasks and
1439/// optionally auto-advance them.
1440///
1441/// Returns (unblocked, auto_advanced):
1442/// - unblocked: All task IDs that are now ready (all dependencies satisfied)
1443/// - auto_advanced: Subset of unblocked that were actually transitioned (when enabled)
1444///
1445/// Algorithm:
1446/// 1. Find all tasks that have a start-blocking dependency on the completed task
1447/// 2. For each candidate:
1448///    - Skip if not in initial state or already claimed
1449///    - Check if ALL other start-blockers are also satisfied
1450///    - If fully unblocked → add to unblocked list
1451///    - If auto_advance enabled → also transition to target_state
1452/// 3. Return both lists
1453pub(crate) fn propagate_unblock_effects(
1454    conn: &Connection,
1455    completed_task_id: &str,
1456    agent_id: Option<&str>,
1457    states_config: &StatesConfig,
1458    deps_config: &DependenciesConfig,
1459    auto_advance: &AutoAdvanceConfig,
1460) -> Result<(Vec<String>, Vec<String>)> {
1461    // Get start-blocking dependency types
1462    let start_blocking_types = deps_config.start_blocking_types();
1463    if start_blocking_types.is_empty() {
1464        return Ok((vec![], vec![]));
1465    }
1466
1467    // Find all tasks that depend on the completed task via start-blocking dependencies
1468    let type_placeholders: Vec<String> = start_blocking_types
1469        .iter()
1470        .enumerate()
1471        .map(|(i, _)| format!("?{}", i + 2))
1472        .collect();
1473    let type_clause = type_placeholders.join(", ");
1474
1475    let sql = format!(
1476        "SELECT to_task_id FROM dependencies WHERE from_task_id = ?1 AND dep_type IN ({})",
1477        type_clause
1478    );
1479
1480    let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1481    params_vec.push(Box::new(completed_task_id.to_string()));
1482    for t in &start_blocking_types {
1483        params_vec.push(Box::new(t.to_string()));
1484    }
1485    let params_refs: Vec<&dyn rusqlite::ToSql> =
1486        params_vec.iter().map(|b| b.as_ref()).collect();
1487
1488    let mut stmt = conn.prepare(&sql)?;
1489    let dependent_task_ids: Vec<String> = stmt
1490        .query_map(params_refs.as_slice(), |row| row.get(0))?
1491        .filter_map(|r| r.ok())
1492        .collect();
1493
1494    let mut unblocked = Vec::new();
1495    let mut auto_advanced = Vec::new();
1496    let now = super::now_ms();
1497
1498    // Determine if we should auto-advance
1499    let should_auto_advance = auto_advance.enabled && auto_advance.target_state.is_some();
1500    let target_state = auto_advance.target_state.clone();
1501
1502    // Validate target state if auto-advance is enabled
1503    if should_auto_advance {
1504        let ts = target_state.as_ref().unwrap();
1505        if !states_config.is_valid_state(ts) {
1506            return Err(anyhow!(
1507                "Auto-advance target state '{}' is not a valid state",
1508                ts
1509            ));
1510        }
1511    }
1512
1513    for task_id in dependent_task_ids {
1514        // Get the task
1515        let task = match get_task_by_id_internal(conn, &task_id)? {
1516            Some(t) => t,
1517            None => continue,
1518        };
1519
1520        // Skip if not in initial state
1521        if task.status != states_config.initial {
1522            continue;
1523        }
1524
1525        // Skip if task is already claimed
1526        if task.worker_id.is_some() {
1527            continue;
1528        }
1529
1530        // Check if ALL start-blockers are now satisfied (not in blocking states)
1531        // Build query to count remaining blockers that are still blocking
1532        let state_placeholders: Vec<String> = states_config
1533            .blocking_states
1534            .iter()
1535            .enumerate()
1536            .map(|(i, _)| format!("?{}", i + 3))
1537            .collect();
1538        let state_clause = state_placeholders.join(", ");
1539
1540        // Reuse type_placeholders from above
1541        let type_start = states_config.blocking_states.len() + 3;
1542        let type_placeholders2: Vec<String> = start_blocking_types
1543            .iter()
1544            .enumerate()
1545            .map(|(i, _)| format!("?{}", type_start + i))
1546            .collect();
1547        let type_clause2 = type_placeholders2.join(", ");
1548
1549        let blocker_sql = format!(
1550            "SELECT COUNT(*) FROM dependencies d
1551             INNER JOIN tasks blocker ON d.from_task_id = blocker.id
1552             WHERE d.to_task_id = ?1
1553             AND d.from_task_id != ?2
1554             AND d.dep_type IN ({})
1555             AND blocker.status IN ({})",
1556            type_clause2, state_clause
1557        );
1558
1559        let mut blocker_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1560        blocker_params.push(Box::new(task_id.clone()));
1561        blocker_params.push(Box::new(completed_task_id.to_string()));
1562        for state in &states_config.blocking_states {
1563            blocker_params.push(Box::new(state.clone()));
1564        }
1565        for t in &start_blocking_types {
1566            blocker_params.push(Box::new(t.to_string()));
1567        }
1568        let blocker_refs: Vec<&dyn rusqlite::ToSql> =
1569            blocker_params.iter().map(|b| b.as_ref()).collect();
1570
1571        let remaining_blockers: i32 =
1572            conn.query_row(&blocker_sql, blocker_refs.as_slice(), |row| row.get(0))?;
1573
1574        if remaining_blockers > 0 {
1575            continue; // Still blocked by other tasks
1576        }
1577
1578        // Task is now fully unblocked - add to unblocked list
1579        unblocked.push(task_id.clone());
1580
1581        // Auto-advance if enabled and transition is valid
1582        if should_auto_advance {
1583            let ts = target_state.as_ref().unwrap();
1584            
1585            // Validate transition from initial to target_state
1586            if !states_config.is_valid_transition(&states_config.initial, ts) {
1587                // Skip auto-advance for this task - transition not allowed
1588                continue;
1589            }
1590
1591            // Auto-advance: update the task's status
1592            conn.execute(
1593                "UPDATE tasks SET status = ?1, updated_at = ?2 WHERE id = ?3",
1594                params![ts, now, &task_id],
1595            )?;
1596
1597            // Record state transition
1598            let reason = format!(
1599                "auto-advanced: blocker '{}' completed",
1600                completed_task_id
1601            );
1602            super::state_transitions::record_state_transition(
1603                conn,
1604                &task_id,
1605                ts,
1606                agent_id,
1607                Some(&reason),
1608                states_config,
1609            )?;
1610
1611            auto_advanced.push(task_id);
1612        }
1613    }
1614
1615    Ok((unblocked, auto_advanced))
1616}
1617