ruststream/lib.rs
1//! Rust core of the [`RustStream`](https://github.com/powersemmi/ruststream) messaging
2//! framework: broker-agnostic traits, message types, codecs, router runtime, and a
3//! conformance harness for broker authors.
4//!
5//! # Cargo features
6//!
7//! The core traits, the [`runtime::RustStream`] application object, middleware, and dispatch are
8//! always present. The rest is additive and opt-in. Codec features are mutually compatible and
9//! enable only the deserializers you need.
10//!
11//! * `json` (default): [`codec::JsonCodec`].
12//! * `msgpack`: [`codec::MsgpackCodec`].
13//! * `cbor`: [`codec::CborCodec`].
14//! * `memory`: [`memory::MemoryBroker`], an in-process broker usable in applications, prototypes
15//! and tests.
16//! * `macros`: the `#[subscriber]`, [`#[ruststream::app]`](macro@app), and
17//! [`#[derive(Message)]`](macro@Message) macros.
18//! * `asyncapi`: `AsyncAPI` document generation and the HTML viewer.
19//! * `metrics`: Prometheus metrics middleware and exporter.
20//! * `logging`: colored, `RUST_LOG`-driven console logging via `tracing-subscriber`
21//! ([`logging::init`]). The generated `cli` `run` command installs it automatically.
22//! * `conformance`: the [`conformance::harness`] contract suite, per-capability suites in
23//! [`conformance::capabilities`], and broker-agnostic [`conformance::helpers`] for application
24//! tests. Generic over any broker's `TestClient`, so it pulls in no concrete broker (enable
25//! `memory` too to run it against [`memory::MemoryBroker`]).
26//! * `cli`: the `ruststream` binary (`run`, `asyncapi gen`, `new`).
27//!
28//! Disable defaults (`default-features = false`) to drop the bundled JSON codec; the core traits,
29//! runtime, and dispatch remain. Add back only what you need.
30
31#![forbid(unsafe_code)]
32
33mod broker;
34mod buffered;
35mod capability;
36mod error;
37mod extensions;
38mod headers;
39mod message;
40mod publisher;
41mod schema;
42mod subscriber;
43mod subscription;
44pub mod testing;
45
46pub use broker::Broker;
47pub use buffered::{Buffered, BufferedSubscriber};
48pub use capability::{
49 BatchSubscriber, DescribeServer, Partitioned, RequestReply, ServerSpec, Subscribe,
50 TransactionalPublisher,
51};
52pub use error::AckError;
53pub use extensions::Extensions;
54pub use headers::Headers;
55pub use message::{IncomingMessage, OutgoingMessage, RawMessage};
56pub use publisher::Publisher;
57pub use schema::Message;
58pub use subscriber::Subscriber;
59pub use subscription::{Name, SubscriptionSource};
60
61pub mod codec;
62
63#[cfg(feature = "memory")]
64pub mod memory;
65
66pub mod runtime;
67
68pub use runtime::RustStream;
69
70/// Attribute macro that turns an `async fn` into a mountable subscriber definition.
71///
72/// Available with the `macros` feature. See [`ruststream_macros::subscriber`].
73#[cfg(feature = "macros")]
74pub use ruststream_macros::subscriber;
75
76/// Attribute macro that generates a `main` entry point from a `RustStream` builder function.
77///
78/// Available with the `macros` feature. See [`ruststream_macros::app`] and
79/// [`runtime::cli`].
80#[cfg(feature = "macros")]
81pub use ruststream_macros::app;
82
83/// Derive macro for [`Message`] metadata (type name + doc description).
84///
85/// Available with the `macros` feature.
86#[cfg(feature = "macros")]
87pub use ruststream_macros::Message;
88
89#[cfg(feature = "conformance")]
90pub mod conformance;
91
92#[cfg(feature = "asyncapi")]
93pub mod asyncapi;
94
95/// Re-export of [`schemars`] so message types can derive `JsonSchema` without a direct dependency.
96///
97/// Derive it on a message type (`#[derive(ruststream::schemars::JsonSchema)]`) and its payload
98/// schema is emitted into the generated [`AsyncAPI`](asyncapi) document. Available with the
99/// `asyncapi` feature.
100#[cfg(feature = "asyncapi")]
101pub use schemars;
102
103#[cfg(feature = "metrics")]
104pub mod metrics;
105
106#[cfg(feature = "logging")]
107pub mod logging;
108
109/// Implementation detail used by the `#[subscriber]` macro to capture a payload's JSON Schema.
110///
111/// Not part of the public API; no stability guarantees.
112#[doc(hidden)]
113pub mod __private {
114 use core::marker::PhantomData;
115
116 /// A type-carrying probe the macro reads a payload schema off.
117 ///
118 /// Schema selection uses inherent-vs-trait specialization (a stable-Rust trick): the schema
119 /// path is an inherent method on `Probe<T>` bounded by `T: JsonSchema`, and
120 /// [`NoSchemaProbe::schema_json`] is the trait fallback. Inherent methods win when present, so
121 /// `Probe::<T>::new().schema_json()` returns the schema for a concrete `T: JsonSchema` and
122 /// `None` otherwise - without forcing the bound onto every message type. The inherent method
123 /// exists only with the `asyncapi` feature.
124 #[derive(Debug)]
125 pub struct Probe<T>(pub PhantomData<T>);
126
127 impl<T> Probe<T> {
128 /// Constructs a probe for `T`.
129 #[must_use]
130 pub const fn new() -> Self {
131 Self(PhantomData)
132 }
133 }
134
135 impl<T> Default for Probe<T> {
136 fn default() -> Self {
137 Self::new()
138 }
139 }
140
141 /// The trait fallback: chosen for any `T` the inherent schema method does not cover.
142 pub trait NoSchemaProbe {
143 /// Returns `None` (no schema available for the probed type).
144 fn schema_json(&self) -> Option<String>;
145 }
146
147 impl<T> NoSchemaProbe for Probe<T> {
148 fn schema_json(&self) -> Option<String> {
149 None
150 }
151 }
152
153 #[cfg(feature = "asyncapi")]
154 impl<T: schemars::JsonSchema> Probe<T> {
155 /// Returns the serialized JSON Schema for `T` (inherent; preferred over the trait fallback).
156 #[must_use]
157 pub fn schema_json(&self) -> Option<String> {
158 serde_json::to_string(&schemars::schema_for!(T)).ok()
159 }
160 }
161
162 /// The trait fallback for [`Message`](crate::Message) metadata: chosen for any `T` the
163 /// inherent methods below do not cover.
164 pub trait NoMessageProbe {
165 /// Returns `None` (the probed type does not implement `Message`).
166 fn message_name(&self) -> Option<&'static str>;
167 /// Returns `None` (the probed type does not implement `Message`).
168 fn message_description(&self) -> Option<&'static str>;
169 }
170
171 impl<T> NoMessageProbe for Probe<T> {
172 fn message_name(&self) -> Option<&'static str> {
173 None
174 }
175
176 fn message_description(&self) -> Option<&'static str> {
177 None
178 }
179 }
180
181 impl<T: crate::Message> Probe<T> {
182 /// Returns [`Message::NAME`](crate::Message::NAME) for `T` (inherent; preferred over the
183 /// trait fallback).
184 #[must_use]
185 pub fn message_name(&self) -> Option<&'static str> {
186 Some(T::NAME)
187 }
188
189 /// Returns [`Message::DESCRIPTION`](crate::Message::DESCRIPTION) for `T` (inherent;
190 /// preferred over the trait fallback).
191 #[must_use]
192 pub fn message_description(&self) -> Option<&'static str> {
193 T::DESCRIPTION
194 }
195 }
196}