1use super::packets::{
2 Bytes, ExitBroadcast, JoinBroadcast, JoinRequest, JoinResponse, LagStamp, PlayerPose,
3 PoseBroadcast, PoseRequest, RawJoinResponse, TalkBroadcast, TalkRequest,
4};
5use super::utils::RuntimeError;
6
7use std::{
8 ffi::CString, future::Future, mem::swap, pin::Pin, sync::Arc, thread::sleep, time::Duration,
9};
10
11use crossbeam::channel::{unbounded, Sender as channel_Sender};
12use futures_util::{SinkExt, StreamExt};
13use macro_rules_attribute::apply;
14use tokio::{sync::Mutex, task::JoinHandle, time::Instant};
15use tokio_tungstenite::tungstenite::Message as ws_Message;
16
17pub type BotResult = Result<(), RuntimeError>;
19pub type ConnectionThread = JoinHandle<Result<(), RuntimeError>>;
21pub type MessageSender = Arc<channel_Sender<String>>;
23pub type FutureBotResult = Pin<Box<dyn Future<Output = BotResult> + Send>>;
25pub type JoinCallback =
27 fn(JoinBroadcast, Arc<Mutex<Player>>, Arc<Mutex<World>>, MessageSender) -> FutureBotResult;
28pub type ExitCallback =
30 fn(ExitBroadcast, Arc<Mutex<Player>>, Arc<Mutex<World>>, MessageSender) -> FutureBotResult;
31pub type PoseCallback =
33 fn(PoseBroadcast, Arc<Mutex<Player>>, Arc<Mutex<World>>, MessageSender) -> FutureBotResult;
34pub type TalkCallback =
36 fn(TalkBroadcast, Arc<Mutex<Player>>, Arc<Mutex<World>>, MessageSender) -> FutureBotResult;
37pub type BotData = (Arc<Mutex<Player>>, Arc<Mutex<World>>, MessageSender);
39
40#[derive(Debug, Clone, PartialOrd, PartialEq)]
50pub struct Player {
51 pub nickname: String,
53 pub pose: PlayerPose,
55 pub id: u16,
57}
58
/// One chat line stored in the world's message log.
// Field order is significant: the derived `PartialOrd` compares fields top to bottom.
#[derive(Clone, Debug, PartialEq, PartialOrd)]
pub struct Message {
    /// Text of the message.
    pub content: String,
    /// Id of the player that sent it.
    pub author: u16,
    /// Moment the message was recorded.
    pub when: Instant,
}

impl Message {
    /// Builds a message from anything convertible into a `String`, plus the
    /// author's id and a timestamp.
    pub fn new<S: Into<String>>(content: S, author: u16, when: Instant) -> Self {
        let content = content.into();
        Message {
            content,
            author,
            when,
        }
    }
}
82
83#[derive(Default, Debug, Clone, PartialOrd, PartialEq)]
90pub struct World {
91 pub max_clients: u8,
93 pub clients: Vec<Player>,
95 pub messages: Vec<Message>,
97 pub lag: LagStamp,
99}
100
101pub async fn patch_world(
118 callbacks: Arc<(JoinCallback, ExitCallback, PoseCallback, TalkCallback)>,
119 data: Bytes,
120 world: Arc<Mutex<World>>,
121 bot: Arc<Mutex<Player>>,
122 replace_invalid_utf8: bool,
123 sender: MessageSender,
124) -> BotResult {
125 match data[0] {
126 b'j' => {
128 let join_brc: JoinBroadcast = match data.try_into() {
130 Err(e) => return Err(RuntimeError::from_string(format!("{:?}", e))),
131 Ok(brc) => brc,
132 };
133 match (&callbacks.0)(
134 join_brc.clone(),
135 Arc::clone(&bot),
136 Arc::clone(&world),
137 sender,
138 )
139 .await
140 {
141 Err(e) => return Err(e),
142 _ => {}
143 }
144 let nickname = match replace_invalid_utf8 {
145 true => join_brc
146 .name
147 .to_string_lossy()
148 .into_owned()
149 .replace('\u{0}', ""),
150 false => join_brc.name.to_string_lossy().into_owned(),
151 };
152 let mut world = world.lock().await;
153 world.clients.push(Player {
154 nickname,
155 pose: join_brc.player_pose,
156 id: join_brc.id,
157 });
158 }
159
160 b'e' => {
161 let exit_brc: ExitBroadcast = match data.try_into() {
163 Err(e) => return Err(RuntimeError::from_string(format!("{:?}", e))),
164 Ok(brc) => brc,
165 };
166 match (&callbacks.1)(
167 exit_brc.clone(),
168 Arc::clone(&bot),
169 Arc::clone(&world),
170 sender,
171 )
172 .await
173 {
174 Err(e) => return Err(e),
175 _ => {}
176 }
177 let mut world = world.lock().await;
178 let idx = world
179 .clients
180 .iter()
181 .position(|c| c.id == exit_brc.id)
182 .unwrap();
183 world.clients.remove(idx);
184 }
185
186 b'p' => {
187 let pose_brc: PoseBroadcast = match data.try_into() {
189 Err(e) => return Err(RuntimeError::from_string(format!("{:?}", e))),
190 Ok(brc) => brc,
191 };
192 match (&callbacks.2)(
193 pose_brc.clone(),
194 Arc::clone(&bot),
195 Arc::clone(&world),
196 sender,
197 )
198 .await
199 {
200 Err(e) => return Err(e),
201 _ => {}
202 }
203 let mut world = world.lock().await;
204 let mut client = match world.clients.iter_mut().find(|c| c.id == pose_brc.id) {
205 Some(v) => v,
206 None => return Ok(()),
207 };
208 client.pose = pose_brc.player_pose;
209 }
210
211 b't' => {
212 let talk_brc: TalkBroadcast = match data.try_into() {
214 Err(e) => return Err(RuntimeError::from_string(format!("{:?}", e))),
215 Ok(brc) => brc,
216 };
217 match (&callbacks.3)(
218 talk_brc.clone(),
219 Arc::clone(&bot),
220 Arc::clone(&world),
221 sender,
222 )
223 .await
224 {
225 Err(e) => return Err(e),
226 _ => {}
227 }
228 let content = match replace_invalid_utf8 {
229 true => talk_brc
230 .str
231 .to_string_lossy()
232 .into_owned()
233 .replace('\u{0}', ""),
234 false => talk_brc.str.to_string_lossy().into_owned(),
235 };
236 world
237 .lock()
238 .await
239 .messages
240 .push(Message::new(content, talk_brc.id, Instant::now()));
241 }
242
243 _ => {} }
245
246 Ok(())
247}
248
/// Rewrites an `async fn` item into a plain `fn` with the same name, attributes,
/// visibility and parameters that returns
/// `Pin<Box<dyn Future<Output = Ret> + Send>>`.
///
/// The original body is moved into an `async move` block that is boxed and
/// pinned; when no return type is written the future's output is `()`.
#[macro_export]
macro_rules! callback {
    (
        $( #[$meta:meta] )*
        $vis:vis async fn $name:ident ( $($params:tt)* ) $( -> $ret:ty )?
        {
            $($code:tt)*
        }
    ) => {
        $( #[$meta] )*
        // `($ret)` is parenthesised so that a missing return type becomes `()`.
        #[allow(unused_parens)]
        $vis fn $name( $($params)* ) -> ::std::pin::Pin<
            ::std::boxed::Box<
                dyn ::std::future::Future<Output = ($($ret)?)> + ::std::marker::Send
            >
        > {
            ::std::boxed::Box::pin(async move { $($code)* })
        }
    };
}
278
279pub struct Receiver {
287 pub thread: ConnectionThread,
289 pub is_finished: Arc<Mutex<bool>>,
291}
292
293pub struct Sender {
305 pub thread: ConnectionThread,
307 pub is_finished: Arc<Mutex<bool>>,
309}
310
311pub struct Bot {
327 nickname: String,
328 join_callback: Option<JoinCallback>,
329 exit_callback: Option<ExitCallback>,
330 pose_callback: Option<PoseCallback>,
331 talk_callback: Option<TalkCallback>,
332 replace_invalid_utf8: bool,
333 address: String,
334}
335
336impl Bot {
337 pub fn new<S: Into<String>>(nickname: S, address: S, replace_invalid_utf8: bool) -> Self {
339 let nickname = nickname.into();
340 let address = address.into();
341 debug_assert!(
342 nickname.len() <= 31,
343 "Bot's name cannot be longer than 31 character"
344 );
345 Self {
346 nickname,
347 join_callback: None,
348 exit_callback: None,
349 pose_callback: None,
350 talk_callback: None,
351 replace_invalid_utf8,
352 address,
353 }
354 }
355
356 pub fn on_join(&mut self, callback: JoinCallback) -> Option<JoinCallback> {
359 let mut callback = Some(callback);
360 swap(&mut callback, &mut self.join_callback);
361 callback
362 }
363
364 pub fn on_exit(&mut self, callback: ExitCallback) -> Option<ExitCallback> {
367 let mut callback = Some(callback);
368 swap(&mut callback, &mut self.exit_callback);
369 callback
370 }
371
372 pub fn on_pose(&mut self, callback: PoseCallback) -> Option<PoseCallback> {
375 let mut callback = Some(callback);
376 swap(&mut callback, &mut self.pose_callback);
377 callback
378 }
379
380 pub fn on_talk(&mut self, callback: TalkCallback) -> Option<TalkCallback> {
383 let mut callback = Some(callback);
384 swap(&mut callback, &mut self.talk_callback);
385 callback
386 }
387
388 pub async fn run(self) -> Result<((Receiver, Sender), BotData), RuntimeError> {
392 let (mut ws_s, mut ws_r) = match tokio_tungstenite::connect_async(self.address).await {
393 Ok(ws) => ws.0.split(),
394 Err(e) => {
395 return Err(RuntimeError::from_string(format!(
396 "Connection failed: {:?}",
397 e
398 )))
399 }
400 };
401 let join_req: Bytes = JoinRequest {
402 name: match CString::new(self.nickname.clone()) {
403 Ok(s) => s,
404 Err(e) => {
405 return Err(RuntimeError::from_string(format!(
406 "Failed to make new CString: {:?}",
407 e
408 )))
409 }
410 },
411 }
412 .into();
413 ws_s.send(ws_Message::Binary(join_req)).await.unwrap();
414 let join_rsp = JoinResponse::from(
415 RawJoinResponse::try_from(match ws_r.next().await {
416 Some(message) => match message.unwrap() {
417 ws_Message::Binary(data) => data,
418 _ => panic!("Server returned unknown data."),
419 },
420 None => panic!("Server dropped connection"),
421 })
422 .unwrap(),
423 );
424 let (tx, rx) = unbounded();
425 let rx = Arc::new(rx);
426 let tx = Arc::new(tx);
427 let bot = Arc::new(Mutex::new(Player {
428 nickname: self.nickname,
429 pose: Default::default(),
430 id: join_rsp.id,
431 }));
432 let world = Arc::new(Mutex::new(World {
433 max_clients: join_rsp.max_clients,
434 clients: vec![],
435 messages: vec![],
436 lag: [0u8; 3],
437 }));
438 let s_bot = Arc::clone(&bot);
439 let sender_finished = Arc::new(Mutex::new(false));
440 let _sender_finished = Arc::clone(&sender_finished);
441 let a_rx = Arc::clone(&rx);
442 let sender = tokio::spawn(async move {
443 loop {
444 match ws_s
445 .send(ws_Message::Binary(
446 PoseRequest {
447 player_pose: s_bot.lock().await.pose.clone(),
448 }
449 .into(),
450 ))
451 .await
452 {
453 Err(e) => {
454 *sender_finished.lock().await = true;
455 return Err(RuntimeError::from_string(format!("{:?}", e)));
456 }
457 _ => {}
458 };
459 while let Ok(m) = Arc::clone(&a_rx).try_recv() {
460 match ws_s
461 .send(ws_Message::Binary(
462 TalkRequest {
463 str: match CString::new(m) {
464 Ok(b) => b,
465 Err(e) => {
466 *sender_finished.lock().await = true;
467 return Err(RuntimeError::from_string(format!(
468 "CString::new failed: {:?}",
469 e
470 )));
471 }
472 },
473 }
474 .into(),
475 ))
476 .await
477 {
478 Err(e) => {
479 *sender_finished.lock().await = true;
480 return Err(RuntimeError::from_string(format!("{:?}", e)));
481 }
482 Ok(_) => {}
483 };
484 }
485 sleep(Duration::from_millis(10));
486 }
487 });
488 let w = Arc::clone(&world);
489 let b = Arc::clone(&bot);
490 let callbacks = Arc::new((
491 match self.join_callback {
492 Some(f) => f,
493 None => default_join,
494 },
495 match self.exit_callback {
496 Some(f) => f,
497 None => default_exit,
498 },
499 match self.pose_callback {
500 Some(f) => f,
501 None => default_pose,
502 },
503 match self.talk_callback {
504 Some(f) => f,
505 None => default_talk,
506 },
507 ));
508 let receiver_finished = Arc::new(Mutex::new(false));
509 let _receiver_finished = Arc::clone(&receiver_finished);
510 let a_tx = Arc::clone(&tx);
511 let receiver = tokio::spawn(async move {
512 while let Some(message) = ws_r.next().await {
513 match message {
514 Ok(m) => match m {
515 ws_Message::Binary(data) => {
516 match patch_world(
517 Arc::clone(&callbacks),
518 data,
519 Arc::clone(&w),
520 Arc::clone(&b),
521 self.replace_invalid_utf8,
522 Arc::clone(&a_tx),
523 )
524 .await
525 {
526 Err(e) => {
527 *receiver_finished.lock().await = true;
528 return Err(RuntimeError::from_string(e.to_string()));
529 }
530 _ => {}
531 }
532 }
533 _ => {}
534 },
535 Err(e) => {
536 *receiver_finished.lock().await = true;
537 return Err(RuntimeError::from_string(e.to_string()));
538 }
539 }
540 }
541 Ok(())
542 });
543 let main_world = Arc::clone(&world);
544 let main_bot = Arc::clone(&bot);
545 let main_sender = Arc::clone(&tx);
546 Ok((
547 (
548 Receiver {
549 thread: receiver,
550 is_finished: Arc::clone(&_receiver_finished),
551 },
552 Sender {
553 thread: sender,
554 is_finished: Arc::clone(&_sender_finished),
555 },
556 ),
557 (main_bot, main_world, main_sender),
558 ))
559 }
560}
561
562#[doc(hidden)]
563#[apply(callback!)]
564async fn default_join(
565 _: JoinBroadcast,
566 _: Arc<Mutex<Player>>,
567 _: Arc<Mutex<World>>,
568 _: MessageSender,
569) -> BotResult {
570 Ok(())
571}
572
573#[doc(hidden)]
574#[apply(callback!)]
575async fn default_exit(
576 _: ExitBroadcast,
577 _: Arc<Mutex<Player>>,
578 _: Arc<Mutex<World>>,
579 _: MessageSender,
580) -> BotResult {
581 Ok(())
582}
583
584#[doc(hidden)]
585#[apply(callback!)]
586async fn default_pose(
587 _: PoseBroadcast,
588 _: Arc<Mutex<Player>>,
589 _: Arc<Mutex<World>>,
590 _: MessageSender,
591) -> BotResult {
592 Ok(())
593}
594
595#[doc(hidden)]
596#[apply(callback!)]
597async fn default_talk(
598 _: TalkBroadcast,
599 _: Arc<Mutex<Player>>,
600 _: Arc<Mutex<World>>,
601 _: MessageSender,
602) -> BotResult {
603 Ok(())
604}