1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
/*
 * @Author: plucky
 * @Date: 2023-11-06 15:04:59
 * @LastEditTime: 2023-11-10 16:36:42
 */

use std::{collections::HashMap, sync::{RwLock, Arc}, time::Duration, str::FromStr};

use etcd_client::*;
use tokio::sync::mpsc::Sender;
use tonic::transport::{Channel, Endpoint};
use tracing::info;
use tower::discover::Change;

/// etcd 服务发现
#[allow(dead_code)]
pub struct EtcdDiscovery {
    etcd_client: Client,
    service_map: Arc<RwLock<HashMap<String, Channel>>>,
    // 所有服务的channel
    all_channel: Channel,
    // 通知all_channel服务变化
    rx: Sender<Change<String, Endpoint>>,
}

impl EtcdDiscovery {
    /// 获取所有服务的channel, 负载均衡的channel
    pub fn get_all_channel(&self) -> Channel {
        self.all_channel.clone()
    }

    /// 获取所有服务的map
    pub fn get_service_map(&self) -> Arc<RwLock<HashMap<String, Channel>>> {
        self.service_map.clone()
    }

    /// 获取etcd客户端
    pub fn get_etcd_client(&self) -> Client {
        self.etcd_client.clone()
    }
}

impl EtcdDiscovery {
    /// 用etcd_client创建服务发现
    pub fn new(client: Client) -> Self {
        let (channel, rx) = Channel::balance_channel(1024);
        
        Self {
            etcd_client: client,
            service_map: Arc::new(RwLock::new(HashMap::new())),
            all_channel: channel,
            rx,
        }
    }

    /// 连接etcd, 创建服务发现
    pub async fn connect(etcd_addr: impl AsRef<[&str]>, options: Option<ConnectOptions>) -> Result<Self, Error> {
        let client = Client::connect(etcd_addr, options).await?;
        info!("etcd connect success");
        Ok(Self::new(client))
    }
    
    /// 服务发现, prefix为服务前缀, 例如: /hello, 发现前缀/hello的所有服务
    pub async fn service_discover(&mut self, prefix: &str) -> Result<(), Error> {
        let opt = Some(GetOptions::new().with_prefix());
        let resp = self.etcd_client.get(prefix, opt).await?;
        for kv in resp.kvs() {
            let key = kv.key_str().unwrap_or_default();
            let value = kv.value_str().unwrap_or_default();
            info!("put key: {} value: {}", key, value);
            self.add_service(key, value).await;
        }

        let opt = Some(WatchOptions::new().with_prefix());
        let (mut watcher, mut stream) = self.etcd_client.watch(prefix, opt).await?;
        let service_map = self.service_map.clone();
        let rx = self.rx.clone();

        tokio::spawn(async move {
            while let Some(resp) = stream.message().await.unwrap() {
                for event in resp.events() {
                    match event.event_type() {
                        etcd_client::EventType::Put => {
                            if let Some(kv) = event.kv(){
                                let key = kv.key_str().unwrap_or_default();
                                let value = kv.value_str().unwrap_or_default();
                                info!("watch put key: {} value: {}", key, value);
                                if key.is_empty() {
                                    continue
                                }
                                Self::add_service_map(&rx,&service_map, key, value).await;
                            }
                            
                        }
                        etcd_client::EventType::Delete => {
                            if let Some(kv) = event.kv(){
                                let key = kv.key_str().unwrap_or_default();
                                info!("watch delete key: {}", key);
                                Self::remove_service_map(&rx,&service_map, key).await;
                            }
                        }
                    }
                }
            }
            watcher.cancel().await.unwrap();

        });


        Ok(())
    }

    /// 获取一个服务的channel, 例如: /hello/1
    pub fn get_service(&self, key: impl AsRef<str>) -> Option<Channel> {
        self.service_map.read().unwrap().get(key.as_ref()).cloned()
    }
    
    pub fn remove_service(&mut self, key: impl AsRef<str>) -> Option<Channel> {
        self.service_map.write().unwrap().remove(key.as_ref())
    }

    pub async fn add_service(&self, key: impl AsRef<str>, url: &str) {
        Self::add_service_map(&self.rx, &self.service_map, key.as_ref(), url).await;
        
    }

    #[inline]
    async fn new_channel(uri: &str, timeout: u64) -> Result<Endpoint, tonic::transport::Error>{
        Ok(Endpoint::from_str(uri)?
            .timeout(Duration::from_secs(timeout))
            )
        
    }

    #[inline]
    async fn add_service_map(rx: &Sender<Change<String, Endpoint>>, service_map: &RwLock<HashMap<String, Channel>>, key: impl Into<String>, value: &str) {
        let key = key.into();
        if let Ok(channel) = Self::new_channel(value, 10).await {
            service_map
                .write()
                .unwrap()
                .insert(key.clone(), channel.connect_lazy());
            rx.try_send(Change::Insert(key, channel)).unwrap();

        }
        else {
            tracing::info!("connect error: {:?}", value);
        }
        
    }
    async fn remove_service_map(rx: &Sender<Change<String, Endpoint>>, service_map: &RwLock<HashMap<String, Channel>>, key: impl AsRef<str>) {
        service_map.write().unwrap().remove(key.as_ref());
        rx.try_send(Change::Remove(key.as_ref().into())).unwrap();
    }

}



#[cfg(test)]
mod tests {
    use etcd_client::*;
    use crate::etcd_discovery::EtcdDiscovery;

    #[tokio::test]
    async fn test_discovery() -> Result<(), etcd_client::Error> {
        tracing_subscriber::fmt().init();
        let opt = ConnectOptions::new().with_user("root", "789789");
        let mut discover = EtcdDiscovery::connect(["127.0.0.1:2379"], Some(opt)).await?;
        discover.service_discover("/hello/1").await?;

        tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;

        Ok(())
    
    }

}