1#![allow(clippy::missing_errors_doc)]
2
3use anyhow::{Context, Result};
4use reqwest::{Client, header};
5use crate::models::{CreateDashboard, CreateQuery, CreateWidget, Dashboard, DashboardsResponse, DashboardSummary, DataSource, DataSourceSchema, QueriesResponse, Query};
6
7pub struct RedashClient {
8 client: Client,
9 base_url: String,
10}
11
12impl RedashClient {
13 pub fn new(base_url: String, api_key: &str) -> Result<Self> {
14 let mut headers = header::HeaderMap::new();
15 headers.insert(
16 "Authorization",
17 header::HeaderValue::from_str(&format!("Key {api_key}"))
18 .context("Invalid API key format")?,
19 );
20
21 let client = Client::builder()
22 .default_headers(headers)
23 .build()
24 .context("Failed to build HTTP client")?;
25
26 Ok(Self {
27 client,
28 base_url,
29 })
30 }
31
32 pub async fn list_my_queries(&self, page: u32, page_size: u32) -> Result<QueriesResponse> {
33 let url = format!("{}/api/queries/my?page={page}&page_size={page_size}", self.base_url);
34 let response = self.client
35 .get(&url)
36 .send()
37 .await
38 .context("Failed to fetch my queries")?
39 .error_for_status()
40 .context("API returned error status")?;
41
42 response
43 .json()
44 .await
45 .context("Failed to parse queries response")
46 }
47
48 pub async fn get_query(&self, query_id: u64) -> Result<Query> {
49 let url = format!("{}/api/queries/{query_id}", self.base_url);
50 let response = self.client
51 .get(&url)
52 .send()
53 .await
54 .context(format!("Failed to fetch query {query_id}"))?
55 .error_for_status()
56 .context("API returned error status")?;
57
58 response
59 .json()
60 .await
61 .context("Failed to parse query response")
62 }
63
64 pub async fn list_data_sources(&self) -> Result<Vec<DataSource>> {
65 let url = format!("{}/api/data_sources", self.base_url);
66 let response = self.client
67 .get(&url)
68 .send()
69 .await
70 .context("Failed to fetch data sources")?
71 .error_for_status()
72 .context("API returned error status")?;
73
74 response
75 .json()
76 .await
77 .context("Failed to parse data sources response")
78 }
79
80 pub async fn get_data_source(&self, data_source_id: u64) -> Result<DataSource> {
81 let url = format!("{}/api/data_sources/{data_source_id}", self.base_url);
82 let response = self.client
83 .get(&url)
84 .send()
85 .await
86 .context(format!("Failed to fetch data source {data_source_id}"))?
87 .error_for_status()
88 .context("API returned error status")?;
89
90 response
91 .json()
92 .await
93 .context("Failed to parse data source response")
94 }
95
96 pub async fn get_data_source_schema(
97 &self,
98 data_source_id: u64,
99 refresh: bool,
100 ) -> Result<DataSourceSchema> {
101 let url = if refresh {
102 format!("{}/api/data_sources/{data_source_id}/schema?refresh=true", self.base_url)
103 } else {
104 format!("{}/api/data_sources/{data_source_id}/schema", self.base_url)
105 };
106
107 let response = self.client
108 .get(&url)
109 .send()
110 .await
111 .context(format!("Failed to fetch schema for data source {data_source_id}"))?
112 .error_for_status()
113 .context("API returned error status")?;
114
115 response
116 .json()
117 .await
118 .context("Failed to parse schema response")
119 }
120
121 pub async fn create_query(&self, create_query: &CreateQuery) -> Result<Query> {
122 let url = format!("{}/api/queries", self.base_url);
123 let response = self.client
124 .post(&url)
125 .json(create_query)
126 .send()
127 .await
128 .context("Failed to create query")?
129 .error_for_status()
130 .context("API returned error status")?;
131
132 response
133 .json()
134 .await
135 .context("Failed to parse query create response")
136 }
137
138 pub async fn create_or_update_query(&self, query: &Query) -> Result<Query> {
139 let url = format!("{}/api/queries/{}", self.base_url, query.id);
140 let response = self.client
141 .post(&url)
142 .json(query)
143 .send()
144 .await
145 .context(format!("Failed to update query {}", query.id))?
146 .error_for_status()
147 .context("API returned error status")?;
148
149 response
150 .json()
151 .await
152 .context("Failed to parse query update response")
153 }
154
155 pub async fn create_visualization(&self, query_id: u64, viz: &crate::models::CreateVisualization) -> Result<crate::models::Visualization> {
156 let url = format!("{}/api/visualizations", self.base_url);
157 let response = self.client
158 .post(&url)
159 .json(viz)
160 .send()
161 .await
162 .context(format!("Failed to create visualization for query {query_id}"))?
163 .error_for_status()
164 .context("API returned error status")?;
165
166 response
167 .json()
168 .await
169 .context("Failed to parse visualization create response")
170 }
171
172 pub async fn update_visualization(&self, viz: &crate::models::Visualization) -> Result<crate::models::Visualization> {
173 let url = format!("{}/api/visualizations/{}", self.base_url, viz.id);
174 let response = self.client
175 .post(&url)
176 .json(viz)
177 .send()
178 .await
179 .context(format!("Failed to update visualization {}", viz.id))?
180 .error_for_status()
181 .context("API returned error status")?;
182
183 response
184 .json()
185 .await
186 .context("Failed to parse visualization update response")
187 }
188
189 pub async fn fetch_all_queries(&self) -> Result<Vec<Query>> {
190 let mut all_queries = Vec::new();
191 let mut page = 1;
192 let page_size = 100;
193
194 loop {
195 let response = self.list_my_queries(page, page_size).await?;
196
197 if response.results.is_empty() {
198 break;
199 }
200
201 all_queries.extend(response.results);
202 eprintln!("Fetched {} / {} queries...", all_queries.len(), response.count);
203
204 #[allow(clippy::cast_possible_truncation)]
205 if all_queries.len() >= response.count as usize {
206 break;
207 }
208
209 page += 1;
210 }
211
212 Ok(all_queries)
213 }
214
215 pub async fn refresh_query(
216 &self,
217 query_id: u64,
218 parameters: Option<std::collections::HashMap<String, serde_json::Value>>,
219 ) -> Result<crate::models::Job> {
220 let url = format!("{}/api/queries/{query_id}/results", self.base_url);
221
222 let request = crate::models::RefreshRequest {
223 max_age: 0,
224 parameters,
225 };
226
227 let response = self.client
228 .post(&url)
229 .json(&request)
230 .send()
231 .await
232 .context(format!("Failed to refresh query {query_id}"))?;
233
234 let status = response.status();
235 if !status.is_success() {
236 let error_body = response.text().await.unwrap_or_else(|_| "Unable to read error response".to_string());
237 anyhow::bail!("API returned error status {status}: {error_body}");
238 }
239
240 let job_response: crate::models::JobResponse = response
241 .json()
242 .await
243 .context("Failed to parse job response")?;
244
245 Ok(job_response.job)
246 }
247
248 pub async fn poll_job(&self, job_id: &str) -> Result<crate::models::Job> {
249 let url = format!("{}/api/jobs/{job_id}", self.base_url);
250
251 let response = self.client
252 .get(&url)
253 .send()
254 .await
255 .context(format!("Failed to poll job {job_id}"))?
256 .error_for_status()
257 .context("API returned error status")?;
258
259 let job_response: crate::models::JobResponse = response
260 .json()
261 .await
262 .context("Failed to parse job response")?;
263
264 Ok(job_response.job)
265 }
266
267 pub async fn get_query_result(
268 &self,
269 query_id: u64,
270 result_id: u64,
271 ) -> Result<crate::models::QueryResult> {
272 let url = format!(
273 "{}/api/queries/{query_id}/results/{result_id}.json",
274 self.base_url
275 );
276
277 let response = self.client
278 .get(&url)
279 .send()
280 .await
281 .context(format!("Failed to fetch result {result_id} for query {query_id}"))?
282 .error_for_status()
283 .context("API returned error status")?;
284
285 let result_response: crate::models::QueryResultResponse = response
286 .json()
287 .await
288 .context("Failed to parse query result response")?;
289
290 Ok(result_response.query_result)
291 }
292
293 pub async fn execute_query_with_polling(
294 &self,
295 query_id: u64,
296 parameters: Option<std::collections::HashMap<String, serde_json::Value>>,
297 timeout_secs: u64,
298 poll_interval_ms: u64,
299 ) -> Result<crate::models::QueryResult> {
300 use tokio::time::{sleep, Duration};
301 use crate::models::JobStatus;
302
303 eprintln!("Executing query {query_id}...");
304 let job = self.refresh_query(query_id, parameters).await?;
305
306 let start = std::time::Instant::now();
307 let timeout = Duration::from_secs(timeout_secs);
308 let poll_interval = Duration::from_millis(poll_interval_ms);
309
310 let mut current_job = job;
311 loop {
312 if start.elapsed() > timeout {
313 anyhow::bail!("Query execution timed out after {timeout_secs} seconds");
314 }
315
316 let status = JobStatus::from_u8(current_job.status)?;
317
318 match status {
319 JobStatus::Success => {
320 let result_id = current_job.query_result_id
321 .context("Job succeeded but no result_id returned")?;
322
323 eprintln!("Query completed, fetching results...");
324 return self.get_query_result(query_id, result_id).await;
325 }
326 JobStatus::Failure => {
327 let error = current_job.error.unwrap_or_else(|| "Unknown error".to_string());
328 anyhow::bail!("Query execution failed: {error}");
329 }
330 JobStatus::Cancelled => {
331 anyhow::bail!("Query execution was cancelled");
332 }
333 JobStatus::Pending | JobStatus::Started => {
334 eprint!(".");
335 sleep(poll_interval).await;
336 current_job = self.poll_job(¤t_job.id).await?;
337 }
338 }
339 }
340 }
341
342 pub async fn archive_query(&self, query_id: u64) -> Result<Query> {
343 let url = format!("{}/api/queries/{query_id}", self.base_url);
344 let payload = serde_json::json!({"is_archived": true});
345
346 let response = self.client
347 .post(&url)
348 .json(&payload)
349 .send()
350 .await
351 .context(format!("Failed to archive query {query_id}"))?
352 .error_for_status()
353 .context("API returned error status")?;
354
355 response
356 .json()
357 .await
358 .context("Failed to parse archive response")
359 }
360
361 pub async fn unarchive_query(&self, query_id: u64) -> Result<Query> {
362 let url = format!("{}/api/queries/{query_id}", self.base_url);
363 let payload = serde_json::json!({"is_archived": false});
364
365 let response = self.client
366 .post(&url)
367 .json(&payload)
368 .send()
369 .await
370 .context(format!("Failed to unarchive query {query_id}"))?
371 .error_for_status()
372 .context("API returned error status")?;
373
374 response
375 .json()
376 .await
377 .context("Failed to parse unarchive response")
378 }
379
380 pub async fn create_dashboard(&self, dashboard: &CreateDashboard) -> Result<Dashboard> {
381 let url = format!("{}/api/dashboards", self.base_url);
382 let response = self.client
383 .post(&url)
384 .json(dashboard)
385 .send()
386 .await
387 .context("Failed to create dashboard")?;
388
389 let status = response.status();
390 if !status.is_success() {
391 anyhow::bail!("HTTP {}: {}", status.as_u16(), status.canonical_reason().unwrap_or("Unknown error"));
392 }
393
394 response
395 .json()
396 .await
397 .context("Failed to parse dashboard create response")
398 }
399
400 pub async fn list_favorite_dashboards(&self, page: u32, page_size: u32) -> Result<DashboardsResponse> {
401 let url = format!("{}/api/dashboards/favorites?page={page}&page_size={page_size}", self.base_url);
402 let response = self.client
403 .get(&url)
404 .send()
405 .await
406 .context("Failed to fetch dashboards")?;
407
408 let status = response.status();
409 if !status.is_success() {
410 anyhow::bail!("HTTP {}: {}", status.as_u16(), status.canonical_reason().unwrap_or("Unknown error"));
411 }
412
413 response
414 .json()
415 .await
416 .context("Failed to parse dashboards response")
417 }
418
419 pub async fn get_dashboard(&self, slug_or_id: &str) -> Result<Dashboard> {
420 let url = format!("{}/api/dashboards/{slug_or_id}", self.base_url);
421 let response = self.client
422 .get(&url)
423 .send()
424 .await
425 .context(format!("Failed to fetch dashboard {slug_or_id}"))?;
426
427 let status = response.status();
428 if !status.is_success() {
429 anyhow::bail!("HTTP {}: {}", status.as_u16(), status.canonical_reason().unwrap_or("Unknown error"));
430 }
431
432 response
433 .json()
434 .await
435 .context("Failed to parse dashboard response")
436 }
437
438 pub async fn update_dashboard(&self, dashboard: &Dashboard) -> Result<Dashboard> {
439 let url = format!("{}/api/dashboards/{}", self.base_url, dashboard.id);
440 let response = self.client
441 .post(&url)
442 .json(dashboard)
443 .send()
444 .await
445 .context(format!("Failed to update dashboard {}", dashboard.id))?;
446
447 let status = response.status();
448 if !status.is_success() {
449 anyhow::bail!("HTTP {}: {}", status.as_u16(), status.canonical_reason().unwrap_or("Unknown error"));
450 }
451
452 response
453 .json()
454 .await
455 .context("Failed to parse dashboard update response")
456 }
457
458 pub async fn archive_dashboard(&self, dashboard_id: u64) -> Result<()> {
459 let url = format!("{}/api/dashboards/{dashboard_id}", self.base_url);
460 let response = self.client
461 .delete(&url)
462 .send()
463 .await
464 .context(format!("Failed to archive dashboard {dashboard_id}"))?;
465
466 let status = response.status();
467 if !status.is_success() {
468 anyhow::bail!("HTTP {}: {}", status.as_u16(), status.canonical_reason().unwrap_or("Unknown error"));
469 }
470
471 Ok(())
472 }
473
474 pub async fn unarchive_dashboard(&self, dashboard_id: u64) -> Result<Dashboard> {
475 let url = format!("{}/api/dashboards/{dashboard_id}", self.base_url);
476 let payload = serde_json::json!({"is_archived": false});
477
478 let response = self.client
479 .post(&url)
480 .json(&payload)
481 .send()
482 .await
483 .context(format!("Failed to unarchive dashboard {dashboard_id}"))?;
484
485 let status = response.status();
486 if !status.is_success() {
487 anyhow::bail!("HTTP {}: {}", status.as_u16(), status.canonical_reason().unwrap_or("Unknown error"));
488 }
489
490 response
491 .json()
492 .await
493 .context("Failed to parse unarchive response")
494 }
495
496 pub async fn create_widget(&self, widget: &CreateWidget) -> Result<crate::models::Widget> {
497 let url = format!("{}/api/widgets", self.base_url);
498 let response = self.client
499 .post(&url)
500 .json(widget)
501 .send()
502 .await
503 .context("Failed to create widget")?;
504
505 let status = response.status();
506 if !status.is_success() {
507 anyhow::bail!("HTTP {}: {}", status.as_u16(), status.canonical_reason().unwrap_or("Unknown error"));
508 }
509
510 response
511 .json()
512 .await
513 .context("Failed to parse widget create response")
514 }
515
516 pub async fn delete_widget(&self, widget_id: u64) -> Result<()> {
517 let url = format!("{}/api/widgets/{widget_id}", self.base_url);
518 let response = self.client
519 .delete(&url)
520 .send()
521 .await
522 .context(format!("Failed to delete widget {widget_id}"))?;
523
524 let status = response.status();
525 if !status.is_success() {
526 anyhow::bail!("HTTP {}: {}", status.as_u16(), status.canonical_reason().unwrap_or("Unknown error"));
527 }
528
529 Ok(())
530 }
531
532 pub async fn fetch_favorite_dashboards(&self) -> Result<Vec<DashboardSummary>> {
533 let mut all_dashboards = Vec::new();
534 let mut page = 1;
535 let page_size = 100;
536
537 loop {
538 let response = self.list_favorite_dashboards(page, page_size).await?;
539
540 if response.results.is_empty() {
541 break;
542 }
543
544 all_dashboards.extend(response.results);
545 eprintln!("Fetched {} / {} dashboards...", all_dashboards.len(), response.count);
546
547 #[allow(clippy::cast_possible_truncation)]
548 if all_dashboards.len() >= response.count as usize {
549 break;
550 }
551
552 page += 1;
553 }
554
555 Ok(all_dashboards)
556 }
557}