nbio/
lib.rs

1//! # Description
2//!
3//! This crate aims to make it easier to reason about uni-directional and bi-directional nonblocking I/O.
4//!
5//! This is done using patterns that extend beyond dealing directly with raw bytes, the [`std::io::Read`] and [`std::io::Write`] traits,
6//! and [`std::io::ErrorKind::WouldBlock`] errors. Since this crate's main focus is nonblocking I/O, all [`Session`] implementations provided
7//! by this crate are non-blocking by default.
8//!
9//! # Sessions
10//!
11//! The core [`Session`] trait encapsulates controlling a single instance of a connection or logical session.
12//! To differentiate with the [`std::io::Read`] and [`std::io::Write`] traits that only deal with raw bytes, this
13//! crate uses [`Publish`] and [`Receive`] terminology, which utilize associated types to handle any payload type.
14//!
15//! A [`Session`] impl is typically also either [`Publish`], [`Receive`], or both.
16//! While the [`tcp`] module provides a [`Session`] implementation that provides unframed non-blocking binary IO operations,
17//! other [`Session`] impls are able to provide significantly more functionality using the same non-blocking patterns.
18//!
19//! This crate will often use the term `Duplex` to distinguish a [`Session`] that is **both** [`Publish`] and [`Receive`].
20//!
21//! # Associated Types
22//!
23//! Sessions operate on implementation-specific [`Receive::ReceivePayload`] and [`Publish::PublishPayload`] types.
24//! These types are able to utilize a lifetime `'a`, which is tied to the lifetime of the underlying [`Session`],
25//! providing the ability for implementations to reference internal buffers or queues without copying.
26//!
27//! # Errors
28//!
29//! The philosophy of this crate is that an [`Err`] should always represent a transport or protocol-level error.
30//! An [`Err`] should not be returned by a function as a condition that should be handled during **normal** branching logic.
31//! As a result, instead of forcing you to handle [`std::io::ErrorKind::WouldBlock`] everywhere you deal with nonblocking code,
32//! this crate will indicate partial receive/publish operations using [`ReceiveOutcome::Idle`], [`ReceiveOutcome::Active`],
33//! and [`PublishOutcome::Incomplete`] as [`Result::Ok`].
34//!
35//! # Features
36//!
37//! The [`Session`] impls in this crate are enabled by certain features.
38//! By default, features that do not require a special build environment are enabled for rapid prototyping.
39//! In a production codebase, you will likey want to pick and choose your required features.
40//!
41//! Feature list:
42//! - `aeron`
43//! - `crossbeam`
44//! - `http`
45//! - `mock`
46//! - `mpsc`
47//! - `tcp`
48//! - `websocket`
49//!
50//! Features not enabled by default:
51//! - `aeron`: requires `cmake` and `clang`.
52//!
53//! # Examples
54//!
55//! ## Streaming TCP
56//!
57//! The following example shows how to use streaming TCP to publish and receive a traditional stream of bytes.
58//!
59//! ```no_run
60//! use nbio::{Publish, PublishOutcome, Receive, ReceiveOutcome, Session};
61//! use nbio::tcp::TcpSession;
62//!
63//! // establish connection
64//! let mut client = TcpSession::connect("192.168.123.456:54321", None, None).unwrap();
65//!
66//! // publish some bytes until completion
67//! let mut pending_publish = "hello world!".as_bytes();
68//! while let PublishOutcome::Incomplete(pending) = client.publish(pending_publish).unwrap() {
69//!     pending_publish = pending;
70//! }
71//!
72//! // print received bytes
73//! loop {
74//!     if let ReceiveOutcome::Payload(payload) = client.receive().unwrap() {
75//!         println!("received: {payload:?}");
76//!     }
77//! }
78//! ```
79//!
80//! ## Framing TCP
81//!
82//! The following example shows how to [`frame`] messages over TCP to publish and receive payloads framed with a preceeding u64 length field.
83//! Notice how it is almost identical to the code above, except it guarantees that read slices are always identical to their corresponding write slices.
84//!
85//! ```no_run
86//! use nbio::{Publish, PublishOutcome, Receive, ReceiveOutcome, Session};
87//! use nbio::tcp::TcpSession;
88//! use nbio::frame::{FrameDuplex, U64FrameDeserializer, U64FrameSerializer};
89//!
90//! // establish connection wrapped in a framing session
91//! let client = TcpSession::connect("192.168.123.456:54321", None, None).unwrap();
92//! let mut client = FrameDuplex::new(client, U64FrameDeserializer::new(), U64FrameSerializer::new(), 4096);
93//!
94//! // publish some bytes until completion
95//! let mut pending_publish = "hello world!".as_bytes();
96//! while let PublishOutcome::Incomplete(pending) = client.publish(pending_publish).unwrap() {
97//!     pending_publish = pending;
98//! }
99//!
100//! // print received bytes
101//! loop {
102//!     if let ReceiveOutcome::Payload(payload) = client.receive().unwrap() {
103//!         println!("received: {payload:?}");
104//!     }
105//! }
106//! ```
107//!
108//! ## HTTP Client
109//!
110//! The following example shows how to use the [`http`] module to drive an HTTP 1.x request/response using the same non-blocking model.
111//! Notice how the primitives of driving a buffered write to completion and receiving a framed response is the same as any other framed session.
112//! In fact, the `conn` returned by `client.request(..)` is simply a [`frame::FrameDuplex`] that utilizes a [`http::Http1RequestSerializer`] and
113//! [`http::Http1ResponseDeserializer`].
114//!
115//! ```no_run
116//! use http::Request;
117//! use nbio::{Receive, Session, ReceiveOutcome};
118//! use nbio::http::HttpClient;
119//! use tcp_stream::OwnedTLSConfig;
120//!
121//! // create the client and make the request
122//! let mut client = HttpClient::new();
123//! let mut conn = client
124//!     .request(Request::get("http://icanhazip.com").body(()).unwrap())
125//!     .unwrap();
126//!
127//! // read the conn until a full response is received
128//! loop {
129//!     if let ReceiveOutcome::Payload(r) = conn.receive().unwrap() {
130//!         println!("Response Body: {}", String::from_utf8_lossy(r.body()));
131//!         break;
132//!     }
133//! }
134//! ```
135//!
136//! ## WebSocket
137//!
138//! The following example sends a message and then receives all subsequent messages from a websocket connection.
139//! Just like the HTTP example, this simply encapsulates [`frame::FrameDuplex`] but utilizes a [`websocket::WebSocketFrameSerializer`]
140//! and [`websocket::WebSocketFrameDeserializer`]. All TLS and WebSocket handshaking is taken care of during the
141//! [`SessionStatus::Establishing`] [`Session::status`] workflow.
142//!
143//! ```no_run
144//! use nbio::{Publish, PublishOutcome, Receive, Session, SessionStatus, ReceiveOutcome};
145//! use nbio::websocket::{Message, WebSocketSession};
146//!
147//! // connect and drive the handshake
148//! let mut session = WebSocketSession::connect("wss://echo.websocket.org/", None, None).unwrap();
149//! while session.status() == SessionStatus::Establishing {
150//!      session.drive().unwrap();
151//! }
152//!
153//! // publish a message
154//! let mut pending_publish = Message::Text("hello world!".into());
155//! while let PublishOutcome::Incomplete(pending) = session.publish(pending_publish).unwrap() {
156//!     pending_publish = pending;
157//! }
158//!
159//! // receive messages
160//! loop {
161//!     if let ReceiveOutcome::Payload(r) = session.receive().unwrap() {
162//!         println!("Received: {:?}", r);
163//!         break;
164//!     }
165//! }
166//! ```
167
168#[cfg(any(feature = "crossbeam"))]
169pub extern crate crossbeam_channel;
170#[cfg(any(feature = "http"))]
171pub extern crate http as hyperium_http;
172#[cfg(any(feature = "tcp"))]
173pub extern crate tcp_stream;
174#[cfg(any(feature = "websocket"))]
175pub extern crate tungstenite;
176
177#[cfg(any(feature = "aeron"))]
178pub mod aeron;
179pub mod buffer;
180pub mod compat;
181#[cfg(any(feature = "crossbeam"))]
182pub mod crossbeam;
183pub mod dns;
184pub mod frame;
185#[cfg(any(feature = "http"))]
186pub mod http;
187pub mod liveness;
188#[cfg(any(feature = "mock"))]
189pub mod mock;
190#[cfg(any(feature = "mpsc"))]
191pub mod mpsc;
192#[cfg(any(feature = "tcp"))]
193pub mod tcp;
194pub mod tls;
195#[cfg(any(feature = "websocket"))]
196pub mod websocket;
197
198use std::{fmt::Debug, io::Error};
199
200/// An instance of a connection or logical session, which may also support [`Receive`], [`Publish`], or dispatching received events to a [`Callback`]/[`CallbackRef`].
201///
202/// ## Connecting
203///
204/// Some implementations may not default to an established state, in which case immediate calls to `publish()` and `receive()` will fail.
205/// The [`Session::status`] function provides the current status, which will not return `Established` until all required handshakes are complete.
206/// When [`Session::status`] returns [`SessionStatus::Establishing`], you may drive the connection process via the [`Session::drive`] function.
207///
208/// ## Retrying
209///
210/// The [`Ok`] result of `publish(..)` and `receive(..)` operations may return [`ReceiveOutcome::Idle`], [`ReceiveOutcome::Active`], or [`PublishOutcome::Incomplete`].
211/// These outcomes indicate that an operation may need to be retried. See [`ReceiveOutcome`] and [`PublishOutcome`] for more details.
212///
213/// ## Duty Cycles
214///
215/// The [`Session::drive`] operation is used to finish connecting and to service reading/writing buffered data and to dispatch callbacks.
216/// Most, but not all, [`Session`] implementations will require periodic calls to [`Session::drive`] in order to function.
217/// Implementations that do not require calls to [`Session::drive`] will no-op when it is called.
218///
219/// [`Receive::receive`] and [`Publish::publish`] will implicitly drive, allowing users to receive or publish/retry in a tight loop without intermediately calling drive.
220/// This means that all [`Publish`] and [`Receive`] implementations must call drive internally.
221///
222/// ## Publishing
223///
224/// Session impls that can publish data will implement [`Publish`].
225///
226/// ## Receiving
227///
228/// Session impls that can receive data via polling implement [`Receive`].
229/// Impls that receive data via callbacks will accept a [`Callback`] or [`CallbackRef`] as input.
230///
231/// For cross-compatibilty between [`Receive`] and [`Callback`]/[`CallbackRef`] paradiagms, see the [`callback`] module.
232/// - [`callback::CallbackQueue`] impls [`Callback`]
233pub trait Session: Debug {
234    /// Check the current session status.
235    ///
236    /// If this returns [`SessionStatus::Establishing`], use [`Session::drive`] to progress the connection process.
237    fn status(&self) -> SessionStatus;
238
239    /// Some implementations will internally buffer payloads or require a duty cycle to drive callbacks.
240    /// Those implementations will require `drive(..)` to be called continuously to completely publish and/or receive data.
241    /// This function will return [`DriveOutcome::Active`] if work was done, indicating to any scheduler that more work may be pending.
242    /// When this function returns [`DriveOutcome::Idle`], only then should it indicate to a scheduler that yielding or idling is appropriate.
243    fn drive(&mut self) -> Result<DriveOutcome, Error>;
244}
245
246/// Returned by the [`Session::status`] function, providing the current connection state
247#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
248pub enum SessionStatus {
249    /// Session attempting to connect, handshake, or otherwise establish, and will move to `Established` or `Terminated` as [`Session::drive`] is called.
250    Establishing,
251    /// Session is currently established, and will move `Terminated` when an unrecoverable error is encountered
252    Established,
253    /// Session terminal state, connection has been closed
254    Terminated,
255}
256
257/// Returned by the [`Session::drive`] function, providing the result of the drive operation
258#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
259pub enum DriveOutcome {
260    /// The drive operation resulted in work being done, which means the user should attempt to call [`Session::drive`] again as soon as possible.
261    Active,
262    /// The drive operation did not result in any work being done, which means the user may decide to yield or backoff.
263    Idle,
264}
265
266/// A [`Session`] implementation that can receive payloads via polling.
267pub trait Receive: Session {
268    /// The type returned by the `receive(..)` function.
269    type ReceivePayload<'a>
270    where
271        Self: 'a;
272
273    /// Attempt to receive a `payload` from the session.
274    /// This will return [`ReceiveOutcome::Payload`] when data has been received.
275    /// [`ReceiveOutcome::Active`] can be used to report that work was completed, but data is not ready.
276    /// This means that only [`ReceiveOutcome::None`] should be used to indicate to a scheduler that yielding or idling is appropriate.
277    fn receive<'a>(&'a mut self) -> Result<ReceiveOutcome<Self::ReceivePayload<'a>>, Error>;
278}
279
280/// Returned by the [`Receive::receive`] function, providing the outcome or information about the receive action.
281///
282/// The generic type `T` will match the cooresponding [`Receive::ReceivePayload`].
283pub enum ReceiveOutcome<T> {
284    /// Contains a reference to payload received from the [`Receive::receive`] action.
285    Payload(T),
286
287    /// Data was buffered. This means a partial payload was received, but could not be returned as complete `Data`.
288    Active,
289
290    /// No work was done. This is useful to signal to a scheduler or idle strategy that it may be time to yield.
291    Idle,
292}
293impl<T: Debug> Debug for ReceiveOutcome<T> {
294    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
295        match self {
296            ReceiveOutcome::Payload(x) => f.write_str(&format!("ReceiveOutcome::Payload({x:?})")),
297            ReceiveOutcome::Active => f.write_str("ReceiveOutcome::Active"),
298            ReceiveOutcome::Idle => f.write_str("ReceiveOutcome::Idle"),
299        }
300    }
301}
302impl<T: Clone> Clone for ReceiveOutcome<T> {
303    fn clone(&self) -> Self {
304        match self {
305            ReceiveOutcome::Payload(x) => ReceiveOutcome::Payload(x.clone()),
306            ReceiveOutcome::Active => ReceiveOutcome::Active,
307            ReceiveOutcome::Idle => ReceiveOutcome::Idle,
308        }
309    }
310}
311
312/// A [`Session`] implementation that can publish payloads.
313pub trait Publish: Session {
314    /// The type given to the `publish(..)` function.
315    type PublishPayload<'a>
316    where
317        Self: 'a;
318
319    /// Write the given `payload` to the session.
320    ///
321    /// This will return [`PublishOutcome::Incomplete`] if the publish is not immediately completed fully,
322    /// in which case `T` of `Incomplete(T)` data must be retried.
323    ///
324    /// Note that it is possible for some implementations that a publish is partially complete, so you must
325    /// re-attempt the data encapsulated by `Incomplete`, not the data originally passed into the function.
326    /// This guidance can only be ignored when you are not writing generic code and you know that your
327    /// [`Publish`] impl is "all-or-none".
328    fn publish<'a>(
329        &mut self,
330        payload: Self::PublishPayload<'a>,
331    ) -> Result<PublishOutcome<Self::PublishPayload<'a>>, Error>;
332}
333
334/// A [`Publish`] implementation that exposes a blocking flush operation.
335pub trait Flush: Publish {
336    /// Flush all pending publish data, blocking until completion.
337    fn flush(&mut self) -> Result<(), Error>;
338}
339
340/// Returned by the [`Publish::publish`] function, providing the outcome of the publish action.
341///
342/// The generic type `T` will match the cooresponding [`Publish::PublishPayload`].
343pub enum PublishOutcome<T> {
344    /// The publish action completed fully
345    Published,
346
347    /// The publish action was not performed or was partially performed.
348    ///
349    /// The returned reference must be passed back into the [`Publish::publish`] function for the publish action to complete.
350    /// Whether or not the returned reference may consist of partial data depends on the [`Session`] implementation.
351    ///
352    /// If you are looking for a general retry pattern, it is **always** safe to finish the publish by passing this returned
353    /// reference back into the `publish` function for another attempt, but it is only **sometimes** appropriate to return the entire
354    /// original publish reference into the `publish` function for a second attempt.
355    Incomplete(T),
356}
357impl<T> PublishOutcome<T> {
358    pub fn is_published(&self) -> bool {
359        match self {
360            PublishOutcome::Published => true,
361            PublishOutcome::Incomplete(_) => false,
362        }
363    }
364    pub fn is_incomplete(&self) -> bool {
365        match self {
366            PublishOutcome::Published => false,
367            PublishOutcome::Incomplete(_) => true,
368        }
369    }
370}
371impl<T: Debug> Debug for PublishOutcome<T> {
372    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
373        match self {
374            PublishOutcome::Published => f.write_str("PublishOutcome::Published"),
375            PublishOutcome::Incomplete(x) => {
376                f.write_str(&format!("PublishOutcome::Incomplete({x:?})"))
377            }
378        }
379    }
380}
381impl<T: Clone> Clone for PublishOutcome<T> {
382    fn clone(&self) -> Self {
383        match self {
384            PublishOutcome::Published => PublishOutcome::Published,
385            PublishOutcome::Incomplete(x) => PublishOutcome::Incomplete(x.clone()),
386        }
387    }
388}
389
390/// Used by push-oriented receivers to handle moved payloads as they are received.
391///
392/// See the [`compat`] module for [`Receive`] compatibility
393pub trait Callback<T> {
394    fn callback(&mut self, payload: T);
395}
396impl<T, F: FnMut(T)> Callback<T> for F {
397    fn callback(&mut self, payload: T) {
398        self(payload)
399    }
400}
401impl<T> Callback<T> for () {
402    fn callback(&mut self, _payload: T) {}
403}
404
405/// Used by push-oriented receivers to handle payload references as they are received.
406///
407/// See the [`compat`] module for [`Receive`] compatibility
408pub trait CallbackRef<T: ?Sized> {
409    fn callback_ref(&mut self, payload: &T);
410
411    /// Compatibility helper to convert any `CallbackRef` into a [`Callback`] when `T` is [`Sized`].
412    ///
413    /// This provides maximum compatibility when ownership of `T` is not required, as it allows users
414    /// to always implement [`CallbackRef`] which can then be used in either circumstance.
415    fn into_callback(mut self) -> impl Callback<T>
416    where
417        Self: Sized,
418        T: Sized,
419    {
420        move |x| self.callback_ref(&x)
421    }
422}
423impl<T: ?Sized, F: FnMut(&T)> CallbackRef<T> for F {
424    fn callback_ref(&mut self, payload: &T) {
425        self(payload)
426    }
427}
428impl<T> CallbackRef<T> for () {
429    fn callback_ref(&mut self, _payload: &T) {}
430}