use clap::Parser;
use dragonfly_api::common::v2::ObjectStorage;
use dragonfly_api::dfdaemon::v2::UploadPersistentTaskRequest;
use dragonfly_client_config::dfstore::default_dfstore_persistent_replica_count;
use dragonfly_client_core::{
error::{ErrorType, OrErr},
Error, Result,
};
use dragonfly_client_util::net::preferred_local_ip;
use indicatif::{ProgressBar, ProgressStyle};
use path_absolutize::*;
use std::path::{Path, PathBuf};
use std::time::Duration;
use termion::{color, style};
use tracing::info;
use url::Url;
use super::*;
/// Interval passed to `ProgressBar::enable_steady_tick` for the import spinner (80 ms).
const DEFAULT_PROGRESS_BAR_STEADY_TICK_INTERVAL: Duration = Duration::from_millis(80);
// Command line arguments of the `import` subcommand.
//
// NOTE: plain `//` comments are used on purpose. With `#[derive(Parser)]`, clap
// turns `///` doc comments on the struct and its fields into about/help text,
// which would change what the CLI prints.
#[derive(Debug, Clone, Parser)]
pub struct ImportCommand {
    // Positional argument (no `long`/`short`): the local file to import.
    #[arg(help = "Specify the path of the file to import")]
    path: PathBuf,

    // Destination object in object storage.
    #[arg(
        long,
        help = "Specify the URL for copying data to object storage. Format: scheme://<bucket>/<path>. Examples: s3://<bucket>/<path>, abs://<bucket>/<path>"
    )]
    url: Url,

    // Persistence settings of the uploaded task.
    #[arg(
        long = "persistent-replica-count",
        default_value_t = default_dfstore_persistent_replica_count(),
        env = "DFSTORE_IMPORT_PERSISTENT_REPLICA_COUNT",
        help = "Specify the replica count of the persistent cache task"
    )]
    persistent_replica_count: u64,

    // The accepted range is enforced by `validate_args` (5 minutes ..= 7 days);
    // the help text below is kept in sync with that check.
    #[arg(
        long = "ttl",
        value_parser = humantime::parse_duration,
        default_value = "1h",
        env = "DFSTORE_IMPORT_TTL",
        help = "Specify the ttl of the persistent cache task, maximum is 7d and minimum is 5m"
    )]
    ttl: Duration,

    #[arg(
        long = "timeout",
        value_parser = humantime::parse_duration,
        default_value = "30m",
        env = "DFSTORE_IMPORT_TIMEOUT",
        help = "Specify the timeout for importing a file"
    )]
    timeout: Duration,

    // Object storage connection and credential options. They are forwarded
    // unchanged in the `ObjectStorage` message of the upload request.
    #[arg(
        long,
        env = "DFSTORE_IMPORT_STORAGE_REGION",
        help = "Specify the region for the Object Storage Service (e.g., us-east-1)"
    )]
    storage_region: Option<String>,

    #[arg(
        long,
        env = "DFSTORE_IMPORT_STORAGE_ENDPOINT",
        help = "Specify the endpoint URL for the Object Storage Service (e.g., https://s3.amazonaws.com)"
    )]
    storage_endpoint: Option<String>,

    #[arg(
        long,
        env = "DFSTORE_IMPORT_STORAGE_ACCESS_KEY_ID",
        help = "Specify the access key ID for authenticating with the Object Storage Service"
    )]
    storage_access_key_id: Option<String>,

    #[arg(
        long,
        env = "DFSTORE_IMPORT_STORAGE_ACCESS_KEY_SECRET",
        help = "Specify the secret access key for authenticating with the Object Storage Service"
    )]
    storage_access_key_secret: Option<String>,

    #[arg(
        long,
        env = "DFSTORE_IMPORT_STORAGE_SECURITY_TOKEN",
        help = "Specify the security token for the Object Storage Service"
    )]
    storage_security_token: Option<String>,

    #[arg(
        long,
        env = "DFSTORE_IMPORT_STORAGE_INSECURE_SKIP_VERIFY",
        help = "Specify whether to skip verify TLS certification for object storage service"
    )]
    storage_insecure_skip_verify: Option<bool>,

    #[arg(
        long,
        env = "DFSTORE_IMPORT_STORAGE_SESSION_TOKEN",
        help = "Specify the session token for Amazon Simple Storage Service(S3)"
    )]
    storage_session_token: Option<String>,

    #[arg(
        long,
        env = "DFSTORE_IMPORT_STORAGE_CREDENTIAL_PATH",
        help = "Specify the local path to the credential file which is used for OAuth2 authentication for Google Cloud Storage Service(GCS)"
    )]
    storage_credential_path: Option<String>,

    #[arg(
        long,
        default_value = "publicRead",
        env = "DFSTORE_IMPORT_STORAGE_PREDEFINED_ACL",
        help = "Specify the predefined ACL for Google Cloud Storage Service(GCS)"
    )]
    storage_predefined_acl: Option<String>,

    // Terminal output and logging options.
    #[arg(
        long,
        default_value_t = false,
        env = "DFSTORE_IMPORT_NO_PROGRESS",
        help = "Specify whether to disable the progress bar display"
    )]
    no_progress: bool,

    // Path handed to `get_dfdaemon_download_client` to reach dfdaemon.
    #[arg(
        short = 'e',
        long = "endpoint",
        default_value_os_t = dfdaemon::default_download_unix_socket_path(),
        help = "Endpoint of dfdaemon's GRPC server"
    )]
    endpoint: PathBuf,

    #[arg(
        short = 'l',
        long,
        default_value = "info",
        env = "DFSTORE_IMPORT_LOG_LEVEL",
        help = "Specify the logging level [trace, debug, info, warn, error]"
    )]
    log_level: Level,

    #[arg(
        long,
        default_value_t = false,
        env = "DFSTORE_IMPORT_CONSOLE",
        help = "Specify whether to print log"
    )]
    console: bool,
}
impl ImportCommand {
pub async fn execute(&self) -> Result<()> {
Args::parse();
let _guards = init_command_tracing(self.log_level, self.console);
if let Err(err) = self.validate_args() {
println!(
"{}{}{}Validating Failed!{}",
color::Fg(color::Red),
style::Italic,
style::Bold,
style::Reset
);
println!(
"{}{}{}****************************************{}",
color::Fg(color::Black),
style::Italic,
style::Bold,
style::Reset
);
println!(
"{}{}{}Message:{} {}",
color::Fg(color::Cyan),
style::Italic,
style::Bold,
style::Reset,
err,
);
println!(
"{}{}{}****************************************{}",
color::Fg(color::Black),
style::Italic,
style::Bold,
style::Reset
);
std::process::exit(1);
}
let dfdaemon_download_client =
match get_dfdaemon_download_client(self.endpoint.to_path_buf()).await {
Ok(client) => client,
Err(err) => {
println!(
"{}{}{}Connect Dfdaemon Failed!{}",
color::Fg(color::Red),
style::Italic,
style::Bold,
style::Reset
);
println!(
"{}{}{}****************************************{}",
color::Fg(color::Black),
style::Italic,
style::Bold,
style::Reset
);
println!(
"{}{}{}Message:{}, can not connect {}, please check the unix socket {}",
color::Fg(color::Cyan),
style::Italic,
style::Bold,
style::Reset,
err,
self.endpoint.to_string_lossy(),
);
println!(
"{}{}{}****************************************{}",
color::Fg(color::Black),
style::Italic,
style::Bold,
style::Reset
);
std::process::exit(1);
}
};
if let Err(err) = self.run(dfdaemon_download_client).await {
match err {
Error::TonicStatus(status) => {
println!(
"{}{}{}Importing Failed!{}",
color::Fg(color::Red),
style::Italic,
style::Bold,
style::Reset,
);
println!(
"{}{}{}*********************************{}",
color::Fg(color::Black),
style::Italic,
style::Bold,
style::Reset
);
println!(
"{}{}{}Bad Code:{} {}",
color::Fg(color::Red),
style::Italic,
style::Bold,
style::Reset,
status.code()
);
println!(
"{}{}{}Message:{} {}",
color::Fg(color::Cyan),
style::Italic,
style::Bold,
style::Reset,
status.message()
);
println!(
"{}{}{}Details:{} {}",
color::Fg(color::Cyan),
style::Italic,
style::Bold,
style::Reset,
std::str::from_utf8(status.details()).unwrap()
);
println!(
"{}{}{}*********************************{}",
color::Fg(color::Black),
style::Italic,
style::Bold,
style::Reset
);
}
err => {
println!(
"{}{}{}Importing Failed!{}",
color::Fg(color::Red),
style::Italic,
style::Bold,
style::Reset
);
println!(
"{}{}{}****************************************{}",
color::Fg(color::Black),
style::Italic,
style::Bold,
style::Reset
);
println!(
"{}{}{}Message:{} {}",
color::Fg(color::Red),
style::Italic,
style::Bold,
style::Reset,
err
);
println!(
"{}{}{}****************************************{}",
color::Fg(color::Black),
style::Italic,
style::Bold,
style::Reset
);
}
}
std::process::exit(1);
}
Ok(())
}
async fn run(&self, dfdaemon_download_client: DfdaemonDownloadClient) -> Result<()> {
let absolute_path = Path::new(&self.path).absolutize()?;
info!("import file: {}", absolute_path.to_string_lossy());
let progress_bar = if self.no_progress {
ProgressBar::hidden()
} else {
ProgressBar::new_spinner()
};
progress_bar.enable_steady_tick(DEFAULT_PROGRESS_BAR_STEADY_TICK_INTERVAL);
progress_bar.set_style(
ProgressStyle::with_template("{spinner:.blue} {msg}")
.unwrap()
.tick_strings(&["⣾", "⣽", "⣻", "⢿", "⡿", "⣟", "⣯", "⣷"]),
);
progress_bar.set_message("Importing...");
dfdaemon_download_client
.upload_persistent_task(UploadPersistentTaskRequest {
url: self.url.to_string(),
object_storage: Some(ObjectStorage {
region: self.storage_region.clone(),
endpoint: self.storage_endpoint.clone(),
access_key_id: self.storage_access_key_id.clone(),
access_key_secret: self.storage_access_key_secret.clone(),
security_token: self.storage_security_token.clone(),
session_token: self.storage_session_token.clone(),
credential_path: self.storage_credential_path.clone(),
predefined_acl: self.storage_predefined_acl.clone(),
insecure_skip_verify: self.storage_insecure_skip_verify,
}),
path: absolute_path.to_string_lossy().to_string(),
persistent_replica_count: self.persistent_replica_count,
ttl: Some(
prost_wkt_types::Duration::try_from(self.ttl).or_err(ErrorType::ParseError)?,
),
timeout: Some(
prost_wkt_types::Duration::try_from(self.timeout)
.or_err(ErrorType::ParseError)?,
),
remote_ip: preferred_local_ip().map(|ip| ip.to_string()),
})
.await?;
progress_bar.finish_with_message(format!("Done: {}", self.url));
Ok(())
}
fn validate_args(&self) -> Result<()> {
if self.ttl < Duration::from_secs(5 * 60)
|| self.ttl > Duration::from_secs(7 * 24 * 60 * 60)
{
return Err(Error::ValidationError(format!(
"ttl must be between 5 minutes and 7 days, but got {}",
self.ttl.as_secs()
)));
}
if self.path.is_dir() {
return Err(Error::ValidationError(format!(
"path {} is a directory",
self.path.display()
)));
}
if !self.path.exists() {
return Err(Error::ValidationError(format!(
"path {} does not exist",
self.path.display()
)));
}
Ok(())
}
}