Skip to main content

ranvier_std/nodes/
flow.rs

1use async_trait::async_trait;
2use ranvier_core::{bus::Bus, outcome::Outcome, transition::Transition};
3use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5use std::marker::PhantomData;
6use std::time::Duration;
7
8#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
9pub struct DelayNode<T> {
10    pub duration_ms: u64,
11    #[serde(skip)]
12    pub _marker: PhantomData<T>,
13}
14
15impl<T> DelayNode<T> {
16    pub fn new(duration_ms: u64) -> Self {
17        Self {
18            duration_ms,
19            _marker: PhantomData,
20        }
21    }
22}
23
24#[async_trait]
25impl<T> Transition<T, T> for DelayNode<T>
26where
27    T: Send + Sync + 'static,
28{
29    type Error = String;
30    type Resources = ();
31
32    async fn run(
33        &self,
34        input: T,
35        _resources: &Self::Resources,
36        _bus: &mut Bus,
37    ) -> Outcome<T, Self::Error> {
38        tokio::time::sleep(Duration::from_millis(self.duration_ms)).await;
39        Outcome::next(input)
40    }
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
44pub struct IdentityNode<T> {
45    #[serde(skip)]
46    pub _marker: PhantomData<T>,
47}
48
49impl<T> IdentityNode<T> {
50    pub fn new() -> Self {
51        Self {
52            _marker: PhantomData,
53        }
54    }
55}
56
57impl<T> Default for IdentityNode<T> {
58    fn default() -> Self {
59        Self::new()
60    }
61}
62
63#[async_trait]
64impl<T> Transition<T, T> for IdentityNode<T>
65where
66    T: Send + Sync + 'static,
67{
68    type Error = String;
69    type Resources = ();
70
71    async fn run(
72        &self,
73        input: T,
74        _resources: &Self::Resources,
75        _bus: &mut Bus,
76    ) -> Outcome<T, Self::Error> {
77        Outcome::next(input)
78    }
79}