use crate::{
bail,
model::descriptor::{SinkDescriptor, SourceDescriptor},
prelude::{
zferror, Configuration, Context, ErrorKind, InputRaw, Inputs, Node, OutputRaw, Outputs,
PortId, Sink, Source,
},
runtime::dataflow::{
loader::{NodeDeclaration, CORE_VERSION, RUSTC_VERSION},
node::{SinkFn, SourceFn},
},
types::LinkMessage,
Result as ZFResult,
};
use async_std::sync::Mutex;
use async_trait::async_trait;
use flume::{Receiver, RecvError};
use futures::{future::select_all, Future};
use std::mem;
use std::sync::Arc;
use std::{collections::HashMap, pin::Pin};
use zenoh::{
prelude::r#async::*, publication::Publisher, shm::SharedMemoryManager, subscriber::Subscriber,
};
static KEY_KEYEXPRESSIONS: &str = "key-expressions";
static KEY_SHM_ELEM_SIZE: &str = "shared_memory_element_size";
static KEY_SHM_TOTAL_ELEMENTS: &str = "shared_memory_elements";
static KEY_SHM_BACKOFF: &str = "shared_memory_backoff";
pub(crate) type ZSubFut =
Pin<Box<dyn Future<Output = (PortId, Result<Sample, RecvError>)> + Send + Sync>>;
fn wait_zenoh_sub(id: PortId, sub: &Subscriber<Receiver<Sample>>) -> ZSubFut {
let sub = sub.receiver.clone();
Box::pin(async move { (id, sub.recv_async().await) })
}
pub(crate) struct ZenohSource<'a> {
_session: Arc<Session>,
outputs: HashMap<PortId, OutputRaw>,
subscribers: HashMap<PortId, Subscriber<'a, Receiver<Sample>>>,
futs: Arc<Mutex<Vec<ZSubFut>>>,
}
pub(crate) fn get_zenoh_source_declaration() -> NodeDeclaration<SourceFn> {
NodeDeclaration::<SourceFn> {
rustc_version: RUSTC_VERSION,
core_version: CORE_VERSION,
constructor: |context: Context, configuration: Option<Configuration>, outputs: Outputs| {
Box::pin(async {
let node = ZenohSource::new(context, configuration, outputs).await?;
Ok(Arc::new(node) as Arc<dyn Node>)
})
},
}
}
pub(crate) fn get_zenoh_source_descriptor(
configuration: &Configuration,
) -> ZFResult<SourceDescriptor> {
let mut outputs = vec![];
let local_configuration = configuration.as_object().ok_or_else(|| {
zferror!(
ErrorKind::ConfigurationError,
"Unable to convert configuration to HashMap: {:?}",
configuration
)
})?;
let keyexpressions = local_configuration.get(KEY_KEYEXPRESSIONS).ok_or_else(|| {
zferror!(
ErrorKind::ConfigurationError,
"Missing key-expressions in builtin sink configuration"
)
})?;
let keyexpressions = keyexpressions.as_object().ok_or_else(|| {
zferror!(
ErrorKind::ConfigurationError,
"Unable to convert configuration to HashMap: {:?}",
configuration
)
})?;
for id in keyexpressions.keys() {
outputs.push(id.clone().into());
}
Ok(SourceDescriptor {
id: "zenoh-source".into(),
outputs,
uri: Some("builtin://zenoh".to_string()),
configuration: Some(configuration.clone()),
})
}
#[async_trait]
impl<'a> Source for ZenohSource<'a> {
async fn new(
context: Context,
configuration: Option<Configuration>,
mut outputs: Outputs,
) -> ZFResult<Self> {
let mut source_outputs: HashMap<PortId, OutputRaw> = HashMap::new();
let mut subscribers: HashMap<PortId, Subscriber<'a, Receiver<Sample>>> = HashMap::new();
match configuration {
Some(configuration) => {
let keyexpressions = configuration.get(KEY_KEYEXPRESSIONS).ok_or_else(|| {
zferror!(
ErrorKind::ConfigurationError,
"Missing key-expressions in builtin sink configuration"
)
})?;
let keyexpressions = keyexpressions.as_object().ok_or_else(|| {
zferror!(
ErrorKind::ConfigurationError,
"Unable to convert configuration to HashMap: {:?}",
configuration
)
})?;
for (id, value) in keyexpressions {
let ke = value
.as_str()
.ok_or_else(|| {
zferror!(
ErrorKind::ConfigurationError,
"Unable to convert value to string: {:?}",
value
)
})?
.to_string();
let output = outputs
.take(id)
.ok_or(zferror!(
ErrorKind::MissingOutput(id.clone()),
"Unable to find output: {id}"
))?
.raw();
let subscriber = context
.zenoh_session()
.declare_subscriber(&ke)
.res()
.await?;
subscribers.insert(id.clone().into(), subscriber);
source_outputs.insert(id.clone().into(), output);
}
let futs = subscribers
.iter()
.map(|(id, sub)| wait_zenoh_sub(id.clone(), sub))
.collect();
Ok(ZenohSource {
_session: context.zenoh_session(),
outputs: source_outputs,
subscribers,
futs: Arc::new(Mutex::new(futs)),
})
}
None => {
bail!(
ErrorKind::MissingConfiguration,
"Builtin ZenohSource needs a configuration!"
)
}
}
}
}
#[async_trait]
impl<'a> Node for ZenohSource<'a> {
async fn iteration(&self) -> ZFResult<()> {
let mut futs = self.futs.lock().await;
let tmp = mem::take(&mut (*futs));
let ((id, result), _index, mut remaining) = select_all(tmp).await;
match result {
Ok(sample) => {
let data = sample.payload.contiguous().to_vec();
let ke = sample.key_expr;
log::trace!(
"[ZenohSource] Received data from {ke:?} Len: {} for output: {id}",
data.len()
);
let output = self.outputs.get(&id).ok_or(zferror!(
ErrorKind::MissingOutput(id.to_string()),
"Unable to find output!"
))?;
output.send(data, None).await?;
}
Err(e) => log::error!("[ZenohSource] got a Zenoh error from output {id} : {e:?}"),
}
let sub = self.subscribers.get(&id).ok_or_else(|| {
zferror!(
ErrorKind::RecvError,
"Unable to find < {id} > for built-in Zenoh Source"
)
})?;
remaining.push(wait_zenoh_sub(id, sub));
*futs = remaining;
Ok(())
}
}
pub(crate) type ZFInputFut =
Pin<Box<dyn Future<Output = (PortId, ZFResult<LinkMessage>)> + Send + Sync>>;
fn wait_flow_input(id: PortId, input: &InputRaw) -> ZFInputFut {
let input = input.clone();
Box::pin(async move { (id, input.recv().await) })
}
pub(crate) struct ZenohSink<'a> {
_session: Arc<Session>,
inputs: HashMap<PortId, InputRaw>,
publishers: HashMap<PortId, Publisher<'a>>,
state: Arc<Mutex<ZenohSinkState>>,
shm_element_size: usize,
shm_backoff: u64,
}
pub(crate) struct ZenohSinkState {
pub(crate) futs: Vec<ZFInputFut>,
pub(crate) shm: Option<SharedMemoryManager>,
pub(crate) buffer: Vec<u8>,
}
pub(crate) fn get_zenoh_sink_declaration() -> NodeDeclaration<SinkFn> {
NodeDeclaration::<SinkFn> {
rustc_version: RUSTC_VERSION,
core_version: CORE_VERSION,
constructor: |context: Context, configuration: Option<Configuration>, inputs: Inputs| {
Box::pin(async {
let node = ZenohSink::new(context, configuration, inputs).await?;
Ok(Arc::new(node) as Arc<dyn Node>)
})
},
}
}
pub(crate) fn get_zenoh_sink_descriptor(configuration: &Configuration) -> ZFResult<SinkDescriptor> {
let mut inputs = vec![];
let local_configuration = configuration.as_object().ok_or_else(|| {
zferror!(
ErrorKind::ConfigurationError,
"Unable to convert configuration to HashMap: {:?}",
configuration
)
})?;
let keyexpressions = local_configuration.get(KEY_KEYEXPRESSIONS).ok_or_else(|| {
zferror!(
ErrorKind::ConfigurationError,
"Missing key-expressions in builtin sink configuration"
)
})?;
let keyexpressions = keyexpressions.as_object().ok_or_else(|| {
zferror!(
ErrorKind::ConfigurationError,
"Unable to convert configuration to HashMap: {:?}",
configuration
)
})?;
for id in keyexpressions.keys() {
inputs.push(id.clone().into());
}
Ok(SinkDescriptor {
id: "zenoh-sink".into(),
inputs,
uri: Some("builtin://zenoh".to_string()),
configuration: Some(configuration.clone()),
})
}
#[async_trait]
impl<'a> Sink for ZenohSink<'a> {
async fn new(
context: Context,
configuration: Option<Configuration>,
mut inputs: Inputs,
) -> ZFResult<Self> {
let mut sink_inputs: HashMap<PortId, InputRaw> = HashMap::new();
let mut publishers: HashMap<PortId, Publisher<'a>> = HashMap::new();
let id = uuid::Uuid::new_v4().to_string();
match configuration {
Some(configuration) => {
let get_or_default = |configuration: &serde_json::Value, key, default| {
if let Some(value) = configuration.get(key) {
if let Some(value) = value.as_u64() {
return value;
}
log::warn!("Failed to parse / interpret provided value for {key} into u64, using default value: {default}");
}
default
};
let mut shm_element_size = 0;
let mut shm_backoff = 0;
let mut shm_manager = None;
if *context.shared_memory_enabled() {
shm_element_size = get_or_default(
&configuration,
KEY_SHM_ELEM_SIZE,
*context.shared_memory_element_size() as u64,
) as usize;
let shm_elem_count = get_or_default(
&configuration,
KEY_SHM_TOTAL_ELEMENTS,
*context.shared_memory_elements() as u64,
) as usize;
shm_backoff = get_or_default(
&configuration,
KEY_SHM_BACKOFF,
*context.shared_memory_backoff(),
);
let shm_size = shm_element_size * shm_elem_count;
shm_manager = Some(SharedMemoryManager::make(id, shm_size).map_err(|_| {
zferror!(
ErrorKind::ConfigurationError,
"Unable to allocate {shm_size} bytes of shared memory"
)
})?);
}
let keyexpressions = configuration.get(KEY_KEYEXPRESSIONS).ok_or_else(|| {
zferror!(
ErrorKind::ConfigurationError,
"Missing key-expressions in builtin sink configuration"
)
})?;
let keyexpressions = keyexpressions.as_object().ok_or_else(|| {
zferror!(
ErrorKind::ConfigurationError,
"Unable to convert configuration to HashMap: {:?}",
configuration
)
})?;
for (id, value) in keyexpressions {
let ke = value
.as_str()
.ok_or_else(|| {
zferror!(
ErrorKind::ConfigurationError,
"Unable to convert value to string: {:?}",
value
)
})?
.to_string();
let input = inputs
.take(id)
.ok_or(zferror!(
ErrorKind::MissingInput(id.clone()),
"Unable to find input: {id}"
))?
.raw();
let subscriber = context.zenoh_session().declare_publisher(ke).res().await?;
publishers.insert(id.clone().into(), subscriber);
sink_inputs.insert(id.clone().into(), input);
}
let futs = sink_inputs
.iter()
.map(|(id, input)| wait_flow_input(id.clone(), input))
.collect();
Ok(ZenohSink {
_session: context.zenoh_session(),
inputs: sink_inputs,
publishers,
state: Arc::new(Mutex::new(ZenohSinkState {
futs,
shm: shm_manager,
buffer: Vec::new(),
})),
shm_element_size,
shm_backoff,
})
}
None => {
bail!(
ErrorKind::MissingConfiguration,
"Builtin ZenohSink needs a configuration!"
)
}
}
}
}
#[async_trait]
impl<'a> Node for ZenohSink<'a> {
async fn iteration(&self) -> ZFResult<()> {
let mut state = self.state.lock().await;
let tmp = mem::take(&mut state.futs);
let ((id, result), _index, mut remaining) = select_all(tmp).await;
match result {
Ok(LinkMessage::Data(dm)) => {
dm.try_as_bytes_into(&mut state.buffer)?;
let publisher = self.publishers.get(&id).ok_or_else(|| {
zferror!(ErrorKind::SendError, "Unable to find Publisher for {id}")
})?;
match state.shm {
Some(ref mut shm) => {
let mut buff = match shm.alloc(self.shm_element_size) {
Ok(buf) => buf,
Err(_) => {
async_std::task::sleep(std::time::Duration::from_nanos(
self.shm_backoff,
))
.await;
log::trace!(
"After failing allocation the GC collected: {} bytes -- retrying",
shm.garbage_collect()
);
log::trace!(
"Trying to de-fragment memory... De-fragmented {} bytes",
shm.defragment()
);
shm.alloc(self.shm_element_size).map_err(|_| {
zferror!(
ErrorKind::ConfigurationError,
"Unable to allocated {} in the shared memory buffer!",
self.shm_element_size
)
})?
}
};
let slice = unsafe { buff.as_mut_slice() };
let data_len = state.buffer.len();
if data_len < slice.len() {
slice[0..data_len].copy_from_slice(&state.buffer);
publisher.put(buff).res().await?;
} else {
log::warn!("[ZenohSink] Sending data via network as we are unable to send it over shared memory, the serialized size is {} while shared memory is {}", data_len, self.shm_element_size);
publisher.put(state.buffer.as_slice()).res().await?;
}
}
None => publisher.put(state.buffer.as_slice()).res().await?,
};
}
Ok(_) => (), Err(e) => log::error!("[ZenohSink] got error on link {id}: {e:?}"),
}
let input = self.inputs.get(&id).ok_or_else(|| {
zferror!(
ErrorKind::RecvError,
"Unable to find input < {id} > for built-in Zenoh Sink"
)
})?;
remaining.push(wait_flow_input(id, input));
state.futs = remaining;
Ok(())
}
}
#[cfg(test)]
#[path = "./tests/builtin-zenoh.rs"]
mod tests;