thrift_async/
lib.rs

1//! # thrift_async
2//!
3//! Half-asynchrounous, half-synchrounous implementation of an [Apache Thrift] server.
4//!
5//! This crate is fully compatible with the [`thrift`] crate.
6//!
7//! Example usage:
8//!
9//! ```rust
10//! use thrift_async::TAsyncServer;
11//!
12//! fn main() {
13//!     let processor = <some TProcessor>;
14//!
15//!     TAsyncServer::new(processor)
16//!         .listen("127.0.0.1:8080")
17//!         .unwrap() // panic on failure
18//! }
19//!
20//! ```
21//!
22//! [Apache Thrift]: https://thrift.apache.org/
23/// [`thrift`]: https://crates.io/crates/thrift
24
25mod transport;
26
27use std::net::SocketAddr;
28use std::sync::Arc;
29
30use failure::Error;
31use futures::try_ready;
32use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol};
33use thrift::server::TProcessor;
34use tokio::net::TcpListener;
35use tokio::prelude::*;
36
37use crate::transport::{ReadResult, TAReadFramedTransport, TAWriteFramedTransport};
38
39pub const DEFAULT_MAX_FRAME_SIZE: u32 = 256 * 1024 * 1024;
40pub const DEFAULT_CORE_READ_FRAME_SIZE: u32 = 2 * 1024 * 1024;
41pub const DEFAULT_CORE_WRITE_FRAME_SIZE: u32 = 2 * 1024 * 1024;
42pub const DEFAULT_CORE_RESIZE_FREQUENCY: u64 = 512;
43
44#[derive(Clone, Copy, Debug)]
45struct CoreResize {
46    read_size: usize,
47    write_size: usize,
48    frequency: u64,
49}
50
51struct TAsyncProcessor<R: AsyncRead, W: AsyncWrite, P: TProcessor> {
52    reader: R,
53    writer: W,
54    processor: Arc<P>,
55    max_frame_size: usize,
56    core_resize: CoreResize,
57}
58
59impl<R: AsyncRead, W: AsyncWrite, P: TProcessor> TAsyncProcessor<R, W, P> {
60    fn new(
61        reader: R,
62        writer: W,
63        processor: Arc<P>,
64        max_frame_size: usize,
65        core_resize: CoreResize,
66    ) -> Self {
67        TAsyncProcessor {
68            reader,
69            writer,
70            processor,
71            max_frame_size,
72            core_resize,
73        }
74    }
75}
76
77impl<R: AsyncRead, W: AsyncWrite, P: TProcessor> Future for TAsyncProcessor<R, W, P> {
78    type Item = ();
79    type Error = ();
80
81    fn poll(&mut self) -> Result<Async<<Self as Future>::Item>, <Self as Future>::Error> {
82        let max_frame_size = self.max_frame_size as usize;
83        let mut read_transport = TAReadFramedTransport::new(&mut self.reader, max_frame_size);
84
85        let mut write_transport = TAWriteFramedTransport::new(&mut self.writer);
86
87        let mut processed_count = 0u64;
88
89        loop {
90            match try_ready!(read_transport.poll()) {
91                ReadResult::Ok => (),
92                ReadResult::EOF => break, // EOF reached - client disconnected
93                ReadResult::TooLarge(size) => {
94                    // Frame is too large (non-thrift framed input?) - disconnect client
95                    eprintln!(
96                        "Frame size {} too large (maximum: {})",
97                        size, self.max_frame_size
98                    );
99                    break
100                },
101            }
102
103            let read_cursor = read_transport.frame_cursor();
104
105            {
106                // TODO: I want NLL :(
107                let write_cursor = write_transport.frame_cursor();
108
109                let mut input_protocol = TBinaryInputProtocol::new(read_cursor, false);
110                let mut output_protocol = TBinaryOutputProtocol::new(write_cursor, false);
111
112                let process = self
113                    .processor
114                    .process(&mut input_protocol, &mut output_protocol);
115
116                if let Err(e) = process {
117                    eprintln!("Error processing thrift input - {:?}", e);
118                }
119            }
120
121            try_ready!(write_transport.poll());
122
123            // resize core frame sizes
124            processed_count += 1;
125            if processed_count % self.core_resize.frequency == 0 {
126                read_transport.core_resize(self.core_resize.read_size);
127                write_transport.core_resize(self.core_resize.write_size);
128            }
129        }
130
131        Ok(Async::Ready(()))
132    }
133}
134
135#[derive(Debug, Clone)]
136pub struct TAsyncServer<P: TProcessor + Send + Sync + 'static> {
137    processor: Arc<P>,
138    max_frame_size: u32,
139    core_read_frame_size: u32,
140    core_write_frame_size: u32,
141    core_resize_frequency: u64,
142}
143
144impl<P: TProcessor + Send + Sync + 'static> TAsyncServer<P> {
145    /// Create a new almost-asynchronous server, from a synchronous request `TProcessor`.
146    ///
147    /// Input/Output transports **must** be framed. Input/Output protocol **must** be binary.
148    ///
149    /// The server accepts incoming connections, keeping two frame buffers: for reading and writing
150    /// to the connection socket. All read/write operations happen asynchronously (leveraging
151    /// [`tokio`]).
152    /// Once a frame is fully read, processing happen synchronously within tokio's runtime.
153    ///
154    /// **NOTE** this crate is compatible with code generation from the [`thrift`] crate (fully
155    /// synchronous), hence the almost-asynchronous (or half async, half sync)  model.
156    ///
157    /// [`tokio`]: https://tokio.rs
158    /// [`thrift`]: https://crates.io/crates/thrift
159    pub fn new(processor: P) -> TAsyncServer<P> {
160        TAsyncServer {
161            processor: Arc::new(processor),
162            max_frame_size: DEFAULT_MAX_FRAME_SIZE,
163            core_read_frame_size: DEFAULT_CORE_READ_FRAME_SIZE,
164            core_write_frame_size: DEFAULT_CORE_WRITE_FRAME_SIZE,
165            core_resize_frequency: DEFAULT_CORE_RESIZE_FREQUENCY,
166        }
167    }
168
169    /// The maximum read frame size allowed per client for this server.
170    ///
171    /// Non-framed messages can be interpreted as a huge frame size and can put a big hit on the
172    /// server memory footprint. Limiting the maximum frame size can prevent ill-formed data from
173    /// having too much effect.
174    ///
175    /// Default: 256 MB
176    pub fn max_frame_size(&mut self, max_frame_size: u32) -> &mut Self {
177        self.max_frame_size = max_frame_size;
178        self
179    }
180
181    /// The read frame size at which the server is happy to run with.
182    /// Frame buffer size can temporarily grow higher than this limit (but never higher than
183    /// `max_frame_size`), but the size will periodically be checked.
184    /// If the frame buffer size is higher than this limit during the check, the buffer will be
185    /// rebuilt, to reduce memory footprint.
186    ///
187    /// Default: 2 MB
188    pub fn core_read_frame_size(&mut self, core_read_frame_size: u32) -> &mut Self {
189        self.core_read_frame_size = core_read_frame_size;
190        self
191    }
192
193    /// The write frame size at which the server is happy to run with.
194    /// Frame buffer size can temporarily grow higher than this limit, but the size will
195    /// periodically be checked.
196    /// If the frame buffer size is higher than this limit during the check, the buffer will be
197    /// rebuilt, to reduce memory footprint.
198    ///
199    /// Default: 2 MB
200    pub fn core_write_frame_size(&mut self, core_write_frame_size: u32) -> &mut Self {
201        self.core_write_frame_size = core_write_frame_size;
202        self
203    }
204
205    /// The frequency at which frame buffers size will be checked against their core size.
206    ///
207    /// Frequency represents the number of frames processed per client connection.
208    ///
209    /// e.g: check every 512 received requests per client.
210    ///
211    /// Default: 512
212    pub fn core_resize_frequency(&mut self, core_resize_frequency: u64) -> &mut Self {
213        self.core_resize_frequency = core_resize_frequency;
214        self
215    }
216
217    /// Listen for incoming connections on `address`.
218    ///
219    /// `address` should be in the form `host:port`.
220    ///
221    /// e.g: `127.0.0.1:8080`.
222    ///
223    /// Returns an error if the address cannot be parsed, or cannot be bound.
224    pub fn listen(&mut self, address: &str) -> Result<(), Error> {
225        let address = address.parse::<SocketAddr>()?;
226
227        self.listen_address(address)
228    }
229
230    /// Listen for incoming connections on `address`.
231    ///
232    /// Returns an error if the address cannot not be bound.
233    pub fn listen_address(&mut self, address: SocketAddr) -> Result<(), Error> {
234        let socket = TcpListener::bind(&address)?;
235
236        let processor = self.processor.clone();
237        let max_frame_size = self.max_frame_size as usize;
238        let core_resize = CoreResize {
239            read_size: self.core_read_frame_size as usize,
240            write_size: self.core_write_frame_size as usize,
241            frequency: self.core_resize_frequency,
242        };
243
244        let server = socket
245            .incoming()
246            .map_err(|err| eprintln!("Socket Error: {:?}", err))
247            .for_each(move |socket| {
248                let (reader, writer) = socket.split();
249                let processor = processor.clone();
250                let async_processor =
251                    TAsyncProcessor::new(reader, writer, processor, max_frame_size, core_resize);
252
253                tokio::spawn(async_processor)
254            });
255
256        tokio::run(server);
257
258        Ok(())
259
260        /*
261        TODO: one day.
262        self.runtime
263            .spawn(server);
264        
265        self.runtime
266            .shutdown_on_idle()
267            .wait()
268            .unwrap();  // TODO: error
269        */
270    }
271}