scoutquest_rust/
client.rs

1use crate::error::{Result, ScoutQuestError};
2use crate::models::*;
3use reqwest::{Client as HttpClient, Method};
4use serde_json::Value;
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::{Mutex, RwLock};
8use tokio::time::{interval, sleep};
9use tracing::{debug, error, info, warn};
10use url::Url;
11
12/// The main client for interacting with ScoutQuest Service Discovery.
13///
14/// This client provides methods for service registration, discovery,
15/// and making HTTP calls to discovered services. It handles automatic heartbeats
16/// for registered services and includes retry logic for failed requests.
17///
18/// # Examples
19///
20/// ```rust,no_run
21/// use scoutquest_rust::*;
22///
23/// #[tokio::main]
24/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
25///     let client = ServiceDiscoveryClient::new("http://localhost:8080")?;
26///
27///     // Register a service
28///     client.register_service("my-service", "localhost", 3000, None).await?;
29///
30///     // Discover services
31///     let instance = client.discover_service("other-service", None).await?;
32///
33///     Ok(())
34/// }
35/// ```
36#[derive(Clone)]
37pub struct ServiceDiscoveryClient {
38    discovery_url: String,
39    http_client: HttpClient,
40    registered_instance: Arc<RwLock<Option<ServiceInstance>>>,
41    heartbeat_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
42    retry_attempts: usize,
43    retry_delay: Duration,
44}
45
46impl ServiceDiscoveryClient {
47    /// Creates a new ServiceDiscoveryClient with default configuration.
48    ///
49    /// # Arguments
50    ///
51    /// * `discovery_url` - The base URL of the ScoutQuest discovery server
52    ///
53    /// # Returns
54    ///
55    /// Returns a Result containing the client or an error if the URL is invalid.
56    ///
57    /// # Examples
58    ///
59    /// ```rust,no_run
60    /// use scoutquest_rust::ServiceDiscoveryClient;
61    ///
62    /// let client = ServiceDiscoveryClient::new("http://localhost:8080")?;
63    /// # Ok::<(), Box<dyn std::error::Error>>(())
64    /// ```
65    pub fn new(discovery_url: &str) -> Result<Self> {
66        Self::with_config(
67            discovery_url,
68            Duration::from_secs(30),
69            3,
70            Duration::from_secs(1),
71        )
72    }
73
74    /// Creates a new ServiceDiscoveryClient with custom configuration.
75    ///
76    /// # Arguments
77    ///
78    /// * `discovery_url` - The base URL of the ScoutQuest discovery server
79    /// * `timeout` - HTTP request timeout
80    /// * `retry_attempts` - Number of retry attempts for failed requests
81    /// * `retry_delay` - Base delay between retry attempts
82    ///
83    /// # Returns
84    ///
85    /// Returns a Result containing the client or an error if the URL is invalid.
86    pub fn with_config(
87        discovery_url: &str,
88        timeout: Duration,
89        retry_attempts: usize,
90        retry_delay: Duration,
91    ) -> Result<Self> {
92        let discovery_url = discovery_url.trim_end_matches('/').to_string();
93
94        Url::parse(&discovery_url)?;
95
96        let http_client = HttpClient::builder()
97            .timeout(timeout)
98            .build()
99            .map_err(ScoutQuestError::NetworkError)?;
100
101        Ok(Self {
102            discovery_url,
103            http_client,
104            registered_instance: Arc::new(RwLock::new(None)),
105            heartbeat_handle: Arc::new(Mutex::new(None)),
106            retry_attempts,
107            retry_delay,
108        })
109    }
110
111    /// Registers a service with the ScoutQuest discovery server.
112    ///
113    /// This method registers a service instance and starts automatic heartbeat
114    /// to maintain the registration. Only one service can be registered per client.
115    ///
116    /// # Arguments
117    ///
118    /// * `service_name` - The name of the service to register
119    /// * `host` - The hostname or IP address where the service is running
120    /// * `port` - The port number where the service is listening
121    /// * `options` - Optional registration options (metadata, tags, health check, etc.)
122    ///
123    /// # Returns
124    ///
125    /// Returns the registered ServiceInstance or an error if registration fails.
126    ///
127    /// # Examples
128    ///
129    /// ```rust,no_run
130    /// use scoutquest_rust::*;
131    ///
132    /// # #[tokio::main]
133    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
134    /// let client = ServiceDiscoveryClient::new("http://localhost:8080")?;
135    ///
136    /// let options = ServiceRegistrationOptions::new()
137    ///     .with_tags(vec!["api".to_string(), "v1".to_string()]);
138    ///
139    /// let instance = client.register_service("user-service", "localhost", 3000, Some(options)).await?;
140    /// println!("Registered with ID: {}", instance.id);
141    /// # Ok(())
142    /// # }
143    /// ```
144    pub async fn register_service(
145        &self,
146        service_name: &str,
147        host: &str,
148        port: u16,
149        options: Option<ServiceRegistrationOptions>,
150    ) -> Result<ServiceInstance> {
151        let options = options.unwrap_or_default();
152
153        let request = RegisterServiceRequest {
154            service_name: service_name.to_string(),
155            host: host.to_string(),
156            port,
157            secure: options.secure,
158            metadata: options.metadata,
159            tags: options.tags,
160            health_check: options.health_check,
161        };
162
163        let url = format!("{}/api/services", self.discovery_url);
164
165        let response = self.http_client.post(&url).json(&request).send().await?;
166
167        if response.status().is_success() {
168            let instance: ServiceInstance = response.json().await?;
169
170            {
171                let mut registered = self.registered_instance.write().await;
172                *registered = Some(instance.clone());
173            }
174
175            self.start_heartbeat().await;
176
177            info!(
178                "Service {} registered with ID: {}",
179                service_name, instance.id
180            );
181            Ok(instance)
182        } else {
183            let status = response.status().as_u16();
184            let message = response.text().await.unwrap_or_default();
185            Err(ScoutQuestError::RegistrationFailed { status, message })
186        }
187    }
188
189    /// Discovers a service instance from the ScoutQuest discovery server.
190    ///
191    /// # Arguments
192    ///
193    /// * `service_name` - The name of the service to discover
194    /// * `options` - Discovery options (healthy only, tags, etc.)
195    ///
196    /// # Returns
197    ///
198    /// Returns a ServiceInstance or an error if no instances are available.
199    ///
200    /// # Examples
201    ///
202    /// ```rust,no_run
203    /// use scoutquest_rust::*;
204    ///
205    /// # #[tokio::main]
206    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
207    /// let client = ServiceDiscoveryClient::new("http://localhost:8080")?;
208    ///
209    /// let instance = client.discover_service("user-service", None).await?;
210    /// println!("Found instance: {}:{}", instance.host, instance.port);
211    /// # Ok(())
212    /// # }
213    /// ```
214    pub async fn discover_service(
215        &self,
216        service_name: &str,
217        options: Option<ServiceDiscoveryOptions>,
218    ) -> Result<ServiceInstance> {
219        let options = options.unwrap_or_default();
220
221        let mut url = Url::parse(&format!(
222            "{}/api/discovery/{}",
223            self.discovery_url, service_name
224        ))?;
225
226        {
227            let mut query_pairs = url.query_pairs_mut();
228            query_pairs.append_pair("healthy_only", &options.healthy_only.to_string());
229
230            if let Some(tags) = &options.tags {
231                query_pairs.append_pair("tags", &tags.join(","));
232            }
233
234            if let Some(limit) = options.limit {
235                query_pairs.append_pair("limit", &limit.to_string());
236            }
237        }
238
239        let response = self.http_client.get(url).send().await?;
240
241        if response.status().is_success() {
242            let instance: ServiceInstance = response.json().await?;
243            debug!(
244                "Discovered instance for service {}: {}:{}",
245                service_name, instance.host, instance.port
246            );
247            Ok(instance)
248        } else if response.status().as_u16() == 404 {
249            Err(ScoutQuestError::ServiceNotFound {
250                service_name: service_name.to_string(),
251            })
252        } else {
253            warn!(
254                "Discovery failed for {}: {}",
255                service_name,
256                response.status()
257            );
258            Err(ScoutQuestError::InternalError(format!(
259                "Discovery failed with status: {}",
260                response.status()
261            )))
262        }
263    }
264
265    /// Finds all services that have the specified tag.
266    ///
267    /// # Arguments
268    ///
269    /// * `tag` - The tag to search for
270    ///
271    /// # Returns
272    ///
273    /// Returns a vector of Service objects that have the specified tag.
274    pub async fn get_services_by_tag(&self, tag: &str) -> Result<Vec<Service>> {
275        let url = format!("{}/api/services/tags/{}", self.discovery_url, tag);
276
277        let response = self.http_client.get(&url).send().await?;
278
279        if response.status().is_success() {
280            let services: Vec<Service> = response.json().await?;
281            Ok(services)
282        } else {
283            warn!("Tag search failed for {}: {}", tag, response.status());
284            Ok(Vec::new())
285        }
286    }
287
288    /// Calls a REST API endpoint on a discovered service with retry logic.
289    ///
290    /// # Arguments
291    ///
292    /// * `service_name` - The name of the service to call
293    /// * `path` - The API path to call
294    /// * `method` - The HTTP method to use
295    /// * `body` - Optional request body
296    ///
297    /// # Returns
298    ///
299    /// Returns the deserialized response of type T.
300    pub async fn call_service<T>(
301        &self,
302        service_name: &str,
303        path: &str,
304        method: Method,
305        body: Option<Value>,
306    ) -> Result<T>
307    where
308        T: serde::de::DeserializeOwned,
309    {
310        for attempt in 1..=self.retry_attempts {
311            match self
312                .try_call_service(service_name, path, &method, &body)
313                .await
314            {
315                Ok(response) => {
316                    info!(
317                        "Successful call to {}:{} (attempt {})",
318                        service_name, path, attempt
319                    );
320                    return Ok(response);
321                }
322                Err(e) => {
323                    warn!(
324                        "Attempt {}/{} failed for {}:{}: {}",
325                        attempt, self.retry_attempts, service_name, path, e
326                    );
327
328                    if attempt == self.retry_attempts {
329                        error!(
330                            "Final failure calling {}:{} after {} attempts",
331                            service_name, path, self.retry_attempts
332                        );
333                        return Err(e);
334                    }
335
336                    sleep(self.retry_delay * attempt as u32).await;
337                }
338            }
339        }
340
341        unreachable!()
342    }
343
344    /// Tries to call a service endpoint with the specified parameters.
345    ///
346    /// # Arguments
347    ///
348    /// * `service_name` - The name of the service to call
349    /// * `path` - The API path to call
350    /// * `method` - The HTTP method to use
351    /// * `body` - The request body
352    ///
353    /// # Returns
354    ///
355    /// Returns the deserialized response of type T.
356    async fn try_call_service<T>(
357        &self,
358        service_name: &str,
359        path: &str,
360        method: &Method,
361        body: &Option<Value>,
362    ) -> Result<T>
363    where
364        T: serde::de::DeserializeOwned,
365    {
366        let instance = self.discover_service(service_name, None).await?;
367        let url = instance.get_url(path);
368
369        let mut request_builder = self.http_client.request(method.clone(), &url);
370
371        if let Some(body) = body {
372            request_builder = request_builder.json(body);
373        }
374
375        let response = request_builder.send().await?;
376
377        if response.status().is_success() {
378            let result: T = response.json().await?;
379            Ok(result)
380        } else {
381            Err(ScoutQuestError::InternalError(format!(
382                "HTTP error {}: {}",
383                response.status(),
384                response.text().await.unwrap_or_default()
385            )))
386        }
387    }
388
389    /// Makes an HTTP GET request to a discovered service.
390    ///
391    /// # Arguments
392    ///
393    /// * `service_name` - The name of the service to call
394    /// * `path` - The API path to call
395    ///
396    /// # Returns
397    ///
398    /// Returns the deserialized response of type T.
399    ///
400    /// # Examples
401    ///
402    /// ```rust,no_run
403    /// use scoutquest_rust::*;
404    /// use serde_json::Value;
405    ///
406    /// # #[tokio::main]
407    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
408    /// let client = ServiceDiscoveryClient::new("http://localhost:8080")?;
409    ///
410    /// let response: Value = client.get("user-service", "/api/users").await?;
411    /// # Ok(())
412    /// # }
413    /// ```
414    pub async fn get<T>(&self, service_name: &str, path: &str) -> Result<T>
415    where
416        T: serde::de::DeserializeOwned,
417    {
418        self.call_service(service_name, path, Method::GET, None)
419            .await
420    }
421
422    /// Makes an HTTP POST request to a discovered service.
423    ///
424    /// # Arguments
425    ///
426    /// * `service_name` - The name of the service to call
427    /// * `path` - The API path to call
428    /// * `body` - The JSON body to send
429    ///
430    /// # Returns
431    ///
432    /// Returns the deserialized response of type T.
433    pub async fn post<T>(&self, service_name: &str, path: &str, body: Value) -> Result<T>
434    where
435        T: serde::de::DeserializeOwned,
436    {
437        self.call_service(service_name, path, Method::POST, Some(body))
438            .await
439    }
440
441    /// Makes an HTTP PUT request to a discovered service.
442    ///
443    /// # Arguments
444    ///
445    /// * `service_name` - The name of the service to call
446    /// * `path` - The API path to call
447    /// * `body` - The JSON body to send
448    ///
449    /// # Returns
450    ///
451    /// Returns the deserialized response of type T.
452    pub async fn put<T>(&self, service_name: &str, path: &str, body: Value) -> Result<T>
453    where
454        T: serde::de::DeserializeOwned,
455    {
456        self.call_service(service_name, path, Method::PUT, Some(body))
457            .await
458    }
459
460    /// Makes an HTTP DELETE request to a discovered service.
461    ///
462    /// # Arguments
463    ///
464    /// * `service_name` - The name of the service to call
465    /// * `path` - The API path to call
466    ///
467    /// # Returns
468    ///
469    /// Returns an empty result on success.
470    pub async fn delete(&self, service_name: &str, path: &str) -> Result<()> {
471        let _: Value = self
472            .call_service(service_name, path, Method::DELETE, None)
473            .await?;
474        Ok(())
475    }
476
477    /// Deregisters the currently registered service from the discovery server.
478    ///
479    /// This stops the automatic heartbeat and removes the service registration.
480    /// It's important to call this method before dropping the client to ensure
481    /// clean shutdown.
482    ///
483    /// # Returns
484    ///
485    /// Returns an empty result on success.
486    ///
487    /// # Examples
488    ///
489    /// ```rust,no_run
490    /// use scoutquest_rust::*;
491    ///
492    /// # #[tokio::main]
493    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
494    /// let client = ServiceDiscoveryClient::new("http://localhost:8080")?;
495    /// client.register_service("my-service", "localhost", 3000, None).await?;
496    ///
497    /// // ... do work ...
498    ///
499    /// client.deregister().await?;
500    /// # Ok(())
501    /// # }
502    /// ```
503    pub async fn deregister(&self) -> Result<()> {
504        let instance = {
505            let registered = self.registered_instance.read().await;
506            registered.clone()
507        };
508
509        if let Some(instance) = instance {
510            self.stop_heartbeat().await;
511
512            let url = format!(
513                "{}/api/services/{}/instances/{}",
514                self.discovery_url, instance.service_name, instance.id
515            );
516
517            let response = self.http_client.delete(&url).send().await?;
518
519            if response.status().is_success() {
520                info!("Service {} deregistered", instance.service_name);
521            } else {
522                warn!("Deregistration failed: {}", response.status());
523            }
524
525            {
526                let mut registered = self.registered_instance.write().await;
527                *registered = None;
528            }
529        }
530
531        Ok(())
532    }
533
534    /// Starts the heartbeat mechanism for the registered service instance.
535    ///
536    /// This method initiates a periodic heartbeat signal to the service discovery
537    /// server, indicating that the service instance is still alive and healthy.
538    async fn start_heartbeat(&self) {
539        self.stop_heartbeat().await;
540
541        let discovery_url = self.discovery_url.clone();
542        let http_client = self.http_client.clone();
543        let registered_instance = self.registered_instance.clone();
544
545        let handle = tokio::spawn(async move {
546            let mut interval = interval(Duration::from_secs(30));
547
548            loop {
549                interval.tick().await;
550
551                let instance = {
552                    let registered = registered_instance.read().await;
553                    registered.clone()
554                };
555
556                if let Some(instance) = instance {
557                    let url = format!(
558                        "{}/api/services/{}/instances/{}/heartbeat",
559                        discovery_url, instance.service_name, instance.id
560                    );
561
562                    match http_client.post(&url).send().await {
563                        Ok(response) => {
564                            if !response.status().is_success() {
565                                warn!("Heartbeat failed: {}", response.status());
566                            }
567                        }
568                        Err(e) => {
569                            error!("Error during heartbeat: {}", e);
570                        }
571                    }
572                } else {
573                    break; // No registered instance, stop heartbeat
574                }
575            }
576        });
577
578        {
579            let mut heartbeat_handle = self.heartbeat_handle.lock().await;
580            *heartbeat_handle = Some(handle);
581        }
582    }
583
584    /// Stops the heartbeat mechanism for the registered service instance.
585    ///
586    /// This method stops the periodic heartbeat signal to the service discovery
587    /// server, indicating that the service instance is no longer alive or healthy.
588    async fn stop_heartbeat(&self) {
589        let mut heartbeat_handle = self.heartbeat_handle.lock().await;
590        if let Some(handle) = heartbeat_handle.take() {
591            handle.abort();
592        }
593    }
594
595    /// Retrieves the currently registered service instance.
596    ///
597    /// This method returns a clone of the registered service instance, if it exists.
598    pub async fn get_registered_instance(&self) -> Option<ServiceInstance> {
599        let registered = self.registered_instance.read().await;
600        registered.clone()
601    }
602
603    /// Retrieves the discovery URL for the service.
604    ///
605    /// This method returns the discovery URL for the service.
606    pub fn get_discovery_url(&self) -> &str {
607        &self.discovery_url
608    }
609}
610
611/// Service discovery client for interacting with the ScoutQuest server.
612impl Drop for ServiceDiscoveryClient {
613    /// This method is called when the ServiceDiscoveryClient is dropped.
614    fn drop(&mut self) {
615        if Arc::strong_count(&self.registered_instance) > 1 {
616            warn!("ServiceDiscoveryClient dropped without calling deregister(). Call deregister() before dropping.");
617        }
618    }
619}