nacos_rust_client/client/naming_client/
client.rs1use crate::client::auth::AuthActor;
2use crate::client::nacos_client::ActixSystemActorSetCmd;
3use crate::client::nacos_client::ActixSystemCmd;
4use crate::client::nacos_client::ActixSystemResult;
5use crate::client::AuthInfo;
6use crate::client::ServerEndpointInfo;
7use crate::conn_manage::manage::ConnManage;
8use crate::init_global_system_actor;
9use std::env;
10use std::sync::Arc;
11
12use super::Instance;
13use super::InstanceListener;
14use super::NamingQueryCmd;
15use super::NamingQueryResult;
16use super::QueryInstanceListParams;
17use super::ServiceInstanceKey;
18use super::{
19 InnerNamingListener, InnerNamingRegister, InnerNamingRequestClient, NamingListenerCmd,
20 NamingRegisterCmd, UdpWorker,
21};
22use crate::client::HostInfo;
23use actix::prelude::*;
24use actix::WeakAddr;
25
26pub struct NamingClient {
27 pub namespace_id: String,
28 pub(crate) register: Addr<InnerNamingRegister>,
29 pub(crate) listener_addr: Addr<InnerNamingListener>,
30 pub(crate) _conn_manage_addr: Addr<ConnManage>,
31 pub current_ip: String,
32}
33
34impl Drop for NamingClient {
35 fn drop(&mut self) {
36 self.droping();
37 }
39}
40
41impl NamingClient {
42 pub fn new(host: HostInfo, namespace_id: String) -> Arc<Self> {
43 let use_grpc = false;
44 let current_ip = match env::var("NACOS_CLIENT_IP") {
45 Ok(v) => v,
46 Err(_) => local_ipaddress::get().unwrap_or("127.0.0.1".to_owned()),
47 };
48 let endpoint = Arc::new(ServerEndpointInfo { hosts: vec![host] });
49 let auth_actor = AuthActor::init_auth_actor(endpoint.clone(), None);
50 let conn_manage = ConnManage::new(
51 endpoint.hosts.clone(),
52 use_grpc,
53 None,
54 Default::default(),
55 Default::default(),
56 auth_actor.clone(),
57 );
58 let conn_manage_addr = conn_manage.start_at_global_system();
59 let request_client =
60 InnerNamingRequestClient::new_with_endpoint(endpoint, Some(auth_actor.clone()));
61 let addrs = Self::init_register(
62 namespace_id.clone(),
63 current_ip.clone(),
64 request_client,
65 Some(conn_manage_addr.clone().downgrade()),
66 use_grpc,
67 );
68 let r = Arc::new(Self {
69 namespace_id,
70 register: addrs.0,
71 listener_addr: addrs.1,
72 current_ip,
73 _conn_manage_addr: conn_manage_addr,
74 });
75 let system_addr = init_global_system_actor();
76 system_addr.do_send(ActixSystemActorSetCmd::LastNamingClient(r.clone()));
77 r
78 }
79
80 pub fn new_with_addrs(
81 addrs: &str,
82 namespace_id: String,
83 auth_info: Option<AuthInfo>,
84 ) -> Arc<Self> {
85 let use_grpc = false;
86 let endpoint = Arc::new(ServerEndpointInfo::new(addrs));
87 let auth_actor = AuthActor::init_auth_actor(endpoint.clone(), auth_info.clone());
88 let conn_manage = ConnManage::new(
89 endpoint.hosts.clone(),
90 use_grpc,
91 auth_info.clone(),
92 Default::default(),
93 Default::default(),
94 auth_actor.clone(),
95 );
96 let conn_manage_addr = conn_manage.start_at_global_system();
97 let request_client =
98 InnerNamingRequestClient::new_with_endpoint(endpoint, Some(auth_actor.clone()));
99 let current_ip = match env::var("NACOS_CLIENT_IP") {
100 Ok(v) => v,
101 Err(_) => local_ipaddress::get().unwrap_or("127.0.0.1".to_owned()),
102 };
103 let addrs = Self::init_register(
104 namespace_id.clone(),
105 current_ip.clone(),
106 request_client,
107 Some(conn_manage_addr.clone().downgrade()),
108 use_grpc,
109 );
110 let r = Arc::new(Self {
111 namespace_id,
112 register: addrs.0,
113 listener_addr: addrs.1,
114 current_ip,
115 _conn_manage_addr: conn_manage_addr,
116 });
117 let system_addr = init_global_system_actor();
118 system_addr.do_send(ActixSystemActorSetCmd::LastNamingClient(r.clone()));
119 r
120 }
121
122 pub(crate) fn init_register(
123 namespace_id: String,
124 client_ip: String,
125 request_client: InnerNamingRequestClient,
126 conn_manage_addr: Option<WeakAddr<ConnManage>>,
127 use_grpc: bool,
128 ) -> (Addr<InnerNamingRegister>, Addr<InnerNamingListener>) {
129 let system_addr = init_global_system_actor();
130
131 let actor = InnerNamingRegister::new(use_grpc, conn_manage_addr.clone());
132 let (tx, rx) = std::sync::mpsc::sync_channel(1);
133 let msg = ActixSystemCmd::InnerNamingRegister(actor, tx);
134 system_addr.do_send(msg);
135 let register_addr = match rx.recv().unwrap() {
136 ActixSystemResult::InnerNamingRegister(addr) => addr,
137 _ => panic!("init actor error"),
138 };
139
140 let actor = UdpWorker::new(None);
141 let (tx, rx) = std::sync::mpsc::sync_channel(1);
142 let msg = ActixSystemCmd::UdpWorker(actor, tx);
143 system_addr.do_send(msg);
144 let udp_work_addr = match rx.recv().unwrap() {
145 ActixSystemResult::UdpWorker(addr) => addr,
146 _ => panic!("init actor error"),
147 };
148
149 let actor = InnerNamingListener::new(
150 &namespace_id,
151 &client_ip,
152 0,
153 request_client,
154 udp_work_addr,
155 conn_manage_addr,
156 use_grpc,
157 );
158 let (tx, rx) = std::sync::mpsc::sync_channel(1);
159 let msg = ActixSystemCmd::InnerNamingListener(actor, tx);
160 system_addr.do_send(msg);
161 let listener_addr = match rx.recv().unwrap() {
162 ActixSystemResult::InnerNamingListener(addr) => addr,
163 _ => panic!("init actor error"),
164 };
165 (register_addr, listener_addr)
166 }
167
168 pub(crate) fn droping(&self) {
169 log::info!("NamingClient droping");
170 self.register.do_send(NamingRegisterCmd::Close);
171 self.listener_addr.do_send(NamingListenerCmd::Close);
172 }
173
174 pub fn register(&self, mut instance: Instance) {
175 instance.namespace_id = self.namespace_id.clone();
176 self.register.do_send(NamingRegisterCmd::Register(instance));
177 }
178
179 pub fn unregister(&self, mut instance: Instance) {
180 instance.namespace_id = self.namespace_id.clone();
181 self.register.do_send(NamingRegisterCmd::Remove(instance));
182 }
183
184 pub async fn query_instances(
185 &self,
186 mut params: QueryInstanceListParams,
187 ) -> anyhow::Result<Vec<Arc<Instance>>> {
188 params.namespace_id = self.namespace_id.clone();
189 let (tx, rx) = tokio::sync::oneshot::channel();
190 self.listener_addr
191 .do_send(NamingQueryCmd::QueryList(params, tx));
192 match rx.await? {
193 NamingQueryResult::List(list) => Ok(list),
194 _ => Err(anyhow::anyhow!("not found instance")),
195 }
196 }
197
198 pub async fn select_instance(
199 &self,
200 mut params: QueryInstanceListParams,
201 ) -> anyhow::Result<Arc<Instance>> {
202 params.namespace_id = self.namespace_id.clone();
203 let (tx, rx) = tokio::sync::oneshot::channel();
204 self.listener_addr
205 .do_send(NamingQueryCmd::Select(params, tx));
206 match rx.await? {
207 NamingQueryResult::One(one) => Ok(one),
208 _ => Err(anyhow::anyhow!("not found instance")),
209 }
210 }
211
212 pub async fn subscribe<T: InstanceListener + Send + 'static>(
213 &self,
214 listener: Box<T>,
215 ) -> anyhow::Result<()> {
216 let key = listener.get_key();
217 self.subscribe_with_key(key, listener).await
218 }
219
220 pub async fn subscribe_with_key<T: InstanceListener + Send + 'static>(
221 &self,
222 key: ServiceInstanceKey,
223 listener: Box<T>,
224 ) -> anyhow::Result<()> {
225 let id = 0u64;
226 let params = QueryInstanceListParams::new(
228 &self.namespace_id,
229 &key.group_name,
230 &key.service_name,
231 None,
232 true,
233 );
234 self.query_instances(params).await.ok();
235 let msg = NamingListenerCmd::Add(key, id, listener);
236 self.listener_addr.do_send(msg);
237 Ok(())
238 }
239
240 pub async fn unsubscribe(&self, key: ServiceInstanceKey) -> anyhow::Result<()> {
241 let id = 0u64;
242 let msg = NamingListenerCmd::Remove(key, id);
243 self.listener_addr.do_send(msg);
244 Ok(())
245 }
246}