use std::io::{Read, Write};
use std::thread;
use mavio::error::IoErrorKind;
use mavio::io::{StdIoReader, StdIoWriter};
use crate::core::io::{ChannelInfo, ConnectionInfo, IncomingFrame};
use crate::core::io::{Receiver, Sender};
use crate::core::utils::{Closable, SharedCloser};
use crate::sync::consts::{
CHANNEL_STOP_JOIN_ATTEMPTS, CHANNEL_STOP_JOIN_POOLING_INTERVAL, CHANNEL_STOP_POOLING_INTERVAL,
};
use crate::sync::io::{IncomingFrameProducer, OutgoingFrameHandler, OutgoingFrameSender};
use crate::prelude::*;
/// Factory that builds [`Channel`] instances for a single connection.
///
/// The factory holds the connection-wide pieces (state, outgoing frame handler, incoming frame
/// producer) and clones them into every channel created by [`ChannelFactory::build`].
#[derive(Clone, Debug)]
pub struct ChannelFactory<V: MaybeVersioned> {
/// Information about the connection; exposed through `info()`.
pub(super) info: ConnectionInfo,
/// Connection state; a clone is handed to every built channel as its `conn_state`.
pub(super) state: Closable,
/// Outgoing frame sender; only exposed through `sender()`, it is not passed to built channels.
pub(super) sender: OutgoingFrameSender<V>,
/// Outgoing frame handler; a clone is handed to every built channel.
pub(super) send_handler: OutgoingFrameHandler<V>,
/// Incoming frame producer; a clone is handed to every built channel.
pub(super) producer: IncomingFrameProducer<V>,
}
impl<V: MaybeVersioned> ChannelFactory<V> {
    /// Returns information about the connection this factory belongs to.
    pub fn info(&self) -> &ConnectionInfo {
        &self.info
    }

    /// Returns `true` if the connection state held by this factory reports itself as closed.
    pub fn is_closed(&self) -> bool {
        self.state.is_closed()
    }

    /// Returns the outgoing frame sender held by this factory.
    pub fn sender(&self) -> &OutgoingFrameSender<V> {
        &self.sender
    }

    /// Returns the outgoing frame handler that is cloned into every built channel.
    pub fn send_handler(&self) -> &OutgoingFrameHandler<V> {
        &self.send_handler
    }

    /// Returns the incoming frame producer that is cloned into every built channel.
    pub fn producer(&self) -> &IncomingFrameProducer<V> {
        &self.producer
    }

    /// Builds a [`Channel`] on top of the provided `reader` and `writer`.
    ///
    /// The channel takes ownership of `info`, `reader` and `writer`, and receives clones of the
    /// factory's connection state, outgoing frame handler and incoming frame producer.
    ///
    /// Building a channel does not start any I/O.
    #[must_use]
    pub fn build<R, W>(&self, info: ChannelInfo, reader: R, writer: W) -> Channel<V, R, W>
    where
        R: Read,
        W: Write,
    {
        let conn_state = self.state.clone();
        let send_handler = self.send_handler.clone();
        let producer = self.producer.clone();

        Channel {
            conn_state,
            info,
            reader,
            writer,
            send_handler,
            producer,
        }
    }
}
/// A single I/O channel: a reader/writer pair together with the handles that connect it to the
/// API.
///
/// A channel does nothing until `spawn` is called on it.
pub struct Channel<V: MaybeVersioned, R: Read, W: Write> {
/// State of the parent connection; the read and stop handlers check it to know when to stop.
conn_state: Closable,
/// Channel information; used as a log prefix, for outgoing frame filtering (`info.id()`), and
/// attached to every incoming frame.
info: ChannelInfo,
/// Source of incoming bytes; wrapped into a frame receiver when the channel is spawned.
reader: R,
/// Sink for outgoing bytes; wrapped into a frame sender when the channel is spawned.
writer: W,
/// Source of outgoing frames consumed by the write handler.
send_handler: OutgoingFrameHandler<V>,
/// Destination for incoming frames produced by the read handler.
producer: IncomingFrameProducer<V>,
}
impl<V: MaybeVersioned, R: Read + Send + 'static, W: Write + Send + 'static> Channel<V, R, W> {
pub fn spawn(self) -> SharedCloser {
let info = self.info;
let conn_state = self.conn_state;
let state = SharedCloser::new();
log::trace!("[{info:?}] spawning peer connection");
let write_handler = {
let info = info.clone();
let send_handler = self.send_handler;
let frame_writer = Sender::new(StdIoWriter::new(self.writer));
thread::spawn(move || Self::write_handler(info, send_handler, frame_writer))
};
let read_handler = {
let conn_state = conn_state.clone();
let state = state.clone();
let info = info.clone();
let producer = self.producer;
let frame_reader = Receiver::new(StdIoReader::new(self.reader));
thread::spawn(move || {
Self::read_handler(state, conn_state, info, producer, frame_reader)
})
};
{
let info = info.clone();
let state = state.clone();
thread::spawn(move || {
Self::handle_stop(state, conn_state, info, write_handler, read_handler);
});
}
state.clone()
}
fn write_handler(
info: ChannelInfo,
send_handler: OutgoingFrameHandler<V>,
mut frame_writer: Sender<std::io::Error, StdIoWriter<W>, V>,
) -> Result<()> {
loop {
let out_frame = match send_handler.recv() {
Ok(out_frame) => out_frame,
Err(err) => {
frame_writer.flush().map_err(Error::from)?;
return Err(Error::from(err));
}
};
if !out_frame.should_send_to(info.id()) {
continue;
}
log::trace!("[{info:?}] received outgoing frame from API");
loop {
if let Err(err) = frame_writer.send(out_frame.frame()) {
let err = Error::from(err);
if let Error::Io(err) = err {
if let IoErrorKind::Std(err_kind) = err.kind() {
if let std::io::ErrorKind::TimedOut = err_kind {
continue;
}
}
return Err(Error::Io(err));
}
}
log::trace!("[{info:?}] written outgoing frame");
break;
}
}
}
fn read_handler(
state: SharedCloser,
conn_state: Closable,
info: ChannelInfo,
producer: IncomingFrameProducer<V>,
mut frame_reader: Receiver<std::io::Error, StdIoReader<R>, V>,
) -> Result<()> {
loop {
if conn_state.is_closed() || state.is_closed() {
return Ok(());
}
let frame = match frame_reader.recv() {
Ok(frame) => frame,
Err(err) => {
let err = Error::from(err);
if let Error::Io(err) = err {
if let IoErrorKind::Std(err_kind) = err.kind() {
if let std::io::ErrorKind::TimedOut = err_kind {
continue;
}
}
return Err(Error::Io(err));
}
continue;
}
};
log::trace!("[{info:?}] received incoming frame");
producer.send(IncomingFrame::new(frame, info.clone()))?;
log::trace!("[{info:?}] sent incoming frame to API");
}
}
fn handle_stop(
mut state: SharedCloser,
conn_state: Closable,
info: ChannelInfo,
write_handler: thread::JoinHandle<Result<()>>,
read_handler: thread::JoinHandle<Result<()>>,
) {
while !(state.is_closed()
|| conn_state.is_closed()
|| write_handler.is_finished()
|| read_handler.is_finished())
{
thread::sleep(CHANNEL_STOP_POOLING_INTERVAL);
}
state.close();
for i in 0..CHANNEL_STOP_JOIN_ATTEMPTS {
if write_handler.is_finished() && read_handler.is_finished() {
break;
}
thread::sleep(CHANNEL_STOP_JOIN_POOLING_INTERVAL);
if i == CHANNEL_STOP_JOIN_ATTEMPTS - 1 {
log::warn!(
"[{info:?}] write/read handlers are stuck, finished: write={}, read={}",
write_handler.is_finished(),
read_handler.is_finished()
);
return;
}
}
if let (Ok(res_write), Ok(res_read)) = (write_handler.join(), read_handler.join()) {
if let Err(err) = res_write {
log::debug!("[{info:?}] write handler finished with error: {err:?}")
}
if let Err(err) = res_read {
log::debug!("[{info:?}] read handler finished with error: {err:?}")
}
} else {
log::error!("[{info:?}] error joining read/write handlers");
}
log::trace!("[{info:?}] handlers stopped");
}
}