etcd_detector/
watch.rs

1use async_stream::stream;
2use detector::{ServiceStatus, WatchEvent};
3use futures_core::Stream;
4use futures_util::StreamExt;
5use serde::de::DeserializeOwned;
6use std::fmt::Debug;
7use std::pin::Pin;
8
9/// 监听事件
10pub struct Watcher<T1, T2>
11where
12    T1: Clone + Debug + Send,
13    T2: Clone + Debug + Send,
14{
15    stream: Pin<Box<dyn Stream<Item = Result<WatchEvent<T1, T2>, etcd_client::Error>> + Send>>,
16}
17
18impl<T1, T2> Watcher<T1, T2>
19where
20    T1: Clone + Debug + DeserializeOwned + 'static + Send,
21    T2: Clone + Debug + for<'a> TryFrom<&'a [u8]> + 'static + Send,
22    for<'a> <T2 as TryFrom<&'a [u8]>>::Error: Send,
23{
24    pub(crate) fn new(
25        init: Vec<T1>,
26        _watcher: etcd_client::Watcher,
27        watch_stream: etcd_client::WatchStream,
28    ) -> Self {
29        let stream = Box::pin(stream! {
30            for i in init {
31                yield Ok(WatchEvent::Changed(i));
32            }
33
34            let _watcher = _watcher;
35            let mut watch_stream = watch_stream;
36            loop {
37                match watch_stream.message().await {
38                    Err(e) => yield Err(e),
39                    Ok(resp) => {
40                        match resp {
41                            None => yield Err(etcd_client::Error::WatchError(String::from("stream closed"))),
42                            Some(resp) => {
43                                for event in resp.events() {
44                                    if event.kv().is_none() {
45                                        continue;
46                                    }
47
48                                    let kv = event.kv().unwrap();
49                                    match event.event_type() {
50                                        etcd_client::EventType::Put => {
51                                            if let Ok(r) = serde_json::from_slice::<T1>(kv.value()) {
52                                                yield Ok(WatchEvent::Changed(r));
53                                            }
54                                        },
55                                        etcd_client::EventType::Delete => {
56                                            if let Ok(r) = T2::try_from(kv.key()) {
57                                                yield Ok(WatchEvent::Deleted(r));
58                                            }
59                                        }
60                                    }
61                                }
62                            }
63                        }
64                    }
65                }
66            }
67        });
68
69        Self { stream }
70    }
71
72    /// 获取监听的事件
73    pub async fn event(&mut self) -> Result<WatchEvent<T1, T2>, etcd_client::Error> {
74        if let Some(e) = self.stream.next().await {
75            e
76        } else {
77            panic!("error: should not return none");
78        }
79    }
80}
81
82/// 监听注册状态
83#[derive(Debug, Clone)]
84pub struct Oneself {
85    receiver: tokio::sync::watch::Receiver<ServiceStatus>,
86}
87
88impl Oneself {
89    pub(crate) fn new(receiver: tokio::sync::watch::Receiver<ServiceStatus>) -> Self {
90        Self { receiver }
91    }
92
93    /// 查看最新的状态值
94    pub fn status(&self) -> ServiceStatus {
95        *self.receiver.borrow()
96    }
97
98    /// 返回失败则说明通道被关闭了
99    pub async fn changed(&mut self) -> Result<ServiceStatus, ()> {
100        let _ = self.receiver.changed().await.map_err(|_| ())?;
101        let status = *self.receiver.borrow();
102        Ok(status)
103    }
104}