Skip to main content

task_graph_mcp/db/
agents.rs

1//! Worker CRUD operations.
2
3use super::{Database, now_ms};
4use crate::config::IdsConfig;
5use crate::types::{CleanupSummary, DisconnectSummary, Worker};
6use anyhow::{Result, anyhow};
7use petname::{Generator, Petnames};
8use rusqlite::{Connection, params};
9
/// Maximum length for worker IDs (4-word petnames can be ~50 chars).
///
/// `register_worker` compares this against `String::len`, so the limit is measured in bytes.
pub const MAX_WORKER_ID_LEN: usize = 64;
12
13/// Generate a petname-based agent ID using the large wordlist with configured case style.
14/// With 4 words from a large wordlist, collisions are extremely unlikely.
15fn generate_agent_id(ids_config: &IdsConfig) -> String {
16    let words = ids_config.agent_id_words;
17    let case = ids_config.agent_id_case;
18
19    // Generate with hyphen separator first (petname's default format)
20    let base = Petnames::medium()
21        .generate_one(words, "-")
22        .unwrap_or_else(|| format!("worker-{}", now_ms()));
23
24    // Convert to desired case
25    case.convert(&base)
26}
27
28/// Parse overlays JSON from DB (nullable TEXT column) into Vec<String>.
29fn parse_overlays(overlays_json: &Option<String>) -> Vec<String> {
30    overlays_json
31        .as_deref()
32        .and_then(|s| serde_json::from_str(s).ok())
33        .unwrap_or_default()
34}
35
36/// Internal helper to get a worker using an existing connection (avoids deadlock).
37fn get_worker_internal(conn: &Connection, worker_id: &str) -> Result<Option<Worker>> {
38    let mut stmt = conn.prepare(
39        "SELECT id, tags, max_claims, registered_at, last_heartbeat, last_status, last_phase, workflow, overlays
40         FROM workers WHERE id = ?1",
41    )?;
42
43    let result = stmt.query_row(params![worker_id], |row| {
44        let id: String = row.get(0)?;
45        let tags_json: String = row.get(1)?;
46        let max_claims: i32 = row.get(2)?;
47        let registered_at: i64 = row.get(3)?;
48        let last_heartbeat: i64 = row.get(4)?;
49        let last_status: Option<String> = row.get(5)?;
50        let last_phase: Option<String> = row.get(6)?;
51        let workflow: Option<String> = row.get(7)?;
52        let overlays_json: Option<String> = row.get(8)?;
53
54        Ok((
55            id,
56            tags_json,
57            max_claims,
58            registered_at,
59            last_heartbeat,
60            last_status,
61            last_phase,
62            workflow,
63            overlays_json,
64        ))
65    });
66
67    match result {
68        Ok((
69            id,
70            tags_json,
71            max_claims,
72            registered_at,
73            last_heartbeat,
74            last_status,
75            last_phase,
76            workflow,
77            overlays_json,
78        )) => {
79            let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
80            let overlays = parse_overlays(&overlays_json);
81            Ok(Some(Worker {
82                id,
83                tags,
84                max_claims,
85                registered_at,
86                last_heartbeat,
87                last_status,
88                last_phase,
89                workflow,
90                overlays,
91            }))
92        }
93        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
94        Err(e) => Err(e.into()),
95    }
96}
97
98impl Database {
99    /// Register a new worker.
100    ///
101    /// If `worker_id` is provided, it must be at most 36 characters.
102    /// If not provided, a human-readable petname will be generated (e.g., "happy-turtle").
103    /// If `force` is true and the worker already exists, it will be re-registered
104    /// (useful for stuck worker recovery).
105    /// If `workflow` is provided, the worker will use that named workflow (e.g., "swarm" for workflow-swarm.yaml).
106    /// If `overlays` is provided, those overlays will be applied on top of the workflow.
107    pub fn register_worker(
108        &self,
109        worker_id: Option<String>,
110        tags: Vec<String>,
111        force: bool,
112        ids_config: &IdsConfig,
113        workflow: Option<String>,
114        overlays: Vec<String>,
115    ) -> Result<Worker> {
116        // Validate user-provided ID upfront (before acquiring connection)
117        let provided_id = match worker_id {
118            Some(id) => {
119                if id.len() > MAX_WORKER_ID_LEN {
120                    return Err(anyhow!(
121                        "Worker ID must be at most {} characters, got {}",
122                        MAX_WORKER_ID_LEN,
123                        id.len()
124                    ));
125                }
126                if id.is_empty() {
127                    return Err(anyhow!("Worker ID cannot be empty"));
128                }
129                Some(id)
130            }
131            None => None,
132        };
133        let now = now_ms();
134        let max_claims = i32::MAX; // Effectively unlimited until overclaiming becomes a problem
135        let tags_json = serde_json::to_string(&tags)?;
136        let overlays_json = if overlays.is_empty() {
137            None
138        } else {
139            Some(serde_json::to_string(&overlays)?)
140        };
141
142        self.with_conn(|conn| {
143            // Generate ID (with 4+ words from a large wordlist, collisions are extremely unlikely)
144            let id = match provided_id {
145                Some(id) => id,
146                None => generate_agent_id(ids_config),
147            };
148
149            // Check if worker ID already exists
150            let exists: bool = conn
151                .query_row("SELECT 1 FROM workers WHERE id = ?1", params![&id], |_| Ok(true))
152                .unwrap_or(false);
153
154            // Get current max claim sequence + 1 to initialize poll position.
155            // This ensures first poll returns empty (no events since registration).
156            // The +1 is needed because we now query with `id >= last_seq`.
157            let current_max_sequence: i64 = conn
158                .query_row("SELECT COALESCE(MAX(id), 0) FROM claim_sequence", [], |row| row.get(0))
159                .unwrap_or(0);
160            let initial_sequence = current_max_sequence + 1;
161
162            if exists {
163                if force {
164                    // Force reconnection: update existing worker and reset poll position, including workflow and overlays
165                    conn.execute(
166                        "UPDATE workers SET tags = ?1, max_claims = ?2, last_heartbeat = ?3, last_claim_sequence = ?4, workflow = ?5, overlays = ?6 WHERE id = ?7",
167                        params![tags_json, max_claims, now, initial_sequence, &workflow, &overlays_json, &id],
168                    )?;
169                } else {
170                    return Err(anyhow!("Worker ID '{}' already registered. Use force=true to reconnect.", id));
171                }
172            } else {
173                conn.execute(
174                    "INSERT INTO workers (id, tags, max_claims, registered_at, last_heartbeat, last_claim_sequence, workflow, overlays)
175                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
176                    params![&id, tags_json, max_claims, now, now, initial_sequence, &workflow, &overlays_json],
177                )?;
178            }
179
180            Ok(Worker {
181                id,
182                tags,
183                max_claims,
184                registered_at: now,
185                last_heartbeat: now,
186                last_status: None,
187                last_phase: None,
188                workflow,
189                overlays,
190            })
191        })
192    }
193
194    /// Get a worker by ID.
195    pub fn get_worker(&self, worker_id: &str) -> Result<Option<Worker>> {
196        self.with_conn(|conn| get_worker_internal(conn, worker_id))
197    }
198
199    /// Check if a worker exists. Returns error if not found.
200    pub fn require_worker(&self, worker_id: &str) -> Result<Worker> {
201        self.get_worker(worker_id)?
202            .ok_or_else(|| anyhow::anyhow!("Worker {} not found", worker_id))
203    }
204
205    /// Update a worker.
206    pub fn update_worker(
207        &self,
208        worker_id: &str,
209        tags: Option<Vec<String>>,
210        max_claims: Option<i32>,
211    ) -> Result<Worker> {
212        self.with_conn(|conn| {
213            let worker =
214                get_worker_internal(conn, worker_id)?.ok_or_else(|| anyhow!("Worker not found"))?;
215
216            let new_tags = tags.unwrap_or(worker.tags.clone());
217            let new_max_claims = max_claims.unwrap_or(worker.max_claims);
218            let tags_json = serde_json::to_string(&new_tags)?;
219
220            conn.execute(
221                "UPDATE workers SET tags = ?1, max_claims = ?2 WHERE id = ?3",
222                params![tags_json, new_max_claims, worker_id],
223            )?;
224
225            Ok(Worker {
226                id: worker_id.to_string(),
227                tags: new_tags,
228                max_claims: new_max_claims,
229                registered_at: worker.registered_at,
230                last_heartbeat: worker.last_heartbeat,
231                last_status: worker.last_status,
232                last_phase: worker.last_phase,
233                workflow: worker.workflow,
234                overlays: worker.overlays,
235            })
236        })
237    }
238
239    /// Update only the overlays for a worker.
240    pub fn update_worker_overlays(&self, worker_id: &str, overlays: Vec<String>) -> Result<Worker> {
241        let overlays_json = if overlays.is_empty() {
242            None
243        } else {
244            Some(serde_json::to_string(&overlays)?)
245        };
246
247        self.with_conn(|conn| {
248            let updated = conn.execute(
249                "UPDATE workers SET overlays = ?1 WHERE id = ?2",
250                params![overlays_json, worker_id],
251            )?;
252
253            if updated == 0 {
254                return Err(anyhow!("Worker not found"));
255            }
256
257            get_worker_internal(conn, worker_id)?
258                .ok_or_else(|| anyhow!("Worker not found after update"))
259        })
260    }
261
262    /// Update worker's last seen state (status and phase) for transition prompt tracking.
263    /// Returns the previous state (old_status, old_phase) for prompt calculation.
264    pub fn update_worker_state(
265        &self,
266        worker_id: &str,
267        new_status: Option<&str>,
268        new_phase: Option<&str>,
269    ) -> Result<(Option<String>, Option<String>)> {
270        self.with_conn(|conn| {
271            // Get current state
272            let (old_status, old_phase): (Option<String>, Option<String>) = conn
273                .query_row(
274                    "SELECT last_status, last_phase FROM workers WHERE id = ?1",
275                    params![worker_id],
276                    |row| Ok((row.get(0)?, row.get(1)?)),
277                )
278                .map_err(|e| match e {
279                    rusqlite::Error::QueryReturnedNoRows => anyhow!("Worker not found"),
280                    e => e.into(),
281                })?;
282
283            // Update to new state
284            conn.execute(
285                "UPDATE workers SET last_status = ?1, last_phase = ?2 WHERE id = ?3",
286                params![new_status, new_phase, worker_id],
287            )?;
288
289            Ok((old_status, old_phase))
290        })
291    }
292
293    /// Update worker heartbeat.
294    pub fn heartbeat(&self, worker_id: &str) -> Result<i32> {
295        let now = now_ms();
296
297        self.with_conn(|conn| {
298            let updated = conn.execute(
299                "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
300                params![now, worker_id],
301            )?;
302
303            if updated == 0 {
304                return Err(anyhow!("Worker not found"));
305            }
306
307            // Return current claim count
308            let count: i32 = conn.query_row(
309                "SELECT COUNT(*) FROM tasks WHERE worker_id = ?1 AND status = 'working'",
310                params![worker_id],
311                |row| row.get(0),
312            )?;
313
314            Ok(count)
315        })
316    }
317
318    /// Unregister a worker (releases all claims).
319    /// Returns a summary of released tasks and files.
320    pub fn unregister_worker(
321        &self,
322        worker_id: &str,
323        final_status: &str,
324    ) -> Result<DisconnectSummary> {
325        self.with_conn_mut(|conn| {
326            let tx = conn.transaction()?;
327
328            // Release all task claims, setting them to final_status
329            let tasks_released = tx.execute(
330                "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?2
331                 WHERE worker_id = ?1",
332                params![worker_id, final_status],
333            )? as i32;
334
335            // Remove all file locks
336            let files_released = tx.execute(
337                "DELETE FROM file_locks WHERE worker_id = ?1",
338                params![worker_id],
339            )? as i32;
340
341            // Remove worker
342            tx.execute("DELETE FROM workers WHERE id = ?1", params![worker_id])?;
343
344            tx.commit()?;
345            Ok(DisconnectSummary {
346                tasks_released,
347                files_released,
348                final_status: final_status.to_string(),
349            })
350        })
351    }
352
353    /// List all workers.
354    pub fn list_workers(&self) -> Result<Vec<Worker>> {
355        self.with_conn(|conn| {
356            let mut stmt = conn.prepare(
357                "SELECT id, tags, max_claims, registered_at, last_heartbeat, last_status, last_phase, workflow, overlays
358                 FROM workers ORDER BY registered_at DESC",
359            )?;
360
361            let workers = stmt
362                .query_map([], |row| {
363                    let id: String = row.get(0)?;
364                    let tags_json: String = row.get(1)?;
365                    let max_claims: i32 = row.get(2)?;
366                    let registered_at: i64 = row.get(3)?;
367                    let last_heartbeat: i64 = row.get(4)?;
368                    let last_status: Option<String> = row.get(5)?;
369                    let last_phase: Option<String> = row.get(6)?;
370                    let workflow: Option<String> = row.get(7)?;
371                    let overlays_json: Option<String> = row.get(8)?;
372
373                    Ok((
374                        id,
375                        tags_json,
376                        max_claims,
377                        registered_at,
378                        last_heartbeat,
379                        last_status,
380                        last_phase,
381                        workflow,
382                        overlays_json,
383                    ))
384                })?
385                .filter_map(|r| r.ok())
386                .map(
387                    |(
388                        id,
389                        tags_json,
390                        max_claims,
391                        registered_at,
392                        last_heartbeat,
393                        last_status,
394                        last_phase,
395                        workflow,
396                        overlays_json,
397                    )| {
398                        let tags: Vec<String> =
399                            serde_json::from_str(&tags_json).unwrap_or_default();
400                        let overlays = parse_overlays(&overlays_json);
401                        Worker {
402                            id,
403                            tags,
404                            max_claims,
405                            registered_at,
406                            last_heartbeat,
407                            last_status,
408                            last_phase,
409                            workflow,
410                            overlays,
411                        }
412                    },
413                )
414                .collect();
415
416            Ok(workers)
417        })
418    }
419
420    /// List all workers with extended info (claim count, current thought).
421    pub fn list_workers_info(&self) -> Result<Vec<crate::types::WorkerInfo>> {
422        self.with_conn(|conn| {
423            let mut stmt = conn.prepare(
424                "SELECT w.id, w.tags, w.max_claims, w.registered_at, w.last_heartbeat,
425                        (SELECT COUNT(*) FROM tasks WHERE worker_id = w.id AND status = 'working') as claim_count,
426                        (SELECT current_thought FROM tasks WHERE worker_id = w.id AND status = 'working' AND current_thought IS NOT NULL LIMIT 1) as current_thought,
427                        w.last_status, w.last_phase, w.workflow, w.overlays
428                 FROM workers w ORDER BY w.registered_at DESC",
429            )?;
430
431            let workers = stmt.query_map([], |row| {
432                let id: String = row.get(0)?;
433                let tags_json: String = row.get(1)?;
434                let max_claims: i32 = row.get(2)?;
435                let registered_at: i64 = row.get(3)?;
436                let last_heartbeat: i64 = row.get(4)?;
437                let claim_count: i32 = row.get(5)?;
438                let current_thought: Option<String> = row.get(6)?;
439                let last_status: Option<String> = row.get(7)?;
440                let last_phase: Option<String> = row.get(8)?;
441                let workflow: Option<String> = row.get(9)?;
442                let overlays_json: Option<String> = row.get(10)?;
443
444                Ok((id, tags_json, max_claims, registered_at, last_heartbeat, claim_count, current_thought, last_status, last_phase, workflow, overlays_json))
445            })?
446            .filter_map(|r| r.ok())
447            .map(|(id, tags_json, max_claims, registered_at, last_heartbeat, claim_count, current_thought, last_status, last_phase, workflow, overlays_json)| {
448                let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
449                let overlays = parse_overlays(&overlays_json);
450                crate::types::WorkerInfo {
451                    id,
452                    tags,
453                    max_claims,
454                    claim_count,
455                    current_thought,
456                    registered_at,
457                    last_heartbeat,
458                    last_status,
459                    last_phase,
460                    workflow,
461                    overlays,
462                }
463            })
464            .collect();
465
466            Ok(workers)
467        })
468    }
469
470    /// List workers with optional filters by tags, file claimed, or related task.
471    ///
472    /// - `tags`: Workers must have ALL of these tags
473    /// - `file`: Workers that have claimed this file
474    /// - `task_id`: Workers working on tasks related to this task
475    /// - `depth`: Task relationship depth (-3 to 3). Negative: ancestors, positive: descendants
476    pub fn list_workers_filtered(
477        &self,
478        tags: Option<&Vec<String>>,
479        file: Option<&str>,
480        task_id: Option<&str>,
481        depth: i32,
482    ) -> Result<Vec<crate::types::WorkerInfo>> {
483        self.with_conn(|conn| {
484            // Start with base query
485            let mut sql = String::from(
486                "SELECT DISTINCT w.id, w.tags, w.max_claims, w.registered_at, w.last_heartbeat,
487                        (SELECT COUNT(*) FROM tasks WHERE worker_id = w.id AND status = 'working') as claim_count,
488                        (SELECT current_thought FROM tasks WHERE worker_id = w.id AND status = 'working' AND current_thought IS NOT NULL LIMIT 1) as current_thought,
489                        w.last_status, w.last_phase, w.workflow, w.overlays
490                 FROM workers w WHERE 1=1",
491            );
492            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
493
494            // Filter by file claim
495            if let Some(f) = file {
496                sql.push_str(" AND w.id IN (SELECT worker_id FROM file_locks WHERE file_path = ?)");
497                params_vec.push(Box::new(f.to_string()));
498            }
499
500            // Filter by related task (with depth traversal)
501            if let Some(tid) = task_id {
502                // Get all related task IDs at the given depth
503                let related_task_ids = Self::get_related_task_ids_internal(conn, tid, depth)?;
504                if !related_task_ids.is_empty() {
505                    let placeholders: Vec<String> = related_task_ids.iter().map(|_| "?".to_string()).collect();
506                    sql.push_str(&format!(
507                        " AND w.id IN (SELECT DISTINCT worker_id FROM tasks WHERE id IN ({}) AND worker_id IS NOT NULL)",
508                        placeholders.join(", ")
509                    ));
510                    for task in related_task_ids {
511                        params_vec.push(Box::new(task));
512                    }
513                } else {
514                    // No related tasks found, return empty result
515                    return Ok(Vec::new());
516                }
517            }
518
519            sql.push_str(" ORDER BY w.registered_at DESC");
520
521            let params_refs: Vec<&dyn rusqlite::ToSql> =
522                params_vec.iter().map(|b| b.as_ref()).collect();
523
524            let mut stmt = conn.prepare(&sql)?;
525            let workers: Vec<crate::types::WorkerInfo> = stmt
526                .query_map(params_refs.as_slice(), |row| {
527                    let id: String = row.get(0)?;
528                    let tags_json: String = row.get(1)?;
529                    let max_claims: i32 = row.get(2)?;
530                    let registered_at: i64 = row.get(3)?;
531                    let last_heartbeat: i64 = row.get(4)?;
532                    let claim_count: i32 = row.get(5)?;
533                    let current_thought: Option<String> = row.get(6)?;
534                    let last_status: Option<String> = row.get(7)?;
535                    let last_phase: Option<String> = row.get(8)?;
536                    let workflow: Option<String> = row.get(9)?;
537                    let overlays_json: Option<String> = row.get(10)?;
538
539                    Ok((id, tags_json, max_claims, registered_at, last_heartbeat, claim_count, current_thought, last_status, last_phase, workflow, overlays_json))
540                })?
541                .filter_map(|r| r.ok())
542                .map(|(id, tags_json, max_claims, registered_at, last_heartbeat, claim_count, current_thought, last_status, last_phase, workflow, overlays_json)| {
543                    let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
544                    let overlays = parse_overlays(&overlays_json);
545                    crate::types::WorkerInfo {
546                        id,
547                        tags,
548                        max_claims,
549                        claim_count,
550                        current_thought,
551                        registered_at,
552                        last_heartbeat,
553                        last_status,
554                        last_phase,
555                        workflow,
556                        overlays,
557                    }
558                })
559                .collect();
560
561            // Post-filter by tags (need to check ALL tags match)
562            let workers = if let Some(required_tags) = tags {
563                workers
564                    .into_iter()
565                    .filter(|w| required_tags.iter().all(|t| w.tags.contains(t)))
566                    .collect()
567            } else {
568                workers
569            };
570
571            Ok(workers)
572        })
573    }
574
575    /// Internal helper to get related task IDs at a given depth.
576    /// Negative depth: ancestors (parents/blockers), positive depth: descendants (children/blocked).
577    fn get_related_task_ids_internal(
578        conn: &Connection,
579        task_id: &str,
580        depth: i32,
581    ) -> Result<Vec<String>> {
582        use std::collections::HashSet;
583
584        let mut result = HashSet::new();
585        result.insert(task_id.to_string());
586
587        if depth == 0 {
588            return Ok(result.into_iter().collect());
589        }
590
591        let abs_depth = depth.abs();
592        let mut current_level: HashSet<String> = [task_id.to_string()].into_iter().collect();
593
594        for _ in 0..abs_depth {
595            if current_level.is_empty() {
596                break;
597            }
598
599            let mut next_level = HashSet::new();
600
601            for tid in &current_level {
602                let related: Vec<String> = if depth > 0 {
603                    // Descendants: tasks where this task is the from_task_id (children, blocked tasks)
604                    let mut stmt = conn
605                        .prepare("SELECT to_task_id FROM dependencies WHERE from_task_id = ?1")?;
606                    stmt.query_map(params![tid], |row| row.get(0))?
607                        .filter_map(|r| r.ok())
608                        .collect()
609                } else {
610                    // Ancestors: tasks where this task is the to_task_id (parents, blockers)
611                    let mut stmt = conn
612                        .prepare("SELECT from_task_id FROM dependencies WHERE to_task_id = ?1")?;
613                    stmt.query_map(params![tid], |row| row.get(0))?
614                        .filter_map(|r| r.ok())
615                        .collect()
616                };
617
618                for related_id in related {
619                    if !result.contains(&related_id) {
620                        next_level.insert(related_id.clone());
621                        result.insert(related_id);
622                    }
623                }
624            }
625
626            current_level = next_level;
627        }
628
629        Ok(result.into_iter().collect())
630    }
631
632    /// Get workers with stale heartbeats.
633    pub fn get_stale_workers(&self, timeout_seconds: i64) -> Result<Vec<Worker>> {
634        let cutoff = now_ms() - (timeout_seconds * 1000);
635
636        self.with_conn(|conn| {
637            let mut stmt = conn.prepare(
638                "SELECT id, tags, max_claims, registered_at, last_heartbeat, last_status, last_phase, workflow, overlays
639                 FROM workers WHERE last_heartbeat < ?1",
640            )?;
641
642            let workers = stmt
643                .query_map(params![cutoff], |row| {
644                    let id: String = row.get(0)?;
645                    let tags_json: String = row.get(1)?;
646                    let max_claims: i32 = row.get(2)?;
647                    let registered_at: i64 = row.get(3)?;
648                    let last_heartbeat: i64 = row.get(4)?;
649                    let last_status: Option<String> = row.get(5)?;
650                    let last_phase: Option<String> = row.get(6)?;
651                    let workflow: Option<String> = row.get(7)?;
652                    let overlays_json: Option<String> = row.get(8)?;
653
654                    Ok((
655                        id,
656                        tags_json,
657                        max_claims,
658                        registered_at,
659                        last_heartbeat,
660                        last_status,
661                        last_phase,
662                        workflow,
663                        overlays_json,
664                    ))
665                })?
666                .filter_map(|r| r.ok())
667                .map(
668                    |(
669                        id,
670                        tags_json,
671                        max_claims,
672                        registered_at,
673                        last_heartbeat,
674                        last_status,
675                        last_phase,
676                        workflow,
677                        overlays_json,
678                    )| {
679                        let tags: Vec<String> =
680                            serde_json::from_str(&tags_json).unwrap_or_default();
681                        let overlays = parse_overlays(&overlays_json);
682                        Worker {
683                            id,
684                            tags,
685                            max_claims,
686                            registered_at,
687                            last_heartbeat,
688                            last_status,
689                            last_phase,
690                            workflow,
691                            overlays,
692                        }
693                    },
694                )
695                .collect();
696
697            Ok(workers)
698        })
699    }
700
701    /// Cleanup stale workers by evicting them and releasing their claims.
702    /// Returns a summary of the cleanup operation.
703    pub fn cleanup_stale_workers(
704        &self,
705        timeout_seconds: i64,
706        final_status: &str,
707    ) -> Result<CleanupSummary> {
708        let stale_workers = self.get_stale_workers(timeout_seconds)?;
709
710        let mut total_tasks_released = 0;
711        let mut total_files_released = 0;
712        let mut evicted_worker_ids = Vec::new();
713
714        for worker in &stale_workers {
715            // Release file locks first
716            let _ = self.release_worker_locks(&worker.id);
717
718            // Unregister the worker
719            if let Ok(summary) = self.unregister_worker(&worker.id, final_status) {
720                total_tasks_released += summary.tasks_released;
721                total_files_released += summary.files_released;
722                evicted_worker_ids.push(worker.id.clone());
723            }
724        }
725
726        Ok(CleanupSummary {
727            workers_evicted: evicted_worker_ids.len() as i32,
728            tasks_released: total_tasks_released,
729            files_released: total_files_released,
730            final_status: final_status.to_string(),
731            evicted_worker_ids,
732        })
733    }
734
735    /// Get claim count for a worker.
736    pub fn get_claim_count(&self, worker_id: &str) -> Result<i32> {
737        self.with_conn(|conn| {
738            let count: i32 = conn.query_row(
739                "SELECT COUNT(*) FROM tasks WHERE worker_id = ?1 AND status = 'working'",
740                params![worker_id],
741                |row| row.get(0),
742            )?;
743            Ok(count)
744        })
745    }
746}