1use std::collections::HashMap;
4use std::sync::Arc;
5use futures::{SinkExt, StreamExt};
6use serde_json::Value;
7use tokio::sync::Mutex;
8use tokio_tungstenite::{connect_async, tungstenite::protocol::Message as WsMessage};
9use crate::cache::Cache;
10use crate::error::ClientError;
11use crate::event::EventHandler;
12use crate::http::Http;
13use crate::model::voice::VoiceState;
14use std::time::Duration;
15
/// REST base URL a [`ClientBuilder`] starts with until `api_url` overrides it.
const DEFAULT_API_URL: &str = "https://api.fluxer.app/v1";
/// Gateway WebSocket URL used when gateway discovery over HTTP fails, or when
/// a resume is attempted without a stored resume URL.
const DEFAULT_GATEWAY_URL: &str = "wss://gateway.fluxer.app/?v=1&encoding=json";
18
/// Outcome of a single gateway session, telling the reconnect loop what to do
/// next.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LoopControl {
    /// The session ended for good; the client should stop.
    Done,
    /// The session ended but a new connection should be opened.
    Reconnect {
        /// `true` to keep the stored session and resume it, `false` to discard
        /// it and identify from scratch.
        resume: bool,
    },
}
23
24#[derive(Clone)]
34pub struct Context {
35 pub http: Arc<Http>,
37 pub cache: Arc<Cache>,
39 pub gateway_tx: Arc<tokio::sync::mpsc::Sender<String>>,
42 pub voice_states: Arc<Mutex<HashMap<String, VoiceState>>>,
45 pub(crate) live_rooms: Arc<Mutex<HashMap<String, std::sync::Arc<livekit::Room>>>>,
46 pub(crate) handler: Arc<dyn EventHandler>,
47}
48
49impl Context {
50 pub async fn join_voice(
53 &self,
54 guild_id: &str,
55 channel_id: &str,
56 ) -> Result<crate::voice::FluxerVoiceConnection, ClientError> {
57 {
58 let mut states = self.voice_states.lock().await;
59 states.remove(guild_id);
60 }
61
62 let join_payload = serde_json::json!({
63 "op": 4,
64 "d": {
65 "guild_id": guild_id,
66 "channel_id": channel_id,
67 "self_mute": false,
68 "self_deaf": false
69 }
70 });
71 self.gateway_tx
72 .send(join_payload.to_string())
73 .await
74 .map_err(|e| ClientError::Voice(e.to_string()))?;
75
76 let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
77 let voice_state = loop {
78 {
79 let states = self.voice_states.lock().await;
80 if let Some(vs) = states.get(guild_id) {
81 if !vs.token.is_empty() && !vs.endpoint.is_empty() {
82 break vs.clone();
83 }
84 }
85 }
86 if tokio::time::Instant::now() >= deadline {
87 return Err(ClientError::Voice(
88 "Timed out waiting for VOICE_SERVER_UPDATE".into(),
89 ));
90 }
91 tokio::time::sleep(Duration::from_millis(100)).await;
92 };
93
94 let conn = crate::voice::FluxerVoiceConnection::connect(
95 &voice_state.endpoint,
96 &voice_state.token,
97 self.clone(),
98 )
99 .await
100 .map_err(|e| ClientError::Voice(e.to_string()))?;
101
102 self.live_rooms.lock().await.insert(guild_id.to_string(), conn.room.clone());
103
104 Ok(conn)
105 }
106
107 pub async fn leave_voice(&self, guild_id: &str) -> Result<(), ClientError> {
109 if let Some(room) = self.live_rooms.lock().await.remove(guild_id) {
110 let _ = room.close().await;
111 }
112
113 let payload = serde_json::json!({
114 "op": 4,
115 "d": {
116 "guild_id": guild_id,
117 "channel_id": null,
118 "self_mute": false,
119 "self_deaf": false
120 }
121 });
122 self.gateway_tx
123 .send(payload.to_string())
124 .await
125 .map_err(|e| ClientError::Voice(e.to_string()))?;
126 self.voice_states.lock().await.remove(guild_id);
127 Ok(())
128 }
129
130 pub async fn subscribe_guild(&self, guild_id: &str) -> Result<(), ClientError> {
134 let payload = serde_json::json!({
135 "op": 14,
136 "d": {
137 "subscriptions": {
138 guild_id: { "active": true, "typing": true }
139 }
140 }
141 });
142 self.gateway_tx
143 .send(payload.to_string())
144 .await
145 .map_err(|e| ClientError::Voice(e.to_string()))
146 }
147}
148
149pub struct ClientBuilder {
162 token: String,
163 api_url: String,
164 handler: Option<Arc<dyn EventHandler>>,
165 user_token: bool,
166}
167
168impl ClientBuilder {
169 pub fn new(token: impl Into<String>) -> Self {
170 Self {
171 token: token.into(),
172 api_url: DEFAULT_API_URL.to_string(),
173 handler: None,
174 user_token: false,
175 }
176 }
177
178 pub fn user_token(mut self) -> Self {
180 self.user_token = true;
181 self
182 }
183
184 pub fn event_handler(mut self, handler: impl EventHandler + 'static) -> Self {
186 self.handler = Some(Arc::new(handler));
187 self
188 }
189
190 pub fn api_url(mut self, url: impl Into<String>) -> Self {
192 self.api_url = url.into();
193 self
194 }
195
196 pub fn build(self) -> Client {
197 let _ = rustls::crypto::ring::default_provider().install_default();
198 let http = if self.user_token {
199 Arc::new(Http::new_user(&self.token, self.api_url))
200 } else {
201 Arc::new(Http::new(&self.token, self.api_url))
202 };
203 Client {
204 http,
205 cache: Cache::new(),
206 handler: self.handler.expect("call .event_handler() before .build()"),
207 }
208 }
209}
210
211pub struct Client {
218 pub(crate) http: Arc<Http>,
219 cache: Arc<Cache>,
220 handler: Arc<dyn EventHandler>,
221}
222
223impl Client {
224 pub fn builder(token: impl Into<String>) -> ClientBuilder {
225 ClientBuilder::new(token)
226 }
227
228 pub async fn start(&mut self) -> Result<(), ClientError> {
231 let mut session_id: Option<String> = None;
232 let mut resume_url: Option<String> = None;
233 let mut last_seq: Option<u64> = None;
234 let mut backoff = Duration::from_secs(1);
235
236 loop {
237 let result = self
238 .run_session(&mut session_id, &mut resume_url, &mut last_seq)
239 .await;
240
241 match result {
242 Ok(LoopControl::Done) => return Ok(()),
243
244 Ok(LoopControl::Reconnect { resume }) => {
245 if !resume {
246 session_id = None;
247 resume_url = None;
248 last_seq = None;
249 let jitter = Duration::from_millis(1000 + (rand::random::<u64>() % 4000));
250 tokio::time::sleep(jitter).await;
251 } else {
252 eprintln!("[fluxer-rs] Reconnecting in {:?} (will resume)...", backoff);
253 tokio::time::sleep(backoff).await;
254 backoff = (backoff * 2).min(Duration::from_secs(60));
255 continue;
256 }
257 }
258
259 Err(ClientError::ConnectionClosed) => {
260 eprintln!("[fluxer-rs] Connection closed, reconnecting in {:?}...", backoff);
261 tokio::time::sleep(backoff).await;
262 backoff = (backoff * 2).min(Duration::from_secs(60));
263 continue;
264 }
265
266 Err(e) => return Err(e),
267 }
268
269 backoff = Duration::from_secs(1);
270 }
271 }
272
273 async fn run_session(
274 &self,
275 session_id: &mut Option<String>,
276 resume_url: &mut Option<String>,
277 last_seq: &mut Option<u64>,
278 ) -> Result<LoopControl, ClientError> {
279 let gateway_url = if session_id.is_some() {
280 resume_url
281 .clone()
282 .unwrap_or_else(|| DEFAULT_GATEWAY_URL.to_string())
283 } else {
284 match self.http.get_gateway().await {
285 Ok(url) => {
286 let base = url.trim_end_matches('/');
287 format!("{}/?v=1&encoding=json", base)
288 }
289 Err(_) => DEFAULT_GATEWAY_URL.to_string(),
290 }
291 };
292
293 let (ws_stream, _) = connect_async(&gateway_url).await?;
294 let (write, mut read) = ws_stream.split();
295 let write = Arc::new(Mutex::new(write));
296 let seq_shared: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(*last_seq));
297 let ack_shared: Arc<Mutex<bool>> = Arc::new(Mutex::new(true));
298 let (gateway_tx, mut gateway_rx) = tokio::sync::mpsc::channel::<String>(64);
299 {
300 let write_fwd = write.clone();
301 tokio::spawn(async move {
302 while let Some(msg) = gateway_rx.recv().await {
303 let mut guard = write_fwd.lock().await;
304 if guard
305 .send(WsMessage::Text(msg.into()))
306 .await
307 .is_err()
308 {
309 break;
310 }
311 }
312 });
313 }
314
315 let ctx = Context {
316 http: self.http.clone(),
317 cache: self.cache.clone(),
318 gateway_tx: Arc::new(gateway_tx),
319 voice_states: Arc::new(Mutex::new(HashMap::new())),
320 live_rooms: Arc::new(Mutex::new(HashMap::new())),
321 handler: self.handler.clone(),
322 };
323
324 let token = self.http.get_token().to_string();
325 if let (Some(sid), Some(seq)) = (session_id.as_deref(), *last_seq) {
326 let resume_payload = serde_json::json!({
327 "op": 6,
328 "d": { "token": token, "session_id": sid, "seq": seq }
329 });
330 write
331 .lock()
332 .await
333 .send(WsMessage::Text(resume_payload.to_string().into()))
334 .await?;
335 } else {
336 let identify = serde_json::json!({
337 "op": 2,
338 "d": {
339 "token": token,
340 "intents": 0, "properties": {
342 "os": "linux",
343 "browser": "fluxer-rust",
344 "device": "fluxer-rust"
345 }
346 }
347 });
348 write
349 .lock()
350 .await
351 .send(WsMessage::Text(identify.to_string().into()))
352 .await?;
353 }
354
355 let handler = self.handler.clone();
356
357 while let Some(msg_result) = read.next().await {
358 let text = match msg_result? {
359 WsMessage::Text(t) => t,
360 WsMessage::Close(frame) => {
361 let code = frame.as_ref().map(|f| u16::from(f.code)).unwrap_or(0);
362 match code {
363 4004 => {
364 eprintln!("[fluxer-rs] Authentication failed (4004) — invalid token, shutting down.");
365 return Ok(LoopControl::Done);
366 }
367 4010 => {
368 eprintln!("[fluxer-rs] Invalid shard (4010) — shutting down.");
369 return Ok(LoopControl::Done);
370 }
371 4011 => {
372 eprintln!("[fluxer-rs] Sharding required (4011) — shutting down.");
373 return Ok(LoopControl::Done);
374 }
375 4012 => {
376 eprintln!("[fluxer-rs] Invalid API version (4012) — shutting down.");
377 return Ok(LoopControl::Done);
378 }
379 _ => return Err(ClientError::ConnectionClosed),
380 }
381 }
382 WsMessage::Ping(d) => {
383 let _ = write.lock().await.send(WsMessage::Pong(d)).await;
384 continue;
385 }
386 _ => continue,
387 };
388
389 let payload: Value = serde_json::from_str(text.as_str())?;
390 let op = payload["op"].as_u64().unwrap_or(255);
391
392 if let Some(s) = payload["s"].as_u64() {
393 *last_seq = Some(s);
394 *seq_shared.lock().await = Some(s);
395 }
396
397 match op {
398 10 => {
399 let interval_ms = payload["d"]["heartbeat_interval"]
400 .as_u64()
401 .unwrap_or(41_250);
402
403 let write_hb = write.clone();
404 let seq_hb = seq_shared.clone();
405 let ack_hb = ack_shared.clone();
406
407 tokio::spawn(async move {
408 let jitter = Duration::from_millis(
409 (rand::random::<u64>() % interval_ms).max(1),
410 );
411 tokio::time::sleep(jitter).await;
412
413 let mut ticker =
414 tokio::time::interval(Duration::from_millis(interval_ms));
415 loop {
416 ticker.tick().await;
417
418 {
419 let mut ack = ack_hb.lock().await;
420 if !*ack {
421 eprintln!(
422 "[fluxer-rs] No heartbeat ACK — zombie connection, dropping."
423 );
424 break;
425 }
426 *ack = false;
427 }
428
429 let seq = *seq_hb.lock().await;
430 let hb = serde_json::json!({ "op": 1, "d": seq });
431 let mut guard = write_hb.lock().await;
432 if guard
433 .send(WsMessage::Text(hb.to_string().into()))
434 .await
435 .is_err()
436 {
437 break;
438 }
439 }
440 });
441 }
442
443 11 => {
444 *ack_shared.lock().await = true;
445 }
446
447 0 => {
448 let event_type = payload["t"].as_str().unwrap_or("").to_string();
449 let data = payload["d"].clone();
450 let ctx2 = ctx.clone();
451 let handler2 = handler.clone();
452
453 if event_type == "READY" {
454 if let Some(sid) = data["session_id"].as_str() {
455 *session_id = Some(sid.to_string());
456 }
457 if let Some(rurl) = data["resume_gateway_url"].as_str() {
458 *resume_url = Some(format!(
459 "{}/?v=1&encoding=json",
460 rurl.trim_end_matches('/')
461 ));
462 }
463 if let Some(guilds) = data["guilds"].as_array() {
466 let mut subscriptions = serde_json::Map::new();
467 for g in guilds {
468 if let Some(id) = g["id"].as_str() {
469 subscriptions.insert(
470 id.to_string(),
471 serde_json::json!({ "active": true, "typing": true }),
472 );
473 }
474 }
475 if !subscriptions.is_empty() {
476 let lazy = serde_json::json!({
477 "op": 14,
478 "d": { "subscriptions": subscriptions }
479 });
480 let _ = ctx.gateway_tx.send(lazy.to_string()).await;
481 }
482 }
483 }
484
485 tokio::spawn(async move {
486 cache_update(&ctx2.cache, &event_type, &data).await;
487 dispatch_event(event_type, data, ctx2, handler2).await;
488 });
489 }
490
491 7 => {
492 eprintln!("[fluxer-rs] Received op 7 Reconnect.");
493 return Ok(LoopControl::Reconnect { resume: true });
494 }
495
496 9 => {
497 let resumable = payload["d"].as_bool().unwrap_or(false);
498 eprintln!("[fluxer-rs] Invalid session (resumable={resumable}).");
499 return Ok(LoopControl::Reconnect { resume: resumable });
500 }
501
502 1 => {
503 let seq = *seq_shared.lock().await;
504 let hb = serde_json::json!({ "op": 1, "d": seq });
505 let _ = write
506 .lock()
507 .await
508 .send(WsMessage::Text(hb.to_string().into()))
509 .await;
510 }
511
512 _ => {}
513 }
514 }
515
516 Err(ClientError::ConnectionClosed)
517 }
518}
519
520async fn cache_update(cache: &Cache, event_type: &str, data: &Value) {
521 match event_type {
522 "READY" => {
523 if let Ok(user) = serde_json::from_value::<crate::model::User>(data["user"].clone()) {
524 *cache.current_user.write().await = Some(user);
525 }
526 }
527 "GUILD_CREATE" => {
528 if let Ok(guild) = serde_json::from_value::<crate::model::Guild>(data.clone()) {
529 let guild_id = guild.id.clone();
530
531 if let Some(channels) = data["channels"].as_array() {
532 let mut ch_map = cache.channels.write().await;
533 for ch_val in channels {
534 if let Ok(ch) = serde_json::from_value::<crate::model::Channel>(ch_val.clone()) {
535 ch_map.insert(ch.id.clone(), ch);
536 }
537 }
538 }
539
540 if let Some(members) = data["members"].as_array() {
541 let mut user_map = cache.users.write().await;
542 for m_val in members {
543 if let Ok(user) = serde_json::from_value::<crate::model::User>(m_val["user"].clone()) {
544 user_map.insert(user.id.clone(), user);
545 }
546 }
547 }
548 cache.guilds.write().await.insert(guild_id, guild);
549 }
550 }
551 "GUILD_SYNC" => {
552 if let Some(channels) = data["channels"].as_array() {
553 let mut ch_map = cache.channels.write().await;
554 for ch_val in channels {
555 if let Ok(ch) = serde_json::from_value::<crate::model::Channel>(ch_val.clone()) {
556 ch_map.insert(ch.id.clone(), ch);
557 }
558 }
559 }
560 if let Some(members) = data["members"].as_array() {
561 let mut user_map = cache.users.write().await;
562 for m_val in members {
563 if let Ok(user) = serde_json::from_value::<crate::model::User>(m_val["user"].clone()) {
564 user_map.insert(user.id.clone(), user);
565 }
566 }
567 }
568 }
569 "GUILD_UPDATE" => {
570 if let Ok(guild) = serde_json::from_value::<crate::model::Guild>(data.clone()) {
571 cache.guilds.write().await.insert(guild.id.clone(), guild);
572 }
573 }
574 "GUILD_DELETE" => {
575 if let Some(id) = data["id"].as_str() {
576 cache.guilds.write().await.remove(id);
577 }
578 }
579 "CHANNEL_CREATE" | "CHANNEL_UPDATE" => {
580 if let Ok(ch) = serde_json::from_value::<crate::model::Channel>(data.clone()) {
581 cache.channels.write().await.insert(ch.id.clone(), ch);
582 }
583 }
584 "CHANNEL_DELETE" => {
585 if let Some(id) = data["id"].as_str() {
586 cache.channels.write().await.remove(id);
587 }
588 }
589 "GUILD_MEMBER_ADD" | "GUILD_MEMBER_UPDATE" => {
590 if let Ok(user) = serde_json::from_value::<crate::model::User>(data["user"].clone()) {
591 cache.users.write().await.insert(user.id.clone(), user);
592 }
593 }
594 "MESSAGE_CREATE" => {
595 if let Ok(user) = serde_json::from_value::<crate::model::User>(data["author"].clone()) {
596 cache.users.write().await.insert(user.id.clone(), user);
597 }
598 }
599 _ => {}
600 }
601}
602
603async fn dispatch_event(
604 event_type: String,
605 data: Value,
606 ctx: Context,
607 handler: Arc<dyn EventHandler>,
608) {
609 use crate::model::{
610 AuthSessionChange, CallCreate, CallDelete, CallUpdate, Channel, ChannelPinsAck,
611 ChannelPinsUpdate, ChannelRecipientAdd, ChannelRecipientRemove, ChannelUpdateBulk, Guild,
612 GuildAuditLogEntryCreate, GuildBanAdd, GuildBanRemove, GuildEmojisUpdate, GuildMemberAdd,
613 GuildMemberListUpdate, GuildMemberRemove, GuildMembersChunk, GuildMemberUpdate,
614 GuildRoleCreate, GuildRoleDelete, GuildRoleUpdate, GuildRoleUpdateBulk, GuildStickersUpdate,
615 GuildSync, InviteCreate, InviteDelete, Message, MessageAck, MessageDelete, MessageDeleteBulk,
616 MessageReactionAddMany, MessageUpdate, PassiveUpdates, PresenceUpdate, PresenceUpdateBulk,
617 ReactionAdd, ReactionRemove, ReactionRemoveAll, ReactionRemoveEmoji, Ready,
618 RecentMentionDelete, RelationshipAdd, RelationshipRemove, RelationshipUpdate, Resumed,
619 SavedMessageCreate, SavedMessageDelete, SessionsReplace, TypingStart, UnavailableGuild,
620 UserConnectionsUpdate, UserGuildSettingsUpdate, UserNoteUpdate, UserPinnedDmsUpdate,
621 UserSettingsUpdate, UserUpdate, VoiceServerUpdate, VoiceStateUpdate, WebhooksUpdate,
622 };
623 use crate::model::voice::VoiceState;
624
625 macro_rules! dispatch {
626 ($method:ident, $ty:ty) => {{
627 match serde_json::from_value::<$ty>(data.clone()) {
628 Ok(v) => handler.$method(ctx, v).await,
629 Err(e) => eprintln!(
630 "[fluxer-rs] Failed to deserialize {} event: {}",
631 stringify!($ty),
632 e
633 ),
634 }
635 }};
636 }
637
638 match event_type.as_str() {
639 "READY" => dispatch!(on_ready, Ready),
640 "RESUMED" => dispatch!(on_resumed, Resumed),
641 "MESSAGE_CREATE" => dispatch!(on_message, Message),
642 "MESSAGE_UPDATE" => dispatch!(on_message_update, MessageUpdate),
643 "MESSAGE_DELETE" => dispatch!(on_message_delete, MessageDelete),
644 "MESSAGE_DELETE_BULK" => dispatch!(on_message_delete_bulk, MessageDeleteBulk),
645 "MESSAGE_REACTION_ADD" => dispatch!(on_reaction_add, ReactionAdd),
646 "MESSAGE_REACTION_REMOVE" => dispatch!(on_reaction_remove, ReactionRemove),
647 "MESSAGE_REACTION_REMOVE_ALL" => dispatch!(on_reaction_remove_all, ReactionRemoveAll),
648 "MESSAGE_REACTION_REMOVE_EMOJI" => dispatch!(on_reaction_remove_emoji, ReactionRemoveEmoji),
649 "TYPING_START" => dispatch!(on_typing_start, TypingStart),
650 "CHANNEL_CREATE" => dispatch!(on_channel_create, Channel),
651 "CHANNEL_UPDATE" => dispatch!(on_channel_update, Channel),
652 "CHANNEL_DELETE" => dispatch!(on_channel_delete, Channel),
653 "CHANNEL_PINS_UPDATE" => dispatch!(on_channel_pins_update, ChannelPinsUpdate),
654 "CHANNEL_PINS_ACK" => dispatch!(on_channel_pins_ack, ChannelPinsAck),
655 "GUILD_CREATE" => dispatch!(on_guild_create, Guild),
656 "GUILD_UPDATE" => dispatch!(on_guild_update, Guild),
657 "GUILD_DELETE" => dispatch!(on_guild_delete, UnavailableGuild),
658 "GUILD_MEMBER_ADD" => dispatch!(on_guild_member_add, GuildMemberAdd),
659 "GUILD_MEMBER_UPDATE" => dispatch!(on_guild_member_update, GuildMemberUpdate),
660 "GUILD_MEMBER_REMOVE" => dispatch!(on_guild_member_remove, GuildMemberRemove),
661 "GUILD_BAN_ADD" => dispatch!(on_guild_ban_add, GuildBanAdd),
662 "GUILD_BAN_REMOVE" => dispatch!(on_guild_ban_remove, GuildBanRemove),
663 "GUILD_ROLE_CREATE" => dispatch!(on_guild_role_create, GuildRoleCreate),
664 "GUILD_ROLE_UPDATE" => dispatch!(on_guild_role_update, GuildRoleUpdate),
665 "GUILD_ROLE_UPDATE_BULK" => dispatch!(on_guild_role_update_bulk, GuildRoleUpdateBulk),
666 "GUILD_ROLE_DELETE" => dispatch!(on_guild_role_delete, GuildRoleDelete),
667 "GUILD_EMOJIS_UPDATE" => dispatch!(on_guild_emojis_update, GuildEmojisUpdate),
668 "GUILD_STICKERS_UPDATE" => dispatch!(on_guild_stickers_update, GuildStickersUpdate),
669 "GUILD_AUDIT_LOG_ENTRY_CREATE" => dispatch!(on_guild_audit_log_entry_create, GuildAuditLogEntryCreate),
670 "CHANNEL_UPDATE_BULK" => dispatch!(on_channel_update_bulk, ChannelUpdateBulk),
671 "INVITE_CREATE" => dispatch!(on_invite_create, InviteCreate),
672 "INVITE_DELETE" => dispatch!(on_invite_delete, InviteDelete),
673 "WEBHOOKS_UPDATE" => dispatch!(on_webhooks_update, WebhooksUpdate),
674 "VOICE_STATE_UPDATE" => {
675 let guild_id = data["guild_id"].as_str().unwrap_or("").to_string();
676 let sess = data["session_id"].as_str().unwrap_or("").to_string();
677 if !guild_id.is_empty() && !sess.is_empty() {
678 let mut states = ctx.voice_states.lock().await;
679 let entry = states.entry(guild_id).or_insert_with(|| VoiceState {
680 token: String::new(),
681 endpoint: String::new(),
682 session_id: None,
683 });
684 entry.session_id = Some(sess);
685 }
686 match serde_json::from_value::<VoiceStateUpdate>(data.clone()) {
687 Ok(v) => handler.on_voice_state_update(ctx, v).await,
688 Err(e) => eprintln!("[fluxer-rs] Failed to deserialize VOICE_STATE_UPDATE: {}", e),
689 }
690 }
691 "VOICE_SERVER_UPDATE" => {
692 let token = data["token"].as_str().unwrap_or("").to_string();
693 let endpoint = data["endpoint"].as_str().unwrap_or("").to_string();
694 let guild_id = data["guild_id"].as_str().unwrap_or("").to_string();
695 if !guild_id.is_empty() && !token.is_empty() && !endpoint.is_empty() {
696 let mut states = ctx.voice_states.lock().await;
697 let entry = states.entry(guild_id).or_insert_with(|| VoiceState {
698 token: String::new(),
699 endpoint: String::new(),
700 session_id: None,
701 });
702 entry.token = token.clone();
703 entry.endpoint = if endpoint.starts_with("wss://")
704 || endpoint.starts_with("https://")
705 {
706 endpoint.clone()
707 } else {
708 format!("wss://{}", endpoint)
709 };
710 }
711 match serde_json::from_value::<VoiceServerUpdate>(data.clone()) {
712 Ok(v) => handler.on_voice_server_update(ctx, v).await,
713 Err(e) => eprintln!("[fluxer-rs] Failed to deserialize VOICE_SERVER_UPDATE: {}", e),
714 }
715 }
716
717 "PRESENCE_UPDATE" => dispatch!(on_presence_update, PresenceUpdate),
718 "PRESENCE_UPDATE_BULK" => dispatch!(on_presence_update_bulk, PresenceUpdateBulk),
719 "USER_SETTINGS_UPDATE" => dispatch!(on_user_settings_update, UserSettingsUpdate),
720 "USER_UPDATE" => dispatch!(on_user_update, UserUpdate),
721 "USER_GUILD_SETTINGS_UPDATE" => dispatch!(on_user_guild_settings_update, UserGuildSettingsUpdate),
722 "USER_PINNED_DMS_UPDATE" => dispatch!(on_user_pinned_dms_update, UserPinnedDmsUpdate),
723 "USER_NOTE_UPDATE" => dispatch!(on_user_note_update, UserNoteUpdate),
724 "USER_CONNECTIONS_UPDATE" => dispatch!(on_user_connections_update, UserConnectionsUpdate),
725 "AUTH_SESSION_CHANGE" => dispatch!(on_auth_session_change, AuthSessionChange),
726 "MESSAGE_ACK" => dispatch!(on_message_ack, MessageAck),
727 "SESSIONS_REPLACE" => {
728 match serde_json::from_value::<Vec<crate::model::SessionEntry>>(data.clone()) {
729 Ok(v) => handler.on_sessions_replace(ctx, SessionsReplace(v)).await,
730 Err(e) => eprintln!("[fluxer-rs] Failed to deserialize SESSIONS_REPLACE: {}", e),
731 }
732 }
733 "RELATIONSHIP_ADD" => dispatch!(on_relationship_add, RelationshipAdd),
734 "RELATIONSHIP_UPDATE" => dispatch!(on_relationship_update, RelationshipUpdate),
735 "RELATIONSHIP_REMOVE" => dispatch!(on_relationship_remove, RelationshipRemove),
736 "CALL_CREATE" => dispatch!(on_call_create, CallCreate),
737 "CALL_UPDATE" => dispatch!(on_call_update, CallUpdate),
738 "CALL_DELETE" => dispatch!(on_call_delete, CallDelete),
739 "CHANNEL_RECIPIENT_ADD" => dispatch!(on_channel_recipient_add, ChannelRecipientAdd),
740 "CHANNEL_RECIPIENT_REMOVE" => dispatch!(on_channel_recipient_remove, ChannelRecipientRemove),
741 "MESSAGE_REACTION_ADD_MANY" => dispatch!(on_message_reaction_add_many, MessageReactionAddMany),
742 "RECENT_MENTION_DELETE" => dispatch!(on_recent_mention_delete, RecentMentionDelete),
743 "SAVED_MESSAGE_CREATE" => dispatch!(on_saved_message_create, SavedMessageCreate),
744 "SAVED_MESSAGE_DELETE" => dispatch!(on_saved_message_delete, SavedMessageDelete),
745 "PASSIVE_UPDATES" => dispatch!(on_passive_updates, PassiveUpdates),
746 "GUILD_MEMBER_LIST_UPDATE" => dispatch!(on_guild_member_list_update, GuildMemberListUpdate),
747 "GUILD_MEMBERS_CHUNK" => dispatch!(on_guild_members_chunk, GuildMembersChunk),
748 "GUILD_SYNC" => dispatch!(on_guild_sync, GuildSync),
749
750 "INTERACTION_CREATE"
751 | "STAGE_INSTANCE_CREATE"
752 | "STAGE_INSTANCE_UPDATE"
753 | "STAGE_INSTANCE_DELETE" => {}
754
755 other => {
756 handler.on_unknown_event(ctx, other.to_string(), data).await;
757 }
758 }
759}