1use std::collections::HashMap;
4use std::sync::Arc;
5use futures::{SinkExt, StreamExt};
6use serde_json::Value;
7use tokio::sync::Mutex;
8use tokio_tungstenite::{connect_async, tungstenite::protocol::Message as WsMessage};
9use crate::cache::Cache;
10use crate::error::ClientError;
11use crate::event::EventHandler;
12use crate::http::Http;
13use crate::model::voice::VoiceState;
14use std::time::Duration;
15
/// REST API base URL that [`ClientBuilder::new`] uses until `api_url` overrides it.
const DEFAULT_API_URL: &str = "https://api.fluxer.app/v1";
/// Gateway websocket URL used as a fallback when gateway discovery over HTTP
/// fails, or when a session is resumed without a stored resume URL.
const DEFAULT_GATEWAY_URL: &str = "wss://gateway.fluxer.app/?v=1&encoding=json";
18
/// Outcome of one gateway session, telling the reconnect loop in
/// `Client::start` what to do next.
enum LoopControl {
    /// The session ended for good; `start` returns `Ok(())`.
    Done,
    /// The session ended and a new connection should be opened.
    ///
    /// `resume: true` keeps the stored session id / sequence so the next
    /// session can resume; `resume: false` discards them so the next session
    /// identifies from scratch.
    Reconnect { resume: bool },
}
23
/// Handle passed to every event handler call.
///
/// Cloning is cheap: every field is an `Arc`, so clones share the same
/// HTTP client, cache, gateway sender and voice bookkeeping.
#[derive(Clone)]
pub struct Context {
    /// HTTP client shared with the owning `Client`.
    pub http: Arc<Http>,
    /// Cache shared with the owning `Client`; updated from gateway events.
    pub cache: Arc<Cache>,
    /// Sender for raw JSON text payloads; each string sent here is written to
    /// the gateway websocket of the session that created this context.
    pub gateway_tx: Arc<tokio::sync::mpsc::Sender<String>>,
    /// Voice credentials per guild id, filled in from `VOICE_STATE_UPDATE` and
    /// `VOICE_SERVER_UPDATE` events and read by `join_voice`.
    pub voice_states: Arc<Mutex<HashMap<String, VoiceState>>>,
    /// Rooms of the voice connections opened through `join_voice`, keyed by
    /// guild id, so `leave_voice` can close them.
    pub(crate) live_rooms: Arc<Mutex<HashMap<String, std::sync::Arc<livekit::Room>>>>,
}
47
48impl Context {
49 pub async fn join_voice(
52 &self,
53 guild_id: &str,
54 channel_id: &str,
55 ) -> Result<crate::voice::FluxerVoiceConnection, ClientError> {
56 {
57 let mut states = self.voice_states.lock().await;
58 states.remove(guild_id);
59 }
60
61 let join_payload = serde_json::json!({
62 "op": 4,
63 "d": {
64 "guild_id": guild_id,
65 "channel_id": channel_id,
66 "self_mute": false,
67 "self_deaf": false
68 }
69 });
70 self.gateway_tx
71 .send(join_payload.to_string())
72 .await
73 .map_err(|e| ClientError::Voice(e.to_string()))?;
74
75 let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
76 let voice_state = loop {
77 {
78 let states = self.voice_states.lock().await;
79 if let Some(vs) = states.get(guild_id) {
80 if !vs.token.is_empty() && !vs.endpoint.is_empty() {
81 break vs.clone();
82 }
83 }
84 }
85 if tokio::time::Instant::now() >= deadline {
86 return Err(ClientError::Voice(
87 "Timed out waiting for VOICE_SERVER_UPDATE".into(),
88 ));
89 }
90 tokio::time::sleep(Duration::from_millis(100)).await;
91 };
92
93 let conn = crate::voice::FluxerVoiceConnection::connect(
94 &voice_state.endpoint,
95 &voice_state.token,
96 )
97 .await
98 .map_err(|e| ClientError::Voice(e.to_string()))?;
99
100 self.live_rooms.lock().await.insert(guild_id.to_string(), conn.room.clone());
101
102 Ok(conn)
103 }
104
105 pub async fn leave_voice(&self, guild_id: &str) -> Result<(), ClientError> {
107 if let Some(room) = self.live_rooms.lock().await.remove(guild_id) {
108 let _ = room.close().await;
109 }
110
111 let payload = serde_json::json!({
112 "op": 4,
113 "d": {
114 "guild_id": guild_id,
115 "channel_id": null,
116 "self_mute": false,
117 "self_deaf": false
118 }
119 });
120 self.gateway_tx
121 .send(payload.to_string())
122 .await
123 .map_err(|e| ClientError::Voice(e.to_string()))?;
124 self.voice_states.lock().await.remove(guild_id);
125 Ok(())
126 }
127}
128
/// Builder for [`Client`]; create one with [`ClientBuilder::new`] or
/// `Client::builder`.
pub struct ClientBuilder {
    /// Token handed to the HTTP client when the `Client` is built.
    token: String,
    /// REST API base URL; starts out as `DEFAULT_API_URL`.
    api_url: String,
    /// Event handler; must be set before `build` is called.
    handler: Option<Arc<dyn EventHandler>>,
}
146
147impl ClientBuilder {
148 pub fn new(token: impl Into<String>) -> Self {
149 Self {
150 token: token.into(),
151 api_url: DEFAULT_API_URL.to_string(),
152 handler: None,
153 }
154 }
155
156 pub fn event_handler(mut self, handler: impl EventHandler + 'static) -> Self {
158 self.handler = Some(Arc::new(handler));
159 self
160 }
161
162 pub fn api_url(mut self, url: impl Into<String>) -> Self {
164 self.api_url = url.into();
165 self
166 }
167
168 pub fn build(self) -> Client {
169 let http = Arc::new(Http::new(&self.token, self.api_url));
170 Client {
171 http,
172 cache: Cache::new(),
173 handler: self.handler.expect("call .event_handler() before .build()"),
174 }
175 }
176}
177
/// Gateway client: owns the HTTP client, the cache and the event handler, and
/// drives the websocket connection from `start`.
pub struct Client {
    /// HTTP client; also shared with every `Context`.
    pub(crate) http: Arc<Http>,
    /// Cache shared with every `Context`.
    cache: Arc<Cache>,
    /// Handler that receives dispatched gateway events.
    handler: Arc<dyn EventHandler>,
}
189
190impl Client {
191 pub fn builder(token: impl Into<String>) -> ClientBuilder {
192 ClientBuilder::new(token)
193 }
194
195 pub async fn start(&mut self) -> Result<(), ClientError> {
198 let mut session_id: Option<String> = None;
199 let mut resume_url: Option<String> = None;
200 let mut last_seq: Option<u64> = None;
201 let mut backoff = Duration::from_secs(1);
202
203 loop {
204 let result = self
205 .run_session(&mut session_id, &mut resume_url, &mut last_seq)
206 .await;
207
208 match result {
209 Ok(LoopControl::Done) => return Ok(()),
210
211 Ok(LoopControl::Reconnect { resume }) => {
212 if !resume {
213 session_id = None;
214 resume_url = None;
215 last_seq = None;
216 let jitter = Duration::from_millis(1000 + (rand::random::<u64>() % 4000));
217 tokio::time::sleep(jitter).await;
218 } else {
219 eprintln!("[fluxer-rs] Reconnecting in {:?} (will resume)...", backoff);
220 tokio::time::sleep(backoff).await;
221 backoff = (backoff * 2).min(Duration::from_secs(60));
222 continue;
223 }
224 }
225
226 Err(ClientError::ConnectionClosed) => {
227 eprintln!("[fluxer-rs] Connection closed, reconnecting in {:?}...", backoff);
228 tokio::time::sleep(backoff).await;
229 backoff = (backoff * 2).min(Duration::from_secs(60));
230 continue;
231 }
232
233 Err(e) => return Err(e),
234 }
235
236 backoff = Duration::from_secs(1);
237 }
238 }
239
240 async fn run_session(
241 &self,
242 session_id: &mut Option<String>,
243 resume_url: &mut Option<String>,
244 last_seq: &mut Option<u64>,
245 ) -> Result<LoopControl, ClientError> {
246 let gateway_url = if session_id.is_some() {
247 resume_url
248 .clone()
249 .unwrap_or_else(|| DEFAULT_GATEWAY_URL.to_string())
250 } else {
251 match self.http.get_gateway().await {
252 Ok(url) => {
253 let base = url.trim_end_matches('/');
254 format!("{}/?v=1&encoding=json", base)
255 }
256 Err(_) => DEFAULT_GATEWAY_URL.to_string(),
257 }
258 };
259
260 let (ws_stream, _) = connect_async(&gateway_url).await?;
261 let (write, mut read) = ws_stream.split();
262 let write = Arc::new(Mutex::new(write));
263 let seq_shared: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(*last_seq));
264 let ack_shared: Arc<Mutex<bool>> = Arc::new(Mutex::new(true));
265 let (gateway_tx, mut gateway_rx) = tokio::sync::mpsc::channel::<String>(64);
266 {
267 let write_fwd = write.clone();
268 tokio::spawn(async move {
269 while let Some(msg) = gateway_rx.recv().await {
270 let mut guard = write_fwd.lock().await;
271 if guard
272 .send(WsMessage::Text(msg.into()))
273 .await
274 .is_err()
275 {
276 break;
277 }
278 }
279 });
280 }
281
282 let ctx = Context {
283 http: self.http.clone(),
284 cache: self.cache.clone(),
285 gateway_tx: Arc::new(gateway_tx),
286 voice_states: Arc::new(Mutex::new(HashMap::new())),
287 live_rooms: Arc::new(Mutex::new(HashMap::new())),
288 };
289
290 let token = self.http.get_token().to_string();
291 if let (Some(sid), Some(seq)) = (session_id.as_deref(), *last_seq) {
292 let resume_payload = serde_json::json!({
293 "op": 6,
294 "d": { "token": token, "session_id": sid, "seq": seq }
295 });
296 write
297 .lock()
298 .await
299 .send(WsMessage::Text(resume_payload.to_string().into()))
300 .await?;
301 } else {
302 let identify = serde_json::json!({
303 "op": 2,
304 "d": {
305 "token": token,
306 "intents": 0, "properties": {
308 "os": "linux",
309 "browser": "fluxer-rust",
310 "device": "fluxer-rust"
311 }
312 }
313 });
314 write
315 .lock()
316 .await
317 .send(WsMessage::Text(identify.to_string().into()))
318 .await?;
319 }
320
321 let handler = self.handler.clone();
322
323 while let Some(msg_result) = read.next().await {
324 let text = match msg_result? {
325 WsMessage::Text(t) => t,
326 WsMessage::Close(frame) => {
327 let code = frame.as_ref().map(|f| u16::from(f.code)).unwrap_or(0);
328 match code {
329 4004 => {
330 eprintln!("[fluxer-rs] Authentication failed (4004) — invalid token, shutting down.");
331 return Ok(LoopControl::Done);
332 }
333 4010 => {
334 eprintln!("[fluxer-rs] Invalid shard (4010) — shutting down.");
335 return Ok(LoopControl::Done);
336 }
337 4011 => {
338 eprintln!("[fluxer-rs] Sharding required (4011) — shutting down.");
339 return Ok(LoopControl::Done);
340 }
341 4012 => {
342 eprintln!("[fluxer-rs] Invalid API version (4012) — shutting down.");
343 return Ok(LoopControl::Done);
344 }
345 _ => return Err(ClientError::ConnectionClosed),
346 }
347 }
348 WsMessage::Ping(d) => {
349 let _ = write.lock().await.send(WsMessage::Pong(d)).await;
350 continue;
351 }
352 _ => continue,
353 };
354
355 let payload: Value = serde_json::from_str(text.as_str())?;
356 let op = payload["op"].as_u64().unwrap_or(255);
357
358 if let Some(s) = payload["s"].as_u64() {
359 *last_seq = Some(s);
360 *seq_shared.lock().await = Some(s);
361 }
362
363 match op {
364 10 => {
365 let interval_ms = payload["d"]["heartbeat_interval"]
366 .as_u64()
367 .unwrap_or(41_250);
368
369 let write_hb = write.clone();
370 let seq_hb = seq_shared.clone();
371 let ack_hb = ack_shared.clone();
372
373 tokio::spawn(async move {
374 let jitter = Duration::from_millis(
375 (rand::random::<u64>() % interval_ms).max(1),
376 );
377 tokio::time::sleep(jitter).await;
378
379 let mut ticker =
380 tokio::time::interval(Duration::from_millis(interval_ms));
381 loop {
382 ticker.tick().await;
383
384 {
385 let mut ack = ack_hb.lock().await;
386 if !*ack {
387 eprintln!(
388 "[fluxer-rs] No heartbeat ACK — zombie connection, dropping."
389 );
390 break;
391 }
392 *ack = false;
393 }
394
395 let seq = *seq_hb.lock().await;
396 let hb = serde_json::json!({ "op": 1, "d": seq });
397 let mut guard = write_hb.lock().await;
398 if guard
399 .send(WsMessage::Text(hb.to_string().into()))
400 .await
401 .is_err()
402 {
403 break;
404 }
405 }
406 });
407 }
408
409 11 => {
410 *ack_shared.lock().await = true;
411 }
412
413 0 => {
414 let event_type = payload["t"].as_str().unwrap_or("").to_string();
415 let data = payload["d"].clone();
416 let ctx2 = ctx.clone();
417 let handler2 = handler.clone();
418
419 if event_type == "READY" {
420 if let Some(sid) = data["session_id"].as_str() {
421 *session_id = Some(sid.to_string());
422 }
423 if let Some(rurl) = data["resume_gateway_url"].as_str() {
424 *resume_url = Some(format!(
425 "{}/?v=1&encoding=json",
426 rurl.trim_end_matches('/')
427 ));
428 }
429 }
430
431 tokio::spawn(async move {
432 cache_update(&ctx2.cache, &event_type, &data).await;
433 dispatch_event(event_type, data, ctx2, handler2).await;
434 });
435 }
436
437 7 => {
438 eprintln!("[fluxer-rs] Received op 7 Reconnect.");
439 return Ok(LoopControl::Reconnect { resume: true });
440 }
441
442 9 => {
443 let resumable = payload["d"].as_bool().unwrap_or(false);
444 eprintln!("[fluxer-rs] Invalid session (resumable={resumable}).");
445 return Ok(LoopControl::Reconnect { resume: resumable });
446 }
447
448 1 => {
449 let seq = *seq_shared.lock().await;
450 let hb = serde_json::json!({ "op": 1, "d": seq });
451 let _ = write
452 .lock()
453 .await
454 .send(WsMessage::Text(hb.to_string().into()))
455 .await;
456 }
457
458 _ => {}
459 }
460 }
461
462 Err(ClientError::ConnectionClosed)
463 }
464}
465
466async fn cache_update(cache: &Cache, event_type: &str, data: &Value) {
467 match event_type {
468 "READY" => {
469 if let Ok(user) = serde_json::from_value::<crate::model::User>(data["user"].clone()) {
470 *cache.current_user.write().await = Some(user);
471 }
472 }
473 "GUILD_CREATE" => {
474 if let Ok(guild) = serde_json::from_value::<crate::model::Guild>(data.clone()) {
475 let guild_id = guild.id.clone();
476
477 if let Some(channels) = data["channels"].as_array() {
478 let mut ch_map = cache.channels.write().await;
479 for ch_val in channels {
480 if let Ok(ch) = serde_json::from_value::<crate::model::Channel>(ch_val.clone()) {
481 ch_map.insert(ch.id.clone(), ch);
482 }
483 }
484 }
485
486 if let Some(members) = data["members"].as_array() {
487 let mut user_map = cache.users.write().await;
488 for m_val in members {
489 if let Ok(user) = serde_json::from_value::<crate::model::User>(m_val["user"].clone()) {
490 user_map.insert(user.id.clone(), user);
491 }
492 }
493 }
494 cache.guilds.write().await.insert(guild_id, guild);
495 }
496 }
497 "GUILD_UPDATE" => {
498 if let Ok(guild) = serde_json::from_value::<crate::model::Guild>(data.clone()) {
499 cache.guilds.write().await.insert(guild.id.clone(), guild);
500 }
501 }
502 "GUILD_DELETE" => {
503 if let Some(id) = data["id"].as_str() {
504 cache.guilds.write().await.remove(id);
505 }
506 }
507 "CHANNEL_CREATE" | "CHANNEL_UPDATE" => {
508 if let Ok(ch) = serde_json::from_value::<crate::model::Channel>(data.clone()) {
509 cache.channels.write().await.insert(ch.id.clone(), ch);
510 }
511 }
512 "CHANNEL_DELETE" => {
513 if let Some(id) = data["id"].as_str() {
514 cache.channels.write().await.remove(id);
515 }
516 }
517 "GUILD_MEMBER_ADD" | "GUILD_MEMBER_UPDATE" => {
518 if let Ok(user) = serde_json::from_value::<crate::model::User>(data["user"].clone()) {
519 cache.users.write().await.insert(user.id.clone(), user);
520 }
521 }
522 "MESSAGE_CREATE" => {
523 if let Ok(user) = serde_json::from_value::<crate::model::User>(data["author"].clone()) {
524 cache.users.write().await.insert(user.id.clone(), user);
525 }
526 }
527 _ => {}
528 }
529}
530
531async fn dispatch_event(
532 event_type: String,
533 data: Value,
534 ctx: Context,
535 handler: Arc<dyn EventHandler>,
536) {
537 use crate::model::{
538 Channel, ChannelPinsUpdate, ChannelUpdateBulk, Guild, GuildBanAdd, GuildBanRemove,
539 GuildEmojisUpdate, GuildMemberAdd, GuildMemberRemove, GuildMemberUpdate,
540 GuildRoleCreate, GuildRoleDelete, GuildRoleUpdate, GuildRoleUpdateBulk,
541 GuildStickersUpdate, InviteCreate, InviteDelete, WebhooksUpdate,
542 Message, MessageDelete, MessageDeleteBulk, MessageUpdate, ReactionAdd,
543 ReactionRemove, ReactionRemoveAll, ReactionRemoveEmoji, Ready, TypingStart,
544 UnavailableGuild,
545 };
546 use crate::model::voice::VoiceState;
547
548 macro_rules! dispatch {
549 ($method:ident, $ty:ty) => {{
550 match serde_json::from_value::<$ty>(data.clone()) {
551 Ok(v) => handler.$method(ctx, v).await,
552 Err(e) => eprintln!(
553 "[fluxer-rs] Failed to deserialize {} event: {}",
554 stringify!($ty),
555 e
556 ),
557 }
558 }};
559 }
560
561 match event_type.as_str() {
562 "READY" => dispatch!(on_ready, Ready),
563 "RESUMED" => eprintln!("[fluxer-rs] Session resumed successfully."),
564 "MESSAGE_CREATE" => dispatch!(on_message, Message),
565 "MESSAGE_UPDATE" => dispatch!(on_message_update, MessageUpdate),
566 "MESSAGE_DELETE" => dispatch!(on_message_delete, MessageDelete),
567 "MESSAGE_DELETE_BULK" => dispatch!(on_message_delete_bulk, MessageDeleteBulk),
568 "MESSAGE_REACTION_ADD" => dispatch!(on_reaction_add, ReactionAdd),
569 "MESSAGE_REACTION_REMOVE" => dispatch!(on_reaction_remove, ReactionRemove),
570 "MESSAGE_REACTION_REMOVE_ALL" => dispatch!(on_reaction_remove_all, ReactionRemoveAll),
571 "MESSAGE_REACTION_REMOVE_EMOJI" => dispatch!(on_reaction_remove_emoji, ReactionRemoveEmoji),
572 "TYPING_START" => dispatch!(on_typing_start, TypingStart),
573 "CHANNEL_CREATE" => dispatch!(on_channel_create, Channel),
574 "CHANNEL_UPDATE" => dispatch!(on_channel_update, Channel),
575 "CHANNEL_DELETE" => dispatch!(on_channel_delete, Channel),
576 "CHANNEL_PINS_UPDATE" => dispatch!(on_channel_pins_update, ChannelPinsUpdate),
577 "GUILD_CREATE" => dispatch!(on_guild_create, Guild),
578 "GUILD_UPDATE" => dispatch!(on_guild_update, Guild),
579 "GUILD_DELETE" => dispatch!(on_guild_delete, UnavailableGuild),
580 "GUILD_MEMBER_ADD" => dispatch!(on_guild_member_add, GuildMemberAdd),
581 "GUILD_MEMBER_UPDATE" => dispatch!(on_guild_member_update, GuildMemberUpdate),
582 "GUILD_MEMBER_REMOVE" => dispatch!(on_guild_member_remove, GuildMemberRemove),
583 "GUILD_BAN_ADD" => dispatch!(on_guild_ban_add, GuildBanAdd),
584 "GUILD_BAN_REMOVE" => dispatch!(on_guild_ban_remove, GuildBanRemove),
585 "GUILD_ROLE_CREATE" => dispatch!(on_guild_role_create, GuildRoleCreate),
586 "GUILD_ROLE_UPDATE" => dispatch!(on_guild_role_update, GuildRoleUpdate),
587 "GUILD_ROLE_UPDATE_BULK" => dispatch!(on_guild_role_update_bulk, GuildRoleUpdateBulk),
588 "GUILD_ROLE_DELETE" => dispatch!(on_guild_role_delete, GuildRoleDelete),
589 "GUILD_EMOJIS_UPDATE" => dispatch!(on_guild_emojis_update, GuildEmojisUpdate),
590 "GUILD_STICKERS_UPDATE" => dispatch!(on_guild_stickers_update, GuildStickersUpdate),
591 "CHANNEL_UPDATE_BULK" => dispatch!(on_channel_update_bulk, ChannelUpdateBulk),
592 "INVITE_CREATE" => dispatch!(on_invite_create, InviteCreate),
593 "INVITE_DELETE" => dispatch!(on_invite_delete, InviteDelete),
594 "WEBHOOKS_UPDATE" => dispatch!(on_webhooks_update, WebhooksUpdate),
595 "VOICE_STATE_UPDATE" => {
596 let guild_id = data["guild_id"].as_str().unwrap_or("").to_string();
597 let sess = data["session_id"].as_str().unwrap_or("").to_string();
598 if !guild_id.is_empty() && !sess.is_empty() {
599 let mut states = ctx.voice_states.lock().await;
600 let entry = states.entry(guild_id).or_insert_with(|| VoiceState {
601 token: String::new(),
602 endpoint: String::new(),
603 session_id: None,
604 });
605 entry.session_id = Some(sess);
606 }
607 }
608 "VOICE_SERVER_UPDATE" => {
609 let token = data["token"].as_str().unwrap_or("").to_string();
610 let endpoint = data["endpoint"].as_str().unwrap_or("").to_string();
611 let guild_id = data["guild_id"].as_str().unwrap_or("").to_string();
612 if !guild_id.is_empty() && !token.is_empty() && !endpoint.is_empty() {
613 let mut states = ctx.voice_states.lock().await;
614 let entry = states.entry(guild_id).or_insert_with(|| VoiceState {
615 token: String::new(),
616 endpoint: String::new(),
617 session_id: None,
618 });
619 entry.token = token;
620 entry.endpoint = if endpoint.starts_with("wss://")
621 || endpoint.starts_with("https://")
622 {
623 endpoint
624 } else {
625 format!("wss://{}", endpoint)
626 };
627 }
628 }
629
630 "INTERACTION_CREATE"
631 | "SESSIONS_REPLACE"
632 | "STAGE_INSTANCE_CREATE"
633 | "STAGE_INSTANCE_UPDATE"
634 | "STAGE_INSTANCE_DELETE" => {}
635
636 other => {
637 eprintln!("[fluxer-rs] Unknown event: {}", other);
638 }
639 }
640}