#![allow(clippy::needless_lifetimes, clippy::elidable_lifetime_names)]
use crate::future::BoxFuture;
use std::future::Future;
use std::path::PathBuf;
use serde::{Deserialize, Serialize};
use crate::cancel::DynCancelHandle;
use crate::capabilities::Capabilities;
use crate::error::Result;
use crate::query_stream::QueryStream;
use crate::schema::{QueryResult, Schema, SchemaCatalog, Table, TableSchema};
use crate::stream::DynRowStream;
use crate::value::Value;
/// Color tag that can be attached to a connection through `ConnectionParams::color`.
///
/// Variants serialize in lowercase (`"red"`, `"yellow"`, ...). The enum is
/// `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
///
/// NOTE(review): presumably used by front ends to tell connections apart
/// visually — confirm against the consumers of `ConnectionParams::color`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
#[non_exhaustive]
pub enum ConnectionColor {
Red,
Yellow,
Green,
Blue,
Magenta,
Cyan,
}
/// TLS policy stored in `ConnectionParams::ssl_mode`.
///
/// Variants serialize in kebab-case: `"disable"`, `"prefer"`, `"require"`,
/// `"verify-ca"`, `"verify-full"`. `Prefer` is the `Default` value.
///
/// NOTE(review): the names look like libpq's `sslmode` settings; how each mode
/// is enforced is up to the individual drivers — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
#[non_exhaustive]
pub enum SslMode {
Disable,
// Used when `ssl_mode` is absent from the serialized parameters.
#[default]
Prefer,
Require,
VerifyCa,
VerifyFull,
}
/// A serializable connection definition: identity, display name, driver and
/// the parameters handed to that driver.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionConfig {
/// Unique identifier of this connection definition.
pub id: uuid::Uuid,
/// Name of the connection.
pub name: String,
/// Driver identifier.
/// NOTE(review): presumably used to look up the backend implementation — confirm.
pub driver: String,
/// Connection parameters for the driver.
pub params: ConnectionParams,
}
/// Driver-agnostic connection parameters.
///
/// Every field is optional or has a default, so `ConnectionParams::default()`
/// is a valid (empty) value. The struct is `#[non_exhaustive]`: code outside
/// this crate cannot use a struct literal and has to start from `default()` or
/// `ConnectionParams::with`.
///
/// NOTE(review): the derived `Debug` prints `password` verbatim; avoid logging
/// values of this type with `{:?}` unless that is acceptable.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[non_exhaustive]
pub struct ConnectionParams {
/// Server host name or address.
pub host: Option<String>,
/// Server port.
pub port: Option<u16>,
/// Database name.
pub database: Option<String>,
/// User name used to authenticate.
pub username: Option<String>,
/// NOTE(review): presumably a filesystem path for file-based databases — confirm.
pub path: Option<String>,
/// Password; left out of the serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub password: Option<String>,
/// Free-form key/value options; an absent map deserializes as empty.
#[serde(default)]
pub options: std::collections::BTreeMap<String, String>,
/// TLS policy; falls back to `SslMode::default()` when absent.
#[serde(default)]
pub ssl_mode: SslMode,
/// Path to the root certificate.
#[serde(default)]
pub ssl_root_cert: Option<PathBuf>,
/// Path to the client certificate.
#[serde(default)]
pub ssl_cert: Option<PathBuf>,
/// Path to the client private key.
#[serde(default)]
pub ssl_key: Option<PathBuf>,
/// SSH settings; left out of the serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub ssh: Option<SshConfig>,
/// Pre-connect steps; left out of the serialized output when empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub pre_connect: Vec<PreConnectStep>,
/// Optional color tag; left out of the serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub color: Option<ConnectionColor>,
/// Left out of the serialized output when `false`.
/// NOTE(review): presumably asks for confirmation before write statements — confirm.
#[serde(default, skip_serializing_if = "is_false")]
pub confirm_writes: bool,
/// Left out of the serialized output when `false`.
/// NOTE(review): presumably applied through `Connection::set_read_only` — confirm.
#[serde(default, skip_serializing_if = "is_false")]
pub read_only: bool,
}
/// `skip_serializing_if` predicate: returns `true` when the flag is `false`.
// The by-reference parameter is required by serde's `skip_serializing_if`
// contract, hence the lint exemption.
#[allow(clippy::trivially_copy_pass_by_ref)]
const fn is_false(b: &bool) -> bool {
    let set = *b;
    !set
}
impl ConnectionParams {
    /// Builds parameters by starting from `Self::default()` and letting `f`
    /// mutate the value in place before it is returned.
    #[must_use]
    pub fn with(f: impl FnOnce(&mut Self)) -> Self {
        let mut params = <Self as Default>::default();
        f(&mut params);
        params
    }
}
/// One step to run before a connection is opened (see
/// `ConnectionParams::pre_connect`).
///
/// `Default` is implemented by hand so that `PreConnectStep::default()` agrees
/// with both deserialization of a minimal document and `PreConnectStep::new`:
/// in all three cases `required` is `true`. A derived `Default` would silently
/// produce `required == false`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[non_exhaustive]
pub struct PreConnectStep {
    /// Command to run.
    /// NOTE(review): how the command is interpreted (shell vs. argv) is decided
    /// by the executor, which is not visible here — confirm.
    pub command: String,
    /// Optional key under which the step's output is stored; left out of the
    /// serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub save_output_to: Option<String>,
    /// Optional timeout in seconds; left out of the serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_secs: Option<u32>,
    /// Defaults to `true` when absent from the serialized form.
    /// NOTE(review): presumably a failing required step aborts the connection
    /// attempt — confirm against the executor.
    #[serde(default = "default_required")]
    pub required: bool,
}

impl Default for PreConnectStep {
    /// Empty command, no output key, no timeout, and `required` set to the same
    /// default serde uses for a missing field.
    fn default() -> Self {
        Self {
            command: String::new(),
            save_output_to: None,
            timeout_secs: None,
            required: default_required(),
        }
    }
}

/// Default for `PreConnectStep::required`, shared by serde and `Default`.
const fn default_required() -> bool {
    true
}
impl PreConnectStep {
    /// Creates a step for `command` with no output key, no timeout, and
    /// `required` set to `true`.
    #[must_use]
    pub fn new(command: impl Into<String>) -> Self {
        let command = command.into();
        Self {
            command,
            save_output_to: None,
            timeout_secs: None,
            required: true,
        }
    }

    /// Returns the step with `save_output_to` set to `key`.
    #[must_use]
    pub fn with_save_output_to(self, key: impl Into<String>) -> Self {
        Self {
            save_output_to: Some(key.into()),
            ..self
        }
    }

    /// Returns the step with `timeout_secs` set to `secs`.
    #[must_use]
    pub const fn with_timeout_secs(self, secs: u32) -> Self {
        let mut step = self;
        step.timeout_secs = Some(secs);
        step
    }

    /// Returns the step with `required` set to the given flag.
    #[must_use]
    pub const fn with_required(self, required: bool) -> Self {
        let mut step = self;
        step.required = required;
        step
    }
}
/// SSH settings attached to a connection through `ConnectionParams::ssh`.
///
/// `host` and `user` are mandatory; the remaining fields are optional and are
/// left out of the serialized output when `None`.
///
/// NOTE(review): presumably describes an SSH tunnel to the database host —
/// confirm against the code that consumes `ConnectionParams::ssh`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[non_exhaustive]
pub struct SshConfig {
/// SSH server host name or address.
pub host: String,
/// SSH server port; `None` leaves the choice to the consumer.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub port: Option<u16>,
/// SSH login user.
pub user: String,
/// Path to a private key file.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub key_path: Option<PathBuf>,
/// Optional jump host.
/// NOTE(review): expected format (e.g. `user@host:port`) is not visible here — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub jump_host: Option<String>,
}
impl SshConfig {
    /// Creates an SSH configuration for `user` at `host`.
    ///
    /// `port`, `key_path` and `jump_host` start out as `None`; set the public
    /// fields afterwards to override them.
    // `#[must_use]` matches the other constructors in this module
    // (`ConnectionParams::with`, `PreConnectStep::new`): dropping the result of
    // a pure constructor is always a mistake.
    #[must_use]
    pub fn new(host: impl Into<String>, user: impl Into<String>) -> Self {
        Self {
            host: host.into(),
            port: None,
            user: user.into(),
            key_path: None,
            jump_host: None,
        }
    }
}
/// Transaction isolation level passed to `Connection::begin_with`.
///
/// The variants carry the names of the four SQL-standard isolation levels.
/// No `rename_all` is applied, so they serialize under their Rust names
/// (e.g. `"ReadCommitted"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum IsolationLevel {
ReadUncommitted,
ReadCommitted,
RepeatableRead,
Serializable,
}
pub trait Connection: Send + Sync {
fn execute(
&mut self,
sql: &str,
params: &[Value],
) -> impl Future<Output = Result<QueryResult>> + Send;
fn stream(
&mut self,
sql: &str,
params: &[Value],
) -> impl Future<Output = Result<Box<dyn DynRowStream>>> + Send;
fn query(
&mut self,
sql: &str,
params: &[Value],
) -> impl Future<Output = Result<QueryStream>> + Send {
async move {
let inner = self.stream(sql, params).await?;
Ok(QueryStream::new(inner))
}
}
fn begin(&mut self) -> impl Future<Output = Result<()>> + Send;
fn begin_with(&mut self, isolation: IsolationLevel) -> impl Future<Output = Result<()>> + Send;
fn commit(&mut self) -> impl Future<Output = Result<()>> + Send;
fn rollback(&mut self) -> impl Future<Output = Result<()>> + Send;
fn savepoint(&mut self, name: &str) -> impl Future<Output = Result<()>> + Send {
let _ = name;
async { Err(crate::Error::unsupported("savepoints")) }
}
fn release_savepoint(&mut self, name: &str) -> impl Future<Output = Result<()>> + Send {
let _ = name;
async { Err(crate::Error::unsupported("savepoints")) }
}
fn rollback_to_savepoint(&mut self, name: &str) -> impl Future<Output = Result<()>> + Send {
let _ = name;
async { Err(crate::Error::unsupported("savepoints")) }
}
fn list_schemas(&mut self) -> impl Future<Output = Result<Vec<Schema>>> + Send;
fn list_tables(&mut self, schema: &str) -> impl Future<Output = Result<Vec<Table>>> + Send;
fn list_all_tables(&mut self) -> impl Future<Output = Result<SchemaCatalog>> + Send {
async move {
let schemas = self.list_schemas().await?;
let mut out = Vec::with_capacity(schemas.len());
for schema in schemas {
let tables = self.list_tables(&schema.name).await?;
out.push((schema, tables));
}
Ok(out)
}
}
fn describe_table(
&mut self,
schema: &str,
name: &str,
) -> impl Future<Output = Result<TableSchema>> + Send;
fn ping(&mut self) -> impl Future<Output = Result<()>> + Send;
fn cancel_handle(&self) -> Option<Box<dyn DynCancelHandle>>;
fn capabilities(&self) -> Capabilities;
fn fetch_ddl(
&mut self,
_schema: &str,
_table: &str,
) -> impl Future<Output = Result<String>> + Send {
async { Err(crate::Error::unsupported("fetch_ddl")) }
}
fn set_read_only(&mut self, read_only: bool) -> impl Future<Output = Result<()>> + Send {
let _ = read_only;
async { Err(crate::Error::unsupported("set_read_only")) }
}
fn close(self: Box<Self>) -> impl Future<Output = Result<()>> + Send;
}
/// Dyn-compatible mirror of `Connection`.
///
/// Each async method returns a `BoxFuture` instead of `impl Future`, which
/// lets the trait be used as `dyn DynConnection`. A blanket impl in this file
/// provides it for every `T: Connection + 'static`, so drivers only implement
/// `Connection`. The explicit `'a` lifetimes tie each returned future to the
/// borrow of `self` and of the arguments.
pub trait DynConnection: Send + Sync {
/// Boxed counterpart of `Connection::execute`.
fn execute<'a>(
&'a mut self,
sql: &'a str,
params: &'a [Value],
) -> BoxFuture<'a, Result<QueryResult>>;
/// Boxed counterpart of `Connection::stream`.
fn stream<'a>(
&'a mut self,
sql: &'a str,
params: &'a [Value],
) -> BoxFuture<'a, Result<Box<dyn DynRowStream>>>;
/// Boxed counterpart of `Connection::query`.
fn query<'a>(
&'a mut self,
sql: &'a str,
params: &'a [Value],
) -> BoxFuture<'a, Result<QueryStream>>;
/// Boxed counterpart of `Connection::begin`.
fn begin<'a>(&'a mut self) -> BoxFuture<'a, Result<()>>;
/// Boxed counterpart of `Connection::begin_with`.
fn begin_with<'a>(&'a mut self, isolation: IsolationLevel) -> BoxFuture<'a, Result<()>>;
/// Boxed counterpart of `Connection::commit`.
fn commit<'a>(&'a mut self) -> BoxFuture<'a, Result<()>>;
/// Boxed counterpart of `Connection::rollback`.
fn rollback<'a>(&'a mut self) -> BoxFuture<'a, Result<()>>;
/// Boxed counterpart of `Connection::savepoint`.
fn savepoint<'a>(&'a mut self, name: &'a str) -> BoxFuture<'a, Result<()>>;
/// Boxed counterpart of `Connection::release_savepoint`.
fn release_savepoint<'a>(&'a mut self, name: &'a str) -> BoxFuture<'a, Result<()>>;
/// Boxed counterpart of `Connection::rollback_to_savepoint`.
fn rollback_to_savepoint<'a>(&'a mut self, name: &'a str) -> BoxFuture<'a, Result<()>>;
/// Boxed counterpart of `Connection::list_schemas`.
fn list_schemas<'a>(&'a mut self) -> BoxFuture<'a, Result<Vec<Schema>>>;
/// Boxed counterpart of `Connection::list_tables`.
fn list_tables<'a>(&'a mut self, schema: &'a str) -> BoxFuture<'a, Result<Vec<Table>>>;
/// Boxed counterpart of `Connection::list_all_tables`.
fn list_all_tables<'a>(&'a mut self) -> BoxFuture<'a, Result<SchemaCatalog>>;
/// Boxed counterpart of `Connection::describe_table`.
fn describe_table<'a>(
&'a mut self,
schema: &'a str,
name: &'a str,
) -> BoxFuture<'a, Result<TableSchema>>;
/// Boxed counterpart of `Connection::ping`.
fn ping<'a>(&'a mut self) -> BoxFuture<'a, Result<()>>;
/// Same contract as `Connection::cancel_handle` (synchronous).
fn cancel_handle(&self) -> Option<Box<dyn DynCancelHandle>>;
/// Same contract as `Connection::capabilities` (synchronous).
fn capabilities(&self) -> Capabilities;
/// Boxed counterpart of `Connection::fetch_ddl`.
fn fetch_ddl<'a>(
&'a mut self,
schema: &'a str,
table: &'a str,
) -> BoxFuture<'a, Result<String>>;
/// Boxed counterpart of `Connection::set_read_only`.
fn set_read_only<'a>(&'a mut self, read_only: bool) -> BoxFuture<'a, Result<()>>;
/// Boxed counterpart of `Connection::close`; the future is `'static` because
/// it takes ownership of the boxed connection.
fn close(self: Box<Self>) -> BoxFuture<'static, Result<()>>;
}
impl<T> DynConnection for T
where
T: Connection + 'static,
{
fn execute<'a>(
&'a mut self,
sql: &'a str,
params: &'a [Value],
) -> BoxFuture<'a, Result<QueryResult>> {
Box::pin(<Self as Connection>::execute(self, sql, params))
}
fn stream<'a>(
&'a mut self,
sql: &'a str,
params: &'a [Value],
) -> BoxFuture<'a, Result<Box<dyn DynRowStream>>> {
Box::pin(<Self as Connection>::stream(self, sql, params))
}
fn query<'a>(
&'a mut self,
sql: &'a str,
params: &'a [Value],
) -> BoxFuture<'a, Result<QueryStream>> {
Box::pin(<Self as Connection>::query(self, sql, params))
}
fn begin<'a>(&'a mut self) -> BoxFuture<'a, Result<()>> {
Box::pin(<Self as Connection>::begin(self))
}
fn begin_with<'a>(&'a mut self, isolation: IsolationLevel) -> BoxFuture<'a, Result<()>> {
Box::pin(<Self as Connection>::begin_with(self, isolation))
}
fn commit<'a>(&'a mut self) -> BoxFuture<'a, Result<()>> {
Box::pin(<Self as Connection>::commit(self))
}
fn rollback<'a>(&'a mut self) -> BoxFuture<'a, Result<()>> {
Box::pin(<Self as Connection>::rollback(self))
}
fn savepoint<'a>(&'a mut self, name: &'a str) -> BoxFuture<'a, Result<()>> {
Box::pin(<Self as Connection>::savepoint(self, name))
}
fn release_savepoint<'a>(&'a mut self, name: &'a str) -> BoxFuture<'a, Result<()>> {
Box::pin(<Self as Connection>::release_savepoint(self, name))
}
fn rollback_to_savepoint<'a>(&'a mut self, name: &'a str) -> BoxFuture<'a, Result<()>> {
Box::pin(<Self as Connection>::rollback_to_savepoint(self, name))
}
fn list_schemas<'a>(&'a mut self) -> BoxFuture<'a, Result<Vec<Schema>>> {
Box::pin(<Self as Connection>::list_schemas(self))
}
fn list_tables<'a>(&'a mut self, schema: &'a str) -> BoxFuture<'a, Result<Vec<Table>>> {
Box::pin(<Self as Connection>::list_tables(self, schema))
}
fn list_all_tables<'a>(&'a mut self) -> BoxFuture<'a, Result<SchemaCatalog>> {
Box::pin(<Self as Connection>::list_all_tables(self))
}
fn describe_table<'a>(
&'a mut self,
schema: &'a str,
name: &'a str,
) -> BoxFuture<'a, Result<TableSchema>> {
Box::pin(<Self as Connection>::describe_table(self, schema, name))
}
fn ping<'a>(&'a mut self) -> BoxFuture<'a, Result<()>> {
Box::pin(<Self as Connection>::ping(self))
}
fn cancel_handle(&self) -> Option<Box<dyn DynCancelHandle>> {
<Self as Connection>::cancel_handle(self)
}
fn capabilities(&self) -> Capabilities {
<Self as Connection>::capabilities(self)
}
fn fetch_ddl<'a>(
&'a mut self,
schema: &'a str,
table: &'a str,
) -> BoxFuture<'a, Result<String>> {
Box::pin(<Self as Connection>::fetch_ddl(self, schema, table))
}
fn set_read_only<'a>(&'a mut self, read_only: bool) -> BoxFuture<'a, Result<()>> {
Box::pin(<Self as Connection>::set_read_only(self, read_only))
}
fn close(self: Box<Self>) -> BoxFuture<'static, Result<()>> {
Box::pin(<Self as Connection>::close(self))
}
}