use log::warn;
use std::net::{TcpListener, ToSocketAddrs};
use std::sync::Arc;
use threadpool::ThreadPool;
#[cfg(unix)]
use std::os::unix::net::UnixListener;
#[cfg(unix)]
use std::path::Path;
use crate::protocol::{
TInputProtocol, TInputProtocolFactory, TOutputProtocol, TOutputProtocolFactory,
};
use crate::transport::{TIoChannel, TReadTransportFactory, TTcpChannel, TWriteTransportFactory};
use crate::{ApplicationError, ApplicationErrorKind};
use super::TProcessor;
use crate::TransportErrorKind;
#[derive(Debug)]
pub struct TServer<PRC, RTF, IPF, WTF, OPF>
where
PRC: TProcessor + Send + Sync + 'static,
RTF: TReadTransportFactory + 'static,
IPF: TInputProtocolFactory + 'static,
WTF: TWriteTransportFactory + 'static,
OPF: TOutputProtocolFactory + 'static,
{
r_trans_factory: RTF,
i_proto_factory: IPF,
w_trans_factory: WTF,
o_proto_factory: OPF,
processor: Arc<PRC>,
worker_pool: ThreadPool,
}
impl<PRC, RTF, IPF, WTF, OPF> TServer<PRC, RTF, IPF, WTF, OPF>
where
PRC: TProcessor + Send + Sync + 'static,
RTF: TReadTransportFactory + 'static,
IPF: TInputProtocolFactory + 'static,
WTF: TWriteTransportFactory + 'static,
OPF: TOutputProtocolFactory + 'static,
{
pub fn new(
read_transport_factory: RTF,
input_protocol_factory: IPF,
write_transport_factory: WTF,
output_protocol_factory: OPF,
processor: PRC,
num_workers: usize,
) -> TServer<PRC, RTF, IPF, WTF, OPF> {
TServer {
r_trans_factory: read_transport_factory,
i_proto_factory: input_protocol_factory,
w_trans_factory: write_transport_factory,
o_proto_factory: output_protocol_factory,
processor: Arc::new(processor),
worker_pool: ThreadPool::with_name("Thrift service processor".to_owned(), num_workers),
}
}
pub fn listen<A: ToSocketAddrs>(&mut self, listen_address: A) -> crate::Result<()> {
let listener = TcpListener::bind(listen_address)?;
for stream in listener.incoming() {
match stream {
Ok(s) => {
let channel = TTcpChannel::with_stream(s);
self.handle_stream(channel)?;
}
Err(e) => {
warn!("failed to accept remote connection with error {:?}", e);
}
}
}
Err(crate::Error::Application(ApplicationError {
kind: ApplicationErrorKind::Unknown,
message: "aborted listen loop".into(),
}))
}
#[cfg(unix)]
pub fn listen_uds<P: AsRef<Path>>(&mut self, listen_path: P) -> crate::Result<()> {
let listener = UnixListener::bind(listen_path)?;
for stream in listener.incoming() {
match stream {
Ok(s) => {
self.handle_stream(s)?;
}
Err(e) => {
warn!(
"failed to accept connection via unix domain socket with error {:?}",
e
);
}
}
}
Err(crate::Error::Application(ApplicationError {
kind: ApplicationErrorKind::Unknown,
message: "aborted listen loop".into(),
}))
}
fn handle_stream<S: TIoChannel + Send + 'static>(&mut self, stream: S) -> crate::Result<()> {
let (i_prot, o_prot) = self.new_protocols_for_connection(stream)?;
let processor = self.processor.clone();
self.worker_pool
.execute(move || handle_incoming_connection(processor, i_prot, o_prot));
Ok(())
}
fn new_protocols_for_connection<S: TIoChannel + Send + 'static>(
&mut self,
stream: S,
) -> crate::Result<(
Box<dyn TInputProtocol + Send>,
Box<dyn TOutputProtocol + Send>,
)> {
let (r_chan, w_chan) = stream.split()?;
let r_tran = self.r_trans_factory.create(Box::new(r_chan));
let i_prot = self.i_proto_factory.create(r_tran);
let w_tran = self.w_trans_factory.create(Box::new(w_chan));
let o_prot = self.o_proto_factory.create(w_tran);
Ok((i_prot, o_prot))
}
}
fn handle_incoming_connection<PRC>(
processor: Arc<PRC>,
i_prot: Box<dyn TInputProtocol>,
o_prot: Box<dyn TOutputProtocol>,
) where
PRC: TProcessor,
{
let mut i_prot = i_prot;
let mut o_prot = o_prot;
loop {
match processor.process(&mut *i_prot, &mut *o_prot) {
Ok(()) => {}
Err(err) => {
match err {
crate::Error::Transport(ref transport_err)
if transport_err.kind == TransportErrorKind::EndOfFile => {}
other => warn!("processor completed with error: {:?}", other),
}
break;
}
}
}
}