1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#![allow(dead_code)]
#![allow(unused_variables)]


use tokio::net::{TcpStream, ToSocketAddrs, tcp::OwnedReadHalf, tcp::OwnedWriteHalf};
use tokio_util::codec::{Framed, FramedRead, FramedWrite, Decoder, Encoder};
use bytes::{BufMut, BytesMut, Buf};


mod default_server;


/// Client connection to a Stem server.
///
/// Owns the TCP stream opened by [`Stem::new`] / [`Stem::with_server`] and is
/// consumed by [`Stem::framed`] or [`Stem::framed_split`] to obtain
/// packet-level readers and writers driven by [`StemCodec`].
pub struct Stem {
    /// The established TCP connection to the server.
    stream: TcpStream
}

impl Stem {
    /// Opens a TCP connection to the Stem server at `addr`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by `TcpStream::connect` when the
    /// connection cannot be established.
    pub async fn new<T: ToSocketAddrs>(addr: T) -> std::io::Result<Stem> {
        let stream = TcpStream::connect(addr).await?;
        Ok(Stem { stream })
    }

    /// Opens a TCP connection to `default_server::DEFAULT_SERVER`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by `TcpStream::connect` when the
    /// connection cannot be established.
    pub async fn with_server() -> std::io::Result<Stem> {
        Self::new(default_server::DEFAULT_SERVER).await
    }

    /// Consumes the connection and returns independently owned packet reader
    /// and writer halves, each driven by its own [`StemCodec`].
    pub fn framed_split(self) -> (FramedRead<OwnedReadHalf, StemCodec>, FramedWrite<OwnedWriteHalf, StemCodec>) {
        let (reader, writer) = self.stream.into_split();
        let incoming = FramedRead::new(reader, StemCodec);
        let outgoing = FramedWrite::new(writer, StemCodec);
        (incoming, outgoing)
    }

    /// Consumes the connection and wraps the whole stream in a single
    /// bidirectional [`Framed`] transport that speaks [`StemPacket`]s.
    pub fn framed(self) -> Framed<TcpStream, StemCodec> {
        Framed::new(self.stream, StemCodec)
    }
}

/// Stateless codec translating between raw bytes and [`StemPacket`]s.
///
/// The codec carries no data, so it is trivially copyable and can be created
/// with `StemCodec`, `StemCodec {}` or `StemCodec::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct StemCodec;

impl Decoder for StemCodec {

    type Item = StemPacket;
    type Error = std::io::Error;

    fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if src.len() < 2 {
            return Ok(None)
        }
        let size = src.get_u16();
        let id = src.get_u8();
        match id {
            0x00 => {
                let channel_len = src.get_u8();

                let mut buf = vec![0; channel_len as usize];
                src.copy_to_slice(buf.as_mut_slice());
                let channel = String::from_utf8(buf).unwrap();

                let mut buf = vec![0; (size - 3 - channel_len as u16) as usize];
                src.copy_to_slice(buf.as_mut_slice());
                let message = String::from_utf8(buf).unwrap();

                Ok(Some(StemPacket::Message(channel, message)))
            }
            0x01 | 0x02 => {
                let channel_len = src.get_u8();

                let mut buf = vec![0; channel_len as usize];
                src.copy_to_slice(buf.as_mut_slice());
                let channel = String::from_utf8(buf).unwrap();

                if id == 0x01 {
                    Ok(Some(StemPacket::Sub(channel)))
                } else {
                    Ok(Some(StemPacket::UnSub(channel)))
                }
            }
            0x03 | 0x04 => {
                let mut buf = vec![0; usize::from(size - 1)];
                src.copy_to_slice(buf.as_mut_slice());
                if id == 0x03 {
                    Ok(Some(StemPacket::Ping(buf)))
                } else {
                    Ok(Some(StemPacket::Pong(buf)))
                }
            }
            _ => {
                Ok(Some(StemPacket::Unknown))
            }
        }
    }
    
}

impl Encoder<StemPacket> for StemCodec {
    type Error = std::io::Error;

    /// Appends the wire representation of `item` to `dst`.
    ///
    /// Every frame is a big-endian `u16` size, a one-byte packet id and the
    /// packet body; the size counts the id and body but not itself. Packet ids:
    /// `0x00` message, `0x01` subscribe, `0x02` unsubscribe, `0x03` ping,
    /// `0x04` pong. [`StemPacket::Unknown`] has no wire form and writes nothing.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::InvalidInput` (leaving `dst` untouched) when a
    /// channel is longer than 255 bytes or the packet does not fit in the
    /// 16-bit size field.
    fn encode(&mut self, item: StemPacket, dst: &mut BytesMut) -> Result<(), Self::Error> {
        match item {
            StemPacket::Message(channel, message) => {
                put_stem_frame(dst, 0x00, Some(channel.as_str()), message.as_bytes())
            }
            StemPacket::Sub(channel) => put_stem_frame(dst, 0x01, Some(channel.as_str()), &[]),
            StemPacket::UnSub(channel) => put_stem_frame(dst, 0x02, Some(channel.as_str()), &[]),
            StemPacket::Ping(data) => put_stem_frame(dst, 0x03, None, data.as_slice()),
            StemPacket::Pong(data) => put_stem_frame(dst, 0x04, None, data.as_slice()),
            StemPacket::Unknown => Ok(()),
        }
    }
}

/// Writes one frame: size, id, optional `u8`-length-prefixed channel, payload.
///
/// All length checks happen before the first byte is written so a rejected
/// packet never leaves a partial frame in `dst`.
fn put_stem_frame(
    dst: &mut BytesMut,
    id: u8,
    channel: Option<&str>,
    payload: &[u8],
) -> std::io::Result<()> {
    let channel = channel.map(str::as_bytes);

    if let Some(name) = channel {
        if name.len() > usize::from(u8::MAX) {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "stem channel is longer than 255 bytes",
            ));
        }
    }

    // id byte + (channel length byte + channel bytes, when present) + payload
    let channel_part = channel.map_or(0, |name| 1 + name.len());
    let size = 1 + channel_part + payload.len();
    if size > usize::from(u16::MAX) {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "stem packet does not fit in the 16-bit size field",
        ));
    }

    dst.reserve(2 + size);
    // Both casts are lossless: the bounds were checked above.
    dst.put_u16(size as u16);
    dst.put_u8(id);
    if let Some(name) = channel {
        dst.put_u8(name.len() as u8);
        dst.put_slice(name);
    }
    dst.put_slice(payload);
    Ok(())
}

/// A single packet exchanged with a Stem server.
///
/// Packets can be cloned and compared, which makes it possible to resend a
/// packet or assert on decoded values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StemPacket {
    /// A message published on a channel: `(channel, message)`. Packet id `0x00`.
    Message(String, String),
    /// Subscription request for the named channel. Packet id `0x01`.
    Sub(String),
    /// Unsubscription request for the named channel. Packet id `0x02`.
    UnSub(String),
    /// Ping carrying an opaque payload. Packet id `0x03`.
    Ping(Vec<u8>),
    /// Pong carrying an opaque payload. Decoded from packet id `0x04`.
    Pong(Vec<u8>),
    /// A packet whose id the decoder does not recognise; it is never written
    /// to the wire by the encoder.
    Unknown,
}