1mod watch;
4
5pub use watch::{WatchCancelRequest, WatchCreateRequest, WatchResponse};
6
7use std::{
8 future::Future,
9 pin::Pin,
10 task::{Context, Poll},
11};
12
13use futures::Stream;
14use tokio::sync::mpsc::Sender;
15use tonic::Streaming;
16
17use crate::proto::etcdserverpb;
18use crate::proto::mvccpb;
19use crate::{Error, KeyValue, Result};
20
21pub trait WatchOp {
22 fn watch<R>(&self, req: R) -> impl Future<Output = Result<(WatchStream, WatchCanceler)>>
23 where
24 R: Into<WatchCreateRequest> + Send;
25
26 }
31
32#[derive(Debug)]
33pub enum WatchInbound {
34 Ready(WatchResponse),
35 Interrupted(Error),
36 Closed,
37}
38
39pub struct WatchStream {
40 stream: Streaming<etcdserverpb::WatchResponse>,
41 is_closed: bool,
42}
43
44impl WatchStream {
45 pub(crate) fn new(stream: Streaming<etcdserverpb::WatchResponse>) -> Self {
46 Self {
47 stream,
48 is_closed: false,
49 }
50 }
51
52 pub async fn inbound(&mut self) -> WatchInbound {
53 if self.is_closed {
54 return WatchInbound::Closed;
55 }
56
57 match self.stream.message().await {
58 Ok(Some(resp)) => {
59 if resp.canceled {
60 self.is_closed = true;
61 }
62
63 if resp.canceled && resp.events.is_empty() {
64 WatchInbound::Closed
65 } else {
66 WatchInbound::Ready(resp.into())
67 }
68 }
69 Ok(None) => WatchInbound::Interrupted(Error::WatchEventExhausted),
70 Err(e) => WatchInbound::Interrupted(e.into()),
71 }
72 }
73}
74
75impl Stream for WatchStream {
76 type Item = WatchInbound;
77
78 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
79 Pin::new(&mut self.get_mut().stream)
80 .poll_next(cx)
81 .map(|e| match e {
82 Some(Ok(resp)) => Some(WatchInbound::Ready(resp.into())),
83 Some(Err(e)) => Some(WatchInbound::Interrupted(e.into())),
84 None => Some(WatchInbound::Closed),
85 })
86 }
87}
88
89pub struct WatchCanceler {
90 watch_id: i64,
91 tx: Sender<etcdserverpb::WatchRequest>,
92}
93
94impl WatchCanceler {
95 pub(crate) fn new(watch_id: i64, tx: Sender<etcdserverpb::WatchRequest>) -> Self {
96 Self { watch_id, tx }
97 }
98
99 pub async fn cancel(self) -> Result<()> {
100 self.tx
101 .send(WatchCancelRequest::new(self.watch_id).into())
102 .await
103 .map_err(Error::WatchChannelSend)
104 }
105}
106
107#[derive(Debug, PartialEq, Clone)]
109pub enum EventType {
110 Put,
111 Delete,
112}
113
114impl From<mvccpb::event::EventType> for EventType {
115 fn from(event_type: mvccpb::event::EventType) -> Self {
116 use mvccpb::event::EventType;
117 match event_type {
118 EventType::Put => Self::Put,
119 EventType::Delete => Self::Delete,
120 }
121 }
122}
123
124#[derive(Debug, Clone)]
126pub struct Event {
127 pub event_type: EventType,
128 pub kv: KeyValue,
129 pub prev_kv: Option<KeyValue>,
130}
131
132impl From<mvccpb::Event> for Event {
133 fn from(proto: mvccpb::Event) -> Self {
134 Self {
135 event_type: match proto.r#type {
136 0 => EventType::Put,
137 _ => EventType::Delete, },
139 kv: From::from(proto.kv.expect("must fetch kv")),
140 prev_kv: proto.prev_kv.map(KeyValue::from),
141 }
142 }
143}