use crate::command::{Executable, default_tracing};
use crate::{
LOCALHOST,
common::{connect_to_coordinator_rpc, rpc},
};
use dora_core::descriptor::DescriptorExt;
use dora_core::{descriptor::Descriptor, topics::DORA_COORDINATOR_PORT_CONTROL_DEFAULT};
use dora_message::{
cli_to_coordinator::CoordinatorControlClient, coordinator_to_cli::DataflowStatus, tarpc,
};
use eyre::{Context, bail};
use std::{
io::{IsTerminal, Write},
net::SocketAddr,
};
use std::{net::IpAddr, path::PathBuf};
use termcolor::{Color, ColorChoice, ColorSpec, WriteColor};
pub async fn check_environment(coordinator_addr: SocketAddr) -> eyre::Result<()> {
let mut error_occurred = false;
let color_choice = if std::io::stdout().is_terminal() {
ColorChoice::Auto
} else {
ColorChoice::Never
};
let mut stdout = termcolor::StandardStream::stdout(color_choice);
let client =
match connect_to_coordinator_rpc(coordinator_addr.ip(), coordinator_addr.port()).await {
Ok(client) => {
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green)));
write!(stdout, "✓ ")?;
let _ = stdout.reset();
writeln!(stdout, "Coordinator: Running")?;
writeln!(
stdout,
" Address: {}:{}",
coordinator_addr.ip(),
coordinator_addr.port()
)?;
Some(client)
}
Err(_) => {
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Red)));
write!(stdout, "✗ ")?;
let _ = stdout.reset();
writeln!(stdout, "Coordinator: Not running")?;
error_occurred = true;
None
}
};
let daemon_running_result = match client.as_ref() {
Some(c) => Some(daemon_running(c).await?),
None => None,
};
if daemon_running_result == Some(true) {
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green)));
write!(stdout, "✓ ")?;
let _ = stdout.reset();
writeln!(stdout, "Daemon: Running")?;
} else if client.is_some() {
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Red)));
write!(stdout, "✗ ")?;
let _ = stdout.reset();
writeln!(stdout, "Daemon: Not running")?;
error_occurred = true;
} else {
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Red)));
write!(stdout, "✗ ")?;
let _ = stdout.reset();
writeln!(stdout, "Daemon: Unknown (coordinator not available)")?;
error_occurred = true;
}
if let Some(ref c) = client {
if let Ok(count) = query_running_dataflow_count(c).await {
writeln!(stdout, "Active dataflows: {}", count)?;
}
}
if error_occurred {
bail!("System check failed.");
}
Ok(())
}
pub async fn daemon_running(client: &CoordinatorControlClient) -> Result<bool, eyre::ErrReport> {
rpc(
"check daemon connection",
client.daemon_connected(tarpc::context::current()),
)
.await
}
async fn query_running_dataflow_count(
client: &CoordinatorControlClient,
) -> Result<usize, eyre::ErrReport> {
let list = rpc("list dataflows", client.list(tarpc::context::current())).await?;
Ok(list
.0
.iter()
.filter(|d| d.status == DataflowStatus::Running)
.count())
}
#[derive(Debug, clap::Args)]
pub struct Status {
#[clap(long, value_name = "PATH", value_hint = clap::ValueHint::FilePath)]
dataflow: Option<PathBuf>,
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
}
impl Executable for Status {
async fn execute(self) -> eyre::Result<()> {
default_tracing()?;
match self.dataflow {
Some(dataflow) => {
let working_dir = dataflow
.canonicalize()
.context("failed to canonicalize dataflow path")?
.parent()
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
.to_owned();
Descriptor::blocking_read(&dataflow)?.check(&working_dir)?;
check_environment((self.coordinator_addr, self.coordinator_port).into()).await?
}
None => {
check_environment((self.coordinator_addr, self.coordinator_port).into()).await?
}
}
Ok(())
}
}