otter_support/
packetframe.rs

1// Copyright 2020-2021 Ian Jackson and contributors to Otter
2// SPDX-License-Identifier: AGPL-3.0-or-later
3// There is NO WARRANTY.
4
5//! Frame format:
6//!    zero or more chunks
7//!    end marker
8//!
9//! Chunk format:
10//!    u16     chunk length, nonzero
11//!    <n>     chunk data
12//! Packet end marker:
13//!    0u16         marker
14//!    0xffffu16    marker, error!
15
16use crate::prelude::*;
17
18// ---------- errors (MgmtChannel, anomalous name) ----------
19
20#[derive(Debug,Error)]
21pub enum PacketFrameReadError {
22  #[error("unexpected EOF")]         EOF,
23  #[error("parse MessagePack: {0}")] Parse(String),
24  #[error("{0}")]                    IO(#[from] io::Error),
25}
26
27#[derive(Debug,Error)]
28pub enum PacketFrameWriteError {
29  Serialize(rmp_serde::encode::Error), // but not ValueWriteError so no from
30  IO(#[from] io::Error),
31}
32display_as_debug!{PacketFrameWriteError}
33
34impl From<rmp_serde::encode::Error> for PacketFrameWriteError {
35  fn from(re: rmp_serde::encode::Error) -> PacketFrameWriteError {
36    use rmp_serde::encode::Error::*;
37    use PacketFrameWriteError as PFWE;
38    use rmp::encode::ValueWriteError as RVWE;
39    match re {
40      InvalidValueWrite(RVWE::InvalidMarkerWrite(ioe)) => PFWE::IO(ioe),
41      InvalidValueWrite(RVWE::InvalidDataWrite  (ioe)) => PFWE::IO(ioe),
42      ser@ (UnknownLength | InvalidDataModel(_) |
43            DepthLimitExceeded | Syntax(_)) => PFWE::Serialize(ser),
44    }
45  }
46}
47
48// ---------- common ----------
49
50type ChunkLen = u16;
51
52const CHUNK_MAX: ChunkLen = 65534;
53const CHUNK_ERR: ChunkLen = 65535;
54const CHUNK_DEF: ChunkLen = 8192;
55
56pub const BUFREADER_CAPACITY: usize = CHUNK_DEF as usize + 4;
57
58type BO = BigEndian;
59
60#[derive(Debug,Copy,Clone,Error)]
61#[error("error occurred at peer, during construction of frame data")]
62pub struct SenderError;
63
64#[derive(Debug)]
65pub struct Fuse<RW>{ inner: Result<RW, BrokenFuse<RW>> }
66
67/// An error saved by `Fuse` so it can be repeatedly returned.
68#[derive(Clone,Error,Debug)]
69pub struct Broken {
70  msg: String,
71  kind: io::ErrorKind,
72}
73
74#[derive(Debug)]
75pub struct BrokenFuse<RW> {
76  inner: Option<RW>, // always Some unless we panic crazily
77  error: Broken,
78}
79
80// ---------- read ----------
81
82#[derive(Debug)]
83pub struct FrameReader<R: Read> {
84  state: ReaderState,
85  inner: BufReader<Fuse<R>>,
86}
87
88#[derive(Debug)]
89pub struct ReadFrame<'r,R:Read> {
90  fr: &'r mut FrameReader<R>,
91}
92
93#[derive(Debug,Copy,Clone)]
94enum ReaderState {
95  InBuffer { ibuf_used: ChunkLen, chunk_remaining: ChunkLen },
96  InChunk { remaining: ChunkLen },
97  HadFrameEnd(Result<(), SenderError>),
98}
99use ReaderState::*;
100
101#[derive(Debug,Error)]
102enum ReadHeaderError {
103  TolerableEof,
104  IO(#[from] io::Error),
105}
106display_as_debug!{ReadHeaderError}
107use ReadHeaderError as RHE;
108
109// ---------- write ----------
110
111#[derive(Debug)]
112pub struct FrameWriter<W:Write> {
113  inner: Fuse<W>,
114  in_frame: Option<()>,
115}
116
117#[derive(Debug)]
118struct WriteFrameRaw<'w,W:Write> {
119  fw: &'w mut FrameWriter<W>,
120}
121
122#[derive(Debug)]
123pub struct WriteFrame<'w,W:Write> {
124  buf: BufWriter<WriteFrameRaw<'w,W>>,
125}
126
127// ==================== implementation -====================
128
129impl From<SenderError> for io::Error {
130  fn from(se: SenderError) -> io::Error {
131    io::Error::new(io::ErrorKind::Other, se)
132  }
133}
134
135// ---------- fuse ----------
136
137impl Display for Broken {
138  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
139    f.write_str(&self.msg)
140  }
141}
142impl From<Broken> for io::Error {
143  fn from(broken: Broken) -> io::Error {
144    io::Error::new(broken.kind, broken)
145  }
146}
147
148impl<RW> Fuse<RW> {
149  pub fn new(rw: RW) -> Self { Fuse { inner: Ok(rw) } }
150
151  #[throws(io::Error)]
152  pub fn get(&mut self) -> &mut RW {
153    self.inner.as_mut().map_err(|broken| broken.error.clone())?
154  }
155
156  #[throws(io::Error)]
157  pub fn with<F,T>(&mut self, f: F) -> T
158    where F: FnOnce(&mut RW) -> Result<T, io::Error>
159  {
160    let inner = self.get()?;
161    let r = f(inner);
162    // These are often retried, or treated specially, by higher
163    // layers.  EWOULDBLOCK can generally only occur as an actual
164    // error if the fd is wrongly made nonblocking.
165    if let Err(e) = r {
166      if e.kind() ==  ErrorKind::Interrupted ||
167         e.kind() ==  ErrorKind::WouldBlock { throw!(e) }
168      let error = Broken {
169        msg: e.to_string(),
170        kind: e.kind(),
171      };
172      let inner = mem::replace(&mut self.inner, Err(BrokenFuse {
173        inner: None,
174        error,
175      }));
176      self.inner.as_mut().map(|_|()).unwrap_err().inner = Some(
177        inner.map_err(|e| e.error).unwrap()
178      );
179      throw!(e);
180    }
181    r?
182  }
183
184  pub fn inner_even_broken(&self) -> &RW {
185    self.inner.as_ref().unwrap_or_else(|e| e.inner.as_ref().unwrap())
186  }
187  pub fn inner_even_broken_mut(&mut self) -> &mut RW {
188    self.inner.as_mut().unwrap_or_else(|e| e.inner.as_mut().unwrap())
189  }
190}
191
192impl<R:Read> Read for Fuse<R> {
193  #[throws(io::Error)]
194  fn read(&mut self, buf: &mut [u8]) -> usize {
195    self.with(|inner| inner.read(buf))?
196  }
197}
198impl<W:Write> Write for Fuse<W> {
199  #[throws(io::Error)]
200  fn write(&mut self, buf: &[u8]) -> usize {
201    self.with(|inner| inner.write(buf))?
202  }
203  #[throws(io::Error)]
204  fn flush(&mut self) {
205    self.with(|inner| inner.flush())?
206  }
207}
208
209// ---------- read ----------
210
211fn badeof() -> io::Error { io::ErrorKind::UnexpectedEof.into() }
212
213impl<R:Read> FrameReader<R> {
214  pub fn new(r: R) -> FrameReader<R> {
215    let r = Fuse::new(r);
216    let r = BufReader::with_capacity(BUFREADER_CAPACITY, r);
217    Self::from_bufreader(r)
218  }
219  pub fn from_bufreader(r: BufReader<Fuse<R>>) -> FrameReader<R> {
220    FrameReader { inner: r, state: HadFrameEnd(Ok(())) }
221  }
222
223  #[throws(io::Error)]
224  pub fn new_frame<'r>(&'r mut self) -> Option<ReadFrame<'r,R>> {
225    self.finish_reading_frame()?;
226
227    match self.read_chunk_header() {
228      Ok(_) => {},
229      Err(RHE::TolerableEof) => return None,
230      Err(RHE::IO(e)) => throw!(e),
231    }
232    Some(ReadFrame { fr: self })
233  }
234
235  #[throws(io::Error)]
236  pub fn into_stream(mut self) -> BufReader<Fuse<R>> {
237    self.finish_reading_frame()?;
238    self.inner
239  }
240
241  #[throws(io::Error)]
242  fn finish_reading_frame(&mut self) {
243    while matches_doesnot!(
244      self.state,
245      = InBuffer{..} | InChunk{..},
246      ! HadFrameEnd(..),
247    ) {
248      struct Discard;
249      impl ReadOutput for Discard {
250        #[inline]
251        fn copy_from_buf(&mut self, input: &[u8]) -> usize { input.len() }
252      }
253      self.read_from_frame(&mut Discard)?;
254    }
255  }
256
257  #[throws(ReadHeaderError)]
258  fn read_chunk_header(&mut self) {
259    assert!(matches_doesnot!(
260      self.state,
261      = InChunk { remaining: 0 },
262      = HadFrameEnd(..),
263      ! InChunk { remaining: _ },
264      ! InBuffer{..},
265    ), "bad state {:?}", self.state);
266
267    let header_value = {
268      let mut lbuf = [0u8;2];
269      let mut q = &mut lbuf[..];
270      match io::copy(
271        &mut (&mut self.inner).take(2),
272        &mut q,
273      )? {
274        // length of chunk header read
275        0 => throw!(RHE::TolerableEof), // EOF on underlying stream
276        1 => throw!(badeof()),
277        2 => (&lbuf[..]).read_u16::<BO>().unwrap(),
278        _ => panic!(),
279      }
280    };
281
282    self.state = match header_value {
283      0         => HadFrameEnd(Ok(())),
284      CHUNK_ERR => HadFrameEnd(Err(SenderError)),
285      len       => InChunk { remaining: len },
286    }
287  }
288
289  #[throws(io::Error)]
290  fn read_from_frame<O:ReadOutput+?Sized>(&mut self, output: &mut O) -> usize {
291    loop {
292      if let InBuffer { ref mut ibuf_used, chunk_remaining } = self.state {
293        let ibuf = self.inner.buffer();
294        let cando = &ibuf[ (*ibuf_used).into() ..
295                             min(ibuf.len(), chunk_remaining.into()) ];
296        let got = output.copy_from_buf(cando);
297        *ibuf_used += ChunkLen::try_from(got).unwrap();
298        if got != 0 { break got }
299        assert_eq!(cando.len(), 0);
300        self.inner.consume((*ibuf_used).into());
301        let remaining = chunk_remaining - *ibuf_used;
302        self.state = InChunk { remaining };
303      }
304
305      if let InChunk { remaining } = self.state {
306        if remaining > 0 {
307          let got = self.inner.fill_buf()?.len();
308          if got == 0 { throw!(badeof()) }
309          self.state = InBuffer { ibuf_used: 0, chunk_remaining: remaining };
310          continue;
311        }
312      }
313
314      match self.state {
315        InChunk { remaining: 0 } => { },
316        HadFrameEnd(Ok(())) => break 0,
317        HadFrameEnd(Err(e)) => throw!(e),
318        _ => panic!("bad state {:?}", self.state),
319      }
320
321      match self.read_chunk_header() {
322        Ok(()) => { },
323        Err(RHE::TolerableEof) => throw!(badeof()),
324        Err(RHE::IO(e)) => throw!(e),
325      }
326    }
327  }
328
329  #[throws(PacketFrameReadError)]
330  pub fn read_withbulk<'c,T>(&'c mut self) -> (T, ReadFrame<'c,R>)
331  where T: DeserializeOwned + Debug
332  {
333    let mut f = self.new_frame()?.ok_or(PacketFrameReadError::EOF)?;
334    let v = f.read_rmp()?;
335    trace!("read OK {:?}", &v);
336    (v, f)
337  }
338
339  #[throws(PacketFrameReadError)]
340  pub fn read<T>(&mut self) -> T
341  where T: DeserializeOwned + Debug
342  {
343    self.read_withbulk()?.0
344  }
345
346pub fn inner    (&    self)->&    R{ self.inner.get_ref().inner_even_broken() }
347pub fn inner_mut(&mut self)->&mut R{ self.inner.get_mut().inner_even_broken_mut() }
348}
349
350impl<'r,R:Read> ReadFrame<'r,R> {
351  pub fn inner    (&    self) -> &    R { self.fr.inner()     }
352  pub fn inner_mut(&mut self) -> &mut R { self.fr.inner_mut() }
353}
354
355#[ext(pub, name=ReadExt)]
356impl<R: Read> R {
357  #[throws(PacketFrameReadError)]
358  fn read_rmp<T>(&mut self) -> T
359  where T: DeserializeOwned,
360        R: Read
361  {
362    use PacketFrameReadError as PFRE;
363    let r = rmp_serde::decode::from_read(self);
364    let v = r.map_err(|e| PFRE::Parse(format!("{}", &e)))?;
365    v
366  }
367}
368
369trait ReadOutput {
370  fn copy_from_buf(&mut self, input: &[u8]) -> usize;
371}
372    
373impl ReadOutput for [u8] {
374  #[inline]
375  fn copy_from_buf(&mut self, input: &[u8]) -> usize {
376    let mut p = self;
377    p.write(input).unwrap()
378  }
379}
380
381impl<'r, R:Read> Read for ReadFrame<'r, R> {
382  #[throws(io::Error)]
383  fn read(&mut self, buf: &mut [u8]) -> usize {
384    if buf.len() == 0 { return 0 }
385    self.fr.read_from_frame(buf)?
386  }
387}
388
389// ---------- write ----------
390
391impl<W:Write> FrameWriter<W> {
392  pub fn new(w: W) -> FrameWriter<W> {
393    FrameWriter { inner: Fuse::new(w), in_frame: None }
394  }
395
396  #[throws(io::Error)]
397  pub fn into_stream(mut self) -> Fuse<W> {
398    self.tidy()?;
399    self.inner
400  }
401
402  #[throws(io::Error)]
403  pub fn new_frame<'w>(&'w mut self) -> WriteFrame<'w,W> {
404    self.tidy()?;
405    self.in_frame = Some(());
406    let raw = WriteFrameRaw { fw: self };
407    let buf = BufWriter::with_capacity(CHUNK_DEF.into(), raw);
408    WriteFrame { buf }
409  }
410
411  #[throws(io::Error)]
412  pub fn flush(&mut self) {
413    self.tidy()?;
414    self.inner.flush()?;
415  }
416
417  #[throws(io::Error)]
418  fn tidy(&mut self) {
419    self.finish_any_frame(Err(SenderError))?;
420  }
421
422  #[throws(io::Error)]
423  fn finish_any_frame(&mut self, how: Result<(), SenderError>) {
424    if let Some(_) = self.in_frame {
425      self.inner.write_u16::<BO>(match how {
426        Ok(()) => 0,
427        Err(SenderError) => CHUNK_ERR,
428      })?;
429      self.in_frame = None;
430      self.inner.flush()?;
431    }
432  }
433
434  #[throws(PacketFrameWriteError)]
435  pub fn write_withbulk<'c>(&'c mut self) -> ResponseWriter<'c,W> {
436    ResponseWriter { f: self.new_frame()? }
437  }
438
439  #[throws(PacketFrameWriteError)]
440  pub fn write<T>(&mut self, val: &T)
441  where T: Serialize + Debug
442  {
443    let f = self.write_withbulk()?.respond(val)?;
444    f.finish()?;
445  }
446}
447
448impl<'w,W:Write> WriteFrame<'w,W> {
449  #[throws(io::Error)]
450  pub fn finish_with(self, how: Result<(), SenderError>) {
451    self.buf
452      .into_inner()
453      .map_err(|e| e.into_error())?
454      .fw
455      .finish_any_frame(how)?
456  }
457
458  #[throws(io::Error)]
459  pub fn finish(self) { self.finish_with(Ok(()))? }
460}
461impl<'w,W:Write> Drop for WriteFrameRaw<'w,W> {
462  fn drop(&mut self) {
463    self.fw.tidy()
464      .unwrap_or_else(|_: io::Error| () /* Fuse will replicate this */);
465  }
466}
467impl<'w,W:Write> Write for WriteFrameRaw<'w,W> {
468  #[throws(io::Error)]
469  fn write(&mut self, buf: &[u8]) -> usize {
470    let now = min(buf.len(), CHUNK_MAX.into());
471    self.fw.inner.write_u16::<BO>(now.try_into().unwrap())?;
472    self.fw.inner.write_all(&buf[0..now])?;
473    now
474  }
475
476  #[throws(io::Error)]
477  fn flush(&mut self) {
478    self.fw.inner.flush()?
479  }
480}
481impl<'w,W:Write> Write for WriteFrame<'w,W> {
482  #[throws(io::Error)]
483  fn write(&mut self, buf: &[u8]) -> usize { self.buf.write(buf)? }
484  #[throws(io::Error)]
485  fn flush(&mut self) { self.buf.flush()? }
486}
487
488pub struct ResponseWriter<'c,W:Write> { f: WriteFrame<'c,W> }
489
490impl<'c,W:Write> ResponseWriter<'c,W> {
491  #[throws(PacketFrameWriteError)]
492  pub fn respond<'t,T>(mut self, val: &'t T) -> WriteFrame<'c,W>
493  where T: Serialize + Debug
494  {
495    rmp_serde::encode::write_named(&mut self.f, val)?;
496    trace!("writing {:?}", val);
497    self.f
498  }
499
500  #[throws(PacketFrameWriteError)]
501  pub fn progress_with<RESP: Serialize>(&mut self, resp: RESP) {
502    rmp_serde::encode::write_named(&mut self.f, &resp)?;
503    self.f.flush()?;
504  }
505/*
506  #[throws(PacketFrameWriteError)]
507  pub fn progress(&mut self, pi: ProgressInfo<'_>) {
508    let resp = crate::commands::MgmtResponse::Progress(pi.into_owned());
509    rmp_serde::encode::write_named(&mut self.f, &resp)?;
510    self.f.flush()?;
511  }*/
512}
513
514// ==================== tests ====================
515
516#[test]
517fn write_test(){
518
519  // pretty printing the test message buffer
520  #[derive(Clone,Default,Deref,DerefMut)]
521  struct Framed {
522    buf: Vec<u8>,
523  }
524  impl Debug for Framed {
525    #[throws(fmt::Error)]
526    fn fmt(&self, f: &mut fmt::Formatter) {
527      let mut delim = iter::once("[").chain(iter::repeat(" "));
528      let mut p = self.buf.as_slice();
529      macro_rules! byte { () => {
530        let b = p.read_u8().unwrap();
531        write!(f, "{:02x}", b)?;
532      } }
533      while p.len() > 0 {
534        write!(f, "{}", delim.next().unwrap())?;
535        if_let!{ Ok(l) = p.read_u16::<BO>(); else byte!(); continue; }
536        write!(f, "{:04x}", l)?;
537        if l == 0 || l == CHUNK_ERR { continue }
538        write!(f, " ")?;
539        let l = l.into();
540        if_chain! {
541          if l <= p.len();
542          let s = &p[0..l];
543          if let Ok(s) = str::from_utf8(s);
544          then {
545            p = &p[l..];
546            write!(f, "{:?}", s)?;
547          }
548          else {
549            for _ in 0..min(l, p.len()) { byte!(); }
550          }
551        }
552      }
553      write!(f, "]")?;
554    }
555  }
556
557  // make the test message buffer
558  let mut msg = Framed::default();
559  let mut wr = FrameWriter::new(&mut msg.buf);
560  {
561    let mut frame = wr.new_frame().unwrap();
562    frame.write(b"hello").unwrap();
563    frame.finish().unwrap();
564  }
565  {
566    let mut frame = wr.new_frame().unwrap();
567    frame.write(b"boom").unwrap();
568  }
569  {
570    let frame = wr.new_frame().unwrap();
571    frame.finish().unwrap();
572  }
573  (||{
574    msg.buf.write_u16::<BO>(3)?;
575    msg.buf.write(b"lon")?;
576    msg.buf.write_u16::<BO>(4)?;
577    msg.buf.write(b"ger!")?;
578    msg.buf.write_u16::<BO>(0)?;
579    Ok::<_,AE>(())
580  })().unwrap();
581  dbgc!(&msg);
582
583  // utility functions for helping with test reads
584  fn expect_boom<R:Read>(rd: &mut FrameReader<R>) {
585    let mut buf = [0u8;10];
586    let mut frame = rd.new_frame().unwrap().unwrap();
587    let mut before: Vec<u8> = vec![];
588    let r = loop {
589      match frame.read(&mut buf) {
590        Ok(y) => before.extend(&buf[0..y]),
591        Err(e) => break e,
592      };
593    };
594    dbgc!(&r);
595    assert_eq!(r.kind(), ErrorKind::Other);
596    assert!(r.into_inner().unwrap().is::<SenderError>());
597    assert_eq!(before, b"boom");
598  }
599  fn expect_is_bad_eof(ioe: io::Error) {
600    assert_eq!(ioe.kind(), ErrorKind::UnexpectedEof);
601    ioe.into_inner().map(|i| panic!("unexpected {:?}", &i));
602  }
603  fn expect_bad_eof<R:Read>(frame: &mut ReadFrame<R>) {
604    let mut buf = [0u8;10];
605    let r = frame.read(&mut buf).unwrap_err();
606    expect_is_bad_eof(r);
607  }
608
609  // a very simple test as far as the first boom
610  let mut rd = FrameReader::new(&*msg.buf);
611  let mut buf = [0u8;10];
612  {
613    let mut frame = rd.new_frame().unwrap().unwrap();
614    let y = frame.read(&mut buf).unwrap();
615    dbgc!(str::from_utf8(&buf[0..y]).unwrap());
616  }
617  expect_boom(&mut rd);
618
619  // check how dropping a reading frame works
620  let mut rd = FrameReader::new(&*msg.buf);
621  {
622    let mut _frame = rd.new_frame().unwrap();
623  }
624  expect_boom(&mut rd);
625
626  // utilitiesfor reading the whole input, collecting into vecs
627  #[cfg(not(miri))]
628  fn expect_good<R:Read>(rd: &mut FrameReader<R>, expected: &[u8]) {
629    let mut buf = vec![];
630    let mut frame = rd.new_frame().unwrap().unwrap();
631    frame.read_to_end(&mut buf).unwrap();
632    assert_eq!(&*buf ,expected);
633    dbgc!(str::from_utf8(&buf).unwrap());
634  }
635  #[cfg(not(miri))]
636  fn expect_good_eof<R:Read>(rd: &mut FrameReader<R>) {
637    let frame = rd.new_frame().unwrap(); assert!(frame.is_none());
638    let frame = rd.new_frame().unwrap(); assert!(frame.is_none());
639  }
640
641  // try lumpy reads (ie, short reads) at every plausible boundary size
642  // this approach is not very principled but ought to test every boundary
643  #[cfg(not(miri))]
644  for lumpsize in 1..=msg.buf.len()+1 {
645    #[derive(Debug)]
646    struct LumpReader<R: Read> {
647      inner: R,
648      inlump: usize,
649      lumpsize: usize,
650    }
651    impl<R:Read> LumpReader<R> {
652      fn new(lumpsize: usize, inner: R) -> Self {
653        LumpReader { inner, lumpsize, inlump: 0 }
654      }
655    }
656    impl<R:Read> Read for LumpReader<R> {
657      #[throws(io::Error)]
658      fn read(&mut self, buf: &mut [u8]) -> usize {
659        if self.inlump == 0 { self.inlump = self.lumpsize }
660        let want = min(self.inlump, buf.len());
661        let r = self.inner.read(&mut buf[0..want])?;
662        self.inlump -= r;
663        r
664      }
665    }
666
667    for bufsize in 1..=msg.buf.len()+1 {
668      dbgc!(lumpsize, bufsize);
669      let rd = LumpReader::new(lumpsize, &*msg.buf);
670      let rd = Fuse::new(rd);
671      let rd = BufReader::with_capacity(bufsize, rd);
672      let mut rd = FrameReader::from_bufreader(rd);
673
674      expect_good(&mut rd, b"hello");
675      expect_boom(&mut rd);
676      expect_good(&mut rd, b"");
677      expect_good(&mut rd, b"longer!");
678      expect_good_eof(&mut rd);
679    }
680  }
681
682  // Unexpected EOF mid-chunk-header
683  {
684    let mut rd = FrameReader::new(&[0x55][..]);
685    let r = rd.new_frame().unwrap_err();
686    expect_is_bad_eof(r);
687  }
688
689  // Unexpected EOF mid-data
690  {
691    let mut rd = FrameReader::new(&msg.buf[0..3]);
692    let mut frame = rd.new_frame().unwrap().unwrap();
693    let y = frame.read(&mut buf).unwrap();
694    assert_eq!(y, 1);
695    expect_bad_eof(&mut frame);
696  }
697
698  // Unexpected EOF after nonempty chunk
699  {
700    let mut rd = FrameReader::new(&msg.buf[0..7]);
701    let mut frame = rd.new_frame().unwrap().unwrap();
702    let y = frame.read(&mut buf).unwrap();
703    assert_eq!(&buf[0..y], b"hello");
704    expect_bad_eof(&mut frame);
705  }
706}