1use anyhow::{bail, Context, Result};
2use chrono::Utc;
3use std::path::{Path, PathBuf};
4use std::process::Command;
5use std::sync::atomic::{AtomicBool, Ordering};
6
7use crate::identity::AgentConfig;
8use crate::locks::{Heartbeat, Keyring, LocksFile};
9
/// Name of the directory, created inside the `.chainlink` directory, that holds
/// the hub worktree (`SyncManager::new` joins it onto `chainlink_dir`).
const HUB_CACHE_DIR: &str = ".locks-cache";

/// Git branch that stores `locks.json` and the per-agent heartbeat files.
const HUB_BRANCH: &str = "chainlink/locks";

/// Largest number of local commits ahead of the remote hub branch that
/// `check_divergence` tolerates before it refuses to continue.
const MAX_DIVERGENCE: usize = 10;
18
/// Outcome of checking the signature on the most recent commit that touched
/// `locks.json` in the hub cache.
///
/// `Eq` is derived alongside `PartialEq` because every field is a `String`
/// or `Option<String>`, so equality is total.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GpgVerification {
    /// The commit carries a signature that was accepted.
    Valid {
        /// Hash of the verified commit.
        commit: String,
        /// Fingerprint of the signing key, when it could be extracted.
        fingerprint: Option<String>,
    },
    /// No signature information was found for the commit.
    Unsigned {
        /// Hash of the inspected commit.
        commit: String,
    },
    /// A signature is present but was reported as bad or unverifiable.
    Invalid {
        /// Hash of the inspected commit.
        commit: String,
        /// Raw verifier output explaining the failure.
        reason: String,
    },
    /// No commit touching `locks.json` could be found.
    NoCommits,
}
34
35pub fn read_tracker_remote(chainlink_dir: &Path) -> String {
39 let config_path = chainlink_dir.join("hook-config.json");
40 let configured = std::fs::read_to_string(&config_path)
41 .ok()
42 .and_then(|content| serde_json::from_str::<serde_json::Value>(&content).ok())
43 .and_then(|v| {
44 v.get("tracker_remote")
45 .and_then(|r| r.as_str().map(|s| s.to_string()))
46 });
47
48 if let Some(remote) = configured {
49 return remote;
50 }
51
52 static WARNED: AtomicBool = AtomicBool::new(false);
54 if !WARNED.swap(true, Ordering::Relaxed) {
55 tracing::warn!(
56 "no tracker_remote configured in {}, defaulting to \"origin\"",
57 config_path.display()
58 );
59 }
60
61 "origin".to_string()
62}
63
/// Handle on the hub cache: a git worktree of the hub branch kept inside the
/// `.chainlink` directory, plus the paths and remote needed to sync it.
///
/// Derives `Debug` and `Clone` so the manager can be logged and duplicated;
/// all fields are plain owned paths/strings.
#[derive(Debug, Clone)]
pub struct SyncManager {
    /// The `.chainlink` directory this manager was created for.
    chainlink_dir: PathBuf,
    /// Location of the hub worktree (`chainlink_dir` joined with the cache directory name).
    cache_dir: PathBuf,
    /// Parent directory of `chainlink_dir`; git commands for the main repo run here.
    repo_root: PathBuf,
    /// Name of the git remote the hub branch is fetched from and pushed to.
    remote: String,
}
78
79impl SyncManager {
80 pub fn new(chainlink_dir: &Path) -> Result<Self> {
82 let repo_root = chainlink_dir
83 .parent()
84 .ok_or_else(|| anyhow::anyhow!("Cannot determine repo root from .chainlink dir"))?
85 .to_path_buf();
86
87 let cache_dir = chainlink_dir.join(HUB_CACHE_DIR);
88 let remote = read_tracker_remote(chainlink_dir);
89
90 Ok(SyncManager {
91 chainlink_dir: chainlink_dir.to_path_buf(),
92 cache_dir,
93 repo_root,
94 remote,
95 })
96 }
97
98 pub fn chainlink_dir(&self) -> &Path {
100 &self.chainlink_dir
101 }
102
103 pub fn is_initialized(&self) -> bool {
105 self.cache_dir.exists()
106 }
107
108 pub fn cache_path(&self) -> &Path {
110 &self.cache_dir
111 }
112
113 pub fn remote(&self) -> &str {
115 &self.remote
116 }
117
118 fn cache_path_str(&self) -> String {
121 self.cache_dir.to_string_lossy().to_string()
122 }
123
124 fn git_in_repo(&self, args: &[&str]) -> Result<std::process::Output> {
125 let output = Command::new("git")
126 .current_dir(&self.repo_root)
127 .args(args)
128 .output()
129 .with_context(|| format!("Failed to run git {:?}", args))?;
130 if !output.status.success() {
131 let stderr = String::from_utf8_lossy(&output.stderr);
132 bail!("git {:?} failed: {}", args, stderr);
133 }
134 Ok(output)
135 }
136
137 fn git_in_cache(&self, args: &[&str]) -> Result<std::process::Output> {
138 let output = Command::new("git")
139 .current_dir(&self.cache_dir)
140 .args(args)
141 .output()
142 .with_context(|| format!("Failed to run git {:?} in cache", args))?;
143 if !output.status.success() {
144 let stderr = String::from_utf8_lossy(&output.stderr);
145 bail!("git {:?} in cache failed: {}", args, stderr);
146 }
147 Ok(output)
148 }
149
150 fn ensure_cache_git_identity(&self) -> Result<()> {
152 let has_email = Command::new("git")
153 .current_dir(&self.cache_dir)
154 .args(["config", "user.email"])
155 .output()
156 .map(|o| o.status.success())
157 .unwrap_or(false);
158 if !has_email {
159 let _ = Command::new("git")
160 .current_dir(&self.cache_dir)
161 .args(["config", "--local", "user.email", "chainlink@localhost"])
162 .output();
163 let _ = Command::new("git")
164 .current_dir(&self.cache_dir)
165 .args(["config", "--local", "user.name", "chainlink"])
166 .output();
167 }
168 Ok(())
169 }
170
171 fn count_unpushed_commits(&self) -> usize {
173 let remote_ref = format!("{}/{}", self.remote, HUB_BRANCH);
174 let range = format!("{}..HEAD", remote_ref);
175 match self.git_in_cache(&["rev-list", "--count", &range]) {
176 Ok(output) => String::from_utf8_lossy(&output.stdout)
177 .trim()
178 .parse::<usize>()
179 .unwrap_or(0),
180 Err(_) => 0,
181 }
182 }
183
184 fn check_divergence(&self) -> Result<()> {
186 let ahead = self.count_unpushed_commits();
187 if ahead > MAX_DIVERGENCE {
188 bail!(
189 "Hub branch has diverged: {} local commits ahead of remote \
190 (threshold: {}). Resolve manually with: cd {} && git log --oneline {}/{}..HEAD",
191 ahead,
192 MAX_DIVERGENCE,
193 self.cache_dir.display(),
194 self.remote,
195 HUB_BRANCH
196 );
197 }
198 Ok(())
199 }
200
201 pub fn init_cache(&self) -> Result<()> {
209 if self.cache_dir.exists() {
210 return Ok(());
211 }
212
213 let has_remote = self
215 .git_in_repo(&["ls-remote", "--heads", &self.remote, HUB_BRANCH])
216 .map(|o| !String::from_utf8_lossy(&o.stdout).trim().is_empty())
217 .unwrap_or(false);
218
219 if has_remote {
220 self.git_in_repo(&["fetch", &self.remote, HUB_BRANCH])?;
222
223 let has_local = self
225 .git_in_repo(&["rev-parse", "--verify", HUB_BRANCH])
226 .is_ok();
227
228 if has_local {
229 self.git_in_repo(&["worktree", "add", &self.cache_path_str(), HUB_BRANCH])?;
230 } else {
231 let remote_ref = format!("{}/{}", self.remote, HUB_BRANCH);
232 self.git_in_repo(&[
233 "worktree",
234 "add",
235 "-b",
236 HUB_BRANCH,
237 &self.cache_path_str(),
238 &remote_ref,
239 ])?;
240 }
241 } else {
242 self.git_in_repo(&[
244 "worktree",
245 "add",
246 "--orphan",
247 "-b",
248 HUB_BRANCH,
249 &self.cache_path_str(),
250 ])?;
251
252 let locks = LocksFile::empty();
254 locks.save(&self.cache_dir.join("locks.json"))?;
255 std::fs::create_dir_all(self.cache_dir.join("heartbeats"))?;
256 std::fs::create_dir_all(self.cache_dir.join("trust"))?;
257
258 self.git_in_cache(&["add", "locks.json"])?;
260 self.ensure_cache_git_identity()?;
261 self.git_in_cache(&["commit", "-m", "Initialize chainlink/locks branch"])?;
262 }
263
264 self.ensure_cache_git_identity()?;
265 Ok(())
266 }
267
268 pub fn hub_health_check(&self) -> Result<()> {
272 if !self.cache_dir.exists() {
273 return Ok(());
274 }
275
276 let git_dir = match self.git_in_cache(&["rev-parse", "--git-dir"]) {
277 Ok(output) => {
278 let raw = String::from_utf8_lossy(&output.stdout).trim().to_string();
279 let path = PathBuf::from(&raw);
280 if path.is_absolute() {
281 path
282 } else {
283 self.cache_dir.join(path)
284 }
285 }
286 Err(_) => return Ok(()),
287 };
288
289 let index_lock = git_dir.join("index.lock");
291 if index_lock.exists() {
292 tracing::warn!("removing index.lock from hub cache before recovery");
293 let _ = std::fs::remove_file(&index_lock);
294 }
295
296 let rebase_merge = git_dir.join("rebase-merge");
298 let rebase_apply = git_dir.join("rebase-apply");
299 if rebase_merge.exists() || rebase_apply.exists() {
300 tracing::warn!("hub cache is stuck in mid-rebase state, aborting to recover");
301 let _ = self.git_in_cache(&["rebase", "--abort"]);
302 if rebase_merge.exists() {
303 let _ = std::fs::remove_dir_all(&rebase_merge);
304 }
305 if rebase_apply.exists() {
306 let _ = std::fs::remove_dir_all(&rebase_apply);
307 }
308 if index_lock.exists() {
309 let _ = std::fs::remove_file(&index_lock);
310 }
311 }
312
313 if self.git_in_cache(&["symbolic-ref", "HEAD"]).is_err() {
315 tracing::warn!("hub cache HEAD is detached, re-attaching to {}", HUB_BRANCH);
316 if self.git_in_cache(&["checkout", HUB_BRANCH]).is_err() {
317 let _ = self.git_in_cache(&["branch", "-f", HUB_BRANCH, "HEAD"]);
318 let _ = self.git_in_cache(&["checkout", HUB_BRANCH]);
319 }
320 }
321
322 Ok(())
323 }
324
325 fn clean_dirty_state(&self) -> Result<bool> {
327 let status = self.git_in_cache(&["status", "--porcelain"]);
328 match status {
329 Ok(output) => {
330 let stdout = String::from_utf8_lossy(&output.stdout);
331 if stdout.trim().is_empty() {
332 return Ok(false);
333 }
334 if self.git_in_cache(&["add", "-A"]).is_err() {
335 tracing::warn!(
336 "git add -A failed in dirty state cleanup, escalating to reset --hard HEAD"
337 );
338 self.git_in_cache(&["reset", "--hard", "HEAD"])?;
339 return Ok(true);
340 }
341 let commit_result = self.git_in_cache(&[
342 "commit",
343 "-m",
344 "sync: auto-stage dirty hub state (recovery)",
345 ]);
346 match commit_result {
347 Ok(_) => Ok(true),
348 Err(e) => {
349 let err_str = e.to_string();
350 if err_str.contains("nothing to commit")
351 || err_str.contains("no changes added")
352 {
353 Ok(false)
354 } else {
355 Err(e)
356 }
357 }
358 }
359 }
360 Err(_) => Ok(false),
361 }
362 }
363
364 pub fn fetch(&self) -> Result<()> {
368 self.hub_health_check()?;
369
370 let fetch_result = self.git_in_cache(&["fetch", &self.remote, HUB_BRANCH]);
371 if let Err(e) = &fetch_result {
372 let err_str = e.to_string();
373 if err_str.contains("Could not resolve host")
374 || err_str.contains("Could not read from remote")
375 || err_str.contains("does not appear to be a git repository")
376 || err_str.contains("No such remote")
377 || err_str.contains("couldn't find remote ref")
378 {
379 return Ok(());
380 }
381 fetch_result?;
382 }
383
384 let remote_ref = format!("{}/{}", self.remote, HUB_BRANCH);
386 let log_result = self.git_in_cache(&["log", &format!("{}..HEAD", remote_ref), "--oneline"]);
387
388 match &log_result {
389 Ok(output) => {
390 let stdout = String::from_utf8_lossy(&output.stdout);
391 if !stdout.trim().is_empty() {
392 self.rebase_preserving_local(&remote_ref)?;
393 return Ok(());
394 }
395 }
396 Err(_) => {
397 tracing::warn!("cannot determine unpushed commit count; keeping local state");
398 return Ok(());
399 }
400 }
401
402 let reset_result = self.git_in_cache(&["reset", "--hard", &remote_ref]);
404 if let Err(e) = &reset_result {
405 let err_str = e.to_string();
406 if err_str.contains("unknown revision") || err_str.contains("ambiguous argument") {
407 return Ok(());
408 }
409 reset_result?;
410 }
411
412 Ok(())
413 }
414
415 fn rebase_preserving_local(&self, remote_ref: &str) -> Result<()> {
417 self.check_divergence()?;
418 self.clean_dirty_state()?;
419
420 let rebase_result = self.git_in_cache(&["rebase", remote_ref]);
421 if let Err(e) = &rebase_result {
422 let err_str = e.to_string();
423 if err_str.contains("unknown revision") || err_str.contains("ambiguous argument") {
424 return Ok(());
425 }
426 if let Err(abort_err) = self.git_in_cache(&["rebase", "--abort"]) {
427 tracing::warn!("rebase --abort failed during recovery: {}", abort_err);
428 }
429 tracing::warn!(
430 "rebase onto {} failed; aborted to preserve local commits",
431 remote_ref
432 );
433 return Ok(());
434 }
435
436 Ok(())
437 }
438
439 pub fn read_locks(&self) -> Result<LocksFile> {
443 let path = self.cache_dir.join("locks.json");
444 if !path.exists() {
445 return Ok(LocksFile::empty());
446 }
447 LocksFile::load(&path)
448 }
449
450 pub fn read_keyring(&self) -> Result<Keyring> {
452 let path = self.cache_dir.join("trust").join("keyring.json");
453 if !path.exists() {
454 return Ok(Keyring {
455 trusted_fingerprints: Vec::new(),
456 });
457 }
458 Keyring::load(&path)
459 }
460
461 pub fn verify_locks_signature(&self) -> Result<GpgVerification> {
463 let log_result = self.git_in_cache(&["log", "-1", "--format=%H", "--", "locks.json"]);
465 let commit = match log_result {
466 Ok(output) => {
467 let hash = String::from_utf8_lossy(&output.stdout).trim().to_string();
468 if hash.is_empty() {
469 return Ok(GpgVerification::NoCommits);
470 }
471 hash
472 }
473 Err(_) => return Ok(GpgVerification::NoCommits),
474 };
475
476 let verify_output = Command::new("git")
478 .current_dir(&self.cache_dir)
479 .args(["verify-commit", "--raw", &commit])
480 .output();
481
482 match verify_output {
483 Ok(output) => {
484 let stderr = String::from_utf8_lossy(&output.stderr);
485 if stderr.contains("VALIDSIG") {
486 let fingerprint = stderr
488 .lines()
489 .find(|l| l.contains("VALIDSIG"))
490 .and_then(|l| l.split_whitespace().nth(2))
491 .map(|s| s.to_string());
492 Ok(GpgVerification::Valid {
493 commit,
494 fingerprint,
495 })
496 } else if stderr.contains("ERRSIG") || stderr.contains("BADSIG") {
497 Ok(GpgVerification::Invalid {
498 commit: commit.clone(),
499 reason: stderr.to_string(),
500 })
501 } else if output.status.success() {
502 Ok(GpgVerification::Valid {
504 commit,
505 fingerprint: None,
506 })
507 } else {
508 Ok(GpgVerification::Unsigned { commit })
510 }
511 }
512 Err(_) => {
513 Ok(GpgVerification::Unsigned { commit })
515 }
516 }
517 }
518
519 pub fn claim_lock(
524 &self,
525 agent: &AgentConfig,
526 issue_id: i64,
527 branch: Option<&str>,
528 force: bool,
529 ) -> Result<bool> {
530 for _attempt in 0..3 {
531 let mut locks = self.read_locks()?;
532
533 if let Some(existing) = locks.get_lock(issue_id) {
534 if existing.agent_id == agent.agent_id {
535 return Ok(false); }
537 if !force {
538 bail!(
539 "Issue {} is locked by '{}' (claimed {}). \
540 Use 'chainlink locks steal {}' if the lock is stale.",
541 crate::utils::format_issue_id(issue_id),
542 existing.agent_id,
543 existing.claimed_at.format("%Y-%m-%d %H:%M"),
544 issue_id
545 );
546 }
547 }
548
549 let lock = crate::locks::Lock {
550 agent_id: agent.agent_id.clone(),
551 branch: branch.map(|s| s.to_string()),
552 claimed_at: Utc::now(),
553 signed_by: agent.agent_id.clone(),
554 };
555
556 locks.locks.insert(issue_id.to_string(), lock);
557 locks.save(&self.cache_dir.join("locks.json"))?;
558
559 match self
560 .commit_and_push_locks(&format!("{}: claim lock on #{}", agent.agent_id, issue_id))
561 {
562 Ok(()) => {
563 let verified = LocksFile::load(&self.cache_dir.join("locks.json"))?;
565 match verified.get_lock(issue_id) {
566 Some(lock) if lock.agent_id == agent.agent_id => {
567 return Ok(true);
568 }
569 Some(lock) => {
570 tracing::warn!(
571 "lock claim for issue {} was overwritten by '{}', retrying",
572 crate::utils::format_issue_id(issue_id),
573 lock.agent_id
574 );
575 }
576 None => {
577 tracing::warn!(
578 "lock claim for issue {} was lost during push, retrying",
579 crate::utils::format_issue_id(issue_id)
580 );
581 }
582 }
583 }
584 Err(e) => {
585 let err_str = e.to_string();
586 if err_str.contains("Push failed after") {
587 if self
588 .git_in_cache(&["pull", "--rebase", &self.remote, HUB_BRANCH])
589 .is_err()
590 {
591 self.hub_health_check()?;
592 self.git_in_cache(&["pull", "--rebase", &self.remote, HUB_BRANCH])?;
593 }
594 } else {
595 return Err(e);
596 }
597 }
598 }
599 }
600
601 bail!(
602 "Failed to claim lock on #{} after 3 attempts due to concurrent updates",
603 issue_id
604 )
605 }
606
607 pub fn release_lock(&self, agent: &AgentConfig, issue_id: i64, force: bool) -> Result<bool> {
611 let locks = self.read_locks()?;
612
613 match locks.get_lock(issue_id) {
614 None => return Ok(false),
615 Some(existing) => {
616 if existing.agent_id != agent.agent_id && !force {
617 bail!(
618 "Issue {} is locked by '{}', not by you ('{}').",
619 crate::utils::format_issue_id(issue_id),
620 existing.agent_id,
621 agent.agent_id
622 );
623 }
624 }
625 }
626
627 for _release_attempt in 0..3 {
628 let mut current_locks = self.read_locks()?;
629 current_locks.locks.remove(&issue_id.to_string());
630 current_locks.save(&self.cache_dir.join("locks.json"))?;
631
632 self.commit_and_push_locks(&format!(
633 "{}: release lock on #{}",
634 agent.agent_id, issue_id
635 ))?;
636
637 let verified = LocksFile::load(&self.cache_dir.join("locks.json"))?;
638 if verified.get_lock(issue_id).is_none() {
639 break;
640 }
641 tracing::warn!(
642 "lock release for issue {} was undone during push, retrying",
643 crate::utils::format_issue_id(issue_id)
644 );
645 }
646
647 Ok(true)
648 }
649
650 fn commit_and_push_locks(&self, message: &str) -> Result<()> {
652 self.git_in_cache(&["add", "locks.json"])?;
653
654 let commit_result = self.git_in_cache(&["commit", "-m", message]);
655 if let Err(e) = &commit_result {
656 let err_str = e.to_string();
657 if err_str.contains("nothing to commit") || err_str.contains("no changes added") {
658 return Ok(());
659 }
660 commit_result?;
661 }
662
663 for attempt in 0..3 {
664 let push_result = self.git_in_cache(&["push", &self.remote, HUB_BRANCH]);
665 match push_result {
666 Ok(_) => return Ok(()),
667 Err(e) => {
668 let err_str = e.to_string();
669 if err_str.contains("Could not resolve host")
670 || err_str.contains("Could not read from remote")
671 {
672 return Ok(()); }
674 if err_str.contains("rejected") || err_str.contains("non-fast-forward") {
675 if attempt < 2 {
676 self.check_divergence()?;
677 if self
678 .git_in_cache(&["pull", "--rebase", &self.remote, HUB_BRANCH])
679 .is_err()
680 {
681 self.hub_health_check()?;
682 self.git_in_cache(&["pull", "--rebase", &self.remote, HUB_BRANCH])?;
683 }
684 continue;
685 }
686 bail!("Push failed after 3 retries for locks.json");
687 }
688 return Err(e);
689 }
690 }
691 }
692 Ok(())
693 }
694
695 pub fn push_heartbeat(&self, agent: &AgentConfig, active_issue_id: Option<i64>) -> Result<()> {
699 let heartbeat = Heartbeat {
700 agent_id: agent.agent_id.clone(),
701 last_heartbeat: Utc::now(),
702 active_issue_id,
703 machine_id: agent.machine_id.clone(),
704 };
705
706 let hb_dir = self.cache_dir.join("heartbeats");
707 std::fs::create_dir_all(&hb_dir)?;
708
709 let filename = format!("{}.json", agent.agent_id);
710 let path = hb_dir.join(&filename);
711 let json = serde_json::to_string_pretty(&heartbeat)?;
712 std::fs::write(&path, json)?;
713
714 self.git_in_cache(&["add", &format!("heartbeats/{}", filename)])?;
715
716 let msg = format!(
717 "heartbeat: {} at {}",
718 agent.agent_id,
719 Utc::now().format("%Y-%m-%dT%H:%M:%SZ")
720 );
721 let commit_result = self.git_in_cache(&["commit", "-m", &msg]);
722 if let Err(e) = &commit_result {
723 let err_str = e.to_string();
724 if err_str.contains("nothing to commit") || err_str.contains("no changes added") {
725 return Ok(());
726 }
727 commit_result?;
728 }
729
730 let push_result = self.git_in_cache(&["push", &self.remote, HUB_BRANCH]);
732 if let Err(e) = &push_result {
733 let err_str = e.to_string();
734 if err_str.contains("Could not resolve host")
735 || err_str.contains("Could not read from remote")
736 {
737 tracing::warn!("heartbeat push failed (offline), changes saved locally only");
738 return Ok(());
739 }
740 if err_str.contains("rejected") || err_str.contains("non-fast-forward") {
741 self.check_divergence()?;
742 self.clean_dirty_state()?;
743 if self
744 .git_in_cache(&["pull", "--rebase", &self.remote, HUB_BRANCH])
745 .is_err()
746 {
747 self.hub_health_check()?;
748 self.git_in_cache(&["pull", "--rebase", &self.remote, HUB_BRANCH])?;
749 }
750 if let Err(retry_err) = self.git_in_cache(&["push", &self.remote, HUB_BRANCH]) {
751 tracing::warn!("heartbeat push failed after retry: {}", retry_err);
752 }
753 }
754 }
755
756 Ok(())
757 }
758
759 pub fn read_heartbeats(&self) -> Result<Vec<Heartbeat>> {
761 let dir = self.cache_dir.join("heartbeats");
762 if !dir.exists() {
763 return Ok(Vec::new());
764 }
765 let mut heartbeats = Vec::new();
766 for entry in std::fs::read_dir(&dir)? {
767 let entry = entry?;
768 let path = entry.path();
769 if path.extension().map(|e| e == "json").unwrap_or(false) {
770 let content = std::fs::read_to_string(&path)?;
771 if let Ok(hb) = serde_json::from_str::<Heartbeat>(&content) {
772 heartbeats.push(hb);
773 }
774 }
775 }
776 Ok(heartbeats)
777 }
778
779 pub fn find_stale_locks(&self) -> Result<Vec<(i64, String)>> {
781 let locks = self.read_locks()?;
782 let heartbeats = self.read_heartbeats()?;
783 let timeout = chrono::Duration::minutes(locks.settings.stale_lock_timeout_minutes as i64);
784 let now = Utc::now();
785
786 let mut stale = Vec::new();
787 for (issue_id_str, lock) in &locks.locks {
788 let has_fresh_heartbeat = heartbeats.iter().any(|hb| {
789 hb.agent_id == lock.agent_id
790 && now
791 .signed_duration_since(hb.last_heartbeat)
792 .max(chrono::Duration::zero())
793 < timeout
794 });
795 if !has_fresh_heartbeat {
796 if let Ok(id) = issue_id_str.parse::<i64>() {
797 stale.push((id, lock.agent_id.clone()));
798 }
799 }
800 }
801 Ok(stale)
802 }
803
804 pub fn find_stale_locks_with_age(&self) -> Result<Vec<(i64, String, u64)>> {
806 let locks = self.read_locks()?;
807 let heartbeats = self.read_heartbeats()?;
808 let timeout = chrono::Duration::minutes(locks.settings.stale_lock_timeout_minutes as i64);
809 let now = Utc::now();
810
811 let mut stale = Vec::new();
812 for (issue_id_str, lock) in &locks.locks {
813 let latest_heartbeat = heartbeats
814 .iter()
815 .filter(|hb| hb.agent_id == lock.agent_id)
816 .map(|hb| hb.last_heartbeat)
817 .max();
818
819 let age = match latest_heartbeat {
820 Some(hb_time) => now
821 .signed_duration_since(hb_time)
822 .max(chrono::Duration::zero()),
823 None => now
824 .signed_duration_since(lock.claimed_at)
825 .max(chrono::Duration::zero()),
826 };
827
828 if age >= timeout {
829 if let Ok(id) = issue_id_str.parse::<i64>() {
830 stale.push((id, lock.agent_id.clone(), age.num_minutes() as u64));
831 }
832 }
833 }
834 Ok(stale)
835 }
836}
837
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Creates `<tmp>/.chainlink`. The returned guard keeps the temp dir alive.
    fn bare_chainlink() -> (tempfile::TempDir, PathBuf) {
        let tmp = tempdir().unwrap();
        let chainlink_dir = tmp.path().join(".chainlink");
        std::fs::create_dir_all(&chainlink_dir).unwrap();
        (tmp, chainlink_dir)
    }

    /// Creates `<tmp>/.chainlink/.locks-cache` and returns both directories.
    fn chainlink_with_cache() -> (tempfile::TempDir, PathBuf, PathBuf) {
        let tmp = tempdir().unwrap();
        let chainlink_dir = tmp.path().join(".chainlink");
        let cache_dir = chainlink_dir.join(".locks-cache");
        std::fs::create_dir_all(&cache_dir).unwrap();
        (tmp, chainlink_dir, cache_dir)
    }

    /// Writes a locks file holding one lock on issue 5, claimed two hours ago
    /// by `worker-1`.
    fn save_two_hour_old_lock(cache_dir: &Path) {
        let mut locks = LocksFile::empty();
        locks.locks.insert(
            "5".to_string(),
            crate::locks::Lock {
                agent_id: "worker-1".to_string(),
                branch: None,
                claimed_at: Utc::now() - chrono::Duration::hours(2),
                signed_by: "worker-1".to_string(),
            },
        );
        locks.save(&cache_dir.join("locks.json")).unwrap();
    }

    #[test]
    fn test_sync_manager_new() {
        let (_tmp, chainlink_dir) = bare_chainlink();

        let sync = SyncManager::new(&chainlink_dir).unwrap();
        assert!(!sync.is_initialized());
        assert_eq!(sync.remote(), "origin");
    }

    #[test]
    fn test_sync_manager_cache_path() {
        let (_tmp, chainlink_dir) = bare_chainlink();

        let sync = SyncManager::new(&chainlink_dir).unwrap();
        assert!(sync.cache_path().ends_with(".locks-cache"));
    }

    #[test]
    fn test_read_tracker_remote_default() {
        let tmp = tempdir().unwrap();
        assert_eq!(read_tracker_remote(tmp.path()), "origin");
    }

    #[test]
    fn test_read_tracker_remote_configured() {
        let tmp = tempdir().unwrap();
        std::fs::write(
            tmp.path().join("hook-config.json"),
            r#"{"tracker_remote": "upstream"}"#,
        )
        .unwrap();

        assert_eq!(read_tracker_remote(tmp.path()), "upstream");
    }

    #[test]
    fn test_read_locks_no_cache() {
        let (_tmp, chainlink_dir) = bare_chainlink();

        let sync = SyncManager::new(&chainlink_dir).unwrap();
        let locks = sync.read_locks().unwrap();
        assert!(locks.locks.is_empty());
    }

    #[test]
    fn test_read_locks_with_file() {
        let (_tmp, chainlink_dir, cache_dir) = chainlink_with_cache();
        LocksFile::empty()
            .save(&cache_dir.join("locks.json"))
            .unwrap();

        let sync = SyncManager::new(&chainlink_dir).unwrap();
        let loaded = sync.read_locks().unwrap();
        assert_eq!(loaded.version, 1);
        assert!(loaded.locks.is_empty());
    }

    #[test]
    fn test_read_heartbeats_empty() {
        let (_tmp, chainlink_dir, _cache_dir) = chainlink_with_cache();

        let sync = SyncManager::new(&chainlink_dir).unwrap();
        let heartbeats = sync.read_heartbeats().unwrap();
        assert!(heartbeats.is_empty());
    }

    #[test]
    fn test_read_heartbeats_with_files() {
        let (_tmp, chainlink_dir, cache_dir) = chainlink_with_cache();
        let hb_dir = cache_dir.join("heartbeats");
        std::fs::create_dir_all(&hb_dir).unwrap();

        let hb = Heartbeat {
            agent_id: "worker-1".to_string(),
            last_heartbeat: Utc::now(),
            active_issue_id: Some(5),
            machine_id: "test-host".to_string(),
        };
        std::fs::write(
            hb_dir.join("worker-1.json"),
            serde_json::to_string_pretty(&hb).unwrap(),
        )
        .unwrap();

        let sync = SyncManager::new(&chainlink_dir).unwrap();
        let heartbeats = sync.read_heartbeats().unwrap();
        assert_eq!(heartbeats.len(), 1);
        assert_eq!(heartbeats[0].agent_id, "worker-1");
        assert_eq!(heartbeats[0].active_issue_id, Some(5));
    }

    #[test]
    fn test_read_keyring_missing() {
        let (_tmp, chainlink_dir, _cache_dir) = chainlink_with_cache();

        let sync = SyncManager::new(&chainlink_dir).unwrap();
        let keyring = sync.read_keyring().unwrap();
        assert!(keyring.trusted_fingerprints.is_empty());
    }

    #[test]
    fn test_find_stale_locks_no_heartbeats() {
        let (_tmp, chainlink_dir, cache_dir) = chainlink_with_cache();
        save_two_hour_old_lock(&cache_dir);

        let sync = SyncManager::new(&chainlink_dir).unwrap();
        let stale = sync.find_stale_locks().unwrap();
        assert_eq!(stale.len(), 1);
        assert_eq!(stale[0], (5, "worker-1".to_string()));
    }

    #[test]
    fn test_find_stale_locks_fresh_heartbeat() {
        let (_tmp, chainlink_dir, cache_dir) = chainlink_with_cache();
        let hb_dir = cache_dir.join("heartbeats");
        std::fs::create_dir_all(&hb_dir).unwrap();
        save_two_hour_old_lock(&cache_dir);

        // A heartbeat written just now keeps the old lock from being stale.
        let hb = Heartbeat {
            agent_id: "worker-1".to_string(),
            last_heartbeat: Utc::now(),
            active_issue_id: Some(5),
            machine_id: "test".to_string(),
        };
        std::fs::write(
            hb_dir.join("worker-1.json"),
            serde_json::to_string(&hb).unwrap(),
        )
        .unwrap();

        let sync = SyncManager::new(&chainlink_dir).unwrap();
        let stale = sync.find_stale_locks().unwrap();
        assert!(stale.is_empty());
    }

    #[test]
    fn test_gpg_verification_debug() {
        // Every variant must be formattable with `{:?}`.
        let variants = vec![
            GpgVerification::Valid {
                commit: "abc".to_string(),
                fingerprint: Some("FP123".to_string()),
            },
            GpgVerification::Unsigned {
                commit: "def".to_string(),
            },
            GpgVerification::Invalid {
                commit: "ghi".to_string(),
                reason: "bad sig".to_string(),
            },
            GpgVerification::NoCommits,
        ];
        for variant in variants {
            let _ = format!("{:?}", variant);
        }
    }
}