rosu_render/websocket/
mod.rs

1#![cfg(any(
2    feature = "native-tls",
3    feature = "rustls-native-roots",
4    feature = "rustls-webpki-roots"
5))]
6
7use crate::WebsocketError;
8
9use self::{
10    engineio::{
11        packet::{Packet as EnginePacket, PacketId as EnginePacketId},
12        EngineIo,
13    },
14    event::RawEvent,
15    packet::{Packet, PacketKind},
16    reconnect::Reconnect,
17};
18
19mod engineio;
20mod packet;
21mod reconnect;
22
23pub mod error;
24pub mod event;
25
26/// Connection to the o!rdr websocket.
27///
28/// Await events with [`OrdrWebsocket::next_event`].
29///
30/// To gracefully shut the connection down, use [`OrdrWebsocket::disconnect`].
31pub struct OrdrWebsocket {
32    engineio: EngineIo,
33    reconnect: Reconnect,
34}
35
36impl OrdrWebsocket {
37    /// Connect to the o!rdr websocket.
38    pub async fn connect() -> Result<Self, WebsocketError> {
39        let engineio = EngineIo::connect().await?;
40
41        let mut this = Self {
42            engineio,
43            reconnect: Reconnect::default(),
44        };
45
46        this.open().await?;
47
48        Ok(this)
49    }
50
51    /// Await the next o!rdr websocket event.
52    pub async fn next_event(&mut self) -> Result<RawEvent, WebsocketError> {
53        loop {
54            let Some(bytes) = self.engineio.next_message().await? else {
55                self.reconnect().await?;
56
57                continue;
58            };
59
60            let packet = Packet::from_bytes(&bytes)?;
61
62            match packet.kind {
63                PacketKind::Event => {}
64                PacketKind::Ack => self.ack(&packet).await?,
65                PacketKind::Connect => continue,
66                PacketKind::Disconnect | PacketKind::ConnectError => {
67                    self.reconnect().await?;
68
69                    continue;
70                }
71            }
72
73            if let Some(data) = packet.data {
74                return RawEvent::from_bytes(data);
75            }
76        }
77    }
78
79    /// Gracefully disconnect from the websocket.
80    pub async fn disconnect(self) -> Result<(), WebsocketError> {
81        self.engineio
82            .disconnect()
83            .await
84            .map_err(WebsocketError::EngineIo)
85    }
86
87    async fn reconnect(&mut self) -> Result<(), WebsocketError> {
88        if let Some(delay) = self.reconnect.delay() {
89            trace!(?delay, "Delaying reconnect...");
90            tokio::time::sleep(delay).await;
91        }
92
93        let err = match self.engineio.reconnect().await {
94            Ok(()) => match self.open().await {
95                Ok(()) => return Ok(()),
96                Err(err) => err,
97            },
98            Err(err) => WebsocketError::EngineIo(err),
99        };
100
101        self.reconnect.backoff();
102
103        Err(err)
104    }
105
106    async fn emit(&mut self, packet: Packet) -> Result<(), WebsocketError> {
107        let msg = EnginePacket::new(EnginePacketId::Message, packet.to_bytes());
108
109        self.engineio
110            .emit(msg)
111            .await
112            .map_err(WebsocketError::EngineIo)
113    }
114
115    async fn open(&mut self) -> Result<(), WebsocketError> {
116        self.emit(Packet::new(PacketKind::Connect, None)).await
117    }
118
119    async fn ack(&mut self, packet: &Packet) -> Result<(), WebsocketError> {
120        let Some(id) = packet.id else { return Ok(()) };
121
122        self.emit(Packet::new_ack(id)).await
123    }
124}