use crate::cli::Cli;
use crate::context::GriteContext;
use crate::output::{output_success, print_human};
use libgrite_core::types::ids::ActorId;
use libgrite_core::{lock::LockCheckResult, GriteError};
use libgrite_git::WalManager;
use serde::Serialize;
fn check_push_lock(cli: &Cli, ctx: &GriteContext) -> Result<(), GriteError> {
match ctx.check_lock("repo:global")? {
LockCheckResult::Clear => Ok(()),
LockCheckResult::Warning(conflicts) => {
if !cli.quiet {
for lock in &conflicts {
eprintln!(
"Warning: {} is locked by {} (expires in {}s)",
lock.resource,
lock.owner,
lock.time_remaining_ms() / 1000
);
}
}
Ok(())
}
LockCheckResult::Blocked(_) => Err(GriteError::Conflict(
"Repository is locked by another process".to_string(),
)),
}
}
#[derive(Serialize)]
struct SyncOutput {
pulled: bool,
pushed: bool,
pull_events: usize,
pull_wal_head: Option<String>,
push_success: bool,
push_rebased: bool,
push_events_rebased: usize,
message: String,
}
#[derive(Serialize)]
struct PullOutput {
success: bool,
events: usize,
wal_head: Option<String>,
message: String,
}
#[derive(Serialize)]
struct PushOutput {
success: bool,
rebased: bool,
events_rebased: usize,
backfilled: usize,
message: String,
}
pub fn run(cli: &Cli, remote: String, pull_only: bool, push_only: bool) -> Result<(), GriteError> {
let ctx = GriteContext::resolve(cli)?;
let sync_mgr = ctx.open_sync()?;
let actor_id: ActorId = hex::decode(&ctx.actor_id)
.map_err(|e| GriteError::Internal(format!("Invalid actor ID: {}", e)))?
.try_into()
.map_err(|_| GriteError::Internal("Actor ID must be 16 bytes".to_string()))?;
let do_pull = !push_only;
let do_push = !pull_only;
if do_push {
check_push_lock(cli, &ctx)?;
if let Some(count) = backfill_wal_if_needed(&ctx, &actor_id)? {
print_human(
cli,
&format!("Backfilled WAL with {} event(s) from local store", count),
);
}
}
if do_pull && !do_push {
let result = sync_mgr.pull(&remote)?;
if result.events_pulled > 0 {
print_human(
cli,
&format!("Pulled {} events from {}", result.events_pulled, remote),
);
} else {
print_human(cli, &format!("Already up to date with {}", remote));
}
output_success(
cli,
PullOutput {
success: result.success,
events: result.events_pulled,
wal_head: result.new_wal_head.map(|oid| oid.to_string()),
message: result.message,
},
);
} else if do_push && !do_pull {
let result = sync_mgr.push_with_rebase(&remote, &actor_id)?;
if result.rebased {
print_human(
cli,
&format!(
"Conflict resolved: rebased {} local events on top of remote",
result.events_rebased
),
);
}
if result.success {
print_human(cli, &format!("Pushed to {}", remote));
} else {
print_human(cli, &format!("Push failed: {}", result.message));
}
output_success(
cli,
PushOutput {
success: result.success,
rebased: result.rebased,
events_rebased: result.events_rebased,
backfilled: 0,
message: result.message,
},
);
} else {
let (pull_result, push_result) = sync_mgr.sync_with_rebase(&remote, &actor_id)?;
if pull_result.events_pulled > 0 {
print_human(
cli,
&format!(
"Pulled {} events from {}",
pull_result.events_pulled, remote
),
);
}
if push_result.rebased {
print_human(
cli,
&format!(
"Conflict resolved: rebased {} local events on top of remote",
push_result.events_rebased
),
);
}
if push_result.success {
print_human(cli, &format!("Pushed to {}", remote));
} else {
print_human(cli, &format!("Push failed: {}", push_result.message));
}
output_success(
cli,
SyncOutput {
pulled: true,
pushed: true,
pull_events: pull_result.events_pulled,
pull_wal_head: pull_result.new_wal_head.map(|oid| oid.to_string()),
push_success: push_result.success,
push_rebased: push_result.rebased,
push_events_rebased: push_result.events_rebased,
message: format!("{} / {}", pull_result.message, push_result.message),
},
);
}
Ok(())
}
fn backfill_wal_if_needed(
ctx: &GriteContext,
actor_id: &ActorId,
) -> Result<Option<usize>, GriteError> {
let git_dir = ctx.repo_root().join(".git");
let wal = WalManager::open(&git_dir)?;
if wal.head()?.is_some() {
return Ok(None);
}
let store = libgrite_core::GriteStore::open(&ctx.sled_path())?;
let events = store.get_all_events()?;
if events.is_empty() {
return Ok(None);
}
let mut sorted = events;
sorted.sort_by_key(|e| e.ts_unix_ms);
wal.append(actor_id, &sorted)?;
Ok(Some(sorted.len()))
}