use async_trait::async_trait;
use mechutil::{
async_bus::{
async_ibus_node::{
is_valid_opmode_transition, AsyncIBusNode, AsyncIBusNodeFactory, AsyncIBusOpMode, IBusMessage
},
async_node_registry::{self, AsyncNodeRegistryHandle},
},
subscription::filter::{DefaultSubscriptionFilter, SubscriptionFilter},
};
use anyhow::anyhow;
use simplelog::*;
use tokio::sync::oneshot;
use tokio::time::Duration;
/// Subscription filter that accepts a message only when its `data` field is
/// exactly equal to a stored string.
#[derive(Clone, Debug, Default)]
pub struct MatchingFilter {
    /// The string a message's `data` must equal for the filter to match.
    match_value: String,
}

impl MatchingFilter {
    /// Creates a heap-allocated filter that matches on `match_value`.
    ///
    /// The string is copied, so the caller keeps ownership of its argument.
    pub fn new(match_value: &str) -> Box<Self> {
        let match_value = match_value.to_owned();
        Box::new(Self { match_value })
    }
}
impl SubscriptionFilter<CommandMessage> for MatchingFilter {
    /// Returns `true` when the message's `data` equals the filter's stored
    /// match value. Both values are logged at debug level before comparing.
    fn matches(&self, msg: &CommandMessage) -> bool {
        let (actual, expected) = (&msg.data, &self.match_value);
        log::debug!(
            "MatchingFilter::matches: msg.data = {}, self.match_value = {}",
            actual,
            expected
        );
        actual == expected
    }
}
/// Message type used in this example for both the request and the response
/// side of the bus (`AsyncIBusNode<CommandMessage, CommandMessage>`).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
struct CommandMessage {
/// Identifier of a node associated with the message. Replies built by
/// `TestModule::request_received` carry the responding node's id here.
node_id: String,
/// Topic name; `TestModule::request_received` copies it from the request
/// into the reply.
topic: String,
/// Free-form text payload. This is the field `MatchingFilter` compares.
data: String,
}
/// Holds the id of a node that a subscription targets.
///
/// NOTE(review): nothing in the visible part of this file references this
/// type — confirm whether it is still needed.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
struct SubscriptionInformation {
/// Id of the target node (presumably the node being subscribed to — verify
/// against any caller).
target_node_id: String,
}
/// Example bus node used by this demo.
struct TestModule {
/// Id this node reports from `node_id()` and passes to the registry when
/// subscribing.
node_id: String,
/// Handle to the node registry, used to issue `subscribe` requests.
registry_handle: AsyncNodeRegistryHandle<CommandMessage, CommandMessage>,
/// JSON configuration. When entering `Config` mode it is read as an array of
/// objects; each key of each object is passed to `subscribe`, and the value
/// is read as a string (the topic) for logging.
configuration: serde_json::Value,
/// Operating mode currently stored for this node.
opmode: AsyncIBusOpMode,
}
#[async_trait]
impl AsyncIBusNode<CommandMessage, CommandMessage> for TestModule {
async fn request_change_opmode(
&mut self,
opmode: AsyncIBusOpMode,
respond_to: oneshot::Sender<AsyncIBusOpMode>,
) {
log::info!("Node {} request_change_opmode {:?}", self.node_id, opmode);
if is_valid_opmode_transition(self.opmode, opmode) {
match opmode {
AsyncIBusOpMode::Init => {
log::info!("Node {} is now in Init mode", self.node_id);
}
AsyncIBusOpMode::Config => {
log::debug!(
"Node {} is Configuration: {}",
self.node_id,
self.configuration
);
if let Some(array) = self.configuration.as_array() {
for value in array {
if let Some(items) = value.as_object() {
for (k, v) in items {
let topic = v.as_str().unwrap();
log::debug!(
"Node {} requesting subscription to topic: {}.{}",
self.node_id,
k,
topic
);
if let Err(err) = self
.registry_handle
.subscribe(
k,
self.node_id.as_str(),
Box::new(DefaultSubscriptionFilter),
)
.await
{
log::error!("Failed to send subscription request: {}", err);
}
}
}
}
}
log::info!("Node {} is now in Config mode", self.node_id);
}
AsyncIBusOpMode::PreOp => {
log::info!("Node {} is now in PreOp mode", self.node_id);
}
AsyncIBusOpMode::Run => {
log::info!("Node {} is now in RUN mode", self.node_id);
}
}
}
let _ = respond_to.send(opmode);
}
fn request_change_opmode_sync(&mut self, opmode : AsyncIBusOpMode) -> Result<(), anyhow::Error> {
log::info!("Node {} request_change_opmode {:?}", self.node_id, opmode);
if is_valid_opmode_transition(self.opmode, opmode) {
match opmode {
AsyncIBusOpMode::Init => {
log::info!("Node {} is now in Init mode", self.node_id);
}
AsyncIBusOpMode::Config => {
log::debug!(
"Node {} is Configuration: {}",
self.node_id,
self.configuration
);
let node_id = self.node_id.clone();
let reg_handle = self.registry_handle.clone();
let arr;
if let Some(conf_arr) = self.configuration.as_array() {
arr = conf_arr.clone();
}
else {
arr = Vec::new();
}
let join_handle = tokio::spawn(async move {
for value in arr {
if let Some(items) = value.as_object() {
for (k, v) in items {
let topic = v.as_str().unwrap();
log::debug!(
"Node {} requesting subscription to topic: {}.{}",
node_id,
k,
topic
);
if let Err(err) =
reg_handle
.subscribe(
k,
node_id.as_str(),
Box::new(DefaultSubscriptionFilter),
)
.await
{
log::error!("Failed to send subscription request: {}", err);
}
}
}
}
});
match tokio::runtime::Runtime::new().unwrap().block_on(join_handle) {
Ok(_) => {
log::info!("Node {} is now in Config mode", self.node_id);
}
Err(err) => {
return Err(anyhow!("Task panicked: {}", err));
}
}
}
AsyncIBusOpMode::PreOp => {
log::info!("Node {} is now in PreOp mode", self.node_id);
}
AsyncIBusOpMode::Run => {
log::info!("Node {} is now in RUN mode", self.node_id);
}
}
}
if self.opmode == opmode {
return Ok(());
}
else {
return Err(anyhow!("Failed to change opmode!"));
}
}
async fn request_read_opmode(&self, respond_to: oneshot::Sender<AsyncIBusOpMode>) {
log::info!("Node {} request_read_opmode", self.node_id);
let _ = respond_to.send(self.opmode);
}
fn request_read_opmode_sync(&self) -> AsyncIBusOpMode {
return self.opmode;
}
fn node_id(&self) -> String {
self.node_id.clone()
}
async fn request_received(
&mut self,
msg: CommandMessage,
respond_to: oneshot::Sender<CommandMessage>,
) {
log::info!("Node {} request_received {:?}", self.node_id, msg);
let _ = respond_to.send(CommandMessage {
node_id: self.node_id.clone(),
topic: msg.topic,
data: format!("{} request_received", self.node_id),
});
}
async fn broadcast_received(&mut self, msg: CommandMessage) {
log::info!("Node {} broadcast_received {:?}", self.node_id, msg);
}
}
impl AsyncIBusNodeFactory<CommandMessage, CommandMessage, bool> for TestModule {
    /// Builds a boxed `TestModule` that starts in `AsyncIBusOpMode::Init`.
    ///
    /// `node_id` and `registry_handle` are taken by value and moved straight
    /// into the node; only the borrowed `configuration` has to be cloned. The
    /// `_context` flag is unused. This implementation never returns `Err`.
    fn init_node(
        node_id: String,
        registry_handle: AsyncNodeRegistryHandle<CommandMessage, CommandMessage>,
        configuration: &serde_json::Value,
        _context: bool,
    ) -> Result<Box<Self>, anyhow::Error> {
        Ok(Box::new(Self {
            node_id,
            registry_handle,
            configuration: configuration.clone(),
            opmode: AsyncIBusOpMode::Init,
        }))
    }
}
#[tokio::main]
async fn main() {
CombinedLogger::init(vec![TermLogger::new(
LevelFilter::Debug,
Config::default(),
TerminalMode::Mixed,
ColorChoice::Auto,
)])
.unwrap();
let node_registry = AsyncNodeRegistryHandle::<CommandMessage, CommandMessage>::new();
let test1 =
TestModule::create("test1", node_registry.clone(), &serde_json::Value::Null, false).unwrap();
const SPOON: &str = "There is no spoon.";
let subs = serde_json::json!([{"test1" : "morpheus"}]);
let test2 = TestModule::create("test2", node_registry.clone(), &subs, false).unwrap();
if let Err(err) = test1.request_change_opmode(AsyncIBusOpMode::Config).await {
log::error!("Failed to change opmode: {}", err);
}
if let Err(err) = test2.request_change_opmode(AsyncIBusOpMode::Config).await {
log::error!("Failed to change opmode: {}", err);
}
if let Err(err) = test1.request_change_opmode(AsyncIBusOpMode::PreOp).await {
log::error!("Failed to change opmode: {}", err);
}
if let Err(err) = test2.request_change_opmode(AsyncIBusOpMode::PreOp).await {
log::error!("Failed to change opmode: {}", err);
}
if let Err(err) = test1.request_change_opmode(AsyncIBusOpMode::Run).await {
log::error!("Failed to change opmode: {}", err);
}
if let Err(err) = test2.request_change_opmode(AsyncIBusOpMode::Run).await {
log::error!("Failed to change opmode: {}", err);
}
let broadcast_msg = CommandMessage {
node_id: String::from("test1"),
topic: String::from("morpheus"),
data: String::from("Wake up, Neo..."),
};
if let Err(err) = node_registry
.send_message(
"test1",
IBusMessage::Broadcast {
payload: broadcast_msg.clone(),
},
)
.await
{
log::error!("MAIN: Failed to broadcast message: {}", err);
}
for _ in 0..3 {
let broadcast_valid_msg = CommandMessage {
node_id: String::from("test1"),
topic: String::from("morpheus"),
data: String::from(SPOON),
};
if let Err(err) = node_registry
.send_message(
"test1",
IBusMessage::Broadcast {
payload: broadcast_valid_msg.clone(),
},
)
.await
{
log::error!("dcast message: {}", err);
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
if let Err(err) = node_registry.broadcast(async_node_registry::BROADCAST_ID, CommandMessage {
node_id: "BROADCAST".to_string(),
topic: "HELLO".to_string(),
data: "I'm The Doctor.".to_string()
}).await {
log::error!("broadcast all message failed: {}", err);
}
println!("*** FINISHING ***");
tokio::time::sleep(Duration::from_secs(1)).await;
println!("*** GOODBYE ***");
}