1use crate::{
20 cache::CacheManager,
21 health::HealthService,
22 metrics::MetricsCollector,
23 providers::{ProviderManager, DEFAULT_MAINNET_RPCS},
24 registry::EdbRegistry,
25 rpc::RpcHandler,
26};
27use axum::{
28 extract::State,
29 http::{Method, StatusCode},
30 response::Json,
31 routing::post,
32 Router,
33};
34use eyre::Result;
35use serde_json::Value;
36use std::{net::SocketAddr, path::PathBuf, sync::Arc};
37use tokio::{net::TcpListener, sync::broadcast};
38use tower_http::cors::{Any, CorsLayer};
39use tracing::{debug, info, warn};
40
/// Builder for configuring and constructing a `ProxyServer`.
///
/// Time-valued knobs are in seconds, except `cache_save_interval`, which is
/// in minutes (`ProxyServer::new` multiplies it by 60).
#[derive(Debug, Clone)]
pub struct ProxyServerBuilder {
    /// Upstream RPC endpoints; `None` falls back to `DEFAULT_MAINNET_RPCS` at build time.
    rpc_urls: Option<Vec<String>>,
    /// Maximum number of entries handed to the `CacheManager`.
    max_cache_items: u32,
    /// Cache directory override; `None` lets `CacheManager::get_cache_path` choose.
    cache_dir: Option<PathBuf>,
    /// Grace period in seconds; when > 0, `ProxyServer::new` spawns the heartbeat monitor.
    grace_period: u64,
    /// Heartbeat interval in seconds, forwarded to `EdbRegistry::new`.
    heartbeat_interval: u64,
    /// Failure threshold forwarded to `ProviderManager::new`
    /// (presumably failures tolerated before marking a provider unhealthy — confirm in ProviderManager).
    max_failures: u32,
    /// Seconds between periodic provider health checks.
    health_check_interval: u64,
    /// Minutes between periodic cache saves; 0 disables the periodic-save task.
    cache_save_interval: u64,
}
53
54impl Default for ProxyServerBuilder {
55 fn default() -> Self {
56 Self {
57 rpc_urls: None, max_cache_items: 1024000,
62 cache_dir: None, cache_save_interval: 5, max_failures: 3,
67 health_check_interval: 60,
68
69 grace_period: 0, heartbeat_interval: 10,
72 }
73 }
74}
75
76impl ProxyServerBuilder {
77 pub fn new() -> Self {
79 Self::default()
80 }
81
82 #[allow(dead_code)]
84 pub fn rpc_urls<T: Into<Vec<String>>>(mut self, urls: T) -> Self {
85 self.rpc_urls = Some(urls.into());
86 self
87 }
88
89 pub fn rpc_urls_str(mut self, urls: &str) -> Self {
91 self.rpc_urls =
92 Some(urls.split(',').map(|s| s.trim().to_string()).filter(|s| !s.is_empty()).collect());
93 self
94 }
95
96 pub fn max_cache_items(mut self, max_items: u32) -> Self {
98 self.max_cache_items = max_items;
99 self
100 }
101
102 pub fn cache_dir<P: Into<PathBuf>>(mut self, dir: P) -> Self {
104 self.cache_dir = Some(dir.into());
105 self
106 }
107
108 pub fn grace_period(mut self, seconds: u64) -> Self {
110 self.grace_period = seconds;
111 self
112 }
113
114 pub fn heartbeat_interval(mut self, seconds: u64) -> Self {
116 self.heartbeat_interval = seconds;
117 self
118 }
119
120 pub fn max_failures(mut self, failures: u32) -> Self {
122 self.max_failures = failures;
123 self
124 }
125
126 pub fn health_check_interval(mut self, seconds: u64) -> Self {
128 self.health_check_interval = seconds;
129 self
130 }
131
132 pub fn cache_save_interval(mut self, minutes: u64) -> Self {
134 self.cache_save_interval = minutes;
135 self
136 }
137
138 pub async fn build(self) -> Result<ProxyServer> {
140 let rpc_urls = self
142 .rpc_urls
143 .unwrap_or_else(|| DEFAULT_MAINNET_RPCS.iter().map(|s| s.to_string()).collect());
144
145 let cache_path = CacheManager::get_cache_path(&rpc_urls, self.cache_dir).await?;
147
148 ProxyServer::new(
150 rpc_urls,
151 self.max_cache_items,
152 cache_path,
153 self.grace_period,
154 self.heartbeat_interval,
155 self.max_failures,
156 self.health_check_interval,
157 self.cache_save_interval,
158 )
159 .await
160 }
161}
162
/// The running proxy: shared handles to every subsystem plus the shutdown
/// broadcast channel. Cloning is cheap — all fields are `Arc`s or a channel
/// sender handle.
#[derive(Clone)]
pub struct ProxyServer {
    /// Forwards JSON-RPC requests upstream; owns the cache manager and records metrics.
    pub rpc_handler: Arc<RpcHandler>,
    /// Tracks registered EDB instances and their heartbeats.
    pub registry: Arc<EdbRegistry>,
    /// Answers `edb_ping` and `edb_info`.
    pub health_service: Arc<HealthService>,
    /// Aggregated request/cache/provider metrics and history.
    pub metrics_collector: Arc<MetricsCollector>,
    /// Signals graceful shutdown to `serve` (triggered by `edb_shutdown`).
    shutdown_tx: broadcast::Sender<()>,
}
195
/// Axum application state: a clone of the whole proxy shared with each request handler.
#[derive(Clone)]
struct AppState {
    // The full proxy; all fields are Arc-backed so this clone is cheap.
    proxy: ProxyServer,
}
200
201impl ProxyServer {
202 #[allow(clippy::too_many_arguments)]
220 async fn new(
221 rpc_urls: Vec<String>,
222 max_cache_items: u32,
223 cache_path: PathBuf,
224 grace_period: u64,
225 heartbeat_interval: u64,
226 max_failures: u32,
227 health_check_interval: u64,
228 cache_save_interval: u64,
229 ) -> Result<Self> {
230 info!("Starting EDB RPC Proxy with {} providers", rpc_urls.len());
231 for url in &rpc_urls {
232 info!(" - {}", url);
233 }
234
235 let cache_manager = Arc::new(CacheManager::new(max_cache_items, cache_path)?);
236 let metrics_collector = Arc::new(MetricsCollector::new());
237
238 let provider_manager = Arc::new(ProviderManager::new(rpc_urls, max_failures).await?);
240
241 let rpc_handler = Arc::new(RpcHandler::new(
243 provider_manager.clone(),
244 cache_manager.clone(),
245 metrics_collector.clone(),
246 )?);
247 let health_service = Arc::new(HealthService::new());
248 let (shutdown_tx, _) = broadcast::channel(1);
249
250 let registry =
252 Arc::new(EdbRegistry::new(grace_period, heartbeat_interval, shutdown_tx.clone()));
253
254 if grace_period > 0 {
256 let registry_clone = Arc::clone(®istry);
257 tokio::spawn(async move {
258 registry_clone.start_heartbeat_monitor().await;
259 });
260 }
261
262 let provider_manager_clone = provider_manager.clone();
264 tokio::spawn(async move {
265 let mut interval =
266 tokio::time::interval(std::time::Duration::from_secs(health_check_interval));
267 loop {
268 interval.tick().await;
269 provider_manager_clone.health_check_all().await;
270 }
271 });
272
273 if cache_save_interval > 0 {
275 let cache_manager_clone = cache_manager.clone();
276 tokio::spawn(async move {
277 let mut interval =
278 tokio::time::interval(std::time::Duration::from_secs(cache_save_interval * 60));
279 loop {
280 interval.tick().await;
281 if let Err(e) = cache_manager_clone.save_to_disk().await {
282 warn!("Failed to save cache periodically: {}", e);
283 } else {
284 debug!("Cache saved to disk (periodic save)");
285 }
286 }
287 });
288 }
289
290 let metrics_collector_clone = metrics_collector.clone();
292 let cache_manager_clone = cache_manager.clone();
293 let provider_manager_clone = provider_manager;
294 let registry_clone = registry.clone();
295 tokio::spawn(async move {
296 let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
297 loop {
298 interval.tick().await;
299
300 let cache_stats = cache_manager_clone.detailed_stats().await;
302 let providers_info = provider_manager_clone.get_providers_info().await;
303 let healthy_providers =
304 providers_info.iter().filter(|p| p.is_healthy).count() as u64;
305 let total_providers = providers_info.len() as u64;
306 let active_instances = registry_clone.get_active_instances().await.len();
307
308 let total_entries =
309 cache_stats.get("total_entries").and_then(|v| v.as_u64()).unwrap_or(0);
310 metrics_collector_clone.add_historical_point(
311 total_entries,
312 healthy_providers,
313 total_providers,
314 active_instances,
315 );
316 }
317 });
318
319 Ok(Self { rpc_handler, registry, health_service, metrics_collector, shutdown_tx })
320 }
321
322 pub fn cache_manager(&self) -> &Arc<CacheManager> {
327 self.rpc_handler.cache_manager()
328 }
329
330 pub async fn serve(self, addr: SocketAddr) -> Result<()> {
342 let mut shutdown_rx = self.shutdown_tx.subscribe();
343 let cache_manager_for_shutdown = self.cache_manager().clone();
344
345 let app = Router::new()
346 .route("/", post(handle_rpc))
347 .layer(
348 CorsLayer::new()
349 .allow_methods([Method::POST, Method::GET])
350 .allow_headers(Any)
351 .allow_origin(Any),
352 )
353 .with_state(AppState { proxy: self });
354
355 let listener = TcpListener::bind(addr).await?;
356 info!("EDB RPC Proxy listening on {}", addr);
357
358 let server = axum::serve(listener, app).with_graceful_shutdown(async move {
360 let _ = shutdown_rx.recv().await;
361 info!("Shutdown signal received, saving cache and stopping server gracefully");
362
363 if let Err(e) = cache_manager_for_shutdown.save_to_disk().await {
365 warn!("Failed to save cache during shutdown: {}", e);
366 } else {
367 info!("Cache saved successfully during shutdown");
368 }
369 });
370
371 server.await?;
372
373 Ok(())
374 }
375}
376
/// Handle one JSON-RPC request POSTed to `/`.
///
/// EDB control methods (prefixed `edb_`) are answered locally from the
/// proxy's registry, cache, and metrics; any other method falls through to
/// `RpcHandler::handle_request`, which forwards it upstream.
///
/// Errors:
/// - `400 BAD_REQUEST` when the request lacks a `method` field or has
///   malformed params for a control method.
/// - `500 INTERNAL_SERVER_ERROR` when upstream forwarding fails.
async fn handle_rpc(
    State(state): State<AppState>,
    Json(request): Json<Value>,
) -> Result<Json<Value>, StatusCode> {
    debug!("Received RPC request: {}", request);

    let response = if let Some(method) = request.get("method").and_then(|m| m.as_str()) {
        match method {
            // Liveness probe, delegated to the health service.
            "edb_ping" => {
                let response = state.proxy.health_service.ping().await;
                Ok(Json(response))
            }
            // Service information, delegated to the health service.
            "edb_info" => {
                let response = state.proxy.health_service.info().await;
                Ok(Json(response))
            }
            // Register an EDB instance; params must be [pid, timestamp] (both u64).
            "edb_register" => {
                if let Some(params) = request.get("params").and_then(|p| p.as_array()) {
                    if let (Some(pid), Some(timestamp)) = (
                        params.first().and_then(|v| v.as_u64()),
                        params.get(1).and_then(|v| v.as_u64()),
                    ) {
                        // NOTE(review): `pid as u32` silently truncates values > u32::MAX.
                        let response =
                            state.proxy.registry.register_edb_instance(pid as u32, timestamp).await;
                        Ok(Json(response))
                    } else {
                        Err(StatusCode::BAD_REQUEST)
                    }
                } else {
                    Err(StatusCode::BAD_REQUEST)
                }
            }
            // Heartbeat from a registered instance; params must be [pid].
            "edb_heartbeat" => {
                if let Some(params) = request.get("params").and_then(|p| p.as_array()) {
                    if let Some(pid) = params.first().and_then(|v| v.as_u64()) {
                        let response = state.proxy.registry.heartbeat(pid as u32).await;
                        Ok(Json(response))
                    } else {
                        Err(StatusCode::BAD_REQUEST)
                    }
                } else {
                    Err(StatusCode::BAD_REQUEST)
                }
            }
            // Detailed cache statistics, wrapped in a JSON-RPC envelope.
            // The `id` defaults to 1 when the request omitted one.
            "edb_cache_stats" => {
                let stats = state.proxy.cache_manager().detailed_stats().await;
                let response = serde_json::json!({
                    "jsonrpc": "2.0",
                    "id": request.get("id").unwrap_or(&serde_json::Value::from(1)),
                    "result": stats
                });
                Ok(Json(response))
            }
            // PIDs of currently active (heartbeating) EDB instances.
            "edb_active_instances" => {
                let active_pids = state.proxy.registry.get_active_instances().await;
                let response = serde_json::json!({
                    "jsonrpc": "2.0",
                    "id": request.get("id").unwrap_or(&serde_json::Value::from(1)),
                    "result": {
                        "active_instances": active_pids,
                        "count": active_pids.len()
                    }
                });
                Ok(Json(response))
            }
            // Per-provider info plus a healthy/total summary.
            "edb_providers" => {
                let providers_info =
                    state.proxy.rpc_handler.provider_manager().get_providers_info().await;
                let healthy_count =
                    state.proxy.rpc_handler.provider_manager().healthy_provider_count().await;
                let response = serde_json::json!({
                    "jsonrpc": "2.0",
                    "id": request.get("id").unwrap_or(&serde_json::Value::from(1)),
                    "result": {
                        "providers": providers_info,
                        "healthy_count": healthy_count,
                        "total_count": providers_info.len()
                    }
                });
                Ok(Json(response))
            }
            // Aggregate cache/request counters from the metrics collector.
            "edb_cache_metrics" => {
                let metrics = state.proxy.metrics_collector;
                let method_stats = metrics.get_method_stats();
                let response = serde_json::json!({
                    "jsonrpc": "2.0",
                    "id": request.get("id").unwrap_or(&serde_json::Value::from(1)),
                    "result": {
                        "total_requests": metrics.total_requests.load(std::sync::atomic::Ordering::Relaxed),
                        "cache_hits": metrics.cache_hits.load(std::sync::atomic::Ordering::Relaxed),
                        "cache_misses": metrics.cache_misses.load(std::sync::atomic::Ordering::Relaxed),
                        "hit_rate": format!("{:.1}%", metrics.cache_hit_rate()),
                        "error_rate": format!("{:.1}%", metrics.error_rate()),
                        "method_stats": method_stats,
                        "total_errors": metrics.total_errors.load(std::sync::atomic::Ordering::Relaxed),
                        "rate_limit_errors": metrics.rate_limit_errors.load(std::sync::atomic::Ordering::Relaxed),
                        "user_errors": metrics.user_errors.load(std::sync::atomic::Ordering::Relaxed)
                    }
                });
                Ok(Json(response))
            }
            // Per-provider usage breakdown. Load percentages are computed
            // against the cache-miss count (i.e. requests actually forwarded).
            "edb_provider_metrics" => {
                let metrics = state.proxy.metrics_collector;
                let provider_usage = metrics.get_provider_usage();
                let total_requests_to_providers =
                    metrics.cache_misses.load(std::sync::atomic::Ordering::Relaxed);

                let providers: Vec<serde_json::Value> = provider_usage.iter().map(|(url, usage)| {
                    serde_json::json!({
                        "url": url,
                        "request_count": usage.request_count,
                        "success_rate": format!("{:.1}%", usage.success_rate()),
                        "avg_response_time_ms": usage.avg_response_time_ms(),
                        "load_percentage": format!("{:.1}%", usage.load_percentage(total_requests_to_providers)),
                        "last_used_timestamp": usage.last_used_timestamp,
                        "error_count": usage.error_count
                    })
                }).collect();

                let response = serde_json::json!({
                    "jsonrpc": "2.0",
                    "id": request.get("id").unwrap_or(&serde_json::Value::from(1)),
                    "result": {
                        "providers": providers,
                    }
                });
                Ok(Json(response))
            }
            // Historical samples recorded by the 30s background sampler,
            // split into cache-centric and provider-centric series.
            "edb_metrics_history" => {
                let metrics = state.proxy.metrics_collector;
                let history = metrics.get_metrics_history();

                let cache_history: Vec<serde_json::Value> = history.iter().map(|h| {
                    serde_json::json!({
                        "timestamp": h.timestamp,
                        "cache_size": h.cache_size,
                        // Hit rate as a percentage; 0.0 when no requests were seen.
                        "hit_rate": if h.cache_hits + h.cache_misses > 0 {
                            (h.cache_hits as f64 / (h.cache_hits + h.cache_misses) as f64) * 100.0
                        } else { 0.0 },
                        "requests_per_minute": h.requests_per_minute
                    })
                }).collect();

                let provider_history: Vec<serde_json::Value> = history
                    .iter()
                    .map(|h| {
                        serde_json::json!({
                            "timestamp": h.timestamp,
                            "healthy_providers": h.healthy_providers,
                            "total_providers": h.total_providers,
                            "avg_response_time_ms": h.avg_response_time_ms
                        })
                    })
                    .collect();

                let response = serde_json::json!({
                    "jsonrpc": "2.0",
                    "id": request.get("id").unwrap_or(&serde_json::Value::from(1)),
                    "result": {
                        "cache_history": cache_history,
                        "provider_history": provider_history
                    }
                });
                Ok(Json(response))
            }
            // Request-rate snapshot. `active_requests` and
            // `peak_requests_per_minute` are hardcoded 0 placeholders.
            "edb_request_metrics" => {
                let metrics = state.proxy.metrics_collector;
                // Up to 10 methods with traffic; iteration order of the
                // underlying stats map determines which 10 — TODO confirm
                // whether a stable ordering is intended.
                let recent_methods: Vec<String> = metrics
                    .get_method_stats()
                    .iter()
                    .filter(|(_, stats)| stats.total_requests > 0)
                    .take(10)
                    .map(|(method, _)| method.clone())
                    .collect();

                let response = serde_json::json!({
                    "jsonrpc": "2.0",
                    "id": request.get("id").unwrap_or(&serde_json::Value::from(1)),
                    "result": {
                        "requests_per_minute": metrics.requests_per_minute(),
                        "active_requests": 0,
                        "recent_methods": recent_methods,
                        "error_rate": format!("{:.1}%", metrics.error_rate()),
                        "peak_requests_per_minute": 0
                    }
                });
                Ok(Json(response))
            }
            // Delete cache entries. Params: [method] deletes every entry for
            // that method; [method, params] deletes the single entry whose
            // cache key matches. Failures are reported in the result body,
            // not as HTTP errors.
            "edb_deleteCache" => {
                let params = request.get("params").unwrap_or(&serde_json::Value::Null);

                let result = if params.is_array() && !params.as_array().unwrap().is_empty() {
                    let arr = params.as_array().unwrap();
                    let method = arr[0].as_str().unwrap_or("");

                    if arr.len() == 1 {
                        // Bulk delete: every cached entry for `method`.
                        match state.proxy.rpc_handler.cache_manager().delete_by_method(method).await
                        {
                            Ok(deleted_count) => {
                                serde_json::json!({
                                    "success": true,
                                    "deleted_count": deleted_count,
                                    "message": format!("Deleted {} entries for method '{}'", deleted_count, method)
                                })
                            }
                            Err(e) => {
                                warn!("Failed to delete cache entries: {}", e);
                                serde_json::json!({
                                    "success": false,
                                    "error": format!("Failed to delete cache entries: {}", e)
                                })
                            }
                        }
                    } else {
                        // Targeted delete: rebuild the cache key from
                        // [method, params] and remove that one entry.
                        let req = serde_json::json!({
                            "method": method,
                            "params": arr[1].clone()
                        });
                        let key = state.proxy.rpc_handler.generate_cache_key(&req);

                        match state.proxy.rpc_handler.cache_manager().delete_by_key(&key).await {
                            Ok(true) => {
                                serde_json::json!({
                                    "success": true,
                                    "deleted_count": 1,
                                    "message": "Deleted specific cache entry"
                                })
                            }
                            Ok(false) => {
                                serde_json::json!({
                                    "success": true,
                                    "deleted_count": 0,
                                    "message": "Cache entry not found"
                                })
                            }
                            Err(e) => {
                                warn!("Failed to delete cache entry: {}", e);
                                serde_json::json!({
                                    "success": false,
                                    "error": format!("Failed to delete cache entry: {}", e)
                                })
                            }
                        }
                    }
                } else {
                    serde_json::json!({
                        "success": false,
                        "error": "Invalid parameter format. Expected [method] or [method, params]"
                    })
                };

                let response = serde_json::json!({
                    "jsonrpc": "2.0",
                    "id": request.get("id").unwrap_or(&serde_json::Value::from(1)),
                    "result": result
                });

                Ok(Json(response))
            }
            // Initiate graceful shutdown: the response is built first, then
            // the broadcast fires so `serve` can flush the cache and stop.
            "edb_shutdown" => {
                info!("Shutdown request received");
                let response = serde_json::json!({
                    "jsonrpc": "2.0",
                    "id": request.get("id").unwrap_or(&serde_json::Value::from(1)),
                    "result": {
                        "status": "shutting_down",
                        "message": "Server shutdown initiated"
                    }
                });

                // Send failure (no subscribers) is deliberately ignored.
                let _ = state.proxy.shutdown_tx.send(());

                Ok(Json(response))
            }
            // Everything else is proxied upstream through the RPC handler.
            _ => {
                match state.proxy.rpc_handler.handle_request(request).await {
                    Ok(response) => Ok(Json(response)),
                    Err(e) => {
                        warn!("RPC request failed: {}", e);
                        Err(StatusCode::INTERNAL_SERVER_ERROR)
                    }
                }
            }
        }
    } else {
        warn!("Invalid RPC request: {}", request);
        Err(StatusCode::BAD_REQUEST)
    };

    // Log at most 200 chars of the Debug representation of the response.
    debug!("RPC response: {}", &format!("{response:?}").chars().take(200).collect::<String>());
    response
}