metabase_api_rs/service/
query.rs1use super::traits::{Service, ServiceError, ServiceResult};
6use crate::core::models::query::{NativeQuery, QueryResult};
7use crate::core::models::DatasetQuery;
8use crate::repository::query::QueryRepository;
9use async_trait::async_trait;
10use serde_json::Value;
11use std::collections::HashMap;
12use std::sync::Arc;
13
14#[async_trait]
16pub trait QueryService: Service {
17 async fn execute_dataset_query(&self, query: DatasetQuery) -> ServiceResult<QueryResult>;
19
20 async fn execute_native_query(
22 &self,
23 database_id: i32,
24 query: NativeQuery,
25 ) -> ServiceResult<QueryResult>;
26
27 async fn execute_raw_query(&self, query: Value) -> ServiceResult<QueryResult>;
29
30 async fn execute_pivot_query(&self, query: Value) -> ServiceResult<QueryResult>;
32
33 async fn execute_sql_with_params(
35 &self,
36 database_id: i32,
37 sql: &str,
38 params: HashMap<String, serde_json::Value>,
39 ) -> ServiceResult<QueryResult>;
40
41 async fn execute_sql(&self, database_id: i32, sql: &str) -> ServiceResult<QueryResult>;
43
44 async fn export_query(&self, format: &str, query: Value) -> ServiceResult<Vec<u8>>;
46
47 async fn validate_query(&self, sql: &str) -> ServiceResult<()>;
49
50 async fn validate_dataset_query(&self, query: &DatasetQuery) -> ServiceResult<()>;
52}
53
54pub struct HttpQueryService {
56 repository: Arc<dyn QueryRepository>,
57}
58
59impl HttpQueryService {
60 pub fn new(repository: Arc<dyn QueryRepository>) -> Self {
62 Self { repository }
63 }
64
65 fn validate_sql(&self, sql: &str) -> ServiceResult<()> {
67 if sql.trim().is_empty() {
69 return Err(ServiceError::Validation(
70 "SQL query cannot be empty".to_string(),
71 ));
72 }
73
74 let sql_upper = sql.to_uppercase();
76 let dangerous_keywords = [
77 "DROP", "DELETE", "TRUNCATE", "ALTER", "CREATE", "GRANT", "REVOKE",
78 ];
79
80 for keyword in &dangerous_keywords {
81 if sql_upper.contains(keyword) {
82 if !sql_upper.contains(&format!("--{}", keyword))
84 && !sql_upper.contains(&format!("/*{}*/", keyword))
85 {
86 return Err(ServiceError::BusinessRule(format!(
87 "Query contains potentially dangerous operation: {}",
88 keyword
89 )));
90 }
91 }
92 }
93
94 Ok(())
95 }
96}
97
98#[async_trait]
99impl Service for HttpQueryService {
100 fn name(&self) -> &str {
101 "QueryService"
102 }
103}
104
105#[async_trait]
106impl QueryService for HttpQueryService {
107 async fn execute_dataset_query(&self, query: DatasetQuery) -> ServiceResult<QueryResult> {
108 self.validate_dataset_query(&query).await?;
110
111 self.repository
113 .execute_dataset_query(query)
114 .await
115 .map_err(ServiceError::from)
116 }
117
118 async fn execute_native_query(
119 &self,
120 database_id: i32,
121 query: NativeQuery,
122 ) -> ServiceResult<QueryResult> {
123 self.validate_sql(&query.query)?;
125
126 self.repository
128 .execute_native_query(database_id, query)
129 .await
130 .map_err(ServiceError::from)
131 }
132
133 async fn execute_raw_query(&self, query: Value) -> ServiceResult<QueryResult> {
134 if !query.is_object() {
136 return Err(ServiceError::Validation(
137 "Query must be a JSON object".to_string(),
138 ));
139 }
140
141 self.repository
143 .execute_raw_query(query)
144 .await
145 .map_err(ServiceError::from)
146 }
147
148 async fn execute_pivot_query(&self, query: Value) -> ServiceResult<QueryResult> {
149 if !query.is_object() {
151 return Err(ServiceError::Validation(
152 "Query must be a JSON object".to_string(),
153 ));
154 }
155
156 self.repository
158 .execute_pivot_query(query)
159 .await
160 .map_err(ServiceError::from)
161 }
162
163 async fn execute_sql_with_params(
164 &self,
165 database_id: i32,
166 sql: &str,
167 params: HashMap<String, serde_json::Value>,
168 ) -> ServiceResult<QueryResult> {
169 self.validate_sql(sql)?;
171
172 let query = NativeQuery::builder(sql).with_params(params).build();
174
175 self.repository
177 .execute_native_query(database_id, query)
178 .await
179 .map_err(ServiceError::from)
180 }
181
182 async fn execute_sql(&self, database_id: i32, sql: &str) -> ServiceResult<QueryResult> {
183 self.validate_sql(sql)?;
185
186 let query = NativeQuery::builder(sql).build();
188
189 self.repository
191 .execute_native_query(database_id, query)
192 .await
193 .map_err(ServiceError::from)
194 }
195
196 async fn export_query(&self, format: &str, query: Value) -> ServiceResult<Vec<u8>> {
197 let valid_formats = ["csv", "json", "xlsx"];
199 if !valid_formats.contains(&format) {
200 return Err(ServiceError::Validation(format!(
201 "Invalid export format: {}. Must be one of: csv, json, xlsx",
202 format
203 )));
204 }
205
206 if !query.is_object() {
208 return Err(ServiceError::Validation(
209 "Query must be a JSON object".to_string(),
210 ));
211 }
212
213 self.repository
215 .export_query(format, query)
216 .await
217 .map_err(ServiceError::from)
218 }
219
220 async fn validate_query(&self, sql: &str) -> ServiceResult<()> {
221 self.validate_sql(sql)
222 }
223
224 async fn validate_dataset_query(&self, query: &DatasetQuery) -> ServiceResult<()> {
225 if query.database.0 < 1 {
227 return Err(ServiceError::Validation(
228 "Invalid database ID: must be positive".to_string(),
229 ));
230 }
231
232 if query.query.is_null() {
234 return Err(ServiceError::Validation(
235 "Query content cannot be empty".to_string(),
236 ));
237 }
238
239 Ok(())
240 }
241}
242
243#[cfg(test)]
244mod tests {
245 use super::*;
246 use crate::core::models::query::QueryStatus;
247 use crate::core::models::MetabaseId;
248 use crate::repository::query::{MockQueryRepository, QueryRepository};
249 use serde_json::json;
250 use std::sync::Arc;
251
252 #[tokio::test]
253 async fn test_execute_dataset_query() {
254 let repository = Arc::new(MockQueryRepository::new()) as Arc<dyn QueryRepository>;
256 let service = HttpQueryService::new(repository);
257 let query = DatasetQuery {
258 database: MetabaseId(1),
259 query_type: "native".to_string(),
260 query: json!({"query": "SELECT * FROM users"}),
261 parameters: None,
262 constraints: None,
263 };
264
265 let result = service.execute_dataset_query(query).await;
267
268 assert!(result.is_ok());
270 let query_result = result.unwrap();
271 assert_eq!(query_result.status, QueryStatus::Completed);
272 }
273
274 #[tokio::test]
275 async fn test_validate_sql() {
276 let repository = Arc::new(MockQueryRepository::new()) as Arc<dyn QueryRepository>;
277 let service = HttpQueryService::new(repository);
278
279 assert!(service.validate_sql("SELECT * FROM users").is_ok());
281
282 assert!(service.validate_sql("").is_err());
284
285 assert!(service.validate_sql("DROP TABLE users").is_err());
287 }
288
289 #[tokio::test]
290 async fn test_export_query() {
291 let repository = Arc::new(MockQueryRepository::new()) as Arc<dyn QueryRepository>;
292 let service = HttpQueryService::new(repository);
293 let query = json!({
294 "database": 1,
295 "type": "native",
296 "native": {"query": "SELECT * FROM products"}
297 });
298
299 let result = service.export_query("csv", query.clone()).await;
301 assert!(result.is_ok());
302
303 let result = service.export_query("invalid", query).await;
305 assert!(result.is_err());
306 }
307}