1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
use tokio::sync::mpsc::UnboundedSender;
use tonic::codec::Streaming;

use crate::pb::{watch_request, WatchCancelRequest, WatchRequest, WatchResponse};
use crate::{error::WatchError, EtcdClientError};

pub struct Watcher {
    canceled: bool,
    watch_id: i64,
    req_tx: UnboundedSender<WatchRequest>,
    inbound: Streaming<crate::pb::WatchResponse>,
}

impl Watcher {
    pub(crate) fn new(
        watch_id: i64,
        req_tx: UnboundedSender<WatchRequest>,
        inbound: Streaming<crate::pb::WatchResponse>,
    ) -> Watcher {
        Watcher {
            canceled: false,
            watch_id,
            req_tx,
            inbound,
        }
    }

    pub async fn cancel(&mut self) -> Result<(), EtcdClientError> {
        if self.canceled {
            return Err(EtcdClientError::from(WatchError::WatchCanceled));
        }

        let cancel_watch = watch_request::RequestUnion::CancelRequest(WatchCancelRequest {
            watch_id: self.watch_id,
        });
        let cancel_req = WatchRequest {
            request_union: Some(cancel_watch),
        };

        self.req_tx
            .try_send(cancel_req)
            .map_err(WatchError::WatchRequestError)?;

        self.canceled = true;

        Ok(())
    }

    pub async fn message(&mut self) -> Result<Option<WatchResponse>, EtcdClientError> {
        match self.inbound.message().await? {
            Some(resp) => {
                if resp.canceled {
                    Ok(None)
                } else {
                    Ok(Some(resp))
                }
            }
            None => Ok(None),
        }
    }
}