1use std::collections::HashMap;
4use std::sync::Arc;
5use futures::{SinkExt, StreamExt};
6use serde_json::Value;
7use tokio::sync::Mutex;
8use tokio_tungstenite::{connect_async, tungstenite::protocol::Message as WsMessage};
9use crate::cache::Cache;
10use crate::error::ClientError;
11use crate::event::EventHandler;
12use crate::http::Http;
13use crate::model::voice::VoiceState;
14use std::time::Duration;
15
/// REST API base URL a [`ClientBuilder`] starts with until `api_url` overrides it.
const DEFAULT_API_URL: &str = "https://api.fluxer.app/v1";
/// Gateway WebSocket URL used as a fallback when the gateway lookup over HTTP
/// fails, or when a session is being resumed but no resume URL was recorded.
const DEFAULT_GATEWAY_URL: &str = "wss://gateway.fluxer.app/?v=1&encoding=json";
18
/// Outcome of a single gateway session, telling `Client::start` what to do next.
enum LoopControl {
    /// The session ended for good; `start` returns `Ok(())`.
    Done,
    /// The session ended and a new connection should be opened.
    ///
    /// With `resume: true` the stored session id, resume URL and sequence
    /// number are kept for the next attempt; with `resume: false` they are
    /// cleared before reconnecting.
    Reconnect { resume: bool },
}
23
/// Handle passed to event handlers, bundling the shared state of one gateway session.
///
/// Cloning is cheap: every field is an `Arc`.
#[derive(Clone)]
pub struct Context {
    /// Shared HTTP client.
    pub http: Arc<Http>,
    /// Shared cache that is updated from gateway events.
    pub cache: Arc<Cache>,
    /// Sender for raw JSON text payloads; a background task forwards each
    /// message to the gateway WebSocket as a text frame.
    pub gateway_tx: Arc<tokio::sync::mpsc::Sender<String>>,
    /// Voice credentials collected from `VOICE_STATE_UPDATE` and
    /// `VOICE_SERVER_UPDATE` events, keyed by guild id.
    pub voice_states: Arc<Mutex<HashMap<String, VoiceState>>>,
    /// Rooms of the voice connections opened through `join_voice`, keyed by guild id.
    pub(crate) live_rooms: Arc<Mutex<HashMap<String, std::sync::Arc<livekit::Room>>>>,
}
47
48impl Context {
49 pub async fn join_voice(
52 &self,
53 guild_id: &str,
54 channel_id: &str,
55 ) -> Result<crate::voice::FluxerVoiceConnection, ClientError> {
56 {
57 let mut states = self.voice_states.lock().await;
58 states.remove(guild_id);
59 }
60
61 let join_payload = serde_json::json!({
62 "op": 4,
63 "d": {
64 "guild_id": guild_id,
65 "channel_id": channel_id,
66 "self_mute": false,
67 "self_deaf": false
68 }
69 });
70 self.gateway_tx
71 .send(join_payload.to_string())
72 .await
73 .map_err(|e| ClientError::Voice(e.to_string()))?;
74
75 let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
76 let voice_state = loop {
77 {
78 let states = self.voice_states.lock().await;
79 if let Some(vs) = states.get(guild_id) {
80 if !vs.token.is_empty() && !vs.endpoint.is_empty() {
81 break vs.clone();
82 }
83 }
84 }
85 if tokio::time::Instant::now() >= deadline {
86 return Err(ClientError::Voice(
87 "Timed out waiting for VOICE_SERVER_UPDATE".into(),
88 ));
89 }
90 tokio::time::sleep(Duration::from_millis(100)).await;
91 };
92
93 let conn = crate::voice::FluxerVoiceConnection::connect(
94 &voice_state.endpoint,
95 &voice_state.token,
96 )
97 .await
98 .map_err(|e| ClientError::Voice(e.to_string()))?;
99
100 self.live_rooms.lock().await.insert(guild_id.to_string(), conn.room.clone());
101
102 Ok(conn)
103 }
104
105 pub async fn leave_voice(&self, guild_id: &str) -> Result<(), ClientError> {
107 if let Some(room) = self.live_rooms.lock().await.remove(guild_id) {
108 let _ = room.close().await;
109 }
110
111 let payload = serde_json::json!({
112 "op": 4,
113 "d": {
114 "guild_id": guild_id,
115 "channel_id": null,
116 "self_mute": false,
117 "self_deaf": false
118 }
119 });
120 self.gateway_tx
121 .send(payload.to_string())
122 .await
123 .map_err(|e| ClientError::Voice(e.to_string()))?;
124 self.voice_states.lock().await.remove(guild_id);
125 Ok(())
126 }
127}
128
/// Builder that collects the settings needed to construct a [`Client`].
pub struct ClientBuilder {
    /// Token handed to the HTTP client.
    token: String,
    /// Base URL of the REST API; starts out as `DEFAULT_API_URL`.
    api_url: String,
    /// Event handler; must be set before `build` is called.
    handler: Option<Arc<dyn EventHandler>>,
}
146
147impl ClientBuilder {
148 pub fn new(token: impl Into<String>) -> Self {
149 Self {
150 token: token.into(),
151 api_url: DEFAULT_API_URL.to_string(),
152 handler: None,
153 }
154 }
155
156 pub fn event_handler(mut self, handler: impl EventHandler + 'static) -> Self {
158 self.handler = Some(Arc::new(handler));
159 self
160 }
161
162 pub fn api_url(mut self, url: impl Into<String>) -> Self {
164 self.api_url = url.into();
165 self
166 }
167
168 pub fn build(self) -> Client {
169 let _ = rustls::crypto::ring::default_provider().install_default();
170 let http = Arc::new(Http::new(&self.token, self.api_url));
171 Client {
172 http,
173 cache: Cache::new(),
174 handler: self.handler.expect("call .event_handler() before .build()"),
175 }
176 }
177}
178
/// Gateway client: owns the shared HTTP client, the cache and the event
/// handler, and drives the gateway connection through `start`.
pub struct Client {
    /// Shared HTTP client; also the source of the token sent to the gateway.
    pub(crate) http: Arc<Http>,
    /// Cache shared with every `Context` handed to the event handler.
    cache: Arc<Cache>,
    /// Handler that receives the dispatched gateway events.
    handler: Arc<dyn EventHandler>,
}
190
191impl Client {
    /// Returns a [`ClientBuilder`] for `token`; shorthand for [`ClientBuilder::new`].
    pub fn builder(token: impl Into<String>) -> ClientBuilder {
        ClientBuilder::new(token)
    }
195
196 pub async fn start(&mut self) -> Result<(), ClientError> {
199 let mut session_id: Option<String> = None;
200 let mut resume_url: Option<String> = None;
201 let mut last_seq: Option<u64> = None;
202 let mut backoff = Duration::from_secs(1);
203
204 loop {
205 let result = self
206 .run_session(&mut session_id, &mut resume_url, &mut last_seq)
207 .await;
208
209 match result {
210 Ok(LoopControl::Done) => return Ok(()),
211
212 Ok(LoopControl::Reconnect { resume }) => {
213 if !resume {
214 session_id = None;
215 resume_url = None;
216 last_seq = None;
217 let jitter = Duration::from_millis(1000 + (rand::random::<u64>() % 4000));
218 tokio::time::sleep(jitter).await;
219 } else {
220 eprintln!("[fluxer-rs] Reconnecting in {:?} (will resume)...", backoff);
221 tokio::time::sleep(backoff).await;
222 backoff = (backoff * 2).min(Duration::from_secs(60));
223 continue;
224 }
225 }
226
227 Err(ClientError::ConnectionClosed) => {
228 eprintln!("[fluxer-rs] Connection closed, reconnecting in {:?}...", backoff);
229 tokio::time::sleep(backoff).await;
230 backoff = (backoff * 2).min(Duration::from_secs(60));
231 continue;
232 }
233
234 Err(e) => return Err(e),
235 }
236
237 backoff = Duration::from_secs(1);
238 }
239 }
240
241 async fn run_session(
242 &self,
243 session_id: &mut Option<String>,
244 resume_url: &mut Option<String>,
245 last_seq: &mut Option<u64>,
246 ) -> Result<LoopControl, ClientError> {
247 let gateway_url = if session_id.is_some() {
248 resume_url
249 .clone()
250 .unwrap_or_else(|| DEFAULT_GATEWAY_URL.to_string())
251 } else {
252 match self.http.get_gateway().await {
253 Ok(url) => {
254 let base = url.trim_end_matches('/');
255 format!("{}/?v=1&encoding=json", base)
256 }
257 Err(_) => DEFAULT_GATEWAY_URL.to_string(),
258 }
259 };
260
261 let (ws_stream, _) = connect_async(&gateway_url).await?;
262 let (write, mut read) = ws_stream.split();
263 let write = Arc::new(Mutex::new(write));
264 let seq_shared: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(*last_seq));
265 let ack_shared: Arc<Mutex<bool>> = Arc::new(Mutex::new(true));
266 let (gateway_tx, mut gateway_rx) = tokio::sync::mpsc::channel::<String>(64);
267 {
268 let write_fwd = write.clone();
269 tokio::spawn(async move {
270 while let Some(msg) = gateway_rx.recv().await {
271 let mut guard = write_fwd.lock().await;
272 if guard
273 .send(WsMessage::Text(msg.into()))
274 .await
275 .is_err()
276 {
277 break;
278 }
279 }
280 });
281 }
282
283 let ctx = Context {
284 http: self.http.clone(),
285 cache: self.cache.clone(),
286 gateway_tx: Arc::new(gateway_tx),
287 voice_states: Arc::new(Mutex::new(HashMap::new())),
288 live_rooms: Arc::new(Mutex::new(HashMap::new())),
289 };
290
291 let token = self.http.get_token().to_string();
292 if let (Some(sid), Some(seq)) = (session_id.as_deref(), *last_seq) {
293 let resume_payload = serde_json::json!({
294 "op": 6,
295 "d": { "token": token, "session_id": sid, "seq": seq }
296 });
297 write
298 .lock()
299 .await
300 .send(WsMessage::Text(resume_payload.to_string().into()))
301 .await?;
302 } else {
303 let identify = serde_json::json!({
304 "op": 2,
305 "d": {
306 "token": token,
307 "intents": 0, "properties": {
309 "os": "linux",
310 "browser": "fluxer-rust",
311 "device": "fluxer-rust"
312 }
313 }
314 });
315 write
316 .lock()
317 .await
318 .send(WsMessage::Text(identify.to_string().into()))
319 .await?;
320 }
321
322 let handler = self.handler.clone();
323
324 while let Some(msg_result) = read.next().await {
325 let text = match msg_result? {
326 WsMessage::Text(t) => t,
327 WsMessage::Close(frame) => {
328 let code = frame.as_ref().map(|f| u16::from(f.code)).unwrap_or(0);
329 match code {
330 4004 => {
331 eprintln!("[fluxer-rs] Authentication failed (4004) — invalid token, shutting down.");
332 return Ok(LoopControl::Done);
333 }
334 4010 => {
335 eprintln!("[fluxer-rs] Invalid shard (4010) — shutting down.");
336 return Ok(LoopControl::Done);
337 }
338 4011 => {
339 eprintln!("[fluxer-rs] Sharding required (4011) — shutting down.");
340 return Ok(LoopControl::Done);
341 }
342 4012 => {
343 eprintln!("[fluxer-rs] Invalid API version (4012) — shutting down.");
344 return Ok(LoopControl::Done);
345 }
346 _ => return Err(ClientError::ConnectionClosed),
347 }
348 }
349 WsMessage::Ping(d) => {
350 let _ = write.lock().await.send(WsMessage::Pong(d)).await;
351 continue;
352 }
353 _ => continue,
354 };
355
356 let payload: Value = serde_json::from_str(text.as_str())?;
357 let op = payload["op"].as_u64().unwrap_or(255);
358
359 if let Some(s) = payload["s"].as_u64() {
360 *last_seq = Some(s);
361 *seq_shared.lock().await = Some(s);
362 }
363
364 match op {
365 10 => {
366 let interval_ms = payload["d"]["heartbeat_interval"]
367 .as_u64()
368 .unwrap_or(41_250);
369
370 let write_hb = write.clone();
371 let seq_hb = seq_shared.clone();
372 let ack_hb = ack_shared.clone();
373
374 tokio::spawn(async move {
375 let jitter = Duration::from_millis(
376 (rand::random::<u64>() % interval_ms).max(1),
377 );
378 tokio::time::sleep(jitter).await;
379
380 let mut ticker =
381 tokio::time::interval(Duration::from_millis(interval_ms));
382 loop {
383 ticker.tick().await;
384
385 {
386 let mut ack = ack_hb.lock().await;
387 if !*ack {
388 eprintln!(
389 "[fluxer-rs] No heartbeat ACK — zombie connection, dropping."
390 );
391 break;
392 }
393 *ack = false;
394 }
395
396 let seq = *seq_hb.lock().await;
397 let hb = serde_json::json!({ "op": 1, "d": seq });
398 let mut guard = write_hb.lock().await;
399 if guard
400 .send(WsMessage::Text(hb.to_string().into()))
401 .await
402 .is_err()
403 {
404 break;
405 }
406 }
407 });
408 }
409
410 11 => {
411 *ack_shared.lock().await = true;
412 }
413
414 0 => {
415 let event_type = payload["t"].as_str().unwrap_or("").to_string();
416 let data = payload["d"].clone();
417 let ctx2 = ctx.clone();
418 let handler2 = handler.clone();
419
420 if event_type == "READY" {
421 if let Some(sid) = data["session_id"].as_str() {
422 *session_id = Some(sid.to_string());
423 }
424 if let Some(rurl) = data["resume_gateway_url"].as_str() {
425 *resume_url = Some(format!(
426 "{}/?v=1&encoding=json",
427 rurl.trim_end_matches('/')
428 ));
429 }
430 }
431
432 tokio::spawn(async move {
433 cache_update(&ctx2.cache, &event_type, &data).await;
434 dispatch_event(event_type, data, ctx2, handler2).await;
435 });
436 }
437
438 7 => {
439 eprintln!("[fluxer-rs] Received op 7 Reconnect.");
440 return Ok(LoopControl::Reconnect { resume: true });
441 }
442
443 9 => {
444 let resumable = payload["d"].as_bool().unwrap_or(false);
445 eprintln!("[fluxer-rs] Invalid session (resumable={resumable}).");
446 return Ok(LoopControl::Reconnect { resume: resumable });
447 }
448
449 1 => {
450 let seq = *seq_shared.lock().await;
451 let hb = serde_json::json!({ "op": 1, "d": seq });
452 let _ = write
453 .lock()
454 .await
455 .send(WsMessage::Text(hb.to_string().into()))
456 .await;
457 }
458
459 _ => {}
460 }
461 }
462
463 Err(ClientError::ConnectionClosed)
464 }
465}
466
467async fn cache_update(cache: &Cache, event_type: &str, data: &Value) {
468 match event_type {
469 "READY" => {
470 if let Ok(user) = serde_json::from_value::<crate::model::User>(data["user"].clone()) {
471 *cache.current_user.write().await = Some(user);
472 }
473 }
474 "GUILD_CREATE" => {
475 if let Ok(guild) = serde_json::from_value::<crate::model::Guild>(data.clone()) {
476 let guild_id = guild.id.clone();
477
478 if let Some(channels) = data["channels"].as_array() {
479 let mut ch_map = cache.channels.write().await;
480 for ch_val in channels {
481 if let Ok(ch) = serde_json::from_value::<crate::model::Channel>(ch_val.clone()) {
482 ch_map.insert(ch.id.clone(), ch);
483 }
484 }
485 }
486
487 if let Some(members) = data["members"].as_array() {
488 let mut user_map = cache.users.write().await;
489 for m_val in members {
490 if let Ok(user) = serde_json::from_value::<crate::model::User>(m_val["user"].clone()) {
491 user_map.insert(user.id.clone(), user);
492 }
493 }
494 }
495 cache.guilds.write().await.insert(guild_id, guild);
496 }
497 }
498 "GUILD_UPDATE" => {
499 if let Ok(guild) = serde_json::from_value::<crate::model::Guild>(data.clone()) {
500 cache.guilds.write().await.insert(guild.id.clone(), guild);
501 }
502 }
503 "GUILD_DELETE" => {
504 if let Some(id) = data["id"].as_str() {
505 cache.guilds.write().await.remove(id);
506 }
507 }
508 "CHANNEL_CREATE" | "CHANNEL_UPDATE" => {
509 if let Ok(ch) = serde_json::from_value::<crate::model::Channel>(data.clone()) {
510 cache.channels.write().await.insert(ch.id.clone(), ch);
511 }
512 }
513 "CHANNEL_DELETE" => {
514 if let Some(id) = data["id"].as_str() {
515 cache.channels.write().await.remove(id);
516 }
517 }
518 "GUILD_MEMBER_ADD" | "GUILD_MEMBER_UPDATE" => {
519 if let Ok(user) = serde_json::from_value::<crate::model::User>(data["user"].clone()) {
520 cache.users.write().await.insert(user.id.clone(), user);
521 }
522 }
523 "MESSAGE_CREATE" => {
524 if let Ok(user) = serde_json::from_value::<crate::model::User>(data["author"].clone()) {
525 cache.users.write().await.insert(user.id.clone(), user);
526 }
527 }
528 _ => {}
529 }
530}
531
532async fn dispatch_event(
533 event_type: String,
534 data: Value,
535 ctx: Context,
536 handler: Arc<dyn EventHandler>,
537) {
538 use crate::model::{
539 Channel, ChannelPinsUpdate, ChannelUpdateBulk, Guild, GuildBanAdd, GuildBanRemove,
540 GuildEmojisUpdate, GuildMemberAdd, GuildMemberRemove, GuildMemberUpdate,
541 GuildRoleCreate, GuildRoleDelete, GuildRoleUpdate, GuildRoleUpdateBulk,
542 GuildStickersUpdate, InviteCreate, InviteDelete, WebhooksUpdate,
543 Message, MessageDelete, MessageDeleteBulk, MessageUpdate, ReactionAdd,
544 ReactionRemove, ReactionRemoveAll, ReactionRemoveEmoji, Ready, TypingStart,
545 UnavailableGuild,
546 };
547 use crate::model::voice::VoiceState;
548
549 macro_rules! dispatch {
550 ($method:ident, $ty:ty) => {{
551 match serde_json::from_value::<$ty>(data.clone()) {
552 Ok(v) => handler.$method(ctx, v).await,
553 Err(e) => eprintln!(
554 "[fluxer-rs] Failed to deserialize {} event: {}",
555 stringify!($ty),
556 e
557 ),
558 }
559 }};
560 }
561
562 match event_type.as_str() {
563 "READY" => dispatch!(on_ready, Ready),
564 "RESUMED" => eprintln!("[fluxer-rs] Session resumed successfully."),
565 "MESSAGE_CREATE" => dispatch!(on_message, Message),
566 "MESSAGE_UPDATE" => dispatch!(on_message_update, MessageUpdate),
567 "MESSAGE_DELETE" => dispatch!(on_message_delete, MessageDelete),
568 "MESSAGE_DELETE_BULK" => dispatch!(on_message_delete_bulk, MessageDeleteBulk),
569 "MESSAGE_REACTION_ADD" => dispatch!(on_reaction_add, ReactionAdd),
570 "MESSAGE_REACTION_REMOVE" => dispatch!(on_reaction_remove, ReactionRemove),
571 "MESSAGE_REACTION_REMOVE_ALL" => dispatch!(on_reaction_remove_all, ReactionRemoveAll),
572 "MESSAGE_REACTION_REMOVE_EMOJI" => dispatch!(on_reaction_remove_emoji, ReactionRemoveEmoji),
573 "TYPING_START" => dispatch!(on_typing_start, TypingStart),
574 "CHANNEL_CREATE" => dispatch!(on_channel_create, Channel),
575 "CHANNEL_UPDATE" => dispatch!(on_channel_update, Channel),
576 "CHANNEL_DELETE" => dispatch!(on_channel_delete, Channel),
577 "CHANNEL_PINS_UPDATE" => dispatch!(on_channel_pins_update, ChannelPinsUpdate),
578 "GUILD_CREATE" => dispatch!(on_guild_create, Guild),
579 "GUILD_UPDATE" => dispatch!(on_guild_update, Guild),
580 "GUILD_DELETE" => dispatch!(on_guild_delete, UnavailableGuild),
581 "GUILD_MEMBER_ADD" => dispatch!(on_guild_member_add, GuildMemberAdd),
582 "GUILD_MEMBER_UPDATE" => dispatch!(on_guild_member_update, GuildMemberUpdate),
583 "GUILD_MEMBER_REMOVE" => dispatch!(on_guild_member_remove, GuildMemberRemove),
584 "GUILD_BAN_ADD" => dispatch!(on_guild_ban_add, GuildBanAdd),
585 "GUILD_BAN_REMOVE" => dispatch!(on_guild_ban_remove, GuildBanRemove),
586 "GUILD_ROLE_CREATE" => dispatch!(on_guild_role_create, GuildRoleCreate),
587 "GUILD_ROLE_UPDATE" => dispatch!(on_guild_role_update, GuildRoleUpdate),
588 "GUILD_ROLE_UPDATE_BULK" => dispatch!(on_guild_role_update_bulk, GuildRoleUpdateBulk),
589 "GUILD_ROLE_DELETE" => dispatch!(on_guild_role_delete, GuildRoleDelete),
590 "GUILD_EMOJIS_UPDATE" => dispatch!(on_guild_emojis_update, GuildEmojisUpdate),
591 "GUILD_STICKERS_UPDATE" => dispatch!(on_guild_stickers_update, GuildStickersUpdate),
592 "CHANNEL_UPDATE_BULK" => dispatch!(on_channel_update_bulk, ChannelUpdateBulk),
593 "INVITE_CREATE" => dispatch!(on_invite_create, InviteCreate),
594 "INVITE_DELETE" => dispatch!(on_invite_delete, InviteDelete),
595 "WEBHOOKS_UPDATE" => dispatch!(on_webhooks_update, WebhooksUpdate),
596 "VOICE_STATE_UPDATE" => {
597 let guild_id = data["guild_id"].as_str().unwrap_or("").to_string();
598 let sess = data["session_id"].as_str().unwrap_or("").to_string();
599 if !guild_id.is_empty() && !sess.is_empty() {
600 let mut states = ctx.voice_states.lock().await;
601 let entry = states.entry(guild_id).or_insert_with(|| VoiceState {
602 token: String::new(),
603 endpoint: String::new(),
604 session_id: None,
605 });
606 entry.session_id = Some(sess);
607 }
608 }
609 "VOICE_SERVER_UPDATE" => {
610 let token = data["token"].as_str().unwrap_or("").to_string();
611 let endpoint = data["endpoint"].as_str().unwrap_or("").to_string();
612 let guild_id = data["guild_id"].as_str().unwrap_or("").to_string();
613 if !guild_id.is_empty() && !token.is_empty() && !endpoint.is_empty() {
614 let mut states = ctx.voice_states.lock().await;
615 let entry = states.entry(guild_id).or_insert_with(|| VoiceState {
616 token: String::new(),
617 endpoint: String::new(),
618 session_id: None,
619 });
620 entry.token = token;
621 entry.endpoint = if endpoint.starts_with("wss://")
622 || endpoint.starts_with("https://")
623 {
624 endpoint
625 } else {
626 format!("wss://{}", endpoint)
627 };
628 }
629 }
630
631 "INTERACTION_CREATE"
632 | "SESSIONS_REPLACE"
633 | "STAGE_INSTANCE_CREATE"
634 | "STAGE_INSTANCE_UPDATE"
635 | "STAGE_INSTANCE_DELETE" => {}
636
637 other => {
638 eprintln!("[fluxer-rs] Unknown event: {}", other);
639 }
640 }
641}