amqp_api_server/api/
init.rs

1use std::sync::Arc;
2use multiple_connections_lapin_wrapper::amqp_wrapper::AmqpWrapper;
3
4use crate::api::initialization_package::InitializationPackage;
5use crate::api::input::amqp_request_dispatch::AmqpRequestDispatch;
6use crate::api::input::authorizer::try_generate_authorizer;
7use crate::error::{Error, ErrorKind};
8
9use super::output::amqp_output_router::AmqpOutputRouter;
10
11pub async fn initialize<LogicRequestType: Send + 'static>(
12    package: InitializationPackage<LogicRequestType>,
13) -> Result<(), Error> {
14    let logic_request_sender = package.logic_request_sender();
15
16    let api = package.api;
17
18    let input_registration = package.input_registration;
19    let input_elements = input_registration(&api)?;
20
21    let config = package.config;
22    let authorizer = Arc::new(try_generate_authorizer(config.openid_connect).await?);
23
24    let connect_config = config.amqp_connect_config;
25    let mut amqp_wrapper =
26        match AmqpWrapper::try_new(connect_config) {
27            Ok(amqp_wrapper) => amqp_wrapper,
28            Err(error) => return Err(Error::new(ErrorKind::InternalFailure, format!("failed to initialize amqp wrapper: {}", error))),
29        };
30
31    let state_tracker_client = package.state_tracker_client;
32
33    for input_element in input_elements {
34        let channel = match amqp_wrapper.try_get_channel().await {
35            Ok(channel) => channel,
36            Err(error) => return Err(Error::new(ErrorKind::InternalFailure, format!("failed to get channel: {}", error))),
37        };
38
39        let dispatch =
40            AmqpRequestDispatch::new(channel, input_element, authorizer.clone(), logic_request_sender.clone(), state_tracker_client.clone());
41
42        tokio::spawn(dispatch.run());
43    }
44
45    let output_registration = package.output_registration;
46    let output_elements = output_registration(&api, state_tracker_client.clone())?;
47    let output_channel = match amqp_wrapper.try_get_channel().await {
48        Ok(channel) => channel,
49        Err(error) => return Err(Error::new(ErrorKind::InternalFailure, format!("failed to get channel: {}", error))),
50    };
51
52    let output_router = AmqpOutputRouter::new(
53        output_channel,
54        output_elements,
55        package.output_receiver,
56    );
57
58    tokio::spawn(output_router.run());
59
60    Ok(())
61}