Skip to main content

sol_parser_sdk/shredstream/
client.rs

1//! ShredStream 客户端
2//!
3//! `solana_entry::entry::Entry` 在 Agave SDK 中带 `deprecated`(需显式启用不稳定 feature 才消除);
4//! 本模块仍依赖其 bincode 布局解码 Shred 侧 `entries` 负载。
5#![allow(deprecated)]
6
7use std::sync::Arc;
8use std::time::Duration;
9
10use crossbeam_queue::ArrayQueue;
11use futures::StreamExt;
12use solana_entry::entry::Entry as SolanaEntry;
13use solana_sdk::message::VersionedMessage;
14use tokio::sync::Mutex;
15use tokio::task::JoinHandle;
16use tonic::transport::{Channel, Endpoint};
17
18use crate::core::now_micros;
19use crate::grpc::types::EventTypeFilter;
20use crate::shredstream::config::ShredStreamConfig;
21use crate::shredstream::proto::{Entry, ShredstreamProxyClient, SubscribeEntriesRequest};
22use crate::DexEvent;
23
/// ShredStream client.
///
/// Stores the endpoint and configuration used to open connections, together
/// with a shared slot for the handle of the background subscription task.
/// Because the slot lives behind an `Arc`, clones of the client share the same
/// subscription handle.
#[derive(Clone)]
pub struct ShredStreamClient {
    /// Endpoint string handed to `Endpoint::from_shared` on every connect.
    endpoint: String,
    /// Connection / reconnection settings; cloned into the background task.
    config: ShredStreamConfig,
    /// Handle of the currently running subscription task, if any.
    subscription_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
}
31
32impl ShredStreamClient {
33    /// 创建新客户端
34    pub async fn new(endpoint: impl Into<String>) -> crate::common::AnyResult<Self> {
35        Self::new_with_config(endpoint, ShredStreamConfig::default()).await
36    }
37
38    /// 使用自定义配置创建客户端
39    pub async fn new_with_config(
40        endpoint: impl Into<String>,
41        config: ShredStreamConfig,
42    ) -> crate::common::AnyResult<Self> {
43        let endpoint = endpoint.into();
44        // 测试连接
45        let _ = Self::connect_client(&endpoint, &config).await?;
46
47        Ok(Self { endpoint, config, subscription_handle: Arc::new(Mutex::new(None)) })
48    }
49
50    /// 订阅 DEX 事件(自动重连)
51    ///
52    /// 返回一个队列,事件会被推送到该队列中
53    pub async fn subscribe(&self) -> crate::common::AnyResult<Arc<ArrayQueue<DexEvent>>> {
54        self.subscribe_with_filter(None).await
55    }
56
57    /// 订阅 DEX 事件,并在 ShredStream 热路径中按 SDK 事件类型提前过滤。
58    ///
59    /// 过滤发生在解析分发前,用于低延迟场景避免解析不需要的协议/事件。
60    pub async fn subscribe_with_filter(
61        &self,
62        event_type_filter: Option<EventTypeFilter>,
63    ) -> crate::common::AnyResult<Arc<ArrayQueue<DexEvent>>> {
64        // 停止现有订阅
65        self.stop().await;
66
67        let queue = Arc::new(ArrayQueue::new(100_000));
68        let queue_clone = Arc::clone(&queue);
69
70        let endpoint = self.endpoint.clone();
71        let config = self.config.clone();
72
73        let handle = tokio::spawn(async move {
74            let mut delay = config.reconnect_delay_ms;
75            let mut attempts = 0u32;
76
77            loop {
78                if config.max_reconnect_attempts > 0 && attempts >= config.max_reconnect_attempts {
79                    log::error!("Max reconnection attempts reached, giving up");
80                    break;
81                }
82                attempts += 1;
83
84                match Self::stream_events(
85                    &endpoint,
86                    &config,
87                    &queue_clone,
88                    event_type_filter.as_ref(),
89                )
90                .await
91                {
92                    Ok(_) => {
93                        delay = config.reconnect_delay_ms;
94                        attempts = 0;
95                    }
96                    Err(e) => {
97                        log::error!("ShredStream error: {} - retry in {}ms", e, delay);
98                        tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
99                        delay = (delay * 2).min(60_000);
100                    }
101                }
102            }
103        });
104
105        *self.subscription_handle.lock().await = Some(handle);
106        Ok(queue)
107    }
108
109    /// 停止订阅
110    pub async fn stop(&self) {
111        if let Some(handle) = self.subscription_handle.lock().await.take() {
112            handle.abort();
113        }
114    }
115
116    async fn connect_client(
117        endpoint: &str,
118        config: &ShredStreamConfig,
119    ) -> crate::common::AnyResult<ShredstreamProxyClient<Channel>> {
120        let mut builder = Endpoint::from_shared(endpoint.to_string())?;
121        if config.connection_timeout_ms > 0 {
122            builder = builder.connect_timeout(Duration::from_millis(config.connection_timeout_ms));
123        }
124        let channel = builder.connect().await?;
125        Ok(ShredstreamProxyClient::new(channel)
126            .max_decoding_message_size(config.max_decoding_message_size))
127    }
128
129    /// 核心事件流处理
130    async fn stream_events(
131        endpoint: &str,
132        config: &ShredStreamConfig,
133        queue: &Arc<ArrayQueue<DexEvent>>,
134        event_type_filter: Option<&EventTypeFilter>,
135    ) -> Result<(), String> {
136        let mut client = Self::connect_client(endpoint, config).await.map_err(|e| e.to_string())?;
137        let request = tonic::Request::new(SubscribeEntriesRequest {});
138        let response = if config.request_timeout_ms > 0 {
139            tokio::time::timeout(
140                Duration::from_millis(config.request_timeout_ms),
141                client.subscribe_entries(request),
142            )
143            .await
144            .map_err(|_| {
145                format!(
146                    "ShredStream subscribe request timed out after {}ms",
147                    config.request_timeout_ms
148                )
149            })?
150            .map_err(|e| e.to_string())?
151        } else {
152            client.subscribe_entries(request).await.map_err(|e| e.to_string())?
153        };
154        let mut stream = response.into_inner();
155
156        log::info!("ShredStream connected, receiving entries...");
157
158        while let Some(message) = stream.next().await {
159            match message {
160                Ok(entry) => {
161                    Self::process_entry(entry, queue, event_type_filter);
162                }
163                Err(e) => {
164                    log::error!("Stream error: {:?}", e);
165                    return Err(e.to_string());
166                }
167            }
168        }
169
170        Ok(())
171    }
172
173    /// 处理单个 Entry 消息
174    #[inline]
175    fn process_entry(
176        entry: Entry,
177        queue: &Arc<ArrayQueue<DexEvent>>,
178        event_type_filter: Option<&EventTypeFilter>,
179    ) {
180        let slot = entry.slot;
181        let recv_us = now_micros();
182
183        // 反序列化 Entry 数据
184        let entries = match bincode::deserialize::<Vec<SolanaEntry>>(&entry.entries) {
185            Ok(e) => e,
186            Err(e) => {
187                log::debug!("Failed to deserialize entries: {}", e);
188                return;
189            }
190        };
191
192        // 处理每个 Entry 中的交易
193        let mut events = Vec::with_capacity(4);
194        let mut tx_index = 0u64;
195        for entry in entries {
196            for transaction in entry.transactions.iter() {
197                events.clear();
198                Self::process_transaction(
199                    transaction,
200                    slot,
201                    recv_us,
202                    tx_index,
203                    queue,
204                    event_type_filter,
205                    &mut events,
206                );
207                tx_index += 1;
208            }
209        }
210    }
211
212    /// 处理单个交易
213    #[inline]
214    fn process_transaction(
215        transaction: &solana_sdk::transaction::VersionedTransaction,
216        slot: u64,
217        recv_us: i64,
218        tx_index: u64,
219        queue: &Arc<ArrayQueue<DexEvent>>,
220        event_type_filter: Option<&EventTypeFilter>,
221        events: &mut Vec<DexEvent>,
222    ) {
223        if transaction.signatures.is_empty() {
224            return;
225        }
226
227        let signature = transaction.signatures[0];
228        if let VersionedMessage::V0(m) = &transaction.message {
229            if !m.address_table_lookups.is_empty() {
230                log::debug!(
231                    target: "sol_parser_sdk::shredstream",
232                    "V0 tx uses address lookup tables; only static keys are available — \
233                     some instruction account indices may resolve to wrong pubkeys (often only 1 BUY gets is_created_buy)"
234                );
235            }
236        }
237        // 热路径:`static_account_keys` 零拷贝、`pump_ix` 内不克隆 CompiledInstruction。
238        super::pump_ix::parse_transaction_dex_events_with_filter(
239            transaction,
240            signature,
241            slot,
242            tx_index,
243            recv_us,
244            event_type_filter,
245            events,
246        );
247        crate::core::pumpfun_fee_enrich::enrich_pumpfun_same_tx_post_merge(events);
248
249        for mut event in events.drain(..) {
250            if let Some(meta) = event.metadata_mut() {
251                meta.grpc_recv_us = recv_us;
252            }
253            let _ = queue.push(event);
254        }
255    }
256}