async_dashscope/operation/
task.rs

1use crate::error::{DashScopeError, Result};
2use crate::{Client, operation::common::TaskStatus};
3use output::*;
4use std::time::Duration;
5use tokio::time::sleep;
6const TASK_PATH: &str = "/tasks";
7
8pub mod output;
9
10pub struct Task<'a> {
11    client: &'a Client,
12}
13
14impl<'a> Task<'a> {
15    pub fn new(client: &'a Client) -> Self {
16        Self { client }
17    }
18
19    pub(crate) async fn query(&self, task_id: &str) -> Result<TaskResult> {
20        let http_client = self.client.http_client();
21        let headers = self.client.config().headers();
22        let req = http_client
23            .get(
24                self.client
25                    .config()
26                    .url(format!("{}/{}", TASK_PATH, task_id).as_str()),
27            )
28            .headers(headers)
29            .build()?;
30
31        let resp = http_client.execute(req).await?.bytes().await?;
32
33        // 检查响应是否为空
34        if resp.is_empty() {
35            return Err(DashScopeError::ApiError(crate::error::ApiError {
36                message: "API returned empty response".to_string(),
37                request_id: None,
38                code: Some("EmptyResponse".to_string()),
39            }));
40        }
41
42        let raw_response_str = String::from_utf8_lossy(resp.as_ref());
43        println!("Raw API response: {}", raw_response_str);
44
45        let resp_json = serde_json::from_slice::<TaskResult>(resp.as_ref()).map_err(|e| {
46            crate::error::DashScopeError::JSONDeserialize {
47                source: e,
48                raw_response: resp.to_vec(),
49            }
50        })?;
51
52        Ok(resp_json)
53    }
54
55    /// 轮询任务状态
56    ///
57    /// 该方法会定期查询任务状态,直到任务完成、失败或达到最大轮询次数。
58    ///
59    /// # Arguments
60    /// * `task_id` - 要轮询的任务ID
61    /// * `interval` - 每次轮询之间的间隔时间(秒)
62    /// * `max_attempts` - 最大轮询尝试次数
63    ///
64    /// # Returns
65    /// 返回 `Result<TaskResult>`,包含最终任务结果或错误
66    ///
67    /// # Errors
68    /// - 当任务在最大轮询次数内未完成时返回 `TimeoutError`
69    /// - 当遇到不可重试的错误(如配置错误)时返回相应错误
70    /// - 当API返回空响应或格式错误时会继续重试
71    ///
72    /// # Notes
73    /// - 对于可恢复的错误(如网络问题、临时API错误)会自动重试
74    /// - 每次轮询会打印当前状态信息到标准输出
75    pub async fn poll_task_status(
76        &self,
77        task_id: &str,
78        interval: u64,
79        max_attempts: u32,
80    ) -> Result<TaskResult> {
81        for attempt in 1..=max_attempts {
82            // println!("第 {} 次轮询...", attempt);
83
84            match self.query(task_id).await {
85                Ok(result) => {
86                    let task_status = &result.output.task_status;
87                    // println!("当前任务状态: {:?}", task_status);
88
89                    // 如果任务完成或失败,返回结果
90                    match task_status {
91                        TaskStatus::Succeeded => {
92                            // println!("任务执行完成,退出轮询");
93                            return Ok(result);
94                        }
95                        TaskStatus::Failed => {
96                            // println!("任务执行失败,退出轮询");
97                            return Ok(result);
98                        }
99                        TaskStatus::Pending | TaskStatus::Running => {
100                            // 继续轮询
101                            println!("任务仍在进行中,等待 {} 秒后继续轮询...", interval);
102                            sleep(Duration::from_secs(interval)).await;
103                        }
104                        TaskStatus::Canceled | TaskStatus::Unknown => {
105                            return Ok(result);
106                        }
107                    }
108                }
109                Err(e) => {
110                    // 区分不同类型的错误
111                    match &e {
112                        DashScopeError::JSONDeserialize {
113                            source: _,
114                            raw_response: _,
115                        } => {
116                            // JSON 反序列化错误,可能是 API 响应格式问题
117                            // 继续重试,可能是临时问题
118                            sleep(Duration::from_secs(interval)).await;
119                        }
120                        DashScopeError::Reqwest(_) => {
121                            // 网络错误,继续重试
122                            sleep(Duration::from_secs(interval)).await;
123                        }
124                        DashScopeError::ApiError(api_error) => {
125                            // API 错误,检查是否是空响应错误
126                            if api_error.code.as_deref() == Some("EmptyResponse") {
127                                sleep(Duration::from_secs(interval)).await;
128                            } else {
129                                // 其他 API 错误,可能是配置问题,直接返回错误
130                                return Err(e);
131                            }
132                        }
133                        _ => {
134                            // 其他错误,可能是配置问题,直接返回错误
135                            return Err(e);
136                        }
137                    }
138                }
139            }
140            if attempt > max_attempts {
141                break;
142            }
143        }
144
145        // 超过最大轮询次数
146        Err(DashScopeError::TimeoutError(
147            "轮询超时,任务未在预期时间内完成".to_string(),
148        ))
149    }
150}