Skip to main content

task_graph_mcp/db/
tasks.rs

1//! Task CRUD and tree operations.
2
3use super::state_transitions::record_state_transition;
4use super::{now_ms, Database};
5use crate::config::{AutoAdvanceConfig, DependenciesConfig, StatesConfig};
6use crate::types::{clamp_priority, parse_priority, Priority, Task, TaskTree, TaskTreeInput, Worker, PRIORITY_DEFAULT};
7use anyhow::{anyhow, Result};
8use rusqlite::{params, Connection, Row};
9use uuid::Uuid;
10
/// Translate optional `sort_by` / `sort_order` request values into the body of
/// an SQL `ORDER BY` clause (without the `ORDER BY` keyword itself).
///
/// Only a fixed set of column expressions and directions is ever emitted, so
/// the returned string is safe to splice into a query even when the inputs are
/// caller-supplied. Missing or unrecognized values fall back to
/// `t.created_at` and `DESC`.
fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
    // Priority is stored as text, so it is cast to sort numerically.
    let column = match sort_by.unwrap_or("created_at") {
        "priority" => "CAST(t.priority AS INTEGER)",
        "updated_at" => "t.updated_at",
        _ => "t.created_at",
    };

    // Descending is the default for every column; only an exact "asc" flips it.
    let direction = if sort_order == Some("asc") {
        "ASC"
    } else {
        "DESC"
    };

    [column, direction].join(" ")
}
32
33// =============================================================================
34// Junction table helpers for tag management
35// =============================================================================
36
37/// Sync task tags to the task_tags junction table.
38/// Replaces all existing tags for the task.
39fn sync_task_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
40    conn.execute("DELETE FROM task_tags WHERE task_id = ?1", params![task_id])?;
41    for tag in tags {
42        conn.execute(
43            "INSERT INTO task_tags (task_id, tag) VALUES (?1, ?2)",
44            params![task_id, tag],
45        )?;
46    }
47    Ok(())
48}
49
50/// Sync needed tags (agent must have ALL) to the task_needed_tags junction table.
51fn sync_needed_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
52    conn.execute("DELETE FROM task_needed_tags WHERE task_id = ?1", params![task_id])?;
53    for tag in tags {
54        conn.execute(
55            "INSERT INTO task_needed_tags (task_id, tag) VALUES (?1, ?2)",
56            params![task_id, tag],
57        )?;
58    }
59    Ok(())
60}
61
62/// Sync wanted tags (agent must have ANY) to the task_wanted_tags junction table.
63fn sync_wanted_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
64    conn.execute("DELETE FROM task_wanted_tags WHERE task_id = ?1", params![task_id])?;
65    for tag in tags {
66        conn.execute(
67            "INSERT INTO task_wanted_tags (task_id, tag) VALUES (?1, ?2)",
68            params![task_id, tag],
69        )?;
70    }
71    Ok(())
72}
73
74pub fn parse_task_row(row: &Row) -> rusqlite::Result<Task> {
75    let id: String = row.get("id")?;
76    let title: String = row.get("title")?;
77    let description: Option<String> = row.get("description")?;
78    let status: String = row.get("status")?;
79    let priority: String = row.get("priority")?;
80    let worker_id: Option<String> = row.get("worker_id")?;
81    let claimed_at: Option<i64> = row.get("claimed_at")?;
82
83    let needed_tags_json: Option<String> = row.get("needed_tags")?;
84    let wanted_tags_json: Option<String> = row.get("wanted_tags")?;
85    let tags_json: Option<String> = row.get("tags")?;
86
87    let points: Option<i32> = row.get("points")?;
88    let time_estimate_ms: Option<i64> = row.get("time_estimate_ms")?;
89    let time_actual_ms: Option<i64> = row.get("time_actual_ms")?;
90    let started_at: Option<i64> = row.get("started_at")?;
91    let completed_at: Option<i64> = row.get("completed_at")?;
92
93    let current_thought: Option<String> = row.get("current_thought")?;
94
95    let cost_usd: f64 = row.get("cost_usd")?;
96    let metric_0: i64 = row.get("metric_0")?;
97    let metric_1: i64 = row.get("metric_1")?;
98    let metric_2: i64 = row.get("metric_2")?;
99    let metric_3: i64 = row.get("metric_3")?;
100    let metric_4: i64 = row.get("metric_4")?;
101    let metric_5: i64 = row.get("metric_5")?;
102    let metric_6: i64 = row.get("metric_6")?;
103    let metric_7: i64 = row.get("metric_7")?;
104
105    let created_at: i64 = row.get("created_at")?;
106    let updated_at: i64 = row.get("updated_at")?;
107
108    Ok(Task {
109        id,
110        title,
111        description,
112        status,
113        priority: parse_priority(&priority),
114        worker_id,
115        claimed_at,
116        needed_tags: needed_tags_json
117            .map(|s| serde_json::from_str(&s).unwrap_or_default())
118            .unwrap_or_default(),
119        wanted_tags: wanted_tags_json
120            .map(|s| serde_json::from_str(&s).unwrap_or_default())
121            .unwrap_or_default(),
122        tags: tags_json
123            .map(|s| serde_json::from_str(&s).unwrap_or_default())
124            .unwrap_or_default(),
125        points,
126        time_estimate_ms,
127        time_actual_ms,
128        started_at,
129        completed_at,
130        current_thought,
131        cost_usd,
132        metrics: [metric_0, metric_1, metric_2, metric_3, metric_4, metric_5, metric_6, metric_7],
133        created_at,
134        updated_at,
135    })
136}
137
138/// Internal helper to get a task using an existing connection (avoids deadlock).
139fn get_task_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
140    let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
141
142    let result = stmt.query_row(params![task_id], parse_task_row);
143
144    match result {
145        Ok(task) => Ok(Some(task)),
146        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
147        Err(e) => Err(e.into()),
148    }
149}
150
151/// Internal helper to get a worker using an existing connection (avoids deadlock).
152fn get_worker_internal(conn: &Connection, worker_id: &str) -> Result<Option<Worker>> {
153    let mut stmt = conn.prepare(
154        "SELECT id, tags, max_claims, registered_at, last_heartbeat
155         FROM workers WHERE id = ?1",
156    )?;
157
158    let result = stmt.query_row(params![worker_id], |row| {
159        let id: String = row.get(0)?;
160        let tags_json: String = row.get(1)?;
161        let max_claims: i32 = row.get(2)?;
162        let registered_at: i64 = row.get(3)?;
163        let last_heartbeat: i64 = row.get(4)?;
164
165        Ok((id, tags_json, max_claims, registered_at, last_heartbeat))
166    });
167
168    match result {
169        Ok((id, tags_json, max_claims, registered_at, last_heartbeat)) => {
170            let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
171            Ok(Some(Worker {
172                id,
173                tags,
174                max_claims,
175                registered_at,
176                last_heartbeat,
177            }))
178        }
179        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
180        Err(e) => Err(e.into()),
181    }
182}
183
184impl Database {
185    /// Create a new task.
186    /// If id is provided, uses it as the task ID; otherwise generates UUID7.
187    /// If parent_id is provided, creates a 'contains' dependency from parent to this task.
188    pub fn create_task(
189        &self,
190        id: Option<String>,
191        description: String,
192        parent_id: Option<String>,
193        priority: Option<Priority>,
194        points: Option<i32>,
195        time_estimate_ms: Option<i64>,
196        agent_tags_all: Option<Vec<String>>,
197        agent_tags_any: Option<Vec<String>>,
198        tags: Option<Vec<String>>,
199        states_config: &StatesConfig,
200    ) -> Result<Task> {
201        let task_id = id.unwrap_or_else(|| Uuid::now_v7().to_string());
202        let now = now_ms();
203        let priority = clamp_priority(priority.unwrap_or(PRIORITY_DEFAULT));
204        let initial_status = &states_config.initial;
205
206        let needed_tags = agent_tags_all.unwrap_or_default();
207        let wanted_tags = agent_tags_any.unwrap_or_default();
208        let tags = tags.unwrap_or_default();
209        let needed_tags_json = serde_json::to_string(&needed_tags)?;
210        let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
211        let tags_json = serde_json::to_string(&tags)?;
212
213        self.with_conn_mut(|conn| {
214            let tx = conn.transaction()?;
215
216            tx.execute(
217                "INSERT INTO tasks (
218                    id, title, description, status, priority,
219                    needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
220                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
221                params![
222                    &task_id,
223                    &description,  // Use description as title
224                    &description,  // Also store as description
225                    initial_status,
226                    priority.to_string(),
227                    needed_tags_json,
228                    wanted_tags_json,
229                    tags_json,
230                    points,
231                    time_estimate_ms,
232                    now,
233                    now,
234                ],
235            )?;
236
237            // Sync tags to junction tables
238            sync_task_tags(&tx, &task_id, &tags)?;
239            sync_needed_tags(&tx, &task_id, &needed_tags)?;
240            sync_wanted_tags(&tx, &task_id, &wanted_tags)?;
241
242            // Create 'contains' dependency if parent_id is provided
243            if let Some(ref pid) = parent_id {
244                Database::add_dependency_internal(&tx, pid, &task_id, "contains")?;
245            }
246
247            // Record initial state
248            record_state_transition(&tx, &task_id, initial_status, None, None, states_config)?;
249
250            tx.commit()?;
251
252            Ok(Task {
253                id: task_id,
254                title: description.clone(),
255                description: Some(description),
256                status: initial_status.clone(),
257                priority,
258                worker_id: None,
259                claimed_at: None,
260                needed_tags,
261                wanted_tags,
262                tags,
263                points,
264                time_estimate_ms,
265                time_actual_ms: None,
266                started_at: None,
267                completed_at: None,
268                current_thought: None,
269                cost_usd: 0.0,
270                metrics: [0; 8],
271                created_at: now,
272                updated_at: now,
273            })
274        })
275    }
276
277    /// Create a task tree from nested input.
278    /// Uses child_type for parent-child dependencies (default: "contains").
279    /// Uses sibling_type for sibling dependencies (default: none/parallel).
280    pub fn create_task_tree(
281        &self,
282        input: TaskTreeInput,
283        parent_id: Option<String>,
284        child_type: Option<String>,
285        sibling_type: Option<String>,
286        states_config: &StatesConfig,
287    ) -> Result<(String, Vec<String>)> {
288        let mut all_ids = Vec::new();
289        // Default child_type to "contains" if not specified
290        let child_type = child_type.or_else(|| Some("contains".to_string()));
291
292        self.with_conn_mut(|conn| {
293            let tx = conn.transaction()?;
294            let root_id = create_tree_recursive(
295                &tx,
296                &input,
297                parent_id.as_deref(),
298                None, // no previous sibling for root
299                child_type.as_deref(),
300                sibling_type.as_deref(),
301                &mut all_ids,
302                states_config,
303            )?;
304            tx.commit()?;
305            Ok((root_id, all_ids))
306        })
307    }
308
309    /// Get a task by ID.
310    pub fn get_task(&self, task_id: &str) -> Result<Option<Task>> {
311        self.with_conn(|conn| {
312            let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
313
314            let result = stmt.query_row(params![task_id], parse_task_row);
315
316            match result {
317                Ok(task) => Ok(Some(task)),
318                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
319                Err(e) => Err(e.into()),
320            }
321        })
322    }
323
324    /// Get a task with all its children (tree).
325    pub fn get_task_tree(&self, task_id: &str) -> Result<Option<TaskTree>> {
326        let task = self.get_task(task_id)?;
327        match task {
328            None => Ok(None),
329            Some(task) => {
330                let children = self.get_children_recursive(&task.id)?;
331                Ok(Some(TaskTree { task, children }))
332            }
333        }
334    }
335
336    /// Get children recursively.
337    fn get_children_recursive(&self, parent_id: &str) -> Result<Vec<TaskTree>> {
338        let children = self.get_children(parent_id)?;
339        let mut result = Vec::new();
340
341        for child in children {
342            let child_children = self.get_children_recursive(&child.id)?;
343            result.push(TaskTree {
344                task: child,
345                children: child_children,
346            });
347        }
348
349        Ok(result)
350    }
351
352    /// Get direct children of a task (via 'contains' dependency).
353    pub fn get_children(&self, parent_id: &str) -> Result<Vec<Task>> {
354        self.with_conn(|conn| {
355            let mut stmt = conn.prepare(
356                "SELECT t.* FROM tasks t
357                 INNER JOIN dependencies d ON t.id = d.to_task_id
358                 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
359                 ORDER BY t.created_at",
360            )?;
361
362            let tasks = stmt
363                .query_map(params![parent_id], parse_task_row)?
364                .filter_map(|r| r.ok())
365                .collect();
366
367            Ok(tasks)
368        })
369    }
370
371    /// Update a task.
372    pub fn update_task(
373        &self,
374        task_id: &str,
375        title: Option<String>,
376        description: Option<Option<String>>,
377        status: Option<String>,
378        priority: Option<Priority>,
379        points: Option<Option<i32>>,
380        tags: Option<Vec<String>>,
381        states_config: &StatesConfig,
382    ) -> Result<Task> {
383        let now = now_ms();
384
385        self.with_conn(|conn| {
386            let task =
387                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
388
389            let new_title = title.unwrap_or(task.title.clone());
390            let new_description = description.unwrap_or(task.description.clone());
391            let new_status = status.unwrap_or(task.status.clone());
392            let new_priority = priority.unwrap_or(task.priority);
393            let new_points = points.unwrap_or(task.points);
394            let new_tags = tags.unwrap_or(task.tags.clone());
395
396            // Validate the new status exists
397            if !states_config.is_valid_state(&new_status) {
398                return Err(anyhow!(
399                    "Invalid state '{}'. Valid states: {:?}",
400                    new_status,
401                    states_config.state_names()
402                ));
403            }
404
405            // Validate state transition if status changed
406            if task.status != new_status {
407                if !states_config.is_valid_transition(&task.status, &new_status) {
408                    let exits = states_config.get_exits(&task.status);
409                    return Err(anyhow!(
410                        "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
411                        task.status,
412                        new_status,
413                        exits
414                    ));
415                }
416            }
417
418            // Handle status transitions for timestamps
419            // Set started_at when first entering a timed state
420            let started_at =
421                if task.started_at.is_none() && states_config.is_timed_state(&new_status) {
422                    Some(now)
423                } else {
424                    task.started_at
425                };
426
427            // Set completed_at when entering a terminal state
428            let completed_at = if states_config.is_terminal_state(&new_status) {
429                Some(now)
430            } else {
431                task.completed_at
432            };
433
434            // Record state transition if status changed (handles time accumulation)
435            if task.status != new_status {
436                record_state_transition(
437                    conn,
438                    task_id,
439                    &new_status,
440                    task.worker_id.as_deref(),
441                    None,
442                    states_config,
443                )?;
444            }
445
446            conn.execute(
447                "UPDATE tasks SET
448                    title = ?1, description = ?2, status = ?3, priority = ?4,
449                    points = ?5, started_at = ?6, completed_at = ?7, updated_at = ?8,
450                    tags = ?9
451                WHERE id = ?10",
452                params![
453                    new_title,
454                    new_description,
455                    new_status,
456                    new_priority.to_string(),
457                    new_points,
458                    started_at,
459                    completed_at,
460                    now,
461                    serde_json::to_string(&new_tags)?,
462                    task_id,
463                ],
464            )?;
465
466            Ok(Task {
467                id: task_id.to_string(),
468                title: new_title,
469                description: new_description,
470                status: new_status,
471                priority: new_priority,
472                points: new_points,
473                tags: new_tags,
474                started_at,
475                completed_at,
476                updated_at: now,
477                ..task
478            })
479        })
480    }
481
482
483    /// Update a task with unified claim/release logic.
484    /// - Transition to timed state = CLAIM (set owner, validate tags, check limit)
485    /// - Transition from timed to non-timed = RELEASE (clear owner)
486    /// - Transition to terminal = COMPLETE (check children, release file locks)
487    /// - With assignee = ASSIGN (set owner to assignee, transition to 'assigned' state)
488    /// - Only the owner can update a claimed task (unless force=true)
489    ///
490    /// Returns (task, unblocked, auto_advanced):
491    /// - task: The updated task
492    /// - unblocked: Task IDs that are now ready (all dependencies satisfied)
493    /// - auto_advanced: Subset of unblocked that were actually transitioned
494    #[allow(clippy::too_many_arguments)]
495    pub fn update_task_unified(
496        &self,
497        task_id: &str,
498        agent_id: &str,
499        assignee: Option<&str>,
500        title: Option<String>,
501        description: Option<Option<String>>,
502        status: Option<String>,
503        priority: Option<Priority>,
504        points: Option<Option<i32>>,
505        tags: Option<Vec<String>>,
506        needed_tags: Option<Vec<String>>,
507        wanted_tags: Option<Vec<String>>,
508        time_estimate_ms: Option<i64>,
509        reason: Option<String>,
510        force: bool,
511        states_config: &StatesConfig,
512        deps_config: &DependenciesConfig,
513        auto_advance: &AutoAdvanceConfig,
514    ) -> Result<(Task, Vec<String>, Vec<String>)> {
515        let now = now_ms();
516
517        self.with_conn_mut(|conn| {
518            let tx = conn.transaction()?;
519
520            let task =
521                get_task_internal(&tx, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
522
523            // Owner-only validation: if task is claimed, only owner can update (unless force)
524            if let Some(ref current_owner) = task.worker_id {
525                if current_owner != agent_id && !force {
526                    return Err(anyhow!(
527                        "Task is claimed by agent '{}'. Only the owner can update claimed tasks (use force=true to override)",
528                        current_owner
529                    ));
530                }
531            }
532
533            let new_title = title.unwrap_or(task.title.clone());
534            let new_description = description.unwrap_or(task.description.clone());
535            // If assignee is set but no explicit status, default to 'assigned' state
536            let new_status = if assignee.is_some() && status.is_none() {
537                "assigned".to_string()
538            } else {
539                status.unwrap_or(task.status.clone())
540            };
541            let new_priority = priority.unwrap_or(task.priority);
542            let new_points = points.unwrap_or(task.points);
543            let new_tags = tags.unwrap_or(task.tags.clone());
544            let new_needed_tags = needed_tags.unwrap_or(task.needed_tags.clone());
545            let new_wanted_tags = wanted_tags.unwrap_or(task.wanted_tags.clone());
546            let new_time_estimate_ms = time_estimate_ms.or(task.time_estimate_ms);
547
548            // Validate the new status exists
549            if !states_config.is_valid_state(&new_status) {
550                return Err(anyhow!(
551                    "Invalid state '{}'. Valid states: {:?}",
552                    new_status,
553                    states_config.state_names()
554                ));
555            }
556
557            // Validate state transition if status changed
558            if task.status != new_status {
559                if !states_config.is_valid_transition(&task.status, &new_status) {
560                    let exits = states_config.get_exits(&task.status);
561                    return Err(anyhow!(
562                        "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
563                        task.status,
564                        new_status,
565                        exits
566                    ));
567                }
568            }
569
570            // Determine ownership changes based on state transition
571            let new_is_timed = states_config.is_timed_state(&new_status);
572            let new_is_terminal = states_config.is_terminal_state(&new_status);
573            let current_owner = task.worker_id.as_deref();
574            let is_owned_by_agent = current_owner == Some(agent_id);
575            let is_owned_by_other = current_owner.is_some() && !is_owned_by_agent;
576
577            let mut new_owner: Option<String> = task.worker_id.clone();
578            let mut new_claimed_at: Option<i64> = task.claimed_at;
579
580            // ASSIGN: Push coordination - coordinator assigns task to another agent
581            // Sets owner without starting the timer (assigned state is untimed)
582            if let Some(target_agent) = assignee {
583                // Verify task is not already claimed (unless force)
584                if is_owned_by_other && !force {
585                    return Err(anyhow!(
586                        "Task is already claimed by agent '{}'. Use force=true to reassign.",
587                        current_owner.unwrap()
588                    ));
589                }
590
591                // Verify the assignee exists
592                let target = get_worker_internal(&tx, target_agent)?
593                    .ok_or_else(|| anyhow!("Assignee agent '{}' not found", target_agent))?;
594
595                // Check tag affinity for the assignee
596                if !task.needed_tags.is_empty() {
597                    for needed in &task.needed_tags {
598                        if !target.tags.contains(needed) {
599                            return Err(anyhow!(
600                                "Assignee '{}' missing required tag: {}",
601                                target_agent,
602                                needed
603                            ));
604                        }
605                    }
606                }
607
608                if !task.wanted_tags.is_empty() {
609                    let has_any = task
610                        .wanted_tags
611                        .iter()
612                        .any(|wanted| target.tags.contains(wanted));
613                    if !has_any {
614                        return Err(anyhow!(
615                            "Assignee '{}' has none of the wanted tags: {:?}",
616                            target_agent,
617                            task.wanted_tags
618                        ));
619                    }
620                }
621
622                // Set ownership to the assignee
623                new_owner = Some(target_agent.to_string());
624                new_claimed_at = Some(now);
625            }
626
627            // CLAIM: Transitioning to a timed state and need to take ownership
628            // This handles: non-timed -> timed, OR timed (other owner) -> timed (force claim)
629            if new_is_timed && !is_owned_by_agent {
630                // Already claimed by someone else?
631                if is_owned_by_other && !force {
632                    return Err(anyhow!(
633                        "Task is already claimed by agent '{}'",
634                        current_owner.unwrap()
635                    ));
636                }
637
638                // Check for unsatisfied blocking dependencies (skip if force)
639                if !force {
640                    let unsatisfied_blockers = super::deps::get_unsatisfied_start_blockers_in_tx(
641                        &tx,
642                        task_id,
643                        states_config,
644                        deps_config,
645                    )?;
646                    if !unsatisfied_blockers.is_empty() {
647                        return Err(anyhow!(
648                            "Task has unsatisfied dependencies: [{}]",
649                            unsatisfied_blockers.join(", ")
650                        ));
651                    }
652                }
653
654                // Get the agent
655                let agent = get_worker_internal(&tx, agent_id)?
656                    .ok_or_else(|| anyhow!("Agent not found"))?;
657
658                // Check tag affinity - needed_tags (AND - must have ALL)
659                if !task.needed_tags.is_empty() {
660                    for needed in &task.needed_tags {
661                        if !agent.tags.contains(needed) {
662                            return Err(anyhow!("Agent missing required tag: {}", needed));
663                        }
664                    }
665                }
666
667                // Check tag affinity - wanted_tags (OR - must have AT LEAST ONE)
668                if !task.wanted_tags.is_empty() {
669                    let has_any = task
670                        .wanted_tags
671                        .iter()
672                        .any(|wanted| agent.tags.contains(wanted));
673                    if !has_any {
674                        return Err(anyhow!("Agent has none of the wanted tags"));
675                    }
676                }
677
678                // Set ownership
679                new_owner = Some(agent_id.to_string());
680                new_claimed_at = Some(now);
681
682                // Refresh agent heartbeat
683                tx.execute(
684                    "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
685                    params![now, agent_id],
686                )?;
687            }
688
689            // RELEASE: Transitioning to non-timed state (but not terminal)
690            if !new_is_timed && !new_is_terminal && task.worker_id.is_some() {
691                // Verify ownership (unless force)
692                if is_owned_by_other && !force {
693                    return Err(anyhow!("Task is not owned by this agent"));
694                }
695
696                // Clear ownership
697                new_owner = None;
698                new_claimed_at = None;
699            }
700
701            // COMPLETE: Transition to terminal state
702            if new_is_terminal {
703                // Verify ownership if task was claimed (unless force)
704                if let Some(ref current_owner) = task.worker_id {
705                    if current_owner != agent_id && !force {
706                        return Err(anyhow!("Task is not owned by this agent"));
707                    }
708                }
709
710                // Check for incomplete children (via 'contains' dependencies)
711                let incomplete_children: i32 = tx.query_row(
712                    "SELECT COUNT(*) FROM dependencies d
713                     INNER JOIN tasks child ON d.to_task_id = child.id
714                     WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
715                     AND child.status IN (SELECT value FROM json_each(?2))",
716                    params![
717                        task_id,
718                        serde_json::to_string(&states_config.blocking_states)?
719                    ],
720                    |row| row.get(0),
721                )?;
722
723                if incomplete_children > 0 {
724                    return Err(anyhow!(
725                        "Cannot complete task: {} child task(s) are not complete",
726                        incomplete_children
727                    ));
728                }
729
730                // Clear ownership
731                new_owner = None;
732                new_claimed_at = None;
733
734                // Release file locks associated with this task (for auto-cleanup)
735                tx.execute(
736                    "DELETE FROM file_locks WHERE task_id = ?1",
737                    params![task_id],
738                )?;
739            }
740
741            // Handle timestamps
742            let started_at =
743                if task.started_at.is_none() && new_is_timed {
744                    Some(now)
745                } else {
746                    task.started_at
747                };
748
749            let completed_at = if new_is_terminal {
750                Some(now)
751            } else {
752                task.completed_at
753            };
754
755            // Record state transition if status changed (with reason for audit)
756            let status_changed = task.status != new_status;
757            if status_changed {
758                record_state_transition(
759                    &tx,
760                    task_id,
761                    &new_status,
762                    new_owner.as_deref(),
763                    reason.as_deref(),
764                    states_config,
765                )?;
766            }
767
768            tx.execute(
769                "UPDATE tasks SET
770                    title = ?1, description = ?2, status = ?3, priority = ?4,
771                    points = ?5, started_at = ?6, completed_at = ?7, updated_at = ?8,
772                    tags = ?9, worker_id = ?10, claimed_at = ?11,
773                    needed_tags = ?12, wanted_tags = ?13, time_estimate_ms = ?14
774                WHERE id = ?15",
775                params![
776                    new_title,
777                    new_description,
778                    new_status,
779                    new_priority.to_string(),
780                    new_points,
781                    started_at,
782                    completed_at,
783                    now,
784                    serde_json::to_string(&new_tags)?,
785                    new_owner,
786                    new_claimed_at,
787                    serde_json::to_string(&new_needed_tags)?,
788                    serde_json::to_string(&new_wanted_tags)?,
789                    new_time_estimate_ms,
790                    task_id,
791                ],
792            )?;
793
794            // Sync tags to junction tables if changed
795            if new_tags != task.tags {
796                sync_task_tags(&tx, task_id, &new_tags)?;
797            }
798            if new_needed_tags != task.needed_tags {
799                sync_needed_tags(&tx, task_id, &new_needed_tags)?;
800            }
801            if new_wanted_tags != task.wanted_tags {
802                sync_wanted_tags(&tx, task_id, &new_wanted_tags)?;
803            }
804
805            // Check for unblocked tasks if this task transitioned FROM blocking TO non-blocking
806            let (unblocked, auto_advanced) = if status_changed {
807                let was_blocking = states_config.is_blocking_state(&task.status);
808                let is_blocking = states_config.is_blocking_state(&new_status);
809                
810                if was_blocking && !is_blocking {
811                    super::deps::propagate_unblock_effects(
812                        &tx,
813                        task_id,
814                        Some(agent_id),
815                        states_config,
816                        deps_config,
817                        auto_advance,
818                    )?
819                } else {
820                    (vec![], vec![])
821                }
822            } else {
823                (vec![], vec![])
824            };
825
826            tx.commit()?;
827
828            Ok((Task {
829                id: task_id.to_string(),
830                title: new_title,
831                description: new_description,
832                status: new_status,
833                priority: new_priority,
834                points: new_points,
835                tags: new_tags,
836                needed_tags: new_needed_tags,
837                wanted_tags: new_wanted_tags,
838                time_estimate_ms: new_time_estimate_ms,
839                started_at,
840                completed_at,
841                updated_at: now,
842                worker_id: new_owner,
843                claimed_at: new_claimed_at,
844                ..task
845            }, unblocked, auto_advanced))
846        })
847    }
848
849    /// Delete a task (soft delete by default, hard delete with obliterate=true).
850    ///
851    /// - `worker_id`: The worker attempting to delete (required for ownership check)
852    /// - `cascade`: Whether to delete children (default: false)
853    /// - `reason`: Optional reason for deletion
854    /// - `obliterate`: If true, permanently deletes the task; if false (default), soft deletes
855    /// - `force`: If true, allows deletion even if owned by another worker
856    pub fn delete_task(
857        &self,
858        task_id: &str,
859        worker_id: &str,
860        cascade: bool,
861        reason: Option<String>,
862        obliterate: bool,
863        force: bool,
864    ) -> Result<()> {
865        let now = now_ms();
866
867        self.with_conn_mut(|conn| {
868            let tx = conn.transaction()?;
869
870            // Get the task to check ownership
871            let task = get_task_internal(&tx, task_id)?
872                .ok_or_else(|| anyhow!("Task not found"))?;
873
874            // Check ownership - reject if claimed by another worker (unless force)
875            if let Some(ref owner) = task.worker_id {
876                if owner != worker_id && !force {
877                    return Err(anyhow!(
878                        "Task is claimed by worker '{}'. Use force=true to override.",
879                        owner
880                    ));
881                }
882            }
883
884            if obliterate {
885                // Hard delete - permanently remove from database
886                if cascade {
887                    // Find all descendants using recursive CTE and delete them
888                    // The CTE finds all tasks reachable via 'contains' dependencies
889                    tx.execute(
890                        "WITH RECURSIVE descendants AS (
891                            SELECT ?1 AS id
892                            UNION ALL
893                            SELECT dep.to_task_id FROM dependencies dep
894                            INNER JOIN descendants d ON dep.from_task_id = d.id
895                            WHERE dep.dep_type = 'contains'
896                        )
897                        DELETE FROM tasks WHERE id IN (SELECT id FROM descendants)",
898                        params![task_id],
899                    )?;
900                } else {
901                    // Check for children via dependencies
902                    let child_count: i32 = tx.query_row(
903                        "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
904                        params![task_id],
905                        |row| row.get(0),
906                    )?;
907
908                    if child_count > 0 {
909                        return Err(anyhow!("Task has children; use cascade=true to delete"));
910                    }
911
912                    tx.execute("DELETE FROM tasks WHERE id = ?1", params![task_id])?;
913                }
914            } else {
915                // Soft delete - set deleted_at, deleted_by, deleted_reason
916                if cascade {
917                    // Soft delete all descendants
918                    tx.execute(
919                        "WITH RECURSIVE descendants AS (
920                            SELECT ?1 AS id
921                            UNION ALL
922                            SELECT dep.to_task_id FROM dependencies dep
923                            INNER JOIN descendants d ON dep.from_task_id = d.id
924                            WHERE dep.dep_type = 'contains'
925                        )
926                        UPDATE tasks SET deleted_at = ?2, deleted_by = ?3, deleted_reason = ?4, updated_at = ?2
927                        WHERE id IN (SELECT id FROM descendants) AND deleted_at IS NULL",
928                        params![task_id, now, worker_id, reason],
929                    )?;
930                } else {
931                    // Check for children via dependencies
932                    let child_count: i32 = tx.query_row(
933                        "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
934                        params![task_id],
935                        |row| row.get(0),
936                    )?;
937
938                    if child_count > 0 {
939                        return Err(anyhow!("Task has children; use cascade=true to delete"));
940                    }
941
942                    tx.execute(
943                        "UPDATE tasks SET deleted_at = ?1, deleted_by = ?2, deleted_reason = ?3, updated_at = ?1 WHERE id = ?4",
944                        params![now, worker_id, reason, task_id],
945                    )?;
946                }
947            }
948
949            tx.commit()?;
950            Ok(())
951        })
952    }
953
954    /// List tasks with optional filters.
955    /// Returns full Task objects. Excludes soft-deleted tasks.
956    pub fn list_tasks(
957        &self,
958        status: Option<&str>,
959        owner: Option<&str>,
960        parent_id: Option<Option<&str>>,
961        limit: Option<i32>,
962        sort_by: Option<&str>,
963        sort_order: Option<&str>,
964    ) -> Result<Vec<Task>> {
965        self.with_conn(|conn| {
966            let mut sql = String::from(
967                "SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL",
968            );
969            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
970
971            if let Some(s) = status {
972                sql.push_str(" AND t.status = ?");
973                params_vec.push(Box::new(s.to_string()));
974            }
975
976            if let Some(o) = owner {
977                sql.push_str(" AND t.worker_id = ?");
978                params_vec.push(Box::new(o.to_string()));
979            }
980
981            // Handle parent filtering via dependencies table
982            if let Some(p) = parent_id {
983                match p {
984                    Some(pid) => {
985                        sql.push_str(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ? AND dep_type = 'contains')");
986                        params_vec.push(Box::new(pid.to_string()));
987                    }
988                    None => {
989                        // Root tasks: not contained by any other task
990                        sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
991                    }
992                }
993            }
994
995            // Build ORDER BY clause
996            let order_clause = build_order_clause(sort_by, sort_order);
997            sql.push_str(&format!(" ORDER BY {}", order_clause));
998
999            if let Some(l) = limit {
1000                sql.push_str(&format!(" LIMIT {}", l));
1001            }
1002
1003            let params_refs: Vec<&dyn rusqlite::ToSql> =
1004                params_vec.iter().map(|b| b.as_ref()).collect();
1005
1006            let mut stmt = conn.prepare(&sql)?;
1007            let tasks = stmt
1008                .query_map(params_refs.as_slice(), parse_task_row)?
1009                .filter_map(|r| r.ok())
1010                .collect();
1011
1012            Ok(tasks)
1013        })
1014    }
1015
1016    /// Set the current thought for tasks owned by an agent.
1017    pub fn set_thought(
1018        &self,
1019        agent_id: &str,
1020        thought: Option<String>,
1021        task_ids: Option<Vec<String>>,
1022    ) -> Result<i32> {
1023        let now = now_ms();
1024
1025        self.with_conn(|conn| {
1026            let updated = if let Some(ids) = task_ids {
1027                let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
1028                let sql = format!(
1029                    "UPDATE tasks SET current_thought = ?, updated_at = ?
1030                     WHERE worker_id = ? AND id IN ({})",
1031                    placeholders.join(", ")
1032                );
1033
1034                let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1035                params_vec.push(Box::new(thought.clone()));
1036                params_vec.push(Box::new(now));
1037                params_vec.push(Box::new(agent_id.to_string()));
1038                for id in &ids {
1039                    params_vec.push(Box::new(id.clone()));
1040                }
1041
1042                let params_refs: Vec<&dyn rusqlite::ToSql> =
1043                    params_vec.iter().map(|b| b.as_ref()).collect();
1044                conn.execute(&sql, params_refs.as_slice())?
1045            } else {
1046                conn.execute(
1047                    "UPDATE tasks SET current_thought = ?, updated_at = ? WHERE worker_id = ?",
1048                    params![thought, now, agent_id],
1049                )?
1050            };
1051
1052            Ok(updated as i32)
1053        })
1054    }
1055
1056    /// Log time for a task.
1057    pub fn log_time(&self, task_id: &str, duration_ms: i64) -> Result<i64> {
1058        let now = now_ms();
1059
1060        self.with_conn(|conn| {
1061            conn.execute(
1062                "UPDATE tasks SET time_actual_ms = COALESCE(time_actual_ms, 0) + ?1, updated_at = ?2
1063                 WHERE id = ?3",
1064                params![duration_ms, now, task_id],
1065            )?;
1066
1067            let total: i64 = conn.query_row(
1068                "SELECT COALESCE(time_actual_ms, 0) FROM tasks WHERE id = ?1",
1069                params![task_id],
1070                |row| row.get(0),
1071            )?;
1072
1073            Ok(total)
1074        })
1075    }
1076
1077    /// Log metrics and cost for a task.
1078    /// Values in the metrics array are aggregated (added) to existing values.
1079    pub fn log_metrics(
1080        &self,
1081        task_id: &str,
1082        cost_usd: Option<f64>,
1083        values: &[i64],
1084    ) -> Result<Task> {
1085        let now = now_ms();
1086
1087        self.with_conn(|conn| {
1088            let task =
1089                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1090
1091            // Aggregate metrics (add new values to existing)
1092            let mut new_metrics = task.metrics;
1093            for (i, &val) in values.iter().take(8).enumerate() {
1094                new_metrics[i] += val;
1095            }
1096
1097            let new_cost_usd = task.cost_usd + cost_usd.unwrap_or(0.0);
1098
1099            conn.execute(
1100                "UPDATE tasks SET
1101                    metric_0 = ?1, metric_1 = ?2, metric_2 = ?3, metric_3 = ?4,
1102                    metric_4 = ?5, metric_5 = ?6, metric_6 = ?7, metric_7 = ?8,
1103                    cost_usd = ?9, updated_at = ?10
1104                WHERE id = ?11",
1105                params![
1106                    new_metrics[0],
1107                    new_metrics[1],
1108                    new_metrics[2],
1109                    new_metrics[3],
1110                    new_metrics[4],
1111                    new_metrics[5],
1112                    new_metrics[6],
1113                    new_metrics[7],
1114                    new_cost_usd,
1115                    now,
1116                    task_id,
1117                ],
1118            )?;
1119
1120            Ok(Task {
1121                cost_usd: new_cost_usd,
1122                metrics: new_metrics,
1123                updated_at: now,
1124                ..task
1125            })
1126        })
1127    }
1128
1129    /// Claim a task for an agent.
1130    /// Uses the first timed state (typically "in_progress") as the claiming state.
1131    pub fn claim_task(
1132        &self,
1133        task_id: &str,
1134        agent_id: &str,
1135        states_config: &StatesConfig,
1136    ) -> Result<Task> {
1137        let now = now_ms();
1138
1139        // Find the first timed state to use for claiming (typically "in_progress")
1140        let claim_status = states_config
1141            .definitions
1142            .iter()
1143            .find(|(_, def)| def.timed)
1144            .map(|(name, _)| name.as_str())
1145            .unwrap_or("in_progress");
1146
1147        self.with_conn(|conn| {
1148            // Get the task (using internal helper to avoid deadlock)
1149            let task =
1150                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1151
1152            // Check if already claimed
1153            if task.worker_id.is_some() {
1154                return Err(anyhow!("Task is already claimed"));
1155            }
1156
1157            // Validate state transition
1158            if !states_config.is_valid_transition(&task.status, claim_status) {
1159                let exits = states_config.get_exits(&task.status);
1160                return Err(anyhow!(
1161                    "Cannot claim task in state '{}'. Allowed transitions: {:?}",
1162                    task.status,
1163                    exits
1164                ));
1165            }
1166
1167            // Get the agent (using internal helper to avoid deadlock)
1168            let agent =
1169                get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1170
1171            // Check tag affinity - needed_tags (AND - must have ALL)
1172            if !task.needed_tags.is_empty() {
1173                for needed in &task.needed_tags {
1174                    if !agent.tags.contains(needed) {
1175                        return Err(anyhow!("Agent missing required tag: {}", needed));
1176                    }
1177                }
1178            }
1179
1180            // Check tag affinity - wanted_tags (OR - must have AT LEAST ONE)
1181            if !task.wanted_tags.is_empty() {
1182                let has_any = task
1183                    .wanted_tags
1184                    .iter()
1185                    .any(|wanted| agent.tags.contains(wanted));
1186                if !has_any {
1187                    return Err(anyhow!("Agent has none of the wanted tags"));
1188                }
1189            }
1190
1191            conn.execute(
1192                "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = ?4, updated_at = ?5
1193                 WHERE id = ?6",
1194                params![agent_id, now, claim_status, now, now, task_id,],
1195            )?;
1196
1197            // Record state transition (accumulates time if coming from timed state)
1198            record_state_transition(
1199                conn,
1200                task_id,
1201                claim_status,
1202                Some(agent_id),
1203                None,
1204                states_config,
1205            )?;
1206
1207            // Refresh agent heartbeat
1208            conn.execute(
1209                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1210                params![now, agent_id],
1211            )?;
1212
1213            Ok(Task {
1214                worker_id: Some(agent_id.to_string()),
1215                claimed_at: Some(now),
1216                status: claim_status.to_string(),
1217                started_at: Some(now),
1218                updated_at: now,
1219                ..task
1220            })
1221        })
1222    }
1223
1224    /// Release a task claim.
1225    pub fn release_task(
1226        &self,
1227        task_id: &str,
1228        agent_id: &str,
1229        states_config: &StatesConfig,
1230    ) -> Result<()> {
1231        let now = now_ms();
1232        let release_status = &states_config.initial;
1233
1234        self.with_conn(|conn| {
1235            let task =
1236                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1237
1238            if task.worker_id.as_deref() != Some(agent_id) {
1239                return Err(anyhow!("Task is not owned by this agent"));
1240            }
1241
1242            // Record state transition (accumulates time if coming from timed state)
1243            record_state_transition(
1244                conn,
1245                task_id,
1246                release_status,
1247                Some(agent_id),
1248                None,
1249                states_config,
1250            )?;
1251
1252            conn.execute(
1253                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1254                 WHERE id = ?3",
1255                params![release_status, now, task_id],
1256            )?;
1257
1258            Ok(())
1259        })
1260    }
1261
1262    /// Force release a task regardless of owner.
1263    pub fn force_release(&self, task_id: &str, states_config: &StatesConfig) -> Result<()> {
1264        let now = now_ms();
1265        let release_status = &states_config.initial;
1266
1267        self.with_conn(|conn| {
1268            let task =
1269                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1270
1271            // Record state transition (accumulates time if coming from timed state)
1272            record_state_transition(
1273                conn,
1274                task_id,
1275                release_status,
1276                task.worker_id.as_deref(),
1277                None,
1278                states_config,
1279            )?;
1280
1281            conn.execute(
1282                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1283                 WHERE id = ?3",
1284                params![release_status, now, task_id],
1285            )?;
1286
1287            Ok(())
1288        })
1289    }
1290
1291    /// Force claim a task even if owned by another agent.
1292    pub fn force_claim_task(
1293        &self,
1294        task_id: &str,
1295        agent_id: &str,
1296        states_config: &StatesConfig,
1297    ) -> Result<Task> {
1298        let now = now_ms();
1299
1300        // Find the first timed state to use for claiming (typically "in_progress")
1301        let claim_status = states_config
1302            .definitions
1303            .iter()
1304            .find(|(_, def)| def.timed)
1305            .map(|(name, _)| name.as_str())
1306            .unwrap_or("in_progress");
1307
1308        self.with_conn(|conn| {
1309            // Get the task
1310            let task =
1311                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1312
1313            // Get the agent
1314            let agent =
1315                get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1316
1317            // Check tag affinity - needed_tags (AND)
1318            if !task.needed_tags.is_empty() {
1319                for needed in &task.needed_tags {
1320                    if !agent.tags.contains(needed) {
1321                        return Err(anyhow!("Agent missing required tag: {}", needed));
1322                    }
1323                }
1324            }
1325
1326            // Check tag affinity - wanted_tags (OR)
1327            if !task.wanted_tags.is_empty() {
1328                let has_any = task
1329                    .wanted_tags
1330                    .iter()
1331                    .any(|wanted| agent.tags.contains(wanted));
1332                if !has_any {
1333                    return Err(anyhow!("Agent has none of the wanted tags"));
1334                }
1335            }
1336
1337            conn.execute(
1338                "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = COALESCE(started_at, ?4), updated_at = ?5
1339                 WHERE id = ?6",
1340                params![agent_id, now, claim_status, now, now, task_id,],
1341            )?;
1342
1343            // Record state transition (accumulates time if coming from timed state)
1344            record_state_transition(
1345                conn,
1346                task_id,
1347                claim_status,
1348                Some(agent_id),
1349                None,
1350                states_config,
1351            )?;
1352
1353            // Refresh agent heartbeat
1354            conn.execute(
1355                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1356                params![now, agent_id],
1357            )?;
1358
1359            Ok(Task {
1360                worker_id: Some(agent_id.to_string()),
1361                claimed_at: Some(now),
1362                status: claim_status.to_string(),
1363                started_at: task.started_at.or(Some(now)),
1364                updated_at: now,
1365                ..task
1366            })
1367        })
1368    }
1369
1370    /// Release a task claim with a specified state.
1371    pub fn release_task_with_state(
1372        &self,
1373        task_id: &str,
1374        agent_id: &str,
1375        state: &str,
1376        states_config: &StatesConfig,
1377    ) -> Result<()> {
1378        let now = now_ms();
1379
1380        self.with_conn(|conn| {
1381            let task =
1382                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1383
1384            if task.worker_id.as_deref() != Some(agent_id) {
1385                return Err(anyhow!("Task is not owned by this agent"));
1386            }
1387
1388            // Validate state exists
1389            if !states_config.is_valid_state(state) {
1390                return Err(anyhow!(
1391                    "Invalid state '{}'. Valid states: {:?}",
1392                    state,
1393                    states_config.state_names()
1394                ));
1395            }
1396
1397            // Validate transition
1398            if !states_config.is_valid_transition(&task.status, state) {
1399                let exits = states_config.get_exits(&task.status);
1400                return Err(anyhow!(
1401                    "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
1402                    task.status,
1403                    state,
1404                    exits
1405                ));
1406            }
1407
1408            // Set completed_at for terminal states
1409            let completed_at = if states_config.is_terminal_state(state) {
1410                Some(now)
1411            } else {
1412                None
1413            };
1414
1415            // Record state transition (accumulates time if coming from timed state)
1416            record_state_transition(conn, task_id, state, Some(agent_id), None, states_config)?;
1417
1418            conn.execute(
1419                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, completed_at = COALESCE(?2, completed_at), updated_at = ?3
1420                 WHERE id = ?4",
1421                params![state, completed_at, now, task_id],
1422            )?;
1423
1424            Ok(())
1425        })
1426    }
1427
1428    /// Force release stale claims.
1429    pub fn force_release_stale(
1430        &self,
1431        timeout_seconds: i64,
1432        states_config: &StatesConfig,
1433    ) -> Result<i32> {
1434        let now = now_ms();
1435        let cutoff = now - (timeout_seconds * 1000);
1436        let release_status = &states_config.initial;
1437
1438        self.with_conn(|conn| {
1439            let updated = conn.execute(
1440                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1441                 WHERE claimed_at < ?3 AND worker_id IS NOT NULL",
1442                params![release_status, now, cutoff],
1443            )?;
1444
1445            Ok(updated as i32)
1446        })
1447    }
1448
1449    /// Complete a task and release file locks held by the agent.
1450    /// Uses "completed" state by default, which should be a terminal state.
1451    /// Checks that all children (via 'contains' dependencies) are complete.
1452    pub fn complete_task(
1453        &self,
1454        task_id: &str,
1455        agent_id: &str,
1456        states_config: &StatesConfig,
1457    ) -> Result<Task> {
1458        let now = now_ms();
1459
1460        // Find a terminal state to use (prefer "completed" if it exists)
1461        let complete_status = if states_config.definitions.contains_key("completed") {
1462            "completed"
1463        } else {
1464            // Find any terminal state
1465            states_config
1466                .definitions
1467                .iter()
1468                .find(|(_, def)| def.exits.is_empty())
1469                .map(|(name, _)| name.as_str())
1470                .unwrap_or("completed")
1471        };
1472
1473        self.with_conn_mut(|conn| {
1474            let tx = conn.transaction()?;
1475
1476            // Get the task
1477            let mut stmt = tx.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1478            let task = stmt
1479                .query_row(params![task_id], parse_task_row)
1480                .map_err(|_| anyhow!("Task not found"))?;
1481            drop(stmt);
1482
1483            // Verify ownership
1484            if task.worker_id.as_deref() != Some(agent_id) {
1485                return Err(anyhow!("Task is not owned by this agent"));
1486            }
1487
1488            // Check for incomplete children (blocking completion)
1489            let incomplete_children: i32 = tx.query_row(
1490                "SELECT COUNT(*) FROM dependencies d
1491                 INNER JOIN tasks child ON d.to_task_id = child.id
1492                 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
1493                 AND child.status IN (SELECT value FROM json_each(?2))",
1494                params![
1495                    task_id,
1496                    serde_json::to_string(&states_config.blocking_states)?
1497                ],
1498                |row| row.get(0),
1499            )?;
1500
1501            if incomplete_children > 0 {
1502                return Err(anyhow!(
1503                    "Cannot complete task: {} child task(s) are not complete",
1504                    incomplete_children
1505                ));
1506            }
1507
1508            // Validate transition
1509            if !states_config.is_valid_transition(&task.status, complete_status) {
1510                let exits = states_config.get_exits(&task.status);
1511                return Err(anyhow!(
1512                    "Cannot complete task in state '{}'. Allowed transitions: {:?}",
1513                    task.status,
1514                    exits
1515                ));
1516            }
1517
1518            // Record state transition (accumulates time from timed state)
1519            record_state_transition(
1520                &tx,
1521                task_id,
1522                complete_status,
1523                Some(agent_id),
1524                None,
1525                states_config,
1526            )?;
1527
1528            // Update task to completed
1529            tx.execute(
1530                "UPDATE tasks SET status = ?1, completed_at = ?2, updated_at = ?3,
1531                 worker_id = NULL, claimed_at = NULL
1532                 WHERE id = ?4",
1533                params![complete_status, now, now, task_id],
1534            )?;
1535
1536            // Release file locks associated with this task (for auto-cleanup)
1537            tx.execute(
1538                "DELETE FROM file_locks WHERE task_id = ?1",
1539                params![task_id],
1540            )?;
1541
1542            // Refresh agent heartbeat
1543            tx.execute(
1544                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1545                params![now, agent_id],
1546            )?;
1547
1548            tx.commit()?;
1549
1550            Ok(Task {
1551                status: complete_status.to_string(),
1552                completed_at: Some(now),
1553                updated_at: now,
1554                worker_id: None,
1555                claimed_at: None,
1556                ..task
1557            })
1558        })
1559    }
1560
1561    /// Get all tasks. Excludes soft-deleted tasks.
1562    pub fn get_all_tasks(&self) -> Result<Vec<Task>> {
1563        self.with_conn(|conn| {
1564            let mut stmt = conn.prepare("SELECT * FROM tasks WHERE deleted_at IS NULL ORDER BY created_at")?;
1565            let tasks = stmt
1566                .query_map([], parse_task_row)?
1567                .filter_map(|r| r.ok())
1568                .collect();
1569            Ok(tasks)
1570        })
1571    }
1572
1573    /// Get tasks by status.
1574    #[allow(dead_code)]
1575    pub fn get_tasks_by_status(&self, status: &str) -> Result<Vec<Task>> {
1576        self.with_conn(|conn| {
1577            let mut stmt =
1578                conn.prepare("SELECT * FROM tasks WHERE status = ?1 ORDER BY created_at")?;
1579            let tasks = stmt
1580                .query_map(params![status], parse_task_row)?
1581                .filter_map(|r| r.ok())
1582                .collect();
1583            Ok(tasks)
1584        })
1585    }
1586
1587    /// Get claimed tasks. Excludes soft-deleted tasks.
1588    pub fn get_claimed_tasks(&self, agent_id: Option<&str>) -> Result<Vec<Task>> {
1589        self.with_conn(|conn| {
1590            let tasks = if let Some(aid) = agent_id {
1591                let mut stmt = conn
1592                    .prepare("SELECT * FROM tasks WHERE worker_id = ?1 AND deleted_at IS NULL ORDER BY claimed_at")?;
1593                stmt.query_map(params![aid], parse_task_row)?
1594                    .filter_map(|r| r.ok())
1595                    .collect()
1596            } else {
1597                let mut stmt = conn.prepare(
1598                    "SELECT * FROM tasks WHERE worker_id IS NOT NULL AND deleted_at IS NULL ORDER BY claimed_at",
1599                )?;
1600                stmt.query_map([], parse_task_row)?
1601                    .filter_map(|r| r.ok())
1602                    .collect()
1603            };
1604
1605            Ok(tasks)
1606        })
1607    }
1608}
1609
1610/// Helper function to create task tree recursively within a transaction.
1611/// Creates dependencies from parent to children using child_type.
1612/// Creates dependencies between siblings using sibling_type.
1613/// Supports referencing existing tasks via ref_id.
1614fn create_tree_recursive(
1615    conn: &Connection,
1616    input: &TaskTreeInput,
1617    parent_id: Option<&str>,
1618    prev_sibling_id: Option<&str>,
1619    child_type: Option<&str>,
1620    sibling_type: Option<&str>,
1621    all_ids: &mut Vec<String>,
1622    states_config: &StatesConfig,
1623) -> Result<String> {
1624    // Check if this node references an existing task
1625    let task_id = if let Some(ref ref_id) = input.ref_id {
1626        // Verify the referenced task exists
1627        let exists: bool = conn.query_row(
1628            "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
1629            params![ref_id],
1630            |row| row.get(0),
1631        )?;
1632        if !exists {
1633            return Err(anyhow::anyhow!("Referenced task '{}' not found", ref_id));
1634        }
1635        ref_id.clone()
1636    } else {
1637        // Create a new task
1638        let generated_id = Uuid::now_v7().to_string();
1639        let task_id = input.id.clone().unwrap_or(generated_id);
1640        let now = now_ms();
1641        let priority = clamp_priority(input.priority.unwrap_or(PRIORITY_DEFAULT));
1642        let initial_status = &states_config.initial;
1643
1644        let needed_tags = input.needed_tags.clone().unwrap_or_default();
1645        let wanted_tags = input.wanted_tags.clone().unwrap_or_default();
1646        let tags = input.tags.clone().unwrap_or_default();
1647        let needed_tags_json = serde_json::to_string(&needed_tags)?;
1648        let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
1649        let tags_json = serde_json::to_string(&tags)?;
1650
1651        conn.execute(
1652            "INSERT INTO tasks (
1653                id, title, description, status, priority,
1654                needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
1655            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
1656            params![
1657                &task_id,
1658                &input.title,
1659                &input.description,
1660                initial_status,
1661                priority.to_string(),
1662                needed_tags_json,
1663                wanted_tags_json,
1664                tags_json,
1665                input.points,
1666                input.time_estimate_ms,
1667                now,
1668                now,
1669            ],
1670        )?;
1671
1672        // Record initial state transition
1673        record_state_transition(conn, &task_id, initial_status, None, None, states_config)?;
1674
1675        // Sync tags to junction tables for indexed lookups
1676        sync_task_tags(conn, &task_id, &tags)?;
1677        sync_needed_tags(conn, &task_id, &needed_tags)?;
1678        sync_wanted_tags(conn, &task_id, &wanted_tags)?;
1679
1680        task_id
1681    };
1682
1683    // Create dependency from parent if child_type is specified
1684    if let (Some(pid), Some(ct)) = (parent_id, child_type) {
1685        Database::add_dependency_internal(conn, pid, &task_id, ct)?;
1686    }
1687
1688    // Create dependency from previous sibling if sibling_type is specified
1689    if let (Some(prev_id), Some(st)) = (prev_sibling_id, sibling_type) {
1690        Database::add_dependency_internal(conn, prev_id, &task_id, st)?;
1691    }
1692
1693    all_ids.push(task_id.clone());
1694
1695    // Create children with dependencies based on child_type and sibling_type
1696    let mut prev_child_id: Option<String> = None;
1697    for child in input.children.iter() {
1698        let child_id = create_tree_recursive(
1699            conn,
1700            child,
1701            Some(&task_id),
1702            prev_child_id.as_deref(),
1703            child_type,
1704            sibling_type,
1705            all_ids,
1706            states_config,
1707        )?;
1708        prev_child_id = Some(child_id);
1709    }
1710
1711    Ok(task_id)
1712}