use super::{Executable, default_tracing};
use crate::common::{
connect_and_check_version, handle_dataflow_result, long_context, query_running_dataflows, rpc,
};
use dora_core::topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST};
use dora_message::{
cli_to_coordinator::CoordinatorControlClient, coordinator_to_cli::StopDataflowReply,
};
use duration_str::parse;
use eyre::Context;
use std::net::IpAddr;
use std::time::Duration;
use uuid::Uuid;
/// Stop a running dataflow, selected by UUID, by name, or interactively.
///
/// When neither a UUID nor a name is given, the running dataflows are queried
/// from the coordinator and the user is prompted to pick one.
#[derive(Debug, clap::Args)]
pub struct Stop {
    /// UUID of the dataflow that should be stopped
    uuid: Option<Uuid>,
    /// Name of the dataflow that should be stopped (ignored if a UUID is given)
    #[clap(long)]
    name: Option<String>,
    /// Grace duration sent to the coordinator with the stop request, e.g. `500ms` or `10s`.
    /// Mutually exclusive with `--force`.
    // Both strategy flags share the `strategy` argument group, so clap accepts
    // at most one of them. The value is parsed by `duration_str::parse`.
    #[clap(
        long,
        value_name = "DURATION",
        group = "strategy",
        verbatim_doc_comment
    )]
    #[arg(value_parser = parse)]
    grace_duration: Option<Duration>,
    /// Request a forced stop from the coordinator. Mutually exclusive with `--grace-duration`.
    #[clap(short, long, action, group = "strategy")]
    force: bool,
    /// IP address of the dora coordinator to connect to
    #[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
    coordinator_addr: IpAddr,
    /// Control port of the dora coordinator to connect to
    #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
    coordinator_port: u16,
}
impl Executable for Stop {
    /// Runs the `stop` command.
    ///
    /// Sets up default tracing, connects to the coordinator at the configured
    /// address and port, and then stops a dataflow chosen as follows:
    ///
    /// 1. by UUID, if one was given (a name given alongside it is ignored),
    /// 2. otherwise by name, if one was given,
    /// 3. otherwise through an interactive selection prompt.
    ///
    /// # Errors
    ///
    /// Returns an error if tracing setup fails, if the coordinator connection
    /// cannot be established, or if the selected stop routine fails.
    async fn execute(self) -> eyre::Result<()> {
        default_tracing()?;

        let Self {
            uuid,
            name,
            grace_duration,
            force,
            coordinator_addr,
            coordinator_port,
        } = self;

        let client = connect_and_check_version(coordinator_addr, coordinator_port)
            .await
            .wrap_err("could not connect to dora coordinator")?;

        // An explicit UUID always wins over a name.
        if let Some(uuid) = uuid {
            return stop_dataflow(uuid, grace_duration, force, &client).await;
        }

        match name {
            Some(name) => stop_dataflow_by_name(name, grace_duration, force, &client).await,
            None => stop_dataflow_interactive(grace_duration, force, &client).await,
        }
    }
}
/// Lets the user pick one of the active dataflows and stops it.
///
/// Queries the running dataflows through `client`. If none are active, a
/// notice is printed to stderr and the function returns successfully without
/// stopping anything. Otherwise an interactive selection prompt is shown and
/// the chosen dataflow is stopped by its UUID, forwarding `grace_duration`
/// and `force` unchanged.
///
/// # Errors
///
/// Returns an error if the query fails, if the prompt fails, or if stopping
/// the chosen dataflow fails.
async fn stop_dataflow_interactive(
    grace_duration: Option<Duration>,
    force: bool,
    client: &CoordinatorControlClient,
) -> eyre::Result<()> {
    let running = query_running_dataflows(client)
        .await
        .wrap_err("failed to query running dataflows")?;

    let candidates = running.get_active();
    if candidates.is_empty() {
        eprintln!("No dataflows are running");
        return Ok(());
    }

    let chosen = inquire::Select::new("Choose dataflow to stop:", candidates).prompt()?;
    stop_dataflow(chosen.uuid, grace_duration, force, client).await
}
/// Asks the coordinator to stop the dataflow identified by `uuid`.
///
/// Sends a stop request carrying `grace_duration` and `force` through the
/// `rpc` helper, then hands the reply's result to `handle_dataflow_result`
/// together with the UUID reported in the reply.
///
/// # Errors
///
/// Returns an error if the RPC fails or if `handle_dataflow_result` reports
/// a failure.
async fn stop_dataflow(
    uuid: Uuid,
    grace_duration: Option<Duration>,
    force: bool,
    client: &CoordinatorControlClient,
) -> Result<(), eyre::ErrReport> {
    let request = client.stop(long_context(), uuid, grace_duration, force);
    let StopDataflowReply {
        uuid: stopped_uuid,
        result: outcome,
    } = rpc("stop dataflow", request).await?;

    handle_dataflow_result(outcome, Some(stopped_uuid))
}
/// Asks the coordinator to stop the dataflow identified by `name`.
///
/// Sends a stop-by-name request carrying `grace_duration` and `force` through
/// the `rpc` helper, then hands the reply's result to `handle_dataflow_result`
/// together with the UUID reported in the reply.
///
/// # Errors
///
/// Returns an error if the RPC fails or if `handle_dataflow_result` reports
/// a failure.
async fn stop_dataflow_by_name(
    name: String,
    grace_duration: Option<Duration>,
    force: bool,
    client: &CoordinatorControlClient,
) -> Result<(), eyre::ErrReport> {
    let request = client.stop_by_name(long_context(), name, grace_duration, force);
    let StopDataflowReply {
        uuid: stopped_uuid,
        result: outcome,
    } = rpc("stop dataflow by name", request).await?;

    handle_dataflow_result(outcome, Some(stopped_uuid))
}