go_server_rust_sdk/scheduler/
client.rs

1//! Scheduler client implementation
2
3use reqwest::Client as HttpClient;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::time::Duration;
7use tokio::time::{sleep, timeout};
8use crate::crypto::{encrypt_data, salt_key, decrypt_data};
9use crate::error::{Result, SdkError};
10use super::{TASK_STATUS_ERROR, TASK_STATUS_DONE, TASK_STATUS_PENDING, TASK_STATUS_PROCESSING};
11
12/// Scheduler client for executing tasks
13#[derive(Clone)]
14pub struct Client {
15    base_url: String,
16    http_client: HttpClient,
17}
18
19/// Task execution request
20#[derive(Serialize, Debug)]
21pub struct ExecuteRequest {
22    pub method: String,
23    pub params: Value,
24}
25
26/// Encrypted task execution request
27#[derive(Serialize, Debug)]
28pub struct ExecuteEncryptedRequest {
29    pub method: String,
30    pub params: String,
31    pub key: String,
32    pub crypto: String,
33}
34
35/// Task result response
36#[derive(Deserialize, Debug, Clone)]
37pub struct ResultResponse {
38    #[serde(rename = "taskId")]
39    pub task_id: String,
40    pub status: String,
41    pub result: Option<Value>,
42}
43
44impl std::fmt::Display for ResultResponse {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        write!(f, "Task {} (status: {})", self.task_id, self.status)
47    }
48}
49
50impl Client {
51    /// Creates a new scheduler client
52    /// 
53    /// # Arguments
54    /// 
55    /// * `base_url` - The base URL of the scheduler server
56    /// 
57    /// # Example
58    /// 
59    /// ```rust
60    /// use go_server_rust_sdk::scheduler::Client;
61    /// 
62    /// let client = Client::new("http://localhost:8080");
63    /// ```
64    pub fn new(base_url: impl Into<String>) -> Self {
65        Self {
66            base_url: base_url.into(),
67            http_client: HttpClient::builder()
68                .timeout(Duration::from_secs(30))
69                .build()
70                .expect("Failed to create HTTP client"),
71        }
72    }
73
74    /// Executes a task with the given method and parameters
75    /// 
76    /// # Arguments
77    /// 
78    /// * `method` - The method name to execute
79    /// * `params` - The parameters for the method
80    /// 
81    /// # Returns
82    /// 
83    /// A `ResultResponse` containing the task ID and initial status
84    /// 
85    /// # Example
86    /// 
87    /// ```rust,no_run
88    /// use go_server_rust_sdk::scheduler::Client;
89    /// use serde_json::json;
90    /// 
91    /// # #[tokio::main]
92    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
93    /// let client = Client::new("http://localhost:8080");
94    /// let params = json!({"a": 10, "b": 20});
95    /// let response = client.execute("add", params).await?;
96    /// println!("Task ID: {}", response.task_id);
97    /// # Ok(())
98    /// # }
99    /// ```
100    pub async fn execute(&self, method: impl Into<String>, params: Value) -> Result<ResultResponse> {
101        let request = ExecuteRequest {
102            method: method.into(),
103            params,
104        };
105
106        let url = format!("{}/api/execute", self.base_url);
107        let response = self.http_client
108            .post(&url)
109            .json(&request)
110            .send()
111            .await?
112            .error_for_status()?;
113
114        let result: ResultResponse = response.json().await?;
115        Ok(result)
116    }
117
118    /// Executes an encrypted task with the given method, key, salt and parameters
119    /// 
120    /// # Arguments
121    /// 
122    /// * `method` - The method name to execute
123    /// * `key` - The encryption key
124    /// * `salt` - The salt value for key encryption
125    /// * `params` - The parameters for the method
126    /// 
127    /// # Returns
128    /// 
129    /// A `ResultResponse` containing the task ID and initial status
130    /// 
131    /// # Example
132    /// 
133    /// ```rust,no_run
134    /// use go_server_rust_sdk::scheduler::Client;
135    /// use serde_json::json;
136    /// 
137    /// # #[tokio::main]
138    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
139    /// let client = Client::new("http://localhost:8080");
140    /// let params = json!({"a": 10, "b": 20});
141    /// let response = client.execute_encrypted("add", "my-secret-key", 123456, params).await?;
142    /// println!("Task ID: {}", response.task_id);
143    /// # Ok(())
144    /// # }
145    /// ```
146    pub async fn execute_encrypted(
147        &self,
148        method: impl Into<String>,
149        key: &str,
150        salt: i32,
151        params: Value,
152    ) -> Result<ResultResponse> {
153        // Encrypt parameters
154        let encrypted_params = encrypt_data(&params, key)?;
155
156        // Salt the key
157        let salted_key = salt_key(key, salt)?;
158
159        let request = ExecuteEncryptedRequest {
160            method: method.into(),
161            params: encrypted_params,
162            key: salted_key,
163            crypto: salt.to_string(),
164        };
165
166        let url = format!("{}/api/encrypted/execute", self.base_url);
167        let response = self.http_client
168            .post(&url)
169            .json(&request)
170            .send()
171            .await?
172            .error_for_status()?;
173
174        let result: ResultResponse = response.json().await?;
175        Ok(result)
176    }
177
178    /// Retrieves the result of a task by its ID with polling
179    /// 
180    /// This method will automatically poll the server until the task is complete
181    /// or an error occurs.
182    /// 
183    /// # Arguments
184    /// 
185    /// * `task_id` - The ID of the task to retrieve
186    /// 
187    /// # Returns
188    /// 
189    /// A `ResultResponse` containing the final task result
190    /// 
191    /// # Example
192    /// 
193    /// ```rust,no_run
194    /// use go_server_rust_sdk::scheduler::Client;
195    /// 
196    /// # #[tokio::main]
197    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
198    /// let client = Client::new("http://localhost:8080");
199    /// let result = client.get_result("task-123").await?;
200    /// println!("Result: {:?}", result.result);
201    /// # Ok(())
202    /// # }
203    /// ```
204    pub async fn get_result(&self, task_id: &str) -> Result<ResultResponse> {
205        loop {
206            let url = format!("{}/api/result/{}", self.base_url, task_id);
207            let response = self.http_client
208                .get(&url)
209                .send()
210                .await?
211                .error_for_status()?;
212
213            let result: ResultResponse = response.json().await?;
214
215            match result.status.as_str() {
216                TASK_STATUS_PENDING | TASK_STATUS_PROCESSING => {
217                    sleep(Duration::from_secs(1)).await;
218                    continue;
219                }
220                TASK_STATUS_ERROR => {
221                    let error_msg = result.result
222                        .as_ref()
223                        .and_then(|v| v.as_str())
224                        .unwrap_or("Unknown error");
225                    return Err(SdkError::Task(error_msg.to_string()));
226                }
227                _ => return Ok(result),
228            }
229        }
230    }
231
232    /// Retrieves and decrypts the result of an encrypted task by its ID
233    /// 
234    /// This method will automatically poll the server until the task is complete,
235    /// then decrypt the result using the provided key.
236    /// 
237    /// # Arguments
238    /// 
239    /// * `task_id` - The ID of the task to retrieve
240    /// * `key` - The decryption key
241    /// * `salt` - The salt value used for encryption
242    /// 
243    /// # Returns
244    /// 
245    /// A `ResultResponse` containing the decrypted task result
246    /// 
247    /// # Example
248    /// 
249    /// ```rust,no_run
250    /// use go_server_rust_sdk::scheduler::Client;
251    /// 
252    /// # #[tokio::main]
253    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
254    /// let client = Client::new("http://localhost:8080");
255    /// let result = client.get_result_encrypted("task-123", "my-secret-key", 123456).await?;
256    /// println!("Decrypted result: {:?}", result.result);
257    /// # Ok(())
258    /// # }
259    /// ```
260    pub async fn get_result_encrypted(
261        &self,
262        task_id: &str,
263        key: &str,
264        _salt: i32,
265    ) -> Result<ResultResponse> {
266        loop {
267            let url = format!("{}/api/encrypted/result/{}", self.base_url, task_id);
268            let response = self.http_client
269                .get(&url)
270                .send()
271                .await?
272                .error_for_status()?;
273
274            let mut result: ResultResponse = response.json().await?;
275
276            match result.status.as_str() {
277                TASK_STATUS_PENDING | TASK_STATUS_PROCESSING => {
278                    sleep(Duration::from_secs(1)).await;
279                    continue;
280                }
281                TASK_STATUS_ERROR => {
282                    let error_msg = result.result
283                        .as_ref()
284                        .and_then(|v| v.as_str())
285                        .unwrap_or("Unknown error");
286                    return Err(SdkError::Task(error_msg.to_string()));
287                }
288                TASK_STATUS_DONE => {
289                    // Decrypt result data if present
290                    if let Some(encrypted_result) = &result.result {
291                        if let Some(encrypted_str) = encrypted_result.as_str() {
292                            let decrypted_result = decrypt_data(encrypted_str, key)?;
293                            result.result = Some(decrypted_result);
294                        }
295                    }
296                    return Ok(result);
297                }
298                _ => return Ok(result),
299            }
300        }
301    }
302
303    /// Executes a task synchronously with polling and timeout
304    /// 
305    /// This is a convenience method that combines `execute` and `get_result`
306    /// with a timeout.
307    /// 
308    /// # Arguments
309    /// 
310    /// * `method` - The method name to execute
311    /// * `params` - The parameters for the method
312    /// * `timeout_duration` - Maximum time to wait for completion
313    /// 
314    /// # Returns
315    /// 
316    /// A `ResultResponse` containing the final task result
317    /// 
318    /// # Example
319    /// 
320    /// ```rust,no_run
321    /// use go_server_rust_sdk::scheduler::Client;
322    /// use serde_json::json;
323    /// use std::time::Duration;
324    /// 
325    /// # #[tokio::main]
326    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
327    /// let client = Client::new("http://localhost:8080");
328    /// let params = json!({"a": 10, "b": 20});
329    /// let result = client.execute_sync("add", params, Duration::from_secs(30)).await?;
330    /// println!("Result: {:?}", result.result);
331    /// # Ok(())
332    /// # }
333    /// ```
334    pub async fn execute_sync(
335        &self,
336        method: impl Into<String>,
337        params: Value,
338        timeout_duration: Duration,
339    ) -> Result<ResultResponse> {
340        // Submit task
341        let exec_response = self.execute(method, params).await?;
342
343        // Poll for result with timeout
344        let result = timeout(timeout_duration, self.get_result(&exec_response.task_id)).await
345            .map_err(|_| SdkError::Timeout)??;
346
347        Ok(result)
348    }
349
350    /// Executes an encrypted task synchronously with polling, decryption and timeout
351    /// 
352    /// This is a convenience method that combines `execute_encrypted` and 
353    /// `get_result_encrypted` with a timeout.
354    /// 
355    /// # Arguments
356    /// 
357    /// * `method` - The method name to execute
358    /// * `key` - The encryption key
359    /// * `salt` - The salt value for key encryption
360    /// * `params` - The parameters for the method
361    /// * `timeout_duration` - Maximum time to wait for completion
362    /// 
363    /// # Returns
364    /// 
365    /// A `ResultResponse` containing the decrypted task result
366    /// 
367    /// # Example
368    /// 
369    /// ```rust,no_run
370    /// use go_server_rust_sdk::scheduler::Client;
371    /// use serde_json::json;
372    /// use std::time::Duration;
373    /// 
374    /// # #[tokio::main]
375    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
376    /// let client = Client::new("http://localhost:8080");
377    /// let params = json!({"a": 10, "b": 20});
378    /// let result = client.execute_sync_encrypted(
379    ///     "add", 
380    ///     "my-secret-key", 
381    ///     123456, 
382    ///     params, 
383    ///     Duration::from_secs(30)
384    /// ).await?;
385    /// println!("Decrypted result: {:?}", result.result);
386    /// # Ok(())
387    /// # }
388    /// ```
389    pub async fn execute_sync_encrypted(
390        &self,
391        method: impl Into<String>,
392        key: &str,
393        salt: i32,
394        params: Value,
395        timeout_duration: Duration,
396    ) -> Result<ResultResponse> {
397        // Submit encrypted task
398        let exec_response = self.execute_encrypted(method, key, salt, params).await?;
399
400        // Poll for result with timeout and decryption
401        let result = timeout(
402            timeout_duration,
403            self.get_result_encrypted(&exec_response.task_id, key, salt),
404        )
405        .await
406        .map_err(|_| SdkError::Timeout)??;
407
408        Ok(result)
409    }
410}
411
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// The client keeps the base URL it was constructed with.
    #[test]
    fn test_client_creation() {
        let expected = "http://localhost:8080";
        let client = Client::new(expected);
        assert_eq!(client.base_url, expected);
    }

    /// Both the method name and the parameter value appear in the JSON body.
    #[test]
    fn test_execute_request_serialization() {
        let method = String::from("test_method");
        let params = json!({"key": "value"});
        let serialized = serde_json::to_string(&ExecuteRequest { method, params }).unwrap();

        for needle in ["test_method", "value"] {
            assert!(serialized.contains(needle));
        }
    }

    /// A response using the `taskId` wire name decodes into all three fields.
    #[test]
    fn test_result_response_deserialization() {
        let json_str = r#"{
            "taskId": "123",
            "status": "done",
            "result": {"answer": 42}
        }"#;

        let ResultResponse {
            task_id,
            status,
            result,
        } = serde_json::from_str(json_str).unwrap();

        assert_eq!(task_id, "123");
        assert_eq!(status, "done");
        assert!(result.is_some());
    }
}
448}