Skip to main content

sol_parser_sdk/grpc/
client.rs

1//! Yellowstone gRPC 客户端 - 超低延迟 DEX 事件订阅
2//!
3//! 支持多种事件输出模式:
4//! - Unordered: 10-20μs 极低延迟
5//! - MicroBatch: 50-200μs 微批次有序
6//! - StreamingOrdered: 0.1-5ms 流式有序
7//! - Ordered: 1-50ms 完全有序
8
9use super::buffers::{MicroBatchBuffer, SlotBuffer};
10use super::types::*;
11use crate::core::{EventMetadata, now_micros};  // 导入高性能时钟
12use crate::instr::read_pubkey_fast;
13use crate::logs::timestamp_to_microseconds;
14use crate::DexEvent;
15use crossbeam_queue::ArrayQueue;
16use futures::{SinkExt, StreamExt};
17use log::error;
18use memchr::memmem;
19use once_cell::sync::Lazy;
20use std::collections::HashMap;
21use std::sync::Arc;
22use tokio::sync::{mpsc, Mutex};
23use tokio::time::{Duration, Instant};
24use tonic::transport::ClientTlsConfig;
25use yellowstone_grpc_client::GeyserGrpcClient;
26use yellowstone_grpc_proto::prelude::*;
27
28static PROGRAM_DATA_FINDER: Lazy<memmem::Finder> =
29    Lazy::new(|| memmem::Finder::new(b"Program data: "));
30
31// ==================== YellowstoneGrpc 客户端 ====================
32
33#[derive(Clone)]
34pub struct YellowstoneGrpc {
35    endpoint: String,
36    token: Option<String>,
37    config: ClientConfig,
38    control_tx: Arc<Mutex<Option<mpsc::Sender<SubscribeRequest>>>>,
39}
40
41impl YellowstoneGrpc {
42    pub fn new(endpoint: String, token: Option<String>) -> Result<Self, Box<dyn std::error::Error>> {
43        crate::warmup::warmup_parser();
44        Ok(Self {
45            endpoint,
46            token,
47            config: ClientConfig::default(),
48            control_tx: Arc::new(Mutex::new(None)),
49        })
50    }
51
52    pub fn new_with_config(
53        endpoint: String,
54        token: Option<String>,
55        config: ClientConfig,
56    ) -> Result<Self, Box<dyn std::error::Error>> {
57        crate::warmup::warmup_parser();
58        Ok(Self { endpoint, token, config, control_tx: Arc::new(Mutex::new(None)) })
59    }
60
61    /// 订阅 DEX 事件(自动重连)
62    pub async fn subscribe_dex_events(
63        &self,
64        transaction_filters: Vec<TransactionFilter>,
65        account_filters: Vec<AccountFilter>,
66        event_type_filter: Option<EventTypeFilter>,
67    ) -> Result<Arc<ArrayQueue<DexEvent>>, Box<dyn std::error::Error>> {
68        let queue = Arc::new(ArrayQueue::new(100_000));
69        let queue_clone = Arc::clone(&queue);
70        let self_clone = self.clone();
71
72        tokio::spawn(async move {
73            let mut delay = 1u64;
74            loop {
75                match self_clone.stream_events(&transaction_filters, &account_filters, &event_type_filter, &queue_clone).await {
76                    Ok(_) => delay = 1,
77                    Err(e) => println!("❌ gRPC error: {} - retry in {}s", e, delay),
78                }
79                tokio::time::sleep(Duration::from_secs(delay)).await;
80                delay = (delay * 2).min(60);
81            }
82        });
83
84        Ok(queue)
85    }
86
87    /// 动态更新订阅过滤器
88    pub async fn update_subscription(
89        &self,
90        transaction_filters: Vec<TransactionFilter>,
91        account_filters: Vec<AccountFilter>,
92    ) -> Result<(), Box<dyn std::error::Error>> {
93        let sender = self.control_tx.lock().await
94            .as_ref()
95            .ok_or("No active subscription")?
96            .clone();
97        
98        let request = build_subscribe_request(&transaction_filters, &account_filters);
99        sender.send(request).await.map_err(|e| e.to_string())?;
100        Ok(())
101    }
102
103    pub async fn stop(&self) {
104        println!("🛑 Stopping gRPC subscription...");
105    }
106
107    // ==================== 核心事件流处理 ====================
108
109    async fn stream_events(
110        &self,
111        tx_filters: &[TransactionFilter],
112        acc_filters: &[AccountFilter],
113        event_filter: &Option<EventTypeFilter>,
114        queue: &Arc<ArrayQueue<DexEvent>>,
115    ) -> Result<(), String> {
116        let _ = rustls::crypto::ring::default_provider().install_default();
117        
118        // 构建客户端
119        let mut builder = GeyserGrpcClient::build_from_shared(self.endpoint.clone())
120            .map_err(|e| e.to_string())?
121            .x_token(self.token.clone())
122            .map_err(|e| e.to_string())?
123            .max_decoding_message_size(1024 * 1024 * 1024);
124
125        if self.config.connection_timeout_ms > 0 {
126            builder = builder.connect_timeout(Duration::from_millis(self.config.connection_timeout_ms));
127        }
128        if self.config.enable_tls {
129            builder = builder.tls_config(ClientTlsConfig::new().with_native_roots()).map_err(|e| e.to_string())?;
130        }
131
132        let mut client = builder.connect().await.map_err(|e| e.to_string())?;
133        let request = build_subscribe_request(tx_filters, acc_filters);
134        
135        let (subscribe_tx, mut stream) = client
136            .subscribe_with_request(Some(request))
137            .await
138            .map_err(|e| e.to_string())?;
139
140        self.print_mode_info();
141
142        // 设置控制通道
143        let (control_tx, mut control_rx) = mpsc::channel::<SubscribeRequest>(100);
144        *self.control_tx.lock().await = Some(control_tx);
145        let subscribe_tx = Arc::new(Mutex::new(subscribe_tx));
146
147        // 初始化缓冲区
148        let mut slot_buffer = SlotBuffer::new();
149        let mut micro_batch = MicroBatchBuffer::new();
150        let mut last_slot = 0u64;
151
152        let order_mode = self.config.order_mode;
153        let timeout_ms = self.config.order_timeout_ms;
154        let batch_us = self.config.micro_batch_us;
155        let check_interval = Duration::from_millis(timeout_ms / 2);
156        let mut next_check = Instant::now() + check_interval;
157
158        loop {
159            // Periodic timeout check for ordered modes and MicroBatch
160            self.check_timeout(
161                order_mode, &mut slot_buffer, &mut micro_batch, queue,
162                timeout_ms, batch_us, &mut next_check, check_interval
163            );
164
165            tokio::select! {
166                msg = stream.next() => {
167                    match msg {
168                        Some(Ok(update)) => {
169                            self.handle_update(
170                                update, order_mode, event_filter, queue,
171                                &mut slot_buffer, &mut micro_batch, &mut last_slot, batch_us
172                            );
173                        }
174                        Some(Err(e)) => {
175                            error!("Stream error: {:?}", e);
176                            self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
177                            return Err(e.to_string());
178                        }
179                        None => {
180                            self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
181                            return Ok(());
182                        }
183                    }
184                }
185                Some(req) = control_rx.recv() => {
186                    if let Err(e) = subscribe_tx.lock().await.send(req).await {
187                        return Err(e.to_string());
188                    }
189                }
190            }
191        }
192    }
193
194    fn print_mode_info(&self) {
195        match self.config.order_mode {
196            OrderMode::Unordered => println!("✅ Unordered Mode (10-20μs)"),
197            OrderMode::Ordered => println!("✅ Ordered Mode (timeout={}ms)", self.config.order_timeout_ms),
198            OrderMode::StreamingOrdered => println!("✅ StreamingOrdered Mode (timeout={}ms)", self.config.order_timeout_ms),
199            OrderMode::MicroBatch => println!("✅ MicroBatch Mode (window={}μs)", self.config.micro_batch_us),
200        }
201    }
202
203    #[inline]
204    fn check_timeout(
205        &self,
206        mode: OrderMode,
207        slot_buf: &mut SlotBuffer,
208        micro_buf: &mut MicroBatchBuffer,
209        queue: &Arc<ArrayQueue<DexEvent>>,
210        timeout_ms: u64,
211        batch_us: u64,
212        next_check: &mut Instant,
213        interval: Duration,
214    ) {
215        if Instant::now() < *next_check {
216            return;
217        }
218        *next_check = Instant::now() + interval;
219        
220        match mode {
221            OrderMode::Ordered => {
222                if slot_buf.should_timeout(timeout_ms) {
223                    for e in slot_buf.flush_all() { let _ = queue.push(e); }
224                }
225            }
226            OrderMode::StreamingOrdered => {
227                if slot_buf.should_timeout(timeout_ms) {
228                    for e in slot_buf.flush_streaming_timeout() { let _ = queue.push(e); }
229                }
230            }
231            OrderMode::MicroBatch => {
232                // Periodic flush for MicroBatch mode
233                let now_us = get_timestamp_us();
234                if micro_buf.should_flush(now_us, batch_us) {
235                    for e in micro_buf.flush() { let _ = queue.push(e); }
236                }
237            }
238            OrderMode::Unordered => {}
239        }
240    }
241
242    fn flush_on_disconnect(&self, mode: OrderMode, buffer: &mut SlotBuffer, queue: &Arc<ArrayQueue<DexEvent>>) {
243        if matches!(mode, OrderMode::Ordered | OrderMode::StreamingOrdered) {
244            let events = match mode {
245                OrderMode::StreamingOrdered => buffer.flush_streaming_timeout(),
246                _ => buffer.flush_all(),
247            };
248            for e in events { let _ = queue.push(e); }
249        }
250    }
251
252    #[inline]
253    fn handle_update(
254        &self,
255        update_msg: SubscribeUpdate,
256        mode: OrderMode,
257        filter: &Option<EventTypeFilter>,
258        queue: &Arc<ArrayQueue<DexEvent>>,
259        slot_buf: &mut SlotBuffer,
260        micro_buf: &mut MicroBatchBuffer,
261        last_slot: &mut u64,
262        batch_us: u64,
263    ) {
264        let block_time_us = timestamp_to_microseconds(&update_msg.created_at.unwrap_or_default()) as i64;
265        let grpc_recv_us = get_timestamp_us();
266
267        let Some(update) = update_msg.update_oneof else { return };
268
269        match update {
270            subscribe_update::UpdateOneof::Transaction(tx) => {
271                self.handle_transaction(tx, mode, filter, queue, slot_buf, micro_buf, last_slot, batch_us, grpc_recv_us, block_time_us);
272            }
273            subscribe_update::UpdateOneof::Account(acc) => {
274                Self::handle_account(acc, filter, queue, grpc_recv_us, block_time_us);
275            }
276            _ => {}
277        }
278    }
279
280    #[inline]
281    fn handle_transaction(
282        &self,
283        tx: SubscribeUpdateTransaction,
284        mode: OrderMode,
285        filter: &Option<EventTypeFilter>,
286        queue: &Arc<ArrayQueue<DexEvent>>,
287        slot_buf: &mut SlotBuffer,
288        micro_buf: &mut MicroBatchBuffer,
289        last_slot: &mut u64,
290        batch_us: u64,
291        grpc_us: i64,
292        block_us: i64,
293    ) {
294        let slot = tx.slot;
295        
296        match mode {
297            OrderMode::Unordered => {
298                for e in parse_transaction_core(&tx, grpc_us, Some(block_us), filter.as_ref()) {
299                    let _ = queue.push(e);
300                }
301            }
302            OrderMode::Ordered => {
303                if slot > *last_slot && *last_slot > 0 {
304                    for e in slot_buf.flush_before(slot) { let _ = queue.push(e); }
305                }
306                *last_slot = slot;
307                for (idx, e) in parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref()) {
308                    slot_buf.push(slot, idx, e);
309                }
310            }
311            OrderMode::StreamingOrdered => {
312                for (idx, e) in parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref()) {
313                    for evt in slot_buf.push_streaming(slot, idx, e) {
314                        let _ = queue.push(evt);
315                    }
316                }
317            }
318            OrderMode::MicroBatch => {
319                for (idx, e) in parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref()) {
320                    if micro_buf.push(slot, idx, e, grpc_us, batch_us) {
321                        for evt in micro_buf.flush() { let _ = queue.push(evt); }
322                    }
323                }
324            }
325        }
326    }
327
328    #[inline]
329    fn handle_account(
330        acc: SubscribeUpdateAccount,
331        filter: &Option<EventTypeFilter>,
332        queue: &Arc<ArrayQueue<DexEvent>>,
333        grpc_us: i64,
334        block_us: i64,
335    ) {
336        let Some(info) = acc.account else { return };
337        let data = crate::accounts::AccountData {
338            pubkey: read_pubkey_fast(&info.pubkey),
339            executable: info.executable,
340            lamports: info.lamports,
341            owner: read_pubkey_fast(&info.owner),
342            rent_epoch: info.rent_epoch,
343            data: info.data,
344        };
345        let meta = EventMetadata {
346            signature: Default::default(),
347            slot: acc.slot,
348            tx_index: 0,
349            block_time_us: block_us,
350            grpc_recv_us: grpc_us,
351        };
352        if let Some(e) = crate::accounts::parse_account_unified(&data, meta, filter.as_ref()) {
353            let _ = queue.push(e);
354        }
355    }
356}
357
358// ==================== 辅助函数 ====================
359
360/// 获取当前时间戳(微秒)
361///
362/// 使用高性能时钟,避免系统调用开销
363///
364/// # 性能优势
365/// - 旧实现:使用 libc::clock_gettime,每次调用约 1-2μs
366/// - 新实现:使用高性能时钟,每次调用约 10-50ns
367/// - 性能提升:20-100 倍
368#[inline(always)]
369fn get_timestamp_us() -> i64 {
370    now_micros()
371}
372
373fn build_subscribe_request(tx_filters: &[TransactionFilter], acc_filters: &[AccountFilter]) -> SubscribeRequest {
374    let transactions = tx_filters.iter().enumerate().map(|(i, f)| {
375        (format!("tx_{}", i), SubscribeRequestFilterTransactions {
376            vote: Some(false),
377            failed: Some(false),
378            signature: None,
379            account_include: f.account_include.clone(),
380            account_exclude: f.account_exclude.clone(),
381            account_required: f.account_required.clone(),
382        })
383    }).collect();
384
385    let accounts = acc_filters.iter().enumerate().map(|(i, f)| {
386        (format!("acc_{}", i), SubscribeRequestFilterAccounts {
387            account: f.account.clone(),
388            owner: f.owner.clone(),
389            filters: f.filters.clone(),
390            nonempty_txn_signature: None,
391        })
392    }).collect();
393
394    SubscribeRequest {
395        slots: HashMap::new(),
396        accounts,
397        transactions,
398        transactions_status: HashMap::new(),
399        blocks: HashMap::new(),
400        blocks_meta: HashMap::new(),
401        entry: HashMap::new(),
402        commitment: Some(CommitmentLevel::Processed as i32),
403        accounts_data_slice: Vec::new(),
404        ping: None,
405        from_slot: None,
406    }
407}
408
409// ==================== 交易解析 ====================
410
411#[inline]
412fn parse_transaction_to_vec(
413    tx: &SubscribeUpdateTransaction,
414    grpc_us: i64,
415    block_us: Option<i64>,
416    filter: Option<&EventTypeFilter>,
417) -> Vec<(u64, DexEvent)> {
418    let idx = tx.transaction.as_ref().map(|t| t.index).unwrap_or(0);
419    parse_transaction_core(tx, grpc_us, block_us, filter)
420        .into_iter()
421        .map(|e| (idx, e))
422        .collect()
423}
424
425#[inline]
426fn parse_transaction_core(
427    tx: &SubscribeUpdateTransaction,
428    grpc_us: i64,
429    block_us: Option<i64>,
430    filter: Option<&EventTypeFilter>,
431) -> Vec<DexEvent> {
432    let Some(info) = &tx.transaction else { return Vec::new() };
433    let Some(meta) = &info.meta else { return Vec::new() };
434
435    let sig = extract_signature(&info.signature);
436    let slot = tx.slot;
437    let idx = info.index;
438
439    // 并行解析 logs 和 instructions
440    let (log_events, instr_events) = rayon::join(
441        || parse_logs(meta, &info.transaction, &meta.log_messages, sig, slot, idx, block_us, grpc_us, filter),
442        || parse_instructions(meta, &info.transaction, sig, slot, idx, block_us, grpc_us, filter),
443    );
444
445    let mut result = Vec::with_capacity(log_events.len() + instr_events.len());
446    result.extend(log_events);
447    result.extend(instr_events);
448    result
449}
450
451#[inline(always)]
452fn extract_signature(bytes: &[u8]) -> solana_sdk::signature::Signature {
453    let mut arr = [0u8; 64];
454    arr.copy_from_slice(bytes);
455    solana_sdk::signature::Signature::from(arr)
456}
457
458#[inline]
459fn parse_logs(
460    meta: &TransactionStatusMeta,
461    transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
462    logs: &[String],
463    sig: solana_sdk::signature::Signature,
464    slot: u64,
465    tx_idx: u64,
466    block_us: Option<i64>,
467    grpc_us: i64,
468    filter: Option<&EventTypeFilter>,
469) -> Vec<DexEvent> {
470    let needs_pumpfun = filter.map(|f| f.includes_pumpfun()).unwrap_or(true);
471    let has_create = needs_pumpfun && crate::logs::optimized_matcher::detect_pumpfun_create(logs);
472
473    let mut outer_idx: i32 = -1;
474    let mut inner_idx: i32 = -1;
475    let mut invokes: HashMap<&str, Vec<(i32, i32)>> = HashMap::with_capacity(8);
476    let mut result = Vec::with_capacity(4);
477
478    for log in logs {
479        if let Some((pid, depth)) = crate::logs::optimized_matcher::parse_invoke_info(log) {
480            if depth == 1 { inner_idx = -1; outer_idx += 1; } else { inner_idx += 1; }
481            invokes.entry(pid).or_default().push((outer_idx, inner_idx));
482        }
483
484        if PROGRAM_DATA_FINDER.find(log.as_bytes()).is_none() { continue; }
485
486        if let Some(mut e) = crate::logs::parse_log(log, sig, slot, tx_idx, block_us, grpc_us, filter, has_create) {
487            crate::core::account_dispatcher::fill_accounts_from_transaction_data(&mut e, meta, transaction, &invokes);
488            crate::core::common_filler::fill_data(&mut e, meta, transaction, &invokes);
489            result.push(e);
490        }
491    }
492    result
493}
494
495#[inline]
496fn parse_instructions(
497    meta: &TransactionStatusMeta,
498    transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
499    sig: solana_sdk::signature::Signature,
500    slot: u64,
501    tx_idx: u64,
502    block_us: Option<i64>,
503    grpc_us: i64,
504    filter: Option<&EventTypeFilter>,
505) -> Vec<DexEvent> {
506    // 使用增强的 instruction 解析器
507    // 支持:
508    // - 主指令解析(8字节 discriminator)
509    // - Inner instruction 解析(16字节 discriminator)
510    // - 自动事件合并(instruction + inner instruction)
511    crate::grpc::instruction_parser::parse_instructions_enhanced(
512        meta,
513        transaction,
514        sig,
515        slot,
516        tx_idx,
517        block_us,
518        grpc_us,
519        filter,
520    )
521}