1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
use crate::error::*;
use etcd_client::{Client, LeaseKeepAliveStream, LeaseKeeper};
use log::*;
use std::{sync::Arc, time::Duration};
use tokio::{select, spawn, sync::oneshot, task::JoinHandle, time::delay_for};
/// State tied to the background keep-alive task of a lease.
///
/// `acquire_lease` builds one of these per granted lease and wraps it in an
/// `Arc` inside [`Lease`].
pub(crate) struct LeaseInner {
/// Sending half of the stop channel; the receiving half is handed to
/// `process_keep_alive`, whose loop ends once that receiver completes.
pub stop_tx: oneshot::Sender<()>,
/// Handle of the task spawned in `acquire_lease` that runs
/// `process_keep_alive`; it resolves to that function's result.
pub keep_alive_handle: JoinHandle<Result<(), etcd_client::Error>>,
}
/// Handle to an etcd lease created by `acquire_lease`.
///
/// Cloning is cheap with respect to the keep-alive state: `inner` sits behind
/// an `Arc`, so every clone refers to the same [`LeaseInner`].
#[derive(Clone)]
pub struct Lease {
/// Shared keep-alive state (stop channel sender and task handle).
pub(crate) inner: Arc<LeaseInner>,
/// The client clone that was used to grant the lease.
pub(crate) client: Client,
/// Lease identifier taken from the grant response.
pub(crate) lease_id: i64,
}
async fn process_keep_alive(
mut keeper: LeaseKeeper,
mut keep_alive_stream: LeaseKeepAliveStream,
ttl: i64,
mut stop: oneshot::Receiver<()>,
) -> Result<(), etcd_client::Error> {
loop {
let fut = async {
delay_for(Duration::from_secs_f64((ttl as f64) / 2.0)).await;
keeper.keep_alive().await?;
trace!("Sent keepalive message (lease: {:#x})", keeper.id());
keep_alive_stream.message().await?;
Result::<_, etcd_client::Error>::Ok(())
};
select! {
res = fut => res?,
_ = &mut stop => break,
};
}
Ok(())
}
/// Grants a new lease and starts a background task that keeps it alive.
///
/// A clone of `client` requests a lease with the given `ttl`, opens a
/// keep-alive channel for the returned lease id, and spawns
/// `process_keep_alive` using the TTL reported in the grant response (not the
/// requested one). The returned [`Lease`] carries the stop-channel sender,
/// the task handle, the client clone, and the lease id.
///
/// # Errors
///
/// Propagates a failure of either the lease grant or the keep-alive setup.
pub async fn acquire_lease(client: &Client, ttl: i64) -> CrateResult<Lease> {
    let mut client = client.clone();

    let granted = client.lease_grant(ttl, None).await?;
    let lease_id = granted.id();
    // Use the TTL reported in the grant response rather than the one requested.
    let granted_ttl = granted.ttl();

    let (keeper, responses) = client.lease_keep_alive(lease_id).await?;

    let (stop_tx, stop_rx) = oneshot::channel();
    let keep_alive_handle = spawn(process_keep_alive(
        keeper,
        responses,
        granted_ttl,
        stop_rx,
    ));

    let inner = Arc::new(LeaseInner {
        stop_tx,
        keep_alive_handle,
    });

    Ok(Lease {
        inner,
        client,
        lease_id,
    })
}