1use crate::{
10 error::{Error, Result},
11 types::SupabaseConfig,
12};
13use reqwest::Client as HttpClient;
14use serde::{Deserialize, Serialize};
15use serde_json::Value;
16use std::{
17 collections::HashMap,
18 sync::Arc,
19 time::{Duration, Instant},
20};
21
22#[cfg(not(target_arch = "wasm32"))]
23use tokio::sync::RwLock;
24
25#[cfg(target_arch = "wasm32")]
26mod wasm_rwlock {
27 use std::sync::RwLock as StdRwLock;
28
29 pub struct RwLock<T>(StdRwLock<T>);
30
31 impl<T> RwLock<T> {
32 pub fn new(value: T) -> Self {
33 Self(StdRwLock::new(value))
34 }
35
36 pub async fn read(&self) -> std::sync::RwLockReadGuard<'_, T> {
37 self.0.read().unwrap()
38 }
39
40 pub async fn write(&self) -> std::sync::RwLockWriteGuard<'_, T> {
41 self.0.write().unwrap()
42 }
43 }
44
45 impl<T: std::fmt::Debug> std::fmt::Debug for RwLock<T> {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 write!(f, "RwLock")
48 }
49 }
50}
51
52#[cfg(target_arch = "wasm32")]
53use wasm_rwlock::RwLock;
54
55use tracing::{debug, info};
56
57#[derive(Debug, Clone)]
59pub struct Performance {
60 #[allow(dead_code)] http_client: Arc<HttpClient>,
62 #[allow(dead_code)] config: Arc<SupabaseConfig>,
64 connection_pool: Arc<ConnectionPool>,
65 cache: Arc<RequestCache>,
66 batch_processor: Arc<BatchProcessor>,
67}
68
69#[derive(Debug)]
71pub struct ConnectionPool {
72 pools: RwLock<HashMap<String, Arc<HttpClient>>>,
73 config: ConnectionPoolConfig,
74}
75
76#[derive(Debug, Clone)]
78pub struct ConnectionPoolConfig {
79 pub max_connections_per_host: usize,
81 pub idle_timeout: Duration,
83 pub keep_alive_timeout: Duration,
85 pub http2: bool,
87 pub user_agent: Option<String>,
89}
90
91impl Default for ConnectionPoolConfig {
92 fn default() -> Self {
93 Self {
94 max_connections_per_host: 10,
95 idle_timeout: Duration::from_secs(90),
96 keep_alive_timeout: Duration::from_secs(60),
97 http2: true,
98 user_agent: Some("supabase-rust/0.4.2".to_string()),
99 }
100 }
101}
102
103#[derive(Debug)]
105pub struct RequestCache {
106 cache: RwLock<HashMap<String, CacheEntry>>,
107 config: CacheConfig,
108}
109
110#[derive(Debug, Clone)]
112pub struct CacheConfig {
113 pub max_entries: usize,
115 pub default_ttl: Duration,
117 pub enable_compression: bool,
119 pub cache_success_only: bool,
121}
122
123impl Default for CacheConfig {
124 fn default() -> Self {
125 Self {
126 max_entries: 1000,
127 default_ttl: Duration::from_secs(300), enable_compression: true,
129 cache_success_only: true,
130 }
131 }
132}
133
134#[derive(Debug, Clone)]
136pub struct CacheEntry {
137 pub data: Value,
139 pub created_at: Instant,
141 pub ttl: Duration,
143 pub size_bytes: usize,
145 pub hit_count: u64,
147}
148
149#[derive(Debug)]
151pub struct BatchProcessor {
152 pending_operations: RwLock<Vec<BatchOperation>>,
153 config: BatchConfig,
154}
155
156#[derive(Debug, Clone)]
158pub struct BatchConfig {
159 pub max_batch_size: usize,
161 pub flush_interval: Duration,
163 pub auto_batch: bool,
165 pub batch_timeout: Duration,
167}
168
169impl Default for BatchConfig {
170 fn default() -> Self {
171 Self {
172 max_batch_size: 50,
173 flush_interval: Duration::from_millis(100),
174 auto_batch: true,
175 batch_timeout: Duration::from_secs(5),
176 }
177 }
178}
179
180#[derive(Debug, Clone, Serialize, Deserialize)]
182pub struct BatchOperation {
183 pub id: String,
185 pub method: String,
187 pub url: String,
189 pub headers: HashMap<String, String>,
191 pub body: Option<Value>,
193 pub priority: u8,
195}
196
197#[derive(Debug, Clone, Serialize, Deserialize)]
199pub struct BatchResult {
200 pub id: String,
202 pub status: u16,
204 pub data: Option<Value>,
206 pub error: Option<String>,
208}
209
210#[derive(Debug, Clone, Serialize, Deserialize)]
212pub struct PerformanceMetrics {
213 pub active_connections: usize,
215 pub cache_hit_ratio: f64,
217 pub cache_entries: usize,
219 pub avg_response_time_ms: f64,
221 pub total_requests: u64,
223 pub successful_requests: u64,
225 pub failed_requests: u64,
227 pub batched_operations: u64,
229}
230
231impl Performance {
232 pub fn new(config: Arc<SupabaseConfig>, http_client: Arc<HttpClient>) -> Result<Self> {
234 debug!("Initializing Performance module");
235
236 let connection_pool = Arc::new(ConnectionPool::new(ConnectionPoolConfig::default()));
237 let cache = Arc::new(RequestCache::new(CacheConfig::default()));
238 let batch_processor = Arc::new(BatchProcessor::new(BatchConfig::default()));
239
240 Ok(Self {
241 http_client,
242 config,
243 connection_pool,
244 cache,
245 batch_processor,
246 })
247 }
248
249 pub fn new_with_config(
251 config: Arc<SupabaseConfig>,
252 http_client: Arc<HttpClient>,
253 pool_config: ConnectionPoolConfig,
254 cache_config: CacheConfig,
255 batch_config: BatchConfig,
256 ) -> Result<Self> {
257 debug!("Initializing Performance module with custom config");
258
259 let connection_pool = Arc::new(ConnectionPool::new(pool_config));
260 let cache = Arc::new(RequestCache::new(cache_config));
261 let batch_processor = Arc::new(BatchProcessor::new(batch_config));
262
263 Ok(Self {
264 http_client,
265 config,
266 connection_pool,
267 cache,
268 batch_processor,
269 })
270 }
271
272 pub async fn get_client(&self, host: &str) -> Result<Arc<HttpClient>> {
274 self.connection_pool.get_client(host).await
275 }
276
277 pub async fn cache_response(
279 &self,
280 key: &str,
281 data: Value,
282 ttl: Option<Duration>,
283 ) -> Result<()> {
284 self.cache.set(key, data, ttl).await
285 }
286
287 pub async fn get_cached_response(&self, key: &str) -> Result<Option<Value>> {
289 self.cache.get(key).await
290 }
291
292 pub async fn add_to_batch(&self, operation: BatchOperation) -> Result<()> {
294 self.batch_processor.add_operation(operation).await
295 }
296
297 pub async fn process_batch(&self) -> Result<Vec<BatchResult>> {
299 self.batch_processor.process_pending().await
300 }
301
302 pub async fn get_metrics(&self) -> PerformanceMetrics {
304 let connection_metrics = self.connection_pool.get_metrics().await;
305 let cache_metrics = self.cache.get_metrics().await;
306 let batch_metrics = self.batch_processor.get_metrics().await;
307
308 PerformanceMetrics {
309 active_connections: connection_metrics.active_count,
310 cache_hit_ratio: cache_metrics.hit_ratio,
311 cache_entries: cache_metrics.entry_count,
312 avg_response_time_ms: 0.0, total_requests: 0, successful_requests: 0, failed_requests: 0, batched_operations: batch_metrics.total_operations,
317 }
318 }
319
320 pub async fn clear_cache(&self) -> Result<()> {
322 self.cache.clear().await
323 }
324
325 pub async fn warm_up_connections(&self, hosts: Vec<String>) -> Result<()> {
327 for host in hosts {
328 let _ = self.connection_pool.get_client(&host).await?;
329 debug!("Warmed up connection for host: {}", host);
330 }
331 Ok(())
332 }
333}
334
335impl ConnectionPool {
338 fn new(config: ConnectionPoolConfig) -> Self {
339 Self {
340 pools: RwLock::new(HashMap::new()),
341 config,
342 }
343 }
344
345 async fn get_client(&self, host: &str) -> Result<Arc<HttpClient>> {
346 {
348 let pools = self.pools.read().await;
349 if let Some(client) = pools.get(host) {
350 return Ok(Arc::clone(client));
351 }
352 }
353
354 let client = self.create_optimized_client().await?;
356 let client_arc = Arc::new(client);
357
358 {
360 let mut pools = self.pools.write().await;
361 pools.insert(host.to_string(), Arc::clone(&client_arc));
362 }
363
364 info!("Created new HTTP client for host: {}", host);
365 Ok(client_arc)
366 }
367
368 #[cfg(not(target_arch = "wasm32"))]
369 async fn create_optimized_client(&self) -> Result<HttpClient> {
370 let mut builder = HttpClient::builder()
371 .pool_max_idle_per_host(self.config.max_connections_per_host)
372 .pool_idle_timeout(self.config.idle_timeout)
373 .tcp_keepalive(Some(self.config.keep_alive_timeout));
374
375 if let Some(user_agent) = &self.config.user_agent {
376 builder = builder.user_agent(user_agent);
377 }
378
379 builder
380 .build()
381 .map_err(|e| Error::config(format!("Failed to create HTTP client: {}", e)))
382 }
383
384 #[cfg(target_arch = "wasm32")]
385 async fn create_optimized_client(&self) -> Result<HttpClient> {
386 let mut builder = HttpClient::builder();
387
388 if let Some(user_agent) = &self.config.user_agent {
389 builder = builder.user_agent(user_agent);
390 }
391
392 builder
393 .build()
394 .map_err(|e| Error::config(format!("Failed to create HTTP client: {}", e)))
395 }
396
397 async fn get_metrics(&self) -> ConnectionMetrics {
398 let pools = self.pools.read().await;
399 ConnectionMetrics {
400 active_count: pools.len(),
401 total_created: pools.len() as u64, }
403 }
404}
405
406#[derive(Debug, Clone)]
407struct ConnectionMetrics {
408 active_count: usize,
409 #[allow(dead_code)] total_created: u64,
411}
412
413impl RequestCache {
416 fn new(config: CacheConfig) -> Self {
417 Self {
418 cache: RwLock::new(HashMap::new()),
419 config,
420 }
421 }
422
423 async fn set(&self, key: &str, data: Value, ttl: Option<Duration>) -> Result<()> {
424 let entry = CacheEntry {
425 data,
426 created_at: Instant::now(),
427 ttl: ttl.unwrap_or(self.config.default_ttl),
428 size_bytes: 0, hit_count: 0,
430 };
431
432 let mut cache = self.cache.write().await;
433
434 if cache.len() >= self.config.max_entries {
436 self.evict_oldest(&mut cache);
437 }
438
439 cache.insert(key.to_string(), entry);
440 debug!("Cached response for key: {}", key);
441 Ok(())
442 }
443
444 async fn get(&self, key: &str) -> Result<Option<Value>> {
445 let mut cache = self.cache.write().await;
446
447 if let Some(entry) = cache.get_mut(key) {
448 if entry.created_at.elapsed() > entry.ttl {
450 cache.remove(key);
451 debug!("Cache entry expired for key: {}", key);
452 return Ok(None);
453 }
454
455 entry.hit_count += 1;
457 debug!("Cache hit for key: {}", key);
458 Ok(Some(entry.data.clone()))
459 } else {
460 debug!("Cache miss for key: {}", key);
461 Ok(None)
462 }
463 }
464
465 async fn clear(&self) -> Result<()> {
466 let mut cache = self.cache.write().await;
467 cache.clear();
468 info!("Cache cleared");
469 Ok(())
470 }
471
472 async fn get_metrics(&self) -> CacheMetrics {
473 let cache = self.cache.read().await;
474 let total_hits: u64 = cache.values().map(|entry| entry.hit_count).sum();
475 let total_requests = total_hits + cache.len() as u64; CacheMetrics {
478 entry_count: cache.len(),
479 hit_ratio: if total_requests > 0 {
480 total_hits as f64 / total_requests as f64
481 } else {
482 0.0
483 },
484 }
485 }
486
487 fn evict_oldest(&self, cache: &mut HashMap<String, CacheEntry>) {
488 if let Some((oldest_key, _)) = cache
489 .iter()
490 .min_by_key(|(_, entry)| entry.created_at)
491 .map(|(k, v)| (k.clone(), v.created_at))
492 {
493 cache.remove(&oldest_key);
494 debug!("Evicted oldest cache entry: {}", oldest_key);
495 }
496 }
497}
498
499#[derive(Debug, Clone)]
500struct CacheMetrics {
501 entry_count: usize,
502 hit_ratio: f64,
503}
504
505impl BatchProcessor {
508 fn new(config: BatchConfig) -> Self {
509 Self {
510 pending_operations: RwLock::new(Vec::new()),
511 config,
512 }
513 }
514
515 async fn add_operation(&self, operation: BatchOperation) -> Result<()> {
516 let mut pending = self.pending_operations.write().await;
517 pending.push(operation);
518
519 if self.config.auto_batch && pending.len() >= self.config.max_batch_size {
521 drop(pending); let _ = self.process_pending().await;
523 }
524
525 Ok(())
526 }
527
528 async fn process_pending(&self) -> Result<Vec<BatchResult>> {
529 let mut pending = self.pending_operations.write().await;
530 if pending.is_empty() {
531 return Ok(Vec::new());
532 }
533
534 let operations = pending.drain(..).collect::<Vec<_>>();
535 drop(pending); debug!("Processing batch of {} operations", operations.len());
538
539 let results = operations
541 .into_iter()
542 .map(|op| BatchResult {
543 id: op.id,
544 status: 200, data: Some(Value::Null),
546 error: None,
547 })
548 .collect();
549
550 Ok(results)
551 }
552
553 async fn get_metrics(&self) -> BatchMetrics {
554 let pending = self.pending_operations.read().await;
555 BatchMetrics {
556 pending_operations: pending.len(),
557 total_operations: 0, }
559 }
560}
561
562#[derive(Debug, Clone)]
563struct BatchMetrics {
564 #[allow(dead_code)] pending_operations: usize,
566 total_operations: u64,
567}
568
569#[cfg(test)]
570mod tests {
571 use super::*;
572
573 #[tokio::test]
574 async fn test_connection_pool_creation() {
575 let pool = ConnectionPool::new(ConnectionPoolConfig::default());
576 let client = pool.get_client("localhost").await.unwrap();
577 assert!(Arc::strong_count(&client) >= 1);
579 }
580
581 #[tokio::test]
582 async fn test_cache_set_get() {
583 let cache = RequestCache::new(CacheConfig::default());
584 let test_data = serde_json::json!({"test": "data"});
585
586 cache
587 .set("test_key", test_data.clone(), None)
588 .await
589 .unwrap();
590 let retrieved = cache.get("test_key").await.unwrap();
591
592 assert_eq!(retrieved, Some(test_data));
593 }
594
595 #[tokio::test]
596 async fn test_batch_processor() {
597 let processor = BatchProcessor::new(BatchConfig::default());
598
599 let operation = BatchOperation {
600 id: "test_op".to_string(),
601 method: "GET".to_string(),
602 url: "https://example.com".to_string(),
603 headers: HashMap::new(),
604 body: None,
605 priority: 1,
606 };
607
608 processor.add_operation(operation).await.unwrap();
609 let results = processor.process_pending().await.unwrap();
610
611 assert_eq!(results.len(), 1);
612 assert_eq!(results[0].id, "test_op");
613 }
614}