sol_parser_sdk/grpc/
client.rs

1use super::types::*;
2use crate::DexEvent;
3use yellowstone_grpc_client::GeyserGrpcClient;
4use yellowstone_grpc_proto::prelude::*;
5use solana_sdk::pubkey::Pubkey;
6use std::collections::HashMap;
7use futures::StreamExt;
8use log::error;
9use tonic::transport::ClientTlsConfig;
10use crossbeam_queue::ArrayQueue;
11use memchr::memmem;
12use std::sync::Arc;
13use once_cell::sync::Lazy;
14
15static PROGRAM_DATA_FINDER: Lazy<memmem::Finder> = Lazy::new(|| memmem::Finder::new(b"Program data: "));
16
17
18#[derive(Clone)]
19pub struct YellowstoneGrpc {
20    endpoint: String,
21    token: Option<String>,
22    config: ClientConfig,
23}
24
25impl YellowstoneGrpc {
26    pub fn new(endpoint: String, token: Option<String>) -> Result<Self, Box<dyn std::error::Error>> {
27        Ok(Self {
28            endpoint,
29            token,
30            config: ClientConfig::default(),
31        })
32    }
33
34    pub fn new_with_config(
35        endpoint: String,
36        token: Option<String>,
37        config: ClientConfig,
38    ) -> Result<Self, Box<dyn std::error::Error>> {
39        Ok(Self {
40            endpoint,
41            token,
42            config,
43        })
44    }
45
46    /// 订阅DEX事件(零拷贝无锁队列)
47    pub async fn subscribe_dex_events(
48        &self,
49        transaction_filters: Vec<TransactionFilter>,
50        account_filters: Vec<AccountFilter>,
51        event_type_filter: Option<EventTypeFilter>,
52    ) -> Result<Arc<ArrayQueue<DexEvent>>, Box<dyn std::error::Error>> {
53        let queue = Arc::new(ArrayQueue::new(100_000));
54        let queue_clone = Arc::clone(&queue);
55
56        let self_clone = self.clone();
57        tokio::spawn(async move {
58            let _ = self_clone.stream_to_queue(
59                transaction_filters,
60                account_filters,
61                event_type_filter,
62                queue_clone,
63            ).await;
64        });
65
66        Ok(queue)
67    }
68
69    pub async fn stop(&self) {
70        println!("🛑 Stopping gRPC subscription...");
71    }
72    async fn stream_to_queue(
73        &self,
74        transaction_filters: Vec<TransactionFilter>,
75        account_filters: Vec<AccountFilter>,
76        event_type_filter: Option<EventTypeFilter>,
77        queue: Arc<ArrayQueue<DexEvent>>,
78    ) -> Result<(), Box<dyn std::error::Error>> {
79        println!("🚀 Starting Zero-Copy DEX event subscription...");
80
81        let _ = rustls::crypto::ring::default_provider().install_default();
82
83        let mut builder = GeyserGrpcClient::build_from_shared(self.endpoint.clone())?
84            .x_token(self.token.clone())?
85            .max_decoding_message_size(1024 * 1024 * 1024);
86
87        if self.config.connection_timeout_ms > 0 {
88            builder = builder.connect_timeout(std::time::Duration::from_millis(self.config.connection_timeout_ms));
89        }
90
91        // 添加 TLS 配置
92        if self.config.enable_tls {
93            let tls_config = ClientTlsConfig::new().with_native_roots();
94            builder = builder.tls_config(tls_config)?;
95        }
96
97        println!("🔗 Connecting to gRPC endpoint: {}", self.endpoint);
98        println!("⏱️  Connection timeout: {}ms", self.config.connection_timeout_ms);
99
100        let mut client = match builder.connect().await {
101            Ok(c) => {
102                println!("✅ Connection established");
103                c
104            },
105            Err(e) => {
106                println!("❌ Connection failed: {:?}", e);
107                return Err(e.into());
108            }
109        };
110        println!("✅ Connected to Yellowstone gRPC");
111
112        println!("📝 Building subscription filters...");
113        let mut accounts: HashMap<String, SubscribeRequestFilterAccounts> = HashMap::new();
114        for (i, filter) in account_filters.iter().enumerate() {
115            let key = format!("account_filter_{}", i);
116            accounts.insert(key, SubscribeRequestFilterAccounts {
117                account: filter.account.clone(),
118                owner: filter.owner.clone(),
119                filters: vec![],
120                nonempty_txn_signature: None,
121            });
122        }
123
124        let mut transactions: HashMap<String, SubscribeRequestFilterTransactions> = HashMap::new();
125        for (i, filter) in transaction_filters.iter().enumerate() {
126            let key = format!("transaction_filter_{}", i);
127            transactions.insert(key, SubscribeRequestFilterTransactions {
128                vote: Some(false),
129                failed: Some(false),
130                signature: None,
131                account_include: filter.account_include.clone(),
132                account_exclude: filter.account_exclude.clone(),
133                account_required: filter.account_required.clone(),
134            });
135        }
136
137        let request = SubscribeRequest {
138            slots: HashMap::new(),
139            accounts,
140            transactions,
141            transactions_status: HashMap::new(),
142            blocks: HashMap::new(),
143            blocks_meta: HashMap::new(),
144            entry: HashMap::new(),
145            commitment: Some(CommitmentLevel::Processed as i32),
146            accounts_data_slice: Vec::new(),
147            ping: None,
148            from_slot: None,
149        };
150
151        println!("📡 Subscribing to stream...");
152        let (_subscribe_tx, mut stream) = client.subscribe_with_request(Some(request)).await?;
153        println!("✅ Subscribed successfully - Zero Copy Mode");
154        println!("👂 Listening for events...");
155
156        let mut msg_count = 0u64;
157        while let Some(message) = stream.next().await {
158            match message {
159                Ok(update_msg) => {
160                    msg_count += 1;
161                    if msg_count % 100 == 0 {
162                        println!("📨 Received {} messages", msg_count);
163                    }
164
165                    if let Some(update) = update_msg.update_oneof {
166                        if let subscribe_update::UpdateOneof::Transaction(transaction_update) = update {
167                            let grpc_recv_us = unsafe {
168                                let mut ts = libc::timespec { tv_sec: 0, tv_nsec: 0 };
169                                libc::clock_gettime(libc::CLOCK_REALTIME, &mut ts);
170                                (ts.tv_sec as i64) * 1_000_000 + (ts.tv_nsec as i64) / 1_000
171                            };
172                            Self::parse_transaction(&transaction_update, grpc_recv_us, &queue, event_type_filter.as_ref()).await;
173                        }
174                    }
175                },
176                Err(e) => {
177                    error!("Stream error: {:?}", e);
178                    println!("❌ Stream error: {:?}", e);
179                },
180            }
181        }
182
183        println!("⚠️  Stream ended");
184
185        Ok(())
186    }
187
188    /// 解析交易事件
189    async fn parse_transaction(
190        transaction_update: &SubscribeUpdateTransaction,
191        grpc_recv_us: i64,
192        queue: &Arc<ArrayQueue<DexEvent>>,
193        event_type_filter: Option<&EventTypeFilter>,
194    ) {
195        if let Some(transaction_info) = &transaction_update.transaction {
196            // 从 transaction_info.index 获取交易索引
197            let tx_index = transaction_info.index;
198            
199            if let Some(meta) = &transaction_info.meta {
200                let logs = &meta.log_messages;
201
202                if let Some(tx_msg) = &transaction_info.transaction {
203                    if let Some(message) = &tx_msg.message {
204                        let mut accounts = Vec::with_capacity(message.account_keys.len());
205                        for key in &message.account_keys {
206                            if key.len() == 32 {
207                                let mut pubkey_bytes = [0u8; 32];
208                                pubkey_bytes.copy_from_slice(key);
209                                accounts.push(Pubkey::new_from_array(pubkey_bytes));
210                            }
211                        }
212
213                        let signature = if let Some(sig) = tx_msg.signatures.first() {
214                            if sig.len() == 64 {
215                                let mut sig_array = [0u8; 64];
216                                sig_array.copy_from_slice(sig);
217                                solana_sdk::signature::Signature::from(sig_array)
218                            } else {
219                                solana_sdk::signature::Signature::default()
220                            }
221                        } else {
222                            solana_sdk::signature::Signature::default()
223                        };
224
225                        let block_time = Some(chrono::Utc::now().timestamp());
226                        let mut log_events_parsed = false;
227
228                        for instruction in &message.instructions {
229                            let program_id_index = instruction.program_id_index as usize;
230                            if program_id_index < accounts.len() {
231                                let _program_id = accounts[program_id_index];
232
233                                Self::parse_events(
234                                    &accounts,
235                                    logs,
236                                    signature,
237                                    transaction_update.slot,
238                                    tx_index,
239                                    block_time,
240                                    grpc_recv_us,
241                                    queue,
242                                    &mut log_events_parsed,
243                                    event_type_filter,
244                                );
245                            }
246                        }
247                    }
248                }
249            }
250        }
251    }
252
253    /// 解析日志事件到队列
254    #[inline]
255    fn parse_events(
256        _accounts: &[Pubkey],
257        logs: &[String],
258        signature: solana_sdk::signature::Signature,
259        slot: u64,
260        tx_index: u64,
261        block_time: Option<i64>,
262        grpc_recv_us: i64,
263        queue: &Arc<ArrayQueue<DexEvent>>,
264        log_events_parsed: &mut bool,
265        event_type_filter: Option<&EventTypeFilter>,
266    ) {
267        if !*log_events_parsed {
268            let has_create = event_type_filter
269                .map(|f| f.includes_pumpfun())
270                .unwrap_or(true)
271                && crate::logs::optimized_matcher::detect_pumpfun_create(logs);
272
273            for log in logs.iter() {
274                let log_bytes = log.as_bytes();
275
276                if PROGRAM_DATA_FINDER.find(log_bytes).is_none() {
277                    continue;
278                }
279
280                if let Some(log_event) = crate::logs::parse_log(log, signature, slot, tx_index, block_time, grpc_recv_us, event_type_filter, has_create) {
281                    let _ = queue.push(log_event);
282                    *log_events_parsed = true;
283                    return;
284                }
285            }
286
287            *log_events_parsed = true;
288        }
289    }
290}