asciicker_rs/y6/
bot.rs

1use super::packets::{
2    Bytes, ExitBroadcast, JoinBroadcast, JoinRequest, JoinResponse, LagStamp, PlayerPose,
3    PoseBroadcast, PoseRequest, RawJoinResponse, TalkBroadcast, TalkRequest,
4};
5use super::utils::RuntimeError;
6
7use std::{
8    ffi::CString, future::Future, mem::swap, pin::Pin, sync::Arc, thread::sleep, time::Duration,
9};
10
11use crossbeam::channel::{unbounded, Sender as channel_Sender};
12use futures_util::{SinkExt, StreamExt};
13use macro_rules_attribute::apply;
14use tokio::{sync::Mutex, task::JoinHandle, time::Instant};
15use tokio_tungstenite::tungstenite::Message as ws_Message;
16
17/// Result type for callbacks ([`JoinCallback`], [`ExitCallback`], [`PoseCallback`], [`TalkCallback`]), internal functions ([`patch_world`]...).
18pub type BotResult = Result<(), RuntimeError>;
19/// Type alias for two main connection threads.
20pub type ConnectionThread = JoinHandle<Result<(), RuntimeError>>;
21/// Type alias for sender handle of the message channel.
22pub type MessageSender = Arc<channel_Sender<String>>;
23/// Box-pinned [`BotResult`].
24pub type FutureBotResult = Pin<Box<dyn Future<Output = BotResult> + Send>>;
25/// Type alias for join callback.
26pub type JoinCallback =
27    fn(JoinBroadcast, Arc<Mutex<Player>>, Arc<Mutex<World>>, MessageSender) -> FutureBotResult;
28/// Type alias for exit callback.
29pub type ExitCallback =
30    fn(ExitBroadcast, Arc<Mutex<Player>>, Arc<Mutex<World>>, MessageSender) -> FutureBotResult;
31/// Type alias for pose callback.
32pub type PoseCallback =
33    fn(PoseBroadcast, Arc<Mutex<Player>>, Arc<Mutex<World>>, MessageSender) -> FutureBotResult;
34/// Type alias for talk callback.
35pub type TalkCallback =
36    fn(TalkBroadcast, Arc<Mutex<Player>>, Arc<Mutex<World>>, MessageSender) -> FutureBotResult;
37/// Type alias for main bot data
38pub type BotData = (Arc<Mutex<Player>>, Arc<Mutex<World>>, MessageSender);
39
40/// Middle level abstraction.
41///
42/// Represents an asciicker player
43///
44/// Not used internally, but created by [`Bot::run`] and passed into callbacks + main bot function
45/// as representation of the bot in the asciicker world.
46///
47/// There is also [`Vec<Player>`] in [`World`] that represents all current players
48/// (excluding the bot) and managed by [`Receiver`] thread.
49#[derive(Debug, Clone, PartialOrd, PartialEq)]
50pub struct Player {
51    /// Nickname
52    pub nickname: String,
53    /// Current position + animation + sprite
54    pub pose: PlayerPose,
55    /// ID
56    pub id: u16,
57}
58
59/// Middle level abstraction.
60///
61/// Represents a message sent by someone in asciicker
62#[derive(Debug, Clone, PartialOrd, PartialEq)]
63pub struct Message {
64    /// Contents of the message
65    pub content: String,
66    /// Author id
67    pub author: u16,
68    /// When the message was sent
69    pub when: Instant,
70}
71
72impl Message {
73    /// Creates a new instance of [`Message`].
74    pub fn new<S: Into<String>>(content: S, author: u16, when: Instant) -> Self {
75        Self {
76            content: content.into(),
77            author,
78            when,
79        }
80    }
81}
82
83/// Middle level abstraction.
84///
85/// Represents any asciicker world.
86///
87/// Not used internally, but created by [`Bot::run`] and updated by the [`Receiver`] thread.
88/// Main purpose is to give the user of the library an accurate representation of what is happening.
89#[derive(Default, Debug, Clone, PartialOrd, PartialEq)]
90pub struct World {
91    /// Max amount of client the server supports.
92    pub max_clients: u8,
93    /// Current clients
94    pub clients: Vec<Player>,
95    /// Stack of messages, need to be popped manually
96    pub messages: Vec<Message>,
97    /// [`LagStamp`]
98    pub lag: LagStamp,
99}
100
101/// A high-level abstraction function that is used
102/// internally by the receiver thread
103/// to patch the [`World`] by some packet from server.
104///
105/// For example, if this function receives a [`JoinBroadcast`],
106/// it will add a new [`Player`] to the [`World`] and call
107/// [`JoinCallback`] that was passed in.
108///
109/// [`World`]: ./struct.World.html
110/// [`Player`]: ./struct.Player.html
111/// [`JoinBroadcast`]: ../packets/struct.JoinBroadcast.html
112/// [`JoinCallback`]: ./type.JoinCallback.html
113// FIXME: This function should not call callbacks if there is none.
114// Right now we just supply `default_join` and similar, but
115// really we shouldn't create and call them at all
116// if we don't need to.
117pub async fn patch_world(
118    callbacks: Arc<(JoinCallback, ExitCallback, PoseCallback, TalkCallback)>,
119    data: Bytes,
120    world: Arc<Mutex<World>>,
121    bot: Arc<Mutex<Player>>,
122    replace_invalid_utf8: bool,
123    sender: MessageSender,
124) -> BotResult {
125    match data[0] {
126        /* Accept only stuff we care about, aka broadcasts */
127        b'j' => {
128            // Someone has joined
129            let join_brc: JoinBroadcast = match data.try_into() {
130                Err(e) => return Err(RuntimeError::from_string(format!("{:?}", e))),
131                Ok(brc) => brc,
132            };
133            match (&callbacks.0)(
134                join_brc.clone(),
135                Arc::clone(&bot),
136                Arc::clone(&world),
137                sender,
138            )
139            .await
140            {
141                Err(e) => return Err(e),
142                _ => {}
143            }
144            let nickname = match replace_invalid_utf8 {
145                true => join_brc
146                    .name
147                    .to_string_lossy()
148                    .into_owned()
149                    .replace('\u{0}', ""),
150                false => join_brc.name.to_string_lossy().into_owned(),
151            };
152            let mut world = world.lock().await;
153            world.clients.push(Player {
154                nickname,
155                pose: join_brc.player_pose,
156                id: join_brc.id,
157            });
158        }
159
160        b'e' => {
161            // Someone has left
162            let exit_brc: ExitBroadcast = match data.try_into() {
163                Err(e) => return Err(RuntimeError::from_string(format!("{:?}", e))),
164                Ok(brc) => brc,
165            };
166            match (&callbacks.1)(
167                exit_brc.clone(),
168                Arc::clone(&bot),
169                Arc::clone(&world),
170                sender,
171            )
172            .await
173            {
174                Err(e) => return Err(e),
175                _ => {}
176            }
177            let mut world = world.lock().await;
178            let idx = world
179                .clients
180                .iter()
181                .position(|c| c.id == exit_brc.id)
182                .unwrap();
183            world.clients.remove(idx);
184        }
185
186        b'p' => {
187            // Someone has moved or their pose changed for any reason
188            let pose_brc: PoseBroadcast = match data.try_into() {
189                Err(e) => return Err(RuntimeError::from_string(format!("{:?}", e))),
190                Ok(brc) => brc,
191            };
192            match (&callbacks.2)(
193                pose_brc.clone(),
194                Arc::clone(&bot),
195                Arc::clone(&world),
196                sender,
197            )
198            .await
199            {
200                Err(e) => return Err(e),
201                _ => {}
202            }
203            let mut world = world.lock().await;
204            let mut client = match world.clients.iter_mut().find(|c| c.id == pose_brc.id) {
205                Some(v) => v,
206                None => return Ok(()),
207            };
208            client.pose = pose_brc.player_pose;
209        }
210
211        b't' => {
212            // Someone has said something
213            let talk_brc: TalkBroadcast = match data.try_into() {
214                Err(e) => return Err(RuntimeError::from_string(format!("{:?}", e))),
215                Ok(brc) => brc,
216            };
217            match (&callbacks.3)(
218                talk_brc.clone(),
219                Arc::clone(&bot),
220                Arc::clone(&world),
221                sender,
222            )
223            .await
224            {
225                Err(e) => return Err(e),
226                _ => {}
227            }
228            let content = match replace_invalid_utf8 {
229                true => talk_brc
230                    .str
231                    .to_string_lossy()
232                    .into_owned()
233                    .replace('\u{0}', ""),
234                false => talk_brc.str.to_string_lossy().into_owned(),
235            };
236            world
237                .lock()
238                .await
239                .messages
240                .push(Message::new(content, talk_brc.id, Instant::now()));
241        }
242
243        _ => {} // Don't care
244    }
245
246    Ok(())
247}
248
249/// Macro to transform `async fn` to return
250/// `Pin<Box<impl Future<Output=T>>>` instead of
251/// `impl Future<Output=T>`
252/// and is required for functions which are planned to be
253/// passed as an argument to [`Bot::on_talk`] or similar methods.
254///
255/// Stolen from [here](https://users.rust-lang.org/t/how-to-store-async-function-pointer/38343/4)
256/// , thanks to [Yandros](https://users.rust-lang.org/u/Yandros).
257#[macro_export]
258macro_rules! callback {(
259    $( #[$attr:meta] )* // includes doc strings
260    $pub:vis
261    async
262    fn $fname:ident( $($args:tt)* ) $(-> $Ret:ty)?
263    {
264        $($body:tt)*
265    }
266) => (
267    $( #[$attr] )*
268    #[allow(unused_parens)]
269    $pub
270    fn $fname( $($args)* ) -> ::std::pin::Pin<::std::boxed::Box<
271        dyn ::std::future::Future<Output = ($($Ret)?)>
272            + ::std::marker::Send
273    >>
274    {
275        ::std::boxed::Box::pin(async move { $($body)* })
276    }
277)}
278
279/// Describes a receiver thread.
280///
281/// Receiver thread is last of the two threads created on [`Bot::run`]
282/// and serves a purpose of receiving websocket messages
283/// from specified asciicker server and [patching the world].
284///
285/// [patching the world]: ./fn.patch_world.html
286pub struct Receiver {
287    /// Thread [`JoinHandle`]
288    pub thread: ConnectionThread,
289    /// `true` if thread is still alive
290    pub is_finished: Arc<Mutex<bool>>,
291}
292
293/// Describes a sender thread.
294///
295/// Sender thread is first of the two threads created on [`Bot::run`]
296/// and serves a purpose of sending pose requests
297/// (required to get broadcasts) and sending talk requests when needed.
298///
299/// Sender has a receiver handle of message channel, so anyone with a [sender
300/// handle] (callbacks, main function) can push those messages in and next time sender thread wants to
301/// send a pose request, it will also send the requested messages from the channel.
302///
303/// [sender handle]: ./type.MessageSender.html
304pub struct Sender {
305    /// Thread [`JoinHandle`]
306    pub thread: ConnectionThread,
307    /// `true` if thread is still alive
308    pub is_finished: Arc<Mutex<bool>>,
309}
310
311/// Provides highest level of abstraction.
312///
313/// Can be easily constructed with [`Bot::new`] and ran with [`Bot::run`].
314///
315/// # Examples
316///
317/// ## Creating a new bot and running it:
318/// ```
319/// use asciicker_rs::y6::prelude::*;
320///
321/// let bot = Bot::new("bot", "ws://asciicker.com/ws/y6/", true);
322///
323/// bot.run();
324/// loop {}
325/// ```
326pub struct Bot {
327    nickname: String,
328    join_callback: Option<JoinCallback>,
329    exit_callback: Option<ExitCallback>,
330    pose_callback: Option<PoseCallback>,
331    talk_callback: Option<TalkCallback>,
332    replace_invalid_utf8: bool,
333    address: String,
334}
335
336impl Bot {
337    /// Constructs a new [`Bot`] instance.
338    pub fn new<S: Into<String>>(nickname: S, address: S, replace_invalid_utf8: bool) -> Self {
339        let nickname = nickname.into();
340        let address = address.into();
341        debug_assert!(
342            nickname.len() <= 31,
343            "Bot's name cannot be longer than 31 character"
344        );
345        Self {
346            nickname,
347            join_callback: None,
348            exit_callback: None,
349            pose_callback: None,
350            talk_callback: None,
351            replace_invalid_utf8,
352            address,
353        }
354    }
355
356    /// Replaces [`JoinCallback`] and returns [`Some(JoinCallback)`] if any was set already.
357    /// [`Some(JoinCallback)`]: [Option::Some]
358    pub fn on_join(&mut self, callback: JoinCallback) -> Option<JoinCallback> {
359        let mut callback = Some(callback);
360        swap(&mut callback, &mut self.join_callback);
361        callback
362    }
363
364    /// Replaces [`ExitCallback`] and returns [`Some(ExitCallback)`] if any was set already.
365    /// [`Some(ExitCallback)`]: [Option::Some]
366    pub fn on_exit(&mut self, callback: ExitCallback) -> Option<ExitCallback> {
367        let mut callback = Some(callback);
368        swap(&mut callback, &mut self.exit_callback);
369        callback
370    }
371
372    /// Replaces [`PoseCallback`] and returns [`Some(PoseCallback)`] if any was set already.
373    /// [`Some(PoseCallback)`]: [Option::Some]
374    pub fn on_pose(&mut self, callback: PoseCallback) -> Option<PoseCallback> {
375        let mut callback = Some(callback);
376        swap(&mut callback, &mut self.pose_callback);
377        callback
378    }
379
380    /// Replaces [`TalkCallback`] and returns [`Some(TalkCallback)`] if any was set already.
381    /// [`Some(TalkCallback)`]: [Option::Some]
382    pub fn on_talk(&mut self, callback: TalkCallback) -> Option<TalkCallback> {
383        let mut callback = Some(callback);
384        swap(&mut callback, &mut self.talk_callback);
385        callback
386    }
387
388    /// Runs the bot.
389    ///
390    /// Spawns two threads: [`Receiver`], [`Sender`] and returns them with [`BotData`] if connecting was successful.
391    pub async fn run(self) -> Result<((Receiver, Sender), BotData), RuntimeError> {
392        let (mut ws_s, mut ws_r) = match tokio_tungstenite::connect_async(self.address).await {
393            Ok(ws) => ws.0.split(),
394            Err(e) => {
395                return Err(RuntimeError::from_string(format!(
396                    "Connection failed: {:?}",
397                    e
398                )))
399            }
400        };
401        let join_req: Bytes = JoinRequest {
402            name: match CString::new(self.nickname.clone()) {
403                Ok(s) => s,
404                Err(e) => {
405                    return Err(RuntimeError::from_string(format!(
406                        "Failed to make new CString: {:?}",
407                        e
408                    )))
409                }
410            },
411        }
412        .into();
413        ws_s.send(ws_Message::Binary(join_req)).await.unwrap();
414        let join_rsp = JoinResponse::from(
415            RawJoinResponse::try_from(match ws_r.next().await {
416                Some(message) => match message.unwrap() {
417                    ws_Message::Binary(data) => data,
418                    _ => panic!("Server returned unknown data."),
419                },
420                None => panic!("Server dropped connection"),
421            })
422            .unwrap(),
423        );
424        let (tx, rx) = unbounded();
425        let rx = Arc::new(rx);
426        let tx = Arc::new(tx);
427        let bot = Arc::new(Mutex::new(Player {
428            nickname: self.nickname,
429            pose: Default::default(),
430            id: join_rsp.id,
431        }));
432        let world = Arc::new(Mutex::new(World {
433            max_clients: join_rsp.max_clients,
434            clients: vec![],
435            messages: vec![],
436            lag: [0u8; 3],
437        }));
438        let s_bot = Arc::clone(&bot);
439        let sender_finished = Arc::new(Mutex::new(false));
440        let _sender_finished = Arc::clone(&sender_finished);
441        let a_rx = Arc::clone(&rx);
442        let sender = tokio::spawn(async move {
443            loop {
444                match ws_s
445                    .send(ws_Message::Binary(
446                        PoseRequest {
447                            player_pose: s_bot.lock().await.pose.clone(),
448                        }
449                        .into(),
450                    ))
451                    .await
452                {
453                    Err(e) => {
454                        *sender_finished.lock().await = true;
455                        return Err(RuntimeError::from_string(format!("{:?}", e)));
456                    }
457                    _ => {}
458                };
459                while let Ok(m) = Arc::clone(&a_rx).try_recv() {
460                    match ws_s
461                        .send(ws_Message::Binary(
462                            TalkRequest {
463                                str: match CString::new(m) {
464                                    Ok(b) => b,
465                                    Err(e) => {
466                                        *sender_finished.lock().await = true;
467                                        return Err(RuntimeError::from_string(format!(
468                                            "CString::new failed: {:?}",
469                                            e
470                                        )));
471                                    }
472                                },
473                            }
474                            .into(),
475                        ))
476                        .await
477                    {
478                        Err(e) => {
479                            *sender_finished.lock().await = true;
480                            return Err(RuntimeError::from_string(format!("{:?}", e)));
481                        }
482                        Ok(_) => {}
483                    };
484                }
485                sleep(Duration::from_millis(10));
486            }
487        });
488        let w = Arc::clone(&world);
489        let b = Arc::clone(&bot);
490        let callbacks = Arc::new((
491            match self.join_callback {
492                Some(f) => f,
493                None => default_join,
494            },
495            match self.exit_callback {
496                Some(f) => f,
497                None => default_exit,
498            },
499            match self.pose_callback {
500                Some(f) => f,
501                None => default_pose,
502            },
503            match self.talk_callback {
504                Some(f) => f,
505                None => default_talk,
506            },
507        ));
508        let receiver_finished = Arc::new(Mutex::new(false));
509        let _receiver_finished = Arc::clone(&receiver_finished);
510        let a_tx = Arc::clone(&tx);
511        let receiver = tokio::spawn(async move {
512            while let Some(message) = ws_r.next().await {
513                match message {
514                    Ok(m) => match m {
515                        ws_Message::Binary(data) => {
516                            match patch_world(
517                                Arc::clone(&callbacks),
518                                data,
519                                Arc::clone(&w),
520                                Arc::clone(&b),
521                                self.replace_invalid_utf8,
522                                Arc::clone(&a_tx),
523                            )
524                            .await
525                            {
526                                Err(e) => {
527                                    *receiver_finished.lock().await = true;
528                                    return Err(RuntimeError::from_string(e.to_string()));
529                                }
530                                _ => {}
531                            }
532                        }
533                        _ => {}
534                    },
535                    Err(e) => {
536                        *receiver_finished.lock().await = true;
537                        return Err(RuntimeError::from_string(e.to_string()));
538                    }
539                }
540            }
541            Ok(())
542        });
543        let main_world = Arc::clone(&world);
544        let main_bot = Arc::clone(&bot);
545        let main_sender = Arc::clone(&tx);
546        Ok((
547            (
548                Receiver {
549                    thread: receiver,
550                    is_finished: Arc::clone(&_receiver_finished),
551                },
552                Sender {
553                    thread: sender,
554                    is_finished: Arc::clone(&_sender_finished),
555                },
556            ),
557            (main_bot, main_world, main_sender),
558        ))
559    }
560}
561
562#[doc(hidden)]
563#[apply(callback!)]
564async fn default_join(
565    _: JoinBroadcast,
566    _: Arc<Mutex<Player>>,
567    _: Arc<Mutex<World>>,
568    _: MessageSender,
569) -> BotResult {
570    Ok(())
571}
572
573#[doc(hidden)]
574#[apply(callback!)]
575async fn default_exit(
576    _: ExitBroadcast,
577    _: Arc<Mutex<Player>>,
578    _: Arc<Mutex<World>>,
579    _: MessageSender,
580) -> BotResult {
581    Ok(())
582}
583
584#[doc(hidden)]
585#[apply(callback!)]
586async fn default_pose(
587    _: PoseBroadcast,
588    _: Arc<Mutex<Player>>,
589    _: Arc<Mutex<World>>,
590    _: MessageSender,
591) -> BotResult {
592    Ok(())
593}
594
595#[doc(hidden)]
596#[apply(callback!)]
597async fn default_talk(
598    _: TalkBroadcast,
599    _: Arc<Mutex<Player>>,
600    _: Arc<Mutex<World>>,
601    _: MessageSender,
602) -> BotResult {
603    Ok(())
604}