1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
use crate::{ Byte, Decode, Encode, Error, PropertiesDecoder, Property, QoS, Result as SageResult, TwoByteInteger, UTF8String, VariableByteInteger, DEFAULT_MAXIMUM_QOS, }; use std::{ convert::{TryFrom, TryInto}, io::{Read, Write}, }; #[derive(Eq, Debug, PartialEq, Clone, Copy)] pub enum RetainHandling { OnSubscribe = 0x00, OnFirstSubscribe = 0x01, DontSend = 0x02, } impl TryFrom<u8> for RetainHandling { type Error = Error; fn try_from(value: u8) -> Result<Self, Self::Error> { match value { 0x00 => Ok(RetainHandling::OnSubscribe), 0x01 => Ok(RetainHandling::OnFirstSubscribe), 0x02 => Ok(RetainHandling::DontSend), _ => Err(Self::Error::MalformedPacket), } } } #[derive(Debug, PartialEq, Clone, Copy)] pub struct SubscriptionOptions { pub qos: QoS, pub no_local: bool, pub retain_as_published: bool, pub retain_handling: RetainHandling, } impl Default for SubscriptionOptions { fn default() -> Self { SubscriptionOptions { qos: DEFAULT_MAXIMUM_QOS, no_local: false, retain_as_published: false, retain_handling: RetainHandling::OnSubscribe, } } } impl Encode for SubscriptionOptions { fn encode<W: Write>(self, writer: &mut W) -> SageResult<usize> { Byte( self.qos as u8 | (self.no_local as u8) << 2 | (self.retain_as_published as u8) << 3 | (self.retain_handling as u8) << 4, ) .encode(writer) } } impl Decode for SubscriptionOptions { fn decode<R: Read>(reader: &mut R) -> SageResult<Self> { let flags = u8::from(Byte::decode(reader)?); if flags & 0b1100_0000 > 0 { Err(Error::ProtocolError) } else { Ok(SubscriptionOptions { qos: (flags & 0b0000_0011).try_into()?, no_local: (flags & 0b0000_0010) > 0, retain_as_published: (flags & 0b0000_1000) > 0, retain_handling: ((flags & 0b0011_0000) >> 4).try_into()?, }) } } } #[derive(Debug, PartialEq, Clone)] pub struct Subscribe { pub packet_identifier: u16, pub subscription_identifier: Option<u32>, pub user_properties: Vec<(String, String)>, pub subscriptions: Vec<(String, SubscriptionOptions)>, } impl Default for Subscribe { fn default() -> Self { Subscribe { packet_identifier: 0, subscription_identifier: None, user_properties: Default::default(), subscriptions: Default::default(), } } } impl Subscribe { pub fn write<W: Write>(self, writer: &mut W) -> SageResult<usize> { let mut n_bytes = TwoByteInteger(self.packet_identifier).encode(writer)?; let mut properties = Vec::new(); if let Some(v) = self.subscription_identifier { n_bytes += VariableByteInteger(v).encode(writer)?; } for (k, v) in self.user_properties { n_bytes += Property::UserProperty(k, v).encode(&mut properties)?; } n_bytes += VariableByteInteger(properties.len() as u32).encode(writer)?; writer.write_all(&properties)?; for option in self.subscriptions { n_bytes += UTF8String(option.0).encode(writer)?; n_bytes += option.1.encode(writer)?; } Ok(n_bytes) } pub fn read<R: Read>(reader: &mut R, remaining_size: usize) -> SageResult<Self> { let mut reader = reader.take(remaining_size as u64); let packet_identifier = TwoByteInteger::decode(&mut reader)?.into(); let mut user_properties = Vec::new(); let mut subscription_identifier = None; let mut properties = PropertiesDecoder::take(&mut reader)?; while properties.has_properties() { match properties.read()? { Property::SubscriptionIdentifier(v) => subscription_identifier = Some(v), Property::UserProperty(k, v) => user_properties.push((k, v)), _ => return Err(Error::ProtocolError), } } let mut subscriptions = Vec::new(); while reader.limit() > 0 { subscriptions.push(( UTF8String::decode(&mut reader)?.into(), SubscriptionOptions::decode(&mut reader)?, )); } if subscriptions.is_empty() { Err(Error::ProtocolError) } else { Ok(Subscribe { packet_identifier, subscription_identifier, user_properties, subscriptions, }) } } }