1use super::types::*;
4use crate::client::Client;
5use crate::error::{self, Error, Result};
6use std::time::Duration;
7use tokio::time::sleep;
8
/// Handle for the query-execution endpoints.
///
/// Borrows a [`Client`] for the lifetime `'a`; every request made through
/// this handle uses that client's base URL and HTTP client.
pub struct ExecutionsApi<'a> {
    /// Client supplying `base_url()` and `http()` for each request.
    client: &'a Client,
}
13
14impl<'a> ExecutionsApi<'a> {
15 pub(crate) fn new(client: &'a Client) -> Self {
16 Self { client }
17 }
18
19 pub async fn execute(&self, query_id: i64) -> Result<ExecuteQueryResponse> {
21 self.execute_with_options(query_id, &ExecuteQueryRequest::default())
22 .await
23 }
24
25 pub async fn execute_with_options(
27 &self,
28 query_id: i64,
29 request: &ExecuteQueryRequest,
30 ) -> Result<ExecuteQueryResponse> {
31 let url = format!("{}/v1/query/{}/execute", self.client.base_url(), query_id);
32 let response = self.client.http().post(&url).json(request).send().await?;
33
34 if response.status().is_success() {
35 Ok(response.json().await?)
36 } else if response.status() == 404 {
37 Err(error::not_found(format!("Query {}", query_id)))
38 } else if response.status() == 402 {
39 Err(error::insufficient_credits())
40 } else if response.status() == 429 {
41 Err(Error::rate_limited(None))
42 } else {
43 let status = response.status().as_u16();
44 let message = response.text().await.unwrap_or_default();
45 Err(Error::api(status, message))
46 }
47 }
48
49 pub async fn execute_sql(&self, request: &ExecuteSqlRequest) -> Result<ExecuteQueryResponse> {
51 let url = format!("{}/v1/sql/execute", self.client.base_url());
52 let response = self.client.http().post(&url).json(request).send().await?;
53
54 if response.status().is_success() {
55 Ok(response.json().await?)
56 } else if response.status() == 402 {
57 Err(error::insufficient_credits())
58 } else if response.status() == 429 {
59 Err(Error::rate_limited(None))
60 } else {
61 let status = response.status().as_u16();
62 let message = response.text().await.unwrap_or_default();
63 Err(Error::api(status, message))
64 }
65 }
66
67 pub async fn status(&self, execution_id: &str) -> Result<ExecutionStatus> {
69 let url = format!(
70 "{}/v1/execution/{}/status",
71 self.client.base_url(),
72 execution_id
73 );
74 let response = self.client.http().get(&url).send().await?;
75
76 if response.status().is_success() {
77 Ok(response.json().await?)
78 } else if response.status() == 404 {
79 Err(error::not_found(format!("Execution {}", execution_id)))
80 } else {
81 let status = response.status().as_u16();
82 let message = response.text().await.unwrap_or_default();
83 Err(Error::api(status, message))
84 }
85 }
86
87 pub async fn results(&self, execution_id: &str) -> Result<ExecutionResult> {
89 self.results_with_options(execution_id, &GetResultsOptions::default())
90 .await
91 }
92
93 pub async fn results_with_options(
95 &self,
96 execution_id: &str,
97 options: &GetResultsOptions,
98 ) -> Result<ExecutionResult> {
99 let url = format!(
100 "{}/v1/execution/{}/results{}",
101 self.client.base_url(),
102 execution_id,
103 options.to_query_string()
104 );
105 let response = self.client.http().get(&url).send().await?;
106
107 if response.status().is_success() {
108 Ok(response.json().await?)
109 } else if response.status() == 404 {
110 Err(error::not_found(format!("Execution {}", execution_id)))
111 } else {
112 let status = response.status().as_u16();
113 let message = response.text().await.unwrap_or_default();
114 Err(Error::api(status, message))
115 }
116 }
117
118 pub async fn results_csv(&self, execution_id: &str) -> Result<String> {
120 self.results_csv_with_options(execution_id, &GetResultsOptions::default())
121 .await
122 }
123
124 pub async fn results_csv_with_options(
126 &self,
127 execution_id: &str,
128 options: &GetResultsOptions,
129 ) -> Result<String> {
130 let url = format!(
131 "{}/v1/execution/{}/results/csv{}",
132 self.client.base_url(),
133 execution_id,
134 options.to_query_string()
135 );
136 let response = self.client.http().get(&url).send().await?;
137
138 if response.status().is_success() {
139 Ok(response.text().await?)
140 } else if response.status() == 404 {
141 Err(error::not_found(format!("Execution {}", execution_id)))
142 } else {
143 let status = response.status().as_u16();
144 let message = response.text().await.unwrap_or_default();
145 Err(Error::api(status, message))
146 }
147 }
148
149 pub async fn cancel(&self, execution_id: &str) -> Result<CancelExecutionResponse> {
151 let url = format!(
152 "{}/v1/execution/{}/cancel",
153 self.client.base_url(),
154 execution_id
155 );
156 let response = self.client.http().post(&url).send().await?;
157
158 if response.status().is_success() {
159 Ok(response.json().await?)
160 } else if response.status() == 404 {
161 Err(error::not_found(format!("Execution {}", execution_id)))
162 } else {
163 let status = response.status().as_u16();
164 let message = response.text().await.unwrap_or_default();
165 Err(Error::api(status, message))
166 }
167 }
168
169 pub async fn query_results(&self, query_id: i64) -> Result<ExecutionResult> {
171 self.query_results_with_options(query_id, &GetResultsOptions::default())
172 .await
173 }
174
175 pub async fn query_results_with_options(
177 &self,
178 query_id: i64,
179 options: &GetResultsOptions,
180 ) -> Result<ExecutionResult> {
181 let url = format!(
182 "{}/v1/query/{}/results{}",
183 self.client.base_url(),
184 query_id,
185 options.to_query_string()
186 );
187 let response = self.client.http().get(&url).send().await?;
188
189 if response.status().is_success() {
190 Ok(response.json().await?)
191 } else if response.status() == 404 {
192 Err(error::not_found(format!("Query {}", query_id)))
193 } else {
194 let status = response.status().as_u16();
195 let message = response.text().await.unwrap_or_default();
196 Err(Error::api(status, message))
197 }
198 }
199
200 pub async fn query_results_csv(&self, query_id: i64) -> Result<String> {
202 self.query_results_csv_with_options(query_id, &GetResultsOptions::default())
203 .await
204 }
205
206 pub async fn query_results_csv_with_options(
208 &self,
209 query_id: i64,
210 options: &GetResultsOptions,
211 ) -> Result<String> {
212 let url = format!(
213 "{}/v1/query/{}/results/csv{}",
214 self.client.base_url(),
215 query_id,
216 options.to_query_string()
217 );
218 let response = self.client.http().get(&url).send().await?;
219
220 if response.status().is_success() {
221 Ok(response.text().await?)
222 } else if response.status() == 404 {
223 Err(error::not_found(format!("Query {}", query_id)))
224 } else {
225 let status = response.status().as_u16();
226 let message = response.text().await.unwrap_or_default();
227 Err(Error::api(status, message))
228 }
229 }
230
231 pub async fn run_query(
236 &self,
237 query_id: i64,
238 timeout_secs: Option<u64>,
239 ) -> Result<ExecutionResult> {
240 self.run_query_with_options(query_id, &ExecuteQueryRequest::default(), timeout_secs)
241 .await
242 }
243
244 pub async fn run_query_with_options(
246 &self,
247 query_id: i64,
248 request: &ExecuteQueryRequest,
249 timeout_secs: Option<u64>,
250 ) -> Result<ExecutionResult> {
251 let timeout = timeout_secs.unwrap_or(300);
252 let exec = self.execute_with_options(query_id, request).await?;
253 self.wait_for_results(&exec.execution_id, timeout).await
254 }
255
256 pub async fn run_sql(&self, sql: &str, timeout_secs: Option<u64>) -> Result<ExecutionResult> {
258 let request = ExecuteSqlRequest::new(sql);
259 self.run_sql_with_options(&request, timeout_secs).await
260 }
261
262 pub async fn run_sql_with_options(
264 &self,
265 request: &ExecuteSqlRequest,
266 timeout_secs: Option<u64>,
267 ) -> Result<ExecutionResult> {
268 let timeout = timeout_secs.unwrap_or(300);
269 let exec = self.execute_sql(request).await?;
270 self.wait_for_results(&exec.execution_id, timeout).await
271 }
272
273 async fn wait_for_results(
275 &self,
276 execution_id: &str,
277 timeout_secs: u64,
278 ) -> Result<ExecutionResult> {
279 let start = std::time::Instant::now();
280 let timeout = Duration::from_secs(timeout_secs);
281 let mut poll_interval = Duration::from_millis(500);
282 let max_interval = Duration::from_secs(5);
283
284 loop {
285 if start.elapsed() > timeout {
286 return Err(error::execution_timeout(timeout_secs));
287 }
288
289 let status = self.status(execution_id).await?;
290
291 if status.is_execution_finished {
292 if status.state.is_success() {
293 return self.results(execution_id).await;
294 } else {
295 let msg = status
296 .error
297 .map(|e| e.message.unwrap_or_default())
298 .unwrap_or_else(|| {
299 format!("Execution failed with state: {:?}", status.state)
300 });
301 return Err(error::execution_failed(msg));
302 }
303 }
304
305 sleep(poll_interval).await;
306 poll_interval = std::cmp::min(poll_interval * 2, max_interval);
307 }
308 }
309}