use clap::Subcommand;
use serde::Serialize;
use super::get_admin_client;
use crate::exit_code::ExitCode;
use crate::output::Formatter;
use rc_core::admin::{AdminApi, RebalancePoolStatus, RebalanceStatus};
#[derive(Subcommand, Debug)]
pub enum RebalanceCommands {
Start(StartArgs),
Status(StatusArgs),
Stop(StopArgs),
}
#[derive(clap::Args, Debug)]
pub struct StartArgs {
pub alias: String,
}
#[derive(clap::Args, Debug)]
pub struct StatusArgs {
pub alias: String,
}
#[derive(clap::Args, Debug)]
pub struct StopArgs {
pub alias: String,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct RebalanceOperationOutput {
success: bool,
message: String,
target: String,
#[serde(skip_serializing_if = "Option::is_none")]
id: Option<String>,
}
pub async fn execute(cmd: RebalanceCommands, formatter: &Formatter) -> ExitCode {
match cmd {
RebalanceCommands::Start(args) => execute_start(args, formatter).await,
RebalanceCommands::Status(args) => execute_status(args, formatter).await,
RebalanceCommands::Stop(args) => execute_stop(args, formatter).await,
}
}
async fn execute_start(args: StartArgs, formatter: &Formatter) -> ExitCode {
let client = match get_admin_client(&args.alias, formatter) {
Ok(c) => c,
Err(code) => return code,
};
match client.rebalance_start().await {
Ok(result) => {
if formatter.is_json() {
formatter.json(&RebalanceOperationOutput {
success: true,
message: "Rebalance started successfully".to_string(),
target: args.alias,
id: Some(result.id),
});
} else {
formatter.success("Rebalance started successfully.");
if !result.id.is_empty() {
formatter.println(&format!(" ID: {}", result.id));
}
}
ExitCode::Success
}
Err(e) => {
formatter.error(&format!("Failed to start rebalance: {e}"));
ExitCode::GeneralError
}
}
}
async fn execute_status(args: StatusArgs, formatter: &Formatter) -> ExitCode {
let client = match get_admin_client(&args.alias, formatter) {
Ok(c) => c,
Err(code) => return code,
};
match client.rebalance_status().await {
Ok(status) => {
if formatter.is_json() {
formatter.json(&status);
} else {
print_rebalance_status(&status, formatter);
}
ExitCode::Success
}
Err(e) => {
formatter.error(&format!("Failed to get rebalance status: {e}"));
ExitCode::GeneralError
}
}
}
async fn execute_stop(args: StopArgs, formatter: &Formatter) -> ExitCode {
let client = match get_admin_client(&args.alias, formatter) {
Ok(c) => c,
Err(code) => return code,
};
match client.rebalance_stop().await {
Ok(()) => {
if formatter.is_json() {
formatter.json(&RebalanceOperationOutput {
success: true,
message: "Rebalance stopped successfully".to_string(),
target: args.alias,
id: None,
});
} else {
formatter.success("Rebalance stopped successfully.");
}
ExitCode::Success
}
Err(e) => {
formatter.error(&format!("Failed to stop rebalance: {e}"));
ExitCode::GeneralError
}
}
}
fn print_rebalance_status(status: &RebalanceStatus, formatter: &Formatter) {
formatter.println(&format!(
"{} {}",
formatter.style_name("Rebalance:"),
if status.id.is_empty() {
formatter.style_date("not started")
} else {
formatter.style_size("configured")
}
));
if !status.id.is_empty() {
formatter.println(&format!(" ID: {}", status.id));
}
if let Some(stopped_at) = &status.stopped_at {
formatter.println(&format!(" Stopped: {}", stopped_at));
}
if status.pools.is_empty() {
formatter.println(" No pool status found.");
return;
}
formatter.println("");
formatter.println(&formatter.style_name("Per-pool usage:"));
for pool in &status.pools {
print_rebalance_pool(pool, formatter);
}
}
fn print_rebalance_pool(pool: &RebalancePoolStatus, formatter: &Formatter) {
let status = if pool.status.is_empty() {
"idle"
} else {
pool.status.as_str()
};
formatter.println(&format!(
" Pool {}: {:.2}% used ({})",
pool.id,
pool.used * 100.0,
style_status(status, formatter)
));
if let Some(error) = &pool.last_error
&& !error.is_empty()
{
formatter.println(&format!(" Last error: {error}"));
}
if let Some(progress) = &pool.progress {
formatter.println(&format!(
" Progress: {} moved, {} objects, {} versions",
format_bytes(progress.bytes),
progress.num_objects,
progress.num_versions
));
formatter.println(&format!(
" Remaining: {} buckets, ETA {}",
progress.remaining_buckets,
format_duration(progress.eta)
));
if !progress.bucket.is_empty() {
formatter.println(&format!(
" Current: {}/{}",
progress.bucket, progress.object
));
}
formatter.println(&format!(
" Elapsed: {}",
format_duration(progress.elapsed)
));
}
}
fn style_status(status: &str, formatter: &Formatter) -> String {
match status {
"Started" | "running" => formatter.style_name(status),
"Stopped" | "stopped" => formatter.theme().warning.apply_to(status).to_string(),
"Completed" | "complete" => formatter.style_size(status),
"idle" => formatter.style_date(status),
_ => status.to_string(),
}
}
fn format_duration(seconds: u64) -> String {
let hours = seconds / 3600;
let minutes = (seconds % 3600) / 60;
let secs = seconds % 60;
if hours > 0 {
format!("{hours}h {minutes}m {secs}s")
} else if minutes > 0 {
format!("{minutes}m {secs}s")
} else {
format!("{secs}s")
}
}
fn format_bytes(bytes: u64) -> String {
const KB: u64 = 1024;
const MB: u64 = KB * 1024;
const GB: u64 = MB * 1024;
const TB: u64 = GB * 1024;
if bytes >= TB {
format!("{:.2} TiB", bytes as f64 / TB as f64)
} else if bytes >= GB {
format!("{:.2} GiB", bytes as f64 / GB as f64)
} else if bytes >= MB {
format!("{:.2} MiB", bytes as f64 / MB as f64)
} else if bytes >= KB {
format!("{:.2} KiB", bytes as f64 / KB as f64)
} else {
format!("{bytes} B")
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_format_duration() {
assert_eq!(format_duration(0), "0s");
assert_eq!(format_duration(65), "1m 5s");
assert_eq!(format_duration(3661), "1h 1m 1s");
}
#[test]
fn test_format_bytes() {
assert_eq!(format_bytes(0), "0 B");
assert_eq!(format_bytes(1024), "1.00 KiB");
assert_eq!(format_bytes(1024 * 1024), "1.00 MiB");
}
}