use std::future::{IntoFuture, Ready};
use zenoh_core::{Resolvable, Wait};
use zenoh_result::ZResult;
use crate::{
api::{
handlers::{locked, Callback, DefaultHandler, IntoHandler},
key_expr::KeyExpr,
sample::{Locality, Sample},
subscriber::{Subscriber, SubscriberInner, SubscriberKind},
},
Session,
};
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
#[derive(Debug)]
pub struct SubscriberBuilder<'a, 'b, Handler, const BACKGROUND: bool = false> {
#[cfg(feature = "internal")]
pub session: &'a Session,
#[cfg(not(feature = "internal"))]
pub(crate) session: &'a Session,
#[cfg(feature = "internal")]
pub key_expr: ZResult<KeyExpr<'b>>,
#[cfg(not(feature = "internal"))]
pub(crate) key_expr: ZResult<KeyExpr<'b>>,
#[cfg(feature = "internal")]
pub origin: Locality,
#[cfg(not(feature = "internal"))]
pub(crate) origin: Locality,
#[cfg(feature = "internal")]
pub handler: Handler,
#[cfg(not(feature = "internal"))]
pub(crate) handler: Handler,
}
impl<'a, 'b> SubscriberBuilder<'a, 'b, DefaultHandler> {
#[inline]
pub fn callback<F>(self, callback: F) -> SubscriberBuilder<'a, 'b, Callback<Sample>>
where
F: Fn(Sample) + Send + Sync + 'static,
{
self.with(Callback::from(callback))
}
#[inline]
pub fn callback_mut<F>(self, callback: F) -> SubscriberBuilder<'a, 'b, Callback<Sample>>
where
F: FnMut(Sample) + Send + Sync + 'static,
{
self.callback(locked(callback))
}
#[inline]
pub fn with<Handler>(self, handler: Handler) -> SubscriberBuilder<'a, 'b, Handler>
where
Handler: IntoHandler<Sample>,
{
let SubscriberBuilder {
session,
key_expr,
origin,
handler: _,
} = self;
SubscriberBuilder {
session,
key_expr,
origin,
handler,
}
}
}
impl<'a, 'b> SubscriberBuilder<'a, 'b, Callback<Sample>> {
pub fn background(self) -> SubscriberBuilder<'a, 'b, Callback<Sample>, true> {
SubscriberBuilder {
session: self.session,
key_expr: self.key_expr,
origin: self.origin,
handler: self.handler,
}
}
}
impl<Handler, const BACKGROUND: bool> SubscriberBuilder<'_, '_, Handler, BACKGROUND> {
#[inline]
pub fn allowed_origin(mut self, origin: Locality) -> Self {
self.origin = origin;
self
}
}
impl<Handler> Resolvable for SubscriberBuilder<'_, '_, Handler>
where
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
{
type To = ZResult<Subscriber<Handler::Handler>>;
}
impl<Handler> Wait for SubscriberBuilder<'_, '_, Handler>
where
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
{
fn wait(self) -> <Self as Resolvable>::To {
let mut key_expr = self.key_expr?;
key_expr = self.session.declare_nonwild_prefix(key_expr)?;
let session = self.session;
let (callback, receiver) = self.handler.into_handler();
let callback_sync_group = crate::api::cancellation::SyncGroup::default();
session
.declare_subscriber_inner(
&key_expr,
self.origin,
callback,
callback_sync_group.notifier(),
)
.map(|sub_state| Subscriber {
inner: SubscriberInner {
session: session.downgrade(),
id: sub_state.id,
key_expr: sub_state.key_expr.clone(),
kind: SubscriberKind::Subscriber,
undeclare_on_drop: true,
},
handler: receiver,
callback_sync_group,
})
}
}
impl<Handler> IntoFuture for SubscriberBuilder<'_, '_, Handler>
where
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
{
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;
fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.wait())
}
}
impl Resolvable for SubscriberBuilder<'_, '_, Callback<Sample>, true> {
type To = ZResult<()>;
}
impl Wait for SubscriberBuilder<'_, '_, Callback<Sample>, true> {
fn wait(self) -> <Self as Resolvable>::To {
let mut key_expr = self.key_expr?;
key_expr = self.session.declare_nonwild_prefix(key_expr)?;
self.session
.declare_subscriber_inner(&key_expr, self.origin, self.handler, None)?;
Ok(())
}
}
impl IntoFuture for SubscriberBuilder<'_, '_, Callback<Sample>, true> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;
fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.wait())
}
}