// code0_flow/flow_store/service.rs

use std::fmt;

use async_trait::async_trait;
use log::error;
use redis::{AsyncCommands, JsonAsyncCommands, RedisError, RedisResult};
use tucana::shared::{Flow, Flows};

use crate::flow_store::connection::FlowStore;
6
/// Error returned when storing a flow fails.
///
/// Carries the id of the flow that was being processed together with a
/// human-readable reason, so callers can report which flow was affected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlowStoreError {
    /// Broad category of the failure.
    pub kind: FlowStoreErrorKind,
    /// Id of the flow the failed operation was working on.
    pub flow_id: i64,
    /// Textual description of the underlying failure.
    pub reason: String,
}

/// Category of a [`FlowStoreError`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowStoreErrorKind {
    /// The flow could not be serialized.
    // NOTE(review): this variant is not constructed anywhere in this file —
    // confirm whether other modules rely on it.
    Serialization,
    /// A Redis command returned an error.
    RedisOperation,
}

impl fmt::Display for FlowStoreErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Serialization => "serialization",
            Self::RedisOperation => "redis operation",
        })
    }
}

impl fmt::Display for FlowStoreError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{} error for flow {}: {}",
            self.kind, self.flow_id, self.reason
        )
    }
}

// Implementing `Error` lets callers propagate this type with `?` into
// `Box<dyn Error>` (or any error type that converts from `std::error::Error`).
impl std::error::Error for FlowStoreError {}
19
20/// Trait representing a service for managing flows in a Redis.
21#[async_trait]
22pub trait FlowStoreServiceBase {
23    async fn new(redis_client_arc: FlowStore) -> Self;
24    async fn insert_flow(&mut self, flow: Flow) -> Result<i64, FlowStoreError>;
25    async fn insert_flows(&mut self, flows: Flows) -> Result<i64, FlowStoreError>;
26    async fn delete_flow(&mut self, flow_id: i64) -> Result<i64, RedisError>;
27    async fn delete_flows(&mut self, flow_ids: Vec<i64>) -> Result<i64, RedisError>;
28    async fn get_all_flow_ids(&mut self) -> Result<Vec<i64>, RedisError>;
29}
30
31/// Struct representing a service for managing flows in a Redis.
32#[derive(Clone)]
33pub struct FlowStoreService {
34    pub(crate) redis_client_arc: FlowStore,
35}
36
37/// Implementation of a service for managing flows in a Redis.
38#[async_trait]
39impl FlowStoreServiceBase for FlowStoreService {
40    async fn new(redis_client_arc: FlowStore) -> FlowStoreService {
41        FlowStoreService { redis_client_arc }
42    }
43
44    /// Insert a list of flows into Redis
45    async fn insert_flow(&mut self, flow: Flow) -> Result<i64, FlowStoreError> {
46        let mut connection = self.redis_client_arc.lock().await;
47
48        let insert_result: RedisResult<()> = connection
49            .json_set(flow.flow_id.to_string(), "$", &flow)
50            .await;
51
52        match insert_result {
53            Err(redis_error) => {
54                error!("An Error occurred {}", redis_error);
55                Err(FlowStoreError {
56                    flow_id: flow.flow_id,
57                    kind: FlowStoreErrorKind::RedisOperation,
58                    reason: redis_error.to_string(),
59                })
60            }
61            _ => Ok(1),
62        }
63    }
64
65    /// Insert a flows into Redis
66    async fn insert_flows(&mut self, flows: Flows) -> Result<i64, FlowStoreError> {
67        let mut total_modified = 0;
68
69        for flow in flows.flows {
70            let result = self.insert_flow(flow).await?;
71            total_modified += result;
72        }
73
74        Ok(total_modified)
75    }
76
77    /// Deletes a flow
78    async fn delete_flow(&mut self, flow_id: i64) -> Result<i64, RedisError> {
79        let mut connection = self.redis_client_arc.lock().await;
80        let deleted_flow: RedisResult<i64> = connection.json_del(flow_id, ".").await;
81
82        match deleted_flow {
83            Ok(int) => Ok(int),
84            Err(redis_error) => {
85                error!("An Error occurred {}", redis_error);
86                Err(redis_error)
87            }
88        }
89    }
90
91    /// Deletes a list of flows
92    async fn delete_flows(&mut self, flow_ids: Vec<i64>) -> Result<i64, RedisError> {
93        let mut total_modified = 0;
94
95        for id in flow_ids {
96            let result = self.delete_flow(id).await?;
97            total_modified += result;
98        }
99
100        Ok(total_modified)
101    }
102
103    /// Queries for all ids in the redis
104    /// Returns `Result<Vec<i64>, RedisError>`: Result of the flow ids currently in Redis
105    async fn get_all_flow_ids(&mut self) -> Result<Vec<i64>, RedisError> {
106        let mut connection = self.redis_client_arc.lock().await;
107
108        let string_keys: Vec<String> = {
109            match connection.keys("*").await {
110                Ok(res) => res,
111                Err(error) => {
112                    error!("Can't retrieve keys from redis. Reason: {error}");
113                    return Err(error);
114                }
115            }
116        };
117
118        let int_keys: Vec<i64> = string_keys
119            .into_iter()
120            .filter_map(|key| key.parse::<i64>().ok())
121            .collect();
122
123        Ok(int_keys)
124    }
125}
126
#[cfg(test)]
mod tests {
    use crate::flow_store::connection::create_flow_store_connection;
    use crate::flow_store::connection::FlowStore;
    use crate::flow_store::service::FlowStoreService;
    use crate::flow_store::service::FlowStoreServiceBase;
    use redis::{AsyncCommands, JsonAsyncCommands};
    use serial_test::serial;
    use testcontainers::core::IntoContainerPort;
    use testcontainers::core::WaitFor;
    use testcontainers::runners::AsyncRunner;
    use testcontainers::GenericImage;
    use tucana::shared::{Flow, Flows};

    /// Builds a minimal flow with the given id; every other field is empty,
    /// except `project_id`, which is `1`.
    fn flow_with_id(flow_id: i64) -> Flow {
        Flow {
            flow_id,
            r#type: "".to_string(),
            settings: vec![],
            starting_node: None,
            data_types: vec![],
            input_type_identifier: None,
            return_type_identifier: None,
            project_id: 1,
        }
    }

    /// Builds a `Flows` message containing one minimal flow per id.
    fn flows_with_ids(ids: &[i64]) -> Flows {
        Flows {
            flows: ids.iter().copied().map(flow_with_id).collect(),
        }
    }

    /// Declares a serialized integration test that runs against a fresh
    /// `redis/redis-stack` container.
    ///
    /// The generated test starts the container, connects to it, issues
    /// `FLUSHALL`, and then awaits `$consumer(connection, service)`.
    macro_rules! redis_integration_test {
        ($test_name:ident, $consumer:expr) => {
            #[tokio::test]
            #[serial]
            async fn $test_name() {
                let port: u16 = 6379;
                let image_name = "redis/redis-stack";
                let wait_message = "Ready to accept connections";

                let container = GenericImage::new(image_name, "latest")
                    .with_exposed_port(port.tcp())
                    .with_wait_for(WaitFor::message_on_stdout(wait_message))
                    .start()
                    .await
                    .unwrap();

                let host = container.get_host().await.unwrap();
                let host_port = container.get_host_port_ipv4(port).await.unwrap();
                let url = format!("redis://{host}:{host_port}");
                println!("Redis server started correctly on: {url}");

                let connection = create_flow_store_connection(url).await;

                {
                    // Scope the guard so the lock is released before the
                    // service under test needs the connection.
                    let mut con = connection.lock().await;

                    let _: () = redis::cmd("FLUSHALL")
                        .query_async(&mut **con)
                        .await
                        .expect("FLUSHALL command failed");
                }

                let base = FlowStoreService::new(connection.clone()).await;

                $consumer(connection, base).await;
                let _ = container.stop().await;
            }
        };
    }

    redis_integration_test!(
        insert_one_flow,
        (|connection: FlowStore, mut service: FlowStoreService| async move {
            let flow = flow_with_id(1);

            service
                .insert_flow(flow.clone())
                .await
                .expect("inserting the flow failed");

            let redis_result: Option<String> = {
                let mut redis_cmd = connection.lock().await;
                redis_cmd.json_get("1", "$").await.unwrap()
            };

            let stored_json = redis_result.expect("flow 1 should be stored in Redis");
            println!("{stored_json}");

            // The result of the `$` query is parsed as a JSON array of flows.
            let redis_flow: Vec<Flow> = serde_json::from_str(&stored_json).unwrap();
            assert_eq!(redis_flow[0], flow);
        })
    );

    redis_integration_test!(
        insert_will_overwrite_existing_flow,
        (|connection: FlowStore, mut service: FlowStoreService| async move {
            service
                .insert_flow(flow_with_id(1))
                .await
                .expect("inserting the initial flow failed");

            let flow_overwrite = Flow {
                r#type: "ABC".to_string(),
                ..flow_with_id(1)
            };

            service
                .insert_flow(flow_overwrite)
                .await
                .expect("overwriting the flow failed");

            let flow_ids = service.get_all_flow_ids().await.unwrap();
            assert_eq!(flow_ids.len(), 1);

            let redis_result: Option<String> = {
                let mut redis_cmd = connection.lock().await;
                redis_cmd.json_get("1", "$").await.unwrap()
            };

            let stored_json = redis_result.expect("flow 1 should be stored in Redis");

            // Same shape as in `insert_one_flow`: parse a JSON array of flows.
            let redis_flows: Vec<Flow> = serde_json::from_str(&stored_json).unwrap();
            assert_eq!(redis_flows.len(), 1);
            assert_eq!(redis_flows[0].r#type, "ABC");
        })
    );

    redis_integration_test!(
        insert_many_flows,
        (|_connection: FlowStore, mut service: FlowStoreService| async move {
            let amount = service
                .insert_flows(flows_with_ids(&[1, 2, 3]))
                .await
                .unwrap();
            assert_eq!(amount, 3);
        })
    );

    redis_integration_test!(
        delete_one_existing_flow,
        (|connection: FlowStore, mut service: FlowStoreService| async move {
            service
                .insert_flow(flow_with_id(1))
                .await
                .expect("inserting the flow failed");

            let result = service.delete_flow(1).await;

            assert_eq!(result.unwrap(), 1);

            let redis_result: Option<String> = {
                let mut redis_cmd = connection.lock().await;
                redis_cmd.get("1").await.unwrap()
            };

            assert!(redis_result.is_none());
        })
    );

    redis_integration_test!(
        delete_one_non_existing_flow,
        (|_connection: FlowStore, mut service: FlowStoreService| async move {
            let result = service.delete_flow(1).await;
            assert_eq!(result.unwrap(), 0);
        })
    );

    redis_integration_test!(
        delete_many_existing_flows,
        (|_connection: FlowStore, mut service: FlowStoreService| async move {
            let amount = service
                .insert_flows(flows_with_ids(&[1, 2, 3]))
                .await
                .unwrap();
            assert_eq!(amount, 3);

            let deleted_amount = service.delete_flows(vec![1, 2, 3]).await;
            assert_eq!(deleted_amount.unwrap(), 3);
        })
    );

    redis_integration_test!(
        delete_many_non_existing_flows,
        (|_connection: FlowStore, mut service: FlowStoreService| async move {
            let deleted_amount = service.delete_flows(vec![1, 2, 3]).await;
            assert_eq!(deleted_amount.unwrap(), 0);
        })
    );

    redis_integration_test!(
        get_existing_flow_ids,
        (|_connection: FlowStore, mut service: FlowStoreService| async move {
            let amount = service
                .insert_flows(flows_with_ids(&[1, 2, 3]))
                .await
                .unwrap();
            assert_eq!(amount, 3);

            // The ids are sorted before comparing so the assertion does not
            // depend on the order in which they are returned.
            let mut flow_ids = service.get_all_flow_ids().await.unwrap();
            flow_ids.sort();

            assert_eq!(flow_ids, vec![1, 2, 3]);
        })
    );

    redis_integration_test!(
        get_empty_flow_ids,
        (|_connection: FlowStore, mut service: FlowStoreService| async move {
            let flow_ids = service.get_all_flow_ids().await;
            assert_eq!(flow_ids.unwrap(), Vec::<i64>::new());
        })
    );
}