use crate::proto::nodo as proto;
use eyre::{bail, eyre, Result, WrapErr};
use lz4_flex::{compress_prepend_size, decompress_size_prepended};
use nng::{Protocol, Socket};
use nodo::prelude::{
ParameterId, ParameterProperties, ParameterSet, ParameterValue, ParameterWithPropertiesSet,
};
use serde::{Deserialize, Serialize};
/// Request message of the configure protocol.
///
/// Converted to and from `proto::ConfigureRequest` for transport.
#[derive(Debug, Serialize, Deserialize)]
pub enum ConfigureRequest {
    /// Carries a set of parameter values; encoded as the proto `Update` kind.
    Configure(ParameterSet<String, String>),
    /// Payload-free request; encoded as the proto `List` kind.
    List,
}
/// Reply message of the configure protocol.
///
/// Converted to and from `proto::ConfigureReply` for transport.
#[derive(Debug, Serialize, Deserialize)]
pub enum ConfigureReply {
    /// The request could not be handled; carries a human-readable message.
    Failure(String),
    /// The request was handled; no payload.
    Success,
    /// Parameters together with their properties.
    List(ParameterWithPropertiesSet<String, String>),
}
/// Server side of the configure protocol, built on an nng REP socket.
pub struct ConfigureServer {
    /// Socket on which requests are received and replies are sent.
    socket: Socket,
    /// Pre-encoded generic failure reply, sent when a detailed error reply
    /// cannot be encoded.
    fallback_failure_reply: Vec<u8>,
}
impl ConfigureServer {
pub fn open(address: &str) -> Result<Self> {
let fallback_failure_reply = encode(&proto::ConfigureReply {
kind: Some(proto::configure_reply::Kind::Failure(
proto::ConfigureReplyFailure {
message: "unknown error".into(),
},
)),
})
.context("failed to create fallback failure reply")?;
log::info!("Opening Configure REP socket at '{}'..", address);
let socket = Socket::new(Protocol::Rep0)?;
socket.pipe_notify(move |_, ev| {
log::trace!("pipe_notify: {ev:?}");
})?;
socket.listen(address)?;
Ok(Self {
socket,
fallback_failure_reply,
})
}
pub fn handle_requests<F>(&self, on_request_f: F) -> Result<()>
where
F: FnMut(ConfigureRequest) -> ConfigureReply,
{
match self.socket.try_recv() {
Ok(request_buffer) => {
let reply_buffer = match Self::handle_request_impl(&request_buffer, on_request_f) {
Ok(buff) => buff,
Err(err) => {
log::error!("failed to handle configure request: {err:?}");
let err_reply_buff = encode(&proto::ConfigureReply {
kind: Some(proto::configure_reply::Kind::Failure(
proto::ConfigureReplyFailure {
message: format!("{err:?}"),
},
)),
})
.unwrap_or_else(|_| self.fallback_failure_reply.clone());
err_reply_buff
}
};
match self.socket.try_send(&reply_buffer) {
Err(err) => {
log::error!("failed to send configure reply: {err:?}");
}
Ok(()) => {}
}
Ok(())
}
Err(nng::Error::TryAgain) => Ok(()),
Err(err) => Err(err)?,
}
}
fn handle_request_impl<F>(request_buffer: &[u8], mut on_request_f: F) -> eyre::Result<Vec<u8>>
where
F: FnMut(ConfigureRequest) -> ConfigureReply,
{
let request_proto = decode(&request_buffer)?;
let request = request_from_proto(request_proto)?;
let reply = on_request_f(request);
let reply_proto = reply_to_proto(&reply);
let reply_buffer = encode(&reply_proto)?;
Ok(reply_buffer)
}
}
/// Client side of the configure protocol, built on an nng REQ socket.
pub struct ConfigureClient {
    /// Socket on which requests are sent and replies are received.
    socket: Socket,
}
impl ConfigureClient {
pub fn dial(address: &str) -> Result<Self> {
log::info!("Opening Configure REQ socket at '{}'..", address);
let socket = Socket::new(Protocol::Req0)?;
socket.pipe_notify(move |_, ev| {
log::trace!("pipe_notify: {ev:?}");
})?;
socket.dial_async(address)?;
Ok(Self { socket })
}
pub fn send_request(&self, request: &ConfigureRequest) -> Result<ConfigureReply> {
let request_proto = request_to_proto(request);
let request_buffer = encode(&request_proto)?;
self.socket
.try_send(&request_buffer)
.map_err(|(_, err)| err)?;
let reply_buffer = self.socket.recv()?;
let reply_proto = decode(&reply_buffer)?;
let reply = reply_from_proto(reply_proto)?;
Ok(reply)
}
}
/// Converts a [`ConfigureRequest`] into its protobuf representation.
///
/// `Configure` becomes the proto `Update` kind; `List` becomes the proto `List` kind.
fn request_to_proto(request: &ConfigureRequest) -> proto::ConfigureRequest {
    proto::ConfigureRequest {
        kind: Some(match request {
            ConfigureRequest::Configure(params) => {
                proto::configure_request::Kind::Update(proto::ConfigureRequestUpdate {
                    params: parameters_to_proto(params),
                })
            }
            ConfigureRequest::List => {
                proto::configure_request::Kind::List(proto::ConfigureRequestList {})
            }
        }),
    }
}
/// Converts a [`ConfigureReply`] into its protobuf representation.
fn reply_to_proto(reply: &ConfigureReply) -> proto::ConfigureReply {
    proto::ConfigureReply {
        kind: Some(match reply {
            ConfigureReply::Failure(message) => {
                proto::configure_reply::Kind::Failure(proto::ConfigureReplyFailure {
                    message: message.clone(),
                })
            }
            ConfigureReply::Success => {
                proto::configure_reply::Kind::Success(proto::ConfigureReplySuccess {})
            }
            ConfigureReply::List(params) => {
                proto::configure_reply::Kind::List(proto::ConfigureReplyList {
                    params: parameters_with_properties_to_proto(params),
                })
            }
        }),
    }
}
fn request_from_proto(request: proto::ConfigureRequest) -> eyre::Result<ConfigureRequest> {
match request.kind.ok_or_else(|| eyre::eyre!("missing kind"))? {
proto::configure_request::Kind::Update(update) => Ok(ConfigureRequest::Configure(
parameters_from_proto(update.params)?,
)),
proto::configure_request::Kind::List(_) => Ok(ConfigureRequest::List),
}
}
fn reply_from_proto(reply: proto::ConfigureReply) -> eyre::Result<ConfigureReply> {
match reply.kind.ok_or_else(|| eyre::eyre!("missing kind"))? {
proto::configure_reply::Kind::Failure(failure) => {
Ok(ConfigureReply::Failure(failure.message))
}
proto::configure_reply::Kind::Success(_) => Ok(ConfigureReply::Success),
proto::configure_reply::Kind::List(list) => Ok(ConfigureReply::List(
parameters_with_properties_from_proto(list.params)?,
)),
}
}
/// Converts a parameter set into a list of protobuf parameters.
///
/// Each entry is emitted with its node name, parameter name and a copy of its value.
fn parameters_to_proto(params: &ParameterSet<String, String>) -> Vec<proto::Parameter> {
    let mut entries = Vec::new();
    for (ParameterId(node, param), value) in params.iter() {
        entries.push(proto::Parameter {
            node: node.clone(),
            param: param.clone(),
            value: Some(parameter_value_to_proto(value.clone())),
        });
    }
    entries
}
/// Converts a set of parameters with properties into protobuf entries.
///
/// Of the properties only `is_mutable` is written to the proto entry; the
/// `dtype` is not transmitted.
fn parameters_with_properties_to_proto(
    params: &ParameterWithPropertiesSet<String, String>,
) -> Vec<proto::ParameterWithProperties> {
    let mut entries = Vec::new();
    for (ParameterId(node, param), (props, value)) in params.iter() {
        entries.push(proto::ParameterWithProperties {
            node: node.clone(),
            param: param.clone(),
            value: Some(parameter_value_to_proto(value.clone())),
            is_mutable: props.is_mutable,
        });
    }
    entries
}
fn parameters_from_proto(
params: Vec<proto::Parameter>,
) -> eyre::Result<ParameterSet<String, String>> {
let mut result = ParameterSet::new();
for entry in params {
let value = parameter_value_from_proto(
entry
.value
.ok_or_else(|| eyre!("ParameterValue must not be None"))?,
)?;
result.insert(ParameterId(entry.node, entry.param), value);
}
Ok(result)
}
fn parameters_with_properties_from_proto(
params: Vec<proto::ParameterWithProperties>,
) -> eyre::Result<ParameterWithPropertiesSet<String, String>> {
let mut result = ParameterWithPropertiesSet::new();
for entry in params {
let value = parameter_value_from_proto(
entry
.value
.ok_or_else(|| eyre!("ParameterValue must not be None"))?,
)?;
result.insert(
ParameterId(entry.node, entry.param),
(
ParameterProperties {
dtype: value.dtype(),
is_mutable: entry.is_mutable,
},
value,
),
);
}
Ok(result)
}
/// Converts a [`ParameterValue`] into its protobuf representation.
///
/// `Usize` values are carried as `u64` on the wire.
fn parameter_value_to_proto(value: ParameterValue) -> proto::ParameterValue {
    let kind = match value {
        ParameterValue::Bool(v) => proto::parameter_value::Kind::Bool(v),
        ParameterValue::Int64(v) => proto::parameter_value::Kind::Int64(v),
        ParameterValue::Usize(v) => proto::parameter_value::Kind::Usize(v as u64),
        ParameterValue::Float64(v) => proto::parameter_value::Kind::Float64(v),
        ParameterValue::String(v) => proto::parameter_value::Kind::String(v),
        ParameterValue::VecFloat64(v) => {
            proto::parameter_value::Kind::VecFloat64(proto::VecFloat64 { entries: v })
        }
    };
    proto::ParameterValue { kind: Some(kind) }
}
fn parameter_value_from_proto(value: proto::ParameterValue) -> eyre::Result<ParameterValue> {
match value.kind {
Some(proto::parameter_value::Kind::Bool(v)) => Ok(ParameterValue::Bool(v)),
Some(proto::parameter_value::Kind::Int64(v)) => Ok(ParameterValue::Int64(v)),
Some(proto::parameter_value::Kind::Usize(v)) => Ok(ParameterValue::Usize(v as usize)),
Some(proto::parameter_value::Kind::Float64(v)) => Ok(ParameterValue::Float64(v)),
Some(proto::parameter_value::Kind::String(v)) => Ok(ParameterValue::String(v)),
Some(proto::parameter_value::Kind::VecFloat64(v)) => {
Ok(ParameterValue::VecFloat64(v.entries))
}
None => bail!("ParameterValue kind must not be None"),
}
}
/// Serializes `proto` with prost and compresses the bytes with LZ4.
///
/// The compressed output is produced by `compress_prepend_size`, the
/// counterpart of the `decompress_size_prepended` call used by `decode`.
///
/// # Errors
///
/// Fails if prost cannot encode the message.
fn encode<T>(proto: &T) -> Result<Vec<u8>>
where
    T: prost::Message,
{
    let mut raw = Vec::new();
    prost::Message::encode(proto, &mut raw)?;
    let compressed = compress_prepend_size(&raw);
    Ok(compressed)
}
/// Decompresses an LZ4 buffer and decodes it as a prost message of type `T`.
///
/// Expects the format produced by `encode`.
///
/// NOTE(review): `decompress_size_prepended` presumably trusts the size prefix
/// in `buffer`, which here comes from the network — confirm whether an upper
/// bound should be enforced.
///
/// # Errors
///
/// Fails if decompression fails or the bytes are not a valid `T`.
fn decode<T>(buffer: &[u8]) -> eyre::Result<T>
where
    T: prost::Message + Default,
{
    let raw = decompress_size_prepended(buffer)?;
    let message = T::decode(raw.as_slice())?;
    Ok(message)
}