1use super::flow_identifier;
2use crate::flow_store::connection::FlowStore;
3use async_trait::async_trait;
4use log::error;
5use redis::{AsyncCommands, JsonAsyncCommands, RedisError, RedisResult};
6use tucana::shared::{Flow, Flows};
7
/// Error returned by the flow store service when a flow cannot be stored or read.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlowStoreError {
    /// Category of the failure.
    pub kind: FlowStoreErrorKind,
    /// Id of the flow the operation was working on; the service uses `0` when the
    /// failure is not tied to a single flow (for example while querying).
    pub flow_id: i64,
    /// Human-readable description of what went wrong.
    pub reason: String,
}

impl std::fmt::Display for FlowStoreError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{:?} error for flow {}: {}",
            self.kind, self.flow_id, self.reason
        )
    }
}

// Implementing `Error` lets callers box this type or propagate it with `?`
// alongside other error types.
impl std::error::Error for FlowStoreError {}

/// Categories of failures reported through [`FlowStoreError`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowStoreErrorKind {
    /// A stored JSON document could not be converted into a flow.
    Serialization,
    /// A Redis command failed.
    RedisOperation,
    /// No storage identifier could be derived for the flow.
    NoIdentifier,
}
21
22#[async_trait]
24pub trait FlowStoreServiceBase {
25 async fn new(redis_client_arc: FlowStore) -> Self;
26 async fn insert_flow(&mut self, flow: Flow) -> Result<i64, FlowStoreError>;
27 async fn insert_flows(&mut self, flows: Flows) -> Result<i64, FlowStoreError>;
28 async fn delete_flow(&mut self, flow_id: i64) -> Result<i64, RedisError>;
29 async fn delete_flows(&mut self, flow_ids: Vec<i64>) -> Result<i64, RedisError>;
30 async fn get_all_flow_ids(&mut self) -> Result<Vec<i64>, RedisError>;
31 async fn query_flows(&mut self, pattern: String) -> Result<Flows, FlowStoreError>;
32}
33
34#[derive(Clone)]
36pub struct FlowStoreService {
37 pub(crate) redis_client_arc: FlowStore,
38}
39
40#[async_trait]
42impl FlowStoreServiceBase for FlowStoreService {
43 async fn new(redis_client_arc: FlowStore) -> FlowStoreService {
44 FlowStoreService { redis_client_arc }
45 }
46
47 async fn insert_flow(&mut self, flow: Flow) -> Result<i64, FlowStoreError> {
49 let mut connection = self.redis_client_arc.lock().await;
50
51 let identifier = match flow_identifier::get_flow_identifier(&flow) {
52 Some(id) => id,
53 None => {
54 return Err(FlowStoreError {
55 kind: FlowStoreErrorKind::NoIdentifier,
56 flow_id: flow.flow_id,
57 reason: String::from("Identifier can't be determent!"),
58 });
59 }
60 };
61
62 let insert_result: RedisResult<()> = connection.json_set(identifier, "$", &flow).await;
63
64 match insert_result {
65 Err(redis_error) => {
66 error!("An Error occurred {}", redis_error);
67 Err(FlowStoreError {
68 flow_id: flow.flow_id,
69 kind: FlowStoreErrorKind::RedisOperation,
70 reason: redis_error.to_string(),
71 })
72 }
73 _ => Ok(1),
74 }
75 }
76
77 async fn insert_flows(&mut self, flows: Flows) -> Result<i64, FlowStoreError> {
79 let mut total_modified = 0;
80
81 for flow in flows.flows {
82 let result = self.insert_flow(flow).await?;
83 total_modified += result;
84 }
85
86 Ok(total_modified)
87 }
88
89 async fn delete_flow(&mut self, flow_id: i64) -> Result<i64, RedisError> {
91 let mut connection = self.redis_client_arc.lock().await;
92
93 let identifier = format!("{}::*", flow_id);
94 let keys: Vec<String> = connection.keys(&identifier).await?;
95 let deleted_flow: RedisResult<i64> = connection.json_del(keys, ".").await;
96
97 match deleted_flow {
98 Ok(int) => Ok(int),
99 Err(redis_error) => {
100 error!("An Error occurred {}", redis_error);
101 Err(redis_error)
102 }
103 }
104 }
105
106 async fn delete_flows(&mut self, flow_ids: Vec<i64>) -> Result<i64, RedisError> {
108 let mut total_modified = 0;
109
110 for id in flow_ids {
111 let result = self.delete_flow(id).await?;
112 total_modified += result;
113 }
114
115 Ok(total_modified)
116 }
117
118 async fn get_all_flow_ids(&mut self) -> Result<Vec<i64>, RedisError> {
121 let mut connection = self.redis_client_arc.lock().await;
122
123 let string_keys: Vec<String> = {
124 match connection.keys("*").await {
125 Ok(res) => res,
126 Err(error) => {
127 error!("Can't retrieve keys from redis. Reason: {error}");
128 return Err(error);
129 }
130 }
131 };
132
133 let mut real_keys: Vec<String> = vec![];
134
135 for key in string_keys {
136 if key.contains("::") {
137 let number = key.splitn(2, "::").next();
138 if let Some(real_number) = number {
139 real_keys.push(String::from(real_number));
140 }
141 }
142 }
143
144 let int_keys: Vec<i64> = real_keys
145 .into_iter()
146 .filter_map(|key| key.parse::<i64>().ok())
147 .collect();
148
149 Ok(int_keys)
150 }
151
152 async fn query_flows(&mut self, pattern: String) -> Result<Flows, FlowStoreError> {
153 let mut connection = self.redis_client_arc.lock().await;
154
155 let keys: Vec<String> = {
156 match connection.keys(pattern).await {
157 Ok(res) => res,
158 Err(error) => {
159 error!("Can't retrieve keys from redis. Reason: {error}");
160 return Err(FlowStoreError {
161 kind: FlowStoreErrorKind::RedisOperation,
162 flow_id: 0,
163 reason: error.detail().unwrap().to_string(),
164 });
165 }
166 }
167 };
168
169 if keys.is_empty() {
170 return Ok(Flows { flows: vec![] });
171 }
172
173 match connection
174 .json_get::<Vec<String>, &str, Vec<String>>(keys, "$")
175 .await
176 {
177 Ok(json_values) => {
178 let mut all_flows: Vec<Flow> = Vec::new();
179
180 for json_str in json_values {
181 match serde_json::from_str::<Vec<Flow>>(&json_str) {
182 Ok(mut flows) => all_flows.append(&mut flows),
183 Err(error) => {
184 return Err(FlowStoreError {
185 kind: FlowStoreErrorKind::Serialization,
186 flow_id: 0,
187 reason: error.to_string(),
188 });
189 }
190 }
191 }
192
193 return Ok(Flows { flows: all_flows });
194 }
195 Err(error) => {
196 return Err(FlowStoreError {
197 kind: FlowStoreErrorKind::RedisOperation,
198 flow_id: 0,
199 reason: error.detail().unwrap_or("Unknown Redis error").to_string(),
200 });
201 }
202 }
203 }
204}
205
206#[cfg(test)]
207mod tests {
208 use std::collections::HashMap;
209
210 use crate::flow_store::connection::create_flow_store_connection;
211 use crate::flow_store::connection::FlowStore;
212 use crate::flow_store::service::FlowStoreService;
213 use crate::flow_store::service::FlowStoreServiceBase;
214 use redis::{AsyncCommands, JsonAsyncCommands};
215 use serial_test::serial;
216 use testcontainers::core::IntoContainerPort;
217 use testcontainers::core::WaitFor;
218 use testcontainers::runners::AsyncRunner;
219 use testcontainers::GenericImage;
220 use tucana::shared::FlowSetting;
221 use tucana::shared::FlowSettingDefinition;
222 use tucana::shared::Struct;
223 use tucana::shared::{Flow, Flows};
224
225 fn get_string_value(value: &str) -> tucana::shared::Value {
226 tucana::shared::Value {
227 kind: Some(tucana::shared::value::Kind::StringValue(String::from(
228 value,
229 ))),
230 }
231 }
232
233 fn get_settings() -> Vec<FlowSetting> {
234 vec![
235 FlowSetting {
236 definition: Some(FlowSettingDefinition {
237 id: String::from("1424525"),
238 key: String::from("HTTP_HOST"),
239 }),
240 object: Some(Struct {
241 fields: {
242 let mut map = HashMap::new();
243 map.insert(String::from("host"), get_string_value("abc.code0.tech"));
244 map
245 },
246 }),
247 },
248 FlowSetting {
249 definition: Some(FlowSettingDefinition {
250 id: String::from("14245252352"),
251 key: String::from("HTTP_METHOD"),
252 }),
253 object: Some(Struct {
254 fields: {
255 let mut map = HashMap::new();
256 map.insert(String::from("method"), get_string_value("GET"));
257 map
258 },
259 }),
260 },
261 ]
262 }
263
264 macro_rules! redis_integration_test {
265 ($test_name:ident, $consumer:expr) => {
266 #[tokio::test]
267 #[serial]
268 async fn $test_name() {
269 let port: u16 = 6379;
270 let image_name = "redis/redis-stack";
271 let wait_message = "Ready to accept connections";
272
273 let container = GenericImage::new(image_name, "latest")
274 .with_exposed_port(port.tcp())
275 .with_wait_for(WaitFor::message_on_stdout(wait_message))
276 .start()
277 .await
278 .unwrap();
279
280 let host = container.get_host().await.unwrap();
281 let host_port = container.get_host_port_ipv4(port).await.unwrap();
282 let url = format!("redis://{host}:{host_port}");
283 println!("Redis server started correctly on: {}", url.clone());
284
285 let connection = create_flow_store_connection(url).await;
286
287 {
288 let mut con = connection.lock().await;
289
290 let _: () = redis::cmd("FLUSHALL")
291 .query_async(&mut **con)
292 .await
293 .expect("FLUSHALL command failed");
294 }
295
296 let base = FlowStoreService::new(connection.clone()).await;
297
298 $consumer(connection, base).await;
299 let _ = container.stop().await;
300 }
301 };
302 }
303
304 redis_integration_test!(
305 insert_one_flow,
306 (|connection: FlowStore, mut service: FlowStoreService| async move {
307 let flow = Flow {
308 flow_id: 1,
309 r#type: "REST".to_string(),
310 settings: get_settings(),
311 starting_node: None,
312 data_types: vec![],
313 input_type_identifier: None,
314 return_type_identifier: None,
315 project_id: 1,
316 };
317
318 match service.insert_flow(flow.clone()).await {
319 Ok(i) => println!("{}", i),
320 Err(err) => println!("{}", err.reason),
321 };
322
323 let redis_result: Option<String> = {
324 let mut redis_cmd = connection.lock().await;
325 redis_cmd
326 .json_get("1::1::REST::abc.code0.tech::GET", "$")
327 .await
328 .unwrap()
329 };
330
331 println!("{}", redis_result.clone().unwrap());
332
333 assert!(redis_result.is_some());
334 let redis_flow: Vec<Flow> = serde_json::from_str(&*redis_result.unwrap()).unwrap();
335 assert_eq!(redis_flow[0], flow);
336 })
337 );
338
339 redis_integration_test!(
340 insert_one_flow_fails_no_identifier,
341 (|_connection: FlowStore, mut service: FlowStoreService| async move {
342 let flow = Flow {
343 flow_id: 1,
344 r#type: "".to_string(),
345 settings: get_settings(),
346 starting_node: None,
347 data_types: vec![],
348 input_type_identifier: None,
349 return_type_identifier: None,
350 project_id: 1,
351 };
352
353 assert!(!service.insert_flow(flow.clone()).await.is_ok());
354 })
355 );
356
357 redis_integration_test!(
358 insert_will_overwrite_existing_flow,
359 (|connection: FlowStore, mut service: FlowStoreService| async move {
360 let flow = Flow {
361 flow_id: 1,
362 r#type: "REST".to_string(),
363 settings: get_settings(),
364 data_types: vec![],
365 input_type_identifier: None,
366 return_type_identifier: None,
367 project_id: 1,
368 starting_node: None,
369 };
370
371 match service.insert_flow(flow.clone()).await {
372 Ok(i) => println!("{}", i),
373 Err(err) => println!("{}", err.reason),
374 };
375
376 let flow_overwrite = Flow {
377 flow_id: 1,
378 r#type: "REST".to_string(),
379 settings: get_settings(),
380 data_types: vec![],
381 input_type_identifier: Some(String::from("ABC")),
382 return_type_identifier: None,
383 project_id: 1,
384 starting_node: None,
385 };
386
387 let _ = service.insert_flow(flow_overwrite).await;
388 let amount = service.get_all_flow_ids().await;
389 assert_eq!(amount.unwrap().len(), 1);
390
391 let redis_result: Vec<String> = {
392 let mut redis_cmd = connection.lock().await;
393 redis_cmd
394 .json_get("1::1::REST::abc.code0.tech::GET", "$")
395 .await
396 .unwrap()
397 };
398
399 assert_eq!(redis_result.len(), 1);
400 let string: &str = &*redis_result[0];
401 let redis_flow: Vec<Flow> = serde_json::from_str(string).unwrap();
402 assert!(redis_flow[0].r#input_type_identifier.is_some());
403 })
404 );
405
406 redis_integration_test!(
407 insert_many_flows,
408 (|_connection: FlowStore, mut service: FlowStoreService| async move {
409 let flow_one = Flow {
410 flow_id: 1,
411 r#type: "REST".to_string(),
412 settings: get_settings(),
413 data_types: vec![],
414 input_type_identifier: None,
415 return_type_identifier: None,
416 project_id: 1,
417 starting_node: None,
418 };
419
420 let flow_two = Flow {
421 flow_id: 2,
422 r#type: "REST".to_string(),
423 settings: get_settings(),
424 data_types: vec![],
425 input_type_identifier: None,
426 return_type_identifier: None,
427 project_id: 1,
428 starting_node: None,
429 };
430
431 let flow_three = Flow {
432 flow_id: 3,
433 r#type: "REST".to_string(),
434 settings: get_settings(),
435 starting_node: None,
436 data_types: vec![],
437 input_type_identifier: None,
438 return_type_identifier: None,
439 project_id: 1,
440 };
441
442 let flow_vec = vec![flow_one.clone(), flow_two.clone(), flow_three.clone()];
443 let flows = Flows { flows: flow_vec };
444
445 let amount = service.insert_flows(flows).await.unwrap();
446 assert_eq!(amount, 3);
447 })
448 );
449
450 redis_integration_test!(
451 delete_one_existing_flow,
452 (|connection: FlowStore, mut service: FlowStoreService| async move {
453 let flow = Flow {
454 flow_id: 1,
455 r#type: "REST".to_string(),
456 settings: get_settings(),
457 starting_node: None,
458 data_types: vec![],
459 input_type_identifier: None,
460 return_type_identifier: None,
461 project_id: 1,
462 };
463
464 match service.insert_flow(flow.clone()).await {
465 Ok(i) => println!("{}", i),
466 Err(err) => println!("{}", err.reason),
467 };
468
469 let result = service.delete_flow(1).await;
470
471 assert_eq!(result.unwrap(), 1);
472
473 let redis_result: Option<String> = {
474 let mut redis_cmd = connection.lock().await;
475 redis_cmd.get("1").await.unwrap()
476 };
477
478 assert!(redis_result.is_none());
479 })
480 );
481
482 redis_integration_test!(
483 delete_one_non_existing_flow,
484 (|_connection: FlowStore, mut service: FlowStoreService| async move {
485 let result = service.delete_flow(1).await;
486 assert_eq!(result.unwrap(), 0);
487 })
488 );
489
490 redis_integration_test!(
491 delete_many_existing_flows,
492 (|_connection: FlowStore, mut service: FlowStoreService| async move {
493 let flow_one = Flow {
494 flow_id: 1,
495 r#type: "REST".to_string(),
496 settings: get_settings(),
497 starting_node: None,
498 data_types: vec![],
499 input_type_identifier: None,
500 return_type_identifier: None,
501 project_id: 1,
502 };
503
504 let flow_two = Flow {
505 flow_id: 2,
506 r#type: "REST".to_string(),
507 settings: get_settings(),
508 starting_node: None,
509 data_types: vec![],
510 input_type_identifier: None,
511 return_type_identifier: None,
512 project_id: 1,
513 };
514
515 let flow_three = Flow {
516 flow_id: 3,
517 r#type: "REST".to_string(),
518 settings: get_settings(),
519 starting_node: None,
520 data_types: vec![],
521 input_type_identifier: None,
522 return_type_identifier: None,
523 project_id: 1,
524 };
525
526 let flow_vec = vec![flow_one.clone(), flow_two.clone(), flow_three.clone()];
527 let flows = Flows { flows: flow_vec };
528
529 let amount = service.insert_flows(flows).await.unwrap();
530 assert_eq!(amount, 3);
531
532 let deleted_amount = service.delete_flows(vec![1, 2, 3]).await;
533 assert_eq!(deleted_amount.unwrap(), 3);
534 })
535 );
536
537 redis_integration_test!(
538 delete_many_non_existing_flows,
539 (|_connection: FlowStore, mut service: FlowStoreService| async move {
540 let deleted_amount = service.delete_flows(vec![1, 2, 3]).await;
541 assert_eq!(deleted_amount.unwrap(), 0);
542 })
543 );
544
545 redis_integration_test!(
546 get_existing_flow_ids,
547 (|_connection: FlowStore, mut service: FlowStoreService| async move {
548 let flow_one = Flow {
549 flow_id: 1,
550 r#type: "REST".to_string(),
551 settings: get_settings(),
552 starting_node: None,
553 data_types: vec![],
554 input_type_identifier: None,
555 return_type_identifier: None,
556 project_id: 1,
557 };
558
559 let flow_two = Flow {
560 flow_id: 2,
561 r#type: "REST".to_string(),
562 settings: get_settings(),
563 starting_node: None,
564 data_types: vec![],
565 input_type_identifier: None,
566 return_type_identifier: None,
567 project_id: 1,
568 };
569
570 let flow_three = Flow {
571 flow_id: 3,
572 r#type: "REST".to_string(),
573 settings: get_settings(),
574 starting_node: None,
575 data_types: vec![],
576 input_type_identifier: None,
577 return_type_identifier: None,
578 project_id: 1,
579 };
580
581 let flow_vec = vec![flow_one.clone(), flow_two.clone(), flow_three.clone()];
582 let flows = Flows { flows: flow_vec };
583
584 let amount = service.insert_flows(flows).await.unwrap();
585 assert_eq!(amount, 3);
586
587 let mut flow_ids = service.get_all_flow_ids().await.unwrap();
588 flow_ids.sort();
589
590 assert_eq!(flow_ids, vec![1, 2, 3]);
591 })
592 );
593
594 redis_integration_test!(
595 get_empty_flow_ids,
596 (|_connection: FlowStore, mut service: FlowStoreService| async move {
597 let flow_ids = service.get_all_flow_ids().await;
598 assert_eq!(flow_ids.unwrap(), Vec::<i64>::new());
599 })
600 );
601
602 redis_integration_test!(
603 query_empty_flow_store,
604 (|_connection: FlowStore, mut service: FlowStoreService| async move {
605 let flows = service.query_flows(String::from("*")).await;
606 assert!(flows.is_ok());
607 assert!(flows.unwrap().flows.is_empty());
608 })
609 );
610
611 redis_integration_test!(
612 query_all_flows,
613 (|_connection: FlowStore, mut service: FlowStoreService| async move {
614 let flow_one = Flow {
615 flow_id: 1,
616 r#type: "REST".to_string(),
617 settings: get_settings(),
618 starting_node: None,
619 data_types: vec![],
620 input_type_identifier: None,
621 return_type_identifier: None,
622 project_id: 1,
623 };
624
625 let flow_two = Flow {
626 flow_id: 2,
627 r#type: "REST".to_string(),
628 settings: get_settings(),
629 starting_node: None,
630 data_types: vec![],
631 input_type_identifier: None,
632 return_type_identifier: None,
633 project_id: 1,
634 };
635
636 let flow_three = Flow {
637 flow_id: 3,
638 r#type: "REST".to_string(),
639 settings: get_settings(),
640 starting_node: None,
641 data_types: vec![],
642 input_type_identifier: None,
643 return_type_identifier: None,
644 project_id: 1,
645 };
646
647 let flows = service.query_flows(String::from("*")).await;
648 assert!(flows.is_ok());
649 assert!(flows.unwrap().flows.is_empty());
650
651 let flow_vec = vec![flow_one.clone(), flow_two.clone(), flow_three.clone()];
652 let flows = Flows { flows: flow_vec };
653
654 let amount = service.insert_flows(flows.clone()).await.unwrap();
655 assert_eq!(amount, 3);
656
657 let query_flows = service.query_flows(String::from("*")).await;
658
659 println!("{:?}", &query_flows);
660
661 assert!(query_flows.is_ok());
662
663 assert_eq!(flows.flows.len(), query_flows.unwrap().flows.len())
664 })
665 );
666
667 redis_integration_test!(
668 query_one_existing_flow,
669 (|_connection: FlowStore, mut service: FlowStoreService| async move {
670 let flow_one = Flow {
671 flow_id: 1,
672 r#type: "REST".to_string(),
673 settings: get_settings(),
674 starting_node: None,
675 data_types: vec![],
676 input_type_identifier: None,
677 return_type_identifier: None,
678 project_id: 1,
679 };
680
681 let flow_two = Flow {
682 flow_id: 2,
683 r#type: "REST".to_string(),
684 settings: get_settings(),
685 starting_node: None,
686 data_types: vec![],
687 input_type_identifier: None,
688 return_type_identifier: None,
689 project_id: 1,
690 };
691
692 let flow_three = Flow {
693 flow_id: 3,
694 r#type: "REST".to_string(),
695 settings: get_settings(),
696 starting_node: None,
697 data_types: vec![],
698 input_type_identifier: None,
699 return_type_identifier: None,
700 project_id: 1,
701 };
702
703 let flows = service.query_flows(String::from("*")).await;
704 assert!(flows.is_ok());
705 assert!(flows.unwrap().flows.is_empty());
706
707 let flow_vec = vec![flow_one.clone(), flow_two.clone(), flow_three.clone()];
708 let flows = Flows { flows: flow_vec };
709
710 let amount = service.insert_flows(flows.clone()).await.unwrap();
711 assert_eq!(amount, 3);
712
713 let query_flows = service.query_flows(String::from("1::*")).await;
714
715 assert!(query_flows.is_ok());
716 assert_eq!(query_flows.unwrap().flows, vec![flow_one])
717 })
718 );
719}