1use crate::flow_store::connection::FlowStore;
2use async_trait::async_trait;
3use log::error;
4use redis::{AsyncCommands, JsonAsyncCommands, RedisError, RedisResult};
5use tucana::shared::{Flow, Flows};
6
/// Error returned when a flow could not be written to the flow store.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated with `?` into `Box<dyn Error>`-style error types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlowStoreError {
    /// Category of the failure.
    pub kind: FlowStoreErrorKind,
    /// Identifier of the flow the failed operation was working on.
    pub flow_id: i64,
    /// Textual description of the underlying failure.
    pub reason: String,
}

impl std::fmt::Display for FlowStoreError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{} error for flow {}: {}",
            self.kind, self.flow_id, self.reason
        )
    }
}

impl std::error::Error for FlowStoreError {}

/// Categories of failures reported through [`FlowStoreError`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowStoreErrorKind {
    /// Presumably a failure while serializing a flow; not constructed in this
    /// file — TODO confirm against other users of this enum.
    Serialization,
    /// A Redis command returned an error.
    RedisOperation,
}

impl std::fmt::Display for FlowStoreErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            FlowStoreErrorKind::Serialization => "serialization",
            FlowStoreErrorKind::RedisOperation => "redis operation",
        };
        f.write_str(label)
    }
}
19
/// Asynchronous operations for inserting, deleting and listing flows in the
/// flow store.
///
/// Implemented in this file by `FlowStoreService`, which backs every
/// operation with Redis commands.
#[async_trait]
pub trait FlowStoreServiceBase {
    /// Creates a service from the given flow store connection handle.
    async fn new(redis_client_arc: FlowStore) -> Self;
    /// Inserts a single flow.
    ///
    /// Returns a count on success (the `FlowStoreService` implementation
    /// returns `1`) or a `FlowStoreError` describing the failure.
    async fn insert_flow(&mut self, flow: Flow) -> Result<i64, FlowStoreError>;
    /// Inserts every flow contained in `flows` and returns the summed count.
    async fn insert_flows(&mut self, flows: Flows) -> Result<i64, FlowStoreError>;
    /// Deletes the flow with the given id and returns the number of deletions
    /// reported by the store.
    async fn delete_flow(&mut self, flow_id: i64) -> Result<i64, RedisError>;
    /// Deletes every flow listed in `flow_ids` and returns the summed count.
    async fn delete_flows(&mut self, flow_ids: Vec<i64>) -> Result<i64, RedisError>;
    /// Returns the ids of all flows currently held in the store.
    async fn get_all_flow_ids(&mut self) -> Result<Vec<i64>, RedisError>;
}
30
/// Flow store service that wraps a flow store connection handle.
///
/// Cloning the service clones the wrapped `FlowStore` handle (presumably an
/// `Arc`-backed handle, so clones share one connection — confirm in
/// `flow_store::connection`).
#[derive(Clone)]
pub struct FlowStoreService {
    /// Connection handle; it is locked with `.lock().await` before each
    /// Redis command is issued.
    pub(crate) redis_client_arc: FlowStore,
}
36
37#[async_trait]
39impl FlowStoreServiceBase for FlowStoreService {
40 async fn new(redis_client_arc: FlowStore) -> FlowStoreService {
41 FlowStoreService { redis_client_arc }
42 }
43
44 async fn insert_flow(&mut self, flow: Flow) -> Result<i64, FlowStoreError> {
46 let mut connection = self.redis_client_arc.lock().await;
47
48 let insert_result: RedisResult<()> = connection
49 .json_set(flow.flow_id.to_string(), "$", &flow)
50 .await;
51
52 match insert_result {
53 Err(redis_error) => {
54 error!("An Error occurred {}", redis_error);
55 Err(FlowStoreError {
56 flow_id: flow.flow_id,
57 kind: FlowStoreErrorKind::RedisOperation,
58 reason: redis_error.to_string(),
59 })
60 }
61 _ => Ok(1),
62 }
63 }
64
65 async fn insert_flows(&mut self, flows: Flows) -> Result<i64, FlowStoreError> {
67 let mut total_modified = 0;
68
69 for flow in flows.flows {
70 let result = self.insert_flow(flow).await?;
71 total_modified += result;
72 }
73
74 Ok(total_modified)
75 }
76
77 async fn delete_flow(&mut self, flow_id: i64) -> Result<i64, RedisError> {
79 let mut connection = self.redis_client_arc.lock().await;
80 let deleted_flow: RedisResult<i64> = connection.json_del(flow_id, ".").await;
81
82 match deleted_flow {
83 Ok(int) => Ok(int),
84 Err(redis_error) => {
85 error!("An Error occurred {}", redis_error);
86 Err(redis_error)
87 }
88 }
89 }
90
91 async fn delete_flows(&mut self, flow_ids: Vec<i64>) -> Result<i64, RedisError> {
93 let mut total_modified = 0;
94
95 for id in flow_ids {
96 let result = self.delete_flow(id).await?;
97 total_modified += result;
98 }
99
100 Ok(total_modified)
101 }
102
103 async fn get_all_flow_ids(&mut self) -> Result<Vec<i64>, RedisError> {
106 let mut connection = self.redis_client_arc.lock().await;
107
108 let string_keys: Vec<String> = {
109 match connection.keys("*").await {
110 Ok(res) => res,
111 Err(error) => {
112 error!("Can't retrieve keys from redis. Reason: {error}");
113 return Err(error);
114 }
115 }
116 };
117
118 let int_keys: Vec<i64> = string_keys
119 .into_iter()
120 .filter_map(|key| key.parse::<i64>().ok())
121 .collect();
122
123 Ok(int_keys)
124 }
125}
126
/// Integration tests for `FlowStoreService`.
///
/// Each test starts its own `redis/redis-stack` container, flushes it and
/// runs serially (see the `redis_integration_test!` macro below).
#[cfg(test)]
mod tests {
    use crate::flow_store::connection::create_flow_store_connection;
    use crate::flow_store::connection::FlowStore;
    use crate::flow_store::service::FlowStoreService;
    use crate::flow_store::service::FlowStoreServiceBase;
    use redis::{AsyncCommands, JsonAsyncCommands};
    use serial_test::serial;
    use testcontainers::core::IntoContainerPort;
    use testcontainers::core::WaitFor;
    use testcontainers::runners::AsyncRunner;
    use testcontainers::GenericImage;
    use tucana::shared::{Flow, Flows};

    /// Builds the minimal flow fixture used by every test: only `flow_id`
    /// varies, every other field is empty/`None` and `project_id` is `1`.
    fn sample_flow(flow_id: i64) -> Flow {
        Flow {
            flow_id,
            r#type: "".to_string(),
            settings: vec![],
            starting_node: None,
            data_types: vec![],
            input_type_identifier: None,
            return_type_identifier: None,
            project_id: 1,
        }
    }

    /// Wraps one `sample_flow` per id into a `Flows` message.
    fn sample_flows(flow_ids: &[i64]) -> Flows {
        Flows {
            flows: flow_ids.iter().copied().map(sample_flow).collect(),
        }
    }

    // Generates a serial tokio test that starts a Redis container, flushes
    // it, builds a `FlowStoreService` on top of it and hands both the raw
    // connection and the service to `$consumer`.
    macro_rules! redis_integration_test {
        ($test_name:ident, $consumer:expr) => {
            #[tokio::test]
            #[serial]
            async fn $test_name() {
                let port: u16 = 6379;
                let image_name = "redis/redis-stack";
                let wait_message = "Ready to accept connections";

                let container = GenericImage::new(image_name, "latest")
                    .with_exposed_port(port.tcp())
                    .with_wait_for(WaitFor::message_on_stdout(wait_message))
                    .start()
                    .await
                    .unwrap();

                let host = container.get_host().await.unwrap();
                let host_port = container.get_host_port_ipv4(port).await.unwrap();
                let url = format!("redis://{host}:{host_port}");
                println!("Redis server started correctly on: {}", url);

                let connection = create_flow_store_connection(url).await;

                {
                    // Scoped so the lock is released before the test body runs.
                    let mut con = connection.lock().await;

                    let _: () = redis::cmd("FLUSHALL")
                        .query_async(&mut **con)
                        .await
                        .expect("FLUSHALL command failed");
                }

                let base = FlowStoreService::new(connection.clone()).await;

                $consumer(connection, base).await;
                let _ = container.stop().await;
            }
        };
    }

    redis_integration_test!(
        insert_one_flow,
        (|connection: FlowStore, mut service: FlowStoreService| async move {
            let flow = sample_flow(1);

            // Fail loudly on an insert error instead of only printing it.
            let inserted = service
                .insert_flow(flow.clone())
                .await
                .expect("inserting the flow failed");
            assert_eq!(inserted, 1);

            let redis_result: Option<String> = {
                let mut redis_cmd = connection.lock().await;
                redis_cmd.json_get("1", "$").await.unwrap()
            };

            let stored = redis_result.expect("flow 1 should be stored in redis");
            println!("{}", stored);

            // A `$` path query returns a JSON array of matches.
            let redis_flow: Vec<Flow> = serde_json::from_str(&stored).unwrap();
            assert_eq!(redis_flow[0], flow);
        })
    );

    redis_integration_test!(
        insert_many_flows,
        (|_connection: FlowStore, mut service: FlowStoreService| async move {
            let amount = service.insert_flows(sample_flows(&[1, 2, 3])).await.unwrap();
            assert_eq!(amount, 3);
        })
    );

    redis_integration_test!(
        delete_one_existing_flow,
        (|connection: FlowStore, mut service: FlowStoreService| async move {
            let inserted = service
                .insert_flow(sample_flow(1))
                .await
                .expect("inserting the flow failed");
            assert_eq!(inserted, 1);

            let result = service.delete_flow(1).await;
            assert_eq!(result.unwrap(), 1);

            // EXISTS works for any key type, whereas GET on a JSON key would
            // answer with a WRONGTYPE error if the key were still present.
            let still_exists: bool = {
                let mut redis_cmd = connection.lock().await;
                redis_cmd.exists("1").await.unwrap()
            };

            assert!(!still_exists);
        })
    );

    redis_integration_test!(
        delete_one_non_existing_flow,
        (|_connection: FlowStore, mut service: FlowStoreService| async move {
            let result = service.delete_flow(1).await;
            assert_eq!(result.unwrap(), 0);
        })
    );

    redis_integration_test!(
        delete_many_existing_flows,
        (|_connection: FlowStore, mut service: FlowStoreService| async move {
            let amount = service.insert_flows(sample_flows(&[1, 2, 3])).await.unwrap();
            assert_eq!(amount, 3);

            let deleted_amount = service.delete_flows(vec![1, 2, 3]).await;
            assert_eq!(deleted_amount.unwrap(), 3);
        })
    );

    redis_integration_test!(
        delete_many_non_existing_flows,
        (|_connection: FlowStore, mut service: FlowStoreService| async move {
            let deleted_amount = service.delete_flows(vec![1, 2, 3]).await;
            assert_eq!(deleted_amount.unwrap(), 0);
        })
    );

    redis_integration_test!(
        get_existing_flow_ids,
        (|_connection: FlowStore, mut service: FlowStoreService| async move {
            let amount = service.insert_flows(sample_flows(&[1, 2, 3])).await.unwrap();
            assert_eq!(amount, 3);

            // The key listing has no guaranteed order, so sort before comparing.
            let mut flow_ids = service.get_all_flow_ids().await.unwrap();
            flow_ids.sort();

            assert_eq!(flow_ids, vec![1, 2, 3]);
        })
    );

    redis_integration_test!(
        get_empty_flow_ids,
        (|_connection: FlowStore, mut service: FlowStoreService| async move {
            let flow_ids = service.get_all_flow_ids().await;
            assert_eq!(flow_ids.unwrap(), Vec::<i64>::new());
        })
    );
}