use std::fmt::Debug;
use std::future::Future;
use async_trait::async_trait;
use tokio::task::JoinHandle;
use crate::asnc::consts::CONN_STOP_POOLING_INTERVAL;
use crate::asnc::io::{
incoming_channel, outgoing_channel, ChannelFactory, IncomingFrameReceiver, OutgoingFrameSender,
};
use crate::asnc::marker::AsyncConnConf;
use crate::core::io::{ConnectionConf, ConnectionInfo};
use crate::core::utils::{Closable, SharedCloser};
use crate::prelude::*;
/// Asynchronous factory for [`Connection`] instances.
///
/// Implementors must also implement [`ConnectionConf`].
#[async_trait]
pub trait ConnectionBuilder<V: MaybeVersioned>: ConnectionConf {
    /// Asynchronously builds a connection together with the handler of its
    /// background task.
    ///
    /// # Errors
    ///
    /// Returns an error when the connection cannot be built.
    async fn build(&self) -> Result<(Connection<V>, ConnectionHandler)>;

    /// Converts this builder into an [`AsyncConnConf`].
    fn to_conf(&self) -> AsyncConnConf<V>;

    /// Reports whether the connection is repairable.
    ///
    /// The default implementation returns `false`; implementors override it
    /// to opt in.
    // NOTE(review): presumably "repairable" means the connection can be
    // re-established after a failure — confirm against the callers.
    fn is_repairable(&self) -> bool {
        false
    }
}
/// Asynchronous connection: descriptive info, both frame channel endpoints
/// and a shared closing state.
///
/// Dropping a connection closes its state (see the `Drop` implementation).
#[derive(Debug)]
pub struct Connection<V: MaybeVersioned> {
    /// Information describing this connection.
    info: ConnectionInfo,
    /// Sending side of the outgoing frames channel.
    sender: OutgoingFrameSender<V>,
    /// Receiving side of the incoming frames channel.
    receiver: IncomingFrameReceiver<V>,
    /// Closing state of this connection.
    state: SharedCloser,
}
/// Wrapper around the join handle of a spawned connection task.
///
/// Pass it to `ConnectionHandler::handle` to tie the task's completion to a
/// [`Connection`]'s state.
pub struct ConnectionHandler {
    /// Join handle of the Tokio task; resolves to the task's result.
    inner: JoinHandle<Result<()>>,
}
impl ConnectionHandler {
pub fn spawn<F>(task: F) -> Self
where
F: Future<Output = Result<()>> + Send + 'static,
{
Self {
inner: tokio::spawn(task),
}
}
pub fn spawn_from_state(state: SharedCloser) -> Self {
Self::spawn(async move {
while !state.is_closed() {
tokio::time::sleep(CONN_STOP_POOLING_INTERVAL).await;
}
Ok(())
})
}
pub(crate) fn handle<V: MaybeVersioned>(self, conn: &Connection<V>) {
let mut state = conn.state.clone();
let info = conn.info.clone();
tokio::task::spawn(async move {
let result = self.inner.await;
state.close();
match result {
Ok(res) => match res {
Ok(_) => log::debug!("[{info:?}] listener stopped"),
Err(err) => log::debug!("[{info:?}] listener exited with error: {err:?}"),
},
Err(err) => log::error!("[{info:?}] listener failed: {err:?}"),
}
});
}
}
impl<V: MaybeVersioned> Connection<V> {
    /// Creates a connection and the [`ChannelFactory`] paired with it.
    ///
    /// Both returned values share the same outgoing sender. The factory
    /// additionally receives the outgoing send handler, the incoming producer,
    /// a copy of `info` and a [`Closable`] view of `state`.
    pub fn new(info: ConnectionInfo, state: SharedCloser) -> (Self, ChannelFactory<V>) {
        let (sender, send_handler) = outgoing_channel(state.to_closable());
        let (producer, receiver) = incoming_channel();

        // The connection keeps a clone; the original sender goes to the factory.
        let connection_sender = sender.clone();
        let factory = ChannelFactory {
            info: info.clone(),
            state: state.to_closable(),
            sender,
            send_handler,
            producer,
        };
        let connection = Self {
            info,
            sender: connection_sender,
            receiver,
            state,
        };

        (connection, factory)
    }

    /// Information about this connection.
    pub fn info(&self) -> &ConnectionInfo {
        &self.info
    }

    /// Returns a [`Closable`] view of the connection state.
    pub(in crate::asnc) fn state(&self) -> Closable {
        self.state.to_closable()
    }

    /// Returns a clone of the underlying [`SharedCloser`].
    pub(in crate::asnc) fn share_state(&self) -> SharedCloser {
        self.state.clone()
    }

    /// Returns a clone of the outgoing frames sender.
    pub(in crate::asnc) fn sender(&self) -> OutgoingFrameSender<V> {
        self.sender.clone()
    }

    /// Returns a clone of the incoming frames receiver.
    pub(in crate::asnc) fn receiver(&self) -> IncomingFrameReceiver<V> {
        self.receiver.clone()
    }

    /// Creates a connection that reuses this connection's channels but has
    /// its own, freshly created state.
    ///
    /// A detached Tokio task polls both states once per
    /// [`CONN_STOP_POOLING_INTERVAL`] and closes the new state once either
    /// this connection's state or the new state reports closed.
    pub(in crate::asnc) fn reuse(&self) -> Connection<V> {
        let mut child_state = SharedCloser::new();
        let reused = Self {
            info: self.info.clone(),
            sender: self.sender.clone(),
            receiver: self.receiver.clone(),
            state: child_state.clone(),
        };
        let parent_state = self.state.to_closable();

        tokio::task::spawn(async move {
            while !(parent_state.is_closed() || child_state.is_closed()) {
                tokio::time::sleep(CONN_STOP_POOLING_INTERVAL).await;
            }
            child_state.close();
        });

        reused
    }

    /// Closes the connection state and logs the fact; does nothing when the
    /// state is already closed.
    fn close(&mut self) {
        if !self.state.is_closed() {
            self.state.close();
            log::debug!("[{:?}] connection closed", self.info);
        }
    }
}
/// Closes the connection when it goes out of scope.
impl<V: MaybeVersioned> Drop for Connection<V> {
    /// Delegates to `Connection::close`, which is a no-op when the state is
    /// already closed.
    fn drop(&mut self) {
        self.close();
    }
}