1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
use crate::{
    Byte, Decode, Encode, Error, PropertiesDecoder, Property, QoS, Result as SageResult,
    TwoByteInteger, UTF8String, VariableByteInteger, DEFAULT_MAXIMUM_QOS,
};
use std::{
    convert::{TryFrom, TryInto},
    io::{Read, Write},
};

#[derive(Eq, Debug, PartialEq, Clone, Copy)]
pub enum RetainHandling {
    OnSubscribe = 0x00,
    OnFirstSubscribe = 0x01,
    DontSend = 0x02,
}

impl TryFrom<u8> for RetainHandling {
    type Error = Error;
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        match value {
            0x00 => Ok(RetainHandling::OnSubscribe),
            0x01 => Ok(RetainHandling::OnFirstSubscribe),
            0x02 => Ok(RetainHandling::DontSend),
            _ => Err(Self::Error::MalformedPacket),
        }
    }
}

#[derive(Debug, PartialEq, Clone, Copy)]
pub struct SubscriptionOptions {
    pub qos: QoS,
    pub no_local: bool,
    pub retain_as_published: bool,
    pub retain_handling: RetainHandling,
}

impl Default for SubscriptionOptions {
    fn default() -> Self {
        SubscriptionOptions {
            qos: DEFAULT_MAXIMUM_QOS,
            no_local: false,
            retain_as_published: false,
            retain_handling: RetainHandling::OnSubscribe,
        }
    }
}

impl Encode for SubscriptionOptions {
    fn encode<W: Write>(self, writer: &mut W) -> SageResult<usize> {
        Byte(
            self.qos as u8
                | (self.no_local as u8) << 2
                | (self.retain_as_published as u8) << 3
                | (self.retain_handling as u8) << 4,
        )
        .encode(writer)
    }
}

impl Decode for SubscriptionOptions {
    fn decode<R: Read>(reader: &mut R) -> SageResult<Self> {
        let flags = u8::from(Byte::decode(reader)?);
        if flags & 0b1100_0000 > 0 {
            Err(Error::ProtocolError)
        } else {
            Ok(SubscriptionOptions {
                qos: (flags & 0b0000_0011).try_into()?,
                no_local: (flags & 0b0000_0010) > 0,
                retain_as_published: (flags & 0b0000_1000) > 0,
                retain_handling: ((flags & 0b0011_0000) >> 4).try_into()?,
            })
        }
    }
}

#[derive(Debug, PartialEq, Clone)]
pub struct Subscribe {
    pub packet_identifier: u16,
    pub subscription_identifier: Option<u32>,
    pub user_properties: Vec<(String, String)>,
    pub subscriptions: Vec<(String, SubscriptionOptions)>,
}

impl Default for Subscribe {
    fn default() -> Self {
        Subscribe {
            packet_identifier: 0,
            subscription_identifier: None,
            user_properties: Default::default(),
            subscriptions: Default::default(),
        }
    }
}

impl Subscribe {
    pub fn write<W: Write>(self, writer: &mut W) -> SageResult<usize> {
        let mut n_bytes = TwoByteInteger(self.packet_identifier).encode(writer)?;

        let mut properties = Vec::new();

        if let Some(v) = self.subscription_identifier {
            n_bytes += VariableByteInteger(v).encode(writer)?;
        }
        for (k, v) in self.user_properties {
            n_bytes += Property::UserProperty(k, v).encode(&mut properties)?;
        }

        n_bytes += VariableByteInteger(properties.len() as u32).encode(writer)?;
        writer.write_all(&properties)?;

        for option in self.subscriptions {
            n_bytes += UTF8String(option.0).encode(writer)?;
            n_bytes += option.1.encode(writer)?;
        }

        Ok(n_bytes)
    }

    pub fn read<R: Read>(reader: &mut R, remaining_size: usize) -> SageResult<Self> {
        let mut reader = reader.take(remaining_size as u64);

        let packet_identifier = TwoByteInteger::decode(&mut reader)?.into();

        let mut user_properties = Vec::new();
        let mut subscription_identifier = None;

        let mut properties = PropertiesDecoder::take(&mut reader)?;
        while properties.has_properties() {
            match properties.read()? {
                Property::SubscriptionIdentifier(v) => subscription_identifier = Some(v),
                Property::UserProperty(k, v) => user_properties.push((k, v)),
                _ => return Err(Error::ProtocolError),
            }
        }

        let mut subscriptions = Vec::new();

        while reader.limit() > 0 {
            subscriptions.push((
                UTF8String::decode(&mut reader)?.into(),
                SubscriptionOptions::decode(&mut reader)?,
            ));
        }

        if subscriptions.is_empty() {
            Err(Error::ProtocolError)
        } else {
            Ok(Subscribe {
                packet_identifier,
                subscription_identifier,
                user_properties,
                subscriptions,
            })
        }
    }
}