dune_api/executions/
api.rs

//! Executions API implementation
2
3use super::types::*;
4use crate::client::Client;
5use crate::error::{self, Error, Result};
6use std::time::Duration;
7use tokio::time::sleep;
8
9/// Executions API
10pub struct ExecutionsApi<'a> {
11    client: &'a Client,
12}
13
14impl<'a> ExecutionsApi<'a> {
15    pub(crate) fn new(client: &'a Client) -> Self {
16        Self { client }
17    }
18
19    /// Execute a saved query by ID
20    pub async fn execute(&self, query_id: i64) -> Result<ExecuteQueryResponse> {
21        self.execute_with_options(query_id, &ExecuteQueryRequest::default())
22            .await
23    }
24
25    /// Execute a saved query with options
26    pub async fn execute_with_options(
27        &self,
28        query_id: i64,
29        request: &ExecuteQueryRequest,
30    ) -> Result<ExecuteQueryResponse> {
31        let url = format!("{}/v1/query/{}/execute", self.client.base_url(), query_id);
32        let response = self.client.http().post(&url).json(request).send().await?;
33
34        if response.status().is_success() {
35            Ok(response.json().await?)
36        } else if response.status() == 404 {
37            Err(error::not_found(format!("Query {}", query_id)))
38        } else if response.status() == 402 {
39            Err(error::insufficient_credits())
40        } else if response.status() == 429 {
41            Err(Error::rate_limited(None))
42        } else {
43            let status = response.status().as_u16();
44            let message = response.text().await.unwrap_or_default();
45            Err(Error::api(status, message))
46        }
47    }
48
49    /// Execute raw SQL
50    pub async fn execute_sql(&self, request: &ExecuteSqlRequest) -> Result<ExecuteQueryResponse> {
51        let url = format!("{}/v1/sql/execute", self.client.base_url());
52        let response = self.client.http().post(&url).json(request).send().await?;
53
54        if response.status().is_success() {
55            Ok(response.json().await?)
56        } else if response.status() == 402 {
57            Err(error::insufficient_credits())
58        } else if response.status() == 429 {
59            Err(Error::rate_limited(None))
60        } else {
61            let status = response.status().as_u16();
62            let message = response.text().await.unwrap_or_default();
63            Err(Error::api(status, message))
64        }
65    }
66
67    /// Get execution status
68    pub async fn status(&self, execution_id: &str) -> Result<ExecutionStatus> {
69        let url = format!(
70            "{}/v1/execution/{}/status",
71            self.client.base_url(),
72            execution_id
73        );
74        let response = self.client.http().get(&url).send().await?;
75
76        if response.status().is_success() {
77            Ok(response.json().await?)
78        } else if response.status() == 404 {
79            Err(error::not_found(format!("Execution {}", execution_id)))
80        } else {
81            let status = response.status().as_u16();
82            let message = response.text().await.unwrap_or_default();
83            Err(Error::api(status, message))
84        }
85    }
86
87    /// Get execution results
88    pub async fn results(&self, execution_id: &str) -> Result<ExecutionResult> {
89        self.results_with_options(execution_id, &GetResultsOptions::default())
90            .await
91    }
92
93    /// Get execution results with options
94    pub async fn results_with_options(
95        &self,
96        execution_id: &str,
97        options: &GetResultsOptions,
98    ) -> Result<ExecutionResult> {
99        let url = format!(
100            "{}/v1/execution/{}/results{}",
101            self.client.base_url(),
102            execution_id,
103            options.to_query_string()
104        );
105        let response = self.client.http().get(&url).send().await?;
106
107        if response.status().is_success() {
108            Ok(response.json().await?)
109        } else if response.status() == 404 {
110            Err(error::not_found(format!("Execution {}", execution_id)))
111        } else {
112            let status = response.status().as_u16();
113            let message = response.text().await.unwrap_or_default();
114            Err(Error::api(status, message))
115        }
116    }
117
118    /// Get execution results as CSV
119    pub async fn results_csv(&self, execution_id: &str) -> Result<String> {
120        self.results_csv_with_options(execution_id, &GetResultsOptions::default())
121            .await
122    }
123
124    /// Get execution results as CSV with options
125    pub async fn results_csv_with_options(
126        &self,
127        execution_id: &str,
128        options: &GetResultsOptions,
129    ) -> Result<String> {
130        let url = format!(
131            "{}/v1/execution/{}/results/csv{}",
132            self.client.base_url(),
133            execution_id,
134            options.to_query_string()
135        );
136        let response = self.client.http().get(&url).send().await?;
137
138        if response.status().is_success() {
139            Ok(response.text().await?)
140        } else if response.status() == 404 {
141            Err(error::not_found(format!("Execution {}", execution_id)))
142        } else {
143            let status = response.status().as_u16();
144            let message = response.text().await.unwrap_or_default();
145            Err(Error::api(status, message))
146        }
147    }
148
149    /// Cancel an execution
150    pub async fn cancel(&self, execution_id: &str) -> Result<CancelExecutionResponse> {
151        let url = format!(
152            "{}/v1/execution/{}/cancel",
153            self.client.base_url(),
154            execution_id
155        );
156        let response = self.client.http().post(&url).send().await?;
157
158        if response.status().is_success() {
159            Ok(response.json().await?)
160        } else if response.status() == 404 {
161            Err(error::not_found(format!("Execution {}", execution_id)))
162        } else {
163            let status = response.status().as_u16();
164            let message = response.text().await.unwrap_or_default();
165            Err(Error::api(status, message))
166        }
167    }
168
169    /// Get the latest results for a saved query (uses cached results if available)
170    pub async fn query_results(&self, query_id: i64) -> Result<ExecutionResult> {
171        self.query_results_with_options(query_id, &GetResultsOptions::default())
172            .await
173    }
174
175    /// Get the latest results for a saved query with options
176    pub async fn query_results_with_options(
177        &self,
178        query_id: i64,
179        options: &GetResultsOptions,
180    ) -> Result<ExecutionResult> {
181        let url = format!(
182            "{}/v1/query/{}/results{}",
183            self.client.base_url(),
184            query_id,
185            options.to_query_string()
186        );
187        let response = self.client.http().get(&url).send().await?;
188
189        if response.status().is_success() {
190            Ok(response.json().await?)
191        } else if response.status() == 404 {
192            Err(error::not_found(format!("Query {}", query_id)))
193        } else {
194            let status = response.status().as_u16();
195            let message = response.text().await.unwrap_or_default();
196            Err(Error::api(status, message))
197        }
198    }
199
200    /// Get the latest results for a saved query as CSV
201    pub async fn query_results_csv(&self, query_id: i64) -> Result<String> {
202        self.query_results_csv_with_options(query_id, &GetResultsOptions::default())
203            .await
204    }
205
206    /// Get the latest results for a saved query as CSV with options
207    pub async fn query_results_csv_with_options(
208        &self,
209        query_id: i64,
210        options: &GetResultsOptions,
211    ) -> Result<String> {
212        let url = format!(
213            "{}/v1/query/{}/results/csv{}",
214            self.client.base_url(),
215            query_id,
216            options.to_query_string()
217        );
218        let response = self.client.http().get(&url).send().await?;
219
220        if response.status().is_success() {
221            Ok(response.text().await?)
222        } else if response.status() == 404 {
223            Err(error::not_found(format!("Query {}", query_id)))
224        } else {
225            let status = response.status().as_u16();
226            let message = response.text().await.unwrap_or_default();
227            Err(Error::api(status, message))
228        }
229    }
230
231    /// Execute a query and wait for results (convenience method)
232    ///
233    /// This method polls for completion with exponential backoff.
234    /// Timeout is in seconds (default 300 = 5 minutes).
235    pub async fn run_query(
236        &self,
237        query_id: i64,
238        timeout_secs: Option<u64>,
239    ) -> Result<ExecutionResult> {
240        self.run_query_with_options(query_id, &ExecuteQueryRequest::default(), timeout_secs)
241            .await
242    }
243
244    /// Execute a query with options and wait for results
245    pub async fn run_query_with_options(
246        &self,
247        query_id: i64,
248        request: &ExecuteQueryRequest,
249        timeout_secs: Option<u64>,
250    ) -> Result<ExecutionResult> {
251        let timeout = timeout_secs.unwrap_or(300);
252        let exec = self.execute_with_options(query_id, request).await?;
253        self.wait_for_results(&exec.execution_id, timeout).await
254    }
255
256    /// Execute SQL and wait for results (convenience method)
257    pub async fn run_sql(&self, sql: &str, timeout_secs: Option<u64>) -> Result<ExecutionResult> {
258        let request = ExecuteSqlRequest::new(sql);
259        self.run_sql_with_options(&request, timeout_secs).await
260    }
261
262    /// Execute SQL with options and wait for results
263    pub async fn run_sql_with_options(
264        &self,
265        request: &ExecuteSqlRequest,
266        timeout_secs: Option<u64>,
267    ) -> Result<ExecutionResult> {
268        let timeout = timeout_secs.unwrap_or(300);
269        let exec = self.execute_sql(request).await?;
270        self.wait_for_results(&exec.execution_id, timeout).await
271    }
272
273    /// Wait for an execution to complete and return results
274    async fn wait_for_results(
275        &self,
276        execution_id: &str,
277        timeout_secs: u64,
278    ) -> Result<ExecutionResult> {
279        let start = std::time::Instant::now();
280        let timeout = Duration::from_secs(timeout_secs);
281        let mut poll_interval = Duration::from_millis(500);
282        let max_interval = Duration::from_secs(5);
283
284        loop {
285            if start.elapsed() > timeout {
286                return Err(error::execution_timeout(timeout_secs));
287            }
288
289            let status = self.status(execution_id).await?;
290
291            if status.is_execution_finished {
292                if status.state.is_success() {
293                    return self.results(execution_id).await;
294                } else {
295                    let msg = status
296                        .error
297                        .map(|e| e.message.unwrap_or_default())
298                        .unwrap_or_else(|| {
299                            format!("Execution failed with state: {:?}", status.state)
300                        });
301                    return Err(error::execution_failed(msg));
302                }
303            }
304
305            sleep(poll_interval).await;
306            poll_interval = std::cmp::min(poll_interval * 2, max_interval);
307        }
308    }
309}