ya_etcd_rs/watch/
mod.rs

1//! The Watch API provides an event-based interface for asynchronously monitoring changes to keys.
2
3mod watch;
4
5pub use watch::{WatchCancelRequest, WatchCreateRequest, WatchResponse};
6
7use std::{
8    future::Future,
9    pin::Pin,
10    task::{Context, Poll},
11};
12
13use futures::Stream;
14use tokio::sync::mpsc::Sender;
15use tonic::Streaming;
16
17use crate::proto::etcdserverpb;
18use crate::proto::mvccpb;
19use crate::{Error, KeyValue, Result};
20
21pub trait WatchOp {
22    fn watch<R>(&self, req: R) -> impl Future<Output = Result<(WatchStream, WatchCanceler)>>
23    where
24        R: Into<WatchCreateRequest> + Send;
25
26    // TODO: cancel_watch
27    // async fn cancel_watch<R>(&self, req: R) -> impl Future<Output = Result<()>>
28    // where
29    //     R: Into<WatchCancelRequest> + Send;
30}
31
32#[derive(Debug)]
33pub enum WatchInbound {
34    Ready(WatchResponse),
35    Interrupted(Error),
36    Closed,
37}
38
39pub struct WatchStream {
40    stream: Streaming<etcdserverpb::WatchResponse>,
41    is_closed: bool,
42}
43
44impl WatchStream {
45    pub(crate) fn new(stream: Streaming<etcdserverpb::WatchResponse>) -> Self {
46        Self {
47            stream,
48            is_closed: false,
49        }
50    }
51
52    pub async fn inbound(&mut self) -> WatchInbound {
53        if self.is_closed {
54            return WatchInbound::Closed;
55        }
56
57        match self.stream.message().await {
58            Ok(Some(resp)) => {
59                if resp.canceled {
60                    self.is_closed = true;
61                }
62
63                if resp.canceled && resp.events.is_empty() {
64                    WatchInbound::Closed
65                } else {
66                    WatchInbound::Ready(resp.into())
67                }
68            }
69            Ok(None) => WatchInbound::Interrupted(Error::WatchEventExhausted),
70            Err(e) => WatchInbound::Interrupted(e.into()),
71        }
72    }
73}
74
75impl Stream for WatchStream {
76    type Item = WatchInbound;
77
78    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
79        Pin::new(&mut self.get_mut().stream)
80            .poll_next(cx)
81            .map(|e| match e {
82                Some(Ok(resp)) => Some(WatchInbound::Ready(resp.into())),
83                Some(Err(e)) => Some(WatchInbound::Interrupted(e.into())),
84                None => Some(WatchInbound::Closed),
85            })
86    }
87}
88
89pub struct WatchCanceler {
90    watch_id: i64,
91    tx: Sender<etcdserverpb::WatchRequest>,
92}
93
94impl WatchCanceler {
95    pub(crate) fn new(watch_id: i64, tx: Sender<etcdserverpb::WatchRequest>) -> Self {
96        Self { watch_id, tx }
97    }
98
99    pub async fn cancel(self) -> Result<()> {
100        self.tx
101            .send(WatchCancelRequest::new(self.watch_id).into())
102            .await
103            .map_err(Error::WatchChannelSend)
104    }
105}
106
107/// The kind of event.
108#[derive(Debug, PartialEq, Clone)]
109pub enum EventType {
110    Put,
111    Delete,
112}
113
114impl From<mvccpb::event::EventType> for EventType {
115    fn from(event_type: mvccpb::event::EventType) -> Self {
116        use mvccpb::event::EventType;
117        match event_type {
118            EventType::Put => Self::Put,
119            EventType::Delete => Self::Delete,
120        }
121    }
122}
123
124/// Every change to every key is represented with Event messages.
125#[derive(Debug, Clone)]
126pub struct Event {
127    pub event_type: EventType,
128    pub kv: KeyValue,
129    pub prev_kv: Option<KeyValue>,
130}
131
132impl From<mvccpb::Event> for Event {
133    fn from(proto: mvccpb::Event) -> Self {
134        Self {
135            event_type: match proto.r#type {
136                0 => EventType::Put,
137                _ => EventType::Delete, // FIXME: assert valid event type
138            },
139            kv: From::from(proto.kv.expect("must fetch kv")),
140            prev_kv: proto.prev_kv.map(KeyValue::from),
141        }
142    }
143}