svc_agent/mqtt/outgoing_message/
outgoing_request.rs

1use chrono::{DateTime, Utc};
2use serde::Serialize;
3
4use crate::serde::ts_milliseconds_string_option;
5use crate::Addressable;
6use crate::AgentId;
7use crate::Authenticable;
8
9use super::*;
10
11/// Properties of an outgoing request.
12#[derive(Debug, Serialize)]
13pub struct OutgoingRequestProperties {
14    method: String,
15    correlation_data: String,
16    response_topic: String,
17    #[serde(default, skip_serializing_if = "Option::is_none")]
18    agent_id: Option<AgentId>,
19    #[serde(flatten)]
20    long_term_timing: Option<LongTermTimingProperties>,
21    #[serde(flatten)]
22    short_term_timing: OutgoingShortTermTimingProperties,
23    #[serde(flatten)]
24    tracking: Option<TrackingProperties>,
25    #[serde(
26        default,
27        skip_serializing_if = "Option::is_none",
28        with = "ts_milliseconds_string_option"
29    )]
30    local_timestamp: Option<DateTime<Utc>>,
31    #[serde(default, skip_serializing_if = "Option::is_none")]
32    local_tracking_label: Option<String>,
33    #[serde(skip)]
34    tags: ExtraTags,
35}
36
37impl OutgoingRequestProperties {
38    /// Builds [OutgoingRequestProperties](struct.OutgoingRequestProperties.html).
39    ///
40    /// Use this function only if you're making a request from scratch.
41    ///
42    /// If you make a request while handling another request consider using
43    /// [IncomingRequestProperties::to_request](struct.IncomingRequestProperties.html#method.to_request).
44    ///
45    /// # Arguments
46    ///
47    /// * `method` – request method.
48    /// * `response_topic` – a topic to send the response to the request to.
49    /// * `correlation_data` – any string to correlate request with the upcoming response.
50    /// * `short_term_timing` – outgoing request's short term timing properties.
51    ///
52    /// # Example
53    ///
54    /// ```
55    /// let props = OutgoingRequestProperties::new(
56    ///     "system.vacuum",
57    ///     &Subscription::unicast_responses(),
58    ///     OutgoingShortTermTimingProperties::new(Utc::now()),
59    /// );
60    /// ```
61    pub fn new(
62        method: &str,
63        response_topic: &str,
64        correlation_data: &str,
65        short_term_timing: OutgoingShortTermTimingProperties,
66    ) -> Self {
67        Self {
68            method: method.to_owned(),
69            response_topic: response_topic.to_owned(),
70            correlation_data: correlation_data.to_owned(),
71            agent_id: None,
72            long_term_timing: None,
73            short_term_timing,
74            tracking: None,
75            local_timestamp: None,
76            local_tracking_label: None,
77            tags: Default::default(),
78        }
79    }
80
81    pub fn set_agent_id(&mut self, agent_id: AgentId) -> &mut Self {
82        self.agent_id = Some(agent_id);
83        self
84    }
85
86    pub fn set_long_term_timing(&mut self, timing: LongTermTimingProperties) -> &mut Self {
87        self.long_term_timing = Some(timing);
88        self
89    }
90
91    pub fn set_tracking(&mut self, tracking: TrackingProperties) -> &mut Self {
92        self.tracking = Some(tracking);
93        self
94    }
95
96    pub fn set_local_tracking_label(&mut self, local_tracking_label: String) -> &mut Self {
97        self.local_tracking_label = Some(local_tracking_label);
98        self
99    }
100
101    pub fn set_local_timestamp(&mut self, local_timestamp: DateTime<Utc>) -> &mut Self {
102        self.local_timestamp = Some(local_timestamp);
103        self
104    }
105
106    pub fn correlation_data(&self) -> &str {
107        &self.correlation_data
108    }
109
110    pub fn tags(&self) -> &ExtraTags {
111        &self.tags
112    }
113
114    pub fn set_tags(&mut self, tags: ExtraTags) -> &mut Self {
115        self.tags = tags;
116        self
117    }
118}
119
120pub type OutgoingRequest<T> = OutgoingMessageContent<T, OutgoingRequestProperties>;
121
122impl<T> OutgoingRequest<T>
123where
124    T: serde::Serialize,
125{
126    /// Builds a multicast request to publish.
127    ///
128    /// # Arguments
129    ///
130    /// * `payload` – any serializable value.
131    /// * `properties` – properties of the outgoing request.
132    /// * `to` – destination [AccountId](../struct.AccountId.html).
133    ///
134    /// # Example
135    ///
136    /// ```
137    /// let props = request.properties().to_request(
138    ///     "room.enter",
139    ///     &Subscription::unicast_responses(),
140    ///     "some_corr_data",
141    ///     OutgoingShortTermTimingProperties::until_now(start_timestamp),
142    /// );
143    ///
144    /// let to = AccountId::new("service_name", "svc.example.org");
145    /// let message = OutgoingRequest::multicast(json!({ "foo": "bar" }), props, &to);
146    /// ```
147    pub fn multicast<A>(
148        payload: T,
149        properties: OutgoingRequestProperties,
150        to: &A,
151        version: &str,
152    ) -> OutgoingMessage<T>
153    where
154        A: Authenticable,
155    {
156        OutgoingMessage::Request(Self::new(
157            payload,
158            properties,
159            Destination::Multicast(to.as_account_id().to_owned(), version.to_owned()),
160        ))
161    }
162
163    /// Builds a unicast request to publish.
164    ///
165    /// # Arguments
166    ///
167    /// * `payload` – any serializable value.
168    /// * `properties` – properties of the outgoing request.
169    /// * `to` – destination [AgentId](../struct.AgentId.html).
170    /// * `version` – destination agent's API version.
171    ///
172    /// # Example
173    ///
174    /// ```
175    /// let props = request.properties().to_request(
176    ///     "room.enter",
177    ///     &Subscription::unicast_responses(),
178    ///     "some_corr_data",
179    ///     OutgoingShortTermTimingProperties::until_now(start_timestamp),
180    /// );
181    ///
182    /// let to = AgentId::new("instance01", AccountId::new("service_name", "svc.example.org"));
183    /// let message = OutgoingRequest::unicast(json!({ "foo": "bar" }), props, to, "v1");
184    /// ```
185    pub fn unicast<A>(
186        payload: T,
187        properties: OutgoingRequestProperties,
188        to: &A,
189        version: &str,
190    ) -> OutgoingMessage<T>
191    where
192        A: Addressable,
193    {
194        OutgoingMessage::Request(Self::new(
195            payload,
196            properties,
197            Destination::Unicast(to.as_agent_id().to_owned(), version.to_owned()),
198        ))
199    }
200}
201
202impl<T: serde::Serialize> Publishable for OutgoingRequest<T> {
203    fn destination_topic(&self, publisher: &Address) -> Result<String, Error> {
204        match self.destination {
205            Destination::Unicast(ref agent_id, ref version) => Ok(format!(
206                "agents/{agent_id}/api/{version}/in/{app}",
207                agent_id = agent_id,
208                version = version,
209                app = publisher.id().as_account_id(),
210            )),
211            Destination::Multicast(ref account_id, ref version) => Ok(format!(
212                "agents/{agent_id}/api/{version}/out/{app}",
213                agent_id = publisher.id(),
214                version = version,
215                app = account_id,
216            )),
217            _ => Err(Error::new(&format!(
218                "destination = '{:?}' is incompatible with request message type",
219                self.destination,
220            ))),
221        }
222    }
223
224    fn qos(&self) -> QoS {
225        QoS::AtMostOnce
226    }
227}