use std::{sync::Arc, time::Duration};
use async_trait::async_trait;
use dagrs::{
DefaultNode, EnvVar, Graph, Node, NodeTable, Output,
connection::{in_channel::TypedInChannels, out_channel::TypedOutChannels},
node::typed_action::TypedAction,
};
use tokio::time::sleep;
/// Action state for a sending node: the text it emits when run.
#[derive(Default)]
pub struct SenderAction {
    /// Text passed to `broadcast` when the action runs.
    message: String,
}

impl SenderAction {
    /// Creates a sender that will emit `message`.
    ///
    /// Accepts anything convertible into a `String` (an owned `String`, a
    /// `&str`, ...), so callers that already pass a `String` keep working
    /// and callers with a literal no longer need `.to_string()`.
    pub fn new(message: impl Into<String>) -> Self {
        Self {
            message: message.into(),
        }
    }
}
/// Typed action for the sending node: no input payload, `String` output.
#[async_trait]
impl TypedAction for SenderAction {
    type I = ();
    type O = String;

    /// Broadcasts a copy of the stored message, then returns `Output::Out(None)`.
    ///
    /// The input channels and the environment are accepted but not used.
    async fn run(
        &self,
        _: TypedInChannels<Self::I>,
        out: TypedOutChannels<Self::O>,
        _: Arc<EnvVar>,
    ) -> Output {
        // `run` only borrows `self`, so the message is cloned before sending.
        let payload = self.message.clone();
        out.broadcast(payload).await;

        Output::Out(None)
    }
}
/// Action state for a sending node that pauses before emitting its text.
#[derive(Default)]
pub struct SlowSenderAction {
    /// Text passed to `broadcast` once the delay has elapsed.
    message: String,
}

impl SlowSenderAction {
    /// Creates a slow sender that will emit `message`.
    ///
    /// Accepts anything convertible into a `String` (an owned `String`, a
    /// `&str`, ...), so callers that already pass a `String` keep working
    /// and callers with a literal no longer need `.to_string()`.
    pub fn new(message: impl Into<String>) -> Self {
        Self {
            message: message.into(),
        }
    }
}
/// Typed action for the delayed sending node: no input payload, `String` output.
#[async_trait]
impl TypedAction for SlowSenderAction {
    type I = ();
    type O = String;

    /// Sleeps for 500 ms, broadcasts a copy of the stored message, then
    /// returns `Output::Out(None)`.
    ///
    /// The input channels and the environment are accepted but not used.
    async fn run(
        &self,
        _: TypedInChannels<Self::I>,
        out: TypedOutChannels<Self::O>,
        _: Arc<EnvVar>,
    ) -> Output {
        // Non-blocking pause via tokio's `sleep`, so this sender's message
        // goes out later than one sent without a delay.
        let delay = Duration::from_millis(500);
        sleep(delay).await;

        // `run` only borrows `self`, so the message is cloned before sending.
        let payload = self.message.clone();
        out.broadcast(payload).await;

        Output::Out(None)
    }
}
/// Stateless action for the receiving node; its `TypedAction` impl takes
/// `String` inputs and produces no output payload.
#[derive(Default)]
pub struct ReceiverAction;
/// Typed action for the receiving node: `String` input, no output payload.
#[async_trait]
impl TypedAction for ReceiverAction {
    type I = String;
    type O = ();

    /// Calls `recv_any` twice and prints each received message with the id of
    /// the node it came from.
    ///
    /// Receive errors and missing payloads are reported on stderr rather than
    /// panicking; the action always finishes with `Output::Out(None)`.
    async fn run(
        &self,
        mut input: TypedInChannels<Self::I>,
        _: TypedOutChannels<Self::O>,
        _: Arc<EnvVar>,
    ) -> Output {
        // One message is expected from each of the two senders wired to this
        // node in `main`.
        const EXPECTED_MESSAGES: usize = 2;

        for _ in 0..EXPECTED_MESSAGES {
            match input.recv_any().await {
                Ok((sender_id, Some(message))) => {
                    println!("Received message '{}' from node {:?}", message, sender_id);
                }
                // NOTE(review): presumably `None` means the payload could not
                // be viewed as a `String` — confirm against the dagrs docs.
                // Reported instead of unwrapped so it cannot panic the node.
                Ok((sender_id, None)) => {
                    eprintln!("Received empty message from node {:?}", sender_id);
                }
                Err(e) => {
                    eprintln!("Error receiving message: {:?}", e);
                }
            }
        }

        Output::Out(None)
    }
}
/// Builds a three-node graph (two senders feeding one receiver) and runs it,
/// printing any execution error to stderr.
#[tokio::main]
async fn main() {
    let mut node_table = NodeTable::new();

    // Node creation order is kept: Sender1, Sender2, then Receiver.
    let fast_sender = DefaultNode::with_action(
        String::from("Sender1"),
        SenderAction::new(String::from("Hello from Sender")),
        &mut node_table,
    );
    let slow_sender = DefaultNode::with_action(
        String::from("Sender2"),
        SlowSenderAction::new(String::from("Hello from SlowSender")),
        &mut node_table,
    );
    let sink = DefaultNode::with_action(String::from("Receiver"), ReceiverAction, &mut node_table);

    // Capture the ids before the nodes are moved into the graph.
    let fast_id = fast_sender.id();
    let slow_id = slow_sender.id();
    let sink_id = sink.id();

    let mut graph = Graph::new();
    graph.add_node(fast_sender).unwrap();
    graph.add_node(slow_sender).unwrap();
    graph.add_node(sink).unwrap();

    // Each sender gets a single edge to the receiver, in the original order.
    for source_id in [fast_id, slow_id] {
        graph.add_edge(source_id, vec![sink_id]).unwrap();
    }

    if let Err(e) = graph.async_start().await {
        eprintln!("Graph execution failed: {:?}", e);
    }
}