ruststream_fred/testing/
broker.rs1use std::sync::{Arc, OnceLock};
7
8use bytes::Bytes;
9use ruststream::{
10 Broker, DescribeServer, OutgoingMessage, RawMessage, ServerSpec, Subscribe,
11 testing::{Coordinator, TestableBroker},
12};
13
14use crate::{
15 error::RedisError,
16 testing::{RedisTestPublisher, RedisTestSubscriber, router::KeyRouter},
17};
18
19#[derive(Default)]
25pub(crate) struct TestBrokerState {
26 pub(crate) router: KeyRouter,
27 coordinator: OnceLock<Coordinator>,
31}
32
33impl TestBrokerState {
34 pub(crate) fn install_coordinator(&self, coordinator: Coordinator) {
37 let _ = self.coordinator.set(coordinator);
38 }
39
40 pub(crate) fn coordinator(&self) -> Option<Coordinator> {
43 self.coordinator.get().cloned()
44 }
45}
46
47impl std::fmt::Debug for TestBrokerState {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 f.debug_struct("TestBrokerState")
50 .field("router", &self.router)
51 .finish_non_exhaustive()
52 }
53}
54
55#[derive(Clone, Default, Debug)]
65pub struct RedisTestBroker {
66 state: Arc<TestBrokerState>,
67}
68
69impl RedisTestBroker {
70 #[must_use]
72 pub fn new() -> Self {
73 Self::default()
74 }
75
76 #[allow(
84 clippy::unused_async,
85 reason = "API parity with RedisBroker::subscribe"
86 )]
87 pub async fn subscribe(
88 &self,
89 key: impl Into<String>,
90 ) -> Result<RedisTestSubscriber, RedisError> {
91 let key = key.into();
92 validate_key(&key).map_err(RedisError::Subscribe)?;
93 let (id, requeue, rx) = self.state.router.subscribe(key);
94 Ok(RedisTestSubscriber::new(
95 Arc::clone(&self.state),
96 id,
97 rx,
98 requeue,
99 ))
100 }
101
102 #[must_use]
104 pub fn publisher(&self) -> RedisTestPublisher {
105 RedisTestPublisher::new(Arc::clone(&self.state))
106 }
107}
108
109impl Broker for RedisTestBroker {
110 type Error = RedisError;
111
112 async fn connect(&self) -> Result<(), Self::Error> {
113 Ok(())
114 }
115
116 async fn shutdown(&self) -> Result<(), Self::Error> {
117 self.state.router.clear();
118 Ok(())
119 }
120}
121
122#[allow(clippy::use_self)]
123impl Subscribe for RedisTestBroker {
124 type Subscriber = RedisTestSubscriber;
125
126 async fn subscribe(&self, name: &str) -> Result<Self::Subscriber, Self::Error> {
127 RedisTestBroker::subscribe(self, name).await
128 }
129}
130
131impl TestableBroker for RedisTestBroker {
133 fn install_coordinator(&self, coordinator: Coordinator) {
134 self.state.install_coordinator(coordinator);
135 }
136
137 fn inject(&self, message: OutgoingMessage<'_>) {
138 self.state.router.publish(
142 message.name().to_owned(),
143 Bytes::copy_from_slice(message.payload()),
144 message.headers().clone(),
145 self.state.coordinator().as_ref(),
146 );
147 }
148
149 fn published(&self, name: &str) -> Vec<RawMessage> {
150 self.state.router.published(name)
151 }
152}
153
154ruststream::register_testable_broker!(RedisTestBroker);
/// Boxed, thread-safe error type carried by key-validation failures.
type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Checks that `key` is acceptable as a stream key.
///
/// # Errors
///
/// Returns a boxed error with the message `stream key must be non-empty`
/// when `key` is the empty string. Every other key is accepted.
fn validate_key(key: &str) -> Result<(), BoxError> {
    match key {
        "" => Err("stream key must be non-empty".into()),
        _ => Ok(()),
    }
}
166
167pub(crate) fn validate_publish_key(key: &str) -> Result<(), RedisError> {
169 validate_key(key).map_err(RedisError::Publish)
170}
171
172impl DescribeServer for RedisTestBroker {
173 fn describe_server(&self) -> ServerSpec {
174 ServerSpec::in_process("redis")
176 }
177}