amqp_api_server/api/
init.rs1use std::sync::Arc;
2use multiple_connections_lapin_wrapper::amqp_wrapper::AmqpWrapper;
3
4use crate::api::initialization_package::InitializationPackage;
5use crate::api::input::amqp_request_dispatch::AmqpRequestDispatch;
6use crate::api::input::authorizer::try_generate_authorizer;
7use crate::error::{Error, ErrorKind};
8
9use super::output::amqp_output_router::AmqpOutputRouter;
10
11pub async fn initialize<LogicRequestType: Send + 'static>(
12 package: InitializationPackage<LogicRequestType>,
13) -> Result<(), Error> {
14 let logic_request_sender = package.logic_request_sender();
15
16 let api = package.api;
17
18 let input_registration = package.input_registration;
19 let input_elements = input_registration(&api)?;
20
21 let config = package.config;
22 let authorizer = Arc::new(try_generate_authorizer(config.openid_connect).await?);
23
24 let connect_config = config.amqp_connect_config;
25 let mut amqp_wrapper =
26 match AmqpWrapper::try_new(connect_config) {
27 Ok(amqp_wrapper) => amqp_wrapper,
28 Err(error) => return Err(Error::new(ErrorKind::InternalFailure, format!("failed to initialize amqp wrapper: {}", error))),
29 };
30
31 let state_tracker_client = package.state_tracker_client;
32
33 for input_element in input_elements {
34 let channel = match amqp_wrapper.try_get_channel().await {
35 Ok(channel) => channel,
36 Err(error) => return Err(Error::new(ErrorKind::InternalFailure, format!("failed to get channel: {}", error))),
37 };
38
39 let dispatch =
40 AmqpRequestDispatch::new(channel, input_element, authorizer.clone(), logic_request_sender.clone(), state_tracker_client.clone());
41
42 tokio::spawn(dispatch.run());
43 }
44
45 let output_registration = package.output_registration;
46 let output_elements = output_registration(&api, state_tracker_client.clone())?;
47 let output_channel = match amqp_wrapper.try_get_channel().await {
48 Ok(channel) => channel,
49 Err(error) => return Err(Error::new(ErrorKind::InternalFailure, format!("failed to get channel: {}", error))),
50 };
51
52 let output_router = AmqpOutputRouter::new(
53 output_channel,
54 output_elements,
55 package.output_receiver,
56 );
57
58 tokio::spawn(output_router.run());
59
60 Ok(())
61}