batata_client/naming/
naming_service.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use parking_lot::RwLock;
5use tokio::sync::Notify;
6use tokio::task::JoinHandle;
7use tracing::{debug, info, warn};
8
9use crate::api::naming::{
10    Instance, InstanceRequest, InstanceResponse, QueryServiceResponse,
11    ServiceListRequest, ServiceListResponse, ServiceQueryRequest, SubscribeServiceRequest,
12    SubscribeServiceResponse,
13};
14use crate::common::DEFAULT_GROUP;
15use crate::error::{BatataError, Result};
16use crate::naming::{
17    CallbackServiceListener, ServiceChangeEvent, ServiceInfoCache, ServiceListener,
18    SubscriberRegistry,
19};
20use crate::remote::RpcClient;
21
22/// Naming service for service discovery and registration
23pub struct NamingService {
24    /// RPC client for server communication
25    rpc_client: Arc<RpcClient>,
26
27    /// Service information cache
28    cache: Arc<ServiceInfoCache>,
29
30    /// Subscriber registry
31    subscribers: Arc<SubscriberRegistry>,
32
33    /// Namespace
34    namespace: String,
35
36    /// Group name
37    group_name: String,
38
39    /// Whether the service is started
40    started: Arc<RwLock<bool>>,
41
42    /// Heartbeat task handle
43    heartbeat_task: Arc<RwLock<Option<JoinHandle<()>>>>,
44
45    /// Shutdown notify
46    shutdown: Arc<Notify>,
47
48    /// Registered instances
49    registered_instances: Arc<RwLock<Vec<(String, String, Instance)>>>,
50}
51
52impl NamingService {
53    /// Create a new NamingService
54    pub fn new(rpc_client: Arc<RpcClient>, namespace: &str) -> Self {
55        Self {
56            rpc_client,
57            cache: Arc::new(ServiceInfoCache::new()),
58            subscribers: Arc::new(SubscriberRegistry::new()),
59            namespace: namespace.to_string(),
60            group_name: DEFAULT_GROUP.to_string(),
61            started: Arc::new(RwLock::new(false)),
62            heartbeat_task: Arc::new(RwLock::new(None)),
63            shutdown: Arc::new(Notify::new()),
64            registered_instances: Arc::new(RwLock::new(Vec::new())),
65        }
66    }
67
68    /// Set default group name
69    pub fn with_group(mut self, group_name: &str) -> Self {
70        self.group_name = group_name.to_string();
71        self
72    }
73
74    /// Start the naming service
75    pub async fn start(&self) -> Result<()> {
76        if *self.started.read() {
77            return Err(BatataError::ClientAlreadyStarted);
78        }
79
80        *self.started.write() = true;
81
82        // Start heartbeat task for ephemeral instances
83        let instances = self.registered_instances.clone();
84        let rpc_client = self.rpc_client.clone();
85        let namespace = self.namespace.clone();
86        let shutdown = self.shutdown.clone();
87
88        let handle = tokio::spawn(async move {
89            Self::heartbeat_loop(instances, rpc_client, namespace, shutdown).await;
90        });
91
92        *self.heartbeat_task.write() = Some(handle);
93
94        info!("NamingService started");
95        Ok(())
96    }
97
98    /// Stop the naming service
99    pub async fn stop(&self) {
100        *self.started.write() = false;
101        self.shutdown.notify_one();
102
103        if let Some(handle) = self.heartbeat_task.write().take() {
104            handle.abort();
105        }
106
107        // Deregister all instances
108        let instances = self.registered_instances.read().clone();
109        for (service_name, group_name, instance) in instances {
110            if let Err(e) = self.deregister_instance(&service_name, &group_name, instance).await {
111                warn!("Failed to deregister instance on shutdown: {}", e);
112            }
113        }
114
115        info!("NamingService stopped");
116    }
117
118    /// Register a service instance
119    pub async fn register_instance(
120        &self,
121        service_name: &str,
122        group_name: &str,
123        instance: Instance,
124    ) -> Result<()> {
125        let group_name = if group_name.is_empty() {
126            &self.group_name
127        } else {
128            group_name
129        };
130
131        let mut instance = instance;
132        instance.generate_instance_id();
133
134        let request =
135            InstanceRequest::register(&self.namespace, service_name, group_name, instance.clone());
136
137        let response: InstanceResponse = self.rpc_client.request(&request).await?;
138
139        if !response.response.success {
140            return Err(BatataError::server_error(
141                response.response.error_code,
142                response.response.message,
143            ));
144        }
145
146        // Track registered instance for heartbeat
147        if instance.ephemeral {
148            self.registered_instances.write().push((
149                service_name.to_string(),
150                group_name.to_string(),
151                instance,
152            ));
153        }
154
155        info!(
156            "Registered instance: service={}, group={}",
157            service_name, group_name
158        );
159
160        Ok(())
161    }
162
163    /// Register a service instance with simplified parameters
164    pub async fn register_instance_simple(
165        &self,
166        service_name: &str,
167        ip: &str,
168        port: i32,
169    ) -> Result<()> {
170        let instance = Instance::new(ip, port);
171        self.register_instance(service_name, &self.group_name, instance)
172            .await
173    }
174
175    /// Deregister a service instance
176    pub async fn deregister_instance(
177        &self,
178        service_name: &str,
179        group_name: &str,
180        instance: Instance,
181    ) -> Result<()> {
182        let group_name = if group_name.is_empty() {
183            &self.group_name
184        } else {
185            group_name
186        };
187
188        let request =
189            InstanceRequest::deregister(&self.namespace, service_name, group_name, instance.clone());
190
191        let response: InstanceResponse = self.rpc_client.request(&request).await?;
192
193        if !response.response.success {
194            return Err(BatataError::server_error(
195                response.response.error_code,
196                response.response.message,
197            ));
198        }
199
200        // Remove from registered instances
201        self.registered_instances
202            .write()
203            .retain(|(s, g, i)| !(s == service_name && g == group_name && i.key() == instance.key()));
204
205        info!(
206            "Deregistered instance: service={}, group={}",
207            service_name, group_name
208        );
209
210        Ok(())
211    }
212
213    /// Deregister a service instance with simplified parameters
214    pub async fn deregister_instance_simple(
215        &self,
216        service_name: &str,
217        ip: &str,
218        port: i32,
219    ) -> Result<()> {
220        let instance = Instance::new(ip, port);
221        self.deregister_instance(service_name, &self.group_name, instance)
222            .await
223    }
224
225    /// Update a service instance
226    pub async fn update_instance(
227        &self,
228        service_name: &str,
229        group_name: &str,
230        instance: Instance,
231    ) -> Result<()> {
232        let group_name = if group_name.is_empty() {
233            &self.group_name
234        } else {
235            group_name
236        };
237
238        let request =
239            InstanceRequest::update(&self.namespace, service_name, group_name, instance.clone());
240
241        let response: InstanceResponse = self.rpc_client.request(&request).await?;
242
243        if !response.response.success {
244            return Err(BatataError::server_error(
245                response.response.error_code,
246                response.response.message,
247            ));
248        }
249
250        // Update in registered instances list
251        {
252            let mut registered = self.registered_instances.write();
253            for (s, g, i) in registered.iter_mut() {
254                if s == service_name && g == group_name && i.key() == instance.key() {
255                    *i = instance.clone();
256                    break;
257                }
258            }
259        }
260
261        info!(
262            "Updated instance: service={}, group={}",
263            service_name, group_name
264        );
265
266        Ok(())
267    }
268
269    /// Update a service instance with simplified parameters
270    pub async fn update_instance_simple(
271        &self,
272        service_name: &str,
273        ip: &str,
274        port: i32,
275        weight: f64,
276        enabled: bool,
277    ) -> Result<()> {
278        let instance = Instance::new(ip, port)
279            .with_weight(weight)
280            .with_enabled(enabled);
281        self.update_instance(service_name, &self.group_name, instance)
282            .await
283    }
284
285    /// Get service information with cluster filtering
286    pub async fn get_service(
287        &self,
288        service_name: &str,
289        group_name: &str,
290        clusters: &[String],
291    ) -> Result<crate::api::naming::Service> {
292        let group_name = if group_name.is_empty() {
293            &self.group_name
294        } else {
295            group_name
296        };
297
298        let cluster_str = clusters.join(",");
299        let request = ServiceQueryRequest::new(&self.namespace, service_name, group_name)
300            .with_cluster(&cluster_str);
301
302        let response: QueryServiceResponse = self.rpc_client.request(&request).await?;
303
304        // Update cache
305        self.cache.put(&self.namespace, response.service_info.clone());
306
307        Ok(response.service_info)
308    }
309
310    /// Get all instances of a service
311    pub async fn get_all_instances(
312        &self,
313        service_name: &str,
314        group_name: &str,
315    ) -> Result<Vec<Instance>> {
316        self.select_instances(service_name, group_name, false).await
317    }
318
319    /// Select healthy instances of a service
320    pub async fn select_instances(
321        &self,
322        service_name: &str,
323        group_name: &str,
324        healthy_only: bool,
325    ) -> Result<Vec<Instance>> {
326        let group_name = if group_name.is_empty() {
327            &self.group_name
328        } else {
329            group_name
330        };
331
332        // Try cache first
333        if let Some(service) = self.cache.get(&self.namespace, group_name, service_name) {
334            let instances = if healthy_only {
335                service
336                    .hosts
337                    .into_iter()
338                    .filter(|i| i.healthy && i.enabled)
339                    .collect()
340            } else {
341                service.hosts
342            };
343            return Ok(instances);
344        }
345
346        // Fetch from server
347        let request =
348            ServiceQueryRequest::new(&self.namespace, service_name, group_name)
349                .with_healthy_only(healthy_only);
350
351        let response: QueryServiceResponse = self.rpc_client.request(&request).await?;
352
353        // Update cache
354        self.cache.put(&self.namespace, response.service_info.clone());
355
356        let instances = if healthy_only {
357            response
358                .service_info
359                .hosts
360                .into_iter()
361                .filter(|i| i.healthy && i.enabled)
362                .collect()
363        } else {
364            response.service_info.hosts
365        };
366
367        Ok(instances)
368    }
369
370    /// Select one healthy instance randomly
371    pub async fn select_one_healthy_instance(
372        &self,
373        service_name: &str,
374        group_name: &str,
375    ) -> Result<Instance> {
376        let instances = self.select_instances(service_name, group_name, true).await?;
377
378        if instances.is_empty() {
379            return Err(BatataError::ServiceNotFound {
380                service_name: service_name.to_string(),
381                group_name: group_name.to_string(),
382                namespace: self.namespace.clone(),
383            });
384        }
385
386        // Simple random selection
387        let index = rand_index(instances.len());
388        Ok(instances[index].clone())
389    }
390
391    /// Get list of services
392    pub async fn get_services_of_server(
393        &self,
394        group_name: &str,
395        page_no: i32,
396        page_size: i32,
397    ) -> Result<(i32, Vec<String>)> {
398        let group_name = if group_name.is_empty() {
399            &self.group_name
400        } else {
401            group_name
402        };
403
404        let request = ServiceListRequest::new(&self.namespace, group_name)
405            .with_page(page_no, page_size);
406
407        let response: ServiceListResponse = self.rpc_client.request(&request).await?;
408
409        Ok((response.count, response.service_names))
410    }
411
412    /// Subscribe to service changes
413    pub async fn subscribe<L>(
414        &self,
415        service_name: &str,
416        group_name: &str,
417        listener: L,
418    ) -> Result<()>
419    where
420        L: ServiceListener + 'static,
421    {
422        let group_name = if group_name.is_empty() {
423            &self.group_name
424        } else {
425            group_name
426        };
427
428        // Add listener
429        self.subscribers.subscribe(
430            &self.namespace,
431            group_name,
432            service_name,
433            Arc::new(listener),
434        );
435
436        // Send subscribe request
437        let request = SubscribeServiceRequest::subscribe(&self.namespace, service_name, group_name);
438
439        let response: SubscribeServiceResponse = self.rpc_client.request(&request).await?;
440
441        // Update cache with initial data
442        self.cache.put(&self.namespace, response.service_info.clone());
443
444        info!(
445            "Subscribed to service: service={}, group={}",
446            service_name, group_name
447        );
448
449        Ok(())
450    }
451
452    /// Subscribe with callback
453    pub async fn subscribe_callback<F>(
454        &self,
455        service_name: &str,
456        group_name: &str,
457        callback: F,
458    ) -> Result<()>
459    where
460        F: Fn(ServiceChangeEvent) + Send + Sync + 'static,
461    {
462        self.subscribe(service_name, group_name, CallbackServiceListener::new(callback))
463            .await
464    }
465
466    /// Unsubscribe from service changes
467    pub async fn unsubscribe(&self, service_name: &str, group_name: &str) -> Result<()> {
468        let group_name = if group_name.is_empty() {
469            &self.group_name
470        } else {
471            group_name
472        };
473
474        self.subscribers
475            .unsubscribe(&self.namespace, group_name, service_name);
476
477        // Send unsubscribe request
478        let request = SubscribeServiceRequest::unsubscribe(&self.namespace, service_name, group_name);
479
480        let _response: SubscribeServiceResponse = self.rpc_client.request(&request).await?;
481
482        info!(
483            "Unsubscribed from service: service={}, group={}",
484            service_name, group_name
485        );
486
487        Ok(())
488    }
489
490    /// Get server status
491    pub async fn get_server_status(&self) -> Result<String> {
492        if self.rpc_client.is_connected() {
493            Ok("UP".to_string())
494        } else {
495            Ok("DOWN".to_string())
496        }
497    }
498
499    /// Background heartbeat loop for ephemeral instances
500    async fn heartbeat_loop(
501        instances: Arc<RwLock<Vec<(String, String, Instance)>>>,
502        rpc_client: Arc<RpcClient>,
503        namespace: String,
504        shutdown: Arc<Notify>,
505    ) {
506        let heartbeat_interval = Duration::from_secs(5);
507
508        loop {
509            tokio::select! {
510                _ = shutdown.notified() => {
511                    info!("Heartbeat loop shutdown");
512                    break;
513                }
514                _ = tokio::time::sleep(heartbeat_interval) => {
515                    let registered = instances.read().clone();
516
517                    for (service_name, group_name, instance) in registered {
518                        if !instance.ephemeral {
519                            continue;
520                        }
521
522                        // Re-register to maintain heartbeat
523                        let request = InstanceRequest::register(
524                            &namespace,
525                            &service_name,
526                            &group_name,
527                            instance,
528                        );
529
530                        if let Err(e) = rpc_client.request::<_, InstanceResponse>(&request).await {
531                            warn!(
532                                "Heartbeat failed for service={}, group={}: {}",
533                                service_name, group_name, e
534                            );
535                        } else {
536                            debug!(
537                                "Heartbeat sent for service={}, group={}",
538                                service_name, group_name
539                            );
540                        }
541                    }
542                }
543            }
544        }
545    }
546}
547
548impl Drop for NamingService {
549    fn drop(&mut self) {
550        self.shutdown.notify_one();
551    }
552}
553
/// Returns a pseudo-random index in `0..max`, or `0` when `max` is `0`.
///
/// Not cryptographically secure; intended only for picking one element out of
/// a small list.
fn rand_index(max: usize) -> usize {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};
    use std::time::{SystemTime, UNIX_EPOCH};

    // `x % 0` panics, so an empty range maps to index 0 instead.
    if max == 0 {
        return 0;
    }

    // A clock set before the Unix epoch must not panic; fall back to 0.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.subsec_nanos())
        .unwrap_or(0);

    // Raw `nanos % max` is biased wherever the clock is coarser than 1 ns: the
    // low digits are then always zero and small `max` values keep yielding 0.
    // Mixing through a randomly keyed std hasher spreads the value evenly.
    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u32(nanos);

    // The remainder is strictly below `max`, so it always fits in `usize`.
    (hasher.finish() % (max as u64)) as usize
}