1use super::buffers::{MicroBatchBuffer, SlotBuffer};
10use super::subscribe_builder::build_subscribe_request;
11use super::transaction_meta::try_yellowstone_signature;
12use super::types::*;
13use crate::core::{now_micros, EventMetadata}; use crate::instr::read_pubkey_fast;
15use crate::logs::timestamp_to_microseconds;
16use crate::DexEvent;
17use crossbeam_queue::ArrayQueue;
18use solana_sdk::pubkey::Pubkey;
19use std::str::FromStr;
20use futures::{SinkExt, StreamExt};
21use log::error;
22use memchr::memmem;
23use once_cell::sync::Lazy;
24use std::collections::HashMap;
25use std::sync::Arc;
26use tokio::sync::{mpsc, Mutex};
27use tokio::time::{Duration, Instant};
28use yellowstone_grpc_client::{GeyserGrpcClient, ClientTlsConfig};
30use yellowstone_grpc_proto::prelude::*;
31
32static PROGRAM_DATA_FINDER: Lazy<memmem::Finder> =
33 Lazy::new(|| memmem::Finder::new(b"Program data: "));
34
35#[derive(Clone)]
38pub struct YellowstoneGrpc {
39 endpoint: String,
40 token: Option<String>,
41 config: ClientConfig,
42 control_tx: Arc<Mutex<Option<mpsc::Sender<SubscribeRequest>>>>,
43}
44
45impl YellowstoneGrpc {
46 pub fn new(
47 endpoint: String,
48 token: Option<String>,
49 ) -> Result<Self, Box<dyn std::error::Error>> {
50 crate::warmup::warmup_parser();
51 Ok(Self {
52 endpoint,
53 token,
54 config: ClientConfig::default(),
55 control_tx: Arc::new(Mutex::new(None)),
56 })
57 }
58
59 pub fn new_with_config(
60 endpoint: String,
61 token: Option<String>,
62 config: ClientConfig,
63 ) -> Result<Self, Box<dyn std::error::Error>> {
64 crate::warmup::warmup_parser();
65 Ok(Self { endpoint, token, config, control_tx: Arc::new(Mutex::new(None)) })
66 }
67
68 pub async fn subscribe_dex_events(
70 &self,
71 transaction_filters: Vec<TransactionFilter>,
72 account_filters: Vec<AccountFilter>,
73 event_type_filter: Option<EventTypeFilter>,
74 ) -> Result<Arc<ArrayQueue<DexEvent>>, Box<dyn std::error::Error>> {
75 let queue = Arc::new(ArrayQueue::new(100_000));
76 let queue_clone = Arc::clone(&queue);
77 let self_clone = self.clone();
78
79 tokio::spawn(async move {
80 let mut delay = 1u64;
81 loop {
82 match self_clone
83 .stream_events(
84 &transaction_filters,
85 &account_filters,
86 &event_type_filter,
87 &queue_clone,
88 )
89 .await
90 {
91 Ok(_) => delay = 1,
92 Err(e) => println!("❌ gRPC error: {} - retry in {}s", e, delay),
93 }
94 tokio::time::sleep(Duration::from_secs(delay)).await;
95 delay = (delay * 2).min(60);
96 }
97 });
98
99 Ok(queue)
100 }
101
102 pub async fn update_subscription(
104 &self,
105 transaction_filters: Vec<TransactionFilter>,
106 account_filters: Vec<AccountFilter>,
107 ) -> Result<(), Box<dyn std::error::Error>> {
108 let sender = self.control_tx.lock().await.as_ref().ok_or("No active subscription")?.clone();
109
110 let request = build_subscribe_request(&transaction_filters, &account_filters);
111 sender.send(request).await.map_err(|e| e.to_string())?;
112 Ok(())
113 }
114
115 pub async fn stop(&self) {
116 println!("🛑 Stopping gRPC subscription...");
117 }
118
119 async fn stream_events(
122 &self,
123 tx_filters: &[TransactionFilter],
124 acc_filters: &[AccountFilter],
125 event_filter: &Option<EventTypeFilter>,
126 queue: &Arc<ArrayQueue<DexEvent>>,
127 ) -> Result<(), String> {
128 let _ = rustls::crypto::ring::default_provider().install_default();
129
130 let mut builder = GeyserGrpcClient::build_from_shared(self.endpoint.clone())
132 .map_err(|e| e.to_string())?
133 .x_token(self.token.clone())
134 .map_err(|e| e.to_string())?
135 .max_decoding_message_size(1024 * 1024 * 1024);
136
137 if self.config.connection_timeout_ms > 0 {
138 builder =
139 builder.connect_timeout(Duration::from_millis(self.config.connection_timeout_ms));
140 }
141 if self.config.enable_tls {
142 builder = builder
143 .tls_config(ClientTlsConfig::new().with_native_roots())
144 .map_err(|e| e.to_string())?;
145 }
146
147 let mut client = builder.connect().await.map_err(|e| e.to_string())?;
148 let request = build_subscribe_request(tx_filters, acc_filters);
149
150 let (subscribe_tx, mut stream) =
151 client.subscribe_with_request(Some(request)).await.map_err(|e| e.to_string())?;
152
153 self.print_mode_info();
154
155 let (control_tx, mut control_rx) = mpsc::channel::<SubscribeRequest>(100);
157 *self.control_tx.lock().await = Some(control_tx);
158 let subscribe_tx = Arc::new(Mutex::new(subscribe_tx));
159
160 let mut slot_buffer = SlotBuffer::new();
162 let mut micro_batch = MicroBatchBuffer::new();
163 let mut last_slot = 0u64;
164
165 let order_mode = self.config.order_mode;
166 let timeout_ms = self.config.order_timeout_ms;
167 let batch_us = self.config.micro_batch_us;
168 let check_interval = Duration::from_millis(timeout_ms / 2);
169 let mut next_check = Instant::now() + check_interval;
170
171 loop {
172 self.check_timeout(
174 order_mode,
175 &mut slot_buffer,
176 &mut micro_batch,
177 queue,
178 timeout_ms,
179 batch_us,
180 &mut next_check,
181 check_interval,
182 );
183
184 tokio::select! {
185 msg = stream.next() => {
186 match msg {
187 Some(Ok(update)) => {
188 if matches!(
190 update.update_oneof.as_ref(),
191 Some(subscribe_update::UpdateOneof::Ping(_))
192 ) {
193 if let Err(e) = subscribe_tx
194 .lock()
195 .await
196 .send(SubscribeRequest {
197 ping: Some(SubscribeRequestPing { id: 1 }),
198 ..Default::default()
199 })
200 .await
201 {
202 return Err(e.to_string());
203 }
204 continue;
205 }
206 self.handle_update(
207 update, order_mode, event_filter, queue,
208 &mut slot_buffer, &mut micro_batch, &mut last_slot, batch_us
209 );
210 }
211 Some(Err(e)) => {
212 error!("Stream error: {:?}", e);
213 self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
214 return Err(e.to_string());
215 }
216 None => {
217 self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
218 return Ok(());
219 }
220 }
221 }
222 Some(req) = control_rx.recv() => {
223 if let Err(e) = subscribe_tx.lock().await.send(req).await {
224 return Err(e.to_string());
225 }
226 }
227 }
228 }
229 }
230
231 fn print_mode_info(&self) {
232 match self.config.order_mode {
233 OrderMode::Unordered => println!("✅ Unordered Mode (10-20μs)"),
234 OrderMode::Ordered => {
235 println!("✅ Ordered Mode (timeout={}ms)", self.config.order_timeout_ms)
236 }
237 OrderMode::StreamingOrdered => {
238 println!("✅ StreamingOrdered Mode (timeout={}ms)", self.config.order_timeout_ms)
239 }
240 OrderMode::MicroBatch => {
241 println!("✅ MicroBatch Mode (window={}μs)", self.config.micro_batch_us)
242 }
243 }
244 }
245
246 #[inline]
247 fn check_timeout(
248 &self,
249 mode: OrderMode,
250 slot_buf: &mut SlotBuffer,
251 micro_buf: &mut MicroBatchBuffer,
252 queue: &Arc<ArrayQueue<DexEvent>>,
253 timeout_ms: u64,
254 batch_us: u64,
255 next_check: &mut Instant,
256 interval: Duration,
257 ) {
258 if Instant::now() < *next_check {
259 return;
260 }
261 *next_check = Instant::now() + interval;
262
263 match mode {
264 OrderMode::Ordered => {
265 if slot_buf.should_timeout(timeout_ms) {
266 for e in slot_buf.flush_all() {
267 let _ = queue.push(e);
268 }
269 }
270 }
271 OrderMode::StreamingOrdered => {
272 if slot_buf.should_timeout(timeout_ms) {
273 for e in slot_buf.flush_streaming_timeout() {
274 let _ = queue.push(e);
275 }
276 }
277 }
278 OrderMode::MicroBatch => {
279 let now_us = get_timestamp_us();
281 if micro_buf.should_flush(now_us, batch_us) {
282 for e in micro_buf.flush() {
283 let _ = queue.push(e);
284 }
285 }
286 }
287 OrderMode::Unordered => {}
288 }
289 }
290
291 fn flush_on_disconnect(
292 &self,
293 mode: OrderMode,
294 buffer: &mut SlotBuffer,
295 queue: &Arc<ArrayQueue<DexEvent>>,
296 ) {
297 if matches!(mode, OrderMode::Ordered | OrderMode::StreamingOrdered) {
298 let events = match mode {
299 OrderMode::StreamingOrdered => buffer.flush_streaming_timeout(),
300 _ => buffer.flush_all(),
301 };
302 for e in events {
303 let _ = queue.push(e);
304 }
305 }
306 }
307
308 #[inline]
309 fn handle_update(
310 &self,
311 update_msg: SubscribeUpdate,
312 mode: OrderMode,
313 filter: &Option<EventTypeFilter>,
314 queue: &Arc<ArrayQueue<DexEvent>>,
315 slot_buf: &mut SlotBuffer,
316 micro_buf: &mut MicroBatchBuffer,
317 last_slot: &mut u64,
318 batch_us: u64,
319 ) {
320 let created_at = update_msg.created_at.unwrap_or_default();
321 let block_time_us = timestamp_to_microseconds(created_at.seconds, created_at.nanos) as i64;
322 let grpc_recv_us = get_timestamp_us();
323
324 let Some(update) = update_msg.update_oneof else { return };
325
326 match update {
327 subscribe_update::UpdateOneof::Transaction(tx) => {
328 self.handle_transaction(
329 tx,
330 mode,
331 filter,
332 queue,
333 slot_buf,
334 micro_buf,
335 last_slot,
336 batch_us,
337 grpc_recv_us,
338 block_time_us,
339 );
340 }
341 subscribe_update::UpdateOneof::Account(acc) => {
342 Self::handle_account(acc, filter, queue, grpc_recv_us, block_time_us);
343 }
344 _ => {}
345 }
346 }
347
348 #[inline]
349 fn handle_transaction(
350 &self,
351 tx: SubscribeUpdateTransaction,
352 mode: OrderMode,
353 filter: &Option<EventTypeFilter>,
354 queue: &Arc<ArrayQueue<DexEvent>>,
355 slot_buf: &mut SlotBuffer,
356 micro_buf: &mut MicroBatchBuffer,
357 last_slot: &mut u64,
358 batch_us: u64,
359 grpc_us: i64,
360 block_us: i64,
361 ) {
362 let slot = tx.slot;
363
364 match mode {
365 OrderMode::Unordered => {
366 for e in parse_transaction_core(&tx, grpc_us, Some(block_us), filter.as_ref()) {
367 let _ = queue.push(e);
368 }
369 }
370 OrderMode::Ordered => {
371 if slot > *last_slot && *last_slot > 0 {
372 for e in slot_buf.flush_before(slot) {
373 let _ = queue.push(e);
374 }
375 }
376 *last_slot = slot;
377 for (idx, e) in
378 parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
379 {
380 slot_buf.push(slot, idx, e);
381 }
382 }
383 OrderMode::StreamingOrdered => {
384 for (idx, e) in
385 parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
386 {
387 for evt in slot_buf.push_streaming(slot, idx, e) {
388 let _ = queue.push(evt);
389 }
390 }
391 }
392 OrderMode::MicroBatch => {
393 for (idx, e) in
394 parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
395 {
396 if micro_buf.push(slot, idx, e, grpc_us, batch_us) {
397 for evt in micro_buf.flush() {
398 let _ = queue.push(evt);
399 }
400 }
401 }
402 }
403 }
404 }
405
406 #[inline]
407 fn handle_account(
408 acc: SubscribeUpdateAccount,
409 filter: &Option<EventTypeFilter>,
410 queue: &Arc<ArrayQueue<DexEvent>>,
411 grpc_us: i64,
412 block_us: i64,
413 ) {
414 let Some(info) = acc.account else { return };
415 let data = crate::accounts::AccountData {
416 pubkey: read_pubkey_fast(&info.pubkey),
417 executable: info.executable,
418 lamports: info.lamports,
419 owner: read_pubkey_fast(&info.owner),
420 rent_epoch: info.rent_epoch,
421 data: info.data,
422 };
423 let meta = EventMetadata {
424 signature: Default::default(),
425 slot: acc.slot,
426 tx_index: 0,
427 block_time_us: block_us,
428 grpc_recv_us: grpc_us,
429 recent_blockhash: None,
430 };
431 if let Some(e) = crate::accounts::parse_account_unified(&data, meta, filter.as_ref()) {
432 let _ = queue.push(e);
433 }
434 }
435}
436
437#[inline(always)]
448fn get_timestamp_us() -> i64 {
449 now_micros()
450}
451
452#[inline]
455fn parse_transaction_to_vec(
456 tx: &SubscribeUpdateTransaction,
457 grpc_us: i64,
458 block_us: Option<i64>,
459 filter: Option<&EventTypeFilter>,
460) -> Vec<(u64, DexEvent)> {
461 let idx = tx.transaction.as_ref().map(|t| t.index).unwrap_or(0);
462 parse_transaction_core(tx, grpc_us, block_us, filter).into_iter().map(|e| (idx, e)).collect()
463}
464
465#[inline]
466fn parse_transaction_core(
467 tx: &SubscribeUpdateTransaction,
468 grpc_us: i64,
469 block_us: Option<i64>,
470 filter: Option<&EventTypeFilter>,
471) -> Vec<DexEvent> {
472 let Some(info) = &tx.transaction else { return Vec::new() };
473 let Some(meta) = &info.meta else { return Vec::new() };
474
475 let sig = extract_signature(&info.signature);
476 let slot = tx.slot;
477 let idx = info.index;
478
479 let (log_events, instr_events) = rayon::join(
481 || {
482 parse_logs(
483 meta,
484 &info.transaction,
485 &meta.log_messages,
486 sig,
487 slot,
488 idx,
489 block_us,
490 grpc_us,
491 filter,
492 )
493 },
494 || parse_instructions(meta, &info.transaction, sig, slot, idx, block_us, grpc_us, filter),
495 );
496
497 crate::grpc::log_instr_dedup::dedupe_log_instruction_events(log_events, instr_events)
498}
499
500#[inline(always)]
501fn extract_signature(bytes: &[u8]) -> solana_sdk::signature::Signature {
502 try_yellowstone_signature(bytes).expect("yellowstone signature must be 64 bytes")
503}
504
505#[inline]
506fn parse_logs(
507 meta: &TransactionStatusMeta,
508 transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
509 logs: &[String],
510 sig: solana_sdk::signature::Signature,
511 slot: u64,
512 tx_idx: u64,
513 block_us: Option<i64>,
514 grpc_us: i64,
515 filter: Option<&EventTypeFilter>,
516) -> Vec<DexEvent> {
517 let recent_blockhash = transaction.as_ref().and_then(|t| t.message.as_ref()).and_then(|m| {
518 if m.recent_blockhash.is_empty() {
519 None
520 } else {
521 Some(m.recent_blockhash.clone())
522 }
523 });
524
525 let needs_pumpfun = filter.map(|f| f.includes_pumpfun()).unwrap_or(true);
526 let has_create = needs_pumpfun && crate::logs::optimized_matcher::detect_pumpfun_create(logs);
527
528 let mut outer_idx: i32 = -1;
529 let mut inner_idx: i32 = -1;
530 let mut invokes: HashMap<Pubkey, Vec<(i32, i32)>> = HashMap::with_capacity(8);
531 let mut result = Vec::with_capacity(4);
532
533 for log in logs {
534 if let Some((pid, depth)) = crate::logs::optimized_matcher::parse_invoke_info(log) {
535 if depth == 1 {
536 inner_idx = -1;
537 outer_idx += 1;
538 } else {
539 inner_idx += 1;
540 }
541 if let Ok(pk) = Pubkey::from_str(pid) {
542 invokes.entry(pk).or_default().push((outer_idx, inner_idx));
543 }
544 }
545
546 if PROGRAM_DATA_FINDER.find(log.as_bytes()).is_none() {
547 continue;
548 }
549
550 if let Some(mut e) = crate::logs::parse_log(
551 log,
552 sig,
553 slot,
554 tx_idx,
555 block_us,
556 grpc_us,
557 filter,
558 has_create,
559 recent_blockhash.as_deref(),
560 ) {
561 crate::core::account_dispatcher::fill_accounts_with_owned_keys(&mut e, meta, transaction, &invokes);
562 crate::core::common_filler::fill_data(&mut e, meta, transaction, &invokes);
563 result.push(e);
564 }
565 }
566 result
567}
568
569#[inline]
570fn parse_instructions(
571 meta: &TransactionStatusMeta,
572 transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
573 sig: solana_sdk::signature::Signature,
574 slot: u64,
575 tx_idx: u64,
576 block_us: Option<i64>,
577 grpc_us: i64,
578 filter: Option<&EventTypeFilter>,
579) -> Vec<DexEvent> {
580 crate::grpc::instruction_parser::parse_instructions_enhanced(
586 meta,
587 transaction,
588 sig,
589 slot,
590 tx_idx,
591 block_us,
592 grpc_us,
593 filter,
594 )
595}