use crate::{
    error::{Error, Result},
    types::SupabaseConfig,
};
use reqwest::Client as HttpClient;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{
    collections::HashMap,
    sync::Arc,
    time::{Duration, Instant},
};

#[cfg(not(target_arch = "wasm32"))]
use tokio::sync::RwLock;

#[cfg(target_arch = "wasm32")]
use crate::async_runtime::RuntimeLock as RwLock;

use tracing::{debug, info};

#[derive(Debug, Clone)]
pub struct Performance {
    #[allow(dead_code)]
    http_client: Arc<HttpClient>,
    #[allow(dead_code)]
    config: Arc<SupabaseConfig>,
    connection_pool: Arc<ConnectionPool>,
    cache: Arc<RequestCache>,
    batch_processor: Arc<BatchProcessor>,
}

#[derive(Debug)]
pub struct ConnectionPool {
    pools: RwLock<HashMap<String, Arc<HttpClient>>>,
    config: ConnectionPoolConfig,
}

#[derive(Debug, Clone)]
pub struct ConnectionPoolConfig {
    /// Maximum number of idle connections kept per host.
    pub max_connections_per_host: usize,
    /// How long an idle connection may stay in the pool before being dropped.
    pub idle_timeout: Duration,
    /// TCP keep-alive interval applied to pooled connections.
    pub keep_alive_timeout: Duration,
    /// Whether HTTP/2 should be preferred.
    pub http2: bool,
    /// Optional `User-Agent` header sent with every request.
    pub user_agent: Option<String>,
}

impl Default for ConnectionPoolConfig {
    fn default() -> Self {
        Self {
            max_connections_per_host: 10,
            idle_timeout: Duration::from_secs(90),
            keep_alive_timeout: Duration::from_secs(60),
            http2: true,
            user_agent: Some("supabase-rust/0.4.2".to_string()),
        }
    }
}

#[derive(Debug)]
pub struct RequestCache {
    cache: RwLock<HashMap<String, CacheEntry>>,
    config: CacheConfig,
}

#[derive(Debug, Clone)]
pub struct CacheConfig {
    /// Maximum number of entries held before the oldest entry is evicted.
    pub max_entries: usize,
    /// TTL applied when a caller does not supply one.
    pub default_ttl: Duration,
    /// Whether cached payloads should be compressed.
    pub enable_compression: bool,
    /// Cache only successful responses.
    pub cache_success_only: bool,
}

impl Default for CacheConfig {
    fn default() -> Self {
        Self {
            max_entries: 1000,
            default_ttl: Duration::from_secs(300),
            enable_compression: true,
            cache_success_only: true,
        }
    }
}

#[derive(Debug, Clone)]
pub struct CacheEntry {
    /// Cached response payload.
    pub data: Value,
    /// When the entry was created.
    pub created_at: Instant,
    /// Time-to-live for this entry.
    pub ttl: Duration,
    /// Approximate payload size in bytes.
    pub size_bytes: usize,
    /// Number of times this entry has been served.
    pub hit_count: u64,
}

#[derive(Debug)]
pub struct BatchProcessor {
    pending_operations: RwLock<Vec<BatchOperation>>,
    config: BatchConfig,
}

#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum number of operations processed in a single batch.
    pub max_batch_size: usize,
    /// Interval at which pending operations are flushed.
    pub flush_interval: Duration,
    /// Automatically flush once `max_batch_size` is reached.
    pub auto_batch: bool,
    /// Upper bound on how long a batch may take to process.
    pub batch_timeout: Duration,
}

impl Default for BatchConfig {
    fn default() -> Self {
        Self {
            max_batch_size: 50,
            flush_interval: Duration::from_millis(100),
            auto_batch: true,
            batch_timeout: Duration::from_secs(5),
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchOperation {
    /// Caller-supplied identifier used to correlate results.
    pub id: String,
    /// HTTP method for the operation.
    pub method: String,
    /// Target URL.
    pub url: String,
    /// Request headers.
    pub headers: HashMap<String, String>,
    /// Optional JSON body.
    pub body: Option<Value>,
    /// Relative priority of the operation.
    pub priority: u8,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchResult {
    /// Identifier of the originating `BatchOperation`.
    pub id: String,
    /// HTTP status code of the result.
    pub status: u16,
    /// Response payload, if any.
    pub data: Option<Value>,
    /// Error message, if the operation failed.
    pub error: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Number of pooled HTTP clients currently held.
    pub active_connections: usize,
    /// Ratio of cache hits to total cache lookups.
    pub cache_hit_ratio: f64,
    /// Number of entries currently in the cache.
    pub cache_entries: usize,
    /// Average response time in milliseconds.
    pub avg_response_time_ms: f64,
    /// Total number of requests issued.
    pub total_requests: u64,
    /// Number of successful requests.
    pub successful_requests: u64,
    /// Number of failed requests.
    pub failed_requests: u64,
    /// Total number of operations processed in batches.
    pub batched_operations: u64,
}

impl Performance {
    pub fn new(config: Arc<SupabaseConfig>, http_client: Arc<HttpClient>) -> Result<Self> {
        debug!("Initializing Performance module");

        let connection_pool = Arc::new(ConnectionPool::new(ConnectionPoolConfig::default()));
        let cache = Arc::new(RequestCache::new(CacheConfig::default()));
        let batch_processor = Arc::new(BatchProcessor::new(BatchConfig::default()));

        Ok(Self {
            http_client,
            config,
            connection_pool,
            cache,
            batch_processor,
        })
    }

    pub fn new_with_config(
        config: Arc<SupabaseConfig>,
        http_client: Arc<HttpClient>,
        pool_config: ConnectionPoolConfig,
        cache_config: CacheConfig,
        batch_config: BatchConfig,
    ) -> Result<Self> {
        debug!("Initializing Performance module with custom config");

        let connection_pool = Arc::new(ConnectionPool::new(pool_config));
        let cache = Arc::new(RequestCache::new(cache_config));
        let batch_processor = Arc::new(BatchProcessor::new(batch_config));

        Ok(Self {
            http_client,
            config,
            connection_pool,
            cache,
            batch_processor,
        })
    }

    pub async fn get_client(&self, host: &str) -> Result<Arc<HttpClient>> {
        self.connection_pool.get_client(host).await
    }

    pub async fn cache_response(
        &self,
        key: &str,
        data: Value,
        ttl: Option<Duration>,
    ) -> Result<()> {
        self.cache.set(key, data, ttl).await
    }

    pub async fn get_cached_response(&self, key: &str) -> Result<Option<Value>> {
        self.cache.get(key).await
    }

    pub async fn add_to_batch(&self, operation: BatchOperation) -> Result<()> {
        self.batch_processor.add_operation(operation).await
    }

    pub async fn process_batch(&self) -> Result<Vec<BatchResult>> {
        self.batch_processor.process_pending().await
    }

    pub async fn get_metrics(&self) -> PerformanceMetrics {
        let connection_metrics = self.connection_pool.get_metrics().await;
        let cache_metrics = self.cache.get_metrics().await;
        let batch_metrics = self.batch_processor.get_metrics().await;

        PerformanceMetrics {
            active_connections: connection_metrics.active_count,
            cache_hit_ratio: cache_metrics.hit_ratio,
            cache_entries: cache_metrics.entry_count,
            // Per-request timing and counters are not tracked yet, so they are
            // reported as zero.
            avg_response_time_ms: 0.0,
            total_requests: 0,
            successful_requests: 0,
            failed_requests: 0,
            batched_operations: batch_metrics.total_operations,
        }
    }

    pub async fn clear_cache(&self) -> Result<()> {
        self.cache.clear().await
    }

    pub async fn warm_up_connections(&self, hosts: Vec<String>) -> Result<()> {
        for host in hosts {
            let _ = self.connection_pool.get_client(&host).await?;
            debug!("Warmed up connection for host: {}", host);
        }
        Ok(())
    }
}

impl ConnectionPool {
    fn new(config: ConnectionPoolConfig) -> Self {
        Self {
            pools: RwLock::new(HashMap::new()),
            config,
        }
    }

    async fn get_client(&self, host: &str) -> Result<Arc<HttpClient>> {
        {
            let pools = self.pools.read().await;
            if let Some(client) = pools.get(host) {
                return Ok(Arc::clone(client));
            }
        }

        let client = self.create_optimized_client().await?;
        let client_arc = Arc::new(client);

        {
            let mut pools = self.pools.write().await;
            pools.insert(host.to_string(), Arc::clone(&client_arc));
        }

        info!("Created new HTTP client for host: {}", host);
        Ok(client_arc)
    }

    async fn create_optimized_client(&self) -> Result<HttpClient> {
        let mut builder = HttpClient::builder()
            .pool_max_idle_per_host(self.config.max_connections_per_host)
            .pool_idle_timeout(self.config.idle_timeout)
            .tcp_keepalive(Some(self.config.keep_alive_timeout));

        if let Some(user_agent) = &self.config.user_agent {
            builder = builder.user_agent(user_agent);
        }

        builder
            .build()
            .map_err(|e| Error::config(format!("Failed to create HTTP client: {}", e)))
    }

    async fn get_metrics(&self) -> ConnectionMetrics {
        let pools = self.pools.read().await;
        ConnectionMetrics {
            active_count: pools.len(),
            // Clients are never removed from the pool, so the number created
            // so far equals the current pool size.
            total_created: pools.len() as u64,
        }
    }
}

#[derive(Debug, Clone)]
struct ConnectionMetrics {
    active_count: usize,
    #[allow(dead_code)]
    total_created: u64,
}

impl RequestCache {
    fn new(config: CacheConfig) -> Self {
        Self {
            cache: RwLock::new(HashMap::new()),
            config,
        }
    }

    async fn set(&self, key: &str, data: Value, ttl: Option<Duration>) -> Result<()> {
        let entry = CacheEntry {
            data,
            created_at: Instant::now(),
            ttl: ttl.unwrap_or(self.config.default_ttl),
            // Payload size accounting is not implemented yet.
            size_bytes: 0,
            hit_count: 0,
        };

        let mut cache = self.cache.write().await;

        if cache.len() >= self.config.max_entries {
            self.evict_oldest(&mut cache);
        }

        cache.insert(key.to_string(), entry);
        debug!("Cached response for key: {}", key);
        Ok(())
    }

    async fn get(&self, key: &str) -> Result<Option<Value>> {
        let mut cache = self.cache.write().await;

        if let Some(entry) = cache.get_mut(key) {
            if entry.created_at.elapsed() > entry.ttl {
                cache.remove(key);
                debug!("Cache entry expired for key: {}", key);
                return Ok(None);
            }

            entry.hit_count += 1;
            debug!("Cache hit for key: {}", key);
            Ok(Some(entry.data.clone()))
        } else {
            debug!("Cache miss for key: {}", key);
            Ok(None)
        }
    }

    async fn clear(&self) -> Result<()> {
        let mut cache = self.cache.write().await;
        cache.clear();
        info!("Cache cleared");
        Ok(())
    }

    async fn get_metrics(&self) -> CacheMetrics {
        let cache = self.cache.read().await;
        let total_hits: u64 = cache.values().map(|entry| entry.hit_count).sum();
        // Misses are not tracked directly; each resident entry is counted as
        // one initial lookup, so the ratio is an approximation.
        let total_requests = total_hits + cache.len() as u64;

        CacheMetrics {
            entry_count: cache.len(),
            hit_ratio: if total_requests > 0 {
                total_hits as f64 / total_requests as f64
            } else {
                0.0
            },
        }
    }

    fn evict_oldest(&self, cache: &mut HashMap<String, CacheEntry>) {
        if let Some((oldest_key, _)) = cache
            .iter()
            .min_by_key(|(_, entry)| entry.created_at)
            .map(|(k, v)| (k.clone(), v.created_at))
        {
            cache.remove(&oldest_key);
            debug!("Evicted oldest cache entry: {}", oldest_key);
        }
    }
}

#[derive(Debug, Clone)]
struct CacheMetrics {
    entry_count: usize,
    hit_ratio: f64,
}

impl BatchProcessor {
    fn new(config: BatchConfig) -> Self {
        Self {
            pending_operations: RwLock::new(Vec::new()),
            config,
        }
    }

    async fn add_operation(&self, operation: BatchOperation) -> Result<()> {
        let mut pending = self.pending_operations.write().await;
        pending.push(operation);

        if self.config.auto_batch && pending.len() >= self.config.max_batch_size {
            // Release the write lock before flushing so process_pending can
            // re-acquire it without deadlocking.
            drop(pending);
            let _ = self.process_pending().await;
        }

        Ok(())
    }

    async fn process_pending(&self) -> Result<Vec<BatchResult>> {
        let mut pending = self.pending_operations.write().await;
        if pending.is_empty() {
            return Ok(Vec::new());
        }

        let operations = pending.drain(..).collect::<Vec<_>>();
        drop(pending);

        debug!("Processing batch of {} operations", operations.len());

        // Execution is currently a stub: every drained operation is reported
        // as succeeded with an empty payload.
        let results = operations
            .into_iter()
            .map(|op| BatchResult {
                id: op.id,
                status: 200,
                data: Some(Value::Null),
                error: None,
            })
            .collect();

        Ok(results)
    }

    async fn get_metrics(&self) -> BatchMetrics {
        let pending = self.pending_operations.read().await;
        BatchMetrics {
            pending_operations: pending.len(),
            // A running total of processed operations is not tracked yet.
            total_operations: 0,
        }
    }
}

#[derive(Debug, Clone)]
struct BatchMetrics {
    #[allow(dead_code)]
    pending_operations: usize,
    total_operations: u64,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_connection_pool_creation() {
        let pool = ConnectionPool::new(ConnectionPoolConfig::default());
        let client = pool.get_client("localhost").await.unwrap();
        assert!(Arc::strong_count(&client) >= 1);
    }
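
    // Companion check (a minimal sketch of the pooling behaviour): a second
    // lookup for the same host should be served from the read path and hand
    // back the same pooled client rather than building a new one.
    #[tokio::test]
    async fn test_connection_pool_reuses_client() {
        let pool = ConnectionPool::new(ConnectionPoolConfig::default());
        let first = pool.get_client("localhost").await.unwrap();
        let second = pool.get_client("localhost").await.unwrap();
        assert!(Arc::ptr_eq(&first, &second));
    }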

    #[tokio::test]
    async fn test_cache_set_get() {
        let cache = RequestCache::new(CacheConfig::default());
        let test_data = serde_json::json!({"test": "data"});

        cache
            .set("test_key", test_data.clone(), None)
            .await
            .unwrap();
        let retrieved = cache.get("test_key").await.unwrap();

        assert_eq!(retrieved, Some(test_data));
    }
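
    // Expiry sketch: `get` drops an entry once `created_at.elapsed()` exceeds
    // its TTL. The 5 ms TTL and the 20 ms pause are illustrative values chosen
    // only to keep the test fast; std::thread::sleep is used so the test does
    // not rely on tokio's time utilities.
    #[tokio::test]
    async fn test_cache_entry_expires_after_ttl() {
        let cache = RequestCache::new(CacheConfig::default());
        let test_data = serde_json::json!({"test": "data"});

        cache
            .set("expiring_key", test_data, Some(Duration::from_millis(5)))
            .await
            .unwrap();
        std::thread::sleep(Duration::from_millis(20));

        assert_eq!(cache.get("expiring_key").await.unwrap(), None);
    }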

    #[tokio::test]
    async fn test_batch_processor() {
        let processor = BatchProcessor::new(BatchConfig::default());

        let operation = BatchOperation {
            id: "test_op".to_string(),
            method: "GET".to_string(),
            url: "https://example.com".to_string(),
            headers: HashMap::new(),
            body: None,
            priority: 1,
        };

        processor.add_operation(operation).await.unwrap();
        let results = processor.process_pending().await.unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "test_op");
    }
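
    // Auto-flush sketch: with `auto_batch` enabled, `add_operation` drains the
    // queue itself once `max_batch_size` is reached, so nothing should remain
    // pending afterwards. The batch size of 2 is an illustrative value.
    #[tokio::test]
    async fn test_auto_batch_flushes_at_max_size() {
        let config = BatchConfig {
            max_batch_size: 2,
            auto_batch: true,
            ..BatchConfig::default()
        };
        let processor = BatchProcessor::new(config);

        for i in 0..2 {
            let operation = BatchOperation {
                id: format!("op_{}", i),
                method: "GET".to_string(),
                url: "https://example.com".to_string(),
                headers: HashMap::new(),
                body: None,
                priority: 1,
            };
            processor.add_operation(operation).await.unwrap();
        }

        let metrics = processor.get_metrics().await;
        assert_eq!(metrics.pending_operations, 0);
    }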
}