Skip to main content

etcd_client/rpc/
watch.rs

1//! Etcd Watch RPC.
2
3pub use crate::rpc::pb::mvccpb::event::EventType;
4
5use crate::error::{Error, Result};
6use crate::intercept::InterceptedChannel;
7use crate::rpc::pb::etcdserverpb::watch_client::WatchClient as PbWatchClient;
8use crate::rpc::pb::etcdserverpb::watch_request::RequestUnion as WatchRequestUnion;
9use crate::rpc::pb::etcdserverpb::{
10    WatchCancelRequest, WatchCreateRequest, WatchProgressRequest, WatchRequest,
11    WatchResponse as PbWatchResponse,
12};
13use crate::rpc::pb::mvccpb::Event as PbEvent;
14use crate::rpc::{KeyRange, KeyValue, ResponseHeader};
15use std::pin::Pin;
16use std::task::{Context, Poll};
17use tokio::sync::mpsc::{channel, Sender};
18use tokio_stream::{wrappers::ReceiverStream, Stream};
19use tonic::Streaming;
20
21/// Client for watch operations.
22#[repr(transparent)]
23#[derive(Clone)]
24pub struct WatchClient {
25    inner: PbWatchClient<InterceptedChannel>,
26}
27
28impl WatchClient {
29    /// Creates a watch client.
30    #[inline]
31    pub(crate) fn new(channel: InterceptedChannel) -> Self {
32        let inner = PbWatchClient::new(channel);
33        Self { inner }
34    }
35
36    /// Limits the maximum size of a decoded message.
37    ///
38    /// Default: `4MB`
39    pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
40        self.inner = self.inner.max_decoding_message_size(limit);
41        self
42    }
43
44    /// Watches for events happening or that have happened. Both input and output
45    /// are streams; the input stream is for creating and canceling watchers and the output
46    /// stream receives responses and events.
47    ///
48    /// One watch stream can watch on multiple key ranges, streaming events for several watches
49    /// are grouped by watch ID. The entire event history can be watched starting from the
50    /// last compaction revision.
51    pub async fn watch(
52        &mut self,
53        key: impl Into<Vec<u8>>,
54        options: Option<WatchOptions>,
55    ) -> Result<WatchStream> {
56        let (request_sender, request_receiver) = channel::<WatchRequest>(100);
57        request_sender
58            .send(options.unwrap_or_default().with_key(key).into())
59            .await
60            .map_err(|e| Error::WatchError(e.to_string()))?;
61        let request_stream = ReceiverStream::new(request_receiver);
62        let response_stream = self.inner.watch(request_stream).await?.into_inner();
63        Ok(WatchStream::new(request_sender, response_stream))
64    }
65}
66
67/// Options for `Watch` operation.
68#[derive(Debug, Default, Clone)]
69pub struct WatchOptions {
70    req: WatchCreateRequest,
71    key_range: KeyRange,
72}
73
74impl WatchOptions {
75    /// Sets key.
76    #[inline]
77    pub fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
78        self.key_range.with_key(key);
79        self
80    }
81
82    /// Creates a new `WatchOptions`.
83    #[inline]
84    pub const fn new() -> Self {
85        Self {
86            req: WatchCreateRequest {
87                key: Vec::new(),
88                range_end: Vec::new(),
89                start_revision: 0,
90                progress_notify: false,
91                filters: Vec::new(),
92                prev_kv: false,
93                watch_id: 0,
94                fragment: false,
95            },
96            key_range: KeyRange::new(),
97        }
98    }
99
100    /// Sets the end of the range `[key, end)` to watch.
101    ///
102    /// If `end` is not given, only the key argument is watched.
103    ///
104    /// If `end` is equal to `\0`, all keys greater than or equal to the key argument are watched.
105    #[inline]
106    pub fn with_range(mut self, end: impl Into<Vec<u8>>) -> Self {
107        self.key_range.with_range(end);
108        self
109    }
110
111    /// Watches all keys >= key.
112    #[inline]
113    pub fn with_from_key(mut self) -> Self {
114        self.key_range.with_from_key();
115        self
116    }
117
118    /// Watches all keys prefixed with key.
119    #[inline]
120    pub fn with_prefix(mut self) -> Self {
121        self.key_range.with_prefix();
122        self
123    }
124
125    /// Watches all keys.
126    #[inline]
127    pub fn with_all_keys(mut self) -> Self {
128        self.key_range.with_all_keys();
129        self
130    }
131
132    /// Sets the revision to watch from (inclusive). No `start_revision` is "now".
133    #[inline]
134    pub const fn with_start_revision(mut self, revision: i64) -> Self {
135        self.req.start_revision = revision;
136        self
137    }
138
139    /// `progress_notify` is set so that the etcd server will periodically send a `WatchResponse` with
140    /// no events to the new watcher if there are no recent events. It is useful when clients
141    /// wish to recover a disconnected watcher starting from a recent known revision.
142    /// The etcd server may decide how often it will send notifications based on current load.
143    #[inline]
144    pub const fn with_progress_notify(mut self) -> Self {
145        self.req.progress_notify = true;
146        self
147    }
148
149    /// Filter the events at server side before it sends back to the watcher.
150    #[inline]
151    pub fn with_filters(mut self, filters: impl Into<Vec<WatchFilterType>>) -> Self {
152        self.req.filters = filters.into().into_iter().map(|f| f as i32).collect();
153        self
154    }
155
156    /// If `prev_kv` is set, created watcher gets the previous KV before the event happens.
157    /// If the previous KV is already compacted, nothing will be returned.
158    #[inline]
159    pub const fn with_prev_key(mut self) -> Self {
160        self.req.prev_kv = true;
161        self
162    }
163
164    /// If `watch_id` is provided and non-zero, it will be assigned to this watcher.
165    /// Since creating a watcher in etcd is not a synchronous operation,
166    /// this can be used ensure that ordering is correct when creating multiple
167    /// watchers on the same stream. Creating a watcher with an ID already in
168    /// use on the stream will cause an error to be returned.
169    #[inline]
170    pub const fn with_watch_id(mut self, watch_id: i64) -> Self {
171        self.req.watch_id = watch_id;
172        self
173    }
174
175    /// Enables splitting large revisions into multiple watch responses.
176    #[inline]
177    pub const fn with_fragment(mut self) -> Self {
178        self.req.fragment = true;
179        self
180    }
181}
182
183impl From<WatchOptions> for WatchCreateRequest {
184    #[inline]
185    fn from(mut options: WatchOptions) -> Self {
186        let (key, range_end) = options.key_range.build();
187        options.req.key = key;
188        options.req.range_end = range_end;
189        options.req
190    }
191}
192
193impl From<WatchOptions> for WatchRequest {
194    #[inline]
195    fn from(options: WatchOptions) -> Self {
196        Self {
197            request_union: Some(WatchRequestUnion::CreateRequest(options.into())),
198        }
199    }
200}
201
202impl From<WatchCancelRequest> for WatchRequest {
203    #[inline]
204    fn from(req: WatchCancelRequest) -> Self {
205        Self {
206            request_union: Some(WatchRequestUnion::CancelRequest(req)),
207        }
208    }
209}
210
211impl From<WatchProgressRequest> for WatchRequest {
212    #[inline]
213    fn from(req: WatchProgressRequest) -> Self {
214        Self {
215            request_union: Some(WatchRequestUnion::ProgressRequest(req)),
216        }
217    }
218}
219
220/// Watch filter type.
221#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
222#[repr(i32)]
223pub enum WatchFilterType {
224    /// Filter out put event.
225    NoPut = 0,
226    /// Filter out delete event.
227    NoDelete = 1,
228}
229
230/// Response for `Watch` operation.
231#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
232#[derive(Debug, Clone)]
233#[repr(transparent)]
234pub struct WatchResponse(PbWatchResponse);
235
236impl WatchResponse {
237    /// Creates a new `WatchResponse`.
238    #[inline]
239    const fn new(resp: PbWatchResponse) -> Self {
240        Self(resp)
241    }
242
243    /// Watch response header.
244    #[inline]
245    pub fn header(&self) -> Option<&ResponseHeader> {
246        self.0.header.as_ref().map(From::from)
247    }
248
249    /// Takes the header out of the response, leaving a [`None`] in its place.
250    #[inline]
251    pub fn take_header(&mut self) -> Option<ResponseHeader> {
252        self.0.header.take().map(ResponseHeader::new)
253    }
254
255    /// The ID of the watcher that corresponds to the response.
256    #[inline]
257    pub const fn watch_id(&self) -> i64 {
258        self.0.watch_id
259    }
260
261    /// created is set to true if the response is for a create watch request.
262    /// The client should record the watch_id and expect to receive events for
263    /// the created watcher from the same stream.
264    /// All events sent to the created watcher will attach with the same watch_id.
265    #[inline]
266    pub const fn created(&self) -> bool {
267        self.0.created
268    }
269
270    /// `canceled` is set to true if the response is for a cancel watch request.
271    /// No further events will be sent to the canceled watcher.
272    #[inline]
273    pub const fn canceled(&self) -> bool {
274        self.0.canceled
275    }
276
277    /// `compact_revision` is set to the minimum index if a watcher tries to watch
278    /// at a compacted index.
279    ///
280    /// This happens when creating a watcher at a compacted revision or the watcher cannot
281    /// catch up with the progress of the key-value store.
282    ///
283    /// The client should treat the watcher as canceled and should not try to create any
284    /// watcher with the same start_revision again.
285    #[inline]
286    pub const fn compact_revision(&self) -> i64 {
287        self.0.compact_revision
288    }
289
290    /// Indicates the reason for canceling the watcher.
291    #[inline]
292    pub fn cancel_reason(&self) -> &str {
293        &self.0.cancel_reason
294    }
295
296    /// Events happened on the watched keys.
297    #[inline]
298    pub fn events(&self) -> &[Event] {
299        unsafe { &*(self.0.events.as_slice() as *const _ as *const [Event]) }
300    }
301}
302
303/// Watching event.
304#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
305#[derive(Debug, Clone)]
306#[repr(transparent)]
307pub struct Event(PbEvent);
308
309impl Event {
310    /// The kind of event. If type is a `Put`, it indicates
311    /// new data has been stored to the key. If type is a `Delete`,
312    /// it indicates the key was deleted.
313    #[inline]
314    pub fn event_type(&self) -> EventType {
315        match self.0.r#type {
316            0 => EventType::Put,
317            1 => EventType::Delete,
318            i => panic!("unknown event {i}"),
319        }
320    }
321
322    /// The KeyValue for the event.
323    /// A `Put` event contains current kv pair.
324    /// A `Put` event with `kv.version()==1` indicates the creation of a key.
325    /// A `Delete` event contains the deleted key with
326    /// its modification revision set to the revision of deletion.
327    #[inline]
328    pub fn kv(&self) -> Option<&KeyValue> {
329        self.0.kv.as_ref().map(From::from)
330    }
331
332    /// The key-value pair before the event happens.
333    #[inline]
334    pub fn prev_kv(&self) -> Option<&KeyValue> {
335        self.0.prev_kv.as_ref().map(From::from)
336    }
337}
338
339/// The watching handle.
340#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
341#[derive(Debug)]
342pub struct WatchStream {
343    request_sender: WatchRequestSender,
344    response_stream: WatchResponseStream,
345}
346
347/// The sender for sending watch requests in the existing watch stream.
348///
349/// The watch request can be sending using the [`WatchStream`] or the [`WatchRequestSender`].
350///
351/// The [`WatchRequestSender`] can be obtained by splitting the [`WatchStream`] using the
352/// [`WatchStream::split`] method.
353#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
354#[derive(Debug)]
355pub struct WatchRequestSender(Sender<WatchRequest>);
356
357/// The response stream for receiving watch responses in the existing watch stream.
358///
359/// The watch response can be receiving using the [`WatchStream`] or the [`WatchResponseStream`].
360///
361/// The [`WatchResponseStream`] can be obtained by splitting the [`WatchStream`] using the
362/// [`WatchStream::split`] method.
363#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
364#[derive(Debug)]
365pub struct WatchResponseStream(Streaming<PbWatchResponse>);
366
367impl WatchResponseStream {
368    /// Receive [`WatchResponse`] from this watch response stream.
369    ///
370    /// See also [`WatchStream::message`] for receiving watch response from the [`WatchStream`].
371    #[inline]
372    pub async fn message(&mut self) -> Result<Option<WatchResponse>> {
373        self.0
374            .message()
375            .await
376            .map(|resp| resp.map(WatchResponse::new))
377            .map_err(From::from)
378    }
379}
380
381impl WatchStream {
382    /// Creates a new `WatchStream`.
383    #[inline]
384    const fn new(
385        request_sender: Sender<WatchRequest>,
386        response_stream: Streaming<PbWatchResponse>,
387    ) -> Self {
388        Self {
389            request_sender: WatchRequestSender(request_sender),
390            response_stream: WatchResponseStream(response_stream),
391        }
392    }
393
394    /// Send watch request in the existing watch stream.
395    #[inline]
396    pub async fn watch(
397        &mut self,
398        key: impl Into<Vec<u8>>,
399        options: Option<WatchOptions>,
400    ) -> Result<()> {
401        self.request_sender.watch(key, options).await
402    }
403
404    /// Cancels watch by specified `watch_id`.
405    #[inline]
406    pub async fn cancel(&mut self, watch_id: i64) -> Result<()> {
407        self.request_sender.cancel(watch_id).await
408    }
409
410    /// Requests a watch stream progress status be sent in the watch response stream as soon as
411    /// possible.
412    #[inline]
413    pub async fn request_progress(&mut self) -> Result<()> {
414        self.request_sender.request_progress().await
415    }
416
417    /// Receive [`WatchResponse`] from this watch stream.
418    #[inline]
419    pub async fn message(&mut self) -> Result<Option<WatchResponse>> {
420        self.response_stream.message().await
421    }
422
423    /// Splits the watch stream into a request sender and a response receiver (stream).
424    pub fn split(self) -> (WatchRequestSender, WatchResponseStream) {
425        (self.request_sender, self.response_stream)
426    }
427}
428
429impl WatchRequestSender {
430    /// Send watch request in the existing watch stream.
431    #[inline]
432    async fn send(&mut self, req: WatchRequest) -> Result<()> {
433        self.0
434            .send(req)
435            .await
436            .map_err(|e| Error::WatchError(e.to_string()))
437    }
438
439    /// Send watch request in the existing watch stream.
440    ///
441    /// See also [`WatchStream::watch`] for sending watch request using [`WatchStream`].
442    #[inline]
443    pub async fn watch(
444        &mut self,
445        key: impl Into<Vec<u8>>,
446        options: Option<WatchOptions>,
447    ) -> Result<()> {
448        self.send(options.unwrap_or_default().with_key(key).into())
449            .await
450    }
451
452    /// Cancels watch by specified `watch_id`.
453    ///
454    ///
455    /// See also [`WatchStream::cancel`] for canceling watch using [`WatchStream`].
456    #[inline]
457    pub async fn cancel(&mut self, watch_id: i64) -> Result<()> {
458        let req = WatchCancelRequest { watch_id };
459        self.send(req.into()).await
460    }
461
462    /// Requests a watch stream progress status be sent in the watch response stream as soon as
463    /// possible.
464    ///
465    /// See also [`WatchStream::request_progress`] for requesting watch stream progress status
466    /// using [`WatchStream`].
467    #[inline]
468    pub async fn request_progress(&mut self) -> Result<()> {
469        let req = WatchProgressRequest {};
470        self.send(req.into()).await
471    }
472}
473
474impl Stream for WatchResponseStream {
475    type Item = Result<WatchResponse>;
476
477    #[inline]
478    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
479        Pin::new(&mut self.get_mut().0)
480            .poll_next(cx)
481            .map(|t| match t {
482                Some(Ok(resp)) => Some(Ok(WatchResponse::new(resp))),
483                Some(Err(e)) => Some(Err(From::from(e))),
484                None => None,
485            })
486    }
487}
488
489impl Stream for WatchStream {
490    type Item = Result<WatchResponse>;
491
492    #[inline]
493    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
494        Pin::new(&mut self.get_mut().response_stream).poll_next(cx)
495    }
496}