use chrono::{DateTime, Local};
use clap::{Parser, Subcommand};
use dragonfly_api::dfdaemon::v2::{DeleteTaskRequest, ListLocalTasksRequest};
use dragonfly_client_core::{Error, Result};
use dragonfly_client_util::net::preferred_local_ip;
use std::path::PathBuf;
use tabled::{
settings::{object::Rows, Alignment, Modify, Style},
Table, Tabled,
};
use termion::{color, style};
use tracing::Level;
use super::*;
// TaskCommand wraps the task subcommand selected on the command line and is the entry point
// used to dispatch to it (see `TaskCommand::execute`).
#[derive(Debug, Clone, Parser)]
pub struct TaskCommand {
// subcommand is the parsed task operation to run.
#[command(subcommand)]
pub subcommand: TaskSubCommand,
}
// TaskSubCommand enumerates the task operations exposed by the CLI. Each variant carries the
// parsed arguments of its own command.
#[derive(Debug, Clone, Subcommand)]
pub enum TaskSubCommand {
// Ls is registered as `ls` and lists the tasks managed by the local dfdaemon.
#[command(
name = "ls",
author,
version,
about = "List tasks",
long_about = "List all tasks managed by the local dfdaemon."
)]
Ls(LsCommand),
// Rm is registered as `rm` and removes a single task, identified by its ID, from the local
// dfdaemon.
#[command(
name = "rm",
author,
version,
about = "Remove a task",
long_about = "Remove a specific task by ID from the local dfdaemon."
)]
Rm(RmCommand),
}
impl TaskCommand {
pub async fn execute(self) -> Result<()> {
match self.subcommand {
TaskSubCommand::Ls(cmd) => cmd.execute().await,
TaskSubCommand::Rm(cmd) => cmd.execute().await,
}
}
}
// LsCommand holds the command-line arguments of the `ls` subcommand.
#[derive(Debug, Clone, Parser)]
pub struct LsCommand {
// endpoint is the path passed to get_dfdaemon_download_client to reach dfdaemon's GRPC
// server (-e / --endpoint). It defaults to dfdaemon::default_download_unix_socket_path().
#[arg(
short = 'e',
long = "endpoint",
default_value_os_t = dfdaemon::default_download_unix_socket_path(),
help = "Endpoint of dfdaemon's GRPC server"
)]
endpoint: PathBuf,
// log_level is the tracing level handed to init_command_tracing (-l / --log-level),
// defaulting to "info".
#[arg(
short = 'l',
long,
default_value = "info",
help = "Specify the logging level [trace, debug, info, warn, error]"
)]
log_level: Level,
// console is handed to init_command_tracing together with log_level (--console); it is
// false unless the flag is given.
#[arg(long, default_value_t = false, help = "Specify whether to print log")]
console: bool,
}
impl LsCommand {
    // execute initializes command tracing, connects to dfdaemon through the configured
    // endpoint and lists the local tasks.
    //
    // Every failure is reported on stdout as a colored banner and terminates the process
    // with exit code 1, so an `Err` is never actually returned to the caller.
    pub async fn execute(&self) -> Result<()> {
        // Hold on to whatever init_command_tracing returns until the command finishes.
        // NOTE: std::process::exit does not run destructors, so this value is not dropped
        // on the failure paths below.
        let _guards = init_command_tracing(self.log_level, self.console);

        let dfdaemon_download_client =
            match get_dfdaemon_download_client(self.endpoint.clone()).await {
                Ok(client) => client,
                Err(err) => {
                    println!(
                        "{}{}{}Connect Dfdaemon Failed!{}",
                        color::Fg(color::Red),
                        style::Italic,
                        style::Bold,
                        style::Reset
                    );

                    println!(
                        "{}{}{}****************************************{}",
                        color::Fg(color::Black),
                        style::Italic,
                        style::Bold,
                        style::Reset
                    );

                    println!(
                        "{}{}{}Message:{} can not connect {}, please check the unix socket {}",
                        color::Fg(color::Cyan),
                        style::Italic,
                        style::Bold,
                        style::Reset,
                        err,
                        self.endpoint.to_string_lossy(),
                    );

                    println!(
                        "{}{}{}****************************************{}",
                        color::Fg(color::Black),
                        style::Italic,
                        style::Bold,
                        style::Reset
                    );

                    std::process::exit(1);
                }
            };

        if let Err(err) = self.run(dfdaemon_download_client).await {
            match err {
                // A gRPC status carries a code, a message and raw detail bytes; print each.
                Error::TonicStatus(status) => {
                    println!(
                        "{}{}{}Listing Tasks Failed!{}",
                        color::Fg(color::Red),
                        style::Italic,
                        style::Bold,
                        style::Reset,
                    );

                    println!(
                        "{}{}{}*********************************{}",
                        color::Fg(color::Black),
                        style::Italic,
                        style::Bold,
                        style::Reset
                    );

                    println!(
                        "{}{}{}Bad Code:{} {}",
                        color::Fg(color::Red),
                        style::Italic,
                        style::Bold,
                        style::Reset,
                        status.code()
                    );

                    println!(
                        "{}{}{}Message:{} {}",
                        color::Fg(color::Cyan),
                        style::Italic,
                        style::Bold,
                        style::Reset,
                        status.message()
                    );

                    // The details are arbitrary bytes, so decode them lossily instead of
                    // panicking on invalid UTF-8 while reporting an error.
                    println!(
                        "{}{}{}Details:{} {}",
                        color::Fg(color::Cyan),
                        style::Italic,
                        style::Bold,
                        style::Reset,
                        String::from_utf8_lossy(status.details())
                    );

                    println!(
                        "{}{}{}*********************************{}",
                        color::Fg(color::Black),
                        style::Italic,
                        style::Bold,
                        style::Reset
                    );
                }
                // Any other error is printed through its Display implementation.
                err => {
                    println!(
                        "{}{}{}Listing Tasks Failed!{}",
                        color::Fg(color::Red),
                        style::Italic,
                        style::Bold,
                        style::Reset
                    );

                    println!(
                        "{}{}{}****************************************{}",
                        color::Fg(color::Black),
                        style::Italic,
                        style::Bold,
                        style::Reset
                    );

                    println!(
                        "{}{}{}Message:{} {}",
                        color::Fg(color::Red),
                        style::Italic,
                        style::Bold,
                        style::Reset,
                        err
                    );

                    println!(
                        "{}{}{}****************************************{}",
                        color::Fg(color::Black),
                        style::Italic,
                        style::Bold,
                        style::Reset
                    );
                }
            }

            std::process::exit(1);
        }

        Ok(())
    }

    // run requests the local tasks from dfdaemon and prints them to stdout as a table with
    // one row per task. Errors from the request are propagated to the caller.
    async fn run(
        &self,
        dfdaemon_download_client: dragonfly_client::grpc::dfdaemon_download::DfdaemonDownloadClient,
    ) -> Result<()> {
        let response = dfdaemon_download_client
            .list_local_tasks(ListLocalTasksRequest {
                remote_ip: preferred_local_ip().map(|ip| ip.to_string()),
            })
            .await?;

        // TaskRow is the printable representation of a single task.
        #[derive(Debug, Default, Tabled)]
        #[tabled(rename_all = "UPPERCASE")]
        struct TaskRow {
            id: String,
            #[tabled(rename = "PIECE LENGTH")]
            piece_length: String,
            #[tabled(rename = "CONTENT LENGTH")]
            content_length: String,
            #[tabled(rename = "CREATED")]
            created_at: String,
            #[tabled(rename = "FINISHED")]
            finished_at: String,
            #[tabled(rename = "FAILED")]
            failed_at: String,
            #[tabled(rename = "UPDATED")]
            updated_at: String,
        }

        // format_timestamp renders an optional (seconds, nanoseconds) pair in the local
        // timezone. A missing timestamp, or one chrono cannot represent, is shown as "-".
        fn format_timestamp(timestamp: Option<(i64, u32)>) -> String {
            timestamp
                .and_then(|(seconds, nanos)| DateTime::from_timestamp(seconds, nanos))
                .map(|dt| {
                    dt.with_timezone(&Local)
                        .format("%Y-%m-%d %H:%M:%S")
                        .to_string()
                })
                .unwrap_or_else(|| "-".to_string())
        }

        let rows: Vec<TaskRow> = response
            .tasks
            .into_iter()
            .map(|task| TaskRow {
                id: task.task_id.clone(),
                // Absent lengths fall back to the type's default before being humanized.
                piece_length: bytesize::to_string(task.piece_length.unwrap_or_default(), true),
                content_length: bytesize::to_string(
                    task.content_length.unwrap_or_default(),
                    true,
                ),
                created_at: format_timestamp(
                    task.created_at.map(|ts| (ts.seconds, ts.nanos as u32)),
                ),
                finished_at: format_timestamp(
                    task.finished_at.map(|ts| (ts.seconds, ts.nanos as u32)),
                ),
                failed_at: format_timestamp(
                    task.failed_at.map(|ts| (ts.seconds, ts.nanos as u32)),
                ),
                updated_at: format_timestamp(
                    task.updated_at.map(|ts| (ts.seconds, ts.nanos as u32)),
                ),
            })
            .collect();

        // Borderless table with a left-aligned header row.
        let mut table = Table::new(rows);
        table
            .with(Style::blank())
            .with(Modify::new(Rows::first()).with(Alignment::left()));
        println!("{table}");

        Ok(())
    }
}
// RmCommand holds the command-line arguments of the `rm` subcommand.
#[derive(Debug, Clone, Parser)]
pub struct RmCommand {
// id is the positional argument naming the task to remove; it is sent as the task_id of
// the delete request.
#[arg(help = "Specify the task ID to remove")]
id: String,
// endpoint is the path passed to get_dfdaemon_download_client to reach dfdaemon's GRPC
// server (-e / --endpoint). It defaults to dfdaemon::default_download_unix_socket_path().
#[arg(
short = 'e',
long = "endpoint",
default_value_os_t = dfdaemon::default_download_unix_socket_path(),
help = "Endpoint of dfdaemon's GRPC server"
)]
endpoint: PathBuf,
// log_level is the tracing level handed to init_command_tracing (-l / --log-level),
// defaulting to "info".
#[arg(
short = 'l',
long,
default_value = "info",
help = "Specify the logging level [trace, debug, info, warn, error]"
)]
log_level: Level,
// console is handed to init_command_tracing together with log_level (--console); it is
// false unless the flag is given.
#[arg(long, default_value_t = false, help = "Specify whether to print log")]
console: bool,
}
impl RmCommand {
    // execute initializes command tracing, connects to dfdaemon through the configured
    // endpoint and removes the task identified by `id`.
    //
    // Every failure is reported on stdout as a colored banner and terminates the process
    // with exit code 1, so an `Err` is never actually returned to the caller.
    pub async fn execute(&self) -> Result<()> {
        // Hold on to whatever init_command_tracing returns until the command finishes.
        // NOTE: std::process::exit does not run destructors, so this value is not dropped
        // on the failure paths below.
        let _guards = init_command_tracing(self.log_level, self.console);

        let dfdaemon_download_client =
            match get_dfdaemon_download_client(self.endpoint.clone()).await {
                Ok(client) => client,
                Err(err) => {
                    println!(
                        "{}{}{}Connect Dfdaemon Failed!{}",
                        color::Fg(color::Red),
                        style::Italic,
                        style::Bold,
                        style::Reset
                    );

                    println!(
                        "{}{}{}****************************************{}",
                        color::Fg(color::Black),
                        style::Italic,
                        style::Bold,
                        style::Reset
                    );

                    println!(
                        "{}{}{}Message:{} can not connect {}, please check the unix socket {}",
                        color::Fg(color::Cyan),
                        style::Italic,
                        style::Bold,
                        style::Reset,
                        err,
                        self.endpoint.to_string_lossy(),
                    );

                    println!(
                        "{}{}{}****************************************{}",
                        color::Fg(color::Black),
                        style::Italic,
                        style::Bold,
                        style::Reset
                    );

                    std::process::exit(1);
                }
            };

        if let Err(err) = self.run(dfdaemon_download_client).await {
            match err {
                // A gRPC status carries a code, a message and raw detail bytes; print each.
                Error::TonicStatus(status) => {
                    println!(
                        "{}{}{}Removing Task Failed!{}",
                        color::Fg(color::Red),
                        style::Italic,
                        style::Bold,
                        style::Reset,
                    );

                    println!(
                        "{}{}{}*********************************{}",
                        color::Fg(color::Black),
                        style::Italic,
                        style::Bold,
                        style::Reset
                    );

                    println!(
                        "{}{}{}Bad Code:{} {}",
                        color::Fg(color::Red),
                        style::Italic,
                        style::Bold,
                        style::Reset,
                        status.code()
                    );

                    println!(
                        "{}{}{}Message:{} {}",
                        color::Fg(color::Cyan),
                        style::Italic,
                        style::Bold,
                        style::Reset,
                        status.message()
                    );

                    // The details are arbitrary bytes, so decode them lossily instead of
                    // panicking on invalid UTF-8 while reporting an error.
                    println!(
                        "{}{}{}Details:{} {}",
                        color::Fg(color::Cyan),
                        style::Italic,
                        style::Bold,
                        style::Reset,
                        String::from_utf8_lossy(status.details())
                    );

                    println!(
                        "{}{}{}*********************************{}",
                        color::Fg(color::Black),
                        style::Italic,
                        style::Bold,
                        style::Reset
                    );
                }
                // Any other error is printed through its Display implementation.
                err => {
                    println!(
                        "{}{}{}Removing Task Failed!{}",
                        color::Fg(color::Red),
                        style::Italic,
                        style::Bold,
                        style::Reset
                    );

                    println!(
                        "{}{}{}****************************************{}",
                        color::Fg(color::Black),
                        style::Italic,
                        style::Bold,
                        style::Reset
                    );

                    println!(
                        "{}{}{}Message:{} {}",
                        color::Fg(color::Red),
                        style::Italic,
                        style::Bold,
                        style::Reset,
                        err
                    );

                    println!(
                        "{}{}{}****************************************{}",
                        color::Fg(color::Black),
                        style::Italic,
                        style::Bold,
                        style::Reset
                    );
                }
            }

            std::process::exit(1);
        }

        Ok(())
    }

    // run sends the delete request for `self.id` to dfdaemon and prints a confirmation to
    // stdout once the request succeeds. Errors from the request are propagated to the caller.
    async fn run(
        &self,
        dfdaemon_download_client: dragonfly_client::grpc::dfdaemon_download::DfdaemonDownloadClient,
    ) -> Result<()> {
        dfdaemon_download_client
            .delete_task(DeleteTaskRequest {
                task_id: self.id.clone(),
                remote_ip: preferred_local_ip().map(|ip| ip.to_string()),
            })
            .await?;

        println!(
            "{}{}{}Task Removed!{}",
            color::Fg(color::Green),
            style::Italic,
            style::Bold,
            style::Reset
        );

        Ok(())
    }
}