//! The [`TestableBroker`] contract: the single test interface a broker implements so its in-process
//! routing works with both the [`TestApp`](super::TestApp) harness and the
//! [`conformance`](crate::conformance) suite.
use Duration;
use crate::;
use Coordinator;
/// A broker whose in-process routing can be driven by the test tooling.
///
/// A broker crate ships an in-process transport - a normal [`Broker`](crate::Broker) that routes in
/// memory with no server, emulating the broker's Core routing - and implements `TestableBroker` on
/// it. That one type then works with both the [`TestApp`](super::TestApp) harness (application
/// unit tests) and [`conformance::harness::run_suite`](crate::conformance::harness::run_suite)
/// (routing self-check).
///
/// To plug into the harness, the broker also:
/// - calls [`Coordinator::enqueued`] on every live enqueue into a subscriber and
///   [`Coordinator::consumed`] when a delivery is acked, nacked, or dropped (so the harness can tell
///   when the reaction has settled), and routes delayed redeliveries through
///   [`Coordinator::schedule_redelivery`];
/// - registers its concrete type with [`register_testable_broker!`](crate::register_testable_broker)
///   so the harness can recover it from the type-erased app.
///
/// [`MemoryBroker`](crate::memory::MemoryBroker) is the in-tree reference implementation.
///
/// `TestableBroker` is a separate, object-safe capability (not a [`Broker`](crate::Broker)
/// supertrait, since `Broker` is not dyn-compatible), so the harness can hold `&dyn TestableBroker`
/// recovered from the type-erased app.
///
/// # Examples
///
/// ```
/// # #[cfg(feature = "memory")]
/// # {
/// use ruststream::memory::MemoryBroker;
/// use ruststream::testing::TestableBroker;
///
/// fn published<B: TestableBroker>(broker: &B, name: &str) -> usize {
///     broker.published(name).len()
/// }
///
/// assert_eq!(published(&MemoryBroker::new(), "orders"), 0);
/// # }
/// ```
/// A downcaster from a type-erased broker to a concrete [`TestableBroker`].
///
/// Submitted once per broker type with
/// [`register_testable_broker!`](crate::register_testable_broker); the harness iterates the
/// registered downcasters to recover each broker's transport from the built app.
collect!;
/// Registers a concrete [`TestableBroker`] type for harness recovery from a built application.
///
/// Call it once at broker-crate scope (under the crate's `testing` feature) so
/// [`TestApp`](crate::testing::TestApp) can recover that broker from the type-erased app.
///
/// # Examples
///
/// ```
/// # #[cfg(feature = "memory")]
/// # {
/// use ruststream::memory::MemoryBroker;
/// // The in-tree `MemoryBroker` is already registered; a broker crate registers its own type:
/// // ruststream::register_testable_broker!(MyTestBroker);
/// # let _ = MemoryBroker::new();
/// # }
/// ```
/// Waits until at least `count` messages have been published to `name` on `broker`.
///
/// Returns all observed messages; on timeout it returns those seen so far (so assert on the returned
/// messages, not just on length). For application tests prefer [`TestApp`](super::TestApp), which
/// drives to quiescence without polling; this helper is for tests that run a service via
/// [`run_until`](crate::runtime::RustStream::run_until) directly.
///
/// # Examples
///
/// ```
/// # #[cfg(all(feature = "memory", feature = "json"))]
/// # async fn demo() {
/// use std::time::Duration;
/// use ruststream::memory::MemoryBroker;
/// use ruststream::testing::{TestableBroker, expect_published};
/// use ruststream::OutgoingMessage;
///
/// let broker = MemoryBroker::new();
/// broker.inject(OutgoingMessage::new("out", b"x".as_slice()));
/// let seen = expect_published(&broker, "out", 1, Duration::from_secs(1)).await;
/// assert_eq!(seen.len(), 1);
/// # }
/// ```
pub async