etcd_rs/lease/
mod.rs

1//! Leases are a mechanism for detecting client liveness. The cluster grants leases with a time-to-live. A lease expires if the etcd cluster does not receive a keepAlive within a given TTL period.
2//!
3//! # Examples
4//!
5//! Grant lease and keep lease alive
6
7mod grant;
8mod keep_alive;
9mod revoke;
10mod time_to_live;
11
12pub use grant::{LeaseGrantRequest, LeaseGrantResponse};
13pub use keep_alive::{LeaseKeepAliveRequest, LeaseKeepAliveResponse};
14pub use revoke::{LeaseRevokeRequest, LeaseRevokeResponse};
15pub use time_to_live::{LeaseTimeToLiveRequest, LeaseTimeToLiveResponse};
16
17use async_trait::async_trait;
18use tokio::sync::mpsc::Sender;
19use tonic::Streaming;
20
21use crate::{Error, Result};
22
/// Identifier of a lease, represented as a signed 64-bit integer.
pub type LeaseId = i64;
24
25#[async_trait]
26pub trait LeaseOp {
27    async fn grant_lease<R>(&self, req: R) -> Result<LeaseGrantResponse>
28    where
29        R: Into<LeaseGrantRequest> + Send;
30
31    async fn revoke<R>(&self, req: R) -> Result<LeaseRevokeResponse>
32    where
33        R: Into<LeaseRevokeRequest> + Send;
34
35    async fn keep_alive_for(&self, lease_id: LeaseId) -> Result<LeaseKeepAlive>;
36
37    async fn time_to_live<R>(&self, req: R) -> Result<LeaseTimeToLiveResponse>
38    where
39        R: Into<LeaseTimeToLiveRequest> + Send;
40}
41
42pub struct LeaseKeepAlive {
43    id: LeaseId,
44    req_tx: Sender<crate::proto::etcdserverpb::LeaseKeepAliveRequest>,
45    resp_rx: Streaming<crate::proto::etcdserverpb::LeaseKeepAliveResponse>,
46}
47
48impl LeaseKeepAlive {
49    pub(crate) fn new(
50        id: LeaseId,
51        req_tx: Sender<crate::proto::etcdserverpb::LeaseKeepAliveRequest>,
52        resp_rx: Streaming<crate::proto::etcdserverpb::LeaseKeepAliveResponse>,
53    ) -> Self {
54        Self {
55            id,
56            req_tx,
57            resp_rx,
58        }
59    }
60
61    #[inline]
62    pub fn lease_id(&mut self) -> LeaseId {
63        self.id
64    }
65
66    pub async fn keep_alive(&mut self) -> Result<Option<LeaseKeepAliveResponse>> {
67        let req = LeaseKeepAliveRequest::new(self.lease_id());
68
69        self.req_tx
70            .send(req.into())
71            .await
72            .map_err(|_| Error::ChannelClosed)?;
73
74        Ok(match self.resp_rx.message().await? {
75            Some(resp) => Some(resp.into()),
76            None => None,
77        })
78    }
79}