use std::{collections::BTreeMap, path::PathBuf};
use anyhow::{Result, anyhow};
use clap::{Args, Subcommand, ValueEnum};
use eventdbx::config::{
CsvPluginConfig, HttpPluginConfig, JsonPluginConfig, LogPluginConfig, PluginConfig,
PluginDefinition, PluginKind, PostgresColumnConfig, PostgresPluginConfig, SqlitePluginConfig,
TcpPluginConfig, load_or_default,
};
// Subcommands of the plugin CLI: one per plugin that can be configured, plus
// `map` for recording field data types. This note is a plain comment rather
// than a doc comment because clap's derive can pick up item-level doc comments
// as `about` text; only the per-variant docs below are meant to reach `--help`.
#[derive(Subcommand)]
pub enum PluginCommands {
    /// Configure the Postgres plugin
    #[command(name = "postgres")]
    PostgresConfigure(PluginPostgresConfigureArgs),
    /// Configure the SQLite plugin
    #[command(name = "sqlite")]
    SqliteConfigure(PluginSqliteConfigureArgs),
    /// Configure the CSV plugin
    #[command(name = "csv")]
    CsvConfigure(PluginCsvConfigureArgs),
    /// Configure the TCP plugin
    #[command(name = "tcp")]
    TcpConfigure(PluginTcpConfigureArgs),
    /// Configure the HTTP plugin
    #[command(name = "http")]
    HttpConfigure(PluginHttpConfigureArgs),
    /// Configure the JSON plugin
    #[command(name = "json")]
    JsonConfigure(PluginJsonConfigureArgs),
    /// Configure the log plugin
    #[command(name = "log")]
    LogConfigure(PluginLogConfigureArgs),
    /// Record the data type of an aggregate field
    #[command(name = "map")]
    Map(PluginMapArgs),
}
/// Configure the Postgres plugin
#[derive(Args)]
pub struct PluginPostgresConfigureArgs {
    /// Connection string stored for the Postgres plugin
    #[arg(long = "connection")]
    pub connection: String,
    /// Save the configuration with the plugin disabled
    #[arg(long, default_value_t = false)]
    pub disable: bool,
}
/// Configure the SQLite plugin
#[derive(Args)]
pub struct PluginSqliteConfigureArgs {
    /// Filesystem path stored for the SQLite plugin
    #[arg(long)]
    pub path: PathBuf,
    /// Save the configuration with the plugin disabled
    #[arg(long, default_value_t = false)]
    pub disable: bool,
}
/// Configure the CSV plugin
#[derive(Args)]
pub struct PluginCsvConfigureArgs {
    /// Output directory stored for the CSV plugin
    #[arg(long)]
    pub output_dir: PathBuf,
    /// Save the configuration with the plugin disabled
    #[arg(long, default_value_t = false)]
    pub disable: bool,
}
/// Configure the TCP plugin
#[derive(Args)]
pub struct PluginTcpConfigureArgs {
    /// Host stored for the TCP plugin
    #[arg(long)]
    pub host: String,
    /// Port stored for the TCP plugin
    #[arg(long)]
    pub port: u16,
    /// Save the configuration with the plugin disabled
    #[arg(long, default_value_t = false)]
    pub disable: bool,
}
/// Configure the HTTP plugin
#[derive(Args)]
pub struct PluginHttpConfigureArgs {
    /// Endpoint stored for the HTTP plugin
    #[arg(long)]
    pub endpoint: String,
    /// Header as KEY=VALUE; repeat the flag to supply several
    #[arg(long = "header", value_parser = parse_key_value, value_name = "KEY=VALUE")]
    pub headers: Vec<KeyValue>,
    /// Save the configuration with the plugin disabled
    #[arg(long, default_value_t = false)]
    pub disable: bool,
}
/// Configure the JSON plugin
#[derive(Args)]
pub struct PluginJsonConfigureArgs {
    /// Filesystem path stored for the JSON plugin
    #[arg(long)]
    pub path: PathBuf,
    /// Turn on the JSON plugin's `pretty` setting
    #[arg(long, default_value_t = false)]
    pub pretty: bool,
    /// Save the configuration with the plugin disabled
    #[arg(long, default_value_t = false)]
    pub disable: bool,
}
/// Configure the log plugin
#[derive(Args)]
pub struct PluginLogConfigureArgs {
    /// Level stored for the log plugin
    #[arg(long, default_value = "info")]
    pub level: String,
    /// Optional template stored for the log plugin
    #[arg(long)]
    pub template: Option<String>,
    /// Save the configuration with the plugin disabled
    #[arg(long, default_value_t = false)]
    pub disable: bool,
}
/// A single `KEY=VALUE` pair supplied on the command line.
///
/// Both halves are owned strings. `PartialEq`/`Eq` are derived so parsed pairs
/// can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeyValue {
    /// Text before the `=` separator.
    pub key: String,
    /// Text after the `=` separator.
    pub value: String,
}
/// Record the data type of an aggregate field
#[derive(Args)]
pub struct PluginMapArgs {
    /// Plugin whose field mapping is updated (only `postgres` accepts mappings); omit to set the base column type
    #[arg(long, value_enum)]
    pub plugin: Option<PluginTarget>,
    /// Aggregate that owns the field
    #[arg(long)]
    pub aggregate: String,
    /// Field whose data type is being recorded
    #[arg(long)]
    pub field: String,
    /// Data type to record for the field
    #[arg(long = "datatype")]
    pub data_type: String,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
#[clap(rename_all = "lowercase")]
pub enum PluginTarget {
Postgres,
Sqlite,
Csv,
Tcp,
Http,
Json,
Log,
}
/// Converts the CLI-facing plugin selector into the configuration's
/// [`PluginKind`]; every selector maps to the kind with the same name.
impl From<PluginTarget> for PluginKind {
    fn from(target: PluginTarget) -> Self {
        // Deliberately exhaustive (no wildcard) so a new selector fails to
        // compile until it is given a mapping here.
        match target {
            PluginTarget::Postgres => Self::Postgres,
            PluginTarget::Sqlite => Self::Sqlite,
            PluginTarget::Csv => Self::Csv,
            PluginTarget::Tcp => Self::Tcp,
            PluginTarget::Http => Self::Http,
            PluginTarget::Json => Self::Json,
            PluginTarget::Log => Self::Log,
        }
    }
}
/// Applies one plugin subcommand to the configuration and saves the result.
///
/// The configuration is loaded with [`load_or_default`] from `config_path`.
/// Each configure subcommand builds a fresh [`PluginDefinition`] from its
/// arguments, passes it to `set_plugin`, saves, and prints whether the plugin
/// was configured or disabled. `map` records a data type for an aggregate
/// field: as a base column type when no `--plugin` is given, or in the Postgres
/// plugin's field mappings when `--plugin postgres` is given.
///
/// # Errors
///
/// Returns an error when loading, `ensure_data_dir`, or saving fails; when
/// `map` targets Postgres before that plugin has been configured; or when `map`
/// targets any plugin other than Postgres.
pub fn execute(config_path: Option<PathBuf>, command: PluginCommands) -> Result<()> {
    let (mut config, path) = load_or_default(config_path)?;

    // Every configure subcommand boils down to a display label, its `--disable`
    // flag, and the plugin settings. The store/save/report steps they all share
    // run once after the match; `map` handles itself and returns early.
    let (label, disable, plugin_config) = match command {
        PluginCommands::PostgresConfigure(args) => {
            // Carry over mappings recorded earlier via `map` so reconfiguring
            // the connection does not discard them.
            let field_mappings = config
                .plugins
                .iter()
                .find_map(|def| match &def.config {
                    PluginConfig::Postgres(settings) => Some(settings.field_mappings.clone()),
                    _ => None,
                })
                .unwrap_or_default();
            (
                "Postgres",
                args.disable,
                PluginConfig::Postgres(PostgresPluginConfig {
                    connection_string: args.connection,
                    field_mappings,
                }),
            )
        }
        PluginCommands::SqliteConfigure(args) => (
            "SQLite",
            args.disable,
            PluginConfig::Sqlite(SqlitePluginConfig { path: args.path }),
        ),
        PluginCommands::CsvConfigure(args) => (
            "CSV",
            args.disable,
            PluginConfig::Csv(CsvPluginConfig {
                output_dir: args.output_dir,
            }),
        ),
        PluginCommands::TcpConfigure(args) => (
            "TCP",
            args.disable,
            PluginConfig::Tcp(TcpPluginConfig {
                host: args.host,
                port: args.port,
            }),
        ),
        PluginCommands::HttpConfigure(args) => {
            // Inserting one by one means a repeated header key keeps the value
            // given last on the command line.
            let mut headers = BTreeMap::new();
            for entry in args.headers {
                headers.insert(entry.key, entry.value);
            }
            (
                "HTTP",
                args.disable,
                PluginConfig::Http(HttpPluginConfig {
                    endpoint: args.endpoint,
                    headers,
                }),
            )
        }
        PluginCommands::JsonConfigure(args) => (
            "JSON",
            args.disable,
            PluginConfig::Json(JsonPluginConfig {
                path: args.path,
                pretty: args.pretty,
            }),
        ),
        PluginCommands::LogConfigure(args) => (
            "Log",
            args.disable,
            PluginConfig::Log(LogPluginConfig {
                level: args.level,
                template: args.template,
            }),
        ),
        PluginCommands::Map(args) => {
            match args.plugin.map(PluginKind::from) {
                // No plugin selected: record the type on the base column.
                None => {
                    config.set_column_type(&args.aggregate, &args.field, args.data_type.clone());
                    config.ensure_data_dir()?;
                    config.save(&path)?;
                    println!(
                        "Mapped base {}.{} as {}",
                        args.aggregate, args.field, args.data_type
                    );
                }
                Some(PluginKind::Postgres) => {
                    // Locate the Postgres settings directly; mapping requires
                    // the plugin to have been configured first.
                    let settings = config
                        .plugins
                        .iter_mut()
                        .find_map(|def| match &mut def.config {
                            PluginConfig::Postgres(settings) => Some(settings),
                            _ => None,
                        })
                        .ok_or_else(|| {
                            anyhow!(
                                "configure postgres plugin before mapping fields with `eventdbx plugin postgres --connection=...`"
                            )
                        })?;
                    let mut field_config = PostgresColumnConfig::default();
                    field_config.data_type = Some(args.data_type.clone());
                    settings
                        .field_mappings
                        .entry(args.aggregate.clone())
                        .or_default()
                        .insert(args.field.clone(), field_config);
                    config.ensure_data_dir()?;
                    config.save(&path)?;
                    println!(
                        "Mapped {}.{} as {}",
                        args.aggregate, args.field, args.data_type
                    );
                }
                Some(PluginKind::Sqlite) => {
                    return Err(anyhow!(
                        "field mapping is not supported for the SQLite plugin"
                    ));
                }
                Some(PluginKind::Csv) => {
                    return Err(anyhow!("field mapping is not supported for the CSV plugin"));
                }
                Some(
                    PluginKind::Tcp | PluginKind::Http | PluginKind::Json | PluginKind::Log,
                ) => {
                    return Err(anyhow!(
                        "field mapping is only supported for the Postgres plugin"
                    ));
                }
            }
            return Ok(());
        }
    };

    config.set_plugin(PluginDefinition {
        enabled: !disable,
        config: plugin_config,
    });
    config.ensure_data_dir()?;
    config.save(&path)?;
    println!(
        "{} plugin {}",
        label,
        if disable { "disabled" } else { "configured" }
    );
    Ok(())
}
/// Parses a `KEY=VALUE` argument into a [`KeyValue`].
///
/// The input is split at the first `=`, so the value may itself contain `=`.
/// Surrounding whitespace is trimmed from both halves. An empty value is
/// accepted; an empty key is not.
///
/// # Errors
///
/// Returns `"missing value"` when `raw` contains no `=`, and
/// `"header key cannot be empty"` when the key is empty after trimming.
fn parse_key_value(raw: &str) -> Result<KeyValue, String> {
    let (key, value) = raw
        .split_once('=')
        .ok_or_else(|| "missing value".to_string())?;
    let key = key.trim();
    if key.is_empty() {
        return Err("header key cannot be empty".to_string());
    }
    Ok(KeyValue {
        key: key.to_string(),
        value: value.trim().to_string(),
    })
}