1pub use crate::rpc::pb::mvccpb::event::EventType;
4
5use crate::error::{Error, Result};
6use crate::intercept::InterceptedChannel;
7use crate::rpc::pb::etcdserverpb::watch_client::WatchClient as PbWatchClient;
8use crate::rpc::pb::etcdserverpb::watch_request::RequestUnion as WatchRequestUnion;
9use crate::rpc::pb::etcdserverpb::{
10 WatchCancelRequest, WatchCreateRequest, WatchProgressRequest, WatchRequest,
11 WatchResponse as PbWatchResponse,
12};
13use crate::rpc::pb::mvccpb::Event as PbEvent;
14use crate::rpc::{KeyRange, KeyValue, ResponseHeader};
15use std::pin::Pin;
16use std::task::{Context, Poll};
17use tokio::sync::mpsc::{channel, Sender};
18use tokio_stream::{wrappers::ReceiverStream, Stream};
19use tonic::Streaming;
20
21#[repr(transparent)]
23#[derive(Clone)]
24pub struct WatchClient {
25 inner: PbWatchClient<InterceptedChannel>,
26}
27
28impl WatchClient {
29 #[inline]
31 pub(crate) fn new(channel: InterceptedChannel) -> Self {
32 let inner = PbWatchClient::new(channel);
33 Self { inner }
34 }
35
36 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
40 self.inner = self.inner.max_decoding_message_size(limit);
41 self
42 }
43
44 pub async fn watch(
52 &mut self,
53 key: impl Into<Vec<u8>>,
54 options: Option<WatchOptions>,
55 ) -> Result<WatchStream> {
56 let (request_sender, request_receiver) = channel::<WatchRequest>(100);
57 request_sender
58 .send(options.unwrap_or_default().with_key(key).into())
59 .await
60 .map_err(|e| Error::WatchError(e.to_string()))?;
61 let request_stream = ReceiverStream::new(request_receiver);
62 let response_stream = self.inner.watch(request_stream).await?.into_inner();
63 Ok(WatchStream::new(request_sender, response_stream))
64 }
65}
66
67#[derive(Debug, Default, Clone)]
69pub struct WatchOptions {
70 req: WatchCreateRequest,
71 key_range: KeyRange,
72}
73
74impl WatchOptions {
75 #[inline]
77 fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
78 self.key_range.with_key(key);
79 self
80 }
81
82 #[inline]
84 pub const fn new() -> Self {
85 Self {
86 req: WatchCreateRequest {
87 key: Vec::new(),
88 range_end: Vec::new(),
89 start_revision: 0,
90 progress_notify: false,
91 filters: Vec::new(),
92 prev_kv: false,
93 watch_id: 0,
94 fragment: false,
95 },
96 key_range: KeyRange::new(),
97 }
98 }
99
100 #[inline]
104 pub fn with_range(mut self, end: impl Into<Vec<u8>>) -> Self {
105 self.key_range.with_range(end);
106 self
107 }
108
109 #[inline]
111 pub fn with_from_key(mut self) -> Self {
112 self.key_range.with_from_key();
113 self
114 }
115
116 #[inline]
118 pub fn with_prefix(mut self) -> Self {
119 self.key_range.with_prefix();
120 self
121 }
122
123 #[inline]
125 pub fn with_all_keys(mut self) -> Self {
126 self.key_range.with_all_keys();
127 self
128 }
129
130 #[inline]
132 pub const fn with_start_revision(mut self, revision: i64) -> Self {
133 self.req.start_revision = revision;
134 self
135 }
136
137 #[inline]
142 pub const fn with_progress_notify(mut self) -> Self {
143 self.req.progress_notify = true;
144 self
145 }
146
147 #[inline]
149 pub fn with_filters(mut self, filters: impl Into<Vec<WatchFilterType>>) -> Self {
150 self.req.filters = filters.into().into_iter().map(|f| f as i32).collect();
151 self
152 }
153
154 #[inline]
157 pub const fn with_prev_key(mut self) -> Self {
158 self.req.prev_kv = true;
159 self
160 }
161
162 #[inline]
168 pub const fn with_watch_id(mut self, watch_id: i64) -> Self {
169 self.req.watch_id = watch_id;
170 self
171 }
172
173 #[inline]
175 pub const fn with_fragment(mut self) -> Self {
176 self.req.fragment = true;
177 self
178 }
179}
180
181impl From<WatchOptions> for WatchCreateRequest {
182 #[inline]
183 fn from(mut options: WatchOptions) -> Self {
184 let (key, range_end) = options.key_range.build();
185 options.req.key = key;
186 options.req.range_end = range_end;
187 options.req
188 }
189}
190
191impl From<WatchOptions> for WatchRequest {
192 #[inline]
193 fn from(options: WatchOptions) -> Self {
194 Self {
195 request_union: Some(WatchRequestUnion::CreateRequest(options.into())),
196 }
197 }
198}
199
200impl From<WatchCancelRequest> for WatchRequest {
201 #[inline]
202 fn from(req: WatchCancelRequest) -> Self {
203 Self {
204 request_union: Some(WatchRequestUnion::CancelRequest(req)),
205 }
206 }
207}
208
209impl From<WatchProgressRequest> for WatchRequest {
210 #[inline]
211 fn from(req: WatchProgressRequest) -> Self {
212 Self {
213 request_union: Some(WatchRequestUnion::ProgressRequest(req)),
214 }
215 }
216}
217
218#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
220#[repr(i32)]
221pub enum WatchFilterType {
222 NoPut = 0,
224 NoDelete = 1,
226}
227
228#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
230#[derive(Debug, Clone)]
231#[repr(transparent)]
232pub struct WatchResponse(PbWatchResponse);
233
234impl WatchResponse {
235 #[inline]
237 const fn new(resp: PbWatchResponse) -> Self {
238 Self(resp)
239 }
240
241 #[inline]
243 pub fn header(&self) -> Option<&ResponseHeader> {
244 self.0.header.as_ref().map(From::from)
245 }
246
247 #[inline]
249 pub fn take_header(&mut self) -> Option<ResponseHeader> {
250 self.0.header.take().map(ResponseHeader::new)
251 }
252
253 #[inline]
255 pub const fn watch_id(&self) -> i64 {
256 self.0.watch_id
257 }
258
259 #[inline]
264 pub const fn created(&self) -> bool {
265 self.0.created
266 }
267
268 #[inline]
271 pub const fn canceled(&self) -> bool {
272 self.0.canceled
273 }
274
275 #[inline]
284 pub const fn compact_revision(&self) -> i64 {
285 self.0.compact_revision
286 }
287
288 #[inline]
290 pub fn cancel_reason(&self) -> &str {
291 &self.0.cancel_reason
292 }
293
294 #[inline]
296 pub fn events(&self) -> &[Event] {
297 unsafe { &*(self.0.events.as_slice() as *const _ as *const [Event]) }
298 }
299}
300
301#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
303#[derive(Debug, Clone)]
304#[repr(transparent)]
305pub struct Event(PbEvent);
306
307impl Event {
308 #[inline]
312 pub fn event_type(&self) -> EventType {
313 match self.0.r#type {
314 0 => EventType::Put,
315 1 => EventType::Delete,
316 i => panic!("unknown event {i}"),
317 }
318 }
319
320 #[inline]
326 pub fn kv(&self) -> Option<&KeyValue> {
327 self.0.kv.as_ref().map(From::from)
328 }
329
330 #[inline]
332 pub fn prev_kv(&self) -> Option<&KeyValue> {
333 self.0.prev_kv.as_ref().map(From::from)
334 }
335}
336
337#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
339#[derive(Debug)]
340pub struct WatchStream {
341 request_stream: Sender<WatchRequest>,
342 response_stream: Streaming<PbWatchResponse>,
343}
344
345impl WatchStream {
346 #[inline]
348 const fn new(
349 request_stream: Sender<WatchRequest>,
350 response_stream: Streaming<PbWatchResponse>,
351 ) -> Self {
352 Self {
353 request_stream,
354 response_stream,
355 }
356 }
357
358 #[inline]
360 pub async fn watch(
361 &mut self,
362 key: impl Into<Vec<u8>>,
363 options: Option<WatchOptions>,
364 ) -> Result<()> {
365 self.request_stream
366 .send(options.unwrap_or_default().with_key(key).into())
367 .await
368 .map_err(|e| Error::WatchError(e.to_string()))
369 }
370
371 #[inline]
373 pub async fn cancel(&mut self, watch_id: i64) -> Result<()> {
374 let req = WatchCancelRequest { watch_id };
375 self.request_stream
376 .send(req.into())
377 .await
378 .map_err(|e| Error::WatchError(e.to_string()))
379 }
380
381 #[inline]
384 pub async fn request_progress(&mut self) -> Result<()> {
385 let req = WatchProgressRequest {};
386 self.request_stream
387 .send(req.into())
388 .await
389 .map_err(|e| Error::WatchError(e.to_string()))
390 }
391
392 #[inline]
394 pub async fn message(&mut self) -> Result<Option<WatchResponse>> {
395 match self.response_stream.message().await? {
396 Some(resp) => Ok(Some(WatchResponse::new(resp))),
397 None => Ok(None),
398 }
399 }
400
401 pub fn split(self) -> (Sender<WatchRequest>, Streaming<PbWatchResponse>) {
403 (self.request_stream, self.response_stream)
404 }
405}
406
407impl Stream for WatchStream {
408 type Item = Result<WatchResponse>;
409
410 #[inline]
411 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
412 Pin::new(&mut self.get_mut().response_stream)
413 .poll_next(cx)
414 .map(|t| match t {
415 Some(Ok(resp)) => Some(Ok(WatchResponse::new(resp))),
416 Some(Err(e)) => Some(Err(From::from(e))),
417 None => None,
418 })
419 }
420}