use sol_parser_sdk::core::now_micros;
use sol_parser_sdk::grpc::{
AccountFilter, ClientConfig, EventType, EventTypeFilter, OrderMode, Protocol,
TransactionFilter, YellowstoneGrpc,
};
use sol_parser_sdk::DexEvent;
/// Entry point: installs the process-wide rustls crypto provider, prints the
/// example banner and then runs [`run_example`] to completion.
///
/// # Errors
/// Propagates any error returned by [`run_example`].
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The result is deliberately discarded: the call is best-effort and a
    // failure here (presumably "a provider is already installed" — see the
    // rustls docs) must not abort the example.
    let _ = rustls::crypto::ring::default_provider().install_default();

    println!("🚀 PumpFun Trade Event Filter Example");
    println!("======================================\n");

    run_example().await
}
/// Subscribes to PumpFun events over Yellowstone gRPC and prints a framed
/// card for every Buy, Sell, BuyExactSolIn, Trade and Create event received.
///
/// The auth token is read from the `GRPC_AUTH_TOKEN` environment variable,
/// falling back to the constant embedded below. The function returns once
/// Ctrl+C is received or ten minutes have elapsed, whichever happens first;
/// in both cases the gRPC client is stopped before returning.
///
/// # Errors
/// Returns an error if the client cannot be constructed, if the subscription
/// request fails, or if waiting for the Ctrl+C signal fails.
async fn run_example() -> Result<(), Box<dyn std::error::Error>> {
    const GRPC_ENDPOINT: &str = "https://solana-yellowstone-grpc.publicnode.com:443";
    // NOTE(review): a credential is embedded as the fallback token; prefer
    // supplying GRPC_AUTH_TOKEN through the environment.
    const GRPC_AUTH_TOKEN: &str =
        "cd1c3642f88c86f9f8e7f15831faf9f067b997c6ac2b72c81d115e8d071af77a";
    /// How long the example streams before stopping on its own.
    const AUTO_STOP_AFTER: std::time::Duration = std::time::Duration::from_secs(600);
    /// Number of horizontal dashes drawn after the corner glyph of a card.
    const RULE_WIDTH: usize = 61;

    let config = ClientConfig {
        enable_metrics: true,
        connection_timeout_ms: 10000,
        request_timeout_ms: 30000,
        enable_tls: true,
        order_mode: OrderMode::Unordered,
        ..Default::default()
    };
    println!("📋 Configuration:");
    println!(" Order Mode: {:?} (ultra-low latency)", config.order_mode);
    println!();

    let auth_token =
        std::env::var("GRPC_AUTH_TOKEN").unwrap_or_else(|_| GRPC_AUTH_TOKEN.to_string());
    let grpc =
        YellowstoneGrpc::new_with_config(GRPC_ENDPOINT.to_string(), Some(auth_token), config)?;
    println!("✅ gRPC client created (parser pre-warmed)");

    let protocols = vec![Protocol::PumpFun];
    println!("📊 Protocols: {:?}", protocols);
    let transaction_filter = TransactionFilter::for_protocols(&protocols);
    let account_filter = AccountFilter::for_protocols(&protocols);
    let event_filter = EventTypeFilter::include_only(vec![
        EventType::PumpFunBuy,
        EventType::PumpFunSell,
        EventType::PumpFunBuyExactSolIn,
        EventType::PumpFunCreate,
    ]);
    println!("🎯 Event Filter: Buy, Sell, BuyExactSolIn, Create");
    println!("🎧 Starting subscription...\n");

    let queue = grpc
        .subscribe_dex_events(vec![transaction_filter], vec![account_filter], Some(event_filter))
        .await?;

    // Consumer task: drains the event queue and prints one card per event.
    // The task is detached; it ends when the runtime shuts down.
    tokio::spawn(async move {
        let rule = "─".repeat(RULE_WIDTH);
        let mut event_count = 0u64;
        let mut buy_count = 0u64;
        let mut sell_count = 0u64;
        let mut buy_exact_count = 0u64;
        let mut create_count = 0u64;

        // Prints the card shared by the three trade-like events. A macro is
        // used so the body is expanded per arm without naming the payload
        // type; it reads the running counters declared just above.
        macro_rules! print_trade_card {
            ($title:expr, $e:expr, $latency_us:expr, $sol_note:expr, $token_note:expr) => {{
                println!("┌{}", rule);
                println!("│ {} #{}", $title, event_count);
                println!("├{}", rule);
                println!("│ Signature : {}", $e.metadata.signature);
                println!(
                    "│ Slot : {} | TxIndex: {}",
                    $e.metadata.slot, $e.metadata.tx_index
                );
                println!("├{}", rule);
                println!("│ Mint : {}", $e.mint);
                println!("│ SOL Amount : {} lamports{}", $e.sol_amount, $sol_note);
                println!("│ Token Amt : {}{}", $e.token_amount, $token_note);
                println!("│ User : {}", $e.user);
                println!("│ ix_name : {}", $e.ix_name);
                println!("├{}", rule);
                println!("│ 📊 Latency : {} μs", $latency_us);
                println!(
                    "│ 📈 Stats : Buy={} Sell={} BuyExact={}",
                    buy_count, sell_count, buy_exact_count
                );
                println!("└{}\n", rule);
            }};
        }

        let mut spin_count = 0u32;
        loop {
            if let Some(event) = queue.pop() {
                spin_count = 0;
                event_count += 1;
                let now_us = now_micros();
                match &event {
                    DexEvent::PumpFunBuy(e) => {
                        buy_count += 1;
                        let latency_us = now_us - e.metadata.grpc_recv_us;
                        print_trade_card!("🟢 PumpFun BUY", e, latency_us, "", "");
                    }
                    DexEvent::PumpFunSell(e) => {
                        sell_count += 1;
                        let latency_us = now_us - e.metadata.grpc_recv_us;
                        print_trade_card!("🔴 PumpFun SELL", e, latency_us, "", "");
                    }
                    DexEvent::PumpFunBuyExactSolIn(e) => {
                        buy_exact_count += 1;
                        let latency_us = now_us - e.metadata.grpc_recv_us;
                        print_trade_card!(
                            "🟡 PumpFun BUY_EXACT_SOL_IN",
                            e,
                            latency_us,
                            " (exact input)",
                            " (min output)"
                        );
                    }
                    DexEvent::PumpFunTrade(e) => {
                        // Generic trade that was not classified into one of
                        // the specific variants above; only identify it.
                        println!("┌{}", rule);
                        println!("│ ⚪ PumpFun TRADE (unknown type) #{}", event_count);
                        println!("├{}", rule);
                        println!("│ ix_name : {} (is_buy={})", e.ix_name, e.is_buy);
                        println!("│ Signature : {}", e.metadata.signature);
                        println!("└{}\n", rule);
                    }
                    DexEvent::PumpFunCreate(e) => {
                        create_count += 1;
                        let latency_us = now_us - e.metadata.grpc_recv_us;
                        println!("┌{}", rule);
                        println!("│ 🆕 PumpFun CREATE #{}", event_count);
                        println!("├{}", rule);
                        println!("│ Signature : {}", e.metadata.signature);
                        println!(
                            "│ Slot : {} | TxIndex: {}",
                            e.metadata.slot, e.metadata.tx_index
                        );
                        println!("├{}", rule);
                        println!("│ Name : {}", e.name);
                        println!("│ Symbol : {}", e.symbol);
                        println!("│ Mint : {}", e.mint);
                        println!("│ Creator : {}", e.creator);
                        println!("├{}", rule);
                        println!("│ 📊 Latency : {} μs", latency_us);
                        println!("│ 📈 Creates : {}", create_count);
                        println!("└{}\n", rule);
                    }
                    _ => {}
                }
            } else {
                // Queue empty: busy-spin for up to 1000 polls, then yield so
                // other tasks on this worker get a chance to run.
                spin_count += 1;
                if spin_count < 1000 {
                    std::hint::spin_loop();
                } else {
                    tokio::task::yield_now().await;
                    spin_count = 0;
                }
            }
        }
    });

    println!("🛑 Press Ctrl+C to stop...\n");

    // Wait for whichever comes first. Racing the two here (instead of running
    // the timer in a detached task) lets the function actually return when
    // the auto-stop deadline fires.
    let interrupted: std::io::Result<bool> = tokio::select! {
        signal = tokio::signal::ctrl_c() => signal.map(|()| true),
        _ = tokio::time::sleep(AUTO_STOP_AFTER) => Ok(false),
    };
    if interrupted? {
        println!("\n👋 Shutting down gracefully...");
    } else {
        println!("⏰ Auto-stopping after 10 minutes...");
    }
    grpc.stop().await;
    Ok(())
}