1use super::buffers::{MicroBatchBuffer, SlotBuffer};
10use super::types::*;
11use crate::core::{EventMetadata, now_micros}; use crate::instr::read_pubkey_fast;
13use crate::logs::timestamp_to_microseconds;
14use crate::DexEvent;
15use crossbeam_queue::ArrayQueue;
16use futures::{SinkExt, StreamExt};
17use log::error;
18use memchr::memmem;
19use once_cell::sync::Lazy;
20use std::collections::HashMap;
21use std::sync::Arc;
22use tokio::sync::{mpsc, Mutex};
23use tokio::time::{Duration, Instant};
24use tonic::transport::ClientTlsConfig;
25use yellowstone_grpc_client::GeyserGrpcClient;
26use yellowstone_grpc_proto::prelude::*;
27
28static PROGRAM_DATA_FINDER: Lazy<memmem::Finder> =
29 Lazy::new(|| memmem::Finder::new(b"Program data: "));
30
31#[derive(Clone)]
34pub struct YellowstoneGrpc {
35 endpoint: String,
36 token: Option<String>,
37 config: ClientConfig,
38 control_tx: Arc<Mutex<Option<mpsc::Sender<SubscribeRequest>>>>,
39}
40
41impl YellowstoneGrpc {
42 pub fn new(endpoint: String, token: Option<String>) -> Result<Self, Box<dyn std::error::Error>> {
43 crate::warmup::warmup_parser();
44 Ok(Self {
45 endpoint,
46 token,
47 config: ClientConfig::default(),
48 control_tx: Arc::new(Mutex::new(None)),
49 })
50 }
51
52 pub fn new_with_config(
53 endpoint: String,
54 token: Option<String>,
55 config: ClientConfig,
56 ) -> Result<Self, Box<dyn std::error::Error>> {
57 crate::warmup::warmup_parser();
58 Ok(Self { endpoint, token, config, control_tx: Arc::new(Mutex::new(None)) })
59 }
60
61 pub async fn subscribe_dex_events(
63 &self,
64 transaction_filters: Vec<TransactionFilter>,
65 account_filters: Vec<AccountFilter>,
66 event_type_filter: Option<EventTypeFilter>,
67 ) -> Result<Arc<ArrayQueue<DexEvent>>, Box<dyn std::error::Error>> {
68 let queue = Arc::new(ArrayQueue::new(100_000));
69 let queue_clone = Arc::clone(&queue);
70 let self_clone = self.clone();
71
72 tokio::spawn(async move {
73 let mut delay = 1u64;
74 loop {
75 match self_clone.stream_events(&transaction_filters, &account_filters, &event_type_filter, &queue_clone).await {
76 Ok(_) => delay = 1,
77 Err(e) => println!("❌ gRPC error: {} - retry in {}s", e, delay),
78 }
79 tokio::time::sleep(Duration::from_secs(delay)).await;
80 delay = (delay * 2).min(60);
81 }
82 });
83
84 Ok(queue)
85 }
86
87 pub async fn update_subscription(
89 &self,
90 transaction_filters: Vec<TransactionFilter>,
91 account_filters: Vec<AccountFilter>,
92 ) -> Result<(), Box<dyn std::error::Error>> {
93 let sender = self.control_tx.lock().await
94 .as_ref()
95 .ok_or("No active subscription")?
96 .clone();
97
98 let request = build_subscribe_request(&transaction_filters, &account_filters);
99 sender.send(request).await.map_err(|e| e.to_string())?;
100 Ok(())
101 }
102
103 pub async fn stop(&self) {
104 println!("🛑 Stopping gRPC subscription...");
105 }
106
107 async fn stream_events(
110 &self,
111 tx_filters: &[TransactionFilter],
112 acc_filters: &[AccountFilter],
113 event_filter: &Option<EventTypeFilter>,
114 queue: &Arc<ArrayQueue<DexEvent>>,
115 ) -> Result<(), String> {
116 let _ = rustls::crypto::ring::default_provider().install_default();
117
118 let mut builder = GeyserGrpcClient::build_from_shared(self.endpoint.clone())
120 .map_err(|e| e.to_string())?
121 .x_token(self.token.clone())
122 .map_err(|e| e.to_string())?
123 .max_decoding_message_size(1024 * 1024 * 1024);
124
125 if self.config.connection_timeout_ms > 0 {
126 builder = builder.connect_timeout(Duration::from_millis(self.config.connection_timeout_ms));
127 }
128 if self.config.enable_tls {
129 builder = builder.tls_config(ClientTlsConfig::new().with_native_roots()).map_err(|e| e.to_string())?;
130 }
131
132 let mut client = builder.connect().await.map_err(|e| e.to_string())?;
133 let request = build_subscribe_request(tx_filters, acc_filters);
134
135 let (subscribe_tx, mut stream) = client
136 .subscribe_with_request(Some(request))
137 .await
138 .map_err(|e| e.to_string())?;
139
140 self.print_mode_info();
141
142 let (control_tx, mut control_rx) = mpsc::channel::<SubscribeRequest>(100);
144 *self.control_tx.lock().await = Some(control_tx);
145 let subscribe_tx = Arc::new(Mutex::new(subscribe_tx));
146
147 let mut slot_buffer = SlotBuffer::new();
149 let mut micro_batch = MicroBatchBuffer::new();
150 let mut last_slot = 0u64;
151
152 let order_mode = self.config.order_mode;
153 let timeout_ms = self.config.order_timeout_ms;
154 let batch_us = self.config.micro_batch_us;
155 let check_interval = Duration::from_millis(timeout_ms / 2);
156 let mut next_check = Instant::now() + check_interval;
157
158 loop {
159 self.check_timeout(
161 order_mode, &mut slot_buffer, &mut micro_batch, queue,
162 timeout_ms, batch_us, &mut next_check, check_interval
163 );
164
165 tokio::select! {
166 msg = stream.next() => {
167 match msg {
168 Some(Ok(update)) => {
169 self.handle_update(
170 update, order_mode, event_filter, queue,
171 &mut slot_buffer, &mut micro_batch, &mut last_slot, batch_us
172 );
173 }
174 Some(Err(e)) => {
175 error!("Stream error: {:?}", e);
176 self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
177 return Err(e.to_string());
178 }
179 None => {
180 self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
181 return Ok(());
182 }
183 }
184 }
185 Some(req) = control_rx.recv() => {
186 if let Err(e) = subscribe_tx.lock().await.send(req).await {
187 return Err(e.to_string());
188 }
189 }
190 }
191 }
192 }
193
194 fn print_mode_info(&self) {
195 match self.config.order_mode {
196 OrderMode::Unordered => println!("✅ Unordered Mode (10-20μs)"),
197 OrderMode::Ordered => println!("✅ Ordered Mode (timeout={}ms)", self.config.order_timeout_ms),
198 OrderMode::StreamingOrdered => println!("✅ StreamingOrdered Mode (timeout={}ms)", self.config.order_timeout_ms),
199 OrderMode::MicroBatch => println!("✅ MicroBatch Mode (window={}μs)", self.config.micro_batch_us),
200 }
201 }
202
203 #[inline]
204 fn check_timeout(
205 &self,
206 mode: OrderMode,
207 slot_buf: &mut SlotBuffer,
208 micro_buf: &mut MicroBatchBuffer,
209 queue: &Arc<ArrayQueue<DexEvent>>,
210 timeout_ms: u64,
211 batch_us: u64,
212 next_check: &mut Instant,
213 interval: Duration,
214 ) {
215 if Instant::now() < *next_check {
216 return;
217 }
218 *next_check = Instant::now() + interval;
219
220 match mode {
221 OrderMode::Ordered => {
222 if slot_buf.should_timeout(timeout_ms) {
223 for e in slot_buf.flush_all() { let _ = queue.push(e); }
224 }
225 }
226 OrderMode::StreamingOrdered => {
227 if slot_buf.should_timeout(timeout_ms) {
228 for e in slot_buf.flush_streaming_timeout() { let _ = queue.push(e); }
229 }
230 }
231 OrderMode::MicroBatch => {
232 let now_us = get_timestamp_us();
234 if micro_buf.should_flush(now_us, batch_us) {
235 for e in micro_buf.flush() { let _ = queue.push(e); }
236 }
237 }
238 OrderMode::Unordered => {}
239 }
240 }
241
242 fn flush_on_disconnect(&self, mode: OrderMode, buffer: &mut SlotBuffer, queue: &Arc<ArrayQueue<DexEvent>>) {
243 if matches!(mode, OrderMode::Ordered | OrderMode::StreamingOrdered) {
244 let events = match mode {
245 OrderMode::StreamingOrdered => buffer.flush_streaming_timeout(),
246 _ => buffer.flush_all(),
247 };
248 for e in events { let _ = queue.push(e); }
249 }
250 }
251
252 #[inline]
253 fn handle_update(
254 &self,
255 update_msg: SubscribeUpdate,
256 mode: OrderMode,
257 filter: &Option<EventTypeFilter>,
258 queue: &Arc<ArrayQueue<DexEvent>>,
259 slot_buf: &mut SlotBuffer,
260 micro_buf: &mut MicroBatchBuffer,
261 last_slot: &mut u64,
262 batch_us: u64,
263 ) {
264 let block_time_us = timestamp_to_microseconds(&update_msg.created_at.unwrap_or_default()) as i64;
265 let grpc_recv_us = get_timestamp_us();
266
267 let Some(update) = update_msg.update_oneof else { return };
268
269 match update {
270 subscribe_update::UpdateOneof::Transaction(tx) => {
271 self.handle_transaction(tx, mode, filter, queue, slot_buf, micro_buf, last_slot, batch_us, grpc_recv_us, block_time_us);
272 }
273 subscribe_update::UpdateOneof::Account(acc) => {
274 Self::handle_account(acc, filter, queue, grpc_recv_us, block_time_us);
275 }
276 _ => {}
277 }
278 }
279
280 #[inline]
281 fn handle_transaction(
282 &self,
283 tx: SubscribeUpdateTransaction,
284 mode: OrderMode,
285 filter: &Option<EventTypeFilter>,
286 queue: &Arc<ArrayQueue<DexEvent>>,
287 slot_buf: &mut SlotBuffer,
288 micro_buf: &mut MicroBatchBuffer,
289 last_slot: &mut u64,
290 batch_us: u64,
291 grpc_us: i64,
292 block_us: i64,
293 ) {
294 let slot = tx.slot;
295
296 match mode {
297 OrderMode::Unordered => {
298 for e in parse_transaction_core(&tx, grpc_us, Some(block_us), filter.as_ref()) {
299 let _ = queue.push(e);
300 }
301 }
302 OrderMode::Ordered => {
303 if slot > *last_slot && *last_slot > 0 {
304 for e in slot_buf.flush_before(slot) { let _ = queue.push(e); }
305 }
306 *last_slot = slot;
307 for (idx, e) in parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref()) {
308 slot_buf.push(slot, idx, e);
309 }
310 }
311 OrderMode::StreamingOrdered => {
312 for (idx, e) in parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref()) {
313 for evt in slot_buf.push_streaming(slot, idx, e) {
314 let _ = queue.push(evt);
315 }
316 }
317 }
318 OrderMode::MicroBatch => {
319 for (idx, e) in parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref()) {
320 if micro_buf.push(slot, idx, e, grpc_us, batch_us) {
321 for evt in micro_buf.flush() { let _ = queue.push(evt); }
322 }
323 }
324 }
325 }
326 }
327
328 #[inline]
329 fn handle_account(
330 acc: SubscribeUpdateAccount,
331 filter: &Option<EventTypeFilter>,
332 queue: &Arc<ArrayQueue<DexEvent>>,
333 grpc_us: i64,
334 block_us: i64,
335 ) {
336 let Some(info) = acc.account else { return };
337 let data = crate::accounts::AccountData {
338 pubkey: read_pubkey_fast(&info.pubkey),
339 executable: info.executable,
340 lamports: info.lamports,
341 owner: read_pubkey_fast(&info.owner),
342 rent_epoch: info.rent_epoch,
343 data: info.data,
344 };
345 let meta = EventMetadata {
346 signature: Default::default(),
347 slot: acc.slot,
348 tx_index: 0,
349 block_time_us: block_us,
350 grpc_recv_us: grpc_us,
351 };
352 if let Some(e) = crate::accounts::parse_account_unified(&data, meta, filter.as_ref()) {
353 let _ = queue.push(e);
354 }
355 }
356}
357
358#[inline(always)]
369fn get_timestamp_us() -> i64 {
370 now_micros()
371}
372
373fn build_subscribe_request(tx_filters: &[TransactionFilter], acc_filters: &[AccountFilter]) -> SubscribeRequest {
374 let transactions = tx_filters.iter().enumerate().map(|(i, f)| {
375 (format!("tx_{}", i), SubscribeRequestFilterTransactions {
376 vote: Some(false),
377 failed: Some(false),
378 signature: None,
379 account_include: f.account_include.clone(),
380 account_exclude: f.account_exclude.clone(),
381 account_required: f.account_required.clone(),
382 })
383 }).collect();
384
385 let accounts = acc_filters.iter().enumerate().map(|(i, f)| {
386 (format!("acc_{}", i), SubscribeRequestFilterAccounts {
387 account: f.account.clone(),
388 owner: f.owner.clone(),
389 filters: f.filters.clone(),
390 nonempty_txn_signature: None,
391 })
392 }).collect();
393
394 SubscribeRequest {
395 slots: HashMap::new(),
396 accounts,
397 transactions,
398 transactions_status: HashMap::new(),
399 blocks: HashMap::new(),
400 blocks_meta: HashMap::new(),
401 entry: HashMap::new(),
402 commitment: Some(CommitmentLevel::Processed as i32),
403 accounts_data_slice: Vec::new(),
404 ping: None,
405 from_slot: None,
406 }
407}
408
409#[inline]
412fn parse_transaction_to_vec(
413 tx: &SubscribeUpdateTransaction,
414 grpc_us: i64,
415 block_us: Option<i64>,
416 filter: Option<&EventTypeFilter>,
417) -> Vec<(u64, DexEvent)> {
418 let idx = tx.transaction.as_ref().map(|t| t.index).unwrap_or(0);
419 parse_transaction_core(tx, grpc_us, block_us, filter)
420 .into_iter()
421 .map(|e| (idx, e))
422 .collect()
423}
424
425#[inline]
426fn parse_transaction_core(
427 tx: &SubscribeUpdateTransaction,
428 grpc_us: i64,
429 block_us: Option<i64>,
430 filter: Option<&EventTypeFilter>,
431) -> Vec<DexEvent> {
432 let Some(info) = &tx.transaction else { return Vec::new() };
433 let Some(meta) = &info.meta else { return Vec::new() };
434
435 let sig = extract_signature(&info.signature);
436 let slot = tx.slot;
437 let idx = info.index;
438
439 let (log_events, instr_events) = rayon::join(
441 || parse_logs(meta, &info.transaction, &meta.log_messages, sig, slot, idx, block_us, grpc_us, filter),
442 || parse_instructions(meta, &info.transaction, sig, slot, idx, block_us, grpc_us, filter),
443 );
444
445 let mut result = Vec::with_capacity(log_events.len() + instr_events.len());
446 result.extend(log_events);
447 result.extend(instr_events);
448 result
449}
450
451#[inline(always)]
452fn extract_signature(bytes: &[u8]) -> solana_sdk::signature::Signature {
453 let mut arr = [0u8; 64];
454 arr.copy_from_slice(bytes);
455 solana_sdk::signature::Signature::from(arr)
456}
457
458#[inline]
459fn parse_logs(
460 meta: &TransactionStatusMeta,
461 transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
462 logs: &[String],
463 sig: solana_sdk::signature::Signature,
464 slot: u64,
465 tx_idx: u64,
466 block_us: Option<i64>,
467 grpc_us: i64,
468 filter: Option<&EventTypeFilter>,
469) -> Vec<DexEvent> {
470 let needs_pumpfun = filter.map(|f| f.includes_pumpfun()).unwrap_or(true);
471 let has_create = needs_pumpfun && crate::logs::optimized_matcher::detect_pumpfun_create(logs);
472
473 let mut outer_idx: i32 = -1;
474 let mut inner_idx: i32 = -1;
475 let mut invokes: HashMap<&str, Vec<(i32, i32)>> = HashMap::with_capacity(8);
476 let mut result = Vec::with_capacity(4);
477
478 for log in logs {
479 if let Some((pid, depth)) = crate::logs::optimized_matcher::parse_invoke_info(log) {
480 if depth == 1 { inner_idx = -1; outer_idx += 1; } else { inner_idx += 1; }
481 invokes.entry(pid).or_default().push((outer_idx, inner_idx));
482 }
483
484 if PROGRAM_DATA_FINDER.find(log.as_bytes()).is_none() { continue; }
485
486 if let Some(mut e) = crate::logs::parse_log(log, sig, slot, tx_idx, block_us, grpc_us, filter, has_create) {
487 crate::core::account_dispatcher::fill_accounts_from_transaction_data(&mut e, meta, transaction, &invokes);
488 crate::core::common_filler::fill_data(&mut e, meta, transaction, &invokes);
489 result.push(e);
490 }
491 }
492 result
493}
494
495#[inline]
496fn parse_instructions(
497 meta: &TransactionStatusMeta,
498 transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
499 sig: solana_sdk::signature::Signature,
500 slot: u64,
501 tx_idx: u64,
502 block_us: Option<i64>,
503 grpc_us: i64,
504 filter: Option<&EventTypeFilter>,
505) -> Vec<DexEvent> {
506 crate::grpc::instruction_parser::parse_instructions_enhanced(
512 meta,
513 transaction,
514 sig,
515 slot,
516 tx_idx,
517 block_us,
518 grpc_us,
519 filter,
520 )
521}