use crate::node::InputStream;
use futures::stream;
use std::any::Any;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
/// Tags an item with the input stream it arrived on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageType {
    /// The item came from the configuration stream.
    Config,
    /// The item came from the data stream.
    Data,
}
pub type TestSender = mpsc::Sender<Arc<dyn Any + Send + Sync>>;
pub type TestSenderVec = Vec<mpsc::Sender<Arc<dyn Any + Send + Sync>>>;
/// An asynchronous transformation over a type-erased value.
///
/// The supertrait bounds require every implementor to be `Send + Sync`.
#[async_trait::async_trait]
pub trait NodeFunction: Send + Sync {
/// Takes ownership of `value` and produces either a new type-erased value
/// or a `String` describing why the transformation failed.
async fn apply(
&self,
value: Arc<dyn Any + Send + Sync>,
) -> Result<Arc<dyn Any + Send + Sync>, String>;
}
pub type NodeConfig = Arc<dyn NodeFunction>;
type OutputReceiver = tokio::sync::mpsc::Receiver<Arc<dyn Any + Send + Sync>>;
pub fn process_configurable_node<C, F, Fut>(
config_stream: InputStream,
data_stream: InputStream,
config_state: Arc<Mutex<Option<Arc<C>>>>,
process_data: F,
) -> (OutputReceiver, OutputReceiver)
where
C: Send + Sync + Clone + 'static,
F: Fn(Arc<dyn Any + Send + Sync>, &Arc<C>) -> Fut + Send + Sync + Clone + 'static,
Fut: std::future::Future<Output = Result<Option<Arc<dyn Any + Send + Sync>>, String>> + Send,
{
let config_stream = config_stream.map(|item| (MessageType::Config, item));
let data_stream = data_stream.map(|item| (MessageType::Data, item));
let merged_stream = stream::select(config_stream, data_stream);
let (out_tx, out_rx) = tokio::sync::mpsc::channel(10);
let (error_tx, error_rx) = tokio::sync::mpsc::channel(10);
let config_state_clone = Arc::clone(&config_state);
let out_tx_clone = out_tx.clone();
let error_tx_clone = error_tx.clone();
let process_data_clone = process_data.clone();
tokio::spawn(async move {
let mut merged = merged_stream;
let mut current_config: Option<Arc<C>> = None;
while let Some((msg_type, item)) = merged.next().await {
match msg_type {
MessageType::Config => {
if let Ok(cfg) = item.downcast::<C>() {
current_config = Some(cfg.clone());
*config_state_clone.lock().await = Some(cfg);
} else {
let error_msg: String = format!(
"Invalid configuration type - expected {}",
std::any::type_name::<C>()
);
let error_arc: Arc<dyn Any + Send + Sync> = Arc::new(error_msg);
let _ = error_tx_clone.send(error_arc).await;
}
}
MessageType::Data => {
match ¤t_config {
Some(cfg) => {
match process_data_clone(item, cfg).await {
Ok(Some(output)) => {
let _ = out_tx_clone.send(output).await;
}
Ok(None) => {
}
Err(error_msg) => {
let error_arc: Arc<dyn Any + Send + Sync> = Arc::new(error_msg);
let _ = error_tx_clone.send(error_arc).await;
}
}
}
None => {
let error_msg: String =
"No configuration set. Please send configuration before data.".to_string();
let error_arc: Arc<dyn Any + Send + Sync> = Arc::new(error_msg);
let _ = error_tx_clone.send(error_arc).await;
}
}
}
}
}
});
(out_rx, error_rx)
}
/// Holds a node's name together with the names of its input and output ports.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BaseNode {
    /// Name of the node.
    pub name: String,
    /// Names of the node's input ports.
    pub input_port_names: Vec<String>,
    /// Names of the node's output ports.
    pub output_port_names: Vec<String>,
}

impl BaseNode {
    /// Creates a node from its name and its input/output port names.
    pub fn new(name: String, input_port_names: Vec<String>, output_port_names: Vec<String>) -> Self {
        Self {
            name,
            input_port_names,
            output_port_names,
        }
    }

    /// Returns the node's name.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Replaces the node's name with a copy of `name`.
    pub fn set_name(&mut self, name: &str) {
        self.name = name.to_string();
    }

    /// Returns the input port names in their stored order.
    pub fn input_port_names(&self) -> &[String] {
        &self.input_port_names
    }

    /// Returns the output port names in their stored order.
    pub fn output_port_names(&self) -> &[String] {
        &self.output_port_names
    }

    /// Returns `true` if an input port named exactly `name` exists.
    pub fn has_input_port(&self, name: &str) -> bool {
        // Compare borrowed strings directly; no temporary `String` is needed.
        self.input_port_names.iter().any(|port| port == name)
    }

    /// Returns `true` if an output port named exactly `name` exists.
    pub fn has_output_port(&self, name: &str) -> bool {
        // Compare borrowed strings directly; no temporary `String` is needed.
        self.output_port_names.iter().any(|port| port == name)
    }
}