use bytes::{Buf, BufMut};
use std::{self, fmt::Debug, io};
use crate::{net::buffers::ChunkLease, prelude::SessionId};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
pub const MAGIC_NUM: u32 = 0xC0A1BA11;
pub const FRAME_HEAD_LEN: u32 = 4 + 4 + 1;
#[derive(Debug)]
pub enum FramingError {
BufferCapacity,
UnsupportedFrameType,
InvalidMagicNum((u32, Vec<u8>)),
InvalidFrame,
SerialisationError,
OptionError,
NoData,
Io(std::io::Error),
}
impl From<std::io::Error> for FramingError {
fn from(src: std::io::Error) -> Self {
FramingError::Io(src)
}
}
impl From<FramingError> for std::io::Error {
fn from(_: FramingError) -> Self {
io::Error::new(io::ErrorKind::InvalidData, "framing error")
}
}
#[derive(Debug)]
pub enum Frame {
Data(Data),
Hello(Hello),
Start(Start),
Ack(),
Bye(),
}
impl Frame {
pub fn frame_type(&self) -> FrameType {
match *self {
Frame::Data(_) => FrameType::Data,
Frame::Hello(_) => FrameType::Hello,
Frame::Start(_) => FrameType::Start,
Frame::Ack() => FrameType::Ack,
Frame::Bye() => FrameType::Bye,
}
}
pub fn encode_into<B: BufMut>(&mut self, dst: &mut B) -> Result<(), FramingError> {
let mut head = FrameHead::new(self.frame_type(), self.encoded_len());
head.encode_into(dst);
match self {
Frame::Data(frame) => frame.encode_into(dst),
Frame::Hello(frame) => frame.encode_into(dst),
Frame::Start(frame) => frame.encode_into(dst),
Frame::Ack() => Ok(()),
Frame::Bye() => Ok(()),
}
}
pub fn encoded_len(&self) -> usize {
match *self {
Frame::Data(ref frame) => frame.encoded_len(),
Frame::Hello(ref frame) => frame.encoded_len(),
Frame::Start(ref frame) => frame.encoded_len(),
_ => 0,
}
}
}
pub(crate) trait FrameExt {
fn decode_from(src: ChunkLease) -> Result<Frame, FramingError>;
fn encode_into<B: BufMut>(&mut self, dst: &mut B) -> Result<(), FramingError>;
fn encoded_len(&self) -> usize;
}
#[derive(Debug)]
pub struct StreamRequest {
pub credit_capacity: u32,
}
#[derive(Debug)]
pub struct CreditUpdate {
pub credit: u32,
}
#[derive(Debug)]
pub struct Data {
pub payload: ChunkLease,
}
#[derive(Debug, Clone)]
pub struct Hello {
pub addr: SocketAddr,
}
#[derive(Debug)]
pub struct Start {
pub addr: SocketAddr,
pub id: SessionId,
}
#[repr(u8)]
#[derive(Debug, Copy, Clone, PartialEq, Ord, PartialOrd, Eq)]
pub enum FrameType {
Data = 0x02,
Hello = 0x04,
Start = 0x05,
Ack = 0x06,
Bye = 0x07,
Unknown = 0x08,
}
impl From<u8> for FrameType {
fn from(byte: u8) -> Self {
match byte {
0x02 => FrameType::Data,
0x04 => FrameType::Hello,
0x05 => FrameType::Start,
0x06 => FrameType::Ack,
0x07 => FrameType::Bye,
_ => FrameType::Unknown,
}
}
}
#[derive(Debug)]
pub(crate) struct FrameHead {
frame_type: FrameType,
content_length: usize,
}
impl FrameHead {
pub(crate) fn new(frame_type: FrameType, content_length: usize) -> Self {
FrameHead {
frame_type,
content_length,
}
}
pub(crate) fn encode_into<B: BufMut>(&mut self, dst: &mut B) {
assert!(dst.remaining_mut() >= FRAME_HEAD_LEN as usize);
dst.put_u32(MAGIC_NUM);
dst.put_u32(self.content_length as u32);
dst.put_u8(self.frame_type as u8);
}
pub(crate) fn decode_from<B: Buf + ?Sized>(src: &mut B) -> Result<Self, FramingError> {
if src.remaining() < (FRAME_HEAD_LEN) as usize {
return Err(FramingError::NoData);
}
let magic_check = src.get_u32();
if magic_check != MAGIC_NUM {
eprintln!("Magic check fail: {:X}", magic_check);
return Err(FramingError::InvalidMagicNum((
magic_check,
src.chunk().to_vec(),
)));
}
let content_length = src.get_u32() as usize;
let frame_type: FrameType = src.get_u8().into();
let head = FrameHead::new(frame_type, content_length);
Ok(head)
}
pub(crate) fn content_length(&self) -> usize {
self.content_length
}
pub(crate) fn frame_type(&self) -> FrameType {
self.frame_type
}
}
impl Hello {
pub fn new(addr: SocketAddr) -> Self {
Hello { addr }
}
pub fn addr(&self) -> SocketAddr {
self.addr
}
}
impl Start {
pub fn new(addr: SocketAddr, id: SessionId) -> Self {
Start { addr, id }
}
pub fn addr(&self) -> SocketAddr {
self.addr
}
pub fn id(&self) -> SessionId {
self.id
}
}
impl Data {
pub fn new(payload: ChunkLease) -> Self {
Data { payload }
}
pub(crate) fn encoded_len(&self) -> usize {
self.payload.capacity()
}
pub(crate) fn payload(self) -> ChunkLease {
self.payload
}
}
impl FrameExt for Data {
fn decode_from(payload: ChunkLease) -> Result<Frame, FramingError> {
let data_frame = Data { payload };
Ok(Frame::Data(data_frame))
}
fn encode_into<B: BufMut>(&mut self, dst: &mut B) -> Result<(), FramingError> {
assert!(dst.remaining_mut() >= (self.encoded_len()));
while self.payload.has_remaining() {
dst.put_slice(self.payload.chunk());
}
Ok(())
}
fn encoded_len(&self) -> usize {
self.payload.chunk().len()
}
}
impl FrameExt for Hello {
fn decode_from(mut src: ChunkLease) -> Result<Frame, FramingError> {
match src.get_u8() {
4 => {
let ip = Ipv4Addr::from(src.get_u32());
let port = src.get_u16();
let addr = SocketAddr::new(IpAddr::V4(ip), port);
Ok(Frame::Hello(Hello::new(addr)))
}
6 => {
let ip = Ipv6Addr::from(src.get_u128());
let port = src.get_u16();
let addr = SocketAddr::new(IpAddr::V6(ip), port);
Ok(Frame::Hello(Hello::new(addr)))
}
_ => {
panic!("Faulty Hello Message!");
}
}
}
fn encode_into<B: BufMut>(&mut self, dst: &mut B) -> Result<(), FramingError> {
match self.addr {
SocketAddr::V4(v4) => {
dst.put_u8(4); dst.put_slice(&v4.ip().octets()); dst.put_u16(v4.port()); Ok(())
}
SocketAddr::V6(v6) => {
dst.put_u8(6); dst.put_slice(&v6.ip().octets()); dst.put_u16(v6.port()); Ok(())
}
}
}
fn encoded_len(&self) -> usize {
match self.addr {
SocketAddr::V4(_v4) => {
1 + 4 + 2 }
SocketAddr::V6(_v6) => {
1 + 16 + 2 }
}
}
}
impl FrameExt for Start {
fn decode_from(mut src: ChunkLease) -> Result<Frame, FramingError> {
match src.get_u8() {
4 => {
let ip = Ipv4Addr::from(src.get_u32());
let port = src.get_u16();
let addr = SocketAddr::new(IpAddr::V4(ip), port);
let id = SessionId::from_u128(src.get_u128());
Ok(Frame::Start(Start::new(addr, id)))
}
6 => {
let ip = Ipv6Addr::from(src.get_u128());
let port = src.get_u16();
let addr = SocketAddr::new(IpAddr::V6(ip), port);
let id = SessionId::from_u128(src.get_u128());
Ok(Frame::Start(Start::new(addr, id)))
}
_ => {
panic!("Faulty Hello Message!");
}
}
}
fn encode_into<B: BufMut>(&mut self, dst: &mut B) -> Result<(), FramingError> {
match self.addr {
SocketAddr::V4(v4) => {
dst.put_u8(4); dst.put_slice(&v4.ip().octets()); dst.put_u16(v4.port()); dst.put_u128(self.id.as_u128()); Ok(())
}
SocketAddr::V6(v6) => {
dst.put_u8(6); dst.put_slice(&v6.ip().octets()); dst.put_u16(v6.port()); dst.put_u128(self.id.as_u128()); Ok(())
}
}
}
fn encoded_len(&self) -> usize {
match self.addr {
SocketAddr::V4(_v4) => {
1 + 4 + 2 + 16 }
SocketAddr::V6(_v6) => {
1 + 16 + 2 + 16 }
}
}
}