1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
use crate::{
    packets::Pub,
    properties::Property,
    types::{BinaryData, Properties, Utf8String},
    ProtocolError, QoS, Retain,
};

/// A message payload that can be serialized into a caller-provided byte buffer.
///
/// Serialization consumes the payload, so each payload can be written at most once.
pub trait ToPayload {
    /// The error produced when the payload cannot be serialized.
    type Error;
    /// Serialize the payload into `buffer`.
    ///
    /// # Args
    /// * `buffer` - The destination the payload is written into.
    ///
    /// # Returns
    /// A `usize` on success. The byte-slice implementations in this file return the number of
    /// bytes they copied into `buffer`.
    fn serialize(self, buffer: &mut [u8]) -> Result<usize, Self::Error>;
}

/// Byte slices are serialized by copying them verbatim to the front of the buffer.
impl<'a> ToPayload for &'a [u8] {
    type Error = ();

    /// Copy `self` into the leading bytes of `buffer`.
    ///
    /// # Returns
    /// The number of bytes copied, or `Err(())` if `buffer` is shorter than `self`. Nothing is
    /// written in the error case.
    fn serialize(self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
        let count = self.len();

        // `get_mut` yields `None` exactly when the buffer cannot hold `count` bytes.
        match buffer.get_mut(..count) {
            Some(destination) => {
                destination.copy_from_slice(self);
                Ok(count)
            }
            None => Err(()),
        }
    }
}

impl<'a> ToPayload for &'a str {
    type Error = ();

    fn serialize(self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
        self.as_bytes().serialize(buffer)
    }
}

impl<const N: usize> ToPayload for &[u8; N] {
    type Error = ();

    fn serialize(self, buffer: &mut [u8]) -> Result<usize, ()> {
        (&self[..]).serialize(buffer)
    }
}

/// A publication whose payload is serialized directly into the transmission buffer at a later
/// point in time.
///
/// # Note
/// This is "deferred" because the closure will only be called once the publication is actually
/// sent.
///
/// # Type Parameters
/// * `E` - The error type that the serialization closure may return.
/// * `F` - The closure that is handed the output buffer and returns a `usize` on success
///   (presumably the number of bytes it wrote - confirm against the code that sends the packet).
pub struct DeferredPublication<E, F: FnOnce(&mut [u8]) -> Result<usize, E>> {
    /// The closure invoked with the output buffer when the payload is serialized.
    func: F,
}

impl<E, F: FnOnce(&mut [u8]) -> Result<usize, E>> DeferredPublication<E, F> {
    pub fn new<'a>(func: F) -> Publication<'a, Self> {
        Publication::new(Self { func })
    }
}

/// A deferred publication is serialized by running its closure on the output buffer.
impl<E, F> ToPayload for DeferredPublication<E, F>
where
    F: FnOnce(&mut [u8]) -> Result<usize, E>,
{
    type Error = E;

    /// Invoke the stored closure with `buffer` and hand back its result unchanged.
    fn serialize(self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
        let Self {
            func: write_payload,
        } = self;
        write_payload(buffer)
    }
}

/// Builder pattern for generating MQTT publications.
///
/// # Note
/// By default, messages are constructed with:
/// * A QoS setting of [QoS::AtMostOnce]
/// * No properties
/// * No destination topic
/// * Retention set to [Retain::NotRetained]
///
/// The user is expected to provide a topic, either by specifying a publication topic directly
/// with [Publication::topic], or by using the [Publication::reply] API to take the topic from the
/// [Property::ResponseTopic] property contained within received properties. Without a topic,
/// [Publication::finish] returns an error.
pub struct Publication<'a, P: ToPayload> {
    /// Destination topic; `None` until set via [Publication::topic] or [Publication::reply].
    topic: Option<&'a str>,
    /// Properties to attach to the outgoing message.
    properties: Properties<'a>,
    /// The [QoS] setting of the message.
    qos: QoS,
    /// The payload carried by the message.
    payload: P,
    /// The [Retain] setting of the message.
    retain: Retain,
}

impl<'a, P: ToPayload> Publication<'a, P> {
    /// Construct a new publication with a payload.
    ///
    /// # Args
    /// * `payload` - The payload to send with the publication.
    ///
    /// # Returns
    /// A publication with [QoS::AtMostOnce], no topic, no properties and [Retain::NotRetained].
    pub fn new(payload: P) -> Self {
        Self {
            payload,
            qos: QoS::AtMostOnce,
            topic: None,
            properties: Properties::Slice(&[]),
            retain: Retain::NotRetained,
        }
    }

    /// Specify the [QoS] of the publication. By default, the QoS is set to [QoS::AtMostOnce].
    pub fn qos(mut self, qos: QoS) -> Self {
        self.qos = qos;
        self
    }

    /// Specify that this message should be [Retain::Retained].
    pub fn retain(mut self) -> Self {
        self.retain = Retain::Retained;
        self
    }

    /// Specify the publication topic for this message.
    ///
    /// # Note
    /// If this is called after [Publication::reply] determines a response topic, the response
    /// topic will be overridden.
    pub fn topic(mut self, topic: &'a str) -> Self {
        // Any previously configured topic is intentionally discarded.
        self.topic = Some(topic);
        self
    }

    /// Specify properties associated with this publication.
    ///
    /// # Note
    /// Properties supplied by an earlier call are replaced. Correlation data that was already
    /// attached via [Publication::correlate] or [Publication::reply] is kept.
    pub fn properties(mut self, properties: &'a [Property<'a>]) -> Self {
        self.properties = match self.properties {
            Properties::Slice(_) => Properties::Slice(properties),
            Properties::CorrelatedSlice { correlation, .. } => Properties::CorrelatedSlice {
                correlation,
                properties,
            },
            // Invariant: the methods of this impl only ever store `Slice` or `CorrelatedSlice`.
            _ => unreachable!("builder only stores Slice or CorrelatedSlice properties"),
        };
        self
    }

    /// Generate the publication as a reply to some other received message.
    ///
    /// # Note
    /// The received message properties are parsed for both [Property::CorrelationData] and
    /// [Property::ResponseTopic]. Only the first occurrence of each is used, and properties that
    /// fail to parse are skipped.
    ///
    /// * If correlation data is found, it is automatically appended to the
    /// publication properties.
    ///
    /// * If a response topic is identified, the message topic will be
    /// configured for it, which will override any previously-specified topic.
    pub fn reply(mut self, received_properties: &'a Properties<'a>) -> Self {
        let response_topic = received_properties.into_iter().find_map(|property| match property {
            Ok(Property::ResponseTopic(topic)) => Some(topic.0),
            _ => None,
        });

        // Without a response topic, any previously configured topic is left untouched.
        if let Some(topic) = response_topic {
            self.topic = Some(topic);
        }

        // Next, copy over any correlation data to the outbound properties.
        let correlation_data =
            received_properties.into_iter().find_map(|property| match property {
                Ok(Property::CorrelationData(data)) => Some(data.0),
                _ => None,
            });

        match correlation_data {
            Some(data) => self.correlate(data),
            None => self,
        }
    }

    /// Include correlation data to the message
    ///
    /// # Note
    /// This will override any existing correlation data in the message. Properties configured
    /// through [Publication::properties] are kept.
    ///
    /// # Args
    /// * `data` - The data composing the correlation data.
    pub fn correlate(mut self, data: &'a [u8]) -> Self {
        self.properties = match self.properties {
            Properties::Slice(properties) | Properties::CorrelatedSlice { properties, .. } => {
                Properties::CorrelatedSlice {
                    properties,
                    correlation: Property::CorrelationData(BinaryData(data)),
                }
            }
            // Invariant: the methods of this impl only ever store `Slice` or `CorrelatedSlice`.
            _ => unreachable!("builder only stores Slice or CorrelatedSlice properties"),
        };

        self
    }

    /// Generate the final publication.
    ///
    /// # Returns
    /// The message to be published if a publication topic was specified.
    ///
    /// # Errors
    /// Returns [ProtocolError::NoTopic] if no topic was provided through [Publication::topic] or
    /// found by [Publication::reply].
    pub fn finish(self) -> Result<Pub<'a, P>, ProtocolError> {
        let topic = self.topic.ok_or(ProtocolError::NoTopic)?;

        Ok(Pub {
            topic: Utf8String(topic),
            properties: self.properties,
            // The builder does not assign a packet identifier.
            packet_id: None,
            payload: self.payload,
            retain: self.retain,
            qos: self.qos,
            dup: false,
        })
    }
}