use std::future::{IntoFuture, Ready};
use zenoh_core::{Resolvable, Wait};
use zenoh_result::ZResult;
use crate::{
api::{
cancellation::SyncGroup,
handlers::{locked, DefaultHandler, IntoHandler},
key_expr::KeyExpr,
queryable::{Query, Queryable, QueryableInner},
sample::Locality,
},
handlers::Callback,
Session,
};
/// Builder that collects the settings of a queryable before it is declared on a [`Session`].
///
/// Nothing is declared until the builder is resolved (via `.await` or `Wait::wait`).
/// With `BACKGROUND = false` (the default) resolution yields a [`Queryable`]; with
/// `BACKGROUND = true` it yields `ZResult<()>` and no handle is returned.
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
#[derive(Debug)]
pub struct QueryableBuilder<'a, 'b, Handler, const BACKGROUND: bool = false> {
/// Session on which the queryable is declared at resolution time.
pub(crate) session: &'a Session,
/// Key expression of the queryable; kept as a `ZResult` so that an error is
/// only reported when the builder is resolved.
pub(crate) key_expr: ZResult<KeyExpr<'b>>,
/// Completeness flag forwarded unchanged to the session when declaring.
pub(crate) complete: bool,
/// Locality forwarded unchanged to the session when declaring
/// (set through `allowed_origin`).
pub(crate) origin: Locality,
/// Handler (or callback) that will receive the queries.
pub(crate) handler: Handler,
}
impl<'a, 'b> QueryableBuilder<'a, 'b, DefaultHandler> {
    /// Receives the queries of this queryable with a callback.
    ///
    /// The closure is converted with `Callback::from` and installed through
    /// [`with`](QueryableBuilder::with).
    #[inline]
    pub fn callback<F>(self, callback: F) -> QueryableBuilder<'a, 'b, Callback<Query>>
    where
        F: Fn(Query) + Send + Sync + 'static,
    {
        let handler: Callback<Query> = Callback::from(callback);
        self.with(handler)
    }

    /// Receives the queries of this queryable with a mutable (`FnMut`) callback.
    ///
    /// The closure is first wrapped by `locked` and then handed to
    /// [`callback`](QueryableBuilder::callback).
    // NOTE(review): `locked` presumably serializes invocations behind a lock so the
    // `FnMut` can be used where an `Fn` is required; confirm in `handlers::locked`.
    #[inline]
    pub fn callback_mut<F>(self, callback: F) -> QueryableBuilder<'a, 'b, Callback<Query>>
    where
        F: FnMut(Query) + Send + Sync + 'static,
    {
        let wrapped = locked(callback);
        self.callback(wrapped)
    }

    /// Receives the queries of this queryable with the given handler.
    ///
    /// Every other setting of the builder is carried over unchanged; only the
    /// handler is replaced.
    #[inline]
    pub fn with<Handler>(self, handler: Handler) -> QueryableBuilder<'a, 'b, Handler>
    where
        Handler: IntoHandler<Query>,
    {
        // The previous default handler is left behind in `self` and dropped.
        QueryableBuilder {
            session: self.session,
            key_expr: self.key_expr,
            complete: self.complete,
            origin: self.origin,
            handler,
        }
    }
}
impl<'a, 'b> QueryableBuilder<'a, 'b, Callback<Query>> {
    /// Switches the builder to its background variant (`BACKGROUND = true`).
    ///
    /// All settings are carried over unchanged. Resolving the returned builder
    /// yields `ZResult<()>` rather than a [`Queryable`] handle.
    pub fn background(self) -> QueryableBuilder<'a, 'b, Callback<Query>, true> {
        let Self {
            session,
            key_expr,
            complete,
            origin,
            handler,
        } = self;
        QueryableBuilder {
            session,
            key_expr,
            complete,
            origin,
            handler,
        }
    }
}
impl<Handler, const BACKGROUND: bool> QueryableBuilder<'_, '_, Handler, BACKGROUND> {
    /// Sets the completeness flag that is forwarded to the session when the
    /// queryable is declared.
    #[inline]
    pub fn complete(self, complete: bool) -> Self {
        Self { complete, ..self }
    }

    /// Sets the [`Locality`] that is forwarded to the session when the
    /// queryable is declared.
    // NOTE(review): by its name this restricts which query origins are served;
    // confirm against the `Locality` documentation.
    #[inline]
    pub fn allowed_origin(self, origin: Locality) -> Self {
        Self { origin, ..self }
    }
}
/// Resolving a foreground builder yields the declared [`Queryable`], carrying the
/// receiving half produced by the handler, or the error that prevented declaration.
impl<Handler> Resolvable for QueryableBuilder<'_, '_, Handler>
where
    Handler: IntoHandler<Query> + Send,
    <Handler as IntoHandler<Query>>::Handler: Send,
{
    type To = ZResult<Queryable<<Handler as IntoHandler<Query>>::Handler>>;
}
impl<Handler> Wait for QueryableBuilder<'_, '_, Handler>
where
    Handler: IntoHandler<Query> + Send,
    Handler::Handler: Send,
{
    /// Declares the queryable on the session and returns its handle.
    ///
    /// # Errors
    ///
    /// Returns an error if the stored key expression is itself an error, if
    /// `declare_nonwild_prefix` fails, or if `declare_queryable_inner` fails.
    fn wait(self) -> <Self as Resolvable>::To {
        let callback_sync_group = SyncGroup::default();
        let session = self.session;
        // The handler is split into its callback and receiver halves before the
        // key expression is validated.
        let (callback, receiver) = self.handler.into_handler();
        let key_expr = session.declare_nonwild_prefix(self.key_expr?)?;
        let state = session.declare_queryable_inner(
            &key_expr,
            self.complete,
            self.origin,
            callback,
            callback_sync_group.notifier(),
        )?;
        Ok(Queryable {
            inner: QueryableInner {
                // Only a downgraded session reference is stored in the handle.
                session: session.downgrade(),
                id: state.id,
                undeclare_on_drop: true,
                key_expr: key_expr.into_owned(),
            },
            handler: receiver,
            callback_sync_group,
        })
    }
}
impl<Handler> IntoFuture for QueryableBuilder<'_, '_, Handler>
where
    Handler: IntoHandler<Query> + Send,
    Handler::Handler: Send,
{
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<<Self as Resolvable>::To>;

    /// Resolves the builder synchronously through [`Wait::wait`] and wraps the
    /// outcome in an already-completed future.
    fn into_future(self) -> Self::IntoFuture {
        let resolved = self.wait();
        std::future::ready(resolved)
    }
}
/// Resolving a background builder (`BACKGROUND = true`) only reports success or
/// failure of the declaration; no `Queryable` handle is returned.
impl Resolvable for QueryableBuilder<'_, '_, Callback<Query>, true> {
type To = ZResult<()>;
}
impl Wait for QueryableBuilder<'_, '_, Callback<Query>, true> {
    /// Declares the queryable on the session without returning a handle.
    ///
    /// The callback is passed straight to `declare_queryable_inner`, with `None`
    /// in the position where the foreground variant passes its sync-group notifier.
    ///
    /// # Errors
    ///
    /// Returns an error if the stored key expression is itself an error, if
    /// `declare_nonwild_prefix` fails, or if `declare_queryable_inner` fails.
    fn wait(self) -> <Self as Resolvable>::To {
        let key_expr = self.session.declare_nonwild_prefix(self.key_expr?)?;
        self.session
            .declare_queryable_inner(
                &key_expr,
                self.complete,
                self.origin,
                self.handler,
                None,
            )
            // The declaration state is not needed by the caller.
            .map(|_state| ())
    }
}
impl IntoFuture for QueryableBuilder<'_, '_, Callback<Query>, true> {
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<<Self as Resolvable>::To>;

    /// Resolves the background builder synchronously through [`Wait::wait`] and
    /// wraps the outcome in an already-completed future.
    fn into_future(self) -> Self::IntoFuture {
        let resolved = self.wait();
        std::future::ready(resolved)
    }
}