1pub use crate::rpc::pb::mvccpb::event::EventType;
4
5use crate::error::{Error, Result};
6use crate::intercept::InterceptedChannel;
7use crate::rpc::pb::etcdserverpb::watch_client::WatchClient as PbWatchClient;
8use crate::rpc::pb::etcdserverpb::watch_request::RequestUnion as WatchRequestUnion;
9use crate::rpc::pb::etcdserverpb::{
10 WatchCancelRequest, WatchCreateRequest, WatchProgressRequest, WatchRequest,
11 WatchResponse as PbWatchResponse,
12};
13use crate::rpc::pb::mvccpb::Event as PbEvent;
14use crate::rpc::{KeyRange, KeyValue, ResponseHeader};
15use std::pin::Pin;
16use std::task::{Context, Poll};
17use tokio::sync::mpsc::{channel, Sender};
18use tokio_stream::{wrappers::ReceiverStream, Stream};
19use tonic::Streaming;
20
21#[repr(transparent)]
23#[derive(Clone)]
24pub struct WatchClient {
25 inner: PbWatchClient<InterceptedChannel>,
26}
27
28impl WatchClient {
29 #[inline]
31 pub(crate) fn new(channel: InterceptedChannel) -> Self {
32 let inner = PbWatchClient::new(channel);
33 Self { inner }
34 }
35
36 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
40 self.inner = self.inner.max_decoding_message_size(limit);
41 self
42 }
43
44 pub async fn watch(
52 &mut self,
53 key: impl Into<Vec<u8>>,
54 options: Option<WatchOptions>,
55 ) -> Result<WatchStream> {
56 let (request_sender, request_receiver) = channel::<WatchRequest>(100);
57 request_sender
58 .send(options.unwrap_or_default().with_key(key).into())
59 .await
60 .map_err(|e| Error::WatchError(e.to_string()))?;
61 let request_stream = ReceiverStream::new(request_receiver);
62 let response_stream = self.inner.watch(request_stream).await?.into_inner();
63 Ok(WatchStream::new(request_sender, response_stream))
64 }
65}
66
67#[derive(Debug, Default, Clone)]
69pub struct WatchOptions {
70 req: WatchCreateRequest,
71 key_range: KeyRange,
72}
73
74impl WatchOptions {
75 #[inline]
77 pub fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
78 self.key_range.with_key(key);
79 self
80 }
81
82 #[inline]
84 pub const fn new() -> Self {
85 Self {
86 req: WatchCreateRequest {
87 key: Vec::new(),
88 range_end: Vec::new(),
89 start_revision: 0,
90 progress_notify: false,
91 filters: Vec::new(),
92 prev_kv: false,
93 watch_id: 0,
94 fragment: false,
95 },
96 key_range: KeyRange::new(),
97 }
98 }
99
100 #[inline]
106 pub fn with_range(mut self, end: impl Into<Vec<u8>>) -> Self {
107 self.key_range.with_range(end);
108 self
109 }
110
111 #[inline]
113 pub fn with_from_key(mut self) -> Self {
114 self.key_range.with_from_key();
115 self
116 }
117
118 #[inline]
120 pub fn with_prefix(mut self) -> Self {
121 self.key_range.with_prefix();
122 self
123 }
124
125 #[inline]
127 pub fn with_all_keys(mut self) -> Self {
128 self.key_range.with_all_keys();
129 self
130 }
131
132 #[inline]
134 pub const fn with_start_revision(mut self, revision: i64) -> Self {
135 self.req.start_revision = revision;
136 self
137 }
138
139 #[inline]
144 pub const fn with_progress_notify(mut self) -> Self {
145 self.req.progress_notify = true;
146 self
147 }
148
149 #[inline]
151 pub fn with_filters(mut self, filters: impl Into<Vec<WatchFilterType>>) -> Self {
152 self.req.filters = filters.into().into_iter().map(|f| f as i32).collect();
153 self
154 }
155
156 #[inline]
159 pub const fn with_prev_key(mut self) -> Self {
160 self.req.prev_kv = true;
161 self
162 }
163
164 #[inline]
170 pub const fn with_watch_id(mut self, watch_id: i64) -> Self {
171 self.req.watch_id = watch_id;
172 self
173 }
174
175 #[inline]
177 pub const fn with_fragment(mut self) -> Self {
178 self.req.fragment = true;
179 self
180 }
181}
182
183impl From<WatchOptions> for WatchCreateRequest {
184 #[inline]
185 fn from(mut options: WatchOptions) -> Self {
186 let (key, range_end) = options.key_range.build();
187 options.req.key = key;
188 options.req.range_end = range_end;
189 options.req
190 }
191}
192
193impl From<WatchOptions> for WatchRequest {
194 #[inline]
195 fn from(options: WatchOptions) -> Self {
196 Self {
197 request_union: Some(WatchRequestUnion::CreateRequest(options.into())),
198 }
199 }
200}
201
202impl From<WatchCancelRequest> for WatchRequest {
203 #[inline]
204 fn from(req: WatchCancelRequest) -> Self {
205 Self {
206 request_union: Some(WatchRequestUnion::CancelRequest(req)),
207 }
208 }
209}
210
211impl From<WatchProgressRequest> for WatchRequest {
212 #[inline]
213 fn from(req: WatchProgressRequest) -> Self {
214 Self {
215 request_union: Some(WatchRequestUnion::ProgressRequest(req)),
216 }
217 }
218}
219
220#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
222#[repr(i32)]
223pub enum WatchFilterType {
224 NoPut = 0,
226 NoDelete = 1,
228}
229
230#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
232#[derive(Debug, Clone)]
233#[repr(transparent)]
234pub struct WatchResponse(PbWatchResponse);
235
236impl WatchResponse {
237 #[inline]
239 const fn new(resp: PbWatchResponse) -> Self {
240 Self(resp)
241 }
242
243 #[inline]
245 pub fn header(&self) -> Option<&ResponseHeader> {
246 self.0.header.as_ref().map(From::from)
247 }
248
249 #[inline]
251 pub fn take_header(&mut self) -> Option<ResponseHeader> {
252 self.0.header.take().map(ResponseHeader::new)
253 }
254
255 #[inline]
257 pub const fn watch_id(&self) -> i64 {
258 self.0.watch_id
259 }
260
261 #[inline]
266 pub const fn created(&self) -> bool {
267 self.0.created
268 }
269
270 #[inline]
273 pub const fn canceled(&self) -> bool {
274 self.0.canceled
275 }
276
277 #[inline]
286 pub const fn compact_revision(&self) -> i64 {
287 self.0.compact_revision
288 }
289
290 #[inline]
292 pub fn cancel_reason(&self) -> &str {
293 &self.0.cancel_reason
294 }
295
296 #[inline]
298 pub fn events(&self) -> &[Event] {
299 unsafe { &*(self.0.events.as_slice() as *const _ as *const [Event]) }
300 }
301}
302
303#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
305#[derive(Debug, Clone)]
306#[repr(transparent)]
307pub struct Event(PbEvent);
308
309impl Event {
310 #[inline]
314 pub fn event_type(&self) -> EventType {
315 match self.0.r#type {
316 0 => EventType::Put,
317 1 => EventType::Delete,
318 i => panic!("unknown event {i}"),
319 }
320 }
321
322 #[inline]
328 pub fn kv(&self) -> Option<&KeyValue> {
329 self.0.kv.as_ref().map(From::from)
330 }
331
332 #[inline]
334 pub fn prev_kv(&self) -> Option<&KeyValue> {
335 self.0.prev_kv.as_ref().map(From::from)
336 }
337}
338
339#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
341#[derive(Debug)]
342pub struct WatchStream {
343 request_sender: WatchRequestSender,
344 response_stream: WatchResponseStream,
345}
346
347#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
354#[derive(Debug)]
355pub struct WatchRequestSender(Sender<WatchRequest>);
356
357#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
364#[derive(Debug)]
365pub struct WatchResponseStream(Streaming<PbWatchResponse>);
366
367impl WatchResponseStream {
368 #[inline]
372 pub async fn message(&mut self) -> Result<Option<WatchResponse>> {
373 self.0
374 .message()
375 .await
376 .map(|resp| resp.map(WatchResponse::new))
377 .map_err(From::from)
378 }
379}
380
381impl WatchStream {
382 #[inline]
384 const fn new(
385 request_sender: Sender<WatchRequest>,
386 response_stream: Streaming<PbWatchResponse>,
387 ) -> Self {
388 Self {
389 request_sender: WatchRequestSender(request_sender),
390 response_stream: WatchResponseStream(response_stream),
391 }
392 }
393
394 #[inline]
396 pub async fn watch(
397 &mut self,
398 key: impl Into<Vec<u8>>,
399 options: Option<WatchOptions>,
400 ) -> Result<()> {
401 self.request_sender.watch(key, options).await
402 }
403
404 #[inline]
406 pub async fn cancel(&mut self, watch_id: i64) -> Result<()> {
407 self.request_sender.cancel(watch_id).await
408 }
409
410 #[inline]
413 pub async fn request_progress(&mut self) -> Result<()> {
414 self.request_sender.request_progress().await
415 }
416
417 #[inline]
419 pub async fn message(&mut self) -> Result<Option<WatchResponse>> {
420 self.response_stream.message().await
421 }
422
423 pub fn split(self) -> (WatchRequestSender, WatchResponseStream) {
425 (self.request_sender, self.response_stream)
426 }
427}
428
429impl WatchRequestSender {
430 #[inline]
432 async fn send(&mut self, req: WatchRequest) -> Result<()> {
433 self.0
434 .send(req)
435 .await
436 .map_err(|e| Error::WatchError(e.to_string()))
437 }
438
439 #[inline]
443 pub async fn watch(
444 &mut self,
445 key: impl Into<Vec<u8>>,
446 options: Option<WatchOptions>,
447 ) -> Result<()> {
448 self.send(options.unwrap_or_default().with_key(key).into())
449 .await
450 }
451
452 #[inline]
457 pub async fn cancel(&mut self, watch_id: i64) -> Result<()> {
458 let req = WatchCancelRequest { watch_id };
459 self.send(req.into()).await
460 }
461
462 #[inline]
468 pub async fn request_progress(&mut self) -> Result<()> {
469 let req = WatchProgressRequest {};
470 self.send(req.into()).await
471 }
472}
473
474impl Stream for WatchResponseStream {
475 type Item = Result<WatchResponse>;
476
477 #[inline]
478 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
479 Pin::new(&mut self.get_mut().0)
480 .poll_next(cx)
481 .map(|t| match t {
482 Some(Ok(resp)) => Some(Ok(WatchResponse::new(resp))),
483 Some(Err(e)) => Some(Err(From::from(e))),
484 None => None,
485 })
486 }
487}
488
489impl Stream for WatchStream {
490 type Item = Result<WatchResponse>;
491
492 #[inline]
493 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
494 Pin::new(&mut self.get_mut().response_stream).poll_next(cx)
495 }
496}