use super::*;
/// Where the contents of a single cached artifact output can be found.
#[derive(Clone)]
pub(crate) enum CachedPayload {
/// Output contents held in memory, shared through an `Arc`.
Bytes(Arc<Vec<u8>>),
/// Path of a file that holds the output contents.
File(NormalizedPath),
}
/// In-memory cache entry for one artifact.
#[derive(Clone)]
pub(crate) struct CachedArtifact {
/// Index record for the artifact (output names, output sizes, captured streams, ...).
pub(crate) meta: ArtifactIndex,
/// Captured standard output, shared through an `Arc`.
pub(crate) stdout: Arc<Vec<u8>>,
/// Captured standard error, shared through an `Arc`.
pub(crate) stderr: Arc<Vec<u8>>,
/// One payload per output; `None` when the entry was built from the index alone and the
/// payloads have not been resolved yet (see `ensure_payloads`).
pub(crate) payloads: Option<Arc<[CachedPayload]>>,
/// Set to `Instant::now()` when the entry is constructed.
/// NOTE(review): presumably refreshed on access and used for eviction — confirm with callers.
pub(crate) last_used: std::time::Instant,
}
impl CachedArtifact {
pub(super) fn from_artifact_data(artifact: &ArtifactData) -> Self {
let meta = ArtifactIndex::new(
artifact.outputs.iter().map(|o| o.name.clone()).collect(),
artifact
.outputs
.iter()
.map(|o| o.payload.size_bytes())
.collect(),
Arc::clone(&artifact.stdout),
Arc::clone(&artifact.stderr),
artifact.exit_code,
);
Self {
meta,
stdout: Arc::clone(&artifact.stdout),
stderr: Arc::clone(&artifact.stderr),
payloads: Some(Arc::from(
artifact
.outputs
.iter()
.map(|o| match &o.payload {
ArtifactPayload::Bytes(b) => CachedPayload::Bytes(Arc::clone(b)),
ArtifactPayload::Path(p) => CachedPayload::File(p.clone()),
})
.collect::<Vec<_>>(),
)),
last_used: std::time::Instant::now(),
}
}
pub(super) fn from_file_payloads(meta: ArtifactIndex, payloads: Vec<NormalizedPath>) -> Self {
let stdout = Arc::clone(&meta.stdout);
let stderr = Arc::clone(&meta.stderr);
Self {
meta,
stdout,
stderr,
payloads: Some(Arc::from(
payloads
.into_iter()
.map(CachedPayload::File)
.collect::<Vec<_>>(),
)),
last_used: std::time::Instant::now(),
}
}
pub(super) fn from_index(meta: ArtifactIndex) -> Self {
let stdout = Arc::clone(&meta.stdout);
let stderr = Arc::clone(&meta.stderr);
Self {
meta,
stdout,
stderr,
payloads: None,
last_used: std::time::Instant::now(),
}
}
}
/// Makes sure `cached.payloads` is populated and returns the payload slice.
///
/// When the payloads are already present they are returned as-is. Otherwise each output `i`
/// (one per entry in `cached.meta.output_names`) is resolved in order:
///
/// 1. If `artifact_dir/{key_hex}_{i}` is a regular file whose length matches the recorded
///    output size (or no size is recorded for `i`), it is used as a [`CachedPayload::File`].
/// 2. Otherwise the payload is loaded through `try_load_packed_payload`; its length must
///    match the recorded size, if any, and it is kept as [`CachedPayload::Bytes`].
///
/// Returns `None` — leaving `cached.payloads` unset — as soon as any output cannot be
/// resolved or fails the size check.
pub(super) fn ensure_payloads<'a>(
    cached: &'a mut CachedArtifact,
    artifact_dir: &Path,
    key_hex: &str,
) -> Option<&'a [CachedPayload]> {
    if cached.payloads.is_none() {
        let index = &cached.meta;
        let output_count = index.output_names.len();
        let mut resolved = Vec::with_capacity(output_count);

        for slot in 0..output_count {
            let expected_len = index.output_sizes.get(slot).copied();
            let candidate = artifact_dir.join(format!("{key_hex}_{slot}"));

            // A loose file is only trusted when it is a regular file of the recorded size.
            let usable_file = match std::fs::metadata(&candidate) {
                Ok(stat) => stat.is_file() && expected_len.is_none_or(|want| want == stat.len()),
                Err(_) => false,
            };

            let payload = if usable_file {
                CachedPayload::File(candidate.into())
            } else {
                let packed = try_load_packed_payload(artifact_dir, key_hex, slot)?;
                if expected_len.is_some_and(|want| want != packed.len() as u64) {
                    return None;
                }
                CachedPayload::Bytes(Arc::new(packed))
            };
            resolved.push(payload);
        }

        cached.payloads = Some(Arc::from(resolved));
    }
    cached.payloads.as_deref()
}
/// Migrates legacy `<key>.meta` files found in `artifact_dir` into the artifact index.
///
/// Every `.meta` file is read and bincode-decoded as an [`ArtifactData`]; files that cannot
/// be read or decoded are skipped and left untouched. For each decoded artifact:
///
/// * every output whose payload is available as bytes and whose `<key>_<i>` data file does
///   not exist yet is written next to the `.meta` file,
/// * the artifact is registered in `store` and in `artifacts` under the file stem,
/// * the `.meta` file is removed.
///
/// If writing a payload data file fails, the partial file is removed, a warning is logged and
/// the `.meta` file is kept so that it remains the on-disk source of the payload and the
/// migration can be retried later. The artifact is still registered in that case.
///
/// Returns the number of artifacts registered; `0` when the directory cannot be listed or
/// holds no `.meta` files.
pub(super) fn migrate_meta_files(
    artifact_dir: &Path,
    artifacts: &DashMap<String, CachedArtifact>,
    store: &ArtifactStore,
) -> usize {
    use rayon::prelude::*;

    let Ok(entries) = std::fs::read_dir(artifact_dir) else {
        return 0;
    };
    let meta_paths: Vec<NormalizedPath> = entries
        .flatten()
        .map(|e| e.path().into())
        .filter(|p: &NormalizedPath| p.extension().and_then(|e| e.to_str()) == Some("meta"))
        .collect();
    if meta_paths.is_empty() {
        return 0;
    }

    // Decoding and payload writes are independent per file, so they run in parallel; the
    // shared index/map updates happen afterwards on the calling thread.
    let migrated: Vec<(String, CachedArtifact, NormalizedPath, bool)> = meta_paths
        .par_iter()
        .filter_map(|path| {
            let data = std::fs::read(path).ok()?;
            let artifact = bincode::deserialize::<ArtifactData>(&data).ok()?;
            let stem: String = path
                .file_stem()
                .unwrap_or_default()
                .to_string_lossy()
                .into_owned();

            let mut payloads_persisted = true;
            for (i, out) in artifact.outputs.iter().enumerate() {
                let data_path = artifact_dir.join(format!("{stem}_{i}"));
                if data_path.exists() {
                    continue;
                }
                let Some(bytes) = out.payload.as_bytes() else {
                    continue;
                };
                if let Err(err) = std::fs::write(&data_path, bytes.as_slice()) {
                    // A truncated file would pass the `exists()` check on the next run and
                    // block the retry, so drop it (best effort).
                    let _ = std::fs::remove_file(&data_path);
                    tracing::warn!(
                        path = %data_path.display(),
                        error = %err,
                        "failed to write payload while migrating legacy .meta file"
                    );
                    payloads_persisted = false;
                }
            }

            let cached = CachedArtifact::from_artifact_data(&artifact);
            Some((stem, cached, path.clone(), payloads_persisted))
        })
        .collect();

    let count = migrated.len();
    for (stem, cached, meta_path, payloads_persisted) in migrated {
        store.insert(&stem, &cached.meta);
        artifacts.insert(stem, cached);
        // Only discard the legacy file once its payloads are safely on disk; otherwise it is
        // the sole persistent copy of the data.
        if payloads_persisted {
            std::fs::remove_file(&meta_path).ok();
        }
    }
    if count > 0 {
        tracing::info!(count, "migrated legacy .meta files to artifact index");
    }
    count
}