use anyhow::{Context, Result};
use std::{
io::{Read, Write},
path::{Path, PathBuf},
};
use tar::{Builder, EntryType};
use crate::{
PackerProgress,
canonical::CanonicalTarHeader,
image::LayerBlob,
layers::open_layer,
tracker::{EmittedPathTracker, HardLinkTracker, WhiteoutTracker},
};
/// Merges the entries of one layer blob into `output`, applying layer
/// whiteout rules against the state accumulated from layers processed before it.
///
/// Both merge entry points call this from the highest layer `index` down, so
/// the first layer to emit a path wins. For each entry, in archive order:
///
/// * the path is normalized with `normalize_path`; entries that normalize to
///   an empty path (e.g. `./` or `/`) are skipped;
/// * `.wh..wh..opq` registers an opaque whiteout on its parent directory at
///   `blob.index`, and `.wh.<name>` registers a whiteout for `<parent>/<name>`;
///   the marker entries themselves are never emitted;
/// * entries that `whiteout.is_suppressed` reports as hidden are not emitted,
///   but regular-file entries have their header and full content buffered in
///   memory and handed to `hardlinks.note_suppressed_file` (presumably so a
///   surviving hard link to that file can be promoted later — confirm in
///   `HardLinkTracker`);
/// * paths already present in `emitted` (written by a higher layer) are skipped;
/// * hard links are not written here: they are recorded in `hardlinks` (via
///   `record_promotion` when the link target is suppressed, otherwise via
///   `record`) and written later by `emit_deferred`;
/// * every other entry is written to `output` and recorded in `emitted`.
///
/// `hardlinks.end_layer()` is called after the last entry.
///
/// # Errors
/// Fails if the layer cannot be opened or its entries read, an entry header
/// cannot be captured, a hard link has no target, or writing to `output` fails.
fn process_layer<W: Write>(
    blob: &LayerBlob,
    whiteout: &mut WhiteoutTracker,
    emitted: &mut EmittedPathTracker,
    hardlinks: &mut HardLinkTracker,
    output: &mut Builder<W>,
) -> Result<()> {
    let mut archive = open_layer(&blob.path, &blob.media_type)
        .with_context(|| format!("opening layer {}", blob.path.display()))?;
    let entries = archive.entries().context("reading tar entries")?;
    for entry_result in entries {
        let mut entry = entry_result.context("reading tar entry")?;
        let raw_path = entry.path().context("entry path")?.into_owned();
        let path = normalize_path(&raw_path);
        if path.as_os_str().is_empty() {
            continue;
        }
        // NOTE(review): the name is matched through a lossy UTF-8 conversion,
        // so a whiteout for a non-UTF-8 name produces a target containing
        // U+FFFD that will not match the real entry — confirm acceptable.
        let file_name = path
            .file_name()
            .map(|n| n.to_string_lossy().into_owned())
            .unwrap_or_default();
        // Must be tested before the generic `.wh.` prefix below, because the
        // opaque marker also starts with `.wh.`.
        if file_name == ".wh..wh..opq" {
            let parent = path.parent().unwrap_or(Path::new(""));
            whiteout.insert_opaque(parent, blob.index);
            continue;
        }
        // NOTE(review): any other `.wh..wh.`-prefixed name (a reserved prefix
        // in the OCI layer spec) is treated here as a whiteout of
        // `.wh.<rest>` — confirm that is intended.
        if let Some(real_name) = file_name.strip_prefix(".wh.") {
            let parent = path.parent().unwrap_or(Path::new(""));
            whiteout.insert_simple(&parent.join(real_name), blob.index);
            continue;
        }
        if whiteout.is_suppressed(&path, blob.index) {
            let entry_type = entry.header().entry_type();
            // Only regular-file content is kept for suppressed entries.
            // NOTE(review): the whole file is read into memory whether or not
            // any hard link ends up needing it; large deleted files cost RAM.
            if entry_type == EntryType::Regular || entry_type == EntryType::Continuous {
                let canonical = CanonicalTarHeader::from_entry(&mut entry)
                    .context("capturing suppressed entry header")?;
                let mut data = Vec::new();
                entry
                    .read_to_end(&mut data)
                    .context("buffering suppressed file content")?;
                hardlinks.note_suppressed_file(path, canonical, data);
            }
            continue;
        }
        // A higher layer already wrote this path; it takes precedence.
        if emitted.contains(&path) {
            continue;
        }
        let canonical =
            CanonicalTarHeader::from_entry(&mut entry).context("capturing entry header")?;
        // Hard links are deferred until every layer has been seen, since their
        // target may come from a lower layer or may need promotion.
        // NOTE(review): the link path is not inserted into `emitted` here, so a
        // lower layer's entry at the same path is still emitted by this
        // function — confirm the tracker / `emit_deferred` resolves that.
        if canonical.entry_type() == EntryType::Link {
            let link_target = canonical
                .link_name()
                .context("reading hard link target")?
                .context("hard link has no target")?;
            let target_path = normalize_path(&link_target);
            if whiteout.is_suppressed(&target_path, blob.index) {
                hardlinks.record_promotion(path, target_path, blob.index);
            } else {
                hardlinks.record(path, target_path, blob.index, canonical);
            }
            continue;
        }
        canonical
            .write_to_tar(&path, &mut entry, output)
            .with_context(|| format!("emitting {}", path.display()))?;
        emitted.insert(&path);
    }
    hardlinks.end_layer();
    Ok(())
}
/// Writes the hard links that `process_layer` set aside, once every layer has
/// been walked. Consumes `hardlinks`; every path written is added to `emitted`.
///
/// Two kinds of deferred link are handled:
///
/// * Promotions — links whose target was hidden by a whiteout. They are grouped
///   by target path and the groups are visited in ascending target-path order.
///   Within a group, the first link (in tracker order) that carries buffered
///   file data and whose own path has not been emitted is written as a regular
///   file holding that data. The group's other not-yet-emitted links are then
///   written as hard links to it. A group with no such candidate is dropped.
/// * Ordinary links — written in the order `drain_sorted` yields them, but only
///   when their target has been emitted by that point (which includes paths
///   written earlier in this call); the rest are dropped.
///
/// NOTE(review): unlike promotions, ordinary links are not checked against
/// `emitted` for their own path — confirm duplicates cannot arise.
fn emit_deferred<W: Write>(
    hardlinks: HardLinkTracker,
    emitted: &mut EmittedPathTracker,
    output: &mut Builder<W>,
) -> Result<()> {
    let (deferred, promotions) = hardlinks.drain_sorted();

    // Iterating a BTreeMap visits keys in ascending order, which keeps the
    // output deterministic without a separate sort of the keys.
    let mut by_target: std::collections::BTreeMap<PathBuf, Vec<_>> =
        std::collections::BTreeMap::new();
    for promo in promotions {
        by_target
            .entry(promo.target_path.clone())
            .or_default()
            .push(promo);
    }

    for group in by_target.into_values() {
        // Pick the first link that can carry the file's content.
        let candidate = group
            .iter()
            .enumerate()
            .find_map(|(i, promo)| match &promo.file_data {
                Some(file) if !emitted.contains(&promo.link_path) => Some((i, file)),
                _ => None,
            });
        let Some((primary_idx, (file_canonical, data))) = candidate else {
            continue;
        };
        let primary_path = &group[primary_idx].link_path;

        file_canonical
            .clone_as_regular()
            .write_to_tar(primary_path, data.as_slice(), output)
            .with_context(|| format!("emitting promoted entry {}", primary_path.display()))?;
        emitted.insert(primary_path);

        for (i, promo) in group.iter().enumerate() {
            let skip = i == primary_idx || emitted.contains(&promo.link_path);
            if skip {
                continue;
            }
            // Prefer the link's own captured header; fall back to the primary's.
            let header = match &promo.file_data {
                Some((own, _)) => own,
                None => file_canonical,
            };
            header
                .write_hardlink_to_tar(&promo.link_path, primary_path, output)
                .with_context(|| {
                    format!(
                        "emitting within-group hardlink {} → {}",
                        promo.link_path.display(),
                        primary_path.display()
                    )
                })?;
            emitted.insert(&promo.link_path);
        }
    }

    for hl in deferred {
        // Only link to targets that actually made it into the archive.
        if emitted.contains(&hl.target_path) {
            hl.canonical
                .write_to_tar(&hl.link_path, &[] as &[u8], output)
                .with_context(|| format!("emitting hard link {}", hl.link_path.display()))?;
            emitted.insert(&hl.link_path);
        }
    }
    Ok(())
}
/// Flattens a complete, already-downloaded set of image layers into one tar
/// stream written to `sink`.
///
/// Layers are merged from the highest `index` to the lowest, so entries in
/// upper layers take precedence over same-path entries below them, and upper
/// whiteouts hide lower content. Deferred hard links are written after every
/// layer has been walked. The archive is then finished and `sink` flushed.
///
/// # Errors
/// Propagates any failure reading a layer or writing to `sink`.
pub fn merge_layers_into<W: Write>(mut layers: Vec<LayerBlob>, sink: W) -> Result<()> {
    // Topmost layer first; the sort is stable, so equal indices keep input order.
    layers.sort_by_key(|layer| std::cmp::Reverse(layer.index));

    let mut whiteout = WhiteoutTracker::default();
    let mut emitted = EmittedPathTracker::default();
    let mut hardlinks = HardLinkTracker::default();
    let mut output = Builder::new(sink);
    output.mode(tar::HeaderMode::Complete);

    layers.iter().try_for_each(|layer| {
        process_layer(layer, &mut whiteout, &mut emitted, &mut hardlinks, &mut output)
    })?;

    emit_deferred(hardlinks, &mut emitted, &mut output)?;
    output.finish()?;
    output.into_inner()?.flush()?;
    Ok(())
}
/// Merges layers that arrive over `receiver`, possibly out of order, into a
/// single flattened tar stream written to `sink`.
///
/// Layers are merged strictly from index `total_layers - 1` down to `0`, the
/// same precedence order as [`merge_layers_into`]. A layer that arrives before
/// all higher-indexed layers have been merged is held in memory until its turn.
/// When `progress_tx` is given, `LayerStarted`/`LayerFinished` events are sent
/// with `try_send`. Progress is therefore best-effort: a full or disconnected
/// progress channel never stalls or fails the merge.
///
/// # Errors
/// Returns an error if:
/// * the channel delivers a download error, or closes before `total_layers`
///   layers have been received;
/// * a received layer index is not below `total_layers`, or an index arrives
///   more than once;
/// * reading any layer or writing to `sink` fails.
pub fn merge_layers_into_streaming<W: Write>(
    receiver: std::sync::mpsc::Receiver<anyhow::Result<LayerBlob>>,
    total_layers: usize,
    sink: W,
    progress_tx: Option<&std::sync::mpsc::SyncSender<PackerProgress>>,
) -> Result<()> {
    let mut whiteout = WhiteoutTracker::default();
    let mut emitted = EmittedPathTracker::default();
    let mut hardlinks = HardLinkTracker::default();
    let mut output = Builder::new(sink);
    output.mode(tar::HeaderMode::Complete);
    // Layers received ahead of their turn, keyed by layer index.
    let mut buffer: std::collections::HashMap<usize, LayerBlob> =
        std::collections::HashMap::new();
    // Index of the next layer to merge; `None` once layer 0 has been merged
    // (or immediately, when there are no layers at all).
    let mut next_index = total_layers.checked_sub(1);
    let mut received = 0usize;
    while received < total_layers {
        let blob = match receiver.recv() {
            Ok(Ok(blob)) => blob,
            Ok(Err(e)) => return Err(e).context("download error received on streaming channel"),
            Err(_) => {
                anyhow::bail!(
                    "layer channel closed after {received} of {total_layers} layers; \
                     sender dropped without completing all layers"
                );
            }
        };
        received += 1;
        let idx = blob.index;
        // An out-of-range index would sit in `buffer` forever. A repeated index
        // would overwrite a pending layer or be ignored after its slot was
        // merged. Either way the loop would still end once `total_layers`
        // blobs had arrived, yielding a silently incomplete image.
        anyhow::ensure!(
            idx < total_layers,
            "layer index {idx} is out of range for {total_layers} layers"
        );
        let not_yet_merged = matches!(next_index, Some(n) if idx <= n);
        anyhow::ensure!(
            not_yet_merged && !buffer.contains_key(&idx),
            "layer index {idx} was received more than once"
        );
        buffer.insert(idx, blob);
        // Merge every layer that is now contiguous with what has been merged.
        while let Some(n) = next_index {
            let Some(blob) = buffer.remove(&n) else {
                break;
            };
            if let Some(tx) = progress_tx {
                let _ = tx.try_send(PackerProgress::LayerStarted(n));
            }
            process_layer(
                &blob,
                &mut whiteout,
                &mut emitted,
                &mut hardlinks,
                &mut output,
            )?;
            if let Some(tx) = progress_tx {
                let _ = tx.try_send(PackerProgress::LayerFinished(n));
            }
            next_index = n.checked_sub(1);
        }
    }
    // The checks above guarantee every index in 0..total_layers arrived exactly
    // once, so all layers have been merged by now.
    debug_assert!(next_index.is_none() && buffer.is_empty());
    emit_deferred(hardlinks, &mut emitted, &mut output)?;
    output.finish()?;
    let mut sink = output.into_inner()?;
    sink.flush()?;
    Ok(())
}
/// Converts a tar entry path into the relative form used both as the merge key
/// and as the path written to the output archive.
///
/// Root (`/`), prefix and `.` components are dropped, so `./usr/bin`,
/// `/usr/bin`, `/./usr/bin` and `usr/./bin` all normalize to `usr/bin`. An
/// entry naming only the archive root (`./`, `/`, `.`) normalizes to an empty
/// path. `..` components are kept unchanged. A trailing `/` on the input
/// (conventionally marking a directory entry) is kept on a non-empty result.
///
/// The path is processed component-wise rather than through a lossy string
/// conversion. Names that are not valid UTF-8 therefore pass through
/// byte-for-byte instead of having invalid sequences replaced with U+FFFD.
pub fn normalize_path(p: &Path) -> PathBuf {
    let normalized: PathBuf = p
        .components()
        .filter(|c| {
            matches!(
                c,
                std::path::Component::Normal(_) | std::path::Component::ParentDir
            )
        })
        .collect();
    // `components()` discards a trailing separator; restore it so directory
    // entries keep their conventional `dir/` spelling. The lossy view is only
    // used for this check: `/` is ASCII and survives lossy decoding unchanged.
    if normalized.as_os_str().is_empty() || !p.to_string_lossy().ends_with('/') {
        return normalized;
    }
    let mut with_slash = normalized.into_os_string();
    with_slash.push("/");
    PathBuf::from(with_slash)
}