amqp_api_server/api/output/
amqp_output_router.rs1use std::{collections::HashMap, sync::Arc};
2
3use lapin::Channel;
4use serde_json::Value;
5use tokio::sync::mpsc::{Receiver, Sender};
6use crate::api::output::amqp_output_element::AmqpOutputElement;
7
8pub struct AmqpOutputRouter {
9 receiver: Receiver<(String, Value)>,
10 output_senders: HashMap<String, Sender<Value>>,
11}
12
13impl AmqpOutputRouter {
14 pub fn new(
15 channel: Arc<Channel>,
16 elements: Vec<AmqpOutputElement>,
17 receiver: Receiver<(String, Value)>,
18 ) -> AmqpOutputRouter {
19 let mut output_senders = HashMap::new();
20 let channel = channel;
21
22 for element in elements {
23 let (sender, receiver) = tokio::sync::mpsc::channel(1024);
24 output_senders.insert(element.name().to_string(), sender);
25 tokio::spawn(element.run(channel.clone(), receiver));
26 }
27
28 AmqpOutputRouter {
29 receiver,
30 output_senders,
31 }
32 }
33
34 pub async fn run(mut self) {
35 loop {
36 let element_and_data = match self.receiver.recv().await {
37 Some(delivery) => delivery,
38 None => {
39 log::info!("received none from receiver");
40 continue;
41 }
42 };
43
44 let output_sender = match self.output_senders.get(&element_and_data.0) {
45 Some(config) => config,
46 None => {
47 log::error!("missing output element for '{}'", element_and_data.0);
48 continue;
49 }
50 };
51
52 match output_sender.send(element_and_data.1).await {
53 Ok(_) => (),
54 Err(error) => {
55 log::error!(
56 "failed to send data to output element '{}': '{}'",
57 element_and_data.0,
58 error
59 );
60 }
61 }
62 }
63 }
64}