Skip to main content

fluxer/client/
mod.rs

1//! Gateway client and connection management.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use futures::{SinkExt, StreamExt};
6use serde_json::Value;
7use tokio::sync::Mutex;
8use tokio_tungstenite::{connect_async, tungstenite::protocol::Message as WsMessage};
9use crate::cache::Cache;
10use crate::error::ClientError;
11use crate::event::EventHandler;
12use crate::http::Http;
13use crate::model::voice::VoiceState;
14use std::time::Duration;
15
/// REST API base URL used when the builder is not given an override.
const DEFAULT_API_URL: &str = "https://api.fluxer.app/v1";
/// Gateway WebSocket URL used when gateway discovery fails or no resume URL is known.
const DEFAULT_GATEWAY_URL: &str = "wss://gateway.fluxer.app/?v=1&encoding=json";
18
/// Outcome of a single gateway session, telling the outer loop what to do next.
enum LoopControl {
    /// Stop for good; the client should not reconnect.
    Done,
    /// Open a new connection. When `resume` is `true` the stored session is
    /// kept so the next connection can resume it; otherwise it is discarded.
    Reconnect { resume: bool },
}
23
24/// Shared state passed into every event handler call. This is how you interact
25/// with the API from inside your event handlers.
26///
27/// ```rust,no_run
28/// # use fluxer::prelude::*;
29/// # async fn example(ctx: Context) {
30/// ctx.http.send_message("channel_id", "Hello!").await.unwrap();
31/// # }
32/// ```
33#[derive(Clone)]
34pub struct Context {
35    /// HTTP client for REST API calls.
36    pub http: Arc<Http>,
37    /// In-memory cache populated automatically from gateway events.
38    pub cache: Arc<Cache>,
39    /// Raw gateway sender. You probably won't need this directly --
40    /// voice join/leave use it internally.
41    pub gateway_tx: Arc<tokio::sync::mpsc::Sender<String>>,
42    /// Per-guild voice state, populated from `VOICE_STATE_UPDATE` and `VOICE_SERVER_UPDATE`.
43    /// Used internally by `join_voice` / `leave_voice`.
44    pub voice_states: Arc<Mutex<HashMap<String, VoiceState>>>,
45    pub(crate) live_rooms: Arc<Mutex<HashMap<String, std::sync::Arc<livekit::Room>>>>,
46}
47
48impl Context {
49    /// Joins a voice channel. Sends an opcode 4 to the gateway and waits
50    /// up to 10 seconds for the server to send back connection details.
51    pub async fn join_voice(
52        &self,
53        guild_id: &str,
54        channel_id: &str,
55    ) -> Result<crate::voice::FluxerVoiceConnection, ClientError> {
56        {
57            let mut states = self.voice_states.lock().await;
58            states.remove(guild_id);
59        }
60
61        let join_payload = serde_json::json!({
62            "op": 4,
63            "d": {
64                "guild_id": guild_id,
65                "channel_id": channel_id,
66                "self_mute": false,
67                "self_deaf": false
68            }
69        });
70        self.gateway_tx
71            .send(join_payload.to_string())
72            .await
73            .map_err(|e| ClientError::Voice(e.to_string()))?;
74
75        let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
76        let voice_state = loop {
77            {
78                let states = self.voice_states.lock().await;
79                if let Some(vs) = states.get(guild_id) {
80                    if !vs.token.is_empty() && !vs.endpoint.is_empty() {
81                        break vs.clone();
82                    }
83                }
84            }
85            if tokio::time::Instant::now() >= deadline {
86                return Err(ClientError::Voice(
87                    "Timed out waiting for VOICE_SERVER_UPDATE".into(),
88                ));
89            }
90            tokio::time::sleep(Duration::from_millis(100)).await;
91        };
92
93        let conn = crate::voice::FluxerVoiceConnection::connect(
94            &voice_state.endpoint,
95            &voice_state.token,
96        )
97        .await
98        .map_err(|e| ClientError::Voice(e.to_string()))?;
99
100        self.live_rooms.lock().await.insert(guild_id.to_string(), conn.room.clone());
101
102        Ok(conn)
103    }
104
105    /// Leaves a voice channel. Closes the LiveKit room and tells the gateway.
106    pub async fn leave_voice(&self, guild_id: &str) -> Result<(), ClientError> {
107        if let Some(room) = self.live_rooms.lock().await.remove(guild_id) {
108            let _ = room.close().await;
109        }
110
111        let payload = serde_json::json!({
112            "op": 4,
113            "d": {
114                "guild_id": guild_id,
115                "channel_id": null,
116                "self_mute": false,
117                "self_deaf": false
118            }
119        });
120        self.gateway_tx
121            .send(payload.to_string())
122            .await
123            .map_err(|e| ClientError::Voice(e.to_string()))?;
124        self.voice_states.lock().await.remove(guild_id);
125        Ok(())
126    }
127}
128
129/// Builder for creating a [`Client`]. You need at minimum a token and an event handler.
130///
131/// ```rust,no_run
132/// use fluxer::prelude::*;
133/// # struct MyHandler;
134/// # #[async_trait::async_trait]
135/// # impl EventHandler for MyHandler {}
136///
137/// let client = Client::builder("token")
138///     .event_handler(MyHandler)
139///     .build();
140/// ```
141pub struct ClientBuilder {
142    token: String,
143    api_url: String,
144    handler: Option<Arc<dyn EventHandler>>,
145}
146
147impl ClientBuilder {
148    pub fn new(token: impl Into<String>) -> Self {
149        Self {
150            token: token.into(),
151            api_url: DEFAULT_API_URL.to_string(),
152            handler: None,
153        }
154    }
155
156    /// Sets the event handler. Required -- the builder panics at `.build()` without this.
157    pub fn event_handler(mut self, handler: impl EventHandler + 'static) -> Self {
158        self.handler = Some(Arc::new(handler));
159        self
160    }
161
162    /// Override the API base URL. Defaults to `https://api.fluxer.app/v1`.
163    pub fn api_url(mut self, url: impl Into<String>) -> Self {
164        self.api_url = url.into();
165        self
166    }
167
168    pub fn build(self) -> Client {
169        let http = Arc::new(Http::new(&self.token, self.api_url));
170        Client {
171            http,
172            cache: Cache::new(),
173            handler: self.handler.expect("call .event_handler() before .build()"),
174        }
175    }
176}
177
178/// The gateway client. Manages the WebSocket connection, heartbeating,
179/// reconnection, and event dispatch.
180///
181/// Call [`start`](Client::start) to connect. It runs until a fatal error
182/// happens (like an invalid token) and reconnects automatically on transient
183/// failures.
184pub struct Client {
185    pub(crate) http: Arc<Http>,
186    cache: Arc<Cache>,
187    handler: Arc<dyn EventHandler>,
188}
189
190impl Client {
191    pub fn builder(token: impl Into<String>) -> ClientBuilder {
192        ClientBuilder::new(token)
193    }
194
195    /// Connects to the gateway and starts processing events. Blocks forever
196    /// unless a fatal error occurs.
197    pub async fn start(&mut self) -> Result<(), ClientError> {
198        let mut session_id: Option<String> = None;
199        let mut resume_url: Option<String> = None;
200        let mut last_seq: Option<u64> = None;
201        let mut backoff = Duration::from_secs(1);
202
203        loop {
204            let result = self
205                .run_session(&mut session_id, &mut resume_url, &mut last_seq)
206                .await;
207
208            match result {
209                Ok(LoopControl::Done) => return Ok(()),
210
211                Ok(LoopControl::Reconnect { resume }) => {
212                    if !resume {
213                        session_id = None;
214                        resume_url = None;
215                        last_seq = None;
216                        let jitter = Duration::from_millis(1000 + (rand::random::<u64>() % 4000));
217                        tokio::time::sleep(jitter).await;
218                    } else {
219                        eprintln!("[fluxer-rs] Reconnecting in {:?} (will resume)...", backoff);
220                        tokio::time::sleep(backoff).await;
221                        backoff = (backoff * 2).min(Duration::from_secs(60));
222                        continue;
223                    }
224                }
225
226                Err(ClientError::ConnectionClosed) => {
227                    eprintln!("[fluxer-rs] Connection closed, reconnecting in {:?}...", backoff);
228                    tokio::time::sleep(backoff).await;
229                    backoff = (backoff * 2).min(Duration::from_secs(60));
230                    continue;
231                }
232
233                Err(e) => return Err(e),
234            }
235
236            backoff = Duration::from_secs(1);
237        }
238    }
239
240    async fn run_session(
241        &self,
242        session_id: &mut Option<String>,
243        resume_url: &mut Option<String>,
244        last_seq: &mut Option<u64>,
245    ) -> Result<LoopControl, ClientError> {
246        let gateway_url = if session_id.is_some() {
247            resume_url
248                .clone()
249                .unwrap_or_else(|| DEFAULT_GATEWAY_URL.to_string())
250        } else {
251            match self.http.get_gateway().await {
252                Ok(url) => {
253                    let base = url.trim_end_matches('/');
254                    format!("{}/?v=1&encoding=json", base)
255                }
256                Err(_) => DEFAULT_GATEWAY_URL.to_string(),
257            }
258        };
259
260        let (ws_stream, _) = connect_async(&gateway_url).await?;
261        let (write, mut read) = ws_stream.split();
262        let write = Arc::new(Mutex::new(write));
263        let seq_shared: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(*last_seq));
264        let ack_shared: Arc<Mutex<bool>> = Arc::new(Mutex::new(true));
265        let (gateway_tx, mut gateway_rx) = tokio::sync::mpsc::channel::<String>(64);
266        {
267            let write_fwd = write.clone();
268            tokio::spawn(async move {
269                while let Some(msg) = gateway_rx.recv().await {
270                    let mut guard = write_fwd.lock().await;
271                    if guard
272                        .send(WsMessage::Text(msg.into()))
273                        .await
274                        .is_err()
275                    {
276                        break;
277                    }
278                }
279            });
280        }
281
282        let ctx = Context {
283            http: self.http.clone(),
284            cache: self.cache.clone(),
285            gateway_tx: Arc::new(gateway_tx),
286            voice_states: Arc::new(Mutex::new(HashMap::new())),
287            live_rooms: Arc::new(Mutex::new(HashMap::new())),
288        };
289
290        let token = self.http.get_token().to_string();
291        if let (Some(sid), Some(seq)) = (session_id.as_deref(), *last_seq) {
292            let resume_payload = serde_json::json!({
293                "op": 6,
294                "d": { "token": token, "session_id": sid, "seq": seq }
295            });
296            write
297                .lock()
298                .await
299                .send(WsMessage::Text(resume_payload.to_string().into()))
300                .await?;
301        } else {
302            let identify = serde_json::json!({
303                "op": 2,
304                "d": {
305                    "token": token,
306                    "intents": 0,   // Fluxer has no intents yet
307                    "properties": {
308                        "os": "linux",
309                        "browser": "fluxer-rust",
310                        "device": "fluxer-rust"
311                    }
312                }
313            });
314            write
315                .lock()
316                .await
317                .send(WsMessage::Text(identify.to_string().into()))
318                .await?;
319        }
320
321        let handler = self.handler.clone();
322
323        while let Some(msg_result) = read.next().await {
324            let text = match msg_result? {
325                WsMessage::Text(t) => t,
326                WsMessage::Close(frame) => {
327                    let code = frame.as_ref().map(|f| u16::from(f.code)).unwrap_or(0);
328                    match code {
329                        4004 => {
330                            eprintln!("[fluxer-rs] Authentication failed (4004) — invalid token, shutting down.");
331                            return Ok(LoopControl::Done);
332                        }
333                        4010 => {
334                            eprintln!("[fluxer-rs] Invalid shard (4010) — shutting down.");
335                            return Ok(LoopControl::Done);
336                        }
337                        4011 => {
338                            eprintln!("[fluxer-rs] Sharding required (4011) — shutting down.");
339                            return Ok(LoopControl::Done);
340                        }
341                        4012 => {
342                            eprintln!("[fluxer-rs] Invalid API version (4012) — shutting down.");
343                            return Ok(LoopControl::Done);
344                        }
345                        _ => return Err(ClientError::ConnectionClosed),
346                    }
347                }
348                WsMessage::Ping(d) => {
349                    let _ = write.lock().await.send(WsMessage::Pong(d)).await;
350                    continue;
351                }
352                _ => continue,
353            };
354
355            let payload: Value = serde_json::from_str(text.as_str())?;
356            let op = payload["op"].as_u64().unwrap_or(255);
357
358            if let Some(s) = payload["s"].as_u64() {
359                *last_seq = Some(s);
360                *seq_shared.lock().await = Some(s);
361            }
362
363            match op {
364                10 => {
365                    let interval_ms = payload["d"]["heartbeat_interval"]
366                        .as_u64()
367                        .unwrap_or(41_250);
368
369                    let write_hb = write.clone();
370                    let seq_hb = seq_shared.clone();
371                    let ack_hb = ack_shared.clone();
372
373                    tokio::spawn(async move {
374                        let jitter = Duration::from_millis(
375                            (rand::random::<u64>() % interval_ms).max(1),
376                        );
377                        tokio::time::sleep(jitter).await;
378
379                        let mut ticker =
380                            tokio::time::interval(Duration::from_millis(interval_ms));
381                        loop {
382                            ticker.tick().await;
383
384                            {
385                                let mut ack = ack_hb.lock().await;
386                                if !*ack {
387                                    eprintln!(
388                                        "[fluxer-rs] No heartbeat ACK — zombie connection, dropping."
389                                    );
390                                    break;
391                                }
392                                *ack = false;
393                            }
394
395                            let seq = *seq_hb.lock().await;
396                            let hb = serde_json::json!({ "op": 1, "d": seq });
397                            let mut guard = write_hb.lock().await;
398                            if guard
399                                .send(WsMessage::Text(hb.to_string().into()))
400                                .await
401                                .is_err()
402                            {
403                                break;
404                            }
405                        }
406                    });
407                }
408
409                11 => {
410                    *ack_shared.lock().await = true;
411                }
412
413                0 => {
414                    let event_type = payload["t"].as_str().unwrap_or("").to_string();
415                    let data = payload["d"].clone();
416                    let ctx2 = ctx.clone();
417                    let handler2 = handler.clone();
418
419                    if event_type == "READY" {
420                        if let Some(sid) = data["session_id"].as_str() {
421                            *session_id = Some(sid.to_string());
422                        }
423                        if let Some(rurl) = data["resume_gateway_url"].as_str() {
424                            *resume_url = Some(format!(
425                                "{}/?v=1&encoding=json",
426                                rurl.trim_end_matches('/')
427                            ));
428                        }
429                    }
430
431                    tokio::spawn(async move {
432                        cache_update(&ctx2.cache, &event_type, &data).await;
433                        dispatch_event(event_type, data, ctx2, handler2).await;
434                    });
435                }
436
437                7 => {
438                    eprintln!("[fluxer-rs] Received op 7 Reconnect.");
439                    return Ok(LoopControl::Reconnect { resume: true });
440                }
441
442                9 => {
443                    let resumable = payload["d"].as_bool().unwrap_or(false);
444                    eprintln!("[fluxer-rs] Invalid session (resumable={resumable}).");
445                    return Ok(LoopControl::Reconnect { resume: resumable });
446                }
447
448                1 => {
449                    let seq = *seq_shared.lock().await;
450                    let hb = serde_json::json!({ "op": 1, "d": seq });
451                    let _ = write
452                        .lock()
453                        .await
454                        .send(WsMessage::Text(hb.to_string().into()))
455                        .await;
456                }
457
458                _ => {}
459            }
460        }
461
462        Err(ClientError::ConnectionClosed)
463    }
464}
465
466async fn cache_update(cache: &Cache, event_type: &str, data: &Value) {
467    match event_type {
468        "READY" => {
469            if let Ok(user) = serde_json::from_value::<crate::model::User>(data["user"].clone()) {
470                *cache.current_user.write().await = Some(user);
471            }
472        }
473        "GUILD_CREATE" => {
474            if let Ok(guild) = serde_json::from_value::<crate::model::Guild>(data.clone()) {
475                let guild_id = guild.id.clone();
476
477                if let Some(channels) = data["channels"].as_array() {
478                    let mut ch_map = cache.channels.write().await;
479                    for ch_val in channels {
480                        if let Ok(ch) = serde_json::from_value::<crate::model::Channel>(ch_val.clone()) {
481                            ch_map.insert(ch.id.clone(), ch);
482                        }
483                    }
484                }
485                
486                if let Some(members) = data["members"].as_array() {
487                    let mut user_map = cache.users.write().await;
488                    for m_val in members {
489                        if let Ok(user) = serde_json::from_value::<crate::model::User>(m_val["user"].clone()) {
490                            user_map.insert(user.id.clone(), user);
491                        }
492                    }
493                }
494                cache.guilds.write().await.insert(guild_id, guild);
495            }
496        }
497        "GUILD_UPDATE" => {
498            if let Ok(guild) = serde_json::from_value::<crate::model::Guild>(data.clone()) {
499                cache.guilds.write().await.insert(guild.id.clone(), guild);
500            }
501        }
502        "GUILD_DELETE" => {
503            if let Some(id) = data["id"].as_str() {
504                cache.guilds.write().await.remove(id);
505            }
506        }
507        "CHANNEL_CREATE" | "CHANNEL_UPDATE" => {
508            if let Ok(ch) = serde_json::from_value::<crate::model::Channel>(data.clone()) {
509                cache.channels.write().await.insert(ch.id.clone(), ch);
510            }
511        }
512        "CHANNEL_DELETE" => {
513            if let Some(id) = data["id"].as_str() {
514                cache.channels.write().await.remove(id);
515            }
516        }
517        "GUILD_MEMBER_ADD" | "GUILD_MEMBER_UPDATE" => {
518            if let Ok(user) = serde_json::from_value::<crate::model::User>(data["user"].clone()) {
519                cache.users.write().await.insert(user.id.clone(), user);
520            }
521        }
522        "MESSAGE_CREATE" => {
523            if let Ok(user) = serde_json::from_value::<crate::model::User>(data["author"].clone()) {
524                cache.users.write().await.insert(user.id.clone(), user);
525            }
526        }
527        _ => {}
528    }
529}
530
531async fn dispatch_event(
532    event_type: String,
533    data: Value,
534    ctx: Context,
535    handler: Arc<dyn EventHandler>,
536) {
537    use crate::model::{
538        Channel, ChannelPinsUpdate, ChannelUpdateBulk, Guild, GuildBanAdd, GuildBanRemove,
539        GuildEmojisUpdate, GuildMemberAdd, GuildMemberRemove, GuildMemberUpdate,
540        GuildRoleCreate, GuildRoleDelete, GuildRoleUpdate, GuildRoleUpdateBulk,
541        GuildStickersUpdate, InviteCreate, InviteDelete, WebhooksUpdate,
542        Message, MessageDelete, MessageDeleteBulk, MessageUpdate, ReactionAdd,
543        ReactionRemove, ReactionRemoveAll, ReactionRemoveEmoji, Ready, TypingStart,
544        UnavailableGuild,
545    };
546    use crate::model::voice::VoiceState;
547
548    macro_rules! dispatch {
549        ($method:ident, $ty:ty) => {{
550            match serde_json::from_value::<$ty>(data.clone()) {
551                Ok(v) => handler.$method(ctx, v).await,
552                Err(e) => eprintln!(
553                    "[fluxer-rs] Failed to deserialize {} event: {}",
554                    stringify!($ty),
555                    e
556                ),
557            }
558        }};
559    }
560
561    match event_type.as_str() {
562        "READY"   => dispatch!(on_ready, Ready),
563        "RESUMED" => eprintln!("[fluxer-rs] Session resumed successfully."),
564        "MESSAGE_CREATE"      => dispatch!(on_message, Message),
565        "MESSAGE_UPDATE"      => dispatch!(on_message_update, MessageUpdate),
566        "MESSAGE_DELETE"      => dispatch!(on_message_delete, MessageDelete),
567        "MESSAGE_DELETE_BULK" => dispatch!(on_message_delete_bulk, MessageDeleteBulk),
568        "MESSAGE_REACTION_ADD"          => dispatch!(on_reaction_add, ReactionAdd),
569        "MESSAGE_REACTION_REMOVE"       => dispatch!(on_reaction_remove, ReactionRemove),
570        "MESSAGE_REACTION_REMOVE_ALL"   => dispatch!(on_reaction_remove_all, ReactionRemoveAll),
571        "MESSAGE_REACTION_REMOVE_EMOJI" => dispatch!(on_reaction_remove_emoji, ReactionRemoveEmoji),
572        "TYPING_START" => dispatch!(on_typing_start, TypingStart),
573        "CHANNEL_CREATE"      => dispatch!(on_channel_create, Channel),
574        "CHANNEL_UPDATE"      => dispatch!(on_channel_update, Channel),
575        "CHANNEL_DELETE"      => dispatch!(on_channel_delete, Channel),
576        "CHANNEL_PINS_UPDATE" => dispatch!(on_channel_pins_update, ChannelPinsUpdate),
577        "GUILD_CREATE" => dispatch!(on_guild_create, Guild),
578        "GUILD_UPDATE" => dispatch!(on_guild_update, Guild),
579        "GUILD_DELETE" => dispatch!(on_guild_delete, UnavailableGuild),
580        "GUILD_MEMBER_ADD"    => dispatch!(on_guild_member_add, GuildMemberAdd),
581        "GUILD_MEMBER_UPDATE" => dispatch!(on_guild_member_update, GuildMemberUpdate),
582        "GUILD_MEMBER_REMOVE" => dispatch!(on_guild_member_remove, GuildMemberRemove),
583        "GUILD_BAN_ADD"    => dispatch!(on_guild_ban_add, GuildBanAdd),
584        "GUILD_BAN_REMOVE" => dispatch!(on_guild_ban_remove, GuildBanRemove),
585        "GUILD_ROLE_CREATE"      => dispatch!(on_guild_role_create, GuildRoleCreate),
586        "GUILD_ROLE_UPDATE"      => dispatch!(on_guild_role_update, GuildRoleUpdate),
587        "GUILD_ROLE_UPDATE_BULK" => dispatch!(on_guild_role_update_bulk, GuildRoleUpdateBulk),
588        "GUILD_ROLE_DELETE"      => dispatch!(on_guild_role_delete, GuildRoleDelete),
589        "GUILD_EMOJIS_UPDATE"   => dispatch!(on_guild_emojis_update, GuildEmojisUpdate),
590        "GUILD_STICKERS_UPDATE" => dispatch!(on_guild_stickers_update, GuildStickersUpdate),
591        "CHANNEL_UPDATE_BULK" => dispatch!(on_channel_update_bulk, ChannelUpdateBulk),
592        "INVITE_CREATE" => dispatch!(on_invite_create, InviteCreate),
593        "INVITE_DELETE" => dispatch!(on_invite_delete, InviteDelete),
594        "WEBHOOKS_UPDATE" => dispatch!(on_webhooks_update, WebhooksUpdate),
595        "VOICE_STATE_UPDATE" => {
596            let guild_id = data["guild_id"].as_str().unwrap_or("").to_string();
597            let sess = data["session_id"].as_str().unwrap_or("").to_string();
598            if !guild_id.is_empty() && !sess.is_empty() {
599                let mut states = ctx.voice_states.lock().await;
600                let entry = states.entry(guild_id).or_insert_with(|| VoiceState {
601                    token: String::new(),
602                    endpoint: String::new(),
603                    session_id: None,
604                });
605                entry.session_id = Some(sess);
606            }
607        }
608        "VOICE_SERVER_UPDATE" => {
609            let token = data["token"].as_str().unwrap_or("").to_string();
610            let endpoint = data["endpoint"].as_str().unwrap_or("").to_string();
611            let guild_id = data["guild_id"].as_str().unwrap_or("").to_string();
612            if !guild_id.is_empty() && !token.is_empty() && !endpoint.is_empty() {
613                let mut states = ctx.voice_states.lock().await;
614                let entry = states.entry(guild_id).or_insert_with(|| VoiceState {
615                    token: String::new(),
616                    endpoint: String::new(),
617                    session_id: None,
618                });
619                entry.token = token;
620                entry.endpoint = if endpoint.starts_with("wss://")
621                    || endpoint.starts_with("https://")
622                {
623                    endpoint
624                } else {
625                    format!("wss://{}", endpoint)
626                };
627            }
628        }
629
630        "INTERACTION_CREATE"
631        | "SESSIONS_REPLACE"
632        | "STAGE_INSTANCE_CREATE"
633        | "STAGE_INSTANCE_UPDATE"
634        | "STAGE_INSTANCE_DELETE" => {}
635
636        other => {
637            eprintln!("[fluxer-rs] Unknown event: {}", other);
638        }
639    }
640}