edb_rpc_proxy/
proxy.rs

1// EDB - Ethereum Debugger
2// Copyright (C) 2024 Zhuo Zhang and Wuqi Zhang
3//
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU Affero General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8//
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU Affero General Public License for more details.
13//
14// You should have received a copy of the GNU Affero General Public License
15// along with this program. If not, see <https://www.gnu.org/licenses/>.
16
17//! Core proxy server implementation
18
19use crate::{
20    cache::CacheManager,
21    health::HealthService,
22    metrics::MetricsCollector,
23    providers::{ProviderManager, DEFAULT_MAINNET_RPCS},
24    registry::EdbRegistry,
25    rpc::RpcHandler,
26};
27use axum::{
28    extract::State,
29    http::{Method, StatusCode},
30    response::Json,
31    routing::post,
32    Router,
33};
34use eyre::Result;
35use serde_json::Value;
36use std::{net::SocketAddr, path::PathBuf, sync::Arc};
37use tokio::{net::TcpListener, sync::broadcast};
38use tower_http::cors::{Any, CorsLayer};
39use tracing::{debug, info, warn};
40
/// Builder for configuring ProxyServer with fluent API and sensible defaults
///
/// Every field has a default (see the `Default` impl); the setter methods override
/// individual values and `build()` resolves the remaining `Option`s before constructing
/// the server.
#[derive(Debug, Clone)]
pub struct ProxyServerBuilder {
    /// Upstream RPC endpoint URLs; `None` makes `build()` fall back to `DEFAULT_MAINNET_RPCS`
    rpc_urls: Option<Vec<String>>,
    /// Maximum number of items kept in the cache
    max_cache_items: u32,
    /// Optional cache directory override, handed to `CacheManager::get_cache_path` by `build()`
    cache_dir: Option<PathBuf>,
    /// Seconds to wait before shutdown when no EDB instances remain (0 = no auto-shutdown)
    grace_period: u64,
    /// Heartbeat check interval in seconds
    heartbeat_interval: u64,
    /// Maximum consecutive failures before a provider is marked unhealthy
    max_failures: u32,
    /// Provider health check interval in seconds
    health_check_interval: u64,
    /// Cache save interval in minutes (0 = save only on shutdown)
    cache_save_interval: u64,
}
53
54impl Default for ProxyServerBuilder {
55    fn default() -> Self {
56        Self {
57            // General Configuration
58            rpc_urls: None, // Will use DEFAULT_MAINNET_RPCS
59
60            // Cache Configuration
61            max_cache_items: 1024000,
62            cache_dir: None,        // Will use ~/.edb/cache/rpc/<chain_id>
63            cache_save_interval: 5, // 5 minutes
64
65            // Provider Health Check Configuration
66            max_failures: 3,
67            health_check_interval: 60,
68
69            // EDB Register Configuration
70            grace_period: 0, // No auto-shutdown by default
71            heartbeat_interval: 10,
72        }
73    }
74}
75
76impl ProxyServerBuilder {
77    /// Create a new builder with default values
78    pub fn new() -> Self {
79        Self::default()
80    }
81
82    /// Set custom RPC URLs (comma-separated string or Vec)
83    #[allow(dead_code)]
84    pub fn rpc_urls<T: Into<Vec<String>>>(mut self, urls: T) -> Self {
85        self.rpc_urls = Some(urls.into());
86        self
87    }
88
89    /// Set custom RPC URLs from comma-separated string
90    pub fn rpc_urls_str(mut self, urls: &str) -> Self {
91        self.rpc_urls =
92            Some(urls.split(',').map(|s| s.trim().to_string()).filter(|s| !s.is_empty()).collect());
93        self
94    }
95
96    /// Set maximum number of cached items
97    pub fn max_cache_items(mut self, max_items: u32) -> Self {
98        self.max_cache_items = max_items;
99        self
100    }
101
102    /// Set cache directory path
103    pub fn cache_dir<P: Into<PathBuf>>(mut self, dir: P) -> Self {
104        self.cache_dir = Some(dir.into());
105        self
106    }
107
108    /// Set grace period in seconds before shutdown when no EDB instances (0 = no auto-shutdown)
109    pub fn grace_period(mut self, seconds: u64) -> Self {
110        self.grace_period = seconds;
111        self
112    }
113
114    /// Set heartbeat check interval in seconds
115    pub fn heartbeat_interval(mut self, seconds: u64) -> Self {
116        self.heartbeat_interval = seconds;
117        self
118    }
119
120    /// Set maximum consecutive failures before marking provider unhealthy
121    pub fn max_failures(mut self, failures: u32) -> Self {
122        self.max_failures = failures;
123        self
124    }
125
126    /// Set provider health check interval in seconds
127    pub fn health_check_interval(mut self, seconds: u64) -> Self {
128        self.health_check_interval = seconds;
129        self
130    }
131
132    /// Set cache save interval in minutes (0 = save only on shutdown)
133    pub fn cache_save_interval(mut self, minutes: u64) -> Self {
134        self.cache_save_interval = minutes;
135        self
136    }
137
138    /// Build the ProxyServer with the configured settings
139    pub async fn build(self) -> Result<ProxyServer> {
140        // Resolve RPC URLs
141        let rpc_urls = self
142            .rpc_urls
143            .unwrap_or_else(|| DEFAULT_MAINNET_RPCS.iter().map(|s| s.to_string()).collect());
144
145        // Resolve cache path
146        let cache_path = CacheManager::get_cache_path(&rpc_urls, self.cache_dir).await?;
147
148        // Now call the simplified ProxyServer::new with deterministic values
149        ProxyServer::new(
150            rpc_urls,
151            self.max_cache_items,
152            cache_path,
153            self.grace_period,
154            self.heartbeat_interval,
155            self.max_failures,
156            self.health_check_interval,
157            self.cache_save_interval,
158        )
159        .await
160    }
161}
162
/// Main proxy server that combines RPC handling, registry, and health services
///
/// The ProxyServer coordinates between:
/// - RPC request handling and caching
/// - EDB instance registry and lifecycle management
/// - Health check and monitoring endpoints
///
/// Cloning is shallow: the services are held behind `Arc`, so clones share the same
/// handler, registry, health service and metrics collector, and each clone carries a
/// sender for the same shutdown channel.
///
/// Use ProxyServerBuilder for easy configuration:
/// ```no_run
/// # use edb_rpc_proxy::proxy::ProxyServerBuilder;
/// # async fn example() -> eyre::Result<()> {
/// let proxy = ProxyServerBuilder::new()
///     .max_cache_items(50000)
///     .grace_period(300)
///     .build()
///     .await?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct ProxyServer {
    /// RPC request handler with caching capabilities
    pub rpc_handler: Arc<RpcHandler>,
    /// Registry for tracking connected EDB instances
    pub registry: Arc<EdbRegistry>,
    /// Health check service for monitoring
    pub health_service: Arc<HealthService>,
    /// Metrics collector for performance tracking
    pub metrics_collector: Arc<MetricsCollector>,
    /// Shutdown signal sender; `serve` subscribes to it, and both the `edb_shutdown`
    /// RPC method and the registry (which receives a clone) can trigger it
    shutdown_tx: broadcast::Sender<()>,
}
195
/// Shared state attached to the Axum router and extracted by the request handler.
#[derive(Clone)]
struct AppState {
    /// The proxy server whose services handle each incoming request
    proxy: ProxyServer,
}
200
201impl ProxyServer {
202    /// Creates a new proxy server with deterministic configuration
203    ///
204    /// This method is now simplified to take concrete values rather than Options.
205    /// Use ProxyServerBuilder for a more convenient fluent API.
206    ///
207    /// # Arguments
208    /// * `rpc_urls` - List of upstream RPC endpoint URLs
209    /// * `max_cache_items` - Maximum number of items to cache
210    /// * `cache_path` - Resolved path for cache persistence
211    /// * `grace_period` - Seconds to wait before shutdown when no EDB instances
212    /// * `heartbeat_interval` - Seconds between heartbeat checks
213    /// * `max_failures` - Maximum consecutive failures before marking provider unhealthy
214    /// * `health_check_interval` - Seconds between provider health checks
215    /// * `cache_save_interval` - Minutes between periodic cache saves
216    ///
217    /// # Returns
218    /// A new ProxyServer instance with background tasks started
219    #[allow(clippy::too_many_arguments)]
220    async fn new(
221        rpc_urls: Vec<String>,
222        max_cache_items: u32,
223        cache_path: PathBuf,
224        grace_period: u64,
225        heartbeat_interval: u64,
226        max_failures: u32,
227        health_check_interval: u64,
228        cache_save_interval: u64,
229    ) -> Result<Self> {
230        info!("Starting EDB RPC Proxy with {} providers", rpc_urls.len());
231        for url in &rpc_urls {
232            info!("  - {}", url);
233        }
234
235        let cache_manager = Arc::new(CacheManager::new(max_cache_items, cache_path)?);
236        let metrics_collector = Arc::new(MetricsCollector::new());
237
238        // Create provider manager with all URLs
239        let provider_manager = Arc::new(ProviderManager::new(rpc_urls, max_failures).await?);
240
241        // Create RPC handler with provider manager
242        let rpc_handler = Arc::new(RpcHandler::new(
243            provider_manager.clone(),
244            cache_manager.clone(),
245            metrics_collector.clone(),
246        )?);
247        let health_service = Arc::new(HealthService::new());
248        let (shutdown_tx, _) = broadcast::channel(1);
249
250        // Create registry with shutdown channel
251        let registry =
252            Arc::new(EdbRegistry::new(grace_period, heartbeat_interval, shutdown_tx.clone()));
253
254        // Start background tasks (if grace period is active)
255        if grace_period > 0 {
256            let registry_clone = Arc::clone(&registry);
257            tokio::spawn(async move {
258                registry_clone.start_heartbeat_monitor().await;
259            });
260        }
261
262        // Start periodic health checks for providers
263        let provider_manager_clone = provider_manager.clone();
264        tokio::spawn(async move {
265            let mut interval =
266                tokio::time::interval(std::time::Duration::from_secs(health_check_interval));
267            loop {
268                interval.tick().await;
269                provider_manager_clone.health_check_all().await;
270            }
271        });
272
273        // Start periodic cache saving (if enabled)
274        if cache_save_interval > 0 {
275            let cache_manager_clone = cache_manager.clone();
276            tokio::spawn(async move {
277                let mut interval =
278                    tokio::time::interval(std::time::Duration::from_secs(cache_save_interval * 60));
279                loop {
280                    interval.tick().await;
281                    if let Err(e) = cache_manager_clone.save_to_disk().await {
282                        warn!("Failed to save cache periodically: {}", e);
283                    } else {
284                        debug!("Cache saved to disk (periodic save)");
285                    }
286                }
287            });
288        }
289
290        // Start background metrics collection task
291        let metrics_collector_clone = metrics_collector.clone();
292        let cache_manager_clone = cache_manager.clone();
293        let provider_manager_clone = provider_manager;
294        let registry_clone = registry.clone();
295        tokio::spawn(async move {
296            let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
297            loop {
298                interval.tick().await;
299
300                // Collect current metrics for historical tracking
301                let cache_stats = cache_manager_clone.detailed_stats().await;
302                let providers_info = provider_manager_clone.get_providers_info().await;
303                let healthy_providers =
304                    providers_info.iter().filter(|p| p.is_healthy).count() as u64;
305                let total_providers = providers_info.len() as u64;
306                let active_instances = registry_clone.get_active_instances().await.len();
307
308                let total_entries =
309                    cache_stats.get("total_entries").and_then(|v| v.as_u64()).unwrap_or(0);
310                metrics_collector_clone.add_historical_point(
311                    total_entries,
312                    healthy_providers,
313                    total_providers,
314                    active_instances,
315                );
316            }
317        });
318
319        Ok(Self { rpc_handler, registry, health_service, metrics_collector, shutdown_tx })
320    }
321
322    /// Returns a reference to the cache manager
323    ///
324    /// # Returns
325    /// Reference to the underlying cache manager
326    pub fn cache_manager(&self) -> &Arc<CacheManager> {
327        self.rpc_handler.cache_manager()
328    }
329
330    /// Starts the proxy server listening on the specified address
331    ///
332    /// Creates an Axum web server with routes for:
333    /// - Standard JSON-RPC requests (POST /)
334    /// - EDB-specific management endpoints (edb_ping, edb_register, etc.)
335    ///
336    /// # Arguments
337    /// * `addr` - Socket address to bind to
338    ///
339    /// # Returns
340    /// Result indicating server startup success or failure
341    pub async fn serve(self, addr: SocketAddr) -> Result<()> {
342        let mut shutdown_rx = self.shutdown_tx.subscribe();
343        let cache_manager_for_shutdown = self.cache_manager().clone();
344
345        let app = Router::new()
346            .route("/", post(handle_rpc))
347            .layer(
348                CorsLayer::new()
349                    .allow_methods([Method::POST, Method::GET])
350                    .allow_headers(Any)
351                    .allow_origin(Any),
352            )
353            .with_state(AppState { proxy: self });
354
355        let listener = TcpListener::bind(addr).await?;
356        info!("EDB RPC Proxy listening on {}", addr);
357
358        // Create the server with graceful shutdown
359        let server = axum::serve(listener, app).with_graceful_shutdown(async move {
360            let _ = shutdown_rx.recv().await;
361            info!("Shutdown signal received, saving cache and stopping server gracefully");
362
363            // Save cache before shutdown
364            if let Err(e) = cache_manager_for_shutdown.save_to_disk().await {
365                warn!("Failed to save cache during shutdown: {}", e);
366            } else {
367                info!("Cache saved successfully during shutdown");
368            }
369        });
370
371        server.await?;
372
373        Ok(())
374    }
375}
376
377async fn handle_rpc(
378    State(state): State<AppState>,
379    Json(request): Json<Value>,
380) -> Result<Json<Value>, StatusCode> {
381    // Handle special EDB health check methods
382    debug!("Received RPC request: {}", request);
383    let response = if let Some(method) = request.get("method").and_then(|m| m.as_str()) {
384        match method {
385            "edb_ping" => {
386                let response = state.proxy.health_service.ping().await;
387                Ok(Json(response))
388            }
389            "edb_info" => {
390                let response = state.proxy.health_service.info().await;
391                Ok(Json(response))
392            }
393            "edb_register" => {
394                if let Some(params) = request.get("params").and_then(|p| p.as_array()) {
395                    if let (Some(pid), Some(timestamp)) = (
396                        params.first().and_then(|v| v.as_u64()),
397                        params.get(1).and_then(|v| v.as_u64()),
398                    ) {
399                        let response =
400                            state.proxy.registry.register_edb_instance(pid as u32, timestamp).await;
401                        Ok(Json(response))
402                    } else {
403                        Err(StatusCode::BAD_REQUEST)
404                    }
405                } else {
406                    Err(StatusCode::BAD_REQUEST)
407                }
408            }
409            "edb_heartbeat" => {
410                if let Some(params) = request.get("params").and_then(|p| p.as_array()) {
411                    if let Some(pid) = params.first().and_then(|v| v.as_u64()) {
412                        let response = state.proxy.registry.heartbeat(pid as u32).await;
413                        Ok(Json(response))
414                    } else {
415                        Err(StatusCode::BAD_REQUEST)
416                    }
417                } else {
418                    Err(StatusCode::BAD_REQUEST)
419                }
420            }
421            "edb_cache_stats" => {
422                let stats = state.proxy.cache_manager().detailed_stats().await;
423                let response = serde_json::json!({
424                    "jsonrpc": "2.0",
425                    "id": request.get("id").unwrap_or(&serde_json::Value::from(1)),
426                    "result": stats
427                });
428                Ok(Json(response))
429            }
430            "edb_active_instances" => {
431                let active_pids = state.proxy.registry.get_active_instances().await;
432                let response = serde_json::json!({
433                    "jsonrpc": "2.0",
434                    "id": request.get("id").unwrap_or(&serde_json::Value::from(1)),
435                    "result": {
436                        "active_instances": active_pids,
437                        "count": active_pids.len()
438                    }
439                });
440                Ok(Json(response))
441            }
442            "edb_providers" => {
443                let providers_info =
444                    state.proxy.rpc_handler.provider_manager().get_providers_info().await;
445                let healthy_count =
446                    state.proxy.rpc_handler.provider_manager().healthy_provider_count().await;
447                let response = serde_json::json!({
448                    "jsonrpc": "2.0",
449                    "id": request.get("id").unwrap_or(&serde_json::Value::from(1)),
450                    "result": {
451                        "providers": providers_info,
452                        "healthy_count": healthy_count,
453                        "total_count": providers_info.len()
454                    }
455                });
456                Ok(Json(response))
457            }
458            "edb_cache_metrics" => {
459                let metrics = state.proxy.metrics_collector;
460                let method_stats = metrics.get_method_stats();
461                let response = serde_json::json!({
462                    "jsonrpc": "2.0",
463                    "id": request.get("id").unwrap_or(&serde_json::Value::from(1)),
464                    "result": {
465                        "total_requests": metrics.total_requests.load(std::sync::atomic::Ordering::Relaxed),
466                        "cache_hits": metrics.cache_hits.load(std::sync::atomic::Ordering::Relaxed),
467                        "cache_misses": metrics.cache_misses.load(std::sync::atomic::Ordering::Relaxed),
468                        "hit_rate": format!("{:.1}%", metrics.cache_hit_rate()),
469                        "error_rate": format!("{:.1}%", metrics.error_rate()),
470                        "method_stats": method_stats,
471                        "total_errors": metrics.total_errors.load(std::sync::atomic::Ordering::Relaxed),
472                        "rate_limit_errors": metrics.rate_limit_errors.load(std::sync::atomic::Ordering::Relaxed),
473                        "user_errors": metrics.user_errors.load(std::sync::atomic::Ordering::Relaxed)
474                    }
475                });
476                Ok(Json(response))
477            }
478            "edb_provider_metrics" => {
479                let metrics = state.proxy.metrics_collector;
480                let provider_usage = metrics.get_provider_usage();
481                let total_requests_to_providers =
482                    metrics.cache_misses.load(std::sync::atomic::Ordering::Relaxed);
483
484                let providers: Vec<serde_json::Value> = provider_usage.iter().map(|(url, usage)| {
485                    serde_json::json!({
486                        "url": url,
487                        "request_count": usage.request_count,
488                        "success_rate": format!("{:.1}%", usage.success_rate()),
489                        "avg_response_time_ms": usage.avg_response_time_ms(),
490                        "load_percentage": format!("{:.1}%", usage.load_percentage(total_requests_to_providers)),
491                        "last_used_timestamp": usage.last_used_timestamp,
492                        "error_count": usage.error_count
493                    })
494                }).collect();
495
496                let response = serde_json::json!({
497                    "jsonrpc": "2.0",
498                    "id": request.get("id").unwrap_or(&serde_json::Value::from(1)),
499                    "result": {
500                        "providers": providers,
501                    }
502                });
503                Ok(Json(response))
504            }
505            "edb_metrics_history" => {
506                let metrics = state.proxy.metrics_collector;
507                let history = metrics.get_metrics_history();
508
509                let cache_history: Vec<serde_json::Value> = history.iter().map(|h| {
510                    serde_json::json!({
511                        "timestamp": h.timestamp,
512                        "cache_size": h.cache_size,
513                        "hit_rate": if h.cache_hits + h.cache_misses > 0 {
514                            (h.cache_hits as f64 / (h.cache_hits + h.cache_misses) as f64) * 100.0
515                        } else { 0.0 },
516                        "requests_per_minute": h.requests_per_minute
517                    })
518                }).collect();
519
520                let provider_history: Vec<serde_json::Value> = history
521                    .iter()
522                    .map(|h| {
523                        serde_json::json!({
524                            "timestamp": h.timestamp,
525                            "healthy_providers": h.healthy_providers,
526                            "total_providers": h.total_providers,
527                            "avg_response_time_ms": h.avg_response_time_ms
528                        })
529                    })
530                    .collect();
531
532                let response = serde_json::json!({
533                    "jsonrpc": "2.0",
534                    "id": request.get("id").unwrap_or(&serde_json::Value::from(1)),
535                    "result": {
536                        "cache_history": cache_history,
537                        "provider_history": provider_history
538                    }
539                });
540                Ok(Json(response))
541            }
542            "edb_request_metrics" => {
543                let metrics = state.proxy.metrics_collector;
544                let recent_methods: Vec<String> = metrics
545                    .get_method_stats()
546                    .iter()
547                    .filter(|(_, stats)| stats.total_requests > 0)
548                    .take(10)
549                    .map(|(method, _)| method.clone())
550                    .collect();
551
552                let response = serde_json::json!({
553                    "jsonrpc": "2.0",
554                    "id": request.get("id").unwrap_or(&serde_json::Value::from(1)),
555                    "result": {
556                        "requests_per_minute": metrics.requests_per_minute(),
557                        "active_requests": 0, // TODO: implement active request tracking
558                        "recent_methods": recent_methods,
559                        "error_rate": format!("{:.1}%", metrics.error_rate()),
560                        "peak_requests_per_minute": 0 // TODO: implement peak tracking
561                    }
562                });
563                Ok(Json(response))
564            }
565            "edb_deleteCache" => {
566                let params = request.get("params").unwrap_or(&serde_json::Value::Null);
567
568                let result = if params.is_array() && !params.as_array().unwrap().is_empty() {
569                    let arr = params.as_array().unwrap();
570                    let method = arr[0].as_str().unwrap_or("");
571
572                    if arr.len() == 1 {
573                        // Just method: delete all entries for this method
574                        match state.proxy.rpc_handler.cache_manager().delete_by_method(method).await
575                        {
576                            Ok(deleted_count) => {
577                                serde_json::json!({
578                                    "success": true,
579                                    "deleted_count": deleted_count,
580                                    "message": format!("Deleted {} entries for method '{}'", deleted_count, method)
581                                })
582                            }
583                            Err(e) => {
584                                warn!("Failed to delete cache entries: {}", e);
585                                serde_json::json!({
586                                    "success": false,
587                                    "error": format!("Failed to delete cache entries: {}", e)
588                                })
589                            }
590                        }
591                    } else {
592                        // Method + params: delete specific request
593                        let req = serde_json::json!({
594                            "method": method,
595                            "params": arr[1].clone()
596                        });
597                        let key = state.proxy.rpc_handler.generate_cache_key(&req);
598
599                        match state.proxy.rpc_handler.cache_manager().delete_by_key(&key).await {
600                            Ok(true) => {
601                                serde_json::json!({
602                                    "success": true,
603                                    "deleted_count": 1,
604                                    "message": "Deleted specific cache entry"
605                                })
606                            }
607                            Ok(false) => {
608                                serde_json::json!({
609                                    "success": true,
610                                    "deleted_count": 0,
611                                    "message": "Cache entry not found"
612                                })
613                            }
614                            Err(e) => {
615                                warn!("Failed to delete cache entry: {}", e);
616                                serde_json::json!({
617                                    "success": false,
618                                    "error": format!("Failed to delete cache entry: {}", e)
619                                })
620                            }
621                        }
622                    }
623                } else {
624                    serde_json::json!({
625                        "success": false,
626                        "error": "Invalid parameter format. Expected [method] or [method, params]"
627                    })
628                };
629
630                let response = serde_json::json!({
631                    "jsonrpc": "2.0",
632                    "id": request.get("id").unwrap_or(&serde_json::Value::from(1)),
633                    "result": result
634                });
635
636                Ok(Json(response))
637            }
638            "edb_shutdown" => {
639                info!("Shutdown request received");
640                let response = serde_json::json!({
641                    "jsonrpc": "2.0",
642                    "id": request.get("id").unwrap_or(&serde_json::Value::from(1)),
643                    "result": {
644                        "status": "shutting_down",
645                        "message": "Server shutdown initiated"
646                    }
647                });
648
649                // Send shutdown signal (non-blocking)
650                let _ = state.proxy.shutdown_tx.send(());
651
652                Ok(Json(response))
653            }
654            _ => {
655                // Forward to RPC handler
656                match state.proxy.rpc_handler.handle_request(request).await {
657                    Ok(response) => Ok(Json(response)),
658                    Err(e) => {
659                        warn!("RPC request failed: {}", e);
660                        Err(StatusCode::INTERNAL_SERVER_ERROR)
661                    }
662                }
663            }
664        }
665    } else {
666        warn!("Invalid RPC request: {}", request);
667        Err(StatusCode::BAD_REQUEST)
668    };
669
670    debug!("RPC response: {}", &format!("{response:?}").chars().take(200).collect::<String>());
671    response
672}