#![allow(unused)]
#[cfg(feature = "data_bincode")]
extern crate bincode;
#[cfg(feature = "data_cbor")]
extern crate serde_cbor;
#[cfg(feature = "data_json")]
extern crate serde_json;
use crate::model::record::DataFlowRecord;
use crate::model::registry::RegistryNode;
use crate::runtime::{RuntimeConfig, RuntimeInfo, RuntimeStatus};
use crate::zfresult::ErrorKind;
use crate::Result;
use crate::{bail, zferror};
use async_std::pin::Pin;
use async_std::stream::Stream;
use async_std::task::{Context, Poll};
use futures::StreamExt;
use futures_lite::FutureExt;
use pin_project_lite::pin_project;
use serde::{de::DeserializeOwned, Serialize};
use std::convert::TryFrom;
use std::sync::Arc;
use uhlc::HLC;
use uuid::Uuid;
use zenoh::prelude::r#async::*;
use zenoh::query::Reply;
use super::Job;
pub static ROOT_PLUGIN_RUNTIME_PREFIX: &str = "@/router/";
pub static ROOT_PLUGIN_RUNTIME_SUFFIX: &str = "plugin/zenoh-flow";
pub static ROOT_STANDALONE: &str = "zenoh-flow";
pub static KEY_RUNTIMES: &str = "runtimes";
pub static KEY_REGISTRY: &str = "registry";
pub static KEY_FLOWS: &str = "flows";
pub static KEY_GRAPHS: &str = "graphs";
pub static KEY_INFO: &str = "info";
pub static KEY_STATUS: &str = "status";
pub static KEY_CONFIGURATION: &str = "configuration";
pub static KEY_JOB_QUEUE: &str = "job-queue";
pub static KEY_JOB_SUBMITTED: &str = "sumbitted";
pub static KEY_JOB_STARTED: &str = "started";
pub static KEY_JOB_DONE: &str = "done";
pub static KEY_JOB_FAILED: &str = "failed";
#[macro_export]
macro_rules! RT_INFO_PATH {
($prefix:expr, $rtid:expr) => {
format!(
"{}/{}/{}/{}",
$prefix,
$crate::runtime::resources::KEY_RUNTIMES,
$rtid,
$crate::runtime::resources::KEY_INFO
)
};
}
#[macro_export]
macro_rules! RT_STATUS_PATH {
($prefix:expr, $rtid:expr) => {
format!(
"{}/{}/{}/{}",
$prefix,
$crate::runtime::resources::KEY_RUNTIMES,
$rtid,
$crate::runtime::resources::KEY_STATUS
)
};
}
#[macro_export]
macro_rules! RT_CONFIGURATION_PATH {
($prefix:expr, $rtid:expr) => {
format!(
"{}/{}/{}/{}",
$prefix,
$crate::runtime::resources::KEY_RUNTIMES,
$rtid,
$crate::runtime::resources::KEY_CONFIGURATION
)
};
}
#[macro_export]
macro_rules! RT_FLOW_PATH {
($prefix:expr, $rtid:expr, $fid:expr, $iid:expr) => {
format!(
"{}/{}/{}/{}/{}/{}",
$prefix,
$crate::runtime::resources::KEY_RUNTIMES,
$rtid,
$crate::runtime::resources::KEY_FLOWS,
$fid,
$iid
)
};
}
#[macro_export]
macro_rules! RT_FLOW_SELECTOR_BY_INSTANCE {
($prefix:expr, $rtid:expr, $iid:expr) => {
format!(
"{}/{}/{}/{}/*/{}",
$prefix,
$crate::runtime::resources::KEY_RUNTIMES,
$rtid,
$crate::runtime::resources::KEY_FLOWS,
$iid
)
};
}
#[macro_export]
macro_rules! RT_FLOW_SELECTOR_BY_FLOW {
($prefix:expr, $rtid:expr, $fid:expr) => {
format!(
"{}/{}/{}/{}/{}/*",
$prefix,
$crate::runtime::resources::KEY_RUNTIMES,
$rtid,
$crate::runtime::resources::KEY_FLOWS,
$fid
)
};
}
#[macro_export]
macro_rules! RT_FLOW_SELECTOR_ALL {
($prefix:expr, $rtid:expr) => {
format!(
"{}/{}/{}/{}/*/*",
$prefix,
$crate::runtime::resources::KEY_RUNTIMES,
$rtid,
$crate::runtime::resources::KEY_FLOWS
)
};
}
#[macro_export]
macro_rules! FLOW_SELECTOR_BY_INSTANCE {
($prefix:expr, $iid:expr) => {
format!(
"{}/{}/*/{}/*/{}",
$prefix,
$crate::runtime::resources::KEY_RUNTIMES,
$crate::runtime::resources::KEY_FLOWS,
$iid
)
};
}
#[macro_export]
macro_rules! FLOW_SELECTOR_BY_FLOW {
($prefix:expr, $fid:expr) => {
format!(
"{}/{}/*/{}/{}/*",
$prefix,
$crate::runtime::resources::KEY_RUNTIMES,
$crate::runtime::resources::KEY_FLOWS,
$fid
)
};
}
#[macro_export]
macro_rules! REG_GRAPH_SELECTOR {
($prefix:expr, $fid:expr) => {
format!(
"{}/{}/{}/{}",
$prefix,
$crate::runtime::resources::KEY_REGISTRY,
$crate::runtime::resources::KEY_GRAPHS,
$fid
)
};
}
#[macro_export]
macro_rules! JQ_SUMBITTED_SEL {
($prefix:expr, $rid:expr) => {
format!(
"{}/{}/{}/{}/{}/*",
$prefix,
$crate::runtime::resources::KEY_RUNTIMES,
$rid,
$crate::runtime::resources::KEY_JOB_QUEUE,
$crate::runtime::resources::KEY_JOB_SUBMITTED
)
};
}
#[macro_export]
macro_rules! JQ_SUMBITTED_JOB {
($prefix:expr, $rid:expr, $jid: expr) => {
format!(
"{}/{}/{}/{}/{}/{}",
$prefix,
$crate::runtime::resources::KEY_RUNTIMES,
$rid,
$crate::runtime::resources::KEY_JOB_QUEUE,
$crate::runtime::resources::KEY_JOB_SUBMITTED,
$jid
)
};
}
#[macro_export]
macro_rules! JQ_STARTED_JOB {
($prefix:expr, $rid:expr, $jid: expr) => {
format!(
"{}/{}/{}/{}/{}/{}",
$prefix,
$crate::runtime::resources::KEY_RUNTIMES,
$rid,
$crate::runtime::resources::KEY_JOB_QUEUE,
$crate::runtime::resources::KEY_JOB_STARTED,
$jid
)
};
}
#[macro_export]
macro_rules! JQ_DONE_JOB {
($prefix:expr, $rid:expr, $jid: expr) => {
format!(
"{}/{}/{}/{}/{}/{}",
$prefix,
$crate::runtime::resources::KEY_RUNTIMES,
$rid,
$crate::runtime::resources::KEY_JOB_QUEUE,
$crate::runtime::resources::KEY_JOB_DONE,
$jid
)
};
}
#[macro_export]
macro_rules! JQ_FAILED_JOB {
($prefix:expr, $rid:expr, $jid: expr) => {
format!(
"{}/{}/{}/{}/{}/{}",
$prefix,
$crate::runtime::resources::KEY_RUNTIMES,
$rid,
$crate::runtime::resources::KEY_JOB_QUEUE,
$crate::runtime::resources::KEY_JOB_FAILED,
$jid
)
};
}
pub fn deserialize_data<T>(raw_data: &[u8]) -> Result<T>
where
T: DeserializeOwned,
{
#[cfg(feature = "data_bincode")]
return Ok(bincode::deserialize::<T>(&raw_data)?);
#[cfg(feature = "data_cbor")]
return Ok(serde_cbor::from_slice::<T>(&raw_data)?);
#[cfg(feature = "data_json")]
return Ok(serde_json::from_str::<T>(std::str::from_utf8(raw_data)?)?);
}
#[cfg(feature = "data_bincode")]
pub fn serialize_data<T: ?Sized>(data: &T) -> FResult<Vec<u8>>
where
T: Serialize,
{
Ok(bincode::serialize(data)?)
}
#[cfg(feature = "data_json")]
pub fn serialize_data<T: ?Sized>(data: &T) -> Result<Vec<u8>>
where
T: Serialize,
{
Ok(serde_json::to_string(data)?.into_bytes())
}
#[cfg(feature = "data_cbor")]
pub fn serialize_data<T>(data: &T) -> FResult<Vec<u8>>
where
T: Serialize,
{
Ok(serde_cbor::to_vec(data)?)
}
pub fn convert<T>(sample: Sample) -> Result<T>
where
T: DeserializeOwned,
{
match sample.kind {
SampleKind::Put => match sample.value.encoding {
Encoding::APP_OCTET_STREAM => {
match deserialize_data::<T>(&sample.value.payload.contiguous()) {
Ok(data) => Ok(data),
Err(e) => Err(e),
}
}
_ => {
log::warn!(
"Received sample with wrong encoding {:?}, dropping",
sample.value.encoding
);
Err(zferror!(
ErrorKind::DeserializationError,
"Received sample with wrong encoding {:?}, dropping",
sample.value.encoding
)
.into())
}
},
SampleKind::Delete => {
log::warn!("Received delete sample drop it");
Err(zferror!(
ErrorKind::DeserializationError,
"Received delete sample dropping it"
)
.into())
}
}
}
#[derive(Clone)]
pub struct DataStore {
z: Arc<zenoh::Session>,
}
impl DataStore {
pub fn new(z: Arc<zenoh::Session>) -> Self {
Self { z }
}
pub async fn get_runtime_info(&self, rtid: &ZenohId) -> Result<RuntimeInfo> {
let selector = RT_INFO_PATH!(ROOT_STANDALONE, rtid);
self.get_from_zenoh::<RuntimeInfo>(&selector).await
}
pub async fn get_all_runtime_info(&self) -> Result<Vec<RuntimeInfo>> {
let selector = RT_INFO_PATH!(ROOT_STANDALONE, "*");
self.get_vec_from_zenoh::<RuntimeInfo>(&selector).await
}
pub async fn get_runtime_info_by_name(&self, rtid: &str) -> Result<RuntimeInfo> {
let selector = RT_INFO_PATH!(ROOT_STANDALONE, "*");
let rts = self.get_vec_from_zenoh::<RuntimeInfo>(&selector).await?;
for rt in &rts {
if *rt.name == *rtid {
return Ok(rt.clone());
}
}
bail!(ErrorKind::NotFound)
}
pub async fn remove_runtime_info(&self, rtid: &ZenohId) -> Result<()> {
let path = RT_INFO_PATH!(ROOT_STANDALONE, rtid);
self.z.delete(&path).res().await
}
pub async fn add_runtime_info(&self, rtid: &ZenohId, rt_info: &RuntimeInfo) -> Result<()> {
let path = RT_INFO_PATH!(ROOT_STANDALONE, rtid);
let encoded_info = serialize_data(rt_info)?;
self.z.put(&path, encoded_info).res().await
}
pub async fn get_runtime_config(&self, rtid: &ZenohId) -> Result<RuntimeConfig> {
let selector = RT_CONFIGURATION_PATH!(ROOT_STANDALONE, rtid);
self.get_from_zenoh::<RuntimeConfig>(&selector).await
}
pub async fn subscribe_runtime_config(
&self,
rtid: &ZenohId,
) -> Result<zenoh::subscriber::Subscriber<'static, flume::Receiver<Sample>>> {
bail!(ErrorKind::Unimplemented)
}
pub async fn remove_runtime_config(&self, rtid: &ZenohId) -> Result<()> {
let path = RT_CONFIGURATION_PATH!(ROOT_STANDALONE, rtid);
self.z.delete(&path).res().await
}
pub async fn add_runtime_config(&self, rtid: &ZenohId, rt_info: &RuntimeConfig) -> Result<()> {
let path = RT_CONFIGURATION_PATH!(ROOT_STANDALONE, rtid);
let encoded_info = serialize_data(rt_info)?;
self.z.put(&path, encoded_info).res().await
}
pub async fn get_runtime_status(&self, rtid: &ZenohId) -> Result<RuntimeStatus> {
let selector = RT_STATUS_PATH!(ROOT_STANDALONE, rtid);
self.get_from_zenoh::<RuntimeStatus>(&selector).await
}
pub async fn remove_runtime_status(&self, rtid: &ZenohId) -> Result<()> {
let path = RT_STATUS_PATH!(ROOT_STANDALONE, rtid);
self.z.delete(&path).res().await
}
pub async fn add_runtime_status(&self, rtid: &ZenohId, rt_info: &RuntimeStatus) -> Result<()> {
let path = RT_STATUS_PATH!(ROOT_STANDALONE, rtid);
let encoded_info = serialize_data(rt_info)?;
self.z.put(&path, encoded_info).res().await
}
pub async fn get_runtime_flow_by_instance(
&self,
rtid: &ZenohId,
iid: &Uuid,
) -> Result<DataFlowRecord> {
let selector = RT_FLOW_SELECTOR_BY_INSTANCE!(ROOT_STANDALONE, rtid, iid);
self.get_from_zenoh::<DataFlowRecord>(&selector).await
}
pub async fn get_flow_by_instance(&self, iid: &Uuid) -> Result<DataFlowRecord> {
let selector = RT_FLOW_SELECTOR_BY_INSTANCE!(ROOT_STANDALONE, "*", iid);
self.get_from_zenoh::<DataFlowRecord>(&selector).await
}
pub async fn get_runtime_flow_instances(
&self,
rtid: &ZenohId,
fid: &str,
) -> Result<Vec<DataFlowRecord>> {
let selector = RT_FLOW_SELECTOR_BY_FLOW!(ROOT_STANDALONE, rtid, fid);
self.get_vec_from_zenoh::<DataFlowRecord>(&selector).await
}
pub async fn get_flow_instances(&self, fid: &str) -> Result<Vec<DataFlowRecord>> {
let selector = FLOW_SELECTOR_BY_FLOW!(ROOT_STANDALONE, fid);
self.get_vec_from_zenoh::<DataFlowRecord>(&selector).await
}
pub async fn get_all_instances(&self) -> Result<Vec<DataFlowRecord>> {
let selector = FLOW_SELECTOR_BY_FLOW!(ROOT_STANDALONE, "*");
self.get_vec_from_zenoh::<DataFlowRecord>(&selector).await
}
pub async fn get_flow_instance_runtimes(&self, iid: &Uuid) -> Result<Vec<ZenohId>> {
let selector = RT_FLOW_SELECTOR_BY_INSTANCE!(ROOT_STANDALONE, "*", iid);
let mut ds = self.z.get(&selector).res().await?;
let mut runtimes = Vec::new();
for kv in ds.into_iter() {
if let Ok(sample) = &kv.sample {
let id = sample
.key_expr
.as_str()
.split('/')
.nth(2) .ok_or_else(|| {
log::error!(
"Could not extract the instance id from key expression: {}",
sample.key_expr.as_str()
);
zferror!(ErrorKind::DeserializationError)
})?;
runtimes.push(id.parse::<ZenohId>()?);
}
}
Ok(runtimes)
}
pub async fn remove_runtime_flow_instance(
&self,
rtid: &ZenohId,
fid: &str,
iid: &Uuid,
) -> Result<()> {
let path = RT_FLOW_PATH!(ROOT_STANDALONE, rtid, fid, iid);
self.z.delete(&path).res().await
}
pub async fn add_runtime_flow(
&self,
rtid: &ZenohId,
flow_instance: &DataFlowRecord,
) -> Result<()> {
let path = RT_FLOW_PATH!(
ROOT_STANDALONE,
rtid,
flow_instance.flow,
flow_instance.uuid
);
let encoded_info = serialize_data(flow_instance)?;
self.z.put(&path, encoded_info).res().await
}
pub async fn add_graph(&self, graph: &RegistryNode) -> Result<()> {
let path = REG_GRAPH_SELECTOR!(ROOT_STANDALONE, &graph.id);
let encoded_info = serialize_data(graph)?;
self.z.put(&path, encoded_info).res().await
}
pub async fn get_graph(&self, graph_id: &str) -> Result<RegistryNode> {
let selector = REG_GRAPH_SELECTOR!(ROOT_STANDALONE, graph_id);
self.get_from_zenoh::<RegistryNode>(&selector).await
}
pub async fn get_all_graphs(&self) -> Result<Vec<RegistryNode>> {
let selector = REG_GRAPH_SELECTOR!(ROOT_STANDALONE, "*");
self.get_vec_from_zenoh::<RegistryNode>(&selector).await
}
pub async fn delete_graph(&self, graph_id: &str) -> Result<()> {
let path = REG_GRAPH_SELECTOR!(ROOT_STANDALONE, &graph_id);
self.z.delete(&path).res().await
}
pub async fn subscribe_sumbitted_jobs(
&self,
rtid: &ZenohId,
) -> Result<zenoh::subscriber::Subscriber<'static, flume::Receiver<Sample>>> {
let selector = JQ_SUMBITTED_SEL!(ROOT_STANDALONE, rtid);
self.z.declare_subscriber(&selector).res().await
}
pub async fn add_submitted_job(&self, rtid: &ZenohId, job: &Job) -> Result<()> {
let path = JQ_SUMBITTED_JOB!(ROOT_STANDALONE, rtid, &job.id);
let encoded_info = serialize_data(job)?;
self.z.put(&path, encoded_info).res().await
}
pub async fn del_submitted_job(&self, rtid: &ZenohId, id: &Uuid) -> Result<()> {
let path = JQ_SUMBITTED_JOB!(ROOT_STANDALONE, rtid, id);
self.z.delete(&path).res().await
}
pub async fn add_started_job(&self, rtid: &ZenohId, job: &Job) -> Result<()> {
let path = JQ_STARTED_JOB!(ROOT_STANDALONE, rtid, &job.id);
let encoded_info = serialize_data(job)?;
self.z.put(&path, encoded_info).res().await
}
pub async fn add_done_job(&self, rtid: &ZenohId, job: &Job) -> Result<()> {
let path = JQ_DONE_JOB!(ROOT_STANDALONE, rtid, &job.id);
let encoded_info = serialize_data(job)?;
self.z.put(&path, encoded_info).res().await
}
pub async fn add_failed_job(&self, rtid: &ZenohId, job: &Job) -> Result<()> {
let path = JQ_FAILED_JOB!(ROOT_STANDALONE, rtid, &job.id);
let encoded_info = serialize_data(job)?;
self.z.put(&path, encoded_info).res().await
}
async fn get_from_zenoh<T>(&self, path: &str) -> Result<T>
where
T: DeserializeOwned,
{
let mut ds = self.z.get(path).res().await?;
let data = ds.into_iter().collect::<Vec<Reply>>();
match data.len() {
0 => Err(zferror!(ErrorKind::Empty).into()),
_ => {
let kv = &data[0];
match &kv.sample {
Ok(sample) => match &sample.value.encoding {
&Encoding::APP_OCTET_STREAM => {
let ni = deserialize_data::<T>(&sample.value.payload.contiguous())?;
Ok(ni)
}
_ => Err(zferror!(ErrorKind::DeserializationError).into()),
},
_ => Err(zferror!(ErrorKind::DeserializationError).into()),
}
}
}
}
async fn get_vec_from_zenoh<T>(&self, selector: &str) -> Result<Vec<T>>
where
T: DeserializeOwned,
{
let mut ds = self.z.get(selector).res().await?;
let mut zf_data: Vec<T> = Vec::new();
for kv in ds.into_iter() {
match &kv.sample {
Ok(sample) => match &sample.value.encoding {
&Encoding::APP_OCTET_STREAM => {
let ni = deserialize_data::<T>(&sample.value.payload.contiguous())?;
zf_data.push(ni);
}
_ => return Err(zferror!(ErrorKind::DeserializationError).into()),
},
_ => return Err(zferror!(ErrorKind::DeserializationError).into()),
}
}
Ok(zf_data)
}
}