use std::sync::{Arc, OnceLock};
use bytes::Bytes;
use ruststream::{
Broker, DescribeServer, OutgoingMessage, RawMessage, ServerSpec, Subscribe,
testing::{Coordinator, TestableBroker},
};
use crate::{
error::RedisError,
testing::{RedisTestPublisher, RedisTestSubscriber, router::KeyRouter},
};
/// State held behind an `Arc` by `RedisTestBroker` and handed (as a cloned
/// `Arc`) to the subscribers and publishers that broker creates.
#[derive(Default)]
pub(crate) struct TestBrokerState {
/// Router through which subscriptions, publishes and published-message
/// lookups for this broker are made.
pub(crate) router: KeyRouter,
/// Write-once slot for the test coordinator; stays empty until
/// `install_coordinator` is called.
coordinator: OnceLock<Coordinator>,
}
impl TestBrokerState {
    /// Stores `coordinator` in the write-once slot.
    ///
    /// Only the first call takes effect: the slot is a [`OnceLock`], so the
    /// value passed to any later call is discarded and no error is reported.
    pub(crate) fn install_coordinator(&self, coordinator: Coordinator) {
        // `set` hands the value back as `Err` when the slot is already filled;
        // that rejection is deliberately ignored (first installation wins).
        self.coordinator.set(coordinator).ok();
    }

    /// Returns a clone of the installed coordinator, or `None` when none has
    /// been installed yet.
    pub(crate) fn coordinator(&self) -> Option<Coordinator> {
        let installed = self.coordinator.get();
        installed.cloned()
    }
}
impl std::fmt::Debug for TestBrokerState {
    /// Writes the struct name and the `router` field only.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("TestBrokerState");
        out.field("router", &self.router);
        // `coordinator` is left out; the non-exhaustive finish marks the
        // output as having omitted fields.
        out.finish_non_exhaustive()
    }
}
/// In-process test broker whose subscribe and publish paths go through a
/// shared `TestBrokerState` rather than a network connection.
///
/// Cloning is cheap and every clone shares the same underlying state, because
/// the only field is an `Arc`.
#[derive(Clone, Default, Debug)]
pub struct RedisTestBroker {
/// State shared with all clones of this broker and with the subscribers and
/// publishers it creates.
state: Arc<TestBrokerState>,
}
impl RedisTestBroker {
    /// Creates a broker backed by fresh, default-initialised state.
    #[must_use]
    pub fn new() -> Self {
        Self {
            state: Arc::default(),
        }
    }

    /// Registers a subscription for `key` with the router and returns the
    /// subscriber bound to it.
    ///
    /// The function never awaits; it is `async` only so that its signature
    /// lines up with `RedisBroker::subscribe`.
    ///
    /// # Errors
    ///
    /// Returns [`RedisError::Subscribe`] when `key` is empty.
    #[allow(
        clippy::unused_async,
        reason = "API parity with RedisBroker::subscribe"
    )]
    pub async fn subscribe(
        &self,
        key: impl Into<String>,
    ) -> Result<RedisTestSubscriber, RedisError> {
        let stream_key: String = key.into();
        // Reject invalid keys before touching the router.
        if let Err(reason) = validate_key(&stream_key) {
            return Err(RedisError::Subscribe(reason));
        }

        let (subscription_id, requeue, receiver) = self.state.router.subscribe(stream_key);
        let shared = Arc::clone(&self.state);
        Ok(RedisTestSubscriber::new(
            shared,
            subscription_id,
            receiver,
            requeue,
        ))
    }

    /// Returns a publisher that shares this broker's state.
    #[must_use]
    pub fn publisher(&self) -> RedisTestPublisher {
        let shared = Arc::clone(&self.state);
        RedisTestPublisher::new(shared)
    }
}
impl Broker for RedisTestBroker {
    type Error = RedisError;

    /// Does nothing and always succeeds.
    async fn connect(&self) -> Result<(), Self::Error> {
        Ok(())
    }

    /// Clears the shared router, then reports success.
    async fn shutdown(&self) -> Result<(), Self::Error> {
        let router = &self.state.router;
        router.clear();
        Ok(())
    }
}
// The trait method and the inherent method share the name `subscribe`; the
// call below spells out the type name instead of `Self`, hence the allow.
#[allow(clippy::use_self)]
impl Subscribe for RedisTestBroker {
    type Subscriber = RedisTestSubscriber;

    /// Forwards to the inherent [`RedisTestBroker::subscribe`], using `name`
    /// as the key.
    async fn subscribe(&self, name: &str) -> Result<Self::Subscriber, Self::Error> {
        let pending = RedisTestBroker::subscribe(self, name);
        pending.await
    }
}
impl TestableBroker for RedisTestBroker {
fn install_coordinator(&self, coordinator: Coordinator) {
self.state.install_coordinator(coordinator);
}
fn inject(&self, message: OutgoingMessage<'_>) {
self.state.router.publish(
message.name().to_owned(),
Bytes::copy_from_slice(message.payload()),
message.headers().clone(),
self.state.coordinator().as_ref(),
);
}
fn published(&self, name: &str) -> Vec<RawMessage> {
self.state.router.published(name)
}
}
// Passes `RedisTestBroker` to ruststream's `register_testable_broker!` macro.
// NOTE(review): the expansion is not visible here; presumably it registers
// the type with ruststream's testing support; confirm against the macro.
ruststream::register_testable_broker!(RedisTestBroker);
/// Boxed, `Send + Sync` error object used as the failure type of key
/// validation.
type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Checks that `key` can be used as a stream key.
///
/// The only rule enforced is that the key is not the empty string.
///
/// # Errors
///
/// Returns a boxed error with the message `stream key must be non-empty`
/// when `key` is empty.
fn validate_key(key: &str) -> Result<(), BoxError> {
    match key {
        "" => Err("stream key must be non-empty".into()),
        _ => Ok(()),
    }
}
/// Validates `key` for publishing.
///
/// # Errors
///
/// Returns [`RedisError::Publish`] wrapping the validation failure when
/// `validate_key` rejects `key`.
pub(crate) fn validate_publish_key(key: &str) -> Result<(), RedisError> {
    match validate_key(key) {
        Ok(()) => Ok(()),
        Err(reason) => Err(RedisError::Publish(reason)),
    }
}
impl DescribeServer for RedisTestBroker {
    /// Describes this broker as an in-process server, passing `"redis"` to
    /// [`ServerSpec::in_process`].
    fn describe_server(&self) -> ServerSpec {
        const SERVER_LABEL: &str = "redis";
        ServerSpec::in_process(SERVER_LABEL)
    }
}