Skip to main content

async_dashscope/operation/
task.rs

1use crate::error::{DashScopeError, Result};
2use crate::{Client, operation::common::TaskStatus};
3use output::*;
4use std::time::Duration;
5use tokio::time::sleep;
6const TASK_PATH: &str = "/tasks";
7
8pub mod output;
9
10pub struct Task<'a> {
11    client: &'a Client,
12}
13
14impl<'a> Task<'a> {
15    pub fn new(client: &'a Client) -> Self {
16        Self { client }
17    }
18
19    pub(crate) async fn query(&self, task_id: &str) -> Result<TaskResult> {
20        let http_client = self.client.http_client();
21        let headers = self.client.config().headers();
22        let req = http_client
23            .get(
24                self.client
25                    .config()
26                    .url(format!("{}/{}", TASK_PATH, task_id).as_str()),
27            )
28            .headers(headers)
29            .build()?;
30
31        let resp = http_client.execute(req).await?.bytes().await?;
32
33        // 检查响应是否为空
34        if resp.is_empty() {
35            return Err(DashScopeError::ApiError(crate::error::ApiError {
36                message: "API returned empty response".to_string(),
37                request_id: None,
38                code: Some("EmptyResponse".to_string()),
39            }));
40        }
41
42        let raw_response_str = String::from_utf8_lossy(resp.as_ref());
43        tracing::debug!("Raw API response: {}", raw_response_str);
44
45        let resp_json = serde_json::from_slice::<TaskResult>(resp.as_ref()).map_err(|e| {
46            crate::error::DashScopeError::JSONDeserialize {
47                source: e,
48                raw_response: String::from_utf8_lossy(&resp).to_string(),
49            }
50        })?;
51
52        Ok(resp_json)
53    }
54
55    /// 轮询任务状态
56    ///
57    /// 该方法会定期查询任务状态,直到任务完成、失败或达到最大轮询次数。
58    ///
59    /// # Arguments
60    /// * `task_id` - 要轮询的任务ID
61    /// * `interval` - 每次轮询之间的间隔时间(秒)
62    /// * `max_attempts` - 最大轮询尝试次数
63    ///
64    /// # Returns
65    /// 返回 `Result<TaskResult>`,包含最终任务结果或错误
66    ///
67    /// # Errors
68    /// - 当任务在最大轮询次数内未完成时返回 `TimeoutError`
69    /// - 当遇到不可重试的错误(如配置错误)时返回相应错误
70    /// - 当API返回空响应或格式错误时会继续重试
71    ///
72    /// # Notes
73    /// - 对于可恢复的错误(如网络问题、临时API错误)会自动重试
74    /// - 每次轮询会打印当前状态信息到标准输出
75    pub async fn poll_task_status(
76        &self,
77        task_id: &str,
78        interval: u64,
79        max_attempts: u32,
80    ) -> Result<TaskResult> {
81        let mut attempts = 0;
82        loop {
83            attempts += 1;
84            if attempts > max_attempts {
85                return Err(DashScopeError::TimeoutError(
86                    "polling timeout, task did not complete within expected time".to_string(),
87                ));
88            }
89
90            match self.query(task_id).await {
91                Ok(result) => {
92                    let task_status = &result.output.task_status;
93
94                    match task_status {
95                        TaskStatus::Succeeded => {
96                            return Ok(result);
97                        }
98                        TaskStatus::Failed => {
99                            return Ok(result);
100                        }
101                        TaskStatus::Pending | TaskStatus::Running => {
102                            tracing::info!(
103                                "Task still in progress, waiting {} seconds before next poll...",
104                                interval
105                            );
106                            sleep(Duration::from_secs(interval)).await;
107                        }
108                        TaskStatus::Canceled | TaskStatus::Unknown => {
109                            return Ok(result);
110                        }
111                    }
112                }
113                Err(e) => {
114                    match &e {
115                        DashScopeError::JSONDeserialize {
116                            source: _,
117                            raw_response: _,
118                        } => {
119                            sleep(Duration::from_secs(interval)).await;
120                        }
121                        DashScopeError::Reqwest(_) => {
122                            sleep(Duration::from_secs(interval)).await;
123                        }
124                        DashScopeError::ApiError(api_error) => {
125                            if api_error.code.as_deref() == Some("EmptyResponse") {
126                                sleep(Duration::from_secs(interval)).await;
127                            } else {
128                                return Err(e);
129                            }
130                        }
131                        _ => {
132                            return Err(e);
133                        }
134                    }
135                }
136            }
137        }
138    }
139}