//! # Description
//!
//! This crate aims to make it easier to reason about uni-directional and bi-directional nonblocking I/O.
//!
//! This is done using patterns that extend beyond dealing directly with raw bytes, the [`std::io::Read`] and [`std::io::Write`] traits,
//! and [`std::io::ErrorKind::WouldBlock`] errors. Since this crate's main focus is nonblocking I/O, all [`Session`] implementations provided
//! by this crate are non-blocking by default.
//!
//! # Sessions
//!
//! The core [`Session`] trait encapsulates controlling a single instance of a connection or logical session.
//! To differentiate from the [`std::io::Read`] and [`std::io::Write`] traits, which only deal with raw bytes, this
//! crate uses [`Publish`] and [`Receive`] terminology. These traits use associated types to handle any payload type.
//!
//! A [`Session`] impl typically also implements [`Publish`], [`Receive`], or both.
//! While the [`tcp`] module provides a [`Session`] implementation that exposes unframed non-blocking binary I/O operations,
//! other [`Session`] impls are able to provide significantly more functionality using the same non-blocking patterns.
//!
//! This crate will often use the term `Duplex` to distinguish a [`Session`] that is **both** [`Publish`] and [`Receive`].
//!
//! # Associated Types
//!
//! Sessions operate on implementation-specific [`Receive::ReceivePayload`] and [`Publish::PublishPayload`] types.
//! These types are able to utilize a lifetime `'a`, which is tied to the lifetime of the underlying [`Session`],
//! providing the ability for implementations to reference internal buffers or queues without copying.
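//!
//! One way such a lifetime-bound payload can be modeled is with a generic associated type. The sketch below is self-contained and
//! does not use this crate's traits: `ToyReceive`, `Payload`, and `Buffered` are hypothetical names, and the only assumption is
//! that the payload borrows from the session for as long as the caller holds it.
//!
//! ```rust
//! /// Hypothetical polling trait whose payload may borrow from the implementor.
//! trait ToyReceive {
//!     type Payload<'a>
//!     where
//!         Self: 'a;
//!
//!     /// Returns a payload tied to the borrow of `self`, or `None` if nothing is pending.
//!     fn receive<'a>(&'a mut self) -> Option<Self::Payload<'a>>;
//! }
//!
//! /// Hypothetical session that owns an internal buffer.
//! struct Buffered {
//!     buf: Vec<u8>,
//!     pending: bool,
//! }
//!
//! impl ToyReceive for Buffered {
//!     // The payload is a slice of the internal buffer, so nothing is copied.
//!     type Payload<'a> = &'a [u8];
//!
//!     fn receive<'a>(&'a mut self) -> Option<&'a [u8]> {
//!         if self.pending {
//!             self.pending = false;
//!             Some(&self.buf[..])
//!         } else {
//!             None
//!         }
//!     }
//! }
//!
//! fn main() {
//!     let mut session = Buffered { buf: b"abc".to_vec(), pending: true };
//!     assert_eq!(session.receive(), Some(&b"abc"[..]));
//!     assert_eq!(session.receive(), None);
//! }
//! ```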
//!
//! # Errors
//!
//! The philosophy of this crate is that an [`Err`] should always represent a transport or protocol-level error.
//! An [`Err`] should not be returned by a function as a condition that should be handled during **normal** branching logic.
//! As a result, instead of forcing you to handle [`std::io::ErrorKind::WouldBlock`] everywhere you deal with nonblocking code,
//! this crate will indicate partial receive/publish operations using [`ReceiveOutcome::Idle`], [`ReceiveOutcome::Active`],
//! and [`PublishOutcome::Incomplete`] as [`Result::Ok`].
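//!
//! As a rough illustration of that philosophy (a self-contained sketch, not this crate's implementation), the snippet below
//! reports [`std::io::ErrorKind::WouldBlock`] from a plain [`std::io::Read`] as an `Ok` outcome and leaves every other error
//! as an [`Err`]. The `Outcome` enum and the `poll_read` and `AlwaysWouldBlock` names are hypothetical stand-ins invented for
//! this example.
//!
//! ```rust
//! use std::io::{self, ErrorKind, Read};
//!
//! /// Hypothetical stand-in for an outcome type; not this crate's definition.
//! #[derive(Debug, PartialEq)]
//! enum Outcome {
//!     /// Some bytes were read into the buffer.
//!     Payload(usize),
//!     /// Nothing was available; the caller may retry later.
//!     Idle,
//! }
//!
//! /// Attempts one read, reporting `WouldBlock` as `Ok(Outcome::Idle)` instead of an `Err`.
//! fn poll_read<R: Read>(source: &mut R, buf: &mut [u8]) -> io::Result<Outcome> {
//!     match source.read(buf) {
//!         Ok(n) => Ok(Outcome::Payload(n)),
//!         Err(e) if e.kind() == ErrorKind::WouldBlock => Ok(Outcome::Idle),
//!         Err(e) => Err(e),
//!     }
//! }
//!
//! /// A reader that never has data ready, mimicking an idle nonblocking socket.
//! struct AlwaysWouldBlock;
//!
//! impl Read for AlwaysWouldBlock {
//!     fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {
//!         Err(ErrorKind::WouldBlock.into())
//!     }
//! }
//!
//! fn main() {
//!     let mut buf = [0u8; 8];
//!     // An idle source is normal control flow, not an error.
//!     assert_eq!(poll_read(&mut AlwaysWouldBlock, &mut buf).unwrap(), Outcome::Idle);
//!     // A byte slice implements `Read`, so it can stand in for a source with data.
//!     assert_eq!(poll_read(&mut &b"hi"[..], &mut buf).unwrap(), Outcome::Payload(2));
//! }
//! ```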
//!
//! # Features
//!
//! The [`Session`] impls in this crate are enabled by certain features.
//! By default, features that do not require a special build environment are enabled for rapid prototyping.
//! In a production codebase, you will likely want to pick and choose your required features.
//!
//! Feature list:
//! - `aeron`
//! - `crossbeam`
//! - `http`
//! - `mock`
//! - `mpsc`
//! - `tcp`
//! - `websocket`
//!
//! Features not enabled by default:
//! - `aeron`: requires `cmake` and `clang`.
//!
//! # Examples
//!
//! ## Streaming TCP
//!
//! The following example shows how to use streaming TCP to publish and receive a traditional stream of bytes.
//!
//! ```no_run
//! use nbio::{Publish, PublishOutcome, Receive, ReceiveOutcome, Session};
//! use nbio::tcp::TcpSession;
//!
//! // establish connection
//! let mut client = TcpSession::connect("192.168.123.456:54321", None, None).unwrap();
//!
//! // publish some bytes until completion
//! let mut pending_publish = "hello world!".as_bytes();
//! while let PublishOutcome::Incomplete(pending) = client.publish(pending_publish).unwrap() {
//!     pending_publish = pending;
//! }
//!
//! // print received bytes
//! loop {
//!     if let ReceiveOutcome::Payload(payload) = client.receive().unwrap() {
//!         println!("received: {payload:?}");
//!     }
//! }
//! ```
//!
//! ## Framing TCP
//!
//! The following example shows how to [`frame`] messages over TCP to publish and receive payloads framed with a preceding `u64` length field.
//! Notice how it is almost identical to the code above, except it guarantees that read slices are always identical to their corresponding write slices.
//!
//! ```no_run
//! use nbio::{Publish, PublishOutcome, Receive, ReceiveOutcome, Session};
//! use nbio::tcp::TcpSession;
//! use nbio::frame::{FrameDuplex, U64FrameDeserializer, U64FrameSerializer};
//!
//! // establish connection wrapped in a framing session
//! let client = TcpSession::connect("192.168.123.456:54321", None, None).unwrap();
//! let mut client = FrameDuplex::new(client, U64FrameDeserializer::new(), U64FrameSerializer::new(), 4096);
//!
//! // publish some bytes until completion
//! let mut pending_publish = "hello world!".as_bytes();
//! while let PublishOutcome::Incomplete(pending) = client.publish(pending_publish).unwrap() {
//!     pending_publish = pending;
//! }
//!
//! // print received bytes
//! loop {
//!     if let ReceiveOutcome::Payload(payload) = client.receive().unwrap() {
//!         println!("received: {payload:?}");
//!     }
//! }
//! ```
//!
//! ## HTTP Client
//!
//! The following example shows how to use the [`http`] module to drive an HTTP 1.x request/response using the same non-blocking model.
//! Notice how the primitives of driving a buffered write to completion and receiving a framed response are the same as for any other framed session.
//! In fact, the `conn` returned by `client.request(..)` is simply a [`frame::FrameDuplex`] that utilizes a [`http::Http1RequestSerializer`] and
//! [`http::Http1ResponseDeserializer`].
//!
//! ```no_run
//! use http::Request;
//! use nbio::{Receive, Session, ReceiveOutcome};
//! use nbio::http::HttpClient;
//! use tcp_stream::OwnedTLSConfig;
//!
//! // create the client and make the request
//! let mut client = HttpClient::new();
//! let mut conn = client
//!     .request(Request::get("http://icanhazip.com").body(()).unwrap())
//!     .unwrap();
//!
//! // read the conn until a full response is received
//! loop {
//!     if let ReceiveOutcome::Payload(r) = conn.receive().unwrap() {
//!         println!("Response Body: {}", String::from_utf8_lossy(r.body()));
//!         break;
//!     }
//! }
//! ```
//!
//! ## WebSocket
//!
//! The following example sends a message and then receives all subsequent messages from a websocket connection.
//! Just like the HTTP example, this simply encapsulates [`frame::FrameDuplex`] but utilizes a [`websocket::WebSocketFrameSerializer`]
//! and [`websocket::WebSocketFrameDeserializer`]. All TLS and WebSocket handshaking is taken care of during the
//! [`SessionStatus::Establishing`] [`Session::status`] workflow.
//!
//! ```no_run
//! use nbio::{Publish, PublishOutcome, Receive, Session, SessionStatus, ReceiveOutcome};
//! use nbio::websocket::{Message, WebSocketSession};
//!
//! // connect and drive the handshake
//! let mut session = WebSocketSession::connect("wss://echo.websocket.org/", None, None).unwrap();
//! while session.status() == SessionStatus::Establishing {
//!     session.drive().unwrap();
//! }
//!
//! // publish a message
//! let mut pending_publish = Message::Text("hello world!".into());
//! while let PublishOutcome::Incomplete(pending) = session.publish(pending_publish).unwrap() {
//!     pending_publish = pending;
//! }
//!
//! // receive messages
//! loop {
//!     if let ReceiveOutcome::Payload(r) = session.receive().unwrap() {
//!         println!("Received: {:?}", r);
//!         break;
//!     }
//! }
//! ```
pub extern crate crossbeam_channel;
pub extern crate http as hyperium_http;
pub extern crate tcp_stream;
pub extern crate tungstenite;
use ;
/// An instance of a connection or logical session, which may also support [`Receive`], [`Publish`], or dispatching received events to a [`Callback`]/[`CallbackRef`].
///
/// ## Connecting
///
/// Some implementations may not default to an established state, in which case immediate calls to `publish()` and `receive()` will fail.
/// The [`Session::status`] function provides the current status, which will not return `Established` until all required handshakes are complete.
/// When [`Session::status`] returns [`SessionStatus::Establishing`], you may drive the connection process via the [`Session::drive`] function.
///
/// ## Retrying
///
/// The [`Ok`] result of `publish(..)` and `receive(..)` operations may return [`ReceiveOutcome::Idle`], [`ReceiveOutcome::Active`], or [`PublishOutcome::Incomplete`].
/// These outcomes indicate that an operation may need to be retried. See [`ReceiveOutcome`] and [`PublishOutcome`] for more details.
///
/// ## Duty Cycles
///
/// The [`Session::drive`] operation is used to finish connecting, to service reading/writing buffered data, and to dispatch callbacks.
/// Most, but not all, [`Session`] implementations will require periodic calls to [`Session::drive`] in order to function.
/// Implementations that do not require calls to [`Session::drive`] will no-op when it is called.
///
/// [`Receive::receive`] and [`Publish::publish`] will implicitly drive, allowing users to receive or publish/retry in a tight loop without intermediately calling drive.
/// This means that all [`Publish`] and [`Receive`] implementations must call drive internally.
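///
/// The implicit-drive idea can be sketched with a toy type that is unrelated to this crate's implementations. `ToySession`,
/// its `wire` and `ready` queues, and the simplified `drive`/`receive` signatures are all hypothetical. The point is only that
/// `receive` services pending work itself, so a caller can loop on `receive` alone.
///
/// ```rust
/// use std::collections::VecDeque;
///
/// /// Hypothetical toy session; it only mirrors the names `drive` and `receive` for illustration.
/// struct ToySession {
///     /// Data that has "arrived" but has not been serviced yet.
///     wire: VecDeque<u8>,
///     /// Data made available by `drive` for `receive` to hand out.
///     ready: VecDeque<u8>,
/// }
///
/// impl ToySession {
///     /// Services one unit of pending work; returns `true` if any work was done.
///     fn drive(&mut self) -> bool {
///         match self.wire.pop_front() {
///             Some(byte) => {
///                 self.ready.push_back(byte);
///                 true
///             }
///             None => false,
///         }
///     }
///
///     /// Drives implicitly before polling, so callers never need to call `drive` in between.
///     fn receive(&mut self) -> Option<u8> {
///         self.drive();
///         self.ready.pop_front()
///     }
/// }
///
/// fn main() {
///     let mut session = ToySession {
///         wire: VecDeque::from(vec![1, 2, 3]),
///         ready: VecDeque::new(),
///     };
///     let mut received = Vec::new();
///     // A tight receive loop makes progress without any explicit drive call.
///     while let Some(byte) = session.receive() {
///         received.push(byte);
///     }
///     assert_eq!(received, vec![1, 2, 3]);
/// }
/// ```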
///
/// ## Publishing
///
/// Session impls that can publish data will implement [`Publish`].
///
/// ## Receiving
///
/// Session impls that can receive data via polling implement [`Receive`].
/// Impls that receive data via callbacks will accept a [`Callback`] or [`CallbackRef`] as input.
///
/// For cross-compatibility between the [`Receive`] and [`Callback`]/[`CallbackRef`] paradigms, see the [`callback`] module.
/// - [`callback::CallbackQueue`] impls [`Callback`]

/// Returned by the [`Session::status`] function, providing the current connection state.

/// Returned by the [`Session::drive`] function, providing the result of the drive operation.

/// A [`Session`] implementation that can receive payloads via polling.

/// Returned by the [`Receive::receive`] function, providing the outcome or information about the receive action.
///
/// The generic type `T` will match the corresponding [`Receive::ReceivePayload`].

/// A [`Session`] implementation that can publish payloads.

/// A [`Publish`] implementation that exposes a blocking flush operation.

/// Returned by the [`Publish::publish`] function, providing the outcome of the publish action.
///
/// The generic type `T` will match the corresponding [`Publish::PublishPayload`].

/// Used by push-oriented receivers to handle moved payloads as they are received.
///
/// See the [`compat`] module for [`Receive`] compatibility.

/// Used by push-oriented receivers to handle payload references as they are received.
///
/// See the [`compat`] module for [`Receive`] compatibility.