svc_agent/mqtt/incoming_message/
incoming_request.rs1use super::super::*;
2use crate::mqtt::ExtraTags;
3use crate::{AccountId, Addressable, AgentId, Authenticable};
4
5#[derive(Debug, Clone, Deserialize, Serialize)]
7pub struct IncomingRequestProperties {
8 method: String,
9 correlation_data: String,
10 response_topic: String,
11 #[serde(flatten)]
12 conn: ConnectionProperties,
13 broker_agent_id: AgentId,
14 #[serde(flatten)]
15 long_term_timing: LongTermTimingProperties,
16 #[serde(flatten)]
17 short_term_timing: IncomingShortTermTimingProperties,
18 #[serde(flatten)]
19 tracking: TrackingProperties,
20 #[serde(default, skip_serializing_if = "Option::is_none")]
21 local_tracking_label: Option<String>,
22 #[serde(flatten)]
23 tags: ExtraTags,
24}
25
26impl IncomingRequestProperties {
27 pub fn method(&self) -> &str {
28 &self.method
29 }
30
31 pub fn correlation_data(&self) -> &str {
32 &self.correlation_data
33 }
34
35 pub fn response_topic(&self) -> &str {
36 &self.response_topic
37 }
38
39 pub fn broker_agent_id(&self) -> &AgentId {
40 &self.broker_agent_id
41 }
42
43 pub fn long_term_timing(&self) -> &LongTermTimingProperties {
44 &self.long_term_timing
45 }
46
47 pub fn short_term_timing(&self) -> &IncomingShortTermTimingProperties {
48 &self.short_term_timing
49 }
50
51 pub fn tracking(&self) -> &TrackingProperties {
52 &self.tracking
53 }
54
55 pub fn local_tracking_label(&self) -> &Option<String> {
56 &self.local_tracking_label
57 }
58
59 pub fn to_connection(&self) -> Connection {
60 self.conn.to_connection()
61 }
62
63 pub fn tags(&self) -> &ExtraTags {
64 &self.tags
65 }
66
67 pub fn set_method(&mut self, method: &str) {
68 self.tags.set_method(method);
69 }
70
71 pub fn to_event(
89 &self,
90 label: &'static str,
91 short_term_timing: OutgoingShortTermTimingProperties,
92 ) -> OutgoingEventProperties {
93 let long_term_timing = self.update_long_term_timing(&short_term_timing);
94 let mut props = OutgoingEventProperties::new(label, short_term_timing);
95
96 props.set_long_term_timing(long_term_timing);
97 props.set_tracking(self.tracking.clone());
98 props.set_tags(self.tags.clone());
99
100 if let Some(ref label) = self.local_tracking_label {
101 props.set_local_tracking_label(label.to_owned());
102 }
103 props
104 }
105
106 pub fn to_request(
128 &self,
129 method: &str,
130 response_topic: &str,
131 correlation_data: &str,
132 short_term_timing: OutgoingShortTermTimingProperties,
133 ) -> OutgoingRequestProperties {
134 let long_term_timing = self.update_long_term_timing(&short_term_timing);
135
136 let mut props = OutgoingRequestProperties::new(
137 method,
138 response_topic,
139 correlation_data,
140 short_term_timing,
141 );
142
143 props.set_long_term_timing(long_term_timing);
144 props.set_tracking(self.tracking.clone());
145 props.set_tags(self.tags.clone());
146
147 if let Some(ref label) = self.local_tracking_label {
148 props.set_local_tracking_label(label.to_owned());
149 }
150 props
151 }
152
153 pub fn to_response(
170 &self,
171 status: ResponseStatus,
172 short_term_timing: OutgoingShortTermTimingProperties,
173 ) -> OutgoingResponseProperties {
174 let mut props = OutgoingResponseProperties::new(
175 status,
176 &self.correlation_data,
177 self.update_long_term_timing(&short_term_timing),
178 short_term_timing,
179 self.tracking.clone(),
180 self.local_tracking_label.clone(),
181 );
182
183 props.set_response_topic(&self.response_topic);
184 props.set_tags(self.tags.clone());
185
186 props
187 }
188
189 fn update_long_term_timing(
190 &self,
191 short_term_timing: &OutgoingShortTermTimingProperties,
192 ) -> LongTermTimingProperties {
193 self.long_term_timing
194 .clone()
195 .update_cumulative_timings(short_term_timing)
196 }
197}
198
199impl Authenticable for IncomingRequestProperties {
200 fn as_account_id(&self) -> &AccountId {
201 self.conn.as_account_id()
202 }
203}
204
205impl Addressable for IncomingRequestProperties {
206 fn as_agent_id(&self) -> &AgentId {
207 self.conn.as_agent_id()
208 }
209}
210
211impl Authenticable for &IncomingRequestProperties {
212 fn as_account_id(&self) -> &AccountId {
213 self.conn.as_account_id()
214 }
215}
216
217impl Addressable for &IncomingRequestProperties {
218 fn as_agent_id(&self) -> &AgentId {
219 self.conn.as_agent_id()
220 }
221}
222
223pub type IncomingRequest<T> = IncomingMessageContent<T, IncomingRequestProperties>;
224
225impl<T> IncomingRequest<T> {
226 pub fn to_response<R>(
247 &self,
248 data: R,
249 status: ResponseStatus,
250 timing: OutgoingShortTermTimingProperties,
251 api_version: &str,
252 ) -> OutgoingMessage<R>
253 where
254 R: serde::Serialize,
255 {
256 OutgoingMessage::Response(OutgoingResponse::new(
257 data,
258 self.properties().to_response(status, timing),
259 Destination::Unicast(
260 self.properties().as_agent_id().clone(),
261 api_version.to_owned(),
262 ),
263 ))
264 }
265}
266
267impl<String: std::ops::Deref<Target = str>> IncomingRequest<String> {
268 pub fn convert_payload<T>(message: &IncomingRequest<String>) -> Result<T, Error>
269 where
270 T: serde::de::DeserializeOwned,
271 {
272 let payload = serde_json::from_str::<T>(message.payload()).map_err(|e| {
273 Error::new(&format!(
274 "error deserializing payload of an envelope, {}",
275 &e
276 ))
277 })?;
278 Ok(payload)
279 }
280
281 pub fn convert<T>(message: IncomingRequest<String>) -> Result<IncomingRequest<T>, Error>
282 where
283 T: serde::de::DeserializeOwned,
284 {
285 let props = message.properties().to_owned();
286 let payload = serde_json::from_str::<T>(message.payload()).map_err(|e| {
287 Error::new(&format!(
288 "error deserializing payload of an envelope, {}",
289 &e
290 ))
291 })?;
292 Ok(IncomingRequest::new(payload, props))
293 }
294}