1mod watch;
4
5pub use watch::{WatchCancelRequest, WatchCreateRequest, WatchResponse};
6
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10use async_trait::async_trait;
11use futures::Stream;
12use tokio::sync::mpsc::Sender;
13use tonic::Streaming;
14
15use crate::proto::etcdserverpb;
16use crate::proto::mvccpb;
17use crate::{Error, KeyValue, Result};
18
19#[async_trait]
20pub trait WatchOp {
21 async fn watch<R>(&self, req: R) -> Result<(WatchStream, WatchCanceler)>
22 where
23 R: Into<WatchCreateRequest> + Send;
24
25 }
29
30#[derive(Debug)]
31pub enum WatchInbound {
32 Ready(WatchResponse),
33 Interrupted(Error),
34 Closed,
35}
36
37pub struct WatchStream {
38 stream: Streaming<etcdserverpb::WatchResponse>,
39 is_closed: bool,
40}
41
42impl WatchStream {
43 pub(crate) fn new(stream: Streaming<etcdserverpb::WatchResponse>) -> Self {
44 Self {
45 stream,
46 is_closed: false,
47 }
48 }
49
50 pub async fn inbound(&mut self) -> WatchInbound {
51 if self.is_closed {
52 return WatchInbound::Closed;
53 }
54
55 match self.stream.message().await {
56 Ok(Some(resp)) => {
57 if resp.canceled {
58 self.is_closed = true;
59 }
60
61 if resp.canceled && resp.events.is_empty() {
62 WatchInbound::Closed
63 } else {
64 WatchInbound::Ready(resp.into())
65 }
66 }
67 Ok(None) => WatchInbound::Interrupted(Error::WatchEventExhausted),
68 Err(e) => WatchInbound::Interrupted(e.into()),
69 }
70 }
71}
72
73impl Stream for WatchStream {
74 type Item = WatchInbound;
75
76 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
77 Pin::new(&mut self.get_mut().stream)
78 .poll_next(cx)
79 .map(|e| match e {
80 Some(Ok(resp)) => Some(WatchInbound::Ready(resp.into())),
81 Some(Err(e)) => Some(WatchInbound::Interrupted(e.into())),
82 None => Some(WatchInbound::Closed),
83 })
84 }
85}
86
87pub struct WatchCanceler {
88 watch_id: i64,
89 tx: Sender<etcdserverpb::WatchRequest>,
90}
91
92impl WatchCanceler {
93 pub(crate) fn new(watch_id: i64, tx: Sender<etcdserverpb::WatchRequest>) -> Self {
94 Self { watch_id, tx }
95 }
96
97 pub async fn cancel(self) -> Result<()> {
98 self.tx
99 .send(WatchCancelRequest::new(self.watch_id).into())
100 .await
101 .map_err(|e| Error::WatchChannelSend(e))
102 }
103}
104
105#[derive(Debug, PartialEq, Clone)]
107pub enum EventType {
108 Put,
109 Delete,
110}
111
112impl From<mvccpb::event::EventType> for EventType {
113 fn from(event_type: mvccpb::event::EventType) -> Self {
114 use mvccpb::event::EventType;
115 match event_type {
116 EventType::Put => Self::Put,
117 EventType::Delete => Self::Delete,
118 }
119 }
120}
121
122#[derive(Debug, Clone)]
124pub struct Event {
125 pub event_type: EventType,
126 pub kv: KeyValue,
127 pub prev_kv: Option<KeyValue>,
128}
129
130impl From<mvccpb::Event> for Event {
131 fn from(proto: mvccpb::Event) -> Self {
132 Self {
133 event_type: match proto.r#type {
134 0 => EventType::Put,
135 _ => EventType::Delete, },
137 kv: From::from(proto.kv.expect("must fetch kv")),
138 prev_kv: proto.prev_kv.map(KeyValue::from),
139 }
140 }
141}