// task_graph_mcp/db/tasks.rs

1//! Task CRUD and tree operations.
2
3use super::state_transitions::record_state_transition;
4use super::{Database, now_ms};
5use crate::config::{
6    AutoAdvanceConfig, DependenciesConfig, IdsConfig, PhasesConfig, StatesConfig, TagsConfig,
7};
8use crate::error::ToolError;
9use crate::types::{
10    PRIORITY_DEFAULT, Priority, Task, TaskTree, TaskTreeInput, Worker, clamp_priority,
11    parse_priority,
12};
13use anyhow::{Result, anyhow};
14use petname::{Generator, Petnames};
15use rusqlite::{Connection, Row, params};
16
17/// Options for creating a task tree from nested input.
18#[derive(Debug)]
19pub struct CreateTreeOptions<'a> {
20    pub input: TaskTreeInput,
21    pub parent_id: Option<String>,
22    pub child_type: Option<String>,
23    pub sibling_type: Option<String>,
24    pub states_config: &'a StatesConfig,
25    pub phases_config: &'a PhasesConfig,
26    pub tags_config: &'a TagsConfig,
27    pub ids_config: &'a IdsConfig,
28}
29
/// Optional filters, paging, and ordering for a task listing.
///
/// Every filter is optional; `Default::default()` yields a query with no
/// filters, no limit, and an offset of `0`.
#[derive(Debug, Default)]
pub struct ListTasksQuery<'a> {
    /// Restrict to tasks with this status.
    pub status: Option<&'a str>,
    /// Restrict to tasks in this phase.
    pub phase: Option<&'a str>,
    /// Restrict to tasks owned by this worker.
    pub owner: Option<&'a str>,
    /// Parent filter.
    // NOTE(review): presumably `None` = no parent filter, `Some(None)` = root
    // tasks only, `Some(Some(id))` = children of `id` — confirm against the
    // query that consumes this struct.
    pub parent_id: Option<Option<&'a str>>,
    /// Maximum number of rows to return; `None` leaves the result unbounded
    /// as far as this struct is concerned.
    pub limit: Option<i32>,
    /// Number of rows to skip.
    pub offset: i32,
    /// Name of the sort field.
    // NOTE(review): presumably fed to `build_order_clause` — confirm.
    pub sort_by: Option<&'a str>,
    /// Sort direction.
    // NOTE(review): presumably `"asc"` or `"desc"`, fed to
    // `build_order_clause` — confirm.
    pub sort_order: Option<&'a str>,
}
42
43/// Generate a petname-based task ID using the large wordlist.
44/// Uses the configured number of words and case style.
45fn generate_task_id(ids_config: &IdsConfig) -> String {
46    let words = ids_config.task_id_words;
47    let case = ids_config.id_case;
48
49    // Generate with hyphen separator first (petname's default format)
50    let base = Petnames::medium()
51        .generate_one(words, "-")
52        .unwrap_or_else(|| format!("task-{}", super::now_ms()));
53
54    // Convert to desired case
55    case.convert(&base)
56}
57
/// Build the expression that follows `ORDER BY` from user-supplied sort
/// parameters.
///
/// Only fixed, whitelisted fragments are ever returned, so the result is safe
/// to splice into SQL regardless of what the caller passes in.
///
/// * `sort_by` – `"priority"` (compared numerically), `"created_at"`, or
///   `"updated_at"`. Anything else, including `None`, sorts by `created_at`.
/// * `sort_order` – `"asc"` or `"desc"`. Anything else, including `None`,
///   sorts descending (higher priority / newer first).
///
/// Both parameters are matched case-insensitively and ignore surrounding
/// whitespace, so `"ASC"` is not silently treated as descending.
fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
    let field = match sort_by.map(str::trim) {
        Some(name) if name.eq_ignore_ascii_case("priority") => "CAST(t.priority AS INTEGER)",
        Some(name) if name.eq_ignore_ascii_case("updated_at") => "t.updated_at",
        // "created_at" and every unrecognised value share the default column.
        _ => "t.created_at",
    };

    let order = match sort_order.map(str::trim) {
        Some(dir) if dir.eq_ignore_ascii_case("asc") => "ASC",
        // "desc" and every unrecognised value share the default direction.
        _ => "DESC",
    };

    format!("{} {}", field, order)
}
79
80// =============================================================================
81// Junction table helpers for tag management
82// =============================================================================
83
84/// Sync task tags to the task_tags junction table.
85/// Replaces all existing tags for the task.
86fn sync_task_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
87    conn.execute("DELETE FROM task_tags WHERE task_id = ?1", params![task_id])?;
88    for tag in tags {
89        conn.execute(
90            "INSERT INTO task_tags (task_id, tag) VALUES (?1, ?2)",
91            params![task_id, tag],
92        )?;
93    }
94    Ok(())
95}
96
97/// Sync needed tags (agent must have ALL) to the task_needed_tags junction table.
98fn sync_needed_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
99    conn.execute(
100        "DELETE FROM task_needed_tags WHERE task_id = ?1",
101        params![task_id],
102    )?;
103    for tag in tags {
104        conn.execute(
105            "INSERT INTO task_needed_tags (task_id, tag) VALUES (?1, ?2)",
106            params![task_id, tag],
107        )?;
108    }
109    Ok(())
110}
111
112/// Sync wanted tags (agent must have ANY) to the task_wanted_tags junction table.
113fn sync_wanted_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
114    conn.execute(
115        "DELETE FROM task_wanted_tags WHERE task_id = ?1",
116        params![task_id],
117    )?;
118    for tag in tags {
119        conn.execute(
120            "INSERT INTO task_wanted_tags (task_id, tag) VALUES (?1, ?2)",
121            params![task_id, tag],
122        )?;
123    }
124    Ok(())
125}
126
127pub fn parse_task_row(row: &Row) -> rusqlite::Result<Task> {
128    let id: String = row.get("id")?;
129    let title: String = row.get("title")?;
130    let description: Option<String> = row.get("description")?;
131    let status: String = row.get("status")?;
132    let phase: Option<String> = row.get("phase")?;
133    let priority: String = row.get("priority")?;
134    let worker_id: Option<String> = row.get("worker_id")?;
135    let claimed_at: Option<i64> = row.get("claimed_at")?;
136
137    let needed_tags_json: Option<String> = row.get("needed_tags")?;
138    let wanted_tags_json: Option<String> = row.get("wanted_tags")?;
139    let tags_json: Option<String> = row.get("tags")?;
140
141    let points: Option<i32> = row.get("points")?;
142    let time_estimate_ms: Option<i64> = row.get("time_estimate_ms")?;
143    let time_actual_ms: Option<i64> = row.get("time_actual_ms")?;
144    let started_at: Option<i64> = row.get("started_at")?;
145    let completed_at: Option<i64> = row.get("completed_at")?;
146
147    let current_thought: Option<String> = row.get("current_thought")?;
148
149    let cost_usd: f64 = row.get("cost_usd")?;
150    let metric_0: i64 = row.get("metric_0")?;
151    let metric_1: i64 = row.get("metric_1")?;
152    let metric_2: i64 = row.get("metric_2")?;
153    let metric_3: i64 = row.get("metric_3")?;
154    let metric_4: i64 = row.get("metric_4")?;
155    let metric_5: i64 = row.get("metric_5")?;
156    let metric_6: i64 = row.get("metric_6")?;
157    let metric_7: i64 = row.get("metric_7")?;
158
159    let created_at: i64 = row.get("created_at")?;
160    let updated_at: i64 = row.get("updated_at")?;
161
162    Ok(Task {
163        id,
164        title,
165        description,
166        status,
167        phase,
168        priority: parse_priority(&priority),
169        worker_id,
170        claimed_at,
171        needed_tags: needed_tags_json
172            .map(|s| serde_json::from_str(&s).unwrap_or_default())
173            .unwrap_or_default(),
174        wanted_tags: wanted_tags_json
175            .map(|s| serde_json::from_str(&s).unwrap_or_default())
176            .unwrap_or_default(),
177        tags: tags_json
178            .map(|s| serde_json::from_str(&s).unwrap_or_default())
179            .unwrap_or_default(),
180        points,
181        time_estimate_ms,
182        time_actual_ms,
183        started_at,
184        completed_at,
185        current_thought,
186        cost_usd,
187        metrics: [
188            metric_0, metric_1, metric_2, metric_3, metric_4, metric_5, metric_6, metric_7,
189        ],
190        created_at,
191        updated_at,
192    })
193}
194
195/// Internal helper to get a task using an existing connection (avoids deadlock).
196fn get_task_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
197    let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
198
199    let result = stmt.query_row(params![task_id], parse_task_row);
200
201    match result {
202        Ok(task) => Ok(Some(task)),
203        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
204        Err(e) => Err(e.into()),
205    }
206}
207
208/// Internal helper to get a worker using an existing connection (avoids deadlock).
209fn get_worker_internal(conn: &Connection, worker_id: &str) -> Result<Option<Worker>> {
210    let mut stmt = conn.prepare(
211        "SELECT id, tags, max_claims, registered_at, last_heartbeat, last_status, last_phase, workflow
212         FROM workers WHERE id = ?1",
213    )?;
214
215    let result = stmt.query_row(params![worker_id], |row| {
216        let id: String = row.get(0)?;
217        let tags_json: String = row.get(1)?;
218        let max_claims: i32 = row.get(2)?;
219        let registered_at: i64 = row.get(3)?;
220        let last_heartbeat: i64 = row.get(4)?;
221        let last_status: Option<String> = row.get(5)?;
222        let last_phase: Option<String> = row.get(6)?;
223        let workflow: Option<String> = row.get(7)?;
224
225        Ok((
226            id,
227            tags_json,
228            max_claims,
229            registered_at,
230            last_heartbeat,
231            last_status,
232            last_phase,
233            workflow,
234        ))
235    });
236
237    match result {
238        Ok((
239            id,
240            tags_json,
241            max_claims,
242            registered_at,
243            last_heartbeat,
244            last_status,
245            last_phase,
246            workflow,
247        )) => {
248            let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
249            Ok(Some(Worker {
250                id,
251                tags,
252                max_claims,
253                registered_at,
254                last_heartbeat,
255                last_status,
256                last_phase,
257                workflow,
258            }))
259        }
260        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
261        Err(e) => Err(e.into()),
262    }
263}
264
265impl Database {
266    /// Create a new task.
267    /// If id is provided, uses it as the task ID; otherwise generates a petname ID.
268    /// If parent_id is provided, creates a 'contains' dependency from parent to this task.
269    /// `title` is the short task title; `description` is optional longer detail.
270    #[allow(clippy::too_many_arguments)]
271    pub fn create_task(
272        &self,
273        id: Option<String>,
274        title: String,
275        description: Option<String>,
276        parent_id: Option<String>,
277        phase: Option<String>,
278        priority: Option<Priority>,
279        points: Option<i32>,
280        time_estimate_ms: Option<i64>,
281        agent_tags_all: Option<Vec<String>>,
282        agent_tags_any: Option<Vec<String>>,
283        tags: Option<Vec<String>>,
284        states_config: &StatesConfig,
285        ids_config: &IdsConfig,
286    ) -> Result<Task> {
287        let task_id = id.unwrap_or_else(|| generate_task_id(ids_config));
288        let now = now_ms();
289        let priority = clamp_priority(priority.unwrap_or(PRIORITY_DEFAULT));
290        let initial_status = &states_config.initial;
291
292        let needed_tags = agent_tags_all.unwrap_or_default();
293        let wanted_tags = agent_tags_any.unwrap_or_default();
294        let tags = tags.unwrap_or_default();
295        let needed_tags_json = serde_json::to_string(&needed_tags)?;
296        let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
297        let tags_json = serde_json::to_string(&tags)?;
298
299        self.with_conn_mut(|conn| {
300            let tx = conn.transaction()?;
301
302            tx.execute(
303                "INSERT INTO tasks (
304                    id, title, description, status, phase, priority,
305                    needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
306                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
307                params![
308                    &task_id,
309                    &title,
310                    &description,
311                    initial_status,
312                    &phase,
313                    priority.to_string(),
314                    needed_tags_json,
315                    wanted_tags_json,
316                    tags_json,
317                    points,
318                    time_estimate_ms,
319                    now,
320                    now,
321                ],
322            )?;
323
324            // Sync tags to junction tables
325            sync_task_tags(&tx, &task_id, &tags)?;
326            sync_needed_tags(&tx, &task_id, &needed_tags)?;
327            sync_wanted_tags(&tx, &task_id, &wanted_tags)?;
328
329            // Create 'contains' dependency if parent_id is provided
330            if let Some(ref pid) = parent_id {
331                Database::add_dependency_internal(&tx, pid, &task_id, "contains")?;
332            }
333
334            // Record initial state
335            record_state_transition(&tx, &task_id, initial_status, None, None, states_config)?;
336
337            tx.commit()?;
338
339            Ok(Task {
340                id: task_id,
341                title,
342                description,
343                status: initial_status.clone(),
344                phase,
345                priority,
346                worker_id: None,
347                claimed_at: None,
348                needed_tags,
349                wanted_tags,
350                tags,
351                points,
352                time_estimate_ms,
353                time_actual_ms: None,
354                started_at: None,
355                completed_at: None,
356                current_thought: None,
357                cost_usd: 0.0,
358                metrics: [0; 8],
359                created_at: now,
360                updated_at: now,
361            })
362        })
363    }
364
365    /// Convenience method to create a task with just a description string.
366    /// Uses the description as both title and description. Useful for tests.
367    pub fn create_task_simple(
368        &self,
369        description: impl Into<String>,
370        states_config: &StatesConfig,
371        ids_config: &IdsConfig,
372    ) -> Result<Task> {
373        let desc = description.into();
374        self.create_task(
375            None,
376            desc.clone(),
377            Some(desc),
378            None,
379            None,
380            None,
381            None,
382            None,
383            None,
384            None,
385            None,
386            states_config,
387            ids_config,
388        )
389    }
390
391    /// Create a task tree from nested input.
392    /// Uses child_type for parent-child dependencies (default: "contains").
393    /// Uses sibling_type for sibling dependencies (default: none/parallel).
394    #[allow(clippy::type_complexity)]
395    pub fn create_task_tree(
396        &self,
397        opts: CreateTreeOptions<'_>,
398    ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>)> {
399        let mut all_ids = Vec::new();
400        let mut phase_warnings = Vec::new();
401        let mut tag_warnings = Vec::new();
402        // Default child_type to "contains" if not specified
403        let child_type = opts.child_type.or_else(|| Some("contains".to_string()));
404
405        self.with_conn_mut(|conn| {
406            let tx = conn.transaction()?;
407            let root_id = create_tree_recursive(
408                &tx,
409                &opts.input,
410                opts.parent_id.as_deref(),
411                None, // no previous sibling for root
412                child_type.as_deref(),
413                opts.sibling_type.as_deref(),
414                &mut all_ids,
415                &mut phase_warnings,
416                &mut tag_warnings,
417                opts.states_config,
418                opts.phases_config,
419                opts.tags_config,
420                opts.ids_config,
421            )?;
422            tx.commit()?;
423            Ok((root_id, all_ids, phase_warnings, tag_warnings))
424        })
425    }
426
427    /// Get a task by ID.
428    pub fn get_task(&self, task_id: &str) -> Result<Option<Task>> {
429        self.with_conn(|conn| {
430            let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
431
432            let result = stmt.query_row(params![task_id], parse_task_row);
433
434            match result {
435                Ok(task) => Ok(Some(task)),
436                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
437                Err(e) => Err(e.into()),
438            }
439        })
440    }
441
442    /// Rename a task's ID, updating all references atomically.
443    ///
444    /// Disables foreign key enforcement, updates every table that references
445    /// `tasks.id` inside a transaction, then re-enables and verifies FK
446    /// integrity.
447    pub fn rename_task(&self, old_id: &str, new_id: &str) -> Result<()> {
448        // Validate new_id
449        if new_id.is_empty() {
450            return Err(anyhow!("new_id must not be empty"));
451        }
452        if new_id.len() > 64 {
453            return Err(anyhow!("new_id must not exceed 64 characters"));
454        }
455
456        self.with_conn_mut(|conn| {
457            // Pre-check: old_id must exist
458            let exists: bool = conn.query_row(
459                "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
460                params![old_id],
461                |row| row.get(0),
462            )?;
463            if !exists {
464                return Err(anyhow!("Task '{}' not found", old_id));
465            }
466
467            // Pre-check: new_id must not already exist
468            let conflict: bool = conn.query_row(
469                "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
470                params![new_id],
471                |row| row.get(0),
472            )?;
473            if conflict {
474                return Err(anyhow!("Task '{}' already exists", new_id));
475            }
476
477            // Disable FK enforcement for the duration of the update
478            conn.execute_batch("PRAGMA foreign_keys = OFF")?;
479
480            let result = (|| -> Result<()> {
481                let tx = conn.transaction()?;
482
483                // Primary table — triggers tasks_fts_update
484                tx.execute(
485                    "UPDATE tasks SET id = ?1 WHERE id = ?2",
486                    params![new_id, old_id],
487                )?;
488
489                // Attachments — triggers attachments_fts_update per row
490                tx.execute(
491                    "UPDATE attachments SET task_id = ?1 WHERE task_id = ?2",
492                    params![new_id, old_id],
493                )?;
494
495                // Dependencies (both columns)
496                tx.execute(
497                    "UPDATE dependencies SET from_task_id = ?1 WHERE from_task_id = ?2",
498                    params![new_id, old_id],
499                )?;
500                tx.execute(
501                    "UPDATE dependencies SET to_task_id = ?1 WHERE to_task_id = ?2",
502                    params![new_id, old_id],
503                )?;
504
505                // File locks
506                tx.execute(
507                    "UPDATE file_locks SET task_id = ?1 WHERE task_id = ?2",
508                    params![new_id, old_id],
509                )?;
510
511                // Tag junction tables
512                tx.execute(
513                    "UPDATE task_tags SET task_id = ?1 WHERE task_id = ?2",
514                    params![new_id, old_id],
515                )?;
516                tx.execute(
517                    "UPDATE task_needed_tags SET task_id = ?1 WHERE task_id = ?2",
518                    params![new_id, old_id],
519                )?;
520                tx.execute(
521                    "UPDATE task_wanted_tags SET task_id = ?1 WHERE task_id = ?2",
522                    params![new_id, old_id],
523                )?;
524
525                // Sequence table
526                tx.execute(
527                    "UPDATE task_sequence SET task_id = ?1 WHERE task_id = ?2",
528                    params![new_id, old_id],
529                )?;
530
531                tx.commit()?;
532                Ok(())
533            })();
534
535            // Re-enable FK enforcement regardless of success
536            conn.execute_batch("PRAGMA foreign_keys = ON")?;
537
538            // Propagate any error from the transaction
539            result?;
540
541            // Verify FK integrity
542            let mut stmt = conn.prepare("PRAGMA foreign_key_check")?;
543            let violations: Vec<String> = stmt
544                .query_map([], |row| {
545                    let table: String = row.get(0)?;
546                    Ok(table)
547                })?
548                .filter_map(|r| r.ok())
549                .collect();
550
551            if !violations.is_empty() {
552                return Err(anyhow!(
553                    "Foreign key violations after rename in tables: {:?}",
554                    violations
555                ));
556            }
557
558            Ok(())
559        })
560    }
561
562    /// Get a task with all its children (tree).
563    pub fn get_task_tree(&self, task_id: &str) -> Result<Option<TaskTree>> {
564        let task = self.get_task(task_id)?;
565        match task {
566            None => Ok(None),
567            Some(task) => {
568                let children = self.get_children_recursive(&task.id)?;
569                Ok(Some(TaskTree { task, children }))
570            }
571        }
572    }
573
574    /// Get children recursively.
575    fn get_children_recursive(&self, parent_id: &str) -> Result<Vec<TaskTree>> {
576        let children = self.get_children(parent_id)?;
577        let mut result = Vec::new();
578
579        for child in children {
580            let child_children = self.get_children_recursive(&child.id)?;
581            result.push(TaskTree {
582                task: child,
583                children: child_children,
584            });
585        }
586
587        Ok(result)
588    }
589
590    /// Get direct children of a task (via 'contains' dependency).
591    pub fn get_children(&self, parent_id: &str) -> Result<Vec<Task>> {
592        self.with_conn(|conn| {
593            let mut stmt = conn.prepare(
594                "SELECT t.* FROM tasks t
595                 INNER JOIN dependencies d ON t.id = d.to_task_id
596                 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
597                 ORDER BY t.created_at",
598            )?;
599
600            let tasks = stmt
601                .query_map(params![parent_id], parse_task_row)?
602                .filter_map(|r| r.ok())
603                .collect();
604
605            Ok(tasks)
606        })
607    }
608
609    /// Update a task.
610    #[allow(clippy::too_many_arguments)]
611    pub fn update_task(
612        &self,
613        task_id: &str,
614        title: Option<String>,
615        description: Option<Option<String>>,
616        status: Option<String>,
617        priority: Option<Priority>,
618        points: Option<Option<i32>>,
619        tags: Option<Vec<String>>,
620        states_config: &StatesConfig,
621    ) -> Result<Task> {
622        let now = now_ms();
623
624        self.with_conn(|conn| {
625            let task =
626                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
627
628            let new_title = title.unwrap_or(task.title.clone());
629            let new_description = description.unwrap_or(task.description.clone());
630            let new_status = status.unwrap_or(task.status.clone());
631            let new_priority = priority.unwrap_or(task.priority);
632            let new_points = points.unwrap_or(task.points);
633            let new_tags = tags.unwrap_or(task.tags.clone());
634
635            // Validate the new status exists
636            if !states_config.is_valid_state(&new_status) {
637                return Err(anyhow!(
638                    "Invalid state '{}'. Valid states: {:?}",
639                    new_status,
640                    states_config.state_names()
641                ));
642            }
643
644            // Validate state transition if status changed
645            if task.status != new_status
646                && !states_config.is_valid_transition(&task.status, &new_status)
647            {
648                let exits = states_config.get_exits(&task.status);
649                return Err(anyhow!(
650                    "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
651                    task.status,
652                    new_status,
653                    exits
654                ));
655            }
656
657            // Handle status transitions for timestamps
658            // Set started_at when first entering a timed state
659            let started_at =
660                if task.started_at.is_none() && states_config.is_timed_state(&new_status) {
661                    Some(now)
662                } else {
663                    task.started_at
664                };
665
666            // Set completed_at when entering a completed state (terminal or with reopen capability)
667            let completed_at = if new_status == "completed" {
668                Some(now)
669            } else {
670                task.completed_at
671            };
672
673            // Record state transition if status changed (handles time accumulation)
674            if task.status != new_status {
675                record_state_transition(
676                    conn,
677                    task_id,
678                    &new_status,
679                    task.worker_id.as_deref(),
680                    None,
681                    states_config,
682                )?;
683            }
684
685            conn.execute(
686                "UPDATE tasks SET
687                    title = ?1, description = ?2, status = ?3, priority = ?4,
688                    points = ?5, started_at = ?6, completed_at = ?7, updated_at = ?8,
689                    tags = ?9
690                WHERE id = ?10",
691                params![
692                    new_title,
693                    new_description,
694                    new_status,
695                    new_priority.to_string(),
696                    new_points,
697                    started_at,
698                    completed_at,
699                    now,
700                    serde_json::to_string(&new_tags)?,
701                    task_id,
702                ],
703            )?;
704
705            Ok(Task {
706                id: task_id.to_string(),
707                title: new_title,
708                description: new_description,
709                status: new_status,
710                priority: new_priority,
711                points: new_points,
712                tags: new_tags,
713                started_at,
714                completed_at,
715                updated_at: now,
716                ..task
717            })
718        })
719    }
720
721    /// Update a task with unified claim/release logic.
722    /// - Transition to timed state = CLAIM (set owner, validate tags, check limit)
723    /// - Transition from timed to non-timed = RELEASE (clear owner)
724    /// - Transition to terminal = COMPLETE (check children, release file locks)
725    /// - With assignee = ASSIGN (set owner to assignee, transition to 'assigned' state)
726    /// - Only the owner can update a claimed task (unless force=true)
727    ///
728    /// Returns (task, unblocked, auto_advanced):
729    /// - task: The updated task
730    /// - unblocked: Task IDs that are now ready (all dependencies satisfied)
731    /// - auto_advanced: Subset of unblocked that were actually transitioned
732    #[allow(clippy::too_many_arguments)]
733    pub fn update_task_unified(
734        &self,
735        task_id: &str,
736        agent_id: &str,
737        assignee: Option<&str>,
738        title: Option<String>,
739        description: Option<Option<String>>,
740        status: Option<String>,
741        phase: Option<String>,
742        priority: Option<Priority>,
743        points: Option<Option<i32>>,
744        tags: Option<Vec<String>>,
745        needed_tags: Option<Vec<String>>,
746        wanted_tags: Option<Vec<String>>,
747        time_estimate_ms: Option<i64>,
748        reason: Option<String>,
749        force: bool,
750        states_config: &StatesConfig,
751        deps_config: &DependenciesConfig,
752        auto_advance: &AutoAdvanceConfig,
753    ) -> Result<(Task, Vec<String>, Vec<String>)> {
754        let now = now_ms();
755
756        self.with_conn_mut(|conn| {
757            let tx = conn.transaction()?;
758
759            let task =
760                get_task_internal(&tx, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
761
762            // Owner-only validation: if task is claimed, only owner can update (unless force)
763            if let Some(ref current_owner) = task.worker_id
764                && current_owner != agent_id && !force {
765                    return Err(anyhow!(
766                        "Task is claimed by agent '{}'. Only the owner can update claimed tasks (use force=true to override)",
767                        current_owner
768                    ));
769                }
770
771            let new_title = title.unwrap_or(task.title.clone());
772            let new_description = description.unwrap_or(task.description.clone());
773            // If assignee is set but no explicit status, default to 'assigned' state
774            let new_status = if assignee.is_some() && status.is_none() {
775                "assigned".to_string()
776            } else {
777                status.unwrap_or(task.status.clone())
778            };
779            let new_priority = priority.unwrap_or(task.priority);
780            let new_points = points.unwrap_or(task.points);
781            let new_tags = tags.unwrap_or(task.tags.clone());
782            let new_needed_tags = needed_tags.unwrap_or(task.needed_tags.clone());
783            let new_wanted_tags = wanted_tags.unwrap_or(task.wanted_tags.clone());
784            let new_time_estimate_ms = time_estimate_ms.or(task.time_estimate_ms);
785            let new_phase = phase.or(task.phase.clone());
786
787            // Validate the new status exists
788            if !states_config.is_valid_state(&new_status) {
789                return Err(anyhow!(
790                    "Invalid state '{}'. Valid states: {:?}",
791                    new_status,
792                    states_config.state_names()
793                ));
794            }
795
796            // Validate state transition if status changed
797            if task.status != new_status
798                && !states_config.is_valid_transition(&task.status, &new_status) {
799                    let exits = states_config.get_exits(&task.status);
800                    return Err(anyhow!(
801                        "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
802                        task.status,
803                        new_status,
804                        exits
805                    ));
806                }
807
808            // Determine ownership changes based on state transition
809            let new_is_timed = states_config.is_timed_state(&new_status);
810            let new_is_terminal = states_config.is_terminal_state(&new_status);
811            let current_owner = task.worker_id.as_deref();
812            let is_owned_by_agent = current_owner == Some(agent_id);
813            let is_owned_by_other = current_owner.is_some() && !is_owned_by_agent;
814
815            let mut new_owner: Option<String> = task.worker_id.clone();
816            let mut new_claimed_at: Option<i64> = task.claimed_at;
817
818            // ASSIGN: Push coordination - coordinator assigns task to another agent
819            // Sets owner without starting the timer (assigned state is untimed)
820            if let Some(target_agent) = assignee {
821                // Verify task is not already claimed (unless force)
822                if is_owned_by_other && !force {
823                    return Err(anyhow!(
824                        "Task is already claimed by agent '{}'. Use force=true to reassign.",
825                        current_owner.unwrap()
826                    ));
827                }
828
829                // Verify the assignee exists
830                let target = get_worker_internal(&tx, target_agent)?
831                    .ok_or_else(|| anyhow!("Assignee agent '{}' not found", target_agent))?;
832
833                // Check tag affinity for the assignee
834                if !task.needed_tags.is_empty() {
835                    for needed in &task.needed_tags {
836                        if !target.tags.contains(needed) {
837                            return Err(anyhow!(
838                                "Assignee '{}' missing required tag: {}",
839                                target_agent,
840                                needed
841                            ));
842                        }
843                    }
844                }
845
846                if !task.wanted_tags.is_empty() {
847                    let has_any = task
848                        .wanted_tags
849                        .iter()
850                        .any(|wanted| target.tags.contains(wanted));
851                    if !has_any {
852                        return Err(anyhow!(
853                            "Assignee '{}' has none of the wanted tags: {:?}",
854                            target_agent,
855                            task.wanted_tags
856                        ));
857                    }
858                }
859
860                // Set ownership to the assignee
861                new_owner = Some(target_agent.to_string());
862                new_claimed_at = Some(now);
863            }
864
865            // CLAIM: Transitioning to a timed state and need to take ownership
866            // This handles: non-timed -> timed, OR timed (other owner) -> timed (force claim)
867            if new_is_timed && !is_owned_by_agent {
868                // Already claimed by someone else?
869                if is_owned_by_other && !force {
870                    return Err(anyhow!(
871                        "Task is already claimed by agent '{}'",
872                        current_owner.unwrap()
873                    ));
874                }
875
876                // Check for unsatisfied blocking dependencies (skip if force)
877                if !force {
878                    let unsatisfied_blockers = super::deps::get_unsatisfied_start_blockers_in_tx(
879                        &tx,
880                        task_id,
881                        states_config,
882                        deps_config,
883                    )?;
884                    if !unsatisfied_blockers.is_empty() {
885                        // Return structured error with blocking task IDs so clients
886                        // can monitor them and retry when they complete
887                        return Err(ToolError::deps_not_satisfied(&unsatisfied_blockers).into());
888                    }
889                }
890
891                // Get the agent
892                let agent = get_worker_internal(&tx, agent_id)?
893                    .ok_or_else(|| anyhow!("Agent not found"))?;
894
895                // Check tag affinity - needed_tags (AND - must have ALL)
896                if !task.needed_tags.is_empty() {
897                    for needed in &task.needed_tags {
898                        if !agent.tags.contains(needed) {
899                            return Err(anyhow!("Agent missing required tag: {}", needed));
900                        }
901                    }
902                }
903
904                // Check tag affinity - wanted_tags (OR - must have AT LEAST ONE)
905                if !task.wanted_tags.is_empty() {
906                    let has_any = task
907                        .wanted_tags
908                        .iter()
909                        .any(|wanted| agent.tags.contains(wanted));
910                    if !has_any {
911                        return Err(anyhow!("Agent has none of the wanted tags"));
912                    }
913                }
914
915                // Set ownership
916                new_owner = Some(agent_id.to_string());
917                new_claimed_at = Some(now);
918
919                // Refresh agent heartbeat
920                tx.execute(
921                    "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
922                    params![now, agent_id],
923                )?;
924            }
925
926            // RELEASE: Transitioning to non-timed state (but not terminal)
927            if !new_is_timed && !new_is_terminal && task.worker_id.is_some() {
928                // Verify ownership (unless force)
929                if is_owned_by_other && !force {
930                    return Err(anyhow!("Task is not owned by this agent"));
931                }
932
933                // Clear ownership
934                new_owner = None;
935                new_claimed_at = None;
936            }
937
938            // COMPLETE: Transition to terminal state
939            if new_is_terminal {
940                // Verify ownership if task was claimed (unless force)
941                if let Some(ref current_owner) = task.worker_id
942                    && current_owner != agent_id && !force {
943                        return Err(anyhow!("Task is not owned by this agent"));
944                    }
945
946                // Check for incomplete children (via 'contains' dependencies)
947                let incomplete_children: i32 = tx.query_row(
948                    "SELECT COUNT(*) FROM dependencies d
949                     INNER JOIN tasks child ON d.to_task_id = child.id
950                     WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
951                     AND child.status IN (SELECT value FROM json_each(?2))",
952                    params![
953                        task_id,
954                        serde_json::to_string(&states_config.blocking_states)?
955                    ],
956                    |row| row.get(0),
957                )?;
958
959                if incomplete_children > 0 {
960                    return Err(anyhow!(
961                        "Cannot complete task: {} child task(s) are not complete",
962                        incomplete_children
963                    ));
964                }
965
966                // Clear ownership
967                new_owner = None;
968                new_claimed_at = None;
969
970                // Release file locks associated with this task (for auto-cleanup)
971                tx.execute(
972                    "DELETE FROM file_locks WHERE task_id = ?1",
973                    params![task_id],
974                )?;
975            }
976
977            // Handle timestamps
978            let started_at =
979                if task.started_at.is_none() && new_is_timed {
980                    Some(now)
981                } else {
982                    task.started_at
983                };
984
985            // Set completed_at when entering completed status (even if it has reopen exits)
986            let completed_at = if new_status == "completed" {
987                Some(now)
988            } else {
989                task.completed_at
990            };
991
992            // Record state transition if status changed (with reason for audit)
993            let status_changed = task.status != new_status;
994            if status_changed {
995                record_state_transition(
996                    &tx,
997                    task_id,
998                    &new_status,
999                    new_owner.as_deref(),
1000                    reason.as_deref(),
1001                    states_config,
1002                )?;
1003            }
1004
1005            // Record phase transition if phase changed
1006            let phase_changed = task.phase != new_phase;
1007            if phase_changed {
1008                super::state_transitions::record_phase_transition(
1009                    &tx,
1010                    task_id,
1011                    new_phase.as_deref().unwrap_or(""),
1012                    Some(agent_id),
1013                    reason.as_deref(),
1014                )?;
1015            }
1016
1017            tx.execute(
1018                "UPDATE tasks SET
1019                    title = ?1, description = ?2, status = ?3, phase = ?4, priority = ?5,
1020                    points = ?6, started_at = ?7, completed_at = ?8, updated_at = ?9,
1021                    tags = ?10, worker_id = ?11, claimed_at = ?12,
1022                    needed_tags = ?13, wanted_tags = ?14, time_estimate_ms = ?15
1023                WHERE id = ?16",
1024                params![
1025                    new_title,
1026                    new_description,
1027                    new_status,
1028                    new_phase,
1029                    new_priority.to_string(),
1030                    new_points,
1031                    started_at,
1032                    completed_at,
1033                    now,
1034                    serde_json::to_string(&new_tags)?,
1035                    new_owner,
1036                    new_claimed_at,
1037                    serde_json::to_string(&new_needed_tags)?,
1038                    serde_json::to_string(&new_wanted_tags)?,
1039                    new_time_estimate_ms,
1040                    task_id,
1041                ],
1042            )?;
1043
1044            // Sync tags to junction tables if changed
1045            if new_tags != task.tags {
1046                sync_task_tags(&tx, task_id, &new_tags)?;
1047            }
1048            if new_needed_tags != task.needed_tags {
1049                sync_needed_tags(&tx, task_id, &new_needed_tags)?;
1050            }
1051            if new_wanted_tags != task.wanted_tags {
1052                sync_wanted_tags(&tx, task_id, &new_wanted_tags)?;
1053            }
1054
1055            // Check for unblocked tasks if this task transitioned FROM blocking TO non-blocking
1056            let (unblocked, auto_advanced) = if status_changed {
1057                let was_blocking = states_config.is_blocking_state(&task.status);
1058                let is_blocking = states_config.is_blocking_state(&new_status);
1059
1060                if was_blocking && !is_blocking {
1061                    super::deps::propagate_unblock_effects(
1062                        &tx,
1063                        task_id,
1064                        Some(agent_id),
1065                        states_config,
1066                        deps_config,
1067                        auto_advance,
1068                    )?
1069                } else {
1070                    (vec![], vec![])
1071                }
1072            } else {
1073                (vec![], vec![])
1074            };
1075
1076            tx.commit()?;
1077
1078            Ok((Task {
1079                id: task_id.to_string(),
1080                title: new_title,
1081                description: new_description,
1082                status: new_status,
1083                phase: new_phase,
1084                priority: new_priority,
1085                points: new_points,
1086                tags: new_tags,
1087                needed_tags: new_needed_tags,
1088                wanted_tags: new_wanted_tags,
1089                time_estimate_ms: new_time_estimate_ms,
1090                started_at,
1091                completed_at,
1092                updated_at: now,
1093                worker_id: new_owner,
1094                claimed_at: new_claimed_at,
1095                ..task
1096            }, unblocked, auto_advanced))
1097        })
1098    }
1099
1100    /// Delete a task (soft delete by default, hard delete with obliterate=true).
1101    ///
1102    /// - `worker_id`: The worker attempting to delete (required for ownership check)
1103    /// - `cascade`: Whether to delete children (default: false)
1104    /// - `reason`: Optional reason for deletion
1105    /// - `obliterate`: If true, permanently deletes the task; if false (default), soft deletes
1106    /// - `force`: If true, allows deletion even if owned by another worker
1107    pub fn delete_task(
1108        &self,
1109        task_id: &str,
1110        worker_id: &str,
1111        cascade: bool,
1112        reason: Option<String>,
1113        obliterate: bool,
1114        force: bool,
1115    ) -> Result<()> {
1116        let now = now_ms();
1117
1118        self.with_conn_mut(|conn| {
1119            let tx = conn.transaction()?;
1120
1121            // Get the task to check ownership
1122            let task = get_task_internal(&tx, task_id)?
1123                .ok_or_else(|| anyhow!("Task not found"))?;
1124
1125            // Check ownership - reject if claimed by another worker (unless force)
1126            if let Some(ref owner) = task.worker_id
1127                && owner != worker_id && !force {
1128                    return Err(anyhow!(
1129                        "Task is claimed by worker '{}'. Use force=true to override.",
1130                        owner
1131                    ));
1132                }
1133
1134            if obliterate {
1135                // Hard delete - permanently remove from database
1136                if cascade {
1137                    // Find all descendants using recursive CTE and delete them
1138                    // The CTE finds all tasks reachable via 'contains' dependencies
1139                    tx.execute(
1140                        "WITH RECURSIVE descendants AS (
1141                            SELECT ?1 AS id
1142                            UNION ALL
1143                            SELECT dep.to_task_id FROM dependencies dep
1144                            INNER JOIN descendants d ON dep.from_task_id = d.id
1145                            WHERE dep.dep_type = 'contains'
1146                        )
1147                        DELETE FROM tasks WHERE id IN (SELECT id FROM descendants)",
1148                        params![task_id],
1149                    )?;
1150                } else {
1151                    // Check for children via dependencies
1152                    let child_count: i32 = tx.query_row(
1153                        "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
1154                        params![task_id],
1155                        |row| row.get(0),
1156                    )?;
1157
1158                    if child_count > 0 {
1159                        return Err(anyhow!("Task has children; use cascade=true to delete"));
1160                    }
1161
1162                    tx.execute("DELETE FROM tasks WHERE id = ?1", params![task_id])?;
1163                }
1164            } else {
1165                // Soft delete - set deleted_at, deleted_by, deleted_reason
1166                if cascade {
1167                    // Soft delete all descendants
1168                    tx.execute(
1169                        "WITH RECURSIVE descendants AS (
1170                            SELECT ?1 AS id
1171                            UNION ALL
1172                            SELECT dep.to_task_id FROM dependencies dep
1173                            INNER JOIN descendants d ON dep.from_task_id = d.id
1174                            WHERE dep.dep_type = 'contains'
1175                        )
1176                        UPDATE tasks SET deleted_at = ?2, deleted_by = ?3, deleted_reason = ?4, updated_at = ?2
1177                        WHERE id IN (SELECT id FROM descendants) AND deleted_at IS NULL",
1178                        params![task_id, now, worker_id, reason],
1179                    )?;
1180                } else {
1181                    // Check for children via dependencies
1182                    let child_count: i32 = tx.query_row(
1183                        "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
1184                        params![task_id],
1185                        |row| row.get(0),
1186                    )?;
1187
1188                    if child_count > 0 {
1189                        return Err(anyhow!("Task has children; use cascade=true to delete"));
1190                    }
1191
1192                    tx.execute(
1193                        "UPDATE tasks SET deleted_at = ?1, deleted_by = ?2, deleted_reason = ?3, updated_at = ?1 WHERE id = ?4",
1194                        params![now, worker_id, reason, task_id],
1195                    )?;
1196                }
1197            }
1198
1199            tx.commit()?;
1200            Ok(())
1201        })
1202    }
1203
1204    /// List tasks with optional filters.
1205    /// Returns full Task objects. Excludes soft-deleted tasks.
1206    pub fn list_tasks(&self, query: ListTasksQuery<'_>) -> Result<Vec<Task>> {
1207        let ListTasksQuery {
1208            status,
1209            phase,
1210            owner,
1211            parent_id,
1212            limit,
1213            offset,
1214            sort_by,
1215            sort_order,
1216        } = query;
1217        self.with_conn(|conn| {
1218            let mut sql = String::from(
1219                "SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL",
1220            );
1221            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1222
1223            if let Some(s) = status {
1224                sql.push_str(" AND t.status = ?");
1225                params_vec.push(Box::new(s.to_string()));
1226            }
1227
1228            if let Some(p) = phase {
1229                sql.push_str(" AND t.phase = ?");
1230                params_vec.push(Box::new(p.to_string()));
1231            }
1232
1233            if let Some(o) = owner {
1234                sql.push_str(" AND t.worker_id = ?");
1235                params_vec.push(Box::new(o.to_string()));
1236            }
1237
1238            // Handle parent filtering via dependencies table
1239            if let Some(p) = parent_id {
1240                match p {
1241                    Some(pid) => {
1242                        sql.push_str(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ? AND dep_type = 'contains')");
1243                        params_vec.push(Box::new(pid.to_string()));
1244                    }
1245                    None => {
1246                        // Root tasks: not contained by any other task
1247                        sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
1248                    }
1249                }
1250            }
1251
1252            // Build ORDER BY clause
1253            let order_clause = build_order_clause(sort_by, sort_order);
1254            sql.push_str(&format!(" ORDER BY {}", order_clause));
1255
1256            if let Some(l) = limit {
1257                sql.push_str(&format!(" LIMIT {}", l));
1258            }
1259
1260            if offset > 0 {
1261                sql.push_str(&format!(" OFFSET {}", offset));
1262            }
1263
1264            let params_refs: Vec<&dyn rusqlite::ToSql> =
1265                params_vec.iter().map(|b| b.as_ref()).collect();
1266
1267            let mut stmt = conn.prepare(&sql)?;
1268            let tasks = stmt
1269                .query_map(params_refs.as_slice(), parse_task_row)?
1270                .filter_map(|r| r.ok())
1271                .collect();
1272
1273            Ok(tasks)
1274        })
1275    }
1276
1277    /// Set the current thought for tasks owned by an agent.
1278    pub fn set_thought(
1279        &self,
1280        agent_id: &str,
1281        thought: Option<String>,
1282        task_ids: Option<Vec<String>>,
1283    ) -> Result<i32> {
1284        let now = now_ms();
1285
1286        self.with_conn(|conn| {
1287            let updated = if let Some(ids) = task_ids {
1288                let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
1289                let sql = format!(
1290                    "UPDATE tasks SET current_thought = ?, updated_at = ?
1291                     WHERE worker_id = ? AND id IN ({})",
1292                    placeholders.join(", ")
1293                );
1294
1295                let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1296                params_vec.push(Box::new(thought.clone()));
1297                params_vec.push(Box::new(now));
1298                params_vec.push(Box::new(agent_id.to_string()));
1299                for id in &ids {
1300                    params_vec.push(Box::new(id.clone()));
1301                }
1302
1303                let params_refs: Vec<&dyn rusqlite::ToSql> =
1304                    params_vec.iter().map(|b| b.as_ref()).collect();
1305                conn.execute(&sql, params_refs.as_slice())?
1306            } else {
1307                conn.execute(
1308                    "UPDATE tasks SET current_thought = ?, updated_at = ? WHERE worker_id = ?",
1309                    params![thought, now, agent_id],
1310                )?
1311            };
1312
1313            Ok(updated as i32)
1314        })
1315    }
1316
1317    /// Log time for a task.
1318    pub fn log_time(&self, task_id: &str, duration_ms: i64) -> Result<i64> {
1319        let now = now_ms();
1320
1321        self.with_conn(|conn| {
1322            conn.execute(
1323                "UPDATE tasks SET time_actual_ms = COALESCE(time_actual_ms, 0) + ?1, updated_at = ?2
1324                 WHERE id = ?3",
1325                params![duration_ms, now, task_id],
1326            )?;
1327
1328            let total: i64 = conn.query_row(
1329                "SELECT COALESCE(time_actual_ms, 0) FROM tasks WHERE id = ?1",
1330                params![task_id],
1331                |row| row.get(0),
1332            )?;
1333
1334            Ok(total)
1335        })
1336    }
1337
1338    /// Log metrics and cost for a task.
1339    /// Values in the metrics array are aggregated (added) to existing values.
1340    pub fn log_metrics(
1341        &self,
1342        task_id: &str,
1343        cost_usd: Option<f64>,
1344        values: &[i64],
1345    ) -> Result<Task> {
1346        let now = now_ms();
1347
1348        self.with_conn(|conn| {
1349            let task =
1350                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1351
1352            // Aggregate metrics (add new values to existing)
1353            let mut new_metrics = task.metrics;
1354            for (i, &val) in values.iter().take(8).enumerate() {
1355                new_metrics[i] += val;
1356            }
1357
1358            let new_cost_usd = task.cost_usd + cost_usd.unwrap_or(0.0);
1359
1360            conn.execute(
1361                "UPDATE tasks SET
1362                    metric_0 = ?1, metric_1 = ?2, metric_2 = ?3, metric_3 = ?4,
1363                    metric_4 = ?5, metric_5 = ?6, metric_6 = ?7, metric_7 = ?8,
1364                    cost_usd = ?9, updated_at = ?10
1365                WHERE id = ?11",
1366                params![
1367                    new_metrics[0],
1368                    new_metrics[1],
1369                    new_metrics[2],
1370                    new_metrics[3],
1371                    new_metrics[4],
1372                    new_metrics[5],
1373                    new_metrics[6],
1374                    new_metrics[7],
1375                    new_cost_usd,
1376                    now,
1377                    task_id,
1378                ],
1379            )?;
1380
1381            Ok(Task {
1382                cost_usd: new_cost_usd,
1383                metrics: new_metrics,
1384                updated_at: now,
1385                ..task
1386            })
1387        })
1388    }
1389
1390    /// Claim a task for an agent.
1391    /// Uses the first timed state (typically "working") as the claiming state.
1392    pub fn claim_task(
1393        &self,
1394        task_id: &str,
1395        agent_id: &str,
1396        states_config: &StatesConfig,
1397    ) -> Result<Task> {
1398        let now = now_ms();
1399
1400        // Find the first timed state to use for claiming (typically "working")
1401        let claim_status = states_config
1402            .definitions
1403            .iter()
1404            .find(|(_, def)| def.timed)
1405            .map(|(name, _)| name.as_str())
1406            .unwrap_or("working");
1407
1408        self.with_conn(|conn| {
1409            // Get the task (using internal helper to avoid deadlock)
1410            let task =
1411                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1412
1413            // Check if already claimed
1414            if task.worker_id.is_some() {
1415                return Err(anyhow!("Task is already claimed"));
1416            }
1417
1418            // Validate state transition
1419            if !states_config.is_valid_transition(&task.status, claim_status) {
1420                let exits = states_config.get_exits(&task.status);
1421                return Err(anyhow!(
1422                    "Cannot claim task in state '{}'. Allowed transitions: {:?}",
1423                    task.status,
1424                    exits
1425                ));
1426            }
1427
1428            // Get the agent (using internal helper to avoid deadlock)
1429            let agent =
1430                get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1431
1432            // Check tag affinity - needed_tags (AND - must have ALL)
1433            if !task.needed_tags.is_empty() {
1434                for needed in &task.needed_tags {
1435                    if !agent.tags.contains(needed) {
1436                        return Err(anyhow!("Agent missing required tag: {}", needed));
1437                    }
1438                }
1439            }
1440
1441            // Check tag affinity - wanted_tags (OR - must have AT LEAST ONE)
1442            if !task.wanted_tags.is_empty() {
1443                let has_any = task
1444                    .wanted_tags
1445                    .iter()
1446                    .any(|wanted| agent.tags.contains(wanted));
1447                if !has_any {
1448                    return Err(anyhow!("Agent has none of the wanted tags"));
1449                }
1450            }
1451
1452            conn.execute(
1453                "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = ?4, updated_at = ?5
1454                 WHERE id = ?6",
1455                params![agent_id, now, claim_status, now, now, task_id,],
1456            )?;
1457
1458            // Record state transition (accumulates time if coming from timed state)
1459            record_state_transition(
1460                conn,
1461                task_id,
1462                claim_status,
1463                Some(agent_id),
1464                None,
1465                states_config,
1466            )?;
1467
1468            // Refresh agent heartbeat
1469            conn.execute(
1470                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1471                params![now, agent_id],
1472            )?;
1473
1474            Ok(Task {
1475                worker_id: Some(agent_id.to_string()),
1476                claimed_at: Some(now),
1477                status: claim_status.to_string(),
1478                started_at: Some(now),
1479                updated_at: now,
1480                ..task
1481            })
1482        })
1483    }
1484
1485    /// Release a task claim.
1486    pub fn release_task(
1487        &self,
1488        task_id: &str,
1489        agent_id: &str,
1490        states_config: &StatesConfig,
1491    ) -> Result<()> {
1492        let now = now_ms();
1493        let release_status = &states_config.initial;
1494
1495        self.with_conn(|conn| {
1496            let task =
1497                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1498
1499            if task.worker_id.as_deref() != Some(agent_id) {
1500                return Err(anyhow!("Task is not owned by this agent"));
1501            }
1502
1503            // Record state transition (accumulates time if coming from timed state)
1504            record_state_transition(
1505                conn,
1506                task_id,
1507                release_status,
1508                Some(agent_id),
1509                None,
1510                states_config,
1511            )?;
1512
1513            conn.execute(
1514                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1515                 WHERE id = ?3",
1516                params![release_status, now, task_id],
1517            )?;
1518
1519            Ok(())
1520        })
1521    }
1522
1523    /// Force release a task regardless of owner.
1524    pub fn force_release(&self, task_id: &str, states_config: &StatesConfig) -> Result<()> {
1525        let now = now_ms();
1526        let release_status = &states_config.initial;
1527
1528        self.with_conn(|conn| {
1529            let task =
1530                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1531
1532            // Record state transition (accumulates time if coming from timed state)
1533            record_state_transition(
1534                conn,
1535                task_id,
1536                release_status,
1537                task.worker_id.as_deref(),
1538                None,
1539                states_config,
1540            )?;
1541
1542            conn.execute(
1543                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1544                 WHERE id = ?3",
1545                params![release_status, now, task_id],
1546            )?;
1547
1548            Ok(())
1549        })
1550    }
1551
1552    /// Force claim a task even if owned by another agent.
1553    pub fn force_claim_task(
1554        &self,
1555        task_id: &str,
1556        agent_id: &str,
1557        states_config: &StatesConfig,
1558    ) -> Result<Task> {
1559        let now = now_ms();
1560
1561        // Find the first timed state to use for claiming (typically "working")
1562        let claim_status = states_config
1563            .definitions
1564            .iter()
1565            .find(|(_, def)| def.timed)
1566            .map(|(name, _)| name.as_str())
1567            .unwrap_or("working");
1568
1569        self.with_conn(|conn| {
1570            // Get the task
1571            let task =
1572                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1573
1574            // Get the agent
1575            let agent =
1576                get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1577
1578            // Check tag affinity - needed_tags (AND)
1579            if !task.needed_tags.is_empty() {
1580                for needed in &task.needed_tags {
1581                    if !agent.tags.contains(needed) {
1582                        return Err(anyhow!("Agent missing required tag: {}", needed));
1583                    }
1584                }
1585            }
1586
1587            // Check tag affinity - wanted_tags (OR)
1588            if !task.wanted_tags.is_empty() {
1589                let has_any = task
1590                    .wanted_tags
1591                    .iter()
1592                    .any(|wanted| agent.tags.contains(wanted));
1593                if !has_any {
1594                    return Err(anyhow!("Agent has none of the wanted tags"));
1595                }
1596            }
1597
1598            conn.execute(
1599                "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = COALESCE(started_at, ?4), updated_at = ?5
1600                 WHERE id = ?6",
1601                params![agent_id, now, claim_status, now, now, task_id,],
1602            )?;
1603
1604            // Record state transition (accumulates time if coming from timed state)
1605            record_state_transition(
1606                conn,
1607                task_id,
1608                claim_status,
1609                Some(agent_id),
1610                None,
1611                states_config,
1612            )?;
1613
1614            // Refresh agent heartbeat
1615            conn.execute(
1616                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1617                params![now, agent_id],
1618            )?;
1619
1620            Ok(Task {
1621                worker_id: Some(agent_id.to_string()),
1622                claimed_at: Some(now),
1623                status: claim_status.to_string(),
1624                started_at: task.started_at.or(Some(now)),
1625                updated_at: now,
1626                ..task
1627            })
1628        })
1629    }
1630
1631    /// Release a task claim with a specified state.
1632    pub fn release_task_with_state(
1633        &self,
1634        task_id: &str,
1635        agent_id: &str,
1636        state: &str,
1637        states_config: &StatesConfig,
1638    ) -> Result<()> {
1639        let now = now_ms();
1640
1641        self.with_conn(|conn| {
1642            let task =
1643                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1644
1645            if task.worker_id.as_deref() != Some(agent_id) {
1646                return Err(anyhow!("Task is not owned by this agent"));
1647            }
1648
1649            // Validate state exists
1650            if !states_config.is_valid_state(state) {
1651                return Err(anyhow!(
1652                    "Invalid state '{}'. Valid states: {:?}",
1653                    state,
1654                    states_config.state_names()
1655                ));
1656            }
1657
1658            // Validate transition
1659            if !states_config.is_valid_transition(&task.status, state) {
1660                let exits = states_config.get_exits(&task.status);
1661                return Err(anyhow!(
1662                    "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
1663                    task.status,
1664                    state,
1665                    exits
1666                ));
1667            }
1668
1669            // Set completed_at when entering completed status (even if it has reopen exits)
1670            let completed_at = if state == "completed" {
1671                Some(now)
1672            } else {
1673                None
1674            };
1675
1676            // Record state transition (accumulates time if coming from timed state)
1677            record_state_transition(conn, task_id, state, Some(agent_id), None, states_config)?;
1678
1679            conn.execute(
1680                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, completed_at = COALESCE(?2, completed_at), updated_at = ?3
1681                 WHERE id = ?4",
1682                params![state, completed_at, now, task_id],
1683            )?;
1684
1685            Ok(())
1686        })
1687    }
1688
1689    /// Force release stale claims.
1690    pub fn force_release_stale(
1691        &self,
1692        timeout_seconds: i64,
1693        states_config: &StatesConfig,
1694    ) -> Result<i32> {
1695        let now = now_ms();
1696        let cutoff = now - (timeout_seconds * 1000);
1697        let release_status = &states_config.initial;
1698
1699        self.with_conn(|conn| {
1700            let updated = conn.execute(
1701                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1702                 WHERE claimed_at < ?3 AND worker_id IS NOT NULL",
1703                params![release_status, now, cutoff],
1704            )?;
1705
1706            Ok(updated as i32)
1707        })
1708    }
1709
1710    /// Complete a task and release file locks held by the agent.
1711    /// Uses "completed" state by default, which should be a terminal state.
1712    /// Checks that all children (via 'contains' dependencies) are complete.
1713    pub fn complete_task(
1714        &self,
1715        task_id: &str,
1716        agent_id: &str,
1717        states_config: &StatesConfig,
1718    ) -> Result<Task> {
1719        let now = now_ms();
1720
1721        // Find a terminal state to use (prefer "completed" if it exists)
1722        let complete_status = if states_config.definitions.contains_key("completed") {
1723            "completed"
1724        } else {
1725            // Find any terminal state
1726            states_config
1727                .definitions
1728                .iter()
1729                .find(|(_, def)| def.exits.is_empty())
1730                .map(|(name, _)| name.as_str())
1731                .unwrap_or("completed")
1732        };
1733
1734        self.with_conn_mut(|conn| {
1735            let tx = conn.transaction()?;
1736
1737            // Get the task
1738            let mut stmt = tx.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1739            let task = stmt
1740                .query_row(params![task_id], parse_task_row)
1741                .map_err(|_| anyhow!("Task not found"))?;
1742            drop(stmt);
1743
1744            // Verify ownership
1745            if task.worker_id.as_deref() != Some(agent_id) {
1746                return Err(anyhow!("Task is not owned by this agent"));
1747            }
1748
1749            // Check for incomplete children (blocking completion)
1750            let incomplete_children: i32 = tx.query_row(
1751                "SELECT COUNT(*) FROM dependencies d
1752                 INNER JOIN tasks child ON d.to_task_id = child.id
1753                 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
1754                 AND child.status IN (SELECT value FROM json_each(?2))",
1755                params![
1756                    task_id,
1757                    serde_json::to_string(&states_config.blocking_states)?
1758                ],
1759                |row| row.get(0),
1760            )?;
1761
1762            if incomplete_children > 0 {
1763                return Err(anyhow!(
1764                    "Cannot complete task: {} child task(s) are not complete",
1765                    incomplete_children
1766                ));
1767            }
1768
1769            // Validate transition
1770            if !states_config.is_valid_transition(&task.status, complete_status) {
1771                let exits = states_config.get_exits(&task.status);
1772                return Err(anyhow!(
1773                    "Cannot complete task in state '{}'. Allowed transitions: {:?}",
1774                    task.status,
1775                    exits
1776                ));
1777            }
1778
1779            // Record state transition (accumulates time from timed state)
1780            record_state_transition(
1781                &tx,
1782                task_id,
1783                complete_status,
1784                Some(agent_id),
1785                None,
1786                states_config,
1787            )?;
1788
1789            // Update task to completed
1790            tx.execute(
1791                "UPDATE tasks SET status = ?1, completed_at = ?2, updated_at = ?3,
1792                 worker_id = NULL, claimed_at = NULL
1793                 WHERE id = ?4",
1794                params![complete_status, now, now, task_id],
1795            )?;
1796
1797            // Release file locks associated with this task (for auto-cleanup)
1798            tx.execute(
1799                "DELETE FROM file_locks WHERE task_id = ?1",
1800                params![task_id],
1801            )?;
1802
1803            // Refresh agent heartbeat
1804            tx.execute(
1805                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1806                params![now, agent_id],
1807            )?;
1808
1809            tx.commit()?;
1810
1811            Ok(Task {
1812                status: complete_status.to_string(),
1813                completed_at: Some(now),
1814                updated_at: now,
1815                worker_id: None,
1816                claimed_at: None,
1817                ..task
1818            })
1819        })
1820    }
1821
1822    /// Get all tasks. Excludes soft-deleted tasks.
1823    pub fn get_all_tasks(&self) -> Result<Vec<Task>> {
1824        self.with_conn(|conn| {
1825            let mut stmt =
1826                conn.prepare("SELECT * FROM tasks WHERE deleted_at IS NULL ORDER BY created_at")?;
1827            let tasks = stmt
1828                .query_map([], parse_task_row)?
1829                .filter_map(|r| r.ok())
1830                .collect();
1831            Ok(tasks)
1832        })
1833    }
1834
1835    /// Get tasks by status.
1836    #[allow(dead_code)]
1837    pub fn get_tasks_by_status(&self, status: &str) -> Result<Vec<Task>> {
1838        self.with_conn(|conn| {
1839            let mut stmt =
1840                conn.prepare("SELECT * FROM tasks WHERE status = ?1 ORDER BY created_at")?;
1841            let tasks = stmt
1842                .query_map(params![status], parse_task_row)?
1843                .filter_map(|r| r.ok())
1844                .collect();
1845            Ok(tasks)
1846        })
1847    }
1848
1849    /// Get claimed tasks. Excludes soft-deleted tasks.
1850    pub fn get_claimed_tasks(&self, agent_id: Option<&str>) -> Result<Vec<Task>> {
1851        self.with_conn(|conn| {
1852            let tasks = if let Some(aid) = agent_id {
1853                let mut stmt = conn
1854                    .prepare("SELECT * FROM tasks WHERE worker_id = ?1 AND deleted_at IS NULL ORDER BY claimed_at")?;
1855                stmt.query_map(params![aid], parse_task_row)?
1856                    .filter_map(|r| r.ok())
1857                    .collect()
1858            } else {
1859                let mut stmt = conn.prepare(
1860                    "SELECT * FROM tasks WHERE worker_id IS NOT NULL AND deleted_at IS NULL ORDER BY claimed_at",
1861                )?;
1862                stmt.query_map([], parse_task_row)?
1863                    .filter_map(|r| r.ok())
1864                    .collect()
1865            };
1866
1867            Ok(tasks)
1868        })
1869    }
1870}
1871
1872/// Helper function to create task tree recursively within a transaction.
1873/// Creates dependencies from parent to children using child_type.
1874/// Creates dependencies between siblings using sibling_type.
1875/// Supports referencing existing tasks via ref_id.
1876#[allow(clippy::too_many_arguments)]
1877fn create_tree_recursive(
1878    conn: &Connection,
1879    input: &TaskTreeInput,
1880    parent_id: Option<&str>,
1881    prev_sibling_id: Option<&str>,
1882    child_type: Option<&str>,
1883    sibling_type: Option<&str>,
1884    all_ids: &mut Vec<String>,
1885    phase_warnings: &mut Vec<String>,
1886    tag_warnings: &mut Vec<String>,
1887    states_config: &StatesConfig,
1888    phases_config: &PhasesConfig,
1889    tags_config: &TagsConfig,
1890    ids_config: &IdsConfig,
1891) -> Result<String> {
1892    // Check if this node references an existing task
1893    let task_id = if let Some(ref ref_id) = input.ref_id {
1894        // Verify the referenced task exists
1895        let exists: bool = conn.query_row(
1896            "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
1897            params![ref_id],
1898            |row| row.get(0),
1899        )?;
1900        if !exists {
1901            return Err(anyhow::anyhow!("Referenced task '{}' not found", ref_id));
1902        }
1903        ref_id.clone()
1904    } else {
1905        // Create a new task
1906        let task_id = input
1907            .id
1908            .clone()
1909            .unwrap_or_else(|| generate_task_id(ids_config));
1910        let now = now_ms();
1911        let priority = clamp_priority(input.priority.unwrap_or(PRIORITY_DEFAULT));
1912        let initial_status = &states_config.initial;
1913
1914        // Derive title: use explicit title, or derive from description, or empty
1915        let title = input.title.clone().unwrap_or_else(|| {
1916            input
1917                .description
1918                .as_deref()
1919                .map(|d| crate::format::truncate_title(d).into_owned())
1920                .unwrap_or_default()
1921        });
1922
1923        // Check phase validity
1924        if let Some(ref phase) = input.phase
1925            && let Some(warning) = phases_config.check_phase(phase)?
1926        {
1927            phase_warnings.push(format!("Task '{}': {}", task_id, warning));
1928        }
1929
1930        let needed_tags = input.needed_tags.clone().unwrap_or_default();
1931        let wanted_tags = input.wanted_tags.clone().unwrap_or_default();
1932        let tags = input.tags.clone().unwrap_or_default();
1933
1934        // Check tag validity for all tag types
1935        for warning in tags_config.validate_tags(&tags)? {
1936            tag_warnings.push(format!("Task '{}': {}", task_id, warning));
1937        }
1938        for warning in tags_config.validate_tags(&needed_tags)? {
1939            tag_warnings.push(format!("Task '{}' needed_tags: {}", task_id, warning));
1940        }
1941        for warning in tags_config.validate_tags(&wanted_tags)? {
1942            tag_warnings.push(format!("Task '{}' wanted_tags: {}", task_id, warning));
1943        }
1944
1945        let needed_tags_json = serde_json::to_string(&needed_tags)?;
1946        let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
1947        let tags_json = serde_json::to_string(&tags)?;
1948
1949        conn.execute(
1950            "INSERT INTO tasks (
1951                id, title, description, status, phase, priority,
1952                needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
1953            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1954            params![
1955                &task_id,
1956                &title,
1957                &input.description,
1958                initial_status,
1959                &input.phase,
1960                priority.to_string(),
1961                needed_tags_json,
1962                wanted_tags_json,
1963                tags_json,
1964                input.points,
1965                input.time_estimate_ms,
1966                now,
1967                now,
1968            ],
1969        )?;
1970
1971        // Record initial state transition
1972        record_state_transition(conn, &task_id, initial_status, None, None, states_config)?;
1973
1974        // Sync tags to junction tables for indexed lookups
1975        sync_task_tags(conn, &task_id, &tags)?;
1976        sync_needed_tags(conn, &task_id, &needed_tags)?;
1977        sync_wanted_tags(conn, &task_id, &wanted_tags)?;
1978
1979        task_id
1980    };
1981
1982    // Create dependency from parent if child_type is specified
1983    if let (Some(pid), Some(ct)) = (parent_id, child_type) {
1984        Database::add_dependency_internal(conn, pid, &task_id, ct)?;
1985    }
1986
1987    // Create dependency from previous sibling if sibling_type is specified
1988    if let (Some(prev_id), Some(st)) = (prev_sibling_id, sibling_type) {
1989        Database::add_dependency_internal(conn, prev_id, &task_id, st)?;
1990    }
1991
1992    all_ids.push(task_id.clone());
1993
1994    // Create children with dependencies based on child_type and sibling_type
1995    let mut prev_child_id: Option<String> = None;
1996    for child in input.children.iter() {
1997        let child_id = create_tree_recursive(
1998            conn,
1999            child,
2000            Some(&task_id),
2001            prev_child_id.as_deref(),
2002            child_type,
2003            sibling_type,
2004            all_ids,
2005            phase_warnings,
2006            tag_warnings,
2007            states_config,
2008            phases_config,
2009            tags_config,
2010            ids_config,
2011        )?;
2012        prev_child_id = Some(child_id);
2013    }
2014
2015    Ok(task_id)
2016}