// batata_client/naming/naming_service.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use parking_lot::RwLock;
5use tokio::sync::Notify;
6use tokio::task::JoinHandle;
7use tracing::{debug, info, warn};
8
9use crate::api::naming::{
10    Instance, InstanceRequest, InstanceResponse, QueryServiceResponse, ServiceListRequest,
11    ServiceListResponse, ServiceQueryRequest, SubscribeServiceRequest, SubscribeServiceResponse,
12};
13use crate::cache::FileCache;
14use crate::common::{build_service_key, DEFAULT_GROUP};
15use crate::error::{BatataError, Result};
16use crate::naming::{
17    CallbackServiceListener, LoadBalancer, ServiceChangeEvent, ServiceInfoCache, ServiceListener,
18    SubscriberRegistry, WeightedRoundRobinBalancer,
19};
20use crate::remote::RpcClient;
21use crate::CacheConfig;
22
23/// Naming service for service discovery and registration
24pub struct NamingService {
25    /// RPC client for server communication
26    rpc_client: Arc<RpcClient>,
27
28    /// Service information cache
29    cache: Arc<ServiceInfoCache>,
30
31    /// File cache for failover
32    file_cache: Option<Arc<FileCache>>,
33
34    /// Cache configuration
35    cache_config: CacheConfig,
36
37    /// Load balancer for instance selection
38    balancer: Arc<dyn LoadBalancer>,
39
40    /// Subscriber registry
41    subscribers: Arc<SubscriberRegistry>,
42
43    /// Namespace
44    namespace: String,
45
46    /// Group name
47    group_name: String,
48
49    /// Whether the service is started
50    started: Arc<RwLock<bool>>,
51
52    /// Heartbeat task handle
53    heartbeat_task: Arc<RwLock<Option<JoinHandle<()>>>>,
54
55    /// Shutdown notify
56    shutdown: Arc<Notify>,
57
58    /// Registered instances
59    registered_instances: Arc<RwLock<Vec<(String, String, Instance)>>>,
60}
61
62impl NamingService {
63    /// Create a new NamingService with default WRR load balancer
64    pub fn new(rpc_client: Arc<RpcClient>, namespace: &str, cache_config: CacheConfig) -> Self {
65        Self::with_balancer(
66            rpc_client,
67            namespace,
68            cache_config,
69            Arc::new(WeightedRoundRobinBalancer::new()),
70        )
71    }
72
73    /// Create a new NamingService with custom load balancer
74    pub fn with_balancer(
75        rpc_client: Arc<RpcClient>,
76        namespace: &str,
77        cache_config: CacheConfig,
78        balancer: Arc<dyn LoadBalancer>,
79    ) -> Self {
80        // Create file cache if cache directory is configured
81        let file_cache = cache_config
82            .cache_dir
83            .as_ref()
84            .and_then(|dir| FileCache::new(dir).ok())
85            .map(Arc::new);
86
87        Self {
88            rpc_client,
89            cache: Arc::new(ServiceInfoCache::new()),
90            file_cache,
91            cache_config,
92            balancer,
93            subscribers: Arc::new(SubscriberRegistry::new()),
94            namespace: namespace.to_string(),
95            group_name: DEFAULT_GROUP.to_string(),
96            started: Arc::new(RwLock::new(false)),
97            heartbeat_task: Arc::new(RwLock::new(None)),
98            shutdown: Arc::new(Notify::new()),
99            registered_instances: Arc::new(RwLock::new(Vec::new())),
100        }
101    }
102
103    /// Set default group name
104    pub fn with_group(mut self, group_name: &str) -> Self {
105        self.group_name = group_name.to_string();
106        self
107    }
108
109    /// Start the naming service
110    pub async fn start(&self) -> Result<()> {
111        if *self.started.read() {
112            return Err(BatataError::ClientAlreadyStarted);
113        }
114
115        *self.started.write() = true;
116
117        // Start heartbeat task for ephemeral instances
118        let instances = self.registered_instances.clone();
119        let rpc_client = self.rpc_client.clone();
120        let namespace = self.namespace.clone();
121        let shutdown = self.shutdown.clone();
122
123        let handle = tokio::spawn(async move {
124            Self::heartbeat_loop(instances, rpc_client, namespace, shutdown).await;
125        });
126
127        *self.heartbeat_task.write() = Some(handle);
128
129        info!("NamingService started");
130        Ok(())
131    }
132
133    /// Stop the naming service
134    pub async fn stop(&self) {
135        *self.started.write() = false;
136        self.shutdown.notify_one();
137
138        if let Some(handle) = self.heartbeat_task.write().take() {
139            handle.abort();
140        }
141
142        // Deregister all instances
143        let instances = self.registered_instances.read().clone();
144        for (service_name, group_name, instance) in instances {
145            if let Err(e) = self.deregister_instance(&service_name, &group_name, instance).await {
146                warn!("Failed to deregister instance on shutdown: {}", e);
147            }
148        }
149
150        info!("NamingService stopped");
151    }
152
153    /// Register a service instance
154    pub async fn register_instance(
155        &self,
156        service_name: &str,
157        group_name: &str,
158        instance: Instance,
159    ) -> Result<()> {
160        let group_name = if group_name.is_empty() {
161            &self.group_name
162        } else {
163            group_name
164        };
165
166        let mut instance = instance;
167        instance.generate_instance_id();
168
169        let request =
170            InstanceRequest::register(&self.namespace, service_name, group_name, instance.clone());
171
172        let response: InstanceResponse = self.rpc_client.request(&request).await?;
173
174        if !response.response.success {
175            return Err(BatataError::server_error(
176                response.response.error_code,
177                response.response.message,
178            ));
179        }
180
181        // Track registered instance for heartbeat
182        if instance.ephemeral {
183            self.registered_instances.write().push((
184                service_name.to_string(),
185                group_name.to_string(),
186                instance,
187            ));
188        }
189
190        info!(
191            "Registered instance: service={}, group={}",
192            service_name, group_name
193        );
194
195        Ok(())
196    }
197
198    /// Register a service instance with simplified parameters
199    pub async fn register_instance_simple(
200        &self,
201        service_name: &str,
202        ip: &str,
203        port: i32,
204    ) -> Result<()> {
205        let instance = Instance::new(ip, port);
206        self.register_instance(service_name, &self.group_name, instance)
207            .await
208    }
209
210    /// Deregister a service instance
211    pub async fn deregister_instance(
212        &self,
213        service_name: &str,
214        group_name: &str,
215        instance: Instance,
216    ) -> Result<()> {
217        let group_name = if group_name.is_empty() {
218            &self.group_name
219        } else {
220            group_name
221        };
222
223        let request =
224            InstanceRequest::deregister(&self.namespace, service_name, group_name, instance.clone());
225
226        let response: InstanceResponse = self.rpc_client.request(&request).await?;
227
228        if !response.response.success {
229            return Err(BatataError::server_error(
230                response.response.error_code,
231                response.response.message,
232            ));
233        }
234
235        // Remove from registered instances
236        self.registered_instances
237            .write()
238            .retain(|(s, g, i)| !(s == service_name && g == group_name && i.key() == instance.key()));
239
240        info!(
241            "Deregistered instance: service={}, group={}",
242            service_name, group_name
243        );
244
245        Ok(())
246    }
247
248    /// Deregister a service instance with simplified parameters
249    pub async fn deregister_instance_simple(
250        &self,
251        service_name: &str,
252        ip: &str,
253        port: i32,
254    ) -> Result<()> {
255        let instance = Instance::new(ip, port);
256        self.deregister_instance(service_name, &self.group_name, instance)
257            .await
258    }
259
260    /// Update a service instance
261    pub async fn update_instance(
262        &self,
263        service_name: &str,
264        group_name: &str,
265        instance: Instance,
266    ) -> Result<()> {
267        let group_name = if group_name.is_empty() {
268            &self.group_name
269        } else {
270            group_name
271        };
272
273        let request =
274            InstanceRequest::update(&self.namespace, service_name, group_name, instance.clone());
275
276        let response: InstanceResponse = self.rpc_client.request(&request).await?;
277
278        if !response.response.success {
279            return Err(BatataError::server_error(
280                response.response.error_code,
281                response.response.message,
282            ));
283        }
284
285        // Update in registered instances list
286        {
287            let mut registered = self.registered_instances.write();
288            for (s, g, i) in registered.iter_mut() {
289                if s == service_name && g == group_name && i.key() == instance.key() {
290                    *i = instance.clone();
291                    break;
292                }
293            }
294        }
295
296        info!(
297            "Updated instance: service={}, group={}",
298            service_name, group_name
299        );
300
301        Ok(())
302    }
303
304    /// Update a service instance with simplified parameters
305    pub async fn update_instance_simple(
306        &self,
307        service_name: &str,
308        ip: &str,
309        port: i32,
310        weight: f64,
311        enabled: bool,
312    ) -> Result<()> {
313        let instance = Instance::new(ip, port)
314            .with_weight(weight)
315            .with_enabled(enabled);
316        self.update_instance(service_name, &self.group_name, instance)
317            .await
318    }
319
320    /// Get service information with cluster filtering
321    pub async fn get_service(
322        &self,
323        service_name: &str,
324        group_name: &str,
325        clusters: &[String],
326    ) -> Result<crate::api::naming::Service> {
327        let group_name = if group_name.is_empty() {
328            &self.group_name
329        } else {
330            group_name
331        };
332
333        let cluster_str = clusters.join(",");
334        let request = ServiceQueryRequest::new(&self.namespace, service_name, group_name)
335            .with_cluster(&cluster_str);
336
337        let response: QueryServiceResponse = self.rpc_client.request(&request).await?;
338
339        // Update cache
340        self.cache.put(&self.namespace, response.service_info.clone());
341
342        Ok(response.service_info)
343    }
344
345    /// Get all instances of a service
346    pub async fn get_all_instances(
347        &self,
348        service_name: &str,
349        group_name: &str,
350    ) -> Result<Vec<Instance>> {
351        self.select_instances(service_name, group_name, false).await
352    }
353
354    /// Select healthy instances of a service
355    pub async fn select_instances(
356        &self,
357        service_name: &str,
358        group_name: &str,
359        healthy_only: bool,
360    ) -> Result<Vec<Instance>> {
361        let group_name = if group_name.is_empty() {
362            &self.group_name
363        } else {
364            group_name
365        };
366
367        // Try memory cache first
368        if let Some(service) = self.cache.get(&self.namespace, group_name, service_name) {
369            let instances = if healthy_only {
370                service
371                    .hosts
372                    .into_iter()
373                    .filter(|i| i.healthy && i.enabled)
374                    .collect()
375            } else {
376                service.hosts
377            };
378            return Ok(instances);
379        }
380
381        // Fetch from server
382        let request = ServiceQueryRequest::new(&self.namespace, service_name, group_name)
383            .with_healthy_only(healthy_only);
384
385        match self
386            .rpc_client
387            .request::<_, QueryServiceResponse>(&request)
388            .await
389        {
390            Ok(response) => {
391                // Update memory cache
392                self.cache
393                    .put(&self.namespace, response.service_info.clone());
394
395                // Save to file cache for failover
396                if let Some(file_cache) = &self.file_cache {
397                    if let Err(e) = file_cache.save_service(&self.namespace, &response.service_info)
398                    {
399                        warn!("Failed to save service to file cache: {}", e);
400                    }
401                }
402
403                let instances = if healthy_only {
404                    response
405                        .service_info
406                        .hosts
407                        .into_iter()
408                        .filter(|i| i.healthy && i.enabled)
409                        .collect()
410                } else {
411                    response.service_info.hosts
412                };
413
414                Ok(instances)
415            }
416            Err(e) => {
417                // Failover: try file cache
418                if self.cache_config.failover_enabled {
419                    if let Some(file_cache) = &self.file_cache {
420                        if let Some(service) =
421                            file_cache.load_service(&self.namespace, group_name, service_name)
422                        {
423                            warn!(
424                                "Using cached service due to server error: {} (service={}, group={})",
425                                e, service_name, group_name
426                            );
427
428                            // Optionally update memory cache
429                            if self.cache_config.update_cache_when_empty {
430                                self.cache.put(&self.namespace, service.clone());
431                            }
432
433                            let instances = if healthy_only {
434                                service
435                                    .hosts
436                                    .into_iter()
437                                    .filter(|i| i.healthy && i.enabled)
438                                    .collect()
439                            } else {
440                                service.hosts
441                            };
442
443                            return Ok(instances);
444                        }
445                    }
446                }
447                Err(e)
448            }
449        }
450    }
451
452    /// Select one healthy instance using the configured load balancer
453    ///
454    /// By default, uses Weighted Round Robin (WRR) algorithm which distributes
455    /// traffic proportionally to instance weights.
456    pub async fn select_one_healthy_instance(
457        &self,
458        service_name: &str,
459        group_name: &str,
460    ) -> Result<Instance> {
461        let group_name = if group_name.is_empty() {
462            &self.group_name
463        } else {
464            group_name
465        };
466
467        let instances = self.select_instances(service_name, group_name, false).await?;
468
469        if instances.is_empty() {
470            return Err(BatataError::ServiceNotFound {
471                service_name: service_name.to_string(),
472                group_name: group_name.to_string(),
473                namespace: self.namespace.clone(),
474            });
475        }
476
477        // Use load balancer for instance selection
478        let service_key = build_service_key(service_name, group_name, &self.namespace);
479
480        self.balancer
481            .select(&service_key, &instances)
482            .ok_or_else(|| BatataError::ServiceNotFound {
483                service_name: service_name.to_string(),
484                group_name: group_name.to_string(),
485                namespace: self.namespace.clone(),
486            })
487    }
488
489    /// Get list of services
490    pub async fn get_services_of_server(
491        &self,
492        group_name: &str,
493        page_no: i32,
494        page_size: i32,
495    ) -> Result<(i32, Vec<String>)> {
496        let group_name = if group_name.is_empty() {
497            &self.group_name
498        } else {
499            group_name
500        };
501
502        let request = ServiceListRequest::new(&self.namespace, group_name)
503            .with_page(page_no, page_size);
504
505        let response: ServiceListResponse = self.rpc_client.request(&request).await?;
506
507        Ok((response.count, response.service_names))
508    }
509
510    /// Subscribe to service changes
511    pub async fn subscribe<L>(
512        &self,
513        service_name: &str,
514        group_name: &str,
515        listener: L,
516    ) -> Result<()>
517    where
518        L: ServiceListener + 'static,
519    {
520        let group_name = if group_name.is_empty() {
521            &self.group_name
522        } else {
523            group_name
524        };
525
526        // Add listener
527        self.subscribers.subscribe(
528            &self.namespace,
529            group_name,
530            service_name,
531            Arc::new(listener),
532        );
533
534        // Send subscribe request
535        let request = SubscribeServiceRequest::subscribe(&self.namespace, service_name, group_name);
536
537        let response: SubscribeServiceResponse = self.rpc_client.request(&request).await?;
538
539        // Update cache with initial data
540        self.cache.put(&self.namespace, response.service_info.clone());
541
542        info!(
543            "Subscribed to service: service={}, group={}",
544            service_name, group_name
545        );
546
547        Ok(())
548    }
549
550    /// Subscribe with callback
551    pub async fn subscribe_callback<F>(
552        &self,
553        service_name: &str,
554        group_name: &str,
555        callback: F,
556    ) -> Result<()>
557    where
558        F: Fn(ServiceChangeEvent) + Send + Sync + 'static,
559    {
560        self.subscribe(service_name, group_name, CallbackServiceListener::new(callback))
561            .await
562    }
563
564    /// Unsubscribe from service changes
565    pub async fn unsubscribe(&self, service_name: &str, group_name: &str) -> Result<()> {
566        let group_name = if group_name.is_empty() {
567            &self.group_name
568        } else {
569            group_name
570        };
571
572        self.subscribers
573            .unsubscribe(&self.namespace, group_name, service_name);
574
575        // Send unsubscribe request
576        let request = SubscribeServiceRequest::unsubscribe(&self.namespace, service_name, group_name);
577
578        let _response: SubscribeServiceResponse = self.rpc_client.request(&request).await?;
579
580        info!(
581            "Unsubscribed from service: service={}, group={}",
582            service_name, group_name
583        );
584
585        Ok(())
586    }
587
588    /// Get server status
589    pub async fn get_server_status(&self) -> Result<String> {
590        if self.rpc_client.is_connected() {
591            Ok("UP".to_string())
592        } else {
593            Ok("DOWN".to_string())
594        }
595    }
596
597    /// Background heartbeat loop for ephemeral instances
598    async fn heartbeat_loop(
599        instances: Arc<RwLock<Vec<(String, String, Instance)>>>,
600        rpc_client: Arc<RpcClient>,
601        namespace: String,
602        shutdown: Arc<Notify>,
603    ) {
604        let heartbeat_interval = Duration::from_secs(5);
605
606        loop {
607            tokio::select! {
608                _ = shutdown.notified() => {
609                    info!("Heartbeat loop shutdown");
610                    break;
611                }
612                _ = tokio::time::sleep(heartbeat_interval) => {
613                    let registered = instances.read().clone();
614
615                    for (service_name, group_name, instance) in registered {
616                        if !instance.ephemeral {
617                            continue;
618                        }
619
620                        // Re-register to maintain heartbeat
621                        let request = InstanceRequest::register(
622                            &namespace,
623                            &service_name,
624                            &group_name,
625                            instance,
626                        );
627
628                        if let Err(e) = rpc_client.request::<_, InstanceResponse>(&request).await {
629                            warn!(
630                                "Heartbeat failed for service={}, group={}: {}",
631                                service_name, group_name, e
632                            );
633                        } else {
634                            debug!(
635                                "Heartbeat sent for service={}, group={}",
636                                service_name, group_name
637                            );
638                        }
639                    }
640                }
641            }
642        }
643    }
644}
645
646impl Drop for NamingService {
647    fn drop(&mut self) {
648        self.shutdown.notify_one();
649    }
650}