1use super::buffers::{MicroBatchBuffer, SlotBuffer};
10use super::subscribe_builder::build_subscribe_request;
11use super::transaction_meta::try_yellowstone_signature;
12use super::types::*;
13use crate::core::{now_micros, EventMetadata}; use crate::instr::read_pubkey_fast;
15use crate::logs::timestamp_to_microseconds;
16use crate::DexEvent;
17use crossbeam_queue::ArrayQueue;
18use futures::{SinkExt, StreamExt};
19use log::error;
20use memchr::memmem;
21use once_cell::sync::Lazy;
22use solana_sdk::pubkey::Pubkey;
23use std::collections::HashMap;
24use std::str::FromStr;
25use std::sync::Arc;
26use tokio::sync::{mpsc, Mutex};
27use tokio::time::{Duration, Instant};
28use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient};
30use yellowstone_grpc_proto::prelude::*;
31
32static PROGRAM_DATA_FINDER: Lazy<memmem::Finder> =
33 Lazy::new(|| memmem::Finder::new(b"Program data: "));
34
35#[derive(Clone)]
38pub struct YellowstoneGrpc {
39 endpoint: String,
40 token: Option<String>,
41 config: ClientConfig,
42 control_tx: Arc<Mutex<Option<mpsc::Sender<SubscribeRequest>>>>,
43}
44
45impl YellowstoneGrpc {
46 pub fn new(
47 endpoint: String,
48 token: Option<String>,
49 ) -> Result<Self, Box<dyn std::error::Error>> {
50 crate::warmup::warmup_parser();
51 Ok(Self {
52 endpoint,
53 token,
54 config: ClientConfig::default(),
55 control_tx: Arc::new(Mutex::new(None)),
56 })
57 }
58
59 pub fn new_with_config(
60 endpoint: String,
61 token: Option<String>,
62 config: ClientConfig,
63 ) -> Result<Self, Box<dyn std::error::Error>> {
64 crate::warmup::warmup_parser();
65 Ok(Self { endpoint, token, config, control_tx: Arc::new(Mutex::new(None)) })
66 }
67
68 pub async fn subscribe_dex_events(
70 &self,
71 transaction_filters: Vec<TransactionFilter>,
72 account_filters: Vec<AccountFilter>,
73 event_type_filter: Option<EventTypeFilter>,
74 ) -> Result<Arc<ArrayQueue<DexEvent>>, Box<dyn std::error::Error>> {
75 let queue = Arc::new(ArrayQueue::new(self.config.buffer_size.max(1)));
76 let queue_clone = Arc::clone(&queue);
77 let self_clone = self.clone();
78
79 tokio::spawn(async move {
80 let mut delay = 1u64;
81 loop {
82 match self_clone
83 .stream_events(
84 &transaction_filters,
85 &account_filters,
86 &event_type_filter,
87 &queue_clone,
88 )
89 .await
90 {
91 Ok(_) => delay = 1,
92 Err(e) => println!("❌ gRPC error: {} - retry in {}s", e, delay),
93 }
94 tokio::time::sleep(Duration::from_secs(delay)).await;
95 delay = (delay * 2).min(60);
96 }
97 });
98
99 Ok(queue)
100 }
101
102 pub async fn update_subscription(
104 &self,
105 transaction_filters: Vec<TransactionFilter>,
106 account_filters: Vec<AccountFilter>,
107 ) -> Result<(), Box<dyn std::error::Error>> {
108 let sender = self.control_tx.lock().await.as_ref().ok_or("No active subscription")?.clone();
109
110 let request = build_subscribe_request(&transaction_filters, &account_filters);
111 sender.send(request).await.map_err(|e| e.to_string())?;
112 Ok(())
113 }
114
115 pub async fn stop(&self) {
116 println!("🛑 Stopping gRPC subscription...");
117 }
118
119 async fn stream_events(
122 &self,
123 tx_filters: &[TransactionFilter],
124 acc_filters: &[AccountFilter],
125 event_filter: &Option<EventTypeFilter>,
126 queue: &Arc<ArrayQueue<DexEvent>>,
127 ) -> Result<(), String> {
128 let _ = rustls::crypto::ring::default_provider().install_default();
129
130 let mut builder = GeyserGrpcClient::build_from_shared(self.endpoint.clone())
132 .map_err(|e| e.to_string())?
133 .x_token(self.token.clone())
134 .map_err(|e| e.to_string())?
135 .max_decoding_message_size(1024 * 1024 * 1024);
136
137 if self.config.connection_timeout_ms > 0 {
138 builder =
139 builder.connect_timeout(Duration::from_millis(self.config.connection_timeout_ms));
140 }
141 if self.config.enable_tls {
142 builder = builder
143 .tls_config(ClientTlsConfig::new().with_native_roots())
144 .map_err(|e| e.to_string())?;
145 }
146
147 let mut client = builder.connect().await.map_err(|e| e.to_string())?;
148 let request = build_subscribe_request(tx_filters, acc_filters);
149
150 let (subscribe_tx, mut stream) =
151 client.subscribe_with_request(Some(request)).await.map_err(|e| e.to_string())?;
152
153 self.print_mode_info();
154
155 let (control_tx, mut control_rx) = mpsc::channel::<SubscribeRequest>(100);
157 *self.control_tx.lock().await = Some(control_tx);
158 let subscribe_tx = Arc::new(Mutex::new(subscribe_tx));
159
160 let mut slot_buffer = SlotBuffer::new();
162 let mut micro_batch = MicroBatchBuffer::new();
163 let mut last_slot = 0u64;
164
165 let order_mode = self.config.order_mode;
166 let timeout_ms = self.config.order_timeout_ms;
167 let batch_us = self.config.micro_batch_us;
168 let check_interval = Duration::from_millis(timeout_ms / 2);
169 let mut next_check = Instant::now() + check_interval;
170
171 loop {
172 self.check_timeout(
174 order_mode,
175 &mut slot_buffer,
176 &mut micro_batch,
177 queue,
178 timeout_ms,
179 batch_us,
180 &mut next_check,
181 check_interval,
182 );
183
184 tokio::select! {
185 msg = stream.next() => {
186 match msg {
187 Some(Ok(update)) => {
188 if matches!(
190 update.update_oneof.as_ref(),
191 Some(subscribe_update::UpdateOneof::Ping(_))
192 ) {
193 if let Err(e) = subscribe_tx
194 .lock()
195 .await
196 .send(SubscribeRequest {
197 ping: Some(SubscribeRequestPing { id: 1 }),
198 ..Default::default()
199 })
200 .await
201 {
202 return Err(e.to_string());
203 }
204 continue;
205 }
206 self.handle_update(
207 update, order_mode, event_filter, queue,
208 &mut slot_buffer, &mut micro_batch, &mut last_slot, batch_us
209 );
210 }
211 Some(Err(e)) => {
212 error!("Stream error: {:?}", e);
213 self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
214 return Err(e.to_string());
215 }
216 None => {
217 self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
218 return Ok(());
219 }
220 }
221 }
222 Some(req) = control_rx.recv() => {
223 if let Err(e) = subscribe_tx.lock().await.send(req).await {
224 return Err(e.to_string());
225 }
226 }
227 }
228 }
229 }
230
231 fn print_mode_info(&self) {
232 match self.config.order_mode {
233 OrderMode::Unordered => println!("✅ Unordered Mode (10-20μs)"),
234 OrderMode::Ordered => {
235 println!("✅ Ordered Mode (timeout={}ms)", self.config.order_timeout_ms)
236 }
237 OrderMode::StreamingOrdered => {
238 println!("✅ StreamingOrdered Mode (timeout={}ms)", self.config.order_timeout_ms)
239 }
240 OrderMode::MicroBatch => {
241 println!("✅ MicroBatch Mode (window={}μs)", self.config.micro_batch_us)
242 }
243 }
244 }
245
246 #[inline]
247 fn check_timeout(
248 &self,
249 mode: OrderMode,
250 slot_buf: &mut SlotBuffer,
251 micro_buf: &mut MicroBatchBuffer,
252 queue: &Arc<ArrayQueue<DexEvent>>,
253 timeout_ms: u64,
254 batch_us: u64,
255 next_check: &mut Instant,
256 interval: Duration,
257 ) {
258 if Instant::now() < *next_check {
259 return;
260 }
261 *next_check = Instant::now() + interval;
262
263 match mode {
264 OrderMode::Ordered => {
265 if slot_buf.should_timeout(timeout_ms) {
266 for e in slot_buf.flush_all() {
267 let _ = queue.push(e);
268 }
269 }
270 }
271 OrderMode::StreamingOrdered => {
272 if slot_buf.should_timeout(timeout_ms) {
273 for e in slot_buf.flush_streaming_timeout() {
274 let _ = queue.push(e);
275 }
276 }
277 }
278 OrderMode::MicroBatch => {
279 let now_us = get_timestamp_us();
281 if micro_buf.should_flush(now_us, batch_us) {
282 for e in micro_buf.flush() {
283 let _ = queue.push(e);
284 }
285 }
286 }
287 OrderMode::Unordered => {}
288 }
289 }
290
291 fn flush_on_disconnect(
292 &self,
293 mode: OrderMode,
294 buffer: &mut SlotBuffer,
295 queue: &Arc<ArrayQueue<DexEvent>>,
296 ) {
297 if matches!(mode, OrderMode::Ordered | OrderMode::StreamingOrdered) {
298 let events = match mode {
299 OrderMode::StreamingOrdered => buffer.flush_streaming_timeout(),
300 _ => buffer.flush_all(),
301 };
302 for e in events {
303 let _ = queue.push(e);
304 }
305 }
306 }
307
308 #[inline]
309 fn handle_update(
310 &self,
311 update_msg: SubscribeUpdate,
312 mode: OrderMode,
313 filter: &Option<EventTypeFilter>,
314 queue: &Arc<ArrayQueue<DexEvent>>,
315 slot_buf: &mut SlotBuffer,
316 micro_buf: &mut MicroBatchBuffer,
317 last_slot: &mut u64,
318 batch_us: u64,
319 ) {
320 let created_at = update_msg.created_at.unwrap_or_default();
321 let block_time_us = timestamp_to_microseconds(created_at.seconds, created_at.nanos) as i64;
322 let grpc_recv_us = get_timestamp_us();
323
324 let Some(update) = update_msg.update_oneof else { return };
325
326 match update {
327 subscribe_update::UpdateOneof::Transaction(tx) => {
328 self.handle_transaction(
329 tx,
330 mode,
331 filter,
332 queue,
333 slot_buf,
334 micro_buf,
335 last_slot,
336 batch_us,
337 grpc_recv_us,
338 block_time_us,
339 );
340 }
341 subscribe_update::UpdateOneof::Account(acc) => {
342 Self::handle_account(acc, filter, queue, grpc_recv_us, block_time_us);
343 }
344 _ => {}
345 }
346 }
347
348 #[inline]
349 fn handle_transaction(
350 &self,
351 tx: SubscribeUpdateTransaction,
352 mode: OrderMode,
353 filter: &Option<EventTypeFilter>,
354 queue: &Arc<ArrayQueue<DexEvent>>,
355 slot_buf: &mut SlotBuffer,
356 micro_buf: &mut MicroBatchBuffer,
357 last_slot: &mut u64,
358 batch_us: u64,
359 grpc_us: i64,
360 block_us: i64,
361 ) {
362 let slot = tx.slot;
363
364 match mode {
365 OrderMode::Unordered => {
366 for e in crate::grpc::parse_subscribe_update_transaction_low_latency(
367 &tx,
368 grpc_us,
369 Some(block_us),
370 filter.as_ref(),
371 ) {
372 let _ = queue.push(e);
373 }
374 }
375 OrderMode::Ordered => {
376 if slot > *last_slot && *last_slot > 0 {
377 for e in slot_buf.flush_before(slot) {
378 let _ = queue.push(e);
379 }
380 }
381 *last_slot = slot;
382 for (idx, e) in
383 parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
384 {
385 slot_buf.push(slot, idx, e);
386 }
387 }
388 OrderMode::StreamingOrdered => {
389 for (idx, e) in
390 parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
391 {
392 for evt in slot_buf.push_streaming(slot, idx, e) {
393 let _ = queue.push(evt);
394 }
395 }
396 }
397 OrderMode::MicroBatch => {
398 for (idx, e) in
399 parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
400 {
401 if micro_buf.push(slot, idx, e, grpc_us, batch_us) {
402 for evt in micro_buf.flush() {
403 let _ = queue.push(evt);
404 }
405 }
406 }
407 }
408 }
409 }
410
411 #[inline]
412 fn handle_account(
413 acc: SubscribeUpdateAccount,
414 filter: &Option<EventTypeFilter>,
415 queue: &Arc<ArrayQueue<DexEvent>>,
416 grpc_us: i64,
417 block_us: i64,
418 ) {
419 let Some(info) = acc.account else { return };
420 let data = crate::accounts::AccountData {
421 pubkey: read_pubkey_fast(&info.pubkey),
422 executable: info.executable,
423 lamports: info.lamports,
424 owner: read_pubkey_fast(&info.owner),
425 rent_epoch: info.rent_epoch,
426 data: info.data,
427 };
428 let meta = EventMetadata {
429 signature: Default::default(),
430 slot: acc.slot,
431 tx_index: 0,
432 block_time_us: block_us,
433 grpc_recv_us: grpc_us,
434 recent_blockhash: None,
435 };
436 if let Some(e) = crate::accounts::parse_account_unified(&data, meta, filter.as_ref()) {
437 let _ = queue.push(e);
438 }
439 }
440}
441
442#[inline(always)]
453fn get_timestamp_us() -> i64 {
454 now_micros()
455}
456
457#[inline]
460fn parse_transaction_to_vec(
461 tx: &SubscribeUpdateTransaction,
462 grpc_us: i64,
463 block_us: Option<i64>,
464 filter: Option<&EventTypeFilter>,
465) -> Vec<(u64, DexEvent)> {
466 let idx = tx.transaction.as_ref().map(|t| t.index).unwrap_or(0);
467 parse_transaction_core(tx, grpc_us, block_us, filter).into_iter().map(|e| (idx, e)).collect()
468}
469
470#[inline]
471fn parse_transaction_core(
472 tx: &SubscribeUpdateTransaction,
473 grpc_us: i64,
474 block_us: Option<i64>,
475 filter: Option<&EventTypeFilter>,
476) -> Vec<DexEvent> {
477 let Some(info) = &tx.transaction else { return Vec::new() };
478 let Some(meta) = &info.meta else { return Vec::new() };
479
480 let sig = extract_signature(&info.signature);
481 let slot = tx.slot;
482 let idx = info.index;
483
484 let log_events = parse_logs(
485 meta,
486 &info.transaction,
487 &meta.log_messages,
488 sig,
489 slot,
490 idx,
491 block_us,
492 grpc_us,
493 filter,
494 );
495 let instr_events =
496 parse_instructions(meta, &info.transaction, sig, slot, idx, block_us, grpc_us, filter);
497
498 crate::grpc::log_instr_dedup::dedupe_log_instruction_events(log_events, instr_events)
499}
500
501#[inline(always)]
502fn extract_signature(bytes: &[u8]) -> solana_sdk::signature::Signature {
503 try_yellowstone_signature(bytes).expect("yellowstone signature must be 64 bytes")
504}
505
506#[inline]
507fn parse_logs(
508 meta: &TransactionStatusMeta,
509 transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
510 logs: &[String],
511 sig: solana_sdk::signature::Signature,
512 slot: u64,
513 tx_idx: u64,
514 block_us: Option<i64>,
515 grpc_us: i64,
516 filter: Option<&EventTypeFilter>,
517) -> Vec<DexEvent> {
518 let recent_blockhash = transaction.as_ref().and_then(|t| t.message.as_ref()).and_then(|m| {
519 if m.recent_blockhash.is_empty() {
520 None
521 } else {
522 Some(m.recent_blockhash.clone())
523 }
524 });
525
526 let needs_pumpfun = filter.map(|f| f.includes_pumpfun()).unwrap_or(true);
527 let has_create = needs_pumpfun && crate::logs::optimized_matcher::detect_pumpfun_create(logs);
528
529 let mut outer_idx: i32 = -1;
530 let mut inner_idx: i32 = -1;
531 let mut invokes: HashMap<Pubkey, Vec<(i32, i32)>> = HashMap::with_capacity(8);
532 let mut active_program_stack: Vec<Pubkey> = Vec::with_capacity(8);
533 let mut result = Vec::with_capacity(4);
534
535 for log in logs {
536 if let Some((pid, depth)) = crate::logs::optimized_matcher::parse_invoke_info(log) {
537 if depth == 1 {
538 inner_idx = -1;
539 outer_idx += 1;
540 } else {
541 inner_idx += 1;
542 }
543 if let Ok(pk) = Pubkey::from_str(pid) {
544 active_program_stack.truncate(depth.saturating_sub(1));
545 active_program_stack.push(pk);
546 invokes.entry(pk).or_default().push((outer_idx, inner_idx));
547 }
548 }
549
550 if PROGRAM_DATA_FINDER.find(log.as_bytes()).is_some() {
551 let current_program = active_program_stack.last();
552 if let Some(mut e) = crate::logs::parse_log_with_program_id(
553 log,
554 sig,
555 slot,
556 tx_idx,
557 block_us,
558 grpc_us,
559 filter,
560 has_create,
561 recent_blockhash.as_deref(),
562 current_program,
563 ) {
564 crate::core::account_dispatcher::fill_accounts_with_owned_keys(
565 &mut e,
566 meta,
567 transaction,
568 &invokes,
569 );
570 crate::core::common_filler::fill_data(&mut e, meta, transaction, &invokes);
571 result.push(e);
572 }
573 }
574
575 if let Some(pid) = crate::logs::optimized_matcher::parse_program_complete_info(log) {
576 if let Ok(pk) = Pubkey::from_str(pid) {
577 if let Some(pos) = active_program_stack.iter().rposition(|active| *active == pk) {
578 active_program_stack.truncate(pos);
579 }
580 }
581 }
582 }
583 result
584}
585
586#[inline]
587fn parse_instructions(
588 meta: &TransactionStatusMeta,
589 transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
590 sig: solana_sdk::signature::Signature,
591 slot: u64,
592 tx_idx: u64,
593 block_us: Option<i64>,
594 grpc_us: i64,
595 filter: Option<&EventTypeFilter>,
596) -> Vec<DexEvent> {
597 crate::grpc::instruction_parser::parse_instructions_enhanced(
603 meta,
604 transaction,
605 sig,
606 slot,
607 tx_idx,
608 block_us,
609 grpc_us,
610 filter,
611 )
612}