1use super::buffers::{MicroBatchBuffer, SlotBuffer};
10use super::types::*;
11use crate::core::{EventMetadata, now_micros}; use crate::instr::read_pubkey_fast;
13use crate::logs::timestamp_to_microseconds;
14use crate::DexEvent;
15use crossbeam_queue::ArrayQueue;
16use futures::{SinkExt, StreamExt};
17use log::error;
18use memchr::memmem;
19use once_cell::sync::Lazy;
20use std::collections::HashMap;
21use std::sync::Arc;
22use tokio::sync::{mpsc, Mutex};
23use tokio::time::{Duration, Instant};
24use tonic::transport::ClientTlsConfig;
25use yellowstone_grpc_client::GeyserGrpcClient;
26use yellowstone_grpc_proto::prelude::*;
27
28static PROGRAM_DATA_FINDER: Lazy<memmem::Finder> =
29 Lazy::new(|| memmem::Finder::new(b"Program data: "));
30
31#[derive(Clone)]
34pub struct YellowstoneGrpc {
35 endpoint: String,
36 token: Option<String>,
37 config: ClientConfig,
38 control_tx: Arc<Mutex<Option<mpsc::Sender<SubscribeRequest>>>>,
39}
40
41impl YellowstoneGrpc {
42 pub fn new(endpoint: String, token: Option<String>) -> Result<Self, Box<dyn std::error::Error>> {
43 crate::warmup::warmup_parser();
44 Ok(Self {
45 endpoint,
46 token,
47 config: ClientConfig::default(),
48 control_tx: Arc::new(Mutex::new(None)),
49 })
50 }
51
52 pub fn new_with_config(
53 endpoint: String,
54 token: Option<String>,
55 config: ClientConfig,
56 ) -> Result<Self, Box<dyn std::error::Error>> {
57 crate::warmup::warmup_parser();
58 Ok(Self { endpoint, token, config, control_tx: Arc::new(Mutex::new(None)) })
59 }
60
61 pub async fn subscribe_dex_events(
63 &self,
64 transaction_filters: Vec<TransactionFilter>,
65 account_filters: Vec<AccountFilter>,
66 event_type_filter: Option<EventTypeFilter>,
67 ) -> Result<Arc<ArrayQueue<DexEvent>>, Box<dyn std::error::Error>> {
68 let queue = Arc::new(ArrayQueue::new(100_000));
69 let queue_clone = Arc::clone(&queue);
70 let self_clone = self.clone();
71
72 tokio::spawn(async move {
73 let mut delay = 1u64;
74 loop {
75 match self_clone.stream_events(&transaction_filters, &account_filters, &event_type_filter, &queue_clone).await {
76 Ok(_) => delay = 1,
77 Err(e) => println!("❌ gRPC error: {} - retry in {}s", e, delay),
78 }
79 tokio::time::sleep(Duration::from_secs(delay)).await;
80 delay = (delay * 2).min(60);
81 }
82 });
83
84 Ok(queue)
85 }
86
87 pub async fn update_subscription(
89 &self,
90 transaction_filters: Vec<TransactionFilter>,
91 account_filters: Vec<AccountFilter>,
92 ) -> Result<(), Box<dyn std::error::Error>> {
93 let sender = self.control_tx.lock().await
94 .as_ref()
95 .ok_or("No active subscription")?
96 .clone();
97
98 let request = build_subscribe_request(&transaction_filters, &account_filters);
99 sender.send(request).await.map_err(|e| e.to_string())?;
100 Ok(())
101 }
102
103 pub async fn stop(&self) {
104 println!("🛑 Stopping gRPC subscription...");
105 }
106
107 async fn stream_events(
110 &self,
111 tx_filters: &[TransactionFilter],
112 acc_filters: &[AccountFilter],
113 event_filter: &Option<EventTypeFilter>,
114 queue: &Arc<ArrayQueue<DexEvent>>,
115 ) -> Result<(), String> {
116 let _ = rustls::crypto::ring::default_provider().install_default();
117
118 let mut builder = GeyserGrpcClient::build_from_shared(self.endpoint.clone())
120 .map_err(|e| e.to_string())?
121 .x_token(self.token.clone())
122 .map_err(|e| e.to_string())?
123 .max_decoding_message_size(1024 * 1024 * 1024);
124
125 if self.config.connection_timeout_ms > 0 {
126 builder = builder.connect_timeout(Duration::from_millis(self.config.connection_timeout_ms));
127 }
128 if self.config.enable_tls {
129 builder = builder.tls_config(ClientTlsConfig::new().with_native_roots()).map_err(|e| e.to_string())?;
130 }
131
132 let mut client = builder.connect().await.map_err(|e| e.to_string())?;
133 let request = build_subscribe_request(tx_filters, acc_filters);
134
135 let (subscribe_tx, mut stream) = client
136 .subscribe_with_request(Some(request))
137 .await
138 .map_err(|e| e.to_string())?;
139
140 self.print_mode_info();
141
142 let (control_tx, mut control_rx) = mpsc::channel::<SubscribeRequest>(100);
144 *self.control_tx.lock().await = Some(control_tx);
145 let subscribe_tx = Arc::new(Mutex::new(subscribe_tx));
146
147 let mut slot_buffer = SlotBuffer::new();
149 let mut micro_batch = MicroBatchBuffer::new();
150 let mut last_slot = 0u64;
151
152 let order_mode = self.config.order_mode;
153 let timeout_ms = self.config.order_timeout_ms;
154 let batch_us = self.config.micro_batch_us;
155 let check_interval = Duration::from_millis(timeout_ms / 2);
156 let mut next_check = Instant::now() + check_interval;
157
158 loop {
159 self.check_timeout(
161 order_mode, &mut slot_buffer, &mut micro_batch, queue,
162 timeout_ms, batch_us, &mut next_check, check_interval
163 );
164
165 tokio::select! {
166 msg = stream.next() => {
167 match msg {
168 Some(Ok(update)) => {
169 self.handle_update(
170 update, order_mode, event_filter, queue,
171 &mut slot_buffer, &mut micro_batch, &mut last_slot, batch_us
172 );
173 }
174 Some(Err(e)) => {
175 error!("Stream error: {:?}", e);
176 self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
177 return Err(e.to_string());
178 }
179 None => {
180 self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
181 return Ok(());
182 }
183 }
184 }
185 Some(req) = control_rx.recv() => {
186 if let Err(e) = subscribe_tx.lock().await.send(req).await {
187 return Err(e.to_string());
188 }
189 }
190 }
191 }
192 }
193
194 fn print_mode_info(&self) {
195 match self.config.order_mode {
196 OrderMode::Unordered => println!("✅ Unordered Mode (10-20μs)"),
197 OrderMode::Ordered => println!("✅ Ordered Mode (timeout={}ms)", self.config.order_timeout_ms),
198 OrderMode::StreamingOrdered => println!("✅ StreamingOrdered Mode (timeout={}ms)", self.config.order_timeout_ms),
199 OrderMode::MicroBatch => println!("✅ MicroBatch Mode (window={}μs)", self.config.micro_batch_us),
200 }
201 }
202
203 #[inline]
204 fn check_timeout(
205 &self,
206 mode: OrderMode,
207 slot_buf: &mut SlotBuffer,
208 micro_buf: &mut MicroBatchBuffer,
209 queue: &Arc<ArrayQueue<DexEvent>>,
210 timeout_ms: u64,
211 batch_us: u64,
212 next_check: &mut Instant,
213 interval: Duration,
214 ) {
215 if Instant::now() < *next_check {
216 return;
217 }
218 *next_check = Instant::now() + interval;
219
220 match mode {
221 OrderMode::Ordered => {
222 if slot_buf.should_timeout(timeout_ms) {
223 for e in slot_buf.flush_all() { let _ = queue.push(e); }
224 }
225 }
226 OrderMode::StreamingOrdered => {
227 if slot_buf.should_timeout(timeout_ms) {
228 for e in slot_buf.flush_streaming_timeout() { let _ = queue.push(e); }
229 }
230 }
231 OrderMode::MicroBatch => {
232 let now_us = get_timestamp_us();
234 if micro_buf.should_flush(now_us, batch_us) {
235 for e in micro_buf.flush() { let _ = queue.push(e); }
236 }
237 }
238 OrderMode::Unordered => {}
239 }
240 }
241
242 fn flush_on_disconnect(&self, mode: OrderMode, buffer: &mut SlotBuffer, queue: &Arc<ArrayQueue<DexEvent>>) {
243 if matches!(mode, OrderMode::Ordered | OrderMode::StreamingOrdered) {
244 let events = match mode {
245 OrderMode::StreamingOrdered => buffer.flush_streaming_timeout(),
246 _ => buffer.flush_all(),
247 };
248 for e in events { let _ = queue.push(e); }
249 }
250 }
251
252 #[inline]
253 fn handle_update(
254 &self,
255 update_msg: SubscribeUpdate,
256 mode: OrderMode,
257 filter: &Option<EventTypeFilter>,
258 queue: &Arc<ArrayQueue<DexEvent>>,
259 slot_buf: &mut SlotBuffer,
260 micro_buf: &mut MicroBatchBuffer,
261 last_slot: &mut u64,
262 batch_us: u64,
263 ) {
264 let block_time_us = timestamp_to_microseconds(&update_msg.created_at.unwrap_or_default()) as i64;
265 let grpc_recv_us = get_timestamp_us();
266
267 let Some(update) = update_msg.update_oneof else { return };
268
269 match update {
270 subscribe_update::UpdateOneof::Transaction(tx) => {
271 self.handle_transaction(tx, mode, filter, queue, slot_buf, micro_buf, last_slot, batch_us, grpc_recv_us, block_time_us);
272 }
273 subscribe_update::UpdateOneof::Account(acc) => {
274 Self::handle_account(acc, filter, queue, grpc_recv_us, block_time_us);
275 }
276 _ => {}
277 }
278 }
279
280 #[inline]
281 fn handle_transaction(
282 &self,
283 tx: SubscribeUpdateTransaction,
284 mode: OrderMode,
285 filter: &Option<EventTypeFilter>,
286 queue: &Arc<ArrayQueue<DexEvent>>,
287 slot_buf: &mut SlotBuffer,
288 micro_buf: &mut MicroBatchBuffer,
289 last_slot: &mut u64,
290 batch_us: u64,
291 grpc_us: i64,
292 block_us: i64,
293 ) {
294 let slot = tx.slot;
295
296 match mode {
297 OrderMode::Unordered => {
298 for e in parse_transaction_core(&tx, grpc_us, Some(block_us), filter.as_ref()) {
299 let _ = queue.push(e);
300 }
301 }
302 OrderMode::Ordered => {
303 if slot > *last_slot && *last_slot > 0 {
304 for e in slot_buf.flush_before(slot) { let _ = queue.push(e); }
305 }
306 *last_slot = slot;
307 for (idx, e) in parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref()) {
308 slot_buf.push(slot, idx, e);
309 }
310 }
311 OrderMode::StreamingOrdered => {
312 for (idx, e) in parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref()) {
313 for evt in slot_buf.push_streaming(slot, idx, e) {
314 let _ = queue.push(evt);
315 }
316 }
317 }
318 OrderMode::MicroBatch => {
319 for (idx, e) in parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref()) {
320 if micro_buf.push(slot, idx, e, grpc_us, batch_us) {
321 for evt in micro_buf.flush() { let _ = queue.push(evt); }
322 }
323 }
324 }
325 }
326 }
327
328 #[inline]
329 fn handle_account(
330 acc: SubscribeUpdateAccount,
331 filter: &Option<EventTypeFilter>,
332 queue: &Arc<ArrayQueue<DexEvent>>,
333 grpc_us: i64,
334 block_us: i64,
335 ) {
336 let Some(info) = acc.account else { return };
337 let data = crate::accounts::AccountData {
338 pubkey: read_pubkey_fast(&info.pubkey),
339 executable: info.executable,
340 lamports: info.lamports,
341 owner: read_pubkey_fast(&info.owner),
342 rent_epoch: info.rent_epoch,
343 data: info.data,
344 };
345 let meta = EventMetadata {
346 signature: Default::default(),
347 slot: acc.slot,
348 tx_index: 0,
349 block_time_us: block_us,
350 grpc_recv_us: grpc_us,
351 recent_blockhash: None,
352 };
353 if let Some(e) = crate::accounts::parse_account_unified(&data, meta, filter.as_ref()) {
354 let _ = queue.push(e);
355 }
356 }
357}
358
359#[inline(always)]
370fn get_timestamp_us() -> i64 {
371 now_micros()
372}
373
374fn build_subscribe_request(tx_filters: &[TransactionFilter], acc_filters: &[AccountFilter]) -> SubscribeRequest {
375 let transactions = tx_filters.iter().enumerate().map(|(i, f)| {
376 (format!("tx_{}", i), SubscribeRequestFilterTransactions {
377 vote: Some(false),
378 failed: Some(false),
379 signature: None,
380 account_include: f.account_include.clone(),
381 account_exclude: f.account_exclude.clone(),
382 account_required: f.account_required.clone(),
383 })
384 }).collect();
385
386 let accounts = acc_filters.iter().enumerate().map(|(i, f)| {
387 (format!("acc_{}", i), SubscribeRequestFilterAccounts {
388 account: f.account.clone(),
389 owner: f.owner.clone(),
390 filters: f.filters.clone(),
391 nonempty_txn_signature: None,
392 })
393 }).collect();
394
395 SubscribeRequest {
396 slots: HashMap::new(),
397 accounts,
398 transactions,
399 transactions_status: HashMap::new(),
400 blocks: HashMap::new(),
401 blocks_meta: HashMap::new(),
402 entry: HashMap::new(),
403 commitment: Some(CommitmentLevel::Processed as i32),
404 accounts_data_slice: Vec::new(),
405 ping: None,
406 from_slot: None,
407 }
408}
409
410#[inline]
413fn parse_transaction_to_vec(
414 tx: &SubscribeUpdateTransaction,
415 grpc_us: i64,
416 block_us: Option<i64>,
417 filter: Option<&EventTypeFilter>,
418) -> Vec<(u64, DexEvent)> {
419 let idx = tx.transaction.as_ref().map(|t| t.index).unwrap_or(0);
420 parse_transaction_core(tx, grpc_us, block_us, filter)
421 .into_iter()
422 .map(|e| (idx, e))
423 .collect()
424}
425
426#[inline]
427fn parse_transaction_core(
428 tx: &SubscribeUpdateTransaction,
429 grpc_us: i64,
430 block_us: Option<i64>,
431 filter: Option<&EventTypeFilter>,
432) -> Vec<DexEvent> {
433 let Some(info) = &tx.transaction else { return Vec::new() };
434 let Some(meta) = &info.meta else { return Vec::new() };
435
436 let sig = extract_signature(&info.signature);
437 let slot = tx.slot;
438 let idx = info.index;
439
440 let (log_events, instr_events) = rayon::join(
442 || parse_logs(meta, &info.transaction, &meta.log_messages, sig, slot, idx, block_us, grpc_us, filter),
443 || parse_instructions(meta, &info.transaction, sig, slot, idx, block_us, grpc_us, filter),
444 );
445
446 let mut result = Vec::with_capacity(log_events.len() + instr_events.len());
447 result.extend(log_events);
448 result.extend(instr_events);
449 result
450}
451
452#[inline(always)]
453fn extract_signature(bytes: &[u8]) -> solana_sdk::signature::Signature {
454 let mut arr = [0u8; 64];
455 arr.copy_from_slice(bytes);
456 solana_sdk::signature::Signature::from(arr)
457}
458
459#[inline]
460fn parse_logs(
461 meta: &TransactionStatusMeta,
462 transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
463 logs: &[String],
464 sig: solana_sdk::signature::Signature,
465 slot: u64,
466 tx_idx: u64,
467 block_us: Option<i64>,
468 grpc_us: i64,
469 filter: Option<&EventTypeFilter>,
470) -> Vec<DexEvent> {
471 let recent_blockhash = transaction
472 .as_ref()
473 .and_then(|t| t.message.as_ref())
474 .and_then(|m| {
475 if m.recent_blockhash.is_empty() {
476 None
477 } else {
478 Some(m.recent_blockhash.clone())
479 }
480 });
481
482 let needs_pumpfun = filter.map(|f| f.includes_pumpfun()).unwrap_or(true);
483 let has_create = needs_pumpfun && crate::logs::optimized_matcher::detect_pumpfun_create(logs);
484
485 let mut outer_idx: i32 = -1;
486 let mut inner_idx: i32 = -1;
487 let mut invokes: HashMap<&str, Vec<(i32, i32)>> = HashMap::with_capacity(8);
488 let mut result = Vec::with_capacity(4);
489
490 for log in logs {
491 if let Some((pid, depth)) = crate::logs::optimized_matcher::parse_invoke_info(log) {
492 if depth == 1 { inner_idx = -1; outer_idx += 1; } else { inner_idx += 1; }
493 invokes.entry(pid).or_default().push((outer_idx, inner_idx));
494 }
495
496 if PROGRAM_DATA_FINDER.find(log.as_bytes()).is_none() { continue; }
497
498 if let Some(mut e) = crate::logs::parse_log(log, sig, slot, tx_idx, block_us, grpc_us, filter, has_create, recent_blockhash.as_deref()) {
499 crate::core::account_dispatcher::fill_accounts_from_transaction_data(&mut e, meta, transaction, &invokes);
500 crate::core::common_filler::fill_data(&mut e, meta, transaction, &invokes);
501 result.push(e);
502 }
503 }
504 result
505}
506
507#[inline]
508fn parse_instructions(
509 meta: &TransactionStatusMeta,
510 transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
511 sig: solana_sdk::signature::Signature,
512 slot: u64,
513 tx_idx: u64,
514 block_us: Option<i64>,
515 grpc_us: i64,
516 filter: Option<&EventTypeFilter>,
517) -> Vec<DexEvent> {
518 crate::grpc::instruction_parser::parse_instructions_enhanced(
524 meta,
525 transaction,
526 sig,
527 slot,
528 tx_idx,
529 block_us,
530 grpc_us,
531 filter,
532 )
533}