etcd_client/rpc/
lease.rs

1//! Etcd Lease RPC.
2
3use crate::auth::AuthService;
4use crate::channel::Channel;
5use crate::error::Result;
6use crate::rpc::pb::etcdserverpb::lease_client::LeaseClient as PbLeaseClient;
7use crate::rpc::pb::etcdserverpb::{
8    LeaseGrantRequest as PbLeaseGrantRequest, LeaseGrantResponse as PbLeaseGrantResponse,
9    LeaseKeepAliveRequest as PbLeaseKeepAliveRequest,
10    LeaseKeepAliveResponse as PbLeaseKeepAliveResponse, LeaseLeasesRequest as PbLeaseLeasesRequest,
11    LeaseLeasesResponse as PbLeaseLeasesResponse, LeaseRevokeRequest as PbLeaseRevokeRequest,
12    LeaseRevokeResponse as PbLeaseRevokeResponse, LeaseStatus as PbLeaseStatus,
13    LeaseTimeToLiveRequest as PbLeaseTimeToLiveRequest,
14    LeaseTimeToLiveResponse as PbLeaseTimeToLiveResponse,
15};
16use crate::rpc::ResponseHeader;
17use crate::vec::VecExt;
18use crate::Error;
19use http::HeaderValue;
20use std::pin::Pin;
21use std::sync::{Arc, RwLock};
22use std::task::{Context, Poll};
23use tokio::sync::mpsc::{channel, Sender};
24use tokio_stream::wrappers::ReceiverStream;
25use tokio_stream::Stream;
26use tonic::{IntoRequest, Request, Streaming};
27
28/// Client for lease operations.
29#[repr(transparent)]
30#[derive(Clone)]
31pub struct LeaseClient {
32    inner: PbLeaseClient<AuthService<Channel>>,
33}
34
35impl LeaseClient {
36    /// Creates a `LeaseClient`.
37    #[inline]
38    pub(crate) fn new(channel: Channel, auth_token: Arc<RwLock<Option<HeaderValue>>>) -> Self {
39        let inner = PbLeaseClient::new(AuthService::new(channel, auth_token));
40        Self { inner }
41    }
42
43    /// Creates a lease which expires if the server does not receive a keepAlive
44    /// within a given time to live period. All keys attached to the lease will be expired and
45    /// deleted if the lease expires. Each expired key generates a delete event in the event history.
46    #[inline]
47    pub async fn grant(
48        &mut self,
49        ttl: i64,
50        options: Option<LeaseGrantOptions>,
51    ) -> Result<LeaseGrantResponse> {
52        let resp = self
53            .inner
54            .lease_grant(options.unwrap_or_default().with_ttl(ttl))
55            .await?
56            .into_inner();
57        Ok(LeaseGrantResponse::new(resp))
58    }
59
60    /// Revokes a lease. All keys attached to the lease will expire and be deleted.
61    #[inline]
62    pub async fn revoke(&mut self, id: i64) -> Result<LeaseRevokeResponse> {
63        let resp = self
64            .inner
65            .lease_revoke(LeaseRevokeOptions::new().with_id(id))
66            .await?
67            .into_inner();
68        Ok(LeaseRevokeResponse::new(resp))
69    }
70
71    /// Keeps the lease alive by streaming keep alive requests from the client
72    /// to the server and streaming keep alive responses from the server to the client.
73    #[inline]
74    pub async fn keep_alive(&mut self, id: i64) -> Result<(LeaseKeeper, LeaseKeepAliveStream)> {
75        let (sender, receiver) = channel::<PbLeaseKeepAliveRequest>(100);
76        sender
77            .send(LeaseKeepAliveOptions::new().with_id(id).into())
78            .await
79            .map_err(|e| Error::LeaseKeepAliveError(e.to_string()))?;
80
81        let receiver = ReceiverStream::new(receiver);
82
83        let mut stream = self.inner.lease_keep_alive(receiver).await?.into_inner();
84
85        let id = match stream.message().await? {
86            Some(resp) => {
87                if resp.ttl <= 0 {
88                    return Err(Error::LeaseKeepAliveError("lease not found".to_string()));
89                }
90                resp.id
91            }
92            None => {
93                return Err(Error::WatchError(
94                    "failed to create lease keeper".to_string(),
95                ));
96            }
97        };
98
99        Ok((
100            LeaseKeeper::new(id, sender),
101            LeaseKeepAliveStream::new(stream),
102        ))
103    }
104
105    /// Retrieves lease information.
106    #[inline]
107    pub async fn time_to_live(
108        &mut self,
109        id: i64,
110        options: Option<LeaseTimeToLiveOptions>,
111    ) -> Result<LeaseTimeToLiveResponse> {
112        let resp = self
113            .inner
114            .lease_time_to_live(options.unwrap_or_default().with_id(id))
115            .await?
116            .into_inner();
117        Ok(LeaseTimeToLiveResponse::new(resp))
118    }
119
120    /// Lists all existing leases.
121    #[inline]
122    pub async fn leases(&mut self) -> Result<LeaseLeasesResponse> {
123        let resp = self
124            .inner
125            .lease_leases(PbLeaseLeasesRequest {})
126            .await?
127            .into_inner();
128        Ok(LeaseLeasesResponse::new(resp))
129    }
130}
131
132/// Options for `Grant` operation.
133#[derive(Debug, Default, Clone)]
134#[repr(transparent)]
135pub struct LeaseGrantOptions(PbLeaseGrantRequest);
136
137impl LeaseGrantOptions {
138    /// Set ttl
139    #[inline]
140    const fn with_ttl(mut self, ttl: i64) -> Self {
141        self.0.ttl = ttl;
142        self
143    }
144
145    /// Set id
146    #[inline]
147    pub const fn with_id(mut self, id: i64) -> Self {
148        self.0.id = id;
149        self
150    }
151
152    /// Creates a `LeaseGrantOptions`.
153    #[inline]
154    pub const fn new() -> Self {
155        Self(PbLeaseGrantRequest { ttl: 0, id: 0 })
156    }
157}
158
159impl From<LeaseGrantOptions> for PbLeaseGrantRequest {
160    #[inline]
161    fn from(options: LeaseGrantOptions) -> Self {
162        options.0
163    }
164}
165
166impl IntoRequest<PbLeaseGrantRequest> for LeaseGrantOptions {
167    #[inline]
168    fn into_request(self) -> Request<PbLeaseGrantRequest> {
169        Request::new(self.into())
170    }
171}
172
173/// Response for `Grant` operation.
174#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
175#[derive(Debug, Clone)]
176#[repr(transparent)]
177pub struct LeaseGrantResponse(PbLeaseGrantResponse);
178
179impl LeaseGrantResponse {
180    /// Creates a new `LeaseGrantResponse` from pb lease grant response.
181    #[inline]
182    const fn new(resp: PbLeaseGrantResponse) -> Self {
183        Self(resp)
184    }
185
186    /// Get response header.
187    #[inline]
188    pub fn header(&self) -> Option<&ResponseHeader> {
189        self.0.header.as_ref().map(From::from)
190    }
191
192    /// Takes the header out of the response, leaving a [`None`] in its place.
193    #[inline]
194    pub fn take_header(&mut self) -> Option<ResponseHeader> {
195        self.0.header.take().map(ResponseHeader::new)
196    }
197
198    /// TTL is the server chosen lease time-to-live in seconds
199    #[inline]
200    pub const fn ttl(&self) -> i64 {
201        self.0.ttl
202    }
203
204    /// ID is the lease ID for the granted lease.
205    #[inline]
206    pub const fn id(&self) -> i64 {
207        self.0.id
208    }
209
210    /// Error message if return error.
211    #[inline]
212    pub fn error(&self) -> &str {
213        &self.0.error
214    }
215}
216
217/// Options for `Revoke` operation.
218#[derive(Debug, Default, Clone)]
219#[repr(transparent)]
220struct LeaseRevokeOptions(PbLeaseRevokeRequest);
221
222impl LeaseRevokeOptions {
223    /// Set id
224    #[inline]
225    fn with_id(mut self, id: i64) -> Self {
226        self.0.id = id;
227        self
228    }
229
230    /// Creates a `LeaseRevokeOptions`.
231    #[inline]
232    pub const fn new() -> Self {
233        Self(PbLeaseRevokeRequest { id: 0 })
234    }
235}
236
237impl From<LeaseRevokeOptions> for PbLeaseRevokeRequest {
238    #[inline]
239    fn from(options: LeaseRevokeOptions) -> Self {
240        options.0
241    }
242}
243
244impl IntoRequest<PbLeaseRevokeRequest> for LeaseRevokeOptions {
245    #[inline]
246    fn into_request(self) -> Request<PbLeaseRevokeRequest> {
247        Request::new(self.into())
248    }
249}
250
251/// Response for `Revoke` operation.
252#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
253#[derive(Debug, Clone)]
254#[repr(transparent)]
255pub struct LeaseRevokeResponse(PbLeaseRevokeResponse);
256
257impl LeaseRevokeResponse {
258    /// Creates a new `LeaseRevokeResponse` from pb lease revoke response.
259    #[inline]
260    const fn new(resp: PbLeaseRevokeResponse) -> Self {
261        Self(resp)
262    }
263
264    /// Get response header.
265    #[inline]
266    pub fn header(&self) -> Option<&ResponseHeader> {
267        self.0.header.as_ref().map(From::from)
268    }
269
270    /// Takes the header out of the response, leaving a [`None`] in its place.
271    #[inline]
272    pub fn take_header(&mut self) -> Option<ResponseHeader> {
273        self.0.header.take().map(ResponseHeader::new)
274    }
275}
276
277/// Options for `KeepAlive` operation.
278#[derive(Debug, Default, Clone)]
279#[repr(transparent)]
280struct LeaseKeepAliveOptions(PbLeaseKeepAliveRequest);
281
282impl LeaseKeepAliveOptions {
283    /// Set id
284    #[inline]
285    fn with_id(mut self, id: i64) -> Self {
286        self.0.id = id;
287        self
288    }
289
290    /// Creates a `LeaseKeepAliveOptions`.
291    #[inline]
292    pub const fn new() -> Self {
293        Self(PbLeaseKeepAliveRequest { id: 0 })
294    }
295}
296
297impl From<LeaseKeepAliveOptions> for PbLeaseKeepAliveRequest {
298    #[inline]
299    fn from(options: LeaseKeepAliveOptions) -> Self {
300        options.0
301    }
302}
303
304impl IntoRequest<PbLeaseKeepAliveRequest> for LeaseKeepAliveOptions {
305    #[inline]
306    fn into_request(self) -> Request<PbLeaseKeepAliveRequest> {
307        Request::new(self.into())
308    }
309}
310
311/// Response for `KeepAlive` operation.
312#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
313#[derive(Debug, Clone)]
314#[repr(transparent)]
315pub struct LeaseKeepAliveResponse(PbLeaseKeepAliveResponse);
316
317impl LeaseKeepAliveResponse {
318    /// Creates a new `LeaseKeepAliveResponse` from pb lease KeepAlive response.
319    #[inline]
320    const fn new(resp: PbLeaseKeepAliveResponse) -> Self {
321        Self(resp)
322    }
323
324    /// Get response header.
325    #[inline]
326    pub fn header(&self) -> Option<&ResponseHeader> {
327        self.0.header.as_ref().map(From::from)
328    }
329
330    /// Takes the header out of the response, leaving a [`None`] in its place.
331    #[inline]
332    pub fn take_header(&mut self) -> Option<ResponseHeader> {
333        self.0.header.take().map(ResponseHeader::new)
334    }
335
336    /// TTL is the new time-to-live for the lease.
337    #[inline]
338    pub const fn ttl(&self) -> i64 {
339        self.0.ttl
340    }
341
342    /// ID is the lease ID for the keep alive request.
343    #[inline]
344    pub const fn id(&self) -> i64 {
345        self.0.id
346    }
347}
348
349/// Options for `TimeToLive` operation.
350#[derive(Debug, Default, Clone)]
351#[repr(transparent)]
352pub struct LeaseTimeToLiveOptions(PbLeaseTimeToLiveRequest);
353
354impl LeaseTimeToLiveOptions {
355    /// ID is the lease ID for the lease.
356    #[inline]
357    const fn with_id(mut self, id: i64) -> Self {
358        self.0.id = id;
359        self
360    }
361
362    /// Keys is true to query all the keys attached to this lease.
363    #[inline]
364    pub const fn with_keys(mut self) -> Self {
365        self.0.keys = true;
366        self
367    }
368
369    /// Creates a `LeaseTimeToLiveOptions`.
370    #[inline]
371    pub const fn new() -> Self {
372        Self(PbLeaseTimeToLiveRequest { id: 0, keys: false })
373    }
374}
375
376impl From<LeaseTimeToLiveOptions> for PbLeaseTimeToLiveRequest {
377    #[inline]
378    fn from(options: LeaseTimeToLiveOptions) -> Self {
379        options.0
380    }
381}
382
383impl IntoRequest<PbLeaseTimeToLiveRequest> for LeaseTimeToLiveOptions {
384    #[inline]
385    fn into_request(self) -> Request<PbLeaseTimeToLiveRequest> {
386        Request::new(self.into())
387    }
388}
389
390/// Response for `TimeToLive` operation.
391#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
392#[derive(Debug, Clone)]
393#[repr(transparent)]
394pub struct LeaseTimeToLiveResponse(PbLeaseTimeToLiveResponse);
395
396impl LeaseTimeToLiveResponse {
397    /// Creates a new `LeaseTimeToLiveResponse` from pb lease TimeToLive response.
398    #[inline]
399    const fn new(resp: PbLeaseTimeToLiveResponse) -> Self {
400        Self(resp)
401    }
402
403    /// Get response header.
404    #[inline]
405    pub fn header(&self) -> Option<&ResponseHeader> {
406        self.0.header.as_ref().map(From::from)
407    }
408
409    /// Takes the header out of the response, leaving a [`None`] in its place.
410    #[inline]
411    pub fn take_header(&mut self) -> Option<ResponseHeader> {
412        self.0.header.take().map(ResponseHeader::new)
413    }
414
415    /// TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds.
416    #[inline]
417    pub const fn ttl(&self) -> i64 {
418        self.0.ttl
419    }
420
421    /// ID is the lease ID from the keep alive request.
422    #[inline]
423    pub const fn id(&self) -> i64 {
424        self.0.id
425    }
426
427    /// GrantedTTL is the initial granted time in seconds upon lease creation/renewal.
428    #[inline]
429    pub const fn granted_ttl(&self) -> i64 {
430        self.0.granted_ttl
431    }
432
433    /// Keys is the list of keys attached to this lease.
434    #[inline]
435    pub fn keys(&self) -> &[Vec<u8>] {
436        &self.0.keys
437    }
438
439    #[inline]
440    pub(crate) fn strip_keys_prefix(&mut self, prefix: &[u8]) {
441        self.0.keys.iter_mut().for_each(|key| {
442            key.strip_key_prefix(prefix);
443        });
444    }
445}
446
447/// Response for `Leases` operation.
448#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
449#[derive(Debug, Clone)]
450#[repr(transparent)]
451pub struct LeaseLeasesResponse(PbLeaseLeasesResponse);
452
453impl LeaseLeasesResponse {
454    /// Creates a new `LeaseLeasesResponse` from pb lease Leases response.
455    #[inline]
456    const fn new(resp: PbLeaseLeasesResponse) -> Self {
457        Self(resp)
458    }
459
460    /// Get response header.
461    #[inline]
462    pub fn header(&self) -> Option<&ResponseHeader> {
463        self.0.header.as_ref().map(From::from)
464    }
465
466    /// Takes the header out of the response, leaving a [`None`] in its place.
467    #[inline]
468    pub fn take_header(&mut self) -> Option<ResponseHeader> {
469        self.0.header.take().map(ResponseHeader::new)
470    }
471
472    /// Get leases status
473    #[inline]
474    pub fn leases(&self) -> &[LeaseStatus] {
475        unsafe { &*(self.0.leases.as_slice() as *const _ as *const [LeaseStatus]) }
476    }
477}
478
479/// Lease status.
480#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
481#[derive(Debug, Clone, PartialEq)]
482#[repr(transparent)]
483pub struct LeaseStatus(PbLeaseStatus);
484
485impl LeaseStatus {
486    /// Lease id.
487    #[inline]
488    pub const fn id(&self) -> i64 {
489        self.0.id
490    }
491}
492
493impl From<&PbLeaseStatus> for &LeaseStatus {
494    #[inline]
495    fn from(src: &PbLeaseStatus) -> Self {
496        unsafe { &*(src as *const _ as *const LeaseStatus) }
497    }
498}
499
500/// The lease keep alive handle.
501#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
502#[derive(Debug)]
503pub struct LeaseKeeper {
504    id: i64,
505    sender: Sender<PbLeaseKeepAliveRequest>,
506}
507
508impl LeaseKeeper {
509    /// Creates a new `LeaseKeeper`.
510    #[inline]
511    const fn new(id: i64, sender: Sender<PbLeaseKeepAliveRequest>) -> Self {
512        Self { id, sender }
513    }
514
515    /// The lease id which user want to keep alive.
516    #[inline]
517    pub const fn id(&self) -> i64 {
518        self.id
519    }
520
521    /// Sends a keep alive request and receive response
522    #[inline]
523    pub async fn keep_alive(&mut self) -> Result<()> {
524        self.sender
525            .send(LeaseKeepAliveOptions::new().with_id(self.id).into())
526            .await
527            .map_err(|e| Error::LeaseKeepAliveError(e.to_string()))
528    }
529}
530
531/// The lease keep alive response stream.
532#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
533#[derive(Debug)]
534pub struct LeaseKeepAliveStream {
535    stream: Streaming<PbLeaseKeepAliveResponse>,
536}
537
538impl LeaseKeepAliveStream {
539    /// Creates a new `LeaseKeepAliveStream`.
540    #[inline]
541    const fn new(stream: Streaming<PbLeaseKeepAliveResponse>) -> Self {
542        Self { stream }
543    }
544
545    /// Fetches the next message from this stream.
546    #[inline]
547    pub async fn message(&mut self) -> Result<Option<LeaseKeepAliveResponse>> {
548        match self.stream.message().await? {
549            Some(resp) => Ok(Some(LeaseKeepAliveResponse::new(resp))),
550            None => Ok(None),
551        }
552    }
553}
554
555impl Stream for LeaseKeepAliveStream {
556    type Item = Result<LeaseKeepAliveResponse>;
557
558    #[inline]
559    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
560        Pin::new(&mut self.get_mut().stream)
561            .poll_next(cx)
562            .map(|t| match t {
563                Some(Ok(resp)) => Some(Ok(LeaseKeepAliveResponse::new(resp))),
564                Some(Err(e)) => Some(Err(From::from(e))),
565                None => None,
566            })
567    }
568}