Skip to main content

sol_parser_sdk/grpc/
client.rs

1//! Yellowstone gRPC 客户端 - 超低延迟 DEX 事件订阅
2//!
3//! 支持多种事件输出模式:
4//! - Unordered: 10-20μs 极低延迟
5//! - MicroBatch: 50-200μs 微批次有序
6//! - StreamingOrdered: 0.1-5ms 流式有序
7//! - Ordered: 1-50ms 完全有序
8
9use super::buffers::{MicroBatchBuffer, SlotBuffer};
10use super::subscribe_builder::{
11    build_subscribe_request, build_subscribe_request_with_event_filter,
12};
13use super::transaction_meta::try_yellowstone_signature;
14use super::types::*;
15use crate::core::{now_micros, EventMetadata}; // 导入高性能时钟
16use crate::instr::read_pubkey_fast;
17use crate::logs::timestamp_to_microseconds;
18use crate::DexEvent;
19use crossbeam_queue::ArrayQueue;
20use futures::{SinkExt, StreamExt};
21use log::error;
22use memchr::memmem;
23use once_cell::sync::Lazy;
24use solana_sdk::pubkey::Pubkey;
25use std::collections::HashMap;
26use std::str::FromStr;
27use std::sync::Arc;
28use tokio::sync::{mpsc, Mutex};
29use tokio::time::{Duration, Instant};
30// Note: ClientTlsConfig moved to yellowstone_grpc_client in newer versions
31use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient};
32use yellowstone_grpc_proto::prelude::*;
33
/// Lazily-initialised substring searcher for the `"Program data: "` marker.
///
/// `parse_logs` runs it over every log line to decide whether the line should be
/// handed to the log event parser.
static PROGRAM_DATA_FINDER: Lazy<memmem::Finder> =
    Lazy::new(|| memmem::Finder::new(b"Program data: "));
36
37// ==================== YellowstoneGrpc 客户端 ====================
38
/// Yellowstone gRPC client that streams parsed DEX events into a bounded queue.
///
/// The control-channel slot is held behind an `Arc`, so clones of the client share it.
#[derive(Clone)]
pub struct YellowstoneGrpc {
    /// Endpoint string handed to `GeyserGrpcClient::build_from_shared`.
    endpoint: String,
    /// Optional access token forwarded through `x_token`.
    token: Option<String>,
    /// Connection, buffering and ordering settings read by the streaming code.
    config: ClientConfig,
    /// Sender used by `update_subscription`; `None` until `stream_events` has
    /// opened a stream and installed a channel.
    control_tx: Arc<Mutex<Option<mpsc::Sender<SubscribeRequest>>>>,
}
46
47impl YellowstoneGrpc {
48    pub fn new(
49        endpoint: String,
50        token: Option<String>,
51    ) -> Result<Self, Box<dyn std::error::Error>> {
52        crate::warmup::warmup_parser();
53        Ok(Self {
54            endpoint,
55            token,
56            config: ClientConfig::default(),
57            control_tx: Arc::new(Mutex::new(None)),
58        })
59    }
60
61    pub fn new_with_config(
62        endpoint: String,
63        token: Option<String>,
64        config: ClientConfig,
65    ) -> Result<Self, Box<dyn std::error::Error>> {
66        crate::warmup::warmup_parser();
67        Ok(Self { endpoint, token, config, control_tx: Arc::new(Mutex::new(None)) })
68    }
69
70    /// 订阅 DEX 事件(自动重连)
71    pub async fn subscribe_dex_events(
72        &self,
73        transaction_filters: Vec<TransactionFilter>,
74        account_filters: Vec<AccountFilter>,
75        event_type_filter: Option<EventTypeFilter>,
76    ) -> Result<Arc<ArrayQueue<DexEvent>>, Box<dyn std::error::Error>> {
77        let queue = Arc::new(ArrayQueue::new(self.config.buffer_size.max(1)));
78        let queue_clone = Arc::clone(&queue);
79        let self_clone = self.clone();
80
81        tokio::spawn(async move {
82            let mut delay = 1u64;
83            loop {
84                match self_clone
85                    .stream_events(
86                        &transaction_filters,
87                        &account_filters,
88                        &event_type_filter,
89                        &queue_clone,
90                    )
91                    .await
92                {
93                    Ok(_) => delay = 1,
94                    Err(e) => println!("❌ gRPC error: {} - retry in {}s", e, delay),
95                }
96                tokio::time::sleep(Duration::from_secs(delay)).await;
97                delay = (delay * 2).min(60);
98            }
99        });
100
101        Ok(queue)
102    }
103
104    /// 动态更新订阅过滤器
105    pub async fn update_subscription(
106        &self,
107        transaction_filters: Vec<TransactionFilter>,
108        account_filters: Vec<AccountFilter>,
109    ) -> Result<(), Box<dyn std::error::Error>> {
110        let sender = self.control_tx.lock().await.as_ref().ok_or("No active subscription")?.clone();
111
112        let request = build_subscribe_request(&transaction_filters, &account_filters);
113        sender.send(request).await.map_err(|e| e.to_string())?;
114        Ok(())
115    }
116
    /// Prints a stop notice to stdout.
    ///
    /// NOTE(review): this method only prints. Nothing here signals the background
    /// task spawned by `subscribe_dex_events` or clears `control_tx`, so the
    /// subscription appears to keep running after the call — confirm whether
    /// shutdown is handled elsewhere.
    pub async fn stop(&self) {
        println!("🛑 Stopping gRPC subscription...");
    }
120
121    // ==================== 核心事件流处理 ====================
122
123    async fn stream_events(
124        &self,
125        tx_filters: &[TransactionFilter],
126        acc_filters: &[AccountFilter],
127        event_filter: &Option<EventTypeFilter>,
128        queue: &Arc<ArrayQueue<DexEvent>>,
129    ) -> Result<(), String> {
130        let _ = rustls::crypto::ring::default_provider().install_default();
131
132        // 构建客户端
133        let mut builder = GeyserGrpcClient::build_from_shared(self.endpoint.clone())
134            .map_err(|e| e.to_string())?
135            .x_token(self.token.clone())
136            .map_err(|e| e.to_string())?
137            .max_decoding_message_size(1024 * 1024 * 1024);
138
139        if self.config.connection_timeout_ms > 0 {
140            builder =
141                builder.connect_timeout(Duration::from_millis(self.config.connection_timeout_ms));
142        }
143        if self.config.enable_tls {
144            builder = builder
145                .tls_config(ClientTlsConfig::new().with_native_roots())
146                .map_err(|e| e.to_string())?;
147        }
148
149        let mut client = builder.connect().await.map_err(|e| e.to_string())?;
150        let request = build_subscribe_request_with_event_filter(
151            tx_filters,
152            acc_filters,
153            event_filter.as_ref(),
154            CommitmentLevel::Processed,
155        );
156
157        let (subscribe_tx, mut stream) =
158            client.subscribe_with_request(Some(request)).await.map_err(|e| e.to_string())?;
159
160        self.print_mode_info();
161
162        // 设置控制通道
163        let (control_tx, mut control_rx) = mpsc::channel::<SubscribeRequest>(100);
164        *self.control_tx.lock().await = Some(control_tx);
165        let subscribe_tx = Arc::new(Mutex::new(subscribe_tx));
166
167        // 初始化缓冲区
168        let mut slot_buffer = SlotBuffer::new();
169        let mut micro_batch = MicroBatchBuffer::new();
170        let mut last_slot = 0u64;
171
172        let order_mode = self.config.order_mode;
173        let timeout_ms = self.config.order_timeout_ms;
174        let batch_us = self.config.micro_batch_us;
175        let check_interval = Duration::from_millis(timeout_ms / 2);
176        let mut next_check = Instant::now() + check_interval;
177
178        loop {
179            // Periodic timeout check for ordered modes and MicroBatch
180            self.check_timeout(
181                order_mode,
182                &mut slot_buffer,
183                &mut micro_batch,
184                queue,
185                timeout_ms,
186                batch_us,
187                &mut next_check,
188                check_interval,
189            );
190
191            tokio::select! {
192                msg = stream.next() => {
193                    match msg {
194                        Some(Ok(update)) => {
195                            // Geyser 会周期性下发 ping;必须在同一 subscribe 流上回写 SubscribeRequest.ping,否则公共节点 / LB 可能 RST_STREAM。
196                            if matches!(
197                                update.update_oneof.as_ref(),
198                                Some(subscribe_update::UpdateOneof::Ping(_))
199                            ) {
200                                if let Err(e) = subscribe_tx
201                                    .lock()
202                                    .await
203                                    .send(SubscribeRequest {
204                                        ping: Some(SubscribeRequestPing { id: 1 }),
205                                        ..Default::default()
206                                    })
207                                    .await
208                                {
209                                    return Err(e.to_string());
210                                }
211                                continue;
212                            }
213                            self.handle_update(
214                                update, order_mode, event_filter, queue,
215                                &mut slot_buffer, &mut micro_batch, &mut last_slot, batch_us
216                            );
217                        }
218                        Some(Err(e)) => {
219                            error!("Stream error: {:?}", e);
220                            self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
221                            return Err(e.to_string());
222                        }
223                        None => {
224                            self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
225                            return Ok(());
226                        }
227                    }
228                }
229                Some(req) = control_rx.recv() => {
230                    if let Err(e) = subscribe_tx.lock().await.send(req).await {
231                        return Err(e.to_string());
232                    }
233                }
234            }
235        }
236    }
237
238    fn print_mode_info(&self) {
239        match self.config.order_mode {
240            OrderMode::Unordered => println!("✅ Unordered Mode (10-20μs)"),
241            OrderMode::Ordered => {
242                println!("✅ Ordered Mode (timeout={}ms)", self.config.order_timeout_ms)
243            }
244            OrderMode::StreamingOrdered => {
245                println!("✅ StreamingOrdered Mode (timeout={}ms)", self.config.order_timeout_ms)
246            }
247            OrderMode::MicroBatch => {
248                println!("✅ MicroBatch Mode (window={}μs)", self.config.micro_batch_us)
249            }
250        }
251    }
252
253    #[inline]
254    fn check_timeout(
255        &self,
256        mode: OrderMode,
257        slot_buf: &mut SlotBuffer,
258        micro_buf: &mut MicroBatchBuffer,
259        queue: &Arc<ArrayQueue<DexEvent>>,
260        timeout_ms: u64,
261        batch_us: u64,
262        next_check: &mut Instant,
263        interval: Duration,
264    ) {
265        if Instant::now() < *next_check {
266            return;
267        }
268        *next_check = Instant::now() + interval;
269
270        match mode {
271            OrderMode::Ordered => {
272                if slot_buf.should_timeout(timeout_ms) {
273                    for e in slot_buf.flush_all() {
274                        let _ = queue.push(e);
275                    }
276                }
277            }
278            OrderMode::StreamingOrdered => {
279                if slot_buf.should_timeout(timeout_ms) {
280                    for e in slot_buf.flush_streaming_timeout() {
281                        let _ = queue.push(e);
282                    }
283                }
284            }
285            OrderMode::MicroBatch => {
286                // Periodic flush for MicroBatch mode
287                let now_us = get_timestamp_us();
288                if micro_buf.should_flush(now_us, batch_us) {
289                    for e in micro_buf.flush() {
290                        let _ = queue.push(e);
291                    }
292                }
293            }
294            OrderMode::Unordered => {}
295        }
296    }
297
298    fn flush_on_disconnect(
299        &self,
300        mode: OrderMode,
301        buffer: &mut SlotBuffer,
302        queue: &Arc<ArrayQueue<DexEvent>>,
303    ) {
304        if matches!(mode, OrderMode::Ordered | OrderMode::StreamingOrdered) {
305            let events = match mode {
306                OrderMode::StreamingOrdered => buffer.flush_streaming_timeout(),
307                _ => buffer.flush_all(),
308            };
309            for e in events {
310                let _ = queue.push(e);
311            }
312        }
313    }
314
315    #[inline]
316    fn handle_update(
317        &self,
318        update_msg: SubscribeUpdate,
319        mode: OrderMode,
320        filter: &Option<EventTypeFilter>,
321        queue: &Arc<ArrayQueue<DexEvent>>,
322        slot_buf: &mut SlotBuffer,
323        micro_buf: &mut MicroBatchBuffer,
324        last_slot: &mut u64,
325        batch_us: u64,
326    ) {
327        let created_at = update_msg.created_at.unwrap_or_default();
328        let block_time_us = timestamp_to_microseconds(created_at.seconds, created_at.nanos) as i64;
329        let grpc_recv_us = get_timestamp_us();
330
331        let Some(update) = update_msg.update_oneof else { return };
332
333        match update {
334            subscribe_update::UpdateOneof::Transaction(tx) => {
335                self.handle_transaction(
336                    tx,
337                    mode,
338                    filter,
339                    queue,
340                    slot_buf,
341                    micro_buf,
342                    last_slot,
343                    batch_us,
344                    grpc_recv_us,
345                    block_time_us,
346                );
347            }
348            subscribe_update::UpdateOneof::Account(acc) => {
349                Self::handle_account(acc, filter, queue, grpc_recv_us, block_time_us);
350            }
351            subscribe_update::UpdateOneof::BlockMeta(block_meta) => {
352                Self::handle_block_meta(block_meta, filter, queue, grpc_recv_us, block_time_us);
353            }
354            _ => {}
355        }
356    }
357
358    #[inline]
359    fn handle_transaction(
360        &self,
361        tx: SubscribeUpdateTransaction,
362        mode: OrderMode,
363        filter: &Option<EventTypeFilter>,
364        queue: &Arc<ArrayQueue<DexEvent>>,
365        slot_buf: &mut SlotBuffer,
366        micro_buf: &mut MicroBatchBuffer,
367        last_slot: &mut u64,
368        batch_us: u64,
369        grpc_us: i64,
370        block_us: i64,
371    ) {
372        let slot = tx.slot;
373
374        match mode {
375            OrderMode::Unordered => {
376                for e in crate::grpc::parse_subscribe_update_transaction_low_latency(
377                    &tx,
378                    grpc_us,
379                    Some(block_us),
380                    filter.as_ref(),
381                ) {
382                    let _ = queue.push(e);
383                }
384            }
385            OrderMode::Ordered => {
386                if slot > *last_slot && *last_slot > 0 {
387                    for e in slot_buf.flush_before(slot) {
388                        let _ = queue.push(e);
389                    }
390                }
391                *last_slot = slot;
392                for (idx, e) in
393                    parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
394                {
395                    slot_buf.push(slot, idx, e);
396                }
397            }
398            OrderMode::StreamingOrdered => {
399                for (idx, e) in
400                    parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
401                {
402                    for evt in slot_buf.push_streaming(slot, idx, e) {
403                        let _ = queue.push(evt);
404                    }
405                }
406            }
407            OrderMode::MicroBatch => {
408                for (idx, e) in
409                    parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
410                {
411                    if micro_buf.push(slot, idx, e, grpc_us, batch_us) {
412                        for evt in micro_buf.flush() {
413                            let _ = queue.push(evt);
414                        }
415                    }
416                }
417            }
418        }
419    }
420
421    #[inline]
422    fn handle_account(
423        acc: SubscribeUpdateAccount,
424        filter: &Option<EventTypeFilter>,
425        queue: &Arc<ArrayQueue<DexEvent>>,
426        grpc_us: i64,
427        block_us: i64,
428    ) {
429        let Some(info) = acc.account else { return };
430        let data = crate::accounts::AccountData {
431            pubkey: read_pubkey_fast(&info.pubkey),
432            executable: info.executable,
433            lamports: info.lamports,
434            owner: read_pubkey_fast(&info.owner),
435            rent_epoch: info.rent_epoch,
436            data: info.data,
437        };
438        let meta = EventMetadata {
439            signature: Default::default(),
440            slot: acc.slot,
441            tx_index: 0,
442            block_time_us: block_us,
443            grpc_recv_us: grpc_us,
444            recent_blockhash: None,
445        };
446        if let Some(e) = crate::accounts::parse_account_unified(&data, meta, filter.as_ref()) {
447            let _ = queue.push(e);
448        }
449    }
450
451    #[inline]
452    fn handle_block_meta(
453        block_meta: SubscribeUpdateBlockMeta,
454        filter: &Option<EventTypeFilter>,
455        queue: &Arc<ArrayQueue<DexEvent>>,
456        grpc_us: i64,
457        fallback_block_us: i64,
458    ) {
459        let block_time_us = block_meta
460            .block_time
461            .as_ref()
462            .map(|t| t.timestamp.saturating_mul(1_000_000))
463            .unwrap_or(fallback_block_us);
464        let event = DexEvent::BlockMeta(crate::core::events::BlockMetaEvent {
465            metadata: EventMetadata {
466                signature: Default::default(),
467                slot: block_meta.slot,
468                tx_index: 0,
469                block_time_us,
470                grpc_recv_us: grpc_us,
471                recent_blockhash: (!block_meta.blockhash.is_empty())
472                    .then_some(block_meta.blockhash),
473            },
474        });
475        if filter.as_ref().map(|f| f.should_include_dex_event(&event)).unwrap_or(true) {
476            let _ = queue.push(event);
477        }
478    }
479}
480
481// ==================== 辅助函数 ====================
482
/// Returns the current timestamp in microseconds.
///
/// Thin wrapper around `crate::core::now_micros`, described by the original
/// author as a high-performance clock that avoids system-call overhead.
///
/// # Performance notes (original author's figures, not verified here)
/// - Old implementation: `libc::clock_gettime`, roughly 1-2μs per call.
/// - New implementation: high-performance clock, roughly 10-50ns per call.
/// - Claimed speed-up: 20-100x.
#[inline(always)]
fn get_timestamp_us() -> i64 {
    now_micros()
}
495
496// ==================== 交易解析 ====================
497
498#[inline]
499fn parse_transaction_to_vec(
500    tx: &SubscribeUpdateTransaction,
501    grpc_us: i64,
502    block_us: Option<i64>,
503    filter: Option<&EventTypeFilter>,
504) -> Vec<(u64, DexEvent)> {
505    let idx = tx.transaction.as_ref().map(|t| t.index).unwrap_or(0);
506    parse_transaction_core(tx, grpc_us, block_us, filter).into_iter().map(|e| (idx, e)).collect()
507}
508
509#[inline]
510fn parse_transaction_core(
511    tx: &SubscribeUpdateTransaction,
512    grpc_us: i64,
513    block_us: Option<i64>,
514    filter: Option<&EventTypeFilter>,
515) -> Vec<DexEvent> {
516    let Some(info) = &tx.transaction else { return Vec::new() };
517    let Some(meta) = &info.meta else { return Vec::new() };
518
519    let sig = extract_signature(&info.signature);
520    let slot = tx.slot;
521    let idx = info.index;
522
523    let log_events = parse_logs(
524        meta,
525        &info.transaction,
526        &meta.log_messages,
527        sig,
528        slot,
529        idx,
530        block_us,
531        grpc_us,
532        filter,
533    );
534    let instr_events =
535        parse_instructions(meta, &info.transaction, sig, slot, idx, block_us, grpc_us, filter);
536
537    let events =
538        crate::grpc::log_instr_dedup::dedupe_log_instruction_events(log_events, instr_events);
539    if let Some(filter) = filter {
540        events.into_iter().map(|e| filter.normalize_dex_event(e)).collect()
541    } else {
542        events
543    }
544}
545
/// Converts raw signature bytes from a Yellowstone update into a `Signature`.
///
/// # Panics
/// Panics with "yellowstone signature must be 64 bytes" when
/// `try_yellowstone_signature` rejects `bytes`.
#[inline(always)]
fn extract_signature(bytes: &[u8]) -> solana_sdk::signature::Signature {
    try_yellowstone_signature(bytes).expect("yellowstone signature must be 64 bytes")
}
550
/// Extracts DEX events from a transaction's log messages.
///
/// Walks `logs` in order while tracking which program is currently executing,
/// parses every line containing the `"Program data: "` marker, and enriches
/// each parsed event with account and data information.
///
/// # Parameters
/// - `meta` / `transaction`: passed to the account and data fillers; the
///   transaction message also supplies the recent blockhash.
/// - `logs`: the log lines to scan.
/// - `sig`, `slot`, `tx_idx`, `block_us`, `grpc_us`: forwarded to the log parser.
/// - `filter`: optional event filter; also decides whether the PumpFun create
///   detection runs.
///
/// # Returns
/// The parsed events in log order.
#[inline]
fn parse_logs(
    meta: &TransactionStatusMeta,
    transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
    logs: &[String],
    sig: solana_sdk::signature::Signature,
    slot: u64,
    tx_idx: u64,
    block_us: Option<i64>,
    grpc_us: i64,
    filter: Option<&EventTypeFilter>,
) -> Vec<DexEvent> {
    // An empty blockhash in the message is treated as "not available".
    let recent_blockhash = transaction.as_ref().and_then(|t| t.message.as_ref()).and_then(|m| {
        if m.recent_blockhash.is_empty() {
            None
        } else {
            Some(m.recent_blockhash.clone())
        }
    });

    // Without a filter PumpFun is assumed to be wanted; the create detection is
    // skipped entirely when the filter excludes PumpFun.
    let needs_pumpfun = filter.map(|f| f.includes_pumpfun()).unwrap_or(true);
    let has_create = needs_pumpfun && crate::logs::optimized_matcher::detect_pumpfun_create(logs);

    // Both counters start at -1 so that the first increment yields index 0.
    let mut outer_idx: i32 = -1;
    let mut inner_idx: i32 = -1;
    // Program id -> every (outer, inner) position at which it was invoked.
    let mut invokes: HashMap<Pubkey, Vec<(i32, i32)>> = HashMap::with_capacity(8);
    // Programs currently executing; the last entry is the innermost one.
    let mut active_program_stack: Vec<Pubkey> = Vec::with_capacity(8);
    let mut result = Vec::with_capacity(4);

    for log in logs {
        // Invocation line: advance the counters. Depth 1 starts a new outer
        // instruction and resets the inner counter; any other depth counts as
        // one more inner instruction.
        if let Some((pid, depth)) = crate::logs::optimized_matcher::parse_invoke_info(log) {
            if depth == 1 {
                inner_idx = -1;
                outer_idx += 1;
            } else {
                inner_idx += 1;
            }
            // The counters above advance even when the program id fails to parse;
            // only the stack and the invoke map require a valid key.
            if let Ok(pk) = Pubkey::from_str(pid) {
                // Discard frames at or below this depth before pushing the new one.
                active_program_stack.truncate(depth.saturating_sub(1));
                active_program_stack.push(pk);
                invokes.entry(pk).or_default().push((outer_idx, inner_idx));
            }
        }

        // Event payload line: parse it in the context of the innermost active program.
        if PROGRAM_DATA_FINDER.find(log.as_bytes()).is_some() {
            let current_program = active_program_stack.last();
            if let Some(mut e) = crate::logs::parse_log_with_program_id(
                log,
                sig,
                slot,
                tx_idx,
                block_us,
                grpc_us,
                filter,
                has_create,
                recent_blockhash.as_deref(),
                current_program,
            ) {
                // Enrich the event using the invocations recorded so far.
                crate::core::account_dispatcher::fill_accounts_with_owned_keys(
                    &mut e,
                    meta,
                    transaction,
                    &invokes,
                );
                crate::core::common_filler::fill_data(&mut e, meta, transaction, &invokes);
                result.push(e);
            }
        }

        // Completion line: drop that program's most recent frame and everything above it.
        if let Some(pid) = crate::logs::optimized_matcher::parse_program_complete_info(log) {
            if let Ok(pk) = Pubkey::from_str(pid) {
                if let Some(pos) = active_program_stack.iter().rposition(|active| *active == pk) {
                    active_program_stack.truncate(pos);
                }
            }
        }
    }
    result
}
630
/// Extracts DEX events from a transaction's instructions.
///
/// Pure delegation: every argument is forwarded unchanged to
/// `crate::grpc::instruction_parser::parse_instructions_enhanced`.
#[inline]
fn parse_instructions(
    meta: &TransactionStatusMeta,
    transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
    sig: solana_sdk::signature::Signature,
    slot: u64,
    tx_idx: u64,
    block_us: Option<i64>,
    grpc_us: i64,
    filter: Option<&EventTypeFilter>,
) -> Vec<DexEvent> {
    // Use the enhanced instruction parser.
    // It supports:
    // - main instruction parsing (8-byte discriminator)
    // - inner instruction parsing (16-byte discriminator)
    // - automatic event merging (instruction + inner instruction)
    crate::grpc::instruction_parser::parse_instructions_enhanced(
        meta,
        transaction,
        sig,
        slot,
        tx_idx,
        block_us,
        grpc_us,
        filter,
    )
}