Skip to main content

fluxer/client/
mod.rs

1//! Gateway client and connection management.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use futures::{SinkExt, StreamExt};
6use serde_json::Value;
7use tokio::sync::Mutex;
8use tokio_tungstenite::{connect_async, tungstenite::protocol::Message as WsMessage};
9use crate::cache::Cache;
10use crate::error::ClientError;
11use crate::event::EventHandler;
12use crate::http::Http;
13use crate::model::voice::VoiceState;
14use std::time::Duration;
15
16const DEFAULT_API_URL: &str = "https://api.fluxer.app/v1";
17const DEFAULT_GATEWAY_URL: &str = "wss://gateway.fluxer.app/?v=1&encoding=json";
18
/// Tells [`Client::start`] what to do once a gateway session has ended.
enum LoopControl {
    /// Stop the client; `start` returns `Ok(())`.
    Done,
    /// Open another session. With `resume: true` the stored session data is
    /// kept for the next attempt; with `false` it is discarded first.
    Reconnect {
        resume: bool,
    },
}
23
24/// Shared state passed into every event handler call. This is how you interact
25/// with the API from inside your event handlers.
26///
27/// ```rust,no_run
28/// # use fluxer::prelude::*;
29/// # async fn example(ctx: Context) {
30/// ctx.http.send_message("channel_id", "Hello!").await.unwrap();
31/// # }
32/// ```
33#[derive(Clone)]
34pub struct Context {
35    /// HTTP client for REST API calls.
36    pub http: Arc<Http>,
37    /// In-memory cache populated automatically from gateway events.
38    pub cache: Arc<Cache>,
39    /// Raw gateway sender. You probably won't need this directly --
40    /// voice join/leave use it internally.
41    pub gateway_tx: Arc<tokio::sync::mpsc::Sender<String>>,
42    /// Per-guild voice state, populated from `VOICE_STATE_UPDATE` and `VOICE_SERVER_UPDATE`.
43    /// Used internally by `join_voice` / `leave_voice`.
44    pub voice_states: Arc<Mutex<HashMap<String, VoiceState>>>,
45    pub(crate) live_rooms: Arc<Mutex<HashMap<String, std::sync::Arc<livekit::Room>>>>,
46    pub(crate) handler: Arc<dyn EventHandler>,
47}
48
49impl Context {
50    /// Joins a voice channel. Sends an opcode 4 to the gateway and waits
51    /// up to 10 seconds for the server to send back connection details.
52    pub async fn join_voice(
53        &self,
54        guild_id: &str,
55        channel_id: &str,
56    ) -> Result<crate::voice::FluxerVoiceConnection, ClientError> {
57        {
58            let mut states = self.voice_states.lock().await;
59            states.remove(guild_id);
60        }
61
62        let join_payload = serde_json::json!({
63            "op": 4,
64            "d": {
65                "guild_id": guild_id,
66                "channel_id": channel_id,
67                "self_mute": false,
68                "self_deaf": false
69            }
70        });
71        self.gateway_tx
72            .send(join_payload.to_string())
73            .await
74            .map_err(|e| ClientError::Voice(e.to_string()))?;
75
76        let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
77        let voice_state = loop {
78            {
79                let states = self.voice_states.lock().await;
80                if let Some(vs) = states.get(guild_id) {
81                    if !vs.token.is_empty() && !vs.endpoint.is_empty() {
82                        break vs.clone();
83                    }
84                }
85            }
86            if tokio::time::Instant::now() >= deadline {
87                return Err(ClientError::Voice(
88                    "Timed out waiting for VOICE_SERVER_UPDATE".into(),
89                ));
90            }
91            tokio::time::sleep(Duration::from_millis(100)).await;
92        };
93
94        let conn = crate::voice::FluxerVoiceConnection::connect(
95            &voice_state.endpoint,
96            &voice_state.token,
97            self.clone(),
98        )
99        .await
100        .map_err(|e| ClientError::Voice(e.to_string()))?;
101
102        self.live_rooms.lock().await.insert(guild_id.to_string(), conn.room.clone());
103
104        Ok(conn)
105    }
106
107    /// Leaves a voice channel. Closes the LiveKit room and tells the gateway.
108    pub async fn leave_voice(&self, guild_id: &str) -> Result<(), ClientError> {
109        if let Some(room) = self.live_rooms.lock().await.remove(guild_id) {
110            let _ = room.close().await;
111        }
112
113        let payload = serde_json::json!({
114            "op": 4,
115            "d": {
116                "guild_id": guild_id,
117                "channel_id": null,
118                "self_mute": false,
119                "self_deaf": false
120            }
121        });
122        self.gateway_tx
123            .send(payload.to_string())
124            .await
125            .map_err(|e| ClientError::Voice(e.to_string()))?;
126        self.voice_states.lock().await.remove(guild_id);
127        Ok(())
128    }
129
130    /// Subscribes to a guild's events (op 14). Called automatically for all
131    /// guilds on READY, but you can call this manually if you join a new guild
132    /// after the initial connection (e.g. from inside `on_guild_create`).
133    pub async fn subscribe_guild(&self, guild_id: &str) -> Result<(), ClientError> {
134        let payload = serde_json::json!({
135            "op": 14,
136            "d": {
137                "subscriptions": {
138                    guild_id: { "active": true, "typing": true }
139                }
140            }
141        });
142        self.gateway_tx
143            .send(payload.to_string())
144            .await
145            .map_err(|e| ClientError::Voice(e.to_string()))
146    }
147}
148
149/// Builder for creating a [`Client`]. You need at minimum a token and an event handler.
150///
151/// ```rust,no_run
152/// use fluxer::prelude::*;
153/// # struct MyHandler;
154/// # #[async_trait::async_trait]
155/// # impl EventHandler for MyHandler {}
156///
157/// let client = Client::builder("token")
158///     .event_handler(MyHandler)
159///     .build();
160/// ```
161pub struct ClientBuilder {
162    token: String,
163    api_url: String,
164    handler: Option<Arc<dyn EventHandler>>,
165    user_token: bool,
166}
167
168impl ClientBuilder {
169    pub fn new(token: impl Into<String>) -> Self {
170        Self {
171            token: token.into(),
172            api_url: DEFAULT_API_URL.to_string(),
173            handler: None,
174            user_token: false,
175        }
176    }
177
178    /// Use a user token instead of a bot token (no `Bot ` prefix on HTTP requests).
179    pub fn user_token(mut self) -> Self {
180        self.user_token = true;
181        self
182    }
183
184    /// Sets the event handler. Required -- the builder panics at `.build()` without this.
185    pub fn event_handler(mut self, handler: impl EventHandler + 'static) -> Self {
186        self.handler = Some(Arc::new(handler));
187        self
188    }
189
190    /// Override the API base URL. Defaults to `https://api.fluxer.app/v1`.
191    pub fn api_url(mut self, url: impl Into<String>) -> Self {
192        self.api_url = url.into();
193        self
194    }
195
196    pub fn build(self) -> Client {
197        let _ = rustls::crypto::ring::default_provider().install_default();
198        let http = if self.user_token {
199            Arc::new(Http::new_user(&self.token, self.api_url))
200        } else {
201            Arc::new(Http::new(&self.token, self.api_url))
202        };
203        Client {
204            http,
205            cache: Cache::new(),
206            handler: self.handler.expect("call .event_handler() before .build()"),
207        }
208    }
209}
210
211/// The gateway client. Manages the WebSocket connection, heartbeating,
212/// reconnection, and event dispatch.
213///
214/// Call [`start`](Client::start) to connect. It runs until a fatal error
215/// happens (like an invalid token) and reconnects automatically on transient
216/// failures.
217pub struct Client {
218    pub(crate) http: Arc<Http>,
219    cache: Arc<Cache>,
220    handler: Arc<dyn EventHandler>,
221}
222
223impl Client {
224    pub fn builder(token: impl Into<String>) -> ClientBuilder {
225        ClientBuilder::new(token)
226    }
227
228    /// Connects to the gateway and starts processing events. Blocks forever
229    /// unless a fatal error occurs.
230    pub async fn start(&mut self) -> Result<(), ClientError> {
231        let mut session_id: Option<String> = None;
232        let mut resume_url: Option<String> = None;
233        let mut last_seq: Option<u64> = None;
234        let mut backoff = Duration::from_secs(1);
235
236        loop {
237            let result = self
238                .run_session(&mut session_id, &mut resume_url, &mut last_seq)
239                .await;
240
241            match result {
242                Ok(LoopControl::Done) => return Ok(()),
243
244                Ok(LoopControl::Reconnect { resume }) => {
245                    if !resume {
246                        session_id = None;
247                        resume_url = None;
248                        last_seq = None;
249                        let jitter = Duration::from_millis(1000 + (rand::random::<u64>() % 4000));
250                        tokio::time::sleep(jitter).await;
251                    } else {
252                        eprintln!("[fluxer-rs] Reconnecting in {:?} (will resume)...", backoff);
253                        tokio::time::sleep(backoff).await;
254                        backoff = (backoff * 2).min(Duration::from_secs(60));
255                        continue;
256                    }
257                }
258
259                Err(ClientError::ConnectionClosed) => {
260                    eprintln!("[fluxer-rs] Connection closed, reconnecting in {:?}...", backoff);
261                    tokio::time::sleep(backoff).await;
262                    backoff = (backoff * 2).min(Duration::from_secs(60));
263                    continue;
264                }
265
266                Err(e) => return Err(e),
267            }
268
269            backoff = Duration::from_secs(1);
270        }
271    }
272
273    async fn run_session(
274        &self,
275        session_id: &mut Option<String>,
276        resume_url: &mut Option<String>,
277        last_seq: &mut Option<u64>,
278    ) -> Result<LoopControl, ClientError> {
279        let gateway_url = if session_id.is_some() {
280            resume_url
281                .clone()
282                .unwrap_or_else(|| DEFAULT_GATEWAY_URL.to_string())
283        } else {
284            match self.http.get_gateway().await {
285                Ok(url) => {
286                    let base = url.trim_end_matches('/');
287                    format!("{}/?v=1&encoding=json", base)
288                }
289                Err(_) => DEFAULT_GATEWAY_URL.to_string(),
290            }
291        };
292
293        let (ws_stream, _) = connect_async(&gateway_url).await?;
294        let (write, mut read) = ws_stream.split();
295        let write = Arc::new(Mutex::new(write));
296        let seq_shared: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(*last_seq));
297        let ack_shared: Arc<Mutex<bool>> = Arc::new(Mutex::new(true));
298        let (gateway_tx, mut gateway_rx) = tokio::sync::mpsc::channel::<String>(64);
299        {
300            let write_fwd = write.clone();
301            tokio::spawn(async move {
302                while let Some(msg) = gateway_rx.recv().await {
303                    let mut guard = write_fwd.lock().await;
304                    if guard
305                        .send(WsMessage::Text(msg.into()))
306                        .await
307                        .is_err()
308                    {
309                        break;
310                    }
311                }
312            });
313        }
314
315        let ctx = Context {
316            http: self.http.clone(),
317            cache: self.cache.clone(),
318            gateway_tx: Arc::new(gateway_tx),
319            voice_states: Arc::new(Mutex::new(HashMap::new())),
320            live_rooms: Arc::new(Mutex::new(HashMap::new())),
321            handler: self.handler.clone(),
322        };
323
324        let token = self.http.get_token().to_string();
325        if let (Some(sid), Some(seq)) = (session_id.as_deref(), *last_seq) {
326            let resume_payload = serde_json::json!({
327                "op": 6,
328                "d": { "token": token, "session_id": sid, "seq": seq }
329            });
330            write
331                .lock()
332                .await
333                .send(WsMessage::Text(resume_payload.to_string().into()))
334                .await?;
335        } else {
336            let identify = serde_json::json!({
337                "op": 2,
338                "d": {
339                    "token": token,
340                    "intents": 0,   // Fluxer has no intents yet
341                    "properties": {
342                        "os": "linux",
343                        "browser": "fluxer-rust",
344                        "device": "fluxer-rust"
345                    }
346                }
347            });
348            write
349                .lock()
350                .await
351                .send(WsMessage::Text(identify.to_string().into()))
352                .await?;
353        }
354
355        let handler = self.handler.clone();
356
357        while let Some(msg_result) = read.next().await {
358            let text = match msg_result? {
359                WsMessage::Text(t) => t,
360                WsMessage::Close(frame) => {
361                    let code = frame.as_ref().map(|f| u16::from(f.code)).unwrap_or(0);
362                    match code {
363                        4004 => {
364                            eprintln!("[fluxer-rs] Authentication failed (4004) — invalid token, shutting down.");
365                            return Ok(LoopControl::Done);
366                        }
367                        4010 => {
368                            eprintln!("[fluxer-rs] Invalid shard (4010) — shutting down.");
369                            return Ok(LoopControl::Done);
370                        }
371                        4011 => {
372                            eprintln!("[fluxer-rs] Sharding required (4011) — shutting down.");
373                            return Ok(LoopControl::Done);
374                        }
375                        4012 => {
376                            eprintln!("[fluxer-rs] Invalid API version (4012) — shutting down.");
377                            return Ok(LoopControl::Done);
378                        }
379                        _ => return Err(ClientError::ConnectionClosed),
380                    }
381                }
382                WsMessage::Ping(d) => {
383                    let _ = write.lock().await.send(WsMessage::Pong(d)).await;
384                    continue;
385                }
386                _ => continue,
387            };
388
389            let payload: Value = serde_json::from_str(text.as_str())?;
390            let op = payload["op"].as_u64().unwrap_or(255);
391
392            if let Some(s) = payload["s"].as_u64() {
393                *last_seq = Some(s);
394                *seq_shared.lock().await = Some(s);
395            }
396
397            match op {
398                10 => {
399                    let interval_ms = payload["d"]["heartbeat_interval"]
400                        .as_u64()
401                        .unwrap_or(41_250);
402
403                    let write_hb = write.clone();
404                    let seq_hb = seq_shared.clone();
405                    let ack_hb = ack_shared.clone();
406
407                    tokio::spawn(async move {
408                        let jitter = Duration::from_millis(
409                            (rand::random::<u64>() % interval_ms).max(1),
410                        );
411                        tokio::time::sleep(jitter).await;
412
413                        let mut ticker =
414                            tokio::time::interval(Duration::from_millis(interval_ms));
415                        loop {
416                            ticker.tick().await;
417
418                            {
419                                let mut ack = ack_hb.lock().await;
420                                if !*ack {
421                                    eprintln!(
422                                        "[fluxer-rs] No heartbeat ACK — zombie connection, dropping."
423                                    );
424                                    break;
425                                }
426                                *ack = false;
427                            }
428
429                            let seq = *seq_hb.lock().await;
430                            let hb = serde_json::json!({ "op": 1, "d": seq });
431                            let mut guard = write_hb.lock().await;
432                            if guard
433                                .send(WsMessage::Text(hb.to_string().into()))
434                                .await
435                                .is_err()
436                            {
437                                break;
438                            }
439                        }
440                    });
441                }
442
443                11 => {
444                    *ack_shared.lock().await = true;
445                }
446
447                0 => {
448                    let event_type = payload["t"].as_str().unwrap_or("").to_string();
449                    let data = payload["d"].clone();
450                    let ctx2 = ctx.clone();
451                    let handler2 = handler.clone();
452
453                    if event_type == "READY" {
454                        if let Some(sid) = data["session_id"].as_str() {
455                            *session_id = Some(sid.to_string());
456                        }
457                        if let Some(rurl) = data["resume_gateway_url"].as_str() {
458                            *resume_url = Some(format!(
459                                "{}/?v=1&encoding=json",
460                                rurl.trim_end_matches('/')
461                            ));
462                        }
463                        // Subscribe to all guilds (op 14) so events are received.
464                        // Without this, self-bots only receive DM and system events.
465                        if let Some(guilds) = data["guilds"].as_array() {
466                            let mut subscriptions = serde_json::Map::new();
467                            for g in guilds {
468                                if let Some(id) = g["id"].as_str() {
469                                    subscriptions.insert(
470                                        id.to_string(),
471                                        serde_json::json!({ "active": true, "typing": true }),
472                                    );
473                                }
474                            }
475                            if !subscriptions.is_empty() {
476                                let lazy = serde_json::json!({
477                                    "op": 14,
478                                    "d": { "subscriptions": subscriptions }
479                                });
480                                let _ = ctx.gateway_tx.send(lazy.to_string()).await;
481                            }
482                        }
483                    }
484
485                    tokio::spawn(async move {
486                        cache_update(&ctx2.cache, &event_type, &data).await;
487                        dispatch_event(event_type, data, ctx2, handler2).await;
488                    });
489                }
490
491                7 => {
492                    eprintln!("[fluxer-rs] Received op 7 Reconnect.");
493                    return Ok(LoopControl::Reconnect { resume: true });
494                }
495
496                9 => {
497                    let resumable = payload["d"].as_bool().unwrap_or(false);
498                    eprintln!("[fluxer-rs] Invalid session (resumable={resumable}).");
499                    return Ok(LoopControl::Reconnect { resume: resumable });
500                }
501
502                1 => {
503                    let seq = *seq_shared.lock().await;
504                    let hb = serde_json::json!({ "op": 1, "d": seq });
505                    let _ = write
506                        .lock()
507                        .await
508                        .send(WsMessage::Text(hb.to_string().into()))
509                        .await;
510                }
511
512                _ => {}
513            }
514        }
515
516        Err(ClientError::ConnectionClosed)
517    }
518}
519
520async fn cache_update(cache: &Cache, event_type: &str, data: &Value) {
521    match event_type {
522        "READY" => {
523            if let Ok(user) = serde_json::from_value::<crate::model::User>(data["user"].clone()) {
524                *cache.current_user.write().await = Some(user);
525            }
526        }
527        "GUILD_CREATE" => {
528            if let Ok(guild) = serde_json::from_value::<crate::model::Guild>(data.clone()) {
529                let guild_id = guild.id.clone();
530
531                if let Some(channels) = data["channels"].as_array() {
532                    let mut ch_map = cache.channels.write().await;
533                    for ch_val in channels {
534                        if let Ok(ch) = serde_json::from_value::<crate::model::Channel>(ch_val.clone()) {
535                            ch_map.insert(ch.id.clone(), ch);
536                        }
537                    }
538                }
539                
540                if let Some(members) = data["members"].as_array() {
541                    let mut user_map = cache.users.write().await;
542                    for m_val in members {
543                        if let Ok(user) = serde_json::from_value::<crate::model::User>(m_val["user"].clone()) {
544                            user_map.insert(user.id.clone(), user);
545                        }
546                    }
547                }
548                cache.guilds.write().await.insert(guild_id, guild);
549            }
550        }
551        "GUILD_SYNC" => {
552            if let Some(channels) = data["channels"].as_array() {
553                let mut ch_map = cache.channels.write().await;
554                for ch_val in channels {
555                    if let Ok(ch) = serde_json::from_value::<crate::model::Channel>(ch_val.clone()) {
556                        ch_map.insert(ch.id.clone(), ch);
557                    }
558                }
559            }
560            if let Some(members) = data["members"].as_array() {
561                let mut user_map = cache.users.write().await;
562                for m_val in members {
563                    if let Ok(user) = serde_json::from_value::<crate::model::User>(m_val["user"].clone()) {
564                        user_map.insert(user.id.clone(), user);
565                    }
566                }
567            }
568        }
569        "GUILD_UPDATE" => {
570            if let Ok(guild) = serde_json::from_value::<crate::model::Guild>(data.clone()) {
571                cache.guilds.write().await.insert(guild.id.clone(), guild);
572            }
573        }
574        "GUILD_DELETE" => {
575            if let Some(id) = data["id"].as_str() {
576                cache.guilds.write().await.remove(id);
577            }
578        }
579        "CHANNEL_CREATE" | "CHANNEL_UPDATE" => {
580            if let Ok(ch) = serde_json::from_value::<crate::model::Channel>(data.clone()) {
581                cache.channels.write().await.insert(ch.id.clone(), ch);
582            }
583        }
584        "CHANNEL_DELETE" => {
585            if let Some(id) = data["id"].as_str() {
586                cache.channels.write().await.remove(id);
587            }
588        }
589        "GUILD_MEMBER_ADD" | "GUILD_MEMBER_UPDATE" => {
590            if let Ok(user) = serde_json::from_value::<crate::model::User>(data["user"].clone()) {
591                cache.users.write().await.insert(user.id.clone(), user);
592            }
593        }
594        "MESSAGE_CREATE" => {
595            if let Ok(user) = serde_json::from_value::<crate::model::User>(data["author"].clone()) {
596                cache.users.write().await.insert(user.id.clone(), user);
597            }
598        }
599        _ => {}
600    }
601}
602
603async fn dispatch_event(
604    event_type: String,
605    data: Value,
606    ctx: Context,
607    handler: Arc<dyn EventHandler>,
608) {
609    use crate::model::{
610        AuthSessionChange, CallCreate, CallDelete, CallUpdate, Channel, ChannelPinsAck,
611        ChannelPinsUpdate, ChannelRecipientAdd, ChannelRecipientRemove, ChannelUpdateBulk, Guild,
612        GuildAuditLogEntryCreate, GuildBanAdd, GuildBanRemove, GuildEmojisUpdate, GuildMemberAdd,
613        GuildMemberListUpdate, GuildMemberRemove, GuildMembersChunk, GuildMemberUpdate,
614        GuildRoleCreate, GuildRoleDelete, GuildRoleUpdate, GuildRoleUpdateBulk, GuildStickersUpdate,
615        GuildSync, InviteCreate, InviteDelete, Message, MessageAck, MessageDelete, MessageDeleteBulk,
616        MessageReactionAddMany, MessageUpdate, PassiveUpdates, PresenceUpdate, PresenceUpdateBulk,
617        ReactionAdd, ReactionRemove, ReactionRemoveAll, ReactionRemoveEmoji, Ready,
618        RecentMentionDelete, RelationshipAdd, RelationshipRemove, RelationshipUpdate, Resumed,
619        SavedMessageCreate, SavedMessageDelete, SessionsReplace, TypingStart, UnavailableGuild,
620        UserConnectionsUpdate, UserGuildSettingsUpdate, UserNoteUpdate, UserPinnedDmsUpdate,
621        UserSettingsUpdate, UserUpdate, VoiceServerUpdate, VoiceStateUpdate, WebhooksUpdate,
622    };
623    use crate::model::voice::VoiceState;
624
625    macro_rules! dispatch {
626        ($method:ident, $ty:ty) => {{
627            match serde_json::from_value::<$ty>(data.clone()) {
628                Ok(v) => handler.$method(ctx, v).await,
629                Err(e) => eprintln!(
630                    "[fluxer-rs] Failed to deserialize {} event: {}",
631                    stringify!($ty),
632                    e
633                ),
634            }
635        }};
636    }
637
638    match event_type.as_str() {
639        "READY"   => dispatch!(on_ready, Ready),
640        "RESUMED" => dispatch!(on_resumed, Resumed),
641        "MESSAGE_CREATE"      => dispatch!(on_message, Message),
642        "MESSAGE_UPDATE"      => dispatch!(on_message_update, MessageUpdate),
643        "MESSAGE_DELETE"      => dispatch!(on_message_delete, MessageDelete),
644        "MESSAGE_DELETE_BULK" => dispatch!(on_message_delete_bulk, MessageDeleteBulk),
645        "MESSAGE_REACTION_ADD"          => dispatch!(on_reaction_add, ReactionAdd),
646        "MESSAGE_REACTION_REMOVE"       => dispatch!(on_reaction_remove, ReactionRemove),
647        "MESSAGE_REACTION_REMOVE_ALL"   => dispatch!(on_reaction_remove_all, ReactionRemoveAll),
648        "MESSAGE_REACTION_REMOVE_EMOJI" => dispatch!(on_reaction_remove_emoji, ReactionRemoveEmoji),
649        "TYPING_START" => dispatch!(on_typing_start, TypingStart),
650        "CHANNEL_CREATE"      => dispatch!(on_channel_create, Channel),
651        "CHANNEL_UPDATE"      => dispatch!(on_channel_update, Channel),
652        "CHANNEL_DELETE"      => dispatch!(on_channel_delete, Channel),
653        "CHANNEL_PINS_UPDATE" => dispatch!(on_channel_pins_update, ChannelPinsUpdate),
654        "CHANNEL_PINS_ACK"    => dispatch!(on_channel_pins_ack, ChannelPinsAck),
655        "GUILD_CREATE" => dispatch!(on_guild_create, Guild),
656        "GUILD_UPDATE" => dispatch!(on_guild_update, Guild),
657        "GUILD_DELETE" => dispatch!(on_guild_delete, UnavailableGuild),
658        "GUILD_MEMBER_ADD"    => dispatch!(on_guild_member_add, GuildMemberAdd),
659        "GUILD_MEMBER_UPDATE" => dispatch!(on_guild_member_update, GuildMemberUpdate),
660        "GUILD_MEMBER_REMOVE" => dispatch!(on_guild_member_remove, GuildMemberRemove),
661        "GUILD_BAN_ADD"    => dispatch!(on_guild_ban_add, GuildBanAdd),
662        "GUILD_BAN_REMOVE" => dispatch!(on_guild_ban_remove, GuildBanRemove),
663        "GUILD_ROLE_CREATE"      => dispatch!(on_guild_role_create, GuildRoleCreate),
664        "GUILD_ROLE_UPDATE"      => dispatch!(on_guild_role_update, GuildRoleUpdate),
665        "GUILD_ROLE_UPDATE_BULK" => dispatch!(on_guild_role_update_bulk, GuildRoleUpdateBulk),
666        "GUILD_ROLE_DELETE"      => dispatch!(on_guild_role_delete, GuildRoleDelete),
667        "GUILD_EMOJIS_UPDATE"   => dispatch!(on_guild_emojis_update, GuildEmojisUpdate),
668        "GUILD_STICKERS_UPDATE" => dispatch!(on_guild_stickers_update, GuildStickersUpdate),
669        "GUILD_AUDIT_LOG_ENTRY_CREATE" => dispatch!(on_guild_audit_log_entry_create, GuildAuditLogEntryCreate),
670        "CHANNEL_UPDATE_BULK" => dispatch!(on_channel_update_bulk, ChannelUpdateBulk),
671        "INVITE_CREATE" => dispatch!(on_invite_create, InviteCreate),
672        "INVITE_DELETE" => dispatch!(on_invite_delete, InviteDelete),
673        "WEBHOOKS_UPDATE" => dispatch!(on_webhooks_update, WebhooksUpdate),
674        "VOICE_STATE_UPDATE" => {
675            let guild_id = data["guild_id"].as_str().unwrap_or("").to_string();
676            let sess = data["session_id"].as_str().unwrap_or("").to_string();
677            if !guild_id.is_empty() && !sess.is_empty() {
678                let mut states = ctx.voice_states.lock().await;
679                let entry = states.entry(guild_id).or_insert_with(|| VoiceState {
680                    token: String::new(),
681                    endpoint: String::new(),
682                    session_id: None,
683                });
684                entry.session_id = Some(sess);
685            }
686            match serde_json::from_value::<VoiceStateUpdate>(data.clone()) {
687                Ok(v) => handler.on_voice_state_update(ctx, v).await,
688                Err(e) => eprintln!("[fluxer-rs] Failed to deserialize VOICE_STATE_UPDATE: {}", e),
689            }
690        }
691        "VOICE_SERVER_UPDATE" => {
692            let token = data["token"].as_str().unwrap_or("").to_string();
693            let endpoint = data["endpoint"].as_str().unwrap_or("").to_string();
694            let guild_id = data["guild_id"].as_str().unwrap_or("").to_string();
695            if !guild_id.is_empty() && !token.is_empty() && !endpoint.is_empty() {
696                let mut states = ctx.voice_states.lock().await;
697                let entry = states.entry(guild_id).or_insert_with(|| VoiceState {
698                    token: String::new(),
699                    endpoint: String::new(),
700                    session_id: None,
701                });
702                entry.token = token.clone();
703                entry.endpoint = if endpoint.starts_with("wss://")
704                    || endpoint.starts_with("https://")
705                {
706                    endpoint.clone()
707                } else {
708                    format!("wss://{}", endpoint)
709                };
710            }
711            match serde_json::from_value::<VoiceServerUpdate>(data.clone()) {
712                Ok(v) => handler.on_voice_server_update(ctx, v).await,
713                Err(e) => eprintln!("[fluxer-rs] Failed to deserialize VOICE_SERVER_UPDATE: {}", e),
714            }
715        }
716
717        "PRESENCE_UPDATE"       => dispatch!(on_presence_update, PresenceUpdate),
718        "PRESENCE_UPDATE_BULK"  => dispatch!(on_presence_update_bulk, PresenceUpdateBulk),
719        "USER_SETTINGS_UPDATE"  => dispatch!(on_user_settings_update, UserSettingsUpdate),
720        "USER_UPDATE"           => dispatch!(on_user_update, UserUpdate),
721        "USER_GUILD_SETTINGS_UPDATE" => dispatch!(on_user_guild_settings_update, UserGuildSettingsUpdate),
722        "USER_PINNED_DMS_UPDATE"     => dispatch!(on_user_pinned_dms_update, UserPinnedDmsUpdate),
723        "USER_NOTE_UPDATE"           => dispatch!(on_user_note_update, UserNoteUpdate),
724        "USER_CONNECTIONS_UPDATE"    => dispatch!(on_user_connections_update, UserConnectionsUpdate),
725        "AUTH_SESSION_CHANGE"        => dispatch!(on_auth_session_change, AuthSessionChange),
726        "MESSAGE_ACK"           => dispatch!(on_message_ack, MessageAck),
727        "SESSIONS_REPLACE" => {
728            match serde_json::from_value::<Vec<crate::model::SessionEntry>>(data.clone()) {
729                Ok(v) => handler.on_sessions_replace(ctx, SessionsReplace(v)).await,
730                Err(e) => eprintln!("[fluxer-rs] Failed to deserialize SESSIONS_REPLACE: {}", e),
731            }
732        }
733        "RELATIONSHIP_ADD"    => dispatch!(on_relationship_add, RelationshipAdd),
734        "RELATIONSHIP_UPDATE" => dispatch!(on_relationship_update, RelationshipUpdate),
735        "RELATIONSHIP_REMOVE" => dispatch!(on_relationship_remove, RelationshipRemove),
736        "CALL_CREATE" => dispatch!(on_call_create, CallCreate),
737        "CALL_UPDATE" => dispatch!(on_call_update, CallUpdate),
738        "CALL_DELETE" => dispatch!(on_call_delete, CallDelete),
739        "CHANNEL_RECIPIENT_ADD"    => dispatch!(on_channel_recipient_add, ChannelRecipientAdd),
740        "CHANNEL_RECIPIENT_REMOVE" => dispatch!(on_channel_recipient_remove, ChannelRecipientRemove),
741        "MESSAGE_REACTION_ADD_MANY" => dispatch!(on_message_reaction_add_many, MessageReactionAddMany),
742        "RECENT_MENTION_DELETE"     => dispatch!(on_recent_mention_delete, RecentMentionDelete),
743        "SAVED_MESSAGE_CREATE"      => dispatch!(on_saved_message_create, SavedMessageCreate),
744        "SAVED_MESSAGE_DELETE"      => dispatch!(on_saved_message_delete, SavedMessageDelete),
745        "PASSIVE_UPDATES"           => dispatch!(on_passive_updates, PassiveUpdates),
746        "GUILD_MEMBER_LIST_UPDATE"  => dispatch!(on_guild_member_list_update, GuildMemberListUpdate),
747        "GUILD_MEMBERS_CHUNK"       => dispatch!(on_guild_members_chunk, GuildMembersChunk),
748        "GUILD_SYNC"                => dispatch!(on_guild_sync, GuildSync),
749
750        "INTERACTION_CREATE"
751        | "STAGE_INSTANCE_CREATE"
752        | "STAGE_INSTANCE_UPDATE"
753        | "STAGE_INSTANCE_DELETE" => {}
754
755        other => {
756            handler.on_unknown_event(ctx, other.to_string(), data).await;
757        }
758    }
759}