1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
use crate::error::*;
use etcd_client::{Client, LeaseKeepAliveStream, LeaseKeeper};
use log::*;
use std::{sync::Arc, time::Duration};
use tokio::{select, spawn, sync::oneshot, task::JoinHandle, time::delay_for};

/// Shared state behind a [`Lease`]: the controls for its background
/// keep-alive task.
pub(crate) struct LeaseInner {
    /// Sending half of the stop channel. The paired receiver is handed to
    /// `process_keep_alive`, whose loop breaks as soon as that receiver
    /// completes.
    pub stop_tx: oneshot::Sender<()>,
    /// Handle of the spawned keep-alive task; it resolves to the result
    /// returned by `process_keep_alive`.
    pub keep_alive_handle: JoinHandle<Result<(), etcd_client::Error>>,
}

/// Handle to an etcd lease created by `acquire_lease`.
///
/// Cloning is cheap with respect to the keep-alive state: every clone shares
/// the same [`LeaseInner`] through an [`Arc`].
#[derive(Clone)]
pub struct Lease {
    /// Stop channel and join handle of the keep-alive task, shared by all
    /// clones of this lease.
    pub(crate) inner: Arc<LeaseInner>,
    /// The client clone that was used to grant the lease.
    pub(crate) client: Client,
    /// Lease identifier taken from the grant response.
    pub(crate) lease_id: i64,
}

async fn process_keep_alive(
    mut keeper: LeaseKeeper,
    mut keep_alive_stream: LeaseKeepAliveStream,
    ttl: i64,
    mut stop: oneshot::Receiver<()>,
) -> Result<(), etcd_client::Error> {
    loop {
        let fut = async {
            delay_for(Duration::from_secs_f64((ttl as f64) / 2.0)).await;

            keeper.keep_alive().await?;
            trace!("Sent keepalive message (lease: {:#x})", keeper.id());

            keep_alive_stream.message().await?;

            Result::<_, etcd_client::Error>::Ok(())
        };

        select! {
            res = fut => res?,
            _ = &mut stop => break,
        };
    }

    Ok(())
}

/// Grant a new etcd lease and keep it alive in the background.
///
/// The lease is requested with the given `ttl` (in seconds) on a clone of
/// `client`. A keep-alive stream is then opened for it and a task is
/// [`spawn`]ed that sends a keep-alive request every `ttl / 2` seconds, where
/// `ttl` is the value reported back in the grant response.
///
/// A lease can back a lock on the etcd server that is purged once the
/// client stops keeping the lease alive.
///
/// # Errors
/// Fails if the lease grant request or the opening of the keep-alive stream
/// returns an error.
///
/// # Example
/// ```no_run
/// # async fn test(client: etcd_client::Client) -> Result<(), tokio_etcd_lock::Error> {
/// let mut lease = tokio_etcd_lock::acquire_lease(&client, 10).await?;
/// # Ok(())
/// # }
/// ```
///
/// [`Lease`]: ./struct.Lease.html
/// [`spawn`]: https://docs.rs/tokio/0.2.22/tokio/fn.spawn.html
pub async fn acquire_lease(client: &Client, ttl: i64) -> CrateResult<Lease> {
    let mut client = client.clone();

    // Ask the server for the lease and remember what it actually granted.
    let granted = client.lease_grant(ttl, None).await?;
    let (lease_id, granted_ttl) = (granted.id(), granted.ttl());

    // Request and response halves of the keep-alive stream for this lease.
    let (keeper, responses) = client.lease_keep_alive(lease_id).await?;

    // The receiver goes to the background task; the sender stays with the lease.
    let (stop_tx, stop_rx) = oneshot::channel();
    let keep_alive_handle = spawn(process_keep_alive(
        keeper,
        responses,
        granted_ttl,
        stop_rx,
    ));

    let inner = Arc::new(LeaseInner {
        stop_tx,
        keep_alive_handle,
    });

    Ok(Lease {
        inner,
        client,
        lease_id,
    })
}