// supabase/performance.rs

//! Performance optimization module
//!
//! This module provides performance enhancements for Supabase operations:
//! - **Connection Pooling**: Efficient HTTP client connection management
//! - **Request Caching**: Intelligent API response caching
//! - **Batch Operations**: Multi-request optimization
//! - **Compression**: Request/response compression support
8
9use crate::{
10    error::{Error, Result},
11    types::SupabaseConfig,
12};
13use reqwest::Client as HttpClient;
14use serde::{Deserialize, Serialize};
15use serde_json::Value;
16use std::{
17    collections::HashMap,
18    sync::Arc,
19    time::{Duration, Instant},
20};
21
22#[cfg(not(target_arch = "wasm32"))]
23use tokio::sync::RwLock;
24
25#[cfg(target_arch = "wasm32")]
26use crate::async_runtime::RuntimeLock as RwLock;
27
28use tracing::{debug, info};
29
/// Performance optimization manager.
///
/// Bundles the connection pool, response cache, and batch processor behind a
/// single cloneable handle; all internals are `Arc`-shared, so cloning is cheap
/// and all clones observe the same state.
#[derive(Debug, Clone)]
pub struct Performance {
    #[allow(dead_code)] // Used in future implementations
    http_client: Arc<HttpClient>,
    #[allow(dead_code)] // Used in future implementations
    config: Arc<SupabaseConfig>,
    // Per-host pool of pre-configured reqwest clients.
    connection_pool: Arc<ConnectionPool>,
    // TTL-based in-memory cache of API responses.
    cache: Arc<RequestCache>,
    // Queue that coalesces operations for batched execution.
    batch_processor: Arc<BatchProcessor>,
}
41
/// Connection pool for HTTP clients.
///
/// Maps a host name to a shared, pre-configured `reqwest` client so that
/// connections to the same host are reused rather than re-established.
#[derive(Debug)]
pub struct ConnectionPool {
    // host -> shared client; guarded by an async RwLock (tokio on native
    // targets, the RuntimeLock shim on wasm32).
    pools: RwLock<HashMap<String, Arc<HttpClient>>>,
    config: ConnectionPoolConfig,
}
48
/// Connection pool configuration.
#[derive(Debug, Clone)]
pub struct ConnectionPoolConfig {
    /// Maximum idle connections kept per host (passed to reqwest's pool)
    pub max_connections_per_host: usize,
    /// Connection idle timeout
    pub idle_timeout: Duration,
    /// Connection keep-alive timeout (applied as the TCP keepalive interval)
    pub keep_alive_timeout: Duration,
    /// Enable HTTP/2
    // NOTE(review): this flag is not currently consulted when building clients
    // (see `ConnectionPool::create_optimized_client`) — confirm intent.
    pub http2: bool,
    /// User agent string
    pub user_agent: Option<String>,
}
63
64impl Default for ConnectionPoolConfig {
65    fn default() -> Self {
66        Self {
67            max_connections_per_host: 10,
68            idle_timeout: Duration::from_secs(90),
69            keep_alive_timeout: Duration::from_secs(60),
70            http2: true,
71            user_agent: Some("supabase-rust/0.4.2".to_string()),
72        }
73    }
74}
75
/// Request cache for API responses.
///
/// Stores JSON response bodies keyed by an opaque string, with per-entry TTL
/// and oldest-first eviction once `config.max_entries` is reached.
#[derive(Debug)]
pub struct RequestCache {
    // key -> entry; write lock is taken even on reads so hit counts can be
    // updated (see `get`).
    cache: RwLock<HashMap<String, CacheEntry>>,
    config: CacheConfig,
}
82
/// Cache configuration.
#[derive(Debug, Clone)]
pub struct CacheConfig {
    /// Maximum cache size (number of entries)
    pub max_entries: usize,
    /// Default cache TTL, used when `set` is called without an explicit TTL
    pub default_ttl: Duration,
    /// Enable cache compression
    // NOTE(review): not acted upon by any code visible in this module — verify
    // against the rest of the crate.
    pub enable_compression: bool,
    /// Cache only successful responses
    // NOTE(review): enforcement is expected to happen at the call site; the
    // cache itself does not inspect response status.
    pub cache_success_only: bool,
}
95
96impl Default for CacheConfig {
97    fn default() -> Self {
98        Self {
99            max_entries: 1000,
100            default_ttl: Duration::from_secs(300), // 5 minutes
101            enable_compression: true,
102            cache_success_only: true,
103        }
104    }
105}
106
/// Cache entry with metadata.
#[derive(Debug, Clone)]
pub struct CacheEntry {
    /// Cached response data
    pub data: Value,
    /// Entry creation time (monotonic; drives TTL expiry and oldest-first eviction)
    pub created_at: Instant,
    /// Time-to-live; once `created_at.elapsed() > ttl` the entry is treated as expired
    pub ttl: Duration,
    /// Response size (compressed if enabled)
    // NOTE(review): currently always 0 — `RequestCache::set` carries a TODO
    // to compute the real size.
    pub size_bytes: usize,
    /// Cache hit count (incremented on every successful `get`)
    pub hit_count: u64,
}
121
/// Batch processing for multiple operations.
///
/// Accumulates `BatchOperation`s until explicitly flushed, or auto-flushed
/// when the queue reaches `config.max_batch_size` (if `auto_batch` is on).
#[derive(Debug)]
pub struct BatchProcessor {
    // FIFO queue of operations awaiting execution.
    pending_operations: RwLock<Vec<BatchOperation>>,
    config: BatchConfig,
}
128
/// Batch processing configuration.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum batch size; reaching it triggers auto-flush when `auto_batch` is set
    pub max_batch_size: usize,
    /// Batch flush interval
    // NOTE(review): no timer-driven flush is implemented in this module yet.
    pub flush_interval: Duration,
    /// Enable automatic batching
    pub auto_batch: bool,
    /// Batch timeout
    // NOTE(review): not consulted by the visible processing code.
    pub batch_timeout: Duration,
}
141
142impl Default for BatchConfig {
143    fn default() -> Self {
144        Self {
145            max_batch_size: 50,
146            flush_interval: Duration::from_millis(100),
147            auto_batch: true,
148            batch_timeout: Duration::from_secs(5),
149        }
150    }
151}
152
/// Batch operation.
///
/// A serializable description of one HTTP request queued for batch execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchOperation {
    /// Operation ID (echoed back in the matching `BatchResult`)
    pub id: String,
    /// HTTP method (e.g. "GET", "POST")
    pub method: String,
    /// Request URL
    pub url: String,
    /// Request headers
    pub headers: HashMap<String, String>,
    /// Request body
    pub body: Option<Value>,
    /// Operation priority
    // NOTE(review): not consulted by `BatchProcessor::process_pending` yet;
    // operations are processed in insertion order.
    pub priority: u8,
}
169
/// Batch execution result.
///
/// One result per submitted `BatchOperation`, correlated via `id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchResult {
    /// Operation ID
    pub id: String,
    /// HTTP status code
    // NOTE(review): currently always 200 — real HTTP batching is a TODO in
    // `BatchProcessor::process_pending`.
    pub status: u16,
    /// Response data
    pub data: Option<Value>,
    /// Error message if any
    pub error: Option<String>,
}
182
/// Performance metrics.
///
/// Aggregated snapshot returned by `Performance::get_metrics`. Several fields
/// are placeholders until request tracking is implemented (see the TODOs in
/// `get_metrics`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Active connections count
    pub active_connections: usize,
    /// Cache hit ratio (0.0 to 1.0)
    pub cache_hit_ratio: f64,
    /// Cache entry count
    pub cache_entries: usize,
    /// Average response time (ms) — currently always 0.0 (not yet tracked)
    pub avg_response_time_ms: f64,
    /// Total requests processed — currently always 0 (not yet tracked)
    pub total_requests: u64,
    /// Successful requests count — currently always 0 (not yet tracked)
    pub successful_requests: u64,
    /// Failed requests count — currently always 0 (not yet tracked)
    pub failed_requests: u64,
    /// Total batched operations
    pub batched_operations: u64,
}
203
204impl Performance {
205    /// Create a new Performance instance
206    pub fn new(config: Arc<SupabaseConfig>, http_client: Arc<HttpClient>) -> Result<Self> {
207        debug!("Initializing Performance module");
208
209        let connection_pool = Arc::new(ConnectionPool::new(ConnectionPoolConfig::default()));
210        let cache = Arc::new(RequestCache::new(CacheConfig::default()));
211        let batch_processor = Arc::new(BatchProcessor::new(BatchConfig::default()));
212
213        Ok(Self {
214            http_client,
215            config,
216            connection_pool,
217            cache,
218            batch_processor,
219        })
220    }
221
222    /// Create with custom configuration
223    pub fn new_with_config(
224        config: Arc<SupabaseConfig>,
225        http_client: Arc<HttpClient>,
226        pool_config: ConnectionPoolConfig,
227        cache_config: CacheConfig,
228        batch_config: BatchConfig,
229    ) -> Result<Self> {
230        debug!("Initializing Performance module with custom config");
231
232        let connection_pool = Arc::new(ConnectionPool::new(pool_config));
233        let cache = Arc::new(RequestCache::new(cache_config));
234        let batch_processor = Arc::new(BatchProcessor::new(batch_config));
235
236        Ok(Self {
237            http_client,
238            config,
239            connection_pool,
240            cache,
241            batch_processor,
242        })
243    }
244
245    /// Get optimized HTTP client for a host
246    pub async fn get_client(&self, host: &str) -> Result<Arc<HttpClient>> {
247        self.connection_pool.get_client(host).await
248    }
249
250    /// Cache a response with optional TTL
251    pub async fn cache_response(
252        &self,
253        key: &str,
254        data: Value,
255        ttl: Option<Duration>,
256    ) -> Result<()> {
257        self.cache.set(key, data, ttl).await
258    }
259
260    /// Get cached response
261    pub async fn get_cached_response(&self, key: &str) -> Result<Option<Value>> {
262        self.cache.get(key).await
263    }
264
265    /// Add operation to batch processing queue
266    pub async fn add_to_batch(&self, operation: BatchOperation) -> Result<()> {
267        self.batch_processor.add_operation(operation).await
268    }
269
270    /// Process pending batch operations
271    pub async fn process_batch(&self) -> Result<Vec<BatchResult>> {
272        self.batch_processor.process_pending().await
273    }
274
275    /// Get performance metrics
276    pub async fn get_metrics(&self) -> PerformanceMetrics {
277        let connection_metrics = self.connection_pool.get_metrics().await;
278        let cache_metrics = self.cache.get_metrics().await;
279        let batch_metrics = self.batch_processor.get_metrics().await;
280
281        PerformanceMetrics {
282            active_connections: connection_metrics.active_count,
283            cache_hit_ratio: cache_metrics.hit_ratio,
284            cache_entries: cache_metrics.entry_count,
285            avg_response_time_ms: 0.0, // TODO: Implement response time tracking
286            total_requests: 0,         // TODO: Implement request tracking
287            successful_requests: 0,    // TODO: Implement success tracking
288            failed_requests: 0,        // TODO: Implement failure tracking
289            batched_operations: batch_metrics.total_operations,
290        }
291    }
292
293    /// Clear all caches
294    pub async fn clear_cache(&self) -> Result<()> {
295        self.cache.clear().await
296    }
297
298    /// Warm up connections for specified hosts
299    pub async fn warm_up_connections(&self, hosts: Vec<String>) -> Result<()> {
300        for host in hosts {
301            let _ = self.connection_pool.get_client(&host).await?;
302            debug!("Warmed up connection for host: {}", host);
303        }
304        Ok(())
305    }
306}
307
// Connection Pool Implementation
309
310impl ConnectionPool {
311    fn new(config: ConnectionPoolConfig) -> Self {
312        Self {
313            pools: RwLock::new(HashMap::new()),
314            config,
315        }
316    }
317
318    async fn get_client(&self, host: &str) -> Result<Arc<HttpClient>> {
319        // Check if client already exists
320        {
321            let pools = self.pools.read().await;
322            if let Some(client) = pools.get(host) {
323                return Ok(Arc::clone(client));
324            }
325        }
326
327        // Create new optimized client
328        let client = self.create_optimized_client().await?;
329        let client_arc = Arc::new(client);
330
331        // Store in pool
332        {
333            let mut pools = self.pools.write().await;
334            pools.insert(host.to_string(), Arc::clone(&client_arc));
335        }
336
337        info!("Created new HTTP client for host: {}", host);
338        Ok(client_arc)
339    }
340
341    async fn create_optimized_client(&self) -> Result<HttpClient> {
342        let mut builder = HttpClient::builder()
343            .pool_max_idle_per_host(self.config.max_connections_per_host)
344            .pool_idle_timeout(self.config.idle_timeout)
345            .tcp_keepalive(Some(self.config.keep_alive_timeout));
346
347        if let Some(user_agent) = &self.config.user_agent {
348            builder = builder.user_agent(user_agent);
349        }
350
351        builder
352            .build()
353            .map_err(|e| Error::config(format!("Failed to create HTTP client: {}", e)))
354    }
355
356    async fn get_metrics(&self) -> ConnectionMetrics {
357        let pools = self.pools.read().await;
358        ConnectionMetrics {
359            active_count: pools.len(),
360            total_created: pools.len() as u64, // Simplified for now
361        }
362    }
363}
364
/// Internal pool statistics (not exposed outside this module).
#[derive(Debug, Clone)]
struct ConnectionMetrics {
    // Number of distinct hosts currently holding a pooled client.
    active_count: usize,
    // Lifetime count of clients created; currently mirrors `active_count`.
    #[allow(dead_code)] // Used in future metrics implementations
    total_created: u64,
}
371
// Request Cache Implementation
373
374impl RequestCache {
375    fn new(config: CacheConfig) -> Self {
376        Self {
377            cache: RwLock::new(HashMap::new()),
378            config,
379        }
380    }
381
382    async fn set(&self, key: &str, data: Value, ttl: Option<Duration>) -> Result<()> {
383        let entry = CacheEntry {
384            data,
385            created_at: Instant::now(),
386            ttl: ttl.unwrap_or(self.config.default_ttl),
387            size_bytes: 0, // TODO: Calculate actual size
388            hit_count: 0,
389        };
390
391        let mut cache = self.cache.write().await;
392
393        // Check cache size limit
394        if cache.len() >= self.config.max_entries {
395            self.evict_oldest(&mut cache);
396        }
397
398        cache.insert(key.to_string(), entry);
399        debug!("Cached response for key: {}", key);
400        Ok(())
401    }
402
403    async fn get(&self, key: &str) -> Result<Option<Value>> {
404        let mut cache = self.cache.write().await;
405
406        if let Some(entry) = cache.get_mut(key) {
407            // Check if expired
408            if entry.created_at.elapsed() > entry.ttl {
409                cache.remove(key);
410                debug!("Cache entry expired for key: {}", key);
411                return Ok(None);
412            }
413
414            // Update hit count
415            entry.hit_count += 1;
416            debug!("Cache hit for key: {}", key);
417            Ok(Some(entry.data.clone()))
418        } else {
419            debug!("Cache miss for key: {}", key);
420            Ok(None)
421        }
422    }
423
424    async fn clear(&self) -> Result<()> {
425        let mut cache = self.cache.write().await;
426        cache.clear();
427        info!("Cache cleared");
428        Ok(())
429    }
430
431    async fn get_metrics(&self) -> CacheMetrics {
432        let cache = self.cache.read().await;
433        let total_hits: u64 = cache.values().map(|entry| entry.hit_count).sum();
434        let total_requests = total_hits + cache.len() as u64; // Simplified calculation
435
436        CacheMetrics {
437            entry_count: cache.len(),
438            hit_ratio: if total_requests > 0 {
439                total_hits as f64 / total_requests as f64
440            } else {
441                0.0
442            },
443        }
444    }
445
446    fn evict_oldest(&self, cache: &mut HashMap<String, CacheEntry>) {
447        if let Some((oldest_key, _)) = cache
448            .iter()
449            .min_by_key(|(_, entry)| entry.created_at)
450            .map(|(k, v)| (k.clone(), v.created_at))
451        {
452            cache.remove(&oldest_key);
453            debug!("Evicted oldest cache entry: {}", oldest_key);
454        }
455    }
456}
457
/// Internal cache statistics (not exposed outside this module).
#[derive(Debug, Clone)]
struct CacheMetrics {
    // Number of entries currently stored (expired-but-unread ones included).
    entry_count: usize,
    // Approximate hit ratio in [0.0, 1.0]; see `RequestCache::get_metrics`.
    hit_ratio: f64,
}
463
// Batch Processor Implementation
465
466impl BatchProcessor {
467    fn new(config: BatchConfig) -> Self {
468        Self {
469            pending_operations: RwLock::new(Vec::new()),
470            config,
471        }
472    }
473
474    async fn add_operation(&self, operation: BatchOperation) -> Result<()> {
475        let mut pending = self.pending_operations.write().await;
476        pending.push(operation);
477
478        // Auto-process if batch is full
479        if self.config.auto_batch && pending.len() >= self.config.max_batch_size {
480            drop(pending); // Release lock
481            let _ = self.process_pending().await;
482        }
483
484        Ok(())
485    }
486
487    async fn process_pending(&self) -> Result<Vec<BatchResult>> {
488        let mut pending = self.pending_operations.write().await;
489        if pending.is_empty() {
490            return Ok(Vec::new());
491        }
492
493        let operations = pending.drain(..).collect::<Vec<_>>();
494        drop(pending); // Release lock
495
496        debug!("Processing batch of {} operations", operations.len());
497
498        // TODO: Implement actual HTTP batching
499        let results = operations
500            .into_iter()
501            .map(|op| BatchResult {
502                id: op.id,
503                status: 200, // Placeholder
504                data: Some(Value::Null),
505                error: None,
506            })
507            .collect();
508
509        Ok(results)
510    }
511
512    async fn get_metrics(&self) -> BatchMetrics {
513        let pending = self.pending_operations.read().await;
514        BatchMetrics {
515            pending_operations: pending.len(),
516            total_operations: 0, // TODO: Track total processed operations
517        }
518    }
519}
520
/// Internal batch-queue statistics (not exposed outside this module).
#[derive(Debug, Clone)]
struct BatchMetrics {
    // Operations currently waiting in the queue.
    #[allow(dead_code)] // Used in future metrics implementations
    pending_operations: usize,
    // Lifetime processed-operation count; currently always 0 (not tracked).
    total_operations: u64,
}
527
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh pool should lazily build and return a client for a new host.
    #[tokio::test]
    async fn test_connection_pool_creation() {
        let pool = ConnectionPool::new(ConnectionPoolConfig::default());
        let client = pool.get_client("localhost").await.unwrap();
        // Client should be successfully created with proper reference count
        assert!(Arc::strong_count(&client) >= 1);
    }

    // Round-trip: a value stored with the default TTL is immediately readable.
    #[tokio::test]
    async fn test_cache_set_get() {
        let cache = RequestCache::new(CacheConfig::default());
        let test_data = serde_json::json!({"test": "data"});

        cache
            .set("test_key", test_data.clone(), None)
            .await
            .unwrap();
        let retrieved = cache.get("test_key").await.unwrap();

        assert_eq!(retrieved, Some(test_data));
    }

    // A queued operation is drained by process_pending and echoed back by ID.
    #[tokio::test]
    async fn test_batch_processor() {
        let processor = BatchProcessor::new(BatchConfig::default());

        let operation = BatchOperation {
            id: "test_op".to_string(),
            method: "GET".to_string(),
            url: "https://example.com".to_string(),
            headers: HashMap::new(),
            body: None,
            priority: 1,
        };

        processor.add_operation(operation).await.unwrap();
        let results = processor.process_pending().await.unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "test_op");
    }
}