use std::{sync::Arc, time::Duration};
use ruststream::{Broker, DescribeServer, RawMessage, ServerSpec, Subscribe};
use crate::{
error::RedisError,
testing::{RedisTestPublisher, RedisTestSubscriber, router::KeyRouter},
};
/// State shared between a [`RedisTestBroker`] and the publisher / subscriber
/// handles it creates (each of them receives a clone of the same `Arc`).
///
/// `Debug` is derived instead of hand-written: for a single named field the
/// derive emits the same `debug_struct("TestBrokerState").field("router", ..)`
/// output, and it keeps covering the struct automatically if fields are added.
#[derive(Debug, Default)]
pub(crate) struct TestBrokerState {
    /// Router that the broker delegates subscribing, publish expectations and
    /// shutdown clearing to.
    pub(crate) router: KeyRouter,
}
/// Test broker living in the crate's `testing` module.
///
/// All state sits behind an [`Arc`], so cloning the broker is cheap and every
/// clone — as well as every publisher and subscriber created from it — shares
/// the same [`TestBrokerState`].
#[derive(Clone, Default, Debug)]
pub struct RedisTestBroker {
/// Shared state handed (by `Arc::clone`) to publishers and subscribers.
state: Arc<TestBrokerState>,
}
impl RedisTestBroker {
    /// Creates a broker backed by freshly default-initialised shared state.
    #[must_use]
    pub fn new() -> Self {
        Self {
            state: Arc::default(),
        }
    }

    /// Borrows the shared state handle for use by other parts of the crate.
    pub(crate) fn state(&self) -> &Arc<TestBrokerState> {
        &self.state
    }

    /// Registers a subscription for `key` on the shared router and wraps the
    /// result in a [`RedisTestSubscriber`].
    ///
    /// Nothing is awaited in the body; the function is `async` only so that
    /// its signature lines up with the real broker (see the lint `reason`).
    ///
    /// # Errors
    ///
    /// Returns [`RedisError::Subscribe`] when `key` is empty.
    #[allow(clippy::unused_async, reason = "API parity with RedisBroker::subscribe")]
    pub async fn subscribe(
        &self,
        key: impl Into<String>,
    ) -> Result<RedisTestSubscriber, RedisError> {
        let stream_key: String = key.into();

        // Reject invalid keys before touching the router.
        if let Err(cause) = validate_key(&stream_key) {
            return Err(RedisError::Subscribe(cause));
        }

        let (subscriber_id, requeue, receiver) = self.state.router.subscribe(stream_key);
        let shared = Arc::clone(&self.state);
        Ok(RedisTestSubscriber::new(
            shared,
            subscriber_id,
            receiver,
            requeue,
        ))
    }

    /// Creates a publisher that shares this broker's state.
    #[must_use]
    pub fn publisher(&self) -> RedisTestPublisher {
        let shared = Arc::clone(&self.state);
        RedisTestPublisher::new(shared)
    }

    /// Forwards to the router's `expect_published` with the same arguments and
    /// returns whatever messages it yields.
    ///
    /// NOTE(review): presumably this waits up to `timeout_dur` for `count`
    /// messages published on `key` — confirm against
    /// `KeyRouter::expect_published`, which defines the actual semantics.
    pub async fn expect_published(
        &self,
        key: &str,
        count: usize,
        timeout_dur: Duration,
    ) -> Vec<RawMessage> {
        let router = &self.state.router;
        router.expect_published(key, count, timeout_dur).await
    }
}
impl Broker for RedisTestBroker {
    type Error = RedisError;

    /// There is nothing to connect to; this always succeeds immediately.
    async fn connect(&self) -> Result<(), Self::Error> {
        Ok(())
    }

    /// Calls `clear` on the shared router, then reports success.
    ///
    /// Because the state is shared, the clear is visible to every clone of
    /// this broker and to every handle created from it.
    async fn shutdown(&self) -> Result<(), Self::Error> {
        let router = &self.state.router;
        router.clear();
        Ok(())
    }
}
// `clippy::use_self` would ask for `Self::subscribe` below. The concrete type
// name is kept — presumably to make it obvious that the call targets the
// inherent method rather than recursing into this trait method.
#[allow(clippy::use_self)]
impl Subscribe for RedisTestBroker {
type Subscriber = RedisTestSubscriber;
/// Trait entry point for subscribing by name; forwards to the inherent
/// `RedisTestBroker::subscribe`, so the same key validation and error
/// mapping apply.
async fn subscribe(&self, name: &str) -> Result<Self::Subscriber, Self::Error> {
// Type-qualified path: inherent associated functions take priority over
// trait methods of the same name, so this is not a recursive call.
RedisTestBroker::subscribe(self, name).await
}
}
/// Boxed, thread-safe error object used as the payload of key-validation
/// failures.
type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Checks that a stream key is acceptable.
///
/// The only rule enforced is that the key is not the empty string; any other
/// content (including whitespace) passes.
///
/// # Errors
///
/// Returns a boxed error with the message `stream key must be non-empty` when
/// `key` is empty.
fn validate_key(key: &str) -> Result<(), BoxError> {
    match key {
        "" => Err("stream key must be non-empty".into()),
        _ => Ok(()),
    }
}
/// Validates a key on the publish path.
///
/// Applies the same rule as `validate_key` and wraps any failure in
/// [`RedisError::Publish`].
///
/// # Errors
///
/// Returns [`RedisError::Publish`] when `key` fails validation.
pub(crate) fn validate_publish_key(key: &str) -> Result<(), RedisError> {
    match validate_key(key) {
        Ok(()) => Ok(()),
        Err(cause) => Err(RedisError::Publish(cause)),
    }
}
impl DescribeServer for RedisTestBroker {
/// Describes this broker with a fixed `ServerSpec` built from the literals
/// `"in-process"` and `"redis"`.
///
/// NOTE(review): the two arguments look like an address/location and a
/// protocol name — confirm against `ServerSpec::new`.
fn describe_server(&self) -> ServerSpec {
ServerSpec::new("in-process", "redis")
}
}