use std::sync::mpsc::{channel, RecvError};
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use uuid::Uuid;
use crate::fbp_asyncstate::*;
use crate::fbp_iidmessage::*;
use crate::fbp_threadsafe_wrapper::*;
/// Newtype around the sending half of a `std::sync::mpsc` channel that carries
/// `IIDMessage` values.
///
/// This file implements `Deref`, `Default` and `Clone` for the wrapper.
pub struct SenderWrapper(std::sync::mpsc::Sender<IIDMessage>);
impl Deref for SenderWrapper {
type Target = std::sync::mpsc::Sender<IIDMessage>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Default for SenderWrapper {
    /// Builds a placeholder sender from a brand-new channel.
    ///
    /// The receiving half of that channel is discarded immediately, so nothing can
    /// ever read what is sent through the returned wrapper.
    fn default() -> Self {
        // Keep only the sending half (`.0`); the rest of the tuple is dropped here.
        SenderWrapper(channel::<IIDMessage>().0)
    }
}
impl Clone for SenderWrapper {
fn clone(&self) -> Self {
let sender = self.deref().clone();
SenderWrapper(sender)
}
}
/// Sending endpoint of a node: a `SenderWrapper` held inside a `ThreadSafeType`.
///
/// The struct derives `Serialize`/`Deserialize`, but its only field is marked
/// `#[serde(skip)]`: the channel is never written out, and on deserialization
/// serde fills the field with its `Default` value.
#[derive(Clone, Serialize, Deserialize)]
pub struct FBPNodeSender {
// Channel sender; skipped by serde (see the struct documentation).
#[serde(skip)]
sender: ThreadSafeType<SenderWrapper>,
}
impl FBPNodeSender {
pub fn new(sender: SenderWrapper) -> Self {
FBPNodeSender {
sender: ThreadSafeType::new(sender),
}
}
pub fn send(&self, msg: IIDMessage) {
let send_result = self.sender.get_arc().lock().unwrap().deref().send(msg);
if send_result.is_err() {
}
}
}
impl Default for FBPNodeSender {
fn default() -> Self {
let (sender, _) = channel::<IIDMessage>();
FBPNodeSender::new(SenderWrapper(sender))
}
}
/// Newtype around the receiving half of a `std::sync::mpsc` channel that carries
/// `IIDMessage` values.
///
/// This file implements `Deref`, `Default` and `Clone` for the wrapper. Note that
/// the `Clone` implementation does not duplicate the receiver; it returns a fresh
/// default one.
pub struct ReceiverWrapper(std::sync::mpsc::Receiver<IIDMessage>);
impl Deref for ReceiverWrapper {
type Target = std::sync::mpsc::Receiver<IIDMessage>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Default for ReceiverWrapper {
    /// Builds a placeholder receiver from a brand-new channel.
    ///
    /// The sending half of that channel is discarded immediately, so no message can
    /// ever arrive on the returned wrapper.
    fn default() -> Self {
        // Keep only the receiving half (`.1`); the rest of the tuple is dropped here.
        ReceiverWrapper(channel::<IIDMessage>().1)
    }
}
impl Clone for ReceiverWrapper {
fn clone(&self) -> Self {
ReceiverWrapper::default()
}
}
/// Receiving endpoint of a node: a `ReceiverWrapper` held inside a `ThreadSafeType`.
///
/// The struct derives `Serialize`/`Deserialize`, but its only field is marked
/// `#[serde(skip)]`: the channel is never written out, and on deserialization
/// serde fills the field with its `Default` value.
#[derive(Serialize, Deserialize, Clone)]
pub struct FBPNodeReceiver {
// Channel receiver; skipped by serde (see the struct documentation).
#[serde(skip)]
pub receiver: ThreadSafeType<ReceiverWrapper>,
}
impl FBPNodeReceiver {
pub fn new(receiver: ReceiverWrapper) -> Self {
FBPNodeReceiver {
receiver: ThreadSafeType::new(receiver),
}
}
pub fn recv(&self) -> Result<IIDMessage, RecvError> {
self.receiver.get_arc().lock().unwrap().deref().recv()
}
}
impl Default for FBPNodeReceiver {
fn default() -> Self {
let (_, receiver) = channel::<IIDMessage>();
FBPNodeReceiver::new(ReceiverWrapper(receiver))
}
}
/// JSON (de)serialization helpers with default implementations based on `serde_json`.
pub trait NodeSerializer {
    /// Deserializes a `T` from `json_string`.
    ///
    /// # Panics
    ///
    /// Panics if `json_string` cannot be deserialized into `T`.
    fn make_self_from_string<'a, T>(json_string: &'a str) -> T
    where
        T: std::marker::Sized + serde::Deserialize<'a>,
    {
        let parsed: serde_json::Result<T> = serde_json::from_str(json_string);
        parsed.unwrap()
    }

    /// Serializes `self` to a JSON string.
    ///
    /// # Panics
    ///
    /// Panics if `serde_json` fails to serialize the value.
    fn serialize_node(&self) -> String
    where
        Self: std::marker::Sized + serde::Serialize,
    {
        let encoded = serde_json::to_string(self);
        encoded.unwrap()
    }
}
/// Per-node state: identity, the node's input channel, its output groups and its
/// lifecycle flags.
///
/// Only `name` takes part in serialization. Every other field is marked
/// `#[serde(skip)]`, so a deserialized context gets those fields from `Default`
/// and does not keep the uuid, channel, receivers or flags of the value that was
/// serialized.
/// NOTE(review): equality in this file compares `uuid` only, so contexts that
/// received a defaulted uuid through deserialization would compare equal to each
/// other — confirm whether that is intended.
#[derive(Clone, Serialize, Deserialize)]
pub struct FBPNodeContext {
// Human-readable node name; the only serialized field.
name: String,
// Identity of the node; assigned with `Uuid::new_v4()` in `new`.
#[serde(skip)]
uuid: Uuid,
// Sending half of the node's own input channel (created in `new`).
#[serde(skip)]
tx: Box<FBPNodeSender>,
// Receiving half of the node's own input channel (created in `new`).
#[serde(skip)]
rx: Box<FBPNodeReceiver>,
// Downstream receivers grouped by key; `add_receiver` files receivers under "Any"
// when no key is given.
#[serde(skip)]
pub output_vec: ThreadSafeType<HashMap<String, Vec<Box<FBPNodeContext>>>>,
// Flag read and written through `node_is_configured` / `set_node_is_configured`.
#[serde(skip)]
pub is_configured: AsyncState,
// Flag read and written through `node_is_running` / `set_node_is_running`.
#[serde(skip)]
pub is_running: AsyncState,
// Flag read and written through `node_has_completed` / `set_node_has_completed`.
#[serde(skip)]
pub node_completion: AsyncState,
// Suspension flag; starts as `false` in `new`.
#[serde(skip)]
node_suspended: Arc<AtomicBool>,
}
impl FBPNodeContext {
pub fn new(name: &str) -> Self {
let (sender, receiver) = channel::<IIDMessage>();
FBPNodeContext {
name: name.to_string(),
uuid: Uuid::new_v4(),
tx: Box::new(FBPNodeSender::new(SenderWrapper(sender))),
rx: Box::new(FBPNodeReceiver::new(ReceiverWrapper(receiver))),
output_vec: ThreadSafeType::new(HashMap::new()),
is_configured: AsyncState::new(),
is_running: AsyncState::new(),
node_completion: AsyncState::new(),
node_suspended: Arc::new(AtomicBool::new(false)),
}
}
pub fn name(&self) -> String {
self.name.clone()
}
pub fn uuid(&self) -> Uuid {
self.uuid
}
pub fn rx(&self) -> Box<FBPNodeReceiver> {
self.rx.clone()
}
pub fn tx(&self) -> Box<FBPNodeSender> {
self.tx.clone()
}
pub fn node_is_running(&self) -> bool {
self.is_running.is_ready()
}
pub fn set_node_is_running(&self, flag: bool) {
self.is_running.set_is_ready(flag);
}
pub async fn wait_for_node_to_be_running(&self) {
self.is_running.clone().await;
}
pub fn node_has_completed(&self) -> bool {
self.node_completion.is_ready()
}
pub fn set_node_has_completed(&self, flag: bool) {
self.node_completion.set_is_ready(flag);
}
pub async fn wait_for_node_to_complete(&self) {
self.node_completion.clone().await;
}
pub fn node_is_configured(&self) -> bool {
self.is_configured.is_ready()
}
pub fn set_node_is_configured(&self, flag: bool) {
self.is_configured.set_is_ready(flag);
}
pub async fn wait_for_node_to_be_configured(&self) {
self.is_configured.clone().await;
}
pub fn node_is_suspended(&self) -> bool {
self.node_suspended.deref().load(Ordering::Relaxed)
}
pub fn set_is_suspended(&self, flag: bool) {
self.node_suspended.store(flag, Ordering::Relaxed)
}
pub fn add_receiver(&mut self, receiver: &mut FBPNodeContext, key: Option<String>) {
let mut hash_key = "Any".to_string();
if key.is_some() {
hash_key = key.clone().unwrap();
}
if self.output_vec.get_type().is_empty() {
let mut vec_for_key: Vec<Box<FBPNodeContext>> = Vec::new();
vec_for_key.push(Box::new(receiver.clone()));
self.output_vec
.get_type()
.insert(hash_key.clone(), vec_for_key);
} else {
if self.output_vec.get_type().get_mut(&hash_key).is_some() {
self.output_vec
.get_type()
.get_mut(&hash_key)
.unwrap()
.push(Box::new(receiver.clone()));
} else {
let mut vec_for_key: Vec<Box<FBPNodeContext>> = Vec::new();
vec_for_key.push(Box::new(receiver.clone()));
self.output_vec
.get_type()
.insert(hash_key.clone(), vec_for_key);
}
}
}
pub fn remove_receiver(&mut self, receiver: &mut FBPNodeContext, key: Option<String>) {
let mut hash_key = "Any".to_string();
if key.is_some() {
hash_key = key.clone().unwrap();
}
if self.output_vec.get_type().get_mut(&hash_key).is_some() {
let index = self
.output_vec
.get_type()
.get_mut(&hash_key)
.unwrap()
.iter()
.position(|r| r.deref() == receiver)
.unwrap();
self.output_vec
.get_type()
.get_mut(&hash_key)
.unwrap()
.remove(index);
}
}
pub fn get_num_items_for_receiver_vec(&self, key: Option<String>) -> usize {
let mut hash_key = "Any".to_string();
if key.is_some() {
hash_key = key.clone().unwrap();
}
let mut result: usize = 0;
if self.output_vec.get_type().get(&hash_key).is_some() {
result = self.output_vec.get_type().get(&hash_key).unwrap().len();
}
result
}
pub fn post_msg(&self, msg: IIDMessage) {
if self.node_is_running() {
self.tx.send(msg.clone());
}
}
pub fn post_msg_to_group(&self, msg: IIDMessage, key: Option<String>) {
if key.is_none() {
return;
}
let hash_key = key.unwrap();
if self.output_vec.get_type().get(&hash_key).is_none() {
return;
}
for ctx in self.output_vec.get_type().get(&hash_key).unwrap().iter() {
ctx.post_msg(msg.clone());
}
}
}
/// Two contexts are equal when they carry the same UUID; no other field is compared.
impl PartialEq for FBPNodeContext {
    fn eq(&self, rhs: &Self) -> bool {
        let (left_id, right_id) = (&self.uuid, &rhs.uuid);
        left_id == right_id
    }
}
/// Integration-style test for `FBPNodeContext` output groups, built on two small
/// test nodes. Compiled only for test builds.
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use serde_json::json;
    use serde_json::value::Value;
    use std::fs::{File, OpenOptions};
    use std::io::{Error, ErrorKind, Read, Write};
    use std::ops::Deref;
    use std::path::Path;
    #[allow(unused_imports)]
    use std::{thread, time};

    use crate::fbp_node_error::*;
    use crate::fbp_node_trait::*;

    /// Output group under which the passthrough node looks for its logger.
    const LOGGER_GROUP: &str = "Logger_Group";

    /// Prefix the passthrough node puts in front of every payload it forwards to the
    /// logger group.
    const PASSTHROUGH_PREFIX: &str =
        "The PassthroughNode received a data message with this payload: ";

    /// Builds a per-process file path inside the system temp directory so the test
    /// does not leave files in the working directory.
    fn temp_log_path(file_name: &str) -> String {
        std::env::temp_dir()
            .join(format!("fbp_{}_{}", std::process::id(), file_name))
            .to_string_lossy()
            .into_owned()
    }

    /// Extracts the `log_file_path` string from a field-config value.
    ///
    /// `config` is expected to be a JSON string whose content is itself a JSON
    /// document. Returns `None` when `config` is not a string, when its content
    /// does not parse, or when `log_file_path` is absent or not a string.
    fn extract_log_file_path(config: &Value) -> Option<String> {
        let parsed: Value = serde_json::from_str(config.as_str()?).ok()?;
        parsed["log_file_path"].as_str().map(String::from)
    }

    /// Test node that appends the payload of every message it processes to a file.
    #[derive(Clone, Serialize, Deserialize)]
    pub struct LoggerNode {
        data: Box<FBPNodeContext>,
        // Path of the log file; unset until `set_log_file_path` is called.
        #[serde(skip)]
        log_file_path: ThreadSafeOptionType<String>,
    }

    impl LoggerNode {
        /// Creates the node and starts a clone of it.
        pub fn new() -> Self {
            let result = LoggerNode {
                data: Box::new(FBPNodeContext::new("LoggerNode")),
                log_file_path: ThreadSafeOptionType::new(None),
            };
            result.clone().start();
            result
        }

        /// Stores the log path, creates (or truncates) the file and marks the node
        /// as configured.
        ///
        /// # Panics
        ///
        /// Panics if the file cannot be created.
        pub fn set_log_file_path(&mut self, log_file_path: String) {
            self.log_file_path.set_option(Some(log_file_path.clone()));
            File::create(Path::new(&log_file_path)).expect("Unable to create file");
            self.data.set_node_is_configured(true);
        }

        /// Returns the configured log path, or an error carrying `not_ready_msg`
        /// when no path has been set yet.
        fn configured_path(&self, not_ready_msg: &'static str) -> Result<String, Error> {
            let path = self.log_file_path.get_option().as_ref().cloned();
            path.ok_or_else(|| Error::new(ErrorKind::Other, not_ready_msg))
        }

        /// Reads the whole log file into a string.
        ///
        /// # Errors
        ///
        /// Fails when no log path has been set, or when opening or reading the
        /// file fails.
        pub fn get_log_string(&self) -> Result<String, Error> {
            let path = self.configured_path("Cannot get log string until the node is setup")?;
            let mut contents = String::new();
            let mut file = OpenOptions::new().read(true).open(Path::new(&path))?;
            file.read_to_string(&mut contents)?;
            Ok(contents)
        }

        /// Appends `data` to the log file, with NUL characters stripped.
        ///
        /// # Errors
        ///
        /// Fails when no log path has been set, or when opening or writing the
        /// file fails.
        pub fn log_string_to_file(&self, data: &str) -> Result<(), Error> {
            let path = self.configured_path("Cannot get log to file until the node is setup")?;
            let mut file = OpenOptions::new().append(true).open(Path::new(&path))?;
            // `write_all` retries partial writes and reports failures to the caller.
            file.write_all(data.replace('\0', "").as_bytes())
        }
    }

    #[async_trait]
    impl FBPNodeTrait for LoggerNode {
        fn node_data_clone(&self) -> FBPNodeContext {
            self.data.deref().clone()
        }

        fn node_data(&self) -> &FBPNodeContext {
            &self.data
        }

        fn node_data_mut(&mut self) -> &mut FBPNodeContext {
            &mut self.data
        }

        /// Handles a config message.
        ///
        /// Only `Field` messages are acted on: when their data holds a
        /// `log_file_path` string, the node's log file is set to it. `Connect` and
        /// `Disconnect` messages are ignored. On success the returned message is
        /// always an `Invalid` message without payload.
        ///
        /// # Errors
        ///
        /// Returns a `NodeError` when the payload is not a valid `ConfigMessage`.
        fn process_config(
            &mut self,
            msg: IIDMessage,
        ) -> std::result::Result<IIDMessage, NodeError> {
            if msg.msg_type() == MessageType::Config {
                if let Some(payload) = msg.payload().as_ref() {
                    let config_message: ConfigMessage = serde_json::from_str(payload)
                        .map_err(|_| NodeError::new("Failed to deserialize the config message"))?;
                    match config_message.msg_type() {
                        ConfigMessageType::Field => {
                            if let Some(data) = config_message.data().as_ref() {
                                if let Some(log_file_path) = extract_log_file_path(&json!(data)) {
                                    self.set_log_file_path(log_file_path);
                                }
                            }
                        }
                        // Connection changes need no handling in this test node.
                        ConfigMessageType::Connect | ConfigMessageType::Disconnect => {}
                    };
                }
            }
            Ok(IIDMessage::new(MessageType::Invalid, None))
        }

        /// Appends the message payload (if any) to the log file and hands the
        /// message back unchanged.
        fn process_message(&mut self, msg: IIDMessage) -> Result<IIDMessage, NodeError> {
            if let Some(log_string) = msg.payload().as_ref() {
                if self.log_string_to_file(log_string).is_err() {
                    return Err(NodeError::new("Failed to write message to log file"));
                }
            }
            Ok(msg)
        }
    }

    /// Test node that returns every message unchanged and, when a logger is
    /// registered under `LOGGER_GROUP`, also posts a description of the message to
    /// that group.
    #[derive(Clone, Serialize, Deserialize)]
    pub struct PassthroughNode {
        data: Box<FBPNodeContext>,
    }

    impl PassthroughNode {
        /// Creates the node, marks it configured and starts a clone of it.
        pub fn new() -> Self {
            let result = PassthroughNode {
                data: Box::new(FBPNodeContext::new("PassthroughNode")),
            };
            result.node_data().set_node_is_configured(true);
            result.clone().start();
            result
        }
    }

    #[async_trait]
    impl FBPNodeTrait for PassthroughNode {
        fn node_data_clone(&self) -> FBPNodeContext {
            self.data.deref().clone()
        }

        fn node_data(&self) -> &FBPNodeContext {
            &self.data
        }

        fn node_data_mut(&mut self) -> &mut FBPNodeContext {
            &mut self.data
        }

        /// Returns `msg` unchanged; if the logger group has receivers and `msg` has
        /// a payload, a prefixed copy of the payload is posted to that group first.
        fn process_message(
            &mut self,
            msg: IIDMessage,
        ) -> std::result::Result<IIDMessage, NodeError> {
            let has_loggers = self
                .node_data()
                .get_num_items_for_receiver_vec(Some(LOGGER_GROUP.to_string()))
                > 0;
            if has_loggers {
                if let Some(orig_payload) = msg.payload().as_ref() {
                    let new_payload = format!("{}{}", PASSTHROUGH_PREFIX, orig_payload);
                    let logger_msg = IIDMessage::new(MessageType::Data, Some(new_payload));
                    self.node_data()
                        .post_msg_to_group(logger_msg, Some(LOGGER_GROUP.to_string()));
                }
            }
            Ok(msg)
        }
    }

    /// A message posted to the passthrough node must reach both the logger in
    /// `LOGGER_GROUP` (with the prefix) and the logger in the default group
    /// (unchanged).
    #[test]
    fn multiple_outputs() {
        let group_log_path = temp_log_path("PassthroughNode_Log.txt");
        let normal_log_path = temp_log_path("Normal_Log.txt");

        let mut lg_node = LoggerNode::new();
        lg_node.set_log_file_path(group_log_path.clone());
        let mut pt_node = PassthroughNode::new();
        pt_node
            .node_data_mut()
            .add_receiver(lg_node.node_data_mut(), Some(LOGGER_GROUP.to_string()));

        let mut lg_normal_node = LoggerNode::new();
        lg_normal_node.set_log_file_path(normal_log_path.clone());
        pt_node
            .node_data_mut()
            .add_receiver(lg_normal_node.node_data_mut(), None);

        let msg_str = "It was the best of times, it was the worst of times".to_string();
        let a_msg = IIDMessage::new(MessageType::Data, Some(msg_str.clone()));
        pt_node.node_data().post_msg(a_msg);

        // Give the node threads time to process and log the message.
        thread::sleep(time::Duration::from_secs(2));

        let log_str_result = lg_node.get_log_string();
        let normal_log_str_result = lg_normal_node.get_log_string();

        // Best-effort cleanup before asserting, so a failed assert leaves no files behind.
        let _ = std::fs::remove_file(&group_log_path);
        let _ = std::fs::remove_file(&normal_log_path);

        let log_string = log_str_result.expect("reading the group log should succeed");
        assert_eq!(log_string, format!("{}{}", PASSTHROUGH_PREFIX, msg_str));

        let normal_log_string =
            normal_log_str_result.expect("reading the normal log should succeed");
        assert_eq!(normal_log_string, msg_str);
    }
}