Skip to main content

task_graph_mcp/db/
tasks.rs

1//! Task CRUD and tree operations.
2
3use super::state_transitions::record_state_transition;
4use super::{Database, now_ms};
5use crate::config::{
6    AutoAdvanceConfig, DependenciesConfig, IdsConfig, PhasesConfig, StatesConfig, TagsConfig,
7};
8use crate::types::{
9    PRIORITY_DEFAULT, Priority, Task, TaskTree, TaskTreeInput, Worker, clamp_priority,
10    parse_priority,
11};
12use anyhow::{Result, anyhow};
13use petname::{Generator, Petnames};
14use rusqlite::{Connection, Row, params};
15
16/// Generate a petname-based task ID using the large wordlist.
17/// Uses the configured number of words and case style.
18fn generate_task_id(ids_config: &IdsConfig) -> String {
19    let words = ids_config.task_id_words;
20    let case = ids_config.id_case;
21
22    // Generate with hyphen separator first (petname's default format)
23    let base = Petnames::large()
24        .generate_one(words, "-")
25        .unwrap_or_else(|| format!("task-{}", super::now_ms()));
26
27    // Convert to desired case
28    case.convert(&base)
29}
30
31/// Build an ORDER BY clause from sort_by and sort_order parameters.
32/// Returns a safe SQL ORDER BY expression.
33fn build_order_clause(sort_by: Option<&str>, sort_order: Option<&str>) -> String {
34    let field = match sort_by {
35        Some("priority") => "CAST(t.priority AS INTEGER)",
36        Some("created_at") => "t.created_at",
37        Some("updated_at") => "t.updated_at",
38        _ => "t.created_at", // default
39    };
40
41    let order = match sort_order {
42        Some("asc") => "ASC",
43        Some("desc") => "DESC",
44        _ => {
45            // Default: priority is descending (higher number = more important), dates are descending
46            "DESC"
47        }
48    };
49
50    format!("{} {}", field, order)
51}
52
53// =============================================================================
54// Junction table helpers for tag management
55// =============================================================================
56
57/// Sync task tags to the task_tags junction table.
58/// Replaces all existing tags for the task.
59fn sync_task_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
60    conn.execute("DELETE FROM task_tags WHERE task_id = ?1", params![task_id])?;
61    for tag in tags {
62        conn.execute(
63            "INSERT INTO task_tags (task_id, tag) VALUES (?1, ?2)",
64            params![task_id, tag],
65        )?;
66    }
67    Ok(())
68}
69
70/// Sync needed tags (agent must have ALL) to the task_needed_tags junction table.
71fn sync_needed_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
72    conn.execute(
73        "DELETE FROM task_needed_tags WHERE task_id = ?1",
74        params![task_id],
75    )?;
76    for tag in tags {
77        conn.execute(
78            "INSERT INTO task_needed_tags (task_id, tag) VALUES (?1, ?2)",
79            params![task_id, tag],
80        )?;
81    }
82    Ok(())
83}
84
85/// Sync wanted tags (agent must have ANY) to the task_wanted_tags junction table.
86fn sync_wanted_tags(conn: &Connection, task_id: &str, tags: &[String]) -> Result<()> {
87    conn.execute(
88        "DELETE FROM task_wanted_tags WHERE task_id = ?1",
89        params![task_id],
90    )?;
91    for tag in tags {
92        conn.execute(
93            "INSERT INTO task_wanted_tags (task_id, tag) VALUES (?1, ?2)",
94            params![task_id, tag],
95        )?;
96    }
97    Ok(())
98}
99
100pub fn parse_task_row(row: &Row) -> rusqlite::Result<Task> {
101    let id: String = row.get("id")?;
102    let title: String = row.get("title")?;
103    let description: Option<String> = row.get("description")?;
104    let status: String = row.get("status")?;
105    let phase: Option<String> = row.get("phase")?;
106    let priority: String = row.get("priority")?;
107    let worker_id: Option<String> = row.get("worker_id")?;
108    let claimed_at: Option<i64> = row.get("claimed_at")?;
109
110    let needed_tags_json: Option<String> = row.get("needed_tags")?;
111    let wanted_tags_json: Option<String> = row.get("wanted_tags")?;
112    let tags_json: Option<String> = row.get("tags")?;
113
114    let points: Option<i32> = row.get("points")?;
115    let time_estimate_ms: Option<i64> = row.get("time_estimate_ms")?;
116    let time_actual_ms: Option<i64> = row.get("time_actual_ms")?;
117    let started_at: Option<i64> = row.get("started_at")?;
118    let completed_at: Option<i64> = row.get("completed_at")?;
119
120    let current_thought: Option<String> = row.get("current_thought")?;
121
122    let cost_usd: f64 = row.get("cost_usd")?;
123    let metric_0: i64 = row.get("metric_0")?;
124    let metric_1: i64 = row.get("metric_1")?;
125    let metric_2: i64 = row.get("metric_2")?;
126    let metric_3: i64 = row.get("metric_3")?;
127    let metric_4: i64 = row.get("metric_4")?;
128    let metric_5: i64 = row.get("metric_5")?;
129    let metric_6: i64 = row.get("metric_6")?;
130    let metric_7: i64 = row.get("metric_7")?;
131
132    let created_at: i64 = row.get("created_at")?;
133    let updated_at: i64 = row.get("updated_at")?;
134
135    Ok(Task {
136        id,
137        title,
138        description,
139        status,
140        phase,
141        priority: parse_priority(&priority),
142        worker_id,
143        claimed_at,
144        needed_tags: needed_tags_json
145            .map(|s| serde_json::from_str(&s).unwrap_or_default())
146            .unwrap_or_default(),
147        wanted_tags: wanted_tags_json
148            .map(|s| serde_json::from_str(&s).unwrap_or_default())
149            .unwrap_or_default(),
150        tags: tags_json
151            .map(|s| serde_json::from_str(&s).unwrap_or_default())
152            .unwrap_or_default(),
153        points,
154        time_estimate_ms,
155        time_actual_ms,
156        started_at,
157        completed_at,
158        current_thought,
159        cost_usd,
160        metrics: [
161            metric_0, metric_1, metric_2, metric_3, metric_4, metric_5, metric_6, metric_7,
162        ],
163        created_at,
164        updated_at,
165    })
166}
167
168/// Internal helper to get a task using an existing connection (avoids deadlock).
169fn get_task_internal(conn: &Connection, task_id: &str) -> Result<Option<Task>> {
170    let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
171
172    let result = stmt.query_row(params![task_id], parse_task_row);
173
174    match result {
175        Ok(task) => Ok(Some(task)),
176        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
177        Err(e) => Err(e.into()),
178    }
179}
180
181/// Internal helper to get a worker using an existing connection (avoids deadlock).
182fn get_worker_internal(conn: &Connection, worker_id: &str) -> Result<Option<Worker>> {
183    let mut stmt = conn.prepare(
184        "SELECT id, tags, max_claims, registered_at, last_heartbeat, last_status, last_phase, workflow
185         FROM workers WHERE id = ?1",
186    )?;
187
188    let result = stmt.query_row(params![worker_id], |row| {
189        let id: String = row.get(0)?;
190        let tags_json: String = row.get(1)?;
191        let max_claims: i32 = row.get(2)?;
192        let registered_at: i64 = row.get(3)?;
193        let last_heartbeat: i64 = row.get(4)?;
194        let last_status: Option<String> = row.get(5)?;
195        let last_phase: Option<String> = row.get(6)?;
196        let workflow: Option<String> = row.get(7)?;
197
198        Ok((
199            id,
200            tags_json,
201            max_claims,
202            registered_at,
203            last_heartbeat,
204            last_status,
205            last_phase,
206            workflow,
207        ))
208    });
209
210    match result {
211        Ok((
212            id,
213            tags_json,
214            max_claims,
215            registered_at,
216            last_heartbeat,
217            last_status,
218            last_phase,
219            workflow,
220        )) => {
221            let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
222            Ok(Some(Worker {
223                id,
224                tags,
225                max_claims,
226                registered_at,
227                last_heartbeat,
228                last_status,
229                last_phase,
230                workflow,
231            }))
232        }
233        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
234        Err(e) => Err(e.into()),
235    }
236}
237
238impl Database {
239    /// Create a new task.
240    /// If id is provided, uses it as the task ID; otherwise generates a petname ID.
241    /// If parent_id is provided, creates a 'contains' dependency from parent to this task.
242    #[allow(clippy::too_many_arguments)]
243    pub fn create_task(
244        &self,
245        id: Option<String>,
246        description: String,
247        parent_id: Option<String>,
248        phase: Option<String>,
249        priority: Option<Priority>,
250        points: Option<i32>,
251        time_estimate_ms: Option<i64>,
252        agent_tags_all: Option<Vec<String>>,
253        agent_tags_any: Option<Vec<String>>,
254        tags: Option<Vec<String>>,
255        states_config: &StatesConfig,
256        ids_config: &IdsConfig,
257    ) -> Result<Task> {
258        let task_id = id.unwrap_or_else(|| generate_task_id(ids_config));
259        let now = now_ms();
260        let priority = clamp_priority(priority.unwrap_or(PRIORITY_DEFAULT));
261        let initial_status = &states_config.initial;
262
263        let needed_tags = agent_tags_all.unwrap_or_default();
264        let wanted_tags = agent_tags_any.unwrap_or_default();
265        let tags = tags.unwrap_or_default();
266        let needed_tags_json = serde_json::to_string(&needed_tags)?;
267        let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
268        let tags_json = serde_json::to_string(&tags)?;
269
270        self.with_conn_mut(|conn| {
271            let tx = conn.transaction()?;
272
273            tx.execute(
274                "INSERT INTO tasks (
275                    id, title, description, status, phase, priority,
276                    needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
277                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
278                params![
279                    &task_id,
280                    &description, // Use description as title
281                    &description, // Also store as description
282                    initial_status,
283                    &phase,
284                    priority.to_string(),
285                    needed_tags_json,
286                    wanted_tags_json,
287                    tags_json,
288                    points,
289                    time_estimate_ms,
290                    now,
291                    now,
292                ],
293            )?;
294
295            // Sync tags to junction tables
296            sync_task_tags(&tx, &task_id, &tags)?;
297            sync_needed_tags(&tx, &task_id, &needed_tags)?;
298            sync_wanted_tags(&tx, &task_id, &wanted_tags)?;
299
300            // Create 'contains' dependency if parent_id is provided
301            if let Some(ref pid) = parent_id {
302                Database::add_dependency_internal(&tx, pid, &task_id, "contains")?;
303            }
304
305            // Record initial state
306            record_state_transition(&tx, &task_id, initial_status, None, None, states_config)?;
307
308            tx.commit()?;
309
310            Ok(Task {
311                id: task_id,
312                title: description.clone(),
313                description: Some(description),
314                status: initial_status.clone(),
315                phase,
316                priority,
317                worker_id: None,
318                claimed_at: None,
319                needed_tags,
320                wanted_tags,
321                tags,
322                points,
323                time_estimate_ms,
324                time_actual_ms: None,
325                started_at: None,
326                completed_at: None,
327                current_thought: None,
328                cost_usd: 0.0,
329                metrics: [0; 8],
330                created_at: now,
331                updated_at: now,
332            })
333        })
334    }
335
336    /// Convenience method to create a task with just description.
337    /// All other parameters use defaults. Useful for tests.
338    pub fn create_task_simple(
339        &self,
340        description: impl Into<String>,
341        states_config: &StatesConfig,
342        ids_config: &IdsConfig,
343    ) -> Result<Task> {
344        self.create_task(
345            None,
346            description.into(),
347            None,
348            None,
349            None,
350            None,
351            None,
352            None,
353            None,
354            None,
355            states_config,
356            ids_config,
357        )
358    }
359
360    /// Create a task tree from nested input.
361    /// Uses child_type for parent-child dependencies (default: "contains").
362    /// Uses sibling_type for sibling dependencies (default: none/parallel).
363    pub fn create_task_tree(
364        &self,
365        input: TaskTreeInput,
366        parent_id: Option<String>,
367        child_type: Option<String>,
368        sibling_type: Option<String>,
369        states_config: &StatesConfig,
370        phases_config: &PhasesConfig,
371        tags_config: &TagsConfig,
372        ids_config: &IdsConfig,
373    ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>)> {
374        let mut all_ids = Vec::new();
375        let mut phase_warnings = Vec::new();
376        let mut tag_warnings = Vec::new();
377        // Default child_type to "contains" if not specified
378        let child_type = child_type.or_else(|| Some("contains".to_string()));
379
380        self.with_conn_mut(|conn| {
381            let tx = conn.transaction()?;
382            let root_id = create_tree_recursive(
383                &tx,
384                &input,
385                parent_id.as_deref(),
386                None, // no previous sibling for root
387                child_type.as_deref(),
388                sibling_type.as_deref(),
389                &mut all_ids,
390                &mut phase_warnings,
391                &mut tag_warnings,
392                states_config,
393                phases_config,
394                tags_config,
395                ids_config,
396            )?;
397            tx.commit()?;
398            Ok((root_id, all_ids, phase_warnings, tag_warnings))
399        })
400    }
401
402    /// Get a task by ID.
403    pub fn get_task(&self, task_id: &str) -> Result<Option<Task>> {
404        self.with_conn(|conn| {
405            let mut stmt = conn.prepare("SELECT * FROM tasks WHERE id = ?1")?;
406
407            let result = stmt.query_row(params![task_id], parse_task_row);
408
409            match result {
410                Ok(task) => Ok(Some(task)),
411                Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
412                Err(e) => Err(e.into()),
413            }
414        })
415    }
416
417    /// Get a task with all its children (tree).
418    pub fn get_task_tree(&self, task_id: &str) -> Result<Option<TaskTree>> {
419        let task = self.get_task(task_id)?;
420        match task {
421            None => Ok(None),
422            Some(task) => {
423                let children = self.get_children_recursive(&task.id)?;
424                Ok(Some(TaskTree { task, children }))
425            }
426        }
427    }
428
429    /// Get children recursively.
430    fn get_children_recursive(&self, parent_id: &str) -> Result<Vec<TaskTree>> {
431        let children = self.get_children(parent_id)?;
432        let mut result = Vec::new();
433
434        for child in children {
435            let child_children = self.get_children_recursive(&child.id)?;
436            result.push(TaskTree {
437                task: child,
438                children: child_children,
439            });
440        }
441
442        Ok(result)
443    }
444
445    /// Get direct children of a task (via 'contains' dependency).
446    pub fn get_children(&self, parent_id: &str) -> Result<Vec<Task>> {
447        self.with_conn(|conn| {
448            let mut stmt = conn.prepare(
449                "SELECT t.* FROM tasks t
450                 INNER JOIN dependencies d ON t.id = d.to_task_id
451                 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
452                 ORDER BY t.created_at",
453            )?;
454
455            let tasks = stmt
456                .query_map(params![parent_id], parse_task_row)?
457                .filter_map(|r| r.ok())
458                .collect();
459
460            Ok(tasks)
461        })
462    }
463
464    /// Update a task.
465    #[allow(clippy::too_many_arguments)]
466    pub fn update_task(
467        &self,
468        task_id: &str,
469        title: Option<String>,
470        description: Option<Option<String>>,
471        status: Option<String>,
472        priority: Option<Priority>,
473        points: Option<Option<i32>>,
474        tags: Option<Vec<String>>,
475        states_config: &StatesConfig,
476    ) -> Result<Task> {
477        let now = now_ms();
478
479        self.with_conn(|conn| {
480            let task =
481                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
482
483            let new_title = title.unwrap_or(task.title.clone());
484            let new_description = description.unwrap_or(task.description.clone());
485            let new_status = status.unwrap_or(task.status.clone());
486            let new_priority = priority.unwrap_or(task.priority);
487            let new_points = points.unwrap_or(task.points);
488            let new_tags = tags.unwrap_or(task.tags.clone());
489
490            // Validate the new status exists
491            if !states_config.is_valid_state(&new_status) {
492                return Err(anyhow!(
493                    "Invalid state '{}'. Valid states: {:?}",
494                    new_status,
495                    states_config.state_names()
496                ));
497            }
498
499            // Validate state transition if status changed
500            if task.status != new_status
501                && !states_config.is_valid_transition(&task.status, &new_status)
502            {
503                let exits = states_config.get_exits(&task.status);
504                return Err(anyhow!(
505                    "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
506                    task.status,
507                    new_status,
508                    exits
509                ));
510            }
511
512            // Handle status transitions for timestamps
513            // Set started_at when first entering a timed state
514            let started_at =
515                if task.started_at.is_none() && states_config.is_timed_state(&new_status) {
516                    Some(now)
517                } else {
518                    task.started_at
519                };
520
521            // Set completed_at when entering a completed state (terminal or with reopen capability)
522            let completed_at = if new_status == "completed" {
523                Some(now)
524            } else {
525                task.completed_at
526            };
527
528            // Record state transition if status changed (handles time accumulation)
529            if task.status != new_status {
530                record_state_transition(
531                    conn,
532                    task_id,
533                    &new_status,
534                    task.worker_id.as_deref(),
535                    None,
536                    states_config,
537                )?;
538            }
539
540            conn.execute(
541                "UPDATE tasks SET
542                    title = ?1, description = ?2, status = ?3, priority = ?4,
543                    points = ?5, started_at = ?6, completed_at = ?7, updated_at = ?8,
544                    tags = ?9
545                WHERE id = ?10",
546                params![
547                    new_title,
548                    new_description,
549                    new_status,
550                    new_priority.to_string(),
551                    new_points,
552                    started_at,
553                    completed_at,
554                    now,
555                    serde_json::to_string(&new_tags)?,
556                    task_id,
557                ],
558            )?;
559
560            Ok(Task {
561                id: task_id.to_string(),
562                title: new_title,
563                description: new_description,
564                status: new_status,
565                priority: new_priority,
566                points: new_points,
567                tags: new_tags,
568                started_at,
569                completed_at,
570                updated_at: now,
571                ..task
572            })
573        })
574    }
575
576    /// Update a task with unified claim/release logic.
577    /// - Transition to timed state = CLAIM (set owner, validate tags, check limit)
578    /// - Transition from timed to non-timed = RELEASE (clear owner)
579    /// - Transition to terminal = COMPLETE (check children, release file locks)
580    /// - With assignee = ASSIGN (set owner to assignee, transition to 'assigned' state)
581    /// - Only the owner can update a claimed task (unless force=true)
582    ///
583    /// Returns (task, unblocked, auto_advanced):
584    /// - task: The updated task
585    /// - unblocked: Task IDs that are now ready (all dependencies satisfied)
586    /// - auto_advanced: Subset of unblocked that were actually transitioned
587    #[allow(clippy::too_many_arguments)]
588    pub fn update_task_unified(
589        &self,
590        task_id: &str,
591        agent_id: &str,
592        assignee: Option<&str>,
593        title: Option<String>,
594        description: Option<Option<String>>,
595        status: Option<String>,
596        phase: Option<String>,
597        priority: Option<Priority>,
598        points: Option<Option<i32>>,
599        tags: Option<Vec<String>>,
600        needed_tags: Option<Vec<String>>,
601        wanted_tags: Option<Vec<String>>,
602        time_estimate_ms: Option<i64>,
603        reason: Option<String>,
604        force: bool,
605        states_config: &StatesConfig,
606        deps_config: &DependenciesConfig,
607        auto_advance: &AutoAdvanceConfig,
608    ) -> Result<(Task, Vec<String>, Vec<String>)> {
609        let now = now_ms();
610
611        self.with_conn_mut(|conn| {
612            let tx = conn.transaction()?;
613
614            let task =
615                get_task_internal(&tx, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
616
617            // Owner-only validation: if task is claimed, only owner can update (unless force)
618            if let Some(ref current_owner) = task.worker_id
619                && current_owner != agent_id && !force {
620                    return Err(anyhow!(
621                        "Task is claimed by agent '{}'. Only the owner can update claimed tasks (use force=true to override)",
622                        current_owner
623                    ));
624                }
625
626            let new_title = title.unwrap_or(task.title.clone());
627            let new_description = description.unwrap_or(task.description.clone());
628            // If assignee is set but no explicit status, default to 'assigned' state
629            let new_status = if assignee.is_some() && status.is_none() {
630                "assigned".to_string()
631            } else {
632                status.unwrap_or(task.status.clone())
633            };
634            let new_priority = priority.unwrap_or(task.priority);
635            let new_points = points.unwrap_or(task.points);
636            let new_tags = tags.unwrap_or(task.tags.clone());
637            let new_needed_tags = needed_tags.unwrap_or(task.needed_tags.clone());
638            let new_wanted_tags = wanted_tags.unwrap_or(task.wanted_tags.clone());
639            let new_time_estimate_ms = time_estimate_ms.or(task.time_estimate_ms);
640            let new_phase = phase.or(task.phase.clone());
641
642            // Validate the new status exists
643            if !states_config.is_valid_state(&new_status) {
644                return Err(anyhow!(
645                    "Invalid state '{}'. Valid states: {:?}",
646                    new_status,
647                    states_config.state_names()
648                ));
649            }
650
651            // Validate state transition if status changed
652            if task.status != new_status
653                && !states_config.is_valid_transition(&task.status, &new_status) {
654                    let exits = states_config.get_exits(&task.status);
655                    return Err(anyhow!(
656                        "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
657                        task.status,
658                        new_status,
659                        exits
660                    ));
661                }
662
663            // Determine ownership changes based on state transition
664            let new_is_timed = states_config.is_timed_state(&new_status);
665            let new_is_terminal = states_config.is_terminal_state(&new_status);
666            let current_owner = task.worker_id.as_deref();
667            let is_owned_by_agent = current_owner == Some(agent_id);
668            let is_owned_by_other = current_owner.is_some() && !is_owned_by_agent;
669
670            let mut new_owner: Option<String> = task.worker_id.clone();
671            let mut new_claimed_at: Option<i64> = task.claimed_at;
672
673            // ASSIGN: Push coordination - coordinator assigns task to another agent
674            // Sets owner without starting the timer (assigned state is untimed)
675            if let Some(target_agent) = assignee {
676                // Verify task is not already claimed (unless force)
677                if is_owned_by_other && !force {
678                    return Err(anyhow!(
679                        "Task is already claimed by agent '{}'. Use force=true to reassign.",
680                        current_owner.unwrap()
681                    ));
682                }
683
684                // Verify the assignee exists
685                let target = get_worker_internal(&tx, target_agent)?
686                    .ok_or_else(|| anyhow!("Assignee agent '{}' not found", target_agent))?;
687
688                // Check tag affinity for the assignee
689                if !task.needed_tags.is_empty() {
690                    for needed in &task.needed_tags {
691                        if !target.tags.contains(needed) {
692                            return Err(anyhow!(
693                                "Assignee '{}' missing required tag: {}",
694                                target_agent,
695                                needed
696                            ));
697                        }
698                    }
699                }
700
701                if !task.wanted_tags.is_empty() {
702                    let has_any = task
703                        .wanted_tags
704                        .iter()
705                        .any(|wanted| target.tags.contains(wanted));
706                    if !has_any {
707                        return Err(anyhow!(
708                            "Assignee '{}' has none of the wanted tags: {:?}",
709                            target_agent,
710                            task.wanted_tags
711                        ));
712                    }
713                }
714
715                // Set ownership to the assignee
716                new_owner = Some(target_agent.to_string());
717                new_claimed_at = Some(now);
718            }
719
720            // CLAIM: Transitioning to a timed state and need to take ownership
721            // This handles: non-timed -> timed, OR timed (other owner) -> timed (force claim)
722            if new_is_timed && !is_owned_by_agent {
723                // Already claimed by someone else?
724                if is_owned_by_other && !force {
725                    return Err(anyhow!(
726                        "Task is already claimed by agent '{}'",
727                        current_owner.unwrap()
728                    ));
729                }
730
731                // Check for unsatisfied blocking dependencies (skip if force)
732                if !force {
733                    let unsatisfied_blockers = super::deps::get_unsatisfied_start_blockers_in_tx(
734                        &tx,
735                        task_id,
736                        states_config,
737                        deps_config,
738                    )?;
739                    if !unsatisfied_blockers.is_empty() {
740                        return Err(anyhow!(
741                            "Task has unsatisfied dependencies: [{}]",
742                            unsatisfied_blockers.join(", ")
743                        ));
744                    }
745                }
746
747                // Get the agent
748                let agent = get_worker_internal(&tx, agent_id)?
749                    .ok_or_else(|| anyhow!("Agent not found"))?;
750
751                // Check tag affinity - needed_tags (AND - must have ALL)
752                if !task.needed_tags.is_empty() {
753                    for needed in &task.needed_tags {
754                        if !agent.tags.contains(needed) {
755                            return Err(anyhow!("Agent missing required tag: {}", needed));
756                        }
757                    }
758                }
759
760                // Check tag affinity - wanted_tags (OR - must have AT LEAST ONE)
761                if !task.wanted_tags.is_empty() {
762                    let has_any = task
763                        .wanted_tags
764                        .iter()
765                        .any(|wanted| agent.tags.contains(wanted));
766                    if !has_any {
767                        return Err(anyhow!("Agent has none of the wanted tags"));
768                    }
769                }
770
771                // Set ownership
772                new_owner = Some(agent_id.to_string());
773                new_claimed_at = Some(now);
774
775                // Refresh agent heartbeat
776                tx.execute(
777                    "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
778                    params![now, agent_id],
779                )?;
780            }
781
782            // RELEASE: Transitioning to non-timed state (but not terminal)
783            if !new_is_timed && !new_is_terminal && task.worker_id.is_some() {
784                // Verify ownership (unless force)
785                if is_owned_by_other && !force {
786                    return Err(anyhow!("Task is not owned by this agent"));
787                }
788
789                // Clear ownership
790                new_owner = None;
791                new_claimed_at = None;
792            }
793
794            // COMPLETE: Transition to terminal state
795            if new_is_terminal {
796                // Verify ownership if task was claimed (unless force)
797                if let Some(ref current_owner) = task.worker_id
798                    && current_owner != agent_id && !force {
799                        return Err(anyhow!("Task is not owned by this agent"));
800                    }
801
802                // Check for incomplete children (via 'contains' dependencies)
803                let incomplete_children: i32 = tx.query_row(
804                    "SELECT COUNT(*) FROM dependencies d
805                     INNER JOIN tasks child ON d.to_task_id = child.id
806                     WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
807                     AND child.status IN (SELECT value FROM json_each(?2))",
808                    params![
809                        task_id,
810                        serde_json::to_string(&states_config.blocking_states)?
811                    ],
812                    |row| row.get(0),
813                )?;
814
815                if incomplete_children > 0 {
816                    return Err(anyhow!(
817                        "Cannot complete task: {} child task(s) are not complete",
818                        incomplete_children
819                    ));
820                }
821
822                // Clear ownership
823                new_owner = None;
824                new_claimed_at = None;
825
826                // Release file locks associated with this task (for auto-cleanup)
827                tx.execute(
828                    "DELETE FROM file_locks WHERE task_id = ?1",
829                    params![task_id],
830                )?;
831            }
832
833            // Handle timestamps
834            let started_at =
835                if task.started_at.is_none() && new_is_timed {
836                    Some(now)
837                } else {
838                    task.started_at
839                };
840
841            // Set completed_at when entering completed status (even if it has reopen exits)
842            let completed_at = if new_status == "completed" {
843                Some(now)
844            } else {
845                task.completed_at
846            };
847
848            // Record state transition if status changed (with reason for audit)
849            let status_changed = task.status != new_status;
850            if status_changed {
851                record_state_transition(
852                    &tx,
853                    task_id,
854                    &new_status,
855                    new_owner.as_deref(),
856                    reason.as_deref(),
857                    states_config,
858                )?;
859            }
860
861            // Record phase transition if phase changed
862            let phase_changed = task.phase != new_phase;
863            if phase_changed {
864                super::state_transitions::record_phase_transition(
865                    &tx,
866                    task_id,
867                    new_phase.as_deref().unwrap_or(""),
868                    Some(agent_id),
869                    reason.as_deref(),
870                )?;
871            }
872
873            tx.execute(
874                "UPDATE tasks SET
875                    title = ?1, description = ?2, status = ?3, phase = ?4, priority = ?5,
876                    points = ?6, started_at = ?7, completed_at = ?8, updated_at = ?9,
877                    tags = ?10, worker_id = ?11, claimed_at = ?12,
878                    needed_tags = ?13, wanted_tags = ?14, time_estimate_ms = ?15
879                WHERE id = ?16",
880                params![
881                    new_title,
882                    new_description,
883                    new_status,
884                    new_phase,
885                    new_priority.to_string(),
886                    new_points,
887                    started_at,
888                    completed_at,
889                    now,
890                    serde_json::to_string(&new_tags)?,
891                    new_owner,
892                    new_claimed_at,
893                    serde_json::to_string(&new_needed_tags)?,
894                    serde_json::to_string(&new_wanted_tags)?,
895                    new_time_estimate_ms,
896                    task_id,
897                ],
898            )?;
899
900            // Sync tags to junction tables if changed
901            if new_tags != task.tags {
902                sync_task_tags(&tx, task_id, &new_tags)?;
903            }
904            if new_needed_tags != task.needed_tags {
905                sync_needed_tags(&tx, task_id, &new_needed_tags)?;
906            }
907            if new_wanted_tags != task.wanted_tags {
908                sync_wanted_tags(&tx, task_id, &new_wanted_tags)?;
909            }
910
911            // Check for unblocked tasks if this task transitioned FROM blocking TO non-blocking
912            let (unblocked, auto_advanced) = if status_changed {
913                let was_blocking = states_config.is_blocking_state(&task.status);
914                let is_blocking = states_config.is_blocking_state(&new_status);
915
916                if was_blocking && !is_blocking {
917                    super::deps::propagate_unblock_effects(
918                        &tx,
919                        task_id,
920                        Some(agent_id),
921                        states_config,
922                        deps_config,
923                        auto_advance,
924                    )?
925                } else {
926                    (vec![], vec![])
927                }
928            } else {
929                (vec![], vec![])
930            };
931
932            tx.commit()?;
933
934            Ok((Task {
935                id: task_id.to_string(),
936                title: new_title,
937                description: new_description,
938                status: new_status,
939                phase: new_phase,
940                priority: new_priority,
941                points: new_points,
942                tags: new_tags,
943                needed_tags: new_needed_tags,
944                wanted_tags: new_wanted_tags,
945                time_estimate_ms: new_time_estimate_ms,
946                started_at,
947                completed_at,
948                updated_at: now,
949                worker_id: new_owner,
950                claimed_at: new_claimed_at,
951                ..task
952            }, unblocked, auto_advanced))
953        })
954    }
955
956    /// Delete a task (soft delete by default, hard delete with obliterate=true).
957    ///
958    /// - `worker_id`: The worker attempting to delete (required for ownership check)
959    /// - `cascade`: Whether to delete children (default: false)
960    /// - `reason`: Optional reason for deletion
961    /// - `obliterate`: If true, permanently deletes the task; if false (default), soft deletes
962    /// - `force`: If true, allows deletion even if owned by another worker
963    pub fn delete_task(
964        &self,
965        task_id: &str,
966        worker_id: &str,
967        cascade: bool,
968        reason: Option<String>,
969        obliterate: bool,
970        force: bool,
971    ) -> Result<()> {
972        let now = now_ms();
973
974        self.with_conn_mut(|conn| {
975            let tx = conn.transaction()?;
976
977            // Get the task to check ownership
978            let task = get_task_internal(&tx, task_id)?
979                .ok_or_else(|| anyhow!("Task not found"))?;
980
981            // Check ownership - reject if claimed by another worker (unless force)
982            if let Some(ref owner) = task.worker_id
983                && owner != worker_id && !force {
984                    return Err(anyhow!(
985                        "Task is claimed by worker '{}'. Use force=true to override.",
986                        owner
987                    ));
988                }
989
990            if obliterate {
991                // Hard delete - permanently remove from database
992                if cascade {
993                    // Find all descendants using recursive CTE and delete them
994                    // The CTE finds all tasks reachable via 'contains' dependencies
995                    tx.execute(
996                        "WITH RECURSIVE descendants AS (
997                            SELECT ?1 AS id
998                            UNION ALL
999                            SELECT dep.to_task_id FROM dependencies dep
1000                            INNER JOIN descendants d ON dep.from_task_id = d.id
1001                            WHERE dep.dep_type = 'contains'
1002                        )
1003                        DELETE FROM tasks WHERE id IN (SELECT id FROM descendants)",
1004                        params![task_id],
1005                    )?;
1006                } else {
1007                    // Check for children via dependencies
1008                    let child_count: i32 = tx.query_row(
1009                        "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
1010                        params![task_id],
1011                        |row| row.get(0),
1012                    )?;
1013
1014                    if child_count > 0 {
1015                        return Err(anyhow!("Task has children; use cascade=true to delete"));
1016                    }
1017
1018                    tx.execute("DELETE FROM tasks WHERE id = ?1", params![task_id])?;
1019                }
1020            } else {
1021                // Soft delete - set deleted_at, deleted_by, deleted_reason
1022                if cascade {
1023                    // Soft delete all descendants
1024                    tx.execute(
1025                        "WITH RECURSIVE descendants AS (
1026                            SELECT ?1 AS id
1027                            UNION ALL
1028                            SELECT dep.to_task_id FROM dependencies dep
1029                            INNER JOIN descendants d ON dep.from_task_id = d.id
1030                            WHERE dep.dep_type = 'contains'
1031                        )
1032                        UPDATE tasks SET deleted_at = ?2, deleted_by = ?3, deleted_reason = ?4, updated_at = ?2
1033                        WHERE id IN (SELECT id FROM descendants) AND deleted_at IS NULL",
1034                        params![task_id, now, worker_id, reason],
1035                    )?;
1036                } else {
1037                    // Check for children via dependencies
1038                    let child_count: i32 = tx.query_row(
1039                        "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
1040                        params![task_id],
1041                        |row| row.get(0),
1042                    )?;
1043
1044                    if child_count > 0 {
1045                        return Err(anyhow!("Task has children; use cascade=true to delete"));
1046                    }
1047
1048                    tx.execute(
1049                        "UPDATE tasks SET deleted_at = ?1, deleted_by = ?2, deleted_reason = ?3, updated_at = ?1 WHERE id = ?4",
1050                        params![now, worker_id, reason, task_id],
1051                    )?;
1052                }
1053            }
1054
1055            tx.commit()?;
1056            Ok(())
1057        })
1058    }
1059
1060    /// List tasks with optional filters.
1061    /// Returns full Task objects. Excludes soft-deleted tasks.
1062    pub fn list_tasks(
1063        &self,
1064        status: Option<&str>,
1065        phase: Option<&str>,
1066        owner: Option<&str>,
1067        parent_id: Option<Option<&str>>,
1068        limit: Option<i32>,
1069        sort_by: Option<&str>,
1070        sort_order: Option<&str>,
1071    ) -> Result<Vec<Task>> {
1072        self.with_conn(|conn| {
1073            let mut sql = String::from(
1074                "SELECT t.* FROM tasks t WHERE t.deleted_at IS NULL",
1075            );
1076            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1077
1078            if let Some(s) = status {
1079                sql.push_str(" AND t.status = ?");
1080                params_vec.push(Box::new(s.to_string()));
1081            }
1082
1083            if let Some(p) = phase {
1084                sql.push_str(" AND t.phase = ?");
1085                params_vec.push(Box::new(p.to_string()));
1086            }
1087
1088            if let Some(o) = owner {
1089                sql.push_str(" AND t.worker_id = ?");
1090                params_vec.push(Box::new(o.to_string()));
1091            }
1092
1093            // Handle parent filtering via dependencies table
1094            if let Some(p) = parent_id {
1095                match p {
1096                    Some(pid) => {
1097                        sql.push_str(" AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ? AND dep_type = 'contains')");
1098                        params_vec.push(Box::new(pid.to_string()));
1099                    }
1100                    None => {
1101                        // Root tasks: not contained by any other task
1102                        sql.push_str(" AND t.id NOT IN (SELECT to_task_id FROM dependencies WHERE dep_type = 'contains')");
1103                    }
1104                }
1105            }
1106
1107            // Build ORDER BY clause
1108            let order_clause = build_order_clause(sort_by, sort_order);
1109            sql.push_str(&format!(" ORDER BY {}", order_clause));
1110
1111            if let Some(l) = limit {
1112                sql.push_str(&format!(" LIMIT {}", l));
1113            }
1114
1115            let params_refs: Vec<&dyn rusqlite::ToSql> =
1116                params_vec.iter().map(|b| b.as_ref()).collect();
1117
1118            let mut stmt = conn.prepare(&sql)?;
1119            let tasks = stmt
1120                .query_map(params_refs.as_slice(), parse_task_row)?
1121                .filter_map(|r| r.ok())
1122                .collect();
1123
1124            Ok(tasks)
1125        })
1126    }
1127
1128    /// Set the current thought for tasks owned by an agent.
1129    pub fn set_thought(
1130        &self,
1131        agent_id: &str,
1132        thought: Option<String>,
1133        task_ids: Option<Vec<String>>,
1134    ) -> Result<i32> {
1135        let now = now_ms();
1136
1137        self.with_conn(|conn| {
1138            let updated = if let Some(ids) = task_ids {
1139                let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
1140                let sql = format!(
1141                    "UPDATE tasks SET current_thought = ?, updated_at = ?
1142                     WHERE worker_id = ? AND id IN ({})",
1143                    placeholders.join(", ")
1144                );
1145
1146                let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1147                params_vec.push(Box::new(thought.clone()));
1148                params_vec.push(Box::new(now));
1149                params_vec.push(Box::new(agent_id.to_string()));
1150                for id in &ids {
1151                    params_vec.push(Box::new(id.clone()));
1152                }
1153
1154                let params_refs: Vec<&dyn rusqlite::ToSql> =
1155                    params_vec.iter().map(|b| b.as_ref()).collect();
1156                conn.execute(&sql, params_refs.as_slice())?
1157            } else {
1158                conn.execute(
1159                    "UPDATE tasks SET current_thought = ?, updated_at = ? WHERE worker_id = ?",
1160                    params![thought, now, agent_id],
1161                )?
1162            };
1163
1164            Ok(updated as i32)
1165        })
1166    }
1167
1168    /// Log time for a task.
1169    pub fn log_time(&self, task_id: &str, duration_ms: i64) -> Result<i64> {
1170        let now = now_ms();
1171
1172        self.with_conn(|conn| {
1173            conn.execute(
1174                "UPDATE tasks SET time_actual_ms = COALESCE(time_actual_ms, 0) + ?1, updated_at = ?2
1175                 WHERE id = ?3",
1176                params![duration_ms, now, task_id],
1177            )?;
1178
1179            let total: i64 = conn.query_row(
1180                "SELECT COALESCE(time_actual_ms, 0) FROM tasks WHERE id = ?1",
1181                params![task_id],
1182                |row| row.get(0),
1183            )?;
1184
1185            Ok(total)
1186        })
1187    }
1188
1189    /// Log metrics and cost for a task.
1190    /// Values in the metrics array are aggregated (added) to existing values.
1191    pub fn log_metrics(
1192        &self,
1193        task_id: &str,
1194        cost_usd: Option<f64>,
1195        values: &[i64],
1196    ) -> Result<Task> {
1197        let now = now_ms();
1198
1199        self.with_conn(|conn| {
1200            let task =
1201                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1202
1203            // Aggregate metrics (add new values to existing)
1204            let mut new_metrics = task.metrics;
1205            for (i, &val) in values.iter().take(8).enumerate() {
1206                new_metrics[i] += val;
1207            }
1208
1209            let new_cost_usd = task.cost_usd + cost_usd.unwrap_or(0.0);
1210
1211            conn.execute(
1212                "UPDATE tasks SET
1213                    metric_0 = ?1, metric_1 = ?2, metric_2 = ?3, metric_3 = ?4,
1214                    metric_4 = ?5, metric_5 = ?6, metric_6 = ?7, metric_7 = ?8,
1215                    cost_usd = ?9, updated_at = ?10
1216                WHERE id = ?11",
1217                params![
1218                    new_metrics[0],
1219                    new_metrics[1],
1220                    new_metrics[2],
1221                    new_metrics[3],
1222                    new_metrics[4],
1223                    new_metrics[5],
1224                    new_metrics[6],
1225                    new_metrics[7],
1226                    new_cost_usd,
1227                    now,
1228                    task_id,
1229                ],
1230            )?;
1231
1232            Ok(Task {
1233                cost_usd: new_cost_usd,
1234                metrics: new_metrics,
1235                updated_at: now,
1236                ..task
1237            })
1238        })
1239    }
1240
1241    /// Claim a task for an agent.
1242    /// Uses the first timed state (typically "working") as the claiming state.
1243    pub fn claim_task(
1244        &self,
1245        task_id: &str,
1246        agent_id: &str,
1247        states_config: &StatesConfig,
1248    ) -> Result<Task> {
1249        let now = now_ms();
1250
1251        // Find the first timed state to use for claiming (typically "working")
1252        let claim_status = states_config
1253            .definitions
1254            .iter()
1255            .find(|(_, def)| def.timed)
1256            .map(|(name, _)| name.as_str())
1257            .unwrap_or("working");
1258
1259        self.with_conn(|conn| {
1260            // Get the task (using internal helper to avoid deadlock)
1261            let task =
1262                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1263
1264            // Check if already claimed
1265            if task.worker_id.is_some() {
1266                return Err(anyhow!("Task is already claimed"));
1267            }
1268
1269            // Validate state transition
1270            if !states_config.is_valid_transition(&task.status, claim_status) {
1271                let exits = states_config.get_exits(&task.status);
1272                return Err(anyhow!(
1273                    "Cannot claim task in state '{}'. Allowed transitions: {:?}",
1274                    task.status,
1275                    exits
1276                ));
1277            }
1278
1279            // Get the agent (using internal helper to avoid deadlock)
1280            let agent =
1281                get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1282
1283            // Check tag affinity - needed_tags (AND - must have ALL)
1284            if !task.needed_tags.is_empty() {
1285                for needed in &task.needed_tags {
1286                    if !agent.tags.contains(needed) {
1287                        return Err(anyhow!("Agent missing required tag: {}", needed));
1288                    }
1289                }
1290            }
1291
1292            // Check tag affinity - wanted_tags (OR - must have AT LEAST ONE)
1293            if !task.wanted_tags.is_empty() {
1294                let has_any = task
1295                    .wanted_tags
1296                    .iter()
1297                    .any(|wanted| agent.tags.contains(wanted));
1298                if !has_any {
1299                    return Err(anyhow!("Agent has none of the wanted tags"));
1300                }
1301            }
1302
1303            conn.execute(
1304                "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = ?4, updated_at = ?5
1305                 WHERE id = ?6",
1306                params![agent_id, now, claim_status, now, now, task_id,],
1307            )?;
1308
1309            // Record state transition (accumulates time if coming from timed state)
1310            record_state_transition(
1311                conn,
1312                task_id,
1313                claim_status,
1314                Some(agent_id),
1315                None,
1316                states_config,
1317            )?;
1318
1319            // Refresh agent heartbeat
1320            conn.execute(
1321                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1322                params![now, agent_id],
1323            )?;
1324
1325            Ok(Task {
1326                worker_id: Some(agent_id.to_string()),
1327                claimed_at: Some(now),
1328                status: claim_status.to_string(),
1329                started_at: Some(now),
1330                updated_at: now,
1331                ..task
1332            })
1333        })
1334    }
1335
1336    /// Release a task claim.
1337    pub fn release_task(
1338        &self,
1339        task_id: &str,
1340        agent_id: &str,
1341        states_config: &StatesConfig,
1342    ) -> Result<()> {
1343        let now = now_ms();
1344        let release_status = &states_config.initial;
1345
1346        self.with_conn(|conn| {
1347            let task =
1348                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1349
1350            if task.worker_id.as_deref() != Some(agent_id) {
1351                return Err(anyhow!("Task is not owned by this agent"));
1352            }
1353
1354            // Record state transition (accumulates time if coming from timed state)
1355            record_state_transition(
1356                conn,
1357                task_id,
1358                release_status,
1359                Some(agent_id),
1360                None,
1361                states_config,
1362            )?;
1363
1364            conn.execute(
1365                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1366                 WHERE id = ?3",
1367                params![release_status, now, task_id],
1368            )?;
1369
1370            Ok(())
1371        })
1372    }
1373
1374    /// Force release a task regardless of owner.
1375    pub fn force_release(&self, task_id: &str, states_config: &StatesConfig) -> Result<()> {
1376        let now = now_ms();
1377        let release_status = &states_config.initial;
1378
1379        self.with_conn(|conn| {
1380            let task =
1381                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1382
1383            // Record state transition (accumulates time if coming from timed state)
1384            record_state_transition(
1385                conn,
1386                task_id,
1387                release_status,
1388                task.worker_id.as_deref(),
1389                None,
1390                states_config,
1391            )?;
1392
1393            conn.execute(
1394                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1395                 WHERE id = ?3",
1396                params![release_status, now, task_id],
1397            )?;
1398
1399            Ok(())
1400        })
1401    }
1402
1403    /// Force claim a task even if owned by another agent.
1404    pub fn force_claim_task(
1405        &self,
1406        task_id: &str,
1407        agent_id: &str,
1408        states_config: &StatesConfig,
1409    ) -> Result<Task> {
1410        let now = now_ms();
1411
1412        // Find the first timed state to use for claiming (typically "working")
1413        let claim_status = states_config
1414            .definitions
1415            .iter()
1416            .find(|(_, def)| def.timed)
1417            .map(|(name, _)| name.as_str())
1418            .unwrap_or("working");
1419
1420        self.with_conn(|conn| {
1421            // Get the task
1422            let task =
1423                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1424
1425            // Get the agent
1426            let agent =
1427                get_worker_internal(conn, agent_id)?.ok_or_else(|| anyhow!("Agent not found"))?;
1428
1429            // Check tag affinity - needed_tags (AND)
1430            if !task.needed_tags.is_empty() {
1431                for needed in &task.needed_tags {
1432                    if !agent.tags.contains(needed) {
1433                        return Err(anyhow!("Agent missing required tag: {}", needed));
1434                    }
1435                }
1436            }
1437
1438            // Check tag affinity - wanted_tags (OR)
1439            if !task.wanted_tags.is_empty() {
1440                let has_any = task
1441                    .wanted_tags
1442                    .iter()
1443                    .any(|wanted| agent.tags.contains(wanted));
1444                if !has_any {
1445                    return Err(anyhow!("Agent has none of the wanted tags"));
1446                }
1447            }
1448
1449            conn.execute(
1450                "UPDATE tasks SET worker_id = ?1, claimed_at = ?2, status = ?3, started_at = COALESCE(started_at, ?4), updated_at = ?5
1451                 WHERE id = ?6",
1452                params![agent_id, now, claim_status, now, now, task_id,],
1453            )?;
1454
1455            // Record state transition (accumulates time if coming from timed state)
1456            record_state_transition(
1457                conn,
1458                task_id,
1459                claim_status,
1460                Some(agent_id),
1461                None,
1462                states_config,
1463            )?;
1464
1465            // Refresh agent heartbeat
1466            conn.execute(
1467                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1468                params![now, agent_id],
1469            )?;
1470
1471            Ok(Task {
1472                worker_id: Some(agent_id.to_string()),
1473                claimed_at: Some(now),
1474                status: claim_status.to_string(),
1475                started_at: task.started_at.or(Some(now)),
1476                updated_at: now,
1477                ..task
1478            })
1479        })
1480    }
1481
1482    /// Release a task claim with a specified state.
1483    pub fn release_task_with_state(
1484        &self,
1485        task_id: &str,
1486        agent_id: &str,
1487        state: &str,
1488        states_config: &StatesConfig,
1489    ) -> Result<()> {
1490        let now = now_ms();
1491
1492        self.with_conn(|conn| {
1493            let task =
1494                get_task_internal(conn, task_id)?.ok_or_else(|| anyhow!("Task not found"))?;
1495
1496            if task.worker_id.as_deref() != Some(agent_id) {
1497                return Err(anyhow!("Task is not owned by this agent"));
1498            }
1499
1500            // Validate state exists
1501            if !states_config.is_valid_state(state) {
1502                return Err(anyhow!(
1503                    "Invalid state '{}'. Valid states: {:?}",
1504                    state,
1505                    states_config.state_names()
1506                ));
1507            }
1508
1509            // Validate transition
1510            if !states_config.is_valid_transition(&task.status, state) {
1511                let exits = states_config.get_exits(&task.status);
1512                return Err(anyhow!(
1513                    "Invalid transition from '{}' to '{}'. Allowed transitions: {:?}",
1514                    task.status,
1515                    state,
1516                    exits
1517                ));
1518            }
1519
1520            // Set completed_at when entering completed status (even if it has reopen exits)
1521            let completed_at = if state == "completed" {
1522                Some(now)
1523            } else {
1524                None
1525            };
1526
1527            // Record state transition (accumulates time if coming from timed state)
1528            record_state_transition(conn, task_id, state, Some(agent_id), None, states_config)?;
1529
1530            conn.execute(
1531                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, completed_at = COALESCE(?2, completed_at), updated_at = ?3
1532                 WHERE id = ?4",
1533                params![state, completed_at, now, task_id],
1534            )?;
1535
1536            Ok(())
1537        })
1538    }
1539
1540    /// Force release stale claims.
1541    pub fn force_release_stale(
1542        &self,
1543        timeout_seconds: i64,
1544        states_config: &StatesConfig,
1545    ) -> Result<i32> {
1546        let now = now_ms();
1547        let cutoff = now - (timeout_seconds * 1000);
1548        let release_status = &states_config.initial;
1549
1550        self.with_conn(|conn| {
1551            let updated = conn.execute(
1552                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?1, updated_at = ?2
1553                 WHERE claimed_at < ?3 AND worker_id IS NOT NULL",
1554                params![release_status, now, cutoff],
1555            )?;
1556
1557            Ok(updated as i32)
1558        })
1559    }
1560
1561    /// Complete a task and release file locks held by the agent.
1562    /// Uses "completed" state by default, which should be a terminal state.
1563    /// Checks that all children (via 'contains' dependencies) are complete.
1564    pub fn complete_task(
1565        &self,
1566        task_id: &str,
1567        agent_id: &str,
1568        states_config: &StatesConfig,
1569    ) -> Result<Task> {
1570        let now = now_ms();
1571
1572        // Find a terminal state to use (prefer "completed" if it exists)
1573        let complete_status = if states_config.definitions.contains_key("completed") {
1574            "completed"
1575        } else {
1576            // Find any terminal state
1577            states_config
1578                .definitions
1579                .iter()
1580                .find(|(_, def)| def.exits.is_empty())
1581                .map(|(name, _)| name.as_str())
1582                .unwrap_or("completed")
1583        };
1584
1585        self.with_conn_mut(|conn| {
1586            let tx = conn.transaction()?;
1587
1588            // Get the task
1589            let mut stmt = tx.prepare("SELECT * FROM tasks WHERE id = ?1")?;
1590            let task = stmt
1591                .query_row(params![task_id], parse_task_row)
1592                .map_err(|_| anyhow!("Task not found"))?;
1593            drop(stmt);
1594
1595            // Verify ownership
1596            if task.worker_id.as_deref() != Some(agent_id) {
1597                return Err(anyhow!("Task is not owned by this agent"));
1598            }
1599
1600            // Check for incomplete children (blocking completion)
1601            let incomplete_children: i32 = tx.query_row(
1602                "SELECT COUNT(*) FROM dependencies d
1603                 INNER JOIN tasks child ON d.to_task_id = child.id
1604                 WHERE d.from_task_id = ?1 AND d.dep_type = 'contains'
1605                 AND child.status IN (SELECT value FROM json_each(?2))",
1606                params![
1607                    task_id,
1608                    serde_json::to_string(&states_config.blocking_states)?
1609                ],
1610                |row| row.get(0),
1611            )?;
1612
1613            if incomplete_children > 0 {
1614                return Err(anyhow!(
1615                    "Cannot complete task: {} child task(s) are not complete",
1616                    incomplete_children
1617                ));
1618            }
1619
1620            // Validate transition
1621            if !states_config.is_valid_transition(&task.status, complete_status) {
1622                let exits = states_config.get_exits(&task.status);
1623                return Err(anyhow!(
1624                    "Cannot complete task in state '{}'. Allowed transitions: {:?}",
1625                    task.status,
1626                    exits
1627                ));
1628            }
1629
1630            // Record state transition (accumulates time from timed state)
1631            record_state_transition(
1632                &tx,
1633                task_id,
1634                complete_status,
1635                Some(agent_id),
1636                None,
1637                states_config,
1638            )?;
1639
1640            // Update task to completed
1641            tx.execute(
1642                "UPDATE tasks SET status = ?1, completed_at = ?2, updated_at = ?3,
1643                 worker_id = NULL, claimed_at = NULL
1644                 WHERE id = ?4",
1645                params![complete_status, now, now, task_id],
1646            )?;
1647
1648            // Release file locks associated with this task (for auto-cleanup)
1649            tx.execute(
1650                "DELETE FROM file_locks WHERE task_id = ?1",
1651                params![task_id],
1652            )?;
1653
1654            // Refresh agent heartbeat
1655            tx.execute(
1656                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
1657                params![now, agent_id],
1658            )?;
1659
1660            tx.commit()?;
1661
1662            Ok(Task {
1663                status: complete_status.to_string(),
1664                completed_at: Some(now),
1665                updated_at: now,
1666                worker_id: None,
1667                claimed_at: None,
1668                ..task
1669            })
1670        })
1671    }
1672
1673    /// Get all tasks. Excludes soft-deleted tasks.
1674    pub fn get_all_tasks(&self) -> Result<Vec<Task>> {
1675        self.with_conn(|conn| {
1676            let mut stmt =
1677                conn.prepare("SELECT * FROM tasks WHERE deleted_at IS NULL ORDER BY created_at")?;
1678            let tasks = stmt
1679                .query_map([], parse_task_row)?
1680                .filter_map(|r| r.ok())
1681                .collect();
1682            Ok(tasks)
1683        })
1684    }
1685
1686    /// Get tasks by status.
1687    #[allow(dead_code)]
1688    pub fn get_tasks_by_status(&self, status: &str) -> Result<Vec<Task>> {
1689        self.with_conn(|conn| {
1690            let mut stmt =
1691                conn.prepare("SELECT * FROM tasks WHERE status = ?1 ORDER BY created_at")?;
1692            let tasks = stmt
1693                .query_map(params![status], parse_task_row)?
1694                .filter_map(|r| r.ok())
1695                .collect();
1696            Ok(tasks)
1697        })
1698    }
1699
1700    /// Get claimed tasks. Excludes soft-deleted tasks.
1701    pub fn get_claimed_tasks(&self, agent_id: Option<&str>) -> Result<Vec<Task>> {
1702        self.with_conn(|conn| {
1703            let tasks = if let Some(aid) = agent_id {
1704                let mut stmt = conn
1705                    .prepare("SELECT * FROM tasks WHERE worker_id = ?1 AND deleted_at IS NULL ORDER BY claimed_at")?;
1706                stmt.query_map(params![aid], parse_task_row)?
1707                    .filter_map(|r| r.ok())
1708                    .collect()
1709            } else {
1710                let mut stmt = conn.prepare(
1711                    "SELECT * FROM tasks WHERE worker_id IS NOT NULL AND deleted_at IS NULL ORDER BY claimed_at",
1712                )?;
1713                stmt.query_map([], parse_task_row)?
1714                    .filter_map(|r| r.ok())
1715                    .collect()
1716            };
1717
1718            Ok(tasks)
1719        })
1720    }
1721}
1722
1723/// Helper function to create task tree recursively within a transaction.
1724/// Creates dependencies from parent to children using child_type.
1725/// Creates dependencies between siblings using sibling_type.
1726/// Supports referencing existing tasks via ref_id.
1727#[allow(clippy::too_many_arguments)]
1728fn create_tree_recursive(
1729    conn: &Connection,
1730    input: &TaskTreeInput,
1731    parent_id: Option<&str>,
1732    prev_sibling_id: Option<&str>,
1733    child_type: Option<&str>,
1734    sibling_type: Option<&str>,
1735    all_ids: &mut Vec<String>,
1736    phase_warnings: &mut Vec<String>,
1737    tag_warnings: &mut Vec<String>,
1738    states_config: &StatesConfig,
1739    phases_config: &PhasesConfig,
1740    tags_config: &TagsConfig,
1741    ids_config: &IdsConfig,
1742) -> Result<String> {
1743    // Check if this node references an existing task
1744    let task_id = if let Some(ref ref_id) = input.ref_id {
1745        // Verify the referenced task exists
1746        let exists: bool = conn.query_row(
1747            "SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?1)",
1748            params![ref_id],
1749            |row| row.get(0),
1750        )?;
1751        if !exists {
1752            return Err(anyhow::anyhow!("Referenced task '{}' not found", ref_id));
1753        }
1754        ref_id.clone()
1755    } else {
1756        // Create a new task
1757        let task_id = input
1758            .id
1759            .clone()
1760            .unwrap_or_else(|| generate_task_id(ids_config));
1761        let now = now_ms();
1762        let priority = clamp_priority(input.priority.unwrap_or(PRIORITY_DEFAULT));
1763        let initial_status = &states_config.initial;
1764
1765        // Check phase validity
1766        if let Some(ref phase) = input.phase {
1767            if let Some(warning) = phases_config.check_phase(phase)? {
1768                phase_warnings.push(format!("Task '{}': {}", task_id, warning));
1769            }
1770        }
1771
1772        let needed_tags = input.needed_tags.clone().unwrap_or_default();
1773        let wanted_tags = input.wanted_tags.clone().unwrap_or_default();
1774        let tags = input.tags.clone().unwrap_or_default();
1775
1776        // Check tag validity for all tag types
1777        for warning in tags_config.validate_tags(&tags)? {
1778            tag_warnings.push(format!("Task '{}': {}", task_id, warning));
1779        }
1780        for warning in tags_config.validate_tags(&needed_tags)? {
1781            tag_warnings.push(format!("Task '{}' needed_tags: {}", task_id, warning));
1782        }
1783        for warning in tags_config.validate_tags(&wanted_tags)? {
1784            tag_warnings.push(format!("Task '{}' wanted_tags: {}", task_id, warning));
1785        }
1786
1787        let needed_tags_json = serde_json::to_string(&needed_tags)?;
1788        let wanted_tags_json = serde_json::to_string(&wanted_tags)?;
1789        let tags_json = serde_json::to_string(&tags)?;
1790
1791        conn.execute(
1792            "INSERT INTO tasks (
1793                id, title, description, status, phase, priority,
1794                needed_tags, wanted_tags, tags, points, time_estimate_ms, created_at, updated_at
1795            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1796            params![
1797                &task_id,
1798                &input.title,
1799                &input.description,
1800                initial_status,
1801                &input.phase,
1802                priority.to_string(),
1803                needed_tags_json,
1804                wanted_tags_json,
1805                tags_json,
1806                input.points,
1807                input.time_estimate_ms,
1808                now,
1809                now,
1810            ],
1811        )?;
1812
1813        // Record initial state transition
1814        record_state_transition(conn, &task_id, initial_status, None, None, states_config)?;
1815
1816        // Sync tags to junction tables for indexed lookups
1817        sync_task_tags(conn, &task_id, &tags)?;
1818        sync_needed_tags(conn, &task_id, &needed_tags)?;
1819        sync_wanted_tags(conn, &task_id, &wanted_tags)?;
1820
1821        task_id
1822    };
1823
1824    // Create dependency from parent if child_type is specified
1825    if let (Some(pid), Some(ct)) = (parent_id, child_type) {
1826        Database::add_dependency_internal(conn, pid, &task_id, ct)?;
1827    }
1828
1829    // Create dependency from previous sibling if sibling_type is specified
1830    if let (Some(prev_id), Some(st)) = (prev_sibling_id, sibling_type) {
1831        Database::add_dependency_internal(conn, prev_id, &task_id, st)?;
1832    }
1833
1834    all_ids.push(task_id.clone());
1835
1836    // Create children with dependencies based on child_type and sibling_type
1837    let mut prev_child_id: Option<String> = None;
1838    for child in input.children.iter() {
1839        let child_id = create_tree_recursive(
1840            conn,
1841            child,
1842            Some(&task_id),
1843            prev_child_id.as_deref(),
1844            child_type,
1845            sibling_type,
1846            all_ids,
1847            phase_warnings,
1848            tag_warnings,
1849            states_config,
1850            phases_config,
1851            tags_config,
1852            ids_config,
1853        )?;
1854        prev_child_id = Some(child_id);
1855    }
1856
1857    Ok(task_id)
1858}