// nodo_runtime 0.18.5
//
// Runtime for NODO applications
// Documentation
use crate::proto::nodo as proto;
use eyre::{bail, eyre, Result, WrapErr};
use lz4_flex::{compress_prepend_size, decompress_size_prepended};
use nng::{Protocol, Socket};
use nodo::prelude::{
    ParameterId, ParameterProperties, ParameterSet, ParameterValue, ParameterWithPropertiesSet,
};
use serde::{Deserialize, Serialize};

/// Request passed to `ConfigureClient::send_request` and handed to the server's request
/// callback after decoding.
#[derive(Debug, Serialize, Deserialize)]
pub enum ConfigureRequest {
    /// Carries a set of parameter values. Encoded on the wire as the proto `Update` request
    /// kind. Presumably the values the client wants applied; how they are applied is up to the
    /// server's request callback.
    Configure(ParameterSet<String, String>),
    /// Payload-free request, encoded on the wire as the proto `List` request kind.
    List,
}

/// Reply produced by the server's request callback and returned to the client by
/// `ConfigureClient::send_request`.
#[derive(Debug, Serialize, Deserialize)]
pub enum ConfigureReply {
    /// The request failed; carries a human-readable message.
    Failure(String),
    /// The request succeeded; carries no payload.
    Success,
    /// Carries a set of parameters, each with its properties and value.
    List(ParameterWithPropertiesSet<String, String>),
}

/// The server is running in the nodo runtime and serves configure requests
pub struct ConfigureServer {
    /// REP socket (`Protocol::Rep0`) on which requests are received and replies are sent
    socket: Socket,

    /// Fallback reply buffer used when error reply generation fails. It is encoded once in
    /// `open` and holds a failure reply with the message "unknown error".
    fallback_failure_reply: Vec<u8>,
}

impl ConfigureServer {
    pub fn open(address: &str) -> Result<Self> {
        let fallback_failure_reply = encode(&proto::ConfigureReply {
            kind: Some(proto::configure_reply::Kind::Failure(
                proto::ConfigureReplyFailure {
                    message: "unknown error".into(),
                },
            )),
        })
        .context("failed to create fallback failure reply")?;

        log::info!("Opening Configure REP socket at '{}'..", address);

        let socket = Socket::new(Protocol::Rep0)?;

        socket.pipe_notify(move |_, ev| {
            log::trace!("pipe_notify: {ev:?}");
        })?;

        socket.listen(address)?;

        Ok(Self {
            socket,
            fallback_failure_reply,
        })
    }

    /// Handle a request from a client. We always need to send a reply even if errors happen. We
    /// also need to gracefully handle if clients suddenly die.
    pub fn handle_requests<F>(&self, on_request_f: F) -> Result<()>
    where
        F: FnMut(ConfigureRequest) -> ConfigureReply,
    {
        match self.socket.try_recv() {
            Ok(request_buffer) => {
                let reply_buffer = match Self::handle_request_impl(&request_buffer, on_request_f) {
                    Ok(buff) => buff,
                    Err(err) => {
                        log::error!("failed to handle configure request: {err:?}");

                        // Formulate an error message and if that fails (doh!) use the fallback
                        let err_reply_buff = encode(&proto::ConfigureReply {
                            kind: Some(proto::configure_reply::Kind::Failure(
                                proto::ConfigureReplyFailure {
                                    message: format!("{err:?}"),
                                },
                            )),
                        })
                        .unwrap_or_else(|_| self.fallback_failure_reply.clone());

                        err_reply_buff
                    }
                };

                match self.socket.try_send(&reply_buffer) {
                    Err(err) => {
                        log::error!("failed to send configure reply: {err:?}");
                    }
                    Ok(()) => {}
                }

                Ok(())
            }
            Err(nng::Error::TryAgain) => Ok(()),
            Err(err) => Err(err)?,
        }
    }

    /// Deserialize proto, convert to Rust type, generate reply, convert reply to proto and
    /// serialize.
    fn handle_request_impl<F>(request_buffer: &[u8], mut on_request_f: F) -> eyre::Result<Vec<u8>>
    where
        F: FnMut(ConfigureRequest) -> ConfigureReply,
    {
        let request_proto = decode(&request_buffer)?;

        let request = request_from_proto(request_proto)?;

        let reply = on_request_f(request);

        let reply_proto = reply_to_proto(&reply);

        let reply_buffer = encode(&reply_proto)?;

        Ok(reply_buffer)
    }
}

/// The client is running in the configure client which would like to configure the nodo app
pub struct ConfigureClient {
    /// REQ socket (`Protocol::Req0`) used to send requests and receive the matching replies
    socket: Socket,
}

impl ConfigureClient {
    /// Creates a REQ socket and starts dialing `address` via `dial_async`.
    ///
    /// # Errors
    ///
    /// Fails if creating the socket, registering the pipe notification callback or starting
    /// the dial fails.
    pub fn dial(address: &str) -> Result<Self> {
        log::info!("Opening Configure REQ socket at '{}'..", address);

        let socket = Socket::new(Protocol::Req0)?;
        socket.pipe_notify(move |_, ev| {
            log::trace!("pipe_notify: {ev:?}");
        })?;
        socket.dial_async(address)?;

        Ok(Self { socket })
    }

    /// Sends `request` to the server and waits for its reply.
    ///
    /// The request is handed to the socket with `try_send`; the reply is then awaited with
    /// `recv`.
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be encoded or sent, if receiving fails, or if the reply
    /// cannot be decoded or converted.
    pub fn send_request(&self, request: &ConfigureRequest) -> Result<ConfigureReply> {
        let outgoing = encode(&request_to_proto(request))?;

        // On failure the socket hands the unsent message back; only the error is of interest
        if let Err((_, err)) = self.socket.try_send(&outgoing) {
            return Err(err.into());
        }

        let incoming = self.socket.recv()?;

        reply_from_proto(decode(&incoming)?)
    }
}

/// Converts a request into its proto representation: `Configure` becomes the `Update` kind
/// carrying the converted parameters, `List` becomes the empty `List` kind.
fn request_to_proto(request: &ConfigureRequest) -> proto::ConfigureRequest {
    proto::ConfigureRequest {
        kind: Some(match request {
            ConfigureRequest::Configure(params) => {
                let update = proto::ConfigureRequestUpdate {
                    params: parameters_to_proto(params),
                };
                proto::configure_request::Kind::Update(update)
            }
            ConfigureRequest::List => {
                proto::configure_request::Kind::List(proto::ConfigureRequestList {})
            }
        }),
    }
}

/// Converts a reply into its proto representation. The `kind` field of the result is always
/// set.
fn reply_to_proto(reply: &ConfigureReply) -> proto::ConfigureReply {
    proto::ConfigureReply {
        kind: Some(match reply {
            ConfigureReply::Failure(message) => {
                let failure = proto::ConfigureReplyFailure {
                    message: message.clone(),
                };
                proto::configure_reply::Kind::Failure(failure)
            }
            ConfigureReply::Success => {
                proto::configure_reply::Kind::Success(proto::ConfigureReplySuccess {})
            }
            ConfigureReply::List(params) => {
                let list = proto::ConfigureReplyList {
                    params: parameters_with_properties_to_proto(params),
                };
                proto::configure_reply::Kind::List(list)
            }
        }),
    }
}

/// Converts a proto request into the Rust request type.
///
/// # Errors
///
/// Fails with "missing kind" if the proto has no `kind`, or if a parameter of an `Update`
/// request cannot be converted.
fn request_from_proto(request: proto::ConfigureRequest) -> eyre::Result<ConfigureRequest> {
    let kind = request.kind.ok_or_else(|| eyre::eyre!("missing kind"))?;

    let converted = match kind {
        proto::configure_request::Kind::Update(update) => {
            ConfigureRequest::Configure(parameters_from_proto(update.params)?)
        }
        proto::configure_request::Kind::List(_) => ConfigureRequest::List,
    };

    Ok(converted)
}

/// Converts a proto reply into the Rust reply type.
///
/// # Errors
///
/// Fails with "missing kind" if the proto has no `kind`, or if a parameter of a `List` reply
/// cannot be converted.
fn reply_from_proto(reply: proto::ConfigureReply) -> eyre::Result<ConfigureReply> {
    let kind = reply.kind.ok_or_else(|| eyre::eyre!("missing kind"))?;

    let converted = match kind {
        proto::configure_reply::Kind::Failure(failure) => ConfigureReply::Failure(failure.message),
        proto::configure_reply::Kind::Success(_) => ConfigureReply::Success,
        proto::configure_reply::Kind::List(list) => {
            ConfigureReply::List(parameters_with_properties_from_proto(list.params)?)
        }
    };

    Ok(converted)
}

/// Converts every entry of a parameter set into a proto `Parameter`. Node name, parameter
/// name and value are cloned; the proto `value` field is always set.
fn parameters_to_proto(params: &ParameterSet<String, String>) -> Vec<proto::Parameter> {
    params
        .iter()
        .map(|(ParameterId(node, param), value)| {
            let value = parameter_value_to_proto(value.clone());
            proto::Parameter {
                node: node.clone(),
                param: param.clone(),
                value: Some(value),
            }
        })
        .collect()
}

/// Converts every entry of a parameter-with-properties set into a proto
/// `ParameterWithProperties`. Only the `is_mutable` property is transferred alongside the
/// value.
fn parameters_with_properties_to_proto(
    params: &ParameterWithPropertiesSet<String, String>,
) -> Vec<proto::ParameterWithProperties> {
    params
        .iter()
        .map(|(ParameterId(node, param), (props, value))| {
            let value = parameter_value_to_proto(value.clone());
            proto::ParameterWithProperties {
                node: node.clone(),
                param: param.clone(),
                value: Some(value),
                is_mutable: props.is_mutable,
            }
        })
        .collect()
}

/// Builds a parameter set from proto parameters, keyed by `(node, param)`.
///
/// # Errors
///
/// Fails on the first entry whose `value` is missing or cannot be converted.
fn parameters_from_proto(
    params: Vec<proto::Parameter>,
) -> eyre::Result<ParameterSet<String, String>> {
    let mut result = ParameterSet::new();

    for proto::Parameter { node, param, value } in params {
        let value = value.ok_or_else(|| eyre!("ParameterValue must not be None"))?;
        let value = parameter_value_from_proto(value)?;
        result.insert(ParameterId(node, param), value);
    }

    Ok(result)
}

/// Builds a parameter-with-properties set from proto entries, keyed by `(node, param)`.
///
/// The `dtype` property is not read from the proto; it is taken from the converted value.
///
/// # Errors
///
/// Fails on the first entry whose `value` is missing or cannot be converted.
fn parameters_with_properties_from_proto(
    params: Vec<proto::ParameterWithProperties>,
) -> eyre::Result<ParameterWithPropertiesSet<String, String>> {
    let mut result = ParameterWithPropertiesSet::new();

    for entry in params {
        let proto::ParameterWithProperties {
            node,
            param,
            value,
            is_mutable,
        } = entry;

        let value = value.ok_or_else(|| eyre!("ParameterValue must not be None"))?;
        let value = parameter_value_from_proto(value)?;

        let properties = ParameterProperties {
            dtype: value.dtype(),
            is_mutable,
        };
        result.insert(ParameterId(node, param), (properties, value));
    }

    Ok(result)
}

/// Converts a parameter value into its proto representation. The `kind` field of the result
/// is always set; `Usize` values are widened to `u64` for the wire.
fn parameter_value_to_proto(value: ParameterValue) -> proto::ParameterValue {
    let kind = match value {
        ParameterValue::Bool(v) => proto::parameter_value::Kind::Bool(v),
        ParameterValue::Int64(v) => proto::parameter_value::Kind::Int64(v),
        ParameterValue::Usize(v) => proto::parameter_value::Kind::Usize(v as u64),
        ParameterValue::Float64(v) => proto::parameter_value::Kind::Float64(v),
        ParameterValue::String(v) => proto::parameter_value::Kind::String(v),
        ParameterValue::VecFloat64(v) => {
            proto::parameter_value::Kind::VecFloat64(proto::VecFloat64 { entries: v })
        }
    };

    proto::ParameterValue { kind: Some(kind) }
}

/// Converts a proto parameter value into the Rust parameter value type.
///
/// # Errors
///
/// Fails if `kind` is `None`, or if a `Usize` value received from the wire does not fit into
/// `usize` on this platform (the wire type is wider than `usize` on e.g. 32-bit targets).
fn parameter_value_from_proto(value: proto::ParameterValue) -> eyre::Result<ParameterValue> {
    // `TryFrom` is only part of the prelude from edition 2021 on; the local import keeps this
    // function independent of the crate's edition.
    use std::convert::TryFrom;

    match value.kind {
        Some(proto::parameter_value::Kind::Bool(v)) => Ok(ParameterValue::Bool(v)),
        Some(proto::parameter_value::Kind::Int64(v)) => Ok(ParameterValue::Int64(v)),
        // A plain `as usize` cast would silently truncate values that do not fit; reject them
        // instead so a peer never configures a different number than it sent.
        Some(proto::parameter_value::Kind::Usize(v)) => match usize::try_from(v) {
            Ok(v) => Ok(ParameterValue::Usize(v)),
            Err(_) => bail!(
                "ParameterValue usize {} does not fit into usize on this platform",
                v
            ),
        },
        Some(proto::parameter_value::Kind::Float64(v)) => Ok(ParameterValue::Float64(v)),
        Some(proto::parameter_value::Kind::String(v)) => Ok(ParameterValue::String(v)),
        Some(proto::parameter_value::Kind::VecFloat64(v)) => {
            Ok(ParameterValue::VecFloat64(v.entries))
        }
        None => bail!("ParameterValue kind must not be None"),
    }
}

/// Serializes a proto message and LZ4-compresses the bytes with the uncompressed size
/// prepended. This is the counterpart of `decode`.
///
/// # Errors
///
/// Fails if the message cannot be encoded into the buffer.
fn encode<T>(proto: &T) -> Result<Vec<u8>>
where
    T: prost::Message,
{
    let mut raw = Vec::new();
    proto.encode(&mut raw)?;

    let compressed = compress_prepend_size(&raw);
    Ok(compressed)
}

/// Decompresses a size-prepended LZ4 buffer and parses the result as proto message `T`.
/// This is the counterpart of `encode`.
///
/// NOTE(review): the uncompressed size is taken from the buffer itself; if buffers can come
/// from untrusted peers it may be worth bounding that size before decompressing. Confirm
/// against the lz4_flex documentation.
///
/// # Errors
///
/// Fails if decompression fails or if the decompressed bytes are not a valid `T`.
fn decode<T>(buffer: &[u8]) -> eyre::Result<T>
where
    T: prost::Message + Default,
{
    let raw = decompress_size_prepended(buffer)?;
    let message = <T as prost::Message>::decode(raw.as_slice())?;
    Ok(message)
}