use std::fmt;
use std::io;
use std::hash;
use std::mem;
use std::cmp;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink};

use io::Io;

const INITIAL_CAPACITY: usize = 8 * 1024;

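/// A reference-counted buffer of bytes.
///
/// An `EasyBuf` is a view (`start..end`) into a shared `Vec<u8>`. Cheap,
/// zero-copy sub-slices can be handed out via `split_off` and `drain_to`;
/// mutation may require copying the data into a fresh buffer (see `get_mut`).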
#[derive(Clone, Eq)]
pub struct EasyBuf {
    buf: Arc<Vec<u8>>,
    start: usize,
    end: usize,
}

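/// A mutable view into an `EasyBuf`.
///
/// Dereferences to the underlying `Vec<u8>`; when dropped, the parent
/// `EasyBuf`'s `end` index is updated to the vector's new length.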
pub struct EasyBufMut<'a> {
    buf: &'a mut Vec<u8>,
    end: &'a mut usize,
}

impl EasyBuf {
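    /// Creates a new, empty `EasyBuf` with the default capacity
    /// (`INITIAL_CAPACITY`, 8 KiB).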
    pub fn new() -> EasyBuf {
        EasyBuf::with_capacity(INITIAL_CAPACITY)
    }

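    /// Creates a new, empty `EasyBuf` whose underlying vector is allocated
    /// with the given capacity.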
    pub fn with_capacity(cap: usize) -> EasyBuf {
        EasyBuf {
            buf: Arc::new(Vec::with_capacity(cap)),
            start: 0,
            end: 0,
        }
    }

    fn set_start(&mut self, start: usize) -> &mut EasyBuf {
        assert!(start <= self.buf.as_ref().len());
        assert!(start <= self.end);
        self.start = start;
        self
    }

    fn set_end(&mut self, end: usize) -> &mut EasyBuf {
        assert!(end <= self.buf.len());
        assert!(self.start <= end);
        self.end = end;
        self
    }

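    /// Returns the number of bytes currently viewed by this `EasyBuf`.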
    pub fn len(&self) -> usize {
        self.end - self.start
    }

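    /// Returns the bytes of this buffer as a slice.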
    pub fn as_slice(&self) -> &[u8] {
        self.as_ref()
    }

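    /// Splits the buffer into two at the given index.
    ///
    /// Afterwards `self` contains the bytes `[0, at)`, and the returned
    /// `EasyBuf` contains the bytes `[at, len)`. Both views continue to share
    /// the same underlying allocation.
    ///
    /// Panics if `at > len`.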
    pub fn split_off(&mut self, at: usize) -> EasyBuf {
        let mut other = EasyBuf { buf: self.buf.clone(), ..*self };
        let idx = self.start + at;
        other.set_start(idx);
        self.set_end(idx);
        return other
    }

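    /// Splits the bytes `[0, at)` off the front of the buffer and returns
    /// them as a new `EasyBuf`, leaving `self` with the bytes `[at, len)`.
    /// Both views continue to share the same underlying allocation.
    ///
    /// Panics if `at > len`.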
    pub fn drain_to(&mut self, at: usize) -> EasyBuf {
        let mut other = EasyBuf { buf: self.buf.clone(), ..*self };
        let idx = self.start + at;
        other.set_end(idx);
        self.set_start(idx);
        return other
    }

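    /// Returns a mutable view into this buffer's bytes.
    ///
    /// If this `EasyBuf` is the only reference to the underlying vector, the
    /// vector is trimmed to the current view in place; otherwise the viewed
    /// bytes are copied into a freshly allocated vector first.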
    pub fn get_mut(&mut self) -> EasyBufMut {
        // Fast path: we hold the only reference to the underlying buffer, so
        // trim it down to our slice in place and hand out mutable access.
        if Arc::get_mut(&mut self.buf).is_some() {
            let buf = Arc::get_mut(&mut self.buf).unwrap();
            buf.drain(self.end..);
            buf.drain(..self.start);
            self.start = 0;
            return EasyBufMut { buf: buf, end: &mut self.end }
        }

        // Slow path: the buffer is shared, so copy our slice into a freshly
        // allocated vector and point `self` at it.
        let mut v = Vec::with_capacity(cmp::max(INITIAL_CAPACITY, self.as_ref().len()));
        v.extend_from_slice(self.as_ref());
        self.start = 0;
        self.buf = Arc::new(v);
        EasyBufMut {
            buf: Arc::get_mut(&mut self.buf).unwrap(),
            end: &mut self.end,
        }
    }
}

impl AsRef<[u8]> for EasyBuf {
    fn as_ref(&self) -> &[u8] {
        &self.buf[self.start..self.end]
    }
}

impl<'a> Deref for EasyBufMut<'a> {
    type Target = Vec<u8>;

    fn deref(&self) -> &Vec<u8> {
        self.buf
    }
}

impl<'a> DerefMut for EasyBufMut<'a> {
    fn deref_mut(&mut self) -> &mut Vec<u8> {
        self.buf
    }
}

impl From<Vec<u8>> for EasyBuf {
    fn from(vec: Vec<u8>) -> EasyBuf {
        let end = vec.len();
        EasyBuf {
            buf: Arc::new(vec),
            start: 0,
            end: end,
        }
    }
}

impl<T: AsRef<[u8]>> PartialEq<T> for EasyBuf {
    fn eq(&self, other: &T) -> bool {
        self.as_slice().eq(other.as_ref())
    }
}

impl Ord for EasyBuf {
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        self.as_slice().cmp(other.as_slice())
    }
}

impl<T: AsRef<[u8]>> PartialOrd<T> for EasyBuf {
    fn partial_cmp(&self, other: &T) -> Option<cmp::Ordering> {
        self.as_slice().partial_cmp(other.as_ref())
    }
}

impl hash::Hash for EasyBuf {
    fn hash<H: hash::Hasher>(&self, state: &mut H) {
        self.as_slice().hash(state)
    }
}

impl<'a> Drop for EasyBufMut<'a> {
    fn drop(&mut self) {
        *self.end = self.buf.len();
    }
}

impl fmt::Debug for EasyBuf {
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        let bytes = self.as_ref();
        let len = self.len();
        if len < 10 {
            write!(formatter, "EasyBuf{{len={}/{} {:?}}}",
                   self.len(), self.buf.len(), bytes)
        } else {
            write!(formatter, "EasyBuf{{len={}/{} [{}, {}, {}, {}, ..., {}, {}, {}, {}]}}",
                   self.len(), self.buf.len(),
                   bytes[0], bytes[1], bytes[2], bytes[3],
                   bytes[len-4], bytes[len-3], bytes[len-2], bytes[len-1])
        }
    }
}

impl Into<Vec<u8>> for EasyBuf {
    fn into(mut self) -> Vec<u8> {
        mem::replace(self.get_mut().buf, vec![])
    }
}

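/// Encoding and decoding of frames via buffers.
///
/// A `Codec` is used by `Framed` to turn a raw byte stream into a `Stream`
/// of decoded frames (`In`) and a `Sink` of encoded frames (`Out`).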
pub trait Codec {
    type In;

    type Out;

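    /// Attempts to decode a frame from the provided buffer of bytes.
    ///
    /// Returns `Ok(Some(frame))` when a complete frame could be decoded (the
    /// implementation should remove the consumed bytes from `buf`, e.g. with
    /// `drain_to`), `Ok(None)` when more data is needed, and `Err` on an
    /// unrecoverable decoding error.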
    fn decode(&mut self, buf: &mut EasyBuf) -> io::Result<Option<Self::In>>;

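    /// Called when there are no more bytes available to be read from the
    /// underlying I/O.
    ///
    /// The default implementation delegates to `decode` and treats an
    /// incomplete frame (`Ok(None)`) as an error, since no more data will
    /// arrive.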
    fn decode_eof(&mut self, buf: &mut EasyBuf) -> io::Result<Self::In> {
        match try!(self.decode(buf)) {
            Some(frame) => Ok(frame),
            None => Err(io::Error::new(io::ErrorKind::Other,
                                       "bytes remaining on stream")),
        }
    }

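    /// Encodes a frame into the buffer provided.
    ///
    /// The encoded bytes are appended to `buf`, which `Framed` then writes
    /// out to the underlying I/O stream.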
    fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> io::Result<()>;
}

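/// A unified `Stream` and `Sink` interface over an underlying I/O object,
/// using a `Codec` to encode and decode the byte stream into frames.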
#[must_use = "streams do nothing unless polled"]
pub struct Framed<T, C> {
    upstream: T,
    codec: C,
    eof: bool,
    is_readable: bool,
    rd: EasyBuf,
    wr: Vec<u8>,
}

impl<T: Io, C: Codec> Stream for Framed<T, C> {
    type Item = C::In;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Option<C::In>, io::Error> {
        loop {
            // If the read buffer has any pending data, try to decode a frame
            // from it before reading more from the underlying stream.
            if self.is_readable {
                if self.eof {
                    if self.rd.len() == 0 {
                        return Ok(None.into())
                    } else {
                        let frame = try!(self.codec.decode_eof(&mut self.rd));
                        return Ok(Async::Ready(Some(frame)))
                    }
                }
                trace!("attempting to decode a frame");
                if let Some(frame) = try!(self.codec.decode(&mut self.rd)) {
                    trace!("frame decoded from buffer");
                    return Ok(Async::Ready(Some(frame)));
                }
                self.is_readable = false;
            }

            assert!(!self.eof);

            // Otherwise, try to read more data and loop around to decode again.
            let before = self.rd.len();
            let ret = self.upstream.read_to_end(&mut self.rd.get_mut());
            match ret {
                Ok(_n) => self.eof = true,
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    if self.rd.len() == before {
                        return Ok(Async::NotReady)
                    }
                }
                Err(e) => return Err(e),
            }
            self.is_readable = true;
        }
    }
}

impl<T: Io, C: Codec> Sink for Framed<T, C> {
    type SinkItem = C::Out;
    type SinkError = io::Error;

    fn start_send(&mut self, item: C::Out) -> StartSend<C::Out, io::Error> {
        const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;

        // If the write buffer is already over the backpressure boundary, try
        // flushing it first; if it is still too full, refuse the item for now.
        if self.wr.len() > BACKPRESSURE_BOUNDARY {
            try!(self.poll_complete());
            if self.wr.len() > BACKPRESSURE_BOUNDARY {
                return Ok(AsyncSink::NotReady(item));
            }
        }

        try!(self.codec.encode(item, &mut self.wr));
        Ok(AsyncSink::Ready)
    }

    fn poll_complete(&mut self) -> Poll<(), io::Error> {
        trace!("flushing framed transport");

        while !self.wr.is_empty() {
            trace!("writing; remaining={}", self.wr.len());
            let n = try_nb!(self.upstream.write(&self.wr));
            if n == 0 {
                return Err(io::Error::new(io::ErrorKind::WriteZero,
                                          "failed to write frame to transport"));
            }
            self.wr.drain(..n);
        }

        // Try flushing the underlying I/O as well.
        try_nb!(self.upstream.flush());

        trace!("framed transport flushed");
        return Ok(Async::Ready(()));
    }

    fn close(&mut self) -> Poll<(), io::Error> {
        try_ready!(self.poll_complete());
        Ok(().into())
    }
}

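/// Creates a new `Framed` transport from the given I/O object and codec.
///
/// The returned value implements `Stream` (yielding decoded `C::In` frames)
/// and `Sink` (accepting `C::Out` frames to encode and write).
///
/// A minimal sketch of a codec that could be paired with `framed`, splitting
/// the byte stream on `'\n'` (this `LineCodec` is illustrative only and is
/// not part of this module):
///
/// ```ignore
/// use std::io;
/// use std::str;
///
/// struct LineCodec;
///
/// impl Codec for LineCodec {
///     type In = String;
///     type Out = String;
///
///     fn decode(&mut self, buf: &mut EasyBuf) -> io::Result<Option<String>> {
///         // Look for a newline; without one we don't have a full frame yet.
///         match buf.as_slice().iter().position(|&b| b == b'\n') {
///             Some(i) => {
///                 // Remove the line (and the trailing newline) from `buf`.
///                 let line = buf.drain_to(i);
///                 buf.drain_to(1);
///                 match str::from_utf8(line.as_slice()) {
///                     Ok(s) => Ok(Some(s.to_string())),
///                     Err(_) => Err(io::Error::new(io::ErrorKind::InvalidData,
///                                                  "invalid utf-8")),
///                 }
///             }
///             None => Ok(None),
///         }
///     }
///
///     fn encode(&mut self, msg: String, buf: &mut Vec<u8>) -> io::Result<()> {
///         buf.extend_from_slice(msg.as_bytes());
///         buf.push(b'\n');
///         Ok(())
///     }
/// }
/// ```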
pub fn framed<T, C>(io: T, codec: C) -> Framed<T, C> {
    Framed {
        upstream: io,
        codec: codec,
        eof: false,
        is_readable: false,
        rd: EasyBuf::new(),
        wr: Vec::with_capacity(INITIAL_CAPACITY),
    }
}

impl<T, C> Framed<T, C> {
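    /// Returns a reference to the underlying I/O object wrapped by `Framed`.
    ///
    /// Note that care should be taken to not tamper with the underlying
    /// stream of data coming in, as it may corrupt the stream of frames
    /// otherwise being worked with.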
    pub fn get_ref(&self) -> &T {
        &self.upstream
    }

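    /// Returns a mutable reference to the underlying I/O object wrapped by
    /// `Framed`.
    ///
    /// Note that care should be taken to not tamper with the underlying
    /// stream of data coming in, as it may corrupt the stream of frames
    /// otherwise being worked with.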
    pub fn get_mut(&mut self) -> &mut T {
        &mut self.upstream
    }

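    /// Consumes the `Framed`, returning its underlying I/O object.
    ///
    /// Note that any leftover data in the internal read or write buffers is
    /// discarded.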
    pub fn into_inner(self) -> T {
        self.upstream
    }
}

#[cfg(test)]
mod tests {
    use super::{INITIAL_CAPACITY, EasyBuf};
    use std::mem;

    #[test]
    fn debug_empty_easybuf() {
        let buf: EasyBuf = vec![].into();
        assert_eq!("EasyBuf{len=0/0 []}", format!("{:?}", buf));
    }

    #[test]
    fn debug_small_easybuf() {
        let buf: EasyBuf = vec![1, 2, 3, 4, 5, 6].into();
        assert_eq!("EasyBuf{len=6/6 [1, 2, 3, 4, 5, 6]}", format!("{:?}", buf));
    }

    #[test]
    fn debug_small_easybuf_split() {
        let mut buf: EasyBuf = vec![1, 2, 3, 4, 5, 6].into();
        let split = buf.split_off(4);
        assert_eq!("EasyBuf{len=4/6 [1, 2, 3, 4]}", format!("{:?}", buf));
        assert_eq!("EasyBuf{len=2/6 [5, 6]}", format!("{:?}", split));
    }

    #[test]
    fn debug_large_easybuf() {
        let vec: Vec<u8> = (0u8..255u8).collect();
        let buf: EasyBuf = vec.into();
        assert_eq!("EasyBuf{len=255/255 [0, 1, 2, 3, ..., 251, 252, 253, 254]}", format!("{:?}", buf));
    }

    #[test]
    fn easybuf_get_mut_sliced() {
        let vec: Vec<u8> = (0u8..10u8).collect();
        let mut buf: EasyBuf = vec.into();
        buf.split_off(9);
        buf.drain_to(3);
        assert_eq!(*buf.get_mut(), [3, 4, 5, 6, 7, 8]);
    }

    #[test]
    fn easybuf_get_mut_sliced_allocating_at_least_initial_capacity() {
        let vec: Vec<u8> = (0u8..10u8).collect();
        let mut buf: EasyBuf = vec.into();
        buf.split_off(9);
        buf.drain_to(3);
        // Clone to force the underlying buffer to be shared.
        let clone = buf.clone();
        assert_eq!(*buf.get_mut(), [3, 4, 5, 6, 7, 8]);
        assert_eq!(buf.get_mut().buf.capacity(), INITIAL_CAPACITY);
        mem::drop(clone);
    }

    #[test]
    fn easybuf_get_mut_sliced_allocating_required_capacity() {
        let vec: Vec<u8> = (0..INITIAL_CAPACITY * 2).map(|_| 0u8).collect();
        let mut buf: EasyBuf = vec.into();
        buf.drain_to(INITIAL_CAPACITY / 2);
        let clone = buf.clone();
        assert_eq!(buf.get_mut().buf.capacity(), INITIAL_CAPACITY + INITIAL_CAPACITY / 2);
        mem::drop(clone)
    }

    #[test]
    fn easybuf_into_vec_simple() {
        let vec: Vec<u8> = (0u8..10u8).collect();
        let reference = vec.clone();
        let buf: EasyBuf = vec.into();
        let original_pointer = buf.buf.as_ref().as_ptr();
        let result: Vec<u8> = buf.into();
        assert_eq!(result, reference);
        let new_pointer = result.as_ptr();
        assert_eq!(original_pointer, new_pointer, "Into<Vec<u8>> should reuse the exclusive Vec");
    }

    #[test]
    fn easybuf_into_vec_sliced() {
        let vec: Vec<u8> = (0u8..10u8).collect();
        let mut buf: EasyBuf = vec.into();
        let original_pointer = buf.buf.as_ref().as_ptr();
        buf.split_off(9);
        buf.drain_to(3);
        let result: Vec<u8> = buf.into();
        let reference: Vec<u8> = (3u8..9u8).collect();
        assert_eq!(result, reference);
        let new_pointer = result.as_ptr();
        assert_eq!(original_pointer, new_pointer, "Into<Vec<u8>> should reuse the exclusive Vec");
    }

    #[test]
    fn easybuf_into_vec_sliced_allocating() {
        let vec: Vec<u8> = (0u8..10u8).collect();
        let mut buf: EasyBuf = vec.into();
        let original_pointer = buf.buf.as_ref().as_ptr();
        let original = buf.clone();
        buf.split_off(9);
        buf.drain_to(3);
        let result: Vec<u8> = buf.into();
        let reference: Vec<u8> = (3u8..9u8).collect();
        assert_eq!(result, reference);
        let original_reference: EasyBuf = (0u8..10u8).collect::<Vec<u8>>().into();
        assert_eq!(original.as_ref(), original_reference.as_ref());
        let new_pointer = result.as_ptr();
        assert_ne!(original_pointer, new_pointer, "A new vec should be allocated");
    }

    #[test]
    fn easybuf_equality_same_underlying_vec() {
        let mut buf: EasyBuf = (0u8..10).collect::<Vec<_>>().into();
        assert_eq!(buf, buf);
        let other = buf.drain_to(5);
        assert_ne!(buf, other);

        let buf: EasyBuf = (0u8..5).collect::<Vec<_>>().into();
        assert_eq!(buf, other);
    }

    #[test]
    fn easybuf_equality_different_underlying_vec() {
        let mut buf: EasyBuf = (0u8..10).collect::<Vec<_>>().into();
        let mut other: EasyBuf = (0u8..10).collect::<Vec<_>>().into();
        assert_eq!(buf, other);

        buf = buf.drain_to(5);
        assert_ne!(buf, other);

        other = other.drain_to(5);
        assert_eq!(buf, other);
    }
}