use anyhow::Result;
use client_core::{
constants::docker::get_docker_work_dir, upgrade_strategy::UpgradeStrategy, utils::archive,
};
use std::io::{Read, Write};
use std::path::{Component, Path};
use std::time::Instant;
use tracing::{error, info};
use zip::read::ZipFile;
pub mod env_manager;
fn should_skip_file(file_name: &str) -> bool {
if file_name.starts_with("__MACOSX")
|| file_name.ends_with(".DS_Store")
|| file_name.starts_with("._")
|| file_name.ends_with(".tmp")
|| file_name.ends_with(".temp")
|| file_name.ends_with(".bak")
{
return true;
}
if file_name.starts_with(".git/")
|| file_name == ".gitignore"
|| file_name == ".gitattributes"
|| file_name == ".gitmodules"
{
return true;
}
if file_name.starts_with(".vscode/")
|| file_name.starts_with(".idea/")
|| file_name.starts_with(".vs/")
{
return true;
}
if file_name == ".env"
|| file_name.starts_with(".env.")
|| file_name == ".dockerignore"
|| file_name == ".editorconfig"
|| file_name.starts_with(".prettier")
|| file_name.starts_with(".eslint")
{
return false;
}
false
}
fn contains_unsafe_component(path: &Path) -> bool {
path.components().any(|component| {
matches!(
component,
Component::ParentDir | Component::RootDir | Component::Prefix(_)
)
})
}
pub fn validate_archive_paths(archive_path: &Path) -> Result<()> {
let format = archive::detect_format_by_magic(archive_path)?;
match format {
client_core::utils::archive::ArchiveFormat::Zip => {
let file = std::fs::File::open(archive_path)?;
let mut archive = zip::ZipArchive::new(file)?;
for i in 0..archive.len() {
let file = archive.by_index(i)?;
let raw_name = file.name().to_string();
let Some(enclosed_name) = file.enclosed_name() else {
return Err(anyhow::anyhow!(
"Unsafe archive path detected: {}",
raw_name
));
};
if contains_unsafe_component(&enclosed_name) {
return Err(anyhow::anyhow!(
"Unsafe archive path detected: {}",
raw_name
));
}
}
}
client_core::utils::archive::ArchiveFormat::TarGz => {
let tar_gz = std::fs::File::open(archive_path)?;
let decoder = flate2::read::GzDecoder::new(tar_gz);
let mut archive = tar::Archive::new(decoder);
for entry in archive.entries()? {
let entry = entry?;
let entry_type = entry.header().entry_type();
if entry_type.is_symlink() || entry_type.is_hard_link() {
return Err(anyhow::anyhow!(
"Archive links are not allowed: {}",
entry.path()?.display()
));
}
let path = entry.path()?;
if contains_unsafe_component(&path) {
return Err(anyhow::anyhow!(
"Unsafe archive path detected: {}",
path.display()
));
}
}
}
}
Ok(())
}
#[allow(dead_code)]
pub fn copy_with_progress<R: Read, W: Write>(
mut reader: R,
mut writer: W,
total_size: u64,
file_name: &str,
) -> std::io::Result<u64> {
let mut buf = [0u8; 8192]; let mut copied = 0u64;
let mut last_percent = 0;
loop {
let bytes_read = reader.read(&mut buf)?;
if bytes_read == 0 {
break;
}
writer.write_all(&buf[..bytes_read])?;
copied += bytes_read as u64;
if total_size > 100 * 1024 * 1024 {
let percent = (copied * 100).checked_div(total_size).unwrap_or(0);
let mb_copied = copied as f64 / 1024.0 / 1024.0;
let mb_total = total_size as f64 / 1024.0 / 1024.0;
if (percent != last_percent && percent.is_multiple_of(10))
|| (copied.is_multiple_of(100 * 1024 * 1024) && copied > 0)
{
info!(
" ⏳ {} copy progress: {:.1}% ({:.1}/{:.1} MB)",
file_name, percent as f64, mb_copied, mb_total
);
last_percent = percent;
}
}
}
Ok(copied)
}
fn force_extract_file(
entry: &mut ZipFile<std::fs::File>,
target_path: &std::path::Path,
) -> Result<()> {
if target_path.exists() {
if target_path.is_dir() {
info!("🗑️ Force removing directory: {}", target_path.display());
std::fs::remove_dir_all(target_path)?;
} else {
info!("🗑️ Force removing file: {}", target_path.display());
std::fs::remove_file(target_path)?;
}
}
if let Some(parent) = target_path.parent()
&& !parent.exists()
{
std::fs::create_dir_all(parent)?;
}
if entry.is_dir() {
std::fs::create_dir_all(target_path).map_err(|e| {
error!(
"❌ Failed to create directory: {} - error: {}",
target_path.display(),
e
);
e
})?;
} else {
let mut outfile = std::fs::File::create(target_path).map_err(|e| {
error!(
"❌ Failed to create file: {} - error: {}",
target_path.display(),
e
);
e
})?;
std::io::copy(entry, &mut outfile).map_err(|e| {
error!(
"❌ Failed to write file: {} - error: {}",
target_path.display(),
e
);
e
})?;
}
Ok(())
}
fn handle_extraction(
entry: &mut ZipFile<std::fs::File>,
dst: &std::path::Path,
extracted_files: &mut usize,
extracted_size: &mut u64,
) -> Result<()> {
force_extract_file(entry, dst)?;
*extracted_files += 1;
*extracted_size += entry.size();
Ok(())
}
fn ensure_parent_dir(path: &std::path::Path) -> Result<()> {
if let Some(parent) = path.parent()
&& !parent.exists()
{
std::fs::create_dir_all(parent)?;
}
Ok(())
}
fn is_upload_directory_path(path: &std::path::Path) -> bool {
path.components().any(|component| {
client_core::constants::docker::EXCLUDE_DIRS
.iter()
.any(|d| component.as_os_str() == *d)
})
}
fn safe_remove_docker_directory(output_dir: &std::path::Path) -> Result<()> {
if !output_dir.exists() {
return Ok(());
}
info!(
"🧹 Safely cleaning docker directory (keeping upload): {}",
output_dir.display()
);
for entry in std::fs::read_dir(output_dir)? {
let entry = entry?;
let path = entry.path();
let file_name = entry.file_name();
if client_core::constants::docker::EXCLUDE_DIRS
.iter()
.any(|d| file_name.as_os_str() == *d)
{
info!("🛡️ Keeping directory: {}", path.display());
continue;
}
if path.is_dir() {
info!("🗑️ Removing directory: {}", path.display());
std::fs::remove_dir_all(&path)?;
} else {
info!("🗑️ Removing file: {}", path.display());
std::fs::remove_file(&path)?;
}
}
info!("✅ Docker directory cleanup completed, upload directory preserved");
Ok(())
}
pub async fn extract_docker_service(
archive_path: &std::path::Path,
upgrade_strategy: &UpgradeStrategy,
) -> Result<()> {
let extract_start = Instant::now();
info!(
"📦 Starting Docker service package extraction: {}",
archive_path.display()
);
if !archive_path.exists() {
return Err(anyhow::anyhow!(
"{}",
t!("utils.file_not_exists", path = archive_path.display())
));
}
let format = archive::detect_format_by_magic(archive_path)?;
info!("✅ Detected archive format: {:?}", format);
validate_archive_paths(archive_path)?;
match format {
client_core::utils::archive::ArchiveFormat::Zip => {
extract_zip_archive(archive_path, upgrade_strategy, extract_start).await
}
client_core::utils::archive::ArchiveFormat::TarGz => {
extract_tar_gz_archive(archive_path, upgrade_strategy, extract_start).await
}
}
}
async fn extract_zip_archive(
zip_path: &std::path::Path,
upgrade_strategy: &UpgradeStrategy,
extract_start: Instant,
) -> Result<()> {
let file = std::fs::File::open(zip_path)?;
let mut archive = zip::ZipArchive::new(file)?;
info!(
"✅ ZIP opened successfully, contains {} files",
archive.len()
);
match upgrade_strategy {
UpgradeStrategy::FullUpgrade { .. } => {
let output_dir = std::path::Path::new("docker");
if output_dir.exists() {
safe_remove_docker_directory(output_dir)?;
} else {
std::fs::create_dir_all(output_dir)?;
}
let mut extracted_files = 0;
let mut extracted_size = 0u64;
let total_files = archive.len();
info!("🚀 Starting extraction of {} files...", total_files);
for i in 0..archive.len() {
let mut file = archive.by_index(i)?;
let file_name = file.name().to_string();
let enclosed_name = file.enclosed_name().ok_or_else(|| {
anyhow::anyhow!("Unsafe archive path detected: {}", file_name)
})?;
if should_skip_file(&file_name) {
info!("⏩ Skipping file: {}", file_name);
continue;
}
let clean_path = if enclosed_name.starts_with("docker") {
enclosed_name
.strip_prefix("docker")
.unwrap_or(&enclosed_name)
} else {
enclosed_name.as_path()
};
let target_path = output_dir.join(clean_path);
if is_upload_directory_path(&target_path) {
if target_path.exists() {
info!(
"🛡️ Keeping existing upload directory, skipping extraction: {}",
target_path.display()
);
continue;
} else {
info!(
"📁 Creating new upload directory structure: {}",
target_path.display()
);
}
}
if file.is_dir() {
std::fs::create_dir_all(&target_path)?;
} else {
force_extract_file(&mut file, &target_path)?;
extracted_files += 1;
extracted_size += file.size();
if extracted_files % (total_files / 10).max(1) == 0 {
let percentage = (extracted_files * 100) / total_files;
info!(
"📁 Extraction progress: {}% ({}/{} files, {:.1} MB)",
percentage,
extracted_files,
total_files,
extracted_size as f64 / 1024.0 / 1024.0
);
}
}
}
let elapsed = extract_start.elapsed();
info!("🎉 Docker service package extraction completed!");
info!(" 📁 Extracted files: {}", extracted_files);
info!(
" 📏 Total data size: {:.1} MB",
extracted_size as f64 / 1024.0 / 1024.0
);
info!(" ⏱️ Elapsed: {:.2} seconds", elapsed.as_secs_f64());
}
UpgradeStrategy::PatchUpgrade {
patch_info,
download_type: _,
..
} => {
let change_files = patch_info.get_changed_files();
let work_dir = get_docker_work_dir();
let upgrade_change_file_or_dir = change_files
.iter()
.map(|path| work_dir.join(path))
.collect::<Vec<_>>();
for file_or_dir in upgrade_change_file_or_dir {
if is_upload_directory_path(&file_or_dir) {
info!(
"🛡️ Keeping upload directory, skipping deletion: {}",
file_or_dir.display()
);
continue;
}
if file_or_dir.is_file() {
std::fs::remove_file(file_or_dir)?;
} else if file_or_dir.is_dir() {
std::fs::remove_dir_all(file_or_dir)?;
} else {
info!(
"File or directory does not exist, skipping: {}",
file_or_dir.display()
);
}
}
let operations = patch_info.operations.clone();
let mut extracted_files = 0;
let mut extracted_size = 0u64;
let total_files = archive.len();
info!("🚀 Starting extraction of {} files...", total_files);
if let Some(replace) = operations.replace {
let replace_files = replace.files;
let replace_dirs = replace.directories;
for file in replace_files {
let zip_path = format!("docker/{}", file.trim_start_matches('/'));
info!("🔍 Locating file: {} -> {}", file, zip_path);
let mut entry = archive.by_name(&zip_path).map_err(|e| {
anyhow::anyhow!(
"{}",
t!(
"utils.file_not_found_in_archive",
path = zip_path,
error = e.to_string()
)
)
})?;
let dst = work_dir.join(&file);
if is_upload_directory_path(&dst) {
if dst.exists() {
info!(
"🛡️ Keeping existing directory, skipping replacement: {}",
dst.display()
);
continue;
} else {
info!(
"📁 Creating new protected directory structure: {}",
dst.display()
);
}
}
force_extract_file(&mut entry, &dst)?;
extracted_files += 1;
extracted_size += entry.size();
}
for dir in replace_dirs {
let zip_dir_path = format!("docker/{}", dir.trim_start_matches('/'));
info!("📁 Processing directory: {} -> {}", dir, zip_dir_path);
let target_dir = work_dir.join(&dir);
if is_upload_directory_path(&target_dir) && target_dir.exists() {
info!(
"🛡️ Keeping existing directory, skipping directory replacement: {}",
target_dir.display()
);
continue;
}
if target_dir.exists() {
info!("🗑️ Force removing directory: {}", target_dir.display());
std::fs::remove_dir_all(&target_dir)?;
}
for i in 0..archive.len() {
let mut entry = archive.by_index(i)?;
let entry_name = entry.name();
if entry_name.starts_with(&zip_dir_path) {
let relative_path = entry_name
.strip_prefix(&zip_dir_path)
.unwrap_or("")
.trim_start_matches('/');
if relative_path.is_empty() && entry.is_dir() {
continue;
}
let dst = target_dir.join(relative_path);
ensure_parent_dir(&dst)?;
handle_extraction(
&mut entry,
&dst,
&mut extracted_files,
&mut extracted_size,
)?;
}
}
}
}
if let Some(delete) = operations.delete {
for file in delete.files {
let path = work_dir.join(file);
if is_upload_directory_path(&path) {
info!(
"🛡️ Keeping upload directory, skipping file deletion: {}",
path.display()
);
continue;
}
info!("🗑️ Removing file: {}", path.display());
if path.is_file() {
std::fs::remove_file(&path)?;
} else if path.exists() {
std::fs::remove_file(&path).or_else(|_| std::fs::remove_dir_all(&path))?;
} else {
info!("File does not exist, skipping: {}", path.display());
}
}
for dir in delete.directories {
let path = work_dir.join(dir);
if is_upload_directory_path(&path) {
info!(
"🛡️ Keeping upload directory, skipping directory deletion: {}",
path.display()
);
continue;
}
info!("🗑️ Removing directory: {}", path.display());
if path.is_dir() {
std::fs::remove_dir_all(&path)?;
} else if path.exists() {
std::fs::remove_file(&path).or_else(|_| std::fs::remove_dir_all(&path))?;
} else {
info!("Directory does not exist, skipping: {}", path.display());
}
}
}
use client_core::constants::sql::CRITICAL_UPGRADE_FILES;
for critical_file in CRITICAL_UPGRADE_FILES {
let zip_path = format!("docker/{}", critical_file);
let dst_path = work_dir.join(critical_file);
match archive.by_name(&zip_path) {
Ok(mut entry) => {
info!("🔧 Force updating critical file: {}", critical_file);
force_extract_file(&mut entry, &dst_path)?;
info!("✅ Critical file updated: {}", critical_file);
}
Err(_) => {
info!("⏭️ Critical file not present in archive: {}", zip_path);
}
}
}
}
UpgradeStrategy::NoUpgrade { .. } => {
return Err(anyhow::anyhow!(
"{}",
t!("utils.no_upgrade_extract_unsupported")
));
}
}
Ok(())
}
async fn extract_tar_gz_archive(
tar_gz_path: &std::path::Path,
upgrade_strategy: &UpgradeStrategy,
extract_start: Instant,
) -> Result<()> {
let tar_gz_path = tar_gz_path.to_path_buf();
let strategy = upgrade_strategy.clone();
tokio::task::spawn_blocking(move || {
extract_tar_gz_blocking(&tar_gz_path, &strategy, extract_start)
})
.await
.map_err(|e| anyhow::anyhow!("{}", t!("utils.extract_task_failed", error = e.to_string())))?
}
fn extract_tar_gz_blocking(
tar_gz_path: &std::path::Path,
upgrade_strategy: &UpgradeStrategy,
extract_start: Instant,
) -> Result<()> {
use flate2::read::GzDecoder;
use tar::Archive;
let tar_gz = std::fs::File::open(tar_gz_path)?;
let decoder = GzDecoder::new(tar_gz);
let mut archive = Archive::new(decoder);
let output_dir = std::path::Path::new("docker");
let mut extracted_files = 0;
let mut extracted_size = 0u64;
match upgrade_strategy {
UpgradeStrategy::FullUpgrade { .. } => {
if output_dir.exists() {
safe_remove_docker_directory(output_dir)?;
} else {
std::fs::create_dir_all(output_dir)?;
}
info!("🚀 Starting TAR.GZ extraction...");
for entry in archive.entries()? {
let mut entry: tar::Entry<flate2::read::GzDecoder<std::fs::File>> = entry?;
let path = entry.path()?;
let entry_type = entry.header().entry_type();
if entry_type.is_symlink() || entry_type.is_hard_link() {
return Err(anyhow::anyhow!(
"Archive links are not allowed: {}",
path.display()
));
}
if contains_unsafe_component(&path) {
return Err(anyhow::anyhow!(
"Unsafe archive path detected: {}",
path.display()
));
}
if should_skip_tar_entry(&path) {
continue;
}
let clean_path = path.strip_prefix("docker").unwrap_or(&path);
let target_path = output_dir.join(clean_path);
if is_upload_directory_path(&target_path) && target_path.exists() {
info!(
"🛡️ Keeping existing directory, skipping: {}",
target_path.display()
);
continue;
}
if let Some(parent) = target_path.parent()
&& !parent.exists()
{
std::fs::create_dir_all(parent)?;
}
entry.unpack(&target_path)?;
extracted_files += 1;
extracted_size += entry.size();
if extracted_files % 10 == 0 {
info!(
"📁 Extraction progress: {} files ({:.1} MB)",
extracted_files,
extracted_size as f64 / 1024.0 / 1024.0
);
}
}
let elapsed = extract_start.elapsed();
info!("🎉 Docker service package extraction completed!");
info!(" 📁 Extracted files: {}", extracted_files);
info!(
" 📏 Total data size: {:.1} MB",
extracted_size as f64 / 1024.0 / 1024.0
);
info!(" ⏱️ Elapsed: {:.2} seconds", elapsed.as_secs_f64());
}
UpgradeStrategy::PatchUpgrade { .. } => {
return Err(anyhow::anyhow!("{}", t!("utils.tar_gz_patch_unsupported")));
}
UpgradeStrategy::NoUpgrade { .. } => {
return Err(anyhow::anyhow!(
"{}",
t!("utils.no_upgrade_extract_unsupported")
));
}
}
Ok(())
}
fn should_skip_tar_entry(path: &std::path::Path) -> bool {
let s = path.to_string_lossy();
s.contains("__MACOSX") || s.contains(".DS_Store") || s.contains("._")
}
pub fn setup_logging(verbose: bool) {
#[allow(unused_imports)]
use tracing_subscriber::{EnvFilter, fmt, util::SubscriberInitExt};
let default_level = if verbose { "debug" } else { "info" };
let env_filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new(default_level))
.add_directive("reqwest=warn".parse().unwrap())
.add_directive("tokio=warn".parse().unwrap())
.add_directive("hyper=warn".parse().unwrap());
if let Ok(log_file) = std::env::var("DUCK_LOG_FILE") {
let file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(log_file)
.expect("Failed to create log file");
fmt()
.with_env_filter(env_filter)
.with_writer(file)
.with_target(true)
.with_thread_names(true)
.with_line_number(true)
.init();
} else {
fmt()
.with_env_filter(env_filter)
.with_target(false) .with_thread_names(false) .with_line_number(false) .without_time() .compact() .init();
}
}
#[allow(dead_code)]
pub fn setup_minimal_logging() {
#[allow(unused_imports)]
use tracing_subscriber::{EnvFilter, fmt, util::SubscriberInitExt};
let _ = fmt()
.with_env_filter(EnvFilter::from_default_env())
.with_target(false)
.compact() .try_init();
}