1use super::buffers::{MicroBatchBuffer, SlotBuffer};
10use super::subscribe_builder::{
11 build_subscribe_request, build_subscribe_request_with_event_filter,
12};
13use super::transaction_meta::try_yellowstone_signature;
14use super::types::*;
15use crate::core::{now_micros, EventMetadata}; use crate::instr::read_pubkey_fast;
17use crate::logs::timestamp_to_microseconds;
18use crate::DexEvent;
19use crossbeam_queue::ArrayQueue;
20use futures::{SinkExt, StreamExt};
21use log::error;
22use memchr::memmem;
23use once_cell::sync::Lazy;
24use solana_sdk::pubkey::Pubkey;
25use std::collections::HashMap;
26use std::str::FromStr;
27use std::sync::Arc;
28use tokio::sync::{mpsc, Mutex};
29use tokio::time::{Duration, Instant};
30use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient};
32use yellowstone_grpc_proto::prelude::*;
33
34static PROGRAM_DATA_FINDER: Lazy<memmem::Finder> =
35 Lazy::new(|| memmem::Finder::new(b"Program data: "));
36
37#[derive(Clone)]
40pub struct YellowstoneGrpc {
41 endpoint: String,
42 token: Option<String>,
43 config: ClientConfig,
44 control_tx: Arc<Mutex<Option<mpsc::Sender<SubscribeRequest>>>>,
45}
46
47impl YellowstoneGrpc {
48 pub fn new(
49 endpoint: String,
50 token: Option<String>,
51 ) -> Result<Self, Box<dyn std::error::Error>> {
52 crate::warmup::warmup_parser();
53 Ok(Self {
54 endpoint,
55 token,
56 config: ClientConfig::default(),
57 control_tx: Arc::new(Mutex::new(None)),
58 })
59 }
60
61 pub fn new_with_config(
62 endpoint: String,
63 token: Option<String>,
64 config: ClientConfig,
65 ) -> Result<Self, Box<dyn std::error::Error>> {
66 crate::warmup::warmup_parser();
67 Ok(Self { endpoint, token, config, control_tx: Arc::new(Mutex::new(None)) })
68 }
69
70 pub async fn subscribe_dex_events(
72 &self,
73 transaction_filters: Vec<TransactionFilter>,
74 account_filters: Vec<AccountFilter>,
75 event_type_filter: Option<EventTypeFilter>,
76 ) -> Result<Arc<ArrayQueue<DexEvent>>, Box<dyn std::error::Error>> {
77 let queue = Arc::new(ArrayQueue::new(self.config.buffer_size.max(1)));
78 let queue_clone = Arc::clone(&queue);
79 let self_clone = self.clone();
80
81 tokio::spawn(async move {
82 let mut delay = 1u64;
83 loop {
84 match self_clone
85 .stream_events(
86 &transaction_filters,
87 &account_filters,
88 &event_type_filter,
89 &queue_clone,
90 )
91 .await
92 {
93 Ok(_) => delay = 1,
94 Err(e) => println!("❌ gRPC error: {} - retry in {}s", e, delay),
95 }
96 tokio::time::sleep(Duration::from_secs(delay)).await;
97 delay = (delay * 2).min(60);
98 }
99 });
100
101 Ok(queue)
102 }
103
104 pub async fn update_subscription(
106 &self,
107 transaction_filters: Vec<TransactionFilter>,
108 account_filters: Vec<AccountFilter>,
109 ) -> Result<(), Box<dyn std::error::Error>> {
110 let sender = self.control_tx.lock().await.as_ref().ok_or("No active subscription")?.clone();
111
112 let request = build_subscribe_request(&transaction_filters, &account_filters);
113 sender.send(request).await.map_err(|e| e.to_string())?;
114 Ok(())
115 }
116
117 pub async fn stop(&self) {
118 println!("🛑 Stopping gRPC subscription...");
119 }
120
121 async fn stream_events(
124 &self,
125 tx_filters: &[TransactionFilter],
126 acc_filters: &[AccountFilter],
127 event_filter: &Option<EventTypeFilter>,
128 queue: &Arc<ArrayQueue<DexEvent>>,
129 ) -> Result<(), String> {
130 let _ = rustls::crypto::ring::default_provider().install_default();
131
132 let mut builder = GeyserGrpcClient::build_from_shared(self.endpoint.clone())
134 .map_err(|e| e.to_string())?
135 .x_token(self.token.clone())
136 .map_err(|e| e.to_string())?
137 .max_decoding_message_size(1024 * 1024 * 1024);
138
139 if self.config.connection_timeout_ms > 0 {
140 builder =
141 builder.connect_timeout(Duration::from_millis(self.config.connection_timeout_ms));
142 }
143 if self.config.enable_tls {
144 builder = builder
145 .tls_config(ClientTlsConfig::new().with_native_roots())
146 .map_err(|e| e.to_string())?;
147 }
148
149 let mut client = builder.connect().await.map_err(|e| e.to_string())?;
150 let request = build_subscribe_request_with_event_filter(
151 tx_filters,
152 acc_filters,
153 event_filter.as_ref(),
154 CommitmentLevel::Processed,
155 );
156
157 let (subscribe_tx, mut stream) =
158 client.subscribe_with_request(Some(request)).await.map_err(|e| e.to_string())?;
159
160 self.print_mode_info();
161
162 let (control_tx, mut control_rx) = mpsc::channel::<SubscribeRequest>(100);
164 *self.control_tx.lock().await = Some(control_tx);
165 let subscribe_tx = Arc::new(Mutex::new(subscribe_tx));
166
167 let mut slot_buffer = SlotBuffer::new();
169 let mut micro_batch = MicroBatchBuffer::new();
170 let mut last_slot = 0u64;
171
172 let order_mode = self.config.order_mode;
173 let timeout_ms = self.config.order_timeout_ms;
174 let batch_us = self.config.micro_batch_us;
175 let check_interval = Duration::from_millis(timeout_ms / 2);
176 let mut next_check = Instant::now() + check_interval;
177
178 loop {
179 self.check_timeout(
181 order_mode,
182 &mut slot_buffer,
183 &mut micro_batch,
184 queue,
185 timeout_ms,
186 batch_us,
187 &mut next_check,
188 check_interval,
189 );
190
191 tokio::select! {
192 msg = stream.next() => {
193 match msg {
194 Some(Ok(update)) => {
195 if matches!(
197 update.update_oneof.as_ref(),
198 Some(subscribe_update::UpdateOneof::Ping(_))
199 ) {
200 if let Err(e) = subscribe_tx
201 .lock()
202 .await
203 .send(SubscribeRequest {
204 ping: Some(SubscribeRequestPing { id: 1 }),
205 ..Default::default()
206 })
207 .await
208 {
209 return Err(e.to_string());
210 }
211 continue;
212 }
213 self.handle_update(
214 update, order_mode, event_filter, queue,
215 &mut slot_buffer, &mut micro_batch, &mut last_slot, batch_us
216 );
217 }
218 Some(Err(e)) => {
219 error!("Stream error: {:?}", e);
220 self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
221 return Err(e.to_string());
222 }
223 None => {
224 self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
225 return Ok(());
226 }
227 }
228 }
229 Some(req) = control_rx.recv() => {
230 if let Err(e) = subscribe_tx.lock().await.send(req).await {
231 return Err(e.to_string());
232 }
233 }
234 }
235 }
236 }
237
238 fn print_mode_info(&self) {
239 match self.config.order_mode {
240 OrderMode::Unordered => println!("✅ Unordered Mode (10-20μs)"),
241 OrderMode::Ordered => {
242 println!("✅ Ordered Mode (timeout={}ms)", self.config.order_timeout_ms)
243 }
244 OrderMode::StreamingOrdered => {
245 println!("✅ StreamingOrdered Mode (timeout={}ms)", self.config.order_timeout_ms)
246 }
247 OrderMode::MicroBatch => {
248 println!("✅ MicroBatch Mode (window={}μs)", self.config.micro_batch_us)
249 }
250 }
251 }
252
253 #[inline]
254 fn check_timeout(
255 &self,
256 mode: OrderMode,
257 slot_buf: &mut SlotBuffer,
258 micro_buf: &mut MicroBatchBuffer,
259 queue: &Arc<ArrayQueue<DexEvent>>,
260 timeout_ms: u64,
261 batch_us: u64,
262 next_check: &mut Instant,
263 interval: Duration,
264 ) {
265 if Instant::now() < *next_check {
266 return;
267 }
268 *next_check = Instant::now() + interval;
269
270 match mode {
271 OrderMode::Ordered => {
272 if slot_buf.should_timeout(timeout_ms) {
273 for e in slot_buf.flush_all() {
274 let _ = queue.push(e);
275 }
276 }
277 }
278 OrderMode::StreamingOrdered => {
279 if slot_buf.should_timeout(timeout_ms) {
280 for e in slot_buf.flush_streaming_timeout() {
281 let _ = queue.push(e);
282 }
283 }
284 }
285 OrderMode::MicroBatch => {
286 let now_us = get_timestamp_us();
288 if micro_buf.should_flush(now_us, batch_us) {
289 for e in micro_buf.flush() {
290 let _ = queue.push(e);
291 }
292 }
293 }
294 OrderMode::Unordered => {}
295 }
296 }
297
298 fn flush_on_disconnect(
299 &self,
300 mode: OrderMode,
301 buffer: &mut SlotBuffer,
302 queue: &Arc<ArrayQueue<DexEvent>>,
303 ) {
304 if matches!(mode, OrderMode::Ordered | OrderMode::StreamingOrdered) {
305 let events = match mode {
306 OrderMode::StreamingOrdered => buffer.flush_streaming_timeout(),
307 _ => buffer.flush_all(),
308 };
309 for e in events {
310 let _ = queue.push(e);
311 }
312 }
313 }
314
315 #[inline]
316 fn handle_update(
317 &self,
318 update_msg: SubscribeUpdate,
319 mode: OrderMode,
320 filter: &Option<EventTypeFilter>,
321 queue: &Arc<ArrayQueue<DexEvent>>,
322 slot_buf: &mut SlotBuffer,
323 micro_buf: &mut MicroBatchBuffer,
324 last_slot: &mut u64,
325 batch_us: u64,
326 ) {
327 let created_at = update_msg.created_at.unwrap_or_default();
328 let block_time_us = timestamp_to_microseconds(created_at.seconds, created_at.nanos) as i64;
329 let grpc_recv_us = get_timestamp_us();
330
331 let Some(update) = update_msg.update_oneof else { return };
332
333 match update {
334 subscribe_update::UpdateOneof::Transaction(tx) => {
335 self.handle_transaction(
336 tx,
337 mode,
338 filter,
339 queue,
340 slot_buf,
341 micro_buf,
342 last_slot,
343 batch_us,
344 grpc_recv_us,
345 block_time_us,
346 );
347 }
348 subscribe_update::UpdateOneof::Account(acc) => {
349 Self::handle_account(acc, filter, queue, grpc_recv_us, block_time_us);
350 }
351 subscribe_update::UpdateOneof::BlockMeta(block_meta) => {
352 Self::handle_block_meta(block_meta, filter, queue, grpc_recv_us, block_time_us);
353 }
354 _ => {}
355 }
356 }
357
358 #[inline]
359 fn handle_transaction(
360 &self,
361 tx: SubscribeUpdateTransaction,
362 mode: OrderMode,
363 filter: &Option<EventTypeFilter>,
364 queue: &Arc<ArrayQueue<DexEvent>>,
365 slot_buf: &mut SlotBuffer,
366 micro_buf: &mut MicroBatchBuffer,
367 last_slot: &mut u64,
368 batch_us: u64,
369 grpc_us: i64,
370 block_us: i64,
371 ) {
372 let slot = tx.slot;
373
374 match mode {
375 OrderMode::Unordered => {
376 for e in crate::grpc::parse_subscribe_update_transaction_low_latency(
377 &tx,
378 grpc_us,
379 Some(block_us),
380 filter.as_ref(),
381 ) {
382 let _ = queue.push(e);
383 }
384 }
385 OrderMode::Ordered => {
386 if slot > *last_slot && *last_slot > 0 {
387 for e in slot_buf.flush_before(slot) {
388 let _ = queue.push(e);
389 }
390 }
391 *last_slot = slot;
392 for (idx, e) in
393 parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
394 {
395 slot_buf.push(slot, idx, e);
396 }
397 }
398 OrderMode::StreamingOrdered => {
399 for (idx, e) in
400 parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
401 {
402 for evt in slot_buf.push_streaming(slot, idx, e) {
403 let _ = queue.push(evt);
404 }
405 }
406 }
407 OrderMode::MicroBatch => {
408 for (idx, e) in
409 parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
410 {
411 if micro_buf.push(slot, idx, e, grpc_us, batch_us) {
412 for evt in micro_buf.flush() {
413 let _ = queue.push(evt);
414 }
415 }
416 }
417 }
418 }
419 }
420
421 #[inline]
422 fn handle_account(
423 acc: SubscribeUpdateAccount,
424 filter: &Option<EventTypeFilter>,
425 queue: &Arc<ArrayQueue<DexEvent>>,
426 grpc_us: i64,
427 block_us: i64,
428 ) {
429 let Some(info) = acc.account else { return };
430 let data = crate::accounts::AccountData {
431 pubkey: read_pubkey_fast(&info.pubkey),
432 executable: info.executable,
433 lamports: info.lamports,
434 owner: read_pubkey_fast(&info.owner),
435 rent_epoch: info.rent_epoch,
436 data: info.data,
437 };
438 let meta = EventMetadata {
439 signature: Default::default(),
440 slot: acc.slot,
441 tx_index: 0,
442 block_time_us: block_us,
443 grpc_recv_us: grpc_us,
444 recent_blockhash: None,
445 };
446 if let Some(e) = crate::accounts::parse_account_unified(&data, meta, filter.as_ref()) {
447 let _ = queue.push(e);
448 }
449 }
450
451 #[inline]
452 fn handle_block_meta(
453 block_meta: SubscribeUpdateBlockMeta,
454 filter: &Option<EventTypeFilter>,
455 queue: &Arc<ArrayQueue<DexEvent>>,
456 grpc_us: i64,
457 fallback_block_us: i64,
458 ) {
459 let block_time_us = block_meta
460 .block_time
461 .as_ref()
462 .map(|t| t.timestamp.saturating_mul(1_000_000))
463 .unwrap_or(fallback_block_us);
464 let event = DexEvent::BlockMeta(crate::core::events::BlockMetaEvent {
465 metadata: EventMetadata {
466 signature: Default::default(),
467 slot: block_meta.slot,
468 tx_index: 0,
469 block_time_us,
470 grpc_recv_us: grpc_us,
471 recent_blockhash: (!block_meta.blockhash.is_empty())
472 .then_some(block_meta.blockhash),
473 },
474 });
475 if filter.as_ref().map(|f| f.should_include_dex_event(&event)).unwrap_or(true) {
476 let _ = queue.push(event);
477 }
478 }
479}
480
481#[inline(always)]
492fn get_timestamp_us() -> i64 {
493 now_micros()
494}
495
496#[inline]
499fn parse_transaction_to_vec(
500 tx: &SubscribeUpdateTransaction,
501 grpc_us: i64,
502 block_us: Option<i64>,
503 filter: Option<&EventTypeFilter>,
504) -> Vec<(u64, DexEvent)> {
505 let idx = tx.transaction.as_ref().map(|t| t.index).unwrap_or(0);
506 parse_transaction_core(tx, grpc_us, block_us, filter).into_iter().map(|e| (idx, e)).collect()
507}
508
509#[inline]
510fn parse_transaction_core(
511 tx: &SubscribeUpdateTransaction,
512 grpc_us: i64,
513 block_us: Option<i64>,
514 filter: Option<&EventTypeFilter>,
515) -> Vec<DexEvent> {
516 let Some(info) = &tx.transaction else { return Vec::new() };
517 let Some(meta) = &info.meta else { return Vec::new() };
518
519 let sig = extract_signature(&info.signature);
520 let slot = tx.slot;
521 let idx = info.index;
522
523 let log_events = parse_logs(
524 meta,
525 &info.transaction,
526 &meta.log_messages,
527 sig,
528 slot,
529 idx,
530 block_us,
531 grpc_us,
532 filter,
533 );
534 let instr_events =
535 parse_instructions(meta, &info.transaction, sig, slot, idx, block_us, grpc_us, filter);
536
537 let events =
538 crate::grpc::log_instr_dedup::dedupe_log_instruction_events(log_events, instr_events);
539 if let Some(filter) = filter {
540 events.into_iter().map(|e| filter.normalize_dex_event(e)).collect()
541 } else {
542 events
543 }
544}
545
546#[inline(always)]
547fn extract_signature(bytes: &[u8]) -> solana_sdk::signature::Signature {
548 try_yellowstone_signature(bytes).expect("yellowstone signature must be 64 bytes")
549}
550
551#[inline]
552fn parse_logs(
553 meta: &TransactionStatusMeta,
554 transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
555 logs: &[String],
556 sig: solana_sdk::signature::Signature,
557 slot: u64,
558 tx_idx: u64,
559 block_us: Option<i64>,
560 grpc_us: i64,
561 filter: Option<&EventTypeFilter>,
562) -> Vec<DexEvent> {
563 let recent_blockhash = transaction.as_ref().and_then(|t| t.message.as_ref()).and_then(|m| {
564 if m.recent_blockhash.is_empty() {
565 None
566 } else {
567 Some(m.recent_blockhash.clone())
568 }
569 });
570
571 let needs_pumpfun = filter.map(|f| f.includes_pumpfun()).unwrap_or(true);
572 let has_create = needs_pumpfun && crate::logs::optimized_matcher::detect_pumpfun_create(logs);
573
574 let mut outer_idx: i32 = -1;
575 let mut inner_idx: i32 = -1;
576 let mut invokes: HashMap<Pubkey, Vec<(i32, i32)>> = HashMap::with_capacity(8);
577 let mut active_program_stack: Vec<Pubkey> = Vec::with_capacity(8);
578 let mut result = Vec::with_capacity(4);
579
580 for log in logs {
581 if let Some((pid, depth)) = crate::logs::optimized_matcher::parse_invoke_info(log) {
582 if depth == 1 {
583 inner_idx = -1;
584 outer_idx += 1;
585 } else {
586 inner_idx += 1;
587 }
588 if let Ok(pk) = Pubkey::from_str(pid) {
589 active_program_stack.truncate(depth.saturating_sub(1));
590 active_program_stack.push(pk);
591 invokes.entry(pk).or_default().push((outer_idx, inner_idx));
592 }
593 }
594
595 if PROGRAM_DATA_FINDER.find(log.as_bytes()).is_some() {
596 let current_program = active_program_stack.last();
597 if let Some(mut e) = crate::logs::parse_log_with_program_id(
598 log,
599 sig,
600 slot,
601 tx_idx,
602 block_us,
603 grpc_us,
604 filter,
605 has_create,
606 recent_blockhash.as_deref(),
607 current_program,
608 ) {
609 crate::core::account_dispatcher::fill_accounts_with_owned_keys(
610 &mut e,
611 meta,
612 transaction,
613 &invokes,
614 );
615 crate::core::common_filler::fill_data(&mut e, meta, transaction, &invokes);
616 result.push(e);
617 }
618 }
619
620 if let Some(pid) = crate::logs::optimized_matcher::parse_program_complete_info(log) {
621 if let Ok(pk) = Pubkey::from_str(pid) {
622 if let Some(pos) = active_program_stack.iter().rposition(|active| *active == pk) {
623 active_program_stack.truncate(pos);
624 }
625 }
626 }
627 }
628 result
629}
630
631#[inline]
632fn parse_instructions(
633 meta: &TransactionStatusMeta,
634 transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
635 sig: solana_sdk::signature::Signature,
636 slot: u64,
637 tx_idx: u64,
638 block_us: Option<i64>,
639 grpc_us: i64,
640 filter: Option<&EventTypeFilter>,
641) -> Vec<DexEvent> {
642 crate::grpc::instruction_parser::parse_instructions_enhanced(
648 meta,
649 transaction,
650 sig,
651 slot,
652 tx_idx,
653 block_us,
654 grpc_us,
655 filter,
656 )
657}