flowrs_wasm/flow/
app_state.rs

1use std::{any::Any, collections::HashMap, ops::Add, sync::Arc};
2
3use serde::{Deserialize, Deserializer};
4use serde_json::Value;
5
6use flowrs::{
7    connection::{connect, ConnectError, Input, Output, RuntimeConnectable},
8    node::{Context, Node, State},
9};
10use flowrs_std::{add::AddNode, basic::BasicNode, debug::DebugNode};
11
/// Type-erased, cheaply clonable value used in this file as the payload type
/// of node inputs and outputs (`Input<FlowType>` / `Output<FlowType>`).
///
/// The concrete payload is recovered at runtime with `downcast_ref`; see the
/// `Add` implementation for the payload types that support addition.
#[derive(Clone, Debug)]
pub struct FlowType(
    /// Shared, thread-safe handle to the dynamically typed payload.
    pub Arc<dyn Any + Send + Sync>,
);
14
15pub trait RuntimeNode: Node + RuntimeConnectable {}
16impl<T> RuntimeNode for T where T: Node + RuntimeConnectable {}
17// This implementation gives some control over which types should be
18// addable throughout the entire flow. As of now only homogenious types
19// allow addition.
20// As the Properties of a Node can be any JSON value, the addition of
21// such properties is limited to numbers (casted as float), lists and
22// strings (both concatinated upon addition).
23impl Add for FlowType {
24    type Output = FlowType;
25
26    fn add(self, rhs: Self) -> Self::Output {
27        if let Some(lhs) = self.0.downcast_ref::<i64>() {
28            if let Some(rhs) = rhs.0.downcast_ref::<i64>() {
29                return FlowType(Arc::new(lhs + rhs));
30            }
31        }
32        if let Some(lhs) = self.0.downcast_ref::<i32>() {
33            if let Some(rhs) = rhs.0.downcast_ref::<i32>() {
34                return FlowType(Arc::new(lhs + rhs));
35            }
36        }
37        if let Some(lhs) = self.0.downcast_ref::<String>() {
38            if let Some(rhs) = rhs.0.downcast_ref::<String>() {
39                let mut res = lhs.clone();
40                res.push_str(rhs);
41                return FlowType(Arc::new(res));
42            }
43        }
44        if let Some(lhs) = self.0.downcast_ref::<Value>() {
45            if let Some(rhs) = rhs.0.downcast_ref::<Value>() {
46                return match (lhs, rhs) {
47                    (Value::Number(a), Value::Number(b)) => {
48                        FlowType(Arc::new(a.as_f64().unwrap() + b.as_f64().unwrap()))
49                    }
50                    (Value::String(a), Value::String(b)) => {
51                        let mut res = a.clone();
52                        res.push_str(b);
53                        FlowType(Arc::new(a.clone()))
54                    }
55                    (Value::Array(a), Value::Array(b)) => {
56                        let mut res = a.clone();
57                        res.append(b.to_owned().as_mut());
58                        FlowType(Arc::new(a.clone()))
59                    }
60                    (a, b) => panic!(
61                        "Addition of JSON values of type {:?} and {:?} is not supported.",
62                        a, b
63                    ),
64                };
65            }
66        }
67        panic!(
68            "Addition not supported for type {:?} and {:?}.",
69            self.type_id(),
70            rhs.type_id()
71        );
72    }
73}
74
75pub struct AppState {
76    // For a yet TBD reason a HashMap of dyn types looses track of channel pointers.
77    // As a workaround Nodes are resolved in a two step process and stored in a Vec.
78    pub nodes: Vec<Box<dyn RuntimeNode + Send>>,
79    pub node_idc: HashMap<String, usize>,
80    pub context: State<Context>,
81}
82
83impl AppState {
84    pub fn new() -> AppState {
85        AppState {
86            nodes: Vec::new(),
87            node_idc: HashMap::new(),
88            context: State::new(Context::new()),
89        }
90    }
91
92    pub fn add_node(&mut self, name: &str, kind: String, props: Value) -> String {
93        let node: Box<dyn RuntimeNode + Send> = match kind.as_str() {
94            "nodes.arithmetics.add" => Box::new(AddNode::<FlowType, FlowType, FlowType>::new(
95                name,
96                self.context.clone(),
97                Value::Null,
98            )),
99            "nodes.basic" => Box::new(BasicNode::new(
100                name,
101                self.context.clone(),
102                FlowType(Arc::new(props)),
103            )),
104            "nodes.debug" => Box::new(DebugNode::<FlowType>::new(
105                name,
106                self.context.clone(),
107                Value::Null,
108            )),
109            _ => panic!("Nodes of type {} are not yet supported.", kind),
110        };
111        self.nodes.push(node);
112        self.node_idc.insert(name.to_owned(), self.nodes.len() - 1);
113        name.to_owned()
114    }
115
116    pub fn connect_at(
117        &mut self,
118        lhs: String,
119        rhs: String,
120        index_in: usize,
121        index_out: usize,
122    ) -> Result<(), ConnectError<FlowType>> {
123        let lhs_idx = self.node_idc.get(&lhs).unwrap().clone();
124        let rhs_idx = self.node_idc.get(&rhs).unwrap().clone();
125        // TODO: RefCell is not an ideal solution here.
126        let out_edge = self.nodes[lhs_idx]
127            .output_at(index_out)
128            .downcast_ref::<Output<FlowType>>()
129            .expect(&format!(
130                "{} Nodes output at {} couldn't be downcasted",
131                lhs, index_in
132            ))
133            .clone();
134        let in_edge = self.nodes[rhs_idx]
135            .input_at(index_in)
136            .downcast_ref::<Input<FlowType>>()
137            .unwrap()
138            .to_owned();
139        connect(out_edge, in_edge);
140        Ok(())
141    }
142
143    pub fn run(&mut self) {
144        self.nodes.iter_mut().for_each(|job| job.on_init());
145        self.nodes.iter_mut().for_each(|job| job.on_ready());
146        // Capped at 100 here for testing purposes. TODO: change to infinite loop with stop condition.
147        for _ in 0..100 {
148            self.nodes.iter_mut().for_each(|job| {
149                let _ = job.update();
150            });
151        }
152        self.nodes.iter_mut().for_each(|job| job.on_shutdown());
153    }
154}
155
156#[derive(Deserialize)]
157struct JsonNode {
158    name: String,
159    kind: String,
160    props: Value,
161}
162
163#[derive(Deserialize)]
164struct JsonConnection {
165    node: String,
166    index: usize,
167}
168
169#[derive(Deserialize)]
170struct JsonEdge {
171    source: JsonConnection,
172    dest: JsonConnection,
173}
174
175#[derive(Deserialize)]
176struct JsonData {
177    nodes: Vec<JsonNode>,
178    edges: Vec<JsonEdge>,
179}
180
181impl<'de> Deserialize<'de> for AppState {
182    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
183    where
184        D: Deserializer<'de>,
185    {
186        let mut app_state = AppState::new();
187        let json_data: JsonData = JsonData::deserialize(deserializer)?;
188        json_data.nodes.iter().for_each(|node| {
189            app_state.add_node(&node.name, node.kind.clone(), node.props.to_owned());
190        });
191        json_data.edges.iter().for_each(|edge| {
192            let _ = app_state.connect_at(
193                edge.source.node.clone(),
194                edge.dest.node.clone(),
195                edge.dest.index,
196                edge.source.index,
197            );
198        });
199
200        Ok(app_state)
201    }
202}