zenoh_flow_runtime/loader/
extensions.rs

1//
2// Copyright (c) 2021 - 2024 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15use super::{validate_library, NodeSymbol};
16
17use std::{
18    collections::HashMap,
19    ops::{Deref, DerefMut},
20    path::PathBuf,
21    sync::Arc,
22};
23
24use anyhow::Context;
25use libloading::Library;
26use serde::{Deserialize, Deserializer};
27use zenoh_flow_commons::Result;
28use zenoh_flow_nodes::{OperatorFn, SinkFn, SourceFn};
29
30/// A convenient wrapper for a set of [Extension].
31///
32/// The main purpose of this structure is to facilitate parsing.
33///
34/// # Example configuration
35///
36/// ```
37/// # use zenoh_flow_runtime::Extensions;
38/// # let yaml = r#"
39/// - file_extension: py
40///   libraries:
41///     source: /home/zenoh-flow/extension/libpy_source.so
42///     operator: /home/zenoh-flow/extension/libpy_operator.so
43///     sink: /home/zenoh-flow/extension/libpy_sink.so
44///
45/// - file_extension: js
46///   libraries:
47///     source: /home/zenoh-flow/extension/libwasm_source.so
48///     operator: /home/zenoh-flow/extension/libwasm_operator.so
49///     sink: /home/zenoh-flow/extension/libwasm_sink.so
50/// # "#;
51/// # serde_yaml::from_str::<Extensions>(yaml).unwrap();
52#[derive(Default, Debug, Clone, Deserialize, PartialEq, Eq)]
53#[repr(transparent)]
54pub struct Extensions(
55    #[serde(deserialize_with = "deserialize_extensions")] HashMap<Arc<str>, Extension>,
56);
57
58impl Deref for Extensions {
59    type Target = HashMap<Arc<str>, Extension>;
60
61    fn deref(&self) -> &Self::Target {
62        &self.0
63    }
64}
65
66impl DerefMut for Extensions {
67    fn deref_mut(&mut self) -> &mut Self::Target {
68        &mut self.0
69    }
70}
71
72impl From<Extensions> for HashMap<Arc<str>, Extension> {
73    fn from(value: Extensions) -> Self {
74        value.0
75    }
76}
77
78impl Extensions {
79    /// Returns the [PathBuf] of the library to load for the provided [NodeSymbol].
80    ///
81    /// This function is used in a generic context where we don't actually know which type of node we are manipulating.
82    pub(crate) fn get_library_path(
83        &self,
84        file_extension: &str,
85        symbol: &NodeSymbol,
86    ) -> Option<&PathBuf> {
87        self.get(file_extension).map(|extension| match symbol {
88            NodeSymbol::Source => &extension.libraries.source,
89            NodeSymbol::Operator => &extension.libraries.operator,
90            NodeSymbol::Sink => &extension.libraries.sink,
91        })
92    }
93
94    /// Attempts to add an extension to this Zenoh-Flow [runtime](crate::Runtime).
95    ///
96    /// Note that if a previous entry was added for the same file extension, the previous entry will be returned.
97    ///
98    /// # Errors
99    ///
100    /// This method will return an error if any of the library:
101    /// - does not expose the correct symbol (see these macros: [1], [2], [3]),
102    /// - was not compiled with the same Rust version,
103    /// - was not using the same version of Zenoh-Flow as this [runtime](crate::Runtime).
104    ///
105    /// [1]: zenoh_flow_nodes::prelude::export_source
106    /// [2]: zenoh_flow_nodes::prelude::export_operator
107    /// [3]: zenoh_flow_nodes::prelude::export_sink
108    pub(crate) fn try_add_extension(
109        &mut self,
110        file_extension: impl Into<String>,
111        source: impl Into<PathBuf>,
112        operator: impl Into<PathBuf>,
113        sink: impl Into<PathBuf>,
114    ) -> Result<Option<Extension>> {
115        let file_ext: Arc<str> = file_extension.into().into();
116        let libraries = ExtensionLibraries::new(source.into(), sink.into(), operator.into())?;
117
118        Ok(self.insert(
119            file_ext.clone(),
120            Extension {
121                file_extension: file_ext,
122                libraries,
123            },
124        ))
125    }
126}
127
128/// An `Extension` associates a file extension (e.g. `.py`) to a set of shared libraries.
129///
130/// This details how a Zenoh-Flow runtime should load nodes that have the [url](url::Url) of their implementation with
131/// this extension.
132///
133/// Zenoh-Flow only supports node implementation in the form of [shared libraries]. To support additional implementation
134/// --- for instance [Python scripts] --- a Zenoh-Flow runtime needs to be informed on (i) which shared libraries it
135/// should load and (ii) how it should make these shared libraries "load" the node implementation.
136///
137/// To support an extension on a Zenoh-Flow runtime, one can either detail them in the configuration file of the runtime
138/// or through the dedicated [method](crate::RuntimeBuilder::add_extension()).
139///
140/// # Example configuration
141///
142/// (Yaml)
143///
144/// ```
145/// # use zenoh_flow_runtime::Extension;
146/// # let yaml = r#"
147/// file_extension: py
148/// libraries:
149///   source: /home/zenoh-flow/libpy_source.so
150///   operator: /home/zenoh-flow/libpy_operator.so
151///   sink: /home/zenoh-flow/libpy_sink.so
152/// # "#;
153/// # serde_yaml::from_str::<Extension>(yaml).unwrap();
154/// ```
155///
156/// [shared libraries]: std::env::consts::DLL_EXTENSION
157/// [Python scripts]: https://github.com/eclipse-zenoh/zenoh-flow-python
158// NOTE: We separate the libraries in its own dedicated structure to have that same textual representation (YAML/JSON).
159//       There is no real need to do so.
160#[derive(Debug, Clone, Deserialize, Hash, PartialEq, Eq)]
161pub struct Extension {
162    pub(crate) file_extension: Arc<str>,
163    pub(crate) libraries: ExtensionLibraries,
164}
165
166impl Extension {
167    /// Returns the file extension associated with this extension.
168    ///
169    /// # Example
170    ///
171    /// ```
172    /// # use zenoh_flow_runtime::Extension;
173    /// # let yaml = r#"
174    /// # file_extension: py
175    /// # libraries:
176    /// #   source: /home/zenoh-flow/libpy_source.so
177    /// #   operator: /home/zenoh-flow/libpy_operator.so
178    /// #   sink: /home/zenoh-flow/libpy_sink.so
179    /// # "#;
180    /// # let extension = serde_yaml::from_str::<Extension>(yaml).unwrap();
181    /// assert_eq!(extension.file_extension(), "py");
182    /// ```
183    pub fn file_extension(&self) -> &str {
184        &self.file_extension
185    }
186
187    /// Returns the [path](PathBuf) of the shared library responsible for loading Source nodes for this file extension.
188    ///
189    /// # Example
190    ///
191    /// ```
192    /// # use zenoh_flow_runtime::Extension;
193    /// # let yaml = r#"
194    /// # file_extension: py
195    /// # libraries:
196    /// #   source: /home/zenoh-flow/libpy_source.so
197    /// #   operator: /home/zenoh-flow/libpy_operator.so
198    /// #   sink: /home/zenoh-flow/libpy_sink.so
199    /// # "#;
200    /// # let extension = serde_yaml::from_str::<Extension>(yaml).unwrap();
201    /// assert_eq!(extension.source().to_str(), Some("/home/zenoh-flow/libpy_source.so"));
202    /// ```
203    pub fn source(&self) -> &PathBuf {
204        &self.libraries.source
205    }
206
207    /// Returns the [path](PathBuf) of the shared library responsible for loading Operator nodes for this file
208    /// extension.
209    ///
210    /// # Example
211    ///
212    /// ```
213    /// # use zenoh_flow_runtime::Extension;
214    /// # let yaml = r#"
215    /// # file_extension: py
216    /// # libraries:
217    /// #   source: /home/zenoh-flow/libpy_source.so
218    /// #   operator: /home/zenoh-flow/libpy_operator.so
219    /// #   sink: /home/zenoh-flow/libpy_sink.so
220    /// # "#;
221    /// # let extension = serde_yaml::from_str::<Extension>(yaml).unwrap();
222    /// assert_eq!(extension.operator().to_str(), Some("/home/zenoh-flow/libpy_operator.so"));
223    /// ```
224    pub fn operator(&self) -> &PathBuf {
225        &self.libraries.operator
226    }
227
228    /// Returns the [path](PathBuf) of the shared library responsible for loading Sink nodes for this file extension.
229    ///
230    /// # Example
231    ///
232    /// ```
233    /// # use zenoh_flow_runtime::Extension;
234    /// # let yaml = r#"
235    /// # file_extension: py
236    /// # libraries:
237    /// #   source: /home/zenoh-flow/libpy_source.so
238    /// #   operator: /home/zenoh-flow/libpy_operator.so
239    /// #   sink: /home/zenoh-flow/libpy_sink.so
240    /// # "#;
241    /// # let extension = serde_yaml::from_str::<Extension>(yaml).unwrap();
242    /// assert_eq!(extension.sink().to_str(), Some("/home/zenoh-flow/libpy_sink.so"));
243    /// ```
244    pub fn sink(&self) -> &PathBuf {
245        &self.libraries.sink
246    }
247}
248
249#[derive(Debug, Clone, Deserialize, Hash, PartialEq, Eq)]
250pub(crate) struct ExtensionLibraries {
251    pub(crate) source: PathBuf,
252    pub(crate) sink: PathBuf,
253    pub(crate) operator: PathBuf,
254}
255
256impl ExtensionLibraries {
257    /// Return a new set of extension libraries after validating them.
258    ///
259    /// # Errors
260    ///
261    /// This method will return an error if any of the library:
262    /// - does not expose the correct symbol,
263    /// - was not compiled with the same Rust version,
264    /// - was not using the same Zenoh-Flow version as this Zenoh-Flow [runtime](crate::Runtime).
265    pub(crate) fn new(source: PathBuf, operator: PathBuf, sink: PathBuf) -> Result<Self> {
266        let libraries = Self {
267            source,
268            sink,
269            operator,
270        };
271
272        libraries.validate()?;
273
274        Ok(libraries)
275    }
276
277    /// Validates that all the libraries expose the correct symbols and were compiled with the same Rust and Zenoh-Flow
278    /// versions.
279    ///
280    /// # Errors
281    ///
282    /// This method will return an error if any of the library:
283    /// - does not expose the correct symbol,
284    /// - was not compiled with the same Rust version,
285    /// - was not using the same Zenoh-Flow version as this Zenoh-Flow [runtime](crate::Runtime).
286    //
287    // NOTE: We are separating this method from the `new` method because, when we deserialise this structure, we need to
288    // call `validate` after creating it.
289    pub(crate) fn validate(&self) -> Result<()> {
290        unsafe {
291            validate_library::<SourceFn>(&Library::new(&self.source)?, &NodeSymbol::Source)
292                .with_context(|| format!("{}", self.source.display()))?;
293            validate_library::<OperatorFn>(&Library::new(&self.operator)?, &NodeSymbol::Operator)
294                .with_context(|| format!("{}", self.operator.display()))?;
295            validate_library::<SinkFn>(&Library::new(&self.sink)?, &NodeSymbol::Sink)
296                .with_context(|| format!("{}", self.sink.display()))?;
297        }
298
299        Ok(())
300    }
301}
302
303/// Attempts to deserialise a set of [Extension] from the provided string.
304///
305/// # Errors
306///
307/// This function will return an error if:
308/// - the string cannot be deserialised into a vector of [Extension],
309/// - any [Extension] does not provide valid libraries.
310pub(crate) fn deserialize_extensions<'de, D>(
311    deserializer: D,
312) -> std::result::Result<HashMap<Arc<str>, Extension>, D::Error>
313where
314    D: Deserializer<'de>,
315{
316    let extensions: Vec<Extension> = serde::de::Deserialize::deserialize(deserializer)?;
317    let extensions_map = extensions
318        .into_iter()
319        .map(|extension| (extension.file_extension.clone(), extension))
320        .collect::<HashMap<_, _>>();
321
322    #[cfg(not(feature = "test-utils"))]
323    {
324        for extension in extensions_map.values() {
325            extension.libraries.validate().map_err(|e| {
326                serde::de::Error::custom(format!(
327                    "Failed to validate the libraries for extension < {} >: {:?}",
328                    extension.file_extension, e
329                ))
330            })?;
331        }
332    }
333
334    Ok(extensions_map)
335}
336
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that a YAML sequence holding two extensions can be parsed into an [Extensions].
    //
    // NOTE(review): the library paths below are presumably placeholders. Deserialising validates the libraries unless
    // the feature `test-utils` is enabled, so this test likely relies on that feature being active --- confirm.
    #[test]
    fn test_deserialize() {
        let yaml = r#"
- file_extension: py
  libraries:
    source: /home/zenoh-flow/extension/libpython_source.so
    operator: /home/zenoh-flow/extension/libpython_operator.so
    sink: /home/zenoh-flow/extension/libpython_sink.so

- file_extension: js
  libraries:
    source: /home/zenoh-flow/extension/libwasm_source.so
    operator: /home/zenoh-flow/extension/libwasm_operator.so
    sink: /home/zenoh-flow/extension/libwasm_sink.so
"#;

        let _extensions: Extensions =
            serde_yaml::from_str(yaml).expect("Failed to deserialize Extensions from YAML");
    }
}