robomotion 0.1.3

Official Rust SDK for building Robomotion RPA packages
Documentation
//! gRPC server and client implementations.

use crate::message::Context;
use crate::runtime::{
    compress, create_node, get_node_handler, Result,
    RobomotionError, PACKAGE_CAPABILITIES,
};
use std::sync::atomic::{AtomicI32, Ordering};
use tokio::sync::oneshot;

/// Node count for tracking active nodes.
static NODE_COUNT: AtomicI32 = AtomicI32::new(0);

/// Done channel for shutdown signaling.
static DONE: once_cell::sync::Lazy<parking_lot::RwLock<Option<oneshot::Sender<()>>>> =
    once_cell::sync::Lazy::new(|| parking_lot::RwLock::new(None));

/// Plugin handshake configuration (go-plugin compatible).
pub struct HandshakeConfig {
    pub protocol_version: u32,
    pub magic_cookie_key: &'static str,
    pub magic_cookie_value: &'static str,
}

/// Default handshake configuration.
pub const HANDSHAKE: HandshakeConfig = HandshakeConfig {
    protocol_version: 1,
    magic_cookie_key: "robomotion_plugin",
    magic_cookie_value: "6e80b1a2cf26c5935ed7b6e5be77fe218d5f358d",
};

/// gRPC server implementation for the Node service.
pub struct GrpcServer {
    // broker: GrpcBroker,
}

impl GrpcServer {
    pub fn new() -> Self {
        Self {}
    }

    /// Handle Init request.
    pub async fn init(&self, event_server: u32) -> Result<()> {
        // Connect to the runtime helper service
        let addr = format!("http://127.0.0.1:{}", event_server);
        let client = crate::runtime::RuntimeClient::connect(addr).await?;
        crate::runtime::set_client(client);
        Ok(())
    }

    /// Handle OnCreate request.
    pub async fn on_create(&self, name: &str, config: &[u8]) -> Result<()> {
        // Create the node using the factory
        create_node(name, config).await?;

        // Get the node handler and call OnCreate
        let guid: serde_json::Value = serde_json::from_slice(config)?;
        let guid = guid["guid"].as_str().unwrap_or("");

        if let Some(handler_ref) = get_node_handler(guid) {
            let mut handler = handler_ref.handler.write().await;
            handler.on_create().await?;
        } else {
            return Err(RobomotionError::HandlerNotFound(guid.to_string()));
        }

        NODE_COUNT.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }

    /// Handle OnMessage request.
    pub async fn on_message(&self, guid: &str, in_message: &[u8]) -> Result<Vec<u8>> {
        // Decompress the message
        let data = compress::decompress(in_message)?;

        // Get the handler
        let handler_ref = get_node_handler(guid)
            .ok_or_else(|| RobomotionError::HandlerNotFound(guid.to_string()))?;

        // Create context
        let mut ctx = Context::new(&data);

        // Apply delays and process message
        let delay_before = handler_ref.node.delay_before;
        let delay_after = handler_ref.node.delay_after;
        let continue_on_error = handler_ref.node.continue_on_error;

        if delay_before > 0.0 {
            tokio::time::sleep(std::time::Duration::from_secs_f32(delay_before)).await;
        }

        let result = {
            let mut handler = handler_ref.handler.write().await;
            handler.on_message(&mut ctx).await
        };

        // Handle errors based on continue_on_error setting
        if let Err(e) = result {
            if !continue_on_error {
                return Err(e);
            }
        }

        if delay_after > 0.0 {
            tokio::time::sleep(std::time::Duration::from_secs_f32(delay_after)).await;
        }

        // Get output message
        if !ctx.is_empty() {
            let out_msg = ctx.get_raw()?;
            Ok(out_msg)
        } else {
            Ok(Vec::new())
        }
    }

    /// Handle OnClose request.
    pub async fn on_close(&self, guid: &str) -> Result<()> {
        if let Some(handler_ref) = get_node_handler(guid) {
            let mut handler = handler_ref.handler.write().await;
            handler.on_close().await?;
        }

        let count = NODE_COUNT.fetch_sub(1, Ordering::SeqCst) - 1;
        if count == 0 {
            // Signal shutdown
            if let Some(tx) = DONE.write().take() {
                let _ = tx.send(());
            }
        }

        Ok(())
    }

    /// Handle GetCapabilities request.
    pub fn get_capabilities(&self) -> u64 {
        PACKAGE_CAPABILITIES.bits()
    }
}

impl Default for GrpcServer {
    fn default() -> Self {
        Self::new()
    }
}

/// Set the done channel for shutdown signaling.
pub fn set_done_channel(tx: oneshot::Sender<()>) {
    *DONE.write() = Some(tx);
}