use futures::{sink::SinkExt, stream::StreamExt};
use tokio::io::{AsyncRead, AsyncWrite, Result};
use tokio::sync::mpsc;
use tokio_util::codec::{Decoder, Encoder, FramedRead, FramedWrite};
use fern_proxy_interfaces::{SQLHandlerConfig, SQLMessage, SQLMessageHandler};
/// Which way data travels through a [`Pipe`].
///
/// The value is only used for labelling: it is interpolated (through its
/// `Display` implementation) into the log and error messages a pipe emits.
#[derive(Debug)]
pub enum Direction {
    /// Traffic going from the client towards the server.
    ClientServer,
    /// Traffic going from the server back to the client.
    ServerClient,
}
impl std::fmt::Display for Direction {
    /// Renders the direction as a short arrow label, e.g. `Client -> Server`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the fixed label first, then emit it in a single write.
        let label = match self {
            Self::ClientServer => "Client -> Server",
            Self::ServerClient => "Server -> Client",
        };
        f.write_str(label)
    }
}
/// A one-way conduit that decodes frames from a reader, hands each frame to a
/// message handler, and encodes the handler's output to a writer.
///
/// Type parameters:
/// * `R` / `W` - the underlying reader and writer,
/// * `C` - the codec used for both decoding and encoding,
/// * `I` / `S` - the message types carried by the [`ShortCircuit`],
/// * `H` - the handler applied to every decoded frame.
#[derive(Debug)]
pub struct Pipe<R, W, C, I, S, H> {
    /// Label used in log and error messages.
    direction: Direction,
    /// Decoding side: yields frames read from `R` through codec `C`.
    stream: FramedRead<R, C>,
    /// Encoding side: writes frames to `W` through codec `C`.
    sink: FramedWrite<W, C>,
    /// Handler invoked on every frame before it is forwarded.
    frame_handlers: H,
    /// Stored at construction; no code visible in this file reads it.
    _short_circuit: ShortCircuit<I, S>,
}
impl<R, W, C, I, S, H> Pipe<R, W, C, I, S, H>
where
R: AsyncRead + Unpin,
W: AsyncWrite + Unpin,
C: Decoder + Decoder<Item = I> + Encoder<I> + Default,
I: SQLMessage,
S: SQLMessage,
H: SQLMessageHandler<I> + Send + Sync,
{
pub fn new(
direction: Direction,
receiver: R,
sender: W,
short_circuit: ShortCircuit<I, S>,
config: &SQLHandlerConfig,
) -> Pipe<R, W, C, I, S, H> {
Pipe {
direction,
stream: FramedRead::new(receiver, C::default()),
sink: FramedWrite::new(sender, C::default()),
frame_handlers: H::new(config),
_short_circuit: short_circuit,
}
}
pub async fn run(&mut self) -> Result<()>
where
<C as Encoder<I>>::Error: std::fmt::Display,
std::io::Error: From<<C as Encoder<I>>::Error>,
{
log::trace!("[{}] running pipe", self.direction);
loop {
let mut packet = tokio::select! {
result = self.stream.next() => {
if let Some(Ok(packet)) = result {
packet
} else {
let err = std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!("[{}] read 0 bytes, closing pipe", self.direction),
);
log::trace!("{}", err);
return Err(err);
}
},
};
packet = self.frame_handlers.process(packet).await;
if let Err(err) = self.sink.send(packet).await {
log::error!("[{}] cannot send to sink: {}", self.direction, err);
return Err(err.into());
}
}
}
}
/// A pair of channel endpoints handed to a [`Pipe`] at construction.
///
/// It bundles a sender of `S` messages with a receiver of `I` messages.
/// Both fields are underscore-prefixed and no code visible in this file
/// reads them.
#[derive(Debug)]
pub struct ShortCircuit<I, S> {
    /// Sending half for messages of type `S`.
    _tx: mpsc::Sender<S>,
    /// Receiving half for messages of type `I`.
    _rx: mpsc::Receiver<I>,
}
impl<I, S> ShortCircuit<I, S> {
    /// Wraps the given channel endpoints into a `ShortCircuit`.
    ///
    /// `tx` is the sending half for `S` messages and `rx` the receiving half
    /// for `I` messages; both are moved into the returned value unchanged.
    pub fn new(tx: mpsc::Sender<S>, rx: mpsc::Receiver<I>) -> ShortCircuit<I, S> {
        Self { _tx: tx, _rx: rx }
    }
}