flowrs_wasm/flow/
app_state.rs1use std::{any::Any, collections::HashMap, ops::Add, sync::Arc};
2
3use serde::{Deserialize, Deserializer};
4use serde_json::Value;
5
6use flowrs::{
7 connection::{connect, ConnectError, Input, Output, RuntimeConnectable},
8 node::{Context, Node, State},
9};
10use flowrs_std::{add::AddNode, basic::BasicNode, debug::DebugNode};
11
/// Type-erased value passed between nodes of a flow.
///
/// The payload sits behind an [`Arc`], so cloning a `FlowType` only bumps the
/// reference count and both clones share the same payload. Consumers recover
/// the concrete value with `downcast_ref` on the inner `dyn Any`.
#[derive(Clone, Debug)]
pub struct FlowType(pub Arc<dyn Any + Send + Sync>);
14
15pub trait RuntimeNode: Node + RuntimeConnectable {}
16impl<T> RuntimeNode for T where T: Node + RuntimeConnectable {}
17impl Add for FlowType {
24 type Output = FlowType;
25
26 fn add(self, rhs: Self) -> Self::Output {
27 if let Some(lhs) = self.0.downcast_ref::<i64>() {
28 if let Some(rhs) = rhs.0.downcast_ref::<i64>() {
29 return FlowType(Arc::new(lhs + rhs));
30 }
31 }
32 if let Some(lhs) = self.0.downcast_ref::<i32>() {
33 if let Some(rhs) = rhs.0.downcast_ref::<i32>() {
34 return FlowType(Arc::new(lhs + rhs));
35 }
36 }
37 if let Some(lhs) = self.0.downcast_ref::<String>() {
38 if let Some(rhs) = rhs.0.downcast_ref::<String>() {
39 let mut res = lhs.clone();
40 res.push_str(rhs);
41 return FlowType(Arc::new(res));
42 }
43 }
44 if let Some(lhs) = self.0.downcast_ref::<Value>() {
45 if let Some(rhs) = rhs.0.downcast_ref::<Value>() {
46 return match (lhs, rhs) {
47 (Value::Number(a), Value::Number(b)) => {
48 FlowType(Arc::new(a.as_f64().unwrap() + b.as_f64().unwrap()))
49 }
50 (Value::String(a), Value::String(b)) => {
51 let mut res = a.clone();
52 res.push_str(b);
53 FlowType(Arc::new(a.clone()))
54 }
55 (Value::Array(a), Value::Array(b)) => {
56 let mut res = a.clone();
57 res.append(b.to_owned().as_mut());
58 FlowType(Arc::new(a.clone()))
59 }
60 (a, b) => panic!(
61 "Addition of JSON values of type {:?} and {:?} is not supported.",
62 a, b
63 ),
64 };
65 }
66 }
67 panic!(
68 "Addition not supported for type {:?} and {:?}.",
69 self.type_id(),
70 rhs.type_id()
71 );
72 }
73}
74
75pub struct AppState {
76 pub nodes: Vec<Box<dyn RuntimeNode + Send>>,
79 pub node_idc: HashMap<String, usize>,
80 pub context: State<Context>,
81}
82
83impl AppState {
84 pub fn new() -> AppState {
85 AppState {
86 nodes: Vec::new(),
87 node_idc: HashMap::new(),
88 context: State::new(Context::new()),
89 }
90 }
91
92 pub fn add_node(&mut self, name: &str, kind: String, props: Value) -> String {
93 let node: Box<dyn RuntimeNode + Send> = match kind.as_str() {
94 "nodes.arithmetics.add" => Box::new(AddNode::<FlowType, FlowType, FlowType>::new(
95 name,
96 self.context.clone(),
97 Value::Null,
98 )),
99 "nodes.basic" => Box::new(BasicNode::new(
100 name,
101 self.context.clone(),
102 FlowType(Arc::new(props)),
103 )),
104 "nodes.debug" => Box::new(DebugNode::<FlowType>::new(
105 name,
106 self.context.clone(),
107 Value::Null,
108 )),
109 _ => panic!("Nodes of type {} are not yet supported.", kind),
110 };
111 self.nodes.push(node);
112 self.node_idc.insert(name.to_owned(), self.nodes.len() - 1);
113 name.to_owned()
114 }
115
116 pub fn connect_at(
117 &mut self,
118 lhs: String,
119 rhs: String,
120 index_in: usize,
121 index_out: usize,
122 ) -> Result<(), ConnectError<FlowType>> {
123 let lhs_idx = self.node_idc.get(&lhs).unwrap().clone();
124 let rhs_idx = self.node_idc.get(&rhs).unwrap().clone();
125 let out_edge = self.nodes[lhs_idx]
127 .output_at(index_out)
128 .downcast_ref::<Output<FlowType>>()
129 .expect(&format!(
130 "{} Nodes output at {} couldn't be downcasted",
131 lhs, index_in
132 ))
133 .clone();
134 let in_edge = self.nodes[rhs_idx]
135 .input_at(index_in)
136 .downcast_ref::<Input<FlowType>>()
137 .unwrap()
138 .to_owned();
139 connect(out_edge, in_edge);
140 Ok(())
141 }
142
143 pub fn run(&mut self) {
144 self.nodes.iter_mut().for_each(|job| job.on_init());
145 self.nodes.iter_mut().for_each(|job| job.on_ready());
146 for _ in 0..100 {
148 self.nodes.iter_mut().for_each(|job| {
149 let _ = job.update();
150 });
151 }
152 self.nodes.iter_mut().for_each(|job| job.on_shutdown());
153 }
154}
155
156#[derive(Deserialize)]
157struct JsonNode {
158 name: String,
159 kind: String,
160 props: Value,
161}
162
163#[derive(Deserialize)]
164struct JsonConnection {
165 node: String,
166 index: usize,
167}
168
169#[derive(Deserialize)]
170struct JsonEdge {
171 source: JsonConnection,
172 dest: JsonConnection,
173}
174
175#[derive(Deserialize)]
176struct JsonData {
177 nodes: Vec<JsonNode>,
178 edges: Vec<JsonEdge>,
179}
180
181impl<'de> Deserialize<'de> for AppState {
182 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
183 where
184 D: Deserializer<'de>,
185 {
186 let mut app_state = AppState::new();
187 let json_data: JsonData = JsonData::deserialize(deserializer)?;
188 json_data.nodes.iter().for_each(|node| {
189 app_state.add_node(&node.name, node.kind.clone(), node.props.to_owned());
190 });
191 json_data.edges.iter().for_each(|edge| {
192 let _ = app_state.connect_at(
193 edge.source.node.clone(),
194 edge.dest.node.clone(),
195 edge.dest.index,
196 edge.source.index,
197 );
198 });
199
200 Ok(app_state)
201 }
202}