1#![deny(missing_docs, rust_2018_idioms, unused, unused_crate_dependencies, unused_import_braces, unused_lifetimes, unused_qualifications, warnings)]
2#![forbid(unsafe_code)]
3
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6use {
35 std::{
36 future::Future,
37 io::{
38 self,
39 prelude::*,
40 },
41 pin::Pin,
42 },
43 tokio::io::{
44 AsyncRead,
45 AsyncWrite,
46 },
47};
48#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024"))] use {
49 std::{
50 iter,
51 mem,
52 },
53 fallible_collections::FallibleVec,
54 futures::{
55 Sink,
56 SinkExt as _,
57 future::{
58 self,
59 Either,
60 },
61 stream::{
62 self,
63 Stream,
64 StreamExt as _,
65 TryStreamExt as _,
66 },
67 },
68};
69#[cfg(feature = "tokio-tungstenite021")] use tokio_tungstenite021::tungstenite as tungstenite021;
70#[cfg(feature = "tokio-tungstenite024")] use tokio_tungstenite024::tungstenite as tungstenite024;
71pub use {
72 async_proto_derive::{
73 Protocol,
74 bitflags,
75 },
76 crate::error::*,
77};
78#[doc(hidden)] pub use tokio; mod error;
81mod impls;
82
83#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024"))] const WS_MAX_MESSAGE_SIZE: usize = 16777216;
85
86pub trait Protocol: Sized {
88 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
94 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
100 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError>;
102 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError>;
104
105 fn read_owned<R: AsyncRead + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
109 Box::pin(async move {
110 let value = Self::read(&mut stream).await?;
111 Ok((stream, value))
112 })
113 }
114
115 fn try_read(stream: &mut impl Read, buf: &mut Vec<u8>) -> Result<Option<Self>, ReadError> {
158 let mut temp_buf = vec![0; 8];
159 loop {
160 let mut slice = &mut &**buf;
161 match Self::read_sync(&mut slice) {
162 Ok(value) => {
163 let value_len = slice.len();
164 buf.drain(..buf.len() - value_len);
165 return Ok(Some(value))
166 }
167 Err(ReadError { kind: ReadErrorKind::Io(e), .. }) if e.kind() == io::ErrorKind::UnexpectedEof => {}
168 Err(e) => return Err(e),
169 }
170 match stream.read(&mut temp_buf) {
171 Ok(0) => return Err(ReadError {
172 context: ErrorContext::DefaultImpl,
173 kind: ReadErrorKind::EndOfStream,
174 }),
175 Ok(n) => buf.extend_from_slice(&temp_buf[..n]),
176 Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
177 Err(e) => return Err(ReadError {
178 context: ErrorContext::DefaultImpl,
179 kind: e.into(),
180 }),
181 }
182 }
183 }
184
185 #[cfg(feature = "tokio-tungstenite021")]
186 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
187 fn read_ws021<'a, R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
193 Box::pin(async move {
194 let packet = stream.try_next().await.map_err(|e| ReadError {
195 context: ErrorContext::DefaultImpl,
196 kind: e.into(),
197 })?.ok_or_else(|| ReadError {
198 context: ErrorContext::DefaultImpl,
199 kind: ReadErrorKind::EndOfStream,
200 })?;
201 match packet {
202 tungstenite021::Message::Text(data) => match data.chars().next() {
203 Some('m') => {
204 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
205 context: ErrorContext::DefaultImpl,
206 kind: e.into(),
207 })?;
208 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
209 context: ErrorContext::DefaultImpl,
210 kind: e.into(),
211 })?;
212 while buf.len() < len {
213 let packet = stream.try_next().await.map_err(|e| ReadError {
214 context: ErrorContext::DefaultImpl,
215 kind: e.into(),
216 })?.ok_or_else(|| ReadError {
217 context: ErrorContext::DefaultImpl,
218 kind: ReadErrorKind::EndOfStream,
219 })?;
220 if let tungstenite021::Message::Binary(data) = packet {
221 buf.extend_from_slice(&data);
222 } else {
223 return Err(ReadError {
224 context: ErrorContext::DefaultImpl,
225 kind: ReadErrorKind::MessageKind021(packet),
226 })
227 }
228 }
229 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
230 context: ErrorContext::WebSocket {
231 source: Box::new(context),
232 },
233 kind,
234 })
235 }
236 _ => Err(ReadError {
237 context: ErrorContext::DefaultImpl,
238 kind: ReadErrorKind::WebSocketTextMessage(data),
239 }),
240 },
241 tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
242 context: ErrorContext::WebSocket {
243 source: Box::new(context),
244 },
245 kind,
246 }),
247 _ => Err(ReadError {
248 context: ErrorContext::DefaultImpl,
249 kind: ReadErrorKind::MessageKind021(packet),
250 }),
251 }
252 })
253 }
254
255 #[cfg(feature = "tokio-tungstenite024")]
256 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
257 fn read_ws024<'a, R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
263 Box::pin(async move {
264 let packet = stream.try_next().await.map_err(|e| ReadError {
265 context: ErrorContext::DefaultImpl,
266 kind: e.into(),
267 })?.ok_or_else(|| ReadError {
268 context: ErrorContext::DefaultImpl,
269 kind: ReadErrorKind::EndOfStream,
270 })?;
271 match packet {
272 tungstenite024::Message::Text(data) => match data.chars().next() {
273 Some('m') => {
274 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
275 context: ErrorContext::DefaultImpl,
276 kind: e.into(),
277 })?;
278 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
279 context: ErrorContext::DefaultImpl,
280 kind: e.into(),
281 })?;
282 while buf.len() < len {
283 let packet = stream.try_next().await.map_err(|e| ReadError {
284 context: ErrorContext::DefaultImpl,
285 kind: e.into(),
286 })?.ok_or_else(|| ReadError {
287 context: ErrorContext::DefaultImpl,
288 kind: ReadErrorKind::EndOfStream,
289 })?;
290 if let tungstenite024::Message::Binary(data) = packet {
291 buf.extend_from_slice(&data);
292 } else {
293 return Err(ReadError {
294 context: ErrorContext::DefaultImpl,
295 kind: ReadErrorKind::MessageKind024(packet),
296 })
297 }
298 }
299 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
300 context: ErrorContext::WebSocket {
301 source: Box::new(context),
302 },
303 kind,
304 })
305 }
306 _ => Err(ReadError {
307 context: ErrorContext::DefaultImpl,
308 kind: ReadErrorKind::WebSocketTextMessage(data),
309 }),
310 },
311 tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
312 context: ErrorContext::WebSocket {
313 source: Box::new(context),
314 },
315 kind,
316 }),
317 _ => Err(ReadError {
318 context: ErrorContext::DefaultImpl,
319 kind: ReadErrorKind::MessageKind024(packet),
320 }),
321 }
322 })
323 }
324
325 #[cfg(feature = "tokio-tungstenite021")]
326 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
327 fn write_ws021<'a, W: Sink<tungstenite021::Message, Error = tungstenite021::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
333 where Self: Sync {
334 Box::pin(async move {
335 let mut buf = Vec::default();
336 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
337 context: ErrorContext::WebSocket {
338 source: Box::new(context),
339 },
340 kind,
341 })?;
342 if buf.len() <= WS_MAX_MESSAGE_SIZE {
343 sink.send(tungstenite021::Message::binary(buf)).await.map_err(|e| WriteError {
344 context: ErrorContext::DefaultImpl,
345 kind: e.into(),
346 })?;
347 } else {
348 sink.send(tungstenite021::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
349 context: ErrorContext::DefaultImpl,
350 kind: e.into(),
351 })?;
352 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
353 sink.send(tungstenite021::Message::binary(chunk)).await.map_err(|e| WriteError {
354 context: ErrorContext::DefaultImpl,
355 kind: e.into(),
356 })?;
357 }
358 }
359 Ok(())
360 })
361 }
362
363 #[cfg(feature = "tokio-tungstenite024")]
364 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
365 fn write_ws024<'a, W: Sink<tungstenite024::Message, Error = tungstenite024::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
371 where Self: Sync {
372 Box::pin(async move {
373 let mut buf = Vec::default();
374 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
375 context: ErrorContext::WebSocket {
376 source: Box::new(context),
377 },
378 kind,
379 })?;
380 if buf.len() <= WS_MAX_MESSAGE_SIZE {
381 sink.send(tungstenite024::Message::binary(buf)).await.map_err(|e| WriteError {
382 context: ErrorContext::DefaultImpl,
383 kind: e.into(),
384 })?;
385 } else {
386 sink.send(tungstenite024::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
387 context: ErrorContext::DefaultImpl,
388 kind: e.into(),
389 })?;
390 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
391 sink.send(tungstenite024::Message::binary(chunk)).await.map_err(|e| WriteError {
392 context: ErrorContext::DefaultImpl,
393 kind: e.into(),
394 })?;
395 }
396 }
397 Ok(())
398 })
399 }
400
401 #[cfg(feature = "tokio-tungstenite021")]
402 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
403 fn read_ws_sync021(websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
405 let packet = websocket.read().map_err(|e| ReadError {
406 context: ErrorContext::DefaultImpl,
407 kind: e.into(),
408 })?;
409 match packet {
410 tungstenite021::Message::Text(data) => match data.chars().next() {
411 Some('m') => {
412 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
413 context: ErrorContext::DefaultImpl,
414 kind: e.into(),
415 })?;
416 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
417 context: ErrorContext::DefaultImpl,
418 kind: e.into(),
419 })?;
420 while buf.len() < len {
421 let packet = websocket.read().map_err(|e| ReadError {
422 context: ErrorContext::DefaultImpl,
423 kind: e.into(),
424 })?;
425 if let tungstenite021::Message::Binary(data) = packet {
426 buf.extend_from_slice(&data);
427 } else {
428 return Err(ReadError {
429 context: ErrorContext::DefaultImpl,
430 kind: ReadErrorKind::MessageKind021(packet),
431 })
432 }
433 }
434 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
435 context: ErrorContext::WebSocket {
436 source: Box::new(context),
437 },
438 kind,
439 })
440 }
441 _ => return Err(ReadError {
442 context: ErrorContext::DefaultImpl,
443 kind: ReadErrorKind::WebSocketTextMessage(data),
444 }),
445 },
446 tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
447 context: ErrorContext::WebSocket {
448 source: Box::new(context),
449 },
450 kind,
451 }),
452 _ => Err(ReadError {
453 context: ErrorContext::DefaultImpl,
454 kind: ReadErrorKind::MessageKind021(packet),
455 }),
456 }
457 }
458
459 #[cfg(feature = "tokio-tungstenite024")]
460 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
461 fn read_ws_sync024(websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
463 let packet = websocket.read().map_err(|e| ReadError {
464 context: ErrorContext::DefaultImpl,
465 kind: e.into(),
466 })?;
467 match packet {
468 tungstenite024::Message::Text(data) => match data.chars().next() {
469 Some('m') => {
470 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
471 context: ErrorContext::DefaultImpl,
472 kind: e.into(),
473 })?;
474 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
475 context: ErrorContext::DefaultImpl,
476 kind: e.into(),
477 })?;
478 while buf.len() < len {
479 let packet = websocket.read().map_err(|e| ReadError {
480 context: ErrorContext::DefaultImpl,
481 kind: e.into(),
482 })?;
483 if let tungstenite024::Message::Binary(data) = packet {
484 buf.extend_from_slice(&data);
485 } else {
486 return Err(ReadError {
487 context: ErrorContext::DefaultImpl,
488 kind: ReadErrorKind::MessageKind024(packet),
489 })
490 }
491 }
492 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
493 context: ErrorContext::WebSocket {
494 source: Box::new(context),
495 },
496 kind,
497 })
498 }
499 _ => return Err(ReadError {
500 context: ErrorContext::DefaultImpl,
501 kind: ReadErrorKind::WebSocketTextMessage(data),
502 }),
503 },
504 tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
505 context: ErrorContext::WebSocket {
506 source: Box::new(context),
507 },
508 kind,
509 }),
510 _ => Err(ReadError {
511 context: ErrorContext::DefaultImpl,
512 kind: ReadErrorKind::MessageKind024(packet),
513 }),
514 }
515 }
516
517 #[cfg(feature = "tokio-tungstenite021")]
518 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
519 fn write_ws_sync021(&self, websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
521 let mut buf = Vec::default();
522 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
523 context: ErrorContext::WebSocket {
524 source: Box::new(context),
525 },
526 kind,
527 })?;
528 if buf.len() <= WS_MAX_MESSAGE_SIZE {
529 websocket.send(tungstenite021::Message::binary(buf)).map_err(|e| WriteError {
530 context: ErrorContext::DefaultImpl,
531 kind: e.into(),
532 })?;
533 } else {
534 websocket.send(tungstenite021::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
535 context: ErrorContext::DefaultImpl,
536 kind: e.into(),
537 })?;
538 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
539 websocket.send(tungstenite021::Message::binary(chunk)).map_err(|e| WriteError {
540 context: ErrorContext::DefaultImpl,
541 kind: e.into(),
542 })?;
543 }
544 }
545 websocket.flush().map_err(|e| WriteError {
546 context: ErrorContext::DefaultImpl,
547 kind: e.into(),
548 })?;
549 Ok(())
550 }
551
552 #[cfg(feature = "tokio-tungstenite024")]
553 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
554 fn write_ws_sync024(&self, websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
556 let mut buf = Vec::default();
557 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
558 context: ErrorContext::WebSocket {
559 source: Box::new(context),
560 },
561 kind,
562 })?;
563 if buf.len() <= WS_MAX_MESSAGE_SIZE {
564 websocket.send(tungstenite024::Message::binary(buf)).map_err(|e| WriteError {
565 context: ErrorContext::DefaultImpl,
566 kind: e.into(),
567 })?;
568 } else {
569 websocket.send(tungstenite024::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
570 context: ErrorContext::DefaultImpl,
571 kind: e.into(),
572 })?;
573 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
574 websocket.send(tungstenite024::Message::binary(chunk)).map_err(|e| WriteError {
575 context: ErrorContext::DefaultImpl,
576 kind: e.into(),
577 })?;
578 }
579 }
580 websocket.flush().map_err(|e| WriteError {
581 context: ErrorContext::DefaultImpl,
582 kind: e.into(),
583 })?;
584 Ok(())
585 }
586
587 #[cfg(feature = "tokio-tungstenite021")]
588 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
589 fn read_ws_owned021<R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
593 Box::pin(async move {
594 let value = Self::read_ws021(&mut stream).await?;
595 Ok((stream, value))
596 })
597 }
598
599 #[cfg(feature = "tokio-tungstenite024")]
600 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
601 fn read_ws_owned024<R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
605 Box::pin(async move {
606 let value = Self::read_ws024(&mut stream).await?;
607 Ok((stream, value))
608 })
609 }
610}
611
612#[cfg(feature = "tokio-tungstenite021")]
616#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
617pub async fn websocket021<R: Protocol, W: Protocol>(request: impl tungstenite021::client::IntoClientRequest + Unpin) -> tungstenite021::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
618 let (sock, _) = tokio_tungstenite021::connect_async(request).await?;
619 let (sink, stream) = sock.split();
620 Ok((
621 sink.sink_map_err(|e| WriteError {
622 context: ErrorContext::WebSocketSink,
623 kind: e.into(),
624 }).with_flat_map::<W, _, _>(|msg| {
625 let mut buf = Vec::default();
626 match msg.write_sync(&mut buf) {
627 Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
628 Either::Left(stream::once(future::ready(tungstenite021::Message::binary(buf))))
629 } else {
630 Either::Right(stream::iter(
631 iter::once(tungstenite021::Message::text(format!("m{}", buf.len())))
632 .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite021::Message::binary))
633 .collect::<Vec<_>>()
634 ))
635 }.map(Ok)),
636 Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
637 context: ErrorContext::WebSocket {
638 source: Box::new(context),
639 },
640 kind,
641 }))),
642 }
643 }),
644 stream.scan(None, |state, res| {
664 fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite021::Result<tungstenite021::Message>) -> Result<impl Stream<Item = Result<R, ReadError>>, ReadError> {
665 let packet = res.map_err(|e| ReadError {
666 context: ErrorContext::WebSocketStream,
667 kind: e.into(),
668 })?;
669 Ok(if let Some((len, buf)) = state {
670 if let tungstenite021::Message::Binary(data) = packet {
671 buf.extend_from_slice(&data);
672 } else {
673 return Err(ReadError {
674 context: ErrorContext::DefaultImpl,
675 kind: ReadErrorKind::MessageKind021(packet),
676 })
677 }
678 if buf.len() >= *len {
679 let buf = mem::take(buf);
680 *state = None;
681 Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
682 context: ErrorContext::WebSocket {
683 source: Box::new(context),
684 },
685 kind,
686 })?)))
687 } else {
688 Either::Left(stream::empty())
689 }
690 } else {
691 match packet {
692 tungstenite021::Message::Text(data) => match data.chars().next() {
693 Some('m') => {
694 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
695 context: ErrorContext::DefaultImpl,
696 kind: e.into(),
697 })?;
698 let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
699 context: ErrorContext::DefaultImpl,
700 kind: e.into(),
701 })?;
702 *state = Some((len, buf));
703 Either::Left(stream::empty())
704 }
705 _ => return Err(ReadError {
706 context: ErrorContext::DefaultImpl,
707 kind: ReadErrorKind::WebSocketTextMessage(data),
708 }),
709 },
710 tungstenite021::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
711 context: ErrorContext::WebSocket {
712 source: Box::new(context),
713 },
714 kind,
715 })?))),
716 _ => return Err(ReadError {
717 context: ErrorContext::DefaultImpl,
718 kind: ReadErrorKind::MessageKind021(packet),
719 }),
720 }
721 })
722 }
723
724 future::ready(Some(scanner(state, res)))
725 }).try_flatten(),
726 ))
727}
728
729#[cfg(feature = "tokio-tungstenite024")]
733#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
734pub async fn websocket024<R: Protocol, W: Protocol>(request: impl tungstenite024::client::IntoClientRequest + Unpin) -> tungstenite024::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
735 let (sock, _) = tokio_tungstenite024::connect_async(request).await?;
736 let (sink, stream) = sock.split();
737 Ok((
738 sink.sink_map_err(|e| WriteError {
739 context: ErrorContext::WebSocketSink,
740 kind: e.into(),
741 }).with_flat_map::<W, _, _>(|msg| {
742 let mut buf = Vec::default();
743 match msg.write_sync(&mut buf) {
744 Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
745 Either::Left(stream::once(future::ready(tungstenite024::Message::binary(buf))))
746 } else {
747 Either::Right(stream::iter(
748 iter::once(tungstenite024::Message::text(format!("m{}", buf.len())))
749 .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite024::Message::binary))
750 .collect::<Vec<_>>()
751 ))
752 }.map(Ok)),
753 Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
754 context: ErrorContext::WebSocket {
755 source: Box::new(context),
756 },
757 kind,
758 }))),
759 }
760 }),
761 stream.scan(None, |state, res| {
781 fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite024::Result<tungstenite024::Message>) -> Result<impl Stream<Item = Result<R, ReadError>>, ReadError> {
782 let packet = res.map_err(|e| ReadError {
783 context: ErrorContext::WebSocketStream,
784 kind: e.into(),
785 })?;
786 Ok(if let Some((len, buf)) = state {
787 if let tungstenite024::Message::Binary(data) = packet {
788 buf.extend_from_slice(&data);
789 } else {
790 return Err(ReadError {
791 context: ErrorContext::DefaultImpl,
792 kind: ReadErrorKind::MessageKind024(packet),
793 })
794 }
795 if buf.len() >= *len {
796 let buf = mem::take(buf);
797 *state = None;
798 Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
799 context: ErrorContext::WebSocket {
800 source: Box::new(context),
801 },
802 kind,
803 })?)))
804 } else {
805 Either::Left(stream::empty())
806 }
807 } else {
808 match packet {
809 tungstenite024::Message::Text(data) => match data.chars().next() {
810 Some('m') => {
811 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
812 context: ErrorContext::DefaultImpl,
813 kind: e.into(),
814 })?;
815 let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
816 context: ErrorContext::DefaultImpl,
817 kind: e.into(),
818 })?;
819 *state = Some((len, buf));
820 Either::Left(stream::empty())
821 }
822 _ => return Err(ReadError {
823 context: ErrorContext::DefaultImpl,
824 kind: ReadErrorKind::WebSocketTextMessage(data),
825 }),
826 },
827 tungstenite024::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
828 context: ErrorContext::WebSocket {
829 source: Box::new(context),
830 },
831 kind,
832 })?))),
833 _ => return Err(ReadError {
834 context: ErrorContext::DefaultImpl,
835 kind: ReadErrorKind::MessageKind024(packet),
836 }),
837 }
838 })
839 }
840
841 future::ready(Some(scanner(state, res)))
842 }).try_flatten(),
843 ))
844}