go_server_rust_sdk/scheduler/client.rs
1//! Scheduler client implementation
2
3use reqwest::Client as HttpClient;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::time::Duration;
7use tokio::time::{sleep, timeout};
8use crate::crypto::{encrypt_data, salt_key, decrypt_data};
9use crate::error::{Result, SdkError};
10use super::{TASK_STATUS_ERROR, TASK_STATUS_DONE, TASK_STATUS_PENDING, TASK_STATUS_PROCESSING};
11
12/// Scheduler client for executing tasks
13#[derive(Clone)]
14pub struct Client {
15 base_url: String,
16 http_client: HttpClient,
17}
18
/// Request body for submitting a plain (unencrypted) task.
///
/// [`Client::execute`] serializes this as JSON and POSTs it to `/api/execute`.
#[derive(Serialize, Debug)]
pub struct ExecuteRequest {
    /// Name of the method to execute.
    pub method: String,
    /// JSON parameters for the method, sent as-is.
    pub params: Value,
}
25
/// Request body for submitting an encrypted task.
///
/// [`Client::execute_encrypted`] builds this, serializes it as JSON and POSTs
/// it to `/api/encrypted/execute`.
#[derive(Serialize, Debug)]
pub struct ExecuteEncryptedRequest {
    /// Name of the method to execute.
    pub method: String,
    /// Parameters after encryption (the output of `encrypt_data`).
    pub params: String,
    /// The caller's key after salting (the output of `salt_key`).
    pub key: String,
    /// The salt value rendered as a decimal string.
    pub crypto: String,
}
34
/// Task status payload returned by the scheduler server.
///
/// Deserialized from the JSON response body of the execute and result endpoints.
#[derive(Deserialize, Debug, Clone)]
pub struct ResultResponse {
    /// Task identifier; read from the `taskId` JSON key.
    #[serde(rename = "taskId")]
    pub task_id: String,
    /// Status string reported by the server; the polling methods compare it
    /// against the `TASK_STATUS_*` constants.
    pub status: String,
    /// Optional JSON payload attached to the task (a result or an error message).
    pub result: Option<Value>,
}
43
44impl std::fmt::Display for ResultResponse {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 write!(f, "Task {} (status: {})", self.task_id, self.status)
47 }
48}
49
50impl Client {
51 /// Creates a new scheduler client
52 ///
53 /// # Arguments
54 ///
55 /// * `base_url` - The base URL of the scheduler server
56 ///
57 /// # Example
58 ///
59 /// ```rust
60 /// use go_server_rust_sdk::scheduler::Client;
61 ///
62 /// let client = Client::new("http://localhost:8080");
63 /// ```
64 pub fn new(base_url: impl Into<String>) -> Self {
65 Self {
66 base_url: base_url.into(),
67 http_client: HttpClient::builder()
68 .timeout(Duration::from_secs(30))
69 .build()
70 .expect("Failed to create HTTP client"),
71 }
72 }
73
74 /// Executes a task with the given method and parameters
75 ///
76 /// # Arguments
77 ///
78 /// * `method` - The method name to execute
79 /// * `params` - The parameters for the method
80 ///
81 /// # Returns
82 ///
83 /// A `ResultResponse` containing the task ID and initial status
84 ///
85 /// # Example
86 ///
87 /// ```rust,no_run
88 /// use go_server_rust_sdk::scheduler::Client;
89 /// use serde_json::json;
90 ///
91 /// # #[tokio::main]
92 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
93 /// let client = Client::new("http://localhost:8080");
94 /// let params = json!({"a": 10, "b": 20});
95 /// let response = client.execute("add", params).await?;
96 /// println!("Task ID: {}", response.task_id);
97 /// # Ok(())
98 /// # }
99 /// ```
100 pub async fn execute(&self, method: impl Into<String>, params: Value) -> Result<ResultResponse> {
101 let request = ExecuteRequest {
102 method: method.into(),
103 params,
104 };
105
106 let url = format!("{}/api/execute", self.base_url);
107 let response = self.http_client
108 .post(&url)
109 .json(&request)
110 .send()
111 .await?
112 .error_for_status()?;
113
114 let result: ResultResponse = response.json().await?;
115 Ok(result)
116 }
117
118 /// Executes an encrypted task with the given method, key, salt and parameters
119 ///
120 /// # Arguments
121 ///
122 /// * `method` - The method name to execute
123 /// * `key` - The encryption key
124 /// * `salt` - The salt value for key encryption
125 /// * `params` - The parameters for the method
126 ///
127 /// # Returns
128 ///
129 /// A `ResultResponse` containing the task ID and initial status
130 ///
131 /// # Example
132 ///
133 /// ```rust,no_run
134 /// use go_server_rust_sdk::scheduler::Client;
135 /// use serde_json::json;
136 ///
137 /// # #[tokio::main]
138 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
139 /// let client = Client::new("http://localhost:8080");
140 /// let params = json!({"a": 10, "b": 20});
141 /// let response = client.execute_encrypted("add", "my-secret-key", 123456, params).await?;
142 /// println!("Task ID: {}", response.task_id);
143 /// # Ok(())
144 /// # }
145 /// ```
146 pub async fn execute_encrypted(
147 &self,
148 method: impl Into<String>,
149 key: &str,
150 salt: i32,
151 params: Value,
152 ) -> Result<ResultResponse> {
153 // Encrypt parameters
154 let encrypted_params = encrypt_data(¶ms, key)?;
155
156 // Salt the key
157 let salted_key = salt_key(key, salt)?;
158
159 let request = ExecuteEncryptedRequest {
160 method: method.into(),
161 params: encrypted_params,
162 key: salted_key,
163 crypto: salt.to_string(),
164 };
165
166 let url = format!("{}/api/encrypted/execute", self.base_url);
167 let response = self.http_client
168 .post(&url)
169 .json(&request)
170 .send()
171 .await?
172 .error_for_status()?;
173
174 let result: ResultResponse = response.json().await?;
175 Ok(result)
176 }
177
178 /// Retrieves the result of a task by its ID with polling
179 ///
180 /// This method will automatically poll the server until the task is complete
181 /// or an error occurs.
182 ///
183 /// # Arguments
184 ///
185 /// * `task_id` - The ID of the task to retrieve
186 ///
187 /// # Returns
188 ///
189 /// A `ResultResponse` containing the final task result
190 ///
191 /// # Example
192 ///
193 /// ```rust,no_run
194 /// use go_server_rust_sdk::scheduler::Client;
195 ///
196 /// # #[tokio::main]
197 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
198 /// let client = Client::new("http://localhost:8080");
199 /// let result = client.get_result("task-123").await?;
200 /// println!("Result: {:?}", result.result);
201 /// # Ok(())
202 /// # }
203 /// ```
204 pub async fn get_result(&self, task_id: &str) -> Result<ResultResponse> {
205 loop {
206 let url = format!("{}/api/result/{}", self.base_url, task_id);
207 let response = self.http_client
208 .get(&url)
209 .send()
210 .await?
211 .error_for_status()?;
212
213 let result: ResultResponse = response.json().await?;
214
215 match result.status.as_str() {
216 TASK_STATUS_PENDING | TASK_STATUS_PROCESSING => {
217 sleep(Duration::from_secs(1)).await;
218 continue;
219 }
220 TASK_STATUS_ERROR => {
221 let error_msg = result.result
222 .as_ref()
223 .and_then(|v| v.as_str())
224 .unwrap_or("Unknown error");
225 return Err(SdkError::Task(error_msg.to_string()));
226 }
227 _ => return Ok(result),
228 }
229 }
230 }
231
232 /// Retrieves and decrypts the result of an encrypted task by its ID
233 ///
234 /// This method will automatically poll the server until the task is complete,
235 /// then decrypt the result using the provided key.
236 ///
237 /// # Arguments
238 ///
239 /// * `task_id` - The ID of the task to retrieve
240 /// * `key` - The decryption key
241 /// * `salt` - The salt value used for encryption
242 ///
243 /// # Returns
244 ///
245 /// A `ResultResponse` containing the decrypted task result
246 ///
247 /// # Example
248 ///
249 /// ```rust,no_run
250 /// use go_server_rust_sdk::scheduler::Client;
251 ///
252 /// # #[tokio::main]
253 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
254 /// let client = Client::new("http://localhost:8080");
255 /// let result = client.get_result_encrypted("task-123", "my-secret-key", 123456).await?;
256 /// println!("Decrypted result: {:?}", result.result);
257 /// # Ok(())
258 /// # }
259 /// ```
260 pub async fn get_result_encrypted(
261 &self,
262 task_id: &str,
263 key: &str,
264 _salt: i32,
265 ) -> Result<ResultResponse> {
266 loop {
267 let url = format!("{}/api/encrypted/result/{}", self.base_url, task_id);
268 let response = self.http_client
269 .get(&url)
270 .send()
271 .await?
272 .error_for_status()?;
273
274 let mut result: ResultResponse = response.json().await?;
275
276 match result.status.as_str() {
277 TASK_STATUS_PENDING | TASK_STATUS_PROCESSING => {
278 sleep(Duration::from_secs(1)).await;
279 continue;
280 }
281 TASK_STATUS_ERROR => {
282 let error_msg = result.result
283 .as_ref()
284 .and_then(|v| v.as_str())
285 .unwrap_or("Unknown error");
286 return Err(SdkError::Task(error_msg.to_string()));
287 }
288 TASK_STATUS_DONE => {
289 // Decrypt result data if present
290 if let Some(encrypted_result) = &result.result {
291 if let Some(encrypted_str) = encrypted_result.as_str() {
292 let decrypted_result = decrypt_data(encrypted_str, key)?;
293 result.result = Some(decrypted_result);
294 }
295 }
296 return Ok(result);
297 }
298 _ => return Ok(result),
299 }
300 }
301 }
302
303 /// Executes a task synchronously with polling and timeout
304 ///
305 /// This is a convenience method that combines `execute` and `get_result`
306 /// with a timeout.
307 ///
308 /// # Arguments
309 ///
310 /// * `method` - The method name to execute
311 /// * `params` - The parameters for the method
312 /// * `timeout_duration` - Maximum time to wait for completion
313 ///
314 /// # Returns
315 ///
316 /// A `ResultResponse` containing the final task result
317 ///
318 /// # Example
319 ///
320 /// ```rust,no_run
321 /// use go_server_rust_sdk::scheduler::Client;
322 /// use serde_json::json;
323 /// use std::time::Duration;
324 ///
325 /// # #[tokio::main]
326 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
327 /// let client = Client::new("http://localhost:8080");
328 /// let params = json!({"a": 10, "b": 20});
329 /// let result = client.execute_sync("add", params, Duration::from_secs(30)).await?;
330 /// println!("Result: {:?}", result.result);
331 /// # Ok(())
332 /// # }
333 /// ```
334 pub async fn execute_sync(
335 &self,
336 method: impl Into<String>,
337 params: Value,
338 timeout_duration: Duration,
339 ) -> Result<ResultResponse> {
340 // Submit task
341 let exec_response = self.execute(method, params).await?;
342
343 // Poll for result with timeout
344 let result = timeout(timeout_duration, self.get_result(&exec_response.task_id)).await
345 .map_err(|_| SdkError::Timeout)??;
346
347 Ok(result)
348 }
349
350 /// Executes an encrypted task synchronously with polling, decryption and timeout
351 ///
352 /// This is a convenience method that combines `execute_encrypted` and
353 /// `get_result_encrypted` with a timeout.
354 ///
355 /// # Arguments
356 ///
357 /// * `method` - The method name to execute
358 /// * `key` - The encryption key
359 /// * `salt` - The salt value for key encryption
360 /// * `params` - The parameters for the method
361 /// * `timeout_duration` - Maximum time to wait for completion
362 ///
363 /// # Returns
364 ///
365 /// A `ResultResponse` containing the decrypted task result
366 ///
367 /// # Example
368 ///
369 /// ```rust,no_run
370 /// use go_server_rust_sdk::scheduler::Client;
371 /// use serde_json::json;
372 /// use std::time::Duration;
373 ///
374 /// # #[tokio::main]
375 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
376 /// let client = Client::new("http://localhost:8080");
377 /// let params = json!({"a": 10, "b": 20});
378 /// let result = client.execute_sync_encrypted(
379 /// "add",
380 /// "my-secret-key",
381 /// 123456,
382 /// params,
383 /// Duration::from_secs(30)
384 /// ).await?;
385 /// println!("Decrypted result: {:?}", result.result);
386 /// # Ok(())
387 /// # }
388 /// ```
389 pub async fn execute_sync_encrypted(
390 &self,
391 method: impl Into<String>,
392 key: &str,
393 salt: i32,
394 params: Value,
395 timeout_duration: Duration,
396 ) -> Result<ResultResponse> {
397 // Submit encrypted task
398 let exec_response = self.execute_encrypted(method, key, salt, params).await?;
399
400 // Poll for result with timeout and decryption
401 let result = timeout(
402 timeout_duration,
403 self.get_result_encrypted(&exec_response.task_id, key, salt),
404 )
405 .await
406 .map_err(|_| SdkError::Timeout)??;
407
408 Ok(result)
409 }
410}
411
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A base URL without a trailing slash is stored exactly as given.
    #[test]
    fn test_client_creation() {
        let scheduler = Client::new("http://localhost:8080");

        assert_eq!(scheduler.base_url, "http://localhost:8080");
    }

    /// Both the method name and the parameter values appear in the JSON body.
    #[test]
    fn test_execute_request_serialization() {
        let body = serde_json::to_string(&ExecuteRequest {
            method: "test_method".to_string(),
            params: json!({"key": "value"}),
        })
        .unwrap();

        assert!(body.contains("test_method"));
        assert!(body.contains("value"));
    }

    /// The `taskId` JSON key maps onto `task_id`, and the payload is kept.
    #[test]
    fn test_result_response_deserialization() {
        let raw = r#"{
            "taskId": "123",
            "status": "done",
            "result": {"answer": 42}
        }"#;

        let parsed = serde_json::from_str::<ResultResponse>(raw).unwrap();

        assert_eq!(parsed.task_id, "123");
        assert_eq!(parsed.status, "done");
        assert!(parsed.result.is_some());
    }
}
448}