use core::num::NonZeroU16;
use std::collections::BTreeSet;
use nom::{
Finish as _, IResult, Parser,
branch::alt,
bytes::complete::{tag, take_while, take_while1},
character::complete::{char, multispace0},
combinator::{all_consuming, map, value},
multi::{many0, separated_list1},
sequence::{delimited, pair, preceded},
};
use nom_language::error::VerboseError;
use super::{ConcurrentlyConfig, Error, FillFactor, SqlFragment};
use crate::identifier::{AccessMethod, Index, QualifiedTable, Schema, Table};
#[derive(Debug, clap::Subcommand)]
pub enum Command {
Create(Create),
Gc(Gc),
}
#[derive(Debug)]
pub enum Output {
Create(super::create::Result),
Gc(super::gc::Result),
}
impl std::fmt::Display for Output {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Create(result) => result.fmt(f),
Self::Gc(result) => result.fmt(f),
}
}
}
impl Command {
pub async fn run(self, config: &crate::Config) -> Result<Output, Error> {
let output = match self {
Self::Create(create) => create.run(config).await.map(Output::Create),
Self::Gc(gc) => gc.run(config).await.map(Output::Gc),
}?;
log::info!("{output}");
Ok(output)
}
}
#[derive(Debug, clap::Args)]
pub struct Create {
#[arg(long)]
table: Table,
#[arg(long)]
index: Index,
#[arg(long)]
key_expression: SqlFragment,
#[arg(long, default_value = "public")]
schema: Schema,
#[arg(long)]
unique: bool,
#[arg(long, default_value = "btree")]
method: AccessMethod,
#[arg(long)]
include: Option<SqlFragment>,
#[arg(long)]
where_clause: Option<SqlFragment>,
#[arg(long)]
fillfactor: Option<FillFactor>,
#[arg(
long,
default_value = "none",
default_missing_value = "all",
num_args = 0..=1,
value_name = "MODE",
value_parser = parse_concurrently,
)]
concurrently: ConcurrentlyConfig,
#[arg(long, default_value = "1")]
jobs: NonZeroU16,
#[arg(long)]
dry_run: bool,
}
impl Create {
pub async fn run(self, config: &crate::Config) -> Result<super::create::Result, Error> {
let input = super::create::Input {
qualified_table: QualifiedTable {
schema: self.schema,
table: self.table,
},
index: self.index,
key_expression: self.key_expression,
unique: self.unique,
method: self.method,
include: self.include,
where_clause: self.where_clause,
fillfactor: self.fillfactor,
concurrently: self.concurrently,
};
super::create::run(config, &input, self.jobs, self.dry_run).await
}
}
fn parse_concurrently(value: &str) -> Result<ConcurrentlyConfig, String> {
let parsed = all_consuming(delimited(multispace0, concurrently_spec, multispace0))
.parse(value)
.finish();
match parsed {
Ok(("", concurrently_config)) => Ok(concurrently_config),
Ok((remaining, _)) => Err(format!("unexpected trailing input: '{remaining}'")),
Err(error) => Err(nom_language::error::convert_error(value, error)),
}
}
fn parse_table_set(values: Vec<String>) -> Result<BTreeSet<Table>, String> {
let mut tables = BTreeSet::new();
for value in values {
let table: Table = value
.parse()
.map_err(|error| format!("invalid table identifier '{value}': {error}"))?;
if !tables.insert(table.clone()) {
return Err(format!("duplicate table identifier '{table}'"));
}
}
Ok(tables)
}
fn concurrently_spec(input: &str) -> IResult<&str, ConcurrentlyConfig, VerboseError<&str>> {
alt((
value(ConcurrentlyConfig::All, tag("all")),
value(ConcurrentlyConfig::None, tag("none")),
value(ConcurrentlyConfig::None, tag("disabled")),
map(
preceded(
tag("except:"),
delimited(multispace0, table_set, multispace0),
),
ConcurrentlyConfig::Except,
),
))
.parse(input)
}
fn table_identifier(input: &str) -> IResult<&str, String, VerboseError<&str>> {
alt((quoted_identifier, unquoted_identifier)).parse(input)
}
fn table_list(input: &str) -> IResult<&str, Vec<String>, VerboseError<&str>> {
separated_list1(
delimited(multispace0, char(','), multispace0),
table_identifier,
)
.parse(input)
}
fn table_set(input: &str) -> IResult<&str, BTreeSet<Table>, VerboseError<&str>> {
nom::combinator::map_res(table_list, parse_table_set).parse(input)
}
fn quoted_identifier(input: &str) -> IResult<&str, String, VerboseError<&str>> {
let (input, parts) = delimited(
char('"'),
many0(alt((
value("\"", tag("\"\"")),
take_while1(|character: char| character != '"'),
))),
char('"'),
)
.parse(input)?;
let mut value = String::new();
for part in parts {
value.push_str(part);
}
Ok((input, value))
}
fn unquoted_identifier(input: &str) -> IResult<&str, String, VerboseError<&str>> {
let (input, value) = nom::combinator::recognize(pair(
take_while1(|character: char| character.is_ascii_alphabetic() || character == '_'),
take_while(|character: char| {
character.is_ascii_alphanumeric() || character == '_' || character == '$'
}),
))
.parse(input)?;
Ok((input, value.to_ascii_lowercase()))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_concurrently_none() {
assert_eq!(
parse_concurrently("none").unwrap(),
ConcurrentlyConfig::None
);
}
#[test]
fn parse_concurrently_except_quoted_with_comma() {
let mut tables = BTreeSet::new();
tables.insert("foo,bar".parse::<Table>().unwrap());
tables.insert("baz".parse::<Table>().unwrap());
assert_eq!(
parse_concurrently("except:\"foo,bar\", baz").unwrap(),
ConcurrentlyConfig::Except(tables)
);
}
#[test]
fn parse_concurrently_except_quoted_with_escape() {
let mut tables = BTreeSet::new();
tables.insert("foo\"bar".parse::<Table>().unwrap());
assert_eq!(
parse_concurrently("except:\"foo\"\"bar\"").unwrap(),
ConcurrentlyConfig::Except(tables)
);
}
#[test]
fn parse_concurrently_except_empty_fails() {
assert!(parse_concurrently("except:").is_err());
}
#[test]
fn parse_concurrently_except_unquoted() {
let mut tables = BTreeSet::new();
tables.insert("events_2024".parse::<Table>().unwrap());
tables.insert("foobar".parse::<Table>().unwrap());
assert_eq!(
parse_concurrently("except:events_2024, FooBar").unwrap(),
ConcurrentlyConfig::Except(tables)
);
}
#[test]
fn parse_concurrently_except_duplicate_fails() {
assert!(parse_concurrently("except:events_2024, events_2024").is_err());
}
}
#[derive(Debug, clap::Args)]
pub struct Gc {
#[arg(long)]
index: Index,
#[arg(long, default_value = "public")]
schema: Schema,
#[arg(long, default_value = "1")]
jobs: NonZeroU16,
#[arg(long)]
dry_run: bool,
}
impl Gc {
pub async fn run(self, config: &crate::Config) -> Result<super::gc::Result, Error> {
let input = super::gc::Input {
schema: self.schema,
index: self.index,
};
super::gc::run(config, &input, self.jobs, self.dry_run).await
}
}