//! The [`Broker`] trait: the entry point of any broker implementation.
use ;
/// A connection to a message broker, owning its lifecycle.
///
/// `Broker` is the entry point of any broker crate (`ruststream-nats`, `ruststream-kafka`, ...).
/// It owns only the connection lifecycle: implementations establish their network connection in
/// [`connect`] and release all resources in [`shutdown`]. Subscribing is described separately by a
/// [`SubscriptionSource`](crate::SubscriptionSource) (or the [`Subscribe`](crate::Subscribe)
/// capability for the by-name case), so a single broker can offer several subscription kinds with
/// different subscriber types (e.g. Redis pub/sub vs streams vs lists). Publishers are likewise
/// produced by broker-specific constructors and registered on the app.
///
/// `Send + Sync` is required so the router can share the broker handle across tasks.
///
/// # Lazy startup contract
///
/// Implementations MUST be constructible **synchronously**, without performing I/O: expose a plain
/// `new(..)` constructor that only captures configuration (addresses, credentials). All network
/// setup happens in [`connect`], which the runtime calls once at startup, after the synchronous
/// `#[ruststream::app]` builder has run. This is what lets a service be assembled with the app
/// macro regardless of broker. A broker that can only be built by connecting (an `async` "connect
/// and return the handle" constructor) does not satisfy this contract. Each broker also ships a
/// [`SubscriptionSource`](crate::SubscriptionSource) for its subjects, resolved after `connect`.
/// [`conformance::harness::lifecycle`](crate::conformance::harness::lifecycle) checks the whole
/// path: synchronous construction, `connect`, subscribe through the source, deliver, ack, shutdown.
///
/// # Examples
///
/// ```
/// use ruststream::Broker;
///
/// async fn lifecycle<B: Broker>(broker: &B) -> Result<(), B::Error> {
///     broker.connect().await?;
///     broker.shutdown().await
/// }
/// ```
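///
/// The lazy startup contract can also be illustrated without a real broker. The following is a
/// minimal sketch under stated assumptions, not code from any broker crate: `DemoBroker`,
/// `DemoError`, `startup` and the `block_on` helper are hypothetical names, `DemoBroker` does not
/// implement this trait, and `block_on` merely stands in for the runtime that would normally
/// call [`connect`].
///
/// ```rust
/// use std::future::Future;
/// use std::pin::pin;
/// use std::sync::atomic::{AtomicBool, Ordering};
/// use std::sync::Arc;
/// use std::task::{Context, Poll, Wake, Waker};
///
/// // Hypothetical broker for illustration only.
/// struct DemoBroker {
///     addr: String,
///     connected: AtomicBool,
/// }
///
/// // Hypothetical error type for the sketch.
/// #[derive(Debug, PartialEq)]
/// enum DemoError {
///     EmptyAddress,
/// }
///
/// impl DemoBroker {
///     // Synchronous and I/O-free: only captures configuration.
///     fn new(addr: &str) -> Self {
///         Self { addr: addr.to_owned(), connected: AtomicBool::new(false) }
///     }
///
///     // Stands in for `connect`; a real broker would open its network connection here.
///     async fn connect(&self) -> Result<(), DemoError> {
///         if self.addr.is_empty() {
///             return Err(DemoError::EmptyAddress);
///         }
///         self.connected.store(true, Ordering::SeqCst);
///         Ok(())
///     }
///
///     fn is_connected(&self) -> bool {
///         self.connected.load(Ordering::SeqCst)
///     }
/// }
///
/// // Waker that does nothing; sufficient for futures that never return `Pending`.
/// struct NoopWake;
///
/// impl Wake for NoopWake {
///     fn wake(self: Arc<Self>) {}
/// }
///
/// // Tiny stand-in for the runtime: polls the future until it is ready.
/// fn block_on<F: Future>(fut: F) -> F::Output {
///     let waker = Waker::from(Arc::new(NoopWake));
///     let mut cx = Context::from_waker(&waker);
///     let mut fut = pin!(fut);
///     loop {
///         if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
///             return out;
///         }
///     }
/// }
///
/// // Builds synchronously, then connects; returns the connected flag before and after.
/// fn startup() -> Result<(bool, bool), DemoError> {
///     let broker = DemoBroker::new("nats://localhost:4222"); // no I/O happens here
///     let before = broker.is_connected();
///     block_on(broker.connect())?;
///     Ok((before, broker.is_connected()))
/// }
/// ```
///
/// Because `new` performs no I/O, a type shaped like `DemoBroker` can be built inside a
/// synchronous builder and connected later by whatever drives its futures.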
///
/// [`connect`]: Self::connect
/// [`shutdown`]: Self::shutdown