1#![deny(missing_docs, rust_2018_idioms, unused, unused_crate_dependencies, unused_import_braces, unused_lifetimes, unused_qualifications, warnings)]
2#![forbid(unsafe_code)]
3
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6use {
43 std::{
44 future::Future,
45 io::{
46 self,
47 prelude::*,
48 },
49 pin::Pin,
50 },
51 tokio::io::{
52 AsyncRead,
53 AsyncWrite,
54 },
55};
56#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite028"))] use {
57 std::{
58 iter,
59 mem,
60 },
61 fallible_collections::FallibleVec,
62 futures::{
63 Sink,
64 SinkExt as _,
65 future::{
66 self,
67 Either,
68 },
69 stream::{
70 self,
71 Stream,
72 StreamExt as _,
73 TryStreamExt as _,
74 },
75 },
76};
77#[cfg(feature = "tokio-tungstenite021")] use tokio_tungstenite021::tungstenite as tungstenite021;
78#[cfg(feature = "tokio-tungstenite024")] use tokio_tungstenite024::tungstenite as tungstenite024;
79#[cfg(feature = "tokio-tungstenite028")] use tokio_tungstenite028::tungstenite as tungstenite028;
80pub use {
81 async_proto_derive::{
82 Protocol,
83 bitflags,
84 },
85 crate::error::*,
86};
87#[doc(hidden)] pub use tokio; mod error;
90mod impls;
91
/// Largest payload, in bytes, that the WebSocket helpers in this file send as a single binary message (16 MiB).
///
/// Longer payloads are announced with a text message and then sent in chunks of at most this size.
#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite028"))]
const WS_MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024;
94
95pub trait Protocol: Sized {
97 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
103 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
    /// Reads a value of this type from a synchronous [`Read`] stream.
    ///
    /// Returns the decoded value, or a [`ReadError`] on failure.
    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError>;
    /// Writes this value to a synchronous [`Write`] sink.
    ///
    /// Returns `Ok(())`, or a [`WriteError`] on failure.
    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError>;
113
114 fn read_owned<R: AsyncRead + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
118 Box::pin(async move {
119 let value = Self::read(&mut stream).await?;
120 Ok((stream, value))
121 })
122 }
123
124 fn try_read(stream: &mut impl Read, buf: &mut Vec<u8>) -> Result<Option<Self>, ReadError> {
167 let mut temp_buf = vec![0; 8];
168 loop {
169 let mut slice = &mut &**buf;
170 match Self::read_sync(&mut slice) {
171 Ok(value) => {
172 let value_len = slice.len();
173 buf.drain(..buf.len() - value_len);
174 return Ok(Some(value))
175 }
176 Err(ReadError { kind: ReadErrorKind::Io(e), .. }) if e.kind() == io::ErrorKind::UnexpectedEof => {}
177 Err(e) => return Err(e),
178 }
179 match stream.read(&mut temp_buf) {
180 Ok(0) => return Err(ReadError {
181 context: ErrorContext::DefaultImpl,
182 kind: ReadErrorKind::EndOfStream,
183 }),
184 Ok(n) => buf.extend_from_slice(&temp_buf[..n]),
185 Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
186 Err(e) => return Err(ReadError {
187 context: ErrorContext::DefaultImpl,
188 kind: e.into(),
189 }),
190 }
191 }
192 }
193
194 #[cfg(feature = "tokio-tungstenite021")]
195 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
196 fn read_ws021<'a, R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
202 Box::pin(async move {
203 let packet = stream.try_next().await.map_err(|e| ReadError {
204 context: ErrorContext::DefaultImpl,
205 kind: e.into(),
206 })?.ok_or_else(|| ReadError {
207 context: ErrorContext::DefaultImpl,
208 kind: ReadErrorKind::EndOfStream,
209 })?;
210 match packet {
211 tungstenite021::Message::Text(data) => match data.chars().next() {
212 Some('m') => {
213 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
214 context: ErrorContext::DefaultImpl,
215 kind: e.into(),
216 })?;
217 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
218 context: ErrorContext::DefaultImpl,
219 kind: e.into(),
220 })?;
221 while buf.len() < len {
222 let packet = stream.try_next().await.map_err(|e| ReadError {
223 context: ErrorContext::DefaultImpl,
224 kind: e.into(),
225 })?.ok_or_else(|| ReadError {
226 context: ErrorContext::DefaultImpl,
227 kind: ReadErrorKind::EndOfStream,
228 })?;
229 if let tungstenite021::Message::Binary(data) = packet {
230 buf.extend_from_slice(&data);
231 } else {
232 return Err(ReadError {
233 context: ErrorContext::DefaultImpl,
234 kind: ReadErrorKind::MessageKind021(packet),
235 })
236 }
237 }
238 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
239 context: ErrorContext::WebSocket {
240 source: Box::new(context),
241 },
242 kind,
243 })
244 }
245 _ => Err(ReadError {
246 context: ErrorContext::DefaultImpl,
247 kind: ReadErrorKind::WebSocketTextMessage024(data),
248 }),
249 },
250 tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
251 context: ErrorContext::WebSocket {
252 source: Box::new(context),
253 },
254 kind,
255 }),
256 _ => Err(ReadError {
257 context: ErrorContext::DefaultImpl,
258 kind: ReadErrorKind::MessageKind021(packet),
259 }),
260 }
261 })
262 }
263
264 #[cfg(feature = "tokio-tungstenite024")]
265 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
266 fn read_ws024<'a, R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
272 Box::pin(async move {
273 let packet = stream.try_next().await.map_err(|e| ReadError {
274 context: ErrorContext::DefaultImpl,
275 kind: e.into(),
276 })?.ok_or_else(|| ReadError {
277 context: ErrorContext::DefaultImpl,
278 kind: ReadErrorKind::EndOfStream,
279 })?;
280 match packet {
281 tungstenite024::Message::Text(data) => match data.chars().next() {
282 Some('m') => {
283 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
284 context: ErrorContext::DefaultImpl,
285 kind: e.into(),
286 })?;
287 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
288 context: ErrorContext::DefaultImpl,
289 kind: e.into(),
290 })?;
291 while buf.len() < len {
292 let packet = stream.try_next().await.map_err(|e| ReadError {
293 context: ErrorContext::DefaultImpl,
294 kind: e.into(),
295 })?.ok_or_else(|| ReadError {
296 context: ErrorContext::DefaultImpl,
297 kind: ReadErrorKind::EndOfStream,
298 })?;
299 if let tungstenite024::Message::Binary(data) = packet {
300 buf.extend_from_slice(&data);
301 } else {
302 return Err(ReadError {
303 context: ErrorContext::DefaultImpl,
304 kind: ReadErrorKind::MessageKind024(packet),
305 })
306 }
307 }
308 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
309 context: ErrorContext::WebSocket {
310 source: Box::new(context),
311 },
312 kind,
313 })
314 }
315 _ => Err(ReadError {
316 context: ErrorContext::DefaultImpl,
317 kind: ReadErrorKind::WebSocketTextMessage024(data),
318 }),
319 },
320 tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
321 context: ErrorContext::WebSocket {
322 source: Box::new(context),
323 },
324 kind,
325 }),
326 _ => Err(ReadError {
327 context: ErrorContext::DefaultImpl,
328 kind: ReadErrorKind::MessageKind024(packet),
329 }),
330 }
331 })
332 }
333
334 #[cfg(feature = "tokio-tungstenite028")]
335 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
336 fn read_ws028<'a, R: Stream<Item = Result<tungstenite028::Message, tungstenite028::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
342 Box::pin(async move {
343 let packet = stream.try_next().await.map_err(|e| ReadError {
344 context: ErrorContext::DefaultImpl,
345 kind: e.into(),
346 })?.ok_or_else(|| ReadError {
347 context: ErrorContext::DefaultImpl,
348 kind: ReadErrorKind::EndOfStream,
349 })?;
350 match packet {
351 tungstenite028::Message::Text(data) => match data.chars().next() {
352 Some('m') => {
353 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
354 context: ErrorContext::DefaultImpl,
355 kind: e.into(),
356 })?;
357 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
358 context: ErrorContext::DefaultImpl,
359 kind: e.into(),
360 })?;
361 while buf.len() < len {
362 let packet = stream.try_next().await.map_err(|e| ReadError {
363 context: ErrorContext::DefaultImpl,
364 kind: e.into(),
365 })?.ok_or_else(|| ReadError {
366 context: ErrorContext::DefaultImpl,
367 kind: ReadErrorKind::EndOfStream,
368 })?;
369 if let tungstenite028::Message::Binary(data) = packet {
370 buf.extend_from_slice(&data);
371 } else {
372 return Err(ReadError {
373 context: ErrorContext::DefaultImpl,
374 kind: ReadErrorKind::MessageKind028(packet),
375 })
376 }
377 }
378 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
379 context: ErrorContext::WebSocket {
380 source: Box::new(context),
381 },
382 kind,
383 })
384 }
385 _ => Err(ReadError {
386 context: ErrorContext::DefaultImpl,
387 kind: ReadErrorKind::WebSocketTextMessage028(data),
388 }),
389 },
390 tungstenite028::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
391 context: ErrorContext::WebSocket {
392 source: Box::new(context),
393 },
394 kind,
395 }),
396 _ => Err(ReadError {
397 context: ErrorContext::DefaultImpl,
398 kind: ReadErrorKind::MessageKind028(packet),
399 }),
400 }
401 })
402 }
403
404 #[cfg(feature = "tokio-tungstenite021")]
405 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
406 fn write_ws021<'a, W: Sink<tungstenite021::Message, Error = tungstenite021::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
412 where Self: Sync {
413 Box::pin(async move {
414 let mut buf = Vec::default();
415 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
416 context: ErrorContext::WebSocket {
417 source: Box::new(context),
418 },
419 kind,
420 })?;
421 if buf.len() <= WS_MAX_MESSAGE_SIZE {
422 sink.send(tungstenite021::Message::binary(buf)).await.map_err(|e| WriteError {
423 context: ErrorContext::DefaultImpl,
424 kind: e.into(),
425 })?;
426 } else {
427 sink.send(tungstenite021::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
428 context: ErrorContext::DefaultImpl,
429 kind: e.into(),
430 })?;
431 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
432 sink.send(tungstenite021::Message::binary(chunk)).await.map_err(|e| WriteError {
433 context: ErrorContext::DefaultImpl,
434 kind: e.into(),
435 })?;
436 }
437 }
438 Ok(())
439 })
440 }
441
442 #[cfg(feature = "tokio-tungstenite024")]
443 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
444 fn write_ws024<'a, W: Sink<tungstenite024::Message, Error = tungstenite024::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
450 where Self: Sync {
451 Box::pin(async move {
452 let mut buf = Vec::default();
453 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
454 context: ErrorContext::WebSocket {
455 source: Box::new(context),
456 },
457 kind,
458 })?;
459 if buf.len() <= WS_MAX_MESSAGE_SIZE {
460 sink.send(tungstenite024::Message::binary(buf)).await.map_err(|e| WriteError {
461 context: ErrorContext::DefaultImpl,
462 kind: e.into(),
463 })?;
464 } else {
465 sink.send(tungstenite024::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
466 context: ErrorContext::DefaultImpl,
467 kind: e.into(),
468 })?;
469 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
470 sink.send(tungstenite024::Message::binary(chunk)).await.map_err(|e| WriteError {
471 context: ErrorContext::DefaultImpl,
472 kind: e.into(),
473 })?;
474 }
475 }
476 Ok(())
477 })
478 }
479
480 #[cfg(feature = "tokio-tungstenite028")]
481 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
482 fn write_ws028<'a, W: Sink<tungstenite028::Message, Error = tungstenite028::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
488 where Self: Sync {
489 Box::pin(async move {
490 let mut buf = Vec::default();
491 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
492 context: ErrorContext::WebSocket {
493 source: Box::new(context),
494 },
495 kind,
496 })?;
497 if buf.len() <= WS_MAX_MESSAGE_SIZE {
498 sink.send(tungstenite028::Message::binary(buf)).await.map_err(|e| WriteError {
499 context: ErrorContext::DefaultImpl,
500 kind: e.into(),
501 })?;
502 } else {
503 sink.send(tungstenite028::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
504 context: ErrorContext::DefaultImpl,
505 kind: e.into(),
506 })?;
507 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
508 sink.send(tungstenite028::Message::binary(tungstenite028::Bytes::copy_from_slice(chunk))).await.map_err(|e| WriteError {
509 context: ErrorContext::DefaultImpl,
510 kind: e.into(),
511 })?;
512 }
513 }
514 Ok(())
515 })
516 }
517
518 #[cfg(feature = "tokio-tungstenite021")]
519 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
520 fn read_ws_sync021(websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
522 let packet = websocket.read().map_err(|e| ReadError {
523 context: ErrorContext::DefaultImpl,
524 kind: e.into(),
525 })?;
526 match packet {
527 tungstenite021::Message::Text(data) => match data.chars().next() {
528 Some('m') => {
529 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
530 context: ErrorContext::DefaultImpl,
531 kind: e.into(),
532 })?;
533 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
534 context: ErrorContext::DefaultImpl,
535 kind: e.into(),
536 })?;
537 while buf.len() < len {
538 let packet = websocket.read().map_err(|e| ReadError {
539 context: ErrorContext::DefaultImpl,
540 kind: e.into(),
541 })?;
542 if let tungstenite021::Message::Binary(data) = packet {
543 buf.extend_from_slice(&data);
544 } else {
545 return Err(ReadError {
546 context: ErrorContext::DefaultImpl,
547 kind: ReadErrorKind::MessageKind021(packet),
548 })
549 }
550 }
551 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
552 context: ErrorContext::WebSocket {
553 source: Box::new(context),
554 },
555 kind,
556 })
557 }
558 _ => return Err(ReadError {
559 context: ErrorContext::DefaultImpl,
560 kind: ReadErrorKind::WebSocketTextMessage024(data),
561 }),
562 },
563 tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
564 context: ErrorContext::WebSocket {
565 source: Box::new(context),
566 },
567 kind,
568 }),
569 _ => Err(ReadError {
570 context: ErrorContext::DefaultImpl,
571 kind: ReadErrorKind::MessageKind021(packet),
572 }),
573 }
574 }
575
576 #[cfg(feature = "tokio-tungstenite024")]
577 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
578 fn read_ws_sync024(websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
580 let packet = websocket.read().map_err(|e| ReadError {
581 context: ErrorContext::DefaultImpl,
582 kind: e.into(),
583 })?;
584 match packet {
585 tungstenite024::Message::Text(data) => match data.chars().next() {
586 Some('m') => {
587 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
588 context: ErrorContext::DefaultImpl,
589 kind: e.into(),
590 })?;
591 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
592 context: ErrorContext::DefaultImpl,
593 kind: e.into(),
594 })?;
595 while buf.len() < len {
596 let packet = websocket.read().map_err(|e| ReadError {
597 context: ErrorContext::DefaultImpl,
598 kind: e.into(),
599 })?;
600 if let tungstenite024::Message::Binary(data) = packet {
601 buf.extend_from_slice(&data);
602 } else {
603 return Err(ReadError {
604 context: ErrorContext::DefaultImpl,
605 kind: ReadErrorKind::MessageKind024(packet),
606 })
607 }
608 }
609 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
610 context: ErrorContext::WebSocket {
611 source: Box::new(context),
612 },
613 kind,
614 })
615 }
616 _ => return Err(ReadError {
617 context: ErrorContext::DefaultImpl,
618 kind: ReadErrorKind::WebSocketTextMessage024(data),
619 }),
620 },
621 tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
622 context: ErrorContext::WebSocket {
623 source: Box::new(context),
624 },
625 kind,
626 }),
627 _ => Err(ReadError {
628 context: ErrorContext::DefaultImpl,
629 kind: ReadErrorKind::MessageKind024(packet),
630 }),
631 }
632 }
633
634 #[cfg(feature = "tokio-tungstenite028")]
635 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
636 fn read_ws_sync028(websocket: &mut tungstenite028::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
638 let packet = websocket.read().map_err(|e| ReadError {
639 context: ErrorContext::DefaultImpl,
640 kind: e.into(),
641 })?;
642 match packet {
643 tungstenite028::Message::Text(data) => match data.chars().next() {
644 Some('m') => {
645 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
646 context: ErrorContext::DefaultImpl,
647 kind: e.into(),
648 })?;
649 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
650 context: ErrorContext::DefaultImpl,
651 kind: e.into(),
652 })?;
653 while buf.len() < len {
654 let packet = websocket.read().map_err(|e| ReadError {
655 context: ErrorContext::DefaultImpl,
656 kind: e.into(),
657 })?;
658 if let tungstenite028::Message::Binary(data) = packet {
659 buf.extend_from_slice(&data);
660 } else {
661 return Err(ReadError {
662 context: ErrorContext::DefaultImpl,
663 kind: ReadErrorKind::MessageKind028(packet),
664 })
665 }
666 }
667 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
668 context: ErrorContext::WebSocket {
669 source: Box::new(context),
670 },
671 kind,
672 })
673 }
674 _ => return Err(ReadError {
675 context: ErrorContext::DefaultImpl,
676 kind: ReadErrorKind::WebSocketTextMessage028(data),
677 }),
678 },
679 tungstenite028::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
680 context: ErrorContext::WebSocket {
681 source: Box::new(context),
682 },
683 kind,
684 }),
685 _ => Err(ReadError {
686 context: ErrorContext::DefaultImpl,
687 kind: ReadErrorKind::MessageKind028(packet),
688 }),
689 }
690 }
691
692 #[cfg(feature = "tokio-tungstenite021")]
693 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
694 fn write_ws_sync021(&self, websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
696 let mut buf = Vec::default();
697 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
698 context: ErrorContext::WebSocket {
699 source: Box::new(context),
700 },
701 kind,
702 })?;
703 if buf.len() <= WS_MAX_MESSAGE_SIZE {
704 websocket.send(tungstenite021::Message::binary(buf)).map_err(|e| WriteError {
705 context: ErrorContext::DefaultImpl,
706 kind: e.into(),
707 })?;
708 } else {
709 websocket.send(tungstenite021::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
710 context: ErrorContext::DefaultImpl,
711 kind: e.into(),
712 })?;
713 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
714 websocket.send(tungstenite021::Message::binary(chunk)).map_err(|e| WriteError {
715 context: ErrorContext::DefaultImpl,
716 kind: e.into(),
717 })?;
718 }
719 }
720 websocket.flush().map_err(|e| WriteError {
721 context: ErrorContext::DefaultImpl,
722 kind: e.into(),
723 })?;
724 Ok(())
725 }
726
727 #[cfg(feature = "tokio-tungstenite024")]
728 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
729 fn write_ws_sync024(&self, websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
731 let mut buf = Vec::default();
732 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
733 context: ErrorContext::WebSocket {
734 source: Box::new(context),
735 },
736 kind,
737 })?;
738 if buf.len() <= WS_MAX_MESSAGE_SIZE {
739 websocket.send(tungstenite024::Message::binary(buf)).map_err(|e| WriteError {
740 context: ErrorContext::DefaultImpl,
741 kind: e.into(),
742 })?;
743 } else {
744 websocket.send(tungstenite024::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
745 context: ErrorContext::DefaultImpl,
746 kind: e.into(),
747 })?;
748 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
749 websocket.send(tungstenite024::Message::binary(chunk)).map_err(|e| WriteError {
750 context: ErrorContext::DefaultImpl,
751 kind: e.into(),
752 })?;
753 }
754 }
755 websocket.flush().map_err(|e| WriteError {
756 context: ErrorContext::DefaultImpl,
757 kind: e.into(),
758 })?;
759 Ok(())
760 }
761
762 #[cfg(feature = "tokio-tungstenite028")]
763 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
764 fn write_ws_sync028(&self, websocket: &mut tungstenite028::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
766 let mut buf = Vec::default();
767 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
768 context: ErrorContext::WebSocket {
769 source: Box::new(context),
770 },
771 kind,
772 })?;
773 if buf.len() <= WS_MAX_MESSAGE_SIZE {
774 websocket.send(tungstenite028::Message::binary(buf)).map_err(|e| WriteError {
775 context: ErrorContext::DefaultImpl,
776 kind: e.into(),
777 })?;
778 } else {
779 websocket.send(tungstenite028::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
780 context: ErrorContext::DefaultImpl,
781 kind: e.into(),
782 })?;
783 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
784 websocket.send(tungstenite028::Message::binary(tungstenite028::Bytes::copy_from_slice(chunk))).map_err(|e| WriteError {
785 context: ErrorContext::DefaultImpl,
786 kind: e.into(),
787 })?;
788 }
789 }
790 websocket.flush().map_err(|e| WriteError {
791 context: ErrorContext::DefaultImpl,
792 kind: e.into(),
793 })?;
794 Ok(())
795 }
796
797 #[cfg(feature = "tokio-tungstenite021")]
798 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
799 fn read_ws_owned021<R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
803 Box::pin(async move {
804 let value = Self::read_ws021(&mut stream).await?;
805 Ok((stream, value))
806 })
807 }
808
809 #[cfg(feature = "tokio-tungstenite024")]
810 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
811 fn read_ws_owned024<R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
815 Box::pin(async move {
816 let value = Self::read_ws024(&mut stream).await?;
817 Ok((stream, value))
818 })
819 }
820
821 #[cfg(feature = "tokio-tungstenite028")]
822 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
823 fn read_ws_owned028<R: Stream<Item = Result<tungstenite028::Message, tungstenite028::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
827 Box::pin(async move {
828 let value = Self::read_ws028(&mut stream).await?;
829 Ok((stream, value))
830 })
831 }
832}
833
834pub trait LengthPrefixed: Protocol {
838 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
840 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
842 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError>;
844 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError>;
846}
847
/// Opens a WebSocket connection for `request` using `tokio-tungstenite` 0.21 and wraps it in a typed sink and stream.
///
/// * Values of type `W` fed into the sink are encoded via `write_sync`. An encoding of at most `WS_MAX_MESSAGE_SIZE` bytes becomes one binary message; a longer one is announced with a text message of the form `m<length>` and sent as binary chunks of at most `WS_MAX_MESSAGE_SIZE` bytes each.
/// * The stream applies the reverse framing to incoming messages and yields decoded values of type `R`. Failures are yielded as `Err` items; the stream itself is not terminated by them.
///
/// # Errors
///
/// The function itself only fails if establishing the connection fails.
#[cfg(feature = "tokio-tungstenite021")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
pub async fn websocket021<R: Protocol, W: Protocol>(request: impl tungstenite021::client::IntoClientRequest + Unpin) -> tungstenite021::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
    // Error raised by the framing logic rather than by the payload decoder.
    fn framing_error<K: Into<ReadErrorKind>>(kind: K) -> ReadError {
        ReadError { context: ErrorContext::DefaultImpl, kind: kind.into() }
    }

    // Marks a payload decoding error as having occurred inside a WebSocket message.
    fn in_message(ReadError { context, kind }: ReadError) -> ReadError {
        ReadError { context: ErrorContext::WebSocket { source: Box::new(context) }, kind }
    }

    // Processes one incoming message. `pending` holds the announced length and the bytes collected so far while a chunked payload is in flight. The returned stream yields the decoded value once a payload is complete, and nothing otherwise.
    fn decode_message<R: Protocol>(pending: &mut Option<(usize, Vec<u8>)>, res: tungstenite021::Result<tungstenite021::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
        let message = res.map_err(|e| ReadError {
            context: ErrorContext::WebSocketStream,
            kind: e.into(),
        })?;
        Ok(if let Some((expected, collected)) = pending {
            // In the middle of a chunked payload: only binary chunks are acceptable.
            match message {
                tungstenite021::Message::Binary(chunk) => collected.extend_from_slice(&chunk),
                other => return Err(framing_error(ReadErrorKind::MessageKind021(other))),
            }
            if collected.len() < *expected {
                Either::Left(stream::empty())
            } else {
                let payload = mem::take(collected);
                *pending = None;
                Either::Right(stream::once(future::ok(R::read_sync(&mut &*payload).map_err(in_message)?)))
            }
        } else {
            match message {
                tungstenite021::Message::Text(header) => {
                    // Only `m<length>` announcements are valid text messages.
                    if !header.starts_with('m') {
                        return Err(framing_error(ReadErrorKind::WebSocketTextMessage024(header)))
                    }
                    let expected = header[1..].parse::<usize>().map_err(framing_error)?;
                    let collected = FallibleVec::try_with_capacity(expected).map_err(framing_error)?;
                    *pending = Some((expected, collected));
                    Either::Left(stream::empty())
                }
                tungstenite021::Message::Binary(payload) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*payload).map_err(in_message)?))),
                other => return Err(framing_error(ReadErrorKind::MessageKind021(other))),
            }
        })
    }

    let (socket, _) = tokio_tungstenite021::connect_async(request).await?;
    let (raw_sink, raw_stream) = socket.split();
    Ok((
        raw_sink.sink_map_err(|e| WriteError {
            context: ErrorContext::WebSocketSink,
            kind: e.into(),
        }).with_flat_map::<W, _, _>(|value| {
            let mut payload = Vec::<u8>::new();
            if let Err(WriteError { context, kind }) = value.write_sync(&mut payload) {
                return Either::Right(stream::once(future::err(WriteError {
                    context: ErrorContext::WebSocket {
                        source: Box::new(context),
                    },
                    kind,
                })))
            }
            let messages = if payload.len() > WS_MAX_MESSAGE_SIZE {
                // Too large for one message: announce the total length, then send it in chunks.
                Either::Right(stream::iter(
                    iter::once(tungstenite021::Message::text(format!("m{}", payload.len())))
                        .chain(payload.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite021::Message::binary))
                        .collect::<Vec<_>>()
                ))
            } else {
                Either::Left(stream::once(future::ready(tungstenite021::Message::binary(payload))))
            };
            Either::Left(messages.map(Ok))
        }),
        raw_stream.scan(None, |pending, res| future::ready(Some(decode_message(pending, res)))).try_flatten(),
    ))
}
945
/// Opens a WebSocket connection for `request` using `tokio-tungstenite` 0.24 and wraps it in a typed sink and stream.
///
/// * Values of type `W` fed into the sink are encoded via `write_sync`. An encoding of at most `WS_MAX_MESSAGE_SIZE` bytes becomes one binary message; a longer one is announced with a text message of the form `m<length>` and sent as binary chunks of at most `WS_MAX_MESSAGE_SIZE` bytes each.
/// * The stream applies the reverse framing to incoming messages and yields decoded values of type `R`. Failures are yielded as `Err` items; the stream itself is not terminated by them.
///
/// # Errors
///
/// The function itself only fails if establishing the connection fails.
#[cfg(feature = "tokio-tungstenite024")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
pub async fn websocket024<R: Protocol, W: Protocol>(request: impl tungstenite024::client::IntoClientRequest + Unpin) -> tungstenite024::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
    // Error raised by the framing logic rather than by the payload decoder.
    fn framing_error<K: Into<ReadErrorKind>>(kind: K) -> ReadError {
        ReadError { context: ErrorContext::DefaultImpl, kind: kind.into() }
    }

    // Marks a payload decoding error as having occurred inside a WebSocket message.
    fn in_message(ReadError { context, kind }: ReadError) -> ReadError {
        ReadError { context: ErrorContext::WebSocket { source: Box::new(context) }, kind }
    }

    // Processes one incoming message. `pending` holds the announced length and the bytes collected so far while a chunked payload is in flight. The returned stream yields the decoded value once a payload is complete, and nothing otherwise.
    fn decode_message<R: Protocol>(pending: &mut Option<(usize, Vec<u8>)>, res: tungstenite024::Result<tungstenite024::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
        let message = res.map_err(|e| ReadError {
            context: ErrorContext::WebSocketStream,
            kind: e.into(),
        })?;
        Ok(if let Some((expected, collected)) = pending {
            // In the middle of a chunked payload: only binary chunks are acceptable.
            match message {
                tungstenite024::Message::Binary(chunk) => collected.extend_from_slice(&chunk),
                other => return Err(framing_error(ReadErrorKind::MessageKind024(other))),
            }
            if collected.len() < *expected {
                Either::Left(stream::empty())
            } else {
                let payload = mem::take(collected);
                *pending = None;
                Either::Right(stream::once(future::ok(R::read_sync(&mut &*payload).map_err(in_message)?)))
            }
        } else {
            match message {
                tungstenite024::Message::Text(header) => {
                    // Only `m<length>` announcements are valid text messages.
                    if !header.starts_with('m') {
                        return Err(framing_error(ReadErrorKind::WebSocketTextMessage024(header)))
                    }
                    let expected = header[1..].parse::<usize>().map_err(framing_error)?;
                    let collected = FallibleVec::try_with_capacity(expected).map_err(framing_error)?;
                    *pending = Some((expected, collected));
                    Either::Left(stream::empty())
                }
                tungstenite024::Message::Binary(payload) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*payload).map_err(in_message)?))),
                other => return Err(framing_error(ReadErrorKind::MessageKind024(other))),
            }
        })
    }

    let (socket, _) = tokio_tungstenite024::connect_async(request).await?;
    let (raw_sink, raw_stream) = socket.split();
    Ok((
        raw_sink.sink_map_err(|e| WriteError {
            context: ErrorContext::WebSocketSink,
            kind: e.into(),
        }).with_flat_map::<W, _, _>(|value| {
            let mut payload = Vec::<u8>::new();
            if let Err(WriteError { context, kind }) = value.write_sync(&mut payload) {
                return Either::Right(stream::once(future::err(WriteError {
                    context: ErrorContext::WebSocket {
                        source: Box::new(context),
                    },
                    kind,
                })))
            }
            let messages = if payload.len() > WS_MAX_MESSAGE_SIZE {
                // Too large for one message: announce the total length, then send it in chunks.
                Either::Right(stream::iter(
                    iter::once(tungstenite024::Message::text(format!("m{}", payload.len())))
                        .chain(payload.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite024::Message::binary))
                        .collect::<Vec<_>>()
                ))
            } else {
                Either::Left(stream::once(future::ready(tungstenite024::Message::binary(payload))))
            };
            Either::Left(messages.map(Ok))
        }),
        raw_stream.scan(None, |pending, res| future::ready(Some(decode_message(pending, res)))).try_flatten(),
    ))
}
1043
/// Opens a WebSocket connection for `request` using `tokio-tungstenite` 0.28 and wraps it in a typed sink and stream.
///
/// * Values of type `W` fed into the sink are encoded via `write_sync`. An encoding of at most `WS_MAX_MESSAGE_SIZE` bytes becomes one binary message; a longer one is announced with a text message of the form `m<length>` and sent as binary chunks of at most `WS_MAX_MESSAGE_SIZE` bytes each.
/// * The stream applies the reverse framing to incoming messages and yields decoded values of type `R`. Failures are yielded as `Err` items; the stream itself is not terminated by them.
///
/// # Errors
///
/// The function itself only fails if establishing the connection fails.
#[cfg(feature = "tokio-tungstenite028")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite028")))]
pub async fn websocket028<R: Protocol, W: Protocol>(request: impl tungstenite028::client::IntoClientRequest + Unpin) -> tungstenite028::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
    // Error raised by the framing logic rather than by the payload decoder.
    fn framing_error<K: Into<ReadErrorKind>>(kind: K) -> ReadError {
        ReadError { context: ErrorContext::DefaultImpl, kind: kind.into() }
    }

    // Marks a payload decoding error as having occurred inside a WebSocket message.
    fn in_message(ReadError { context, kind }: ReadError) -> ReadError {
        ReadError { context: ErrorContext::WebSocket { source: Box::new(context) }, kind }
    }

    // Processes one incoming message. `pending` holds the announced length and the bytes collected so far while a chunked payload is in flight. The returned stream yields the decoded value once a payload is complete, and nothing otherwise.
    fn decode_message<R: Protocol>(pending: &mut Option<(usize, Vec<u8>)>, res: tungstenite028::Result<tungstenite028::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
        let message = res.map_err(|e| ReadError {
            context: ErrorContext::WebSocketStream,
            kind: e.into(),
        })?;
        Ok(if let Some((expected, collected)) = pending {
            // In the middle of a chunked payload: only binary chunks are acceptable.
            match message {
                tungstenite028::Message::Binary(chunk) => collected.extend_from_slice(&chunk),
                other => return Err(framing_error(ReadErrorKind::MessageKind028(other))),
            }
            if collected.len() < *expected {
                Either::Left(stream::empty())
            } else {
                let payload = mem::take(collected);
                *pending = None;
                Either::Right(stream::once(future::ok(R::read_sync(&mut &*payload).map_err(in_message)?)))
            }
        } else {
            match message {
                tungstenite028::Message::Text(header) => {
                    // Only `m<length>` announcements are valid text messages.
                    if !header.starts_with('m') {
                        return Err(framing_error(ReadErrorKind::WebSocketTextMessage028(header)))
                    }
                    let expected = header[1..].parse::<usize>().map_err(framing_error)?;
                    let collected = FallibleVec::try_with_capacity(expected).map_err(framing_error)?;
                    *pending = Some((expected, collected));
                    Either::Left(stream::empty())
                }
                tungstenite028::Message::Binary(payload) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*payload).map_err(in_message)?))),
                other => return Err(framing_error(ReadErrorKind::MessageKind028(other))),
            }
        })
    }

    let (socket, _) = tokio_tungstenite028::connect_async(request).await?;
    let (raw_sink, raw_stream) = socket.split();
    Ok((
        raw_sink.sink_map_err(|e| WriteError {
            context: ErrorContext::WebSocketSink,
            kind: e.into(),
        }).with_flat_map::<W, _, _>(|value| {
            let mut payload = Vec::<u8>::new();
            if let Err(WriteError { context, kind }) = value.write_sync(&mut payload) {
                return Either::Right(stream::once(future::err(WriteError {
                    context: ErrorContext::WebSocket {
                        source: Box::new(context),
                    },
                    kind,
                })))
            }
            let messages = if payload.len() > WS_MAX_MESSAGE_SIZE {
                // Too large for one message: announce the total length, then send it in chunks, each copied into its own `Bytes` buffer.
                Either::Right(stream::iter(
                    iter::once(tungstenite028::Message::text(format!("m{}", payload.len())))
                        .chain(payload.chunks(WS_MAX_MESSAGE_SIZE).map(|chunk| tungstenite028::Message::binary(tungstenite028::Bytes::copy_from_slice(chunk))))
                        .collect::<Vec<_>>()
                ))
            } else {
                Either::Left(stream::once(future::ready(tungstenite028::Message::binary(payload))))
            };
            Either::Left(messages.map(Ok))
        }),
        raw_stream.scan(None, |pending, res| future::ready(Some(decode_message(pending, res)))).try_flatten(),
    ))
}