async_dashscope/operation/
task.rs1use crate::error::{DashScopeError, Result};
2use crate::{Client, operation::common::TaskStatus};
3use output::*;
4use std::time::Duration;
5use tokio::time::sleep;
/// Path prefix of the task endpoint; a task id is appended to it as the next path segment.
const TASK_PATH: &str = "/tasks";
7
8pub mod output;
9
10pub struct Task<'a> {
11 client: &'a Client,
12}
13
14impl<'a> Task<'a> {
15 pub fn new(client: &'a Client) -> Self {
16 Self { client }
17 }
18
19 pub(crate) async fn query(&self, task_id: &str) -> Result<TaskResult> {
20 let http_client = self.client.http_client();
21 let headers = self.client.config().headers();
22 let req = http_client
23 .get(
24 self.client
25 .config()
26 .url(format!("{}/{}", TASK_PATH, task_id).as_str()),
27 )
28 .headers(headers)
29 .build()?;
30
31 let resp = http_client.execute(req).await?.bytes().await?;
32
33 if resp.is_empty() {
35 return Err(DashScopeError::ApiError(crate::error::ApiError {
36 message: "API returned empty response".to_string(),
37 request_id: None,
38 code: Some("EmptyResponse".to_string()),
39 }));
40 }
41
42 let raw_response_str = String::from_utf8_lossy(resp.as_ref());
43 tracing::debug!("Raw API response: {}", raw_response_str);
44
45 let resp_json = serde_json::from_slice::<TaskResult>(resp.as_ref()).map_err(|e| {
46 crate::error::DashScopeError::JSONDeserialize {
47 source: e,
48 raw_response: String::from_utf8_lossy(&resp).to_string(),
49 }
50 })?;
51
52 Ok(resp_json)
53 }
54
55 pub async fn poll_task_status(
76 &self,
77 task_id: &str,
78 interval: u64,
79 max_attempts: u32,
80 ) -> Result<TaskResult> {
81 let mut attempts = 0;
82 loop {
83 attempts += 1;
84 if attempts > max_attempts {
85 return Err(DashScopeError::TimeoutError(
86 "polling timeout, task did not complete within expected time".to_string(),
87 ));
88 }
89
90 match self.query(task_id).await {
91 Ok(result) => {
92 let task_status = &result.output.task_status;
93
94 match task_status {
95 TaskStatus::Succeeded => {
96 return Ok(result);
97 }
98 TaskStatus::Failed => {
99 return Ok(result);
100 }
101 TaskStatus::Pending | TaskStatus::Running => {
102 tracing::info!(
103 "Task still in progress, waiting {} seconds before next poll...",
104 interval
105 );
106 sleep(Duration::from_secs(interval)).await;
107 }
108 TaskStatus::Canceled | TaskStatus::Unknown => {
109 return Ok(result);
110 }
111 }
112 }
113 Err(e) => {
114 match &e {
115 DashScopeError::JSONDeserialize {
116 source: _,
117 raw_response: _,
118 } => {
119 sleep(Duration::from_secs(interval)).await;
120 }
121 DashScopeError::Reqwest(_) => {
122 sleep(Duration::from_secs(interval)).await;
123 }
124 DashScopeError::ApiError(api_error) => {
125 if api_error.code.as_deref() == Some("EmptyResponse") {
126 sleep(Duration::from_secs(interval)).await;
127 } else {
128 return Err(e);
129 }
130 }
131 _ => {
132 return Err(e);
133 }
134 }
135 }
136 }
137 }
138 }
139}