zenoh_flow_runtime/loader/extensions.rs
1//
2// Copyright (c) 2021 - 2024 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15use super::{validate_library, NodeSymbol};
16
17use std::{
18 collections::HashMap,
19 ops::{Deref, DerefMut},
20 path::PathBuf,
21 sync::Arc,
22};
23
24use anyhow::Context;
25use libloading::Library;
26use serde::{Deserialize, Deserializer};
27use zenoh_flow_commons::Result;
28use zenoh_flow_nodes::{OperatorFn, SinkFn, SourceFn};
29
30/// A convenient wrapper for a set of [Extension].
31///
32/// The main purpose of this structure is to facilitate parsing.
33///
34/// # Example configuration
35///
36/// ```
37/// # use zenoh_flow_runtime::Extensions;
38/// # let yaml = r#"
39/// - file_extension: py
40/// libraries:
41/// source: /home/zenoh-flow/extension/libpy_source.so
42/// operator: /home/zenoh-flow/extension/libpy_operator.so
43/// sink: /home/zenoh-flow/extension/libpy_sink.so
44///
45/// - file_extension: js
46/// libraries:
47/// source: /home/zenoh-flow/extension/libwasm_source.so
48/// operator: /home/zenoh-flow/extension/libwasm_operator.so
49/// sink: /home/zenoh-flow/extension/libwasm_sink.so
50/// # "#;
51/// # serde_yaml::from_str::<Extensions>(yaml).unwrap();
52#[derive(Default, Debug, Clone, Deserialize, PartialEq, Eq)]
53#[repr(transparent)]
54pub struct Extensions(
55 #[serde(deserialize_with = "deserialize_extensions")] HashMap<Arc<str>, Extension>,
56);
57
58impl Deref for Extensions {
59 type Target = HashMap<Arc<str>, Extension>;
60
61 fn deref(&self) -> &Self::Target {
62 &self.0
63 }
64}
65
66impl DerefMut for Extensions {
67 fn deref_mut(&mut self) -> &mut Self::Target {
68 &mut self.0
69 }
70}
71
72impl From<Extensions> for HashMap<Arc<str>, Extension> {
73 fn from(value: Extensions) -> Self {
74 value.0
75 }
76}
77
78impl Extensions {
79 /// Returns the [PathBuf] of the library to load for the provided [NodeSymbol].
80 ///
81 /// This function is used in a generic context where we don't actually know which type of node we are manipulating.
82 pub(crate) fn get_library_path(
83 &self,
84 file_extension: &str,
85 symbol: &NodeSymbol,
86 ) -> Option<&PathBuf> {
87 self.get(file_extension).map(|extension| match symbol {
88 NodeSymbol::Source => &extension.libraries.source,
89 NodeSymbol::Operator => &extension.libraries.operator,
90 NodeSymbol::Sink => &extension.libraries.sink,
91 })
92 }
93
94 /// Attempts to add an extension to this Zenoh-Flow [runtime](crate::Runtime).
95 ///
96 /// Note that if a previous entry was added for the same file extension, the previous entry will be returned.
97 ///
98 /// # Errors
99 ///
100 /// This method will return an error if any of the library:
101 /// - does not expose the correct symbol (see these macros: [1], [2], [3]),
102 /// - was not compiled with the same Rust version,
103 /// - was not using the same version of Zenoh-Flow as this [runtime](crate::Runtime).
104 ///
105 /// [1]: zenoh_flow_nodes::prelude::export_source
106 /// [2]: zenoh_flow_nodes::prelude::export_operator
107 /// [3]: zenoh_flow_nodes::prelude::export_sink
108 pub(crate) fn try_add_extension(
109 &mut self,
110 file_extension: impl Into<String>,
111 source: impl Into<PathBuf>,
112 operator: impl Into<PathBuf>,
113 sink: impl Into<PathBuf>,
114 ) -> Result<Option<Extension>> {
115 let file_ext: Arc<str> = file_extension.into().into();
116 let libraries = ExtensionLibraries::new(source.into(), sink.into(), operator.into())?;
117
118 Ok(self.insert(
119 file_ext.clone(),
120 Extension {
121 file_extension: file_ext,
122 libraries,
123 },
124 ))
125 }
126}
127
128/// An `Extension` associates a file extension (e.g. `.py`) to a set of shared libraries.
129///
130/// This details how a Zenoh-Flow runtime should load nodes that have the [url](url::Url) of their implementation with
131/// this extension.
132///
133/// Zenoh-Flow only supports node implementation in the form of [shared libraries]. To support additional implementation
134/// --- for instance [Python scripts] --- a Zenoh-Flow runtime needs to be informed on (i) which shared libraries it
135/// should load and (ii) how it should make these shared libraries "load" the node implementation.
136///
137/// To support an extension on a Zenoh-Flow runtime, one can either detail them in the configuration file of the runtime
138/// or through the dedicated [method](crate::RuntimeBuilder::add_extension()).
139///
140/// # Example configuration
141///
142/// (Yaml)
143///
144/// ```
145/// # use zenoh_flow_runtime::Extension;
146/// # let yaml = r#"
147/// file_extension: py
148/// libraries:
149/// source: /home/zenoh-flow/libpy_source.so
150/// operator: /home/zenoh-flow/libpy_operator.so
151/// sink: /home/zenoh-flow/libpy_sink.so
152/// # "#;
153/// # serde_yaml::from_str::<Extension>(yaml).unwrap();
154/// ```
155///
156/// [shared libraries]: std::env::consts::DLL_EXTENSION
157/// [Python scripts]: https://github.com/eclipse-zenoh/zenoh-flow-python
158// NOTE: We separate the libraries in its own dedicated structure to have that same textual representation (YAML/JSON).
159// There is no real need to do so.
160#[derive(Debug, Clone, Deserialize, Hash, PartialEq, Eq)]
161pub struct Extension {
162 pub(crate) file_extension: Arc<str>,
163 pub(crate) libraries: ExtensionLibraries,
164}
165
166impl Extension {
167 /// Returns the file extension associated with this extension.
168 ///
169 /// # Example
170 ///
171 /// ```
172 /// # use zenoh_flow_runtime::Extension;
173 /// # let yaml = r#"
174 /// # file_extension: py
175 /// # libraries:
176 /// # source: /home/zenoh-flow/libpy_source.so
177 /// # operator: /home/zenoh-flow/libpy_operator.so
178 /// # sink: /home/zenoh-flow/libpy_sink.so
179 /// # "#;
180 /// # let extension = serde_yaml::from_str::<Extension>(yaml).unwrap();
181 /// assert_eq!(extension.file_extension(), "py");
182 /// ```
183 pub fn file_extension(&self) -> &str {
184 &self.file_extension
185 }
186
187 /// Returns the [path](PathBuf) of the shared library responsible for loading Source nodes for this file extension.
188 ///
189 /// # Example
190 ///
191 /// ```
192 /// # use zenoh_flow_runtime::Extension;
193 /// # let yaml = r#"
194 /// # file_extension: py
195 /// # libraries:
196 /// # source: /home/zenoh-flow/libpy_source.so
197 /// # operator: /home/zenoh-flow/libpy_operator.so
198 /// # sink: /home/zenoh-flow/libpy_sink.so
199 /// # "#;
200 /// # let extension = serde_yaml::from_str::<Extension>(yaml).unwrap();
201 /// assert_eq!(extension.source().to_str(), Some("/home/zenoh-flow/libpy_source.so"));
202 /// ```
203 pub fn source(&self) -> &PathBuf {
204 &self.libraries.source
205 }
206
207 /// Returns the [path](PathBuf) of the shared library responsible for loading Operator nodes for this file
208 /// extension.
209 ///
210 /// # Example
211 ///
212 /// ```
213 /// # use zenoh_flow_runtime::Extension;
214 /// # let yaml = r#"
215 /// # file_extension: py
216 /// # libraries:
217 /// # source: /home/zenoh-flow/libpy_source.so
218 /// # operator: /home/zenoh-flow/libpy_operator.so
219 /// # sink: /home/zenoh-flow/libpy_sink.so
220 /// # "#;
221 /// # let extension = serde_yaml::from_str::<Extension>(yaml).unwrap();
222 /// assert_eq!(extension.operator().to_str(), Some("/home/zenoh-flow/libpy_operator.so"));
223 /// ```
224 pub fn operator(&self) -> &PathBuf {
225 &self.libraries.operator
226 }
227
228 /// Returns the [path](PathBuf) of the shared library responsible for loading Sink nodes for this file extension.
229 ///
230 /// # Example
231 ///
232 /// ```
233 /// # use zenoh_flow_runtime::Extension;
234 /// # let yaml = r#"
235 /// # file_extension: py
236 /// # libraries:
237 /// # source: /home/zenoh-flow/libpy_source.so
238 /// # operator: /home/zenoh-flow/libpy_operator.so
239 /// # sink: /home/zenoh-flow/libpy_sink.so
240 /// # "#;
241 /// # let extension = serde_yaml::from_str::<Extension>(yaml).unwrap();
242 /// assert_eq!(extension.sink().to_str(), Some("/home/zenoh-flow/libpy_sink.so"));
243 /// ```
244 pub fn sink(&self) -> &PathBuf {
245 &self.libraries.sink
246 }
247}
248
249#[derive(Debug, Clone, Deserialize, Hash, PartialEq, Eq)]
250pub(crate) struct ExtensionLibraries {
251 pub(crate) source: PathBuf,
252 pub(crate) sink: PathBuf,
253 pub(crate) operator: PathBuf,
254}
255
256impl ExtensionLibraries {
257 /// Return a new set of extension libraries after validating them.
258 ///
259 /// # Errors
260 ///
261 /// This method will return an error if any of the library:
262 /// - does not expose the correct symbol,
263 /// - was not compiled with the same Rust version,
264 /// - was not using the same Zenoh-Flow version as this Zenoh-Flow [runtime](crate::Runtime).
265 pub(crate) fn new(source: PathBuf, operator: PathBuf, sink: PathBuf) -> Result<Self> {
266 let libraries = Self {
267 source,
268 sink,
269 operator,
270 };
271
272 libraries.validate()?;
273
274 Ok(libraries)
275 }
276
277 /// Validates that all the libraries expose the correct symbols and were compiled with the same Rust and Zenoh-Flow
278 /// versions.
279 ///
280 /// # Errors
281 ///
282 /// This method will return an error if any of the library:
283 /// - does not expose the correct symbol,
284 /// - was not compiled with the same Rust version,
285 /// - was not using the same Zenoh-Flow version as this Zenoh-Flow [runtime](crate::Runtime).
286 //
287 // NOTE: We are separating this method from the `new` method because, when we deserialise this structure, we need to
288 // call `validate` after creating it.
289 pub(crate) fn validate(&self) -> Result<()> {
290 unsafe {
291 validate_library::<SourceFn>(&Library::new(&self.source)?, &NodeSymbol::Source)
292 .with_context(|| format!("{}", self.source.display()))?;
293 validate_library::<OperatorFn>(&Library::new(&self.operator)?, &NodeSymbol::Operator)
294 .with_context(|| format!("{}", self.operator.display()))?;
295 validate_library::<SinkFn>(&Library::new(&self.sink)?, &NodeSymbol::Sink)
296 .with_context(|| format!("{}", self.sink.display()))?;
297 }
298
299 Ok(())
300 }
301}
302
303/// Attempts to deserialise a set of [Extension] from the provided string.
304///
305/// # Errors
306///
307/// This function will return an error if:
308/// - the string cannot be deserialised into a vector of [Extension],
309/// - any [Extension] does not provide valid libraries.
310pub(crate) fn deserialize_extensions<'de, D>(
311 deserializer: D,
312) -> std::result::Result<HashMap<Arc<str>, Extension>, D::Error>
313where
314 D: Deserializer<'de>,
315{
316 let extensions: Vec<Extension> = serde::de::Deserialize::deserialize(deserializer)?;
317 let extensions_map = extensions
318 .into_iter()
319 .map(|extension| (extension.file_extension.clone(), extension))
320 .collect::<HashMap<_, _>>();
321
322 #[cfg(not(feature = "test-utils"))]
323 {
324 for extension in extensions_map.values() {
325 extension.libraries.validate().map_err(|e| {
326 serde::de::Error::custom(format!(
327 "Failed to validate the libraries for extension < {} >: {:?}",
328 extension.file_extension, e
329 ))
330 })?;
331 }
332 }
333
334 Ok(extensions_map)
335}
336
337#[cfg(test)]
338mod tests {
339 use super::*;
340
341 #[test]
342 fn test_deserialize() {
343 let extensions_yaml = r#"
344- file_extension: py
345 libraries:
346 source: /home/zenoh-flow/extension/libpython_source.so
347 operator: /home/zenoh-flow/extension/libpython_operator.so
348 sink: /home/zenoh-flow/extension/libpython_sink.so
349
350- file_extension: js
351 libraries:
352 source: /home/zenoh-flow/extension/libwasm_source.so
353 operator: /home/zenoh-flow/extension/libwasm_operator.so
354 sink: /home/zenoh-flow/extension/libwasm_sink.so
355"#;
356
357 serde_yaml::from_str::<Extensions>(extensions_yaml)
358 .expect("Failed to deserialize Extensions from YAML");
359 }
360}