Skip to main content

sol_parser_sdk/shredstream/
client.rs

1//! ShredStream 客户端
2//!
3//! `solana_entry::entry::Entry` 在 Agave SDK 中带 `deprecated`(需显式启用不稳定 feature 才消除);
4//! 本模块仍依赖其 bincode 布局解码 Shred 侧 `entries` 负载。
5#![allow(deprecated)]
6
7use std::sync::Arc;
8
9use crossbeam_queue::ArrayQueue;
10use futures::StreamExt;
11use solana_entry::entry::Entry as SolanaEntry;
12use solana_sdk::message::VersionedMessage;
13use tokio::sync::Mutex;
14use tokio::task::JoinHandle;
15
16use crate::core::now_micros;
17use crate::shredstream::config::ShredStreamConfig;
18use crate::shredstream::proto::{Entry, ShredstreamProxyClient, SubscribeEntriesRequest};
19use crate::DexEvent;
20
21/// ShredStream 客户端
22#[derive(Clone)]
23pub struct ShredStreamClient {
24    endpoint: String,
25    config: ShredStreamConfig,
26    subscription_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
27}
28
29impl ShredStreamClient {
30    /// 创建新客户端
31    pub async fn new(endpoint: impl Into<String>) -> crate::common::AnyResult<Self> {
32        Self::new_with_config(endpoint, ShredStreamConfig::default()).await
33    }
34
35    /// 使用自定义配置创建客户端
36    pub async fn new_with_config(
37        endpoint: impl Into<String>,
38        config: ShredStreamConfig,
39    ) -> crate::common::AnyResult<Self> {
40        let endpoint = endpoint.into();
41        // 测试连接
42        let _ = ShredstreamProxyClient::connect(endpoint.clone()).await?;
43
44        Ok(Self { endpoint, config, subscription_handle: Arc::new(Mutex::new(None)) })
45    }
46
47    /// 订阅 DEX 事件(自动重连)
48    ///
49    /// 返回一个队列,事件会被推送到该队列中
50    pub async fn subscribe(&self) -> crate::common::AnyResult<Arc<ArrayQueue<DexEvent>>> {
51        // 停止现有订阅
52        self.stop().await;
53
54        let queue = Arc::new(ArrayQueue::new(100_000));
55        let queue_clone = Arc::clone(&queue);
56
57        let endpoint = self.endpoint.clone();
58        let config = self.config.clone();
59
60        let handle = tokio::spawn(async move {
61            let mut delay = config.reconnect_delay_ms;
62            let mut attempts = 0u32;
63
64            loop {
65                if config.max_reconnect_attempts > 0 && attempts >= config.max_reconnect_attempts {
66                    log::error!("Max reconnection attempts reached, giving up");
67                    break;
68                }
69                attempts += 1;
70
71                match Self::stream_events(&endpoint, &queue_clone).await {
72                    Ok(_) => {
73                        delay = config.reconnect_delay_ms;
74                        attempts = 0;
75                    }
76                    Err(e) => {
77                        log::error!("ShredStream error: {} - retry in {}ms", e, delay);
78                        tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
79                        delay = (delay * 2).min(60_000);
80                    }
81                }
82            }
83        });
84
85        *self.subscription_handle.lock().await = Some(handle);
86        Ok(queue)
87    }
88
89    /// 停止订阅
90    pub async fn stop(&self) {
91        if let Some(handle) = self.subscription_handle.lock().await.take() {
92            handle.abort();
93        }
94    }
95
96    /// 核心事件流处理
97    async fn stream_events(
98        endpoint: &str,
99        queue: &Arc<ArrayQueue<DexEvent>>,
100    ) -> Result<(), String> {
101        let mut client = ShredstreamProxyClient::connect(endpoint.to_string())
102            .await
103            .map_err(|e| e.to_string())?;
104        let request = tonic::Request::new(SubscribeEntriesRequest {});
105        let mut stream =
106            client.subscribe_entries(request).await.map_err(|e| e.to_string())?.into_inner();
107
108        log::info!("ShredStream connected, receiving entries...");
109
110        while let Some(message) = stream.next().await {
111            match message {
112                Ok(entry) => {
113                    Self::process_entry(entry, queue);
114                }
115                Err(e) => {
116                    log::error!("Stream error: {:?}", e);
117                    return Err(e.to_string());
118                }
119            }
120        }
121
122        Ok(())
123    }
124
125    /// 处理单个 Entry 消息
126    #[inline]
127    fn process_entry(entry: Entry, queue: &Arc<ArrayQueue<DexEvent>>) {
128        let slot = entry.slot;
129        let recv_us = now_micros();
130
131        // 反序列化 Entry 数据
132        let entries = match bincode::deserialize::<Vec<SolanaEntry>>(&entry.entries) {
133            Ok(e) => e,
134            Err(e) => {
135                log::debug!("Failed to deserialize entries: {}", e);
136                return;
137            }
138        };
139
140        // 处理每个 Entry 中的交易
141        for entry in entries {
142            for (tx_index, transaction) in entry.transactions.iter().enumerate() {
143                Self::process_transaction(transaction, slot, recv_us, tx_index as u64, queue);
144            }
145        }
146    }
147
148    /// 处理单个交易
149    #[inline]
150    fn process_transaction(
151        transaction: &solana_sdk::transaction::VersionedTransaction,
152        slot: u64,
153        recv_us: i64,
154        tx_index: u64,
155        queue: &Arc<ArrayQueue<DexEvent>>,
156    ) {
157        if transaction.signatures.is_empty() {
158            return;
159        }
160
161        let signature = transaction.signatures[0];
162        if let VersionedMessage::V0(m) = &transaction.message {
163            if !m.address_table_lookups.is_empty() {
164                log::debug!(
165                    target: "sol_parser_sdk::shredstream",
166                    "V0 tx uses address lookup tables; only static keys are available — \
167                     some instruction account indices may resolve to wrong pubkeys (often only 1 BUY gets is_created_buy)"
168                );
169            }
170        }
171        // 热路径:`static_account_keys` 零拷贝、`pump_ix` 内不克隆 CompiledInstruction。
172        let mut events = Vec::new();
173        super::pump_ix::parse_transaction_pump_events(
174            transaction,
175            signature,
176            slot,
177            tx_index,
178            recv_us,
179            &mut events,
180        );
181        crate::core::pumpfun_fee_enrich::enrich_pumpfun_same_tx_post_merge(&mut events);
182
183        for mut event in events {
184            if let Some(meta) = event.metadata_mut() {
185                meta.grpc_recv_us = recv_us;
186            }
187            let _ = queue.push(event);
188        }
189    }
190}