1#![deny(missing_docs, rust_2018_idioms, unused, unused_crate_dependencies, unused_import_braces, unused_lifetimes, unused_qualifications, warnings)]
2#![forbid(unsafe_code)]
3
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6use {
39 std::{
40 future::Future,
41 io::{
42 self,
43 prelude::*,
44 },
45 pin::Pin,
46 },
47 tokio::io::{
48 AsyncRead,
49 AsyncWrite,
50 },
51};
52#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite027"))] use {
53 std::{
54 iter,
55 mem,
56 },
57 fallible_collections::FallibleVec,
58 futures::{
59 Sink,
60 SinkExt as _,
61 future::{
62 self,
63 Either,
64 },
65 stream::{
66 self,
67 Stream,
68 StreamExt as _,
69 TryStreamExt as _,
70 },
71 },
72};
73#[cfg(feature = "tokio-tungstenite021")] use tokio_tungstenite021::tungstenite as tungstenite021;
74#[cfg(feature = "tokio-tungstenite024")] use tokio_tungstenite024::tungstenite as tungstenite024;
75#[cfg(feature = "tokio-tungstenite027")] use tokio_tungstenite027::tungstenite as tungstenite027;
76pub use {
77 async_proto_derive::{
78 Protocol,
79 bitflags,
80 },
81 crate::error::*,
82};
83#[doc(hidden)] pub use tokio; mod error;
86mod impls;
87
// Size threshold in bytes (16777216 = 2^24) used by the WebSocket helpers in this file:
// an encoded value no longer than this is sent as one binary message, while a longer one
// is announced with an `m<length>` text message and split into chunks of at most this size.
#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite027"))] const WS_MAX_MESSAGE_SIZE: usize = 16777216;
90
91pub trait Protocol: Sized {
93 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
99 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
105 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError>;
107 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError>;
109
110 fn read_owned<R: AsyncRead + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
114 Box::pin(async move {
115 let value = Self::read(&mut stream).await?;
116 Ok((stream, value))
117 })
118 }
119
120 fn try_read(stream: &mut impl Read, buf: &mut Vec<u8>) -> Result<Option<Self>, ReadError> {
163 let mut temp_buf = vec![0; 8];
164 loop {
165 let mut slice = &mut &**buf;
166 match Self::read_sync(&mut slice) {
167 Ok(value) => {
168 let value_len = slice.len();
169 buf.drain(..buf.len() - value_len);
170 return Ok(Some(value))
171 }
172 Err(ReadError { kind: ReadErrorKind::Io(e), .. }) if e.kind() == io::ErrorKind::UnexpectedEof => {}
173 Err(e) => return Err(e),
174 }
175 match stream.read(&mut temp_buf) {
176 Ok(0) => return Err(ReadError {
177 context: ErrorContext::DefaultImpl,
178 kind: ReadErrorKind::EndOfStream,
179 }),
180 Ok(n) => buf.extend_from_slice(&temp_buf[..n]),
181 Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
182 Err(e) => return Err(ReadError {
183 context: ErrorContext::DefaultImpl,
184 kind: e.into(),
185 }),
186 }
187 }
188 }
189
190 #[cfg(feature = "tokio-tungstenite021")]
191 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
192 fn read_ws021<'a, R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
198 Box::pin(async move {
199 let packet = stream.try_next().await.map_err(|e| ReadError {
200 context: ErrorContext::DefaultImpl,
201 kind: e.into(),
202 })?.ok_or_else(|| ReadError {
203 context: ErrorContext::DefaultImpl,
204 kind: ReadErrorKind::EndOfStream,
205 })?;
206 match packet {
207 tungstenite021::Message::Text(data) => match data.chars().next() {
208 Some('m') => {
209 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
210 context: ErrorContext::DefaultImpl,
211 kind: e.into(),
212 })?;
213 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
214 context: ErrorContext::DefaultImpl,
215 kind: e.into(),
216 })?;
217 while buf.len() < len {
218 let packet = stream.try_next().await.map_err(|e| ReadError {
219 context: ErrorContext::DefaultImpl,
220 kind: e.into(),
221 })?.ok_or_else(|| ReadError {
222 context: ErrorContext::DefaultImpl,
223 kind: ReadErrorKind::EndOfStream,
224 })?;
225 if let tungstenite021::Message::Binary(data) = packet {
226 buf.extend_from_slice(&data);
227 } else {
228 return Err(ReadError {
229 context: ErrorContext::DefaultImpl,
230 kind: ReadErrorKind::MessageKind021(packet),
231 })
232 }
233 }
234 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
235 context: ErrorContext::WebSocket {
236 source: Box::new(context),
237 },
238 kind,
239 })
240 }
241 _ => Err(ReadError {
242 context: ErrorContext::DefaultImpl,
243 kind: ReadErrorKind::WebSocketTextMessage024(data),
244 }),
245 },
246 tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
247 context: ErrorContext::WebSocket {
248 source: Box::new(context),
249 },
250 kind,
251 }),
252 _ => Err(ReadError {
253 context: ErrorContext::DefaultImpl,
254 kind: ReadErrorKind::MessageKind021(packet),
255 }),
256 }
257 })
258 }
259
260 #[cfg(feature = "tokio-tungstenite024")]
261 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
262 fn read_ws024<'a, R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
268 Box::pin(async move {
269 let packet = stream.try_next().await.map_err(|e| ReadError {
270 context: ErrorContext::DefaultImpl,
271 kind: e.into(),
272 })?.ok_or_else(|| ReadError {
273 context: ErrorContext::DefaultImpl,
274 kind: ReadErrorKind::EndOfStream,
275 })?;
276 match packet {
277 tungstenite024::Message::Text(data) => match data.chars().next() {
278 Some('m') => {
279 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
280 context: ErrorContext::DefaultImpl,
281 kind: e.into(),
282 })?;
283 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
284 context: ErrorContext::DefaultImpl,
285 kind: e.into(),
286 })?;
287 while buf.len() < len {
288 let packet = stream.try_next().await.map_err(|e| ReadError {
289 context: ErrorContext::DefaultImpl,
290 kind: e.into(),
291 })?.ok_or_else(|| ReadError {
292 context: ErrorContext::DefaultImpl,
293 kind: ReadErrorKind::EndOfStream,
294 })?;
295 if let tungstenite024::Message::Binary(data) = packet {
296 buf.extend_from_slice(&data);
297 } else {
298 return Err(ReadError {
299 context: ErrorContext::DefaultImpl,
300 kind: ReadErrorKind::MessageKind024(packet),
301 })
302 }
303 }
304 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
305 context: ErrorContext::WebSocket {
306 source: Box::new(context),
307 },
308 kind,
309 })
310 }
311 _ => Err(ReadError {
312 context: ErrorContext::DefaultImpl,
313 kind: ReadErrorKind::WebSocketTextMessage024(data),
314 }),
315 },
316 tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
317 context: ErrorContext::WebSocket {
318 source: Box::new(context),
319 },
320 kind,
321 }),
322 _ => Err(ReadError {
323 context: ErrorContext::DefaultImpl,
324 kind: ReadErrorKind::MessageKind024(packet),
325 }),
326 }
327 })
328 }
329
330 #[cfg(feature = "tokio-tungstenite027")]
331 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
332 fn read_ws027<'a, R: Stream<Item = Result<tungstenite027::Message, tungstenite027::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
338 Box::pin(async move {
339 let packet = stream.try_next().await.map_err(|e| ReadError {
340 context: ErrorContext::DefaultImpl,
341 kind: e.into(),
342 })?.ok_or_else(|| ReadError {
343 context: ErrorContext::DefaultImpl,
344 kind: ReadErrorKind::EndOfStream,
345 })?;
346 match packet {
347 tungstenite027::Message::Text(data) => match data.chars().next() {
348 Some('m') => {
349 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
350 context: ErrorContext::DefaultImpl,
351 kind: e.into(),
352 })?;
353 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
354 context: ErrorContext::DefaultImpl,
355 kind: e.into(),
356 })?;
357 while buf.len() < len {
358 let packet = stream.try_next().await.map_err(|e| ReadError {
359 context: ErrorContext::DefaultImpl,
360 kind: e.into(),
361 })?.ok_or_else(|| ReadError {
362 context: ErrorContext::DefaultImpl,
363 kind: ReadErrorKind::EndOfStream,
364 })?;
365 if let tungstenite027::Message::Binary(data) = packet {
366 buf.extend_from_slice(&data);
367 } else {
368 return Err(ReadError {
369 context: ErrorContext::DefaultImpl,
370 kind: ReadErrorKind::MessageKind027(packet),
371 })
372 }
373 }
374 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
375 context: ErrorContext::WebSocket {
376 source: Box::new(context),
377 },
378 kind,
379 })
380 }
381 _ => Err(ReadError {
382 context: ErrorContext::DefaultImpl,
383 kind: ReadErrorKind::WebSocketTextMessage027(data),
384 }),
385 },
386 tungstenite027::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
387 context: ErrorContext::WebSocket {
388 source: Box::new(context),
389 },
390 kind,
391 }),
392 _ => Err(ReadError {
393 context: ErrorContext::DefaultImpl,
394 kind: ReadErrorKind::MessageKind027(packet),
395 }),
396 }
397 })
398 }
399
400 #[cfg(feature = "tokio-tungstenite021")]
401 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
402 fn write_ws021<'a, W: Sink<tungstenite021::Message, Error = tungstenite021::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
408 where Self: Sync {
409 Box::pin(async move {
410 let mut buf = Vec::default();
411 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
412 context: ErrorContext::WebSocket {
413 source: Box::new(context),
414 },
415 kind,
416 })?;
417 if buf.len() <= WS_MAX_MESSAGE_SIZE {
418 sink.send(tungstenite021::Message::binary(buf)).await.map_err(|e| WriteError {
419 context: ErrorContext::DefaultImpl,
420 kind: e.into(),
421 })?;
422 } else {
423 sink.send(tungstenite021::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
424 context: ErrorContext::DefaultImpl,
425 kind: e.into(),
426 })?;
427 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
428 sink.send(tungstenite021::Message::binary(chunk)).await.map_err(|e| WriteError {
429 context: ErrorContext::DefaultImpl,
430 kind: e.into(),
431 })?;
432 }
433 }
434 Ok(())
435 })
436 }
437
438 #[cfg(feature = "tokio-tungstenite024")]
439 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
440 fn write_ws024<'a, W: Sink<tungstenite024::Message, Error = tungstenite024::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
446 where Self: Sync {
447 Box::pin(async move {
448 let mut buf = Vec::default();
449 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
450 context: ErrorContext::WebSocket {
451 source: Box::new(context),
452 },
453 kind,
454 })?;
455 if buf.len() <= WS_MAX_MESSAGE_SIZE {
456 sink.send(tungstenite024::Message::binary(buf)).await.map_err(|e| WriteError {
457 context: ErrorContext::DefaultImpl,
458 kind: e.into(),
459 })?;
460 } else {
461 sink.send(tungstenite024::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
462 context: ErrorContext::DefaultImpl,
463 kind: e.into(),
464 })?;
465 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
466 sink.send(tungstenite024::Message::binary(chunk)).await.map_err(|e| WriteError {
467 context: ErrorContext::DefaultImpl,
468 kind: e.into(),
469 })?;
470 }
471 }
472 Ok(())
473 })
474 }
475
476 #[cfg(feature = "tokio-tungstenite027")]
477 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
478 fn write_ws027<'a, W: Sink<tungstenite027::Message, Error = tungstenite027::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
484 where Self: Sync {
485 Box::pin(async move {
486 let mut buf = Vec::default();
487 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
488 context: ErrorContext::WebSocket {
489 source: Box::new(context),
490 },
491 kind,
492 })?;
493 if buf.len() <= WS_MAX_MESSAGE_SIZE {
494 sink.send(tungstenite027::Message::binary(buf)).await.map_err(|e| WriteError {
495 context: ErrorContext::DefaultImpl,
496 kind: e.into(),
497 })?;
498 } else {
499 sink.send(tungstenite027::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
500 context: ErrorContext::DefaultImpl,
501 kind: e.into(),
502 })?;
503 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
504 sink.send(tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))).await.map_err(|e| WriteError {
505 context: ErrorContext::DefaultImpl,
506 kind: e.into(),
507 })?;
508 }
509 }
510 Ok(())
511 })
512 }
513
514 #[cfg(feature = "tokio-tungstenite021")]
515 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
516 fn read_ws_sync021(websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
518 let packet = websocket.read().map_err(|e| ReadError {
519 context: ErrorContext::DefaultImpl,
520 kind: e.into(),
521 })?;
522 match packet {
523 tungstenite021::Message::Text(data) => match data.chars().next() {
524 Some('m') => {
525 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
526 context: ErrorContext::DefaultImpl,
527 kind: e.into(),
528 })?;
529 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
530 context: ErrorContext::DefaultImpl,
531 kind: e.into(),
532 })?;
533 while buf.len() < len {
534 let packet = websocket.read().map_err(|e| ReadError {
535 context: ErrorContext::DefaultImpl,
536 kind: e.into(),
537 })?;
538 if let tungstenite021::Message::Binary(data) = packet {
539 buf.extend_from_slice(&data);
540 } else {
541 return Err(ReadError {
542 context: ErrorContext::DefaultImpl,
543 kind: ReadErrorKind::MessageKind021(packet),
544 })
545 }
546 }
547 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
548 context: ErrorContext::WebSocket {
549 source: Box::new(context),
550 },
551 kind,
552 })
553 }
554 _ => return Err(ReadError {
555 context: ErrorContext::DefaultImpl,
556 kind: ReadErrorKind::WebSocketTextMessage024(data),
557 }),
558 },
559 tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
560 context: ErrorContext::WebSocket {
561 source: Box::new(context),
562 },
563 kind,
564 }),
565 _ => Err(ReadError {
566 context: ErrorContext::DefaultImpl,
567 kind: ReadErrorKind::MessageKind021(packet),
568 }),
569 }
570 }
571
572 #[cfg(feature = "tokio-tungstenite024")]
573 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
574 fn read_ws_sync024(websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
576 let packet = websocket.read().map_err(|e| ReadError {
577 context: ErrorContext::DefaultImpl,
578 kind: e.into(),
579 })?;
580 match packet {
581 tungstenite024::Message::Text(data) => match data.chars().next() {
582 Some('m') => {
583 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
584 context: ErrorContext::DefaultImpl,
585 kind: e.into(),
586 })?;
587 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
588 context: ErrorContext::DefaultImpl,
589 kind: e.into(),
590 })?;
591 while buf.len() < len {
592 let packet = websocket.read().map_err(|e| ReadError {
593 context: ErrorContext::DefaultImpl,
594 kind: e.into(),
595 })?;
596 if let tungstenite024::Message::Binary(data) = packet {
597 buf.extend_from_slice(&data);
598 } else {
599 return Err(ReadError {
600 context: ErrorContext::DefaultImpl,
601 kind: ReadErrorKind::MessageKind024(packet),
602 })
603 }
604 }
605 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
606 context: ErrorContext::WebSocket {
607 source: Box::new(context),
608 },
609 kind,
610 })
611 }
612 _ => return Err(ReadError {
613 context: ErrorContext::DefaultImpl,
614 kind: ReadErrorKind::WebSocketTextMessage024(data),
615 }),
616 },
617 tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
618 context: ErrorContext::WebSocket {
619 source: Box::new(context),
620 },
621 kind,
622 }),
623 _ => Err(ReadError {
624 context: ErrorContext::DefaultImpl,
625 kind: ReadErrorKind::MessageKind024(packet),
626 }),
627 }
628 }
629
630 #[cfg(feature = "tokio-tungstenite027")]
631 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
632 fn read_ws_sync027(websocket: &mut tungstenite027::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
634 let packet = websocket.read().map_err(|e| ReadError {
635 context: ErrorContext::DefaultImpl,
636 kind: e.into(),
637 })?;
638 match packet {
639 tungstenite027::Message::Text(data) => match data.chars().next() {
640 Some('m') => {
641 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
642 context: ErrorContext::DefaultImpl,
643 kind: e.into(),
644 })?;
645 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
646 context: ErrorContext::DefaultImpl,
647 kind: e.into(),
648 })?;
649 while buf.len() < len {
650 let packet = websocket.read().map_err(|e| ReadError {
651 context: ErrorContext::DefaultImpl,
652 kind: e.into(),
653 })?;
654 if let tungstenite027::Message::Binary(data) = packet {
655 buf.extend_from_slice(&data);
656 } else {
657 return Err(ReadError {
658 context: ErrorContext::DefaultImpl,
659 kind: ReadErrorKind::MessageKind027(packet),
660 })
661 }
662 }
663 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
664 context: ErrorContext::WebSocket {
665 source: Box::new(context),
666 },
667 kind,
668 })
669 }
670 _ => return Err(ReadError {
671 context: ErrorContext::DefaultImpl,
672 kind: ReadErrorKind::WebSocketTextMessage027(data),
673 }),
674 },
675 tungstenite027::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
676 context: ErrorContext::WebSocket {
677 source: Box::new(context),
678 },
679 kind,
680 }),
681 _ => Err(ReadError {
682 context: ErrorContext::DefaultImpl,
683 kind: ReadErrorKind::MessageKind027(packet),
684 }),
685 }
686 }
687
688 #[cfg(feature = "tokio-tungstenite021")]
689 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
690 fn write_ws_sync021(&self, websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
692 let mut buf = Vec::default();
693 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
694 context: ErrorContext::WebSocket {
695 source: Box::new(context),
696 },
697 kind,
698 })?;
699 if buf.len() <= WS_MAX_MESSAGE_SIZE {
700 websocket.send(tungstenite021::Message::binary(buf)).map_err(|e| WriteError {
701 context: ErrorContext::DefaultImpl,
702 kind: e.into(),
703 })?;
704 } else {
705 websocket.send(tungstenite021::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
706 context: ErrorContext::DefaultImpl,
707 kind: e.into(),
708 })?;
709 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
710 websocket.send(tungstenite021::Message::binary(chunk)).map_err(|e| WriteError {
711 context: ErrorContext::DefaultImpl,
712 kind: e.into(),
713 })?;
714 }
715 }
716 websocket.flush().map_err(|e| WriteError {
717 context: ErrorContext::DefaultImpl,
718 kind: e.into(),
719 })?;
720 Ok(())
721 }
722
723 #[cfg(feature = "tokio-tungstenite024")]
724 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
725 fn write_ws_sync024(&self, websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
727 let mut buf = Vec::default();
728 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
729 context: ErrorContext::WebSocket {
730 source: Box::new(context),
731 },
732 kind,
733 })?;
734 if buf.len() <= WS_MAX_MESSAGE_SIZE {
735 websocket.send(tungstenite024::Message::binary(buf)).map_err(|e| WriteError {
736 context: ErrorContext::DefaultImpl,
737 kind: e.into(),
738 })?;
739 } else {
740 websocket.send(tungstenite024::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
741 context: ErrorContext::DefaultImpl,
742 kind: e.into(),
743 })?;
744 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
745 websocket.send(tungstenite024::Message::binary(chunk)).map_err(|e| WriteError {
746 context: ErrorContext::DefaultImpl,
747 kind: e.into(),
748 })?;
749 }
750 }
751 websocket.flush().map_err(|e| WriteError {
752 context: ErrorContext::DefaultImpl,
753 kind: e.into(),
754 })?;
755 Ok(())
756 }
757
758 #[cfg(feature = "tokio-tungstenite027")]
759 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
760 fn write_ws_sync027(&self, websocket: &mut tungstenite027::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
762 let mut buf = Vec::default();
763 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
764 context: ErrorContext::WebSocket {
765 source: Box::new(context),
766 },
767 kind,
768 })?;
769 if buf.len() <= WS_MAX_MESSAGE_SIZE {
770 websocket.send(tungstenite027::Message::binary(buf)).map_err(|e| WriteError {
771 context: ErrorContext::DefaultImpl,
772 kind: e.into(),
773 })?;
774 } else {
775 websocket.send(tungstenite027::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
776 context: ErrorContext::DefaultImpl,
777 kind: e.into(),
778 })?;
779 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
780 websocket.send(tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))).map_err(|e| WriteError {
781 context: ErrorContext::DefaultImpl,
782 kind: e.into(),
783 })?;
784 }
785 }
786 websocket.flush().map_err(|e| WriteError {
787 context: ErrorContext::DefaultImpl,
788 kind: e.into(),
789 })?;
790 Ok(())
791 }
792
793 #[cfg(feature = "tokio-tungstenite021")]
794 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
795 fn read_ws_owned021<R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
799 Box::pin(async move {
800 let value = Self::read_ws021(&mut stream).await?;
801 Ok((stream, value))
802 })
803 }
804
805 #[cfg(feature = "tokio-tungstenite024")]
806 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
807 fn read_ws_owned024<R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
811 Box::pin(async move {
812 let value = Self::read_ws024(&mut stream).await?;
813 Ok((stream, value))
814 })
815 }
816
817 #[cfg(feature = "tokio-tungstenite027")]
818 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
819 fn read_ws_owned027<R: Stream<Item = Result<tungstenite027::Message, tungstenite027::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
823 Box::pin(async move {
824 let value = Self::read_ws027(&mut stream).await?;
825 Ok((stream, value))
826 })
827 }
828}
829
/// Extension of `Protocol` whose read and write operations additionally take a `max_len`
/// argument.
///
/// NOTE(review): no implementation is visible here. Judging by the names, the encoding starts
/// with a length prefix and `max_len` bounds the accepted length — confirm against the
/// implementors before relying on this.
pub trait LengthPrefixed: Protocol {
    /// Async counterpart of `Protocol::read` that also receives `max_len`.
    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
    /// Async counterpart of `Protocol::write` that also receives `max_len`.
    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
    /// Sync counterpart of `Protocol::read_sync` that also receives `max_len`.
    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError>;
    /// Sync counterpart of `Protocol::write_sync` that also receives `max_len`.
    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError>;
}
843
/// Opens a WebSocket client connection for `request` via `tokio_tungstenite021` and exposes it
/// as a typed pair: a sink accepting `W` values and a stream yielding `R` values.
///
/// Outgoing values are encoded with `write_sync`. An encoding of at most `WS_MAX_MESSAGE_SIZE`
/// bytes becomes one binary message; a longer one becomes an `m<length>` text message followed
/// by binary chunks of at most `WS_MAX_MESSAGE_SIZE` bytes. Incoming messages are decoded with
/// `read_sync`, reassembling chunked values announced by an `m<length>` text message.
///
/// # Errors
///
/// The returned future fails if the connection cannot be established. Encoding, decoding and
/// transport errors that occur later are reported through the sink and stream.
#[cfg(feature = "tokio-tungstenite021")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
pub async fn websocket021<R: Protocol, W: Protocol>(request: impl tungstenite021::client::IntoClientRequest + Unpin) -> tungstenite021::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
    /// Processes one incoming message. `state` is `Some((expected length, bytes so far))` while
    /// a chunked value is being reassembled. The returned stream yields the decoded value, or
    /// nothing if more chunks are still needed.
    fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite021::Result<tungstenite021::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
        /// Builds an error with the `DefaultImpl` context.
        fn default_impl_error<K: Into<ReadErrorKind>>(kind: K) -> ReadError {
            ReadError {
                context: ErrorContext::DefaultImpl,
                kind: kind.into(),
            }
        }

        /// Nests a decoding error's context inside the `WebSocket` context.
        fn within_websocket(ReadError { context, kind }: ReadError) -> ReadError {
            ReadError {
                context: ErrorContext::WebSocket {
                    source: Box::new(context),
                },
                kind,
            }
        }

        let message = res.map_err(|e| ReadError {
            context: ErrorContext::WebSocketStream,
            kind: e.into(),
        })?;
        let decoded = match state {
            // In the middle of a chunked value: only binary chunks are acceptable.
            Some((expected_len, pending)) => {
                match message {
                    tungstenite021::Message::Binary(chunk) => pending.extend_from_slice(&chunk),
                    other => return Err(default_impl_error(ReadErrorKind::MessageKind021(other))),
                }
                if pending.len() < *expected_len {
                    Either::Left(stream::empty())
                } else {
                    let complete = mem::take(pending);
                    *state = None;
                    Either::Right(stream::once(future::ok(R::read_sync(&mut &*complete).map_err(within_websocket)?)))
                }
            }
            None => match message {
                tungstenite021::Message::Binary(payload) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*payload).map_err(within_websocket)?))),
                tungstenite021::Message::Text(header) => {
                    if !header.starts_with('m') {
                        return Err(default_impl_error(ReadErrorKind::WebSocketTextMessage024(header)))
                    }
                    // Everything after the leading `m` is the announced total length in bytes.
                    let expected_len = header[1..].parse::<usize>().map_err(default_impl_error)?;
                    let pending = FallibleVec::try_with_capacity(expected_len).map_err(default_impl_error)?;
                    *state = Some((expected_len, pending));
                    Either::Left(stream::empty())
                }
                other => return Err(default_impl_error(ReadErrorKind::MessageKind021(other))),
            },
        };
        Ok(decoded)
    }

    let (socket, _) = tokio_tungstenite021::connect_async(request).await?;
    let (raw_sink, raw_stream) = socket.split();
    let typed_sink = raw_sink.sink_map_err(|e| WriteError {
        context: ErrorContext::WebSocketSink,
        kind: e.into(),
    }).with_flat_map::<W, _, _>(|value| {
        let mut payload: Vec<u8> = Vec::default();
        match value.write_sync(&mut payload) {
            Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
                context: ErrorContext::WebSocket {
                    source: Box::new(context),
                },
                kind,
            }))),
            Ok(()) => {
                let messages = if payload.len() > WS_MAX_MESSAGE_SIZE {
                    let announcement = tungstenite021::Message::text(format!("m{}", payload.len()));
                    let chunks = payload.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite021::Message::binary);
                    Either::Right(stream::iter(iter::once(announcement).chain(chunks).collect::<Vec<_>>()))
                } else {
                    Either::Left(stream::once(future::ready(tungstenite021::Message::binary(payload))))
                };
                Either::Left(messages.map(Ok))
            }
        }
    });
    let typed_stream = raw_stream
        .scan(None, |state, res| future::ready(Some(scanner(state, res))))
        .try_flatten();
    Ok((typed_sink, typed_stream))
}
941
/// Opens a WebSocket client connection for `request` via `tokio_tungstenite024` and exposes it
/// as a typed pair: a sink accepting `W` values and a stream yielding `R` values.
///
/// Outgoing values are encoded with `write_sync`. An encoding of at most `WS_MAX_MESSAGE_SIZE`
/// bytes becomes one binary message; a longer one becomes an `m<length>` text message followed
/// by binary chunks of at most `WS_MAX_MESSAGE_SIZE` bytes. Incoming messages are decoded with
/// `read_sync`, reassembling chunked values announced by an `m<length>` text message.
///
/// # Errors
///
/// The returned future fails if the connection cannot be established. Encoding, decoding and
/// transport errors that occur later are reported through the sink and stream.
#[cfg(feature = "tokio-tungstenite024")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
pub async fn websocket024<R: Protocol, W: Protocol>(request: impl tungstenite024::client::IntoClientRequest + Unpin) -> tungstenite024::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
    /// Processes one incoming message. `state` is `Some((expected length, bytes so far))` while
    /// a chunked value is being reassembled. The returned stream yields the decoded value, or
    /// nothing if more chunks are still needed.
    fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite024::Result<tungstenite024::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
        /// Builds an error with the `DefaultImpl` context.
        fn default_impl_error<K: Into<ReadErrorKind>>(kind: K) -> ReadError {
            ReadError {
                context: ErrorContext::DefaultImpl,
                kind: kind.into(),
            }
        }

        /// Nests a decoding error's context inside the `WebSocket` context.
        fn within_websocket(ReadError { context, kind }: ReadError) -> ReadError {
            ReadError {
                context: ErrorContext::WebSocket {
                    source: Box::new(context),
                },
                kind,
            }
        }

        let message = res.map_err(|e| ReadError {
            context: ErrorContext::WebSocketStream,
            kind: e.into(),
        })?;
        let decoded = match state {
            // In the middle of a chunked value: only binary chunks are acceptable.
            Some((expected_len, pending)) => {
                match message {
                    tungstenite024::Message::Binary(chunk) => pending.extend_from_slice(&chunk),
                    other => return Err(default_impl_error(ReadErrorKind::MessageKind024(other))),
                }
                if pending.len() < *expected_len {
                    Either::Left(stream::empty())
                } else {
                    let complete = mem::take(pending);
                    *state = None;
                    Either::Right(stream::once(future::ok(R::read_sync(&mut &*complete).map_err(within_websocket)?)))
                }
            }
            None => match message {
                tungstenite024::Message::Binary(payload) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*payload).map_err(within_websocket)?))),
                tungstenite024::Message::Text(header) => {
                    if !header.starts_with('m') {
                        return Err(default_impl_error(ReadErrorKind::WebSocketTextMessage024(header)))
                    }
                    // Everything after the leading `m` is the announced total length in bytes.
                    let expected_len = header[1..].parse::<usize>().map_err(default_impl_error)?;
                    let pending = FallibleVec::try_with_capacity(expected_len).map_err(default_impl_error)?;
                    *state = Some((expected_len, pending));
                    Either::Left(stream::empty())
                }
                other => return Err(default_impl_error(ReadErrorKind::MessageKind024(other))),
            },
        };
        Ok(decoded)
    }

    let (socket, _) = tokio_tungstenite024::connect_async(request).await?;
    let (raw_sink, raw_stream) = socket.split();
    let typed_sink = raw_sink.sink_map_err(|e| WriteError {
        context: ErrorContext::WebSocketSink,
        kind: e.into(),
    }).with_flat_map::<W, _, _>(|value| {
        let mut payload: Vec<u8> = Vec::default();
        match value.write_sync(&mut payload) {
            Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
                context: ErrorContext::WebSocket {
                    source: Box::new(context),
                },
                kind,
            }))),
            Ok(()) => {
                let messages = if payload.len() > WS_MAX_MESSAGE_SIZE {
                    let announcement = tungstenite024::Message::text(format!("m{}", payload.len()));
                    let chunks = payload.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite024::Message::binary);
                    Either::Right(stream::iter(iter::once(announcement).chain(chunks).collect::<Vec<_>>()))
                } else {
                    Either::Left(stream::once(future::ready(tungstenite024::Message::binary(payload))))
                };
                Either::Left(messages.map(Ok))
            }
        }
    });
    let typed_stream = raw_stream
        .scan(None, |state, res| future::ready(Some(scanner(state, res))))
        .try_flatten();
    Ok((typed_sink, typed_stream))
}
1039
/// Opens a WebSocket client connection for `request` via `tokio_tungstenite027` and exposes it
/// as a typed pair: a sink accepting `W` values and a stream yielding `R` values.
///
/// Outgoing values are encoded with `write_sync`. An encoding of at most `WS_MAX_MESSAGE_SIZE`
/// bytes becomes one binary message; a longer one becomes an `m<length>` text message followed
/// by binary chunks of at most `WS_MAX_MESSAGE_SIZE` bytes. Incoming messages are decoded with
/// `read_sync`, reassembling chunked values announced by an `m<length>` text message.
///
/// # Errors
///
/// The returned future fails if the connection cannot be established. Encoding, decoding and
/// transport errors that occur later are reported through the sink and stream.
#[cfg(feature = "tokio-tungstenite027")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
pub async fn websocket027<R: Protocol, W: Protocol>(request: impl tungstenite027::client::IntoClientRequest + Unpin) -> tungstenite027::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
    /// Processes one incoming message. `state` is `Some((expected length, bytes so far))` while
    /// a chunked value is being reassembled. The returned stream yields the decoded value, or
    /// nothing if more chunks are still needed.
    fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite027::Result<tungstenite027::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
        /// Builds an error with the `DefaultImpl` context.
        fn default_impl_error<K: Into<ReadErrorKind>>(kind: K) -> ReadError {
            ReadError {
                context: ErrorContext::DefaultImpl,
                kind: kind.into(),
            }
        }

        /// Nests a decoding error's context inside the `WebSocket` context.
        fn within_websocket(ReadError { context, kind }: ReadError) -> ReadError {
            ReadError {
                context: ErrorContext::WebSocket {
                    source: Box::new(context),
                },
                kind,
            }
        }

        let message = res.map_err(|e| ReadError {
            context: ErrorContext::WebSocketStream,
            kind: e.into(),
        })?;
        let decoded = match state {
            // In the middle of a chunked value: only binary chunks are acceptable.
            Some((expected_len, pending)) => {
                match message {
                    tungstenite027::Message::Binary(chunk) => pending.extend_from_slice(&chunk),
                    other => return Err(default_impl_error(ReadErrorKind::MessageKind027(other))),
                }
                if pending.len() < *expected_len {
                    Either::Left(stream::empty())
                } else {
                    let complete = mem::take(pending);
                    *state = None;
                    Either::Right(stream::once(future::ok(R::read_sync(&mut &*complete).map_err(within_websocket)?)))
                }
            }
            None => match message {
                tungstenite027::Message::Binary(payload) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*payload).map_err(within_websocket)?))),
                tungstenite027::Message::Text(header) => {
                    if !header.starts_with('m') {
                        return Err(default_impl_error(ReadErrorKind::WebSocketTextMessage027(header)))
                    }
                    // Everything after the leading `m` is the announced total length in bytes.
                    let expected_len = header[1..].parse::<usize>().map_err(default_impl_error)?;
                    let pending = FallibleVec::try_with_capacity(expected_len).map_err(default_impl_error)?;
                    *state = Some((expected_len, pending));
                    Either::Left(stream::empty())
                }
                other => return Err(default_impl_error(ReadErrorKind::MessageKind027(other))),
            },
        };
        Ok(decoded)
    }

    let (socket, _) = tokio_tungstenite027::connect_async(request).await?;
    let (raw_sink, raw_stream) = socket.split();
    let typed_sink = raw_sink.sink_map_err(|e| WriteError {
        context: ErrorContext::WebSocketSink,
        kind: e.into(),
    }).with_flat_map::<W, _, _>(|value| {
        let mut payload: Vec<u8> = Vec::default();
        match value.write_sync(&mut payload) {
            Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
                context: ErrorContext::WebSocket {
                    source: Box::new(context),
                },
                kind,
            }))),
            Ok(()) => {
                let messages = if payload.len() > WS_MAX_MESSAGE_SIZE {
                    let announcement = tungstenite027::Message::text(format!("m{}", payload.len()));
                    let chunks = payload.chunks(WS_MAX_MESSAGE_SIZE).map(|chunk| tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk)));
                    Either::Right(stream::iter(iter::once(announcement).chain(chunks).collect::<Vec<_>>()))
                } else {
                    Either::Left(stream::once(future::ready(tungstenite027::Message::binary(payload))))
                };
                Either::Left(messages.map(Ok))
            }
        }
    });
    let typed_stream = raw_stream
        .scan(None, |state, res| future::ready(Some(scanner(state, res))))
        .try_flatten();
    Ok((typed_sink, typed_stream))
}