Skip to main content

zenoh/api/builders/
info_links.rs

1//
2// Copyright (c) 2025 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15#[zenoh_macros::unstable]
16use std::future::{IntoFuture, Ready};
17
18#[zenoh_macros::unstable]
19use tracing::error;
20#[zenoh_macros::unstable]
21use zenoh_core::{Resolvable, Wait};
22#[zenoh_macros::unstable]
23use zenoh_result::ZResult;
24
25#[zenoh_macros::unstable]
26use crate::api::handlers::locked;
27#[zenoh_macros::unstable]
28use crate::api::info::Transport;
29#[zenoh_macros::unstable]
30use crate::api::info::{Link, LinkEvent};
31#[zenoh_macros::unstable]
32use crate::api::Id;
33#[zenoh_macros::unstable]
34use crate::{
35    api::cancellation::SyncGroup,
36    api::session::{UndeclarableSealed, WeakSession},
37    handlers::{Callback, DefaultHandler, IntoHandler},
38};
39
40/// A builder returned by [`SessionInfo::links()`](crate::session::SessionInfo::links).
41///
42/// The builder creates an iterator over the established [`Link`](crate::session::Link)s
43/// within the session.
44/// Multiple [`Link`](crate::session::Link) can be established between two zenoh nodes within
45/// the same [`Transport`](crate::session::Transport).
46/// By default all links are returned, but you can filter them by transport using
47/// the [`transport()`](LinksBuilder::transport) method.
48///
49/// # Examples
50/// ```no_run
51/// # #[tokio::main]
52/// # async fn main() {
53/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
54/// let links = session.info().links().await;
55/// for link in links {
56///     println!("Link: {} -> {}", link.src(), link.dst());
57/// }
58/// # }
59/// ```
60#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
61#[zenoh_macros::unstable]
62pub struct LinksBuilder<'a> {
63    session: &'a WeakSession,
64    transport: Option<Transport>,
65}
66
67#[zenoh_macros::unstable]
68impl<'a> LinksBuilder<'a> {
69    pub(crate) fn new(session: &'a WeakSession) -> Self {
70        Self {
71            session,
72            transport: None,
73        }
74    }
75
76    /// Filter links by transport.
77    ///
78    /// # Examples
79    /// ```no_run
80    /// # #[tokio::main]
81    /// # async fn main() {
82    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
83    /// let transports = session.info().transports().await;
84    /// if let Some(transport) = transports.into_iter().next() {
85    ///     let links = session.info().links().transport(transport).await;
86    ///     for link in links {
87    ///         println!("Link: {} -> {}", link.src(), link.dst());
88    ///     }
89    /// }
90    /// # }
91    /// ```
92    pub fn transport(mut self, transport: Transport) -> Self {
93        self.transport = Some(transport);
94        self
95    }
96}
97
98#[zenoh_macros::unstable]
99impl Resolvable for LinksBuilder<'_> {
100    type To = Box<dyn Iterator<Item = Link> + Send + Sync>;
101}
102
103#[zenoh_macros::unstable]
104impl Wait for LinksBuilder<'_> {
105    fn wait(self) -> Self::To {
106        self.session.runtime().get_links(self.transport.as_ref())
107    }
108}
109
110#[zenoh_macros::unstable]
111impl IntoFuture for LinksBuilder<'_> {
112    type Output = <Self as Resolvable>::To;
113    type IntoFuture = Ready<<Self as Resolvable>::To>;
114
115    fn into_future(self) -> Self::IntoFuture {
116        std::future::ready(self.wait())
117    }
118}
119
120#[zenoh_macros::unstable]
121pub(crate) struct LinkEventsListenerInner {
122    pub(crate) session: WeakSession,
123    pub(crate) id: Id,
124    pub(crate) undeclare_on_drop: bool,
125}
126
127#[zenoh_macros::unstable]
128impl std::fmt::Debug for LinkEventsListenerInner {
129    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
130        f.debug_struct("LinkEventsListenerInner")
131            .field("id", &self.id)
132            .field("undeclare_on_drop", &self.undeclare_on_drop)
133            .finish()
134    }
135}
136
137/// A listener that sends notifications when link lifecycle events occur.
138///
139/// Call [`undeclare()`](LinkEventsListener::undeclare) to stop receiving events.
140///
141/// # Examples
142/// ```no_run
143/// # #[tokio::main]
144/// # async fn main() {
145/// use zenoh::sample::SampleKind;
146///
147/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
148/// let listener = session.info()
149///     .link_events_listener()
150///     .history(true)
151///     .with(flume::bounded(32))
152///     .await
153///     .expect("Failed to declare link events listener");
154///
155/// while let Ok(event) = listener.recv_async().await {
156///     match event.kind() {
157///         SampleKind::Put => println!("Link added: {} -> {}",
158///             event.link().src(), event.link().dst()),
159///         SampleKind::Delete => println!("Link removed"),
160///     }
161/// }
162///
163/// // Cleanup
164/// listener.undeclare().await.unwrap();
165/// # }
166/// ```
167#[zenoh_macros::unstable]
168#[derive(Debug)]
169pub struct LinkEventsListener<Handler> {
170    pub(crate) inner: LinkEventsListenerInner,
171    pub(crate) handler: Handler,
172    pub(crate) callback_sync_group: SyncGroup,
173}
174
175#[zenoh_macros::unstable]
176impl<Handler> LinkEventsListener<Handler> {
177    /// Undeclare the listener and stop receiving events.
178    ///
179    /// # Examples
180    /// ```no_run
181    /// # #[tokio::main]
182    /// # async fn main() {
183    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
184    /// let listener = session.info()
185    ///     .link_events_listener()
186    ///     .with(flume::bounded(32))
187    ///     .await
188    ///     .expect("Failed to declare link events listener");
189    /// listener.undeclare().await.unwrap();
190    /// # }
191    /// ```
192    #[inline]
193    pub fn undeclare(self) -> LinkEventsListenerUndeclaration<Handler>
194    where
195        Handler: Send,
196    {
197        self.undeclare_inner(())
198    }
199
200    fn undeclare_impl(&mut self) -> ZResult<()> {
201        // Set flag first to avoid double panic
202        self.inner.undeclare_on_drop = false;
203        // Call session's undeclare method with the stored id
204        self.inner
205            .session
206            .undeclare_transport_links_listener_inner(self.inner.id)
207    }
208
209    /// Returns a reference to this listener's handler.
210    /// A handler is anything that implements [`IntoHandler`](crate::handlers::IntoHandler).
211    /// The default handler is [`DefaultHandler`](crate::handlers::DefaultHandler).
212    pub fn handler(&self) -> &Handler {
213        &self.handler
214    }
215
216    /// Returns a mutable reference to this listener's handler.
217    /// A handler is anything that implements [`IntoHandler`](crate::handlers::IntoHandler).
218    /// The default handler is [`DefaultHandler`](crate::handlers::DefaultHandler).
219    pub fn handler_mut(&mut self) -> &mut Handler {
220        &mut self.handler
221    }
222
223    #[zenoh_macros::internal]
224    pub fn set_background(&mut self, background: bool) {
225        self.inner.undeclare_on_drop = !background;
226    }
227}
228
229#[zenoh_macros::unstable]
230impl<Handler> Drop for LinkEventsListener<Handler> {
231    fn drop(&mut self) {
232        if self.inner.undeclare_on_drop {
233            if let Err(error) = self.undeclare_impl() {
234                error!(error);
235            }
236        }
237    }
238}
239
240#[zenoh_macros::unstable]
241impl<Handler: Send> UndeclarableSealed<()> for LinkEventsListener<Handler> {
242    type Undeclaration = LinkEventsListenerUndeclaration<Handler>;
243
244    fn undeclare_inner(self, _: ()) -> Self::Undeclaration {
245        LinkEventsListenerUndeclaration {
246            listener: self,
247            wait_callbacks: false,
248        }
249    }
250}
251
252#[zenoh_macros::unstable]
253impl<Handler> std::ops::Deref for LinkEventsListener<Handler> {
254    type Target = Handler;
255
256    fn deref(&self) -> &Self::Target {
257        &self.handler
258    }
259}
260
261#[zenoh_macros::unstable]
262impl<Handler> std::ops::DerefMut for LinkEventsListener<Handler> {
263    fn deref_mut(&mut self) -> &mut Self::Target {
264        &mut self.handler
265    }
266}
267
268/// A [`Resolvable`] returned by [`LinkEventsListener::undeclare`]
269#[zenoh_macros::unstable]
270pub struct LinkEventsListenerUndeclaration<Handler> {
271    listener: LinkEventsListener<Handler>,
272    wait_callbacks: bool,
273}
274
275#[zenoh_macros::unstable]
276impl<Handler> Resolvable for LinkEventsListenerUndeclaration<Handler> {
277    type To = ZResult<()>;
278}
279
280#[zenoh_macros::unstable]
281impl<Handler> LinkEventsListenerUndeclaration<Handler> {
282    #[zenoh_macros::internal_or_unstable]
283    /// Block in undeclare operation until all currently running instances of link events listener callback (if any) return.
284    pub fn wait_callbacks(mut self) -> Self {
285        self.wait_callbacks = true;
286        self
287    }
288}
289
290#[zenoh_macros::unstable]
291impl<Handler> Wait for LinkEventsListenerUndeclaration<Handler> {
292    fn wait(mut self) -> <Self as Resolvable>::To {
293        self.listener.undeclare_impl()?;
294        if self.wait_callbacks {
295            self.listener.callback_sync_group.wait();
296        }
297        Ok(())
298    }
299}
300
301#[zenoh_macros::unstable]
302impl<Handler> IntoFuture for LinkEventsListenerUndeclaration<Handler> {
303    type Output = <Self as Resolvable>::To;
304    type IntoFuture = Ready<<Self as Resolvable>::To>;
305
306    fn into_future(self) -> Self::IntoFuture {
307        std::future::ready(self.wait())
308    }
309}
310
311/// A builder returned by [`SessionInfo::link_events_listener()`](crate::session::SessionInfo::link_events_listener).
312///
313/// The builder creates a [`LinkEventsListener`](crate::session::LinkEventsListener) which sends notifications
314/// when new [`Link`](crate::session::Link) are created or removed.
315///
316/// # Examples
317/// ```no_run
318/// # #[tokio::main]
319/// # async fn main() {
320/// use zenoh::sample::SampleKind;
321///
322/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
323/// let listener = session.info()
324///     .link_events_listener()
325///     .history(true)
326///     .with(flume::bounded(32))
327///     .await
328///     .expect("Failed to declare link events listener");
329///
330/// while let Ok(event) = listener.recv_async().await {
331///     match event.kind() {
332///         SampleKind::Put => println!("Link added: {} -> {}",
333///             event.link().src(), event.link().dst()),
334///         SampleKind::Delete => println!("Link removed"),
335///     }
336/// }
337/// # }
338/// ```
339#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
340#[zenoh_macros::unstable]
341pub struct LinkEventsListenerBuilder<'a, Handler, const BACKGROUND: bool = false> {
342    session: &'a WeakSession,
343    handler: Handler,
344    history: bool,
345    transport: Option<Transport>,
346}
347
348#[zenoh_macros::unstable]
349impl<'a> LinkEventsListenerBuilder<'a, DefaultHandler> {
350    pub(crate) fn new(session: &'a WeakSession) -> Self {
351        Self {
352            session,
353            handler: DefaultHandler::default(),
354            history: false,
355            transport: None,
356        }
357    }
358}
359
360#[zenoh_macros::unstable]
361impl<'a, Handler> LinkEventsListenerBuilder<'a, Handler> {
362    /// Enable history.
363    ///
364    /// Send events for existing links before live events.
365    pub fn history(mut self, enabled: bool) -> Self {
366        self.history = enabled;
367        self
368    }
369
370    /// Use a custom handler (channel, callback, etc.)
371    pub fn with<H>(self, handler: H) -> LinkEventsListenerBuilder<'a, H>
372    where
373        H: IntoHandler<LinkEvent>,
374    {
375        LinkEventsListenerBuilder {
376            session: self.session,
377            handler,
378            history: self.history,
379            transport: self.transport,
380        }
381    }
382
383    /// Filter link events by transport.
384    ///
385    /// # Examples
386    /// ```no_run
387    /// # #[tokio::main]
388    /// # async fn main() {
389    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
390    /// let transports = session.info().transports().await;
391    /// if let Some(transport) = transports.into_iter().next() {
392    ///     let listener = session.info()
393    ///         .link_events_listener()
394    ///         .transport(transport)
395    ///         .with(flume::bounded(32))
396    ///         .await;
397    /// }
398    /// # }
399    /// ```
400    pub fn transport(mut self, transport: Transport) -> Self {
401        self.transport = Some(transport);
402        self
403    }
404
405    /// Provide a callback to handle events
406    pub fn callback<F>(self, callback: F) -> LinkEventsListenerBuilder<'a, Callback<LinkEvent>>
407    where
408        F: Fn(LinkEvent) + Send + Sync + 'static,
409    {
410        self.with(Callback::from(callback))
411    }
412
413    /// Provide a mutable callback which is never called concurrently. If the callback can be accepted by
414    /// [`callback`](Self::callback), prefer using that instead for better performance.
415    pub fn callback_mut<F>(self, callback: F) -> LinkEventsListenerBuilder<'a, Callback<LinkEvent>>
416    where
417        F: FnMut(LinkEvent) + Send + Sync + 'static,
418    {
419        self.callback(locked(callback))
420    }
421}
422
423#[zenoh_macros::unstable]
424impl<'a> LinkEventsListenerBuilder<'a, Callback<LinkEvent>> {
425    /// Run the listener in the background, automatically dropping the handler when done.
426    ///
427    /// # Examples
428    /// ```no_run
429    /// # #[tokio::main]
430    /// # async fn main() {
431    /// use zenoh::sample::SampleKind;
432    ///
433    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
434    /// // no need to assign and keep a variable for a background listener
435    /// session.info()
436    ///     .link_events_listener()
437    ///     .callback(|event| {
438    ///         match event.kind() {
439    ///             SampleKind::Put => println!("Link added: {} -> {}",
440    ///                 event.link().src(), event.link().dst()),
441    ///             SampleKind::Delete => println!("Link removed"),
442    ///         }
443    ///     })
444    ///     .background()
445    ///     .await
446    ///     .unwrap();
447    /// # }
448    /// ```
449    pub fn background(self) -> LinkEventsListenerBuilder<'a, Callback<LinkEvent>, true> {
450        LinkEventsListenerBuilder {
451            session: self.session,
452            handler: self.handler,
453            history: self.history,
454            transport: self.transport,
455        }
456    }
457}
458
459#[zenoh_macros::unstable]
460impl<Handler> Resolvable for LinkEventsListenerBuilder<'_, Handler>
461where
462    Handler: IntoHandler<LinkEvent> + Send,
463    Handler::Handler: Send,
464{
465    type To = ZResult<LinkEventsListener<Handler::Handler>>;
466}
467
468#[zenoh_macros::unstable]
469impl<Handler> Wait for LinkEventsListenerBuilder<'_, Handler>
470where
471    Handler: IntoHandler<LinkEvent> + Send,
472    Handler::Handler: Send,
473{
474    fn wait(self) -> Self::To {
475        let callback_sync_group = SyncGroup::default();
476        let (callback, handler) = self.handler.into_handler();
477        let state = self.session.declare_transport_links_listener_inner(
478            callback,
479            self.history,
480            self.transport,
481            callback_sync_group.notifier(),
482        )?;
483
484        Ok(LinkEventsListener {
485            inner: LinkEventsListenerInner {
486                session: self.session.clone(),
487                id: state.id,
488                undeclare_on_drop: true,
489            },
490            handler,
491            callback_sync_group,
492        })
493    }
494}
495
496#[zenoh_macros::unstable]
497impl<Handler> IntoFuture for LinkEventsListenerBuilder<'_, Handler>
498where
499    Handler: IntoHandler<LinkEvent> + Send,
500    Handler::Handler: Send,
501{
502    type Output = <Self as Resolvable>::To;
503    type IntoFuture = Ready<<Self as Resolvable>::To>;
504
505    fn into_future(self) -> Self::IntoFuture {
506        std::future::ready(self.wait())
507    }
508}
509
510#[zenoh_macros::unstable]
511impl Resolvable for LinkEventsListenerBuilder<'_, Callback<LinkEvent>, true> {
512    type To = ZResult<()>;
513}
514
515#[zenoh_macros::unstable]
516impl Wait for LinkEventsListenerBuilder<'_, Callback<LinkEvent>, true> {
517    fn wait(self) -> <Self as Resolvable>::To {
518        let state = self.session.declare_transport_links_listener_inner(
519            self.handler,
520            self.history,
521            self.transport,
522            None,
523        )?;
524        // Set the listener to not undeclare on drop (background mode)
525        // Note: We can't access the listener to set background flag, so we just don't keep a reference
526        // The listener will live until explicitly undeclared or session closes
527        drop(state);
528        Ok(())
529    }
530}
531
532#[zenoh_macros::unstable]
533impl IntoFuture for LinkEventsListenerBuilder<'_, Callback<LinkEvent>, true> {
534    type Output = <Self as Resolvable>::To;
535    type IntoFuture = Ready<<Self as Resolvable>::To>;
536
537    fn into_future(self) -> Self::IntoFuture {
538        std::future::ready(self.wait())
539    }
540}