pub mod action;
pub mod conditional_node;
pub mod default_node;
pub mod id_allocate;
pub mod loop_node;
pub mod router_node;
pub mod typed_action;
pub use action::{Action, EmptyAction};
pub use conditional_node::ConditionalNode;
pub use default_node::DefaultNode;
pub use loop_node::{LoopCondition, LoopNode};
pub use router_node::{Router, RouterNode};
use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use tokio::sync::Mutex;
use crate::{
DagrsResult,
connection::{in_channel::InChannels, out_channel::OutChannels},
utils::{env::EnvVar, output::Output},
};
use id_allocate::alloc_id;
#[async_trait]
pub trait Node: Send + Sync {
fn id(&self) -> NodeId;
fn name(&self) -> NodeName;
fn input_channels(&mut self) -> &mut InChannels;
fn output_channels(&mut self) -> &mut OutChannels;
async fn run(&mut self, env: Arc<EnvVar>) -> Output;
fn is_condition(&self) -> bool {
false
}
fn loop_structure(&self) -> Option<Vec<Arc<Mutex<dyn Node>>>> {
None
}
fn has_typed_input(&self) -> bool {
false
}
fn has_typed_output(&self) -> bool {
false
}
fn max_retries(&self) -> u32 {
0
}
fn retry_delay_ms(&self, _attempt: u32) -> u64 {
100
}
fn reset(&mut self) {}
fn restore_from_checkpoint(&mut self, _loop_count: usize) -> DagrsResult<()> {
Ok(())
}
}
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Ord, PartialOrd)]
pub struct NodeId(pub(crate) usize);
impl NodeId {
pub fn as_usize(&self) -> usize {
self.0
}
}
impl From<NodeId> for usize {
fn from(value: NodeId) -> Self {
value.0
}
}
pub type NodeName = String;
#[derive(Default)]
pub struct NodeTable(pub(crate) HashMap<NodeName, NodeId>);
pub const NODE_TABLE_STR: &str = "node_table";
impl NodeTable {
pub fn alloc_id_for(&mut self, name: &str) -> NodeId {
let id = alloc_id();
log::debug!("alloc id {:?} for {:?}", id, name);
if let Some(v) = self.0.insert(name.to_string(), id) {
log::warn!("Node {} is already allocated with id {:?}.", name, v);
};
id
}
pub fn get(&self, name: &str) -> Option<&NodeId> {
self.0.get(name)
}
pub fn new() -> Self {
Self::default()
}
}
impl EnvVar {
pub fn get_node_id(&self, node_name: &str) -> Option<&NodeId> {
self.get_ref::<NodeTable>(NODE_TABLE_STR)
.and_then(|node_table| node_table.get(node_name))
}
}