async_dashscope/operation/task.rs
1use crate::error::{DashScopeError, Result};
2use crate::{Client, operation::common::TaskStatus};
3use output::*;
4use std::time::Duration;
5use tokio::time::sleep;
6const TASK_PATH: &str = "/tasks";
7
8pub mod output;
9
10pub struct Task<'a> {
11 client: &'a Client,
12}
13
14impl<'a> Task<'a> {
15 pub fn new(client: &'a Client) -> Self {
16 Self { client }
17 }
18
19 pub(crate) async fn query(&self, task_id: &str) -> Result<TaskResult> {
20 let http_client = self.client.http_client();
21 let headers = self.client.config().headers();
22 let req = http_client
23 .get(
24 self.client
25 .config()
26 .url(format!("{}/{}", TASK_PATH, task_id).as_str()),
27 )
28 .headers(headers)
29 .build()?;
30
31 let resp = http_client.execute(req).await?.bytes().await?;
32
33 // 检查响应是否为空
34 if resp.is_empty() {
35 return Err(DashScopeError::ApiError(crate::error::ApiError {
36 message: "API returned empty response".to_string(),
37 request_id: None,
38 code: Some("EmptyResponse".to_string()),
39 }));
40 }
41
42 let raw_response_str = String::from_utf8_lossy(resp.as_ref());
43 println!("Raw API response: {}", raw_response_str);
44
45 let resp_json = serde_json::from_slice::<TaskResult>(resp.as_ref()).map_err(|e| {
46 crate::error::DashScopeError::JSONDeserialize {
47 source: e,
48 raw_response: resp.to_vec(),
49 }
50 })?;
51
52 Ok(resp_json)
53 }
54
55 /// 轮询任务状态
56 ///
57 /// 该方法会定期查询任务状态,直到任务完成、失败或达到最大轮询次数。
58 ///
59 /// # Arguments
60 /// * `task_id` - 要轮询的任务ID
61 /// * `interval` - 每次轮询之间的间隔时间(秒)
62 /// * `max_attempts` - 最大轮询尝试次数
63 ///
64 /// # Returns
65 /// 返回 `Result<TaskResult>`,包含最终任务结果或错误
66 ///
67 /// # Errors
68 /// - 当任务在最大轮询次数内未完成时返回 `TimeoutError`
69 /// - 当遇到不可重试的错误(如配置错误)时返回相应错误
70 /// - 当API返回空响应或格式错误时会继续重试
71 ///
72 /// # Notes
73 /// - 对于可恢复的错误(如网络问题、临时API错误)会自动重试
74 /// - 每次轮询会打印当前状态信息到标准输出
75 pub async fn poll_task_status(
76 &self,
77 task_id: &str,
78 interval: u64,
79 max_attempts: u32,
80 ) -> Result<TaskResult> {
81 for attempt in 1..=max_attempts {
82 // println!("第 {} 次轮询...", attempt);
83
84 match self.query(task_id).await {
85 Ok(result) => {
86 let task_status = &result.output.task_status;
87 // println!("当前任务状态: {:?}", task_status);
88
89 // 如果任务完成或失败,返回结果
90 match task_status {
91 TaskStatus::Succeeded => {
92 // println!("任务执行完成,退出轮询");
93 return Ok(result);
94 }
95 TaskStatus::Failed => {
96 // println!("任务执行失败,退出轮询");
97 return Ok(result);
98 }
99 TaskStatus::Pending | TaskStatus::Running => {
100 // 继续轮询
101 println!("任务仍在进行中,等待 {} 秒后继续轮询...", interval);
102 sleep(Duration::from_secs(interval)).await;
103 }
104 TaskStatus::Canceled | TaskStatus::Unknown => {
105 return Ok(result);
106 }
107 }
108 }
109 Err(e) => {
110 // 区分不同类型的错误
111 match &e {
112 DashScopeError::JSONDeserialize {
113 source: _,
114 raw_response: _,
115 } => {
116 // JSON 反序列化错误,可能是 API 响应格式问题
117 // 继续重试,可能是临时问题
118 sleep(Duration::from_secs(interval)).await;
119 }
120 DashScopeError::Reqwest(_) => {
121 // 网络错误,继续重试
122 sleep(Duration::from_secs(interval)).await;
123 }
124 DashScopeError::ApiError(api_error) => {
125 // API 错误,检查是否是空响应错误
126 if api_error.code.as_deref() == Some("EmptyResponse") {
127 sleep(Duration::from_secs(interval)).await;
128 } else {
129 // 其他 API 错误,可能是配置问题,直接返回错误
130 return Err(e);
131 }
132 }
133 _ => {
134 // 其他错误,可能是配置问题,直接返回错误
135 return Err(e);
136 }
137 }
138 }
139 }
140 if attempt > max_attempts {
141 break;
142 }
143 }
144
145 // 超过最大轮询次数
146 Err(DashScopeError::TimeoutError(
147 "轮询超时,任务未在预期时间内完成".to_string(),
148 ))
149 }
150}