Skip to main content

task_graph_mcp/db/
agents.rs

1//! Worker CRUD operations.
2
3use super::{Database, now_ms};
4use crate::config::IdsConfig;
5use crate::types::{CleanupSummary, DisconnectSummary, Worker};
6use anyhow::{Result, anyhow};
7use petname::{Generator, Petnames};
8use rusqlite::{Connection, params};
9
/// Longest accepted worker ID, measured in bytes (`str::len`).
///
/// Leaves headroom above generated petnames: a 4-word petname can reach
/// roughly 50 characters.
pub const MAX_WORKER_ID_LEN: usize = 64;
12
13/// Generate a petname-based agent ID using the large wordlist with configured case style.
14/// With 4 words from a large wordlist, collisions are extremely unlikely.
15fn generate_agent_id(ids_config: &IdsConfig) -> String {
16    let words = ids_config.agent_id_words;
17    let case = ids_config.agent_id_case;
18
19    // Generate with hyphen separator first (petname's default format)
20    let base = Petnames::medium()
21        .generate_one(words, "-")
22        .unwrap_or_else(|| format!("worker-{}", now_ms()));
23
24    // Convert to desired case
25    case.convert(&base)
26}
27
28/// Parse overlays JSON from DB (nullable TEXT column) into Vec<String>.
29fn parse_overlays(overlays_json: &Option<String>) -> Vec<String> {
30    overlays_json
31        .as_deref()
32        .and_then(|s| serde_json::from_str(s).ok())
33        .unwrap_or_default()
34}
35
36/// Internal helper to get a worker using an existing connection (avoids deadlock).
37fn get_worker_internal(conn: &Connection, worker_id: &str) -> Result<Option<Worker>> {
38    let mut stmt = conn.prepare(
39        "SELECT id, tags, max_claims, registered_at, last_heartbeat, last_status, last_phase, last_task_id, workflow, overlays
40         FROM workers WHERE id = ?1",
41    )?;
42
43    let result = stmt.query_row(params![worker_id], |row| {
44        let id: String = row.get(0)?;
45        let tags_json: String = row.get(1)?;
46        let max_claims: i32 = row.get(2)?;
47        let registered_at: i64 = row.get(3)?;
48        let last_heartbeat: i64 = row.get(4)?;
49        let last_status: Option<String> = row.get(5)?;
50        let last_phase: Option<String> = row.get(6)?;
51        let last_task_id: Option<String> = row.get(7)?;
52        let workflow: Option<String> = row.get(8)?;
53        let overlays_json: Option<String> = row.get(9)?;
54
55        Ok((
56            id,
57            tags_json,
58            max_claims,
59            registered_at,
60            last_heartbeat,
61            last_status,
62            last_phase,
63            last_task_id,
64            workflow,
65            overlays_json,
66        ))
67    });
68
69    match result {
70        Ok((
71            id,
72            tags_json,
73            max_claims,
74            registered_at,
75            last_heartbeat,
76            last_status,
77            last_phase,
78            last_task_id,
79            workflow,
80            overlays_json,
81        )) => {
82            let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
83            let overlays = parse_overlays(&overlays_json);
84            Ok(Some(Worker {
85                id,
86                tags,
87                max_claims,
88                registered_at,
89                last_heartbeat,
90                last_status,
91                last_phase,
92                last_task_id,
93                workflow,
94                overlays,
95            }))
96        }
97        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
98        Err(e) => Err(e.into()),
99    }
100}
101
102impl Database {
103    /// Register a new worker.
104    ///
105    /// If `worker_id` is provided, it must be at most 36 characters.
106    /// If not provided, a human-readable petname will be generated (e.g., "happy-turtle").
107    /// If `force` is true and the worker already exists, it will be re-registered
108    /// (useful for stuck worker recovery).
109    /// If `workflow` is provided, the worker will use that named workflow (e.g., "swarm" for workflow-swarm.yaml).
110    /// If `overlays` is provided, those overlays will be applied on top of the workflow.
111    #[allow(clippy::too_many_arguments)]
112    pub fn register_worker(
113        &self,
114        worker_id: Option<String>,
115        tags: Vec<String>,
116        force: bool,
117        ids_config: &IdsConfig,
118        workflow: Option<String>,
119        overlays: Vec<String>,
120        max_claims: Option<i32>,
121    ) -> Result<Worker> {
122        // Validate user-provided ID upfront (before acquiring connection)
123        let provided_id = match worker_id {
124            Some(id) => {
125                if id.len() > MAX_WORKER_ID_LEN {
126                    return Err(anyhow!(
127                        "Worker ID must be at most {} characters, got {}",
128                        MAX_WORKER_ID_LEN,
129                        id.len()
130                    ));
131                }
132                if id.is_empty() {
133                    return Err(anyhow!("Worker ID cannot be empty"));
134                }
135                Some(id)
136            }
137            None => None,
138        };
139        let now = now_ms();
140        let max_claims = match max_claims {
141            Some(0) => i32::MAX, // 0 means unlimited
142            Some(n) => n,
143            None => 1, // Default to 1 concurrent claim
144        };
145        let tags_json = serde_json::to_string(&tags)?;
146        let overlays_json = if overlays.is_empty() {
147            None
148        } else {
149            Some(serde_json::to_string(&overlays)?)
150        };
151
152        self.with_conn(|conn| {
153            // Generate ID (with 4+ words from a large wordlist, collisions are extremely unlikely)
154            let id = match provided_id {
155                Some(id) => id,
156                None => generate_agent_id(ids_config),
157            };
158
159            // Check if worker ID already exists
160            let exists: bool = conn
161                .query_row("SELECT 1 FROM workers WHERE id = ?1", params![&id], |_| Ok(true))
162                .unwrap_or(false);
163
164            // Get current max claim sequence + 1 to initialize poll position.
165            // This ensures first poll returns empty (no events since registration).
166            // The +1 is needed because we now query with `id >= last_seq`.
167            let current_max_sequence: i64 = conn
168                .query_row("SELECT COALESCE(MAX(id), 0) FROM claim_sequence", [], |row| row.get(0))
169                .unwrap_or(0);
170            let initial_sequence = current_max_sequence + 1;
171
172            if exists {
173                if force {
174                    // Force reconnection: update existing worker and reset poll position, including workflow and overlays
175                    conn.execute(
176                        "UPDATE workers SET tags = ?1, max_claims = ?2, last_heartbeat = ?3, last_claim_sequence = ?4, workflow = ?5, overlays = ?6 WHERE id = ?7",
177                        params![tags_json, max_claims, now, initial_sequence, &workflow, &overlays_json, &id],
178                    )?;
179                } else {
180                    return Err(anyhow!("Worker ID '{}' already registered. Use force=true to reconnect.", id));
181                }
182            } else {
183                conn.execute(
184                    "INSERT INTO workers (id, tags, max_claims, registered_at, last_heartbeat, last_claim_sequence, workflow, overlays)
185                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
186                    params![&id, tags_json, max_claims, now, now, initial_sequence, &workflow, &overlays_json],
187                )?;
188            }
189
190            Ok(Worker {
191                id,
192                tags,
193                max_claims,
194                registered_at: now,
195                last_heartbeat: now,
196                last_status: None,
197                last_phase: None,
198                last_task_id: None,
199                workflow,
200                overlays,
201            })
202        })
203    }
204
205    /// Get a worker by ID.
206    pub fn get_worker(&self, worker_id: &str) -> Result<Option<Worker>> {
207        self.with_conn(|conn| get_worker_internal(conn, worker_id))
208    }
209
210    /// Check if a worker exists. Returns error if not found.
211    pub fn require_worker(&self, worker_id: &str) -> Result<Worker> {
212        self.get_worker(worker_id)?
213            .ok_or_else(|| anyhow::anyhow!("Worker {} not found", worker_id))
214    }
215
216    /// Update a worker.
217    pub fn update_worker(
218        &self,
219        worker_id: &str,
220        tags: Option<Vec<String>>,
221        max_claims: Option<i32>,
222    ) -> Result<Worker> {
223        self.with_conn(|conn| {
224            let worker =
225                get_worker_internal(conn, worker_id)?.ok_or_else(|| anyhow!("Worker not found"))?;
226
227            let new_tags = tags.unwrap_or(worker.tags.clone());
228            let new_max_claims = max_claims.unwrap_or(worker.max_claims);
229            let tags_json = serde_json::to_string(&new_tags)?;
230
231            conn.execute(
232                "UPDATE workers SET tags = ?1, max_claims = ?2 WHERE id = ?3",
233                params![tags_json, new_max_claims, worker_id],
234            )?;
235
236            Ok(Worker {
237                id: worker_id.to_string(),
238                tags: new_tags,
239                max_claims: new_max_claims,
240                registered_at: worker.registered_at,
241                last_heartbeat: worker.last_heartbeat,
242                last_status: worker.last_status,
243                last_phase: worker.last_phase,
244                last_task_id: worker.last_task_id,
245                workflow: worker.workflow,
246                overlays: worker.overlays,
247            })
248        })
249    }
250
251    /// Update only the overlays for a worker.
252    pub fn update_worker_overlays(&self, worker_id: &str, overlays: Vec<String>) -> Result<Worker> {
253        let overlays_json = if overlays.is_empty() {
254            None
255        } else {
256            Some(serde_json::to_string(&overlays)?)
257        };
258
259        self.with_conn(|conn| {
260            let updated = conn.execute(
261                "UPDATE workers SET overlays = ?1 WHERE id = ?2",
262                params![overlays_json, worker_id],
263            )?;
264
265            if updated == 0 {
266                return Err(anyhow!("Worker not found"));
267            }
268
269            get_worker_internal(conn, worker_id)?
270                .ok_or_else(|| anyhow!("Worker not found after update"))
271        })
272    }
273
274    /// Update worker's last seen state (status and phase) for transition prompt tracking.
275    /// Returns the previous state (old_status, old_phase) for prompt calculation.
276    pub fn update_worker_state(
277        &self,
278        worker_id: &str,
279        new_status: Option<&str>,
280        new_phase: Option<&str>,
281        task_id: Option<&str>,
282    ) -> Result<(Option<String>, Option<String>)> {
283        self.with_conn(|conn| {
284            // Get current state
285            let (old_status, old_phase): (Option<String>, Option<String>) = conn
286                .query_row(
287                    "SELECT last_status, last_phase FROM workers WHERE id = ?1",
288                    params![worker_id],
289                    |row| Ok((row.get(0)?, row.get(1)?)),
290                )
291                .map_err(|e| match e {
292                    rusqlite::Error::QueryReturnedNoRows => anyhow!("Worker not found"),
293                    e => e.into(),
294                })?;
295
296            // Update to new state (including last_task_id for per-task tracking)
297            conn.execute(
298                "UPDATE workers SET last_status = ?1, last_phase = ?2, last_task_id = ?3 WHERE id = ?4",
299                params![new_status, new_phase, task_id, worker_id],
300            )?;
301
302            Ok((old_status, old_phase))
303        })
304    }
305
306    /// Update worker heartbeat.
307    pub fn heartbeat(
308        &self,
309        worker_id: &str,
310        states_config: &crate::config::StatesConfig,
311    ) -> Result<i32> {
312        let now = now_ms();
313
314        self.with_conn(|conn| {
315            let updated = conn.execute(
316                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
317                params![now, worker_id],
318            )?;
319
320            if updated == 0 {
321                return Err(anyhow!("Worker not found"));
322            }
323
324            // Return current claim count (all timed states)
325            get_claim_count_internal(conn, worker_id, states_config)
326        })
327    }
328
329    /// Unregister a worker (releases all claims).
330    /// Returns a summary of released tasks and files.
331    pub fn unregister_worker(
332        &self,
333        worker_id: &str,
334        final_status: &str,
335    ) -> Result<DisconnectSummary> {
336        self.with_conn_mut(|conn| {
337            let tx = conn.transaction()?;
338
339            // Release all task claims, setting them to final_status
340            let tasks_released = tx.execute(
341                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?2
342                 WHERE worker_id = ?1",
343                params![worker_id, final_status],
344            )? as i32;
345
346            // Remove all file locks
347            let files_released = tx.execute(
348                "DELETE FROM file_locks WHERE worker_id = ?1",
349                params![worker_id],
350            )? as i32;
351
352            // Remove worker
353            tx.execute("DELETE FROM workers WHERE id = ?1", params![worker_id])?;
354
355            tx.commit()?;
356            Ok(DisconnectSummary {
357                tasks_released,
358                files_released,
359                final_status: final_status.to_string(),
360            })
361        })
362    }
363
364    /// List all workers.
365    pub fn list_workers(&self) -> Result<Vec<Worker>> {
366        self.with_conn(|conn| {
367            let mut stmt = conn.prepare(
368                "SELECT id, tags, max_claims, registered_at, last_heartbeat, last_status, last_phase, last_task_id, workflow, overlays
369                 FROM workers ORDER BY registered_at DESC",
370            )?;
371
372            let workers = stmt
373                .query_map([], |row| {
374                    let id: String = row.get(0)?;
375                    let tags_json: String = row.get(1)?;
376                    let max_claims: i32 = row.get(2)?;
377                    let registered_at: i64 = row.get(3)?;
378                    let last_heartbeat: i64 = row.get(4)?;
379                    let last_status: Option<String> = row.get(5)?;
380                    let last_phase: Option<String> = row.get(6)?;
381                    let last_task_id: Option<String> = row.get(7)?;
382                    let workflow: Option<String> = row.get(8)?;
383                    let overlays_json: Option<String> = row.get(9)?;
384
385                    Ok((
386                        id,
387                        tags_json,
388                        max_claims,
389                        registered_at,
390                        last_heartbeat,
391                        last_status,
392                        last_phase,
393                        last_task_id,
394                        workflow,
395                        overlays_json,
396                    ))
397                })?
398                .filter_map(|r| r.ok())
399                .map(
400                    |(
401                        id,
402                        tags_json,
403                        max_claims,
404                        registered_at,
405                        last_heartbeat,
406                        last_status,
407                        last_phase,
408                        last_task_id,
409                        workflow,
410                        overlays_json,
411                    )| {
412                        let tags: Vec<String> =
413                            serde_json::from_str(&tags_json).unwrap_or_default();
414                        let overlays = parse_overlays(&overlays_json);
415                        Worker {
416                            id,
417                            tags,
418                            max_claims,
419                            registered_at,
420                            last_heartbeat,
421                            last_status,
422                            last_phase,
423                            last_task_id,
424                            workflow,
425                            overlays,
426                        }
427                    },
428                )
429                .collect();
430
431            Ok(workers)
432        })
433    }
434
435    /// List all workers with extended info (claim count, current thought).
436    pub fn list_workers_info(
437        &self,
438        states_config: &crate::config::StatesConfig,
439    ) -> Result<Vec<crate::types::WorkerInfo>> {
440        self.with_conn(|conn| {
441            let timed_states = states_config.timed_state_names();
442            let (status_in, status_in_thought) = if timed_states.is_empty() {
443                ("status = '__none__'".to_string(), "status = '__none__'".to_string())
444            } else {
445                let quoted: Vec<String> = timed_states.iter().map(|s| format!("'{}'", s)).collect();
446                let clause = format!("status IN ({})", quoted.join(", "));
447                (clause.clone(), clause)
448            };
449
450            let sql = format!(
451                "SELECT w.id, w.tags, w.max_claims, w.registered_at, w.last_heartbeat,
452                        (SELECT COUNT(*) FROM tasks WHERE worker_id = w.id AND {}) as claim_count,
453                        (SELECT current_thought FROM tasks WHERE worker_id = w.id AND {} AND current_thought IS NOT NULL LIMIT 1) as current_thought,
454                        w.last_status, w.last_phase, w.last_task_id, w.workflow, w.overlays
455                 FROM workers w ORDER BY w.registered_at DESC",
456                status_in, status_in_thought
457            );
458
459            let mut stmt = conn.prepare(&sql)?;
460
461            let workers = stmt.query_map([], |row| {
462                let id: String = row.get(0)?;
463                let tags_json: String = row.get(1)?;
464                let max_claims: i32 = row.get(2)?;
465                let registered_at: i64 = row.get(3)?;
466                let last_heartbeat: i64 = row.get(4)?;
467                let claim_count: i32 = row.get(5)?;
468                let current_thought: Option<String> = row.get(6)?;
469                let last_status: Option<String> = row.get(7)?;
470                let last_phase: Option<String> = row.get(8)?;
471                let last_task_id: Option<String> = row.get(9)?;
472                let workflow: Option<String> = row.get(10)?;
473                let overlays_json: Option<String> = row.get(11)?;
474
475                Ok((id, tags_json, max_claims, registered_at, last_heartbeat, claim_count, current_thought, last_status, last_phase, last_task_id, workflow, overlays_json))
476            })?
477            .filter_map(|r| r.ok())
478            .map(|(id, tags_json, max_claims, registered_at, last_heartbeat, claim_count, current_thought, last_status, last_phase, last_task_id, workflow, overlays_json)| {
479                let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
480                let overlays = parse_overlays(&overlays_json);
481                crate::types::WorkerInfo {
482                    id,
483                    tags,
484                    max_claims,
485                    claim_count,
486                    current_thought,
487                    registered_at,
488                    last_heartbeat,
489                    last_status,
490                    last_phase,
491                    last_task_id,
492                    workflow,
493                    overlays,
494                }
495            })
496            .collect();
497
498            Ok(workers)
499        })
500    }
501
502    /// List workers with optional filters by tags, file claimed, or related task.
503    ///
504    /// - `tags`: Workers must have ALL of these tags
505    /// - `file`: Workers that have claimed this file
506    /// - `task_id`: Workers working on tasks related to this task
507    /// - `depth`: Task relationship depth (-3 to 3). Negative: ancestors, positive: descendants
508    pub fn list_workers_filtered(
509        &self,
510        tags: Option<&Vec<String>>,
511        file: Option<&str>,
512        task_id: Option<&str>,
513        depth: i32,
514        states_config: &crate::config::StatesConfig,
515    ) -> Result<Vec<crate::types::WorkerInfo>> {
516        self.with_conn(|conn| {
517            let timed_states = states_config.timed_state_names();
518            let status_clause = if timed_states.is_empty() {
519                "status = '__none__'".to_string()
520            } else {
521                let quoted: Vec<String> = timed_states.iter().map(|s| format!("'{}'", s)).collect();
522                format!("status IN ({})", quoted.join(", "))
523            };
524
525            // Start with base query
526            let mut sql = format!(
527                "SELECT DISTINCT w.id, w.tags, w.max_claims, w.registered_at, w.last_heartbeat,
528                        (SELECT COUNT(*) FROM tasks WHERE worker_id = w.id AND {}) as claim_count,
529                        (SELECT current_thought FROM tasks WHERE worker_id = w.id AND {} AND current_thought IS NOT NULL LIMIT 1) as current_thought,
530                        w.last_status, w.last_phase, w.last_task_id, w.workflow, w.overlays
531                 FROM workers w WHERE 1=1",
532                status_clause, status_clause
533            );
534            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
535
536            // Filter by file claim
537            if let Some(f) = file {
538                sql.push_str(" AND w.id IN (SELECT worker_id FROM file_locks WHERE file_path = ?)");
539                params_vec.push(Box::new(f.to_string()));
540            }
541
542            // Filter by related task (with depth traversal)
543            if let Some(tid) = task_id {
544                // Get all related task IDs at the given depth
545                let related_task_ids = Self::get_related_task_ids_internal(conn, tid, depth)?;
546                if !related_task_ids.is_empty() {
547                    let placeholders: Vec<String> = related_task_ids.iter().map(|_| "?".to_string()).collect();
548                    sql.push_str(&format!(
549                        " AND w.id IN (SELECT DISTINCT worker_id FROM tasks WHERE id IN ({}) AND worker_id IS NOT NULL)",
550                        placeholders.join(", ")
551                    ));
552                    for task in related_task_ids {
553                        params_vec.push(Box::new(task));
554                    }
555                } else {
556                    // No related tasks found, return empty result
557                    return Ok(Vec::new());
558                }
559            }
560
561            sql.push_str(" ORDER BY w.registered_at DESC");
562
563            let params_refs: Vec<&dyn rusqlite::ToSql> =
564                params_vec.iter().map(|b| b.as_ref()).collect();
565
566            let mut stmt = conn.prepare(&sql)?;
567            let workers: Vec<crate::types::WorkerInfo> = stmt
568                .query_map(params_refs.as_slice(), |row| {
569                    let id: String = row.get(0)?;
570                    let tags_json: String = row.get(1)?;
571                    let max_claims: i32 = row.get(2)?;
572                    let registered_at: i64 = row.get(3)?;
573                    let last_heartbeat: i64 = row.get(4)?;
574                    let claim_count: i32 = row.get(5)?;
575                    let current_thought: Option<String> = row.get(6)?;
576                    let last_status: Option<String> = row.get(7)?;
577                    let last_phase: Option<String> = row.get(8)?;
578                    let last_task_id: Option<String> = row.get(9)?;
579                    let workflow: Option<String> = row.get(10)?;
580                    let overlays_json: Option<String> = row.get(11)?;
581
582                    Ok((id, tags_json, max_claims, registered_at, last_heartbeat, claim_count, current_thought, last_status, last_phase, last_task_id, workflow, overlays_json))
583                })?
584                .filter_map(|r| r.ok())
585                .map(|(id, tags_json, max_claims, registered_at, last_heartbeat, claim_count, current_thought, last_status, last_phase, last_task_id, workflow, overlays_json)| {
586                    let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
587                    let overlays = parse_overlays(&overlays_json);
588                    crate::types::WorkerInfo {
589                        id,
590                        tags,
591                        max_claims,
592                        claim_count,
593                        current_thought,
594                        registered_at,
595                        last_heartbeat,
596                        last_status,
597                        last_phase,
598                        last_task_id,
599                        workflow,
600                        overlays,
601                    }
602                })
603                .collect();
604
605            // Post-filter by tags (need to check ALL tags match)
606            let workers = if let Some(required_tags) = tags {
607                workers
608                    .into_iter()
609                    .filter(|w| required_tags.iter().all(|t| w.tags.contains(t)))
610                    .collect()
611            } else {
612                workers
613            };
614
615            Ok(workers)
616        })
617    }
618
619    /// Internal helper to get related task IDs at a given depth.
620    /// Negative depth: ancestors (parents/blockers), positive depth: descendants (children/blocked).
621    fn get_related_task_ids_internal(
622        conn: &Connection,
623        task_id: &str,
624        depth: i32,
625    ) -> Result<Vec<String>> {
626        use std::collections::HashSet;
627
628        let mut result = HashSet::new();
629        result.insert(task_id.to_string());
630
631        if depth == 0 {
632            return Ok(result.into_iter().collect());
633        }
634
635        let abs_depth = depth.abs();
636        let mut current_level: HashSet<String> = [task_id.to_string()].into_iter().collect();
637
638        for _ in 0..abs_depth {
639            if current_level.is_empty() {
640                break;
641            }
642
643            let mut next_level = HashSet::new();
644
645            for tid in &current_level {
646                let related: Vec<String> = if depth > 0 {
647                    // Descendants: tasks where this task is the from_task_id (children, blocked tasks)
648                    let mut stmt = conn
649                        .prepare("SELECT to_task_id FROM dependencies WHERE from_task_id = ?1")?;
650                    stmt.query_map(params![tid], |row| row.get(0))?
651                        .filter_map(|r| r.ok())
652                        .collect()
653                } else {
654                    // Ancestors: tasks where this task is the to_task_id (parents, blockers)
655                    let mut stmt = conn
656                        .prepare("SELECT from_task_id FROM dependencies WHERE to_task_id = ?1")?;
657                    stmt.query_map(params![tid], |row| row.get(0))?
658                        .filter_map(|r| r.ok())
659                        .collect()
660                };
661
662                for related_id in related {
663                    if !result.contains(&related_id) {
664                        next_level.insert(related_id.clone());
665                        result.insert(related_id);
666                    }
667                }
668            }
669
670            current_level = next_level;
671        }
672
673        Ok(result.into_iter().collect())
674    }
675
676    /// Get workers with stale heartbeats.
677    pub fn get_stale_workers(&self, timeout_seconds: i64) -> Result<Vec<Worker>> {
678        let cutoff = now_ms() - (timeout_seconds * 1000);
679
680        self.with_conn(|conn| {
681            let mut stmt = conn.prepare(
682                "SELECT id, tags, max_claims, registered_at, last_heartbeat, last_status, last_phase, last_task_id, workflow, overlays
683                 FROM workers WHERE last_heartbeat < ?1",
684            )?;
685
686            let workers = stmt
687                .query_map(params![cutoff], |row| {
688                    let id: String = row.get(0)?;
689                    let tags_json: String = row.get(1)?;
690                    let max_claims: i32 = row.get(2)?;
691                    let registered_at: i64 = row.get(3)?;
692                    let last_heartbeat: i64 = row.get(4)?;
693                    let last_status: Option<String> = row.get(5)?;
694                    let last_phase: Option<String> = row.get(6)?;
695                    let last_task_id: Option<String> = row.get(7)?;
696                    let workflow: Option<String> = row.get(8)?;
697                    let overlays_json: Option<String> = row.get(9)?;
698
699                    Ok((
700                        id,
701                        tags_json,
702                        max_claims,
703                        registered_at,
704                        last_heartbeat,
705                        last_status,
706                        last_phase,
707                        last_task_id,
708                        workflow,
709                        overlays_json,
710                    ))
711                })?
712                .filter_map(|r| r.ok())
713                .map(
714                    |(
715                        id,
716                        tags_json,
717                        max_claims,
718                        registered_at,
719                        last_heartbeat,
720                        last_status,
721                        last_phase,
722                        last_task_id,
723                        workflow,
724                        overlays_json,
725                    )| {
726                        let tags: Vec<String> =
727                            serde_json::from_str(&tags_json).unwrap_or_default();
728                        let overlays = parse_overlays(&overlays_json);
729                        Worker {
730                            id,
731                            tags,
732                            max_claims,
733                            registered_at,
734                            last_heartbeat,
735                            last_status,
736                            last_phase,
737                            last_task_id,
738                            workflow,
739                            overlays,
740                        }
741                    },
742                )
743                .collect();
744
745            Ok(workers)
746        })
747    }
748
749    /// Cleanup stale workers by evicting them and releasing their claims.
750    ///
751    /// For each stale worker, individual `task_sequence` entries are inserted
752    /// for every released task before calling `unregister_worker()`. This allows
753    /// polling agents to discover released tasks gradually via the sequence table
754    /// rather than all tasks becoming available simultaneously (which would cause
755    /// scheduling storms when an agent holding many tasks times out).
756    ///
757    /// Returns a summary of the cleanup operation.
758    pub fn cleanup_stale_workers(
759        &self,
760        timeout_seconds: i64,
761        final_status: &str,
762    ) -> Result<CleanupSummary> {
763        let stale_workers = self.get_stale_workers(timeout_seconds)?;
764
765        let mut total_tasks_released = 0;
766        let mut total_files_released = 0;
767        let mut evicted_worker_ids = Vec::new();
768
769        for worker in &stale_workers {
770            // Record individual task_sequence entries BEFORE bulk-releasing
771            let released_task_ids =
772                self.record_stale_release_transitions(&worker.id, final_status)?;
773
774            if released_task_ids.len() > 5 {
775                eprintln!(
776                    "[cleanup] Bulk-releasing {} task claims from stale agent '{}' (last heartbeat: {})",
777                    released_task_ids.len(),
778                    worker.id,
779                    worker.last_heartbeat,
780                );
781            }
782
783            // Release file locks first
784            let _ = self.release_worker_locks(&worker.id);
785
786            // Unregister the worker (releases task claims and removes worker)
787            if let Ok(summary) = self.unregister_worker(&worker.id, final_status) {
788                total_tasks_released += summary.tasks_released;
789                total_files_released += summary.files_released;
790                evicted_worker_ids.push(worker.id.clone());
791            }
792        }
793
794        Ok(CleanupSummary {
795            workers_evicted: evicted_worker_ids.len() as i32,
796            tasks_released: total_tasks_released,
797            files_released: total_files_released,
798            final_status: final_status.to_string(),
799            evicted_worker_ids,
800        })
801    }
802
803    /// Record individual task_sequence entries for each task claimed by a stale
804    /// worker before it is unregistered. Provides gradual discovery and audit trail.
805    fn record_stale_release_transitions(
806        &self,
807        worker_id: &str,
808        final_status: &str,
809    ) -> Result<Vec<String>> {
810        self.with_conn(|conn| {
811            let now = now_ms();
812
813            let mut stmt = conn.prepare("SELECT id FROM tasks WHERE worker_id = ?1")?;
814            let released_task_ids: Vec<String> = stmt
815                .query_map(params![worker_id], |row| row.get(0))?
816                .filter_map(|r| r.ok())
817                .collect();
818
819            for task_id in &released_task_ids {
820                // Close any open status transition
821                conn.execute(
822                    "UPDATE task_sequence SET end_timestamp = ?1
823                     WHERE task_id = ?2 AND end_timestamp IS NULL AND status IS NOT NULL",
824                    params![now, task_id],
825                )?;
826
827                // Insert stale_release transition
828                conn.execute(
829                    "INSERT INTO task_sequence (task_id, worker_id, status, reason, timestamp)
830                     VALUES (?1, ?2, ?3, 'stale_release', ?4)",
831                    params![task_id, worker_id, final_status, now],
832                )?;
833            }
834
835            Ok(released_task_ids)
836        })
837    }
838
839    /// Force-expire a specific worker, releasing all its claimed tasks and file locks,
840    /// then unregistering it. Unlike cleanup_stale_workers, this does not check
841    /// heartbeat staleness -- it unconditionally expires the worker.
842    pub fn expire_worker(&self, worker_id: &str, final_status: &str) -> Result<DisconnectSummary> {
843        let files_from_locks = self.release_worker_locks(worker_id).unwrap_or(0);
844        let mut summary = self.unregister_worker(worker_id, final_status)?;
845        summary.files_released += files_from_locks;
846        Ok(summary)
847    }
848
849    /// Get claim count for a worker (counts tasks in any timed state).
850    pub fn get_claim_count(
851        &self,
852        worker_id: &str,
853        states_config: &crate::config::StatesConfig,
854    ) -> Result<i32> {
855        self.with_conn(|conn| get_claim_count_internal(conn, worker_id, states_config))
856    }
857}
858
859/// Internal helper to get claim count using an existing connection (avoids deadlock in transactions).
860/// Counts tasks in any timed state, not just 'working'.
861pub(crate) fn get_claim_count_internal(
862    conn: &Connection,
863    worker_id: &str,
864    states_config: &crate::config::StatesConfig,
865) -> Result<i32> {
866    let timed_states = states_config.timed_state_names();
867    if timed_states.is_empty() {
868        return Ok(0);
869    }
870    let placeholders: Vec<String> = (0..timed_states.len())
871        .map(|i| format!("?{}", i + 2))
872        .collect();
873    let sql = format!(
874        "SELECT COUNT(*) FROM tasks WHERE worker_id = ?1 AND status IN ({})",
875        placeholders.join(", ")
876    );
877    let mut stmt = conn.prepare(&sql)?;
878    let mut param_values: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
879    param_values.push(Box::new(worker_id.to_string()));
880    for state in &timed_states {
881        param_values.push(Box::new(state.to_string()));
882    }
883    let param_refs: Vec<&dyn rusqlite::ToSql> = param_values.iter().map(|b| b.as_ref()).collect();
884    let count: i32 = stmt.query_row(param_refs.as_slice(), |row| row.get(0))?;
885    Ok(count)
886}