svc_agent/mqtt/incoming_message/
incoming_request.rs

1use super::super::*;
2use crate::mqtt::ExtraTags;
3use crate::{AccountId, Addressable, AgentId, Authenticable};
4
5/// Properties of an incoming request.
6#[derive(Debug, Clone, Deserialize, Serialize)]
7pub struct IncomingRequestProperties {
8    method: String,
9    correlation_data: String,
10    response_topic: String,
11    #[serde(flatten)]
12    conn: ConnectionProperties,
13    broker_agent_id: AgentId,
14    #[serde(flatten)]
15    long_term_timing: LongTermTimingProperties,
16    #[serde(flatten)]
17    short_term_timing: IncomingShortTermTimingProperties,
18    #[serde(flatten)]
19    tracking: TrackingProperties,
20    #[serde(default, skip_serializing_if = "Option::is_none")]
21    local_tracking_label: Option<String>,
22    #[serde(flatten)]
23    tags: ExtraTags,
24}
25
26impl IncomingRequestProperties {
27    pub fn method(&self) -> &str {
28        &self.method
29    }
30
31    pub fn correlation_data(&self) -> &str {
32        &self.correlation_data
33    }
34
35    pub fn response_topic(&self) -> &str {
36        &self.response_topic
37    }
38
39    pub fn broker_agent_id(&self) -> &AgentId {
40        &self.broker_agent_id
41    }
42
43    pub fn long_term_timing(&self) -> &LongTermTimingProperties {
44        &self.long_term_timing
45    }
46
47    pub fn short_term_timing(&self) -> &IncomingShortTermTimingProperties {
48        &self.short_term_timing
49    }
50
51    pub fn tracking(&self) -> &TrackingProperties {
52        &self.tracking
53    }
54
55    pub fn local_tracking_label(&self) -> &Option<String> {
56        &self.local_tracking_label
57    }
58
59    pub fn to_connection(&self) -> Connection {
60        self.conn.to_connection()
61    }
62
63    pub fn tags(&self) -> &ExtraTags {
64        &self.tags
65    }
66
67    pub fn set_method(&mut self, method: &str) {
68        self.tags.set_method(method);
69    }
70
71    /// Builds [OutgoingEventProperties](struct.OutgoingEventProperties.html) based on the
72    /// [IncomingRequestProperties](struct.IncomingRequestProperties.html).
73    ///
74    /// Use it to publish an event when something worth notifying subscribers happens during
75    /// the request processing.
76    ///
77    /// # Arguments
78    ///
79    /// * `label` – outgoing event label.
80    /// * `short_term_timing` – outgoing event's short term timing properties.
81    ///
82    /// # Example
83    ///
84    /// ```
85    /// let short_term_timing = OutgoingShortTermTimingProperties::until_now(start_timestamp);
86    /// let out_props = in_props.to_event("agent.enter", short_term_timing);
87    /// ```
88    pub fn to_event(
89        &self,
90        label: &'static str,
91        short_term_timing: OutgoingShortTermTimingProperties,
92    ) -> OutgoingEventProperties {
93        let long_term_timing = self.update_long_term_timing(&short_term_timing);
94        let mut props = OutgoingEventProperties::new(label, short_term_timing);
95
96        props.set_long_term_timing(long_term_timing);
97        props.set_tracking(self.tracking.clone());
98        props.set_tags(self.tags.clone());
99
100        if let Some(ref label) = self.local_tracking_label {
101            props.set_local_tracking_label(label.to_owned());
102        }
103        props
104    }
105
106    /// Builds [OutgoingRequestProperties](struct.OutgoingRequestProperties.html) based on the
107    /// [IncomingRequestProperties](struct.IncomingRequestProperties.html).
108    ///
109    /// Use it to send a request to another service while handling a request.
110    ///
111    /// # Arguments
112    ///
113    /// * `method` – request method.
114    /// * `response_topic` – topic for response.
115    /// * `correlation_data` – any string to correlate request with response.
116    /// * `short_term_timing` – outgoing request's short term timing properties.
117    ///
118    /// # Example
119    ///
120    /// ```
121    /// let out_props = in_props.to_request(
122    ///     "room.enter",
123    ///     &Subscription::unicast_responses(),
124    ///     OutgoingShortTermTimingProperties::until_now(start_timestamp),
125    /// );
126    /// ```
127    pub fn to_request(
128        &self,
129        method: &str,
130        response_topic: &str,
131        correlation_data: &str,
132        short_term_timing: OutgoingShortTermTimingProperties,
133    ) -> OutgoingRequestProperties {
134        let long_term_timing = self.update_long_term_timing(&short_term_timing);
135
136        let mut props = OutgoingRequestProperties::new(
137            method,
138            response_topic,
139            correlation_data,
140            short_term_timing,
141        );
142
143        props.set_long_term_timing(long_term_timing);
144        props.set_tracking(self.tracking.clone());
145        props.set_tags(self.tags.clone());
146
147        if let Some(ref label) = self.local_tracking_label {
148            props.set_local_tracking_label(label.to_owned());
149        }
150        props
151    }
152
153    /// Builds [OutgoingResponseProperties](struct.OutgoingResponseProperties.html) based on
154    /// the [IncomingRequestProperties](struct.IncomingRequestProperties.html).
155    ///
156    /// Use it to response on a request.
157    ///
158    /// # Arguments
159    ///
160    /// * `status` – response status.
161    /// * `short_term_timing` – outgoing response's short term timings properties.
162    ///
163    /// # Example
164    ///
165    /// ```
166    /// let short_term_timing = OutgoingShortTermTimingProperties::until_now(start_timestamp);
167    /// let out_props = in_props.to_response(ResponseStatus::OK, short_term_timing);
168    /// ```
169    pub fn to_response(
170        &self,
171        status: ResponseStatus,
172        short_term_timing: OutgoingShortTermTimingProperties,
173    ) -> OutgoingResponseProperties {
174        let mut props = OutgoingResponseProperties::new(
175            status,
176            &self.correlation_data,
177            self.update_long_term_timing(&short_term_timing),
178            short_term_timing,
179            self.tracking.clone(),
180            self.local_tracking_label.clone(),
181        );
182
183        props.set_response_topic(&self.response_topic);
184        props.set_tags(self.tags.clone());
185
186        props
187    }
188
189    fn update_long_term_timing(
190        &self,
191        short_term_timing: &OutgoingShortTermTimingProperties,
192    ) -> LongTermTimingProperties {
193        self.long_term_timing
194            .clone()
195            .update_cumulative_timings(short_term_timing)
196    }
197}
198
199impl Authenticable for IncomingRequestProperties {
200    fn as_account_id(&self) -> &AccountId {
201        self.conn.as_account_id()
202    }
203}
204
205impl Addressable for IncomingRequestProperties {
206    fn as_agent_id(&self) -> &AgentId {
207        self.conn.as_agent_id()
208    }
209}
210
211impl Authenticable for &IncomingRequestProperties {
212    fn as_account_id(&self) -> &AccountId {
213        self.conn.as_account_id()
214    }
215}
216
217impl Addressable for &IncomingRequestProperties {
218    fn as_agent_id(&self) -> &AgentId {
219        self.conn.as_agent_id()
220    }
221}
222
223pub type IncomingRequest<T> = IncomingMessageContent<T, IncomingRequestProperties>;
224
225impl<T> IncomingRequest<T> {
226    /// Builds [OutgoingResponse](OutgoingResponse.html) based on
227    /// the [IncomingRequest](IncomingRequest.html).
228    ///
229    /// Use it to response on a request.
230    ///
231    /// # Arguments
232    ///
233    /// * `data` – serializable response payload.
234    /// * `status` – response status.
235    /// * `timing` – outgoing response's short term timing properties.
236    ///
237    /// # Example
238    ///
239    /// ```
240    /// let response = request.to_response(
241    ///     json!({ "foo": "bar" }),
242    ///     ResponseStatus::OK,
243    ///     OutgoingShortTermTimingProperties::until_now(start_timestamp),
244    /// );
245    /// ```
246    pub fn to_response<R>(
247        &self,
248        data: R,
249        status: ResponseStatus,
250        timing: OutgoingShortTermTimingProperties,
251        api_version: &str,
252    ) -> OutgoingMessage<R>
253    where
254        R: serde::Serialize,
255    {
256        OutgoingMessage::Response(OutgoingResponse::new(
257            data,
258            self.properties().to_response(status, timing),
259            Destination::Unicast(
260                self.properties().as_agent_id().clone(),
261                api_version.to_owned(),
262            ),
263        ))
264    }
265}
266
267impl<String: std::ops::Deref<Target = str>> IncomingRequest<String> {
268    pub fn convert_payload<T>(message: &IncomingRequest<String>) -> Result<T, Error>
269    where
270        T: serde::de::DeserializeOwned,
271    {
272        let payload = serde_json::from_str::<T>(message.payload()).map_err(|e| {
273            Error::new(&format!(
274                "error deserializing payload of an envelope, {}",
275                &e
276            ))
277        })?;
278        Ok(payload)
279    }
280
281    pub fn convert<T>(message: IncomingRequest<String>) -> Result<IncomingRequest<T>, Error>
282    where
283        T: serde::de::DeserializeOwned,
284    {
285        let props = message.properties().to_owned();
286        let payload = serde_json::from_str::<T>(message.payload()).map_err(|e| {
287            Error::new(&format!(
288                "error deserializing payload of an envelope, {}",
289                &e
290            ))
291        })?;
292        Ok(IncomingRequest::new(payload, props))
293    }
294}