1use crate::actions::channel_manager::CreateJobError;
5use crate::background::processor::{init_processor, IPCData};
6use crate::constants::{EXPIRATION_LAGGED_BY_1, EXPIRATION_LAGGED_BY_2, EXPIRATION_LAGGED_BY_4};
7use hearth_interconnect::messages::Message;
8use lazy_static::lazy_static;
9use log::{error, info};
10use rdkafka::producer::FutureProducer;
11use std::collections::HashMap;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::broadcast::error::TryRecvError;
15use tokio::sync::broadcast::{Receiver, Sender};
16use tokio::sync::{broadcast, Mutex, RwLock};
17use tokio::time;
18
19pub mod actions;
20pub mod background;
21pub(crate) mod constants;
22mod helpers;
23pub mod serenity;
24
25use crate::background::connector::{initialize_client, initialize_producer};
26use rdkafka::consumer::StreamConsumer;
27
28lazy_static! {
29 pub(crate) static ref PRODUCER: Mutex<Option<FutureProducer>> = Mutex::new(None);
30 pub(crate) static ref CONSUMER: Mutex<Option<StreamConsumer>> = Mutex::new(None);
31 pub(crate) static ref TX: Mutex<Option<Sender<String>>> = Mutex::new(None);
32 pub(crate) static ref RX: Mutex<Option<Receiver<String>>> = Mutex::new(None);
33}
34
35pub struct PlayerObject {
37 worker_id: Arc<RwLock<Option<String>>>,
38 job_id: Arc<RwLock<Option<String>>>,
39 guild_id: String,
40 tx: Arc<Sender<IPCData>>,
41 bg_com_tx: Sender<IPCData>,
42}
43
44impl PlayerObject {
45 pub async fn new(guild_id: String, com_tx: Sender<IPCData>) -> Result<Self, CreateJobError> {
47 let (tx, _rx) = broadcast::channel(16);
48
49 let handler = PlayerObject {
50 worker_id: Arc::new(RwLock::new(None)),
51 job_id: Arc::new(RwLock::new(None)),
52 guild_id,
53 tx: Arc::new(tx),
54 bg_com_tx: com_tx,
55 };
56
57 Ok(handler)
58 }
59}
60
61pub struct Charcoal {
63 pub players: Arc<RwLock<HashMap<String, PlayerObject>>>, pub tx: Sender<IPCData>,
65 pub rx: Receiver<IPCData>,
66}
67
68impl Charcoal {
69 fn start_global_checker(&mut self) {
70 info!("Started global data checker!");
71 let mut rxx = self.tx.subscribe();
72 let t_players = self.players.clone();
73 let mut tick_adjustments = 0;
74 tokio::task::spawn(async move {
75 let mut interval = time::interval(Duration::from_secs(1));
76
77 loop {
78 interval.tick().await;
79 let catch = rxx.try_recv();
80 match catch {
81 Ok(c) => {
82 if let IPCData::FromBackground(bg) = c {
83 match bg.message {
84 Message::ExternalJobExpired(je) => {
85 info!("Job Expired: {}", je.job_id);
86 let mut t_p_write = t_players.write().await;
87 t_p_write.remove(&je.guild_id);
88 }
89 Message::WorkerShutdownAlert(shutdown_alert) => {
90 info!("Worker shutdown! Cancelling Players!");
91 let mut t_p_write = t_players.write().await;
92 for job_id in shutdown_alert.affected_guild_ids {
93 t_p_write.remove(&job_id);
94 }
95 }
96 _ => {}
97 }
98 }
99 }
100 Err(e) => match e {
101 TryRecvError::Empty => {}
102 TryRecvError::Lagged(count) => {
103 error!("Expiration Checker - Lagged by: {}", count);
104 let mut tick_adjustment_ratio: f32 = tick_adjustments as f32;
105 if count >= 4 {
106 tick_adjustment_ratio =
107 (tick_adjustment_ratio * 1.2).clamp(1.0, 3.0);
108 interval = time::interval(Duration::from_millis(
109 (EXPIRATION_LAGGED_BY_4 / tick_adjustment_ratio) as u64,
110 ));
111 tick_adjustments += 1;
112 } else if count >= 2 {
113 tick_adjustment_ratio =
114 (tick_adjustment_ratio * 1.2).clamp(1.0, 3.0);
115 interval = time::interval(Duration::from_millis(
116 (EXPIRATION_LAGGED_BY_2 / tick_adjustment_ratio) as u64,
117 ));
118 tick_adjustments += 1;
119 } else if count >= 1 {
120 tick_adjustment_ratio =
121 (tick_adjustment_ratio * 1.2).clamp(1.0, 3.0);
122 interval = time::interval(Duration::from_millis(
123 (EXPIRATION_LAGGED_BY_1 / tick_adjustment_ratio) as u64,
124 ));
125 tick_adjustments += 1;
126 }
127 info!("Performed auto adjustment to prevent lag new interval: {} milliseconds",interval.period().as_millis());
128 }
129 _ => {
130 error!("Failed to receive expiration check with error: {}", e);
131 }
132 },
133 }
134 }
135 });
136 }
137}
138
139#[derive(Clone)]
140pub struct SSLConfig {
142 pub ssl_key: String,
144 pub ssl_ca: String,
146 pub ssl_cert: String,
148}
149
150#[derive(Clone)]
151pub struct SASLConfig {
152 pub kafka_username: String,
154 pub kafka_password: String,
156}
157
158#[derive(Clone)]
159pub struct CharcoalConfig {
161 pub ssl: Option<SSLConfig>,
163 pub sasl: Option<SASLConfig>,
165 pub kafka_topic: String,
167}
168
169pub async fn init_charcoal(broker: String, config: CharcoalConfig) -> Arc<Mutex<Charcoal>> {
171 let consumer = initialize_client(&broker, &config).await;
174
175 let producer = initialize_producer(&broker, &config);
176
177 let (tx, rx) = broadcast::channel(16);
178
179 let global_rx = tx.subscribe();
180 let sub_tx = tx.clone();
181
182 tokio::task::spawn(async move {
183 init_processor(rx, sub_tx, consumer, producer, config).await;
184 });
185
186 let mut c_instance = Charcoal {
187 players: Arc::new(RwLock::new(HashMap::new())),
188 tx,
189 rx: global_rx,
190 };
191
192 c_instance.start_global_checker(); Arc::new(Mutex::new(c_instance))
195}