1use crate::prelude::*;
17
18#[derive(Debug,Error)]
21pub enum PacketFrameReadError {
22 #[error("unexpected EOF")] EOF,
23 #[error("parse MessagePack: {0}")] Parse(String),
24 #[error("{0}")] IO(#[from] io::Error),
25}
26
27#[derive(Debug,Error)]
28pub enum PacketFrameWriteError {
29 Serialize(rmp_serde::encode::Error), IO(#[from] io::Error),
31}
32display_as_debug!{PacketFrameWriteError}
33
34impl From<rmp_serde::encode::Error> for PacketFrameWriteError {
35 fn from(re: rmp_serde::encode::Error) -> PacketFrameWriteError {
36 use rmp_serde::encode::Error::*;
37 use PacketFrameWriteError as PFWE;
38 use rmp::encode::ValueWriteError as RVWE;
39 match re {
40 InvalidValueWrite(RVWE::InvalidMarkerWrite(ioe)) => PFWE::IO(ioe),
41 InvalidValueWrite(RVWE::InvalidDataWrite (ioe)) => PFWE::IO(ioe),
42 ser@ (UnknownLength | InvalidDataModel(_) |
43 DepthLimitExceeded | Syntax(_)) => PFWE::Serialize(ser),
44 }
45 }
46}
47
48type ChunkLen = u16;
51
52const CHUNK_MAX: ChunkLen = 65534;
53const CHUNK_ERR: ChunkLen = 65535;
54const CHUNK_DEF: ChunkLen = 8192;
55
56pub const BUFREADER_CAPACITY: usize = CHUNK_DEF as usize + 4;
57
58type BO = BigEndian;
59
60#[derive(Debug,Copy,Clone,Error)]
61#[error("error occurred at peer, during construction of frame data")]
62pub struct SenderError;
63
64#[derive(Debug)]
65pub struct Fuse<RW>{ inner: Result<RW, BrokenFuse<RW>> }
66
67#[derive(Clone,Error,Debug)]
69pub struct Broken {
70 msg: String,
71 kind: io::ErrorKind,
72}
73
74#[derive(Debug)]
75pub struct BrokenFuse<RW> {
76 inner: Option<RW>, error: Broken,
78}
79
80#[derive(Debug)]
83pub struct FrameReader<R: Read> {
84 state: ReaderState,
85 inner: BufReader<Fuse<R>>,
86}
87
88#[derive(Debug)]
89pub struct ReadFrame<'r,R:Read> {
90 fr: &'r mut FrameReader<R>,
91}
92
93#[derive(Debug,Copy,Clone)]
94enum ReaderState {
95 InBuffer { ibuf_used: ChunkLen, chunk_remaining: ChunkLen },
96 InChunk { remaining: ChunkLen },
97 HadFrameEnd(Result<(), SenderError>),
98}
99use ReaderState::*;
100
101#[derive(Debug,Error)]
102enum ReadHeaderError {
103 TolerableEof,
104 IO(#[from] io::Error),
105}
106display_as_debug!{ReadHeaderError}
107use ReadHeaderError as RHE;
108
109#[derive(Debug)]
112pub struct FrameWriter<W:Write> {
113 inner: Fuse<W>,
114 in_frame: Option<()>,
115}
116
117#[derive(Debug)]
118struct WriteFrameRaw<'w,W:Write> {
119 fw: &'w mut FrameWriter<W>,
120}
121
122#[derive(Debug)]
123pub struct WriteFrame<'w,W:Write> {
124 buf: BufWriter<WriteFrameRaw<'w,W>>,
125}
126
127impl From<SenderError> for io::Error {
130 fn from(se: SenderError) -> io::Error {
131 io::Error::new(io::ErrorKind::Other, se)
132 }
133}
134
135impl Display for Broken {
138 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
139 f.write_str(&self.msg)
140 }
141}
142impl From<Broken> for io::Error {
143 fn from(broken: Broken) -> io::Error {
144 io::Error::new(broken.kind, broken)
145 }
146}
147
148impl<RW> Fuse<RW> {
149 pub fn new(rw: RW) -> Self { Fuse { inner: Ok(rw) } }
150
151 #[throws(io::Error)]
152 pub fn get(&mut self) -> &mut RW {
153 self.inner.as_mut().map_err(|broken| broken.error.clone())?
154 }
155
156 #[throws(io::Error)]
157 pub fn with<F,T>(&mut self, f: F) -> T
158 where F: FnOnce(&mut RW) -> Result<T, io::Error>
159 {
160 let inner = self.get()?;
161 let r = f(inner);
162 if let Err(e) = r {
166 if e.kind() == ErrorKind::Interrupted ||
167 e.kind() == ErrorKind::WouldBlock { throw!(e) }
168 let error = Broken {
169 msg: e.to_string(),
170 kind: e.kind(),
171 };
172 let inner = mem::replace(&mut self.inner, Err(BrokenFuse {
173 inner: None,
174 error,
175 }));
176 self.inner.as_mut().map(|_|()).unwrap_err().inner = Some(
177 inner.map_err(|e| e.error).unwrap()
178 );
179 throw!(e);
180 }
181 r?
182 }
183
184 pub fn inner_even_broken(&self) -> &RW {
185 self.inner.as_ref().unwrap_or_else(|e| e.inner.as_ref().unwrap())
186 }
187 pub fn inner_even_broken_mut(&mut self) -> &mut RW {
188 self.inner.as_mut().unwrap_or_else(|e| e.inner.as_mut().unwrap())
189 }
190}
191
192impl<R:Read> Read for Fuse<R> {
193 #[throws(io::Error)]
194 fn read(&mut self, buf: &mut [u8]) -> usize {
195 self.with(|inner| inner.read(buf))?
196 }
197}
198impl<W:Write> Write for Fuse<W> {
199 #[throws(io::Error)]
200 fn write(&mut self, buf: &[u8]) -> usize {
201 self.with(|inner| inner.write(buf))?
202 }
203 #[throws(io::Error)]
204 fn flush(&mut self) {
205 self.with(|inner| inner.flush())?
206 }
207}
208
209fn badeof() -> io::Error { io::ErrorKind::UnexpectedEof.into() }
212
213impl<R:Read> FrameReader<R> {
214 pub fn new(r: R) -> FrameReader<R> {
215 let r = Fuse::new(r);
216 let r = BufReader::with_capacity(BUFREADER_CAPACITY, r);
217 Self::from_bufreader(r)
218 }
219 pub fn from_bufreader(r: BufReader<Fuse<R>>) -> FrameReader<R> {
220 FrameReader { inner: r, state: HadFrameEnd(Ok(())) }
221 }
222
223 #[throws(io::Error)]
224 pub fn new_frame<'r>(&'r mut self) -> Option<ReadFrame<'r,R>> {
225 self.finish_reading_frame()?;
226
227 match self.read_chunk_header() {
228 Ok(_) => {},
229 Err(RHE::TolerableEof) => return None,
230 Err(RHE::IO(e)) => throw!(e),
231 }
232 Some(ReadFrame { fr: self })
233 }
234
235 #[throws(io::Error)]
236 pub fn into_stream(mut self) -> BufReader<Fuse<R>> {
237 self.finish_reading_frame()?;
238 self.inner
239 }
240
241 #[throws(io::Error)]
242 fn finish_reading_frame(&mut self) {
243 while matches_doesnot!(
244 self.state,
245 = InBuffer{..} | InChunk{..},
246 ! HadFrameEnd(..),
247 ) {
248 struct Discard;
249 impl ReadOutput for Discard {
250 #[inline]
251 fn copy_from_buf(&mut self, input: &[u8]) -> usize { input.len() }
252 }
253 self.read_from_frame(&mut Discard)?;
254 }
255 }
256
257 #[throws(ReadHeaderError)]
258 fn read_chunk_header(&mut self) {
259 assert!(matches_doesnot!(
260 self.state,
261 = InChunk { remaining: 0 },
262 = HadFrameEnd(..),
263 ! InChunk { remaining: _ },
264 ! InBuffer{..},
265 ), "bad state {:?}", self.state);
266
267 let header_value = {
268 let mut lbuf = [0u8;2];
269 let mut q = &mut lbuf[..];
270 match io::copy(
271 &mut (&mut self.inner).take(2),
272 &mut q,
273 )? {
274 0 => throw!(RHE::TolerableEof), 1 => throw!(badeof()),
277 2 => (&lbuf[..]).read_u16::<BO>().unwrap(),
278 _ => panic!(),
279 }
280 };
281
282 self.state = match header_value {
283 0 => HadFrameEnd(Ok(())),
284 CHUNK_ERR => HadFrameEnd(Err(SenderError)),
285 len => InChunk { remaining: len },
286 }
287 }
288
289 #[throws(io::Error)]
290 fn read_from_frame<O:ReadOutput+?Sized>(&mut self, output: &mut O) -> usize {
291 loop {
292 if let InBuffer { ref mut ibuf_used, chunk_remaining } = self.state {
293 let ibuf = self.inner.buffer();
294 let cando = &ibuf[ (*ibuf_used).into() ..
295 min(ibuf.len(), chunk_remaining.into()) ];
296 let got = output.copy_from_buf(cando);
297 *ibuf_used += ChunkLen::try_from(got).unwrap();
298 if got != 0 { break got }
299 assert_eq!(cando.len(), 0);
300 self.inner.consume((*ibuf_used).into());
301 let remaining = chunk_remaining - *ibuf_used;
302 self.state = InChunk { remaining };
303 }
304
305 if let InChunk { remaining } = self.state {
306 if remaining > 0 {
307 let got = self.inner.fill_buf()?.len();
308 if got == 0 { throw!(badeof()) }
309 self.state = InBuffer { ibuf_used: 0, chunk_remaining: remaining };
310 continue;
311 }
312 }
313
314 match self.state {
315 InChunk { remaining: 0 } => { },
316 HadFrameEnd(Ok(())) => break 0,
317 HadFrameEnd(Err(e)) => throw!(e),
318 _ => panic!("bad state {:?}", self.state),
319 }
320
321 match self.read_chunk_header() {
322 Ok(()) => { },
323 Err(RHE::TolerableEof) => throw!(badeof()),
324 Err(RHE::IO(e)) => throw!(e),
325 }
326 }
327 }
328
329 #[throws(PacketFrameReadError)]
330 pub fn read_withbulk<'c,T>(&'c mut self) -> (T, ReadFrame<'c,R>)
331 where T: DeserializeOwned + Debug
332 {
333 let mut f = self.new_frame()?.ok_or(PacketFrameReadError::EOF)?;
334 let v = f.read_rmp()?;
335 trace!("read OK {:?}", &v);
336 (v, f)
337 }
338
339 #[throws(PacketFrameReadError)]
340 pub fn read<T>(&mut self) -> T
341 where T: DeserializeOwned + Debug
342 {
343 self.read_withbulk()?.0
344 }
345
346pub fn inner (& self)->& R{ self.inner.get_ref().inner_even_broken() }
347pub fn inner_mut(&mut self)->&mut R{ self.inner.get_mut().inner_even_broken_mut() }
348}
349
350impl<'r,R:Read> ReadFrame<'r,R> {
351 pub fn inner (& self) -> & R { self.fr.inner() }
352 pub fn inner_mut(&mut self) -> &mut R { self.fr.inner_mut() }
353}
354
355#[ext(pub, name=ReadExt)]
356impl<R: Read> R {
357 #[throws(PacketFrameReadError)]
358 fn read_rmp<T>(&mut self) -> T
359 where T: DeserializeOwned,
360 R: Read
361 {
362 use PacketFrameReadError as PFRE;
363 let r = rmp_serde::decode::from_read(self);
364 let v = r.map_err(|e| PFRE::Parse(format!("{}", &e)))?;
365 v
366 }
367}
368
369trait ReadOutput {
370 fn copy_from_buf(&mut self, input: &[u8]) -> usize;
371}
372
373impl ReadOutput for [u8] {
374 #[inline]
375 fn copy_from_buf(&mut self, input: &[u8]) -> usize {
376 let mut p = self;
377 p.write(input).unwrap()
378 }
379}
380
381impl<'r, R:Read> Read for ReadFrame<'r, R> {
382 #[throws(io::Error)]
383 fn read(&mut self, buf: &mut [u8]) -> usize {
384 if buf.len() == 0 { return 0 }
385 self.fr.read_from_frame(buf)?
386 }
387}
388
389impl<W:Write> FrameWriter<W> {
392 pub fn new(w: W) -> FrameWriter<W> {
393 FrameWriter { inner: Fuse::new(w), in_frame: None }
394 }
395
396 #[throws(io::Error)]
397 pub fn into_stream(mut self) -> Fuse<W> {
398 self.tidy()?;
399 self.inner
400 }
401
402 #[throws(io::Error)]
403 pub fn new_frame<'w>(&'w mut self) -> WriteFrame<'w,W> {
404 self.tidy()?;
405 self.in_frame = Some(());
406 let raw = WriteFrameRaw { fw: self };
407 let buf = BufWriter::with_capacity(CHUNK_DEF.into(), raw);
408 WriteFrame { buf }
409 }
410
411 #[throws(io::Error)]
412 pub fn flush(&mut self) {
413 self.tidy()?;
414 self.inner.flush()?;
415 }
416
417 #[throws(io::Error)]
418 fn tidy(&mut self) {
419 self.finish_any_frame(Err(SenderError))?;
420 }
421
422 #[throws(io::Error)]
423 fn finish_any_frame(&mut self, how: Result<(), SenderError>) {
424 if let Some(_) = self.in_frame {
425 self.inner.write_u16::<BO>(match how {
426 Ok(()) => 0,
427 Err(SenderError) => CHUNK_ERR,
428 })?;
429 self.in_frame = None;
430 self.inner.flush()?;
431 }
432 }
433
434 #[throws(PacketFrameWriteError)]
435 pub fn write_withbulk<'c>(&'c mut self) -> ResponseWriter<'c,W> {
436 ResponseWriter { f: self.new_frame()? }
437 }
438
439 #[throws(PacketFrameWriteError)]
440 pub fn write<T>(&mut self, val: &T)
441 where T: Serialize + Debug
442 {
443 let f = self.write_withbulk()?.respond(val)?;
444 f.finish()?;
445 }
446}
447
448impl<'w,W:Write> WriteFrame<'w,W> {
449 #[throws(io::Error)]
450 pub fn finish_with(self, how: Result<(), SenderError>) {
451 self.buf
452 .into_inner()
453 .map_err(|e| e.into_error())?
454 .fw
455 .finish_any_frame(how)?
456 }
457
458 #[throws(io::Error)]
459 pub fn finish(self) { self.finish_with(Ok(()))? }
460}
461impl<'w,W:Write> Drop for WriteFrameRaw<'w,W> {
462 fn drop(&mut self) {
463 self.fw.tidy()
464 .unwrap_or_else(|_: io::Error| () );
465 }
466}
467impl<'w,W:Write> Write for WriteFrameRaw<'w,W> {
468 #[throws(io::Error)]
469 fn write(&mut self, buf: &[u8]) -> usize {
470 let now = min(buf.len(), CHUNK_MAX.into());
471 self.fw.inner.write_u16::<BO>(now.try_into().unwrap())?;
472 self.fw.inner.write_all(&buf[0..now])?;
473 now
474 }
475
476 #[throws(io::Error)]
477 fn flush(&mut self) {
478 self.fw.inner.flush()?
479 }
480}
481impl<'w,W:Write> Write for WriteFrame<'w,W> {
482 #[throws(io::Error)]
483 fn write(&mut self, buf: &[u8]) -> usize { self.buf.write(buf)? }
484 #[throws(io::Error)]
485 fn flush(&mut self) { self.buf.flush()? }
486}
487
488pub struct ResponseWriter<'c,W:Write> { f: WriteFrame<'c,W> }
489
490impl<'c,W:Write> ResponseWriter<'c,W> {
491 #[throws(PacketFrameWriteError)]
492 pub fn respond<'t,T>(mut self, val: &'t T) -> WriteFrame<'c,W>
493 where T: Serialize + Debug
494 {
495 rmp_serde::encode::write_named(&mut self.f, val)?;
496 trace!("writing {:?}", val);
497 self.f
498 }
499
500 #[throws(PacketFrameWriteError)]
501 pub fn progress_with<RESP: Serialize>(&mut self, resp: RESP) {
502 rmp_serde::encode::write_named(&mut self.f, &resp)?;
503 self.f.flush()?;
504 }
505}
513
514#[test]
517fn write_test(){
518
519 #[derive(Clone,Default,Deref,DerefMut)]
521 struct Framed {
522 buf: Vec<u8>,
523 }
524 impl Debug for Framed {
525 #[throws(fmt::Error)]
526 fn fmt(&self, f: &mut fmt::Formatter) {
527 let mut delim = iter::once("[").chain(iter::repeat(" "));
528 let mut p = self.buf.as_slice();
529 macro_rules! byte { () => {
530 let b = p.read_u8().unwrap();
531 write!(f, "{:02x}", b)?;
532 } }
533 while p.len() > 0 {
534 write!(f, "{}", delim.next().unwrap())?;
535 if_let!{ Ok(l) = p.read_u16::<BO>(); else byte!(); continue; }
536 write!(f, "{:04x}", l)?;
537 if l == 0 || l == CHUNK_ERR { continue }
538 write!(f, " ")?;
539 let l = l.into();
540 if_chain! {
541 if l <= p.len();
542 let s = &p[0..l];
543 if let Ok(s) = str::from_utf8(s);
544 then {
545 p = &p[l..];
546 write!(f, "{:?}", s)?;
547 }
548 else {
549 for _ in 0..min(l, p.len()) { byte!(); }
550 }
551 }
552 }
553 write!(f, "]")?;
554 }
555 }
556
557 let mut msg = Framed::default();
559 let mut wr = FrameWriter::new(&mut msg.buf);
560 {
561 let mut frame = wr.new_frame().unwrap();
562 frame.write(b"hello").unwrap();
563 frame.finish().unwrap();
564 }
565 {
566 let mut frame = wr.new_frame().unwrap();
567 frame.write(b"boom").unwrap();
568 }
569 {
570 let frame = wr.new_frame().unwrap();
571 frame.finish().unwrap();
572 }
573 (||{
574 msg.buf.write_u16::<BO>(3)?;
575 msg.buf.write(b"lon")?;
576 msg.buf.write_u16::<BO>(4)?;
577 msg.buf.write(b"ger!")?;
578 msg.buf.write_u16::<BO>(0)?;
579 Ok::<_,AE>(())
580 })().unwrap();
581 dbgc!(&msg);
582
583 fn expect_boom<R:Read>(rd: &mut FrameReader<R>) {
585 let mut buf = [0u8;10];
586 let mut frame = rd.new_frame().unwrap().unwrap();
587 let mut before: Vec<u8> = vec![];
588 let r = loop {
589 match frame.read(&mut buf) {
590 Ok(y) => before.extend(&buf[0..y]),
591 Err(e) => break e,
592 };
593 };
594 dbgc!(&r);
595 assert_eq!(r.kind(), ErrorKind::Other);
596 assert!(r.into_inner().unwrap().is::<SenderError>());
597 assert_eq!(before, b"boom");
598 }
599 fn expect_is_bad_eof(ioe: io::Error) {
600 assert_eq!(ioe.kind(), ErrorKind::UnexpectedEof);
601 ioe.into_inner().map(|i| panic!("unexpected {:?}", &i));
602 }
603 fn expect_bad_eof<R:Read>(frame: &mut ReadFrame<R>) {
604 let mut buf = [0u8;10];
605 let r = frame.read(&mut buf).unwrap_err();
606 expect_is_bad_eof(r);
607 }
608
609 let mut rd = FrameReader::new(&*msg.buf);
611 let mut buf = [0u8;10];
612 {
613 let mut frame = rd.new_frame().unwrap().unwrap();
614 let y = frame.read(&mut buf).unwrap();
615 dbgc!(str::from_utf8(&buf[0..y]).unwrap());
616 }
617 expect_boom(&mut rd);
618
619 let mut rd = FrameReader::new(&*msg.buf);
621 {
622 let mut _frame = rd.new_frame().unwrap();
623 }
624 expect_boom(&mut rd);
625
626 #[cfg(not(miri))]
628 fn expect_good<R:Read>(rd: &mut FrameReader<R>, expected: &[u8]) {
629 let mut buf = vec![];
630 let mut frame = rd.new_frame().unwrap().unwrap();
631 frame.read_to_end(&mut buf).unwrap();
632 assert_eq!(&*buf ,expected);
633 dbgc!(str::from_utf8(&buf).unwrap());
634 }
635 #[cfg(not(miri))]
636 fn expect_good_eof<R:Read>(rd: &mut FrameReader<R>) {
637 let frame = rd.new_frame().unwrap(); assert!(frame.is_none());
638 let frame = rd.new_frame().unwrap(); assert!(frame.is_none());
639 }
640
641 #[cfg(not(miri))]
644 for lumpsize in 1..=msg.buf.len()+1 {
645 #[derive(Debug)]
646 struct LumpReader<R: Read> {
647 inner: R,
648 inlump: usize,
649 lumpsize: usize,
650 }
651 impl<R:Read> LumpReader<R> {
652 fn new(lumpsize: usize, inner: R) -> Self {
653 LumpReader { inner, lumpsize, inlump: 0 }
654 }
655 }
656 impl<R:Read> Read for LumpReader<R> {
657 #[throws(io::Error)]
658 fn read(&mut self, buf: &mut [u8]) -> usize {
659 if self.inlump == 0 { self.inlump = self.lumpsize }
660 let want = min(self.inlump, buf.len());
661 let r = self.inner.read(&mut buf[0..want])?;
662 self.inlump -= r;
663 r
664 }
665 }
666
667 for bufsize in 1..=msg.buf.len()+1 {
668 dbgc!(lumpsize, bufsize);
669 let rd = LumpReader::new(lumpsize, &*msg.buf);
670 let rd = Fuse::new(rd);
671 let rd = BufReader::with_capacity(bufsize, rd);
672 let mut rd = FrameReader::from_bufreader(rd);
673
674 expect_good(&mut rd, b"hello");
675 expect_boom(&mut rd);
676 expect_good(&mut rd, b"");
677 expect_good(&mut rd, b"longer!");
678 expect_good_eof(&mut rd);
679 }
680 }
681
682 {
684 let mut rd = FrameReader::new(&[0x55][..]);
685 let r = rd.new_frame().unwrap_err();
686 expect_is_bad_eof(r);
687 }
688
689 {
691 let mut rd = FrameReader::new(&msg.buf[0..3]);
692 let mut frame = rd.new_frame().unwrap().unwrap();
693 let y = frame.read(&mut buf).unwrap();
694 assert_eq!(y, 1);
695 expect_bad_eof(&mut frame);
696 }
697
698 {
700 let mut rd = FrameReader::new(&msg.buf[0..7]);
701 let mut frame = rd.new_frame().unwrap().unwrap();
702 let y = frame.read(&mut buf).unwrap();
703 assert_eq!(&buf[0..y], b"hello");
704 expect_bad_eof(&mut frame);
705 }
706}