use super::instance::builtin::zenoh::{get_zenoh_sink_declaration, get_zenoh_source_declaration};
use super::node::{
ConstructorFn, OperatorConstructor, OperatorFn, SinkConstructor, SinkFn, SourceConstructor,
SourceFn,
};
use crate::model::record::{OperatorRecord, SinkRecord, SourceRecord};
use crate::model::{Middleware, ZFUri};
use crate::types::Configuration;
use crate::utils::parse_uri;
use crate::zfresult::ErrorKind;
use crate::Result;
use crate::{bail, zferror};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::sync::Arc;
#[cfg(target_family = "unix")]
use libloading::os::unix::Library;
#[cfg(target_family = "windows")]
use libloading::Library;
/// Flags given to `libloading::os::unix::Library::open` when a node library
/// is opened on unix targets (see `Loader::load_node_from_file`):
/// `RTLD_NOW | RTLD_LOCAL`.
#[cfg(target_family = "unix")]
static LOAD_FLAGS: std::os::raw::c_int =
libloading::os::unix::RTLD_NOW | libloading::os::unix::RTLD_LOCAL;
/// Version of this crate, captured at compile time from `CARGO_PKG_VERSION`.
///
/// A loaded node declaration must report exactly this value in its
/// `core_version` field, otherwise loading fails with a version mismatch.
pub static CORE_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Compiler version, captured at compile time from the `RUSTC_VERSION`
/// environment variable.
///
/// A loaded node declaration must report exactly this value in its
/// `rustc_version` field, otherwise loading fails with a version mismatch.
// NOTE(review): `RUSTC_VERSION` is not a variable Cargo sets by itself; it is
// presumably exported by a build script — confirm.
pub static RUSTC_VERSION: &str = env!("RUSTC_VERSION");
/// File extension string `"zfext"`.
// NOTE(review): not referenced in this part of the file; presumably the
// extension of the files describing an `ExtensibleImplementation` — confirm.
pub static EXT_FILE_EXTENSION: &str = "zfext";
/// Kind of node whose declaration is looked up in a dynamically loaded
/// library. Each kind maps to a distinct exported symbol name.
pub(crate) enum NodeSymbol {
    /// The library is expected to export a Source declaration.
    Source,
    /// The library is expected to export an Operator declaration.
    Operator,
    /// The library is expected to export a Sink declaration.
    Sink,
}

impl NodeSymbol {
    /// Returns the NUL-terminated name of the symbol to look up in the
    /// library for this kind of node.
    pub(crate) fn to_bytes(&self) -> &[u8] {
        // The literals have different lengths; the annotation unifies them
        // into a plain byte slice.
        let symbol_name: &'static [u8] = match *self {
            Self::Source => b"_zf_export_source\0",
            Self::Operator => b"_zf_export_operator\0",
            Self::Sink => b"_zf_export_sink\0",
        };
        symbol_name
    }
}
/// Declaration a node library exports under one of the `NodeSymbol` names.
///
/// `Loader::load_node_from_file` reads a value of this type through a raw
/// pointer obtained from the loaded library, compares the two version fields
/// against [`RUSTC_VERSION`] and [`CORE_VERSION`], and hands back
/// `constructor` when both match.
// NOTE(review): the struct carries no explicit `repr`, so the layout seen by
// the loader and by the library presumably only agrees when both are built
// with the same compiler and crate version — which is what the version check
// tries to enforce. Do not reorder or change fields without confirming.
pub struct NodeDeclaration<C> {
/// Compiler version the exporting library reports.
pub rustc_version: &'static str,
/// Zenoh-Flow (this crate) version the exporting library reports.
pub core_version: &'static str,
/// Constructor function exposed by the library for the node.
pub constructor: C,
}
/// Declaration exported by a library that provides a Source.
pub type SourceDeclaration = NodeDeclaration<SourceFn>;
/// Declaration exported by a library that provides an Operator.
pub type OperatorDeclaration = NodeDeclaration<OperatorFn>;
/// Declaration exported by a library that provides a Sink.
pub type SinkDeclaration = NodeDeclaration<SinkFn>;
/// Description of a loader extension: a set of libraries able to run nodes
/// whose implementation file is not itself a dynamic library.
///
/// When a node file has an extension equal to `file_extension`, the loader
/// opens the library matching the node kind (`source_lib`, `operator_lib` or
/// `sink_lib`) instead of the file, and passes the file path to it through
/// the node configuration under the key `config_lib_key`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExtensibleImplementation {
// Identifier of the extension; `LoaderConfig` refuses two extensions with the same name.
pub(crate) name: String,
// File extension of the node files handled by this extension.
pub(crate) file_extension: String,
// Path of the library opened when the node is a Source.
pub(crate) source_lib: String,
// Path of the library opened when the node is a Sink.
pub(crate) sink_lib: String,
// Path of the library opened when the node is an Operator.
pub(crate) operator_lib: String,
// Configuration key under which the path of the node file is stored.
pub(crate) config_lib_key: String,
}
/// Configuration of the [`Loader`]: the list of registered extensions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoaderConfig {
// Registered extensions; `try_add_extension` keeps their names unique.
extensions: Vec<ExtensibleImplementation>,
}
impl LoaderConfig {
    /// Creates a configuration without any registered extension.
    pub fn new() -> Self {
        Self {
            extensions: Vec::new(),
        }
    }

    /// Registers `ext`.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Duplicate` error, leaving the configuration
    /// untouched, when an extension with the same name is already registered.
    pub fn try_add_extension(&mut self, ext: ExtensibleImplementation) -> Result<()> {
        if self.get_extension_by_name(&ext.name).is_some() {
            return Err(zferror!(ErrorKind::Duplicate).into());
        }

        self.extensions.push(ext);
        Ok(())
    }

    /// Unregisters the extension called `name` and returns it, or returns
    /// `None` when no such extension is registered.
    pub fn remove_extension(&mut self, name: &str) -> Option<ExtensibleImplementation> {
        let index = self.extensions.iter().position(|e| e.name == name)?;
        Some(self.extensions.remove(index))
    }

    /// Returns the first registered extension handling files whose extension
    /// is `file_extension`, if any.
    pub fn get_extension_by_file_extension(
        &self,
        file_extension: &str,
    ) -> Option<&ExtensibleImplementation> {
        self.extensions
            .iter()
            .find(|e| e.file_extension == file_extension)
    }

    /// Returns the registered extension called `name`, if any.
    pub fn get_extension_by_name(&self, name: &str) -> Option<&ExtensibleImplementation> {
        self.extensions.iter().find(|e| e.name == name)
    }
}
impl Default for LoaderConfig {
fn default() -> Self {
Self::new()
}
}
/// Loads node constructors, either from dynamic libraries on disk (directly
/// or through a registered extension) or from the built-in nodes.
pub struct Loader {
// Registered extensions, consulted when a node file is not itself a dynamic library.
pub(crate) config: LoaderConfig,
}
impl Loader {
pub fn new(config: LoaderConfig) -> Self {
Self { config }
}
/// Opens the library backing the node stored at `file_path` and returns it
/// together with the node constructor it exports.
///
/// Two cases are handled, based on the extension of `file_path`:
///
/// - the file is a dynamic library (per `crate::utils::is_dynamic_library`):
///   it is opened as is;
/// - otherwise an extension registered for that file extension is looked up.
///   `configuration` is rewritten with `Self::wrap_configuration` so that it
///   carries `file_path` under the extension's `config_lib_key`, and the
///   extension library matching `node_symbol` is opened instead.
///
/// The declaration exported under `node_symbol` is then read and its
/// versions are compared with [`RUSTC_VERSION`] and [`CORE_VERSION`].
///
/// The returned `Library` is handed back alongside the constructor so the
/// caller can keep it for as long as the constructor is in use.
///
/// # Errors
///
/// - `ErrorKind::LoadingError` if `file_path` has no file extension;
/// - `ErrorKind::Unimplemented` if the file is not a dynamic library and no
///   extension is registered for its file extension;
/// - any error raised while wrapping the configuration, canonicalizing the
///   extension library path, opening the library or resolving the symbol;
/// - `ErrorKind::VersionMismatch` if the declaration was built with another
///   compiler or Zenoh-Flow version.
///
/// # Safety
///
/// Opening a library may run arbitrary initialisation code, and the exported
/// symbol is read as a `NodeDeclaration<T>` without any possible type check:
/// the version comparison only happens *after* that read. The caller must
/// ensure the resolved library is a trusted Zenoh-Flow node library whose
/// `node_symbol` export is a valid, initialised `NodeDeclaration<T>`.
unsafe fn load_node_from_file<T: ConstructorFn>(
    &self,
    node_symbol: NodeSymbol,
    file_path: PathBuf,
    configuration: &mut Option<Configuration>,
) -> Result<(Library, T)> {
    let file_extension = crate::utils::get_file_extension(&file_path).ok_or_else(|| {
        zferror!(
            ErrorKind::LoadingError,
            "Missing file extension for < {:?} >",
            file_path,
        )
    })?;

    let library_path = if crate::utils::is_dynamic_library(&file_extension) {
        file_path
    } else {
        match self.config.get_extension_by_file_extension(&file_extension) {
            Some(extension) => {
                // The extension library receives the path of the actual node
                // file through the configuration.
                Self::wrap_configuration(
                    configuration,
                    extension.config_lib_key.clone(),
                    &file_path,
                )?;
                let lib = match node_symbol {
                    NodeSymbol::Source => &extension.source_lib,
                    NodeSymbol::Operator => &extension.operator_lib,
                    NodeSymbol::Sink => &extension.sink_lib,
                };
                std::fs::canonicalize(lib)?
            }
            None => bail!(
                ErrorKind::Unimplemented,
                "No extension registered for file extension < {} >",
                file_extension
            ),
        }
    };

    log::trace!("[Loader] loading library {:?}", library_path);

    // The path is only borrowed: it is still needed for the version-mismatch
    // message below (passing it by value is a use-after-move on Windows).
    #[cfg(target_family = "unix")]
    let library = Library::open(Some(&library_path), LOAD_FLAGS)?;
    #[cfg(target_family = "windows")]
    let library = Library::new(&library_path)?;

    // Bitwise read of the declaration exported by the library.
    let decl = library
        .get::<*mut NodeDeclaration<T>>(node_symbol.to_bytes())?
        .read();

    if decl.rustc_version != RUSTC_VERSION || decl.core_version != CORE_VERSION {
        return Err(zferror!(
            ErrorKind::VersionMismatch,
            "Library {} rustc expected {} rustc found {} - Zenoh-Flow expected {} Zenoh-Flow found {}",
            library_path.display(),
            RUSTC_VERSION,
            decl.rustc_version,
            CORE_VERSION,
            decl.core_version
        )
        .into());
    }

    Ok((library, decl.constructor))
}
/// Returns the constructor of the built-in Source for `middleware`.
///
/// The match is kept exhaustive on purpose: adding a `Middleware` variant
/// must fail to compile here until it is handled.
fn load_source_from_builtin(&self, middleware: Middleware) -> Result<SourceFn> {
    match middleware {
        Middleware::Zenoh => Ok(get_zenoh_source_declaration().constructor),
    }
}
/// Returns the constructor of the built-in Sink for `middleware`.
///
/// The match is kept exhaustive on purpose: adding a `Middleware` variant
/// must fail to compile here until it is handled.
fn load_sink_from_builtin(&self, middleware: Middleware) -> Result<SinkFn> {
    match middleware {
        Middleware::Zenoh => Ok(get_zenoh_sink_declaration().constructor),
    }
}
/// Builds the [`SourceConstructor`] of the Source described by `record`.
///
/// The URI of the record selects the implementation:
///
/// - a file URI loads the constructor from a library through
///   `load_node_from_file` (which may rewrite `record.configuration` when
///   the file is handled by an extension) and yields a dynamic constructor
///   holding the opened library;
/// - a builtin URI yields a static constructor for the built-in Source of
///   the requested middleware.
///
/// # Errors
///
/// Returns `ErrorKind::LoadingError` if the record has no URI, and forwards
/// any error from parsing the URI or loading the implementation.
pub(crate) fn load_source_constructor(
    &self,
    mut record: SourceRecord,
) -> Result<SourceConstructor> {
    // Guard clause: without a URI there is nothing to load.
    let uri = match &record.uri {
        Some(uri) => uri,
        None => bail!(
            ErrorKind::LoadingError,
            "Missing URI for dynamically loaded Source < {} >.",
            record.id
        ),
    };

    match parse_uri(uri)? {
        ZFUri::File(file_path) => {
            // SAFETY: relies on the file designated by the descriptor being a
            // trusted Zenoh-Flow library (or a file handled by a trusted
            // extension library) exporting a Source declaration.
            // NOTE(review): nothing here can verify that beyond the version
            // check done by `load_node_from_file`.
            let (library, constructor) = unsafe {
                self.load_node_from_file::<SourceFn>(
                    NodeSymbol::Source,
                    file_path,
                    &mut record.configuration,
                )?
            };
            // The library handle travels with the constructor.
            Ok(SourceConstructor::new_dynamic(
                record,
                constructor,
                Arc::new(library),
            ))
        }
        ZFUri::Builtin(mw) => {
            let constructor = self.load_source_from_builtin(mw)?;
            Ok(SourceConstructor::new_static(record, constructor))
        }
    }
}
/// Builds the [`OperatorConstructor`] of the Operator described by `record`.
///
/// Only file URIs are supported: the constructor is loaded from a library
/// through `load_node_from_file` (which may rewrite `record.configuration`
/// when the file is handled by an extension) and the result is a dynamic
/// constructor holding the opened library.
///
/// # Errors
///
/// Returns `ErrorKind::LoadingError` if the record has no URI,
/// `ErrorKind::Unimplemented` if the URI designates a builtin, and forwards
/// any error from parsing the URI or loading the implementation.
pub(crate) fn load_operator_constructor(
    &self,
    mut record: OperatorRecord,
) -> Result<OperatorConstructor> {
    // Guard clause: without a URI there is nothing to load.
    let uri = match &record.uri {
        Some(uri) => uri,
        None => bail!(
            ErrorKind::LoadingError,
            "Missing URI for dynamically loaded Operator < {} >.",
            record.id
        ),
    };

    match parse_uri(uri)? {
        ZFUri::File(file_path) => {
            // SAFETY: relies on the file designated by the descriptor being a
            // trusted Zenoh-Flow library (or a file handled by a trusted
            // extension library) exporting an Operator declaration.
            // NOTE(review): nothing here can verify that beyond the version
            // check done by `load_node_from_file`.
            let (library, constructor) = unsafe {
                self.load_node_from_file::<OperatorFn>(
                    NodeSymbol::Operator,
                    file_path,
                    &mut record.configuration,
                )?
            };
            // The library handle travels with the constructor.
            Ok(OperatorConstructor::new_dynamic(
                record,
                constructor,
                Arc::new(library),
            ))
        }
        ZFUri::Builtin(_mw) => {
            bail!(
                ErrorKind::Unimplemented,
                "Loading builtin operators is not supported < {} >.",
                record.id
            )
        }
    }
}
/// Builds the [`SinkConstructor`] of the Sink described by `record`.
///
/// The URI of the record selects the implementation:
///
/// - a file URI loads the constructor from a library through
///   `load_node_from_file` (which may rewrite `record.configuration` when
///   the file is handled by an extension) and yields a dynamic constructor
///   holding the opened library;
/// - a builtin URI yields a static constructor for the built-in Sink of the
///   requested middleware.
///
/// # Errors
///
/// Returns `ErrorKind::LoadingError` if the record has no URI, and forwards
/// any error from parsing the URI or loading the implementation.
pub(crate) fn load_sink_constructor(&self, mut record: SinkRecord) -> Result<SinkConstructor> {
    // Guard clause: without a URI there is nothing to load.
    let uri = match &record.uri {
        Some(uri) => uri,
        None => bail!(
            ErrorKind::LoadingError,
            "Missing URI for dynamically loaded Sink < {} >.",
            record.id
        ),
    };

    match parse_uri(uri)? {
        ZFUri::File(file_path) => {
            // SAFETY: relies on the file designated by the descriptor being a
            // trusted Zenoh-Flow library (or a file handled by a trusted
            // extension library) exporting a Sink declaration.
            // NOTE(review): nothing here can verify that beyond the version
            // check done by `load_node_from_file`.
            let (library, constructor) = unsafe {
                self.load_node_from_file::<SinkFn>(
                    NodeSymbol::Sink,
                    file_path,
                    &mut record.configuration,
                )?
            };
            // The library handle travels with the constructor.
            Ok(SinkConstructor::new_dynamic(
                record,
                constructor,
                Arc::new(library),
            ))
        }
        ZFUri::Builtin(mw) => {
            let constructor = self.load_sink_from_builtin(mw)?;
            Ok(SinkConstructor::new_static(record, constructor))
        }
    }
}
/// Replaces `configuration` with a map destined to an extension library.
///
/// The resulting map contains:
///
/// - `config_key` → the path of the node file (`file_path`) as a string;
/// - `"configuration"` → the previous content of `configuration`, only when
///   there was one.
///
/// Entries are inserted in that order, so a `config_key` equal to
/// `"configuration"` ends up holding the previous configuration.
///
/// # Errors
///
/// Returns `ErrorKind::LoadingError` if `file_path` is not valid UTF-8. In
/// that case `configuration` is left exactly as it was.
fn wrap_configuration(
    configuration: &mut Option<Configuration>,
    config_key: String,
    file_path: &Path,
) -> Result<()> {
    // Do the only fallible step first, so a failure cannot discard the
    // caller's configuration.
    let path_str = file_path.to_str().ok_or_else(|| {
        zferror!(
            ErrorKind::LoadingError,
            "Unable parse file path < {:?} >.",
            file_path,
        )
    })?;

    let mut wrapped: serde_json::map::Map<String, Configuration> = serde_json::map::Map::new();
    wrapped.insert(config_key, path_str.into());

    if let Some(previous) = configuration.take() {
        wrapped.insert(String::from("configuration"), previous);
    }

    *configuration = Some(wrapped.into());
    Ok(())
}
}