use std::{
fmt,
future::{IntoFuture, Ready},
ops::{Deref, DerefMut},
};
use tracing::error;
use zenoh_core::{Resolvable, Wait};
use zenoh_result::ZResult;
#[cfg(feature = "unstable")]
use {zenoh_config::wrappers::EntityGlobalId, zenoh_protocol::core::EntityGlobalIdProto};
use crate::api::{
cancellation::SyncGroup,
handlers::Callback,
key_expr::KeyExpr,
sample::{Locality, Sample},
session::{UndeclarableSealed, WeakSession},
Id,
};
pub(crate) struct SubscriberState {
pub(crate) id: Id,
pub(crate) remote_id: Id,
pub(crate) key_expr: KeyExpr<'static>,
pub(crate) origin: Locality,
pub(crate) callback: Callback<Sample>,
pub(crate) history: bool,
}
impl fmt::Debug for SubscriberState {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Subscriber")
.field("id", &self.id)
.field("key_expr", &self.key_expr)
.finish()
}
}
#[derive(Debug)]
pub(crate) struct SubscriberInner {
pub(crate) session: WeakSession,
pub(crate) id: Id,
pub(crate) key_expr: KeyExpr<'static>,
pub(crate) kind: SubscriberKind,
pub(crate) undeclare_on_drop: bool,
}
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
pub struct SubscriberUndeclaration<Handler> {
subscriber: Subscriber<Handler>,
wait_callbacks: bool,
}
impl<Handler> SubscriberUndeclaration<Handler> {
#[zenoh_macros::internal_or_unstable]
pub fn wait_callbacks(mut self) -> Self {
self.wait_callbacks = true;
self
}
}
impl<Handler> Resolvable for SubscriberUndeclaration<Handler> {
type To = ZResult<()>;
}
impl<Handler> Wait for SubscriberUndeclaration<Handler> {
fn wait(mut self) -> <Self as Resolvable>::To {
self.subscriber.undeclare_impl()?;
if self.wait_callbacks {
self.subscriber.callback_sync_group.wait();
}
Ok(())
}
}
impl<Handler> IntoFuture for SubscriberUndeclaration<Handler> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;
fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.wait())
}
}
#[non_exhaustive]
#[derive(Debug)]
pub struct Subscriber<Handler> {
pub(crate) inner: SubscriberInner,
pub(crate) handler: Handler,
pub(crate) callback_sync_group: SyncGroup,
}
impl<Handler> Subscriber<Handler> {
#[zenoh_macros::unstable]
pub fn id(&self) -> EntityGlobalId {
EntityGlobalIdProto {
zid: self.inner.session.zid().into(),
eid: self.inner.id,
}
.into()
}
pub fn key_expr(&self) -> &KeyExpr<'static> {
&self.inner.key_expr
}
pub fn handler(&self) -> &Handler {
&self.handler
}
pub fn handler_mut(&mut self) -> &mut Handler {
&mut self.handler
}
#[inline]
pub fn undeclare(self) -> SubscriberUndeclaration<Handler>
where
Handler: Send,
{
self.undeclare_inner(())
}
fn undeclare_impl(&mut self) -> ZResult<()> {
self.inner.undeclare_on_drop = false;
self.inner
.session
.undeclare_subscriber_inner(self.inner.id, self.inner.kind)?;
Ok(())
}
#[zenoh_macros::internal]
pub fn set_background(&mut self, background: bool) {
self.inner.undeclare_on_drop = !background;
}
#[zenoh_macros::internal]
pub fn session(&self) -> &WeakSession {
&self.inner.session
}
}
impl<Handler> Drop for Subscriber<Handler> {
fn drop(&mut self) {
if self.inner.undeclare_on_drop {
if let Err(error) = self.undeclare_impl() {
error!(error);
}
}
}
}
impl<Handler: Send> UndeclarableSealed<()> for Subscriber<Handler> {
type Undeclaration = SubscriberUndeclaration<Handler>;
fn undeclare_inner(self, _: ()) -> Self::Undeclaration {
SubscriberUndeclaration {
subscriber: self,
wait_callbacks: false,
}
}
}
impl<Handler> Deref for Subscriber<Handler> {
type Target = Handler;
fn deref(&self) -> &Self::Target {
self.handler()
}
}
impl<Handler> DerefMut for Subscriber<Handler> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.handler_mut()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SubscriberKind {
Subscriber,
LivelinessSubscriber,
}