up_transport_zenoh/lib.rs
1/********************************************************************************
2 * Copyright (c) 2024 Contributors to the Eclipse Foundation
3 *
4 * See the NOTICE file(s) distributed with this work for additional
5 * information regarding copyright ownership.
6 *
7 * This program and the accompanying materials are made available under the
8 * terms of the Apache License Version 2.0 which is available at
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * SPDX-License-Identifier: Apache-2.0
12 ********************************************************************************/
13
14/*!
15This crate provides an implementation of the Eclipse Zenoh ™ uProtocol Transport.
16The transport uses Zenoh's publish-subscribe mechanism to exchange messages. It is
17designed to be used in conjunction with the [up-rust](https://crates.io/crates/up_rust)
18crate, which provides the uProtocol message types and utilities.
19
20The transport is designed to run in the context of a [tokio `Runtime`] which
21needs to be configured outside of the transport according to the
22processing requirements of the use case at hand. The transport does
23not make any implicit assumptions about the number of threads available
24and does not spawn any threads itself.
25
26[tokio `Runtime`]: https://docs.rs/tokio/latest/tokio/runtime/index.html
27*/
28
29mod listener_registry;
30pub(crate) mod utransport;
31
32use std::sync::Arc;
33
34use listener_registry::ListenerRegistry;
35use tracing::error;
36use up_rust::{UCode, UStatus, UUri};
37use zenoh::{Config, Session};
38// Re-export Zenoh config
39pub use zenoh::config as zenoh_config;
40
/// The uProtocol major version number.
// NOTE(review): not referenced anywhere in this file; presumably consumed by a
// submodule (e.g. when mapping URIs to Zenoh keys) -- TODO confirm.
const UPROTOCOL_MAJOR_VERSION: u8 = 1;
/// The maximum number of listeners that a builder is initialized with by
/// `UPTransportZenoh::builder`, unless overridden using `with_max_listeners`.
const DEFAULT_MAX_LISTENERS: usize = 100;
43
/// An Eclipse Zenoh ™ based uProtocol transport implementation.
///
/// The transport registers callbacks on the Zenoh runtime for listeners that
/// are being registered using `up_rust::UTransport::register_listener`.
///
/// <div class="warning">
///
/// The registered listeners are being invoked sequentially on the **same thread**
/// that the callback is being executed on. Implementers of listeners are therefore
/// **strongly advised** to move non-trivial processing logic to **another/dedicated
/// thread**, if necessary. Please refer to `subscriber` and `notification_receiver`
/// in the examples directory for how this can be done.
///
/// </div>
pub struct UPTransportZenoh {
    /// The Zenoh session used by this transport. The same session is also
    /// handed to the listener registry when the transport is created.
    session: Arc<Session>,
    /// The registry of listeners, created with the shared session and the
    /// configured maximum number of listeners.
    subscribers: ListenerRegistry,
    /// The authority name that has been passed to `UPTransportZenoh::builder`.
    local_authority: String,
}
63
64impl UPTransportZenoh {
65 /// Gets a builder for creating a new Zenoh transport.
66 ///
67 /// # Arguments
68 ///
69 /// * `local_uri` - The URI identifying the (local) uEntity that the transport runs on.
70 ///
71 /// # Errors
72 ///
73 /// Returns an error if the URI contains an empty or wildcard authority name
74 /// or has a non-zero resource ID.
75 pub fn builder<U: Into<String>>(
76 local_authority: U,
77 ) -> Result<UPTransportZenohBuilder<InitialBuilderState>, UStatus> {
78 let authority_name = local_authority.into();
79 if authority_name.is_empty() || &authority_name == "*" {
80 return Err(UStatus::fail_with_code(
81 UCode::INVALID_ARGUMENT,
82 "Authority name must be non-empty and must not be the wildcard authority name",
83 ));
84 }
85
86 UUri::verify_authority(&authority_name).map_err(|err| {
87 UStatus::fail_with_code(
88 UCode::INVALID_ARGUMENT,
89 format!("Invalid authority name: {err}"),
90 )
91 })?;
92
93 Ok(UPTransportZenohBuilder {
94 common: Box::new(CommonProperties {
95 local_authority: authority_name,
96 max_listeners: DEFAULT_MAX_LISTENERS,
97 }),
98 extra: InitialBuilderState,
99 })
100 }
101
102 async fn init_with_config(
103 config: Config,
104 local_authority: String,
105 max_listeners: usize,
106 ) -> Result<UPTransportZenoh, UStatus> {
107 let session = zenoh::open(config).await.map_err(|err| {
108 let msg = "Failed to open Zenoh session";
109 error!("{msg}: {err}");
110 UStatus::fail_with_code(UCode::INTERNAL, msg)
111 })?;
112 Ok(Self::init_with_session(
113 session,
114 local_authority,
115 max_listeners,
116 ))
117 }
118
119 fn init_with_session(
120 session: Session,
121 local_authority: String,
122 max_listeners: usize,
123 ) -> UPTransportZenoh {
124 let session_to_use = Arc::new(session);
125 UPTransportZenoh {
126 session: session_to_use.clone(),
127 subscribers: ListenerRegistry::new(session_to_use, max_listeners),
128 local_authority,
129 }
130 }
131
132 /// Enables a tracing formatter subscriber that is initialized from the `RUST_LOG` environment variable.
133 pub fn try_init_log_from_env() {
134 zenoh::init_log_from_env_or("");
135 }
136}
137
/// Configuration properties that are carried along by a builder regardless of
/// the state that it is in.
struct CommonProperties {
    /// The authority name that has been passed to `UPTransportZenoh::builder`.
    local_authority: String,
    /// The maximum number of listeners. Initialized with `DEFAULT_MAX_LISTENERS`
    /// and changeable by means of `with_max_listeners`.
    max_listeners: usize,
}
142
/// The state of a builder as returned by `UPTransportZenoh::builder`, i.e. before
/// a Zenoh configuration, a configuration file path or a session has been set.
pub struct InitialBuilderState;
/// The state of a builder after a Zenoh configuration has been set using `with_config`.
pub struct ConfigBuilderState {
    /// The configuration that the Zenoh session will be opened with.
    config: zenoh_config::Config,
}
/// The state of a builder after the path to a Zenoh configuration file has been
/// set using `with_config_path`.
pub struct ConfigPathBuilderState {
    /// The path of the file that the Zenoh configuration will be loaded from.
    config_path: String,
}

/// The state of a builder after an existing Zenoh session has been set using `with_session`.
pub struct SessionBuilderState {
    /// The session that the transport will use.
    zenoh_session: Session,
}
154
/// Marker trait for the states that a `UPTransportZenohBuilder` can be in.
///
/// The trait has no methods, it only serves as the bound for the builder's
/// type parameter.
pub trait BuilderState {}
impl BuilderState for InitialBuilderState {}
impl BuilderState for ConfigBuilderState {}
impl BuilderState for ConfigPathBuilderState {}
impl BuilderState for SessionBuilderState {}
160
/// A builder for `UPTransportZenoh` instances.
///
/// The type parameter `S` reflects what the transport is going to be created from
/// (a Zenoh configuration, a configuration file or an existing session) and thus
/// determines which `build` function is available.
pub struct UPTransportZenohBuilder<S: BuilderState> {
    /// Properties that are available in every state of the builder.
    common: Box<CommonProperties>,
    /// The data that is specific to the builder's current state.
    extra: S,
}
165
166impl UPTransportZenohBuilder<InitialBuilderState> {
167 /// Sets the Zenoh configuration to use for the transport.
168 ///
169 /// Please refer to the [Zenoh documentation](https://zenoh.io/docs/manual/configuration/) for details.
170 #[must_use]
171 pub fn with_config(
172 self,
173 config: zenoh_config::Config,
174 ) -> UPTransportZenohBuilder<ConfigBuilderState> {
175 UPTransportZenohBuilder {
176 common: self.common,
177 extra: ConfigBuilderState { config },
178 }
179 }
180
181 /// Sets the path to a Zenoh configuration file to use for the transport.
182 ///
183 /// Please refer to the [Zenoh documentation](https://zenoh.io/docs/manual/configuration/) for details.
184 #[must_use]
185 pub fn with_config_path(
186 self,
187 config_path: String,
188 ) -> UPTransportZenohBuilder<ConfigPathBuilderState> {
189 UPTransportZenohBuilder {
190 common: self.common,
191 extra: ConfigPathBuilderState { config_path },
192 }
193 }
194
195 /// Sets an existing Zenoh session to use for the transport.
196 #[must_use]
197 pub fn with_session(
198 self,
199 zenoh_session: Session,
200 ) -> UPTransportZenohBuilder<SessionBuilderState> {
201 UPTransportZenohBuilder {
202 common: self.common,
203 extra: SessionBuilderState { zenoh_session },
204 }
205 }
206}
207
208impl UPTransportZenohBuilder<ConfigBuilderState> {
209 /// Creates the transport based on the provided configuration properties.
210 ///
211 /// # Returns
212 ///
213 /// The newly created transport instance. Note that the builder consumes itself.
214 ///
215 /// # Errors
216 ///
217 /// Returns an error if the transport cannot be created.
218 ///
219 /// # Examples
220 ///
221 /// ```
222 /// #[tokio::main]
223 /// # async fn main() {
224 /// use up_transport_zenoh::{zenoh_config, UPTransportZenoh};
225 ///
226 /// assert!(UPTransportZenoh::builder("local_authority")
227 /// .expect("Invalid authority name")
228 /// .with_config(zenoh_config::Config::default())
229 /// .with_max_listeners(10)
230 /// .build()
231 /// .await
232 /// .is_ok());
233 /// # }
234 /// ```
235 pub async fn build(self) -> Result<UPTransportZenoh, UStatus> {
236 UPTransportZenoh::init_with_config(
237 self.extra.config,
238 self.common.local_authority,
239 self.common.max_listeners,
240 )
241 .await
242 }
243}
244
245impl UPTransportZenohBuilder<ConfigPathBuilderState> {
246 /// Creates the transport based on the provided configuration file.
247 ///
248 /// # Returns
249 ///
250 /// The newly created transport instance. Note that the builder consumes itself.
251 ///
252 /// # Errors
253 ///
254 /// Returns an error if the transport cannot be created, e.g. because the configuration
255 /// file cannot be read or is invalid.
256 ///
257 /// # Examples
258 ///
259 /// ```
260 /// #[tokio::main]
261 /// # async fn main() {
262 /// use up_transport_zenoh::UPTransportZenoh;
263 ///
264 /// assert!(UPTransportZenoh::builder("local_authority")
265 /// .expect("Invalid authority name")
266 /// .with_config_path("non-existing-config.json5".to_string())
267 /// .build()
268 /// .await
269 /// .is_err_and(|e| e.get_code() == up_rust::UCode::INVALID_ARGUMENT));
270 /// # }
271 /// ```
272 pub async fn build(self) -> Result<UPTransportZenoh, UStatus> {
273 let config = zenoh_config::Config::from_file(self.extra.config_path).map_err(|e| {
274 error!("Failed to load Zenoh config from file: {e}");
275 UStatus::fail_with_code(UCode::INVALID_ARGUMENT, e.to_string())
276 })?;
277 UPTransportZenoh::init_with_config(
278 config,
279 self.common.local_authority,
280 self.common.max_listeners,
281 )
282 .await
283 }
284}
285
286impl UPTransportZenohBuilder<SessionBuilderState> {
287 /// Creates the transport based on the provided configuration file.
288 ///
289 /// # Returns
290 ///
291 /// The newly created transport instance. Note that the builder consumes itself.
292 ///
293 /// # Errors
294 ///
295 /// Returns an error if the transport cannot be created.
296 ///
297 /// # Examples
298 ///
299 /// ```
300 /// #[tokio::main]
301 /// # async fn main() {
302 /// use up_transport_zenoh::UPTransportZenoh;
303 /// use zenoh::config::Config;
304 ///
305 /// let zenoh_session = zenoh::open(Config::default()).await.expect("Failed to open Zenoh session");
306 /// assert!(UPTransportZenoh::builder("local_authority")
307 /// .expect("Invalid authority name")
308 /// .with_session(zenoh_session)
309 /// .with_max_listeners(10)
310 /// .build()
311 /// .is_ok());
312 /// # }
313 /// ```
314 pub fn build(self) -> Result<UPTransportZenoh, UStatus> {
315 Ok(UPTransportZenoh::init_with_session(
316 self.extra.zenoh_session,
317 self.common.local_authority,
318 self.common.max_listeners,
319 ))
320 }
321}
322
323impl<S: BuilderState> UPTransportZenohBuilder<S> {
324 /// Sets the maximum number of listeners that can be registered with this transport.
325 /// If not set explicitly, the default value is 100.
326 #[must_use]
327 pub fn with_max_listeners(mut self, max_listeners: usize) -> Self {
328 self.common.max_listeners = max_listeners;
329 self
330 }
331}
332
#[cfg(test)]
mod tests {
    use super::*;
    use test_case::test_case;

    /// Verifies that a transport can only be built for a valid, non-empty and
    /// non-wildcard authority name.
    #[test_case("vehicle1" => true; "succeeds for valid authority name")]
    #[test_case("This is not an authority name" => false; "fails for invalid authority name")]
    #[test_case("" => false; "fails for empty authority name")]
    #[test_case("*" => false; "fails for wildcard authority name")]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_getting_a_builder<S: Into<String>>(local_authority: S) -> bool {
        // no builder, no transport
        let Ok(builder) = UPTransportZenoh::builder(local_authority) else {
            return false;
        };
        let transport = builder
            .with_config(zenoh_config::Config::default())
            .build()
            .await;
        transport.is_ok()
    }
}
354}