1#![deny(missing_docs, rust_2018_idioms, unused, unused_crate_dependencies, unused_import_braces, unused_lifetimes, unused_qualifications, warnings)]
2#![forbid(unsafe_code)]
3
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6use {
44 std::{
45 future::Future,
46 io::{
47 self,
48 prelude::*,
49 },
50 pin::Pin,
51 },
52 tokio::io::{
53 AsyncRead,
54 AsyncWrite,
55 },
56};
57#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite028"))] use {
58 std::{
59 iter,
60 mem,
61 },
62 fallible_collections::FallibleVec,
63 futures::{
64 Sink,
65 SinkExt as _,
66 future::{
67 self,
68 Either,
69 },
70 stream::{
71 self,
72 Stream,
73 StreamExt as _,
74 TryStreamExt as _,
75 },
76 },
77};
78#[cfg(feature = "tokio-tungstenite021")] use tokio_tungstenite021::tungstenite as tungstenite021;
79#[cfg(feature = "tokio-tungstenite024")] use tokio_tungstenite024::tungstenite as tungstenite024;
80#[cfg(feature = "tokio-tungstenite028")] use tokio_tungstenite028::tungstenite as tungstenite028;
81pub use {
82 async_proto_derive::{
83 Protocol,
84 bitflags,
85 },
86 crate::error::*,
87};
88#[doc(hidden)] pub use tokio; mod error;
91mod impls;
92
/// Largest payload, in bytes, that the WebSocket write helpers in this file put into a single binary message (16 MiB).
///
/// Encoded values longer than this are announced with a text message of the form `m<len>` and then sent as consecutive binary chunks of at most this many bytes.
#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite028"))] const WS_MAX_MESSAGE_SIZE: usize = 16777216;
95
96pub trait Protocol: Sized {
    /// Reads a value of this type from an async `stream`.
    ///
    /// Returns a boxed future that resolves to the decoded value, or to a `ReadError` if reading or decoding fails.
    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
    /// Writes this value to an async `sink`.
    ///
    /// Returns a boxed future that resolves to `()` on success, or to a `WriteError` if encoding or writing fails.
    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
    /// Reads a value of this type from a synchronous `stream`.
    ///
    /// The default methods of this trait that decode in-memory buffers (`try_read` and the WebSocket readers) are built on top of this method.
    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError>;
    /// Writes this value to a synchronous `sink`.
    ///
    /// The WebSocket writers among the default methods of this trait use this method to encode the value into an in-memory buffer before sending it.
    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError>;
114
115 fn read_owned<R: AsyncRead + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
119 Box::pin(async move {
120 let value = Self::read(&mut stream).await?;
121 Ok((stream, value))
122 })
123 }
124
125 fn try_read(stream: &mut impl Read, buf: &mut Vec<u8>) -> Result<Option<Self>, ReadError> {
168 let mut temp_buf = vec![0; 8];
169 loop {
170 let mut slice = &mut &**buf;
171 match Self::read_sync(&mut slice) {
172 Ok(value) => {
173 let value_len = slice.len();
174 buf.drain(..buf.len() - value_len);
175 return Ok(Some(value))
176 }
177 Err(ReadError { kind: ReadErrorKind::Io(e), .. }) if e.kind() == io::ErrorKind::UnexpectedEof => {}
178 Err(e) => return Err(e),
179 }
180 match stream.read(&mut temp_buf) {
181 Ok(0) => return Err(ReadError {
182 context: ErrorContext::DefaultImpl,
183 kind: ReadErrorKind::EndOfStream,
184 }),
185 Ok(n) => buf.extend_from_slice(&temp_buf[..n]),
186 Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
187 Err(e) => return Err(ReadError {
188 context: ErrorContext::DefaultImpl,
189 kind: e.into(),
190 }),
191 }
192 }
193 }
194
195 #[cfg(feature = "tokio-tungstenite021")]
196 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
197 fn read_ws021<'a, R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
203 Box::pin(async move {
204 let packet = stream.try_next().await.map_err(|e| ReadError {
205 context: ErrorContext::DefaultImpl,
206 kind: e.into(),
207 })?.ok_or_else(|| ReadError {
208 context: ErrorContext::DefaultImpl,
209 kind: ReadErrorKind::EndOfStream,
210 })?;
211 match packet {
212 tungstenite021::Message::Text(data) => match data.chars().next() {
213 Some('m') => {
214 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
215 context: ErrorContext::DefaultImpl,
216 kind: e.into(),
217 })?;
218 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
219 context: ErrorContext::DefaultImpl,
220 kind: e.into(),
221 })?;
222 while buf.len() < len {
223 let packet = stream.try_next().await.map_err(|e| ReadError {
224 context: ErrorContext::DefaultImpl,
225 kind: e.into(),
226 })?.ok_or_else(|| ReadError {
227 context: ErrorContext::DefaultImpl,
228 kind: ReadErrorKind::EndOfStream,
229 })?;
230 if let tungstenite021::Message::Binary(data) = packet {
231 buf.extend_from_slice(&data);
232 } else {
233 return Err(ReadError {
234 context: ErrorContext::DefaultImpl,
235 kind: ReadErrorKind::MessageKind021(packet),
236 })
237 }
238 }
239 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
240 context: ErrorContext::WebSocket {
241 source: Box::new(context),
242 },
243 kind,
244 })
245 }
246 _ => Err(ReadError {
247 context: ErrorContext::DefaultImpl,
248 kind: ReadErrorKind::WebSocketTextMessage024(data),
249 }),
250 },
251 tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
252 context: ErrorContext::WebSocket {
253 source: Box::new(context),
254 },
255 kind,
256 }),
257 _ => Err(ReadError {
258 context: ErrorContext::DefaultImpl,
259 kind: ReadErrorKind::MessageKind021(packet),
260 }),
261 }
262 })
263 }
264
265 #[cfg(feature = "tokio-tungstenite024")]
266 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
267 fn read_ws024<'a, R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
273 Box::pin(async move {
274 let packet = stream.try_next().await.map_err(|e| ReadError {
275 context: ErrorContext::DefaultImpl,
276 kind: e.into(),
277 })?.ok_or_else(|| ReadError {
278 context: ErrorContext::DefaultImpl,
279 kind: ReadErrorKind::EndOfStream,
280 })?;
281 match packet {
282 tungstenite024::Message::Text(data) => match data.chars().next() {
283 Some('m') => {
284 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
285 context: ErrorContext::DefaultImpl,
286 kind: e.into(),
287 })?;
288 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
289 context: ErrorContext::DefaultImpl,
290 kind: e.into(),
291 })?;
292 while buf.len() < len {
293 let packet = stream.try_next().await.map_err(|e| ReadError {
294 context: ErrorContext::DefaultImpl,
295 kind: e.into(),
296 })?.ok_or_else(|| ReadError {
297 context: ErrorContext::DefaultImpl,
298 kind: ReadErrorKind::EndOfStream,
299 })?;
300 if let tungstenite024::Message::Binary(data) = packet {
301 buf.extend_from_slice(&data);
302 } else {
303 return Err(ReadError {
304 context: ErrorContext::DefaultImpl,
305 kind: ReadErrorKind::MessageKind024(packet),
306 })
307 }
308 }
309 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
310 context: ErrorContext::WebSocket {
311 source: Box::new(context),
312 },
313 kind,
314 })
315 }
316 _ => Err(ReadError {
317 context: ErrorContext::DefaultImpl,
318 kind: ReadErrorKind::WebSocketTextMessage024(data),
319 }),
320 },
321 tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
322 context: ErrorContext::WebSocket {
323 source: Box::new(context),
324 },
325 kind,
326 }),
327 _ => Err(ReadError {
328 context: ErrorContext::DefaultImpl,
329 kind: ReadErrorKind::MessageKind024(packet),
330 }),
331 }
332 })
333 }
334
335 #[cfg(feature = "tokio-tungstenite028")]
336 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
337 fn read_ws028<'a, R: Stream<Item = Result<tungstenite028::Message, tungstenite028::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
343 Box::pin(async move {
344 let packet = stream.try_next().await.map_err(|e| ReadError {
345 context: ErrorContext::DefaultImpl,
346 kind: e.into(),
347 })?.ok_or_else(|| ReadError {
348 context: ErrorContext::DefaultImpl,
349 kind: ReadErrorKind::EndOfStream,
350 })?;
351 match packet {
352 tungstenite028::Message::Text(data) => match data.chars().next() {
353 Some('m') => {
354 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
355 context: ErrorContext::DefaultImpl,
356 kind: e.into(),
357 })?;
358 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
359 context: ErrorContext::DefaultImpl,
360 kind: e.into(),
361 })?;
362 while buf.len() < len {
363 let packet = stream.try_next().await.map_err(|e| ReadError {
364 context: ErrorContext::DefaultImpl,
365 kind: e.into(),
366 })?.ok_or_else(|| ReadError {
367 context: ErrorContext::DefaultImpl,
368 kind: ReadErrorKind::EndOfStream,
369 })?;
370 if let tungstenite028::Message::Binary(data) = packet {
371 buf.extend_from_slice(&data);
372 } else {
373 return Err(ReadError {
374 context: ErrorContext::DefaultImpl,
375 kind: ReadErrorKind::MessageKind028(packet),
376 })
377 }
378 }
379 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
380 context: ErrorContext::WebSocket {
381 source: Box::new(context),
382 },
383 kind,
384 })
385 }
386 _ => Err(ReadError {
387 context: ErrorContext::DefaultImpl,
388 kind: ReadErrorKind::WebSocketTextMessage028(data),
389 }),
390 },
391 tungstenite028::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
392 context: ErrorContext::WebSocket {
393 source: Box::new(context),
394 },
395 kind,
396 }),
397 _ => Err(ReadError {
398 context: ErrorContext::DefaultImpl,
399 kind: ReadErrorKind::MessageKind028(packet),
400 }),
401 }
402 })
403 }
404
405 #[cfg(feature = "tokio-tungstenite021")]
406 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
407 fn write_ws021<'a, W: Sink<tungstenite021::Message, Error = tungstenite021::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
413 where Self: Sync {
414 Box::pin(async move {
415 let mut buf = Vec::default();
416 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
417 context: ErrorContext::WebSocket {
418 source: Box::new(context),
419 },
420 kind,
421 })?;
422 if buf.len() <= WS_MAX_MESSAGE_SIZE {
423 sink.send(tungstenite021::Message::binary(buf)).await.map_err(|e| WriteError {
424 context: ErrorContext::DefaultImpl,
425 kind: e.into(),
426 })?;
427 } else {
428 sink.send(tungstenite021::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
429 context: ErrorContext::DefaultImpl,
430 kind: e.into(),
431 })?;
432 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
433 sink.send(tungstenite021::Message::binary(chunk)).await.map_err(|e| WriteError {
434 context: ErrorContext::DefaultImpl,
435 kind: e.into(),
436 })?;
437 }
438 }
439 Ok(())
440 })
441 }
442
443 #[cfg(feature = "tokio-tungstenite024")]
444 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
445 fn write_ws024<'a, W: Sink<tungstenite024::Message, Error = tungstenite024::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
451 where Self: Sync {
452 Box::pin(async move {
453 let mut buf = Vec::default();
454 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
455 context: ErrorContext::WebSocket {
456 source: Box::new(context),
457 },
458 kind,
459 })?;
460 if buf.len() <= WS_MAX_MESSAGE_SIZE {
461 sink.send(tungstenite024::Message::binary(buf)).await.map_err(|e| WriteError {
462 context: ErrorContext::DefaultImpl,
463 kind: e.into(),
464 })?;
465 } else {
466 sink.send(tungstenite024::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
467 context: ErrorContext::DefaultImpl,
468 kind: e.into(),
469 })?;
470 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
471 sink.send(tungstenite024::Message::binary(chunk)).await.map_err(|e| WriteError {
472 context: ErrorContext::DefaultImpl,
473 kind: e.into(),
474 })?;
475 }
476 }
477 Ok(())
478 })
479 }
480
481 #[cfg(feature = "tokio-tungstenite028")]
482 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
483 fn write_ws028<'a, W: Sink<tungstenite028::Message, Error = tungstenite028::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
489 where Self: Sync {
490 Box::pin(async move {
491 let mut buf = Vec::default();
492 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
493 context: ErrorContext::WebSocket {
494 source: Box::new(context),
495 },
496 kind,
497 })?;
498 if buf.len() <= WS_MAX_MESSAGE_SIZE {
499 sink.send(tungstenite028::Message::binary(buf)).await.map_err(|e| WriteError {
500 context: ErrorContext::DefaultImpl,
501 kind: e.into(),
502 })?;
503 } else {
504 sink.send(tungstenite028::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
505 context: ErrorContext::DefaultImpl,
506 kind: e.into(),
507 })?;
508 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
509 sink.send(tungstenite028::Message::binary(tungstenite028::Bytes::copy_from_slice(chunk))).await.map_err(|e| WriteError {
510 context: ErrorContext::DefaultImpl,
511 kind: e.into(),
512 })?;
513 }
514 }
515 Ok(())
516 })
517 }
518
519 #[cfg(feature = "tokio-tungstenite021")]
520 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
521 fn read_ws_sync021(websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
523 let packet = websocket.read().map_err(|e| ReadError {
524 context: ErrorContext::DefaultImpl,
525 kind: e.into(),
526 })?;
527 match packet {
528 tungstenite021::Message::Text(data) => match data.chars().next() {
529 Some('m') => {
530 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
531 context: ErrorContext::DefaultImpl,
532 kind: e.into(),
533 })?;
534 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
535 context: ErrorContext::DefaultImpl,
536 kind: e.into(),
537 })?;
538 while buf.len() < len {
539 let packet = websocket.read().map_err(|e| ReadError {
540 context: ErrorContext::DefaultImpl,
541 kind: e.into(),
542 })?;
543 if let tungstenite021::Message::Binary(data) = packet {
544 buf.extend_from_slice(&data);
545 } else {
546 return Err(ReadError {
547 context: ErrorContext::DefaultImpl,
548 kind: ReadErrorKind::MessageKind021(packet),
549 })
550 }
551 }
552 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
553 context: ErrorContext::WebSocket {
554 source: Box::new(context),
555 },
556 kind,
557 })
558 }
559 _ => return Err(ReadError {
560 context: ErrorContext::DefaultImpl,
561 kind: ReadErrorKind::WebSocketTextMessage024(data),
562 }),
563 },
564 tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
565 context: ErrorContext::WebSocket {
566 source: Box::new(context),
567 },
568 kind,
569 }),
570 _ => Err(ReadError {
571 context: ErrorContext::DefaultImpl,
572 kind: ReadErrorKind::MessageKind021(packet),
573 }),
574 }
575 }
576
577 #[cfg(feature = "tokio-tungstenite024")]
578 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
579 fn read_ws_sync024(websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
581 let packet = websocket.read().map_err(|e| ReadError {
582 context: ErrorContext::DefaultImpl,
583 kind: e.into(),
584 })?;
585 match packet {
586 tungstenite024::Message::Text(data) => match data.chars().next() {
587 Some('m') => {
588 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
589 context: ErrorContext::DefaultImpl,
590 kind: e.into(),
591 })?;
592 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
593 context: ErrorContext::DefaultImpl,
594 kind: e.into(),
595 })?;
596 while buf.len() < len {
597 let packet = websocket.read().map_err(|e| ReadError {
598 context: ErrorContext::DefaultImpl,
599 kind: e.into(),
600 })?;
601 if let tungstenite024::Message::Binary(data) = packet {
602 buf.extend_from_slice(&data);
603 } else {
604 return Err(ReadError {
605 context: ErrorContext::DefaultImpl,
606 kind: ReadErrorKind::MessageKind024(packet),
607 })
608 }
609 }
610 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
611 context: ErrorContext::WebSocket {
612 source: Box::new(context),
613 },
614 kind,
615 })
616 }
617 _ => return Err(ReadError {
618 context: ErrorContext::DefaultImpl,
619 kind: ReadErrorKind::WebSocketTextMessage024(data),
620 }),
621 },
622 tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
623 context: ErrorContext::WebSocket {
624 source: Box::new(context),
625 },
626 kind,
627 }),
628 _ => Err(ReadError {
629 context: ErrorContext::DefaultImpl,
630 kind: ReadErrorKind::MessageKind024(packet),
631 }),
632 }
633 }
634
635 #[cfg(feature = "tokio-tungstenite028")]
636 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
637 fn read_ws_sync028(websocket: &mut tungstenite028::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
639 let packet = websocket.read().map_err(|e| ReadError {
640 context: ErrorContext::DefaultImpl,
641 kind: e.into(),
642 })?;
643 match packet {
644 tungstenite028::Message::Text(data) => match data.chars().next() {
645 Some('m') => {
646 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
647 context: ErrorContext::DefaultImpl,
648 kind: e.into(),
649 })?;
650 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
651 context: ErrorContext::DefaultImpl,
652 kind: e.into(),
653 })?;
654 while buf.len() < len {
655 let packet = websocket.read().map_err(|e| ReadError {
656 context: ErrorContext::DefaultImpl,
657 kind: e.into(),
658 })?;
659 if let tungstenite028::Message::Binary(data) = packet {
660 buf.extend_from_slice(&data);
661 } else {
662 return Err(ReadError {
663 context: ErrorContext::DefaultImpl,
664 kind: ReadErrorKind::MessageKind028(packet),
665 })
666 }
667 }
668 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
669 context: ErrorContext::WebSocket {
670 source: Box::new(context),
671 },
672 kind,
673 })
674 }
675 _ => return Err(ReadError {
676 context: ErrorContext::DefaultImpl,
677 kind: ReadErrorKind::WebSocketTextMessage028(data),
678 }),
679 },
680 tungstenite028::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
681 context: ErrorContext::WebSocket {
682 source: Box::new(context),
683 },
684 kind,
685 }),
686 _ => Err(ReadError {
687 context: ErrorContext::DefaultImpl,
688 kind: ReadErrorKind::MessageKind028(packet),
689 }),
690 }
691 }
692
693 #[cfg(feature = "tokio-tungstenite021")]
694 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
695 fn write_ws_sync021(&self, websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
697 let mut buf = Vec::default();
698 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
699 context: ErrorContext::WebSocket {
700 source: Box::new(context),
701 },
702 kind,
703 })?;
704 if buf.len() <= WS_MAX_MESSAGE_SIZE {
705 websocket.send(tungstenite021::Message::binary(buf)).map_err(|e| WriteError {
706 context: ErrorContext::DefaultImpl,
707 kind: e.into(),
708 })?;
709 } else {
710 websocket.send(tungstenite021::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
711 context: ErrorContext::DefaultImpl,
712 kind: e.into(),
713 })?;
714 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
715 websocket.send(tungstenite021::Message::binary(chunk)).map_err(|e| WriteError {
716 context: ErrorContext::DefaultImpl,
717 kind: e.into(),
718 })?;
719 }
720 }
721 websocket.flush().map_err(|e| WriteError {
722 context: ErrorContext::DefaultImpl,
723 kind: e.into(),
724 })?;
725 Ok(())
726 }
727
728 #[cfg(feature = "tokio-tungstenite024")]
729 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
730 fn write_ws_sync024(&self, websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
732 let mut buf = Vec::default();
733 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
734 context: ErrorContext::WebSocket {
735 source: Box::new(context),
736 },
737 kind,
738 })?;
739 if buf.len() <= WS_MAX_MESSAGE_SIZE {
740 websocket.send(tungstenite024::Message::binary(buf)).map_err(|e| WriteError {
741 context: ErrorContext::DefaultImpl,
742 kind: e.into(),
743 })?;
744 } else {
745 websocket.send(tungstenite024::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
746 context: ErrorContext::DefaultImpl,
747 kind: e.into(),
748 })?;
749 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
750 websocket.send(tungstenite024::Message::binary(chunk)).map_err(|e| WriteError {
751 context: ErrorContext::DefaultImpl,
752 kind: e.into(),
753 })?;
754 }
755 }
756 websocket.flush().map_err(|e| WriteError {
757 context: ErrorContext::DefaultImpl,
758 kind: e.into(),
759 })?;
760 Ok(())
761 }
762
763 #[cfg(feature = "tokio-tungstenite028")]
764 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
765 fn write_ws_sync028(&self, websocket: &mut tungstenite028::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
767 let mut buf = Vec::default();
768 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
769 context: ErrorContext::WebSocket {
770 source: Box::new(context),
771 },
772 kind,
773 })?;
774 if buf.len() <= WS_MAX_MESSAGE_SIZE {
775 websocket.send(tungstenite028::Message::binary(buf)).map_err(|e| WriteError {
776 context: ErrorContext::DefaultImpl,
777 kind: e.into(),
778 })?;
779 } else {
780 websocket.send(tungstenite028::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
781 context: ErrorContext::DefaultImpl,
782 kind: e.into(),
783 })?;
784 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
785 websocket.send(tungstenite028::Message::binary(tungstenite028::Bytes::copy_from_slice(chunk))).map_err(|e| WriteError {
786 context: ErrorContext::DefaultImpl,
787 kind: e.into(),
788 })?;
789 }
790 }
791 websocket.flush().map_err(|e| WriteError {
792 context: ErrorContext::DefaultImpl,
793 kind: e.into(),
794 })?;
795 Ok(())
796 }
797
798 #[cfg(feature = "tokio-tungstenite021")]
799 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
800 fn read_ws_owned021<R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
804 Box::pin(async move {
805 let value = Self::read_ws021(&mut stream).await?;
806 Ok((stream, value))
807 })
808 }
809
810 #[cfg(feature = "tokio-tungstenite024")]
811 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
812 fn read_ws_owned024<R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
816 Box::pin(async move {
817 let value = Self::read_ws024(&mut stream).await?;
818 Ok((stream, value))
819 })
820 }
821
822 #[cfg(feature = "tokio-tungstenite028")]
823 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
824 fn read_ws_owned028<R: Stream<Item = Result<tungstenite028::Message, tungstenite028::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
828 Box::pin(async move {
829 let value = Self::read_ws028(&mut stream).await?;
830 Ok((stream, value))
831 })
832 }
833}
834
/// Extension of `Protocol` with length-prefixed read and write methods that take an explicit `max_len`.
///
/// NOTE(review): this trait only declares the methods. How `max_len` is interpreted (presumably an upper bound on the length being encoded or decoded) is decided by the implementations, which are not visible here — confirm against them.
pub trait LengthPrefixed: Protocol {
    /// Reads a length-prefixed value of this type from an async `stream`, subject to `max_len`.
    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
    /// Writes this value in length-prefixed form to an async `sink`, subject to `max_len`.
    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
    /// Synchronous counterpart of `read_length_prefixed`.
    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError>;
    /// Synchronous counterpart of `write_length_prefixed`.
    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError>;
}
848
/// Connects to a WebSocket server and returns a typed sink/stream pair (`tokio-tungstenite021`).
///
/// `request` is passed to `tokio_tungstenite021::connect_async`; a connection failure is returned as the outer `tungstenite021` error. On success the socket is split into two halves:
///
/// * The sink accepts values of type `W`. Each value is encoded with `write_sync` and sent either as one binary message or, if the encoding is longer than `WS_MAX_MESSAGE_SIZE`, as a text header `m<len>` followed by binary chunks.
/// * The stream yields values of type `R`, decoded from single binary messages or reassembled from an `m<len>` header and the binary messages that follow it.
///
/// Encoding and decoding errors keep their kind and have their context wrapped in `ErrorContext::WebSocket`; transport errors are reported with `ErrorContext::WebSocketSink` and `ErrorContext::WebSocketStream` respectively.
#[cfg(feature = "tokio-tungstenite021")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
pub async fn websocket021<R: Protocol, W: Protocol>(request: impl tungstenite021::client::IntoClientRequest + Unpin) -> tungstenite021::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
    // Turns one incoming message into a stream of zero or one decoded values.
    // `pending` is `Some((expected_len, bytes_so_far))` while a chunked value is being reassembled.
    fn decode_frame<R: Protocol>(pending: &mut Option<(usize, Vec<u8>)>, frame: tungstenite021::Result<tungstenite021::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
        // Error originating in this decoder itself.
        fn direct<E: Into<ReadErrorKind>>(cause: E) -> ReadError {
            ReadError { context: ErrorContext::DefaultImpl, kind: cause.into() }
        }

        // Error from `read_sync`, re-labelled as having occurred inside a WebSocket payload.
        fn nested(ReadError { context, kind }: ReadError) -> ReadError {
            ReadError { context: ErrorContext::WebSocket { source: Box::new(context) }, kind }
        }

        let message = frame.map_err(|e| ReadError {
            context: ErrorContext::WebSocketStream,
            kind: e.into(),
        })?;
        Ok(match pending {
            Some((expected, assembled)) => {
                // In the middle of a chunked value: only binary continuation messages are valid.
                match message {
                    tungstenite021::Message::Binary(chunk) => assembled.extend_from_slice(&chunk),
                    other => return Err(direct(ReadErrorKind::MessageKind021(other))),
                }
                if assembled.len() < *expected {
                    Either::Left(stream::empty())
                } else {
                    let complete = mem::take(assembled);
                    *pending = None;
                    Either::Right(stream::once(future::ok(R::read_sync(&mut &*complete).map_err(nested)?)))
                }
            }
            None => match message {
                tungstenite021::Message::Binary(payload) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*payload).map_err(nested)?))),
                tungstenite021::Message::Text(header) => {
                    // Only `m<len>` headers are understood; any other text is rejected.
                    if !header.starts_with('m') {
                        return Err(direct(ReadErrorKind::WebSocketTextMessage024(header)))
                    }
                    let expected = header[1..].parse::<usize>().map_err(direct)?;
                    let storage = FallibleVec::try_with_capacity(expected).map_err(direct)?;
                    *pending = Some((expected, storage));
                    Either::Left(stream::empty())
                }
                other => return Err(direct(ReadErrorKind::MessageKind021(other))),
            },
        })
    }

    let (socket, _) = tokio_tungstenite021::connect_async(request).await?;
    let (raw_sink, raw_stream) = socket.split();

    let typed_sink = raw_sink
        .sink_map_err(|e| WriteError {
            context: ErrorContext::WebSocketSink,
            kind: e.into(),
        })
        .with_flat_map::<W, _, _>(|value| {
            let mut encoded = Vec::default();
            match value.write_sync(&mut encoded) {
                Ok(()) => {
                    let frames = if encoded.len() > WS_MAX_MESSAGE_SIZE {
                        // Too large for one message: length header first, then the pieces.
                        let header = tungstenite021::Message::text(format!("m{}", encoded.len()));
                        let parts = encoded.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite021::Message::binary);
                        Either::Right(stream::iter(iter::once(header).chain(parts).collect::<Vec<_>>()))
                    } else {
                        Either::Left(stream::once(future::ready(tungstenite021::Message::binary(encoded))))
                    };
                    Either::Left(frames.map(Ok))
                }
                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
                    context: ErrorContext::WebSocket { source: Box::new(context) },
                    kind,
                }))),
            }
        });

    let typed_stream = raw_stream
        .scan(None, |pending, frame| future::ready(Some(decode_frame(pending, frame))))
        .try_flatten();

    Ok((typed_sink, typed_stream))
}
946
/// Connects to a WebSocket server and returns a typed sink/stream pair (`tokio-tungstenite024`).
///
/// `request` is passed to `tokio_tungstenite024::connect_async`; a connection failure is returned as the outer `tungstenite024` error. On success the socket is split into two halves:
///
/// * The sink accepts values of type `W`. Each value is encoded with `write_sync` and sent either as one binary message or, if the encoding is longer than `WS_MAX_MESSAGE_SIZE`, as a text header `m<len>` followed by binary chunks.
/// * The stream yields values of type `R`, decoded from single binary messages or reassembled from an `m<len>` header and the binary messages that follow it.
///
/// Encoding and decoding errors keep their kind and have their context wrapped in `ErrorContext::WebSocket`; transport errors are reported with `ErrorContext::WebSocketSink` and `ErrorContext::WebSocketStream` respectively.
#[cfg(feature = "tokio-tungstenite024")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
pub async fn websocket024<R: Protocol, W: Protocol>(request: impl tungstenite024::client::IntoClientRequest + Unpin) -> tungstenite024::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
    // Turns one incoming message into a stream of zero or one decoded values.
    // `pending` is `Some((expected_len, bytes_so_far))` while a chunked value is being reassembled.
    fn decode_frame<R: Protocol>(pending: &mut Option<(usize, Vec<u8>)>, frame: tungstenite024::Result<tungstenite024::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
        // Error originating in this decoder itself.
        fn direct<E: Into<ReadErrorKind>>(cause: E) -> ReadError {
            ReadError { context: ErrorContext::DefaultImpl, kind: cause.into() }
        }

        // Error from `read_sync`, re-labelled as having occurred inside a WebSocket payload.
        fn nested(ReadError { context, kind }: ReadError) -> ReadError {
            ReadError { context: ErrorContext::WebSocket { source: Box::new(context) }, kind }
        }

        let message = frame.map_err(|e| ReadError {
            context: ErrorContext::WebSocketStream,
            kind: e.into(),
        })?;
        Ok(match pending {
            Some((expected, assembled)) => {
                // In the middle of a chunked value: only binary continuation messages are valid.
                match message {
                    tungstenite024::Message::Binary(chunk) => assembled.extend_from_slice(&chunk),
                    other => return Err(direct(ReadErrorKind::MessageKind024(other))),
                }
                if assembled.len() < *expected {
                    Either::Left(stream::empty())
                } else {
                    let complete = mem::take(assembled);
                    *pending = None;
                    Either::Right(stream::once(future::ok(R::read_sync(&mut &*complete).map_err(nested)?)))
                }
            }
            None => match message {
                tungstenite024::Message::Binary(payload) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*payload).map_err(nested)?))),
                tungstenite024::Message::Text(header) => {
                    // Only `m<len>` headers are understood; any other text is rejected.
                    if !header.starts_with('m') {
                        return Err(direct(ReadErrorKind::WebSocketTextMessage024(header)))
                    }
                    let expected = header[1..].parse::<usize>().map_err(direct)?;
                    let storage = FallibleVec::try_with_capacity(expected).map_err(direct)?;
                    *pending = Some((expected, storage));
                    Either::Left(stream::empty())
                }
                other => return Err(direct(ReadErrorKind::MessageKind024(other))),
            },
        })
    }

    let (socket, _) = tokio_tungstenite024::connect_async(request).await?;
    let (raw_sink, raw_stream) = socket.split();

    let typed_sink = raw_sink
        .sink_map_err(|e| WriteError {
            context: ErrorContext::WebSocketSink,
            kind: e.into(),
        })
        .with_flat_map::<W, _, _>(|value| {
            let mut encoded = Vec::default();
            match value.write_sync(&mut encoded) {
                Ok(()) => {
                    let frames = if encoded.len() > WS_MAX_MESSAGE_SIZE {
                        // Too large for one message: length header first, then the pieces.
                        let header = tungstenite024::Message::text(format!("m{}", encoded.len()));
                        let parts = encoded.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite024::Message::binary);
                        Either::Right(stream::iter(iter::once(header).chain(parts).collect::<Vec<_>>()))
                    } else {
                        Either::Left(stream::once(future::ready(tungstenite024::Message::binary(encoded))))
                    };
                    Either::Left(frames.map(Ok))
                }
                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
                    context: ErrorContext::WebSocket { source: Box::new(context) },
                    kind,
                }))),
            }
        });

    let typed_stream = raw_stream
        .scan(None, |pending, frame| future::ready(Some(decode_frame(pending, frame))))
        .try_flatten();

    Ok((typed_sink, typed_stream))
}
1044
/// Connects to a WebSocket server and returns a typed sink/stream pair (`tokio-tungstenite028`).
///
/// `request` is passed to `tokio_tungstenite028::connect_async`; a connection failure is returned as the outer `tungstenite028` error. On success the socket is split into two halves:
///
/// * The sink accepts values of type `W`. Each value is encoded with `write_sync` and sent either as one binary message or, if the encoding is longer than `WS_MAX_MESSAGE_SIZE`, as a text header `m<len>` followed by binary chunks, each copied into its own `Bytes` buffer.
/// * The stream yields values of type `R`, decoded from single binary messages or reassembled from an `m<len>` header and the binary messages that follow it.
///
/// Encoding and decoding errors keep their kind and have their context wrapped in `ErrorContext::WebSocket`; transport errors are reported with `ErrorContext::WebSocketSink` and `ErrorContext::WebSocketStream` respectively.
#[cfg(feature = "tokio-tungstenite028")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
pub async fn websocket028<R: Protocol, W: Protocol>(request: impl tungstenite028::client::IntoClientRequest + Unpin) -> tungstenite028::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
    // Turns one incoming message into a stream of zero or one decoded values.
    // `pending` is `Some((expected_len, bytes_so_far))` while a chunked value is being reassembled.
    fn decode_frame<R: Protocol>(pending: &mut Option<(usize, Vec<u8>)>, frame: tungstenite028::Result<tungstenite028::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
        // Error originating in this decoder itself.
        fn direct<E: Into<ReadErrorKind>>(cause: E) -> ReadError {
            ReadError { context: ErrorContext::DefaultImpl, kind: cause.into() }
        }

        // Error from `read_sync`, re-labelled as having occurred inside a WebSocket payload.
        fn nested(ReadError { context, kind }: ReadError) -> ReadError {
            ReadError { context: ErrorContext::WebSocket { source: Box::new(context) }, kind }
        }

        let message = frame.map_err(|e| ReadError {
            context: ErrorContext::WebSocketStream,
            kind: e.into(),
        })?;
        Ok(match pending {
            Some((expected, assembled)) => {
                // In the middle of a chunked value: only binary continuation messages are valid.
                match message {
                    tungstenite028::Message::Binary(chunk) => assembled.extend_from_slice(&chunk),
                    other => return Err(direct(ReadErrorKind::MessageKind028(other))),
                }
                if assembled.len() < *expected {
                    Either::Left(stream::empty())
                } else {
                    let complete = mem::take(assembled);
                    *pending = None;
                    Either::Right(stream::once(future::ok(R::read_sync(&mut &*complete).map_err(nested)?)))
                }
            }
            None => match message {
                tungstenite028::Message::Binary(payload) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*payload).map_err(nested)?))),
                tungstenite028::Message::Text(header) => {
                    // Only `m<len>` headers are understood; any other text is rejected.
                    if !header.starts_with('m') {
                        return Err(direct(ReadErrorKind::WebSocketTextMessage028(header)))
                    }
                    let expected = header[1..].parse::<usize>().map_err(direct)?;
                    let storage = FallibleVec::try_with_capacity(expected).map_err(direct)?;
                    *pending = Some((expected, storage));
                    Either::Left(stream::empty())
                }
                other => return Err(direct(ReadErrorKind::MessageKind028(other))),
            },
        })
    }

    let (socket, _) = tokio_tungstenite028::connect_async(request).await?;
    let (raw_sink, raw_stream) = socket.split();

    let typed_sink = raw_sink
        .sink_map_err(|e| WriteError {
            context: ErrorContext::WebSocketSink,
            kind: e.into(),
        })
        .with_flat_map::<W, _, _>(|value| {
            let mut encoded = Vec::default();
            match value.write_sync(&mut encoded) {
                Ok(()) => {
                    let frames = if encoded.len() > WS_MAX_MESSAGE_SIZE {
                        // Too large for one message: length header first, then the pieces.
                        let header = tungstenite028::Message::text(format!("m{}", encoded.len()));
                        let parts = encoded.chunks(WS_MAX_MESSAGE_SIZE).map(|part| tungstenite028::Message::binary(tungstenite028::Bytes::copy_from_slice(part)));
                        Either::Right(stream::iter(iter::once(header).chain(parts).collect::<Vec<_>>()))
                    } else {
                        Either::Left(stream::once(future::ready(tungstenite028::Message::binary(encoded))))
                    };
                    Either::Left(frames.map(Ok))
                }
                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
                    context: ErrorContext::WebSocket { source: Box::new(context) },
                    kind,
                }))),
            }
        });

    let typed_stream = raw_stream
        .scan(None, |pending, frame| future::ready(Some(decode_frame(pending, frame))))
        .try_flatten();

    Ok((typed_sink, typed_stream))
}