use crate::cmd::{load_identity, Authority, AxCliCommand};
use ax_core::{
node_connection::{connect, mk_swarm, request_single, Task},
util::formats::{ActyxOSCode, ActyxOSError, ActyxOSResult, AdminRequest, AdminResponse, TopicDeleteResponse},
};
use comfy_table::{presets::UTF8_FULL_CONDENSED, Cell, Table};
use futures::{channel::mpsc, future::join_all, stream};
use serde::{Deserialize, Serialize};
use std::time::Duration;
/// Marker type for the topic-delete CLI command; its behaviour is provided by the
/// `AxCliCommand` implementation for this type.
pub struct TopicsDelete;
/// Outcome of a topic-delete request sent to a single node.
///
/// Serialized with an internal `connection` tag, as configured by the
/// `serde(tag = ...)` attribute below.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "connection")]
pub enum DeleteOutput {
/// The node answered with a `TopicDeleteResponse`.
Reachable {
/// Copy of the `original` field of the `Authority` that was contacted.
host: String,
/// The response payload returned by the node.
response: TopicDeleteResponse,
},
/// The request failed with error code `ERR_NODE_UNREACHABLE`.
Unreachable {
/// Copy of the `original` field of the `Authority` that was contacted.
host: String,
},
/// The request failed with error code `ERR_UNAUTHORIZED`.
Unauthorized {
/// Copy of the `original` field of the `Authority` that was contacted.
host: String,
},
/// Any other failure: a different error code, an unexpected response kind,
/// or the request timing out.
Error {
/// Copy of the `original` field of the `Authority` that was contacted.
host: String,
/// The error describing what went wrong.
error: ActyxOSError,
},
}
async fn request(timeout: u8, mut conn: mpsc::Sender<Task>, authority: Authority, topic_name: String) -> DeleteOutput {
let host = authority.original.clone();
let response = tokio::time::timeout(Duration::from_secs(timeout.into()), async move {
let peer = connect(&mut conn, authority).await?;
request_single(
&mut conn,
move |tx| Task::Admin(peer, AdminRequest::TopicDelete { name: topic_name }, tx),
Ok,
)
.await
})
.await;
if let Ok(response) = response {
match response {
Ok(AdminResponse::TopicDeleteResponse(response)) => DeleteOutput::Reachable { host, response },
Ok(response) => DeleteOutput::Error {
host,
error: ActyxOSError::internal(format!("Unexpected response from node: {:?}", response)),
},
Err(error) if error.code() == ActyxOSCode::ERR_NODE_UNREACHABLE => DeleteOutput::Unreachable { host },
Err(error) if error.code() == ActyxOSCode::ERR_UNAUTHORIZED => DeleteOutput::Unauthorized { host },
Err(error) => DeleteOutput::Error { host, error },
}
} else {
DeleteOutput::Error {
host,
error: ActyxOSError::new(ax_core::util::formats::ActyxOSCode::ERR_NODE_UNREACHABLE, "timeout"),
}
}
}
async fn delete_run(opts: DeleteOpts) -> ActyxOSResult<Vec<DeleteOutput>> {
let identity = load_identity(&opts.identity)?;
let timeout = opts.timeout;
let (task, channel) = mk_swarm(identity).await?;
tokio::spawn(task);
Ok(join_all(
opts.authority
.into_iter()
.map(|a| request(timeout, channel.clone(), a, opts.topic.clone()))
.collect::<Vec<_>>(),
)
.await)
}
impl AxCliCommand for TopicsDelete {
type Opt = DeleteOpts;
type Output = Vec<DeleteOutput>;
fn run(
opts: Self::Opt,
) -> Box<dyn futures::Stream<Item = ax_core::util::formats::ActyxOSResult<Self::Output>> + Unpin> {
let requests = Box::pin(delete_run(opts));
Box::new(stream::once(requests))
}
fn pretty(result: Self::Output) -> String {
let mut table = Table::new();
table
.load_preset(UTF8_FULL_CONDENSED)
.set_header(["NODE ID", "HOST", "DELETED"]);
for output in result {
match output {
DeleteOutput::Reachable { host, response } => {
table.add_row([
Cell::new(response.node_id),
Cell::new(host),
Cell::new(if response.deleted { "Y" } else { "N" }),
]);
}
DeleteOutput::Unreachable { host } => {
table.add_row([Cell::new("AX was unreachable on host"), Cell::new(host)]);
}
DeleteOutput::Unauthorized { host } => {
table.add_row([Cell::new("Unauthorized on host"), Cell::new(host)]);
}
DeleteOutput::Error { host, error } => {
table.add_row([
Cell::new(format!("Received error \"{}\" from host", error)),
Cell::new(host),
]);
}
}
}
table.to_string()
}
}
// Command-line options for the topic-delete command.
// NOTE(review): plain `//` comments are used here on purpose. clap's derive turns
// `///` doc comments into help text, so adding them would change the CLI output.
#[derive(clap::Parser, Clone, Debug)]
pub struct DeleteOpts {
// Name of the topic to delete; sent to each node as `AdminRequest::TopicDelete { name }`.
#[arg(required = true)]
topic: String,
// Nodes to contact (declared under the name `NODE`, at least one required);
// one delete request is issued per entry.
#[arg(name = "NODE", required = true)]
authority: Vec<Authority>,
// Optional identity handed to `load_identity`; how `None` is resolved is decided
// there and is not visible in this file.
#[arg(short, long)]
identity: Option<String>,
// Time limit in seconds for each node's connect-and-request exchange (default "5").
#[arg(short, long, default_value = "5")]
timeout: u8,
}