leptos_helios/data_sources/
database.rs1use super::errors::DataSourceError;
7use super::traits::*;
8use super::types::*;
9use async_trait::async_trait;
10use polars::prelude::*;
11use std::sync::{Arc, Mutex};
12use tokio::sync::RwLock;
13
14#[cfg(feature = "database")]
15use clickhouse::Client as ClickHouseClient;
16#[cfg(feature = "database")]
17use sqlx::{PgPool, Postgres, Row};
18
19pub struct PostgresAdapter {
21 config: ConnectionConfig,
22 #[cfg(feature = "database")]
23 pool: Option<Arc<PgPool>>,
24 #[cfg(not(feature = "database"))]
25 pool: Option<Arc<RwLock<MockConnectionPool>>>,
26}
27
28impl PostgresAdapter {
29 pub fn new(config: ConnectionConfig) -> Self {
30 Self { config, pool: None }
31 }
32
33 pub async fn initialize(&mut self) -> Result<(), DataSourceError> {
34 #[cfg(feature = "database")]
35 {
36 let pool = PgPool::connect(&self.config.connection_string)
37 .await
38 .map_err(|e| DataSourceError::ConnectionFailed(e.to_string()))?;
39 self.pool = Some(Arc::new(pool));
40 }
41 #[cfg(not(feature = "database"))]
42 {
43 let pool = MockConnectionPool::new(&self.config).await?;
44 self.pool = Some(Arc::new(RwLock::new(pool)));
45 }
46 Ok(())
47 }
48}
49
50#[async_trait]
51impl DataSource for PostgresAdapter {
52 fn name(&self) -> &str {
53 "PostgreSQL"
54 }
55
56 fn source_type(&self) -> &str {
57 "postgresql"
58 }
59
60 async fn connect(
61 &self,
62 _config: &ConnectionConfig,
63 ) -> Result<Box<dyn Connection>, DataSourceError> {
64 let pool = self
65 .pool
66 .as_ref()
67 .ok_or_else(|| DataSourceError::ConnectionFailed("Pool not initialized".to_string()))?;
68
69 let conn = PostgresConnection::new(pool.clone()).await?;
70 Ok(Box::new(conn))
71 }
72
73 async fn test_connection(&self, _config: &ConnectionConfig) -> Result<bool, DataSourceError> {
74 match self.connect(_config).await {
76 Ok(_) => Ok(true),
77 Err(_) => Ok(false),
78 }
79 }
80
81 fn supported_features(&self) -> Vec<String> {
82 vec![
83 "transactions".to_string(),
84 "prepared_statements".to_string(),
85 "schema_introspection".to_string(),
86 "json_support".to_string(),
87 "array_support".to_string(),
88 ]
89 }
90
91 fn connection_limits(&self) -> ConnectionLimits {
92 ConnectionLimits::new()
93 .with_max_connections(100)
94 .with_max_query_time(std::time::Duration::from_secs(300))
95 .with_max_result_size(100_000_000)
96 .with_max_parameters(1000)
97 }
98
99 async fn get_stats(&self) -> Result<DataSourceStats, DataSourceError> {
100 Ok(DataSourceStats::new())
102 }
103}
104
105pub struct PostgresConnection {
107 #[cfg(feature = "database")]
108 pool: Arc<PgPool>,
109 #[cfg(not(feature = "database"))]
110 pool: Arc<RwLock<MockConnectionPool>>,
111 stats: Arc<Mutex<ConnectionStats>>,
112}
113
114impl PostgresConnection {
115 pub async fn new(
116 #[cfg(feature = "database")] pool: Arc<PgPool>,
117 #[cfg(not(feature = "database"))] pool: Arc<RwLock<MockConnectionPool>>,
118 ) -> Result<Self, DataSourceError> {
119 Ok(Self {
120 pool,
121 stats: Arc::new(Mutex::new(ConnectionStats::new(
122 uuid::Uuid::new_v4().to_string(),
123 ))),
124 })
125 }
126}
127
128#[async_trait]
129impl Connection for PostgresConnection {
130 async fn execute_query(&self, _query: &str) -> Result<DataFrame, DataSourceError> {
131 let start_time = std::time::Instant::now();
132
133 #[cfg(feature = "database")]
134 {
135 let rows = sqlx::query(_query)
136 .fetch_all(&*self.pool)
137 .await
138 .map_err(|e| DataSourceError::QueryFailed(e.to_string()))?;
139
140 let mut data: Vec<Vec<String>> = Vec::new();
142 let mut columns: Vec<String> = Vec::new();
143
144 if let Some(first_row) = rows.first() {
145 for (i, column) in first_row.columns().iter().enumerate() {
146 columns.push(column.name().to_string());
147 }
148
149 for row in &rows {
150 let mut row_data = Vec::new();
151 for i in 0..columns.len() {
152 let value = row
153 .try_get::<String, _>(i)
154 .unwrap_or_else(|_| "NULL".to_string());
155 row_data.push(value);
156 }
157 data.push(row_data);
158 }
159 }
160
161 let execution_time = start_time.elapsed();
162 self.stats
163 .lock()
164 .unwrap()
165 .record_successful_query(execution_time);
166
167 let mut df_builder = DataFrame::new(Vec::new()).unwrap();
169 for (i, column) in columns.iter().enumerate() {
170 let series: Series = data
171 .iter()
172 .map(|row| row[i].clone())
173 .collect::<Vec<String>>()
174 .into_iter()
175 .collect();
176 df_builder = df_builder.with_column(series.with_name(column)).unwrap();
177 }
178
179 Ok(df_builder)
180 }
181
182 #[cfg(not(feature = "database"))]
183 {
184 let execution_time = start_time.elapsed();
186 self.stats
187 .lock()
188 .unwrap()
189 .record_successful_query(execution_time);
190
191 Ok(DataFrame::new(Vec::new()).unwrap())
193 }
194 }
195
196 async fn execute_query_with_params(
197 &self,
198 query: &str,
199 _params: &[&dyn ToSql],
200 ) -> Result<DataFrame, DataSourceError> {
201 self.execute_query(query).await
203 }
204
205 async fn execute_statement(&self, _statement: &str) -> Result<u64, DataSourceError> {
206 let start_time = std::time::Instant::now();
207
208 #[cfg(feature = "database")]
209 {
210 let result = sqlx::query(_statement)
211 .execute(&*self.pool)
212 .await
213 .map_err(|e| DataSourceError::QueryFailed(e.to_string()))?;
214
215 let execution_time = start_time.elapsed();
216 self.stats
217 .lock()
218 .unwrap()
219 .record_successful_query(execution_time);
220
221 Ok(result.rows_affected())
222 }
223
224 #[cfg(not(feature = "database"))]
225 {
226 let execution_time = start_time.elapsed();
227 self.stats
228 .lock()
229 .unwrap()
230 .record_successful_query(execution_time);
231 Ok(1) }
233 }
234
235 async fn execute_statement_with_params(
236 &self,
237 statement: &str,
238 _params: &[&dyn ToSql],
239 ) -> Result<u64, DataSourceError> {
240 self.execute_statement(statement).await
242 }
243
244 async fn begin_transaction(&self) -> Result<Box<dyn Transaction>, DataSourceError> {
245 Ok(Box::new(MockTransaction::new()))
247 }
248
249 async fn get_schema(&self) -> Result<super::types::Schema, DataSourceError> {
250 let mut schema = super::types::Schema::new();
252
253 let mut table = TableInfo::new("users".to_string());
255 table.add_column(ColumnInfo::new("id".to_string(), "INTEGER".to_string()));
256 table.add_column(ColumnInfo::new("name".to_string(), "VARCHAR".to_string()));
257 table.add_primary_key("id".to_string());
258 schema.add_table(table);
259
260 Ok(schema)
261 }
262
263 async fn get_table_info(&self, table_name: &str) -> Result<TableInfo, DataSourceError> {
264 let mut table = TableInfo::new(table_name.to_string());
266 table.add_column(ColumnInfo::new("id".to_string(), "INTEGER".to_string()));
267 table.add_column(ColumnInfo::new("name".to_string(), "VARCHAR".to_string()));
268 Ok(table)
269 }
270
271 async fn list_tables(&self) -> Result<Vec<String>, DataSourceError> {
272 Ok(vec![
274 "users".to_string(),
275 "orders".to_string(),
276 "products".to_string(),
277 ])
278 }
279
280 async fn list_views(&self) -> Result<Vec<String>, DataSourceError> {
281 Ok(vec!["user_summary".to_string()])
283 }
284
285 async fn table_exists(&self, table_name: &str) -> Result<bool, DataSourceError> {
286 let tables = self.list_tables().await?;
287 Ok(tables.contains(&table_name.to_string()))
288 }
289
290 async fn get_stats(&self) -> Result<ConnectionStats, DataSourceError> {
291 Ok(self.stats.lock().unwrap().clone())
292 }
293
294 async fn close(&self) -> Result<(), DataSourceError> {
295 Ok(())
297 }
298}
299
300pub struct ClickHouseAdapter {
302 config: ConnectionConfig,
303 #[cfg(feature = "database")]
304 client: Option<ClickHouseClient>,
305 #[cfg(not(feature = "database"))]
306 client: Option<Arc<RwLock<MockClickHouseClient>>>,
307}
308
309impl ClickHouseAdapter {
310 pub fn new(config: ConnectionConfig) -> Self {
311 Self {
312 config,
313 client: None,
314 }
315 }
316
317 pub async fn initialize(&mut self) -> Result<(), DataSourceError> {
318 #[cfg(feature = "database")]
319 {
320 let client = ClickHouseClient::default()
321 .with_url(&self.config.connection_string)
322 .map_err(|e| DataSourceError::ConnectionFailed(e.to_string()))?;
323 self.client = Some(client);
324 }
325 #[cfg(not(feature = "database"))]
326 {
327 let client = MockClickHouseClient::new(&self.config).await?;
328 self.client = Some(Arc::new(RwLock::new(client)));
329 }
330 Ok(())
331 }
332}
333
334#[async_trait]
335impl DataSource for ClickHouseAdapter {
336 fn name(&self) -> &str {
337 "ClickHouse"
338 }
339
340 fn source_type(&self) -> &str {
341 "clickhouse"
342 }
343
344 async fn connect(
345 &self,
346 _config: &ConnectionConfig,
347 ) -> Result<Box<dyn Connection>, DataSourceError> {
348 let client = self.client.as_ref().ok_or_else(|| {
349 DataSourceError::ConnectionFailed("Client not initialized".to_string())
350 })?;
351
352 let conn = ClickHouseConnection::new(client.clone()).await?;
353 Ok(Box::new(conn))
354 }
355
356 async fn test_connection(&self, _config: &ConnectionConfig) -> Result<bool, DataSourceError> {
357 match self.connect(_config).await {
358 Ok(_) => Ok(true),
359 Err(_) => Ok(false),
360 }
361 }
362
363 fn supported_features(&self) -> Vec<String> {
364 vec![
365 "columnar_storage".to_string(),
366 "compression".to_string(),
367 "distributed_queries".to_string(),
368 "materialized_views".to_string(),
369 "replication".to_string(),
370 ]
371 }
372
373 fn connection_limits(&self) -> ConnectionLimits {
374 ConnectionLimits::new()
375 .with_max_connections(50)
376 .with_max_query_time(std::time::Duration::from_secs(600))
377 .with_max_result_size(1_000_000_000) .with_max_parameters(10000)
379 }
380
381 async fn get_stats(&self) -> Result<DataSourceStats, DataSourceError> {
382 Ok(DataSourceStats::new())
383 }
384}
385
386pub struct ClickHouseConnection {
388 #[cfg(feature = "database")]
389 client: ClickHouseClient,
390 #[cfg(not(feature = "database"))]
391 client: Arc<RwLock<MockClickHouseClient>>,
392 stats: Arc<Mutex<ConnectionStats>>,
393}
394
395impl ClickHouseConnection {
396 pub async fn new(
397 #[cfg(feature = "database")] client: ClickHouseClient,
398 #[cfg(not(feature = "database"))] client: Arc<RwLock<MockClickHouseClient>>,
399 ) -> Result<Self, DataSourceError> {
400 Ok(Self {
401 client,
402 stats: Arc::new(Mutex::new(ConnectionStats::new(
403 uuid::Uuid::new_v4().to_string(),
404 ))),
405 })
406 }
407}
408
409#[async_trait]
410impl Connection for ClickHouseConnection {
411 async fn execute_query(&self, _query: &str) -> Result<DataFrame, DataSourceError> {
412 let start_time = std::time::Instant::now();
413
414 let execution_time = start_time.elapsed();
416 self.stats
417 .lock()
418 .unwrap()
419 .record_successful_query(execution_time);
420
421 Ok(DataFrame::new(Vec::new()).unwrap())
423 }
424
425 async fn execute_query_with_params(
426 &self,
427 query: &str,
428 _params: &[&dyn ToSql],
429 ) -> Result<DataFrame, DataSourceError> {
430 self.execute_query(query).await
431 }
432
433 async fn execute_statement(&self, _statement: &str) -> Result<u64, DataSourceError> {
434 let start_time = std::time::Instant::now();
435 let execution_time = start_time.elapsed();
436 self.stats
437 .lock()
438 .unwrap()
439 .record_successful_query(execution_time);
440 Ok(1) }
442
443 async fn execute_statement_with_params(
444 &self,
445 statement: &str,
446 _params: &[&dyn ToSql],
447 ) -> Result<u64, DataSourceError> {
448 self.execute_statement(statement).await
449 }
450
451 async fn begin_transaction(&self) -> Result<Box<dyn Transaction>, DataSourceError> {
452 Ok(Box::new(MockTransaction::new()))
453 }
454
455 async fn get_schema(&self) -> Result<super::types::Schema, DataSourceError> {
456 let mut schema = super::types::Schema::new();
457
458 let mut table = TableInfo::new("events".to_string());
460 table.add_column(ColumnInfo::new(
461 "timestamp".to_string(),
462 "DateTime".to_string(),
463 ));
464 table.add_column(ColumnInfo::new("user_id".to_string(), "UInt64".to_string()));
465 table.add_column(ColumnInfo::new(
466 "event_type".to_string(),
467 "String".to_string(),
468 ));
469 schema.add_table(table);
470
471 Ok(schema)
472 }
473
474 async fn get_table_info(&self, table_name: &str) -> Result<TableInfo, DataSourceError> {
475 let mut table = TableInfo::new(table_name.to_string());
476 table.add_column(ColumnInfo::new("id".to_string(), "UInt64".to_string()));
477 table.add_column(ColumnInfo::new("data".to_string(), "String".to_string()));
478 Ok(table)
479 }
480
481 async fn list_tables(&self) -> Result<Vec<String>, DataSourceError> {
482 Ok(vec![
483 "events".to_string(),
484 "users".to_string(),
485 "sessions".to_string(),
486 ])
487 }
488
489 async fn list_views(&self) -> Result<Vec<String>, DataSourceError> {
490 Ok(vec!["daily_events".to_string()])
491 }
492
493 async fn table_exists(&self, table_name: &str) -> Result<bool, DataSourceError> {
494 let tables = self.list_tables().await?;
495 Ok(tables.contains(&table_name.to_string()))
496 }
497
498 async fn get_stats(&self) -> Result<ConnectionStats, DataSourceError> {
499 Ok(self.stats.lock().unwrap().clone())
500 }
501
502 async fn close(&self) -> Result<(), DataSourceError> {
503 Ok(())
504 }
505}
506
507#[cfg(not(feature = "database"))]
509pub struct MockConnectionPool {
510 config: ConnectionConfig,
511}
512
513#[cfg(not(feature = "database"))]
514impl MockConnectionPool {
515 pub async fn new(config: &ConnectionConfig) -> Result<Self, DataSourceError> {
516 Ok(Self {
517 config: config.clone(),
518 })
519 }
520}
521
522#[cfg(not(feature = "database"))]
523pub struct MockClickHouseClient {
524 config: ConnectionConfig,
525}
526
527#[cfg(not(feature = "database"))]
528impl MockClickHouseClient {
529 pub async fn new(config: &ConnectionConfig) -> Result<Self, DataSourceError> {
530 Ok(Self {
531 config: config.clone(),
532 })
533 }
534}
535
536pub struct MockTransaction {
537 active: bool,
538}
539
540impl MockTransaction {
541 pub fn new() -> Self {
542 Self { active: true }
543 }
544}
545
546#[async_trait]
547impl Transaction for MockTransaction {
548 async fn execute_query(&self, _query: &str) -> Result<DataFrame, DataSourceError> {
549 Ok(DataFrame::new(Vec::new()).unwrap())
550 }
551
552 async fn execute_query_with_params(
553 &self,
554 query: &str,
555 _params: &[&dyn ToSql],
556 ) -> Result<DataFrame, DataSourceError> {
557 self.execute_query(query).await
558 }
559
560 async fn execute_statement(&self, _statement: &str) -> Result<u64, DataSourceError> {
561 Ok(1)
562 }
563
564 async fn execute_statement_with_params(
565 &self,
566 statement: &str,
567 _params: &[&dyn ToSql],
568 ) -> Result<u64, DataSourceError> {
569 self.execute_statement(statement).await
570 }
571
572 async fn commit(&self) -> Result<(), DataSourceError> {
573 Ok(())
574 }
575
576 async fn rollback(&self) -> Result<(), DataSourceError> {
577 Ok(())
578 }
579
580 fn is_active(&self) -> bool {
581 self.active
582 }
583}