ya_client/market/
provider.rs

1//! Provider part of the Market API
2use ya_client_model::market::{
3    agreement::State, Agreement, AgreementListEntry, AgreementOperationEvent, NewOffer,
4    NewProposal, Offer, Proposal, ProviderEvent, Reason, MARKET_API_PATH,
5};
6
7use crate::{web::default_on_timeout, web::WebClient, web::WebInterface, Result};
8use chrono::{DateTime, TimeZone, Utc};
9use std::fmt::Display;
10
11/// Bindings for Provider part of the Market API.
12#[derive(Clone)]
13pub struct MarketProviderApi {
14    client: WebClient,
15}
16
17impl WebInterface for MarketProviderApi {
18    const API_URL_ENV_VAR: &'static str = crate::market::MARKET_URL_ENV_VAR;
19    const API_SUFFIX: &'static str = MARKET_API_PATH;
20
21    fn from_client(client: WebClient) -> Self {
22        MarketProviderApi { client }
23    }
24}
25
26impl MarketProviderApi {
27    /// Publish Provider’s service capabilities (`Offer`) on the market to declare an
28    /// interest in Demands meeting specified criteria.
29    pub async fn subscribe(&self, offer: &NewOffer) -> Result<String> {
30        self.client.post("offers").send_json(&offer).json().await
31    }
32
33    /// Fetches all active Offers which have been published by the Provider.
34    pub async fn get_offers(&self) -> Result<Vec<Offer>> {
35        self.client.get("offers").send().json().await
36    }
37
38    /// Stop subscription by invalidating a previously published Offer.
39    ///
40    /// Stop receiving Proposals.
41    /// **Note**: this will terminate all pending `collect_demands` calls on this subscription.
42    /// This implies, that client code should not `unsubscribe_offer` before it has received
43    /// all expected/useful inputs from `collect_demands`.
44    pub async fn unsubscribe(&self, subscription_id: &str) -> Result<()> {
45        let url = url_format!("offers/{subscription_id}");
46        self.client.delete(&url).send().json().await
47    }
48
49    /// Get events which have arrived from the market in response to the Offer
50    /// published by the Provider via  [`subscribe`](#method.subscribe).
51    /// Returns collection of at most `max_events` `ProviderEvents` or times out.
52    ///
53    /// This is a blocking operation. It will not return until there is at
54    /// least one new event.
55    ///
56    /// Returns Proposal related events:
57    ///
58    /// * `ProposalEvent` - Indicates that there is new Demand Proposal for
59    /// this Offer.
60    ///
61    /// * `ProposalRejectedEvent` - Indicates that the Requestor has rejected
62    ///   our previous Proposal related to this Offer. This effectively ends a
63    ///   Negotiation chain - it explicitly indicates that the sender will not
64    ///   create another counter-Proposal.
65    ///
66    /// * `AgreementEvent` - Indicates that the Requestor is accepting our
67    ///   previous Proposal and ask for our approval of the Agreement.
68    ///
69    /// * `PropertyQueryEvent` - not supported yet.
70    ///
71    /// **Note**: When `collectOffers` is waiting, simultaneous call to
72    /// `unsubscribeDemand` on the same `subscriptionId` should result in
73    /// "Subscription does not exist" error returned from `collectOffers`.
74    ///
75    /// **Note**: Specification requires this endpoint to support list of
76    /// specific Proposal Ids to listen for messages related only to specific
77    /// Proposals. This is not covered yet.
78    #[rustfmt::skip]
79    pub async fn collect(
80        &self,
81        subscription_id: &str,
82        timeout: Option<f32>,
83        max_events: Option<i32>,
84    ) -> Result<Vec<ProviderEvent>> {
85        let url = url_format!(
86            "offers/{subscription_id}/events",
87            #[query] timeout,
88            #[query] max_events,
89        );
90
91        self.client.get(&url).send().json().await.or_else(default_on_timeout)
92    }
93
94    /// Fetches Proposal (Demand) with given id.
95    pub async fn get_proposal(&self, subscription_id: &str, proposal_id: &str) -> Result<Proposal> {
96        let url = url_format!("offers/{subscription_id}/proposals/{proposal_id}",);
97        self.client.get(&url).send().json().await
98    }
99
100    /// Rejects Proposal (Demand).
101    ///
102    /// Effectively ends a Negotiation chain - it explicitly indicates that
103    /// the sender will not create another counter-Proposal.
104    pub async fn reject_proposal(
105        &self,
106        subscription_id: &str,
107        proposal_id: &str,
108        reason: &Option<Reason>,
109    ) -> Result<()> {
110        let url = url_format!("offers/{subscription_id}/proposals/{proposal_id}/reject",);
111        self.client.post(&url).send_json(&reason).json().await
112    }
113
114    /// Responds with a bespoke Offer to received Demand.
115    /// Creates and sends a modified version of original Offer (a
116    /// counter-proposal) adjusted to previously received Proposal (ie. Demand).
117    /// Changes Proposal state to `Draft`. Returns created Proposal id.
118    pub async fn counter_proposal(
119        &self,
120        offer_proposal: &NewProposal,
121        subscription_id: &str,
122        proposal_id: &str,
123    ) -> Result<String> {
124        let url = url_format!("offers/{subscription_id}/proposals/{proposal_id}",);
125        self.client
126            .post(&url)
127            .send_json(&offer_proposal)
128            .json()
129            .await
130    }
131
132    /// Approves Agreement proposed by the Reqestor.
133    ///
134    /// This is a blocking operation. The call may be aborted by Provider caller
135    /// code. After the call is aborted or timed out, another `approve_agreement`
136    /// call can be raised on the same `agreement_id`.
137    ///
138    /// It returns one of the following options:
139    ///
140    /// * `Ok` Agreement approved. Indicates that the approved Agreement has been
141    ///  successfully delivered to the Requestor and acknowledged.
142    ///    - The Requestor side has been notified about the Provider’s commitment.
143    ///    - The Provider is now ready to accept a request to start an Activity.
144    ///    - The Requestor’s corresponding `wait_for_approval` call returns `Ok`
145    ///    (Approved) **after** this endpoint on the Provider side.
146    ///
147    /// * `Err` - Indicates that Agreement is not approved.
148    ///   - `408` Agreement not approved within given timeout. Try again.
149    ///   - `410` Agreement approval failed permanently.
150    /// Attached `ErrorMessage` contains further details:
151    ///   - `Rejected` - Indicates that the Provider himself has already
152    ///     called `reject_agreement`.
153    ///   - `Cancelled` - Indicates that before Provider approved this Agreement,
154    ///     the Requestor has called `cancel_agreement`, thus invalidating the
155    ///     Agreement. The Provider may attempt to return to the Negotiation phase
156    ///     by sending a new Proposal.
157    ///   - `Expired` - Indicates that Agreement validity period elapsed and it was
158    ///     not approved, rejected nor cancelled.
159    ///   - `Terminated` - Indicates that Agreement is already terminated.
160    #[rustfmt::skip]
161    pub async fn approve_agreement(
162        &self,
163        agreement_id: &str,
164        app_session_id: Option<String>,
165        timeout: Option<f32>,
166    ) -> Result<()> {
167        let url = url_format!(
168            "agreements/{agreement_id}/approve",
169            #[query] app_session_id,
170            #[query] timeout,
171        );
172        self.client.post(&url).send().json().await
173    }
174
175    /// Rejects Agreement proposed by the Requestor.
176    ///
177    /// The Requestor side is notified about the Provider’s decision to reject
178    /// a negotiated agreement. This effectively stops the Agreement handshake.
179    ///
180    /// **Note**: Mutually exclusive with `approve_agreement`.
181    pub async fn reject_agreement(
182        &self,
183        agreement_id: &str,
184        reason: &Option<Reason>,
185    ) -> Result<()> {
186        let url = url_format!("agreements/{agreement_id}/reject");
187        self.client.post(&url).send_json(&reason).json().await
188    }
189
190    /// Terminates approved Agreement.
191    pub async fn terminate_agreement(
192        &self,
193        agreement_id: &str,
194        reason: &Option<Reason>,
195    ) -> Result<()> {
196        let url = url_format!("agreements/{agreement_id}/terminate");
197        self.client.post(&url).send_json(&reason).json().await
198    }
199
200    /// Lists agreements
201    ///
202    /// Supports filtering by:
203    /// * state
204    /// * creation date
205    /// * app session id
206    pub async fn list_agreements(
207        &self,
208        state: Option<State>,
209        before_date: Option<DateTime<Utc>>,
210        after_date: Option<DateTime<Utc>>,
211        app_session_id: Option<String>,
212    ) -> Result<Vec<AgreementListEntry>> {
213        let url = url_format!(
214            "agreements",
215            #[query]
216            state,
217            #[query]
218            before_date,
219            #[query]
220            after_date,
221            #[query]
222            app_session_id,
223        );
224        self.client.get(&url).send().json().await
225    }
226
227    /// Fetches agreement with given agreement id.
228    pub async fn get_agreement(&self, agreement_id: &str) -> Result<Agreement> {
229        let url = url_format!("agreements/{agreement_id}");
230        self.client.get(&url).send().json().await
231    }
232
233    /// Collects events related to an Agreement.
234    ///
235    /// This is a blocking operation. It will not return until there is
236    /// at least one new event. All events are appearing on both sides equally.
237    ///
238    /// Returns Agreement related events:
239    ///
240    /// * `AgreementApprovedEvent` - Indicates that the Agreement has been
241    ///   approved by the Provider.
242    ///     - The Provider is now ready to accept a request to start an
243    ///       Activity as described in the negotiated agreement.
244    ///     - The Providers’s corresponding `approveAgreement` call
245    ///       returns `Approved` after this event is emitted.
246    ///
247    /// * `AgreementRejectedEvent` - Indicates that the Provider has called
248    ///   `rejectAgreement`, which effectively stops the Agreement handshake.
249    ///   The Requestor may attempt to return to the Negotiation phase by
250    ///   sending a new Proposal.
251    ///
252    /// * `AgreementCancelledEvent` - Indicates that the Requestor has called
253    ///   `cancelAgreement`, which effectively stops the Agreement handshake.
254    ///
255    /// * `AgreementTerminatedEvent` - Indicates that the Agreement has been
256    ///   terminated by specified party (contains signature).
257    #[rustfmt::skip]
258    pub async fn collect_agreement_events<Tz>(
259        &self,
260        timeout: Option<f32>,
261        after_timestamp: Option<&DateTime<Tz>>,
262        max_events: Option<i32>,
263        app_session_id: Option<String>,
264    ) -> Result<Vec<AgreementOperationEvent>>
265        where
266            Tz: TimeZone,
267            Tz::Offset: Display,
268    {
269        let after_timestamp = after_timestamp.map(|dt| dt.to_rfc3339());
270        let url = url_format!(
271            "agreementEvents",
272            #[query] timeout,
273            #[query] after_timestamp,
274            #[query] max_events,
275            #[query] app_session_id,
276        );
277        self.client.get(&url).send().json().await.or_else(default_on_timeout)
278    }
279}