Skip to main content

sol_parser_sdk/grpc/
client.rs

1//! Yellowstone gRPC 客户端 - 超低延迟 DEX 事件订阅
2//!
3//! 支持多种事件输出模式:
4//! - Unordered: 10-20μs 极低延迟
5//! - MicroBatch: 50-200μs 微批次有序
6//! - StreamingOrdered: 0.1-5ms 流式有序
7//! - Ordered: 1-50ms 完全有序
8
9use super::buffers::{MicroBatchBuffer, SlotBuffer};
10use super::types::*;
11use crate::core::{EventMetadata, now_micros};  // 导入高性能时钟
12use crate::instr::read_pubkey_fast;
13use crate::logs::timestamp_to_microseconds;
14use crate::DexEvent;
15use crossbeam_queue::ArrayQueue;
16use futures::{SinkExt, StreamExt};
17use log::error;
18use memchr::memmem;
19use once_cell::sync::Lazy;
20use std::collections::HashMap;
21use std::sync::Arc;
22use tokio::sync::{mpsc, Mutex};
23use tokio::time::{Duration, Instant};
24use tonic::transport::ClientTlsConfig;
25use yellowstone_grpc_client::GeyserGrpcClient;
26use yellowstone_grpc_proto::prelude::*;
27
/// Lazily built substring searcher for the `Program data: ` marker.
///
/// `parse_logs` uses it to skip log lines that do not contain the marker
/// before handing a line to the log parser.
static PROGRAM_DATA_FINDER: Lazy<memmem::Finder> =
    Lazy::new(|| memmem::Finder::new(b"Program data: "));
30
31// ==================== YellowstoneGrpc 客户端 ====================
32
/// Yellowstone gRPC client that streams parsed DEX events into a queue.
///
/// Cloning is cheap with respect to `control_tx`: all clones share the same
/// `Arc`, so they observe the same control-channel sender.
#[derive(Clone)]
pub struct YellowstoneGrpc {
    /// Endpoint handed to `GeyserGrpcClient::build_from_shared` on connect.
    endpoint: String,
    /// Optional token passed to the client builder via `x_token`.
    token: Option<String>,
    /// Client settings (order mode, timeouts, TLS flag, micro-batch window).
    config: ClientConfig,
    /// Sender used by `update_subscription` to forward new subscribe
    /// requests; `None` until `stream_events` installs one.
    control_tx: Arc<Mutex<Option<mpsc::Sender<SubscribeRequest>>>>,
}
40
41impl YellowstoneGrpc {
42    pub fn new(endpoint: String, token: Option<String>) -> Result<Self, Box<dyn std::error::Error>> {
43        crate::warmup::warmup_parser();
44        Ok(Self {
45            endpoint,
46            token,
47            config: ClientConfig::default(),
48            control_tx: Arc::new(Mutex::new(None)),
49        })
50    }
51
52    pub fn new_with_config(
53        endpoint: String,
54        token: Option<String>,
55        config: ClientConfig,
56    ) -> Result<Self, Box<dyn std::error::Error>> {
57        crate::warmup::warmup_parser();
58        Ok(Self { endpoint, token, config, control_tx: Arc::new(Mutex::new(None)) })
59    }
60
61    /// 订阅 DEX 事件(自动重连)
62    pub async fn subscribe_dex_events(
63        &self,
64        transaction_filters: Vec<TransactionFilter>,
65        account_filters: Vec<AccountFilter>,
66        event_type_filter: Option<EventTypeFilter>,
67    ) -> Result<Arc<ArrayQueue<DexEvent>>, Box<dyn std::error::Error>> {
68        let queue = Arc::new(ArrayQueue::new(100_000));
69        let queue_clone = Arc::clone(&queue);
70        let self_clone = self.clone();
71
72        tokio::spawn(async move {
73            let mut delay = 1u64;
74            loop {
75                match self_clone.stream_events(&transaction_filters, &account_filters, &event_type_filter, &queue_clone).await {
76                    Ok(_) => delay = 1,
77                    Err(e) => println!("❌ gRPC error: {} - retry in {}s", e, delay),
78                }
79                tokio::time::sleep(Duration::from_secs(delay)).await;
80                delay = (delay * 2).min(60);
81            }
82        });
83
84        Ok(queue)
85    }
86
87    /// 动态更新订阅过滤器
88    pub async fn update_subscription(
89        &self,
90        transaction_filters: Vec<TransactionFilter>,
91        account_filters: Vec<AccountFilter>,
92    ) -> Result<(), Box<dyn std::error::Error>> {
93        let sender = self.control_tx.lock().await
94            .as_ref()
95            .ok_or("No active subscription")?
96            .clone();
97        
98        let request = build_subscribe_request(&transaction_filters, &account_filters);
99        sender.send(request).await.map_err(|e| e.to_string())?;
100        Ok(())
101    }
102
    /// Prints a message announcing that the subscription is stopping.
    ///
    /// NOTE(review): this body only prints; nothing here cancels the task
    /// spawned by `subscribe_dex_events` or clears `control_tx` — confirm
    /// whether shutdown is expected to be handled elsewhere.
    pub async fn stop(&self) {
        println!("🛑 Stopping gRPC subscription...");
    }
106
107    // ==================== 核心事件流处理 ====================
108
109    async fn stream_events(
110        &self,
111        tx_filters: &[TransactionFilter],
112        acc_filters: &[AccountFilter],
113        event_filter: &Option<EventTypeFilter>,
114        queue: &Arc<ArrayQueue<DexEvent>>,
115    ) -> Result<(), String> {
116        let _ = rustls::crypto::ring::default_provider().install_default();
117        
118        // 构建客户端
119        let mut builder = GeyserGrpcClient::build_from_shared(self.endpoint.clone())
120            .map_err(|e| e.to_string())?
121            .x_token(self.token.clone())
122            .map_err(|e| e.to_string())?
123            .max_decoding_message_size(1024 * 1024 * 1024);
124
125        if self.config.connection_timeout_ms > 0 {
126            builder = builder.connect_timeout(Duration::from_millis(self.config.connection_timeout_ms));
127        }
128        if self.config.enable_tls {
129            builder = builder.tls_config(ClientTlsConfig::new().with_native_roots()).map_err(|e| e.to_string())?;
130        }
131
132        let mut client = builder.connect().await.map_err(|e| e.to_string())?;
133        let request = build_subscribe_request(tx_filters, acc_filters);
134        
135        let (subscribe_tx, mut stream) = client
136            .subscribe_with_request(Some(request))
137            .await
138            .map_err(|e| e.to_string())?;
139
140        self.print_mode_info();
141
142        // 设置控制通道
143        let (control_tx, mut control_rx) = mpsc::channel::<SubscribeRequest>(100);
144        *self.control_tx.lock().await = Some(control_tx);
145        let subscribe_tx = Arc::new(Mutex::new(subscribe_tx));
146
147        // 初始化缓冲区
148        let mut slot_buffer = SlotBuffer::new();
149        let mut micro_batch = MicroBatchBuffer::new();
150        let mut last_slot = 0u64;
151
152        let order_mode = self.config.order_mode;
153        let timeout_ms = self.config.order_timeout_ms;
154        let batch_us = self.config.micro_batch_us;
155        let check_interval = Duration::from_millis(timeout_ms / 2);
156        let mut next_check = Instant::now() + check_interval;
157
158        loop {
159            // Periodic timeout check for ordered modes and MicroBatch
160            self.check_timeout(
161                order_mode, &mut slot_buffer, &mut micro_batch, queue,
162                timeout_ms, batch_us, &mut next_check, check_interval
163            );
164
165            tokio::select! {
166                msg = stream.next() => {
167                    match msg {
168                        Some(Ok(update)) => {
169                            self.handle_update(
170                                update, order_mode, event_filter, queue,
171                                &mut slot_buffer, &mut micro_batch, &mut last_slot, batch_us
172                            );
173                        }
174                        Some(Err(e)) => {
175                            error!("Stream error: {:?}", e);
176                            self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
177                            return Err(e.to_string());
178                        }
179                        None => {
180                            self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
181                            return Ok(());
182                        }
183                    }
184                }
185                Some(req) = control_rx.recv() => {
186                    if let Err(e) = subscribe_tx.lock().await.send(req).await {
187                        return Err(e.to_string());
188                    }
189                }
190            }
191        }
192    }
193
194    fn print_mode_info(&self) {
195        match self.config.order_mode {
196            OrderMode::Unordered => println!("✅ Unordered Mode (10-20μs)"),
197            OrderMode::Ordered => println!("✅ Ordered Mode (timeout={}ms)", self.config.order_timeout_ms),
198            OrderMode::StreamingOrdered => println!("✅ StreamingOrdered Mode (timeout={}ms)", self.config.order_timeout_ms),
199            OrderMode::MicroBatch => println!("✅ MicroBatch Mode (window={}μs)", self.config.micro_batch_us),
200        }
201    }
202
203    #[inline]
204    fn check_timeout(
205        &self,
206        mode: OrderMode,
207        slot_buf: &mut SlotBuffer,
208        micro_buf: &mut MicroBatchBuffer,
209        queue: &Arc<ArrayQueue<DexEvent>>,
210        timeout_ms: u64,
211        batch_us: u64,
212        next_check: &mut Instant,
213        interval: Duration,
214    ) {
215        if Instant::now() < *next_check {
216            return;
217        }
218        *next_check = Instant::now() + interval;
219        
220        match mode {
221            OrderMode::Ordered => {
222                if slot_buf.should_timeout(timeout_ms) {
223                    for e in slot_buf.flush_all() { let _ = queue.push(e); }
224                }
225            }
226            OrderMode::StreamingOrdered => {
227                if slot_buf.should_timeout(timeout_ms) {
228                    for e in slot_buf.flush_streaming_timeout() { let _ = queue.push(e); }
229                }
230            }
231            OrderMode::MicroBatch => {
232                // Periodic flush for MicroBatch mode
233                let now_us = get_timestamp_us();
234                if micro_buf.should_flush(now_us, batch_us) {
235                    for e in micro_buf.flush() { let _ = queue.push(e); }
236                }
237            }
238            OrderMode::Unordered => {}
239        }
240    }
241
242    fn flush_on_disconnect(&self, mode: OrderMode, buffer: &mut SlotBuffer, queue: &Arc<ArrayQueue<DexEvent>>) {
243        if matches!(mode, OrderMode::Ordered | OrderMode::StreamingOrdered) {
244            let events = match mode {
245                OrderMode::StreamingOrdered => buffer.flush_streaming_timeout(),
246                _ => buffer.flush_all(),
247            };
248            for e in events { let _ = queue.push(e); }
249        }
250    }
251
252    #[inline]
253    fn handle_update(
254        &self,
255        update_msg: SubscribeUpdate,
256        mode: OrderMode,
257        filter: &Option<EventTypeFilter>,
258        queue: &Arc<ArrayQueue<DexEvent>>,
259        slot_buf: &mut SlotBuffer,
260        micro_buf: &mut MicroBatchBuffer,
261        last_slot: &mut u64,
262        batch_us: u64,
263    ) {
264        let block_time_us = timestamp_to_microseconds(&update_msg.created_at.unwrap_or_default()) as i64;
265        let grpc_recv_us = get_timestamp_us();
266
267        let Some(update) = update_msg.update_oneof else { return };
268
269        match update {
270            subscribe_update::UpdateOneof::Transaction(tx) => {
271                self.handle_transaction(tx, mode, filter, queue, slot_buf, micro_buf, last_slot, batch_us, grpc_recv_us, block_time_us);
272            }
273            subscribe_update::UpdateOneof::Account(acc) => {
274                Self::handle_account(acc, filter, queue, grpc_recv_us, block_time_us);
275            }
276            _ => {}
277        }
278    }
279
280    #[inline]
281    fn handle_transaction(
282        &self,
283        tx: SubscribeUpdateTransaction,
284        mode: OrderMode,
285        filter: &Option<EventTypeFilter>,
286        queue: &Arc<ArrayQueue<DexEvent>>,
287        slot_buf: &mut SlotBuffer,
288        micro_buf: &mut MicroBatchBuffer,
289        last_slot: &mut u64,
290        batch_us: u64,
291        grpc_us: i64,
292        block_us: i64,
293    ) {
294        let slot = tx.slot;
295        
296        match mode {
297            OrderMode::Unordered => {
298                for e in parse_transaction_core(&tx, grpc_us, Some(block_us), filter.as_ref()) {
299                    let _ = queue.push(e);
300                }
301            }
302            OrderMode::Ordered => {
303                if slot > *last_slot && *last_slot > 0 {
304                    for e in slot_buf.flush_before(slot) { let _ = queue.push(e); }
305                }
306                *last_slot = slot;
307                for (idx, e) in parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref()) {
308                    slot_buf.push(slot, idx, e);
309                }
310            }
311            OrderMode::StreamingOrdered => {
312                for (idx, e) in parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref()) {
313                    for evt in slot_buf.push_streaming(slot, idx, e) {
314                        let _ = queue.push(evt);
315                    }
316                }
317            }
318            OrderMode::MicroBatch => {
319                for (idx, e) in parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref()) {
320                    if micro_buf.push(slot, idx, e, grpc_us, batch_us) {
321                        for evt in micro_buf.flush() { let _ = queue.push(evt); }
322                    }
323                }
324            }
325        }
326    }
327
328    #[inline]
329    fn handle_account(
330        acc: SubscribeUpdateAccount,
331        filter: &Option<EventTypeFilter>,
332        queue: &Arc<ArrayQueue<DexEvent>>,
333        grpc_us: i64,
334        block_us: i64,
335    ) {
336        let Some(info) = acc.account else { return };
337        let data = crate::accounts::AccountData {
338            pubkey: read_pubkey_fast(&info.pubkey),
339            executable: info.executable,
340            lamports: info.lamports,
341            owner: read_pubkey_fast(&info.owner),
342            rent_epoch: info.rent_epoch,
343            data: info.data,
344        };
345        let meta = EventMetadata {
346            signature: Default::default(),
347            slot: acc.slot,
348            tx_index: 0,
349            block_time_us: block_us,
350            grpc_recv_us: grpc_us,
351            recent_blockhash: None,
352        };
353        if let Some(e) = crate::accounts::parse_account_unified(&data, meta, filter.as_ref()) {
354            let _ = queue.push(e);
355        }
356    }
357}
358
359// ==================== 辅助函数 ====================
360
/// Returns the current timestamp in microseconds.
///
/// Thin wrapper around `crate::core::now_micros`.
///
/// # Performance
/// Figures below are the original author's notes and are not verifiable
/// from this file:
/// - previous implementation: `libc::clock_gettime`, roughly 1-2μs per call
/// - current implementation: high-performance clock, roughly 10-50ns per call
/// - claimed speed-up: 20-100x
#[inline(always)]
fn get_timestamp_us() -> i64 {
    now_micros()
}
373
374fn build_subscribe_request(tx_filters: &[TransactionFilter], acc_filters: &[AccountFilter]) -> SubscribeRequest {
375    let transactions = tx_filters.iter().enumerate().map(|(i, f)| {
376        (format!("tx_{}", i), SubscribeRequestFilterTransactions {
377            vote: Some(false),
378            failed: Some(false),
379            signature: None,
380            account_include: f.account_include.clone(),
381            account_exclude: f.account_exclude.clone(),
382            account_required: f.account_required.clone(),
383        })
384    }).collect();
385
386    let accounts = acc_filters.iter().enumerate().map(|(i, f)| {
387        (format!("acc_{}", i), SubscribeRequestFilterAccounts {
388            account: f.account.clone(),
389            owner: f.owner.clone(),
390            filters: f.filters.clone(),
391            nonempty_txn_signature: None,
392        })
393    }).collect();
394
395    SubscribeRequest {
396        slots: HashMap::new(),
397        accounts,
398        transactions,
399        transactions_status: HashMap::new(),
400        blocks: HashMap::new(),
401        blocks_meta: HashMap::new(),
402        entry: HashMap::new(),
403        commitment: Some(CommitmentLevel::Processed as i32),
404        accounts_data_slice: Vec::new(),
405        ping: None,
406        from_slot: None,
407    }
408}
409
410// ==================== 交易解析 ====================
411
412#[inline]
413fn parse_transaction_to_vec(
414    tx: &SubscribeUpdateTransaction,
415    grpc_us: i64,
416    block_us: Option<i64>,
417    filter: Option<&EventTypeFilter>,
418) -> Vec<(u64, DexEvent)> {
419    let idx = tx.transaction.as_ref().map(|t| t.index).unwrap_or(0);
420    parse_transaction_core(tx, grpc_us, block_us, filter)
421        .into_iter()
422        .map(|e| (idx, e))
423        .collect()
424}
425
426#[inline]
427fn parse_transaction_core(
428    tx: &SubscribeUpdateTransaction,
429    grpc_us: i64,
430    block_us: Option<i64>,
431    filter: Option<&EventTypeFilter>,
432) -> Vec<DexEvent> {
433    let Some(info) = &tx.transaction else { return Vec::new() };
434    let Some(meta) = &info.meta else { return Vec::new() };
435
436    let sig = extract_signature(&info.signature);
437    let slot = tx.slot;
438    let idx = info.index;
439
440    // 并行解析 logs 和 instructions
441    let (log_events, instr_events) = rayon::join(
442        || parse_logs(meta, &info.transaction, &meta.log_messages, sig, slot, idx, block_us, grpc_us, filter),
443        || parse_instructions(meta, &info.transaction, sig, slot, idx, block_us, grpc_us, filter),
444    );
445
446    let mut result = Vec::with_capacity(log_events.len() + instr_events.len());
447    result.extend(log_events);
448    result.extend(instr_events);
449    result
450}
451
452#[inline(always)]
453fn extract_signature(bytes: &[u8]) -> solana_sdk::signature::Signature {
454    let mut arr = [0u8; 64];
455    arr.copy_from_slice(bytes);
456    solana_sdk::signature::Signature::from(arr)
457}
458
459#[inline]
460fn parse_logs(
461    meta: &TransactionStatusMeta,
462    transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
463    logs: &[String],
464    sig: solana_sdk::signature::Signature,
465    slot: u64,
466    tx_idx: u64,
467    block_us: Option<i64>,
468    grpc_us: i64,
469    filter: Option<&EventTypeFilter>,
470) -> Vec<DexEvent> {
471    let recent_blockhash = transaction
472        .as_ref()
473        .and_then(|t| t.message.as_ref())
474        .and_then(|m| {
475            if m.recent_blockhash.is_empty() {
476                None
477            } else {
478                Some(m.recent_blockhash.clone())
479            }
480        });
481
482    let needs_pumpfun = filter.map(|f| f.includes_pumpfun()).unwrap_or(true);
483    let has_create = needs_pumpfun && crate::logs::optimized_matcher::detect_pumpfun_create(logs);
484
485    let mut outer_idx: i32 = -1;
486    let mut inner_idx: i32 = -1;
487    let mut invokes: HashMap<&str, Vec<(i32, i32)>> = HashMap::with_capacity(8);
488    let mut result = Vec::with_capacity(4);
489
490    for log in logs {
491        if let Some((pid, depth)) = crate::logs::optimized_matcher::parse_invoke_info(log) {
492            if depth == 1 { inner_idx = -1; outer_idx += 1; } else { inner_idx += 1; }
493            invokes.entry(pid).or_default().push((outer_idx, inner_idx));
494        }
495
496        if PROGRAM_DATA_FINDER.find(log.as_bytes()).is_none() { continue; }
497
498        if let Some(mut e) = crate::logs::parse_log(log, sig, slot, tx_idx, block_us, grpc_us, filter, has_create, recent_blockhash.as_deref()) {
499            crate::core::account_dispatcher::fill_accounts_from_transaction_data(&mut e, meta, transaction, &invokes);
500            crate::core::common_filler::fill_data(&mut e, meta, transaction, &invokes);
501            result.push(e);
502        }
503    }
504    result
505}
506
/// Parses DEX events out of a transaction's instructions.
///
/// Forwards all arguments unchanged to
/// `crate::grpc::instruction_parser::parse_instructions_enhanced` and
/// returns its result.
#[inline]
fn parse_instructions(
    meta: &TransactionStatusMeta,
    transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
    sig: solana_sdk::signature::Signature,
    slot: u64,
    tx_idx: u64,
    block_us: Option<i64>,
    grpc_us: i64,
    filter: Option<&EventTypeFilter>,
) -> Vec<DexEvent> {
    // Uses the enhanced instruction parser which, per the original author's
    // notes, supports:
    // - outer instruction parsing (8-byte discriminator)
    // - inner instruction parsing (16-byte discriminator)
    // - automatic event merging (instruction + inner instruction)
    crate::grpc::instruction_parser::parse_instructions_enhanced(
        meta,
        transaction,
        sig,
        slot,
        tx_idx,
        block_us,
        grpc_us,
        filter,
    )
}