Skip to main content

ruststream/
lib.rs

1//! Rust core of the [`RustStream`](https://github.com/powersemmi/ruststream) messaging
2//! framework: broker-agnostic traits, message types, codecs, router runtime, and a
3//! conformance harness for broker authors.
4//!
5//! # Cargo features
6//!
7//! The core traits, the [`runtime::RustStream`] application object, middleware, and dispatch are
8//! always present. The rest is additive and opt-in. Codec features are mutually compatible and
9//! enable only the deserializers you need.
10//!
11//! * `json` (default): [`codec::JsonCodec`].
12//! * `msgpack`: [`codec::MsgpackCodec`].
13//! * `cbor`: [`codec::CborCodec`].
14//! * `memory`: [`memory::MemoryBroker`], an in-process broker usable in applications, prototypes
15//!   and tests.
16//! * `macros`: the `#[subscriber]`, [`#[ruststream::app]`](macro@app), and
17//!   [`#[derive(Message)]`](macro@Message) macros.
18//! * `asyncapi`: `AsyncAPI` document generation and the HTML viewer.
19//! * `metrics`: Prometheus metrics middleware and exporter.
20//! * `logging`: colored, `RUST_LOG`-driven console logging via `tracing-subscriber`
21//!   ([`logging::init`]). The generated `cli` `run` command installs it automatically.
22//! * `conformance`: the [`conformance::harness`] contract suite, per-capability suites in
23//!   [`conformance::capabilities`], and broker-agnostic [`conformance::helpers`] for application
24//!   tests. Generic over any broker's `TestClient`, so it pulls in no concrete broker (enable
25//!   `memory` too to run it against [`memory::MemoryBroker`]).
26//! * `cli`: the `ruststream` binary (`run`, `asyncapi gen`, `new`).
27//!
28//! Disable defaults (`default-features = false`) to drop the bundled JSON codec; the core traits,
29//! runtime, and dispatch remain. Add back only what you need.
30
31#![forbid(unsafe_code)]
32
33mod broker;
34mod buffered;
35mod capability;
36mod error;
37mod extensions;
38mod headers;
39mod message;
40mod publisher;
41mod schema;
42mod subscriber;
43mod subscription;
44pub mod testing;
45
46pub use broker::Broker;
47pub use buffered::{Buffered, BufferedSubscriber};
48pub use capability::{
49    BatchSubscriber, DescribeServer, Partitioned, RequestReply, ServerSpec, Subscribe,
50    TransactionalPublisher,
51};
52pub use error::AckError;
53pub use extensions::Extensions;
54pub use headers::Headers;
55pub use message::{IncomingMessage, OutgoingMessage, RawMessage};
56pub use publisher::Publisher;
57pub use schema::Message;
58pub use subscriber::Subscriber;
59pub use subscription::{Name, SubscriptionSource};
60
61pub mod codec;
62
63#[cfg(feature = "memory")]
64pub mod memory;
65
66pub mod runtime;
67
68pub use runtime::RustStream;
69
70/// Attribute macro that turns an `async fn` into a mountable subscriber definition.
71///
72/// Available with the `macros` feature. See [`ruststream_macros::subscriber`].
73#[cfg(feature = "macros")]
74pub use ruststream_macros::subscriber;
75
76/// Attribute macro that generates a `main` entry point from a `RustStream` builder function.
77///
78/// Available with the `macros` feature. See [`ruststream_macros::app`] and
79/// [`runtime::cli`].
80#[cfg(feature = "macros")]
81pub use ruststream_macros::app;
82
83/// Derive macro for [`Message`] metadata (type name + doc description).
84///
85/// Available with the `macros` feature.
86#[cfg(feature = "macros")]
87pub use ruststream_macros::Message;
88
89#[cfg(feature = "conformance")]
90pub mod conformance;
91
92#[cfg(feature = "asyncapi")]
93pub mod asyncapi;
94
95/// Re-export of [`schemars`] so message types can derive `JsonSchema` without a direct dependency.
96///
97/// Derive it on a message type (`#[derive(ruststream::schemars::JsonSchema)]`) and its payload
98/// schema is emitted into the generated [`AsyncAPI`](asyncapi) document. Available with the
99/// `asyncapi` feature.
100#[cfg(feature = "asyncapi")]
101pub use schemars;
102
103#[cfg(feature = "metrics")]
104pub mod metrics;
105
106#[cfg(feature = "logging")]
107pub mod logging;
108
109/// Implementation detail used by the `#[subscriber]` macro to capture a payload's JSON Schema.
110///
111/// Not part of the public API; no stability guarantees.
112#[doc(hidden)]
113pub mod __private {
114    use core::marker::PhantomData;
115
116    /// A type-carrying probe the macro reads a payload schema off.
117    ///
118    /// Schema selection uses inherent-vs-trait specialization (a stable-Rust trick): the schema
119    /// path is an inherent method on `Probe<T>` bounded by `T: JsonSchema`, and
120    /// [`NoSchemaProbe::schema_json`] is the trait fallback. Inherent methods win when present, so
121    /// `Probe::<T>::new().schema_json()` returns the schema for a concrete `T: JsonSchema` and
122    /// `None` otherwise - without forcing the bound onto every message type. The inherent method
123    /// exists only with the `asyncapi` feature.
124    #[derive(Debug)]
125    pub struct Probe<T>(pub PhantomData<T>);
126
127    impl<T> Probe<T> {
128        /// Constructs a probe for `T`.
129        #[must_use]
130        pub const fn new() -> Self {
131            Self(PhantomData)
132        }
133    }
134
135    impl<T> Default for Probe<T> {
136        fn default() -> Self {
137            Self::new()
138        }
139    }
140
141    /// The trait fallback: chosen for any `T` the inherent schema method does not cover.
142    pub trait NoSchemaProbe {
143        /// Returns `None` (no schema available for the probed type).
144        fn schema_json(&self) -> Option<String>;
145    }
146
147    impl<T> NoSchemaProbe for Probe<T> {
148        fn schema_json(&self) -> Option<String> {
149            None
150        }
151    }
152
153    #[cfg(feature = "asyncapi")]
154    impl<T: schemars::JsonSchema> Probe<T> {
155        /// Returns the serialized JSON Schema for `T` (inherent; preferred over the trait fallback).
156        #[must_use]
157        pub fn schema_json(&self) -> Option<String> {
158            serde_json::to_string(&schemars::schema_for!(T)).ok()
159        }
160    }
161
162    /// The trait fallback for [`Message`](crate::Message) metadata: chosen for any `T` the
163    /// inherent methods below do not cover.
164    pub trait NoMessageProbe {
165        /// Returns `None` (the probed type does not implement `Message`).
166        fn message_name(&self) -> Option<&'static str>;
167        /// Returns `None` (the probed type does not implement `Message`).
168        fn message_description(&self) -> Option<&'static str>;
169    }
170
171    impl<T> NoMessageProbe for Probe<T> {
172        fn message_name(&self) -> Option<&'static str> {
173            None
174        }
175
176        fn message_description(&self) -> Option<&'static str> {
177            None
178        }
179    }
180
181    impl<T: crate::Message> Probe<T> {
182        /// Returns [`Message::NAME`](crate::Message::NAME) for `T` (inherent; preferred over the
183        /// trait fallback).
184        #[must_use]
185        pub fn message_name(&self) -> Option<&'static str> {
186            Some(T::NAME)
187        }
188
189        /// Returns [`Message::DESCRIPTION`](crate::Message::DESCRIPTION) for `T` (inherent;
190        /// preferred over the trait fallback).
191        #[must_use]
192        pub fn message_description(&self) -> Option<&'static str> {
193            T::DESCRIPTION
194        }
195    }
196}