use crate::{
management::db::{self, OCI_DB_MIGRATOR},
oci::{DockerRegistry, OciRegistryPull, Reference},
MicrosandboxError, MicrosandboxResult,
};
#[cfg(feature = "cli")]
use flate2::read::GzDecoder;
use futures::future;
#[cfg(feature = "cli")]
use indicatif::{ProgressBar, ProgressStyle};
#[cfg(feature = "cli")]
use microsandbox_utils::term::{self, MULTI_PROGRESS};
use microsandbox_utils::{env, EXTRACTED_LAYER_SUFFIX, LAYERS_SUBDIR, OCI_DB_FILENAME};
use sqlx::{Pool, Sqlite};
#[cfg(feature = "cli")]
use std::io::{Read, Result as IoResult};
use std::path::{Path, PathBuf};
#[cfg(feature = "cli")]
use tar::Archive;
use tempfile::tempdir;
use tokio::fs;
#[cfg(not(feature = "cli"))]
use tokio::process::Command;
#[cfg(feature = "cli")]
use tokio::task::spawn_blocking;
/// Registry host segment identifying Docker Hub references (e.g. `docker.io/library/nginx`).
const DOCKER_REGISTRY: &str = "docker.io";
/// Registry host segment for Sandboxes registry references. These are currently resolved by
/// rewriting them to Docker Hub references (see `pull_from_sandboxes_registry`).
const SANDBOXES_REGISTRY: &str = "sandboxes.io";
#[cfg(feature = "cli")]
/// Label shown on the spinner that tracks how many layers have been extracted.
const EXTRACT_LAYERS_MSG: &str = "Extracting layers";
/// Pulls the image referenced by `name` into the local microsandbox store.
///
/// The registry is taken from the first `/`-separated segment of `name`'s string form; only
/// `docker.io` and `sandboxes.io` are supported. Layer blobs are downloaded into a fresh
/// temporary directory, which is deleted when this function returns. They are extracted into
/// `layer_path` (or the default layers directory when `None`).
///
/// # Errors
///
/// Returns [`MicrosandboxError::InvalidArgument`] if both `image` and `image_group` are set, if
/// `image_group` is set (group pulls are not supported), or if the registry is not supported.
/// Otherwise propagates errors from creating the temporary directory or from the pull itself.
pub async fn pull(
    name: Reference,
    image: bool,
    image_group: bool,
    layer_path: Option<PathBuf>,
) -> MicrosandboxResult<()> {
    if image && image_group {
        return Err(MicrosandboxError::InvalidArgument(
            "both image and image_group cannot be true".to_string(),
        ));
    }

    if image_group {
        return Err(MicrosandboxError::InvalidArgument(
            "image group pull is currently not supported".to_string(),
        ));
    }

    let reference = name.to_string();
    let registry = reference.split('/').next().unwrap_or("");

    // Keep the `TempDir` guard alive until the pull completes. Dropping it deletes the directory
    // and the compressed layer blobs inside it. Nothing in this module reads those blobs after
    // extraction, whereas `into_path()` would persist (and leak) the directory on every pull.
    let temp_download_dir = tempdir()?;
    let download_dir = temp_download_dir.path();
    tracing::info!("temporary download directory: {}", download_dir.display());

    match registry {
        DOCKER_REGISTRY => pull_from_docker_registry(&name, download_dir, layer_path).await,
        SANDBOXES_REGISTRY => pull_from_sandboxes_registry(&name, download_dir, layer_path).await,
        _ => Err(MicrosandboxError::InvalidArgument(format!(
            "Unsupported registry: {}",
            registry
        ))),
    }
}
/// Pulls `image` from Docker Hub and extracts all of its layers.
///
/// Compressed layer blobs are downloaded into `download_dir`. Each one is then extracted
/// concurrently into `layer_path`, or `<microsandbox home>/<LAYERS_SUBDIR>` when `None`. If the
/// database already records the image and every layer is present on disk (see
/// `check_image_layers`), the pull is skipped entirely.
///
/// # Errors
///
/// Returns an error if the layers directory cannot be created, the registry client or database
/// pool cannot be initialized, or the image pull fails. If any layer fails to extract, the other
/// layers are still attempted; after that the progress spinner (with the `cli` feature) is
/// finished and the first extraction error is returned.
pub async fn pull_from_docker_registry(
    image: &Reference,
    download_dir: impl AsRef<Path>,
    layer_path: Option<PathBuf>,
) -> MicrosandboxResult<()> {
    let download_dir = download_dir.as_ref();
    let microsandbox_home_path = env::get_microsandbox_home_path();
    let db_path = microsandbox_home_path.join(OCI_DB_FILENAME);
    let layers_dir = layer_path.unwrap_or_else(|| microsandbox_home_path.join(LAYERS_SUBDIR));

    fs::create_dir_all(&layers_dir).await?;

    let docker_registry = DockerRegistry::new(download_dir, &db_path).await?;
    let pool = db::get_or_create_pool(&db_path, &OCI_DB_MIGRATOR).await?;

    if check_image_layers(&pool, image, &layers_dir).await? {
        tracing::info!("image {} and all its layers exist, skipping pull", image);
        return Ok(());
    }

    docker_registry
        .pull_image(image.get_repository(), image.get_selector().clone())
        .await?;

    let layer_paths = collect_layer_files(download_dir).await?;

    #[cfg(feature = "cli")]
    let extract_layers_sp = term::create_spinner(
        EXTRACT_LAYERS_MSG.to_string(),
        None,
        Some(layer_paths.len() as u64),
    );

    let extraction_futures: Vec<_> = layer_paths
        .into_iter()
        .map(|path| {
            let layers_dir = layers_dir.clone();
            #[cfg(feature = "cli")]
            let extract_layers_sp = extract_layers_sp.clone();
            async move {
                let result = extract_layer(path, &layers_dir).await;
                // Count every attempted layer, successful or not, so the spinner reaches its total.
                #[cfg(feature = "cli")]
                extract_layers_sp.inc(1);
                result
            }
        })
        .collect();

    let results = future::join_all(extraction_futures).await;

    // Finish the spinner before reporting any failure so it does not linger on the terminal.
    #[cfg(feature = "cli")]
    extract_layers_sp.finish();

    // Every layer has been attempted at this point; surface the first extraction error, if any.
    results.into_iter().collect()
}
/// Pulls an image addressed to the Sandboxes registry.
///
/// Sandboxes references are currently served from Docker Hub. The leading registry segment of
/// `image` is replaced with `docker.io`, and the pull is delegated to
/// `pull_from_docker_registry`. Repositories outside the `library/` namespace produce warnings,
/// because that namespace mapping may change in the future.
///
/// # Errors
///
/// Returns an error if the rewritten reference cannot be parsed, or propagates any error from the
/// Docker registry pull.
pub async fn pull_from_sandboxes_registry(
    image: &Reference,
    download_dir: impl AsRef<Path>,
    layer_path: Option<PathBuf>,
) -> MicrosandboxResult<()> {
    let repository = image.get_repository();

    // Everything after the first `/` (empty when there is none), re-rooted under Docker Hub.
    let full_reference = image.to_string();
    let remainder = match full_reference.split_once('/') {
        Some((_registry, rest)) => rest,
        None => "",
    };
    let docker_reference = format!("{}/{}", DOCKER_REGISTRY, remainder).parse::<Reference>()?;

    if !repository.starts_with("library/") {
        tracing::warn!(
            "Non-library namespace image requested from Sandboxes registry: {}",
            repository
        );
        tracing::warn!(
            "Currently using Docker registry for compatibility, but namespace mappings may change in the future"
        );
        tracing::info!(
            "To ensure consistent behavior, consider setting OCI_REGISTRY_DOMAIN=docker.io if you want to use Docker registry consistently"
        );
    } else {
        tracing::info!("pulling library image from Docker registry for compatibility");
    }

    pull_from_docker_registry(&docker_reference, download_dir, layer_path).await
}
/// Pulls an image group from the Sandboxes registry.
///
/// # Errors
///
/// Always returns [`MicrosandboxError::NotImplemented`]; image group pulls are not supported yet.
pub async fn pull_group_from_sandboxes_registry(_group: &Reference) -> MicrosandboxResult<()> {
    Err(MicrosandboxError::NotImplemented(
        "Sandboxes registry image group pull is not implemented".to_string(),
    ))
}
/// Reports whether `image` is already fully available locally, so the pull can be skipped.
///
/// Returns `Ok(true)` only when all of the following hold:
/// - the image is recorded in the database;
/// - it has at least one layer;
/// - every layer has a non-empty extracted directory under `layers_dir`;
/// - every layer digest has a matching database row.
///
/// Failures looking up the image or its layer digests are logged and reported as `Ok(false)`,
/// so the caller falls back to pulling.
///
/// # Errors
///
/// Returns an error if an existing layer directory cannot be read, or if the final
/// layers-by-digest database lookup fails.
async fn check_image_layers(
    pool: &Pool<Sqlite>,
    image: &Reference,
    layers_dir: impl AsRef<Path>,
) -> MicrosandboxResult<bool> {
    let layers_dir = layers_dir.as_ref();
    let image_ref = image.to_string();

    match db::image_exists(pool, &image_ref).await {
        Ok(true) => {}
        Ok(false) => {
            tracing::warn!("image {} does not exist in db, will pull image", image);
            return Ok(false);
        }
        Err(e) => {
            tracing::warn!("error checking image existence: {}, will pull image", e);
            return Ok(false);
        }
    }

    let layer_digests = match db::get_image_layer_digests(pool, &image_ref).await {
        Ok(layer_digests) => layer_digests,
        Err(e) => {
            tracing::warn!("error checking layer digests: {}, will pull image", e);
            return Ok(false);
        }
    };

    tracing::info!("layer_digests: {:?}", layer_digests);
    if layer_digests.is_empty() {
        tracing::warn!("no layers found for image {}", image);
        return Ok(false);
    }

    for digest in &layer_digests {
        let layer_path = layers_dir.join(format!("{}.{}", digest, EXTRACTED_LAYER_SUFFIX));
        if !layer_path.exists() {
            tracing::warn!("layer {} not found in layers directory", digest);
            return Ok(false);
        }

        // `extract_layer` treats an empty directory as "not yet extracted" (e.g. left by an
        // interrupted run), so such a layer is missing and the image must be pulled again.
        let mut read_dir = fs::read_dir(&layer_path).await?;
        if read_dir.next_entry().await?.is_none() {
            tracing::warn!("layer {} exists but is empty", digest);
            return Ok(false);
        }

        tracing::info!("layer {} found in layers directory", digest);
    }

    let db_layers = db::get_layers_by_digest(pool, &layer_digests).await?;
    if db_layers.len() < layer_digests.len() {
        tracing::warn!(
            "some layers for image {} exist on disk but missing in db",
            image
        );
        return Ok(false);
    }

    tracing::info!("all layers for image {} exist and are valid", image);
    Ok(true)
}
async fn extract_layer(
layer_path: impl AsRef<std::path::Path>,
extract_base_dir: impl AsRef<Path>,
) -> MicrosandboxResult<()> {
let layer_path = layer_path.as_ref();
let file_name = layer_path
.file_name()
.and_then(|n| n.to_str())
.ok_or_else(|| MicrosandboxError::LayerHandling {
source: std::io::Error::new(std::io::ErrorKind::NotFound, "invalid layer file name"),
layer: layer_path.display().to_string(),
})?;
let extract_dir = extract_base_dir
.as_ref()
.join(format!("{}.{}", file_name, EXTRACTED_LAYER_SUFFIX));
if extract_dir.exists() {
let mut read_dir =
fs::read_dir(&extract_dir)
.await
.map_err(|e| MicrosandboxError::LayerHandling {
source: e,
layer: file_name.to_string(),
})?;
if read_dir.next_entry().await?.is_some() {
tracing::info!(
"layer {} already extracted at {}, skipping extraction",
file_name,
extract_dir.display()
);
return Ok(());
}
}
fs::create_dir_all(&extract_dir)
.await
.map_err(|e| MicrosandboxError::LayerHandling {
source: e,
layer: file_name.to_string(),
})?;
tracing::info!(
"extracting layer {} to {}",
file_name,
extract_dir.display()
);
#[cfg(feature = "cli")]
struct ProgressReader<R> {
inner: R,
bar: ProgressBar,
}
#[cfg(feature = "cli")]
impl<R: Read> Read for ProgressReader<R> {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
let n = self.inner.read(buf)?;
if n > 0 {
self.bar.inc(n as u64);
}
Ok(n)
}
}
#[cfg(feature = "cli")]
{
let total_bytes = fs::metadata(layer_path).await?.len();
let pb = MULTI_PROGRESS.add(ProgressBar::new(total_bytes));
pb.set_style(
ProgressStyle::with_template(
"{prefix:.bold.dim} {bar:40.green/green.dim} {bytes:.bold}/{total_bytes:.dim}",
)
.unwrap()
.progress_chars("=+-"),
);
let digest_short = if let Some(rest) = file_name.strip_prefix("sha256:") {
&rest[..8.min(rest.len())]
} else {
&file_name[..8.min(file_name.len())]
};
pb.set_prefix(format!("{}", digest_short));
let layer_path_clone = layer_path.to_path_buf();
let extract_dir_clone = extract_dir.clone();
let pb_clone = pb.clone();
spawn_blocking(move || -> MicrosandboxResult<()> {
let file = std::fs::File::open(&layer_path_clone)?;
let reader = ProgressReader {
inner: file,
bar: pb_clone.clone(),
};
let decoder = GzDecoder::new(reader);
let mut archive = Archive::new(decoder);
archive.unpack(&extract_dir_clone)?;
Ok(())
})
.await
.map_err(|e| MicrosandboxError::LayerExtraction(format!("{:?}", e)))??;
pb.finish_and_clear();
}
#[cfg(not(feature = "cli"))]
{
let output = Command::new("tar")
.arg("-xzf")
.arg(layer_path)
.arg("-C")
.arg(&extract_dir)
.output()
.await
.map_err(|e| MicrosandboxError::LayerHandling {
source: e,
layer: file_name.to_string(),
})?;
if !output.status.success() {
let error_msg = String::from_utf8_lossy(&output.stderr);
return Err(MicrosandboxError::LayerExtraction(format!(
"Failed to extract layer {}: {}",
file_name, error_msg
)));
}
}
tracing::info!(
"successfully extracted layer {} to {}",
file_name,
extract_dir.display()
);
Ok(())
}
/// Lists the downloaded layer blobs in `dir`.
///
/// A layer blob is any regular file (`Path::is_file`, which follows symlinks) whose name starts
/// with `sha256:`. The order follows the directory listing and is not guaranteed.
///
/// # Errors
///
/// Returns an error if `dir` cannot be opened or any entry cannot be read. A partial listing is
/// never returned, because a missing layer would silently yield an incomplete image.
async fn collect_layer_files(dir: impl AsRef<Path>) -> MicrosandboxResult<Vec<PathBuf>> {
    let mut layer_paths = Vec::new();
    let mut read_dir = fs::read_dir(dir).await?;

    while let Some(entry) = read_dir.next_entry().await? {
        let path = entry.path();
        let is_layer_blob = path.is_file()
            && matches!(
                path.file_name().and_then(|n| n.to_str()),
                Some(name) if name.starts_with("sha256:")
            );
        if is_layer_blob {
            layer_paths.push(path);
        }
    }

    tracing::info!("found {} layers to extract", layer_paths.len());
    Ok(layer_paths)
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// End-to-end pull of `nginx:stable-alpine` into an isolated microsandbox home. It checks that
    /// the image is recorded in the database and that its layers are extracted into directories.
    #[test_log::test(tokio::test)]
    #[ignore = "makes network requests to Docker registry to pull an image"]
    async fn test_image_pull_from_docker_registry() -> MicrosandboxResult<()> {
        let temp_dir = TempDir::new()?;
        let microsandbox_home = temp_dir.path().join("microsandbox_home");
        let download_dir = temp_dir.path().join("download");

        fs::create_dir_all(&microsandbox_home).await?;
        fs::create_dir_all(&download_dir).await?;

        // Point the code under test at the isolated home. Passing the path directly avoids
        // panicking on non-UTF-8 temp paths.
        std::env::set_var("MICROSANDBOX_HOME", &microsandbox_home);

        let image_ref: Reference = "docker.io/library/nginx:stable-alpine".parse().unwrap();
        pull_from_docker_registry(&image_ref, &download_dir, None).await?;

        let db_path = microsandbox_home.join(OCI_DB_FILENAME);
        let pool = db::get_or_create_pool(&db_path, &OCI_DB_MIGRATOR).await?;
        let image_exists = db::image_exists(&pool, &image_ref.to_string()).await?;
        assert!(image_exists, "Image should exist in database");

        let layers_dir = microsandbox_home.join(LAYERS_SUBDIR);
        assert!(layers_dir.exists(), "Layers directory should exist");

        let mut entries = fs::read_dir(&layers_dir).await?;
        let mut found_extracted_layers = false;
        while let Some(entry) = entries.next_entry().await? {
            if entry
                .file_name()
                .to_string_lossy()
                .ends_with(EXTRACTED_LAYER_SUFFIX)
            {
                found_extracted_layers = true;
                assert!(
                    entry.path().is_dir(),
                    "Extracted layer path should be a directory"
                );
            }
        }
        assert!(
            found_extracted_layers,
            "Should have found extracted layer directories"
        );

        helper::verify_nginx_files(&layers_dir).await?;

        Ok(())
    }
}
#[cfg(test)]
mod helper {
    use super::*;

    /// Asserts that the extracted layers in `layers_dir` collectively contain three files from
    /// the `nginx:stable-alpine` image: `etc/nginx/nginx.conf`, `etc/nginx/conf.d/default.conf`
    /// and `usr/sbin/nginx`.
    ///
    /// Entries whose names do not end with `EXTRACTED_LAYER_SUFFIX` are ignored. Scanning stops as
    /// soon as all three files have been seen.
    pub(super) async fn verify_nginx_files(layers_dir: impl AsRef<Path>) -> MicrosandboxResult<()> {
        let mut conf_seen = false;
        let mut default_seen = false;
        let mut binary_seen = false;

        let mut dir_entries = fs::read_dir(layers_dir).await?;
        while !(conf_seen && default_seen && binary_seen) {
            let entry = match dir_entries.next_entry().await? {
                Some(entry) => entry,
                None => break,
            };

            let is_extracted_layer = entry
                .file_name()
                .to_string_lossy()
                .ends_with(EXTRACTED_LAYER_SUFFIX);
            if !is_extracted_layer {
                continue;
            }

            let layer_root = entry.path();
            tracing::info!("checking layer: {}", layer_root.display());

            let nginx_dir = layer_root.join("etc").join("nginx");

            let nginx_conf = nginx_dir.join("nginx.conf");
            if nginx_conf.exists() {
                conf_seen = true;
                tracing::info!("found nginx.conf at {}", nginx_conf.display());
            }

            let default_conf = nginx_dir.join("conf.d").join("default.conf");
            if default_conf.exists() {
                default_seen = true;
                tracing::info!("found default.conf at {}", default_conf.display());
            }

            let nginx_binary = layer_root.join("usr").join("sbin").join("nginx");
            if nginx_binary.exists() {
                binary_seen = true;
                tracing::info!("found nginx binary at {}", nginx_binary.display());
            }
        }

        assert!(conf_seen, "nginx.conf should exist in one of the layers");
        assert!(default_seen, "default.conf should exist in one of the layers");
        assert!(binary_seen, "nginx binary should exist in one of the layers");

        Ok(())
    }
}