1mod transport;
26
27use std::net::SocketAddr;
28use std::sync::Arc;
29
30use failure::Error;
31use futures::try_ready;
32use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol};
33use thrift::server::TProcessor;
34use tokio::net::TcpListener;
35use tokio::prelude::*;
36
37use crate::transport::{ReadResult, TAReadFramedTransport, TAWriteFramedTransport};
38
/// Default upper bound on an incoming frame: 256 * 2^20 (presumably bytes).
///
/// Used as the initial `max_frame_size` of a newly built `TAsyncServer`.
pub const DEFAULT_MAX_FRAME_SIZE: u32 = 256 << 20;
/// Default size passed to the read transport's `core_resize`: 2 * 2^20.
pub const DEFAULT_CORE_READ_FRAME_SIZE: u32 = 2 << 20;
/// Default size passed to the write transport's `core_resize`: 2 * 2^20.
pub const DEFAULT_CORE_WRITE_FRAME_SIZE: u32 = 2 << 20;
/// Default number of processed frames between two `core_resize` calls.
pub const DEFAULT_CORE_RESIZE_FREQUENCY: u64 = 512;
43
/// Settings for the periodic `core_resize` calls made on the framed
/// transports of a connection.
#[derive(Debug, Clone, Copy)]
struct CoreResize {
    /// Size passed to the read transport's `core_resize`.
    read_size: usize,
    /// Size passed to the write transport's `core_resize`.
    write_size: usize,
    /// Number of processed frames between two resize calls.
    frequency: u64,
}
50
51struct TAsyncProcessor<R: AsyncRead, W: AsyncWrite, P: TProcessor> {
52 reader: R,
53 writer: W,
54 processor: Arc<P>,
55 max_frame_size: usize,
56 core_resize: CoreResize,
57}
58
59impl<R: AsyncRead, W: AsyncWrite, P: TProcessor> TAsyncProcessor<R, W, P> {
60 fn new(
61 reader: R,
62 writer: W,
63 processor: Arc<P>,
64 max_frame_size: usize,
65 core_resize: CoreResize,
66 ) -> Self {
67 TAsyncProcessor {
68 reader,
69 writer,
70 processor,
71 max_frame_size,
72 core_resize,
73 }
74 }
75}
76
77impl<R: AsyncRead, W: AsyncWrite, P: TProcessor> Future for TAsyncProcessor<R, W, P> {
78 type Item = ();
79 type Error = ();
80
81 fn poll(&mut self) -> Result<Async<<Self as Future>::Item>, <Self as Future>::Error> {
82 let max_frame_size = self.max_frame_size as usize;
83 let mut read_transport = TAReadFramedTransport::new(&mut self.reader, max_frame_size);
84
85 let mut write_transport = TAWriteFramedTransport::new(&mut self.writer);
86
87 let mut processed_count = 0u64;
88
89 loop {
90 match try_ready!(read_transport.poll()) {
91 ReadResult::Ok => (),
92 ReadResult::EOF => break, ReadResult::TooLarge(size) => {
94 eprintln!(
96 "Frame size {} too large (maximum: {})",
97 size, self.max_frame_size
98 );
99 break
100 },
101 }
102
103 let read_cursor = read_transport.frame_cursor();
104
105 {
106 let write_cursor = write_transport.frame_cursor();
108
109 let mut input_protocol = TBinaryInputProtocol::new(read_cursor, false);
110 let mut output_protocol = TBinaryOutputProtocol::new(write_cursor, false);
111
112 let process = self
113 .processor
114 .process(&mut input_protocol, &mut output_protocol);
115
116 if let Err(e) = process {
117 eprintln!("Error processing thrift input - {:?}", e);
118 }
119 }
120
121 try_ready!(write_transport.poll());
122
123 processed_count += 1;
125 if processed_count % self.core_resize.frequency == 0 {
126 read_transport.core_resize(self.core_resize.read_size);
127 write_transport.core_resize(self.core_resize.write_size);
128 }
129 }
130
131 Ok(Async::Ready(()))
132 }
133}
134
135#[derive(Debug, Clone)]
136pub struct TAsyncServer<P: TProcessor + Send + Sync + 'static> {
137 processor: Arc<P>,
138 max_frame_size: u32,
139 core_read_frame_size: u32,
140 core_write_frame_size: u32,
141 core_resize_frequency: u64,
142}
143
144impl<P: TProcessor + Send + Sync + 'static> TAsyncServer<P> {
145 pub fn new(processor: P) -> TAsyncServer<P> {
160 TAsyncServer {
161 processor: Arc::new(processor),
162 max_frame_size: DEFAULT_MAX_FRAME_SIZE,
163 core_read_frame_size: DEFAULT_CORE_READ_FRAME_SIZE,
164 core_write_frame_size: DEFAULT_CORE_WRITE_FRAME_SIZE,
165 core_resize_frequency: DEFAULT_CORE_RESIZE_FREQUENCY,
166 }
167 }
168
169 pub fn max_frame_size(&mut self, max_frame_size: u32) -> &mut Self {
177 self.max_frame_size = max_frame_size;
178 self
179 }
180
181 pub fn core_read_frame_size(&mut self, core_read_frame_size: u32) -> &mut Self {
189 self.core_read_frame_size = core_read_frame_size;
190 self
191 }
192
193 pub fn core_write_frame_size(&mut self, core_write_frame_size: u32) -> &mut Self {
201 self.core_write_frame_size = core_write_frame_size;
202 self
203 }
204
205 pub fn core_resize_frequency(&mut self, core_resize_frequency: u64) -> &mut Self {
213 self.core_resize_frequency = core_resize_frequency;
214 self
215 }
216
217 pub fn listen(&mut self, address: &str) -> Result<(), Error> {
225 let address = address.parse::<SocketAddr>()?;
226
227 self.listen_address(address)
228 }
229
230 pub fn listen_address(&mut self, address: SocketAddr) -> Result<(), Error> {
234 let socket = TcpListener::bind(&address)?;
235
236 let processor = self.processor.clone();
237 let max_frame_size = self.max_frame_size as usize;
238 let core_resize = CoreResize {
239 read_size: self.core_read_frame_size as usize,
240 write_size: self.core_write_frame_size as usize,
241 frequency: self.core_resize_frequency,
242 };
243
244 let server = socket
245 .incoming()
246 .map_err(|err| eprintln!("Socket Error: {:?}", err))
247 .for_each(move |socket| {
248 let (reader, writer) = socket.split();
249 let processor = processor.clone();
250 let async_processor =
251 TAsyncProcessor::new(reader, writer, processor, max_frame_size, core_resize);
252
253 tokio::spawn(async_processor)
254 });
255
256 tokio::run(server);
257
258 Ok(())
259
260 }
271}