use std::{sync::Arc, time::Duration};
use ruststream::{Broker, RawMessage};
use crate::{
error::NatsError,
subscribe_options::SubscribeOptions,
testing::{
NatsTestPublisher, NatsTestSubscriber,
router::SubjectRouter,
subject::{SubjectPattern, validate_concrete_subject},
},
};
/// State held behind an `Arc` by [`NatsTestBroker`] and cloned into the
/// publishers and subscribers that the broker hands out.
#[derive(Default)]
pub(crate) struct TestBrokerState {
/// Router used for subscribing, awaiting published messages, and clearing
/// on shutdown.
pub(crate) router: SubjectRouter,
}
/// Debug rendering that lists the `router` field under the struct name.
impl std::fmt::Debug for TestBrokerState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("TestBrokerState");
        builder.field("router", &self.router);
        builder.finish()
    }
}
/// Test broker handle.
///
/// Cloning is cheap: the state lives behind an `Arc`, so every clone refers
/// to the same [`TestBrokerState`].
#[derive(Clone, Default, Debug)]
pub struct NatsTestBroker {
/// Shared state; also handed to the subscribers and publishers created
/// from this broker.
state: Arc<TestBrokerState>,
}
impl NatsTestBroker {
    /// Creates a broker with default (empty) state; equivalent to
    /// `NatsTestBroker::default()`.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Borrows the shared state handle.
    pub(crate) fn state(&self) -> &Arc<TestBrokerState> {
        &self.state
    }

    /// Registers a subscription described by `opts` and returns the
    /// subscriber bound to it.
    ///
    /// # Errors
    ///
    /// Propagates the error from `opts.validate()`, and returns
    /// [`NatsError::Subscribe`] wrapping the parse error when the subject
    /// cannot be parsed as a [`SubjectPattern`].
    #[allow(clippy::unused_async, reason = "API parity with NatsBroker::subscribe")]
    pub async fn subscribe(&self, opts: SubscribeOptions) -> Result<NatsTestSubscriber, NatsError> {
        opts.validate()?;

        let pattern = match SubjectPattern::parse(opts.subject()) {
            Ok(parsed) => parsed,
            Err(err) => {
                let boxed: Box<dyn std::error::Error + Send + Sync> = Box::new(err);
                return Err(NatsError::Subscribe(boxed));
            }
        };

        let (id, requeue, rx) = self.state.router.subscribe(pattern);
        let shared = Arc::clone(&self.state);
        Ok(NatsTestSubscriber::new(shared, id, rx, requeue))
    }

    /// Builds a publisher that shares this broker's state.
    #[must_use]
    pub fn publisher(&self) -> NatsTestPublisher {
        let shared = Arc::clone(&self.state);
        NatsTestPublisher::new(shared)
    }

    /// Forwards to the router's `expect_published` with the given `topic`,
    /// `count`, and `timeout_dur`, returning whatever messages it yields.
    ///
    /// NOTE(review): the exact waiting/timeout semantics are defined by
    /// `SubjectRouter::expect_published` — confirm there.
    pub async fn expect_published(
        &self,
        topic: &str,
        count: usize,
        timeout_dur: Duration,
    ) -> Vec<RawMessage> {
        let router = &self.state.router;
        router.expect_published(topic, count, timeout_dur).await
    }
}
/// [`Broker`] implementation backed by the shared test state.
impl Broker for NatsTestBroker {
    type Subscriber = NatsTestSubscriber;
    type Publisher = NatsTestPublisher;
    type Error = NatsError;

    /// No-op: always succeeds without doing any work.
    async fn connect(&self) -> Result<(), Self::Error> {
        Ok(())
    }

    /// Calls `clear` on the shared router, then reports success.
    async fn shutdown(&self) -> Result<(), Self::Error> {
        let router = &self.state.router;
        router.clear();
        Ok(())
    }
}
/// Checks `subject` with `validate_concrete_subject`.
///
/// # Errors
///
/// Returns [`NatsError::Publish`] wrapping the validation error when the
/// subject is rejected.
pub(crate) fn validate_publish_subject(subject: &str) -> Result<(), NatsError> {
    if let Err(err) = validate_concrete_subject(subject) {
        let boxed: Box<dyn std::error::Error + Send + Sync> = Box::new(err);
        return Err(NatsError::Publish(boxed));
    }
    Ok(())
}