1#![warn(missing_docs)]
61#[cfg(not(target_family = "wasm"))]
62compile_error!("websocket-web requires a WebAssembly target");
63
64mod closed;
65mod standard;
66mod stream;
67mod util;
68
69use futures_core::Stream;
70use futures_sink::Sink;
71use futures_util::{SinkExt, StreamExt};
72use js_sys::{Reflect, Uint8Array};
73use std::{
74 fmt, io,
75 io::ErrorKind,
76 pin::Pin,
77 rc::Rc,
78 task::{ready, Context, Poll},
79};
80use tokio::io::{AsyncRead, AsyncWrite};
81use util::uint8_array_for_api;
82use wasm_bindgen::prelude::*;
83
84pub use closed::{CloseCode, Closed, ClosedReason};
85
86#[derive(Debug, Clone, Copy, PartialEq, Eq)]
88pub enum Interface {
89 Stream,
93 Standard,
97}
98
99impl Interface {
100 pub fn is_supported(&self) -> bool {
102 let global = js_sys::global();
103 match self {
104 Self::Stream => Reflect::has(&global, &JsValue::from_str("WebSocketStream")).unwrap_or_default(),
105 Self::Standard => Reflect::has(&global, &JsValue::from_str("WebSocket")).unwrap_or_default(),
106 }
107 }
108}
109
110#[derive(Debug, Clone, PartialEq, Eq)]
112pub enum Msg {
113 Text(String),
115 Binary(Vec<u8>),
117}
118
119impl Msg {
120 pub const fn is_text(&self) -> bool {
122 matches!(self, Self::Text(_))
123 }
124
125 pub const fn is_binary(&self) -> bool {
127 matches!(self, Self::Binary(_))
128 }
129
130 pub fn to_vec(self) -> Vec<u8> {
132 match self {
133 Self::Text(text) => text.into_bytes(),
134 Self::Binary(vec) => vec,
135 }
136 }
137
138 pub fn len(&self) -> usize {
140 match self {
141 Self::Text(text) => text.len(),
142 Self::Binary(vec) => vec.len(),
143 }
144 }
145
146 pub fn is_empty(&self) -> bool {
148 match self {
149 Self::Text(text) => text.is_empty(),
150 Self::Binary(vec) => vec.is_empty(),
151 }
152 }
153}
154
155impl fmt::Display for Msg {
156 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
157 match self {
158 Self::Text(text) => write!(f, "{text}"),
159 Self::Binary(binary) => write!(f, "{}", String::from_utf8_lossy(binary)),
160 }
161 }
162}
163
164impl From<Msg> for Vec<u8> {
165 fn from(msg: Msg) -> Self {
166 msg.to_vec()
167 }
168}
169
170impl AsRef<[u8]> for Msg {
171 fn as_ref(&self) -> &[u8] {
172 match self {
173 Self::Text(text) => text.as_bytes(),
174 Self::Binary(vec) => vec,
175 }
176 }
177}
178
179#[derive(Debug, Clone)]
181pub struct WebSocketBuilder {
182 url: String,
183 protocols: Vec<String>,
184 interface: Option<Interface>,
185 send_buffer_size: Option<usize>,
186 receive_buffer_size: Option<usize>,
187}
188
189impl WebSocketBuilder {
190 pub fn new(url: impl AsRef<str>) -> Self {
192 Self {
193 url: url.as_ref().to_string(),
194 protocols: Vec::new(),
195 interface: None,
196 send_buffer_size: None,
197 receive_buffer_size: None,
198 }
199 }
200
201 pub fn set_interface(&mut self, interface: Interface) {
205 self.interface = Some(interface);
206 }
207
208 pub fn set_protocols<P>(&mut self, protocols: impl IntoIterator<Item = P>)
220 where
221 P: AsRef<str>,
222 {
223 self.protocols = protocols.into_iter().map(|s| s.as_ref().to_string()).collect();
224 }
225
226 pub fn set_send_buffer_size(&mut self, send_buffer_size: usize) {
238 self.send_buffer_size = Some(send_buffer_size);
239 }
240
241 pub fn set_receive_buffer_size(&mut self, receive_buffer_size: usize) {
251 self.receive_buffer_size = Some(receive_buffer_size);
252 }
253
254 pub async fn connect(self) -> io::Result<WebSocket> {
256 let interface = match self.interface {
257 Some(interface) => interface,
258 None if Interface::Stream.is_supported() => Interface::Stream,
259 None => Interface::Standard,
260 };
261
262 if !interface.is_supported() {
263 match interface {
264 Interface::Stream => {
265 return Err(io::Error::new(ErrorKind::Unsupported, "WebSocketStream not supported"))
266 }
267 Interface::Standard => {
268 return Err(io::Error::new(ErrorKind::Unsupported, "WebSocket not supported"))
269 }
270 }
271 }
272
273 match interface {
274 Interface::Stream => {
275 let (stream, info) = stream::Inner::new(self).await?;
276 Ok(WebSocket { inner: Inner::Stream(stream), info: Rc::new(info), read_buf: Vec::new() })
277 }
278 Interface::Standard => {
279 let (standard, info) = standard::Inner::new(self).await?;
280 Ok(WebSocket { inner: Inner::Standard(standard), info: Rc::new(info), read_buf: Vec::new() })
281 }
282 }
283 }
284}
285
286struct Info {
287 url: String,
288 protocol: String,
289 interface: Interface,
290}
291
292pub struct WebSocket {
296 inner: Inner,
297 info: Rc<Info>,
298 read_buf: Vec<u8>,
299}
300
301enum Inner {
302 Stream(stream::Inner),
303 Standard(standard::Inner),
304}
305
306impl fmt::Debug for WebSocket {
307 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
308 f.debug_struct("WebSocket")
309 .field("url", &self.info.url)
310 .field("protocol", &self.protocol())
311 .field("interface", &self.interface())
312 .finish()
313 }
314}
315
316impl WebSocket {
317 pub async fn connect(url: impl AsRef<str>) -> io::Result<Self> {
319 WebSocketBuilder::new(url).connect().await
320 }
321
322 pub fn url(&self) -> &str {
324 &self.info.url
325 }
326
327 pub fn protocol(&self) -> &str {
333 &self.info.protocol
334 }
335
336 pub fn interface(&self) -> Interface {
338 self.info.interface
339 }
340
341 pub fn into_split(self) -> (WebSocketSender, WebSocketReceiver) {
343 let Self { inner, info, read_buf } = self;
344 match inner {
345 Inner::Stream(inner) => {
346 let (sender, receiver) = inner.into_split();
347 let sender = WebSocketSender { inner: SenderInner::Stream(sender), info: info.clone() };
348 let receiver = WebSocketReceiver { inner: ReceiverInner::Stream(receiver), info, read_buf };
349 (sender, receiver)
350 }
351 Inner::Standard(inner) => {
352 let (sender, receiver) = inner.into_split();
353 let sender = WebSocketSender { inner: SenderInner::Standard(sender), info: info.clone() };
354 let receiver =
355 WebSocketReceiver { inner: ReceiverInner::Standard(receiver), info, read_buf: Vec::new() };
356 (sender, receiver)
357 }
358 }
359 }
360
361 pub fn close(self) {
363 self.into_split().0.close();
364 }
365
366 #[track_caller]
372 pub fn close_with_reason(self, code: CloseCode, reason: &str) {
373 self.into_split().0.close_with_reason(code, reason);
374 }
375
376 pub fn closed(&self) -> Closed {
378 match &self.inner {
379 Inner::Stream(inner) => inner.closed(),
380 Inner::Standard(inner) => inner.closed(),
381 }
382 }
383
384 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
385 match &mut self.inner {
386 Inner::Stream(inner) => inner.sender.poll_ready_unpin(cx),
387 Inner::Standard(inner) => inner.sender.poll_ready_unpin(cx),
388 }
389 }
390
391 fn start_send(mut self: Pin<&mut Self>, item: &JsValue, len: usize) -> Result<(), io::Error> {
392 match &mut self.inner {
393 Inner::Stream(inner) => inner.sender.start_send_unpin((item, len)),
394 Inner::Standard(inner) => inner.sender.start_send_unpin(item),
395 }
396 }
397
398 fn start_send_binary(mut self: Pin<&mut Self>, data: &[u8]) -> Result<(), io::Error> {
399 match &mut self.inner {
400 Inner::Stream(inner) => {
401 let array: JsValue = Uint8Array::from(data).into();
402 inner.sender.start_send_unpin((&array, data.len()))
403 }
404 Inner::Standard(inner) => {
405 let array: JsValue = unsafe { uint8_array_for_api(data) }.into();
407 inner.sender.start_send_unpin(&array)
408 }
409 }
410 }
411
412 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
413 match &mut self.inner {
414 Inner::Stream(inner) => inner.sender.poll_flush_unpin(cx),
415 Inner::Standard(inner) => inner.sender.poll_flush_unpin(cx),
416 }
417 }
418
419 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
420 match &mut self.inner {
421 Inner::Stream(inner) => inner.sender.poll_close_unpin(cx),
422 Inner::Standard(inner) => inner.sender.poll_close_unpin(cx),
423 }
424 }
425}
426
427impl Sink<&str> for WebSocket {
428 type Error = io::Error;
429
430 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
431 self.poll_ready(cx)
432 }
433
434 fn start_send(self: Pin<&mut Self>, item: &str) -> Result<(), Self::Error> {
435 self.start_send(&JsValue::from_str(item), item.len())
436 }
437
438 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
439 self.poll_flush(cx)
440 }
441
442 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
443 self.poll_close(cx)
444 }
445}
446
447impl Sink<String> for WebSocket {
448 type Error = io::Error;
449
450 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
451 self.poll_ready(cx)
452 }
453
454 fn start_send(self: Pin<&mut Self>, item: String) -> Result<(), Self::Error> {
455 self.start_send(&JsValue::from_str(&item), item.len())
456 }
457
458 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
459 self.poll_flush(cx)
460 }
461
462 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
463 self.poll_close(cx)
464 }
465}
466
467impl Sink<&[u8]> for WebSocket {
468 type Error = io::Error;
469
470 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
471 self.poll_ready(cx)
472 }
473
474 fn start_send(self: Pin<&mut Self>, item: &[u8]) -> Result<(), Self::Error> {
475 self.start_send_binary(item)
476 }
477
478 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
479 self.poll_flush(cx)
480 }
481
482 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
483 self.poll_close(cx)
484 }
485}
486
487impl Sink<Vec<u8>> for WebSocket {
488 type Error = io::Error;
489
490 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
491 self.poll_ready(cx)
492 }
493
494 fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
495 self.start_send_binary(&item)
496 }
497
498 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
499 self.poll_flush(cx)
500 }
501
502 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
503 self.poll_close(cx)
504 }
505}
506
507impl Sink<Msg> for WebSocket {
508 type Error = io::Error;
509
510 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
511 self.poll_ready(cx)
512 }
513
514 fn start_send(self: Pin<&mut Self>, item: Msg) -> Result<(), Self::Error> {
515 match item {
516 Msg::Text(text) => self.start_send(&JsValue::from_str(&text), text.len()),
517 Msg::Binary(vec) => self.start_send_binary(&vec),
518 }
519 }
520
521 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
522 self.poll_flush(cx)
523 }
524
525 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
526 self.poll_close(cx)
527 }
528}
529
530impl AsyncWrite for WebSocket {
531 fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
532 ready!(self.as_mut().poll_ready(cx))?;
533 self.start_send_binary(buf)?;
534 Poll::Ready(Ok(buf.len()))
535 }
536
537 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
538 self.poll_flush(cx)
539 }
540
541 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
542 self.poll_close(cx)
543 }
544}
545
546impl Stream for WebSocket {
547 type Item = io::Result<Msg>;
548
549 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
550 match &mut self.inner {
551 Inner::Stream(inner) => inner.receiver.poll_next_unpin(cx),
552 Inner::Standard(inner) => inner.receiver.poll_next_unpin(cx),
553 }
554 }
555}
556
557impl AsyncRead for WebSocket {
558 fn poll_read(
559 mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut tokio::io::ReadBuf,
560 ) -> Poll<io::Result<()>> {
561 while self.read_buf.is_empty() {
562 let Some(msg) = ready!(self.as_mut().poll_next(cx)?) else { return Poll::Ready(Ok(())) };
563 self.read_buf = msg.to_vec();
564 }
565
566 let to_copy = buf.remaining().min(self.read_buf.len());
567 buf.put_slice(&self.read_buf[..to_copy]);
568 self.read_buf.drain(..to_copy);
569
570 Poll::Ready(Ok(()))
571 }
572}
573
574pub struct WebSocketSender {
579 inner: SenderInner,
580 info: Rc<Info>,
581}
582
583enum SenderInner {
584 Stream(stream::Sender),
585 Standard(standard::Sender),
586}
587
588impl fmt::Debug for WebSocketSender {
589 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
590 f.debug_struct("WebSocketSender")
591 .field("url", &self.info.url)
592 .field("protocol", &self.protocol())
593 .field("interface", &self.interface())
594 .finish()
595 }
596}
597
598impl WebSocketSender {
599 pub fn url(&self) -> &str {
601 &self.info.url
602 }
603
604 pub fn protocol(&self) -> &str {
606 &self.info.protocol
607 }
608
609 pub fn interface(&self) -> Interface {
611 self.info.interface
612 }
613
614 pub fn close(self) {
618 self.close_with_reason(CloseCode::NormalClosure, "");
619 }
620
621 #[track_caller]
629 pub fn close_with_reason(self, code: CloseCode, reason: &str) {
630 if !code.is_valid() {
631 panic!("WebSocket close code {code} is invalid");
632 }
633
634 match self.inner {
635 SenderInner::Stream(sender) => sender.close(code.into(), reason),
636 SenderInner::Standard(sender) => sender.close(code.into(), reason),
637 }
638 }
639
640 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
641 match &mut self.inner {
642 SenderInner::Stream(inner) => inner.poll_ready_unpin(cx),
643 SenderInner::Standard(inner) => inner.poll_ready_unpin(cx),
644 }
645 }
646
647 fn start_send(mut self: Pin<&mut Self>, item: &JsValue, len: usize) -> Result<(), io::Error> {
648 match &mut self.inner {
649 SenderInner::Stream(inner) => inner.start_send_unpin((item, len)),
650 SenderInner::Standard(inner) => inner.start_send_unpin(item),
651 }
652 }
653
654 fn start_send_binary(mut self: Pin<&mut Self>, data: &[u8]) -> Result<(), io::Error> {
655 match &mut self.inner {
656 SenderInner::Stream(inner) => {
657 let array: JsValue = Uint8Array::from(data).into();
658 inner.start_send_unpin((&array, data.len()))
659 }
660 SenderInner::Standard(inner) => {
661 let array: JsValue = unsafe { uint8_array_for_api(data) }.into();
663 inner.start_send_unpin(&array)
664 }
665 }
666 }
667
668 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
669 match &mut self.inner {
670 SenderInner::Stream(inner) => inner.poll_flush_unpin(cx),
671 SenderInner::Standard(inner) => inner.poll_flush_unpin(cx),
672 }
673 }
674
675 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
676 match &mut self.inner {
677 SenderInner::Stream(inner) => inner.poll_close_unpin(cx),
678 SenderInner::Standard(inner) => inner.poll_close_unpin(cx),
679 }
680 }
681}
682
683impl Sink<&str> for WebSocketSender {
684 type Error = io::Error;
685
686 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
687 self.poll_ready(cx)
688 }
689
690 fn start_send(self: Pin<&mut Self>, item: &str) -> Result<(), Self::Error> {
691 self.start_send(&JsValue::from_str(item), item.len())
692 }
693
694 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
695 self.poll_flush(cx)
696 }
697
698 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
699 self.poll_close(cx)
700 }
701}
702
703impl Sink<String> for WebSocketSender {
704 type Error = io::Error;
705
706 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
707 self.poll_ready(cx)
708 }
709
710 fn start_send(self: Pin<&mut Self>, item: String) -> Result<(), Self::Error> {
711 self.start_send(&JsValue::from_str(&item), item.len())
712 }
713
714 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
715 self.poll_flush(cx)
716 }
717
718 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
719 self.poll_close(cx)
720 }
721}
722
723impl Sink<&[u8]> for WebSocketSender {
724 type Error = io::Error;
725
726 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
727 self.poll_ready(cx)
728 }
729
730 fn start_send(self: Pin<&mut Self>, item: &[u8]) -> Result<(), Self::Error> {
731 self.start_send_binary(item)
732 }
733
734 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
735 self.poll_flush(cx)
736 }
737
738 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
739 self.poll_close(cx)
740 }
741}
742
743impl Sink<Vec<u8>> for WebSocketSender {
744 type Error = io::Error;
745
746 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
747 self.poll_ready(cx)
748 }
749
750 fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
751 self.start_send_binary(&item)
752 }
753
754 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
755 self.poll_flush(cx)
756 }
757
758 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
759 self.poll_close(cx)
760 }
761}
762
763impl Sink<Msg> for WebSocketSender {
764 type Error = io::Error;
765
766 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
767 self.poll_ready(cx)
768 }
769
770 fn start_send(self: Pin<&mut Self>, item: Msg) -> Result<(), Self::Error> {
771 match item {
772 Msg::Text(text) => self.start_send(&JsValue::from_str(&text), text.len()),
773 Msg::Binary(vec) => self.start_send_binary(&vec),
774 }
775 }
776
777 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
778 self.poll_flush(cx)
779 }
780
781 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
782 self.poll_close(cx)
783 }
784}
785
786impl AsyncWrite for WebSocketSender {
787 fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
788 ready!(self.as_mut().poll_ready(cx))?;
789 self.start_send_binary(buf)?;
790 Poll::Ready(Ok(buf.len()))
791 }
792
793 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
794 self.poll_flush(cx)
795 }
796
797 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
798 self.poll_close(cx)
799 }
800}
801
802pub struct WebSocketReceiver {
807 inner: ReceiverInner,
808 info: Rc<Info>,
809 read_buf: Vec<u8>,
810}
811
812enum ReceiverInner {
813 Stream(stream::Receiver),
814 Standard(standard::Receiver),
815}
816
817impl fmt::Debug for WebSocketReceiver {
818 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
819 f.debug_struct("WebSocketReceiver")
820 .field("url", &self.info.url)
821 .field("protocol", &self.protocol())
822 .field("interface", &self.interface())
823 .finish()
824 }
825}
826
827impl WebSocketReceiver {
828 pub fn url(&self) -> &str {
830 &self.info.url
831 }
832
833 pub fn protocol(&self) -> &str {
835 &self.info.protocol
836 }
837
838 pub fn interface(&self) -> Interface {
840 self.info.interface
841 }
842}
843
844impl Stream for WebSocketReceiver {
845 type Item = io::Result<Msg>;
846
847 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
848 match &mut self.inner {
849 ReceiverInner::Stream(inner) => inner.poll_next_unpin(cx),
850 ReceiverInner::Standard(inner) => inner.poll_next_unpin(cx),
851 }
852 }
853}
854
855impl AsyncRead for WebSocketReceiver {
856 fn poll_read(
857 mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut tokio::io::ReadBuf,
858 ) -> Poll<io::Result<()>> {
859 while self.read_buf.is_empty() {
860 let Some(msg) = ready!(self.as_mut().poll_next(cx)?) else { return Poll::Ready(Ok(())) };
861 self.read_buf = msg.to_vec();
862 }
863
864 let to_copy = buf.remaining().min(self.read_buf.len());
865 buf.put_slice(&self.read_buf[..to_copy]);
866 self.read_buf.drain(..to_copy);
867
868 Poll::Ready(Ok(()))
869 }
870}