Skip to main content

sol_parser_sdk/shredstream/
client.rs

1//! ShredStream 客户端
2
3use std::sync::Arc;
4
5use crossbeam_queue::ArrayQueue;
6use futures::StreamExt;
7use solana_entry::entry::Entry as SolanaEntry;
8use solana_sdk::message::VersionedMessage;
9use tokio::sync::Mutex;
10use tokio::task::JoinHandle;
11
12use crate::core::now_micros;
13use crate::shredstream::config::ShredStreamConfig;
14use crate::shredstream::proto::{Entry, ShredstreamProxyClient, SubscribeEntriesRequest};
15use crate::DexEvent;
16
/// ShredStream client.
///
/// Holds the proxy endpoint, the reconnect configuration and a shared slot for the
/// background subscription task. The type derives `Clone` and the task slot sits behind
/// an `Arc`, so every clone refers to the same subscription handle.
#[derive(Clone)]
pub struct ShredStreamClient {
    /// Endpoint string passed to `ShredstreamProxyClient::connect`.
    endpoint: String,
    /// Settings read by the reconnect loop (`reconnect_delay_ms`, `max_reconnect_attempts`).
    config: ShredStreamConfig,
    /// Handle of the task spawned by `subscribe`, or `None` when nothing is running.
    subscription_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
}
24
25impl ShredStreamClient {
26    /// 创建新客户端
27    pub async fn new(endpoint: impl Into<String>) -> crate::common::AnyResult<Self> {
28        Self::new_with_config(endpoint, ShredStreamConfig::default()).await
29    }
30
31    /// 使用自定义配置创建客户端
32    pub async fn new_with_config(
33        endpoint: impl Into<String>,
34        config: ShredStreamConfig,
35    ) -> crate::common::AnyResult<Self> {
36        let endpoint = endpoint.into();
37        // 测试连接
38        let _ = ShredstreamProxyClient::connect(endpoint.clone()).await?;
39
40        Ok(Self { endpoint, config, subscription_handle: Arc::new(Mutex::new(None)) })
41    }
42
43    /// 订阅 DEX 事件(自动重连)
44    ///
45    /// 返回一个队列,事件会被推送到该队列中
46    pub async fn subscribe(&self) -> crate::common::AnyResult<Arc<ArrayQueue<DexEvent>>> {
47        // 停止现有订阅
48        self.stop().await;
49
50        let queue = Arc::new(ArrayQueue::new(100_000));
51        let queue_clone = Arc::clone(&queue);
52
53        let endpoint = self.endpoint.clone();
54        let config = self.config.clone();
55
56        let handle = tokio::spawn(async move {
57            let mut delay = config.reconnect_delay_ms;
58            let mut attempts = 0u32;
59
60            loop {
61                if config.max_reconnect_attempts > 0 && attempts >= config.max_reconnect_attempts {
62                    log::error!("Max reconnection attempts reached, giving up");
63                    break;
64                }
65                attempts += 1;
66
67                match Self::stream_events(&endpoint, &queue_clone).await {
68                    Ok(_) => {
69                        delay = config.reconnect_delay_ms;
70                        attempts = 0;
71                    }
72                    Err(e) => {
73                        log::error!("ShredStream error: {} - retry in {}ms", e, delay);
74                        tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
75                        delay = (delay * 2).min(60_000);
76                    }
77                }
78            }
79        });
80
81        *self.subscription_handle.lock().await = Some(handle);
82        Ok(queue)
83    }
84
85    /// 停止订阅
86    pub async fn stop(&self) {
87        if let Some(handle) = self.subscription_handle.lock().await.take() {
88            handle.abort();
89        }
90    }
91
92    /// 核心事件流处理
93    async fn stream_events(
94        endpoint: &str,
95        queue: &Arc<ArrayQueue<DexEvent>>,
96    ) -> Result<(), String> {
97        let mut client = ShredstreamProxyClient::connect(endpoint.to_string())
98            .await
99            .map_err(|e| e.to_string())?;
100        let request = tonic::Request::new(SubscribeEntriesRequest {});
101        let mut stream =
102            client.subscribe_entries(request).await.map_err(|e| e.to_string())?.into_inner();
103
104        log::info!("ShredStream connected, receiving entries...");
105
106        while let Some(message) = stream.next().await {
107            match message {
108                Ok(entry) => {
109                    Self::process_entry(entry, queue);
110                }
111                Err(e) => {
112                    log::error!("Stream error: {:?}", e);
113                    return Err(e.to_string());
114                }
115            }
116        }
117
118        Ok(())
119    }
120
121    /// 处理单个 Entry 消息
122    #[inline]
123    fn process_entry(entry: Entry, queue: &Arc<ArrayQueue<DexEvent>>) {
124        let slot = entry.slot;
125        let recv_us = now_micros();
126
127        // 反序列化 Entry 数据
128        let entries = match bincode::deserialize::<Vec<SolanaEntry>>(&entry.entries) {
129            Ok(e) => e,
130            Err(e) => {
131                log::debug!("Failed to deserialize entries: {}", e);
132                return;
133            }
134        };
135
136        // 处理每个 Entry 中的交易
137        for entry in entries {
138            for (tx_index, transaction) in entry.transactions.iter().enumerate() {
139                Self::process_transaction(transaction, slot, recv_us, tx_index as u64, queue);
140            }
141        }
142    }
143
144    /// 处理单个交易
145    #[inline]
146    fn process_transaction(
147        transaction: &solana_sdk::transaction::VersionedTransaction,
148        slot: u64,
149        recv_us: i64,
150        tx_index: u64,
151        queue: &Arc<ArrayQueue<DexEvent>>,
152    ) {
153        if transaction.signatures.is_empty() {
154            return;
155        }
156
157        let signature = transaction.signatures[0];
158        if let VersionedMessage::V0(m) = &transaction.message {
159            if !m.address_table_lookups.is_empty() {
160                log::debug!(
161                    target: "sol_parser_sdk::shredstream",
162                    "V0 tx uses address lookup tables; only static keys are available — \
163                     some instruction account indices may resolve to wrong pubkeys (often only 1 BUY gets is_created_buy)"
164                );
165            }
166        }
167        // 热路径:`static_account_keys` 零拷贝、`pump_ix` 内不克隆 CompiledInstruction。
168        let mut events = Vec::new();
169        super::pump_ix::parse_transaction_pump_events(
170            transaction,
171            signature,
172            slot,
173            tx_index,
174            recv_us,
175            &mut events,
176        );
177        crate::core::pumpfun_fee_enrich::enrich_pumpfun_same_tx_post_merge(&mut events);
178
179        for mut event in events {
180            if let Some(meta) = event.metadata_mut() {
181                meta.grpc_recv_us = recv_us;
182            }
183            let _ = queue.push(event);
184        }
185    }
186}