use std::{
future::{IntoFuture, Ready},
time::Duration,
};
use zenoh_core::{Resolvable, Result as ZResult, Wait};
#[cfg(feature = "unstable")]
use crate::api::cancellation::CancellationTokenBuilderTrait;
use crate::api::{
handlers::{locked, Callback, DefaultHandler, IntoHandler},
key_expr::KeyExpr,
liveliness::LivelinessToken,
query::Reply,
sample::{Locality, Sample},
session::Session,
subscriber::{Subscriber, SubscriberInner, SubscriberKind},
};
/// Builder that declares a [`LivelinessToken`] once it is resolved.
///
/// Nothing happens until the builder is resolved with `.await` or
/// `zenoh::Wait::wait`; both produce a `ZResult<LivelinessToken>`.
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
#[derive(Debug)]
pub struct LivelinessTokenBuilder<'a, 'b> {
/// Session on which the liveliness token is declared.
pub(crate) session: &'a Session,
/// Key expression of the token. An `Err` stored here is returned as-is
/// when the builder is resolved.
pub(crate) key_expr: ZResult<KeyExpr<'b>>,
}
/// Resolving a [`LivelinessTokenBuilder`] yields either the declared
/// [`LivelinessToken`] or the error that prevented its declaration.
impl Resolvable for LivelinessTokenBuilder<'_, '_> {
type To = ZResult<LivelinessToken>;
}
impl Wait for LivelinessTokenBuilder<'_, '_> {
    /// Resolves the builder synchronously.
    ///
    /// # Errors
    ///
    /// Returns the error stored in `key_expr` without contacting the session,
    /// or the error reported by `Session::declare_liveliness_inner`.
    #[inline]
    fn wait(self) -> <Self as Resolvable>::To {
        let Self { session, key_expr } = self;
        // An invalid key expression aborts before anything is declared.
        let owned_key_expr = key_expr?.into_owned();
        let id = session.declare_liveliness_inner(&owned_key_expr)?;
        Ok(LivelinessToken {
            session: session.downgrade(),
            id,
            undeclare_on_drop: true,
        })
    }
}
impl IntoFuture for LivelinessTokenBuilder<'_, '_> {
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<Self::Output>;

    /// Resolves the builder eagerly through [`Wait::wait`] and wraps the
    /// outcome in an already-completed future.
    fn into_future(self) -> Self::IntoFuture {
        let resolved = self.wait();
        std::future::ready(resolved)
    }
}
/// Builder for a liveliness subscriber.
///
/// `Handler` is the value that will receive the [`Sample`]s. The `BACKGROUND`
/// const parameter selects what resolving the builder produces: with `false`
/// (the default) it yields a [`Subscriber`], with `true` it yields only
/// `ZResult<()>`.
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
#[derive(Debug)]
pub struct LivelinessSubscriberBuilder<'a, 'b, Handler, const BACKGROUND: bool = false> {
/// Session on which the subscriber is declared.
pub session: &'a Session,
/// Key expression to subscribe to. An `Err` stored here is returned when
/// the builder is resolved.
pub key_expr: ZResult<KeyExpr<'b>>,
/// Handler (or callback) that receives the samples.
pub handler: Handler,
/// Forwarded unchanged to `Session::declare_liveliness_subscriber_inner`.
/// NOTE(review): presumably selects whether liveliness tokens that already
/// exist are reported as well — confirm against the session implementation.
pub history: bool,
}
impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> {
    /// Receives the samples of this subscription with a callback.
    ///
    /// The closure is converted into a [`Callback`] and installed through
    /// [`with`](Self::with).
    #[inline]
    pub fn callback<F>(self, callback: F) -> LivelinessSubscriberBuilder<'a, 'b, Callback<Sample>>
    where
        F: Fn(Sample) + Send + Sync + 'static,
    {
        let handler: Callback<Sample> = Callback::from(callback);
        self.with(handler)
    }

    /// Receives the samples of this subscription with a mutable callback.
    ///
    /// The `FnMut` closure is adapted with [`locked`] and then installed
    /// through [`callback`](Self::callback).
    #[inline]
    pub fn callback_mut<F>(
        self,
        callback: F,
    ) -> LivelinessSubscriberBuilder<'a, 'b, Callback<Sample>>
    where
        F: FnMut(Sample) + Send + Sync + 'static,
    {
        let shared = locked(callback);
        self.callback(shared)
    }

    /// Receives the samples of this subscription with the given handler.
    ///
    /// Every other setting of the builder is carried over unchanged.
    #[inline]
    pub fn with<Handler>(self, handler: Handler) -> LivelinessSubscriberBuilder<'a, 'b, Handler>
    where
        Handler: IntoHandler<Sample>,
    {
        // The `DefaultHandler` still held by `self` is simply discarded.
        LivelinessSubscriberBuilder {
            session: self.session,
            key_expr: self.key_expr,
            handler,
            history: self.history,
        }
    }
}
impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, Callback<Sample>> {
    /// Switches the builder to its `BACKGROUND = true` form.
    ///
    /// All settings are carried over; resolving the returned builder yields
    /// `ZResult<()>` instead of a subscriber handle.
    pub fn background(self) -> LivelinessSubscriberBuilder<'a, 'b, Callback<Sample>, true> {
        let Self {
            session,
            key_expr,
            handler,
            history,
        } = self;
        LivelinessSubscriberBuilder {
            session,
            key_expr,
            handler,
            history,
        }
    }
}
impl<Handler, const BACKGROUND: bool> LivelinessSubscriberBuilder<'_, '_, Handler, BACKGROUND> {
    /// Sets the `history` flag that is handed to the session when the
    /// subscriber is declared, leaving every other setting untouched.
    #[inline]
    pub fn history(self, history: bool) -> Self {
        Self { history, ..self }
    }
}
/// Resolving a foreground liveliness subscriber builder yields a
/// [`Subscriber`] wrapping the handler produced by `Handler`.
impl<Handler> Resolvable for LivelinessSubscriberBuilder<'_, '_, Handler>
where
    Handler: IntoHandler<Sample> + Send,
    <Handler as IntoHandler<Sample>>::Handler: Send,
{
    type To = ZResult<Subscriber<<Handler as IntoHandler<Sample>>::Handler>>;
}
impl<Handler> Wait for LivelinessSubscriberBuilder<'_, '_, Handler>
where
    Handler: IntoHandler<Sample> + Send,
    Handler::Handler: Send,
{
    /// Resolves the builder synchronously.
    ///
    /// The key expression is checked first; only then is the handler split
    /// into its callback and receiver halves and the subscriber declared on
    /// the session with the default [`Locality`].
    ///
    /// # Errors
    ///
    /// Returns the error stored in `key_expr`, or the error reported by
    /// `Session::declare_liveliness_subscriber_inner`.
    fn wait(self) -> <Self as Resolvable>::To {
        let Self {
            session,
            key_expr,
            handler,
            history,
        } = self;
        let key_expr = key_expr?;
        let (callback, receiver) = handler.into_handler();
        let callback_sync_group = crate::api::cancellation::SyncGroup::default();
        let declared = session.declare_liveliness_subscriber_inner(
            &key_expr,
            Locality::default(),
            history,
            callback,
            callback_sync_group.notifier(),
        );
        // The receiver and the sync group are moved into the subscriber only
        // when the declaration succeeded.
        declared.map(|state| Subscriber {
            inner: SubscriberInner {
                session: session.downgrade(),
                id: state.id,
                key_expr: state.key_expr.clone(),
                kind: SubscriberKind::LivelinessSubscriber,
                undeclare_on_drop: true,
            },
            handler: receiver,
            callback_sync_group,
        })
    }
}
impl<Handler> IntoFuture for LivelinessSubscriberBuilder<'_, '_, Handler>
where
    Handler: IntoHandler<Sample> + Send,
    Handler::Handler: Send,
{
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<Self::Output>;

    /// Resolves the builder eagerly through [`Wait::wait`] and wraps the
    /// outcome in an already-completed future.
    fn into_future(self) -> Self::IntoFuture {
        let resolved = self.wait();
        std::future::ready(resolved)
    }
}
/// Resolving a background liveliness subscriber builder yields no handle:
/// the result only reports whether the declaration succeeded.
impl Resolvable for LivelinessSubscriberBuilder<'_, '_, Callback<Sample>, true> {
type To = ZResult<()>;
}
impl Wait for LivelinessSubscriberBuilder<'_, '_, Callback<Sample>, true> {
    /// Resolves the background builder synchronously.
    ///
    /// The callback is handed to the session directly, no sync-group notifier
    /// is supplied (`None`), and whatever the session returns on success is
    /// discarded.
    ///
    /// # Errors
    ///
    /// Returns the error stored in `key_expr`, or the error reported by
    /// `Session::declare_liveliness_subscriber_inner`.
    fn wait(self) -> <Self as Resolvable>::To {
        let Self {
            session,
            key_expr,
            handler,
            history,
        } = self;
        let key_expr = key_expr?;
        session
            .declare_liveliness_subscriber_inner(
                &key_expr,
                Locality::default(),
                history,
                handler,
                None,
            )
            .map(|_declared| ())
    }
}
impl IntoFuture for LivelinessSubscriberBuilder<'_, '_, Callback<Sample>, true> {
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<Self::Output>;

    /// Resolves the builder eagerly through [`Wait::wait`] and wraps the
    /// outcome in an already-completed future.
    fn into_future(self) -> Self::IntoFuture {
        let resolved = self.wait();
        std::future::ready(resolved)
    }
}
/// Builder for a liveliness query.
///
/// `Handler` is the value that will receive the [`Reply`]s. Resolving the
/// builder with `.await` or `zenoh::Wait::wait` issues the query through
/// `Session::liveliness_query` and yields the handler's receiving half.
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
#[derive(Debug)]
pub struct LivelinessGetBuilder<'a, 'b, Handler> {
/// Session on which the query is issued.
pub(crate) session: &'a Session,
/// Key expression to query. An `Err` stored here is returned when the
/// builder is resolved.
pub(crate) key_expr: ZResult<KeyExpr<'b>>,
/// Timeout handed to `Session::liveliness_query`.
pub(crate) timeout: Duration,
/// Handler (or callback) that receives the replies.
pub(crate) handler: Handler,
/// Optional cancellation token handed to `Session::liveliness_query`;
/// only present when the `unstable` feature is enabled.
#[cfg(feature = "unstable")]
pub(crate) cancellation_token: Option<crate::api::cancellation::CancellationToken>,
}
impl<'a, 'b> LivelinessGetBuilder<'a, 'b, DefaultHandler> {
    /// Receives the replies of this query with a callback.
    ///
    /// The closure is converted into a [`Callback`] and installed through
    /// [`with`](Self::with).
    #[inline]
    pub fn callback<F>(self, callback: F) -> LivelinessGetBuilder<'a, 'b, Callback<Reply>>
    where
        F: Fn(Reply) + Send + Sync + 'static,
    {
        let handler: Callback<Reply> = Callback::from(callback);
        self.with(handler)
    }

    /// Receives the replies of this query with a mutable callback.
    ///
    /// The `FnMut` closure is adapted with [`locked`] and then installed
    /// through [`callback`](Self::callback).
    #[inline]
    pub fn callback_mut<F>(self, callback: F) -> LivelinessGetBuilder<'a, 'b, Callback<Reply>>
    where
        F: FnMut(Reply) + Send + Sync + 'static,
    {
        let shared = locked(callback);
        self.callback(shared)
    }

    /// Receives the replies of this query with the given handler.
    ///
    /// Every other setting of the builder is carried over unchanged.
    #[inline]
    pub fn with<Handler>(self, handler: Handler) -> LivelinessGetBuilder<'a, 'b, Handler>
    where
        Handler: IntoHandler<Reply>,
    {
        // The `DefaultHandler` still held by `self` is simply discarded.
        LivelinessGetBuilder {
            session: self.session,
            key_expr: self.key_expr,
            timeout: self.timeout,
            handler,
            #[cfg(feature = "unstable")]
            cancellation_token: self.cancellation_token,
        }
    }
}
impl<Handler> LivelinessGetBuilder<'_, '_, Handler> {
    /// Sets the timeout that is handed to the session when the query is
    /// issued, leaving every other setting untouched.
    #[inline]
    pub fn timeout(self, timeout: Duration) -> Self {
        Self { timeout, ..self }
    }
}
#[cfg(feature = "unstable")]
#[zenoh_macros::internal_trait]
impl<Handler> CancellationTokenBuilderTrait for LivelinessGetBuilder<'_, '_, Handler> {
    /// Attaches a cancellation token to the query, replacing any token that
    /// was set before. The token is handed to the session when the builder
    /// is resolved.
    #[zenoh_macros::unstable_doc]
    fn cancellation_token(
        self,
        cancellation_token: crate::api::cancellation::CancellationToken,
    ) -> Self {
        let mut builder = self;
        builder.cancellation_token = Some(cancellation_token);
        builder
    }
}
/// Resolving a liveliness query builder yields the receiving half produced
/// by `Handler`.
impl<Handler> Resolvable for LivelinessGetBuilder<'_, '_, Handler>
where
    Handler: IntoHandler<Reply> + Send,
    <Handler as IntoHandler<Reply>>::Handler: Send,
{
    type To = ZResult<<Handler as IntoHandler<Reply>>::Handler>;
}
impl<Handler> Wait for LivelinessGetBuilder<'_, '_, Handler>
where
    Handler: IntoHandler<Reply> + Send,
    Handler::Handler: Send,
{
    /// Resolves the builder synchronously.
    ///
    /// The handler is split into its callback and receiver halves first; the
    /// key expression is checked afterwards, and the query is then issued
    /// with the configured timeout (and the cancellation token when the
    /// `unstable` feature is enabled).
    ///
    /// # Errors
    ///
    /// Returns the error stored in `key_expr`, or the error reported by
    /// `Session::liveliness_query`.
    fn wait(self) -> <Self as Resolvable>::To {
        let Self {
            session,
            key_expr,
            timeout,
            handler,
            #[cfg(feature = "unstable")]
            cancellation_token,
        } = self;
        let (callback, receiver) = handler.into_handler();
        let key_expr = key_expr?;
        session.liveliness_query(
            &key_expr,
            timeout,
            callback,
            #[cfg(feature = "unstable")]
            cancellation_token,
        )?;
        Ok(receiver)
    }
}
impl<Handler> IntoFuture for LivelinessGetBuilder<'_, '_, Handler>
where
    Handler: IntoHandler<Reply> + Send,
    Handler::Handler: Send,
{
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<Self::Output>;

    /// Resolves the builder eagerly through [`Wait::wait`] and wraps the
    /// outcome in an already-completed future.
    fn into_future(self) -> Self::IntoFuture {
        let resolved = self.wait();
        std::future::ready(resolved)
    }
}