// mechutil 0.8.1
//
// Utility structures and functions for mechatronics applications.
// Documentation
//
// Copyright (C) 2024 Automated Design Corp.. All Rights Reserved.
// Created Date: 2024-10-08 21:00:36
// -----
// Last Modified: 2025-03-22 09:10:26
// -----
//
//

use async_trait::async_trait;
use mechutil::{
    async_bus::{
        async_ibus_node::{
            is_valid_opmode_transition, AsyncIBusNode, AsyncIBusNodeFactory, AsyncIBusOpMode, IBusMessage
        },
        async_node_registry::{self, AsyncNodeRegistryHandle},
    },
    subscription::filter::{DefaultSubscriptionFilter, SubscriptionFilter},
};

use anyhow::anyhow;
use simplelog::*;
use tokio::sync::oneshot;
use tokio::time::Duration;

/// Subscription filter that accepts a message only when its `data` payload
/// is exactly equal to a configured string.
#[derive(Clone, Debug, Default)]
pub struct MatchingFilter {
    /// The exact payload text a message must carry to pass the filter.
    match_value: String,
}

impl MatchingFilter {
    /// Builds a boxed filter that matches messages whose data equals
    /// `match_value`.
    ///
    /// The string is copied, so the caller keeps ownership of its argument.
    pub fn new(match_value: &str) -> Box<Self> {
        let match_value = match_value.to_owned();
        Box::new(Self { match_value })
    }
}

impl SubscriptionFilter<CommandMessage> for MatchingFilter {
    /// Returns `true` when the message's `data` field is exactly equal to the
    /// filter's configured match value. Both values are logged at debug level
    /// before the comparison.
    fn matches(&self, msg: &CommandMessage) -> bool {
        let candidate = &msg.data;
        let expected = &self.match_value;

        log::debug!(
            "MatchingFilter::matches: msg.data = {}, self.match_value = {}",
            candidate,
            expected
        );

        candidate == expected
    }
}

/// Message type used in this example for both requests and responses on the
/// bus (it is supplied for both type parameters of `AsyncIBusNode` and
/// `AsyncNodeRegistryHandle`).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
struct CommandMessage {
    /// Identifier of the node associated with the message (the replying node
    /// fills in its own id when answering a request).
    node_id: String,
    /// Topic the message is about; replies echo the topic of the request.
    topic: String,
    /// Free-form text payload; this is the field `MatchingFilter` compares.
    data: String,
}

/// Holds the id of a node that is the target of a subscription.
///
/// NOTE(review): this type is not referenced anywhere else in the visible
/// part of this file — confirm whether it is still needed.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
struct SubscriptionInformation {
    /// Identifier of the node being subscribed to.
    target_node_id: String,
}

/// Example bus node used to exercise the async node registry.
struct TestModule {
    /// Identifier of this node; used in log output and as the subscriber id
    /// when requesting subscriptions.
    node_id: String,
    /// Handle to the node registry, used to send subscription requests.
    registry_handle: AsyncNodeRegistryHandle<CommandMessage, CommandMessage>,
    /// JSON configuration. When it is an array of objects, each object key is
    /// passed to the registry's `subscribe` call during the Config transition.
    configuration: serde_json::Value,
    /// Operating mode reported by the read-opmode requests.
    opmode: AsyncIBusOpMode,
}

#[async_trait]
impl AsyncIBusNode<CommandMessage, CommandMessage> for TestModule {
    async fn request_change_opmode(
        &mut self,
        opmode: AsyncIBusOpMode,
        respond_to: oneshot::Sender<AsyncIBusOpMode>,
    ) {
        log::info!("Node {} request_change_opmode {:?}", self.node_id, opmode);

        if is_valid_opmode_transition(self.opmode, opmode) {
            match opmode {
                AsyncIBusOpMode::Init => {
                    log::info!("Node {} is now in Init mode", self.node_id);
                }
                AsyncIBusOpMode::Config => {
                    log::debug!(
                        "Node {} is Configuration: {}",
                        self.node_id,
                        self.configuration
                    );

                    // Iterate through self.configuration
                    if let Some(array) = self.configuration.as_array() {
                        for value in array {
                            if let Some(items) = value.as_object() {
                                for (k, v) in items {
                                    let topic = v.as_str().unwrap();

                                    log::debug!(
                                        "Node {} requesting subscription to topic: {}.{}",
                                        self.node_id,
                                        k,
                                        topic
                                    );

                                    if let Err(err) = self
                                        .registry_handle
                                        .subscribe(
                                            k,
                                            self.node_id.as_str(),
                                            Box::new(DefaultSubscriptionFilter),
                                        )
                                        .await
                                    {
                                        log::error!("Failed to send subscription request: {}", err);
                                    }
                                }
                            }
                        }
                    }

                    log::info!("Node {} is now in Config mode", self.node_id);
                }
                AsyncIBusOpMode::PreOp => {
                    log::info!("Node {} is now in PreOp mode", self.node_id);
                }
                AsyncIBusOpMode::Run => {
                    log::info!("Node {} is now in RUN mode", self.node_id);
                }
            }
        }

        let _ = respond_to.send(opmode);
    }


    /// A synchronous versio of request_change_opmode.
    fn request_change_opmode_sync(&mut self, opmode : AsyncIBusOpMode) -> Result<(), anyhow::Error> {
        log::info!("Node {} request_change_opmode {:?}", self.node_id, opmode);

        if is_valid_opmode_transition(self.opmode, opmode) {
            match opmode {
                AsyncIBusOpMode::Init => {
                    log::info!("Node {} is now in Init mode", self.node_id);
                }
                AsyncIBusOpMode::Config => {
                    log::debug!(
                        "Node {} is Configuration: {}",
                        self.node_id,
                        self.configuration
                    );

                    let node_id = self.node_id.clone();
                    let reg_handle = self.registry_handle.clone();

                    let arr;
                    if let Some(conf_arr) = self.configuration.as_array() {
                        arr = conf_arr.clone();
                    }
                    else {
                        arr = Vec::new();
                    }

                    let join_handle  =  tokio::spawn(async move {
                    // Iterate through self.configuration
                        for value in arr {
                            if let Some(items) = value.as_object() {
                                for (k, v) in items {
                                    let topic = v.as_str().unwrap();

                                    log::debug!(
                                        "Node {} requesting subscription to topic: {}.{}",
                                        node_id,
                                        k,
                                        topic
                                    );

                                    if let Err(err) =
                                        reg_handle
                                        .subscribe(
                                            k,
                                            node_id.as_str(),
                                            Box::new(DefaultSubscriptionFilter),
                                        )
                                        .await
                                    {
                                        log::error!("Failed to send subscription request: {}", err);
                                    }
                                }
                            }
                        }
                    

                    });


                    // Wait for the async task to complete.
                    match tokio::runtime::Runtime::new().unwrap().block_on(join_handle) {
                        Ok(_) => {
                            log::info!("Node {} is now in Config mode", self.node_id);
                        }
                        Err(err) => {
                            return Err(anyhow!("Task panicked: {}", err));
                        }
                    }
                                        
                    
                }
                AsyncIBusOpMode::PreOp => {
                    log::info!("Node {} is now in PreOp mode", self.node_id);
                }
                AsyncIBusOpMode::Run => {
                    log::info!("Node {} is now in RUN mode", self.node_id);
                }
            }
        }
        

        if self.opmode == opmode {
            return Ok(());
        }
        else {
            return Err(anyhow!("Failed to change opmode!"));
        }
    }

    async fn request_read_opmode(&self, respond_to: oneshot::Sender<AsyncIBusOpMode>) {
        log::info!("Node {} request_read_opmode", self.node_id);
        let _ = respond_to.send(self.opmode);
    }

    /// A synchronous version of request_read_opmode.
    fn request_read_opmode_sync(&self) -> AsyncIBusOpMode {
        return self.opmode;
    }


    fn node_id(&self) -> String {
        self.node_id.clone()
    }

    async fn request_received(
        &mut self,
        msg: CommandMessage,
        respond_to: oneshot::Sender<CommandMessage>,
    ) {
        log::info!("Node {} request_received {:?}", self.node_id, msg);
        let _ = respond_to.send(CommandMessage {
            node_id: self.node_id.clone(),
            topic: msg.topic,
            data: format!("{} request_received", self.node_id),
        });
    }

    async fn broadcast_received(&mut self, msg: CommandMessage) {
        log::info!("Node {} broadcast_received {:?}", self.node_id, msg);
    }
}

impl AsyncIBusNodeFactory<CommandMessage, CommandMessage, bool> for TestModule {
    /// Creates a boxed `TestModule` in `Init` mode.
    ///
    /// `node_id` and `registry_handle` are moved into the node; the
    /// configuration is copied because it is only borrowed. The context flag
    /// is unused by this example node.
    ///
    /// # Errors
    ///
    /// This implementation never fails; the `Result` is required by the
    /// factory trait.
    fn init_node(
        node_id: String,
        registry_handle: AsyncNodeRegistryHandle<CommandMessage, CommandMessage>,
        configuration: &serde_json::Value,
        _context: bool,
    ) -> Result<Box<Self>, anyhow::Error> {
        Ok(Box::new(Self {
            node_id,
            registry_handle,
            configuration: configuration.clone(),
            opmode: AsyncIBusOpMode::Init,
        }))
    }
}

#[tokio::main]
async fn main() {
    CombinedLogger::init(vec![TermLogger::new(
        LevelFilter::Debug,
        Config::default(),
        TerminalMode::Mixed,
        ColorChoice::Auto,
    )])
    .unwrap();

    let node_registry = AsyncNodeRegistryHandle::<CommandMessage, CommandMessage>::new();

    let test1 =
        TestModule::create("test1", node_registry.clone(), &serde_json::Value::Null, false).unwrap();

    const SPOON: &str = "There is no spoon.";

    let subs = serde_json::json!([{"test1" : "morpheus"}]);
    let test2 = TestModule::create("test2", node_registry.clone(), &subs, false).unwrap();

    if let Err(err) = test1.request_change_opmode(AsyncIBusOpMode::Config).await {
        log::error!("Failed to change opmode: {}", err);
    }

    if let Err(err) = test2.request_change_opmode(AsyncIBusOpMode::Config).await {
        log::error!("Failed to change opmode: {}", err);
    }

    if let Err(err) = test1.request_change_opmode(AsyncIBusOpMode::PreOp).await {
        log::error!("Failed to change opmode: {}", err);
    }

    if let Err(err) = test2.request_change_opmode(AsyncIBusOpMode::PreOp).await {
        log::error!("Failed to change opmode: {}", err);
    }

    if let Err(err) = test1.request_change_opmode(AsyncIBusOpMode::Run).await {
        log::error!("Failed to change opmode: {}", err);
    }

    if let Err(err) = test2.request_change_opmode(AsyncIBusOpMode::Run).await {
        log::error!("Failed to change opmode: {}", err);
    }

    // Test1 broadcasts a message to the topic "morpheus"
    let broadcast_msg = CommandMessage {
        node_id: String::from("test1"),
        topic: String::from("morpheus"),
        data: String::from("Wake up, Neo..."),
    };

    if let Err(err) = node_registry
        .send_message(
            "test1",
            IBusMessage::Broadcast {
                payload: broadcast_msg.clone(),
            },
        )
        .await
    {
        log::error!("MAIN: Failed to broadcast message: {}", err);
    }

    for _ in 0..3 {
        // Test1 broadcasts a message to the topic "morpheus"
        let broadcast_valid_msg = CommandMessage {
            node_id: String::from("test1"),
            topic: String::from("morpheus"),
            data: String::from(SPOON),
        };

        if let Err(err) = node_registry
            .send_message(
                "test1",
                IBusMessage::Broadcast {
                    payload: broadcast_valid_msg.clone(),
                },
            )
            .await
        {
            log::error!("dcast message: {}", err);
        }

        tokio::time::sleep(Duration::from_secs(1)).await;
    }



    if let Err(err) = node_registry.broadcast(async_node_registry::BROADCAST_ID, CommandMessage { 
        node_id: "BROADCAST".to_string(), 
        topic: "HELLO".to_string(), 
        data: "I'm The Doctor.".to_string()
    }).await {
        log::error!("broadcast all message failed: {}", err);
    }

    println!("*** FINISHING ***");

    tokio::time::sleep(Duration::from_secs(1)).await;

    println!("*** GOODBYE ***");
}