1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
//! ### What is Retty?
//! Retty is an asynchronous Rust networking framework that makes it easy to build protocols and application clients/servers.
//!
//! It's like [Netty](https://netty.io) or [Wangle](https://github.com/facebook/wangle), but in Rust.
//!
//! ### What is a Pipeline?
//! The fundamental abstraction of Retty is the [Pipeline](crate::channel::Pipeline).
//! It offers immense flexibility to customize how requests and responses are handled by your service.
//! Once you have fully understood this abstraction,
//! you will be able to write all sorts of sophisticated protocols and application clients/servers.
//!  
//! A [Pipeline](crate::channel::Pipeline) is a chain of request/response [handlers](crate::channel::Handler) that handle [inbound](crate::channel::InboundHandler) requests and
//! [outbound](crate::channel::OutboundHandler) responses. Once you chain handlers together, it provides an agile way to convert
//! a raw data stream into the desired message type and the inverse -- desired message type to raw data stream.
//! Pipeline implements an advanced form of the Intercepting Filter pattern to give a user full control
//! over how an event is handled and how the handlers in a pipeline interact with each other.
//!
//! A [Handler](crate::channel::Handler) should do one and only one function - just like the UNIX philosophy. If you have a handler that
//! is doing more than one function, then you should split it into individual handlers. This is really important for
//! maintainability and flexibility, as it's common to change your protocol for one reason or another.
//!
//! ### How does an event flow in a Pipeline?
//! ```text
//!                                                       | write()
//!   +---------------------------------------------------+---------------+
//!   |                             Pipeline              |               |
//!   |                                                  \|/              |
//!   |    +---------------------+            +-----------+----------+    |
//!   |    |  InboundHandler  N  |            |  OutboundHandler  N  |    |
//!   |    +----------+----------+            +-----------+----------+    |
//!   |              /|\          \                       |               |
//!   |               |            \........              |               |
//!   |               |                     \             |               |
//!   |               |                     _\|          \|/              |
//!   |    +----------+----------+            +-----------+----------+    |
//!   |    |  InboundHandler N-1 |            |  OutboundHandler N-1 |    |
//!   |    +----------+----------+            +-----------+----------+    |
//!   |              /|\          \                       |               |
//!   |               |            \                      |               |
//!   |               |           OutboundContext.fire_write()            |
//!   |               |                  \                |               |
//!   |               |                   \               |               |
//!   |   InboundContext.fire_read()       \              |               |
//!   |               |                     \             |               |
//!   |               |                     _\|          \|/              |
//!   |    +----------+----------+            +-----------+----------+    |
//!   |    |  InboundHandler  2  |            |  OutboundHandler  2  |    |
//!   |    +----------+----------+            +-----------+----------+    |
//!   |              /|\          \                       |               |
//!   |               |            \........              |               |
//!   |               |                     \             |               |
//!   |               |                     _\|          \|/              |
//!   |    +----------+----------+            +-----------+----------+    |
//!   |    |  InboundHandler  1  |            |  OutboundHandler  1  |    |
//!   |    +----------+----------+            +-----------+----------+    |
//!   |              /|\                                  |               |
//!   +---------------+-----------------------------------+---------------+
//!                   | read()                            |
//!                   |                                  \|/
//!   +---------------+-----------------------------------+---------------+
//!   |               |                                   |               |
//!   |  [ AsyncTransport.read() ]            [ AsyncTransport.write() ]  |
//!   |                                                                   |
//!   |        Internal I/O Threads (Transport Implementation)            |
//!   +-------------------------------------------------------------------+
//! ```
//!
//! ### Echo Server Example
//! Let's look at how to write an echo server.
//!
//! Here's the main piece of code in our echo server; it receives a string from the inbound direction of the pipeline,
//! prints it to stdout and sends it back in the outbound direction of the pipeline. It's really important to add the
//! line delimiter because our pipeline will use a line decoder.
//! ```ignore
//! struct EchoDecoder;
//! struct EchoEncoder;
//! struct EchoHandler {
//!     decoder: EchoDecoder,
//!     encoder: EchoEncoder,
//! }
//!
//! impl InboundHandler for EchoDecoder {
//!     type Rin = String;
//!     type Rout = Self::Rin;
//!
//!     fn read(
//!         &mut self,
//!         ctx: &InboundContext<Self::Rin, Self::Rout>,
//!         msg: Self::Rin,
//!     ) {
//!         println!("handling {}", msg);
//!         ctx.fire_write(format!("{}\r\n", msg));
//!     }
//! }
//!
//! impl OutboundHandler for EchoEncoder {
//!     type Win = String;
//!     type Wout = Self::Win;
//!
//!     fn write(
//!         &mut self,
//!         ctx: &OutboundContext<Self::Win, Self::Wout>,
//!         msg: Self::Win,
//!     ) {
//!         ctx.fire_write(msg);
//!     }
//! }
//!
//! impl Handler for EchoHandler {
//!     type Rin = String;
//!     type Rout = Self::Rin;
//!     type Win = String;
//!     type Wout = Self::Win;
//!
//!     fn name(&self) -> &str {
//!         "EchoHandler"
//!     }
//!
//!     fn split(
//!         self,
//!     ) -> (
//!         Box<dyn InboundHandler<Rin = Self::Rin, Rout = Self::Rout>>,
//!         Box<dyn OutboundHandler<Win = Self::Win, Wout = Self::Wout>>,
//!     ) {
//!         (Box::new(self.decoder), Box::new(self.encoder))
//!     }
//! }
//! ```
//!
//! This needs to be the final handler in the pipeline. Now the definition of the pipeline is needed to handle the requests and responses.
//! ```ignore
//! let mut bootstrap = BootstrapServerTcp::new();
//! bootstrap.pipeline(Box::new(move |writer: AsyncTransportWrite<TaggedBytesMut>| {
//!     let mut pipeline: Pipeline<TaggedBytesMut, TaggedString> = Pipeline::new();
//!
//!     let async_transport_handler = AsyncTransport::new(writer);
//!     let line_based_frame_decoder_handler = TaggedByteToMessageCodec::new(Box::new(
//!         LineBasedFrameDecoder::new(8192, true, TerminatorType::BOTH),
//!     ));
//!     let string_codec_handler = TaggedStringCodec::new();
//!     let echo_handler = EchoHandler::new();
//!
//!     pipeline.add_back(async_transport_handler);
//!     pipeline.add_back(line_based_frame_decoder_handler);
//!     pipeline.add_back(string_codec_handler);
//!     pipeline.add_back(echo_handler);
//!     pipeline.finalize()
//! }));
//! ```
//!
//! It is very important to be strict about the order of insertion, as handlers are arranged in the pipeline in the order they are added. The pipeline has 4 handlers:
//!
//! * [AsyncTransport](crate::transport::AsyncTransport)
//!     * Inbound: Reads a raw data stream from the socket and converts it into a zero-copy byte buffer.
//!     * Outbound: Writes the contents of a zero-copy byte buffer to the underlying socket.
//! * [TaggedByteToMessageCodec](crate::codec::byte_to_message_decoder::TaggedByteToMessageCodec)
//!     * Inbound: receives a zero-copy byte buffer and splits on line-endings
//!     * Outbound: just passes the byte buffer to AsyncTransport
//! * [TaggedStringCodec](crate::codec::string_codec::TaggedStringCodec)
//!     * Inbound: receives a byte buffer, decodes it into a `String` and passes it up to the EchoHandler.
//!     * Outbound: receives a `String`, encodes it into a byte buffer and passes it down to the TaggedByteToMessageCodec.
//! * EchoHandler
//!     * Inbound: receives a `String` and writes it to the pipeline, which will send the message outbound.
//!     * Outbound: receives a `String` and forwards it to TaggedStringCodec.
//!
//! Now all that needs to be done is to plug the pipeline factory into a [BootstrapServerTcp](crate::bootstrap::BootstrapTcpServer) and that’s pretty much it.
//! Bind a local host:port and wait for it to stop.
//!
//! ```ignore
//! bootstrap.bind(format!("{}:{}", host, port))?;
//!
//! println!("Press ctrl-c to stop");
//! tokio::select! {
//!     _ = tokio::signal::ctrl_c() => {
//!         bootstrap.graceful_stop().await;
//!     }
//! };
//! ```
//!
//! ### Echo Client Example
//! The code for the echo client is very similar to the Echo Server. Here is the main echo handler.
//! ```ignore
//! impl InboundHandler for EchoDecoder {
//!     type Rin = String;
//!     type Rout = Self::Rin;
//!
//!     fn read(
//!         &mut self,
//!         _ctx: &InboundContext<Self::Rin, Self::Rout>,
//!         msg: Self::Rin,
//!     ) {
//!         println!("received back: {}", msg);
//!     }
//!     fn read_exception(
//!         &mut self,
//!         ctx: &InboundContext<Self::Rin, Self::Rout>,
//!         err: Box<dyn Error + Send + Sync>,
//!     ) {
//!         println!("received exception: {}", err);
//!         ctx.fire_close();
//!     }
//!     fn read_eof(&mut self, ctx: &InboundContext<Self::Rin, Self::Rout>) {
//!         println!("EOF received :(");
//!         ctx.fire_close();
//!     }
//! }
//!
//! impl OutboundHandler for EchoEncoder {
//!     type Win = String;
//!     type Wout = Self::Win;
//!
//!     fn write(
//!         &mut self,
//!         ctx: &OutboundContext<Self::Win, Self::Wout>,
//!         msg: Self::Win,
//!     ) {
//!         ctx.fire_write(msg);
//!     }
//! }
//!
//! impl Handler for EchoHandler {
//!     type Rin = String;
//!     type Rout = Self::Rin;
//!     type Win = String;
//!     type Wout = Self::Win;
//!
//!     fn name(&self) -> &str {
//!         "EchoHandler"
//!     }
//!
//!     fn split(
//!         self,
//!     ) -> (
//!         Box<dyn InboundHandler<Rin = Self::Rin, Rout = Self::Rout>>,
//!         Box<dyn OutboundHandler<Win = Self::Win, Wout = Self::Wout>>,
//!     ) {
//!         (Box::new(self.decoder), Box::new(self.encoder))
//!     }
//! }
//! ```
//!
//! Notice that we override other methods—read_exception and read_eof.
//! There are a few other methods that can be overridden. If you need to handle a particular event,
//! just override the corresponding method.
//!
//! Now onto the client’s pipeline factory. It is identical to the server’s pipeline factory, which
//! handles writing data.
//! ```ignore
//! let mut bootstrap = BootstrapClientTcp::new();
//! bootstrap.pipeline(Box::new( move |writer: AsyncTransportWrite<TaggedBytesMut>| {
//!     let mut pipeline: Pipeline<TaggedBytesMut, TaggedString> = Pipeline::new();
//!
//!     let async_transport_handler = AsyncTransport::new(writer);
//!     let line_based_frame_decoder_handler = TaggedByteToMessageCodec::new(Box::new(
//!         LineBasedFrameDecoder::new(8192, true, TerminatorType::BOTH),
//!     ));
//!     let string_codec_handler = TaggedStringCodec::new();
//!     let echo_handler = EchoHandler::new();
//!
//!     pipeline.add_back(async_transport_handler);
//!     pipeline.add_back(line_based_frame_decoder_handler);
//!     pipeline.add_back(string_codec_handler);
//!     pipeline.add_back(echo_handler);
//!     pipeline.finalize()
//! }));
//! ```
//!
//! Now all that needs to be done is to plug the pipeline factory into a BootstrapTcpClient and that’s pretty much it.
//! Connect to the remote peer, then read lines from stdin and write them to the pipeline.
//! ```ignore
//! let pipeline = bootstrap.connect(format!("{}:{}", host, port)).await?;
//!
//! println!("Enter bye to stop");
//! let mut buffer = String::new();
//! while tokio::io::stdin().read_line(&mut buffer).await.is_ok() {
//!     match buffer.trim_end() {
//!         "" => break,
//!         line => {
//!             pipeline.write(format!("{}\r\n", line));
//!             if line == "bye" {
//!                 pipeline.close();
//!                 break;
//!             }
//!         }
//!     };
//!     buffer.clear();
//! }
//!
//! bootstrap.graceful_stop().await;
//! ```
// Logo that rustdoc displays in the generated documentation for this crate.
#![doc(html_logo_url = "https://raw.githubusercontent.com/retty-io/retty/master/docs/retty.io.jpg")]
// Crate-wide lint levels: warn on Rust 2018 idiom violations.
#![warn(rust_2018_idioms)]
// NOTE(review): this silences unused-item warnings for the whole crate; the reason is not
// visible from this file — consider documenting it or narrowing the allowance.
#![allow(dead_code)]
// Warn when a public item has no documentation.
#![warn(missing_docs)]

// Public modules of the crate. The notes below list only what the crate-level docs in this
// file reference from each module; the modules themselves are defined in other files.

// Referenced by the crate docs as the location of the TCP bootstrap type used to bind a server.
pub mod bootstrap;
// Referenced by the crate docs for `Pipeline`, `Handler`, `InboundHandler` and `OutboundHandler`.
pub mod channel;
// Referenced by the crate docs for `byte_to_message_decoder::TaggedByteToMessageCodec` and
// `string_codec::TaggedStringCodec`.
pub mod codec;
// NOTE(review): not referenced by the crate-level docs; presumably task/runtime execution
// support — confirm against the module itself.
pub mod executor;
// Referenced by the crate docs for `AsyncTransport`.
pub mod transport;