ya_client/market/provider.rs
1//! Provider part of the Market API
2use ya_client_model::market::{
3 agreement::State, Agreement, AgreementListEntry, AgreementOperationEvent, NewOffer,
4 NewProposal, Offer, Proposal, ProviderEvent, Reason, MARKET_API_PATH,
5};
6
7use crate::{web::default_on_timeout, web::WebClient, web::WebInterface, Result};
8use chrono::{DateTime, TimeZone, Utc};
9use std::fmt::Display;
10
11/// Bindings for Provider part of the Market API.
12#[derive(Clone)]
13pub struct MarketProviderApi {
14 client: WebClient,
15}
16
17impl WebInterface for MarketProviderApi {
18 const API_URL_ENV_VAR: &'static str = crate::market::MARKET_URL_ENV_VAR;
19 const API_SUFFIX: &'static str = MARKET_API_PATH;
20
21 fn from_client(client: WebClient) -> Self {
22 MarketProviderApi { client }
23 }
24}
25
26impl MarketProviderApi {
27 /// Publish Provider’s service capabilities (`Offer`) on the market to declare an
28 /// interest in Demands meeting specified criteria.
29 pub async fn subscribe(&self, offer: &NewOffer) -> Result<String> {
30 self.client.post("offers").send_json(&offer).json().await
31 }
32
33 /// Fetches all active Offers which have been published by the Provider.
34 pub async fn get_offers(&self) -> Result<Vec<Offer>> {
35 self.client.get("offers").send().json().await
36 }
37
38 /// Stop subscription by invalidating a previously published Offer.
39 ///
40 /// Stop receiving Proposals.
41 /// **Note**: this will terminate all pending `collect_demands` calls on this subscription.
42 /// This implies, that client code should not `unsubscribe_offer` before it has received
43 /// all expected/useful inputs from `collect_demands`.
44 pub async fn unsubscribe(&self, subscription_id: &str) -> Result<()> {
45 let url = url_format!("offers/{subscription_id}");
46 self.client.delete(&url).send().json().await
47 }
48
49 /// Get events which have arrived from the market in response to the Offer
50 /// published by the Provider via [`subscribe`](#method.subscribe).
51 /// Returns collection of at most `max_events` `ProviderEvents` or times out.
52 ///
53 /// This is a blocking operation. It will not return until there is at
54 /// least one new event.
55 ///
56 /// Returns Proposal related events:
57 ///
58 /// * `ProposalEvent` - Indicates that there is new Demand Proposal for
59 /// this Offer.
60 ///
61 /// * `ProposalRejectedEvent` - Indicates that the Requestor has rejected
62 /// our previous Proposal related to this Offer. This effectively ends a
63 /// Negotiation chain - it explicitly indicates that the sender will not
64 /// create another counter-Proposal.
65 ///
66 /// * `AgreementEvent` - Indicates that the Requestor is accepting our
67 /// previous Proposal and ask for our approval of the Agreement.
68 ///
69 /// * `PropertyQueryEvent` - not supported yet.
70 ///
71 /// **Note**: When `collectOffers` is waiting, simultaneous call to
72 /// `unsubscribeDemand` on the same `subscriptionId` should result in
73 /// "Subscription does not exist" error returned from `collectOffers`.
74 ///
75 /// **Note**: Specification requires this endpoint to support list of
76 /// specific Proposal Ids to listen for messages related only to specific
77 /// Proposals. This is not covered yet.
78 #[rustfmt::skip]
79 pub async fn collect(
80 &self,
81 subscription_id: &str,
82 timeout: Option<f32>,
83 max_events: Option<i32>,
84 ) -> Result<Vec<ProviderEvent>> {
85 let url = url_format!(
86 "offers/{subscription_id}/events",
87 #[query] timeout,
88 #[query] max_events,
89 );
90
91 self.client.get(&url).send().json().await.or_else(default_on_timeout)
92 }
93
94 /// Fetches Proposal (Demand) with given id.
95 pub async fn get_proposal(&self, subscription_id: &str, proposal_id: &str) -> Result<Proposal> {
96 let url = url_format!("offers/{subscription_id}/proposals/{proposal_id}",);
97 self.client.get(&url).send().json().await
98 }
99
100 /// Rejects Proposal (Demand).
101 ///
102 /// Effectively ends a Negotiation chain - it explicitly indicates that
103 /// the sender will not create another counter-Proposal.
104 pub async fn reject_proposal(
105 &self,
106 subscription_id: &str,
107 proposal_id: &str,
108 reason: &Option<Reason>,
109 ) -> Result<()> {
110 let url = url_format!("offers/{subscription_id}/proposals/{proposal_id}/reject",);
111 self.client.post(&url).send_json(&reason).json().await
112 }
113
114 /// Responds with a bespoke Offer to received Demand.
115 /// Creates and sends a modified version of original Offer (a
116 /// counter-proposal) adjusted to previously received Proposal (ie. Demand).
117 /// Changes Proposal state to `Draft`. Returns created Proposal id.
118 pub async fn counter_proposal(
119 &self,
120 offer_proposal: &NewProposal,
121 subscription_id: &str,
122 proposal_id: &str,
123 ) -> Result<String> {
124 let url = url_format!("offers/{subscription_id}/proposals/{proposal_id}",);
125 self.client
126 .post(&url)
127 .send_json(&offer_proposal)
128 .json()
129 .await
130 }
131
132 /// Approves Agreement proposed by the Reqestor.
133 ///
134 /// This is a blocking operation. The call may be aborted by Provider caller
135 /// code. After the call is aborted or timed out, another `approve_agreement`
136 /// call can be raised on the same `agreement_id`.
137 ///
138 /// It returns one of the following options:
139 ///
140 /// * `Ok` Agreement approved. Indicates that the approved Agreement has been
141 /// successfully delivered to the Requestor and acknowledged.
142 /// - The Requestor side has been notified about the Provider’s commitment.
143 /// - The Provider is now ready to accept a request to start an Activity.
144 /// - The Requestor’s corresponding `wait_for_approval` call returns `Ok`
145 /// (Approved) **after** this endpoint on the Provider side.
146 ///
147 /// * `Err` - Indicates that Agreement is not approved.
148 /// - `408` Agreement not approved within given timeout. Try again.
149 /// - `410` Agreement approval failed permanently.
150 /// Attached `ErrorMessage` contains further details:
151 /// - `Rejected` - Indicates that the Provider himself has already
152 /// called `reject_agreement`.
153 /// - `Cancelled` - Indicates that before Provider approved this Agreement,
154 /// the Requestor has called `cancel_agreement`, thus invalidating the
155 /// Agreement. The Provider may attempt to return to the Negotiation phase
156 /// by sending a new Proposal.
157 /// - `Expired` - Indicates that Agreement validity period elapsed and it was
158 /// not approved, rejected nor cancelled.
159 /// - `Terminated` - Indicates that Agreement is already terminated.
160 #[rustfmt::skip]
161 pub async fn approve_agreement(
162 &self,
163 agreement_id: &str,
164 app_session_id: Option<String>,
165 timeout: Option<f32>,
166 ) -> Result<()> {
167 let url = url_format!(
168 "agreements/{agreement_id}/approve",
169 #[query] app_session_id,
170 #[query] timeout,
171 );
172 self.client.post(&url).send().json().await
173 }
174
175 /// Rejects Agreement proposed by the Requestor.
176 ///
177 /// The Requestor side is notified about the Provider’s decision to reject
178 /// a negotiated agreement. This effectively stops the Agreement handshake.
179 ///
180 /// **Note**: Mutually exclusive with `approve_agreement`.
181 pub async fn reject_agreement(
182 &self,
183 agreement_id: &str,
184 reason: &Option<Reason>,
185 ) -> Result<()> {
186 let url = url_format!("agreements/{agreement_id}/reject");
187 self.client.post(&url).send_json(&reason).json().await
188 }
189
190 /// Terminates approved Agreement.
191 pub async fn terminate_agreement(
192 &self,
193 agreement_id: &str,
194 reason: &Option<Reason>,
195 ) -> Result<()> {
196 let url = url_format!("agreements/{agreement_id}/terminate");
197 self.client.post(&url).send_json(&reason).json().await
198 }
199
200 /// Lists agreements
201 ///
202 /// Supports filtering by:
203 /// * state
204 /// * creation date
205 /// * app session id
206 pub async fn list_agreements(
207 &self,
208 state: Option<State>,
209 before_date: Option<DateTime<Utc>>,
210 after_date: Option<DateTime<Utc>>,
211 app_session_id: Option<String>,
212 ) -> Result<Vec<AgreementListEntry>> {
213 let url = url_format!(
214 "agreements",
215 #[query]
216 state,
217 #[query]
218 before_date,
219 #[query]
220 after_date,
221 #[query]
222 app_session_id,
223 );
224 self.client.get(&url).send().json().await
225 }
226
227 /// Fetches agreement with given agreement id.
228 pub async fn get_agreement(&self, agreement_id: &str) -> Result<Agreement> {
229 let url = url_format!("agreements/{agreement_id}");
230 self.client.get(&url).send().json().await
231 }
232
233 /// Collects events related to an Agreement.
234 ///
235 /// This is a blocking operation. It will not return until there is
236 /// at least one new event. All events are appearing on both sides equally.
237 ///
238 /// Returns Agreement related events:
239 ///
240 /// * `AgreementApprovedEvent` - Indicates that the Agreement has been
241 /// approved by the Provider.
242 /// - The Provider is now ready to accept a request to start an
243 /// Activity as described in the negotiated agreement.
244 /// - The Providers’s corresponding `approveAgreement` call
245 /// returns `Approved` after this event is emitted.
246 ///
247 /// * `AgreementRejectedEvent` - Indicates that the Provider has called
248 /// `rejectAgreement`, which effectively stops the Agreement handshake.
249 /// The Requestor may attempt to return to the Negotiation phase by
250 /// sending a new Proposal.
251 ///
252 /// * `AgreementCancelledEvent` - Indicates that the Requestor has called
253 /// `cancelAgreement`, which effectively stops the Agreement handshake.
254 ///
255 /// * `AgreementTerminatedEvent` - Indicates that the Agreement has been
256 /// terminated by specified party (contains signature).
257 #[rustfmt::skip]
258 pub async fn collect_agreement_events<Tz>(
259 &self,
260 timeout: Option<f32>,
261 after_timestamp: Option<&DateTime<Tz>>,
262 max_events: Option<i32>,
263 app_session_id: Option<String>,
264 ) -> Result<Vec<AgreementOperationEvent>>
265 where
266 Tz: TimeZone,
267 Tz::Offset: Display,
268 {
269 let after_timestamp = after_timestamp.map(|dt| dt.to_rfc3339());
270 let url = url_format!(
271 "agreementEvents",
272 #[query] timeout,
273 #[query] after_timestamp,
274 #[query] max_events,
275 #[query] app_session_id,
276 );
277 self.client.get(&url).send().json().await.or_else(default_on_timeout)
278 }
279}