1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
//! Etcd RPC interfaces.

mod pb;

pub mod auth;
pub mod cluster;
pub mod election;
pub mod kv;
pub mod lease;
pub mod lock;
pub mod maintenance;
pub mod watch;

use crate::error::Result;
use pb::etcdserverpb::ResponseHeader as PbResponseHeader;
use pb::mvccpb::KeyValue as PbKeyValue;

/// General `etcd` response header.
///
/// Newtype over the `pb::etcdserverpb::ResponseHeader` message. `#[repr(transparent)]`
/// gives it the same layout as the wrapped message; the `From<&PbResponseHeader>`
/// conversion in this module relies on that to reinterpret a reference without copying.
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct ResponseHeader(PbResponseHeader);

impl ResponseHeader {
    /// Create a response header from pb header.
    #[inline]
    pub(crate) const fn new(header: PbResponseHeader) -> Self {
        Self(header)
    }

    /// The ID of the cluster which sent the response.
    #[inline]
    pub const fn cluster_id(&self) -> u64 {
        self.0.cluster_id
    }

    /// The ID of the member which sent the response.
    #[inline]
    pub const fn member_id(&self) -> u64 {
        self.0.member_id
    }

    /// The key-value store revision when the request was applied.
    /// For watch progress responses, the header.revision() indicates progress. All future events
    /// received in this stream are guaranteed to have a higher revision number than the
    /// header.revision() number.
    #[inline]
    pub const fn revision(&self) -> i64 {
        self.0.revision
    }

    /// The raft term when the request was applied.
    #[inline]
    pub const fn raft_term(&self) -> u64 {
        self.0.raft_term
    }
}

impl<'a> From<&'a PbResponseHeader> for &'a ResponseHeader {
    /// Reinterprets a borrowed pb header as a borrowed [`ResponseHeader`] without copying.
    ///
    /// The returned reference carries the same lifetime `'a` as `src`, so it can never
    /// outlive the header it points into.
    #[inline]
    fn from(src: &'a PbResponseHeader) -> Self {
        // SAFETY: `ResponseHeader` is `#[repr(transparent)]` over `PbResponseHeader`, so the
        // two types have identical layout and alignment. The pointer comes from a live shared
        // reference, and the result is bound to that reference's lifetime `'a` by the
        // signature, so the cast cannot produce a dangling or over-long borrow.
        unsafe { &*(src as *const PbResponseHeader as *const ResponseHeader) }
    }
}

/// Key-value pair.
///
/// Newtype over the `pb::mvccpb::KeyValue` message. `#[repr(transparent)]` gives it the
/// same layout as the wrapped message; the `From<&PbKeyValue>` conversion in this module
/// relies on that to reinterpret a reference without copying.
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct KeyValue(PbKeyValue);

impl KeyValue {
    /// Create a KeyValue from pb kv.
    #[inline]
    pub(crate) const fn new(kv: PbKeyValue) -> Self {
        Self(kv)
    }

    /// The key in bytes. An empty key is not allowed.
    #[inline]
    pub fn key(&self) -> &[u8] {
        &self.0.key
    }

    /// The key in string. An empty key is not allowed.
    #[inline]
    pub fn key_str(&self) -> Result<&str> {
        std::str::from_utf8(self.key()).map_err(From::from)
    }

    /// The key in string. An empty key is not allowed.
    ///
    /// # Safety
    /// This function is unsafe because it does not check that the bytes of the key are valid UTF-8.
    /// If this constraint is violated, undefined behavior results,
    /// as the rest of Rust assumes that [`&str`]s are valid UTF-8.
    #[inline]
    pub unsafe fn key_str_unchecked(&self) -> &str {
        std::str::from_utf8_unchecked(self.key())
    }

    /// The value held by the key, in bytes.
    #[inline]
    pub fn value(&self) -> &[u8] {
        &self.0.value
    }

    /// The value held by the key, in string.
    #[inline]
    pub fn value_str(&self) -> Result<&str> {
        std::str::from_utf8(self.value()).map_err(From::from)
    }

    /// The value held by the key, in bytes.
    ///
    /// # Safety
    /// This function is unsafe because it does not check that the bytes of the value are valid UTF-8.
    /// If this constraint is violated, undefined behavior results,
    /// as the rest of Rust assumes that [`&str`]s are valid UTF-8.
    #[inline]
    pub unsafe fn value_str_unchecked(&self) -> &str {
        std::str::from_utf8_unchecked(self.value())
    }

    /// The revision of last creation on this key.
    #[inline]
    pub const fn create_revision(&self) -> i64 {
        self.0.create_revision
    }

    /// The revision of last modification on this key.
    #[inline]
    pub const fn mod_revision(&self) -> i64 {
        self.0.mod_revision
    }

    /// The version of the key. A deletion resets
    /// the version to zero and any modification of the key
    /// increases its version.
    #[inline]
    pub const fn version(&self) -> i64 {
        self.0.version
    }

    /// The ID of the lease that attached to key.
    /// When the attached lease expires, the key will be deleted.
    /// If lease is 0, then no lease is attached to the key.
    #[inline]
    pub const fn lease(&self) -> i64 {
        self.0.lease
    }
}

impl<'a> From<&'a PbKeyValue> for &'a KeyValue {
    /// Reinterprets a borrowed pb key-value pair as a borrowed [`KeyValue`] without copying.
    ///
    /// The returned reference carries the same lifetime `'a` as `src`, so it can never
    /// outlive the pair it points into.
    #[inline]
    fn from(src: &'a PbKeyValue) -> Self {
        // SAFETY: `KeyValue` is `#[repr(transparent)]` over `PbKeyValue`, so the two types
        // have identical layout and alignment. The pointer comes from a live shared
        // reference, and the result is bound to that reference's lifetime `'a` by the
        // signature, so the cast cannot produce a dangling or over-long borrow.
        unsafe { &*(src as *const PbKeyValue as *const KeyValue) }
    }
}

/// Computes the range end that covers every key starting with `key`.
///
/// Trailing `0xFF` bytes cannot be incremented, so they are dropped; the last byte that
/// is below `0xFF` is then bumped by one. If no such byte exists (the key is empty or
/// consists only of `0xFF` bytes) there is no next prefix and `[0]` is returned.
#[inline]
fn get_prefix(key: &[u8]) -> Vec<u8> {
    // Index of the right-most byte that can still be incremented.
    match key.iter().rposition(|&byte| byte != 0xFF) {
        Some(last) => {
            let mut end = key[..=last].to_vec();
            end[last] += 1;
            end
        }
        // next prefix does not exist (e.g., 0xffff);
        None => vec![0],
    }
}

/// Key range builder.
///
/// Collects a start key plus either an explicit range end or one of three implicit
/// modes (prefix, from-key, all-keys). `build` turns that state into the final
/// `(key, range_end)` pair.
#[derive(Debug, Default, Clone)]
struct KeyRange {
    /// Start key of the range.
    key: Vec<u8>,
    /// Explicit range end; returned by `build` unchanged when no mode flag is set.
    range_end: Vec<u8>,
    /// When set, `build` derives the range end from `key` via `get_prefix`.
    with_prefix: bool,
    /// When set, `build` uses `\0` as the range end (all keys >= `key`).
    with_from_key: bool,
    /// When set, `build` uses `\0` for both the key and the range end (all keys).
    with_all_key: bool,
}

impl KeyRange {
    /// Creates an empty builder: no key, no range end and no mode selected.
    #[inline]
    pub const fn new() -> Self {
        Self {
            with_all_key: false,
            with_from_key: false,
            with_prefix: false,
            range_end: Vec::new(),
            key: Vec::new(),
        }
    }

    /// Overwrites all three mode flags at once. Every caller in this impl passes at
    /// most one `true`, so selecting a mode always clears the other two.
    #[inline]
    fn select_mode(&mut self, prefix: bool, from_key: bool, all_keys: bool) {
        self.with_prefix = prefix;
        self.with_from_key = from_key;
        self.with_all_key = all_keys;
    }

    /// Sets the start key.
    #[inline]
    pub fn with_key(&mut self, key: impl Into<Vec<u8>>) {
        self.key = key.into();
    }

    /// Sets an explicit range end and clears any previously selected mode.
    /// `end_key` must be lexicographically greater than start key.
    #[inline]
    pub fn with_range(&mut self, end_key: impl Into<Vec<u8>>) {
        self.select_mode(false, false, false);
        self.range_end = end_key.into();
    }

    /// Selects all keys >= key.
    #[inline]
    pub fn with_from_key(&mut self) {
        self.select_mode(false, true, false);
    }

    /// Selects all keys prefixed with key.
    #[inline]
    pub fn with_prefix(&mut self) {
        self.select_mode(true, false, false);
    }

    /// Selects all keys.
    #[inline]
    pub fn with_all_keys(&mut self) {
        self.select_mode(false, false, true);
    }

    /// Consumes the builder and returns the `(key, range_end)` pair.
    ///
    /// Modes are checked in the order all-keys, from-key, prefix; when none is set the
    /// stored key and range end are returned as they are.
    #[inline]
    pub fn build(self) -> (Vec<u8>, Vec<u8>) {
        let Self {
            key,
            range_end,
            with_prefix,
            with_from_key,
            with_all_key,
        } = self;
        let zero = || vec![b'\0'];

        if with_all_key {
            return (zero(), zero());
        }

        if with_from_key {
            // An empty start key is replaced by `\0`.
            let start = if key.is_empty() { zero() } else { key };
            return (start, zero());
        }

        if with_prefix {
            // An empty prefix produces the same pair as the all-keys mode.
            if key.is_empty() {
                return (zero(), zero());
            }
            let end = get_prefix(&key);
            return (key, end);
        }

        (key, range_end)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Prefix end computation, including keys with trailing `0xFF` bytes and keys for
    /// which no next prefix exists.
    #[test]
    fn test_get_prefix() {
        assert_eq!(get_prefix(b"foo1").as_slice(), b"foo2");
        assert_eq!(get_prefix(b"\xFF").as_slice(), b"\0");
        assert_eq!(get_prefix(b"foo\xFF").as_slice(), b"fop");

        // Several trailing 0xFF bytes are all dropped before incrementing.
        assert_eq!(get_prefix(b"a\xFF\xFF").as_slice(), b"b");
        // No byte can be incremented: empty key or only 0xFF bytes.
        assert_eq!(get_prefix(b"").as_slice(), b"\0");
        assert_eq!(get_prefix(b"\xFF\xFF").as_slice(), b"\0");
    }

    /// `build` with no mode selected returns the key and explicit range end as given.
    #[test]
    fn test_key_range_explicit_range() {
        let mut range = KeyRange::new();
        range.with_key("a");
        range.with_range("c");
        assert_eq!(range.build(), (b"a".to_vec(), b"c".to_vec()));

        // `with_range` clears a previously selected mode.
        let mut range = KeyRange::new();
        range.with_key("a");
        range.with_prefix();
        range.with_range("c");
        assert_eq!(range.build(), (b"a".to_vec(), b"c".to_vec()));
    }

    /// `build` in prefix mode derives the range end from the key.
    #[test]
    fn test_key_range_prefix() {
        let mut range = KeyRange::new();
        range.with_key("foo");
        range.with_prefix();
        assert_eq!(range.build(), (b"foo".to_vec(), b"fop".to_vec()));

        // An empty prefix covers everything.
        let mut range = KeyRange::new();
        range.with_prefix();
        assert_eq!(range.build(), (vec![0u8], vec![0u8]));
    }

    /// `build` in from-key and all-keys modes uses `\0` as the range end.
    #[test]
    fn test_key_range_from_key_and_all_keys() {
        let mut range = KeyRange::new();
        range.with_key("foo");
        range.with_from_key();
        assert_eq!(range.build(), (b"foo".to_vec(), vec![0u8]));

        let mut range = KeyRange::new();
        range.with_from_key();
        assert_eq!(range.build(), (vec![0u8], vec![0u8]));

        let mut range = KeyRange::new();
        range.with_key("foo");
        range.with_all_keys();
        assert_eq!(range.build(), (vec![0u8], vec![0u8]));
    }
}