use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::{fs, io};
use app::configuration::{
ClusterConfig, Configuration, GlobalConfig, InternalConfig, Workspace, YozefuConfig,
};
use app::search::ValidSearchQuery;
use app::App;
use clap::error::ErrorKind;
use clap::{CommandFactory, Parser};
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
use itertools::Itertools;
use lib::Error;
use rdkafka::consumer::BaseConsumer;
use strum::{Display, EnumString};
use tracing::{debug, info, warn};
use tui::Theme;
use tui::error::TuiError;
use tui::{State, Ui};
use crate::headless::Headless;
use crate::headless::formatter::{
JsonFormatter, KafkaFormatter, PlainFormatter, SimpleFormatter, TransposeFormatter,
};
use crate::log::{init_logging_file, init_logging_stderr};
use crate::theme::update_themes;
use crate::{APPLICATION_NAME, Cli, Cluster, GlobalArgs};
fn parse_cluster<T>(s: &str) -> Result<T, Error>
where
T: Cluster,
{
s.parse()
.map_err(|e: <T as FromStr>::Err| Error::Error(e.to_string()))
}
// Command-line arguments of the main command, generic over the cluster type `T`.
//
// Plain `//` comments are used on purpose in this struct: clap's derive macro
// turns `///` doc comments into help text, which would change the CLI output.
#[derive(Parser, Clone)]
#[command(author, version, about, long_about = None, propagate_version = true)]
pub struct MainCommand<T>
where
T: Cluster,
{
// `-d` / `--debug`: forwarded to the logging initialisers
// (`init_logging_stderr` in headless mode, `init_logging_file` in TUI mode).
#[clap(short, long)]
pub debug: bool,
// `-c` / `--cluster` (aliases `-e` / `--environment`): parsed with `parse_cluster`.
// Its string form is the key looked up in the configured clusters; an empty
// string form is treated as "argument not provided" by `cluster_config`.
#[clap(short = 'c', short_alias='e', alias="environment", long, value_parser = parse_cluster::<T>, default_value_t, hide_default_value=true)]
cluster: T,
// NOTE(review): this `#[clap(long)]` looks redundant, `long` is set again in the
// attribute right below — confirm and remove.
#[clap(long)]
// `-t` / `--topics` (alias `--topic`): comma-separated list of topic names.
#[clap(
short,
long,
alias = "topic",
group = "topic",
use_value_delimiter = true,
value_delimiter = ','
)]
pub topics: Vec<String>,
// `-p` / `--properties`: `key=value` pairs applied to the configuration with
// `set_kafka_property` in `execute`; an entry without `=` is rejected.
#[clap(short, long)]
pub properties: Vec<String>,
// `--headless`: run `headless()` instead of the TUI.
#[clap(long)]
pub headless: bool,
// Positional words of the search query, joined with spaces by `query()`.
// `-` reads the query from stdin, `@path` reads it from a file.
query: Vec<String>,
// `--theme`: theme name; when absent the TUI uses the theme from the internal configuration.
#[clap(long)]
pub theme: Option<String>,
// `--format` (requires `--headless`): output formatter; `formatter()` falls back
// to the transpose formatter when absent.
#[clap(long, requires = "headless")]
pub format: Option<KafkaFormatterOption>,
// `--disable-progress` (requires `--headless`): keeps the progress bar hidden
// instead of drawing it on stderr.
#[clap(long, requires = "headless")]
pub disable_progress: bool,
// `--export` (requires `--headless`): passed as-is to `Headless::new`.
#[clap(long, requires = "headless")]
pub export: bool,
// `-o` / `--output`: not read anywhere in the visible part of this file;
// presumably a destination path for the results — TODO confirm.
#[clap(short, long)]
pub output: Option<PathBuf>,
// Arguments shared across commands; gives access to the workspace.
#[command(flatten)]
pub global: GlobalArgs,
// Never parsed from the command line (`skip`); not read in the visible part of
// this file — presumably filled in by the caller, TODO confirm.
#[clap(skip)]
pub(crate) logs_file: Option<PathBuf>,
}
/// Output formats accepted by the `--format` argument of the headless mode.
///
/// With `serialize_all = "lowercase"`, strum parses and displays each variant
/// as its lowercase name (`transpose`, `simple`, `plain`, `human`, `json`, `log`).
/// The formatter noted on each variant is the one chosen by
/// `MainCommand::formatter` in this file.
#[derive(Debug, Clone, EnumString, Display)]
#[strum(serialize_all = "lowercase")]
pub enum KafkaFormatterOption {
/// Uses `TransposeFormatter`; also the formatter used when no format is given.
Transpose,
/// Uses `SimpleFormatter`.
Simple,
/// Uses `PlainFormatter`.
Plain,
/// Uses `SimpleFormatter`, same as [`KafkaFormatterOption::Simple`].
Human,
/// Uses `JsonFormatter`.
Json,
/// Uses `PlainFormatter`, same as [`KafkaFormatterOption::Plain`].
Log,
}
impl<T> MainCommand<T>
where
T: Cluster,
{
pub async fn execute(self, mut yozefu_config: YozefuConfig) -> Result<(), TuiError> {
for property in &self.properties {
match property.split_once('=') {
Some((key, value)) => {
yozefu_config.set_kafka_property(key, value);
}
None => {
return Err(TuiError::from(Error::Error(format!(
"Invalid kafka property '{property}', expected a '=' symbol to separate the property and its value."
))));
}
}
}
match self.headless {
true => {
let _ = init_logging_stderr(self.debug);
self.headless(&yozefu_config)
.await
.map_err(std::convert::Into::into)
}
false => {
self.tui(&yozefu_config).await
}
}
}
pub(crate) fn cluster(&self) -> T {
self.cluster.clone()
}
/// Builds the [`YozefuConfig`] of the selected cluster.
///
/// # Errors
///
/// Returns an error when the cluster configuration cannot be resolved
/// (see `cluster_config`).
pub fn yozefu_config(&self) -> Result<YozefuConfig, Error> {
    let cluster_name = self.cluster.to_string();
    let config_of_cluster = self.cluster_config(&self.cluster)?;
    Ok(config_of_cluster.create(&cluster_name))
}
/// Resolves the search query to run.
///
/// The positional query words are joined with spaces and trimmed, then:
/// - an empty result yields `initial_query`,
/// - `-` reads a single line from stdin (returned as read, line ending included),
/// - `@path` returns the content of the file at `path`,
/// - anything else is returned as is.
///
/// # Errors
///
/// Returns an error when stdin or the query file cannot be read.
fn query(&self, initial_query: &str) -> Result<String, Error> {
    let joined = self.query.join(" ");
    let q = joined.trim();

    if q.is_empty() {
        return Ok(initial_query.to_string());
    }

    if q == "-" {
        info!("Reading query from stdin");
        let mut line = String::new();
        io::stdin().read_line(&mut line)?;
        return Ok(line);
    }

    let Some(path) = q.strip_prefix('@') else {
        return Ok(q.to_string());
    };
    let query_file = Path::new(path);
    fs::read_to_string(query_file).map_err(|e| {
        Error::Error(format!(
            "Cannot read search query from file {:?}: {}",
            query_file.display(),
            e
        ))
    })
}
/// Builds the [`Workspace`] used for this run.
///
/// The path and configuration come from the global arguments' workspace. The
/// log file is the one set in `yozefu_config` when present, otherwise the
/// workspace's own log file.
fn workspace(&self, yozefu_config: &YozefuConfig) -> Result<Workspace, Error> {
    let base = self.global.workspace();
    let config = base.config().clone();
    let configured_log_file = yozefu_config.log_file.clone();
    let fallback_log_file = base.log_file();
    let log_file = configured_log_file.unwrap_or(fallback_log_file);
    Ok(Workspace::new(&base.path, config, log_file))
}
/// Reads `file` and parses it as a JSON object mapping theme names to themes.
///
/// # Errors
///
/// Returns an error when the file cannot be read or is not valid JSON for a
/// `HashMap<String, Theme>`.
fn themes(file: &Path) -> Result<HashMap<String, Theme>, Error> {
    let raw = fs::read_to_string(file)?;
    serde_json::from_str::<HashMap<String, Theme>>(&raw).map_err(|e| {
        Error::Error(format!(
            "Error while parsing themes file '{}': {}",
            file.display(),
            e
        ))
    })
}
fn cluster_config(&self, cluster: &T) -> Result<ClusterConfig, TuiError> {
let config = self.read_config()?;
let available_clusters = config.clusters.keys().collect_vec().into_iter().join(", ");
match self.cluster().to_string().is_empty() {
true => {
let mut cmd = Cli::<T>::command();
cmd.error(
ErrorKind::MissingRequiredArgument,
format!(
"Argument '--cluster' was not provided. Possible clusters: [{available_clusters}]"
),
)
.exit();
}
false => {
if !config.clusters.contains_key(&cluster.to_string()) {
return Err(Error::Error(format!(
"Unknown cluster '{cluster}'. Possible clusters: [{available_clusters}]."
))
.into());
}
}
}
Ok(config.clusters.get(&cluster.to_string()).unwrap().clone())
}
/// Reads the global configuration from the workspace's configuration file.
///
/// # Errors
///
/// Returns the error reported by `GlobalConfig::read`, unchanged.
fn read_config(&self) -> Result<GlobalConfig, Error> {
    let workspace = self.global.workspace();
    GlobalConfig::read(workspace.config_file().as_path())
}
async fn load_theme(workspace: &Workspace, name: &str) -> Result<Theme, Error> {
let file = &workspace.themes_file();
let mut themes = Self::themes(file)?;
if !themes.contains_key(name) {
info!("Theme '{name}' not found. About to update theme file.");
let _ = update_themes(workspace).await;
themes = Self::themes(file)?;
}
let theme = match themes.get(name) {
Some(theme) => theme,
None => {
update_themes(workspace).await?;
warn!(
"Theme '{}' not found. Available themes are [{}]. Make sure it is defined in '{}'",
name,
themes.keys().join(", "),
file.display()
);
let theme = themes.iter().next().unwrap().1;
info!(
"Since the theme was not found, I'm going to use the first available theme '{}'",
theme.name
);
theme
}
};
Ok(theme.clone())
}
/// Combines a copy of `yozefu_config` with the resolved workspace into an
/// [`InternalConfig`].
///
/// # Errors
///
/// Returns the error from `workspace` when the workspace cannot be built.
fn internal_config(&self, yozefu_config: &YozefuConfig) -> Result<InternalConfig, Error> {
    self.workspace(yozefu_config)
        .map(|workspace| InternalConfig::new(yozefu_config.clone(), workspace))
}
/// Starts the terminal user interface.
///
/// Resolves the query, sets up file logging (a logging failure is ignored),
/// loads the theme given with `--theme` or the configured one, builds the UI,
/// then creates a Kafka consumer as a sanity check before running the UI.
///
/// # Errors
///
/// Returns an error when the configuration, query, theme, application or
/// Kafka consumer cannot be set up, or when the UI itself fails.
async fn tui(&self, yozefu_config: &YozefuConfig) -> Result<(), TuiError> {
    let cluster = self.cluster();
    let internal_config = self.internal_config(yozefu_config)?;
    let query = self.query(internal_config.initial_query())?;

    let _ = init_logging_file(self.debug, &internal_config.workspace().log_file());

    let requested_theme = self.theme.clone();
    let configured_theme = internal_config.theme().to_string();
    let theme_name = requested_theme.unwrap_or(configured_theme);
    let color_palette = Self::load_theme(internal_config.workspace(), &theme_name).await?;

    let state = State::new(&cluster.to_string(), color_palette, &internal_config);
    // `internal_config` is moved into the application from here on.
    let application = self.app(&query, internal_config)?;
    let mut ui = Ui::new(application, &query, self.topics.clone(), state.clone());

    Self::check_connection(yozefu_config)?;
    ui.run(self.topics.clone(), state).await
}
/// Creates a throwaway [`BaseConsumer`] from `yozefu_config` and discards it.
///
/// # Errors
///
/// Returns the error raised while creating the consumer.
fn check_connection(yozefu_config: &YozefuConfig) -> Result<(), Error> {
    // Only the success of the creation matters, the consumer itself is not used.
    drop(yozefu_config.create_kafka_consumer::<BaseConsumer>()?);
    Ok(())
}
/// Builds the [`App`] for the selected cluster.
///
/// The query is validated against the filters directory of the workspace
/// before the application is created.
///
/// # Errors
///
/// Returns an error when `query` is not a valid search query.
fn app(&self, query: &str, config: InternalConfig) -> Result<App, Error> {
    debug!("{config:?}");
    let filters_dir = config.workspace().filters_dir();
    let search_query = ValidSearchQuery::from(query, &filters_dir)?;
    let cluster_name = self.cluster().to_string();
    Ok(App::new(cluster_name, config, search_query))
}
async fn headless(&self, yozefu_config: &YozefuConfig) -> Result<(), Error> {
let internal_config = self.internal_config(yozefu_config)?;
let query = self.query(internal_config.initial_query())?;
let progress = ProgressBar::new(0);
let date = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
progress.set_draw_target(ProgressDrawTarget::hidden());
progress.set_style(
ProgressStyle::with_template(&format!(
"[{date} {{msg:.green}} headless] {{pos}} records read {{per_sec}}"
))
.map_err(|e| Error::Error(e.to_string()))?,
);
progress.set_message("INFO");
let topics = self.topics(yozefu_config)?;
if !self.disable_progress {
progress.set_draw_target(ProgressDrawTarget::stderr());
}
let app = Headless::new(
self.app(&query, internal_config)?,
&topics,
self.formatter(),
self.export,
progress,
);
self.print_full_command(&self.cluster().to_string(), &topics, &query);
app.run().await?;
self.print_full_command(&self.cluster().to_string(), &topics, &query);
Ok(())
}
/// Logs the equivalent full headless command line.
///
/// Nothing is logged when topics were given on the command line. The binary
/// name is the file name of the current executable, converted lossily when it
/// is not valid UTF-8; `APPLICATION_NAME` is used when the executable path is
/// unavailable or has no file name (both cases previously panicked or were
/// only partly handled).
fn print_full_command(&self, cluster: &str, topics: &[String], query: &str) {
    if !self.topics.is_empty() {
        return;
    }
    let binary = std::env::current_exe()
        .ok()
        .and_then(|exe| {
            exe.file_name()
                .map(|name| name.to_string_lossy().into_owned())
        })
        .unwrap_or_else(|| APPLICATION_NAME.to_string());
    info!(
        "Executed command: {} -c {} --headless --topics {} '{}'",
        binary,
        cluster,
        topics.join(","),
        query
    );
}
/// Returns the topics given on the command line.
///
/// When none was given, the topics of the cluster are listed on stdout (at
/// most 20, followed by the count of the remaining ones) and the process
/// exits with status 1.
///
/// # Errors
///
/// Returns an error when the topics cannot be listed from the cluster.
fn topics(&self, yozefu_config: &YozefuConfig) -> Result<Vec<String>, Error> {
    const MAX_LISTED: usize = 20;

    if self.topics.is_empty() {
        let items = App::list_topics_from_client(yozefu_config)?;
        let listed = items.iter().take(MAX_LISTED).join("\n ");
        println!("Select topics to consume:\n {}", listed);
        let not_listed = items.len().saturating_sub(MAX_LISTED);
        if not_listed > 0 {
            println!("... and {} more", not_listed);
        }
        std::process::exit(1);
    }

    Ok(self.topics.clone())
}
/// Returns the formatter matching `--format`.
///
/// `human` shares the formatter of `simple`, `log` shares the one of `plain`,
/// and the transpose formatter is used when no format was given.
fn formatter(&self) -> Box<dyn KafkaFormatter> {
    match self.format.as_ref() {
        None | Some(KafkaFormatterOption::Transpose) => Box::new(TransposeFormatter::new()),
        Some(KafkaFormatterOption::Simple | KafkaFormatterOption::Human) => {
            Box::new(SimpleFormatter::new())
        }
        Some(KafkaFormatterOption::Plain | KafkaFormatterOption::Log) => {
            Box::new(PlainFormatter::new())
        }
        Some(KafkaFormatterOption::Json) => Box::new(JsonFormatter::new()),
    }
}
}