1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
use std::io::{Read, Write};
use std::mem::MaybeUninit;

use bytes::{BufMut, BytesMut};
use tokio_util::codec::{Decoder, Encoder};

pub struct Framed<S, C> {
    stream: S,
    codec: C,
    read_buf: BytesMut,
    write_buf: BytesMut,
}

impl<S, C> Framed<S, C> {
    pub fn new(stream: S, codec: C) -> Self {
        Framed {
            stream,
            codec,
            read_buf: BytesMut::new(),
            write_buf: BytesMut::new(),
        }
    }

    pub fn replace_codec<D>(self, codec: D) -> Framed<S, D> {
        Framed {
            stream: self.stream,
            codec,
            read_buf: self.read_buf,
            write_buf: self.write_buf,
        }
    }
}

impl<S: Write, C: Encoder> Framed<S, C> {
    pub fn send(&mut self, item: C::Item) -> Result<(), C::Error> {
        self.write_buf.truncate(0);
        self.codec.encode(item, &mut self.write_buf)?;
        self.stream.write_all(&self.write_buf)?;
        Ok(())
    }
}

impl<S: Read, C: Decoder> Framed<S, C> {
    pub fn receive(&mut self) -> Result<Option<C::Item>, C::Error> {
        loop {
            if self.read_buf.capacity() == 0 {
                self.read_buf.reserve(8 * 1024);
            } else {
                if let Some(frame) = self.codec.decode(&mut self.read_buf)? {
                    return Ok(Some(frame));
                }

                self.read_buf.reserve(1);
            }

            let buf = self.read_buf.bytes_mut();

            let buf = unsafe {
                for x in buf.iter_mut() {
                    *x.as_mut_ptr() = 0;
                    x.assume_init();
                }

                &mut *(buf as *mut [MaybeUninit<u8>] as *mut [u8])
            };

            let n = self.stream.read(buf)?;

            unsafe { self.read_buf.advance_mut(n) };

            if n == 0 {
                return self.codec.decode_eof(&mut self.read_buf);
            }
        }
    }
}