1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
use tokio::sync::mpsc::UnboundedSender; use tonic::codec::Streaming; use crate::pb::{watch_request, WatchCancelRequest, WatchRequest, WatchResponse}; use crate::{error::WatchError, EtcdClientError}; pub struct Watcher { canceled: bool, watch_id: i64, req_tx: UnboundedSender<WatchRequest>, inbound: Streaming<crate::pb::WatchResponse>, } impl Watcher { pub(crate) fn new( watch_id: i64, req_tx: UnboundedSender<WatchRequest>, inbound: Streaming<crate::pb::WatchResponse>, ) -> Watcher { Watcher { canceled: false, watch_id, req_tx, inbound, } } pub async fn cancel(&mut self) -> Result<(), EtcdClientError> { if self.canceled { return Err(EtcdClientError::from(WatchError::WatchCanceled)); } let cancel_watch = watch_request::RequestUnion::CancelRequest(WatchCancelRequest { watch_id: self.watch_id, }); let cancel_req = WatchRequest { request_union: Some(cancel_watch), }; self.req_tx .try_send(cancel_req) .map_err(WatchError::WatchRequestError)?; self.canceled = true; Ok(()) } pub async fn message(&mut self) -> Result<Option<WatchResponse>, EtcdClientError> { match self.inbound.message().await? { Some(resp) => { if resp.canceled { Ok(None) } else { Ok(Some(resp)) } } None => Ok(None), } } }