svc_agent/mqtt/outgoing_message/
outgoing_request.rs1use chrono::{DateTime, Utc};
2use serde::Serialize;
3
4use crate::serde::ts_milliseconds_string_option;
5use crate::Addressable;
6use crate::AgentId;
7use crate::Authenticable;
8
9use super::*;
10
11#[derive(Debug, Serialize)]
13pub struct OutgoingRequestProperties {
14 method: String,
15 correlation_data: String,
16 response_topic: String,
17 #[serde(default, skip_serializing_if = "Option::is_none")]
18 agent_id: Option<AgentId>,
19 #[serde(flatten)]
20 long_term_timing: Option<LongTermTimingProperties>,
21 #[serde(flatten)]
22 short_term_timing: OutgoingShortTermTimingProperties,
23 #[serde(flatten)]
24 tracking: Option<TrackingProperties>,
25 #[serde(
26 default,
27 skip_serializing_if = "Option::is_none",
28 with = "ts_milliseconds_string_option"
29 )]
30 local_timestamp: Option<DateTime<Utc>>,
31 #[serde(default, skip_serializing_if = "Option::is_none")]
32 local_tracking_label: Option<String>,
33 #[serde(skip)]
34 tags: ExtraTags,
35}
36
37impl OutgoingRequestProperties {
38 pub fn new(
62 method: &str,
63 response_topic: &str,
64 correlation_data: &str,
65 short_term_timing: OutgoingShortTermTimingProperties,
66 ) -> Self {
67 Self {
68 method: method.to_owned(),
69 response_topic: response_topic.to_owned(),
70 correlation_data: correlation_data.to_owned(),
71 agent_id: None,
72 long_term_timing: None,
73 short_term_timing,
74 tracking: None,
75 local_timestamp: None,
76 local_tracking_label: None,
77 tags: Default::default(),
78 }
79 }
80
81 pub fn set_agent_id(&mut self, agent_id: AgentId) -> &mut Self {
82 self.agent_id = Some(agent_id);
83 self
84 }
85
86 pub fn set_long_term_timing(&mut self, timing: LongTermTimingProperties) -> &mut Self {
87 self.long_term_timing = Some(timing);
88 self
89 }
90
91 pub fn set_tracking(&mut self, tracking: TrackingProperties) -> &mut Self {
92 self.tracking = Some(tracking);
93 self
94 }
95
96 pub fn set_local_tracking_label(&mut self, local_tracking_label: String) -> &mut Self {
97 self.local_tracking_label = Some(local_tracking_label);
98 self
99 }
100
101 pub fn set_local_timestamp(&mut self, local_timestamp: DateTime<Utc>) -> &mut Self {
102 self.local_timestamp = Some(local_timestamp);
103 self
104 }
105
106 pub fn correlation_data(&self) -> &str {
107 &self.correlation_data
108 }
109
110 pub fn tags(&self) -> &ExtraTags {
111 &self.tags
112 }
113
114 pub fn set_tags(&mut self, tags: ExtraTags) -> &mut Self {
115 self.tags = tags;
116 self
117 }
118}
119
120pub type OutgoingRequest<T> = OutgoingMessageContent<T, OutgoingRequestProperties>;
121
122impl<T> OutgoingRequest<T>
123where
124 T: serde::Serialize,
125{
126 pub fn multicast<A>(
148 payload: T,
149 properties: OutgoingRequestProperties,
150 to: &A,
151 version: &str,
152 ) -> OutgoingMessage<T>
153 where
154 A: Authenticable,
155 {
156 OutgoingMessage::Request(Self::new(
157 payload,
158 properties,
159 Destination::Multicast(to.as_account_id().to_owned(), version.to_owned()),
160 ))
161 }
162
163 pub fn unicast<A>(
186 payload: T,
187 properties: OutgoingRequestProperties,
188 to: &A,
189 version: &str,
190 ) -> OutgoingMessage<T>
191 where
192 A: Addressable,
193 {
194 OutgoingMessage::Request(Self::new(
195 payload,
196 properties,
197 Destination::Unicast(to.as_agent_id().to_owned(), version.to_owned()),
198 ))
199 }
200}
201
202impl<T: serde::Serialize> Publishable for OutgoingRequest<T> {
203 fn destination_topic(&self, publisher: &Address) -> Result<String, Error> {
204 match self.destination {
205 Destination::Unicast(ref agent_id, ref version) => Ok(format!(
206 "agents/{agent_id}/api/{version}/in/{app}",
207 agent_id = agent_id,
208 version = version,
209 app = publisher.id().as_account_id(),
210 )),
211 Destination::Multicast(ref account_id, ref version) => Ok(format!(
212 "agents/{agent_id}/api/{version}/out/{app}",
213 agent_id = publisher.id(),
214 version = version,
215 app = account_id,
216 )),
217 _ => Err(Error::new(&format!(
218 "destination = '{:?}' is incompatible with request message type",
219 self.destination,
220 ))),
221 }
222 }
223
224 fn qos(&self) -> QoS {
225 QoS::AtMostOnce
226 }
227}