1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use bytes::{Buf, BufMut, Bytes, BytesMut};

use super::utils;
use super::{Body, Frame, REQUEST_MAX};
use crate::error::RSocketError;
use crate::utils::Writeable;

#[derive(Debug, PartialEq)]
pub struct RequestChannel {
    initial_request_n: u32,
    metadata: Option<Bytes>,
    data: Option<Bytes>,
}

pub struct RequestChannelBuilder {
    stream_id: u32,
    flag: u16,
    value: RequestChannel,
}

impl RequestChannelBuilder {
    pub fn new(stream_id: u32, flag: u16) -> RequestChannelBuilder {
        RequestChannelBuilder {
            stream_id,
            flag,
            value: RequestChannel {
                initial_request_n: REQUEST_MAX,
                metadata: None,
                data: None,
            },
        }
    }

    pub fn build(self) -> Frame {
        Frame::new(self.stream_id, Body::RequestChannel(self.value), self.flag)
    }

    pub fn set_initial_request_n(mut self, n: u32) -> Self {
        self.value.initial_request_n = n;
        self
    }

    pub fn set_all(mut self, data_and_metadata: (Option<Bytes>, Option<Bytes>)) -> Self {
        self.value.data = data_and_metadata.0;
        match data_and_metadata.1 {
            Some(m) => {
                self.value.metadata = Some(m);
                self.flag |= Frame::FLAG_METADATA;
            }
            None => {
                self.value.metadata = None;
                self.flag &= !Frame::FLAG_METADATA;
            }
        }
        self
    }

    pub fn set_metadata(mut self, metadata: Bytes) -> Self {
        self.value.metadata = Some(metadata);
        self.flag |= Frame::FLAG_METADATA;
        self
    }

    pub fn set_data(mut self, data: Bytes) -> Self {
        self.value.data = Some(data);
        self
    }
}

impl RequestChannel {
    pub(crate) fn decode(flag: u16, bf: &mut BytesMut) -> crate::Result<RequestChannel> {
        if bf.len() < 4 {
            Err(RSocketError::InCompleteFrame.into())
        } else {
            let initial_request_n = bf.get_u32();
            utils::read_payload(flag, bf).map(move |(metadata, data)| RequestChannel {
                initial_request_n,
                metadata,
                data,
            })
        }
    }

    pub fn builder(stream_id: u32, flag: u16) -> RequestChannelBuilder {
        RequestChannelBuilder::new(stream_id, flag)
    }

    pub fn get_initial_request_n(&self) -> u32 {
        self.initial_request_n
    }

    pub fn get_metadata(&self) -> Option<&Bytes> {
        self.metadata.as_ref()
    }
    pub fn get_data(&self) -> Option<&Bytes> {
        self.data.as_ref()
    }

    pub fn split(self) -> (Option<Bytes>, Option<Bytes>) {
        (self.data, self.metadata)
    }
}

impl Writeable for RequestChannel {
    fn write_to(&self, bf: &mut BytesMut) {
        bf.put_u32(self.initial_request_n);
        utils::write_payload(bf, self.get_metadata(), self.get_data());
    }

    fn len(&self) -> usize {
        4 + utils::calculate_payload_length(self.get_metadata(), self.get_data())
    }
}