use std::{
fmt,
future::{IntoFuture, Ready},
ops::{Deref, DerefMut},
sync::Arc,
};
use tracing::error;
use zenoh_core::{Resolvable, Wait};
use zenoh_protocol::{
core::{EntityId, Parameters, WireExpr, ZenohIdProto},
network::{response, Mapping, RequestId, Response, ResponseFinal},
zenoh::{self, reply::ReplyBody, Del, Put, ResponseBody},
};
use zenoh_result::ZResult;
#[zenoh_macros::unstable]
use {zenoh_config::wrappers::EntityGlobalId, zenoh_protocol::core::EntityGlobalIdProto};
#[zenoh_macros::unstable]
use crate::api::sample::SourceInfo;
#[zenoh_macros::internal]
use crate::net::primitives::DummyPrimitives;
use crate::{
api::{
builders::reply::{ReplyBuilder, ReplyBuilderDelete, ReplyBuilderPut, ReplyErrBuilder},
bytes::ZBytes,
cancellation::SyncGroup,
encoding::Encoding,
handlers::CallbackParameter,
key_expr::KeyExpr,
query::ReplyKeyExpr,
sample::{Locality, QoS, Sample, SampleKind},
selector::{Selector, REPLY_KEY_EXPR_ANY_SEL_PARAM},
session::{UndeclarableSealed, WeakSession},
Id,
},
handlers::Callback,
net::primitives::Primitives,
};
/// Reply primitives that hand responses directly to a session.
///
/// Wrapped by `ReplyPrimitives::Local`.
pub(crate) struct LocalReplyPrimitives {
/// Session that responses and final responses are sent to.
session: WeakSession,
}
/// Reply primitives that send responses through a [`Primitives`] object,
/// optionally accompanied by a session.
///
/// Wrapped by `ReplyPrimitives::Remote`.
pub(crate) struct RemoteReplyPrimitives {
/// Optional session; when present, `ReplyPrimitives::keyexpr_to_wire` uses
/// it to convert key expressions to their wire form.
pub(crate) session: Option<WeakSession>,
/// Object that responses and final responses are sent to.
pub(crate) primitives: Arc<dyn Primitives>,
}
/// Destination of the replies to a query.
pub(crate) enum ReplyPrimitives {
/// Responses are sent to the wrapped session.
Local(LocalReplyPrimitives),
/// Responses are sent to the wrapped `Primitives` object.
Remote(RemoteReplyPrimitives),
}
impl ReplyPrimitives {
    /// Creates reply primitives that send every response to `session`.
    pub(crate) fn new_local(session: WeakSession) -> Self {
        Self::Local(LocalReplyPrimitives { session })
    }

    /// Creates reply primitives that send every response to `primitives`.
    ///
    /// `session` is optional; when provided it is only used to convert key
    /// expressions to their wire form.
    pub(crate) fn new_remote(
        session: Option<WeakSession>,
        primitives: Arc<dyn Primitives>,
    ) -> Self {
        let remote = RemoteReplyPrimitives {
            session,
            primitives,
        };
        Self::Remote(remote)
    }

    /// Forwards a `ResponseFinal` message to the underlying destination.
    pub(crate) fn send_response_final(&self, msg: &mut ResponseFinal) {
        match self {
            Self::Local(LocalReplyPrimitives { session }) => session.send_response_final(msg),
            Self::Remote(RemoteReplyPrimitives { primitives, .. }) => {
                primitives.send_response_final(msg)
            }
        }
    }

    /// Forwards a `Response` message to the underlying destination.
    pub(crate) fn send_response(&self, msg: &mut Response) {
        match self {
            Self::Local(LocalReplyPrimitives { session }) => session.send_response(msg),
            Self::Remote(RemoteReplyPrimitives { primitives, .. }) => {
                primitives.send_response(msg)
            }
        }
    }

    /// Converts `key_expr` into an owned wire expression.
    ///
    /// * Local: uses `KeyExpr::to_wire_local` with the local session.
    /// * Remote with a session: uses `KeyExpr::to_wire` with that session.
    /// * Remote without a session: builds a wire expression with scope `0`,
    ///   the full key expression string as suffix and `Mapping::Sender`.
    pub(crate) fn keyexpr_to_wire(&self, key_expr: &KeyExpr) -> WireExpr<'static> {
        match self {
            Self::Local(LocalReplyPrimitives { session }) => {
                key_expr.to_wire_local(session).to_owned()
            }
            Self::Remote(RemoteReplyPrimitives { session, .. }) => {
                if let Some(session) = session {
                    key_expr.to_wire(session).to_owned()
                } else {
                    // No session available: send the key expression as a plain string.
                    WireExpr {
                        scope: 0,
                        suffix: std::borrow::Cow::Owned(key_expr.as_str().into()),
                        mapping: Mapping::Sender,
                    }
                }
            }
        }
    }
}
/// Data of a received query, together with the primitives used to answer it.
///
/// Dropping a `QueryInner` sends a `ResponseFinal` for `qid` (see the `Drop`
/// implementation).
pub(crate) struct QueryInner {
/// Key expression part of the query's selector.
pub(crate) key_expr: KeyExpr<'static>,
/// Parameters part of the query's selector.
pub(crate) parameters: Parameters<'static>,
/// Request id, used as `rid` of every `Response` and of the `ResponseFinal`.
pub(crate) qid: RequestId,
/// Zenoh id written into the responder id extension of each reply.
pub(crate) zid: ZenohIdProto,
/// QoS of the query; also used for the `ResponseFinal` sent on drop.
pub(crate) qos: QoS,
/// Optional source info of the query (only with the `unstable` feature).
#[cfg(feature = "unstable")]
pub(crate) source_info: Option<SourceInfo>,
/// Destination of replies and of the final response.
pub(crate) primitives: ReplyPrimitives,
}
impl QueryInner {
    /// Builds a `QueryInner` made only of dummy/default values.
    ///
    /// The reply primitives are remote ones without a session, backed by
    /// `DummyPrimitives`.
    #[zenoh_macros::internal]
    fn empty() -> Self {
        let primitives = ReplyPrimitives::new_remote(None, Arc::new(DummyPrimitives));
        Self {
            key_expr: KeyExpr::dummy(),
            parameters: Parameters::empty(),
            qid: 0,
            zid: ZenohIdProto::default(),
            qos: QoS::default(),
            #[cfg(feature = "unstable")]
            source_info: None,
            primitives,
        }
    }
}
impl Drop for QueryInner {
    /// Sends the `ResponseFinal` for this query.
    ///
    /// The message carries the query's request id and QoS and no timestamp.
    fn drop(&mut self) {
        let mut response_final = ResponseFinal {
            rid: self.qid,
            ext_qos: self.qos.into(),
            ext_tstamp: None,
        };
        self.primitives.send_response_final(&mut response_final);
    }
}
/// A query that can be answered with the `reply*` methods.
///
/// Cloning a `Query` shares the same `QueryInner` through the `Arc`; the
/// `ResponseFinal` is sent by `QueryInner`'s `Drop`, i.e. once the last
/// reference to it goes away.
#[derive(Clone)]
pub struct Query {
/// Shared query data and reply primitives.
pub(crate) inner: Arc<QueryInner>,
/// Entity id written into the responder id extension of each reply.
pub(crate) eid: EntityId,
/// Optional payload of the query and its encoding.
pub(crate) value: Option<(ZBytes, Encoding)>,
/// Optional attachment of the query.
pub(crate) attachment: Option<ZBytes>,
}
impl Query {
#[inline(always)]
pub fn selector(&self) -> Selector<'_> {
Selector::borrowed(&self.inner.key_expr, &self.inner.parameters)
}
#[inline(always)]
pub fn key_expr(&self) -> &KeyExpr<'static> {
&self.inner.key_expr
}
#[inline(always)]
pub fn parameters(&self) -> &Parameters<'static> {
&self.inner.parameters
}
#[inline(always)]
pub fn payload(&self) -> Option<&ZBytes> {
self.value.as_ref().map(|v| &v.0)
}
#[inline(always)]
pub fn payload_mut(&mut self) -> Option<&mut ZBytes> {
self.value.as_mut().map(|v| &mut v.0)
}
#[inline(always)]
pub fn encoding(&self) -> Option<&Encoding> {
self.value.as_ref().map(|v| &v.1)
}
pub fn attachment(&self) -> Option<&ZBytes> {
self.attachment.as_ref()
}
pub fn attachment_mut(&mut self) -> Option<&mut ZBytes> {
self.attachment.as_mut()
}
#[zenoh_macros::unstable]
#[inline]
pub fn source_info(&self) -> Option<&SourceInfo> {
self.inner.source_info.as_ref()
}
#[inline(always)]
#[zenoh_macros::internal]
pub fn reply_sample(&self, sample: Sample) -> ReplySample<'_> {
ReplySample {
query: self,
sample,
}
}
#[inline(always)]
pub fn reply<'b, TryIntoKeyExpr, IntoZBytes>(
&self,
key_expr: TryIntoKeyExpr,
payload: IntoZBytes,
) -> ReplyBuilder<'_, 'b, ReplyBuilderPut>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
IntoZBytes: Into<ZBytes>,
{
ReplyBuilder::<'_, 'b, ReplyBuilderPut>::new(self, key_expr, payload)
}
#[inline(always)]
pub fn reply_err<IntoZBytes>(&self, payload: IntoZBytes) -> ReplyErrBuilder<'_>
where
IntoZBytes: Into<ZBytes>,
{
ReplyErrBuilder::new(self, payload)
}
#[inline(always)]
pub fn reply_del<'b, TryIntoKeyExpr>(
&self,
key_expr: TryIntoKeyExpr,
) -> ReplyBuilder<'_, 'b, ReplyBuilderDelete>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
{
ReplyBuilder::<'_, 'b, ReplyBuilderDelete>::new(self, key_expr)
}
pub fn accepts_replies(&self) -> ReplyKeyExpr {
if self._accepts_any_replies() {
ReplyKeyExpr::Any
} else {
ReplyKeyExpr::MatchingQuery
}
}
fn _accepts_any_replies(&self) -> bool {
self.parameters().contains_key(REPLY_KEY_EXPR_ANY_SEL_PARAM)
}
#[zenoh_macros::internal]
pub fn empty() -> Self {
Query {
inner: Arc::new(QueryInner::empty()),
eid: 0,
value: None,
attachment: None,
}
}
#[inline(always)]
#[zenoh_macros::unstable]
pub fn priority(&self) -> crate::qos::Priority {
self.inner.qos.priority()
}
#[inline(always)]
#[zenoh_macros::unstable]
pub fn congestion_control(&self) -> crate::qos::CongestionControl {
self.inner.qos.congestion_control()
}
#[inline(always)]
#[zenoh_macros::unstable]
pub fn express(&self) -> bool {
self.inner.qos.express()
}
}
impl fmt::Debug for Query {
    /// Formats the query as a `Query` struct exposing its key expression
    /// (under the name `key_selector`) and its parameters.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let inner = &self.inner;
        let mut builder = f.debug_struct("Query");
        builder.field("key_selector", &inner.key_expr);
        builder.field("parameters", &inner.parameters);
        builder.finish()
    }
}
impl fmt::Display for Query {
    /// Formats the query as `Query { selector: "<selector>" }`.
    ///
    /// The selector text comes from the `Display` implementation of
    /// [`Selector`], so the key expression and the parameters are properly
    /// separated instead of being glued together into a string that is not a
    /// valid selector.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Kept as an owned `String` so the field is still rendered quoted.
        let selector = self.selector().to_string();
        f.debug_struct("Query")
            .field("selector", &selector)
            .finish()
    }
}
// A `Query` is its own callback message type: the conversion below is the
// identity.
impl CallbackParameter for Query {
type Message<'a> = Self;
// Returns the message unchanged.
fn from_message(msg: Self::Message<'_>) -> Self {
msg
}
}
/// Pending reply to a query made of an already built [`Sample`].
///
/// Created by `Query::reply_sample`; the sample is sent when this value is
/// resolved (through `Wait::wait` or by awaiting it).
#[zenoh_macros::internal]
pub struct ReplySample<'a> {
/// Query being replied to.
query: &'a Query,
/// Sample to send as the reply.
sample: Sample,
}
// Resolving a `ReplySample` yields the result of sending the reply.
#[zenoh_macros::internal]
impl Resolvable for ReplySample<'_> {
type To = ZResult<()>;
}
#[zenoh_macros::internal]
impl Wait for ReplySample<'_> {
fn wait(self) -> <Self as Resolvable>::To {
self.query._reply_sample(self.sample)
}
}
#[zenoh_macros::internal]
impl IntoFuture for ReplySample<'_> {
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<<Self as Resolvable>::To>;

    /// Sends the reply immediately (via [`Wait::wait`]) and wraps the outcome
    /// in an already completed future.
    fn into_future(self) -> Self::IntoFuture {
        let outcome = self.wait();
        std::future::ready(outcome)
    }
}
impl Query {
    /// Sends `sample` as a reply to this query.
    ///
    /// # Errors
    ///
    /// Fails without sending anything when the query does not accept replies
    /// on any key expression and the sample's key expression does not
    /// intersect the query's one.
    pub(crate) fn _reply_sample(&self, sample: Sample) -> ZResult<()> {
        let reply_allowed =
            self._accepts_any_replies() || self.key_expr().intersects(&sample.key_expr);
        if !reply_allowed {
            bail!("Attempted to reply on `{}`, which does not intersect with query `{}`, despite query only allowing replies on matching key expressions", sample.key_expr, self.key_expr())
        }

        // Source info is only carried over when the `unstable` feature is on.
        #[cfg(not(feature = "unstable"))]
        let ext_sinfo = None;
        #[cfg(feature = "unstable")]
        let ext_sinfo = sample.source_info.map(Into::into);

        let primitives = &self.inner.primitives;
        let wire_expr = primitives.keyexpr_to_wire(&sample.key_expr);

        // The sample kind decides between a put and a delete reply body.
        let reply_body = match sample.kind {
            SampleKind::Put => ReplyBody::Put(Put {
                timestamp: sample.timestamp,
                encoding: sample.encoding.into(),
                ext_sinfo,
                #[cfg(feature = "shared-memory")]
                ext_shm: None,
                ext_attachment: sample.attachment.map(Into::into),
                ext_unknown: vec![],
                payload: sample.payload.into(),
            }),
            SampleKind::Delete => ReplyBody::Del(Del {
                timestamp: sample.timestamp,
                ext_sinfo,
                ext_attachment: sample.attachment.map(Into::into),
                ext_unknown: vec![],
            }),
        };

        let mut response = Response {
            rid: self.inner.qid,
            wire_expr,
            payload: ResponseBody::Reply(zenoh::Reply {
                consolidation: zenoh::ConsolidationMode::DEFAULT,
                ext_unknown: vec![],
                payload: reply_body,
            }),
            ext_qos: sample.qos.into(),
            ext_tstamp: None,
            // Identifies the responder: zenoh id of the query data and entity
            // id stored in this `Query`.
            ext_respid: Some(response::ext::ResponderIdType {
                zid: self.inner.zid,
                eid: self.eid,
            }),
        };
        primitives.send_response(&mut response);
        Ok(())
    }
}
/// Per-queryable state: identity, key expression, flags and callback.
pub(crate) struct QueryableState {
/// Id of the queryable.
pub(crate) id: Id,
/// Key expression of the queryable.
pub(crate) key_expr: KeyExpr<'static>,
/// Completeness flag of the queryable.
// NOTE(review): exact semantics are not visible in this file — confirm.
pub(crate) complete: bool,
/// Locality setting of the queryable.
// NOTE(review): presumably restricts which queries are accepted — confirm.
pub(crate) origin: Locality,
/// Callback taking [`Query`] values.
pub(crate) callback: Callback<Query>,
}
impl fmt::Debug for QueryableState {
    /// Formats the state as a `Queryable` struct with its id, key expression
    /// and `complete` flag; `origin` and `callback` are left out.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut builder = f.debug_struct("Queryable");
        builder.field("id", &self.id);
        builder.field("key_expr", &self.key_expr);
        builder.field("complete", &self.complete);
        builder.finish()
    }
}
/// Handler-independent part of a [`Queryable`].
#[derive(Debug)]
pub(crate) struct QueryableInner {
/// Session the queryable belongs to; used to close it and to build its
/// global id.
pub(crate) session: WeakSession,
/// Id of the queryable, passed to `close_queryable` on undeclaration.
pub(crate) id: Id,
/// Whether dropping the `Queryable` undeclares it.
pub(crate) undeclare_on_drop: bool,
/// Key expression of the queryable.
pub(crate) key_expr: KeyExpr<'static>,
}
/// Pending undeclaration of a [`Queryable`].
///
/// The queryable is undeclared when this value is resolved (through
/// `Wait::wait` or by awaiting it).
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
pub struct QueryableUndeclaration<Handler> {
/// Queryable to undeclare.
queryable: Queryable<Handler>,
/// Whether resolution also waits on the queryable's callback sync group.
wait_callbacks: bool,
}
impl<Handler> QueryableUndeclaration<Handler> {
    /// Makes the undeclaration also wait on the queryable's callback sync
    /// group once the queryable has been undeclared.
    #[zenoh_macros::internal_or_unstable]
    pub fn wait_callbacks(self) -> Self {
        Self {
            wait_callbacks: true,
            ..self
        }
    }
}
// Resolving a `QueryableUndeclaration` yields the result of the undeclaration.
impl<Handler> Resolvable for QueryableUndeclaration<Handler> {
type To = ZResult<()>;
}
impl<Handler> Wait for QueryableUndeclaration<Handler> {
    /// Undeclares the queryable and, if requested through `wait_callbacks`,
    /// waits on its callback sync group afterwards.
    ///
    /// # Errors
    ///
    /// Returns the undeclaration error as is; in that case the callback sync
    /// group is not waited on.
    fn wait(mut self) -> <Self as Resolvable>::To {
        let queryable = &mut self.queryable;
        queryable.undeclare_impl()?;
        if self.wait_callbacks {
            queryable.callback_sync_group.wait();
        }
        Ok(())
    }
}
impl<Handler> IntoFuture for QueryableUndeclaration<Handler> {
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<<Self as Resolvable>::To>;

    /// Performs the undeclaration immediately (via [`Wait::wait`]) and wraps
    /// the outcome in an already completed future.
    fn into_future(self) -> Self::IntoFuture {
        let outcome = self.wait();
        std::future::ready(outcome)
    }
}
/// A declared queryable together with its handler.
///
/// Dropping it undeclares the queryable when `inner.undeclare_on_drop` is
/// set. It dereferences to its handler.
#[non_exhaustive]
#[derive(Debug)]
pub struct Queryable<Handler> {
/// Session, id, key expression and drop behaviour of the queryable.
pub(crate) inner: QueryableInner,
/// Handler exposed by `handler`, `handler_mut` and the deref impls.
pub(crate) handler: Handler,
/// Sync group waited on when an undeclaration is asked to wait for
/// callbacks.
pub(crate) callback_sync_group: SyncGroup,
}
impl<Handler> Queryable<Handler> {
#[zenoh_macros::unstable]
pub fn id(&self) -> EntityGlobalId {
EntityGlobalIdProto {
zid: self.inner.session.zid().into(),
eid: self.inner.id,
}
.into()
}
pub fn handler(&self) -> &Handler {
&self.handler
}
pub fn handler_mut(&mut self) -> &mut Handler {
&mut self.handler
}
#[inline]
pub fn undeclare(self) -> QueryableUndeclaration<Handler>
where
Handler: Send,
{
UndeclarableSealed::undeclare_inner(self, ())
}
fn undeclare_impl(&mut self) -> ZResult<()> {
self.inner.undeclare_on_drop = false;
self.inner.session.close_queryable(self.inner.id)
}
#[zenoh_macros::internal]
pub fn set_background(&mut self, background: bool) {
self.inner.undeclare_on_drop = !background;
}
#[inline]
pub fn key_expr(&self) -> &KeyExpr<'static> {
&self.inner.key_expr
}
}
impl<Handler> Drop for Queryable<Handler> {
    /// Undeclares the queryable when `undeclare_on_drop` is set; a failure is
    /// logged rather than propagated.
    fn drop(&mut self) {
        if !self.inner.undeclare_on_drop {
            return;
        }
        match self.undeclare_impl() {
            Ok(()) => {}
            // The binding must stay named `error`: the macro uses it as the field name.
            Err(error) => {
                error!(error);
            }
        }
    }
}
impl<Handler: Send> UndeclarableSealed<()> for Queryable<Handler> {
    type Undeclaration = QueryableUndeclaration<Handler>;

    /// Wraps the queryable into a pending undeclaration that does not wait
    /// for callbacks by default.
    fn undeclare_inner(self, _: ()) -> Self::Undeclaration {
        let queryable = self;
        QueryableUndeclaration {
            queryable,
            wait_callbacks: false,
        }
    }
}
impl<Handler> Deref for Queryable<Handler> {
    type Target = Handler;

    /// Dereferences to the queryable's handler.
    fn deref(&self) -> &Self::Target {
        &self.handler
    }
}
impl<Handler> DerefMut for Queryable<Handler> {
    /// Mutably dereferences to the queryable's handler.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.handler
    }
}