1#![deny(missing_docs, rust_2018_idioms, unused, unused_crate_dependencies, unused_import_braces, unused_lifetimes, unused_qualifications, warnings)]
2#![forbid(unsafe_code)]
3
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6use {
40 std::{
41 future::Future,
42 io::{
43 self,
44 prelude::*,
45 },
46 pin::Pin,
47 },
48 tokio::io::{
49 AsyncRead,
50 AsyncWrite,
51 },
52};
53#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite027"))] use {
54 std::{
55 iter,
56 mem,
57 },
58 fallible_collections::FallibleVec,
59 futures::{
60 Sink,
61 SinkExt as _,
62 future::{
63 self,
64 Either,
65 },
66 stream::{
67 self,
68 Stream,
69 StreamExt as _,
70 TryStreamExt as _,
71 },
72 },
73};
74#[cfg(feature = "tokio-tungstenite021")] use tokio_tungstenite021::tungstenite as tungstenite021;
75#[cfg(feature = "tokio-tungstenite024")] use tokio_tungstenite024::tungstenite as tungstenite024;
76#[cfg(feature = "tokio-tungstenite027")] use tokio_tungstenite027::tungstenite as tungstenite027;
77pub use {
78 async_proto_derive::{
79 Protocol,
80 bitflags,
81 },
82 crate::error::*,
83};
84#[doc(hidden)] pub use tokio; mod error;
87mod impls;
88
/// Largest serialized payload, in bytes (16 MiB), that is sent as a single WebSocket binary message.
///
/// Longer payloads are announced with a text message of the form `m<len>` and then split into binary messages of at most this size.
#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite027"))]
const WS_MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024;
91
92pub trait Protocol: Sized {
94 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
100 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
106 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError>;
108 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError>;
110
111 fn read_owned<R: AsyncRead + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
115 Box::pin(async move {
116 let value = Self::read(&mut stream).await?;
117 Ok((stream, value))
118 })
119 }
120
121 fn try_read(stream: &mut impl Read, buf: &mut Vec<u8>) -> Result<Option<Self>, ReadError> {
164 let mut temp_buf = vec![0; 8];
165 loop {
166 let mut slice = &mut &**buf;
167 match Self::read_sync(&mut slice) {
168 Ok(value) => {
169 let value_len = slice.len();
170 buf.drain(..buf.len() - value_len);
171 return Ok(Some(value))
172 }
173 Err(ReadError { kind: ReadErrorKind::Io(e), .. }) if e.kind() == io::ErrorKind::UnexpectedEof => {}
174 Err(e) => return Err(e),
175 }
176 match stream.read(&mut temp_buf) {
177 Ok(0) => return Err(ReadError {
178 context: ErrorContext::DefaultImpl,
179 kind: ReadErrorKind::EndOfStream,
180 }),
181 Ok(n) => buf.extend_from_slice(&temp_buf[..n]),
182 Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
183 Err(e) => return Err(ReadError {
184 context: ErrorContext::DefaultImpl,
185 kind: e.into(),
186 }),
187 }
188 }
189 }
190
191 #[cfg(feature = "tokio-tungstenite021")]
192 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
193 fn read_ws021<'a, R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
199 Box::pin(async move {
200 let packet = stream.try_next().await.map_err(|e| ReadError {
201 context: ErrorContext::DefaultImpl,
202 kind: e.into(),
203 })?.ok_or_else(|| ReadError {
204 context: ErrorContext::DefaultImpl,
205 kind: ReadErrorKind::EndOfStream,
206 })?;
207 match packet {
208 tungstenite021::Message::Text(data) => match data.chars().next() {
209 Some('m') => {
210 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
211 context: ErrorContext::DefaultImpl,
212 kind: e.into(),
213 })?;
214 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
215 context: ErrorContext::DefaultImpl,
216 kind: e.into(),
217 })?;
218 while buf.len() < len {
219 let packet = stream.try_next().await.map_err(|e| ReadError {
220 context: ErrorContext::DefaultImpl,
221 kind: e.into(),
222 })?.ok_or_else(|| ReadError {
223 context: ErrorContext::DefaultImpl,
224 kind: ReadErrorKind::EndOfStream,
225 })?;
226 if let tungstenite021::Message::Binary(data) = packet {
227 buf.extend_from_slice(&data);
228 } else {
229 return Err(ReadError {
230 context: ErrorContext::DefaultImpl,
231 kind: ReadErrorKind::MessageKind021(packet),
232 })
233 }
234 }
235 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
236 context: ErrorContext::WebSocket {
237 source: Box::new(context),
238 },
239 kind,
240 })
241 }
242 _ => Err(ReadError {
243 context: ErrorContext::DefaultImpl,
244 kind: ReadErrorKind::WebSocketTextMessage024(data),
245 }),
246 },
247 tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
248 context: ErrorContext::WebSocket {
249 source: Box::new(context),
250 },
251 kind,
252 }),
253 _ => Err(ReadError {
254 context: ErrorContext::DefaultImpl,
255 kind: ReadErrorKind::MessageKind021(packet),
256 }),
257 }
258 })
259 }
260
261 #[cfg(feature = "tokio-tungstenite024")]
262 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
263 fn read_ws024<'a, R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
269 Box::pin(async move {
270 let packet = stream.try_next().await.map_err(|e| ReadError {
271 context: ErrorContext::DefaultImpl,
272 kind: e.into(),
273 })?.ok_or_else(|| ReadError {
274 context: ErrorContext::DefaultImpl,
275 kind: ReadErrorKind::EndOfStream,
276 })?;
277 match packet {
278 tungstenite024::Message::Text(data) => match data.chars().next() {
279 Some('m') => {
280 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
281 context: ErrorContext::DefaultImpl,
282 kind: e.into(),
283 })?;
284 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
285 context: ErrorContext::DefaultImpl,
286 kind: e.into(),
287 })?;
288 while buf.len() < len {
289 let packet = stream.try_next().await.map_err(|e| ReadError {
290 context: ErrorContext::DefaultImpl,
291 kind: e.into(),
292 })?.ok_or_else(|| ReadError {
293 context: ErrorContext::DefaultImpl,
294 kind: ReadErrorKind::EndOfStream,
295 })?;
296 if let tungstenite024::Message::Binary(data) = packet {
297 buf.extend_from_slice(&data);
298 } else {
299 return Err(ReadError {
300 context: ErrorContext::DefaultImpl,
301 kind: ReadErrorKind::MessageKind024(packet),
302 })
303 }
304 }
305 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
306 context: ErrorContext::WebSocket {
307 source: Box::new(context),
308 },
309 kind,
310 })
311 }
312 _ => Err(ReadError {
313 context: ErrorContext::DefaultImpl,
314 kind: ReadErrorKind::WebSocketTextMessage024(data),
315 }),
316 },
317 tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
318 context: ErrorContext::WebSocket {
319 source: Box::new(context),
320 },
321 kind,
322 }),
323 _ => Err(ReadError {
324 context: ErrorContext::DefaultImpl,
325 kind: ReadErrorKind::MessageKind024(packet),
326 }),
327 }
328 })
329 }
330
331 #[cfg(feature = "tokio-tungstenite027")]
332 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
333 fn read_ws027<'a, R: Stream<Item = Result<tungstenite027::Message, tungstenite027::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
339 Box::pin(async move {
340 let packet = stream.try_next().await.map_err(|e| ReadError {
341 context: ErrorContext::DefaultImpl,
342 kind: e.into(),
343 })?.ok_or_else(|| ReadError {
344 context: ErrorContext::DefaultImpl,
345 kind: ReadErrorKind::EndOfStream,
346 })?;
347 match packet {
348 tungstenite027::Message::Text(data) => match data.chars().next() {
349 Some('m') => {
350 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
351 context: ErrorContext::DefaultImpl,
352 kind: e.into(),
353 })?;
354 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
355 context: ErrorContext::DefaultImpl,
356 kind: e.into(),
357 })?;
358 while buf.len() < len {
359 let packet = stream.try_next().await.map_err(|e| ReadError {
360 context: ErrorContext::DefaultImpl,
361 kind: e.into(),
362 })?.ok_or_else(|| ReadError {
363 context: ErrorContext::DefaultImpl,
364 kind: ReadErrorKind::EndOfStream,
365 })?;
366 if let tungstenite027::Message::Binary(data) = packet {
367 buf.extend_from_slice(&data);
368 } else {
369 return Err(ReadError {
370 context: ErrorContext::DefaultImpl,
371 kind: ReadErrorKind::MessageKind027(packet),
372 })
373 }
374 }
375 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
376 context: ErrorContext::WebSocket {
377 source: Box::new(context),
378 },
379 kind,
380 })
381 }
382 _ => Err(ReadError {
383 context: ErrorContext::DefaultImpl,
384 kind: ReadErrorKind::WebSocketTextMessage027(data),
385 }),
386 },
387 tungstenite027::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
388 context: ErrorContext::WebSocket {
389 source: Box::new(context),
390 },
391 kind,
392 }),
393 _ => Err(ReadError {
394 context: ErrorContext::DefaultImpl,
395 kind: ReadErrorKind::MessageKind027(packet),
396 }),
397 }
398 })
399 }
400
401 #[cfg(feature = "tokio-tungstenite021")]
402 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
403 fn write_ws021<'a, W: Sink<tungstenite021::Message, Error = tungstenite021::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
409 where Self: Sync {
410 Box::pin(async move {
411 let mut buf = Vec::default();
412 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
413 context: ErrorContext::WebSocket {
414 source: Box::new(context),
415 },
416 kind,
417 })?;
418 if buf.len() <= WS_MAX_MESSAGE_SIZE {
419 sink.send(tungstenite021::Message::binary(buf)).await.map_err(|e| WriteError {
420 context: ErrorContext::DefaultImpl,
421 kind: e.into(),
422 })?;
423 } else {
424 sink.send(tungstenite021::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
425 context: ErrorContext::DefaultImpl,
426 kind: e.into(),
427 })?;
428 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
429 sink.send(tungstenite021::Message::binary(chunk)).await.map_err(|e| WriteError {
430 context: ErrorContext::DefaultImpl,
431 kind: e.into(),
432 })?;
433 }
434 }
435 Ok(())
436 })
437 }
438
439 #[cfg(feature = "tokio-tungstenite024")]
440 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
441 fn write_ws024<'a, W: Sink<tungstenite024::Message, Error = tungstenite024::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
447 where Self: Sync {
448 Box::pin(async move {
449 let mut buf = Vec::default();
450 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
451 context: ErrorContext::WebSocket {
452 source: Box::new(context),
453 },
454 kind,
455 })?;
456 if buf.len() <= WS_MAX_MESSAGE_SIZE {
457 sink.send(tungstenite024::Message::binary(buf)).await.map_err(|e| WriteError {
458 context: ErrorContext::DefaultImpl,
459 kind: e.into(),
460 })?;
461 } else {
462 sink.send(tungstenite024::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
463 context: ErrorContext::DefaultImpl,
464 kind: e.into(),
465 })?;
466 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
467 sink.send(tungstenite024::Message::binary(chunk)).await.map_err(|e| WriteError {
468 context: ErrorContext::DefaultImpl,
469 kind: e.into(),
470 })?;
471 }
472 }
473 Ok(())
474 })
475 }
476
477 #[cfg(feature = "tokio-tungstenite027")]
478 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
479 fn write_ws027<'a, W: Sink<tungstenite027::Message, Error = tungstenite027::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
485 where Self: Sync {
486 Box::pin(async move {
487 let mut buf = Vec::default();
488 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
489 context: ErrorContext::WebSocket {
490 source: Box::new(context),
491 },
492 kind,
493 })?;
494 if buf.len() <= WS_MAX_MESSAGE_SIZE {
495 sink.send(tungstenite027::Message::binary(buf)).await.map_err(|e| WriteError {
496 context: ErrorContext::DefaultImpl,
497 kind: e.into(),
498 })?;
499 } else {
500 sink.send(tungstenite027::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
501 context: ErrorContext::DefaultImpl,
502 kind: e.into(),
503 })?;
504 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
505 sink.send(tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))).await.map_err(|e| WriteError {
506 context: ErrorContext::DefaultImpl,
507 kind: e.into(),
508 })?;
509 }
510 }
511 Ok(())
512 })
513 }
514
515 #[cfg(feature = "tokio-tungstenite021")]
516 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
517 fn read_ws_sync021(websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
519 let packet = websocket.read().map_err(|e| ReadError {
520 context: ErrorContext::DefaultImpl,
521 kind: e.into(),
522 })?;
523 match packet {
524 tungstenite021::Message::Text(data) => match data.chars().next() {
525 Some('m') => {
526 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
527 context: ErrorContext::DefaultImpl,
528 kind: e.into(),
529 })?;
530 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
531 context: ErrorContext::DefaultImpl,
532 kind: e.into(),
533 })?;
534 while buf.len() < len {
535 let packet = websocket.read().map_err(|e| ReadError {
536 context: ErrorContext::DefaultImpl,
537 kind: e.into(),
538 })?;
539 if let tungstenite021::Message::Binary(data) = packet {
540 buf.extend_from_slice(&data);
541 } else {
542 return Err(ReadError {
543 context: ErrorContext::DefaultImpl,
544 kind: ReadErrorKind::MessageKind021(packet),
545 })
546 }
547 }
548 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
549 context: ErrorContext::WebSocket {
550 source: Box::new(context),
551 },
552 kind,
553 })
554 }
555 _ => return Err(ReadError {
556 context: ErrorContext::DefaultImpl,
557 kind: ReadErrorKind::WebSocketTextMessage024(data),
558 }),
559 },
560 tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
561 context: ErrorContext::WebSocket {
562 source: Box::new(context),
563 },
564 kind,
565 }),
566 _ => Err(ReadError {
567 context: ErrorContext::DefaultImpl,
568 kind: ReadErrorKind::MessageKind021(packet),
569 }),
570 }
571 }
572
573 #[cfg(feature = "tokio-tungstenite024")]
574 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
575 fn read_ws_sync024(websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
577 let packet = websocket.read().map_err(|e| ReadError {
578 context: ErrorContext::DefaultImpl,
579 kind: e.into(),
580 })?;
581 match packet {
582 tungstenite024::Message::Text(data) => match data.chars().next() {
583 Some('m') => {
584 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
585 context: ErrorContext::DefaultImpl,
586 kind: e.into(),
587 })?;
588 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
589 context: ErrorContext::DefaultImpl,
590 kind: e.into(),
591 })?;
592 while buf.len() < len {
593 let packet = websocket.read().map_err(|e| ReadError {
594 context: ErrorContext::DefaultImpl,
595 kind: e.into(),
596 })?;
597 if let tungstenite024::Message::Binary(data) = packet {
598 buf.extend_from_slice(&data);
599 } else {
600 return Err(ReadError {
601 context: ErrorContext::DefaultImpl,
602 kind: ReadErrorKind::MessageKind024(packet),
603 })
604 }
605 }
606 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
607 context: ErrorContext::WebSocket {
608 source: Box::new(context),
609 },
610 kind,
611 })
612 }
613 _ => return Err(ReadError {
614 context: ErrorContext::DefaultImpl,
615 kind: ReadErrorKind::WebSocketTextMessage024(data),
616 }),
617 },
618 tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
619 context: ErrorContext::WebSocket {
620 source: Box::new(context),
621 },
622 kind,
623 }),
624 _ => Err(ReadError {
625 context: ErrorContext::DefaultImpl,
626 kind: ReadErrorKind::MessageKind024(packet),
627 }),
628 }
629 }
630
631 #[cfg(feature = "tokio-tungstenite027")]
632 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
633 fn read_ws_sync027(websocket: &mut tungstenite027::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
635 let packet = websocket.read().map_err(|e| ReadError {
636 context: ErrorContext::DefaultImpl,
637 kind: e.into(),
638 })?;
639 match packet {
640 tungstenite027::Message::Text(data) => match data.chars().next() {
641 Some('m') => {
642 let len = data[1..].parse::<usize>().map_err(|e| ReadError {
643 context: ErrorContext::DefaultImpl,
644 kind: e.into(),
645 })?;
646 let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
647 context: ErrorContext::DefaultImpl,
648 kind: e.into(),
649 })?;
650 while buf.len() < len {
651 let packet = websocket.read().map_err(|e| ReadError {
652 context: ErrorContext::DefaultImpl,
653 kind: e.into(),
654 })?;
655 if let tungstenite027::Message::Binary(data) = packet {
656 buf.extend_from_slice(&data);
657 } else {
658 return Err(ReadError {
659 context: ErrorContext::DefaultImpl,
660 kind: ReadErrorKind::MessageKind027(packet),
661 })
662 }
663 }
664 Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
665 context: ErrorContext::WebSocket {
666 source: Box::new(context),
667 },
668 kind,
669 })
670 }
671 _ => return Err(ReadError {
672 context: ErrorContext::DefaultImpl,
673 kind: ReadErrorKind::WebSocketTextMessage027(data),
674 }),
675 },
676 tungstenite027::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
677 context: ErrorContext::WebSocket {
678 source: Box::new(context),
679 },
680 kind,
681 }),
682 _ => Err(ReadError {
683 context: ErrorContext::DefaultImpl,
684 kind: ReadErrorKind::MessageKind027(packet),
685 }),
686 }
687 }
688
689 #[cfg(feature = "tokio-tungstenite021")]
690 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
691 fn write_ws_sync021(&self, websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
693 let mut buf = Vec::default();
694 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
695 context: ErrorContext::WebSocket {
696 source: Box::new(context),
697 },
698 kind,
699 })?;
700 if buf.len() <= WS_MAX_MESSAGE_SIZE {
701 websocket.send(tungstenite021::Message::binary(buf)).map_err(|e| WriteError {
702 context: ErrorContext::DefaultImpl,
703 kind: e.into(),
704 })?;
705 } else {
706 websocket.send(tungstenite021::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
707 context: ErrorContext::DefaultImpl,
708 kind: e.into(),
709 })?;
710 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
711 websocket.send(tungstenite021::Message::binary(chunk)).map_err(|e| WriteError {
712 context: ErrorContext::DefaultImpl,
713 kind: e.into(),
714 })?;
715 }
716 }
717 websocket.flush().map_err(|e| WriteError {
718 context: ErrorContext::DefaultImpl,
719 kind: e.into(),
720 })?;
721 Ok(())
722 }
723
724 #[cfg(feature = "tokio-tungstenite024")]
725 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
726 fn write_ws_sync024(&self, websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
728 let mut buf = Vec::default();
729 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
730 context: ErrorContext::WebSocket {
731 source: Box::new(context),
732 },
733 kind,
734 })?;
735 if buf.len() <= WS_MAX_MESSAGE_SIZE {
736 websocket.send(tungstenite024::Message::binary(buf)).map_err(|e| WriteError {
737 context: ErrorContext::DefaultImpl,
738 kind: e.into(),
739 })?;
740 } else {
741 websocket.send(tungstenite024::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
742 context: ErrorContext::DefaultImpl,
743 kind: e.into(),
744 })?;
745 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
746 websocket.send(tungstenite024::Message::binary(chunk)).map_err(|e| WriteError {
747 context: ErrorContext::DefaultImpl,
748 kind: e.into(),
749 })?;
750 }
751 }
752 websocket.flush().map_err(|e| WriteError {
753 context: ErrorContext::DefaultImpl,
754 kind: e.into(),
755 })?;
756 Ok(())
757 }
758
759 #[cfg(feature = "tokio-tungstenite027")]
760 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
761 fn write_ws_sync027(&self, websocket: &mut tungstenite027::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
763 let mut buf = Vec::default();
764 self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
765 context: ErrorContext::WebSocket {
766 source: Box::new(context),
767 },
768 kind,
769 })?;
770 if buf.len() <= WS_MAX_MESSAGE_SIZE {
771 websocket.send(tungstenite027::Message::binary(buf)).map_err(|e| WriteError {
772 context: ErrorContext::DefaultImpl,
773 kind: e.into(),
774 })?;
775 } else {
776 websocket.send(tungstenite027::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
777 context: ErrorContext::DefaultImpl,
778 kind: e.into(),
779 })?;
780 for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
781 websocket.send(tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk))).map_err(|e| WriteError {
782 context: ErrorContext::DefaultImpl,
783 kind: e.into(),
784 })?;
785 }
786 }
787 websocket.flush().map_err(|e| WriteError {
788 context: ErrorContext::DefaultImpl,
789 kind: e.into(),
790 })?;
791 Ok(())
792 }
793
794 #[cfg(feature = "tokio-tungstenite021")]
795 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
796 fn read_ws_owned021<R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
800 Box::pin(async move {
801 let value = Self::read_ws021(&mut stream).await?;
802 Ok((stream, value))
803 })
804 }
805
806 #[cfg(feature = "tokio-tungstenite024")]
807 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
808 fn read_ws_owned024<R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
812 Box::pin(async move {
813 let value = Self::read_ws024(&mut stream).await?;
814 Ok((stream, value))
815 })
816 }
817
818 #[cfg(feature = "tokio-tungstenite027")]
819 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
820 fn read_ws_owned027<R: Stream<Item = Result<tungstenite027::Message, tungstenite027::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
824 Box::pin(async move {
825 let value = Self::read_ws027(&mut stream).await?;
826 Ok((stream, value))
827 })
828 }
829}
830
831pub trait LengthPrefixed: Protocol {
835 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
837 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
839 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError>;
841 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError>;
843}
844
#[cfg(feature = "tokio-tungstenite021")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
/// Connects to a WebSocket server with `tokio_tungstenite021::connect_async` and returns a typed sink/stream pair.
///
/// The sink accepts values of type `W`: each one is serialized with [`Protocol::write_sync`] and sent either as a single binary message or, if it is longer than `WS_MAX_MESSAGE_SIZE` bytes, as the text message `m<len>` followed by binary chunks of at most that size.
///
/// The stream yields values of type `R`, decoding single binary messages directly and reassembling chunked payloads announced by an `m<len>` text message. A failure to decode or an unexpected message is yielded as an `Err` item; the stream itself keeps going afterwards.
///
/// # Errors
///
/// The returned future fails only if establishing the connection fails.
pub async fn websocket021<R: Protocol, W: Protocol>(request: impl tungstenite021::client::IntoClientRequest + Unpin) -> tungstenite021::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
    /// Processes one incoming message.
    ///
    /// `state` is `Some((len, buf))` while a chunked payload of `len` bytes is being reassembled in `buf`. Returns a stream of one decoded value once a value is complete, or an empty stream if more messages are needed.
    fn scanner<T: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite021::Result<tungstenite021::Message>) -> Result<impl Stream<Item = Result<T, ReadError>> + use<T>, ReadError> {
        /// Builds an error raised by this implementation itself.
        fn default_impl<K: Into<ReadErrorKind>>(kind: K) -> ReadError {
            ReadError { context: ErrorContext::DefaultImpl, kind: kind.into() }
        }

        /// Marks a decoding error as having occurred inside a WebSocket message.
        fn in_websocket(ReadError { context, kind }: ReadError) -> ReadError {
            ReadError { context: ErrorContext::WebSocket { source: Box::new(context) }, kind }
        }

        let packet = res.map_err(|e| ReadError {
            context: ErrorContext::WebSocketStream,
            kind: e.into(),
        })?;
        let decoded = match state {
            // in the middle of a chunked payload: only binary chunks are acceptable
            Some((len, buf)) => {
                match packet {
                    tungstenite021::Message::Binary(chunk) => buf.extend_from_slice(&chunk),
                    other => return Err(default_impl(ReadErrorKind::MessageKind021(other))),
                }
                if buf.len() < *len {
                    None
                } else {
                    // reset the state before decoding so that a decoding error does not leave stale data behind
                    let complete = mem::take(buf);
                    *state = None;
                    Some(T::read_sync(&mut &*complete).map_err(in_websocket)?)
                }
            }
            None => match packet {
                tungstenite021::Message::Binary(data) => Some(T::read_sync(&mut &*data).map_err(in_websocket)?),
                tungstenite021::Message::Text(header) => {
                    if !header.starts_with('m') {
                        return Err(default_impl(ReadErrorKind::WebSocketTextMessage024(header)))
                    }
                    let len = header[1..].parse::<usize>().map_err(default_impl)?;
                    let buf = FallibleVec::try_with_capacity(len).map_err(default_impl)?;
                    *state = Some((len, buf));
                    None
                }
                other => return Err(default_impl(ReadErrorKind::MessageKind021(other))),
            },
        };
        Ok(match decoded {
            Some(value) => Either::Right(stream::once(future::ok(value))),
            None => Either::Left(stream::empty()),
        })
    }

    let (sock, _) = tokio_tungstenite021::connect_async(request).await?;
    let (sink, stream) = sock.split();
    let typed_sink = sink
        .sink_map_err(|e| WriteError {
            context: ErrorContext::WebSocketSink,
            kind: e.into(),
        })
        .with_flat_map::<W, _, _>(|msg| {
            let mut payload = Vec::default();
            match msg.write_sync(&mut payload) {
                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
                    context: ErrorContext::WebSocket {
                        source: Box::new(context),
                    },
                    kind,
                }))),
                Ok(()) => {
                    let frames = if payload.len() > WS_MAX_MESSAGE_SIZE {
                        let header = tungstenite021::Message::text(format!("m{}", payload.len()));
                        let chunks = payload.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite021::Message::binary);
                        Either::Right(stream::iter(iter::once(header).chain(chunks).collect::<Vec<_>>()))
                    } else {
                        Either::Left(stream::once(future::ready(tungstenite021::Message::binary(payload))))
                    };
                    Either::Left(frames.map(Ok))
                }
            }
        });
    let typed_stream = stream
        // always continue scanning, even after an error item
        .scan(None, |state, res| future::ready(Some(scanner::<R>(state, res))))
        .try_flatten();
    Ok((typed_sink, typed_stream))
}
942
#[cfg(feature = "tokio-tungstenite024")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
/// Connects to a WebSocket server with `tokio_tungstenite024::connect_async` and returns a typed sink/stream pair.
///
/// The sink accepts values of type `W`: each one is serialized with [`Protocol::write_sync`] and sent either as a single binary message or, if it is longer than `WS_MAX_MESSAGE_SIZE` bytes, as the text message `m<len>` followed by binary chunks of at most that size.
///
/// The stream yields values of type `R`, decoding single binary messages directly and reassembling chunked payloads announced by an `m<len>` text message. A failure to decode or an unexpected message is yielded as an `Err` item; the stream itself keeps going afterwards.
///
/// # Errors
///
/// The returned future fails only if establishing the connection fails.
pub async fn websocket024<R: Protocol, W: Protocol>(request: impl tungstenite024::client::IntoClientRequest + Unpin) -> tungstenite024::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
    /// Processes one incoming message.
    ///
    /// `state` is `Some((len, buf))` while a chunked payload of `len` bytes is being reassembled in `buf`. Returns a stream of one decoded value once a value is complete, or an empty stream if more messages are needed.
    fn scanner<T: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite024::Result<tungstenite024::Message>) -> Result<impl Stream<Item = Result<T, ReadError>> + use<T>, ReadError> {
        /// Builds an error raised by this implementation itself.
        fn default_impl<K: Into<ReadErrorKind>>(kind: K) -> ReadError {
            ReadError { context: ErrorContext::DefaultImpl, kind: kind.into() }
        }

        /// Marks a decoding error as having occurred inside a WebSocket message.
        fn in_websocket(ReadError { context, kind }: ReadError) -> ReadError {
            ReadError { context: ErrorContext::WebSocket { source: Box::new(context) }, kind }
        }

        let packet = res.map_err(|e| ReadError {
            context: ErrorContext::WebSocketStream,
            kind: e.into(),
        })?;
        let decoded = match state {
            // in the middle of a chunked payload: only binary chunks are acceptable
            Some((len, buf)) => {
                match packet {
                    tungstenite024::Message::Binary(chunk) => buf.extend_from_slice(&chunk),
                    other => return Err(default_impl(ReadErrorKind::MessageKind024(other))),
                }
                if buf.len() < *len {
                    None
                } else {
                    // reset the state before decoding so that a decoding error does not leave stale data behind
                    let complete = mem::take(buf);
                    *state = None;
                    Some(T::read_sync(&mut &*complete).map_err(in_websocket)?)
                }
            }
            None => match packet {
                tungstenite024::Message::Binary(data) => Some(T::read_sync(&mut &*data).map_err(in_websocket)?),
                tungstenite024::Message::Text(header) => {
                    if !header.starts_with('m') {
                        return Err(default_impl(ReadErrorKind::WebSocketTextMessage024(header)))
                    }
                    let len = header[1..].parse::<usize>().map_err(default_impl)?;
                    let buf = FallibleVec::try_with_capacity(len).map_err(default_impl)?;
                    *state = Some((len, buf));
                    None
                }
                other => return Err(default_impl(ReadErrorKind::MessageKind024(other))),
            },
        };
        Ok(match decoded {
            Some(value) => Either::Right(stream::once(future::ok(value))),
            None => Either::Left(stream::empty()),
        })
    }

    let (sock, _) = tokio_tungstenite024::connect_async(request).await?;
    let (sink, stream) = sock.split();
    let typed_sink = sink
        .sink_map_err(|e| WriteError {
            context: ErrorContext::WebSocketSink,
            kind: e.into(),
        })
        .with_flat_map::<W, _, _>(|msg| {
            let mut payload = Vec::default();
            match msg.write_sync(&mut payload) {
                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
                    context: ErrorContext::WebSocket {
                        source: Box::new(context),
                    },
                    kind,
                }))),
                Ok(()) => {
                    let frames = if payload.len() > WS_MAX_MESSAGE_SIZE {
                        let header = tungstenite024::Message::text(format!("m{}", payload.len()));
                        let chunks = payload.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite024::Message::binary);
                        Either::Right(stream::iter(iter::once(header).chain(chunks).collect::<Vec<_>>()))
                    } else {
                        Either::Left(stream::once(future::ready(tungstenite024::Message::binary(payload))))
                    };
                    Either::Left(frames.map(Ok))
                }
            }
        });
    let typed_stream = stream
        // always continue scanning, even after an error item
        .scan(None, |state, res| future::ready(Some(scanner::<R>(state, res))))
        .try_flatten();
    Ok((typed_sink, typed_stream))
}
1040
#[cfg(feature = "tokio-tungstenite027")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite027")))]
/// Connects to a WebSocket server with `tokio_tungstenite027::connect_async` and returns a typed sink/stream pair.
///
/// The sink accepts values of type `W`: each one is serialized with [`Protocol::write_sync`] and sent either as a single binary message or, if it is longer than `WS_MAX_MESSAGE_SIZE` bytes, as the text message `m<len>` followed by binary chunks of at most that size (each chunk copied into its own `Bytes` value).
///
/// The stream yields values of type `R`, decoding single binary messages directly and reassembling chunked payloads announced by an `m<len>` text message. A failure to decode or an unexpected message is yielded as an `Err` item; the stream itself keeps going afterwards.
///
/// # Errors
///
/// The returned future fails only if establishing the connection fails.
pub async fn websocket027<R: Protocol, W: Protocol>(request: impl tungstenite027::client::IntoClientRequest + Unpin) -> tungstenite027::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
    /// Processes one incoming message.
    ///
    /// `state` is `Some((len, buf))` while a chunked payload of `len` bytes is being reassembled in `buf`. Returns a stream of one decoded value once a value is complete, or an empty stream if more messages are needed.
    fn scanner<T: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite027::Result<tungstenite027::Message>) -> Result<impl Stream<Item = Result<T, ReadError>> + use<T>, ReadError> {
        /// Builds an error raised by this implementation itself.
        fn default_impl<K: Into<ReadErrorKind>>(kind: K) -> ReadError {
            ReadError { context: ErrorContext::DefaultImpl, kind: kind.into() }
        }

        /// Marks a decoding error as having occurred inside a WebSocket message.
        fn in_websocket(ReadError { context, kind }: ReadError) -> ReadError {
            ReadError { context: ErrorContext::WebSocket { source: Box::new(context) }, kind }
        }

        let packet = res.map_err(|e| ReadError {
            context: ErrorContext::WebSocketStream,
            kind: e.into(),
        })?;
        let decoded = match state {
            // in the middle of a chunked payload: only binary chunks are acceptable
            Some((len, buf)) => {
                match packet {
                    tungstenite027::Message::Binary(chunk) => buf.extend_from_slice(&chunk),
                    other => return Err(default_impl(ReadErrorKind::MessageKind027(other))),
                }
                if buf.len() < *len {
                    None
                } else {
                    // reset the state before decoding so that a decoding error does not leave stale data behind
                    let complete = mem::take(buf);
                    *state = None;
                    Some(T::read_sync(&mut &*complete).map_err(in_websocket)?)
                }
            }
            None => match packet {
                tungstenite027::Message::Binary(data) => Some(T::read_sync(&mut &*data).map_err(in_websocket)?),
                tungstenite027::Message::Text(header) => {
                    if !header.starts_with('m') {
                        return Err(default_impl(ReadErrorKind::WebSocketTextMessage027(header)))
                    }
                    let len = header[1..].parse::<usize>().map_err(default_impl)?;
                    let buf = FallibleVec::try_with_capacity(len).map_err(default_impl)?;
                    *state = Some((len, buf));
                    None
                }
                other => return Err(default_impl(ReadErrorKind::MessageKind027(other))),
            },
        };
        Ok(match decoded {
            Some(value) => Either::Right(stream::once(future::ok(value))),
            None => Either::Left(stream::empty()),
        })
    }

    let (sock, _) = tokio_tungstenite027::connect_async(request).await?;
    let (sink, stream) = sock.split();
    let typed_sink = sink
        .sink_map_err(|e| WriteError {
            context: ErrorContext::WebSocketSink,
            kind: e.into(),
        })
        .with_flat_map::<W, _, _>(|msg| {
            let mut payload = Vec::default();
            match msg.write_sync(&mut payload) {
                Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
                    context: ErrorContext::WebSocket {
                        source: Box::new(context),
                    },
                    kind,
                }))),
                Ok(()) => {
                    let frames = if payload.len() > WS_MAX_MESSAGE_SIZE {
                        let header = tungstenite027::Message::text(format!("m{}", payload.len()));
                        let chunks = payload.chunks(WS_MAX_MESSAGE_SIZE)
                            .map(|chunk| tungstenite027::Message::binary(tungstenite027::Bytes::copy_from_slice(chunk)));
                        Either::Right(stream::iter(iter::once(header).chain(chunks).collect::<Vec<_>>()))
                    } else {
                        Either::Left(stream::once(future::ready(tungstenite027::Message::binary(payload))))
                    };
                    Either::Left(frames.map(Ok))
                }
            }
        });
    let typed_stream = stream
        // always continue scanning, even after an error item
        .scan(None, |state, res| future::ready(Some(scanner::<R>(state, res))))
        .try_flatten();
    Ok((typed_sink, typed_stream))
}