use super::types::*;
use crate::DexEvent;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::prelude::*;
use solana_sdk::pubkey::Pubkey;
use std::collections::HashMap;
use futures::StreamExt;
use log::error;
use tonic::transport::ClientTlsConfig;
use crossbeam_queue::ArrayQueue;
use memchr::memmem;
use std::sync::Arc;
use once_cell::sync::Lazy;

static PROGRAM_DATA_FINDER: Lazy<memmem::Finder> = Lazy::new(|| memmem::Finder::new(b"Program data: "));

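/// Streaming client for a Yellowstone (Geyser) gRPC endpoint that parses DEX
/// events out of transaction updates and hands them to consumers through a
/// lock-free queue.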
#[derive(Clone)]
pub struct YellowstoneGrpc {
    endpoint: String,
    token: Option<String>,
    config: ClientConfig,
}

impl YellowstoneGrpc {
    pub fn new(endpoint: String, token: Option<String>) -> Result<Self, Box<dyn std::error::Error>> {
        Ok(Self {
            endpoint,
            token,
            config: ClientConfig::default(),
        })
    }

    pub fn new_with_config(
        endpoint: String,
        token: Option<String>,
        config: ClientConfig,
    ) -> Result<Self, Box<dyn std::error::Error>> {
        Ok(Self {
            endpoint,
            token,
            config,
        })
    }

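    /// Spawns the gRPC stream on a background Tokio task and returns a bounded,
    /// lock-free queue (capacity 100 000) that the caller polls for parsed
    /// `DexEvent`s. Errors from the background task are silently discarded.
    ///
    /// A minimal polling sketch; building a `TransactionFilter` is left out
    /// because its constructors live in `super::types` and are assumed here
    /// rather than shown (hence the `ignore` fence):
    ///
    /// ```ignore
    /// let client = YellowstoneGrpc::new("https://grpc.example.com:443".to_string(), None)?;
    /// let queue = client
    ///     .subscribe_dex_events(vec![tx_filter], vec![], None)
    ///     .await?;
    /// loop {
    ///     if let Some(event) = queue.pop() {
    ///         // handle the DexEvent
    ///     }
    /// }
    /// ```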
    pub async fn subscribe_dex_events(
        &self,
        transaction_filters: Vec<TransactionFilter>,
        account_filters: Vec<AccountFilter>,
        event_type_filter: Option<EventTypeFilter>,
    ) -> Result<Arc<ArrayQueue<DexEvent>>, Box<dyn std::error::Error>> {
        let queue = Arc::new(ArrayQueue::new(100_000));
        let queue_clone = Arc::clone(&queue);

        let self_clone = self.clone();
        tokio::spawn(async move {
            let _ = self_clone.stream_to_queue(
                transaction_filters,
                account_filters,
                event_type_filter,
                queue_clone,
            ).await;
        });

        Ok(queue)
    }

    /// Currently log-only: the background streaming task spawned by
    /// `subscribe_dex_events` is not cancelled here.
    pub async fn stop(&self) {
        println!("🛑 Stopping gRPC subscription...");
    }
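
    /// Connects to the configured Yellowstone gRPC endpoint, subscribes with the
    /// given filters at `Processed` commitment, and pushes every parsed DEX event
    /// into `queue` until the stream ends or the connection fails.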
    async fn stream_to_queue(
        &self,
        transaction_filters: Vec<TransactionFilter>,
        account_filters: Vec<AccountFilter>,
        event_type_filter: Option<EventTypeFilter>,
        queue: Arc<ArrayQueue<DexEvent>>,
    ) -> Result<(), Box<dyn std::error::Error>> {
        println!("🚀 Starting Zero-Copy DEX event subscription...");

        let _ = rustls::crypto::ring::default_provider().install_default();

        let mut builder = GeyserGrpcClient::build_from_shared(self.endpoint.clone())?
            .x_token(self.token.clone())?
            .max_decoding_message_size(1024 * 1024 * 1024);

        if self.config.connection_timeout_ms > 0 {
            builder = builder.connect_timeout(std::time::Duration::from_millis(self.config.connection_timeout_ms));
        }

        if self.config.enable_tls {
            let tls_config = ClientTlsConfig::new().with_native_roots();
            builder = builder.tls_config(tls_config)?;
        }

        println!("🔗 Connecting to gRPC endpoint: {}", self.endpoint);
        println!("⏱️ Connection timeout: {}ms", self.config.connection_timeout_ms);

        let mut client = match builder.connect().await {
            Ok(c) => {
                println!("✅ Connection established");
                c
            },
            Err(e) => {
                println!("❌ Connection failed: {:?}", e);
                return Err(e.into());
            }
        };
        println!("✅ Connected to Yellowstone gRPC");

        println!("📝 Building subscription filters...");
        let mut accounts: HashMap<String, SubscribeRequestFilterAccounts> = HashMap::new();
        for (i, filter) in account_filters.iter().enumerate() {
            let key = format!("account_filter_{}", i);
            accounts.insert(key, SubscribeRequestFilterAccounts {
                account: filter.account.clone(),
                owner: filter.owner.clone(),
                filters: vec![],
                nonempty_txn_signature: None,
            });
        }

        let mut transactions: HashMap<String, SubscribeRequestFilterTransactions> = HashMap::new();
        for (i, filter) in transaction_filters.iter().enumerate() {
            let key = format!("transaction_filter_{}", i);
            transactions.insert(key, SubscribeRequestFilterTransactions {
                vote: Some(false),
                failed: Some(false),
                signature: None,
                account_include: filter.account_include.clone(),
                account_exclude: filter.account_exclude.clone(),
                account_required: filter.account_required.clone(),
            });
        }

        let request = SubscribeRequest {
            slots: HashMap::new(),
            accounts,
            transactions,
            transactions_status: HashMap::new(),
            blocks: HashMap::new(),
            blocks_meta: HashMap::new(),
            entry: HashMap::new(),
            commitment: Some(CommitmentLevel::Processed as i32),
            accounts_data_slice: Vec::new(),
            ping: None,
            from_slot: None,
        };

        println!("📡 Subscribing to stream...");
        let (_subscribe_tx, mut stream) = client.subscribe_with_request(Some(request)).await?;
        println!("✅ Subscribed successfully - Zero Copy Mode");
        println!("👂 Listening for events...");

        let mut msg_count = 0u64;
        while let Some(message) = stream.next().await {
            match message {
                Ok(update_msg) => {
                    msg_count += 1;
                    if msg_count % 100 == 0 {
                        println!("📨 Received {} messages", msg_count);
                    }

                    if let Some(update) = update_msg.update_oneof {
                        if let subscribe_update::UpdateOneof::Transaction(transaction_update) = update {
                            // Wall-clock receive timestamp in microseconds, taken as early as possible.
                            let grpc_recv_us = unsafe {
                                let mut ts = libc::timespec { tv_sec: 0, tv_nsec: 0 };
                                libc::clock_gettime(libc::CLOCK_REALTIME, &mut ts);
                                (ts.tv_sec as i64) * 1_000_000 + (ts.tv_nsec as i64) / 1_000
                            };
                            Self::parse_transaction(&transaction_update, grpc_recv_us, &queue, event_type_filter.as_ref()).await;
                        }
                    }
                },
                Err(e) => {
                    error!("Stream error: {:?}", e);
                    println!("❌ Stream error: {:?}", e);
                },
            }
        }

        println!("⚠️ Stream ended");

        Ok(())
    }

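    /// Extracts the account keys, first signature, and log messages from a
    /// transaction update, then forwards them to `parse_events`.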
    async fn parse_transaction(
        transaction_update: &SubscribeUpdateTransaction,
        grpc_recv_us: i64,
        queue: &Arc<ArrayQueue<DexEvent>>,
        event_type_filter: Option<&EventTypeFilter>,
    ) {
        if let Some(transaction_info) = &transaction_update.transaction {
            let tx_index = transaction_info.index;

            if let Some(meta) = &transaction_info.meta {
                let logs = &meta.log_messages;

                if let Some(tx_msg) = &transaction_info.transaction {
                    if let Some(message) = &tx_msg.message {
                        let mut accounts = Vec::with_capacity(message.account_keys.len());
                        for key in &message.account_keys {
                            if key.len() == 32 {
                                let mut pubkey_bytes = [0u8; 32];
                                pubkey_bytes.copy_from_slice(key);
                                accounts.push(Pubkey::new_from_array(pubkey_bytes));
                            }
                        }

                        let signature = if let Some(sig) = tx_msg.signatures.first() {
                            if sig.len() == 64 {
                                let mut sig_array = [0u8; 64];
                                sig_array.copy_from_slice(sig);
                                solana_sdk::signature::Signature::from(sig_array)
                            } else {
                                solana_sdk::signature::Signature::default()
                            }
                        } else {
                            solana_sdk::signature::Signature::default()
                        };

                        // Receive-side wall-clock time used as a stand-in for the block time.
                        let block_time = Some(chrono::Utc::now().timestamp());
                        let mut log_events_parsed = false;

                        for instruction in &message.instructions {
                            let program_id_index = instruction.program_id_index as usize;
                            if program_id_index < accounts.len() {
                                let _program_id = accounts[program_id_index];

                                // Log parsing runs at most once per transaction; `log_events_parsed`
                                // short-circuits the remaining instructions.
                                Self::parse_events(
                                    &accounts,
                                    logs,
                                    signature,
                                    transaction_update.slot,
                                    tx_index,
                                    block_time,
                                    grpc_recv_us,
                                    queue,
                                    &mut log_events_parsed,
                                    event_type_filter,
                                );
                            }
                        }
                    }
                }
            }
        }
    }

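    /// Scans the transaction logs at most once per transaction (guarded by
    /// `log_events_parsed`), skipping log lines without a "Program data: "
    /// payload, and pushes the first event that parses successfully.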
    #[inline]
    fn parse_events(
        _accounts: &[Pubkey],
        logs: &[String],
        signature: solana_sdk::signature::Signature,
        slot: u64,
        tx_index: u64,
        block_time: Option<i64>,
        grpc_recv_us: i64,
        queue: &Arc<ArrayQueue<DexEvent>>,
        log_events_parsed: &mut bool,
        event_type_filter: Option<&EventTypeFilter>,
    ) {
        if !*log_events_parsed {
            let has_create = event_type_filter
                .map(|f| f.includes_pumpfun())
                .unwrap_or(true)
                && crate::logs::optimized_matcher::detect_pumpfun_create(logs);

            for log in logs.iter() {
                let log_bytes = log.as_bytes();

                // Fast path: only logs containing "Program data: " can carry an encoded event.
                if PROGRAM_DATA_FINDER.find(log_bytes).is_none() {
                    continue;
                }

                if let Some(log_event) = crate::logs::parse_log(log, signature, slot, tx_index, block_time, grpc_recv_us, event_type_filter, has_create) {
                    // Push is best-effort: if the queue is full the event is dropped.
                    let _ = queue.push(log_event);
                    *log_events_parsed = true;
                    return;
                }
            }

            *log_events_parsed = true;
        }
    }
}