1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
//! Etcd Lock RPC.

use super::pb::v3lockpb;

pub use v3lockpb::lock_client::LockClient as PbLockClient;
pub use v3lockpb::{
    LockRequest as PbLockRequest, LockResponse as PbLockResponse, UnlockRequest as PbUnlockRequest,
    UnlockResponse as PbUnlockResponse,
};

use crate::error::Result;
use crate::rpc::ResponseHeader;
use tonic::transport::Channel;
use tonic::{Interceptor, IntoRequest, Request};

/// Client for Lock operations.
#[repr(transparent)]
#[derive(Clone)]
pub struct LockClient {
    inner: PbLockClient<Channel>,
}

impl LockClient {
    /// Creates a lock client.
    #[inline]
    pub fn new(channel: Channel, interceptor: Option<Interceptor>) -> Self {
        let inner = match interceptor {
            Some(it) => PbLockClient::with_interceptor(channel, it),
            None => PbLockClient::new(channel),
        };

        Self { inner }
    }

    /// Acquires a distributed shared lock on a given named lock.
    /// On success, it will return a unique key that exists so long as the
    /// lock is held by the caller. This key can be used in conjunction with
    /// transactions to safely ensure updates to etcd only occur while holding
    /// lock ownership. The lock is held until Unlock is called on the key or the
    /// lease associate with the owner expires.
    #[inline]
    pub async fn lock(
        &mut self,
        name: impl Into<Vec<u8>>,
        options: Option<LockOptions>,
    ) -> Result<LockResponse> {
        let resp = self
            .inner
            .lock(options.unwrap_or_default().with_name(name))
            .await?
            .into_inner();
        Ok(LockResponse::new(resp))
    }

    /// Takes a key returned by Lock and releases the hold on lock. The
    /// next Lock caller waiting for the lock will then be woken up and given
    /// ownership of the lock.
    #[inline]
    pub async fn unlock(&mut self, key: impl Into<Vec<u8>>) -> Result<UnlockResponse> {
        let resp = self
            .inner
            .unlock(UnlockOptions::new().with_key(key))
            .await?
            .into_inner();
        Ok(UnlockResponse::new(resp))
    }
}

/// Options for `Lock` operation.
#[derive(Debug, Default, Clone)]
#[repr(transparent)]
pub struct LockOptions(PbLockRequest);

impl LockOptions {
    /// name is the identifier for the distributed shared lock to be acquired.
    #[inline]
    fn with_name(mut self, name: impl Into<Vec<u8>>) -> Self {
        self.0.name = name.into();
        self
    }

    /// Creates a `LockOptions`.
    #[inline]
    pub const fn new() -> Self {
        Self(PbLockRequest {
            name: Vec::new(),
            lease: 0,
        })
    }

    /// `lease` is the ID of the lease that will be attached to ownership of the
    /// lock. If the lease expires or is revoked and currently holds the lock,
    /// the lock is automatically released. Calls to Lock with the same lease will
    /// be treated as a single acquisition; locking twice with the same lease is a
    /// no-op.
    #[inline]
    pub const fn with_lease(mut self, lease: i64) -> Self {
        self.0.lease = lease;
        self
    }
}

impl From<LockOptions> for PbLockRequest {
    #[inline]
    fn from(options: LockOptions) -> Self {
        options.0
    }
}

impl IntoRequest<PbLockRequest> for LockOptions {
    #[inline]
    fn into_request(self) -> Request<PbLockRequest> {
        Request::new(self.into())
    }
}

/// Response for `Lock` operation.
#[derive(Debug, Default, Clone)]
#[repr(transparent)]
pub struct LockResponse(PbLockResponse);

impl LockResponse {
    /// Create a new `LockResponse` from pb lock response.
    #[inline]
    const fn new(resp: PbLockResponse) -> Self {
        Self(resp)
    }

    /// Get response header.
    #[inline]
    pub fn header(&self) -> Option<&ResponseHeader> {
        self.0.header.as_ref().map(From::from)
    }

    /// Takes the header out of the response, leaving a [`None`] in its place.
    #[inline]
    pub fn take_header(&mut self) -> Option<ResponseHeader> {
        self.0.header.take().map(ResponseHeader::new)
    }

    /// A key that will exist on etcd for the duration that the Lock caller
    /// owns the lock. Users should not modify this key or the lock may exhibit
    /// undefined behavior.
    #[inline]
    pub fn key(&self) -> &[u8] {
        &self.0.key
    }
}

/// Options for `Unlock` operation.
#[derive(Debug, Default, Clone)]
#[repr(transparent)]
pub struct UnlockOptions(PbUnlockRequest);

impl UnlockOptions {
    /// key is the lock ownership key granted by Lock.
    #[inline]
    fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
        self.0.key = key.into();
        self
    }

    /// Creates a `UnlockOptions`.
    #[inline]
    pub const fn new() -> Self {
        Self(PbUnlockRequest { key: Vec::new() })
    }
}

impl From<UnlockOptions> for PbUnlockRequest {
    #[inline]
    fn from(options: UnlockOptions) -> Self {
        options.0
    }
}

impl IntoRequest<PbUnlockRequest> for UnlockOptions {
    #[inline]
    fn into_request(self) -> Request<PbUnlockRequest> {
        Request::new(self.into())
    }
}

/// Response for `Unlock` operation.
#[derive(Debug, Default, Clone)]
#[repr(transparent)]
pub struct UnlockResponse(PbUnlockResponse);

impl UnlockResponse {
    /// Create a new `UnlockResponse` from pb unlock response.
    #[inline]
    const fn new(resp: PbUnlockResponse) -> Self {
        Self(resp)
    }

    /// Get response header.
    #[inline]
    pub fn header(&self) -> Option<&ResponseHeader> {
        self.0.header.as_ref().map(From::from)
    }

    /// Takes the header out of the response, leaving a [`None`] in its place.
    #[inline]
    pub fn take_header(&mut self) -> Option<ResponseHeader> {
        self.0.header.take().map(ResponseHeader::new)
    }
}