// nbio/lib.rs
//! # Description
//!
//! This crate aims to make it easier to reason about uni-directional and bi-directional non-blocking I/O.
//!
//! This is done using patterns that extend beyond dealing directly with raw bytes, the [`std::io::Read`] and [`std::io::Write`] traits,
//! and [`std::io::ErrorKind::WouldBlock`] errors. Since this crate's main focus is non-blocking I/O, all [`Session`] implementations provided
//! by this crate are non-blocking by default.
//!
//! # Sessions
//!
//! The core [`Session`] trait encapsulates controlling a single instance of a connection or logical session.
//! To differentiate from the [`std::io::Read`] and [`std::io::Write`] traits that only deal with raw bytes, this
//! crate uses [`Publish`] and [`Receive`] terminology, which utilize associated types to handle any payload type.
//!
//! A [`Session`] impl is typically also either [`Publish`], [`Receive`], or both.
//! While the [`tcp`] module provides a [`Session`] implementation that provides unframed non-blocking binary I/O operations,
//! other [`Session`] impls are able to provide significantly more functionality using the same non-blocking patterns.
//!
//! This crate will often use the term `Duplex` to distinguish a [`Session`] that is **both** [`Publish`] and [`Receive`].
//!
//! # Associated Types
//!
//! Sessions operate on implementation-specific [`Receive::ReceivePayload`] and [`Publish::PublishPayload`] types.
//! These types are able to utilize a lifetime `'a`, which is tied to the lifetime of the underlying [`Session`],
//! providing the ability for implementations to reference internal buffers or queues without copying.
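//!
//! As a minimal sketch of that borrowing pattern, the following uses a hypothetical `Polled` trait and `BufferedSource` type
//! (stand-ins defined only for this illustration, not items from this crate) to show how a lifetime-generic associated type
//! lets a payload borrow from the session's own buffer:
//!
//! ```rust
//! /// Hypothetical stand-in for a receive-style trait with a lifetime-generic payload.
//! trait Polled {
//!     type Payload<'a>
//!     where
//!         Self: 'a;
//!
//!     fn poll<'a>(&'a mut self) -> Option<Self::Payload<'a>>;
//! }
//!
//! /// Hypothetical source that owns a buffer and hands out borrowed slices of it.
//! struct BufferedSource {
//!     buffer: Vec<u8>,
//!     consumed: bool,
//! }
//!
//! impl Polled for BufferedSource {
//!     // The payload borrows from `self`, so no bytes are copied.
//!     type Payload<'a> = &'a [u8];
//!
//!     fn poll<'a>(&'a mut self) -> Option<&'a [u8]> {
//!         if self.consumed {
//!             None
//!         } else {
//!             self.consumed = true;
//!             Some(self.buffer.as_slice())
//!         }
//!     }
//! }
//! ```
//!
//! Because the returned slice borrows the source mutably, the borrow checker prevents a second `poll` until the first
//! payload is no longer in use, which is what makes handing out internal buffers safe.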
//!
//! # Errors
//!
//! The philosophy of this crate is that an [`Err`] should always represent a transport or protocol-level error.
//! An [`Err`] should not be returned by a function as a condition that should be handled during **normal** branching logic.
//! As a result, instead of forcing you to handle [`std::io::ErrorKind::WouldBlock`] everywhere you deal with non-blocking code,
//! this crate will indicate partial receive/publish operations using [`ReceiveOutcome::Idle`], [`ReceiveOutcome::Active`],
//! and [`PublishOutcome::Incomplete`] as [`Result::Ok`].
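//!
//! The idea can be sketched with plain `std::io` types. The `ReadOutcome` enum and `try_read` helper below are hypothetical
//! names used only for this illustration (they are not part of this crate): a `WouldBlock` error is folded into an `Ok` value,
//! while every other error is still returned as an `Err`.
//!
//! ```rust
//! use std::io::{Error, ErrorKind, Read};
//!
//! /// Hypothetical outcome type: "nothing to read yet" is a normal result, not an error.
//! #[derive(Debug, PartialEq, Eq)]
//! enum ReadOutcome {
//!     /// This many bytes were read into the buffer.
//!     Read(usize),
//!     /// The source had no data available right now.
//!     Idle,
//! }
//!
//! /// Read once, translating `WouldBlock` into `Ok(ReadOutcome::Idle)`.
//! fn try_read<R: Read>(source: &mut R, buf: &mut [u8]) -> Result<ReadOutcome, Error> {
//!     match source.read(buf) {
//!         Ok(n) => Ok(ReadOutcome::Read(n)),
//!         Err(err) if err.kind() == ErrorKind::WouldBlock => Ok(ReadOutcome::Idle),
//!         Err(err) => Err(err),
//!     }
//! }
//! ```
//!
//! Callers of such a helper can branch on the outcome with an ordinary `match` and reserve `?` for genuine failures.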
//!
//! # Features
//!
//! The [`Session`] impls in this crate are enabled by certain features.
//! By default, features that do not require a special build environment are enabled for rapid prototyping.
//! In a production codebase, you will likely want to pick and choose your required features.
//!
//! Feature list:
//! - `aeron`
//! - `crossbeam`
//! - `http`
//! - `mock`
//! - `mpsc`
//! - `tcp`
//! - `websocket`
//!
//! Features not enabled by default:
//! - `aeron`: requires `cmake` and `clang`.
//!
//! # Examples
//!
//! ## Streaming TCP
//!
//! The following example shows how to use streaming TCP to publish and receive a traditional stream of bytes.
//!
//! ```no_run
//! use nbio::{Publish, PublishOutcome, Receive, ReceiveOutcome, Session};
//! use nbio::tcp::TcpSession;
//!
//! // establish connection
//! let mut client = TcpSession::connect("192.168.123.45:54321", None, None).unwrap();
//!
//! // publish some bytes until completion
//! let mut pending_publish = "hello world!".as_bytes();
//! while let PublishOutcome::Incomplete(pending) = client.publish(pending_publish).unwrap() {
//!     pending_publish = pending;
//! }
//!
//! // print received bytes
//! loop {
//!     if let ReceiveOutcome::Payload(payload) = client.receive().unwrap() {
//!         println!("received: {payload:?}");
//!     }
//! }
//! ```
//!
//! ## Framing TCP
//!
//! The following example shows how to [`frame`] messages over TCP to publish and receive payloads framed with a preceding `u64` length field.
//! Notice how it is almost identical to the code above, except it guarantees that read slices are always identical to their corresponding write slices.
//!
//! ```no_run
//! use nbio::{Publish, PublishOutcome, Receive, ReceiveOutcome, Session};
//! use nbio::tcp::TcpSession;
//! use nbio::frame::{FrameDuplex, U64FrameDeserializer, U64FrameSerializer};
//!
//! // establish connection wrapped in a framing session
//! let client = TcpSession::connect("192.168.123.45:54321", None, None).unwrap();
//! let mut client = FrameDuplex::new(client, U64FrameDeserializer::new(), U64FrameSerializer::new(), 4096);
//!
//! // publish some bytes until completion
//! let mut pending_publish = "hello world!".as_bytes();
//! while let PublishOutcome::Incomplete(pending) = client.publish(pending_publish).unwrap() {
//!     pending_publish = pending;
//! }
//!
//! // print received frames
//! loop {
//!     if let ReceiveOutcome::Payload(payload) = client.receive().unwrap() {
//!         println!("received: {payload:?}");
//!     }
//! }
//! ```
//!
//! ## HTTP Client
//!
//! The following example shows how to use the [`http`] module to drive an HTTP 1.x request/response using the same non-blocking model.
//! Notice how the primitives of driving a buffered write to completion and receiving a framed response are the same as for any other framed session.
//! In fact, the `conn` returned by `client.request(..)` is simply a [`frame::FrameDuplex`] that utilizes a [`http::Http1RequestSerializer`] and
//! [`http::Http1ResponseDeserializer`].
//!
//! ```no_run
//! use http::Request;
//! use nbio::{Receive, Session, ReceiveOutcome};
//! use nbio::http::HttpClient;
//!
//! // create the client and make the request
//! let mut client = HttpClient::new();
//! let mut conn = client
//!     .request(Request::get("http://icanhazip.com").body(()).unwrap())
//!     .unwrap();
//!
//! // read the conn until a full response is received
//! loop {
//!     if let ReceiveOutcome::Payload(r) = conn.receive().unwrap() {
//!         println!("Response Body: {}", String::from_utf8_lossy(r.body()));
//!         break;
//!     }
//! }
//! ```
//!
//! ## WebSocket
//!
//! The following example sends a message and then prints the first message received from a websocket connection.
//! Just like the HTTP example, this simply encapsulates [`frame::FrameDuplex`] but utilizes a [`websocket::WebSocketFrameSerializer`]
//! and [`websocket::WebSocketFrameDeserializer`]. All TLS and WebSocket handshaking is taken care of while
//! [`Session::status`] reports [`SessionStatus::Establishing`].
//!
//! ```no_run
//! use nbio::{Publish, PublishOutcome, Receive, Session, SessionStatus, ReceiveOutcome};
//! use nbio::websocket::{Message, WebSocketSession};
//!
//! // connect and drive the handshake
//! let mut session = WebSocketSession::connect("wss://echo.websocket.org/", None, None).unwrap();
//! while session.status() == SessionStatus::Establishing {
//!     session.drive().unwrap();
//! }
//!
//! // publish a message
//! let mut pending_publish = Message::Text("hello world!".into());
//! while let PublishOutcome::Incomplete(pending) = session.publish(pending_publish).unwrap() {
//!     pending_publish = pending;
//! }
//!
//! // receive the first message
//! loop {
//!     if let ReceiveOutcome::Payload(r) = session.receive().unwrap() {
//!         println!("Received: {:?}", r);
//!         break;
//!     }
//! }
//! ```

#[cfg(feature = "crossbeam")]
pub extern crate crossbeam_channel;
#[cfg(feature = "http")]
pub extern crate http as hyperium_http;
#[cfg(feature = "tcp")]
pub extern crate tcp_stream;
#[cfg(feature = "websocket")]
pub extern crate tungstenite;

#[cfg(feature = "aeron")]
pub mod aeron;
pub mod buffer;
pub mod compat;
#[cfg(feature = "crossbeam")]
pub mod crossbeam;
pub mod dns;
pub mod frame;
#[cfg(feature = "http")]
pub mod http;
pub mod liveness;
#[cfg(feature = "mock")]
pub mod mock;
#[cfg(feature = "mpsc")]
pub mod mpsc;
#[cfg(feature = "tcp")]
pub mod tcp;
pub mod tls;
#[cfg(feature = "websocket")]
pub mod websocket;

use std::{fmt::Debug, io::Error};

/// An instance of a connection or logical session, which may also support [`Receive`], [`Publish`], or dispatching received events to a [`Callback`]/[`CallbackRef`].
///
/// ## Connecting
///
/// Some implementations may not default to an established state, in which case immediate calls to `publish()` and `receive()` will fail.
/// The [`Session::status`] function provides the current status, which will not return `Established` until all required handshakes are complete.
/// When [`Session::status`] returns [`SessionStatus::Establishing`], you may drive the connection process via the [`Session::drive`] function.
///
/// ## Retrying
///
/// The [`Ok`] result of `publish(..)` and `receive(..)` operations may be [`ReceiveOutcome::Idle`], [`ReceiveOutcome::Active`], or [`PublishOutcome::Incomplete`].
/// These outcomes indicate that an operation may need to be retried. See [`ReceiveOutcome`] and [`PublishOutcome`] for more details.
///
/// ## Duty Cycles
///
/// The [`Session::drive`] operation is used to finish connecting, to service reads and writes of buffered data, and to dispatch callbacks.
/// Most, but not all, [`Session`] implementations will require periodic calls to [`Session::drive`] in order to function.
/// Implementations that do not require calls to [`Session::drive`] will no-op when it is called.
///
/// [`Receive::receive`] and [`Publish::publish`] will implicitly drive, allowing users to receive or publish/retry in a tight loop without calling drive in between.
/// This means that all [`Publish`] and [`Receive`] implementations must call drive internally.
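///
/// A minimal sketch of such a duty cycle is shown below. The `Outcome` enum and `drive_until_idle` helper are hypothetical
/// stand-ins written for this illustration so that it compiles on its own; with a real session the closure would call
/// [`Session::drive`], and the caller would typically yield or back off once idle is reported.
///
/// ```rust
/// use std::io::Error;
///
/// /// Hypothetical mirror of a drive result: work was done, or nothing was done.
/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// enum Outcome {
///     Active,
///     Idle,
/// }
///
/// /// Call `drive` repeatedly while it reports work, returning the number of active cycles.
/// /// Any error from `drive` is propagated to the caller.
/// fn drive_until_idle<F>(mut drive: F) -> Result<usize, Error>
/// where
///     F: FnMut() -> Result<Outcome, Error>,
/// {
///     let mut active_cycles = 0;
///     while drive()? == Outcome::Active {
///         active_cycles += 1;
///     }
///     Ok(active_cycles)
/// }
/// ```
///
/// Returning the cycle count is only one possible design; a scheduler might instead sleep or spin briefly each time idle is observed.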
///
/// ## Publishing
///
/// Session impls that can publish data will implement [`Publish`].
///
/// ## Receiving
///
/// Session impls that can receive data via polling implement [`Receive`].
/// Impls that receive data via callbacks will accept a [`Callback`] or [`CallbackRef`] as input.
///
/// For cross-compatibility between the [`Receive`] and [`Callback`]/[`CallbackRef`] paradigms, see the [`compat`] module.
/// - [`compat::CallbackQueue`] impls [`Callback`]
pub trait Session: Debug {
    /// Check the current session status.
    ///
    /// If this returns [`SessionStatus::Establishing`], use [`Session::drive`] to progress the connection process.
    fn status(&self) -> SessionStatus;

    /// Some implementations will internally buffer payloads or require a duty cycle to drive callbacks.
    /// Those implementations will require `drive(..)` to be called continuously to completely publish and/or receive data.
    /// This function will return [`DriveOutcome::Active`] if work was done, indicating to any scheduler that more work may be pending.
    /// Only when this function returns [`DriveOutcome::Idle`] should a scheduler treat yielding or idling as appropriate.
    fn drive(&mut self) -> Result<DriveOutcome, Error>;
}

/// Returned by the [`Session::status`] function, providing the current connection state.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SessionStatus {
    /// Session is attempting to connect, handshake, or otherwise establish, and will move to `Established` or `Terminated` as [`Session::drive`] is called.
    Establishing,
    /// Session is currently established, and will move to `Terminated` when an unrecoverable error is encountered.
    Established,
    /// Session terminal state; the connection has been closed.
    Terminated,
}

/// Returned by the [`Session::drive`] function, providing the result of the drive operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DriveOutcome {
    /// The drive operation resulted in work being done, which means the user should attempt to call [`Session::drive`] again as soon as possible.
    Active,
    /// The drive operation did not result in any work being done, which means the user may decide to yield or back off.
    Idle,
}

/// A [`Session`] implementation that can receive payloads via polling.
pub trait Receive: Session {
    /// The type returned by the `receive(..)` function.
    type ReceivePayload<'a>
    where
        Self: 'a;

    /// Attempt to receive a `payload` from the session.
    /// This will return [`ReceiveOutcome::Payload`] when data has been received.
    /// [`ReceiveOutcome::Active`] can be used to report that work was completed, but data is not ready.
    /// This means that only [`ReceiveOutcome::Idle`] should be used to indicate to a scheduler that yielding or idling is appropriate.
    fn receive<'a>(&'a mut self) -> Result<ReceiveOutcome<Self::ReceivePayload<'a>>, Error>;
}

/// Returned by the [`Receive::receive`] function, providing the outcome or information about the receive action.
///
/// The generic type `T` will match the corresponding [`Receive::ReceivePayload`].
pub enum ReceiveOutcome<T> {
    /// Contains the payload received from the [`Receive::receive`] action.
    Payload(T),

    /// Data was buffered. This means a partial payload was received, but could not be returned as a complete `Payload`.
    Active,

    /// No work was done. This is useful to signal to a scheduler or idle strategy that it may be time to yield.
    Idle,
}
impl<T: Debug> Debug for ReceiveOutcome<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ReceiveOutcome::Payload(x) => write!(f, "ReceiveOutcome::Payload({x:?})"),
            ReceiveOutcome::Active => f.write_str("ReceiveOutcome::Active"),
            ReceiveOutcome::Idle => f.write_str("ReceiveOutcome::Idle"),
        }
    }
}
impl<T: Clone> Clone for ReceiveOutcome<T> {
    fn clone(&self) -> Self {
        match self {
            ReceiveOutcome::Payload(x) => ReceiveOutcome::Payload(x.clone()),
            ReceiveOutcome::Active => ReceiveOutcome::Active,
            ReceiveOutcome::Idle => ReceiveOutcome::Idle,
        }
    }
}

/// A [`Session`] implementation that can publish payloads.
pub trait Publish: Session {
    /// The type given to the `publish(..)` function.
    type PublishPayload<'a>
    where
        Self: 'a;

    /// Write the given `payload` to the session.
    ///
    /// This will return [`PublishOutcome::Incomplete`] if the publish is not immediately completed fully,
    /// in which case the `T` of `Incomplete(T)` must be retried.
    ///
    /// Note that some implementations may partially complete a publish, so you must
    /// re-attempt the data encapsulated by `Incomplete`, not the data originally passed into the function.
    /// This guidance can only be ignored when you are not writing generic code and you know that your
    /// [`Publish`] impl is "all-or-none".
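    ///
    /// The retry pattern can be sketched as follows. `ChunkedSink`, `Outcome`, and `publish_all` are hypothetical names
    /// defined only for this illustration (not items from this crate); the sink accepts a few bytes per call so that the
    /// partial-publish case is exercised.
    ///
    /// ```rust
    /// /// Hypothetical outcome: either everything was written, or a remainder must be retried.
    /// enum Outcome<'a> {
    ///     Published,
    ///     Incomplete(&'a [u8]),
    /// }
    ///
    /// /// Hypothetical sink that accepts at most `chunk` bytes per call (`chunk` is assumed to be non-zero).
    /// struct ChunkedSink {
    ///     written: Vec<u8>,
    ///     chunk: usize,
    /// }
    ///
    /// impl ChunkedSink {
    ///     fn publish<'a>(&mut self, payload: &'a [u8]) -> Outcome<'a> {
    ///         let n = payload.len().min(self.chunk);
    ///         self.written.extend_from_slice(&payload[..n]);
    ///         if n == payload.len() {
    ///             Outcome::Published
    ///         } else {
    ///             // Only the unwritten tail is handed back for the next attempt.
    ///             Outcome::Incomplete(&payload[n..])
    ///         }
    ///     }
    /// }
    ///
    /// /// Retry with the returned remainder, never the original payload, until fully published.
    /// fn publish_all(sink: &mut ChunkedSink, payload: &[u8]) {
    ///     let mut pending = payload;
    ///     while let Outcome::Incomplete(rest) = sink.publish(pending) {
    ///         pending = rest;
    ///     }
    /// }
    /// ```
    ///
    /// Retrying with the original payload against a sink like this one would duplicate the bytes that were already accepted.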
    fn publish<'a>(
        &mut self,
        payload: Self::PublishPayload<'a>,
    ) -> Result<PublishOutcome<Self::PublishPayload<'a>>, Error>;
}

/// A [`Publish`] implementation that exposes a blocking flush operation.
pub trait Flush: Publish {
    /// Flush all pending publish data, blocking until completion.
    fn flush(&mut self) -> Result<(), Error>;
}

/// Returned by the [`Publish::publish`] function, providing the outcome of the publish action.
///
/// The generic type `T` will match the corresponding [`Publish::PublishPayload`].
pub enum PublishOutcome<T> {
    /// The publish action completed fully.
    Published,

    /// The publish action was not performed or was partially performed.
    ///
    /// The returned value must be passed back into the [`Publish::publish`] function for the publish action to complete.
    /// Whether or not the returned value may consist of partial data depends on the [`Session`] implementation.
    ///
    /// If you are looking for a general retry pattern, it is **always** safe to finish the publish by passing this returned
    /// value back into the `publish` function for another attempt, but it is only **sometimes** appropriate to pass the entire
    /// original payload into the `publish` function for a second attempt.
    Incomplete(T),
}
impl<T> PublishOutcome<T> {
    /// Returns `true` if the publish action completed fully.
    pub fn is_published(&self) -> bool {
        matches!(self, PublishOutcome::Published)
    }

    /// Returns `true` if some or all of the payload still needs to be published.
    pub fn is_incomplete(&self) -> bool {
        matches!(self, PublishOutcome::Incomplete(_))
    }
}
impl<T: Debug> Debug for PublishOutcome<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PublishOutcome::Published => f.write_str("PublishOutcome::Published"),
            PublishOutcome::Incomplete(x) => write!(f, "PublishOutcome::Incomplete({x:?})"),
        }
    }
}
impl<T: Clone> Clone for PublishOutcome<T> {
    fn clone(&self) -> Self {
        match self {
            PublishOutcome::Published => PublishOutcome::Published,
            PublishOutcome::Incomplete(x) => PublishOutcome::Incomplete(x.clone()),
        }
    }
}

/// Used by push-oriented receivers to handle moved payloads as they are received.
///
/// See the [`compat`] module for [`Receive`] compatibility.
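///
/// Because of the blanket implementation for closures, any `FnMut(T)` can be handed to a push-oriented receiver.
/// The sketch below re-declares a hypothetical `Handler` trait with the same shape so that it compiles on its own
/// (`Handler` and `dispatch_all` are illustrative names, not items from this crate):
///
/// ```rust
/// /// Hypothetical stand-in for a callback trait that takes ownership of each payload.
/// trait Handler<T> {
///     fn handle(&mut self, payload: T);
/// }
///
/// /// Blanket implementation: every `FnMut(T)` closure is a `Handler<T>`.
/// impl<T, F: FnMut(T)> Handler<T> for F {
///     fn handle(&mut self, payload: T) {
///         self(payload)
///     }
/// }
///
/// /// Push each payload into the handler, as a push-oriented receiver would.
/// fn dispatch_all<T>(payloads: Vec<T>, handler: &mut impl Handler<T>) {
///     for payload in payloads {
///         handler.handle(payload);
///     }
/// }
/// ```
///
/// A caller could then pass `&mut |x: i32| seen.push(x)` as the handler to collect every dispatched value into a `Vec`.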
pub trait Callback<T> {
    /// Handle a payload, taking ownership of it.
    fn callback(&mut self, payload: T);
}
impl<T, F: FnMut(T)> Callback<T> for F {
    fn callback(&mut self, payload: T) {
        self(payload)
    }
}
impl<T> Callback<T> for () {
    fn callback(&mut self, _payload: T) {}
}

/// Used by push-oriented receivers to handle payload references as they are received.
///
/// See the [`compat`] module for [`Receive`] compatibility.
pub trait CallbackRef<T: ?Sized> {
    /// Handle a borrowed payload.
    fn callback_ref(&mut self, payload: &T);

    /// Compatibility helper to convert any `CallbackRef` into a [`Callback`] when `T` is [`Sized`].
    ///
    /// This provides maximum compatibility when ownership of `T` is not required, as it allows users
    /// to always implement [`CallbackRef`], which can then be used in either circumstance.
    fn into_callback(mut self) -> impl Callback<T>
    where
        Self: Sized,
        T: Sized,
    {
        move |x| self.callback_ref(&x)
    }
}
impl<T: ?Sized, F: FnMut(&T)> CallbackRef<T> for F {
    fn callback_ref(&mut self, payload: &T) {
        self(payload)
    }
}
impl<T> CallbackRef<T> for () {
    fn callback_ref(&mut self, _payload: &T) {}
}