cp_microservice/impl/
init.rs

1use std::mem::Discriminant;
2use std::{collections::HashMap, sync::Arc};
3
4use async_channel::Sender;
5use futures_util::future::join_all;
6use multiple_connections_lapin_wrapper::{
7    amqp_wrapper::AmqpWrapper, config::amqp_connect_config::AmqpConnectConfig,
8};
9use tokio_util::sync::CancellationToken;
10
11use crate::api::server::input::action::Action;
12use crate::r#impl::api::shared::amqp_api_entry::AmqpApiEntry;
13use crate::r#impl::process_signals::listen_to_process_signals;
14use crate::{
15    api::server::input::input_plugin::InputPlugin,
16    r#impl::api::server::input::amqp_input::AmqpInput,
17};
18
19pub struct ApiInitializationPackage<LogicRequestType: 'static + Send + Sync + std::fmt::Debug> {
20    pub amqp_connection_config: AmqpConnectConfig,
21    pub amqp_api: Vec<AmqpApiEntry>,
22    pub actions: HashMap<String, Action<LogicRequestType>>,
23    pub plugins: Vec<Arc<dyn InputPlugin + Send + Sync>>,
24}
25
26pub struct LogicInitializationPackage<
27    LogicRequestType: 'static + Send + Sync + std::fmt::Debug,
28    StorageRequestType: 'static + Send + Sync,
29> {
30    pub executors: HashMap<
31        Discriminant<LogicRequestType>,
32        crate::logic::executor::Executor<LogicRequestType, StorageRequestType>,
33    >,
34    pub storage_request_sender: Sender<StorageRequestType>,
35}
36
37pub async fn try_initialize_microservice<
38    LogicRequestType: 'static + Send + Sync + std::fmt::Debug,
39    StorageRequestType: 'static + Send + Sync + std::fmt::Debug,
40>(
41    api_initialization_package: ApiInitializationPackage<LogicRequestType>,
42    logic_initialization_package: LogicInitializationPackage<LogicRequestType, StorageRequestType>,
43) -> Result<(), std::io::Error> {
44    let cancellation_token = CancellationToken::new();
45
46    listen_to_process_signals(cancellation_token.clone());
47
48    let amqp_wrapper = match AmqpWrapper::try_new(api_initialization_package.amqp_connection_config)
49    {
50        Ok(amqp_wrapper) => amqp_wrapper,
51        Err(error) => {
52            return Err(std::io::Error::new(
53                std::io::ErrorKind::Other,
54                format!("failed to create AMQP wrapper: {}", error),
55            ))
56        }
57    };
58
59    let amqp_inputs =
60        generate_inputs_from_api(amqp_wrapper, api_initialization_package.amqp_api).await?;
61
62    let (logic_request_sender, logic_request_receiver) =
63        async_channel::bounded::<LogicRequestType>(1024usize);
64
65    let api_dispatch: crate::api::server::dispatch::Dispatch<AmqpInput, LogicRequestType> =
66        crate::api::server::dispatch::Dispatch::new(
67            amqp_inputs,
68            api_initialization_package.actions,
69            logic_request_sender,
70            api_initialization_package.plugins,
71        );
72
73    let api_cancellation_token = cancellation_token.clone();
74    tokio::spawn(async move {
75        // when handles have finished, the program will exit since an exit signal is sent to the process
76        let handles = api_dispatch.run(api_cancellation_token).await;
77
78        join_all(handles).await;
79
80        std::process::exit(0);
81    });
82
83    let logic_dispatch: crate::logic::dispatch::Dispatch<LogicRequestType, StorageRequestType> =
84        crate::logic::dispatch::Dispatch::new(
85            logic_request_receiver,
86            logic_initialization_package.executors,
87            logic_initialization_package.storage_request_sender,
88            cancellation_token,
89        );
90
91    tokio::spawn(logic_dispatch.run());
92
93    Ok(())
94}
95
96async fn generate_inputs_from_api(
97    mut amqp_wrapper: AmqpWrapper,
98    amqp_api: Vec<AmqpApiEntry>,
99) -> Result<Vec<AmqpInput>, std::io::Error> {
100    let mut inputs: Vec<AmqpInput> = Vec::new();
101
102    for amqp_api_entry in amqp_api {
103        let channel = match amqp_wrapper.try_get_channel().await {
104            Ok(channel) => channel,
105            Err(error) => {
106                return Err(std::io::Error::new(
107                    std::io::ErrorKind::Other,
108                    format!("failed to get AMQP channel: {}", &error),
109                ))
110            }
111        };
112
113        let amqp_input = match AmqpInput::try_new(channel, amqp_api_entry.amqp_queue_consumer).await
114        {
115            Ok(amqp_input) => amqp_input,
116            Err(error) => {
117                return Err(std::io::Error::new(
118                    std::io::ErrorKind::Other,
119                    format!("failed to create AMQP input: {}", &error),
120                ))
121            }
122        };
123
124        inputs.push(amqp_input);
125    }
126
127    Ok(inputs)
128}