1#![deny(missing_docs, rust_2018_idioms, unused, unused_crate_dependencies, unused_import_braces, unused_lifetimes, unused_qualifications, warnings)]
2#![forbid(unsafe_code)]
3
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6use {
41 std::{
42 future::Future,
43 io::{
44 self,
45 prelude::*,
46 },
47 pin::Pin,
48 },
49 tokio::io::{
50 AsyncRead,
51 AsyncWrite,
52 },
53};
54#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite027"))] use {
55 std::{
56 iter,
57 mem,
58 },
59 fallible_collections::FallibleVec,
60 futures::{
61 Sink,
62 SinkExt as _,
63 future::{
64 self,
65 Either,
66 },
67 stream::{
68 self,
69 Stream,
70 StreamExt as _,
71 TryStreamExt as _,
72 },
73 },
74};
75#[cfg(feature = "tokio-tungstenite021")] use tokio_tungstenite021::tungstenite as tungstenite021;
76#[cfg(feature = "tokio-tungstenite024")] use tokio_tungstenite024::tungstenite as tungstenite024;
77#[cfg(feature = "tokio-tungstenite027")] use tokio_tungstenite027::tungstenite as tungstenite027;
78pub use {
79 async_proto_derive::{
80 Protocol,
81 bitflags,
82 },
83 crate::error::*,
84};
85#[doc(hidden)] pub use tokio; mod error;
88mod impls;
89
90#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite027"))] const WS_MAX_MESSAGE_SIZE: usize = 16777216;
92
93pub trait Protocol: Sized {
95 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
101 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
107 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError>;
109 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError>;
111
112 fn read_owned<R: AsyncRead + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
116 Box::pin(async move {
117 let value = Self::read(&mut stream).await?;
118 Ok((stream, value))
119 })
120 }
121
122 fn try_read(stream: &mut impl Read, buf: &mut Vec<u8>) -> Result<Option<Self>, ReadError> {
165 let mut temp_buf = vec![0; 8];
166 loop {
167 let mut slice = &mut &**buf;
168 match Self::read_sync(&mut slice) {
169 Ok(value) => {
170 let value_len = slice.len();
171 buf.drain(..buf.len() - value_len);
172 return Ok(Some(value))
173 }
174 Err(ReadError { kind: ReadErrorKind::Io(e), .. }) if e.kind() == io::ErrorKind::UnexpectedEof => {}
175 Err(e) => return Err(e),
176 }
177 match stream.read(&mut temp_buf) {
178 Ok(0) => return Err(ReadError {
179 context: ErrorContext::DefaultImpl,
180 kind: ReadErrorKind::EndOfStream,
181 }),
182 Ok(n) => buf.extend_from_slice(&temp_buf[..n]),
183 Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
184 Err(e) => return Err(ReadError {
185 context: ErrorContext::DefaultImpl,
186 kind: e.into(),
187 }),
188 }
189 }
190 }
191
192 #[cfg(feature = "tokio-tungstenite021")]
193 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
194 fn read_ws021<'a, R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
200 Box::pin(async move {
201 let packet = stream.try_next().await.map_err(|e| ReadError {
202 context: ErrorContext::DefaultImpl,
203 kind: e.into(),
204 })?.ok_or_else(|| ReadError {
205 context: ErrorContext::DefaultImpl,
206 kind: ReadErrorKind::EndOfStream,
207 })?;
208 match packet {
209 tungstenite021::Message::Text(data) => match data.chars().next() {
210 Some('m') => {
211 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
212 context: ErrorContext::DefaultImpl,
213 kind: e.into(),
214 })?;
215 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
216 context: ErrorContext::DefaultImpl,
217 kind: e.into(),
218 })?;
219 while buf.len() < len {
220 let packet = stream.try_next().await.map_err(|e| ReadError {
221 context: ErrorContext::DefaultImpl,
222 kind: e.into(),
223 })?.ok_or_else(|| ReadError {
224 context: ErrorContext::DefaultImpl,
225 kind: ReadErrorKind::EndOfStream,
226 })?;
227 if let tungstenite021::Message::Binary(data) = packet {
228 buf.extend_from_slice(&data);
229 } else {
230 return Err(ReadError {
231 context: ErrorContext::DefaultImpl,
232 kind: ReadErrorKind::MessageKind021(packet),
233 })
234 }
235 }
236 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
237 context: ErrorContext::WebSocket {
238 source: Box::new(context),
239 },
240 kind,
241 })
242 }
243 _ => Err(ReadError {
244 context: ErrorContext::DefaultImpl,
245 kind: ReadErrorKind::WebSocketTextMessage024(data),
246 }),
247 },
248 tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
249 context: ErrorContext::WebSocket {
250 source: Box::new(context),
251 },
252 kind,
253 }),
254 _ => Err(ReadError {
255 context: ErrorContext::DefaultImpl,
256 kind: ReadErrorKind::MessageKind021(packet),
257 }),
258 }
259 })
260 }
261
262 #[cfg(feature = "tokio-tungstenite024")]
263 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
264 fn read_ws024<'a, R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
270 Box::pin(async move {
271 let packet = stream.try_next().await.map_err(|e| ReadError {
272 context: ErrorContext::DefaultImpl,
273 kind: e.into(),
274 })?.ok_or_else(|| ReadError {
275 context: ErrorContext::DefaultImpl,
276 kind: ReadErrorKind::EndOfStream,
277 })?;
278 match packet {
279 tungstenite024::Message::Text(data) => match data.chars().next() {
280 Some('m') => {
281 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
282 context: ErrorContext::DefaultImpl,
283 kind: e.into(),
284 })?;
285 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
286 context: ErrorContext::DefaultImpl,
287 kind: e.into(),
288 })?;
289 while buf.len() < len {
290 let packet = stream.try_next().await.map_err(|e| ReadError {
291 context: ErrorContext::DefaultImpl,
292 kind: e.into(),
293 })?.ok_or_else(|| ReadError {
294 context: ErrorContext::DefaultImpl,
295 kind: ReadErrorKind::EndOfStream,
296 })?;
297 if let tungstenite024::Message::Binary(data) = packet {
298 buf.extend_from_slice(&data);
299 } else {
300 return Err(ReadError {
301 context: ErrorContext::DefaultImpl,
302 kind: ReadErrorKind::MessageKind024(packet),
303 })
304 }
305 }
306 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
307 context: ErrorContext::WebSocket {
308 source: Box::new(context),
309 },
310 kind,
311 })
312 }
313 _ => Err(ReadError {
314 context: ErrorContext::DefaultImpl,
315 kind: ReadErrorKind::WebSocketTextMessage024(data),
316 }),
317 },
318 tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
319 context: ErrorContext::WebSocket {
320 source: Box::new(context),
321 },
322 kind,
323 }),
324 _ => Err(ReadError {
325 context: ErrorContext::DefaultImpl,
326 kind: ReadErrorKind::MessageKind024(packet),
327 }),
328 }
329 })
330 }
331
332 #[cfg(feature = "tokio-tungstenite027")]
333 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
334 fn read_ws027<'a, R: Stream<Item = Result<tungstenite027::Message, tungstenite027::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
340 Box::pin(async move {
341 let packet = stream.try_next().await.map_err(|e| ReadError {
342 context: ErrorContext::DefaultImpl,
343 kind: e.into(),
344 })?.ok_or_else(|| ReadError {
345 context: ErrorContext::DefaultImpl,
346 kind: ReadErrorKind::EndOfStream,
347 })?;
348 match packet {
349 tungstenite027::Message::Text(data) => match data.chars().next() {
350 Some('m') => {
351 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
352 context: ErrorContext::DefaultImpl,
353 kind: e.into(),
354 })?;
355 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
356 context: ErrorContext::DefaultImpl,
357 kind: e.into(),
358 })?;
359 while buf.len() < len {
360 let packet = stream.try_next().await.map_err(|e| ReadError {
361 context: ErrorContext::DefaultImpl,
362 kind: e.into(),
363 })?.ok_or_else(|| ReadError {
364 context: ErrorContext::DefaultImpl,
365 kind: ReadErrorKind::EndOfStream,
366 })?;
367 if let tungstenite027::Message::Binary(data) = packet {
368 buf.extend_from_slice(&data);
369 } else {
370 return Err(ReadError {
371 context: ErrorContext::DefaultImpl,
372 kind: ReadErrorKind::MessageKind027(packet),
373 })
374 }
375 }
376 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
377 context: ErrorContext::WebSocket {
378 source: Box::new(context),
379 },
380 kind,
381 })
382 }
383 _ => Err(ReadError {
384 context: ErrorContext::DefaultImpl,
385 kind: ReadErrorKind::WebSocketTextMessage027(data),
386 }),
387 },
388 tungstenite027::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
389 context: ErrorContext::WebSocket {
390 source: Box::new(context),
391 },
392 kind,
393 }),
394 _ => Err(ReadError {
395 context: ErrorContext::DefaultImpl,
396 kind: ReadErrorKind::MessageKind027(packet),
397 }),
398 }
399 })
400 }
401
402 #[cfg(feature = "tokio-tungstenite021")]
403 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
404 fn write_ws021<'a, W: Sink<tungstenite021::Message, Error = tungstenite021::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
410 where Self: Sync {
411 Box::pin(async move {
412 let mut buf = Vec::default();
413 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
414 context: ErrorContext::WebSocket {
415 source: Box::new(context),
416 },
417 kind,
418 })?;
419 if buf.len() <= WS_MAX_MESSAGE_SIZE {
420 sink.send(tungstenite021::Message::binary(buf)).await.map_err(|e| WriteError {
421 context: ErrorContext::DefaultImpl,
422 kind: e.into(),
423 })?;
424 } else {
425 sink.send(tungstenite021::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
426 context: ErrorContext::DefaultImpl,
427 kind: e.into(),
428 })?;
429 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
430 sink.send(tungstenite021::Message::binary(chunk)).await.map_err(|e| WriteError {
431 context: ErrorContext::DefaultImpl,
432 kind: e.into(),
433 })?;
434 }
435 }
436 Ok(())
437 })
438 }
439
440 #[cfg(feature = "tokio-tungstenite024")]
441 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
442 fn write_ws024<'a, W: Sink<tungstenite024::Message, Error = tungstenite024::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
448 where Self: Sync {
449 Box::pin(async move {
450 let mut buf = Vec::default();
451 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
452 context: ErrorContext::WebSocket {
453 source: Box::new(context),
454 },
455 kind,
456 })?;
457 if buf.len() <= WS_MAX_MESSAGE_SIZE {
458 sink.send(tungstenite024::Message::binary(buf)).await.map_err(|e| WriteError {
459 context: ErrorContext::DefaultImpl,
460 kind: e.into(),
461 })?;
462 } else {
463 sink.send(tungstenite024::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
464 context: ErrorContext::DefaultImpl,
465 kind: e.into(),
466 })?;
467 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
468 sink.send(tungstenite024::Message::binary(chunk)).await.map_err(|e| WriteError {
469 context: ErrorContext::DefaultImpl,
470 kind: e.into(),
471 })?;
472 }
473 }
474 Ok(())
475 })
476 }
477
478 #[cfg(feature = "tokio-tungstenite027")]
479 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
480 fn write_ws027<'a, W: Sink<tungstenite027::Message, Error = tungstenite027::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
486 where Self: Sync {
487 Box::pin(async move {
488 let mut buf = Vec::default();
489 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
490 context: ErrorContext::WebSocket {
491 source: Box::new(context),
492 },
493 kind,
494 })?;
495 if buf.len() <= WS_MAX_MESSAGE_SIZE {
496 sink.send(tungstenite027::Message::binary(buf)).await.map_err(|e| WriteError {
497 context: ErrorContext::DefaultImpl,
498 kind: e.into(),
499 })?;
500 } else {
501 sink.send(tungstenite027::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
502 context: ErrorContext::DefaultImpl,
503 kind: e.into(),
504 })?;
505 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
506 sink.send(tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))).await.map_err(|e| WriteError {
507 context: ErrorContext::DefaultImpl,
508 kind: e.into(),
509 })?;
510 }
511 }
512 Ok(())
513 })
514 }
515
516 #[cfg(feature = "tokio-tungstenite021")]
517 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
518 fn read_ws_sync021(websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
520 let packet = websocket.read().map_err(|e| ReadError {
521 context: ErrorContext::DefaultImpl,
522 kind: e.into(),
523 })?;
524 match packet {
525 tungstenite021::Message::Text(data) => match data.chars().next() {
526 Some('m') => {
527 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
528 context: ErrorContext::DefaultImpl,
529 kind: e.into(),
530 })?;
531 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
532 context: ErrorContext::DefaultImpl,
533 kind: e.into(),
534 })?;
535 while buf.len() < len {
536 let packet = websocket.read().map_err(|e| ReadError {
537 context: ErrorContext::DefaultImpl,
538 kind: e.into(),
539 })?;
540 if let tungstenite021::Message::Binary(data) = packet {
541 buf.extend_from_slice(&data);
542 } else {
543 return Err(ReadError {
544 context: ErrorContext::DefaultImpl,
545 kind: ReadErrorKind::MessageKind021(packet),
546 })
547 }
548 }
549 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
550 context: ErrorContext::WebSocket {
551 source: Box::new(context),
552 },
553 kind,
554 })
555 }
556 _ => return Err(ReadError {
557 context: ErrorContext::DefaultImpl,
558 kind: ReadErrorKind::WebSocketTextMessage024(data),
559 }),
560 },
561 tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
562 context: ErrorContext::WebSocket {
563 source: Box::new(context),
564 },
565 kind,
566 }),
567 _ => Err(ReadError {
568 context: ErrorContext::DefaultImpl,
569 kind: ReadErrorKind::MessageKind021(packet),
570 }),
571 }
572 }
573
574 #[cfg(feature = "tokio-tungstenite024")]
575 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
576 fn read_ws_sync024(websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
578 let packet = websocket.read().map_err(|e| ReadError {
579 context: ErrorContext::DefaultImpl,
580 kind: e.into(),
581 })?;
582 match packet {
583 tungstenite024::Message::Text(data) => match data.chars().next() {
584 Some('m') => {
585 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
586 context: ErrorContext::DefaultImpl,
587 kind: e.into(),
588 })?;
589 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
590 context: ErrorContext::DefaultImpl,
591 kind: e.into(),
592 })?;
593 while buf.len() < len {
594 let packet = websocket.read().map_err(|e| ReadError {
595 context: ErrorContext::DefaultImpl,
596 kind: e.into(),
597 })?;
598 if let tungstenite024::Message::Binary(data) = packet {
599 buf.extend_from_slice(&data);
600 } else {
601 return Err(ReadError {
602 context: ErrorContext::DefaultImpl,
603 kind: ReadErrorKind::MessageKind024(packet),
604 })
605 }
606 }
607 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
608 context: ErrorContext::WebSocket {
609 source: Box::new(context),
610 },
611 kind,
612 })
613 }
614 _ => return Err(ReadError {
615 context: ErrorContext::DefaultImpl,
616 kind: ReadErrorKind::WebSocketTextMessage024(data),
617 }),
618 },
619 tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
620 context: ErrorContext::WebSocket {
621 source: Box::new(context),
622 },
623 kind,
624 }),
625 _ => Err(ReadError {
626 context: ErrorContext::DefaultImpl,
627 kind: ReadErrorKind::MessageKind024(packet),
628 }),
629 }
630 }
631
632 #[cfg(feature = "tokio-tungstenite027")]
633 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
634 fn read_ws_sync027(websocket: &mut tungstenite027::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
636 let packet = websocket.read().map_err(|e| ReadError {
637 context: ErrorContext::DefaultImpl,
638 kind: e.into(),
639 })?;
640 match packet {
641 tungstenite027::Message::Text(data) => match data.chars().next() {
642 Some('m') => {
643 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
644 context: ErrorContext::DefaultImpl,
645 kind: e.into(),
646 })?;
647 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
648 context: ErrorContext::DefaultImpl,
649 kind: e.into(),
650 })?;
651 while buf.len() < len {
652 let packet = websocket.read().map_err(|e| ReadError {
653 context: ErrorContext::DefaultImpl,
654 kind: e.into(),
655 })?;
656 if let tungstenite027::Message::Binary(data) = packet {
657 buf.extend_from_slice(&data);
658 } else {
659 return Err(ReadError {
660 context: ErrorContext::DefaultImpl,
661 kind: ReadErrorKind::MessageKind027(packet),
662 })
663 }
664 }
665 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
666 context: ErrorContext::WebSocket {
667 source: Box::new(context),
668 },
669 kind,
670 })
671 }
672 _ => return Err(ReadError {
673 context: ErrorContext::DefaultImpl,
674 kind: ReadErrorKind::WebSocketTextMessage027(data),
675 }),
676 },
677 tungstenite027::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
678 context: ErrorContext::WebSocket {
679 source: Box::new(context),
680 },
681 kind,
682 }),
683 _ => Err(ReadError {
684 context: ErrorContext::DefaultImpl,
685 kind: ReadErrorKind::MessageKind027(packet),
686 }),
687 }
688 }
689
690 #[cfg(feature = "tokio-tungstenite021")]
691 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
692 fn write_ws_sync021(&self, websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
694 let mut buf = Vec::default();
695 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
696 context: ErrorContext::WebSocket {
697 source: Box::new(context),
698 },
699 kind,
700 })?;
701 if buf.len() <= WS_MAX_MESSAGE_SIZE {
702 websocket.send(tungstenite021::Message::binary(buf)).map_err(|e| WriteError {
703 context: ErrorContext::DefaultImpl,
704 kind: e.into(),
705 })?;
706 } else {
707 websocket.send(tungstenite021::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
708 context: ErrorContext::DefaultImpl,
709 kind: e.into(),
710 })?;
711 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
712 websocket.send(tungstenite021::Message::binary(chunk)).map_err(|e| WriteError {
713 context: ErrorContext::DefaultImpl,
714 kind: e.into(),
715 })?;
716 }
717 }
718 websocket.flush().map_err(|e| WriteError {
719 context: ErrorContext::DefaultImpl,
720 kind: e.into(),
721 })?;
722 Ok(())
723 }
724
725 #[cfg(feature = "tokio-tungstenite024")]
726 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
727 fn write_ws_sync024(&self, websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
729 let mut buf = Vec::default();
730 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
731 context: ErrorContext::WebSocket {
732 source: Box::new(context),
733 },
734 kind,
735 })?;
736 if buf.len() <= WS_MAX_MESSAGE_SIZE {
737 websocket.send(tungstenite024::Message::binary(buf)).map_err(|e| WriteError {
738 context: ErrorContext::DefaultImpl,
739 kind: e.into(),
740 })?;
741 } else {
742 websocket.send(tungstenite024::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
743 context: ErrorContext::DefaultImpl,
744 kind: e.into(),
745 })?;
746 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
747 websocket.send(tungstenite024::Message::binary(chunk)).map_err(|e| WriteError {
748 context: ErrorContext::DefaultImpl,
749 kind: e.into(),
750 })?;
751 }
752 }
753 websocket.flush().map_err(|e| WriteError {
754 context: ErrorContext::DefaultImpl,
755 kind: e.into(),
756 })?;
757 Ok(())
758 }
759
760 #[cfg(feature = "tokio-tungstenite027")]
761 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
762 fn write_ws_sync027(&self, websocket: &mut tungstenite027::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
764 let mut buf = Vec::default();
765 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
766 context: ErrorContext::WebSocket {
767 source: Box::new(context),
768 },
769 kind,
770 })?;
771 if buf.len() <= WS_MAX_MESSAGE_SIZE {
772 websocket.send(tungstenite027::Message::binary(buf)).map_err(|e| WriteError {
773 context: ErrorContext::DefaultImpl,
774 kind: e.into(),
775 })?;
776 } else {
777 websocket.send(tungstenite027::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
778 context: ErrorContext::DefaultImpl,
779 kind: e.into(),
780 })?;
781 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
782 websocket.send(tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))).map_err(|e| WriteError {
783 context: ErrorContext::DefaultImpl,
784 kind: e.into(),
785 })?;
786 }
787 }
788 websocket.flush().map_err(|e| WriteError {
789 context: ErrorContext::DefaultImpl,
790 kind: e.into(),
791 })?;
792 Ok(())
793 }
794
795 #[cfg(feature = "tokio-tungstenite021")]
796 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
797 fn read_ws_owned021<R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
801 Box::pin(async move {
802 let value = Self::read_ws021(&mut stream).await?;
803 Ok((stream, value))
804 })
805 }
806
807 #[cfg(feature = "tokio-tungstenite024")]
808 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
809 fn read_ws_owned024<R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
813 Box::pin(async move {
814 let value = Self::read_ws024(&mut stream).await?;
815 Ok((stream, value))
816 })
817 }
818
819 #[cfg(feature = "tokio-tungstenite027")]
820 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
821 fn read_ws_owned027<R: Stream<Item = Result<tungstenite027::Message, tungstenite027::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
825 Box::pin(async move {
826 let value = Self::read_ws027(&mut stream).await?;
827 Ok((stream, value))
828 })
829 }
830}
831
832pub trait LengthPrefixed: Protocol {
836 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
838 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
840 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError>;
842 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError>;
844}
845
846#[cfg(feature = "tokio-tungstenite021")]
850#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
851pub async fn websocket021<R: Protocol, W: Protocol>(request: impl tungstenite021::client::IntoClientRequest + Unpin) -> tungstenite021::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
852 let (sock, _) = tokio_tungstenite021::connect_async(request).await?;
853 let (sink, stream) = sock.split();
854 Ok((
855 sink.sink_map_err(|e| WriteError {
856 context: ErrorContext::WebSocketSink,
857 kind: e.into(),
858 }).with_flat_map::<W, _, _>(|msg| {
859 let mut buf = Vec::default();
860 match msg.write_sync(&mut buf) {
861 Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
862 Either::Left(stream::once(future::ready(tungstenite021::Message::binary(buf))))
863 } else {
864 Either::Right(stream::iter(
865 iter::once(tungstenite021::Message::text(format!("m{}", buf.len())))
866 .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite021::Message::binary))
867 .collect::<Vec<_>>()
868 ))
869 }.map(Ok)),
870 Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
871 context: ErrorContext::WebSocket {
872 source: Box::new(context),
873 },
874 kind,
875 }))),
876 }
877 }),
878 stream.scan(None, |state, res| {
879 fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite021::Result<tungstenite021::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
880 let packet = res.map_err(|e| ReadError {
881 context: ErrorContext::WebSocketStream,
882 kind: e.into(),
883 })?;
884 Ok(if let Some((len, buf)) = state {
885 if let tungstenite021::Message::Binary(data) = packet {
886 buf.extend_from_slice(&data);
887 } else {
888 return Err(ReadError {
889 context: ErrorContext::DefaultImpl,
890 kind: ReadErrorKind::MessageKind021(packet),
891 })
892 }
893 if buf.len() >= *len {
894 let buf = mem::take(buf);
895 *state = None;
896 Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
897 context: ErrorContext::WebSocket {
898 source: Box::new(context),
899 },
900 kind,
901 })?)))
902 } else {
903 Either::Left(stream::empty())
904 }
905 } else {
906 match packet {
907 tungstenite021::Message::Text(data) => match data.chars().next() {
908 Some('m') => {
909 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
910 context: ErrorContext::DefaultImpl,
911 kind: e.into(),
912 })?;
913 let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
914 context: ErrorContext::DefaultImpl,
915 kind: e.into(),
916 })?;
917 *state = Some((len, buf));
918 Either::Left(stream::empty())
919 }
920 _ => return Err(ReadError {
921 context: ErrorContext::DefaultImpl,
922 kind: ReadErrorKind::WebSocketTextMessage024(data),
923 }),
924 },
925 tungstenite021::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
926 context: ErrorContext::WebSocket {
927 source: Box::new(context),
928 },
929 kind,
930 })?))),
931 _ => return Err(ReadError {
932 context: ErrorContext::DefaultImpl,
933 kind: ReadErrorKind::MessageKind021(packet),
934 }),
935 }
936 })
937 }
938
939 future::ready(Some(scanner(state, res)))
940 }).try_flatten(),
941 ))
942}
943
944#[cfg(feature = "tokio-tungstenite024")]
948#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
949pub async fn websocket024<R: Protocol, W: Protocol>(request: impl tungstenite024::client::IntoClientRequest + Unpin) -> tungstenite024::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
950 let (sock, _) = tokio_tungstenite024::connect_async(request).await?;
951 let (sink, stream) = sock.split();
952 Ok((
953 sink.sink_map_err(|e| WriteError {
954 context: ErrorContext::WebSocketSink,
955 kind: e.into(),
956 }).with_flat_map::<W, _, _>(|msg| {
957 let mut buf = Vec::default();
958 match msg.write_sync(&mut buf) {
959 Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
960 Either::Left(stream::once(future::ready(tungstenite024::Message::binary(buf))))
961 } else {
962 Either::Right(stream::iter(
963 iter::once(tungstenite024::Message::text(format!("m{}", buf.len())))
964 .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite024::Message::binary))
965 .collect::<Vec<_>>()
966 ))
967 }.map(Ok)),
968 Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
969 context: ErrorContext::WebSocket {
970 source: Box::new(context),
971 },
972 kind,
973 }))),
974 }
975 }),
976 stream.scan(None, |state, res| {
977 fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite024::Result<tungstenite024::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
978 let packet = res.map_err(|e| ReadError {
979 context: ErrorContext::WebSocketStream,
980 kind: e.into(),
981 })?;
982 Ok(if let Some((len, buf)) = state {
983 if let tungstenite024::Message::Binary(data) = packet {
984 buf.extend_from_slice(&data);
985 } else {
986 return Err(ReadError {
987 context: ErrorContext::DefaultImpl,
988 kind: ReadErrorKind::MessageKind024(packet),
989 })
990 }
991 if buf.len() >= *len {
992 let buf = mem::take(buf);
993 *state = None;
994 Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
995 context: ErrorContext::WebSocket {
996 source: Box::new(context),
997 },
998 kind,
999 })?)))
1000 } else {
1001 Either::Left(stream::empty())
1002 }
1003 } else {
1004 match packet {
1005 tungstenite024::Message::Text(data) => match data.chars().next() {
1006 Some('m') => {
1007 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
1008 context: ErrorContext::DefaultImpl,
1009 kind: e.into(),
1010 })?;
1011 let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
1012 context: ErrorContext::DefaultImpl,
1013 kind: e.into(),
1014 })?;
1015 *state = Some((len, buf));
1016 Either::Left(stream::empty())
1017 }
1018 _ => return Err(ReadError {
1019 context: ErrorContext::DefaultImpl,
1020 kind: ReadErrorKind::WebSocketTextMessage024(data),
1021 }),
1022 },
1023 tungstenite024::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
1024 context: ErrorContext::WebSocket {
1025 source: Box::new(context),
1026 },
1027 kind,
1028 })?))),
1029 _ => return Err(ReadError {
1030 context: ErrorContext::DefaultImpl,
1031 kind: ReadErrorKind::MessageKind024(packet),
1032 }),
1033 }
1034 })
1035 }
1036
1037 future::ready(Some(scanner(state, res)))
1038 }).try_flatten(),
1039 ))
1040}
1041
1042#[cfg(feature = "tokio-tungstenite027")]
1046#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
1047pub async fn websocket027<R: Protocol, W: Protocol>(request: impl tungstenite027::client::IntoClientRequest + Unpin) -> tungstenite027::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
1048 let (sock, _) = tokio_tungstenite027::connect_async(request).await?;
1049 let (sink, stream) = sock.split();
1050 Ok((
1051 sink.sink_map_err(|e| WriteError {
1052 context: ErrorContext::WebSocketSink,
1053 kind: e.into(),
1054 }).with_flat_map::<W, _, _>(|msg| {
1055 let mut buf = Vec::default();
1056 match msg.write_sync(&mut buf) {
1057 Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
1058 Either::Left(stream::once(future::ready(tungstenite027::Message::binary(buf))))
1059 } else {
1060 Either::Right(stream::iter(
1061 iter::once(tungstenite027::Message::text(format!("m{}", buf.len())))
1062 .chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(|chunk| tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))))
1063 .collect::<Vec<_>>()
1064 ))
1065 }.map(Ok)),
1066 Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
1067 context: ErrorContext::WebSocket {
1068 source: Box::new(context),
1069 },
1070 kind,
1071 }))),
1072 }
1073 }),
1074 stream.scan(None, |state, res| {
1075 fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite027::Result<tungstenite027::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
1076 let packet = res.map_err(|e| ReadError {
1077 context: ErrorContext::WebSocketStream,
1078 kind: e.into(),
1079 })?;
1080 Ok(if let Some((len, buf)) = state {
1081 if let tungstenite027::Message::Binary(data) = packet {
1082 buf.extend_from_slice(&data);
1083 } else {
1084 return Err(ReadError {
1085 context: ErrorContext::DefaultImpl,
1086 kind: ReadErrorKind::MessageKind027(packet),
1087 })
1088 }
1089 if buf.len() >= *len {
1090 let buf = mem::take(buf);
1091 *state = None;
1092 Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
1093 context: ErrorContext::WebSocket {
1094 source: Box::new(context),
1095 },
1096 kind,
1097 })?)))
1098 } else {
1099 Either::Left(stream::empty())
1100 }
1101 } else {
1102 match packet {
1103 tungstenite027::Message::Text(data) => match data.chars().next() {
1104 Some('m') => {
1105 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
1106 context: ErrorContext::DefaultImpl,
1107 kind: e.into(),
1108 })?;
1109 let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
1110 context: ErrorContext::DefaultImpl,
1111 kind: e.into(),
1112 })?;
1113 *state = Some((len, buf));
1114 Either::Left(stream::empty())
1115 }
1116 _ => return Err(ReadError {
1117 context: ErrorContext::DefaultImpl,
1118 kind: ReadErrorKind::WebSocketTextMessage027(data),
1119 }),
1120 },
1121 tungstenite027::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
1122 context: ErrorContext::WebSocket {
1123 source: Box::new(context),
1124 },
1125 kind,
1126 })?))),
1127 _ => return Err(ReadError {
1128 context: ErrorContext::DefaultImpl,
1129 kind: ReadErrorKind::MessageKind027(packet),
1130 }),
1131 }
1132 })
1133 }
1134
1135 future::ready(Some(scanner(state, res)))
1136 }).try_flatten(),
1137 ))
1138}