up_transport_zenoh/
lib.rs

1/********************************************************************************
2 * Copyright (c) 2024 Contributors to the Eclipse Foundation
3 *
4 * See the NOTICE file(s) distributed with this work for additional
5 * information regarding copyright ownership.
6 *
7 * This program and the accompanying materials are made available under the
8 * terms of the Apache License Version 2.0 which is available at
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * SPDX-License-Identifier: Apache-2.0
12 ********************************************************************************/
13
14/*!
15This crate provides an implementation of the Eclipse Zenoh ™ uProtocol Transport.
16The transport uses Zenoh's publish-subscribe mechanism to exchange messages. It is
17designed to be used in conjunction with the [up-rust](https://crates.io/crates/up_rust)
18crate, which provides the uProtocol message types and utilities.
19
20The transport is designed to run in the context of a [tokio `Runtime`] which
21needs to be configured outside of the transport according to the
22processing requirements of the use case at hand. The transport does
23not make any implicit assumptions about the number of threads available
24and does not spawn any threads itself.
25
26[tokio `Runtime`]: https://docs.rs/tokio/latest/tokio/runtime/index.html
27*/
28
29mod listener_registry;
30pub(crate) mod utransport;
31
32use std::sync::Arc;
33
34use listener_registry::ListenerRegistry;
35use tracing::error;
36use up_rust::{UCode, UStatus, UUri};
37use zenoh::{Config, Session};
38// Re-export Zenoh config
39pub use zenoh::config as zenoh_config;
40
41const UPROTOCOL_MAJOR_VERSION: u8 = 1;
42const DEFAULT_MAX_LISTENERS: usize = 100;
43
44/// An Eclipse Zenoh ™ based uProtocol transport implementation.
45///
46/// The transport registers callbacks on the Zenoh runtime for listeners that
47/// are being registered using `up_rust::UTransport::register_listener`.
48///
49/// <div class="warning">
50///
51/// The registered listeners are being invoked sequentially on the **same thread**
52/// that the callback is being executed on. Implementers of listeners are therefore
53/// **strongly advised** to move non-trivial processing logic to **another/dedicated
54/// thread**, if necessary. Please refer to `subscriber` and `notification_receiver`
55/// in the examples directory for how this can be done.
56///
57/// </div>
58pub struct UPTransportZenoh {
59    session: Arc<Session>,
60    subscribers: ListenerRegistry,
61    local_authority: String,
62}
63
64impl UPTransportZenoh {
65    /// Gets a builder for creating a new Zenoh transport.
66    ///
67    /// # Arguments
68    ///
69    /// * `local_uri` - The URI identifying the (local) uEntity that the transport runs on.
70    ///
71    /// # Errors
72    ///
73    /// Returns an error if the URI contains an empty or wildcard authority name
74    /// or has a non-zero resource ID.
75    pub fn builder<U: Into<String>>(
76        local_authority: U,
77    ) -> Result<UPTransportZenohBuilder<InitialBuilderState>, UStatus> {
78        let authority_name = local_authority.into();
79        if authority_name.is_empty() || &authority_name == "*" {
80            return Err(UStatus::fail_with_code(
81                UCode::INVALID_ARGUMENT,
82                "Authority name must be non-empty and must not be the wildcard authority name",
83            ));
84        }
85
86        UUri::verify_authority(&authority_name).map_err(|err| {
87            UStatus::fail_with_code(
88                UCode::INVALID_ARGUMENT,
89                format!("Invalid authority name: {err}"),
90            )
91        })?;
92
93        Ok(UPTransportZenohBuilder {
94            common: Box::new(CommonProperties {
95                local_authority: authority_name,
96                max_listeners: DEFAULT_MAX_LISTENERS,
97            }),
98            extra: InitialBuilderState,
99        })
100    }
101
102    async fn init_with_config(
103        config: Config,
104        local_authority: String,
105        max_listeners: usize,
106    ) -> Result<UPTransportZenoh, UStatus> {
107        let session = zenoh::open(config).await.map_err(|err| {
108            let msg = "Failed to open Zenoh session";
109            error!("{msg}: {err}");
110            UStatus::fail_with_code(UCode::INTERNAL, msg)
111        })?;
112        Ok(Self::init_with_session(
113            session,
114            local_authority,
115            max_listeners,
116        ))
117    }
118
119    fn init_with_session(
120        session: Session,
121        local_authority: String,
122        max_listeners: usize,
123    ) -> UPTransportZenoh {
124        let session_to_use = Arc::new(session);
125        UPTransportZenoh {
126            session: session_to_use.clone(),
127            subscribers: ListenerRegistry::new(session_to_use, max_listeners),
128            local_authority,
129        }
130    }
131
132    /// Enables a tracing formatter subscriber that is initialized from the `RUST_LOG` environment variable.
133    pub fn try_init_log_from_env() {
134        zenoh::init_log_from_env_or("");
135    }
136}
137
138struct CommonProperties {
139    local_authority: String,
140    max_listeners: usize,
141}
142
143pub struct InitialBuilderState;
144pub struct ConfigBuilderState {
145    config: zenoh_config::Config,
146}
147pub struct ConfigPathBuilderState {
148    config_path: String,
149}
150
151pub struct SessionBuilderState {
152    zenoh_session: Session,
153}
154
155pub trait BuilderState {}
156impl BuilderState for InitialBuilderState {}
157impl BuilderState for ConfigBuilderState {}
158impl BuilderState for ConfigPathBuilderState {}
159impl BuilderState for SessionBuilderState {}
160
161pub struct UPTransportZenohBuilder<S: BuilderState> {
162    common: Box<CommonProperties>,
163    extra: S,
164}
165
166impl UPTransportZenohBuilder<InitialBuilderState> {
167    /// Sets the Zenoh configuration to use for the transport.
168    ///
169    /// Please refer to the [Zenoh documentation](https://zenoh.io/docs/manual/configuration/) for details.
170    #[must_use]
171    pub fn with_config(
172        self,
173        config: zenoh_config::Config,
174    ) -> UPTransportZenohBuilder<ConfigBuilderState> {
175        UPTransportZenohBuilder {
176            common: self.common,
177            extra: ConfigBuilderState { config },
178        }
179    }
180
181    /// Sets the path to a Zenoh configuration file to use for the transport.
182    ///
183    /// Please refer to the [Zenoh documentation](https://zenoh.io/docs/manual/configuration/) for details.
184    #[must_use]
185    pub fn with_config_path(
186        self,
187        config_path: String,
188    ) -> UPTransportZenohBuilder<ConfigPathBuilderState> {
189        UPTransportZenohBuilder {
190            common: self.common,
191            extra: ConfigPathBuilderState { config_path },
192        }
193    }
194
195    /// Sets an existing Zenoh session to use for the transport.
196    #[must_use]
197    pub fn with_session(
198        self,
199        zenoh_session: Session,
200    ) -> UPTransportZenohBuilder<SessionBuilderState> {
201        UPTransportZenohBuilder {
202            common: self.common,
203            extra: SessionBuilderState { zenoh_session },
204        }
205    }
206}
207
208impl UPTransportZenohBuilder<ConfigBuilderState> {
209    /// Creates the transport based on the provided configuration properties.
210    ///
211    /// # Returns
212    ///
213    /// The newly created transport instance. Note that the builder consumes itself.
214    ///
215    /// # Errors
216    ///
217    /// Returns an error if the transport cannot be created.
218    ///
219    /// # Examples
220    ///
221    /// ```
222    /// #[tokio::main]
223    /// # async fn main() {
224    /// use up_transport_zenoh::{zenoh_config, UPTransportZenoh};
225    ///
226    /// assert!(UPTransportZenoh::builder("local_authority")
227    ///    .expect("Invalid authority name")
228    ///    .with_config(zenoh_config::Config::default())
229    ///    .with_max_listeners(10)
230    ///    .build()
231    ///    .await
232    ///    .is_ok());
233    /// # }
234    /// ```
235    pub async fn build(self) -> Result<UPTransportZenoh, UStatus> {
236        UPTransportZenoh::init_with_config(
237            self.extra.config,
238            self.common.local_authority,
239            self.common.max_listeners,
240        )
241        .await
242    }
243}
244
245impl UPTransportZenohBuilder<ConfigPathBuilderState> {
246    /// Creates the transport based on the provided configuration file.
247    ///
248    /// # Returns
249    ///
250    /// The newly created transport instance. Note that the builder consumes itself.
251    ///
252    /// # Errors
253    ///
254    /// Returns an error if the transport cannot be created, e.g. because the configuration
255    /// file cannot be read or is invalid.
256    ///
257    /// # Examples
258    ///
259    /// ```
260    /// #[tokio::main]
261    /// # async fn main() {
262    /// use up_transport_zenoh::UPTransportZenoh;
263    ///
264    /// assert!(UPTransportZenoh::builder("local_authority")
265    ///    .expect("Invalid authority name")
266    ///    .with_config_path("non-existing-config.json5".to_string())
267    ///    .build()
268    ///    .await
269    ///    .is_err_and(|e| e.get_code() == up_rust::UCode::INVALID_ARGUMENT));
270    /// # }
271    /// ```
272    pub async fn build(self) -> Result<UPTransportZenoh, UStatus> {
273        let config = zenoh_config::Config::from_file(self.extra.config_path).map_err(|e| {
274            error!("Failed to load Zenoh config from file: {e}");
275            UStatus::fail_with_code(UCode::INVALID_ARGUMENT, e.to_string())
276        })?;
277        UPTransportZenoh::init_with_config(
278            config,
279            self.common.local_authority,
280            self.common.max_listeners,
281        )
282        .await
283    }
284}
285
286impl UPTransportZenohBuilder<SessionBuilderState> {
287    /// Creates the transport based on the provided configuration file.
288    ///
289    /// # Returns
290    ///
291    /// The newly created transport instance. Note that the builder consumes itself.
292    ///
293    /// # Errors
294    ///
295    /// Returns an error if the transport cannot be created.
296    ///
297    /// # Examples
298    ///
299    /// ```
300    /// #[tokio::main]
301    /// # async fn main() {
302    /// use up_transport_zenoh::UPTransportZenoh;
303    /// use zenoh::config::Config;
304    ///
305    /// let zenoh_session = zenoh::open(Config::default()).await.expect("Failed to open Zenoh session");
306    /// assert!(UPTransportZenoh::builder("local_authority")
307    ///    .expect("Invalid authority name")
308    ///    .with_session(zenoh_session)
309    ///    .with_max_listeners(10)
310    ///    .build()
311    ///    .is_ok());
312    /// # }
313    /// ```
314    pub fn build(self) -> Result<UPTransportZenoh, UStatus> {
315        Ok(UPTransportZenoh::init_with_session(
316            self.extra.zenoh_session,
317            self.common.local_authority,
318            self.common.max_listeners,
319        ))
320    }
321}
322
323impl<S: BuilderState> UPTransportZenohBuilder<S> {
324    /// Sets the maximum number of listeners that can be registered with this transport.
325    /// If not set explicitly, the default value is 100.
326    #[must_use]
327    pub fn with_max_listeners(mut self, max_listeners: usize) -> Self {
328        self.common.max_listeners = max_listeners;
329        self
330    }
331}
332
333#[cfg(test)]
334mod tests {
335    use super::*;
336    use test_case::test_case;
337
338    #[test_case("vehicle1" => true; "succeeds for valid authority name")]
339    #[test_case("This is not an authority name" => false; "fails for invalid authority name")]
340    #[test_case("" => false; "fails for empty authority name")]
341    #[test_case("*" => false; "fails for wildcard authority name")]
342    #[tokio::test(flavor = "multi_thread")]
343    async fn test_getting_a_builder<S: Into<String>>(local_authority: S) -> bool {
344        if let Ok(builder) = UPTransportZenoh::builder(local_authority) {
345            builder
346                .with_config(zenoh_config::Config::default())
347                .build()
348                .await
349                .is_ok()
350        } else {
351            false
352        }
353    }
354}