use super::cache::CacheSource;
use super::substitution::{SubstitutionOutcome, SubstitutionPlan, SubstitutionStep};
use crate::core::types::Machine;
use crate::transport;
use std::path::Path;
/// Summary returned by `pull_from_cache` once the pull command has succeeded.
#[derive(Debug, Clone)]
pub struct CachePullResult {
/// The store hash exactly as it was passed to `pull_from_cache`
/// (any `blake3:` prefix is kept).
pub store_hash: String,
/// Display form of the final store path: the store directory joined with
/// the hash stripped of its `blake3:` prefix.
pub store_path: String,
/// Size reported by `gc_exec::dir_size` for the final store path after the
/// pull. NOTE(review): this is measured on disk, not on the wire — confirm
/// callers do not treat it as network traffic.
pub bytes_transferred: u64,
/// Whether the pulled (staged) content hashed to the expected store hash.
pub verified: bool,
}
/// Pulls one store entry from a cache source into the store directory.
///
/// The entry is first copied into a `.staging-<hash>` directory next to the
/// final location, hashed, and then renamed into place so that a partially
/// transferred entry never appears under its final name.
///
/// # Arguments
///
/// * `source` - cache to pull from (SSH or local directory).
/// * `store_hash` - hash of the entry, with or without the `blake3:` prefix.
/// * `store_dir` - directory that holds store entries.
/// * `machine` - machine the pull command is executed on via the transport.
/// * `timeout_secs` - optional timeout forwarded to the transport.
///
/// # Errors
///
/// Returns a message when the transport fails, when the pull command exits
/// unsuccessfully, or when the staged directory cannot be renamed into the
/// store. The staging directory is removed (best effort) on every error path.
///
/// A hash mismatch is *not* an error: it is reported through
/// [`CachePullResult::verified`], and the caller must check that flag.
pub fn pull_from_cache(
    source: &CacheSource,
    store_hash: &str,
    store_dir: &Path,
    machine: &Machine,
    timeout_secs: Option<u64>,
) -> Result<CachePullResult, String> {
    let hash_bare = store_hash.strip_prefix("blake3:").unwrap_or(store_hash);
    let target = store_dir.join(hash_bare);
    let staging = store_dir.join(format!(".staging-{hash_bare}"));

    // Cleanup is deliberately best effort: a failure to remove the staging
    // directory must not mask the error that is actually being reported.
    let discard_staging = || {
        let _ = std::fs::remove_dir_all(&staging);
    };

    // Leftovers from an earlier interrupted pull would be merged into this
    // one by rsync/cp and break verification, so start from a clean slate.
    discard_staging();

    let pull_cmd = pull_command(source, store_hash, &staging);
    let output = transport::exec_script_timeout(machine, &pull_cmd, timeout_secs).map_err(|e| {
        // A timeout or transport failure can leave a partial transfer behind.
        discard_staging();
        format!("cache pull failed: {e}")
    })?;
    if !output.success() {
        discard_staging();
        return Err(format!(
            "cache pull exit code {}: {}",
            output.exit_code,
            output.stderr.trim()
        ));
    }

    // Hash the staged copy before it is moved; the result is only reported.
    // NOTE(review): content that fails verification is still promoted into
    // the store below — confirm whether it should be discarded instead.
    let verified = verify_pulled_content(&staging, store_hash);

    if target.exists() {
        // The entry is already present: keep it and drop the fresh copy.
        discard_staging();
    } else {
        std::fs::rename(&staging, &target).map_err(|e| {
            discard_staging();
            format!("atomic move staging → store: {e}")
        })?;
    }

    let bytes = super::gc_exec::dir_size(&target);
    Ok(CachePullResult {
        store_hash: store_hash.to_string(),
        store_path: target.display().to_string(),
        bytes_transferred: bytes,
        verified,
    })
}
/// Pushes one store entry from `store_dir` to a cache source.
///
/// The command produced by [`push_command`] is executed on `machine` through
/// the transport, with `timeout_secs` forwarded as the timeout.
///
/// # Errors
///
/// Returns a message when the transport fails or when the push command
/// exits unsuccessfully (the message carries the exit code and trimmed stderr).
pub fn push_to_cache(
    source: &CacheSource,
    store_hash: &str,
    store_dir: &Path,
    machine: &Machine,
    timeout_secs: Option<u64>,
) -> Result<(), String> {
    let script = push_command(source, store_hash, store_dir);
    let outcome = transport::exec_script_timeout(machine, &script, timeout_secs)
        .map_err(|e| format!("cache push failed: {e}"))?;

    if outcome.success() {
        return Ok(());
    }

    Err(format!(
        "cache push exit code {}: {}",
        outcome.exit_code,
        outcome.stderr.trim()
    ))
}
/// Checks whether a cache source holds a directory for `store_hash`.
///
/// Runs the existence probe on `machine` with a fixed 30-second timeout.
/// Returns `Ok(true)` when the probe exits successfully and `Ok(false)`
/// otherwise.
///
/// # Errors
///
/// Returns a message only when the transport itself fails.
pub fn verify_remote_entry(
    source: &CacheSource,
    store_hash: &str,
    machine: &Machine,
) -> Result<bool, String> {
    let probe = remote_check_command(source, store_hash);
    match transport::exec_script_timeout(machine, &probe, Some(30)) {
        Ok(outcome) => Ok(outcome.success()),
        Err(e) => Err(format!("remote verify: {e}")),
    }
}
/// Carries out a substitution plan and returns the resulting store path.
///
/// * Local hit: the path recorded in the plan is returned unchanged.
/// * Cache hit: the entry is pulled from the cache source derived from the
///   plan's steps, and is accepted only if it passed hash verification.
/// * Cache miss: always an error, because the entry has to be built.
///
/// # Errors
///
/// Returns a message on a cache miss, when no SSH source can be derived from
/// the plan, when the pull fails, or when the pulled content fails
/// verification.
pub fn execute_substitution(
    plan: &SubstitutionPlan,
    machine: &Machine,
    store_dir: &Path,
    timeout_secs: Option<u64>,
) -> Result<String, String> {
    match &plan.outcome {
        SubstitutionOutcome::LocalHit { store_path } => Ok(store_path.clone()),
        SubstitutionOutcome::CacheMiss { store_hash } => Err(format!(
            "cache miss for {store_hash}: build required (use sandbox_run)"
        )),
        SubstitutionOutcome::CacheHit { source, store_hash } => {
            let cache_source = extract_cache_source_from_plan(plan)
                .ok_or_else(|| "cache hit but no SSH source in plan".to_string())?;
            let pulled =
                pull_from_cache(&cache_source, store_hash, store_dir, machine, timeout_secs)?;
            if pulled.verified {
                Ok(pulled.store_path)
            } else {
                Err(format!("cache pull from {source} failed hash verification"))
            }
        }
    }
}
/// Builds the shell command that copies a cache entry into `staging`.
///
/// Any `blake3:` prefix is stripped from `hash` before it is used as a
/// directory name. Both variants first create the staging directory with
/// `mkdir -p`:
///
/// * SSH sources use `rsync -az` over `ssh`, adding `-p <port>` when a port
///   is configured.
/// * Local sources copy the entry's contents with `cp -a`.
///
/// Values are placed inside single quotes without escaping, so they must not
/// contain a single quote themselves.
pub fn pull_command(source: &CacheSource, hash: &str, staging: &Path) -> String {
    let bare = hash.strip_prefix("blake3:").unwrap_or(hash);
    let dest = staging.display();
    match source {
        CacheSource::Ssh {
            host,
            user,
            path,
            port,
        } => {
            let ssh_port = match port {
                Some(p) => format!(" -p {p}"),
                None => String::new(),
            };
            format!(
                "mkdir -p '{dest}' && rsync -az -e 'ssh{ssh_port}' '{user}@{host}:{path}/{bare}/' '{dest}'"
            )
        }
        CacheSource::Local { path } => {
            format!("mkdir -p '{dest}' && cp -a '{path}/{bare}/.' '{dest}'")
        }
    }
}
/// Builds the shell command that copies a store entry into a cache source.
///
/// Any `blake3:` prefix is stripped from `hash` before it is used as a
/// directory name.
///
/// * SSH sources use `rsync -az` over `ssh`, adding `-p <port>` when a port
///   is configured.
/// * Local sources create the destination entry directory with `mkdir -p`
///   and copy the entry's *contents* into it with `cp -a`.
///
/// Values are placed inside single quotes without escaping, so they must not
/// contain a single quote themselves.
pub fn push_command(source: &CacheSource, hash: &str, store_dir: &Path) -> String {
    let hash_bare = hash.strip_prefix("blake3:").unwrap_or(hash);
    let store = store_dir.display();
    match source {
        CacheSource::Ssh {
            host,
            user,
            path,
            port,
        } => {
            let port_flag = port.map_or(String::new(), |p| format!(" -p {p}"));
            format!(
                "rsync -az -e 'ssh{port_flag}' '{store}/{hash_bare}/' '{user}@{host}:{path}/{hash_bare}/'"
            )
        }
        CacheSource::Local { path } => {
            // Copy `<entry>/.` into an existing destination directory.
            // `cp -a <entry> <dest>` would nest the entry as
            // `<dest>/<hash>` whenever `<dest>` already exists, so pushing
            // the same hash twice would corrupt the cached entry.
            format!(
                "mkdir -p '{path}/{hash_bare}' && cp -a '{store}/{hash_bare}/.' '{path}/{hash_bare}'"
            )
        }
    }
}
/// Builds the shell command that tests whether a cache source contains a
/// directory for `hash` (with any `blake3:` prefix stripped).
///
/// SSH sources run `test -d` through `ssh`, adding `-p <port>` when a port is
/// configured; local sources run `test -d` directly.
fn remote_check_command(source: &CacheSource, hash: &str) -> String {
    let bare = hash.strip_prefix("blake3:").unwrap_or(hash);
    match source {
        CacheSource::Local { path } => format!("test -d '{path}/{bare}'"),
        CacheSource::Ssh {
            host,
            user,
            path,
            port,
        } => {
            let ssh_port = match port {
                Some(p) => format!(" -p {p}"),
                None => String::new(),
            };
            format!("ssh{ssh_port} '{user}@{host}' test -d '{path}/{bare}'")
        }
    }
}
/// Reports whether the staged directory hashes to `expected_hash`.
///
/// When `staging` contains a `content` subdirectory, that subdirectory is
/// hashed; otherwise `staging` itself is. The expected hash is compared in
/// its `blake3:`-prefixed form, so callers may pass it with or without the
/// prefix. A hashing error counts as a failed verification.
fn verify_pulled_content(staging: &Path, expected_hash: &str) -> bool {
    let nested = staging.join("content");
    let root: &Path = if nested.is_dir() {
        nested.as_path()
    } else {
        staging
    };

    // Normalise the expectation to the prefixed form before comparing.
    let bare = expected_hash.strip_prefix("blake3:").unwrap_or(expected_hash);
    let expected = format!("blake3:{bare}");

    crate::tripwire::hasher::hash_directory(root)
        .map(|actual| actual == expected)
        .unwrap_or(false)
}
/// Derives an SSH cache source from the first usable pull step of a plan.
///
/// Scans the plan's steps for a `PullFromCache` step whose source has the
/// form `user@host` (split at the first `@`). The resulting source always
/// uses the fixed cache path `/var/lib/forjar/cache` and no explicit port.
/// Returns `None` when no step matches.
fn extract_cache_source_from_plan(plan: &SubstitutionPlan) -> Option<CacheSource> {
    plan.steps.iter().find_map(|step| match step {
        SubstitutionStep::PullFromCache { source, .. } => {
            source
                .split_once('@')
                .map(|(user, host)| CacheSource::Ssh {
                    host: host.to_string(),
                    user: user.to_string(),
                    path: "/var/lib/forjar/cache".to_string(),
                    port: None,
                })
        }
        _ => None,
    })
}