1use super::{Database, now_ms};
4use crate::config::IdsConfig;
5use crate::types::{CleanupSummary, DisconnectSummary, Worker};
6use anyhow::{Result, anyhow};
7use petname::{Generator, Petnames};
8use rusqlite::{Connection, params};
9
/// Upper bound on the length of a caller-supplied worker ID.
///
/// `Database::register_worker` compares this against `String::len`, so the limit is
/// measured in bytes of the UTF-8 encoding.
pub const MAX_WORKER_ID_LEN: usize = 64;
12
13fn generate_agent_id(ids_config: &IdsConfig) -> String {
16 let words = ids_config.agent_id_words;
17 let case = ids_config.agent_id_case;
18
19 let base = Petnames::medium()
21 .generate_one(words, "-")
22 .unwrap_or_else(|| format!("worker-{}", now_ms()));
23
24 case.convert(&base)
26}
27
28fn parse_overlays(overlays_json: &Option<String>) -> Vec<String> {
30 overlays_json
31 .as_deref()
32 .and_then(|s| serde_json::from_str(s).ok())
33 .unwrap_or_default()
34}
35
36fn get_worker_internal(conn: &Connection, worker_id: &str) -> Result<Option<Worker>> {
38 let mut stmt = conn.prepare(
39 "SELECT id, tags, max_claims, registered_at, last_heartbeat, last_status, last_phase, last_task_id, workflow, overlays
40 FROM workers WHERE id = ?1",
41 )?;
42
43 let result = stmt.query_row(params![worker_id], |row| {
44 let id: String = row.get(0)?;
45 let tags_json: String = row.get(1)?;
46 let max_claims: i32 = row.get(2)?;
47 let registered_at: i64 = row.get(3)?;
48 let last_heartbeat: i64 = row.get(4)?;
49 let last_status: Option<String> = row.get(5)?;
50 let last_phase: Option<String> = row.get(6)?;
51 let last_task_id: Option<String> = row.get(7)?;
52 let workflow: Option<String> = row.get(8)?;
53 let overlays_json: Option<String> = row.get(9)?;
54
55 Ok((
56 id,
57 tags_json,
58 max_claims,
59 registered_at,
60 last_heartbeat,
61 last_status,
62 last_phase,
63 last_task_id,
64 workflow,
65 overlays_json,
66 ))
67 });
68
69 match result {
70 Ok((
71 id,
72 tags_json,
73 max_claims,
74 registered_at,
75 last_heartbeat,
76 last_status,
77 last_phase,
78 last_task_id,
79 workflow,
80 overlays_json,
81 )) => {
82 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
83 let overlays = parse_overlays(&overlays_json);
84 Ok(Some(Worker {
85 id,
86 tags,
87 max_claims,
88 registered_at,
89 last_heartbeat,
90 last_status,
91 last_phase,
92 last_task_id,
93 workflow,
94 overlays,
95 }))
96 }
97 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
98 Err(e) => Err(e.into()),
99 }
100}
101
102impl Database {
103 #[allow(clippy::too_many_arguments)]
112 pub fn register_worker(
113 &self,
114 worker_id: Option<String>,
115 tags: Vec<String>,
116 force: bool,
117 ids_config: &IdsConfig,
118 workflow: Option<String>,
119 overlays: Vec<String>,
120 max_claims: Option<i32>,
121 ) -> Result<Worker> {
122 let provided_id = match worker_id {
124 Some(id) => {
125 if id.len() > MAX_WORKER_ID_LEN {
126 return Err(anyhow!(
127 "Worker ID must be at most {} characters, got {}",
128 MAX_WORKER_ID_LEN,
129 id.len()
130 ));
131 }
132 if id.is_empty() {
133 return Err(anyhow!("Worker ID cannot be empty"));
134 }
135 Some(id)
136 }
137 None => None,
138 };
139 let now = now_ms();
140 let max_claims = match max_claims {
141 Some(0) => i32::MAX, Some(n) => n,
143 None => 1, };
145 let tags_json = serde_json::to_string(&tags)?;
146 let overlays_json = if overlays.is_empty() {
147 None
148 } else {
149 Some(serde_json::to_string(&overlays)?)
150 };
151
152 self.with_conn(|conn| {
153 let id = match provided_id {
155 Some(id) => id,
156 None => generate_agent_id(ids_config),
157 };
158
159 let exists: bool = conn
161 .query_row("SELECT 1 FROM workers WHERE id = ?1", params![&id], |_| Ok(true))
162 .unwrap_or(false);
163
164 let current_max_sequence: i64 = conn
168 .query_row("SELECT COALESCE(MAX(id), 0) FROM claim_sequence", [], |row| row.get(0))
169 .unwrap_or(0);
170 let initial_sequence = current_max_sequence + 1;
171
172 if exists {
173 if force {
174 conn.execute(
176 "UPDATE workers SET tags = ?1, max_claims = ?2, last_heartbeat = ?3, last_claim_sequence = ?4, workflow = ?5, overlays = ?6 WHERE id = ?7",
177 params![tags_json, max_claims, now, initial_sequence, &workflow, &overlays_json, &id],
178 )?;
179 } else {
180 return Err(anyhow!("Worker ID '{}' already registered. Use force=true to reconnect.", id));
181 }
182 } else {
183 conn.execute(
184 "INSERT INTO workers (id, tags, max_claims, registered_at, last_heartbeat, last_claim_sequence, workflow, overlays)
185 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
186 params![&id, tags_json, max_claims, now, now, initial_sequence, &workflow, &overlays_json],
187 )?;
188 }
189
190 Ok(Worker {
191 id,
192 tags,
193 max_claims,
194 registered_at: now,
195 last_heartbeat: now,
196 last_status: None,
197 last_phase: None,
198 last_task_id: None,
199 workflow,
200 overlays,
201 })
202 })
203 }
204
205 pub fn get_worker(&self, worker_id: &str) -> Result<Option<Worker>> {
207 self.with_conn(|conn| get_worker_internal(conn, worker_id))
208 }
209
210 pub fn require_worker(&self, worker_id: &str) -> Result<Worker> {
212 self.get_worker(worker_id)?
213 .ok_or_else(|| anyhow::anyhow!("Worker {} not found", worker_id))
214 }
215
216 pub fn update_worker(
218 &self,
219 worker_id: &str,
220 tags: Option<Vec<String>>,
221 max_claims: Option<i32>,
222 ) -> Result<Worker> {
223 self.with_conn(|conn| {
224 let worker =
225 get_worker_internal(conn, worker_id)?.ok_or_else(|| anyhow!("Worker not found"))?;
226
227 let new_tags = tags.unwrap_or(worker.tags.clone());
228 let new_max_claims = max_claims.unwrap_or(worker.max_claims);
229 let tags_json = serde_json::to_string(&new_tags)?;
230
231 conn.execute(
232 "UPDATE workers SET tags = ?1, max_claims = ?2 WHERE id = ?3",
233 params![tags_json, new_max_claims, worker_id],
234 )?;
235
236 Ok(Worker {
237 id: worker_id.to_string(),
238 tags: new_tags,
239 max_claims: new_max_claims,
240 registered_at: worker.registered_at,
241 last_heartbeat: worker.last_heartbeat,
242 last_status: worker.last_status,
243 last_phase: worker.last_phase,
244 last_task_id: worker.last_task_id,
245 workflow: worker.workflow,
246 overlays: worker.overlays,
247 })
248 })
249 }
250
251 pub fn update_worker_overlays(&self, worker_id: &str, overlays: Vec<String>) -> Result<Worker> {
253 let overlays_json = if overlays.is_empty() {
254 None
255 } else {
256 Some(serde_json::to_string(&overlays)?)
257 };
258
259 self.with_conn(|conn| {
260 let updated = conn.execute(
261 "UPDATE workers SET overlays = ?1 WHERE id = ?2",
262 params![overlays_json, worker_id],
263 )?;
264
265 if updated == 0 {
266 return Err(anyhow!("Worker not found"));
267 }
268
269 get_worker_internal(conn, worker_id)?
270 .ok_or_else(|| anyhow!("Worker not found after update"))
271 })
272 }
273
274 pub fn update_worker_state(
277 &self,
278 worker_id: &str,
279 new_status: Option<&str>,
280 new_phase: Option<&str>,
281 task_id: Option<&str>,
282 ) -> Result<(Option<String>, Option<String>)> {
283 self.with_conn(|conn| {
284 let (old_status, old_phase): (Option<String>, Option<String>) = conn
286 .query_row(
287 "SELECT last_status, last_phase FROM workers WHERE id = ?1",
288 params![worker_id],
289 |row| Ok((row.get(0)?, row.get(1)?)),
290 )
291 .map_err(|e| match e {
292 rusqlite::Error::QueryReturnedNoRows => anyhow!("Worker not found"),
293 e => e.into(),
294 })?;
295
296 conn.execute(
298 "UPDATE workers SET last_status = ?1, last_phase = ?2, last_task_id = ?3 WHERE id = ?4",
299 params![new_status, new_phase, task_id, worker_id],
300 )?;
301
302 Ok((old_status, old_phase))
303 })
304 }
305
306 pub fn heartbeat(
308 &self,
309 worker_id: &str,
310 states_config: &crate::config::StatesConfig,
311 ) -> Result<i32> {
312 let now = now_ms();
313
314 self.with_conn(|conn| {
315 let updated = conn.execute(
316 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
317 params![now, worker_id],
318 )?;
319
320 if updated == 0 {
321 return Err(anyhow!("Worker not found"));
322 }
323
324 get_claim_count_internal(conn, worker_id, states_config)
326 })
327 }
328
329 pub fn unregister_worker(
332 &self,
333 worker_id: &str,
334 final_status: &str,
335 ) -> Result<DisconnectSummary> {
336 self.with_conn_mut(|conn| {
337 let tx = conn.transaction()?;
338
339 let tasks_released = tx.execute(
341 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?2
342 WHERE worker_id = ?1",
343 params![worker_id, final_status],
344 )? as i32;
345
346 let files_released = tx.execute(
348 "DELETE FROM file_locks WHERE worker_id = ?1",
349 params![worker_id],
350 )? as i32;
351
352 tx.execute("DELETE FROM workers WHERE id = ?1", params![worker_id])?;
354
355 tx.commit()?;
356 Ok(DisconnectSummary {
357 tasks_released,
358 files_released,
359 final_status: final_status.to_string(),
360 })
361 })
362 }
363
364 pub fn list_workers(&self) -> Result<Vec<Worker>> {
366 self.with_conn(|conn| {
367 let mut stmt = conn.prepare(
368 "SELECT id, tags, max_claims, registered_at, last_heartbeat, last_status, last_phase, last_task_id, workflow, overlays
369 FROM workers ORDER BY registered_at DESC",
370 )?;
371
372 let workers = stmt
373 .query_map([], |row| {
374 let id: String = row.get(0)?;
375 let tags_json: String = row.get(1)?;
376 let max_claims: i32 = row.get(2)?;
377 let registered_at: i64 = row.get(3)?;
378 let last_heartbeat: i64 = row.get(4)?;
379 let last_status: Option<String> = row.get(5)?;
380 let last_phase: Option<String> = row.get(6)?;
381 let last_task_id: Option<String> = row.get(7)?;
382 let workflow: Option<String> = row.get(8)?;
383 let overlays_json: Option<String> = row.get(9)?;
384
385 Ok((
386 id,
387 tags_json,
388 max_claims,
389 registered_at,
390 last_heartbeat,
391 last_status,
392 last_phase,
393 last_task_id,
394 workflow,
395 overlays_json,
396 ))
397 })?
398 .filter_map(|r| r.ok())
399 .map(
400 |(
401 id,
402 tags_json,
403 max_claims,
404 registered_at,
405 last_heartbeat,
406 last_status,
407 last_phase,
408 last_task_id,
409 workflow,
410 overlays_json,
411 )| {
412 let tags: Vec<String> =
413 serde_json::from_str(&tags_json).unwrap_or_default();
414 let overlays = parse_overlays(&overlays_json);
415 Worker {
416 id,
417 tags,
418 max_claims,
419 registered_at,
420 last_heartbeat,
421 last_status,
422 last_phase,
423 last_task_id,
424 workflow,
425 overlays,
426 }
427 },
428 )
429 .collect();
430
431 Ok(workers)
432 })
433 }
434
435 pub fn list_workers_info(
437 &self,
438 states_config: &crate::config::StatesConfig,
439 ) -> Result<Vec<crate::types::WorkerInfo>> {
440 self.with_conn(|conn| {
441 let timed_states = states_config.timed_state_names();
442 let (status_in, status_in_thought) = if timed_states.is_empty() {
443 ("status = '__none__'".to_string(), "status = '__none__'".to_string())
444 } else {
445 let quoted: Vec<String> = timed_states.iter().map(|s| format!("'{}'", s)).collect();
446 let clause = format!("status IN ({})", quoted.join(", "));
447 (clause.clone(), clause)
448 };
449
450 let sql = format!(
451 "SELECT w.id, w.tags, w.max_claims, w.registered_at, w.last_heartbeat,
452 (SELECT COUNT(*) FROM tasks WHERE worker_id = w.id AND {}) as claim_count,
453 (SELECT current_thought FROM tasks WHERE worker_id = w.id AND {} AND current_thought IS NOT NULL LIMIT 1) as current_thought,
454 w.last_status, w.last_phase, w.last_task_id, w.workflow, w.overlays
455 FROM workers w ORDER BY w.registered_at DESC",
456 status_in, status_in_thought
457 );
458
459 let mut stmt = conn.prepare(&sql)?;
460
461 let workers = stmt.query_map([], |row| {
462 let id: String = row.get(0)?;
463 let tags_json: String = row.get(1)?;
464 let max_claims: i32 = row.get(2)?;
465 let registered_at: i64 = row.get(3)?;
466 let last_heartbeat: i64 = row.get(4)?;
467 let claim_count: i32 = row.get(5)?;
468 let current_thought: Option<String> = row.get(6)?;
469 let last_status: Option<String> = row.get(7)?;
470 let last_phase: Option<String> = row.get(8)?;
471 let last_task_id: Option<String> = row.get(9)?;
472 let workflow: Option<String> = row.get(10)?;
473 let overlays_json: Option<String> = row.get(11)?;
474
475 Ok((id, tags_json, max_claims, registered_at, last_heartbeat, claim_count, current_thought, last_status, last_phase, last_task_id, workflow, overlays_json))
476 })?
477 .filter_map(|r| r.ok())
478 .map(|(id, tags_json, max_claims, registered_at, last_heartbeat, claim_count, current_thought, last_status, last_phase, last_task_id, workflow, overlays_json)| {
479 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
480 let overlays = parse_overlays(&overlays_json);
481 crate::types::WorkerInfo {
482 id,
483 tags,
484 max_claims,
485 claim_count,
486 current_thought,
487 registered_at,
488 last_heartbeat,
489 last_status,
490 last_phase,
491 last_task_id,
492 workflow,
493 overlays,
494 }
495 })
496 .collect();
497
498 Ok(workers)
499 })
500 }
501
502 pub fn list_workers_filtered(
509 &self,
510 tags: Option<&Vec<String>>,
511 file: Option<&str>,
512 task_id: Option<&str>,
513 depth: i32,
514 states_config: &crate::config::StatesConfig,
515 ) -> Result<Vec<crate::types::WorkerInfo>> {
516 self.with_conn(|conn| {
517 let timed_states = states_config.timed_state_names();
518 let status_clause = if timed_states.is_empty() {
519 "status = '__none__'".to_string()
520 } else {
521 let quoted: Vec<String> = timed_states.iter().map(|s| format!("'{}'", s)).collect();
522 format!("status IN ({})", quoted.join(", "))
523 };
524
525 let mut sql = format!(
527 "SELECT DISTINCT w.id, w.tags, w.max_claims, w.registered_at, w.last_heartbeat,
528 (SELECT COUNT(*) FROM tasks WHERE worker_id = w.id AND {}) as claim_count,
529 (SELECT current_thought FROM tasks WHERE worker_id = w.id AND {} AND current_thought IS NOT NULL LIMIT 1) as current_thought,
530 w.last_status, w.last_phase, w.last_task_id, w.workflow, w.overlays
531 FROM workers w WHERE 1=1",
532 status_clause, status_clause
533 );
534 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
535
536 if let Some(f) = file {
538 sql.push_str(" AND w.id IN (SELECT worker_id FROM file_locks WHERE file_path = ?)");
539 params_vec.push(Box::new(f.to_string()));
540 }
541
542 if let Some(tid) = task_id {
544 let related_task_ids = Self::get_related_task_ids_internal(conn, tid, depth)?;
546 if !related_task_ids.is_empty() {
547 let placeholders: Vec<String> = related_task_ids.iter().map(|_| "?".to_string()).collect();
548 sql.push_str(&format!(
549 " AND w.id IN (SELECT DISTINCT worker_id FROM tasks WHERE id IN ({}) AND worker_id IS NOT NULL)",
550 placeholders.join(", ")
551 ));
552 for task in related_task_ids {
553 params_vec.push(Box::new(task));
554 }
555 } else {
556 return Ok(Vec::new());
558 }
559 }
560
561 sql.push_str(" ORDER BY w.registered_at DESC");
562
563 let params_refs: Vec<&dyn rusqlite::ToSql> =
564 params_vec.iter().map(|b| b.as_ref()).collect();
565
566 let mut stmt = conn.prepare(&sql)?;
567 let workers: Vec<crate::types::WorkerInfo> = stmt
568 .query_map(params_refs.as_slice(), |row| {
569 let id: String = row.get(0)?;
570 let tags_json: String = row.get(1)?;
571 let max_claims: i32 = row.get(2)?;
572 let registered_at: i64 = row.get(3)?;
573 let last_heartbeat: i64 = row.get(4)?;
574 let claim_count: i32 = row.get(5)?;
575 let current_thought: Option<String> = row.get(6)?;
576 let last_status: Option<String> = row.get(7)?;
577 let last_phase: Option<String> = row.get(8)?;
578 let last_task_id: Option<String> = row.get(9)?;
579 let workflow: Option<String> = row.get(10)?;
580 let overlays_json: Option<String> = row.get(11)?;
581
582 Ok((id, tags_json, max_claims, registered_at, last_heartbeat, claim_count, current_thought, last_status, last_phase, last_task_id, workflow, overlays_json))
583 })?
584 .filter_map(|r| r.ok())
585 .map(|(id, tags_json, max_claims, registered_at, last_heartbeat, claim_count, current_thought, last_status, last_phase, last_task_id, workflow, overlays_json)| {
586 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
587 let overlays = parse_overlays(&overlays_json);
588 crate::types::WorkerInfo {
589 id,
590 tags,
591 max_claims,
592 claim_count,
593 current_thought,
594 registered_at,
595 last_heartbeat,
596 last_status,
597 last_phase,
598 last_task_id,
599 workflow,
600 overlays,
601 }
602 })
603 .collect();
604
605 let workers = if let Some(required_tags) = tags {
607 workers
608 .into_iter()
609 .filter(|w| required_tags.iter().all(|t| w.tags.contains(t)))
610 .collect()
611 } else {
612 workers
613 };
614
615 Ok(workers)
616 })
617 }
618
619 fn get_related_task_ids_internal(
622 conn: &Connection,
623 task_id: &str,
624 depth: i32,
625 ) -> Result<Vec<String>> {
626 use std::collections::HashSet;
627
628 let mut result = HashSet::new();
629 result.insert(task_id.to_string());
630
631 if depth == 0 {
632 return Ok(result.into_iter().collect());
633 }
634
635 let abs_depth = depth.abs();
636 let mut current_level: HashSet<String> = [task_id.to_string()].into_iter().collect();
637
638 for _ in 0..abs_depth {
639 if current_level.is_empty() {
640 break;
641 }
642
643 let mut next_level = HashSet::new();
644
645 for tid in ¤t_level {
646 let related: Vec<String> = if depth > 0 {
647 let mut stmt = conn
649 .prepare("SELECT to_task_id FROM dependencies WHERE from_task_id = ?1")?;
650 stmt.query_map(params![tid], |row| row.get(0))?
651 .filter_map(|r| r.ok())
652 .collect()
653 } else {
654 let mut stmt = conn
656 .prepare("SELECT from_task_id FROM dependencies WHERE to_task_id = ?1")?;
657 stmt.query_map(params![tid], |row| row.get(0))?
658 .filter_map(|r| r.ok())
659 .collect()
660 };
661
662 for related_id in related {
663 if !result.contains(&related_id) {
664 next_level.insert(related_id.clone());
665 result.insert(related_id);
666 }
667 }
668 }
669
670 current_level = next_level;
671 }
672
673 Ok(result.into_iter().collect())
674 }
675
676 pub fn get_stale_workers(&self, timeout_seconds: i64) -> Result<Vec<Worker>> {
678 let cutoff = now_ms() - (timeout_seconds * 1000);
679
680 self.with_conn(|conn| {
681 let mut stmt = conn.prepare(
682 "SELECT id, tags, max_claims, registered_at, last_heartbeat, last_status, last_phase, last_task_id, workflow, overlays
683 FROM workers WHERE last_heartbeat < ?1",
684 )?;
685
686 let workers = stmt
687 .query_map(params![cutoff], |row| {
688 let id: String = row.get(0)?;
689 let tags_json: String = row.get(1)?;
690 let max_claims: i32 = row.get(2)?;
691 let registered_at: i64 = row.get(3)?;
692 let last_heartbeat: i64 = row.get(4)?;
693 let last_status: Option<String> = row.get(5)?;
694 let last_phase: Option<String> = row.get(6)?;
695 let last_task_id: Option<String> = row.get(7)?;
696 let workflow: Option<String> = row.get(8)?;
697 let overlays_json: Option<String> = row.get(9)?;
698
699 Ok((
700 id,
701 tags_json,
702 max_claims,
703 registered_at,
704 last_heartbeat,
705 last_status,
706 last_phase,
707 last_task_id,
708 workflow,
709 overlays_json,
710 ))
711 })?
712 .filter_map(|r| r.ok())
713 .map(
714 |(
715 id,
716 tags_json,
717 max_claims,
718 registered_at,
719 last_heartbeat,
720 last_status,
721 last_phase,
722 last_task_id,
723 workflow,
724 overlays_json,
725 )| {
726 let tags: Vec<String> =
727 serde_json::from_str(&tags_json).unwrap_or_default();
728 let overlays = parse_overlays(&overlays_json);
729 Worker {
730 id,
731 tags,
732 max_claims,
733 registered_at,
734 last_heartbeat,
735 last_status,
736 last_phase,
737 last_task_id,
738 workflow,
739 overlays,
740 }
741 },
742 )
743 .collect();
744
745 Ok(workers)
746 })
747 }
748
749 pub fn cleanup_stale_workers(
759 &self,
760 timeout_seconds: i64,
761 final_status: &str,
762 ) -> Result<CleanupSummary> {
763 let stale_workers = self.get_stale_workers(timeout_seconds)?;
764
765 let mut total_tasks_released = 0;
766 let mut total_files_released = 0;
767 let mut evicted_worker_ids = Vec::new();
768
769 for worker in &stale_workers {
770 let released_task_ids =
772 self.record_stale_release_transitions(&worker.id, final_status)?;
773
774 if released_task_ids.len() > 5 {
775 eprintln!(
776 "[cleanup] Bulk-releasing {} task claims from stale agent '{}' (last heartbeat: {})",
777 released_task_ids.len(),
778 worker.id,
779 worker.last_heartbeat,
780 );
781 }
782
783 let _ = self.release_worker_locks(&worker.id);
785
786 if let Ok(summary) = self.unregister_worker(&worker.id, final_status) {
788 total_tasks_released += summary.tasks_released;
789 total_files_released += summary.files_released;
790 evicted_worker_ids.push(worker.id.clone());
791 }
792 }
793
794 Ok(CleanupSummary {
795 workers_evicted: evicted_worker_ids.len() as i32,
796 tasks_released: total_tasks_released,
797 files_released: total_files_released,
798 final_status: final_status.to_string(),
799 evicted_worker_ids,
800 })
801 }
802
803 fn record_stale_release_transitions(
806 &self,
807 worker_id: &str,
808 final_status: &str,
809 ) -> Result<Vec<String>> {
810 self.with_conn(|conn| {
811 let now = now_ms();
812
813 let mut stmt = conn.prepare("SELECT id FROM tasks WHERE worker_id = ?1")?;
814 let released_task_ids: Vec<String> = stmt
815 .query_map(params![worker_id], |row| row.get(0))?
816 .filter_map(|r| r.ok())
817 .collect();
818
819 for task_id in &released_task_ids {
820 conn.execute(
822 "UPDATE task_sequence SET end_timestamp = ?1
823 WHERE task_id = ?2 AND end_timestamp IS NULL AND status IS NOT NULL",
824 params![now, task_id],
825 )?;
826
827 conn.execute(
829 "INSERT INTO task_sequence (task_id, worker_id, status, reason, timestamp)
830 VALUES (?1, ?2, ?3, 'stale_release', ?4)",
831 params![task_id, worker_id, final_status, now],
832 )?;
833 }
834
835 Ok(released_task_ids)
836 })
837 }
838
839 pub fn expire_worker(&self, worker_id: &str, final_status: &str) -> Result<DisconnectSummary> {
843 let files_from_locks = self.release_worker_locks(worker_id).unwrap_or(0);
844 let mut summary = self.unregister_worker(worker_id, final_status)?;
845 summary.files_released += files_from_locks;
846 Ok(summary)
847 }
848
849 pub fn get_claim_count(
851 &self,
852 worker_id: &str,
853 states_config: &crate::config::StatesConfig,
854 ) -> Result<i32> {
855 self.with_conn(|conn| get_claim_count_internal(conn, worker_id, states_config))
856 }
857}
858
859pub(crate) fn get_claim_count_internal(
862 conn: &Connection,
863 worker_id: &str,
864 states_config: &crate::config::StatesConfig,
865) -> Result<i32> {
866 let timed_states = states_config.timed_state_names();
867 if timed_states.is_empty() {
868 return Ok(0);
869 }
870 let placeholders: Vec<String> = (0..timed_states.len())
871 .map(|i| format!("?{}", i + 2))
872 .collect();
873 let sql = format!(
874 "SELECT COUNT(*) FROM tasks WHERE worker_id = ?1 AND status IN ({})",
875 placeholders.join(", ")
876 );
877 let mut stmt = conn.prepare(&sql)?;
878 let mut param_values: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
879 param_values.push(Box::new(worker_id.to_string()));
880 for state in &timed_states {
881 param_values.push(Box::new(state.to_string()));
882 }
883 let param_refs: Vec<&dyn rusqlite::ToSql> = param_values.iter().map(|b| b.as_ref()).collect();
884 let count: i32 = stmt.query_row(param_refs.as_slice(), |row| row.get(0))?;
885 Ok(count)
886}