#![allow(clippy::manual_async_fn)]
use std::collections::HashMap;
use std::convert::TryFrom;
use crate::model::descriptor::{
FlattenDataFlowDescriptor, OperatorDescriptor, SinkDescriptor, SourceDescriptor,
};
use crate::model::record::DataFlowRecord;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use uuid::Uuid;
use zenoh::prelude::ZenohId;
use self::dataflow::loader::LoaderConfig;
use crate::runtime::dataflow::loader::Loader;
use crate::types::{ControlMessage, FlowId, RuntimeId};
use crate::zferror;
use crate::zfresult::ErrorKind;
use crate::{DaemonResult, Result as ZFResult};
use uhlc::{Timestamp, HLC};
use zenoh::Session;
use zrpc::zrpcresult::{ZRPCError, ZRPCResult};
use zrpc_macros::zservice;
pub mod dataflow;
pub mod resources;
pub mod worker_pool;
/// Handles and settings that describe one runtime.
///
/// `session`, `loader` and `hlc` are held behind `Arc`, so the derived `Clone`
/// shares the underlying objects instead of duplicating them.
#[derive(Clone)]
pub struct RuntimeContext {
/// Shared handle to the Zenoh `Session`.
pub session: Arc<Session>,
/// Shared handle to the `Loader`.
pub loader: Arc<Loader>,
/// Shared hybrid logical clock (`uhlc::HLC`).
pub hlc: Arc<HLC>,
/// Name of the runtime.
pub runtime_name: RuntimeId,
/// Identifier of the runtime, stored as a `ZenohId`.
pub runtime_uuid: ZenohId,
/// NOTE(review): presumably the size of a single shared-memory element; unit to be confirmed.
pub shared_memory_element_size: usize,
/// NOTE(review): presumably the number of shared-memory elements; to be confirmed.
pub shared_memory_elements: usize,
/// NOTE(review): presumably a back-off delay used with shared memory; unit to be confirmed.
pub shared_memory_backoff: u64,
/// NOTE(review): presumably enables the use of shared memory; confirm against the users of this flag.
pub use_shm: bool,
}
/// Groups the identifiers of one data flow instance with a [`RuntimeContext`].
#[derive(Clone)]
pub struct InstanceContext {
/// Identifier of the flow.
pub flow_id: FlowId,
/// Identifier of the instance.
pub instance_id: Uuid,
/// Runtime context carried along with the instance.
/// NOTE(review): presumably the runtime the instance runs on; confirm against callers.
pub runtime: RuntimeContext,
}
pub async fn map_to_infrastructure(
mut descriptor: FlattenDataFlowDescriptor,
runtime: &str,
) -> ZFResult<FlattenDataFlowDescriptor> {
log::debug!("[Dataflow mapping] Begin mapping for: {}", descriptor.flow);
let runtime_id: Arc<str> = runtime.into();
let mut mapping = descriptor.mapping.clone().map_or(HashMap::new(), |m| m);
for o in &descriptor.operators {
mapping
.entry(o.id.clone())
.or_insert_with(|| runtime_id.clone());
}
for o in &descriptor.sources {
mapping
.entry(o.id.clone())
.or_insert_with(|| runtime_id.clone());
}
for o in &descriptor.sinks {
mapping
.entry(o.id.clone())
.or_insert_with(|| runtime_id.clone());
}
log::trace!(
"[Dataflow mapping] Mapping for: {} is {:?}",
descriptor.flow,
mapping
);
descriptor.mapping = Some(mapping);
Ok(descriptor)
}
/// Readiness of a runtime.
///
/// With `rename_all = "lowercase"`, serde writes the variants as `ready` and
/// `notready`.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "lowercase")]
pub enum RuntimeStatusKind {
/// The runtime is ready.
Ready,
/// The runtime is not ready.
NotReady,
}
/// Serializable description of a runtime: identifier, name, tags and status.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RuntimeInfo {
/// Identifier of the runtime, stored as a `ZenohId`.
pub id: ZenohId,
/// Name of the runtime.
pub name: Arc<str>,
/// Free-form tags attached to the runtime.
/// NOTE(review): how tags are interpreted is not visible here; confirm against their users.
pub tags: Vec<String>,
/// Readiness of the runtime.
pub status: RuntimeStatusKind,
}
/// Serializable set of counters attached to a runtime.
///
/// NOTE(review): each counter presumably holds the number of entities of the
/// named kind currently running on the runtime; confirm against the code that
/// fills this structure.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RuntimeStatus {
/// Identifier of the runtime, stored as a `ZenohId`.
pub id: ZenohId,
/// Counter for flows.
pub running_flows: usize,
/// Counter for operators.
pub running_operators: usize,
/// Counter for sources.
pub running_sources: usize,
/// Counter for sinks.
pub running_sinks: usize,
/// Counter for connectors.
pub running_connectors: usize,
}
/// Zenoh modes that can be expressed in the configuration.
///
/// With `rename_all = "lowercase"`, serde writes the variants as `peer` and
/// `client`. Conversions from and to Zenoh's `WhatAmI` are provided in this
/// file.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "lowercase")]
pub enum ZenohConfigKind {
/// Counterpart of `WhatAmI::Peer`.
Peer,
/// Counterpart of `WhatAmI::Client`.
Client,
}
impl std::fmt::Display for ZenohConfigKind {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
ZenohConfigKind::Peer => write!(f, "peer"),
ZenohConfigKind::Client => write!(f, "client"),
}
}
}
impl TryFrom<zenoh::config::whatami::WhatAmI> for ZenohConfigKind {
type Error = crate::zfresult::Error;
fn try_from(value: zenoh::config::whatami::WhatAmI) -> Result<Self, Self::Error> {
match value {
zenoh::config::whatami::WhatAmI::Client => Ok(Self::Client),
zenoh::config::whatami::WhatAmI::Peer => Ok(Self::Peer),
_ => Err(zferror!(ErrorKind::MissingConfiguration).into()),
}
}
}
#[allow(clippy::from_over_into)]
impl Into<zenoh::config::whatami::WhatAmI> for ZenohConfigKind {
fn into(self) -> zenoh::config::whatami::WhatAmI {
match self {
Self::Peer => zenoh::config::whatami::WhatAmI::Peer,
Self::Client => zenoh::config::whatami::WhatAmI::Client,
}
}
}
/// Serializable configuration of a runtime.
///
/// The field order is part of the serialized layout for order-sensitive
/// formats; keep it unchanged.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RuntimeConfig {
    /// NOTE(review): presumably the path of the file holding the process id; confirm against the daemon.
    pub pid_file: String,
    /// NOTE(review): presumably a filesystem path used by the runtime; its exact role is not visible here.
    pub path: String,
    /// Name of the runtime.
    pub name: String,
    /// Identifier of the runtime, stored as a `ZenohId`.
    pub uuid: ZenohId,
    /// Configuration handed to the loader.
    pub loader: LoaderConfig,
}
/// The operation a [`Job`] asks for, together with its arguments.
///
/// Each variant is built by the matching private `Job::new_*` constructor.
/// The `Uuid` carried by a variant is the identifier of the targeted instance.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum JobKind {
/// Create an instance: flow descriptor and instance identifier.
CreateInstance(FlattenDataFlowDescriptor, Uuid),
/// Delete the instance with the given identifier.
DeleteInstance(Uuid),
/// Instantiate a flow: flow descriptor and instance identifier.
Instantiate(FlattenDataFlowDescriptor, Uuid),
/// Tear down the instance with the given identifier.
Teardown(Uuid),
/// Start the instance with the given identifier.
StartInstance(Uuid),
/// Stop the instance with the given identifier.
StopInstance(Uuid),
/// Start one node: instance identifier and node identifier.
StartNode(Uuid, String),
/// Stop one node: instance identifier and node identifier.
StopNode(Uuid, String),
}
/// State of a [`Job`], stamped with the `Timestamp` at which it was entered.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum JobStatus {
/// Initial state set by every `Job::new_*` constructor.
Submitted(Timestamp),
/// Set by `Job::started`.
Started(Timestamp),
/// Set by `Job::done`.
Done(Timestamp),
/// Set by `Job::failed`; the `String` is the error description.
Failed(Timestamp, String),
}
/// A unit of work: what to do, its current status and who it is assigned to.
///
/// Fields are private; they are read through the `get_*` accessors and
/// updated through the state-transition methods of `Job`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Job {
/// Identifier of the job itself.
id: Uuid,
/// Requested operation and its arguments.
job: JobKind,
/// Current state of the job.
status: JobStatus,
/// Index of the assignee, `None` until `Job::assign` or `Job::started` is called.
/// NOTE(review): presumably the index of a worker; confirm against the worker pool.
assignee: Option<usize>,
}
impl Job {
    /// Shared constructor: a job of the given `kind`, in the `Submitted`
    /// state at `ts`, with no assignee.
    fn new_submitted(id: Uuid, kind: JobKind, ts: Timestamp) -> Self {
        Self {
            id,
            job: kind,
            status: JobStatus::Submitted(ts),
            assignee: None,
        }
    }

    /// Creates a submitted `JobKind::Instantiate` job.
    fn new_instantiate(
        dfd: FlattenDataFlowDescriptor,
        instance_id: Uuid,
        id: Uuid,
        ts: Timestamp,
    ) -> Self {
        Self::new_submitted(id, JobKind::Instantiate(dfd, instance_id), ts)
    }

    /// Creates a submitted `JobKind::CreateInstance` job.
    fn new_create(
        dfd: FlattenDataFlowDescriptor,
        instance_id: Uuid,
        id: Uuid,
        ts: Timestamp,
    ) -> Self {
        Self::new_submitted(id, JobKind::CreateInstance(dfd, instance_id), ts)
    }

    /// Creates a submitted `JobKind::Teardown` job for instance `fid`.
    fn new_teardown(fid: Uuid, id: Uuid, ts: Timestamp) -> Self {
        Self::new_submitted(id, JobKind::Teardown(fid), ts)
    }

    /// Creates a submitted `JobKind::DeleteInstance` job for instance `fid`.
    fn new_delete(fid: Uuid, id: Uuid, ts: Timestamp) -> Self {
        Self::new_submitted(id, JobKind::DeleteInstance(fid), ts)
    }

    /// Creates a submitted `JobKind::StartInstance` job for instance `fid`.
    fn new_start(fid: Uuid, id: Uuid, ts: Timestamp) -> Self {
        Self::new_submitted(id, JobKind::StartInstance(fid), ts)
    }

    /// Creates a submitted `JobKind::StopInstance` job for instance `fid`.
    fn new_stop(fid: Uuid, id: Uuid, ts: Timestamp) -> Self {
        Self::new_submitted(id, JobKind::StopInstance(fid), ts)
    }

    /// Creates a submitted `JobKind::StartNode` job for node `node_id` of
    /// instance `fid`.
    fn new_start_node(fid: Uuid, node_id: String, id: Uuid, ts: Timestamp) -> Self {
        Self::new_submitted(id, JobKind::StartNode(fid, node_id), ts)
    }

    /// Creates a submitted `JobKind::StopNode` job for node `node_id` of
    /// instance `fid`.
    fn new_stop_node(fid: Uuid, node_id: String, id: Uuid, ts: Timestamp) -> Self {
        Self::new_submitted(id, JobKind::StopNode(fid, node_id), ts)
    }

    /// Returns the identifier of the job.
    pub fn get_id(&self) -> &Uuid {
        &self.id
    }

    /// Returns the requested operation.
    pub fn get_kind(&self) -> &JobKind {
        &self.job
    }

    /// Returns the current status.
    pub fn get_status(&self) -> &JobStatus {
        &self.status
    }

    /// Returns the assignee, if any.
    ///
    /// The spelling of the method name is kept as is for compatibility with
    /// existing callers.
    pub fn get_assigne(&self) -> &Option<usize> {
        &self.assignee
    }

    /// Overwrites the status with `status`.
    pub fn set_status(&mut self, status: JobStatus) {
        self.status = status;
    }

    /// Records `assignee` as the assignee, replacing any previous one.
    pub fn assign(&mut self, assignee: usize) {
        self.assignee = Some(assignee);
    }

    /// Records `assignee` and moves the job to `JobStatus::Started(ts)`.
    pub fn started(&mut self, assignee: usize, ts: Timestamp) {
        self.assignee = Some(assignee);
        self.status = JobStatus::Started(ts);
    }

    /// Moves the job to `JobStatus::Done(ts)`.
    pub fn done(&mut self, ts: Timestamp) {
        self.status = JobStatus::Done(ts);
    }

    /// Moves the job to `JobStatus::Failed`, keeping `error_description`.
    pub fn failed(&mut self, ts: Timestamp, error_description: String) {
        self.status = JobStatus::Failed(ts, error_description);
    }
}
/// RPC interface of the daemon for managing data flow instances.
///
/// The trait is processed by the `zservice` macro, configured with a
/// `timeout_s` of 60, the prefix `zf/daemon` and its own `service_uuid`.
///
/// NOTE(review): the descriptions below are derived from the method names and
/// signatures only; confirm the exact semantics against the daemon
/// implementation.
#[zservice(
timeout_s = 60,
prefix = "zf/daemon",
service_uuid = "11111111111111111111111111111111"
)]
pub trait DaemonInterface {
/// Requests the creation of an instance of `flow`; on success returns a
/// `Uuid` (presumably the identifier of the new instance).
async fn create_instance(&self, flow: FlattenDataFlowDescriptor) -> DaemonResult<Uuid>;
/// Requests the deletion of the instance `instance_id`; on success returns
/// a `DataFlowRecord`.
async fn delete_instance(&self, instance_id: Uuid) -> DaemonResult<DataFlowRecord>;
/// Requests the instantiation of `flow`; on success returns a `Uuid`
/// (presumably the identifier of the new instance).
async fn instantiate(&self, flow: FlattenDataFlowDescriptor) -> DaemonResult<Uuid>;
/// Requests the teardown of the instance `instance_id`; on success returns
/// a `DataFlowRecord`.
async fn teardown(&self, instance_id: Uuid) -> DaemonResult<DataFlowRecord>;
/// Requests the start of the instance `instance_id`.
async fn start_instance(&self, instance_id: Uuid) -> DaemonResult<()>;
/// Requests the stop of the instance `instance_id`; on success returns a
/// `DataFlowRecord`.
async fn stop_instance(&self, instance_id: Uuid) -> DaemonResult<DataFlowRecord>;
/// Requests the start of the node `node` of the instance `instance_id`.
async fn start_node(&self, instance_id: Uuid, node: String) -> DaemonResult<()>;
/// Requests the stop of the node `node` of the instance `instance_id`.
async fn stop_node(&self, instance_id: Uuid, node: String) -> DaemonResult<()>;
}
/// Second RPC interface of the daemon, declared separately from
/// `DaemonInterface`.
///
/// The trait is processed by the `zservice` macro, configured with a
/// `timeout_s` of 600, the prefix `zf/daemon` and its own `service_uuid`.
///
/// NOTE(review): the name suggests these calls are meant for daemon-to-daemon
/// use; the descriptions below are derived from the method names and
/// signatures only. Confirm against the daemon implementation.
#[zservice(
timeout_s = 600,
prefix = "zf/daemon",
service_uuid = "22222222222222222222222222222222"
)]
pub trait DaemonInterfaceInternal {
/// Prepares the instance `instance_id`; on success returns a
/// `DataFlowRecord`.
async fn prepare(&self, instance_id: Uuid) -> DaemonResult<DataFlowRecord>;
/// Cleans the instance `instance_id`; on success returns a
/// `DataFlowRecord`.
async fn clean(&self, instance_id: Uuid) -> DaemonResult<DataFlowRecord>;
/// Starts the instance `instance_id`.
/// NOTE(review): presumably excludes the sources, which have their own call; confirm.
async fn start(&self, instance_id: Uuid) -> DaemonResult<()>;
/// Starts the sources of the instance `instance_id`.
async fn start_sources(&self, instance_id: Uuid) -> DaemonResult<()>;
/// Stops the instance `instance_id`.
/// NOTE(review): presumably excludes the sources, which have their own call; confirm.
async fn stop(&self, instance_id: Uuid) -> DaemonResult<()>;
/// Stops the sources of the instance `instance_id`.
async fn stop_sources(&self, instance_id: Uuid) -> DaemonResult<()>;
/// Delivers the control `message` about the instance `instance_id`,
/// naming the runtime `runtime`.
/// NOTE(review): whether `runtime` is the sender or the target is not visible here.
async fn notify_runtime(
&self,
instance_id: Uuid,
runtime: String,
message: ControlMessage,
) -> DaemonResult<()>;
/// Reports whether `operator` is compatible.
/// NOTE(review): presumably with the runtime answering the call; confirm.
async fn check_operator_compatibility(
&self,
operator: OperatorDescriptor,
) -> DaemonResult<bool>;
/// Reports whether `source` is compatible.
/// NOTE(review): presumably with the runtime answering the call; confirm.
async fn check_source_compatibility(&self, source: SourceDescriptor) -> DaemonResult<bool>;
/// Reports whether `sink` is compatible.
/// NOTE(review): presumably with the runtime answering the call; confirm.
async fn check_sink_compatibility(&self, sink: SinkDescriptor) -> DaemonResult<bool>;
}