charcoal_client/background/
processor.rs

1use crate::background::connector::send_message;
2use crate::CharcoalConfig;
3use hearth_interconnect::errors::ErrorReport;
4use hearth_interconnect::messages::{Message, Metadata};
5use log::{debug, error};
6use rdkafka::consumer::BaseConsumer;
7use rdkafka::producer::FutureProducer;
8use rdkafka::Message as KafkaMessage;
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::broadcast::{Receiver, Sender};
13
14#[derive(Clone, Debug)]
15pub struct FromBackgroundData {
16    pub message: Message,
17}
18
19#[derive(Clone, Debug)]
20pub struct FromMainData {
21    pub message: Message,
22    pub response_tx: Arc<Sender<IPCData>>,
23    pub guild_id: String,
24}
25
26#[derive(Clone, Debug)]
27pub enum IPCData {
28    FromBackground(FromBackgroundData),
29    FromMain(FromMainData),
30    ErrorReport(ErrorReport),
31    MetadataResult(Metadata),
32}
33
34// Makes things slightly easier
35impl IPCData {
36    pub fn new_from_main(
37        message: Message,
38        sender: Arc<Sender<IPCData>>,
39        guild_id: String,
40    ) -> IPCData {
41        IPCData::FromMain(FromMainData {
42            message,
43            response_tx: sender,
44            guild_id,
45        })
46    }
47    pub fn new_from_background(message: Message) -> IPCData {
48        IPCData::FromBackground(FromBackgroundData { message })
49    }
50}
51
52pub async fn parse_message(
53    message: Message,
54    guild_id_to_tx: &mut HashMap<String, Arc<Sender<IPCData>>>,
55    global_tx: &mut Sender<IPCData>,
56) {
57    match &message {
58        Message::ErrorReport(e) => {
59            error!("GOT Error: {:?} From Hearth Server", e);
60            let tx = guild_id_to_tx.get_mut(&e.guild_id);
61            match tx {
62                Some(tx) => {
63                    let gt = tx.send(IPCData::ErrorReport(e.clone()));
64                    match gt {
65                        Ok(_) => {}
66                        Err(e) => {
67                            error!("Failed to send error report with error: {:?}", e);
68                        }
69                    }
70                }
71                None => {
72                    error!("Failed to get appropriate sender when attempting to send error report")
73                }
74            }
75        }
76        Message::ExternalJobExpired(_je) => {
77            let r = global_tx.send(IPCData::new_from_background(message));
78            match r {
79                Ok(_) => {}
80                Err(e) => {
81                    error!(
82                        "Failed to send Kafka message to main thread once received with error: {}!",
83                        e
84                    )
85                }
86            }
87        }
88        Message::WorkerShutdownAlert(_) => {
89            let r = global_tx.send(IPCData::new_from_background(message));
90            match r {
91                Ok(_) => {}
92                Err(e) => {
93                    error!(
94                        "Failed to send Kafka message to main thread once received with error: {}!",
95                        e
96                    )
97                }
98            }
99        }
100        Message::ExternalQueueJobResponse(r) => {
101            let tx = guild_id_to_tx.get_mut(&r.guild_id);
102            match tx {
103                Some(tx) => {
104                    let r = tx.send(IPCData::new_from_background(message));
105                    match r {
106                        Ok(_) => {}
107                        Err(e) => {
108                            error!("Failed to send Kafka message to main thread once received with error: {}!",e)
109                        }
110                    }
111                }
112                None => {
113                    error!("Failed to send Response from BG Thread!");
114                }
115            }
116        }
117        Message::ExternalMetadataResult(metadata) => {
118            let tx = guild_id_to_tx.get_mut(&metadata.guild_id);
119            match tx {
120                Some(tx) => {
121                    let gt = tx.send(IPCData::MetadataResult(metadata.clone()));
122                    match gt {
123                        Ok(_) => {}
124                        Err(e) => {
125                            error!("Failed to send error report with error: {:?}", e);
126                        }
127                    }
128                }
129                None => {
130                    error!("Failed to get appropriate sender when attempting to send error report")
131                }
132            }
133        }
134        _ => {}
135    }
136}
137
138pub async fn init_processor(
139    mut rx: Receiver<IPCData>,
140    mut global_tx: Sender<IPCData>,
141    consumer: BaseConsumer,
142    mut producer: FutureProducer,
143    config: CharcoalConfig,
144) {
145    let mut guild_id_to_tx: HashMap<String, Arc<Sender<IPCData>>> = HashMap::new();
146    loop {
147        let mss = consumer.poll(Duration::from_millis(25));
148        if let Some(p) = mss {
149            match p {
150                Ok(m) => {
151                    let payload = m.payload();
152
153                    match payload {
154                        Some(payload) => {
155                            let parsed_message: Result<Message, serde_json::Error> =
156                                serde_json::from_slice(payload);
157
158                            match parsed_message {
159                                Ok(m) => {
160                                    parse_message(m, &mut guild_id_to_tx, &mut global_tx).await;
161                                }
162                                Err(e) => error!("{}", e),
163                            }
164                        }
165                        None => {
166                            error!("Received No Payload!");
167                        }
168                    }
169                }
170                Err(e) => error!("{}", e),
171            }
172        }
173        // Receive messages from main function
174        let rx_data = rx.try_recv();
175        match rx_data {
176            Ok(d) => {
177                if let IPCData::FromMain(m) = d {
178                    guild_id_to_tx.insert(m.guild_id, m.response_tx);
179                    send_message(&m.message, &config.kafka_topic, &mut producer).await;
180                }
181            }
182            Err(e) => {
183                if e.to_string() == "channel empty" {
184                    debug!("Channel empty!");
185                } else {
186                    error!("Receive failed with: {}", e);
187                }
188            }
189        }
190    }
191}