use std::os::unix::fs::FileExt;
use std::os::unix::io::OwnedFd;
use std::sync::Arc;
use anyhow::{Context, Result};
use base64::Engine;
use composefs::{
INLINE_CONTENT_MAX_V0,
fsverity::FsVerityHashValue,
repository::{ImportContext, ObjectStoreMethod, Repository},
};
use cstorage::{
Image, Layer, ProxiedTarSplitItem, Storage, StorageProxy, TarSplitFdStream, TarSplitItem,
can_bypass_file_permissions,
};
pub use cstorage::init_if_helper;
use crate::oci_image::manifest_identifier;
use crate::progress::{ComponentId, ProgressEvent, ProgressUnit, SharedReporter};
use crate::skopeo::{OCI_CONFIG_CONTENT_TYPE, OCI_MANIFEST_CONTENT_TYPE, TAR_LAYER_CONTENT_TYPE};
use crate::{ContentAndVerity, ImportStats, OciDigest, config_identifier, layer_identifier};
type CstorImportResult<ObjectID> = (ContentAndVerity<ObjectID>, ContentAndVerity<ObjectID>);
const ZERO_PADDING: [u8; 512] = [0u8; 512];
pub async fn import_from_containers_storage<ObjectID: FsVerityHashValue>(
repo: &Arc<Repository<ObjectID>>,
image_id: &str,
reference: Option<&str>,
zerocopy: bool,
storage_root: Option<&std::path::Path>,
additional_image_stores: &[&std::path::Path],
reporter: SharedReporter,
) -> Result<(CstorImportResult<ObjectID>, ImportStats)> {
if can_bypass_file_permissions() {
let repo = Arc::clone(repo);
let image_id = image_id.to_owned();
let reference = reference.map(|s| s.to_owned());
let storage_root = storage_root.map(|p| p.to_path_buf());
let additional_image_stores: Vec<std::path::PathBuf> = additional_image_stores
.iter()
.map(|p| p.to_path_buf())
.collect();
tokio::task::spawn_blocking(move || {
import_from_containers_storage_direct(
&repo,
&image_id,
reference.as_deref(),
zerocopy,
storage_root.as_deref(),
&additional_image_stores,
reporter,
)
})
.await
.context("spawn_blocking failed")?
} else {
if storage_root.is_some() || !additional_image_stores.is_empty() {
anyhow::bail!(
"storage_root and additional_image_stores are not supported in rootless mode"
);
}
import_from_containers_storage_proxied(repo, image_id, reference, zerocopy, reporter).await
}
}
fn import_from_containers_storage_direct<ObjectID: FsVerityHashValue>(
repo: &Arc<Repository<ObjectID>>,
image_id: &str,
reference: Option<&str>,
zerocopy: bool,
storage_root: Option<&std::path::Path>,
additional_image_stores: &[std::path::PathBuf],
reporter: SharedReporter,
) -> Result<(CstorImportResult<ObjectID>, ImportStats)> {
let mut stats = ImportStats::default();
let mut ctx = ImportContext::default();
let mut stores = if let Some(root) = storage_root {
vec![
Storage::open(root)
.with_context(|| format!("Failed to open storage root at {}", root.display()))?,
]
} else {
match Storage::discover_all() {
Ok(s) => s,
Err(e) if !additional_image_stores.is_empty() => {
tracing::warn!(
"containers-storage auto-discovery failed ({e:#}), \
using additional image stores only"
);
Vec::new()
}
Err(e) => return Err(e).context("Failed to discover containers-storage"),
}
};
for path in additional_image_stores {
stores.push(Storage::open(path).with_context(|| {
format!(
"Failed to open additional image store at {}",
path.display()
)
})?);
}
let (_image_store, image) = stores
.iter()
.find_map(|s| {
Image::open(s, image_id)
.or_else(|_| s.find_image_by_name(image_id))
.ok()
.map(|img| (s, img))
})
.with_context(|| format!("Failed to find image {image_id} in any storage"))?;
let storage_layer_ids = image
.storage_layer_ids(&stores)
.context("Failed to get storage layer IDs from image")?;
let config = image.config().context("Failed to read image config")?;
let diff_ids: Vec<OciDigest> = config
.rootfs()
.diff_ids()
.iter()
.map(|s| s.parse::<OciDigest>().context("parsing diff_id"))
.collect::<Result<_>>()?;
anyhow::ensure!(
storage_layer_ids.len() == diff_ids.len(),
"Layer count mismatch: {} layers in storage, {} diff_ids in config",
storage_layer_ids.len(),
diff_ids.len()
);
stats.layers = storage_layer_ids.len() as u64;
let mut layer_refs = Vec::with_capacity(storage_layer_ids.len());
for (storage_layer_id, diff_id) in storage_layer_ids.iter().zip(diff_ids.iter()) {
let content_id = layer_identifier(diff_id);
let id = ComponentId::from(diff_id.to_string());
let layer_verity = if let Some(existing) = repo.has_stream(&content_id)? {
reporter.report(ProgressEvent::Skipped { id });
stats.layers_already_present += 1;
existing
} else {
reporter.report(ProgressEvent::Started {
id: id.clone(),
total: None,
unit: ProgressUnit::Bytes,
});
let (layer_store, layer) = stores
.iter()
.find_map(|s| Layer::open(s, storage_layer_id).ok().map(|l| (s, l)))
.with_context(|| format!("Failed to open layer {}", storage_layer_id))?;
let (verity, layer_stats) =
import_layer_direct(repo, layer_store, &layer, diff_id, zerocopy, &mut ctx)?;
let bytes = layer_stats.new_bytes();
stats.merge(&layer_stats);
reporter.report(ProgressEvent::Done {
id,
transferred: bytes,
});
verity
};
layer_refs.push((diff_id.clone(), layer_verity));
}
reporter.report(ProgressEvent::Message("Layers imported".to_string()));
finalize_import(repo, &image, &layer_refs, reference, &reporter, stats)
}
async fn import_from_containers_storage_proxied<ObjectID: FsVerityHashValue>(
repo: &Arc<Repository<ObjectID>>,
image_id: &str,
reference: Option<&str>,
zerocopy: bool,
reporter: SharedReporter,
) -> Result<(CstorImportResult<ObjectID>, ImportStats)> {
let mut stats = ImportStats::default();
let mut ctx = ImportContext::default();
let mut proxy = StorageProxy::spawn()
.await
.context("Failed to spawn userns helper")?
.context("Expected proxy but got None")?;
let storage_paths = discover_storage_paths()?;
let mut image_info = None;
let mut found_storage_path = String::new();
for path in &storage_paths {
match proxy.get_image(path, image_id).await {
Ok(info) => {
found_storage_path = path.clone();
image_info = Some(info);
break;
}
Err(_) => continue,
}
}
let image_info =
image_info.with_context(|| format!("Failed to find image {} in any storage", image_id))?;
let storage_path = found_storage_path;
anyhow::ensure!(
image_info.storage_layer_ids.len() == image_info.layer_diff_ids.len(),
"Layer count mismatch: {} layers in storage, {} diff_ids in config",
image_info.storage_layer_ids.len(),
image_info.layer_diff_ids.len()
);
stats.layers = image_info.storage_layer_ids.len() as u64;
let mut layer_refs = Vec::with_capacity(image_info.storage_layer_ids.len());
for (storage_layer_id, diff_id) in image_info
.storage_layer_ids
.iter()
.zip(image_info.layer_diff_ids.iter())
{
let content_id = layer_identifier(diff_id);
let id = ComponentId::from(diff_id.to_string());
let layer_verity = if let Some(existing) = repo.has_stream(&content_id)? {
reporter.report(ProgressEvent::Skipped { id });
stats.layers_already_present += 1;
existing
} else {
reporter.report(ProgressEvent::Started {
id: id.clone(),
total: None,
unit: ProgressUnit::Bytes,
});
let (verity, layer_stats) = import_layer_proxied(
repo,
&mut proxy,
&storage_path,
storage_layer_id,
diff_id,
zerocopy,
&mut ctx,
)
.await?;
let bytes = layer_stats.new_bytes();
stats.merge(&layer_stats);
reporter.report(ProgressEvent::Done {
id,
transferred: bytes,
});
verity
};
layer_refs.push((diff_id.clone(), layer_verity));
}
reporter.report(ProgressEvent::Message("Layers imported".to_string()));
let stores = Storage::discover_all().context("Failed to discover containers-storage")?;
let (_, image) = stores
.iter()
.find_map(|s| Image::open(s, &image_info.id).ok().map(|img| (s, img)))
.with_context(|| format!("Failed to open image {}", image_info.id))?;
proxy.shutdown().await.context("Failed to shutdown proxy")?;
finalize_import(repo, &image, &layer_refs, reference, &reporter, stats)
}
fn finalize_import<ObjectID: FsVerityHashValue>(
repo: &Arc<Repository<ObjectID>>,
image: &Image,
layer_refs: &[(OciDigest, ObjectID)],
reference: Option<&str>,
reporter: &SharedReporter,
stats: ImportStats,
) -> Result<(CstorImportResult<ObjectID>, ImportStats)> {
let config_key = format!("sha256:{}", image.id());
let encoded_key = base64::engine::general_purpose::STANDARD.encode(config_key.as_bytes());
let config_json = image
.read_metadata(&encoded_key)
.context("Failed to read config bytes")?;
let config_digest = crate::sha256_content_digest(&config_json);
let content_id = config_identifier(&config_digest);
let config_verity = if let Some(existing) = repo.has_stream(&content_id)? {
reporter.report(ProgressEvent::Message(format!(
"Already have config {config_digest}"
)));
existing
} else {
reporter.report(ProgressEvent::Message(format!(
"Creating config splitstream {config_digest}"
)));
let mut writer = repo.create_stream(OCI_CONFIG_CONTENT_TYPE)?;
for (diff_id, verity) in layer_refs {
let key: &str = diff_id.as_ref();
writer.add_named_stream_ref(key, verity);
}
writer.write_external(&config_json)?;
repo.write_stream(writer, &content_id, None)?
};
let manifest_json = image
.read_manifest_raw()
.context("Failed to read manifest bytes")?;
let manifest_digest = crate::sha256_content_digest(&manifest_json);
let manifest_content_id = manifest_identifier(&manifest_digest);
let manifest_verity = if let Some(existing) = repo.has_stream(&manifest_content_id)? {
reporter.report(ProgressEvent::Message(format!(
"Already have manifest {manifest_digest}"
)));
existing
} else {
reporter.report(ProgressEvent::Message(format!(
"Creating manifest splitstream {manifest_digest}"
)));
let mut writer = repo.create_stream(OCI_MANIFEST_CONTENT_TYPE)?;
let config_ref_key = format!("config:{config_digest}");
writer.add_named_stream_ref(&config_ref_key, &config_verity);
for (diff_id, verity) in layer_refs {
let key: &str = diff_id.as_ref();
writer.add_named_stream_ref(key, verity);
}
writer.write_external(&manifest_json)?;
repo.write_stream(writer, &manifest_content_id, None)?
};
let existing_erofs = crate::composefs_erofs_for_manifest(
repo,
&manifest_digest,
Some(&manifest_verity),
repo.erofs_version(),
)?;
if existing_erofs.is_none() {
let erofs = crate::ensure_oci_composefs_erofs(
repo,
&manifest_digest,
Some(&manifest_verity),
reference,
)?;
if erofs.is_none() {
if let Some(name) = reference {
crate::oci_image::tag_image(repo, &manifest_digest, name)?;
}
}
} else if let Some(name) = reference {
crate::oci_image::tag_image(repo, &manifest_digest, name)?;
}
let config_verity = repo
.has_stream(&content_id)?
.context("config splitstream missing after finalization")?;
let manifest_verity = repo
.has_stream(&manifest_content_id)?
.context("manifest splitstream missing after finalization")?;
Ok((
(
(manifest_digest, manifest_verity),
(config_digest, config_verity),
),
stats,
))
}
fn import_layer_direct<ObjectID: FsVerityHashValue>(
repo: &Arc<Repository<ObjectID>>,
storage: &Storage,
layer: &Layer,
diff_id: &OciDigest,
zerocopy: bool,
ctx: &mut ImportContext,
) -> Result<(ObjectID, ImportStats)> {
let mut stats = ImportStats::default();
let mut inline_buf = Vec::new();
let mut stream = TarSplitFdStream::new(storage, layer)
.with_context(|| format!("Failed to create tar-split stream for layer {}", layer.id()))?;
let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE)?;
let content_id = layer_identifier(diff_id);
let mut prev_file_padding: usize = 0;
while let Some(item) = stream.next()? {
match item {
TarSplitItem::Segment(bytes) => {
let header_bytes = &bytes[prev_file_padding..];
stats.bytes_inlined += header_bytes.len() as u64;
writer.write_inline(header_bytes);
prev_file_padding = 0;
}
TarSplitItem::FileContent { fd, size, name } => {
process_file_content(
repo,
&mut writer,
&mut stats,
ctx,
fd,
size,
&name,
zerocopy,
&mut inline_buf,
)?;
let padding_size = (size as usize).next_multiple_of(512) - size as usize;
if padding_size > 0 {
stats.bytes_inlined += padding_size as u64;
writer.write_inline(&ZERO_PADDING[..padding_size]);
}
prev_file_padding = padding_size;
}
}
}
let verity = repo.write_stream(writer, &content_id, None)?;
Ok((verity, stats))
}
async fn import_layer_proxied<ObjectID: FsVerityHashValue>(
repo: &Arc<Repository<ObjectID>>,
proxy: &mut StorageProxy,
storage_path: &str,
layer_id: &str,
diff_id: &OciDigest,
zerocopy: bool,
ctx: &mut ImportContext,
) -> Result<(ObjectID, ImportStats)> {
let mut stats = ImportStats::default();
let mut inline_buf = Vec::new();
let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE)?;
let content_id = layer_identifier(diff_id);
let mut prev_file_padding: usize = 0;
let mut stream = proxy
.stream_layer(storage_path, layer_id)
.await
.with_context(|| format!("Failed to start streaming layer {}", layer_id))?;
while let Some(item) = stream
.next()
.await
.with_context(|| format!("Failed to receive stream item for layer {}", layer_id))?
{
match item {
ProxiedTarSplitItem::Segment(bytes) => {
let header_bytes = &bytes[prev_file_padding..];
stats.bytes_inlined += header_bytes.len() as u64;
writer.write_inline(header_bytes);
prev_file_padding = 0;
}
ProxiedTarSplitItem::FileContent { fd, size, name } => {
process_file_content(
repo,
&mut writer,
&mut stats,
ctx,
fd,
size,
&name,
zerocopy,
&mut inline_buf,
)?;
let padding_size = (size as usize).next_multiple_of(512) - size as usize;
if padding_size > 0 {
stats.bytes_inlined += padding_size as u64;
writer.write_inline(&ZERO_PADDING[..padding_size]);
}
prev_file_padding = padding_size;
}
}
}
let verity = repo.write_stream(writer, &content_id, None)?;
Ok((verity, stats))
}
#[allow(clippy::too_many_arguments)]
fn process_file_content<ObjectID: FsVerityHashValue>(
repo: &Arc<Repository<ObjectID>>,
writer: &mut composefs::splitstream::SplitStreamWriter<ObjectID>,
stats: &mut ImportStats,
ctx: &mut ImportContext,
fd: OwnedFd,
size: u64,
name: &str,
zerocopy: bool,
inline_buf: &mut Vec<u8>,
) -> Result<()> {
let file = std::fs::File::from(fd);
if size as usize > INLINE_CONTENT_MAX_V0 {
let (object_id, method) = if zerocopy {
repo.ensure_object_from_file_zerocopy(&file, size, ctx)
} else {
repo.ensure_object_from_file(&file, size, ctx)
}
.with_context(|| format!("Failed to store object for {}", name))?;
match method {
ObjectStoreMethod::Reflinked => {
stats.objects_reflinked += 1;
stats.bytes_reflinked += size;
}
ObjectStoreMethod::Hardlinked => {
stats.objects_hardlinked += 1;
stats.bytes_hardlinked += size;
}
ObjectStoreMethod::Copied => {
stats.objects_copied += 1;
stats.bytes_copied += size;
}
ObjectStoreMethod::AlreadyPresent => {
stats.objects_already_present += 1;
}
}
writer.add_external_size(size);
writer.write_reference(object_id)?;
} else {
inline_buf.resize(size as usize, 0);
file.read_exact_at(inline_buf, 0)?;
stats.bytes_inlined += size;
writer.write_inline(inline_buf);
}
Ok(())
}
fn discover_storage_paths() -> Result<Vec<String>> {
let mut paths = Vec::new();
if let Ok(home) = std::env::var("HOME") {
let user_path = format!("{}/.local/share/containers/storage", home);
if std::path::Path::new(&user_path).exists() {
paths.push(user_path);
}
}
let system_path = "/var/lib/containers/storage";
if std::path::Path::new(system_path).exists() {
paths.push(system_path.to_string());
}
if let Ok(opts) = std::env::var("STORAGE_OPTS") {
for item in opts.split(',') {
let item = item.trim();
if let Some(path) = item.strip_prefix("additionalimagestore=")
&& std::path::Path::new(path).exists()
{
paths.push(path.to_string());
}
}
}
anyhow::ensure!(
!paths.is_empty(),
"Could not find containers-storage at standard locations"
);
Ok(paths)
}
pub fn parse_containers_storage_ref(imgref: &str) -> Option<&str> {
imgref.strip_prefix("containers-storage:")
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_containers_storage_ref() {
assert_eq!(
parse_containers_storage_ref("containers-storage:sha256:abc123"),
Some("sha256:abc123")
);
assert_eq!(
parse_containers_storage_ref("containers-storage:quay.io/fedora:latest"),
Some("quay.io/fedora:latest")
);
assert_eq!(
parse_containers_storage_ref("docker://quay.io/fedora:latest"),
None
);
assert_eq!(parse_containers_storage_ref("sha256:abc123"), None);
}
}