Skip to main content

sol_parser_sdk/shredstream/
client.rs

1//! ShredStream 客户端
2//!
3//! `solana_entry::entry::Entry` 在 Agave SDK 中带 `deprecated`(需显式启用不稳定 feature 才消除);
4//! 本模块仍依赖其 bincode 布局解码 Shred 侧 `entries` 负载。
5#![allow(deprecated)]
6
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10
11use crossbeam_queue::ArrayQueue;
12use futures::StreamExt;
13use solana_entry::entry::Entry as SolanaEntry;
14use solana_sdk::message::VersionedMessage;
15use tokio::sync::Mutex;
16use tokio::task::JoinHandle;
17use tonic::transport::{Channel, Endpoint};
18
19use crate::core::now_micros;
20use crate::grpc::types::EventTypeFilter;
21use crate::shredstream::config::ShredStreamConfig;
22use crate::shredstream::proto::{Entry, ShredstreamProxyClient, SubscribeEntriesRequest};
23use crate::DexEvent;
24
25static SHREDSTREAM_DROPPED_EVENTS: AtomicU64 = AtomicU64::new(0);
26
27#[inline]
28fn record_shredstream_dropped_event() -> u64 {
29    let dropped = SHREDSTREAM_DROPPED_EVENTS.fetch_add(1, Ordering::Relaxed) + 1;
30    if dropped <= 10 || dropped.is_power_of_two() {
31        log::warn!(
32            target: "sol_parser_sdk::shredstream",
33            "ShredStream event queue is full; dropped event count={}",
34            dropped
35        );
36    }
37    dropped
38}
39
40/// ShredStream 客户端
41#[derive(Clone)]
42pub struct ShredStreamClient {
43    endpoint: String,
44    config: ShredStreamConfig,
45    subscription_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
46}
47
48impl ShredStreamClient {
49    /// 创建新客户端
50    pub async fn new(endpoint: impl Into<String>) -> crate::common::AnyResult<Self> {
51        Self::new_with_config(endpoint, ShredStreamConfig::default()).await
52    }
53
54    /// 使用自定义配置创建客户端
55    pub async fn new_with_config(
56        endpoint: impl Into<String>,
57        config: ShredStreamConfig,
58    ) -> crate::common::AnyResult<Self> {
59        let endpoint = endpoint.into();
60        // 测试连接
61        let _ = Self::connect_client(&endpoint, &config).await?;
62
63        Ok(Self { endpoint, config, subscription_handle: Arc::new(Mutex::new(None)) })
64    }
65
66    /// 订阅 DEX 事件(自动重连)
67    ///
68    /// 返回一个队列,事件会被推送到该队列中
69    pub async fn subscribe(&self) -> crate::common::AnyResult<Arc<ArrayQueue<DexEvent>>> {
70        self.subscribe_with_filter(None).await
71    }
72
73    /// 订阅 DEX 事件,并在 ShredStream 热路径中按 SDK 事件类型提前过滤。
74    ///
75    /// 过滤发生在解析分发前,用于低延迟场景避免解析不需要的协议/事件。
76    pub async fn subscribe_with_filter(
77        &self,
78        event_type_filter: Option<EventTypeFilter>,
79    ) -> crate::common::AnyResult<Arc<ArrayQueue<DexEvent>>> {
80        // 停止现有订阅
81        self.stop().await;
82
83        let queue = Arc::new(ArrayQueue::new(100_000));
84        let queue_clone = Arc::clone(&queue);
85
86        let endpoint = self.endpoint.clone();
87        let config = self.config.clone();
88
89        let handle = tokio::spawn(async move {
90            let mut delay = config.reconnect_delay_ms;
91            let mut attempts = 0u32;
92
93            loop {
94                if config.max_reconnect_attempts > 0 && attempts >= config.max_reconnect_attempts {
95                    log::error!("Max reconnection attempts reached, giving up");
96                    break;
97                }
98                attempts += 1;
99
100                match Self::stream_events(
101                    &endpoint,
102                    &config,
103                    &queue_clone,
104                    event_type_filter.as_ref(),
105                )
106                .await
107                {
108                    Ok(_) => {
109                        delay = config.reconnect_delay_ms;
110                        attempts = 0;
111                    }
112                    Err(e) => {
113                        log::error!("ShredStream error: {} - retry in {}ms", e, delay);
114                        tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
115                        delay = (delay * 2).min(60_000);
116                    }
117                }
118            }
119        });
120
121        *self.subscription_handle.lock().await = Some(handle);
122        Ok(queue)
123    }
124
125    /// 停止订阅
126    pub async fn stop(&self) {
127        if let Some(handle) = self.subscription_handle.lock().await.take() {
128            handle.abort();
129        }
130    }
131
132    async fn connect_client(
133        endpoint: &str,
134        config: &ShredStreamConfig,
135    ) -> crate::common::AnyResult<ShredstreamProxyClient<Channel>> {
136        let mut builder = Endpoint::from_shared(endpoint.to_string())?;
137        if config.connection_timeout_ms > 0 {
138            builder = builder.connect_timeout(Duration::from_millis(config.connection_timeout_ms));
139        }
140        let channel = builder.connect().await?;
141        Ok(ShredstreamProxyClient::new(channel)
142            .max_decoding_message_size(config.max_decoding_message_size))
143    }
144
145    /// 核心事件流处理
146    async fn stream_events(
147        endpoint: &str,
148        config: &ShredStreamConfig,
149        queue: &Arc<ArrayQueue<DexEvent>>,
150        event_type_filter: Option<&EventTypeFilter>,
151    ) -> Result<(), String> {
152        let mut client = Self::connect_client(endpoint, config).await.map_err(|e| e.to_string())?;
153        let request = tonic::Request::new(SubscribeEntriesRequest {});
154        let response = if config.request_timeout_ms > 0 {
155            tokio::time::timeout(
156                Duration::from_millis(config.request_timeout_ms),
157                client.subscribe_entries(request),
158            )
159            .await
160            .map_err(|_| {
161                format!(
162                    "ShredStream subscribe request timed out after {}ms",
163                    config.request_timeout_ms
164                )
165            })?
166            .map_err(|e| e.to_string())?
167        } else {
168            client.subscribe_entries(request).await.map_err(|e| e.to_string())?
169        };
170        let mut stream = response.into_inner();
171
172        log::info!("ShredStream connected, receiving entries...");
173
174        while let Some(message) = stream.next().await {
175            match message {
176                Ok(entry) => {
177                    Self::process_entry(entry, queue, event_type_filter);
178                }
179                Err(e) => {
180                    log::error!("Stream error: {:?}", e);
181                    return Err(e.to_string());
182                }
183            }
184        }
185
186        Ok(())
187    }
188
189    /// 处理单个 Entry 消息
190    #[inline]
191    fn process_entry(
192        entry: Entry,
193        queue: &Arc<ArrayQueue<DexEvent>>,
194        event_type_filter: Option<&EventTypeFilter>,
195    ) {
196        let slot = entry.slot;
197        let recv_us = now_micros();
198
199        // 反序列化 Entry 数据
200        let entries = match bincode::deserialize::<Vec<SolanaEntry>>(&entry.entries) {
201            Ok(e) => e,
202            Err(e) => {
203                log::debug!("Failed to deserialize entries: {}", e);
204                return;
205            }
206        };
207
208        // 处理每个 Entry 中的交易
209        let mut events = Vec::with_capacity(4);
210        let mut tx_index = 0u64;
211        for entry in entries {
212            for transaction in entry.transactions.iter() {
213                events.clear();
214                Self::process_transaction(
215                    transaction,
216                    slot,
217                    recv_us,
218                    tx_index,
219                    queue,
220                    event_type_filter,
221                    &mut events,
222                );
223                tx_index += 1;
224            }
225        }
226    }
227
228    /// 处理单个交易
229    #[inline]
230    fn process_transaction(
231        transaction: &solana_sdk::transaction::VersionedTransaction,
232        slot: u64,
233        recv_us: i64,
234        tx_index: u64,
235        queue: &Arc<ArrayQueue<DexEvent>>,
236        event_type_filter: Option<&EventTypeFilter>,
237        events: &mut Vec<DexEvent>,
238    ) {
239        if transaction.signatures.is_empty() {
240            return;
241        }
242
243        let signature = transaction.signatures[0];
244        if let VersionedMessage::V0(m) = &transaction.message {
245            if !m.address_table_lookups.is_empty() {
246                log::trace!(
247                    target: "sol_parser_sdk::shredstream",
248                    "V0 tx uses address lookup tables; shred parser will use static accounts and default placeholders for ALT-loaded accounts"
249                );
250            }
251        }
252        // 热路径:`static_account_keys` 零拷贝、`pump_ix` 内不克隆 CompiledInstruction。
253        super::pump_ix::parse_transaction_dex_events_with_filter(
254            transaction,
255            signature,
256            slot,
257            tx_index,
258            recv_us,
259            event_type_filter,
260            events,
261        );
262        crate::core::pumpfun_fee_enrich::enrich_pumpfun_same_tx_post_merge(events);
263
264        for mut event in events.drain(..) {
265            if let Some(meta) = event.metadata_mut() {
266                meta.grpc_recv_us = recv_us;
267            }
268            if queue.push(event).is_err() {
269                record_shredstream_dropped_event();
270            }
271        }
272    }
273}
274
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::events::{EventMetadata, PumpFunCreateTokenEvent};

    /// Builds a default-valued PumpFun create event to use as queue filler.
    fn sample_event() -> DexEvent {
        DexEvent::PumpFunCreate(PumpFunCreateTokenEvent {
            metadata: EventMetadata::default(),
            ..Default::default()
        })
    }

    /// Overflowing a one-slot queue must bump the dropped-event counter.
    #[test]
    fn dropped_counter_increments_without_panicking() {
        let before = SHREDSTREAM_DROPPED_EVENTS.load(Ordering::Relaxed);
        let queue = ArrayQueue::new(1);

        queue.push(sample_event()).expect("first push fits");

        // The second push exceeds the capacity of one and is rejected.
        if let Err(_overflow) = queue.push(sample_event()) {
            record_shredstream_dropped_event();
        }

        // `>=` because the counter is a process-wide static other tests may also bump.
        let after = SHREDSTREAM_DROPPED_EVENTS.load(Ordering::Relaxed);
        assert!(after >= before + 1);
    }
}
304}