use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use camino::Utf8PathBuf;
use serde::Serialize;
use void_core::pipeline::{pull_repo, CloneMode, PullOptions};
use void_core::transport::IpfsBackend;
use void_core::{cid, refs};
use crate::context::{open_repo, void_err_to_cli};
use crate::observer::ProgressObserver;
use crate::output::{run_command, CliError, CliOptions};
/// Default Kubo HTTP API endpoint (local daemon on the standard port).
const DEFAULT_KUBO_URL: &str = "http://127.0.0.1:5001";
/// Default network timeout for pull operations, in milliseconds (30 s).
const DEFAULT_TIMEOUT_MS: u64 = 30000;
/// Parsed command-line arguments for `void pull`.
#[derive(Debug)]
pub struct PullArgs {
    /// Explicit commit CID to pull. When `None`, the source is resolved from
    /// the local registry or a configured remote (see `resolve_pull_source`).
    pub commit: Option<String>,
    /// Transport backend name: `"kubo"` or `"gateway"`. Any other value (or
    /// `None`) makes `run` start an embedded node instead.
    pub backend: Option<String>,
    /// Kubo HTTP API endpoint, used when the backend is `"kubo"`.
    pub kubo_url: String,
    /// Gateway base URL; required when backend is `"gateway"`.
    pub gateway_url: Option<String>,
    /// Network timeout for the pull pipeline, in milliseconds.
    pub timeout_ms: u64,
    /// Clone mode string: `"depth1"`, `"full"`, `"lazy"`, or `"virtual"`.
    pub mode: String,
    /// Named remote to query for a head CID when no commit CID is given.
    pub remote: Option<String>,
}
impl Default for PullArgs {
    /// Defaults: no explicit commit/backend/remote, local Kubo API URL,
    /// 30-second timeout, `"depth1"` clone mode.
    fn default() -> Self {
        Self {
            commit: None,
            backend: None,
            kubo_url: DEFAULT_KUBO_URL.to_string(),
            gateway_url: None,
            timeout_ms: DEFAULT_TIMEOUT_MS,
            mode: "depth1".to_string(),
            remote: None,
        }
    }
}
/// JSON-serializable result of a pull, emitted in camelCase.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PullOutput {
    /// Where the commit CID came from (registry record name or
    /// `repo@remote`); absent for an explicit `--commit` argument.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
    /// Whether the local head already matched the resolved CID; only
    /// reported when `source` is present.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub up_to_date: Option<bool>,
    /// The commit CID that was pulled (or matched).
    pub commit: String,
    /// Metadata CID returned by the pipeline; empty on the up-to-date path.
    pub metadata: String,
    /// Effective clone mode as a lowercase string.
    pub mode: String,
    /// Shards actually fetched during this pull.
    pub shards_fetched: usize,
    /// Total shards in the commit.
    pub shards_total: usize,
}
/// Parses a clone-mode string (case-insensitive) into a [`CloneMode`].
///
/// Accepts `"depth1"`, `"full"`, `"lazy"`, or `"virtual"`; anything else is
/// an invalid-arguments error listing the accepted modes.
fn parse_mode(mode: &str) -> Result<CloneMode, CliError> {
    match mode.to_lowercase().as_str() {
        "depth1" => Ok(CloneMode::Depth1),
        "full" => Ok(CloneMode::Full),
        "lazy" => Ok(CloneMode::Lazy),
        "virtual" => Ok(CloneMode::Virtual),
        // Keep this list in sync with the accepted arms above: the original
        // message omitted 'virtual' even though it is accepted.
        _ => Err(CliError::invalid_args(format!(
            "invalid mode '{}': expected 'depth1', 'full', 'lazy', or 'virtual'",
            mode
        ))),
    }
}
/// Returns the canonical lowercase CLI name for a [`CloneMode`].
fn mode_to_string(mode: CloneMode) -> String {
    let name = match mode {
        CloneMode::Depth1 => "depth1",
        CloneMode::Full => "full",
        CloneMode::Lazy => "lazy",
        CloneMode::Virtual => "virtual",
    };
    name.to_string()
}
/// Builds an HTTP [`IpfsBackend`] from the CLI arguments.
///
/// `None` and `"kubo"` both select the Kubo API backend; `"gateway"`
/// additionally requires `gateway_url`. Any other name is rejected.
fn build_backend(args: &PullArgs) -> Result<IpfsBackend, CliError> {
    match args.backend.as_deref() {
        None | Some("kubo") => Ok(IpfsBackend::Kubo {
            api: args.kubo_url.clone(),
        }),
        Some("gateway") => args
            .gateway_url
            .as_ref()
            .map(|url| IpfsBackend::Gateway { base: url.clone() })
            .ok_or_else(|| {
                CliError::invalid_args("--gateway URL is required when backend is 'gateway'")
            }),
        Some(other) => Err(CliError::invalid_args(format!(
            "invalid backend '{}': expected 'kubo' or 'gateway'",
            other
        ))),
    }
}
fn resolve_pull_source(
args: &PullArgs,
void_dir: &Path,
) -> Result<(String, Option<String>), CliError> {
match &args.commit {
Some(source) if !source.is_empty() => {
if source.starts_with("bafy") || source.starts_with("Qm") {
return Ok((source.clone(), None));
}
Err(CliError::invalid_args(format!(
"invalid pull source '{}': expected a commit CID (bafy...)",
source
)))
}
_ => {
if let Ok(config) = void_core::config::load(void_dir) {
if let Some(ref repo_id) = config.repo_id {
if let Ok(Some(record)) = crate::registry::load_record(repo_id) {
let branch =
get_current_branch(void_dir).unwrap_or_else(|| "trunk".to_string());
if let Some(cid) = record.head.get(&branch) {
return Ok((cid.clone(), Some(record.name.clone())));
}
}
}
}
if let Some(ref remote_name) = args.remote {
let resolved = crate::remotes::resolve_remote(remote_name, None)
.map_err(|e| CliError::internal(format!("remote: {e}")))?;
let repo_name = void_core::config::load(void_dir)
.ok()
.and_then(|c| c.repo_name.clone())
.unwrap_or_else(|| "unnamed".to_string());
let branch = get_current_branch(void_dir).unwrap_or_else(|| "trunk".to_string());
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| CliError::internal(format!("runtime: {e}")))?;
let cid = rt.block_on(async {
let pid: libp2p::PeerId = resolved.peer_id.parse()
.map_err(|e| format!("invalid peer_id: {e}"))?;
let addr: libp2p::Multiaddr = resolved.addr.parse()
.map_err(|e| format!("invalid addr: {e}"))?;
let client = void_daemon::DaemonClient::connect(addr, pid).await
.map_err(|e| format!("connect: {e}"))?;
let sk = crate::context::load_signing_key()
.map_err(|e| format!("identity: {e}"))?;
let dk = ed25519_dalek::SigningKey::from_bytes(sk.as_bytes());
client.authenticate(&dk).await.map_err(|e| format!("auth: {e}"))?;
client.get_head(&repo_name, &branch).await.map_err(|e| format!("get_head: {e}"))
}).map_err(|e| CliError::internal(format!("remote pull: {e}")))?;
return Ok((cid, Some(format!("{}@{}", repo_name, remote_name))));
}
if let Ok(config) = void_core::config::load(void_dir) {
for (name, remote) in &config.remote {
if remote.peer_id.is_some() || remote.peer_multiaddr.is_some() {
if let Ok(resolved) = crate::remotes::resolve_remote(name, Some(&config)) {
let repo_name = config.repo_name.clone().unwrap_or_else(|| "unnamed".to_string());
let branch = get_current_branch(void_dir).unwrap_or_else(|| "trunk".to_string());
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| CliError::internal(format!("runtime: {e}")))?;
if let Ok(cid) = rt.block_on(async {
let pid: libp2p::PeerId = resolved.peer_id.parse()
.map_err(|e| format!("peer_id: {e}"))?;
let addr: libp2p::Multiaddr = resolved.addr.parse()
.map_err(|e| format!("addr: {e}"))?;
let client = void_daemon::DaemonClient::connect(addr, pid).await
.map_err(|e| format!("connect: {e}"))?;
let sk = crate::context::load_signing_key()
.map_err(|e| format!("identity: {e}"))?;
let dk = ed25519_dalek::SigningKey::from_bytes(sk.as_bytes());
client.authenticate(&dk).await.map_err(|e| format!("auth: {e}"))?;
client.get_head(&repo_name, &branch).await.map_err(|e| format!("{e}"))
}) {
return Ok((cid, Some(format!("{}@{}", repo_name, name))));
}
}
}
}
}
Err(CliError::invalid_args(
"commit CID required, or configure a remote with 'void remote add'",
))
}
}
}
/// Reads `<void_dir>/HEAD` and extracts the current branch name.
///
/// Returns `Some(branch)` for a symbolic ref of the form
/// `ref: refs/heads/<branch>`; returns `None` when the file is missing,
/// unreadable, or a detached head (raw CID content).
fn get_current_branch(void_dir: &Path) -> Option<String> {
    std::fs::read_to_string(void_dir.join("HEAD"))
        .ok()
        .and_then(|contents| {
            contents
                .trim()
                .strip_prefix("ref: refs/heads/")
                .map(str::to_string)
        })
}
/// Compares the local HEAD commit CID against `commit_cid`.
///
/// Returns `Some(true)` when they match, `Some(false)` when they differ,
/// and `None` when the head cannot be resolved (no head yet, bad path,
/// or an unparsable CID).
fn check_up_to_date(void_dir: &Path, commit_cid: &str) -> Option<bool> {
    let utf8_dir = Utf8PathBuf::try_from(void_dir.to_path_buf()).ok()?;
    let head = refs::resolve_head(&utf8_dir).ok().flatten()?;
    let parsed = cid::from_bytes(head.as_bytes()).ok()?;
    Some(parsed.to_string() == commit_cid)
}
/// Entry point for `void pull`.
///
/// Resolves a pull source, short-circuits with "already up to date" when an
/// auto-resolved source matches the local head, otherwise runs the pull
/// pipeline over the selected backend (explicit kubo/gateway HTTP, or an
/// embedded daemon node by default).
pub fn run(cwd: &Path, args: PullArgs, opts: &CliOptions) -> Result<(), CliError> {
    run_command("pull", opts, |ctx| {
        let repo = open_repo(cwd)?;
        let void_dir = repo.void_dir();
        // source_label is Some only when the CID came from registry/remote
        // resolution rather than an explicit --commit argument.
        let (commit_cid, source_label) = resolve_pull_source(&args, void_dir.as_std_path())?;
        let has_source = source_label.is_some();
        if has_source {
            // Only auto-resolved sources get the fast up-to-date path; an
            // explicitly given CID is always pulled.
            if let Some(true) = check_up_to_date(void_dir.as_std_path(), &commit_cid) {
                if !ctx.use_json() {
                    ctx.info("Already up to date.");
                }
                return Ok(PullOutput {
                    source: source_label,
                    up_to_date: Some(true),
                    commit: commit_cid,
                    metadata: String::new(),
                    mode: args.mode.clone(),
                    shards_fetched: 0,
                    shards_total: 0,
                });
            }
        }
        let mode = parse_mode(&args.mode)?;
        // Explicit kubo/gateway backends speak HTTP; any other value starts
        // an embedded node and hands it to the pipeline via `remote`.
        let (backend, daemon_remote, _daemon_rt) = match args.backend.as_deref() {
            Some("kubo") => (build_backend(&args)?, None, None),
            Some("gateway") => (build_backend(&args)?, None, None),
            _ => {
                ctx.progress("Starting embedded node...");
                let (remote, rt) =
                    crate::daemon::start_daemon(repo.void_dir().as_std_path())
                        .map_err(|e| crate::output::CliError::internal(e))?;
                (
                    // Placeholder backend: the pipeline uses `remote` when set.
                    IpfsBackend::Kubo { api: "unused".into() },
                    Some(remote),
                    // _daemon_rt keeps the runtime alive for the whole pull.
                    Some(rt),
                )
            }
        };
        let timeout = Duration::from_millis(args.timeout_ms);
        ctx.progress(format!(
            "Pulling commit {}...",
            // Truncate long CIDs for display; min() guards short strings.
            &commit_cid[..12.min(commit_cid.len())]
        ));
        // Visible progress bar only in human mode (hidden for JSON output).
        let progress_observer = if opts.is_human_mode() {
            ProgressObserver::new("Pulling...")
        } else {
            ProgressObserver::new_hidden()
        };
        let observer: Arc<dyn void_core::support::events::VoidObserver> =
            Arc::new(progress_observer);
        let pull_opts = PullOptions {
            ctx: repo.context().clone(),
            commit_cid,
            backend,
            timeout,
            mode,
            observer: Some(observer),
            remote: daemon_remote,
        };
        let result = pull_repo(pull_opts).map_err(void_err_to_cli)?;
        // A pulled repo manifest may carry identity fields the local config
        // is missing; backfill them best-effort.
        if result.repo_manifest_cid.is_some() {
            backfill_config_identity(void_dir.as_std_path());
        }
        if !ctx.use_json() {
            let short_cid = if result.commit_cid.len() > 12 {
                &result.commit_cid[..12]
            } else {
                &result.commit_cid
            };
            if let Some(ref source) = source_label {
                ctx.info(format!("Pulled commit {}... from {}", short_cid, source));
            } else {
                ctx.info(format!("Pulled commit {}...", short_cid));
            }
            ctx.info(format!(
                "Fetched {}/{} shards (mode: {})",
                result.shards_fetched,
                result.shards_total,
                mode_to_string(result.mode)
            ));
        }
        Ok(PullOutput {
            // up_to_date is reported only for auto-resolved sources.
            up_to_date: if has_source { Some(false) } else { None },
            source: source_label,
            commit: result.commit_cid,
            metadata: result.metadata_cid,
            mode: mode_to_string(result.mode),
            shards_fetched: result.shards_fetched,
            shards_total: result.shards_total,
        })
    })
}
/// Copies repo identity (id and name) from a pulled repo manifest into the
/// local config, but only when the config has no `repo_id` yet.
///
/// Every step is best-effort: any missing manifest, missing id, load or
/// save failure silently leaves the config untouched.
fn backfill_config_identity(void_dir: &Path) {
    let manifest = match void_core::collab::manifest::load_manifest(void_dir) {
        Ok(Some(manifest)) => manifest,
        _ => return,
    };
    let repo_id = match manifest.repo_id {
        Some(ref id) => id.clone(),
        None => return,
    };
    if let Ok(mut config) = void_core::config::load(void_dir) {
        // Never overwrite an identity the repo already has.
        if config.repo_id.is_none() {
            config.repo_id = Some(repo_id);
            config.repo_name = manifest.repo_name.clone();
            let _ = void_core::config::save(void_dir, &config);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::output::CliOptions;

    /// Human-mode options used by the `run` tests.
    fn default_opts() -> CliOptions {
        CliOptions {
            human: true,
            ..Default::default()
        }
    }

    #[test]
    fn test_parse_mode_full() {
        assert_eq!(parse_mode("full").unwrap(), CloneMode::Full);
        assert_eq!(parse_mode("Full").unwrap(), CloneMode::Full);
        assert_eq!(parse_mode("FULL").unwrap(), CloneMode::Full);
    }

    #[test]
    fn test_parse_mode_depth1() {
        assert_eq!(parse_mode("depth1").unwrap(), CloneMode::Depth1);
        assert_eq!(parse_mode("Depth1").unwrap(), CloneMode::Depth1);
    }

    #[test]
    fn test_parse_mode_lazy() {
        assert_eq!(parse_mode("lazy").unwrap(), CloneMode::Lazy);
        assert_eq!(parse_mode("LAZY").unwrap(), CloneMode::Lazy);
    }

    // Previously untested even though parse_mode accepts "virtual".
    #[test]
    fn test_parse_mode_virtual() {
        assert_eq!(parse_mode("virtual").unwrap(), CloneMode::Virtual);
        assert_eq!(parse_mode("Virtual").unwrap(), CloneMode::Virtual);
    }

    #[test]
    fn test_parse_mode_invalid() {
        assert!(parse_mode("invalid").is_err());
        assert!(parse_mode("").is_err());
    }

    #[test]
    fn test_mode_to_string() {
        assert_eq!(mode_to_string(CloneMode::Depth1), "depth1");
        assert_eq!(mode_to_string(CloneMode::Full), "full");
        assert_eq!(mode_to_string(CloneMode::Lazy), "lazy");
        // Round-trip coverage for the fourth variant was missing.
        assert_eq!(mode_to_string(CloneMode::Virtual), "virtual");
    }

    #[test]
    fn test_build_backend_kubo_default() {
        let args = PullArgs {
            commit: Some("bafy...".to_string()),
            backend: None,
            kubo_url: "http://localhost:5001".to_string(),
            gateway_url: None,
            timeout_ms: 30000,
            mode: "depth1".to_string(),
            remote: None,
        };
        let backend = build_backend(&args).unwrap();
        match backend {
            IpfsBackend::Kubo { api } => assert_eq!(api, "http://localhost:5001"),
            _ => panic!("expected Kubo backend"),
        }
    }

    #[test]
    fn test_build_backend_kubo_explicit() {
        let args = PullArgs {
            commit: Some("bafy...".to_string()),
            backend: Some("kubo".to_string()),
            kubo_url: "http://127.0.0.1:5001".to_string(),
            gateway_url: None,
            timeout_ms: 30000,
            mode: "depth1".to_string(),
            remote: None,
        };
        let backend = build_backend(&args).unwrap();
        match backend {
            IpfsBackend::Kubo { api } => assert_eq!(api, "http://127.0.0.1:5001"),
            _ => panic!("expected Kubo backend"),
        }
    }

    #[test]
    fn test_build_backend_gateway() {
        let args = PullArgs {
            commit: Some("bafy...".to_string()),
            backend: Some("gateway".to_string()),
            kubo_url: DEFAULT_KUBO_URL.to_string(),
            gateway_url: Some("https://ipfs.io".to_string()),
            timeout_ms: 30000,
            mode: "depth1".to_string(),
            remote: None,
        };
        let backend = build_backend(&args).unwrap();
        match backend {
            IpfsBackend::Gateway { base } => assert_eq!(base, "https://ipfs.io"),
            _ => panic!("expected Gateway backend"),
        }
    }

    #[test]
    fn test_build_backend_gateway_missing_url() {
        let args = PullArgs {
            commit: Some("bafy...".to_string()),
            backend: Some("gateway".to_string()),
            kubo_url: DEFAULT_KUBO_URL.to_string(),
            gateway_url: None,
            timeout_ms: 30000,
            mode: "depth1".to_string(),
            remote: None,
        };
        assert!(build_backend(&args).is_err());
    }

    #[test]
    fn test_build_backend_invalid() {
        let args = PullArgs {
            commit: Some("bafy...".to_string()),
            backend: Some("invalid".to_string()),
            kubo_url: DEFAULT_KUBO_URL.to_string(),
            gateway_url: None,
            timeout_ms: 30000,
            mode: "depth1".to_string(),
            remote: None,
        };
        assert!(build_backend(&args).is_err());
    }

    #[test]
    fn test_pull_args_default() {
        let args = PullArgs::default();
        assert!(args.commit.is_none());
        assert!(args.backend.is_none());
        assert_eq!(args.kubo_url, DEFAULT_KUBO_URL);
        assert!(args.gateway_url.is_none());
        assert_eq!(args.timeout_ms, DEFAULT_TIMEOUT_MS);
        assert_eq!(args.mode, "depth1");
    }

    // With both optionals None, neither "source" nor "upToDate" may appear.
    #[test]
    fn test_pull_output_serialization() {
        let output = PullOutput {
            source: None,
            up_to_date: None,
            commit: "bafyabc123".to_string(),
            metadata: "bafymeta456".to_string(),
            mode: "depth1".to_string(),
            shards_fetched: 10,
            shards_total: 15,
        };
        let json = serde_json::to_string(&output).unwrap();
        assert!(!json.contains("\"source\""));
        assert!(!json.contains("\"upToDate\""));
        assert!(json.contains("\"commit\":\"bafyabc123\""));
        assert!(json.contains("\"metadata\":\"bafymeta456\""));
        assert!(json.contains("\"shardsFetched\":10"));
        assert!(json.contains("\"shardsTotal\":15"));
        assert!(json.contains("\"mode\":\"depth1\""));
    }

    #[test]
    fn test_pull_output_serialization_with_source() {
        let output = PullOutput {
            source: Some("alice/project/main".to_string()),
            up_to_date: Some(false),
            commit: "bafyabc123".to_string(),
            metadata: "bafymeta456".to_string(),
            mode: "depth1".to_string(),
            shards_fetched: 10,
            shards_total: 15,
        };
        let json = serde_json::to_string(&output).unwrap();
        assert!(json.contains("\"source\":\"alice/project/main\""));
        assert!(json.contains("\"upToDate\":false"));
        assert!(json.contains("\"commit\":\"bafyabc123\""));
        assert!(json.contains("\"metadata\":\"bafymeta456\""));
    }

    // No commit CID, no registry record, no remotes => resolution must fail.
    #[test]
    fn test_run_missing_commit() {
        use std::fs;
        use tempfile::tempdir;
        let dir = tempdir().unwrap();
        let void_dir = dir.path().join(".void");
        fs::create_dir_all(&void_dir).unwrap();
        let key = hex::decode("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")
            .unwrap();
        let key: [u8; 32] = key.try_into().unwrap();
        let home = tempfile::tempdir().unwrap();
        let _guard = crate::context::setup_test_manifest(&void_dir, &key, home.path());
        let args = PullArgs {
            commit: None,
            ..Default::default()
        };
        let result = run(dir.path(), args, &default_opts());
        assert!(result.is_err());
    }

    // A directory with no .void repo must fail at open_repo.
    #[test]
    fn test_run_not_initialized() {
        use tempfile::tempdir;
        let dir = tempdir().unwrap();
        let args = PullArgs {
            commit: Some("bafyabc123".to_string()),
            ..Default::default()
        };
        let result = run(dir.path(), args, &default_opts());
        assert!(result.is_err());
    }
}