1use super::buffers::{MicroBatchBuffer, SlotBuffer};
10use super::subscribe_builder::build_subscribe_request;
11use super::transaction_meta::try_yellowstone_signature;
12use super::types::*;
13use crate::core::{now_micros, EventMetadata}; use crate::instr::read_pubkey_fast;
15use crate::logs::timestamp_to_microseconds;
16use crate::DexEvent;
17use crossbeam_queue::ArrayQueue;
18use futures::{SinkExt, StreamExt};
19use log::error;
20use memchr::memmem;
21use once_cell::sync::Lazy;
22use std::collections::HashMap;
23use std::sync::Arc;
24use tokio::sync::{mpsc, Mutex};
25use tokio::time::{Duration, Instant};
26use yellowstone_grpc_client::{GeyserGrpcClient, ClientTlsConfig};
28use yellowstone_grpc_proto::prelude::*;
29
30static PROGRAM_DATA_FINDER: Lazy<memmem::Finder> =
31 Lazy::new(|| memmem::Finder::new(b"Program data: "));
32
33#[derive(Clone)]
36pub struct YellowstoneGrpc {
37 endpoint: String,
38 token: Option<String>,
39 config: ClientConfig,
40 control_tx: Arc<Mutex<Option<mpsc::Sender<SubscribeRequest>>>>,
41}
42
43impl YellowstoneGrpc {
44 pub fn new(
45 endpoint: String,
46 token: Option<String>,
47 ) -> Result<Self, Box<dyn std::error::Error>> {
48 crate::warmup::warmup_parser();
49 Ok(Self {
50 endpoint,
51 token,
52 config: ClientConfig::default(),
53 control_tx: Arc::new(Mutex::new(None)),
54 })
55 }
56
57 pub fn new_with_config(
58 endpoint: String,
59 token: Option<String>,
60 config: ClientConfig,
61 ) -> Result<Self, Box<dyn std::error::Error>> {
62 crate::warmup::warmup_parser();
63 Ok(Self { endpoint, token, config, control_tx: Arc::new(Mutex::new(None)) })
64 }
65
66 pub async fn subscribe_dex_events(
68 &self,
69 transaction_filters: Vec<TransactionFilter>,
70 account_filters: Vec<AccountFilter>,
71 event_type_filter: Option<EventTypeFilter>,
72 ) -> Result<Arc<ArrayQueue<DexEvent>>, Box<dyn std::error::Error>> {
73 let queue = Arc::new(ArrayQueue::new(100_000));
74 let queue_clone = Arc::clone(&queue);
75 let self_clone = self.clone();
76
77 tokio::spawn(async move {
78 let mut delay = 1u64;
79 loop {
80 match self_clone
81 .stream_events(
82 &transaction_filters,
83 &account_filters,
84 &event_type_filter,
85 &queue_clone,
86 )
87 .await
88 {
89 Ok(_) => delay = 1,
90 Err(e) => println!("❌ gRPC error: {} - retry in {}s", e, delay),
91 }
92 tokio::time::sleep(Duration::from_secs(delay)).await;
93 delay = (delay * 2).min(60);
94 }
95 });
96
97 Ok(queue)
98 }
99
100 pub async fn update_subscription(
102 &self,
103 transaction_filters: Vec<TransactionFilter>,
104 account_filters: Vec<AccountFilter>,
105 ) -> Result<(), Box<dyn std::error::Error>> {
106 let sender = self.control_tx.lock().await.as_ref().ok_or("No active subscription")?.clone();
107
108 let request = build_subscribe_request(&transaction_filters, &account_filters);
109 sender.send(request).await.map_err(|e| e.to_string())?;
110 Ok(())
111 }
112
113 pub async fn stop(&self) {
114 println!("🛑 Stopping gRPC subscription...");
115 }
116
117 async fn stream_events(
120 &self,
121 tx_filters: &[TransactionFilter],
122 acc_filters: &[AccountFilter],
123 event_filter: &Option<EventTypeFilter>,
124 queue: &Arc<ArrayQueue<DexEvent>>,
125 ) -> Result<(), String> {
126 let _ = rustls::crypto::ring::default_provider().install_default();
127
128 let mut builder = GeyserGrpcClient::build_from_shared(self.endpoint.clone())
130 .map_err(|e| e.to_string())?
131 .x_token(self.token.clone())
132 .map_err(|e| e.to_string())?
133 .max_decoding_message_size(1024 * 1024 * 1024);
134
135 if self.config.connection_timeout_ms > 0 {
136 builder =
137 builder.connect_timeout(Duration::from_millis(self.config.connection_timeout_ms));
138 }
139 if self.config.enable_tls {
140 builder = builder
141 .tls_config(ClientTlsConfig::new().with_native_roots())
142 .map_err(|e| e.to_string())?;
143 }
144
145 let mut client = builder.connect().await.map_err(|e| e.to_string())?;
146 let request = build_subscribe_request(tx_filters, acc_filters);
147
148 let (subscribe_tx, mut stream) =
149 client.subscribe_with_request(Some(request)).await.map_err(|e| e.to_string())?;
150
151 self.print_mode_info();
152
153 let (control_tx, mut control_rx) = mpsc::channel::<SubscribeRequest>(100);
155 *self.control_tx.lock().await = Some(control_tx);
156 let subscribe_tx = Arc::new(Mutex::new(subscribe_tx));
157
158 let mut slot_buffer = SlotBuffer::new();
160 let mut micro_batch = MicroBatchBuffer::new();
161 let mut last_slot = 0u64;
162
163 let order_mode = self.config.order_mode;
164 let timeout_ms = self.config.order_timeout_ms;
165 let batch_us = self.config.micro_batch_us;
166 let check_interval = Duration::from_millis(timeout_ms / 2);
167 let mut next_check = Instant::now() + check_interval;
168
169 loop {
170 self.check_timeout(
172 order_mode,
173 &mut slot_buffer,
174 &mut micro_batch,
175 queue,
176 timeout_ms,
177 batch_us,
178 &mut next_check,
179 check_interval,
180 );
181
182 tokio::select! {
183 msg = stream.next() => {
184 match msg {
185 Some(Ok(update)) => {
186 self.handle_update(
187 update, order_mode, event_filter, queue,
188 &mut slot_buffer, &mut micro_batch, &mut last_slot, batch_us
189 );
190 }
191 Some(Err(e)) => {
192 error!("Stream error: {:?}", e);
193 self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
194 return Err(e.to_string());
195 }
196 None => {
197 self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
198 return Ok(());
199 }
200 }
201 }
202 Some(req) = control_rx.recv() => {
203 if let Err(e) = subscribe_tx.lock().await.send(req).await {
204 return Err(e.to_string());
205 }
206 }
207 }
208 }
209 }
210
211 fn print_mode_info(&self) {
212 match self.config.order_mode {
213 OrderMode::Unordered => println!("✅ Unordered Mode (10-20μs)"),
214 OrderMode::Ordered => {
215 println!("✅ Ordered Mode (timeout={}ms)", self.config.order_timeout_ms)
216 }
217 OrderMode::StreamingOrdered => {
218 println!("✅ StreamingOrdered Mode (timeout={}ms)", self.config.order_timeout_ms)
219 }
220 OrderMode::MicroBatch => {
221 println!("✅ MicroBatch Mode (window={}μs)", self.config.micro_batch_us)
222 }
223 }
224 }
225
226 #[inline]
227 fn check_timeout(
228 &self,
229 mode: OrderMode,
230 slot_buf: &mut SlotBuffer,
231 micro_buf: &mut MicroBatchBuffer,
232 queue: &Arc<ArrayQueue<DexEvent>>,
233 timeout_ms: u64,
234 batch_us: u64,
235 next_check: &mut Instant,
236 interval: Duration,
237 ) {
238 if Instant::now() < *next_check {
239 return;
240 }
241 *next_check = Instant::now() + interval;
242
243 match mode {
244 OrderMode::Ordered => {
245 if slot_buf.should_timeout(timeout_ms) {
246 for e in slot_buf.flush_all() {
247 let _ = queue.push(e);
248 }
249 }
250 }
251 OrderMode::StreamingOrdered => {
252 if slot_buf.should_timeout(timeout_ms) {
253 for e in slot_buf.flush_streaming_timeout() {
254 let _ = queue.push(e);
255 }
256 }
257 }
258 OrderMode::MicroBatch => {
259 let now_us = get_timestamp_us();
261 if micro_buf.should_flush(now_us, batch_us) {
262 for e in micro_buf.flush() {
263 let _ = queue.push(e);
264 }
265 }
266 }
267 OrderMode::Unordered => {}
268 }
269 }
270
271 fn flush_on_disconnect(
272 &self,
273 mode: OrderMode,
274 buffer: &mut SlotBuffer,
275 queue: &Arc<ArrayQueue<DexEvent>>,
276 ) {
277 if matches!(mode, OrderMode::Ordered | OrderMode::StreamingOrdered) {
278 let events = match mode {
279 OrderMode::StreamingOrdered => buffer.flush_streaming_timeout(),
280 _ => buffer.flush_all(),
281 };
282 for e in events {
283 let _ = queue.push(e);
284 }
285 }
286 }
287
288 #[inline]
289 fn handle_update(
290 &self,
291 update_msg: SubscribeUpdate,
292 mode: OrderMode,
293 filter: &Option<EventTypeFilter>,
294 queue: &Arc<ArrayQueue<DexEvent>>,
295 slot_buf: &mut SlotBuffer,
296 micro_buf: &mut MicroBatchBuffer,
297 last_slot: &mut u64,
298 batch_us: u64,
299 ) {
300 let created_at = update_msg.created_at.unwrap_or_default();
301 let block_time_us = timestamp_to_microseconds(created_at.seconds, created_at.nanos) as i64;
302 let grpc_recv_us = get_timestamp_us();
303
304 let Some(update) = update_msg.update_oneof else { return };
305
306 match update {
307 subscribe_update::UpdateOneof::Transaction(tx) => {
308 self.handle_transaction(
309 tx,
310 mode,
311 filter,
312 queue,
313 slot_buf,
314 micro_buf,
315 last_slot,
316 batch_us,
317 grpc_recv_us,
318 block_time_us,
319 );
320 }
321 subscribe_update::UpdateOneof::Account(acc) => {
322 Self::handle_account(acc, filter, queue, grpc_recv_us, block_time_us);
323 }
324 _ => {}
325 }
326 }
327
328 #[inline]
329 fn handle_transaction(
330 &self,
331 tx: SubscribeUpdateTransaction,
332 mode: OrderMode,
333 filter: &Option<EventTypeFilter>,
334 queue: &Arc<ArrayQueue<DexEvent>>,
335 slot_buf: &mut SlotBuffer,
336 micro_buf: &mut MicroBatchBuffer,
337 last_slot: &mut u64,
338 batch_us: u64,
339 grpc_us: i64,
340 block_us: i64,
341 ) {
342 let slot = tx.slot;
343
344 match mode {
345 OrderMode::Unordered => {
346 for e in parse_transaction_core(&tx, grpc_us, Some(block_us), filter.as_ref()) {
347 let _ = queue.push(e);
348 }
349 }
350 OrderMode::Ordered => {
351 if slot > *last_slot && *last_slot > 0 {
352 for e in slot_buf.flush_before(slot) {
353 let _ = queue.push(e);
354 }
355 }
356 *last_slot = slot;
357 for (idx, e) in
358 parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
359 {
360 slot_buf.push(slot, idx, e);
361 }
362 }
363 OrderMode::StreamingOrdered => {
364 for (idx, e) in
365 parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
366 {
367 for evt in slot_buf.push_streaming(slot, idx, e) {
368 let _ = queue.push(evt);
369 }
370 }
371 }
372 OrderMode::MicroBatch => {
373 for (idx, e) in
374 parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
375 {
376 if micro_buf.push(slot, idx, e, grpc_us, batch_us) {
377 for evt in micro_buf.flush() {
378 let _ = queue.push(evt);
379 }
380 }
381 }
382 }
383 }
384 }
385
386 #[inline]
387 fn handle_account(
388 acc: SubscribeUpdateAccount,
389 filter: &Option<EventTypeFilter>,
390 queue: &Arc<ArrayQueue<DexEvent>>,
391 grpc_us: i64,
392 block_us: i64,
393 ) {
394 let Some(info) = acc.account else { return };
395 let data = crate::accounts::AccountData {
396 pubkey: read_pubkey_fast(&info.pubkey),
397 executable: info.executable,
398 lamports: info.lamports,
399 owner: read_pubkey_fast(&info.owner),
400 rent_epoch: info.rent_epoch,
401 data: info.data,
402 };
403 let meta = EventMetadata {
404 signature: Default::default(),
405 slot: acc.slot,
406 tx_index: 0,
407 block_time_us: block_us,
408 grpc_recv_us: grpc_us,
409 recent_blockhash: None,
410 };
411 if let Some(e) = crate::accounts::parse_account_unified(&data, meta, filter.as_ref()) {
412 let _ = queue.push(e);
413 }
414 }
415}
416
417#[inline(always)]
428fn get_timestamp_us() -> i64 {
429 now_micros()
430}
431
432#[inline]
435fn parse_transaction_to_vec(
436 tx: &SubscribeUpdateTransaction,
437 grpc_us: i64,
438 block_us: Option<i64>,
439 filter: Option<&EventTypeFilter>,
440) -> Vec<(u64, DexEvent)> {
441 let idx = tx.transaction.as_ref().map(|t| t.index).unwrap_or(0);
442 parse_transaction_core(tx, grpc_us, block_us, filter).into_iter().map(|e| (idx, e)).collect()
443}
444
445#[inline]
446fn parse_transaction_core(
447 tx: &SubscribeUpdateTransaction,
448 grpc_us: i64,
449 block_us: Option<i64>,
450 filter: Option<&EventTypeFilter>,
451) -> Vec<DexEvent> {
452 let Some(info) = &tx.transaction else { return Vec::new() };
453 let Some(meta) = &info.meta else { return Vec::new() };
454
455 let sig = extract_signature(&info.signature);
456 let slot = tx.slot;
457 let idx = info.index;
458
459 let (log_events, instr_events) = rayon::join(
461 || {
462 parse_logs(
463 meta,
464 &info.transaction,
465 &meta.log_messages,
466 sig,
467 slot,
468 idx,
469 block_us,
470 grpc_us,
471 filter,
472 )
473 },
474 || parse_instructions(meta, &info.transaction, sig, slot, idx, block_us, grpc_us, filter),
475 );
476
477 let mut result = Vec::with_capacity(log_events.len() + instr_events.len());
478 result.extend(log_events);
479 result.extend(instr_events);
480 result
481}
482
483#[inline(always)]
484fn extract_signature(bytes: &[u8]) -> solana_sdk::signature::Signature {
485 try_yellowstone_signature(bytes).expect("yellowstone signature must be 64 bytes")
486}
487
488#[inline]
489fn parse_logs(
490 meta: &TransactionStatusMeta,
491 transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
492 logs: &[String],
493 sig: solana_sdk::signature::Signature,
494 slot: u64,
495 tx_idx: u64,
496 block_us: Option<i64>,
497 grpc_us: i64,
498 filter: Option<&EventTypeFilter>,
499) -> Vec<DexEvent> {
500 let recent_blockhash = transaction.as_ref().and_then(|t| t.message.as_ref()).and_then(|m| {
501 if m.recent_blockhash.is_empty() {
502 None
503 } else {
504 Some(m.recent_blockhash.clone())
505 }
506 });
507
508 let needs_pumpfun = filter.map(|f| f.includes_pumpfun()).unwrap_or(true);
509 let has_create = needs_pumpfun && crate::logs::optimized_matcher::detect_pumpfun_create(logs);
510
511 let mut outer_idx: i32 = -1;
512 let mut inner_idx: i32 = -1;
513 let mut invokes: HashMap<&str, Vec<(i32, i32)>> = HashMap::with_capacity(8);
514 let mut result = Vec::with_capacity(4);
515
516 for log in logs {
517 if let Some((pid, depth)) = crate::logs::optimized_matcher::parse_invoke_info(log) {
518 if depth == 1 {
519 inner_idx = -1;
520 outer_idx += 1;
521 } else {
522 inner_idx += 1;
523 }
524 invokes.entry(pid).or_default().push((outer_idx, inner_idx));
525 }
526
527 if PROGRAM_DATA_FINDER.find(log.as_bytes()).is_none() {
528 continue;
529 }
530
531 if let Some(mut e) = crate::logs::parse_log(
532 log,
533 sig,
534 slot,
535 tx_idx,
536 block_us,
537 grpc_us,
538 filter,
539 has_create,
540 recent_blockhash.as_deref(),
541 ) {
542 crate::core::account_dispatcher::fill_accounts_from_transaction_data(
543 &mut e,
544 meta,
545 transaction,
546 &invokes,
547 );
548 crate::core::common_filler::fill_data(&mut e, meta, transaction, &invokes);
549 result.push(e);
550 }
551 }
552 result
553}
554
555#[inline]
556fn parse_instructions(
557 meta: &TransactionStatusMeta,
558 transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
559 sig: solana_sdk::signature::Signature,
560 slot: u64,
561 tx_idx: u64,
562 block_us: Option<i64>,
563 grpc_us: i64,
564 filter: Option<&EventTypeFilter>,
565) -> Vec<DexEvent> {
566 crate::grpc::instruction_parser::parse_instructions_enhanced(
572 meta,
573 transaction,
574 sig,
575 slot,
576 tx_idx,
577 block_us,
578 grpc_us,
579 filter,
580 )
581}