1#![deny(missing_docs, rust_2018_idioms, unused, unused_crate_dependencies, unused_import_braces, unused_lifetimes, unused_qualifications, warnings)]
2#![forbid(unsafe_code)]
3
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6use {
37 std::{
38 future::Future,
39 io::{
40 self,
41 prelude::*,
42 },
43 pin::Pin,
44 },
45 tokio::io::{
46 AsyncRead,
47 AsyncWrite,
48 },
49};
50#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024"))] use {
51 std::{
52 iter,
53 mem,
54 },
55 fallible_collections::FallibleVec,
56 futures::{
57 Sink,
58 SinkExt as _,
59 future::{
60 self,
61 Either,
62 },
63 stream::{
64 self,
65 Stream,
66 StreamExt as _,
67 TryStreamExt as _,
68 },
69 },
70};
71#[cfg(feature = "tokio-tungstenite021")] use tokio_tungstenite021::tungstenite as tungstenite021;
72#[cfg(feature = "tokio-tungstenite024")] use tokio_tungstenite024::tungstenite as tungstenite024;
73pub use {
74 async_proto_derive::{
75 Protocol,
76 bitflags,
77 },
78 crate::error::*,
79};
80#[doc(hidden)] pub use tokio; mod error;
83mod impls;
84
/// Largest encoded payload, in bytes, that is sent as a single WebSocket binary message (16 MiB).
///
/// Longer payloads are announced with a text message and then split into binary messages of at most this many bytes.
#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024"))]
const WS_MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024;
87
88pub trait Protocol: Sized {
90 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
96 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
102 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError>;
104 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError>;
106
107 fn read_owned<R: AsyncRead + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
111 Box::pin(async move {
112 let value = Self::read(&mut stream).await?;
113 Ok((stream, value))
114 })
115 }
116
117 fn try_read(stream: &mut impl Read, buf: &mut Vec<u8>) -> Result<Option<Self>, ReadError> {
160 let mut temp_buf = vec![0; 8];
161 loop {
162 let mut slice = &mut &**buf;
163 match Self::read_sync(&mut slice) {
164 Ok(value) => {
165 let value_len = slice.len();
166 buf.drain(..buf.len() - value_len);
167 return Ok(Some(value))
168 }
169 Err(ReadError { kind: ReadErrorKind::Io(e), .. }) if e.kind() == io::ErrorKind::UnexpectedEof => {}
170 Err(e) => return Err(e),
171 }
172 match stream.read(&mut temp_buf) {
173 Ok(0) => return Err(ReadError {
174 context: ErrorContext::DefaultImpl,
175 kind: ReadErrorKind::EndOfStream,
176 }),
177 Ok(n) => buf.extend_from_slice(&temp_buf[..n]),
178 Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
179 Err(e) => return Err(ReadError {
180 context: ErrorContext::DefaultImpl,
181 kind: e.into(),
182 }),
183 }
184 }
185 }
186
187 #[cfg(feature = "tokio-tungstenite021")]
188 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
189 fn read_ws021<'a, R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
195 Box::pin(async move {
196 let packet = stream.try_next().await.map_err(|e| ReadError {
197 context: ErrorContext::DefaultImpl,
198 kind: e.into(),
199 })?.ok_or_else(|| ReadError {
200 context: ErrorContext::DefaultImpl,
201 kind: ReadErrorKind::EndOfStream,
202 })?;
203 match packet {
204 tungstenite021::Message::Text(data) => match data.chars().next() {
205 Some('m') => {
206 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
207 context: ErrorContext::DefaultImpl,
208 kind: e.into(),
209 })?;
210 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
211 context: ErrorContext::DefaultImpl,
212 kind: e.into(),
213 })?;
214 while buf.len() < len {
215 let packet = stream.try_next().await.map_err(|e| ReadError {
216 context: ErrorContext::DefaultImpl,
217 kind: e.into(),
218 })?.ok_or_else(|| ReadError {
219 context: ErrorContext::DefaultImpl,
220 kind: ReadErrorKind::EndOfStream,
221 })?;
222 if let tungstenite021::Message::Binary(data) = packet {
223 buf.extend_from_slice(&data);
224 } else {
225 return Err(ReadError {
226 context: ErrorContext::DefaultImpl,
227 kind: ReadErrorKind::MessageKind021(packet),
228 })
229 }
230 }
231 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
232 context: ErrorContext::WebSocket {
233 source: Box::new(context),
234 },
235 kind,
236 })
237 }
238 _ => Err(ReadError {
239 context: ErrorContext::DefaultImpl,
240 kind: ReadErrorKind::WebSocketTextMessage(data),
241 }),
242 },
243 tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
244 context: ErrorContext::WebSocket {
245 source: Box::new(context),
246 },
247 kind,
248 }),
249 _ => Err(ReadError {
250 context: ErrorContext::DefaultImpl,
251 kind: ReadErrorKind::MessageKind021(packet),
252 }),
253 }
254 })
255 }
256
257 #[cfg(feature = "tokio-tungstenite024")]
258 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
259 fn read_ws024<'a, R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
265 Box::pin(async move {
266 let packet = stream.try_next().await.map_err(|e| ReadError {
267 context: ErrorContext::DefaultImpl,
268 kind: e.into(),
269 })?.ok_or_else(|| ReadError {
270 context: ErrorContext::DefaultImpl,
271 kind: ReadErrorKind::EndOfStream,
272 })?;
273 match packet {
274 tungstenite024::Message::Text(data) => match data.chars().next() {
275 Some('m') => {
276 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
277 context: ErrorContext::DefaultImpl,
278 kind: e.into(),
279 })?;
280 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
281 context: ErrorContext::DefaultImpl,
282 kind: e.into(),
283 })?;
284 while buf.len() < len {
285 let packet = stream.try_next().await.map_err(|e| ReadError {
286 context: ErrorContext::DefaultImpl,
287 kind: e.into(),
288 })?.ok_or_else(|| ReadError {
289 context: ErrorContext::DefaultImpl,
290 kind: ReadErrorKind::EndOfStream,
291 })?;
292 if let tungstenite024::Message::Binary(data) = packet {
293 buf.extend_from_slice(&data);
294 } else {
295 return Err(ReadError {
296 context: ErrorContext::DefaultImpl,
297 kind: ReadErrorKind::MessageKind024(packet),
298 })
299 }
300 }
301 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
302 context: ErrorContext::WebSocket {
303 source: Box::new(context),
304 },
305 kind,
306 })
307 }
308 _ => Err(ReadError {
309 context: ErrorContext::DefaultImpl,
310 kind: ReadErrorKind::WebSocketTextMessage(data),
311 }),
312 },
313 tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
314 context: ErrorContext::WebSocket {
315 source: Box::new(context),
316 },
317 kind,
318 }),
319 _ => Err(ReadError {
320 context: ErrorContext::DefaultImpl,
321 kind: ReadErrorKind::MessageKind024(packet),
322 }),
323 }
324 })
325 }
326
327 #[cfg(feature = "tokio-tungstenite021")]
328 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
329 fn write_ws021<'a, W: Sink<tungstenite021::Message, Error = tungstenite021::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
335 where Self: Sync {
336 Box::pin(async move {
337 let mut buf = Vec::default();
338 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
339 context: ErrorContext::WebSocket {
340 source: Box::new(context),
341 },
342 kind,
343 })?;
344 if buf.len() <= WS_MAX_MESSAGE_SIZE {
345 sink.send(tungstenite021::Message::binary(buf)).await.map_err(|e| WriteError {
346 context: ErrorContext::DefaultImpl,
347 kind: e.into(),
348 })?;
349 } else {
350 sink.send(tungstenite021::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
351 context: ErrorContext::DefaultImpl,
352 kind: e.into(),
353 })?;
354 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
355 sink.send(tungstenite021::Message::binary(chunk)).await.map_err(|e| WriteError {
356 context: ErrorContext::DefaultImpl,
357 kind: e.into(),
358 })?;
359 }
360 }
361 Ok(())
362 })
363 }
364
365 #[cfg(feature = "tokio-tungstenite024")]
366 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
367 fn write_ws024<'a, W: Sink<tungstenite024::Message, Error = tungstenite024::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
373 where Self: Sync {
374 Box::pin(async move {
375 let mut buf = Vec::default();
376 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
377 context: ErrorContext::WebSocket {
378 source: Box::new(context),
379 },
380 kind,
381 })?;
382 if buf.len() <= WS_MAX_MESSAGE_SIZE {
383 sink.send(tungstenite024::Message::binary(buf)).await.map_err(|e| WriteError {
384 context: ErrorContext::DefaultImpl,
385 kind: e.into(),
386 })?;
387 } else {
388 sink.send(tungstenite024::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
389 context: ErrorContext::DefaultImpl,
390 kind: e.into(),
391 })?;
392 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
393 sink.send(tungstenite024::Message::binary(chunk)).await.map_err(|e| WriteError {
394 context: ErrorContext::DefaultImpl,
395 kind: e.into(),
396 })?;
397 }
398 }
399 Ok(())
400 })
401 }
402
403 #[cfg(feature = "tokio-tungstenite021")]
404 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
405 fn read_ws_sync021(websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
407 let packet = websocket.read().map_err(|e| ReadError {
408 context: ErrorContext::DefaultImpl,
409 kind: e.into(),
410 })?;
411 match packet {
412 tungstenite021::Message::Text(data) => match data.chars().next() {
413 Some('m') => {
414 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
415 context: ErrorContext::DefaultImpl,
416 kind: e.into(),
417 })?;
418 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
419 context: ErrorContext::DefaultImpl,
420 kind: e.into(),
421 })?;
422 while buf.len() < len {
423 let packet = websocket.read().map_err(|e| ReadError {
424 context: ErrorContext::DefaultImpl,
425 kind: e.into(),
426 })?;
427 if let tungstenite021::Message::Binary(data) = packet {
428 buf.extend_from_slice(&data);
429 } else {
430 return Err(ReadError {
431 context: ErrorContext::DefaultImpl,
432 kind: ReadErrorKind::MessageKind021(packet),
433 })
434 }
435 }
436 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
437 context: ErrorContext::WebSocket {
438 source: Box::new(context),
439 },
440 kind,
441 })
442 }
443 _ => return Err(ReadError {
444 context: ErrorContext::DefaultImpl,
445 kind: ReadErrorKind::WebSocketTextMessage(data),
446 }),
447 },
448 tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
449 context: ErrorContext::WebSocket {
450 source: Box::new(context),
451 },
452 kind,
453 }),
454 _ => Err(ReadError {
455 context: ErrorContext::DefaultImpl,
456 kind: ReadErrorKind::MessageKind021(packet),
457 }),
458 }
459 }
460
461 #[cfg(feature = "tokio-tungstenite024")]
462 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
463 fn read_ws_sync024(websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
465 let packet = websocket.read().map_err(|e| ReadError {
466 context: ErrorContext::DefaultImpl,
467 kind: e.into(),
468 })?;
469 match packet {
470 tungstenite024::Message::Text(data) => match data.chars().next() {
471 Some('m') => {
472 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
473 context: ErrorContext::DefaultImpl,
474 kind: e.into(),
475 })?;
476 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
477 context: ErrorContext::DefaultImpl,
478 kind: e.into(),
479 })?;
480 while buf.len() < len {
481 let packet = websocket.read().map_err(|e| ReadError {
482 context: ErrorContext::DefaultImpl,
483 kind: e.into(),
484 })?;
485 if let tungstenite024::Message::Binary(data) = packet {
486 buf.extend_from_slice(&data);
487 } else {
488 return Err(ReadError {
489 context: ErrorContext::DefaultImpl,
490 kind: ReadErrorKind::MessageKind024(packet),
491 })
492 }
493 }
494 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
495 context: ErrorContext::WebSocket {
496 source: Box::new(context),
497 },
498 kind,
499 })
500 }
501 _ => return Err(ReadError {
502 context: ErrorContext::DefaultImpl,
503 kind: ReadErrorKind::WebSocketTextMessage(data),
504 }),
505 },
506 tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
507 context: ErrorContext::WebSocket {
508 source: Box::new(context),
509 },
510 kind,
511 }),
512 _ => Err(ReadError {
513 context: ErrorContext::DefaultImpl,
514 kind: ReadErrorKind::MessageKind024(packet),
515 }),
516 }
517 }
518
519 #[cfg(feature = "tokio-tungstenite021")]
520 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
521 fn write_ws_sync021(&self, websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
523 let mut buf = Vec::default();
524 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
525 context: ErrorContext::WebSocket {
526 source: Box::new(context),
527 },
528 kind,
529 })?;
530 if buf.len() <= WS_MAX_MESSAGE_SIZE {
531 websocket.send(tungstenite021::Message::binary(buf)).map_err(|e| WriteError {
532 context: ErrorContext::DefaultImpl,
533 kind: e.into(),
534 })?;
535 } else {
536 websocket.send(tungstenite021::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
537 context: ErrorContext::DefaultImpl,
538 kind: e.into(),
539 })?;
540 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
541 websocket.send(tungstenite021::Message::binary(chunk)).map_err(|e| WriteError {
542 context: ErrorContext::DefaultImpl,
543 kind: e.into(),
544 })?;
545 }
546 }
547 websocket.flush().map_err(|e| WriteError {
548 context: ErrorContext::DefaultImpl,
549 kind: e.into(),
550 })?;
551 Ok(())
552 }
553
554 #[cfg(feature = "tokio-tungstenite024")]
555 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
556 fn write_ws_sync024(&self, websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
558 let mut buf = Vec::default();
559 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
560 context: ErrorContext::WebSocket {
561 source: Box::new(context),
562 },
563 kind,
564 })?;
565 if buf.len() <= WS_MAX_MESSAGE_SIZE {
566 websocket.send(tungstenite024::Message::binary(buf)).map_err(|e| WriteError {
567 context: ErrorContext::DefaultImpl,
568 kind: e.into(),
569 })?;
570 } else {
571 websocket.send(tungstenite024::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
572 context: ErrorContext::DefaultImpl,
573 kind: e.into(),
574 })?;
575 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
576 websocket.send(tungstenite024::Message::binary(chunk)).map_err(|e| WriteError {
577 context: ErrorContext::DefaultImpl,
578 kind: e.into(),
579 })?;
580 }
581 }
582 websocket.flush().map_err(|e| WriteError {
583 context: ErrorContext::DefaultImpl,
584 kind: e.into(),
585 })?;
586 Ok(())
587 }
588
589 #[cfg(feature = "tokio-tungstenite021")]
590 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
591 fn read_ws_owned021<R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
595 Box::pin(async move {
596 let value = Self::read_ws021(&mut stream).await?;
597 Ok((stream, value))
598 })
599 }
600
601 #[cfg(feature = "tokio-tungstenite024")]
602 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
603 fn read_ws_owned024<R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
607 Box::pin(async move {
608 let value = Self::read_ws024(&mut stream).await?;
609 Ok((stream, value))
610 })
611 }
612}
613
/// Establishes a WebSocket connection for the given `request` using `tokio-tungstenite` 0.21 and returns a typed sink/stream pair.
///
/// Values of type `W` sent into the sink are encoded with `Protocol::write_sync`. An encoding of at most 16 MiB becomes one binary message. A longer one is announced with a text message `m<length>` and then split into binary messages of at most 16 MiB each.
///
/// The stream decodes incoming messages with `Protocol::read_sync` into values of type `R`, reassembling values that were split in this way.
///
/// # Errors
///
/// Returns the `tungstenite` error if the connection cannot be established. Later failures are reported through the sink's error type and the stream's items.
#[cfg(feature = "tokio-tungstenite021")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
pub async fn websocket021<R: Protocol, W: Protocol>(request: impl tungstenite021::client::IntoClientRequest + Unpin) -> tungstenite021::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
    // Marks an error from decoding a payload as having happened inside a WebSocket message.
    fn in_websocket(ReadError { context, kind }: ReadError) -> ReadError {
        ReadError {
            context: ErrorContext::WebSocket {
                source: Box::new(context),
            },
            kind,
        }
    }

    // Handles one incoming message. `state` is `Some((announced_len, bytes_so_far))` while a split value is being reassembled.
    // Returns a stream of the values completed by this message: either none or exactly one.
    fn scanner<T: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite021::Result<tungstenite021::Message>) -> Result<impl Stream<Item = Result<T, ReadError>> + use<T>, ReadError> {
        let packet = match res {
            Ok(packet) => packet,
            Err(e) => return Err(ReadError {
                context: ErrorContext::WebSocketStream,
                kind: e.into(),
            }),
        };
        let decoded = match state {
            Some((len, buf)) => {
                // in the middle of a split value: only further binary chunks are acceptable
                match packet {
                    tungstenite021::Message::Binary(data) => buf.extend_from_slice(&data),
                    other => return Err(ReadError {
                        context: ErrorContext::DefaultImpl,
                        kind: ReadErrorKind::MessageKind021(other),
                    }),
                }
                if buf.len() < *len {
                    Either::Left(stream::empty())
                } else {
                    let payload = mem::take(buf);
                    *state = None;
                    Either::Right(stream::once(future::ok(T::read_sync(&mut &*payload).map_err(in_websocket)?)))
                }
            }
            None => match packet {
                tungstenite021::Message::Text(text) => match text.strip_prefix('m') {
                    Some(digits) => {
                        let len = digits.parse::<usize>().map_err(|e| ReadError {
                            context: ErrorContext::DefaultImpl,
                            kind: e.into(),
                        })?;
                        let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
                            context: ErrorContext::DefaultImpl,
                            kind: e.into(),
                        })?;
                        *state = Some((len, buf));
                        Either::Left(stream::empty())
                    }
                    None => return Err(ReadError {
                        context: ErrorContext::DefaultImpl,
                        kind: ReadErrorKind::WebSocketTextMessage(text),
                    }),
                },
                tungstenite021::Message::Binary(data) => Either::Right(stream::once(future::ok(T::read_sync(&mut &*data).map_err(in_websocket)?))),
                other => return Err(ReadError {
                    context: ErrorContext::DefaultImpl,
                    kind: ReadErrorKind::MessageKind021(other),
                }),
            },
        };
        Ok(decoded)
    }

    let (sock, _) = tokio_tungstenite021::connect_async(request).await?;
    let (raw_sink, raw_stream) = sock.split();
    let typed_sink = raw_sink
        .sink_map_err(|e| WriteError {
            context: ErrorContext::WebSocketSink,
            kind: e.into(),
        })
        .with_flat_map::<W, _, _>(|msg| {
            let mut encoded = Vec::default();
            match msg.write_sync(&mut encoded) {
                Ok(()) => {
                    let messages = if encoded.len() > WS_MAX_MESSAGE_SIZE {
                        // too long for one message: length header first, then the chunks
                        let header = tungstenite021::Message::text(format!("m{}", encoded.len()));
                        let chunks = encoded.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite021::Message::binary);
                        Either::Right(stream::iter(iter::once(header).chain(chunks).collect::<Vec<_>>()))
                    } else {
                        Either::Left(stream::once(future::ready(tungstenite021::Message::binary(encoded))))
                    };
                    Either::Left(messages.map(Ok))
                }
                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
                    context: ErrorContext::WebSocket {
                        source: Box::new(context),
                    },
                    kind,
                }))),
            }
        });
    let typed_stream = raw_stream
        .scan(None, |state, res| future::ready(Some(scanner(state, res))))
        .try_flatten();
    Ok((typed_sink, typed_stream))
}
730
/// Establishes a WebSocket connection for the given `request` using `tokio-tungstenite` 0.24 and returns a typed sink/stream pair.
///
/// Values of type `W` sent into the sink are encoded with `Protocol::write_sync`. An encoding of at most 16 MiB becomes one binary message. A longer one is announced with a text message `m<length>` and then split into binary messages of at most 16 MiB each.
///
/// The stream decodes incoming messages with `Protocol::read_sync` into values of type `R`, reassembling values that were split in this way.
///
/// # Errors
///
/// Returns the `tungstenite` error if the connection cannot be established. Later failures are reported through the sink's error type and the stream's items.
#[cfg(feature = "tokio-tungstenite024")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
pub async fn websocket024<R: Protocol, W: Protocol>(request: impl tungstenite024::client::IntoClientRequest + Unpin) -> tungstenite024::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
    // Marks an error from decoding a payload as having happened inside a WebSocket message.
    fn in_websocket(ReadError { context, kind }: ReadError) -> ReadError {
        ReadError {
            context: ErrorContext::WebSocket {
                source: Box::new(context),
            },
            kind,
        }
    }

    // Handles one incoming message. `state` is `Some((announced_len, bytes_so_far))` while a split value is being reassembled.
    // Returns a stream of the values completed by this message: either none or exactly one.
    fn scanner<T: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite024::Result<tungstenite024::Message>) -> Result<impl Stream<Item = Result<T, ReadError>> + use<T>, ReadError> {
        let packet = match res {
            Ok(packet) => packet,
            Err(e) => return Err(ReadError {
                context: ErrorContext::WebSocketStream,
                kind: e.into(),
            }),
        };
        let decoded = match state {
            Some((len, buf)) => {
                // in the middle of a split value: only further binary chunks are acceptable
                match packet {
                    tungstenite024::Message::Binary(data) => buf.extend_from_slice(&data),
                    other => return Err(ReadError {
                        context: ErrorContext::DefaultImpl,
                        kind: ReadErrorKind::MessageKind024(other),
                    }),
                }
                if buf.len() < *len {
                    Either::Left(stream::empty())
                } else {
                    let payload = mem::take(buf);
                    *state = None;
                    Either::Right(stream::once(future::ok(T::read_sync(&mut &*payload).map_err(in_websocket)?)))
                }
            }
            None => match packet {
                tungstenite024::Message::Text(text) => match text.strip_prefix('m') {
                    Some(digits) => {
                        let len = digits.parse::<usize>().map_err(|e| ReadError {
                            context: ErrorContext::DefaultImpl,
                            kind: e.into(),
                        })?;
                        let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
                            context: ErrorContext::DefaultImpl,
                            kind: e.into(),
                        })?;
                        *state = Some((len, buf));
                        Either::Left(stream::empty())
                    }
                    None => return Err(ReadError {
                        context: ErrorContext::DefaultImpl,
                        kind: ReadErrorKind::WebSocketTextMessage(text),
                    }),
                },
                tungstenite024::Message::Binary(data) => Either::Right(stream::once(future::ok(T::read_sync(&mut &*data).map_err(in_websocket)?))),
                other => return Err(ReadError {
                    context: ErrorContext::DefaultImpl,
                    kind: ReadErrorKind::MessageKind024(other),
                }),
            },
        };
        Ok(decoded)
    }

    let (sock, _) = tokio_tungstenite024::connect_async(request).await?;
    let (raw_sink, raw_stream) = sock.split();
    let typed_sink = raw_sink
        .sink_map_err(|e| WriteError {
            context: ErrorContext::WebSocketSink,
            kind: e.into(),
        })
        .with_flat_map::<W, _, _>(|msg| {
            let mut encoded = Vec::default();
            match msg.write_sync(&mut encoded) {
                Ok(()) => {
                    let messages = if encoded.len() > WS_MAX_MESSAGE_SIZE {
                        // too long for one message: length header first, then the chunks
                        let header = tungstenite024::Message::text(format!("m{}", encoded.len()));
                        let chunks = encoded.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite024::Message::binary);
                        Either::Right(stream::iter(iter::once(header).chain(chunks).collect::<Vec<_>>()))
                    } else {
                        Either::Left(stream::once(future::ready(tungstenite024::Message::binary(encoded))))
                    };
                    Either::Left(messages.map(Ok))
                }
                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
                    context: ErrorContext::WebSocket {
                        source: Box::new(context),
                    },
                    kind,
                }))),
            }
        });
    let typed_stream = raw_stream
        .scan(None, |state, res| future::ready(Some(scanner(state, res))))
        .try_flatten();
    Ok((typed_sink, typed_stream))
}