1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#[cfg(test)]
mod tests {
    // NOTE(review): the three callback tests below are commented out and are
    // kept only for reference; they are not compiled or run.
    // #[test]
    // fn test_on_connection() {
    // let mut wynd = Wynd::new();
    // wynd.on_connection(|_| async move {
    // println!("Connection");
    // });
    // let on_connection_cl = &wynd.on_connection_cl.unwrap();
    // on_connection_cl(Conn::new());
    // }
    // #[test]
    // fn test_on_close() {
    // let mut wynd = Wynd::new();
    // wynd.on_close(|| {
    // println!("Closed connection");
    // });
    // let on_close_cl = &wynd.on_close_cl;
    // on_close_cl();
    // }
    // #[test]
    // fn test_on_error() {
    // let mut wynd = Wynd::new();
    // wynd.on_error(|e| {
    // println!("Error: {}", e);
    // });
    // let on_error_cl = &wynd.on_error_cl;
    // on_error_cl(WyndError::default());
    // }
    use crate::wynd::Wynd;

    /// End-to-end check that a `Wynd` server accepts a WebSocket client,
    /// invokes the `on_open` handler, and delivers a text frame to `on_text`.
    ///
    /// The handlers forward what they observe through unbounded channels whose
    /// senders live in function-local `static`s, so the test body can await
    /// the signals. Every wait is bounded by a timeout so a missing callback
    /// fails the test instead of hanging it.
    ///
    /// # Panics
    ///
    /// Panics if the temporary listener cannot be bound, the client cannot
    /// connect before the deadline, a signal does not arrive in time, or the
    /// received text differs from the one sent.
    #[ignore = "Running for eternity will fix later"]
    #[tokio::test]
    async fn test_listen_accepts_connection_and_text() {
        use futures::channel::mpsc;
        use futures::{SinkExt, StreamExt};
        use std::net::TcpListener as StdTcpListener;
        use std::sync::OnceLock;
        use std::time::{Duration, Instant};
        use tokio::time::{sleep, timeout};
        use tokio_tungstenite::connect_async;
        use tokio_tungstenite::tungstenite::{Message, Utf8Bytes};

        /// How long the client keeps retrying the initial connection.
        const CONNECT_DEADLINE: Duration = Duration::from_secs(2);
        /// Pause between two connection attempts.
        const CONNECT_RETRY_DELAY: Duration = Duration::from_millis(50);
        /// Upper bound on waiting for a server-side callback to report back.
        const SIGNAL_TIMEOUT: Duration = Duration::from_secs(5);

        // Senders are stored in statics so the handler closures below do not
        // need to capture anything from this stack frame.
        static OPEN_TX: OnceLock<mpsc::UnboundedSender<()>> = OnceLock::new();
        static TEXT_TX: OnceLock<mpsc::UnboundedSender<String>> = OnceLock::new();

        // Find a free port by binding to port 0 and reading the assigned port.
        // The listener is dropped right away so the server can bind the port;
        // another process could grab it in between, which would surface as a
        // connect failure below.
        let std_listener = StdTcpListener::bind("127.0.0.1:0").expect("bind temp listener");
        let port = std_listener.local_addr().unwrap().port();
        drop(std_listener);

        let mut wynd = Wynd::new();
        let (open_tx, mut open_rx) = mpsc::unbounded();
        let (text_tx, mut text_rx) = mpsc::unbounded();
        // `set` only fails if the static is already initialised; that result
        // is deliberately ignored.
        OPEN_TX.set(open_tx).ok();
        TEXT_TX.set(text_tx).ok();

        wynd.on_connection(|conn| async move {
            // Configure callbacks to forward signals via channels stored in OnceLock
            conn.on_open(move |_| async move {
                let sender = OPEN_TX.get().unwrap().clone();
                let _ = sender.unbounded_send(());
            })
            .await;
            conn.on_text(move |evt, _| async move {
                let sender = TEXT_TX.get().unwrap().clone();
                let _ = sender.unbounded_send(evt.data);
            });
        });

        // Start the server in the background
        let server = tokio::spawn(async move {
            wynd.listen(port, || {}).await.unwrap();
        });

        // Connect a websocket client (retry briefly until server is listening)
        let url = format!("ws://127.0.0.1:{}", port);
        let (mut ws_stream, _) = {
            let deadline = Instant::now() + CONNECT_DEADLINE;
            loop {
                match connect_async(url.as_str()).await {
                    Ok(ok) => break ok,
                    Err(e) => {
                        if Instant::now() >= deadline {
                            panic!("connect ws: {}", e);
                        }
                        // Async sleep: yields to the runtime so the spawned
                        // server task can run. A blocking thread sleep here
                        // would stall it.
                        sleep(CONNECT_RETRY_DELAY).await;
                    }
                }
            }
        };

        // Verify on_open fired
        timeout(SIGNAL_TIMEOUT, open_rx.next())
            .await
            .expect("timed out waiting for open signal")
            .expect("open signal");

        // Send a text message and verify it's observed by the server-side handler
        ws_stream
            .send(Message::Text(Utf8Bytes::from("hello".to_string())))
            .await
            .expect("send text");
        let recv_text = timeout(SIGNAL_TIMEOUT, text_rx.next())
            .await
            .expect("timed out waiting for text signal")
            .expect("text signal");
        assert_eq!(recv_text, "hello");

        // Cleanup: close client and stop server
        let _ = ws_stream.close(None).await;
        server.abort();
    }
}