cp_microservice/impl/
init.rs1use std::mem::Discriminant;
2use std::{collections::HashMap, sync::Arc};
3
4use async_channel::Sender;
5use futures_util::future::join_all;
6use multiple_connections_lapin_wrapper::{
7 amqp_wrapper::AmqpWrapper, config::amqp_connect_config::AmqpConnectConfig,
8};
9use tokio_util::sync::CancellationToken;
10
11use crate::api::server::input::action::Action;
12use crate::r#impl::api::shared::amqp_api_entry::AmqpApiEntry;
13use crate::r#impl::process_signals::listen_to_process_signals;
14use crate::{
15 api::server::input::input_plugin::InputPlugin,
16 r#impl::api::server::input::amqp_input::AmqpInput,
17};
18
19pub struct ApiInitializationPackage<LogicRequestType: 'static + Send + Sync + std::fmt::Debug> {
20 pub amqp_connection_config: AmqpConnectConfig,
21 pub amqp_api: Vec<AmqpApiEntry>,
22 pub actions: HashMap<String, Action<LogicRequestType>>,
23 pub plugins: Vec<Arc<dyn InputPlugin + Send + Sync>>,
24}
25
26pub struct LogicInitializationPackage<
27 LogicRequestType: 'static + Send + Sync + std::fmt::Debug,
28 StorageRequestType: 'static + Send + Sync,
29> {
30 pub executors: HashMap<
31 Discriminant<LogicRequestType>,
32 crate::logic::executor::Executor<LogicRequestType, StorageRequestType>,
33 >,
34 pub storage_request_sender: Sender<StorageRequestType>,
35}
36
37pub async fn try_initialize_microservice<
38 LogicRequestType: 'static + Send + Sync + std::fmt::Debug,
39 StorageRequestType: 'static + Send + Sync + std::fmt::Debug,
40>(
41 api_initialization_package: ApiInitializationPackage<LogicRequestType>,
42 logic_initialization_package: LogicInitializationPackage<LogicRequestType, StorageRequestType>,
43) -> Result<(), std::io::Error> {
44 let cancellation_token = CancellationToken::new();
45
46 listen_to_process_signals(cancellation_token.clone());
47
48 let amqp_wrapper = match AmqpWrapper::try_new(api_initialization_package.amqp_connection_config)
49 {
50 Ok(amqp_wrapper) => amqp_wrapper,
51 Err(error) => {
52 return Err(std::io::Error::new(
53 std::io::ErrorKind::Other,
54 format!("failed to create AMQP wrapper: {}", error),
55 ))
56 }
57 };
58
59 let amqp_inputs =
60 generate_inputs_from_api(amqp_wrapper, api_initialization_package.amqp_api).await?;
61
62 let (logic_request_sender, logic_request_receiver) =
63 async_channel::bounded::<LogicRequestType>(1024usize);
64
65 let api_dispatch: crate::api::server::dispatch::Dispatch<AmqpInput, LogicRequestType> =
66 crate::api::server::dispatch::Dispatch::new(
67 amqp_inputs,
68 api_initialization_package.actions,
69 logic_request_sender,
70 api_initialization_package.plugins,
71 );
72
73 let api_cancellation_token = cancellation_token.clone();
74 tokio::spawn(async move {
75 let handles = api_dispatch.run(api_cancellation_token).await;
77
78 join_all(handles).await;
79
80 std::process::exit(0);
81 });
82
83 let logic_dispatch: crate::logic::dispatch::Dispatch<LogicRequestType, StorageRequestType> =
84 crate::logic::dispatch::Dispatch::new(
85 logic_request_receiver,
86 logic_initialization_package.executors,
87 logic_initialization_package.storage_request_sender,
88 cancellation_token,
89 );
90
91 tokio::spawn(logic_dispatch.run());
92
93 Ok(())
94}
95
96async fn generate_inputs_from_api(
97 mut amqp_wrapper: AmqpWrapper,
98 amqp_api: Vec<AmqpApiEntry>,
99) -> Result<Vec<AmqpInput>, std::io::Error> {
100 let mut inputs: Vec<AmqpInput> = Vec::new();
101
102 for amqp_api_entry in amqp_api {
103 let channel = match amqp_wrapper.try_get_channel().await {
104 Ok(channel) => channel,
105 Err(error) => {
106 return Err(std::io::Error::new(
107 std::io::ErrorKind::Other,
108 format!("failed to get AMQP channel: {}", &error),
109 ))
110 }
111 };
112
113 let amqp_input = match AmqpInput::try_new(channel, amqp_api_entry.amqp_queue_consumer).await
114 {
115 Ok(amqp_input) => amqp_input,
116 Err(error) => {
117 return Err(std::io::Error::new(
118 std::io::ErrorKind::Other,
119 format!("failed to create AMQP input: {}", &error),
120 ))
121 }
122 };
123
124 inputs.push(amqp_input);
125 }
126
127 Ok(inputs)
128}