// crystal_server/client.rs

1use crate::types::NewSync;
2
3use super::{
4    buffer::Buffer,
5    leb::Leb,
6    types::{
7        self, Achievement, AdminAction, Administrator, CallbackServerUpdate, ChangeFriendStatus,
8        DataUpdate, FetchBdbServerUpdate, Highscore, LoginCode, LoginPassw, NewSyncQueue,
9        OptionalValue, Player, PlayerQueue, PlayerRequest, PlayerVariableServerUpdate,
10        RegistrationCode, SelfSync, ServerUpdateCallback, SyncEvent, SyncIter, SyncType,
11        SyncUpdate, SyncVariableServerUpdate, Value, VariableUpdate,
12    },
13};
14use age::{
15    x25519::{Identity, Recipient},
16    Decryptor, Encryptor,
17};
18use bytes::Bytes;
19use chrono::{DateTime, Utc};
20use futures_util::{
21    stream::{SplitSink, SplitStream},
22    AsyncReadExt, AsyncWriteExt, SinkExt, Stream, StreamExt,
23};
24use integer_hasher::{IntMap, IntSet};
25use machineid_crystal::{Encryption, HWIDComponent, IdBuilder};
26use num_enum::TryFromPrimitive;
27use std::{
28    collections::{HashMap, HashSet},
29    io::{Cursor, Error, ErrorKind, Result as IoResult},
30    iter,
31    str::FromStr,
32    sync::Arc,
33};
34use tokio::{
35    net::TcpStream,
36    sync::{Mutex, RwLock},
37    task::JoinHandle,
38    time::Instant,
39};
40use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream};
41#[cfg(feature = "__dev")]
42use tracing::{info, warn};
43
/// A "bridge" between a server and the client.
///
/// First, create the client and, whenever you are ready, connect it:
/// ```rust,ignore
/// let mut cs = CrystalServer::init("your-game-id");
/// // Setup here things like game version, callbacks, etc...
/// cs.connect().await; // Connecting requires a mutable reference
/// ```
/// Since you need to continuously call the cs.update() function,
/// you can do this on a separate async task, but you need to do so
/// after connecting to the server:
/// ```rust,ignore
/// let cs = Arc::new(cs);
/// {
///     let cs = cs.clone();
///     tokio::spawn(async move {
///         loop {
///             cs.update().await?; // Update client data...
///             tokio::time::sleep(Duration::from_secs_f64(1.0 / 60.0)).await;
///         }
///     });
/// }
/// ```
/// Or in a single rust task, which is easier to handle:
/// ```rust,ignore
/// // On a new frame
/// cs.update().await?;
/// ```
/// <div class="warning">
/// You shouldn't update this more than once per frame; doing so may have unintended
/// consequences on the network usage of your game.
/// </div>
///
/// After making sure you're always triggering this function once in a frame (at most), you may do
/// anything else with the other functions.
pub struct CrystalServer {
    /// Write half of the websocket connection. `None` after `init`; set by `connect` once the
    /// websocket has been opened.
    writer: Option<Arc<Mutex<StreamWriter>>>,
    /// Client state shared with the background stream-handler task spawned by `connect`.
    data: Arc<RwLock<StreamData>>,
}
83
/// Data-less type that only serves as a namespace for stream helper functions
/// (see its `impl` block).
struct StreamHandler;
85
/// Read half of the websocket connection.
struct StreamReader {
    /// Incoming frame stream; `read` fails with `ReaderError::Unknown` while this is `None`.
    stream: Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>,

    /// Decryption identity. While `None`, binary frames are passed through as-is; once set,
    /// every binary frame is decrypted with it before being returned.
    pub identity: Option<Identity>,
}
91
/// Write half of the websocket connection.
struct StreamWriter {
    /// Outgoing message sink; taken (and closed) by `shutdown`.
    stream: Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>,

    /// Encryption recipient. While `None`, buffers are sent unencrypted; once set, every
    /// buffer passed to `write` is encrypted for it first.
    pub recipient: Option<Recipient>,
}
97
// User-supplied callbacks. All of them are boxed `FnMut` closures that must be `Send + Sync`
// because they are stored in `StreamData`, which is shared with the stream-handler task.

/// Callback returning a room name — presumably the room the local player is in; confirm
/// against the caller.
type CallbackRoom = Box<dyn FnMut() -> String + Sync + Send>;
/// Callback for peer-to-peer messages: sender player id (`None` presumably means the server,
/// per the `ReadPacket::P2P` docs), message id, payload.
type CallbackP2P = Box<dyn FnMut(Option<u64>, i16, Vec<Value>) + Sync + Send>;
/// Callback invoked with the registration result code.
type CallbackRegister = Box<dyn FnMut(RegistrationCode) + Sync + Send>;
/// Callback invoked with the login result code plus two optional extras. The visible call
/// sites pass `None` for both; presumably they carry the unban time and ban reason — TODO confirm.
type CallbackLogin = Box<dyn FnMut(LoginCode, Option<DateTime<Utc>>, Option<String>) + Sync + Send>;
/// Callback for a ban notification — presumably (reason, unban time); TODO confirm.
type CallbackBanned = Box<dyn FnMut(String, DateTime<Utc>) + Sync + Send>;
/// Callback for a kick notification — presumably receives the reason; TODO confirm.
type CallbackKicked = Box<dyn FnMut(String) + Sync + Send>;
/// Callback invoked when the connection is lost or could not be established.
type CallbackDisconnected = Box<dyn FnMut() + Sync + Send>;
/// Callback invoked with the login token when a successful login carries one.
type CallbackLoginToken = Box<dyn FnMut(String) + Sync + Send>;
/// Catch-all callback that receives every event as a `DataUpdate` value, in addition to the
/// dedicated callbacks above.
type CallbackDataUpdate = Box<dyn FnMut(DataUpdate) + Sync + Send>;
107
/// Mutable client state shared (behind an `RwLock`) between `CrystalServer` and the
/// stream-handler task.
#[derive(Default)]
struct StreamData {
    /// Handle of the stream-handler task spawned by `connect`; aborted on the next `connect`.
    thread: Option<JoinHandle<()>>,
    /// Host URL that `connect` tries before falling back to the default server.
    last_host: Option<String>,

    // Connection lifecycle flags.
    is_connected: bool,
    is_loggedin: bool,
    is_connecting: bool,
    is_reconnecting: bool,

    /// Game identifier given to `CrystalServer::init`; sent in the initialization handshake.
    game_id: String,
    /// Client-side game version sent in the initialization handshake.
    version: f64,
    /// Game session string sent in the initialization handshake.
    session: String,
    game_token: String,
    room: String,

    // User callbacks (see the `Callback*` type aliases).
    func_room: Option<CallbackRoom>,
    func_p2p: Option<CallbackP2P>,
    func_register: Option<CallbackRegister>,
    func_login: Option<CallbackLogin>,
    func_banned: Option<CallbackBanned>,
    func_kicked: Option<CallbackKicked>,
    func_disconnected: Option<CallbackDisconnected>,
    func_login_token: Option<CallbackLoginToken>,
    func_data_update: Option<CallbackDataUpdate>,

    // Local player state, populated on a successful login.
    player_id: Option<u64>,
    player_name: Option<String>,
    player_save: HashMap<String, Value>,
    player_open_save: String,
    player_friends: IntSet<u64>,
    player_incoming_friends: IntSet<u64>,
    player_outgoing_friends: IntSet<u64>,

    // Game-wide state, populated from the server's game-info sync.
    game_save: HashMap<String, Value>,
    game_open_save: String,
    game_achievements: IntMap<Leb<u64>, Achievement>,
    game_highscores: IntMap<Leb<u64>, Highscore>,
    game_administrators: IntMap<Leb<u64>, Administrator>,
    /// Game version as reported by the server in the game-info sync.
    game_version: f64,

    // Other players and locally owned variables/syncs.
    pub players: IntMap<u64, Player>,
    players_logout: IntSet<u64>,
    player_queue: IntMap<u64, PlayerQueue>,
    variables: HashMap<String, Value>,
    syncs: Vec<Option<SelfSync>>,
    syncs_remove: Vec<usize>,

    /// Last ping value reported by the server ("Ping (ms)" per the packet docs).
    ping: f64,
    /// Instant at which the last ping value was received.
    last_ping: Option<Instant>,

    // Pending outgoing changes — presumably flushed by the update routine, which is not
    // visible here.
    new_sync_queue: Vec<NewSyncQueue>,
    update_variable: HashSet<String>,
    update_playerini: HashSet<String>,
    update_gameini: HashSet<String>,
    /// Set by `clear`; tells the owner that the disconnected callbacks still have to fire.
    call_disconnected: bool,

    // Pending server-request callbacks keyed by a running index.
    callback_server_update: IntMap<u64, Option<CallbackServerUpdate>>,
    callback_server_index: u64,

    /// Becomes `true` once the server's game-info sync has been received.
    handshake_completed: bool,

    /// Errors recorded by the stream-handler task.
    registered_errors: Vec<ClientError>,
}
172
/// Failure modes of `StreamReader::read`. Each variant carries a human-readable description.
#[allow(dead_code)]
#[derive(Debug)]
enum ReaderError {
    /// Generic stream failure (not constructed by the reader code visible in this file section).
    StreamError(String),
    /// A binary frame arrived without any payload.
    StreamEmpty(String),
    /// The peer sent a close frame, or no further frame could be obtained.
    StreamClosed(String),
    /// Anything else: decryption failure, unexpected frame type, or no open stream.
    Unknown(String),
}
181
/// Failure modes of the `StreamWriter` write functions. Each variant carries a human-readable
/// description.
#[allow(dead_code)]
#[derive(Debug)]
enum WriterError {
    /// Preparing (encrypting) the buffer, sending the message, or flushing the sink failed.
    StreamError(String),
    /// The stream is closed (not constructed by the writer code visible in this file section).
    StreamClosed(String),
    /// The buffer could not be turned into bytes, or there is no open stream.
    Unknown(String),
}
189
/// Errors recorded in `StreamData::registered_errors` by the stream-handler task.
#[allow(dead_code, clippy::enum_variant_names)]
#[derive(Debug)]
enum ClientError {
    /// An I/O result — presumably the handler task's exit value; TODO confirm.
    HandlerResult(IoResult<()>),
    /// A handler failure described only by a message (e.g. an unparsable handshake key).
    HandlerResultString(String),
    /// Presumably the message of a panic inside the handler task — TODO confirm.
    HandlerPanic(String),
}
197
/// Returns early from the enclosing function when `$value` evaluates to an `Err`.
///
/// * `unwrap_return!(value, ret)` returns `ret` on error.
/// * `unwrap_return!(value)` returns `()` on error.
///
/// Only `is_err()` is consulted: the `Ok` payload and the error itself are both discarded, and
/// `$return` is evaluated only on the error path.
macro_rules! unwrap_return {
    // Two-argument form: bail out with a caller-supplied value.
    ($value: expr, $return: expr) => {
        if $value.is_err() {
            return $return;
        }
    };
    // One-argument form: for functions returning `()`.
    ($value: expr) => {
        if $value.is_err() {
            return;
        }
    };
}
210
211impl StreamData {
212    pub async fn clear(&mut self, full: bool) {
213        self.is_loggedin = false;
214        self.call_disconnected = true;
215
216        self.player_name.take();
217        self.player_id.take();
218        self.player_save.clear();
219        self.player_open_save.clear();
220        self.player_queue.clear();
221        self.players_logout.clear();
222        self.new_sync_queue.clear();
223        self.update_variable.clear();
224        self.update_playerini.clear();
225        self.callback_server_update.clear();
226        self.callback_server_index = 0;
227        self.players.clear();
228
229        if full {
230            self.is_connecting = false;
231            self.is_connected = false;
232            self.is_reconnecting = false;
233
234            self.game_save.clear();
235            self.game_open_save.clear();
236            self.game_achievements.clear();
237            self.game_highscores.clear();
238            self.game_administrators.clear();
239            self.update_gameini.clear();
240
241            self.last_ping.take();
242            self.handshake_completed = false;
243        }
244    }
245}
246
247impl StreamHandler {
248    pub async fn split_stream(
249        stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
250    ) -> (StreamReader, StreamWriter) {
251        let split = stream.split();
252        (
253            StreamReader {
254                stream: Some(split.1),
255
256                identity: None,
257            },
258            StreamWriter {
259                stream: Some(split.0),
260
261                recipient: None,
262            },
263        )
264    }
265}
266
267impl StreamReader {
268    #[inline(always)]
269    pub async fn read(&mut self) -> Result<Buffer, ReaderError> {
270        if let Some(stream) = self.stream.as_mut() {
271            if let Some(Ok(frame)) = stream.next().await {
272                if frame.is_binary() {
273                    let data = frame.into_data();
274                    if !data.is_empty() {
275                        if let Some(identity) = &self.identity {
276                            let len = data.len();
277                            if let Ok(decryptor) =
278                                Decryptor::new_async_buffered(data.to_vec().as_slice()).await
279                            {
280                                if let Ok(mut reader) =
281                                    decryptor.decrypt_async(iter::once(identity as _))
282                                {
283                                    let mut output = Vec::with_capacity(len);
284                                    if reader.read_to_end(&mut output).await.is_ok() {
285                                        Ok(Buffer::new(Cursor::new(output)))
286                                    } else {
287                                        Err(ReaderError::Unknown(String::from(
288                                            "decryptor reader errored out while reading",
289                                        )))
290                                    }
291                                } else {
292                                    Err(ReaderError::Unknown(String::from(
293                                        "unable to make decryptor reader",
294                                    )))
295                                }
296                            } else {
297                                Err(ReaderError::Unknown(String::from(
298                                    "unable to make decryptor",
299                                )))
300                            }
301                        } else {
302                            Ok(Buffer::new(Cursor::new(data.to_vec())))
303                        }
304                    } else {
305                        Err(ReaderError::StreamEmpty(format!(
306                            "tried to read {} byte(s) from ws",
307                            data.len(),
308                        )))
309                    }
310                } else if frame.is_close() {
311                    Err(ReaderError::StreamClosed(String::from(
312                        "ws stream requested to close",
313                    )))
314                } else if frame.is_ping() {
315                    Ok(Buffer::empty())
316                } else {
317                    Err(ReaderError::Unknown(format!(
318                        "obtained an unexpected code for ws: {frame:?}",
319                    )))
320                }
321            } else {
322                Err(ReaderError::StreamClosed(String::from(
323                    "unable to obtain the next ws frame",
324                )))
325            }
326        } else {
327            Err(ReaderError::Unknown(String::from(
328                "no stream open to read from",
329            )))
330        }
331    }
332
333    /*#[inline(always)]
334    pub async fn shutdown(&mut self) {
335        self.stream = None;
336    }*/
337}
338
339impl StreamWriter {
340    #[inline(always)]
341    pub async fn prepare_buffer(&self, mut buffer: Buffer) -> IoResult<Buffer> {
342        if let Some(recipient) = &self.recipient {
343            let encryptor = Encryptor::with_recipients(iter::once(recipient as _))
344                .expect("unable to make encryptor");
345            let mut output = Vec::with_capacity(buffer.len()? as usize);
346            let mut writer = encryptor.wrap_async_output(&mut output).await?;
347            writer.write_all(buffer.container.get_ref()).await?;
348            writer.flush().await?;
349            writer.close().await?;
350            buffer.container = Cursor::new(output);
351        }
352        Ok(buffer)
353    }
354
355    #[inline(always)]
356    pub async fn write(&mut self, data: Buffer) -> Result<(), WriterError> {
357        if let Ok(mut data) = self.prepare_buffer(data).await {
358            self.write_raw(&mut data).await
359        } else {
360            Err(WriterError::StreamError(String::from(
361                "unable to fetch new buffer",
362            )))
363        }
364    }
365
366    #[inline(always)]
367    pub async fn write_raw(&mut self, data: &mut Buffer) -> Result<(), WriterError> {
368        /*#[cfg(feature = "__dev")]
369        info!("wrote data: {:?}", data.container.get_ref().to_str_lossy());*/
370        let data = {
371            let mut d = Vec::new();
372            unwrap_return!(
373                data.read_all(&mut d),
374                Err(WriterError::Unknown(String::from(
375                    "unable to convert buffer into bytes"
376                )))
377            );
378            d
379        };
380        if let Some(stream) = self.stream.as_mut() {
381            unwrap_return!(
382                stream
383                    .send(Message::Binary(Bytes::copy_from_slice(&data)))
384                    .await,
385                Err(WriterError::StreamError(format!(
386                    "unable to write {:?} byte(s) to a ws",
387                    data.len(),
388                )))
389            );
390            unwrap_return!(
391                stream.flush().await,
392                Err(WriterError::StreamError(String::from(
393                    "unable to flush the writer of a ws {:?}",
394                )))
395            );
396            Ok(())
397        } else {
398            Err(WriterError::Unknown(String::from(
399                "no stream open to write to",
400            )))
401        }
402    }
403
404    #[inline(always)]
405    pub async fn write_pong(&mut self) -> IoResult<()> {
406        if let Some(stream) = self.stream.as_mut() {
407            if stream.send(Message::Pong(Bytes::new())).await.is_err() {
408                Err(Error::from(ErrorKind::BrokenPipe))
409            } else {
410                Ok(())
411            }
412        } else {
413            Err(Error::from(ErrorKind::BrokenPipe))
414        }
415    }
416
417    #[inline(always)]
418    pub async fn shutdown(&mut self) {
419        if let Some(mut stream) = self.stream.take() {
420            let _ = stream.close().await;
421        }
422    }
423}
424
/// Packets decoded from the server. Tuple fields are listed in order in each variant's doc.
#[derive(Debug, Clone)]
enum ReadPacket {
    /// Registration Code
    Registration(RegistrationCode),
    /// Login Code
    Login(LoginCode),
    /// Login Code, Reason, Unban time (an `i64`, presumably a timestamp — TODO confirm unit)
    LoginBan(LoginCode, String, i64),
    /// Player ID, Player Name, Token, Savefile, Friends, Incoming Friends, Outgoing Friends, Game Achievements
    LoginOk(
        u64,
        String,
        Option<String>,
        HashMap<String, Value>,
        IntSet<Leb<u64>>,
        IntSet<Leb<u64>>,
        IntSet<Leb<u64>>,
        IntMap<Leb<u64>, Achievement>,
    ),
    /// Player ID, Player Name, Player Variables, Player Syncs, Room
    PlayerLoggedIn(
        u64,
        String,
        HashMap<String, Value>,
        Vec<Option<types::Sync>>,
        String,
    ),
    /// Player ID
    PlayerLoggedOut(u64),
    /// Game Save, Game Achievements, Game Highscores, Game Administrators, Version
    SyncGameInfo(
        HashMap<String, Value>,
        IntMap<Leb<u64>, Achievement>,
        IntMap<Leb<u64>, Highscore>,
        IntMap<Leb<u64>, Administrator>,
        f64,
    ),
    /// Player ID or Server, Message ID, Data
    P2P(Option<Leb<u64>>, i16, Vec<Value>),
    /// Player ID, Variables
    UpdatePlayerVariable(u64, Vec<VariableUpdate>),
    /// Ping (ms). `None` is a ping request that the handler answers; `Some` carries the
    /// measured value, which the handler stores.
    Ping(Option<f64>),
    /// No payload. Presumably tells the client to forget all known players — TODO confirm.
    ClearPlayers(),
    /// Variables
    GameIniWrite(Vec<VariableUpdate>),
    /// Player ID, `Vec<(Slot, Kind, Type, Variables)>`
    NewSync(u64, Vec<(u64, i16, SyncType, HashMap<String, Value>)>),
    /// Player ID, Room
    PlayerChangedRooms(u64, String),
    /// Player ID, Sync Variables
    UpdateSync(u64, Vec<SyncUpdate>),
    /// Player ID, Highscore ID, Score
    HighscoreUpdate(u64, u64, f64),
    /// Player ID, Player Syncs, Player Variables
    UpdatePlayerData(u64, Vec<Option<types::Sync>>, HashMap<String, Value>),
    /// Callback Index, Variable
    RequestPlayerVariable(u64, OptionalValue),
    /// Admin Action
    AdminAction(AdminAction),
    /// Callback Index, Variable
    RequestSyncVariable(u64, OptionalValue),
    /// Game Version
    ChangeGameVersion(f64),
    /// Player ID, Administrator
    ModifyAdministrator(u64, Administrator),
    /// Administrator ID
    RemoveAdministrator(u64),
    /// No payload. The handler stops with a "forced disconnection registered" error.
    ForceDisconnection(),
    /// Variables
    PlayerIniWrite(Vec<VariableUpdate>),
    /// Callback Index, BDB
    RequestBdb(u64, Option<Vec<u8>>),
    /// Change Friend Status, Player ID
    ChangeFriendStatus(ChangeFriendStatus, u64),
    /// Key. `Some` carries the server's encryption key; `None` asks the client to send its
    /// initialization handshake.
    Handshake(Option<String>),
    /// Message
    ServerMessage(String),
    /// Target Host
    ChangeConnection(String),
}
507
/// Packets the client sends to the server. Tuple fields are listed in order in each variant's doc.
#[derive(Debug, Clone)]
#[doc(hidden)]
enum WritePacket {
    /// Hash, Lib Version, Device ID, Game ID, Game Version, Game Session
    InitializationHandshake([u64; 4], u64, String, String, f64, String),
    /// Username, Passw/Token, Game Token, Variables, Syncs, Room
    Login(
        String,
        LoginPassw,
        String,
        HashMap<String, Value>,
        Vec<Option<SelfSync>>,
        String,
    ),
    /// Username, Email, Passw, Repeat Passw
    Register(String, String, String, String),
    /// Player ID, Callback Index, Variable Name
    RequestPlayerVariable(PlayerRequest, u64, String),
    /// Player ID, Message ID, Payload
    P2P(PlayerRequest, i16, Vec<Value>),
    /// Game Version
    UpdateGameVersion(f64),
    /// Game Session
    UpdateGameSession(String),
    /// Variables
    UpdatePlayerVariable(Vec<VariableUpdate>),
    /// No payload. Sent in reply to a server ping request.
    Ping(),
    /// Variables
    GameIniWrite(Vec<VariableUpdate>),
    /// Variables
    PlayerIniWrite(Vec<VariableUpdate>),
    /// Room
    UpdateRoom(String),
    /// `Vec<(Slot, Kind, Sync Type, Variables)>`
    NewSync(Vec<(u64, i16, SyncType, HashMap<String, Value>)>),
    /// Sync Update
    UpdateSync(Vec<SyncUpdate>),
    /// Achievement ID
    UpdateAchievement(u64),
    /// Highscore ID, Score
    UpdateHighscore(u64, f64),
    /// Admin Action, Player ID
    AdminAction(AdminAction, u64),
    /// Player ID, Callback Index, Sync Slot, Variable Name
    RequestSyncVariable(u64, u64, u64, String),
    /// No payload. Presumably ends the current login session — TODO confirm.
    Logout(),
    /// Callback Index, BDB Name
    RequestBdb(u64, String),
    /// BDB Name, Data
    SetBdb(String, Vec<u8>),
    /// Change Friend Status, Player ID
    RequestChangeFriendStatus(ChangeFriendStatus, u64),
    /// Key (the client's public encryption key, sent in reply to the server's key)
    Handshake(String),
}
563
564impl CrystalServer {
565    pub fn init(game_id: &str) -> Self {
566        Self {
567            writer: None,
568            data: Arc::new(RwLock::new(StreamData {
569                game_id: game_id.to_owned(),
570                ..Default::default()
571            })),
572        }
573    }
574
575    /// Try to establish a connection between the client and the server.
576    pub async fn connect(&mut self) {
577        #[cfg(feature = "__dev")]
578        info!("Connecting to the server");
579        {
580            #[cfg(feature = "__dev")]
581            info!("Fetching stream data");
582            let mut lock = self.data.write().await;
583            if let Some(thread) = &mut lock.thread {
584                #[cfg(feature = "__dev")]
585                info!("Aborting thread");
586                if !thread.is_finished() {
587                    thread.abort();
588                }
589            }
590            lock.clear(true).await;
591            lock.is_connecting = true;
592            #[cfg(feature = "__dev")]
593            info!("Stream data OK");
594        }
595        let url = if let Some(url) = self.data.read().await.last_host.clone() {
596            url
597        } else if cfg!(feature = "__local") {
598            String::from("ws://localhost:16562")
599        } else {
600            String::from("ws://server.crystal-server.co:16562")
601        };
602        #[cfg(feature = "__dev")]
603        info!("Connecting to server at {url}");
604        match tokio_tungstenite::connect_async(url).await {
605            Ok((ws, _)) => {
606                #[cfg(feature = "__dev")]
607                info!("Connection OK, initializing");
608                let stream = StreamHandler::split_stream(ws).await;
609                let writer = Arc::new(Mutex::new(stream.1));
610                self.writer = Some(writer.clone());
611                #[cfg(feature = "__dev")]
612                info!("Stream setup OK");
613                let thread =
614                    tokio::spawn(Self::stream_handler(stream.0, writer, self.data.clone()));
615                {
616                    let mut lock = self.data.write().await;
617                    lock.thread = Some(thread);
618                    #[cfg(feature = "__dev")]
619                    info!("Stream data thread OK");
620                }
621            }
622            Err(_e) => {
623                #[cfg(feature = "__dev")]
624                info!("Connection error: {_e:?}");
625                let mut dlock = self.data.write().await;
626                dlock.clear(true).await;
627                if dlock.call_disconnected {
628                    if let Some(func) = dlock.func_disconnected.as_mut() {
629                        func();
630                    }
631                    if let Some(dup) = dlock.func_data_update.as_mut() {
632                        dup(DataUpdate::Disconnected());
633                    }
634                    dlock.call_disconnected = false;
635                }
636                if dlock.last_host.take().is_some() {
637                    drop(dlock);
638                    Box::pin(self.connect()).await;
639                } else {
640                    dlock.is_connecting = false;
641                }
642            }
643        }
644    }
645
646    async fn stream_handler(
647        mut reader: StreamReader,
648        writer: Arc<Mutex<StreamWriter>>,
649        data: Arc<RwLock<StreamData>>,
650    ) {
651        let cdata = data.clone();
652        let cwriter = writer.clone();
653
654        let task = tokio::spawn(async move {
655            macro_rules! write_packet {
656                ($packet: expr) => {
657                    unwrap_return!(
658                        writer
659                            .lock()
660                            .await
661                            .write(Self::get_packet_write(&$packet)?)
662                            .await,
663                        Err(Error::from(ErrorKind::BrokenPipe))
664                    );
665                };
666            }
667
668            #[cfg(feature = "__dev")]
669            info!("Initialized stream handle task");
670
671            loop {
672                match reader.read().await {
673                    Ok(mut buffer) => {
674                        if buffer.is_empty()? {
675                            continue;
676                        }
677                        if let Ok(packet) = CrystalServer::get_packet_read(buffer) {
678                            #[cfg(feature = "__dev")]
679                            info!("reading packet: {packet:?}");
680                            match packet {
681                                ReadPacket::Handshake(key) => {
682                                    if let Some(key) = key {
683                                        let encode_read = Identity::generate();
684                                        let encodepub_read = encode_read.to_public();
685                                        reader.identity = Some(encode_read);
686                                        match Recipient::from_str(&key) {
687                                            Ok(key) => {
688                                                {
689                                                    let mut dlock = data.write().await;
690                                                    dlock.is_connecting = false;
691                                                    dlock.is_reconnecting = false;
692                                                    dlock.is_connected = true;
693                                                }
694                                                writer.lock().await.recipient = Some(key);
695                                            }
696                                            Err(_e) => {
697                                                #[cfg(feature = "__dev")]
698                                                warn!("error while parsing key: {_e:?}");
699                                                {
700                                                    let mut dlock = data.write().await;
701                                                    dlock.clear(true).await;
702                                                    dlock.registered_errors.push(
703                                                        ClientError::HandlerResultString(String::from(
704                                                            "unable to set write key for data writer",
705                                                        )),
706                                                    );
707                                                }
708                                                return Ok(());
709                                            }
710                                        }
711                                        write_packet!(WritePacket::Handshake(
712                                            encodepub_read.to_string(),
713                                        ));
714                                    } else {
715                                        let hwid = if let Ok(hwid) =
716                                            IdBuilder::new(Encryption::SHA256)
717                                                .add_component(HWIDComponent::CPUID)
718                                                .add_component(HWIDComponent::MacAddress)
719                                                .add_component(HWIDComponent::SystemID)
720                                                .build(None)
721                                        {
722                                            hwid
723                                        } else {
724                                            return Err(Error::from(ErrorKind::InvalidData));
725                                        };
726                                        let dlock = data.read().await;
727                                        write_packet!(WritePacket::InitializationHandshake(
728                                            [
729                                                0x3a0b1a04c51a2811,
730                                                0x97a18f1dc9ee891d,
731                                                0xfc7eb64f732a37fd,
732                                                0xbe65cbabde15c305,
733                                            ],
734                                            1,
735                                            hwid,
736                                            dlock.game_id.clone(),
737                                            dlock.version,
738                                            dlock.session.clone()
739                                        ));
740                                    }
741                                }
742                                ReadPacket::SyncGameInfo(
743                                    game_save,
744                                    game_achievements,
745                                    game_highscores,
746                                    game_administrators,
747                                    game_version,
748                                ) => {
749                                    let mut dlock = data.write().await;
750                                    dlock.game_save = game_save;
751                                    dlock.game_achievements = game_achievements;
752                                    dlock.game_highscores = game_highscores;
753                                    dlock.game_administrators = game_administrators;
754                                    dlock.game_version = game_version;
755                                    dlock.handshake_completed = true;
756                                }
757                                ReadPacket::Ping(ping) => {
758                                    if let Some(ping) = ping {
759                                        let mut dlock = data.write().await;
760                                        dlock.ping = ping;
761                                        dlock.last_ping = Some(Instant::now());
762                                    } else {
763                                        {
764                                            write_packet!(WritePacket::Ping());
765                                        }
766                                        writer.lock().await.write_pong().await?;
767                                    }
768                                }
769                                ReadPacket::ForceDisconnection() => {
770                                    return Err(Error::new(
771                                        ErrorKind::BrokenPipe,
772                                        "forced disconnection registered",
773                                    ));
774                                }
775                                ReadPacket::Registration(code) => {
776                                    let mut dlock = data.write().await;
777                                    if let Some(reg) = &mut dlock.func_register {
778                                        reg(code);
779                                    }
780                                    if let Some(dup) = &mut dlock.func_data_update {
781                                        dup(DataUpdate::Registration(code));
782                                    }
783                                }
784                                ReadPacket::Login(code) => {
785                                    let mut dlock = data.write().await;
786                                    if let Some(reg) = &mut dlock.func_login {
787                                        reg(code, None, None);
788                                    }
789                                    if let Some(dup) = &mut dlock.func_data_update {
790                                        dup(DataUpdate::Login(code));
791                                    }
792                                }
793                                ReadPacket::LoginOk(
794                                    pid,
795                                    pname,
796                                    token,
797                                    savefile,
798                                    friends,
799                                    incoming_friends,
800                                    outgoing_friends,
801                                    game_achievements,
802                                ) => {
803                                    let mut dlock = data.write().await;
804                                    dlock.player_id = Some(pid);
805                                    dlock.player_name = Some(pname.clone());
806                                    dlock.player_save = savefile;
807                                    dlock.game_achievements = game_achievements;
808                                    dlock.player_friends =
809                                        IntSet::from_iter(friends.iter().map(|pid| **pid));
810                                    dlock.player_incoming_friends =
811                                        IntSet::from_iter(incoming_friends.iter().map(|pid| **pid));
812                                    dlock.player_outgoing_friends =
813                                        IntSet::from_iter(outgoing_friends.iter().map(|pid| **pid));
814                                    dlock.is_loggedin = true;
815                                    if let Some(log) = &mut dlock.func_login {
816                                        log(LoginCode::Ok, None, None);
817                                    }
818                                    if let Some(dup) = &mut dlock.func_data_update {
819                                        dup(DataUpdate::LoginOk(pid, pname));
820                                    }
821                                    if let Some(token) = token {
822                                        if let Some(log) = &mut dlock.func_login_token {
823                                            log(token.clone());
824                                        }
825                                        if let Some(dup) = &mut dlock.func_data_update {
826                                            dup(DataUpdate::LoginToken(token));
827                                        }
828                                    }
829                                }
830                                ReadPacket::LoginBan(code, reason, unban_time) => {
831                                    let mut dlock = data.write().await;
832                                    if let Some(log) = &mut dlock.func_login {
833                                        log(
834                                            code,
835                                            DateTime::from_timestamp(unban_time, 0),
836                                            Some(reason.clone()),
837                                        );
838                                    }
839                                    if let Some(dup) = &mut dlock.func_data_update {
840                                        dup(DataUpdate::LoginBan(code, reason.clone(), unban_time));
841                                    }
842                                }
843                                ReadPacket::PlayerLoggedIn(pid, pname, vari, syncs, room) => {
844                                    let mut dlock = data.write().await;
845                                    dlock.players.insert(
846                                        pid,
847                                        Player {
848                                            name: pname.clone(),
849                                            room: room.clone(),
850                                            syncs,
851                                            variables: vari,
852                                        },
853                                    );
854                                    Self::iter_missing_data(&mut dlock, pid).await?;
855                                    if let Some(dup) = &mut dlock.func_data_update {
856                                        dup(DataUpdate::PlayerLoggedIn(pid, pname, room));
857                                    }
858                                }
859                                ReadPacket::PlayerLoggedOut(pid) => {
860                                    let mut dlock = data.write().await;
861                                    dlock.players.remove(&pid);
862                                    dlock.player_queue.remove(&pid);
863                                    if let Some(dup) = &mut dlock.func_data_update {
864                                        dup(DataUpdate::PlayerLoggedOut(pid));
865                                    }
866                                }
867                                ReadPacket::P2P(pid, mid, payload) => {
868                                    let mut dlock = data.write().await;
869                                    if let Some(p2p) = &mut dlock.func_p2p {
870                                        p2p(pid.map(|v| *v), mid, payload.clone());
871                                    }
872                                    if let Some(dup) = &mut dlock.func_data_update {
873                                        dup(DataUpdate::P2P(pid.map(|v| *v), mid, payload));
874                                    }
875                                }
876                                ReadPacket::UpdatePlayerVariable(pid, upds) => {
877                                    let mut dlock = data.write().await;
878                                    Self::iter_missing_data(&mut dlock, pid).await?;
879                                    if let Some(player) = dlock.players.get_mut(&pid) {
880                                        for upd in &upds {
881                                            if let OptionalValue::Some(value) = upd.value.clone() {
882                                                player
883                                                    .variables
884                                                    .insert(upd.name.clone(), value.clone());
885                                            } else {
886                                                player.variables.remove(&upd.name);
887                                            }
888                                        }
889                                        for upd in upds {
890                                            if let Some(dup) = &mut dlock.func_data_update {
891                                                dup(DataUpdate::UpdateVariable(
892                                                    pid, upd.name, upd.value,
893                                                ));
894                                            }
895                                        }
896                                    } else {
897                                        if !dlock.player_queue.contains_key(&pid) {
898                                            dlock
899                                                .player_queue
900                                                .entry(pid)
901                                                .or_insert(PlayerQueue::default());
902                                        }
903                                        let pq = dlock
904                                            .player_queue
905                                            .get_mut(&pid)
906                                            .expect("expected to have data stored");
907                                        for upd in upds {
908                                            pq.variables.insert(upd.name, upd.value);
909                                        }
910                                    }
911                                }
912                                ReadPacket::UpdateSync(pid, upds) => {
913                                    // There's surely a better way to do this, more performantly.
914                                    let mut dlock = data.write().await;
915                                    Self::iter_missing_data(&mut dlock, pid).await?;
916                                    let mut exists = IntSet::default();
917                                    let mut add_pq = IntSet::default();
918                                    #[cfg(feature = "__dev")]
919                                    info!("UpdateSync >>> {pid:?},{upds:?}");
920                                    if let Some(player) = dlock.players.get_mut(&pid) {
921                                        #[cfg(feature = "__dev")]
922                                        info!("Got Player");
923                                        for upd in &upds {
924                                            #[cfg(feature = "__dev")]
925                                            info!("Iterating over update {upd:?}");
926                                            if let Some(Some(sync)) = player.syncs.get_mut(upd.slot)
927                                            {
928                                                #[cfg(feature = "__dev")]
929                                                info!("Got sync {sync:?}");
930                                                exists.insert(upd.slot);
931                                                if let Some(vari) = &upd.variables {
932                                                    #[cfg(feature = "__dev")]
933                                                    info!("Got variable updates: {vari:?}");
934                                                    for (vname, value) in vari {
935                                                        #[cfg(feature = "__dev")]
936                                                        info!(
937                                                            "Itering over {vname:?} >>> {value:?}"
938                                                        );
939                                                        if let OptionalValue::Some(value) =
940                                                            value.clone()
941                                                        {
942                                                            #[cfg(feature = "__dev")]
943                                                            info!("Set");
944                                                            sync.variables
945                                                                .insert(vname.clone(), value);
946                                                        } else {
947                                                            #[cfg(feature = "__dev")]
948                                                            info!("Removed");
949                                                            sync.variables.remove(vname);
950                                                        }
951                                                    }
952                                                } else if upd.remove_sync {
953                                                    #[cfg(feature = "__dev")]
954                                                    info!("Marked as sync is ending");
955                                                    sync.is_ending = true;
956                                                } else {
957                                                    #[cfg(feature = "__dev")]
958                                                    info!("No updates triggered");
959                                                }
960                                                #[cfg(feature = "__dev")]
961                                                info!("Finished iterating over sync {sync:?}");
962                                            } else {
963                                                #[cfg(feature = "__dev")]
964                                                info!("Added sync to PlayerQueue");
965                                                add_pq.insert(pid);
966                                            }
967                                        }
968                                    } else {
969                                        #[cfg(feature = "__dev")]
970                                        info!("Added to PlayerQueue");
971                                        add_pq.insert(pid);
972                                    }
973                                    for slot in exists {
974                                        if let Some(upd) =
975                                            upds.iter().find(|supd| supd.slot == slot)
976                                        {
977                                            if let Some(dup) = &mut dlock.func_data_update {
978                                                if let Some(vari) = &upd.variables {
979                                                    for (vname, value) in vari {
980                                                        dup(DataUpdate::UpdateSyncVariable(
981                                                            pid,
982                                                            slot,
983                                                            vname.clone(),
984                                                            value.clone(),
985                                                        ));
986                                                    }
987                                                } else if upd.remove_sync {
988                                                    dup(DataUpdate::UpdateSyncRemoval(pid, slot));
989                                                }
990                                            }
991                                        }
992                                    }
993                                    for pid in add_pq {
994                                        for upd in &upds {
995                                            if let Some(player) = dlock.players.get(&pid) {
996                                                if let Some(Some(_)) = player.syncs.get(upd.slot) {
997                                                    continue;
998                                                }
999                                            }
1000                                            dlock
1001                                                .player_queue
1002                                                .entry(pid)
1003                                                .or_insert(PlayerQueue::default());
1004                                            let pq = dlock
1005                                                .player_queue
1006                                                .get_mut(&pid)
1007                                                .expect("expected to have data stored");
1008                                            if let Some(vari) = &upd.variables {
1009                                                pq.syncs.entry(upd.slot).or_insert(HashMap::new());
1010                                                let su = pq
1011                                                    .syncs
1012                                                    .get_mut(&upd.slot)
1013                                                    .expect("expected to have data stored");
1014                                                for (vname, value) in vari {
1015                                                    su.insert(vname.clone(), value.clone());
1016                                                }
1017                                            } else if upd.remove_sync
1018                                                && !pq.remove_syncs.contains(&upd.slot)
1019                                            {
1020                                                pq.remove_syncs.push(upd.slot);
1021                                            }
1022                                        }
1023                                    }
1024                                }
1025                                ReadPacket::ClearPlayers() => {
1026                                    let mut dlock = data.write().await;
1027                                    for pid in dlock.players.keys().cloned().collect::<Vec<u64>>() {
1028                                        if let Some(dup) = &mut dlock.func_data_update {
1029                                            dup(DataUpdate::PlayerLoggedOut(pid));
1030                                        }
1031                                    }
1032                                    dlock.players.clear();
1033                                }
1034                                ReadPacket::GameIniWrite(upds) => {
1035                                    let mut dlock = data.write().await;
1036                                    for upd in upds {
1037                                        if let OptionalValue::Some(value) = upd.value.clone() {
1038                                            dlock.game_save.insert(upd.name.clone(), value.clone());
1039                                        } else {
1040                                            dlock.game_save.remove(&upd.name);
1041                                        }
1042                                        if let Some(dup) = &mut dlock.func_data_update {
1043                                            let keys = upd
1044                                                .name
1045                                                .split(">")
1046                                                .map(|entry| {
1047                                                    urlencoding::decode(entry).expect(
1048                                                        "unable to decode uri on gameini update",
1049                                                    )
1050                                                })
1051                                                .collect::<Vec<_>>();
1052                                            if keys.len() == 3 {
1053                                                dup(DataUpdate::UpdateGameIni(
1054                                                    Some(
1055                                                        keys.first()
1056                                                            .expect("unable to fetch key 0")
1057                                                            .to_string(),
1058                                                    ),
1059                                                    keys.get(1)
1060                                                        .expect("unable to fetch key 1")
1061                                                        .to_string(),
1062                                                    keys.get(2)
1063                                                        .expect("unable to fetch key 2")
1064                                                        .to_string(),
1065                                                    upd.value,
1066                                                ));
1067                                            } else {
1068                                                dup(DataUpdate::UpdateGameIni(
1069                                                    None,
1070                                                    keys.first()
1071                                                        .expect("unable to fetch key 0 on empty")
1072                                                        .to_string(),
1073                                                    keys.get(1)
1074                                                        .expect("unable to fetch key 1 on empty")
1075                                                        .to_string(),
1076                                                    upd.value,
1077                                                ));
1078                                            }
1079                                        }
1080                                    }
1081                                }
1082                                ReadPacket::NewSync(pid, upds) => {
1083                                    let mut dlock = data.write().await;
1084                                    for (slot, kind, stype, vari) in upds {
1085                                        if let Some(player) = dlock.players.get_mut(&pid) {
1086                                            player.syncs.insert(
1087                                                slot as usize,
1088                                                Some(types::Sync {
1089                                                    kind,
1090                                                    sync_type: stype,
1091                                                    variables: vari,
1092                                                    event: SyncEvent::New,
1093                                                    is_ending: false,
1094                                                }),
1095                                            );
1096                                            Self::iter_missing_data(&mut dlock, pid).await?;
1097                                        } else {
1098                                            dlock
1099                                                .player_queue
1100                                                .entry(pid)
1101                                                .or_insert(PlayerQueue::default());
1102                                            let pq = dlock
1103                                                .player_queue
1104                                                .get_mut(&pid)
1105                                                .expect("expected to have data stored");
1106                                            pq.new_syncs.push(NewSync {
1107                                                kind,
1108                                                slot: slot as usize,
1109                                                sync_type: stype,
1110                                                variables: vari,
1111                                            });
1112                                        }
1113                                    }
1114                                }
1115                                ReadPacket::PlayerChangedRooms(pid, room) => {
1116                                    let mut dlock = data.write().await;
1117                                    Self::iter_missing_data(&mut dlock, pid).await?;
1118                                    if let Some(player) = dlock.players.get_mut(&pid) {
1119                                        player.room = room;
1120                                    }
1121                                }
1122                                ReadPacket::HighscoreUpdate(pid, hid, score) => {
1123                                    let mut dlock = data.write().await;
1124                                    if let Some(highscore) =
1125                                        dlock.game_highscores.get_mut(&Leb(hid))
1126                                    {
1127                                        highscore.scores.insert(Leb(pid), score);
1128                                    }
1129                                }
1130                                ReadPacket::UpdatePlayerData(pid, syncs, vari) => {
1131                                    let mut dlock = data.write().await;
1132                                    #[cfg(feature = "__dev")]
1133                                    info!("{pid:?}, {syncs:?}, {vari:?}");
1134                                    if dlock.players.contains_key(&pid) {
1135                                        #[cfg(feature = "__dev")]
1136                                        info!("OK");
1137                                        dlock.player_queue.remove(&pid);
1138                                        if let Some(player) = dlock.players.get_mut(&pid) {
1139                                            player.syncs = syncs;
1140                                            player.variables = vari;
1141                                            #[cfg(feature = "__dev")]
1142                                            info!("{player:?}");
1143                                        }
1144                                    }
1145                                    #[cfg(feature = "__dev")]
1146                                    info!("END");
1147                                }
1148                                ReadPacket::RequestPlayerVariable(index, vari) => {
1149                                    let mut dlock = data.write().await;
1150                                    #[cfg(feature = "__dev")]
1151                                    info!(
1152                                        "RequestPlayerVariable->{:?}->{index:?}",
1153                                        dlock.callback_server_update
1154                                    );
1155                                    if let Some(Some(csu)) = dlock
1156                                        .callback_server_update
1157                                        .get_mut(&index)
1158                                        .map(|csu| csu.take())
1159                                    {
1160                                        if let ServerUpdateCallback::PlayerVariable(callback, pid) =
1161                                            csu.callback
1162                                        {
1163                                            Self::iter_missing_data(&mut dlock, pid).await?;
1164                                            if let Some(player) = dlock.players.get_mut(&pid) {
1165                                                if let OptionalValue::Some(value) = vari.clone() {
1166                                                    player
1167                                                        .variables
1168                                                        .insert(csu.name.clone(), value);
1169                                                } else {
1170                                                    player.variables.remove(&*csu.name);
1171                                                }
1172                                            }
1173                                            if let Some(mut callback) = callback {
1174                                                callback(pid, csu.name, vari);
1175                                            }
1176                                        }
1177                                    }
1178                                }
1179                                ReadPacket::AdminAction(aa) => {
1180                                    let mut dlock = data.write().await;
1181                                    dlock.clear(false).await;
1182                                    dlock.is_loggedin = false;
1183                                    match aa.clone() {
1184                                        AdminAction::Ban(reason, unban_time) => {
1185                                            if let Some(callback) = &mut dlock.func_banned {
1186                                                callback(
1187                                                    reason.clone(),
1188                                                    DateTime::from_timestamp(unban_time, 0)
1189                                                        .expect("unable to parse ban timestamp"),
1190                                                );
1191                                            }
1192                                            if let Some(dup) = dlock.func_data_update.as_mut() {
1193                                                dup(DataUpdate::Banned(
1194                                                    reason,
1195                                                    DateTime::from_timestamp(unban_time, 0)
1196                                                        .expect("unable to parse ban timestamp"),
1197                                                ));
1198                                            }
1199                                        }
1200                                        AdminAction::Kick(reason) => {
1201                                            if let Some(callback) = &mut dlock.func_kicked {
1202                                                callback(reason.clone());
1203                                            }
1204                                            if let Some(dup) = dlock.func_data_update.as_mut() {
1205                                                dup(DataUpdate::Kicked(reason));
1206                                            }
1207                                        }
1208                                        AdminAction::Unban => {
1209                                            panic!("this should never happen");
1210                                        }
1211                                    }
1212                                    if let Some(dup) = &mut dlock.func_data_update {
1213                                        dup(DataUpdate::AdminAction(aa));
1214                                    }
1215                                }
1216                                ReadPacket::RequestSyncVariable(index, vari) => {
1217                                    let mut dlock = data.write().await;
1218                                    if let Some(Some(csu)) =
1219                                        dlock.callback_server_update.remove(&index)
1220                                    {
1221                                        if let ServerUpdateCallback::SyncVariable(
1222                                            callback,
1223                                            pid,
1224                                            slot,
1225                                        ) = csu.callback
1226                                        {
1227                                            let obtained = if let Some(player) =
1228                                                dlock.players.get_mut(&pid)
1229                                            {
1230                                                if let Some(Some(sync)) = player.syncs.get_mut(slot)
1231                                                {
1232                                                    if let OptionalValue::Some(value) = vari.clone()
1233                                                    {
1234                                                        sync.variables
1235                                                            .insert(csu.name.clone(), value);
1236                                                    } else {
1237                                                        sync.variables.remove(&*csu.name);
1238                                                    }
1239                                                }
1240                                                true
1241                                            } else {
1242                                                false
1243                                            };
1244                                            if obtained {
1245                                                if let Some(dup) = &mut dlock.func_data_update {
1246                                                    dup(DataUpdate::UpdateSyncVariable(
1247                                                        pid,
1248                                                        slot,
1249                                                        csu.name.clone(),
1250                                                        vari.clone(),
1251                                                    ));
1252                                                }
1253                                            }
1254                                            if let Some(mut callback) = callback {
1255                                                callback(pid, csu.name, vari);
1256                                            }
1257                                        }
1258                                    }
1259                                }
1260                                ReadPacket::ChangeGameVersion(ver) => {
1261                                    let mut dlock = data.write().await;
1262                                    dlock.game_version = ver;
1263                                    if let Some(dup) = &mut dlock.func_data_update {
1264                                        dup(DataUpdate::UpdateGameVersion(ver));
1265                                    }
1266                                }
1267                                ReadPacket::ModifyAdministrator(pid, admin) => {
1268                                    let mut dlock = data.write().await;
1269                                    dlock.game_administrators.insert(Leb(pid), admin);
1270                                    if let Some(dup) = &mut dlock.func_data_update {
1271                                        dup(DataUpdate::UpdateAdministrator(pid, Some(admin)));
1272                                    }
1273                                }
1274                                ReadPacket::RemoveAdministrator(pid) => {
1275                                    let mut dlock = data.write().await;
1276                                    dlock.game_administrators.remove(&Leb(pid));
1277                                    if let Some(dup) = &mut dlock.func_data_update {
1278                                        dup(DataUpdate::UpdateAdministrator(pid, None));
1279                                    }
1280                                }
1281                                ReadPacket::PlayerIniWrite(upds) => {
1282                                    let mut dlock = data.write().await;
1283                                    for upd in upds {
1284                                        if let OptionalValue::Some(value) = upd.value.clone() {
1285                                            dlock.game_save.insert(upd.name.clone(), value.clone());
1286                                        } else {
1287                                            dlock.game_save.remove(&upd.name);
1288                                        }
1289                                        if let Some(dup) = &mut dlock.func_data_update {
1290                                            let keys = upd
1291                                                .name
1292                                                .split(">")
1293                                                .map(|entry| {
1294                                                    urlencoding::decode(entry).expect(
1295                                                        "unable to decode uri on playerini update",
1296                                                    )
1297                                                })
1298                                                .collect::<Vec<_>>();
1299                                            if keys.len() == 3 {
1300                                                dup(DataUpdate::UpdatePlayerIni(
1301                                                    Some(
1302                                                        keys.first()
1303                                                            .expect("unable to fetch key 0")
1304                                                            .to_string(),
1305                                                    ),
1306                                                    keys.get(1)
1307                                                        .expect("unable to fetch key 1")
1308                                                        .to_string(),
1309                                                    keys.first()
1310                                                        .expect("unable to fetch key 2")
1311                                                        .to_string(),
1312                                                    upd.value,
1313                                                ));
1314                                            } else {
1315                                                dup(DataUpdate::UpdatePlayerIni(
1316                                                    None,
1317                                                    keys.first()
1318                                                        .expect("unable to fetch key 0 on empty")
1319                                                        .to_string(),
1320                                                    keys.get(1)
1321                                                        .expect("unable to fetch key 1 on empty")
1322                                                        .to_string(),
1323                                                    upd.value,
1324                                                ));
1325                                            }
1326                                        }
1327                                    }
1328                                }
1329                                ReadPacket::RequestBdb(index, bdb) => {
1330                                    let mut dlock = data.write().await;
1331                                    if let Some(Some(mut csu)) = dlock
1332                                        .callback_server_update
1333                                        .get_mut(&index)
1334                                        .map(|csu| csu.take())
1335                                    {
1336                                        if let ServerUpdateCallback::FetchBdb(Some(callback)) =
1337                                            &mut csu.callback
1338                                        {
1339                                            callback(csu.name.clone(), bdb.clone());
1340                                        }
1341                                        if let Some(dup) = &mut dlock.func_data_update {
1342                                            dup(DataUpdate::FetchBdb(csu.name.clone(), bdb));
1343                                        }
1344                                    }
1345                                }
1346                                ReadPacket::ChangeFriendStatus(cfs, pid) => {
1347                                    let mut dlock = data.write().await;
1348                                    // Tl;dr: This is stupid
1349                                    match cfs {
1350                                        ChangeFriendStatus::Request => {
1351                                            dlock.player_incoming_friends.insert(pid);
1352                                            dlock.player_friends.remove(&pid);
1353                                            dlock.player_outgoing_friends.remove(&pid);
1354                                        }
1355                                        ChangeFriendStatus::Accept | ChangeFriendStatus::Friend => {
1356                                            dlock.player_friends.insert(pid);
1357                                            dlock.player_incoming_friends.remove(&pid);
1358                                            dlock.player_outgoing_friends.remove(&pid);
1359                                        }
1360                                        ChangeFriendStatus::Deny
1361                                        | ChangeFriendStatus::Cancel
1362                                        | ChangeFriendStatus::Remove
1363                                        | ChangeFriendStatus::NotFriend => {
1364                                            dlock.player_friends.remove(&pid);
1365                                            dlock.player_incoming_friends.remove(&pid);
1366                                            dlock.player_outgoing_friends.remove(&pid);
1367                                        }
1368                                    }
1369                                    if let Some(dup) = &mut dlock.func_data_update {
1370                                        dup(DataUpdate::ChangeFriendStatus(pid));
1371                                    }
1372                                }
1373                                ReadPacket::ServerMessage(msg) => {
1374                                    let mut dlock = data.write().await;
1375                                    if let Some(dup) = &mut dlock.func_data_update {
1376                                        dup(DataUpdate::ServerMessage(msg.clone()));
1377                                    }
1378                                }
1379                                ReadPacket::ChangeConnection(host) => {
1380                                    let mut dlock = data.write().await;
1381                                    dlock.is_connecting = true;
1382                                    dlock.is_reconnecting = true;
1383                                    if let Some(dup) = &mut dlock.func_data_update {
1384                                        dup(DataUpdate::Reconnecting());
1385                                    }
1386                                    if let Ok((ws, _)) =
1387                                        tokio_tungstenite::connect_async(&host).await
1388                                    {
1389                                        let (sread, swrite) = StreamHandler::split_stream(ws).await;
1390                                        *writer.lock().await = swrite;
1391                                        reader = sread;
1392                                        dlock.last_host = Some(host);
1393                                    } else {
1394                                        return Err(Error::new(
1395                                            ErrorKind::ConnectionRefused,
1396                                            "unable to connect to new host",
1397                                        ));
1398                                    }
1399                                }
1400                            }
1401                        }
1402                    }
1403                    Err(_e) => {
1404                        return Err(Error::new(ErrorKind::BrokenPipe, "invalid read data"));
1405                    }
1406                }
1407            }
1408        });
1409        let mut dlock;
1410        if let Err(_e) = task.await {
1411            dlock = cdata.write().await;
1412            #[cfg(feature = "__dev")]
1413            info!("Stream handler closed with error: {_e:?}");
1414            dlock.registered_errors.push(if _e.is_panic() {
1415                ClientError::HandlerPanic(format!("{:?}", _e.into_panic()))
1416            } else {
1417                ClientError::HandlerResult(Ok(())) // TODO: Change this into the actual error
1418            });
1419        } else {
1420            dlock = cdata.write().await;
1421            dlock
1422                .registered_errors
1423                .push(ClientError::HandlerResult(Ok(())));
1424        }
1425        {
1426            dlock.clear(true).await;
1427            if dlock.call_disconnected {
1428                if let Some(func) = dlock.func_disconnected.as_mut() {
1429                    func();
1430                }
1431                if let Some(dup) = dlock.func_data_update.as_mut() {
1432                    dup(DataUpdate::Disconnected());
1433                }
1434                dlock.call_disconnected = false;
1435            }
1436        }
1437        cwriter.lock().await.shutdown().await;
1438    }
1439
1440    #[inline(always)]
1441    fn get_packet_write(data: &WritePacket) -> IoResult<Buffer> {
1442        #[cfg(feature = "__dev")]
1443        info!("writing packet: {data:?}");
1444        let mut b = Buffer::empty();
1445        match data {
1446            WritePacket::InitializationHandshake(
1447                hash,
1448                lib_version,
1449                device_id,
1450                game_id,
1451                version,
1452                session,
1453            ) => {
1454                b.write_leb_u64(0)?;
1455                for hash in hash {
1456                    b.write_u64(*hash)?;
1457                }
1458                b.write_leb_u64(*lib_version)?;
1459                b.write_string(device_id)?;
1460                b.write_string(game_id)?;
1461                b.write_f64(*version)?;
1462                b.write_string(session)?;
1463            }
1464            WritePacket::Login(username, passw, game_token, variables, syncs, room) => {
1465                b.write_leb_u64(1)?;
1466                b.write_string(username)?;
1467                match passw {
1468                    LoginPassw::Token(token) => {
1469                        b.write_bool(true)?;
1470                        b.write_string(token)?;
1471                    }
1472                    LoginPassw::Passw(passw) => {
1473                        b.write_bool(false)?;
1474                        b.write_string(passw)?;
1475                    }
1476                }
1477                b.write_string(game_token)?;
1478                b.write(variables)?;
1479                b.write(syncs)?;
1480                b.write_string(room)?;
1481            }
1482            WritePacket::Register(username, email, passw, repeat_passw) => {
1483                b.write_leb_u64(2)?;
1484                b.write_string(username)?;
1485                b.write_string(email)?;
1486                b.write_string(passw)?;
1487                b.write_string(repeat_passw)?;
1488            }
1489            WritePacket::RequestPlayerVariable(player_request, index, name) => {
1490                b.write_leb_u64(3)?;
1491                match player_request {
1492                    PlayerRequest::ID(pid) => {
1493                        b.write_bool(true)?;
1494                        b.write_leb_u64(*pid)?;
1495                    }
1496                    _ => {
1497                        b.write_bool(false)?;
1498                    }
1499                }
1500                b.write_string(name)?;
1501                b.write_leb_u64(*index)?;
1502            }
1503            WritePacket::P2P(player_request, mid, payload) => {
1504                b.write_leb_u64(4)?;
1505                b.write(player_request)?;
1506                b.write_i16(*mid)?;
1507                b.write(payload)?;
1508            }
1509            WritePacket::UpdateGameVersion(version) => {
1510                b.write_leb_u64(5)?;
1511                b.write_f64(*version)?;
1512            }
1513            WritePacket::UpdateGameSession(session) => {
1514                b.write_leb_u64(6)?;
1515                b.write_string(session)?;
1516            }
1517            WritePacket::UpdatePlayerVariable(updates) => {
1518                b.write_leb_u64(7)?;
1519                b.write(updates)?;
1520            }
1521            WritePacket::Ping() => {
1522                b.write_leb_u64(8)?;
1523            }
1524            WritePacket::GameIniWrite(updates) => {
1525                b.write_leb_u64(9)?;
1526                b.write(updates)?;
1527            }
1528            WritePacket::PlayerIniWrite(updates) => {
1529                b.write_leb_u64(10)?;
1530                b.write(updates)?;
1531            }
1532            WritePacket::UpdateRoom(room) => {
1533                b.write_leb_u64(11)?;
1534                b.write_string(room)?;
1535            }
1536            WritePacket::NewSync(upds) => {
1537                b.write_leb_u64(12)?;
1538                b.write_leb_u64(upds.len() as u64)?;
1539                for (slot, kind, sync_type, variables) in upds {
1540                    b.write_leb_u64(*slot)?;
1541                    b.write_i16(*kind)?;
1542                    b.write_u8(*sync_type as u8)?;
1543                    b.write(variables)?;
1544                }
1545            }
1546            WritePacket::UpdateSync(updates) => {
1547                b.write_leb_u64(13)?;
1548                b.write(updates)?;
1549            }
1550            WritePacket::UpdateAchievement(aid) => {
1551                b.write_leb_u64(14)?;
1552                b.write_leb_u64(*aid)?;
1553            }
1554            WritePacket::UpdateHighscore(hid, score) => {
1555                b.write_leb_u64(15)?;
1556                b.write_leb_u64(*hid)?;
1557                b.write_f64(*score)?;
1558            }
1559            WritePacket::AdminAction(admin_action, pid) => {
1560                b.write_leb_u64(16)?;
1561                b.write_leb_u64(*pid)?;
1562                match admin_action {
1563                    AdminAction::Ban(reason, time) => {
1564                        b.write_u8(0)?;
1565                        b.write_string(reason)?;
1566                        b.write_i64(*time)?;
1567                    }
1568                    AdminAction::Unban => {
1569                        b.write_u8(1)?;
1570                    }
1571                    AdminAction::Kick(reason) => {
1572                        b.write_u8(2)?;
1573                        b.write_string(reason)?;
1574                    }
1575                }
1576            }
1577            WritePacket::RequestSyncVariable(pid, index, slot, name) => {
1578                b.write_leb_u64(17)?;
1579                b.write_leb_u64(*pid)?;
1580                b.write_string(name)?;
1581                b.write_leb_u64(*index)?;
1582                b.write_leb_u64(*slot)?;
1583            }
1584            WritePacket::Logout() => {
1585                b.write_leb_u64(18)?;
1586            }
1587            WritePacket::RequestBdb(index, name) => {
1588                b.write_leb_u64(19)?;
1589                b.write_leb_u64(*index)?;
1590                b.write_string(name)?;
1591            }
1592            WritePacket::SetBdb(name, payload) => {
1593                b.write_leb_u64(20)?;
1594                b.write_string(name)?;
1595                b.write_bytes(payload)?;
1596            }
1597            WritePacket::RequestChangeFriendStatus(status, pid) => {
1598                b.write_leb_u64(21)?;
1599                b.write_u8(*status as u8)?;
1600                b.write_leb_u64(*pid)?;
1601            }
1602            WritePacket::Handshake(key) => {
1603                b.write_leb_u64(22)?;
1604                b.write_string(key)?;
1605            }
1606        }
1607        Ok(b)
1608    }
1609
1610    #[inline(always)]
1611    fn get_packet_read(mut b: Buffer) -> IoResult<ReadPacket> {
1612        let event = b.read_leb_u64()?;
1613        match event {
1614            0 => {
1615                let code = RegistrationCode::try_from_primitive(b.read_u8()?).unwrap_or_default();
1616                Ok(ReadPacket::Registration(code))
1617            }
1618            1 => {
1619                let code = LoginCode::try_from_primitive(b.read_u8()?).unwrap_or_default();
1620                match code {
1621                    LoginCode::GameBan => {
1622                        let reason = b.read_string()?;
1623                        let unban_time = b.read_i64()?;
1624                        Ok(ReadPacket::LoginBan(code, reason, unban_time))
1625                    }
1626                    LoginCode::Ok => {
1627                        let pid = b.read_leb_u64()?;
1628                        let name = b.read_string()?;
1629                        let token = b.read()?;
1630                        let player_save = b.read()?;
1631                        let friends = b.read()?;
1632                        let incoming_friends = b.read()?;
1633                        let outgoing_friends = b.read()?;
1634                        let achievements = b.read()?;
1635                        Ok(ReadPacket::LoginOk(
1636                            pid,
1637                            name,
1638                            token,
1639                            player_save,
1640                            friends,
1641                            incoming_friends,
1642                            outgoing_friends,
1643                            achievements,
1644                        ))
1645                    }
1646                    _ => Ok(ReadPacket::Login(code)),
1647                }
1648            }
1649            2 => {
1650                let pid = b.read_leb_u64()?;
1651                let name = b.read_string()?;
1652                let room = b.read_string()?;
1653                let variables = b.read()?;
1654                let syncs = b.read()?;
1655                Ok(ReadPacket::PlayerLoggedIn(
1656                    pid, name, variables, syncs, room,
1657                ))
1658            }
1659            3 => {
1660                let pid = b.read_leb_u64()?;
1661                Ok(ReadPacket::PlayerLoggedOut(pid))
1662            }
1663            4 => {
1664                let game_save = b.read()?;
1665                let game_achievements = b.read()?;
1666                let game_highscores = b.read()?;
1667                let game_administrators = b.read()?;
1668                let version = b.read_f64()?;
1669                Ok(ReadPacket::SyncGameInfo(
1670                    game_save,
1671                    game_achievements,
1672                    game_highscores,
1673                    game_administrators,
1674                    version,
1675                ))
1676            }
1677            5 => {
1678                let pid = b.read()?;
1679                let mid = b.read_i16()?;
1680                let payload = b.read()?;
1681                Ok(ReadPacket::P2P(pid, mid, payload))
1682            }
1683            6 => {
1684                let pid = b.read_leb_u64()?;
1685                let updates = b.read()?;
1686                Ok(ReadPacket::UpdatePlayerVariable(pid, updates))
1687            }
1688            7 => {
1689                let ping = b.read()?;
1690                Ok(ReadPacket::Ping(ping))
1691            }
1692            8 => Ok(ReadPacket::ClearPlayers()),
1693            9 => {
1694                let updates = b.read()?;
1695                Ok(ReadPacket::GameIniWrite(updates))
1696            }
1697            10 => {
1698                let pid = b.read_leb_u64()?;
1699                let mut data = Vec::new();
1700                for _ in 0..b.read_leb_u64()? {
1701                    let slot = b.read_leb_u64()?;
1702                    let kind = b.read_i16()?;
1703                    let sync_type = SyncType::try_from_primitive(b.read_u8()?).unwrap_or_default();
1704                    let variables = b.read()?;
1705                    data.push((slot, kind, sync_type, variables));
1706                }
1707                Ok(ReadPacket::NewSync(pid, data))
1708            }
1709            11 => {
1710                let pid = b.read_leb_u64()?;
1711                let room = b.read_string()?;
1712                Ok(ReadPacket::PlayerChangedRooms(pid, room))
1713            }
1714            12 => {
1715                let pid = b.read_leb_u64()?;
1716                let sync_updates = b.read()?;
1717                Ok(ReadPacket::UpdateSync(pid, sync_updates))
1718            }
1719            13 => {
1720                let pid = b.read_leb_u64()?;
1721                let hid = b.read_leb_u64()?;
1722                let score = b.read_f64()?;
1723                Ok(ReadPacket::HighscoreUpdate(pid, hid, score))
1724            }
1725            14 => {
1726                let pid = b.read_leb_u64()?;
1727                let syncs = b.read()?;
1728                let variables = b.read()?;
1729                Ok(ReadPacket::UpdatePlayerData(pid, syncs, variables))
1730            }
1731            15 => {
1732                let slot = b.read_leb_u64()?;
1733                let variable = b.read()?;
1734                Ok(ReadPacket::RequestPlayerVariable(slot, variable))
1735            }
1736            16 => {
1737                if b.read_bool()? {
1738                    let reason = b.read_string()?;
1739                    Ok(ReadPacket::AdminAction(AdminAction::Kick(reason)))
1740                } else {
1741                    let reason = b.read_string()?;
1742                    let time = b.read_i64()?;
1743                    Ok(ReadPacket::AdminAction(AdminAction::Ban(reason, time)))
1744                }
1745            }
1746            17 => {
1747                let slot = b.read_leb_u64()?;
1748                let variable = b.read()?;
1749                Ok(ReadPacket::RequestSyncVariable(slot, variable))
1750            }
1751            18 => {
1752                let version = b.read_f64()?;
1753                Ok(ReadPacket::ChangeGameVersion(version))
1754            }
1755            19 => {
1756                if b.read_bool()? {
1757                    let pid = b.read_leb_u64()?;
1758                    let administrator = b.read()?;
1759                    Ok(ReadPacket::ModifyAdministrator(pid, administrator))
1760                } else {
1761                    let pid = b.read_leb_u64()?;
1762                    Ok(ReadPacket::RemoveAdministrator(pid))
1763                }
1764            }
1765            20 => Ok(ReadPacket::ForceDisconnection()),
1766            21 => {
1767                let updates = b.read()?;
1768                Ok(ReadPacket::PlayerIniWrite(updates))
1769            }
1770            22 => {
1771                let index = b.read_leb_u64()?;
1772                let data = b.read()?;
1773                Ok(ReadPacket::RequestBdb(index, data))
1774            }
1775            23 => {
1776                let status =
1777                    ChangeFriendStatus::try_from_primitive(b.read_u8()?).unwrap_or_default();
1778                let pid = b.read_leb_u64()?;
1779                Ok(ReadPacket::ChangeFriendStatus(status, pid))
1780            }
1781            24 => Ok(ReadPacket::Handshake(b.read()?)),
1782            25 => Ok(ReadPacket::ServerMessage(b.read_string()?)),
1783            26 => Ok(ReadPacket::ChangeConnection(b.read_string()?)),
1784            _ => Err(Error::new(
1785                ErrorKind::InvalidData,
1786                format!("unknown event {event}, erroring out"),
1787            )),
1788        }
1789    }
1790
1791    #[inline(always)]
1792    async fn iter_missing_data(data: &mut StreamData, pid: u64) -> IoResult<()> {
1793        if let Some(pq) = data.player_queue.get_mut(&pid) {
1794            if let Some(player) = data.players.get_mut(&pid) {
1795                for (name, value) in pq.variables.drain() {
1796                    if let OptionalValue::Some(value) = value {
1797                        player.variables.insert(name, value);
1798                    } else {
1799                        player.variables.remove(&name);
1800                    }
1801                }
1802                for (index, osync) in player.syncs.iter_mut().enumerate() {
1803                    if osync.is_none() {
1804                        if let Some((sni, sn)) = pq
1805                            .new_syncs
1806                            .iter()
1807                            .enumerate()
1808                            .find(|(_, sn)| sn.slot == index)
1809                        {
1810                            *osync = Some(types::Sync {
1811                                event: SyncEvent::New,
1812                                kind: sn.kind,
1813                                sync_type: sn.sync_type,
1814                                variables: sn.variables.clone(),
1815                                is_ending: false,
1816                            });
1817                            pq.new_syncs.remove(sni);
1818                        }
1819                    }
1820                    if let Some(sync) = osync {
1821                        if let Some((sni, _)) = pq
1822                            .new_syncs
1823                            .iter()
1824                            .enumerate()
1825                            .find(|(_, sn)| sn.slot == index)
1826                        {
1827                            pq.new_syncs.remove(sni);
1828                        }
1829                        if let Some(is) = pq.syncs.remove(&index) {
1830                            for (name, value) in is {
1831                                if let OptionalValue::Some(value) = value {
1832                                    sync.variables.insert(name, value);
1833                                } else {
1834                                    sync.variables.remove(&name);
1835                                }
1836                            }
1837                        }
1838                        if let Some((index, _)) = pq
1839                            .remove_syncs
1840                            .iter()
1841                            .enumerate()
1842                            .find(|(rindex, _)| *rindex == index)
1843                        {
1844                            sync.is_ending = true;
1845                            pq.remove_syncs.remove(index);
1846                        }
1847                    }
1848                }
1849            }
1850        }
1851
1852        Ok(())
1853    }
1854
1855    /// Update the Crystal Server runtime.
1856    /// This ensures that things are kept up-to-date between the server
1857    /// and the client.
1858    #[inline(always)]
1859    pub async fn update(&self) -> IoResult<()> {
1860        let conn = self.is_connected().await;
1861        let mut dlock = self.data.write().await;
1862        if let Some(room_callback) = &mut dlock.func_room {
1863            let result = room_callback();
1864            if dlock.room != result && conn {
1865                self.internal_iosend(Self::get_packet_write(&WritePacket::UpdateRoom(
1866                    result.clone(),
1867                ))?)
1868                .await?;
1869            }
1870            dlock.room = result;
1871        }
1872        if conn {
1873            let lroom = dlock.room.clone();
1874            //let timestamp = Utc::now().timestamp();
1875            for pid in dlock.players.keys().cloned().collect::<Vec<u64>>() {
1876                let player_logged_out = dlock.players_logout.contains(&pid);
1877                let mut is_all_none = true;
1878                Self::iter_missing_data(&mut dlock, pid).await?;
1879                let player = dlock
1880                    .players
1881                    .get_mut(&pid)
1882                    .ok_or(Error::new(ErrorKind::NotFound, "player not found"))?;
1883                for sync in &mut player.syncs {
1884                    if let Some(msync) = sync {
1885                        /*#[cfg(feature = "__dev")]
1886                        info!("Got sync {msync:?} for pid {pid}");*/
1887                        is_all_none = false;
1888                        if msync.event == SyncEvent::End {
1889                            #[cfg(feature = "__dev")]
1890                            info!("Sync END {pid}");
1891                            drop(sync.take());
1892                            continue;
1893                        }
1894                        if msync.sync_type == SyncType::Once
1895                            || player.room != lroom
1896                            || player_logged_out
1897                        {
1898                            #[cfg(feature = "__dev")]
1899                            info!("Marked as Sync END {pid}");
1900                            msync.event = SyncEvent::End;
1901                        }
1902                    }
1903                }
1904                if is_all_none && player_logged_out {
1905                    dlock.players.remove(&pid);
1906                    dlock.players_logout.remove(&pid);
1907                    if dlock.player_queue.contains_key(&pid) {
1908                        return Err(Error::new(
1909                            ErrorKind::AlreadyExists,
1910                            "found player queue but expected none",
1911                        ));
1912                    }
1913                }
1914            }
1915            if !dlock.syncs.is_empty() || !dlock.syncs_remove.is_empty() {
1916                let mut upds = Vec::new();
1917                for (index, sync) in dlock.syncs.iter_mut().enumerate() {
1918                    if let Some(sync) = sync {
1919                        if !sync.to_sync.is_empty() {
1920                            #[cfg(feature = "__dev")]
1921                            info!(
1922                                "updating sync variables on slot {index}: {:?}",
1923                                sync.to_sync
1924                            );
1925                            let mut variupd = HashMap::new();
1926                            for upd in sync.to_sync.drain() {
1927                                let vari = sync.variables.get(&upd).cloned().into();
1928                                variupd.insert(upd, vari);
1929                            }
1930                            upds.push(SyncUpdate {
1931                                slot: index,
1932                                remove_sync: false,
1933                                variables: Some(variupd),
1934                            });
1935                        }
1936                    }
1937                }
1938                for remove in dlock.syncs_remove.drain(..) {
1939                    upds.push(SyncUpdate {
1940                        slot: remove,
1941                        remove_sync: true,
1942                        variables: None,
1943                    });
1944                }
1945                if !upds.is_empty() {
1946                    self.internal_iosend(Self::get_packet_write(&WritePacket::UpdateSync(upds))?)
1947                        .await?;
1948                }
1949            }
1950            if !dlock.new_sync_queue.is_empty() {
1951                let mut data = Vec::new();
1952                let nsqvec = dlock.new_sync_queue.clone();
1953                dlock.new_sync_queue.clear();
1954                for nsq in nsqvec {
1955                    if let Some(sync) = dlock.syncs.get(nsq.slot).ok_or(Error::new(
1956                        ErrorKind::NotFound,
1957                        "expected sync but found none",
1958                    ))? {
1959                        data.push((
1960                            nsq.slot as u64,
1961                            nsq.kind,
1962                            nsq.sync_type,
1963                            sync.variables.clone(),
1964                        ));
1965                    } else {
1966                        return Err(Error::new(
1967                            ErrorKind::NotFound,
1968                            "expected sync on reserved slot but found none",
1969                        ));
1970                    }
1971                }
1972                self.internal_iosend(Self::get_packet_write(&WritePacket::NewSync(data))?)
1973                    .await?;
1974            }
1975            if !dlock.update_variable.is_empty() {
1976                let mut data = Vec::new();
1977                let variupds = dlock.update_variable.drain().collect::<Vec<String>>();
1978                for name in variupds {
1979                    let value = dlock.variables.get(&name).cloned().into();
1980                    data.push(VariableUpdate { name, value });
1981                }
1982                self.internal_iosend(Self::get_packet_write(&WritePacket::UpdatePlayerVariable(
1983                    data,
1984                ))?)
1985                .await?;
1986            }
1987            if !dlock.update_playerini.is_empty() {
1988                let mut data: Vec<VariableUpdate> = Vec::new();
1989                let variupds = dlock.update_playerini.drain().collect::<Vec<String>>();
1990                for name in variupds {
1991                    let value = dlock.player_save.get(&name).cloned().into();
1992                    data.push(VariableUpdate { name, value });
1993                }
1994                self.internal_iosend(Self::get_packet_write(&WritePacket::PlayerIniWrite(data))?)
1995                    .await?;
1996            }
1997            if !dlock.update_gameini.is_empty() {
1998                let mut data: Vec<VariableUpdate> = Vec::new();
1999                let variupds = dlock.update_gameini.drain().collect::<Vec<String>>();
2000                for name in variupds {
2001                    let value = dlock.game_save.get(&name).cloned().into();
2002                    data.push(VariableUpdate { name, value });
2003                }
2004                self.internal_iosend(Self::get_packet_write(&WritePacket::GameIniWrite(data))?)
2005                    .await?;
2006            }
2007            if let Some(ping) = dlock.last_ping {
2008                if ping.elapsed().as_secs_f64() >= 90.0 {
2009                    drop(dlock);
2010                    self.disconnect().await;
2011                    if let Some(dup) = self.data.write().await.func_data_update.as_mut() {
2012                        dup(DataUpdate::Disconnected());
2013                    }
2014                }
2015            }
2016        } else {
2017            if dlock.is_loggedin {
2018                dlock.is_loggedin = false;
2019            }
2020            dlock.last_ping.take();
2021        }
2022        Ok(())
2023    }
2024
2025    /// This is where you tell the other clients in which *room* you are
2026    /// The function receives 0 arguments and must return a [String]
2027    /// ```rust
2028    /// static CURRENT_ROOM: LazyLock<Mutex<String>> = LazyLock::new(|| Mutex::new(String::from("kitchen")));
2029    ///
2030    /// /// ...now I am in the living room
2031    /// *CURRENT_ROOM.lock().unwrap() = String::from("living room");
2032    /// ```
2033    /// And you'd tell that to the server through this function:
2034    /// ```rust
2035    /// cs.callback_set_room(Box::new(|| CURRENT_ROOM.lock().unwrap().clone())).await;
2036    /// ```
2037    pub async fn callback_set_room(&self, callback: CallbackRoom) {
2038        self.data.write().await.func_room = Some(callback);
2039    }
2040
2041    /// This is the callback when the client receives data from the server or another player.
2042    /// The function receives 3 arguments: Player ID: [Option<u64>], Message ID: [i64], Payload/Data: [Vec<Value>], and must return nothing.
2043    pub async fn callback_set_p2p(&self, callback: CallbackP2P) {
2044        self.data.write().await.func_p2p = Some(callback);
2045    }
2046
2047    /// This is the callback when a registration event has been carried out.
2048    /// The function receives 1 argument: Result: [RegistrationCode], and must return nothing.
2049    pub async fn callback_set_register(&self, callback: CallbackRegister) {
2050        self.data.write().await.func_register = Some(callback);
2051    }
2052
2053    /// This is the callback when a login event has been carried out.
2054    /// The function receives 1 argument: Result: [LoginCode], and must return nothing.
2055    pub async fn callback_set_login(&self, callback: CallbackLogin) {
2056        self.data.write().await.func_login = Some(callback);
2057    }
2058
2059    /// This is the callback when the player has been banned while playing.
2060    /// The function receives 2 arguments: Reason: [String], Unban Time: [DateTime<Utc>], and must return nothing.
2061    pub async fn callback_set_banned(&self, callback: CallbackBanned) {
2062        self.data.write().await.func_banned = Some(callback);
2063    }
2064
2065    /// This is the callback when the player has been kicked while playing.
2066    /// The function receives 2 arguments: Reason: [String], and must return nothing.
2067    pub async fn callback_set_kicked(&self, callback: CallbackKicked) {
2068        self.data.write().await.func_kicked = Some(callback);
2069    }
2070
2071    /// This is the callback when the client has disconnected from the server.
2072    /// The function receives 0 arguments and must return nothing.
2073    pub async fn callback_set_disconnected(&self, callback: CallbackDisconnected) {
2074        self.data.write().await.func_disconnected = Some(callback);
2075    }
2076
2077    /// This is the callback when the client has disconnected from the server.
2078    /// The function receives 0 arguments and must return nothing.
2079    ///
2080    /// The login-token is only valid once and only works for the game it was
2081    /// generated on.
2082    pub async fn callback_set_login_token(&self, callback: CallbackLoginToken) {
2083        self.data.write().await.func_login_token = Some(callback);
2084    }
2085
2086    /// This is the callback when an event has been triggered by the server.
2087    /// This may work as a replacement for all of the other callback functions
2088    /// except [CrystalServer::callback_set_room].
2089    /// The function receives 1 argument, Data: [DataUpdate] and must return nothing.
2090    pub async fn callback_set_data_update(&self, callback: CallbackDataUpdate) {
2091        self.data.write().await.func_data_update = Some(callback);
2092    }
2093
2094    /// Checks if the client has an active connection to the server.
2095    pub async fn is_connected(&self) -> bool {
2096        let dlock = self.data.read().await;
2097        dlock.is_connected && dlock.handshake_completed && {
2098            if let Some(thread) = &dlock.thread {
2099                !thread.is_finished()
2100            } else {
2101                true
2102            }
2103        }
2104    }
2105
2106    /// Checks if the client is trying to establish an active connection to the server.
2107    pub async fn is_connecting(&self) -> bool {
2108        let dlock = self.data.read().await;
2109        dlock.is_connecting || (!dlock.handshake_completed && dlock.is_connected)
2110    }
2111
2112    /// Check if the client is currently logged in.
2113    pub async fn is_loggedin(&self) -> bool {
2114        self.is_connected().await && {
2115            let dlock = self.data.read().await;
2116            dlock.is_loggedin && dlock.handshake_completed
2117        }
2118    }
2119
2120    /// Obtain the ping between the client and the server in ms.
2121    pub async fn get_ping(&self) -> f64 {
2122        self.data.read().await.ping
2123    }
2124
2125    /// Sets the Game Token so that the client may be able to log in.
2126    ///
2127    /// The Game Token is like a "password" for your game and that
2128    /// the game the player is playing on must be the same one as the
2129    /// one in the server.
2130    pub async fn set_game_token(&self, token: &str) {
2131        self.data.write().await.game_token = token.to_owned();
2132    }
2133
2134    /// Disconnects the client from an active connection with the server.
2135    pub async fn disconnect(&self) {
2136        let mut lock = self.data.write().await;
2137        if let Some(thread) = lock.thread.take() {
2138            thread.abort();
2139        }
2140        lock.clear(true).await;
2141    }
2142
    /// Writes a packet buffer to the server through the stored writer.
    ///
    /// Send failures are not reported to the caller: this function always
    /// returns `Ok(())`. When the write fails, the shared state is cleared and
    /// the disconnected callbacks are fired at most once (guarded by
    /// `call_disconnected`). When no writer is stored, nothing is sent.
    ///
    /// Note that the failure path takes the write lock on `self.data`, so a
    /// caller that still holds a guard on `self.data` would block here.
    async fn internal_iosend(&self, data: Buffer) -> IoResult<()> {
        if let Some(writer) = &self.writer {
            if let Err(_e) = writer.lock().await.write(data).await {
                #[cfg(feature = "__dev")]
                info!("unable to send data to server with error: {_e:?}");
                // The write failed: wipe the client state.
                let mut dlock = self.data.write().await;
                dlock.clear(true).await;
                // Notify the user only once per disconnection.
                if dlock.call_disconnected {
                    if let Some(func) = dlock.func_disconnected.as_mut() {
                        func();
                    }
                    if let Some(dup) = dlock.func_data_update.as_mut() {
                        dup(DataUpdate::Disconnected());
                    }
                    dlock.call_disconnected = false;
                }
            }
        } else {
            #[cfg(feature = "__dev")]
            info!("stream is not open for writing");
        }
        Ok(())
    }
2166
2167    async fn internal_login(&self, username: &str, loginpassw: LoginPassw) -> IoResult<()> {
2168        self.internal_iosend({
2169            let dlock = self.data.read().await;
2170            Self::get_packet_write(&WritePacket::Login(
2171                username.to_owned(),
2172                loginpassw,
2173                dlock.game_token.clone(),
2174                dlock.variables.clone(),
2175                dlock.syncs.clone(),
2176                dlock.room.clone(),
2177            ))?
2178        })
2179        .await
2180    }
2181
2182    /// Try to log into an existing account.
2183    ///
2184    /// The login result will be sent as a callback event.
2185    /// Use [CrystalServer::callback_set_login] and [CrystalServer::callback_set_login_token]
2186    /// respectively to obtain data from the login attempt.
2187    pub async fn login(&self, username: &str, passw: &str) -> IoResult<()> {
2188        self.internal_login(username, LoginPassw::Passw(passw.to_owned()))
2189            .await
2190    }
2191
2192    /// Try to log into an existing account using a login-token.
2193    ///
2194    /// The login result will be sent as a callback event.
2195    /// Use [CrystalServer::callback_set_login] and [CrystalServer::callback_set_login_token]
2196    /// respectively to obtain data from the login attempt.
2197    pub async fn login_with_token(&self, username: &str, token: &str) -> IoResult<()> {
2198        self.internal_login(username, LoginPassw::Token(token.to_owned()))
2199            .await
2200    }
2201
2202    /// Try to register a new account.
2203    ///
2204    /// The register result will be sent as a callback event.
2205    /// Use [CrystalServer::callback_set_login] to obtain data from the registration attempt.
2206    pub async fn register(
2207        &self,
2208        username: &str,
2209        email: &str,
2210        passw: &str,
2211        repeat_passw: &str,
2212    ) -> IoResult<()> {
2213        self.internal_iosend(Self::get_packet_write(&WritePacket::Register(
2214            username.to_owned(),
2215            email.to_owned(),
2216            passw.to_owned(),
2217            repeat_passw.to_owned(),
2218        ))?)
2219        .await
2220    }
2221
2222    /// Obtain the current Player ID.
2223    /// Note that this will only return something if the player is logged in.
2224    pub async fn get_player_id(&self) -> Option<u64> {
2225        self.data.read().await.player_id
2226    }
2227
2228    /// Obtain the current Player Name.
2229    /// Note that this will only return something if the player is logged in.
2230    pub async fn get_player_name(&self) -> Option<String> {
2231        self.data.read().await.player_name.clone()
2232    }
2233
2234    /// Sets a variable with the name and value provided.
2235    pub async fn set_variable(&self, name: &str, value: Value) {
2236        let mut dlock = self.data.write().await;
2237        if let Some(orgvalue) = dlock.variables.get(name) {
2238            if value == *orgvalue {
2239                return;
2240            }
2241        }
2242        dlock.variables.insert(name.to_owned(), value);
2243        dlock.update_variable.insert(name.to_owned());
2244    }
2245
2246    /// Removes a variable with the provided name.
2247    pub async fn remove_variable(&self, name: &str) {
2248        let mut dlock = self.data.write().await;
2249        if dlock.variables.remove(name).is_some() {
2250            dlock.update_variable.insert(name.to_owned());
2251        }
2252    }
2253
2254    /// An iterator to obtain the other connected players data. (data such as syncs, variables, etc.)
2255    pub async fn iter_other_players(&self) -> impl Stream<Item = (u64, Player)> {
2256        let data = self.data.clone();
2257        async_stream::stream! {
2258            for id in data.read().await.players.keys().cloned().collect::<Vec<u64>>() {
2259                if let Some(player) = data.read().await.players.get(&id).cloned() {
2260                    yield (id, player);
2261                }
2262            }
2263        }
2264    }
2265
2266    /// Gets the current amount of other players.
2267    pub async fn other_player_count(&self) -> usize {
2268        self.data.read().await.players.len()
2269    }
2270
2271    /// Gets the data of a player from the Player ID if the player is connected.
2272    /// If the player is not found, it returns [None].
2273    pub async fn get_other_player(&self, pid: u64) -> Option<Player> {
2274        self.data.read().await.players.get(&pid).cloned()
2275    }
2276
2277    /// Gets the data of a player from the Player Name if the player is connected.
2278    /// /// If the player is not found, it returns [None].
2279    pub async fn get_other_player_name(&self, name: &str) -> Option<(u64, Player)> {
2280        let name = name.to_lowercase();
2281        self.data
2282            .read()
2283            .await
2284            .players
2285            .iter()
2286            .find(|(_pid, player)| player.name.to_lowercase() == name)
2287            .map(|(pid, player)| (*pid, player.clone()))
2288    }
2289
2290    /// Force an update of a variable, note that this will only be useful if the
2291    /// player is not in the same room as you.
2292    ///
2293    /// The result will be saved as normal and will be sent as a callback event.
2294    /// Use [CrystalServer::callback_set_data_update] or the `callback` variable to fetch it.
2295    pub async fn request_other_player_variable(
2296        &self,
2297        pid: u64,
2298        name: &str,
2299        callback: Option<PlayerVariableServerUpdate>,
2300    ) -> IoResult<()> {
2301        let mut dlock = self.data.write().await;
2302        if dlock.players.contains_key(&pid) {
2303            let index = if let Some((index, csu)) = dlock
2304                .callback_server_update
2305                .iter_mut()
2306                .find(|(_, csu)| csu.is_none())
2307            {
2308                *csu = Some(CallbackServerUpdate {
2309                    name: name.to_owned(),
2310                    callback: ServerUpdateCallback::PlayerVariable(callback, pid),
2311                });
2312                *index
2313            } else {
2314                let index = dlock.callback_server_index;
2315                dlock.callback_server_update.insert(
2316                    index,
2317                    Some(CallbackServerUpdate {
2318                        name: name.to_owned(),
2319                        callback: ServerUpdateCallback::PlayerVariable(callback, pid),
2320                    }),
2321                );
2322                dlock.callback_server_index += 1;
2323                index
2324            };
2325            self.internal_iosend(Self::get_packet_write(
2326                &WritePacket::RequestPlayerVariable(
2327                    PlayerRequest::ID(pid),
2328                    index as u64,
2329                    name.to_owned(),
2330                ),
2331            )?)
2332            .await?;
2333            Ok(())
2334        } else {
2335            Err(Error::new(ErrorKind::NotFound, "unable to find player id"))
2336        }
2337    }
2338
2339    /// Sends a message "peer-to-peer" to the requested target.
2340    /// The Message ID will allow you to quickly differentiate what message it's
2341    /// supposed to be.
2342    pub async fn p2p(
2343        &self,
2344        target: PlayerRequest,
2345        message_id: i16,
2346        payload: Vec<Value>,
2347    ) -> IoResult<()> {
2348        self.internal_iosend(Self::get_packet_write(&WritePacket::P2P(
2349            target, message_id, payload,
2350        ))?)
2351        .await
2352    }
2353
2354    /// Sets the current game version.
2355    pub async fn set_version(&self, version: f64) -> IoResult<()> {
2356        self.data.write().await.version = version;
2357        self.internal_iosend(Self::get_packet_write(&WritePacket::UpdateGameVersion(
2358            version,
2359        ))?)
2360        .await
2361    }
2362
2363    /// Gets the current set version.
2364    pub async fn get_version(&self) -> f64 {
2365        self.data.read().await.version
2366    }
2367
2368    /// Gets the registered server version.
2369    pub async fn get_server_version(&self) -> f64 {
2370        self.data.read().await.game_version
2371    }
2372
2373    /// Sets the current game session.
2374    pub async fn set_session(&self, session: &str) -> IoResult<()> {
2375        self.data.write().await.session = session.to_owned();
2376        self.internal_iosend(Self::get_packet_write(&WritePacket::UpdateGameSession(
2377            session.to_owned(),
2378        ))?)
2379        .await
2380    }
2381
2382    /// Gets the current set session.
2383    pub async fn get_session(&self) -> String {
2384        self.data.read().await.session.clone()
2385    }
2386
2387    fn get_save_key(file: &str, section: &str, key: &str) -> String {
2388        if file.is_empty() {
2389            format!(
2390                "{}>{}",
2391                urlencoding::encode(section),
2392                urlencoding::encode(key)
2393            )
2394        } else {
2395            format!(
2396                "{}>{}>{}",
2397                urlencoding::encode(file),
2398                urlencoding::encode(section),
2399                urlencoding::encode(key)
2400            )
2401        }
2402    }
2403
2404    /// Gets the currently open playerini file.
2405    /// The result will be empty is no files are open.
2406    pub async fn get_open_playerini(&self) -> String {
2407        self.data.read().await.player_open_save.clone()
2408    }
2409
2410    /// Opens a playerini file.
2411    pub async fn open_playerini(&self, file: &str) {
2412        self.data.write().await.player_open_save = file.to_owned();
2413    }
2414
2415    /// Closes the currently open playerini file.
2416    pub async fn close_playerini(&self) {
2417        self.data.write().await.player_open_save.clear();
2418    }
2419
2420    /// Checks if the requested section & key exists in the currently open playerini file.
2421    pub async fn has_playerini(&self, section: &str, key: &str) -> bool {
2422        let dlock = self.data.read().await;
2423        dlock
2424            .player_save
2425            .contains_key(&Self::get_save_key(&dlock.player_open_save, section, key))
2426    }
2427
2428    /// Returns the saved value in the section & key of the currently open playerini file.
2429    /// If the value is not found, it returns [None].
2430    pub async fn get_playerini(&self, section: &str, key: &str) -> Option<Value> {
2431        let dlock = self.data.read().await;
2432        dlock
2433            .player_save
2434            .get(&Self::get_save_key(&dlock.player_open_save, section, key))
2435            .cloned()
2436    }
2437
2438    /// Saves a new value for the requested section & key of the currently open playerini file.
2439    pub async fn set_playerini(&self, section: &str, key: &str, value: Value) {
2440        let mut dlock = self.data.write().await;
2441        let save_key = Self::get_save_key(&dlock.player_open_save, section, key);
2442        if let Some(orgvalue) = dlock.player_save.get(&save_key) {
2443            if value == *orgvalue {
2444                return;
2445            }
2446        }
2447        dlock.player_save.insert(save_key.clone(), value.clone());
2448        dlock.update_playerini.insert(save_key);
2449    }
2450
2451    /// Removes the saved value for the requested section & key of the currently open playerini file.
2452    pub async fn remove_playerini(&self, section: &str, key: &str) {
2453        let mut dlock = self.data.write().await;
2454        let save_key = Self::get_save_key(&dlock.player_open_save, section, key);
2455        if dlock.player_save.remove(&save_key).is_some() {
2456            dlock.update_playerini.insert(save_key);
2457        }
2458    }
2459
2460    /// Gets the currently open gameini file.
2461    /// The result will be empty is no files are open.
2462    pub async fn get_open_gameini(&self) -> String {
2463        self.data.read().await.game_open_save.clone()
2464    }
2465
2466    /// Opens a gameini file.
2467    pub async fn open_gameini(&self, file: &str) {
2468        self.data.write().await.game_open_save = file.to_owned();
2469    }
2470
2471    /// Closes the currently open gameini file.
2472    pub async fn close_gameini(&self) {
2473        self.data.write().await.game_open_save.clear();
2474    }
2475
2476    /// Checks if the requested section & key exists in the currently open gameini file.
2477    pub async fn has_gameini(&self, section: &str, key: &str) -> bool {
2478        let dlock = self.data.read().await;
2479        dlock
2480            .game_save
2481            .contains_key(&Self::get_save_key(&dlock.game_open_save, section, key))
2482    }
2483
2484    /// Returns the saved value in the section & key of the currently open gameini file.
2485    /// If the value is not found, it returns [None].
2486    pub async fn get_gameini(&self, section: &str, key: &str) -> Option<Value> {
2487        let dlock = self.data.read().await;
2488        dlock
2489            .game_save
2490            .get(&Self::get_save_key(&dlock.game_open_save, section, key))
2491            .cloned()
2492    }
2493
2494    /// Saves a new value for the requested section & key of the currently open gameini file.
2495    pub async fn set_gameini(&self, section: &str, key: &str, value: Value) {
2496        let mut dlock = self.data.write().await;
2497        let save_key = Self::get_save_key(&dlock.game_open_save, section, key);
2498        if let Some(orgvalue) = dlock.game_save.get(&save_key) {
2499            if value == *orgvalue {
2500                return;
2501            }
2502        }
2503        dlock.game_save.insert(save_key.clone(), value.clone());
2504        dlock.update_gameini.insert(save_key);
2505    }
2506
2507    /// Removes the saved value for the requested section & key of the currently open gameini file.
2508    pub async fn remove_gameini(&self, section: &str, key: &str) {
2509        let mut dlock = self.data.write().await;
2510        let save_key = Self::get_save_key(&dlock.game_open_save, section, key);
2511        if dlock.game_save.remove(&save_key).is_some() {
2512            dlock.update_gameini.insert(save_key);
2513        }
2514    }
2515
2516    /// Checks if an achievement exists.
2517    pub async fn has_achievement(&self, aid: u64) -> bool {
2518        self.data
2519            .read()
2520            .await
2521            .game_achievements
2522            .contains_key(&Leb(aid))
2523    }
2524
2525    /// Gets an achievement data.
2526    /// If it doesn't exist it returns [None].
2527    pub async fn get_achievement(&self, aid: u64) -> Option<Achievement> {
2528        self.data
2529            .read()
2530            .await
2531            .game_achievements
2532            .get(&Leb(aid))
2533            .cloned()
2534    }
2535
2536    /// Checks if the player has reached an achievement.
2537    pub async fn has_reached_achievement(&self, aid: u64) -> bool {
2538        let dlock = self.data.read().await;
2539        if dlock.player_id.is_some() {
2540            dlock
2541                .game_achievements
2542                .get(&Leb(aid))
2543                .map(|achievement| achievement.unlocked.is_some())
2544                .unwrap_or(false)
2545        } else {
2546            false
2547        }
2548    }
2549
2550    /// Get the timestamp (on Unix UTC) of when the achievement
2551    /// was unlocked.
2552    /// If the player hasn't unlocked the achievement yet, it returns [None].
2553    pub async fn get_reached_achievement(&self, aid: u64) -> Option<i64> {
2554        let dlock = self.data.read().await;
2555        if dlock.player_id.is_some() {
2556            dlock
2557                .game_achievements
2558                .get(&Leb(aid))
2559                .map(|achievement| achievement.unlocked)?
2560        } else {
2561            None
2562        }
2563    }
2564
2565    /// Make the current player reach an achievement.
2566    pub async fn reach_achievement(&self, aid: u64) -> IoResult<()> {
2567        if !self.has_reached_achievement(aid).await {
2568            let mut dlock = self.data.write().await;
2569            if dlock.player_id.is_some() {
2570                if let Some(achievement) = dlock.game_achievements.get_mut(&Leb(aid)) {
2571                    achievement.unlocked = Some(Utc::now().timestamp());
2572                    self.internal_iosend(Self::get_packet_write(&WritePacket::UpdateAchievement(
2573                        aid,
2574                    ))?)
2575                    .await?;
2576                }
2577            }
2578        }
2579        Ok(())
2580    }
2581
2582    /// Checks if a highscore exists.
2583    pub async fn has_highscore(&self, hid: u64) -> bool {
2584        self.data
2585            .read()
2586            .await
2587            .game_achievements
2588            .contains_key(&Leb(hid))
2589    }
2590
2591    /// Gets a highscore data.
2592    /// If the highscore doesn't exist, it returns [None].
2593    pub async fn get_highscore(&self, hid: u64) -> Option<Highscore> {
2594        self.data
2595            .read()
2596            .await
2597            .game_highscores
2598            .get(&Leb(hid))
2599            .cloned()
2600    }
2601
2602    /// Checks if the player has a score on a highscore.
2603    pub async fn has_score_highscore(&self, hid: u64) -> bool {
2604        let dlock = self.data.read().await;
2605        if let Some(player_id) = dlock.player_id {
2606            if let Some(highscore) = dlock.game_highscores.get(&Leb(hid)) {
2607                highscore.scores.contains_key(&Leb(player_id))
2608            } else {
2609                false
2610            }
2611        } else {
2612            false
2613        }
2614    }
2615
2616    /// Gets the last recorded score from the player on this highscore.
2617    /// If the highscore doesn't exist, it returns [None].
2618    pub async fn get_score_highscore(&self, hid: u64) -> Option<f64> {
2619        let dlock = self.data.read().await;
2620        if let Some(player_id) = dlock.player_id {
2621            if let Some(highscore) = dlock.game_highscores.get(&Leb(hid)) {
2622                highscore.scores.get(&Leb(player_id)).cloned()
2623            } else {
2624                None
2625            }
2626        } else {
2627            None
2628        }
2629    }
2630
2631    /// Records a new player score on the specified highscore.
2632    pub async fn set_score_highscore(&self, hid: u64, score: f64) -> IoResult<()> {
2633        let mut dlock = self.data.write().await;
2634        if let Some(player_id) = dlock.player_id {
2635            if let Some(highscore) = dlock.game_highscores.get_mut(&Leb(hid)) {
2636                if let Some(hscore) = highscore.scores.get_mut(&Leb(player_id)) {
2637                    if *hscore != score {
2638                        *hscore = score;
2639                        self.internal_iosend(Self::get_packet_write(
2640                            &WritePacket::UpdateHighscore(hid, score),
2641                        )?)
2642                        .await?;
2643                    }
2644                } else {
2645                    highscore.scores.insert(Leb(player_id), score);
2646                    self.internal_iosend(Self::get_packet_write(&WritePacket::UpdateHighscore(
2647                        hid, score,
2648                    ))?)
2649                    .await?;
2650                }
2651            }
2652        }
2653        Ok(())
2654    }
2655
2656    /// Creates a new sync.
2657    pub async fn create_sync(&self, sync_type: SyncType, kind: i16) -> usize {
2658        let mut dlock = self.data.write().await;
2659        let slot = if let Some((index, sync)) = dlock
2660            .syncs
2661            .iter_mut()
2662            .enumerate()
2663            .find(|(_, sync)| sync.is_none())
2664        {
2665            *sync = Some(SelfSync {
2666                kind,
2667                sync_type,
2668                variables: HashMap::new(),
2669                to_sync: HashSet::new(),
2670            });
2671            index
2672        } else {
2673            dlock.syncs.push(Some(SelfSync {
2674                kind,
2675                sync_type,
2676                variables: HashMap::new(),
2677                to_sync: HashSet::new(),
2678            }));
2679            dlock.syncs.len() - 1
2680        };
2681        dlock.new_sync_queue.push(NewSyncQueue {
2682            slot,
2683            kind,
2684            sync_type,
2685        });
2686        slot
2687    }
2688
2689    /// Destroys a created sync.
2690    pub async fn destroy_sync(&self, sync: usize) {
2691        let mut dlock = self.data.write().await;
2692        if let Some(Some(_)) = dlock.syncs.get(sync) {
2693            dlock.syncs_remove.push(sync);
2694        }
2695    }
2696
2697    /// Set a sync variable with the specified name and value.
2698    pub async fn set_variable_sync(&self, sync: usize, name: &str, value: Value) {
2699        let mut dlock = self.data.write().await;
2700        if let Some(Some(sync)) = dlock.syncs.get_mut(sync) {
2701            if let Some(orgvalue) = sync.variables.get(name) {
2702                if value == *orgvalue {
2703                    return;
2704                }
2705            }
2706            sync.variables.insert(name.to_owned(), value);
2707            sync.to_sync.insert(name.to_owned());
2708        }
2709    }
2710
2711    /// Remove a sync variable with the specified name.
2712    pub async fn remove_variable_sync(&self, sync: usize, name: &str) {
2713        let mut dlock = self.data.write().await;
2714        if let Some(Some(sync)) = dlock.syncs.get_mut(sync) {
2715            if sync.variables.remove(name).is_some() {
2716                sync.to_sync.insert(name.to_owned());
2717            }
2718        }
2719    }
2720
2721    /// Gets a sync variable from another player with the specified Sync ID and name.
2722    pub async fn get_variable_other_sync(
2723        &self,
2724        pid: u64,
2725        sync: usize,
2726        name: &str,
2727    ) -> Option<Value> {
2728        let dlock = self.data.read().await;
2729        if let Some(player) = dlock.players.get(&pid) {
2730            if let Some(Some(sync)) = player.syncs.get(sync) {
2731                sync.variables.get(name).cloned()
2732            } else {
2733                None
2734            }
2735        } else {
2736            None
2737        }
2738    }
2739
2740    /// Checks if a sync variable exists of another player with the specified Sync ID and name.
2741    /// If the player or sync is not found, it returns [None].
2742    pub async fn has_variable_other_sync(&self, pid: u64, sync: usize, name: &str) -> Option<bool> {
2743        let dlock = self.data.read().await;
2744        if let Some(player) = dlock.players.get(&pid) {
2745            if let Some(Some(sync)) = player.syncs.get(sync) {
2746                Some(sync.variables.contains_key(name))
2747            } else {
2748                None
2749            }
2750        } else {
2751            None
2752        }
2753    }
2754
2755    /// An iterator to obtain all the other players' sync data. (data such as variables, current event, etc.)
2756    pub async fn iter_other_syncs(&self) -> impl Stream<Item = SyncIter> {
2757        let data = self.data.clone();
2758
2759        Box::pin(async_stream::stream! {
2760            let pids = data.read().await.players.keys().cloned().collect::<Vec<u64>>();
2761            for id in pids {
2762                let player_data = data.read().await.players.get(&id).cloned();
2763                if let Some(player) = player_data {
2764                    let syncs_data = player.syncs.iter().cloned().enumerate();
2765                    for (index, sync) in syncs_data {
2766                        if let Some(sync) = sync {
2767                            {
2768                                if data.read().await.room != player.room && sync.event != SyncEvent::End {
2769                                    continue;
2770                                }
2771                            }
2772                            let iter = SyncIter {
2773                                player_id: id,
2774                                player_name: player.name.clone(),
2775                                slot: index,
2776                                event: sync.event,
2777                                kind: sync.kind,
2778                                variables: sync.variables.clone(),
2779                            };
2780                            if sync.event == SyncEvent::New {
2781                                if let Some(player) = data.write().await.players.get_mut(&id) {
2782                                    if let Some(Some(sync)) = player.syncs.get_mut(index) {
2783                                        if sync.event == SyncEvent::New { // Make sure nothing happened while we were iterating
2784                                            sync.event = SyncEvent::Step;
2785                                        }
2786                                    }
2787                                }
2788                            }
2789                            yield iter;
2790                        }
2791                    }
2792                }
2793            }
2794        })
2795    }
2796
2797    /// Checks if a player is an administrator.
2798    pub async fn is_player_admin(&self, pid: u64) -> bool {
2799        self.data
2800            .read()
2801            .await
2802            .game_administrators
2803            .contains_key(&Leb(pid))
2804    }
2805
2806    /// Obtains the administrator data of a player.
2807    /// If the player is not an administrator, it returns [None].
2808    pub async fn get_player_admin(&self, pid: u64) -> Option<Administrator> {
2809        self.data
2810            .read()
2811            .await
2812            .game_administrators
2813            .get(&Leb(pid))
2814            .cloned()
2815    }
2816
2817    /// Kicks the specified player from the game.
2818    /// If the player doesn't have permission to kick the player,
2819    /// the function will return [false] and nothing will happen.
2820    /// You can kick yourself with this function without any checks.
2821    pub async fn player_kick(&self, pid: u64, reason: &str) -> IoResult<bool> {
2822        if self.get_player_id().await == Some(pid)
2823            || self
2824                .get_player_admin(self.get_player_id().await.unwrap_or(u64::MAX))
2825                .await
2826                .unwrap_or_default()
2827                .can_kick
2828        {
2829            self.internal_iosend(Self::get_packet_write(&WritePacket::AdminAction(
2830                AdminAction::Kick(reason.to_owned()),
2831                pid,
2832            ))?)
2833            .await?;
2834            Ok(true)
2835        } else {
2836            Ok(false)
2837        }
2838    }
2839
2840    /// Bans the specified player from the game.
2841    /// If the player doesn't have permission to ban the player,
2842    /// the function will return [false] and nothing will happen.
2843    /// You can ban yourself with this function without any checks.
2844    pub async fn player_ban(
2845        &self,
2846        pid: u64,
2847        reason: &str,
2848        unban_time: DateTime<Utc>,
2849    ) -> IoResult<bool> {
2850        if self.get_player_id().await == Some(pid)
2851            || self
2852                .get_player_admin(self.get_player_id().await.unwrap_or(u64::MAX))
2853                .await
2854                .unwrap_or_default()
2855                .can_ban
2856        {
2857            self.internal_iosend(Self::get_packet_write(&WritePacket::AdminAction(
2858                AdminAction::Ban(reason.to_string(), unban_time.timestamp()),
2859                pid,
2860            ))?)
2861            .await?;
2862            Ok(true)
2863        } else {
2864            Ok(false)
2865        }
2866    }
2867
2868    /// Unbans the specified player from the game.
2869    /// If the player doesn't have permission to unban the player,
2870    /// the function will return [false] and nothing will happen.
2871    pub async fn player_unban(&self, pid: u64) -> IoResult<bool> {
2872        if self
2873            .get_player_admin(self.get_player_id().await.unwrap_or(u64::MAX))
2874            .await
2875            .unwrap_or_default()
2876            .can_unban
2877        {
2878            self.internal_iosend(Self::get_packet_write(&WritePacket::AdminAction(
2879                AdminAction::Unban,
2880                pid,
2881            ))?)
2882            .await?;
2883            Ok(true)
2884        } else {
2885            Ok(false)
2886        }
2887    }
2888
2889    /// Logs out from the account currently playing in.
2890    pub async fn logout(&self) -> IoResult<bool> {
2891        if self.is_loggedin().await {
2892            self.internal_iosend(Self::get_packet_write(&WritePacket::Logout())?)
2893                .await?;
2894            Ok(true)
2895        } else {
2896            Ok(false)
2897        }
2898    }
2899
2900    /// Force an update of a variable from a sync, note that this will only be useful if the
2901    /// player is not in the same room as you.
2902    ///
2903    /// The result will be saved as normal and will be sent as a callback event.
2904    /// Use [CrystalServer::callback_set_data_update] or the `callback` variable to fetch it.
2905    pub async fn request_other_sync_variable(
2906        &self,
2907        pid: u64,
2908        slot: usize,
2909        name: &str,
2910        callback: Option<SyncVariableServerUpdate>,
2911    ) -> IoResult<()> {
2912        let mut dlock = self.data.write().await;
2913        if let Some(player) = dlock.players.get(&pid) {
2914            if let Some(Some(_)) = player.syncs.get(slot) {
2915                let index = if let Some((index, csu)) = dlock
2916                    .callback_server_update
2917                    .iter_mut()
2918                    .find(|(_, csu)| csu.is_none())
2919                {
2920                    *csu = Some(CallbackServerUpdate {
2921                        name: name.to_owned(),
2922                        callback: ServerUpdateCallback::SyncVariable(callback, pid, slot),
2923                    });
2924                    *index
2925                } else {
2926                    let index = dlock.callback_server_index;
2927                    dlock.callback_server_update.insert(
2928                        index,
2929                        Some(CallbackServerUpdate {
2930                            name: name.to_owned(),
2931                            callback: ServerUpdateCallback::SyncVariable(callback, pid, slot),
2932                        }),
2933                    );
2934                    dlock.callback_server_index += 1;
2935                    index
2936                };
2937                self.internal_iosend(Self::get_packet_write(&WritePacket::RequestSyncVariable(
2938                    pid,
2939                    index as u64,
2940                    slot as u64,
2941                    name.to_owned(),
2942                ))?)
2943                .await?;
2944                Ok(())
2945            } else {
2946                Err(Error::new(ErrorKind::NotFound, "unable to find sync"))
2947            }
2948        } else {
2949            Err(Error::new(ErrorKind::NotFound, "unable to find player id"))
2950        }
2951    }
2952
2953    /// Get the data of a Binary Data Block.
2954    ///
2955    /// The result will be saved as normal and will be sent as a callback event.
2956    /// Use [CrystalServer::callback_set_data_update] or the `callback` variable to fetch it.
2957    pub async fn fetch_bdb(
2958        &self,
2959        name: &str,
2960        callback: Option<FetchBdbServerUpdate>,
2961    ) -> IoResult<()> {
2962        let mut dlock = self.data.write().await;
2963        let index = if let Some((index, csu)) = dlock
2964            .callback_server_update
2965            .iter_mut()
2966            .find(|(_, csu)| csu.is_none())
2967        {
2968            *csu = Some(CallbackServerUpdate {
2969                name: name.to_owned(),
2970                callback: ServerUpdateCallback::FetchBdb(callback),
2971            });
2972            *index
2973        } else {
2974            let index = dlock.callback_server_index;
2975            dlock.callback_server_update.insert(
2976                index,
2977                Some(CallbackServerUpdate {
2978                    name: name.to_owned(),
2979                    callback: ServerUpdateCallback::FetchBdb(callback),
2980                }),
2981            );
2982            dlock.callback_server_index += 1;
2983            index
2984        };
2985        self.internal_iosend(Self::get_packet_write(&WritePacket::RequestBdb(
2986            index as u64,
2987            name.to_owned(),
2988        ))?)
2989        .await?;
2990        Ok(())
2991    }
2992
2993    /// Update the data of a Binary Data Block.
2994    pub async fn set_bdb(&self, name: &str, data: Vec<u8>) -> IoResult<()> {
2995        self.internal_iosend(Self::get_packet_write(&WritePacket::SetBdb(
2996            name.to_owned(),
2997            data,
2998        ))?)
2999        .await
3000    }
3001
3002    /// Obtain the player's incoming friend requests.
3003    /// If the player is not logged in, it returns [None].
3004    pub async fn get_incoming_friends(&self) -> Option<IntSet<u64>> {
3005        if self.is_loggedin().await {
3006            Some(self.data.read().await.player_incoming_friends.clone())
3007        } else {
3008            None
3009        }
3010    }
3011
3012    /// Obtain the player's outgoing friend requests.
3013    /// If the player is not logged in, it returns [None].
3014    pub async fn get_outgoing_friends(&self) -> Option<IntSet<u64>> {
3015        if self.is_loggedin().await {
3016            Some(self.data.read().await.player_outgoing_friends.clone())
3017        } else {
3018            None
3019        }
3020    }
3021
3022    /// Obtain the player's current friends.
3023    /// If the player is not logged in, it returns [None].
3024    pub async fn get_friends(&self) -> Option<IntSet<u64>> {
3025        if self.is_loggedin().await {
3026            Some(self.data.read().await.player_friends.clone())
3027        } else {
3028            None
3029        }
3030    }
3031
3032    /// Send a friend request to a player.
3033    pub async fn send_outgoing_friend(&self, pid: u64) -> IoResult<()> {
3034        if self.is_loggedin().await {
3035            {
3036                let mut dlock = self.data.write().await;
3037                if dlock.player_friends.contains(&pid)
3038                    || dlock.player_outgoing_friends.contains(&pid)
3039                    || dlock.player_incoming_friends.contains(&pid)
3040                {
3041                    return Ok(());
3042                }
3043                dlock.player_outgoing_friends.insert(pid);
3044            }
3045            self.internal_iosend(Self::get_packet_write(
3046                &WritePacket::RequestChangeFriendStatus(ChangeFriendStatus::Request, pid),
3047            )?)
3048            .await?;
3049        }
3050        Ok(())
3051    }
3052
3053    /// Cancel the sent friend request of a player.
3054    pub async fn remove_outgoing_friend(&self, pid: u64) -> IoResult<()> {
3055        if self.is_loggedin().await {
3056            {
3057                let mut dlock = self.data.write().await;
3058                if !dlock.player_outgoing_friends.contains(&pid) {
3059                    return Ok(());
3060                }
3061                dlock.player_outgoing_friends.remove(&pid);
3062            }
3063            self.internal_iosend(Self::get_packet_write(
3064                &WritePacket::RequestChangeFriendStatus(ChangeFriendStatus::Cancel, pid),
3065            )?)
3066            .await?;
3067        }
3068        Ok(())
3069    }
3070
3071    /// Deny the received friend request of a player.
3072    pub async fn deny_incoming_friend(&self, pid: u64) -> IoResult<()> {
3073        if self.is_loggedin().await {
3074            {
3075                let mut dlock = self.data.write().await;
3076                if !dlock.player_incoming_friends.contains(&pid) {
3077                    return Ok(());
3078                }
3079                dlock.player_incoming_friends.remove(&pid);
3080            }
3081            self.internal_iosend(Self::get_packet_write(
3082                &WritePacket::RequestChangeFriendStatus(ChangeFriendStatus::Deny, pid),
3083            )?)
3084            .await?;
3085        }
3086        Ok(())
3087    }
3088
3089    /// Accept the received the friend request of a player.
3090    pub async fn accept_incoming_friend(&self, pid: u64) -> IoResult<()> {
3091        if self.is_loggedin().await {
3092            {
3093                let mut dlock = self.data.write().await;
3094                if !dlock.player_incoming_friends.contains(&pid)
3095                    || dlock.player_friends.contains(&pid)
3096                {
3097                    return Ok(());
3098                }
3099                dlock.player_incoming_friends.remove(&pid);
3100                dlock.player_friends.insert(pid);
3101            }
3102            self.internal_iosend(Self::get_packet_write(
3103                &WritePacket::RequestChangeFriendStatus(ChangeFriendStatus::Accept, pid),
3104            )?)
3105            .await?;
3106        }
3107        Ok(())
3108    }
3109
3110    /// Remove a friend from the player's friend list.
3111    pub async fn remove_friend(&self, pid: u64) -> IoResult<()> {
3112        if self.is_loggedin().await {
3113            {
3114                let mut dlock = self.data.write().await;
3115                if !dlock.player_friends.contains(&pid) {
3116                    return Ok(());
3117                }
3118                dlock.player_friends.remove(&pid);
3119            }
3120            self.internal_iosend(Self::get_packet_write(
3121                &WritePacket::RequestChangeFriendStatus(ChangeFriendStatus::Remove, pid),
3122            )?)
3123            .await?;
3124        }
3125        Ok(())
3126    }
3127}