use super::*;
#[must_use = "An AcceptorBuilder must be `finish`ed to be actually enabled."]
pub struct AcceptorBuilder<'a, Flavor, Opener = (), OpenerOverloadId = ()> {
pub(crate) bus: &'a Bus,
pub(crate) inner: Flavor,
pub(crate) reliability: bus::Reliability,
pub(crate) opener: Opener,
pub(crate) marker: core::marker::PhantomData<OpenerOverloadId>,
}
impl<'a, Flavor> AcceptorBuilder<'a, Flavor> {
pub fn on_receive_factory<Opener: CandidateHandler<OverloadId>, OverloadId>(
self,
opener: Opener,
) -> AcceptorBuilder<'a, Flavor, Opener, OverloadId> {
let Self {
bus,
inner,
reliability,
..
} = self;
AcceptorBuilder {
bus,
inner,
reliability,
opener,
marker: core::marker::PhantomData,
}
}
}
impl<Flavor, Opener, OverloadId> AcceptorBuilder<'_, Flavor, Opener, OverloadId> {
pub fn reliability(mut self, mode: bus::Reliability) -> Self {
self.reliability = mode;
self
}
}
pub struct ExactMatch(pub(crate) Option<bus::Topic>);
impl<Opener: CandidateHandler<OverloadId>, OverloadId>
AcceptorBuilder<'_, ExactMatch, Opener, OverloadId>
{
pub fn finish<OverloadId2, Handler>(
self,
on_connect: Handler,
) -> Result<Acceptor<Handler::Receiver>, BindError>
where
Handler: IntoChannel<OverloadId2, Opener::Output>,
Handler::Sender: ConcurrentChannel<OverloadId2, Opener::Output>,
{
let (tx, rx) = on_connect.into_channel();
let Self {
bus,
inner: ExactMatch(Some(topic)),
opener,
reliability,
marker: _,
} = self
else {
return Err(BindError::TopicAlreadyBound);
};
match bus.bus.inner.bind_topic(
(&*topic).into(),
reliability,
Arc::new(move |candidate| {
tx.send_infallibly(opener.open(StreamCandidate { inner: candidate }))
})
.into(),
) {
bus::BindResult::Ok(acceptor) => Ok(Acceptor {
receiver: rx,
acceptor,
}),
bus::BindResult::Err(e) => Err(e),
}
}
pub fn finish_with<F: Fn(Opener::Output) + Send + Sync + 'static>(
self,
on_connect: F,
) -> Result<Acceptor<()>, BindError> {
self.finish(on_connect)
}
}
pub use bus::BindError;
#[must_use = "An Acceptor must be kept alive to keep the acceptor running"]
pub struct Acceptor<T> {
receiver: T,
acceptor: bus::Acceptor,
}
impl<T> Acceptor<T> {
pub fn topic(&self) -> bus::Topic {
self.acceptor.inner.topic()
}
pub fn reliability(&self) -> bus::Reliability {
self.acceptor.inner.reliability()
}
}
impl<T> Deref for Acceptor<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.receiver
}
}
impl<T> DerefMut for Acceptor<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.receiver
}
}