schmoozer 0.4.1

A simple abstraction over a retryable async operation, such as establishing a connection
Documentation
//! _schmoozer_ is intended to be used as an `async` (re)connector.  It
//! consists of two primary parts:
//! - The [`Connector`] trait is implemented by applications/libraries that
//!   need to run retryable connection loops.
//! - [`run()`] is a function that takes in a `Connector` implementation, and
//!   attempts to establish a connection, delaying and retrying on failures
//!   that the callback reports as retryable, and calls the
//!   [`Connector::run()`] trait method once a connection has been
//!   successfully established.
//!
//! Perhaps paradoxically the [`run()`] function does not itself actually
//! attempt to establish any connections -- it relies on the
//! [`Connector::connect()`] trait method implementation to establish
//! connections.
//!
//! The "good path" overall flow of the connector loop is to call the
//! `connect()` trait method.  If it is successful, call the trait's `run()`
//! method, passing along the newly allocated connection. The main application
//! logic relating to the connection should be implemented in this method.
//!
//! The primary purpose of the connector concerns the "retryable failure path":
//! If the `connect()` method encounters a failure it can choose to signal
//! back to the connector loop that the error is "retryable", in which case the
//! `retry_delay()` method is called to determine if the connector loop should
//! retry (and implement a delay before returning instructions to do so).
//!
//! Likewise, the [`Connector::run()`] trait method returns its [`RunResult`]
//! to indicate whether the connector should reconnect or exit, either
//! successfully or with an error.
//!
//! # Features
//! | Feature   | Function
//! |-----------|----------
//! | `tcpconn` | Enable support for a simple TCP (re)connector.
//! | `tracing` | Make the connector loop generate tracing logs.

#![cfg_attr(docsrs, feature(doc_cfg))]

#[cfg(feature = "tcpconn")]
#[cfg_attr(docsrs, doc(cfg(feature = "tcpconn")))]
pub mod tcpconn;

pub use async_trait::async_trait;

#[cfg(feature = "tcpconn")]
pub use tcpconn::SimpleTcpConnector;

/// Application callbacks for the [`run()`] function (or equivalent).
///
/// An implementor supplies the three steps driven by the connector loop:
/// establishing a connection ([`Connector::connect()`]), processing an
/// established connection ([`Connector::run()`]) and deciding whether, and
/// after what delay, to retry ([`Connector::retry_delay()`]).
#[async_trait]
pub trait Connector {
  /// The connection type that the `Connector::connect()` implementor spawns.
  ///
  /// Once created, an instance of this type will be passed to the
  /// `Connector::run()` implementation.
  type ConnType: Send;

  /// The application error return type.
  type Error: Send;

  /// Attempt to establish a connection.
  ///
  /// If a connection was successfully established the implementation returns
  /// [`ConnResult::Connected`], which will instruct the connector loop in
  /// [`run()`] to call the [`Connector::run()`] implementation.
  ///
  /// If the implementation detects a termination condition the handler returns
  /// [`ConnResult::Exit`], which will cause [`run()`]'s connection loop
  /// to terminate and return the result passed along with `ConnResult::Exit`.
  ///
  /// The implementation returns [`ConnResult::Reconnect`] to signal that some
  /// kind of retryable failure occurred.  The connector loop in [`run()`] will
  /// call [`Connector::retry_delay()`] to check if it should attempt a
  /// reconnection, and delay before doing so.
  ///
  /// # Errors
  /// If a fatal error occurs that is not retryable the implementation returns
  /// [`ConnResult::Exit`] with an `Err(E)`, which will be returned by the
  /// connector loop function.
  async fn connect(&mut self) -> ConnResult<Self::ConnType, Self::Error>;

  /// Give the application a chance to determine whether or not to attempt a
  /// reconnection, and delay before doing so.
  ///
  /// This method is called by the connector loop when either
  /// `Connector::connect()` returns `ConnResult::Reconnect` or
  /// `Connector::run()` returns `RunResult::Reconnect`.
  ///
  /// The application should return [`RunResult::Reconnect`] to instruct the
  /// connector loop in [`run()`] to call [`Connector::connect()`] again to
  /// attempt to establish a connection.
  ///
  /// If the implementation detects a termination condition the handler returns
  /// [`RunResult::Exit`], which will cause [`run()`]'s connection loop
  /// to terminate and return the result passed along with `RunResult::Exit`.
  ///
  /// # Errors
  /// If a fatal error occurs that is not retryable the implementation returns
  /// [`RunResult::Exit`] with an `Err(E)`, which will be returned by the
  /// connector loop function.
  async fn retry_delay(&mut self) -> RunResult<Self::Error>;

  /// Run the application's connection handler.
  ///
  /// Called by the connector loop with the connection `conn` that was
  /// returned by [`Connector::connect()`] in [`ConnResult::Connected`].
  ///
  /// The application should return [`RunResult::Reconnect`] to instruct the
  /// connector loop in [`run()`] to attempt a reconnect.
  ///
  /// If the implementation detects a termination condition the handler returns
  /// [`RunResult::Exit`], which will cause [`run()`]'s connection loop
  /// to terminate and return the result passed along with `RunResult::Exit`.
  ///
  /// # Errors
  /// If a fatal error occurs that is not retryable the implementation returns
  /// [`RunResult::Exit`] with an `Err(E)`, which will be returned by the
  /// connector loop function.
  async fn run(&mut self, conn: Self::ConnType) -> RunResult<Self::Error>;

  /// Optionally return a `String` representation of the connector's target
  /// address.
  ///
  /// The returned `String` is intended for display/logging, and is not meant
  /// to be reversible.
  ///
  /// The default implementation returns `None`.
  fn display_target(&self) -> Option<String> {
    None
  }
}


/// Special-purpose result returned by [`Connector::connect()`].
///
/// `C` is the connection type handed to [`Connector::run()`] and `E` is the
/// application's error type.
///
/// Implements [`std::fmt::Debug`] (when `C` and `E` do) so that results can be
/// logged and inspected in tests.
#[derive(Debug)]
pub enum ConnResult<C, E> {
  /// The connection was successful.
  ///
  /// Run the [`Connector::run()`] with the connection `C`.
  Connected(C),

  /// Connection could not be established.
  ///
  /// Call [`Connector::retry_delay()`] to check if reconnection attempts have
  /// been exhausted and, if applicable, delay before the reconnection attempt.
  Reconnect,

  /// Terminate the reconnection loop.
  ///
  /// The contained result is what the connector loop returns to its caller.
  Exit(Result<(), E>)
}

/// Returned by [`Connector::run()`] and [`Connector::retry_delay()`].
///
/// `E` is the application's error type.
///
/// Implements [`std::fmt::Debug`] (when `E` does) so that results can be
/// logged and inspected in tests.
#[derive(Debug)]
pub enum RunResult<E> {
  /// Attempt to reconnect.
  Reconnect,

  /// Terminate the reconnection loop.
  ///
  /// The contained result is what the connector loop returns to its caller.
  Exit(Result<(), E>)
}


/// Establish and process a network connection.
///
/// The `run()` function will enter a loop that will attempt to establish a
/// connection by calling the [`Connector::connect()`] implementation.  If a
/// connection is successfully established the connector loop will call the
/// [`Connector::run()`] implementation.
///
/// The main purpose of the connector loop is to handle connection retry
/// requests from either the `connect()` or the `run()` trait implementations
/// (presumably because they failed in a retryable manner).  If a reconnection
/// request is returned [`Connector::retry_delay()`] will be called to allow
/// the application to implement its own logic to determine whether the
/// reconnection should proceed and optionally add a delay before the
/// reconnection attempt.
///
/// # Exit conditions
/// The (re)connection loop will exit if:
/// - [`Connector::connect()`] returns [`ConnResult::Exit`]
/// - [`Connector::retry_delay()`] returns [`RunResult::Exit`]
/// - [`Connector::run()`] returns [`RunResult::Exit`]
///
/// # Errors
/// If any of the `Connector`'s callbacks return `ConnResult::Exit(Err(_))` or
/// `RunResult::Exit(Err(_))` this function will return the error back to the
/// caller.
pub async fn run<E>(
  mut connector: impl Connector<Error = E> + Send
) -> Result<(), E>
where
  E: Send + std::fmt::Debug
{
  #[cfg(feature = "tracing")]
  tracing::info!("Enter (re)connection loop");
  loop {
    // Call the application's connect callback to attempt to establish
    // connection.
    #[cfg(feature = "tracing")]
    tracing::info!("Attempt to establish connection");
    match connector.connect().await {
      ConnResult::Connected(conn) => {
        // A connection was successfully established -- call the run()
        // implementation.
        #[cfg(feature = "tracing")]
        tracing::info!(
          "Got connection -- call application connection handler"
        );
        match connector.run(conn).await {
          RunResult::Reconnect => {
            // The application has requested a reconnection.
            // Fall through to retry_delay()
            #[cfg(feature = "tracing")]
            tracing::debug!("Connector::run() requested reconnection");
          }
          RunResult::Exit(res) => {
            // The connection handler asked to terminate the reconnection
            // loop; report it as originating from run(), not connect().
            #[cfg(feature = "tracing")]
            tracing::info!(
              "Connector::run() requested termination: {res:?}"
            );
            break res;
          }
        }
      }
      ConnResult::Reconnect => {
        // The connector returned a retryable error.
        // Fall through to retry_delay()
        #[cfg(feature = "tracing")]
        tracing::debug!("Connector::connect() requested reconnection");
      }
      ConnResult::Exit(res) => {
        // Terminate reconnection loop
        #[cfg(feature = "tracing")]
        tracing::info!("Connector::connect() requested termination; {res:?}");
        break res;
      }
    }

    // If this point is reached the application has requested a reconnection.
    // Call `retry_delay()` to allow the application to determine whether to
    // retry or not.

    #[cfg(feature = "tracing")]
    tracing::info!("Call retry/delay callback");
    match connector.retry_delay().await {
      RunResult::Reconnect => {
        // Application wants to reconnect; this is the end of the loop body,
        // so control returns to the top of the loop and calls connect() again.
        #[cfg(feature = "tracing")]
        tracing::debug!("Connector::retry_delay() requested reconnection");
      }
      RunResult::Exit(res) => {
        // Terminate reconnection loop
        #[cfg(feature = "tracing")]
        tracing::info!(
          "Connector::retry_delay() requested termination: {res:?}"
        );
        break res;
      }
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :