charcoal_client/
lib.rs

1//! Charcoal is a client-library for Hearth that makes it easy to use Hearth with Rust.
2//! See Examples in the Github repo [here](https://github.com/Hearth-Industries/Charcoal/tree/main/examples)
3
4use crate::actions::channel_manager::CreateJobError;
5use crate::background::processor::{init_processor, IPCData};
6use crate::constants::{EXPIRATION_LAGGED_BY_1, EXPIRATION_LAGGED_BY_2, EXPIRATION_LAGGED_BY_4};
7use hearth_interconnect::messages::Message;
8use lazy_static::lazy_static;
9use log::{error, info};
10use rdkafka::producer::FutureProducer;
11use std::collections::HashMap;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::broadcast::error::TryRecvError;
15use tokio::sync::broadcast::{Receiver, Sender};
16use tokio::sync::{broadcast, Mutex, RwLock};
17use tokio::time;
18
19pub mod actions;
20pub mod background;
21pub(crate) mod constants;
22mod helpers;
23pub mod serenity;
24
25use crate::background::connector::{initialize_client, initialize_producer};
26use rdkafka::consumer::StreamConsumer;
27
28lazy_static! {
29    pub(crate) static ref PRODUCER: Mutex<Option<FutureProducer>> = Mutex::new(None);
30    pub(crate) static ref CONSUMER: Mutex<Option<StreamConsumer>> = Mutex::new(None);
31    pub(crate) static ref TX: Mutex<Option<Sender<String>>> = Mutex::new(None);
32    pub(crate) static ref RX: Mutex<Option<Receiver<String>>> = Mutex::new(None);
33}
34
35/// Represents an instance in a voice channel
36pub struct PlayerObject {
37    worker_id: Arc<RwLock<Option<String>>>,
38    job_id: Arc<RwLock<Option<String>>>,
39    guild_id: String,
40    tx: Arc<Sender<IPCData>>,
41    bg_com_tx: Sender<IPCData>,
42}
43
44impl PlayerObject {
45    /// Creates a new Player Object that can then be joined to channel and used to playback audio
46    pub async fn new(guild_id: String, com_tx: Sender<IPCData>) -> Result<Self, CreateJobError> {
47        let (tx, _rx) = broadcast::channel(16);
48
49        let handler = PlayerObject {
50            worker_id: Arc::new(RwLock::new(None)),
51            job_id: Arc::new(RwLock::new(None)),
52            guild_id,
53            tx: Arc::new(tx),
54            bg_com_tx: com_tx,
55        };
56
57        Ok(handler)
58    }
59}
60
61/// Stores Charcoal instance
62pub struct Charcoal {
63    pub players: Arc<RwLock<HashMap<String, PlayerObject>>>, // Guild ID to PlayerObject
64    pub tx: Sender<IPCData>,
65    pub rx: Receiver<IPCData>,
66}
67
68impl Charcoal {
69    fn start_global_checker(&mut self) {
70        info!("Started global data checker!");
71        let mut rxx = self.tx.subscribe();
72        let t_players = self.players.clone();
73        let mut tick_adjustments = 0;
74        tokio::task::spawn(async move {
75            let mut interval = time::interval(Duration::from_secs(1));
76
77            loop {
78                interval.tick().await;
79                let catch = rxx.try_recv();
80                match catch {
81                    Ok(c) => {
82                        if let IPCData::FromBackground(bg) = c {
83                            match bg.message {
84                                Message::ExternalJobExpired(je) => {
85                                    info!("Job Expired: {}", je.job_id);
86                                    let mut t_p_write = t_players.write().await;
87                                    t_p_write.remove(&je.guild_id);
88                                }
89                                Message::WorkerShutdownAlert(shutdown_alert) => {
90                                    info!("Worker shutdown! Cancelling Players!");
91                                    let mut t_p_write = t_players.write().await;
92                                    for job_id in shutdown_alert.affected_guild_ids {
93                                        t_p_write.remove(&job_id);
94                                    }
95                                }
96                                _ => {}
97                            }
98                        }
99                    }
100                    Err(e) => match e {
101                        TryRecvError::Empty => {}
102                        TryRecvError::Lagged(count) => {
103                            error!("Expiration Checker - Lagged by: {}", count);
104                            let mut tick_adjustment_ratio: f32 = tick_adjustments as f32;
105                            if count >= 4 {
106                                tick_adjustment_ratio =
107                                    (tick_adjustment_ratio * 1.2).clamp(1.0, 3.0);
108                                interval = time::interval(Duration::from_millis(
109                                    (EXPIRATION_LAGGED_BY_4 / tick_adjustment_ratio) as u64,
110                                ));
111                                tick_adjustments += 1;
112                            } else if count >= 2 {
113                                tick_adjustment_ratio =
114                                    (tick_adjustment_ratio * 1.2).clamp(1.0, 3.0);
115                                interval = time::interval(Duration::from_millis(
116                                    (EXPIRATION_LAGGED_BY_2 / tick_adjustment_ratio) as u64,
117                                ));
118                                tick_adjustments += 1;
119                            } else if count >= 1 {
120                                tick_adjustment_ratio =
121                                    (tick_adjustment_ratio * 1.2).clamp(1.0, 3.0);
122                                interval = time::interval(Duration::from_millis(
123                                    (EXPIRATION_LAGGED_BY_1 / tick_adjustment_ratio) as u64,
124                                ));
125                                tick_adjustments += 1;
126                            }
127                            info!("Performed auto adjustment to prevent lag new interval: {} milliseconds",interval.period().as_millis());
128                        }
129                        _ => {
130                            error!("Failed to receive expiration check with error: {}", e);
131                        }
132                    },
133                }
134            }
135        });
136    }
137}
138
139#[derive(Clone)]
140/// Stores SSL Config for Kafka
141pub struct SSLConfig {
142    /// Path to the SSL key file
143    pub ssl_key: String,
144    /// Path to the SSL CA file
145    pub ssl_ca: String,
146    /// Path to the SSL cert file
147    pub ssl_cert: String,
148}
149
150#[derive(Clone)]
151pub struct SASLConfig {
152    /// Kafka Username
153    pub kafka_username: String,
154    /// Kafka Password
155    pub kafka_password: String,
156}
157
158#[derive(Clone)]
159/// Configuration for charcoal
160pub struct CharcoalConfig {
161    /// Configure SSl for kafka. If left as None no SSL is configured
162    pub ssl: Option<SSLConfig>,
163    /// Configure SASL/Password and Username Based Authentication for Kafka. If left as None no SASL is configured
164    pub sasl: Option<SASLConfig>,
165    /// Kafka topic to connect to. This should be the same one the hearth server(s) are on.
166    pub kafka_topic: String,
167}
168
169/// Initializes Charcoal Instance
170pub async fn init_charcoal(broker: String, config: CharcoalConfig) -> Arc<Mutex<Charcoal>> {
171    // This isn't great we should really switch to rdkafka instead of kafka
172
173    let consumer = initialize_client(&broker, &config).await;
174
175    let producer = initialize_producer(&broker, &config);
176
177    let (tx, rx) = broadcast::channel(16);
178
179    let global_rx = tx.subscribe();
180    let sub_tx = tx.clone();
181
182    tokio::task::spawn(async move {
183        init_processor(rx, sub_tx, consumer, producer, config).await;
184    });
185
186    let mut c_instance = Charcoal {
187        players: Arc::new(RwLock::new(HashMap::new())),
188        tx,
189        rx: global_rx,
190    };
191
192    c_instance.start_global_checker(); // Start checking for expired jobs
193
194    Arc::new(Mutex::new(c_instance))
195}