rosu_render/websocket/
mod.rs1#![cfg(any(
2 feature = "native-tls",
3 feature = "rustls-native-roots",
4 feature = "rustls-webpki-roots"
5))]
6
7use crate::WebsocketError;
8
9use self::{
10 engineio::{
11 packet::{Packet as EnginePacket, PacketId as EnginePacketId},
12 EngineIo,
13 },
14 event::RawEvent,
15 packet::{Packet, PacketKind},
16 reconnect::Reconnect,
17};
18
19mod engineio;
20mod packet;
21mod reconnect;
22
23pub mod error;
24pub mod event;
25
26pub struct OrdrWebsocket {
32 engineio: EngineIo,
33 reconnect: Reconnect,
34}
35
36impl OrdrWebsocket {
37 pub async fn connect() -> Result<Self, WebsocketError> {
39 let engineio = EngineIo::connect().await?;
40
41 let mut this = Self {
42 engineio,
43 reconnect: Reconnect::default(),
44 };
45
46 this.open().await?;
47
48 Ok(this)
49 }
50
51 pub async fn next_event(&mut self) -> Result<RawEvent, WebsocketError> {
53 loop {
54 let Some(bytes) = self.engineio.next_message().await? else {
55 self.reconnect().await?;
56
57 continue;
58 };
59
60 let packet = Packet::from_bytes(&bytes)?;
61
62 match packet.kind {
63 PacketKind::Event => {}
64 PacketKind::Ack => self.ack(&packet).await?,
65 PacketKind::Connect => continue,
66 PacketKind::Disconnect | PacketKind::ConnectError => {
67 self.reconnect().await?;
68
69 continue;
70 }
71 }
72
73 if let Some(data) = packet.data {
74 return RawEvent::from_bytes(data);
75 }
76 }
77 }
78
79 pub async fn disconnect(self) -> Result<(), WebsocketError> {
81 self.engineio
82 .disconnect()
83 .await
84 .map_err(WebsocketError::EngineIo)
85 }
86
87 async fn reconnect(&mut self) -> Result<(), WebsocketError> {
88 if let Some(delay) = self.reconnect.delay() {
89 trace!(?delay, "Delaying reconnect...");
90 tokio::time::sleep(delay).await;
91 }
92
93 let err = match self.engineio.reconnect().await {
94 Ok(()) => match self.open().await {
95 Ok(()) => return Ok(()),
96 Err(err) => err,
97 },
98 Err(err) => WebsocketError::EngineIo(err),
99 };
100
101 self.reconnect.backoff();
102
103 Err(err)
104 }
105
106 async fn emit(&mut self, packet: Packet) -> Result<(), WebsocketError> {
107 let msg = EnginePacket::new(EnginePacketId::Message, packet.to_bytes());
108
109 self.engineio
110 .emit(msg)
111 .await
112 .map_err(WebsocketError::EngineIo)
113 }
114
115 async fn open(&mut self) -> Result<(), WebsocketError> {
116 self.emit(Packet::new(PacketKind::Connect, None)).await
117 }
118
119 async fn ack(&mut self, packet: &Packet) -> Result<(), WebsocketError> {
120 let Some(id) = packet.id else { return Ok(()) };
121
122 self.emit(Packet::new_ack(id)).await
123 }
124}