#![allow(clippy::upper_case_acronyms)]
#[macro_use]
extern crate failure;
#[macro_use]
extern crate prettytable;
extern crate base64;
extern crate exitfailure;
use clap::{Parser, Subcommand};
use git_version::git_version;
use prettytable::Table;
use rand::seq::SliceRandom;
use std::collections::HashSet;
use std::error::Error;
use std::fs::read_to_string;
use std::sync::Arc;
use uuid::Uuid;
use zenoh::prelude::r#async::*;
use zenoh_flow::runtime::resources::DataStore;
use zenoh_flow::runtime::DaemonInterfaceClient;
const GIT_VERSION: &str = git_version!(prefix = "v", cargo_prefix = "v");
const DEFAULT_ZENOH_CFG: &str = "/etc/zenoh-flow/zfctl-zenoh.json";
const ENV_ZENOH_CFG: &str = "ZFCTL_CFG";
#[derive(Subcommand, Debug)]
#[clap(about = "Creates new entities in Zenoh Flow")]
pub enum CreateKind {
#[clap(about = "Adds a new flow into the Zenoh Flow registry")]
Flow {
#[clap(
name = "Flow descriptor path",
help = "Upload the flow in the registry (Unimplemented)"
)]
descriptor_path: std::path::PathBuf,
},
#[clap(about = "Creates a new instance for the given flow")]
Instance {
#[clap(
name = "Flow descriptor path",
help = "Creates a new instance for the given flow"
)]
descriptor_path: std::path::PathBuf,
},
}
#[derive(Subcommand, Debug)]
#[clap(about = "Starts entities in Zenoh Flow")]
pub enum StartKind {
#[clap(about = "Starts the given node in the given instance")]
Node {
#[clap(
short,
long,
name = "instance uuid",
help = "The instance containing the node"
)]
instance_id: Uuid,
#[clap(short, long, name = "node id", help = "The node identifier")]
node_id: String,
},
#[clap(
about = "Start a replay for the given source, in the given instance, using the given key_expr for retrieving the data"
)]
#[clap(about = "Starts the given flow instance")]
Instance {
#[clap(name = "instance uuid", help = "The instance to be started")]
instance_id: Uuid,
},
}
#[derive(Subcommand, Debug)]
#[clap(about = "Stop entities in Zenoh Flow")]
pub enum StopKind {
#[clap(about = "Stops the given node in the given instance")]
Node {
#[clap(
short,
long,
name = "instance uuid",
help = "The instance containing the node"
)]
instance_id: Uuid,
#[clap(short, long, name = "node id", help = "The node identifier")]
node_id: String,
},
#[clap(about = "Stops the given replay, for the given source in the given instance")]
#[clap(about = "Stops the given flow instance")]
Instance {
#[clap(name = "instance uuid", help = "The instance to be stopped")]
instance_id: Uuid,
},
}
#[derive(Subcommand, Debug)]
#[clap(about = "Gets information about entities from Zenoh Flow")]
pub enum GetKind {
#[clap(about = "Gets information about the given flow")]
Flow {
#[clap(name = "flow identifier", help = "The flow you are interested in")]
id: String,
},
#[clap(about = "Gets information about the given instance")]
Instance {
#[clap(name = "instance uuid", help = "The instance you are interested in")]
id: Uuid,
},
#[clap(about = "Gets information about the given runtime")]
Runtime {
#[clap(name = "runtime uuid", help = "The runtime you are interested in")]
id: Uuid,
},
}
#[derive(Subcommand, Debug)]
#[clap(about = "Lists entities in Zenoh Flow")]
pub enum ListKind {
#[clap(about = "Lists the exiting flows")]
Flows,
#[clap(about = "Lists the exiting instances")]
Instances,
#[clap(about = "Lists the runtimes")]
Runtimes,
}
#[derive(Subcommand, Debug)]
#[clap(about = "Deletes entities from Zenoh Flow")]
pub enum DeleteKind {
#[clap(about = "Deletes the given flow")]
Flow {
#[clap(short, long, name = "flow identifier", help = "The flow to be deleted")]
id: String,
},
#[clap(about = "Deletes the given instance")]
Instance {
#[clap(name = "instance uuid", help = "The instance to be deleted")]
id: Uuid,
},
}
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
pub enum ZFCtl {
#[clap(subcommand)]
Create(CreateKind),
#[clap(subcommand)]
Delete(DeleteKind),
#[clap(subcommand)]
Get(GetKind),
#[clap(subcommand)]
List(ListKind),
#[clap(subcommand)]
Start(StartKind),
#[clap(subcommand)]
Stop(StopKind),
#[clap(about = "Creates and starts a flow instance")]
Launch {
#[clap(name = "Flow descriptor path", help = "Flow to be started")]
descriptor_path: std::path::PathBuf,
},
#[clap(about = "Stops and deletes a flow instance")]
Destroy {
#[clap(name = "instance uuid", help = "The instance to be destroyed")]
id: Uuid,
},
}
#[async_std::main]
async fn main() {
env_logger::try_init().unwrap_or_else(|_| log::warn!("`env_logger` already initialized"));
log::debug!("Eclipse Zenoh-Flow Ctl {}", GIT_VERSION);
let args = ZFCtl::parse();
log::debug!("Args: {:?}", args);
let zsession = Arc::new(get_zenoh().await.unwrap());
let store = DataStore::new(zsession.clone());
match args {
ZFCtl::Create(ak) => match ak {
CreateKind::Flow { descriptor_path } => {
println!(
"This is going to store the flow described in {:?}",
descriptor_path
);
}
CreateKind::Instance { descriptor_path } => {
log::trace!(
"This is going to store the flow described in {:?}",
descriptor_path
);
let yaml_df = read_to_string(descriptor_path).unwrap();
let df =
zenoh_flow::model::descriptor::DataFlowDescriptor::from_yaml(&yaml_df).unwrap();
let df = df.flatten().await.unwrap();
df.validate().unwrap();
let client = get_client(zsession.clone()).await;
let instance_uuid = client.create_instance(df).await.unwrap().unwrap();
log::debug!("Created: {:?}", instance_uuid);
println!("{}", instance_uuid);
} },
ZFCtl::Get(gk) => match gk {
GetKind::Flow { id } => {
println!("This is going to get information for the flow {:?}", id);
}
GetKind::Instance { id } => {
log::debug!("This is going to get information for the instance {:?}", id);
let mut table = Table::new();
let instance = store.get_flow_by_instance(&id).await.unwrap();
table.add_row(row![
"UUID",
"Flow",
"Operators",
"Sinks",
"Sources",
"Connectors",
"Links",
]);
table.add_row(row![
instance.uuid,
instance.flow,
instance
.operators
.values()
.map(|o| format!("{}", o))
.collect::<Vec<String>>()
.join("\n"),
instance
.sinks
.values()
.map(|o| format!("{}", o))
.collect::<Vec<String>>()
.join("\n"),
instance
.sources
.values()
.map(|o| format!("{}", o))
.collect::<Vec<String>>()
.join("\n"),
instance
.connectors
.values()
.map(|o| format!("{}", o))
.collect::<Vec<String>>()
.join("\n"),
instance
.links
.iter()
.map(|o| format!("{}", o))
.collect::<Vec<String>>()
.join("\n")
]);
table.printstd();
}
GetKind::Runtime { id } => {
let mut table = Table::new();
let runtime_info = store.get_runtime_info(&id).await.unwrap();
let runtime_status = store.get_runtime_status(&id).await.unwrap();
table.add_row(row![
"UUID",
"Name",
"Status",
"Running Flows",
"Running Operators",
"Running Sources",
"Running Sinks",
"Running Connectors"
]);
table.add_row(row![
runtime_status.id,
runtime_info.name,
format!("{:?}", runtime_info.status),
runtime_status.running_flows,
runtime_status.running_operators,
runtime_status.running_sources,
runtime_status.running_sinks,
runtime_status.running_connectors,
]);
table.printstd();
}
},
ZFCtl::Delete(dk) => match dk {
DeleteKind::Flow { id } => {
println!("This is going to delete the flow {:?}", id);
}
DeleteKind::Instance { id } => {
log::debug!("This is going to delete the instance {:?}", id);
let client = get_client(zsession.clone()).await;
let record = client.delete_instance(id).await.unwrap().unwrap();
log::debug!("Deleted: {:?}", record);
println!("{}", record.uuid);
}
},
ZFCtl::Start(sk) => match sk {
StartKind::Node {
instance_id,
node_id,
} => {
let mut table = Table::new();
let client = get_client(zsession.clone()).await;
table.add_row(row!["UUID", "Name", "Status",]);
client
.start_node(instance_id, node_id.clone())
.await
.unwrap()
.unwrap();
table.add_row(row![instance_id, node_id, String::from("Running"),]);
table.printstd();
}
StartKind::Instance { instance_id } => {
log::debug!("This is going to start the instance {:?}", instance_id);
let client = get_client(zsession.clone()).await;
client.start_instance(instance_id).await.unwrap().unwrap();
log::debug!("Started: {:?}", instance_id);
println!("{}", instance_id);
}
},
ZFCtl::Stop(sk) => match sk {
StopKind::Node {
instance_id,
node_id,
} => {
let mut table = Table::new();
let client = get_client(zsession.clone()).await;
table.add_row(row!["UUID", "Name", "Status",]);
client
.stop_node(instance_id, node_id.clone())
.await
.unwrap()
.unwrap();
table.add_row(row![instance_id, node_id, String::from("Stopped"),]);
table.printstd();
}
StopKind::Instance { instance_id } => {
log::debug!("This is going to stop the instance {:?}", instance_id);
let client = get_client(zsession.clone()).await;
let record = client.stop_instance(instance_id).await.unwrap().unwrap();
log::debug!("stopeed: {:?}", record);
println!("{}", record.uuid);
}
},
ZFCtl::List(lk) => {
let mut table = Table::new();
match lk {
ListKind::Flows => {
panic!("Unimlemented")
}
ListKind::Instances => {
let instances = store.get_all_instances().await.unwrap();
table.add_row(row![
"UUID",
"Flow",
"# Operators",
"# Sinks",
"# Sources",
"# Connectors",
"# Links",
]);
let instances: HashSet<_> = instances.iter().collect();
for instance in instances {
table.add_row(row![
instance.uuid,
instance.flow,
instance.operators.len(),
instance.sinks.len(),
instance.sources.len(),
instance.connectors.len(),
instance.links.len(),
]);
}
}
ListKind::Runtimes => {
table.add_row(row!["UUID", "Name", "Status",]);
let runtimes = store.get_all_runtime_info().await.unwrap();
for r in runtimes {
table.add_row(row![r.id, r.name, format!("{:?}", r.status),]);
}
}
};
table.printstd();
}
ZFCtl::Launch { descriptor_path } => {
log::debug!(
"This is going to launch the flow described in {:?}",
descriptor_path
);
let yaml_df = read_to_string(descriptor_path).unwrap();
let df =
zenoh_flow::model::descriptor::DataFlowDescriptor::from_yaml(&yaml_df).unwrap();
let df = df.flatten().await.unwrap();
df.validate().unwrap();
let client = get_client(zsession.clone()).await;
let instance_uuid = client.instantiate(df).await.unwrap().unwrap();
log::debug!("Launched: {:?}", instance_uuid);
println!("{}", instance_uuid);
}
ZFCtl::Destroy { id } => {
log::debug!("This is going to destroy the instance {}", id);
let client = get_client(zsession.clone()).await;
let record = client.teardown(id).await.unwrap().unwrap();
log::debug!("Destroyed: {:?}", record);
println!("{}", record.uuid);
}
}
}
async fn get_zenoh() -> Result<Session, Box<dyn Error + Send + Sync + 'static>> {
let z_config_file = std::env::var(ENV_ZENOH_CFG).ok().unwrap_or_else(|| {
let mut config_path = dirs::home_dir().expect("Could not get $HOME directory, aborting.");
config_path.push(DEFAULT_ZENOH_CFG);
config_path
.into_os_string()
.into_string()
.expect("Invalid unicode data found while trying to get `zftcl-zenoh.json`")
});
let zconfig = zenoh::config::Config::from_file(z_config_file)?;
Ok(zenoh::open(zconfig).res().await.unwrap())
}
async fn get_client(zsession: Arc<Session>) -> DaemonInterfaceClient {
let servers = DaemonInterfaceClient::find_servers(zsession.clone())
.await
.unwrap();
let entry_point = servers.choose(&mut rand::thread_rng()).unwrap();
log::debug!("Selected entrypoint runtime: {:?}", entry_point);
DaemonInterfaceClient::new(zsession.clone(), *entry_point)
}