ya_client/market/requestor.rs
1//! Requestor part of the Market API
2use ya_client_model::market::{
3 agreement::State, Agreement, AgreementListEntry, AgreementOperationEvent, AgreementProposal,
4 Demand, NewDemand, NewProposal, Offer, Proposal, Reason, RequestorEvent,
5};
6
7use crate::{web::default_on_timeout, web::WebClient, web::WebInterface, Result};
8use chrono::{DateTime, TimeZone, Utc};
9use std::fmt::Display;
10use ya_client_model::market::scan::NewScan;
11use ya_client_model::NodeId;
12
13/// Bindings for Requestor part of the Market API.
14#[derive(Clone)]
15pub struct MarketRequestorApi {
16 client: WebClient,
17}
18
19impl WebInterface for MarketRequestorApi {
20 const API_URL_ENV_VAR: &'static str = crate::market::MARKET_URL_ENV_VAR;
21 const API_SUFFIX: &'static str = ya_client_model::market::MARKET_API_PATH;
22
23 fn from_client(client: WebClient) -> Self {
24 MarketRequestorApi { client }
25 }
26}
27
28impl MarketRequestorApi {
29 /// Publishes Requestor capabilities via Demand.
30 ///
31 /// Demand object can be considered an "open" or public Demand, as it is not directed
32 /// at a specific Provider, but rather is sent to the market so that the matching
33 /// mechanism implementation can associate relevant Offers.
34 ///
35 /// **Note**: it is an "atomic" operation, ie. as soon as Subscription is placed,
36 /// the Demand is published on the market.
37 pub async fn subscribe(&self, demand: &NewDemand) -> Result<String> {
38 self.client.post("demands").send_json(&demand).json().await
39 }
40
41 /// Fetches all active Demands which have been published by the Requestor.
42 pub async fn get_demands(&self) -> Result<Vec<Demand>> {
43 self.client.get("demands").send().json().await
44 }
45
46 /// Stop subscription by invalidating a previously published Demand.
47 pub async fn unsubscribe(&self, subscription_id: &str) -> Result<()> {
48 let url = url_format!("demands/{subscription_id}");
49 self.client.delete(&url).send().json().await
50 }
51
52 /// Get events which have arrived from the market in response to the Demand
53 /// published by the Requestor via [`subscribe`](#method.subscribe).
54 /// Returns collection of at most `max_events` `RequestorEvents` or times out.
55 ///
56 /// This is a blocking operation. It will not return until there is at
57 /// least one new event.
58 ///
59 /// Returns Proposal related events:
60 ///
61 /// * `ProposalEvent` - Indicates that there is new Offer Proposal for
62 /// this Demand.
63 ///
64 /// * `ProposalRejectedEvent` - Indicates that the Provider has rejected
65 /// our previous Proposal related to this Demand. This effectively ends a
66 /// Negotiation chain - it explicitly indicates that the sender will not
67 /// create another counter-Proposal.
68 ///
69 /// * `PropertyQueryEvent` - not supported yet.
70 ///
71 /// **Note**: When `collectOffers` is waiting, simultaneous call to
72 /// `unsubscribeDemand` on the same `subscriptionId` should result in
73 /// "Subscription does not exist" error returned from `collectOffers`.
74 ///
75 /// **Note**: Specification requires this endpoint to support list of
76 /// specific Proposal Ids to listen for messages related only to specific
77 /// Proposals. This is not covered yet.
78 #[rustfmt::skip]
79 pub async fn collect(
80 &self,
81 subscription_id: &str,
82 timeout: Option<f32>,
83 max_events: Option<i32>,
84 ) -> Result<Vec<RequestorEvent>> {
85 let url = url_format!(
86 "demands/{subscription_id}/events",
87 #[query] timeout,
88 #[query] max_events,
89 );
90 self.client.get(&url).send().json().await.or_else(default_on_timeout)
91 }
92
93 /// Responds with a bespoke Demand to received Offer.
94 pub async fn counter_proposal(
95 &self,
96 demand_proposal: &NewProposal,
97 subscription_id: &str,
98 proposal_id: &str,
99 ) -> Result<String> {
100 let url = url_format!("demands/{subscription_id}/proposals/{proposal_id}",);
101 self.client
102 .post(&url)
103 .send_json(&demand_proposal)
104 .json()
105 .await
106 }
107
108 /// Fetches Proposal (Offer) with given id.
109 pub async fn get_proposal(&self, subscription_id: &str, proposal_id: &str) -> Result<Proposal> {
110 let url = url_format!("demands/{subscription_id}/proposals/{proposal_id}",);
111 self.client.get(&url).send().json().await
112 }
113
114 /// Rejects Proposal (Offer)
115 ///
116 /// Effectively ends a Negotiation chain - it explicitly indicates that
117 /// the sender will not create another counter-Proposal.
118 pub async fn reject_proposal(
119 &self,
120 subscription_id: &str,
121 proposal_id: &str,
122 reason: &Option<Reason>,
123 ) -> Result<()> {
124 let url = url_format!("demands/{subscription_id}/proposals/{proposal_id}/reject",);
125 self.client.post(&url).send_json(&reason).json().await
126 }
127
128 /// Creates Agreement from selected Proposal.
129 ///
130 /// Initiates the Agreement handshake phase.
131 ///
132 /// Formulates an Agreement artifact from the Proposal indicated by the
133 /// received Proposal Id.
134 ///
135 /// The Approval Expiry Date is added to Agreement artifact and implies
136 /// the effective timeout on the whole Agreement Confirmation sequence.
137 ///
138 /// A successful call to `create_agreement` shall immediately be followed
139 /// by a `confirm_agreement` and `wait_for_approval` call in order to listen
140 /// for responses from the Provider.
141 ///
142 /// **Note**: Moves given Proposal to `Approved` state.
143 pub async fn create_agreement(&self, agreement: &AgreementProposal) -> Result<String> {
144 self.client
145 .post("agreements")
146 .send_json(&agreement)
147 .json()
148 .await
149 }
150
151 /// Lists agreements
152 ///
153 /// Supports filtering by:
154 /// * state
155 /// * creation date
156 /// * app session id
157 pub async fn list_agreements(
158 &self,
159 state: Option<State>,
160 before_date: Option<DateTime<Utc>>,
161 after_date: Option<DateTime<Utc>>,
162 app_session_id: Option<String>,
163 ) -> Result<Vec<AgreementListEntry>> {
164 let url = url_format!(
165 "agreements",
166 #[query]
167 state,
168 #[query]
169 before_date,
170 #[query]
171 after_date,
172 #[query]
173 app_session_id,
174 );
175 self.client.get(&url).send().json().await
176 }
177
178 /// Fetches agreement with given agreement id.
179 pub async fn get_agreement(&self, agreement_id: &str) -> Result<Agreement> {
180 let url = url_format!("agreements/{agreement_id}");
181 self.client.get(&url).send().json().await
182 }
183
184 /// Sends Agreement draft to the Provider.
185 /// Signs Agreement self-created via `create_agreement` and sends it to the Provider.
186 #[rustfmt::skip]
187 pub async fn confirm_agreement(
188 &self,
189 agreement_id: &str,
190 app_session_id: Option<String>,
191 ) -> Result<()> {
192 let url = url_format!(
193 "agreements/{agreement_id}/confirm",
194 #[query] app_session_id,
195 );
196 self.client.post(&url).send().json().await
197 }
198
199 /// Waits for Agreement approval by the Provider.
200 ///
201 /// This is a blocking operation. The call may be aborted by Requestor caller
202 /// code. After the call is aborted or timed out, another `wait_for_approval`
203 /// call can be raised on the same `agreement_id`.
204 ///
205 /// It returns one of the following options:
206 ///
207 /// * `Ok` Agreement approved by the Provider.
208 /// The Providers’s corresponding `approveAgreement` call returns `204`
209 /// (Approved) **before** this endpoint on the Requestor side.
210 /// The Provider is now ready to accept a request to start an Activity.
211 ///
212 /// * `Err` - Indicates that Agreement is not approved.
213 /// - `408` Agreement not approved within given timeout. Try again.
214 /// - `409` Agreement not confirmed yet by Requestor himself.
215 /// - `410` Agreement is not approved. This state is permanent.
216 ///
217 /// Attached `ErrorMessage` contains further details:
218 /// - `Rejected` - Indicates that the Provider has called
219 /// `rejectAgreement`, which effectively stops the Agreement handshake.
220 /// The Requestor may attempt to return to the Negotiation phase by
221 /// sending a new Proposal or to the Agreement phase by creating
222 /// new Agreement.
223 /// - `Cancelled` - Indicates that the Requestor himself has called
224 /// `cancelAgreement`, which effectively stops the Agreement handshake.
225 /// - `Expired` - Indicates that Agreement validity period elapsed and it
226 /// was not approved, rejected nor cancelled.
227 /// - `Terminated` - Indicates that Agreement is already terminated.
228 #[rustfmt::skip]
229 pub async fn wait_for_approval(
230 &self,
231 agreement_id: &str,
232 timeout: Option<f32>,
233 ) -> Result<()> {
234 let url = url_format!(
235 "agreements/{agreement_id}/wait",
236 #[query] timeout,
237 );
238 self.client.post(&url).send().json().await
239 }
240
241 /// Cancels Agreement.
242 ///
243 /// It is only possible before Requestor confirmed or Provider approved
244 /// or rejected the Agreement, and before Expiration.
245 ///
246 /// Causes the awaiting `wait_for_approval` call to return with `Cancelled` response.
247 /// Also the Provider's corresponding `approve_agreement` returns `Cancelled`.
248 pub async fn cancel_agreement(
249 &self,
250 agreement_id: &str,
251 reason: &Option<Reason>,
252 ) -> Result<()> {
253 let url = url_format!("agreements/{agreement_id}/cancel");
254 self.client.post(&url).send_json(&reason).json().await
255 }
256
257 /// Terminates approved Agreement.
258 pub async fn terminate_agreement(
259 &self,
260 agreement_id: &str,
261 reason: &Option<Reason>,
262 ) -> Result<()> {
263 let url = url_format!("agreements/{agreement_id}/terminate");
264 self.client.post(&url).send_json(&reason).json().await
265 }
266
267 /// Collects events related to an Agreement.
268 ///
269 /// This is a blocking operation. It will not return until there is
270 /// at least one new event. All events are appearing on both sides equally.
271 ///
272 /// Returns Agreement related events:
273 ///
274 /// * `AgreementApprovedEvent` - Indicates that the Agreement has been
275 /// approved by the Provider.
276 /// - The Provider is now ready to accept a request to start an
277 /// Activity as described in the negotiated agreement.
278 /// - The Providers’s corresponding `approveAgreement` call
279 /// returns `Approved` after this event is emitted.
280 ///
281 /// * `AgreementRejectedEvent` - Indicates that the Provider has called
282 /// `rejectAgreement`, which effectively stops the Agreement handshake.
283 /// The Requestor may attempt to return to the Negotiation phase by
284 /// sending a new Proposal.
285 ///
286 /// * `AgreementCancelledEvent` - Indicates that the Requestor has called
287 /// `cancelAgreement`, which effectively stops the Agreement handshake.
288 ///
289 /// * `AgreementTerminatedEvent` - Indicates that the Agreement has been
290 /// terminated by specified party (contains signature).
291 #[rustfmt::skip]
292 pub async fn collect_agreement_events<Tz>(
293 &self,
294 timeout: Option<f32>,
295 after_timestamp: Option<&DateTime<Tz>>,
296 max_events: Option<i32>,
297 app_session_id: Option<String>,
298 ) -> Result<Vec<AgreementOperationEvent>>
299 where
300 Tz: TimeZone,
301 Tz::Offset: Display,
302 {
303 let after_timestamp = after_timestamp.map(|dt| dt.to_rfc3339());
304 let url = url_format!(
305 "agreementEvents",
306 #[query] timeout,
307 #[query] after_timestamp,
308 #[query] max_events,
309 #[query] app_session_id,
310 );
311 self.client.get(&url).send().json().await.or_else(default_on_timeout)
312 }
313
314 pub async fn begin_scan(&self, scan_req: &NewScan) -> Result<String> {
315 self.client.post("scan").send_json(&scan_req).json().await
316 }
317
318 pub async fn collect_scan(
319 &self,
320 subscription_id: &str,
321 timeout: Option<f32>,
322 max_events: Option<usize>,
323 peer_id: Option<&NodeId>,
324 ) -> Result<Vec<Offer>> {
325 let url = url_format!(
326 "scan/{subscription_id}/events",
327 #[query]
328 timeout,
329 #[query]
330 max_events,
331 #[query]
332 peer_id
333 );
334 self.client
335 .get(&url)
336 .send()
337 .json()
338 .await
339 .or_else(default_on_timeout)
340 }
341
342 pub async fn end_scan(&self, subscription_id: &str) -> Result<()> {
343 let url = url_format!("scan/{subscription_id}");
344 self.client.delete(&url).send().json().await
345 }
346}