Skip to main content

ruststream_fred/testing/
client.rs

1//! [`RedisTestClient`]: `TestClient` driver consumed by the conformance harness.
2
3use std::time::Duration;
4
5use ruststream::{Broker, OutgoingMessage, Publisher, RawMessage, testing::TestClient};
6
7use crate::{
8    error::RedisError,
9    testing::{
10        broker::RedisTestBroker, publisher::RedisTestPublisher, subscriber::RedisTestSubscriber,
11    },
12};
13
14/// Driver around a single [`RedisTestBroker`] instance.
15///
16/// `RedisTestClient::start()` constructs a fresh, isolated broker. Use it as the entry point in the
17/// `ruststream::conformance` harness and in handler integration tests.
18pub struct RedisTestClient {
19    broker: RedisTestBroker,
20}
21
22impl std::fmt::Debug for RedisTestClient {
23    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24        f.debug_struct("RedisTestClient")
25            .field("broker", &self.broker)
26            .finish()
27    }
28}
29
30impl TestClient for RedisTestClient {
31    type Broker = RedisTestBroker;
32    type Subscriber = RedisTestSubscriber;
33    type Publisher = RedisTestPublisher;
34    type Error = RedisError;
35
36    async fn start() -> Result<Self, Self::Error> {
37        Ok(Self {
38            broker: RedisTestBroker::new(),
39        })
40    }
41
42    fn broker(&self) -> &Self::Broker {
43        &self.broker
44    }
45
46    async fn publish(&self, topic: &str, payload: &[u8]) -> Result<(), Self::Error> {
47        let publisher = self.broker.publisher();
48        publisher
49            .publish(OutgoingMessage::new(topic, payload))
50            .await
51    }
52
53    async fn subscribe(&self, topic: &str) -> Result<RedisTestSubscriber, Self::Error> {
54        self.broker.subscribe(topic).await
55    }
56
57    async fn publisher(&self) -> Result<Self::Publisher, Self::Error> {
58        Ok(self.broker.publisher())
59    }
60
61    async fn expect_published(
62        &self,
63        topic: &str,
64        count: usize,
65        timeout_dur: Duration,
66    ) -> Result<Vec<RawMessage>, Self::Error> {
67        Ok(self
68            .broker
69            .state()
70            .router
71            .expect_published(topic, count, timeout_dur)
72            .await)
73    }
74
75    async fn shutdown(self) -> Result<(), Self::Error> {
76        <RedisTestBroker as Broker>::shutdown(&self.broker).await
77    }
78}