Skip to main content

task_graph_mcp/db/
tasks.rs

1//! Task CRUD and tree operations.
2
3use super::state_transitions::record_state_transition;
4use super::{Database, now_ms};
5use crate::config::{
6    AutoAdvanceConfig, DependenciesConfig, IdsConfig, PhasesConfig, StatesConfig, TagsConfig,
7};
8use crate::error::ToolError;
9use crate::types::{
10    PRIORITY_DEFAULT, Priority, Task, TaskTree, TaskTreeInput, Worker, clamp_priority,
11    parse_priority,
12};
13use anyhow::{Result, anyhow};
14use petname::{Generator, Petnames};
15use rusqlite::{Connection, Row, params};
16
/// Result type for `update_task_unified`:
/// `(updated_task, unblocked_task_ids, auto_advanced_task_ids, auto_completed_parents)`.
///
/// `auto_completed_parents` is a list of `(id, title)` pairs for parents that were
/// auto-completed via rollup.
type UpdateResult = (Task, Vec<String>, Vec<String>, Vec<(String, String)>);
21
/// Options for creating a task tree from nested input.
///
/// Consumed by `Database::create_task_tree`, which forwards every field to the
/// recursive tree builder.
#[derive(Debug)]
pub struct CreateTreeOptions<'a> {
    /// Nested description of the tree to create; passed as the root input.
    pub input: TaskTreeInput,
    /// Parent passed along for the root of the new tree.
    /// NOTE(review): presumably the ID of an existing task to attach under — confirm.
    pub parent_id: Option<String>,
    /// Dependency type for parent-child links; `create_task_tree` substitutes
    /// `"contains"` when this is `None`.
    pub child_type: Option<String>,
    /// Dependency type for links between siblings; forwarded unchanged, so `None`
    /// stays `None` (documented on `create_task_tree` as "none/parallel").
    pub sibling_type: Option<String>,
    /// State configuration forwarded to the tree builder.
    pub states_config: &'a StatesConfig,
    /// Phase configuration forwarded to the tree builder.
    pub phases_config: &'a PhasesConfig,
    /// Tag configuration forwarded to the tree builder.
    pub tags_config: &'a TagsConfig,
    /// ID-generation configuration forwarded to the tree builder.
    pub ids_config: &'a IdsConfig,
}
34
/// Query parameters for listing tasks with optional filters.
///
/// The derived `Default` leaves every `Option` as `None` and `offset` at `0`.
/// NOTE(review): the code that consumes this struct is outside this excerpt; the
/// per-field notes below describe the apparent intent and should be confirmed there.
#[derive(Debug, Default)]
pub struct ListTasksQuery<'a> {
    /// Status filter (presumably an exact state name — confirm).
    pub status: Option<&'a str>,
    /// Phase filter.
    pub phase: Option<&'a str>,
    /// Owner filter (presumably the owning worker's ID — confirm).
    pub owner: Option<&'a str>,
    /// Parent filter with a nested option.
    /// NOTE(review): presumably outer `None` = do not filter on parent,
    /// `Some(None)` = tasks without a parent, `Some(Some(id))` = children of `id` — confirm.
    pub parent_id: Option<Option<&'a str>>,
    /// Maximum number of tasks to return (presumably unlimited when `None` — confirm).
    pub limit: Option<i32>,
    /// Number of tasks to skip; `0` by default.
    pub offset: i32,
    /// Name of the field to sort by.
    pub sort_by: Option<&'a str>,
    /// Sort direction.
    pub sort_order: Option<&'a str>,
}
47
48/// Generate a petname-based task ID using the large wordlist.
49/// Uses the configured number of words and case style.
50fn generate_task_id(ids_config: &IdsConfig) -> String {
51    let words = ids_config.task_id_words;
52    let case = ids_config.id_case;
53
54    // Generate with hyphen separator first (petname's default format)
55    let base = Petnames::medium()
56        .generate_one(words, "-")
57        .unwrap_or_else(|| format!("task-{}", super::now_ms()));
58
59    // Convert to desired case
60    case.convert(&base)
61}
62
/// Build the expression that follows `ORDER BY` from optional sort parameters.
///
/// Only a fixed whitelist of column expressions and the keywords `ASC` / `DESC`
/// can ever be produced, so the result is safe to splice into SQL even when the
/// inputs come straight from a caller.
///
/// * `sort_by` — `"priority"`, `"created_at"` or `"updated_at"`; any other value
///   (including `None`) sorts by `t.created_at`.
/// * `sort_order` — `"asc"` or `"desc"`, matched case-insensitively so that the
///   usual SQL spelling `"ASC"` is honoured; any other value (including `None`)
///   sorts descending.
///
/// Returns `"<expression> <ASC|DESC>"` without the `ORDER BY` keyword.
fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
    let field = match sort_by {
        // Priority is cast so that it is ordered numerically.
        Some("priority") => "CAST(t.priority AS INTEGER)",
        Some("updated_at") => "t.updated_at",
        // "created_at", unknown names and `None` all sort by creation time.
        _ => "t.created_at",
    };

    let order = match sort_order {
        Some(requested) if requested.eq_ignore_ascii_case("asc") => "ASC",
        // Explicit "desc", unknown values and `None`: newest / highest first.
        _ => "DESC",
    };

    format!("{field} {order}")
}
84
85// =============================================================================
86// Junction table helpers for tag management
87// =============================================================================
88
89/// Sync task tags to the task_tags junction table.
90/// Replaces all existing tags for the task.
91fn sync_task_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
92    conn.execute("DELETE FROM task_tags WHERE task_id = ?1", params![task_id])?;
93    for tag in tags {
94        conn.execute(
95            "INSERT INTO task_tags (task_id, tag) VALUES (?1, ?2)",
96            params![task_id, tag],
97        )?;
98    }
99    Ok(())
100}
101
102/// Sync needed tags (agent must have ALL) to the task_needed_tags junction table.
103fn sync_needed_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
104    conn.execute(
105        "DELETE FROM task_needed_tags WHERE task_id = ?1",
106        params![task_id],
107    )?;
108    for tag in tags {
109        conn.execute(
110            "INSERT INTO task_needed_tags (task_id, tag) VALUES (?1, ?2)",
111            params![task_id, tag],
112        )?;
113    }
114    Ok(())
115}
116
117/// Sync wanted tags (agent must have ANY) to the task_wanted_tags junction table.
118fn sync_wanted_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
119    conn.execute(
120        "DELETE FROM task_wanted_tags WHERE task_id = ?1",
121        params![task_id],
122    )?;
123    for tag in tags {
124        conn.execute(
125            "INSERT INTO task_wanted_tags (task_id, tag) VALUES (?1, ?2)",
126            params![task_id, tag],
127        )?;
128    }
129    Ok(())
130}
131
132pub fn parse_task_row(row: &Row) -> rusqlite::Result<Task> {
133    let id: String = row.get("id")?;
134    let title: String = row.get("title")?;
135    let description: Option<String> = row.get("description")?;
136    let status: String = row.get("status")?;
137    let phase: Option<String> = row.get("phase")?;
138    let priority: String = row.get("priority")?;
139    let worker_id: Option<String> = row.get("worker_id")?;
140    let claimed_at: Option<i64> = row.get("claimed_at")?;
141
142    let needed_tags_json: Option<String> = row.get("needed_tags")?;
143    let wanted_tags_json: Option<String> = row.get("wanted_tags")?;
144    let tags_json: Option<String> = row.get("tags")?;
145
146    let points: Option<i32> = row.get("points")?;
147    let time_estimate_ms: Option<i64> = row.get("time_estimate_ms")?;
148    let time_actual_ms: Option<i64> = row.get("time_actual_ms")?;
149    let started_at: Option<i64> = row.get("started_at")?;
150    let completed_at: Option<i64> = row.get("completed_at")?;
151
152    let current_thought: Option<String> = row.get("current_thought")?;
153
154    let cost_usd: f64 = row.get("cost_usd")?;
155    let metric_0: i64 = row.get("metric_0")?;
156    let metric_1: i64 = row.get("metric_1")?;
157    let metric_2: i64 = row.get("metric_2")?;
158    let metric_3: i64 = row.get("metric_3")?;
159    let metric_4: i64 = row.get("metric_4")?;
160    let metric_5: i64 = row.get("metric_5")?;
161    let metric_6: i64 = row.get("metric_6")?;
162    let metric_7: i64 = row.get("metric_7")?;
163
164    let created_at: i64 = row.get("created_at")?;
165    let updated_at: i64 = row.get("updated_at")?;
166
167    Ok(Task {
168        id,
169        title,
170        description,
171        status,
172        phase,
173        priority: parse_priority(&priority),
174        worker_id,
175        claimed_at,
176        needed_tags: needed_tags_json
177            .map(|s| serde_json::from_str(&s).unwrap_or_default())
178            .unwrap_or_default(),
179        wanted_tags: wanted_tags_json
180            .map(|s| serde_json::from_str(&s).unwrap_or_default())
181            .unwrap_or_default(),
182        tags: tags_json
183            .map(|s| serde_json::from_str(&s).unwrap_or_default())
184            .unwrap_or_default(),
185        points,
186        time_estimate_ms,
187        time_actual_ms,
188        started_at,
189        completed_at,
190        current_thought,
191        cost_usd,
192        metrics: [
193            metric_0, metric_1, metric_2, metric_3, metric_4, metric_5, metric_6, metric_7,
194        ],
195        created_at,
196        updated_at,
197    })
198}
199
200/// Internal helper to get a task using an existing connection (avoids deadlock).
201fn get_task_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
202    let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
203
204    let result = stmt.query_row(params![task_id], parse_task_row);
205
206    match result {
207        Ok(task) => Ok(Some(task)),
208        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
209        Err(e) => Err(e.into()),
210    }
211}
212
213/// Internal helper to get a worker using an existing connection (avoids deadlock).
214fn get_worker_internal(conn: &Connection, worker_id: &str) -> Result<Option<Worker>> {
215    let mut stmt = conn.prepare(
216        "SELECT id, tags, max_claims, registered_at, last_heartbeat, last_status, last_phase, last_task_id, workflow, overlays
217         FROM workers WHERE id = ?1",
218    )?;
219
220    let result = stmt.query_row(params![worker_id], |row| {
221        let id: String = row.get(0)?;
222        let tags_json: String = row.get(1)?;
223        let max_claims: i32 = row.get(2)?;
224        let registered_at: i64 = row.get(3)?;
225        let last_heartbeat: i64 = row.get(4)?;
226        let last_status: Option<String> = row.get(5)?;
227        let last_phase: Option<String> = row.get(6)?;
228        let last_task_id: Option<String> = row.get(7)?;
229        let workflow: Option<String> = row.get(8)?;
230        let overlays_json: Option<String> = row.get(9)?;
231
232        Ok((
233            id,
234            tags_json,
235            max_claims,
236            registered_at,
237            last_heartbeat,
238            last_status,
239            last_phase,
240            last_task_id,
241            workflow,
242            overlays_json,
243        ))
244    });
245
246    match result {
247        Ok((
248            id,
249            tags_json,
250            max_claims,
251            registered_at,
252            last_heartbeat,
253            last_status,
254            last_phase,
255            last_task_id,
256            workflow,
257            overlays_json,
258        )) => {
259            let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
260            let overlays: Vec<String> = overlays_json
261                .as_deref()
262                .and_then(|s| serde_json::from_str(s).ok())
263                .unwrap_or_default();
264            Ok(Some(Worker {
265                id,
266                tags,
267                max_claims,
268                registered_at,
269                last_heartbeat,
270                last_status,
271                last_phase,
272                last_task_id,
273                workflow,
274                overlays,
275            }))
276        }
277        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
278        Err(e) => Err(e.into()),
279    }
280}
281
282impl Database {
283    /// Create a new task.
284    /// If id is provided, uses it as the task ID; otherwise generates a petname ID.
285    /// If parent_id is provided, creates a 'contains' dependency from parent to this task.
286    /// `title` is the short task title; `description` is optional longer detail.
287    #[allow(clippy::too_many_arguments)]
288    pub fn create_task(
289        &self,
290        id: Option<String>,
291        title: String,
292        description: Option<String>,
293        parent_id: Option<String>,
294        phase: Option<String>,
295        priority: Option<Priority>,
296        points: Option<i32>,
297        time_estimate_ms: Option<i64>,
298        agent_tags_all: Option<Vec<String>>,
299        agent_tags_any: Option<Vec<String>>,
300        tags: Option<Vec<String>>,
301        states_config: &StatesConfig,
302        ids_config: &IdsConfig,
303    ) -> Result<Task> {
304        let task_id = id.unwrap_or_else(|| generate_task_id(ids_config));
305        let now = now_ms();
306        let priority = clamp_priority(priority.unwrap_or(PRIORITY_DEFAULT));
307        let initial_status = &states_config.initial;
308
309        let needed_tags = agent_tags_all.unwrap_or_default();
310        let wanted_tags = agent_tags_any.unwrap_or_default();
311        let tags = tags.unwrap_or_default();
312        let needed_tags_json = serde_json::to_string(&needed_tags)?;
313        let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
314        let tags_json = serde_json::to_string(&tags)?;
315
316        self.with_conn_mut(|conn| {
317            let tx = conn.transaction()?;
318
319            tx.execute(
320                "INSERT INTO tasks (
321                    id, title, description, status, phase, priority,
322                    needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
323                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
324                params![
325                    &task_id,
326                    &title,
327                    &description,
328                    initial_status,
329                    &phase,
330                    priority.to_string(),
331                    needed_tags_json,
332                    wanted_tags_json,
333                    tags_json,
334                    points,
335                    time_estimate_ms,
336                    now,
337                    now,
338                ],
339            )?;
340
341            // Sync tags to junction tables
342            sync_task_tags(&tx, &task_id, &tags)?;
343            sync_needed_tags(&tx, &task_id, &needed_tags)?;
344            sync_wanted_tags(&tx, &task_id, &wanted_tags)?;
345
346            // Create 'contains' dependency if parent_id is provided
347            if let Some(ref pid) = parent_id {
348                Database::add_dependency_internal(&tx, pid, &task_id, "contains")?;
349            }
350
351            // Record initial state
352            record_state_transition(&tx, &task_id, initial_status, None, None, states_config)?;
353
354            tx.commit()?;
355
356            Ok(Task {
357                id: task_id,
358                title,
359                description,
360                status: initial_status.clone(),
361                phase,
362                priority,
363                worker_id: None,
364                claimed_at: None,
365                needed_tags,
366                wanted_tags,
367                tags,
368                points,
369                time_estimate_ms,
370                time_actual_ms: None,
371                started_at: None,
372                completed_at: None,
373                current_thought: None,
374                cost_usd: 0.0,
375                metrics: [0; 8],
376                created_at: now,
377                updated_at: now,
378            })
379        })
380    }
381
382    /// Convenience method to create a task with just a description string.
383    /// Uses the description as both title and description. Useful for tests.
384    pub fn create_task_simple(
385        &self,
386        description: impl Into<String>,
387        states_config: &StatesConfig,
388        ids_config: &IdsConfig,
389    ) -> Result<Task> {
390        let desc = description.into();
391        self.create_task(
392            None,
393            desc.clone(),
394            Some(desc),
395            None,
396            None,
397            None,
398            None,
399            None,
400            None,
401            None,
402            None,
403            states_config,
404            ids_config,
405        )
406    }
407
408    /// Create a task tree from nested input.
409    /// Uses child_type for parent-child dependencies (default: "contains").
410    /// Uses sibling_type for sibling dependencies (default: none/parallel).
411    #[allow(clippy::type_complexity)]
412    pub fn create_task_tree(
413        &self,
414        opts: CreateTreeOptions<'_>,
415    ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>)> {
416        let mut all_ids = Vec::new();
417        let mut phase_warnings = Vec::new();
418        let mut tag_warnings = Vec::new();
419        // Default child_type to "contains" if not specified
420        let child_type = opts.child_type.or_else(|| Some("contains".to_string()));
421
422        self.with_conn_mut(|conn| {
423            let tx = conn.transaction()?;
424            let root_id = create_tree_recursive(
425                &tx,
426                &opts.input,
427                opts.parent_id.as_deref(),
428                None, // no previous sibling for root
429                child_type.as_deref(),
430                opts.sibling_type.as_deref(),
431                &mut all_ids,
432                &mut phase_warnings,
433                &mut tag_warnings,
434                opts.states_config,
435                opts.phases_config,
436                opts.tags_config,
437                opts.ids_config,
438            )?;
439            tx.commit()?;
440            Ok((root_id, all_ids, phase_warnings, tag_warnings))
441        })
442    }
443
444    /// Get a task by ID.
445    pub fn get_task(&self, task_id: &str) -> Result<Option<Task>> {
446        self.with_conn(|conn| {
447            let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
448
449            let result = stmt.query_row(params![task_id], parse_task_row);
450
451            match result {
452                Ok(task) => Ok(Some(task)),
453                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
454                Err(e) => Err(e.into()),
455            }
456        })
457    }
458
459    /// Rename a task's ID, updating all references atomically.
460    ///
461    /// Disables foreign key enforcement, updates every table that references
462    /// `tasks.id` inside a transaction, then re-enables and verifies FK
463    /// integrity.
464    pub fn rename_task(&self, old_id: &str, new_id: &str) -> Result<()> {
465        // Validate new_id
466        if new_id.is_empty() {
467            return Err(anyhow!("new_id must not be empty"));
468        }
469        if new_id.len() > 64 {
470            return Err(anyhow!("new_id must not exceed 64 characters"));
471        }
472
473        self.with_conn_mut(|conn| {
474            // Pre-check: old_id must exist
475            let exists: bool = conn.query_row(
476                "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
477                params![old_id],
478                |row| row.get(0),
479            )?;
480            if !exists {
481                return Err(anyhow!("Task '{}' not found", old_id));
482            }
483
484            // Pre-check: new_id must not already exist
485            let conflict: bool = conn.query_row(
486                "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
487                params![new_id],
488                |row| row.get(0),
489            )?;
490            if conflict {
491                return Err(anyhow!("Task '{}' already exists", new_id));
492            }
493
494            // Disable FK enforcement for the duration of the update
495            conn.execute_batch("PRAGMA foreign_keys = OFF")?;
496
497            let result = (|| -> Result<()> {
498                let tx = conn.transaction()?;
499
500                // Primary table — triggers tasks_fts_update
501                tx.execute(
502                    "UPDATE tasks SET id = ?1 WHERE id = ?2",
503                    params![new_id, old_id],
504                )?;
505
506                // Attachments — triggers attachments_fts_update per row
507                tx.execute(
508                    "UPDATE attachments SET task_id = ?1 WHERE task_id = ?2",
509                    params![new_id, old_id],
510                )?;
511
512                // Dependencies (both columns)
513                tx.execute(
514                    "UPDATE dependencies SET from_task_id = ?1 WHERE from_task_id = ?2",
515                    params![new_id, old_id],
516                )?;
517                tx.execute(
518                    "UPDATE dependencies SET to_task_id = ?1 WHERE to_task_id = ?2",
519                    params![new_id, old_id],
520                )?;
521
522                // File locks
523                tx.execute(
524                    "UPDATE file_locks SET task_id = ?1 WHERE task_id = ?2",
525                    params![new_id, old_id],
526                )?;
527
528                // Tag junction tables
529                tx.execute(
530                    "UPDATE task_tags SET task_id = ?1 WHERE task_id = ?2",
531                    params![new_id, old_id],
532                )?;
533                tx.execute(
534                    "UPDATE task_needed_tags SET task_id = ?1 WHERE task_id = ?2",
535                    params![new_id, old_id],
536                )?;
537                tx.execute(
538                    "UPDATE task_wanted_tags SET task_id = ?1 WHERE task_id = ?2",
539                    params![new_id, old_id],
540                )?;
541
542                // Sequence table
543                tx.execute(
544                    "UPDATE task_sequence SET task_id = ?1 WHERE task_id = ?2",
545                    params![new_id, old_id],
546                )?;
547
548                tx.commit()?;
549                Ok(())
550            })();
551
552            // Re-enable FK enforcement regardless of success
553            conn.execute_batch("PRAGMA foreign_keys = ON")?;
554
555            // Propagate any error from the transaction
556            result?;
557
558            // Verify FK integrity
559            let mut stmt = conn.prepare("PRAGMA foreign_key_check")?;
560            let violations: Vec<String> = stmt
561                .query_map([], |row| {
562                    let table: String = row.get(0)?;
563                    Ok(table)
564                })?
565                .filter_map(|r| r.ok())
566                .collect();
567
568            if !violations.is_empty() {
569                return Err(anyhow!(
570                    "Foreign key violations after rename in tables: {:?}",
571                    violations
572                ));
573            }
574
575            Ok(())
576        })
577    }
578
579    /// Get a task with all its children (tree).
580    pub fn get_task_tree(&self, task_id: &str) -> Result<Option<TaskTree>> {
581        let task = self.get_task(task_id)?;
582        match task {
583            None => Ok(None),
584            Some(task) => {
585                let children = self.get_children_recursive(&task.id)?;
586                Ok(Some(TaskTree { task, children }))
587            }
588        }
589    }
590
591    /// Get children recursively.
592    fn get_children_recursive(&self, parent_id: &str) -> Result<Vec<TaskTree>> {
593        let children = self.get_children(parent_id)?;
594        let mut result = Vec::new();
595
596        for child in children {
597            let child_children = self.get_children_recursive(&child.id)?;
598            result.push(TaskTree {
599                task: child,
600                children: child_children,
601            });
602        }
603
604        Ok(result)
605    }
606
607    /// Get direct children of a task (via 'contains' dependency).
608    pub fn get_children(&self, parent_id: &str) -> Result<Vec<Task>> {
609        self.with_conn(|conn| {
610            let mut stmt = conn.prepare(
611                "SELECT t.* FROM tasks t
612                 INNER JOIN dependencies d ON t.id = d.to_task_id
613                 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
614                 ORDER BY t.created_at",
615            )?;
616
617            let tasks = stmt
618                .query_map(params![parent_id], parse_task_row)?
619                .filter_map(|r| r.ok())
620                .collect();
621
622            Ok(tasks)
623        })
624    }
625
626    /// Update a task.
627    #[allow(clippy::too_many_arguments)]
628    pub fn update_task(
629        &self,
630        task_id: &str,
631        title: Option<String>,
632        description: Option<Option<String>>,
633        status: Option<String>,
634        priority: Option<Priority>,
635        points: Option<Option<i32>>,
636        tags: Option<Vec<String>>,
637        states_config: &StatesConfig,
638    ) -> Result<Task> {
639        let now = now_ms();
640
641        self.with_conn(|conn| {
642            let task =
643                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
644
645            let new_title = title.unwrap_or(task.title.clone());
646            let new_description = description.unwrap_or(task.description.clone());
647            let new_status = status.unwrap_or(task.status.clone());
648            let new_priority = priority.unwrap_or(task.priority);
649            let new_points = points.unwrap_or(task.points);
650            let new_tags = tags.unwrap_or(task.tags.clone());
651
652            // Validate the new status exists
653            if !states_config.is_valid_state(&new_status) {
654                return Err(anyhow!(
655                    "Invalid state '{}'. Valid states: {:?}",
656                    new_status,
657                    states_config.state_names()
658                ));
659            }
660
661            // Validate state transition if status changed
662            if task.status != new_status
663                && !states_config.is_valid_transition(&task.status, &new_status)
664            {
665                let exits = states_config.get_exits(&task.status);
666                return Err(anyhow!(
667                    "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}. \
668Tasks must transition through a timed state (e.g. working) for time tracking; \
669skipping directly to a terminal state is not permitted.",
670                    task.status,
671                    new_status,
672                    exits
673                ));
674            }
675
676            // Handle status transitions for timestamps
677            // Set started_at when first entering a timed state
678            let started_at =
679                if task.started_at.is_none() && states_config.is_timed_state(&new_status) {
680                    Some(now)
681                } else {
682                    task.started_at
683                };
684
685            // Set completed_at when entering a completed state (terminal or with reopen capability)
686            let completed_at = if new_status == "completed" {
687                Some(now)
688            } else {
689                task.completed_at
690            };
691
692            // Record state transition if status changed (handles time accumulation)
693            if task.status != new_status {
694                record_state_transition(
695                    conn,
696                    task_id,
697                    &new_status,
698                    task.worker_id.as_deref(),
699                    None,
700                    states_config,
701                )?;
702            }
703
704            conn.execute(
705                "UPDATE tasks SET
706                    title = ?1, description = ?2, status = ?3, priority = ?4,
707                    points = ?5, started_at = ?6, completed_at = ?7, updated_at = ?8,
708                    tags = ?9
709                WHERE id = ?10",
710                params![
711                    new_title,
712                    new_description,
713                    new_status,
714                    new_priority.to_string(),
715                    new_points,
716                    started_at,
717                    completed_at,
718                    now,
719                    serde_json::to_string(&new_tags)?,
720                    task_id,
721                ],
722            )?;
723
724            Ok(Task {
725                id: task_id.to_string(),
726                title: new_title,
727                description: new_description,
728                status: new_status,
729                priority: new_priority,
730                points: new_points,
731                tags: new_tags,
732                started_at,
733                completed_at,
734                updated_at: now,
735                ..task
736            })
737        })
738    }
739
740    /// Update a task with unified claim/release logic.
741    /// - Transition to timed state = CLAIM (set owner, validate tags, check limit)
742    /// - Transition from timed to non-timed = RELEASE (clear owner)
743    /// - Transition to terminal = COMPLETE (check children, release file locks)
744    /// - With assignee = ASSIGN (set owner to assignee, transition to 'assigned' state)
745    /// - Only the owner can update a claimed task (unless force=true)
746    ///
    /// Returns (task, unblocked, auto_advanced, auto_completed_parents):
    /// - task: The updated task
    /// - unblocked: Task IDs that are now ready (all dependencies satisfied)
    /// - auto_advanced: Subset of unblocked that were actually transitioned
    /// - auto_completed_parents: (id, title) pairs for parents auto-completed via rollup
751    #[allow(clippy::too_many_arguments)]
752    pub fn update_task_unified(
753        &self,
754        task_id: &str,
755        agent_id: &str,
756        assignee: Option<&str>,
757        title: Option<String>,
758        description: Option<Option<String>>,
759        status: Option<String>,
760        phase: Option<String>,
761        priority: Option<Priority>,
762        points: Option<Option<i32>>,
763        tags: Option<Vec<String>>,
764        needed_tags: Option<Vec<String>>,
765        wanted_tags: Option<Vec<String>>,
766        time_estimate_ms: Option<i64>,
767        reason: Option<String>,
768        force: bool,
769        states_config: &StatesConfig,
770        deps_config: &DependenciesConfig,
771        auto_advance: &AutoAdvanceConfig,
772    ) -> Result<UpdateResult> {
773        let now = now_ms();
774
775        self.with_conn_mut(|conn| {
776            let tx = conn.transaction()?;
777
778            let task =
779                get_task_internal(&tx, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
780
781            // Owner-only validation: if task is claimed, only owner can update (unless force)
782            if let Some(ref current_owner) = task.worker_id
783                && current_owner != agent_id && !force {
784                    return Err(anyhow!(
785                        "Task is claimed by agent '{}'. Only the owner can update claimed tasks (use force=true to override)",
786                        current_owner
787                    ));
788                }
789
790            let new_title = title.unwrap_or(task.title.clone());
791            let new_description = description.unwrap_or(task.description.clone());
792            // If assignee is set but no explicit status, default to 'assigned' state
793            let new_status = if assignee.is_some() && status.is_none() {
794                "assigned".to_string()
795            } else {
796                status.unwrap_or(task.status.clone())
797            };
798            let new_priority = priority.unwrap_or(task.priority);
799            let new_points = points.unwrap_or(task.points);
800            let new_tags = tags.unwrap_or(task.tags.clone());
801            let new_needed_tags = needed_tags.unwrap_or(task.needed_tags.clone());
802            let new_wanted_tags = wanted_tags.unwrap_or(task.wanted_tags.clone());
803            let new_time_estimate_ms = time_estimate_ms.or(task.time_estimate_ms);
804            let new_phase = phase.or(task.phase.clone());
805
806            // Validate the new status exists
807            if !states_config.is_valid_state(&new_status) {
808                return Err(anyhow!(
809                    "Invalid state '{}'. Valid states: {:?}",
810                    new_status,
811                    states_config.state_names()
812                ));
813            }
814
815            // Validate state transition if status changed
816            if task.status != new_status
817                && !states_config.is_valid_transition(&task.status, &new_status) {
818                    let exits = states_config.get_exits(&task.status);
819                    return Err(anyhow!(
820                        "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}. \
821Tasks must transition through a timed state (e.g. working) for time tracking; \
822skipping directly to a terminal state is not permitted.",
823                        task.status,
824                        new_status,
825                        exits
826                    ));
827                }
828
829            // Determine ownership changes based on state transition
830            let new_is_timed = states_config.is_timed_state(&new_status);
831            let new_is_terminal = states_config.is_terminal_state(&new_status);
832            let current_owner = task.worker_id.as_deref();
833            let is_owned_by_agent = current_owner == Some(agent_id);
834            let is_owned_by_other = current_owner.is_some() && !is_owned_by_agent;
835
836            let mut new_owner: Option<String> = task.worker_id.clone();
837            let mut new_claimed_at: Option<i64> = task.claimed_at;
838
839            // ASSIGN: Push coordination - coordinator assigns task to another agent
840            // Sets owner without starting the timer (assigned state is untimed)
841            if let Some(target_agent) = assignee {
842                // Verify task is not already claimed (unless force)
843                if is_owned_by_other && !force {
844                    return Err(anyhow!(
845                        "Task is already claimed by agent '{}'. Use force=true to reassign.",
846                        current_owner.unwrap()
847                    ));
848                }
849
850                // Verify the assignee exists
851                let target = get_worker_internal(&tx, target_agent)?
852                    .ok_or_else(|| anyhow!("Assignee agent '{}' not found", target_agent))?;
853
854                // Check tag affinity for the assignee
855                if !task.needed_tags.is_empty() {
856                    for needed in &task.needed_tags {
857                        if !target.tags.contains(needed) {
858                            return Err(anyhow!(
859                                "Assignee '{}' missing required tag: {}",
860                                target_agent,
861                                needed
862                            ));
863                        }
864                    }
865                }
866
867                if !task.wanted_tags.is_empty() {
868                    let has_any = task
869                        .wanted_tags
870                        .iter()
871                        .any(|wanted| target.tags.contains(wanted));
872                    if !has_any {
873                        return Err(anyhow!(
874                            "Assignee '{}' has none of the wanted tags: {:?}",
875                            target_agent,
876                            task.wanted_tags
877                        ));
878                    }
879                }
880
881                // Set ownership to the assignee
882                new_owner = Some(target_agent.to_string());
883                new_claimed_at = Some(now);
884            }
885
886            // CLAIM: Transitioning to a timed state and need to take ownership
887            // This handles: non-timed -> timed, OR timed (other owner) -> timed (force claim)
888            if new_is_timed && !is_owned_by_agent {
889                // Already claimed by someone else?
890                if is_owned_by_other && !force {
891                    return Err(anyhow!(
892                        "Task is already claimed by agent '{}'",
893                        current_owner.unwrap()
894                    ));
895                }
896
897                // Check for unsatisfied blocking dependencies (skip if force)
898                if !force {
899                    let unsatisfied_blockers = super::deps::get_unsatisfied_start_blockers_in_tx(
900                        &tx,
901                        task_id,
902                        states_config,
903                        deps_config,
904                    )?;
905                    if !unsatisfied_blockers.is_empty() {
906                        // Return structured error with blocking task IDs so clients
907                        // can monitor them and retry when they complete
908                        return Err(ToolError::deps_not_satisfied(&unsatisfied_blockers).into());
909                    }
910                }
911
912                // Get the agent
913                let agent = get_worker_internal(&tx, agent_id)?
914                    .ok_or_else(|| anyhow!("Agent not found"))?;
915
916                // Check tag affinity - needed_tags (AND - must have ALL)
917                if !task.needed_tags.is_empty() {
918                    for needed in &task.needed_tags {
919                        if !agent.tags.contains(needed) {
920                            return Err(anyhow!("Agent missing required tag: {}", needed));
921                        }
922                    }
923                }
924
925                // Check tag affinity - wanted_tags (OR - must have AT LEAST ONE)
926                if !task.wanted_tags.is_empty() {
927                    let has_any = task
928                        .wanted_tags
929                        .iter()
930                        .any(|wanted| agent.tags.contains(wanted));
931                    if !has_any {
932                        return Err(anyhow!("Agent has none of the wanted tags"));
933                    }
934                }
935
936                // Check max_claims limit
937                if !force {
938                    let claim_count = super::agents::get_claim_count_internal(&tx, agent_id, states_config)?;
939                    if claim_count >= agent.max_claims {
940                        return Err(anyhow!(
941                            "Agent '{}' has reached max_claims limit ({}/{}). Release a task first or use force=true to override.",
942                            agent_id, claim_count, agent.max_claims
943                        ));
944                    }
945                }
946
947                // Set ownership
948                new_owner = Some(agent_id.to_string());
949                new_claimed_at = Some(now);
950
951                // Refresh agent heartbeat
952                tx.execute(
953                    "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
954                    params![now, agent_id],
955                )?;
956            }
957
958            // RELEASE: Transitioning to non-timed state (but not terminal)
959            if !new_is_timed && !new_is_terminal && task.worker_id.is_some() {
960                // Verify ownership (unless force)
961                if is_owned_by_other && !force {
962                    return Err(anyhow!("Task is not owned by this agent"));
963                }
964
965                // Clear ownership
966                new_owner = None;
967                new_claimed_at = None;
968            }
969
970            // COMPLETE: Transition to terminal state
971            if new_is_terminal {
972                // Verify ownership if task was claimed (unless force)
973                if let Some(ref current_owner) = task.worker_id
974                    && current_owner != agent_id && !force {
975                        return Err(anyhow!("Task is not owned by this agent"));
976                    }
977
978                // Clear ownership
979                new_owner = None;
980                new_claimed_at = None;
981
982                // Release file locks associated with this task (for auto-cleanup)
983                tx.execute(
984                    "DELETE FROM file_locks WHERE task_id = ?1",
985                    params![task_id],
986                )?;
987            }
988
989            // Handle timestamps
990            let started_at =
991                if task.started_at.is_none() && new_is_timed {
992                    Some(now)
993                } else {
994                    task.started_at
995                };
996
997            // Set completed_at when entering completed status (even if it has reopen exits)
998            let completed_at = if new_status == "completed" {
999                Some(now)
1000            } else {
1001                task.completed_at
1002            };
1003
1004            // Record state transition if status changed (with reason for audit)
1005            let status_changed = task.status != new_status;
1006            if status_changed {
1007                record_state_transition(
1008                    &tx,
1009                    task_id,
1010                    &new_status,
1011                    new_owner.as_deref(),
1012                    reason.as_deref(),
1013                    states_config,
1014                )?;
1015            }
1016
1017            // Record phase transition if phase changed
1018            let phase_changed = task.phase != new_phase;
1019            if phase_changed {
1020                super::state_transitions::record_phase_transition(
1021                    &tx,
1022                    task_id,
1023                    new_phase.as_deref().unwrap_or(""),
1024                    Some(agent_id),
1025                    reason.as_deref(),
1026                )?;
1027            }
1028
1029            tx.execute(
1030                "UPDATE tasks SET
1031                    title = ?1, description = ?2, status = ?3, phase = ?4, priority = ?5,
1032                    points = ?6, started_at = ?7, completed_at = ?8, updated_at = ?9,
1033                    tags = ?10, worker_id = ?11, claimed_at = ?12,
1034                    needed_tags = ?13, wanted_tags = ?14, time_estimate_ms = ?15
1035                WHERE id = ?16",
1036                params![
1037                    new_title,
1038                    new_description,
1039                    new_status,
1040                    new_phase,
1041                    new_priority.to_string(),
1042                    new_points,
1043                    started_at,
1044                    completed_at,
1045                    now,
1046                    serde_json::to_string(&new_tags)?,
1047                    new_owner,
1048                    new_claimed_at,
1049                    serde_json::to_string(&new_needed_tags)?,
1050                    serde_json::to_string(&new_wanted_tags)?,
1051                    new_time_estimate_ms,
1052                    task_id,
1053                ],
1054            )?;
1055
1056            // Sync tags to junction tables if changed
1057            if new_tags != task.tags {
1058                sync_task_tags(&tx, task_id, &new_tags)?;
1059            }
1060            if new_needed_tags != task.needed_tags {
1061                sync_needed_tags(&tx, task_id, &new_needed_tags)?;
1062            }
1063            if new_wanted_tags != task.wanted_tags {
1064                sync_wanted_tags(&tx, task_id, &new_wanted_tags)?;
1065            }
1066
1067            // Check for unblocked tasks if this task transitioned FROM blocking TO non-blocking
1068            let (unblocked, auto_advanced) = if status_changed {
1069                let was_blocking = states_config.is_blocking_state(&task.status);
1070                let is_blocking = states_config.is_blocking_state(&new_status);
1071
1072                if was_blocking && !is_blocking {
1073                    super::deps::propagate_unblock_effects(
1074                        &tx,
1075                        task_id,
1076                        Some(agent_id),
1077                        states_config,
1078                        deps_config,
1079                        auto_advance,
1080                    )?
1081                } else {
1082                    (vec![], vec![])
1083                }
1084            } else {
1085                (vec![], vec![])
1086            };
1087
1088            // Auto-rollup: if this task moved to a non-blocking state and auto_rollup is enabled,
1089            // check if its parent (and recursively grandparents) should auto-complete.
1090            // We check !is_blocking rather than is_terminal because "completed" and "failed"
1091            // may have reopen exits but are still considered "done" for rollup purposes.
1092            let new_is_non_blocking = !states_config.is_blocking_state(&new_status);
1093            let auto_completed = if status_changed && new_is_non_blocking && auto_advance.auto_rollup {
1094                propagate_auto_rollup(
1095                    &tx,
1096                    task_id,
1097                    agent_id,
1098                    states_config,
1099                )?
1100            } else {
1101                vec![]
1102            };
1103
1104            tx.commit()?;
1105
1106            Ok((Task {
1107                id: task_id.to_string(),
1108                title: new_title,
1109                description: new_description,
1110                status: new_status,
1111                phase: new_phase,
1112                priority: new_priority,
1113                points: new_points,
1114                tags: new_tags,
1115                needed_tags: new_needed_tags,
1116                wanted_tags: new_wanted_tags,
1117                time_estimate_ms: new_time_estimate_ms,
1118                started_at,
1119                completed_at,
1120                updated_at: now,
1121                worker_id: new_owner,
1122                claimed_at: new_claimed_at,
1123                ..task
1124            }, unblocked, auto_advanced, auto_completed))
1125        })
1126    }
1127
1128    /// Delete a task (soft delete by default, hard delete with obliterate=true).
1129    ///
1130    /// - `worker_id`: The worker attempting to delete (required for ownership check)
1131    /// - `cascade`: Whether to delete children (default: false)
1132    /// - `reason`: Optional reason for deletion
1133    /// - `obliterate`: If true, permanently deletes the task; if false (default), soft deletes
1134    /// - `force`: If true, allows deletion even if owned by another worker
1135    pub fn delete_task(
1136        &self,
1137        task_id: &str,
1138        worker_id: &str,
1139        cascade: bool,
1140        reason: Option<String>,
1141        obliterate: bool,
1142        force: bool,
1143    ) -> Result<()> {
1144        let now = now_ms();
1145
1146        self.with_conn_mut(|conn| {
1147            let tx = conn.transaction()?;
1148
1149            // Get the task to check ownership
1150            let task = get_task_internal(&tx, task_id)?
1151                .ok_or_else(|| anyhow!("Task not found"))?;
1152
1153            // Check ownership - reject if claimed by another worker (unless force)
1154            if let Some(ref owner) = task.worker_id
1155                && owner != worker_id && !force {
1156                    return Err(anyhow!(
1157                        "Task is claimed by worker '{}'. Use force=true to override.",
1158                        owner
1159                    ));
1160                }
1161
1162            if obliterate {
1163                // Hard delete - permanently remove from database
1164                if cascade {
1165                    // Collect all descendant IDs via recursive CTE, then clean up
1166                    // file_locks (FK without ON DELETE CASCADE) and delete tasks
1167                    let mut stmt = tx.prepare(
1168                        "WITH RECURSIVE descendants AS (
1169                            SELECT ?1 AS id
1170                            UNION ALL
1171                            SELECT dep.to_task_id FROM dependencies dep
1172                            INNER JOIN descendants d ON dep.from_task_id = d.id
1173                            WHERE dep.dep_type = 'contains'
1174                        )
1175                        SELECT id FROM descendants",
1176                    )?;
1177                    let ids: Vec<String> = stmt
1178                        .query_map(params![task_id], |row| row.get(0))?
1179                        .collect::<Result<Vec<_>, _>>()?;
1180                    drop(stmt);
1181
1182                    if !ids.is_empty() {
1183                        let placeholders: String = ids.iter().enumerate()
1184                            .map(|(i, _)| format!("?{}", i + 1))
1185                            .collect::<Vec<_>>()
1186                            .join(",");
1187                        let sql_locks = format!(
1188                            "DELETE FROM file_locks WHERE task_id IN ({})", placeholders
1189                        );
1190                        let sql_tasks = format!(
1191                            "DELETE FROM tasks WHERE id IN ({})", placeholders
1192                        );
1193                        let params: Vec<&dyn rusqlite::types::ToSql> =
1194                            ids.iter().map(|id| id as &dyn rusqlite::types::ToSql).collect();
1195                        tx.execute(&sql_locks, params.as_slice())?;
1196                        tx.execute(&sql_tasks, params.as_slice())?;
1197                    }
1198                } else {
1199                    // Check for children via dependencies
1200                    let child_count: i32 = tx.query_row(
1201                        "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
1202                        params![task_id],
1203                        |row| row.get(0),
1204                    )?;
1205
1206                    if child_count > 0 {
1207                        return Err(anyhow!("Task has children; use cascade=true to delete"));
1208                    }
1209
1210                    // Clean up file_locks first (FK without ON DELETE CASCADE)
1211                    tx.execute("DELETE FROM file_locks WHERE task_id = ?1", params![task_id])?;
1212                    tx.execute("DELETE FROM tasks WHERE id = ?1", params![task_id])?;
1213                }
1214            } else {
1215                // Soft delete - set deleted_at, deleted_by, deleted_reason
1216                if cascade {
1217                    // Soft delete all descendants
1218                    tx.execute(
1219                        "WITH RECURSIVE descendants AS (
1220                            SELECT ?1 AS id
1221                            UNION ALL
1222                            SELECT dep.to_task_id FROM dependencies dep
1223                            INNER JOIN descendants d ON dep.from_task_id = d.id
1224                            WHERE dep.dep_type = 'contains'
1225                        )
1226                        UPDATE tasks SET deleted_at = ?2, deleted_by = ?3, deleted_reason = ?4, updated_at = ?2
1227                        WHERE id IN (SELECT id FROM descendants) AND deleted_at IS NULL",
1228                        params![task_id, now, worker_id, reason],
1229                    )?;
1230                } else {
1231                    // Check for children via dependencies
1232                    let child_count: i32 = tx.query_row(
1233                        "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
1234                        params![task_id],
1235                        |row| row.get(0),
1236                    )?;
1237
1238                    if child_count > 0 {
1239                        return Err(anyhow!("Task has children; use cascade=true to delete"));
1240                    }
1241
1242                    tx.execute(
1243                        "UPDATE tasks SET deleted_at = ?1, deleted_by = ?2, deleted_reason = ?3, updated_at = ?1 WHERE id = ?4",
1244                        params![now, worker_id, reason, task_id],
1245                    )?;
1246                }
1247            }
1248
1249            tx.commit()?;
1250            Ok(())
1251        })
1252    }
1253
1254    /// List tasks with optional filters.
1255    /// Returns full Task objects. Excludes soft-deleted tasks.
1256    pub fn list_tasks(&self, query: ListTasksQuery<'_>) -> Result<Vec<Task>> {
1257        let ListTasksQuery {
1258            status,
1259            phase,
1260            owner,
1261            parent_id,
1262            limit,
1263            offset,
1264            sort_by,
1265            sort_order,
1266        } = query;
1267        self.with_conn(|conn| {
1268            let mut sql = String::from(
1269                "SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL",
1270            );
1271            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1272
1273            if let Some(s) = status {
1274                sql.push_str(" AND t.status = ?");
1275                params_vec.push(Box::new(s.to_string()));
1276            }
1277
1278            if let Some(p) = phase {
1279                sql.push_str(" AND t.phase = ?");
1280                params_vec.push(Box::new(p.to_string()));
1281            }
1282
1283            if let Some(o) = owner {
1284                sql.push_str(" AND t.worker_id = ?");
1285                params_vec.push(Box::new(o.to_string()));
1286            }
1287
1288            // Handle parent filtering via dependencies table
1289            if let Some(p) = parent_id {
1290                match p {
1291                    Some(pid) => {
1292                        sql.push_str(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ? AND dep_type = 'contains')");
1293                        params_vec.push(Box::new(pid.to_string()));
1294                    }
1295                    None => {
1296                        // Root tasks: not contained by any other task
1297                        sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
1298                    }
1299                }
1300            }
1301
1302            // Build ORDER BY clause
1303            let order_clause = build_order_clause(sort_by, sort_order);
1304            sql.push_str(&format!(" ORDER BY {}", order_clause));
1305
1306            if let Some(l) = limit {
1307                sql.push_str(&format!(" LIMIT {}", l));
1308            }
1309
1310            if offset > 0 {
1311                sql.push_str(&format!(" OFFSET {}", offset));
1312            }
1313
1314            let params_refs: Vec<&dyn rusqlite::ToSql> =
1315                params_vec.iter().map(|b| b.as_ref()).collect();
1316
1317            let mut stmt = conn.prepare(&sql)?;
1318            let tasks = stmt
1319                .query_map(params_refs.as_slice(), parse_task_row)?
1320                .filter_map(|r| r.ok())
1321                .collect();
1322
1323            Ok(tasks)
1324        })
1325    }
1326
1327    /// Set the current thought for tasks owned by an agent.
1328    pub fn set_thought(
1329        &self,
1330        agent_id: &str,
1331        thought: Option<String>,
1332        task_ids: Option<Vec<String>>,
1333    ) -> Result<i32> {
1334        let now = now_ms();
1335
1336        self.with_conn(|conn| {
1337            let updated = if let Some(ids) = task_ids {
1338                let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
1339                let sql = format!(
1340                    "UPDATE tasks SET current_thought = ?, updated_at = ?
1341                     WHERE worker_id = ? AND id IN ({})",
1342                    placeholders.join(", ")
1343                );
1344
1345                let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1346                params_vec.push(Box::new(thought.clone()));
1347                params_vec.push(Box::new(now));
1348                params_vec.push(Box::new(agent_id.to_string()));
1349                for id in &ids {
1350                    params_vec.push(Box::new(id.clone()));
1351                }
1352
1353                let params_refs: Vec<&dyn rusqlite::ToSql> =
1354                    params_vec.iter().map(|b| b.as_ref()).collect();
1355                conn.execute(&sql, params_refs.as_slice())?
1356            } else {
1357                conn.execute(
1358                    "UPDATE tasks SET current_thought = ?, updated_at = ? WHERE worker_id = ?",
1359                    params![thought, now, agent_id],
1360                )?
1361            };
1362
1363            Ok(updated as i32)
1364        })
1365    }
1366
1367    /// Log time for a task.
1368    pub fn log_time(&self, task_id: &str, duration_ms: i64) -> Result<i64> {
1369        let now = now_ms();
1370
1371        self.with_conn(|conn| {
1372            conn.execute(
1373                "UPDATE tasks SET time_actual_ms = COALESCE(time_actual_ms, 0) + ?1, updated_at = ?2
1374                 WHERE id = ?3",
1375                params![duration_ms, now, task_id],
1376            )?;
1377
1378            let total: i64 = conn.query_row(
1379                "SELECT COALESCE(time_actual_ms, 0) FROM tasks WHERE id = ?1",
1380                params![task_id],
1381                |row| row.get(0),
1382            )?;
1383
1384            Ok(total)
1385        })
1386    }
1387
1388    /// Log metrics and cost for a task.
1389    /// Values in the metrics array are aggregated (added) to existing values.
1390    pub fn log_metrics(
1391        &self,
1392        task_id: &str,
1393        cost_usd: Option<f64>,
1394        values: &[i64],
1395    ) -> Result<Task> {
1396        let now = now_ms();
1397
1398        self.with_conn(|conn| {
1399            let task =
1400                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1401
1402            // Aggregate metrics (add new values to existing)
1403            let mut new_metrics = task.metrics;
1404            for (i, &val) in values.iter().take(8).enumerate() {
1405                new_metrics[i] += val;
1406            }
1407
1408            let new_cost_usd = task.cost_usd + cost_usd.unwrap_or(0.0);
1409
1410            conn.execute(
1411                "UPDATE tasks SET
1412                    metric_0 = ?1, metric_1 = ?2, metric_2 = ?3, metric_3 = ?4,
1413                    metric_4 = ?5, metric_5 = ?6, metric_6 = ?7, metric_7 = ?8,
1414                    cost_usd = ?9, updated_at = ?10
1415                WHERE id = ?11",
1416                params![
1417                    new_metrics[0],
1418                    new_metrics[1],
1419                    new_metrics[2],
1420                    new_metrics[3],
1421                    new_metrics[4],
1422                    new_metrics[5],
1423                    new_metrics[6],
1424                    new_metrics[7],
1425                    new_cost_usd,
1426                    now,
1427                    task_id,
1428                ],
1429            )?;
1430
1431            Ok(Task {
1432                cost_usd: new_cost_usd,
1433                metrics: new_metrics,
1434                updated_at: now,
1435                ..task
1436            })
1437        })
1438    }
1439
1440    /// Claim a task for an agent.
1441    /// Uses the first timed state (typically "working") as the claiming state.
1442    pub fn claim_task(
1443        &self,
1444        task_id: &str,
1445        agent_id: &str,
1446        states_config: &StatesConfig,
1447    ) -> Result<Task> {
1448        let now = now_ms();
1449
1450        // Find the first timed state to use for claiming (typically "working")
1451        let claim_status = states_config
1452            .definitions
1453            .iter()
1454            .find(|(_, def)| def.timed)
1455            .map(|(name, _)| name.as_str())
1456            .unwrap_or("working");
1457
1458        self.with_conn(|conn| {
1459            // Get the task (using internal helper to avoid deadlock)
1460            let task =
1461                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1462
1463            // Check if already claimed
1464            if task.worker_id.is_some() {
1465                return Err(anyhow!("Task is already claimed"));
1466            }
1467
1468            // Validate state transition
1469            if !states_config.is_valid_transition(&task.status, claim_status) {
1470                let exits = states_config.get_exits(&task.status);
1471                return Err(anyhow!(
1472                    "Cannot claim task in state '{}'. Allowed transitions: {:?}. \
1473Tasks must transition through a timed state (e.g. working) for time tracking.",
1474                    task.status,
1475                    exits
1476                ));
1477            }
1478
1479            // Get the agent (using internal helper to avoid deadlock)
1480            let agent =
1481                get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1482
1483            // Check tag affinity - needed_tags (AND - must have ALL)
1484            if !task.needed_tags.is_empty() {
1485                for needed in &task.needed_tags {
1486                    if !agent.tags.contains(needed) {
1487                        return Err(anyhow!("Agent missing required tag: {}", needed));
1488                    }
1489                }
1490            }
1491
1492            // Check tag affinity - wanted_tags (OR - must have AT LEAST ONE)
1493            if !task.wanted_tags.is_empty() {
1494                let has_any = task
1495                    .wanted_tags
1496                    .iter()
1497                    .any(|wanted| agent.tags.contains(wanted));
1498                if !has_any {
1499                    return Err(anyhow!("Agent has none of the wanted tags"));
1500                }
1501            }
1502
1503            // Check max_claims limit
1504            let claim_count = super::agents::get_claim_count_internal(conn, agent_id, states_config)?;
1505            if claim_count >= agent.max_claims {
1506                return Err(anyhow!(
1507                    "Agent '{}' has reached max_claims limit ({}/{}). Release a task first or use force=true to override.",
1508                    agent_id, claim_count, agent.max_claims
1509                ));
1510            }
1511
1512            conn.execute(
1513                "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = ?4, updated_at = ?5
1514                 WHERE id = ?6",
1515                params![agent_id, now, claim_status, now, now, task_id,],
1516            )?;
1517
1518            // Record state transition (accumulates time if coming from timed state)
1519            record_state_transition(
1520                conn,
1521                task_id,
1522                claim_status,
1523                Some(agent_id),
1524                None,
1525                states_config,
1526            )?;
1527
1528            // Refresh agent heartbeat
1529            conn.execute(
1530                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1531                params![now, agent_id],
1532            )?;
1533
1534            Ok(Task {
1535                worker_id: Some(agent_id.to_string()),
1536                claimed_at: Some(now),
1537                status: claim_status.to_string(),
1538                started_at: Some(now),
1539                updated_at: now,
1540                ..task
1541            })
1542        })
1543    }
1544
1545    /// Release a task claim.
1546    pub fn release_task(
1547        &self,
1548        task_id: &str,
1549        agent_id: &str,
1550        states_config: &StatesConfig,
1551    ) -> Result<()> {
1552        let now = now_ms();
1553        let release_status = &states_config.initial;
1554
1555        self.with_conn(|conn| {
1556            let task =
1557                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1558
1559            if task.worker_id.as_deref() != Some(agent_id) {
1560                return Err(anyhow!("Task is not owned by this agent"));
1561            }
1562
1563            // Record state transition (accumulates time if coming from timed state)
1564            record_state_transition(
1565                conn,
1566                task_id,
1567                release_status,
1568                Some(agent_id),
1569                None,
1570                states_config,
1571            )?;
1572
1573            conn.execute(
1574                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1575                 WHERE id = ?3",
1576                params![release_status, now, task_id],
1577            )?;
1578
1579            Ok(())
1580        })
1581    }
1582
1583    /// Force release a task regardless of owner.
1584    pub fn force_release(&self, task_id: &str, states_config: &StatesConfig) -> Result<()> {
1585        let now = now_ms();
1586        let release_status = &states_config.initial;
1587
1588        self.with_conn(|conn| {
1589            let task =
1590                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1591
1592            // Record state transition (accumulates time if coming from timed state)
1593            record_state_transition(
1594                conn,
1595                task_id,
1596                release_status,
1597                task.worker_id.as_deref(),
1598                None,
1599                states_config,
1600            )?;
1601
1602            conn.execute(
1603                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1604                 WHERE id = ?3",
1605                params![release_status, now, task_id],
1606            )?;
1607
1608            Ok(())
1609        })
1610    }
1611
1612    /// Force claim a task even if owned by another agent.
1613    pub fn force_claim_task(
1614        &self,
1615        task_id: &str,
1616        agent_id: &str,
1617        states_config: &StatesConfig,
1618    ) -> Result<Task> {
1619        let now = now_ms();
1620
1621        // Find the first timed state to use for claiming (typically "working")
1622        let claim_status = states_config
1623            .definitions
1624            .iter()
1625            .find(|(_, def)| def.timed)
1626            .map(|(name, _)| name.as_str())
1627            .unwrap_or("working");
1628
1629        self.with_conn(|conn| {
1630            // Get the task
1631            let task =
1632                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1633
1634            // Get the agent
1635            let agent =
1636                get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1637
1638            // Check tag affinity - needed_tags (AND)
1639            if !task.needed_tags.is_empty() {
1640                for needed in &task.needed_tags {
1641                    if !agent.tags.contains(needed) {
1642                        return Err(anyhow!("Agent missing required tag: {}", needed));
1643                    }
1644                }
1645            }
1646
1647            // Check tag affinity - wanted_tags (OR)
1648            if !task.wanted_tags.is_empty() {
1649                let has_any = task
1650                    .wanted_tags
1651                    .iter()
1652                    .any(|wanted| agent.tags.contains(wanted));
1653                if !has_any {
1654                    return Err(anyhow!("Agent has none of the wanted tags"));
1655                }
1656            }
1657
1658            conn.execute(
1659                "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = COALESCE(started_at, ?4), updated_at = ?5
1660                 WHERE id = ?6",
1661                params![agent_id, now, claim_status, now, now, task_id,],
1662            )?;
1663
1664            // Record state transition (accumulates time if coming from timed state)
1665            record_state_transition(
1666                conn,
1667                task_id,
1668                claim_status,
1669                Some(agent_id),
1670                None,
1671                states_config,
1672            )?;
1673
1674            // Refresh agent heartbeat
1675            conn.execute(
1676                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1677                params![now, agent_id],
1678            )?;
1679
1680            Ok(Task {
1681                worker_id: Some(agent_id.to_string()),
1682                claimed_at: Some(now),
1683                status: claim_status.to_string(),
1684                started_at: task.started_at.or(Some(now)),
1685                updated_at: now,
1686                ..task
1687            })
1688        })
1689    }
1690
1691    /// Release a task claim with a specified state.
1692    pub fn release_task_with_state(
1693        &self,
1694        task_id: &str,
1695        agent_id: &str,
1696        state: &str,
1697        states_config: &StatesConfig,
1698    ) -> Result<()> {
1699        let now = now_ms();
1700
1701        self.with_conn(|conn| {
1702            let task =
1703                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1704
1705            if task.worker_id.as_deref() != Some(agent_id) {
1706                return Err(anyhow!("Task is not owned by this agent"));
1707            }
1708
1709            // Validate state exists
1710            if !states_config.is_valid_state(state) {
1711                return Err(anyhow!(
1712                    "Invalid state '{}'. Valid states: {:?}",
1713                    state,
1714                    states_config.state_names()
1715                ));
1716            }
1717
1718            // Validate transition
1719            if !states_config.is_valid_transition(&task.status, state) {
1720                let exits = states_config.get_exits(&task.status);
1721                return Err(anyhow!(
1722                    "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}. \
1723Tasks must transition through a timed state (e.g. working) for time tracking; \
1724skipping directly to a terminal state is not permitted.",
1725                    task.status,
1726                    state,
1727                    exits
1728                ));
1729            }
1730
1731            // Set completed_at when entering completed status (even if it has reopen exits)
1732            let completed_at = if state == "completed" {
1733                Some(now)
1734            } else {
1735                None
1736            };
1737
1738            // Record state transition (accumulates time if coming from timed state)
1739            record_state_transition(conn, task_id, state, Some(agent_id), None, states_config)?;
1740
1741            conn.execute(
1742                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, completed_at = COALESCE(?2, completed_at), updated_at = ?3
1743                 WHERE id = ?4",
1744                params![state, completed_at, now, task_id],
1745            )?;
1746
1747            Ok(())
1748        })
1749    }
1750
1751    /// Force release stale claims.
1752    pub fn force_release_stale(
1753        &self,
1754        timeout_seconds: i64,
1755        states_config: &StatesConfig,
1756    ) -> Result<i32> {
1757        let now = now_ms();
1758        let cutoff = now - (timeout_seconds * 1000);
1759        let release_status = &states_config.initial;
1760
1761        self.with_conn(|conn| {
1762            let updated = conn.execute(
1763                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1764                 WHERE claimed_at < ?3 AND worker_id IS NOT NULL",
1765                params![release_status, now, cutoff],
1766            )?;
1767
1768            Ok(updated as i32)
1769        })
1770    }
1771
1772    /// Complete a task and release file locks held by the agent.
1773    /// Uses "completed" state by default, which should be a terminal state.
1774    /// Checks that all children (via 'contains' dependencies) are complete.
1775    pub fn complete_task(
1776        &self,
1777        task_id: &str,
1778        agent_id: &str,
1779        states_config: &StatesConfig,
1780    ) -> Result<Task> {
1781        let now = now_ms();
1782
1783        // Find a terminal state to use (prefer "completed" if it exists)
1784        let complete_status = if states_config.definitions.contains_key("completed") {
1785            "completed"
1786        } else {
1787            // Find any terminal state
1788            states_config
1789                .definitions
1790                .iter()
1791                .find(|(_, def)| def.exits.is_empty())
1792                .map(|(name, _)| name.as_str())
1793                .unwrap_or("completed")
1794        };
1795
1796        self.with_conn_mut(|conn| {
1797            let tx = conn.transaction()?;
1798
1799            // Get the task
1800            let mut stmt = tx.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1801            let task = stmt
1802                .query_row(params![task_id], parse_task_row)
1803                .map_err(|_| anyhow!("Task not found"))?;
1804            drop(stmt);
1805
1806            // Verify ownership
1807            if task.worker_id.as_deref() != Some(agent_id) {
1808                return Err(anyhow!("Task is not owned by this agent"));
1809            }
1810
1811            // Check for incomplete children (blocking completion)
1812            let incomplete_children: i32 = tx.query_row(
1813                "SELECT COUNT(*) FROM dependencies d
1814                 INNER JOIN tasks child ON d.to_task_id = child.id
1815                 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
1816                 AND child.status IN (SELECT value FROM json_each(?2))",
1817                params![
1818                    task_id,
1819                    serde_json::to_string(&states_config.blocking_states)?
1820                ],
1821                |row| row.get(0),
1822            )?;
1823
1824            if incomplete_children > 0 {
1825                return Err(anyhow!(
1826                    "Cannot complete task: {} child task(s) are not complete",
1827                    incomplete_children
1828                ));
1829            }
1830
1831            // Validate transition
1832            if !states_config.is_valid_transition(&task.status, complete_status) {
1833                let exits = states_config.get_exits(&task.status);
1834                return Err(anyhow!(
1835                    "Cannot complete task in state '{}'. Allowed transitions: {:?}. \
1836Tasks must transition through a timed state (e.g. working) for time tracking.",
1837                    task.status,
1838                    exits
1839                ));
1840            }
1841
1842            // Record state transition (accumulates time from timed state)
1843            record_state_transition(
1844                &tx,
1845                task_id,
1846                complete_status,
1847                Some(agent_id),
1848                None,
1849                states_config,
1850            )?;
1851
1852            // Update task to completed
1853            tx.execute(
1854                "UPDATE tasks SET status = ?1, completed_at = ?2, updated_at = ?3,
1855                 worker_id = NULL, claimed_at = NULL
1856                 WHERE id = ?4",
1857                params![complete_status, now, now, task_id],
1858            )?;
1859
1860            // Release file locks associated with this task (for auto-cleanup)
1861            tx.execute(
1862                "DELETE FROM file_locks WHERE task_id = ?1",
1863                params![task_id],
1864            )?;
1865
1866            // Refresh agent heartbeat
1867            tx.execute(
1868                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1869                params![now, agent_id],
1870            )?;
1871
1872            tx.commit()?;
1873
1874            Ok(Task {
1875                status: complete_status.to_string(),
1876                completed_at: Some(now),
1877                updated_at: now,
1878                worker_id: None,
1879                claimed_at: None,
1880                ..task
1881            })
1882        })
1883    }
1884
1885    /// Get all tasks. Excludes soft-deleted tasks.
1886    pub fn get_all_tasks(&self) -> Result<Vec<Task>> {
1887        self.with_conn(|conn| {
1888            let mut stmt =
1889                conn.prepare("SELECT * FROM tasks WHERE deleted_at IS NULL ORDER BY created_at")?;
1890            let tasks = stmt
1891                .query_map([], parse_task_row)?
1892                .filter_map(|r| r.ok())
1893                .collect();
1894            Ok(tasks)
1895        })
1896    }
1897
1898    /// Get tasks by status.
1899    #[allow(dead_code)]
1900    pub fn get_tasks_by_status(&self, status: &str) -> Result<Vec<Task>> {
1901        self.with_conn(|conn| {
1902            let mut stmt =
1903                conn.prepare("SELECT * FROM tasks WHERE status = ?1 ORDER BY created_at")?;
1904            let tasks = stmt
1905                .query_map(params![status], parse_task_row)?
1906                .filter_map(|r| r.ok())
1907                .collect();
1908            Ok(tasks)
1909        })
1910    }
1911
1912    /// Get claimed tasks. Excludes soft-deleted tasks.
1913    pub fn get_claimed_tasks(&self, agent_id: Option<&str>) -> Result<Vec<Task>> {
1914        self.with_conn(|conn| {
1915            let tasks = if let Some(aid) = agent_id {
1916                let mut stmt = conn
1917                    .prepare("SELECT * FROM tasks WHERE worker_id = ?1 AND deleted_at IS NULL ORDER BY claimed_at")?;
1918                stmt.query_map(params![aid], parse_task_row)?
1919                    .filter_map(|r| r.ok())
1920                    .collect()
1921            } else {
1922                let mut stmt = conn.prepare(
1923                    "SELECT * FROM tasks WHERE worker_id IS NOT NULL AND deleted_at IS NULL ORDER BY claimed_at",
1924                )?;
1925                stmt.query_map([], parse_task_row)?
1926                    .filter_map(|r| r.ok())
1927                    .collect()
1928            };
1929
1930            Ok(tasks)
1931        })
1932    }
1933}
1934
1935/// Check if a task's parent should auto-complete because all children are terminal.
1936/// Recursively checks grandparents too. Returns a list of (id, title) pairs for
1937/// all parents that were auto-completed.
1938fn propagate_auto_rollup(
1939    conn: &Connection,
1940    completed_task_id: &str,
1941    agent_id: &str,
1942    states_config: &StatesConfig,
1943) -> Result<Vec<(String, String)>> {
1944    let now = super::now_ms();
1945    let mut auto_completed = Vec::new();
1946
1947    // Find the parent of the completed task (via 'contains' dependency)
1948    let parent_id: Option<String> = conn
1949        .query_row(
1950            "SELECT from_task_id FROM dependencies WHERE to_task_id = ?1 AND dep_type = 'contains'",
1951            params![completed_task_id],
1952            |row| row.get(0),
1953        )
1954        .ok();
1955
1956    let parent_id = match parent_id {
1957        Some(id) => id,
1958        None => return Ok(auto_completed), // No parent, nothing to do
1959    };
1960
1961    // Get the parent task
1962    let parent = match get_task_internal(conn, &parent_id)? {
1963        Some(t) => t,
1964        None => return Ok(auto_completed),
1965    };
1966
1967    // Skip if parent is already in a non-blocking (done) state
1968    if !states_config.is_blocking_state(&parent.status) {
1969        return Ok(auto_completed);
1970    }
1971
1972    // Check if ALL children of the parent are in terminal states.
1973    // blocking_states contains the non-terminal states, so children in those states
1974    // are not yet done.
1975    let non_terminal_children: i32 = conn.query_row(
1976        "SELECT COUNT(*) FROM dependencies d
1977         INNER JOIN tasks child ON d.to_task_id = child.id
1978         WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
1979         AND child.status IN (SELECT value FROM json_each(?2))",
1980        params![
1981            parent_id,
1982            serde_json::to_string(&states_config.blocking_states)?
1983        ],
1984        |row| row.get(0),
1985    )?;
1986
1987    if non_terminal_children > 0 {
1988        return Ok(auto_completed); // Some children are still non-terminal
1989    }
1990
1991    // All children are terminal -- auto-complete the parent.
1992    // Find the first timed state (e.g., "working") for proper time tracking.
1993    let first_timed = states_config
1994        .definitions
1995        .iter()
1996        .find(|(_, def)| def.timed)
1997        .map(|(name, _)| name.clone())
1998        .unwrap_or_else(|| "working".to_string());
1999
2000    let parent_title = parent.title.clone();
2001
2002    // Determine the current effective state for transition validation
2003    let mut current_state = parent.status.clone();
2004
2005    // If parent is in an initial/untimed state, transition through the timed state first
2006    if !states_config.is_timed_state(&current_state) {
2007        // Check the transition is valid: current -> timed
2008        if states_config.is_valid_transition(&current_state, &first_timed) {
2009            conn.execute(
2010                "UPDATE tasks SET status = ?1, started_at = COALESCE(started_at, ?2), updated_at = ?2 WHERE id = ?3",
2011                params![&first_timed, now, &parent_id],
2012            )?;
2013
2014            record_state_transition(
2015                conn,
2016                &parent_id,
2017                &first_timed,
2018                Some(agent_id),
2019                Some("auto-rollup: transitioning to timed state before completion"),
2020                states_config,
2021            )?;
2022
2023            current_state = first_timed;
2024        } else {
2025            // Cannot transition to timed state -- skip auto-rollup for this parent
2026            return Ok(auto_completed);
2027        }
2028    }
2029
2030    // Now transition from the current (timed) state to completed
2031    let completed_state = "completed";
2032    if !states_config.is_valid_transition(&current_state, completed_state) {
2033        // Cannot transition to completed -- skip
2034        return Ok(auto_completed);
2035    }
2036
2037    conn.execute(
2038        "UPDATE tasks SET status = ?1, completed_at = ?2, updated_at = ?2, worker_id = NULL, claimed_at = NULL WHERE id = ?3",
2039        params![completed_state, now, &parent_id],
2040    )?;
2041
2042    // Release file locks for the parent
2043    conn.execute(
2044        "DELETE FROM file_locks WHERE task_id = ?1",
2045        params![&parent_id],
2046    )?;
2047
2048    record_state_transition(
2049        conn,
2050        &parent_id,
2051        completed_state,
2052        Some(agent_id),
2053        Some("auto-rollup: all children reached terminal state"),
2054        states_config,
2055    )?;
2056
2057    auto_completed.push((parent_id.clone(), parent_title));
2058
2059    // Recurse: check if completing this parent triggers its own parent to auto-complete
2060    let mut grandparent_completed =
2061        propagate_auto_rollup(conn, &parent_id, agent_id, states_config)?;
2062    auto_completed.append(&mut grandparent_completed);
2063
2064    Ok(auto_completed)
2065}
2066
2067/// Helper function to create task tree recursively within a transaction.
2068/// Creates dependencies from parent to children using child_type.
2069/// Creates dependencies between siblings using sibling_type.
2070/// Supports referencing existing tasks via ref_id.
2071#[allow(clippy::too_many_arguments)]
2072fn create_tree_recursive(
2073    conn: &Connection,
2074    input: &TaskTreeInput,
2075    parent_id: Option<&str>,
2076    prev_sibling_id: Option<&str>,
2077    child_type: Option<&str>,
2078    sibling_type: Option<&str>,
2079    all_ids: &mut Vec<String>,
2080    phase_warnings: &mut Vec<String>,
2081    tag_warnings: &mut Vec<String>,
2082    states_config: &StatesConfig,
2083    phases_config: &PhasesConfig,
2084    tags_config: &TagsConfig,
2085    ids_config: &IdsConfig,
2086) -> Result<String> {
2087    // Check if this node references an existing task
2088    let task_id = if let Some(ref ref_id) = input.ref_id {
2089        // Verify the referenced task exists
2090        let exists: bool = conn.query_row(
2091            "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
2092            params![ref_id],
2093            |row| row.get(0),
2094        )?;
2095        if !exists {
2096            return Err(anyhow::anyhow!("Referenced task '{}' not found", ref_id));
2097        }
2098        ref_id.clone()
2099    } else {
2100        // Create a new task
2101        let task_id = input
2102            .id
2103            .clone()
2104            .unwrap_or_else(|| generate_task_id(ids_config));
2105        let now = now_ms();
2106        let priority = clamp_priority(input.priority.unwrap_or(PRIORITY_DEFAULT));
2107        let initial_status = &states_config.initial;
2108
2109        // Derive title: use explicit title, or derive from description, or empty
2110        let title = input.title.clone().unwrap_or_else(|| {
2111            input
2112                .description
2113                .as_deref()
2114                .map(|d| crate::format::truncate_title(d).into_owned())
2115                .unwrap_or_default()
2116        });
2117
2118        // Check phase validity
2119        if let Some(ref phase) = input.phase
2120            && let Some(warning) = phases_config.check_phase(phase)?
2121        {
2122            phase_warnings.push(format!("Task '{}': {}", task_id, warning));
2123        }
2124
2125        let needed_tags = input.needed_tags.clone().unwrap_or_default();
2126        let wanted_tags = input.wanted_tags.clone().unwrap_or_default();
2127        let tags = input.tags.clone().unwrap_or_default();
2128
2129        // Check tag validity for all tag types
2130        for warning in tags_config.validate_tags(&tags)? {
2131            tag_warnings.push(format!("Task '{}': {}", task_id, warning));
2132        }
2133        for warning in tags_config.validate_tags(&needed_tags)? {
2134            tag_warnings.push(format!("Task '{}' needed_tags: {}", task_id, warning));
2135        }
2136        for warning in tags_config.validate_tags(&wanted_tags)? {
2137            tag_warnings.push(format!("Task '{}' wanted_tags: {}", task_id, warning));
2138        }
2139
2140        let needed_tags_json = serde_json::to_string(&needed_tags)?;
2141        let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
2142        let tags_json = serde_json::to_string(&tags)?;
2143
2144        conn.execute(
2145            "INSERT INTO tasks (
2146                id, title, description, status, phase, priority,
2147                needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
2148            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
2149            params![
2150                &task_id,
2151                &title,
2152                &input.description,
2153                initial_status,
2154                &input.phase,
2155                priority.to_string(),
2156                needed_tags_json,
2157                wanted_tags_json,
2158                tags_json,
2159                input.points,
2160                input.time_estimate_ms,
2161                now,
2162                now,
2163            ],
2164        )?;
2165
2166        // Record initial state transition
2167        record_state_transition(conn, &task_id, initial_status, None, None, states_config)?;
2168
2169        // Sync tags to junction tables for indexed lookups
2170        sync_task_tags(conn, &task_id, &tags)?;
2171        sync_needed_tags(conn, &task_id, &needed_tags)?;
2172        sync_wanted_tags(conn, &task_id, &wanted_tags)?;
2173
2174        task_id
2175    };
2176
2177    // Create dependency from parent if child_type is specified
2178    if let (Some(pid), Some(ct)) = (parent_id, child_type) {
2179        Database::add_dependency_internal(conn, pid, &task_id, ct)?;
2180    }
2181
2182    // Create dependency from previous sibling if sibling_type is specified
2183    if let (Some(prev_id), Some(st)) = (prev_sibling_id, sibling_type) {
2184        Database::add_dependency_internal(conn, prev_id, &task_id, st)?;
2185    }
2186
2187    all_ids.push(task_id.clone());
2188
2189    // Create blocked_by dependencies: each referenced task "blocks" this task
2190    for blocker_id in &input.blocked_by {
2191        // Check if blocker exists in previously created tree nodes or in the database
2192        let blocker_exists = all_ids.iter().any(|id| id == blocker_id) || {
2193            let exists: bool = conn.query_row(
2194                "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
2195                params![blocker_id],
2196                |row| row.get(0),
2197            )?;
2198            exists
2199        };
2200        if !blocker_exists {
2201            return Err(anyhow::anyhow!(
2202                "blocked_by task '{}' not found (not created earlier in tree and not in database)",
2203                blocker_id
2204            ));
2205        }
2206        Database::add_dependency_internal(conn, blocker_id, &task_id, "blocks")?;
2207    }
2208
2209    // Create children with dependencies based on child_type and sibling_type
2210    let mut prev_child_id: Option<String> = None;
2211    for child in input.children.iter() {
2212        let child_id = create_tree_recursive(
2213            conn,
2214            child,
2215            Some(&task_id),
2216            prev_child_id.as_deref(),
2217            child_type,
2218            sibling_type,
2219            all_ids,
2220            phase_warnings,
2221            tag_warnings,
2222            states_config,
2223            phases_config,
2224            tags_config,
2225            ids_config,
2226        )?;
2227        prev_child_id = Some(child_id);
2228    }
2229
2230    Ok(task_id)
2231}