// supabase/performance.rs

1//! Performance optimization module
2//!
3//! This module provides performance enhancements for Supabase operations:
4//! - **Connection Pooling**: Efficient HTTP client connection management
5//! - **Request Caching**: Intelligent API response caching
6//! - **Batch Operations**: Multi-request optimization
7//! - **Compression**: Request/response compression support
8
9use crate::{
10    error::{Error, Result},
11    types::SupabaseConfig,
12};
13use reqwest::Client as HttpClient;
14use serde::{Deserialize, Serialize};
15use serde_json::Value;
16use std::{
17    collections::HashMap,
18    sync::Arc,
19    time::{Duration, Instant},
20};
21
22#[cfg(not(target_arch = "wasm32"))]
23use tokio::sync::RwLock;
24
#[cfg(target_arch = "wasm32")]
mod wasm_rwlock {
    //! Async-looking facade over `std::sync::RwLock` for wasm32 builds,
    //! where tokio's `RwLock` is not used. The `async` methods never
    //! actually await; they exist only for API parity with
    //! `tokio::sync::RwLock` so the rest of this module compiles
    //! unchanged on both targets.
    use std::sync::RwLock as StdRwLock;

    /// Drop-in replacement exposing `read().await` / `write().await`.
    pub struct RwLock<T>(StdRwLock<T>);

    impl<T> RwLock<T> {
        /// Wrap `value` in a new lock.
        pub fn new(value: T) -> Self {
            Self(StdRwLock::new(value))
        }

        /// Acquire a shared read guard. Panics if the inner lock is
        /// poisoned (a prior panic while held) — presumably unreachable
        /// on effectively single-threaded wasm; TODO confirm.
        pub async fn read(&self) -> std::sync::RwLockReadGuard<'_, T> {
            self.0.read().unwrap()
        }

        /// Acquire an exclusive write guard. Same poisoning caveat as `read`.
        pub async fn write(&self) -> std::sync::RwLockWriteGuard<'_, T> {
            self.0.write().unwrap()
        }
    }

    /// Opaque `Debug` impl so containing types can `#[derive(Debug)]`
    /// without exposing lock internals.
    impl<T: std::fmt::Debug> std::fmt::Debug for RwLock<T> {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "RwLock")
        }
    }
}
51
52#[cfg(target_arch = "wasm32")]
53use wasm_rwlock::RwLock;
54
55use tracing::{debug, info};
56
/// Performance optimization manager.
///
/// Bundles the connection pool, response cache, and batch processor behind
/// a single cheaply-cloneable handle; all state lives behind `Arc`, so
/// clones share the same pool/cache/queue.
#[derive(Debug, Clone)]
pub struct Performance {
    #[allow(dead_code)] // Used in future implementations
    http_client: Arc<HttpClient>,
    #[allow(dead_code)] // Used in future implementations
    config: Arc<SupabaseConfig>,
    // Per-host pool of tuned HTTP clients.
    connection_pool: Arc<ConnectionPool>,
    // TTL-based cache of API responses.
    cache: Arc<RequestCache>,
    // Queue for multi-request batching.
    batch_processor: Arc<BatchProcessor>,
}
68
/// Connection pool holding one tuned HTTP client per host.
#[derive(Debug)]
pub struct ConnectionPool {
    // Host -> shared client map, guarded for concurrent access.
    pools: RwLock<HashMap<String, Arc<HttpClient>>>,
    // Settings applied when building new clients.
    config: ConnectionPoolConfig,
}
75
/// Connection pool configuration.
#[derive(Debug, Clone)]
pub struct ConnectionPoolConfig {
    /// Maximum idle connections kept alive per host (feeds reqwest's
    /// `pool_max_idle_per_host`).
    pub max_connections_per_host: usize,
    /// How long an idle pooled connection is kept before being closed.
    pub idle_timeout: Duration,
    /// TCP keep-alive interval applied to pooled connections.
    pub keep_alive_timeout: Duration,
    /// Enable HTTP/2.
    // NOTE(review): not currently consulted when building clients in this
    // module — TODO confirm intended wiring.
    pub http2: bool,
    /// User agent string sent with every request (when set).
    pub user_agent: Option<String>,
}

impl Default for ConnectionPoolConfig {
    /// Defaults: 10 idle connections/host, 90 s idle timeout, 60 s TCP
    /// keep-alive, HTTP/2 flag on, crate-identifying user agent.
    fn default() -> Self {
        Self {
            max_connections_per_host: 10,
            idle_timeout: Duration::from_secs(90),
            keep_alive_timeout: Duration::from_secs(60),
            http2: true,
            user_agent: Some("supabase-rust/0.4.2".to_string()),
        }
    }
}
102
/// TTL-based request cache for API responses.
#[derive(Debug)]
pub struct RequestCache {
    // Key -> entry map. A write lock is taken even for lookups so hit
    // counts and lazy expiry can mutate entries in place.
    cache: RwLock<HashMap<String, CacheEntry>>,
    // Cache tuning knobs.
    config: CacheConfig,
}
109
/// Cache configuration.
#[derive(Debug, Clone)]
pub struct CacheConfig {
    /// Maximum cache size (number of entries); the oldest entry is evicted
    /// once this limit is reached.
    pub max_entries: usize,
    /// Default TTL applied when `set` is called without an explicit TTL.
    pub default_ttl: Duration,
    /// Enable cache compression.
    // NOTE(review): not read by the cache logic in this module — TODO confirm.
    pub enable_compression: bool,
    /// Cache only successful responses.
    // NOTE(review): not read by the cache logic in this module — TODO confirm.
    pub cache_success_only: bool,
}

impl Default for CacheConfig {
    /// Defaults: 1000 entries, 5-minute TTL, compression on,
    /// success-only caching.
    fn default() -> Self {
        Self {
            max_entries: 1000,
            default_ttl: Duration::from_secs(300), // 5 minutes
            enable_compression: true,
            cache_success_only: true,
        }
    }
}
133
/// Cache entry with metadata.
#[derive(Debug, Clone)]
pub struct CacheEntry {
    /// Cached response data.
    pub data: Value,
    /// Entry creation time; drives TTL expiry and oldest-first eviction.
    pub created_at: Instant,
    /// Time-to-live measured from `created_at`.
    pub ttl: Duration,
    /// Response size (compressed if enabled).
    // NOTE(review): currently always 0 — size calculation is a TODO in `set`.
    pub size_bytes: usize,
    /// Number of times this entry has been served from the cache.
    pub hit_count: u64,
}
148
/// Batch processing for multiple operations.
#[derive(Debug)]
pub struct BatchProcessor {
    // FIFO queue of operations awaiting a flush.
    pending_operations: RwLock<Vec<BatchOperation>>,
    // Batching tuning knobs.
    config: BatchConfig,
}
155
/// Batch processing configuration.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum batch size; with `auto_batch` on, reaching this size
    /// triggers an immediate flush.
    pub max_batch_size: usize,
    /// Batch flush interval.
    // NOTE(review): no timer consumes this in this module — TODO confirm.
    pub flush_interval: Duration,
    /// Enable automatic batching (flush when the queue reaches
    /// `max_batch_size`).
    pub auto_batch: bool,
    /// Batch timeout.
    // NOTE(review): not consulted in this module — TODO confirm.
    pub batch_timeout: Duration,
}

impl Default for BatchConfig {
    /// Defaults: batches of 50, 100 ms flush interval, auto-batching on,
    /// 5 s batch timeout.
    fn default() -> Self {
        Self {
            max_batch_size: 50,
            flush_interval: Duration::from_millis(100),
            auto_batch: true,
            batch_timeout: Duration::from_secs(5),
        }
    }
}
179
/// A single HTTP request queued for batch execution.
///
/// Serializable so batches can be shipped over the wire or persisted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchOperation {
    /// Caller-assigned ID used to correlate with the matching `BatchResult`.
    pub id: String,
    /// HTTP method (e.g. "GET", "POST").
    pub method: String,
    /// Request URL.
    pub url: String,
    /// Request headers.
    pub headers: HashMap<String, String>,
    /// Optional JSON request body.
    pub body: Option<Value>,
    /// Operation priority.
    // NOTE(review): not consulted by the processor in this module yet —
    // TODO confirm intended ordering semantics (higher vs lower first).
    pub priority: u8,
}
196
/// Outcome of one batched operation, correlated to its request by `id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchResult {
    /// ID of the originating `BatchOperation`.
    pub id: String,
    /// HTTP status code.
    pub status: u16,
    /// Response data, if any.
    pub data: Option<Value>,
    /// Error message if the operation failed.
    pub error: Option<String>,
}
209
/// Aggregated performance metrics across pool, cache, and batch queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Active connections count (number of pooled clients, one per host).
    pub active_connections: usize,
    /// Cache hit ratio (0.0 to 1.0); approximate — misses are not tracked.
    pub cache_hit_ratio: f64,
    /// Cache entry count.
    pub cache_entries: usize,
    /// Average response time (ms).
    // NOTE(review): currently hard-coded to 0.0 by Performance::get_metrics.
    pub avg_response_time_ms: f64,
    /// Total requests processed.
    // NOTE(review): currently hard-coded to 0 by Performance::get_metrics.
    pub total_requests: u64,
    /// Successful requests count.
    // NOTE(review): currently hard-coded to 0 by Performance::get_metrics.
    pub successful_requests: u64,
    /// Failed requests count.
    // NOTE(review): currently hard-coded to 0 by Performance::get_metrics.
    pub failed_requests: u64,
    /// Total batched operations.
    // NOTE(review): sourced from BatchMetrics.total_operations, itself a TODO.
    pub batched_operations: u64,
}
230
231impl Performance {
232    /// Create a new Performance instance
233    pub fn new(config: Arc<SupabaseConfig>, http_client: Arc<HttpClient>) -> Result<Self> {
234        debug!("Initializing Performance module");
235
236        let connection_pool = Arc::new(ConnectionPool::new(ConnectionPoolConfig::default()));
237        let cache = Arc::new(RequestCache::new(CacheConfig::default()));
238        let batch_processor = Arc::new(BatchProcessor::new(BatchConfig::default()));
239
240        Ok(Self {
241            http_client,
242            config,
243            connection_pool,
244            cache,
245            batch_processor,
246        })
247    }
248
249    /// Create with custom configuration
250    pub fn new_with_config(
251        config: Arc<SupabaseConfig>,
252        http_client: Arc<HttpClient>,
253        pool_config: ConnectionPoolConfig,
254        cache_config: CacheConfig,
255        batch_config: BatchConfig,
256    ) -> Result<Self> {
257        debug!("Initializing Performance module with custom config");
258
259        let connection_pool = Arc::new(ConnectionPool::new(pool_config));
260        let cache = Arc::new(RequestCache::new(cache_config));
261        let batch_processor = Arc::new(BatchProcessor::new(batch_config));
262
263        Ok(Self {
264            http_client,
265            config,
266            connection_pool,
267            cache,
268            batch_processor,
269        })
270    }
271
272    /// Get optimized HTTP client for a host
273    pub async fn get_client(&self, host: &str) -> Result<Arc<HttpClient>> {
274        self.connection_pool.get_client(host).await
275    }
276
277    /// Cache a response with optional TTL
278    pub async fn cache_response(
279        &self,
280        key: &str,
281        data: Value,
282        ttl: Option<Duration>,
283    ) -> Result<()> {
284        self.cache.set(key, data, ttl).await
285    }
286
287    /// Get cached response
288    pub async fn get_cached_response(&self, key: &str) -> Result<Option<Value>> {
289        self.cache.get(key).await
290    }
291
292    /// Add operation to batch processing queue
293    pub async fn add_to_batch(&self, operation: BatchOperation) -> Result<()> {
294        self.batch_processor.add_operation(operation).await
295    }
296
297    /// Process pending batch operations
298    pub async fn process_batch(&self) -> Result<Vec<BatchResult>> {
299        self.batch_processor.process_pending().await
300    }
301
302    /// Get performance metrics
303    pub async fn get_metrics(&self) -> PerformanceMetrics {
304        let connection_metrics = self.connection_pool.get_metrics().await;
305        let cache_metrics = self.cache.get_metrics().await;
306        let batch_metrics = self.batch_processor.get_metrics().await;
307
308        PerformanceMetrics {
309            active_connections: connection_metrics.active_count,
310            cache_hit_ratio: cache_metrics.hit_ratio,
311            cache_entries: cache_metrics.entry_count,
312            avg_response_time_ms: 0.0, // TODO: Implement response time tracking
313            total_requests: 0,         // TODO: Implement request tracking
314            successful_requests: 0,    // TODO: Implement success tracking
315            failed_requests: 0,        // TODO: Implement failure tracking
316            batched_operations: batch_metrics.total_operations,
317        }
318    }
319
320    /// Clear all caches
321    pub async fn clear_cache(&self) -> Result<()> {
322        self.cache.clear().await
323    }
324
325    /// Warm up connections for specified hosts
326    pub async fn warm_up_connections(&self, hosts: Vec<String>) -> Result<()> {
327        for host in hosts {
328            let _ = self.connection_pool.get_client(&host).await?;
329            debug!("Warmed up connection for host: {}", host);
330        }
331        Ok(())
332    }
333}
334
335// Connection Pool Implementation
336
337impl ConnectionPool {
338    fn new(config: ConnectionPoolConfig) -> Self {
339        Self {
340            pools: RwLock::new(HashMap::new()),
341            config,
342        }
343    }
344
345    async fn get_client(&self, host: &str) -> Result<Arc<HttpClient>> {
346        // Check if client already exists
347        {
348            let pools = self.pools.read().await;
349            if let Some(client) = pools.get(host) {
350                return Ok(Arc::clone(client));
351            }
352        }
353
354        // Create new optimized client
355        let client = self.create_optimized_client().await?;
356        let client_arc = Arc::new(client);
357
358        // Store in pool
359        {
360            let mut pools = self.pools.write().await;
361            pools.insert(host.to_string(), Arc::clone(&client_arc));
362        }
363
364        info!("Created new HTTP client for host: {}", host);
365        Ok(client_arc)
366    }
367
368    #[cfg(not(target_arch = "wasm32"))]
369    async fn create_optimized_client(&self) -> Result<HttpClient> {
370        let mut builder = HttpClient::builder()
371            .pool_max_idle_per_host(self.config.max_connections_per_host)
372            .pool_idle_timeout(self.config.idle_timeout)
373            .tcp_keepalive(Some(self.config.keep_alive_timeout));
374
375        if let Some(user_agent) = &self.config.user_agent {
376            builder = builder.user_agent(user_agent);
377        }
378
379        builder
380            .build()
381            .map_err(|e| Error::config(format!("Failed to create HTTP client: {}", e)))
382    }
383
384    #[cfg(target_arch = "wasm32")]
385    async fn create_optimized_client(&self) -> Result<HttpClient> {
386        let mut builder = HttpClient::builder();
387
388        if let Some(user_agent) = &self.config.user_agent {
389            builder = builder.user_agent(user_agent);
390        }
391
392        builder
393            .build()
394            .map_err(|e| Error::config(format!("Failed to create HTTP client: {}", e)))
395    }
396
397    async fn get_metrics(&self) -> ConnectionMetrics {
398        let pools = self.pools.read().await;
399        ConnectionMetrics {
400            active_count: pools.len(),
401            total_created: pools.len() as u64, // Simplified for now
402        }
403    }
404}
405
/// Internal snapshot of pool state returned by `ConnectionPool::get_metrics`.
#[derive(Debug, Clone)]
struct ConnectionMetrics {
    // Number of pooled clients (one per host).
    active_count: usize,
    #[allow(dead_code)] // Used in future metrics implementations
    // NOTE(review): currently mirrors active_count — real creation
    // tracking is a TODO.
    total_created: u64,
}
412
413// Request Cache Implementation
414
415impl RequestCache {
416    fn new(config: CacheConfig) -> Self {
417        Self {
418            cache: RwLock::new(HashMap::new()),
419            config,
420        }
421    }
422
423    async fn set(&self, key: &str, data: Value, ttl: Option<Duration>) -> Result<()> {
424        let entry = CacheEntry {
425            data,
426            created_at: Instant::now(),
427            ttl: ttl.unwrap_or(self.config.default_ttl),
428            size_bytes: 0, // TODO: Calculate actual size
429            hit_count: 0,
430        };
431
432        let mut cache = self.cache.write().await;
433
434        // Check cache size limit
435        if cache.len() >= self.config.max_entries {
436            self.evict_oldest(&mut cache);
437        }
438
439        cache.insert(key.to_string(), entry);
440        debug!("Cached response for key: {}", key);
441        Ok(())
442    }
443
444    async fn get(&self, key: &str) -> Result<Option<Value>> {
445        let mut cache = self.cache.write().await;
446
447        if let Some(entry) = cache.get_mut(key) {
448            // Check if expired
449            if entry.created_at.elapsed() > entry.ttl {
450                cache.remove(key);
451                debug!("Cache entry expired for key: {}", key);
452                return Ok(None);
453            }
454
455            // Update hit count
456            entry.hit_count += 1;
457            debug!("Cache hit for key: {}", key);
458            Ok(Some(entry.data.clone()))
459        } else {
460            debug!("Cache miss for key: {}", key);
461            Ok(None)
462        }
463    }
464
465    async fn clear(&self) -> Result<()> {
466        let mut cache = self.cache.write().await;
467        cache.clear();
468        info!("Cache cleared");
469        Ok(())
470    }
471
472    async fn get_metrics(&self) -> CacheMetrics {
473        let cache = self.cache.read().await;
474        let total_hits: u64 = cache.values().map(|entry| entry.hit_count).sum();
475        let total_requests = total_hits + cache.len() as u64; // Simplified calculation
476
477        CacheMetrics {
478            entry_count: cache.len(),
479            hit_ratio: if total_requests > 0 {
480                total_hits as f64 / total_requests as f64
481            } else {
482                0.0
483            },
484        }
485    }
486
487    fn evict_oldest(&self, cache: &mut HashMap<String, CacheEntry>) {
488        if let Some((oldest_key, _)) = cache
489            .iter()
490            .min_by_key(|(_, entry)| entry.created_at)
491            .map(|(k, v)| (k.clone(), v.created_at))
492        {
493            cache.remove(&oldest_key);
494            debug!("Evicted oldest cache entry: {}", oldest_key);
495        }
496    }
497}
498
/// Internal cache statistics returned by `RequestCache::get_metrics`.
#[derive(Debug, Clone)]
struct CacheMetrics {
    // Number of live entries (possibly including expired entries not yet
    // lazily purged).
    entry_count: usize,
    // Approximate hit ratio; misses are not tracked, so the entry count
    // stands in for them.
    hit_ratio: f64,
}
504
505// Batch Processor Implementation
506
507impl BatchProcessor {
508    fn new(config: BatchConfig) -> Self {
509        Self {
510            pending_operations: RwLock::new(Vec::new()),
511            config,
512        }
513    }
514
515    async fn add_operation(&self, operation: BatchOperation) -> Result<()> {
516        let mut pending = self.pending_operations.write().await;
517        pending.push(operation);
518
519        // Auto-process if batch is full
520        if self.config.auto_batch && pending.len() >= self.config.max_batch_size {
521            drop(pending); // Release lock
522            let _ = self.process_pending().await;
523        }
524
525        Ok(())
526    }
527
528    async fn process_pending(&self) -> Result<Vec<BatchResult>> {
529        let mut pending = self.pending_operations.write().await;
530        if pending.is_empty() {
531            return Ok(Vec::new());
532        }
533
534        let operations = pending.drain(..).collect::<Vec<_>>();
535        drop(pending); // Release lock
536
537        debug!("Processing batch of {} operations", operations.len());
538
539        // TODO: Implement actual HTTP batching
540        let results = operations
541            .into_iter()
542            .map(|op| BatchResult {
543                id: op.id,
544                status: 200, // Placeholder
545                data: Some(Value::Null),
546                error: None,
547            })
548            .collect();
549
550        Ok(results)
551    }
552
553    async fn get_metrics(&self) -> BatchMetrics {
554        let pending = self.pending_operations.read().await;
555        BatchMetrics {
556            pending_operations: pending.len(),
557            total_operations: 0, // TODO: Track total processed operations
558        }
559    }
560}
561
/// Internal snapshot of batch-queue state returned by
/// `BatchProcessor::get_metrics`.
#[derive(Debug, Clone)]
struct BatchMetrics {
    #[allow(dead_code)] // Used in future metrics implementations
    // Operations currently queued and not yet flushed.
    pending_operations: usize,
    // Lifetime count of processed operations.
    // NOTE(review): currently always 0 — tracking is a TODO.
    total_operations: u64,
}
568
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh pool should lazily create and hand out a client for a host.
    #[tokio::test]
    async fn test_connection_pool_creation() {
        let pool = ConnectionPool::new(ConnectionPoolConfig::default());
        let client = pool.get_client("localhost").await.unwrap();
        // Client should be successfully created with proper reference count
        // (the pool holds one Arc and we hold another).
        assert!(Arc::strong_count(&client) >= 1);
    }

    // Round-trip: a value stored with the default TTL is returned intact.
    #[tokio::test]
    async fn test_cache_set_get() {
        let cache = RequestCache::new(CacheConfig::default());
        let test_data = serde_json::json!({"test": "data"});

        cache
            .set("test_key", test_data.clone(), None)
            .await
            .unwrap();
        let retrieved = cache.get("test_key").await.unwrap();

        assert_eq!(retrieved, Some(test_data));
    }

    // Queued operations are drained by process_pending and each yields a
    // result correlated by id (placeholder execution for now).
    #[tokio::test]
    async fn test_batch_processor() {
        let processor = BatchProcessor::new(BatchConfig::default());

        let operation = BatchOperation {
            id: "test_op".to_string(),
            method: "GET".to_string(),
            url: "https://example.com".to_string(),
            headers: HashMap::new(),
            body: None,
            priority: 1,
        };

        processor.add_operation(operation).await.unwrap();
        let results = processor.process_pending().await.unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "test_op");
    }
}