tokio_core/io/
frame.rs

1use std::fmt;
2use std::io;
3use std::hash;
4use std::mem;
5use std::cmp;
6use std::ops::{Deref, DerefMut};
7use std::sync::Arc;
8
9use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink};
10
11use io::Io;
12
/// Default buffer size in bytes (8 KiB).
///
/// Used as the capacity of a fresh `EasyBuf`, as the minimum capacity of the
/// copy made by `EasyBuf::get_mut`, and as the size of (and backpressure
/// threshold for) the write buffer of `Framed`.
const INITIAL_CAPACITY: usize = 8 << 10;
14
15/// A reference counted buffer of bytes.
16///
17/// An `EasyBuf` is a representation of a byte buffer where sub-slices of it can
18/// be handed out efficiently, each with a `'static` lifetime which keeps the
19/// data alive. The buffer also supports mutation but may require bytes to be
20/// copied to complete the operation.
21#[derive(Clone, Eq)]
22pub struct EasyBuf {
23    buf: Arc<Vec<u8>>,
24    start: usize,
25    end: usize,
26}
27
/// Guard returned by `EasyBuf::get_mut` that grants mutable access to the
/// underlying `Vec<u8>`.
///
/// When the guard goes out of scope, the `end` index of the `EasyBuf` it came
/// from is set to the vector's length at that moment.
pub struct EasyBufMut<'a> {
    // The vector being edited.
    buf: &'a mut Vec<u8>,
    // The originating `EasyBuf`'s `end` index, written back on drop.
    end: &'a mut usize,
}
34
35impl EasyBuf {
36    /// Creates a new EasyBuf with no data and the default capacity.
37    pub fn new() -> EasyBuf {
38        EasyBuf::with_capacity(INITIAL_CAPACITY)
39    }
40
41    /// Creates a new EasyBuf with `cap` capacity.
42    pub fn with_capacity(cap: usize) -> EasyBuf {
43        EasyBuf {
44            buf: Arc::new(Vec::with_capacity(cap)),
45            start: 0,
46            end: 0,
47        }
48    }
49
50    /// Changes the starting index of this window to the index specified.
51    ///
52    /// Returns the windows back to chain multiple calls to this method.
53    ///
54    /// # Panics
55    ///
56    /// This method will panic if `start` is out of bounds for the underlying
57    /// slice or if it comes after the `end` configured in this window.
58    fn set_start(&mut self, start: usize) -> &mut EasyBuf {
59        assert!(start <= self.buf.as_ref().len());
60        assert!(start <= self.end);
61        self.start = start;
62        self
63    }
64
65    /// Changes the end index of this window to the index specified.
66    ///
67    /// Returns the windows back to chain multiple calls to this method.
68    ///
69    /// # Panics
70    ///
71    /// This method will panic if `end` is out of bounds for the underlying
72    /// slice or if it comes after the `end` configured in this window.
73    fn set_end(&mut self, end: usize) -> &mut EasyBuf {
74        assert!(end <= self.buf.len());
75        assert!(self.start <= end);
76        self.end = end;
77        self
78    }
79
80    /// Returns the number of bytes contained in this `EasyBuf`.
81    pub fn len(&self) -> usize {
82        self.end - self.start
83    }
84
85    /// Returns the inner contents of this `EasyBuf` as a slice.
86    pub fn as_slice(&self) -> &[u8] {
87        self.as_ref()
88    }
89
90    /// Splits the buffer into two at the given index.
91    ///
92    /// Afterwards `self` contains elements `[0, at)`, and the returned `EasyBuf`
93    /// contains elements `[at, len)`.
94    ///
95    /// This is an O(1) operation that just increases the reference count and
96    /// sets a few indexes.
97    ///
98    /// # Panics
99    ///
100    /// Panics if `at > len`
101    pub fn split_off(&mut self, at: usize) -> EasyBuf {
102        let mut other = EasyBuf { buf: self.buf.clone(), ..*self };
103        let idx = self.start + at;
104        other.set_start(idx);
105        self.set_end(idx);
106        return other
107    }
108
109    /// Splits the buffer into two at the given index.
110    ///
111    /// Afterwards `self` contains elements `[at, len)`, and the returned `EasyBuf`
112    /// contains elements `[0, at)`.
113    ///
114    /// This is an O(1) operation that just increases the reference count and
115    /// sets a few indexes.
116    ///
117    /// # Panics
118    ///
119    /// Panics if `at > len`
120    pub fn drain_to(&mut self, at: usize) -> EasyBuf {
121        let mut other = EasyBuf { buf: self.buf.clone(), ..*self };
122        let idx = self.start + at;
123        other.set_end(idx);
124        self.set_start(idx);
125        return other
126    }
127
128    /// Returns a mutable reference to the underlying growable buffer of bytes.
129    ///
130    /// If this `EasyBuf` is the only instance pointing at the underlying buffer
131    /// of bytes, a direct mutable reference will be returned. Otherwise the
132    /// contents of this `EasyBuf` will be reallocated in a fresh `Vec<u8>`
133    /// allocation with the same capacity as an `EasyBuf` created with `EasyBuf::new()`,
134    /// and that allocation will be returned.
135    ///
136    /// This operation **is not O(1)** as it may clone the entire contents of
137    /// this buffer.
138    ///
139    /// The returned `EasyBufMut` type implement `Deref` and `DerefMut` to
140    /// `Vec<u8>` can the byte buffer can be manipulated using the standard
141    /// `Vec<u8>` methods.
142    pub fn get_mut(&mut self) -> EasyBufMut {
143        // Fast path if we can get mutable access to our own current
144        // buffer.
145        //
146        // TODO: this should be a match or an if-let
147        if Arc::get_mut(&mut self.buf).is_some() {
148            let buf = Arc::get_mut(&mut self.buf).unwrap();
149            buf.drain(self.end..);
150            buf.drain(..self.start);
151            self.start = 0;
152            return EasyBufMut { buf: buf, end: &mut self.end }
153        }
154
155        // If we couldn't get access above then we give ourself a new buffer
156        // here.
157        let mut v = Vec::with_capacity(cmp::max(INITIAL_CAPACITY, self.as_ref().len()));
158        v.extend_from_slice(self.as_ref());
159        self.start = 0;
160        self.buf = Arc::new(v);
161        EasyBufMut {
162            buf: Arc::get_mut(&mut self.buf).unwrap(),
163            end: &mut self.end,
164        }
165    }
166}
167
168impl AsRef<[u8]> for EasyBuf {
169    fn as_ref(&self) -> &[u8] {
170        &self.buf[self.start..self.end]
171    }
172}
173
174impl<'a> Deref for EasyBufMut<'a> {
175    type Target = Vec<u8>;
176
177    fn deref(&self) -> &Vec<u8> {
178        self.buf
179    }
180}
181
182impl<'a> DerefMut for EasyBufMut<'a> {
183    fn deref_mut(&mut self) -> &mut Vec<u8> {
184        self.buf
185    }
186}
187
188impl From<Vec<u8>> for EasyBuf {
189    fn from(vec: Vec<u8>) -> EasyBuf {
190        let end = vec.len();
191        EasyBuf {
192            buf: Arc::new(vec),
193            start: 0,
194            end: end,
195        }
196    }
197}
198
199impl<T: AsRef<[u8]>> PartialEq<T> for EasyBuf {
200    fn eq(&self, other: &T) -> bool {
201        self.as_slice().eq(other.as_ref())
202    }
203}
204
205impl Ord for EasyBuf {
206    fn cmp(&self, other: &Self) -> cmp::Ordering {
207        self.as_slice().cmp(other.as_slice())
208    }
209}
210
211impl<T: AsRef<[u8]>> PartialOrd<T> for EasyBuf {
212    fn partial_cmp(&self, other: &T) -> Option<cmp::Ordering> {
213        self.as_slice().partial_cmp(other.as_ref())
214    }
215}
216
217impl hash::Hash for EasyBuf {
218    fn hash<H: hash::Hasher>(&self, state: &mut H) {
219        self.as_slice().hash(state)
220    }
221}
222
223impl<'a> Drop for EasyBufMut<'a> {
224    fn drop(&mut self) {
225        *self.end = self.buf.len();
226    }
227}
228
229impl fmt::Debug for EasyBuf {
230    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
231        let bytes = self.as_ref();
232        let len = self.len();
233        if len < 10 {
234            write!(formatter, "EasyBuf{{len={}/{} {:?}}}", self.len(), self.buf.len(), bytes)
235        } else { // choose a more compact representation
236            write!(formatter, "EasyBuf{{len={}/{} [{}, {}, {}, {}, ..., {}, {}, {}, {}]}}", self.len(), self.buf.len(), bytes[0], bytes[1], bytes[2], bytes[3], bytes[len-4], bytes[len-3], bytes[len-2], bytes[len-1])
237        }
238    }
239}
240
241impl Into<Vec<u8>> for EasyBuf {
242    fn into(mut self) -> Vec<u8> {
243        mem::replace(self.get_mut().buf, vec![])
244    }
245}
246
247/// Encoding and decoding of frames via buffers.
248///
249/// This trait is used when constructing an instance of `Framed`. It provides
250/// two types: `In`, for decoded input frames, and `Out`, for outgoing frames
251/// that need to be encoded. It also provides methods to actually perform the
252/// encoding and decoding, which work with corresponding buffer types.
253///
254/// The trait itself is implemented on a type that can track state for decoding
255/// or encoding, which is particularly useful for streaming parsers. In many
256/// cases, though, this type will simply be a unit struct (e.g. `struct
257/// HttpCodec`).
258pub trait Codec {
259    /// The type of decoded frames.
260    type In;
261
262    /// The type of frames to be encoded.
263    type Out;
264
265    /// Attempts to decode a frame from the provided buffer of bytes.
266    ///
267    /// This method is called by `Framed` whenever bytes are ready to be parsed.
268    /// The provided buffer of bytes is what's been read so far, and this
269    /// instance of `Decode` can determine whether an entire frame is in the
270    /// buffer and is ready to be returned.
271    ///
272    /// If an entire frame is available, then this instance will remove those
273    /// bytes from the buffer provided and return them as a decoded
274    /// frame. Note that removing bytes from the provided buffer doesn't always
275    /// necessarily copy the bytes, so this should be an efficient operation in
276    /// most circumstances.
277    ///
278    /// If the bytes look valid, but a frame isn't fully available yet, then
279    /// `Ok(None)` is returned. This indicates to the `Framed` instance that
280    /// it needs to read some more bytes before calling this method again.
281    ///
282    /// Finally, if the bytes in the buffer are malformed then an error is
283    /// returned indicating why. This informs `Framed` that the stream is now
284    /// corrupt and should be terminated.
285    fn decode(&mut self, buf: &mut EasyBuf) -> io::Result<Option<Self::In>>;
286
287    /// A default method available to be called when there are no more bytes
288    /// available to be read from the underlying I/O.
289    ///
290    /// This method defaults to calling `decode` and returns an error if
291    /// `Ok(None)` is returned. Typically this doesn't need to be implemented
292    /// unless the framing protocol differs near the end of the stream.
293    fn decode_eof(&mut self, buf: &mut EasyBuf) -> io::Result<Self::In> {
294        match try!(self.decode(buf)) {
295            Some(frame) => Ok(frame),
296            None => Err(io::Error::new(io::ErrorKind::Other,
297                                       "bytes remaining on stream")),
298        }
299    }
300
301    /// Encodes a frame into the buffer provided.
302    ///
303    /// This method will encode `msg` into the byte buffer provided by `buf`.
304    /// The `buf` provided is an internal buffer of the `Framed` instance and
305    /// will be written out when possible.
306    fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> io::Result<()>;
307}
308
309/// A unified `Stream` and `Sink` interface to an underlying `Io` object, using
310/// the `Codec` trait to encode and decode frames.
311///
312/// You can acquire a `Framed` instance by using the `Io::framed` adapter.
313#[must_use = "streams do nothing unless polled"]
314pub struct Framed<T, C> {
315    upstream: T,
316    codec: C,
317    eof: bool,
318    is_readable: bool,
319    rd: EasyBuf,
320    wr: Vec<u8>,
321}
322
323impl<T: Io, C: Codec> Stream for Framed<T, C> {
324    type Item = C::In;
325    type Error = io::Error;
326
327    fn poll(&mut self) -> Poll<Option<C::In>, io::Error> {
328        loop {
329            // If the read buffer has any pending data, then it could be
330            // possible that `decode` will return a new frame. We leave it to
331            // the decoder to optimize detecting that more data is required.
332            if self.is_readable {
333                if self.eof {
334                    if self.rd.len() == 0 {
335                        return Ok(None.into())
336                    } else {
337                        let frame = try!(self.codec.decode_eof(&mut self.rd));
338                        return Ok(Async::Ready(Some(frame)))
339                    }
340                }
341                trace!("attempting to decode a frame");
342                if let Some(frame) = try!(self.codec.decode(&mut self.rd)) {
343                    trace!("frame decoded from buffer");
344                    return Ok(Async::Ready(Some(frame)));
345                }
346                self.is_readable = false;
347            }
348
349            assert!(!self.eof);
350
351            // Otherwise, try to read more data and try again
352            //
353            // TODO: shouldn't read_to_end, that may read a lot
354            let before = self.rd.len();
355            let ret = self.upstream.read_to_end(&mut self.rd.get_mut());
356            match ret {
357                Ok(_n) => self.eof = true,
358                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
359                    if self.rd.len() == before {
360                        return Ok(Async::NotReady)
361                    }
362                }
363                Err(e) => return Err(e),
364            }
365            self.is_readable = true;
366        }
367    }
368}
369
370impl<T: Io, C: Codec> Sink for Framed<T, C> {
371    type SinkItem = C::Out;
372    type SinkError = io::Error;
373
374    fn start_send(&mut self, item: C::Out) -> StartSend<C::Out, io::Error> {
375        // If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's
376        // *still* over 8KiB, then apply backpressure (reject the send).
377        const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;
378        if self.wr.len() > BACKPRESSURE_BOUNDARY {
379            try!(self.poll_complete());
380            if self.wr.len() > BACKPRESSURE_BOUNDARY {
381                return Ok(AsyncSink::NotReady(item));
382            }
383        }
384
385        try!(self.codec.encode(item, &mut self.wr));
386        Ok(AsyncSink::Ready)
387    }
388
389    fn poll_complete(&mut self) -> Poll<(), io::Error> {
390        trace!("flushing framed transport");
391
392        while !self.wr.is_empty() {
393            trace!("writing; remaining={}", self.wr.len());
394            let n = try_nb!(self.upstream.write(&self.wr));
395            if n == 0 {
396                return Err(io::Error::new(io::ErrorKind::WriteZero,
397                                          "failed to write frame to transport"));
398            }
399            self.wr.drain(..n);
400        }
401
402        // Try flushing the underlying IO
403        try_nb!(self.upstream.flush());
404
405        trace!("framed transport flushed");
406        return Ok(Async::Ready(()));
407    }
408
409    fn close(&mut self) -> Poll<(), io::Error> {
410        try_ready!(self.poll_complete());
411        Ok(().into())
412    }
413}
414
415pub fn framed<T, C>(io: T, codec: C) -> Framed<T, C> {
416    Framed {
417        upstream: io,
418        codec: codec,
419        eof: false,
420        is_readable: false,
421        rd: EasyBuf::new(),
422        wr: Vec::with_capacity(INITIAL_CAPACITY),
423    }
424}
425
426impl<T, C> Framed<T, C> {
427
428    /// Returns a reference to the underlying I/O stream wrapped by `Framed`.
429    ///
430    /// Note that care should be taken to not tamper with the underlying stream
431    /// of data coming in as it may corrupt the stream of frames otherwise being
432    /// worked with.
433    pub fn get_ref(&self) -> &T {
434        &self.upstream
435    }
436
437    /// Returns a mutable reference to the underlying I/O stream wrapped by
438    /// `Framed`.
439    ///
440    /// Note that care should be taken to not tamper with the underlying stream
441    /// of data coming in as it may corrupt the stream of frames otherwise being
442    /// worked with.
443    pub fn get_mut(&mut self) -> &mut T {
444        &mut self.upstream
445    }
446
447    /// Consumes the `Framed`, returning its underlying I/O stream.
448    ///
449    /// Note that care should be taken to not tamper with the underlying stream
450    /// of data coming in as it may corrupt the stream of frames otherwise being
451    /// worked with.
452    pub fn into_inner(self) -> T {
453        self.upstream
454    }
455}
456
#[cfg(test)]
mod tests {
    use super::{INITIAL_CAPACITY, EasyBuf};
    use std::mem;

    /// Builds an `EasyBuf` over the bytes `0, 1, ..., upto - 1`.
    fn counting(upto: u8) -> EasyBuf {
        (0u8..upto).collect::<Vec<u8>>().into()
    }

    /// Address of the first byte of the allocation backing `buf`.
    fn storage_ptr(buf: &EasyBuf) -> *const u8 {
        buf.buf.as_ptr()
    }

    // ---- Debug formatting ----

    #[test]
    fn debug_empty_easybuf() {
        let empty = EasyBuf::from(Vec::<u8>::new());
        assert_eq!("EasyBuf{len=0/0 []}", format!("{:?}", empty));
    }

    #[test]
    fn debug_small_easybuf() {
        let small = EasyBuf::from(vec![1u8, 2, 3, 4, 5, 6]);
        assert_eq!("EasyBuf{len=6/6 [1, 2, 3, 4, 5, 6]}", format!("{:?}", small));
    }

    #[test]
    fn debug_small_easybuf_split() {
        let mut front = EasyBuf::from(vec![1u8, 2, 3, 4, 5, 6]);
        let back = front.split_off(4);
        assert_eq!("EasyBuf{len=4/6 [1, 2, 3, 4]}", format!("{:?}", front));
        assert_eq!("EasyBuf{len=2/6 [5, 6]}", format!("{:?}", back));
    }

    #[test]
    fn debug_large_easybuf() {
        let large = counting(255);
        assert_eq!("EasyBuf{len=255/255 [0, 1, 2, 3, ..., 251, 252, 253, 254]}", format!("{:?}", large));
    }

    // ---- get_mut ----

    #[test]
    fn easybuf_get_mut_sliced() {
        let mut window = counting(10);
        // Both split-off pieces are dropped right away, so the storage stays
        // exclusively owned by `window`.
        window.split_off(9);
        window.drain_to(3);
        assert_eq!(*window.get_mut(), [3, 4, 5, 6, 7, 8]);
    }

    #[test]
    fn easybuf_get_mut_sliced_allocating_at_least_initial_capacity() {
        let mut window = counting(10);
        window.split_off(9);
        window.drain_to(3);
        // A second handle makes the storage shared, which forces a copy.
        let shared = window.clone();
        assert_eq!(*window.get_mut(), [3, 4, 5, 6, 7, 8]);
        assert_eq!(window.get_mut().capacity(), INITIAL_CAPACITY);
        mem::drop(shared); // prevent unused warning
    }

    #[test]
    fn easybuf_get_mut_sliced_allocating_required_capacity() {
        let mut window = EasyBuf::from(vec![0u8; INITIAL_CAPACITY * 2]);
        window.drain_to(INITIAL_CAPACITY / 2);
        let shared = window.clone();
        assert_eq!(window.get_mut().capacity(), INITIAL_CAPACITY + INITIAL_CAPACITY / 2);
        mem::drop(shared);
    }

    // ---- conversion into Vec<u8> ----

    #[test]
    fn easybuf_into_vec_simple() {
        let expected: Vec<u8> = (0u8..10u8).collect();
        let buf = EasyBuf::from(expected.clone());
        let before = storage_ptr(&buf);
        let bytes: Vec<u8> = buf.into();
        assert_eq!(bytes, expected);
        assert_eq!(before, bytes.as_ptr(), "Into<Vec<u8>> should reuse the exclusive Vec");
    }

    #[test]
    fn easybuf_into_vec_sliced() {
        let mut window = counting(10);
        let before = storage_ptr(&window);
        window.split_off(9);
        window.drain_to(3);
        let bytes: Vec<u8> = window.into();
        let expected: Vec<u8> = (3u8..9u8).collect();
        assert_eq!(bytes, expected);
        assert_eq!(before, bytes.as_ptr(), "Into<Vec<u8>> should reuse the exclusive Vec");
    }

    #[test]
    fn easybuf_into_vec_sliced_allocating() {
        let mut window = counting(10);
        let before = storage_ptr(&window);
        // Keep a second handle to the storage alive to force an allocation.
        let untouched = window.clone();
        window.split_off(9);
        window.drain_to(3);
        let bytes: Vec<u8> = window.into();
        let expected: Vec<u8> = (3u8..9u8).collect();
        assert_eq!(bytes, expected);
        // The other handle must still see the full, unmodified contents.
        assert_eq!(untouched.as_ref(), counting(10).as_ref());
        assert_ne!(before, bytes.as_ptr(), "A new vec should be allocated");
    }

    // ---- equality ----

    #[test]
    fn easybuf_equality_same_underlying_vec() {
        let mut tail = counting(10);
        assert_eq!(tail, tail);
        let head = tail.drain_to(5);
        assert_ne!(tail, head);

        let fresh_head = counting(5);
        assert_eq!(fresh_head, head);
    }

    #[test]
    fn easybuf_equality_different_underlying_vec() {
        let mut left = counting(10);
        let mut right = counting(10);
        assert_eq!(left, right);

        left = left.drain_to(5);
        assert_ne!(left, right);

        right = right.drain_to(5);
        assert_eq!(left, right);
    }
}