1use crate::WsMessage;
2use anyhow::Context;
3use anyhow::Result;
4use anyhow::anyhow;
5use bytes::Bytes;
6use futures_util::sink::SinkExt;
7use futures_util::stream::StreamExt;
8use hyper::upgrade::Upgraded;
9use hyper_util::rt::TokioIo;
10use serde::Serialize;
11use serde::de::DeserializeOwned;
12use serde_json::Value;
13use std::fmt::Debug;
14use std::fmt::Display;
15use tokio_tungstenite::WebSocketStream;
16use tokio_tungstenite::tungstenite::protocol::Role;
17
18#[cfg(feature = "pretty-assertions")]
19use pretty_assertions::assert_eq;
20
21#[cfg(not(feature = "old-json-diff"))]
22use expect_json::expect;
23#[cfg(not(feature = "old-json-diff"))]
24use expect_json::expect_json_eq;
25
/// A WebSocket connection used by tests to exchange messages with a server.
///
/// It wraps the upgraded HTTP connection in a [`WebSocketStream`]; the
/// constructor in this file creates that stream in the client role.
#[derive(Debug)]
pub struct TestWebSocket {
    /// The underlying stream every message is sent on and received from.
    stream: WebSocketStream<TokioIo<Upgraded>>,
}
30
31impl TestWebSocket {
32 pub(crate) async fn new(upgraded: Upgraded) -> Self {
33 let upgraded_io = TokioIo::new(upgraded);
34 let stream = WebSocketStream::from_raw_socket(upgraded_io, Role::Client, None).await;
35
36 Self { stream }
37 }
38
39 pub async fn close(mut self) {
40 self.stream
41 .close(None)
42 .await
43 .expect("Failed to close WebSocket stream");
44 }
45
46 pub async fn send_text<T>(&mut self, raw_text: T)
47 where
48 T: Display,
49 {
50 let text = raw_text.to_string();
51 self.send_message(WsMessage::Text(text.into())).await;
52 }
53
54 pub async fn send_json<J>(&mut self, body: &J)
55 where
56 J: ?Sized + Serialize,
57 {
58 let raw_json =
59 ::serde_json::to_string(body).expect("It should serialize the content into Json");
60
61 self.send_message(WsMessage::Text(raw_json.into())).await;
62 }
63
64 #[cfg(feature = "yaml")]
65 pub async fn send_yaml<Y>(&mut self, body: &Y)
66 where
67 Y: ?Sized + Serialize,
68 {
69 let raw_yaml =
70 ::serde_yaml::to_string(body).expect("It should serialize the content into Yaml");
71
72 self.send_message(WsMessage::Text(raw_yaml.into())).await;
73 }
74
75 #[cfg(feature = "msgpack")]
76 pub async fn send_msgpack<M>(&mut self, body: &M)
77 where
78 M: ?Sized + Serialize,
79 {
80 let body_bytes =
81 ::rmp_serde::to_vec(body).expect("It should serialize the content into MsgPack");
82
83 self.send_message(WsMessage::Binary(body_bytes.into()))
84 .await;
85 }
86
87 pub async fn send_message(&mut self, message: WsMessage) {
88 self.stream.send(message).await.unwrap();
89 }
90
91 #[must_use]
92 pub async fn receive_text(&mut self) -> String {
93 let message = self.receive_message().await;
94
95 message_to_text(message)
96 .context("Failed to read message as a String")
97 .unwrap()
98 }
99
100 #[must_use]
101 pub async fn receive_json<T>(&mut self) -> T
102 where
103 T: DeserializeOwned,
104 {
105 let bytes = self.receive_bytes().await;
106 serde_json::from_slice::<T>(&bytes)
107 .context("Failed to deserialize message as Json")
108 .unwrap()
109 }
110
111 #[cfg(feature = "yaml")]
112 #[must_use]
113 pub async fn receive_yaml<T>(&mut self) -> T
114 where
115 T: DeserializeOwned,
116 {
117 let bytes = self.receive_bytes().await;
118 serde_yaml::from_slice::<T>(&bytes)
119 .context("Failed to deserialize message as Yaml")
120 .unwrap()
121 }
122
123 #[cfg(feature = "msgpack")]
124 #[must_use]
125 pub async fn receive_msgpack<T>(&mut self) -> T
126 where
127 T: DeserializeOwned,
128 {
129 let received_bytes = self.receive_bytes().await;
130 rmp_serde::from_slice::<T>(&received_bytes)
131 .context("Failed to deserializing message as MsgPack")
132 .unwrap()
133 }
134
135 #[must_use]
136 pub async fn receive_bytes(&mut self) -> Bytes {
137 let message = self.receive_message().await;
138
139 message_to_bytes(message)
140 .context("Failed to read message as a Bytes")
141 .unwrap()
142 }
143
144 #[must_use]
145 pub async fn receive_message(&mut self) -> WsMessage {
146 self.maybe_receive_message()
147 .await
148 .expect("No message found on WebSocket stream")
149 }
150
151 #[must_use]
152 async fn maybe_receive_message(&mut self) -> Option<WsMessage> {
153 let maybe_message = self.stream.next().await;
154
155 match maybe_message {
156 None => None,
157 Some(message_result) => {
158 let message =
159 message_result.expect("Failed to receive message from WebSocket stream");
160 Some(message)
161 }
162 }
163 }
164
165 pub async fn assert_receive_json<T>(&mut self, expected: &T)
166 where
167 T: Serialize + DeserializeOwned + PartialEq<T> + Debug,
168 {
169 let received = self.receive_json::<T>().await;
170
171 #[cfg(feature = "old-json-diff")]
172 {
173 assert_eq!(*expected, received);
174 }
175
176 #[cfg(not(feature = "old-json-diff"))]
177 {
178 if *expected != received {
179 if let Err(error) = expect_json_eq(&received, &expected) {
180 panic!(
181 "
182{error}
183",
184 );
185 }
186 }
187 }
188 }
189
190 pub async fn assert_receive_json_contains<T>(&mut self, expected: &T)
191 where
192 T: Serialize,
193 {
194 let received = self.receive_json::<Value>().await;
195
196 #[cfg(feature = "old-json-diff")]
197 {
198 assert_json_diff::assert_json_include!(actual: received, expected: expected);
199 }
200
201 #[cfg(not(feature = "old-json-diff"))]
202 {
203 let expected_value = serde_json::to_value(expected).unwrap();
204 let result = expect_json_eq(
205 &received,
206 &expect::object().propagated_contains(expected_value),
207 );
208 if let Err(error) = result {
209 panic!(
210 "
211{error}
212",
213 );
214 }
215 }
216 }
217
218 pub async fn assert_receive_text<C>(&mut self, expected: C)
219 where
220 C: AsRef<str>,
221 {
222 let expected_contents = expected.as_ref();
223 assert_eq!(expected_contents, &self.receive_text().await);
224 }
225
226 pub async fn assert_receive_text_contains<C>(&mut self, expected: C)
227 where
228 C: AsRef<str>,
229 {
230 let expected_contents = expected.as_ref();
231 let received = self.receive_text().await;
232 let is_contained = received.contains(expected_contents);
233
234 assert!(
235 is_contained,
236 "Failed to find '{expected_contents}', received '{received}'"
237 );
238 }
239
240 #[cfg(feature = "yaml")]
241 pub async fn assert_receive_yaml<T>(&mut self, expected: &T)
242 where
243 T: DeserializeOwned + PartialEq<T> + Debug,
244 {
245 assert_eq!(*expected, self.receive_yaml::<T>().await);
246 }
247
248 #[cfg(feature = "msgpack")]
249 pub async fn assert_receive_msgpack<T>(&mut self, expected: &T)
250 where
251 T: DeserializeOwned + PartialEq<T> + Debug,
252 {
253 assert_eq!(*expected, self.receive_msgpack::<T>().await);
254 }
255}
256
257fn message_to_text(message: WsMessage) -> Result<String> {
258 let text = match message {
259 WsMessage::Text(text) => text.to_string(),
260 WsMessage::Binary(data) => {
261 String::from_utf8(data.to_vec()).map_err(|err| err.utf8_error())?
262 }
263 WsMessage::Ping(data) => {
264 String::from_utf8(data.to_vec()).map_err(|err| err.utf8_error())?
265 }
266 WsMessage::Pong(data) => {
267 String::from_utf8(data.to_vec()).map_err(|err| err.utf8_error())?
268 }
269 WsMessage::Close(None) => String::new(),
270 WsMessage::Close(Some(frame)) => frame.reason.to_string(),
271 WsMessage::Frame(_) => {
272 return Err(anyhow!(
273 "Unexpected Frame, did not expect Frame message whilst reading"
274 ));
275 }
276 };
277
278 Ok(text)
279}
280
281fn message_to_bytes(message: WsMessage) -> Result<Bytes> {
282 let bytes = match message {
283 WsMessage::Text(string) => string.into(),
284 WsMessage::Binary(data) => data,
285 WsMessage::Ping(data) => data,
286 WsMessage::Pong(data) => data,
287 WsMessage::Close(None) => Bytes::new(),
288 WsMessage::Close(Some(frame)) => frame.reason.into(),
289 WsMessage::Frame(_) => {
290 return Err(anyhow!(
291 "Unexpected Frame, did not expect Frame message whilst reading"
292 ));
293 }
294 };
295
296 Ok(bytes)
297}
298
#[cfg(test)]
mod test_assert_receive_text {
    use crate::TestServer;

    use axum::Router;
    use axum::extract::WebSocketUpgrade;
    use axum::extract::ws::Message;
    use axum::extract::ws::WebSocket;
    use axum::response::Response;
    use axum::routing::get;

    /// Builds a server whose `/ws-ping-pong` route answers every incoming text
    /// message twice: first as a text frame prefixed with `Text: `, then as a
    /// binary frame prefixed with `Binary: `.
    fn new_test_app() -> TestServer {
        pub async fn route_get_websocket_ping_pong(ws: WebSocketUpgrade) -> Response {
            async fn handle_ping_pong(mut socket: WebSocket) {
                while let Some(maybe_message) = socket.recv().await {
                    let message_text = maybe_message.unwrap().into_text().unwrap();

                    let encoded_text = format!("Text: {message_text}").try_into().unwrap();
                    let encoded_data = format!("Binary: {message_text}").into_bytes().into();

                    socket.send(Message::Text(encoded_text)).await.unwrap();
                    socket.send(Message::Binary(encoded_data)).await.unwrap();
                }
            }

            ws.on_upgrade(handle_ping_pong)
        }

        let app = Router::new().route("/ws-ping-pong", get(route_get_websocket_ping_pong));
        TestServer::builder().http_transport().build(app).unwrap()
    }

    /// Both the text reply and the binary reply can be read back as text.
    #[tokio::test]
    async fn it_should_ping_pong_text_in_text_and_binary() {
        let server = new_test_app();

        let mut websocket = server
            .get_websocket("/ws-ping-pong")
            .await
            .into_websocket()
            .await;

        websocket.send_text("Hello World!").await;

        websocket.assert_receive_text("Text: Hello World!").await;
        websocket.assert_receive_text("Binary: Hello World!").await;
    }

    /// Large payloads survive the round trip in both text and binary form.
    #[tokio::test]
    async fn it_should_ping_pong_large_text_blobs() {
        // 16 bytes short of 16 MiB (16 * 1024 * 1024 = 16_777_216).
        // NOTE(review): presumably sized so the prefixed replies stay under a
        // 16 MiB frame limit - confirm against the WebSocket configuration.
        const LARGE_BLOB_SIZE: usize = 16_777_200;
        let large_blob = "X".repeat(LARGE_BLOB_SIZE);

        let server = new_test_app();
        let mut websocket = server
            .get_websocket("/ws-ping-pong")
            .await
            .into_websocket()
            .await;

        websocket.send_text(&large_blob).await;

        websocket
            .assert_receive_text(format!("Text: {large_blob}"))
            .await;
        websocket
            .assert_receive_text(format!("Binary: {large_blob}"))
            .await;
    }

    /// The reply is `Text: Hello World!`, so matching only the suffix must fail.
    #[tokio::test]
    #[should_panic]
    async fn it_should_not_match_partial_text_match() {
        let server = new_test_app();

        let mut websocket = server
            .get_websocket("/ws-ping-pong")
            .await
            .into_websocket()
            .await;

        websocket.send_text("Hello World!").await;
        websocket.assert_receive_text("Hello World!").await;
    }

    /// Text unrelated to the reply must fail the assertion.
    #[tokio::test]
    #[should_panic]
    async fn it_should_not_match_different_text() {
        let server = new_test_app();

        let mut websocket = server
            .get_websocket("/ws-ping-pong")
            .await
            .into_websocket()
            .await;

        websocket.send_text("Hello World!").await;
        websocket.assert_receive_text("🦊").await;
    }
}
399
#[cfg(test)]
mod test_assert_receive_text_contains {
    use crate::TestServer;
    use axum::Router;
    use axum::extract::WebSocketUpgrade;
    use axum::extract::ws::Message;
    use axum::extract::ws::WebSocket;
    use axum::response::Response;
    use axum::routing::get;

    /// Path of the echo route shared by every test in this module.
    const WS_PATH: &str = "/ws-ping-pong";

    /// Replies to each incoming text message with a text frame prefixed by `Text: `.
    async fn handle_ping_pong(mut socket: WebSocket) {
        while let Some(incoming) = socket.recv().await {
            let body = incoming.unwrap().into_text().unwrap();
            let reply = Message::Text(format!("Text: {body}").try_into().unwrap());

            socket.send(reply).await.unwrap();
        }
    }

    /// Upgrades the request and hands the socket to the echo handler.
    async fn route_get_websocket_ping_pong(ws: WebSocketUpgrade) -> Response {
        ws.on_upgrade(handle_ping_pong)
    }

    /// Builds a test server exposing the echo route.
    fn new_test_app() -> TestServer {
        let router = Router::new().route(WS_PATH, get(route_get_websocket_ping_pong));
        TestServer::builder()
            .http_transport()
            .build(router)
            .unwrap()
    }

    /// The full reply trivially contains itself.
    #[tokio::test]
    async fn it_should_assert_whole_text_match() {
        let server = new_test_app();
        let mut websocket = server.get_websocket(WS_PATH).await.into_websocket().await;

        websocket.send_text("Hello World!").await;

        let expected = "Text: Hello World!";
        websocket.assert_receive_text_contains(expected).await;
    }

    /// A substring of the reply is enough for the assertion to pass.
    #[tokio::test]
    async fn it_should_assert_partial_text_match() {
        let server = new_test_app();
        let mut websocket = server.get_websocket(WS_PATH).await.into_websocket().await;

        websocket.send_text("Hello World!").await;

        let expected = "Hello World!";
        websocket.assert_receive_text_contains(expected).await;
    }

    /// Text that does not occur in the reply must fail the assertion.
    #[tokio::test]
    #[should_panic]
    async fn it_should_not_match_different_text() {
        let server = new_test_app();
        let mut websocket = server.get_websocket(WS_PATH).await.into_websocket().await;

        websocket.send_text("Hello World!").await;

        let expected = "🦊";
        websocket.assert_receive_text_contains(expected).await;
    }
}
473
#[cfg(test)]
mod test_assert_receive_json {
    use crate::TestServer;
    use axum::Router;
    use axum::extract::WebSocketUpgrade;
    use axum::extract::ws::Message;
    use axum::extract::ws::WebSocket;
    use axum::response::Response;
    use axum::routing::get;
    use serde_json::Value;
    use serde_json::json;

    #[cfg(not(feature = "old-json-diff"))]
    use crate::testing::ExpectStrMinLen;
    #[cfg(not(feature = "old-json-diff"))]
    use expect_json::expect;

    /// Path of the echo route shared by every test in this module.
    const WS_PATH: &str = "/ws-ping-pong";

    /// Parses each incoming message as Json and replies twice with the parsed
    /// value wrapped in an envelope: a text frame tagged `"format": "text"`,
    /// then a binary frame tagged `"format": "binary"`.
    async fn handle_ping_pong(mut socket: WebSocket) {
        while let Some(incoming) = socket.recv().await {
            let raw = incoming.unwrap().into_text().unwrap();
            let payload = serde_json::from_str::<Value>(&raw).unwrap();

            let text_envelope = json!({ "format": "text", "message": payload });
            let text_frame = Message::Text(
                serde_json::to_string(&text_envelope)
                    .unwrap()
                    .try_into()
                    .unwrap(),
            );

            let binary_envelope = json!({ "format": "binary", "message": payload });
            let binary_frame = Message::Binary(serde_json::to_vec(&binary_envelope).unwrap().into());

            socket.send(text_frame).await.unwrap();
            socket.send(binary_frame).await.unwrap();
        }
    }

    /// Upgrades the request and hands the socket to the echo handler.
    async fn route_get_websocket_ping_pong(ws: WebSocketUpgrade) -> Response {
        ws.on_upgrade(handle_ping_pong)
    }

    /// Builds a test server exposing the echo route.
    fn new_test_app() -> TestServer {
        let router = Router::new().route(WS_PATH, get(route_get_websocket_ping_pong));
        TestServer::builder()
            .http_transport()
            .build(router)
            .unwrap()
    }

    /// Json is readable from both the text reply and the binary reply.
    #[tokio::test]
    async fn it_should_ping_pong_json_in_text_and_binary() {
        let server = new_test_app();
        let mut websocket = server.get_websocket(WS_PATH).await.into_websocket().await;

        let outgoing = json!({
            "hello": "world",
            "numbers": [1, 2, 3],
        });
        websocket.send_json(&outgoing).await;

        // The server sends the text frame first ...
        let expected_text = json!({
            "format": "text",
            "message": {
                "hello": "world",
                "numbers": [1, 2, 3],
            },
        });
        websocket.assert_receive_json(&expected_text).await;

        // ... followed by the binary frame.
        let expected_binary = json!({
            "format": "binary",
            "message": {
                "hello": "world",
                "numbers": [1, 2, 3],
            },
        });
        websocket.assert_receive_json(&expected_binary).await;
    }

    /// Expectation operations embedded in the expected Json are honoured.
    #[cfg(not(feature = "old-json-diff"))]
    #[tokio::test]
    async fn it_should_work_with_custom_expect_op() {
        let server = new_test_app();
        let mut websocket = server.get_websocket(WS_PATH).await.into_websocket().await;

        let outgoing = json!({
            "hello": "world",
            "numbers": [1, 2, 3],
        });
        websocket.send_json(&outgoing).await;

        // The server sends the text frame first ...
        let expected_text = json!({
            "format": "text",
            "message": {
                "hello": ExpectStrMinLen { min: 3 },
                "numbers": expect::array().len(3).all(expect::integer()),
            },
        });
        websocket.assert_receive_json(&expected_text).await;

        // ... followed by the binary frame.
        let expected_binary = json!({
            "format": "binary",
            "message": {
                "hello": ExpectStrMinLen { min: 3 },
                "numbers": expect::array().len(3).all(expect::integer()),
            },
        });
        websocket.assert_receive_json(&expected_binary).await;
    }

    /// `"world"` is shorter than the required minimum of 10, so this must panic.
    #[cfg(not(feature = "old-json-diff"))]
    #[tokio::test]
    #[should_panic]
    async fn it_should_panic_if_custom_expect_op_fails() {
        let server = new_test_app();
        let mut websocket = server.get_websocket(WS_PATH).await.into_websocket().await;

        let outgoing = json!({
            "hello": "world",
            "numbers": [1, 2, 3],
        });
        websocket.send_json(&outgoing).await;

        let expected_text = json!({
            "format": "text",
            "message": {
                "hello": ExpectStrMinLen { min: 10 },
                "numbers": expect::array().len(3).all(expect::integer()),
            },
        });
        websocket.assert_receive_json(&expected_text).await;
    }
}
634
#[cfg(test)]
mod test_assert_receive_json_contains {
    use crate::TestServer;
    use axum::Router;
    use axum::extract::WebSocketUpgrade;
    use axum::extract::ws::Message;
    use axum::extract::ws::WebSocket;
    use axum::response::Response;
    use axum::routing::get;
    use serde_json::Value;
    use serde_json::json;

    /// Path of the echo route shared by every test in this module.
    const WS_PATH: &str = "/ws-ping-pong";

    /// Parses each incoming message as Json and replies twice with the parsed
    /// value wrapped in an envelope: a text frame tagged `"format": "text"`,
    /// then a binary frame tagged `"format": "binary"`.
    async fn handle_ping_pong(mut socket: WebSocket) {
        while let Some(incoming) = socket.recv().await {
            let raw = incoming.unwrap().into_text().unwrap();
            let payload = serde_json::from_str::<Value>(&raw).unwrap();

            let text_envelope = json!({ "format": "text", "message": payload });
            let text_frame = Message::Text(
                serde_json::to_string(&text_envelope)
                    .unwrap()
                    .try_into()
                    .unwrap(),
            );

            let binary_envelope = json!({ "format": "binary", "message": payload });
            let binary_frame = Message::Binary(serde_json::to_vec(&binary_envelope).unwrap().into());

            socket.send(text_frame).await.unwrap();
            socket.send(binary_frame).await.unwrap();
        }
    }

    /// Upgrades the request and hands the socket to the echo handler.
    async fn route_get_websocket_ping_pong(ws: WebSocketUpgrade) -> Response {
        ws.on_upgrade(handle_ping_pong)
    }

    /// Builds a test server exposing the echo route.
    fn new_test_app() -> TestServer {
        let router = Router::new().route(WS_PATH, get(route_get_websocket_ping_pong));
        TestServer::builder()
            .http_transport()
            .build(router)
            .unwrap()
    }

    /// The expected value may leave out fields at the root of the reply.
    #[tokio::test]
    async fn it_should_ping_pong_json_in_text_and_binary_with_root_content_missing_in_contains() {
        let server = new_test_app();
        let mut websocket = server.get_websocket(WS_PATH).await.into_websocket().await;

        let outgoing = json!({
            "hello": "world",
            "numbers": [1, 2, 3],
        });
        websocket.send_json(&outgoing).await;

        // Text reply: the root `format` field is omitted.
        let expected_in_text = json!({
            "message": {
                "hello": "world",
                "numbers": [1, 2, 3],
            },
        });
        websocket
            .assert_receive_json_contains(&expected_in_text)
            .await;

        // Binary reply: the root `message` field is omitted.
        let expected_in_binary = json!({
            "format": "binary",
        });
        websocket
            .assert_receive_json_contains(&expected_in_binary)
            .await;
    }

    /// The expected value may leave out fields nested inside the reply.
    #[tokio::test]
    async fn it_should_ping_pong_json_in_text_and_binary_with_nested_content_missing_in_contains() {
        let server = new_test_app();
        let mut websocket = server.get_websocket(WS_PATH).await.into_websocket().await;

        let outgoing = json!({
            "hello": "world",
            "numbers": [1, 2, 3],
        });
        websocket.send_json(&outgoing).await;

        // Text reply: the nested `hello` field is omitted.
        let expected_in_text = json!({
            "format": "text",
            "message": {
                "numbers": [1, 2, 3],
            },
        });
        websocket
            .assert_receive_json_contains(&expected_in_text)
            .await;

        // Binary reply: the nested `numbers` field is omitted.
        let expected_in_binary = json!({
            "format": "binary",
            "message": {
                "hello": "world",
            },
        });
        websocket
            .assert_receive_json_contains(&expected_in_binary)
            .await;
    }
}
757
#[cfg(feature = "yaml")]
#[cfg(test)]
mod test_assert_receive_yaml {
    use crate::TestServer;

    use axum::Router;
    use axum::extract::WebSocketUpgrade;
    use axum::extract::ws::Message;
    use axum::extract::ws::WebSocket;
    use axum::response::Response;
    use axum::routing::get;
    use serde_json::Value;
    use serde_json::json;

    /// Builds a server whose `/ws-ping-pong` route parses every incoming
    /// message as Yaml and replies twice with the parsed value wrapped in an
    /// envelope: a text frame tagged `format: text`, then a binary frame
    /// tagged `format: binary`. Both replies are Yaml encoded.
    fn new_test_app() -> TestServer {
        pub async fn route_get_websocket_ping_pong(ws: WebSocketUpgrade) -> Response {
            async fn handle_ping_pong(mut socket: WebSocket) {
                while let Some(maybe_message) = socket.recv().await {
                    let message_text = maybe_message.unwrap().into_text().unwrap();
                    let decoded = serde_yaml::from_str::<Value>(&message_text).unwrap();

                    let encoded_text = serde_yaml::to_string(&json!({
                        "format": "text",
                        "message": decoded
                    }))
                    .unwrap()
                    .try_into()
                    .unwrap();
                    let encoded_data = serde_yaml::to_string(&json!({
                        "format": "binary",
                        "message": decoded
                    }))
                    .unwrap()
                    .into();

                    socket.send(Message::Text(encoded_text)).await.unwrap();
                    socket.send(Message::Binary(encoded_data)).await.unwrap();
                }
            }

            ws.on_upgrade(handle_ping_pong)
        }

        let app = Router::new().route("/ws-ping-pong", get(route_get_websocket_ping_pong));
        TestServer::builder().http_transport().build(app).unwrap()
    }

    /// Yaml sent with `send_yaml` is readable from both the text reply and
    /// the binary reply.
    #[tokio::test]
    async fn it_should_ping_pong_yaml_in_text_and_binary() {
        let server = new_test_app();

        let mut websocket = server
            .get_websocket("/ws-ping-pong")
            .await
            .into_websocket()
            .await;

        // Send real Yaml so the Yaml send path is exercised as well as the
        // receive path.
        websocket
            .send_yaml(&json!({
                "hello": "world",
                "numbers": [1, 2, 3],
            }))
            .await;

        // The server sends the text frame first ...
        websocket
            .assert_receive_yaml(&json!({
                "format": "text",
                "message": {
                    "hello": "world",
                    "numbers": [1, 2, 3],
                },
            }))
            .await;

        // ... followed by the binary frame.
        websocket
            .assert_receive_yaml(&json!({
                "format": "binary",
                "message": {
                    "hello": "world",
                    "numbers": [1, 2, 3],
                },
            }))
            .await;
    }
}
845
#[cfg(feature = "msgpack")]
#[cfg(test)]
mod test_assert_receive_msgpack {
    use crate::TestServer;

    use axum::Router;
    use axum::extract::WebSocketUpgrade;
    use axum::extract::ws::Message;
    use axum::extract::ws::WebSocket;
    use axum::response::Response;
    use axum::routing::get;
    use serde_json::Value;
    use serde_json::json;

    /// Path of the echo route used by the test in this module.
    const WS_PATH: &str = "/ws-ping-pong";

    /// Decodes each incoming message as MsgPack and replies with one binary
    /// frame: the decoded value wrapped in an envelope tagged
    /// `"format": "binary"`, MsgPack encoded.
    async fn handle_ping_pong(mut socket: WebSocket) {
        while let Some(incoming) = socket.recv().await {
            let raw = incoming.unwrap().into_data();
            let payload = rmp_serde::from_slice::<Value>(&raw).unwrap();

            let envelope = json!({ "format": "binary", "message": payload });
            let reply = Message::Binary(::rmp_serde::to_vec(&envelope).unwrap().into());

            socket.send(reply).await.unwrap();
        }
    }

    /// Upgrades the request and hands the socket to the echo handler.
    async fn route_get_websocket_ping_pong(ws: WebSocketUpgrade) -> Response {
        ws.on_upgrade(handle_ping_pong)
    }

    /// Builds a test server exposing the echo route.
    fn new_test_app() -> TestServer {
        let router = Router::new().route(WS_PATH, get(route_get_websocket_ping_pong));
        TestServer::builder()
            .http_transport()
            .build(router)
            .unwrap()
    }

    /// MsgPack sent with `send_msgpack` comes back readable from the binary reply.
    #[tokio::test]
    async fn it_should_ping_pong_msgpack_in_binary() {
        let server = new_test_app();
        let mut websocket = server.get_websocket(WS_PATH).await.into_websocket().await;

        let outgoing = json!({
            "hello": "world",
            "numbers": [1, 2, 3],
        });
        websocket.send_msgpack(&outgoing).await;

        let expected = json!({
            "format": "binary",
            "message": {
                "hello": "world",
                "numbers": [1, 2, 3],
            },
        });
        websocket.assert_receive_msgpack(&expected).await;
    }
}