use crate::message::Context;
use crate::runtime::{
compress, create_node, get_node_handler, Result,
RobomotionError, PACKAGE_CAPABILITIES,
};
use std::sync::atomic::{AtomicI32, Ordering};
use tokio::sync::oneshot;
/// Number of live nodes: incremented by `GrpcServer::on_create` once a node's
/// create hook has succeeded and decremented by `GrpcServer::on_close`.
static NODE_COUNT: AtomicI32 = AtomicI32::new(0);
/// One-shot sender installed by `set_done_channel`. `GrpcServer::on_close`
/// takes it out of the slot and sends `()` when `NODE_COUNT` reaches zero, so
/// it can fire at most once per installed sender.
static DONE: once_cell::sync::Lazy<parking_lot::RwLock<Option<oneshot::Sender<()>>>> =
once_cell::sync::Lazy::new(|| parking_lot::RwLock::new(None));
/// Static handshake parameters: a protocol version plus a key/value
/// "magic cookie" pair.
///
/// NOTE(review): presumably exchanged with the host process that launches the
/// plugin; the consumer is not visible in this file, so confirm there.
pub struct HandshakeConfig {
/// Version number of the plugin protocol.
pub protocol_version: u32,
/// Name (key) under which the magic cookie is looked up.
pub magic_cookie_key: &'static str,
/// Expected value of the magic cookie.
pub magic_cookie_value: &'static str,
}
/// Handshake parameters used by this crate: protocol version 1 and the
/// `robomotion_plugin` magic cookie.
pub const HANDSHAKE: HandshakeConfig = HandshakeConfig {
protocol_version: 1,
magic_cookie_key: "robomotion_plugin",
magic_cookie_value: "6e80b1a2cf26c5935ed7b6e5be77fe218d5f358d",
};
/// Entry point for node lifecycle calls (`init`, `on_create`, `on_message`,
/// `on_close`, `get_capabilities`).
///
/// The struct itself has no fields; the state it works with lives in the
/// module-level `NODE_COUNT` and `DONE` statics and in `crate::runtime`.
pub struct GrpcServer {
}
impl GrpcServer {
pub fn new() -> Self {
Self {}
}
pub async fn init(&self, event_server: u32) -> Result<()> {
let addr = format!("http://127.0.0.1:{}", event_server);
let client = crate::runtime::RuntimeClient::connect(addr).await?;
crate::runtime::set_client(client);
Ok(())
}
pub async fn on_create(&self, name: &str, config: &[u8]) -> Result<()> {
create_node(name, config).await?;
let guid: serde_json::Value = serde_json::from_slice(config)?;
let guid = guid["guid"].as_str().unwrap_or("");
if let Some(handler_ref) = get_node_handler(guid) {
let mut handler = handler_ref.handler.write().await;
handler.on_create().await?;
} else {
return Err(RobomotionError::HandlerNotFound(guid.to_string()));
}
NODE_COUNT.fetch_add(1, Ordering::SeqCst);
Ok(())
}
pub async fn on_message(&self, guid: &str, in_message: &[u8]) -> Result<Vec<u8>> {
let data = compress::decompress(in_message)?;
let handler_ref = get_node_handler(guid)
.ok_or_else(|| RobomotionError::HandlerNotFound(guid.to_string()))?;
let mut ctx = Context::new(&data);
let delay_before = handler_ref.node.delay_before;
let delay_after = handler_ref.node.delay_after;
let continue_on_error = handler_ref.node.continue_on_error;
if delay_before > 0.0 {
tokio::time::sleep(std::time::Duration::from_secs_f32(delay_before)).await;
}
let result = {
let mut handler = handler_ref.handler.write().await;
handler.on_message(&mut ctx).await
};
if let Err(e) = result {
if !continue_on_error {
return Err(e);
}
}
if delay_after > 0.0 {
tokio::time::sleep(std::time::Duration::from_secs_f32(delay_after)).await;
}
if !ctx.is_empty() {
let out_msg = ctx.get_raw()?;
Ok(out_msg)
} else {
Ok(Vec::new())
}
}
pub async fn on_close(&self, guid: &str) -> Result<()> {
if let Some(handler_ref) = get_node_handler(guid) {
let mut handler = handler_ref.handler.write().await;
handler.on_close().await?;
}
let count = NODE_COUNT.fetch_sub(1, Ordering::SeqCst) - 1;
if count == 0 {
if let Some(tx) = DONE.write().take() {
let _ = tx.send(());
}
}
Ok(())
}
pub fn get_capabilities(&self) -> u64 {
PACKAGE_CAPABILITIES.bits()
}
}
impl Default for GrpcServer {
fn default() -> Self {
Self::new()
}
}
/// Installs `tx` as the sender that `GrpcServer::on_close` fires once the
/// live-node counter drops to zero, replacing (and dropping) any sender that
/// was stored earlier.
pub fn set_done_channel(tx: oneshot::Sender<()>) {
    let mut slot = DONE.write();
    *slot = Some(tx);
}