use std::time::Duration;
use chrono::{DateTime, Utc};
use futures::stream::{self, StreamExt};
use futures::Stream;
use http::Uri;
use tonic::transport::{Channel, ClientTlsConfig};
use evidentsource_core::domain::{DatabaseError, DatabaseName};
use evidentsource_core::{DatabaseCatalog, DatabaseIdentity};
use crate::auth::Credentials;
use crate::connection::Connection;
use crate::conversions::timestamp_to_datetime;
use crate::EvidentSourceClient;
/// Concrete [`DatabaseIdentity`] holding a database's name and creation time.
///
/// Values are produced by `EvidentSource::create_database` from the server's
/// reply.
#[derive(Clone, Debug)]
pub struct DatabaseIdentityImpl {
    /// Name of the database.
    name: DatabaseName,
    /// UTC timestamp at which the database was created.
    created_at: DateTime<Utc>,
}
impl DatabaseIdentity for DatabaseIdentityImpl {
    /// Borrows the database name.
    fn name(&self) -> &DatabaseName {
        let Self { name, .. } = self;
        name
    }

    /// Returns a copy of the creation timestamp.
    fn created_at(&self) -> DateTime<Utc> {
        let Self { created_at, .. } = self;
        *created_at
    }
}
/// Handle to an EvidentSource server, wrapping an [`EvidentSourceClient`].
///
/// Implements [`DatabaseCatalog`] and opens per-database [`Connection`]s.
#[derive(Clone)]
pub struct EvidentSource {
/// Underlying client; it is cloned for every catalog call and new connection.
client: EvidentSourceClient,
}
impl EvidentSource {
pub fn builder(addr: &str) -> EvidentSourceBuilder {
EvidentSourceBuilder::new(addr)
}
pub async fn connect_to_server(addr: &str) -> Result<Self, crate::Error> {
Self::connect_with_auth(addr, Credentials::None).await
}
pub async fn connect_with_auth(
addr: &str,
credentials: Credentials,
) -> Result<Self, crate::Error> {
let client = EvidentSourceClient::with_credentials(addr, credentials).await?;
Ok(Self { client })
}
pub fn from_client(client: EvidentSourceClient) -> Self {
Self { client }
}
pub async fn connect(&self, database: &DatabaseName) -> Result<Connection, DatabaseError> {
Connection::new(self.client.clone(), database.clone()).await
}
pub fn client(&self) -> &EvidentSourceClient {
&self.client
}
pub fn client_mut(&mut self) -> &mut EvidentSourceClient {
&mut self.client
}
}
impl std::fmt::Debug for EvidentSource {
    /// Writes only the type name; the wrapped client is deliberately omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A field-less `debug_struct(..).finish()` emits just the name, so
        // writing the name directly yields the same output.
        f.write_str("EvidentSource")
    }
}
impl DatabaseCatalog for EvidentSource {
type Identity = DatabaseIdentityImpl;
fn list_databases(&self) -> impl Stream<Item = DatabaseName> {
let mut client = self.client.clone();
stream::once(async move {
let result = client.fetch_catalog().await;
match result {
Ok(response_stream) => response_stream
.filter_map(|result| async move {
match result {
Ok(reply) => DatabaseName::new(&reply.database_name).ok(),
Err(_) => None,
}
})
.boxed(),
Err(_) => stream::empty().boxed(),
}
})
.flatten()
}
fn create_database(
&self,
name: DatabaseName,
) -> impl std::future::Future<Output = Result<Self::Identity, DatabaseError>> {
let mut client = self.client.clone();
async move {
let proto_db = client
.create_database(name.to_string())
.await
.map_err(|e| match e {
crate::Error::GrpcStatus(ref status) => {
crate::status_mapping::to_database_error(status, &name.to_string())
}
_ => DatabaseError::ServerError(e.to_string()),
})?;
let db_name = DatabaseName::new(&proto_db.name)?;
let created_at = proto_db
.created_at
.ok_or_else(|| {
DatabaseError::ServerError("missing created_at timestamp".to_string())
})
.and_then(|ts| {
timestamp_to_datetime(ts).map_err(|e| {
DatabaseError::ServerError(format!("invalid timestamp: {}", e))
})
})?;
Ok(DatabaseIdentityImpl {
name: db_name,
created_at,
})
}
}
fn delete_database(
&self,
name: DatabaseName,
) -> impl std::future::Future<Output = Result<(), DatabaseError>> {
let mut client = self.client.clone();
async move {
client
.delete_database(name.to_string())
.await
.map_err(|e| match e {
crate::Error::GrpcStatus(ref status) => {
crate::status_mapping::to_database_error(status, &name.to_string())
}
_ => DatabaseError::ServerError(e.to_string()),
})?;
Ok(())
}
}
}
/// TLS mode selected on an `EvidentSourceBuilder`.
#[derive(Clone, Debug)]
pub enum TlsConfig {
    /// Configure TLS with native root certificates. The builder only applies
    /// this when the target URI uses the `https` scheme.
    Native,
    /// Apply no TLS configuration to the channel.
    Disabled,
}
/// Backoff parameters: a starting delay, a ceiling, and a multiplier.
///
/// NOTE(review): nothing in this file consumes these values; presumably the
/// delay starts at `initial` and is scaled by `multiplier` up to `max` —
/// confirm against the code that applies them.
#[derive(Clone, Debug)]
pub struct BackoffConfig {
    /// Initial delay (100 ms by default).
    pub initial: Duration,
    /// Maximum delay (30 s by default).
    pub max: Duration,
    /// Multiplier (2.0 by default).
    pub multiplier: f64,
}

impl Default for BackoffConfig {
    /// Defaults: 100 ms initial, 30 s max, multiplier 2.0.
    fn default() -> Self {
        let initial = Duration::from_millis(100);
        let max = Duration::from_secs(30);
        BackoffConfig {
            initial,
            max,
            multiplier: 2.0,
        }
    }
}
/// Builder collecting the options used to connect an `EvidentSource`.
#[derive(Clone, Debug)]
pub struct EvidentSourceBuilder {
    /// Target address; parsed as a URI when connecting.
    addr: String,
    /// Credentials handed to the auth interceptor.
    credentials: Credentials,
    /// Explicit TLS choice; `None` is treated like `TlsConfig::Native`.
    tls_config: Option<TlsConfig>,
    /// Optional connect timeout applied to the channel.
    connect_timeout: Option<Duration>,
    /// Optional backoff settings; stored and exposed via `get_backoff`.
    backoff: Option<BackoffConfig>,
}
impl EvidentSourceBuilder {
pub fn new(addr: &str) -> Self {
Self {
addr: addr.to_string(),
credentials: Credentials::None,
tls_config: None,
connect_timeout: None,
backoff: None,
}
}
pub fn credentials(mut self, credentials: Credentials) -> Self {
self.credentials = credentials;
self
}
pub fn tls(mut self, config: TlsConfig) -> Self {
self.tls_config = Some(config);
self
}
pub fn connect_timeout(mut self, timeout: Duration) -> Self {
self.connect_timeout = Some(timeout);
self
}
pub fn backoff(mut self, config: BackoffConfig) -> Self {
self.backoff = Some(config);
self
}
pub async fn connect(self) -> Result<EvidentSource, crate::Error> {
let uri = Uri::try_from(&self.addr)?;
let mut channel_builder = Channel::builder(uri.clone());
if let Some(timeout) = self.connect_timeout {
channel_builder = channel_builder.connect_timeout(timeout);
}
let is_https = uri.scheme_str() == Some("https");
match self.tls_config {
Some(TlsConfig::Native) | None if is_https => {
let tls_config = ClientTlsConfig::new()
.with_native_roots()
.domain_name(uri.host().unwrap_or("localhost"));
channel_builder = channel_builder.tls_config(tls_config)?;
}
Some(TlsConfig::Disabled) => {
}
_ => {
}
}
let channel = channel_builder.connect().await?;
let interceptor = crate::auth::AuthInterceptor::new(self.credentials);
let client =
crate::com::evidentsource::evident_source_client::EvidentSourceClient::with_interceptor(
channel,
interceptor,
);
Ok(EvidentSource {
client: EvidentSourceClient { client },
})
}
pub fn get_backoff(&self) -> Option<&BackoffConfig> {
self.backoff.as_ref()
}
}