use anyhow::anyhow;
use async_trait::async_trait;
use log::debug;
use std::marker::PhantomData;
use tokio::sync::{mpsc, oneshot};
use super::async_node_registry::AsyncNodeRegistryHandle;
/// Operating mode of an IBus node.
///
/// The explicit discriminants pin the `u8` value of every mode (see
/// `#[repr(u8)]`). Which mode changes are allowed is decided by
/// [`is_valid_opmode_transition`].
///
/// `Eq` and `Hash` are derived in addition to `PartialEq` so the mode can be
/// used as a map/set key and in contexts that require total equality.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum AsyncIBusOpMode {
    /// Mode with discriminant `0`; it can be requested from any other mode.
    Init = 0,
    /// Mode with discriminant `2`; reachable from `Init`.
    Config = 2,
    /// Mode with discriminant `16`; reachable from `Config`.
    PreOp = 16,
    /// Mode with discriminant `32`; reachable from `PreOp`.
    Run = 32,
}
/// Reports whether a node in `current_opmode` may switch to `requested_opmode`.
///
/// A transition is accepted when any of the following holds:
/// * the requested mode equals the current one (no-op),
/// * the requested mode is `Init` (always allowed),
/// * it is one forward step along `Init -> Config -> PreOp -> Run`.
///
/// Everything else is rejected.
pub fn is_valid_opmode_transition(
    current_opmode: AsyncIBusOpMode,
    requested_opmode: AsyncIBusOpMode,
) -> bool {
    // Staying put or dropping back to `Init` is permitted from every mode.
    if requested_opmode == current_opmode || requested_opmode == AsyncIBusOpMode::Init {
        return true;
    }

    // Otherwise only a single forward step is legal.
    matches!(
        (current_opmode, requested_opmode),
        (AsyncIBusOpMode::Init, AsyncIBusOpMode::Config)
            | (AsyncIBusOpMode::Config, AsyncIBusOpMode::PreOp)
            | (AsyncIBusOpMode::PreOp, AsyncIBusOpMode::Run)
    )
}
/// Message delivered to a node over its IBus channel.
///
/// `S` is the request payload type; `R` is the type used both for replies to
/// requests and for broadcast payloads.
pub enum IBusMessage<S, R> {
/// A request carrying `payload`; the reply of type `R` is sent back through
/// the one-shot `respond_to` channel.
Request {
payload: S,
respond_to: oneshot::Sender<R>,
},
/// A one-way message carrying `payload`; no reply channel is attached.
Broadcast { payload: R },
}
/// Out-of-band control message sent from an `AsyncIBusNodeHandle` to the task
/// running `run_ibus_node`. Every variant carries a one-shot channel for the
/// answer.
enum AsyncIBusNodeControlMessage {
/// Ask the node task to leave its message loop; `run_ibus_node` answers with
/// `true` once the loop has been left.
Shutdown { respond_to: oneshot::Sender<bool> },
/// Ask the node to switch to `opmode`; the request is forwarded to
/// `AsyncIBusNode::request_change_opmode`, which owns `respond_to`.
WriteOpMode {
opmode: AsyncIBusOpMode,
respond_to: oneshot::Sender<AsyncIBusOpMode>,
},
/// Ask the node for its operating mode; the request is forwarded to
/// `AsyncIBusNode::request_read_opmode`, which owns `respond_to`.
ReadOpMode {
respond_to: oneshot::Sender<AsyncIBusOpMode>,
},
}
/// Behaviour a node must provide to be driven by `run_ibus_node`.
///
/// `S` is the request payload type and `R` the response/broadcast payload
/// type. The async methods are invoked one at a time from the node's task; the
/// `_sync` variants are not called anywhere in this file.
#[async_trait]
pub trait AsyncIBusNode<S, R>: Send + Sync
where
S: Clone + Send + 'static,
R: Clone + Send + 'static,
{
/// Returns the identifier of this node; `AsyncIBusNodeHandle::new` caches it
/// and uses it as the routing key for registry messages.
fn node_id(&self) -> String;
/// Called when a `WriteOpMode` control message arrives. The implementation
/// receives `respond_to` and is responsible for answering on it.
// NOTE(review): presumably the answer is the mode in effect after the
// request was processed — confirm against the implementors.
async fn request_change_opmode(
&mut self,
opmode: AsyncIBusOpMode,
respond_to: oneshot::Sender<AsyncIBusOpMode>,
);
/// Synchronous counterpart of `request_change_opmode` that reports failure
/// through the returned `Result`.
// NOTE(review): expected to apply the same transition rules as the async
// variant — confirm against the implementors.
fn request_change_opmode_sync(&mut self, opmode : AsyncIBusOpMode) -> Result<(), anyhow::Error>;
/// Called when a `ReadOpMode` control message arrives. The implementation
/// receives `respond_to` and is responsible for answering on it.
async fn request_read_opmode(&self, respond_to: oneshot::Sender<AsyncIBusOpMode>);
/// Synchronous counterpart of `request_read_opmode`; returns the mode
/// directly.
fn request_read_opmode_sync(&self) -> AsyncIBusOpMode;
/// Called for every `IBusMessage::Request`; the implementation receives the
/// payload and the one-shot channel on which the reply is expected.
async fn request_received(&mut self, msg: S, respond_to: oneshot::Sender<R>);
/// Called for every `IBusMessage::Broadcast`; there is no reply channel.
async fn broadcast_received(&mut self, msg: R);
}
/// Message loop of a single node; runs until shutdown is requested or both
/// channels are closed.
///
/// * `node` – the node implementation that receives every message.
/// * `ibus_rx` – bus traffic (`Request` / `Broadcast`) addressed to the node.
/// * `control_rx` – control messages sent by the node's handle.
///
/// Messages are dispatched one at a time: the loop awaits the node's handler
/// before it polls the channels again.
///
/// A closed channel makes `recv()` resolve to `None` immediately. The
/// `Some(..)` patterns below disable such a branch for the current `select!`
/// call, so the loop keeps waiting on the channel that is still open instead
/// of spinning. When both channels are closed the `else` arm ends the loop, so
/// the task terminates once nobody can reach the node any more.
async fn run_ibus_node<S, R>(
    mut node: Box<dyn AsyncIBusNode<S, R> + Send>,
    mut ibus_rx: mpsc::Receiver<IBusMessage<S, R>>,
    mut control_rx: mpsc::Receiver<AsyncIBusNodeControlMessage>,
) where
    S: Clone + Send + Sync + 'static,
    R: Clone + Send + Sync + 'static,
{
    // Set only when the loop ends because of an explicit `Shutdown` request;
    // stays `None` when the loop ends because every sender went away.
    let mut fin: Option<oneshot::Sender<bool>> = None;
    loop {
        tokio::select! {
            Some(msg) = ibus_rx.recv() => {
                match msg {
                    IBusMessage::Request { payload, respond_to } => {
                        node.request_received(payload, respond_to).await;
                    }
                    IBusMessage::Broadcast { payload } => {
                        node.broadcast_received(payload).await;
                    }
                }
            },
            Some(ctrl) = control_rx.recv() => {
                match ctrl {
                    AsyncIBusNodeControlMessage::Shutdown { respond_to } => {
                        fin = Some(respond_to);
                        break;
                    }
                    AsyncIBusNodeControlMessage::WriteOpMode { opmode, respond_to } => {
                        node.request_change_opmode(opmode, respond_to).await;
                    }
                    AsyncIBusNodeControlMessage::ReadOpMode { respond_to } => {
                        node.request_read_opmode(respond_to).await;
                    }
                }
            },
            // Both channels are closed: no message can ever arrive again.
            else => break,
        }
    }
    debug!("AsyncIBusNode pipeline shutting down...");
    if let Some(sender) = fin {
        // The requester may have stopped waiting; a failed send is harmless.
        let _ = sender.send(true);
    }
}
/// Handle to a node whose message loop runs in a task spawned by
/// `AsyncIBusNodeHandle::new`.
///
/// Requests and broadcasts are routed to the node through the registry, keyed
/// by `node_id`; operating-mode and shutdown requests go straight to the
/// node's task over the control channel.
pub struct AsyncIBusNodeHandle<S, R>
where
S: Clone + Send + Sync + 'static,
R: Clone + Send + Sync + 'static,
{
// Sending side of the control channel consumed by `run_ibus_node`.
control_tx: mpsc::Sender<AsyncIBusNodeControlMessage>,
// Registry used by `request` / `broadcast` to deliver bus messages.
registry_handle: AsyncNodeRegistryHandle<S, R>,
// Identifier obtained from `AsyncIBusNode::node_id` at construction time.
node_id: String,
// Marker for the payload types; carries no data.
_marker: PhantomData<(S, R)>,
}
impl<S, R> AsyncIBusNodeHandle<S, R>
where
S: Clone + Send + Sync + 'static,
R: Clone + Send + Sync + 'static,
{
pub fn new(
node: Box<dyn AsyncIBusNode<S, R> + Send>,
ibus_rx: mpsc::Receiver<IBusMessage<S, R>>,
registry_handle: AsyncNodeRegistryHandle<S, R>,
) -> Self {
let (ctrl_tx, ctrl_rx) = mpsc::channel(8);
let ret = Self {
control_tx: ctrl_tx,
registry_handle,
node_id: node.node_id(),
_marker: PhantomData,
};
tokio::spawn(run_ibus_node(node, ibus_rx, ctrl_rx));
return ret;
}
pub fn node_id(&self) -> &str {
return self.node_id.as_str();
}
pub async fn request(&self, msg: S) -> Result<R, anyhow::Error> {
let (send, recv) = oneshot::channel();
let ibus_msg = IBusMessage::Request {
payload: msg,
respond_to: send,
};
match self
.registry_handle
.send_message(&self.node_id, ibus_msg)
.await
{
Ok(_) => match recv.await {
Ok(res) => {
return Ok(res);
}
Err(err) => {
return Err(anyhow!("Error receiving response to message: {}", err));
}
},
Err(err) => {
return Err(anyhow!(
"Failed to send message to node {} with err: {}",
self.node_id,
err
));
}
}
}
pub async fn broadcast(&self, payload: R) -> Result<(), anyhow::Error> {
let ibus_msg = IBusMessage::Broadcast { payload: payload };
match self
.registry_handle
.send_message(&self.node_id, ibus_msg)
.await
{
Ok(_) => {
return Ok(());
}
Err(err) => {
return Err(anyhow!(
"Failed to send message to node {} with err: {}",
self.node_id,
err
));
}
}
}
pub async fn request_change_opmode(
&self,
opmode: AsyncIBusOpMode,
) -> Result<AsyncIBusOpMode, anyhow::Error> {
let (send, recv) = oneshot::channel();
let ctrl_msg = AsyncIBusNodeControlMessage::WriteOpMode {
opmode: opmode,
respond_to: send,
};
match self.control_tx.send(ctrl_msg).await {
Ok(_) => match recv.await {
Ok(res) => {
return Ok(res);
}
Err(err) => {
return Err(anyhow!("Error receiving response to message: {}", err));
}
},
Err(err) => {
return Err(anyhow!(
"Failed to send message to node {} with err: {}",
self.node_id,
err
));
}
}
}
pub async fn request_read_opmode(&self) -> Result<AsyncIBusOpMode, anyhow::Error> {
let (send, recv) = oneshot::channel();
let ctrl_msg = AsyncIBusNodeControlMessage::ReadOpMode { respond_to: send };
match self.control_tx.send(ctrl_msg).await {
Ok(_) => match recv.await {
Ok(res) => {
return Ok(res);
}
Err(err) => {
return Err(anyhow!("Error receiving response to message: {}", err));
}
},
Err(err) => {
return Err(anyhow!(
"Failed to send message to node {} with err: {}",
self.node_id,
err
));
}
}
}
pub async fn shutdown(&self) -> Result<(), anyhow::Error> {
let (send, recv) = oneshot::channel();
let ctrl_msg = AsyncIBusNodeControlMessage::Shutdown { respond_to: send };
match self.control_tx.send(ctrl_msg).await {
Ok(_) => match recv.await {
Ok(_) => {
return Ok(());
}
Err(err) => {
return Err(anyhow!("Error receiving response to message: {}", err));
}
},
Err(err) => {
return Err(anyhow!(
"Failed to send message to node {} with err: {}",
self.node_id,
err
));
}
}
}
}
pub trait AsyncIBusNodeFactory<S, R, CTX> {
fn create(
node_id: &str,
registry_handle: AsyncNodeRegistryHandle<S, R>,
configuration: &serde_json::Value,
context : CTX
) -> Result<AsyncIBusNodeHandle<S, R>, anyhow::Error>
where
Self: Sized + AsyncIBusNode<S, R> + 'static,
S: Clone + Send + Sync + 'static,
R: Clone + Send + Sync + 'static,
{
let (ibus_tx, ibus_rx) = mpsc::channel(64);
let handle;
match Self::init_node(node_id.to_string(), registry_handle.clone(), configuration, context) {
Ok(ret) => {
handle = AsyncIBusNodeHandle::<S, R>::new(ret, ibus_rx, registry_handle.clone());
}
Err(err) => {
return Err(anyhow!("Error initializing node {} : {}", node_id, err));
}
}
let res = tokio::task::block_in_place(|| {
let runtime = tokio::runtime::Handle::current();
return runtime.block_on(registry_handle.register_node(node_id, ibus_tx));
});
match res {
Ok(_) => return Ok(handle),
Err(err) => {
return Err(anyhow!(
"Failed to register new node with registry: {}",
err
));
}
}
}
fn init_node(
node_id: String,
registry_handle: AsyncNodeRegistryHandle<S, R>,
configuration: &serde_json::Value,
context : CTX
) -> Result<Box<Self>, anyhow::Error>;
}