use async_trait::async_trait;
use tokio::sync::mpsc;
use crate::core::error::DbResult;
use crate::core::models::*;
pub struct QueryBatch {
pub columns: Vec<String>,
pub rows: Vec<Vec<String>>,
pub done: bool,
}
fn skip_leading_noise(sql: &str) -> usize {
let bytes = sql.as_bytes();
let mut i = 0;
loop {
while i < bytes.len() && (bytes[i] as char).is_whitespace() {
i += 1;
}
if i >= bytes.len() {
return i;
}
if i + 1 < bytes.len() && bytes[i] == b'-' && bytes[i + 1] == b'-' {
i += 2;
while i < bytes.len() && bytes[i] != b'\n' {
i += 1;
}
continue;
}
if i + 1 < bytes.len() && bytes[i] == b'/' && bytes[i + 1] == b'*' {
i += 2;
let mut depth = 1usize;
while i + 1 < bytes.len() && depth > 0 {
if bytes[i] == b'/' && bytes[i + 1] == b'*' {
depth += 1;
i += 2;
} else if bytes[i] == b'*' && bytes[i + 1] == b'/' {
depth -= 1;
i += 2;
} else {
i += 1;
}
}
continue;
}
return i;
}
}
pub fn is_row_producing_query(sql: &str) -> bool {
let start = skip_leading_noise(sql);
let rest = &sql[start..];
let upper: String = rest
.chars()
.take(8) .flat_map(|c| c.to_uppercase())
.collect();
upper.starts_with("SELECT") || upper.starts_with("WITH")
}
#[cfg(test)]
mod classifier_tests {
use super::is_row_producing_query;
#[test]
fn plain_select() {
assert!(is_row_producing_query("SELECT * FROM t"));
assert!(is_row_producing_query("select * from t"));
}
#[test]
fn plain_with() {
assert!(is_row_producing_query(
"WITH x AS (SELECT 1) SELECT * FROM x"
));
}
#[test]
fn leading_line_comment() {
assert!(is_row_producing_query("-- note\nSELECT * FROM t"));
assert!(is_row_producing_query(
"-- a\n-- b\n SELECT * FROM t ORDER BY x DESC"
));
}
#[test]
fn leading_block_comment() {
assert!(is_row_producing_query("/* hello */ SELECT 1"));
assert!(is_row_producing_query("/* /* nested */ */\nSELECT 1"));
}
#[test]
fn mixed_comments_and_whitespace() {
assert!(is_row_producing_query(
"\n -- c1\n/* c2 */\n SELECT * FROM t"
));
}
#[test]
fn dml_not_row_producing() {
assert!(!is_row_producing_query("INSERT INTO t VALUES (1)"));
assert!(!is_row_producing_query("UPDATE t SET x = 1"));
assert!(!is_row_producing_query("DELETE FROM t"));
assert!(!is_row_producing_query("-- sneaky\nUPDATE t SET x = 1"));
}
#[test]
fn ddl_not_row_producing() {
assert!(!is_row_producing_query("CREATE TABLE t (id INT)"));
assert!(!is_row_producing_query("BEGIN NULL; END;"));
}
}
#[allow(dead_code)]
#[async_trait]
pub trait DatabaseAdapter: Send + Sync {
fn name(&self) -> &str;
fn db_type(&self) -> DatabaseType;
async fn get_schemas(&self) -> DbResult<Vec<Schema>>;
async fn get_tables(&self, schema: &str) -> DbResult<Vec<Table>>;
async fn get_views(&self, schema: &str) -> DbResult<Vec<View>>;
async fn get_procedures(&self, schema: &str) -> DbResult<Vec<Procedure>>;
async fn get_functions(&self, schema: &str) -> DbResult<Vec<Function>>;
async fn get_columns(&self, schema: &str, table: &str) -> DbResult<Vec<Column>>;
async fn execute(&self, query: &str) -> DbResult<QueryResult>;
async fn execute_streaming(
&self,
query: &str,
tx: mpsc::Sender<DbResult<QueryBatch>>,
) -> DbResult<()> {
let result = self.execute(query).await?;
let _ = tx
.send(Ok(QueryBatch {
columns: result.columns,
rows: result.rows,
done: true,
}))
.await;
Ok(())
}
async fn get_packages(&self, _schema: &str) -> DbResult<Vec<Package>> {
Ok(vec![])
}
async fn get_package_content(
&self,
_schema: &str,
_name: &str,
) -> DbResult<Option<PackageContent>> {
Ok(None)
}
async fn get_materialized_views(&self, _schema: &str) -> DbResult<Vec<MaterializedView>> {
Ok(vec![])
}
async fn get_indexes(&self, _schema: &str) -> DbResult<Vec<Index>> {
Ok(vec![])
}
async fn get_sequences(&self, _schema: &str) -> DbResult<Vec<Sequence>> {
Ok(vec![])
}
async fn get_types(&self, _schema: &str) -> DbResult<Vec<DbType>> {
Ok(vec![])
}
async fn get_triggers(&self, _schema: &str) -> DbResult<Vec<Trigger>> {
Ok(vec![])
}
async fn get_events(&self, _schema: &str) -> DbResult<Vec<DbEvent>> {
Ok(vec![])
}
async fn get_type_attributes(&self, _schema: &str, _name: &str) -> DbResult<QueryResult> {
Ok(QueryResult {
columns: vec![],
rows: vec![],
elapsed: None,
})
}
async fn get_type_methods(&self, _schema: &str, _name: &str) -> DbResult<QueryResult> {
Ok(QueryResult {
columns: vec![],
rows: vec![],
elapsed: None,
})
}
async fn get_trigger_info(&self, _schema: &str, _name: &str) -> DbResult<QueryResult> {
Ok(QueryResult {
columns: vec![],
rows: vec![],
elapsed: None,
})
}
async fn get_table_ddl(&self, _schema: &str, _table: &str) -> DbResult<String> {
Ok(String::new())
}
async fn get_source_code(
&self,
_schema: &str,
_name: &str,
_obj_type: &str,
) -> DbResult<String> {
Ok(String::new())
}
async fn get_foreign_keys(&self, _schema: &str, _table: &str) -> DbResult<Vec<ForeignKeyInfo>> {
Ok(vec![])
}
async fn compile_check(&self, _sql: &str) -> DbResult<Vec<CompileDiagnostic>> {
Ok(vec![])
}
async fn get_function_return_columns(
&self,
_schema: Option<&str>,
_package: Option<&str>,
_function: &str,
) -> DbResult<Vec<Column>> {
Ok(vec![])
}
}