Skip to main content

sol_parser_sdk/grpc/
client.rs

1//! Yellowstone gRPC 客户端 - 超低延迟 DEX 事件订阅
2//!
3//! 支持多种事件输出模式:
4//! - Unordered: 10-20μs 极低延迟
5//! - MicroBatch: 50-200μs 微批次有序
6//! - StreamingOrdered: 0.1-5ms 流式有序
7//! - Ordered: 1-50ms 完全有序
8
9use super::buffers::{MicroBatchBuffer, SlotBuffer};
10use super::subscribe_builder::build_subscribe_request;
11use super::transaction_meta::try_yellowstone_signature;
12use super::types::*;
13use crate::core::{now_micros, EventMetadata}; // 导入高性能时钟
14use crate::instr::read_pubkey_fast;
15use crate::logs::timestamp_to_microseconds;
16use crate::DexEvent;
17use crossbeam_queue::ArrayQueue;
18use solana_sdk::pubkey::Pubkey;
19use std::str::FromStr;
20use futures::{SinkExt, StreamExt};
21use log::error;
22use memchr::memmem;
23use once_cell::sync::Lazy;
24use std::collections::HashMap;
25use std::sync::Arc;
26use tokio::sync::{mpsc, Mutex};
27use tokio::time::{Duration, Instant};
28// Note: ClientTlsConfig moved to yellowstone_grpc_client in newer versions
29use yellowstone_grpc_client::{GeyserGrpcClient, ClientTlsConfig};
30use yellowstone_grpc_proto::prelude::*;
31
32static PROGRAM_DATA_FINDER: Lazy<memmem::Finder> =
33    Lazy::new(|| memmem::Finder::new(b"Program data: "));
34
35// ==================== YellowstoneGrpc 客户端 ====================
36
37#[derive(Clone)]
38pub struct YellowstoneGrpc {
39    endpoint: String,
40    token: Option<String>,
41    config: ClientConfig,
42    control_tx: Arc<Mutex<Option<mpsc::Sender<SubscribeRequest>>>>,
43}
44
45impl YellowstoneGrpc {
46    pub fn new(
47        endpoint: String,
48        token: Option<String>,
49    ) -> Result<Self, Box<dyn std::error::Error>> {
50        crate::warmup::warmup_parser();
51        Ok(Self {
52            endpoint,
53            token,
54            config: ClientConfig::default(),
55            control_tx: Arc::new(Mutex::new(None)),
56        })
57    }
58
59    pub fn new_with_config(
60        endpoint: String,
61        token: Option<String>,
62        config: ClientConfig,
63    ) -> Result<Self, Box<dyn std::error::Error>> {
64        crate::warmup::warmup_parser();
65        Ok(Self { endpoint, token, config, control_tx: Arc::new(Mutex::new(None)) })
66    }
67
68    /// 订阅 DEX 事件(自动重连)
69    pub async fn subscribe_dex_events(
70        &self,
71        transaction_filters: Vec<TransactionFilter>,
72        account_filters: Vec<AccountFilter>,
73        event_type_filter: Option<EventTypeFilter>,
74    ) -> Result<Arc<ArrayQueue<DexEvent>>, Box<dyn std::error::Error>> {
75        let queue = Arc::new(ArrayQueue::new(100_000));
76        let queue_clone = Arc::clone(&queue);
77        let self_clone = self.clone();
78
79        tokio::spawn(async move {
80            let mut delay = 1u64;
81            loop {
82                match self_clone
83                    .stream_events(
84                        &transaction_filters,
85                        &account_filters,
86                        &event_type_filter,
87                        &queue_clone,
88                    )
89                    .await
90                {
91                    Ok(_) => delay = 1,
92                    Err(e) => println!("❌ gRPC error: {} - retry in {}s", e, delay),
93                }
94                tokio::time::sleep(Duration::from_secs(delay)).await;
95                delay = (delay * 2).min(60);
96            }
97        });
98
99        Ok(queue)
100    }
101
102    /// 动态更新订阅过滤器
103    pub async fn update_subscription(
104        &self,
105        transaction_filters: Vec<TransactionFilter>,
106        account_filters: Vec<AccountFilter>,
107    ) -> Result<(), Box<dyn std::error::Error>> {
108        let sender = self.control_tx.lock().await.as_ref().ok_or("No active subscription")?.clone();
109
110        let request = build_subscribe_request(&transaction_filters, &account_filters);
111        sender.send(request).await.map_err(|e| e.to_string())?;
112        Ok(())
113    }
114
115    pub async fn stop(&self) {
116        println!("🛑 Stopping gRPC subscription...");
117    }
118
119    // ==================== 核心事件流处理 ====================
120
121    async fn stream_events(
122        &self,
123        tx_filters: &[TransactionFilter],
124        acc_filters: &[AccountFilter],
125        event_filter: &Option<EventTypeFilter>,
126        queue: &Arc<ArrayQueue<DexEvent>>,
127    ) -> Result<(), String> {
128        let _ = rustls::crypto::ring::default_provider().install_default();
129
130        // 构建客户端
131        let mut builder = GeyserGrpcClient::build_from_shared(self.endpoint.clone())
132            .map_err(|e| e.to_string())?
133            .x_token(self.token.clone())
134            .map_err(|e| e.to_string())?
135            .max_decoding_message_size(1024 * 1024 * 1024);
136
137        if self.config.connection_timeout_ms > 0 {
138            builder =
139                builder.connect_timeout(Duration::from_millis(self.config.connection_timeout_ms));
140        }
141        if self.config.enable_tls {
142            builder = builder
143                .tls_config(ClientTlsConfig::new().with_native_roots())
144                .map_err(|e| e.to_string())?;
145        }
146
147        let mut client = builder.connect().await.map_err(|e| e.to_string())?;
148        let request = build_subscribe_request(tx_filters, acc_filters);
149
150        let (subscribe_tx, mut stream) =
151            client.subscribe_with_request(Some(request)).await.map_err(|e| e.to_string())?;
152
153        self.print_mode_info();
154
155        // 设置控制通道
156        let (control_tx, mut control_rx) = mpsc::channel::<SubscribeRequest>(100);
157        *self.control_tx.lock().await = Some(control_tx);
158        let subscribe_tx = Arc::new(Mutex::new(subscribe_tx));
159
160        // 初始化缓冲区
161        let mut slot_buffer = SlotBuffer::new();
162        let mut micro_batch = MicroBatchBuffer::new();
163        let mut last_slot = 0u64;
164
165        let order_mode = self.config.order_mode;
166        let timeout_ms = self.config.order_timeout_ms;
167        let batch_us = self.config.micro_batch_us;
168        let check_interval = Duration::from_millis(timeout_ms / 2);
169        let mut next_check = Instant::now() + check_interval;
170
171        loop {
172            // Periodic timeout check for ordered modes and MicroBatch
173            self.check_timeout(
174                order_mode,
175                &mut slot_buffer,
176                &mut micro_batch,
177                queue,
178                timeout_ms,
179                batch_us,
180                &mut next_check,
181                check_interval,
182            );
183
184            tokio::select! {
185                msg = stream.next() => {
186                    match msg {
187                        Some(Ok(update)) => {
188                            // Geyser 会周期性下发 ping;必须在同一 subscribe 流上回写 SubscribeRequest.ping,否则公共节点 / LB 可能 RST_STREAM。
189                            if matches!(
190                                update.update_oneof.as_ref(),
191                                Some(subscribe_update::UpdateOneof::Ping(_))
192                            ) {
193                                if let Err(e) = subscribe_tx
194                                    .lock()
195                                    .await
196                                    .send(SubscribeRequest {
197                                        ping: Some(SubscribeRequestPing { id: 1 }),
198                                        ..Default::default()
199                                    })
200                                    .await
201                                {
202                                    return Err(e.to_string());
203                                }
204                                continue;
205                            }
206                            self.handle_update(
207                                update, order_mode, event_filter, queue,
208                                &mut slot_buffer, &mut micro_batch, &mut last_slot, batch_us
209                            );
210                        }
211                        Some(Err(e)) => {
212                            error!("Stream error: {:?}", e);
213                            self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
214                            return Err(e.to_string());
215                        }
216                        None => {
217                            self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
218                            return Ok(());
219                        }
220                    }
221                }
222                Some(req) = control_rx.recv() => {
223                    if let Err(e) = subscribe_tx.lock().await.send(req).await {
224                        return Err(e.to_string());
225                    }
226                }
227            }
228        }
229    }
230
231    fn print_mode_info(&self) {
232        match self.config.order_mode {
233            OrderMode::Unordered => println!("✅ Unordered Mode (10-20μs)"),
234            OrderMode::Ordered => {
235                println!("✅ Ordered Mode (timeout={}ms)", self.config.order_timeout_ms)
236            }
237            OrderMode::StreamingOrdered => {
238                println!("✅ StreamingOrdered Mode (timeout={}ms)", self.config.order_timeout_ms)
239            }
240            OrderMode::MicroBatch => {
241                println!("✅ MicroBatch Mode (window={}μs)", self.config.micro_batch_us)
242            }
243        }
244    }
245
246    #[inline]
247    fn check_timeout(
248        &self,
249        mode: OrderMode,
250        slot_buf: &mut SlotBuffer,
251        micro_buf: &mut MicroBatchBuffer,
252        queue: &Arc<ArrayQueue<DexEvent>>,
253        timeout_ms: u64,
254        batch_us: u64,
255        next_check: &mut Instant,
256        interval: Duration,
257    ) {
258        if Instant::now() < *next_check {
259            return;
260        }
261        *next_check = Instant::now() + interval;
262
263        match mode {
264            OrderMode::Ordered => {
265                if slot_buf.should_timeout(timeout_ms) {
266                    for e in slot_buf.flush_all() {
267                        let _ = queue.push(e);
268                    }
269                }
270            }
271            OrderMode::StreamingOrdered => {
272                if slot_buf.should_timeout(timeout_ms) {
273                    for e in slot_buf.flush_streaming_timeout() {
274                        let _ = queue.push(e);
275                    }
276                }
277            }
278            OrderMode::MicroBatch => {
279                // Periodic flush for MicroBatch mode
280                let now_us = get_timestamp_us();
281                if micro_buf.should_flush(now_us, batch_us) {
282                    for e in micro_buf.flush() {
283                        let _ = queue.push(e);
284                    }
285                }
286            }
287            OrderMode::Unordered => {}
288        }
289    }
290
291    fn flush_on_disconnect(
292        &self,
293        mode: OrderMode,
294        buffer: &mut SlotBuffer,
295        queue: &Arc<ArrayQueue<DexEvent>>,
296    ) {
297        if matches!(mode, OrderMode::Ordered | OrderMode::StreamingOrdered) {
298            let events = match mode {
299                OrderMode::StreamingOrdered => buffer.flush_streaming_timeout(),
300                _ => buffer.flush_all(),
301            };
302            for e in events {
303                let _ = queue.push(e);
304            }
305        }
306    }
307
308    #[inline]
309    fn handle_update(
310        &self,
311        update_msg: SubscribeUpdate,
312        mode: OrderMode,
313        filter: &Option<EventTypeFilter>,
314        queue: &Arc<ArrayQueue<DexEvent>>,
315        slot_buf: &mut SlotBuffer,
316        micro_buf: &mut MicroBatchBuffer,
317        last_slot: &mut u64,
318        batch_us: u64,
319    ) {
320        let created_at = update_msg.created_at.unwrap_or_default();
321        let block_time_us = timestamp_to_microseconds(created_at.seconds, created_at.nanos) as i64;
322        let grpc_recv_us = get_timestamp_us();
323
324        let Some(update) = update_msg.update_oneof else { return };
325
326        match update {
327            subscribe_update::UpdateOneof::Transaction(tx) => {
328                self.handle_transaction(
329                    tx,
330                    mode,
331                    filter,
332                    queue,
333                    slot_buf,
334                    micro_buf,
335                    last_slot,
336                    batch_us,
337                    grpc_recv_us,
338                    block_time_us,
339                );
340            }
341            subscribe_update::UpdateOneof::Account(acc) => {
342                Self::handle_account(acc, filter, queue, grpc_recv_us, block_time_us);
343            }
344            _ => {}
345        }
346    }
347
348    #[inline]
349    fn handle_transaction(
350        &self,
351        tx: SubscribeUpdateTransaction,
352        mode: OrderMode,
353        filter: &Option<EventTypeFilter>,
354        queue: &Arc<ArrayQueue<DexEvent>>,
355        slot_buf: &mut SlotBuffer,
356        micro_buf: &mut MicroBatchBuffer,
357        last_slot: &mut u64,
358        batch_us: u64,
359        grpc_us: i64,
360        block_us: i64,
361    ) {
362        let slot = tx.slot;
363
364        match mode {
365            OrderMode::Unordered => {
366                for e in parse_transaction_core(&tx, grpc_us, Some(block_us), filter.as_ref()) {
367                    let _ = queue.push(e);
368                }
369            }
370            OrderMode::Ordered => {
371                if slot > *last_slot && *last_slot > 0 {
372                    for e in slot_buf.flush_before(slot) {
373                        let _ = queue.push(e);
374                    }
375                }
376                *last_slot = slot;
377                for (idx, e) in
378                    parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
379                {
380                    slot_buf.push(slot, idx, e);
381                }
382            }
383            OrderMode::StreamingOrdered => {
384                for (idx, e) in
385                    parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
386                {
387                    for evt in slot_buf.push_streaming(slot, idx, e) {
388                        let _ = queue.push(evt);
389                    }
390                }
391            }
392            OrderMode::MicroBatch => {
393                for (idx, e) in
394                    parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
395                {
396                    if micro_buf.push(slot, idx, e, grpc_us, batch_us) {
397                        for evt in micro_buf.flush() {
398                            let _ = queue.push(evt);
399                        }
400                    }
401                }
402            }
403        }
404    }
405
406    #[inline]
407    fn handle_account(
408        acc: SubscribeUpdateAccount,
409        filter: &Option<EventTypeFilter>,
410        queue: &Arc<ArrayQueue<DexEvent>>,
411        grpc_us: i64,
412        block_us: i64,
413    ) {
414        let Some(info) = acc.account else { return };
415        let data = crate::accounts::AccountData {
416            pubkey: read_pubkey_fast(&info.pubkey),
417            executable: info.executable,
418            lamports: info.lamports,
419            owner: read_pubkey_fast(&info.owner),
420            rent_epoch: info.rent_epoch,
421            data: info.data,
422        };
423        let meta = EventMetadata {
424            signature: Default::default(),
425            slot: acc.slot,
426            tx_index: 0,
427            block_time_us: block_us,
428            grpc_recv_us: grpc_us,
429            recent_blockhash: None,
430        };
431        if let Some(e) = crate::accounts::parse_account_unified(&data, meta, filter.as_ref()) {
432            let _ = queue.push(e);
433        }
434    }
435}
436
437// ==================== 辅助函数 ====================
438
439/// 获取当前时间戳(微秒)
440///
441/// 使用高性能时钟,避免系统调用开销
442///
443/// # 性能优势
444/// - 旧实现:使用 libc::clock_gettime,每次调用约 1-2μs
445/// - 新实现:使用高性能时钟,每次调用约 10-50ns
446/// - 性能提升:20-100 倍
447#[inline(always)]
448fn get_timestamp_us() -> i64 {
449    now_micros()
450}
451
452// ==================== 交易解析 ====================
453
454#[inline]
455fn parse_transaction_to_vec(
456    tx: &SubscribeUpdateTransaction,
457    grpc_us: i64,
458    block_us: Option<i64>,
459    filter: Option<&EventTypeFilter>,
460) -> Vec<(u64, DexEvent)> {
461    let idx = tx.transaction.as_ref().map(|t| t.index).unwrap_or(0);
462    parse_transaction_core(tx, grpc_us, block_us, filter).into_iter().map(|e| (idx, e)).collect()
463}
464
465#[inline]
466fn parse_transaction_core(
467    tx: &SubscribeUpdateTransaction,
468    grpc_us: i64,
469    block_us: Option<i64>,
470    filter: Option<&EventTypeFilter>,
471) -> Vec<DexEvent> {
472    let Some(info) = &tx.transaction else { return Vec::new() };
473    let Some(meta) = &info.meta else { return Vec::new() };
474
475    let sig = extract_signature(&info.signature);
476    let slot = tx.slot;
477    let idx = info.index;
478
479    // 并行解析 logs 和 instructions
480    let (log_events, instr_events) = rayon::join(
481        || {
482            parse_logs(
483                meta,
484                &info.transaction,
485                &meta.log_messages,
486                sig,
487                slot,
488                idx,
489                block_us,
490                grpc_us,
491                filter,
492            )
493        },
494        || parse_instructions(meta, &info.transaction, sig, slot, idx, block_us, grpc_us, filter),
495    );
496
497    crate::grpc::log_instr_dedup::dedupe_log_instruction_events(log_events, instr_events)
498}
499
500#[inline(always)]
501fn extract_signature(bytes: &[u8]) -> solana_sdk::signature::Signature {
502    try_yellowstone_signature(bytes).expect("yellowstone signature must be 64 bytes")
503}
504
505#[inline]
506fn parse_logs(
507    meta: &TransactionStatusMeta,
508    transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
509    logs: &[String],
510    sig: solana_sdk::signature::Signature,
511    slot: u64,
512    tx_idx: u64,
513    block_us: Option<i64>,
514    grpc_us: i64,
515    filter: Option<&EventTypeFilter>,
516) -> Vec<DexEvent> {
517    let recent_blockhash = transaction.as_ref().and_then(|t| t.message.as_ref()).and_then(|m| {
518        if m.recent_blockhash.is_empty() {
519            None
520        } else {
521            Some(m.recent_blockhash.clone())
522        }
523    });
524
525    let needs_pumpfun = filter.map(|f| f.includes_pumpfun()).unwrap_or(true);
526    let has_create = needs_pumpfun && crate::logs::optimized_matcher::detect_pumpfun_create(logs);
527
528    let mut outer_idx: i32 = -1;
529    let mut inner_idx: i32 = -1;
530    let mut invokes: HashMap<Pubkey, Vec<(i32, i32)>> = HashMap::with_capacity(8);
531    let mut result = Vec::with_capacity(4);
532
533    for log in logs {
534        if let Some((pid, depth)) = crate::logs::optimized_matcher::parse_invoke_info(log) {
535            if depth == 1 {
536                inner_idx = -1;
537                outer_idx += 1;
538            } else {
539                inner_idx += 1;
540            }
541            if let Ok(pk) = Pubkey::from_str(pid) {
542                invokes.entry(pk).or_default().push((outer_idx, inner_idx));
543            }
544        }
545
546        if PROGRAM_DATA_FINDER.find(log.as_bytes()).is_none() {
547            continue;
548        }
549
550        if let Some(mut e) = crate::logs::parse_log(
551            log,
552            sig,
553            slot,
554            tx_idx,
555            block_us,
556            grpc_us,
557            filter,
558            has_create,
559            recent_blockhash.as_deref(),
560        ) {
561            crate::core::account_dispatcher::fill_accounts_with_owned_keys(&mut e, meta, transaction, &invokes);
562            crate::core::common_filler::fill_data(&mut e, meta, transaction, &invokes);
563            result.push(e);
564        }
565    }
566    result
567}
568
569#[inline]
570fn parse_instructions(
571    meta: &TransactionStatusMeta,
572    transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
573    sig: solana_sdk::signature::Signature,
574    slot: u64,
575    tx_idx: u64,
576    block_us: Option<i64>,
577    grpc_us: i64,
578    filter: Option<&EventTypeFilter>,
579) -> Vec<DexEvent> {
580    // 使用增强的 instruction 解析器
581    // 支持:
582    // - 主指令解析(8字节 discriminator)
583    // - Inner instruction 解析(16字节 discriminator)
584    // - 自动事件合并(instruction + inner instruction)
585    crate::grpc::instruction_parser::parse_instructions_enhanced(
586        meta,
587        transaction,
588        sig,
589        slot,
590        tx_idx,
591        block_us,
592        grpc_us,
593        filter,
594    )
595}