Skip to main content

fluxer/client/
mod.rs

1//! Gateway client and connection management.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use futures::{SinkExt, StreamExt};
6use serde_json::Value;
7use tokio::sync::Mutex;
8use tokio_tungstenite::{connect_async, tungstenite::protocol::Message as WsMessage};
9use crate::cache::Cache;
10use crate::error::ClientError;
11use crate::event::EventHandler;
12use crate::http::Http;
13use crate::model::voice::VoiceState;
14use std::time::Duration;
15
16const DEFAULT_API_URL: &str = "https://api.fluxer.app/v1";
17const DEFAULT_GATEWAY_URL: &str = "wss://gateway.fluxer.app/?v=1&encoding=json";
18
19enum LoopControl {
20    Done,
21    Reconnect { resume: bool },
22}
23
24/// Shared state passed into every event handler call. This is how you interact
25/// with the API from inside your event handlers.
26///
27/// ```rust,no_run
28/// # use fluxer::prelude::*;
29/// # async fn example(ctx: Context) {
30/// ctx.http.send_message("channel_id", "Hello!").await.unwrap();
31/// # }
32/// ```
33#[derive(Clone)]
34pub struct Context {
35    /// HTTP client for REST API calls.
36    pub http: Arc<Http>,
37    /// In-memory cache populated automatically from gateway events.
38    pub cache: Arc<Cache>,
39    /// Raw gateway sender. You probably won't need this directly --
40    /// voice join/leave use it internally.
41    pub gateway_tx: Arc<tokio::sync::mpsc::Sender<String>>,
42    /// Per-guild voice state, populated from `VOICE_STATE_UPDATE` and `VOICE_SERVER_UPDATE`.
43    /// Used internally by `join_voice` / `leave_voice`.
44    pub voice_states: Arc<Mutex<HashMap<String, VoiceState>>>,
45    pub(crate) live_rooms: Arc<Mutex<HashMap<String, std::sync::Arc<livekit::Room>>>>,
46}
47
48impl Context {
49    /// Joins a voice channel. Sends an opcode 4 to the gateway and waits
50    /// up to 10 seconds for the server to send back connection details.
51    pub async fn join_voice(
52        &self,
53        guild_id: &str,
54        channel_id: &str,
55    ) -> Result<crate::voice::FluxerVoiceConnection, ClientError> {
56        {
57            let mut states = self.voice_states.lock().await;
58            states.remove(guild_id);
59        }
60
61        let join_payload = serde_json::json!({
62            "op": 4,
63            "d": {
64                "guild_id": guild_id,
65                "channel_id": channel_id,
66                "self_mute": false,
67                "self_deaf": false
68            }
69        });
70        self.gateway_tx
71            .send(join_payload.to_string())
72            .await
73            .map_err(|e| ClientError::Voice(e.to_string()))?;
74
75        let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
76        let voice_state = loop {
77            {
78                let states = self.voice_states.lock().await;
79                if let Some(vs) = states.get(guild_id) {
80                    if !vs.token.is_empty() && !vs.endpoint.is_empty() {
81                        break vs.clone();
82                    }
83                }
84            }
85            if tokio::time::Instant::now() >= deadline {
86                return Err(ClientError::Voice(
87                    "Timed out waiting for VOICE_SERVER_UPDATE".into(),
88                ));
89            }
90            tokio::time::sleep(Duration::from_millis(100)).await;
91        };
92
93        let conn = crate::voice::FluxerVoiceConnection::connect(
94            &voice_state.endpoint,
95            &voice_state.token,
96        )
97        .await
98        .map_err(|e| ClientError::Voice(e.to_string()))?;
99
100        self.live_rooms.lock().await.insert(guild_id.to_string(), conn.room.clone());
101
102        Ok(conn)
103    }
104
105    /// Leaves a voice channel. Closes the LiveKit room and tells the gateway.
106    pub async fn leave_voice(&self, guild_id: &str) -> Result<(), ClientError> {
107        if let Some(room) = self.live_rooms.lock().await.remove(guild_id) {
108            let _ = room.close().await;
109        }
110
111        let payload = serde_json::json!({
112            "op": 4,
113            "d": {
114                "guild_id": guild_id,
115                "channel_id": null,
116                "self_mute": false,
117                "self_deaf": false
118            }
119        });
120        self.gateway_tx
121            .send(payload.to_string())
122            .await
123            .map_err(|e| ClientError::Voice(e.to_string()))?;
124        self.voice_states.lock().await.remove(guild_id);
125        Ok(())
126    }
127}
128
129/// Builder for creating a [`Client`]. You need at minimum a token and an event handler.
130///
131/// ```rust,no_run
132/// use fluxer::prelude::*;
133/// # struct MyHandler;
134/// # #[async_trait::async_trait]
135/// # impl EventHandler for MyHandler {}
136///
137/// let client = Client::builder("token")
138///     .event_handler(MyHandler)
139///     .build();
140/// ```
141pub struct ClientBuilder {
142    token: String,
143    api_url: String,
144    handler: Option<Arc<dyn EventHandler>>,
145}
146
147impl ClientBuilder {
148    pub fn new(token: impl Into<String>) -> Self {
149        Self {
150            token: token.into(),
151            api_url: DEFAULT_API_URL.to_string(),
152            handler: None,
153        }
154    }
155
156    /// Sets the event handler. Required -- the builder panics at `.build()` without this.
157    pub fn event_handler(mut self, handler: impl EventHandler + 'static) -> Self {
158        self.handler = Some(Arc::new(handler));
159        self
160    }
161
162    /// Override the API base URL. Defaults to `https://api.fluxer.app/v1`.
163    pub fn api_url(mut self, url: impl Into<String>) -> Self {
164        self.api_url = url.into();
165        self
166    }
167
168    pub fn build(self) -> Client {
169        let _ = rustls::crypto::ring::default_provider().install_default();
170        let http = Arc::new(Http::new(&self.token, self.api_url));
171        Client {
172            http,
173            cache: Cache::new(),
174            handler: self.handler.expect("call .event_handler() before .build()"),
175        }
176    }
177}
178
179/// The gateway client. Manages the WebSocket connection, heartbeating,
180/// reconnection, and event dispatch.
181///
182/// Call [`start`](Client::start) to connect. It runs until a fatal error
183/// happens (like an invalid token) and reconnects automatically on transient
184/// failures.
185pub struct Client {
186    pub(crate) http: Arc<Http>,
187    cache: Arc<Cache>,
188    handler: Arc<dyn EventHandler>,
189}
190
191impl Client {
192    pub fn builder(token: impl Into<String>) -> ClientBuilder {
193        ClientBuilder::new(token)
194    }
195
196    /// Connects to the gateway and starts processing events. Blocks forever
197    /// unless a fatal error occurs.
198    pub async fn start(&mut self) -> Result<(), ClientError> {
199        let mut session_id: Option<String> = None;
200        let mut resume_url: Option<String> = None;
201        let mut last_seq: Option<u64> = None;
202        let mut backoff = Duration::from_secs(1);
203
204        loop {
205            let result = self
206                .run_session(&mut session_id, &mut resume_url, &mut last_seq)
207                .await;
208
209            match result {
210                Ok(LoopControl::Done) => return Ok(()),
211
212                Ok(LoopControl::Reconnect { resume }) => {
213                    if !resume {
214                        session_id = None;
215                        resume_url = None;
216                        last_seq = None;
217                        let jitter = Duration::from_millis(1000 + (rand::random::<u64>() % 4000));
218                        tokio::time::sleep(jitter).await;
219                    } else {
220                        eprintln!("[fluxer-rs] Reconnecting in {:?} (will resume)...", backoff);
221                        tokio::time::sleep(backoff).await;
222                        backoff = (backoff * 2).min(Duration::from_secs(60));
223                        continue;
224                    }
225                }
226
227                Err(ClientError::ConnectionClosed) => {
228                    eprintln!("[fluxer-rs] Connection closed, reconnecting in {:?}...", backoff);
229                    tokio::time::sleep(backoff).await;
230                    backoff = (backoff * 2).min(Duration::from_secs(60));
231                    continue;
232                }
233
234                Err(e) => return Err(e),
235            }
236
237            backoff = Duration::from_secs(1);
238        }
239    }
240
241    async fn run_session(
242        &self,
243        session_id: &mut Option<String>,
244        resume_url: &mut Option<String>,
245        last_seq: &mut Option<u64>,
246    ) -> Result<LoopControl, ClientError> {
247        let gateway_url = if session_id.is_some() {
248            resume_url
249                .clone()
250                .unwrap_or_else(|| DEFAULT_GATEWAY_URL.to_string())
251        } else {
252            match self.http.get_gateway().await {
253                Ok(url) => {
254                    let base = url.trim_end_matches('/');
255                    format!("{}/?v=1&encoding=json", base)
256                }
257                Err(_) => DEFAULT_GATEWAY_URL.to_string(),
258            }
259        };
260
261        let (ws_stream, _) = connect_async(&gateway_url).await?;
262        let (write, mut read) = ws_stream.split();
263        let write = Arc::new(Mutex::new(write));
264        let seq_shared: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(*last_seq));
265        let ack_shared: Arc<Mutex<bool>> = Arc::new(Mutex::new(true));
266        let (gateway_tx, mut gateway_rx) = tokio::sync::mpsc::channel::<String>(64);
267        {
268            let write_fwd = write.clone();
269            tokio::spawn(async move {
270                while let Some(msg) = gateway_rx.recv().await {
271                    let mut guard = write_fwd.lock().await;
272                    if guard
273                        .send(WsMessage::Text(msg.into()))
274                        .await
275                        .is_err()
276                    {
277                        break;
278                    }
279                }
280            });
281        }
282
283        let ctx = Context {
284            http: self.http.clone(),
285            cache: self.cache.clone(),
286            gateway_tx: Arc::new(gateway_tx),
287            voice_states: Arc::new(Mutex::new(HashMap::new())),
288            live_rooms: Arc::new(Mutex::new(HashMap::new())),
289        };
290
291        let token = self.http.get_token().to_string();
292        if let (Some(sid), Some(seq)) = (session_id.as_deref(), *last_seq) {
293            let resume_payload = serde_json::json!({
294                "op": 6,
295                "d": { "token": token, "session_id": sid, "seq": seq }
296            });
297            write
298                .lock()
299                .await
300                .send(WsMessage::Text(resume_payload.to_string().into()))
301                .await?;
302        } else {
303            let identify = serde_json::json!({
304                "op": 2,
305                "d": {
306                    "token": token,
307                    "intents": 0,   // Fluxer has no intents yet
308                    "properties": {
309                        "os": "linux",
310                        "browser": "fluxer-rust",
311                        "device": "fluxer-rust"
312                    }
313                }
314            });
315            write
316                .lock()
317                .await
318                .send(WsMessage::Text(identify.to_string().into()))
319                .await?;
320        }
321
322        let handler = self.handler.clone();
323
324        while let Some(msg_result) = read.next().await {
325            let text = match msg_result? {
326                WsMessage::Text(t) => t,
327                WsMessage::Close(frame) => {
328                    let code = frame.as_ref().map(|f| u16::from(f.code)).unwrap_or(0);
329                    match code {
330                        4004 => {
331                            eprintln!("[fluxer-rs] Authentication failed (4004) — invalid token, shutting down.");
332                            return Ok(LoopControl::Done);
333                        }
334                        4010 => {
335                            eprintln!("[fluxer-rs] Invalid shard (4010) — shutting down.");
336                            return Ok(LoopControl::Done);
337                        }
338                        4011 => {
339                            eprintln!("[fluxer-rs] Sharding required (4011) — shutting down.");
340                            return Ok(LoopControl::Done);
341                        }
342                        4012 => {
343                            eprintln!("[fluxer-rs] Invalid API version (4012) — shutting down.");
344                            return Ok(LoopControl::Done);
345                        }
346                        _ => return Err(ClientError::ConnectionClosed),
347                    }
348                }
349                WsMessage::Ping(d) => {
350                    let _ = write.lock().await.send(WsMessage::Pong(d)).await;
351                    continue;
352                }
353                _ => continue,
354            };
355
356            let payload: Value = serde_json::from_str(text.as_str())?;
357            let op = payload["op"].as_u64().unwrap_or(255);
358
359            if let Some(s) = payload["s"].as_u64() {
360                *last_seq = Some(s);
361                *seq_shared.lock().await = Some(s);
362            }
363
364            match op {
365                10 => {
366                    let interval_ms = payload["d"]["heartbeat_interval"]
367                        .as_u64()
368                        .unwrap_or(41_250);
369
370                    let write_hb = write.clone();
371                    let seq_hb = seq_shared.clone();
372                    let ack_hb = ack_shared.clone();
373
374                    tokio::spawn(async move {
375                        let jitter = Duration::from_millis(
376                            (rand::random::<u64>() % interval_ms).max(1),
377                        );
378                        tokio::time::sleep(jitter).await;
379
380                        let mut ticker =
381                            tokio::time::interval(Duration::from_millis(interval_ms));
382                        loop {
383                            ticker.tick().await;
384
385                            {
386                                let mut ack = ack_hb.lock().await;
387                                if !*ack {
388                                    eprintln!(
389                                        "[fluxer-rs] No heartbeat ACK — zombie connection, dropping."
390                                    );
391                                    break;
392                                }
393                                *ack = false;
394                            }
395
396                            let seq = *seq_hb.lock().await;
397                            let hb = serde_json::json!({ "op": 1, "d": seq });
398                            let mut guard = write_hb.lock().await;
399                            if guard
400                                .send(WsMessage::Text(hb.to_string().into()))
401                                .await
402                                .is_err()
403                            {
404                                break;
405                            }
406                        }
407                    });
408                }
409
410                11 => {
411                    *ack_shared.lock().await = true;
412                }
413
414                0 => {
415                    let event_type = payload["t"].as_str().unwrap_or("").to_string();
416                    let data = payload["d"].clone();
417                    let ctx2 = ctx.clone();
418                    let handler2 = handler.clone();
419
420                    if event_type == "READY" {
421                        if let Some(sid) = data["session_id"].as_str() {
422                            *session_id = Some(sid.to_string());
423                        }
424                        if let Some(rurl) = data["resume_gateway_url"].as_str() {
425                            *resume_url = Some(format!(
426                                "{}/?v=1&encoding=json",
427                                rurl.trim_end_matches('/')
428                            ));
429                        }
430                    }
431
432                    tokio::spawn(async move {
433                        cache_update(&ctx2.cache, &event_type, &data).await;
434                        dispatch_event(event_type, data, ctx2, handler2).await;
435                    });
436                }
437
438                7 => {
439                    eprintln!("[fluxer-rs] Received op 7 Reconnect.");
440                    return Ok(LoopControl::Reconnect { resume: true });
441                }
442
443                9 => {
444                    let resumable = payload["d"].as_bool().unwrap_or(false);
445                    eprintln!("[fluxer-rs] Invalid session (resumable={resumable}).");
446                    return Ok(LoopControl::Reconnect { resume: resumable });
447                }
448
449                1 => {
450                    let seq = *seq_shared.lock().await;
451                    let hb = serde_json::json!({ "op": 1, "d": seq });
452                    let _ = write
453                        .lock()
454                        .await
455                        .send(WsMessage::Text(hb.to_string().into()))
456                        .await;
457                }
458
459                _ => {}
460            }
461        }
462
463        Err(ClientError::ConnectionClosed)
464    }
465}
466
467async fn cache_update(cache: &Cache, event_type: &str, data: &Value) {
468    match event_type {
469        "READY" => {
470            if let Ok(user) = serde_json::from_value::<crate::model::User>(data["user"].clone()) {
471                *cache.current_user.write().await = Some(user);
472            }
473        }
474        "GUILD_CREATE" => {
475            if let Ok(guild) = serde_json::from_value::<crate::model::Guild>(data.clone()) {
476                let guild_id = guild.id.clone();
477
478                if let Some(channels) = data["channels"].as_array() {
479                    let mut ch_map = cache.channels.write().await;
480                    for ch_val in channels {
481                        if let Ok(ch) = serde_json::from_value::<crate::model::Channel>(ch_val.clone()) {
482                            ch_map.insert(ch.id.clone(), ch);
483                        }
484                    }
485                }
486                
487                if let Some(members) = data["members"].as_array() {
488                    let mut user_map = cache.users.write().await;
489                    for m_val in members {
490                        if let Ok(user) = serde_json::from_value::<crate::model::User>(m_val["user"].clone()) {
491                            user_map.insert(user.id.clone(), user);
492                        }
493                    }
494                }
495                cache.guilds.write().await.insert(guild_id, guild);
496            }
497        }
498        "GUILD_UPDATE" => {
499            if let Ok(guild) = serde_json::from_value::<crate::model::Guild>(data.clone()) {
500                cache.guilds.write().await.insert(guild.id.clone(), guild);
501            }
502        }
503        "GUILD_DELETE" => {
504            if let Some(id) = data["id"].as_str() {
505                cache.guilds.write().await.remove(id);
506            }
507        }
508        "CHANNEL_CREATE" | "CHANNEL_UPDATE" => {
509            if let Ok(ch) = serde_json::from_value::<crate::model::Channel>(data.clone()) {
510                cache.channels.write().await.insert(ch.id.clone(), ch);
511            }
512        }
513        "CHANNEL_DELETE" => {
514            if let Some(id) = data["id"].as_str() {
515                cache.channels.write().await.remove(id);
516            }
517        }
518        "GUILD_MEMBER_ADD" | "GUILD_MEMBER_UPDATE" => {
519            if let Ok(user) = serde_json::from_value::<crate::model::User>(data["user"].clone()) {
520                cache.users.write().await.insert(user.id.clone(), user);
521            }
522        }
523        "MESSAGE_CREATE" => {
524            if let Ok(user) = serde_json::from_value::<crate::model::User>(data["author"].clone()) {
525                cache.users.write().await.insert(user.id.clone(), user);
526            }
527        }
528        _ => {}
529    }
530}
531
532async fn dispatch_event(
533    event_type: String,
534    data: Value,
535    ctx: Context,
536    handler: Arc<dyn EventHandler>,
537) {
538    use crate::model::{
539        Channel, ChannelPinsUpdate, ChannelUpdateBulk, Guild, GuildBanAdd, GuildBanRemove,
540        GuildEmojisUpdate, GuildMemberAdd, GuildMemberRemove, GuildMemberUpdate,
541        GuildRoleCreate, GuildRoleDelete, GuildRoleUpdate, GuildRoleUpdateBulk,
542        GuildStickersUpdate, InviteCreate, InviteDelete, WebhooksUpdate,
543        Message, MessageDelete, MessageDeleteBulk, MessageUpdate, ReactionAdd,
544        ReactionRemove, ReactionRemoveAll, ReactionRemoveEmoji, Ready, TypingStart,
545        UnavailableGuild,
546    };
547    use crate::model::voice::VoiceState;
548
549    macro_rules! dispatch {
550        ($method:ident, $ty:ty) => {{
551            match serde_json::from_value::<$ty>(data.clone()) {
552                Ok(v) => handler.$method(ctx, v).await,
553                Err(e) => eprintln!(
554                    "[fluxer-rs] Failed to deserialize {} event: {}",
555                    stringify!($ty),
556                    e
557                ),
558            }
559        }};
560    }
561
562    match event_type.as_str() {
563        "READY"   => dispatch!(on_ready, Ready),
564        "RESUMED" => eprintln!("[fluxer-rs] Session resumed successfully."),
565        "MESSAGE_CREATE"      => dispatch!(on_message, Message),
566        "MESSAGE_UPDATE"      => dispatch!(on_message_update, MessageUpdate),
567        "MESSAGE_DELETE"      => dispatch!(on_message_delete, MessageDelete),
568        "MESSAGE_DELETE_BULK" => dispatch!(on_message_delete_bulk, MessageDeleteBulk),
569        "MESSAGE_REACTION_ADD"          => dispatch!(on_reaction_add, ReactionAdd),
570        "MESSAGE_REACTION_REMOVE"       => dispatch!(on_reaction_remove, ReactionRemove),
571        "MESSAGE_REACTION_REMOVE_ALL"   => dispatch!(on_reaction_remove_all, ReactionRemoveAll),
572        "MESSAGE_REACTION_REMOVE_EMOJI" => dispatch!(on_reaction_remove_emoji, ReactionRemoveEmoji),
573        "TYPING_START" => dispatch!(on_typing_start, TypingStart),
574        "CHANNEL_CREATE"      => dispatch!(on_channel_create, Channel),
575        "CHANNEL_UPDATE"      => dispatch!(on_channel_update, Channel),
576        "CHANNEL_DELETE"      => dispatch!(on_channel_delete, Channel),
577        "CHANNEL_PINS_UPDATE" => dispatch!(on_channel_pins_update, ChannelPinsUpdate),
578        "GUILD_CREATE" => dispatch!(on_guild_create, Guild),
579        "GUILD_UPDATE" => dispatch!(on_guild_update, Guild),
580        "GUILD_DELETE" => dispatch!(on_guild_delete, UnavailableGuild),
581        "GUILD_MEMBER_ADD"    => dispatch!(on_guild_member_add, GuildMemberAdd),
582        "GUILD_MEMBER_UPDATE" => dispatch!(on_guild_member_update, GuildMemberUpdate),
583        "GUILD_MEMBER_REMOVE" => dispatch!(on_guild_member_remove, GuildMemberRemove),
584        "GUILD_BAN_ADD"    => dispatch!(on_guild_ban_add, GuildBanAdd),
585        "GUILD_BAN_REMOVE" => dispatch!(on_guild_ban_remove, GuildBanRemove),
586        "GUILD_ROLE_CREATE"      => dispatch!(on_guild_role_create, GuildRoleCreate),
587        "GUILD_ROLE_UPDATE"      => dispatch!(on_guild_role_update, GuildRoleUpdate),
588        "GUILD_ROLE_UPDATE_BULK" => dispatch!(on_guild_role_update_bulk, GuildRoleUpdateBulk),
589        "GUILD_ROLE_DELETE"      => dispatch!(on_guild_role_delete, GuildRoleDelete),
590        "GUILD_EMOJIS_UPDATE"   => dispatch!(on_guild_emojis_update, GuildEmojisUpdate),
591        "GUILD_STICKERS_UPDATE" => dispatch!(on_guild_stickers_update, GuildStickersUpdate),
592        "CHANNEL_UPDATE_BULK" => dispatch!(on_channel_update_bulk, ChannelUpdateBulk),
593        "INVITE_CREATE" => dispatch!(on_invite_create, InviteCreate),
594        "INVITE_DELETE" => dispatch!(on_invite_delete, InviteDelete),
595        "WEBHOOKS_UPDATE" => dispatch!(on_webhooks_update, WebhooksUpdate),
596        "VOICE_STATE_UPDATE" => {
597            let guild_id = data["guild_id"].as_str().unwrap_or("").to_string();
598            let sess = data["session_id"].as_str().unwrap_or("").to_string();
599            if !guild_id.is_empty() && !sess.is_empty() {
600                let mut states = ctx.voice_states.lock().await;
601                let entry = states.entry(guild_id).or_insert_with(|| VoiceState {
602                    token: String::new(),
603                    endpoint: String::new(),
604                    session_id: None,
605                });
606                entry.session_id = Some(sess);
607            }
608        }
609        "VOICE_SERVER_UPDATE" => {
610            let token = data["token"].as_str().unwrap_or("").to_string();
611            let endpoint = data["endpoint"].as_str().unwrap_or("").to_string();
612            let guild_id = data["guild_id"].as_str().unwrap_or("").to_string();
613            if !guild_id.is_empty() && !token.is_empty() && !endpoint.is_empty() {
614                let mut states = ctx.voice_states.lock().await;
615                let entry = states.entry(guild_id).or_insert_with(|| VoiceState {
616                    token: String::new(),
617                    endpoint: String::new(),
618                    session_id: None,
619                });
620                entry.token = token;
621                entry.endpoint = if endpoint.starts_with("wss://")
622                    || endpoint.starts_with("https://")
623                {
624                    endpoint
625                } else {
626                    format!("wss://{}", endpoint)
627                };
628            }
629        }
630
631        "INTERACTION_CREATE"
632        | "SESSIONS_REPLACE"
633        | "STAGE_INSTANCE_CREATE"
634        | "STAGE_INSTANCE_UPDATE"
635        | "STAGE_INSTANCE_DELETE" => {}
636
637        other => {
638            eprintln!("[fluxer-rs] Unknown event: {}", other);
639        }
640    }
641}