1use super::buffers::{MicroBatchBuffer, SlotBuffer};
10use super::subscribe_builder::{
11 build_subscribe_request, build_subscribe_request_with_event_filter,
12};
13use super::transaction_meta::try_yellowstone_signature;
14use super::types::*;
15use crate::core::{now_micros, EventMetadata}; use crate::instr::read_pubkey_fast;
17use crate::logs::timestamp_to_microseconds;
18use crate::DexEvent;
19use crossbeam_queue::ArrayQueue;
20use futures::{SinkExt, StreamExt};
21use log::error;
22use memchr::memmem;
23use once_cell::sync::Lazy;
24use solana_sdk::pubkey::Pubkey;
25use std::collections::HashMap;
26use std::str::FromStr;
27use std::sync::atomic::{AtomicBool, Ordering};
28use std::sync::Arc;
29use tokio::sync::{mpsc, Mutex};
30use tokio::task::JoinHandle;
31use tokio::time::{Duration, Instant};
32use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient};
34use yellowstone_grpc_proto::prelude::*;
35
36static PROGRAM_DATA_FINDER: Lazy<memmem::Finder> =
37 Lazy::new(|| memmem::Finder::new(b"Program data: "));
38
39#[derive(Clone)]
42pub struct YellowstoneGrpc {
43 endpoint: String,
44 token: Option<String>,
45 config: ClientConfig,
46 control_tx: Arc<Mutex<Option<mpsc::Sender<SubscribeRequest>>>>,
47 subscription_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
48 subscription_lifecycle: Arc<Mutex<()>>,
49 stop_signal: Arc<Mutex<Option<Arc<AtomicBool>>>>,
50}
51
52impl YellowstoneGrpc {
53 pub fn new(
54 endpoint: String,
55 token: Option<String>,
56 ) -> Result<Self, Box<dyn std::error::Error>> {
57 crate::warmup::warmup_parser();
58 Ok(Self {
59 endpoint,
60 token,
61 config: ClientConfig::default(),
62 control_tx: Arc::new(Mutex::new(None)),
63 subscription_handle: Arc::new(Mutex::new(None)),
64 subscription_lifecycle: Arc::new(Mutex::new(())),
65 stop_signal: Arc::new(Mutex::new(None)),
66 })
67 }
68
69 pub fn new_with_config(
70 endpoint: String,
71 token: Option<String>,
72 config: ClientConfig,
73 ) -> Result<Self, Box<dyn std::error::Error>> {
74 crate::warmup::warmup_parser();
75 Ok(Self {
76 endpoint,
77 token,
78 config,
79 control_tx: Arc::new(Mutex::new(None)),
80 subscription_handle: Arc::new(Mutex::new(None)),
81 subscription_lifecycle: Arc::new(Mutex::new(())),
82 stop_signal: Arc::new(Mutex::new(None)),
83 })
84 }
85
86 pub async fn subscribe_dex_events(
88 &self,
89 transaction_filters: Vec<TransactionFilter>,
90 account_filters: Vec<AccountFilter>,
91 event_type_filter: Option<EventTypeFilter>,
92 ) -> Result<Arc<ArrayQueue<DexEvent>>, Box<dyn std::error::Error>> {
93 let _lifecycle = self.subscription_lifecycle.lock().await;
94 self.stop_without_lifecycle_lock().await;
95
96 let queue = Arc::new(ArrayQueue::new(self.config.buffer_size.max(1)));
97 let queue_clone = Arc::clone(&queue);
98 let self_clone = self.clone();
99 let stop_signal = Arc::new(AtomicBool::new(false));
100 *self.stop_signal.lock().await = Some(Arc::clone(&stop_signal));
101
102 let handle = tokio::spawn(async move {
103 let mut delay = 1u64;
104 loop {
105 if stop_signal.load(Ordering::SeqCst) {
106 break;
107 }
108
109 match self_clone
110 .stream_events(
111 &transaction_filters,
112 &account_filters,
113 &event_type_filter,
114 &queue_clone,
115 )
116 .await
117 {
118 Ok(_) => delay = 1,
119 Err(e) => {
120 if stop_signal.load(Ordering::SeqCst) {
121 break;
122 }
123 error!("Grpc error: {} - retry in {}s", e, delay);
124 }
125 }
126
127 if stop_signal.load(Ordering::SeqCst) {
128 break;
129 }
130 tokio::time::sleep(Duration::from_secs(delay)).await;
131 delay = (delay * 2).min(60);
132 }
133 });
134
135 *self.subscription_handle.lock().await = Some(handle);
136 Ok(queue)
137 }
138
139 pub async fn update_subscription(
141 &self,
142 transaction_filters: Vec<TransactionFilter>,
143 account_filters: Vec<AccountFilter>,
144 ) -> Result<(), Box<dyn std::error::Error>> {
145 let sender = self.control_tx.lock().await.as_ref().ok_or("No active subscription")?.clone();
146
147 let request = build_subscribe_request(&transaction_filters, &account_filters);
148 sender.send(request).await.map_err(|e| e.to_string())?;
149 Ok(())
150 }
151
152 pub async fn stop(&self) {
153 let _lifecycle = self.subscription_lifecycle.lock().await;
154 self.stop_without_lifecycle_lock().await;
155 }
156
157 async fn stop_without_lifecycle_lock(&self) {
158 if let Some(stop_signal) = self.stop_signal.lock().await.take() {
159 stop_signal.store(true, Ordering::SeqCst);
160 }
161 self.control_tx.lock().await.take();
162 let handle = self.subscription_handle.lock().await.take();
163 if let Some(handle) = handle {
164 handle.abort();
165 let _ = handle.await;
166 }
167 }
168
169 async fn stream_events(
172 &self,
173 tx_filters: &[TransactionFilter],
174 acc_filters: &[AccountFilter],
175 event_filter: &Option<EventTypeFilter>,
176 queue: &Arc<ArrayQueue<DexEvent>>,
177 ) -> Result<(), String> {
178 let _ = rustls::crypto::ring::default_provider().install_default();
179
180 let mut builder = GeyserGrpcClient::build_from_shared(self.endpoint.clone())
182 .map_err(|e| e.to_string())?
183 .x_token(self.token.clone())
184 .map_err(|e| e.to_string())?
185 .max_decoding_message_size(1024 * 1024 * 1024);
186
187 if self.config.connection_timeout_ms > 0 {
188 builder =
189 builder.connect_timeout(Duration::from_millis(self.config.connection_timeout_ms));
190 }
191 if self.config.enable_tls {
192 builder = builder
193 .tls_config(ClientTlsConfig::new().with_native_roots())
194 .map_err(|e| e.to_string())?;
195 }
196
197 let mut client = builder.connect().await.map_err(|e| e.to_string())?;
198 let request = build_subscribe_request_with_event_filter(
199 tx_filters,
200 acc_filters,
201 event_filter.as_ref(),
202 CommitmentLevel::Processed,
203 );
204
205 let (subscribe_tx, mut stream) =
206 client.subscribe_with_request(Some(request)).await.map_err(|e| e.to_string())?;
207
208 self.print_mode_info();
209
210 let (control_tx, mut control_rx) = mpsc::channel::<SubscribeRequest>(100);
212 *self.control_tx.lock().await = Some(control_tx);
213 let subscribe_tx = Arc::new(Mutex::new(subscribe_tx));
214
215 let mut slot_buffer = SlotBuffer::new();
217 let mut micro_batch = MicroBatchBuffer::new();
218 let mut last_slot = 0u64;
219
220 let order_mode = self.config.order_mode;
221 let timeout_ms = self.config.order_timeout_ms;
222 let batch_us = self.config.micro_batch_us;
223 let check_interval = Duration::from_millis(timeout_ms / 2);
224 let mut next_check = Instant::now() + check_interval;
225
226 loop {
227 self.check_timeout(
229 order_mode,
230 &mut slot_buffer,
231 &mut micro_batch,
232 queue,
233 timeout_ms,
234 batch_us,
235 &mut next_check,
236 check_interval,
237 );
238
239 tokio::select! {
240 msg = stream.next() => {
241 match msg {
242 Some(Ok(update)) => {
243 if matches!(
245 update.update_oneof.as_ref(),
246 Some(subscribe_update::UpdateOneof::Ping(_))
247 ) {
248 if let Err(e) = subscribe_tx
249 .lock()
250 .await
251 .send(SubscribeRequest {
252 ping: Some(SubscribeRequestPing { id: 1 }),
253 ..Default::default()
254 })
255 .await
256 {
257 self.control_tx.lock().await.take();
258 return Err(e.to_string());
259 }
260 continue;
261 }
262 self.handle_update(
263 update, order_mode, event_filter, queue,
264 &mut slot_buffer, &mut micro_batch, &mut last_slot, batch_us
265 );
266 }
267 Some(Err(e)) => {
268 error!("Grpc Stream error: {:?}", e);
269 self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
270 self.control_tx.lock().await.take();
271 return Err(e.to_string());
272 }
273 None => {
274 self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
275 self.control_tx.lock().await.take();
276 return Ok(());
277 }
278 }
279 }
280 Some(req) = control_rx.recv() => {
281 if let Err(e) = subscribe_tx.lock().await.send(req).await {
282 self.control_tx.lock().await.take();
283 return Err(e.to_string());
284 }
285 }
286 }
287 }
288 }
289
290 fn print_mode_info(&self) {
291 match self.config.order_mode {
292 OrderMode::Unordered => println!("✅ Unordered Mode (10-20μs)"),
293 OrderMode::Ordered => {
294 println!("✅ Ordered Mode (timeout={}ms)", self.config.order_timeout_ms)
295 }
296 OrderMode::StreamingOrdered => {
297 println!("✅ StreamingOrdered Mode (timeout={}ms)", self.config.order_timeout_ms)
298 }
299 OrderMode::MicroBatch => {
300 println!("✅ MicroBatch Mode (window={}μs)", self.config.micro_batch_us)
301 }
302 }
303 }
304
305 #[inline]
306 fn check_timeout(
307 &self,
308 mode: OrderMode,
309 slot_buf: &mut SlotBuffer,
310 micro_buf: &mut MicroBatchBuffer,
311 queue: &Arc<ArrayQueue<DexEvent>>,
312 timeout_ms: u64,
313 batch_us: u64,
314 next_check: &mut Instant,
315 interval: Duration,
316 ) {
317 if Instant::now() < *next_check {
318 return;
319 }
320 *next_check = Instant::now() + interval;
321
322 match mode {
323 OrderMode::Ordered => {
324 if slot_buf.should_timeout(timeout_ms) {
325 for e in slot_buf.flush_all() {
326 let _ = queue.push(e);
327 }
328 }
329 }
330 OrderMode::StreamingOrdered => {
331 if slot_buf.should_timeout(timeout_ms) {
332 for e in slot_buf.flush_streaming_timeout() {
333 let _ = queue.push(e);
334 }
335 }
336 }
337 OrderMode::MicroBatch => {
338 let now_us = get_timestamp_us();
340 if micro_buf.should_flush(now_us, batch_us) {
341 for e in micro_buf.flush() {
342 let _ = queue.push(e);
343 }
344 }
345 }
346 OrderMode::Unordered => {}
347 }
348 }
349
350 fn flush_on_disconnect(
351 &self,
352 mode: OrderMode,
353 buffer: &mut SlotBuffer,
354 queue: &Arc<ArrayQueue<DexEvent>>,
355 ) {
356 if matches!(mode, OrderMode::Ordered | OrderMode::StreamingOrdered) {
357 let events = match mode {
358 OrderMode::StreamingOrdered => buffer.flush_streaming_timeout(),
359 _ => buffer.flush_all(),
360 };
361 for e in events {
362 let _ = queue.push(e);
363 }
364 }
365 }
366
367 #[inline]
368 fn handle_update(
369 &self,
370 update_msg: SubscribeUpdate,
371 mode: OrderMode,
372 filter: &Option<EventTypeFilter>,
373 queue: &Arc<ArrayQueue<DexEvent>>,
374 slot_buf: &mut SlotBuffer,
375 micro_buf: &mut MicroBatchBuffer,
376 last_slot: &mut u64,
377 batch_us: u64,
378 ) {
379 let created_at = update_msg.created_at.unwrap_or_default();
380 let block_time_us = timestamp_to_microseconds(created_at.seconds, created_at.nanos) as i64;
381 let grpc_recv_us = get_timestamp_us();
382
383 let Some(update) = update_msg.update_oneof else { return };
384
385 match update {
386 subscribe_update::UpdateOneof::Transaction(tx) => {
387 self.handle_transaction(
388 tx,
389 mode,
390 filter,
391 queue,
392 slot_buf,
393 micro_buf,
394 last_slot,
395 batch_us,
396 grpc_recv_us,
397 block_time_us,
398 );
399 }
400 subscribe_update::UpdateOneof::Account(acc) => {
401 Self::handle_account(acc, filter, queue, grpc_recv_us, block_time_us);
402 }
403 subscribe_update::UpdateOneof::BlockMeta(block_meta) => {
404 Self::handle_block_meta(block_meta, filter, queue, grpc_recv_us, block_time_us);
405 }
406 _ => {}
407 }
408 }
409
410 #[inline]
411 fn handle_transaction(
412 &self,
413 tx: SubscribeUpdateTransaction,
414 mode: OrderMode,
415 filter: &Option<EventTypeFilter>,
416 queue: &Arc<ArrayQueue<DexEvent>>,
417 slot_buf: &mut SlotBuffer,
418 micro_buf: &mut MicroBatchBuffer,
419 last_slot: &mut u64,
420 batch_us: u64,
421 grpc_us: i64,
422 block_us: i64,
423 ) {
424 let slot = tx.slot;
425
426 match mode {
427 OrderMode::Unordered => {
428 for e in crate::grpc::parse_subscribe_update_transaction_low_latency(
429 &tx,
430 grpc_us,
431 Some(block_us),
432 filter.as_ref(),
433 ) {
434 let _ = queue.push(e);
435 }
436 }
437 OrderMode::Ordered => {
438 if slot > *last_slot && *last_slot > 0 {
439 for e in slot_buf.flush_before(slot) {
440 let _ = queue.push(e);
441 }
442 }
443 *last_slot = slot;
444 for (idx, e) in
445 parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
446 {
447 slot_buf.push(slot, idx, e);
448 }
449 }
450 OrderMode::StreamingOrdered => {
451 for (idx, e) in
452 parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
453 {
454 for evt in slot_buf.push_streaming(slot, idx, e) {
455 let _ = queue.push(evt);
456 }
457 }
458 }
459 OrderMode::MicroBatch => {
460 for (idx, e) in
461 parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
462 {
463 if micro_buf.push(slot, idx, e, grpc_us, batch_us) {
464 for evt in micro_buf.flush() {
465 let _ = queue.push(evt);
466 }
467 }
468 }
469 }
470 }
471 }
472
473 #[inline]
474 fn handle_account(
475 acc: SubscribeUpdateAccount,
476 filter: &Option<EventTypeFilter>,
477 queue: &Arc<ArrayQueue<DexEvent>>,
478 grpc_us: i64,
479 block_us: i64,
480 ) {
481 let Some(info) = acc.account else { return };
482 let data = crate::accounts::AccountData {
483 pubkey: read_pubkey_fast(&info.pubkey),
484 executable: info.executable,
485 lamports: info.lamports,
486 owner: read_pubkey_fast(&info.owner),
487 rent_epoch: info.rent_epoch,
488 data: info.data,
489 };
490 let meta = EventMetadata {
491 signature: Default::default(),
492 slot: acc.slot,
493 tx_index: 0,
494 block_time_us: block_us,
495 grpc_recv_us: grpc_us,
496 recent_blockhash: None,
497 };
498 if let Some(e) = crate::accounts::parse_account_unified(&data, meta, filter.as_ref()) {
499 let _ = queue.push(e);
500 }
501 }
502
503 #[inline]
504 fn handle_block_meta(
505 block_meta: SubscribeUpdateBlockMeta,
506 filter: &Option<EventTypeFilter>,
507 queue: &Arc<ArrayQueue<DexEvent>>,
508 grpc_us: i64,
509 fallback_block_us: i64,
510 ) {
511 let block_time_us = block_meta
512 .block_time
513 .as_ref()
514 .map(|t| t.timestamp.saturating_mul(1_000_000))
515 .unwrap_or(fallback_block_us);
516 let event = DexEvent::BlockMeta(crate::core::events::BlockMetaEvent {
517 metadata: EventMetadata {
518 signature: Default::default(),
519 slot: block_meta.slot,
520 tx_index: 0,
521 block_time_us,
522 grpc_recv_us: grpc_us,
523 recent_blockhash: (!block_meta.blockhash.is_empty())
524 .then_some(block_meta.blockhash),
525 },
526 });
527 if filter.as_ref().map(|f| f.should_include_dex_event(&event)).unwrap_or(true) {
528 let _ = queue.push(event);
529 }
530 }
531}
532
533#[inline(always)]
544fn get_timestamp_us() -> i64 {
545 now_micros()
546}
547
548#[inline]
551fn parse_transaction_to_vec(
552 tx: &SubscribeUpdateTransaction,
553 grpc_us: i64,
554 block_us: Option<i64>,
555 filter: Option<&EventTypeFilter>,
556) -> Vec<(u64, DexEvent)> {
557 let idx = tx.transaction.as_ref().map(|t| t.index).unwrap_or(0);
558 parse_transaction_core(tx, grpc_us, block_us, filter).into_iter().map(|e| (idx, e)).collect()
559}
560
561#[inline]
562fn parse_transaction_core(
563 tx: &SubscribeUpdateTransaction,
564 grpc_us: i64,
565 block_us: Option<i64>,
566 filter: Option<&EventTypeFilter>,
567) -> Vec<DexEvent> {
568 let Some(info) = &tx.transaction else { return Vec::new() };
569 let Some(meta) = &info.meta else { return Vec::new() };
570
571 let sig = extract_signature(&info.signature);
572 let slot = tx.slot;
573 let idx = info.index;
574
575 let log_events = parse_logs(
576 meta,
577 &info.transaction,
578 &meta.log_messages,
579 sig,
580 slot,
581 idx,
582 block_us,
583 grpc_us,
584 filter,
585 );
586 let instr_events =
587 parse_instructions(meta, &info.transaction, sig, slot, idx, block_us, grpc_us, filter);
588
589 let events =
590 crate::grpc::log_instr_dedup::dedupe_log_instruction_events(log_events, instr_events);
591 if let Some(filter) = filter {
592 events.into_iter().map(|e| filter.normalize_dex_event(e)).collect()
593 } else {
594 events
595 }
596}
597
598#[inline(always)]
599fn extract_signature(bytes: &[u8]) -> solana_sdk::signature::Signature {
600 try_yellowstone_signature(bytes).expect("yellowstone signature must be 64 bytes")
601}
602
603#[inline]
604fn parse_logs(
605 meta: &TransactionStatusMeta,
606 transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
607 logs: &[String],
608 sig: solana_sdk::signature::Signature,
609 slot: u64,
610 tx_idx: u64,
611 block_us: Option<i64>,
612 grpc_us: i64,
613 filter: Option<&EventTypeFilter>,
614) -> Vec<DexEvent> {
615 let recent_blockhash = transaction.as_ref().and_then(|t| t.message.as_ref()).and_then(|m| {
616 if m.recent_blockhash.is_empty() {
617 None
618 } else {
619 Some(m.recent_blockhash.clone())
620 }
621 });
622
623 let needs_pumpfun = filter.map(|f| f.includes_pumpfun()).unwrap_or(true);
624 let has_create = needs_pumpfun && crate::logs::optimized_matcher::detect_pumpfun_create(logs);
625
626 let mut outer_idx: i32 = -1;
627 let mut inner_idx: i32 = -1;
628 let mut invokes: HashMap<Pubkey, Vec<(i32, i32)>> = HashMap::with_capacity(8);
629 let mut active_program_stack: Vec<Pubkey> = Vec::with_capacity(8);
630 let mut result = Vec::with_capacity(4);
631
632 for log in logs {
633 if let Some((pid, depth)) = crate::logs::optimized_matcher::parse_invoke_info(log) {
634 if depth == 1 {
635 inner_idx = -1;
636 outer_idx += 1;
637 } else {
638 inner_idx += 1;
639 }
640 if let Ok(pk) = Pubkey::from_str(pid) {
641 active_program_stack.truncate(depth.saturating_sub(1));
642 active_program_stack.push(pk);
643 invokes.entry(pk).or_default().push((outer_idx, inner_idx));
644 }
645 }
646
647 if PROGRAM_DATA_FINDER.find(log.as_bytes()).is_some() {
648 let current_program = active_program_stack.last();
649 if let Some(mut e) = crate::logs::parse_log_with_program_id(
650 log,
651 sig,
652 slot,
653 tx_idx,
654 block_us,
655 grpc_us,
656 filter,
657 has_create,
658 recent_blockhash.as_deref(),
659 current_program,
660 ) {
661 crate::core::account_dispatcher::fill_accounts_with_owned_keys(
662 &mut e,
663 meta,
664 transaction,
665 &invokes,
666 );
667 crate::core::common_filler::fill_data(&mut e, meta, transaction, &invokes);
668 result.push(e);
669 }
670 }
671
672 if let Some(pid) = crate::logs::optimized_matcher::parse_program_complete_info(log) {
673 if let Ok(pk) = Pubkey::from_str(pid) {
674 if let Some(pos) = active_program_stack.iter().rposition(|active| *active == pk) {
675 active_program_stack.truncate(pos);
676 }
677 }
678 }
679 }
680 result
681}
682
683#[inline]
684fn parse_instructions(
685 meta: &TransactionStatusMeta,
686 transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
687 sig: solana_sdk::signature::Signature,
688 slot: u64,
689 tx_idx: u64,
690 block_us: Option<i64>,
691 grpc_us: i64,
692 filter: Option<&EventTypeFilter>,
693) -> Vec<DexEvent> {
694 crate::grpc::instruction_parser::parse_instructions_enhanced(
700 meta,
701 transaction,
702 sig,
703 slot,
704 tx_idx,
705 block_us,
706 grpc_us,
707 filter,
708 )
709}
710
711#[cfg(test)]
712mod tests {
713 use super::*;
714
715 #[tokio::test]
716 async fn stop_clears_subscription_state_and_aborts_handle() {
717 let grpc = YellowstoneGrpc::new("http://127.0.0.1:1".to_string(), None).unwrap();
718 let (tx, _rx) = mpsc::channel::<SubscribeRequest>(1);
719 let handle = tokio::spawn(async {
720 std::future::pending::<()>().await;
721 });
722
723 let stop_signal = Arc::new(AtomicBool::new(false));
724 *grpc.control_tx.lock().await = Some(tx);
725 *grpc.subscription_handle.lock().await = Some(handle);
726 *grpc.stop_signal.lock().await = Some(Arc::clone(&stop_signal));
727
728 grpc.stop().await;
729
730 assert!(stop_signal.load(Ordering::SeqCst));
731 assert!(grpc.stop_signal.lock().await.is_none());
732 assert!(grpc.control_tx.lock().await.is_none());
733 assert!(grpc.subscription_handle.lock().await.is_none());
734 }
735}