amqp_api_server/api/output/
amqp_output_router.rs

1use std::{collections::HashMap, sync::Arc};
2
3use lapin::Channel;
4use serde_json::Value;
5use tokio::sync::mpsc::{Receiver, Sender};
6use crate::api::output::amqp_output_element::AmqpOutputElement;
7
8pub struct AmqpOutputRouter {
9    receiver: Receiver<(String, Value)>,
10    output_senders: HashMap<String, Sender<Value>>,
11}
12
13impl AmqpOutputRouter {
14    pub fn new(
15        channel: Arc<Channel>,
16        elements: Vec<AmqpOutputElement>,
17        receiver: Receiver<(String, Value)>,
18    ) -> AmqpOutputRouter {
19        let mut output_senders = HashMap::new();
20        let channel = channel;
21
22        for element in elements {
23            let (sender, receiver) = tokio::sync::mpsc::channel(1024);
24            output_senders.insert(element.name().to_string(), sender);
25            tokio::spawn(element.run(channel.clone(), receiver));
26        }
27
28        AmqpOutputRouter {
29            receiver,
30            output_senders,
31        }
32    }
33
34    pub async fn run(mut self) {
35        loop {
36            let element_and_data = match self.receiver.recv().await {
37                Some(delivery) => delivery,
38                None => {
39                    log::info!("received none from receiver");
40                    continue;
41                }
42            };
43
44            let output_sender = match self.output_senders.get(&element_and_data.0) {
45                Some(config) => config,
46                None => {
47                    log::error!("missing output element for '{}'", element_and_data.0);
48                    continue;
49                }
50            };
51
52            match output_sender.send(element_and_data.1).await {
53                Ok(_) => (),
54                Err(error) => {
55                    log::error!(
56                        "failed to send data to output element '{}': '{}'",
57                        element_and_data.0,
58                        error
59                    );
60                }
61            }
62        }
63    }
64}