Skip to main content

sol_parser_sdk/grpc/
client.rs

1//! Yellowstone gRPC 客户端 - 超低延迟 DEX 事件订阅
2//!
3//! 支持多种事件输出模式:
4//! - Unordered: 10-20μs 极低延迟
5//! - MicroBatch: 50-200μs 微批次有序
6//! - StreamingOrdered: 0.1-5ms 流式有序
7//! - Ordered: 1-50ms 完全有序
8
9use super::buffers::{MicroBatchBuffer, SlotBuffer};
10use super::subscribe_builder::build_subscribe_request;
11use super::transaction_meta::try_yellowstone_signature;
12use super::types::*;
13use crate::core::{now_micros, EventMetadata}; // 导入高性能时钟
14use crate::instr::read_pubkey_fast;
15use crate::logs::timestamp_to_microseconds;
16use crate::DexEvent;
17use crossbeam_queue::ArrayQueue;
18use futures::{SinkExt, StreamExt};
19use log::error;
20use memchr::memmem;
21use once_cell::sync::Lazy;
22use std::collections::HashMap;
23use std::sync::Arc;
24use tokio::sync::{mpsc, Mutex};
25use tokio::time::{Duration, Instant};
26// Note: ClientTlsConfig moved to yellowstone_grpc_client in newer versions
27use yellowstone_grpc_client::{GeyserGrpcClient, ClientTlsConfig};
28use yellowstone_grpc_proto::prelude::*;
29
/// Lazily constructed substring searcher for the `Program data: ` marker.
///
/// `parse_logs` uses it to skip log lines that do not contain the marker before
/// attempting to parse them.
static PROGRAM_DATA_FINDER: Lazy<memmem::Finder> =
    Lazy::new(|| memmem::Finder::new(b"Program data: "));
32
33// ==================== YellowstoneGrpc 客户端 ====================
34
/// Yellowstone gRPC client used to subscribe to DEX events.
///
/// Clones share the same `control_tx` slot because it lives behind an `Arc`.
#[derive(Clone)]
pub struct YellowstoneGrpc {
    /// Endpoint handed to `GeyserGrpcClient::build_from_shared`.
    endpoint: String,
    /// Optional token forwarded to the client builder through `x_token`.
    token: Option<String>,
    /// Connection and ordering settings (timeouts, TLS, order mode, batch window).
    config: ClientConfig,
    /// Sender used by `update_subscription`; `None` until a stream has been established.
    control_tx: Arc<Mutex<Option<mpsc::Sender<SubscribeRequest>>>>,
}
42
43impl YellowstoneGrpc {
44    pub fn new(
45        endpoint: String,
46        token: Option<String>,
47    ) -> Result<Self, Box<dyn std::error::Error>> {
48        crate::warmup::warmup_parser();
49        Ok(Self {
50            endpoint,
51            token,
52            config: ClientConfig::default(),
53            control_tx: Arc::new(Mutex::new(None)),
54        })
55    }
56
57    pub fn new_with_config(
58        endpoint: String,
59        token: Option<String>,
60        config: ClientConfig,
61    ) -> Result<Self, Box<dyn std::error::Error>> {
62        crate::warmup::warmup_parser();
63        Ok(Self { endpoint, token, config, control_tx: Arc::new(Mutex::new(None)) })
64    }
65
66    /// 订阅 DEX 事件(自动重连)
67    pub async fn subscribe_dex_events(
68        &self,
69        transaction_filters: Vec<TransactionFilter>,
70        account_filters: Vec<AccountFilter>,
71        event_type_filter: Option<EventTypeFilter>,
72    ) -> Result<Arc<ArrayQueue<DexEvent>>, Box<dyn std::error::Error>> {
73        let queue = Arc::new(ArrayQueue::new(100_000));
74        let queue_clone = Arc::clone(&queue);
75        let self_clone = self.clone();
76
77        tokio::spawn(async move {
78            let mut delay = 1u64;
79            loop {
80                match self_clone
81                    .stream_events(
82                        &transaction_filters,
83                        &account_filters,
84                        &event_type_filter,
85                        &queue_clone,
86                    )
87                    .await
88                {
89                    Ok(_) => delay = 1,
90                    Err(e) => println!("❌ gRPC error: {} - retry in {}s", e, delay),
91                }
92                tokio::time::sleep(Duration::from_secs(delay)).await;
93                delay = (delay * 2).min(60);
94            }
95        });
96
97        Ok(queue)
98    }
99
100    /// 动态更新订阅过滤器
101    pub async fn update_subscription(
102        &self,
103        transaction_filters: Vec<TransactionFilter>,
104        account_filters: Vec<AccountFilter>,
105    ) -> Result<(), Box<dyn std::error::Error>> {
106        let sender = self.control_tx.lock().await.as_ref().ok_or("No active subscription")?.clone();
107
108        let request = build_subscribe_request(&transaction_filters, &account_filters);
109        sender.send(request).await.map_err(|e| e.to_string())?;
110        Ok(())
111    }
112
113    pub async fn stop(&self) {
114        println!("🛑 Stopping gRPC subscription...");
115    }
116
117    // ==================== 核心事件流处理 ====================
118
119    async fn stream_events(
120        &self,
121        tx_filters: &[TransactionFilter],
122        acc_filters: &[AccountFilter],
123        event_filter: &Option<EventTypeFilter>,
124        queue: &Arc<ArrayQueue<DexEvent>>,
125    ) -> Result<(), String> {
126        let _ = rustls::crypto::ring::default_provider().install_default();
127
128        // 构建客户端
129        let mut builder = GeyserGrpcClient::build_from_shared(self.endpoint.clone())
130            .map_err(|e| e.to_string())?
131            .x_token(self.token.clone())
132            .map_err(|e| e.to_string())?
133            .max_decoding_message_size(1024 * 1024 * 1024);
134
135        if self.config.connection_timeout_ms > 0 {
136            builder =
137                builder.connect_timeout(Duration::from_millis(self.config.connection_timeout_ms));
138        }
139        if self.config.enable_tls {
140            builder = builder
141                .tls_config(ClientTlsConfig::new().with_native_roots())
142                .map_err(|e| e.to_string())?;
143        }
144
145        let mut client = builder.connect().await.map_err(|e| e.to_string())?;
146        let request = build_subscribe_request(tx_filters, acc_filters);
147
148        let (subscribe_tx, mut stream) =
149            client.subscribe_with_request(Some(request)).await.map_err(|e| e.to_string())?;
150
151        self.print_mode_info();
152
153        // 设置控制通道
154        let (control_tx, mut control_rx) = mpsc::channel::<SubscribeRequest>(100);
155        *self.control_tx.lock().await = Some(control_tx);
156        let subscribe_tx = Arc::new(Mutex::new(subscribe_tx));
157
158        // 初始化缓冲区
159        let mut slot_buffer = SlotBuffer::new();
160        let mut micro_batch = MicroBatchBuffer::new();
161        let mut last_slot = 0u64;
162
163        let order_mode = self.config.order_mode;
164        let timeout_ms = self.config.order_timeout_ms;
165        let batch_us = self.config.micro_batch_us;
166        let check_interval = Duration::from_millis(timeout_ms / 2);
167        let mut next_check = Instant::now() + check_interval;
168
169        loop {
170            // Periodic timeout check for ordered modes and MicroBatch
171            self.check_timeout(
172                order_mode,
173                &mut slot_buffer,
174                &mut micro_batch,
175                queue,
176                timeout_ms,
177                batch_us,
178                &mut next_check,
179                check_interval,
180            );
181
182            tokio::select! {
183                msg = stream.next() => {
184                    match msg {
185                        Some(Ok(update)) => {
186                            self.handle_update(
187                                update, order_mode, event_filter, queue,
188                                &mut slot_buffer, &mut micro_batch, &mut last_slot, batch_us
189                            );
190                        }
191                        Some(Err(e)) => {
192                            error!("Stream error: {:?}", e);
193                            self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
194                            return Err(e.to_string());
195                        }
196                        None => {
197                            self.flush_on_disconnect(order_mode, &mut slot_buffer, queue);
198                            return Ok(());
199                        }
200                    }
201                }
202                Some(req) = control_rx.recv() => {
203                    if let Err(e) = subscribe_tx.lock().await.send(req).await {
204                        return Err(e.to_string());
205                    }
206                }
207            }
208        }
209    }
210
211    fn print_mode_info(&self) {
212        match self.config.order_mode {
213            OrderMode::Unordered => println!("✅ Unordered Mode (10-20μs)"),
214            OrderMode::Ordered => {
215                println!("✅ Ordered Mode (timeout={}ms)", self.config.order_timeout_ms)
216            }
217            OrderMode::StreamingOrdered => {
218                println!("✅ StreamingOrdered Mode (timeout={}ms)", self.config.order_timeout_ms)
219            }
220            OrderMode::MicroBatch => {
221                println!("✅ MicroBatch Mode (window={}μs)", self.config.micro_batch_us)
222            }
223        }
224    }
225
226    #[inline]
227    fn check_timeout(
228        &self,
229        mode: OrderMode,
230        slot_buf: &mut SlotBuffer,
231        micro_buf: &mut MicroBatchBuffer,
232        queue: &Arc<ArrayQueue<DexEvent>>,
233        timeout_ms: u64,
234        batch_us: u64,
235        next_check: &mut Instant,
236        interval: Duration,
237    ) {
238        if Instant::now() < *next_check {
239            return;
240        }
241        *next_check = Instant::now() + interval;
242
243        match mode {
244            OrderMode::Ordered => {
245                if slot_buf.should_timeout(timeout_ms) {
246                    for e in slot_buf.flush_all() {
247                        let _ = queue.push(e);
248                    }
249                }
250            }
251            OrderMode::StreamingOrdered => {
252                if slot_buf.should_timeout(timeout_ms) {
253                    for e in slot_buf.flush_streaming_timeout() {
254                        let _ = queue.push(e);
255                    }
256                }
257            }
258            OrderMode::MicroBatch => {
259                // Periodic flush for MicroBatch mode
260                let now_us = get_timestamp_us();
261                if micro_buf.should_flush(now_us, batch_us) {
262                    for e in micro_buf.flush() {
263                        let _ = queue.push(e);
264                    }
265                }
266            }
267            OrderMode::Unordered => {}
268        }
269    }
270
271    fn flush_on_disconnect(
272        &self,
273        mode: OrderMode,
274        buffer: &mut SlotBuffer,
275        queue: &Arc<ArrayQueue<DexEvent>>,
276    ) {
277        if matches!(mode, OrderMode::Ordered | OrderMode::StreamingOrdered) {
278            let events = match mode {
279                OrderMode::StreamingOrdered => buffer.flush_streaming_timeout(),
280                _ => buffer.flush_all(),
281            };
282            for e in events {
283                let _ = queue.push(e);
284            }
285        }
286    }
287
288    #[inline]
289    fn handle_update(
290        &self,
291        update_msg: SubscribeUpdate,
292        mode: OrderMode,
293        filter: &Option<EventTypeFilter>,
294        queue: &Arc<ArrayQueue<DexEvent>>,
295        slot_buf: &mut SlotBuffer,
296        micro_buf: &mut MicroBatchBuffer,
297        last_slot: &mut u64,
298        batch_us: u64,
299    ) {
300        let created_at = update_msg.created_at.unwrap_or_default();
301        let block_time_us = timestamp_to_microseconds(created_at.seconds, created_at.nanos) as i64;
302        let grpc_recv_us = get_timestamp_us();
303
304        let Some(update) = update_msg.update_oneof else { return };
305
306        match update {
307            subscribe_update::UpdateOneof::Transaction(tx) => {
308                self.handle_transaction(
309                    tx,
310                    mode,
311                    filter,
312                    queue,
313                    slot_buf,
314                    micro_buf,
315                    last_slot,
316                    batch_us,
317                    grpc_recv_us,
318                    block_time_us,
319                );
320            }
321            subscribe_update::UpdateOneof::Account(acc) => {
322                Self::handle_account(acc, filter, queue, grpc_recv_us, block_time_us);
323            }
324            _ => {}
325        }
326    }
327
328    #[inline]
329    fn handle_transaction(
330        &self,
331        tx: SubscribeUpdateTransaction,
332        mode: OrderMode,
333        filter: &Option<EventTypeFilter>,
334        queue: &Arc<ArrayQueue<DexEvent>>,
335        slot_buf: &mut SlotBuffer,
336        micro_buf: &mut MicroBatchBuffer,
337        last_slot: &mut u64,
338        batch_us: u64,
339        grpc_us: i64,
340        block_us: i64,
341    ) {
342        let slot = tx.slot;
343
344        match mode {
345            OrderMode::Unordered => {
346                for e in parse_transaction_core(&tx, grpc_us, Some(block_us), filter.as_ref()) {
347                    let _ = queue.push(e);
348                }
349            }
350            OrderMode::Ordered => {
351                if slot > *last_slot && *last_slot > 0 {
352                    for e in slot_buf.flush_before(slot) {
353                        let _ = queue.push(e);
354                    }
355                }
356                *last_slot = slot;
357                for (idx, e) in
358                    parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
359                {
360                    slot_buf.push(slot, idx, e);
361                }
362            }
363            OrderMode::StreamingOrdered => {
364                for (idx, e) in
365                    parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
366                {
367                    for evt in slot_buf.push_streaming(slot, idx, e) {
368                        let _ = queue.push(evt);
369                    }
370                }
371            }
372            OrderMode::MicroBatch => {
373                for (idx, e) in
374                    parse_transaction_to_vec(&tx, grpc_us, Some(block_us), filter.as_ref())
375                {
376                    if micro_buf.push(slot, idx, e, grpc_us, batch_us) {
377                        for evt in micro_buf.flush() {
378                            let _ = queue.push(evt);
379                        }
380                    }
381                }
382            }
383        }
384    }
385
386    #[inline]
387    fn handle_account(
388        acc: SubscribeUpdateAccount,
389        filter: &Option<EventTypeFilter>,
390        queue: &Arc<ArrayQueue<DexEvent>>,
391        grpc_us: i64,
392        block_us: i64,
393    ) {
394        let Some(info) = acc.account else { return };
395        let data = crate::accounts::AccountData {
396            pubkey: read_pubkey_fast(&info.pubkey),
397            executable: info.executable,
398            lamports: info.lamports,
399            owner: read_pubkey_fast(&info.owner),
400            rent_epoch: info.rent_epoch,
401            data: info.data,
402        };
403        let meta = EventMetadata {
404            signature: Default::default(),
405            slot: acc.slot,
406            tx_index: 0,
407            block_time_us: block_us,
408            grpc_recv_us: grpc_us,
409            recent_blockhash: None,
410        };
411        if let Some(e) = crate::accounts::parse_account_unified(&data, meta, filter.as_ref()) {
412            let _ = queue.push(e);
413        }
414    }
415}
416
417// ==================== 辅助函数 ====================
418
/// Returns the current timestamp in microseconds.
///
/// Thin wrapper around `crate::core::now_micros`, described by the original author as
/// a high-performance clock that avoids system-call overhead.
///
/// # Performance (original author's figures, not verified here)
/// - Old implementation: `libc::clock_gettime`, roughly 1-2μs per call
/// - New implementation: high-performance clock, roughly 10-50ns per call
/// - Claimed improvement: 20-100x
#[inline(always)]
fn get_timestamp_us() -> i64 {
    now_micros()
}
431
432// ==================== 交易解析 ====================
433
434#[inline]
435fn parse_transaction_to_vec(
436    tx: &SubscribeUpdateTransaction,
437    grpc_us: i64,
438    block_us: Option<i64>,
439    filter: Option<&EventTypeFilter>,
440) -> Vec<(u64, DexEvent)> {
441    let idx = tx.transaction.as_ref().map(|t| t.index).unwrap_or(0);
442    parse_transaction_core(tx, grpc_us, block_us, filter).into_iter().map(|e| (idx, e)).collect()
443}
444
445#[inline]
446fn parse_transaction_core(
447    tx: &SubscribeUpdateTransaction,
448    grpc_us: i64,
449    block_us: Option<i64>,
450    filter: Option<&EventTypeFilter>,
451) -> Vec<DexEvent> {
452    let Some(info) = &tx.transaction else { return Vec::new() };
453    let Some(meta) = &info.meta else { return Vec::new() };
454
455    let sig = extract_signature(&info.signature);
456    let slot = tx.slot;
457    let idx = info.index;
458
459    // 并行解析 logs 和 instructions
460    let (log_events, instr_events) = rayon::join(
461        || {
462            parse_logs(
463                meta,
464                &info.transaction,
465                &meta.log_messages,
466                sig,
467                slot,
468                idx,
469                block_us,
470                grpc_us,
471                filter,
472            )
473        },
474        || parse_instructions(meta, &info.transaction, sig, slot, idx, block_us, grpc_us, filter),
475    );
476
477    let mut result = Vec::with_capacity(log_events.len() + instr_events.len());
478    result.extend(log_events);
479    result.extend(instr_events);
480    result
481}
482
/// Converts raw signature bytes from a Yellowstone update into a `Signature`.
///
/// # Panics
/// Panics with "yellowstone signature must be 64 bytes" when
/// `try_yellowstone_signature` does not yield a signature for `bytes`.
#[inline(always)]
fn extract_signature(bytes: &[u8]) -> solana_sdk::signature::Signature {
    try_yellowstone_signature(bytes).expect("yellowstone signature must be 64 bytes")
}
487
488#[inline]
489fn parse_logs(
490    meta: &TransactionStatusMeta,
491    transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
492    logs: &[String],
493    sig: solana_sdk::signature::Signature,
494    slot: u64,
495    tx_idx: u64,
496    block_us: Option<i64>,
497    grpc_us: i64,
498    filter: Option<&EventTypeFilter>,
499) -> Vec<DexEvent> {
500    let recent_blockhash = transaction.as_ref().and_then(|t| t.message.as_ref()).and_then(|m| {
501        if m.recent_blockhash.is_empty() {
502            None
503        } else {
504            Some(m.recent_blockhash.clone())
505        }
506    });
507
508    let needs_pumpfun = filter.map(|f| f.includes_pumpfun()).unwrap_or(true);
509    let has_create = needs_pumpfun && crate::logs::optimized_matcher::detect_pumpfun_create(logs);
510
511    let mut outer_idx: i32 = -1;
512    let mut inner_idx: i32 = -1;
513    let mut invokes: HashMap<&str, Vec<(i32, i32)>> = HashMap::with_capacity(8);
514    let mut result = Vec::with_capacity(4);
515
516    for log in logs {
517        if let Some((pid, depth)) = crate::logs::optimized_matcher::parse_invoke_info(log) {
518            if depth == 1 {
519                inner_idx = -1;
520                outer_idx += 1;
521            } else {
522                inner_idx += 1;
523            }
524            invokes.entry(pid).or_default().push((outer_idx, inner_idx));
525        }
526
527        if PROGRAM_DATA_FINDER.find(log.as_bytes()).is_none() {
528            continue;
529        }
530
531        if let Some(mut e) = crate::logs::parse_log(
532            log,
533            sig,
534            slot,
535            tx_idx,
536            block_us,
537            grpc_us,
538            filter,
539            has_create,
540            recent_blockhash.as_deref(),
541        ) {
542            crate::core::account_dispatcher::fill_accounts_from_transaction_data(
543                &mut e,
544                meta,
545                transaction,
546                &invokes,
547            );
548            crate::core::common_filler::fill_data(&mut e, meta, transaction, &invokes);
549            result.push(e);
550        }
551    }
552    result
553}
554
/// Parses the instructions of one transaction into DEX events.
///
/// Forwards all arguments unchanged to
/// `crate::grpc::instruction_parser::parse_instructions_enhanced`.
#[inline]
fn parse_instructions(
    meta: &TransactionStatusMeta,
    transaction: &Option<yellowstone_grpc_proto::prelude::Transaction>,
    sig: solana_sdk::signature::Signature,
    slot: u64,
    tx_idx: u64,
    block_us: Option<i64>,
    grpc_us: i64,
    filter: Option<&EventTypeFilter>,
) -> Vec<DexEvent> {
    // Uses the enhanced instruction parser.
    // Per the original author's notes it supports:
    // - main instruction parsing (8-byte discriminator)
    // - inner instruction parsing (16-byte discriminator)
    // - automatic event merging (instruction + inner instruction)
    crate::grpc::instruction_parser::parse_instructions_enhanced(
        meta,
        transaction,
        sig,
        slot,
        tx_idx,
        block_us,
        grpc_us,
        filter,
    )
}