// ruststream 0.4.0
//
// Async messaging framework for Rust: broker-agnostic traits, router, codecs, and a conformance harness for broker authors.
// Documentation
//! The process entry point generated by [`#[ruststream::app]`](macro@crate::app).
//!
//! An app crate exposes a builder function (`fn app() -> RustStream<_>`) and marks it with the
//! attribute macro, which expands to a `main` that calls [`run_main`]. That gives a zero-boilerplate
//! binary which understands two commands:
//!
//! - `run` (the default) builds a multi-thread Tokio runtime and runs the service until an
//!   interrupt, mirroring [`RustStream::run`]. With the `logging` feature enabled it first installs
//!   the colored console logger ([`crate::logging`]), so a scaffolded service prints logs without
//!   any setup; an app that installs its own subscriber keeps it.
//! - `asyncapi gen [-o|--out <file>] [--yaml]` builds the [`AsyncAPI`](crate::asyncapi) document
//!   for the service and writes it to stdout or a file. Requires the crate to enable the
//!   `asyncapi` feature.
//!
//! The standalone `ruststream` CLI drives these by shelling out to `cargo run -- <command>`, so the
//! same dispatch serves both `cargo run` and `ruststream run`.

use std::process::ExitCode;

use thiserror::Error;

use super::app::{RustStream, RustStreamError};

/// Errors surfaced by the generated CLI entry point.
///
/// [`run_main`] prints these to stderr (prefixed with `ruststream: `) and maps them to a failing
/// exit code. The enum is `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum CliError {
    /// The Tokio runtime could not be built.
    ///
    /// Produced by the `run` command when building the multi-thread runtime fails; wraps the
    /// I/O error reported by the runtime builder.
    #[error("failed to build the async runtime: {0}")]
    Runtime(#[source] std::io::Error),
    /// The service returned an error while running (see [`RustStreamError`]).
    ///
    /// Transparent: the message and source are those of the wrapped error. The `From` impl lets
    /// the `run` command propagate it with `?`.
    #[error(transparent)]
    Run(#[from] RustStreamError),
    /// The first argument was not a recognized command.
    ///
    /// Carries the offending argument.
    #[error("unknown command {0:?}; expected `run` or `asyncapi gen`")]
    UnknownCommand(String),
    /// `asyncapi` was followed by something other than `gen`.
    ///
    /// Carries the argument that was found, or an empty string when `asyncapi` was the last
    /// argument.
    #[error("unknown `asyncapi` subcommand {0:?}; expected `gen`")]
    UnknownAsyncApi(String),
    /// An unrecognized flag was passed to `asyncapi gen`.
    ///
    /// The payload identifies the rejected option.
    #[error("unknown option {0:?} for `asyncapi gen`")]
    UnknownOption(String),
    /// `asyncapi gen` was requested but the crate does not enable the `asyncapi` feature.
    #[error("`asyncapi gen` requires the crate to enable ruststream's `asyncapi` feature")]
    AsyncApiDisabled,
    /// The generated spec could not be written to the requested file.
    #[error("could not write the spec to {path}: {source}")]
    WriteSpec {
        /// The output path that failed.
        path: String,
        /// The underlying I/O error.
        #[source]
        source: std::io::Error,
    },
    /// The spec could not be serialized to JSON.
    ///
    /// Only present with the `asyncapi` feature. The `From` impl lets the generator propagate
    /// the serializer error with `?`.
    #[cfg(feature = "asyncapi")]
    #[error("could not serialize the AsyncAPI spec to JSON: {0}")]
    SerializeJson(#[from] serde_json::Error),
    /// The spec could not be serialized to YAML.
    ///
    /// Only present with the `asyncapi` feature.
    #[cfg(feature = "asyncapi")]
    #[error("could not serialize the AsyncAPI spec to YAML: {0}")]
    SerializeYaml(#[source] serde_norway::Error),
}

/// The command parsed from the process arguments.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so parse results can be printed in diagnostics
/// and compared directly (for example with `assert_eq!`) instead of being destructured by hand.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Command {
    /// Run the service until interrupted.
    Run,
    /// Generate the `AsyncAPI` document.
    AsyncApiGen {
        /// Output file, or stdout when `None`.
        out: Option<String>,
        /// Emit YAML instead of JSON.
        yaml: bool,
    },
}

/// Entry point behind the generated `main`: runs the CLI for the service produced by `build`.
///
/// This is what the [`#[ruststream::app]`](macro@crate::app) expansion calls, so calling it by
/// hand is rarely needed. The process arguments select between `run` and `asyncapi gen`. On
/// failure the error is written to stderr, prefixed with `ruststream: `, and the returned exit
/// code is [`ExitCode::FAILURE`]; otherwise it is [`ExitCode::SUCCESS`].
///
/// # Examples
///
/// ```no_run
/// # #[cfg(feature = "memory")]
/// # {
/// use ruststream::memory::MemoryBroker;
/// use ruststream::runtime::{AppInfo, RustStream};
///
/// fn app() -> RustStream {
///     RustStream::new(AppInfo::new("svc", "0.1.0")).register_broker(MemoryBroker::new())
/// }
///
/// fn main() -> std::process::ExitCode {
///     ruststream::runtime::cli::run_main(app)
/// }
/// # }
/// ```
#[must_use]
pub fn run_main<L, F>(build: F) -> ExitCode
where
    F: FnOnce() -> RustStream<L>,
{
    // Report the failure once, here, so the dispatch code can simply propagate with `?`.
    if let Err(err) = execute(build) {
        eprintln!("ruststream: {err}");
        return ExitCode::FAILURE;
    }
    ExitCode::SUCCESS
}

/// Parses the process arguments (without the program name) and carries out the selected command.
///
/// `build` is invoked only after the arguments parsed successfully, and exactly once.
fn execute<L, F>(build: F) -> Result<(), CliError>
where
    F: FnOnce() -> RustStream<L>,
{
    let argv: Vec<String> = std::env::args().skip(1).collect();
    let command = parse(&argv)?;
    match command {
        Command::AsyncApiGen { out, yaml } => {
            let app = build();
            generate_spec(&app, out.as_deref(), yaml)
        }
        Command::Run => {
            // A freshly scaffolded service should log without any setup, so install the colored
            // console logger first. Its result is deliberately discarded: the only failure is
            // that the app already installed a subscriber, and that one must stay in place.
            #[cfg(feature = "logging")]
            let _ = crate::logging::init();
            let service = build();
            let mut builder = tokio::runtime::Builder::new_multi_thread();
            let runtime = builder.enable_all().build().map_err(CliError::Runtime)?;
            runtime.block_on(service.run())?;
            Ok(())
        }
    }
}

/// Maps the command-line arguments (program name already removed) to a [`Command`].
///
/// No arguments, or `run` as the first argument, select [`Command::Run`]; anything after `run`
/// is ignored. `asyncapi` hands the remaining arguments to [`parse_asyncapi`]. Any other first
/// argument is rejected with [`CliError::UnknownCommand`].
fn parse(args: &[String]) -> Result<Command, CliError> {
    let Some((command, rest)) = args.split_first() else {
        return Ok(Command::Run);
    };
    match command.as_str() {
        "run" => Ok(Command::Run),
        "asyncapi" => parse_asyncapi(rest),
        other => Err(CliError::UnknownCommand(other.to_owned())),
    }
}

/// Parses the arguments that follow `asyncapi`.
///
/// The first argument must be `gen`. The arguments after it are options:
///
/// - `-o <file>` / `--out <file>` sets the output file; when repeated, the last one wins. The
///   argument following the flag is always taken as the path, even if it starts with `-`.
/// - `--yaml` selects YAML output instead of JSON.
///
/// # Errors
///
/// - [`CliError::UnknownAsyncApi`] when the first argument is not `gen`; it carries the argument
///   found, or an empty string when there is none.
/// - [`CliError::UnknownOption`] for an unrecognized option, and also when `-o`/`--out` is the
///   last argument and therefore has no file path. Previously a trailing `-o` was accepted and
///   the spec silently went to stdout instead of a file.
fn parse_asyncapi(args: &[String]) -> Result<Command, CliError> {
    let Some((subcommand, options)) = args.split_first() else {
        return Err(CliError::UnknownAsyncApi(String::new()));
    };
    if subcommand.as_str() != "gen" {
        return Err(CliError::UnknownAsyncApi(subcommand.clone()));
    }

    let mut out = None;
    let mut yaml = false;
    // An explicit iterator (rather than a `for` loop) lets the `-o` arm consume the following
    // argument as its value.
    let mut rest = options.iter();
    while let Some(arg) = rest.next() {
        match arg.as_str() {
            flag @ ("-o" | "--out") => {
                let Some(path) = rest.next() else {
                    // NOTE(review): `UnknownOption` is reused because `CliError` has no variant
                    // for a missing option value; a dedicated variant would read better.
                    return Err(CliError::UnknownOption(format!(
                        "{flag} (missing <file> value)"
                    )));
                };
                out = Some(path.clone());
            }
            "--yaml" => yaml = true,
            other => return Err(CliError::UnknownOption(other.to_owned())),
        }
    }
    Ok(Command::AsyncApiGen { out, yaml })
}

/// Builds the `AsyncAPI` document for `app` and emits it as JSON, or as YAML when `yaml` is set.
///
/// With `out` set, the rendered text is written to that file and a failure is reported as
/// [`CliError::WriteSpec`]. Without it, the text is printed to stdout followed by a newline.
#[cfg(feature = "asyncapi")]
fn generate_spec<L>(app: &RustStream<L>, out: Option<&str>, yaml: bool) -> Result<(), CliError> {
    let document = crate::asyncapi::build_spec(app);
    let rendered = if yaml {
        document.to_yaml().map_err(CliError::SerializeYaml)?
    } else {
        // The JSON error converts through `From`, so `?` is enough here.
        document.to_json()?
    };

    let Some(path) = out else {
        println!("{rendered}");
        return Ok(());
    };
    std::fs::write(path, rendered).map_err(|source| CliError::WriteSpec {
        path: path.to_owned(),
        source,
    })
}

/// Stand-in used when the `asyncapi` feature is off: always fails with
/// [`CliError::AsyncApiDisabled`] and ignores its arguments.
#[cfg(not(feature = "asyncapi"))]
fn generate_spec<L>(_app: &RustStream<L>, _out: Option<&str>, _yaml: bool) -> Result<(), CliError> {
    Err(CliError::AsyncApiDisabled)
}

#[cfg(test)]
mod tests {
    use super::{Command, parse};

    /// Turns string literals into the owned argument vector `parse` expects.
    fn args(parts: &[&str]) -> Vec<String> {
        parts.iter().copied().map(String::from).collect()
    }

    /// Parses `parts` and returns the `asyncapi gen` payload as `(out, yaml)`.
    ///
    /// Panics when parsing fails or yields a different command.
    fn parse_gen(parts: &[&str]) -> (Option<String>, bool) {
        match parse(&args(parts)).unwrap() {
            Command::AsyncApiGen { out, yaml } => (out, yaml),
            Command::Run => panic!("expected asyncapi gen"),
        }
    }

    #[test]
    fn empty_args_default_to_run() {
        let no_args = parse(&args(&[])).unwrap();
        assert!(matches!(no_args, Command::Run));

        let explicit_run = parse(&args(&["run"])).unwrap();
        assert!(matches!(explicit_run, Command::Run));
    }

    #[test]
    fn asyncapi_gen_parses_output_and_format() {
        let (out, yaml) = parse_gen(&["asyncapi", "gen", "-o", "spec.yaml", "--yaml"]);
        assert_eq!(out.as_deref(), Some("spec.yaml"));
        assert!(yaml);
    }

    #[test]
    fn asyncapi_gen_defaults_to_json_stdout() {
        let (out, yaml) = parse_gen(&["asyncapi", "gen"]);
        assert!(out.is_none());
        assert!(!yaml);
    }

    #[test]
    fn unknown_command_is_rejected() {
        // One case per rejection point: top-level command, `asyncapi` subcommand, `gen` option.
        let rejected: [&[&str]; 3] = [
            &["frobnicate"],
            &["asyncapi", "lint"],
            &["asyncapi", "gen", "--nope"],
        ];
        for parts in rejected {
            assert!(parse(&args(parts)).is_err());
        }
    }
}