inve_etcd/watch/
mod.rs

1mod watch;
2
3pub use watch::{WatchCancelRequest, WatchCreateRequest, WatchResponse};
4
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use async_trait::async_trait;
9use futures::Stream;
10use tokio::sync::mpsc::Sender;
11use tonic::Streaming;
12
13use crate::proto::etcdserverpb;
14use crate::proto::mvccpb;
15use crate::{Error, KeyValue, Result};
16
17#[async_trait]
18pub trait WatchOp {
19    async fn watch<R>(&self, req: R) -> Result<(WatchStream, WatchCanceler)>
20    where
21        R: Into<WatchCreateRequest> + Send;
22}
23
24#[derive(Debug)]
25pub enum WatchInbound {
26    Ready(WatchResponse),
27    Interrupted(Error),
28    Closed,
29}
30
31pub struct WatchStream {
32    stream: Streaming<etcdserverpb::WatchResponse>,
33    is_closed: bool,
34}
35
36impl WatchStream {
37    pub(crate) fn new(stream: Streaming<etcdserverpb::WatchResponse>) -> Self {
38        Self {
39            stream,
40            is_closed: false,
41        }
42    }
43
44    pub async fn inbound(&mut self) -> WatchInbound {
45        if self.is_closed {
46            return WatchInbound::Closed;
47        }
48
49        match self.stream.message().await {
50            Ok(Some(resp)) => {
51                if resp.canceled {
52                    self.is_closed = true;
53                }
54
55                if resp.canceled && resp.events.is_empty() {
56                    WatchInbound::Closed
57                } else {
58                    WatchInbound::Ready(resp.into())
59                }
60            }
61            Ok(None) => WatchInbound::Interrupted(Error::WatchEventExhausted),
62            Err(e) => WatchInbound::Interrupted(e.into()),
63        }
64    }
65}
66
67impl Stream for WatchStream {
68    type Item = WatchInbound;
69
70    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
71        Pin::new(&mut self.get_mut().stream)
72            .poll_next(cx)
73            .map(|e| match e {
74                Some(Ok(resp)) => Some(WatchInbound::Ready(resp.into())),
75                Some(Err(e)) => Some(WatchInbound::Interrupted(e.into())),
76                None => Some(WatchInbound::Closed),
77            })
78    }
79}
80
81pub struct WatchCanceler {
82    watch_id: i64,
83    tx: Sender<etcdserverpb::WatchRequest>,
84}
85
86impl WatchCanceler {
87    pub(crate) fn new(watch_id: i64, tx: Sender<etcdserverpb::WatchRequest>) -> Self {
88        Self { watch_id, tx }
89    }
90
91    pub async fn cancel(self) -> Result<()> {
92        self.tx
93            .send(WatchCancelRequest::new(self.watch_id).into())
94            .await
95            .map_err(|e| Error::WatchChannelSend(e))
96    }
97}
98
/// Kind of change reported by a watch [`Event`].
///
/// The enum has no payload, so it is cheap to copy and can be compared or
/// used as a hash key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EventType {
    /// Put event.
    Put,
    /// Delete event.
    Delete,
}
104
105impl From<mvccpb::event::EventType> for EventType {
106    fn from(event_type: mvccpb::event::EventType) -> Self {
107        use mvccpb::event::EventType;
108        match event_type {
109            EventType::Put => Self::Put,
110            EventType::Delete => Self::Delete,
111        }
112    }
113}
114
115#[derive(Debug, Clone)]
116pub struct Event {
117    pub event_type: EventType,
118    pub kv: KeyValue,
119}
120
121impl From<mvccpb::Event> for Event {
122    fn from(proto: mvccpb::Event) -> Self {
123        Self {
124            event_type: match proto.r#type {
125                0 => EventType::Put,
126                _ => EventType::Delete,
127            },
128            kv: From::from(proto.kv.expect("must fetch kv")),
129        }
130    }
131}