1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
use crate::errors::{Result, SocketError};
use crate::EIO_VERSION;
use futures::stream::SplitSink;
use futures::{SinkExt, StreamExt};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use url::Url;
/// Engine.IO packet types
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EnginePacketType {
Open = 0,
Close = 1,
Ping = 2,
Pong = 3,
Message = 4,
Upgrade = 5,
Noop = 6,
}
impl EnginePacketType {
pub fn from_u8(value: u8) -> Result<Self> {
match value {
0 => Ok(EnginePacketType::Open),
1 => Ok(EnginePacketType::Close),
2 => Ok(EnginePacketType::Ping),
3 => Ok(EnginePacketType::Pong),
4 => Ok(EnginePacketType::Message),
5 => Ok(EnginePacketType::Upgrade),
6 => Ok(EnginePacketType::Noop),
_ => Err(SocketError::InvalidPacketType(value)),
}
}
pub fn to_u8(self) -> u8 {
self as u8
}
}
/// Engine.IO transport using WebSocket
#[derive(Debug)]
pub struct EngineTransport {
ws_sink: Arc<
Mutex<Option<SplitSink<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>, Message>>>,
>,
tx: mpsc::UnboundedSender<String>,
pub(crate) rx: Arc<Mutex<mpsc::UnboundedReceiver<String>>>,
url: String,
path: String,
}
impl EngineTransport {
pub fn new(uri: &str, path: &str) -> Result<Self> {
let (tx, rx) = mpsc::unbounded_channel();
Ok(Self {
ws_sink: Arc::new(Mutex::new(None)),
tx,
rx: Arc::new(Mutex::new(rx)),
url: uri.to_string(),
path: path.to_string(),
})
}
/// Connect to the server
pub async fn connect(&mut self) -> Result<()> {
let url = self.build_handshake_url()?;
log::info!("Connecting to: {}", url);
let (ws_stream, _) = connect_async(&url)
.await
.map_err(|e| SocketError::WebSocket(format!("Connection failed: {}", e)))?;
// Split the stream into sink (for writing) and stream (for reading)
let (ws_sink, ws_stream_read) = ws_stream.split();
// Store the sink for writing
*self.ws_sink.lock().await = Some(ws_sink);
// Start receiving messages in a separate task
let tx_clone = self.tx.clone();
tokio::spawn(async move {
let mut ws_stream_read = ws_stream_read;
while let Some(msg) = ws_stream_read.next().await {
match msg {
Ok(Message::Text(text)) => {
log::debug!("Received WebSocket text message: {}", text);
if let Err(_) = tx_clone.send(text) {
break;
}
}
Ok(Message::Binary(data)) => {
// Engine.IO v3: binary data is prefixed with packet type
if let Some(&packet_type) = data.first() {
if packet_type == EnginePacketType::Message.to_u8() {
// Extract the actual message (skip first byte)
if data.len() > 1 {
let text = String::from_utf8_lossy(&data[1..]);
if let Err(_) = tx_clone.send(text.to_string()) {
break;
}
}
}
}
}
Ok(Message::Close(_)) => {
break;
}
Ok(Message::Ping(_)) => {
// Engine.IO handles PING as text message "2", not as WebSocket PING frame
// But some servers might send WebSocket PING frames
// tokio-tungstenite automatically responds with PONG, but Engine.IO
// expects a text message "2" to be sent to the channel
// So we always send "2" to the channel when we receive a WebSocket PING
if let Err(_) = tx_clone.send("2".to_string()) {
break;
}
}
Ok(Message::Pong(_)) => {
// Engine.IO handles PONG as text message "3", not as WebSocket PONG frame
// So we don't need to handle it here
}
Err(e) => {
log::error!("WebSocket error: {}", e);
break;
}
_ => {}
}
}
});
Ok(())
}
/// Build handshake URL with EIO=3 parameter
fn build_handshake_url(&self) -> Result<String> {
let mut url =
Url::parse(&self.url).map_err(|e| SocketError::Url(format!("Invalid URL: {}", e)))?;
// Add Engine.IO version 3 parameter
url.query_pairs_mut()
.append_pair("EIO", &EIO_VERSION.to_string())
.append_pair("transport", "websocket");
// Add path
let path = if self.path.is_empty() {
"/socket.io/".to_string()
} else if self.path.ends_with('/') {
format!("{}/socket.io/", self.path.trim_end_matches('/'))
} else {
format!("{}/socket.io/", self.path)
};
url.set_path(&path);
Ok(url.to_string())
}
/// Send a message
pub async fn send(&self, data: &str) -> Result<()> {
let mut sink = self.ws_sink.lock().await;
if let Some(ref mut ws_sink) = *sink {
// Note: data already contains the Engine.IO packet type prefix
// (e.g., "4" for MESSAGE, "3" for PONG)
// So we send it as-is
log::debug!("Sending WebSocket message: {}", data);
ws_sink
.send(Message::Text(data.to_string()))
.await
.map_err(|e| SocketError::Transport(format!("Send failed: {}", e)))?;
Ok(())
} else {
Err(SocketError::Transport("Not connected".to_string()))
}
}
/// Receive a message
pub async fn recv(&self) -> Option<String> {
let mut rx = self.rx.lock().await;
rx.recv().await
}
/// Close the connection
pub async fn close(&mut self) -> Result<()> {
let mut sink = self.ws_sink.lock().await;
if let Some(ref mut ws_sink) = *sink {
ws_sink
.close()
.await
.map_err(|e| SocketError::Transport(format!("Close failed: {}", e)))?;
}
*sink = None;
Ok(())
}
/// Check if connected
pub async fn is_connected(&self) -> bool {
self.ws_sink.lock().await.is_some()
}
}