#![deny(
clippy::all,
clippy::cargo,
clippy::nursery,
clippy::pedantic,
deprecated_in_future,
future_incompatible,
missing_docs,
nonstandard_style,
rust_2018_idioms,
rustdoc,
warnings,
unused_results,
unused_qualifications,
unused_lifetimes,
unused_import_braces,
unsafe_code,
unreachable_pub,
trivial_casts,
trivial_numeric_casts,
missing_debug_implementations,
missing_copy_implementations
)]
#![warn(variant_size_differences)]
#![allow(clippy::multiple_crate_versions, missing_doc_code_examples)]
#![doc(html_root_url = "https://docs.rs/automaat-processor-sql-query/0.1.0")]
use automaat_core::{Context, Processor};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use sqlparser::dialect::GenericSqlDialect;
use sqlparser::sqlast::SQLStatement;
use sqlparser::sqlparser::{Parser, ParserError};
use std::collections::HashMap;
use std::{error, fmt};
use url::Url;
/// The processor configuration: one SQL statement plus the URL of the
/// database server it should be run against.
///
/// The `Processor` implementation in this file only accepts a single
/// `SELECT` statement and the `postgres` URL scheme when validating.
#[cfg_attr(feature = "juniper", derive(juniper::GraphQLObject))]
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct SqlQuery {
/// The SQL statement to execute.
pub statement: String,
/// The connection URL of the database server. Its scheme is used to pick
/// the database backend; (de)serialized through `url_serde`.
#[serde(with = "url_serde")]
pub url: Url,
}
/// The GraphQL input object used to initialize the processor via an API.
///
/// [`SqlQuery`] implements `From<Input>`, so the processor can be built
/// directly from a value of this type.
///
/// _Only available when the `juniper` feature is enabled._
#[cfg(feature = "juniper")]
// The derive comes first so that the `graphql` helper attribute below is
// already registered when the compiler reaches it.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, juniper::GraphQLInputObject)]
// The schema name identifies *this* processor's input. The previous name,
// "StringRegexInput", named a different processor's input object.
#[graphql(name = "SqlQueryInput")]
pub struct Input {
    /// The SQL statement to execute.
    statement: String,

    /// The connection URL of the database server; (de)serialized through
    /// `url_serde`.
    #[serde(with = "url_serde")]
    url: Url,
}
/// Builds a [`SqlQuery`] from its GraphQL [`Input`] counterpart by moving
/// both fields across unchanged.
#[cfg(feature = "juniper")]
impl From<Input> for SqlQuery {
    fn from(input: Input) -> Self {
        // Destructuring makes the compiler flag any field added to `Input`
        // that is not carried over.
        let Input { statement, url } = input;

        Self { statement, url }
    }
}
impl SqlQuery {
fn run_postgres_statement(&self) -> Result<Option<String>, Error> {
use postgres::{types::Type as T, Client, NoTls};
use serde_json::{to_string, to_value};
let mut conn = Client::connect(self.url.as_str(), NoTls).map_err(Error::from)?;
let rows = conn
.query(self.statement.as_str(), &[])
.map_err(Error::from)?;
let mut results = vec![];
for row in rows {
let mut map = HashMap::new();
for column in row.columns() {
let n = column.name();
let value: Value = match column.type_() {
&T::BOOL => to_value::<Option<bool>>(row.get(n))?,
&T::INT4 => to_value::<Option<i32>>(row.get(n))?,
&T::JSON | &T::JSONB => to_value::<Option<Value>>(row.get(n))?,
&T::TEXT | &T::VARCHAR => to_value::<Option<String>>(row.get(n))?,
ty => return Err(Error::ReturnType(ty.to_string())),
};
let _ = map.insert(n.to_string(), value);
}
if !map.is_empty() {
results.push(map);
}
}
if results.is_empty() {
return Ok(None);
};
Ok(Some(to_string(&results)?))
}
fn run_sqlite_statement(&self) -> Result<Option<String>, Error> {
unimplemented!()
}
fn run_mysql_statement(&self) -> Result<Option<String>, Error> {
unimplemented!()
}
}
impl<'a> Processor<'a> for SqlQuery {
    /// The human-readable name of this processor.
    const NAME: &'static str = "SQL Query";

    /// Failures are reported through this file's [`Error`] enum.
    type Error = Error;

    /// On success the processor yields the query result as a JSON string.
    type Output = String;

    /// Checks that the processor is configured with a single `SELECT`
    /// statement and a supported database URL.
    ///
    /// # Errors
    ///
    /// * [`Error::Syntax`] if the statement cannot be tokenized or parsed.
    /// * [`Error::MultipleStatements`] if the parser does not yield exactly
    ///   one statement.
    /// * [`Error::StatementType`] if the statement is not a `SELECT`.
    /// * [`Error::Scheme`] if the URL scheme is anything but `postgres`.
    fn validate(&self) -> Result<(), Self::Error> {
        // `?` converts `ParserError` through the `From` impl on `Error`.
        let ast = Parser::parse_sql(&GenericSqlDialect {}, self.statement.to_owned())?;

        if ast.len() != 1 {
            return Err(Error::MultipleStatements);
        }

        // Indexing is safe: the length was checked to be exactly one above.
        match ast[0] {
            SQLStatement::SQLSelect(_) => (),
            _ => return Err(Error::StatementType),
        }

        match self.url.scheme() {
            "postgres" => Ok(()),
            scheme => Err(Error::Scheme(scheme.to_owned())),
        }
    }

    /// Runs the statement on the backend selected by the URL scheme.
    ///
    /// The context argument is not used by this processor.
    ///
    /// # Errors
    ///
    /// Returns whatever the backend-specific runner reports, or
    /// [`Error::Scheme`] when the URL scheme matches no known backend. An
    /// unknown scheme used to panic here; returning an error keeps a
    /// processor that was never validated from crashing the caller.
    fn run(&self, _context: &Context) -> Result<Option<Self::Output>, Self::Error> {
        match self.url.scheme() {
            "postgres" => self.run_postgres_statement(),
            "sqlite" => self.run_sqlite_statement(),
            "mysql" => self.run_mysql_statement(),
            scheme => Err(Error::Scheme(scheme.to_owned())),
        }
    }
}
/// Represents all the ways that [`SqlQuery`] can fail.
#[derive(Debug)]
pub enum Error {
/// The provided statement did not parse into exactly one SQL statement.
MultipleStatements,
/// An error reported by the `postgres` client while connecting or querying.
Postgres(postgres::Error),
/// A result column has a type that cannot be converted to JSON; the payload
/// is the textual name of that type.
ReturnType(String),
/// The database URL uses an unsupported scheme; the payload is the scheme.
Scheme(String),
/// A `serde_json` error raised while converting query results to JSON.
Serde(serde_json::Error),
/// The provided statement is not a `SELECT` statement.
StatementType,
/// The statement could not be tokenized or parsed; the payload is the
/// message produced by the SQL parser.
Syntax(String),
// Never constructed in this file and hidden from the docs.
// NOTE(review): presumably here so downstream crates cannot match this enum
// exhaustively — confirm.
#[doc(hidden)]
__Unknown,
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
Error::MultipleStatements => write!(f, "Multiple SQL statements found"),
Error::Postgres(ref err) => write!(f, "Postgres error: {}", err),
Error::ReturnType(ref string) => write!(f, "Unsupported return type: {}", string),
Error::Scheme(ref string) => write!(f, "Unsupported URL scheme: {}", string),
Error::Serde(ref err) => write!(f, "Serde error: {}", err),
Error::StatementType => write!(f, "Non-SELECT statements are not supported"),
Error::Syntax(ref string) => write!(f, "Syntax error: {}", string),
Error::__Unknown => unimplemented!(),
}
}
}
impl error::Error for Error {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match *self {
Error::MultipleStatements
| Error::ReturnType(_)
| Error::Scheme(_)
| Error::StatementType
| Error::Syntax(_) => None,
Error::Postgres(ref err) => Some(err),
Error::Serde(ref err) => Some(err),
Error::__Unknown => unreachable!(),
}
}
}
impl From<ParserError> for Error {
fn from(err: ParserError) -> Self {
match err {
ParserError::ParserError(string) | ParserError::TokenizerError(string) => {
Error::Syntax(string)
}
}
}
}
impl From<serde_json::Error> for Error {
fn from(err: serde_json::Error) -> Self {
Error::Serde(err)
}
}
impl From<postgres::Error> for Error {
fn from(err: postgres::Error) -> Self {
Error::Postgres(err)
}
}
#[cfg(test)]
mod tests {
use super::*;
use postgres::{Client, NoTls};
use rand::Rng;
use serde_json::json;
// Test fixture tying a temporary table to the connection that created it.
// Its `Drop` impl drops the table again.
struct PgData {
// Connection used to create the table and, on drop, to remove it.
client: Client,
// Name of the temporary table.
table: String,
}
// Removes the fixture's table when the fixture goes out of scope.
impl Drop for PgData {
    fn drop(&mut self) {
        let query = format!("DROP TABLE {}", self.table);

        if let Err(err) = self.client.execute(query.as_str(), &[]) {
            // A second panic while the test is already unwinding would abort
            // the whole test process and hide the original failure, so only
            // report a cleanup failure when the test itself did not fail.
            if !std::thread::panicking() {
                panic!("failed to drop table {}: {}", self.table, err);
            }
        }
    }
}
// Returns a processor with a valid `SELECT` statement and a local `postgres`
// URL; tests overwrite whichever field they exercise.
fn processor_stub() -> SqlQuery {
    let statement = String::from("SELECT * FROM table");
    let url = Url::parse("postgres://postgres@127.0.0.1").unwrap();

    SqlQuery { statement, url }
}
// Creates a randomly named unlogged table on the local PostgreSQL server,
// defined by `columns` and filled with the `insert` clause, and returns the
// fixture that owns it.
fn prepare_pg_data(columns: &str, insert: &str) -> PgData {
    // A random suffix gives each test its own table name.
    let suffix: u16 = rand::thread_rng().gen();
    let table = format!("foo_{}", suffix);

    let setup = format!(
        "DROP TABLE IF EXISTS {}; CREATE UNLOGGED TABLE {} {}; INSERT INTO {} {};",
        table, table, columns, table, insert
    );

    let mut client = Client::connect("postgres://postgres@127.0.0.1", NoTls).unwrap();
    // All three statements go to the server in a single `simple_query` call.
    let _ = client.simple_query(setup.as_str()).unwrap();

    PgData { client, table }
}
mod run {
use super::*;
#[test]
fn test_empty_output() {
let mut processor = processor_stub();
processor.statement = "SELECT".to_owned();
let context = Context::new().unwrap();
let output = processor.run(&context).unwrap();
assert!(output.is_none())
}
#[test]
fn test_single_value_output() {
let pg = prepare_pg_data("(id INT)", "(id) VALUES (1)");
let mut processor = processor_stub();
processor.statement = format!("SELECT * FROM {}", pg.table);
let context = Context::new().unwrap();
let output = processor.run(&context).unwrap().expect("Some");
assert_eq!(json!([{ "id": 1 }]).to_string(), output);
}
#[test]
fn test_multi_value_output() {
let pg = prepare_pg_data("(id INT)", "(id) VALUES (1), (2), (3)");
let mut processor = processor_stub();
processor.statement = format!("SELECT * FROM {}", pg.table);
let context = Context::new().unwrap();
let output = processor.run(&context).unwrap().expect("Some");
assert_eq!(
json!([{ "id": 1 }, { "id": 2 }, { "id": 3 }]).to_string(),
output
);
}
#[test]
fn test_null_value_output() {
let pg = prepare_pg_data("(col INT)", "(col) VALUES (NULL)");
let mut processor = processor_stub();
processor.statement = format!("SELECT * FROM {}", pg.table);
let context = Context::new().unwrap();
let output = processor.run(&context).unwrap().expect("Some");
assert_eq!(json!([{ "col": null }]).to_string(), output);
}
#[test]
fn test_bool_value_output() {
let pg = prepare_pg_data("(col BOOL)", "(col) VALUES (true)");
let mut processor = processor_stub();
processor.statement = format!("SELECT * FROM {}", pg.table);
let context = Context::new().unwrap();
let output = processor.run(&context).unwrap().expect("Some");
assert_eq!(json!([{ "col": true }]).to_string(), output);
}
#[test]
fn test_string_value_output() {
let pg = prepare_pg_data("(col TEXT)", "(col) VALUES ('hello')");
let mut processor = processor_stub();
processor.statement = format!("SELECT * FROM {}", pg.table);
let context = Context::new().unwrap();
let output = processor.run(&context).unwrap().expect("Some");
assert_eq!(json!([{ "col": "hello" }]).to_string(), output);
}
#[test]
fn test_json_value_output() {
let pg = prepare_pg_data("(col JSON)", "(col) VALUES ('[1,2,{\"1\":true}]')");
let mut processor = processor_stub();
processor.statement = format!("SELECT * FROM {}", pg.table);
let context = Context::new().unwrap();
let output = processor.run(&context).unwrap().expect("Some");
assert_eq!(
json!([{ "col": [1, 2, { "1": true }] }]).to_string(),
output
);
}
#[test]
fn test_invalid_table() {
let mut processor = processor_stub();
processor.statement = "SELECT * FROM does_not_exist".to_owned();
let context = Context::new().unwrap();
let error = processor.run(&context).unwrap_err();
assert!(error
.to_string()
.contains("relation \"does_not_exist\" does not exist"));
}
}
mod validate {
use super::*;
#[test]
fn test_select_statement() {
let mut processor = processor_stub();
processor.statement = "SELECT * FROM table".to_owned();
processor.validate().unwrap()
}
#[test]
#[should_panic]
fn test_update_statement() {
let mut processor = processor_stub();
processor.statement = "UPDATE table SET field1 = 1 WHERE field1 = 0".to_owned();
processor.validate().unwrap()
}
#[test]
#[should_panic]
fn test_delete_statement() {
let mut processor = processor_stub();
processor.statement = "DELETE FROM table WHERE field1 = 0".to_owned();
processor.validate().unwrap()
}
#[test]
#[should_panic]
fn test_invalid_statement() {
let mut processor = processor_stub();
processor.statement = "HELLO WORLD".to_owned();
processor.validate().unwrap()
}
#[test]
fn test_postgres_scheme() {
let mut processor = processor_stub();
processor.url = Url::parse("postgres://127.0.0.1").unwrap();
processor.validate().unwrap()
}
#[test]
#[should_panic]
fn test_sqlite_scheme() {
let mut processor = processor_stub();
processor.url = Url::parse("sqlite://127.0.0.1").unwrap();
processor.validate().unwrap()
}
#[test]
#[should_panic]
fn test_mysql_scheme() {
let mut processor = processor_stub();
processor.url = Url::parse("mysql://127.0.0.1").unwrap();
processor.validate().unwrap()
}
#[test]
#[should_panic]
fn test_invalid_scheme() {
let mut processor = processor_stub();
processor.url = Url::parse("invalid://127.0.0.1").unwrap();
processor.validate().unwrap()
}
}
// Delegates to `version_sync` to check the dependency snippets in `README.md`.
// NOTE(review): presumably fails when the README mentions a crate version that
// differs from the one in `Cargo.toml` — confirm against the version_sync docs.
#[test]
fn test_readme_deps() {
version_sync::assert_markdown_deps_updated!("README.md");
}
// Delegates to `version_sync` to check the `html_root_url` attribute in
// `src/lib.rs`.
// NOTE(review): presumably fails when that URL's version differs from the
// crate version — confirm against the version_sync docs.
#[test]
fn test_html_root_url() {
version_sync::assert_html_root_url_updated!("src/lib.rs");
}
}