tokio_proto/streaming/multiplex/
advanced.rs

1//! Provides the substrate for implementing multiplexed, streaming protocols.
2//!
//! In most cases, it's sufficient to work with `streaming::multiplex::{Client,
//! Server}` instead. But for some advanced protocols in which the client and
5//! servers have more of a peer relationship, it's useful to work directly with
6//! these implementation details.
7
8use streaming::{Message, Body};
9use futures::sync::mpsc;
10use futures::{Future, Poll, Async, Stream, Sink, AsyncSink, StartSend};
11use std::collections::hash_map::Entry;
12use std::collections::{HashMap, VecDeque};
13use std::{fmt, io};
14use super::frame_buf::{FrameBuf, FrameDeque};
15use super::{Frame, RequestId, Transport};
16use buffer_one::BufferOne;
17
18/*
19 * TODO:
20 *
21 * - Handle errors correctly
22 *    * When the FramedIo returns an error, how is it handled?
23 *    * Is it sent to the dispatch?
24 *    * Is it sent to the body?
25 *    * What happens if there are in-flight *in* bodies
26 *    * What happens if the out message is buffered?
27 * - [BUG] Can only poll from body sender FutureSender in `flush`
28 * - Move constants to configuration settings
29 *
30 */
31
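/// The maximum number of frames that the connection can buffer; used as the
/// capacity of the `FrameBuf` created in `Multiplex::new`.
///
/// NOTE(review): the sentence describing what happens once this limit is
/// reached was left unfinished in the original comment -- confirm the intended
/// behavior against `FrameBuf` and the module docs.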
36const MAX_BUFFERED_FRAMES: usize = 128;
37
38/// Task that drives multiplexed protocols
39///
40/// Provides protocol multiplexing functionality in a generic way over clients
41/// and servers. Used internally by `multiplex::Client` and
42/// `multiplex::Server`.
43pub struct Multiplex<T> where T: Dispatch {
44    // True as long as the connection has more request frames to read.
45    run: bool,
46
47    // Used to track if any operations make progress
48    made_progress: bool,
49
50    // True when blocked on dispatch
51    blocked_on_dispatch: bool,
52
53    // True when blocked on flush
54    blocked_on_flush: WriteState,
55
56    // Glues the service with the pipeline task
57    dispatch: BufferOne<DispatchSink<T>>,
58
59    // Tracks in-progress exchanges
60    exchanges: HashMap<RequestId, Exchange<T>>,
61
62    // True when the transport is fully flushed
63    is_flushed: bool,
64
65    // RequestIds of exchanges that have not yet been dispatched
66    dispatch_deque: VecDeque<RequestId>,
67
68    // Storage for buffered frames
69    frame_buf: FrameBuf<Option<Result<T::BodyOut, T::Error>>>,
70
71    // Temporary storage for RequestIds...
72    scratch: Vec<RequestId>,
73}
74
75impl<T> fmt::Debug for Multiplex<T>
76    where T: Dispatch + fmt::Debug,
77          T::In: fmt::Debug,
78          T::Out: fmt::Debug,
79          T::BodyIn: fmt::Debug,
80          T::BodyOut: fmt::Debug,
81          T::Error: fmt::Debug,
82          T::Stream: fmt::Debug,
83{
84    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
85        f.debug_struct("Multiplex")
86            .field("run", &self.run)
87            .field("made_progress", &self.made_progress)
88            .field("blocked_on_dispatch", &self.blocked_on_dispatch)
89            .field("dispatch", &self.dispatch)
90            .field("exhanges", &self.exchanges)
91            .field("is_flushed", &self.is_flushed)
92            .field("dispatch_deque", &self.dispatch_deque)
93            .field("frame_buf", &"FrameBuf { ... }")
94            .field("scratch", &self.scratch)
95            .finish()
96    }
97}
98
/// Thin wrapper around a `Dispatch` value.
///
/// `Multiplex::new` places the dispatch in this wrapper before handing it to
/// `BufferOne`, so that frames can be written through it as a sink.
/// NOTE(review): the `Sink` impl for this type is not visible in this part of
/// the file -- presumably defined further down.
#[derive(Debug)]
struct DispatchSink<T> {
    // The wrapped dispatch; reached via `self.dispatch.get_mut().inner`
    inner: T,
}
103
// Sending half of a body stream: an mpsc channel whose items are either a
// body chunk (`Ok`) or an error (`Err`).
type BodySender<B, E> = mpsc::Sender<Result<B, E>>;
105
106/// Manages the state of a single in / out exchange
107struct Exchange<T: Dispatch> {
108    // Tracks the direction of the request as well as potentially buffers the
109    // request message.
110    //
111    // The request message is only buffered when the dispatch is at capacity.
112    // This case *shouldn't* happen and if it does it indicates a poorly
113    // configured multiplex protocol or something a bit weird is happening.
114    //
115    // However, the world is full of multiplex protocols that don't have proper
116    // flow control, so the case needs to be handled.
117    request: Request<T>,
118
119    // True indicates that the response has been handled
120    responded: bool,
121
122    // The outbound body stream sender
123    out_body: Option<BodySender<T::BodyOut, T::Error>>,
124
125    // Buffers outbound body chunks until the sender is ready
126    out_deque: FrameDeque<Option<Result<T::BodyOut, T::Error>>>,
127
128    // Tracks if the sender is ready. This value is computed on each tick when
129    // the senders are flushed and before new frames are read.
130    //
131    // The reason readiness is tracked here is because if readiness changes
132    // during the progress of the multiplex tick, an outbound body chunk can't
133    // simply be dispatched. Order must be maintained, so any buffered outbound
134    // chunks must be dispatched first.
135    out_is_ready: bool,
136
137    // The inbound body stream receiver
138    in_body: Option<T::Stream>,
139}
140
141#[derive(Debug)]
142enum Request<T: Dispatch> {
143    In, // TODO: Handle inbound message buffering?
144    Out(Option<Message<T::Out, Body<T::BodyOut, T::Error>>>),
145}
146
/// Write-side state of one iteration of the multiplex loop.
///
/// NOTE(review): the transitions live in methods (`wrote_frame`,
/// `transport_not_write_ready`) that are not visible here; the variant notes
/// below describe how the values are used by `Multiplex`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum WriteState {
    // Initial value, and the value restored at the start of every iteration
    NoWrite,
    // Presumably: a frame was written during this iteration
    Wrote,
    // When the iteration ends in this state with the transport flushed,
    // `flush` records that progress was made
    Blocked,
}
153
/// Message used to communicate through the multiplex dispatch
///
/// Used in both directions: values polled from a `Dispatch` are written to the
/// transport, and values built from transport frames are passed to
/// `Dispatch::dispatch`.
#[derive(Debug)]
pub struct MultiplexMessage<T, B, E> {
    /// Request ID identifying the exchange this message belongs to
    pub id: RequestId,
    /// The message (with or without a body), or an error in its place
    pub message: Result<Message<T, B>, E>,
    /// True if message has no pair (request / response)
    pub solo: bool,
}
164
/// Dispatch messages from the transport to the service
///
/// `Multiplex` drives a value of this trait: frames read from the transport
/// are handed over through `dispatch`, and messages obtained from `poll` are
/// written to the transport.
pub trait Dispatch {
    /// Type of underlying I/O object
    type Io;

    /// Messages written to the transport
    type In;

    /// Inbound body frame
    type BodyIn;

    /// Messages read from the transport
    type Out;

    /// Outbound body frame
    type BodyOut;

    /// Transport error
    type Error: From<io::Error>;

    /// Inbound body stream type
    type Stream: Stream<Item = Self::BodyIn, Error = Self::Error>;

    /// Transport type
    ///
    /// Yields frames carrying `Out` messages and `BodyOut` chunks, and accepts
    /// frames carrying `In` messages and `BodyIn` chunks.
    type Transport: Transport<Self::BodyOut,
                              Item = Frame<Self::Out, Self::BodyOut, Self::Error>,
                              SinkItem = Frame<Self::In, Self::BodyIn, Self::Error>>;

    /// Mutable reference to the transport
    fn transport(&mut self) -> &mut Self::Transport;

    /// Poll the next available message
    ///
    /// Returned messages are written to the transport by `Multiplex`. `None`
    /// signals that the service is done with the connection.
    fn poll(&mut self) -> Poll<Option<MultiplexMessage<Self::In, Self::Stream, Self::Error>>, io::Error>;

    /// The `Dispatch` is ready to accept another message
    ///
    /// `Multiplex` checks this before dispatching a newly received request;
    /// requests that arrive while the dispatch is not ready are buffered.
    fn poll_ready(&self) -> Async<()>;

    /// Process an out message
    ///
    /// Called with messages (or errors) read from the transport.
    fn dispatch(&mut self, message: MultiplexMessage<Self::Out, Body<Self::BodyOut, Self::Error>, Self::Error>) -> io::Result<()>;

    /// Cancel interest in the exchange identified by RequestId
    ///
    /// `Multiplex` calls this when an error frame arrives for an already
    /// dispatched request that has not been responded to yet.
    fn cancel(&mut self, request_id: RequestId) -> io::Result<()>;
}
208
209/*
210 *
211 * ===== impl Multiplex =====
212 *
213 */
214
215impl<T> Multiplex<T> where T: Dispatch {
216    /// Create a new pipeline `Multiplex` dispatcher with the given service and
217    /// transport
218    pub fn new(dispatch: T) -> Multiplex<T> {
219        // Add `Sink` impl for `Dispatch`
220        let dispatch = DispatchSink { inner: dispatch };
221
222        // Add a single slot buffer for the sink
223        let dispatch = BufferOne::new(dispatch);
224
225        let frame_buf = FrameBuf::with_capacity(MAX_BUFFERED_FRAMES);
226
227        Multiplex {
228            run: true,
229            made_progress: false,
230            blocked_on_dispatch: false,
231            blocked_on_flush: WriteState::NoWrite,
232            dispatch: dispatch,
233            exchanges: HashMap::new(),
234            is_flushed: true,
235            dispatch_deque: VecDeque::new(),
236            frame_buf: frame_buf,
237            scratch: vec![],
238        }
239    }
240
241    /// Returns true if the multiplexer has nothing left to do
242    fn is_done(&self) -> bool {
243        !self.run && self.is_flushed && self.exchanges.len() == 0
244    }
245
246    /// Attempt to dispatch any outbound request messages
247    fn flush_dispatch_deque(&mut self) -> io::Result<()> {
248        while self.dispatch.get_mut().inner.poll_ready().is_ready() {
249            let id = match self.dispatch_deque.pop_front() {
250                Some(id) => id,
251                None => return Ok(()),
252            };
253
254            // Get the exchange
255            let exchange = match self.exchanges.get_mut(&id) {
256                Some(exchange) => exchange,
257                None => continue,
258            };
259
260            if let Some(message) = exchange.take_buffered_out_request() {
261                let message = MultiplexMessage {
262                    id: id,
263                    message: Ok(message),
264                    solo: exchange.responded,
265                };
266
267                try!(self.dispatch.get_mut().inner.dispatch(message));
268            }
269        }
270
271        // At this point, the task is blocked on the dispatcher
272        self.blocked_on_dispatch = true;
273
274        Ok(())
275    }
276
277    /// Dispatch any buffered outbound body frames to the sender
278    fn flush_out_bodies(&mut self) -> io::Result<()> {
279        trace!("flush out bodies");
280
281        self.scratch.clear();
282
283        for (id, exchange) in self.exchanges.iter_mut() {
284            trace!("   --> request={}", id);
285            try!(exchange.flush_out_body());
286
287            // If the exchange is complete, track it for removal
288            if exchange.is_complete() {
289                self.scratch.push(*id);
290            }
291        }
292
293        // Purge the scratch
294        for id in &self.scratch {
295            trace!("drop exchange; id={}", id);
296            self.exchanges.remove(id);
297        }
298
299        Ok(())
300    }
301
302    /// Read and process frames from transport
303    fn read_out_frames(&mut self) -> io::Result<()> {
304        while self.run {
305            // TODO: Only read frames if there is available space in the frame
306            // buffer
307            if let Async::Ready(frame) = try!(self.dispatch.get_mut().inner.transport().poll()) {
308                try!(self.process_out_frame(frame));
309            } else {
310                break;
311            }
312        }
313
314        Ok(())
315    }
316
317    /// Process outbound frame
318    fn process_out_frame(&mut self,
319                         frame: Option<Frame<T::Out, T::BodyOut, T::Error>>)
320                         -> io::Result<()> {
321        trace!("Multiplex::process_out_frame");
322
323        match frame {
324            Some(Frame::Message { id, message, body, solo }) => {
325                if body {
326                    let (tx, rx) = Body::pair();
327                    let message = Message::WithBody(message, rx);
328
329                    try!(self.process_out_message(id, message, Some(tx), solo));
330                } else {
331                    let message = Message::WithoutBody(message);
332
333                    try!(self.process_out_message(id, message, None, solo));
334                }
335            }
336            Some(Frame::Body { id, chunk }) => {
337                trace!("   --> read out body chunk");
338                self.process_out_body_chunk(id, Ok(chunk));
339            }
340            Some(Frame::Error { id, error }) => {
341                try!(self.process_out_err(id, error));
342            }
343            None => {
344                trace!("read None");
345                // TODO: Ensure all bodies have been completed
346                self.run = false;
347            }
348        }
349
350        Ok(())
351    }
352
    /// Process an outbound message
    ///
    /// `body` is the sending half of the message's body stream when the frame
    /// announced a body, `None` otherwise. `solo` is forwarded to the dispatch
    /// and, for new exchanges, passed to `Exchange::set_expect_response`.
    ///
    /// If an exchange with `id` already exists, the message is treated as the
    /// response to a request initiated by the dispatch and is dispatched
    /// without a readiness check. Otherwise a new outbound exchange is
    /// created: the message is dispatched right away when the dispatch is
    /// ready, or buffered inside the exchange and queued on `dispatch_deque`
    /// when it is not.
    ///
    /// # Panics
    ///
    /// Panics if an existing exchange has already been responded to or is not
    /// inbound, if the dispatch is ready while `dispatch_deque` is non-empty,
    /// or if a newly buffered exchange is already complete.
    fn process_out_message(&mut self,
                           id: RequestId,
                           message: Message<T::Out, Body<T::BodyOut, T::Error>>,
                           body: Option<mpsc::Sender<Result<T::BodyOut, T::Error>>>,
                           solo: bool)
                           -> io::Result<()>
    {
        trace!("   --> process message; body={:?}", body.is_some());

        match self.exchanges.entry(id) {
            // An exchange with this ID is already tracked: the message is the
            // response half of an inbound exchange.
            Entry::Occupied(mut e) => {
                assert!(!e.get().responded, "invalid exchange state");
                assert!(e.get().is_inbound());

                // Dispatch the message. The dispatcher is not checked for
                // readiness in this case. This is because the message is a
                // response to a request initiated by the dispatch. It is
                // assumed that dispatcher can always process responses.
                try!(self.dispatch.get_mut().inner.dispatch(MultiplexMessage {
                    id: id,
                    message: Ok(message),
                    solo: solo,
                }));

                // Track that the exchange has been responded to
                e.get_mut().responded = true;

                // Set the body sender
                e.get_mut().out_body = body;

                // If the exchange is complete, clean up resources
                if e.get().is_complete() {
                    e.remove();
                }
            }
            // No exchange with this ID yet: the message opens a new outbound
            // exchange.
            Entry::Vacant(e) => {
                if self.dispatch.get_mut().inner.poll_ready().is_ready() {
                    trace!("   --> dispatch ready -- dispatching");

                    // Only should be here if there are no queued messages
                    assert!(self.dispatch_deque.is_empty());

                    // Create the exchange state
                    let mut exchange = Exchange::new(
                        Request::Out(None),
                        self.frame_buf.deque());

                    exchange.out_body = body;

                    // Set expect response
                    exchange.set_expect_response(solo);

                    // An exchange that is already complete (no response
                    // expected, no body) does not need to be tracked at all.
                    if !exchange.is_complete() {
                        // Track the exchange
                        e.insert(exchange);
                    }

                    // Dispatch the message
                    try!(self.dispatch.get_mut().inner.dispatch(MultiplexMessage {
                        id: id,
                        message: Ok(message),
                        solo: solo,
                    }));
                } else {
                    trace!("   --> dispatch not ready");

                    self.blocked_on_dispatch = true;

                    // Create the exchange state, including the buffered message
                    let mut exchange = Exchange::new(
                        Request::Out(Some(message)),
                        self.frame_buf.deque());

                    exchange.out_body = body;

                    // Set expect response
                    exchange.set_expect_response(solo);

                    // The exchange still holds its request message, so it
                    // must not count as complete yet.
                    assert!(!exchange.is_complete());

                    // Track the exchange state
                    e.insert(exchange);

                    // Track the request ID as pending dispatch
                    self.dispatch_deque.push_back(id);
                }
            }
        }

        Ok(())
    }
445
    // Process an error frame read from the transport for exchange `id`.
    //
    // Depending on the state of the exchange, the error is either dropped
    // together with the exchange (request still buffered), delivered through
    // the outbound body stream, or delivered to the dispatch as the response.
    // An error for an unknown exchange is ignored.
    //
    // Panics if a still-buffered exchange has a body attached in either
    // direction.
    fn process_out_err(&mut self, id: RequestId, err: T::Error) -> io::Result<()> {
        trace!("   --> process error frame");

        // Removal is deferred until the mutable borrow of the exchange ends
        let mut remove = false;

        if let Some(exchange) = self.exchanges.get_mut(&id) {
            if !exchange.is_dispatched() {
                // The exchange is buffered and hasn't exited the multiplexer.
                // At this point it is safe to just drop the state
                remove = true;

                assert!(exchange.out_body.is_none());
                assert!(exchange.in_body.is_none());
            } else if exchange.is_outbound() {
                // Outbound exchanges can only have errors dispatched via the
                // body
                exchange.send_out_chunk(Err(err));

                // The downstream dispatch has not provided a response to the
                // exchange, indicate that interest has been canceled.
                if !exchange.responded {
                    try!(self.dispatch.get_mut().inner.cancel(id));
                }

                remove = exchange.is_complete();
            } else {
                // Inbound exchange: the error takes the place of the response
                // or, if one was already delivered, of a body chunk.
                if !exchange.responded {
                    // A response has not been provided yet, send the error via
                    // the dispatch
                    try!(self.dispatch.get_mut().inner.dispatch(MultiplexMessage::error(id, err)));

                    exchange.responded = true;
                } else {
                    // A response has already been sent, send the error via the
                    // body stream
                    exchange.send_out_chunk(Err(err));
                }

                remove = exchange.is_complete();
            }
        } else {
            trace!("   --> no in-flight exchange; dropping error");
        }

        if remove {
            self.exchanges.remove(&id);
        }

        Ok(())
    }
497
498    fn process_out_body_chunk(&mut self, id: RequestId, chunk: Result<Option<T::BodyOut>, T::Error>) {
499        trace!("process out body chunk; id={:?}", id);
500
501        {
502            let exchange = match self.exchanges.get_mut(&id) {
503                Some(v) => v,
504                _ => {
505                    trace!("   --> exchange previously aborted; id={:?}", id);
506                    return;
507                }
508            };
509
510            exchange.send_out_chunk(chunk);
511
512            if !exchange.is_complete() {
513                return;
514            }
515        }
516
517        trace!("dropping out body handle; id={:?}", id);
518        self.exchanges.remove(&id);
519    }
520
521    fn write_in_frames(&mut self) -> io::Result<()> {
522        try!(self.write_in_messages());
523        try!(self.write_in_body());
524        Ok(())
525    }
526
527    fn write_in_messages(&mut self) -> io::Result<()> {
528        trace!("write in messages");
529
530        while self.dispatch.poll_ready().is_ready() {
531            trace!("   --> polling for in frame");
532
533            match try!(self.dispatch.get_mut().inner.poll()) {
534                Async::Ready(Some(message)) => {
535                    self.dispatch_made_progress();
536
537                    match message.message {
538                        Ok(m) => {
539                            trace!("   --> got message");
540                            try!(self.write_in_message(message.id, m, message.solo));
541                        }
542                        Err(error) => {
543                            trace!("   --> got error");
544                            try!(self.write_in_error(message.id, error));
545                        }
546                    }
547                }
548                Async::Ready(None) => {
549                    trace!("   --> got error");
550                    trace!("   --> got None");
551                    // The service is done with the connection. In this case, a
552                    // `Done` frame should be written to the transport and the
553                    // transport should start shutting down.
554                    //
555                    // However, the `Done` frame should only be written once
556                    // all the in-flight bodies have been written.
557                    //
558                    // For now, do nothing...
559                    break;
560                }
561                // Nothing to dispatch
562                Async::NotReady => break,
563            }
564        }
565
566        trace!("   --> transport not ready");
567        self.blocked_on_flush.transport_not_write_ready();
568
569        Ok(())
570    }
571
    /// Write a message produced by the dispatch to the transport and update
    /// the exchange it belongs to.
    ///
    /// The message frame is written first. If an exchange with `id` already
    /// exists, the message is the response to an outbound request: the
    /// exchange is marked as responded and receives the message's body stream
    /// (if any). Otherwise the message opens a new inbound exchange, which is
    /// only tracked if it is not complete right away.
    ///
    /// # Panics
    ///
    /// Panics if an existing exchange was already responded to, is not
    /// outbound, or if `solo` is set on a response.
    fn write_in_message(&mut self,
                        id: RequestId,
                        message: Message<T::In, T::Stream>,
                        solo: bool)
                        -> io::Result<()>
    {
        // Split the message head from its (optional) body stream
        let (message, body) = match message {
            Message::WithBody(message, rx) => (message, Some(rx)),
            Message::WithoutBody(message) => (message, None),
        };

        // Create the frame
        let frame = Frame::Message {
            id: id,
            message: message,
            body: body.is_some(),
            solo: solo,
        };

        // Write the frame
        try!(assert_send(&mut self.dispatch, frame));
        self.blocked_on_flush.wrote_frame();

        match self.exchanges.entry(id) {
            Entry::Occupied(mut e) => {
                assert!(!e.get().responded, "invalid exchange state");
                assert!(e.get().is_outbound());
                assert!(!solo);

                // Track that the exchange has been responded to
                e.get_mut().responded = true;

                // Set the body receiver
                e.get_mut().in_body = body;

                // If the exchange is complete, clean up the resources
                if e.get().is_complete() {
                    e.remove();
                }
            }
            Entry::Vacant(e) => {
                // Create the exchange state
                let mut exchange = Exchange::new(
                    Request::In,
                    self.frame_buf.deque());

                // Set the body receiver
                exchange.in_body = body;
                exchange.set_expect_response(solo);

                // A solo message without a body is complete immediately and
                // needs no tracking.
                if !exchange.is_complete() {
                    // Track the exchange
                    e.insert(exchange);
                }
            }
        }

        Ok(())
    }
631
    /// Write an error produced by the dispatch for exchange `id` to the
    /// transport and tear the exchange down.
    ///
    /// All body state of the exchange is discarded before the error frame is
    /// written. If no exchange with `id` is tracked, nothing is written.
    ///
    /// # Panics
    ///
    /// Panics if the exchange was already responded to, or if it does not
    /// count as complete after its state has been cleared.
    fn write_in_error(&mut self,
                      id: RequestId,
                      error: T::Error)
                      -> io::Result<()>
    {
        if let Entry::Occupied(mut e) = self.exchanges.entry(id) {
            assert!(!e.get().responded, "exchange already responded");

            // TODO: should the outbound body be canceled? In theory, if the
            // consuming end doesn't want it anymore, it should drop interest
            e.get_mut().responded = true;
            e.get_mut().out_body = None;
            e.get_mut().in_body = None;
            e.get_mut().out_deque.clear();

            assert!(e.get().is_complete());

            // Write the error frame
            let frame = Frame::Error { id: id, error: error };
            try!(assert_send(&mut self.dispatch, frame));
            self.blocked_on_flush.wrote_frame();

            // Note: if writing the frame fails, the early return above leaves
            // the (already cleared) exchange in the map.
            e.remove();
        } else {
            trace!("exchange does not exist; id={:?}", id);
        }

        Ok(())
    }
661
    /// Write ready chunks of the inbound body streams to the transport.
    ///
    /// Visits every exchange and, for each, keeps writing body frames while
    /// the sink is flushed and the exchange's body stream has chunks ready.
    /// The end of a stream is written as a body frame with no chunk; a stream
    /// error is written as an error frame and clears the exchange's state.
    /// The whole pass stops as soon as the sink cannot be flushed. Exchanges
    /// that became complete are removed at the end.
    fn write_in_body(&mut self) -> io::Result<()> {
        trace!("write in body chunks");

        // Collects the IDs of exchanges to remove after the iteration
        self.scratch.clear();

        // Now, write the ready streams
        'outer:
        for (&id, exchange) in &mut self.exchanges {
            trace!("   --> checking request {:?}", id);

            loop {
                // Only write another frame once the previous one has been
                // flushed; otherwise give up on all remaining exchanges.
                if !try!(self.dispatch.poll_complete()).is_ready() {
                    trace!("   --> blocked on transport");
                    self.blocked_on_flush.transport_not_write_ready();
                    break 'outer;
                }

                match exchange.try_poll_in_body() {
                    Ok(Async::Ready(Some(chunk))) => {
                        trace!("   --> got chunk");

                        let frame = Frame::Body { id: id, chunk: Some(chunk) };
                        try!(assert_send(&mut self.dispatch, frame));
                        self.blocked_on_flush.wrote_frame();
                    }
                    Ok(Async::Ready(None)) => {
                        trace!("   --> end of stream");

                        // A body frame without a chunk marks the end of the
                        // stream
                        let frame = Frame::Body { id: id, chunk: None };
                        try!(assert_send(&mut self.dispatch, frame));
                        self.blocked_on_flush.wrote_frame();

                        // in_body is fully written.
                        exchange.in_body = None;
                        break;
                    }
                    Err(error) => {
                        trace!("   --> got error");

                        // Write the error frame
                        let frame = Frame::Error { id: id, error: error };
                        try!(assert_send(&mut self.dispatch, frame));
                        self.blocked_on_flush.wrote_frame();

                        // The error ends the exchange: drop all remaining
                        // body state in both directions.
                        exchange.responded = true;
                        exchange.in_body = None;
                        exchange.out_body = None;
                        exchange.out_deque.clear();

                        debug_assert!(exchange.is_complete());
                        break;
                    }
                    Ok(Async::NotReady) => {
                        trace!("   --> no pending chunks");
                        // Move on to the next exchange; this one still has an
                        // open body, so the completion check is skipped.
                        continue 'outer;
                    }
                }
            }

            if exchange.is_complete() {
                self.scratch.push(id);
            }
        }

        // Drop the exchanges recorded above
        for id in &self.scratch {
            trace!("dropping in body handle; id={:?}", id);
            self.exchanges.remove(id);
        }

        Ok(())
    }
733
734    fn flush(&mut self) -> io::Result<()> {
735        self.is_flushed = try!(self.dispatch.poll_complete()).is_ready();
736
737        // TODO: Technically, poll_complete needs to be called on the exchange body senders.
738        // However, mpsc::Sender doesn't actually need to have poll_complete called as it is
739        // currently a no-op. So, I'm just going to punt on figuring out the best way to handle
740        // poll_complete.
741
742        if self.is_flushed && self.blocked_on_flush == WriteState::Blocked {
743            self.made_progress = true;
744        }
745
746        Ok(())
747    }
748
749    fn reset_flags(&mut self) {
750        self.made_progress = false;
751        self.blocked_on_dispatch = false;
752        self.blocked_on_flush = WriteState::NoWrite;
753    }
754
755    fn dispatch_made_progress(&mut self) {
756        if self.blocked_on_dispatch {
757            self.made_progress = true;
758        }
759    }
760}
761
/// Drives the multiplexer: the future resolves once the connection is done
/// (see `is_done`) and fails with the first I/O error encountered.
impl<T> Future for Multiplex<T>
    where T: Dispatch,
{
    type Item = ();
    type Error = io::Error;

    // Tick the multiplex state machine
    fn poll(&mut self) -> Poll<(), io::Error> {
        trace!("Multiplex::tick ~~~~~~~~~~~~~~~~~~~~~~~~~~~");

        // Always tick the transport first
        self.dispatch.get_mut().inner.transport().tick();

        // Try to send any buffered body chunks on their senders
        //
        // This has to happen at the start of the tick. The sender readiness is computed for later
        // on.
        try!(self.flush_out_bodies());

        // Initially set the made_progress flag to true
        self.made_progress = true;

        // Keep looping as long as at least one operation succeeds
        while self.made_progress {
            trace!("~~ multiplex primary loop tick ~~");

            // Reset various flags tracking the state throughout this loop.
            self.reset_flags();

            // Try to dispatch any buffered messages
            try!(self.flush_dispatch_deque());

            // First read off data from the socket
            try!(self.read_out_frames());

            // Handle completed responses
            try!(self.write_in_frames());

            // Try flushing buffered writes
            try!(self.flush());
        }

        // Clean shutdown of the multiplexer can happen when
        //
        // 1. The transport has no more frames to read, this is signaled by
        //    Transport::poll() returning None.
        //
        // 2. All written data has been flushed, this is tracked by
        //    `is_flushed`.
        //
        // 3. There are no in-flight exchanges left.
        //
        // It is necessary to perform these three checks in order to handle the
        // case where the client shuts down half the socket.
        //
        if self.is_done() {
            trace!("multiplex done; terminating");
            return Ok(Async::Ready(()));
        }

        trace!("tick done; waiting for wake-up");

        // Tick again later
        Ok(Async::NotReady)
    }
}
828
829impl<T: Dispatch> Drop for Multiplex<T> {
830    fn drop(&mut self) {
831        if !self.exchanges.is_empty() {
832            warn!("multiplexer dropping with in-flight exchanges");
833        }
834    }
835}
836
837impl<T: Dispatch> Exchange<T> {
838    fn new(request: Request<T>, deque: FrameDeque<Option<Result<T::BodyOut, T::Error>>>) -> Exchange<T> {
839        Exchange {
840            request: request,
841            responded: false,
842            out_body: None,
843            out_deque: deque,
844            out_is_ready: true,
845            in_body: None,
846        }
847    }
848
849    fn is_inbound(&self) -> bool {
850        match self.request {
851            Request::In => true,
852            Request::Out(_) => false,
853        }
854    }
855
856    fn is_outbound(&self) -> bool {
857        !self.is_inbound()
858    }
859
860    fn is_dispatched(&self) -> bool {
861        match self.request {
862            Request::Out(Some(_)) => false,
863            _ => true,
864        }
865    }
866
867    /// Returns true if the exchange is complete
868    fn is_complete(&self) -> bool {
869        // The exchange is completed if the response has been seen and bodies
870        // in both directions are fully flushed
871        self.responded &&
872            self.out_body.is_none() &&
873            self.in_body.is_none() &&
874            self.request.is_none()
875    }
876
877    fn set_expect_response(&mut self, solo: bool) {
878        self.responded = solo;
879
880        if solo {
881            if self.is_inbound() {
882                assert!(self.out_body.is_none());
883            } else {
884                assert!(self.in_body.is_none());
885            }
886        }
887    }
888
889    /// Takes the buffered out request out of the value and returns it
890    fn take_buffered_out_request(&mut self) -> Option<Message<T::Out, Body<T::BodyOut, T::Error>>> {
891        match self.request {
892            Request::Out(ref mut request) => request.take(),
893            _ => None,
894        }
895    }
896
    /// Deliver an outbound body chunk to the body sender, preserving order.
    ///
    /// `Ok(Some(_))` is a data chunk, `Ok(None)` marks the end of the stream
    /// and `Err(_)` is an error to deliver through the stream. If no sender is
    /// attached, the chunk is dropped. If the sender is not considered ready,
    /// the chunk is queued on `out_deque`. Otherwise it is sent directly; when
    /// the sender rejects it, the chunk is queued and the sender is marked as
    /// not ready. The sender is dropped at the end of the stream or when
    /// sending fails.
    ///
    /// # Panics
    ///
    /// Panics if the sender is dropped while `out_deque` still holds chunks.
    fn send_out_chunk(&mut self, chunk: Result<Option<T::BodyOut>, T::Error>) {
        // Reverse Result & Option
        let chunk = match chunk {
            Ok(Some(v)) => Some(Ok(v)),
            Ok(None) => None,
            Err(e) => Some(Err(e)),
        };

        // Get a reference to the sender
        //
        // The block limits the borrow of `out_body` so that it can be cleared
        // afterwards.
        {
            let sender = match self.out_body {
                Some(ref mut v) => v,
                _ =>  {
                    // No sender attached; nothing to deliver to
                    return;
                }
            };

            if self.out_is_ready {
                trace!("   --> send chunk; end-of-stream={:?}", chunk.is_none());

                // If there is a chunk (vs. None which represents end of
                // stream)
                if let Some(chunk) = chunk {
                    match sender.start_send(chunk) {
                        Ok(AsyncSink::Ready) => {
                            trace!("   --> ready for more");
                            // The sender is ready for another message
                            return;
                        }
                        Ok(AsyncSink::NotReady(chunk)) => {
                            // The sender is not ready for another message
                            self.out_deque.push(Some(chunk));
                            self.out_is_ready = false;

                            return;
                        }
                        Err(_) => {
                            // The sender is complete, it should be removed
                        }
                    }
                }

                // Reached at end of stream or after a send error. The sender
                // was ready, so nothing may be left in the queue.
                assert!(self.out_deque.is_empty());
            } else {
                trace!("   --> queueing chunk");

                // Keep ordering: earlier chunks are still queued
                self.out_deque.push(chunk);
                return;
            }
        }

        // The stream is finished (or the receiver is gone): drop the sender
        self.out_is_ready = false;
        self.out_body = None;
    }
951
952    fn try_poll_in_body(&mut self) -> Poll<Option<T::BodyIn>, T::Error> {
953        match self.in_body {
954            Some(ref mut b) => b.poll(),
955            None => {
956                trace!(" !!! no in body??");
957                Ok(Async::NotReady)
958            }
959        }
960    }
961
962    /// Write as many buffered body chunks to the sender
963    fn flush_out_body(&mut self) -> io::Result<()> {
964        {
965            let sender = match self.out_body {
966                Some(ref mut sender) => sender,
967                None => {
968                    assert!(self.out_deque.is_empty(), "pending out frames but no sender");
969                    return Ok(());
970                }
971            };
972
973            self.out_is_ready = true;
974
975            loop {
976                // Pop a pending frame
977                let msg = match self.out_deque.pop() {
978                    Some(Some(msg)) => msg,
979                    Some(None) => break,
980                    None => {
981                        // No more frames to flush
982                        return Ok(());
983                    }
984                };
985
986                // Errors represent the last message to send
987                let done = msg.is_err();
988
989                match sender.start_send(msg) {
990                    Ok(AsyncSink::Ready) => {}
991                    Ok(AsyncSink::NotReady(msg)) => {
992                        trace!("   --> not ready");
993
994                        // Sender not ready
995                        self.out_deque.push_front(Some(msg));
996                        self.out_is_ready = false;
997
998                        return Ok(());
999                    }
1000                    Err(_) => {
1001                        // The receiving end dropped interest in the body
1002                        // stream. In this case, the sender and the frame
1003                        // buffer is dropped. If future body frames are
1004                        // received, the sender will be gone and the frames
1005                        // will be dropped.
1006                        //
1007                        // TODO: Notify transport
1008                        break;
1009                    }
1010                }
1011
1012                if done {
1013                    break
1014                }
1015            }
1016        }
1017
1018        // At this point, the outbound body is complete.
1019        self.out_deque.clear();
1020        self.out_is_ready = false;
1021        self.out_body = None;
1022
1023        Ok(())
1024    }
1025}
1026
1027impl<T> fmt::Debug for Exchange<T>
1028    where T: Dispatch + fmt::Debug,
1029          T::In: fmt::Debug,
1030          T::Out: fmt::Debug,
1031          T::BodyIn: fmt::Debug,
1032          T::BodyOut: fmt::Debug,
1033          T::Error: fmt::Debug,
1034          T::Stream: fmt::Debug,
1035{
1036    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1037        f.debug_struct("Exchange")
1038            .field("request", &self.request)
1039            .field("responded", &self.responded)
1040            .field("out_body", &"Sender { ... }")
1041            .field("out_deque", &"FrameDeque { ... }")
1042            .field("out_is_ready", &self.out_is_ready)
1043            .field("in_body", &self.in_body)
1044            .finish()
1045    }
1046}
1047
1048impl<T: Dispatch> Request<T> {
1049    fn is_none(&self) -> bool {
1050        match *self {
1051            Request::In => true,
1052            Request::Out(None) => true,
1053            _ => false,
1054        }
1055    }
1056}
1057
1058impl WriteState {
1059    fn transport_not_write_ready(&mut self) {
1060        if *self == WriteState::Wrote {
1061            *self = WriteState::Blocked;
1062        }
1063    }
1064
1065    fn wrote_frame(&mut self) {
1066        if *self == WriteState::NoWrite {
1067            *self = WriteState::Wrote;
1068        }
1069    }
1070}
1071
1072fn assert_send<T>(s: &mut T, item: T::SinkItem) -> Result<(), T::SinkError>
1073    where T: Sink
1074{
1075    match try!(s.start_send(item)) {
1076        AsyncSink::Ready => Ok(()),
1077        AsyncSink::NotReady(_) => {
1078            panic!("sink reported itself as ready after `poll_ready` but was \
1079                    then unable to accept a message")
1080        }
1081    }
1082}
1083
1084/*
1085 *
1086 * ===== impl MultiplexMessage =====
1087 *
1088 */
1089
1090impl<T, B, E> MultiplexMessage<T, B, E> {
1091    /// Create a new MultiplexMessage
1092    pub fn new(id: RequestId, message: Message<T, B>) -> MultiplexMessage<T, B, E> {
1093        MultiplexMessage {
1094            id: id,
1095            message: Ok(message),
1096            solo: false,
1097        }
1098    }
1099
1100    /// Create a new errored MultiplexMessage
1101    pub fn error(id: RequestId, error: E) -> MultiplexMessage<T, B, E> {
1102        MultiplexMessage {
1103            id: id,
1104            message: Err(error),
1105            solo: false,
1106        }
1107    }
1108}
1109
1110impl<T: Dispatch> Sink for DispatchSink<T> {
1111    type SinkItem = <T::Transport as Sink>::SinkItem;
1112    type SinkError = io::Error;
1113
1114    fn start_send(&mut self, item: Self::SinkItem)
1115                  -> StartSend<Self::SinkItem, io::Error>
1116    {
1117        self.inner.transport().start_send(item)
1118    }
1119
1120    fn poll_complete(&mut self) -> Poll<(), io::Error> {
1121        self.inner.transport().poll_complete()
1122    }
1123
1124    fn close(&mut self) -> Poll<(), io::Error> {
1125        self.inner.transport().close()
1126    }
1127}