code0_flow/flow_store/
service.rs

1use super::flow_identifier;
2use crate::flow_store::connection::FlowStore;
3use async_trait::async_trait;
4use log::error;
5use redis::{AsyncCommands, JsonAsyncCommands, RedisError, RedisResult};
6use tucana::shared::{Flow, Flows};
7
/// Error returned by the flow store operations that report more than a raw Redis error.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be printed, logged and
/// propagated through `Box<dyn Error>`-style error handling.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlowStoreError {
    /// Category of the failure.
    pub kind: FlowStoreErrorKind,
    /// Id of the flow the failure relates to.
    pub flow_id: i64,
    /// Human-readable description of what went wrong.
    pub reason: String,
}

impl std::fmt::Display for FlowStoreError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{} (flow_id: {}): {}",
            self.kind, self.flow_id, self.reason
        )
    }
}

impl std::error::Error for FlowStoreError {}

/// Categories of failures a [`FlowStoreError`] can describe.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowStoreErrorKind {
    /// A flow could not be converted to or from its JSON representation.
    Serialization,
    /// A Redis command failed.
    RedisOperation,
    /// No storage identifier could be determined for a flow.
    NoIdentifier,
}

impl std::fmt::Display for FlowStoreErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            FlowStoreErrorKind::Serialization => "serialization",
            FlowStoreErrorKind::RedisOperation => "redis operation",
            FlowStoreErrorKind::NoIdentifier => "no identifier",
        };
        f.write_str(label)
    }
}
21
/// Trait describing a service that manages flows stored in Redis.
///
/// All operations are asynchronous and take `&mut self`. The `i64` carried by the `Ok` variant
/// of the insert and delete operations is a count of affected entries.
#[async_trait]
pub trait FlowStoreServiceBase {
    /// Creates the service from a [`FlowStore`] connection handle.
    async fn new(redis_client_arc: FlowStore) -> Self;
    /// Inserts a single flow and returns the number of inserted flows.
    async fn insert_flow(&mut self, flow: Flow) -> Result<i64, FlowStoreError>;
    /// Inserts every flow contained in `flows` and returns the number of inserted flows.
    async fn insert_flows(&mut self, flows: Flows) -> Result<i64, FlowStoreError>;
    /// Deletes the flow with the given id and returns the number of deleted entries.
    async fn delete_flow(&mut self, flow_id: i64) -> Result<i64, RedisError>;
    /// Deletes every flow whose id is listed in `flow_ids` and returns the number of deleted entries.
    async fn delete_flows(&mut self, flow_ids: Vec<i64>) -> Result<i64, RedisError>;
    /// Returns the ids of the flows currently held by the store.
    async fn get_all_flow_ids(&mut self) -> Result<Vec<i64>, RedisError>;
    /// Returns the flows stored under keys that match `pattern`.
    async fn query_flows(&mut self, pattern: String) -> Result<Flows, FlowStoreError>;
}
33
/// Service for managing flows in Redis.
///
/// Holds the [`FlowStore`] handle that the [`FlowStoreServiceBase`] implementation locks before
/// it issues Redis commands.
#[derive(Clone)]
pub struct FlowStoreService {
    /// Connection handle; the service methods call `.lock().await` on it to reach the connection.
    pub(crate) redis_client_arc: FlowStore,
}
39
40/// Implementation of a service for managing flows in a Redis.
41#[async_trait]
42impl FlowStoreServiceBase for FlowStoreService {
43    async fn new(redis_client_arc: FlowStore) -> FlowStoreService {
44        FlowStoreService { redis_client_arc }
45    }
46
47    /// Insert a list of flows into Redis
48    async fn insert_flow(&mut self, flow: Flow) -> Result<i64, FlowStoreError> {
49        let mut connection = self.redis_client_arc.lock().await;
50
51        let identifier = match flow_identifier::get_flow_identifier(&flow) {
52            Some(id) => id,
53            None => {
54                return Err(FlowStoreError {
55                    kind: FlowStoreErrorKind::NoIdentifier,
56                    flow_id: flow.flow_id,
57                    reason: String::from("Identifier can't be determent!"),
58                });
59            }
60        };
61
62        let insert_result: RedisResult<()> = connection.json_set(identifier, "$", &flow).await;
63
64        match insert_result {
65            Err(redis_error) => {
66                error!("An Error occurred {}", redis_error);
67                Err(FlowStoreError {
68                    flow_id: flow.flow_id,
69                    kind: FlowStoreErrorKind::RedisOperation,
70                    reason: redis_error.to_string(),
71                })
72            }
73            _ => Ok(1),
74        }
75    }
76
77    /// Insert a flows into Redis
78    async fn insert_flows(&mut self, flows: Flows) -> Result<i64, FlowStoreError> {
79        let mut total_modified = 0;
80
81        for flow in flows.flows {
82            let result = self.insert_flow(flow).await?;
83            total_modified += result;
84        }
85
86        Ok(total_modified)
87    }
88
89    /// Deletes a flow
90    async fn delete_flow(&mut self, flow_id: i64) -> Result<i64, RedisError> {
91        let mut connection = self.redis_client_arc.lock().await;
92
93        let identifier = format!("{}::*", flow_id);
94        let keys: Vec<String> = connection.keys(&identifier).await?;
95        let deleted_flow: RedisResult<i64> = connection.json_del(keys, ".").await;
96
97        match deleted_flow {
98            Ok(int) => Ok(int),
99            Err(redis_error) => {
100                error!("An Error occurred {}", redis_error);
101                Err(redis_error)
102            }
103        }
104    }
105
106    /// Deletes a list of flows
107    async fn delete_flows(&mut self, flow_ids: Vec<i64>) -> Result<i64, RedisError> {
108        let mut total_modified = 0;
109
110        for id in flow_ids {
111            let result = self.delete_flow(id).await?;
112            total_modified += result;
113        }
114
115        Ok(total_modified)
116    }
117
118    /// Queries for all ids in the redis
119    /// Returns `Result<Vec<i64>, RedisError>`: Result of the flow ids currently in Redis
120    async fn get_all_flow_ids(&mut self) -> Result<Vec<i64>, RedisError> {
121        let mut connection = self.redis_client_arc.lock().await;
122
123        let string_keys: Vec<String> = {
124            match connection.keys("*").await {
125                Ok(res) => res,
126                Err(error) => {
127                    error!("Can't retrieve keys from redis. Reason: {error}");
128                    return Err(error);
129                }
130            }
131        };
132
133        let mut real_keys: Vec<String> = vec![];
134
135        for key in string_keys {
136            if key.contains("::") {
137                let number = key.splitn(2, "::").next();
138                if let Some(real_number) = number {
139                    real_keys.push(String::from(real_number));
140                }
141            }
142        }
143
144        let int_keys: Vec<i64> = real_keys
145            .into_iter()
146            .filter_map(|key| key.parse::<i64>().ok())
147            .collect();
148
149        Ok(int_keys)
150    }
151
152    async fn query_flows(&mut self, pattern: String) -> Result<Flows, FlowStoreError> {
153        let mut connection = self.redis_client_arc.lock().await;
154
155        let keys: Vec<String> = {
156            match connection.keys(pattern).await {
157                Ok(res) => res,
158                Err(error) => {
159                    error!("Can't retrieve keys from redis. Reason: {error}");
160                    return Err(FlowStoreError {
161                        kind: FlowStoreErrorKind::RedisOperation,
162                        flow_id: 0,
163                        reason: error.detail().unwrap().to_string(),
164                    });
165                }
166            }
167        };
168
169        if keys.is_empty() {
170            return Ok(Flows { flows: vec![] });
171        }
172
173        match connection
174            .json_get::<Vec<String>, &str, Vec<String>>(keys, "$")
175            .await
176        {
177            Ok(json_values) => {
178                let mut all_flows: Vec<Flow> = Vec::new();
179
180                for json_str in json_values {
181                    match serde_json::from_str::<Vec<Flow>>(&json_str) {
182                        Ok(mut flows) => all_flows.append(&mut flows),
183                        Err(error) => {
184                            return Err(FlowStoreError {
185                                kind: FlowStoreErrorKind::Serialization,
186                                flow_id: 0,
187                                reason: error.to_string(),
188                            });
189                        }
190                    }
191                }
192
193                return Ok(Flows { flows: all_flows });
194            }
195            Err(error) => {
196                return Err(FlowStoreError {
197                    kind: FlowStoreErrorKind::RedisOperation,
198                    flow_id: 0,
199                    reason: error.detail().unwrap_or("Unknown Redis error").to_string(),
200                });
201            }
202        }
203    }
204}
205
/// Integration tests that run the service against a Redis Stack container.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use crate::flow_store::connection::create_flow_store_connection;
    use crate::flow_store::connection::FlowStore;
    use crate::flow_store::service::FlowStoreService;
    use crate::flow_store::service::FlowStoreServiceBase;
    use redis::{AsyncCommands, JsonAsyncCommands};
    use serial_test::serial;
    use testcontainers::core::IntoContainerPort;
    use testcontainers::core::WaitFor;
    use testcontainers::runners::AsyncRunner;
    use testcontainers::GenericImage;
    use tucana::shared::FlowSetting;
    use tucana::shared::FlowSettingDefinition;
    use tucana::shared::Struct;
    use tucana::shared::{Flow, Flows};

    /// Redis key under which `rest_flow(1)` is stored.
    const FLOW_ONE_KEY: &str = "1::1::REST::abc.code0.tech::GET";

    /// Wraps `value` in a string-kind `Value`.
    fn get_string_value(value: &str) -> tucana::shared::Value {
        tucana::shared::Value {
            kind: Some(tucana::shared::value::Kind::StringValue(String::from(
                value,
            ))),
        }
    }

    /// Settings shared by all test flows: an `HTTP_HOST` and an `HTTP_METHOD` setting.
    fn get_settings() -> Vec<FlowSetting> {
        vec![
            FlowSetting {
                definition: Some(FlowSettingDefinition {
                    id: String::from("1424525"),
                    key: String::from("HTTP_HOST"),
                }),
                object: Some(Struct {
                    fields: {
                        let mut map = HashMap::new();
                        map.insert(String::from("host"), get_string_value("abc.code0.tech"));
                        map
                    },
                }),
            },
            FlowSetting {
                definition: Some(FlowSettingDefinition {
                    id: String::from("14245252352"),
                    key: String::from("HTTP_METHOD"),
                }),
                object: Some(Struct {
                    fields: {
                        let mut map = HashMap::new();
                        map.insert(String::from("method"), get_string_value("GET"));
                        map
                    },
                }),
            },
        ]
    }

    /// Builds the REST flow of project `1` that the tests use, with the given flow id.
    fn rest_flow(flow_id: i64) -> Flow {
        Flow {
            flow_id,
            r#type: "REST".to_string(),
            settings: get_settings(),
            starting_node: None,
            data_types: vec![],
            input_type_identifier: None,
            return_type_identifier: None,
            project_id: 1,
        }
    }

    /// Builds one `rest_flow` per id, in the given order.
    fn rest_flows(flow_ids: &[i64]) -> Flows {
        Flows {
            flows: flow_ids.iter().map(|&flow_id| rest_flow(flow_id)).collect(),
        }
    }

    // Generates a serial tokio test that starts a Redis Stack container, flushes it and hands
    // the connection handle plus a fresh service to `$consumer`.
    macro_rules! redis_integration_test {
        ($test_name:ident, $consumer:expr) => {
            #[tokio::test]
            #[serial]
            async fn $test_name() {
                let port: u16 = 6379;
                let image_name = "redis/redis-stack";
                let wait_message = "Ready to accept connections";

                let container = GenericImage::new(image_name, "latest")
                    .with_exposed_port(port.tcp())
                    .with_wait_for(WaitFor::message_on_stdout(wait_message))
                    .start()
                    .await
                    .unwrap();

                let host = container.get_host().await.unwrap();
                let host_port = container.get_host_port_ipv4(port).await.unwrap();
                let url = format!("redis://{host}:{host_port}");
                println!("Redis server started correctly on: {}", url);

                let connection = create_flow_store_connection(url).await;

                // Scope the guard so the lock is released before the service needs it.
                {
                    let mut con = connection.lock().await;

                    let _: () = redis::cmd("FLUSHALL")
                        .query_async(&mut **con)
                        .await
                        .expect("FLUSHALL command failed");
                }

                let base = FlowStoreService::new(connection.clone()).await;

                $consumer(connection, base).await;
                let _ = container.stop().await;
            }
        };
    }

    redis_integration_test!(
        insert_one_flow,
        (|connection: FlowStore, mut service: FlowStoreService| async move {
            let flow = rest_flow(1);

            service
                .insert_flow(flow.clone())
                .await
                .expect("inserting a flow with an identifier should succeed");

            let redis_result: Option<String> = {
                let mut redis_cmd = connection.lock().await;
                redis_cmd.json_get(FLOW_ONE_KEY, "$").await.unwrap()
            };

            let stored = redis_result.expect("the flow should be stored under its identifier");
            println!("{}", stored);

            let redis_flow: Vec<Flow> = serde_json::from_str(&stored).unwrap();
            assert_eq!(redis_flow[0], flow);
        })
    );

    redis_integration_test!(
        insert_one_flow_fails_no_identifier,
        (|_connection: FlowStore, mut service: FlowStoreService| async move {
            let flow = Flow {
                r#type: "".to_string(),
                ..rest_flow(1)
            };

            assert!(service.insert_flow(flow).await.is_err());
        })
    );

    redis_integration_test!(
        insert_will_overwrite_existing_flow,
        (|connection: FlowStore, mut service: FlowStoreService| async move {
            service
                .insert_flow(rest_flow(1))
                .await
                .expect("the initial insert should succeed");

            let flow_overwrite = Flow {
                input_type_identifier: Some(String::from("ABC")),
                ..rest_flow(1)
            };

            service
                .insert_flow(flow_overwrite)
                .await
                .expect("the overwriting insert should succeed");

            let flow_ids = service.get_all_flow_ids().await.unwrap();
            assert_eq!(flow_ids.len(), 1);

            let redis_result: Vec<String> = {
                let mut redis_cmd = connection.lock().await;
                redis_cmd.json_get(FLOW_ONE_KEY, "$").await.unwrap()
            };

            assert_eq!(redis_result.len(), 1);
            let redis_flow: Vec<Flow> = serde_json::from_str(&redis_result[0]).unwrap();
            assert_eq!(redis_flow[0].input_type_identifier.as_deref(), Some("ABC"));
        })
    );

    redis_integration_test!(
        insert_many_flows,
        (|_connection: FlowStore, mut service: FlowStoreService| async move {
            let amount = service.insert_flows(rest_flows(&[1, 2, 3])).await.unwrap();
            assert_eq!(amount, 3);
        })
    );

    redis_integration_test!(
        delete_one_existing_flow,
        (|connection: FlowStore, mut service: FlowStoreService| async move {
            service
                .insert_flow(rest_flow(1))
                .await
                .expect("inserting the flow to delete should succeed");

            let result = service.delete_flow(1).await;
            assert_eq!(result.unwrap(), 1);

            // Flows live under `<flow_id>::...` keys, so look for leftovers with that prefix;
            // a plain lookup of the key "1" could never find anything.
            let remaining_keys: Vec<String> = {
                let mut redis_cmd = connection.lock().await;
                redis_cmd.keys("1::*").await.unwrap()
            };

            assert!(remaining_keys.is_empty());
        })
    );

    redis_integration_test!(
        delete_one_non_existing_flow,
        (|_connection: FlowStore, mut service: FlowStoreService| async move {
            let result = service.delete_flow(1).await;
            assert_eq!(result.unwrap(), 0);
        })
    );

    redis_integration_test!(
        delete_many_existing_flows,
        (|_connection: FlowStore, mut service: FlowStoreService| async move {
            let amount = service.insert_flows(rest_flows(&[1, 2, 3])).await.unwrap();
            assert_eq!(amount, 3);

            let deleted_amount = service.delete_flows(vec![1, 2, 3]).await;
            assert_eq!(deleted_amount.unwrap(), 3);
        })
    );

    redis_integration_test!(
        delete_many_non_existing_flows,
        (|_connection: FlowStore, mut service: FlowStoreService| async move {
            let deleted_amount = service.delete_flows(vec![1, 2, 3]).await;
            assert_eq!(deleted_amount.unwrap(), 0);
        })
    );

    redis_integration_test!(
        get_existing_flow_ids,
        (|_connection: FlowStore, mut service: FlowStoreService| async move {
            let amount = service.insert_flows(rest_flows(&[1, 2, 3])).await.unwrap();
            assert_eq!(amount, 3);

            // `KEYS` gives no ordering guarantee, so sort before comparing.
            let mut flow_ids = service.get_all_flow_ids().await.unwrap();
            flow_ids.sort();

            assert_eq!(flow_ids, vec![1, 2, 3]);
        })
    );

    redis_integration_test!(
        get_empty_flow_ids,
        (|_connection: FlowStore, mut service: FlowStoreService| async move {
            let flow_ids = service.get_all_flow_ids().await;
            assert_eq!(flow_ids.unwrap(), Vec::<i64>::new());
        })
    );

    redis_integration_test!(
        query_empty_flow_store,
        (|_connection: FlowStore, mut service: FlowStoreService| async move {
            let flows = service
                .query_flows(String::from("*"))
                .await
                .expect("querying an empty store should succeed");
            assert!(flows.flows.is_empty());
        })
    );

    redis_integration_test!(
        query_all_flows,
        (|_connection: FlowStore, mut service: FlowStoreService| async move {
            let initial = service
                .query_flows(String::from("*"))
                .await
                .expect("querying an empty store should succeed");
            assert!(initial.flows.is_empty());

            let flows = rest_flows(&[1, 2, 3]);

            let amount = service.insert_flows(flows.clone()).await.unwrap();
            assert_eq!(amount, 3);

            let query_flows = service.query_flows(String::from("*")).await;
            println!("{:?}", &query_flows);

            let queried = query_flows.expect("querying the stored flows should succeed");
            assert_eq!(flows.flows.len(), queried.flows.len());
        })
    );

    redis_integration_test!(
        query_one_existing_flow,
        (|_connection: FlowStore, mut service: FlowStoreService| async move {
            let initial = service
                .query_flows(String::from("*"))
                .await
                .expect("querying an empty store should succeed");
            assert!(initial.flows.is_empty());

            let amount = service.insert_flows(rest_flows(&[1, 2, 3])).await.unwrap();
            assert_eq!(amount, 3);

            let queried = service
                .query_flows(String::from("1::*"))
                .await
                .expect("querying a single flow should succeed");
            assert_eq!(queried.flows, vec![rest_flow(1)]);
        })
    );
}
717        })
718    );
719}