/* ==========================================================================
File: fbp_wait_for_payload.rs
Description: This file provides the means for receiving an input message
and allowing code to wait until a payload has been received.
This can come in handy when sending a message to another
node and the result of that message is needed in order to
continue processing.
History: RustDev 08/23/2021 New functionality
Copyright © 2021 Pesa Switching Systems Inc. All rights reserved.
========================================================================== */
//! A FBP node that will allow for waiting on the reception of a message
//!
//! Sometimes, especially when a TCP message is sent to another node, the
//! sending node may need to get the results of that send in order to be
//! able to continue processing. The WaitForPayloadNode allows for this.
//!
//!
use async_trait::async_trait;
use std::ops::{Deref};
use crate::fbp_iidmessage::*;
use crate::fbp_node_context::*;
use crate::fbp_node_error::*;
use crate::fbp_node_trait::*;
use crate::fbp_threadsafe_wrapper::*;
use crate::fbp_asyncstate::*;
/// A node type that waits for a payload to be delivered to it.
///
/// This can be useful when wanting to get the output of a node
/// within that node itself. One makes this node a receiver of
/// the node whose output is desired and then has another node
/// call the get_payload async method, which will wait for the
/// payload to arrive.
///
/// # Example
///
/// Basic usage:
/// ```
/// use async_trait::async_trait;
/// use std::ops::{Deref};
/// use serde::{Deserialize, Serialize};
///
/// use fbp::fbp_iidmessage::*;
/// use fbp::fbp_node_context::*;
/// use fbp::fbp_node_trait::*;
/// use fbp::fbp_wait_for_payload::*;
/// use fbp::fbp_node_error::*;
///
/// #[derive(Clone, Serialize, Deserialize)]
/// pub struct PassthroughNode {
/// data: Box<FBPNodeContext>,
/// }
///
/// impl PassthroughNode {
/// #[allow(dead_code)]
/// pub fn new() -> Self {
/// let result = PassthroughNode {
/// data: Box::new(FBPNodeContext::new("PassthroughNode")),
/// };
///
/// result.node_data().set_node_is_configured(true);
/// result.clone().start();
/// result
/// }
/// }
///
/// #[async_trait]
/// impl FBPNodeTrait for PassthroughNode {
/// fn node_data_clone(&self) -> FBPNodeContext {
/// self.data.deref().clone()
/// }
///
/// fn node_data(&self) -> &FBPNodeContext {
/// &self.data
/// }
///
/// fn node_data_mut(&mut self) -> &mut FBPNodeContext {
/// &mut self.data
/// }
///
/// fn process_message(
/// &mut self,
/// msg: IIDMessage,
/// ) -> std::result::Result<IIDMessage, NodeError> {
///
/// Ok(msg.clone())
/// }
/// }
///
/// let mut pt_node = PassthroughNode::new();
/// let mut wait_node = WaitForPayloadNode::new();
///
/// pt_node.node_data_mut().add_receiver(wait_node.node_data_mut(), None);
///
/// let a_msg = IIDMessage::new(MessageType::Data, Some("This is a payload".to_string()));
/// pt_node.node_data().post_msg(a_msg);
/// let mut rt = tokio::runtime::Builder::new()
/// .enable_all()
/// .build()
/// .unwrap();
///
/// let mut payload: String = String::new();
///
/// rt.block_on(async {
/// payload = wait_node.get_payload().await;
/// });
///
/// ```
#[derive(Clone)]
pub struct WaitForPayloadNode {
// Node context handed out by the FBPNodeTrait accessors
// (`node_data`, `node_data_mut`, `node_data_clone`).
data: Box<FBPNodeContext>,
/// State that `get_payload` awaits and that `process_message` marks as
/// ready once a message carrying a payload has been received.
pub wait_for_payload: AsyncState,
/// The payload most recently stored by `process_message`; starts out as
/// `None` when the node is created with `new`.
pub payload: ThreadSafeOptionType<String>,
}
impl WaitForPayloadNode {
    /// Create a new WaitForPayloadNode.
    ///
    /// This node will wait on a payload that is sent to it as an
    /// IIDMessage. The node context is named "WaitForPayloadNode", is
    /// marked as configured, and a clone of the node is started before the
    /// node is returned. No payload is stored initially.
    pub fn new() -> Self {
        let result = WaitForPayloadNode {
            data: Box::new(FBPNodeContext::new("WaitForPayloadNode")),
            wait_for_payload: AsyncState::new(),
            payload: ThreadSafeOptionType::new(None),
        };
        result.node_data().set_node_is_configured(true);
        // `start` is invoked on a clone so that `result` can still be
        // returned to the caller afterwards.
        result.clone().start();
        result
    }

    /// Get the payload sent to this node.
    ///
    /// This is an async method. If a payload has already been stored it is
    /// returned immediately; otherwise the method waits on
    /// `wait_for_payload` and returns the payload of the IIDMessage once it
    /// has arrived. The stored payload is not cleared by this call.
    ///
    /// Returns an empty `String` if the wait completes but no payload is
    /// stored.
    // NOTE(review): `dead_code` allowance kept from the original; presumably
    // this method is only called from outside the crate — confirm.
    #[allow(dead_code)]
    pub async fn get_payload(&self) -> String {
        if let Some(payload) = self.current_payload() {
            return payload;
        }

        // Wait for data. The state is cloned because awaiting consumes it.
        self.wait_for_payload.clone().await;

        // Just in case the wait finished without a payload being stored,
        // fall back to an empty string instead of panicking.
        self.current_payload().unwrap_or_default()
    }

    /// Return a copy of the currently stored payload, if any.
    ///
    /// The check and the copy happen on a single `get_option` access, so a
    /// concurrent change to `payload` cannot slip in between an `is_some`
    /// test and a later `unwrap`.
    fn current_payload(&self) -> Option<String> {
        self.payload.get_option().as_ref().cloned()
    }
}
#[async_trait]
impl FBPNodeTrait for WaitForPayloadNode {
    /// Return an owned clone of this node's context.
    fn node_data_clone(&self) -> FBPNodeContext {
        self.data.deref().clone()
    }

    /// Borrow this node's context.
    fn node_data(&self) -> &FBPNodeContext {
        &self.data
    }

    /// Mutably borrow this node's context.
    fn node_data_mut(&mut self) -> &mut FBPNodeContext {
        &mut self.data
    }

    /// Handle an incoming IIDMessage.
    ///
    /// When the message carries a payload, a copy of it is stored in the
    /// `payload` field and `wait_for_payload` is marked as ready. Messages
    /// without a payload leave the node's state untouched.
    ///
    /// The received message is always returned unchanged in `Ok`.
    fn process_message(&mut self, msg: IIDMessage) -> Result<IIDMessage, NodeError> {
        if let Some(the_payload) = msg.payload().as_ref() {
            // Store the payload before signaling readiness.
            self.payload.set_option(Some(the_payload.clone()));
            self.wait_for_payload.set_is_ready(true);
        }
        // `msg` is owned here, so it can be handed back without cloning.
        Ok(msg)
    }
}