mechutil 0.8.1

Utility structures and functions for mechatronics applications.
Documentation
//
// Copyright (C) 2025 Automated Design Corp.. All Rights Reserved.
// Created Date: 2025-03-15 12:41:19
// -----
// Last Modified: 2025-03-16 17:55:31
// -----
// 
//

//! # UdpClient Module
//!
//! This module provides the `UdpClient` struct, which is responsible for interacting with the `UdpBroker`
//! over a UDP connection. The client supports sending requests, subscribing to topics, writing values,
//! and listening for incoming messages from the broker.
//!
//! ## Key Features
//! - **Node Registration**: Clients can register themselves with the broker using a unique node ID.
//! - **Topic Subscription**: Clients can subscribe to specific topics to receive updates.
//! - **Value Writing**: Clients can write values to specific topics on the broker.
//! - **Message Listening**: Clients can listen for incoming messages from the broker.
//!
//! ## Usage
//! The `UdpClient` is designed to be used in asynchronous environments, leveraging `tokio` for
//! non-blocking I/O operations. It provides methods for sending requests, handling responses, and
//! managing subscriptions.
//!
//! ## Example
//! ```rust
//! let client = UdpClient::new("node1", "0.0.0.0:0", "127.0.0.1:5555").await?;
//! client.register().await?;
//! client.subscribe("example_topic", &serde_json::json!({}), &serde_json::json!({})).await?;
//! client.write("example_topic", &serde_json::json!("value"), &serde_json::json!({})).await?;
//! client.listen().await?;
//! ```
//!
//! ## Error Handling
//! All methods return `Result` types, allowing for robust error handling. Errors can occur due to
//! network issues, serialization failures, or invalid inputs.
//!
//! ## Dependencies
//! - `tokio`: For asynchronous networking.
//! - `serde_json`: For message serialization and deserialization.
//! - `log`: For logging messages and errors.
//! - `anyhow`: For flexible error handling.
//!
//! ## Notes
//! - The client uses UDP for communication, which is lightweight but does not guarantee message delivery.
//! - Ensure the broker is running and accessible at the specified address before using the client.
//!



use tokio::net::UdpSocket;
use std::net::SocketAddr;
use serde_json;
use log;
use anyhow::anyhow;

use super::types::{CoreLinkMessage, CoreLinkMessageType, CoreLinkSubscriptionPayload, CoreLinkTopicValuePayload};

/// Represents a client for interacting with the UdpBroker.
///
/// The `UdpClient` is responsible for sending requests to the broker, subscribing to topics,
/// writing values, and listening for incoming messages.
pub struct UdpClient {
    socket: UdpSocket,
    broker_addr: SocketAddr,
    last_transaction_id: u32,
    node_id: String,
}

impl UdpClient {
    /// Create a new UdpClient.
    ///
    /// ### Parameters
    /// - `node_id`: A unique identifier for the client node. Must be at least 2 characters long.
    /// - `bind_addr`: The address to bind the client socket to (e.g., "0.0.0.0:0").
    /// - `broker_addr`: The address of the UdpBroker (e.g., "127.0.0.1:5555").
    ///
    /// ### Returns
    /// A new `UdpClient` instance or an error if the socket binding or address parsing fails.
    pub async fn new(node_id: &str, bind_addr: &str, broker_addr: &str) -> Result<Self, anyhow::Error> {
        let socket = UdpSocket::bind(bind_addr).await?;
        let broker_addr: SocketAddr = broker_addr.parse()?;

        if node_id.len() < 2 {
            return Err(anyhow!("Invalid node id provided: {}", node_id));
        }

        Ok(Self {
            socket,
            broker_addr,
            last_transaction_id: 0,
            node_id: node_id.to_lowercase(),
        })
    }

    /// Returns the node ID of this client.
    ///
    /// ### Returns
    /// The node ID as a string slice.
    pub fn node_id(&self) -> &str {
        return self.node_id.as_str();
    }

    /// Increment the transaction ID and avoid 0 should roll-over occur.
    ///
    /// ### Returns
    /// The new transaction ID.
    fn increment_transaction_id(&mut self) -> u32 {
        self.last_transaction_id += 1;
        // rollover
        if self.last_transaction_id == 0 {
            self.last_transaction_id = 1;
        }

        return self.last_transaction_id;
    }

    /// Send a request to the broker.
    ///
    /// ### Parameters
    /// - `message`: The `CoreLinkMessage` to send.
    ///
    /// ### Returns
    /// The response from the broker as a `CoreLinkMessage`.
    pub async fn send_message(&self, message: &mut CoreLinkMessage) -> Result<CoreLinkMessage, anyhow::Error> {
        CoreLinkMessage::calculate_crc32(&message);

        let msg_str = serde_json::to_string(&message)?;
        self.socket.send_to(msg_str.as_bytes(), self.broker_addr).await?;

        let mut buf = [0; 1024];
        let (len, _) = self.socket.recv_from(&mut buf).await?;
        let response_str = String::from_utf8(buf[..len].to_vec())?;
        let response: CoreLinkMessage = serde_json::from_str(&response_str)?;

        Ok(response)
    }

    /// Register this node with the broker.
    ///
    /// ### Returns
    /// `Ok(())` if the registration is successful, or an error if the registration fails.
    pub async fn register(&mut self) -> Result<(), anyhow::Error> {
        let mut message = CoreLinkMessage {
            transaction_id: self.increment_transaction_id(),
            timecode: 0,
            node_id: self.node_id.clone(),
            message_type: CoreLinkMessageType::Register,
            crc: 0,
            data: serde_json::Value::Null,
        };

        match self.send_message(&mut message).await {
            Ok(res) => {
                println!("** {:?} ** ", res);

                if res.is_error() {
                    return Err(anyhow!("Failed to register node: {}", res.error_message()));
                } else {
                    return Ok(());
                }
            }
            Err(err) => {
                return Err(anyhow!("Failed to send register request: {}", err));
            }
        }
    }

    /// Subscribe to a topic.
    ///
    /// ### Parameters
    /// - `fqdn`: The fully-qualified domain name (FQDN) of the topic to subscribe to.
    /// - `options`: Additional options for the subscription (e.g., filters or settings).
    /// - `user_data`: Custom user data to include in the subscription request.
    ///
    /// ### Returns
    /// `Ok(())` if the subscription is successful, or an error if the subscription fails.
    pub async fn subscribe(
        &mut self,
        fqdn: &str,
        options: &serde_json::Value,
        user_data: &serde_json::Value,
    ) -> Result<(), anyhow::Error> {
        let mut payload = CoreLinkSubscriptionPayload::new(fqdn);
        payload.options = options.clone();
        payload.user_data = user_data.clone();

        match serde_json::to_value(&payload) {
            Ok(data) => {
                let mut message = CoreLinkMessage {
                    transaction_id: self.increment_transaction_id(),
                    timecode: 0,
                    node_id: self.node_id.clone(),
                    message_type: CoreLinkMessageType::Subscribe,
                    crc: 0,
                    data: data,
                };

                match self.send_message(&mut message).await {
                    Ok(res) => {
                        if let Some(err) = res.data.get("error_message") {
                            return Err(anyhow!("Failed to subscribe to value: {}", err.to_string()));
                        } else {
                            return Ok(());
                        }
                    }
                    Err(err) => {
                        return Err(anyhow!("Failed to send subscribe request: {}", err));
                    }
                }
            }
            Err(err) => {
                return Err(anyhow!("Failed to generate SUBSCRIBE message: {}", err));
            }
        }
    }

    /// Request writing a value to the broker.
    ///
    /// ### Parameters
    /// - `fqdn`: The fully-qualified domain name (FQDN) of the topic to write to.
    /// - `value`: The value to write to the topic.
    /// - `user_data`: Custom user data to include in the write request.
    ///
    /// ### Returns
    /// `Ok(())` if the write operation is successful, or an error if the write fails.
    pub async fn write(
        &mut self,
        fqdn: &str,
        value: &serde_json::Value,
        user_data: &serde_json::Value,
    ) -> Result<(), anyhow::Error> {
        let mut payload = CoreLinkTopicValuePayload::new(fqdn);
        payload.value = value.clone();
        payload.user_data = user_data.clone();

        match serde_json::to_value(&payload) {
            Ok(data) => {
                let mut message = CoreLinkMessage {
                    transaction_id: self.increment_transaction_id(),
                    timecode: 0,
                    node_id: self.node_id.clone(),
                    message_type: CoreLinkMessageType::Write,
                    crc: 0,
                    data: data,
                };

                match self.send_message(&mut message).await {
                    Ok(res) => {
                        if let Some(err) = res.data.get("error_message") {
                            return Err(anyhow!("Failed to write value: {}", err.to_string()));
                        } else {
                            return Ok(());
                        }
                    }
                    Err(err) => {
                        return Err(anyhow!("Failed to send write request: {}", err));
                    }
                }
            }
            Err(err) => {
                return Err(anyhow!("Failed to generate WRITE message: {}", err));
            }
        }
    }

    /// Listen for incoming messages from the broker.
    ///
    /// This function runs in a loop, listening for messages and handling them.
    ///
    /// ### Returns
    /// `Ok(())` if the listener runs successfully, or an error if the socket fails.
    pub async fn listen(&self) -> Result<(), anyhow::Error> {
        let mut buf = [0; 1024];
        loop {
            let (len, _) = self.socket.recv_from(&mut buf).await?;
            let msg_str = String::from_utf8(buf[..len].to_vec())?;
            let message: CoreLinkMessage = serde_json::from_str(&msg_str)?;

            // Handle the message (e.g., print it or process it).
            log::info!("Received message: {:?}", message);
        }
    }
}