amqp_api_server/api/output/
amqp_output_element.rs1use std::sync::Arc;
2use state_tracker::state::State;
3use state_tracker::state_tracker_client::StateTrackerClient;
4
5use lapin::Channel;
6use serde_json::Value;
7use tokio::sync::mpsc::Receiver;
8use crate::config::amqp_output_api::AmqpOutputApi;
9
10pub struct AmqpOutputElement {
11 name: String,
12 output_config: AmqpOutputApi,
13 state_tracker: StateTrackerClient,
14}
15
16impl AmqpOutputElement {
17 pub fn new(name: String, output_config: AmqpOutputApi, mut state_tracker: StateTrackerClient) -> AmqpOutputElement {
18 state_tracker.set_id(name.clone());
19
20 AmqpOutputElement {
21 name,
22 output_config,
23 state_tracker,
24 }
25 }
26
27 pub fn name(&self) -> &str {
28 self.name.as_str()
29 }
30
31 pub fn output_config(&self) -> &AmqpOutputApi {
32 &self.output_config
33 }
34
35 pub fn owned_output_config(self) -> AmqpOutputApi {
36 self.output_config
37 }
38}
39
40impl AmqpOutputElement {
41 pub async fn run(self, channel: Arc<Channel>, mut receiver: Receiver<Value>) {
42 let queue = self.output_config.queue();
43
44 match channel
45 .queue_declare(
46 queue.name(),
47 *queue.declare().options(),
48 queue.declare().arguments().clone(),
49 )
50 .await
51 {
52 Ok(_) => (),
53 Err(error) => {
54 handle_error(format!("failed to declare queue for output element '{}': '{}'",
55 self.name,
56 error), &self.state_tracker).await;
57
58 return;
59 }
60 };
61
62 loop {
63 let data = match receiver.recv().await {
64 Some(data) => data,
65 None => {
66 log::info!("received none from receiver");
67 continue;
68 }
69 };
70
71 let payload = match serde_json::to_vec(&data) {
72 Ok(payload) => payload,
73 Err(error) => {
74 handle_error(format!("failed to serialize output data as bytes: {}", error), &self.state_tracker).await;
75 continue;
76 }
77 };
78
79 match channel
80 .basic_publish(
81 self.output_config.publish().exchange(),
82 queue.name(),
83 *self.output_config.publish().options(),
84 payload.as_slice(),
85 self.output_config
86 .publish()
87 .properties()
88 .clone(),
89 )
90 .await
91 {
92 Ok(_) => (),
93 Err(error) => {
94 handle_error(format!("failed to publish to queue: {}", error), &self.state_tracker).await;
95 continue;
96 }
97 }
98
99 match self.state_tracker.send_state(State::Valid).await {
100 Ok(_) => (),
101 Err(error) => log::warn!("failed to send valid state to state tracker: {}", error),
102 }
103 }
104 }
105}
106
107async fn handle_error(error_message: String, state_tracker: &StateTrackerClient) {
108 log::error!("{}", error_message);
109
110 match state_tracker.send_state(State::Error(error_message)).await {
111 Ok(_) => (),
112 Err(error) => {
113 log::warn!("failed to send error state to state tracker: '{}'", error);
114 }
115 }
116}