use std::fs;
use std::path::PathBuf;
use std::thread::sleep;
use std::time::Duration;
use anyhow::{bail, Result};
use rusqlite::Connection;
use crate::forge::{ForgeReader, Issue, IssueState, PrState, PullRequest};
use super::SyncStats;
const DELAY_BETWEEN_REQUESTS: Duration = Duration::from_millis(750);
const BATCH_SIZE: usize = 50;
fn pid_file_path(repo: &str) -> PathBuf {
let safe_name = repo.replace('/', "-");
dirs::home_dir()
.unwrap_or_else(|| PathBuf::from("."))
.join(format!(".patina/run/forge-sync-{}.pid", safe_name))
}
pub(crate) fn log_file_path(repo: &str) -> PathBuf {
let safe_name = repo.replace('/', "-");
dirs::home_dir()
.unwrap_or_else(|| PathBuf::from("."))
.join(format!(".patina/logs/forge-sync-{}.log", safe_name))
}
fn process_is_running(pid: u32) -> bool {
#[cfg(unix)]
unsafe {
libc::kill(pid as i32, 0) == 0
}
#[cfg(not(unix))]
{
false
}
}
pub(crate) struct SyncGuard {
pid_file: PathBuf,
}
impl Drop for SyncGuard {
fn drop(&mut self) {
let _ = fs::remove_file(&self.pid_file);
}
}
pub(crate) fn can_start_sync(repo: &str) -> Result<SyncGuard> {
let pid_file = pid_file_path(repo);
if let Some(parent) = pid_file.parent() {
fs::create_dir_all(parent)?;
}
if pid_file.exists() {
let content = fs::read_to_string(&pid_file)?;
if let Ok(pid) = content.trim().parse::<u32>() {
if process_is_running(pid) {
bail!("Already syncing (PID {}). Check: --status", pid);
}
}
fs::remove_file(&pid_file)?;
}
Ok(SyncGuard { pid_file })
}
pub(crate) fn is_sync_running(repo: &str) -> Option<u32> {
let pid_file = pid_file_path(repo);
if pid_file.exists() {
if let Ok(content) = fs::read_to_string(&pid_file) {
if let Ok(pid) = content.trim().parse::<u32>() {
if process_is_running(pid) {
return Some(pid);
}
}
}
let _ = fs::remove_file(&pid_file);
}
None
}
pub(crate) fn sync_forge(
conn: &Connection,
reader: &dyn ForgeReader,
repo: &str,
) -> Result<SyncStats> {
let discovered = discover_refs(conn, repo)?;
let pending_refs = get_pending_refs(conn, repo, BATCH_SIZE)?;
let total_pending = count_pending_refs(conn, repo)?;
if pending_refs.is_empty() {
return Ok(SyncStats {
discovered,
resolved: 0,
pending: 0,
errors: 0,
cache_hits: 0,
});
}
println!(
" Forge sync: {} pending refs, processing batch of {}",
total_pending,
pending_refs.len()
);
let mut resolved = 0;
let mut errors = 0;
let mut cache_hits = 0;
for ref_num in &pending_refs {
sleep(DELAY_BETWEEN_REQUESTS);
match resolve_ref(conn, reader, repo, *ref_num) {
Ok(was_cached) => {
resolved += 1;
if was_cached {
cache_hits += 1;
}
}
Err(e) => {
errors += 1;
eprintln!(" ⚠️ #{}: {}", ref_num, e);
}
}
}
Ok(SyncStats {
discovered,
resolved,
pending: total_pending - resolved,
errors,
cache_hits,
})
}
pub(crate) fn get_status(conn: &Connection, repo: &str) -> Result<SyncStats> {
let pending = count_pending_refs(conn, repo)?;
let resolved = count_resolved_refs(conn, repo)?;
let errors = count_failed_refs(conn, repo)?;
Ok(SyncStats {
discovered: 0,
resolved,
pending,
errors,
cache_hits: 0,
})
}
pub(crate) fn drain_forge(
conn: &Connection,
reader: &dyn ForgeReader,
repo: &str,
) -> Result<SyncStats> {
let mut total = SyncStats::default();
let mut batch_num = 0;
loop {
batch_num += 1;
let stats = sync_forge(conn, reader, repo)?;
total.discovered += stats.discovered;
total.resolved += stats.resolved;
total.errors += stats.errors;
total.cache_hits += stats.cache_hits;
total.pending = stats.pending;
if stats.pending == 0 {
break;
}
if stats.resolved == 0 && stats.errors == 0 {
break;
}
println!(
" Batch {} complete. {} remaining...",
batch_num, stats.pending
);
}
Ok(total)
}
pub(crate) fn sync_with_limit(
conn: &Connection,
reader: &dyn ForgeReader,
repo: &str,
limit: usize,
) -> Result<SyncStats> {
let mut total = SyncStats::default();
let mut resolved_count = 0;
total.discovered = discover_refs(conn, repo)?;
while resolved_count < limit {
let batch_limit = std::cmp::min(BATCH_SIZE, limit - resolved_count);
let pending_refs = get_pending_refs(conn, repo, batch_limit)?;
if pending_refs.is_empty() {
break;
}
for ref_num in &pending_refs {
sleep(DELAY_BETWEEN_REQUESTS);
match resolve_ref(conn, reader, repo, *ref_num) {
Ok(was_cached) => {
total.resolved += 1;
resolved_count += 1;
if was_cached {
total.cache_hits += 1;
}
}
Err(e) => {
total.errors += 1;
eprintln!(" ⚠️ #{}: {}", ref_num, e);
}
}
}
}
total.pending = count_pending_refs(conn, repo)?;
Ok(total)
}
#[cfg(unix)]
pub(crate) fn start_background_sync(
db_path: &std::path::Path,
repo: &str,
detected: &crate::forge::Forge,
) -> Result<u32> {
use std::os::unix::io::AsRawFd;
let _guard = can_start_sync(repo)?;
let log_path = log_file_path(repo);
let pid_path = pid_file_path(repo);
if let Some(parent) = log_path.parent() {
fs::create_dir_all(parent)?;
}
match unsafe { libc::fork() } {
-1 => bail!("Fork failed: {}", std::io::Error::last_os_error()),
0 => {
unsafe { libc::setsid() };
let log_file = fs::OpenOptions::new()
.create(true)
.append(true)
.open(&log_path)
.expect("Failed to open log file");
let log_fd = log_file.as_raw_fd();
unsafe {
libc::dup2(log_fd, libc::STDOUT_FILENO);
libc::dup2(log_fd, libc::STDERR_FILENO);
}
if let Some(parent) = pid_path.parent() {
let _ = fs::create_dir_all(parent);
}
let _ = fs::write(&pid_path, std::process::id().to_string());
println!(
"[{}] Background sync started for {}",
chrono::Utc::now().format("%Y-%m-%d %H:%M:%S"),
repo
);
let result = (|| -> Result<SyncStats> {
let conn = rusqlite::Connection::open(db_path)?;
let reader = crate::forge::reader(detected);
drain_forge(&conn, reader.as_ref(), repo)
})();
match &result {
Ok(stats) => {
println!(
"[{}] Sync complete: {} resolved, {} errors, {} pending",
chrono::Utc::now().format("%Y-%m-%d %H:%M:%S"),
stats.resolved,
stats.errors,
stats.pending
);
}
Err(e) => {
eprintln!(
"[{}] Sync failed: {}",
chrono::Utc::now().format("%Y-%m-%d %H:%M:%S"),
e
);
}
}
let _ = fs::remove_file(&pid_path);
std::process::exit(if result.is_ok() { 0 } else { 1 });
}
child_pid => {
Ok(child_pid as u32)
}
}
}
#[cfg(not(unix))]
pub(crate) fn start_background_sync(
_db_path: &std::path::Path,
_repo: &str,
_detected: &crate::forge::Forge,
) -> Result<u32> {
bail!("Background sync not supported on this platform. Use --limit instead.")
}
fn discover_refs(conn: &Connection, repo: &str) -> Result<usize> {
let count = conn.execute(
r#"
INSERT OR IGNORE INTO forge_refs (repo, ref_number, discovered, source)
SELECT
?1 as repo,
CAST(json_extract(data, '$.parsed.pr_ref') AS INTEGER) as ref_number,
datetime('now') as discovered,
json_extract(data, '$.hash') as source
FROM eventlog
WHERE event_type = 'git.commit'
AND json_extract(data, '$.parsed.pr_ref') IS NOT NULL
"#,
rusqlite::params![repo],
)?;
Ok(count)
}
fn get_pending_refs(conn: &Connection, repo: &str, limit: usize) -> Result<Vec<i64>> {
let mut stmt = conn.prepare(
r#"
SELECT ref_number
FROM forge_refs
WHERE repo = ?1 AND resolved IS NULL
ORDER BY discovered DESC
LIMIT ?2
"#,
)?;
let refs: Vec<i64> = stmt
.query_map(rusqlite::params![repo, limit], |row| row.get(0))?
.filter_map(|r| r.ok())
.collect();
Ok(refs)
}
fn count_pending_refs(conn: &Connection, repo: &str) -> Result<usize> {
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM forge_refs WHERE repo = ?1 AND resolved IS NULL",
rusqlite::params![repo],
|row| row.get(0),
)?;
Ok(count as usize)
}
fn count_resolved_refs(conn: &Connection, repo: &str) -> Result<usize> {
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM forge_refs WHERE repo = ?1 AND resolved IS NOT NULL AND error IS NULL",
rusqlite::params![repo],
|row| row.get(0),
)?;
Ok(count as usize)
}
fn count_failed_refs(conn: &Connection, repo: &str) -> Result<usize> {
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM forge_refs WHERE repo = ?1 AND error IS NOT NULL",
rusqlite::params![repo],
|row| row.get(0),
)?;
Ok(count as usize)
}
fn resolve_ref(
conn: &Connection,
reader: &dyn ForgeReader,
repo: &str,
ref_num: i64,
) -> Result<bool> {
if is_known_issue(conn, ref_num)? {
mark_resolved(conn, repo, ref_num, "issue")?;
return Ok(true); }
if is_known_pr(conn, ref_num)? {
mark_resolved(conn, repo, ref_num, "pr")?;
return Ok(true); }
match reader.get_pull_request(ref_num) {
Ok(pr) => {
insert_pr(conn, &pr)?;
mark_resolved(conn, repo, ref_num, "pr")?;
return Ok(false);
}
Err(_) => {
}
}
match reader.get_issue(ref_num) {
Ok(issue) => {
insert_issue(conn, &issue)?;
mark_resolved(conn, repo, ref_num, "issue")?;
Ok(false)
}
Err(e) => {
mark_failed(conn, repo, ref_num, &e.to_string())?;
anyhow::bail!("#{} is neither PR nor issue: {}", ref_num, e)
}
}
}
fn is_known_issue(conn: &Connection, ref_num: i64) -> Result<bool> {
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM forge_issues WHERE number = ?1",
rusqlite::params![ref_num],
|row| row.get(0),
)?;
Ok(count > 0)
}
fn is_known_pr(conn: &Connection, ref_num: i64) -> Result<bool> {
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM forge_prs WHERE number = ?1",
rusqlite::params![ref_num],
|row| row.get(0),
)?;
Ok(count > 0)
}
fn mark_resolved(conn: &Connection, repo: &str, ref_num: i64, kind: &str) -> Result<()> {
conn.execute(
r#"
UPDATE forge_refs
SET resolved = datetime('now'), ref_kind = ?3, error = NULL
WHERE repo = ?1 AND ref_number = ?2
"#,
rusqlite::params![repo, ref_num, kind],
)?;
Ok(())
}
fn mark_failed(conn: &Connection, repo: &str, ref_num: i64, error: &str) -> Result<()> {
conn.execute(
r#"
UPDATE forge_refs
SET error = ?3
WHERE repo = ?1 AND ref_number = ?2
"#,
rusqlite::params![repo, ref_num, error],
)?;
Ok(())
}
fn insert_pr(conn: &Connection, pr: &PullRequest) -> Result<()> {
let labels_json = serde_json::to_string(&pr.labels)?;
let linked_json = serde_json::to_string(&pr.linked_issues)?;
let state_str = match pr.state {
PrState::Open => "open",
PrState::Merged => "merged",
PrState::Closed => "closed",
};
conn.execute(
r#"
INSERT OR REPLACE INTO forge_prs
(number, title, body, state, labels, author, created_at, merged_at, url, linked_issues, approvals)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
"#,
rusqlite::params![
pr.number,
&pr.title,
&pr.body,
state_str,
&labels_json,
&pr.author,
&pr.created_at,
&pr.merged_at,
&pr.url,
&linked_json,
pr.approvals,
],
)?;
Ok(())
}
fn insert_issue(conn: &Connection, issue: &Issue) -> Result<()> {
let labels_json = serde_json::to_string(&issue.labels)?;
let state_str = match issue.state {
IssueState::Open => "open",
IssueState::Closed => "closed",
};
conn.execute(
r#"
INSERT OR REPLACE INTO forge_issues
(number, title, body, state, labels, author, created_at, updated_at, url)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
"#,
rusqlite::params![
issue.number,
&issue.title,
&issue.body,
state_str,
&labels_json,
&issue.author,
&issue.created_at,
&issue.updated_at,
&issue.url,
],
)?;
Ok(())
}