Skip to main content

ranvier_std/nodes/
flow.rs

1use async_trait::async_trait;
2use ranvier_core::{bus::Bus, outcome::Outcome, transition::Transition};
3use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5use std::marker::PhantomData;
6use std::time::Duration;
7
8#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
9pub struct DelayNode<T> {
10    pub duration_ms: u64,
11    #[serde(skip)]
12    pub _marker: PhantomData<T>,
13}
14
15impl<T> DelayNode<T> {
16    pub fn new(duration_ms: u64) -> Self {
17        Self {
18            duration_ms,
19            _marker: PhantomData,
20        }
21    }
22}
23
24#[async_trait]
25impl<T> Transition<T, T> for DelayNode<T>
26where
27    T: Send + Sync + 'static,
28{
29    type Error = String;
30    type Resources = ();
31
32    async fn run(
33        &self,
34        input: T,
35        _resources: &Self::Resources,
36        _bus: &mut Bus,
37    ) -> Outcome<T, Self::Error> {
38        tokio::time::sleep(Duration::from_millis(self.duration_ms)).await;
39        Outcome::next(input)
40    }
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
44pub struct IdentityNode<T> {
45    #[serde(skip)]
46    pub _marker: PhantomData<T>,
47}
48
49impl<T> IdentityNode<T> {
50    pub fn new() -> Self {
51        Self {
52            _marker: PhantomData,
53        }
54    }
55}
56
57impl<T> Default for IdentityNode<T> {
58    fn default() -> Self {
59        Self::new()
60    }
61}
62
63#[async_trait]
64impl<T> Transition<T, T> for IdentityNode<T>
65where
66    T: Send + Sync + 'static,
67{
68    type Error = String;
69    type Resources = ();
70
71    async fn run(
72        &self,
73        input: T,
74        _resources: &Self::Resources,
75        _bus: &mut Bus,
76    ) -> Outcome<T, Self::Error> {
77        Outcome::next(input)
78    }
79}
80
#[cfg(test)]
mod tests {
    use super::*;

    /// A 10 ms delay node must take at least ~10 ms to run and must return
    /// the original payload.
    #[tokio::test]
    async fn delay_node_passes_after_sleep() {
        let mut bus = Bus::new();
        let node: DelayNode<String> = DelayNode::new(10); // 10ms
        let started = std::time::Instant::now();
        let outcome = node.run(String::from("data"), &(), &mut bus).await;
        let waited = started.elapsed();
        // 1 ms of slack below the configured delay, as in the timer's resolution.
        assert!(waited >= Duration::from_millis(9));
        assert!(matches!(outcome, Outcome::Next(ref v) if v == "data"));
    }

    /// The identity node must return exactly the value it was given.
    #[tokio::test]
    async fn identity_node_passes_through() {
        let mut bus = Bus::new();
        let node: IdentityNode<i32> = IdentityNode::new();
        let outcome = node.run(42, &(), &mut bus).await;
        assert!(matches!(outcome, Outcome::Next(42)));
    }
}