amqp_api_server/api/output/
amqp_output_element.rs

1use std::sync::Arc;
2use state_tracker::state::State;
3use state_tracker::state_tracker_client::StateTrackerClient;
4
5use lapin::Channel;
6use serde_json::Value;
7use tokio::sync::mpsc::Receiver;
8use crate::config::amqp_output_api::AmqpOutputApi;
9
10pub struct AmqpOutputElement {
11    name: String,
12    output_config: AmqpOutputApi,
13    state_tracker: StateTrackerClient,
14}
15
16impl AmqpOutputElement {
17    pub fn new(name: String, output_config: AmqpOutputApi, mut state_tracker: StateTrackerClient) -> AmqpOutputElement {
18        state_tracker.set_id(name.clone());
19
20        AmqpOutputElement {
21            name,
22            output_config,
23            state_tracker,
24        }
25    }
26
27    pub fn name(&self) -> &str {
28        self.name.as_str()
29    }
30
31    pub fn output_config(&self) -> &AmqpOutputApi {
32        &self.output_config
33    }
34
35    pub fn owned_output_config(self) -> AmqpOutputApi {
36        self.output_config
37    }
38}
39
40impl AmqpOutputElement {
41    pub async fn run(self, channel: Arc<Channel>, mut receiver: Receiver<Value>) {
42        let queue = self.output_config.queue();
43
44        match channel
45            .queue_declare(
46                queue.name(),
47                *queue.declare().options(),
48                queue.declare().arguments().clone(),
49            )
50            .await
51        {
52            Ok(_) => (),
53            Err(error) => {
54                handle_error(format!("failed to declare queue for output element '{}': '{}'",
55                                     self.name,
56                                     error), &self.state_tracker).await;
57
58                return;
59            }
60        };
61
62        loop {
63            let data = match receiver.recv().await {
64                Some(data) => data,
65                None => {
66                    log::info!("received none from receiver");
67                    continue;
68                }
69            };
70
71            let payload = match serde_json::to_vec(&data) {
72                Ok(payload) => payload,
73                Err(error) => {
74                    handle_error(format!("failed to serialize output data as bytes: {}", error), &self.state_tracker).await;
75                    continue;
76                }
77            };
78
79            match channel
80                .basic_publish(
81                    self.output_config.publish().exchange(),
82                    queue.name(),
83                    *self.output_config.publish().options(),
84                    payload.as_slice(),
85                    self.output_config
86                        .publish()
87                        .properties()
88                        .clone(),
89                )
90                .await
91            {
92                Ok(_) => (),
93                Err(error) => {
94                    handle_error(format!("failed to publish to queue: {}", error), &self.state_tracker).await;
95                    continue;
96                }
97            }
98
99            match self.state_tracker.send_state(State::Valid).await {
100                Ok(_) => (),
101                Err(error) => log::warn!("failed to send valid state to state tracker: {}", error),
102            }
103        }
104    }
105}
106
107async fn handle_error(error_message: String, state_tracker: &StateTrackerClient) {
108    log::error!("{}", error_message);
109
110    match state_tracker.send_state(State::Error(error_message)).await {
111        Ok(_) => (),
112        Err(error) => {
113            log::warn!("failed to send error state to state tracker: '{}'", error);
114        }
115    }
116}