use std::sync::Arc;
use std::time::Instant;
use future::BoxFuture;
use futures::prelude::*;
use log::{trace, warn};
use tokio::sync::mpsc;
use tokio::time::sleep;
use crate::backoff_strategy::BackoffStrategy;
use crate::connection_factory::ConnectionFactory;
use crate::{Ping, Poolable};
use super::inner_pool::PoolMessage;
use super::instrumentation::PoolInstrumentation;
use super::{Managed, PoolMessageEnvelope};
/// A connection factory bundled with everything needed to create a
/// connection in the background: the channel sender used to send
/// messages, the instrumentation hooks and the retry backoff strategy.
pub(crate) struct ExtendedConnectionFactory<T: Poolable> {
/// The shared, thread-safe factory that actually creates connections of type `T`.
inner_factory: Arc<dyn ConnectionFactory<Connection = T> + Send + Sync + 'static>,
/// Sender half of the unbounded channel on which `PoolMessage`s are sent.
send_back: mpsc::UnboundedSender<PoolMessageEnvelope<T>>,
/// Notified when a connection was created or when the inner factory failed.
pub instrumentation: PoolInstrumentation,
/// Yields the delay (if any) to wait before the next creation attempt.
back_off_strategy: BackoffStrategy,
}
impl<T: Poolable> ExtendedConnectionFactory<T> {
    /// Creates a new `ExtendedConnectionFactory` from its parts.
    ///
    /// * `inner_factory` - the factory that actually creates connections
    /// * `send_back` - sender of the channel messages are sent on
    /// * `instrumentation` - notified about created connections and factory failures
    /// * `back_off_strategy` - yields the delay between retry attempts
    pub fn new(
        inner_factory: Arc<dyn ConnectionFactory<Connection = T> + Send + Sync + 'static>,
        send_back: mpsc::UnboundedSender<PoolMessageEnvelope<T>>,
        instrumentation: PoolInstrumentation,
        back_off_strategy: BackoffStrategy,
    ) -> Self {
        Self {
            inner_factory,
            send_back,
            instrumentation,
            back_off_strategy,
        }
    }

    /// Returns a clone of the sender this factory sends its messages on.
    pub fn send_back_cloned(&self) -> mpsc::UnboundedSender<PoolMessageEnvelope<T>> {
        self.send_back.clone()
    }

    /// Sends `message` on the internal channel.
    ///
    /// # Errors
    ///
    /// Returns the message back to the caller if it could not be sent.
    pub fn send_message(&mut self, message: PoolMessage<T>) -> Result<(), PoolMessage<T>> {
        message.send_on_internal_channel(&mut self.send_back)
    }

    /// Spawns a background task that tries to create a new connection,
    /// retrying until it succeeds or the pool can no longer be reached.
    ///
    /// Before each attempt a `PoolMessage::CheckAlive` is sent; if that fails
    /// the task gives up with "Pool is gone.". After a failed attempt the
    /// task waits for the delay returned by the backoff strategy, or retries
    /// immediately if the strategy returns `None`.
    ///
    /// `initiated_at` is the instant the creation was requested; it is passed
    /// on to the instrumentation once a connection has been created.
    ///
    /// The spawned task is detached: this method returns immediately and does
    /// not report the outcome to the caller.
    pub fn create_connection(mut self, initiated_at: Instant) {
        let f = async move {
            let mut attempt = 1;
            let result = loop {
                // Probe the channel first so that we stop retrying once
                // nobody is listening anymore.
                if self
                    .send_message(PoolMessage::CheckAlive(Instant::now()))
                    .is_err()
                {
                    break Err("Pool is gone.".to_string());
                }

                match self.do_a_create_connection_attempt(initiated_at).await {
                    Ok(managed) => {
                        // NOTE(review): per the trace message below, dropping
                        // the `Managed` wrapper is what hands the connection
                        // over to the pool; its `Drop` impl is not visible here.
                        drop(managed);
                        trace!("Dropped newly created connection to be sent to pool");
                        break Ok(());
                    }
                    Err(this) => {
                        // The failed attempt hands the factory back so that
                        // it can be reused for the next attempt.
                        self = this;
                        if let Some(backoff) = self.back_off_strategy.get_next_backoff(attempt) {
                            warn!(
                                "Retrying to create connection after attempt {} in {:?}",
                                attempt, backoff
                            );
                            sleep(backoff).await;
                        } else {
                            warn!(
                                "Retrying to create connection after attempt {} immediately",
                                attempt
                            );
                        }
                        attempt += 1;
                    }
                }
            };

            if let Err(err) = result {
                warn!("Create connection finally failed: {}", err);
            }
        };

        tokio::spawn(f);
    }

    /// Returns what the inner factory reports it is connecting to.
    pub fn connecting_to(&self) -> &str {
        self.inner_factory.connecting_to()
    }

    /// Pings via the inner factory, forwarding `timeout` unchanged.
    pub fn ping(&self, timeout: Instant) -> BoxFuture<'_, Ping> {
        self.inner_factory.ping(timeout)
    }

    /// Performs a single attempt to create a connection.
    ///
    /// Consumes the factory. On success the factory is moved into the
    /// returned `Managed`; on failure it is returned as the `Err` value so
    /// that the caller can retry with it.
    ///
    /// The instrumentation is notified in both cases: on success with the
    /// time elapsed since `initiated_at` and since this method was called,
    /// on failure via `connection_factory_failed`.
    fn do_a_create_connection_attempt(
        self,
        initiated_at: Instant,
    ) -> impl Future<Output = Result<Managed<T>, Self>> {
        // Taken when the method is called, not when the future is first polled.
        let start_connect = Instant::now();
        // Separate handle so that `self` stays free to be moved into the result.
        let inner_factory = Arc::clone(&self.inner_factory);

        async move {
            match inner_factory.create_connection().await {
                Ok(conn) => {
                    trace!("new connection created");
                    self.instrumentation
                        .connection_created(initiated_at.elapsed(), start_connect.elapsed());
                    Ok(Managed::fresh(conn, self))
                }
                Err(err) => {
                    self.instrumentation.connection_factory_failed();
                    warn!("Connection factory failed: {}", err);
                    Err(self)
                }
            }
        }
    }
}