1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
#![allow(dead_code)]
#![allow(unused_variables)]

use tokio::net::{TcpStream, ToSocketAddrs, tcp::OwnedReadHalf, tcp::OwnedWriteHalf};
use tokio_util::codec::{Framed, FramedRead, FramedWrite, Decoder, Encoder};
use bytes::{BufMut, BytesMut, Buf};

mod default_server;

/// Number of bytes in the big-endian `u16` size prefix that starts every frame.
const SIZE_PREFIX_LEN: usize = 2;

/// Packet id of [`StemPacket::Message`].
const ID_MESSAGE: u8 = 0x00;
/// Packet id of [`StemPacket::Sub`].
const ID_SUB: u8 = 0x01;
/// Packet id of [`StemPacket::UnSub`].
const ID_UNSUB: u8 = 0x02;
/// Packet id of [`StemPacket::Ping`].
const ID_PING: u8 = 0x03;
/// Packet id of [`StemPacket::Pong`].
const ID_PONG: u8 = 0x04;

/// An established TCP connection that can be wrapped in [`StemCodec`] framing.
pub struct Stem {
    stream: TcpStream,
}

impl Stem {
    /// Connects to `addr` and wraps the resulting stream.
    ///
    /// # Errors
    ///
    /// Returns whatever error [`TcpStream::connect`] reports.
    pub async fn new<T: ToSocketAddrs>(addr: T) -> std::io::Result<Stem> {
        Ok(Stem {
            stream: TcpStream::connect(addr).await?,
        })
    }

    /// Connects to the address stored in `default_server::DEFAULT_SERVER`.
    ///
    /// # Errors
    ///
    /// Returns whatever error [`TcpStream::connect`] reports.
    pub async fn with_server() -> std::io::Result<Stem> {
        Ok(Stem {
            stream: TcpStream::connect(default_server::DEFAULT_SERVER).await?,
        })
    }

    /// Splits the connection into an owned read half and an owned write half,
    /// each wrapped in its own [`StemCodec`].
    pub fn framed_split(
        self,
    ) -> (
        FramedRead<OwnedReadHalf, StemCodec>,
        FramedWrite<OwnedWriteHalf, StemCodec>,
    ) {
        let (read, write) = self.stream.into_split();
        (
            FramedRead::new(read, StemCodec),
            FramedWrite::new(write, StemCodec),
        )
    }

    /// Wraps the whole connection in a single bidirectional [`Framed`] transport.
    pub fn framed(self) -> Framed<TcpStream, StemCodec> {
        Framed::new(self.stream, StemCodec {})
    }
}

/// Stateless codec translating between raw bytes and [`StemPacket`] values.
///
/// Wire layout of one frame, as produced by the encoder:
///
/// ```text
/// u16 (big-endian) size | u8 id | payload ...
/// ```
///
/// `size` counts every byte that follows the size field (the id byte plus the
/// payload). Channel-carrying packets start their payload with a `u8` channel
/// length followed by that many bytes of channel name.
#[derive(Debug, Clone, Copy, Default)]
pub struct StemCodec;

/// Builds the error returned for frames that cannot be parsed.
fn invalid_data(msg: &'static str) -> std::io::Error {
    std::io::Error::new(std::io::ErrorKind::InvalidData, msg)
}

/// Builds the error returned for packets that cannot be represented on the wire.
fn invalid_input(msg: &'static str) -> std::io::Error {
    std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
}

/// Reads a `u8`-length-prefixed UTF-8 channel name from the front of `body`.
fn read_channel(body: &mut BytesMut) -> std::io::Result<String> {
    if !body.has_remaining() {
        return Err(invalid_data("STEM frame is missing the channel length"));
    }
    let channel_len = usize::from(body.get_u8());
    if body.remaining() < channel_len {
        return Err(invalid_data("STEM channel length exceeds the frame size"));
    }
    let raw = body.split_to(channel_len);
    String::from_utf8(raw.to_vec())
        .map_err(|_| invalid_data("STEM channel name is not valid UTF-8"))
}

/// Appends one frame (`size`, `id`, then every slice in `parts`) to `dst`.
///
/// Nothing is written when the frame would not fit the `u16` size field.
fn put_frame(dst: &mut BytesMut, id: u8, parts: &[&[u8]]) -> std::io::Result<()> {
    // The id byte is part of the size, hence the leading 1.
    let body_len = 1 + parts.iter().map(|part| part.len()).sum::<usize>();
    if body_len > usize::from(u16::MAX) {
        return Err(invalid_input("STEM packet does not fit in a 16-bit size field"));
    }

    dst.reserve(SIZE_PREFIX_LEN + body_len);
    // Lossless: `body_len` was range-checked against `u16::MAX` above.
    dst.put_u16(body_len as u16);
    dst.put_u8(id);
    for part in parts {
        dst.put_slice(part);
    }
    Ok(())
}

/// Appends a frame whose payload is a length-prefixed channel name followed by `payload`.
fn put_channel_frame(
    dst: &mut BytesMut,
    id: u8,
    channel: &str,
    payload: &[u8],
) -> std::io::Result<()> {
    if channel.len() > usize::from(u8::MAX) {
        return Err(invalid_input("STEM channel name is longer than 255 bytes"));
    }
    // Lossless: the length was range-checked against `u8::MAX` above.
    let len_prefix = [channel.len() as u8];
    put_frame(dst, id, &[&len_prefix[..], channel.as_bytes(), payload])
}

impl Decoder for StemCodec {
    type Item = StemPacket;
    type Error = std::io::Error;

    /// Decodes at most one frame from the front of `src`.
    ///
    /// Returns `Ok(None)` without consuming anything until a complete frame
    /// (size prefix plus `size` bytes) is buffered. Frames with an id this
    /// codec does not know are consumed and reported as
    /// [`StemPacket::Unknown`].
    ///
    /// # Errors
    ///
    /// Returns an [`std::io::ErrorKind::InvalidData`] error when the frame is
    /// empty, when a channel length runs past the end of the frame, or when a
    /// channel name or message is not valid UTF-8. The offending frame has
    /// already been removed from `src` when the error is returned.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if src.len() < SIZE_PREFIX_LEN {
            return Ok(None);
        }

        // Peek at the size instead of consuming it: if the rest of the frame
        // has not arrived yet, the prefix must still be there on the next call.
        let size = usize::from(u16::from_be_bytes([src[0], src[1]]));
        let frame_len = SIZE_PREFIX_LEN + size;
        if src.len() < frame_len {
            src.reserve(frame_len - src.len());
            return Ok(None);
        }

        // Detach the whole frame up front so `src` stays aligned on a frame
        // boundary no matter how parsing of the body turns out.
        src.advance(SIZE_PREFIX_LEN);
        let mut body = src.split_to(size);

        if !body.has_remaining() {
            return Err(invalid_data("STEM frame has no packet id"));
        }

        let packet = match body.get_u8() {
            ID_MESSAGE => {
                let channel = read_channel(&mut body)?;
                // Everything left in the frame after the channel is the message.
                let message = String::from_utf8(body.to_vec())
                    .map_err(|_| invalid_data("STEM message is not valid UTF-8"))?;
                StemPacket::Message(channel, message)
            }
            ID_SUB => StemPacket::Sub(read_channel(&mut body)?),
            ID_UNSUB => StemPacket::UnSub(read_channel(&mut body)?),
            ID_PING => StemPacket::Ping(body.to_vec()),
            ID_PONG => StemPacket::Pong(body.to_vec()),
            _ => StemPacket::Unknown,
        };
        Ok(Some(packet))
    }
}

impl Encoder<StemPacket> for StemCodec {
    type Error = std::io::Error;

    /// Appends the wire representation of `item` to `dst`.
    ///
    /// [`StemPacket::Unknown`] has no wire representation; it is accepted and
    /// writes nothing.
    ///
    /// # Errors
    ///
    /// Returns an [`std::io::ErrorKind::InvalidInput`] error, leaving `dst`
    /// untouched, when a channel name is longer than 255 bytes or the whole
    /// packet does not fit the 16-bit size field.
    fn encode(&mut self, item: StemPacket, dst: &mut BytesMut) -> Result<(), Self::Error> {
        match item {
            StemPacket::Message(channel, message) => {
                put_channel_frame(dst, ID_MESSAGE, &channel, message.as_bytes())
            }
            StemPacket::Sub(channel) => put_channel_frame(dst, ID_SUB, &channel, &[]),
            StemPacket::UnSub(channel) => put_channel_frame(dst, ID_UNSUB, &channel, &[]),
            StemPacket::Ping(data) => put_frame(dst, ID_PING, &[data.as_slice()]),
            StemPacket::Pong(data) => put_frame(dst, ID_PONG, &[data.as_slice()]),
            StemPacket::Unknown => Ok(()),
        }
    }
}

/// One packet as understood by [`StemCodec`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StemPacket {
    /// Id `0x00`: a channel name and a message body.
    Message(String, String),
    /// Id `0x01`: carries a channel name.
    Sub(String),
    /// Id `0x02`: carries a channel name.
    UnSub(String),
    /// Id `0x03`: carries an opaque byte payload.
    Ping(Vec<u8>),
    /// Id `0x04`: carries an opaque byte payload.
    Pong(Vec<u8>),
    /// Produced by the decoder for any other id; the encoder writes nothing for it.
    Unknown,
}