use sol_parser_sdk::grpc::{
AccountFilter, ClientConfig, EventTypeFilter, TransactionFilter,
YellowstoneGrpc,
};
use std::sync::Arc;
use tokio::net::TcpListener;
use tracing::{error, info, warn};
use anyhow::Result;
mod config;
mod ws_server;
pub use config::Config;
use ws_server::WsServer;
/// Proxy server that subscribes to a gRPC event stream and rebroadcasts the
/// events, serialized as JSON, through a WebSocket server.
pub struct ParserProxyServer {
/// Settings read by the server: WebSocket host/port, gRPC client options,
/// and the enabled protocol / event-type lists.
config: Config,
}
impl ParserProxyServer {
pub fn new<P: AsRef<std::path::Path>>(config_path: P) -> Result<Self> {
let config = Config::load_or_default(config_path.as_ref().to_str().unwrap());
Ok(Self { config })
}
pub fn with_config(config: Config) -> Self {
Self { config }
}
pub async fn start(self) -> Result<()> {
self.init_tracing();
info!("ð Starting Parser Proxy WebSocket Server...");
let ws_server = Arc::new(WsServer::new());
let ws_server_clone = ws_server.clone();
let addr = format!("{}:{}", self.config.server.host, self.config.server.port);
let listener = TcpListener::bind(&addr).await?;
info!("ðĄ WebSocket server listening on: ws://{}", addr);
tokio::spawn(async move {
ws_server_clone.run(listener).await;
});
self.start_grpc_processing(ws_server).await?;
Ok(())
}
pub async fn run(self) -> Result<()> {
let server_task = tokio::spawn(async move {
if let Err(e) = self.start().await {
error!("Server error: {}", e);
}
});
info!("ð Press Ctrl+C to stop...");
tokio::signal::ctrl_c().await?;
info!("ð Shutting down gracefully...");
server_task.abort();
Ok(())
}
pub fn config(&self) -> &Config {
&self.config
}
pub fn ws_url(&self) -> String {
format!("ws://{}:{}", self.config.server.host, self.config.server.port)
}
fn init_tracing(&self) {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info");
}
#[cfg(feature = "binary")]
{
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.init();
}
let _ = rustls::crypto::ring::default_provider().install_default();
}
async fn start_grpc_processing(
&self,
ws_server: Arc<WsServer>,
) -> Result<()> {
let mut grpc_config = ClientConfig::default();
grpc_config.enable_metrics = self.config.grpc.enable_metrics;
grpc_config.connection_timeout_ms = self.config.grpc.connection_timeout_ms;
grpc_config.request_timeout_ms = self.config.grpc.request_timeout_ms;
grpc_config.enable_tls = self.config.grpc.enable_tls;
let grpc = YellowstoneGrpc::new_with_config(
self.config.grpc.endpoint.clone(),
self.config.grpc.token.clone(),
grpc_config,
)
.map_err(|e| anyhow::anyhow!("gRPC client creation failed: {}", e))?;
info!("â
gRPC client created successfully");
let protocols = self.config.get_enabled_protocols();
if protocols.is_empty() {
warn!("â ïļ No protocols enabled in config, server will receive no events");
return Ok(());
}
info!("ð Monitoring protocols: {:?}", protocols);
let transaction_filter = TransactionFilter::for_protocols(&protocols);
let account_filter = AccountFilter::for_protocols(&protocols);
let event_types = self.config.get_enabled_event_types();
if event_types.is_empty() {
warn!("â ïļ No event types enabled in config, server will receive no events");
return Ok(());
}
info!("ðŊ Monitoring event types: {:?}", event_types);
let event_filter = EventTypeFilter::include_only(event_types);
info!("ð§ Starting subscription...");
let queue = grpc
.subscribe_dex_events(
vec![transaction_filter],
vec![account_filter],
Some(event_filter),
)
.await
.map_err(|e| anyhow::anyhow!("Subscription failed: {}", e))?;
info!("â
Subscription established");
tokio::spawn(async move {
let mut spin_count = 0u32;
loop {
if let Some(event) = queue.pop() {
spin_count = 0;
let event_json = match serde_json::to_string(&event) {
Ok(json) => json,
Err(e) => {
error!("Failed to serialize event: {}", e);
continue;
}
};
ws_server.broadcast(&event_json).await;
} else {
spin_count += 1;
if spin_count < 1000 {
std::hint::spin_loop();
} else {
tokio::task::yield_now().await;
spin_count = 0;
}
}
}
});
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
}
/// Convenience entry point: builds a [`ParserProxyServer`] from the
/// configuration file at `config_path` and drives it with
/// [`ParserProxyServer::run`].
///
/// # Errors
///
/// Propagates any error from constructing or running the server.
pub async fn run_server<P: AsRef<std::path::Path>>(config_path: P) -> Result<()> {
    ParserProxyServer::new(config_path)?.run().await
}
pub mod prelude {
pub use crate::{ParserProxyServer, Config, run_server};
pub use sol_parser_sdk::core::events::*;
pub use sol_parser_sdk::grpc::EventType;
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;

    /// Checks that a server built from a config file exposes the configured
    /// host, port and WebSocket URL.
    #[test]
    fn test_server_creation() {
        let test_config = r#"
[server]
host = "127.0.0.1"
port = 9001
[grpc]
endpoint = "https://test-endpoint.com"
token = ""
enable_metrics = false
enable_tls = true
connection_timeout_ms = 5000
request_timeout_ms = 10000
[protocols]
pumpfun = true
[events]
pumpfun_trade = true
"#;
        // Write the fixture to the temp dir under a per-process name so the
        // test neither pollutes the working directory nor collides with
        // another test process.
        let config_path = std::env::temp_dir().join(format!(
            "parser_proxy_test_config_{}.toml",
            std::process::id()
        ));
        fs::write(&config_path, test_config).unwrap();

        let server = ParserProxyServer::new(&config_path);

        // The config is loaded inside `new`, so the fixture can be removed
        // before asserting; a failing assertion then cannot leave it behind.
        let _ = fs::remove_file(&config_path);

        let server = server.expect("server should be created from a valid config file");
        assert_eq!(server.config().server.host, "127.0.0.1");
        assert_eq!(server.config().server.port, 9001);
        assert_eq!(server.ws_url(), "ws://127.0.0.1:9001");
    }
}