Skip to main content

task_graph_mcp/db/
tasks.rs

1//! Task CRUD and tree operations.
2
3use super::state_transitions::record_state_transition;
4use super::{Database, now_ms};
5use crate::config::{
6    AutoAdvanceConfig, DependenciesConfig, IdsConfig, PhasesConfig, StatesConfig, TagsConfig,
7};
8use crate::error::ToolError;
9use crate::types::{
10    PRIORITY_DEFAULT, Priority, Task, TaskTree, TaskTreeInput, Worker, clamp_priority,
11    parse_priority,
12};
13use anyhow::{Result, anyhow};
14use petname::{Generator, Petnames};
15use rusqlite::{Connection, Row, params};
16
/// Options for creating a task tree from nested input.
///
/// Consumed by `Database::create_task_tree`, which forwards every field to the
/// recursive tree builder inside a single transaction.
#[derive(Debug)]
pub struct CreateTreeOptions<'a> {
    /// Nested description of the root task and its descendants.
    pub input: TaskTreeInput,
    /// Existing task the new root is created under, if any.
    pub parent_id: Option<String>,
    /// Dependency type used for parent-child edges; `create_task_tree`
    /// substitutes `"contains"` when this is `None`.
    pub child_type: Option<String>,
    /// Dependency type used for sibling edges; `None` is documented by
    /// `create_task_tree` as "none/parallel".
    pub sibling_type: Option<String>,
    /// State machine configuration, forwarded to the tree builder.
    pub states_config: &'a StatesConfig,
    /// Phase configuration, forwarded to the tree builder
    /// (presumably the source of the returned phase warnings — TODO confirm).
    pub phases_config: &'a PhasesConfig,
    /// Tag configuration, forwarded to the tree builder
    /// (presumably the source of the returned tag warnings — TODO confirm).
    pub tags_config: &'a TagsConfig,
    /// ID generation settings, forwarded to the tree builder.
    pub ids_config: &'a IdsConfig,
}
29
/// Query parameters for listing tasks with optional filters.
///
/// `Default::default()` yields every `Option` field as `None` and `offset` 0.
/// The struct only holds borrowed strings and integers, so it is cheap to copy
/// and can be compared directly (useful in tests and when paging).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ListTasksQuery<'a> {
    /// Status filter; `None` presumably means "any status" — verify against the
    /// query builder that consumes this struct.
    pub status: Option<&'a str>,
    /// Phase filter; `None` presumably means "any phase".
    pub phase: Option<&'a str>,
    /// Owner (worker) filter; `None` presumably means "any owner".
    pub owner: Option<&'a str>,
    /// Parent filter with two levels of optionality.
    /// NOTE(review): looks like `None` = no parent filter, `Some(None)` = only
    /// tasks without a parent, `Some(Some(id))` = children of `id` — confirm.
    pub parent_id: Option<Option<&'a str>>,
    /// Maximum number of rows to return; `None` presumably means unlimited.
    pub limit: Option<i32>,
    /// Number of rows to skip before the first returned row.
    pub offset: i32,
    /// Name of the field to sort by.
    pub sort_by: Option<&'a str>,
    /// Sort direction.
    pub sort_order: Option<&'a str>,
}
42
43/// Generate a petname-based task ID using the large wordlist.
44/// Uses the configured number of words and case style.
45fn generate_task_id(ids_config: &IdsConfig) -> String {
46    let words = ids_config.task_id_words;
47    let case = ids_config.id_case;
48
49    // Generate with hyphen separator first (petname's default format)
50    let base = Petnames::medium()
51        .generate_one(words, "-")
52        .unwrap_or_else(|| format!("task-{}", super::now_ms()));
53
54    // Convert to desired case
55    case.convert(&base)
56}
57
/// Build an ORDER BY expression from the `sort_by` and `sort_order` parameters.
///
/// Only fixed, whitelisted fragments are ever returned, so the result is safe
/// to splice into SQL regardless of what the caller passes in.
///
/// * `sort_by` — `"priority"`, `"created_at"` or `"updated_at"`; anything else
///   (including `None`) sorts by `t.created_at`.
/// * `sort_order` — `"asc"` or `"desc"`; anything else (including `None`)
///   sorts descending.
///
/// Both parameters are matched after trimming surrounding whitespace and
/// ignoring ASCII case, so `"ASC"` or `" Priority "` are honoured rather than
/// silently falling back to the defaults.
fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
    /// True when `value` is present and equals `keyword`, ignoring ASCII case
    /// and surrounding whitespace.
    fn is_keyword(value: Option<&str>, keyword: &str) -> bool {
        value.is_some_and(|v| v.trim().eq_ignore_ascii_case(keyword))
    }

    let field = if is_keyword(sort_by, "priority") {
        // Priority is stored as text, so cast for numeric ordering.
        "CAST(t.priority AS INTEGER)"
    } else if is_keyword(sort_by, "updated_at") {
        "t.updated_at"
    } else {
        // Covers "created_at" as well as missing/unknown values.
        "t.created_at"
    };

    // Descending is the default for every field: higher priority numbers and
    // newer timestamps come first.
    let order = if is_keyword(sort_order, "asc") {
        "ASC"
    } else {
        "DESC"
    };

    format!("{field} {order}")
}
79
80// =============================================================================
81// Junction table helpers for tag management
82// =============================================================================
83
84/// Sync task tags to the task_tags junction table.
85/// Replaces all existing tags for the task.
86fn sync_task_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
87    conn.execute("DELETE FROM task_tags WHERE task_id = ?1", params![task_id])?;
88    for tag in tags {
89        conn.execute(
90            "INSERT INTO task_tags (task_id, tag) VALUES (?1, ?2)",
91            params![task_id, tag],
92        )?;
93    }
94    Ok(())
95}
96
97/// Sync needed tags (agent must have ALL) to the task_needed_tags junction table.
98fn sync_needed_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
99    conn.execute(
100        "DELETE FROM task_needed_tags WHERE task_id = ?1",
101        params![task_id],
102    )?;
103    for tag in tags {
104        conn.execute(
105            "INSERT INTO task_needed_tags (task_id, tag) VALUES (?1, ?2)",
106            params![task_id, tag],
107        )?;
108    }
109    Ok(())
110}
111
112/// Sync wanted tags (agent must have ANY) to the task_wanted_tags junction table.
113fn sync_wanted_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
114    conn.execute(
115        "DELETE FROM task_wanted_tags WHERE task_id = ?1",
116        params![task_id],
117    )?;
118    for tag in tags {
119        conn.execute(
120            "INSERT INTO task_wanted_tags (task_id, tag) VALUES (?1, ?2)",
121            params![task_id, tag],
122        )?;
123    }
124    Ok(())
125}
126
127pub fn parse_task_row(row: &Row) -> rusqlite::Result<Task> {
128    let id: String = row.get("id")?;
129    let title: String = row.get("title")?;
130    let description: Option<String> = row.get("description")?;
131    let status: String = row.get("status")?;
132    let phase: Option<String> = row.get("phase")?;
133    let priority: String = row.get("priority")?;
134    let worker_id: Option<String> = row.get("worker_id")?;
135    let claimed_at: Option<i64> = row.get("claimed_at")?;
136
137    let needed_tags_json: Option<String> = row.get("needed_tags")?;
138    let wanted_tags_json: Option<String> = row.get("wanted_tags")?;
139    let tags_json: Option<String> = row.get("tags")?;
140
141    let points: Option<i32> = row.get("points")?;
142    let time_estimate_ms: Option<i64> = row.get("time_estimate_ms")?;
143    let time_actual_ms: Option<i64> = row.get("time_actual_ms")?;
144    let started_at: Option<i64> = row.get("started_at")?;
145    let completed_at: Option<i64> = row.get("completed_at")?;
146
147    let current_thought: Option<String> = row.get("current_thought")?;
148
149    let cost_usd: f64 = row.get("cost_usd")?;
150    let metric_0: i64 = row.get("metric_0")?;
151    let metric_1: i64 = row.get("metric_1")?;
152    let metric_2: i64 = row.get("metric_2")?;
153    let metric_3: i64 = row.get("metric_3")?;
154    let metric_4: i64 = row.get("metric_4")?;
155    let metric_5: i64 = row.get("metric_5")?;
156    let metric_6: i64 = row.get("metric_6")?;
157    let metric_7: i64 = row.get("metric_7")?;
158
159    let created_at: i64 = row.get("created_at")?;
160    let updated_at: i64 = row.get("updated_at")?;
161
162    Ok(Task {
163        id,
164        title,
165        description,
166        status,
167        phase,
168        priority: parse_priority(&priority),
169        worker_id,
170        claimed_at,
171        needed_tags: needed_tags_json
172            .map(|s| serde_json::from_str(&s).unwrap_or_default())
173            .unwrap_or_default(),
174        wanted_tags: wanted_tags_json
175            .map(|s| serde_json::from_str(&s).unwrap_or_default())
176            .unwrap_or_default(),
177        tags: tags_json
178            .map(|s| serde_json::from_str(&s).unwrap_or_default())
179            .unwrap_or_default(),
180        points,
181        time_estimate_ms,
182        time_actual_ms,
183        started_at,
184        completed_at,
185        current_thought,
186        cost_usd,
187        metrics: [
188            metric_0, metric_1, metric_2, metric_3, metric_4, metric_5, metric_6, metric_7,
189        ],
190        created_at,
191        updated_at,
192    })
193}
194
195/// Internal helper to get a task using an existing connection (avoids deadlock).
196fn get_task_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
197    let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
198
199    let result = stmt.query_row(params![task_id], parse_task_row);
200
201    match result {
202        Ok(task) => Ok(Some(task)),
203        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
204        Err(e) => Err(e.into()),
205    }
206}
207
208/// Internal helper to get a worker using an existing connection (avoids deadlock).
209fn get_worker_internal(conn: &Connection, worker_id: &str) -> Result<Option<Worker>> {
210    let mut stmt = conn.prepare(
211        "SELECT id, tags, max_claims, registered_at, last_heartbeat, last_status, last_phase, workflow, overlays
212         FROM workers WHERE id = ?1",
213    )?;
214
215    let result = stmt.query_row(params![worker_id], |row| {
216        let id: String = row.get(0)?;
217        let tags_json: String = row.get(1)?;
218        let max_claims: i32 = row.get(2)?;
219        let registered_at: i64 = row.get(3)?;
220        let last_heartbeat: i64 = row.get(4)?;
221        let last_status: Option<String> = row.get(5)?;
222        let last_phase: Option<String> = row.get(6)?;
223        let workflow: Option<String> = row.get(7)?;
224        let overlays_json: Option<String> = row.get(8)?;
225
226        Ok((
227            id,
228            tags_json,
229            max_claims,
230            registered_at,
231            last_heartbeat,
232            last_status,
233            last_phase,
234            workflow,
235            overlays_json,
236        ))
237    });
238
239    match result {
240        Ok((
241            id,
242            tags_json,
243            max_claims,
244            registered_at,
245            last_heartbeat,
246            last_status,
247            last_phase,
248            workflow,
249            overlays_json,
250        )) => {
251            let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
252            let overlays: Vec<String> = overlays_json
253                .as_deref()
254                .and_then(|s| serde_json::from_str(s).ok())
255                .unwrap_or_default();
256            Ok(Some(Worker {
257                id,
258                tags,
259                max_claims,
260                registered_at,
261                last_heartbeat,
262                last_status,
263                last_phase,
264                workflow,
265                overlays,
266            }))
267        }
268        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
269        Err(e) => Err(e.into()),
270    }
271}
272
273impl Database {
274    /// Create a new task.
275    /// If id is provided, uses it as the task ID; otherwise generates a petname ID.
276    /// If parent_id is provided, creates a 'contains' dependency from parent to this task.
277    /// `title` is the short task title; `description` is optional longer detail.
278    #[allow(clippy::too_many_arguments)]
279    pub fn create_task(
280        &self,
281        id: Option<String>,
282        title: String,
283        description: Option<String>,
284        parent_id: Option<String>,
285        phase: Option<String>,
286        priority: Option<Priority>,
287        points: Option<i32>,
288        time_estimate_ms: Option<i64>,
289        agent_tags_all: Option<Vec<String>>,
290        agent_tags_any: Option<Vec<String>>,
291        tags: Option<Vec<String>>,
292        states_config: &StatesConfig,
293        ids_config: &IdsConfig,
294    ) -> Result<Task> {
295        let task_id = id.unwrap_or_else(|| generate_task_id(ids_config));
296        let now = now_ms();
297        let priority = clamp_priority(priority.unwrap_or(PRIORITY_DEFAULT));
298        let initial_status = &states_config.initial;
299
300        let needed_tags = agent_tags_all.unwrap_or_default();
301        let wanted_tags = agent_tags_any.unwrap_or_default();
302        let tags = tags.unwrap_or_default();
303        let needed_tags_json = serde_json::to_string(&needed_tags)?;
304        let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
305        let tags_json = serde_json::to_string(&tags)?;
306
307        self.with_conn_mut(|conn| {
308            let tx = conn.transaction()?;
309
310            tx.execute(
311                "INSERT INTO tasks (
312                    id, title, description, status, phase, priority,
313                    needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
314                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
315                params![
316                    &task_id,
317                    &title,
318                    &description,
319                    initial_status,
320                    &phase,
321                    priority.to_string(),
322                    needed_tags_json,
323                    wanted_tags_json,
324                    tags_json,
325                    points,
326                    time_estimate_ms,
327                    now,
328                    now,
329                ],
330            )?;
331
332            // Sync tags to junction tables
333            sync_task_tags(&tx, &task_id, &tags)?;
334            sync_needed_tags(&tx, &task_id, &needed_tags)?;
335            sync_wanted_tags(&tx, &task_id, &wanted_tags)?;
336
337            // Create 'contains' dependency if parent_id is provided
338            if let Some(ref pid) = parent_id {
339                Database::add_dependency_internal(&tx, pid, &task_id, "contains")?;
340            }
341
342            // Record initial state
343            record_state_transition(&tx, &task_id, initial_status, None, None, states_config)?;
344
345            tx.commit()?;
346
347            Ok(Task {
348                id: task_id,
349                title,
350                description,
351                status: initial_status.clone(),
352                phase,
353                priority,
354                worker_id: None,
355                claimed_at: None,
356                needed_tags,
357                wanted_tags,
358                tags,
359                points,
360                time_estimate_ms,
361                time_actual_ms: None,
362                started_at: None,
363                completed_at: None,
364                current_thought: None,
365                cost_usd: 0.0,
366                metrics: [0; 8],
367                created_at: now,
368                updated_at: now,
369            })
370        })
371    }
372
373    /// Convenience method to create a task with just a description string.
374    /// Uses the description as both title and description. Useful for tests.
375    pub fn create_task_simple(
376        &self,
377        description: impl Into<String>,
378        states_config: &StatesConfig,
379        ids_config: &IdsConfig,
380    ) -> Result<Task> {
381        let desc = description.into();
382        self.create_task(
383            None,
384            desc.clone(),
385            Some(desc),
386            None,
387            None,
388            None,
389            None,
390            None,
391            None,
392            None,
393            None,
394            states_config,
395            ids_config,
396        )
397    }
398
399    /// Create a task tree from nested input.
400    /// Uses child_type for parent-child dependencies (default: "contains").
401    /// Uses sibling_type for sibling dependencies (default: none/parallel).
402    #[allow(clippy::type_complexity)]
403    pub fn create_task_tree(
404        &self,
405        opts: CreateTreeOptions<'_>,
406    ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>)> {
407        let mut all_ids = Vec::new();
408        let mut phase_warnings = Vec::new();
409        let mut tag_warnings = Vec::new();
410        // Default child_type to "contains" if not specified
411        let child_type = opts.child_type.or_else(|| Some("contains".to_string()));
412
413        self.with_conn_mut(|conn| {
414            let tx = conn.transaction()?;
415            let root_id = create_tree_recursive(
416                &tx,
417                &opts.input,
418                opts.parent_id.as_deref(),
419                None, // no previous sibling for root
420                child_type.as_deref(),
421                opts.sibling_type.as_deref(),
422                &mut all_ids,
423                &mut phase_warnings,
424                &mut tag_warnings,
425                opts.states_config,
426                opts.phases_config,
427                opts.tags_config,
428                opts.ids_config,
429            )?;
430            tx.commit()?;
431            Ok((root_id, all_ids, phase_warnings, tag_warnings))
432        })
433    }
434
435    /// Get a task by ID.
436    pub fn get_task(&self, task_id: &str) -> Result<Option<Task>> {
437        self.with_conn(|conn| {
438            let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
439
440            let result = stmt.query_row(params![task_id], parse_task_row);
441
442            match result {
443                Ok(task) => Ok(Some(task)),
444                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
445                Err(e) => Err(e.into()),
446            }
447        })
448    }
449
450    /// Rename a task's ID, updating all references atomically.
451    ///
452    /// Disables foreign key enforcement, updates every table that references
453    /// `tasks.id` inside a transaction, then re-enables and verifies FK
454    /// integrity.
455    pub fn rename_task(&self, old_id: &str, new_id: &str) -> Result<()> {
456        // Validate new_id
457        if new_id.is_empty() {
458            return Err(anyhow!("new_id must not be empty"));
459        }
460        if new_id.len() > 64 {
461            return Err(anyhow!("new_id must not exceed 64 characters"));
462        }
463
464        self.with_conn_mut(|conn| {
465            // Pre-check: old_id must exist
466            let exists: bool = conn.query_row(
467                "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
468                params![old_id],
469                |row| row.get(0),
470            )?;
471            if !exists {
472                return Err(anyhow!("Task '{}' not found", old_id));
473            }
474
475            // Pre-check: new_id must not already exist
476            let conflict: bool = conn.query_row(
477                "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
478                params![new_id],
479                |row| row.get(0),
480            )?;
481            if conflict {
482                return Err(anyhow!("Task '{}' already exists", new_id));
483            }
484
485            // Disable FK enforcement for the duration of the update
486            conn.execute_batch("PRAGMA foreign_keys = OFF")?;
487
488            let result = (|| -> Result<()> {
489                let tx = conn.transaction()?;
490
491                // Primary table — triggers tasks_fts_update
492                tx.execute(
493                    "UPDATE tasks SET id = ?1 WHERE id = ?2",
494                    params![new_id, old_id],
495                )?;
496
497                // Attachments — triggers attachments_fts_update per row
498                tx.execute(
499                    "UPDATE attachments SET task_id = ?1 WHERE task_id = ?2",
500                    params![new_id, old_id],
501                )?;
502
503                // Dependencies (both columns)
504                tx.execute(
505                    "UPDATE dependencies SET from_task_id = ?1 WHERE from_task_id = ?2",
506                    params![new_id, old_id],
507                )?;
508                tx.execute(
509                    "UPDATE dependencies SET to_task_id = ?1 WHERE to_task_id = ?2",
510                    params![new_id, old_id],
511                )?;
512
513                // File locks
514                tx.execute(
515                    "UPDATE file_locks SET task_id = ?1 WHERE task_id = ?2",
516                    params![new_id, old_id],
517                )?;
518
519                // Tag junction tables
520                tx.execute(
521                    "UPDATE task_tags SET task_id = ?1 WHERE task_id = ?2",
522                    params![new_id, old_id],
523                )?;
524                tx.execute(
525                    "UPDATE task_needed_tags SET task_id = ?1 WHERE task_id = ?2",
526                    params![new_id, old_id],
527                )?;
528                tx.execute(
529                    "UPDATE task_wanted_tags SET task_id = ?1 WHERE task_id = ?2",
530                    params![new_id, old_id],
531                )?;
532
533                // Sequence table
534                tx.execute(
535                    "UPDATE task_sequence SET task_id = ?1 WHERE task_id = ?2",
536                    params![new_id, old_id],
537                )?;
538
539                tx.commit()?;
540                Ok(())
541            })();
542
543            // Re-enable FK enforcement regardless of success
544            conn.execute_batch("PRAGMA foreign_keys = ON")?;
545
546            // Propagate any error from the transaction
547            result?;
548
549            // Verify FK integrity
550            let mut stmt = conn.prepare("PRAGMA foreign_key_check")?;
551            let violations: Vec<String> = stmt
552                .query_map([], |row| {
553                    let table: String = row.get(0)?;
554                    Ok(table)
555                })?
556                .filter_map(|r| r.ok())
557                .collect();
558
559            if !violations.is_empty() {
560                return Err(anyhow!(
561                    "Foreign key violations after rename in tables: {:?}",
562                    violations
563                ));
564            }
565
566            Ok(())
567        })
568    }
569
570    /// Get a task with all its children (tree).
571    pub fn get_task_tree(&self, task_id: &str) -> Result<Option<TaskTree>> {
572        let task = self.get_task(task_id)?;
573        match task {
574            None => Ok(None),
575            Some(task) => {
576                let children = self.get_children_recursive(&task.id)?;
577                Ok(Some(TaskTree { task, children }))
578            }
579        }
580    }
581
582    /// Get children recursively.
583    fn get_children_recursive(&self, parent_id: &str) -> Result<Vec<TaskTree>> {
584        let children = self.get_children(parent_id)?;
585        let mut result = Vec::new();
586
587        for child in children {
588            let child_children = self.get_children_recursive(&child.id)?;
589            result.push(TaskTree {
590                task: child,
591                children: child_children,
592            });
593        }
594
595        Ok(result)
596    }
597
598    /// Get direct children of a task (via 'contains' dependency).
599    pub fn get_children(&self, parent_id: &str) -> Result<Vec<Task>> {
600        self.with_conn(|conn| {
601            let mut stmt = conn.prepare(
602                "SELECT t.* FROM tasks t
603                 INNER JOIN dependencies d ON t.id = d.to_task_id
604                 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
605                 ORDER BY t.created_at",
606            )?;
607
608            let tasks = stmt
609                .query_map(params![parent_id], parse_task_row)?
610                .filter_map(|r| r.ok())
611                .collect();
612
613            Ok(tasks)
614        })
615    }
616
617    /// Update a task.
618    #[allow(clippy::too_many_arguments)]
619    pub fn update_task(
620        &self,
621        task_id: &str,
622        title: Option<String>,
623        description: Option<Option<String>>,
624        status: Option<String>,
625        priority: Option<Priority>,
626        points: Option<Option<i32>>,
627        tags: Option<Vec<String>>,
628        states_config: &StatesConfig,
629    ) -> Result<Task> {
630        let now = now_ms();
631
632        self.with_conn(|conn| {
633            let task =
634                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
635
636            let new_title = title.unwrap_or(task.title.clone());
637            let new_description = description.unwrap_or(task.description.clone());
638            let new_status = status.unwrap_or(task.status.clone());
639            let new_priority = priority.unwrap_or(task.priority);
640            let new_points = points.unwrap_or(task.points);
641            let new_tags = tags.unwrap_or(task.tags.clone());
642
643            // Validate the new status exists
644            if !states_config.is_valid_state(&new_status) {
645                return Err(anyhow!(
646                    "Invalid state '{}'. Valid states: {:?}",
647                    new_status,
648                    states_config.state_names()
649                ));
650            }
651
652            // Validate state transition if status changed
653            if task.status != new_status
654                && !states_config.is_valid_transition(&task.status, &new_status)
655            {
656                let exits = states_config.get_exits(&task.status);
657                return Err(anyhow!(
658                    "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
659                    task.status,
660                    new_status,
661                    exits
662                ));
663            }
664
665            // Handle status transitions for timestamps
666            // Set started_at when first entering a timed state
667            let started_at =
668                if task.started_at.is_none() && states_config.is_timed_state(&new_status) {
669                    Some(now)
670                } else {
671                    task.started_at
672                };
673
674            // Set completed_at when entering a completed state (terminal or with reopen capability)
675            let completed_at = if new_status == "completed" {
676                Some(now)
677            } else {
678                task.completed_at
679            };
680
681            // Record state transition if status changed (handles time accumulation)
682            if task.status != new_status {
683                record_state_transition(
684                    conn,
685                    task_id,
686                    &new_status,
687                    task.worker_id.as_deref(),
688                    None,
689                    states_config,
690                )?;
691            }
692
693            conn.execute(
694                "UPDATE tasks SET
695                    title = ?1, description = ?2, status = ?3, priority = ?4,
696                    points = ?5, started_at = ?6, completed_at = ?7, updated_at = ?8,
697                    tags = ?9
698                WHERE id = ?10",
699                params![
700                    new_title,
701                    new_description,
702                    new_status,
703                    new_priority.to_string(),
704                    new_points,
705                    started_at,
706                    completed_at,
707                    now,
708                    serde_json::to_string(&new_tags)?,
709                    task_id,
710                ],
711            )?;
712
713            Ok(Task {
714                id: task_id.to_string(),
715                title: new_title,
716                description: new_description,
717                status: new_status,
718                priority: new_priority,
719                points: new_points,
720                tags: new_tags,
721                started_at,
722                completed_at,
723                updated_at: now,
724                ..task
725            })
726        })
727    }
728
729    /// Update a task with unified claim/release logic.
730    /// - Transition to timed state = CLAIM (set owner, validate tags, check limit)
731    /// - Transition from timed to non-timed = RELEASE (clear owner)
732    /// - Transition to terminal = COMPLETE (check children, release file locks)
733    /// - With assignee = ASSIGN (set owner to assignee, transition to 'assigned' state)
734    /// - Only the owner can update a claimed task (unless force=true)
735    ///
736    /// Returns (task, unblocked, auto_advanced):
737    /// - task: The updated task
738    /// - unblocked: Task IDs that are now ready (all dependencies satisfied)
739    /// - auto_advanced: Subset of unblocked that were actually transitioned
740    #[allow(clippy::too_many_arguments)]
741    pub fn update_task_unified(
742        &self,
743        task_id: &str,
744        agent_id: &str,
745        assignee: Option<&str>,
746        title: Option<String>,
747        description: Option<Option<String>>,
748        status: Option<String>,
749        phase: Option<String>,
750        priority: Option<Priority>,
751        points: Option<Option<i32>>,
752        tags: Option<Vec<String>>,
753        needed_tags: Option<Vec<String>>,
754        wanted_tags: Option<Vec<String>>,
755        time_estimate_ms: Option<i64>,
756        reason: Option<String>,
757        force: bool,
758        states_config: &StatesConfig,
759        deps_config: &DependenciesConfig,
760        auto_advance: &AutoAdvanceConfig,
761    ) -> Result<(Task, Vec<String>, Vec<String>)> {
762        let now = now_ms();
763
764        self.with_conn_mut(|conn| {
765            let tx = conn.transaction()?;
766
767            let task =
768                get_task_internal(&tx, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
769
770            // Owner-only validation: if task is claimed, only owner can update (unless force)
771            if let Some(ref current_owner) = task.worker_id
772                && current_owner != agent_id && !force {
773                    return Err(anyhow!(
774                        "Task is claimed by agent '{}'. Only the owner can update claimed tasks (use force=true to override)",
775                        current_owner
776                    ));
777                }
778
779            let new_title = title.unwrap_or(task.title.clone());
780            let new_description = description.unwrap_or(task.description.clone());
781            // If assignee is set but no explicit status, default to 'assigned' state
782            let new_status = if assignee.is_some() && status.is_none() {
783                "assigned".to_string()
784            } else {
785                status.unwrap_or(task.status.clone())
786            };
787            let new_priority = priority.unwrap_or(task.priority);
788            let new_points = points.unwrap_or(task.points);
789            let new_tags = tags.unwrap_or(task.tags.clone());
790            let new_needed_tags = needed_tags.unwrap_or(task.needed_tags.clone());
791            let new_wanted_tags = wanted_tags.unwrap_or(task.wanted_tags.clone());
792            let new_time_estimate_ms = time_estimate_ms.or(task.time_estimate_ms);
793            let new_phase = phase.or(task.phase.clone());
794
795            // Validate the new status exists
796            if !states_config.is_valid_state(&new_status) {
797                return Err(anyhow!(
798                    "Invalid state '{}'. Valid states: {:?}",
799                    new_status,
800                    states_config.state_names()
801                ));
802            }
803
804            // Validate state transition if status changed
805            if task.status != new_status
806                && !states_config.is_valid_transition(&task.status, &new_status) {
807                    let exits = states_config.get_exits(&task.status);
808                    return Err(anyhow!(
809                        "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
810                        task.status,
811                        new_status,
812                        exits
813                    ));
814                }
815
816            // Determine ownership changes based on state transition
817            let new_is_timed = states_config.is_timed_state(&new_status);
818            let new_is_terminal = states_config.is_terminal_state(&new_status);
819            let current_owner = task.worker_id.as_deref();
820            let is_owned_by_agent = current_owner == Some(agent_id);
821            let is_owned_by_other = current_owner.is_some() && !is_owned_by_agent;
822
823            let mut new_owner: Option<String> = task.worker_id.clone();
824            let mut new_claimed_at: Option<i64> = task.claimed_at;
825
826            // ASSIGN: Push coordination - coordinator assigns task to another agent
827            // Sets owner without starting the timer (assigned state is untimed)
828            if let Some(target_agent) = assignee {
829                // Verify task is not already claimed (unless force)
830                if is_owned_by_other && !force {
831                    return Err(anyhow!(
832                        "Task is already claimed by agent '{}'. Use force=true to reassign.",
833                        current_owner.unwrap()
834                    ));
835                }
836
837                // Verify the assignee exists
838                let target = get_worker_internal(&tx, target_agent)?
839                    .ok_or_else(|| anyhow!("Assignee agent '{}' not found", target_agent))?;
840
841                // Check tag affinity for the assignee
842                if !task.needed_tags.is_empty() {
843                    for needed in &task.needed_tags {
844                        if !target.tags.contains(needed) {
845                            return Err(anyhow!(
846                                "Assignee '{}' missing required tag: {}",
847                                target_agent,
848                                needed
849                            ));
850                        }
851                    }
852                }
853
854                if !task.wanted_tags.is_empty() {
855                    let has_any = task
856                        .wanted_tags
857                        .iter()
858                        .any(|wanted| target.tags.contains(wanted));
859                    if !has_any {
860                        return Err(anyhow!(
861                            "Assignee '{}' has none of the wanted tags: {:?}",
862                            target_agent,
863                            task.wanted_tags
864                        ));
865                    }
866                }
867
868                // Set ownership to the assignee
869                new_owner = Some(target_agent.to_string());
870                new_claimed_at = Some(now);
871            }
872
873            // CLAIM: Transitioning to a timed state and need to take ownership
874            // This handles: non-timed -> timed, OR timed (other owner) -> timed (force claim)
875            if new_is_timed && !is_owned_by_agent {
876                // Already claimed by someone else?
877                if is_owned_by_other && !force {
878                    return Err(anyhow!(
879                        "Task is already claimed by agent '{}'",
880                        current_owner.unwrap()
881                    ));
882                }
883
884                // Check for unsatisfied blocking dependencies (skip if force)
885                if !force {
886                    let unsatisfied_blockers = super::deps::get_unsatisfied_start_blockers_in_tx(
887                        &tx,
888                        task_id,
889                        states_config,
890                        deps_config,
891                    )?;
892                    if !unsatisfied_blockers.is_empty() {
893                        // Return structured error with blocking task IDs so clients
894                        // can monitor them and retry when they complete
895                        return Err(ToolError::deps_not_satisfied(&unsatisfied_blockers).into());
896                    }
897                }
898
899                // Get the agent
900                let agent = get_worker_internal(&tx, agent_id)?
901                    .ok_or_else(|| anyhow!("Agent not found"))?;
902
903                // Check tag affinity - needed_tags (AND - must have ALL)
904                if !task.needed_tags.is_empty() {
905                    for needed in &task.needed_tags {
906                        if !agent.tags.contains(needed) {
907                            return Err(anyhow!("Agent missing required tag: {}", needed));
908                        }
909                    }
910                }
911
912                // Check tag affinity - wanted_tags (OR - must have AT LEAST ONE)
913                if !task.wanted_tags.is_empty() {
914                    let has_any = task
915                        .wanted_tags
916                        .iter()
917                        .any(|wanted| agent.tags.contains(wanted));
918                    if !has_any {
919                        return Err(anyhow!("Agent has none of the wanted tags"));
920                    }
921                }
922
923                // Set ownership
924                new_owner = Some(agent_id.to_string());
925                new_claimed_at = Some(now);
926
927                // Refresh agent heartbeat
928                tx.execute(
929                    "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
930                    params![now, agent_id],
931                )?;
932            }
933
934            // RELEASE: Transitioning to non-timed state (but not terminal)
935            if !new_is_timed && !new_is_terminal && task.worker_id.is_some() {
936                // Verify ownership (unless force)
937                if is_owned_by_other && !force {
938                    return Err(anyhow!("Task is not owned by this agent"));
939                }
940
941                // Clear ownership
942                new_owner = None;
943                new_claimed_at = None;
944            }
945
946            // COMPLETE: Transition to terminal state
947            if new_is_terminal {
948                // Verify ownership if task was claimed (unless force)
949                if let Some(ref current_owner) = task.worker_id
950                    && current_owner != agent_id && !force {
951                        return Err(anyhow!("Task is not owned by this agent"));
952                    }
953
954                // Check for incomplete children (via 'contains' dependencies)
955                let incomplete_children: i32 = tx.query_row(
956                    "SELECT COUNT(*) FROM dependencies d
957                     INNER JOIN tasks child ON d.to_task_id = child.id
958                     WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
959                     AND child.status IN (SELECT value FROM json_each(?2))",
960                    params![
961                        task_id,
962                        serde_json::to_string(&states_config.blocking_states)?
963                    ],
964                    |row| row.get(0),
965                )?;
966
967                if incomplete_children > 0 {
968                    return Err(anyhow!(
969                        "Cannot complete task: {} child task(s) are not complete",
970                        incomplete_children
971                    ));
972                }
973
974                // Clear ownership
975                new_owner = None;
976                new_claimed_at = None;
977
978                // Release file locks associated with this task (for auto-cleanup)
979                tx.execute(
980                    "DELETE FROM file_locks WHERE task_id = ?1",
981                    params![task_id],
982                )?;
983            }
984
985            // Handle timestamps
986            let started_at =
987                if task.started_at.is_none() && new_is_timed {
988                    Some(now)
989                } else {
990                    task.started_at
991                };
992
993            // Set completed_at when entering completed status (even if it has reopen exits)
994            let completed_at = if new_status == "completed" {
995                Some(now)
996            } else {
997                task.completed_at
998            };
999
1000            // Record state transition if status changed (with reason for audit)
1001            let status_changed = task.status != new_status;
1002            if status_changed {
1003                record_state_transition(
1004                    &tx,
1005                    task_id,
1006                    &new_status,
1007                    new_owner.as_deref(),
1008                    reason.as_deref(),
1009                    states_config,
1010                )?;
1011            }
1012
1013            // Record phase transition if phase changed
1014            let phase_changed = task.phase != new_phase;
1015            if phase_changed {
1016                super::state_transitions::record_phase_transition(
1017                    &tx,
1018                    task_id,
1019                    new_phase.as_deref().unwrap_or(""),
1020                    Some(agent_id),
1021                    reason.as_deref(),
1022                )?;
1023            }
1024
1025            tx.execute(
1026                "UPDATE tasks SET
1027                    title = ?1, description = ?2, status = ?3, phase = ?4, priority = ?5,
1028                    points = ?6, started_at = ?7, completed_at = ?8, updated_at = ?9,
1029                    tags = ?10, worker_id = ?11, claimed_at = ?12,
1030                    needed_tags = ?13, wanted_tags = ?14, time_estimate_ms = ?15
1031                WHERE id = ?16",
1032                params![
1033                    new_title,
1034                    new_description,
1035                    new_status,
1036                    new_phase,
1037                    new_priority.to_string(),
1038                    new_points,
1039                    started_at,
1040                    completed_at,
1041                    now,
1042                    serde_json::to_string(&new_tags)?,
1043                    new_owner,
1044                    new_claimed_at,
1045                    serde_json::to_string(&new_needed_tags)?,
1046                    serde_json::to_string(&new_wanted_tags)?,
1047                    new_time_estimate_ms,
1048                    task_id,
1049                ],
1050            )?;
1051
1052            // Sync tags to junction tables if changed
1053            if new_tags != task.tags {
1054                sync_task_tags(&tx, task_id, &new_tags)?;
1055            }
1056            if new_needed_tags != task.needed_tags {
1057                sync_needed_tags(&tx, task_id, &new_needed_tags)?;
1058            }
1059            if new_wanted_tags != task.wanted_tags {
1060                sync_wanted_tags(&tx, task_id, &new_wanted_tags)?;
1061            }
1062
1063            // Check for unblocked tasks if this task transitioned FROM blocking TO non-blocking
1064            let (unblocked, auto_advanced) = if status_changed {
1065                let was_blocking = states_config.is_blocking_state(&task.status);
1066                let is_blocking = states_config.is_blocking_state(&new_status);
1067
1068                if was_blocking && !is_blocking {
1069                    super::deps::propagate_unblock_effects(
1070                        &tx,
1071                        task_id,
1072                        Some(agent_id),
1073                        states_config,
1074                        deps_config,
1075                        auto_advance,
1076                    )?
1077                } else {
1078                    (vec![], vec![])
1079                }
1080            } else {
1081                (vec![], vec![])
1082            };
1083
1084            tx.commit()?;
1085
1086            Ok((Task {
1087                id: task_id.to_string(),
1088                title: new_title,
1089                description: new_description,
1090                status: new_status,
1091                phase: new_phase,
1092                priority: new_priority,
1093                points: new_points,
1094                tags: new_tags,
1095                needed_tags: new_needed_tags,
1096                wanted_tags: new_wanted_tags,
1097                time_estimate_ms: new_time_estimate_ms,
1098                started_at,
1099                completed_at,
1100                updated_at: now,
1101                worker_id: new_owner,
1102                claimed_at: new_claimed_at,
1103                ..task
1104            }, unblocked, auto_advanced))
1105        })
1106    }
1107
1108    /// Delete a task (soft delete by default, hard delete with obliterate=true).
1109    ///
1110    /// - `worker_id`: The worker attempting to delete (required for ownership check)
1111    /// - `cascade`: Whether to delete children (default: false)
1112    /// - `reason`: Optional reason for deletion
1113    /// - `obliterate`: If true, permanently deletes the task; if false (default), soft deletes
1114    /// - `force`: If true, allows deletion even if owned by another worker
1115    pub fn delete_task(
1116        &self,
1117        task_id: &str,
1118        worker_id: &str,
1119        cascade: bool,
1120        reason: Option<String>,
1121        obliterate: bool,
1122        force: bool,
1123    ) -> Result<()> {
1124        let now = now_ms();
1125
1126        self.with_conn_mut(|conn| {
1127            let tx = conn.transaction()?;
1128
1129            // Get the task to check ownership
1130            let task = get_task_internal(&tx, task_id)?
1131                .ok_or_else(|| anyhow!("Task not found"))?;
1132
1133            // Check ownership - reject if claimed by another worker (unless force)
1134            if let Some(ref owner) = task.worker_id
1135                && owner != worker_id && !force {
1136                    return Err(anyhow!(
1137                        "Task is claimed by worker '{}'. Use force=true to override.",
1138                        owner
1139                    ));
1140                }
1141
1142            if obliterate {
1143                // Hard delete - permanently remove from database
1144                if cascade {
1145                    // Find all descendants using recursive CTE and delete them
1146                    // The CTE finds all tasks reachable via 'contains' dependencies
1147                    tx.execute(
1148                        "WITH RECURSIVE descendants AS (
1149                            SELECT ?1 AS id
1150                            UNION ALL
1151                            SELECT dep.to_task_id FROM dependencies dep
1152                            INNER JOIN descendants d ON dep.from_task_id = d.id
1153                            WHERE dep.dep_type = 'contains'
1154                        )
1155                        DELETE FROM tasks WHERE id IN (SELECT id FROM descendants)",
1156                        params![task_id],
1157                    )?;
1158                } else {
1159                    // Check for children via dependencies
1160                    let child_count: i32 = tx.query_row(
1161                        "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
1162                        params![task_id],
1163                        |row| row.get(0),
1164                    )?;
1165
1166                    if child_count > 0 {
1167                        return Err(anyhow!("Task has children; use cascade=true to delete"));
1168                    }
1169
1170                    tx.execute("DELETE FROM tasks WHERE id = ?1", params![task_id])?;
1171                }
1172            } else {
1173                // Soft delete - set deleted_at, deleted_by, deleted_reason
1174                if cascade {
1175                    // Soft delete all descendants
1176                    tx.execute(
1177                        "WITH RECURSIVE descendants AS (
1178                            SELECT ?1 AS id
1179                            UNION ALL
1180                            SELECT dep.to_task_id FROM dependencies dep
1181                            INNER JOIN descendants d ON dep.from_task_id = d.id
1182                            WHERE dep.dep_type = 'contains'
1183                        )
1184                        UPDATE tasks SET deleted_at = ?2, deleted_by = ?3, deleted_reason = ?4, updated_at = ?2
1185                        WHERE id IN (SELECT id FROM descendants) AND deleted_at IS NULL",
1186                        params![task_id, now, worker_id, reason],
1187                    )?;
1188                } else {
1189                    // Check for children via dependencies
1190                    let child_count: i32 = tx.query_row(
1191                        "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
1192                        params![task_id],
1193                        |row| row.get(0),
1194                    )?;
1195
1196                    if child_count > 0 {
1197                        return Err(anyhow!("Task has children; use cascade=true to delete"));
1198                    }
1199
1200                    tx.execute(
1201                        "UPDATE tasks SET deleted_at = ?1, deleted_by = ?2, deleted_reason = ?3, updated_at = ?1 WHERE id = ?4",
1202                        params![now, worker_id, reason, task_id],
1203                    )?;
1204                }
1205            }
1206
1207            tx.commit()?;
1208            Ok(())
1209        })
1210    }
1211
1212    /// List tasks with optional filters.
1213    /// Returns full Task objects. Excludes soft-deleted tasks.
1214    pub fn list_tasks(&self, query: ListTasksQuery<'_>) -> Result<Vec<Task>> {
1215        let ListTasksQuery {
1216            status,
1217            phase,
1218            owner,
1219            parent_id,
1220            limit,
1221            offset,
1222            sort_by,
1223            sort_order,
1224        } = query;
1225        self.with_conn(|conn| {
1226            let mut sql = String::from(
1227                "SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL",
1228            );
1229            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1230
1231            if let Some(s) = status {
1232                sql.push_str(" AND t.status = ?");
1233                params_vec.push(Box::new(s.to_string()));
1234            }
1235
1236            if let Some(p) = phase {
1237                sql.push_str(" AND t.phase = ?");
1238                params_vec.push(Box::new(p.to_string()));
1239            }
1240
1241            if let Some(o) = owner {
1242                sql.push_str(" AND t.worker_id = ?");
1243                params_vec.push(Box::new(o.to_string()));
1244            }
1245
1246            // Handle parent filtering via dependencies table
1247            if let Some(p) = parent_id {
1248                match p {
1249                    Some(pid) => {
1250                        sql.push_str(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ? AND dep_type = 'contains')");
1251                        params_vec.push(Box::new(pid.to_string()));
1252                    }
1253                    None => {
1254                        // Root tasks: not contained by any other task
1255                        sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
1256                    }
1257                }
1258            }
1259
1260            // Build ORDER BY clause
1261            let order_clause = build_order_clause(sort_by, sort_order);
1262            sql.push_str(&format!(" ORDER BY {}", order_clause));
1263
1264            if let Some(l) = limit {
1265                sql.push_str(&format!(" LIMIT {}", l));
1266            }
1267
1268            if offset > 0 {
1269                sql.push_str(&format!(" OFFSET {}", offset));
1270            }
1271
1272            let params_refs: Vec<&dyn rusqlite::ToSql> =
1273                params_vec.iter().map(|b| b.as_ref()).collect();
1274
1275            let mut stmt = conn.prepare(&sql)?;
1276            let tasks = stmt
1277                .query_map(params_refs.as_slice(), parse_task_row)?
1278                .filter_map(|r| r.ok())
1279                .collect();
1280
1281            Ok(tasks)
1282        })
1283    }
1284
1285    /// Set the current thought for tasks owned by an agent.
1286    pub fn set_thought(
1287        &self,
1288        agent_id: &str,
1289        thought: Option<String>,
1290        task_ids: Option<Vec<String>>,
1291    ) -> Result<i32> {
1292        let now = now_ms();
1293
1294        self.with_conn(|conn| {
1295            let updated = if let Some(ids) = task_ids {
1296                let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
1297                let sql = format!(
1298                    "UPDATE tasks SET current_thought = ?, updated_at = ?
1299                     WHERE worker_id = ? AND id IN ({})",
1300                    placeholders.join(", ")
1301                );
1302
1303                let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1304                params_vec.push(Box::new(thought.clone()));
1305                params_vec.push(Box::new(now));
1306                params_vec.push(Box::new(agent_id.to_string()));
1307                for id in &ids {
1308                    params_vec.push(Box::new(id.clone()));
1309                }
1310
1311                let params_refs: Vec<&dyn rusqlite::ToSql> =
1312                    params_vec.iter().map(|b| b.as_ref()).collect();
1313                conn.execute(&sql, params_refs.as_slice())?
1314            } else {
1315                conn.execute(
1316                    "UPDATE tasks SET current_thought = ?, updated_at = ? WHERE worker_id = ?",
1317                    params![thought, now, agent_id],
1318                )?
1319            };
1320
1321            Ok(updated as i32)
1322        })
1323    }
1324
1325    /// Log time for a task.
1326    pub fn log_time(&self, task_id: &str, duration_ms: i64) -> Result<i64> {
1327        let now = now_ms();
1328
1329        self.with_conn(|conn| {
1330            conn.execute(
1331                "UPDATE tasks SET time_actual_ms = COALESCE(time_actual_ms, 0) + ?1, updated_at = ?2
1332                 WHERE id = ?3",
1333                params![duration_ms, now, task_id],
1334            )?;
1335
1336            let total: i64 = conn.query_row(
1337                "SELECT COALESCE(time_actual_ms, 0) FROM tasks WHERE id = ?1",
1338                params![task_id],
1339                |row| row.get(0),
1340            )?;
1341
1342            Ok(total)
1343        })
1344    }
1345
1346    /// Log metrics and cost for a task.
1347    /// Values in the metrics array are aggregated (added) to existing values.
1348    pub fn log_metrics(
1349        &self,
1350        task_id: &str,
1351        cost_usd: Option<f64>,
1352        values: &[i64],
1353    ) -> Result<Task> {
1354        let now = now_ms();
1355
1356        self.with_conn(|conn| {
1357            let task =
1358                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1359
1360            // Aggregate metrics (add new values to existing)
1361            let mut new_metrics = task.metrics;
1362            for (i, &val) in values.iter().take(8).enumerate() {
1363                new_metrics[i] += val;
1364            }
1365
1366            let new_cost_usd = task.cost_usd + cost_usd.unwrap_or(0.0);
1367
1368            conn.execute(
1369                "UPDATE tasks SET
1370                    metric_0 = ?1, metric_1 = ?2, metric_2 = ?3, metric_3 = ?4,
1371                    metric_4 = ?5, metric_5 = ?6, metric_6 = ?7, metric_7 = ?8,
1372                    cost_usd = ?9, updated_at = ?10
1373                WHERE id = ?11",
1374                params![
1375                    new_metrics[0],
1376                    new_metrics[1],
1377                    new_metrics[2],
1378                    new_metrics[3],
1379                    new_metrics[4],
1380                    new_metrics[5],
1381                    new_metrics[6],
1382                    new_metrics[7],
1383                    new_cost_usd,
1384                    now,
1385                    task_id,
1386                ],
1387            )?;
1388
1389            Ok(Task {
1390                cost_usd: new_cost_usd,
1391                metrics: new_metrics,
1392                updated_at: now,
1393                ..task
1394            })
1395        })
1396    }
1397
1398    /// Claim a task for an agent.
1399    /// Uses the first timed state (typically "working") as the claiming state.
1400    pub fn claim_task(
1401        &self,
1402        task_id: &str,
1403        agent_id: &str,
1404        states_config: &StatesConfig,
1405    ) -> Result<Task> {
1406        let now = now_ms();
1407
1408        // Find the first timed state to use for claiming (typically "working")
1409        let claim_status = states_config
1410            .definitions
1411            .iter()
1412            .find(|(_, def)| def.timed)
1413            .map(|(name, _)| name.as_str())
1414            .unwrap_or("working");
1415
1416        self.with_conn(|conn| {
1417            // Get the task (using internal helper to avoid deadlock)
1418            let task =
1419                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1420
1421            // Check if already claimed
1422            if task.worker_id.is_some() {
1423                return Err(anyhow!("Task is already claimed"));
1424            }
1425
1426            // Validate state transition
1427            if !states_config.is_valid_transition(&task.status, claim_status) {
1428                let exits = states_config.get_exits(&task.status);
1429                return Err(anyhow!(
1430                    "Cannot claim task in state '{}'. Allowed transitions: {:?}",
1431                    task.status,
1432                    exits
1433                ));
1434            }
1435
1436            // Get the agent (using internal helper to avoid deadlock)
1437            let agent =
1438                get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1439
1440            // Check tag affinity - needed_tags (AND - must have ALL)
1441            if !task.needed_tags.is_empty() {
1442                for needed in &task.needed_tags {
1443                    if !agent.tags.contains(needed) {
1444                        return Err(anyhow!("Agent missing required tag: {}", needed));
1445                    }
1446                }
1447            }
1448
1449            // Check tag affinity - wanted_tags (OR - must have AT LEAST ONE)
1450            if !task.wanted_tags.is_empty() {
1451                let has_any = task
1452                    .wanted_tags
1453                    .iter()
1454                    .any(|wanted| agent.tags.contains(wanted));
1455                if !has_any {
1456                    return Err(anyhow!("Agent has none of the wanted tags"));
1457                }
1458            }
1459
1460            conn.execute(
1461                "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = ?4, updated_at = ?5
1462                 WHERE id = ?6",
1463                params![agent_id, now, claim_status, now, now, task_id,],
1464            )?;
1465
1466            // Record state transition (accumulates time if coming from timed state)
1467            record_state_transition(
1468                conn,
1469                task_id,
1470                claim_status,
1471                Some(agent_id),
1472                None,
1473                states_config,
1474            )?;
1475
1476            // Refresh agent heartbeat
1477            conn.execute(
1478                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1479                params![now, agent_id],
1480            )?;
1481
1482            Ok(Task {
1483                worker_id: Some(agent_id.to_string()),
1484                claimed_at: Some(now),
1485                status: claim_status.to_string(),
1486                started_at: Some(now),
1487                updated_at: now,
1488                ..task
1489            })
1490        })
1491    }
1492
1493    /// Release a task claim.
1494    pub fn release_task(
1495        &self,
1496        task_id: &str,
1497        agent_id: &str,
1498        states_config: &StatesConfig,
1499    ) -> Result<()> {
1500        let now = now_ms();
1501        let release_status = &states_config.initial;
1502
1503        self.with_conn(|conn| {
1504            let task =
1505                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1506
1507            if task.worker_id.as_deref() != Some(agent_id) {
1508                return Err(anyhow!("Task is not owned by this agent"));
1509            }
1510
1511            // Record state transition (accumulates time if coming from timed state)
1512            record_state_transition(
1513                conn,
1514                task_id,
1515                release_status,
1516                Some(agent_id),
1517                None,
1518                states_config,
1519            )?;
1520
1521            conn.execute(
1522                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1523                 WHERE id = ?3",
1524                params![release_status, now, task_id],
1525            )?;
1526
1527            Ok(())
1528        })
1529    }
1530
1531    /// Force release a task regardless of owner.
1532    pub fn force_release(&self, task_id: &str, states_config: &StatesConfig) -> Result<()> {
1533        let now = now_ms();
1534        let release_status = &states_config.initial;
1535
1536        self.with_conn(|conn| {
1537            let task =
1538                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1539
1540            // Record state transition (accumulates time if coming from timed state)
1541            record_state_transition(
1542                conn,
1543                task_id,
1544                release_status,
1545                task.worker_id.as_deref(),
1546                None,
1547                states_config,
1548            )?;
1549
1550            conn.execute(
1551                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1552                 WHERE id = ?3",
1553                params![release_status, now, task_id],
1554            )?;
1555
1556            Ok(())
1557        })
1558    }
1559
1560    /// Force claim a task even if owned by another agent.
1561    pub fn force_claim_task(
1562        &self,
1563        task_id: &str,
1564        agent_id: &str,
1565        states_config: &StatesConfig,
1566    ) -> Result<Task> {
1567        let now = now_ms();
1568
1569        // Find the first timed state to use for claiming (typically "working")
1570        let claim_status = states_config
1571            .definitions
1572            .iter()
1573            .find(|(_, def)| def.timed)
1574            .map(|(name, _)| name.as_str())
1575            .unwrap_or("working");
1576
1577        self.with_conn(|conn| {
1578            // Get the task
1579            let task =
1580                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1581
1582            // Get the agent
1583            let agent =
1584                get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1585
1586            // Check tag affinity - needed_tags (AND)
1587            if !task.needed_tags.is_empty() {
1588                for needed in &task.needed_tags {
1589                    if !agent.tags.contains(needed) {
1590                        return Err(anyhow!("Agent missing required tag: {}", needed));
1591                    }
1592                }
1593            }
1594
1595            // Check tag affinity - wanted_tags (OR)
1596            if !task.wanted_tags.is_empty() {
1597                let has_any = task
1598                    .wanted_tags
1599                    .iter()
1600                    .any(|wanted| agent.tags.contains(wanted));
1601                if !has_any {
1602                    return Err(anyhow!("Agent has none of the wanted tags"));
1603                }
1604            }
1605
1606            conn.execute(
1607                "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = COALESCE(started_at, ?4), updated_at = ?5
1608                 WHERE id = ?6",
1609                params![agent_id, now, claim_status, now, now, task_id,],
1610            )?;
1611
1612            // Record state transition (accumulates time if coming from timed state)
1613            record_state_transition(
1614                conn,
1615                task_id,
1616                claim_status,
1617                Some(agent_id),
1618                None,
1619                states_config,
1620            )?;
1621
1622            // Refresh agent heartbeat
1623            conn.execute(
1624                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1625                params![now, agent_id],
1626            )?;
1627
1628            Ok(Task {
1629                worker_id: Some(agent_id.to_string()),
1630                claimed_at: Some(now),
1631                status: claim_status.to_string(),
1632                started_at: task.started_at.or(Some(now)),
1633                updated_at: now,
1634                ..task
1635            })
1636        })
1637    }
1638
1639    /// Release a task claim with a specified state.
1640    pub fn release_task_with_state(
1641        &self,
1642        task_id: &str,
1643        agent_id: &str,
1644        state: &str,
1645        states_config: &StatesConfig,
1646    ) -> Result<()> {
1647        let now = now_ms();
1648
1649        self.with_conn(|conn| {
1650            let task =
1651                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1652
1653            if task.worker_id.as_deref() != Some(agent_id) {
1654                return Err(anyhow!("Task is not owned by this agent"));
1655            }
1656
1657            // Validate state exists
1658            if !states_config.is_valid_state(state) {
1659                return Err(anyhow!(
1660                    "Invalid state '{}'. Valid states: {:?}",
1661                    state,
1662                    states_config.state_names()
1663                ));
1664            }
1665
1666            // Validate transition
1667            if !states_config.is_valid_transition(&task.status, state) {
1668                let exits = states_config.get_exits(&task.status);
1669                return Err(anyhow!(
1670                    "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
1671                    task.status,
1672                    state,
1673                    exits
1674                ));
1675            }
1676
1677            // Set completed_at when entering completed status (even if it has reopen exits)
1678            let completed_at = if state == "completed" {
1679                Some(now)
1680            } else {
1681                None
1682            };
1683
1684            // Record state transition (accumulates time if coming from timed state)
1685            record_state_transition(conn, task_id, state, Some(agent_id), None, states_config)?;
1686
1687            conn.execute(
1688                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, completed_at = COALESCE(?2, completed_at), updated_at = ?3
1689                 WHERE id = ?4",
1690                params![state, completed_at, now, task_id],
1691            )?;
1692
1693            Ok(())
1694        })
1695    }
1696
1697    /// Force release stale claims.
1698    pub fn force_release_stale(
1699        &self,
1700        timeout_seconds: i64,
1701        states_config: &StatesConfig,
1702    ) -> Result<i32> {
1703        let now = now_ms();
1704        let cutoff = now - (timeout_seconds * 1000);
1705        let release_status = &states_config.initial;
1706
1707        self.with_conn(|conn| {
1708            let updated = conn.execute(
1709                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1710                 WHERE claimed_at < ?3 AND worker_id IS NOT NULL",
1711                params![release_status, now, cutoff],
1712            )?;
1713
1714            Ok(updated as i32)
1715        })
1716    }
1717
1718    /// Complete a task and release file locks held by the agent.
1719    /// Uses "completed" state by default, which should be a terminal state.
1720    /// Checks that all children (via 'contains' dependencies) are complete.
1721    pub fn complete_task(
1722        &self,
1723        task_id: &str,
1724        agent_id: &str,
1725        states_config: &StatesConfig,
1726    ) -> Result<Task> {
1727        let now = now_ms();
1728
1729        // Find a terminal state to use (prefer "completed" if it exists)
1730        let complete_status = if states_config.definitions.contains_key("completed") {
1731            "completed"
1732        } else {
1733            // Find any terminal state
1734            states_config
1735                .definitions
1736                .iter()
1737                .find(|(_, def)| def.exits.is_empty())
1738                .map(|(name, _)| name.as_str())
1739                .unwrap_or("completed")
1740        };
1741
1742        self.with_conn_mut(|conn| {
1743            let tx = conn.transaction()?;
1744
1745            // Get the task
1746            let mut stmt = tx.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1747            let task = stmt
1748                .query_row(params![task_id], parse_task_row)
1749                .map_err(|_| anyhow!("Task not found"))?;
1750            drop(stmt);
1751
1752            // Verify ownership
1753            if task.worker_id.as_deref() != Some(agent_id) {
1754                return Err(anyhow!("Task is not owned by this agent"));
1755            }
1756
1757            // Check for incomplete children (blocking completion)
1758            let incomplete_children: i32 = tx.query_row(
1759                "SELECT COUNT(*) FROM dependencies d
1760                 INNER JOIN tasks child ON d.to_task_id = child.id
1761                 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
1762                 AND child.status IN (SELECT value FROM json_each(?2))",
1763                params![
1764                    task_id,
1765                    serde_json::to_string(&states_config.blocking_states)?
1766                ],
1767                |row| row.get(0),
1768            )?;
1769
1770            if incomplete_children > 0 {
1771                return Err(anyhow!(
1772                    "Cannot complete task: {} child task(s) are not complete",
1773                    incomplete_children
1774                ));
1775            }
1776
1777            // Validate transition
1778            if !states_config.is_valid_transition(&task.status, complete_status) {
1779                let exits = states_config.get_exits(&task.status);
1780                return Err(anyhow!(
1781                    "Cannot complete task in state '{}'. Allowed transitions: {:?}",
1782                    task.status,
1783                    exits
1784                ));
1785            }
1786
1787            // Record state transition (accumulates time from timed state)
1788            record_state_transition(
1789                &tx,
1790                task_id,
1791                complete_status,
1792                Some(agent_id),
1793                None,
1794                states_config,
1795            )?;
1796
1797            // Update task to completed
1798            tx.execute(
1799                "UPDATE tasks SET status = ?1, completed_at = ?2, updated_at = ?3,
1800                 worker_id = NULL, claimed_at = NULL
1801                 WHERE id = ?4",
1802                params![complete_status, now, now, task_id],
1803            )?;
1804
1805            // Release file locks associated with this task (for auto-cleanup)
1806            tx.execute(
1807                "DELETE FROM file_locks WHERE task_id = ?1",
1808                params![task_id],
1809            )?;
1810
1811            // Refresh agent heartbeat
1812            tx.execute(
1813                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1814                params![now, agent_id],
1815            )?;
1816
1817            tx.commit()?;
1818
1819            Ok(Task {
1820                status: complete_status.to_string(),
1821                completed_at: Some(now),
1822                updated_at: now,
1823                worker_id: None,
1824                claimed_at: None,
1825                ..task
1826            })
1827        })
1828    }
1829
1830    /// Get all tasks. Excludes soft-deleted tasks.
1831    pub fn get_all_tasks(&self) -> Result<Vec<Task>> {
1832        self.with_conn(|conn| {
1833            let mut stmt =
1834                conn.prepare("SELECT * FROM tasks WHERE deleted_at IS NULL ORDER BY created_at")?;
1835            let tasks = stmt
1836                .query_map([], parse_task_row)?
1837                .filter_map(|r| r.ok())
1838                .collect();
1839            Ok(tasks)
1840        })
1841    }
1842
1843    /// Get tasks by status.
1844    #[allow(dead_code)]
1845    pub fn get_tasks_by_status(&self, status: &str) -> Result<Vec<Task>> {
1846        self.with_conn(|conn| {
1847            let mut stmt =
1848                conn.prepare("SELECT * FROM tasks WHERE status = ?1 ORDER BY created_at")?;
1849            let tasks = stmt
1850                .query_map(params![status], parse_task_row)?
1851                .filter_map(|r| r.ok())
1852                .collect();
1853            Ok(tasks)
1854        })
1855    }
1856
1857    /// Get claimed tasks. Excludes soft-deleted tasks.
1858    pub fn get_claimed_tasks(&self, agent_id: Option<&str>) -> Result<Vec<Task>> {
1859        self.with_conn(|conn| {
1860            let tasks = if let Some(aid) = agent_id {
1861                let mut stmt = conn
1862                    .prepare("SELECT * FROM tasks WHERE worker_id = ?1 AND deleted_at IS NULL ORDER BY claimed_at")?;
1863                stmt.query_map(params![aid], parse_task_row)?
1864                    .filter_map(|r| r.ok())
1865                    .collect()
1866            } else {
1867                let mut stmt = conn.prepare(
1868                    "SELECT * FROM tasks WHERE worker_id IS NOT NULL AND deleted_at IS NULL ORDER BY claimed_at",
1869                )?;
1870                stmt.query_map([], parse_task_row)?
1871                    .filter_map(|r| r.ok())
1872                    .collect()
1873            };
1874
1875            Ok(tasks)
1876        })
1877    }
1878}
1879
1880/// Helper function to create task tree recursively within a transaction.
1881/// Creates dependencies from parent to children using child_type.
1882/// Creates dependencies between siblings using sibling_type.
1883/// Supports referencing existing tasks via ref_id.
1884#[allow(clippy::too_many_arguments)]
1885fn create_tree_recursive(
1886    conn: &Connection,
1887    input: &TaskTreeInput,
1888    parent_id: Option<&str>,
1889    prev_sibling_id: Option<&str>,
1890    child_type: Option<&str>,
1891    sibling_type: Option<&str>,
1892    all_ids: &mut Vec<String>,
1893    phase_warnings: &mut Vec<String>,
1894    tag_warnings: &mut Vec<String>,
1895    states_config: &StatesConfig,
1896    phases_config: &PhasesConfig,
1897    tags_config: &TagsConfig,
1898    ids_config: &IdsConfig,
1899) -> Result<String> {
1900    // Check if this node references an existing task
1901    let task_id = if let Some(ref ref_id) = input.ref_id {
1902        // Verify the referenced task exists
1903        let exists: bool = conn.query_row(
1904            "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
1905            params![ref_id],
1906            |row| row.get(0),
1907        )?;
1908        if !exists {
1909            return Err(anyhow::anyhow!("Referenced task '{}' not found", ref_id));
1910        }
1911        ref_id.clone()
1912    } else {
1913        // Create a new task
1914        let task_id = input
1915            .id
1916            .clone()
1917            .unwrap_or_else(|| generate_task_id(ids_config));
1918        let now = now_ms();
1919        let priority = clamp_priority(input.priority.unwrap_or(PRIORITY_DEFAULT));
1920        let initial_status = &states_config.initial;
1921
1922        // Derive title: use explicit title, or derive from description, or empty
1923        let title = input.title.clone().unwrap_or_else(|| {
1924            input
1925                .description
1926                .as_deref()
1927                .map(|d| crate::format::truncate_title(d).into_owned())
1928                .unwrap_or_default()
1929        });
1930
1931        // Check phase validity
1932        if let Some(ref phase) = input.phase
1933            && let Some(warning) = phases_config.check_phase(phase)?
1934        {
1935            phase_warnings.push(format!("Task '{}': {}", task_id, warning));
1936        }
1937
1938        let needed_tags = input.needed_tags.clone().unwrap_or_default();
1939        let wanted_tags = input.wanted_tags.clone().unwrap_or_default();
1940        let tags = input.tags.clone().unwrap_or_default();
1941
1942        // Check tag validity for all tag types
1943        for warning in tags_config.validate_tags(&tags)? {
1944            tag_warnings.push(format!("Task '{}': {}", task_id, warning));
1945        }
1946        for warning in tags_config.validate_tags(&needed_tags)? {
1947            tag_warnings.push(format!("Task '{}' needed_tags: {}", task_id, warning));
1948        }
1949        for warning in tags_config.validate_tags(&wanted_tags)? {
1950            tag_warnings.push(format!("Task '{}' wanted_tags: {}", task_id, warning));
1951        }
1952
1953        let needed_tags_json = serde_json::to_string(&needed_tags)?;
1954        let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
1955        let tags_json = serde_json::to_string(&tags)?;
1956
1957        conn.execute(
1958            "INSERT INTO tasks (
1959                id, title, description, status, phase, priority,
1960                needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
1961            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1962            params![
1963                &task_id,
1964                &title,
1965                &input.description,
1966                initial_status,
1967                &input.phase,
1968                priority.to_string(),
1969                needed_tags_json,
1970                wanted_tags_json,
1971                tags_json,
1972                input.points,
1973                input.time_estimate_ms,
1974                now,
1975                now,
1976            ],
1977        )?;
1978
1979        // Record initial state transition
1980        record_state_transition(conn, &task_id, initial_status, None, None, states_config)?;
1981
1982        // Sync tags to junction tables for indexed lookups
1983        sync_task_tags(conn, &task_id, &tags)?;
1984        sync_needed_tags(conn, &task_id, &needed_tags)?;
1985        sync_wanted_tags(conn, &task_id, &wanted_tags)?;
1986
1987        task_id
1988    };
1989
1990    // Create dependency from parent if child_type is specified
1991    if let (Some(pid), Some(ct)) = (parent_id, child_type) {
1992        Database::add_dependency_internal(conn, pid, &task_id, ct)?;
1993    }
1994
1995    // Create dependency from previous sibling if sibling_type is specified
1996    if let (Some(prev_id), Some(st)) = (prev_sibling_id, sibling_type) {
1997        Database::add_dependency_internal(conn, prev_id, &task_id, st)?;
1998    }
1999
2000    all_ids.push(task_id.clone());
2001
2002    // Create children with dependencies based on child_type and sibling_type
2003    let mut prev_child_id: Option<String> = None;
2004    for child in input.children.iter() {
2005        let child_id = create_tree_recursive(
2006            conn,
2007            child,
2008            Some(&task_id),
2009            prev_child_id.as_deref(),
2010            child_type,
2011            sibling_type,
2012            all_ids,
2013            phase_warnings,
2014            tag_warnings,
2015            states_config,
2016            phases_config,
2017            tags_config,
2018            ids_config,
2019        )?;
2020        prev_child_id = Some(child_id);
2021    }
2022
2023    Ok(task_id)
2024}