1#![deny(missing_docs, rust_2018_idioms, unused, unused_crate_dependencies, unused_import_braces, unused_lifetimes, unused_qualifications, warnings)]
2#![forbid(unsafe_code)]
3
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6use {
38 std::{
39 future::Future,
40 io::{
41 self,
42 prelude::*,
43 },
44 pin::Pin,
45 },
46 tokio::io::{
47 AsyncRead,
48 AsyncWrite,
49 },
50};
51#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite026"))] use {
52 std::{
53 iter,
54 mem,
55 },
56 fallible_collections::FallibleVec,
57 futures::{
58 Sink,
59 SinkExt as _,
60 future::{
61 self,
62 Either,
63 },
64 stream::{
65 self,
66 Stream,
67 StreamExt as _,
68 TryStreamExt as _,
69 },
70 },
71};
72#[cfg(feature = "tokio-tungstenite021")] use tokio_tungstenite021::tungstenite as tungstenite021;
73#[cfg(feature = "tokio-tungstenite024")] use tokio_tungstenite024::tungstenite as tungstenite024;
74#[cfg(feature = "tokio-tungstenite026")] use tokio_tungstenite026::tungstenite as tungstenite026;
75pub use {
76 async_proto_derive::{
77 Protocol,
78 bitflags,
79 },
80 crate::error::*,
81};
82#[doc(hidden)] pub use tokio; mod error;
85mod impls;
86
87#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite026"))] const WS_MAX_MESSAGE_SIZE: usize = 16777216;
89
90pub trait Protocol: Sized {
92 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
98 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
104 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError>;
106 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError>;
108
109 fn read_owned<R: AsyncRead + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
113 Box::pin(async move {
114 let value = Self::read(&mut stream).await?;
115 Ok((stream, value))
116 })
117 }
118
119 fn try_read(stream: &mut impl Read, buf: &mut Vec<u8>) -> Result<Option<Self>, ReadError> {
162 let mut temp_buf = vec![0; 8];
163 loop {
164 let mut slice = &mut &**buf;
165 match Self::read_sync(&mut slice) {
166 Ok(value) => {
167 let value_len = slice.len();
168 buf.drain(..buf.len() - value_len);
169 return Ok(Some(value))
170 }
171 Err(ReadError { kind: ReadErrorKind::Io(e), .. }) if e.kind() == io::ErrorKind::UnexpectedEof => {}
172 Err(e) => return Err(e),
173 }
174 match stream.read(&mut temp_buf) {
175 Ok(0) => return Err(ReadError {
176 context: ErrorContext::DefaultImpl,
177 kind: ReadErrorKind::EndOfStream,
178 }),
179 Ok(n) => buf.extend_from_slice(&temp_buf[..n]),
180 Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
181 Err(e) => return Err(ReadError {
182 context: ErrorContext::DefaultImpl,
183 kind: e.into(),
184 }),
185 }
186 }
187 }
188
189 #[cfg(feature = "tokio-tungstenite021")]
190 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
191 fn read_ws021<'a, R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
197 Box::pin(async move {
198 let packet = stream.try_next().await.map_err(|e| ReadError {
199 context: ErrorContext::DefaultImpl,
200 kind: e.into(),
201 })?.ok_or_else(|| ReadError {
202 context: ErrorContext::DefaultImpl,
203 kind: ReadErrorKind::EndOfStream,
204 })?;
205 match packet {
206 tungstenite021::Message::Text(data) => match data.chars().next() {
207 Some('m') => {
208 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
209 context: ErrorContext::DefaultImpl,
210 kind: e.into(),
211 })?;
212 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
213 context: ErrorContext::DefaultImpl,
214 kind: e.into(),
215 })?;
216 while buf.len() < len {
217 let packet = stream.try_next().await.map_err(|e| ReadError {
218 context: ErrorContext::DefaultImpl,
219 kind: e.into(),
220 })?.ok_or_else(|| ReadError {
221 context: ErrorContext::DefaultImpl,
222 kind: ReadErrorKind::EndOfStream,
223 })?;
224 if let tungstenite021::Message::Binary(data) = packet {
225 buf.extend_from_slice(&data);
226 } else {
227 return Err(ReadError {
228 context: ErrorContext::DefaultImpl,
229 kind: ReadErrorKind::MessageKind021(packet),
230 })
231 }
232 }
233 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
234 context: ErrorContext::WebSocket {
235 source: Box::new(context),
236 },
237 kind,
238 })
239 }
240 _ => Err(ReadError {
241 context: ErrorContext::DefaultImpl,
242 kind: ReadErrorKind::WebSocketTextMessage024(data),
243 }),
244 },
245 tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
246 context: ErrorContext::WebSocket {
247 source: Box::new(context),
248 },
249 kind,
250 }),
251 _ => Err(ReadError {
252 context: ErrorContext::DefaultImpl,
253 kind: ReadErrorKind::MessageKind021(packet),
254 }),
255 }
256 })
257 }
258
259 #[cfg(feature = "tokio-tungstenite024")]
260 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
261 fn read_ws024<'a, R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
267 Box::pin(async move {
268 let packet = stream.try_next().await.map_err(|e| ReadError {
269 context: ErrorContext::DefaultImpl,
270 kind: e.into(),
271 })?.ok_or_else(|| ReadError {
272 context: ErrorContext::DefaultImpl,
273 kind: ReadErrorKind::EndOfStream,
274 })?;
275 match packet {
276 tungstenite024::Message::Text(data) => match data.chars().next() {
277 Some('m') => {
278 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
279 context: ErrorContext::DefaultImpl,
280 kind: e.into(),
281 })?;
282 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
283 context: ErrorContext::DefaultImpl,
284 kind: e.into(),
285 })?;
286 while buf.len() < len {
287 let packet = stream.try_next().await.map_err(|e| ReadError {
288 context: ErrorContext::DefaultImpl,
289 kind: e.into(),
290 })?.ok_or_else(|| ReadError {
291 context: ErrorContext::DefaultImpl,
292 kind: ReadErrorKind::EndOfStream,
293 })?;
294 if let tungstenite024::Message::Binary(data) = packet {
295 buf.extend_from_slice(&data);
296 } else {
297 return Err(ReadError {
298 context: ErrorContext::DefaultImpl,
299 kind: ReadErrorKind::MessageKind024(packet),
300 })
301 }
302 }
303 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
304 context: ErrorContext::WebSocket {
305 source: Box::new(context),
306 },
307 kind,
308 })
309 }
310 _ => Err(ReadError {
311 context: ErrorContext::DefaultImpl,
312 kind: ReadErrorKind::WebSocketTextMessage024(data),
313 }),
314 },
315 tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
316 context: ErrorContext::WebSocket {
317 source: Box::new(context),
318 },
319 kind,
320 }),
321 _ => Err(ReadError {
322 context: ErrorContext::DefaultImpl,
323 kind: ReadErrorKind::MessageKind024(packet),
324 }),
325 }
326 })
327 }
328
329 #[cfg(feature = "tokio-tungstenite026")]
330 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite026")))]
331 fn read_ws026<'a, R: Stream<Item = Result<tungstenite026::Message, tungstenite026::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
337 Box::pin(async move {
338 let packet = stream.try_next().await.map_err(|e| ReadError {
339 context: ErrorContext::DefaultImpl,
340 kind: e.into(),
341 })?.ok_or_else(|| ReadError {
342 context: ErrorContext::DefaultImpl,
343 kind: ReadErrorKind::EndOfStream,
344 })?;
345 match packet {
346 tungstenite026::Message::Text(data) => match data.chars().next() {
347 Some('m') => {
348 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
349 context: ErrorContext::DefaultImpl,
350 kind: e.into(),
351 })?;
352 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
353 context: ErrorContext::DefaultImpl,
354 kind: e.into(),
355 })?;
356 while buf.len() < len {
357 let packet = stream.try_next().await.map_err(|e| ReadError {
358 context: ErrorContext::DefaultImpl,
359 kind: e.into(),
360 })?.ok_or_else(|| ReadError {
361 context: ErrorContext::DefaultImpl,
362 kind: ReadErrorKind::EndOfStream,
363 })?;
364 if let tungstenite026::Message::Binary(data) = packet {
365 buf.extend_from_slice(&data);
366 } else {
367 return Err(ReadError {
368 context: ErrorContext::DefaultImpl,
369 kind: ReadErrorKind::MessageKind026(packet),
370 })
371 }
372 }
373 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
374 context: ErrorContext::WebSocket {
375 source: Box::new(context),
376 },
377 kind,
378 })
379 }
380 _ => Err(ReadError {
381 context: ErrorContext::DefaultImpl,
382 kind: ReadErrorKind::WebSocketTextMessage026(data),
383 }),
384 },
385 tungstenite026::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
386 context: ErrorContext::WebSocket {
387 source: Box::new(context),
388 },
389 kind,
390 }),
391 _ => Err(ReadError {
392 context: ErrorContext::DefaultImpl,
393 kind: ReadErrorKind::MessageKind026(packet),
394 }),
395 }
396 })
397 }
398
399 #[cfg(feature = "tokio-tungstenite021")]
400 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
401 fn write_ws021<'a, W: Sink<tungstenite021::Message, Error = tungstenite021::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
407 where Self: Sync {
408 Box::pin(async move {
409 let mut buf = Vec::default();
410 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
411 context: ErrorContext::WebSocket {
412 source: Box::new(context),
413 },
414 kind,
415 })?;
416 if buf.len() <= WS_MAX_MESSAGE_SIZE {
417 sink.send(tungstenite021::Message::binary(buf)).await.map_err(|e| WriteError {
418 context: ErrorContext::DefaultImpl,
419 kind: e.into(),
420 })?;
421 } else {
422 sink.send(tungstenite021::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
423 context: ErrorContext::DefaultImpl,
424 kind: e.into(),
425 })?;
426 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
427 sink.send(tungstenite021::Message::binary(chunk)).await.map_err(|e| WriteError {
428 context: ErrorContext::DefaultImpl,
429 kind: e.into(),
430 })?;
431 }
432 }
433 Ok(())
434 })
435 }
436
437 #[cfg(feature = "tokio-tungstenite024")]
438 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
439 fn write_ws024<'a, W: Sink<tungstenite024::Message, Error = tungstenite024::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
445 where Self: Sync {
446 Box::pin(async move {
447 let mut buf = Vec::default();
448 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
449 context: ErrorContext::WebSocket {
450 source: Box::new(context),
451 },
452 kind,
453 })?;
454 if buf.len() <= WS_MAX_MESSAGE_SIZE {
455 sink.send(tungstenite024::Message::binary(buf)).await.map_err(|e| WriteError {
456 context: ErrorContext::DefaultImpl,
457 kind: e.into(),
458 })?;
459 } else {
460 sink.send(tungstenite024::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
461 context: ErrorContext::DefaultImpl,
462 kind: e.into(),
463 })?;
464 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
465 sink.send(tungstenite024::Message::binary(chunk)).await.map_err(|e| WriteError {
466 context: ErrorContext::DefaultImpl,
467 kind: e.into(),
468 })?;
469 }
470 }
471 Ok(())
472 })
473 }
474
475 #[cfg(feature = "tokio-tungstenite026")]
476 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite026")))]
477 fn write_ws026<'a, W: Sink<tungstenite026::Message, Error = tungstenite026::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
483 where Self: Sync {
484 Box::pin(async move {
485 let mut buf = Vec::default();
486 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
487 context: ErrorContext::WebSocket {
488 source: Box::new(context),
489 },
490 kind,
491 })?;
492 if buf.len() <= WS_MAX_MESSAGE_SIZE {
493 sink.send(tungstenite026::Message::binary(buf)).await.map_err(|e| WriteError {
494 context: ErrorContext::DefaultImpl,
495 kind: e.into(),
496 })?;
497 } else {
498 sink.send(tungstenite026::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
499 context: ErrorContext::DefaultImpl,
500 kind: e.into(),
501 })?;
502 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
503 sink.send(tungstenite026::Message::binary(tungstenite026::Bytes::copy_from_slice(chunk))).await.map_err(|e| WriteError {
504 context: ErrorContext::DefaultImpl,
505 kind: e.into(),
506 })?;
507 }
508 }
509 Ok(())
510 })
511 }
512
513 #[cfg(feature = "tokio-tungstenite021")]
514 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
515 fn read_ws_sync021(websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
517 let packet = websocket.read().map_err(|e| ReadError {
518 context: ErrorContext::DefaultImpl,
519 kind: e.into(),
520 })?;
521 match packet {
522 tungstenite021::Message::Text(data) => match data.chars().next() {
523 Some('m') => {
524 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
525 context: ErrorContext::DefaultImpl,
526 kind: e.into(),
527 })?;
528 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
529 context: ErrorContext::DefaultImpl,
530 kind: e.into(),
531 })?;
532 while buf.len() < len {
533 let packet = websocket.read().map_err(|e| ReadError {
534 context: ErrorContext::DefaultImpl,
535 kind: e.into(),
536 })?;
537 if let tungstenite021::Message::Binary(data) = packet {
538 buf.extend_from_slice(&data);
539 } else {
540 return Err(ReadError {
541 context: ErrorContext::DefaultImpl,
542 kind: ReadErrorKind::MessageKind021(packet),
543 })
544 }
545 }
546 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
547 context: ErrorContext::WebSocket {
548 source: Box::new(context),
549 },
550 kind,
551 })
552 }
553 _ => return Err(ReadError {
554 context: ErrorContext::DefaultImpl,
555 kind: ReadErrorKind::WebSocketTextMessage024(data),
556 }),
557 },
558 tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
559 context: ErrorContext::WebSocket {
560 source: Box::new(context),
561 },
562 kind,
563 }),
564 _ => Err(ReadError {
565 context: ErrorContext::DefaultImpl,
566 kind: ReadErrorKind::MessageKind021(packet),
567 }),
568 }
569 }
570
571 #[cfg(feature = "tokio-tungstenite024")]
572 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
573 fn read_ws_sync024(websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
575 let packet = websocket.read().map_err(|e| ReadError {
576 context: ErrorContext::DefaultImpl,
577 kind: e.into(),
578 })?;
579 match packet {
580 tungstenite024::Message::Text(data) => match data.chars().next() {
581 Some('m') => {
582 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
583 context: ErrorContext::DefaultImpl,
584 kind: e.into(),
585 })?;
586 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
587 context: ErrorContext::DefaultImpl,
588 kind: e.into(),
589 })?;
590 while buf.len() < len {
591 let packet = websocket.read().map_err(|e| ReadError {
592 context: ErrorContext::DefaultImpl,
593 kind: e.into(),
594 })?;
595 if let tungstenite024::Message::Binary(data) = packet {
596 buf.extend_from_slice(&data);
597 } else {
598 return Err(ReadError {
599 context: ErrorContext::DefaultImpl,
600 kind: ReadErrorKind::MessageKind024(packet),
601 })
602 }
603 }
604 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
605 context: ErrorContext::WebSocket {
606 source: Box::new(context),
607 },
608 kind,
609 })
610 }
611 _ => return Err(ReadError {
612 context: ErrorContext::DefaultImpl,
613 kind: ReadErrorKind::WebSocketTextMessage024(data),
614 }),
615 },
616 tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
617 context: ErrorContext::WebSocket {
618 source: Box::new(context),
619 },
620 kind,
621 }),
622 _ => Err(ReadError {
623 context: ErrorContext::DefaultImpl,
624 kind: ReadErrorKind::MessageKind024(packet),
625 }),
626 }
627 }
628
629 #[cfg(feature = "tokio-tungstenite026")]
630 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite026")))]
631 fn read_ws_sync026(websocket: &mut tungstenite026::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
633 let packet = websocket.read().map_err(|e| ReadError {
634 context: ErrorContext::DefaultImpl,
635 kind: e.into(),
636 })?;
637 match packet {
638 tungstenite026::Message::Text(data) => match data.chars().next() {
639 Some('m') => {
640 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
641 context: ErrorContext::DefaultImpl,
642 kind: e.into(),
643 })?;
644 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
645 context: ErrorContext::DefaultImpl,
646 kind: e.into(),
647 })?;
648 while buf.len() < len {
649 let packet = websocket.read().map_err(|e| ReadError {
650 context: ErrorContext::DefaultImpl,
651 kind: e.into(),
652 })?;
653 if let tungstenite026::Message::Binary(data) = packet {
654 buf.extend_from_slice(&data);
655 } else {
656 return Err(ReadError {
657 context: ErrorContext::DefaultImpl,
658 kind: ReadErrorKind::MessageKind026(packet),
659 })
660 }
661 }
662 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
663 context: ErrorContext::WebSocket {
664 source: Box::new(context),
665 },
666 kind,
667 })
668 }
669 _ => return Err(ReadError {
670 context: ErrorContext::DefaultImpl,
671 kind: ReadErrorKind::WebSocketTextMessage026(data),
672 }),
673 },
674 tungstenite026::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
675 context: ErrorContext::WebSocket {
676 source: Box::new(context),
677 },
678 kind,
679 }),
680 _ => Err(ReadError {
681 context: ErrorContext::DefaultImpl,
682 kind: ReadErrorKind::MessageKind026(packet),
683 }),
684 }
685 }
686
687 #[cfg(feature = "tokio-tungstenite021")]
688 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
689 fn write_ws_sync021(&self, websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
691 let mut buf = Vec::default();
692 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
693 context: ErrorContext::WebSocket {
694 source: Box::new(context),
695 },
696 kind,
697 })?;
698 if buf.len() <= WS_MAX_MESSAGE_SIZE {
699 websocket.send(tungstenite021::Message::binary(buf)).map_err(|e| WriteError {
700 context: ErrorContext::DefaultImpl,
701 kind: e.into(),
702 })?;
703 } else {
704 websocket.send(tungstenite021::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
705 context: ErrorContext::DefaultImpl,
706 kind: e.into(),
707 })?;
708 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
709 websocket.send(tungstenite021::Message::binary(chunk)).map_err(|e| WriteError {
710 context: ErrorContext::DefaultImpl,
711 kind: e.into(),
712 })?;
713 }
714 }
715 websocket.flush().map_err(|e| WriteError {
716 context: ErrorContext::DefaultImpl,
717 kind: e.into(),
718 })?;
719 Ok(())
720 }
721
722 #[cfg(feature = "tokio-tungstenite024")]
723 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
724 fn write_ws_sync024(&self, websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
726 let mut buf = Vec::default();
727 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
728 context: ErrorContext::WebSocket {
729 source: Box::new(context),
730 },
731 kind,
732 })?;
733 if buf.len() <= WS_MAX_MESSAGE_SIZE {
734 websocket.send(tungstenite024::Message::binary(buf)).map_err(|e| WriteError {
735 context: ErrorContext::DefaultImpl,
736 kind: e.into(),
737 })?;
738 } else {
739 websocket.send(tungstenite024::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
740 context: ErrorContext::DefaultImpl,
741 kind: e.into(),
742 })?;
743 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
744 websocket.send(tungstenite024::Message::binary(chunk)).map_err(|e| WriteError {
745 context: ErrorContext::DefaultImpl,
746 kind: e.into(),
747 })?;
748 }
749 }
750 websocket.flush().map_err(|e| WriteError {
751 context: ErrorContext::DefaultImpl,
752 kind: e.into(),
753 })?;
754 Ok(())
755 }
756
757 #[cfg(feature = "tokio-tungstenite026")]
758 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite026")))]
759 fn write_ws_sync026(&self, websocket: &mut tungstenite026::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
761 let mut buf = Vec::default();
762 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
763 context: ErrorContext::WebSocket {
764 source: Box::new(context),
765 },
766 kind,
767 })?;
768 if buf.len() <= WS_MAX_MESSAGE_SIZE {
769 websocket.send(tungstenite026::Message::binary(buf)).map_err(|e| WriteError {
770 context: ErrorContext::DefaultImpl,
771 kind: e.into(),
772 })?;
773 } else {
774 websocket.send(tungstenite026::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
775 context: ErrorContext::DefaultImpl,
776 kind: e.into(),
777 })?;
778 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
779 websocket.send(tungstenite026::Message::binary(tungstenite026::Bytes::copy_from_slice(chunk))).map_err(|e| WriteError {
780 context: ErrorContext::DefaultImpl,
781 kind: e.into(),
782 })?;
783 }
784 }
785 websocket.flush().map_err(|e| WriteError {
786 context: ErrorContext::DefaultImpl,
787 kind: e.into(),
788 })?;
789 Ok(())
790 }
791
792 #[cfg(feature = "tokio-tungstenite021")]
793 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
794 fn read_ws_owned021<R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
798 Box::pin(async move {
799 let value = Self::read_ws021(&mut stream).await?;
800 Ok((stream, value))
801 })
802 }
803
804 #[cfg(feature = "tokio-tungstenite024")]
805 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
806 fn read_ws_owned024<R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
810 Box::pin(async move {
811 let value = Self::read_ws024(&mut stream).await?;
812 Ok((stream, value))
813 })
814 }
815
816 #[cfg(feature = "tokio-tungstenite026")]
817 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite026")))]
818 fn read_ws_owned026<R: Stream<Item = Result<tungstenite026::Message, tungstenite026::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
822 Box::pin(async move {
823 let value = Self::read_ws026(&mut stream).await?;
824 Ok((stream, value))
825 })
826 }
827}
828
829#[cfg(feature = "tokio-tungstenite021")]
833#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
834pub async fn websocket021<R: Protocol, W: Protocol>(request: impl tungstenite021::client::IntoClientRequest + Unpin) -> tungstenite021::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
835 let (sock, _) = tokio_tungstenite021::connect_async(request).await?;
836 let (sink, stream) = sock.split();
837 Ok((
838 sink.sink_map_err(|e| WriteError {
839 context: ErrorContext::WebSocketSink,
840 kind: e.into(),
841 }).with_flat_map::<W, _, _>(|msg| {
842 let mut buf = Vec::default();
843 match msg.write_sync(&mut buf) {
844 Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
845 Either::Left(stream::once(future::ready(tungstenite021::Message::binary(buf))))
846 } else {
847 Either::Right(stream::iter(
848 iter::once(tungstenite021::Message::text(format!("m{}", buf.len())))
849 .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite021::Message::binary))
850 .collect::<Vec<_>>()
851 ))
852 }.map(Ok)),
853 Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
854 context: ErrorContext::WebSocket {
855 source: Box::new(context),
856 },
857 kind,
858 }))),
859 }
860 }),
861 stream.scan(None, |state, res| {
862 fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite021::Result<tungstenite021::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
863 let packet = res.map_err(|e| ReadError {
864 context: ErrorContext::WebSocketStream,
865 kind: e.into(),
866 })?;
867 Ok(if let Some((len, buf)) = state {
868 if let tungstenite021::Message::Binary(data) = packet {
869 buf.extend_from_slice(&data);
870 } else {
871 return Err(ReadError {
872 context: ErrorContext::DefaultImpl,
873 kind: ReadErrorKind::MessageKind021(packet),
874 })
875 }
876 if buf.len() >= *len {
877 let buf = mem::take(buf);
878 *state = None;
879 Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
880 context: ErrorContext::WebSocket {
881 source: Box::new(context),
882 },
883 kind,
884 })?)))
885 } else {
886 Either::Left(stream::empty())
887 }
888 } else {
889 match packet {
890 tungstenite021::Message::Text(data) => match data.chars().next() {
891 Some('m') => {
892 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
893 context: ErrorContext::DefaultImpl,
894 kind: e.into(),
895 })?;
896 let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
897 context: ErrorContext::DefaultImpl,
898 kind: e.into(),
899 })?;
900 *state = Some((len, buf));
901 Either::Left(stream::empty())
902 }
903 _ => return Err(ReadError {
904 context: ErrorContext::DefaultImpl,
905 kind: ReadErrorKind::WebSocketTextMessage024(data),
906 }),
907 },
908 tungstenite021::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
909 context: ErrorContext::WebSocket {
910 source: Box::new(context),
911 },
912 kind,
913 })?))),
914 _ => return Err(ReadError {
915 context: ErrorContext::DefaultImpl,
916 kind: ReadErrorKind::MessageKind021(packet),
917 }),
918 }
919 })
920 }
921
922 future::ready(Some(scanner(state, res)))
923 }).try_flatten(),
924 ))
925}
926
927#[cfg(feature = "tokio-tungstenite024")]
931#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
932pub async fn websocket024<R: Protocol, W: Protocol>(request: impl tungstenite024::client::IntoClientRequest + Unpin) -> tungstenite024::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
933 let (sock, _) = tokio_tungstenite024::connect_async(request).await?;
934 let (sink, stream) = sock.split();
935 Ok((
936 sink.sink_map_err(|e| WriteError {
937 context: ErrorContext::WebSocketSink,
938 kind: e.into(),
939 }).with_flat_map::<W, _, _>(|msg| {
940 let mut buf = Vec::default();
941 match msg.write_sync(&mut buf) {
942 Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
943 Either::Left(stream::once(future::ready(tungstenite024::Message::binary(buf))))
944 } else {
945 Either::Right(stream::iter(
946 iter::once(tungstenite024::Message::text(format!("m{}", buf.len())))
947 .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite024::Message::binary))
948 .collect::<Vec<_>>()
949 ))
950 }.map(Ok)),
951 Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
952 context: ErrorContext::WebSocket {
953 source: Box::new(context),
954 },
955 kind,
956 }))),
957 }
958 }),
959 stream.scan(None, |state, res| {
960 fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite024::Result<tungstenite024::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
961 let packet = res.map_err(|e| ReadError {
962 context: ErrorContext::WebSocketStream,
963 kind: e.into(),
964 })?;
965 Ok(if let Some((len, buf)) = state {
966 if let tungstenite024::Message::Binary(data) = packet {
967 buf.extend_from_slice(&data);
968 } else {
969 return Err(ReadError {
970 context: ErrorContext::DefaultImpl,
971 kind: ReadErrorKind::MessageKind024(packet),
972 })
973 }
974 if buf.len() >= *len {
975 let buf = mem::take(buf);
976 *state = None;
977 Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
978 context: ErrorContext::WebSocket {
979 source: Box::new(context),
980 },
981 kind,
982 })?)))
983 } else {
984 Either::Left(stream::empty())
985 }
986 } else {
987 match packet {
988 tungstenite024::Message::Text(data) => match data.chars().next() {
989 Some('m') => {
990 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
991 context: ErrorContext::DefaultImpl,
992 kind: e.into(),
993 })?;
994 let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
995 context: ErrorContext::DefaultImpl,
996 kind: e.into(),
997 })?;
998 *state = Some((len, buf));
999 Either::Left(stream::empty())
1000 }
1001 _ => return Err(ReadError {
1002 context: ErrorContext::DefaultImpl,
1003 kind: ReadErrorKind::WebSocketTextMessage024(data),
1004 }),
1005 },
1006 tungstenite024::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
1007 context: ErrorContext::WebSocket {
1008 source: Box::new(context),
1009 },
1010 kind,
1011 })?))),
1012 _ => return Err(ReadError {
1013 context: ErrorContext::DefaultImpl,
1014 kind: ReadErrorKind::MessageKind024(packet),
1015 }),
1016 }
1017 })
1018 }
1019
1020 future::ready(Some(scanner(state, res)))
1021 }).try_flatten(),
1022 ))
1023}
1024
1025#[cfg(feature = "tokio-tungstenite026")]
1029#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite026")))]
1030pub async fn websocket026<R: Protocol, W: Protocol>(request: impl tungstenite026::client::IntoClientRequest + Unpin) -> tungstenite026::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
1031 let (sock, _) = tokio_tungstenite026::connect_async(request).await?;
1032 let (sink, stream) = sock.split();
1033 Ok((
1034 sink.sink_map_err(|e| WriteError {
1035 context: ErrorContext::WebSocketSink,
1036 kind: e.into(),
1037 }).with_flat_map::<W, _, _>(|msg| {
1038 let mut buf = Vec::default();
1039 match msg.write_sync(&mut buf) {
1040 Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
1041 Either::Left(stream::once(future::ready(tungstenite026::Message::binary(buf))))
1042 } else {
1043 Either::Right(stream::iter(
1044 iter::once(tungstenite026::Message::text(format!("m{}", buf.len())))
1045 .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(|chunk| tungstenite026::Message::binary(tungstenite026::Bytes::copy_from_slice(chunk))))
1046 .collect::<Vec<_>>()
1047 ))
1048 }.map(Ok)),
1049 Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
1050 context: ErrorContext::WebSocket {
1051 source: Box::new(context),
1052 },
1053 kind,
1054 }))),
1055 }
1056 }),
1057 stream.scan(None, |state, res| {
1058 fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite026::Result<tungstenite026::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
1059 let packet = res.map_err(|e| ReadError {
1060 context: ErrorContext::WebSocketStream,
1061 kind: e.into(),
1062 })?;
1063 Ok(if let Some((len, buf)) = state {
1064 if let tungstenite026::Message::Binary(data) = packet {
1065 buf.extend_from_slice(&data);
1066 } else {
1067 return Err(ReadError {
1068 context: ErrorContext::DefaultImpl,
1069 kind: ReadErrorKind::MessageKind026(packet),
1070 })
1071 }
1072 if buf.len() >= *len {
1073 let buf = mem::take(buf);
1074 *state = None;
1075 Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
1076 context: ErrorContext::WebSocket {
1077 source: Box::new(context),
1078 },
1079 kind,
1080 })?)))
1081 } else {
1082 Either::Left(stream::empty())
1083 }
1084 } else {
1085 match packet {
1086 tungstenite026::Message::Text(data) => match data.chars().next() {
1087 Some('m') => {
1088 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
1089 context: ErrorContext::DefaultImpl,
1090 kind: e.into(),
1091 })?;
1092 let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
1093 context: ErrorContext::DefaultImpl,
1094 kind: e.into(),
1095 })?;
1096 *state = Some((len, buf));
1097 Either::Left(stream::empty())
1098 }
1099 _ => return Err(ReadError {
1100 context: ErrorContext::DefaultImpl,
1101 kind: ReadErrorKind::WebSocketTextMessage026(data),
1102 }),
1103 },
1104 tungstenite026::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
1105 context: ErrorContext::WebSocket {
1106 source: Box::new(context),
1107 },
1108 kind,
1109 })?))),
1110 _ => return Err(ReadError {
1111 context: ErrorContext::DefaultImpl,
1112 kind: ReadErrorKind::MessageKind026(packet),
1113 }),
1114 }
1115 })
1116 }
1117
1118 future::ready(Some(scanner(state, res)))
1119 }).try_flatten(),
1120 ))
1121}