parser_proxy_ws/
lib.rs

1//! # Parser Proxy WebSocket Library
2//!
3//! A high-performance Solana DEX event parsing and WebSocket streaming library.
4//!
5//! ## Features
6//!
//! - **Ultra-fast parsing**: 10-20μs latency for real-time trading applications
8//! - **Multi-DEX support**: PumpFun, PumpSwap, Raydium, Orca, Meteora, and more
9//! - **WebSocket streaming**: Real-time event broadcasting to multiple clients
10//! - **Configurable filtering**: Fine-grained control over which events to monitor
11//! - **Easy integration**: Simple API with just a config file path
12//!
13//! ## Usage
14//!
15//! ```rust
16//! use parser_proxy_ws::ParserProxyServer;
17//!
18//! #[tokio::main]
19//! async fn main() -> anyhow::Result<()> {
20//!     // Start the parser proxy server with a config file
21//!     let server = ParserProxyServer::new("config.toml")?;
22//!     server.start().await?;
23//!     Ok(())
24//! }
25//! ```
26
27use sol_parser_sdk::grpc::{
28    AccountFilter, ClientConfig, EventTypeFilter, TransactionFilter,
29    YellowstoneGrpc,
30};
31use std::sync::Arc;
32use tokio::net::TcpListener;
33use tracing::{error, info, warn};
34use anyhow::Result;
35
36mod config;
37mod ws_server;
38
39pub use config::Config;
40use ws_server::WsServer;
41
42/// The main parser proxy server that handles gRPC subscriptions and WebSocket broadcasting
43pub struct ParserProxyServer {
44    config: Config,
45}
46
47impl ParserProxyServer {
48    /// Create a new parser proxy server with the specified config file path
49    ///
50    /// # Arguments
51    ///
52    /// * `config_path` - Path to the TOML configuration file
53    ///
54    /// # Examples
55    ///
56    /// ```rust
57    /// use parser_proxy_ws::ParserProxyServer;
58    ///
59    /// let server = ParserProxyServer::new("config.toml")?;
60    /// ```
61    pub fn new<P: AsRef<std::path::Path>>(config_path: P) -> Result<Self> {
62        let config = Config::load_or_default(config_path.as_ref().to_str().unwrap());
63
64        Ok(Self { config })
65    }
66
67    /// Create a new parser proxy server with an existing config
68    ///
69    /// # Arguments
70    ///
71    /// * `config` - Pre-loaded configuration
72    pub fn with_config(config: Config) -> Self {
73        Self { config }
74    }
75
76    /// Start the parser proxy server
77    ///
78    /// This method will:
79    /// 1. Initialize the WebSocket server
80    /// 2. Connect to the Yellowstone gRPC endpoint
81    /// 3. Subscribe to configured DEX events
82    /// 4. Start broadcasting events to WebSocket clients
83    ///
84    /// The method will run indefinitely until interrupted.
85    ///
86    /// # Examples
87    ///
88    /// ```rust
89    /// use parser_proxy_ws::ParserProxyServer;
90    ///
91    /// #[tokio::main]
92    /// async fn main() -> anyhow::Result<()> {
93    ///     let server = ParserProxyServer::new("config.toml")?;
94    ///     server.start().await?;
95    ///     Ok(())
96    /// }
97    /// ```
98    pub async fn start(self) -> Result<()> {
99        self.init_tracing();
100
101        info!("🚀 Starting Parser Proxy WebSocket Server...");
102
103        let ws_server = Arc::new(WsServer::new());
104        let ws_server_clone = ws_server.clone();
105
106        // Start WebSocket server
107        let addr = format!("{}:{}", self.config.server.host, self.config.server.port);
108        let listener = TcpListener::bind(&addr).await?;
109        info!("ðŸ“Ą WebSocket server listening on: ws://{}", addr);
110
111        tokio::spawn(async move {
112            ws_server_clone.run(listener).await;
113        });
114
115        // Start gRPC client and event processing
116        self.start_grpc_processing(ws_server).await?;
117
118        Ok(())
119    }
120
121    /// Start the parser proxy server and wait for shutdown signal
122    ///
123    /// This is a convenience method that calls `start()` and waits for Ctrl+C.
124    pub async fn run(self) -> Result<()> {
125        let server_task = tokio::spawn(async move {
126            if let Err(e) = self.start().await {
127                error!("Server error: {}", e);
128            }
129        });
130
131        info!("🛑 Press Ctrl+C to stop...");
132        tokio::signal::ctrl_c().await?;
133        info!("👋 Shutting down gracefully...");
134
135        server_task.abort();
136        Ok(())
137    }
138
139    /// Get the server configuration
140    pub fn config(&self) -> &Config {
141        &self.config
142    }
143
144    /// Get the WebSocket URL that clients can connect to
145    pub fn ws_url(&self) -> String {
146        format!("ws://{}:{}", self.config.server.host, self.config.server.port)
147    }
148
149    fn init_tracing(&self) {
150        if std::env::var("RUST_LOG").is_err() {
151            std::env::set_var("RUST_LOG", "info");
152        }
153
154        #[cfg(feature = "binary")]
155        {
156            tracing_subscriber::fmt()
157                .with_env_filter(
158                    tracing_subscriber::EnvFilter::try_from_default_env()
159                        .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
160                )
161                .init();
162        }
163
164        let _ = rustls::crypto::ring::default_provider().install_default();
165    }
166
167    async fn start_grpc_processing(
168        &self,
169        ws_server: Arc<WsServer>,
170    ) -> Result<()> {
171        let mut grpc_config = ClientConfig::default();
172        grpc_config.enable_metrics = self.config.grpc.enable_metrics;
173        grpc_config.connection_timeout_ms = self.config.grpc.connection_timeout_ms;
174        grpc_config.request_timeout_ms = self.config.grpc.request_timeout_ms;
175        grpc_config.enable_tls = self.config.grpc.enable_tls;
176
177        let grpc = YellowstoneGrpc::new_with_config(
178            self.config.grpc.endpoint.clone(),
179            self.config.grpc.token.clone(),
180            grpc_config,
181        )
182        .map_err(|e| anyhow::anyhow!("gRPC client creation failed: {}", e))?;
183
184        info!("✅ gRPC client created successfully");
185
186        let protocols = self.config.get_enabled_protocols();
187        if protocols.is_empty() {
188            warn!("⚠ïļ  No protocols enabled in config, server will receive no events");
189            return Ok(());
190        }
191
192        info!("📊 Monitoring protocols: {:?}", protocols);
193
194        let transaction_filter = TransactionFilter::for_protocols(&protocols);
195        let account_filter = AccountFilter::for_protocols(&protocols);
196
197        let event_types = self.config.get_enabled_event_types();
198        if event_types.is_empty() {
199            warn!("⚠ïļ  No event types enabled in config, server will receive no events");
200            return Ok(());
201        }
202
203        info!("ðŸŽŊ Monitoring event types: {:?}", event_types);
204        let event_filter = EventTypeFilter::include_only(event_types);
205
206        info!("🎧 Starting subscription...");
207
208        let queue = grpc
209            .subscribe_dex_events(
210                vec![transaction_filter],
211                vec![account_filter],
212                Some(event_filter),
213            )
214            .await
215            .map_err(|e| anyhow::anyhow!("Subscription failed: {}", e))?;
216
217        info!("✅ Subscription established");
218
219        tokio::spawn(async move {
220            let mut spin_count = 0u32;
221            loop {
222                if let Some(event) = queue.pop() {
223                    spin_count = 0;
224
225                    let event_json = match serde_json::to_string(&event) {
226                        Ok(json) => json,
227                        Err(e) => {
228                            error!("Failed to serialize event: {}", e);
229                            continue;
230                        }
231                    };
232
233                    ws_server.broadcast(&event_json).await;
234                } else {
235                    spin_count += 1;
236                    if spin_count < 1000 {
237                        std::hint::spin_loop();
238                    } else {
239                        tokio::task::yield_now().await;
240                        spin_count = 0;
241                    }
242                }
243            }
244        });
245
246        // Keep the main task alive
247        loop {
248            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
249        }
250    }
251}
252
253/// Convenience function to start a parser proxy server with default settings
254///
255/// # Arguments
256///
257/// * `config_path` - Path to the TOML configuration file
258///
259/// # Examples
260///
261/// ```rust
262/// use parser_proxy_ws::run_server;
263///
264/// #[tokio::main]
265/// async fn main() -> anyhow::Result<()> {
266///     run_server("config.toml").await?;
267///     Ok(())
268/// }
269/// ```
270pub async fn run_server<P: AsRef<std::path::Path>>(config_path: P) -> Result<()> {
271    let server = ParserProxyServer::new(config_path)?;
272    server.run().await
273}
274
275/// Re-export commonly used types for convenience
276pub mod prelude {
277    pub use crate::{ParserProxyServer, Config, run_server};
278    pub use sol_parser_sdk::core::events::*;
279    pub use sol_parser_sdk::grpc::EventType;
280}
281
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;

    /// A server built from a config file exposes the host, port and
    /// WebSocket URL written in that file.
    #[test]
    fn test_server_creation() {
        // Create a minimal test config
        let test_config = r#"
[server]
host = "127.0.0.1"
port = 9001

[grpc]
endpoint = "https://test-endpoint.com"
token = ""
enable_metrics = false
enable_tls = true
connection_timeout_ms = 5000
request_timeout_ms = 10000

[protocols]
pumpfun = true

[events]
pumpfun_trade = true
"#;

        // Write to a per-process file in the system temp directory instead of
        // a fixed name in the working directory: this keeps the source tree
        // clean and avoids clashes between concurrent test runs.
        let config_path = std::env::temp_dir().join(format!(
            "parser_proxy_ws_test_config_{}.toml",
            std::process::id()
        ));
        fs::write(&config_path, test_config).unwrap();

        let server = ParserProxyServer::new(&config_path);

        // The config is read inside `new`, so the file can go now; cleaning
        // up before the assertions means a failure does not leave it behind.
        fs::remove_file(&config_path).unwrap();

        let server = server.expect("server creation should succeed");
        assert_eq!(server.config().server.host, "127.0.0.1");
        assert_eq!(server.config().server.port, 9001);
        assert_eq!(server.ws_url(), "ws://127.0.0.1:9001");
    }
}
323}