use std::sync::{Arc, OnceLock};
use bytes::Bytes;
use ruststream::{
Broker, DescribeServer, OutgoingMessage, RawMessage, ServerSpec, Subscribe,
testing::{Coordinator, TestableBroker},
};
use crate::{
error::NatsError,
subscribe_options::SubscribeOptions,
testing::{
NatsTestPublisher, NatsTestSubscriber,
router::SubjectRouter,
subject::{SubjectPattern, validate_concrete_subject},
},
};
#[derive(Default)]
pub(crate) struct TestBrokerState {
pub(crate) router: SubjectRouter,
coordinator: OnceLock<Coordinator>,
}
impl TestBrokerState {
fn install_coordinator(&self, coordinator: Coordinator) {
let _ = self.coordinator.set(coordinator);
}
pub(crate) fn coordinator(&self) -> Option<Coordinator> {
self.coordinator.get().cloned()
}
}
impl std::fmt::Debug for TestBrokerState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TestBrokerState")
.field("router", &self.router)
.finish_non_exhaustive()
}
}
#[derive(Clone, Default, Debug)]
pub struct NatsTestBroker {
state: Arc<TestBrokerState>,
}
impl NatsTestBroker {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[allow(clippy::unused_async, reason = "API parity with NatsBroker::subscribe")]
pub async fn subscribe(&self, opts: SubscribeOptions) -> Result<NatsTestSubscriber, NatsError> {
opts.validate()?;
let pattern = SubjectPattern::parse(opts.subject()).map_err(|err| {
NatsError::Subscribe(Box::new(err) as Box<dyn std::error::Error + Send + Sync>)
})?;
let (id, requeue, rx) = self.state.router.subscribe(pattern);
Ok(NatsTestSubscriber::new(
Arc::clone(&self.state),
id,
rx,
requeue,
self.state.coordinator(),
))
}
#[must_use]
pub fn publisher(&self) -> NatsTestPublisher {
NatsTestPublisher::new(Arc::clone(&self.state))
}
}
impl Broker for NatsTestBroker {
type Error = NatsError;
async fn connect(&self) -> Result<(), Self::Error> {
Ok(())
}
async fn shutdown(&self) -> Result<(), Self::Error> {
self.state.router.clear();
Ok(())
}
}
#[allow(clippy::use_self)]
impl Subscribe for NatsTestBroker {
type Subscriber = NatsTestSubscriber;
async fn subscribe(&self, name: &str) -> Result<Self::Subscriber, Self::Error> {
NatsTestBroker::subscribe(self, SubscribeOptions::new(name)).await
}
}
impl TestableBroker for NatsTestBroker {
fn install_coordinator(&self, coordinator: Coordinator) {
self.state.install_coordinator(coordinator);
}
fn inject(&self, message: OutgoingMessage<'_>) {
self.state.router.publish(
message.name().to_owned(),
Bytes::copy_from_slice(message.payload()),
message.headers().clone(),
self.state.coordinator().as_ref(),
);
}
fn published(&self, name: &str) -> Vec<RawMessage> {
self.state.router.published(name)
}
}
ruststream::register_testable_broker!(NatsTestBroker);
pub(crate) fn validate_publish_subject(subject: &str) -> Result<(), NatsError> {
validate_concrete_subject(subject).map_err(|err| {
NatsError::Publish(Box::new(err) as Box<dyn std::error::Error + Send + Sync>)
})
}
impl DescribeServer for NatsTestBroker {
fn describe_server(&self) -> ServerSpec {
ServerSpec::in_process("nats")
}
}