use std::path::PathBuf;
use anyhow::Result;
use clap::{Args, Subcommand};
use eventdbx::{
config::load_or_default,
error::EventError,
schema::{CreateSchemaInput, SchemaManager, SchemaUpdate},
};
#[derive(Subcommand)]
pub enum SchemaCommands {
Create(SchemaCreateArgs),
Add(SchemaAddEventArgs),
Alter(SchemaAlterArgs),
Remove(SchemaRemoveEventArgs),
List,
Show {
aggregate: String,
},
}
#[derive(Args)]
pub struct SchemaCreateArgs {
#[arg(long)]
pub aggregate: String,
#[arg(long, value_delimiter = ',')]
pub events: Vec<String>,
#[arg(long)]
pub snapshot_threshold: Option<u64>,
}
#[derive(Args)]
pub struct SchemaAddEventArgs {
pub aggregate: String,
#[arg(required = true, value_delimiter = ',')]
pub events: Vec<String>,
}
#[derive(Args)]
pub struct SchemaAlterArgs {
pub aggregate: String,
pub event: Option<String>,
#[arg(long)]
pub snapshot_threshold: Option<u64>,
#[arg(long)]
pub lock: Option<bool>,
#[arg(long)]
pub field: Option<String>,
#[arg(long, short = 'a', value_delimiter = ',')]
pub add: Vec<String>,
#[arg(long, short = 'r', value_delimiter = ',')]
pub remove: Vec<String>,
}
#[derive(Args)]
pub struct SchemaRemoveEventArgs {
pub aggregate: String,
pub event: String,
}
pub fn execute(config_path: Option<PathBuf>, command: SchemaCommands) -> Result<()> {
let (config, _) = load_or_default(config_path)?;
let manager = SchemaManager::load(config.schema_store_path())?;
match command {
SchemaCommands::Create(args) => {
let schema = manager.create(CreateSchemaInput {
aggregate: args.aggregate,
events: args.events,
snapshot_threshold: args.snapshot_threshold,
})?;
println!(
"schema={} events={} snapshot_threshold={:?}",
schema.aggregate,
schema.events.len(),
schema.snapshot_threshold
);
}
SchemaCommands::Add(args) => match manager.get(&args.aggregate) {
Ok(_) => {
let mut update = SchemaUpdate::default();
for event in &args.events {
update.event_add_fields.entry(event.clone()).or_default();
}
let schema = manager.update(&args.aggregate, update)?;
println!(
"schema={} added_events={} total_events={}",
schema.aggregate,
args.events.join(","),
schema.events.len()
);
}
Err(EventError::SchemaNotFound) => {
let schema = manager.create(CreateSchemaInput {
aggregate: args.aggregate.clone(),
events: args.events.clone(),
snapshot_threshold: None,
})?;
println!(
"schema={} created events={}",
schema.aggregate,
schema.events.len()
);
}
Err(err) => return Err(err.into()),
},
SchemaCommands::Alter(args) => {
let mut update = SchemaUpdate::default();
if let Some(value) = args.snapshot_threshold {
update.snapshot_threshold = Some(Some(value));
}
if args.event.is_none() && args.field.is_none() {
if let Some(lock) = args.lock {
update.locked = Some(lock);
}
}
if let Some(field) = args.field {
if let Some(lock) = args.lock {
update.field_lock = Some((field, lock));
} else {
return Err(anyhow::anyhow!(
"--lock must be provided when using --field"
));
}
}
if let Some(event) = args.event {
if !args.add.is_empty() {
update
.event_add_fields
.entry(event.clone())
.or_default()
.extend(args.add.clone());
}
if !args.remove.is_empty() {
update
.event_remove_fields
.insert(event.clone(), args.remove.clone());
}
if args.add.is_empty() && args.remove.is_empty() {
return Err(anyhow::anyhow!(
"provide --add or --remove when specifying an event"
));
}
} else if (!args.add.is_empty() || !args.remove.is_empty()) && args.event.is_none() {
return Err(anyhow::anyhow!(
"--event must be provided when adding or removing fields"
));
}
let schema = manager.update(&args.aggregate, update)?;
println!(
"schema={} updated_at={} version_events={}",
schema.aggregate,
schema.updated_at.to_rfc3339(),
schema.events.len()
);
}
SchemaCommands::Remove(args) => {
let schema = manager.remove_event(&args.aggregate, &args.event)?;
println!(
"schema={} removed_event={} remaining_events={}",
schema.aggregate,
args.event,
schema.events.len()
);
}
SchemaCommands::List => {
for schema in manager.list() {
println!(
"schema={} events={} locked={} snapshot_threshold={:?}",
schema.aggregate,
schema.events.len(),
schema.locked,
schema.snapshot_threshold
);
}
}
SchemaCommands::Show { aggregate } => {
let schema = manager.get(&aggregate)?;
println!("{}", serde_json::to_string_pretty(&schema)?);
}
}
Ok(())
}