1#![deny(missing_docs, rust_2018_idioms, unused, unused_crate_dependencies, unused_import_braces, unused_lifetimes, unused_qualifications, warnings)]
2#![forbid(unsafe_code)]
3
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6use {
39 std::{
40 future::Future,
41 io::{
42 self,
43 prelude::*,
44 },
45 pin::Pin,
46 },
47 tokio::io::{
48 AsyncRead,
49 AsyncWrite,
50 },
51};
52#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite027"))] use {
53 std::{
54 iter,
55 mem,
56 },
57 fallible_collections::FallibleVec,
58 futures::{
59 Sink,
60 SinkExt as _,
61 future::{
62 self,
63 Either,
64 },
65 stream::{
66 self,
67 Stream,
68 StreamExt as _,
69 TryStreamExt as _,
70 },
71 },
72};
73#[cfg(feature = "tokio-tungstenite021")] use tokio_tungstenite021::tungstenite as tungstenite021;
74#[cfg(feature = "tokio-tungstenite024")] use tokio_tungstenite024::tungstenite as tungstenite024;
75#[cfg(feature = "tokio-tungstenite027")] use tokio_tungstenite027::tungstenite as tungstenite027;
76pub use {
77 async_proto_derive::{
78 Protocol,
79 bitflags,
80 },
81 crate::error::*,
82};
83#[doc(hidden)] pub use tokio; mod error;
86mod impls;
87
88#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite027"))] const WS_MAX_MESSAGE_SIZE: usize = 16777216;
90
91pub trait Protocol: Sized {
93 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
99 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
105 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError>;
107 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError>;
109
110 fn read_owned<R: AsyncRead + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
114 Box::pin(async move {
115 let value = Self::read(&mut stream).await?;
116 Ok((stream, value))
117 })
118 }
119
120 fn try_read(stream: &mut impl Read, buf: &mut Vec<u8>) -> Result<Option<Self>, ReadError> {
163 let mut temp_buf = vec![0; 8];
164 loop {
165 let mut slice = &mut &**buf;
166 match Self::read_sync(&mut slice) {
167 Ok(value) => {
168 let value_len = slice.len();
169 buf.drain(..buf.len() - value_len);
170 return Ok(Some(value))
171 }
172 Err(ReadError { kind: ReadErrorKind::Io(e), .. }) if e.kind() == io::ErrorKind::UnexpectedEof => {}
173 Err(e) => return Err(e),
174 }
175 match stream.read(&mut temp_buf) {
176 Ok(0) => return Err(ReadError {
177 context: ErrorContext::DefaultImpl,
178 kind: ReadErrorKind::EndOfStream,
179 }),
180 Ok(n) => buf.extend_from_slice(&temp_buf[..n]),
181 Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
182 Err(e) => return Err(ReadError {
183 context: ErrorContext::DefaultImpl,
184 kind: e.into(),
185 }),
186 }
187 }
188 }
189
190 #[cfg(feature = "tokio-tungstenite021")]
191 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
192 fn read_ws021<'a, R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
198 Box::pin(async move {
199 let packet = stream.try_next().await.map_err(|e| ReadError {
200 context: ErrorContext::DefaultImpl,
201 kind: e.into(),
202 })?.ok_or_else(|| ReadError {
203 context: ErrorContext::DefaultImpl,
204 kind: ReadErrorKind::EndOfStream,
205 })?;
206 match packet {
207 tungstenite021::Message::Text(data) => match data.chars().next() {
208 Some('m') => {
209 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
210 context: ErrorContext::DefaultImpl,
211 kind: e.into(),
212 })?;
213 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
214 context: ErrorContext::DefaultImpl,
215 kind: e.into(),
216 })?;
217 while buf.len() < len {
218 let packet = stream.try_next().await.map_err(|e| ReadError {
219 context: ErrorContext::DefaultImpl,
220 kind: e.into(),
221 })?.ok_or_else(|| ReadError {
222 context: ErrorContext::DefaultImpl,
223 kind: ReadErrorKind::EndOfStream,
224 })?;
225 if let tungstenite021::Message::Binary(data) = packet {
226 buf.extend_from_slice(&data);
227 } else {
228 return Err(ReadError {
229 context: ErrorContext::DefaultImpl,
230 kind: ReadErrorKind::MessageKind021(packet),
231 })
232 }
233 }
234 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
235 context: ErrorContext::WebSocket {
236 source: Box::new(context),
237 },
238 kind,
239 })
240 }
241 _ => Err(ReadError {
242 context: ErrorContext::DefaultImpl,
243 kind: ReadErrorKind::WebSocketTextMessage024(data),
244 }),
245 },
246 tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
247 context: ErrorContext::WebSocket {
248 source: Box::new(context),
249 },
250 kind,
251 }),
252 _ => Err(ReadError {
253 context: ErrorContext::DefaultImpl,
254 kind: ReadErrorKind::MessageKind021(packet),
255 }),
256 }
257 })
258 }
259
260 #[cfg(feature = "tokio-tungstenite024")]
261 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
262 fn read_ws024<'a, R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
268 Box::pin(async move {
269 let packet = stream.try_next().await.map_err(|e| ReadError {
270 context: ErrorContext::DefaultImpl,
271 kind: e.into(),
272 })?.ok_or_else(|| ReadError {
273 context: ErrorContext::DefaultImpl,
274 kind: ReadErrorKind::EndOfStream,
275 })?;
276 match packet {
277 tungstenite024::Message::Text(data) => match data.chars().next() {
278 Some('m') => {
279 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
280 context: ErrorContext::DefaultImpl,
281 kind: e.into(),
282 })?;
283 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
284 context: ErrorContext::DefaultImpl,
285 kind: e.into(),
286 })?;
287 while buf.len() < len {
288 let packet = stream.try_next().await.map_err(|e| ReadError {
289 context: ErrorContext::DefaultImpl,
290 kind: e.into(),
291 })?.ok_or_else(|| ReadError {
292 context: ErrorContext::DefaultImpl,
293 kind: ReadErrorKind::EndOfStream,
294 })?;
295 if let tungstenite024::Message::Binary(data) = packet {
296 buf.extend_from_slice(&data);
297 } else {
298 return Err(ReadError {
299 context: ErrorContext::DefaultImpl,
300 kind: ReadErrorKind::MessageKind024(packet),
301 })
302 }
303 }
304 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
305 context: ErrorContext::WebSocket {
306 source: Box::new(context),
307 },
308 kind,
309 })
310 }
311 _ => Err(ReadError {
312 context: ErrorContext::DefaultImpl,
313 kind: ReadErrorKind::WebSocketTextMessage024(data),
314 }),
315 },
316 tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
317 context: ErrorContext::WebSocket {
318 source: Box::new(context),
319 },
320 kind,
321 }),
322 _ => Err(ReadError {
323 context: ErrorContext::DefaultImpl,
324 kind: ReadErrorKind::MessageKind024(packet),
325 }),
326 }
327 })
328 }
329
330 #[cfg(feature = "tokio-tungstenite027")]
331 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
332 fn read_ws027<'a, R: Stream<Item = Result<tungstenite027::Message, tungstenite027::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
338 Box::pin(async move {
339 let packet = stream.try_next().await.map_err(|e| ReadError {
340 context: ErrorContext::DefaultImpl,
341 kind: e.into(),
342 })?.ok_or_else(|| ReadError {
343 context: ErrorContext::DefaultImpl,
344 kind: ReadErrorKind::EndOfStream,
345 })?;
346 match packet {
347 tungstenite027::Message::Text(data) => match data.chars().next() {
348 Some('m') => {
349 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
350 context: ErrorContext::DefaultImpl,
351 kind: e.into(),
352 })?;
353 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
354 context: ErrorContext::DefaultImpl,
355 kind: e.into(),
356 })?;
357 while buf.len() < len {
358 let packet = stream.try_next().await.map_err(|e| ReadError {
359 context: ErrorContext::DefaultImpl,
360 kind: e.into(),
361 })?.ok_or_else(|| ReadError {
362 context: ErrorContext::DefaultImpl,
363 kind: ReadErrorKind::EndOfStream,
364 })?;
365 if let tungstenite027::Message::Binary(data) = packet {
366 buf.extend_from_slice(&data);
367 } else {
368 return Err(ReadError {
369 context: ErrorContext::DefaultImpl,
370 kind: ReadErrorKind::MessageKind027(packet),
371 })
372 }
373 }
374 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
375 context: ErrorContext::WebSocket {
376 source: Box::new(context),
377 },
378 kind,
379 })
380 }
381 _ => Err(ReadError {
382 context: ErrorContext::DefaultImpl,
383 kind: ReadErrorKind::WebSocketTextMessage027(data),
384 }),
385 },
386 tungstenite027::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
387 context: ErrorContext::WebSocket {
388 source: Box::new(context),
389 },
390 kind,
391 }),
392 _ => Err(ReadError {
393 context: ErrorContext::DefaultImpl,
394 kind: ReadErrorKind::MessageKind027(packet),
395 }),
396 }
397 })
398 }
399
400 #[cfg(feature = "tokio-tungstenite021")]
401 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
402 fn write_ws021<'a, W: Sink<tungstenite021::Message, Error = tungstenite021::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
408 where Self: Sync {
409 Box::pin(async move {
410 let mut buf = Vec::default();
411 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
412 context: ErrorContext::WebSocket {
413 source: Box::new(context),
414 },
415 kind,
416 })?;
417 if buf.len() <= WS_MAX_MESSAGE_SIZE {
418 sink.send(tungstenite021::Message::binary(buf)).await.map_err(|e| WriteError {
419 context: ErrorContext::DefaultImpl,
420 kind: e.into(),
421 })?;
422 } else {
423 sink.send(tungstenite021::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
424 context: ErrorContext::DefaultImpl,
425 kind: e.into(),
426 })?;
427 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
428 sink.send(tungstenite021::Message::binary(chunk)).await.map_err(|e| WriteError {
429 context: ErrorContext::DefaultImpl,
430 kind: e.into(),
431 })?;
432 }
433 }
434 Ok(())
435 })
436 }
437
438 #[cfg(feature = "tokio-tungstenite024")]
439 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
440 fn write_ws024<'a, W: Sink<tungstenite024::Message, Error = tungstenite024::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
446 where Self: Sync {
447 Box::pin(async move {
448 let mut buf = Vec::default();
449 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
450 context: ErrorContext::WebSocket {
451 source: Box::new(context),
452 },
453 kind,
454 })?;
455 if buf.len() <= WS_MAX_MESSAGE_SIZE {
456 sink.send(tungstenite024::Message::binary(buf)).await.map_err(|e| WriteError {
457 context: ErrorContext::DefaultImpl,
458 kind: e.into(),
459 })?;
460 } else {
461 sink.send(tungstenite024::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
462 context: ErrorContext::DefaultImpl,
463 kind: e.into(),
464 })?;
465 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
466 sink.send(tungstenite024::Message::binary(chunk)).await.map_err(|e| WriteError {
467 context: ErrorContext::DefaultImpl,
468 kind: e.into(),
469 })?;
470 }
471 }
472 Ok(())
473 })
474 }
475
476 #[cfg(feature = "tokio-tungstenite027")]
477 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
478 fn write_ws027<'a, W: Sink<tungstenite027::Message, Error = tungstenite027::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
484 where Self: Sync {
485 Box::pin(async move {
486 let mut buf = Vec::default();
487 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
488 context: ErrorContext::WebSocket {
489 source: Box::new(context),
490 },
491 kind,
492 })?;
493 if buf.len() <= WS_MAX_MESSAGE_SIZE {
494 sink.send(tungstenite027::Message::binary(buf)).await.map_err(|e| WriteError {
495 context: ErrorContext::DefaultImpl,
496 kind: e.into(),
497 })?;
498 } else {
499 sink.send(tungstenite027::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
500 context: ErrorContext::DefaultImpl,
501 kind: e.into(),
502 })?;
503 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
504 sink.send(tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))).await.map_err(|e| WriteError {
505 context: ErrorContext::DefaultImpl,
506 kind: e.into(),
507 })?;
508 }
509 }
510 Ok(())
511 })
512 }
513
514 #[cfg(feature = "tokio-tungstenite021")]
515 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
516 fn read_ws_sync021(websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
518 let packet = websocket.read().map_err(|e| ReadError {
519 context: ErrorContext::DefaultImpl,
520 kind: e.into(),
521 })?;
522 match packet {
523 tungstenite021::Message::Text(data) => match data.chars().next() {
524 Some('m') => {
525 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
526 context: ErrorContext::DefaultImpl,
527 kind: e.into(),
528 })?;
529 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
530 context: ErrorContext::DefaultImpl,
531 kind: e.into(),
532 })?;
533 while buf.len() < len {
534 let packet = websocket.read().map_err(|e| ReadError {
535 context: ErrorContext::DefaultImpl,
536 kind: e.into(),
537 })?;
538 if let tungstenite021::Message::Binary(data) = packet {
539 buf.extend_from_slice(&data);
540 } else {
541 return Err(ReadError {
542 context: ErrorContext::DefaultImpl,
543 kind: ReadErrorKind::MessageKind021(packet),
544 })
545 }
546 }
547 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
548 context: ErrorContext::WebSocket {
549 source: Box::new(context),
550 },
551 kind,
552 })
553 }
554 _ => return Err(ReadError {
555 context: ErrorContext::DefaultImpl,
556 kind: ReadErrorKind::WebSocketTextMessage024(data),
557 }),
558 },
559 tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
560 context: ErrorContext::WebSocket {
561 source: Box::new(context),
562 },
563 kind,
564 }),
565 _ => Err(ReadError {
566 context: ErrorContext::DefaultImpl,
567 kind: ReadErrorKind::MessageKind021(packet),
568 }),
569 }
570 }
571
572 #[cfg(feature = "tokio-tungstenite024")]
573 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
574 fn read_ws_sync024(websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
576 let packet = websocket.read().map_err(|e| ReadError {
577 context: ErrorContext::DefaultImpl,
578 kind: e.into(),
579 })?;
580 match packet {
581 tungstenite024::Message::Text(data) => match data.chars().next() {
582 Some('m') => {
583 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
584 context: ErrorContext::DefaultImpl,
585 kind: e.into(),
586 })?;
587 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
588 context: ErrorContext::DefaultImpl,
589 kind: e.into(),
590 })?;
591 while buf.len() < len {
592 let packet = websocket.read().map_err(|e| ReadError {
593 context: ErrorContext::DefaultImpl,
594 kind: e.into(),
595 })?;
596 if let tungstenite024::Message::Binary(data) = packet {
597 buf.extend_from_slice(&data);
598 } else {
599 return Err(ReadError {
600 context: ErrorContext::DefaultImpl,
601 kind: ReadErrorKind::MessageKind024(packet),
602 })
603 }
604 }
605 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
606 context: ErrorContext::WebSocket {
607 source: Box::new(context),
608 },
609 kind,
610 })
611 }
612 _ => return Err(ReadError {
613 context: ErrorContext::DefaultImpl,
614 kind: ReadErrorKind::WebSocketTextMessage024(data),
615 }),
616 },
617 tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
618 context: ErrorContext::WebSocket {
619 source: Box::new(context),
620 },
621 kind,
622 }),
623 _ => Err(ReadError {
624 context: ErrorContext::DefaultImpl,
625 kind: ReadErrorKind::MessageKind024(packet),
626 }),
627 }
628 }
629
630 #[cfg(feature = "tokio-tungstenite027")]
631 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
632 fn read_ws_sync027(websocket: &mut tungstenite027::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
634 let packet = websocket.read().map_err(|e| ReadError {
635 context: ErrorContext::DefaultImpl,
636 kind: e.into(),
637 })?;
638 match packet {
639 tungstenite027::Message::Text(data) => match data.chars().next() {
640 Some('m') => {
641 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
642 context: ErrorContext::DefaultImpl,
643 kind: e.into(),
644 })?;
645 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
646 context: ErrorContext::DefaultImpl,
647 kind: e.into(),
648 })?;
649 while buf.len() < len {
650 let packet = websocket.read().map_err(|e| ReadError {
651 context: ErrorContext::DefaultImpl,
652 kind: e.into(),
653 })?;
654 if let tungstenite027::Message::Binary(data) = packet {
655 buf.extend_from_slice(&data);
656 } else {
657 return Err(ReadError {
658 context: ErrorContext::DefaultImpl,
659 kind: ReadErrorKind::MessageKind027(packet),
660 })
661 }
662 }
663 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
664 context: ErrorContext::WebSocket {
665 source: Box::new(context),
666 },
667 kind,
668 })
669 }
670 _ => return Err(ReadError {
671 context: ErrorContext::DefaultImpl,
672 kind: ReadErrorKind::WebSocketTextMessage027(data),
673 }),
674 },
675 tungstenite027::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
676 context: ErrorContext::WebSocket {
677 source: Box::new(context),
678 },
679 kind,
680 }),
681 _ => Err(ReadError {
682 context: ErrorContext::DefaultImpl,
683 kind: ReadErrorKind::MessageKind027(packet),
684 }),
685 }
686 }
687
688 #[cfg(feature = "tokio-tungstenite021")]
689 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
690 fn write_ws_sync021(&self, websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
692 let mut buf = Vec::default();
693 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
694 context: ErrorContext::WebSocket {
695 source: Box::new(context),
696 },
697 kind,
698 })?;
699 if buf.len() <= WS_MAX_MESSAGE_SIZE {
700 websocket.send(tungstenite021::Message::binary(buf)).map_err(|e| WriteError {
701 context: ErrorContext::DefaultImpl,
702 kind: e.into(),
703 })?;
704 } else {
705 websocket.send(tungstenite021::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
706 context: ErrorContext::DefaultImpl,
707 kind: e.into(),
708 })?;
709 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
710 websocket.send(tungstenite021::Message::binary(chunk)).map_err(|e| WriteError {
711 context: ErrorContext::DefaultImpl,
712 kind: e.into(),
713 })?;
714 }
715 }
716 websocket.flush().map_err(|e| WriteError {
717 context: ErrorContext::DefaultImpl,
718 kind: e.into(),
719 })?;
720 Ok(())
721 }
722
723 #[cfg(feature = "tokio-tungstenite024")]
724 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
725 fn write_ws_sync024(&self, websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
727 let mut buf = Vec::default();
728 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
729 context: ErrorContext::WebSocket {
730 source: Box::new(context),
731 },
732 kind,
733 })?;
734 if buf.len() <= WS_MAX_MESSAGE_SIZE {
735 websocket.send(tungstenite024::Message::binary(buf)).map_err(|e| WriteError {
736 context: ErrorContext::DefaultImpl,
737 kind: e.into(),
738 })?;
739 } else {
740 websocket.send(tungstenite024::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
741 context: ErrorContext::DefaultImpl,
742 kind: e.into(),
743 })?;
744 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
745 websocket.send(tungstenite024::Message::binary(chunk)).map_err(|e| WriteError {
746 context: ErrorContext::DefaultImpl,
747 kind: e.into(),
748 })?;
749 }
750 }
751 websocket.flush().map_err(|e| WriteError {
752 context: ErrorContext::DefaultImpl,
753 kind: e.into(),
754 })?;
755 Ok(())
756 }
757
758 #[cfg(feature = "tokio-tungstenite027")]
759 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
760 fn write_ws_sync027(&self, websocket: &mut tungstenite027::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
762 let mut buf = Vec::default();
763 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
764 context: ErrorContext::WebSocket {
765 source: Box::new(context),
766 },
767 kind,
768 })?;
769 if buf.len() <= WS_MAX_MESSAGE_SIZE {
770 websocket.send(tungstenite027::Message::binary(buf)).map_err(|e| WriteError {
771 context: ErrorContext::DefaultImpl,
772 kind: e.into(),
773 })?;
774 } else {
775 websocket.send(tungstenite027::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
776 context: ErrorContext::DefaultImpl,
777 kind: e.into(),
778 })?;
779 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
780 websocket.send(tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))).map_err(|e| WriteError {
781 context: ErrorContext::DefaultImpl,
782 kind: e.into(),
783 })?;
784 }
785 }
786 websocket.flush().map_err(|e| WriteError {
787 context: ErrorContext::DefaultImpl,
788 kind: e.into(),
789 })?;
790 Ok(())
791 }
792
793 #[cfg(feature = "tokio-tungstenite021")]
794 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
795 fn read_ws_owned021<R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
799 Box::pin(async move {
800 let value = Self::read_ws021(&mut stream).await?;
801 Ok((stream, value))
802 })
803 }
804
805 #[cfg(feature = "tokio-tungstenite024")]
806 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
807 fn read_ws_owned024<R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
811 Box::pin(async move {
812 let value = Self::read_ws024(&mut stream).await?;
813 Ok((stream, value))
814 })
815 }
816
817 #[cfg(feature = "tokio-tungstenite027")]
818 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
819 fn read_ws_owned027<R: Stream<Item = Result<tungstenite027::Message, tungstenite027::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
823 Box::pin(async move {
824 let value = Self::read_ws027(&mut stream).await?;
825 Ok((stream, value))
826 })
827 }
828}
829
830#[cfg(feature = "tokio-tungstenite021")]
834#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
835pub async fn websocket021<R: Protocol, W: Protocol>(request: impl tungstenite021::client::IntoClientRequest + Unpin) -> tungstenite021::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
836 let (sock, _) = tokio_tungstenite021::connect_async(request).await?;
837 let (sink, stream) = sock.split();
838 Ok((
839 sink.sink_map_err(|e| WriteError {
840 context: ErrorContext::WebSocketSink,
841 kind: e.into(),
842 }).with_flat_map::<W, _, _>(|msg| {
843 let mut buf = Vec::default();
844 match msg.write_sync(&mut buf) {
845 Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
846 Either::Left(stream::once(future::ready(tungstenite021::Message::binary(buf))))
847 } else {
848 Either::Right(stream::iter(
849 iter::once(tungstenite021::Message::text(format!("m{}", buf.len())))
850 .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite021::Message::binary))
851 .collect::<Vec<_>>()
852 ))
853 }.map(Ok)),
854 Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
855 context: ErrorContext::WebSocket {
856 source: Box::new(context),
857 },
858 kind,
859 }))),
860 }
861 }),
862 stream.scan(None, |state, res| {
863 fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite021::Result<tungstenite021::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
864 let packet = res.map_err(|e| ReadError {
865 context: ErrorContext::WebSocketStream,
866 kind: e.into(),
867 })?;
868 Ok(if let Some((len, buf)) = state {
869 if let tungstenite021::Message::Binary(data) = packet {
870 buf.extend_from_slice(&data);
871 } else {
872 return Err(ReadError {
873 context: ErrorContext::DefaultImpl,
874 kind: ReadErrorKind::MessageKind021(packet),
875 })
876 }
877 if buf.len() >= *len {
878 let buf = mem::take(buf);
879 *state = None;
880 Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
881 context: ErrorContext::WebSocket {
882 source: Box::new(context),
883 },
884 kind,
885 })?)))
886 } else {
887 Either::Left(stream::empty())
888 }
889 } else {
890 match packet {
891 tungstenite021::Message::Text(data) => match data.chars().next() {
892 Some('m') => {
893 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
894 context: ErrorContext::DefaultImpl,
895 kind: e.into(),
896 })?;
897 let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
898 context: ErrorContext::DefaultImpl,
899 kind: e.into(),
900 })?;
901 *state = Some((len, buf));
902 Either::Left(stream::empty())
903 }
904 _ => return Err(ReadError {
905 context: ErrorContext::DefaultImpl,
906 kind: ReadErrorKind::WebSocketTextMessage024(data),
907 }),
908 },
909 tungstenite021::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
910 context: ErrorContext::WebSocket {
911 source: Box::new(context),
912 },
913 kind,
914 })?))),
915 _ => return Err(ReadError {
916 context: ErrorContext::DefaultImpl,
917 kind: ReadErrorKind::MessageKind021(packet),
918 }),
919 }
920 })
921 }
922
923 future::ready(Some(scanner(state, res)))
924 }).try_flatten(),
925 ))
926}
927
928#[cfg(feature = "tokio-tungstenite024")]
932#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
933pub async fn websocket024<R: Protocol, W: Protocol>(request: impl tungstenite024::client::IntoClientRequest + Unpin) -> tungstenite024::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
934 let (sock, _) = tokio_tungstenite024::connect_async(request).await?;
935 let (sink, stream) = sock.split();
936 Ok((
937 sink.sink_map_err(|e| WriteError {
938 context: ErrorContext::WebSocketSink,
939 kind: e.into(),
940 }).with_flat_map::<W, _, _>(|msg| {
941 let mut buf = Vec::default();
942 match msg.write_sync(&mut buf) {
943 Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
944 Either::Left(stream::once(future::ready(tungstenite024::Message::binary(buf))))
945 } else {
946 Either::Right(stream::iter(
947 iter::once(tungstenite024::Message::text(format!("m{}", buf.len())))
948 .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite024::Message::binary))
949 .collect::<Vec<_>>()
950 ))
951 }.map(Ok)),
952 Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
953 context: ErrorContext::WebSocket {
954 source: Box::new(context),
955 },
956 kind,
957 }))),
958 }
959 }),
960 stream.scan(None, |state, res| {
961 fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite024::Result<tungstenite024::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
962 let packet = res.map_err(|e| ReadError {
963 context: ErrorContext::WebSocketStream,
964 kind: e.into(),
965 })?;
966 Ok(if let Some((len, buf)) = state {
967 if let tungstenite024::Message::Binary(data) = packet {
968 buf.extend_from_slice(&data);
969 } else {
970 return Err(ReadError {
971 context: ErrorContext::DefaultImpl,
972 kind: ReadErrorKind::MessageKind024(packet),
973 })
974 }
975 if buf.len() >= *len {
976 let buf = mem::take(buf);
977 *state = None;
978 Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
979 context: ErrorContext::WebSocket {
980 source: Box::new(context),
981 },
982 kind,
983 })?)))
984 } else {
985 Either::Left(stream::empty())
986 }
987 } else {
988 match packet {
989 tungstenite024::Message::Text(data) => match data.chars().next() {
990 Some('m') => {
991 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
992 context: ErrorContext::DefaultImpl,
993 kind: e.into(),
994 })?;
995 let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
996 context: ErrorContext::DefaultImpl,
997 kind: e.into(),
998 })?;
999 *state = Some((len, buf));
1000 Either::Left(stream::empty())
1001 }
1002 _ => return Err(ReadError {
1003 context: ErrorContext::DefaultImpl,
1004 kind: ReadErrorKind::WebSocketTextMessage024(data),
1005 }),
1006 },
1007 tungstenite024::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
1008 context: ErrorContext::WebSocket {
1009 source: Box::new(context),
1010 },
1011 kind,
1012 })?))),
1013 _ => return Err(ReadError {
1014 context: ErrorContext::DefaultImpl,
1015 kind: ReadErrorKind::MessageKind024(packet),
1016 }),
1017 }
1018 })
1019 }
1020
1021 future::ready(Some(scanner(state, res)))
1022 }).try_flatten(),
1023 ))
1024}
1025
1026#[cfg(feature = "tokio-tungstenite027")]
1030#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
1031pub async fn websocket027<R: Protocol, W: Protocol>(request: impl tungstenite027::client::IntoClientRequest + Unpin) -> tungstenite027::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
1032 let (sock, _) = tokio_tungstenite027::connect_async(request).await?;
1033 let (sink, stream) = sock.split();
1034 Ok((
1035 sink.sink_map_err(|e| WriteError {
1036 context: ErrorContext::WebSocketSink,
1037 kind: e.into(),
1038 }).with_flat_map::<W, _, _>(|msg| {
1039 let mut buf = Vec::default();
1040 match msg.write_sync(&mut buf) {
1041 Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
1042 Either::Left(stream::once(future::ready(tungstenite027::Message::binary(buf))))
1043 } else {
1044 Either::Right(stream::iter(
1045 iter::once(tungstenite027::Message::text(format!("m{}", buf.len())))
1046 .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(|chunk| tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))))
1047 .collect::<Vec<_>>()
1048 ))
1049 }.map(Ok)),
1050 Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
1051 context: ErrorContext::WebSocket {
1052 source: Box::new(context),
1053 },
1054 kind,
1055 }))),
1056 }
1057 }),
1058 stream.scan(None, |state, res| {
1059 fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite027::Result<tungstenite027::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
1060 let packet = res.map_err(|e| ReadError {
1061 context: ErrorContext::WebSocketStream,
1062 kind: e.into(),
1063 })?;
1064 Ok(if let Some((len, buf)) = state {
1065 if let tungstenite027::Message::Binary(data) = packet {
1066 buf.extend_from_slice(&data);
1067 } else {
1068 return Err(ReadError {
1069 context: ErrorContext::DefaultImpl,
1070 kind: ReadErrorKind::MessageKind027(packet),
1071 })
1072 }
1073 if buf.len() >= *len {
1074 let buf = mem::take(buf);
1075 *state = None;
1076 Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
1077 context: ErrorContext::WebSocket {
1078 source: Box::new(context),
1079 },
1080 kind,
1081 })?)))
1082 } else {
1083 Either::Left(stream::empty())
1084 }
1085 } else {
1086 match packet {
1087 tungstenite027::Message::Text(data) => match data.chars().next() {
1088 Some('m') => {
1089 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
1090 context: ErrorContext::DefaultImpl,
1091 kind: e.into(),
1092 })?;
1093 let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
1094 context: ErrorContext::DefaultImpl,
1095 kind: e.into(),
1096 })?;
1097 *state = Some((len, buf));
1098 Either::Left(stream::empty())
1099 }
1100 _ => return Err(ReadError {
1101 context: ErrorContext::DefaultImpl,
1102 kind: ReadErrorKind::WebSocketTextMessage027(data),
1103 }),
1104 },
1105 tungstenite027::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
1106 context: ErrorContext::WebSocket {
1107 source: Box::new(context),
1108 },
1109 kind,
1110 })?))),
1111 _ => return Err(ReadError {
1112 context: ErrorContext::DefaultImpl,
1113 kind: ReadErrorKind::MessageKind027(packet),
1114 }),
1115 }
1116 })
1117 }
1118
1119 future::ready(Some(scanner(state, res)))
1120 }).try_flatten(),
1121 ))
1122}