use std::path::Path;
use std::process::Command;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use camino::Utf8PathBuf;
use serde::Serialize;
use void_core::config;
use void_core::pipeline::{export_commit_to_car, push_repo, ExportCarOptions, PushOptions};
use void_core::transport::IpfsBackend;
use void_core::refs;
use crate::context::{open_repo, void_err_to_cli};
use crate::observer::ProgressObserver;
use crate::output::{run_command, CliError, CliOptions};
pub const DEFAULT_KUBO_URL: &str = "http://127.0.0.1:5001";
pub const DEFAULT_TIMEOUT_MS: u64 = 30000;
/// Parsed CLI arguments for the `push` command.
#[derive(Debug)]
pub struct PushArgs {
    /// Specific commit CID to push; `None` pushes the repo's current head.
    pub commit: Option<String>,
    /// Backend selector: `"kubo"`, `"gateway"`, or anything else
    /// (the `"daemon"` default) to start the embedded node.
    pub backend: String,
    /// Kubo HTTP API URL, used when `backend == "kubo"`.
    pub kubo_url: String,
    /// Gateway base URL; required when `backend == "gateway"`.
    pub gateway: Option<String>,
    /// Network timeout in milliseconds for the push pipeline.
    pub timeout_ms: u64,
    /// Whether to pin the pushed objects on the backend.
    pub pin: bool,
    /// When true, skip replication to configured remotes entirely.
    pub local: bool,
    /// Replicate to this named remote only; `None` means all configured remotes.
    pub remote: Option<String>,
    /// Passed through to the core pipeline (presumably "push full history";
    /// exact semantics live in `push_repo` — confirm in void_core).
    pub full: bool,
    /// Passed through to the core pipeline (force push; semantics in `push_repo`).
    pub force: bool,
}
impl Default for PushArgs {
fn default() -> Self {
Self {
commit: None,
backend: "daemon".to_string(),
kubo_url: DEFAULT_KUBO_URL.to_string(),
gateway: None,
timeout_ms: DEFAULT_TIMEOUT_MS,
pin: true,
local: false,
remote: None,
full: false,
force: false,
}
}
}
/// Outcome of replicating one commit to a single configured remote.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RemoteResult {
    /// Remote name as it appears in the repo config.
    pub name: String,
    /// True when every replication step for this remote succeeded.
    pub success: bool,
    /// Human-readable failure description; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
/// JSON-serializable result of a completed push.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PushOutput {
    /// CID of the commit that was pushed.
    pub commit: String,
    /// Number of objects uploaded to the backend.
    pub objects_pushed: usize,
    /// Number of objects pinned on the backend.
    pub pinned: usize,
    /// Number of commits included in the push.
    pub commits_pushed: usize,
    /// Per-remote replication results; omitted from JSON when replication
    /// was skipped (`--local`) or no remotes are configured.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub remotes: Option<Vec<RemoteResult>>,
}
/// Entry point for `void push`.
///
/// Opens the repo at `cwd`, uploads the target commit's objects to the
/// selected IPFS backend, replicates to configured remotes (unless
/// `--local`), and best-effort records the new head in the local registry.
pub fn run(cwd: &Path, args: PushArgs, opts: &CliOptions) -> Result<(), CliError> {
    run_command("push", opts, |ctx| {
        let repo = open_repo(cwd)?;
        let void_dir = repo.void_dir().to_owned();
        ctx.progress("Connecting to IPFS...");
        // Select the transport. The third tuple element holds the embedded
        // daemon's runtime; binding it as `_daemon_rt` keeps it alive for
        // the whole push without triggering an unused-variable warning.
        let (backend, daemon_remote, _daemon_rt) = match args.backend.as_str() {
            "kubo" => (
                IpfsBackend::Kubo {
                    api: args.kubo_url.clone(),
                },
                None,
                None,
            ),
            "gateway" => {
                let gateway_url = args.gateway.as_ref().ok_or_else(|| {
                    crate::output::CliError::invalid_args(
                        "--gateway URL is required when using gateway backend",
                    )
                })?;
                (
                    IpfsBackend::Gateway {
                        base: gateway_url.clone(),
                    },
                    None,
                    None,
                )
            }
            // Any other value (the "daemon" default) starts the embedded node.
            _ => {
                ctx.progress("Starting embedded node...");
                let (remote, rt) = crate::daemon::start_daemon(repo.void_dir().as_std_path())
                    .map_err(|e| crate::output::CliError::internal(e))?;
                (
                    // Placeholder API URL: with a daemon remote supplied,
                    // push_repo presumably talks to the daemon instead of the
                    // Kubo HTTP API — confirm in void_core::pipeline.
                    IpfsBackend::Kubo {
                        api: "unused".to_string(),
                    },
                    Some(remote),
                    Some(rt),
                )
            }
        };
        // Visible spinner only in human mode, so JSON output stays clean.
        let progress_observer = if opts.is_human_mode() {
            ProgressObserver::new("Pushing...")
        } else {
            ProgressObserver::new_hidden()
        };
        let observer: Arc<dyn void_core::support::events::VoidObserver> =
            Arc::new(progress_observer);
        let push_opts = PushOptions {
            ctx: repo.context().clone(),
            commit_cid: args.commit.clone(),
            backend,
            timeout: Duration::from_millis(args.timeout_ms),
            pin: args.pin,
            backend_name: "local".to_string(),
            full: args.full,
            force: args.force,
            observer: Some(observer),
            remote: daemon_remote,
        };
        ctx.progress("Pushing...");
        let result = push_repo(push_opts).map_err(void_err_to_cli)?;
        if !ctx.use_json() {
            // Abbreviate the CID for display. NOTE(review): this is a byte
            // slice — fine for ASCII CIDs, would panic mid-char on non-ASCII.
            let short_cid = if result.commit_cid.len() > 12 {
                &result.commit_cid[..12]
            } else {
                &result.commit_cid
            };
            ctx.info(format!(
                "Pushed {} objects for commit {}...",
                result.objects_pushed, short_cid
            ));
            if result.pinned > 0 {
                ctx.info(format!("Pinned {} objects", result.pinned));
            }
        }
        // Replicate to configured remotes unless --local was given.
        let remote_results = if args.local {
            None
        } else {
            replicate_to_remotes(
                ctx,
                repo.context(),
                &result.commit_cid,
                args.commit.as_deref(),
                args.remote.as_deref(),
            )?
        };
        // Best-effort registry update: a failure here only warns; the push
        // itself has already succeeded.
        {
            let cfg = config::load(void_dir.as_std_path()).ok();
            if let Some(ref repo_id) = cfg.as_ref().and_then(|c| c.repo_id.as_ref()) {
                let branch = get_current_branch(void_dir.as_std_path()).unwrap_or_else(|| "trunk".to_string());
                if let Err(e) = crate::registry::update_head(repo_id, &branch, &result.commit_cid) {
                    ctx.warn(format!("Failed to update registry: {}", e));
                }
            }
        }
        ctx.progress("Push complete.");
        Ok(PushOutput {
            commit: result.commit_cid,
            objects_pushed: result.objects_pushed,
            pinned: result.pinned,
            commits_pushed: result.commits_pushed,
            remotes: remote_results,
        })
    })
}
/// Resolve the branch name the repo's HEAD currently points at.
///
/// Returns `None` when the path is not valid UTF-8, HEAD cannot be read,
/// HEAD is absent, or HEAD is not a symbolic (branch) reference.
fn get_current_branch(void_dir: &Path) -> Option<String> {
    let utf8_dir = Utf8PathBuf::try_from(void_dir.to_path_buf()).ok()?;
    // read_head returns Result<Option<HeadRef>>: one `?` for the error,
    // one for the missing-HEAD case.
    let head = refs::read_head(&utf8_dir).ok()??;
    if let refs::HeadRef::Symbolic(branch) = head {
        Some(branch)
    } else {
        None
    }
}
/// Expand a leading tilde to the user's home directory.
///
/// Only a bare `"~"` or a `"~/..."` prefix is expanded. `"~user/..."`
/// forms are returned unchanged — previously any leading `~` was
/// replaced, so `"~foo"` was wrongly rewritten to `"<home>foo"`.
/// If the home directory cannot be determined, the input is returned as-is.
fn expand_tilde(path: &str) -> String {
    if path == "~" || path.starts_with("~/") {
        if let Some(home) = dirs::home_dir() {
            // Keep everything after the `~` (empty string or "/..."),
            // appended to the home directory path.
            return format!("{}{}", home.to_string_lossy(), &path[1..]);
        }
    }
    path.to_string()
}
/// Replicate a commit to an SSH remote: `scp` the exported CAR file over,
/// then `ssh` in to `ipfs dag import` it, `ipfs pin add` the commit, and
/// delete the temporary CAR.
///
/// Never returns `Err` — every failure is folded into the returned
/// `RemoteResult` so one bad remote does not abort replication to others.
fn pin_to_remote(
    ctx: &mut crate::output::CommandContext,
    name: &str,
    remote: &config::RemoteConfig,
    commit_cid: &str,
    car_path: &Path,
) -> RemoteResult {
    let ssh_host = match remote.host.as_deref() {
        Some(h) => h,
        None => {
            return RemoteResult {
                name: name.to_string(),
                success: false,
                error: Some(format!("Remote '{}' has no host configured", name)),
            };
        }
    };
    // Defaults: user "root", key ~/.ssh/id_rsa (tilde-expanded if configured).
    let ssh_user = remote.user.as_deref().unwrap_or("root");
    let ssh_key = remote
        .key_path
        .as_deref()
        .map(expand_tilde)
        .unwrap_or_else(|| {
            dirs::home_dir()
                .map(|h| h.join(".ssh/id_rsa").to_string_lossy().to_string())
                .unwrap_or_else(|| "~/.ssh/id_rsa".to_string())
        });
    let host_display = format!("{}@{}", ssh_user, ssh_host);
    // Millisecond timestamp makes the remote temp path unique per push.
    let ts = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    let remote_car_path = format!("/tmp/void-{}.car", ts);
    let scp_dest = format!("{}@{}:{}", ssh_user, ssh_host, remote_car_path);
    ctx.progress(format!("Copying CAR to {}...", host_display));
    // accept-new: trust unseen host keys on first connect without prompting.
    let scp_output = match Command::new("scp")
        .args([
            "-i",
            &ssh_key,
            "-o",
            "StrictHostKeyChecking=accept-new",
            &car_path.to_string_lossy(),
            &scp_dest,
        ])
        .output()
    {
        Ok(o) => o,
        Err(e) => {
            return RemoteResult {
                name: name.to_string(),
                success: false,
                error: Some(format!("failed to run scp: {}", e)),
            };
        }
    };
    if !scp_output.status.success() {
        let stderr = String::from_utf8_lossy(&scp_output.stderr);
        return RemoteResult {
            name: name.to_string(),
            success: false,
            error: Some(format!("SCP failed: {}", stderr.trim())),
        };
    }
    ctx.progress(format!("Importing and pinning on {}...", host_display));
    // NOTE(review): remote IPFS_PATH is hard-coded to /home/ipfs/.ipfs and
    // run via sudo — assumes a specific server layout; confirm it matches
    // provisioned remotes. The command is built by string interpolation;
    // commit_cid/remote_car_path are locally generated, but quoting them
    // would be safer if their sources ever change.
    let ssh_cmd = format!(
        "sudo IPFS_PATH=/home/ipfs/.ipfs ipfs dag import {} && \
sudo IPFS_PATH=/home/ipfs/.ipfs ipfs pin add {} && \
rm {}",
        remote_car_path, commit_cid, remote_car_path
    );
    let ssh_output = match Command::new("ssh")
        .args([
            "-i",
            &ssh_key,
            "-o",
            "StrictHostKeyChecking=accept-new",
            &format!("{}@{}", ssh_user, ssh_host),
            &ssh_cmd,
        ])
        .output()
    {
        Ok(o) => o,
        Err(e) => {
            return RemoteResult {
                name: name.to_string(),
                success: false,
                error: Some(format!("failed to run ssh: {}", e)),
            };
        }
    };
    if !ssh_output.status.success() {
        let stderr = String::from_utf8_lossy(&ssh_output.stderr);
        return RemoteResult {
            name: name.to_string(),
            success: false,
            error: Some(format!("SSH failed: {}", stderr.trim())),
        };
    }
    ctx.info(format!("Pinned on {}", host_display));
    RemoteResult {
        name: name.to_string(),
        success: true,
        error: None,
    }
}
/// Replicate a commit to a libp2p remote: connect to the configured peer,
/// authenticate with the local signing key, upload every object reachable
/// from the commit, then set the remote's head for the current branch.
///
/// Never returns `Err` — all failures become the returned `RemoteResult`.
fn pin_to_p2p_remote(
    ctx: &mut crate::output::CommandContext,
    name: &str,
    remote: &config::RemoteConfig,
    void_ctx: &void_core::VoidContext,
    commit_cid: &str,
) -> RemoteResult {
    // Peer identity comes either from explicit peerId + first addr, or by
    // splitting a combined multiaddr at its trailing "/p2p/<peer-id>" part.
    let (peer_id_str, addr_str) = if let (Some(pid), Some(a)) = (remote.peer_id.as_deref(), remote.addrs.first()) {
        (pid.to_string(), a.clone())
    } else if let Some(ref multiaddr) = remote.peer_multiaddr {
        match multiaddr.rfind("/p2p/") {
            Some(idx) => (multiaddr[idx + 5..].to_string(), multiaddr[..idx].to_string()),
            None => {
                return RemoteResult {
                    name: name.to_string(),
                    success: false,
                    error: Some(format!("Remote '{}': peerMultiaddr missing /p2p/ component", name)),
                };
            }
        }
    } else {
        return RemoteResult {
            name: name.to_string(),
            success: false,
            error: Some(format!("Remote '{}' has no peerId or peerMultiaddr configured", name)),
        };
    };
    let peer_id: libp2p::PeerId = match peer_id_str.as_str().parse() {
        Ok(p) => p,
        Err(e) => {
            return RemoteResult {
                name: name.to_string(),
                success: false,
                error: Some(format!("invalid peerId: {e}")),
            };
        }
    };
    let addr: libp2p::Multiaddr = match addr_str.as_str().parse() {
        Ok(a) => a,
        Err(e) => {
            return RemoteResult {
                name: name.to_string(),
                success: false,
                error: Some(format!("invalid multiaddr: {e}")),
            };
        }
    };
    // `.min(len)` guards the 12-byte display slice against short peer ids.
    ctx.info(format!("Syncing to remote '{}' ({})...", name, &peer_id_str[..12.min(peer_id_str.len())]));
    // Fresh single-threaded runtime: this is a sync fn, so block_on below.
    let rt = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(rt) => rt,
        Err(e) => {
            return RemoteResult {
                name: name.to_string(),
                success: false,
                error: Some(format!("runtime: {e}")),
            };
        }
    };
    let result = rt.block_on(async {
        let client = void_daemon::DaemonClient::connect(addr, peer_id).await
            .map_err(|e| format!("connect failed: {e}"))?;
        let signing_key = crate::context::load_signing_key()
            .map_err(|e| format!("load identity: {e}"))?;
        let key_bytes = signing_key.as_bytes();
        let dalek_key = ed25519_dalek::SigningKey::from_bytes(key_bytes);
        client.authenticate(&dalek_key).await
            .map_err(|e| format!("auth failed: {e}"))?;
        ctx.progress(format!("Authenticated with {}. Syncing blocks...", name));
        let commit_cid_obj = void_core::cid::parse(commit_cid)
            .map_err(|e| format!("invalid CID: {e}"))?;
        use void_core::pipeline::collect_local_objects;
        // Upload every locally-held object reachable from the commit,
        // one put per object; first failure aborts the whole sync.
        let objects = collect_local_objects(void_ctx, &commit_cid_obj)
            .map_err(|e| format!("collect objects: {e}"))?;
        let mut synced = 0usize;
        for (cid, data) in &objects {
            client.put(cid.to_bytes(), data.clone()).await
                .map_err(|e| format!("put failed: {e}"))?;
            synced += 1;
        }
        // Advance the remote's head for this repo/branch to the new commit.
        let repo_name = void_ctx.repo.name.clone()
            .unwrap_or_else(|| "unnamed".to_string());
        let branch = {
            let void_dir_std = void_ctx.paths.void_dir.as_std_path();
            get_current_branch(void_dir_std).unwrap_or_else(|| "trunk".to_string())
        };
        client.set_head(&repo_name, &branch, commit_cid).await
            .map_err(|e| format!("set_head failed: {e}"))?;
        Ok::<usize, String>(synced)
    });
    match result {
        Ok(synced) => {
            ctx.info(format!("Synced {} blocks to {}", synced, name));
            RemoteResult {
                name: name.to_string(),
                success: true,
                error: None,
            }
        }
        Err(e) => RemoteResult {
            name: name.to_string(),
            success: false,
            error: Some(e),
        },
    }
}
/// Replicate the pushed commit to configured remotes.
///
/// Returns `Ok(None)` when there are no remotes to target; otherwise a
/// per-remote `RemoteResult` list. P2P remotes (those with a peerId or
/// peerMultiaddr) sync directly; SSH remotes share a single CAR export.
/// Individual remote failures are recorded, warned about, and do NOT
/// fail the command; only a missing named remote is a hard error.
fn replicate_to_remotes(
    ctx: &mut crate::output::CommandContext,
    void_ctx: &void_core::VoidContext,
    commit_cid: &str,
    commit_arg: Option<&str>,
    specific_remote: Option<&str>,
) -> Result<Option<Vec<RemoteResult>>, CliError> {
    let void_dir = &void_ctx.paths.void_dir;
    let cfg = config::load(void_dir.as_std_path())
        .map_err(|e| CliError::internal(format!("failed to load config: {e}")))?;
    if cfg.remote.is_empty() {
        return Ok(None);
    }
    // --remote NAME narrows to one remote; unknown names are a hard error.
    let target_remotes: Vec<(&String, &config::RemoteConfig)> = if let Some(name) = specific_remote
    {
        match cfg.remote.get_key_value(name) {
            Some(entry) => vec![entry],
            None => {
                return Err(CliError::not_found(format!("Remote '{}' not found", name)));
            }
        }
    } else {
        cfg.remote.iter().collect()
    };
    if target_remotes.is_empty() {
        return Ok(None);
    }
    // Split by transport: presence of p2p identity fields selects the p2p path.
    let (p2p_remotes, ssh_remotes): (Vec<_>, Vec<_>) = target_remotes
        .iter()
        .partition(|(_, r)| r.peer_id.is_some() || r.peer_multiaddr.is_some());
    let names: Vec<&str> = target_remotes.iter().map(|(n, _)| n.as_str()).collect();
    ctx.progress(format!("Replicating to: {}", names.join(", ")));
    let mut results = Vec::with_capacity(target_remotes.len());
    for (name, remote_cfg) in &p2p_remotes {
        let r = pin_to_p2p_remote(ctx, name, remote_cfg, void_ctx, commit_cid);
        results.push(r);
    }
    if !ssh_remotes.is_empty() {
        // Export the CAR once and reuse it for every SSH remote.
        ctx.progress("Exporting CAR for SSH remote replication...");
        let ts = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis();
        let temp_car = std::env::temp_dir().join(format!("void-push-{}.car", ts));
        let export_opts = ExportCarOptions {
            ctx: void_ctx.clone(),
            commit_cid: commit_arg.map(String::from),
        };
        match export_commit_to_car(export_opts, &temp_car) {
            Ok(export_result) => {
                ctx.progress(format!(
                    "Exported {} blocks ({} bytes)",
                    export_result.blocks_exported, export_result.car_size
                ));
                for (name, remote_cfg) in &ssh_remotes {
                    let r = pin_to_remote(ctx, name, remote_cfg, commit_cid, &temp_car);
                    results.push(r);
                }
                // Best-effort cleanup of the local temp CAR.
                let _ = std::fs::remove_file(&temp_car);
            }
            Err(e) => {
                // SSH remotes are skipped (not marked failed) when the
                // export itself fails; the warning is the only record.
                ctx.warn(format!("CAR export failed: {e} — skipping SSH remotes"));
            }
        }
    }
    let failures: Vec<&RemoteResult> = results.iter().filter(|r| !r.success).collect();
    if !failures.is_empty() {
        ctx.warn(format!("{} remote(s) failed", failures.len()));
    }
    Ok(Some(results))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::output::CliOptions;
    use std::fs;
    use tempfile::tempdir;
    use void_core::crypto;

    /// CLI options forcing human (non-JSON) output for tests.
    fn default_opts() -> CliOptions {
        CliOptions {
            human: true,
            ..Default::default()
        }
    }

    /// Build a minimal on-disk repo: a `.void` dir with objects/ and
    /// refs/heads/, a test manifest/identity, and a config.json holding a
    /// fresh repo secret. Returned temp dirs and guard must stay alive
    /// for the duration of the test.
    fn setup_test_repo() -> (tempfile::TempDir, std::path::PathBuf, tempfile::TempDir, crate::context::VoidHomeGuard) {
        let dir = tempdir().unwrap();
        let void_dir = dir.path().join(".void");
        fs::create_dir_all(void_dir.join("objects")).unwrap();
        fs::create_dir_all(void_dir.join("refs/heads")).unwrap();
        let key = crypto::generate_key();
        let home = tempdir().unwrap();
        let guard = crate::context::setup_test_manifest(&void_dir, &key, home.path());
        let repo_secret = hex::encode(crypto::generate_key());
        fs::write(
            void_dir.join("config.json"),
            format!(r#"{{"repoSecret": "{}"}}"#, repo_secret),
        )
        .unwrap();
        (dir, void_dir, home, guard)
    }

    /// Default args must match the documented constants and leave all
    /// optional fields unset.
    #[test]
    fn test_push_args_default() {
        let args = PushArgs::default();
        assert_eq!(args.backend, "daemon");
        assert_eq!(args.kubo_url, DEFAULT_KUBO_URL);
        assert_eq!(args.timeout_ms, DEFAULT_TIMEOUT_MS);
        assert!(args.pin); assert!(args.commit.is_none());
        assert!(args.gateway.is_none());
        assert!(!args.local); assert!(args.remote.is_none());
    }

    /// camelCase field names; `remotes: None` must be omitted entirely.
    #[test]
    fn test_push_output_serialization() {
        let output = PushOutput {
            commit: "bafytest123456789abcdef".to_string(),
            objects_pushed: 42,
            pinned: 42,
            commits_pushed: 1,
            remotes: None,
        };
        let json = serde_json::to_string(&output).unwrap();
        assert!(json.contains("\"commit\":\"bafytest123456789abcdef\""));
        assert!(json.contains("\"objectsPushed\":42"));
        assert!(json.contains("\"pinned\":42"));
        assert!(!json.contains("\"remotes\""));
    }

    /// `pinned: 0` is still serialized (only `remotes` is skip-if-none).
    #[test]
    fn test_push_output_serialization_no_pin() {
        let output = PushOutput {
            commit: "bafytest123".to_string(),
            objects_pushed: 10,
            pinned: 0,
            commits_pushed: 1,
            remotes: None,
        };
        let json = serde_json::to_string(&output).unwrap();
        assert!(json.contains("\"objectsPushed\":10"));
        assert!(json.contains("\"pinned\":0"));
    }

    /// Remote results serialize with name/success, plus error on failure.
    #[test]
    fn test_push_output_serialization_with_remotes() {
        let output = PushOutput {
            commit: "bafytest123".to_string(),
            objects_pushed: 10,
            pinned: 10,
            commits_pushed: 1,
            remotes: Some(vec![
                RemoteResult {
                    name: "origin".to_string(),
                    success: true,
                    error: None,
                },
                RemoteResult {
                    name: "backup".to_string(),
                    success: false,
                    error: Some("connection refused".to_string()),
                },
            ]),
        };
        let json = serde_json::to_string(&output).unwrap();
        assert!(json.contains("\"remotes\""));
        assert!(json.contains("\"name\":\"origin\""));
        assert!(json.contains("\"success\":true"));
        assert!(json.contains("\"name\":\"backup\""));
        assert!(json.contains("\"success\":false"));
        assert!(json.contains("\"error\":\"connection refused\""));
    }

    /// A `None` error must not appear as `"error":null` in JSON.
    #[test]
    fn test_remote_result_skips_none_error() {
        let result = RemoteResult {
            name: "origin".to_string(),
            success: true,
            error: None,
        };
        let json = serde_json::to_string(&result).unwrap();
        assert!(!json.contains("\"error\""));
    }

    /// Pushing from a directory with no repo must fail (open_repo errors).
    #[test]
    fn test_push_not_initialized() {
        let dir = tempdir().unwrap();
        let args = PushArgs::default();
        let result = run(dir.path(), args, &default_opts());
        assert!(result.is_err());
    }

    /// A repo skeleton with no HEAD/commit must also fail to push.
    #[test]
    fn test_push_no_head() {
        let (dir, _void_dir, _home, _guard) = setup_test_repo();
        let args = PushArgs::default();
        let result = run(dir.path(), args, &default_opts());
        assert!(result.is_err());
    }
}