kanin 0.21.0

An RPC microservice framework for AMQP, protobuf and Rust built on lapin (https://github.com/amqp-rs/lapin).
Documentation
//! Module for the [App] struct and surrounding utilities.

mod task;

use std::{any::Any, fmt, sync::Arc};

use anymap::Map;
use futures::future::{select_all, SelectAll};
use lapin::{self, Connection, ConnectionProperties};
use log::{debug, info, trace};
use tokio::task::JoinHandle;

use self::task::TaskFactory;
use crate::{extract::State, Error, Handler, HandlerConfig, Respond, Result};

/// Type-map used to hold the state of an app.
///
/// Apps can hold any type as state, as long as it is `Send + Sync`. The map holds at most one value per type.
/// Values are inserted wrapped in [`State`] (see [`App::state`]) and can then be extracted in handlers.
pub(crate) type StateMap = Map<dyn Any + Send + Sync>;

/// The central struct of your application.
///
/// An `App` is assembled builder-style: handlers are registered with [`App::handler`] or
/// [`App::handler_with_config`], state is added with [`App::state`], and the app is finally started
/// with [`App::run`] or [`App::run_with_connection`].
#[must_use = "The app will not do anything unless you call `.run`."]
pub struct App {
    /// The task factories of all registered handlers, in registration order.
    /// Each factory is created with the routing key of its handler.
    /// Task factories are constructed in [`App::handler_with_config`] and built into tasks in [`App::setup_handlers`].
    handlers: Vec<TaskFactory>,
    /// A map from types to a single value of that type.
    /// This is used to hold the state values that users may want to store before running the app,
    /// and then extract in their handlers.
    state: StateMap,
}

impl Default for App {
    /// Produces an app with no registered handlers and an empty state map.
    fn default() -> Self {
        Self {
            handlers: Vec::new(),
            state: Map::new(),
        }
    }
}

impl App {
    /// Creates a new kanin app with the default configuration.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers `handler` for the given routing key, using the default handler configuration.
    ///
    /// This is shorthand for calling [`App::handler_with_config`] with a default configuration.
    ///
    /// The handler will respond to any messages with `reply_to` and `correlation_id` properties.
    /// This requires that the response type implements Respond (which is automatically implemented for protobuf messages).
    pub fn handler<H, Args, Res>(self, routing_key: impl Into<String>, handler: H) -> Self
    where
        H: Handler<Args, Res> + Send + 'static,
        Res: Respond + fmt::Debug + Send,
    {
        // The concrete configuration type is inferred from `handler_with_config`'s signature.
        let default_config = Default::default();
        self.handler_with_config(routing_key, handler, default_config)
    }

    /// Registers `handler` for the given routing key, using the given configuration.
    ///
    /// Nothing is started here: the handler is only recorded, and the app is returned so that calls can be chained.
    ///
    /// The handler will respond to any messages with `reply_to` and `correlation_id` properties.
    /// This requires that the response type implements Respond (which is automatically implemented for protobuf messages).
    pub fn handler_with_config<H, Args, Res>(
        mut self,
        routing_key: impl Into<String>,
        handler: H,
        config: HandlerConfig,
    ) -> Self
    where
        H: Handler<Args, Res> + Send + 'static,
        Res: Respond + fmt::Debug + Send,
    {
        // The task factory is what later creates the async task that will be run in tokio.
        let factory = TaskFactory::new(routing_key.into(), handler, config);
        self.handlers.push(factory);
        self
    }

    /// Stores `value` in this app as state of type `T`.
    ///
    /// An `App` may use any number of types as state, but it contains at most one instance of each type.
    /// The value is wrapped in an [`Arc`] inside a [`crate::extract::State`] before it is stored,
    /// and request handlers get access to it through the [`crate::extract::State`] extractor.
    ///
    /// # Panics
    /// Panics if a value of the given type has already been registered with the app.
    pub fn state<T: Send + Sync + 'static>(mut self, value: T) -> Self {
        // `insert` returns the value it replaced, so `Some` means this type was registered before.
        let already_registered = self.state.insert(State(Arc::new(value))).is_some();
        assert!(
            !already_registered,
            "Attempted to register a state type, `{}` that had already been registered before! \
            You can only register one value of each type. If you need multiple values of the same type, \
            use the newtype pattern to signify the semantic difference between the two values.",
            std::any::type_name::<T>()
        );
        self
    }

    /// Opens a connection to the AMQP broker at `amqp_addr` (with default connection properties)
    /// and then hands it to [`run_with_connection`][App::run_with_connection].
    ///
    /// Returns an `Err` if the connection attempt fails; everything after that behaves as described
    /// on [`run_with_connection`][App::run_with_connection].
    #[allow(clippy::missing_errors_doc)]
    pub async fn run(self, amqp_addr: &str) -> Result<()> {
        debug!("Connecting to AMQP on address: {amqp_addr:?} ...");
        let properties = ConnectionProperties::default();
        let connection = Connection::connect(amqp_addr, properties).await?;
        trace!("Connected to AMQP on address: {amqp_addr:?}");

        self.run_with_connection(&connection).await
    }

    /// Runs the app on the given connection with all the handlers that have been registered.
    ///
    /// Each handler is given its own dedicated channel associated with the given connection.
    /// The handlers then run in their own spawned tokio tasks.
    /// Handlers handle requests concurrently by spawning new tokio tasks for each incoming request.
    ///
    /// Once the handlers are set up, this waits for the first handler task to stop and then panics,
    /// so this function never returns `Ok`.
    ///
    /// # Errors
    /// Returns an `Err` if setting up the handlers fails, i.e. on any of the below conditions:
    /// * No handlers were registered.
    /// * Building the task of a handler failed (e.g. queue/consumer declaration or binding).
    ///
    /// # Panics
    /// * If a handler task returns, which should only happen if its consumer is cancelled.
    /// * If a handler task panics.
    /// * On connection errors, through the error callback registered on the connection during setup.
    pub async fn run_with_connection(self, conn: &Connection) -> Result<()> {
        let handles = self.setup_handlers(conn).await?;

        // `select_all` resolves as soon as the first task finishes. It yields the output of that task,
        // the index of that task among the handles, and the handles that are still pending.
        let (returning_handler, _finished_index, _leftover_handlers) = handles.await;

        match returning_handler {
            Ok(routing_key) => {
                // This case can only happen if the handler task runs to completion.
                // I.e. it completes the loop of consuming messages. This should only happen if the consumer is cancelled somehow.
                panic!("A handler task for routing key {routing_key:?} returned unexpectedly! Was the consumer cancelled?");
            }
            Err(e) => {
                // The JoinError is either a task cancellation or a panic.
                // We don't cancel tasks so this must be a handler panic.
                panic!("A handler panicked: {e:#}");
            }
        }
    }

    /// Sets up all the handlers, returning a [`SelectAll`] future that collects all the join handles.
    ///
    /// Every registered task factory is built into a task on the given connection, and each task is
    /// spawned on tokio. All tasks share the app's state through an [`Arc`].
    /// An error callback that panics is also registered on the connection.
    ///
    /// # Errors
    /// * [`Error::NoHandlers`] if no handlers were registered.
    /// * Any error produced while building the task of a handler. In that case the handler tasks
    ///   that were already spawned are aborted before the error is returned.
    pub(crate) async fn setup_handlers(
        self,
        conn: &Connection,
    ) -> Result<SelectAll<JoinHandle<String>>> {
        if self.handlers.is_empty() {
            return Err(Error::NoHandlers);
        }

        conn.on_error(|e| {
            panic!("Connection returned error: {e:#}");
        });

        // The number of tasks is known up front, so the vector never has to grow.
        let mut join_handles: Vec<JoinHandle<String>> = Vec::with_capacity(self.handlers.len());
        let state = Arc::new(self.state);

        for task_factory in self.handlers {
            debug!(
                "Spawning handler task for routing key: {:?} ...",
                task_factory.routing_key()
            );

            // Construct the task from the factory. This produces a pinned future which we can then spawn.
            let task = task_factory
                .build(conn, Arc::clone(&state))
                .await
                .map_err(|e| {
                    // Dropping a join handle only detaches its task. Abort the handlers spawned so far,
                    // so that a failed setup does not leave them consuming in the background.
                    for handle in &join_handles {
                        handle.abort();
                    }
                    e
                })?;

            // Spawn the task and save the join handle.
            join_handles.push(tokio::spawn(task));
        }

        let handler_count = join_handles.len();
        let plural_suffix = if handler_count == 1 { "" } else { "s" };
        info!("Connected to AMQP broker. Listening on {handler_count} handler{plural_suffix}.");

        Ok(select_all(join_handles))
    }
}