use std::net::{TcpListener, TcpStream};
use std::sync::Arc;
use threadpool::ThreadPool;
use {ApplicationError, ApplicationErrorKind};
use protocol::{TInputProtocol, TInputProtocolFactory, TOutputProtocol, TOutputProtocolFactory};
use transport::{TIoChannel, TReadTransportFactory, TTcpChannel, TWriteTransportFactory};
use super::TProcessor;
#[derive(Debug)]
pub struct TServer<PRC, RTF, IPF, WTF, OPF>
where
PRC: TProcessor + Send + Sync + 'static,
RTF: TReadTransportFactory + 'static,
IPF: TInputProtocolFactory + 'static,
WTF: TWriteTransportFactory + 'static,
OPF: TOutputProtocolFactory + 'static,
{
r_trans_factory: RTF,
i_proto_factory: IPF,
w_trans_factory: WTF,
o_proto_factory: OPF,
processor: Arc<PRC>,
worker_pool: ThreadPool,
}
impl<PRC, RTF, IPF, WTF, OPF> TServer<PRC, RTF, IPF, WTF, OPF>
where PRC: TProcessor + Send + Sync + 'static,
RTF: TReadTransportFactory + 'static,
IPF: TInputProtocolFactory + 'static,
WTF: TWriteTransportFactory + 'static,
OPF: TOutputProtocolFactory + 'static {
pub fn new(
read_transport_factory: RTF,
input_protocol_factory: IPF,
write_transport_factory: WTF,
output_protocol_factory: OPF,
processor: PRC,
num_workers: usize,
) -> TServer<PRC, RTF, IPF, WTF, OPF> {
TServer {
r_trans_factory: read_transport_factory,
i_proto_factory: input_protocol_factory,
w_trans_factory: write_transport_factory,
o_proto_factory: output_protocol_factory,
processor: Arc::new(processor),
worker_pool: ThreadPool::with_name(
"Thrift service processor".to_owned(),
num_workers,
),
}
}
pub fn listen(&mut self, listen_address: &str) -> ::Result<()> {
let listener = TcpListener::bind(listen_address)?;
for stream in listener.incoming() {
match stream {
Ok(s) => {
let (i_prot, o_prot) = self.new_protocols_for_connection(s)?;
let processor = self.processor.clone();
self.worker_pool
.execute(move || handle_incoming_connection(processor, i_prot, o_prot),);
}
Err(e) => {
warn!("failed to accept remote connection with error {:?}", e);
}
}
}
Err(
::Error::Application(
ApplicationError {
kind: ApplicationErrorKind::Unknown,
message: "aborted listen loop".into(),
},
),
)
}
fn new_protocols_for_connection(
&mut self,
stream: TcpStream,
) -> ::Result<(Box<TInputProtocol + Send>, Box<TOutputProtocol + Send>)> {
let channel = TTcpChannel::with_stream(stream);
let (r_chan, w_chan) = channel.split()?;
let r_tran = self.r_trans_factory.create(Box::new(r_chan));
let i_prot = self.i_proto_factory.create(r_tran);
let w_tran = self.w_trans_factory.create(Box::new(w_chan));
let o_prot = self.o_proto_factory.create(w_tran);
Ok((i_prot, o_prot))
}
}
fn handle_incoming_connection<PRC>(
processor: Arc<PRC>,
i_prot: Box<TInputProtocol>,
o_prot: Box<TOutputProtocol>,
) where
PRC: TProcessor,
{
let mut i_prot = i_prot;
let mut o_prot = o_prot;
loop {
let r = processor.process(&mut *i_prot, &mut *o_prot);
if let Err(e) = r {
warn!("processor completed with error: {:?}", e);
break;
}
}
}