1use async_stream::stream;
2use detector::{ServiceStatus, WatchEvent};
3use futures_core::Stream;
4use futures_util::StreamExt;
5use serde::de::DeserializeOwned;
6use std::fmt::Debug;
7use std::pin::Pin;
8
9pub struct Watcher<T1, T2>
11where
12 T1: Clone + Debug + Send,
13 T2: Clone + Debug + Send,
14{
15 stream: Pin<Box<dyn Stream<Item = Result<WatchEvent<T1, T2>, etcd_client::Error>> + Send>>,
16}
17
18impl<T1, T2> Watcher<T1, T2>
19where
20 T1: Clone + Debug + DeserializeOwned + 'static + Send,
21 T2: Clone + Debug + for<'a> TryFrom<&'a [u8]> + 'static + Send,
22 for<'a> <T2 as TryFrom<&'a [u8]>>::Error: Send,
23{
24 pub(crate) fn new(
25 init: Vec<T1>,
26 _watcher: etcd_client::Watcher,
27 watch_stream: etcd_client::WatchStream,
28 ) -> Self {
29 let stream = Box::pin(stream! {
30 for i in init {
31 yield Ok(WatchEvent::Changed(i));
32 }
33
34 let _watcher = _watcher;
35 let mut watch_stream = watch_stream;
36 loop {
37 match watch_stream.message().await {
38 Err(e) => yield Err(e),
39 Ok(resp) => {
40 match resp {
41 None => yield Err(etcd_client::Error::WatchError(String::from("stream closed"))),
42 Some(resp) => {
43 for event in resp.events() {
44 if event.kv().is_none() {
45 continue;
46 }
47
48 let kv = event.kv().unwrap();
49 match event.event_type() {
50 etcd_client::EventType::Put => {
51 if let Ok(r) = serde_json::from_slice::<T1>(kv.value()) {
52 yield Ok(WatchEvent::Changed(r));
53 }
54 },
55 etcd_client::EventType::Delete => {
56 if let Ok(r) = T2::try_from(kv.key()) {
57 yield Ok(WatchEvent::Deleted(r));
58 }
59 }
60 }
61 }
62 }
63 }
64 }
65 }
66 }
67 });
68
69 Self { stream }
70 }
71
72 pub async fn event(&mut self) -> Result<WatchEvent<T1, T2>, etcd_client::Error> {
74 if let Some(e) = self.stream.next().await {
75 e
76 } else {
77 panic!("error: should not return none");
78 }
79 }
80}
81
82#[derive(Debug, Clone)]
84pub struct Oneself {
85 receiver: tokio::sync::watch::Receiver<ServiceStatus>,
86}
87
88impl Oneself {
89 pub(crate) fn new(receiver: tokio::sync::watch::Receiver<ServiceStatus>) -> Self {
90 Self { receiver }
91 }
92
93 pub fn status(&self) -> ServiceStatus {
95 *self.receiver.borrow()
96 }
97
98 pub async fn changed(&mut self) -> Result<ServiceStatus, ()> {
100 let _ = self.receiver.changed().await.map_err(|_| ())?;
101 let status = *self.receiver.borrow();
102 Ok(status)
103 }
104}