//! sync.rs — synchronization with the `chainlink/locks` coordination branch.

1use anyhow::{bail, Context, Result};
2use chrono::Utc;
3use std::path::{Path, PathBuf};
4use std::process::Command;
5use std::sync::atomic::{AtomicBool, Ordering};
6
7use crate::identity::AgentConfig;
8use crate::locks::{Heartbeat, Keyring, LocksFile};
9
10/// Directory name under .chainlink for the hub cache worktree.
11const HUB_CACHE_DIR: &str = ".locks-cache";
12
13/// The coordination branch name.
14const HUB_BRANCH: &str = "chainlink/locks";
15
16/// Maximum number of local commits ahead of remote before bailing.
17const MAX_DIVERGENCE: usize = 10;
18
/// GPG signature verification result.
///
/// Produced for the most recent commit touching `locks.json` on the
/// coordination branch (see `SyncManager::verify_locks_signature`).
// `Eq` is derivable (all payloads are `String`/`Option<String>`) and lets
// callers use the type in contexts that require full equivalence.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GpgVerification {
    /// Commit is signed with a valid GPG signature.
    Valid {
        commit: String,
        /// Signer fingerprint from gpg's VALIDSIG status line, if present.
        fingerprint: Option<String>,
    },
    /// Commit exists but is not signed.
    Unsigned { commit: String },
    /// Commit has an invalid or untrusted signature.
    Invalid { commit: String, reason: String },
    /// No commits exist on the branch.
    NoCommits,
}
34
35/// Read the configured tracker remote name from `.chainlink/hook-config.json`.
36///
37/// Returns the value of `tracker_remote` if set, otherwise `"origin"`.
38pub fn read_tracker_remote(chainlink_dir: &Path) -> String {
39    let config_path = chainlink_dir.join("hook-config.json");
40    let configured = std::fs::read_to_string(&config_path)
41        .ok()
42        .and_then(|content| serde_json::from_str::<serde_json::Value>(&content).ok())
43        .and_then(|v| {
44            v.get("tracker_remote")
45                .and_then(|r| r.as_str().map(|s| s.to_string()))
46        });
47
48    if let Some(remote) = configured {
49        return remote;
50    }
51
52    // Warn once when falling back to "origin".
53    static WARNED: AtomicBool = AtomicBool::new(false);
54    if !WARNED.swap(true, Ordering::Relaxed) {
55        tracing::warn!(
56            "no tracker_remote configured in {}, defaulting to \"origin\"",
57            config_path.display()
58        );
59    }
60
61    "origin".to_string()
62}
63
/// Manages synchronization with the `chainlink/locks` coordination branch.
///
/// Uses a git worktree at `.chainlink/.locks-cache/` to avoid disturbing
/// the user's working tree. All git operations run through this worktree;
/// the user's checkout is never touched.
pub struct SyncManager {
    /// Path to the .chainlink directory.
    chainlink_dir: PathBuf,
    /// Path to .chainlink/.locks-cache (worktree of chainlink/locks branch).
    cache_dir: PathBuf,
    /// The repo root (parent of .chainlink); used for repo-level git commands.
    repo_root: PathBuf,
    /// Git remote name (from config, defaults to "origin").
    remote: String,
}
78
79impl SyncManager {
80    /// Create a new SyncManager for the given .chainlink directory.
81    pub fn new(chainlink_dir: &Path) -> Result<Self> {
82        let repo_root = chainlink_dir
83            .parent()
84            .ok_or_else(|| anyhow::anyhow!("Cannot determine repo root from .chainlink dir"))?
85            .to_path_buf();
86
87        let cache_dir = chainlink_dir.join(HUB_CACHE_DIR);
88        let remote = read_tracker_remote(chainlink_dir);
89
90        Ok(SyncManager {
91            chainlink_dir: chainlink_dir.to_path_buf(),
92            cache_dir,
93            repo_root,
94            remote,
95        })
96    }
97
    /// Get the path to the .chainlink directory.
    pub fn chainlink_dir(&self) -> &Path {
        &self.chainlink_dir
    }

    /// Check if the cache directory is initialized.
    ///
    /// Only checks for the directory's presence on disk; the worktree's
    /// git state is validated separately by `hub_health_check`.
    pub fn is_initialized(&self) -> bool {
        self.cache_dir.exists()
    }

    /// Get the path to the cache directory (the hub worktree).
    pub fn cache_path(&self) -> &Path {
        &self.cache_dir
    }

    /// Get the configured git remote name.
    pub fn remote(&self) -> &str {
        &self.remote
    }
117
118    // --- Git helpers ---
119
120    fn cache_path_str(&self) -> String {
121        self.cache_dir.to_string_lossy().to_string()
122    }
123
124    fn git_in_repo(&self, args: &[&str]) -> Result<std::process::Output> {
125        let output = Command::new("git")
126            .current_dir(&self.repo_root)
127            .args(args)
128            .output()
129            .with_context(|| format!("Failed to run git {:?}", args))?;
130        if !output.status.success() {
131            let stderr = String::from_utf8_lossy(&output.stderr);
132            bail!("git {:?} failed: {}", args, stderr);
133        }
134        Ok(output)
135    }
136
137    fn git_in_cache(&self, args: &[&str]) -> Result<std::process::Output> {
138        let output = Command::new("git")
139            .current_dir(&self.cache_dir)
140            .args(args)
141            .output()
142            .with_context(|| format!("Failed to run git {:?} in cache", args))?;
143        if !output.status.success() {
144            let stderr = String::from_utf8_lossy(&output.stderr);
145            bail!("git {:?} in cache failed: {}", args, stderr);
146        }
147        Ok(output)
148    }
149
150    /// Ensure the cache worktree has a git identity configured.
151    fn ensure_cache_git_identity(&self) -> Result<()> {
152        let has_email = Command::new("git")
153            .current_dir(&self.cache_dir)
154            .args(["config", "user.email"])
155            .output()
156            .map(|o| o.status.success())
157            .unwrap_or(false);
158        if !has_email {
159            let _ = Command::new("git")
160                .current_dir(&self.cache_dir)
161                .args(["config", "--local", "user.email", "chainlink@localhost"])
162                .output();
163            let _ = Command::new("git")
164                .current_dir(&self.cache_dir)
165                .args(["config", "--local", "user.name", "chainlink"])
166                .output();
167        }
168        Ok(())
169    }
170
171    /// Count how many commits the local hub branch is ahead of the remote.
172    fn count_unpushed_commits(&self) -> usize {
173        let remote_ref = format!("{}/{}", self.remote, HUB_BRANCH);
174        let range = format!("{}..HEAD", remote_ref);
175        match self.git_in_cache(&["rev-list", "--count", &range]) {
176            Ok(output) => String::from_utf8_lossy(&output.stdout)
177                .trim()
178                .parse::<usize>()
179                .unwrap_or(0),
180            Err(_) => 0,
181        }
182    }
183
184    /// Check if local has diverged too far from remote and bail if so.
185    fn check_divergence(&self) -> Result<()> {
186        let ahead = self.count_unpushed_commits();
187        if ahead > MAX_DIVERGENCE {
188            bail!(
189                "Hub branch has diverged: {} local commits ahead of remote \
190                 (threshold: {}). Resolve manually with: cd {} && git log --oneline {}/{}..HEAD",
191                ahead,
192                MAX_DIVERGENCE,
193                self.cache_dir.display(),
194                self.remote,
195                HUB_BRANCH
196            );
197        }
198        Ok(())
199    }
200
201    // --- Cache initialization ---
202
203    /// Initialize the hub cache directory.
204    ///
205    /// If the `chainlink/locks` branch exists on the remote, fetches it and
206    /// creates a worktree. If not, creates an orphan branch with an empty
207    /// locks.json.
208    pub fn init_cache(&self) -> Result<()> {
209        if self.cache_dir.exists() {
210            return Ok(());
211        }
212
213        // Check if remote branch exists
214        let has_remote = self
215            .git_in_repo(&["ls-remote", "--heads", &self.remote, HUB_BRANCH])
216            .map(|o| !String::from_utf8_lossy(&o.stdout).trim().is_empty())
217            .unwrap_or(false);
218
219        if has_remote {
220            // Fetch the remote branch
221            self.git_in_repo(&["fetch", &self.remote, HUB_BRANCH])?;
222
223            // Check if a local branch already exists
224            let has_local = self
225                .git_in_repo(&["rev-parse", "--verify", HUB_BRANCH])
226                .is_ok();
227
228            if has_local {
229                self.git_in_repo(&["worktree", "add", &self.cache_path_str(), HUB_BRANCH])?;
230            } else {
231                let remote_ref = format!("{}/{}", self.remote, HUB_BRANCH);
232                self.git_in_repo(&[
233                    "worktree",
234                    "add",
235                    "-b",
236                    HUB_BRANCH,
237                    &self.cache_path_str(),
238                    &remote_ref,
239                ])?;
240            }
241        } else {
242            // No remote branch — create orphan branch with worktree
243            self.git_in_repo(&[
244                "worktree",
245                "add",
246                "--orphan",
247                "-b",
248                HUB_BRANCH,
249                &self.cache_path_str(),
250            ])?;
251
252            // Initialize with empty locks.json and directory structure
253            let locks = LocksFile::empty();
254            locks.save(&self.cache_dir.join("locks.json"))?;
255            std::fs::create_dir_all(self.cache_dir.join("heartbeats"))?;
256            std::fs::create_dir_all(self.cache_dir.join("trust"))?;
257
258            // Commit the initial state
259            self.git_in_cache(&["add", "locks.json"])?;
260            self.ensure_cache_git_identity()?;
261            self.git_in_cache(&["commit", "-m", "Initialize chainlink/locks branch"])?;
262        }
263
264        self.ensure_cache_git_identity()?;
265        Ok(())
266    }
267
268    // --- Health checks ---
269
    /// Detect and recover from broken git states in the hub cache worktree.
    ///
    /// Best-effort: every recovery step ignores its own failure so a broken
    /// cache never turns a sync into a hard error here. The fixes are
    /// intentionally ordered — a stale `index.lock` would block both the
    /// rebase abort and the checkout that follow it.
    pub fn hub_health_check(&self) -> Result<()> {
        if !self.cache_dir.exists() {
            return Ok(());
        }

        // Resolve the worktree's actual git dir; git may print it relative
        // to the cache dir, so anchor relative output there. If git itself
        // cannot run in the cache, there is nothing we can repair.
        let git_dir = match self.git_in_cache(&["rev-parse", "--git-dir"]) {
            Ok(output) => {
                let raw = String::from_utf8_lossy(&output.stdout).trim().to_string();
                let path = PathBuf::from(&raw);
                if path.is_absolute() {
                    path
                } else {
                    self.cache_dir.join(path)
                }
            }
            Err(_) => return Ok(()),
        };

        // Fix 0: Remove index.lock first (left behind by a killed git process).
        let index_lock = git_dir.join("index.lock");
        if index_lock.exists() {
            tracing::warn!("removing index.lock from hub cache before recovery");
            let _ = std::fs::remove_file(&index_lock);
        }

        // Fix 1: Mid-rebase state. `rebase --abort` is tried first; any
        // leftover state directories are then removed manually in case the
        // abort itself failed.
        let rebase_merge = git_dir.join("rebase-merge");
        let rebase_apply = git_dir.join("rebase-apply");
        if rebase_merge.exists() || rebase_apply.exists() {
            tracing::warn!("hub cache is stuck in mid-rebase state, aborting to recover");
            let _ = self.git_in_cache(&["rebase", "--abort"]);
            if rebase_merge.exists() {
                let _ = std::fs::remove_dir_all(&rebase_merge);
            }
            if rebase_apply.exists() {
                let _ = std::fs::remove_dir_all(&rebase_apply);
            }
            // The abort may have recreated index.lock; clear it again.
            if index_lock.exists() {
                let _ = std::fs::remove_file(&index_lock);
            }
        }

        // Fix 2: Detached HEAD — re-attach to the hub branch; if a plain
        // checkout fails, force the branch onto the current HEAD first.
        if self.git_in_cache(&["symbolic-ref", "HEAD"]).is_err() {
            tracing::warn!("hub cache HEAD is detached, re-attaching to {}", HUB_BRANCH);
            if self.git_in_cache(&["checkout", HUB_BRANCH]).is_err() {
                let _ = self.git_in_cache(&["branch", "-f", HUB_BRANCH, "HEAD"]);
                let _ = self.git_in_cache(&["checkout", HUB_BRANCH]);
            }
        }

        Ok(())
    }
324
325    /// Detect and resolve dirty hub cache state.
326    fn clean_dirty_state(&self) -> Result<bool> {
327        let status = self.git_in_cache(&["status", "--porcelain"]);
328        match status {
329            Ok(output) => {
330                let stdout = String::from_utf8_lossy(&output.stdout);
331                if stdout.trim().is_empty() {
332                    return Ok(false);
333                }
334                if self.git_in_cache(&["add", "-A"]).is_err() {
335                    tracing::warn!(
336                        "git add -A failed in dirty state cleanup, escalating to reset --hard HEAD"
337                    );
338                    self.git_in_cache(&["reset", "--hard", "HEAD"])?;
339                    return Ok(true);
340                }
341                let commit_result = self.git_in_cache(&[
342                    "commit",
343                    "-m",
344                    "sync: auto-stage dirty hub state (recovery)",
345                ]);
346                match commit_result {
347                    Ok(_) => Ok(true),
348                    Err(e) => {
349                        let err_str = e.to_string();
350                        if err_str.contains("nothing to commit")
351                            || err_str.contains("no changes added")
352                        {
353                            Ok(false)
354                        } else {
355                            Err(e)
356                        }
357                    }
358                }
359            }
360            Err(_) => Ok(false),
361        }
362    }
363
364    // --- Fetch ---
365
    /// Fetch the latest state from remote and integrate changes.
    ///
    /// Offline/misconfigured-remote errors are swallowed so the local
    /// cache keeps working. When local unpushed commits exist they are
    /// rebased on top of the remote; otherwise the cache is hard-reset to
    /// the remote ref.
    pub fn fetch(&self) -> Result<()> {
        self.hub_health_check()?;

        let fetch_result = self.git_in_cache(&["fetch", &self.remote, HUB_BRANCH]);
        if let Err(e) = &fetch_result {
            // Treat connectivity / missing-remote errors as "nothing new";
            // anything else is a real failure and is propagated below.
            let err_str = e.to_string();
            if err_str.contains("Could not resolve host")
                || err_str.contains("Could not read from remote")
                || err_str.contains("does not appear to be a git repository")
                || err_str.contains("No such remote")
                || err_str.contains("couldn't find remote ref")
            {
                return Ok(());
            }
            fetch_result?;
        }

        // Check for unpushed local commits
        let remote_ref = format!("{}/{}", self.remote, HUB_BRANCH);
        let log_result = self.git_in_cache(&["log", &format!("{}..HEAD", remote_ref), "--oneline"]);

        match &log_result {
            Ok(output) => {
                let stdout = String::from_utf8_lossy(&output.stdout);
                if !stdout.trim().is_empty() {
                    // Local work exists: rebase it onto the remote instead of
                    // resetting (which would drop the unpushed commits).
                    self.rebase_preserving_local(&remote_ref)?;
                    return Ok(());
                }
            }
            Err(_) => {
                // Can't tell whether local commits exist — err on the side
                // of not destroying anything.
                tracing::warn!("cannot determine unpushed commit count; keeping local state");
                return Ok(());
            }
        }

        // No unpushed commits — safe to reset to match remote
        let reset_result = self.git_in_cache(&["reset", "--hard", &remote_ref]);
        if let Err(e) = &reset_result {
            // The remote-tracking ref may not exist yet (fresh branch).
            let err_str = e.to_string();
            if err_str.contains("unknown revision") || err_str.contains("ambiguous argument") {
                return Ok(());
            }
            reset_result?;
        }

        Ok(())
    }
414
415    /// Rebase local unpushed commits on top of the remote ref.
416    fn rebase_preserving_local(&self, remote_ref: &str) -> Result<()> {
417        self.check_divergence()?;
418        self.clean_dirty_state()?;
419
420        let rebase_result = self.git_in_cache(&["rebase", remote_ref]);
421        if let Err(e) = &rebase_result {
422            let err_str = e.to_string();
423            if err_str.contains("unknown revision") || err_str.contains("ambiguous argument") {
424                return Ok(());
425            }
426            if let Err(abort_err) = self.git_in_cache(&["rebase", "--abort"]) {
427                tracing::warn!("rebase --abort failed during recovery: {}", abort_err);
428            }
429            tracing::warn!(
430                "rebase onto {} failed; aborted to preserve local commits",
431                remote_ref
432            );
433            return Ok(());
434        }
435
436        Ok(())
437    }
438
439    // --- Locks ---
440
441    /// Read the current locks file from the cache.
442    pub fn read_locks(&self) -> Result<LocksFile> {
443        let path = self.cache_dir.join("locks.json");
444        if !path.exists() {
445            return Ok(LocksFile::empty());
446        }
447        LocksFile::load(&path)
448    }
449
450    /// Read the trust keyring from the cache.
451    pub fn read_keyring(&self) -> Result<Keyring> {
452        let path = self.cache_dir.join("trust").join("keyring.json");
453        if !path.exists() {
454            return Ok(Keyring {
455                trusted_fingerprints: Vec::new(),
456            });
457        }
458        Keyring::load(&path)
459    }
460
461    /// Verify GPG signature on the last commit that touched locks.json.
462    pub fn verify_locks_signature(&self) -> Result<GpgVerification> {
463        // Find the last commit that modified locks.json
464        let log_result = self.git_in_cache(&["log", "-1", "--format=%H", "--", "locks.json"]);
465        let commit = match log_result {
466            Ok(output) => {
467                let hash = String::from_utf8_lossy(&output.stdout).trim().to_string();
468                if hash.is_empty() {
469                    return Ok(GpgVerification::NoCommits);
470                }
471                hash
472            }
473            Err(_) => return Ok(GpgVerification::NoCommits),
474        };
475
476        // Try to verify the commit's signature
477        let verify_output = Command::new("git")
478            .current_dir(&self.cache_dir)
479            .args(["verify-commit", "--raw", &commit])
480            .output();
481
482        match verify_output {
483            Ok(output) => {
484                let stderr = String::from_utf8_lossy(&output.stderr);
485                if stderr.contains("VALIDSIG") {
486                    // Extract fingerprint from VALIDSIG line
487                    let fingerprint = stderr
488                        .lines()
489                        .find(|l| l.contains("VALIDSIG"))
490                        .and_then(|l| l.split_whitespace().nth(2))
491                        .map(|s| s.to_string());
492                    Ok(GpgVerification::Valid {
493                        commit,
494                        fingerprint,
495                    })
496                } else if stderr.contains("ERRSIG") || stderr.contains("BADSIG") {
497                    Ok(GpgVerification::Invalid {
498                        commit: commit.clone(),
499                        reason: stderr.to_string(),
500                    })
501                } else if output.status.success() {
502                    // verify-commit succeeded but no VALIDSIG — unusual
503                    Ok(GpgVerification::Valid {
504                        commit,
505                        fingerprint: None,
506                    })
507                } else {
508                    // verify-commit failed — commit is unsigned
509                    Ok(GpgVerification::Unsigned { commit })
510                }
511            }
512            Err(_) => {
513                // GPG not available — can't verify
514                Ok(GpgVerification::Unsigned { commit })
515            }
516        }
517    }
518
    /// Claim a lock on an issue for the given agent.
    ///
    /// Returns `Ok(true)` if newly claimed, `Ok(false)` if already held by self.
    /// Fails if locked by another agent (unless `force` is true for steal).
    ///
    /// Claiming is optimistic: write the lock, push, then re-read to check
    /// the claim survived any rebase merged in during the push. Up to three
    /// attempts are made before giving up.
    pub fn claim_lock(
        &self,
        agent: &AgentConfig,
        issue_id: i64,
        branch: Option<&str>,
        force: bool,
    ) -> Result<bool> {
        for _attempt in 0..3 {
            // Re-read every attempt: another agent may have won the race.
            let mut locks = self.read_locks()?;

            if let Some(existing) = locks.get_lock(issue_id) {
                if existing.agent_id == agent.agent_id {
                    return Ok(false); // Already held by self
                }
                if !force {
                    bail!(
                        "Issue {} is locked by '{}' (claimed {}). \
                         Use 'chainlink locks steal {}' if the lock is stale.",
                        crate::utils::format_issue_id(issue_id),
                        existing.agent_id,
                        existing.claimed_at.format("%Y-%m-%d %H:%M"),
                        issue_id
                    );
                }
            }

            let lock = crate::locks::Lock {
                agent_id: agent.agent_id.clone(),
                branch: branch.map(|s| s.to_string()),
                claimed_at: Utc::now(),
                signed_by: agent.agent_id.clone(),
            };

            locks.locks.insert(issue_id.to_string(), lock);
            locks.save(&self.cache_dir.join("locks.json"))?;

            match self
                .commit_and_push_locks(&format!("{}: claim lock on #{}", agent.agent_id, issue_id))
            {
                Ok(()) => {
                    // Verify our claim survived any rebase during push
                    let verified = LocksFile::load(&self.cache_dir.join("locks.json"))?;
                    match verified.get_lock(issue_id) {
                        Some(lock) if lock.agent_id == agent.agent_id => {
                            return Ok(true);
                        }
                        Some(lock) => {
                            // Another agent's concurrent claim replaced ours.
                            tracing::warn!(
                                "lock claim for issue {} was overwritten by '{}', retrying",
                                crate::utils::format_issue_id(issue_id),
                                lock.agent_id
                            );
                        }
                        None => {
                            // Our entry vanished entirely (e.g. rebase conflict
                            // resolution dropped it); try again.
                            tracing::warn!(
                                "lock claim for issue {} was lost during push, retrying",
                                crate::utils::format_issue_id(issue_id)
                            );
                        }
                    }
                }
                Err(e) => {
                    // "Push failed after …" means repeated non-fast-forward
                    // rejections: pull --rebase (with a health-check retry)
                    // and loop. Any other error is fatal.
                    let err_str = e.to_string();
                    if err_str.contains("Push failed after") {
                        if self
                            .git_in_cache(&["pull", "--rebase", &self.remote, HUB_BRANCH])
                            .is_err()
                        {
                            self.hub_health_check()?;
                            self.git_in_cache(&["pull", "--rebase", &self.remote, HUB_BRANCH])?;
                        }
                    } else {
                        return Err(e);
                    }
                }
            }
        }

        bail!(
            "Failed to claim lock on #{} after 3 attempts due to concurrent updates",
            issue_id
        )
    }
606
    /// Release a lock on an issue.
    ///
    /// Returns `Ok(true)` if released, `Ok(false)` if not locked.
    /// Releasing another agent's lock requires `force`.
    pub fn release_lock(&self, agent: &AgentConfig, issue_id: i64, force: bool) -> Result<bool> {
        let locks = self.read_locks()?;

        // Ownership check happens once up front, against the pre-release view.
        match locks.get_lock(issue_id) {
            None => return Ok(false),
            Some(existing) => {
                if existing.agent_id != agent.agent_id && !force {
                    bail!(
                        "Issue {} is locked by '{}', not by you ('{}').",
                        crate::utils::format_issue_id(issue_id),
                        existing.agent_id,
                        agent.agent_id
                    );
                }
            }
        }

        // Optimistic remove-push-verify loop, mirroring claim_lock: a rebase
        // during push can resurrect the entry, so re-check after each push.
        for _release_attempt in 0..3 {
            let mut current_locks = self.read_locks()?;
            current_locks.locks.remove(&issue_id.to_string());
            current_locks.save(&self.cache_dir.join("locks.json"))?;

            self.commit_and_push_locks(&format!(
                "{}: release lock on #{}",
                agent.agent_id, issue_id
            ))?;

            let verified = LocksFile::load(&self.cache_dir.join("locks.json"))?;
            if verified.get_lock(issue_id).is_none() {
                break;
            }
            tracing::warn!(
                "lock release for issue {} was undone during push, retrying",
                crate::utils::format_issue_id(issue_id)
            );
        }

        // NOTE(review): returns Ok(true) even if all three attempts were
        // undone during push (only a warning is logged) — confirm intended.
        Ok(true)
    }
649
    /// Stage locks.json, commit, and push with rebase-retry.
    ///
    /// Offline pushes succeed silently (the commit stays local for a later
    /// sync). Non-fast-forward rejections trigger up to two pull-rebase
    /// retries before failing with a "Push failed after 3 retries" error
    /// that `claim_lock` recognizes.
    fn commit_and_push_locks(&self, message: &str) -> Result<()> {
        self.git_in_cache(&["add", "locks.json"])?;

        let commit_result = self.git_in_cache(&["commit", "-m", message]);
        if let Err(e) = &commit_result {
            // "nothing to commit" is fine — the file already matched HEAD.
            let err_str = e.to_string();
            if err_str.contains("nothing to commit") || err_str.contains("no changes added") {
                return Ok(());
            }
            commit_result?;
        }

        for attempt in 0..3 {
            let push_result = self.git_in_cache(&["push", &self.remote, HUB_BRANCH]);
            match push_result {
                Ok(_) => return Ok(()),
                Err(e) => {
                    let err_str = e.to_string();
                    if err_str.contains("Could not resolve host")
                        || err_str.contains("Could not read from remote")
                    {
                        return Ok(()); // Offline — commit is local
                    }
                    if err_str.contains("rejected") || err_str.contains("non-fast-forward") {
                        if attempt < 2 {
                            // Remote moved; rebase our commit on top and retry.
                            // A failed pull gets one health-check-then-retry.
                            self.check_divergence()?;
                            if self
                                .git_in_cache(&["pull", "--rebase", &self.remote, HUB_BRANCH])
                                .is_err()
                            {
                                self.hub_health_check()?;
                                self.git_in_cache(&["pull", "--rebase", &self.remote, HUB_BRANCH])?;
                            }
                            continue;
                        }
                        bail!("Push failed after 3 retries for locks.json");
                    }
                    return Err(e);
                }
            }
        }
        Ok(())
    }
694
695    // --- Heartbeats ---
696
    /// Write and push a heartbeat file for this agent.
    ///
    /// Writes `heartbeats/<agent_id>.json`, commits, and pushes. The push
    /// is best-effort: offline failures and retry failures are logged and
    /// swallowed so heartbeating never aborts the caller.
    pub fn push_heartbeat(&self, agent: &AgentConfig, active_issue_id: Option<i64>) -> Result<()> {
        let heartbeat = Heartbeat {
            agent_id: agent.agent_id.clone(),
            last_heartbeat: Utc::now(),
            active_issue_id,
            machine_id: agent.machine_id.clone(),
        };

        let hb_dir = self.cache_dir.join("heartbeats");
        std::fs::create_dir_all(&hb_dir)?;

        // One file per agent, overwritten each beat.
        let filename = format!("{}.json", agent.agent_id);
        let path = hb_dir.join(&filename);
        let json = serde_json::to_string_pretty(&heartbeat)?;
        std::fs::write(&path, json)?;

        self.git_in_cache(&["add", &format!("heartbeats/{}", filename)])?;

        let msg = format!(
            "heartbeat: {} at {}",
            agent.agent_id,
            Utc::now().format("%Y-%m-%dT%H:%M:%SZ")
        );
        let commit_result = self.git_in_cache(&["commit", "-m", &msg]);
        if let Err(e) = &commit_result {
            // Nothing changed since the last beat — that's fine.
            let err_str = e.to_string();
            if err_str.contains("nothing to commit") || err_str.contains("no changes added") {
                return Ok(());
            }
            commit_result?;
        }

        // Push (best-effort)
        let push_result = self.git_in_cache(&["push", &self.remote, HUB_BRANCH]);
        if let Err(e) = &push_result {
            let err_str = e.to_string();
            if err_str.contains("Could not resolve host")
                || err_str.contains("Could not read from remote")
            {
                tracing::warn!("heartbeat push failed (offline), changes saved locally only");
                return Ok(());
            }
            // Non-fast-forward: rebase onto the remote and retry once; a
            // second failure is only logged (heartbeats are expendable).
            if err_str.contains("rejected") || err_str.contains("non-fast-forward") {
                self.check_divergence()?;
                self.clean_dirty_state()?;
                if self
                    .git_in_cache(&["pull", "--rebase", &self.remote, HUB_BRANCH])
                    .is_err()
                {
                    self.hub_health_check()?;
                    self.git_in_cache(&["pull", "--rebase", &self.remote, HUB_BRANCH])?;
                }
                if let Err(retry_err) = self.git_in_cache(&["push", &self.remote, HUB_BRANCH]) {
                    tracing::warn!("heartbeat push failed after retry: {}", retry_err);
                }
            }
        }

        Ok(())
    }
758
759    /// Read all heartbeat files from the cache.
760    pub fn read_heartbeats(&self) -> Result<Vec<Heartbeat>> {
761        let dir = self.cache_dir.join("heartbeats");
762        if !dir.exists() {
763            return Ok(Vec::new());
764        }
765        let mut heartbeats = Vec::new();
766        for entry in std::fs::read_dir(&dir)? {
767            let entry = entry?;
768            let path = entry.path();
769            if path.extension().map(|e| e == "json").unwrap_or(false) {
770                let content = std::fs::read_to_string(&path)?;
771                if let Ok(hb) = serde_json::from_str::<Heartbeat>(&content) {
772                    heartbeats.push(hb);
773                }
774            }
775        }
776        Ok(heartbeats)
777    }
778
779    /// Find locks that have gone stale (no heartbeat within the timeout).
780    pub fn find_stale_locks(&self) -> Result<Vec<(i64, String)>> {
781        let locks = self.read_locks()?;
782        let heartbeats = self.read_heartbeats()?;
783        let timeout = chrono::Duration::minutes(locks.settings.stale_lock_timeout_minutes as i64);
784        let now = Utc::now();
785
786        let mut stale = Vec::new();
787        for (issue_id_str, lock) in &locks.locks {
788            let has_fresh_heartbeat = heartbeats.iter().any(|hb| {
789                hb.agent_id == lock.agent_id
790                    && now
791                        .signed_duration_since(hb.last_heartbeat)
792                        .max(chrono::Duration::zero())
793                        < timeout
794            });
795            if !has_fresh_heartbeat {
796                if let Ok(id) = issue_id_str.parse::<i64>() {
797                    stale.push((id, lock.agent_id.clone()));
798                }
799            }
800        }
801        Ok(stale)
802    }
803
804    /// Find stale locks with their age in minutes.
805    pub fn find_stale_locks_with_age(&self) -> Result<Vec<(i64, String, u64)>> {
806        let locks = self.read_locks()?;
807        let heartbeats = self.read_heartbeats()?;
808        let timeout = chrono::Duration::minutes(locks.settings.stale_lock_timeout_minutes as i64);
809        let now = Utc::now();
810
811        let mut stale = Vec::new();
812        for (issue_id_str, lock) in &locks.locks {
813            let latest_heartbeat = heartbeats
814                .iter()
815                .filter(|hb| hb.agent_id == lock.agent_id)
816                .map(|hb| hb.last_heartbeat)
817                .max();
818
819            let age = match latest_heartbeat {
820                Some(hb_time) => now
821                    .signed_duration_since(hb_time)
822                    .max(chrono::Duration::zero()),
823                None => now
824                    .signed_duration_since(lock.claimed_at)
825                    .max(chrono::Duration::zero()),
826            };
827
828            if age >= timeout {
829                if let Ok(id) = issue_id_str.parse::<i64>() {
830                    stale.push((id, lock.agent_id.clone(), age.num_minutes() as u64));
831                }
832            }
833        }
834        Ok(stale)
835    }
836}
837
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Create a fresh tempdir containing a `.chainlink` directory.
    ///
    /// Returns the tempdir guard (kept alive by the caller) plus the
    /// `.chainlink` path.
    fn chainlink_fixture() -> (tempfile::TempDir, PathBuf) {
        let tmp = tempdir().unwrap();
        let chainlink = tmp.path().join(".chainlink");
        std::fs::create_dir_all(&chainlink).unwrap();
        (tmp, chainlink)
    }

    /// Like [`chainlink_fixture`], but also creates the `.locks-cache`
    /// directory and returns its path as well.
    fn cache_fixture() -> (tempfile::TempDir, PathBuf, PathBuf) {
        let (tmp, chainlink) = chainlink_fixture();
        let cache = chainlink.join(".locks-cache");
        std::fs::create_dir_all(&cache).unwrap();
        (tmp, chainlink, cache)
    }

    /// Write a locks.json into `cache` holding a single lock for `agent`
    /// on the given issue key, claimed two hours in the past.
    fn write_single_lock(cache: &Path, issue: &str, agent: &str) {
        let mut locks = LocksFile::empty();
        locks.locks.insert(
            issue.to_string(),
            crate::locks::Lock {
                agent_id: agent.to_string(),
                branch: None,
                claimed_at: Utc::now() - chrono::Duration::hours(2),
                signed_by: agent.to_string(),
            },
        );
        locks.save(&cache.join("locks.json")).unwrap();
    }

    #[test]
    fn test_sync_manager_new() {
        let (_tmp, chainlink) = chainlink_fixture();
        let sync = SyncManager::new(&chainlink).unwrap();
        assert!(!sync.is_initialized());
        assert_eq!(sync.remote(), "origin");
    }

    #[test]
    fn test_sync_manager_cache_path() {
        let (_tmp, chainlink) = chainlink_fixture();
        let sync = SyncManager::new(&chainlink).unwrap();
        assert!(sync.cache_path().ends_with(".locks-cache"));
    }

    #[test]
    fn test_read_tracker_remote_default() {
        // With no hook-config.json present, the fallback remote is used.
        let tmp = tempdir().unwrap();
        assert_eq!(read_tracker_remote(tmp.path()), "origin");
    }

    #[test]
    fn test_read_tracker_remote_configured() {
        let tmp = tempdir().unwrap();
        std::fs::write(
            tmp.path().join("hook-config.json"),
            r#"{"tracker_remote": "upstream"}"#,
        )
        .unwrap();
        assert_eq!(read_tracker_remote(tmp.path()), "upstream");
    }

    #[test]
    fn test_read_locks_no_cache() {
        // Cache worktree absent: reading locks yields an empty set.
        let (_tmp, chainlink) = chainlink_fixture();
        let sync = SyncManager::new(&chainlink).unwrap();
        assert!(sync.read_locks().unwrap().locks.is_empty());
    }

    #[test]
    fn test_read_locks_with_file() {
        let (_tmp, chainlink, cache) = cache_fixture();
        // Seed the cache dir with an empty locks file.
        LocksFile::empty().save(&cache.join("locks.json")).unwrap();

        let sync = SyncManager::new(&chainlink).unwrap();
        let loaded = sync.read_locks().unwrap();
        assert_eq!(loaded.version, 1);
        assert!(loaded.locks.is_empty());
    }

    #[test]
    fn test_read_heartbeats_empty() {
        // Cache exists but has no heartbeats directory.
        let (_tmp, chainlink, _cache) = cache_fixture();
        let sync = SyncManager::new(&chainlink).unwrap();
        assert!(sync.read_heartbeats().unwrap().is_empty());
    }

    #[test]
    fn test_read_heartbeats_with_files() {
        let (_tmp, chainlink, cache) = cache_fixture();
        let hb_dir = cache.join("heartbeats");
        std::fs::create_dir_all(&hb_dir).unwrap();

        let hb = Heartbeat {
            agent_id: "worker-1".to_string(),
            last_heartbeat: Utc::now(),
            active_issue_id: Some(5),
            machine_id: "test-host".to_string(),
        };
        std::fs::write(
            hb_dir.join("worker-1.json"),
            serde_json::to_string_pretty(&hb).unwrap(),
        )
        .unwrap();

        let sync = SyncManager::new(&chainlink).unwrap();
        let heartbeats = sync.read_heartbeats().unwrap();
        assert_eq!(heartbeats.len(), 1);
        assert_eq!(heartbeats[0].agent_id, "worker-1");
        assert_eq!(heartbeats[0].active_issue_id, Some(5));
    }

    #[test]
    fn test_read_keyring_missing() {
        let (_tmp, chainlink, _cache) = cache_fixture();
        let sync = SyncManager::new(&chainlink).unwrap();
        assert!(sync.read_keyring().unwrap().trusted_fingerprints.is_empty());
    }

    #[test]
    fn test_find_stale_locks_no_heartbeats() {
        let (_tmp, chainlink, cache) = cache_fixture();
        // One lock, claimed two hours ago, with no heartbeat at all.
        write_single_lock(&cache, "5", "worker-1");

        let sync = SyncManager::new(&chainlink).unwrap();
        let stale = sync.find_stale_locks().unwrap();
        assert_eq!(stale.len(), 1);
        assert_eq!(stale[0], (5, "worker-1".to_string()));
    }

    #[test]
    fn test_find_stale_locks_fresh_heartbeat() {
        let (_tmp, chainlink, cache) = cache_fixture();
        let hb_dir = cache.join("heartbeats");
        std::fs::create_dir_all(&hb_dir).unwrap();

        // Old lock, but its agent has a heartbeat from right now.
        write_single_lock(&cache, "5", "worker-1");
        let hb = Heartbeat {
            agent_id: "worker-1".to_string(),
            last_heartbeat: Utc::now(),
            active_issue_id: Some(5),
            machine_id: "test".to_string(),
        };
        std::fs::write(
            hb_dir.join("worker-1.json"),
            serde_json::to_string(&hb).unwrap(),
        )
        .unwrap();

        let sync = SyncManager::new(&chainlink).unwrap();
        assert!(sync.find_stale_locks().unwrap().is_empty());
    }

    #[test]
    fn test_gpg_verification_debug() {
        // Every variant must be Debug-formattable.
        let variants = [
            GpgVerification::Valid {
                commit: "abc".to_string(),
                fingerprint: Some("FP123".to_string()),
            },
            GpgVerification::Unsigned {
                commit: "def".to_string(),
            },
            GpgVerification::Invalid {
                commit: "ghi".to_string(),
                reason: "bad sig".to_string(),
            },
            GpgVerification::NoCommits,
        ];
        for v in variants {
            let _ = format!("{:?}", v);
        }
    }
}