ruststream_fred/testing/
client.rs1use std::time::Duration;
4
5use ruststream::{Broker, OutgoingMessage, Publisher, RawMessage, testing::TestClient};
6
7use crate::{
8 error::RedisError,
9 testing::{
10 broker::RedisTestBroker, publisher::RedisTestPublisher, subscriber::RedisTestSubscriber,
11 },
12};
13
14pub struct RedisTestClient {
19 broker: RedisTestBroker,
20}
21
22impl std::fmt::Debug for RedisTestClient {
23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24 f.debug_struct("RedisTestClient")
25 .field("broker", &self.broker)
26 .finish()
27 }
28}
29
30impl TestClient for RedisTestClient {
31 type Broker = RedisTestBroker;
32 type Subscriber = RedisTestSubscriber;
33 type Publisher = RedisTestPublisher;
34 type Error = RedisError;
35
36 async fn start() -> Result<Self, Self::Error> {
37 Ok(Self {
38 broker: RedisTestBroker::new(),
39 })
40 }
41
42 fn broker(&self) -> &Self::Broker {
43 &self.broker
44 }
45
46 async fn publish(&self, topic: &str, payload: &[u8]) -> Result<(), Self::Error> {
47 let publisher = self.broker.publisher();
48 publisher
49 .publish(OutgoingMessage::new(topic, payload))
50 .await
51 }
52
53 async fn subscribe(&self, topic: &str) -> Result<RedisTestSubscriber, Self::Error> {
54 self.broker.subscribe(topic).await
55 }
56
57 async fn publisher(&self) -> Result<Self::Publisher, Self::Error> {
58 Ok(self.broker.publisher())
59 }
60
61 async fn expect_published(
62 &self,
63 topic: &str,
64 count: usize,
65 timeout_dur: Duration,
66 ) -> Result<Vec<RawMessage>, Self::Error> {
67 Ok(self
68 .broker
69 .state()
70 .router
71 .expect_published(topic, count, timeout_dur)
72 .await)
73 }
74
75 async fn shutdown(self) -> Result<(), Self::Error> {
76 <RedisTestBroker as Broker>::shutdown(&self.broker).await
77 }
78}