charcoal_client/background/
processor.rs1use crate::background::connector::send_message;
2use crate::CharcoalConfig;
3use hearth_interconnect::errors::ErrorReport;
4use hearth_interconnect::messages::{Message, Metadata};
5use log::{debug, error};
6use rdkafka::consumer::BaseConsumer;
7use rdkafka::producer::FutureProducer;
8use rdkafka::Message as KafkaMessage;
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::broadcast::{Receiver, Sender};
13
14#[derive(Clone, Debug)]
15pub struct FromBackgroundData {
16 pub message: Message,
17}
18
19#[derive(Clone, Debug)]
20pub struct FromMainData {
21 pub message: Message,
22 pub response_tx: Arc<Sender<IPCData>>,
23 pub guild_id: String,
24}
25
26#[derive(Clone, Debug)]
27pub enum IPCData {
28 FromBackground(FromBackgroundData),
29 FromMain(FromMainData),
30 ErrorReport(ErrorReport),
31 MetadataResult(Metadata),
32}
33
34impl IPCData {
36 pub fn new_from_main(
37 message: Message,
38 sender: Arc<Sender<IPCData>>,
39 guild_id: String,
40 ) -> IPCData {
41 IPCData::FromMain(FromMainData {
42 message,
43 response_tx: sender,
44 guild_id,
45 })
46 }
47 pub fn new_from_background(message: Message) -> IPCData {
48 IPCData::FromBackground(FromBackgroundData { message })
49 }
50}
51
52pub async fn parse_message(
53 message: Message,
54 guild_id_to_tx: &mut HashMap<String, Arc<Sender<IPCData>>>,
55 global_tx: &mut Sender<IPCData>,
56) {
57 match &message {
58 Message::ErrorReport(e) => {
59 error!("GOT Error: {:?} From Hearth Server", e);
60 let tx = guild_id_to_tx.get_mut(&e.guild_id);
61 match tx {
62 Some(tx) => {
63 let gt = tx.send(IPCData::ErrorReport(e.clone()));
64 match gt {
65 Ok(_) => {}
66 Err(e) => {
67 error!("Failed to send error report with error: {:?}", e);
68 }
69 }
70 }
71 None => {
72 error!("Failed to get appropriate sender when attempting to send error report")
73 }
74 }
75 }
76 Message::ExternalJobExpired(_je) => {
77 let r = global_tx.send(IPCData::new_from_background(message));
78 match r {
79 Ok(_) => {}
80 Err(e) => {
81 error!(
82 "Failed to send Kafka message to main thread once received with error: {}!",
83 e
84 )
85 }
86 }
87 }
88 Message::WorkerShutdownAlert(_) => {
89 let r = global_tx.send(IPCData::new_from_background(message));
90 match r {
91 Ok(_) => {}
92 Err(e) => {
93 error!(
94 "Failed to send Kafka message to main thread once received with error: {}!",
95 e
96 )
97 }
98 }
99 }
100 Message::ExternalQueueJobResponse(r) => {
101 let tx = guild_id_to_tx.get_mut(&r.guild_id);
102 match tx {
103 Some(tx) => {
104 let r = tx.send(IPCData::new_from_background(message));
105 match r {
106 Ok(_) => {}
107 Err(e) => {
108 error!("Failed to send Kafka message to main thread once received with error: {}!",e)
109 }
110 }
111 }
112 None => {
113 error!("Failed to send Response from BG Thread!");
114 }
115 }
116 }
117 Message::ExternalMetadataResult(metadata) => {
118 let tx = guild_id_to_tx.get_mut(&metadata.guild_id);
119 match tx {
120 Some(tx) => {
121 let gt = tx.send(IPCData::MetadataResult(metadata.clone()));
122 match gt {
123 Ok(_) => {}
124 Err(e) => {
125 error!("Failed to send error report with error: {:?}", e);
126 }
127 }
128 }
129 None => {
130 error!("Failed to get appropriate sender when attempting to send error report")
131 }
132 }
133 }
134 _ => {}
135 }
136}
137
138pub async fn init_processor(
139 mut rx: Receiver<IPCData>,
140 mut global_tx: Sender<IPCData>,
141 consumer: BaseConsumer,
142 mut producer: FutureProducer,
143 config: CharcoalConfig,
144) {
145 let mut guild_id_to_tx: HashMap<String, Arc<Sender<IPCData>>> = HashMap::new();
146 loop {
147 let mss = consumer.poll(Duration::from_millis(25));
148 if let Some(p) = mss {
149 match p {
150 Ok(m) => {
151 let payload = m.payload();
152
153 match payload {
154 Some(payload) => {
155 let parsed_message: Result<Message, serde_json::Error> =
156 serde_json::from_slice(payload);
157
158 match parsed_message {
159 Ok(m) => {
160 parse_message(m, &mut guild_id_to_tx, &mut global_tx).await;
161 }
162 Err(e) => error!("{}", e),
163 }
164 }
165 None => {
166 error!("Received No Payload!");
167 }
168 }
169 }
170 Err(e) => error!("{}", e),
171 }
172 }
173 let rx_data = rx.try_recv();
175 match rx_data {
176 Ok(d) => {
177 if let IPCData::FromMain(m) = d {
178 guild_id_to_tx.insert(m.guild_id, m.response_tx);
179 send_message(&m.message, &config.kafka_topic, &mut producer).await;
180 }
181 }
182 Err(e) => {
183 if e.to_string() == "channel empty" {
184 debug!("Channel empty!");
185 } else {
186 error!("Receive failed with: {}", e);
187 }
188 }
189 }
190 }
191}