throttle_client/client.rs
1use crate::error::Error;
2use humantime::format_duration;
3use reqwest::{self, IntoUrl, Response, StatusCode, Url};
4use serde::{Deserialize, Serialize};
5use std::{collections::HashMap, time::Duration};
6
7/// Returns an error for http status translating domain specific errors.
8pub async fn error_for_status(response: Response) -> Result<Response, Error> {
9 // Bad Request
10 match response.status() {
11 StatusCode::BAD_REQUEST => {
12 // Since this is the client code which made the erroneous request, it is
13 // supposedly caused by wrong arguments passed to this code by the
14 // application. Let's forward the error as an exception, so the App
15 // developer can act on it.
16 let message = response.text().await?;
17 // if message == "Unknown peer" {
18 // Error::UnknowPeer
19 // }
20 Err(Error::DomainError(message))
21 }
22 // This is returned by the server e.g. if requesting a lock with a count higher than max.
23 StatusCode::CONFLICT => {
24 let message = response.text().await?;
25 Err(Error::DomainError(message))
26 }
27 _ => Ok(response.error_for_status()?),
28 }
29}
30
31/// Send http requests to a throttle server. Only concerned with sending correct HTTP requests the
32/// Throttle server understands. Not a higher level locking library.
33#[derive(Debug, Clone)]
34pub struct Client {
35 client: reqwest::Client,
36 url: Url,
37}
38
39impl Client {
40 /// A new throttle Client instance.
41 pub fn new(url: impl IntoUrl) -> Result<Self, Error> {
42 let new_instance = Self {
43 client: reqwest::Client::new(),
44 url: url.into_url()?,
45 };
46 Ok(new_instance)
47 }
48
49 /// Register a new peer with the server.
50 ///
51 /// # Parameters
52 ///
53 /// * `expires_in`: Retention time of the peer on the server. A peer is used to acquire locks
54 /// and keep the leases to them alive. A Peer owns the locks which it acquires and releasing
55 /// it is going to release the owned locks as well.
56 ///
57 /// Every call to `new_peer` should be matched by a call to `release`.
58 ///
59 /// Creating a peer `new_peer` is separated from `acquire` in an extra Request mostly to make
60 /// `acquire` idempotent. This prevents a call to acquire from acquiring more than one
61 /// semaphore in case it is repeated due to a timeout.
62 pub async fn new_peer(&self, expires_in: Duration) -> Result<u64, Error> {
63 let response = self
64 .client
65 .post(self.url.join("new_peer").unwrap())
66 .json(&ExpiresIn { expires_in })
67 .send()
68 .await?;
69 let peer_id = error_for_status(response).await?.json().await?;
70 Ok(peer_id)
71 }
72
73 /// Deletes the peer on the throttle server.
74 ///
75 /// This is important to unblock other clients which may be waiting for the semaphore remainder
76 /// to increase.
77 pub async fn release(&self, peer_id: u64) -> Result<(), Error> {
78 let mut url = self.url.clone();
79 url.path_segments_mut()
80 .unwrap()
81 .extend(["peers", &peer_id.to_string()]);
82 let response = self.client.delete(url).send().await?;
83 error_for_status(response).await?;
84 Ok(())
85 }
86
87 /// Acquire a lock from the server.
88 ///
89 /// Every call to `acquire` should be matched by a call to `release`. Check out
90 /// `lock` which as contextmanager does this for you.
91 ///
92 /// # Parameters
93 ///
94 /// * `semaphore`: Name of the semaphore to be acquired.
95 /// * `count`: The count of the lock. A larger count represents a larger 'piece' of the resource
96 /// under procection.
97 /// * `block_for`: The request returns as soon as the lock could be acquireod or after the
98 /// duration has elapsed, even if the lock could not be acquired. If set to `None`, the
99 /// request returns immediatly. Please note that this function is asynchronous and does not
100 /// block. The blocking does refer to the actual request. As such `block_for` represents an
101 /// upper bound after which the returned futures poll method is going to return `Ready`.
102 /// * `expires_in`: The amount of time the remains valid. Can be prolonged by calling heartbeat.
103 /// After the time has passed the lock is considered released on the server side.
104 ///
105 /// # Return
106 ///
107 /// `true` if the lock is active, `false` otherwise.
108 pub async fn acquire(
109 &self,
110 peer_id: u64,
111 semaphore: &str,
112 count: u32,
113 expires_in: Option<Duration>,
114 block_for: Option<Duration>,
115 ) -> Result<bool, Error> {
116 let mut url = self.url.clone();
117 url.path_segments_mut()
118 .unwrap()
119 .extend(["peers", &peer_id.to_string(), semaphore]);
120
121 {
122 let mut query = url.query_pairs_mut();
123 query.extend_pairs(expires_in.map(|d| ("expires_in", format_duration(d).to_string())));
124 query.extend_pairs(block_for.map(|d| ("block_for", format_duration(d).to_string())));
125 }
126 let response = self.client.put(url).json(&count).send().await?;
127 let response = error_for_status(response).await?;
128 match response.status() {
129 StatusCode::OK => Ok(true),
130 StatusCode::ACCEPTED => Ok(false),
131 _ => Err(Error::UnexpectedResponse),
132 }
133 }
134
135 /// Ask the server wether the peer has acquired all its locks.
136 pub async fn is_acquired(&self, peer_id: u64) -> Result<bool, Error> {
137 let mut url = self.url.clone();
138 url.path_segments_mut()
139 .unwrap()
140 .extend(["peers", &peer_id.to_string(), "is_acquired"]);
141 let response = self.client.get(url).send().await?;
142 let acquired = error_for_status(response).await?.json().await?;
143 Ok(acquired)
144 }
145
146 /// The curent semaphore count. I.e. the number of available leases
147 ///
148 /// This is equal to the full semaphore count minus the current count. This number
149 /// could become negative, if the semaphores have been overcommitted (due to
150 /// previously reoccuring leases previously considered dead).
151 pub async fn remainder(&self, semaphore: &str) -> Result<i64, Error> {
152 let mut url = self.url.join("remainder").unwrap();
153 url.query_pairs_mut().append_pair("semaphore", semaphore);
154
155 let response = self.client.get(url).send().await?;
156 let remainder = error_for_status(response).await?.json().await?;
157
158 Ok(remainder)
159 }
160
161 /// Sends a restore request to the server.
162 ///
163 /// This request creates a peer with the specified peer id. The peer is also going to hold all
164 /// the locks passed in acquired, even if this means exceeding the semaphore count.
165 pub async fn restore(
166 &self,
167 peer_id: u64,
168 acquired: &HashMap<String, u32>,
169 expires_in: Duration,
170 ) -> Result<(), Error> {
171 let url = self.url.join("restore").unwrap();
172 let response = self
173 .client
174 .post(url)
175 .json(&Restore {
176 expires_in,
177 peer_id,
178 acquired,
179 })
180 .send()
181 .await?;
182 error_for_status(response).await?;
183 Ok(())
184 }
185
186 /// Send a PUT request to the server updating the expiration timestamp
187 pub async fn heartbeat(&self, peer_id: u64, expires_in: Duration) -> Result<(), Error> {
188 let mut url = self.url.clone();
189 url.path_segments_mut()
190 .unwrap()
191 .extend(["peers", &peer_id.to_string()]);
192 let response = self
193 .client
194 .put(url)
195 .json(&ExpiresIn { expires_in })
196 .send()
197 .await?;
198 error_for_status(response).await?;
199 Ok(())
200 }
201
202 /// Release a lock to a semaphore for a specific peer
203 pub async fn release_lock(&self, peer_id: u64, semaphore: &str) -> Result<(), Error> {
204 let mut url = self.url.clone();
205 url.path_segments_mut()
206 .unwrap()
207 .extend(["peers", &peer_id.to_string(), semaphore]);
208 let response = self.client.delete(url).send().await?;
209 error_for_status(response).await?;
210 Ok(())
211 }
212
213 pub async fn list_of_peers(&self) -> Result<Vec<PeerDescription>, Error> {
214 let mut url = self.url.clone();
215 url.path_segments_mut()
216 .unwrap()
217 // Use empty string to achieve trailing slash (`/`) in path
218 .extend(["peers", ""]);
219 let response = self.client.get(url).send().await?;
220 let peers = error_for_status(response).await?.json().await?;
221 Ok(peers)
222 }
223}
224
225/// The properties and state of a peer, as returned by throttle then listing peers.
226#[derive(Deserialize)]
227pub struct PeerDescription {}
228
229/// Used as a query parameter in requests. E.g. `?expires_in=5m`.
230#[derive(Serialize)]
231struct ExpiresIn {
232 #[serde(with = "humantime_serde")]
233 expires_in: Duration,
234}
235
236#[derive(Serialize)]
237struct Restore<'a> {
238 #[serde(with = "humantime_serde")]
239 expires_in: Duration,
240 peer_id: u64,
241 acquired: &'a HashMap<String, u32>,
242}