ya_client/market/
requestor.rs

1//! Requestor part of the Market API
2use ya_client_model::market::{
3    agreement::State, Agreement, AgreementListEntry, AgreementOperationEvent, AgreementProposal,
4    Demand, NewDemand, NewProposal, Offer, Proposal, Reason, RequestorEvent,
5};
6
7use crate::{web::default_on_timeout, web::WebClient, web::WebInterface, Result};
8use chrono::{DateTime, TimeZone, Utc};
9use std::fmt::Display;
10use ya_client_model::market::scan::NewScan;
11use ya_client_model::NodeId;
12
13/// Bindings for the Requestor part of the Market API.
///
/// A thin wrapper around a `WebClient`; cloning the bindings clones the wrapped client.
14#[derive(Clone)]
15pub struct MarketRequestorApi {
    /// Web client through which every request of this API is issued.
16    client: WebClient,
17}
18
19impl WebInterface for MarketRequestorApi {
20    const API_URL_ENV_VAR: &'static str = crate::market::MARKET_URL_ENV_VAR;
21    const API_SUFFIX: &'static str = ya_client_model::market::MARKET_API_PATH;
22
23    fn from_client(client: WebClient) -> Self {
24        MarketRequestorApi { client }
25    }
26}
27
28impl MarketRequestorApi {
29    /// Publishes Requestor capabilities via Demand.
30    ///
31    /// Demand object can be considered an "open" or public Demand, as it is not directed
32    /// at a specific Provider, but rather is sent to the market so that the matching
33    /// mechanism implementation can associate relevant Offers.
34    ///
35    /// **Note**: it is an "atomic" operation, ie. as soon as Subscription is placed,
36    /// the Demand is published on the market.
37    pub async fn subscribe(&self, demand: &NewDemand) -> Result<String> {
38        self.client.post("demands").send_json(&demand).json().await
39    }
40
41    /// Fetches all active Demands which have been published by the Requestor.
42    pub async fn get_demands(&self) -> Result<Vec<Demand>> {
43        self.client.get("demands").send().json().await
44    }
45
46    /// Stop subscription by invalidating a previously published Demand.
47    pub async fn unsubscribe(&self, subscription_id: &str) -> Result<()> {
48        let url = url_format!("demands/{subscription_id}");
49        self.client.delete(&url).send().json().await
50    }
51
52    /// Get events which have arrived from the market in response to the Demand
53    /// published by the Requestor via  [`subscribe`](#method.subscribe).
54    /// Returns collection of at most `max_events` `RequestorEvents` or times out.
55    ///
56    /// This is a blocking operation. It will not return until there is at
57    /// least one new event.
58    ///
59    /// Returns Proposal related events:
60    ///
61    /// * `ProposalEvent` - Indicates that there is new Offer Proposal for
62    /// this Demand.
63    ///
64    /// * `ProposalRejectedEvent` - Indicates that the Provider has rejected
65    /// our previous Proposal related to this Demand. This effectively ends a
66    /// Negotiation chain - it explicitly indicates that the sender will not
67    /// create another counter-Proposal.
68    ///
69    /// * `PropertyQueryEvent` - not supported yet.
70    ///
71    /// **Note**: When `collectOffers` is waiting, simultaneous call to
72    /// `unsubscribeDemand` on the same `subscriptionId` should result in
73    /// "Subscription does not exist" error returned from `collectOffers`.
74    ///
75    /// **Note**: Specification requires this endpoint to support list of
76    /// specific Proposal Ids to listen for messages related only to specific
77    /// Proposals. This is not covered yet.
78    #[rustfmt::skip]
79    pub async fn collect(
80        &self,
81        subscription_id: &str,
82        timeout: Option<f32>,
83        max_events: Option<i32>,
84    ) -> Result<Vec<RequestorEvent>> {
85        let url = url_format!(
86            "demands/{subscription_id}/events",
87            #[query] timeout,
88            #[query] max_events,
89        );
90        self.client.get(&url).send().json().await.or_else(default_on_timeout)
91    }
92
93    /// Responds with a bespoke Demand to received Offer.
94    pub async fn counter_proposal(
95        &self,
96        demand_proposal: &NewProposal,
97        subscription_id: &str,
98        proposal_id: &str,
99    ) -> Result<String> {
100        let url = url_format!("demands/{subscription_id}/proposals/{proposal_id}",);
101        self.client
102            .post(&url)
103            .send_json(&demand_proposal)
104            .json()
105            .await
106    }
107
108    /// Fetches Proposal (Offer) with given id.
109    pub async fn get_proposal(&self, subscription_id: &str, proposal_id: &str) -> Result<Proposal> {
110        let url = url_format!("demands/{subscription_id}/proposals/{proposal_id}",);
111        self.client.get(&url).send().json().await
112    }
113
114    /// Rejects Proposal (Offer)
115    ///
116    /// Effectively ends a Negotiation chain - it explicitly indicates that
117    /// the sender will not create another counter-Proposal.
118    pub async fn reject_proposal(
119        &self,
120        subscription_id: &str,
121        proposal_id: &str,
122        reason: &Option<Reason>,
123    ) -> Result<()> {
124        let url = url_format!("demands/{subscription_id}/proposals/{proposal_id}/reject",);
125        self.client.post(&url).send_json(&reason).json().await
126    }
127
128    /// Creates Agreement from selected Proposal.
129    ///
130    /// Initiates the Agreement handshake phase.
131    ///
132    /// Formulates an Agreement artifact from the Proposal indicated by the
133    /// received Proposal Id.
134    ///
135    /// The Approval Expiry Date is added to Agreement artifact and implies
136    /// the effective timeout on the whole Agreement Confirmation sequence.
137    ///
138    /// A successful call to `create_agreement` shall immediately be followed
139    /// by a `confirm_agreement` and `wait_for_approval` call in order to listen
140    /// for responses from the Provider.
141    ///
142    /// **Note**: Moves given Proposal to `Approved` state.
143    pub async fn create_agreement(&self, agreement: &AgreementProposal) -> Result<String> {
144        self.client
145            .post("agreements")
146            .send_json(&agreement)
147            .json()
148            .await
149    }
150
151    /// Lists agreements
152    ///
153    /// Supports filtering by:
154    /// * state
155    /// * creation date
156    /// * app session id
157    pub async fn list_agreements(
158        &self,
159        state: Option<State>,
160        before_date: Option<DateTime<Utc>>,
161        after_date: Option<DateTime<Utc>>,
162        app_session_id: Option<String>,
163    ) -> Result<Vec<AgreementListEntry>> {
164        let url = url_format!(
165            "agreements",
166            #[query]
167            state,
168            #[query]
169            before_date,
170            #[query]
171            after_date,
172            #[query]
173            app_session_id,
174        );
175        self.client.get(&url).send().json().await
176    }
177
178    /// Fetches agreement with given agreement id.
179    pub async fn get_agreement(&self, agreement_id: &str) -> Result<Agreement> {
180        let url = url_format!("agreements/{agreement_id}");
181        self.client.get(&url).send().json().await
182    }
183
184    /// Sends Agreement draft to the Provider.
185    /// Signs Agreement self-created via `create_agreement` and sends it to the Provider.
186    #[rustfmt::skip]
187    pub async fn confirm_agreement(
188        &self,
189        agreement_id: &str,
190        app_session_id: Option<String>,
191    ) -> Result<()> {
192        let url = url_format!(
193            "agreements/{agreement_id}/confirm",
194            #[query] app_session_id,
195        );
196        self.client.post(&url).send().json().await
197    }
198
199    /// Waits for Agreement approval by the Provider.
200    ///
201    /// This is a blocking operation. The call may be aborted by Requestor caller
202    /// code. After the call is aborted or timed out, another `wait_for_approval`
203    /// call can be raised on the same `agreement_id`.
204    ///
205    /// It returns one of the following options:
206    ///
207    /// * `Ok` Agreement approved by the Provider.
208    ///  The Providers’s corresponding `approveAgreement` call returns `204`
209    ///  (Approved) **before** this endpoint on the Requestor side.
210    ///  The Provider is now ready to accept a request to start an Activity.
211    ///
212    /// * `Err` - Indicates that Agreement is not approved.
213    ///   - `408` Agreement not approved within given timeout. Try again.
214    ///   - `409` Agreement not confirmed yet by Requestor himself.
215    ///   - `410` Agreement is not approved. This state is permanent.
216    ///
217    /// Attached `ErrorMessage` contains further details:
218    /// - `Rejected` - Indicates that the Provider has called
219    /// `rejectAgreement`, which effectively stops the Agreement handshake.
220    /// The Requestor may attempt to return to the Negotiation phase by
221    /// sending a new Proposal or to the Agreement phase by creating
222    /// new Agreement.
223    /// - `Cancelled` - Indicates that the Requestor himself has called
224    /// `cancelAgreement`, which effectively stops the Agreement handshake.
225    /// - `Expired` - Indicates that Agreement validity period elapsed and it
226    /// was not approved, rejected nor cancelled.
227    /// - `Terminated` - Indicates that Agreement is already terminated.
228    #[rustfmt::skip]
229    pub async fn wait_for_approval(
230        &self,
231        agreement_id: &str,
232        timeout: Option<f32>,
233    ) -> Result<()> {
234        let url = url_format!(
235            "agreements/{agreement_id}/wait",
236            #[query] timeout,
237        );
238        self.client.post(&url).send().json().await
239    }
240
241    /// Cancels Agreement.
242    ///
243    /// It is only possible before Requestor confirmed or Provider approved
244    /// or rejected the Agreement, and before Expiration.
245    ///
246    /// Causes the awaiting `wait_for_approval` call to return with `Cancelled` response.
247    /// Also the Provider's corresponding `approve_agreement` returns `Cancelled`.
248    pub async fn cancel_agreement(
249        &self,
250        agreement_id: &str,
251        reason: &Option<Reason>,
252    ) -> Result<()> {
253        let url = url_format!("agreements/{agreement_id}/cancel");
254        self.client.post(&url).send_json(&reason).json().await
255    }
256
257    /// Terminates approved Agreement.
258    pub async fn terminate_agreement(
259        &self,
260        agreement_id: &str,
261        reason: &Option<Reason>,
262    ) -> Result<()> {
263        let url = url_format!("agreements/{agreement_id}/terminate");
264        self.client.post(&url).send_json(&reason).json().await
265    }
266
267    /// Collects events related to an Agreement.
268    ///
269    /// This is a blocking operation. It will not return until there is
270    /// at least one new event. All events are appearing on both sides equally.
271    ///
272    /// Returns Agreement related events:
273    ///
274    /// * `AgreementApprovedEvent` - Indicates that the Agreement has been
275    ///   approved by the Provider.
276    ///     - The Provider is now ready to accept a request to start an
277    ///       Activity as described in the negotiated agreement.
278    ///     - The Providers’s corresponding `approveAgreement` call
279    ///       returns `Approved` after this event is emitted.
280    ///
281    /// * `AgreementRejectedEvent` - Indicates that the Provider has called
282    ///   `rejectAgreement`, which effectively stops the Agreement handshake.
283    ///   The Requestor may attempt to return to the Negotiation phase by
284    ///   sending a new Proposal.
285    ///
286    /// * `AgreementCancelledEvent` - Indicates that the Requestor has called
287    ///   `cancelAgreement`, which effectively stops the Agreement handshake.
288    ///
289    /// * `AgreementTerminatedEvent` - Indicates that the Agreement has been
290    ///   terminated by specified party (contains signature).
291    #[rustfmt::skip]
292    pub async fn collect_agreement_events<Tz>(
293        &self,
294        timeout: Option<f32>,
295        after_timestamp: Option<&DateTime<Tz>>,
296        max_events: Option<i32>,
297        app_session_id: Option<String>,
298    ) -> Result<Vec<AgreementOperationEvent>>
299        where
300            Tz: TimeZone,
301            Tz::Offset: Display,
302    {
303        let after_timestamp = after_timestamp.map(|dt| dt.to_rfc3339());
304        let url = url_format!(
305            "agreementEvents",
306            #[query] timeout,
307            #[query] after_timestamp,
308            #[query] max_events,
309            #[query] app_session_id,
310        );
311        self.client.get(&url).send().json().await.or_else(default_on_timeout)
312    }
313
314    pub async fn begin_scan(&self, scan_req: &NewScan) -> Result<String> {
315        self.client.post("scan").send_json(&scan_req).json().await
316    }
317
318    pub async fn collect_scan(
319        &self,
320        subscription_id: &str,
321        timeout: Option<f32>,
322        max_events: Option<usize>,
323        peer_id: Option<&NodeId>,
324    ) -> Result<Vec<Offer>> {
325        let url = url_format!(
326            "scan/{subscription_id}/events",
327            #[query]
328            timeout,
329            #[query]
330            max_events,
331            #[query]
332            peer_id
333        );
334        self.client
335            .get(&url)
336            .send()
337            .json()
338            .await
339            .or_else(default_on_timeout)
340    }
341
342    pub async fn end_scan(&self, subscription_id: &str) -> Result<()> {
343        let url = url_format!("scan/{subscription_id}");
344        self.client.delete(&url).send().json().await
345    }
346}