1use streaming::{Message, Body};
9use futures::sync::mpsc;
10use futures::{Future, Poll, Async, Stream, Sink, AsyncSink, StartSend};
11use std::collections::hash_map::Entry;
12use std::collections::{HashMap, VecDeque};
13use std::{fmt, io};
14use super::frame_buf::{FrameBuf, FrameDeque};
15use super::{Frame, RequestId, Transport};
16use buffer_one::BufferOne;
17
18const MAX_BUFFERED_FRAMES: usize = 128;
37
38pub struct Multiplex<T> where T: Dispatch {
44 run: bool,
46
47 made_progress: bool,
49
50 blocked_on_dispatch: bool,
52
53 blocked_on_flush: WriteState,
55
56 dispatch: BufferOne<DispatchSink<T>>,
58
59 exchanges: HashMap<RequestId, Exchange<T>>,
61
62 is_flushed: bool,
64
65 dispatch_deque: VecDeque<RequestId>,
67
68 frame_buf: FrameBuf<Option<Result<T::BodyOut, T::Error>>>,
70
71 scratch: Vec<RequestId>,
73}
74
75impl<T> fmt::Debug for Multiplex<T>
76 where T: Dispatch + fmt::Debug,
77 T::In: fmt::Debug,
78 T::Out: fmt::Debug,
79 T::BodyIn: fmt::Debug,
80 T::BodyOut: fmt::Debug,
81 T::Error: fmt::Debug,
82 T::Stream: fmt::Debug,
83{
84 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
85 f.debug_struct("Multiplex")
86 .field("run", &self.run)
87 .field("made_progress", &self.made_progress)
88 .field("blocked_on_dispatch", &self.blocked_on_dispatch)
89 .field("dispatch", &self.dispatch)
90 .field("exhanges", &self.exchanges)
91 .field("is_flushed", &self.is_flushed)
92 .field("dispatch_deque", &self.dispatch_deque)
93 .field("frame_buf", &"FrameBuf { ... }")
94 .field("scratch", &self.scratch)
95 .finish()
96 }
97}
98
99#[derive(Debug)]
100struct DispatchSink<T> {
101 inner: T,
102}
103
104type BodySender<B, E> = mpsc::Sender<Result<B, E>>;
105
106struct Exchange<T: Dispatch> {
108 request: Request<T>,
118
119 responded: bool,
121
122 out_body: Option<BodySender<T::BodyOut, T::Error>>,
124
125 out_deque: FrameDeque<Option<Result<T::BodyOut, T::Error>>>,
127
128 out_is_ready: bool,
136
137 in_body: Option<T::Stream>,
139}
140
141#[derive(Debug)]
142enum Request<T: Dispatch> {
143 In, Out(Option<Message<T::Out, Body<T::BodyOut, T::Error>>>),
145}
146
147#[derive(Debug, PartialEq, Eq, Clone, Copy)]
148enum WriteState {
149 NoWrite,
150 Wrote,
151 Blocked,
152}
153
154#[derive(Debug)]
156pub struct MultiplexMessage<T, B, E> {
157 pub id: RequestId,
159 pub message: Result<Message<T, B>, E>,
161 pub solo: bool,
163}
164
165pub trait Dispatch {
167 type Io;
169
170 type In;
172
173 type BodyIn;
175
176 type Out;
178
179 type BodyOut;
181
182 type Error: From<io::Error>;
184
185 type Stream: Stream<Item = Self::BodyIn, Error = Self::Error>;
187
188 type Transport: Transport<Self::BodyOut,
190 Item = Frame<Self::Out, Self::BodyOut, Self::Error>,
191 SinkItem = Frame<Self::In, Self::BodyIn, Self::Error>>;
192
193 fn transport(&mut self) -> &mut Self::Transport;
195
196 fn poll(&mut self) -> Poll<Option<MultiplexMessage<Self::In, Self::Stream, Self::Error>>, io::Error>;
198
199 fn poll_ready(&self) -> Async<()>;
201
202 fn dispatch(&mut self, message: MultiplexMessage<Self::Out, Body<Self::BodyOut, Self::Error>, Self::Error>) -> io::Result<()>;
204
205 fn cancel(&mut self, request_id: RequestId) -> io::Result<()>;
207}
208
209impl<T> Multiplex<T> where T: Dispatch {
216 pub fn new(dispatch: T) -> Multiplex<T> {
219 let dispatch = DispatchSink { inner: dispatch };
221
222 let dispatch = BufferOne::new(dispatch);
224
225 let frame_buf = FrameBuf::with_capacity(MAX_BUFFERED_FRAMES);
226
227 Multiplex {
228 run: true,
229 made_progress: false,
230 blocked_on_dispatch: false,
231 blocked_on_flush: WriteState::NoWrite,
232 dispatch: dispatch,
233 exchanges: HashMap::new(),
234 is_flushed: true,
235 dispatch_deque: VecDeque::new(),
236 frame_buf: frame_buf,
237 scratch: vec![],
238 }
239 }
240
241 fn is_done(&self) -> bool {
243 !self.run && self.is_flushed && self.exchanges.len() == 0
244 }
245
246 fn flush_dispatch_deque(&mut self) -> io::Result<()> {
248 while self.dispatch.get_mut().inner.poll_ready().is_ready() {
249 let id = match self.dispatch_deque.pop_front() {
250 Some(id) => id,
251 None => return Ok(()),
252 };
253
254 let exchange = match self.exchanges.get_mut(&id) {
256 Some(exchange) => exchange,
257 None => continue,
258 };
259
260 if let Some(message) = exchange.take_buffered_out_request() {
261 let message = MultiplexMessage {
262 id: id,
263 message: Ok(message),
264 solo: exchange.responded,
265 };
266
267 try!(self.dispatch.get_mut().inner.dispatch(message));
268 }
269 }
270
271 self.blocked_on_dispatch = true;
273
274 Ok(())
275 }
276
277 fn flush_out_bodies(&mut self) -> io::Result<()> {
279 trace!("flush out bodies");
280
281 self.scratch.clear();
282
283 for (id, exchange) in self.exchanges.iter_mut() {
284 trace!(" --> request={}", id);
285 try!(exchange.flush_out_body());
286
287 if exchange.is_complete() {
289 self.scratch.push(*id);
290 }
291 }
292
293 for id in &self.scratch {
295 trace!("drop exchange; id={}", id);
296 self.exchanges.remove(id);
297 }
298
299 Ok(())
300 }
301
302 fn read_out_frames(&mut self) -> io::Result<()> {
304 while self.run {
305 if let Async::Ready(frame) = try!(self.dispatch.get_mut().inner.transport().poll()) {
308 try!(self.process_out_frame(frame));
309 } else {
310 break;
311 }
312 }
313
314 Ok(())
315 }
316
317 fn process_out_frame(&mut self,
319 frame: Option<Frame<T::Out, T::BodyOut, T::Error>>)
320 -> io::Result<()> {
321 trace!("Multiplex::process_out_frame");
322
323 match frame {
324 Some(Frame::Message { id, message, body, solo }) => {
325 if body {
326 let (tx, rx) = Body::pair();
327 let message = Message::WithBody(message, rx);
328
329 try!(self.process_out_message(id, message, Some(tx), solo));
330 } else {
331 let message = Message::WithoutBody(message);
332
333 try!(self.process_out_message(id, message, None, solo));
334 }
335 }
336 Some(Frame::Body { id, chunk }) => {
337 trace!(" --> read out body chunk");
338 self.process_out_body_chunk(id, Ok(chunk));
339 }
340 Some(Frame::Error { id, error }) => {
341 try!(self.process_out_err(id, error));
342 }
343 None => {
344 trace!("read None");
345 self.run = false;
347 }
348 }
349
350 Ok(())
351 }
352
353 fn process_out_message(&mut self,
355 id: RequestId,
356 message: Message<T::Out, Body<T::BodyOut, T::Error>>,
357 body: Option<mpsc::Sender<Result<T::BodyOut, T::Error>>>,
358 solo: bool)
359 -> io::Result<()>
360 {
361 trace!(" --> process message; body={:?}", body.is_some());
362
363 match self.exchanges.entry(id) {
364 Entry::Occupied(mut e) => {
365 assert!(!e.get().responded, "invalid exchange state");
366 assert!(e.get().is_inbound());
367
368 try!(self.dispatch.get_mut().inner.dispatch(MultiplexMessage {
373 id: id,
374 message: Ok(message),
375 solo: solo,
376 }));
377
378 e.get_mut().responded = true;
380
381 e.get_mut().out_body = body;
383
384 if e.get().is_complete() {
386 e.remove();
387 }
388 }
389 Entry::Vacant(e) => {
390 if self.dispatch.get_mut().inner.poll_ready().is_ready() {
391 trace!(" --> dispatch ready -- dispatching");
392
393 assert!(self.dispatch_deque.is_empty());
395
396 let mut exchange = Exchange::new(
398 Request::Out(None),
399 self.frame_buf.deque());
400
401 exchange.out_body = body;
402
403 exchange.set_expect_response(solo);
405
406 if !exchange.is_complete() {
407 e.insert(exchange);
409 }
410
411 try!(self.dispatch.get_mut().inner.dispatch(MultiplexMessage {
413 id: id,
414 message: Ok(message),
415 solo: solo,
416 }));
417 } else {
418 trace!(" --> dispatch not ready");
419
420 self.blocked_on_dispatch = true;
421
422 let mut exchange = Exchange::new(
424 Request::Out(Some(message)),
425 self.frame_buf.deque());
426
427 exchange.out_body = body;
428
429 exchange.set_expect_response(solo);
431
432 assert!(!exchange.is_complete());
433
434 e.insert(exchange);
436
437 self.dispatch_deque.push_back(id);
439 }
440 }
441 }
442
443 Ok(())
444 }
445
446 fn process_out_err(&mut self, id: RequestId, err: T::Error) -> io::Result<()> {
448 trace!(" --> process error frame");
449
450 let mut remove = false;
451
452 if let Some(exchange) = self.exchanges.get_mut(&id) {
453 if !exchange.is_dispatched() {
454 remove = true;
457
458 assert!(exchange.out_body.is_none());
459 assert!(exchange.in_body.is_none());
460 } else if exchange.is_outbound() {
461 exchange.send_out_chunk(Err(err));
464
465 if !exchange.responded {
468 try!(self.dispatch.get_mut().inner.cancel(id));
469 }
470
471 remove = exchange.is_complete();
472 } else {
473 if !exchange.responded {
474 try!(self.dispatch.get_mut().inner.dispatch(MultiplexMessage::error(id, err)));
477
478 exchange.responded = true;
479 } else {
480 exchange.send_out_chunk(Err(err));
483 }
484
485 remove = exchange.is_complete();
486 }
487 } else {
488 trace!(" --> no in-flight exchange; dropping error");
489 }
490
491 if remove {
492 self.exchanges.remove(&id);
493 }
494
495 Ok(())
496 }
497
498 fn process_out_body_chunk(&mut self, id: RequestId, chunk: Result<Option<T::BodyOut>, T::Error>) {
499 trace!("process out body chunk; id={:?}", id);
500
501 {
502 let exchange = match self.exchanges.get_mut(&id) {
503 Some(v) => v,
504 _ => {
505 trace!(" --> exchange previously aborted; id={:?}", id);
506 return;
507 }
508 };
509
510 exchange.send_out_chunk(chunk);
511
512 if !exchange.is_complete() {
513 return;
514 }
515 }
516
517 trace!("dropping out body handle; id={:?}", id);
518 self.exchanges.remove(&id);
519 }
520
521 fn write_in_frames(&mut self) -> io::Result<()> {
522 try!(self.write_in_messages());
523 try!(self.write_in_body());
524 Ok(())
525 }
526
527 fn write_in_messages(&mut self) -> io::Result<()> {
528 trace!("write in messages");
529
530 while self.dispatch.poll_ready().is_ready() {
531 trace!(" --> polling for in frame");
532
533 match try!(self.dispatch.get_mut().inner.poll()) {
534 Async::Ready(Some(message)) => {
535 self.dispatch_made_progress();
536
537 match message.message {
538 Ok(m) => {
539 trace!(" --> got message");
540 try!(self.write_in_message(message.id, m, message.solo));
541 }
542 Err(error) => {
543 trace!(" --> got error");
544 try!(self.write_in_error(message.id, error));
545 }
546 }
547 }
548 Async::Ready(None) => {
549 trace!(" --> got error");
550 trace!(" --> got None");
551 break;
560 }
561 Async::NotReady => break,
563 }
564 }
565
566 trace!(" --> transport not ready");
567 self.blocked_on_flush.transport_not_write_ready();
568
569 Ok(())
570 }
571
572 fn write_in_message(&mut self,
573 id: RequestId,
574 message: Message<T::In, T::Stream>,
575 solo: bool)
576 -> io::Result<()>
577 {
578 let (message, body) = match message {
579 Message::WithBody(message, rx) => (message, Some(rx)),
580 Message::WithoutBody(message) => (message, None),
581 };
582
583 let frame = Frame::Message {
585 id: id,
586 message: message,
587 body: body.is_some(),
588 solo: solo,
589 };
590
591 try!(assert_send(&mut self.dispatch, frame));
593 self.blocked_on_flush.wrote_frame();
594
595 match self.exchanges.entry(id) {
596 Entry::Occupied(mut e) => {
597 assert!(!e.get().responded, "invalid exchange state");
598 assert!(e.get().is_outbound());
599 assert!(!solo);
600
601 e.get_mut().responded = true;
603
604 e.get_mut().in_body = body;
606
607 if e.get().is_complete() {
609 e.remove();
610 }
611 }
612 Entry::Vacant(e) => {
613 let mut exchange = Exchange::new(
615 Request::In,
616 self.frame_buf.deque());
617
618 exchange.in_body = body;
620 exchange.set_expect_response(solo);
621
622 if !exchange.is_complete() {
623 e.insert(exchange);
625 }
626 }
627 }
628
629 Ok(())
630 }
631
632 fn write_in_error(&mut self,
633 id: RequestId,
634 error: T::Error)
635 -> io::Result<()>
636 {
637 if let Entry::Occupied(mut e) = self.exchanges.entry(id) {
638 assert!(!e.get().responded, "exchange already responded");
639
640 e.get_mut().responded = true;
643 e.get_mut().out_body = None;
644 e.get_mut().in_body = None;
645 e.get_mut().out_deque.clear();
646
647 assert!(e.get().is_complete());
648
649 let frame = Frame::Error { id: id, error: error };
651 try!(assert_send(&mut self.dispatch, frame));
652 self.blocked_on_flush.wrote_frame();
653
654 e.remove();
655 } else {
656 trace!("exchange does not exist; id={:?}", id);
657 }
658
659 Ok(())
660 }
661
662 fn write_in_body(&mut self) -> io::Result<()> {
663 trace!("write in body chunks");
664
665 self.scratch.clear();
666
667 'outer:
669 for (&id, exchange) in &mut self.exchanges {
670 trace!(" --> checking request {:?}", id);
671
672 loop {
673 if !try!(self.dispatch.poll_complete()).is_ready() {
674 trace!(" --> blocked on transport");
675 self.blocked_on_flush.transport_not_write_ready();
676 break 'outer;
677 }
678
679 match exchange.try_poll_in_body() {
680 Ok(Async::Ready(Some(chunk))) => {
681 trace!(" --> got chunk");
682
683 let frame = Frame::Body { id: id, chunk: Some(chunk) };
684 try!(assert_send(&mut self.dispatch, frame));
685 self.blocked_on_flush.wrote_frame();
686 }
687 Ok(Async::Ready(None)) => {
688 trace!(" --> end of stream");
689
690 let frame = Frame::Body { id: id, chunk: None };
691 try!(assert_send(&mut self.dispatch, frame));
692 self.blocked_on_flush.wrote_frame();
693
694 exchange.in_body = None;
696 break;
697 }
698 Err(error) => {
699 trace!(" --> got error");
700
701 let frame = Frame::Error { id: id, error: error };
703 try!(assert_send(&mut self.dispatch, frame));
704 self.blocked_on_flush.wrote_frame();
705
706 exchange.responded = true;
707 exchange.in_body = None;
708 exchange.out_body = None;
709 exchange.out_deque.clear();
710
711 debug_assert!(exchange.is_complete());
712 break;
713 }
714 Ok(Async::NotReady) => {
715 trace!(" --> no pending chunks");
716 continue 'outer;
717 }
718 }
719 }
720
721 if exchange.is_complete() {
722 self.scratch.push(id);
723 }
724 }
725
726 for id in &self.scratch {
727 trace!("dropping in body handle; id={:?}", id);
728 self.exchanges.remove(id);
729 }
730
731 Ok(())
732 }
733
734 fn flush(&mut self) -> io::Result<()> {
735 self.is_flushed = try!(self.dispatch.poll_complete()).is_ready();
736
737 if self.is_flushed && self.blocked_on_flush == WriteState::Blocked {
743 self.made_progress = true;
744 }
745
746 Ok(())
747 }
748
749 fn reset_flags(&mut self) {
750 self.made_progress = false;
751 self.blocked_on_dispatch = false;
752 self.blocked_on_flush = WriteState::NoWrite;
753 }
754
755 fn dispatch_made_progress(&mut self) {
756 if self.blocked_on_dispatch {
757 self.made_progress = true;
758 }
759 }
760}
761
762impl<T> Future for Multiplex<T>
763 where T: Dispatch,
764{
765 type Item = ();
766 type Error = io::Error;
767
768 fn poll(&mut self) -> Poll<(), io::Error> {
770 trace!("Multiplex::tick ~~~~~~~~~~~~~~~~~~~~~~~~~~~");
771
772 self.dispatch.get_mut().inner.transport().tick();
774
775 try!(self.flush_out_bodies());
780
781 self.made_progress = true;
783
784 while self.made_progress {
786 trace!("~~ multiplex primary loop tick ~~");
787
788 self.reset_flags();
790
791 try!(self.flush_dispatch_deque());
793
794 try!(self.read_out_frames());
796
797 try!(self.write_in_frames());
799
800 try!(self.flush());
802 }
803
804 if self.is_done() {
818 trace!("multiplex done; terminating");
819 return Ok(Async::Ready(()));
820 }
821
822 trace!("tick done; waiting for wake-up");
823
824 Ok(Async::NotReady)
826 }
827}
828
829impl<T: Dispatch> Drop for Multiplex<T> {
830 fn drop(&mut self) {
831 if !self.exchanges.is_empty() {
832 warn!("multiplexer dropping with in-flight exchanges");
833 }
834 }
835}
836
837impl<T: Dispatch> Exchange<T> {
838 fn new(request: Request<T>, deque: FrameDeque<Option<Result<T::BodyOut, T::Error>>>) -> Exchange<T> {
839 Exchange {
840 request: request,
841 responded: false,
842 out_body: None,
843 out_deque: deque,
844 out_is_ready: true,
845 in_body: None,
846 }
847 }
848
849 fn is_inbound(&self) -> bool {
850 match self.request {
851 Request::In => true,
852 Request::Out(_) => false,
853 }
854 }
855
856 fn is_outbound(&self) -> bool {
857 !self.is_inbound()
858 }
859
860 fn is_dispatched(&self) -> bool {
861 match self.request {
862 Request::Out(Some(_)) => false,
863 _ => true,
864 }
865 }
866
867 fn is_complete(&self) -> bool {
869 self.responded &&
872 self.out_body.is_none() &&
873 self.in_body.is_none() &&
874 self.request.is_none()
875 }
876
877 fn set_expect_response(&mut self, solo: bool) {
878 self.responded = solo;
879
880 if solo {
881 if self.is_inbound() {
882 assert!(self.out_body.is_none());
883 } else {
884 assert!(self.in_body.is_none());
885 }
886 }
887 }
888
889 fn take_buffered_out_request(&mut self) -> Option<Message<T::Out, Body<T::BodyOut, T::Error>>> {
891 match self.request {
892 Request::Out(ref mut request) => request.take(),
893 _ => None,
894 }
895 }
896
897 fn send_out_chunk(&mut self, chunk: Result<Option<T::BodyOut>, T::Error>) {
898 let chunk = match chunk {
900 Ok(Some(v)) => Some(Ok(v)),
901 Ok(None) => None,
902 Err(e) => Some(Err(e)),
903 };
904
905 {
907 let sender = match self.out_body {
908 Some(ref mut v) => v,
909 _ => {
910 return;
911 }
912 };
913
914 if self.out_is_ready {
915 trace!(" --> send chunk; end-of-stream={:?}", chunk.is_none());
916
917 if let Some(chunk) = chunk {
920 match sender.start_send(chunk) {
921 Ok(AsyncSink::Ready) => {
922 trace!(" --> ready for more");
923 return;
925 }
926 Ok(AsyncSink::NotReady(chunk)) => {
927 self.out_deque.push(Some(chunk));
929 self.out_is_ready = false;
930
931 return;
932 }
933 Err(_) => {
934 }
936 }
937 }
938
939 assert!(self.out_deque.is_empty());
940 } else {
941 trace!(" --> queueing chunk");
942
943 self.out_deque.push(chunk);
944 return;
945 }
946 }
947
948 self.out_is_ready = false;
949 self.out_body = None;
950 }
951
952 fn try_poll_in_body(&mut self) -> Poll<Option<T::BodyIn>, T::Error> {
953 match self.in_body {
954 Some(ref mut b) => b.poll(),
955 None => {
956 trace!(" !!! no in body??");
957 Ok(Async::NotReady)
958 }
959 }
960 }
961
962 fn flush_out_body(&mut self) -> io::Result<()> {
964 {
965 let sender = match self.out_body {
966 Some(ref mut sender) => sender,
967 None => {
968 assert!(self.out_deque.is_empty(), "pending out frames but no sender");
969 return Ok(());
970 }
971 };
972
973 self.out_is_ready = true;
974
975 loop {
976 let msg = match self.out_deque.pop() {
978 Some(Some(msg)) => msg,
979 Some(None) => break,
980 None => {
981 return Ok(());
983 }
984 };
985
986 let done = msg.is_err();
988
989 match sender.start_send(msg) {
990 Ok(AsyncSink::Ready) => {}
991 Ok(AsyncSink::NotReady(msg)) => {
992 trace!(" --> not ready");
993
994 self.out_deque.push_front(Some(msg));
996 self.out_is_ready = false;
997
998 return Ok(());
999 }
1000 Err(_) => {
1001 break;
1009 }
1010 }
1011
1012 if done {
1013 break
1014 }
1015 }
1016 }
1017
1018 self.out_deque.clear();
1020 self.out_is_ready = false;
1021 self.out_body = None;
1022
1023 Ok(())
1024 }
1025}
1026
1027impl<T> fmt::Debug for Exchange<T>
1028 where T: Dispatch + fmt::Debug,
1029 T::In: fmt::Debug,
1030 T::Out: fmt::Debug,
1031 T::BodyIn: fmt::Debug,
1032 T::BodyOut: fmt::Debug,
1033 T::Error: fmt::Debug,
1034 T::Stream: fmt::Debug,
1035{
1036 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1037 f.debug_struct("Exchange")
1038 .field("request", &self.request)
1039 .field("responded", &self.responded)
1040 .field("out_body", &"Sender { ... }")
1041 .field("out_deque", &"FrameDeque { ... }")
1042 .field("out_is_ready", &self.out_is_ready)
1043 .field("in_body", &self.in_body)
1044 .finish()
1045 }
1046}
1047
1048impl<T: Dispatch> Request<T> {
1049 fn is_none(&self) -> bool {
1050 match *self {
1051 Request::In => true,
1052 Request::Out(None) => true,
1053 _ => false,
1054 }
1055 }
1056}
1057
1058impl WriteState {
1059 fn transport_not_write_ready(&mut self) {
1060 if *self == WriteState::Wrote {
1061 *self = WriteState::Blocked;
1062 }
1063 }
1064
1065 fn wrote_frame(&mut self) {
1066 if *self == WriteState::NoWrite {
1067 *self = WriteState::Wrote;
1068 }
1069 }
1070}
1071
1072fn assert_send<T>(s: &mut T, item: T::SinkItem) -> Result<(), T::SinkError>
1073 where T: Sink
1074{
1075 match try!(s.start_send(item)) {
1076 AsyncSink::Ready => Ok(()),
1077 AsyncSink::NotReady(_) => {
1078 panic!("sink reported itself as ready after `poll_ready` but was \
1079 then unable to accept a message")
1080 }
1081 }
1082}
1083
1084impl<T, B, E> MultiplexMessage<T, B, E> {
1091 pub fn new(id: RequestId, message: Message<T, B>) -> MultiplexMessage<T, B, E> {
1093 MultiplexMessage {
1094 id: id,
1095 message: Ok(message),
1096 solo: false,
1097 }
1098 }
1099
1100 pub fn error(id: RequestId, error: E) -> MultiplexMessage<T, B, E> {
1102 MultiplexMessage {
1103 id: id,
1104 message: Err(error),
1105 solo: false,
1106 }
1107 }
1108}
1109
1110impl<T: Dispatch> Sink for DispatchSink<T> {
1111 type SinkItem = <T::Transport as Sink>::SinkItem;
1112 type SinkError = io::Error;
1113
1114 fn start_send(&mut self, item: Self::SinkItem)
1115 -> StartSend<Self::SinkItem, io::Error>
1116 {
1117 self.inner.transport().start_send(item)
1118 }
1119
1120 fn poll_complete(&mut self) -> Poll<(), io::Error> {
1121 self.inner.transport().poll_complete()
1122 }
1123
1124 fn close(&mut self) -> Poll<(), io::Error> {
1125 self.inner.transport().close()
1126 }
1127}