Skip to main content

task_graph_mcp/db/
tasks.rs

1//! Task CRUD and tree operations.
2
3use super::state_transitions::record_state_transition;
4use super::{Database, now_ms};
5use crate::config::{
6    AutoAdvanceConfig, DependenciesConfig, IdsConfig, PhasesConfig, StatesConfig, TagsConfig,
7};
8use crate::error::ToolError;
9use crate::types::{
10    PRIORITY_DEFAULT, Priority, Task, TaskTree, TaskTreeInput, Worker, clamp_priority,
11    parse_priority,
12};
13use anyhow::{Result, anyhow};
14use petname::{Generator, Petnames};
15use rusqlite::{Connection, Row, params};
16
17/// Generate a petname-based task ID using the large wordlist.
18/// Uses the configured number of words and case style.
19fn generate_task_id(ids_config: &IdsConfig) -> String {
20    let words = ids_config.task_id_words;
21    let case = ids_config.id_case;
22
23    // Generate with hyphen separator first (petname's default format)
24    let base = Petnames::medium()
25        .generate_one(words, "-")
26        .unwrap_or_else(|| format!("task-{}", super::now_ms()));
27
28    // Convert to desired case
29    case.convert(&base)
30}
31
/// Build the expression that follows `ORDER BY` from optional sort parameters.
///
/// Only fixed, known fragments are ever emitted, so caller-supplied text never
/// reaches the SQL string:
/// - `sort_by`: `"priority"` (compared as an integer) or `"updated_at"`;
///   anything else, including `None`, sorts by `t.created_at`.
/// - `sort_order`: `"asc"` selects ascending; anything else, including `None`,
///   selects descending.
fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
    let column = match sort_by {
        Some("priority") => "CAST(t.priority AS INTEGER)",
        Some("updated_at") => "t.updated_at",
        // "created_at", unrecognised names, and None all land here.
        _ => "t.created_at",
    };

    // Descending is the default for every column.
    let direction = if sort_order == Some("asc") {
        "ASC"
    } else {
        "DESC"
    };

    format!("{column} {direction}")
}
53
54// =============================================================================
55// Junction table helpers for tag management
56// =============================================================================
57
58/// Sync task tags to the task_tags junction table.
59/// Replaces all existing tags for the task.
60fn sync_task_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
61    conn.execute("DELETE FROM task_tags WHERE task_id = ?1", params![task_id])?;
62    for tag in tags {
63        conn.execute(
64            "INSERT INTO task_tags (task_id, tag) VALUES (?1, ?2)",
65            params![task_id, tag],
66        )?;
67    }
68    Ok(())
69}
70
71/// Sync needed tags (agent must have ALL) to the task_needed_tags junction table.
72fn sync_needed_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
73    conn.execute(
74        "DELETE FROM task_needed_tags WHERE task_id = ?1",
75        params![task_id],
76    )?;
77    for tag in tags {
78        conn.execute(
79            "INSERT INTO task_needed_tags (task_id, tag) VALUES (?1, ?2)",
80            params![task_id, tag],
81        )?;
82    }
83    Ok(())
84}
85
86/// Sync wanted tags (agent must have ANY) to the task_wanted_tags junction table.
87fn sync_wanted_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
88    conn.execute(
89        "DELETE FROM task_wanted_tags WHERE task_id = ?1",
90        params![task_id],
91    )?;
92    for tag in tags {
93        conn.execute(
94            "INSERT INTO task_wanted_tags (task_id, tag) VALUES (?1, ?2)",
95            params![task_id, tag],
96        )?;
97    }
98    Ok(())
99}
100
101pub fn parse_task_row(row: &Row) -> rusqlite::Result<Task> {
102    let id: String = row.get("id")?;
103    let title: String = row.get("title")?;
104    let description: Option<String> = row.get("description")?;
105    let status: String = row.get("status")?;
106    let phase: Option<String> = row.get("phase")?;
107    let priority: String = row.get("priority")?;
108    let worker_id: Option<String> = row.get("worker_id")?;
109    let claimed_at: Option<i64> = row.get("claimed_at")?;
110
111    let needed_tags_json: Option<String> = row.get("needed_tags")?;
112    let wanted_tags_json: Option<String> = row.get("wanted_tags")?;
113    let tags_json: Option<String> = row.get("tags")?;
114
115    let points: Option<i32> = row.get("points")?;
116    let time_estimate_ms: Option<i64> = row.get("time_estimate_ms")?;
117    let time_actual_ms: Option<i64> = row.get("time_actual_ms")?;
118    let started_at: Option<i64> = row.get("started_at")?;
119    let completed_at: Option<i64> = row.get("completed_at")?;
120
121    let current_thought: Option<String> = row.get("current_thought")?;
122
123    let cost_usd: f64 = row.get("cost_usd")?;
124    let metric_0: i64 = row.get("metric_0")?;
125    let metric_1: i64 = row.get("metric_1")?;
126    let metric_2: i64 = row.get("metric_2")?;
127    let metric_3: i64 = row.get("metric_3")?;
128    let metric_4: i64 = row.get("metric_4")?;
129    let metric_5: i64 = row.get("metric_5")?;
130    let metric_6: i64 = row.get("metric_6")?;
131    let metric_7: i64 = row.get("metric_7")?;
132
133    let created_at: i64 = row.get("created_at")?;
134    let updated_at: i64 = row.get("updated_at")?;
135
136    Ok(Task {
137        id,
138        title,
139        description,
140        status,
141        phase,
142        priority: parse_priority(&priority),
143        worker_id,
144        claimed_at,
145        needed_tags: needed_tags_json
146            .map(|s| serde_json::from_str(&s).unwrap_or_default())
147            .unwrap_or_default(),
148        wanted_tags: wanted_tags_json
149            .map(|s| serde_json::from_str(&s).unwrap_or_default())
150            .unwrap_or_default(),
151        tags: tags_json
152            .map(|s| serde_json::from_str(&s).unwrap_or_default())
153            .unwrap_or_default(),
154        points,
155        time_estimate_ms,
156        time_actual_ms,
157        started_at,
158        completed_at,
159        current_thought,
160        cost_usd,
161        metrics: [
162            metric_0, metric_1, metric_2, metric_3, metric_4, metric_5, metric_6, metric_7,
163        ],
164        created_at,
165        updated_at,
166    })
167}
168
169/// Internal helper to get a task using an existing connection (avoids deadlock).
170fn get_task_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
171    let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
172
173    let result = stmt.query_row(params![task_id], parse_task_row);
174
175    match result {
176        Ok(task) => Ok(Some(task)),
177        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
178        Err(e) => Err(e.into()),
179    }
180}
181
182/// Internal helper to get a worker using an existing connection (avoids deadlock).
183fn get_worker_internal(conn: &Connection, worker_id: &str) -> Result<Option<Worker>> {
184    let mut stmt = conn.prepare(
185        "SELECT id, tags, max_claims, registered_at, last_heartbeat, last_status, last_phase, workflow
186         FROM workers WHERE id = ?1",
187    )?;
188
189    let result = stmt.query_row(params![worker_id], |row| {
190        let id: String = row.get(0)?;
191        let tags_json: String = row.get(1)?;
192        let max_claims: i32 = row.get(2)?;
193        let registered_at: i64 = row.get(3)?;
194        let last_heartbeat: i64 = row.get(4)?;
195        let last_status: Option<String> = row.get(5)?;
196        let last_phase: Option<String> = row.get(6)?;
197        let workflow: Option<String> = row.get(7)?;
198
199        Ok((
200            id,
201            tags_json,
202            max_claims,
203            registered_at,
204            last_heartbeat,
205            last_status,
206            last_phase,
207            workflow,
208        ))
209    });
210
211    match result {
212        Ok((
213            id,
214            tags_json,
215            max_claims,
216            registered_at,
217            last_heartbeat,
218            last_status,
219            last_phase,
220            workflow,
221        )) => {
222            let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
223            Ok(Some(Worker {
224                id,
225                tags,
226                max_claims,
227                registered_at,
228                last_heartbeat,
229                last_status,
230                last_phase,
231                workflow,
232            }))
233        }
234        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
235        Err(e) => Err(e.into()),
236    }
237}
238
239impl Database {
240    /// Create a new task.
241    /// If id is provided, uses it as the task ID; otherwise generates a petname ID.
242    /// If parent_id is provided, creates a 'contains' dependency from parent to this task.
243    #[allow(clippy::too_many_arguments)]
244    pub fn create_task(
245        &self,
246        id: Option<String>,
247        description: String,
248        parent_id: Option<String>,
249        phase: Option<String>,
250        priority: Option<Priority>,
251        points: Option<i32>,
252        time_estimate_ms: Option<i64>,
253        agent_tags_all: Option<Vec<String>>,
254        agent_tags_any: Option<Vec<String>>,
255        tags: Option<Vec<String>>,
256        states_config: &StatesConfig,
257        ids_config: &IdsConfig,
258    ) -> Result<Task> {
259        let task_id = id.unwrap_or_else(|| generate_task_id(ids_config));
260        let now = now_ms();
261        let priority = clamp_priority(priority.unwrap_or(PRIORITY_DEFAULT));
262        let initial_status = &states_config.initial;
263
264        let needed_tags = agent_tags_all.unwrap_or_default();
265        let wanted_tags = agent_tags_any.unwrap_or_default();
266        let tags = tags.unwrap_or_default();
267        let needed_tags_json = serde_json::to_string(&needed_tags)?;
268        let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
269        let tags_json = serde_json::to_string(&tags)?;
270
271        self.with_conn_mut(|conn| {
272            let tx = conn.transaction()?;
273
274            tx.execute(
275                "INSERT INTO tasks (
276                    id, title, description, status, phase, priority,
277                    needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
278                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
279                params![
280                    &task_id,
281                    &description, // Use description as title
282                    &description, // Also store as description
283                    initial_status,
284                    &phase,
285                    priority.to_string(),
286                    needed_tags_json,
287                    wanted_tags_json,
288                    tags_json,
289                    points,
290                    time_estimate_ms,
291                    now,
292                    now,
293                ],
294            )?;
295
296            // Sync tags to junction tables
297            sync_task_tags(&tx, &task_id, &tags)?;
298            sync_needed_tags(&tx, &task_id, &needed_tags)?;
299            sync_wanted_tags(&tx, &task_id, &wanted_tags)?;
300
301            // Create 'contains' dependency if parent_id is provided
302            if let Some(ref pid) = parent_id {
303                Database::add_dependency_internal(&tx, pid, &task_id, "contains")?;
304            }
305
306            // Record initial state
307            record_state_transition(&tx, &task_id, initial_status, None, None, states_config)?;
308
309            tx.commit()?;
310
311            Ok(Task {
312                id: task_id,
313                title: description.clone(),
314                description: Some(description),
315                status: initial_status.clone(),
316                phase,
317                priority,
318                worker_id: None,
319                claimed_at: None,
320                needed_tags,
321                wanted_tags,
322                tags,
323                points,
324                time_estimate_ms,
325                time_actual_ms: None,
326                started_at: None,
327                completed_at: None,
328                current_thought: None,
329                cost_usd: 0.0,
330                metrics: [0; 8],
331                created_at: now,
332                updated_at: now,
333            })
334        })
335    }
336
337    /// Convenience method to create a task with just description.
338    /// All other parameters use defaults. Useful for tests.
339    pub fn create_task_simple(
340        &self,
341        description: impl Into<String>,
342        states_config: &StatesConfig,
343        ids_config: &IdsConfig,
344    ) -> Result<Task> {
345        self.create_task(
346            None,
347            description.into(),
348            None,
349            None,
350            None,
351            None,
352            None,
353            None,
354            None,
355            None,
356            states_config,
357            ids_config,
358        )
359    }
360
361    /// Create a task tree from nested input.
362    /// Uses child_type for parent-child dependencies (default: "contains").
363    /// Uses sibling_type for sibling dependencies (default: none/parallel).
364    pub fn create_task_tree(
365        &self,
366        input: TaskTreeInput,
367        parent_id: Option<String>,
368        child_type: Option<String>,
369        sibling_type: Option<String>,
370        states_config: &StatesConfig,
371        phases_config: &PhasesConfig,
372        tags_config: &TagsConfig,
373        ids_config: &IdsConfig,
374    ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>)> {
375        let mut all_ids = Vec::new();
376        let mut phase_warnings = Vec::new();
377        let mut tag_warnings = Vec::new();
378        // Default child_type to "contains" if not specified
379        let child_type = child_type.or_else(|| Some("contains".to_string()));
380
381        self.with_conn_mut(|conn| {
382            let tx = conn.transaction()?;
383            let root_id = create_tree_recursive(
384                &tx,
385                &input,
386                parent_id.as_deref(),
387                None, // no previous sibling for root
388                child_type.as_deref(),
389                sibling_type.as_deref(),
390                &mut all_ids,
391                &mut phase_warnings,
392                &mut tag_warnings,
393                states_config,
394                phases_config,
395                tags_config,
396                ids_config,
397            )?;
398            tx.commit()?;
399            Ok((root_id, all_ids, phase_warnings, tag_warnings))
400        })
401    }
402
403    /// Get a task by ID.
404    pub fn get_task(&self, task_id: &str) -> Result<Option<Task>> {
405        self.with_conn(|conn| {
406            let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
407
408            let result = stmt.query_row(params![task_id], parse_task_row);
409
410            match result {
411                Ok(task) => Ok(Some(task)),
412                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
413                Err(e) => Err(e.into()),
414            }
415        })
416    }
417
418    /// Get a task with all its children (tree).
419    pub fn get_task_tree(&self, task_id: &str) -> Result<Option<TaskTree>> {
420        let task = self.get_task(task_id)?;
421        match task {
422            None => Ok(None),
423            Some(task) => {
424                let children = self.get_children_recursive(&task.id)?;
425                Ok(Some(TaskTree { task, children }))
426            }
427        }
428    }
429
430    /// Get children recursively.
431    fn get_children_recursive(&self, parent_id: &str) -> Result<Vec<TaskTree>> {
432        let children = self.get_children(parent_id)?;
433        let mut result = Vec::new();
434
435        for child in children {
436            let child_children = self.get_children_recursive(&child.id)?;
437            result.push(TaskTree {
438                task: child,
439                children: child_children,
440            });
441        }
442
443        Ok(result)
444    }
445
446    /// Get direct children of a task (via 'contains' dependency).
447    pub fn get_children(&self, parent_id: &str) -> Result<Vec<Task>> {
448        self.with_conn(|conn| {
449            let mut stmt = conn.prepare(
450                "SELECT t.* FROM tasks t
451                 INNER JOIN dependencies d ON t.id = d.to_task_id
452                 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
453                 ORDER BY t.created_at",
454            )?;
455
456            let tasks = stmt
457                .query_map(params![parent_id], parse_task_row)?
458                .filter_map(|r| r.ok())
459                .collect();
460
461            Ok(tasks)
462        })
463    }
464
465    /// Update a task.
466    #[allow(clippy::too_many_arguments)]
467    pub fn update_task(
468        &self,
469        task_id: &str,
470        title: Option<String>,
471        description: Option<Option<String>>,
472        status: Option<String>,
473        priority: Option<Priority>,
474        points: Option<Option<i32>>,
475        tags: Option<Vec<String>>,
476        states_config: &StatesConfig,
477    ) -> Result<Task> {
478        let now = now_ms();
479
480        self.with_conn(|conn| {
481            let task =
482                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
483
484            let new_title = title.unwrap_or(task.title.clone());
485            let new_description = description.unwrap_or(task.description.clone());
486            let new_status = status.unwrap_or(task.status.clone());
487            let new_priority = priority.unwrap_or(task.priority);
488            let new_points = points.unwrap_or(task.points);
489            let new_tags = tags.unwrap_or(task.tags.clone());
490
491            // Validate the new status exists
492            if !states_config.is_valid_state(&new_status) {
493                return Err(anyhow!(
494                    "Invalid state '{}'. Valid states: {:?}",
495                    new_status,
496                    states_config.state_names()
497                ));
498            }
499
500            // Validate state transition if status changed
501            if task.status != new_status
502                && !states_config.is_valid_transition(&task.status, &new_status)
503            {
504                let exits = states_config.get_exits(&task.status);
505                return Err(anyhow!(
506                    "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
507                    task.status,
508                    new_status,
509                    exits
510                ));
511            }
512
513            // Handle status transitions for timestamps
514            // Set started_at when first entering a timed state
515            let started_at =
516                if task.started_at.is_none() && states_config.is_timed_state(&new_status) {
517                    Some(now)
518                } else {
519                    task.started_at
520                };
521
522            // Set completed_at when entering a completed state (terminal or with reopen capability)
523            let completed_at = if new_status == "completed" {
524                Some(now)
525            } else {
526                task.completed_at
527            };
528
529            // Record state transition if status changed (handles time accumulation)
530            if task.status != new_status {
531                record_state_transition(
532                    conn,
533                    task_id,
534                    &new_status,
535                    task.worker_id.as_deref(),
536                    None,
537                    states_config,
538                )?;
539            }
540
541            conn.execute(
542                "UPDATE tasks SET
543                    title = ?1, description = ?2, status = ?3, priority = ?4,
544                    points = ?5, started_at = ?6, completed_at = ?7, updated_at = ?8,
545                    tags = ?9
546                WHERE id = ?10",
547                params![
548                    new_title,
549                    new_description,
550                    new_status,
551                    new_priority.to_string(),
552                    new_points,
553                    started_at,
554                    completed_at,
555                    now,
556                    serde_json::to_string(&new_tags)?,
557                    task_id,
558                ],
559            )?;
560
561            Ok(Task {
562                id: task_id.to_string(),
563                title: new_title,
564                description: new_description,
565                status: new_status,
566                priority: new_priority,
567                points: new_points,
568                tags: new_tags,
569                started_at,
570                completed_at,
571                updated_at: now,
572                ..task
573            })
574        })
575    }
576
577    /// Update a task with unified claim/release logic.
578    /// - Transition to timed state = CLAIM (set owner, validate tags, check limit)
579    /// - Transition from timed to non-timed = RELEASE (clear owner)
580    /// - Transition to terminal = COMPLETE (check children, release file locks)
581    /// - With assignee = ASSIGN (set owner to assignee, transition to 'assigned' state)
582    /// - Only the owner can update a claimed task (unless force=true)
583    ///
584    /// Returns (task, unblocked, auto_advanced):
585    /// - task: The updated task
586    /// - unblocked: Task IDs that are now ready (all dependencies satisfied)
587    /// - auto_advanced: Subset of unblocked that were actually transitioned
588    #[allow(clippy::too_many_arguments)]
589    pub fn update_task_unified(
590        &self,
591        task_id: &str,
592        agent_id: &str,
593        assignee: Option<&str>,
594        title: Option<String>,
595        description: Option<Option<String>>,
596        status: Option<String>,
597        phase: Option<String>,
598        priority: Option<Priority>,
599        points: Option<Option<i32>>,
600        tags: Option<Vec<String>>,
601        needed_tags: Option<Vec<String>>,
602        wanted_tags: Option<Vec<String>>,
603        time_estimate_ms: Option<i64>,
604        reason: Option<String>,
605        force: bool,
606        states_config: &StatesConfig,
607        deps_config: &DependenciesConfig,
608        auto_advance: &AutoAdvanceConfig,
609    ) -> Result<(Task, Vec<String>, Vec<String>)> {
610        let now = now_ms();
611
612        self.with_conn_mut(|conn| {
613            let tx = conn.transaction()?;
614
615            let task =
616                get_task_internal(&tx, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
617
618            // Owner-only validation: if task is claimed, only owner can update (unless force)
619            if let Some(ref current_owner) = task.worker_id
620                && current_owner != agent_id && !force {
621                    return Err(anyhow!(
622                        "Task is claimed by agent '{}'. Only the owner can update claimed tasks (use force=true to override)",
623                        current_owner
624                    ));
625                }
626
627            let new_title = title.unwrap_or(task.title.clone());
628            let new_description = description.unwrap_or(task.description.clone());
629            // If assignee is set but no explicit status, default to 'assigned' state
630            let new_status = if assignee.is_some() && status.is_none() {
631                "assigned".to_string()
632            } else {
633                status.unwrap_or(task.status.clone())
634            };
635            let new_priority = priority.unwrap_or(task.priority);
636            let new_points = points.unwrap_or(task.points);
637            let new_tags = tags.unwrap_or(task.tags.clone());
638            let new_needed_tags = needed_tags.unwrap_or(task.needed_tags.clone());
639            let new_wanted_tags = wanted_tags.unwrap_or(task.wanted_tags.clone());
640            let new_time_estimate_ms = time_estimate_ms.or(task.time_estimate_ms);
641            let new_phase = phase.or(task.phase.clone());
642
643            // Validate the new status exists
644            if !states_config.is_valid_state(&new_status) {
645                return Err(anyhow!(
646                    "Invalid state '{}'. Valid states: {:?}",
647                    new_status,
648                    states_config.state_names()
649                ));
650            }
651
652            // Validate state transition if status changed
653            if task.status != new_status
654                && !states_config.is_valid_transition(&task.status, &new_status) {
655                    let exits = states_config.get_exits(&task.status);
656                    return Err(anyhow!(
657                        "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
658                        task.status,
659                        new_status,
660                        exits
661                    ));
662                }
663
664            // Determine ownership changes based on state transition
665            let new_is_timed = states_config.is_timed_state(&new_status);
666            let new_is_terminal = states_config.is_terminal_state(&new_status);
667            let current_owner = task.worker_id.as_deref();
668            let is_owned_by_agent = current_owner == Some(agent_id);
669            let is_owned_by_other = current_owner.is_some() && !is_owned_by_agent;
670
671            let mut new_owner: Option<String> = task.worker_id.clone();
672            let mut new_claimed_at: Option<i64> = task.claimed_at;
673
674            // ASSIGN: Push coordination - coordinator assigns task to another agent
675            // Sets owner without starting the timer (assigned state is untimed)
676            if let Some(target_agent) = assignee {
677                // Verify task is not already claimed (unless force)
678                if is_owned_by_other && !force {
679                    return Err(anyhow!(
680                        "Task is already claimed by agent '{}'. Use force=true to reassign.",
681                        current_owner.unwrap()
682                    ));
683                }
684
685                // Verify the assignee exists
686                let target = get_worker_internal(&tx, target_agent)?
687                    .ok_or_else(|| anyhow!("Assignee agent '{}' not found", target_agent))?;
688
689                // Check tag affinity for the assignee
690                if !task.needed_tags.is_empty() {
691                    for needed in &task.needed_tags {
692                        if !target.tags.contains(needed) {
693                            return Err(anyhow!(
694                                "Assignee '{}' missing required tag: {}",
695                                target_agent,
696                                needed
697                            ));
698                        }
699                    }
700                }
701
702                if !task.wanted_tags.is_empty() {
703                    let has_any = task
704                        .wanted_tags
705                        .iter()
706                        .any(|wanted| target.tags.contains(wanted));
707                    if !has_any {
708                        return Err(anyhow!(
709                            "Assignee '{}' has none of the wanted tags: {:?}",
710                            target_agent,
711                            task.wanted_tags
712                        ));
713                    }
714                }
715
716                // Set ownership to the assignee
717                new_owner = Some(target_agent.to_string());
718                new_claimed_at = Some(now);
719            }
720
721            // CLAIM: Transitioning to a timed state and need to take ownership
722            // This handles: non-timed -> timed, OR timed (other owner) -> timed (force claim)
723            if new_is_timed && !is_owned_by_agent {
724                // Already claimed by someone else?
725                if is_owned_by_other && !force {
726                    return Err(anyhow!(
727                        "Task is already claimed by agent '{}'",
728                        current_owner.unwrap()
729                    ));
730                }
731
732                // Check for unsatisfied blocking dependencies (skip if force)
733                if !force {
734                    let unsatisfied_blockers = super::deps::get_unsatisfied_start_blockers_in_tx(
735                        &tx,
736                        task_id,
737                        states_config,
738                        deps_config,
739                    )?;
740                    if !unsatisfied_blockers.is_empty() {
741                        // Return structured error with blocking task IDs so clients
742                        // can monitor them and retry when they complete
743                        return Err(ToolError::deps_not_satisfied(&unsatisfied_blockers).into());
744                    }
745                }
746
747                // Get the agent
748                let agent = get_worker_internal(&tx, agent_id)?
749                    .ok_or_else(|| anyhow!("Agent not found"))?;
750
751                // Check tag affinity - needed_tags (AND - must have ALL)
752                if !task.needed_tags.is_empty() {
753                    for needed in &task.needed_tags {
754                        if !agent.tags.contains(needed) {
755                            return Err(anyhow!("Agent missing required tag: {}", needed));
756                        }
757                    }
758                }
759
760                // Check tag affinity - wanted_tags (OR - must have AT LEAST ONE)
761                if !task.wanted_tags.is_empty() {
762                    let has_any = task
763                        .wanted_tags
764                        .iter()
765                        .any(|wanted| agent.tags.contains(wanted));
766                    if !has_any {
767                        return Err(anyhow!("Agent has none of the wanted tags"));
768                    }
769                }
770
771                // Set ownership
772                new_owner = Some(agent_id.to_string());
773                new_claimed_at = Some(now);
774
775                // Refresh agent heartbeat
776                tx.execute(
777                    "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
778                    params![now, agent_id],
779                )?;
780            }
781
782            // RELEASE: Transitioning to non-timed state (but not terminal)
783            if !new_is_timed && !new_is_terminal && task.worker_id.is_some() {
784                // Verify ownership (unless force)
785                if is_owned_by_other && !force {
786                    return Err(anyhow!("Task is not owned by this agent"));
787                }
788
789                // Clear ownership
790                new_owner = None;
791                new_claimed_at = None;
792            }
793
794            // COMPLETE: Transition to terminal state
795            if new_is_terminal {
796                // Verify ownership if task was claimed (unless force)
797                if let Some(ref current_owner) = task.worker_id
798                    && current_owner != agent_id && !force {
799                        return Err(anyhow!("Task is not owned by this agent"));
800                    }
801
802                // Check for incomplete children (via 'contains' dependencies)
803                let incomplete_children: i32 = tx.query_row(
804                    "SELECT COUNT(*) FROM dependencies d
805                     INNER JOIN tasks child ON d.to_task_id = child.id
806                     WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
807                     AND child.status IN (SELECT value FROM json_each(?2))",
808                    params![
809                        task_id,
810                        serde_json::to_string(&states_config.blocking_states)?
811                    ],
812                    |row| row.get(0),
813                )?;
814
815                if incomplete_children > 0 {
816                    return Err(anyhow!(
817                        "Cannot complete task: {} child task(s) are not complete",
818                        incomplete_children
819                    ));
820                }
821
822                // Clear ownership
823                new_owner = None;
824                new_claimed_at = None;
825
826                // Release file locks associated with this task (for auto-cleanup)
827                tx.execute(
828                    "DELETE FROM file_locks WHERE task_id = ?1",
829                    params![task_id],
830                )?;
831            }
832
833            // Handle timestamps
834            let started_at =
835                if task.started_at.is_none() && new_is_timed {
836                    Some(now)
837                } else {
838                    task.started_at
839                };
840
841            // Set completed_at when entering completed status (even if it has reopen exits)
842            let completed_at = if new_status == "completed" {
843                Some(now)
844            } else {
845                task.completed_at
846            };
847
848            // Record state transition if status changed (with reason for audit)
849            let status_changed = task.status != new_status;
850            if status_changed {
851                record_state_transition(
852                    &tx,
853                    task_id,
854                    &new_status,
855                    new_owner.as_deref(),
856                    reason.as_deref(),
857                    states_config,
858                )?;
859            }
860
861            // Record phase transition if phase changed
862            let phase_changed = task.phase != new_phase;
863            if phase_changed {
864                super::state_transitions::record_phase_transition(
865                    &tx,
866                    task_id,
867                    new_phase.as_deref().unwrap_or(""),
868                    Some(agent_id),
869                    reason.as_deref(),
870                )?;
871            }
872
873            tx.execute(
874                "UPDATE tasks SET
875                    title = ?1, description = ?2, status = ?3, phase = ?4, priority = ?5,
876                    points = ?6, started_at = ?7, completed_at = ?8, updated_at = ?9,
877                    tags = ?10, worker_id = ?11, claimed_at = ?12,
878                    needed_tags = ?13, wanted_tags = ?14, time_estimate_ms = ?15
879                WHERE id = ?16",
880                params![
881                    new_title,
882                    new_description,
883                    new_status,
884                    new_phase,
885                    new_priority.to_string(),
886                    new_points,
887                    started_at,
888                    completed_at,
889                    now,
890                    serde_json::to_string(&new_tags)?,
891                    new_owner,
892                    new_claimed_at,
893                    serde_json::to_string(&new_needed_tags)?,
894                    serde_json::to_string(&new_wanted_tags)?,
895                    new_time_estimate_ms,
896                    task_id,
897                ],
898            )?;
899
900            // Sync tags to junction tables if changed
901            if new_tags != task.tags {
902                sync_task_tags(&tx, task_id, &new_tags)?;
903            }
904            if new_needed_tags != task.needed_tags {
905                sync_needed_tags(&tx, task_id, &new_needed_tags)?;
906            }
907            if new_wanted_tags != task.wanted_tags {
908                sync_wanted_tags(&tx, task_id, &new_wanted_tags)?;
909            }
910
911            // Check for unblocked tasks if this task transitioned FROM blocking TO non-blocking
912            let (unblocked, auto_advanced) = if status_changed {
913                let was_blocking = states_config.is_blocking_state(&task.status);
914                let is_blocking = states_config.is_blocking_state(&new_status);
915
916                if was_blocking && !is_blocking {
917                    super::deps::propagate_unblock_effects(
918                        &tx,
919                        task_id,
920                        Some(agent_id),
921                        states_config,
922                        deps_config,
923                        auto_advance,
924                    )?
925                } else {
926                    (vec![], vec![])
927                }
928            } else {
929                (vec![], vec![])
930            };
931
932            tx.commit()?;
933
934            Ok((Task {
935                id: task_id.to_string(),
936                title: new_title,
937                description: new_description,
938                status: new_status,
939                phase: new_phase,
940                priority: new_priority,
941                points: new_points,
942                tags: new_tags,
943                needed_tags: new_needed_tags,
944                wanted_tags: new_wanted_tags,
945                time_estimate_ms: new_time_estimate_ms,
946                started_at,
947                completed_at,
948                updated_at: now,
949                worker_id: new_owner,
950                claimed_at: new_claimed_at,
951                ..task
952            }, unblocked, auto_advanced))
953        })
954    }
955
956    /// Delete a task (soft delete by default, hard delete with obliterate=true).
957    ///
958    /// - `worker_id`: The worker attempting to delete (required for ownership check)
959    /// - `cascade`: Whether to delete children (default: false)
960    /// - `reason`: Optional reason for deletion
961    /// - `obliterate`: If true, permanently deletes the task; if false (default), soft deletes
962    /// - `force`: If true, allows deletion even if owned by another worker
963    pub fn delete_task(
964        &self,
965        task_id: &str,
966        worker_id: &str,
967        cascade: bool,
968        reason: Option<String>,
969        obliterate: bool,
970        force: bool,
971    ) -> Result<()> {
972        let now = now_ms();
973
974        self.with_conn_mut(|conn| {
975            let tx = conn.transaction()?;
976
977            // Get the task to check ownership
978            let task = get_task_internal(&tx, task_id)?
979                .ok_or_else(|| anyhow!("Task not found"))?;
980
981            // Check ownership - reject if claimed by another worker (unless force)
982            if let Some(ref owner) = task.worker_id
983                && owner != worker_id && !force {
984                    return Err(anyhow!(
985                        "Task is claimed by worker '{}'. Use force=true to override.",
986                        owner
987                    ));
988                }
989
990            if obliterate {
991                // Hard delete - permanently remove from database
992                if cascade {
993                    // Find all descendants using recursive CTE and delete them
994                    // The CTE finds all tasks reachable via 'contains' dependencies
995                    tx.execute(
996                        "WITH RECURSIVE descendants AS (
997                            SELECT ?1 AS id
998                            UNION ALL
999                            SELECT dep.to_task_id FROM dependencies dep
1000                            INNER JOIN descendants d ON dep.from_task_id = d.id
1001                            WHERE dep.dep_type = 'contains'
1002                        )
1003                        DELETE FROM tasks WHERE id IN (SELECT id FROM descendants)",
1004                        params![task_id],
1005                    )?;
1006                } else {
1007                    // Check for children via dependencies
1008                    let child_count: i32 = tx.query_row(
1009                        "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
1010                        params![task_id],
1011                        |row| row.get(0),
1012                    )?;
1013
1014                    if child_count > 0 {
1015                        return Err(anyhow!("Task has children; use cascade=true to delete"));
1016                    }
1017
1018                    tx.execute("DELETE FROM tasks WHERE id = ?1", params![task_id])?;
1019                }
1020            } else {
1021                // Soft delete - set deleted_at, deleted_by, deleted_reason
1022                if cascade {
1023                    // Soft delete all descendants
1024                    tx.execute(
1025                        "WITH RECURSIVE descendants AS (
1026                            SELECT ?1 AS id
1027                            UNION ALL
1028                            SELECT dep.to_task_id FROM dependencies dep
1029                            INNER JOIN descendants d ON dep.from_task_id = d.id
1030                            WHERE dep.dep_type = 'contains'
1031                        )
1032                        UPDATE tasks SET deleted_at = ?2, deleted_by = ?3, deleted_reason = ?4, updated_at = ?2
1033                        WHERE id IN (SELECT id FROM descendants) AND deleted_at IS NULL",
1034                        params![task_id, now, worker_id, reason],
1035                    )?;
1036                } else {
1037                    // Check for children via dependencies
1038                    let child_count: i32 = tx.query_row(
1039                        "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
1040                        params![task_id],
1041                        |row| row.get(0),
1042                    )?;
1043
1044                    if child_count > 0 {
1045                        return Err(anyhow!("Task has children; use cascade=true to delete"));
1046                    }
1047
1048                    tx.execute(
1049                        "UPDATE tasks SET deleted_at = ?1, deleted_by = ?2, deleted_reason = ?3, updated_at = ?1 WHERE id = ?4",
1050                        params![now, worker_id, reason, task_id],
1051                    )?;
1052                }
1053            }
1054
1055            tx.commit()?;
1056            Ok(())
1057        })
1058    }
1059
1060    /// List tasks with optional filters.
1061    /// Returns full Task objects. Excludes soft-deleted tasks.
1062    pub fn list_tasks(
1063        &self,
1064        status: Option<&str>,
1065        phase: Option<&str>,
1066        owner: Option<&str>,
1067        parent_id: Option<Option<&str>>,
1068        limit: Option<i32>,
1069        offset: i32,
1070        sort_by: Option<&str>,
1071        sort_order: Option<&str>,
1072    ) -> Result<Vec<Task>> {
1073        self.with_conn(|conn| {
1074            let mut sql = String::from(
1075                "SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL",
1076            );
1077            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1078
1079            if let Some(s) = status {
1080                sql.push_str(" AND t.status = ?");
1081                params_vec.push(Box::new(s.to_string()));
1082            }
1083
1084            if let Some(p) = phase {
1085                sql.push_str(" AND t.phase = ?");
1086                params_vec.push(Box::new(p.to_string()));
1087            }
1088
1089            if let Some(o) = owner {
1090                sql.push_str(" AND t.worker_id = ?");
1091                params_vec.push(Box::new(o.to_string()));
1092            }
1093
1094            // Handle parent filtering via dependencies table
1095            if let Some(p) = parent_id {
1096                match p {
1097                    Some(pid) => {
1098                        sql.push_str(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ? AND dep_type = 'contains')");
1099                        params_vec.push(Box::new(pid.to_string()));
1100                    }
1101                    None => {
1102                        // Root tasks: not contained by any other task
1103                        sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
1104                    }
1105                }
1106            }
1107
1108            // Build ORDER BY clause
1109            let order_clause = build_order_clause(sort_by, sort_order);
1110            sql.push_str(&format!(" ORDER BY {}", order_clause));
1111
1112            if let Some(l) = limit {
1113                sql.push_str(&format!(" LIMIT {}", l));
1114            }
1115
1116            if offset > 0 {
1117                sql.push_str(&format!(" OFFSET {}", offset));
1118            }
1119
1120            let params_refs: Vec<&dyn rusqlite::ToSql> =
1121                params_vec.iter().map(|b| b.as_ref()).collect();
1122
1123            let mut stmt = conn.prepare(&sql)?;
1124            let tasks = stmt
1125                .query_map(params_refs.as_slice(), parse_task_row)?
1126                .filter_map(|r| r.ok())
1127                .collect();
1128
1129            Ok(tasks)
1130        })
1131    }
1132
1133    /// Set the current thought for tasks owned by an agent.
1134    pub fn set_thought(
1135        &self,
1136        agent_id: &str,
1137        thought: Option<String>,
1138        task_ids: Option<Vec<String>>,
1139    ) -> Result<i32> {
1140        let now = now_ms();
1141
1142        self.with_conn(|conn| {
1143            let updated = if let Some(ids) = task_ids {
1144                let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
1145                let sql = format!(
1146                    "UPDATE tasks SET current_thought = ?, updated_at = ?
1147                     WHERE worker_id = ? AND id IN ({})",
1148                    placeholders.join(", ")
1149                );
1150
1151                let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1152                params_vec.push(Box::new(thought.clone()));
1153                params_vec.push(Box::new(now));
1154                params_vec.push(Box::new(agent_id.to_string()));
1155                for id in &ids {
1156                    params_vec.push(Box::new(id.clone()));
1157                }
1158
1159                let params_refs: Vec<&dyn rusqlite::ToSql> =
1160                    params_vec.iter().map(|b| b.as_ref()).collect();
1161                conn.execute(&sql, params_refs.as_slice())?
1162            } else {
1163                conn.execute(
1164                    "UPDATE tasks SET current_thought = ?, updated_at = ? WHERE worker_id = ?",
1165                    params![thought, now, agent_id],
1166                )?
1167            };
1168
1169            Ok(updated as i32)
1170        })
1171    }
1172
1173    /// Log time for a task.
1174    pub fn log_time(&self, task_id: &str, duration_ms: i64) -> Result<i64> {
1175        let now = now_ms();
1176
1177        self.with_conn(|conn| {
1178            conn.execute(
1179                "UPDATE tasks SET time_actual_ms = COALESCE(time_actual_ms, 0) + ?1, updated_at = ?2
1180                 WHERE id = ?3",
1181                params![duration_ms, now, task_id],
1182            )?;
1183
1184            let total: i64 = conn.query_row(
1185                "SELECT COALESCE(time_actual_ms, 0) FROM tasks WHERE id = ?1",
1186                params![task_id],
1187                |row| row.get(0),
1188            )?;
1189
1190            Ok(total)
1191        })
1192    }
1193
1194    /// Log metrics and cost for a task.
1195    /// Values in the metrics array are aggregated (added) to existing values.
1196    pub fn log_metrics(
1197        &self,
1198        task_id: &str,
1199        cost_usd: Option<f64>,
1200        values: &[i64],
1201    ) -> Result<Task> {
1202        let now = now_ms();
1203
1204        self.with_conn(|conn| {
1205            let task =
1206                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1207
1208            // Aggregate metrics (add new values to existing)
1209            let mut new_metrics = task.metrics;
1210            for (i, &val) in values.iter().take(8).enumerate() {
1211                new_metrics[i] += val;
1212            }
1213
1214            let new_cost_usd = task.cost_usd + cost_usd.unwrap_or(0.0);
1215
1216            conn.execute(
1217                "UPDATE tasks SET
1218                    metric_0 = ?1, metric_1 = ?2, metric_2 = ?3, metric_3 = ?4,
1219                    metric_4 = ?5, metric_5 = ?6, metric_6 = ?7, metric_7 = ?8,
1220                    cost_usd = ?9, updated_at = ?10
1221                WHERE id = ?11",
1222                params![
1223                    new_metrics[0],
1224                    new_metrics[1],
1225                    new_metrics[2],
1226                    new_metrics[3],
1227                    new_metrics[4],
1228                    new_metrics[5],
1229                    new_metrics[6],
1230                    new_metrics[7],
1231                    new_cost_usd,
1232                    now,
1233                    task_id,
1234                ],
1235            )?;
1236
1237            Ok(Task {
1238                cost_usd: new_cost_usd,
1239                metrics: new_metrics,
1240                updated_at: now,
1241                ..task
1242            })
1243        })
1244    }
1245
1246    /// Claim a task for an agent.
1247    /// Uses the first timed state (typically "working") as the claiming state.
1248    pub fn claim_task(
1249        &self,
1250        task_id: &str,
1251        agent_id: &str,
1252        states_config: &StatesConfig,
1253    ) -> Result<Task> {
1254        let now = now_ms();
1255
1256        // Find the first timed state to use for claiming (typically "working")
1257        let claim_status = states_config
1258            .definitions
1259            .iter()
1260            .find(|(_, def)| def.timed)
1261            .map(|(name, _)| name.as_str())
1262            .unwrap_or("working");
1263
1264        self.with_conn(|conn| {
1265            // Get the task (using internal helper to avoid deadlock)
1266            let task =
1267                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1268
1269            // Check if already claimed
1270            if task.worker_id.is_some() {
1271                return Err(anyhow!("Task is already claimed"));
1272            }
1273
1274            // Validate state transition
1275            if !states_config.is_valid_transition(&task.status, claim_status) {
1276                let exits = states_config.get_exits(&task.status);
1277                return Err(anyhow!(
1278                    "Cannot claim task in state '{}'. Allowed transitions: {:?}",
1279                    task.status,
1280                    exits
1281                ));
1282            }
1283
1284            // Get the agent (using internal helper to avoid deadlock)
1285            let agent =
1286                get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1287
1288            // Check tag affinity - needed_tags (AND - must have ALL)
1289            if !task.needed_tags.is_empty() {
1290                for needed in &task.needed_tags {
1291                    if !agent.tags.contains(needed) {
1292                        return Err(anyhow!("Agent missing required tag: {}", needed));
1293                    }
1294                }
1295            }
1296
1297            // Check tag affinity - wanted_tags (OR - must have AT LEAST ONE)
1298            if !task.wanted_tags.is_empty() {
1299                let has_any = task
1300                    .wanted_tags
1301                    .iter()
1302                    .any(|wanted| agent.tags.contains(wanted));
1303                if !has_any {
1304                    return Err(anyhow!("Agent has none of the wanted tags"));
1305                }
1306            }
1307
1308            conn.execute(
1309                "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = ?4, updated_at = ?5
1310                 WHERE id = ?6",
1311                params![agent_id, now, claim_status, now, now, task_id,],
1312            )?;
1313
1314            // Record state transition (accumulates time if coming from timed state)
1315            record_state_transition(
1316                conn,
1317                task_id,
1318                claim_status,
1319                Some(agent_id),
1320                None,
1321                states_config,
1322            )?;
1323
1324            // Refresh agent heartbeat
1325            conn.execute(
1326                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1327                params![now, agent_id],
1328            )?;
1329
1330            Ok(Task {
1331                worker_id: Some(agent_id.to_string()),
1332                claimed_at: Some(now),
1333                status: claim_status.to_string(),
1334                started_at: Some(now),
1335                updated_at: now,
1336                ..task
1337            })
1338        })
1339    }
1340
1341    /// Release a task claim.
1342    pub fn release_task(
1343        &self,
1344        task_id: &str,
1345        agent_id: &str,
1346        states_config: &StatesConfig,
1347    ) -> Result<()> {
1348        let now = now_ms();
1349        let release_status = &states_config.initial;
1350
1351        self.with_conn(|conn| {
1352            let task =
1353                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1354
1355            if task.worker_id.as_deref() != Some(agent_id) {
1356                return Err(anyhow!("Task is not owned by this agent"));
1357            }
1358
1359            // Record state transition (accumulates time if coming from timed state)
1360            record_state_transition(
1361                conn,
1362                task_id,
1363                release_status,
1364                Some(agent_id),
1365                None,
1366                states_config,
1367            )?;
1368
1369            conn.execute(
1370                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1371                 WHERE id = ?3",
1372                params![release_status, now, task_id],
1373            )?;
1374
1375            Ok(())
1376        })
1377    }
1378
1379    /// Force release a task regardless of owner.
1380    pub fn force_release(&self, task_id: &str, states_config: &StatesConfig) -> Result<()> {
1381        let now = now_ms();
1382        let release_status = &states_config.initial;
1383
1384        self.with_conn(|conn| {
1385            let task =
1386                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1387
1388            // Record state transition (accumulates time if coming from timed state)
1389            record_state_transition(
1390                conn,
1391                task_id,
1392                release_status,
1393                task.worker_id.as_deref(),
1394                None,
1395                states_config,
1396            )?;
1397
1398            conn.execute(
1399                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1400                 WHERE id = ?3",
1401                params![release_status, now, task_id],
1402            )?;
1403
1404            Ok(())
1405        })
1406    }
1407
1408    /// Force claim a task even if owned by another agent.
1409    pub fn force_claim_task(
1410        &self,
1411        task_id: &str,
1412        agent_id: &str,
1413        states_config: &StatesConfig,
1414    ) -> Result<Task> {
1415        let now = now_ms();
1416
1417        // Find the first timed state to use for claiming (typically "working")
1418        let claim_status = states_config
1419            .definitions
1420            .iter()
1421            .find(|(_, def)| def.timed)
1422            .map(|(name, _)| name.as_str())
1423            .unwrap_or("working");
1424
1425        self.with_conn(|conn| {
1426            // Get the task
1427            let task =
1428                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1429
1430            // Get the agent
1431            let agent =
1432                get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1433
1434            // Check tag affinity - needed_tags (AND)
1435            if !task.needed_tags.is_empty() {
1436                for needed in &task.needed_tags {
1437                    if !agent.tags.contains(needed) {
1438                        return Err(anyhow!("Agent missing required tag: {}", needed));
1439                    }
1440                }
1441            }
1442
1443            // Check tag affinity - wanted_tags (OR)
1444            if !task.wanted_tags.is_empty() {
1445                let has_any = task
1446                    .wanted_tags
1447                    .iter()
1448                    .any(|wanted| agent.tags.contains(wanted));
1449                if !has_any {
1450                    return Err(anyhow!("Agent has none of the wanted tags"));
1451                }
1452            }
1453
1454            conn.execute(
1455                "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = COALESCE(started_at, ?4), updated_at = ?5
1456                 WHERE id = ?6",
1457                params![agent_id, now, claim_status, now, now, task_id,],
1458            )?;
1459
1460            // Record state transition (accumulates time if coming from timed state)
1461            record_state_transition(
1462                conn,
1463                task_id,
1464                claim_status,
1465                Some(agent_id),
1466                None,
1467                states_config,
1468            )?;
1469
1470            // Refresh agent heartbeat
1471            conn.execute(
1472                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1473                params![now, agent_id],
1474            )?;
1475
1476            Ok(Task {
1477                worker_id: Some(agent_id.to_string()),
1478                claimed_at: Some(now),
1479                status: claim_status.to_string(),
1480                started_at: task.started_at.or(Some(now)),
1481                updated_at: now,
1482                ..task
1483            })
1484        })
1485    }
1486
1487    /// Release a task claim with a specified state.
1488    pub fn release_task_with_state(
1489        &self,
1490        task_id: &str,
1491        agent_id: &str,
1492        state: &str,
1493        states_config: &StatesConfig,
1494    ) -> Result<()> {
1495        let now = now_ms();
1496
1497        self.with_conn(|conn| {
1498            let task =
1499                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1500
1501            if task.worker_id.as_deref() != Some(agent_id) {
1502                return Err(anyhow!("Task is not owned by this agent"));
1503            }
1504
1505            // Validate state exists
1506            if !states_config.is_valid_state(state) {
1507                return Err(anyhow!(
1508                    "Invalid state '{}'. Valid states: {:?}",
1509                    state,
1510                    states_config.state_names()
1511                ));
1512            }
1513
1514            // Validate transition
1515            if !states_config.is_valid_transition(&task.status, state) {
1516                let exits = states_config.get_exits(&task.status);
1517                return Err(anyhow!(
1518                    "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
1519                    task.status,
1520                    state,
1521                    exits
1522                ));
1523            }
1524
1525            // Set completed_at when entering completed status (even if it has reopen exits)
1526            let completed_at = if state == "completed" {
1527                Some(now)
1528            } else {
1529                None
1530            };
1531
1532            // Record state transition (accumulates time if coming from timed state)
1533            record_state_transition(conn, task_id, state, Some(agent_id), None, states_config)?;
1534
1535            conn.execute(
1536                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, completed_at = COALESCE(?2, completed_at), updated_at = ?3
1537                 WHERE id = ?4",
1538                params![state, completed_at, now, task_id],
1539            )?;
1540
1541            Ok(())
1542        })
1543    }
1544
1545    /// Force release stale claims.
1546    pub fn force_release_stale(
1547        &self,
1548        timeout_seconds: i64,
1549        states_config: &StatesConfig,
1550    ) -> Result<i32> {
1551        let now = now_ms();
1552        let cutoff = now - (timeout_seconds * 1000);
1553        let release_status = &states_config.initial;
1554
1555        self.with_conn(|conn| {
1556            let updated = conn.execute(
1557                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1558                 WHERE claimed_at < ?3 AND worker_id IS NOT NULL",
1559                params![release_status, now, cutoff],
1560            )?;
1561
1562            Ok(updated as i32)
1563        })
1564    }
1565
1566    /// Complete a task and release file locks held by the agent.
1567    /// Uses "completed" state by default, which should be a terminal state.
1568    /// Checks that all children (via 'contains' dependencies) are complete.
1569    pub fn complete_task(
1570        &self,
1571        task_id: &str,
1572        agent_id: &str,
1573        states_config: &StatesConfig,
1574    ) -> Result<Task> {
1575        let now = now_ms();
1576
1577        // Find a terminal state to use (prefer "completed" if it exists)
1578        let complete_status = if states_config.definitions.contains_key("completed") {
1579            "completed"
1580        } else {
1581            // Find any terminal state
1582            states_config
1583                .definitions
1584                .iter()
1585                .find(|(_, def)| def.exits.is_empty())
1586                .map(|(name, _)| name.as_str())
1587                .unwrap_or("completed")
1588        };
1589
1590        self.with_conn_mut(|conn| {
1591            let tx = conn.transaction()?;
1592
1593            // Get the task
1594            let mut stmt = tx.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1595            let task = stmt
1596                .query_row(params![task_id], parse_task_row)
1597                .map_err(|_| anyhow!("Task not found"))?;
1598            drop(stmt);
1599
1600            // Verify ownership
1601            if task.worker_id.as_deref() != Some(agent_id) {
1602                return Err(anyhow!("Task is not owned by this agent"));
1603            }
1604
1605            // Check for incomplete children (blocking completion)
1606            let incomplete_children: i32 = tx.query_row(
1607                "SELECT COUNT(*) FROM dependencies d
1608                 INNER JOIN tasks child ON d.to_task_id = child.id
1609                 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
1610                 AND child.status IN (SELECT value FROM json_each(?2))",
1611                params![
1612                    task_id,
1613                    serde_json::to_string(&states_config.blocking_states)?
1614                ],
1615                |row| row.get(0),
1616            )?;
1617
1618            if incomplete_children > 0 {
1619                return Err(anyhow!(
1620                    "Cannot complete task: {} child task(s) are not complete",
1621                    incomplete_children
1622                ));
1623            }
1624
1625            // Validate transition
1626            if !states_config.is_valid_transition(&task.status, complete_status) {
1627                let exits = states_config.get_exits(&task.status);
1628                return Err(anyhow!(
1629                    "Cannot complete task in state '{}'. Allowed transitions: {:?}",
1630                    task.status,
1631                    exits
1632                ));
1633            }
1634
1635            // Record state transition (accumulates time from timed state)
1636            record_state_transition(
1637                &tx,
1638                task_id,
1639                complete_status,
1640                Some(agent_id),
1641                None,
1642                states_config,
1643            )?;
1644
1645            // Update task to completed
1646            tx.execute(
1647                "UPDATE tasks SET status = ?1, completed_at = ?2, updated_at = ?3,
1648                 worker_id = NULL, claimed_at = NULL
1649                 WHERE id = ?4",
1650                params![complete_status, now, now, task_id],
1651            )?;
1652
1653            // Release file locks associated with this task (for auto-cleanup)
1654            tx.execute(
1655                "DELETE FROM file_locks WHERE task_id = ?1",
1656                params![task_id],
1657            )?;
1658
1659            // Refresh agent heartbeat
1660            tx.execute(
1661                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1662                params![now, agent_id],
1663            )?;
1664
1665            tx.commit()?;
1666
1667            Ok(Task {
1668                status: complete_status.to_string(),
1669                completed_at: Some(now),
1670                updated_at: now,
1671                worker_id: None,
1672                claimed_at: None,
1673                ..task
1674            })
1675        })
1676    }
1677
1678    /// Get all tasks. Excludes soft-deleted tasks.
1679    pub fn get_all_tasks(&self) -> Result<Vec<Task>> {
1680        self.with_conn(|conn| {
1681            let mut stmt =
1682                conn.prepare("SELECT * FROM tasks WHERE deleted_at IS NULL ORDER BY created_at")?;
1683            let tasks = stmt
1684                .query_map([], parse_task_row)?
1685                .filter_map(|r| r.ok())
1686                .collect();
1687            Ok(tasks)
1688        })
1689    }
1690
1691    /// Get tasks by status.
1692    #[allow(dead_code)]
1693    pub fn get_tasks_by_status(&self, status: &str) -> Result<Vec<Task>> {
1694        self.with_conn(|conn| {
1695            let mut stmt =
1696                conn.prepare("SELECT * FROM tasks WHERE status = ?1 ORDER BY created_at")?;
1697            let tasks = stmt
1698                .query_map(params![status], parse_task_row)?
1699                .filter_map(|r| r.ok())
1700                .collect();
1701            Ok(tasks)
1702        })
1703    }
1704
1705    /// Get claimed tasks. Excludes soft-deleted tasks.
1706    pub fn get_claimed_tasks(&self, agent_id: Option<&str>) -> Result<Vec<Task>> {
1707        self.with_conn(|conn| {
1708            let tasks = if let Some(aid) = agent_id {
1709                let mut stmt = conn
1710                    .prepare("SELECT * FROM tasks WHERE worker_id = ?1 AND deleted_at IS NULL ORDER BY claimed_at")?;
1711                stmt.query_map(params![aid], parse_task_row)?
1712                    .filter_map(|r| r.ok())
1713                    .collect()
1714            } else {
1715                let mut stmt = conn.prepare(
1716                    "SELECT * FROM tasks WHERE worker_id IS NOT NULL AND deleted_at IS NULL ORDER BY claimed_at",
1717                )?;
1718                stmt.query_map([], parse_task_row)?
1719                    .filter_map(|r| r.ok())
1720                    .collect()
1721            };
1722
1723            Ok(tasks)
1724        })
1725    }
1726}
1727
1728/// Helper function to create task tree recursively within a transaction.
1729/// Creates dependencies from parent to children using child_type.
1730/// Creates dependencies between siblings using sibling_type.
1731/// Supports referencing existing tasks via ref_id.
1732#[allow(clippy::too_many_arguments)]
1733fn create_tree_recursive(
1734    conn: &Connection,
1735    input: &TaskTreeInput,
1736    parent_id: Option<&str>,
1737    prev_sibling_id: Option<&str>,
1738    child_type: Option<&str>,
1739    sibling_type: Option<&str>,
1740    all_ids: &mut Vec<String>,
1741    phase_warnings: &mut Vec<String>,
1742    tag_warnings: &mut Vec<String>,
1743    states_config: &StatesConfig,
1744    phases_config: &PhasesConfig,
1745    tags_config: &TagsConfig,
1746    ids_config: &IdsConfig,
1747) -> Result<String> {
1748    // Check if this node references an existing task
1749    let task_id = if let Some(ref ref_id) = input.ref_id {
1750        // Verify the referenced task exists
1751        let exists: bool = conn.query_row(
1752            "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
1753            params![ref_id],
1754            |row| row.get(0),
1755        )?;
1756        if !exists {
1757            return Err(anyhow::anyhow!("Referenced task '{}' not found", ref_id));
1758        }
1759        ref_id.clone()
1760    } else {
1761        // Create a new task
1762        let task_id = input
1763            .id
1764            .clone()
1765            .unwrap_or_else(|| generate_task_id(ids_config));
1766        let now = now_ms();
1767        let priority = clamp_priority(input.priority.unwrap_or(PRIORITY_DEFAULT));
1768        let initial_status = &states_config.initial;
1769
1770        // Check phase validity
1771        if let Some(ref phase) = input.phase
1772            && let Some(warning) = phases_config.check_phase(phase)?
1773        {
1774            phase_warnings.push(format!("Task '{}': {}", task_id, warning));
1775        }
1776
1777        let needed_tags = input.needed_tags.clone().unwrap_or_default();
1778        let wanted_tags = input.wanted_tags.clone().unwrap_or_default();
1779        let tags = input.tags.clone().unwrap_or_default();
1780
1781        // Check tag validity for all tag types
1782        for warning in tags_config.validate_tags(&tags)? {
1783            tag_warnings.push(format!("Task '{}': {}", task_id, warning));
1784        }
1785        for warning in tags_config.validate_tags(&needed_tags)? {
1786            tag_warnings.push(format!("Task '{}' needed_tags: {}", task_id, warning));
1787        }
1788        for warning in tags_config.validate_tags(&wanted_tags)? {
1789            tag_warnings.push(format!("Task '{}' wanted_tags: {}", task_id, warning));
1790        }
1791
1792        let needed_tags_json = serde_json::to_string(&needed_tags)?;
1793        let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
1794        let tags_json = serde_json::to_string(&tags)?;
1795
1796        conn.execute(
1797            "INSERT INTO tasks (
1798                id, title, description, status, phase, priority,
1799                needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
1800            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1801            params![
1802                &task_id,
1803                &input.title,
1804                &input.description,
1805                initial_status,
1806                &input.phase,
1807                priority.to_string(),
1808                needed_tags_json,
1809                wanted_tags_json,
1810                tags_json,
1811                input.points,
1812                input.time_estimate_ms,
1813                now,
1814                now,
1815            ],
1816        )?;
1817
1818        // Record initial state transition
1819        record_state_transition(conn, &task_id, initial_status, None, None, states_config)?;
1820
1821        // Sync tags to junction tables for indexed lookups
1822        sync_task_tags(conn, &task_id, &tags)?;
1823        sync_needed_tags(conn, &task_id, &needed_tags)?;
1824        sync_wanted_tags(conn, &task_id, &wanted_tags)?;
1825
1826        task_id
1827    };
1828
1829    // Create dependency from parent if child_type is specified
1830    if let (Some(pid), Some(ct)) = (parent_id, child_type) {
1831        Database::add_dependency_internal(conn, pid, &task_id, ct)?;
1832    }
1833
1834    // Create dependency from previous sibling if sibling_type is specified
1835    if let (Some(prev_id), Some(st)) = (prev_sibling_id, sibling_type) {
1836        Database::add_dependency_internal(conn, prev_id, &task_id, st)?;
1837    }
1838
1839    all_ids.push(task_id.clone());
1840
1841    // Create children with dependencies based on child_type and sibling_type
1842    let mut prev_child_id: Option<String> = None;
1843    for child in input.children.iter() {
1844        let child_id = create_tree_recursive(
1845            conn,
1846            child,
1847            Some(&task_id),
1848            prev_child_id.as_deref(),
1849            child_type,
1850            sibling_type,
1851            all_ids,
1852            phase_warnings,
1853            tag_warnings,
1854            states_config,
1855            phases_config,
1856            tags_config,
1857            ids_config,
1858        )?;
1859        prev_child_id = Some(child_id);
1860    }
1861
1862    Ok(task_id)
1863}