1use std::sync::Arc;
2use std::time::Duration;
3
4use parking_lot::RwLock;
5use tokio::sync::Notify;
6use tokio::task::JoinHandle;
7use tracing::{debug, info, warn};
8
9use crate::api::naming::{
10 Instance, InstanceRequest, InstanceResponse, QueryServiceResponse, ServiceListRequest,
11 ServiceListResponse, ServiceQueryRequest, SubscribeServiceRequest, SubscribeServiceResponse,
12};
13use crate::cache::FileCache;
14use crate::common::{build_service_key, DEFAULT_GROUP};
15use crate::error::{BatataError, Result};
16use crate::naming::{
17 CallbackServiceListener, LoadBalancer, ServiceChangeEvent, ServiceInfoCache, ServiceListener,
18 SubscriberRegistry, WeightedRoundRobinBalancer,
19};
20use crate::remote::RpcClient;
21use crate::CacheConfig;
22
23pub struct NamingService {
25 rpc_client: Arc<RpcClient>,
27
28 cache: Arc<ServiceInfoCache>,
30
31 file_cache: Option<Arc<FileCache>>,
33
34 cache_config: CacheConfig,
36
37 balancer: Arc<dyn LoadBalancer>,
39
40 subscribers: Arc<SubscriberRegistry>,
42
43 namespace: String,
45
46 group_name: String,
48
49 started: Arc<RwLock<bool>>,
51
52 heartbeat_task: Arc<RwLock<Option<JoinHandle<()>>>>,
54
55 shutdown: Arc<Notify>,
57
58 registered_instances: Arc<RwLock<Vec<(String, String, Instance)>>>,
60}
61
62impl NamingService {
63 pub fn new(rpc_client: Arc<RpcClient>, namespace: &str, cache_config: CacheConfig) -> Self {
65 Self::with_balancer(
66 rpc_client,
67 namespace,
68 cache_config,
69 Arc::new(WeightedRoundRobinBalancer::new()),
70 )
71 }
72
73 pub fn with_balancer(
75 rpc_client: Arc<RpcClient>,
76 namespace: &str,
77 cache_config: CacheConfig,
78 balancer: Arc<dyn LoadBalancer>,
79 ) -> Self {
80 let file_cache = cache_config
82 .cache_dir
83 .as_ref()
84 .and_then(|dir| FileCache::new(dir).ok())
85 .map(Arc::new);
86
87 Self {
88 rpc_client,
89 cache: Arc::new(ServiceInfoCache::new()),
90 file_cache,
91 cache_config,
92 balancer,
93 subscribers: Arc::new(SubscriberRegistry::new()),
94 namespace: namespace.to_string(),
95 group_name: DEFAULT_GROUP.to_string(),
96 started: Arc::new(RwLock::new(false)),
97 heartbeat_task: Arc::new(RwLock::new(None)),
98 shutdown: Arc::new(Notify::new()),
99 registered_instances: Arc::new(RwLock::new(Vec::new())),
100 }
101 }
102
103 pub fn with_group(mut self, group_name: &str) -> Self {
105 self.group_name = group_name.to_string();
106 self
107 }
108
109 pub async fn start(&self) -> Result<()> {
111 if *self.started.read() {
112 return Err(BatataError::ClientAlreadyStarted);
113 }
114
115 *self.started.write() = true;
116
117 let instances = self.registered_instances.clone();
119 let rpc_client = self.rpc_client.clone();
120 let namespace = self.namespace.clone();
121 let shutdown = self.shutdown.clone();
122
123 let handle = tokio::spawn(async move {
124 Self::heartbeat_loop(instances, rpc_client, namespace, shutdown).await;
125 });
126
127 *self.heartbeat_task.write() = Some(handle);
128
129 info!("NamingService started");
130 Ok(())
131 }
132
133 pub async fn stop(&self) {
135 *self.started.write() = false;
136 self.shutdown.notify_one();
137
138 if let Some(handle) = self.heartbeat_task.write().take() {
139 handle.abort();
140 }
141
142 let instances = self.registered_instances.read().clone();
144 for (service_name, group_name, instance) in instances {
145 if let Err(e) = self.deregister_instance(&service_name, &group_name, instance).await {
146 warn!("Failed to deregister instance on shutdown: {}", e);
147 }
148 }
149
150 info!("NamingService stopped");
151 }
152
153 pub async fn register_instance(
155 &self,
156 service_name: &str,
157 group_name: &str,
158 instance: Instance,
159 ) -> Result<()> {
160 let group_name = if group_name.is_empty() {
161 &self.group_name
162 } else {
163 group_name
164 };
165
166 let mut instance = instance;
167 instance.generate_instance_id();
168
169 let request =
170 InstanceRequest::register(&self.namespace, service_name, group_name, instance.clone());
171
172 let response: InstanceResponse = self.rpc_client.request(&request).await?;
173
174 if !response.response.success {
175 return Err(BatataError::server_error(
176 response.response.error_code,
177 response.response.message,
178 ));
179 }
180
181 if instance.ephemeral {
183 self.registered_instances.write().push((
184 service_name.to_string(),
185 group_name.to_string(),
186 instance,
187 ));
188 }
189
190 info!(
191 "Registered instance: service={}, group={}",
192 service_name, group_name
193 );
194
195 Ok(())
196 }
197
198 pub async fn register_instance_simple(
200 &self,
201 service_name: &str,
202 ip: &str,
203 port: i32,
204 ) -> Result<()> {
205 let instance = Instance::new(ip, port);
206 self.register_instance(service_name, &self.group_name, instance)
207 .await
208 }
209
210 pub async fn deregister_instance(
212 &self,
213 service_name: &str,
214 group_name: &str,
215 instance: Instance,
216 ) -> Result<()> {
217 let group_name = if group_name.is_empty() {
218 &self.group_name
219 } else {
220 group_name
221 };
222
223 let request =
224 InstanceRequest::deregister(&self.namespace, service_name, group_name, instance.clone());
225
226 let response: InstanceResponse = self.rpc_client.request(&request).await?;
227
228 if !response.response.success {
229 return Err(BatataError::server_error(
230 response.response.error_code,
231 response.response.message,
232 ));
233 }
234
235 self.registered_instances
237 .write()
238 .retain(|(s, g, i)| !(s == service_name && g == group_name && i.key() == instance.key()));
239
240 info!(
241 "Deregistered instance: service={}, group={}",
242 service_name, group_name
243 );
244
245 Ok(())
246 }
247
248 pub async fn deregister_instance_simple(
250 &self,
251 service_name: &str,
252 ip: &str,
253 port: i32,
254 ) -> Result<()> {
255 let instance = Instance::new(ip, port);
256 self.deregister_instance(service_name, &self.group_name, instance)
257 .await
258 }
259
260 pub async fn update_instance(
262 &self,
263 service_name: &str,
264 group_name: &str,
265 instance: Instance,
266 ) -> Result<()> {
267 let group_name = if group_name.is_empty() {
268 &self.group_name
269 } else {
270 group_name
271 };
272
273 let request =
274 InstanceRequest::update(&self.namespace, service_name, group_name, instance.clone());
275
276 let response: InstanceResponse = self.rpc_client.request(&request).await?;
277
278 if !response.response.success {
279 return Err(BatataError::server_error(
280 response.response.error_code,
281 response.response.message,
282 ));
283 }
284
285 {
287 let mut registered = self.registered_instances.write();
288 for (s, g, i) in registered.iter_mut() {
289 if s == service_name && g == group_name && i.key() == instance.key() {
290 *i = instance.clone();
291 break;
292 }
293 }
294 }
295
296 info!(
297 "Updated instance: service={}, group={}",
298 service_name, group_name
299 );
300
301 Ok(())
302 }
303
304 pub async fn update_instance_simple(
306 &self,
307 service_name: &str,
308 ip: &str,
309 port: i32,
310 weight: f64,
311 enabled: bool,
312 ) -> Result<()> {
313 let instance = Instance::new(ip, port)
314 .with_weight(weight)
315 .with_enabled(enabled);
316 self.update_instance(service_name, &self.group_name, instance)
317 .await
318 }
319
320 pub async fn get_service(
322 &self,
323 service_name: &str,
324 group_name: &str,
325 clusters: &[String],
326 ) -> Result<crate::api::naming::Service> {
327 let group_name = if group_name.is_empty() {
328 &self.group_name
329 } else {
330 group_name
331 };
332
333 let cluster_str = clusters.join(",");
334 let request = ServiceQueryRequest::new(&self.namespace, service_name, group_name)
335 .with_cluster(&cluster_str);
336
337 let response: QueryServiceResponse = self.rpc_client.request(&request).await?;
338
339 self.cache.put(&self.namespace, response.service_info.clone());
341
342 Ok(response.service_info)
343 }
344
345 pub async fn get_all_instances(
347 &self,
348 service_name: &str,
349 group_name: &str,
350 ) -> Result<Vec<Instance>> {
351 self.select_instances(service_name, group_name, false).await
352 }
353
354 pub async fn select_instances(
356 &self,
357 service_name: &str,
358 group_name: &str,
359 healthy_only: bool,
360 ) -> Result<Vec<Instance>> {
361 let group_name = if group_name.is_empty() {
362 &self.group_name
363 } else {
364 group_name
365 };
366
367 if let Some(service) = self.cache.get(&self.namespace, group_name, service_name) {
369 let instances = if healthy_only {
370 service
371 .hosts
372 .into_iter()
373 .filter(|i| i.healthy && i.enabled)
374 .collect()
375 } else {
376 service.hosts
377 };
378 return Ok(instances);
379 }
380
381 let request = ServiceQueryRequest::new(&self.namespace, service_name, group_name)
383 .with_healthy_only(healthy_only);
384
385 match self
386 .rpc_client
387 .request::<_, QueryServiceResponse>(&request)
388 .await
389 {
390 Ok(response) => {
391 self.cache
393 .put(&self.namespace, response.service_info.clone());
394
395 if let Some(file_cache) = &self.file_cache {
397 if let Err(e) = file_cache.save_service(&self.namespace, &response.service_info)
398 {
399 warn!("Failed to save service to file cache: {}", e);
400 }
401 }
402
403 let instances = if healthy_only {
404 response
405 .service_info
406 .hosts
407 .into_iter()
408 .filter(|i| i.healthy && i.enabled)
409 .collect()
410 } else {
411 response.service_info.hosts
412 };
413
414 Ok(instances)
415 }
416 Err(e) => {
417 if self.cache_config.failover_enabled {
419 if let Some(file_cache) = &self.file_cache {
420 if let Some(service) =
421 file_cache.load_service(&self.namespace, group_name, service_name)
422 {
423 warn!(
424 "Using cached service due to server error: {} (service={}, group={})",
425 e, service_name, group_name
426 );
427
428 if self.cache_config.update_cache_when_empty {
430 self.cache.put(&self.namespace, service.clone());
431 }
432
433 let instances = if healthy_only {
434 service
435 .hosts
436 .into_iter()
437 .filter(|i| i.healthy && i.enabled)
438 .collect()
439 } else {
440 service.hosts
441 };
442
443 return Ok(instances);
444 }
445 }
446 }
447 Err(e)
448 }
449 }
450 }
451
452 pub async fn select_one_healthy_instance(
457 &self,
458 service_name: &str,
459 group_name: &str,
460 ) -> Result<Instance> {
461 let group_name = if group_name.is_empty() {
462 &self.group_name
463 } else {
464 group_name
465 };
466
467 let instances = self.select_instances(service_name, group_name, false).await?;
468
469 if instances.is_empty() {
470 return Err(BatataError::ServiceNotFound {
471 service_name: service_name.to_string(),
472 group_name: group_name.to_string(),
473 namespace: self.namespace.clone(),
474 });
475 }
476
477 let service_key = build_service_key(service_name, group_name, &self.namespace);
479
480 self.balancer
481 .select(&service_key, &instances)
482 .ok_or_else(|| BatataError::ServiceNotFound {
483 service_name: service_name.to_string(),
484 group_name: group_name.to_string(),
485 namespace: self.namespace.clone(),
486 })
487 }
488
489 pub async fn get_services_of_server(
491 &self,
492 group_name: &str,
493 page_no: i32,
494 page_size: i32,
495 ) -> Result<(i32, Vec<String>)> {
496 let group_name = if group_name.is_empty() {
497 &self.group_name
498 } else {
499 group_name
500 };
501
502 let request = ServiceListRequest::new(&self.namespace, group_name)
503 .with_page(page_no, page_size);
504
505 let response: ServiceListResponse = self.rpc_client.request(&request).await?;
506
507 Ok((response.count, response.service_names))
508 }
509
510 pub async fn subscribe<L>(
512 &self,
513 service_name: &str,
514 group_name: &str,
515 listener: L,
516 ) -> Result<()>
517 where
518 L: ServiceListener + 'static,
519 {
520 let group_name = if group_name.is_empty() {
521 &self.group_name
522 } else {
523 group_name
524 };
525
526 self.subscribers.subscribe(
528 &self.namespace,
529 group_name,
530 service_name,
531 Arc::new(listener),
532 );
533
534 let request = SubscribeServiceRequest::subscribe(&self.namespace, service_name, group_name);
536
537 let response: SubscribeServiceResponse = self.rpc_client.request(&request).await?;
538
539 self.cache.put(&self.namespace, response.service_info.clone());
541
542 info!(
543 "Subscribed to service: service={}, group={}",
544 service_name, group_name
545 );
546
547 Ok(())
548 }
549
550 pub async fn subscribe_callback<F>(
552 &self,
553 service_name: &str,
554 group_name: &str,
555 callback: F,
556 ) -> Result<()>
557 where
558 F: Fn(ServiceChangeEvent) + Send + Sync + 'static,
559 {
560 self.subscribe(service_name, group_name, CallbackServiceListener::new(callback))
561 .await
562 }
563
564 pub async fn unsubscribe(&self, service_name: &str, group_name: &str) -> Result<()> {
566 let group_name = if group_name.is_empty() {
567 &self.group_name
568 } else {
569 group_name
570 };
571
572 self.subscribers
573 .unsubscribe(&self.namespace, group_name, service_name);
574
575 let request = SubscribeServiceRequest::unsubscribe(&self.namespace, service_name, group_name);
577
578 let _response: SubscribeServiceResponse = self.rpc_client.request(&request).await?;
579
580 info!(
581 "Unsubscribed from service: service={}, group={}",
582 service_name, group_name
583 );
584
585 Ok(())
586 }
587
588 pub async fn get_server_status(&self) -> Result<String> {
590 if self.rpc_client.is_connected() {
591 Ok("UP".to_string())
592 } else {
593 Ok("DOWN".to_string())
594 }
595 }
596
597 async fn heartbeat_loop(
599 instances: Arc<RwLock<Vec<(String, String, Instance)>>>,
600 rpc_client: Arc<RpcClient>,
601 namespace: String,
602 shutdown: Arc<Notify>,
603 ) {
604 let heartbeat_interval = Duration::from_secs(5);
605
606 loop {
607 tokio::select! {
608 _ = shutdown.notified() => {
609 info!("Heartbeat loop shutdown");
610 break;
611 }
612 _ = tokio::time::sleep(heartbeat_interval) => {
613 let registered = instances.read().clone();
614
615 for (service_name, group_name, instance) in registered {
616 if !instance.ephemeral {
617 continue;
618 }
619
620 let request = InstanceRequest::register(
622 &namespace,
623 &service_name,
624 &group_name,
625 instance,
626 );
627
628 if let Err(e) = rpc_client.request::<_, InstanceResponse>(&request).await {
629 warn!(
630 "Heartbeat failed for service={}, group={}: {}",
631 service_name, group_name, e
632 );
633 } else {
634 debug!(
635 "Heartbeat sent for service={}, group={}",
636 service_name, group_name
637 );
638 }
639 }
640 }
641 }
642 }
643 }
644}
645
646impl Drop for NamingService {
647 fn drop(&mut self) {
648 self.shutdown.notify_one();
649 }
650}