1#![deny(missing_docs, rust_2018_idioms, unused, unused_crate_dependencies, unused_import_braces, unused_lifetimes, unused_qualifications, warnings)]
2#![forbid(unsafe_code)]
3
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6use {
42 std::{
43 future::Future,
44 io::{
45 self,
46 prelude::*,
47 },
48 pin::Pin,
49 },
50 tokio::io::{
51 AsyncRead,
52 AsyncWrite,
53 },
54};
55#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite027"))] use {
56 std::{
57 iter,
58 mem,
59 },
60 fallible_collections::FallibleVec,
61 futures::{
62 Sink,
63 SinkExt as _,
64 future::{
65 self,
66 Either,
67 },
68 stream::{
69 self,
70 Stream,
71 StreamExt as _,
72 TryStreamExt as _,
73 },
74 },
75};
76#[cfg(feature = "tokio-tungstenite021")] use tokio_tungstenite021::tungstenite as tungstenite021;
77#[cfg(feature = "tokio-tungstenite024")] use tokio_tungstenite024::tungstenite as tungstenite024;
78#[cfg(feature = "tokio-tungstenite027")] use tokio_tungstenite027::tungstenite as tungstenite027;
79pub use {
80 async_proto_derive::{
81 Protocol,
82 bitflags,
83 },
84 crate::error::*,
85};
86#[doc(hidden)] pub use tokio; mod error;
89mod impls;
90
/// Size threshold, in bytes, used by the WebSocket helpers in this file.
///
/// Encoded values of at most this many bytes are sent as a single binary message; larger ones are announced with a text message and split into binary chunks of at most this size.
///
/// Equal to 16 MiB. NOTE(review): presumably chosen to match a default size limit of `tungstenite` — confirm.
#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite027"))]
const WS_MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024;
93
/// A type that can be decoded from and encoded to byte streams, both asynchronously and synchronously.
///
/// Implementors supply the four required methods ([`read`](Protocol::read), [`write`](Protocol::write), [`read_sync`](Protocol::read_sync) and [`write_sync`](Protocol::write_sync)); the other methods of this trait have default implementations.
pub trait Protocol: Sized {
    /// Reads a value of this type from the async `stream`.
    ///
    /// Returns a boxed future that resolves to the decoded value or to a [`ReadError`].
    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
    /// Writes this value to the async `sink`.
    ///
    /// Returns a boxed future that resolves to `()` on success or to a [`WriteError`].
    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
    /// Reads a value of this type from the blocking `stream`.
    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError>;
    /// Writes this value to the blocking `sink`.
    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError>;
112
113 fn read_owned<R: AsyncRead + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
117 Box::pin(async move {
118 let value = Self::read(&mut stream).await?;
119 Ok((stream, value))
120 })
121 }
122
123 fn try_read(stream: &mut impl Read, buf: &mut Vec<u8>) -> Result<Option<Self>, ReadError> {
166 let mut temp_buf = vec![0; 8];
167 loop {
168 let mut slice = &mut &**buf;
169 match Self::read_sync(&mut slice) {
170 Ok(value) => {
171 let value_len = slice.len();
172 buf.drain(..buf.len() - value_len);
173 return Ok(Some(value))
174 }
175 Err(ReadError { kind: ReadErrorKind::Io(e), .. }) if e.kind() == io::ErrorKind::UnexpectedEof => {}
176 Err(e) => return Err(e),
177 }
178 match stream.read(&mut temp_buf) {
179 Ok(0) => return Err(ReadError {
180 context: ErrorContext::DefaultImpl,
181 kind: ReadErrorKind::EndOfStream,
182 }),
183 Ok(n) => buf.extend_from_slice(&temp_buf[..n]),
184 Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
185 Err(e) => return Err(ReadError {
186 context: ErrorContext::DefaultImpl,
187 kind: e.into(),
188 }),
189 }
190 }
191 }
192
/// Reads a value of this type from a stream of `tungstenite021` WebSocket messages.
///
/// A binary message is decoded directly with [`Protocol::read_sync`]. A text message of the form `m<len>` announces a payload of `len` bytes that follows as a sequence of binary messages; these are concatenated until at least `len` bytes have arrived and then decoded.
///
/// # Errors
///
/// Fails if the stream ends or yields an error, if a text message does not start with `m` or its length cannot be parsed, if the reassembly buffer cannot be allocated, if a message of an unexpected kind arrives, or if decoding the payload fails.
#[cfg(feature = "tokio-tungstenite021")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
fn read_ws021<'a, R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
    // Attributes a failure to this default implementation.
    fn default_impl(kind: impl Into<ReadErrorKind>) -> ReadError {
        ReadError {
            context: ErrorContext::DefaultImpl,
            kind: kind.into(),
        }
    }

    // Marks a decoding failure as having happened inside a WebSocket payload.
    fn in_websocket(ReadError { context, kind }: ReadError) -> ReadError {
        ReadError {
            context: ErrorContext::WebSocket {
                source: Box::new(context),
            },
            kind,
        }
    }

    Box::pin(async move {
        let Some(first) = stream.try_next().await.map_err(default_impl)? else {
            return Err(default_impl(ReadErrorKind::EndOfStream))
        };
        match first {
            tungstenite021::Message::Binary(payload) => Self::read_sync(&mut &*payload).map_err(in_websocket),
            tungstenite021::Message::Text(header) => {
                if !header.starts_with('m') {
                    return Err(default_impl(ReadErrorKind::WebSocketTextMessage024(header)))
                }
                // Everything after the leading `m` is the announced payload length.
                let expected_len = header[1..].parse::<usize>().map_err(default_impl)?;
                let mut payload = <Vec<u8> as FallibleVec<u8>>::try_with_capacity(expected_len).map_err(default_impl)?;
                while payload.len() < expected_len {
                    let Some(next) = stream.try_next().await.map_err(default_impl)? else {
                        return Err(default_impl(ReadErrorKind::EndOfStream))
                    };
                    match next {
                        tungstenite021::Message::Binary(chunk) => payload.extend_from_slice(&chunk),
                        other => return Err(default_impl(ReadErrorKind::MessageKind021(other))),
                    }
                }
                Self::read_sync(&mut &*payload).map_err(in_websocket)
            }
            other => Err(default_impl(ReadErrorKind::MessageKind021(other))),
        }
    })
}
262
/// Reads a value of this type from a stream of `tungstenite024` WebSocket messages.
///
/// A binary message is decoded directly with [`Protocol::read_sync`]. A text message of the form `m<len>` announces a payload of `len` bytes that follows as a sequence of binary messages; these are concatenated until at least `len` bytes have arrived and then decoded.
///
/// # Errors
///
/// Fails if the stream ends or yields an error, if a text message does not start with `m` or its length cannot be parsed, if the reassembly buffer cannot be allocated, if a message of an unexpected kind arrives, or if decoding the payload fails.
#[cfg(feature = "tokio-tungstenite024")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
fn read_ws024<'a, R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
    // Attributes a failure to this default implementation.
    fn default_impl(kind: impl Into<ReadErrorKind>) -> ReadError {
        ReadError {
            context: ErrorContext::DefaultImpl,
            kind: kind.into(),
        }
    }

    // Marks a decoding failure as having happened inside a WebSocket payload.
    fn in_websocket(ReadError { context, kind }: ReadError) -> ReadError {
        ReadError {
            context: ErrorContext::WebSocket {
                source: Box::new(context),
            },
            kind,
        }
    }

    Box::pin(async move {
        let Some(first) = stream.try_next().await.map_err(default_impl)? else {
            return Err(default_impl(ReadErrorKind::EndOfStream))
        };
        match first {
            tungstenite024::Message::Binary(payload) => Self::read_sync(&mut &*payload).map_err(in_websocket),
            tungstenite024::Message::Text(header) => {
                if !header.starts_with('m') {
                    return Err(default_impl(ReadErrorKind::WebSocketTextMessage024(header)))
                }
                // Everything after the leading `m` is the announced payload length.
                let expected_len = header[1..].parse::<usize>().map_err(default_impl)?;
                let mut payload = <Vec<u8> as FallibleVec<u8>>::try_with_capacity(expected_len).map_err(default_impl)?;
                while payload.len() < expected_len {
                    let Some(next) = stream.try_next().await.map_err(default_impl)? else {
                        return Err(default_impl(ReadErrorKind::EndOfStream))
                    };
                    match next {
                        tungstenite024::Message::Binary(chunk) => payload.extend_from_slice(&chunk),
                        other => return Err(default_impl(ReadErrorKind::MessageKind024(other))),
                    }
                }
                Self::read_sync(&mut &*payload).map_err(in_websocket)
            }
            other => Err(default_impl(ReadErrorKind::MessageKind024(other))),
        }
    })
}
332
/// Reads a value of this type from a stream of `tungstenite027` WebSocket messages.
///
/// A binary message is decoded directly with [`Protocol::read_sync`]. A text message of the form `m<len>` announces a payload of `len` bytes that follows as a sequence of binary messages; these are concatenated until at least `len` bytes have arrived and then decoded.
///
/// # Errors
///
/// Fails if the stream ends or yields an error, if a text message does not start with `m` or its length cannot be parsed, if the reassembly buffer cannot be allocated, if a message of an unexpected kind arrives, or if decoding the payload fails.
#[cfg(feature = "tokio-tungstenite027")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
fn read_ws027<'a, R: Stream<Item = Result<tungstenite027::Message, tungstenite027::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
    // Attributes a failure to this default implementation.
    fn default_impl(kind: impl Into<ReadErrorKind>) -> ReadError {
        ReadError {
            context: ErrorContext::DefaultImpl,
            kind: kind.into(),
        }
    }

    // Marks a decoding failure as having happened inside a WebSocket payload.
    fn in_websocket(ReadError { context, kind }: ReadError) -> ReadError {
        ReadError {
            context: ErrorContext::WebSocket {
                source: Box::new(context),
            },
            kind,
        }
    }

    Box::pin(async move {
        let Some(first) = stream.try_next().await.map_err(default_impl)? else {
            return Err(default_impl(ReadErrorKind::EndOfStream))
        };
        match first {
            tungstenite027::Message::Binary(payload) => Self::read_sync(&mut &*payload).map_err(in_websocket),
            tungstenite027::Message::Text(header) => {
                if !header.starts_with('m') {
                    return Err(default_impl(ReadErrorKind::WebSocketTextMessage027(header)))
                }
                // Everything after the leading `m` is the announced payload length.
                let expected_len = header[1..].parse::<usize>().map_err(default_impl)?;
                let mut payload = <Vec<u8> as FallibleVec<u8>>::try_with_capacity(expected_len).map_err(default_impl)?;
                while payload.len() < expected_len {
                    let Some(next) = stream.try_next().await.map_err(default_impl)? else {
                        return Err(default_impl(ReadErrorKind::EndOfStream))
                    };
                    match next {
                        tungstenite027::Message::Binary(chunk) => payload.extend_from_slice(&chunk),
                        other => return Err(default_impl(ReadErrorKind::MessageKind027(other))),
                    }
                }
                Self::read_sync(&mut &*payload).map_err(in_websocket)
            }
            other => Err(default_impl(ReadErrorKind::MessageKind027(other))),
        }
    })
}
402
/// Writes this value to a sink of `tungstenite021` WebSocket messages.
///
/// The value is first encoded into memory with [`Protocol::write_sync`]. An encoding of at most `WS_MAX_MESSAGE_SIZE` bytes is sent as one binary message; a longer one is announced by the text message `m<len>` and then sent as consecutive binary messages of at most `WS_MAX_MESSAGE_SIZE` bytes each.
///
/// # Errors
///
/// Fails if encoding the value fails or if the sink rejects any of the messages.
#[cfg(feature = "tokio-tungstenite021")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
fn write_ws021<'a, W: Sink<tungstenite021::Message, Error = tungstenite021::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
where Self: Sync {
    // Attributes a sink failure to this default implementation.
    fn send_failed(e: tungstenite021::Error) -> WriteError {
        WriteError {
            context: ErrorContext::DefaultImpl,
            kind: e.into(),
        }
    }

    // Marks an encoding failure as having happened inside a WebSocket payload.
    fn in_websocket(WriteError { context, kind }: WriteError) -> WriteError {
        WriteError {
            context: ErrorContext::WebSocket {
                source: Box::new(context),
            },
            kind,
        }
    }

    Box::pin(async move {
        let mut encoded = Vec::<u8>::new();
        self.write_sync(&mut encoded).map_err(in_websocket)?;
        let total_len = encoded.len();
        if total_len > WS_MAX_MESSAGE_SIZE {
            // Too large for one message: announce the length, then send the pieces.
            sink.send(tungstenite021::Message::text(format!("m{}", total_len))).await.map_err(send_failed)?;
            for chunk in encoded.chunks(WS_MAX_MESSAGE_SIZE) {
                sink.send(tungstenite021::Message::binary(chunk)).await.map_err(send_failed)?;
            }
        } else {
            sink.send(tungstenite021::Message::binary(encoded)).await.map_err(send_failed)?;
        }
        Ok(())
    })
}
440
/// Writes this value to a sink of `tungstenite024` WebSocket messages.
///
/// The value is first encoded into memory with [`Protocol::write_sync`]. An encoding of at most `WS_MAX_MESSAGE_SIZE` bytes is sent as one binary message; a longer one is announced by the text message `m<len>` and then sent as consecutive binary messages of at most `WS_MAX_MESSAGE_SIZE` bytes each.
///
/// # Errors
///
/// Fails if encoding the value fails or if the sink rejects any of the messages.
#[cfg(feature = "tokio-tungstenite024")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
fn write_ws024<'a, W: Sink<tungstenite024::Message, Error = tungstenite024::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
where Self: Sync {
    // Attributes a sink failure to this default implementation.
    fn send_failed(e: tungstenite024::Error) -> WriteError {
        WriteError {
            context: ErrorContext::DefaultImpl,
            kind: e.into(),
        }
    }

    // Marks an encoding failure as having happened inside a WebSocket payload.
    fn in_websocket(WriteError { context, kind }: WriteError) -> WriteError {
        WriteError {
            context: ErrorContext::WebSocket {
                source: Box::new(context),
            },
            kind,
        }
    }

    Box::pin(async move {
        let mut encoded = Vec::<u8>::new();
        self.write_sync(&mut encoded).map_err(in_websocket)?;
        let total_len = encoded.len();
        if total_len > WS_MAX_MESSAGE_SIZE {
            // Too large for one message: announce the length, then send the pieces.
            sink.send(tungstenite024::Message::text(format!("m{}", total_len))).await.map_err(send_failed)?;
            for chunk in encoded.chunks(WS_MAX_MESSAGE_SIZE) {
                sink.send(tungstenite024::Message::binary(chunk)).await.map_err(send_failed)?;
            }
        } else {
            sink.send(tungstenite024::Message::binary(encoded)).await.map_err(send_failed)?;
        }
        Ok(())
    })
}
478
/// Writes this value to a sink of `tungstenite027` WebSocket messages.
///
/// The value is first encoded into memory with [`Protocol::write_sync`]. An encoding of at most `WS_MAX_MESSAGE_SIZE` bytes is sent as one binary message; a longer one is announced by the text message `m<len>` and then sent as consecutive binary messages of at most `WS_MAX_MESSAGE_SIZE` bytes each (every chunk is copied into its own `Bytes` buffer).
///
/// # Errors
///
/// Fails if encoding the value fails or if the sink rejects any of the messages.
#[cfg(feature = "tokio-tungstenite027")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
fn write_ws027<'a, W: Sink<tungstenite027::Message, Error = tungstenite027::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
where Self: Sync {
    // Attributes a sink failure to this default implementation.
    fn send_failed(e: tungstenite027::Error) -> WriteError {
        WriteError {
            context: ErrorContext::DefaultImpl,
            kind: e.into(),
        }
    }

    // Marks an encoding failure as having happened inside a WebSocket payload.
    fn in_websocket(WriteError { context, kind }: WriteError) -> WriteError {
        WriteError {
            context: ErrorContext::WebSocket {
                source: Box::new(context),
            },
            kind,
        }
    }

    Box::pin(async move {
        let mut encoded = Vec::<u8>::new();
        self.write_sync(&mut encoded).map_err(in_websocket)?;
        let total_len = encoded.len();
        if total_len > WS_MAX_MESSAGE_SIZE {
            // Too large for one message: announce the length, then send the pieces.
            sink.send(tungstenite027::Message::text(format!("m{}", total_len))).await.map_err(send_failed)?;
            for chunk in encoded.chunks(WS_MAX_MESSAGE_SIZE) {
                let piece = tungstenite027::Bytes::copy_from_slice(chunk);
                sink.send(tungstenite027::Message::binary(piece)).await.map_err(send_failed)?;
            }
        } else {
            sink.send(tungstenite027::Message::binary(encoded)).await.map_err(send_failed)?;
        }
        Ok(())
    })
}
516
/// Reads a value of this type from a blocking `tungstenite021` WebSocket.
///
/// A binary message is decoded directly with [`Protocol::read_sync`]. A text message of the form `m<len>` announces a payload of `len` bytes that follows as a sequence of binary messages; these are concatenated until at least `len` bytes have arrived and then decoded.
///
/// # Errors
///
/// Fails if reading from the socket fails, if a text message does not start with `m` or its length cannot be parsed, if the reassembly buffer cannot be allocated, if a message of an unexpected kind arrives, or if decoding the payload fails.
#[cfg(feature = "tokio-tungstenite021")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
fn read_ws_sync021(websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
    // Attributes a failure to this default implementation.
    fn default_impl(kind: impl Into<ReadErrorKind>) -> ReadError {
        ReadError {
            context: ErrorContext::DefaultImpl,
            kind: kind.into(),
        }
    }

    // Marks a decoding failure as having happened inside a WebSocket payload.
    fn in_websocket(ReadError { context, kind }: ReadError) -> ReadError {
        ReadError {
            context: ErrorContext::WebSocket {
                source: Box::new(context),
            },
            kind,
        }
    }

    let first = websocket.read().map_err(default_impl)?;
    match first {
        tungstenite021::Message::Binary(payload) => Self::read_sync(&mut &*payload).map_err(in_websocket),
        tungstenite021::Message::Text(header) => {
            if !header.starts_with('m') {
                return Err(default_impl(ReadErrorKind::WebSocketTextMessage024(header)))
            }
            // Everything after the leading `m` is the announced payload length.
            let expected_len = header[1..].parse::<usize>().map_err(default_impl)?;
            let mut payload = <Vec<u8> as FallibleVec<u8>>::try_with_capacity(expected_len).map_err(default_impl)?;
            while payload.len() < expected_len {
                let next = websocket.read().map_err(default_impl)?;
                match next {
                    tungstenite021::Message::Binary(chunk) => payload.extend_from_slice(&chunk),
                    other => return Err(default_impl(ReadErrorKind::MessageKind021(other))),
                }
            }
            Self::read_sync(&mut &*payload).map_err(in_websocket)
        }
        other => Err(default_impl(ReadErrorKind::MessageKind021(other))),
    }
}
574
/// Reads a value of this type from a blocking `tungstenite024` WebSocket.
///
/// A binary message is decoded directly with [`Protocol::read_sync`]. A text message of the form `m<len>` announces a payload of `len` bytes that follows as a sequence of binary messages; these are concatenated until at least `len` bytes have arrived and then decoded.
///
/// # Errors
///
/// Fails if reading from the socket fails, if a text message does not start with `m` or its length cannot be parsed, if the reassembly buffer cannot be allocated, if a message of an unexpected kind arrives, or if decoding the payload fails.
#[cfg(feature = "tokio-tungstenite024")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
fn read_ws_sync024(websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
    // Attributes a failure to this default implementation.
    fn default_impl(kind: impl Into<ReadErrorKind>) -> ReadError {
        ReadError {
            context: ErrorContext::DefaultImpl,
            kind: kind.into(),
        }
    }

    // Marks a decoding failure as having happened inside a WebSocket payload.
    fn in_websocket(ReadError { context, kind }: ReadError) -> ReadError {
        ReadError {
            context: ErrorContext::WebSocket {
                source: Box::new(context),
            },
            kind,
        }
    }

    let first = websocket.read().map_err(default_impl)?;
    match first {
        tungstenite024::Message::Binary(payload) => Self::read_sync(&mut &*payload).map_err(in_websocket),
        tungstenite024::Message::Text(header) => {
            if !header.starts_with('m') {
                return Err(default_impl(ReadErrorKind::WebSocketTextMessage024(header)))
            }
            // Everything after the leading `m` is the announced payload length.
            let expected_len = header[1..].parse::<usize>().map_err(default_impl)?;
            let mut payload = <Vec<u8> as FallibleVec<u8>>::try_with_capacity(expected_len).map_err(default_impl)?;
            while payload.len() < expected_len {
                let next = websocket.read().map_err(default_impl)?;
                match next {
                    tungstenite024::Message::Binary(chunk) => payload.extend_from_slice(&chunk),
                    other => return Err(default_impl(ReadErrorKind::MessageKind024(other))),
                }
            }
            Self::read_sync(&mut &*payload).map_err(in_websocket)
        }
        other => Err(default_impl(ReadErrorKind::MessageKind024(other))),
    }
}
632
/// Reads a value of this type from a blocking `tungstenite027` WebSocket.
///
/// A binary message is decoded directly with [`Protocol::read_sync`]. A text message of the form `m<len>` announces a payload of `len` bytes that follows as a sequence of binary messages; these are concatenated until at least `len` bytes have arrived and then decoded.
///
/// # Errors
///
/// Fails if reading from the socket fails, if a text message does not start with `m` or its length cannot be parsed, if the reassembly buffer cannot be allocated, if a message of an unexpected kind arrives, or if decoding the payload fails.
#[cfg(feature = "tokio-tungstenite027")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
fn read_ws_sync027(websocket: &mut tungstenite027::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
    // Attributes a failure to this default implementation.
    fn default_impl(kind: impl Into<ReadErrorKind>) -> ReadError {
        ReadError {
            context: ErrorContext::DefaultImpl,
            kind: kind.into(),
        }
    }

    // Marks a decoding failure as having happened inside a WebSocket payload.
    fn in_websocket(ReadError { context, kind }: ReadError) -> ReadError {
        ReadError {
            context: ErrorContext::WebSocket {
                source: Box::new(context),
            },
            kind,
        }
    }

    let first = websocket.read().map_err(default_impl)?;
    match first {
        tungstenite027::Message::Binary(payload) => Self::read_sync(&mut &*payload).map_err(in_websocket),
        tungstenite027::Message::Text(header) => {
            if !header.starts_with('m') {
                return Err(default_impl(ReadErrorKind::WebSocketTextMessage027(header)))
            }
            // Everything after the leading `m` is the announced payload length.
            let expected_len = header[1..].parse::<usize>().map_err(default_impl)?;
            let mut payload = <Vec<u8> as FallibleVec<u8>>::try_with_capacity(expected_len).map_err(default_impl)?;
            while payload.len() < expected_len {
                let next = websocket.read().map_err(default_impl)?;
                match next {
                    tungstenite027::Message::Binary(chunk) => payload.extend_from_slice(&chunk),
                    other => return Err(default_impl(ReadErrorKind::MessageKind027(other))),
                }
            }
            Self::read_sync(&mut &*payload).map_err(in_websocket)
        }
        other => Err(default_impl(ReadErrorKind::MessageKind027(other))),
    }
}
690
/// Writes this value to a blocking `tungstenite021` WebSocket and flushes it.
///
/// The value is first encoded into memory with [`Protocol::write_sync`]. An encoding of at most `WS_MAX_MESSAGE_SIZE` bytes is sent as one binary message; a longer one is announced by the text message `m<len>` and then sent as consecutive binary messages of at most `WS_MAX_MESSAGE_SIZE` bytes each.
///
/// # Errors
///
/// Fails if encoding the value fails, or if sending a message or the final flush fails.
#[cfg(feature = "tokio-tungstenite021")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
fn write_ws_sync021(&self, websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
    // Attributes a socket failure to this default implementation.
    fn socket_failed(e: tungstenite021::Error) -> WriteError {
        WriteError {
            context: ErrorContext::DefaultImpl,
            kind: e.into(),
        }
    }

    // Marks an encoding failure as having happened inside a WebSocket payload.
    fn in_websocket(WriteError { context, kind }: WriteError) -> WriteError {
        WriteError {
            context: ErrorContext::WebSocket {
                source: Box::new(context),
            },
            kind,
        }
    }

    let mut encoded = Vec::<u8>::new();
    self.write_sync(&mut encoded).map_err(in_websocket)?;
    let total_len = encoded.len();
    if total_len > WS_MAX_MESSAGE_SIZE {
        // Too large for one message: announce the length, then send the pieces.
        websocket.send(tungstenite021::Message::text(format!("m{}", total_len))).map_err(socket_failed)?;
        for chunk in encoded.chunks(WS_MAX_MESSAGE_SIZE) {
            websocket.send(tungstenite021::Message::binary(chunk)).map_err(socket_failed)?;
        }
    } else {
        websocket.send(tungstenite021::Message::binary(encoded)).map_err(socket_failed)?;
    }
    websocket.flush().map_err(socket_failed)
}
725
/// Writes this value to a blocking `tungstenite024` WebSocket and flushes it.
///
/// The value is first encoded into memory with [`Protocol::write_sync`]. An encoding of at most `WS_MAX_MESSAGE_SIZE` bytes is sent as one binary message; a longer one is announced by the text message `m<len>` and then sent as consecutive binary messages of at most `WS_MAX_MESSAGE_SIZE` bytes each.
///
/// # Errors
///
/// Fails if encoding the value fails, or if sending a message or the final flush fails.
#[cfg(feature = "tokio-tungstenite024")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
fn write_ws_sync024(&self, websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
    // Attributes a socket failure to this default implementation.
    fn socket_failed(e: tungstenite024::Error) -> WriteError {
        WriteError {
            context: ErrorContext::DefaultImpl,
            kind: e.into(),
        }
    }

    // Marks an encoding failure as having happened inside a WebSocket payload.
    fn in_websocket(WriteError { context, kind }: WriteError) -> WriteError {
        WriteError {
            context: ErrorContext::WebSocket {
                source: Box::new(context),
            },
            kind,
        }
    }

    let mut encoded = Vec::<u8>::new();
    self.write_sync(&mut encoded).map_err(in_websocket)?;
    let total_len = encoded.len();
    if total_len > WS_MAX_MESSAGE_SIZE {
        // Too large for one message: announce the length, then send the pieces.
        websocket.send(tungstenite024::Message::text(format!("m{}", total_len))).map_err(socket_failed)?;
        for chunk in encoded.chunks(WS_MAX_MESSAGE_SIZE) {
            websocket.send(tungstenite024::Message::binary(chunk)).map_err(socket_failed)?;
        }
    } else {
        websocket.send(tungstenite024::Message::binary(encoded)).map_err(socket_failed)?;
    }
    websocket.flush().map_err(socket_failed)
}
760
/// Writes this value to a blocking `tungstenite027` WebSocket and flushes it.
///
/// The value is first encoded into memory with [`Protocol::write_sync`]. An encoding of at most `WS_MAX_MESSAGE_SIZE` bytes is sent as one binary message; a longer one is announced by the text message `m<len>` and then sent as consecutive binary messages of at most `WS_MAX_MESSAGE_SIZE` bytes each (every chunk is copied into its own `Bytes` buffer).
///
/// # Errors
///
/// Fails if encoding the value fails, or if sending a message or the final flush fails.
#[cfg(feature = "tokio-tungstenite027")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
fn write_ws_sync027(&self, websocket: &mut tungstenite027::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
    // Attributes a socket failure to this default implementation.
    fn socket_failed(e: tungstenite027::Error) -> WriteError {
        WriteError {
            context: ErrorContext::DefaultImpl,
            kind: e.into(),
        }
    }

    // Marks an encoding failure as having happened inside a WebSocket payload.
    fn in_websocket(WriteError { context, kind }: WriteError) -> WriteError {
        WriteError {
            context: ErrorContext::WebSocket {
                source: Box::new(context),
            },
            kind,
        }
    }

    let mut encoded = Vec::<u8>::new();
    self.write_sync(&mut encoded).map_err(in_websocket)?;
    let total_len = encoded.len();
    if total_len > WS_MAX_MESSAGE_SIZE {
        // Too large for one message: announce the length, then send the pieces.
        websocket.send(tungstenite027::Message::text(format!("m{}", total_len))).map_err(socket_failed)?;
        for chunk in encoded.chunks(WS_MAX_MESSAGE_SIZE) {
            let piece = tungstenite027::Bytes::copy_from_slice(chunk);
            websocket.send(tungstenite027::Message::binary(piece)).map_err(socket_failed)?;
        }
    } else {
        websocket.send(tungstenite027::Message::binary(encoded)).map_err(socket_failed)?;
    }
    websocket.flush().map_err(socket_failed)
}
795
/// Like [`Protocol::read_ws021`], but takes ownership of `stream` and hands it back together with the decoded value.
///
/// On failure only the [`ReadError`] is returned; the stream is dropped.
#[cfg(feature = "tokio-tungstenite021")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
fn read_ws_owned021<R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
    Box::pin(async move {
        let outcome = Self::read_ws021(&mut stream).await;
        // The borrow of `stream` ended with the await, so it can be moved into the result.
        outcome.map(|value| (stream, value))
    })
}
807
/// Like [`Protocol::read_ws024`], but takes ownership of `stream` and hands it back together with the decoded value.
///
/// On failure only the [`ReadError`] is returned; the stream is dropped.
#[cfg(feature = "tokio-tungstenite024")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
fn read_ws_owned024<R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
    Box::pin(async move {
        let outcome = Self::read_ws024(&mut stream).await;
        // The borrow of `stream` ended with the await, so it can be moved into the result.
        outcome.map(|value| (stream, value))
    })
}
819
/// Like [`Protocol::read_ws027`], but takes ownership of `stream` and hands it back together with the decoded value.
///
/// On failure only the [`ReadError`] is returned; the stream is dropped.
#[cfg(feature = "tokio-tungstenite027")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
fn read_ws_owned027<R: Stream<Item = Result<tungstenite027::Message, tungstenite027::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
    Box::pin(async move {
        let outcome = Self::read_ws027(&mut stream).await;
        // The borrow of `stream` ended with the await, so it can be moved into the result.
        outcome.map(|value| (stream, value))
    })
}
831}
832
/// An extension of [`Protocol`] whose read and write methods take an additional `max_len` argument.
///
/// NOTE(review): judging by the names, implementors encode a length prefix and `max_len` presumably bounds the accepted or emitted length — confirm against the implementations, which are not part of this file.
pub trait LengthPrefixed: Protocol {
    /// Reads a value of this type from the async `stream`, given `max_len`.
    ///
    /// Returns a boxed future that resolves to the decoded value or to a [`ReadError`].
    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
    /// Writes this value to the async `sink`, given `max_len`.
    ///
    /// Returns a boxed future that resolves to `()` on success or to a [`WriteError`].
    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
    /// Reads a value of this type from the blocking `stream`, given `max_len`.
    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError>;
    /// Writes this value to the blocking `sink`, given `max_len`.
    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError>;
}
846
/// Connects to the WebSocket endpoint described by `request` via `tokio_tungstenite021` and returns a typed sink/stream pair.
///
/// The sink accepts values of type `W` and the stream yields values of type `R`. Values are framed the same way as in [`Protocol::write_ws021`] and [`Protocol::read_ws021`]: an encoding of at most `WS_MAX_MESSAGE_SIZE` bytes travels as one binary message, a longer one as a text message `m<len>` followed by binary chunks.
///
/// # Errors
///
/// Returns the `tungstenite021` error if establishing the connection fails. Failures after that are reported through the sink's [`WriteError`] and the stream's [`ReadError`] items.
#[cfg(feature = "tokio-tungstenite021")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
pub async fn websocket021<R: Protocol, W: Protocol>(request: impl tungstenite021::client::IntoClientRequest + Unpin) -> tungstenite021::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
    // The HTTP response of the handshake is discarded.
    let (sock, _) = tokio_tungstenite021::connect_async(request).await?;
    let (sink, stream) = sock.split();
    Ok((
        // Sink half: each `W` is expanded into the stream of WebSocket messages that encodes it.
        sink.sink_map_err(|e| WriteError {
            context: ErrorContext::WebSocketSink,
            kind: e.into(),
        }).with_flat_map::<W, _, _>(|msg| {
            let mut buf = Vec::default();
            match msg.write_sync(&mut buf) {
                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
                    // Small enough for a single binary message.
                    Either::Left(stream::once(future::ready(tungstenite021::Message::binary(buf))))
                } else {
                    // Length announcement followed by the chunks; collected so the result does not borrow `buf`.
                    Either::Right(stream::iter(
                        iter::once(tungstenite021::Message::text(format!("m{}", buf.len())))
                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite021::Message::binary))
                        .collect::<Vec<_>>()
                    ))
                }.map(Ok)),
                // Encoding failed: emit a single error item, re-attributed to the WebSocket context.
                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
                    context: ErrorContext::WebSocket {
                        source: Box::new(context),
                    },
                    kind,
                }))),
            }
        }),
        // Stream half: `state` is `Some((announced length, bytes received so far))` while a chunked value is being reassembled.
        stream.scan(None, |state, res| {
            // Handles one incoming message and yields zero or one decoded values.
            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite021::Result<tungstenite021::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
                let packet = res.map_err(|e| ReadError {
                    context: ErrorContext::WebSocketStream,
                    kind: e.into(),
                })?;
                Ok(if let Some((len, buf)) = state {
                    // In the middle of a chunked value: only binary messages are acceptable.
                    if let tungstenite021::Message::Binary(data) = packet {
                        buf.extend_from_slice(&data);
                    } else {
                        return Err(ReadError {
                            context: ErrorContext::DefaultImpl,
                            kind: ReadErrorKind::MessageKind021(packet),
                        })
                    }
                    if buf.len() >= *len {
                        // All announced bytes have arrived: reset the state, then decode.
                        let buf = mem::take(buf);
                        *state = None;
                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
                            context: ErrorContext::WebSocket {
                                source: Box::new(context),
                            },
                            kind,
                        })?)))
                    } else {
                        // More chunks are needed; nothing to yield yet.
                        Either::Left(stream::empty())
                    }
                } else {
                    match packet {
                        tungstenite021::Message::Text(data) => match data.chars().next() {
                            Some('m') => {
                                // `m<len>` announces a chunked value of `len` bytes.
                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
                                    context: ErrorContext::DefaultImpl,
                                    kind: e.into(),
                                })?;
                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
                                    context: ErrorContext::DefaultImpl,
                                    kind: e.into(),
                                })?;
                                *state = Some((len, buf));
                                Either::Left(stream::empty())
                            }
                            _ => return Err(ReadError {
                                context: ErrorContext::DefaultImpl,
                                kind: ReadErrorKind::WebSocketTextMessage024(data),
                            }),
                        },
                        // A lone binary message carries one complete value.
                        tungstenite021::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
                            context: ErrorContext::WebSocket {
                                source: Box::new(context),
                            },
                            kind,
                        })?))),
                        _ => return Err(ReadError {
                            context: ErrorContext::DefaultImpl,
                            kind: ReadErrorKind::MessageKind021(packet),
                        }),
                    }
                })
            }

            // Always `Some`: errors are passed on as items instead of ending the scan.
            future::ready(Some(scanner(state, res)))
        }).try_flatten(),
    ))
}
944
/// Connects to the WebSocket endpoint described by `request` via `tokio_tungstenite024` and returns a typed sink/stream pair.
///
/// The sink accepts values of type `W` and the stream yields values of type `R`. Values are framed the same way as in [`Protocol::write_ws024`] and [`Protocol::read_ws024`]: an encoding of at most `WS_MAX_MESSAGE_SIZE` bytes travels as one binary message, a longer one as a text message `m<len>` followed by binary chunks.
///
/// # Errors
///
/// Returns the `tungstenite024` error if establishing the connection fails. Failures after that are reported through the sink's [`WriteError`] and the stream's [`ReadError`] items.
#[cfg(feature = "tokio-tungstenite024")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
pub async fn websocket024<R: Protocol, W: Protocol>(request: impl tungstenite024::client::IntoClientRequest + Unpin) -> tungstenite024::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
    // The HTTP response of the handshake is discarded.
    let (sock, _) = tokio_tungstenite024::connect_async(request).await?;
    let (sink, stream) = sock.split();
    Ok((
        // Sink half: each `W` is expanded into the stream of WebSocket messages that encodes it.
        sink.sink_map_err(|e| WriteError {
            context: ErrorContext::WebSocketSink,
            kind: e.into(),
        }).with_flat_map::<W, _, _>(|msg| {
            let mut buf = Vec::default();
            match msg.write_sync(&mut buf) {
                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
                    // Small enough for a single binary message.
                    Either::Left(stream::once(future::ready(tungstenite024::Message::binary(buf))))
                } else {
                    // Length announcement followed by the chunks; collected so the result does not borrow `buf`.
                    Either::Right(stream::iter(
                        iter::once(tungstenite024::Message::text(format!("m{}", buf.len())))
                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite024::Message::binary))
                        .collect::<Vec<_>>()
                    ))
                }.map(Ok)),
                // Encoding failed: emit a single error item, re-attributed to the WebSocket context.
                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
                    context: ErrorContext::WebSocket {
                        source: Box::new(context),
                    },
                    kind,
                }))),
            }
        }),
        // Stream half: `state` is `Some((announced length, bytes received so far))` while a chunked value is being reassembled.
        stream.scan(None, |state, res| {
            // Handles one incoming message and yields zero or one decoded values.
            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite024::Result<tungstenite024::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
                let packet = res.map_err(|e| ReadError {
                    context: ErrorContext::WebSocketStream,
                    kind: e.into(),
                })?;
                Ok(if let Some((len, buf)) = state {
                    // In the middle of a chunked value: only binary messages are acceptable.
                    if let tungstenite024::Message::Binary(data) = packet {
                        buf.extend_from_slice(&data);
                    } else {
                        return Err(ReadError {
                            context: ErrorContext::DefaultImpl,
                            kind: ReadErrorKind::MessageKind024(packet),
                        })
                    }
                    if buf.len() >= *len {
                        // All announced bytes have arrived: reset the state, then decode.
                        let buf = mem::take(buf);
                        *state = None;
                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
                            context: ErrorContext::WebSocket {
                                source: Box::new(context),
                            },
                            kind,
                        })?)))
                    } else {
                        // More chunks are needed; nothing to yield yet.
                        Either::Left(stream::empty())
                    }
                } else {
                    match packet {
                        tungstenite024::Message::Text(data) => match data.chars().next() {
                            Some('m') => {
                                // `m<len>` announces a chunked value of `len` bytes.
                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
                                    context: ErrorContext::DefaultImpl,
                                    kind: e.into(),
                                })?;
                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
                                    context: ErrorContext::DefaultImpl,
                                    kind: e.into(),
                                })?;
                                *state = Some((len, buf));
                                Either::Left(stream::empty())
                            }
                            _ => return Err(ReadError {
                                context: ErrorContext::DefaultImpl,
                                kind: ReadErrorKind::WebSocketTextMessage024(data),
                            }),
                        },
                        // A lone binary message carries one complete value.
                        tungstenite024::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
                            context: ErrorContext::WebSocket {
                                source: Box::new(context),
                            },
                            kind,
                        })?))),
                        _ => return Err(ReadError {
                            context: ErrorContext::DefaultImpl,
                            kind: ReadErrorKind::MessageKind024(packet),
                        }),
                    }
                })
            }

            // Always `Some`: errors are passed on as items instead of ending the scan.
            future::ready(Some(scanner(state, res)))
        }).try_flatten(),
    ))
}
1042
/// Connects to the WebSocket endpoint described by `request` via `tokio_tungstenite027` and returns a typed sink/stream pair.
///
/// The sink accepts values of type `W` and the stream yields values of type `R`. Values are framed the same way as in [`Protocol::write_ws027`] and [`Protocol::read_ws027`]: an encoding of at most `WS_MAX_MESSAGE_SIZE` bytes travels as one binary message, a longer one as a text message `m<len>` followed by binary chunks.
///
/// # Errors
///
/// Returns the `tungstenite027` error if establishing the connection fails. Failures after that are reported through the sink's [`WriteError`] and the stream's [`ReadError`] items.
#[cfg(feature = "tokio-tungstenite027")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
pub async fn websocket027<R: Protocol, W: Protocol>(request: impl tungstenite027::client::IntoClientRequest + Unpin) -> tungstenite027::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
    // The HTTP response of the handshake is discarded.
    let (sock, _) = tokio_tungstenite027::connect_async(request).await?;
    let (sink, stream) = sock.split();
    Ok((
        // Sink half: each `W` is expanded into the stream of WebSocket messages that encodes it.
        sink.sink_map_err(|e| WriteError {
            context: ErrorContext::WebSocketSink,
            kind: e.into(),
        }).with_flat_map::<W, _, _>(|msg| {
            let mut buf = Vec::default();
            match msg.write_sync(&mut buf) {
                Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
                    // Small enough for a single binary message.
                    Either::Left(stream::once(future::ready(tungstenite027::Message::binary(buf))))
                } else {
                    // Length announcement followed by the chunks, each copied into its own `Bytes`; collected so the result does not borrow `buf`.
                    Either::Right(stream::iter(
                        iter::once(tungstenite027::Message::text(format!("m{}", buf.len())))
                        .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(|chunk| tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))))
                        .collect::<Vec<_>>()
                    ))
                }.map(Ok)),
                // Encoding failed: emit a single error item, re-attributed to the WebSocket context.
                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
                    context: ErrorContext::WebSocket {
                        source: Box::new(context),
                    },
                    kind,
                }))),
            }
        }),
        // Stream half: `state` is `Some((announced length, bytes received so far))` while a chunked value is being reassembled.
        stream.scan(None, |state, res| {
            // Handles one incoming message and yields zero or one decoded values.
            fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite027::Result<tungstenite027::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
                let packet = res.map_err(|e| ReadError {
                    context: ErrorContext::WebSocketStream,
                    kind: e.into(),
                })?;
                Ok(if let Some((len, buf)) = state {
                    // In the middle of a chunked value: only binary messages are acceptable.
                    if let tungstenite027::Message::Binary(data) = packet {
                        buf.extend_from_slice(&data);
                    } else {
                        return Err(ReadError {
                            context: ErrorContext::DefaultImpl,
                            kind: ReadErrorKind::MessageKind027(packet),
                        })
                    }
                    if buf.len() >= *len {
                        // All announced bytes have arrived: reset the state, then decode.
                        let buf = mem::take(buf);
                        *state = None;
                        Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
                            context: ErrorContext::WebSocket {
                                source: Box::new(context),
                            },
                            kind,
                        })?)))
                    } else {
                        // More chunks are needed; nothing to yield yet.
                        Either::Left(stream::empty())
                    }
                } else {
                    match packet {
                        tungstenite027::Message::Text(data) => match data.chars().next() {
                            Some('m') => {
                                // `m<len>` announces a chunked value of `len` bytes.
                                let len = data[1..].parse::<usize>().map_err(|e| ReadError {
                                    context: ErrorContext::DefaultImpl,
                                    kind: e.into(),
                                })?;
                                let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
                                    context: ErrorContext::DefaultImpl,
                                    kind: e.into(),
                                })?;
                                *state = Some((len, buf));
                                Either::Left(stream::empty())
                            }
                            _ => return Err(ReadError {
                                context: ErrorContext::DefaultImpl,
                                kind: ReadErrorKind::WebSocketTextMessage027(data),
                            }),
                        },
                        // A lone binary message carries one complete value.
                        tungstenite027::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
                            context: ErrorContext::WebSocket {
                                source: Box::new(context),
                            },
                            kind,
                        })?))),
                        _ => return Err(ReadError {
                            context: ErrorContext::DefaultImpl,
                            kind: ReadErrorKind::MessageKind027(packet),
                        }),
                    }
                })
            }

            // Always `Some`: errors are passed on as items instead of ending the scan.
            future::ready(Some(scanner(state, res)))
        }).try_flatten(),
    ))
}