#[zenoh_macros::unstable]
use std::future::{IntoFuture, Ready};
#[zenoh_macros::unstable]
use tracing::error;
#[zenoh_macros::unstable]
use zenoh_core::{Resolvable, Wait};
#[zenoh_macros::unstable]
use zenoh_result::ZResult;
#[zenoh_macros::unstable]
use crate::api::handlers::locked;
#[zenoh_macros::unstable]
use crate::api::info::Transport;
#[zenoh_macros::unstable]
use crate::api::info::{Link, LinkEvent};
#[zenoh_macros::unstable]
use crate::api::Id;
#[zenoh_macros::unstable]
use crate::{
api::cancellation::SyncGroup,
api::session::{UndeclarableSealed, WeakSession},
handlers::{Callback, DefaultHandler, IntoHandler},
};
/// Builder that resolves to an iterator over the [`Link`]s reported by the session's runtime.
///
/// Nothing happens until the builder is resolved with `.await` or `Wait::wait`.
/// A [`Transport`] may be selected with [`LinksBuilder::transport`]; it is forwarded to the
/// runtime's `get_links` call (presumably as a filter — confirm against the runtime).
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
#[zenoh_macros::unstable]
pub struct LinksBuilder<'a> {
/// Session whose runtime is queried when the builder is resolved.
session: &'a WeakSession,
/// Transport forwarded to `get_links`; `None` until `transport()` is called.
transport: Option<Transport>,
}
#[zenoh_macros::unstable]
impl<'a> LinksBuilder<'a> {
    /// Creates a builder bound to `session` with no transport selected.
    pub(crate) fn new(session: &'a WeakSession) -> Self {
        let transport = None;
        Self { session, transport }
    }

    /// Selects the transport that is forwarded to the link query on resolution.
    ///
    /// Calling this again replaces the previously selected transport.
    pub fn transport(self, transport: Transport) -> Self {
        Self {
            transport: Some(transport),
            ..self
        }
    }
}
#[zenoh_macros::unstable]
impl Resolvable for LinksBuilder<'_> {
/// Resolving the builder yields a boxed, `Send + Sync` iterator of [`Link`] values.
type To = Box<dyn Iterator<Item = Link> + Send + Sync>;
}
#[zenoh_macros::unstable]
impl Wait for LinksBuilder<'_> {
    /// Asks the session's runtime for its links, passing along the optional transport.
    fn wait(self) -> Self::To {
        let Self { session, transport } = self;
        session.runtime().get_links(transport.as_ref())
    }
}
#[zenoh_macros::unstable]
impl IntoFuture for LinksBuilder<'_> {
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<<Self as Resolvable>::To>;

    /// Resolves the builder synchronously and wraps the result in an already-completed future.
    fn into_future(self) -> Self::IntoFuture {
        let links = self.wait();
        std::future::ready(links)
    }
}
/// Bookkeeping shared by every link-events listener, independent of the handler type.
#[zenoh_macros::unstable]
pub(crate) struct LinkEventsListenerInner {
/// Session the listener was declared on; used again to undeclare it.
pub(crate) session: WeakSession,
/// Identifier of the listener, passed back to the session when undeclaring.
pub(crate) id: Id,
/// When `true`, dropping the owning `LinkEventsListener` undeclares it.
pub(crate) undeclare_on_drop: bool,
}
#[zenoh_macros::unstable]
impl std::fmt::Debug for LinkEventsListenerInner {
    /// Formats the listener's identifier and drop policy for diagnostics.
    ///
    /// The `session` field is intentionally not printed. `finish_non_exhaustive` appends `..`
    /// to the output so that readers can tell the struct has fields that are not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LinkEventsListenerInner")
            .field("id", &self.id)
            .field("undeclare_on_drop", &self.undeclare_on_drop)
            .finish_non_exhaustive()
    }
}
/// Handle to a declared link-events listener, pairing the declaration with its handler.
///
/// Dropping the handle undeclares the listener unless `undeclare_on_drop` has been cleared
/// (see `set_background`). The handle dereferences to `Handler`.
#[zenoh_macros::unstable]
#[derive(Debug)]
pub struct LinkEventsListener<Handler> {
/// Session, identifier and drop policy of the declaration.
pub(crate) inner: LinkEventsListenerInner,
/// Handler half returned by `IntoHandler::into_handler` when the listener was declared.
pub(crate) handler: Handler,
/// Group waited on when undeclaring with `wait_callbacks()`
/// (presumably tracks in-flight callback invocations — confirm against `SyncGroup`).
pub(crate) callback_sync_group: SyncGroup,
}
#[zenoh_macros::unstable]
impl<Handler> LinkEventsListener<Handler> {
#[inline]
pub fn undeclare(self) -> LinkEventsListenerUndeclaration<Handler>
where
Handler: Send,
{
self.undeclare_inner(())
}
fn undeclare_impl(&mut self) -> ZResult<()> {
self.inner.undeclare_on_drop = false;
self.inner
.session
.undeclare_transport_links_listener_inner(self.inner.id)
}
pub fn handler(&self) -> &Handler {
&self.handler
}
pub fn handler_mut(&mut self) -> &mut Handler {
&mut self.handler
}
#[zenoh_macros::internal]
pub fn set_background(&mut self, background: bool) {
self.inner.undeclare_on_drop = !background;
}
}
#[zenoh_macros::unstable]
impl<Handler> Drop for LinkEventsListener<Handler> {
    /// Undeclares the listener if it is still flagged for undeclaration on drop.
    ///
    /// A failure cannot be returned from `drop`, so it is logged instead.
    fn drop(&mut self) {
        if !self.inner.undeclare_on_drop {
            return;
        }
        if let Err(error) = self.undeclare_impl() {
            error!(error);
        }
    }
}
#[zenoh_macros::unstable]
impl<Handler: Send> UndeclarableSealed<()> for LinkEventsListener<Handler> {
    type Undeclaration = LinkEventsListenerUndeclaration<Handler>;

    /// Wraps the listener in an undeclaration that does not wait for callbacks by default.
    fn undeclare_inner(self, _: ()) -> Self::Undeclaration {
        let wait_callbacks = false;
        LinkEventsListenerUndeclaration {
            listener: self,
            wait_callbacks,
        }
    }
}
#[zenoh_macros::unstable]
impl<Handler> std::ops::Deref for LinkEventsListener<Handler> {
    type Target = Handler;

    /// Gives shared access to the handler, so its methods can be called on the listener.
    fn deref(&self) -> &Self::Target {
        self.handler()
    }
}
#[zenoh_macros::unstable]
impl<Handler> std::ops::DerefMut for LinkEventsListener<Handler> {
    /// Gives exclusive access to the handler through the listener.
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.handler_mut()
    }
}
/// Resolvable returned by `LinkEventsListener::undeclare`.
///
/// Resolving it undeclares the wrapped listener and, if requested through
/// `wait_callbacks()`, then waits on the listener's callback sync group.
#[zenoh_macros::unstable]
pub struct LinkEventsListenerUndeclaration<Handler> {
/// Listener to undeclare.
listener: LinkEventsListener<Handler>,
/// Whether to wait on `callback_sync_group` after a successful undeclaration.
wait_callbacks: bool,
}
#[zenoh_macros::unstable]
impl<Handler> Resolvable for LinkEventsListenerUndeclaration<Handler> {
/// Resolving yields `Ok(())` on success, or the error reported while undeclaring.
type To = ZResult<()>;
}
#[zenoh_macros::unstable]
impl<Handler> LinkEventsListenerUndeclaration<Handler> {
    /// Requests that resolution also waits on the listener's callback sync group
    /// once the undeclaration has succeeded.
    #[zenoh_macros::internal_or_unstable]
    pub fn wait_callbacks(self) -> Self {
        Self {
            wait_callbacks: true,
            ..self
        }
    }
}
#[zenoh_macros::unstable]
impl<Handler> Wait for LinkEventsListenerUndeclaration<Handler> {
    /// Undeclares the listener, then optionally waits on its callback sync group.
    ///
    /// # Errors
    ///
    /// Returns the undeclaration error as-is; in that case the sync group is not waited on.
    fn wait(mut self) -> <Self as Resolvable>::To {
        let must_wait = self.wait_callbacks;
        let listener = &mut self.listener;
        listener.undeclare_impl()?;
        if must_wait {
            listener.callback_sync_group.wait();
        }
        Ok(())
    }
}
#[zenoh_macros::unstable]
impl<Handler> IntoFuture for LinkEventsListenerUndeclaration<Handler> {
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<<Self as Resolvable>::To>;

    /// Performs the undeclaration synchronously and returns an already-completed future.
    fn into_future(self) -> Self::IntoFuture {
        let outcome = self.wait();
        std::future::ready(outcome)
    }
}
/// Builder for declaring a listener of [`LinkEvent`]s on a session.
///
/// Nothing is declared until the builder is resolved with `.await` or `Wait::wait`.
/// With `BACKGROUND == false` (the default) resolution returns a `LinkEventsListener` handle;
/// with `BACKGROUND == true` it returns `()` and no handle is produced.
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
#[zenoh_macros::unstable]
pub struct LinkEventsListenerBuilder<'a, Handler, const BACKGROUND: bool = false> {
/// Session on which the listener will be declared.
session: &'a WeakSession,
/// Handler (or callback) that will receive the events.
handler: Handler,
/// Forwarded to the session on declaration; `false` by default
/// (presumably requests events for links that already exist — confirm).
history: bool,
/// Optional transport forwarded to the session on declaration.
transport: Option<Transport>,
}
#[zenoh_macros::unstable]
impl<'a> LinkEventsListenerBuilder<'a, DefaultHandler> {
    /// Creates a builder with the default handler, history disabled and no transport selected.
    pub(crate) fn new(session: &'a WeakSession) -> Self {
        let handler = DefaultHandler::default();
        Self {
            session,
            handler,
            history: false,
            transport: None,
        }
    }
}
#[zenoh_macros::unstable]
impl<'a, Handler> LinkEventsListenerBuilder<'a, Handler> {
    /// Sets the `history` flag that is forwarded to the session on declaration.
    pub fn history(self, enabled: bool) -> Self {
        Self {
            history: enabled,
            ..self
        }
    }

    /// Replaces the current handler with `handler`, keeping every other setting.
    pub fn with<H>(self, handler: H) -> LinkEventsListenerBuilder<'a, H>
    where
        H: IntoHandler<LinkEvent>,
    {
        // The previous handler is left inside `self` and discarded with it.
        let Self {
            session,
            history,
            transport,
            ..
        } = self;
        LinkEventsListenerBuilder {
            session,
            handler,
            history,
            transport,
        }
    }

    /// Selects the transport that is forwarded to the session on declaration.
    pub fn transport(self, transport: Transport) -> Self {
        Self {
            transport: Some(transport),
            ..self
        }
    }

    /// Uses the closure `callback` as the handler for incoming events.
    pub fn callback<F>(self, callback: F) -> LinkEventsListenerBuilder<'a, Callback<LinkEvent>>
    where
        F: Fn(LinkEvent) + Send + Sync + 'static,
    {
        let handler = Callback::from(callback);
        self.with(handler)
    }

    /// Uses the `FnMut` closure `callback` as the handler, adapting it through `locked`.
    pub fn callback_mut<F>(self, callback: F) -> LinkEventsListenerBuilder<'a, Callback<LinkEvent>>
    where
        F: FnMut(LinkEvent) + Send + Sync + 'static,
    {
        let shared = locked(callback);
        self.callback(shared)
    }
}
#[zenoh_macros::unstable]
impl<'a> LinkEventsListenerBuilder<'a, Callback<LinkEvent>> {
    /// Switches the builder to background mode, keeping all settings.
    ///
    /// A background builder resolves to `ZResult<()>` rather than to a listener handle.
    pub fn background(self) -> LinkEventsListenerBuilder<'a, Callback<LinkEvent>, true> {
        let Self {
            session,
            handler,
            history,
            transport,
        } = self;
        LinkEventsListenerBuilder {
            session,
            handler,
            history,
            transport,
        }
    }
}
#[zenoh_macros::unstable]
impl<Handler> Resolvable for LinkEventsListenerBuilder<'_, Handler>
where
Handler: IntoHandler<LinkEvent> + Send,
Handler::Handler: Send,
{
/// Resolving yields a listener handle holding the handler half produced by `into_handler`,
/// or the error reported while declaring the listener.
type To = ZResult<LinkEventsListener<Handler::Handler>>;
}
#[zenoh_macros::unstable]
impl<Handler> Wait for LinkEventsListenerBuilder<'_, Handler>
where
    Handler: IntoHandler<LinkEvent> + Send,
    Handler::Handler: Send,
{
    /// Declares the listener on the session and returns a handle that undeclares it on drop.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the session if the declaration fails.
    fn wait(self) -> Self::To {
        let Self {
            session,
            handler,
            history,
            transport,
        } = self;
        let callback_sync_group = SyncGroup::default();
        let (callback, handler) = handler.into_handler();
        let state = session.declare_transport_links_listener_inner(
            callback,
            history,
            transport,
            callback_sync_group.notifier(),
        )?;
        let inner = LinkEventsListenerInner {
            session: session.clone(),
            id: state.id,
            undeclare_on_drop: true,
        };
        Ok(LinkEventsListener {
            inner,
            handler,
            callback_sync_group,
        })
    }
}
#[zenoh_macros::unstable]
impl<Handler> IntoFuture for LinkEventsListenerBuilder<'_, Handler>
where
    Handler: IntoHandler<LinkEvent> + Send,
    Handler::Handler: Send,
{
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<<Self as Resolvable>::To>;

    /// Declares the listener synchronously and returns an already-completed future.
    fn into_future(self) -> Self::IntoFuture {
        let declared = self.wait();
        std::future::ready(declared)
    }
}
#[zenoh_macros::unstable]
impl Resolvable for LinkEventsListenerBuilder<'_, Callback<LinkEvent>, true> {
/// A background builder resolves to `Ok(())` on success; no listener handle is returned.
type To = ZResult<()>;
}
#[zenoh_macros::unstable]
impl Wait for LinkEventsListenerBuilder<'_, Callback<LinkEvent>, true> {
    /// Declares the listener on the session without keeping a handle to it.
    ///
    /// No sync-group notifier is registered (`None` is passed) and the returned declaration
    /// state is discarded immediately.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the session if the declaration fails.
    fn wait(self) -> <Self as Resolvable>::To {
        let Self {
            session,
            handler,
            history,
            transport,
        } = self;
        drop(session.declare_transport_links_listener_inner(handler, history, transport, None)?);
        Ok(())
    }
}
#[zenoh_macros::unstable]
impl IntoFuture for LinkEventsListenerBuilder<'_, Callback<LinkEvent>, true> {
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<<Self as Resolvable>::To>;

    /// Declares the background listener synchronously and returns an already-completed future.
    fn into_future(self) -> Self::IntoFuture {
        let declared = self.wait();
        std::future::ready(declared)
    }
}