throttle_client/
client.rs

1use crate::error::Error;
2use humantime::format_duration;
3use reqwest::{self, IntoUrl, Response, StatusCode, Url};
4use serde::{Deserialize, Serialize};
5use std::{collections::HashMap, time::Duration};
6
7/// Returns an error for http status translating domain specific errors.
8pub async fn error_for_status(response: Response) -> Result<Response, Error> {
9    // Bad Request
10    match response.status() {
11        StatusCode::BAD_REQUEST => {
12            // Since this is the client code which made the erroneous request, it is
13            // supposedly caused by wrong arguments passed to this code by the
14            // application. Let's forward the error as an exception, so the App
15            // developer can act on it.
16            let message = response.text().await?;
17            // if message == "Unknown peer" {
18            //     Error::UnknowPeer
19            // }
20            Err(Error::DomainError(message))
21        }
22        // This is returned by the server e.g. if requesting a lock with a count higher than max.
23        StatusCode::CONFLICT => {
24            let message = response.text().await?;
25            Err(Error::DomainError(message))
26        }
27        _ => Ok(response.error_for_status()?),
28    }
29}
30
31/// Send http requests to a throttle server. Only concerned with sending correct HTTP requests the
32/// Throttle server understands. Not a higher level locking library.
33#[derive(Debug, Clone)]
34pub struct Client {
35    client: reqwest::Client,
36    url: Url,
37}
38
39impl Client {
40    /// A new throttle Client instance.
41    pub fn new(url: impl IntoUrl) -> Result<Self, Error> {
42        let new_instance = Self {
43            client: reqwest::Client::new(),
44            url: url.into_url()?,
45        };
46        Ok(new_instance)
47    }
48
49    /// Register a new peer with the server.
50    ///
51    /// # Parameters
52    ///
53    /// * `expires_in`: Retention time of the peer on the server. A peer is used to acquire locks
54    ///   and keep the leases to them alive. A Peer owns the locks which it acquires and releasing
55    ///   it is going to release the owned locks as well.
56    ///
57    /// Every call to `new_peer` should be matched by a call to `release`.
58    ///
59    /// Creating a peer `new_peer` is separated from `acquire` in an extra Request mostly to make
60    /// `acquire` idempotent. This prevents a call to acquire from acquiring more than one
61    /// semaphore in case it is repeated due to a timeout.
62    pub async fn new_peer(&self, expires_in: Duration) -> Result<u64, Error> {
63        let response = self
64            .client
65            .post(self.url.join("new_peer").unwrap())
66            .json(&ExpiresIn { expires_in })
67            .send()
68            .await?;
69        let peer_id = error_for_status(response).await?.json().await?;
70        Ok(peer_id)
71    }
72
73    /// Deletes the peer on the throttle server.
74    ///
75    /// This is important to unblock other clients which may be waiting for the semaphore remainder
76    /// to increase.
77    pub async fn release(&self, peer_id: u64) -> Result<(), Error> {
78        let mut url = self.url.clone();
79        url.path_segments_mut()
80            .unwrap()
81            .extend(["peers", &peer_id.to_string()]);
82        let response = self.client.delete(url).send().await?;
83        error_for_status(response).await?;
84        Ok(())
85    }
86
87    /// Acquire a lock from the server.
88    ///
89    /// Every call to `acquire` should be matched by a call to `release`. Check out
90    /// `lock` which as contextmanager does this for you.
91    ///
92    /// # Parameters
93    ///
94    /// * `semaphore`: Name of the semaphore to be acquired.
95    /// * `count`: The count of the lock. A larger count represents a larger 'piece' of the resource
96    ///   under procection.
97    /// * `block_for`: The request returns as soon as the lock could be acquireod or after the
98    ///   duration has elapsed, even if the lock could not be acquired. If set to `None`, the
99    ///   request returns immediatly. Please note that this function is asynchronous and does not
100    ///   block. The blocking does refer to the actual request. As such `block_for` represents an
101    ///   upper bound after which the returned futures poll method is going to return `Ready`.
102    /// * `expires_in`: The amount of time the remains valid. Can be prolonged by calling heartbeat.
103    ///   After the time has passed the lock is considered released on the server side.
104    ///
105    /// # Return
106    ///
107    /// `true` if the lock is active, `false` otherwise.
108    pub async fn acquire(
109        &self,
110        peer_id: u64,
111        semaphore: &str,
112        count: u32,
113        expires_in: Option<Duration>,
114        block_for: Option<Duration>,
115    ) -> Result<bool, Error> {
116        let mut url = self.url.clone();
117        url.path_segments_mut()
118            .unwrap()
119            .extend(["peers", &peer_id.to_string(), semaphore]);
120
121        {
122            let mut query = url.query_pairs_mut();
123            query.extend_pairs(expires_in.map(|d| ("expires_in", format_duration(d).to_string())));
124            query.extend_pairs(block_for.map(|d| ("block_for", format_duration(d).to_string())));
125        }
126        let response = self.client.put(url).json(&count).send().await?;
127        let response = error_for_status(response).await?;
128        match response.status() {
129            StatusCode::OK => Ok(true),
130            StatusCode::ACCEPTED => Ok(false),
131            _ => Err(Error::UnexpectedResponse),
132        }
133    }
134
135    /// Ask the server wether the peer has acquired all its locks.
136    pub async fn is_acquired(&self, peer_id: u64) -> Result<bool, Error> {
137        let mut url = self.url.clone();
138        url.path_segments_mut()
139            .unwrap()
140            .extend(["peers", &peer_id.to_string(), "is_acquired"]);
141        let response = self.client.get(url).send().await?;
142        let acquired = error_for_status(response).await?.json().await?;
143        Ok(acquired)
144    }
145
146    /// The curent semaphore count. I.e. the number of available leases
147    ///
148    /// This is equal to the full semaphore count minus the current count. This number
149    /// could become negative, if the semaphores have been overcommitted (due to
150    /// previously reoccuring leases previously considered dead).
151    pub async fn remainder(&self, semaphore: &str) -> Result<i64, Error> {
152        let mut url = self.url.join("remainder").unwrap();
153        url.query_pairs_mut().append_pair("semaphore", semaphore);
154
155        let response = self.client.get(url).send().await?;
156        let remainder = error_for_status(response).await?.json().await?;
157
158        Ok(remainder)
159    }
160
161    /// Sends a restore request to the server.
162    ///
163    /// This request creates a peer with the specified peer id. The peer is also going to hold all
164    /// the locks passed in acquired, even if this means exceeding the semaphore count.
165    pub async fn restore(
166        &self,
167        peer_id: u64,
168        acquired: &HashMap<String, u32>,
169        expires_in: Duration,
170    ) -> Result<(), Error> {
171        let url = self.url.join("restore").unwrap();
172        let response = self
173            .client
174            .post(url)
175            .json(&Restore {
176                expires_in,
177                peer_id,
178                acquired,
179            })
180            .send()
181            .await?;
182        error_for_status(response).await?;
183        Ok(())
184    }
185
186    /// Send a PUT request to the server updating the expiration timestamp
187    pub async fn heartbeat(&self, peer_id: u64, expires_in: Duration) -> Result<(), Error> {
188        let mut url = self.url.clone();
189        url.path_segments_mut()
190            .unwrap()
191            .extend(["peers", &peer_id.to_string()]);
192        let response = self
193            .client
194            .put(url)
195            .json(&ExpiresIn { expires_in })
196            .send()
197            .await?;
198        error_for_status(response).await?;
199        Ok(())
200    }
201
202    /// Release a lock to a semaphore for a specific peer
203    pub async fn release_lock(&self, peer_id: u64, semaphore: &str) -> Result<(), Error> {
204        let mut url = self.url.clone();
205        url.path_segments_mut()
206            .unwrap()
207            .extend(["peers", &peer_id.to_string(), semaphore]);
208        let response = self.client.delete(url).send().await?;
209        error_for_status(response).await?;
210        Ok(())
211    }
212
213    pub async fn list_of_peers(&self) -> Result<Vec<PeerDescription>, Error> {
214        let mut url = self.url.clone();
215        url.path_segments_mut()
216            .unwrap()
217            // Use empty string to achieve trailing slash (`/`) in path
218            .extend(["peers", ""]);
219        let response = self.client.get(url).send().await?;
220        let peers = error_for_status(response).await?.json().await?;
221        Ok(peers)
222    }
223}
224
225/// The properties and state of a peer, as returned by throttle then listing peers.
226#[derive(Deserialize)]
227pub struct PeerDescription {}
228
229/// Used as a query parameter in requests. E.g. `?expires_in=5m`.
230#[derive(Serialize)]
231struct ExpiresIn {
232    #[serde(with = "humantime_serde")]
233    expires_in: Duration,
234}
235
236#[derive(Serialize)]
237struct Restore<'a> {
238    #[serde(with = "humantime_serde")]
239    expires_in: Duration,
240    peer_id: u64,
241    acquired: &'a HashMap<String, u32>,
242}