Skip to main content

task_graph_mcp/db/
tasks.rs

1//! Task CRUD and tree operations.
2
3use super::state_transitions::record_state_transition;
4use super::{Database, now_ms};
5use crate::config::{AutoAdvanceConfig, DependenciesConfig, StatesConfig};
6use crate::types::{
7    PRIORITY_DEFAULT, Priority, Task, TaskTree, TaskTreeInput, Worker, clamp_priority,
8    parse_priority,
9};
10use anyhow::{Result, anyhow};
11use rusqlite::{Connection, Row, params};
12use uuid::Uuid;
13
/// Build the expression that follows `ORDER BY` from caller-supplied sort parameters.
///
/// Only fixed, whitelisted SQL fragments are ever emitted, so the returned string can be
/// spliced into a query even when `sort_by` / `sort_order` come from untrusted input.
///
/// * `sort_by` - `"priority"`, `"created_at"` or `"updated_at"` (ASCII case-insensitive).
///   `None` or any other value sorts by `t.created_at`.
/// * `sort_order` - `"asc"` or `"desc"` (ASCII case-insensitive). `None` or any other value
///   sorts descending.
///
/// Returns `"<column expression> <ASC|DESC>"`; column names are qualified with the `t` alias,
/// so the enclosing query must alias the tasks table as `t`.
fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
    let field = match sort_by {
        // Priority is read from the database as text, so cast it for numeric ordering.
        Some(name) if name.eq_ignore_ascii_case("priority") => "CAST(t.priority AS INTEGER)",
        Some(name) if name.eq_ignore_ascii_case("updated_at") => "t.updated_at",
        // "created_at", unknown names and `None` all use the creation timestamp.
        _ => "t.created_at",
    };

    let order = match sort_order {
        // Accept "ASC"/"Asc" too; previously these silently produced DESC.
        Some(dir) if dir.eq_ignore_ascii_case("asc") => "ASC",
        // "desc", unknown values and `None` all sort descending.
        _ => "DESC",
    };

    format!("{} {}", field, order)
}
35
36// =============================================================================
37// Junction table helpers for tag management
38// =============================================================================
39
40/// Sync task tags to the task_tags junction table.
41/// Replaces all existing tags for the task.
42fn sync_task_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
43    conn.execute("DELETE FROM task_tags WHERE task_id = ?1", params![task_id])?;
44    for tag in tags {
45        conn.execute(
46            "INSERT INTO task_tags (task_id, tag) VALUES (?1, ?2)",
47            params![task_id, tag],
48        )?;
49    }
50    Ok(())
51}
52
53/// Sync needed tags (agent must have ALL) to the task_needed_tags junction table.
54fn sync_needed_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
55    conn.execute(
56        "DELETE FROM task_needed_tags WHERE task_id = ?1",
57        params![task_id],
58    )?;
59    for tag in tags {
60        conn.execute(
61            "INSERT INTO task_needed_tags (task_id, tag) VALUES (?1, ?2)",
62            params![task_id, tag],
63        )?;
64    }
65    Ok(())
66}
67
68/// Sync wanted tags (agent must have ANY) to the task_wanted_tags junction table.
69fn sync_wanted_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
70    conn.execute(
71        "DELETE FROM task_wanted_tags WHERE task_id = ?1",
72        params![task_id],
73    )?;
74    for tag in tags {
75        conn.execute(
76            "INSERT INTO task_wanted_tags (task_id, tag) VALUES (?1, ?2)",
77            params![task_id, tag],
78        )?;
79    }
80    Ok(())
81}
82
83pub fn parse_task_row(row: &Row) -> rusqlite::Result<Task> {
84    let id: String = row.get("id")?;
85    let title: String = row.get("title")?;
86    let description: Option<String> = row.get("description")?;
87    let status: String = row.get("status")?;
88    let priority: String = row.get("priority")?;
89    let worker_id: Option<String> = row.get("worker_id")?;
90    let claimed_at: Option<i64> = row.get("claimed_at")?;
91
92    let needed_tags_json: Option<String> = row.get("needed_tags")?;
93    let wanted_tags_json: Option<String> = row.get("wanted_tags")?;
94    let tags_json: Option<String> = row.get("tags")?;
95
96    let points: Option<i32> = row.get("points")?;
97    let time_estimate_ms: Option<i64> = row.get("time_estimate_ms")?;
98    let time_actual_ms: Option<i64> = row.get("time_actual_ms")?;
99    let started_at: Option<i64> = row.get("started_at")?;
100    let completed_at: Option<i64> = row.get("completed_at")?;
101
102    let current_thought: Option<String> = row.get("current_thought")?;
103
104    let cost_usd: f64 = row.get("cost_usd")?;
105    let metric_0: i64 = row.get("metric_0")?;
106    let metric_1: i64 = row.get("metric_1")?;
107    let metric_2: i64 = row.get("metric_2")?;
108    let metric_3: i64 = row.get("metric_3")?;
109    let metric_4: i64 = row.get("metric_4")?;
110    let metric_5: i64 = row.get("metric_5")?;
111    let metric_6: i64 = row.get("metric_6")?;
112    let metric_7: i64 = row.get("metric_7")?;
113
114    let created_at: i64 = row.get("created_at")?;
115    let updated_at: i64 = row.get("updated_at")?;
116
117    Ok(Task {
118        id,
119        title,
120        description,
121        status,
122        priority: parse_priority(&priority),
123        worker_id,
124        claimed_at,
125        needed_tags: needed_tags_json
126            .map(|s| serde_json::from_str(&s).unwrap_or_default())
127            .unwrap_or_default(),
128        wanted_tags: wanted_tags_json
129            .map(|s| serde_json::from_str(&s).unwrap_or_default())
130            .unwrap_or_default(),
131        tags: tags_json
132            .map(|s| serde_json::from_str(&s).unwrap_or_default())
133            .unwrap_or_default(),
134        points,
135        time_estimate_ms,
136        time_actual_ms,
137        started_at,
138        completed_at,
139        current_thought,
140        cost_usd,
141        metrics: [
142            metric_0, metric_1, metric_2, metric_3, metric_4, metric_5, metric_6, metric_7,
143        ],
144        created_at,
145        updated_at,
146    })
147}
148
149/// Internal helper to get a task using an existing connection (avoids deadlock).
150fn get_task_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
151    let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
152
153    let result = stmt.query_row(params![task_id], parse_task_row);
154
155    match result {
156        Ok(task) => Ok(Some(task)),
157        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
158        Err(e) => Err(e.into()),
159    }
160}
161
162/// Internal helper to get a worker using an existing connection (avoids deadlock).
163fn get_worker_internal(conn: &Connection, worker_id: &str) -> Result<Option<Worker>> {
164    let mut stmt = conn.prepare(
165        "SELECT id, tags, max_claims, registered_at, last_heartbeat
166         FROM workers WHERE id = ?1",
167    )?;
168
169    let result = stmt.query_row(params![worker_id], |row| {
170        let id: String = row.get(0)?;
171        let tags_json: String = row.get(1)?;
172        let max_claims: i32 = row.get(2)?;
173        let registered_at: i64 = row.get(3)?;
174        let last_heartbeat: i64 = row.get(4)?;
175
176        Ok((id, tags_json, max_claims, registered_at, last_heartbeat))
177    });
178
179    match result {
180        Ok((id, tags_json, max_claims, registered_at, last_heartbeat)) => {
181            let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
182            Ok(Some(Worker {
183                id,
184                tags,
185                max_claims,
186                registered_at,
187                last_heartbeat,
188            }))
189        }
190        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
191        Err(e) => Err(e.into()),
192    }
193}
194
195impl Database {
196    /// Create a new task.
197    /// If id is provided, uses it as the task ID; otherwise generates UUID7.
198    /// If parent_id is provided, creates a 'contains' dependency from parent to this task.
199    #[allow(clippy::too_many_arguments)]
200    pub fn create_task(
201        &self,
202        id: Option<String>,
203        description: String,
204        parent_id: Option<String>,
205        priority: Option<Priority>,
206        points: Option<i32>,
207        time_estimate_ms: Option<i64>,
208        agent_tags_all: Option<Vec<String>>,
209        agent_tags_any: Option<Vec<String>>,
210        tags: Option<Vec<String>>,
211        states_config: &StatesConfig,
212    ) -> Result<Task> {
213        let task_id = id.unwrap_or_else(|| Uuid::now_v7().to_string());
214        let now = now_ms();
215        let priority = clamp_priority(priority.unwrap_or(PRIORITY_DEFAULT));
216        let initial_status = &states_config.initial;
217
218        let needed_tags = agent_tags_all.unwrap_or_default();
219        let wanted_tags = agent_tags_any.unwrap_or_default();
220        let tags = tags.unwrap_or_default();
221        let needed_tags_json = serde_json::to_string(&needed_tags)?;
222        let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
223        let tags_json = serde_json::to_string(&tags)?;
224
225        self.with_conn_mut(|conn| {
226            let tx = conn.transaction()?;
227
228            tx.execute(
229                "INSERT INTO tasks (
230                    id, title, description, status, priority,
231                    needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
232                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
233                params![
234                    &task_id,
235                    &description, // Use description as title
236                    &description, // Also store as description
237                    initial_status,
238                    priority.to_string(),
239                    needed_tags_json,
240                    wanted_tags_json,
241                    tags_json,
242                    points,
243                    time_estimate_ms,
244                    now,
245                    now,
246                ],
247            )?;
248
249            // Sync tags to junction tables
250            sync_task_tags(&tx, &task_id, &tags)?;
251            sync_needed_tags(&tx, &task_id, &needed_tags)?;
252            sync_wanted_tags(&tx, &task_id, &wanted_tags)?;
253
254            // Create 'contains' dependency if parent_id is provided
255            if let Some(ref pid) = parent_id {
256                Database::add_dependency_internal(&tx, pid, &task_id, "contains")?;
257            }
258
259            // Record initial state
260            record_state_transition(&tx, &task_id, initial_status, None, None, states_config)?;
261
262            tx.commit()?;
263
264            Ok(Task {
265                id: task_id,
266                title: description.clone(),
267                description: Some(description),
268                status: initial_status.clone(),
269                priority,
270                worker_id: None,
271                claimed_at: None,
272                needed_tags,
273                wanted_tags,
274                tags,
275                points,
276                time_estimate_ms,
277                time_actual_ms: None,
278                started_at: None,
279                completed_at: None,
280                current_thought: None,
281                cost_usd: 0.0,
282                metrics: [0; 8],
283                created_at: now,
284                updated_at: now,
285            })
286        })
287    }
288
289    /// Create a task tree from nested input.
290    /// Uses child_type for parent-child dependencies (default: "contains").
291    /// Uses sibling_type for sibling dependencies (default: none/parallel).
292    pub fn create_task_tree(
293        &self,
294        input: TaskTreeInput,
295        parent_id: Option<String>,
296        child_type: Option<String>,
297        sibling_type: Option<String>,
298        states_config: &StatesConfig,
299    ) -> Result<(String, Vec<String>)> {
300        let mut all_ids = Vec::new();
301        // Default child_type to "contains" if not specified
302        let child_type = child_type.or_else(|| Some("contains".to_string()));
303
304        self.with_conn_mut(|conn| {
305            let tx = conn.transaction()?;
306            let root_id = create_tree_recursive(
307                &tx,
308                &input,
309                parent_id.as_deref(),
310                None, // no previous sibling for root
311                child_type.as_deref(),
312                sibling_type.as_deref(),
313                &mut all_ids,
314                states_config,
315            )?;
316            tx.commit()?;
317            Ok((root_id, all_ids))
318        })
319    }
320
321    /// Get a task by ID.
322    pub fn get_task(&self, task_id: &str) -> Result<Option<Task>> {
323        self.with_conn(|conn| {
324            let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
325
326            let result = stmt.query_row(params![task_id], parse_task_row);
327
328            match result {
329                Ok(task) => Ok(Some(task)),
330                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
331                Err(e) => Err(e.into()),
332            }
333        })
334    }
335
336    /// Get a task with all its children (tree).
337    pub fn get_task_tree(&self, task_id: &str) -> Result<Option<TaskTree>> {
338        let task = self.get_task(task_id)?;
339        match task {
340            None => Ok(None),
341            Some(task) => {
342                let children = self.get_children_recursive(&task.id)?;
343                Ok(Some(TaskTree { task, children }))
344            }
345        }
346    }
347
348    /// Get children recursively.
349    fn get_children_recursive(&self, parent_id: &str) -> Result<Vec<TaskTree>> {
350        let children = self.get_children(parent_id)?;
351        let mut result = Vec::new();
352
353        for child in children {
354            let child_children = self.get_children_recursive(&child.id)?;
355            result.push(TaskTree {
356                task: child,
357                children: child_children,
358            });
359        }
360
361        Ok(result)
362    }
363
364    /// Get direct children of a task (via 'contains' dependency).
365    pub fn get_children(&self, parent_id: &str) -> Result<Vec<Task>> {
366        self.with_conn(|conn| {
367            let mut stmt = conn.prepare(
368                "SELECT t.* FROM tasks t
369                 INNER JOIN dependencies d ON t.id = d.to_task_id
370                 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
371                 ORDER BY t.created_at",
372            )?;
373
374            let tasks = stmt
375                .query_map(params![parent_id], parse_task_row)?
376                .filter_map(|r| r.ok())
377                .collect();
378
379            Ok(tasks)
380        })
381    }
382
383    /// Update a task.
384    #[allow(clippy::too_many_arguments)]
385    pub fn update_task(
386        &self,
387        task_id: &str,
388        title: Option<String>,
389        description: Option<Option<String>>,
390        status: Option<String>,
391        priority: Option<Priority>,
392        points: Option<Option<i32>>,
393        tags: Option<Vec<String>>,
394        states_config: &StatesConfig,
395    ) -> Result<Task> {
396        let now = now_ms();
397
398        self.with_conn(|conn| {
399            let task =
400                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
401
402            let new_title = title.unwrap_or(task.title.clone());
403            let new_description = description.unwrap_or(task.description.clone());
404            let new_status = status.unwrap_or(task.status.clone());
405            let new_priority = priority.unwrap_or(task.priority);
406            let new_points = points.unwrap_or(task.points);
407            let new_tags = tags.unwrap_or(task.tags.clone());
408
409            // Validate the new status exists
410            if !states_config.is_valid_state(&new_status) {
411                return Err(anyhow!(
412                    "Invalid state '{}'. Valid states: {:?}",
413                    new_status,
414                    states_config.state_names()
415                ));
416            }
417
418            // Validate state transition if status changed
419            if task.status != new_status
420                && !states_config.is_valid_transition(&task.status, &new_status) {
421                    let exits = states_config.get_exits(&task.status);
422                    return Err(anyhow!(
423                        "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
424                        task.status,
425                        new_status,
426                        exits
427                    ));
428                }
429
430            // Handle status transitions for timestamps
431            // Set started_at when first entering a timed state
432            let started_at =
433                if task.started_at.is_none() && states_config.is_timed_state(&new_status) {
434                    Some(now)
435                } else {
436                    task.started_at
437                };
438
439            // Set completed_at when entering a terminal state
440            let completed_at = if states_config.is_terminal_state(&new_status) {
441                Some(now)
442            } else {
443                task.completed_at
444            };
445
446            // Record state transition if status changed (handles time accumulation)
447            if task.status != new_status {
448                record_state_transition(
449                    conn,
450                    task_id,
451                    &new_status,
452                    task.worker_id.as_deref(),
453                    None,
454                    states_config,
455                )?;
456            }
457
458            conn.execute(
459                "UPDATE tasks SET
460                    title = ?1, description = ?2, status = ?3, priority = ?4,
461                    points = ?5, started_at = ?6, completed_at = ?7, updated_at = ?8,
462                    tags = ?9
463                WHERE id = ?10",
464                params![
465                    new_title,
466                    new_description,
467                    new_status,
468                    new_priority.to_string(),
469                    new_points,
470                    started_at,
471                    completed_at,
472                    now,
473                    serde_json::to_string(&new_tags)?,
474                    task_id,
475                ],
476            )?;
477
478            Ok(Task {
479                id: task_id.to_string(),
480                title: new_title,
481                description: new_description,
482                status: new_status,
483                priority: new_priority,
484                points: new_points,
485                tags: new_tags,
486                started_at,
487                completed_at,
488                updated_at: now,
489                ..task
490            })
491        })
492    }
493
494    /// Update a task with unified claim/release logic.
495    /// - Transition to timed state = CLAIM (set owner, validate tags, check limit)
496    /// - Transition from timed to non-timed = RELEASE (clear owner)
497    /// - Transition to terminal = COMPLETE (check children, release file locks)
498    /// - With assignee = ASSIGN (set owner to assignee, transition to 'assigned' state)
499    /// - Only the owner can update a claimed task (unless force=true)
500    ///
501    /// Returns (task, unblocked, auto_advanced):
502    /// - task: The updated task
503    /// - unblocked: Task IDs that are now ready (all dependencies satisfied)
504    /// - auto_advanced: Subset of unblocked that were actually transitioned
505    #[allow(clippy::too_many_arguments)]
506    pub fn update_task_unified(
507        &self,
508        task_id: &str,
509        agent_id: &str,
510        assignee: Option<&str>,
511        title: Option<String>,
512        description: Option<Option<String>>,
513        status: Option<String>,
514        priority: Option<Priority>,
515        points: Option<Option<i32>>,
516        tags: Option<Vec<String>>,
517        needed_tags: Option<Vec<String>>,
518        wanted_tags: Option<Vec<String>>,
519        time_estimate_ms: Option<i64>,
520        reason: Option<String>,
521        force: bool,
522        states_config: &StatesConfig,
523        deps_config: &DependenciesConfig,
524        auto_advance: &AutoAdvanceConfig,
525    ) -> Result<(Task, Vec<String>, Vec<String>)> {
526        let now = now_ms();
527
528        self.with_conn_mut(|conn| {
529            let tx = conn.transaction()?;
530
531            let task =
532                get_task_internal(&tx, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
533
534            // Owner-only validation: if task is claimed, only owner can update (unless force)
535            if let Some(ref current_owner) = task.worker_id
536                && current_owner != agent_id && !force {
537                    return Err(anyhow!(
538                        "Task is claimed by agent '{}'. Only the owner can update claimed tasks (use force=true to override)",
539                        current_owner
540                    ));
541                }
542
543            let new_title = title.unwrap_or(task.title.clone());
544            let new_description = description.unwrap_or(task.description.clone());
545            // If assignee is set but no explicit status, default to 'assigned' state
546            let new_status = if assignee.is_some() && status.is_none() {
547                "assigned".to_string()
548            } else {
549                status.unwrap_or(task.status.clone())
550            };
551            let new_priority = priority.unwrap_or(task.priority);
552            let new_points = points.unwrap_or(task.points);
553            let new_tags = tags.unwrap_or(task.tags.clone());
554            let new_needed_tags = needed_tags.unwrap_or(task.needed_tags.clone());
555            let new_wanted_tags = wanted_tags.unwrap_or(task.wanted_tags.clone());
556            let new_time_estimate_ms = time_estimate_ms.or(task.time_estimate_ms);
557
558            // Validate the new status exists
559            if !states_config.is_valid_state(&new_status) {
560                return Err(anyhow!(
561                    "Invalid state '{}'. Valid states: {:?}",
562                    new_status,
563                    states_config.state_names()
564                ));
565            }
566
567            // Validate state transition if status changed
568            if task.status != new_status
569                && !states_config.is_valid_transition(&task.status, &new_status) {
570                    let exits = states_config.get_exits(&task.status);
571                    return Err(anyhow!(
572                        "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
573                        task.status,
574                        new_status,
575                        exits
576                    ));
577                }
578
579            // Determine ownership changes based on state transition
580            let new_is_timed = states_config.is_timed_state(&new_status);
581            let new_is_terminal = states_config.is_terminal_state(&new_status);
582            let current_owner = task.worker_id.as_deref();
583            let is_owned_by_agent = current_owner == Some(agent_id);
584            let is_owned_by_other = current_owner.is_some() && !is_owned_by_agent;
585
586            let mut new_owner: Option<String> = task.worker_id.clone();
587            let mut new_claimed_at: Option<i64> = task.claimed_at;
588
589            // ASSIGN: Push coordination - coordinator assigns task to another agent
590            // Sets owner without starting the timer (assigned state is untimed)
591            if let Some(target_agent) = assignee {
592                // Verify task is not already claimed (unless force)
593                if is_owned_by_other && !force {
594                    return Err(anyhow!(
595                        "Task is already claimed by agent '{}'. Use force=true to reassign.",
596                        current_owner.unwrap()
597                    ));
598                }
599
600                // Verify the assignee exists
601                let target = get_worker_internal(&tx, target_agent)?
602                    .ok_or_else(|| anyhow!("Assignee agent '{}' not found", target_agent))?;
603
604                // Check tag affinity for the assignee
605                if !task.needed_tags.is_empty() {
606                    for needed in &task.needed_tags {
607                        if !target.tags.contains(needed) {
608                            return Err(anyhow!(
609                                "Assignee '{}' missing required tag: {}",
610                                target_agent,
611                                needed
612                            ));
613                        }
614                    }
615                }
616
617                if !task.wanted_tags.is_empty() {
618                    let has_any = task
619                        .wanted_tags
620                        .iter()
621                        .any(|wanted| target.tags.contains(wanted));
622                    if !has_any {
623                        return Err(anyhow!(
624                            "Assignee '{}' has none of the wanted tags: {:?}",
625                            target_agent,
626                            task.wanted_tags
627                        ));
628                    }
629                }
630
631                // Set ownership to the assignee
632                new_owner = Some(target_agent.to_string());
633                new_claimed_at = Some(now);
634            }
635
636            // CLAIM: Transitioning to a timed state and need to take ownership
637            // This handles: non-timed -> timed, OR timed (other owner) -> timed (force claim)
638            if new_is_timed && !is_owned_by_agent {
639                // Already claimed by someone else?
640                if is_owned_by_other && !force {
641                    return Err(anyhow!(
642                        "Task is already claimed by agent '{}'",
643                        current_owner.unwrap()
644                    ));
645                }
646
647                // Check for unsatisfied blocking dependencies (skip if force)
648                if !force {
649                    let unsatisfied_blockers = super::deps::get_unsatisfied_start_blockers_in_tx(
650                        &tx,
651                        task_id,
652                        states_config,
653                        deps_config,
654                    )?;
655                    if !unsatisfied_blockers.is_empty() {
656                        return Err(anyhow!(
657                            "Task has unsatisfied dependencies: [{}]",
658                            unsatisfied_blockers.join(", ")
659                        ));
660                    }
661                }
662
663                // Get the agent
664                let agent = get_worker_internal(&tx, agent_id)?
665                    .ok_or_else(|| anyhow!("Agent not found"))?;
666
667                // Check tag affinity - needed_tags (AND - must have ALL)
668                if !task.needed_tags.is_empty() {
669                    for needed in &task.needed_tags {
670                        if !agent.tags.contains(needed) {
671                            return Err(anyhow!("Agent missing required tag: {}", needed));
672                        }
673                    }
674                }
675
676                // Check tag affinity - wanted_tags (OR - must have AT LEAST ONE)
677                if !task.wanted_tags.is_empty() {
678                    let has_any = task
679                        .wanted_tags
680                        .iter()
681                        .any(|wanted| agent.tags.contains(wanted));
682                    if !has_any {
683                        return Err(anyhow!("Agent has none of the wanted tags"));
684                    }
685                }
686
687                // Set ownership
688                new_owner = Some(agent_id.to_string());
689                new_claimed_at = Some(now);
690
691                // Refresh agent heartbeat
692                tx.execute(
693                    "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
694                    params![now, agent_id],
695                )?;
696            }
697
698            // RELEASE: Transitioning to non-timed state (but not terminal)
699            if !new_is_timed && !new_is_terminal && task.worker_id.is_some() {
700                // Verify ownership (unless force)
701                if is_owned_by_other && !force {
702                    return Err(anyhow!("Task is not owned by this agent"));
703                }
704
705                // Clear ownership
706                new_owner = None;
707                new_claimed_at = None;
708            }
709
710            // COMPLETE: Transition to terminal state
711            if new_is_terminal {
712                // Verify ownership if task was claimed (unless force)
713                if let Some(ref current_owner) = task.worker_id
714                    && current_owner != agent_id && !force {
715                        return Err(anyhow!("Task is not owned by this agent"));
716                    }
717
718                // Check for incomplete children (via 'contains' dependencies)
719                let incomplete_children: i32 = tx.query_row(
720                    "SELECT COUNT(*) FROM dependencies d
721                     INNER JOIN tasks child ON d.to_task_id = child.id
722                     WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
723                     AND child.status IN (SELECT value FROM json_each(?2))",
724                    params![
725                        task_id,
726                        serde_json::to_string(&states_config.blocking_states)?
727                    ],
728                    |row| row.get(0),
729                )?;
730
731                if incomplete_children > 0 {
732                    return Err(anyhow!(
733                        "Cannot complete task: {} child task(s) are not complete",
734                        incomplete_children
735                    ));
736                }
737
738                // Clear ownership
739                new_owner = None;
740                new_claimed_at = None;
741
742                // Release file locks associated with this task (for auto-cleanup)
743                tx.execute(
744                    "DELETE FROM file_locks WHERE task_id = ?1",
745                    params![task_id],
746                )?;
747            }
748
749            // Handle timestamps
750            let started_at =
751                if task.started_at.is_none() && new_is_timed {
752                    Some(now)
753                } else {
754                    task.started_at
755                };
756
757            let completed_at = if new_is_terminal {
758                Some(now)
759            } else {
760                task.completed_at
761            };
762
763            // Record state transition if status changed (with reason for audit)
764            let status_changed = task.status != new_status;
765            if status_changed {
766                record_state_transition(
767                    &tx,
768                    task_id,
769                    &new_status,
770                    new_owner.as_deref(),
771                    reason.as_deref(),
772                    states_config,
773                )?;
774            }
775
776            tx.execute(
777                "UPDATE tasks SET
778                    title = ?1, description = ?2, status = ?3, priority = ?4,
779                    points = ?5, started_at = ?6, completed_at = ?7, updated_at = ?8,
780                    tags = ?9, worker_id = ?10, claimed_at = ?11,
781                    needed_tags = ?12, wanted_tags = ?13, time_estimate_ms = ?14
782                WHERE id = ?15",
783                params![
784                    new_title,
785                    new_description,
786                    new_status,
787                    new_priority.to_string(),
788                    new_points,
789                    started_at,
790                    completed_at,
791                    now,
792                    serde_json::to_string(&new_tags)?,
793                    new_owner,
794                    new_claimed_at,
795                    serde_json::to_string(&new_needed_tags)?,
796                    serde_json::to_string(&new_wanted_tags)?,
797                    new_time_estimate_ms,
798                    task_id,
799                ],
800            )?;
801
802            // Sync tags to junction tables if changed
803            if new_tags != task.tags {
804                sync_task_tags(&tx, task_id, &new_tags)?;
805            }
806            if new_needed_tags != task.needed_tags {
807                sync_needed_tags(&tx, task_id, &new_needed_tags)?;
808            }
809            if new_wanted_tags != task.wanted_tags {
810                sync_wanted_tags(&tx, task_id, &new_wanted_tags)?;
811            }
812
813            // Check for unblocked tasks if this task transitioned FROM blocking TO non-blocking
814            let (unblocked, auto_advanced) = if status_changed {
815                let was_blocking = states_config.is_blocking_state(&task.status);
816                let is_blocking = states_config.is_blocking_state(&new_status);
817
818                if was_blocking && !is_blocking {
819                    super::deps::propagate_unblock_effects(
820                        &tx,
821                        task_id,
822                        Some(agent_id),
823                        states_config,
824                        deps_config,
825                        auto_advance,
826                    )?
827                } else {
828                    (vec![], vec![])
829                }
830            } else {
831                (vec![], vec![])
832            };
833
834            tx.commit()?;
835
836            Ok((Task {
837                id: task_id.to_string(),
838                title: new_title,
839                description: new_description,
840                status: new_status,
841                priority: new_priority,
842                points: new_points,
843                tags: new_tags,
844                needed_tags: new_needed_tags,
845                wanted_tags: new_wanted_tags,
846                time_estimate_ms: new_time_estimate_ms,
847                started_at,
848                completed_at,
849                updated_at: now,
850                worker_id: new_owner,
851                claimed_at: new_claimed_at,
852                ..task
853            }, unblocked, auto_advanced))
854        })
855    }
856
857    /// Delete a task (soft delete by default, hard delete with obliterate=true).
858    ///
859    /// - `worker_id`: The worker attempting to delete (required for ownership check)
860    /// - `cascade`: Whether to delete children (default: false)
861    /// - `reason`: Optional reason for deletion
862    /// - `obliterate`: If true, permanently deletes the task; if false (default), soft deletes
863    /// - `force`: If true, allows deletion even if owned by another worker
864    pub fn delete_task(
865        &self,
866        task_id: &str,
867        worker_id: &str,
868        cascade: bool,
869        reason: Option<String>,
870        obliterate: bool,
871        force: bool,
872    ) -> Result<()> {
873        let now = now_ms();
874
875        self.with_conn_mut(|conn| {
876            let tx = conn.transaction()?;
877
878            // Get the task to check ownership
879            let task = get_task_internal(&tx, task_id)?
880                .ok_or_else(|| anyhow!("Task not found"))?;
881
882            // Check ownership - reject if claimed by another worker (unless force)
883            if let Some(ref owner) = task.worker_id
884                && owner != worker_id && !force {
885                    return Err(anyhow!(
886                        "Task is claimed by worker '{}'. Use force=true to override.",
887                        owner
888                    ));
889                }
890
891            if obliterate {
892                // Hard delete - permanently remove from database
893                if cascade {
894                    // Find all descendants using recursive CTE and delete them
895                    // The CTE finds all tasks reachable via 'contains' dependencies
896                    tx.execute(
897                        "WITH RECURSIVE descendants AS (
898                            SELECT ?1 AS id
899                            UNION ALL
900                            SELECT dep.to_task_id FROM dependencies dep
901                            INNER JOIN descendants d ON dep.from_task_id = d.id
902                            WHERE dep.dep_type = 'contains'
903                        )
904                        DELETE FROM tasks WHERE id IN (SELECT id FROM descendants)",
905                        params![task_id],
906                    )?;
907                } else {
908                    // Check for children via dependencies
909                    let child_count: i32 = tx.query_row(
910                        "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
911                        params![task_id],
912                        |row| row.get(0),
913                    )?;
914
915                    if child_count > 0 {
916                        return Err(anyhow!("Task has children; use cascade=true to delete"));
917                    }
918
919                    tx.execute("DELETE FROM tasks WHERE id = ?1", params![task_id])?;
920                }
921            } else {
922                // Soft delete - set deleted_at, deleted_by, deleted_reason
923                if cascade {
924                    // Soft delete all descendants
925                    tx.execute(
926                        "WITH RECURSIVE descendants AS (
927                            SELECT ?1 AS id
928                            UNION ALL
929                            SELECT dep.to_task_id FROM dependencies dep
930                            INNER JOIN descendants d ON dep.from_task_id = d.id
931                            WHERE dep.dep_type = 'contains'
932                        )
933                        UPDATE tasks SET deleted_at = ?2, deleted_by = ?3, deleted_reason = ?4, updated_at = ?2
934                        WHERE id IN (SELECT id FROM descendants) AND deleted_at IS NULL",
935                        params![task_id, now, worker_id, reason],
936                    )?;
937                } else {
938                    // Check for children via dependencies
939                    let child_count: i32 = tx.query_row(
940                        "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
941                        params![task_id],
942                        |row| row.get(0),
943                    )?;
944
945                    if child_count > 0 {
946                        return Err(anyhow!("Task has children; use cascade=true to delete"));
947                    }
948
949                    tx.execute(
950                        "UPDATE tasks SET deleted_at = ?1, deleted_by = ?2, deleted_reason = ?3, updated_at = ?1 WHERE id = ?4",
951                        params![now, worker_id, reason, task_id],
952                    )?;
953                }
954            }
955
956            tx.commit()?;
957            Ok(())
958        })
959    }
960
961    /// List tasks with optional filters.
962    /// Returns full Task objects. Excludes soft-deleted tasks.
963    pub fn list_tasks(
964        &self,
965        status: Option<&str>,
966        owner: Option<&str>,
967        parent_id: Option<Option<&str>>,
968        limit: Option<i32>,
969        sort_by: Option<&str>,
970        sort_order: Option<&str>,
971    ) -> Result<Vec<Task>> {
972        self.with_conn(|conn| {
973            let mut sql = String::from(
974                "SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL",
975            );
976            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
977
978            if let Some(s) = status {
979                sql.push_str(" AND t.status = ?");
980                params_vec.push(Box::new(s.to_string()));
981            }
982
983            if let Some(o) = owner {
984                sql.push_str(" AND t.worker_id = ?");
985                params_vec.push(Box::new(o.to_string()));
986            }
987
988            // Handle parent filtering via dependencies table
989            if let Some(p) = parent_id {
990                match p {
991                    Some(pid) => {
992                        sql.push_str(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ? AND dep_type = 'contains')");
993                        params_vec.push(Box::new(pid.to_string()));
994                    }
995                    None => {
996                        // Root tasks: not contained by any other task
997                        sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
998                    }
999                }
1000            }
1001
1002            // Build ORDER BY clause
1003            let order_clause = build_order_clause(sort_by, sort_order);
1004            sql.push_str(&format!(" ORDER BY {}", order_clause));
1005
1006            if let Some(l) = limit {
1007                sql.push_str(&format!(" LIMIT {}", l));
1008            }
1009
1010            let params_refs: Vec<&dyn rusqlite::ToSql> =
1011                params_vec.iter().map(|b| b.as_ref()).collect();
1012
1013            let mut stmt = conn.prepare(&sql)?;
1014            let tasks = stmt
1015                .query_map(params_refs.as_slice(), parse_task_row)?
1016                .filter_map(|r| r.ok())
1017                .collect();
1018
1019            Ok(tasks)
1020        })
1021    }
1022
1023    /// Set the current thought for tasks owned by an agent.
1024    pub fn set_thought(
1025        &self,
1026        agent_id: &str,
1027        thought: Option<String>,
1028        task_ids: Option<Vec<String>>,
1029    ) -> Result<i32> {
1030        let now = now_ms();
1031
1032        self.with_conn(|conn| {
1033            let updated = if let Some(ids) = task_ids {
1034                let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
1035                let sql = format!(
1036                    "UPDATE tasks SET current_thought = ?, updated_at = ?
1037                     WHERE worker_id = ? AND id IN ({})",
1038                    placeholders.join(", ")
1039                );
1040
1041                let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1042                params_vec.push(Box::new(thought.clone()));
1043                params_vec.push(Box::new(now));
1044                params_vec.push(Box::new(agent_id.to_string()));
1045                for id in &ids {
1046                    params_vec.push(Box::new(id.clone()));
1047                }
1048
1049                let params_refs: Vec<&dyn rusqlite::ToSql> =
1050                    params_vec.iter().map(|b| b.as_ref()).collect();
1051                conn.execute(&sql, params_refs.as_slice())?
1052            } else {
1053                conn.execute(
1054                    "UPDATE tasks SET current_thought = ?, updated_at = ? WHERE worker_id = ?",
1055                    params![thought, now, agent_id],
1056                )?
1057            };
1058
1059            Ok(updated as i32)
1060        })
1061    }
1062
1063    /// Log time for a task.
1064    pub fn log_time(&self, task_id: &str, duration_ms: i64) -> Result<i64> {
1065        let now = now_ms();
1066
1067        self.with_conn(|conn| {
1068            conn.execute(
1069                "UPDATE tasks SET time_actual_ms = COALESCE(time_actual_ms, 0) + ?1, updated_at = ?2
1070                 WHERE id = ?3",
1071                params![duration_ms, now, task_id],
1072            )?;
1073
1074            let total: i64 = conn.query_row(
1075                "SELECT COALESCE(time_actual_ms, 0) FROM tasks WHERE id = ?1",
1076                params![task_id],
1077                |row| row.get(0),
1078            )?;
1079
1080            Ok(total)
1081        })
1082    }
1083
1084    /// Log metrics and cost for a task.
1085    /// Values in the metrics array are aggregated (added) to existing values.
1086    pub fn log_metrics(
1087        &self,
1088        task_id: &str,
1089        cost_usd: Option<f64>,
1090        values: &[i64],
1091    ) -> Result<Task> {
1092        let now = now_ms();
1093
1094        self.with_conn(|conn| {
1095            let task =
1096                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1097
1098            // Aggregate metrics (add new values to existing)
1099            let mut new_metrics = task.metrics;
1100            for (i, &val) in values.iter().take(8).enumerate() {
1101                new_metrics[i] += val;
1102            }
1103
1104            let new_cost_usd = task.cost_usd + cost_usd.unwrap_or(0.0);
1105
1106            conn.execute(
1107                "UPDATE tasks SET
1108                    metric_0 = ?1, metric_1 = ?2, metric_2 = ?3, metric_3 = ?4,
1109                    metric_4 = ?5, metric_5 = ?6, metric_6 = ?7, metric_7 = ?8,
1110                    cost_usd = ?9, updated_at = ?10
1111                WHERE id = ?11",
1112                params![
1113                    new_metrics[0],
1114                    new_metrics[1],
1115                    new_metrics[2],
1116                    new_metrics[3],
1117                    new_metrics[4],
1118                    new_metrics[5],
1119                    new_metrics[6],
1120                    new_metrics[7],
1121                    new_cost_usd,
1122                    now,
1123                    task_id,
1124                ],
1125            )?;
1126
1127            Ok(Task {
1128                cost_usd: new_cost_usd,
1129                metrics: new_metrics,
1130                updated_at: now,
1131                ..task
1132            })
1133        })
1134    }
1135
1136    /// Claim a task for an agent.
1137    /// Uses the first timed state (typically "in_progress") as the claiming state.
1138    pub fn claim_task(
1139        &self,
1140        task_id: &str,
1141        agent_id: &str,
1142        states_config: &StatesConfig,
1143    ) -> Result<Task> {
1144        let now = now_ms();
1145
1146        // Find the first timed state to use for claiming (typically "in_progress")
1147        let claim_status = states_config
1148            .definitions
1149            .iter()
1150            .find(|(_, def)| def.timed)
1151            .map(|(name, _)| name.as_str())
1152            .unwrap_or("in_progress");
1153
1154        self.with_conn(|conn| {
1155            // Get the task (using internal helper to avoid deadlock)
1156            let task =
1157                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1158
1159            // Check if already claimed
1160            if task.worker_id.is_some() {
1161                return Err(anyhow!("Task is already claimed"));
1162            }
1163
1164            // Validate state transition
1165            if !states_config.is_valid_transition(&task.status, claim_status) {
1166                let exits = states_config.get_exits(&task.status);
1167                return Err(anyhow!(
1168                    "Cannot claim task in state '{}'. Allowed transitions: {:?}",
1169                    task.status,
1170                    exits
1171                ));
1172            }
1173
1174            // Get the agent (using internal helper to avoid deadlock)
1175            let agent =
1176                get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1177
1178            // Check tag affinity - needed_tags (AND - must have ALL)
1179            if !task.needed_tags.is_empty() {
1180                for needed in &task.needed_tags {
1181                    if !agent.tags.contains(needed) {
1182                        return Err(anyhow!("Agent missing required tag: {}", needed));
1183                    }
1184                }
1185            }
1186
1187            // Check tag affinity - wanted_tags (OR - must have AT LEAST ONE)
1188            if !task.wanted_tags.is_empty() {
1189                let has_any = task
1190                    .wanted_tags
1191                    .iter()
1192                    .any(|wanted| agent.tags.contains(wanted));
1193                if !has_any {
1194                    return Err(anyhow!("Agent has none of the wanted tags"));
1195                }
1196            }
1197
1198            conn.execute(
1199                "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = ?4, updated_at = ?5
1200                 WHERE id = ?6",
1201                params![agent_id, now, claim_status, now, now, task_id,],
1202            )?;
1203
1204            // Record state transition (accumulates time if coming from timed state)
1205            record_state_transition(
1206                conn,
1207                task_id,
1208                claim_status,
1209                Some(agent_id),
1210                None,
1211                states_config,
1212            )?;
1213
1214            // Refresh agent heartbeat
1215            conn.execute(
1216                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1217                params![now, agent_id],
1218            )?;
1219
1220            Ok(Task {
1221                worker_id: Some(agent_id.to_string()),
1222                claimed_at: Some(now),
1223                status: claim_status.to_string(),
1224                started_at: Some(now),
1225                updated_at: now,
1226                ..task
1227            })
1228        })
1229    }
1230
1231    /// Release a task claim.
1232    pub fn release_task(
1233        &self,
1234        task_id: &str,
1235        agent_id: &str,
1236        states_config: &StatesConfig,
1237    ) -> Result<()> {
1238        let now = now_ms();
1239        let release_status = &states_config.initial;
1240
1241        self.with_conn(|conn| {
1242            let task =
1243                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1244
1245            if task.worker_id.as_deref() != Some(agent_id) {
1246                return Err(anyhow!("Task is not owned by this agent"));
1247            }
1248
1249            // Record state transition (accumulates time if coming from timed state)
1250            record_state_transition(
1251                conn,
1252                task_id,
1253                release_status,
1254                Some(agent_id),
1255                None,
1256                states_config,
1257            )?;
1258
1259            conn.execute(
1260                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1261                 WHERE id = ?3",
1262                params![release_status, now, task_id],
1263            )?;
1264
1265            Ok(())
1266        })
1267    }
1268
1269    /// Force release a task regardless of owner.
1270    pub fn force_release(&self, task_id: &str, states_config: &StatesConfig) -> Result<()> {
1271        let now = now_ms();
1272        let release_status = &states_config.initial;
1273
1274        self.with_conn(|conn| {
1275            let task =
1276                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1277
1278            // Record state transition (accumulates time if coming from timed state)
1279            record_state_transition(
1280                conn,
1281                task_id,
1282                release_status,
1283                task.worker_id.as_deref(),
1284                None,
1285                states_config,
1286            )?;
1287
1288            conn.execute(
1289                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1290                 WHERE id = ?3",
1291                params![release_status, now, task_id],
1292            )?;
1293
1294            Ok(())
1295        })
1296    }
1297
1298    /// Force claim a task even if owned by another agent.
1299    pub fn force_claim_task(
1300        &self,
1301        task_id: &str,
1302        agent_id: &str,
1303        states_config: &StatesConfig,
1304    ) -> Result<Task> {
1305        let now = now_ms();
1306
1307        // Find the first timed state to use for claiming (typically "in_progress")
1308        let claim_status = states_config
1309            .definitions
1310            .iter()
1311            .find(|(_, def)| def.timed)
1312            .map(|(name, _)| name.as_str())
1313            .unwrap_or("in_progress");
1314
1315        self.with_conn(|conn| {
1316            // Get the task
1317            let task =
1318                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1319
1320            // Get the agent
1321            let agent =
1322                get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1323
1324            // Check tag affinity - needed_tags (AND)
1325            if !task.needed_tags.is_empty() {
1326                for needed in &task.needed_tags {
1327                    if !agent.tags.contains(needed) {
1328                        return Err(anyhow!("Agent missing required tag: {}", needed));
1329                    }
1330                }
1331            }
1332
1333            // Check tag affinity - wanted_tags (OR)
1334            if !task.wanted_tags.is_empty() {
1335                let has_any = task
1336                    .wanted_tags
1337                    .iter()
1338                    .any(|wanted| agent.tags.contains(wanted));
1339                if !has_any {
1340                    return Err(anyhow!("Agent has none of the wanted tags"));
1341                }
1342            }
1343
1344            conn.execute(
1345                "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = COALESCE(started_at, ?4), updated_at = ?5
1346                 WHERE id = ?6",
1347                params![agent_id, now, claim_status, now, now, task_id,],
1348            )?;
1349
1350            // Record state transition (accumulates time if coming from timed state)
1351            record_state_transition(
1352                conn,
1353                task_id,
1354                claim_status,
1355                Some(agent_id),
1356                None,
1357                states_config,
1358            )?;
1359
1360            // Refresh agent heartbeat
1361            conn.execute(
1362                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1363                params![now, agent_id],
1364            )?;
1365
1366            Ok(Task {
1367                worker_id: Some(agent_id.to_string()),
1368                claimed_at: Some(now),
1369                status: claim_status.to_string(),
1370                started_at: task.started_at.or(Some(now)),
1371                updated_at: now,
1372                ..task
1373            })
1374        })
1375    }
1376
1377    /// Release a task claim with a specified state.
1378    pub fn release_task_with_state(
1379        &self,
1380        task_id: &str,
1381        agent_id: &str,
1382        state: &str,
1383        states_config: &StatesConfig,
1384    ) -> Result<()> {
1385        let now = now_ms();
1386
1387        self.with_conn(|conn| {
1388            let task =
1389                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1390
1391            if task.worker_id.as_deref() != Some(agent_id) {
1392                return Err(anyhow!("Task is not owned by this agent"));
1393            }
1394
1395            // Validate state exists
1396            if !states_config.is_valid_state(state) {
1397                return Err(anyhow!(
1398                    "Invalid state '{}'. Valid states: {:?}",
1399                    state,
1400                    states_config.state_names()
1401                ));
1402            }
1403
1404            // Validate transition
1405            if !states_config.is_valid_transition(&task.status, state) {
1406                let exits = states_config.get_exits(&task.status);
1407                return Err(anyhow!(
1408                    "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
1409                    task.status,
1410                    state,
1411                    exits
1412                ));
1413            }
1414
1415            // Set completed_at for terminal states
1416            let completed_at = if states_config.is_terminal_state(state) {
1417                Some(now)
1418            } else {
1419                None
1420            };
1421
1422            // Record state transition (accumulates time if coming from timed state)
1423            record_state_transition(conn, task_id, state, Some(agent_id), None, states_config)?;
1424
1425            conn.execute(
1426                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, completed_at = COALESCE(?2, completed_at), updated_at = ?3
1427                 WHERE id = ?4",
1428                params![state, completed_at, now, task_id],
1429            )?;
1430
1431            Ok(())
1432        })
1433    }
1434
1435    /// Force release stale claims.
1436    pub fn force_release_stale(
1437        &self,
1438        timeout_seconds: i64,
1439        states_config: &StatesConfig,
1440    ) -> Result<i32> {
1441        let now = now_ms();
1442        let cutoff = now - (timeout_seconds * 1000);
1443        let release_status = &states_config.initial;
1444
1445        self.with_conn(|conn| {
1446            let updated = conn.execute(
1447                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1448                 WHERE claimed_at < ?3 AND worker_id IS NOT NULL",
1449                params![release_status, now, cutoff],
1450            )?;
1451
1452            Ok(updated as i32)
1453        })
1454    }
1455
1456    /// Complete a task and release file locks held by the agent.
1457    /// Uses "completed" state by default, which should be a terminal state.
1458    /// Checks that all children (via 'contains' dependencies) are complete.
1459    pub fn complete_task(
1460        &self,
1461        task_id: &str,
1462        agent_id: &str,
1463        states_config: &StatesConfig,
1464    ) -> Result<Task> {
1465        let now = now_ms();
1466
1467        // Find a terminal state to use (prefer "completed" if it exists)
1468        let complete_status = if states_config.definitions.contains_key("completed") {
1469            "completed"
1470        } else {
1471            // Find any terminal state
1472            states_config
1473                .definitions
1474                .iter()
1475                .find(|(_, def)| def.exits.is_empty())
1476                .map(|(name, _)| name.as_str())
1477                .unwrap_or("completed")
1478        };
1479
1480        self.with_conn_mut(|conn| {
1481            let tx = conn.transaction()?;
1482
1483            // Get the task
1484            let mut stmt = tx.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1485            let task = stmt
1486                .query_row(params![task_id], parse_task_row)
1487                .map_err(|_| anyhow!("Task not found"))?;
1488            drop(stmt);
1489
1490            // Verify ownership
1491            if task.worker_id.as_deref() != Some(agent_id) {
1492                return Err(anyhow!("Task is not owned by this agent"));
1493            }
1494
1495            // Check for incomplete children (blocking completion)
1496            let incomplete_children: i32 = tx.query_row(
1497                "SELECT COUNT(*) FROM dependencies d
1498                 INNER JOIN tasks child ON d.to_task_id = child.id
1499                 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
1500                 AND child.status IN (SELECT value FROM json_each(?2))",
1501                params![
1502                    task_id,
1503                    serde_json::to_string(&states_config.blocking_states)?
1504                ],
1505                |row| row.get(0),
1506            )?;
1507
1508            if incomplete_children > 0 {
1509                return Err(anyhow!(
1510                    "Cannot complete task: {} child task(s) are not complete",
1511                    incomplete_children
1512                ));
1513            }
1514
1515            // Validate transition
1516            if !states_config.is_valid_transition(&task.status, complete_status) {
1517                let exits = states_config.get_exits(&task.status);
1518                return Err(anyhow!(
1519                    "Cannot complete task in state '{}'. Allowed transitions: {:?}",
1520                    task.status,
1521                    exits
1522                ));
1523            }
1524
1525            // Record state transition (accumulates time from timed state)
1526            record_state_transition(
1527                &tx,
1528                task_id,
1529                complete_status,
1530                Some(agent_id),
1531                None,
1532                states_config,
1533            )?;
1534
1535            // Update task to completed
1536            tx.execute(
1537                "UPDATE tasks SET status = ?1, completed_at = ?2, updated_at = ?3,
1538                 worker_id = NULL, claimed_at = NULL
1539                 WHERE id = ?4",
1540                params![complete_status, now, now, task_id],
1541            )?;
1542
1543            // Release file locks associated with this task (for auto-cleanup)
1544            tx.execute(
1545                "DELETE FROM file_locks WHERE task_id = ?1",
1546                params![task_id],
1547            )?;
1548
1549            // Refresh agent heartbeat
1550            tx.execute(
1551                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1552                params![now, agent_id],
1553            )?;
1554
1555            tx.commit()?;
1556
1557            Ok(Task {
1558                status: complete_status.to_string(),
1559                completed_at: Some(now),
1560                updated_at: now,
1561                worker_id: None,
1562                claimed_at: None,
1563                ..task
1564            })
1565        })
1566    }
1567
1568    /// Get all tasks. Excludes soft-deleted tasks.
1569    pub fn get_all_tasks(&self) -> Result<Vec<Task>> {
1570        self.with_conn(|conn| {
1571            let mut stmt =
1572                conn.prepare("SELECT * FROM tasks WHERE deleted_at IS NULL ORDER BY created_at")?;
1573            let tasks = stmt
1574                .query_map([], parse_task_row)?
1575                .filter_map(|r| r.ok())
1576                .collect();
1577            Ok(tasks)
1578        })
1579    }
1580
1581    /// Get tasks by status.
1582    #[allow(dead_code)]
1583    pub fn get_tasks_by_status(&self, status: &str) -> Result<Vec<Task>> {
1584        self.with_conn(|conn| {
1585            let mut stmt =
1586                conn.prepare("SELECT * FROM tasks WHERE status = ?1 ORDER BY created_at")?;
1587            let tasks = stmt
1588                .query_map(params![status], parse_task_row)?
1589                .filter_map(|r| r.ok())
1590                .collect();
1591            Ok(tasks)
1592        })
1593    }
1594
1595    /// Get claimed tasks. Excludes soft-deleted tasks.
1596    pub fn get_claimed_tasks(&self, agent_id: Option<&str>) -> Result<Vec<Task>> {
1597        self.with_conn(|conn| {
1598            let tasks = if let Some(aid) = agent_id {
1599                let mut stmt = conn
1600                    .prepare("SELECT * FROM tasks WHERE worker_id = ?1 AND deleted_at IS NULL ORDER BY claimed_at")?;
1601                stmt.query_map(params![aid], parse_task_row)?
1602                    .filter_map(|r| r.ok())
1603                    .collect()
1604            } else {
1605                let mut stmt = conn.prepare(
1606                    "SELECT * FROM tasks WHERE worker_id IS NOT NULL AND deleted_at IS NULL ORDER BY claimed_at",
1607                )?;
1608                stmt.query_map([], parse_task_row)?
1609                    .filter_map(|r| r.ok())
1610                    .collect()
1611            };
1612
1613            Ok(tasks)
1614        })
1615    }
1616}
1617
1618/// Helper function to create task tree recursively within a transaction.
1619/// Creates dependencies from parent to children using child_type.
1620/// Creates dependencies between siblings using sibling_type.
1621/// Supports referencing existing tasks via ref_id.
1622#[allow(clippy::too_many_arguments)]
1623fn create_tree_recursive(
1624    conn: &Connection,
1625    input: &TaskTreeInput,
1626    parent_id: Option<&str>,
1627    prev_sibling_id: Option<&str>,
1628    child_type: Option<&str>,
1629    sibling_type: Option<&str>,
1630    all_ids: &mut Vec<String>,
1631    states_config: &StatesConfig,
1632) -> Result<String> {
1633    // Check if this node references an existing task
1634    let task_id = if let Some(ref ref_id) = input.ref_id {
1635        // Verify the referenced task exists
1636        let exists: bool = conn.query_row(
1637            "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
1638            params![ref_id],
1639            |row| row.get(0),
1640        )?;
1641        if !exists {
1642            return Err(anyhow::anyhow!("Referenced task '{}' not found", ref_id));
1643        }
1644        ref_id.clone()
1645    } else {
1646        // Create a new task
1647        let generated_id = Uuid::now_v7().to_string();
1648        let task_id = input.id.clone().unwrap_or(generated_id);
1649        let now = now_ms();
1650        let priority = clamp_priority(input.priority.unwrap_or(PRIORITY_DEFAULT));
1651        let initial_status = &states_config.initial;
1652
1653        let needed_tags = input.needed_tags.clone().unwrap_or_default();
1654        let wanted_tags = input.wanted_tags.clone().unwrap_or_default();
1655        let tags = input.tags.clone().unwrap_or_default();
1656        let needed_tags_json = serde_json::to_string(&needed_tags)?;
1657        let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
1658        let tags_json = serde_json::to_string(&tags)?;
1659
1660        conn.execute(
1661            "INSERT INTO tasks (
1662                id, title, description, status, priority,
1663                needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
1664            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
1665            params![
1666                &task_id,
1667                &input.title,
1668                &input.description,
1669                initial_status,
1670                priority.to_string(),
1671                needed_tags_json,
1672                wanted_tags_json,
1673                tags_json,
1674                input.points,
1675                input.time_estimate_ms,
1676                now,
1677                now,
1678            ],
1679        )?;
1680
1681        // Record initial state transition
1682        record_state_transition(conn, &task_id, initial_status, None, None, states_config)?;
1683
1684        // Sync tags to junction tables for indexed lookups
1685        sync_task_tags(conn, &task_id, &tags)?;
1686        sync_needed_tags(conn, &task_id, &needed_tags)?;
1687        sync_wanted_tags(conn, &task_id, &wanted_tags)?;
1688
1689        task_id
1690    };
1691
1692    // Create dependency from parent if child_type is specified
1693    if let (Some(pid), Some(ct)) = (parent_id, child_type) {
1694        Database::add_dependency_internal(conn, pid, &task_id, ct)?;
1695    }
1696
1697    // Create dependency from previous sibling if sibling_type is specified
1698    if let (Some(prev_id), Some(st)) = (prev_sibling_id, sibling_type) {
1699        Database::add_dependency_internal(conn, prev_id, &task_id, st)?;
1700    }
1701
1702    all_ids.push(task_id.clone());
1703
1704    // Create children with dependencies based on child_type and sibling_type
1705    let mut prev_child_id: Option<String> = None;
1706    for child in input.children.iter() {
1707        let child_id = create_tree_recursive(
1708            conn,
1709            child,
1710            Some(&task_id),
1711            prev_child_id.as_deref(),
1712            child_type,
1713            sibling_type,
1714            all_ids,
1715            states_config,
1716        )?;
1717        prev_child_id = Some(child_id);
1718    }
1719
1720    Ok(task_id)
1721}