1use std::sync::Arc;
2use std::time::Duration;
3
4use parking_lot::RwLock;
5use tokio::sync::Notify;
6use tokio::task::JoinHandle;
7use tracing::{debug, info, warn};
8
9use crate::api::naming::{
10 Instance, InstanceRequest, InstanceResponse, QueryServiceResponse,
11 ServiceListRequest, ServiceListResponse, ServiceQueryRequest, SubscribeServiceRequest,
12 SubscribeServiceResponse,
13};
14use crate::common::DEFAULT_GROUP;
15use crate::error::{BatataError, Result};
16use crate::naming::{
17 CallbackServiceListener, ServiceChangeEvent, ServiceInfoCache, ServiceListener,
18 SubscriberRegistry,
19};
20use crate::remote::RpcClient;
21
22pub struct NamingService {
24 rpc_client: Arc<RpcClient>,
26
27 cache: Arc<ServiceInfoCache>,
29
30 subscribers: Arc<SubscriberRegistry>,
32
33 namespace: String,
35
36 group_name: String,
38
39 started: Arc<RwLock<bool>>,
41
42 heartbeat_task: Arc<RwLock<Option<JoinHandle<()>>>>,
44
45 shutdown: Arc<Notify>,
47
48 registered_instances: Arc<RwLock<Vec<(String, String, Instance)>>>,
50}
51
52impl NamingService {
53 pub fn new(rpc_client: Arc<RpcClient>, namespace: &str) -> Self {
55 Self {
56 rpc_client,
57 cache: Arc::new(ServiceInfoCache::new()),
58 subscribers: Arc::new(SubscriberRegistry::new()),
59 namespace: namespace.to_string(),
60 group_name: DEFAULT_GROUP.to_string(),
61 started: Arc::new(RwLock::new(false)),
62 heartbeat_task: Arc::new(RwLock::new(None)),
63 shutdown: Arc::new(Notify::new()),
64 registered_instances: Arc::new(RwLock::new(Vec::new())),
65 }
66 }
67
68 pub fn with_group(mut self, group_name: &str) -> Self {
70 self.group_name = group_name.to_string();
71 self
72 }
73
74 pub async fn start(&self) -> Result<()> {
76 if *self.started.read() {
77 return Err(BatataError::ClientAlreadyStarted);
78 }
79
80 *self.started.write() = true;
81
82 let instances = self.registered_instances.clone();
84 let rpc_client = self.rpc_client.clone();
85 let namespace = self.namespace.clone();
86 let shutdown = self.shutdown.clone();
87
88 let handle = tokio::spawn(async move {
89 Self::heartbeat_loop(instances, rpc_client, namespace, shutdown).await;
90 });
91
92 *self.heartbeat_task.write() = Some(handle);
93
94 info!("NamingService started");
95 Ok(())
96 }
97
98 pub async fn stop(&self) {
100 *self.started.write() = false;
101 self.shutdown.notify_one();
102
103 if let Some(handle) = self.heartbeat_task.write().take() {
104 handle.abort();
105 }
106
107 let instances = self.registered_instances.read().clone();
109 for (service_name, group_name, instance) in instances {
110 if let Err(e) = self.deregister_instance(&service_name, &group_name, instance).await {
111 warn!("Failed to deregister instance on shutdown: {}", e);
112 }
113 }
114
115 info!("NamingService stopped");
116 }
117
118 pub async fn register_instance(
120 &self,
121 service_name: &str,
122 group_name: &str,
123 instance: Instance,
124 ) -> Result<()> {
125 let group_name = if group_name.is_empty() {
126 &self.group_name
127 } else {
128 group_name
129 };
130
131 let mut instance = instance;
132 instance.generate_instance_id();
133
134 let request =
135 InstanceRequest::register(&self.namespace, service_name, group_name, instance.clone());
136
137 let response: InstanceResponse = self.rpc_client.request(&request).await?;
138
139 if !response.response.success {
140 return Err(BatataError::server_error(
141 response.response.error_code,
142 response.response.message,
143 ));
144 }
145
146 if instance.ephemeral {
148 self.registered_instances.write().push((
149 service_name.to_string(),
150 group_name.to_string(),
151 instance,
152 ));
153 }
154
155 info!(
156 "Registered instance: service={}, group={}",
157 service_name, group_name
158 );
159
160 Ok(())
161 }
162
163 pub async fn register_instance_simple(
165 &self,
166 service_name: &str,
167 ip: &str,
168 port: i32,
169 ) -> Result<()> {
170 let instance = Instance::new(ip, port);
171 self.register_instance(service_name, &self.group_name, instance)
172 .await
173 }
174
175 pub async fn deregister_instance(
177 &self,
178 service_name: &str,
179 group_name: &str,
180 instance: Instance,
181 ) -> Result<()> {
182 let group_name = if group_name.is_empty() {
183 &self.group_name
184 } else {
185 group_name
186 };
187
188 let request =
189 InstanceRequest::deregister(&self.namespace, service_name, group_name, instance.clone());
190
191 let response: InstanceResponse = self.rpc_client.request(&request).await?;
192
193 if !response.response.success {
194 return Err(BatataError::server_error(
195 response.response.error_code,
196 response.response.message,
197 ));
198 }
199
200 self.registered_instances
202 .write()
203 .retain(|(s, g, i)| !(s == service_name && g == group_name && i.key() == instance.key()));
204
205 info!(
206 "Deregistered instance: service={}, group={}",
207 service_name, group_name
208 );
209
210 Ok(())
211 }
212
213 pub async fn deregister_instance_simple(
215 &self,
216 service_name: &str,
217 ip: &str,
218 port: i32,
219 ) -> Result<()> {
220 let instance = Instance::new(ip, port);
221 self.deregister_instance(service_name, &self.group_name, instance)
222 .await
223 }
224
225 pub async fn update_instance(
227 &self,
228 service_name: &str,
229 group_name: &str,
230 instance: Instance,
231 ) -> Result<()> {
232 let group_name = if group_name.is_empty() {
233 &self.group_name
234 } else {
235 group_name
236 };
237
238 let request =
239 InstanceRequest::update(&self.namespace, service_name, group_name, instance.clone());
240
241 let response: InstanceResponse = self.rpc_client.request(&request).await?;
242
243 if !response.response.success {
244 return Err(BatataError::server_error(
245 response.response.error_code,
246 response.response.message,
247 ));
248 }
249
250 {
252 let mut registered = self.registered_instances.write();
253 for (s, g, i) in registered.iter_mut() {
254 if s == service_name && g == group_name && i.key() == instance.key() {
255 *i = instance.clone();
256 break;
257 }
258 }
259 }
260
261 info!(
262 "Updated instance: service={}, group={}",
263 service_name, group_name
264 );
265
266 Ok(())
267 }
268
269 pub async fn update_instance_simple(
271 &self,
272 service_name: &str,
273 ip: &str,
274 port: i32,
275 weight: f64,
276 enabled: bool,
277 ) -> Result<()> {
278 let instance = Instance::new(ip, port)
279 .with_weight(weight)
280 .with_enabled(enabled);
281 self.update_instance(service_name, &self.group_name, instance)
282 .await
283 }
284
285 pub async fn get_service(
287 &self,
288 service_name: &str,
289 group_name: &str,
290 clusters: &[String],
291 ) -> Result<crate::api::naming::Service> {
292 let group_name = if group_name.is_empty() {
293 &self.group_name
294 } else {
295 group_name
296 };
297
298 let cluster_str = clusters.join(",");
299 let request = ServiceQueryRequest::new(&self.namespace, service_name, group_name)
300 .with_cluster(&cluster_str);
301
302 let response: QueryServiceResponse = self.rpc_client.request(&request).await?;
303
304 self.cache.put(&self.namespace, response.service_info.clone());
306
307 Ok(response.service_info)
308 }
309
310 pub async fn get_all_instances(
312 &self,
313 service_name: &str,
314 group_name: &str,
315 ) -> Result<Vec<Instance>> {
316 self.select_instances(service_name, group_name, false).await
317 }
318
319 pub async fn select_instances(
321 &self,
322 service_name: &str,
323 group_name: &str,
324 healthy_only: bool,
325 ) -> Result<Vec<Instance>> {
326 let group_name = if group_name.is_empty() {
327 &self.group_name
328 } else {
329 group_name
330 };
331
332 if let Some(service) = self.cache.get(&self.namespace, group_name, service_name) {
334 let instances = if healthy_only {
335 service
336 .hosts
337 .into_iter()
338 .filter(|i| i.healthy && i.enabled)
339 .collect()
340 } else {
341 service.hosts
342 };
343 return Ok(instances);
344 }
345
346 let request =
348 ServiceQueryRequest::new(&self.namespace, service_name, group_name)
349 .with_healthy_only(healthy_only);
350
351 let response: QueryServiceResponse = self.rpc_client.request(&request).await?;
352
353 self.cache.put(&self.namespace, response.service_info.clone());
355
356 let instances = if healthy_only {
357 response
358 .service_info
359 .hosts
360 .into_iter()
361 .filter(|i| i.healthy && i.enabled)
362 .collect()
363 } else {
364 response.service_info.hosts
365 };
366
367 Ok(instances)
368 }
369
370 pub async fn select_one_healthy_instance(
372 &self,
373 service_name: &str,
374 group_name: &str,
375 ) -> Result<Instance> {
376 let instances = self.select_instances(service_name, group_name, true).await?;
377
378 if instances.is_empty() {
379 return Err(BatataError::ServiceNotFound {
380 service_name: service_name.to_string(),
381 group_name: group_name.to_string(),
382 namespace: self.namespace.clone(),
383 });
384 }
385
386 let index = rand_index(instances.len());
388 Ok(instances[index].clone())
389 }
390
391 pub async fn get_services_of_server(
393 &self,
394 group_name: &str,
395 page_no: i32,
396 page_size: i32,
397 ) -> Result<(i32, Vec<String>)> {
398 let group_name = if group_name.is_empty() {
399 &self.group_name
400 } else {
401 group_name
402 };
403
404 let request = ServiceListRequest::new(&self.namespace, group_name)
405 .with_page(page_no, page_size);
406
407 let response: ServiceListResponse = self.rpc_client.request(&request).await?;
408
409 Ok((response.count, response.service_names))
410 }
411
412 pub async fn subscribe<L>(
414 &self,
415 service_name: &str,
416 group_name: &str,
417 listener: L,
418 ) -> Result<()>
419 where
420 L: ServiceListener + 'static,
421 {
422 let group_name = if group_name.is_empty() {
423 &self.group_name
424 } else {
425 group_name
426 };
427
428 self.subscribers.subscribe(
430 &self.namespace,
431 group_name,
432 service_name,
433 Arc::new(listener),
434 );
435
436 let request = SubscribeServiceRequest::subscribe(&self.namespace, service_name, group_name);
438
439 let response: SubscribeServiceResponse = self.rpc_client.request(&request).await?;
440
441 self.cache.put(&self.namespace, response.service_info.clone());
443
444 info!(
445 "Subscribed to service: service={}, group={}",
446 service_name, group_name
447 );
448
449 Ok(())
450 }
451
452 pub async fn subscribe_callback<F>(
454 &self,
455 service_name: &str,
456 group_name: &str,
457 callback: F,
458 ) -> Result<()>
459 where
460 F: Fn(ServiceChangeEvent) + Send + Sync + 'static,
461 {
462 self.subscribe(service_name, group_name, CallbackServiceListener::new(callback))
463 .await
464 }
465
466 pub async fn unsubscribe(&self, service_name: &str, group_name: &str) -> Result<()> {
468 let group_name = if group_name.is_empty() {
469 &self.group_name
470 } else {
471 group_name
472 };
473
474 self.subscribers
475 .unsubscribe(&self.namespace, group_name, service_name);
476
477 let request = SubscribeServiceRequest::unsubscribe(&self.namespace, service_name, group_name);
479
480 let _response: SubscribeServiceResponse = self.rpc_client.request(&request).await?;
481
482 info!(
483 "Unsubscribed from service: service={}, group={}",
484 service_name, group_name
485 );
486
487 Ok(())
488 }
489
490 pub async fn get_server_status(&self) -> Result<String> {
492 if self.rpc_client.is_connected() {
493 Ok("UP".to_string())
494 } else {
495 Ok("DOWN".to_string())
496 }
497 }
498
499 async fn heartbeat_loop(
501 instances: Arc<RwLock<Vec<(String, String, Instance)>>>,
502 rpc_client: Arc<RpcClient>,
503 namespace: String,
504 shutdown: Arc<Notify>,
505 ) {
506 let heartbeat_interval = Duration::from_secs(5);
507
508 loop {
509 tokio::select! {
510 _ = shutdown.notified() => {
511 info!("Heartbeat loop shutdown");
512 break;
513 }
514 _ = tokio::time::sleep(heartbeat_interval) => {
515 let registered = instances.read().clone();
516
517 for (service_name, group_name, instance) in registered {
518 if !instance.ephemeral {
519 continue;
520 }
521
522 let request = InstanceRequest::register(
524 &namespace,
525 &service_name,
526 &group_name,
527 instance,
528 );
529
530 if let Err(e) = rpc_client.request::<_, InstanceResponse>(&request).await {
531 warn!(
532 "Heartbeat failed for service={}, group={}: {}",
533 service_name, group_name, e
534 );
535 } else {
536 debug!(
537 "Heartbeat sent for service={}, group={}",
538 service_name, group_name
539 );
540 }
541 }
542 }
543 }
544 }
545 }
546}
547
548impl Drop for NamingService {
549 fn drop(&mut self) {
550 self.shutdown.notify_one();
551 }
552}
553
554fn rand_index(max: usize) -> usize {
556 use std::time::{SystemTime, UNIX_EPOCH};
557 let nanos = SystemTime::now()
558 .duration_since(UNIX_EPOCH)
559 .unwrap()
560 .subsec_nanos() as usize;
561 nanos % max
562}