1use sol_parser_sdk::grpc::{
28 AccountFilter, ClientConfig, EventTypeFilter, TransactionFilter,
29 YellowstoneGrpc,
30};
31use std::sync::Arc;
32use tokio::net::TcpListener;
33use tracing::{error, info, warn};
34use anyhow::Result;
35
36mod config;
37mod ws_server;
38
39pub use config::Config;
40use ws_server::WsServer;
41
42pub struct ParserProxyServer {
44 config: Config,
45}
46
47impl ParserProxyServer {
48 pub fn new<P: AsRef<std::path::Path>>(config_path: P) -> Result<Self> {
62 let config = Config::load_or_default(config_path.as_ref().to_str().unwrap());
63
64 Ok(Self { config })
65 }
66
67 pub fn with_config(config: Config) -> Self {
73 Self { config }
74 }
75
76 pub async fn start(self) -> Result<()> {
99 self.init_tracing();
100
101 info!("ð Starting Parser Proxy WebSocket Server...");
102
103 let ws_server = Arc::new(WsServer::new());
104 let ws_server_clone = ws_server.clone();
105
106 let addr = format!("{}:{}", self.config.server.host, self.config.server.port);
108 let listener = TcpListener::bind(&addr).await?;
109 info!("ðĄ WebSocket server listening on: ws://{}", addr);
110
111 tokio::spawn(async move {
112 ws_server_clone.run(listener).await;
113 });
114
115 self.start_grpc_processing(ws_server).await?;
117
118 Ok(())
119 }
120
121 pub async fn run(self) -> Result<()> {
125 let server_task = tokio::spawn(async move {
126 if let Err(e) = self.start().await {
127 error!("Server error: {}", e);
128 }
129 });
130
131 info!("ð Press Ctrl+C to stop...");
132 tokio::signal::ctrl_c().await?;
133 info!("ð Shutting down gracefully...");
134
135 server_task.abort();
136 Ok(())
137 }
138
139 pub fn config(&self) -> &Config {
141 &self.config
142 }
143
144 pub fn ws_url(&self) -> String {
146 format!("ws://{}:{}", self.config.server.host, self.config.server.port)
147 }
148
149 fn init_tracing(&self) {
150 if std::env::var("RUST_LOG").is_err() {
151 std::env::set_var("RUST_LOG", "info");
152 }
153
154 #[cfg(feature = "binary")]
155 {
156 tracing_subscriber::fmt()
157 .with_env_filter(
158 tracing_subscriber::EnvFilter::try_from_default_env()
159 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
160 )
161 .init();
162 }
163
164 let _ = rustls::crypto::ring::default_provider().install_default();
165 }
166
167 async fn start_grpc_processing(
168 &self,
169 ws_server: Arc<WsServer>,
170 ) -> Result<()> {
171 let mut grpc_config = ClientConfig::default();
172 grpc_config.enable_metrics = self.config.grpc.enable_metrics;
173 grpc_config.connection_timeout_ms = self.config.grpc.connection_timeout_ms;
174 grpc_config.request_timeout_ms = self.config.grpc.request_timeout_ms;
175 grpc_config.enable_tls = self.config.grpc.enable_tls;
176
177 let grpc = YellowstoneGrpc::new_with_config(
178 self.config.grpc.endpoint.clone(),
179 self.config.grpc.token.clone(),
180 grpc_config,
181 )
182 .map_err(|e| anyhow::anyhow!("gRPC client creation failed: {}", e))?;
183
184 info!("â
gRPC client created successfully");
185
186 let protocols = self.config.get_enabled_protocols();
187 if protocols.is_empty() {
188 warn!("â ïļ No protocols enabled in config, server will receive no events");
189 return Ok(());
190 }
191
192 info!("ð Monitoring protocols: {:?}", protocols);
193
194 let transaction_filter = TransactionFilter::for_protocols(&protocols);
195 let account_filter = AccountFilter::for_protocols(&protocols);
196
197 let event_types = self.config.get_enabled_event_types();
198 if event_types.is_empty() {
199 warn!("â ïļ No event types enabled in config, server will receive no events");
200 return Ok(());
201 }
202
203 info!("ðŊ Monitoring event types: {:?}", event_types);
204 let event_filter = EventTypeFilter::include_only(event_types);
205
206 info!("ð§ Starting subscription...");
207
208 let queue = grpc
209 .subscribe_dex_events(
210 vec![transaction_filter],
211 vec![account_filter],
212 Some(event_filter),
213 )
214 .await
215 .map_err(|e| anyhow::anyhow!("Subscription failed: {}", e))?;
216
217 info!("â
Subscription established");
218
219 tokio::spawn(async move {
220 let mut spin_count = 0u32;
221 loop {
222 if let Some(event) = queue.pop() {
223 spin_count = 0;
224
225 let event_json = match serde_json::to_string(&event) {
226 Ok(json) => json,
227 Err(e) => {
228 error!("Failed to serialize event: {}", e);
229 continue;
230 }
231 };
232
233 ws_server.broadcast(&event_json).await;
234 } else {
235 spin_count += 1;
236 if spin_count < 1000 {
237 std::hint::spin_loop();
238 } else {
239 tokio::task::yield_now().await;
240 spin_count = 0;
241 }
242 }
243 }
244 });
245
246 loop {
248 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
249 }
250 }
251}
252
253pub async fn run_server<P: AsRef<std::path::Path>>(config_path: P) -> Result<()> {
271 let server = ParserProxyServer::new(config_path)?;
272 server.run().await
273}
274
275pub mod prelude {
277 pub use crate::{ParserProxyServer, Config, run_server};
278 pub use sol_parser_sdk::core::events::*;
279 pub use sol_parser_sdk::grpc::EventType;
280}
281
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;

    /// Building a server from a config file exposes the configured host and
    /// port, and `ws_url` is derived from them.
    #[test]
    fn test_server_creation() {
        let test_config = r#"
[server]
host = "127.0.0.1"
port = 9001

[grpc]
endpoint = "https://test-endpoint.com"
token = ""
enable_metrics = false
enable_tls = true
connection_timeout_ms = 5000
request_timeout_ms = 10000

[protocols]
pumpfun = true

[events]
pumpfun_trade = true
"#;

        // Write to a per-process file in the OS temp dir rather than a fixed
        // name in the working directory, so the test neither pollutes the
        // source tree nor collides with other runs.
        let config_path = std::env::temp_dir().join(format!(
            "parser_proxy_test_config_{}.toml",
            std::process::id()
        ));
        fs::write(&config_path, test_config).unwrap();

        let server = ParserProxyServer::new(&config_path);

        // The config value is already stored in the server, so remove the
        // file before asserting: a failed assertion can no longer leak it.
        fs::remove_file(&config_path).unwrap();

        assert!(server.is_ok());

        let server = server.unwrap();
        assert_eq!(server.config().server.host, "127.0.0.1");
        assert_eq!(server.config().server.port, 9001);
        assert_eq!(server.ws_url(), "ws://127.0.0.1:9001");
    }
}