ruststream_fred/testing/
broker.rs1use std::{sync::Arc, time::Duration};
4
5use ruststream::{Broker, DescribeServer, RawMessage, ServerSpec, Subscribe};
6
7use crate::{
8 error::RedisError,
9 testing::{RedisTestPublisher, RedisTestSubscriber, router::KeyRouter},
10};
11
12#[derive(Default)]
18pub(crate) struct TestBrokerState {
19 pub(crate) router: KeyRouter,
20}
21
22impl std::fmt::Debug for TestBrokerState {
23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24 f.debug_struct("TestBrokerState")
25 .field("router", &self.router)
26 .finish()
27 }
28}
29
30#[derive(Clone, Default, Debug)]
40pub struct RedisTestBroker {
41 state: Arc<TestBrokerState>,
42}
43
44impl RedisTestBroker {
45 #[must_use]
47 pub fn new() -> Self {
48 Self::default()
49 }
50
51 pub(crate) fn state(&self) -> &Arc<TestBrokerState> {
52 &self.state
53 }
54
55 #[allow(
63 clippy::unused_async,
64 reason = "API parity with RedisBroker::subscribe"
65 )]
66 pub async fn subscribe(
67 &self,
68 key: impl Into<String>,
69 ) -> Result<RedisTestSubscriber, RedisError> {
70 let key = key.into();
71 validate_key(&key).map_err(RedisError::Subscribe)?;
72 let (id, requeue, rx) = self.state.router.subscribe(key);
73 Ok(RedisTestSubscriber::new(
74 Arc::clone(&self.state),
75 id,
76 rx,
77 requeue,
78 ))
79 }
80
81 #[must_use]
83 pub fn publisher(&self) -> RedisTestPublisher {
84 RedisTestPublisher::new(Arc::clone(&self.state))
85 }
86
87 pub async fn expect_published(
91 &self,
92 key: &str,
93 count: usize,
94 timeout_dur: Duration,
95 ) -> Vec<RawMessage> {
96 self.state
97 .router
98 .expect_published(key, count, timeout_dur)
99 .await
100 }
101}
102
103impl Broker for RedisTestBroker {
104 type Error = RedisError;
105
106 async fn connect(&self) -> Result<(), Self::Error> {
107 Ok(())
108 }
109
110 async fn shutdown(&self) -> Result<(), Self::Error> {
111 self.state.router.clear();
112 Ok(())
113 }
114}
115
116#[allow(clippy::use_self)]
117impl Subscribe for RedisTestBroker {
118 type Subscriber = RedisTestSubscriber;
119
120 async fn subscribe(&self, name: &str) -> Result<Self::Subscriber, Self::Error> {
121 RedisTestBroker::subscribe(self, name).await
122 }
123}
124
125type BoxError = Box<dyn std::error::Error + Send + Sync>;
126
127fn validate_key(key: &str) -> Result<(), BoxError> {
129 if key.is_empty() {
130 return Err("stream key must be non-empty".into());
131 }
132 Ok(())
133}
134
135pub(crate) fn validate_publish_key(key: &str) -> Result<(), RedisError> {
137 validate_key(key).map_err(RedisError::Publish)
138}
139
140impl DescribeServer for RedisTestBroker {
141 fn describe_server(&self) -> ServerSpec {
142 ServerSpec::new("in-process", "redis")
144 }
145}