1use crate::types::NewSync;
2
3use super::{
4 buffer::Buffer,
5 leb::Leb,
6 types::{
7 self, Achievement, AdminAction, Administrator, CallbackServerUpdate, ChangeFriendStatus,
8 DataUpdate, FetchBdbServerUpdate, Highscore, LoginCode, LoginPassw, NewSyncQueue,
9 OptionalValue, Player, PlayerQueue, PlayerRequest, PlayerVariableServerUpdate,
10 RegistrationCode, SelfSync, ServerUpdateCallback, SyncEvent, SyncIter, SyncType,
11 SyncUpdate, SyncVariableServerUpdate, Value, VariableUpdate,
12 },
13};
14use age::{
15 x25519::{Identity, Recipient},
16 Decryptor, Encryptor,
17};
18use bytes::Bytes;
19use chrono::{DateTime, Utc};
20use futures_util::{
21 stream::{SplitSink, SplitStream},
22 AsyncReadExt, AsyncWriteExt, SinkExt, Stream, StreamExt,
23};
24use integer_hasher::{IntMap, IntSet};
25use machineid_crystal::{Encryption, HWIDComponent, IdBuilder};
26use num_enum::TryFromPrimitive;
27use std::{
28 collections::{HashMap, HashSet},
29 io::{Cursor, Error, ErrorKind, Result as IoResult},
30 iter,
31 str::FromStr,
32 sync::Arc,
33};
34use tokio::{
35 net::TcpStream,
36 sync::{Mutex, RwLock},
37 task::JoinHandle,
38 time::Instant,
39};
40use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream};
41#[cfg(feature = "__dev")]
42use tracing::{info, warn};
43
44pub struct CrystalServer {
80 writer: Option<Arc<Mutex<StreamWriter>>>,
81 data: Arc<RwLock<StreamData>>,
82}
83
84struct StreamHandler;
85
86struct StreamReader {
87 stream: Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
88
89 pub identity: Option<Identity>,
90}
91
92struct StreamWriter {
93 stream: Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>,
94
95 pub recipient: Option<Recipient>,
96}
97
98type CallbackRoom = Box<dyn FnMut() -> String + Sync + Send>;
99type CallbackP2P = Box<dyn FnMut(Option<u64>, i16, Vec<Value>) + Sync + Send>;
100type CallbackRegister = Box<dyn FnMut(RegistrationCode) + Sync + Send>;
101type CallbackLogin = Box<dyn FnMut(LoginCode, Option<DateTime<Utc>>, Option<String>) + Sync + Send>;
102type CallbackBanned = Box<dyn FnMut(String, DateTime<Utc>) + Sync + Send>;
103type CallbackKicked = Box<dyn FnMut(String) + Sync + Send>;
104type CallbackDisconnected = Box<dyn FnMut() + Sync + Send>;
105type CallbackLoginToken = Box<dyn FnMut(String) + Sync + Send>;
106type CallbackDataUpdate = Box<dyn FnMut(DataUpdate) + Sync + Send>;
107
108#[derive(Default)]
109struct StreamData {
110 thread: Option<JoinHandle<()>>,
111 last_host: Option<String>,
112
113 is_connected: bool,
114 is_loggedin: bool,
115 is_connecting: bool,
116 is_reconnecting: bool,
117
118 game_id: String,
119 version: f64,
120 session: String,
121 game_token: String,
122 room: String,
123
124 func_room: Option<CallbackRoom>,
125 func_p2p: Option<CallbackP2P>,
126 func_register: Option<CallbackRegister>,
127 func_login: Option<CallbackLogin>,
128 func_banned: Option<CallbackBanned>,
129 func_kicked: Option<CallbackKicked>,
130 func_disconnected: Option<CallbackDisconnected>,
131 func_login_token: Option<CallbackLoginToken>,
132 func_data_update: Option<CallbackDataUpdate>,
133
134 player_id: Option<u64>,
135 player_name: Option<String>,
136 player_save: HashMap<String, Value>,
137 player_open_save: String,
138 player_friends: IntSet<u64>,
139 player_incoming_friends: IntSet<u64>,
140 player_outgoing_friends: IntSet<u64>,
141
142 game_save: HashMap<String, Value>,
143 game_open_save: String,
144 game_achievements: IntMap<Leb<u64>, Achievement>,
145 game_highscores: IntMap<Leb<u64>, Highscore>,
146 game_administrators: IntMap<Leb<u64>, Administrator>,
147 game_version: f64,
148
149 pub players: IntMap<u64, Player>,
150 players_logout: IntSet<u64>,
151 player_queue: IntMap<u64, PlayerQueue>,
152 variables: HashMap<String, Value>,
153 syncs: Vec<Option<SelfSync>>,
154 syncs_remove: Vec<usize>,
155
156 ping: f64,
157 last_ping: Option<Instant>,
158
159 new_sync_queue: Vec<NewSyncQueue>,
160 update_variable: HashSet<String>,
161 update_playerini: HashSet<String>,
162 update_gameini: HashSet<String>,
163 call_disconnected: bool,
164
165 callback_server_update: IntMap<u64, Option<CallbackServerUpdate>>,
166 callback_server_index: u64,
167
168 handshake_completed: bool,
169
170 registered_errors: Vec<ClientError>,
171}
172
173#[allow(dead_code)]
174#[derive(Debug)]
175enum ReaderError {
176 StreamError(String),
177 StreamEmpty(String),
178 StreamClosed(String),
179 Unknown(String),
180}
181
182#[allow(dead_code)]
183#[derive(Debug)]
184enum WriterError {
185 StreamError(String),
186 StreamClosed(String),
187 Unknown(String),
188}
189
190#[allow(dead_code, clippy::enum_variant_names)]
191#[derive(Debug)]
192enum ClientError {
193 HandlerResult(IoResult<()>),
194 HandlerResultString(String),
195 HandlerPanic(String),
196}
197
198macro_rules! unwrap_return {
199 ($value: expr, $return: expr) => {
200 if $value.is_err() {
201 return $return;
202 }
203 };
204 ($value: expr) => {
205 if $value.is_err() {
206 return;
207 }
208 };
209}
210
211impl StreamData {
212 pub async fn clear(&mut self, full: bool) {
213 self.is_loggedin = false;
214 self.call_disconnected = true;
215
216 self.player_name.take();
217 self.player_id.take();
218 self.player_save.clear();
219 self.player_open_save.clear();
220 self.player_queue.clear();
221 self.players_logout.clear();
222 self.new_sync_queue.clear();
223 self.update_variable.clear();
224 self.update_playerini.clear();
225 self.callback_server_update.clear();
226 self.callback_server_index = 0;
227 self.players.clear();
228
229 if full {
230 self.is_connecting = false;
231 self.is_connected = false;
232 self.is_reconnecting = false;
233
234 self.game_save.clear();
235 self.game_open_save.clear();
236 self.game_achievements.clear();
237 self.game_highscores.clear();
238 self.game_administrators.clear();
239 self.update_gameini.clear();
240
241 self.last_ping.take();
242 self.handshake_completed = false;
243 }
244 }
245}
246
247impl StreamHandler {
248 pub async fn split_stream(
249 stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
250 ) -> (StreamReader, StreamWriter) {
251 let split = stream.split();
252 (
253 StreamReader {
254 stream: Some(split.1),
255
256 identity: None,
257 },
258 StreamWriter {
259 stream: Some(split.0),
260
261 recipient: None,
262 },
263 )
264 }
265}
266
267impl StreamReader {
268 #[inline(always)]
269 pub async fn read(&mut self) -> Result<Buffer, ReaderError> {
270 if let Some(stream) = self.stream.as_mut() {
271 if let Some(Ok(frame)) = stream.next().await {
272 if frame.is_binary() {
273 let data = frame.into_data();
274 if !data.is_empty() {
275 if let Some(identity) = &self.identity {
276 let len = data.len();
277 if let Ok(decryptor) =
278 Decryptor::new_async_buffered(data.to_vec().as_slice()).await
279 {
280 if let Ok(mut reader) =
281 decryptor.decrypt_async(iter::once(identity as _))
282 {
283 let mut output = Vec::with_capacity(len);
284 if reader.read_to_end(&mut output).await.is_ok() {
285 Ok(Buffer::new(Cursor::new(output)))
286 } else {
287 Err(ReaderError::Unknown(String::from(
288 "decryptor reader errored out while reading",
289 )))
290 }
291 } else {
292 Err(ReaderError::Unknown(String::from(
293 "unable to make decryptor reader",
294 )))
295 }
296 } else {
297 Err(ReaderError::Unknown(String::from(
298 "unable to make decryptor",
299 )))
300 }
301 } else {
302 Ok(Buffer::new(Cursor::new(data.to_vec())))
303 }
304 } else {
305 Err(ReaderError::StreamEmpty(format!(
306 "tried to read {} byte(s) from ws",
307 data.len(),
308 )))
309 }
310 } else if frame.is_close() {
311 Err(ReaderError::StreamClosed(String::from(
312 "ws stream requested to close",
313 )))
314 } else if frame.is_ping() {
315 Ok(Buffer::empty())
316 } else {
317 Err(ReaderError::Unknown(format!(
318 "obtained an unexpected code for ws: {frame:?}",
319 )))
320 }
321 } else {
322 Err(ReaderError::StreamClosed(String::from(
323 "unable to obtain the next ws frame",
324 )))
325 }
326 } else {
327 Err(ReaderError::Unknown(String::from(
328 "no stream open to read from",
329 )))
330 }
331 }
332
333 }
338
339impl StreamWriter {
340 #[inline(always)]
341 pub async fn prepare_buffer(&self, mut buffer: Buffer) -> IoResult<Buffer> {
342 if let Some(recipient) = &self.recipient {
343 let encryptor = Encryptor::with_recipients(iter::once(recipient as _))
344 .expect("unable to make encryptor");
345 let mut output = Vec::with_capacity(buffer.len()? as usize);
346 let mut writer = encryptor.wrap_async_output(&mut output).await?;
347 writer.write_all(buffer.container.get_ref()).await?;
348 writer.flush().await?;
349 writer.close().await?;
350 buffer.container = Cursor::new(output);
351 }
352 Ok(buffer)
353 }
354
355 #[inline(always)]
356 pub async fn write(&mut self, data: Buffer) -> Result<(), WriterError> {
357 if let Ok(mut data) = self.prepare_buffer(data).await {
358 self.write_raw(&mut data).await
359 } else {
360 Err(WriterError::StreamError(String::from(
361 "unable to fetch new buffer",
362 )))
363 }
364 }
365
366 #[inline(always)]
367 pub async fn write_raw(&mut self, data: &mut Buffer) -> Result<(), WriterError> {
368 let data = {
371 let mut d = Vec::new();
372 unwrap_return!(
373 data.read_all(&mut d),
374 Err(WriterError::Unknown(String::from(
375 "unable to convert buffer into bytes"
376 )))
377 );
378 d
379 };
380 if let Some(stream) = self.stream.as_mut() {
381 unwrap_return!(
382 stream
383 .send(Message::Binary(Bytes::copy_from_slice(&data)))
384 .await,
385 Err(WriterError::StreamError(format!(
386 "unable to write {:?} byte(s) to a ws",
387 data.len(),
388 )))
389 );
390 unwrap_return!(
391 stream.flush().await,
392 Err(WriterError::StreamError(String::from(
393 "unable to flush the writer of a ws {:?}",
394 )))
395 );
396 Ok(())
397 } else {
398 Err(WriterError::Unknown(String::from(
399 "no stream open to write to",
400 )))
401 }
402 }
403
404 #[inline(always)]
405 pub async fn write_pong(&mut self) -> IoResult<()> {
406 if let Some(stream) = self.stream.as_mut() {
407 if stream.send(Message::Pong(Bytes::new())).await.is_err() {
408 Err(Error::from(ErrorKind::BrokenPipe))
409 } else {
410 Ok(())
411 }
412 } else {
413 Err(Error::from(ErrorKind::BrokenPipe))
414 }
415 }
416
417 #[inline(always)]
418 pub async fn shutdown(&mut self) {
419 if let Some(mut stream) = self.stream.take() {
420 let _ = stream.close().await;
421 }
422 }
423}
424
425#[derive(Debug, Clone)]
426enum ReadPacket {
427 Registration(RegistrationCode),
429 Login(LoginCode),
431 LoginBan(LoginCode, String, i64),
433 LoginOk(
435 u64,
436 String,
437 Option<String>,
438 HashMap<String, Value>,
439 IntSet<Leb<u64>>,
440 IntSet<Leb<u64>>,
441 IntSet<Leb<u64>>,
442 IntMap<Leb<u64>, Achievement>,
443 ),
444 PlayerLoggedIn(
446 u64,
447 String,
448 HashMap<String, Value>,
449 Vec<Option<types::Sync>>,
450 String,
451 ),
452 PlayerLoggedOut(u64),
454 SyncGameInfo(
456 HashMap<String, Value>,
457 IntMap<Leb<u64>, Achievement>,
458 IntMap<Leb<u64>, Highscore>,
459 IntMap<Leb<u64>, Administrator>,
460 f64,
461 ),
462 P2P(Option<Leb<u64>>, i16, Vec<Value>),
464 UpdatePlayerVariable(u64, Vec<VariableUpdate>),
466 Ping(Option<f64>),
468 ClearPlayers(),
469 GameIniWrite(Vec<VariableUpdate>),
471 NewSync(u64, Vec<(u64, i16, SyncType, HashMap<String, Value>)>),
473 PlayerChangedRooms(u64, String),
475 UpdateSync(u64, Vec<SyncUpdate>),
477 HighscoreUpdate(u64, u64, f64),
479 UpdatePlayerData(u64, Vec<Option<types::Sync>>, HashMap<String, Value>),
481 RequestPlayerVariable(u64, OptionalValue),
483 AdminAction(AdminAction),
485 RequestSyncVariable(u64, OptionalValue),
487 ChangeGameVersion(f64),
489 ModifyAdministrator(u64, Administrator),
491 RemoveAdministrator(u64),
493 ForceDisconnection(),
494 PlayerIniWrite(Vec<VariableUpdate>),
496 RequestBdb(u64, Option<Vec<u8>>),
498 ChangeFriendStatus(ChangeFriendStatus, u64),
500 Handshake(Option<String>),
502 ServerMessage(String),
504 ChangeConnection(String),
506}
507
508#[derive(Debug, Clone)]
509#[doc(hidden)]
510enum WritePacket {
511 InitializationHandshake([u64; 4], u64, String, String, f64, String),
513 Login(
515 String,
516 LoginPassw,
517 String,
518 HashMap<String, Value>,
519 Vec<Option<SelfSync>>,
520 String,
521 ),
522 Register(String, String, String, String),
524 RequestPlayerVariable(PlayerRequest, u64, String),
526 P2P(PlayerRequest, i16, Vec<Value>),
528 UpdateGameVersion(f64),
530 UpdateGameSession(String),
532 UpdatePlayerVariable(Vec<VariableUpdate>),
534 Ping(),
535 GameIniWrite(Vec<VariableUpdate>),
537 PlayerIniWrite(Vec<VariableUpdate>),
539 UpdateRoom(String),
541 NewSync(Vec<(u64, i16, SyncType, HashMap<String, Value>)>),
543 UpdateSync(Vec<SyncUpdate>),
545 UpdateAchievement(u64),
547 UpdateHighscore(u64, f64),
549 AdminAction(AdminAction, u64),
551 RequestSyncVariable(u64, u64, u64, String),
553 Logout(),
554 RequestBdb(u64, String),
556 SetBdb(String, Vec<u8>),
558 RequestChangeFriendStatus(ChangeFriendStatus, u64),
560 Handshake(String),
562}
563
564impl CrystalServer {
565 pub fn init(game_id: &str) -> Self {
566 Self {
567 writer: None,
568 data: Arc::new(RwLock::new(StreamData {
569 game_id: game_id.to_owned(),
570 ..Default::default()
571 })),
572 }
573 }
574
575 pub async fn connect(&mut self) {
577 #[cfg(feature = "__dev")]
578 info!("Connecting to the server");
579 {
580 #[cfg(feature = "__dev")]
581 info!("Fetching stream data");
582 let mut lock = self.data.write().await;
583 if let Some(thread) = &mut lock.thread {
584 #[cfg(feature = "__dev")]
585 info!("Aborting thread");
586 if !thread.is_finished() {
587 thread.abort();
588 }
589 }
590 lock.clear(true).await;
591 lock.is_connecting = true;
592 #[cfg(feature = "__dev")]
593 info!("Stream data OK");
594 }
595 let url = if let Some(url) = self.data.read().await.last_host.clone() {
596 url
597 } else if cfg!(feature = "__local") {
598 String::from("ws://localhost:16562")
599 } else {
600 String::from("ws://server.crystal-server.co:16562")
601 };
602 #[cfg(feature = "__dev")]
603 info!("Connecting to server at {url}");
604 match tokio_tungstenite::connect_async(url).await {
605 Ok((ws, _)) => {
606 #[cfg(feature = "__dev")]
607 info!("Connection OK, initializing");
608 let stream = StreamHandler::split_stream(ws).await;
609 let writer = Arc::new(Mutex::new(stream.1));
610 self.writer = Some(writer.clone());
611 #[cfg(feature = "__dev")]
612 info!("Stream setup OK");
613 let thread =
614 tokio::spawn(Self::stream_handler(stream.0, writer, self.data.clone()));
615 {
616 let mut lock = self.data.write().await;
617 lock.thread = Some(thread);
618 #[cfg(feature = "__dev")]
619 info!("Stream data thread OK");
620 }
621 }
622 Err(_e) => {
623 #[cfg(feature = "__dev")]
624 info!("Connection error: {_e:?}");
625 let mut dlock = self.data.write().await;
626 dlock.clear(true).await;
627 if dlock.call_disconnected {
628 if let Some(func) = dlock.func_disconnected.as_mut() {
629 func();
630 }
631 if let Some(dup) = dlock.func_data_update.as_mut() {
632 dup(DataUpdate::Disconnected());
633 }
634 dlock.call_disconnected = false;
635 }
636 if dlock.last_host.take().is_some() {
637 drop(dlock);
638 Box::pin(self.connect()).await;
639 } else {
640 dlock.is_connecting = false;
641 }
642 }
643 }
644 }
645
646 async fn stream_handler(
647 mut reader: StreamReader,
648 writer: Arc<Mutex<StreamWriter>>,
649 data: Arc<RwLock<StreamData>>,
650 ) {
651 let cdata = data.clone();
652 let cwriter = writer.clone();
653
654 let task = tokio::spawn(async move {
655 macro_rules! write_packet {
656 ($packet: expr) => {
657 unwrap_return!(
658 writer
659 .lock()
660 .await
661 .write(Self::get_packet_write(&$packet)?)
662 .await,
663 Err(Error::from(ErrorKind::BrokenPipe))
664 );
665 };
666 }
667
668 #[cfg(feature = "__dev")]
669 info!("Initialized stream handle task");
670
671 loop {
672 match reader.read().await {
673 Ok(mut buffer) => {
674 if buffer.is_empty()? {
675 continue;
676 }
677 if let Ok(packet) = CrystalServer::get_packet_read(buffer) {
678 #[cfg(feature = "__dev")]
679 info!("reading packet: {packet:?}");
680 match packet {
681 ReadPacket::Handshake(key) => {
682 if let Some(key) = key {
683 let encode_read = Identity::generate();
684 let encodepub_read = encode_read.to_public();
685 reader.identity = Some(encode_read);
686 match Recipient::from_str(&key) {
687 Ok(key) => {
688 {
689 let mut dlock = data.write().await;
690 dlock.is_connecting = false;
691 dlock.is_reconnecting = false;
692 dlock.is_connected = true;
693 }
694 writer.lock().await.recipient = Some(key);
695 }
696 Err(_e) => {
697 #[cfg(feature = "__dev")]
698 warn!("error while parsing key: {_e:?}");
699 {
700 let mut dlock = data.write().await;
701 dlock.clear(true).await;
702 dlock.registered_errors.push(
703 ClientError::HandlerResultString(String::from(
704 "unable to set write key for data writer",
705 )),
706 );
707 }
708 return Ok(());
709 }
710 }
711 write_packet!(WritePacket::Handshake(
712 encodepub_read.to_string(),
713 ));
714 } else {
715 let hwid = if let Ok(hwid) =
716 IdBuilder::new(Encryption::SHA256)
717 .add_component(HWIDComponent::CPUID)
718 .add_component(HWIDComponent::MacAddress)
719 .add_component(HWIDComponent::SystemID)
720 .build(None)
721 {
722 hwid
723 } else {
724 return Err(Error::from(ErrorKind::InvalidData));
725 };
726 let dlock = data.read().await;
727 write_packet!(WritePacket::InitializationHandshake(
728 [
729 0x3a0b1a04c51a2811,
730 0x97a18f1dc9ee891d,
731 0xfc7eb64f732a37fd,
732 0xbe65cbabde15c305,
733 ],
734 1,
735 hwid,
736 dlock.game_id.clone(),
737 dlock.version,
738 dlock.session.clone()
739 ));
740 }
741 }
742 ReadPacket::SyncGameInfo(
743 game_save,
744 game_achievements,
745 game_highscores,
746 game_administrators,
747 game_version,
748 ) => {
749 let mut dlock = data.write().await;
750 dlock.game_save = game_save;
751 dlock.game_achievements = game_achievements;
752 dlock.game_highscores = game_highscores;
753 dlock.game_administrators = game_administrators;
754 dlock.game_version = game_version;
755 dlock.handshake_completed = true;
756 }
757 ReadPacket::Ping(ping) => {
758 if let Some(ping) = ping {
759 let mut dlock = data.write().await;
760 dlock.ping = ping;
761 dlock.last_ping = Some(Instant::now());
762 } else {
763 {
764 write_packet!(WritePacket::Ping());
765 }
766 writer.lock().await.write_pong().await?;
767 }
768 }
769 ReadPacket::ForceDisconnection() => {
770 return Err(Error::new(
771 ErrorKind::BrokenPipe,
772 "forced disconnection registered",
773 ));
774 }
775 ReadPacket::Registration(code) => {
776 let mut dlock = data.write().await;
777 if let Some(reg) = &mut dlock.func_register {
778 reg(code);
779 }
780 if let Some(dup) = &mut dlock.func_data_update {
781 dup(DataUpdate::Registration(code));
782 }
783 }
784 ReadPacket::Login(code) => {
785 let mut dlock = data.write().await;
786 if let Some(reg) = &mut dlock.func_login {
787 reg(code, None, None);
788 }
789 if let Some(dup) = &mut dlock.func_data_update {
790 dup(DataUpdate::Login(code));
791 }
792 }
793 ReadPacket::LoginOk(
794 pid,
795 pname,
796 token,
797 savefile,
798 friends,
799 incoming_friends,
800 outgoing_friends,
801 game_achievements,
802 ) => {
803 let mut dlock = data.write().await;
804 dlock.player_id = Some(pid);
805 dlock.player_name = Some(pname.clone());
806 dlock.player_save = savefile;
807 dlock.game_achievements = game_achievements;
808 dlock.player_friends =
809 IntSet::from_iter(friends.iter().map(|pid| **pid));
810 dlock.player_incoming_friends =
811 IntSet::from_iter(incoming_friends.iter().map(|pid| **pid));
812 dlock.player_outgoing_friends =
813 IntSet::from_iter(outgoing_friends.iter().map(|pid| **pid));
814 dlock.is_loggedin = true;
815 if let Some(log) = &mut dlock.func_login {
816 log(LoginCode::Ok, None, None);
817 }
818 if let Some(dup) = &mut dlock.func_data_update {
819 dup(DataUpdate::LoginOk(pid, pname));
820 }
821 if let Some(token) = token {
822 if let Some(log) = &mut dlock.func_login_token {
823 log(token.clone());
824 }
825 if let Some(dup) = &mut dlock.func_data_update {
826 dup(DataUpdate::LoginToken(token));
827 }
828 }
829 }
830 ReadPacket::LoginBan(code, reason, unban_time) => {
831 let mut dlock = data.write().await;
832 if let Some(log) = &mut dlock.func_login {
833 log(
834 code,
835 DateTime::from_timestamp(unban_time, 0),
836 Some(reason.clone()),
837 );
838 }
839 if let Some(dup) = &mut dlock.func_data_update {
840 dup(DataUpdate::LoginBan(code, reason.clone(), unban_time));
841 }
842 }
843 ReadPacket::PlayerLoggedIn(pid, pname, vari, syncs, room) => {
844 let mut dlock = data.write().await;
845 dlock.players.insert(
846 pid,
847 Player {
848 name: pname.clone(),
849 room: room.clone(),
850 syncs,
851 variables: vari,
852 },
853 );
854 Self::iter_missing_data(&mut dlock, pid).await?;
855 if let Some(dup) = &mut dlock.func_data_update {
856 dup(DataUpdate::PlayerLoggedIn(pid, pname, room));
857 }
858 }
859 ReadPacket::PlayerLoggedOut(pid) => {
860 let mut dlock = data.write().await;
861 dlock.players.remove(&pid);
862 dlock.player_queue.remove(&pid);
863 if let Some(dup) = &mut dlock.func_data_update {
864 dup(DataUpdate::PlayerLoggedOut(pid));
865 }
866 }
867 ReadPacket::P2P(pid, mid, payload) => {
868 let mut dlock = data.write().await;
869 if let Some(p2p) = &mut dlock.func_p2p {
870 p2p(pid.map(|v| *v), mid, payload.clone());
871 }
872 if let Some(dup) = &mut dlock.func_data_update {
873 dup(DataUpdate::P2P(pid.map(|v| *v), mid, payload));
874 }
875 }
876 ReadPacket::UpdatePlayerVariable(pid, upds) => {
877 let mut dlock = data.write().await;
878 Self::iter_missing_data(&mut dlock, pid).await?;
879 if let Some(player) = dlock.players.get_mut(&pid) {
880 for upd in &upds {
881 if let OptionalValue::Some(value) = upd.value.clone() {
882 player
883 .variables
884 .insert(upd.name.clone(), value.clone());
885 } else {
886 player.variables.remove(&upd.name);
887 }
888 }
889 for upd in upds {
890 if let Some(dup) = &mut dlock.func_data_update {
891 dup(DataUpdate::UpdateVariable(
892 pid, upd.name, upd.value,
893 ));
894 }
895 }
896 } else {
897 if !dlock.player_queue.contains_key(&pid) {
898 dlock
899 .player_queue
900 .entry(pid)
901 .or_insert(PlayerQueue::default());
902 }
903 let pq = dlock
904 .player_queue
905 .get_mut(&pid)
906 .expect("expected to have data stored");
907 for upd in upds {
908 pq.variables.insert(upd.name, upd.value);
909 }
910 }
911 }
912 ReadPacket::UpdateSync(pid, upds) => {
913 let mut dlock = data.write().await;
915 Self::iter_missing_data(&mut dlock, pid).await?;
916 let mut exists = IntSet::default();
917 let mut add_pq = IntSet::default();
918 #[cfg(feature = "__dev")]
919 info!("UpdateSync >>> {pid:?},{upds:?}");
920 if let Some(player) = dlock.players.get_mut(&pid) {
921 #[cfg(feature = "__dev")]
922 info!("Got Player");
923 for upd in &upds {
924 #[cfg(feature = "__dev")]
925 info!("Iterating over update {upd:?}");
926 if let Some(Some(sync)) = player.syncs.get_mut(upd.slot)
927 {
928 #[cfg(feature = "__dev")]
929 info!("Got sync {sync:?}");
930 exists.insert(upd.slot);
931 if let Some(vari) = &upd.variables {
932 #[cfg(feature = "__dev")]
933 info!("Got variable updates: {vari:?}");
934 for (vname, value) in vari {
935 #[cfg(feature = "__dev")]
936 info!(
937 "Itering over {vname:?} >>> {value:?}"
938 );
939 if let OptionalValue::Some(value) =
940 value.clone()
941 {
942 #[cfg(feature = "__dev")]
943 info!("Set");
944 sync.variables
945 .insert(vname.clone(), value);
946 } else {
947 #[cfg(feature = "__dev")]
948 info!("Removed");
949 sync.variables.remove(vname);
950 }
951 }
952 } else if upd.remove_sync {
953 #[cfg(feature = "__dev")]
954 info!("Marked as sync is ending");
955 sync.is_ending = true;
956 } else {
957 #[cfg(feature = "__dev")]
958 info!("No updates triggered");
959 }
960 #[cfg(feature = "__dev")]
961 info!("Finished iterating over sync {sync:?}");
962 } else {
963 #[cfg(feature = "__dev")]
964 info!("Added sync to PlayerQueue");
965 add_pq.insert(pid);
966 }
967 }
968 } else {
969 #[cfg(feature = "__dev")]
970 info!("Added to PlayerQueue");
971 add_pq.insert(pid);
972 }
973 for slot in exists {
974 if let Some(upd) =
975 upds.iter().find(|supd| supd.slot == slot)
976 {
977 if let Some(dup) = &mut dlock.func_data_update {
978 if let Some(vari) = &upd.variables {
979 for (vname, value) in vari {
980 dup(DataUpdate::UpdateSyncVariable(
981 pid,
982 slot,
983 vname.clone(),
984 value.clone(),
985 ));
986 }
987 } else if upd.remove_sync {
988 dup(DataUpdate::UpdateSyncRemoval(pid, slot));
989 }
990 }
991 }
992 }
993 for pid in add_pq {
994 for upd in &upds {
995 if let Some(player) = dlock.players.get(&pid) {
996 if let Some(Some(_)) = player.syncs.get(upd.slot) {
997 continue;
998 }
999 }
1000 dlock
1001 .player_queue
1002 .entry(pid)
1003 .or_insert(PlayerQueue::default());
1004 let pq = dlock
1005 .player_queue
1006 .get_mut(&pid)
1007 .expect("expected to have data stored");
1008 if let Some(vari) = &upd.variables {
1009 pq.syncs.entry(upd.slot).or_insert(HashMap::new());
1010 let su = pq
1011 .syncs
1012 .get_mut(&upd.slot)
1013 .expect("expected to have data stored");
1014 for (vname, value) in vari {
1015 su.insert(vname.clone(), value.clone());
1016 }
1017 } else if upd.remove_sync
1018 && !pq.remove_syncs.contains(&upd.slot)
1019 {
1020 pq.remove_syncs.push(upd.slot);
1021 }
1022 }
1023 }
1024 }
1025 ReadPacket::ClearPlayers() => {
1026 let mut dlock = data.write().await;
1027 for pid in dlock.players.keys().cloned().collect::<Vec<u64>>() {
1028 if let Some(dup) = &mut dlock.func_data_update {
1029 dup(DataUpdate::PlayerLoggedOut(pid));
1030 }
1031 }
1032 dlock.players.clear();
1033 }
1034 ReadPacket::GameIniWrite(upds) => {
1035 let mut dlock = data.write().await;
1036 for upd in upds {
1037 if let OptionalValue::Some(value) = upd.value.clone() {
1038 dlock.game_save.insert(upd.name.clone(), value.clone());
1039 } else {
1040 dlock.game_save.remove(&upd.name);
1041 }
1042 if let Some(dup) = &mut dlock.func_data_update {
1043 let keys = upd
1044 .name
1045 .split(">")
1046 .map(|entry| {
1047 urlencoding::decode(entry).expect(
1048 "unable to decode uri on gameini update",
1049 )
1050 })
1051 .collect::<Vec<_>>();
1052 if keys.len() == 3 {
1053 dup(DataUpdate::UpdateGameIni(
1054 Some(
1055 keys.first()
1056 .expect("unable to fetch key 0")
1057 .to_string(),
1058 ),
1059 keys.get(1)
1060 .expect("unable to fetch key 1")
1061 .to_string(),
1062 keys.get(2)
1063 .expect("unable to fetch key 2")
1064 .to_string(),
1065 upd.value,
1066 ));
1067 } else {
1068 dup(DataUpdate::UpdateGameIni(
1069 None,
1070 keys.first()
1071 .expect("unable to fetch key 0 on empty")
1072 .to_string(),
1073 keys.get(1)
1074 .expect("unable to fetch key 1 on empty")
1075 .to_string(),
1076 upd.value,
1077 ));
1078 }
1079 }
1080 }
1081 }
1082 ReadPacket::NewSync(pid, upds) => {
1083 let mut dlock = data.write().await;
1084 for (slot, kind, stype, vari) in upds {
1085 if let Some(player) = dlock.players.get_mut(&pid) {
1086 player.syncs.insert(
1087 slot as usize,
1088 Some(types::Sync {
1089 kind,
1090 sync_type: stype,
1091 variables: vari,
1092 event: SyncEvent::New,
1093 is_ending: false,
1094 }),
1095 );
1096 Self::iter_missing_data(&mut dlock, pid).await?;
1097 } else {
1098 dlock
1099 .player_queue
1100 .entry(pid)
1101 .or_insert(PlayerQueue::default());
1102 let pq = dlock
1103 .player_queue
1104 .get_mut(&pid)
1105 .expect("expected to have data stored");
1106 pq.new_syncs.push(NewSync {
1107 kind,
1108 slot: slot as usize,
1109 sync_type: stype,
1110 variables: vari,
1111 });
1112 }
1113 }
1114 }
1115 ReadPacket::PlayerChangedRooms(pid, room) => {
1116 let mut dlock = data.write().await;
1117 Self::iter_missing_data(&mut dlock, pid).await?;
1118 if let Some(player) = dlock.players.get_mut(&pid) {
1119 player.room = room;
1120 }
1121 }
1122 ReadPacket::HighscoreUpdate(pid, hid, score) => {
1123 let mut dlock = data.write().await;
1124 if let Some(highscore) =
1125 dlock.game_highscores.get_mut(&Leb(hid))
1126 {
1127 highscore.scores.insert(Leb(pid), score);
1128 }
1129 }
1130 ReadPacket::UpdatePlayerData(pid, syncs, vari) => {
1131 let mut dlock = data.write().await;
1132 #[cfg(feature = "__dev")]
1133 info!("{pid:?}, {syncs:?}, {vari:?}");
1134 if dlock.players.contains_key(&pid) {
1135 #[cfg(feature = "__dev")]
1136 info!("OK");
1137 dlock.player_queue.remove(&pid);
1138 if let Some(player) = dlock.players.get_mut(&pid) {
1139 player.syncs = syncs;
1140 player.variables = vari;
1141 #[cfg(feature = "__dev")]
1142 info!("{player:?}");
1143 }
1144 }
1145 #[cfg(feature = "__dev")]
1146 info!("END");
1147 }
1148 ReadPacket::RequestPlayerVariable(index, vari) => {
1149 let mut dlock = data.write().await;
1150 #[cfg(feature = "__dev")]
1151 info!(
1152 "RequestPlayerVariable->{:?}->{index:?}",
1153 dlock.callback_server_update
1154 );
1155 if let Some(Some(csu)) = dlock
1156 .callback_server_update
1157 .get_mut(&index)
1158 .map(|csu| csu.take())
1159 {
1160 if let ServerUpdateCallback::PlayerVariable(callback, pid) =
1161 csu.callback
1162 {
1163 Self::iter_missing_data(&mut dlock, pid).await?;
1164 if let Some(player) = dlock.players.get_mut(&pid) {
1165 if let OptionalValue::Some(value) = vari.clone() {
1166 player
1167 .variables
1168 .insert(csu.name.clone(), value);
1169 } else {
1170 player.variables.remove(&*csu.name);
1171 }
1172 }
1173 if let Some(mut callback) = callback {
1174 callback(pid, csu.name, vari);
1175 }
1176 }
1177 }
1178 }
1179 ReadPacket::AdminAction(aa) => {
1180 let mut dlock = data.write().await;
1181 dlock.clear(false).await;
1182 dlock.is_loggedin = false;
1183 match aa.clone() {
1184 AdminAction::Ban(reason, unban_time) => {
1185 if let Some(callback) = &mut dlock.func_banned {
1186 callback(
1187 reason.clone(),
1188 DateTime::from_timestamp(unban_time, 0)
1189 .expect("unable to parse ban timestamp"),
1190 );
1191 }
1192 if let Some(dup) = dlock.func_data_update.as_mut() {
1193 dup(DataUpdate::Banned(
1194 reason,
1195 DateTime::from_timestamp(unban_time, 0)
1196 .expect("unable to parse ban timestamp"),
1197 ));
1198 }
1199 }
1200 AdminAction::Kick(reason) => {
1201 if let Some(callback) = &mut dlock.func_kicked {
1202 callback(reason.clone());
1203 }
1204 if let Some(dup) = dlock.func_data_update.as_mut() {
1205 dup(DataUpdate::Kicked(reason));
1206 }
1207 }
1208 AdminAction::Unban => {
1209 panic!("this should never happen");
1210 }
1211 }
1212 if let Some(dup) = &mut dlock.func_data_update {
1213 dup(DataUpdate::AdminAction(aa));
1214 }
1215 }
1216 ReadPacket::RequestSyncVariable(index, vari) => {
1217 let mut dlock = data.write().await;
1218 if let Some(Some(csu)) =
1219 dlock.callback_server_update.remove(&index)
1220 {
1221 if let ServerUpdateCallback::SyncVariable(
1222 callback,
1223 pid,
1224 slot,
1225 ) = csu.callback
1226 {
1227 let obtained = if let Some(player) =
1228 dlock.players.get_mut(&pid)
1229 {
1230 if let Some(Some(sync)) = player.syncs.get_mut(slot)
1231 {
1232 if let OptionalValue::Some(value) = vari.clone()
1233 {
1234 sync.variables
1235 .insert(csu.name.clone(), value);
1236 } else {
1237 sync.variables.remove(&*csu.name);
1238 }
1239 }
1240 true
1241 } else {
1242 false
1243 };
1244 if obtained {
1245 if let Some(dup) = &mut dlock.func_data_update {
1246 dup(DataUpdate::UpdateSyncVariable(
1247 pid,
1248 slot,
1249 csu.name.clone(),
1250 vari.clone(),
1251 ));
1252 }
1253 }
1254 if let Some(mut callback) = callback {
1255 callback(pid, csu.name, vari);
1256 }
1257 }
1258 }
1259 }
1260 ReadPacket::ChangeGameVersion(ver) => {
1261 let mut dlock = data.write().await;
1262 dlock.game_version = ver;
1263 if let Some(dup) = &mut dlock.func_data_update {
1264 dup(DataUpdate::UpdateGameVersion(ver));
1265 }
1266 }
1267 ReadPacket::ModifyAdministrator(pid, admin) => {
1268 let mut dlock = data.write().await;
1269 dlock.game_administrators.insert(Leb(pid), admin);
1270 if let Some(dup) = &mut dlock.func_data_update {
1271 dup(DataUpdate::UpdateAdministrator(pid, Some(admin)));
1272 }
1273 }
1274 ReadPacket::RemoveAdministrator(pid) => {
1275 let mut dlock = data.write().await;
1276 dlock.game_administrators.remove(&Leb(pid));
1277 if let Some(dup) = &mut dlock.func_data_update {
1278 dup(DataUpdate::UpdateAdministrator(pid, None));
1279 }
1280 }
1281 ReadPacket::PlayerIniWrite(upds) => {
1282 let mut dlock = data.write().await;
1283 for upd in upds {
1284 if let OptionalValue::Some(value) = upd.value.clone() {
1285 dlock.game_save.insert(upd.name.clone(), value.clone());
1286 } else {
1287 dlock.game_save.remove(&upd.name);
1288 }
1289 if let Some(dup) = &mut dlock.func_data_update {
1290 let keys = upd
1291 .name
1292 .split(">")
1293 .map(|entry| {
1294 urlencoding::decode(entry).expect(
1295 "unable to decode uri on playerini update",
1296 )
1297 })
1298 .collect::<Vec<_>>();
1299 if keys.len() == 3 {
1300 dup(DataUpdate::UpdatePlayerIni(
1301 Some(
1302 keys.first()
1303 .expect("unable to fetch key 0")
1304 .to_string(),
1305 ),
1306 keys.get(1)
1307 .expect("unable to fetch key 1")
1308 .to_string(),
1309 keys.first()
1310 .expect("unable to fetch key 2")
1311 .to_string(),
1312 upd.value,
1313 ));
1314 } else {
1315 dup(DataUpdate::UpdatePlayerIni(
1316 None,
1317 keys.first()
1318 .expect("unable to fetch key 0 on empty")
1319 .to_string(),
1320 keys.get(1)
1321 .expect("unable to fetch key 1 on empty")
1322 .to_string(),
1323 upd.value,
1324 ));
1325 }
1326 }
1327 }
1328 }
1329 ReadPacket::RequestBdb(index, bdb) => {
1330 let mut dlock = data.write().await;
1331 if let Some(Some(mut csu)) = dlock
1332 .callback_server_update
1333 .get_mut(&index)
1334 .map(|csu| csu.take())
1335 {
1336 if let ServerUpdateCallback::FetchBdb(Some(callback)) =
1337 &mut csu.callback
1338 {
1339 callback(csu.name.clone(), bdb.clone());
1340 }
1341 if let Some(dup) = &mut dlock.func_data_update {
1342 dup(DataUpdate::FetchBdb(csu.name.clone(), bdb));
1343 }
1344 }
1345 }
1346 ReadPacket::ChangeFriendStatus(cfs, pid) => {
1347 let mut dlock = data.write().await;
1348 match cfs {
1350 ChangeFriendStatus::Request => {
1351 dlock.player_incoming_friends.insert(pid);
1352 dlock.player_friends.remove(&pid);
1353 dlock.player_outgoing_friends.remove(&pid);
1354 }
1355 ChangeFriendStatus::Accept | ChangeFriendStatus::Friend => {
1356 dlock.player_friends.insert(pid);
1357 dlock.player_incoming_friends.remove(&pid);
1358 dlock.player_outgoing_friends.remove(&pid);
1359 }
1360 ChangeFriendStatus::Deny
1361 | ChangeFriendStatus::Cancel
1362 | ChangeFriendStatus::Remove
1363 | ChangeFriendStatus::NotFriend => {
1364 dlock.player_friends.remove(&pid);
1365 dlock.player_incoming_friends.remove(&pid);
1366 dlock.player_outgoing_friends.remove(&pid);
1367 }
1368 }
1369 if let Some(dup) = &mut dlock.func_data_update {
1370 dup(DataUpdate::ChangeFriendStatus(pid));
1371 }
1372 }
1373 ReadPacket::ServerMessage(msg) => {
1374 let mut dlock = data.write().await;
1375 if let Some(dup) = &mut dlock.func_data_update {
1376 dup(DataUpdate::ServerMessage(msg.clone()));
1377 }
1378 }
1379 ReadPacket::ChangeConnection(host) => {
1380 let mut dlock = data.write().await;
1381 dlock.is_connecting = true;
1382 dlock.is_reconnecting = true;
1383 if let Some(dup) = &mut dlock.func_data_update {
1384 dup(DataUpdate::Reconnecting());
1385 }
1386 if let Ok((ws, _)) =
1387 tokio_tungstenite::connect_async(&host).await
1388 {
1389 let (sread, swrite) = StreamHandler::split_stream(ws).await;
1390 *writer.lock().await = swrite;
1391 reader = sread;
1392 dlock.last_host = Some(host);
1393 } else {
1394 return Err(Error::new(
1395 ErrorKind::ConnectionRefused,
1396 "unable to connect to new host",
1397 ));
1398 }
1399 }
1400 }
1401 }
1402 }
1403 Err(_e) => {
1404 return Err(Error::new(ErrorKind::BrokenPipe, "invalid read data"));
1405 }
1406 }
1407 }
1408 });
1409 let mut dlock;
1410 if let Err(_e) = task.await {
1411 dlock = cdata.write().await;
1412 #[cfg(feature = "__dev")]
1413 info!("Stream handler closed with error: {_e:?}");
1414 dlock.registered_errors.push(if _e.is_panic() {
1415 ClientError::HandlerPanic(format!("{:?}", _e.into_panic()))
1416 } else {
1417 ClientError::HandlerResult(Ok(())) });
1419 } else {
1420 dlock = cdata.write().await;
1421 dlock
1422 .registered_errors
1423 .push(ClientError::HandlerResult(Ok(())));
1424 }
1425 {
1426 dlock.clear(true).await;
1427 if dlock.call_disconnected {
1428 if let Some(func) = dlock.func_disconnected.as_mut() {
1429 func();
1430 }
1431 if let Some(dup) = dlock.func_data_update.as_mut() {
1432 dup(DataUpdate::Disconnected());
1433 }
1434 dlock.call_disconnected = false;
1435 }
1436 }
1437 cwriter.lock().await.shutdown().await;
1438 }
1439
1440 #[inline(always)]
1441 fn get_packet_write(data: &WritePacket) -> IoResult<Buffer> {
1442 #[cfg(feature = "__dev")]
1443 info!("writing packet: {data:?}");
1444 let mut b = Buffer::empty();
1445 match data {
1446 WritePacket::InitializationHandshake(
1447 hash,
1448 lib_version,
1449 device_id,
1450 game_id,
1451 version,
1452 session,
1453 ) => {
1454 b.write_leb_u64(0)?;
1455 for hash in hash {
1456 b.write_u64(*hash)?;
1457 }
1458 b.write_leb_u64(*lib_version)?;
1459 b.write_string(device_id)?;
1460 b.write_string(game_id)?;
1461 b.write_f64(*version)?;
1462 b.write_string(session)?;
1463 }
1464 WritePacket::Login(username, passw, game_token, variables, syncs, room) => {
1465 b.write_leb_u64(1)?;
1466 b.write_string(username)?;
1467 match passw {
1468 LoginPassw::Token(token) => {
1469 b.write_bool(true)?;
1470 b.write_string(token)?;
1471 }
1472 LoginPassw::Passw(passw) => {
1473 b.write_bool(false)?;
1474 b.write_string(passw)?;
1475 }
1476 }
1477 b.write_string(game_token)?;
1478 b.write(variables)?;
1479 b.write(syncs)?;
1480 b.write_string(room)?;
1481 }
1482 WritePacket::Register(username, email, passw, repeat_passw) => {
1483 b.write_leb_u64(2)?;
1484 b.write_string(username)?;
1485 b.write_string(email)?;
1486 b.write_string(passw)?;
1487 b.write_string(repeat_passw)?;
1488 }
1489 WritePacket::RequestPlayerVariable(player_request, index, name) => {
1490 b.write_leb_u64(3)?;
1491 match player_request {
1492 PlayerRequest::ID(pid) => {
1493 b.write_bool(true)?;
1494 b.write_leb_u64(*pid)?;
1495 }
1496 _ => {
1497 b.write_bool(false)?;
1498 }
1499 }
1500 b.write_string(name)?;
1501 b.write_leb_u64(*index)?;
1502 }
1503 WritePacket::P2P(player_request, mid, payload) => {
1504 b.write_leb_u64(4)?;
1505 b.write(player_request)?;
1506 b.write_i16(*mid)?;
1507 b.write(payload)?;
1508 }
1509 WritePacket::UpdateGameVersion(version) => {
1510 b.write_leb_u64(5)?;
1511 b.write_f64(*version)?;
1512 }
1513 WritePacket::UpdateGameSession(session) => {
1514 b.write_leb_u64(6)?;
1515 b.write_string(session)?;
1516 }
1517 WritePacket::UpdatePlayerVariable(updates) => {
1518 b.write_leb_u64(7)?;
1519 b.write(updates)?;
1520 }
1521 WritePacket::Ping() => {
1522 b.write_leb_u64(8)?;
1523 }
1524 WritePacket::GameIniWrite(updates) => {
1525 b.write_leb_u64(9)?;
1526 b.write(updates)?;
1527 }
1528 WritePacket::PlayerIniWrite(updates) => {
1529 b.write_leb_u64(10)?;
1530 b.write(updates)?;
1531 }
1532 WritePacket::UpdateRoom(room) => {
1533 b.write_leb_u64(11)?;
1534 b.write_string(room)?;
1535 }
1536 WritePacket::NewSync(upds) => {
1537 b.write_leb_u64(12)?;
1538 b.write_leb_u64(upds.len() as u64)?;
1539 for (slot, kind, sync_type, variables) in upds {
1540 b.write_leb_u64(*slot)?;
1541 b.write_i16(*kind)?;
1542 b.write_u8(*sync_type as u8)?;
1543 b.write(variables)?;
1544 }
1545 }
1546 WritePacket::UpdateSync(updates) => {
1547 b.write_leb_u64(13)?;
1548 b.write(updates)?;
1549 }
1550 WritePacket::UpdateAchievement(aid) => {
1551 b.write_leb_u64(14)?;
1552 b.write_leb_u64(*aid)?;
1553 }
1554 WritePacket::UpdateHighscore(hid, score) => {
1555 b.write_leb_u64(15)?;
1556 b.write_leb_u64(*hid)?;
1557 b.write_f64(*score)?;
1558 }
1559 WritePacket::AdminAction(admin_action, pid) => {
1560 b.write_leb_u64(16)?;
1561 b.write_leb_u64(*pid)?;
1562 match admin_action {
1563 AdminAction::Ban(reason, time) => {
1564 b.write_u8(0)?;
1565 b.write_string(reason)?;
1566 b.write_i64(*time)?;
1567 }
1568 AdminAction::Unban => {
1569 b.write_u8(1)?;
1570 }
1571 AdminAction::Kick(reason) => {
1572 b.write_u8(2)?;
1573 b.write_string(reason)?;
1574 }
1575 }
1576 }
1577 WritePacket::RequestSyncVariable(pid, index, slot, name) => {
1578 b.write_leb_u64(17)?;
1579 b.write_leb_u64(*pid)?;
1580 b.write_string(name)?;
1581 b.write_leb_u64(*index)?;
1582 b.write_leb_u64(*slot)?;
1583 }
1584 WritePacket::Logout() => {
1585 b.write_leb_u64(18)?;
1586 }
1587 WritePacket::RequestBdb(index, name) => {
1588 b.write_leb_u64(19)?;
1589 b.write_leb_u64(*index)?;
1590 b.write_string(name)?;
1591 }
1592 WritePacket::SetBdb(name, payload) => {
1593 b.write_leb_u64(20)?;
1594 b.write_string(name)?;
1595 b.write_bytes(payload)?;
1596 }
1597 WritePacket::RequestChangeFriendStatus(status, pid) => {
1598 b.write_leb_u64(21)?;
1599 b.write_u8(*status as u8)?;
1600 b.write_leb_u64(*pid)?;
1601 }
1602 WritePacket::Handshake(key) => {
1603 b.write_leb_u64(22)?;
1604 b.write_string(key)?;
1605 }
1606 }
1607 Ok(b)
1608 }
1609
1610 #[inline(always)]
1611 fn get_packet_read(mut b: Buffer) -> IoResult<ReadPacket> {
1612 let event = b.read_leb_u64()?;
1613 match event {
1614 0 => {
1615 let code = RegistrationCode::try_from_primitive(b.read_u8()?).unwrap_or_default();
1616 Ok(ReadPacket::Registration(code))
1617 }
1618 1 => {
1619 let code = LoginCode::try_from_primitive(b.read_u8()?).unwrap_or_default();
1620 match code {
1621 LoginCode::GameBan => {
1622 let reason = b.read_string()?;
1623 let unban_time = b.read_i64()?;
1624 Ok(ReadPacket::LoginBan(code, reason, unban_time))
1625 }
1626 LoginCode::Ok => {
1627 let pid = b.read_leb_u64()?;
1628 let name = b.read_string()?;
1629 let token = b.read()?;
1630 let player_save = b.read()?;
1631 let friends = b.read()?;
1632 let incoming_friends = b.read()?;
1633 let outgoing_friends = b.read()?;
1634 let achievements = b.read()?;
1635 Ok(ReadPacket::LoginOk(
1636 pid,
1637 name,
1638 token,
1639 player_save,
1640 friends,
1641 incoming_friends,
1642 outgoing_friends,
1643 achievements,
1644 ))
1645 }
1646 _ => Ok(ReadPacket::Login(code)),
1647 }
1648 }
1649 2 => {
1650 let pid = b.read_leb_u64()?;
1651 let name = b.read_string()?;
1652 let room = b.read_string()?;
1653 let variables = b.read()?;
1654 let syncs = b.read()?;
1655 Ok(ReadPacket::PlayerLoggedIn(
1656 pid, name, variables, syncs, room,
1657 ))
1658 }
1659 3 => {
1660 let pid = b.read_leb_u64()?;
1661 Ok(ReadPacket::PlayerLoggedOut(pid))
1662 }
1663 4 => {
1664 let game_save = b.read()?;
1665 let game_achievements = b.read()?;
1666 let game_highscores = b.read()?;
1667 let game_administrators = b.read()?;
1668 let version = b.read_f64()?;
1669 Ok(ReadPacket::SyncGameInfo(
1670 game_save,
1671 game_achievements,
1672 game_highscores,
1673 game_administrators,
1674 version,
1675 ))
1676 }
1677 5 => {
1678 let pid = b.read()?;
1679 let mid = b.read_i16()?;
1680 let payload = b.read()?;
1681 Ok(ReadPacket::P2P(pid, mid, payload))
1682 }
1683 6 => {
1684 let pid = b.read_leb_u64()?;
1685 let updates = b.read()?;
1686 Ok(ReadPacket::UpdatePlayerVariable(pid, updates))
1687 }
1688 7 => {
1689 let ping = b.read()?;
1690 Ok(ReadPacket::Ping(ping))
1691 }
1692 8 => Ok(ReadPacket::ClearPlayers()),
1693 9 => {
1694 let updates = b.read()?;
1695 Ok(ReadPacket::GameIniWrite(updates))
1696 }
1697 10 => {
1698 let pid = b.read_leb_u64()?;
1699 let mut data = Vec::new();
1700 for _ in 0..b.read_leb_u64()? {
1701 let slot = b.read_leb_u64()?;
1702 let kind = b.read_i16()?;
1703 let sync_type = SyncType::try_from_primitive(b.read_u8()?).unwrap_or_default();
1704 let variables = b.read()?;
1705 data.push((slot, kind, sync_type, variables));
1706 }
1707 Ok(ReadPacket::NewSync(pid, data))
1708 }
1709 11 => {
1710 let pid = b.read_leb_u64()?;
1711 let room = b.read_string()?;
1712 Ok(ReadPacket::PlayerChangedRooms(pid, room))
1713 }
1714 12 => {
1715 let pid = b.read_leb_u64()?;
1716 let sync_updates = b.read()?;
1717 Ok(ReadPacket::UpdateSync(pid, sync_updates))
1718 }
1719 13 => {
1720 let pid = b.read_leb_u64()?;
1721 let hid = b.read_leb_u64()?;
1722 let score = b.read_f64()?;
1723 Ok(ReadPacket::HighscoreUpdate(pid, hid, score))
1724 }
1725 14 => {
1726 let pid = b.read_leb_u64()?;
1727 let syncs = b.read()?;
1728 let variables = b.read()?;
1729 Ok(ReadPacket::UpdatePlayerData(pid, syncs, variables))
1730 }
1731 15 => {
1732 let slot = b.read_leb_u64()?;
1733 let variable = b.read()?;
1734 Ok(ReadPacket::RequestPlayerVariable(slot, variable))
1735 }
1736 16 => {
1737 if b.read_bool()? {
1738 let reason = b.read_string()?;
1739 Ok(ReadPacket::AdminAction(AdminAction::Kick(reason)))
1740 } else {
1741 let reason = b.read_string()?;
1742 let time = b.read_i64()?;
1743 Ok(ReadPacket::AdminAction(AdminAction::Ban(reason, time)))
1744 }
1745 }
1746 17 => {
1747 let slot = b.read_leb_u64()?;
1748 let variable = b.read()?;
1749 Ok(ReadPacket::RequestSyncVariable(slot, variable))
1750 }
1751 18 => {
1752 let version = b.read_f64()?;
1753 Ok(ReadPacket::ChangeGameVersion(version))
1754 }
1755 19 => {
1756 if b.read_bool()? {
1757 let pid = b.read_leb_u64()?;
1758 let administrator = b.read()?;
1759 Ok(ReadPacket::ModifyAdministrator(pid, administrator))
1760 } else {
1761 let pid = b.read_leb_u64()?;
1762 Ok(ReadPacket::RemoveAdministrator(pid))
1763 }
1764 }
1765 20 => Ok(ReadPacket::ForceDisconnection()),
1766 21 => {
1767 let updates = b.read()?;
1768 Ok(ReadPacket::PlayerIniWrite(updates))
1769 }
1770 22 => {
1771 let index = b.read_leb_u64()?;
1772 let data = b.read()?;
1773 Ok(ReadPacket::RequestBdb(index, data))
1774 }
1775 23 => {
1776 let status =
1777 ChangeFriendStatus::try_from_primitive(b.read_u8()?).unwrap_or_default();
1778 let pid = b.read_leb_u64()?;
1779 Ok(ReadPacket::ChangeFriendStatus(status, pid))
1780 }
1781 24 => Ok(ReadPacket::Handshake(b.read()?)),
1782 25 => Ok(ReadPacket::ServerMessage(b.read_string()?)),
1783 26 => Ok(ReadPacket::ChangeConnection(b.read_string()?)),
1784 _ => Err(Error::new(
1785 ErrorKind::InvalidData,
1786 format!("unknown event {event}, erroring out"),
1787 )),
1788 }
1789 }
1790
1791 #[inline(always)]
1792 async fn iter_missing_data(data: &mut StreamData, pid: u64) -> IoResult<()> {
1793 if let Some(pq) = data.player_queue.get_mut(&pid) {
1794 if let Some(player) = data.players.get_mut(&pid) {
1795 for (name, value) in pq.variables.drain() {
1796 if let OptionalValue::Some(value) = value {
1797 player.variables.insert(name, value);
1798 } else {
1799 player.variables.remove(&name);
1800 }
1801 }
1802 for (index, osync) in player.syncs.iter_mut().enumerate() {
1803 if osync.is_none() {
1804 if let Some((sni, sn)) = pq
1805 .new_syncs
1806 .iter()
1807 .enumerate()
1808 .find(|(_, sn)| sn.slot == index)
1809 {
1810 *osync = Some(types::Sync {
1811 event: SyncEvent::New,
1812 kind: sn.kind,
1813 sync_type: sn.sync_type,
1814 variables: sn.variables.clone(),
1815 is_ending: false,
1816 });
1817 pq.new_syncs.remove(sni);
1818 }
1819 }
1820 if let Some(sync) = osync {
1821 if let Some((sni, _)) = pq
1822 .new_syncs
1823 .iter()
1824 .enumerate()
1825 .find(|(_, sn)| sn.slot == index)
1826 {
1827 pq.new_syncs.remove(sni);
1828 }
1829 if let Some(is) = pq.syncs.remove(&index) {
1830 for (name, value) in is {
1831 if let OptionalValue::Some(value) = value {
1832 sync.variables.insert(name, value);
1833 } else {
1834 sync.variables.remove(&name);
1835 }
1836 }
1837 }
1838 if let Some((index, _)) = pq
1839 .remove_syncs
1840 .iter()
1841 .enumerate()
1842 .find(|(rindex, _)| *rindex == index)
1843 {
1844 sync.is_ending = true;
1845 pq.remove_syncs.remove(index);
1846 }
1847 }
1848 }
1849 }
1850 }
1851
1852 Ok(())
1853 }
1854
1855 #[inline(always)]
1859 pub async fn update(&self) -> IoResult<()> {
1860 let conn = self.is_connected().await;
1861 let mut dlock = self.data.write().await;
1862 if let Some(room_callback) = &mut dlock.func_room {
1863 let result = room_callback();
1864 if dlock.room != result && conn {
1865 self.internal_iosend(Self::get_packet_write(&WritePacket::UpdateRoom(
1866 result.clone(),
1867 ))?)
1868 .await?;
1869 }
1870 dlock.room = result;
1871 }
1872 if conn {
1873 let lroom = dlock.room.clone();
1874 for pid in dlock.players.keys().cloned().collect::<Vec<u64>>() {
1876 let player_logged_out = dlock.players_logout.contains(&pid);
1877 let mut is_all_none = true;
1878 Self::iter_missing_data(&mut dlock, pid).await?;
1879 let player = dlock
1880 .players
1881 .get_mut(&pid)
1882 .ok_or(Error::new(ErrorKind::NotFound, "player not found"))?;
1883 for sync in &mut player.syncs {
1884 if let Some(msync) = sync {
1885 is_all_none = false;
1888 if msync.event == SyncEvent::End {
1889 #[cfg(feature = "__dev")]
1890 info!("Sync END {pid}");
1891 drop(sync.take());
1892 continue;
1893 }
1894 if msync.sync_type == SyncType::Once
1895 || player.room != lroom
1896 || player_logged_out
1897 {
1898 #[cfg(feature = "__dev")]
1899 info!("Marked as Sync END {pid}");
1900 msync.event = SyncEvent::End;
1901 }
1902 }
1903 }
1904 if is_all_none && player_logged_out {
1905 dlock.players.remove(&pid);
1906 dlock.players_logout.remove(&pid);
1907 if dlock.player_queue.contains_key(&pid) {
1908 return Err(Error::new(
1909 ErrorKind::AlreadyExists,
1910 "found player queue but expected none",
1911 ));
1912 }
1913 }
1914 }
1915 if !dlock.syncs.is_empty() || !dlock.syncs_remove.is_empty() {
1916 let mut upds = Vec::new();
1917 for (index, sync) in dlock.syncs.iter_mut().enumerate() {
1918 if let Some(sync) = sync {
1919 if !sync.to_sync.is_empty() {
1920 #[cfg(feature = "__dev")]
1921 info!(
1922 "updating sync variables on slot {index}: {:?}",
1923 sync.to_sync
1924 );
1925 let mut variupd = HashMap::new();
1926 for upd in sync.to_sync.drain() {
1927 let vari = sync.variables.get(&upd).cloned().into();
1928 variupd.insert(upd, vari);
1929 }
1930 upds.push(SyncUpdate {
1931 slot: index,
1932 remove_sync: false,
1933 variables: Some(variupd),
1934 });
1935 }
1936 }
1937 }
1938 for remove in dlock.syncs_remove.drain(..) {
1939 upds.push(SyncUpdate {
1940 slot: remove,
1941 remove_sync: true,
1942 variables: None,
1943 });
1944 }
1945 if !upds.is_empty() {
1946 self.internal_iosend(Self::get_packet_write(&WritePacket::UpdateSync(upds))?)
1947 .await?;
1948 }
1949 }
1950 if !dlock.new_sync_queue.is_empty() {
1951 let mut data = Vec::new();
1952 let nsqvec = dlock.new_sync_queue.clone();
1953 dlock.new_sync_queue.clear();
1954 for nsq in nsqvec {
1955 if let Some(sync) = dlock.syncs.get(nsq.slot).ok_or(Error::new(
1956 ErrorKind::NotFound,
1957 "expected sync but found none",
1958 ))? {
1959 data.push((
1960 nsq.slot as u64,
1961 nsq.kind,
1962 nsq.sync_type,
1963 sync.variables.clone(),
1964 ));
1965 } else {
1966 return Err(Error::new(
1967 ErrorKind::NotFound,
1968 "expected sync on reserved slot but found none",
1969 ));
1970 }
1971 }
1972 self.internal_iosend(Self::get_packet_write(&WritePacket::NewSync(data))?)
1973 .await?;
1974 }
1975 if !dlock.update_variable.is_empty() {
1976 let mut data = Vec::new();
1977 let variupds = dlock.update_variable.drain().collect::<Vec<String>>();
1978 for name in variupds {
1979 let value = dlock.variables.get(&name).cloned().into();
1980 data.push(VariableUpdate { name, value });
1981 }
1982 self.internal_iosend(Self::get_packet_write(&WritePacket::UpdatePlayerVariable(
1983 data,
1984 ))?)
1985 .await?;
1986 }
1987 if !dlock.update_playerini.is_empty() {
1988 let mut data: Vec<VariableUpdate> = Vec::new();
1989 let variupds = dlock.update_playerini.drain().collect::<Vec<String>>();
1990 for name in variupds {
1991 let value = dlock.player_save.get(&name).cloned().into();
1992 data.push(VariableUpdate { name, value });
1993 }
1994 self.internal_iosend(Self::get_packet_write(&WritePacket::PlayerIniWrite(data))?)
1995 .await?;
1996 }
1997 if !dlock.update_gameini.is_empty() {
1998 let mut data: Vec<VariableUpdate> = Vec::new();
1999 let variupds = dlock.update_gameini.drain().collect::<Vec<String>>();
2000 for name in variupds {
2001 let value = dlock.game_save.get(&name).cloned().into();
2002 data.push(VariableUpdate { name, value });
2003 }
2004 self.internal_iosend(Self::get_packet_write(&WritePacket::GameIniWrite(data))?)
2005 .await?;
2006 }
2007 if let Some(ping) = dlock.last_ping {
2008 if ping.elapsed().as_secs_f64() >= 90.0 {
2009 drop(dlock);
2010 self.disconnect().await;
2011 if let Some(dup) = self.data.write().await.func_data_update.as_mut() {
2012 dup(DataUpdate::Disconnected());
2013 }
2014 }
2015 }
2016 } else {
2017 if dlock.is_loggedin {
2018 dlock.is_loggedin = false;
2019 }
2020 dlock.last_ping.take();
2021 }
2022 Ok(())
2023 }
2024
2025 pub async fn callback_set_room(&self, callback: CallbackRoom) {
2038 self.data.write().await.func_room = Some(callback);
2039 }
2040
2041 pub async fn callback_set_p2p(&self, callback: CallbackP2P) {
2044 self.data.write().await.func_p2p = Some(callback);
2045 }
2046
2047 pub async fn callback_set_register(&self, callback: CallbackRegister) {
2050 self.data.write().await.func_register = Some(callback);
2051 }
2052
2053 pub async fn callback_set_login(&self, callback: CallbackLogin) {
2056 self.data.write().await.func_login = Some(callback);
2057 }
2058
2059 pub async fn callback_set_banned(&self, callback: CallbackBanned) {
2062 self.data.write().await.func_banned = Some(callback);
2063 }
2064
2065 pub async fn callback_set_kicked(&self, callback: CallbackKicked) {
2068 self.data.write().await.func_kicked = Some(callback);
2069 }
2070
2071 pub async fn callback_set_disconnected(&self, callback: CallbackDisconnected) {
2074 self.data.write().await.func_disconnected = Some(callback);
2075 }
2076
2077 pub async fn callback_set_login_token(&self, callback: CallbackLoginToken) {
2083 self.data.write().await.func_login_token = Some(callback);
2084 }
2085
2086 pub async fn callback_set_data_update(&self, callback: CallbackDataUpdate) {
2091 self.data.write().await.func_data_update = Some(callback);
2092 }
2093
2094 pub async fn is_connected(&self) -> bool {
2096 let dlock = self.data.read().await;
2097 dlock.is_connected && dlock.handshake_completed && {
2098 if let Some(thread) = &dlock.thread {
2099 !thread.is_finished()
2100 } else {
2101 true
2102 }
2103 }
2104 }
2105
2106 pub async fn is_connecting(&self) -> bool {
2108 let dlock = self.data.read().await;
2109 dlock.is_connecting || (!dlock.handshake_completed && dlock.is_connected)
2110 }
2111
2112 pub async fn is_loggedin(&self) -> bool {
2114 self.is_connected().await && {
2115 let dlock = self.data.read().await;
2116 dlock.is_loggedin && dlock.handshake_completed
2117 }
2118 }
2119
2120 pub async fn get_ping(&self) -> f64 {
2122 self.data.read().await.ping
2123 }
2124
2125 pub async fn set_game_token(&self, token: &str) {
2131 self.data.write().await.game_token = token.to_owned();
2132 }
2133
2134 pub async fn disconnect(&self) {
2136 let mut lock = self.data.write().await;
2137 if let Some(thread) = lock.thread.take() {
2138 thread.abort();
2139 }
2140 lock.clear(true).await;
2141 }
2142
2143 async fn internal_iosend(&self, data: Buffer) -> IoResult<()> {
2144 if let Some(writer) = &self.writer {
2145 if let Err(_e) = writer.lock().await.write(data).await {
2146 #[cfg(feature = "__dev")]
2147 info!("unable to send data to server with error: {_e:?}");
2148 let mut dlock = self.data.write().await;
2149 dlock.clear(true).await;
2150 if dlock.call_disconnected {
2151 if let Some(func) = dlock.func_disconnected.as_mut() {
2152 func();
2153 }
2154 if let Some(dup) = dlock.func_data_update.as_mut() {
2155 dup(DataUpdate::Disconnected());
2156 }
2157 dlock.call_disconnected = false;
2158 }
2159 }
2160 } else {
2161 #[cfg(feature = "__dev")]
2162 info!("stream is not open for writing");
2163 }
2164 Ok(())
2165 }
2166
2167 async fn internal_login(&self, username: &str, loginpassw: LoginPassw) -> IoResult<()> {
2168 self.internal_iosend({
2169 let dlock = self.data.read().await;
2170 Self::get_packet_write(&WritePacket::Login(
2171 username.to_owned(),
2172 loginpassw,
2173 dlock.game_token.clone(),
2174 dlock.variables.clone(),
2175 dlock.syncs.clone(),
2176 dlock.room.clone(),
2177 ))?
2178 })
2179 .await
2180 }
2181
2182 pub async fn login(&self, username: &str, passw: &str) -> IoResult<()> {
2188 self.internal_login(username, LoginPassw::Passw(passw.to_owned()))
2189 .await
2190 }
2191
2192 pub async fn login_with_token(&self, username: &str, token: &str) -> IoResult<()> {
2198 self.internal_login(username, LoginPassw::Token(token.to_owned()))
2199 .await
2200 }
2201
2202 pub async fn register(
2207 &self,
2208 username: &str,
2209 email: &str,
2210 passw: &str,
2211 repeat_passw: &str,
2212 ) -> IoResult<()> {
2213 self.internal_iosend(Self::get_packet_write(&WritePacket::Register(
2214 username.to_owned(),
2215 email.to_owned(),
2216 passw.to_owned(),
2217 repeat_passw.to_owned(),
2218 ))?)
2219 .await
2220 }
2221
2222 pub async fn get_player_id(&self) -> Option<u64> {
2225 self.data.read().await.player_id
2226 }
2227
2228 pub async fn get_player_name(&self) -> Option<String> {
2231 self.data.read().await.player_name.clone()
2232 }
2233
2234 pub async fn set_variable(&self, name: &str, value: Value) {
2236 let mut dlock = self.data.write().await;
2237 if let Some(orgvalue) = dlock.variables.get(name) {
2238 if value == *orgvalue {
2239 return;
2240 }
2241 }
2242 dlock.variables.insert(name.to_owned(), value);
2243 dlock.update_variable.insert(name.to_owned());
2244 }
2245
2246 pub async fn remove_variable(&self, name: &str) {
2248 let mut dlock = self.data.write().await;
2249 if dlock.variables.remove(name).is_some() {
2250 dlock.update_variable.insert(name.to_owned());
2251 }
2252 }
2253
2254 pub async fn iter_other_players(&self) -> impl Stream<Item = (u64, Player)> {
2256 let data = self.data.clone();
2257 async_stream::stream! {
2258 for id in data.read().await.players.keys().cloned().collect::<Vec<u64>>() {
2259 if let Some(player) = data.read().await.players.get(&id).cloned() {
2260 yield (id, player);
2261 }
2262 }
2263 }
2264 }
2265
2266 pub async fn other_player_count(&self) -> usize {
2268 self.data.read().await.players.len()
2269 }
2270
2271 pub async fn get_other_player(&self, pid: u64) -> Option<Player> {
2274 self.data.read().await.players.get(&pid).cloned()
2275 }
2276
2277 pub async fn get_other_player_name(&self, name: &str) -> Option<(u64, Player)> {
2280 let name = name.to_lowercase();
2281 self.data
2282 .read()
2283 .await
2284 .players
2285 .iter()
2286 .find(|(_pid, player)| player.name.to_lowercase() == name)
2287 .map(|(pid, player)| (*pid, player.clone()))
2288 }
2289
2290 pub async fn request_other_player_variable(
2296 &self,
2297 pid: u64,
2298 name: &str,
2299 callback: Option<PlayerVariableServerUpdate>,
2300 ) -> IoResult<()> {
2301 let mut dlock = self.data.write().await;
2302 if dlock.players.contains_key(&pid) {
2303 let index = if let Some((index, csu)) = dlock
2304 .callback_server_update
2305 .iter_mut()
2306 .find(|(_, csu)| csu.is_none())
2307 {
2308 *csu = Some(CallbackServerUpdate {
2309 name: name.to_owned(),
2310 callback: ServerUpdateCallback::PlayerVariable(callback, pid),
2311 });
2312 *index
2313 } else {
2314 let index = dlock.callback_server_index;
2315 dlock.callback_server_update.insert(
2316 index,
2317 Some(CallbackServerUpdate {
2318 name: name.to_owned(),
2319 callback: ServerUpdateCallback::PlayerVariable(callback, pid),
2320 }),
2321 );
2322 dlock.callback_server_index += 1;
2323 index
2324 };
2325 self.internal_iosend(Self::get_packet_write(
2326 &WritePacket::RequestPlayerVariable(
2327 PlayerRequest::ID(pid),
2328 index as u64,
2329 name.to_owned(),
2330 ),
2331 )?)
2332 .await?;
2333 Ok(())
2334 } else {
2335 Err(Error::new(ErrorKind::NotFound, "unable to find player id"))
2336 }
2337 }
2338
2339 pub async fn p2p(
2343 &self,
2344 target: PlayerRequest,
2345 message_id: i16,
2346 payload: Vec<Value>,
2347 ) -> IoResult<()> {
2348 self.internal_iosend(Self::get_packet_write(&WritePacket::P2P(
2349 target, message_id, payload,
2350 ))?)
2351 .await
2352 }
2353
2354 pub async fn set_version(&self, version: f64) -> IoResult<()> {
2356 self.data.write().await.version = version;
2357 self.internal_iosend(Self::get_packet_write(&WritePacket::UpdateGameVersion(
2358 version,
2359 ))?)
2360 .await
2361 }
2362
2363 pub async fn get_version(&self) -> f64 {
2365 self.data.read().await.version
2366 }
2367
2368 pub async fn get_server_version(&self) -> f64 {
2370 self.data.read().await.game_version
2371 }
2372
2373 pub async fn set_session(&self, session: &str) -> IoResult<()> {
2375 self.data.write().await.session = session.to_owned();
2376 self.internal_iosend(Self::get_packet_write(&WritePacket::UpdateGameSession(
2377 session.to_owned(),
2378 ))?)
2379 .await
2380 }
2381
2382 pub async fn get_session(&self) -> String {
2384 self.data.read().await.session.clone()
2385 }
2386
2387 fn get_save_key(file: &str, section: &str, key: &str) -> String {
2388 if file.is_empty() {
2389 format!(
2390 "{}>{}",
2391 urlencoding::encode(section),
2392 urlencoding::encode(key)
2393 )
2394 } else {
2395 format!(
2396 "{}>{}>{}",
2397 urlencoding::encode(file),
2398 urlencoding::encode(section),
2399 urlencoding::encode(key)
2400 )
2401 }
2402 }
2403
2404 pub async fn get_open_playerini(&self) -> String {
2407 self.data.read().await.player_open_save.clone()
2408 }
2409
2410 pub async fn open_playerini(&self, file: &str) {
2412 self.data.write().await.player_open_save = file.to_owned();
2413 }
2414
2415 pub async fn close_playerini(&self) {
2417 self.data.write().await.player_open_save.clear();
2418 }
2419
2420 pub async fn has_playerini(&self, section: &str, key: &str) -> bool {
2422 let dlock = self.data.read().await;
2423 dlock
2424 .player_save
2425 .contains_key(&Self::get_save_key(&dlock.player_open_save, section, key))
2426 }
2427
2428 pub async fn get_playerini(&self, section: &str, key: &str) -> Option<Value> {
2431 let dlock = self.data.read().await;
2432 dlock
2433 .player_save
2434 .get(&Self::get_save_key(&dlock.player_open_save, section, key))
2435 .cloned()
2436 }
2437
2438 pub async fn set_playerini(&self, section: &str, key: &str, value: Value) {
2440 let mut dlock = self.data.write().await;
2441 let save_key = Self::get_save_key(&dlock.player_open_save, section, key);
2442 if let Some(orgvalue) = dlock.player_save.get(&save_key) {
2443 if value == *orgvalue {
2444 return;
2445 }
2446 }
2447 dlock.player_save.insert(save_key.clone(), value.clone());
2448 dlock.update_playerini.insert(save_key);
2449 }
2450
2451 pub async fn remove_playerini(&self, section: &str, key: &str) {
2453 let mut dlock = self.data.write().await;
2454 let save_key = Self::get_save_key(&dlock.player_open_save, section, key);
2455 if dlock.player_save.remove(&save_key).is_some() {
2456 dlock.update_playerini.insert(save_key);
2457 }
2458 }
2459
2460 pub async fn get_open_gameini(&self) -> String {
2463 self.data.read().await.game_open_save.clone()
2464 }
2465
2466 pub async fn open_gameini(&self, file: &str) {
2468 self.data.write().await.game_open_save = file.to_owned();
2469 }
2470
2471 pub async fn close_gameini(&self) {
2473 self.data.write().await.game_open_save.clear();
2474 }
2475
2476 pub async fn has_gameini(&self, section: &str, key: &str) -> bool {
2478 let dlock = self.data.read().await;
2479 dlock
2480 .game_save
2481 .contains_key(&Self::get_save_key(&dlock.game_open_save, section, key))
2482 }
2483
2484 pub async fn get_gameini(&self, section: &str, key: &str) -> Option<Value> {
2487 let dlock = self.data.read().await;
2488 dlock
2489 .game_save
2490 .get(&Self::get_save_key(&dlock.game_open_save, section, key))
2491 .cloned()
2492 }
2493
2494 pub async fn set_gameini(&self, section: &str, key: &str, value: Value) {
2496 let mut dlock = self.data.write().await;
2497 let save_key = Self::get_save_key(&dlock.game_open_save, section, key);
2498 if let Some(orgvalue) = dlock.game_save.get(&save_key) {
2499 if value == *orgvalue {
2500 return;
2501 }
2502 }
2503 dlock.game_save.insert(save_key.clone(), value.clone());
2504 dlock.update_gameini.insert(save_key);
2505 }
2506
2507 pub async fn remove_gameini(&self, section: &str, key: &str) {
2509 let mut dlock = self.data.write().await;
2510 let save_key = Self::get_save_key(&dlock.game_open_save, section, key);
2511 if dlock.game_save.remove(&save_key).is_some() {
2512 dlock.update_gameini.insert(save_key);
2513 }
2514 }
2515
2516 pub async fn has_achievement(&self, aid: u64) -> bool {
2518 self.data
2519 .read()
2520 .await
2521 .game_achievements
2522 .contains_key(&Leb(aid))
2523 }
2524
2525 pub async fn get_achievement(&self, aid: u64) -> Option<Achievement> {
2528 self.data
2529 .read()
2530 .await
2531 .game_achievements
2532 .get(&Leb(aid))
2533 .cloned()
2534 }
2535
2536 pub async fn has_reached_achievement(&self, aid: u64) -> bool {
2538 let dlock = self.data.read().await;
2539 if dlock.player_id.is_some() {
2540 dlock
2541 .game_achievements
2542 .get(&Leb(aid))
2543 .map(|achievement| achievement.unlocked.is_some())
2544 .unwrap_or(false)
2545 } else {
2546 false
2547 }
2548 }
2549
2550 pub async fn get_reached_achievement(&self, aid: u64) -> Option<i64> {
2554 let dlock = self.data.read().await;
2555 if dlock.player_id.is_some() {
2556 dlock
2557 .game_achievements
2558 .get(&Leb(aid))
2559 .map(|achievement| achievement.unlocked)?
2560 } else {
2561 None
2562 }
2563 }
2564
2565 pub async fn reach_achievement(&self, aid: u64) -> IoResult<()> {
2567 if !self.has_reached_achievement(aid).await {
2568 let mut dlock = self.data.write().await;
2569 if dlock.player_id.is_some() {
2570 if let Some(achievement) = dlock.game_achievements.get_mut(&Leb(aid)) {
2571 achievement.unlocked = Some(Utc::now().timestamp());
2572 self.internal_iosend(Self::get_packet_write(&WritePacket::UpdateAchievement(
2573 aid,
2574 ))?)
2575 .await?;
2576 }
2577 }
2578 }
2579 Ok(())
2580 }
2581
2582 pub async fn has_highscore(&self, hid: u64) -> bool {
2584 self.data
2585 .read()
2586 .await
2587 .game_achievements
2588 .contains_key(&Leb(hid))
2589 }
2590
2591 pub async fn get_highscore(&self, hid: u64) -> Option<Highscore> {
2594 self.data
2595 .read()
2596 .await
2597 .game_highscores
2598 .get(&Leb(hid))
2599 .cloned()
2600 }
2601
2602 pub async fn has_score_highscore(&self, hid: u64) -> bool {
2604 let dlock = self.data.read().await;
2605 if let Some(player_id) = dlock.player_id {
2606 if let Some(highscore) = dlock.game_highscores.get(&Leb(hid)) {
2607 highscore.scores.contains_key(&Leb(player_id))
2608 } else {
2609 false
2610 }
2611 } else {
2612 false
2613 }
2614 }
2615
2616 pub async fn get_score_highscore(&self, hid: u64) -> Option<f64> {
2619 let dlock = self.data.read().await;
2620 if let Some(player_id) = dlock.player_id {
2621 if let Some(highscore) = dlock.game_highscores.get(&Leb(hid)) {
2622 highscore.scores.get(&Leb(player_id)).cloned()
2623 } else {
2624 None
2625 }
2626 } else {
2627 None
2628 }
2629 }
2630
2631 pub async fn set_score_highscore(&self, hid: u64, score: f64) -> IoResult<()> {
2633 let mut dlock = self.data.write().await;
2634 if let Some(player_id) = dlock.player_id {
2635 if let Some(highscore) = dlock.game_highscores.get_mut(&Leb(hid)) {
2636 if let Some(hscore) = highscore.scores.get_mut(&Leb(player_id)) {
2637 if *hscore != score {
2638 *hscore = score;
2639 self.internal_iosend(Self::get_packet_write(
2640 &WritePacket::UpdateHighscore(hid, score),
2641 )?)
2642 .await?;
2643 }
2644 } else {
2645 highscore.scores.insert(Leb(player_id), score);
2646 self.internal_iosend(Self::get_packet_write(&WritePacket::UpdateHighscore(
2647 hid, score,
2648 ))?)
2649 .await?;
2650 }
2651 }
2652 }
2653 Ok(())
2654 }
2655
2656 pub async fn create_sync(&self, sync_type: SyncType, kind: i16) -> usize {
2658 let mut dlock = self.data.write().await;
2659 let slot = if let Some((index, sync)) = dlock
2660 .syncs
2661 .iter_mut()
2662 .enumerate()
2663 .find(|(_, sync)| sync.is_none())
2664 {
2665 *sync = Some(SelfSync {
2666 kind,
2667 sync_type,
2668 variables: HashMap::new(),
2669 to_sync: HashSet::new(),
2670 });
2671 index
2672 } else {
2673 dlock.syncs.push(Some(SelfSync {
2674 kind,
2675 sync_type,
2676 variables: HashMap::new(),
2677 to_sync: HashSet::new(),
2678 }));
2679 dlock.syncs.len() - 1
2680 };
2681 dlock.new_sync_queue.push(NewSyncQueue {
2682 slot,
2683 kind,
2684 sync_type,
2685 });
2686 slot
2687 }
2688
2689 pub async fn destroy_sync(&self, sync: usize) {
2691 let mut dlock = self.data.write().await;
2692 if let Some(Some(_)) = dlock.syncs.get(sync) {
2693 dlock.syncs_remove.push(sync);
2694 }
2695 }
2696
2697 pub async fn set_variable_sync(&self, sync: usize, name: &str, value: Value) {
2699 let mut dlock = self.data.write().await;
2700 if let Some(Some(sync)) = dlock.syncs.get_mut(sync) {
2701 if let Some(orgvalue) = sync.variables.get(name) {
2702 if value == *orgvalue {
2703 return;
2704 }
2705 }
2706 sync.variables.insert(name.to_owned(), value);
2707 sync.to_sync.insert(name.to_owned());
2708 }
2709 }
2710
2711 pub async fn remove_variable_sync(&self, sync: usize, name: &str) {
2713 let mut dlock = self.data.write().await;
2714 if let Some(Some(sync)) = dlock.syncs.get_mut(sync) {
2715 if sync.variables.remove(name).is_some() {
2716 sync.to_sync.insert(name.to_owned());
2717 }
2718 }
2719 }
2720
2721 pub async fn get_variable_other_sync(
2723 &self,
2724 pid: u64,
2725 sync: usize,
2726 name: &str,
2727 ) -> Option<Value> {
2728 let dlock = self.data.read().await;
2729 if let Some(player) = dlock.players.get(&pid) {
2730 if let Some(Some(sync)) = player.syncs.get(sync) {
2731 sync.variables.get(name).cloned()
2732 } else {
2733 None
2734 }
2735 } else {
2736 None
2737 }
2738 }
2739
2740 pub async fn has_variable_other_sync(&self, pid: u64, sync: usize, name: &str) -> Option<bool> {
2743 let dlock = self.data.read().await;
2744 if let Some(player) = dlock.players.get(&pid) {
2745 if let Some(Some(sync)) = player.syncs.get(sync) {
2746 Some(sync.variables.contains_key(name))
2747 } else {
2748 None
2749 }
2750 } else {
2751 None
2752 }
2753 }
2754
2755 pub async fn iter_other_syncs(&self) -> impl Stream<Item = SyncIter> {
2757 let data = self.data.clone();
2758
2759 Box::pin(async_stream::stream! {
2760 let pids = data.read().await.players.keys().cloned().collect::<Vec<u64>>();
2761 for id in pids {
2762 let player_data = data.read().await.players.get(&id).cloned();
2763 if let Some(player) = player_data {
2764 let syncs_data = player.syncs.iter().cloned().enumerate();
2765 for (index, sync) in syncs_data {
2766 if let Some(sync) = sync {
2767 {
2768 if data.read().await.room != player.room && sync.event != SyncEvent::End {
2769 continue;
2770 }
2771 }
2772 let iter = SyncIter {
2773 player_id: id,
2774 player_name: player.name.clone(),
2775 slot: index,
2776 event: sync.event,
2777 kind: sync.kind,
2778 variables: sync.variables.clone(),
2779 };
2780 if sync.event == SyncEvent::New {
2781 if let Some(player) = data.write().await.players.get_mut(&id) {
2782 if let Some(Some(sync)) = player.syncs.get_mut(index) {
2783 if sync.event == SyncEvent::New { sync.event = SyncEvent::Step;
2785 }
2786 }
2787 }
2788 }
2789 yield iter;
2790 }
2791 }
2792 }
2793 }
2794 })
2795 }
2796
2797 pub async fn is_player_admin(&self, pid: u64) -> bool {
2799 self.data
2800 .read()
2801 .await
2802 .game_administrators
2803 .contains_key(&Leb(pid))
2804 }
2805
2806 pub async fn get_player_admin(&self, pid: u64) -> Option<Administrator> {
2809 self.data
2810 .read()
2811 .await
2812 .game_administrators
2813 .get(&Leb(pid))
2814 .cloned()
2815 }
2816
2817 pub async fn player_kick(&self, pid: u64, reason: &str) -> IoResult<bool> {
2822 if self.get_player_id().await == Some(pid)
2823 || self
2824 .get_player_admin(self.get_player_id().await.unwrap_or(u64::MAX))
2825 .await
2826 .unwrap_or_default()
2827 .can_kick
2828 {
2829 self.internal_iosend(Self::get_packet_write(&WritePacket::AdminAction(
2830 AdminAction::Kick(reason.to_owned()),
2831 pid,
2832 ))?)
2833 .await?;
2834 Ok(true)
2835 } else {
2836 Ok(false)
2837 }
2838 }
2839
2840 pub async fn player_ban(
2845 &self,
2846 pid: u64,
2847 reason: &str,
2848 unban_time: DateTime<Utc>,
2849 ) -> IoResult<bool> {
2850 if self.get_player_id().await == Some(pid)
2851 || self
2852 .get_player_admin(self.get_player_id().await.unwrap_or(u64::MAX))
2853 .await
2854 .unwrap_or_default()
2855 .can_ban
2856 {
2857 self.internal_iosend(Self::get_packet_write(&WritePacket::AdminAction(
2858 AdminAction::Ban(reason.to_string(), unban_time.timestamp()),
2859 pid,
2860 ))?)
2861 .await?;
2862 Ok(true)
2863 } else {
2864 Ok(false)
2865 }
2866 }
2867
2868 pub async fn player_unban(&self, pid: u64) -> IoResult<bool> {
2872 if self
2873 .get_player_admin(self.get_player_id().await.unwrap_or(u64::MAX))
2874 .await
2875 .unwrap_or_default()
2876 .can_unban
2877 {
2878 self.internal_iosend(Self::get_packet_write(&WritePacket::AdminAction(
2879 AdminAction::Unban,
2880 pid,
2881 ))?)
2882 .await?;
2883 Ok(true)
2884 } else {
2885 Ok(false)
2886 }
2887 }
2888
2889 pub async fn logout(&self) -> IoResult<bool> {
2891 if self.is_loggedin().await {
2892 self.internal_iosend(Self::get_packet_write(&WritePacket::Logout())?)
2893 .await?;
2894 Ok(true)
2895 } else {
2896 Ok(false)
2897 }
2898 }
2899
2900 pub async fn request_other_sync_variable(
2906 &self,
2907 pid: u64,
2908 slot: usize,
2909 name: &str,
2910 callback: Option<SyncVariableServerUpdate>,
2911 ) -> IoResult<()> {
2912 let mut dlock = self.data.write().await;
2913 if let Some(player) = dlock.players.get(&pid) {
2914 if let Some(Some(_)) = player.syncs.get(slot) {
2915 let index = if let Some((index, csu)) = dlock
2916 .callback_server_update
2917 .iter_mut()
2918 .find(|(_, csu)| csu.is_none())
2919 {
2920 *csu = Some(CallbackServerUpdate {
2921 name: name.to_owned(),
2922 callback: ServerUpdateCallback::SyncVariable(callback, pid, slot),
2923 });
2924 *index
2925 } else {
2926 let index = dlock.callback_server_index;
2927 dlock.callback_server_update.insert(
2928 index,
2929 Some(CallbackServerUpdate {
2930 name: name.to_owned(),
2931 callback: ServerUpdateCallback::SyncVariable(callback, pid, slot),
2932 }),
2933 );
2934 dlock.callback_server_index += 1;
2935 index
2936 };
2937 self.internal_iosend(Self::get_packet_write(&WritePacket::RequestSyncVariable(
2938 pid,
2939 index as u64,
2940 slot as u64,
2941 name.to_owned(),
2942 ))?)
2943 .await?;
2944 Ok(())
2945 } else {
2946 Err(Error::new(ErrorKind::NotFound, "unable to find sync"))
2947 }
2948 } else {
2949 Err(Error::new(ErrorKind::NotFound, "unable to find player id"))
2950 }
2951 }
2952
2953 pub async fn fetch_bdb(
2958 &self,
2959 name: &str,
2960 callback: Option<FetchBdbServerUpdate>,
2961 ) -> IoResult<()> {
2962 let mut dlock = self.data.write().await;
2963 let index = if let Some((index, csu)) = dlock
2964 .callback_server_update
2965 .iter_mut()
2966 .find(|(_, csu)| csu.is_none())
2967 {
2968 *csu = Some(CallbackServerUpdate {
2969 name: name.to_owned(),
2970 callback: ServerUpdateCallback::FetchBdb(callback),
2971 });
2972 *index
2973 } else {
2974 let index = dlock.callback_server_index;
2975 dlock.callback_server_update.insert(
2976 index,
2977 Some(CallbackServerUpdate {
2978 name: name.to_owned(),
2979 callback: ServerUpdateCallback::FetchBdb(callback),
2980 }),
2981 );
2982 dlock.callback_server_index += 1;
2983 index
2984 };
2985 self.internal_iosend(Self::get_packet_write(&WritePacket::RequestBdb(
2986 index as u64,
2987 name.to_owned(),
2988 ))?)
2989 .await?;
2990 Ok(())
2991 }
2992
2993 pub async fn set_bdb(&self, name: &str, data: Vec<u8>) -> IoResult<()> {
2995 self.internal_iosend(Self::get_packet_write(&WritePacket::SetBdb(
2996 name.to_owned(),
2997 data,
2998 ))?)
2999 .await
3000 }
3001
3002 pub async fn get_incoming_friends(&self) -> Option<IntSet<u64>> {
3005 if self.is_loggedin().await {
3006 Some(self.data.read().await.player_incoming_friends.clone())
3007 } else {
3008 None
3009 }
3010 }
3011
3012 pub async fn get_outgoing_friends(&self) -> Option<IntSet<u64>> {
3015 if self.is_loggedin().await {
3016 Some(self.data.read().await.player_outgoing_friends.clone())
3017 } else {
3018 None
3019 }
3020 }
3021
3022 pub async fn get_friends(&self) -> Option<IntSet<u64>> {
3025 if self.is_loggedin().await {
3026 Some(self.data.read().await.player_friends.clone())
3027 } else {
3028 None
3029 }
3030 }
3031
3032 pub async fn send_outgoing_friend(&self, pid: u64) -> IoResult<()> {
3034 if self.is_loggedin().await {
3035 {
3036 let mut dlock = self.data.write().await;
3037 if dlock.player_friends.contains(&pid)
3038 || dlock.player_outgoing_friends.contains(&pid)
3039 || dlock.player_incoming_friends.contains(&pid)
3040 {
3041 return Ok(());
3042 }
3043 dlock.player_outgoing_friends.insert(pid);
3044 }
3045 self.internal_iosend(Self::get_packet_write(
3046 &WritePacket::RequestChangeFriendStatus(ChangeFriendStatus::Request, pid),
3047 )?)
3048 .await?;
3049 }
3050 Ok(())
3051 }
3052
3053 pub async fn remove_outgoing_friend(&self, pid: u64) -> IoResult<()> {
3055 if self.is_loggedin().await {
3056 {
3057 let mut dlock = self.data.write().await;
3058 if !dlock.player_outgoing_friends.contains(&pid) {
3059 return Ok(());
3060 }
3061 dlock.player_outgoing_friends.remove(&pid);
3062 }
3063 self.internal_iosend(Self::get_packet_write(
3064 &WritePacket::RequestChangeFriendStatus(ChangeFriendStatus::Cancel, pid),
3065 )?)
3066 .await?;
3067 }
3068 Ok(())
3069 }
3070
3071 pub async fn deny_incoming_friend(&self, pid: u64) -> IoResult<()> {
3073 if self.is_loggedin().await {
3074 {
3075 let mut dlock = self.data.write().await;
3076 if !dlock.player_incoming_friends.contains(&pid) {
3077 return Ok(());
3078 }
3079 dlock.player_incoming_friends.remove(&pid);
3080 }
3081 self.internal_iosend(Self::get_packet_write(
3082 &WritePacket::RequestChangeFriendStatus(ChangeFriendStatus::Deny, pid),
3083 )?)
3084 .await?;
3085 }
3086 Ok(())
3087 }
3088
3089 pub async fn accept_incoming_friend(&self, pid: u64) -> IoResult<()> {
3091 if self.is_loggedin().await {
3092 {
3093 let mut dlock = self.data.write().await;
3094 if !dlock.player_incoming_friends.contains(&pid)
3095 || dlock.player_friends.contains(&pid)
3096 {
3097 return Ok(());
3098 }
3099 dlock.player_incoming_friends.remove(&pid);
3100 dlock.player_friends.insert(pid);
3101 }
3102 self.internal_iosend(Self::get_packet_write(
3103 &WritePacket::RequestChangeFriendStatus(ChangeFriendStatus::Accept, pid),
3104 )?)
3105 .await?;
3106 }
3107 Ok(())
3108 }
3109
3110 pub async fn remove_friend(&self, pid: u64) -> IoResult<()> {
3112 if self.is_loggedin().await {
3113 {
3114 let mut dlock = self.data.write().await;
3115 if !dlock.player_friends.contains(&pid) {
3116 return Ok(());
3117 }
3118 dlock.player_friends.remove(&pid);
3119 }
3120 self.internal_iosend(Self::get_packet_write(
3121 &WritePacket::RequestChangeFriendStatus(ChangeFriendStatus::Remove, pid),
3122 )?)
3123 .await?;
3124 }
3125 Ok(())
3126 }
3127}