mymq/
types.rs

1#[cfg(any(feature = "fuzzy", feature = "mymqd", test))]
2use arbitrary::{Arbitrary, Error as ArbitraryError, Unstructured};
3
4use std::ops::{Deref, DerefMut};
5use std::{cmp, fmt, mem, result};
6
7use crate::{util, IterTopicPath, Packetize};
8use crate::{Error, ErrorKind, ReasonCode, Result};
9
/// Packet identifier, represented as a plain `u16`.
pub type PacketID = u16;
12
13// TODO: Section.4.7
14//       An Application Message is sent to each Client Subscription whose Topic
15//       Filter matches the Topic Name attached to an Application Message. The topic
16//       resource MAY be either predefined in the Server by an administrator or it MAY
17//       be dynamically created by the Server when it receives the first subscription
18//       or an Application Message with that Topic Name. The Server MAY also use a
19//       security component to authorize particular actions on the topic resource for
20//       a given Client.
21
/// Byte-blob returned by [Packetize] encoders.
///
/// Short encodings are kept inline in a fixed 32-byte array so that no heap
/// allocation is needed; longer encodings fall back to a `Vec<u8>`. Use
/// `AsRef<[u8]>` to obtain the encoded bytes regardless of the variant.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum Blob {
    /// Small variant stores the bytes inline; only the first `size` bytes of `data`
    /// are part of the encoded value.
    Small { data: [u8; 32], size: usize },
    /// Large variant stores the bytes in heap.
    Large { data: Vec<u8> },
}
30
31impl AsRef<[u8]> for Blob {
32    fn as_ref(&self) -> &[u8] {
33        match self {
34            Blob::Small { data, size } => &data[..*size],
35            Blob::Large { data } => &data,
36        }
37    }
38}
39
/// Client identifier, managed internally as a string.
///
/// The wrapped string is public and also reachable through `Deref`/`DerefMut`.
/// The `Default` value wraps an empty string.
#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Default)]
pub struct ClientID(pub String);
43
44impl fmt::Display for ClientID {
45    fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
46        write!(f, "{}", self.0)
47    }
48}
49
50impl Deref for ClientID {
51    type Target = String;
52
53    fn deref(&self) -> &String {
54        &self.0
55    }
56}
57
58impl DerefMut for ClientID {
59    fn deref_mut(&mut self) -> &mut String {
60        &mut self.0
61    }
62}
63
64impl From<String> for ClientID {
65    fn from(val: String) -> ClientID {
66        ClientID(val)
67    }
68}
69
#[cfg(any(feature = "fuzzy", test))]
impl<'a> Arbitrary<'a> for ClientID {
    /// Consume one byte from `uns`: an even byte yields a fresh uuid-v4 identifier,
    /// an odd byte yields the empty identifier.
    fn arbitrary(uns: &mut Unstructured<'a>) -> result::Result<Self, ArbitraryError> {
        let coin = uns.arbitrary::<u8>()?;

        let client_id = if coin % 2 == 0 {
            ClientID::new_uuid_v4()
        } else {
            ClientID("".to_string())
        };

        Ok(client_id)
    }
}
82
83impl ClientID {
84    /// Use uuid-v4 to generate a unique client ID. Stringified representaion shall
85    /// look like: `0c046132-816a-49eb-90c9-2d8161c50409`
86    pub fn new_uuid_v4() -> ClientID {
87        ClientID(uuid::Uuid::new_v4().to_string())
88    }
89}
90
/// Topic-name as defined by the MQTT specification.
///
/// Construction via `From<String>` or `Default` performs no checks; call
/// `validate()` to confirm the value is a well-formed topic-name. The `Default`
/// value wraps an empty string, which `validate()` rejects.
#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct TopicName(String);
94
95impl Deref for TopicName {
96    type Target = String;
97
98    fn deref(&self) -> &String {
99        &self.0
100    }
101}
102
103impl DerefMut for TopicName {
104    fn deref_mut(&mut self) -> &mut String {
105        &mut self.0
106    }
107}
108
109impl From<String> for TopicName {
110    fn from(val: String) -> TopicName {
111        TopicName(val)
112    }
113}
114
#[cfg(any(feature = "fuzzy", feature = "mymqd", test))]
impl<'a> Arbitrary<'a> for TopicName {
    /// Generate a non-empty topic-name free of `#`, `+` and NUL characters.
    ///
    /// When the first byte drawn from `uns` is in `0..=10` a name is picked from a
    /// fixed list of sample names. Otherwise a name is assembled from 1 to 10
    /// randomly chosen levels joined with `/`.
    fn arbitrary(uns: &mut Unstructured<'a>) -> result::Result<Self, ArbitraryError> {
        let names_choice = [
            "/",
            "//",
            "/space ok",
            "sport",
            "sport/",
            "sport/tennis/player1",
            "sport/tennis/player1/ranking",
            "sport/tennis/player1/score/wimbledon",
            "sport/tennis/player2",
            "sport/tennis/player2/ranking",
            "/finance",
            "$SYS/monitor/Clients",
            "$SYS/name",
        ];
        let level_choice: Vec<String> =
            vec!["", "$", "$SYS"].into_iter().map(|s| s.to_string()).collect();
        // NOTE: the first entry must stay the empty string, the fallback below
        // relies on `string_choice[1..]` holding only non-empty levels.
        let string_choice: Vec<String> = vec![
            "", "a", "ab", "abc", "space ok", "sport", "tennis", "player1", "player2",
            "ranking", "score", "finance",
        ]
        .into_iter()
        .map(|s| s.to_string())
        .collect();

        let s = match uns.arbitrary::<u8>()? {
            0..=10 => uns.choose(&names_choice)?.to_string(),
            _ => {
                let c = uns.arbitrary::<u8>()?;
                let levels = match c {
                    // single-level name.
                    0..=9 => vec![uns.choose(&string_choice)?.to_string()],
                    // multi-level name, mostly plain levels with an occasional
                    // empty or `$`-prefixed level.
                    _ => {
                        let mut levels = vec![];
                        for _ in 0..((c % 10) + 1) {
                            let level = match uns.arbitrary::<u8>()? {
                                0..=200 => uns.choose(&string_choice)?.to_string(),
                                201..=255 => uns.choose(&level_choice)?.clone(),
                            };
                            levels.push(level);
                        }
                        levels
                    }
                };

                // wildcards and NUL are not allowed in topic-names.
                let s: String = levels
                    .join("/")
                    .chars()
                    .filter(|ch| !matches!(ch, '#' | '+' | '\u{0}'))
                    .collect();

                if s.is_empty() {
                    // Pick the replacement from the non-empty entries only.
                    // Retrying over the full list may never terminate: once the
                    // input is exhausted `choose` keeps returning the first
                    // entry, which is the empty string.
                    uns.choose(&string_choice[1..])?.to_string()
                } else {
                    s
                }
            }
        };

        Ok(TopicName::from(s))
    }
}
181
182impl Packetize for TopicName {
183    fn decode<T: AsRef<[u8]>>(stream: T) -> Result<(Self, usize)> {
184        let stream: &[u8] = stream.as_ref();
185
186        let (val, n) = String::decode(stream)?;
187        let val = TopicName::from(val);
188
189        val.validate()?;
190        Ok((val, n))
191    }
192
193    fn encode(&self) -> Result<Blob> {
194        self.validate()?;
195        self.0.encode()
196    }
197}
198
199impl<'a> IterTopicPath<'a> for TopicName {
200    type Iter = std::str::Split<'a, char>;
201
202    fn iter_topic_path(&'a self) -> Self::Iter {
203        self.split('/')
204    }
205
206    fn is_dollar_topic(&self) -> bool {
207        self.0.as_bytes()[0] == 36 // '$'
208    }
209
210    fn is_begin_wild_card(&self) -> bool {
211        let ch = self.0.as_bytes()[0];
212        ch == 35 || ch == 43 // '#' or '+'
213    }
214}
215
216impl TopicName {
217    /// Validate topic-name based on TopicName specified by MQTT v5.
218    pub fn validate(&self) -> Result<()> {
219        // All Topic Names and Topic Filters MUST be at least one character long.
220        if self.0.len() == 0 {
221            err!(MalformedPacket, code: MalformedPacket, "ZERO length TopicName")?;
222        }
223        if self.0.chars().any(|ch| matches!(ch, '#' | '+' | '\u{0}')) {
224            err!(MalformedPacket, code: MalformedPacket, "invalid char found")?;
225        }
226
227        Ok(())
228    }
229}
230
/// Topic-filter as defined by the MQTT specification.
///
/// Construction via `From<String>` or `Default` performs no checks; call
/// `validate()` to confirm the value is a well-formed topic-filter. The `Default`
/// value wraps an empty string, which `validate()` rejects.
#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct TopicFilter(String);
234
235impl Deref for TopicFilter {
236    type Target = String;
237
238    fn deref(&self) -> &String {
239        &self.0
240    }
241}
242
243impl DerefMut for TopicFilter {
244    fn deref_mut(&mut self) -> &mut String {
245        &mut self.0
246    }
247}
248
249impl From<String> for TopicFilter {
250    fn from(val: String) -> TopicFilter {
251        TopicFilter(val)
252    }
253}
254
255impl Packetize for TopicFilter {
256    fn decode<T: AsRef<[u8]>>(stream: T) -> Result<(Self, usize)> {
257        let stream: &[u8] = stream.as_ref();
258
259        let (val, n) = String::decode(stream)?;
260        let val = TopicFilter::from(val);
261
262        val.validate()?;
263        Ok((val, n))
264    }
265
266    fn encode(&self) -> Result<Blob> {
267        self.validate()?;
268        self.0.encode()
269    }
270}
271
272impl<'a> IterTopicPath<'a> for TopicFilter {
273    type Iter = std::str::Split<'a, char>;
274
275    fn iter_topic_path(&'a self) -> Self::Iter {
276        self.split('/')
277    }
278
279    fn is_dollar_topic(&self) -> bool {
280        self.0.as_bytes()[0] == 36 // '$'
281    }
282
283    fn is_begin_wild_card(&self) -> bool {
284        let ch = self.0.as_bytes()[0];
285        ch == 35 || ch == 43 // '#' or '+'
286    }
287}
288
#[cfg(any(feature = "fuzzy", feature = "mymqd", test))]
impl<'a> Arbitrary<'a> for TopicFilter {
    /// Generate a non-empty topic-filter free of NUL characters.
    ///
    /// When the first byte drawn from `uns` is in `0..=6` a filter is picked from
    /// a fixed list of sample filters. Otherwise a filter is assembled from 1 to 10
    /// randomly chosen levels joined with `/`, cut right after the first `#` level.
    fn arbitrary(uns: &mut Unstructured<'a>) -> result::Result<Self, ArbitraryError> {
        let filters_choice = [
            "#".to_string(),
            "+".to_string(),
            "/+".to_string(),
            "/".to_string(),
            "//".to_string(),
            "/space ok".to_string(),
            "sport/#".to_string(),
            "sport/+".to_string(),
            "sport/tennis/#".to_string(),
            "sport/tennis/+".to_string(),
            "sport/+".to_string(),
            "sport/+/player1".to_string(),
            "sport/tennis/player1/#".to_string(),
            "+/+".to_string(),
            "/+".to_string(),
            "+/tennis/#".to_string(),
            "+/monitor/Clients".to_string(),
            "$SYS/#".to_string(),
            "$SYS/monitor/+".to_string(),
        ];
        let level_choice: Vec<String> =
            vec!["", "$", "$SYS", "#", "+"].into_iter().map(|s| s.to_string()).collect();
        // NOTE: the first entry must stay the empty string, the fallback below
        // relies on `string_choice[1..]` holding only non-empty levels.
        let string_choice: Vec<String> = vec![
            "", "a", "ab", "abc", "space ok", "sport", "tennis", "player1", "player2",
            "ranking", "score", "finance", "monitor", "Clients",
        ]
        .into_iter()
        .map(|s| s.to_string())
        .collect();

        let s = match uns.arbitrary::<u8>()? {
            0..=6 => uns.choose(&filters_choice)?.to_string(),
            _ => {
                let c = uns.arbitrary::<u8>()?;
                let levels = match c {
                    // single level, possibly a wildcard or `$` level.
                    0..=9 => vec![uns.choose(&level_choice)?.to_string()],
                    // single plain level.
                    10..=20 => vec![uns.choose(&string_choice)?.to_string()],
                    // multi-level filter.
                    _ => {
                        let mut levels = vec![];
                        for _ in 0..((c % 10) + 1) {
                            let level = match uns.arbitrary::<u8>()? {
                                0..=200 => uns.choose(&string_choice)?.to_string(),
                                201..=255 => uns.choose(&level_choice)?.clone(),
                            };
                            levels.push(level);
                        }
                        // `#` must be the last level, drop whatever follows it.
                        if let Some(i) = levels.iter().position(|l| l.as_str() == "#") {
                            levels.truncate(i + 1);
                        }
                        levels
                    }
                };

                let s: String =
                    levels.join("/").chars().filter(|ch| !matches!(ch, '\u{0}')).collect();

                if s.is_empty() {
                    // Pick the replacement from the non-empty entries only.
                    // Retrying over the full list may never terminate: once the
                    // input is exhausted `choose` keeps returning the first
                    // entry, which is the empty string.
                    uns.choose(&string_choice[1..])?.to_string()
                } else {
                    s
                }
            }
        };

        Ok(TopicFilter::from(s))
    }
}
367
368impl TopicFilter {
369    /// Validate topic-filter based on TopicFilter specified by MQTT v5.
370    pub fn validate(&self) -> Result<()> {
371        // All Topic Names and Topic Filters MUST be at least one character long.
372        if self.0.len() == 0 {
373            err!(MalformedPacket, code: MalformedPacket, "ZERO length TopicFilter")?;
374        }
375        if self.0.chars().any(|ch| matches!(ch, '\u{0}')) {
376            err!(MalformedPacket, code: MalformedPacket, "null char found")?;
377        }
378
379        let levels = self.iter_topic_path();
380
381        let mut iter = levels.clone().filter(|l| l.len() > 1);
382        if iter.any(|l| l.chars().any(|c| matches!(c, '#' | '+'))) {
383            err!(MalformedPacket, code: MalformedPacket, "wildcard mixed with chars")?;
384        }
385
386        let mut iter = levels.clone().skip_while(|l| l != &"#");
387        iter.next(); // skip the '#'
388        if let Some(_) = iter.next() {
389            err!(MalformedPacket, code: MalformedPacket, "chars after # wildcard")?;
390        }
391
392        Ok(())
393    }
394}
395
/// MQTT packet type.
///
/// Each variant's discriminant is the numeric value accepted by
/// `TryFrom<u8>` and produced by `From<PacketType> for u8`.
#[cfg_attr(any(feature = "fuzzy", test), derive(Arbitrary))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PacketType {
    Connect = 1,
    ConnAck = 2,
    Publish = 3,
    PubAck = 4,
    PubRec = 5,
    PubRel = 6,
    PubComp = 7,
    Subscribe = 8,
    SubAck = 9,
    UnSubscribe = 10,
    UnsubAck = 11,
    PingReq = 12,
    PingResp = 13,
    Disconnect = 14,
    Auth = 15,
}
416
417impl TryFrom<u8> for PacketType {
418    type Error = Error;
419
420    fn try_from(val: u8) -> Result<PacketType> {
421        let val = match val {
422            1 => PacketType::Connect,
423            2 => PacketType::ConnAck,
424            3 => PacketType::Publish,
425            4 => PacketType::PubAck,
426            5 => PacketType::PubRec,
427            6 => PacketType::PubRel,
428            7 => PacketType::PubComp,
429            8 => PacketType::Subscribe,
430            9 => PacketType::SubAck,
431            10 => PacketType::UnSubscribe,
432            11 => PacketType::UnsubAck,
433            12 => PacketType::PingReq,
434            13 => PacketType::PingResp,
435            14 => PacketType::Disconnect,
436            15 => PacketType::Auth,
437            _ => err!(MalformedPacket, code: MalformedPacket, "forbidden packet-type")?,
438        };
439
440        Ok(val)
441    }
442}
443
444impl From<PacketType> for u8 {
445    fn from(val: PacketType) -> u8 {
446        match val {
447            PacketType::Connect => 1,
448            PacketType::ConnAck => 2,
449            PacketType::Publish => 3,
450            PacketType::PubAck => 4,
451            PacketType::PubRec => 5,
452            PacketType::PubRel => 6,
453            PacketType::PubComp => 7,
454            PacketType::Subscribe => 8,
455            PacketType::SubAck => 9,
456            PacketType::UnSubscribe => 10,
457            PacketType::UnsubAck => 11,
458            PacketType::PingReq => 12,
459            PacketType::PingResp => 13,
460            PacketType::Disconnect => 14,
461            PacketType::Auth => 15,
462        }
463    }
464}
465
/// Quality of service.
///
/// Each variant's discriminant is the numeric value accepted by `TryFrom<u8>`
/// and produced by `From<QoS> for u8`. The derived ordering follows the
/// declaration order, so `AtMostOnce < AtLeastOnce < ExactlyOnce`.
#[cfg_attr(any(feature = "fuzzy", test), derive(Arbitrary))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum QoS {
    AtMostOnce = 0,
    AtLeastOnce = 1,
    ExactlyOnce = 2,
}
474
475impl Default for QoS {
476    fn default() -> QoS {
477        QoS::AtMostOnce
478    }
479}
480
481impl fmt::Display for QoS {
482    fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
483        match self {
484            QoS::AtMostOnce => write!(f, "at_most_once"),
485            QoS::AtLeastOnce => write!(f, "at_least_once"),
486            QoS::ExactlyOnce => write!(f, "exactly_once"),
487        }
488    }
489}
490
491impl TryFrom<u8> for QoS {
492    type Error = Error;
493
494    fn try_from(val: u8) -> Result<QoS> {
495        let val = match val {
496            0 => QoS::AtMostOnce,
497            1 => QoS::AtLeastOnce,
498            2 => QoS::ExactlyOnce,
499            _ => err!(MalformedPacket, code: MalformedPacket, "reserved QoS")?,
500        };
501
502        Ok(val)
503    }
504}
505
506impl From<QoS> for u8 {
507    fn from(val: QoS) -> u8 {
508        match val {
509            QoS::AtMostOnce => 0,
510            QoS::AtLeastOnce => 1,
511            QoS::ExactlyOnce => 2,
512        }
513    }
514}
515
/// Variable-length unsigned 32-bit integer.
///
/// Each encoded byte carries 7 bits of payload. Bit 7 is the continuation bit:
/// when set, another byte follows. The first byte of the stream carries the
/// least-significant 7 bits.
///
/// The diagram below lists the bytes from the last one read (continuation bit
/// clear) down to the first one read:
///
/// ```txt
/// i/p stream: 0b0www_wwww 0b1zzz_zzzz 0b1yyy_yyyy 0b1xxx_xxxx, low-byte to high-byte
/// o/p u32   : 0bwww_wwww_zzz_zzzz_yyy_yyyy_xxx_xxxx
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
pub struct VarU32(pub u32);
526
#[cfg(any(feature = "fuzzy", test))]
impl<'a> Arbitrary<'a> for VarU32 {
    /// Generate a value in `0..VarU32::MAX`; the modulo keeps the result strictly
    /// below `VarU32::MAX`.
    fn arbitrary(uns: &mut Unstructured<'a>) -> result::Result<Self, ArbitraryError> {
        let raw = uns.arbitrary::<u32>()?;
        let VarU32(limit) = VarU32::MAX;

        Ok(VarU32(raw % limit))
    }
}
534
535impl Deref for VarU32 {
536    type Target = u32;
537
538    fn deref(&self) -> &u32 {
539        &self.0
540    }
541}
542
543impl From<VarU32> for u32 {
544    fn from(val: VarU32) -> u32 {
545        val.0
546    }
547}
548
549impl Packetize for VarU32 {
550    fn decode<T: AsRef<[u8]>>(stream: T) -> Result<(Self, usize)> {
551        let stream: &[u8] = stream.as_ref();
552
553        let n = cmp::min(stream.len(), mem::size_of::<u32>());
554        let mut out = 0_u32;
555        for i in 0..n {
556            let val = ((stream[i] as u32) & 0x7f) << (7 * (i as u32));
557            out += val;
558            if stream[i] < 0x80 {
559                return Ok((VarU32(out), i + 1));
560            }
561        }
562
563        err!(MalformedPacket, code: MalformedPacket, "VarU32::decode")
564    }
565
566    fn encode(&self) -> Result<Blob> {
567        let mut data = [0_u8; 32];
568        let size = match *(&self.0) {
569            val if val < 128 => {
570                data[0] = (val & 0x7f_u32) as u8;
571                1
572            }
573            val if val < 16_384 => {
574                data[0] = ((val & 0x7f_u32) as u8) | 0x80;
575                data[1] = ((val >> 7) & 0x7f_u32) as u8;
576                2
577            }
578            val if val < 2_097_152 => {
579                data[0] = ((val & 0x7f_u32) as u8) | 0x80;
580                data[1] = (((val >> 7) & 0x7f_u32) as u8) | 0x80;
581                data[2] = ((val >> 14) & 0x7f_u32) as u8;
582                3
583            }
584            val if val <= *VarU32::MAX => {
585                data[0] = ((val & 0x7f_u32) as u8) | 0x80;
586                data[1] = (((val >> 7) & 0x7f_u32) as u8) | 0x80;
587                data[2] = (((val >> 14) & 0x7f_u32) as u8) | 0x80;
588                data[3] = ((val >> 21) & 0x7f_u32) as u8;
589                4
590            }
591            val => err!(ProtocolError, desc: "VarU32::encode({})", val)?,
592        };
593
594        Ok(Blob::Small { data, size })
595    }
596}
597
impl VarU32 {
    /// Maximum value that can be held by a variable-length 32-bit unsigned integer.
    /// One bit is sacrificed for each of the 4 bytes, leaving 28 bits of payload,
    /// hence `2^28 - 1`.
    pub const MAX: VarU32 = VarU32(268_435_455);
}
603
/// RetainForwardRule part of Subscription option defined by MQTT spec.
///
/// Each variant's discriminant is the numeric value accepted by `TryFrom<u8>`
/// and produced by `From<RetainForwardRule> for u8`.
#[cfg_attr(any(feature = "fuzzy", test), derive(Arbitrary))]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum RetainForwardRule {
    OnEverySubscribe = 0,
    OnNewSubscribe = 1,
    Never = 2,
}
612
613impl Default for RetainForwardRule {
614    fn default() -> RetainForwardRule {
615        RetainForwardRule::OnEverySubscribe
616    }
617}
618
619impl TryFrom<u8> for RetainForwardRule {
620    type Error = Error;
621
622    fn try_from(val: u8) -> Result<RetainForwardRule> {
623        let val = match val {
624            0 => RetainForwardRule::OnEverySubscribe,
625            1 => RetainForwardRule::OnNewSubscribe,
626            2 => RetainForwardRule::Never,
627            val => err!(
628                MalformedPacket,
629                code: MalformedPacket,
630                "val:{} invalid retain-forward-value",
631                val
632            )?,
633        };
634
635        Ok(val)
636    }
637}
638
639impl From<RetainForwardRule> for u8 {
640    fn from(val: RetainForwardRule) -> u8 {
641        match val {
642            RetainForwardRule::OnEverySubscribe => 0,
643            RetainForwardRule::OnNewSubscribe => 1,
644            RetainForwardRule::Never => 2,
645        }
646    }
647}
648
/// Type captures an active subscription by client.
///
/// Equality compares every field, while ordering (`PartialOrd`/`Ord`) considers
/// only `client_id` followed by `topic_filter`.
#[derive(Clone, Debug, Default)]
pub struct Subscription {
    /// Uniquely identifies this subscription for the subscribing client. Within entire
    /// cluster, `(client_id, topic_filter)` is unique.
    pub topic_filter: TopicFilter,

    /// Subscribing client's unique ID.
    pub client_id: ClientID,
    /// Shard ID hosting this client and its session.
    pub shard_id: u32,

    /// Comes from SUBSCRIBE packet, Refer to MQTT spec.
    pub subscription_id: Option<u32>,
    /// Comes from SUBSCRIBE packet, Refer to MQTT spec.
    pub qos: QoS,
    /// Comes from SUBSCRIBE packet, Refer to MQTT spec.
    pub no_local: bool,
    /// Comes from SUBSCRIBE packet, Refer to MQTT spec.
    pub retain_as_published: bool,
    /// Comes from SUBSCRIBE packet, Refer to MQTT spec.
    pub retain_forward_rule: RetainForwardRule,
}
672
673impl AsRef<ClientID> for Subscription {
674    fn as_ref(&self) -> &ClientID {
675        &self.client_id
676    }
677}
678
679impl PartialEq for Subscription {
680    fn eq(&self, other: &Self) -> bool {
681        self.topic_filter == other.topic_filter
682            && self.client_id == other.client_id
683            && self.shard_id == other.shard_id
684            // subscription options
685            && self.subscription_id == other.subscription_id
686            && self.qos == other.qos
687            && self.no_local == other.no_local
688            && self.retain_as_published == other.retain_as_published
689            && self.retain_forward_rule == other.retain_forward_rule
690    }
691}
692
693impl Eq for Subscription {}
694
695impl PartialOrd for Subscription {
696    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
697        match self.client_id.cmp(&other.client_id) {
698            cmp::Ordering::Equal => Some(self.topic_filter.cmp(&other.topic_filter)),
699            val => Some(val),
700        }
701    }
702}
703
704impl Ord for Subscription {
705    fn cmp(&self, other: &Self) -> cmp::Ordering {
706        self.partial_cmp(other).unwrap()
707    }
708}
709
#[cfg(any(feature = "fuzzy", test))]
impl<'a> Arbitrary<'a> for Subscription {
    /// Generate every field from `uns`, drawing them in declaration order.
    fn arbitrary(uns: &mut Unstructured<'a>) -> result::Result<Self, ArbitraryError> {
        let topic_filter = uns.arbitrary()?;
        let client_id = uns.arbitrary()?;
        let shard_id = uns.arbitrary()?;
        let subscription_id = uns.arbitrary()?;
        let qos = uns.arbitrary()?;
        let no_local = uns.arbitrary()?;
        let retain_as_published = uns.arbitrary()?;
        let retain_forward_rule = uns.arbitrary()?;

        Ok(Subscription {
            topic_filter,
            client_id,
            shard_id,
            subscription_id,
            qos,
            no_local,
            retain_as_published,
            retain_forward_rule,
        })
    }
}
727
728impl Packetize for u8 {
729    fn decode<T: AsRef<[u8]>>(stream: T) -> Result<(Self, usize)> {
730        let stream: &[u8] = stream.as_ref();
731
732        match stream.len() {
733            n if n >= 1 => Ok((stream[0], 1)),
734            _ => err!(InsufficientBytes, code: MalformedPacket, "u8::decode()"),
735        }
736    }
737
738    fn encode(&self) -> Result<Blob> {
739        let mut blob = Blob::Small { data: Default::default(), size: 1 };
740        match &mut blob {
741            Blob::Small { data, .. } => data[0] = *self,
742            _ => (),
743        }
744
745        Ok(blob)
746    }
747}
748
749impl Packetize for u16 {
750    fn decode<T: AsRef<[u8]>>(stream: T) -> Result<(Self, usize)> {
751        let stream: &[u8] = stream.as_ref();
752
753        match stream.len() {
754            n if n >= 2 => Ok((u16::from_be_bytes(stream[..2].try_into().unwrap()), 2)),
755            _ => err!(InsufficientBytes, code: MalformedPacket, "u16::decode()"),
756        }
757    }
758
759    fn encode(&self) -> Result<Blob> {
760        let mut blob = Blob::Small { data: Default::default(), size: 2 };
761        match &mut blob {
762            Blob::Small { data, .. } => data[..2].copy_from_slice(&self.to_be_bytes()),
763            _ => (),
764        }
765
766        Ok(blob)
767    }
768}
769
770impl Packetize for u32 {
771    fn decode<T: AsRef<[u8]>>(stream: T) -> Result<(Self, usize)> {
772        let stream: &[u8] = stream.as_ref();
773
774        match stream.len() {
775            n if n >= 4 => Ok((u32::from_be_bytes(stream[..4].try_into().unwrap()), 4)),
776            _ => err!(InsufficientBytes, code: MalformedPacket, "u32::decode()"),
777        }
778    }
779
780    fn encode(&self) -> Result<Blob> {
781        let mut blob = Blob::Small { data: Default::default(), size: 4 };
782        match &mut blob {
783            Blob::Small { data, .. } => data[..4].copy_from_slice(&self.to_be_bytes()),
784            _ => (),
785        }
786
787        Ok(blob)
788    }
789}
790
791impl Packetize for String {
792    fn decode<T: AsRef<[u8]>>(stream: T) -> Result<(Self, usize)> {
793        let stream: &[u8] = stream.as_ref();
794
795        let (len, _) = u16::decode(stream)?;
796        let len = usize::from(len);
797        if len + 2 > stream.len() {
798            err!(InsufficientBytes, code: MalformedPacket, "String::decode")?;
799        }
800
801        match std::str::from_utf8(&stream[2..2 + len]) {
802            Ok(s) if !s.chars().all(util::is_valid_utf8_code_point) => {
803                err!(
804                    MalformedPacket,
805                    code: MalformedPacket,
806                    "String::encode invalid utf8 string"
807                )
808            }
809            Ok(s) => Ok((s.to_string(), 2 + len)),
810            Err(err) => {
811                err!(MalformedPacket, code: MalformedPacket, cause: err, "String::decode")
812            }
813        }
814    }
815
816    fn encode(&self) -> Result<Blob> {
817        if !self.chars().all(util::is_valid_utf8_code_point) {
818            err!(ProtocolError, desc: "String::encode invalid utf8 string")?;
819        }
820
821        match self.len() {
822            n if n > (u16::MAX as usize) => {
823                err!(ProtocolError, desc: "String::encode too large {:?}", n)
824            }
825            n if n < 30 => {
826                let mut data = [0_u8; 32];
827                data[0..2].copy_from_slice(&(n as u16).to_be_bytes());
828                data[2..2 + n].copy_from_slice(self.as_bytes());
829                Ok(Blob::Small { data, size: 2 + n })
830            }
831            n => {
832                let mut data = Vec::with_capacity(2 + n);
833                data.extend_from_slice(&(n as u16).to_be_bytes());
834                data.extend_from_slice(self.as_bytes());
835                Ok(Blob::Large { data })
836            }
837        }
838    }
839}
840
841impl Packetize for Vec<u8> {
842    fn decode<T: AsRef<[u8]>>(stream: T) -> Result<(Self, usize)> {
843        let stream: &[u8] = stream.as_ref();
844
845        let (len, _) = u16::decode(stream)?;
846        let len = usize::from(len);
847        if len + 2 > stream.len() {
848            err!(InsufficientBytes, code: MalformedPacket, "Vector::decode")?;
849        }
850        Ok((stream[2..2 + len].to_vec(), 2 + len))
851    }
852
853    fn encode(&self) -> Result<Blob> {
854        match self.len() {
855            n if n > (u16::MAX as usize) => {
856                err!(ProtocolError, desc: "Vector::encode({})", n)
857            }
858            n => {
859                let mut data = Vec::with_capacity(2 + n);
860                data.extend_from_slice(&(n as u16).to_be_bytes());
861                data.extend_from_slice(self.as_ref());
862                Ok(Blob::Large { data })
863            }
864        }
865    }
866}
867
/// Newtype over `f32` that implements `Eq`.
///
/// Equality is defined through `f32::total_cmp`, see the `PartialEq`
/// implementation for this type.
#[derive(Clone, Copy)]
pub struct F32(f32);
870
871impl Deref for F32 {
872    type Target = f32;
873
874    fn deref(&self) -> &f32 {
875        &self.0
876    }
877}
878
879impl DerefMut for F32 {
880    fn deref_mut(&mut self) -> &mut f32 {
881        &mut self.0
882    }
883}
884
885impl From<f32> for F32 {
886    fn from(val: f32) -> F32 {
887        F32(val)
888    }
889}
890
891impl From<F32> for f32 {
892    fn from(val: F32) -> f32 {
893        val.0
894    }
895}
896
897impl PartialEq for F32 {
898    fn eq(&self, other: &F32) -> bool {
899        self.total_cmp(other) == cmp::Ordering::Equal
900    }
901}
902
903impl Eq for F32 {}
904
905impl std::str::FromStr for F32 {
906    type Err = std::num::ParseFloatError;
907
908    fn from_str(src: &str) -> result::Result<F32, std::num::ParseFloatError> {
909        Ok(F32(f32::from_str(src)?))
910    }
911}