scoutquest_rust/client.rs
1use crate::error::{Result, ScoutQuestError};
2use crate::models::*;
3use reqwest::{Client as HttpClient, Method};
4use serde_json::Value;
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::{Mutex, RwLock};
8use tokio::time::{interval, sleep};
9use tracing::{debug, error, info, warn};
10use url::Url;
11
12/// The main client for interacting with ScoutQuest Service Discovery.
13///
14/// This client provides methods for service registration, discovery,
15/// and making HTTP calls to discovered services. It handles automatic heartbeats
16/// for registered services and includes retry logic for failed requests.
17///
18/// # Examples
19///
20/// ```rust,no_run
21/// use scoutquest_rust::*;
22///
23/// #[tokio::main]
24/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
25/// let client = ServiceDiscoveryClient::new("http://localhost:8080")?;
26///
27/// // Register a service
28/// client.register_service("my-service", "localhost", 3000, None).await?;
29///
30/// // Discover services
31/// let instance = client.discover_service("other-service", None).await?;
32///
33/// Ok(())
34/// }
35/// ```
36#[derive(Clone)]
37pub struct ServiceDiscoveryClient {
38 discovery_url: String,
39 http_client: HttpClient,
40 registered_instance: Arc<RwLock<Option<ServiceInstance>>>,
41 heartbeat_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
42 retry_attempts: usize,
43 retry_delay: Duration,
44}
45
46impl ServiceDiscoveryClient {
47 /// Creates a new ServiceDiscoveryClient with default configuration.
48 ///
49 /// # Arguments
50 ///
51 /// * `discovery_url` - The base URL of the ScoutQuest discovery server
52 ///
53 /// # Returns
54 ///
55 /// Returns a Result containing the client or an error if the URL is invalid.
56 ///
57 /// # Examples
58 ///
59 /// ```rust,no_run
60 /// use scoutquest_rust::ServiceDiscoveryClient;
61 ///
62 /// let client = ServiceDiscoveryClient::new("http://localhost:8080")?;
63 /// # Ok::<(), Box<dyn std::error::Error>>(())
64 /// ```
65 pub fn new(discovery_url: &str) -> Result<Self> {
66 Self::with_config(
67 discovery_url,
68 Duration::from_secs(30),
69 3,
70 Duration::from_secs(1),
71 )
72 }
73
74 /// Creates a new ServiceDiscoveryClient with custom configuration.
75 ///
76 /// # Arguments
77 ///
78 /// * `discovery_url` - The base URL of the ScoutQuest discovery server
79 /// * `timeout` - HTTP request timeout
80 /// * `retry_attempts` - Number of retry attempts for failed requests
81 /// * `retry_delay` - Base delay between retry attempts
82 ///
83 /// # Returns
84 ///
85 /// Returns a Result containing the client or an error if the URL is invalid.
86 pub fn with_config(
87 discovery_url: &str,
88 timeout: Duration,
89 retry_attempts: usize,
90 retry_delay: Duration,
91 ) -> Result<Self> {
92 let discovery_url = discovery_url.trim_end_matches('/').to_string();
93
94 Url::parse(&discovery_url)?;
95
96 let http_client = HttpClient::builder()
97 .timeout(timeout)
98 .build()
99 .map_err(ScoutQuestError::NetworkError)?;
100
101 Ok(Self {
102 discovery_url,
103 http_client,
104 registered_instance: Arc::new(RwLock::new(None)),
105 heartbeat_handle: Arc::new(Mutex::new(None)),
106 retry_attempts,
107 retry_delay,
108 })
109 }
110
111 /// Registers a service with the ScoutQuest discovery server.
112 ///
113 /// This method registers a service instance and starts automatic heartbeat
114 /// to maintain the registration. Only one service can be registered per client.
115 ///
116 /// # Arguments
117 ///
118 /// * `service_name` - The name of the service to register
119 /// * `host` - The hostname or IP address where the service is running
120 /// * `port` - The port number where the service is listening
121 /// * `options` - Optional registration options (metadata, tags, health check, etc.)
122 ///
123 /// # Returns
124 ///
125 /// Returns the registered ServiceInstance or an error if registration fails.
126 ///
127 /// # Examples
128 ///
129 /// ```rust,no_run
130 /// use scoutquest_rust::*;
131 ///
132 /// # #[tokio::main]
133 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
134 /// let client = ServiceDiscoveryClient::new("http://localhost:8080")?;
135 ///
136 /// let options = ServiceRegistrationOptions::new()
137 /// .with_tags(vec!["api".to_string(), "v1".to_string()]);
138 ///
139 /// let instance = client.register_service("user-service", "localhost", 3000, Some(options)).await?;
140 /// println!("Registered with ID: {}", instance.id);
141 /// # Ok(())
142 /// # }
143 /// ```
144 pub async fn register_service(
145 &self,
146 service_name: &str,
147 host: &str,
148 port: u16,
149 options: Option<ServiceRegistrationOptions>,
150 ) -> Result<ServiceInstance> {
151 let options = options.unwrap_or_default();
152
153 let request = RegisterServiceRequest {
154 service_name: service_name.to_string(),
155 host: host.to_string(),
156 port,
157 secure: options.secure,
158 metadata: options.metadata,
159 tags: options.tags,
160 health_check: options.health_check,
161 };
162
163 let url = format!("{}/api/services", self.discovery_url);
164
165 let response = self.http_client.post(&url).json(&request).send().await?;
166
167 if response.status().is_success() {
168 let instance: ServiceInstance = response.json().await?;
169
170 {
171 let mut registered = self.registered_instance.write().await;
172 *registered = Some(instance.clone());
173 }
174
175 self.start_heartbeat().await;
176
177 info!(
178 "Service {} registered with ID: {}",
179 service_name, instance.id
180 );
181 Ok(instance)
182 } else {
183 let status = response.status().as_u16();
184 let message = response.text().await.unwrap_or_default();
185 Err(ScoutQuestError::RegistrationFailed { status, message })
186 }
187 }
188
189 /// Discovers a service instance from the ScoutQuest discovery server.
190 ///
191 /// # Arguments
192 ///
193 /// * `service_name` - The name of the service to discover
194 /// * `options` - Discovery options (healthy only, tags, etc.)
195 ///
196 /// # Returns
197 ///
198 /// Returns a ServiceInstance or an error if no instances are available.
199 ///
200 /// # Examples
201 ///
202 /// ```rust,no_run
203 /// use scoutquest_rust::*;
204 ///
205 /// # #[tokio::main]
206 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
207 /// let client = ServiceDiscoveryClient::new("http://localhost:8080")?;
208 ///
209 /// let instance = client.discover_service("user-service", None).await?;
210 /// println!("Found instance: {}:{}", instance.host, instance.port);
211 /// # Ok(())
212 /// # }
213 /// ```
214 pub async fn discover_service(
215 &self,
216 service_name: &str,
217 options: Option<ServiceDiscoveryOptions>,
218 ) -> Result<ServiceInstance> {
219 let options = options.unwrap_or_default();
220
221 let mut url = Url::parse(&format!(
222 "{}/api/discovery/{}",
223 self.discovery_url, service_name
224 ))?;
225
226 {
227 let mut query_pairs = url.query_pairs_mut();
228 query_pairs.append_pair("healthy_only", &options.healthy_only.to_string());
229
230 if let Some(tags) = &options.tags {
231 query_pairs.append_pair("tags", &tags.join(","));
232 }
233
234 if let Some(limit) = options.limit {
235 query_pairs.append_pair("limit", &limit.to_string());
236 }
237 }
238
239 let response = self.http_client.get(url).send().await?;
240
241 if response.status().is_success() {
242 let instance: ServiceInstance = response.json().await?;
243 debug!(
244 "Discovered instance for service {}: {}:{}",
245 service_name, instance.host, instance.port
246 );
247 Ok(instance)
248 } else if response.status().as_u16() == 404 {
249 Err(ScoutQuestError::ServiceNotFound {
250 service_name: service_name.to_string(),
251 })
252 } else {
253 warn!(
254 "Discovery failed for {}: {}",
255 service_name,
256 response.status()
257 );
258 Err(ScoutQuestError::InternalError(format!(
259 "Discovery failed with status: {}",
260 response.status()
261 )))
262 }
263 }
264
265 /// Finds all services that have the specified tag.
266 ///
267 /// # Arguments
268 ///
269 /// * `tag` - The tag to search for
270 ///
271 /// # Returns
272 ///
273 /// Returns a vector of Service objects that have the specified tag.
274 pub async fn get_services_by_tag(&self, tag: &str) -> Result<Vec<Service>> {
275 let url = format!("{}/api/services/tags/{}", self.discovery_url, tag);
276
277 let response = self.http_client.get(&url).send().await?;
278
279 if response.status().is_success() {
280 let services: Vec<Service> = response.json().await?;
281 Ok(services)
282 } else {
283 warn!("Tag search failed for {}: {}", tag, response.status());
284 Ok(Vec::new())
285 }
286 }
287
288 /// Calls a REST API endpoint on a discovered service with retry logic.
289 ///
290 /// # Arguments
291 ///
292 /// * `service_name` - The name of the service to call
293 /// * `path` - The API path to call
294 /// * `method` - The HTTP method to use
295 /// * `body` - Optional request body
296 ///
297 /// # Returns
298 ///
299 /// Returns the deserialized response of type T.
300 pub async fn call_service<T>(
301 &self,
302 service_name: &str,
303 path: &str,
304 method: Method,
305 body: Option<Value>,
306 ) -> Result<T>
307 where
308 T: serde::de::DeserializeOwned,
309 {
310 for attempt in 1..=self.retry_attempts {
311 match self
312 .try_call_service(service_name, path, &method, &body)
313 .await
314 {
315 Ok(response) => {
316 info!(
317 "Successful call to {}:{} (attempt {})",
318 service_name, path, attempt
319 );
320 return Ok(response);
321 }
322 Err(e) => {
323 warn!(
324 "Attempt {}/{} failed for {}:{}: {}",
325 attempt, self.retry_attempts, service_name, path, e
326 );
327
328 if attempt == self.retry_attempts {
329 error!(
330 "Final failure calling {}:{} after {} attempts",
331 service_name, path, self.retry_attempts
332 );
333 return Err(e);
334 }
335
336 sleep(self.retry_delay * attempt as u32).await;
337 }
338 }
339 }
340
341 unreachable!()
342 }
343
344 /// Tries to call a service endpoint with the specified parameters.
345 ///
346 /// # Arguments
347 ///
348 /// * `service_name` - The name of the service to call
349 /// * `path` - The API path to call
350 /// * `method` - The HTTP method to use
351 /// * `body` - The request body
352 ///
353 /// # Returns
354 ///
355 /// Returns the deserialized response of type T.
356 async fn try_call_service<T>(
357 &self,
358 service_name: &str,
359 path: &str,
360 method: &Method,
361 body: &Option<Value>,
362 ) -> Result<T>
363 where
364 T: serde::de::DeserializeOwned,
365 {
366 let instance = self.discover_service(service_name, None).await?;
367 let url = instance.get_url(path);
368
369 let mut request_builder = self.http_client.request(method.clone(), &url);
370
371 if let Some(body) = body {
372 request_builder = request_builder.json(body);
373 }
374
375 let response = request_builder.send().await?;
376
377 if response.status().is_success() {
378 let result: T = response.json().await?;
379 Ok(result)
380 } else {
381 Err(ScoutQuestError::InternalError(format!(
382 "HTTP error {}: {}",
383 response.status(),
384 response.text().await.unwrap_or_default()
385 )))
386 }
387 }
388
389 /// Makes an HTTP GET request to a discovered service.
390 ///
391 /// # Arguments
392 ///
393 /// * `service_name` - The name of the service to call
394 /// * `path` - The API path to call
395 ///
396 /// # Returns
397 ///
398 /// Returns the deserialized response of type T.
399 ///
400 /// # Examples
401 ///
402 /// ```rust,no_run
403 /// use scoutquest_rust::*;
404 /// use serde_json::Value;
405 ///
406 /// # #[tokio::main]
407 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
408 /// let client = ServiceDiscoveryClient::new("http://localhost:8080")?;
409 ///
410 /// let response: Value = client.get("user-service", "/api/users").await?;
411 /// # Ok(())
412 /// # }
413 /// ```
414 pub async fn get<T>(&self, service_name: &str, path: &str) -> Result<T>
415 where
416 T: serde::de::DeserializeOwned,
417 {
418 self.call_service(service_name, path, Method::GET, None)
419 .await
420 }
421
422 /// Makes an HTTP POST request to a discovered service.
423 ///
424 /// # Arguments
425 ///
426 /// * `service_name` - The name of the service to call
427 /// * `path` - The API path to call
428 /// * `body` - The JSON body to send
429 ///
430 /// # Returns
431 ///
432 /// Returns the deserialized response of type T.
433 pub async fn post<T>(&self, service_name: &str, path: &str, body: Value) -> Result<T>
434 where
435 T: serde::de::DeserializeOwned,
436 {
437 self.call_service(service_name, path, Method::POST, Some(body))
438 .await
439 }
440
441 /// Makes an HTTP PUT request to a discovered service.
442 ///
443 /// # Arguments
444 ///
445 /// * `service_name` - The name of the service to call
446 /// * `path` - The API path to call
447 /// * `body` - The JSON body to send
448 ///
449 /// # Returns
450 ///
451 /// Returns the deserialized response of type T.
452 pub async fn put<T>(&self, service_name: &str, path: &str, body: Value) -> Result<T>
453 where
454 T: serde::de::DeserializeOwned,
455 {
456 self.call_service(service_name, path, Method::PUT, Some(body))
457 .await
458 }
459
460 /// Makes an HTTP DELETE request to a discovered service.
461 ///
462 /// # Arguments
463 ///
464 /// * `service_name` - The name of the service to call
465 /// * `path` - The API path to call
466 ///
467 /// # Returns
468 ///
469 /// Returns an empty result on success.
470 pub async fn delete(&self, service_name: &str, path: &str) -> Result<()> {
471 let _: Value = self
472 .call_service(service_name, path, Method::DELETE, None)
473 .await?;
474 Ok(())
475 }
476
477 /// Deregisters the currently registered service from the discovery server.
478 ///
479 /// This stops the automatic heartbeat and removes the service registration.
480 /// It's important to call this method before dropping the client to ensure
481 /// clean shutdown.
482 ///
483 /// # Returns
484 ///
485 /// Returns an empty result on success.
486 ///
487 /// # Examples
488 ///
489 /// ```rust,no_run
490 /// use scoutquest_rust::*;
491 ///
492 /// # #[tokio::main]
493 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
494 /// let client = ServiceDiscoveryClient::new("http://localhost:8080")?;
495 /// client.register_service("my-service", "localhost", 3000, None).await?;
496 ///
497 /// // ... do work ...
498 ///
499 /// client.deregister().await?;
500 /// # Ok(())
501 /// # }
502 /// ```
503 pub async fn deregister(&self) -> Result<()> {
504 let instance = {
505 let registered = self.registered_instance.read().await;
506 registered.clone()
507 };
508
509 if let Some(instance) = instance {
510 self.stop_heartbeat().await;
511
512 let url = format!(
513 "{}/api/services/{}/instances/{}",
514 self.discovery_url, instance.service_name, instance.id
515 );
516
517 let response = self.http_client.delete(&url).send().await?;
518
519 if response.status().is_success() {
520 info!("Service {} deregistered", instance.service_name);
521 } else {
522 warn!("Deregistration failed: {}", response.status());
523 }
524
525 {
526 let mut registered = self.registered_instance.write().await;
527 *registered = None;
528 }
529 }
530
531 Ok(())
532 }
533
534 /// Starts the heartbeat mechanism for the registered service instance.
535 ///
536 /// This method initiates a periodic heartbeat signal to the service discovery
537 /// server, indicating that the service instance is still alive and healthy.
538 async fn start_heartbeat(&self) {
539 self.stop_heartbeat().await;
540
541 let discovery_url = self.discovery_url.clone();
542 let http_client = self.http_client.clone();
543 let registered_instance = self.registered_instance.clone();
544
545 let handle = tokio::spawn(async move {
546 let mut interval = interval(Duration::from_secs(30));
547
548 loop {
549 interval.tick().await;
550
551 let instance = {
552 let registered = registered_instance.read().await;
553 registered.clone()
554 };
555
556 if let Some(instance) = instance {
557 let url = format!(
558 "{}/api/services/{}/instances/{}/heartbeat",
559 discovery_url, instance.service_name, instance.id
560 );
561
562 match http_client.post(&url).send().await {
563 Ok(response) => {
564 if !response.status().is_success() {
565 warn!("Heartbeat failed: {}", response.status());
566 }
567 }
568 Err(e) => {
569 error!("Error during heartbeat: {}", e);
570 }
571 }
572 } else {
573 break; // No registered instance, stop heartbeat
574 }
575 }
576 });
577
578 {
579 let mut heartbeat_handle = self.heartbeat_handle.lock().await;
580 *heartbeat_handle = Some(handle);
581 }
582 }
583
584 /// Stops the heartbeat mechanism for the registered service instance.
585 ///
586 /// This method stops the periodic heartbeat signal to the service discovery
587 /// server, indicating that the service instance is no longer alive or healthy.
588 async fn stop_heartbeat(&self) {
589 let mut heartbeat_handle = self.heartbeat_handle.lock().await;
590 if let Some(handle) = heartbeat_handle.take() {
591 handle.abort();
592 }
593 }
594
595 /// Retrieves the currently registered service instance.
596 ///
597 /// This method returns a clone of the registered service instance, if it exists.
598 pub async fn get_registered_instance(&self) -> Option<ServiceInstance> {
599 let registered = self.registered_instance.read().await;
600 registered.clone()
601 }
602
603 /// Retrieves the discovery URL for the service.
604 ///
605 /// This method returns the discovery URL for the service.
606 pub fn get_discovery_url(&self) -> &str {
607 &self.discovery_url
608 }
609}
610
611/// Service discovery client for interacting with the ScoutQuest server.
612impl Drop for ServiceDiscoveryClient {
613 /// This method is called when the ServiceDiscoveryClient is dropped.
614 fn drop(&mut self) {
615 if Arc::strong_count(&self.registered_instance) > 1 {
616 warn!("ServiceDiscoveryClient dropped without calling deregister(). Call deregister() before dropping.");
617 }
618 }
619}