Module fbp::fbp_node_trait[][src]

Expand description

FBP Node Trait

This trait provides the functionality of an FBP node. Every struct that is to act as an FBP node must implement this trait. The good news is that most of the required behavior is already implemented by the trait itself.

The following is an example of a simple Flow-Based Programming (FBP) system using three FBP nodes:

Example

use serde::{Deserialize, Serialize};
use serde_json::*;
use std::io::{Error, ErrorKind, Read, Write};
use std::sync::{Arc, Mutex};
use std::fs::{File, OpenOptions};
use std::ops::{Deref, DerefMut};
use std::path::Path;
use std::result::Result;
use async_trait::async_trait;
use std::any::Any;

use crate::fbp::fbp_iidmessage::*;
use fbp::fbp_node_context::*;
use fbp::fbp_node_trait::*;
use fbp::fbp_node_error::*;
use fbp::fbp_threadsafe_wrapper::{ThreadSafeType, ThreadSafeOptionType};

/// An FBP node that simply passes incoming IIDMessages to any nodes that
/// have registered to receive the output of this node.
///
/// The node carries no state of its own beyond the node context.
#[derive(Clone, Serialize, Deserialize)]
pub struct PassthroughNode {
   /// Node context handed out by the `FBPNodeTrait` accessors
   /// (`node_data`, `node_data_mut`, `node_data_clone`).
   data: Box<FBPNodeContext>,
}

impl PassthroughNode {

   /// Creates a `PassthroughNode`, flags it as configured and starts it.
   ///
   /// A passthrough node has no settings of its own, so it is marked as
   /// configured immediately.  `start` is invoked on a clone of the new
   /// node; the node itself is handed back to the caller.
   // NOTE(review): presumably the clone shares its context with the returned
   // node, so that starting the clone starts "this" node — confirm against
   // FBPNodeContext's Clone implementation.
   pub fn new() -> Self {
       let node = Self {
           data: Box::new(FBPNodeContext::new("PassthroughNode")),
       };

       node.node_data().set_node_is_configured(true);
       node.clone().start();

       node
   }
}


#[async_trait]
impl FBPNodeTrait for PassthroughNode {

   /// Returns an owned clone of this node's context.
   fn node_data_clone(&self) -> FBPNodeContext {
       self.data.deref().clone()
   }

   /// Borrows this node's context.
   fn node_data(&self) -> &FBPNodeContext { &self.data }

   /// Mutably borrows this node's context.
   fn node_data_mut(&mut self) -> &mut FBPNodeContext { &mut self.data }

   /// Hands the incoming message back unchanged.
   ///
   /// The message is received by value, so it is returned directly instead
   /// of being cloned first.
   fn process_message(&mut self, msg: IIDMessage) -> std::result::Result<IIDMessage, NodeError> {
       Ok(msg)
   }
}

/// An FBP node that takes an incoming IIDMessage, appends data to the
/// payload of the message and then sends it on.
#[derive(Clone, Serialize, Deserialize)]
pub struct AppendNode {
   /// Node context handed out by the `FBPNodeTrait` accessors
   /// (`node_data`, `node_data_mut`, `node_data_clone`).
   data: Box<FBPNodeContext>,

   /// The string appended to message payloads.  Starts out as `None` and is
   /// filled in by `set_append_data`.  Excluded from (de)serialization.
   #[serde(skip)]
   append_data: ThreadSafeOptionType<String>,
}

impl AppendNode {
   /// Creates an `AppendNode` with no append string and starts it.
   ///
   /// The node is not flagged as configured here; that happens once
   /// `set_append_data` has supplied the string to append.  `start` is
   /// invoked on a clone of the new node.
   pub fn new() -> Self {
       let node = Self {
           data: Box::new(FBPNodeContext::new("AppendNode")),
           append_data: ThreadSafeOptionType::new(None),
       };

       node.clone().start();

       node
   }

   /// Stores `data` as the string to append and flags the node as configured.
   ///
   /// The append string is the only setting this node needs, so storing it
   /// completes the node's configuration.
   pub fn set_append_data(&mut self, data: String) {
       let suffix = Some(data);
       self.append_data.set_option(suffix);

       self.data.set_node_is_configured(true);
   }
}

#[async_trait]
impl FBPNodeTrait for AppendNode {

   fn node_data_clone(&self) -> FBPNodeContext {
       self.data.deref().clone()
   }

   fn node_data(&self) -> &FBPNodeContext { &self.data }

   fn node_data_mut(&mut self) -> &mut FBPNodeContext { &mut self.data }

   // Here is an example of a node needing additional data before it can start processing
   // incoming IIDMessages.  The AppendNode FBP Node needs to be configured with the
   // string that will be appended to incoming messages.  That is why the process_config
   // method is implemented.  It will parse the incoming Config message and will then call
   // the set_append_data method after the string has been extracted from the payload.
   fn process_config(&mut self, msg: IIDMessage) -> std::result::Result<IIDMessage, NodeError> {
       if msg.msg_type() == MessageType::Config {
           if msg.payload().is_some() {
               let payload = msg.payload().as_ref().unwrap();
               let config_message: ConfigMessage = serde_json::from_str(&payload)
                   .expect("Failed to deserialize the config message");

               match config_message.msg_type() {
                   ConfigMessageType::Field => {
                       if config_message.data().as_ref().is_some() {
                           let config_str = json!(config_message.data().as_ref().unwrap());
                           let key_str = "append_data";
                           if config_str.to_string().contains(key_str) {
                               let json_str = config_str.as_str().unwrap();

                               let convert_result = serde_json::from_str(json_str);
                               if convert_result.is_ok() {
                                   let json_value: Value = convert_result.unwrap();
                                   let the_value = &json_value[key_str];
                                   let append_str = String::from(the_value.as_str().unwrap());

                                   self.set_append_data(append_str);
                               }
                           }
                       }
                   }
                   ConfigMessageType::Connect => {
                       // Deal with a Connect
                       // This is not implemented for this example
                   }
                   ConfigMessageType::Disconnect => {
                       // Deai with a Disconnect
                       // This is not implemented for this example
                   }
               };
           } //  if msg.payload.is_some()
       } // if msg.msg_type == MessageType::Config

       // Configuration messages should almost never be propagated as they relate to a specific
       // FBP node.  Sending an Invalid message will stop message propagation.
       Ok(IIDMessage::new(MessageType::Invalid, None))
   }

   // Given that the AppendNode does some work, it needs to implement the process_message
   // method to do that work
   fn process_message(&mut self, msg: IIDMessage) -> Result<IIDMessage, NodeError> {
       let string_ref = self.append_data.get_option().as_ref().unwrap().clone();

       if msg.payload().is_some() {
           let mut payload = msg.payload().as_ref().unwrap().clone();
           if self.append_data.is_some() {
               payload.push_str(string_ref.as_str());
           }

           let new_msg = IIDMessage::new(MessageType::Data, Some(payload));
           return Ok(new_msg);
       } else {
           if self.append_data.is_some() {
               let new_msg = IIDMessage::new(MessageType::Data, Some(string_ref));
               return Ok(new_msg);
           }
       }

       Ok(msg.clone())
   }
}

/// An FBP node that appends the payload of each incoming IIDMessage to a
/// log file and then passes the message on.
#[derive(Clone, Serialize, Deserialize)]
pub struct LoggerNode {
   /// Node context handed out by the `FBPNodeTrait` accessors
   /// (`node_data`, `node_data_mut`, `node_data_clone`).
   data: Box<FBPNodeContext>,

   /// Path of the log file.  Starts out as `None` and is filled in by
   /// `set_log_file_path`.  Excluded from (de)serialization.
   #[serde(skip)]
   log_file_path: ThreadSafeOptionType<String>,
}

impl LoggerNode {
   pub fn new() -> Self {
       let mut result = LoggerNode {
           data: Box::new(FBPNodeContext::new("LoggerNode")),
           log_file_path: ThreadSafeOptionType::new(None),
       };

       result.clone().start();
       result
   }


   pub fn set_log_file_path(&mut self, log_file_path: String) {
       self.log_file_path.set_option( Some(log_file_path));

       // Ensure the File
       let string_ref = self.log_file_path.get_option().as_ref().unwrap().clone();
       let file_path = Path::new(string_ref.as_str());
       let file = File::create(file_path).expect("Unable to create file");
       drop(file);

       self.data.set_node_is_configured(true);
   }

   pub fn get_log_string(&self) -> Result<String, Error> {
       if self.log_file_path.is_none() {
           return Err(Error::new(ErrorKind::Other, "Cannot get log string until the node is setup"));
       }

       let mut contents = String::new();
       let string_ref = self.log_file_path.get_option().as_ref().unwrap().clone();

       let file_path = Path::new(string_ref.as_str());
       let mut file = OpenOptions::new().read(true)
           .open(file_path)
           .expect("Failed to open file {} for reading");

       file.read_to_string(&mut contents)
           .expect("Failed to write contents to string");

       Ok(contents)
   }

   pub fn log_string_to_file(&self, data: &String) -> Result<(), Error> {
       if self.log_file_path.is_none() {
           return Err(Error::new(ErrorKind::Other, "Cannot get log to file until the node is setup"));
       }

       let string_ref = self.log_file_path.get_option().as_ref().unwrap().clone();
       let file_path = Path::new(string_ref.as_str());

       let mut file = OpenOptions::new().append(true)
           .open(file_path)
           .expect("Failed to open file for append");

       let string_to_write = data.clone();
       let string_to_write = string_to_write.replace("\0", "");

       let _write_result = file.write(string_to_write.as_bytes());
       Ok(())
   }
}

#[async_trait]
impl FBPNodeTrait for LoggerNode {

   fn node_data_clone(&self) -> FBPNodeContext {
       self.data.deref().clone()
   }

   fn node_data(&self) -> &FBPNodeContext { &self.data }

   fn node_data_mut(&mut self) -> &mut FBPNodeContext { &mut self.data }

   // Implement the process_config to use the log file path
   fn process_config(&mut self, msg: IIDMessage) -> std::result::Result<IIDMessage, NodeError> {
       if msg.msg_type() == MessageType::Config {
           if msg.payload().is_some() {
               let payload = msg.payload().as_ref().unwrap();
               let config_message: ConfigMessage = serde_json::from_str(&payload)
                   .expect("Failed to deserialize the config message");

               match config_message.msg_type() {
                   ConfigMessageType::Field => {
                       if config_message.data().as_ref().is_some() {
                           let config_str = json!(config_message.data().as_ref().unwrap());
                           let key_str = "log_file_path";
                           if config_str.to_string().contains(key_str) {
                               let json_str = config_str.as_str().unwrap();
                               let convert_result = serde_json::from_str(json_str);
                               if convert_result.is_ok() {
                                   let json_value: Value = convert_result.unwrap();
                                   let the_value = &json_value[key_str];
                                   let log_file_path = String::from(the_value.as_str().unwrap());
                                   self.set_log_file_path(log_file_path);
                               }
                           }
                       }
                   }
                   ConfigMessageType::Connect => {
                       // Deal with a Connect
                       // This is not implemented for this example
                   }
                   ConfigMessageType::Disconnect => {
                       // Deai with a Disconnect
                       // This is not implemented for this example
                   }
               };
           } //  if msg.payload.is_some()
       } // if msg.msg_type == MessageType::Config

       Ok(IIDMessage::new(MessageType::Invalid, None))
   }

   // Implement the process_message to do the work of this node by writing the log to a file
   fn process_message(&mut self, msg: IIDMessage) -> Result<IIDMessage, NodeError> {
       if msg.payload().is_some() {
           if self.log_string_to_file(&msg.clone().payload().as_ref().unwrap()).is_err() {
               return Err(NodeError::new("Failed to write message to log file"));
           }
       }

       Ok(msg.clone())
   }
}

Traits