etcd_client/rpc/
watch.rs

1//! Etcd Watch RPC.
2
3pub use crate::rpc::pb::mvccpb::event::EventType;
4
5use crate::auth::AuthService;
6use crate::channel::Channel;
7use crate::error::{Error, Result};
8use crate::rpc::pb::etcdserverpb::watch_client::WatchClient as PbWatchClient;
9use crate::rpc::pb::etcdserverpb::watch_request::RequestUnion as WatchRequestUnion;
10use crate::rpc::pb::etcdserverpb::{
11    WatchCancelRequest, WatchCreateRequest, WatchProgressRequest, WatchRequest,
12    WatchResponse as PbWatchResponse,
13};
14use crate::rpc::pb::mvccpb::Event as PbEvent;
15use crate::rpc::{KeyRange, KeyValue, ResponseHeader};
16use http::HeaderValue;
17use std::pin::Pin;
18use std::sync::{Arc, RwLock};
19use std::task::{Context, Poll};
20use tokio::sync::mpsc::{channel, Sender};
21use tokio_stream::{wrappers::ReceiverStream, Stream};
22use tonic::Streaming;
23
24/// Client for watch operations.
25#[repr(transparent)]
26#[derive(Clone)]
27pub struct WatchClient {
28    inner: PbWatchClient<AuthService<Channel>>,
29}
30
31impl WatchClient {
32    /// Creates a watch client.
33    #[inline]
34    pub(crate) fn new(channel: Channel, auth_token: Arc<RwLock<Option<HeaderValue>>>) -> Self {
35        let inner = PbWatchClient::new(AuthService::new(channel, auth_token));
36        Self { inner }
37    }
38
39    /// Limits the maximum size of a decoded message.
40    ///
41    /// Default: `4MB`
42    pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
43        self.inner = self.inner.max_decoding_message_size(limit);
44        self
45    }
46
47    /// Watches for events happening or that have happened. Both input and output
48    /// are streams; the input stream is for creating and canceling watchers and the output
49    /// stream sends events. One watch RPC can watch on multiple key ranges, streaming events
50    /// for several watches at once. The entire event history can be watched starting from the
51    /// last compaction revision.
52    pub async fn watch(
53        &mut self,
54        key: impl Into<Vec<u8>>,
55        options: Option<WatchOptions>,
56    ) -> Result<(Watcher, WatchStream)> {
57        let (request_sender, request_receiver) = channel::<WatchRequest>(100);
58        let request_stream = ReceiverStream::new(request_receiver);
59
60        request_sender
61            .send(options.unwrap_or_default().with_key(key).into())
62            .await
63            .map_err(|e| Error::WatchError(e.to_string()))?;
64
65        let response_stream = self.inner.watch(request_stream).await?.into_inner();
66        let mut watch_stream = WatchStream::new(response_stream);
67
68        let watch_id = match watch_stream.message().await? {
69            Some(resp) => {
70                assert!(resp.created(), "not a create watch response");
71                resp.watch_id()
72            }
73            None => {
74                return Err(Error::WatchError("failed to create watch".to_string()));
75            }
76        };
77
78        Ok((Watcher::new(watch_id, request_sender), watch_stream))
79    }
80}
81
82/// Options for `Watch` operation.
83#[derive(Debug, Default, Clone)]
84pub struct WatchOptions {
85    req: WatchCreateRequest,
86    key_range: KeyRange,
87}
88
89impl WatchOptions {
90    /// Sets key.
91    #[inline]
92    fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
93        self.key_range.with_key(key);
94        self
95    }
96
97    /// Creates a new `WatchOptions`.
98    #[inline]
99    pub const fn new() -> Self {
100        Self {
101            req: WatchCreateRequest {
102                key: Vec::new(),
103                range_end: Vec::new(),
104                start_revision: 0,
105                progress_notify: false,
106                filters: Vec::new(),
107                prev_kv: false,
108                watch_id: 0,
109                fragment: false,
110            },
111            key_range: KeyRange::new(),
112        }
113    }
114
115    /// Sets the end of the range [key, end) to watch. If `end` is not given,
116    /// only the key argument is watched. If `end` is equal to '\0', all keys greater than
117    /// or equal to the key argument are watched.
118    #[inline]
119    pub fn with_range(mut self, end: impl Into<Vec<u8>>) -> Self {
120        self.key_range.with_range(end);
121        self
122    }
123
124    /// Watches all keys >= key.
125    #[inline]
126    pub fn with_from_key(mut self) -> Self {
127        self.key_range.with_from_key();
128        self
129    }
130
131    /// Watches all keys prefixed with key.
132    #[inline]
133    pub fn with_prefix(mut self) -> Self {
134        self.key_range.with_prefix();
135        self
136    }
137
138    /// Watches all keys.
139    #[inline]
140    pub fn with_all_keys(mut self) -> Self {
141        self.key_range.with_all_keys();
142        self
143    }
144
145    /// Sets the revision to watch from (inclusive). No `start_revision` is "now".
146    #[inline]
147    pub const fn with_start_revision(mut self, revision: i64) -> Self {
148        self.req.start_revision = revision;
149        self
150    }
151
152    /// `progress_notify` is set so that the etcd server will periodically send a `WatchResponse` with
153    /// no events to the new watcher if there are no recent events. It is useful when clients
154    /// wish to recover a disconnected watcher starting from a recent known revision.
155    /// The etcd server may decide how often it will send notifications based on current load.
156    #[inline]
157    pub const fn with_progress_notify(mut self) -> Self {
158        self.req.progress_notify = true;
159        self
160    }
161
162    /// Filter the events at server side before it sends back to the watcher.
163    #[inline]
164    pub fn with_filters(mut self, filters: impl Into<Vec<WatchFilterType>>) -> Self {
165        self.req.filters = filters.into().into_iter().map(|f| f as i32).collect();
166        self
167    }
168
169    /// If `prev_kv` is set, created watcher gets the previous KV before the event happens.
170    /// If the previous KV is already compacted, nothing will be returned.
171    #[inline]
172    pub const fn with_prev_key(mut self) -> Self {
173        self.req.prev_kv = true;
174        self
175    }
176
177    /// If `watch_id` is provided and non-zero, it will be assigned to this watcher.
178    /// Since creating a watcher in etcd is not a synchronous operation,
179    /// this can be used ensure that ordering is correct when creating multiple
180    /// watchers on the same stream. Creating a watcher with an ID already in
181    /// use on the stream will cause an error to be returned.
182    #[inline]
183    pub const fn with_watch_id(mut self, watch_id: i64) -> Self {
184        self.req.watch_id = watch_id;
185        self
186    }
187
188    /// Enables splitting large revisions into multiple watch responses.
189    #[inline]
190    pub const fn with_fragment(mut self) -> Self {
191        self.req.fragment = true;
192        self
193    }
194}
195
196impl From<WatchOptions> for WatchCreateRequest {
197    #[inline]
198    fn from(mut options: WatchOptions) -> Self {
199        let (key, range_end) = options.key_range.build();
200        options.req.key = key;
201        options.req.range_end = range_end;
202        options.req
203    }
204}
205
206impl From<WatchOptions> for WatchRequest {
207    #[inline]
208    fn from(options: WatchOptions) -> Self {
209        Self {
210            request_union: Some(WatchRequestUnion::CreateRequest(options.into())),
211        }
212    }
213}
214
215impl From<WatchCancelRequest> for WatchRequest {
216    #[inline]
217    fn from(req: WatchCancelRequest) -> Self {
218        Self {
219            request_union: Some(WatchRequestUnion::CancelRequest(req)),
220        }
221    }
222}
223
224impl From<WatchProgressRequest> for WatchRequest {
225    #[inline]
226    fn from(req: WatchProgressRequest) -> Self {
227        Self {
228            request_union: Some(WatchRequestUnion::ProgressRequest(req)),
229        }
230    }
231}
232
233/// Watch filter type.
234#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
235#[repr(i32)]
236pub enum WatchFilterType {
237    /// Filter out put event.
238    NoPut = 0,
239    /// Filter out delete event.
240    NoDelete = 1,
241}
242
243/// Response for `Watch` operation.
244#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
245#[derive(Debug, Clone)]
246#[repr(transparent)]
247pub struct WatchResponse(PbWatchResponse);
248
249impl WatchResponse {
250    /// Creates a new `WatchResponse`.
251    #[inline]
252    const fn new(resp: PbWatchResponse) -> Self {
253        Self(resp)
254    }
255
256    /// Watch response header.
257    #[inline]
258    pub fn header(&self) -> Option<&ResponseHeader> {
259        self.0.header.as_ref().map(From::from)
260    }
261
262    /// Takes the header out of the response, leaving a [`None`] in its place.
263    #[inline]
264    pub fn take_header(&mut self) -> Option<ResponseHeader> {
265        self.0.header.take().map(ResponseHeader::new)
266    }
267
268    /// The ID of the watcher that corresponds to the response.
269    #[inline]
270    pub const fn watch_id(&self) -> i64 {
271        self.0.watch_id
272    }
273
274    /// created is set to true if the response is for a create watch request.
275    /// The client should record the watch_id and expect to receive events for
276    /// the created watcher from the same stream.
277    /// All events sent to the created watcher will attach with the same watch_id.
278    #[inline]
279    pub const fn created(&self) -> bool {
280        self.0.created
281    }
282
283    /// `canceled` is set to true if the response is for a cancel watch request.
284    /// No further events will be sent to the canceled watcher.
285    #[inline]
286    pub const fn canceled(&self) -> bool {
287        self.0.canceled
288    }
289
290    /// `compact_revision` is set to the minimum index if a watcher tries to watch
291    /// at a compacted index.
292    ///
293    /// This happens when creating a watcher at a compacted revision or the watcher cannot
294    /// catch up with the progress of the key-value store.
295    ///
296    /// The client should treat the watcher as canceled and should not try to create any
297    /// watcher with the same start_revision again.
298    #[inline]
299    pub const fn compact_revision(&self) -> i64 {
300        self.0.compact_revision
301    }
302
303    /// Indicates the reason for canceling the watcher.
304    #[inline]
305    pub fn cancel_reason(&self) -> &str {
306        &self.0.cancel_reason
307    }
308
309    /// Events happened on the watched keys.
310    #[inline]
311    pub fn events(&self) -> &[Event] {
312        unsafe { &*(self.0.events.as_slice() as *const _ as *const [Event]) }
313    }
314}
315
316/// Watching event.
317#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
318#[derive(Debug, Clone)]
319#[repr(transparent)]
320pub struct Event(PbEvent);
321
322impl Event {
323    /// The kind of event. If type is a `Put`, it indicates
324    /// new data has been stored to the key. If type is a `Delete`,
325    /// it indicates the key was deleted.
326    #[inline]
327    pub fn event_type(&self) -> EventType {
328        match self.0.r#type {
329            0 => EventType::Put,
330            1 => EventType::Delete,
331            i => panic!("unknown event {}", i),
332        }
333    }
334
335    /// The KeyValue for the event.
336    /// A `Put` event contains current kv pair.
337    /// A `Put` event with `kv.version()==1` indicates the creation of a key.
338    /// A `Delete` event contains the deleted key with
339    /// its modification revision set to the revision of deletion.
340    #[inline]
341    pub fn kv(&self) -> Option<&KeyValue> {
342        self.0.kv.as_ref().map(From::from)
343    }
344
345    /// The key-value pair before the event happens.
346    #[inline]
347    pub fn prev_kv(&self) -> Option<&KeyValue> {
348        self.0.prev_kv.as_ref().map(From::from)
349    }
350}
351
352/// The watching handle.
353#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
354#[derive(Debug)]
355pub struct Watcher {
356    watch_id: i64,
357    sender: Sender<WatchRequest>,
358}
359
360impl Watcher {
361    /// Creates a new `Watcher`.
362    #[inline]
363    const fn new(watch_id: i64, sender: Sender<WatchRequest>) -> Self {
364        Self { watch_id, sender }
365    }
366
367    /// The ID of the watcher.
368    #[inline]
369    pub const fn watch_id(&self) -> i64 {
370        self.watch_id
371    }
372
373    /// Watches for events happening or that have happened.
374    #[inline]
375    pub async fn watch(
376        &mut self,
377        key: impl Into<Vec<u8>>,
378        options: Option<WatchOptions>,
379    ) -> Result<()> {
380        self.sender
381            .send(options.unwrap_or_default().with_key(key).into())
382            .await
383            .map_err(|e| Error::WatchError(e.to_string()))
384    }
385
386    /// Cancels this watcher.
387    #[inline]
388    pub async fn cancel(&mut self) -> Result<()> {
389        let req = WatchCancelRequest {
390            watch_id: self.watch_id,
391        };
392        self.sender
393            .send(req.into())
394            .await
395            .map_err(|e| Error::WatchError(e.to_string()))
396    }
397
398    /// Cancels watch by specified `watch_id`.
399    #[inline]
400    pub async fn cancel_by_id(&mut self, watch_id: i64) -> Result<()> {
401        let req = WatchCancelRequest { watch_id };
402        self.sender
403            .send(req.into())
404            .await
405            .map_err(|e| Error::WatchError(e.to_string()))
406    }
407
408    /// Requests a watch stream progress status be sent in the watch response stream as soon as
409    /// possible.
410    #[inline]
411    pub async fn request_progress(&mut self) -> Result<()> {
412        let req = WatchProgressRequest {};
413        self.sender
414            .send(req.into())
415            .await
416            .map_err(|e| Error::WatchError(e.to_string()))
417    }
418}
419
420/// The watch response stream.
421#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
422#[derive(Debug)]
423pub struct WatchStream {
424    stream: Streaming<PbWatchResponse>,
425}
426
427impl WatchStream {
428    /// Creates a new `WatchStream`.
429    #[inline]
430    const fn new(stream: Streaming<PbWatchResponse>) -> Self {
431        Self { stream }
432    }
433
434    /// Fetch the next message from this stream.
435    #[inline]
436    pub async fn message(&mut self) -> Result<Option<WatchResponse>> {
437        match self.stream.message().await? {
438            Some(resp) => Ok(Some(WatchResponse::new(resp))),
439            None => Ok(None),
440        }
441    }
442}
443
444impl Stream for WatchStream {
445    type Item = Result<WatchResponse>;
446
447    #[inline]
448    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
449        Pin::new(&mut self.get_mut().stream)
450            .poll_next(cx)
451            .map(|t| match t {
452                Some(Ok(resp)) => Some(Ok(WatchResponse::new(resp))),
453                Some(Err(e)) => Some(Err(From::from(e))),
454                None => None,
455            })
456    }
457}