1use super::{Database, now_ms};
4use crate::config::IdsConfig;
5use crate::types::{CleanupSummary, DisconnectSummary, Worker};
6use anyhow::{Result, anyhow};
7use petname::{Generator, Petnames};
8use rusqlite::{Connection, params};
9
10pub const MAX_WORKER_ID_LEN: usize = 64;
12
13fn generate_agent_id(ids_config: &IdsConfig) -> String {
16 let words = ids_config.agent_id_words;
17 let case = ids_config.agent_id_case;
18
19 let base = Petnames::medium()
21 .generate_one(words, "-")
22 .unwrap_or_else(|| format!("worker-{}", now_ms()));
23
24 case.convert(&base)
26}
27
28fn parse_overlays(overlays_json: &Option<String>) -> Vec<String> {
30 overlays_json
31 .as_deref()
32 .and_then(|s| serde_json::from_str(s).ok())
33 .unwrap_or_default()
34}
35
36fn get_worker_internal(conn: &Connection, worker_id: &str) -> Result<Option<Worker>> {
38 let mut stmt = conn.prepare(
39 "SELECT id, tags, max_claims, registered_at, last_heartbeat, last_status, last_phase, workflow, overlays
40 FROM workers WHERE id = ?1",
41 )?;
42
43 let result = stmt.query_row(params![worker_id], |row| {
44 let id: String = row.get(0)?;
45 let tags_json: String = row.get(1)?;
46 let max_claims: i32 = row.get(2)?;
47 let registered_at: i64 = row.get(3)?;
48 let last_heartbeat: i64 = row.get(4)?;
49 let last_status: Option<String> = row.get(5)?;
50 let last_phase: Option<String> = row.get(6)?;
51 let workflow: Option<String> = row.get(7)?;
52 let overlays_json: Option<String> = row.get(8)?;
53
54 Ok((
55 id,
56 tags_json,
57 max_claims,
58 registered_at,
59 last_heartbeat,
60 last_status,
61 last_phase,
62 workflow,
63 overlays_json,
64 ))
65 });
66
67 match result {
68 Ok((
69 id,
70 tags_json,
71 max_claims,
72 registered_at,
73 last_heartbeat,
74 last_status,
75 last_phase,
76 workflow,
77 overlays_json,
78 )) => {
79 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
80 let overlays = parse_overlays(&overlays_json);
81 Ok(Some(Worker {
82 id,
83 tags,
84 max_claims,
85 registered_at,
86 last_heartbeat,
87 last_status,
88 last_phase,
89 workflow,
90 overlays,
91 }))
92 }
93 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
94 Err(e) => Err(e.into()),
95 }
96}
97
98impl Database {
99 pub fn register_worker(
108 &self,
109 worker_id: Option<String>,
110 tags: Vec<String>,
111 force: bool,
112 ids_config: &IdsConfig,
113 workflow: Option<String>,
114 overlays: Vec<String>,
115 ) -> Result<Worker> {
116 let provided_id = match worker_id {
118 Some(id) => {
119 if id.len() > MAX_WORKER_ID_LEN {
120 return Err(anyhow!(
121 "Worker ID must be at most {} characters, got {}",
122 MAX_WORKER_ID_LEN,
123 id.len()
124 ));
125 }
126 if id.is_empty() {
127 return Err(anyhow!("Worker ID cannot be empty"));
128 }
129 Some(id)
130 }
131 None => None,
132 };
133 let now = now_ms();
134 let max_claims = i32::MAX; let tags_json = serde_json::to_string(&tags)?;
136 let overlays_json = if overlays.is_empty() {
137 None
138 } else {
139 Some(serde_json::to_string(&overlays)?)
140 };
141
142 self.with_conn(|conn| {
143 let id = match provided_id {
145 Some(id) => id,
146 None => generate_agent_id(ids_config),
147 };
148
149 let exists: bool = conn
151 .query_row("SELECT 1 FROM workers WHERE id = ?1", params![&id], |_| Ok(true))
152 .unwrap_or(false);
153
154 let current_max_sequence: i64 = conn
158 .query_row("SELECT COALESCE(MAX(id), 0) FROM claim_sequence", [], |row| row.get(0))
159 .unwrap_or(0);
160 let initial_sequence = current_max_sequence + 1;
161
162 if exists {
163 if force {
164 conn.execute(
166 "UPDATE workers SET tags = ?1, max_claims = ?2, last_heartbeat = ?3, last_claim_sequence = ?4, workflow = ?5, overlays = ?6 WHERE id = ?7",
167 params![tags_json, max_claims, now, initial_sequence, &workflow, &overlays_json, &id],
168 )?;
169 } else {
170 return Err(anyhow!("Worker ID '{}' already registered. Use force=true to reconnect.", id));
171 }
172 } else {
173 conn.execute(
174 "INSERT INTO workers (id, tags, max_claims, registered_at, last_heartbeat, last_claim_sequence, workflow, overlays)
175 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
176 params![&id, tags_json, max_claims, now, now, initial_sequence, &workflow, &overlays_json],
177 )?;
178 }
179
180 Ok(Worker {
181 id,
182 tags,
183 max_claims,
184 registered_at: now,
185 last_heartbeat: now,
186 last_status: None,
187 last_phase: None,
188 workflow,
189 overlays,
190 })
191 })
192 }
193
194 pub fn get_worker(&self, worker_id: &str) -> Result<Option<Worker>> {
196 self.with_conn(|conn| get_worker_internal(conn, worker_id))
197 }
198
199 pub fn require_worker(&self, worker_id: &str) -> Result<Worker> {
201 self.get_worker(worker_id)?
202 .ok_or_else(|| anyhow::anyhow!("Worker {} not found", worker_id))
203 }
204
205 pub fn update_worker(
207 &self,
208 worker_id: &str,
209 tags: Option<Vec<String>>,
210 max_claims: Option<i32>,
211 ) -> Result<Worker> {
212 self.with_conn(|conn| {
213 let worker =
214 get_worker_internal(conn, worker_id)?.ok_or_else(|| anyhow!("Worker not found"))?;
215
216 let new_tags = tags.unwrap_or(worker.tags.clone());
217 let new_max_claims = max_claims.unwrap_or(worker.max_claims);
218 let tags_json = serde_json::to_string(&new_tags)?;
219
220 conn.execute(
221 "UPDATE workers SET tags = ?1, max_claims = ?2 WHERE id = ?3",
222 params![tags_json, new_max_claims, worker_id],
223 )?;
224
225 Ok(Worker {
226 id: worker_id.to_string(),
227 tags: new_tags,
228 max_claims: new_max_claims,
229 registered_at: worker.registered_at,
230 last_heartbeat: worker.last_heartbeat,
231 last_status: worker.last_status,
232 last_phase: worker.last_phase,
233 workflow: worker.workflow,
234 overlays: worker.overlays,
235 })
236 })
237 }
238
239 pub fn update_worker_overlays(&self, worker_id: &str, overlays: Vec<String>) -> Result<Worker> {
241 let overlays_json = if overlays.is_empty() {
242 None
243 } else {
244 Some(serde_json::to_string(&overlays)?)
245 };
246
247 self.with_conn(|conn| {
248 let updated = conn.execute(
249 "UPDATE workers SET overlays = ?1 WHERE id = ?2",
250 params![overlays_json, worker_id],
251 )?;
252
253 if updated == 0 {
254 return Err(anyhow!("Worker not found"));
255 }
256
257 get_worker_internal(conn, worker_id)?
258 .ok_or_else(|| anyhow!("Worker not found after update"))
259 })
260 }
261
262 pub fn update_worker_state(
265 &self,
266 worker_id: &str,
267 new_status: Option<&str>,
268 new_phase: Option<&str>,
269 ) -> Result<(Option<String>, Option<String>)> {
270 self.with_conn(|conn| {
271 let (old_status, old_phase): (Option<String>, Option<String>) = conn
273 .query_row(
274 "SELECT last_status, last_phase FROM workers WHERE id = ?1",
275 params![worker_id],
276 |row| Ok((row.get(0)?, row.get(1)?)),
277 )
278 .map_err(|e| match e {
279 rusqlite::Error::QueryReturnedNoRows => anyhow!("Worker not found"),
280 e => e.into(),
281 })?;
282
283 conn.execute(
285 "UPDATE workers SET last_status = ?1, last_phase = ?2 WHERE id = ?3",
286 params![new_status, new_phase, worker_id],
287 )?;
288
289 Ok((old_status, old_phase))
290 })
291 }
292
293 pub fn heartbeat(&self, worker_id: &str) -> Result<i32> {
295 let now = now_ms();
296
297 self.with_conn(|conn| {
298 let updated = conn.execute(
299 "UPDATE workers SET last_heartbeat = ?1 WHERE id = ?2",
300 params![now, worker_id],
301 )?;
302
303 if updated == 0 {
304 return Err(anyhow!("Worker not found"));
305 }
306
307 let count: i32 = conn.query_row(
309 "SELECT COUNT(*) FROM tasks WHERE worker_id = ?1 AND status = 'working'",
310 params![worker_id],
311 |row| row.get(0),
312 )?;
313
314 Ok(count)
315 })
316 }
317
318 pub fn unregister_worker(
321 &self,
322 worker_id: &str,
323 final_status: &str,
324 ) -> Result<DisconnectSummary> {
325 self.with_conn_mut(|conn| {
326 let tx = conn.transaction()?;
327
328 let tasks_released = tx.execute(
330 "UPDATE tasks SET worker_id = NULL, claimed_at = NULL, status = ?2
331 WHERE worker_id = ?1",
332 params![worker_id, final_status],
333 )? as i32;
334
335 let files_released = tx.execute(
337 "DELETE FROM file_locks WHERE worker_id = ?1",
338 params![worker_id],
339 )? as i32;
340
341 tx.execute("DELETE FROM workers WHERE id = ?1", params![worker_id])?;
343
344 tx.commit()?;
345 Ok(DisconnectSummary {
346 tasks_released,
347 files_released,
348 final_status: final_status.to_string(),
349 })
350 })
351 }
352
353 pub fn list_workers(&self) -> Result<Vec<Worker>> {
355 self.with_conn(|conn| {
356 let mut stmt = conn.prepare(
357 "SELECT id, tags, max_claims, registered_at, last_heartbeat, last_status, last_phase, workflow, overlays
358 FROM workers ORDER BY registered_at DESC",
359 )?;
360
361 let workers = stmt
362 .query_map([], |row| {
363 let id: String = row.get(0)?;
364 let tags_json: String = row.get(1)?;
365 let max_claims: i32 = row.get(2)?;
366 let registered_at: i64 = row.get(3)?;
367 let last_heartbeat: i64 = row.get(4)?;
368 let last_status: Option<String> = row.get(5)?;
369 let last_phase: Option<String> = row.get(6)?;
370 let workflow: Option<String> = row.get(7)?;
371 let overlays_json: Option<String> = row.get(8)?;
372
373 Ok((
374 id,
375 tags_json,
376 max_claims,
377 registered_at,
378 last_heartbeat,
379 last_status,
380 last_phase,
381 workflow,
382 overlays_json,
383 ))
384 })?
385 .filter_map(|r| r.ok())
386 .map(
387 |(
388 id,
389 tags_json,
390 max_claims,
391 registered_at,
392 last_heartbeat,
393 last_status,
394 last_phase,
395 workflow,
396 overlays_json,
397 )| {
398 let tags: Vec<String> =
399 serde_json::from_str(&tags_json).unwrap_or_default();
400 let overlays = parse_overlays(&overlays_json);
401 Worker {
402 id,
403 tags,
404 max_claims,
405 registered_at,
406 last_heartbeat,
407 last_status,
408 last_phase,
409 workflow,
410 overlays,
411 }
412 },
413 )
414 .collect();
415
416 Ok(workers)
417 })
418 }
419
420 pub fn list_workers_info(&self) -> Result<Vec<crate::types::WorkerInfo>> {
422 self.with_conn(|conn| {
423 let mut stmt = conn.prepare(
424 "SELECT w.id, w.tags, w.max_claims, w.registered_at, w.last_heartbeat,
425 (SELECT COUNT(*) FROM tasks WHERE worker_id = w.id AND status = 'working') as claim_count,
426 (SELECT current_thought FROM tasks WHERE worker_id = w.id AND status = 'working' AND current_thought IS NOT NULL LIMIT 1) as current_thought,
427 w.last_status, w.last_phase, w.workflow, w.overlays
428 FROM workers w ORDER BY w.registered_at DESC",
429 )?;
430
431 let workers = stmt.query_map([], |row| {
432 let id: String = row.get(0)?;
433 let tags_json: String = row.get(1)?;
434 let max_claims: i32 = row.get(2)?;
435 let registered_at: i64 = row.get(3)?;
436 let last_heartbeat: i64 = row.get(4)?;
437 let claim_count: i32 = row.get(5)?;
438 let current_thought: Option<String> = row.get(6)?;
439 let last_status: Option<String> = row.get(7)?;
440 let last_phase: Option<String> = row.get(8)?;
441 let workflow: Option<String> = row.get(9)?;
442 let overlays_json: Option<String> = row.get(10)?;
443
444 Ok((id, tags_json, max_claims, registered_at, last_heartbeat, claim_count, current_thought, last_status, last_phase, workflow, overlays_json))
445 })?
446 .filter_map(|r| r.ok())
447 .map(|(id, tags_json, max_claims, registered_at, last_heartbeat, claim_count, current_thought, last_status, last_phase, workflow, overlays_json)| {
448 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
449 let overlays = parse_overlays(&overlays_json);
450 crate::types::WorkerInfo {
451 id,
452 tags,
453 max_claims,
454 claim_count,
455 current_thought,
456 registered_at,
457 last_heartbeat,
458 last_status,
459 last_phase,
460 workflow,
461 overlays,
462 }
463 })
464 .collect();
465
466 Ok(workers)
467 })
468 }
469
470 pub fn list_workers_filtered(
477 &self,
478 tags: Option<&Vec<String>>,
479 file: Option<&str>,
480 task_id: Option<&str>,
481 depth: i32,
482 ) -> Result<Vec<crate::types::WorkerInfo>> {
483 self.with_conn(|conn| {
484 let mut sql = String::from(
486 "SELECT DISTINCT w.id, w.tags, w.max_claims, w.registered_at, w.last_heartbeat,
487 (SELECT COUNT(*) FROM tasks WHERE worker_id = w.id AND status = 'working') as claim_count,
488 (SELECT current_thought FROM tasks WHERE worker_id = w.id AND status = 'working' AND current_thought IS NOT NULL LIMIT 1) as current_thought,
489 w.last_status, w.last_phase, w.workflow, w.overlays
490 FROM workers w WHERE 1=1",
491 );
492 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
493
494 if let Some(f) = file {
496 sql.push_str(" AND w.id IN (SELECT worker_id FROM file_locks WHERE file_path = ?)");
497 params_vec.push(Box::new(f.to_string()));
498 }
499
500 if let Some(tid) = task_id {
502 let related_task_ids = Self::get_related_task_ids_internal(conn, tid, depth)?;
504 if !related_task_ids.is_empty() {
505 let placeholders: Vec<String> = related_task_ids.iter().map(|_| "?".to_string()).collect();
506 sql.push_str(&format!(
507 " AND w.id IN (SELECT DISTINCT worker_id FROM tasks WHERE id IN ({}) AND worker_id IS NOT NULL)",
508 placeholders.join(", ")
509 ));
510 for task in related_task_ids {
511 params_vec.push(Box::new(task));
512 }
513 } else {
514 return Ok(Vec::new());
516 }
517 }
518
519 sql.push_str(" ORDER BY w.registered_at DESC");
520
521 let params_refs: Vec<&dyn rusqlite::ToSql> =
522 params_vec.iter().map(|b| b.as_ref()).collect();
523
524 let mut stmt = conn.prepare(&sql)?;
525 let workers: Vec<crate::types::WorkerInfo> = stmt
526 .query_map(params_refs.as_slice(), |row| {
527 let id: String = row.get(0)?;
528 let tags_json: String = row.get(1)?;
529 let max_claims: i32 = row.get(2)?;
530 let registered_at: i64 = row.get(3)?;
531 let last_heartbeat: i64 = row.get(4)?;
532 let claim_count: i32 = row.get(5)?;
533 let current_thought: Option<String> = row.get(6)?;
534 let last_status: Option<String> = row.get(7)?;
535 let last_phase: Option<String> = row.get(8)?;
536 let workflow: Option<String> = row.get(9)?;
537 let overlays_json: Option<String> = row.get(10)?;
538
539 Ok((id, tags_json, max_claims, registered_at, last_heartbeat, claim_count, current_thought, last_status, last_phase, workflow, overlays_json))
540 })?
541 .filter_map(|r| r.ok())
542 .map(|(id, tags_json, max_claims, registered_at, last_heartbeat, claim_count, current_thought, last_status, last_phase, workflow, overlays_json)| {
543 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
544 let overlays = parse_overlays(&overlays_json);
545 crate::types::WorkerInfo {
546 id,
547 tags,
548 max_claims,
549 claim_count,
550 current_thought,
551 registered_at,
552 last_heartbeat,
553 last_status,
554 last_phase,
555 workflow,
556 overlays,
557 }
558 })
559 .collect();
560
561 let workers = if let Some(required_tags) = tags {
563 workers
564 .into_iter()
565 .filter(|w| required_tags.iter().all(|t| w.tags.contains(t)))
566 .collect()
567 } else {
568 workers
569 };
570
571 Ok(workers)
572 })
573 }
574
575 fn get_related_task_ids_internal(
578 conn: &Connection,
579 task_id: &str,
580 depth: i32,
581 ) -> Result<Vec<String>> {
582 use std::collections::HashSet;
583
584 let mut result = HashSet::new();
585 result.insert(task_id.to_string());
586
587 if depth == 0 {
588 return Ok(result.into_iter().collect());
589 }
590
591 let abs_depth = depth.abs();
592 let mut current_level: HashSet<String> = [task_id.to_string()].into_iter().collect();
593
594 for _ in 0..abs_depth {
595 if current_level.is_empty() {
596 break;
597 }
598
599 let mut next_level = HashSet::new();
600
601 for tid in ¤t_level {
602 let related: Vec<String> = if depth > 0 {
603 let mut stmt = conn
605 .prepare("SELECT to_task_id FROM dependencies WHERE from_task_id = ?1")?;
606 stmt.query_map(params![tid], |row| row.get(0))?
607 .filter_map(|r| r.ok())
608 .collect()
609 } else {
610 let mut stmt = conn
612 .prepare("SELECT from_task_id FROM dependencies WHERE to_task_id = ?1")?;
613 stmt.query_map(params![tid], |row| row.get(0))?
614 .filter_map(|r| r.ok())
615 .collect()
616 };
617
618 for related_id in related {
619 if !result.contains(&related_id) {
620 next_level.insert(related_id.clone());
621 result.insert(related_id);
622 }
623 }
624 }
625
626 current_level = next_level;
627 }
628
629 Ok(result.into_iter().collect())
630 }
631
632 pub fn get_stale_workers(&self, timeout_seconds: i64) -> Result<Vec<Worker>> {
634 let cutoff = now_ms() - (timeout_seconds * 1000);
635
636 self.with_conn(|conn| {
637 let mut stmt = conn.prepare(
638 "SELECT id, tags, max_claims, registered_at, last_heartbeat, last_status, last_phase, workflow, overlays
639 FROM workers WHERE last_heartbeat < ?1",
640 )?;
641
642 let workers = stmt
643 .query_map(params![cutoff], |row| {
644 let id: String = row.get(0)?;
645 let tags_json: String = row.get(1)?;
646 let max_claims: i32 = row.get(2)?;
647 let registered_at: i64 = row.get(3)?;
648 let last_heartbeat: i64 = row.get(4)?;
649 let last_status: Option<String> = row.get(5)?;
650 let last_phase: Option<String> = row.get(6)?;
651 let workflow: Option<String> = row.get(7)?;
652 let overlays_json: Option<String> = row.get(8)?;
653
654 Ok((
655 id,
656 tags_json,
657 max_claims,
658 registered_at,
659 last_heartbeat,
660 last_status,
661 last_phase,
662 workflow,
663 overlays_json,
664 ))
665 })?
666 .filter_map(|r| r.ok())
667 .map(
668 |(
669 id,
670 tags_json,
671 max_claims,
672 registered_at,
673 last_heartbeat,
674 last_status,
675 last_phase,
676 workflow,
677 overlays_json,
678 )| {
679 let tags: Vec<String> =
680 serde_json::from_str(&tags_json).unwrap_or_default();
681 let overlays = parse_overlays(&overlays_json);
682 Worker {
683 id,
684 tags,
685 max_claims,
686 registered_at,
687 last_heartbeat,
688 last_status,
689 last_phase,
690 workflow,
691 overlays,
692 }
693 },
694 )
695 .collect();
696
697 Ok(workers)
698 })
699 }
700
701 pub fn cleanup_stale_workers(
704 &self,
705 timeout_seconds: i64,
706 final_status: &str,
707 ) -> Result<CleanupSummary> {
708 let stale_workers = self.get_stale_workers(timeout_seconds)?;
709
710 let mut total_tasks_released = 0;
711 let mut total_files_released = 0;
712 let mut evicted_worker_ids = Vec::new();
713
714 for worker in &stale_workers {
715 let _ = self.release_worker_locks(&worker.id);
717
718 if let Ok(summary) = self.unregister_worker(&worker.id, final_status) {
720 total_tasks_released += summary.tasks_released;
721 total_files_released += summary.files_released;
722 evicted_worker_ids.push(worker.id.clone());
723 }
724 }
725
726 Ok(CleanupSummary {
727 workers_evicted: evicted_worker_ids.len() as i32,
728 tasks_released: total_tasks_released,
729 files_released: total_files_released,
730 final_status: final_status.to_string(),
731 evicted_worker_ids,
732 })
733 }
734
735 pub fn get_claim_count(&self, worker_id: &str) -> Result<i32> {
737 self.with_conn(|conn| {
738 let count: i32 = conn.query_row(
739 "SELECT COUNT(*) FROM tasks WHERE worker_id = ?1 AND status = 'working'",
740 params![worker_id],
741 |row| row.get(0),
742 )?;
743 Ok(count)
744 })
745 }
746}