Skip to main content

nacos_rust_client/client/naming_client/
client.rs

1use crate::client::auth::AuthActor;
2use crate::client::nacos_client::ActixSystemActorSetCmd;
3use crate::client::nacos_client::ActixSystemCmd;
4use crate::client::nacos_client::ActixSystemResult;
5use crate::client::AuthInfo;
6use crate::client::ServerEndpointInfo;
7use crate::conn_manage::manage::ConnManage;
8use crate::init_global_system_actor;
9use std::env;
10use std::sync::Arc;
11
12use super::Instance;
13use super::InstanceListener;
14use super::NamingQueryCmd;
15use super::NamingQueryResult;
16use super::QueryInstanceListParams;
17use super::ServiceInstanceKey;
18use super::{
19    InnerNamingListener, InnerNamingRegister, InnerNamingRequestClient, NamingListenerCmd,
20    NamingRegisterCmd, UdpWorker,
21};
22use crate::client::HostInfo;
23use actix::prelude::*;
24use actix::WeakAddr;
25
/// Client handle for the naming side of the service: instance registration,
/// instance queries and change subscriptions.
///
/// The work is done by actors; this struct only keeps their addresses and
/// forwards commands to them.
pub struct NamingClient {
    /// Namespace written into every instance and query parameter set that
    /// passes through this client.
    pub namespace_id: String,
    /// Address of the actor that receives `NamingRegisterCmd` messages
    /// (register / remove / close).
    pub(crate) register: Addr<InnerNamingRegister>,
    /// Address of the actor that receives `NamingQueryCmd` and
    /// `NamingListenerCmd` messages.
    pub(crate) listener_addr: Addr<InnerNamingListener>,
    /// Strong address of the connection manager. The other actors are only
    /// given a weak address to it.
    // NOTE(review): presumably held here so the connection manager stays alive
    // as long as the client does — confirm.
    pub(crate) _conn_manage_addr: Addr<ConnManage>,
    /// IP of this client: the `NACOS_CLIENT_IP` environment variable if set,
    /// otherwise the detected local address, otherwise `127.0.0.1`.
    pub current_ip: String,
}
33
/// Sends the close commands to the register and listener actors when the
/// client is dropped.
impl Drop for NamingClient {
    fn drop(&mut self) {
        // `droping` only queues the close commands with `do_send`; it does not
        // wait for the actors to handle them.
        self.droping();
        // A short sleep used to follow here; it is intentionally left disabled:
        //std::thread::sleep(utils::ms(50));
    }
}
40
41impl NamingClient {
42    pub fn new(host: HostInfo, namespace_id: String) -> Arc<Self> {
43        let use_grpc = false;
44        let current_ip = match env::var("NACOS_CLIENT_IP") {
45            Ok(v) => v,
46            Err(_) => local_ipaddress::get().unwrap_or("127.0.0.1".to_owned()),
47        };
48        let endpoint = Arc::new(ServerEndpointInfo { hosts: vec![host] });
49        let auth_actor = AuthActor::init_auth_actor(endpoint.clone(), None);
50        let conn_manage = ConnManage::new(
51            endpoint.hosts.clone(),
52            use_grpc,
53            None,
54            Default::default(),
55            Default::default(),
56            auth_actor.clone(),
57        );
58        let conn_manage_addr = conn_manage.start_at_global_system();
59        let request_client =
60            InnerNamingRequestClient::new_with_endpoint(endpoint, Some(auth_actor.clone()));
61        let addrs = Self::init_register(
62            namespace_id.clone(),
63            current_ip.clone(),
64            request_client,
65            Some(conn_manage_addr.clone().downgrade()),
66            use_grpc,
67        );
68        let r = Arc::new(Self {
69            namespace_id,
70            register: addrs.0,
71            listener_addr: addrs.1,
72            current_ip,
73            _conn_manage_addr: conn_manage_addr,
74        });
75        let system_addr = init_global_system_actor();
76        system_addr.do_send(ActixSystemActorSetCmd::LastNamingClient(r.clone()));
77        r
78    }
79
80    pub fn new_with_addrs(
81        addrs: &str,
82        namespace_id: String,
83        auth_info: Option<AuthInfo>,
84    ) -> Arc<Self> {
85        let use_grpc = false;
86        let endpoint = Arc::new(ServerEndpointInfo::new(addrs));
87        let auth_actor = AuthActor::init_auth_actor(endpoint.clone(), auth_info.clone());
88        let conn_manage = ConnManage::new(
89            endpoint.hosts.clone(),
90            use_grpc,
91            auth_info.clone(),
92            Default::default(),
93            Default::default(),
94            auth_actor.clone(),
95        );
96        let conn_manage_addr = conn_manage.start_at_global_system();
97        let request_client =
98            InnerNamingRequestClient::new_with_endpoint(endpoint, Some(auth_actor.clone()));
99        let current_ip = match env::var("NACOS_CLIENT_IP") {
100            Ok(v) => v,
101            Err(_) => local_ipaddress::get().unwrap_or("127.0.0.1".to_owned()),
102        };
103        let addrs = Self::init_register(
104            namespace_id.clone(),
105            current_ip.clone(),
106            request_client,
107            Some(conn_manage_addr.clone().downgrade()),
108            use_grpc,
109        );
110        let r = Arc::new(Self {
111            namespace_id,
112            register: addrs.0,
113            listener_addr: addrs.1,
114            current_ip,
115            _conn_manage_addr: conn_manage_addr,
116        });
117        let system_addr = init_global_system_actor();
118        system_addr.do_send(ActixSystemActorSetCmd::LastNamingClient(r.clone()));
119        r
120    }
121
122    pub(crate) fn init_register(
123        namespace_id: String,
124        client_ip: String,
125        request_client: InnerNamingRequestClient,
126        conn_manage_addr: Option<WeakAddr<ConnManage>>,
127        use_grpc: bool,
128    ) -> (Addr<InnerNamingRegister>, Addr<InnerNamingListener>) {
129        let system_addr = init_global_system_actor();
130
131        let actor = InnerNamingRegister::new(use_grpc, conn_manage_addr.clone());
132        let (tx, rx) = std::sync::mpsc::sync_channel(1);
133        let msg = ActixSystemCmd::InnerNamingRegister(actor, tx);
134        system_addr.do_send(msg);
135        let register_addr = match rx.recv().unwrap() {
136            ActixSystemResult::InnerNamingRegister(addr) => addr,
137            _ => panic!("init actor error"),
138        };
139
140        let actor = UdpWorker::new(None);
141        let (tx, rx) = std::sync::mpsc::sync_channel(1);
142        let msg = ActixSystemCmd::UdpWorker(actor, tx);
143        system_addr.do_send(msg);
144        let udp_work_addr = match rx.recv().unwrap() {
145            ActixSystemResult::UdpWorker(addr) => addr,
146            _ => panic!("init actor error"),
147        };
148
149        let actor = InnerNamingListener::new(
150            &namespace_id,
151            &client_ip,
152            0,
153            request_client,
154            udp_work_addr,
155            conn_manage_addr,
156            use_grpc,
157        );
158        let (tx, rx) = std::sync::mpsc::sync_channel(1);
159        let msg = ActixSystemCmd::InnerNamingListener(actor, tx);
160        system_addr.do_send(msg);
161        let listener_addr = match rx.recv().unwrap() {
162            ActixSystemResult::InnerNamingListener(addr) => addr,
163            _ => panic!("init actor error"),
164        };
165        (register_addr, listener_addr)
166    }
167
168    pub(crate) fn droping(&self) {
169        log::info!("NamingClient droping");
170        self.register.do_send(NamingRegisterCmd::Close);
171        self.listener_addr.do_send(NamingListenerCmd::Close);
172    }
173
174    pub fn register(&self, mut instance: Instance) {
175        instance.namespace_id = self.namespace_id.clone();
176        self.register.do_send(NamingRegisterCmd::Register(instance));
177    }
178
179    pub fn unregister(&self, mut instance: Instance) {
180        instance.namespace_id = self.namespace_id.clone();
181        self.register.do_send(NamingRegisterCmd::Remove(instance));
182    }
183
184    pub async fn query_instances(
185        &self,
186        mut params: QueryInstanceListParams,
187    ) -> anyhow::Result<Vec<Arc<Instance>>> {
188        params.namespace_id = self.namespace_id.clone();
189        let (tx, rx) = tokio::sync::oneshot::channel();
190        self.listener_addr
191            .do_send(NamingQueryCmd::QueryList(params, tx));
192        match rx.await? {
193            NamingQueryResult::List(list) => Ok(list),
194            _ => Err(anyhow::anyhow!("not found instance")),
195        }
196    }
197
198    pub async fn select_instance(
199        &self,
200        mut params: QueryInstanceListParams,
201    ) -> anyhow::Result<Arc<Instance>> {
202        params.namespace_id = self.namespace_id.clone();
203        let (tx, rx) = tokio::sync::oneshot::channel();
204        self.listener_addr
205            .do_send(NamingQueryCmd::Select(params, tx));
206        match rx.await? {
207            NamingQueryResult::One(one) => Ok(one),
208            _ => Err(anyhow::anyhow!("not found instance")),
209        }
210    }
211
212    pub async fn subscribe<T: InstanceListener + Send + 'static>(
213        &self,
214        listener: Box<T>,
215    ) -> anyhow::Result<()> {
216        let key = listener.get_key();
217        self.subscribe_with_key(key, listener).await
218    }
219
220    pub async fn subscribe_with_key<T: InstanceListener + Send + 'static>(
221        &self,
222        key: ServiceInstanceKey,
223        listener: Box<T>,
224    ) -> anyhow::Result<()> {
225        let id = 0u64;
226        //如果之前没有数据,会触发加载数据
227        let params = QueryInstanceListParams::new(
228            &self.namespace_id,
229            &key.group_name,
230            &key.service_name,
231            None,
232            true,
233        );
234        self.query_instances(params).await.ok();
235        let msg = NamingListenerCmd::Add(key, id, listener);
236        self.listener_addr.do_send(msg);
237        Ok(())
238    }
239
240    pub async fn unsubscribe(&self, key: ServiceInstanceKey) -> anyhow::Result<()> {
241        let id = 0u64;
242        let msg = NamingListenerCmd::Remove(key, id);
243        self.listener_addr.do_send(msg);
244        Ok(())
245    }
246}