Skip to main content

sol_parser_sdk/shredstream/
client.rs

1//! ShredStream 客户端
2//!
3//! `solana_entry::entry::Entry` 在 Agave SDK 中带 `deprecated`(需显式启用不稳定 feature 才消除);
4//! 本模块仍依赖其 bincode 布局解码 Shred 侧 `entries` 负载。
5#![allow(deprecated)]
6
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10
11use crossbeam_queue::ArrayQueue;
12use futures::StreamExt;
13use solana_entry::entry::Entry as SolanaEntry;
14use solana_sdk::message::VersionedMessage;
15use tokio::sync::Mutex;
16use tokio::task::JoinHandle;
17use tonic::transport::{Channel, Endpoint};
18
19use crate::core::now_micros;
20use crate::grpc::types::EventTypeFilter;
21use crate::shredstream::config::ShredStreamConfig;
22use crate::shredstream::proto::{Entry, ShredstreamProxyClient, SubscribeEntriesRequest};
23use crate::DexEvent;
24
/// Process-wide count of events that were dropped because a subscriber queue rejected the push.
///
/// Only ever incremented (see `record_shredstream_dropped_event`); it is shared by every client
/// in the process.
static SHREDSTREAM_DROPPED_EVENTS: AtomicU64 = AtomicU64::new(0);
26
/// Destination for parsed events inside the stream-reading task.
///
/// Borrowed rather than owned so that one sink can be built per connection attempt without
/// cloning the underlying queue or callback.
enum EventSink<'a> {
    /// Push events into a shared queue; an event is counted as dropped when the push fails.
    Queue(&'a Arc<ArrayQueue<DexEvent>>),
    /// Invoke the subscriber's callback directly on the stream-reading task.
    Callback(&'a (dyn Fn(DexEvent) + Send + Sync)),
}
31
32impl EventSink<'_> {
33    #[inline]
34    fn deliver(&self, event: DexEvent) {
35        match self {
36            EventSink::Queue(queue) => {
37                if queue.push(event).is_err() {
38                    record_shredstream_dropped_event();
39                }
40            }
41            EventSink::Callback(callback) => callback(event),
42        }
43    }
44}
45
46#[inline]
47fn record_shredstream_dropped_event() -> u64 {
48    let dropped = SHREDSTREAM_DROPPED_EVENTS.fetch_add(1, Ordering::Relaxed) + 1;
49    if dropped <= 10 || dropped.is_power_of_two() {
50        log::warn!(
51            target: "sol_parser_sdk::shredstream",
52            "ShredStream event queue is full; dropped event count={}",
53            dropped
54        );
55    }
56    dropped
57}
58
/// ShredStream client.
///
/// Clones share the same subscription-handle slot (it lives behind an `Arc`), so stopping or
/// re-subscribing through one clone affects the task started through any other clone.
#[derive(Clone)]
pub struct ShredStreamClient {
    /// Endpoint URI handed to `Endpoint::from_shared` on every connection attempt.
    endpoint: String,
    /// Timeout, decoding-size, and reconnect settings used by the connection and stream loops.
    config: ShredStreamConfig,
    /// Handle of the currently running subscription task, or `None` when nothing is subscribed.
    subscription_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
}
66
67impl ShredStreamClient {
68    /// 创建新客户端
69    pub async fn new(endpoint: impl Into<String>) -> crate::common::AnyResult<Self> {
70        Self::new_with_config(endpoint, ShredStreamConfig::default()).await
71    }
72
73    /// 使用自定义配置创建客户端
74    pub async fn new_with_config(
75        endpoint: impl Into<String>,
76        config: ShredStreamConfig,
77    ) -> crate::common::AnyResult<Self> {
78        let endpoint = endpoint.into();
79        // 测试连接
80        let _ = Self::connect_client(&endpoint, &config).await?;
81
82        Ok(Self { endpoint, config, subscription_handle: Arc::new(Mutex::new(None)) })
83    }
84
85    /// 订阅 DEX 事件(自动重连)
86    ///
87    /// 返回一个队列,事件会被推送到该队列中
88    pub async fn subscribe(&self) -> crate::common::AnyResult<Arc<ArrayQueue<DexEvent>>> {
89        self.subscribe_with_filter(None).await
90    }
91
92    /// 订阅 DEX 事件,并在 ShredStream 热路径中按 SDK 事件类型提前过滤。
93    ///
94    /// 过滤发生在解析分发前,用于低延迟场景避免解析不需要的协议/事件。
95    pub async fn subscribe_with_filter(
96        &self,
97        event_type_filter: Option<EventTypeFilter>,
98    ) -> crate::common::AnyResult<Arc<ArrayQueue<DexEvent>>> {
99        // 停止现有订阅
100        self.stop().await;
101
102        let queue = Arc::new(ArrayQueue::new(100_000));
103        let queue_clone = Arc::clone(&queue);
104
105        let endpoint = self.endpoint.clone();
106        let config = self.config.clone();
107
108        let handle = tokio::spawn(async move {
109            let mut delay = config.reconnect_delay_ms;
110            let mut attempts = 0u32;
111
112            loop {
113                if config.max_reconnect_attempts > 0 && attempts >= config.max_reconnect_attempts {
114                    log::error!("Max reconnection attempts reached, giving up");
115                    break;
116                }
117                attempts += 1;
118
119                match Self::stream_events(
120                    &endpoint,
121                    &config,
122                    event_type_filter.as_ref(),
123                    EventSink::Queue(&queue_clone),
124                )
125                .await
126                {
127                    Ok(_) => {
128                        delay = config.reconnect_delay_ms;
129                        attempts = 0;
130                    }
131                    Err(e) => {
132                        log::error!("ShredStream error: {} - retry in {}ms", e, delay);
133                        tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
134                        delay = (delay * 2).min(60_000);
135                    }
136                }
137            }
138        });
139
140        *self.subscription_handle.lock().await = Some(handle);
141        Ok(queue)
142    }
143
144    /// 订阅 DEX 事件,并在解析热路径中直接回调事件,避免跨任务队列调度。
145    ///
146    /// 这是最低延迟路径;回调会在 ShredStream 读流任务内执行,应避免阻塞 I/O 或重计算。
147    pub async fn subscribe_with_filter_callback<F>(
148        &self,
149        event_type_filter: Option<EventTypeFilter>,
150        callback: F,
151    ) -> crate::common::AnyResult<()>
152    where
153        F: Fn(DexEvent) + Send + Sync + 'static,
154    {
155        self.stop().await;
156
157        let endpoint = self.endpoint.clone();
158        let config = self.config.clone();
159        let callback = Arc::new(callback);
160
161        let handle = tokio::spawn(async move {
162            let mut delay = config.reconnect_delay_ms;
163            let mut attempts = 0u32;
164
165            loop {
166                if config.max_reconnect_attempts > 0 && attempts >= config.max_reconnect_attempts {
167                    log::error!("Max reconnection attempts reached, giving up");
168                    break;
169                }
170                attempts += 1;
171
172                match Self::stream_events_callback(
173                    &endpoint,
174                    &config,
175                    event_type_filter.as_ref(),
176                    callback.clone(),
177                )
178                .await
179                {
180                    Ok(_) => {
181                        delay = config.reconnect_delay_ms;
182                        attempts = 0;
183                    }
184                    Err(e) => {
185                        log::error!("ShredStream error: {} - retry in {}ms", e, delay);
186                        tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
187                        delay = (delay * 2).min(60_000);
188                    }
189                }
190            }
191        });
192
193        *self.subscription_handle.lock().await = Some(handle);
194        Ok(())
195    }
196
197    /// 停止订阅
198    pub async fn stop(&self) {
199        if let Some(handle) = self.subscription_handle.lock().await.take() {
200            handle.abort();
201        }
202    }
203
204    async fn connect_client(
205        endpoint: &str,
206        config: &ShredStreamConfig,
207    ) -> crate::common::AnyResult<ShredstreamProxyClient<Channel>> {
208        let mut builder = Endpoint::from_shared(endpoint.to_string())?;
209        if config.connection_timeout_ms > 0 {
210            builder = builder.connect_timeout(Duration::from_millis(config.connection_timeout_ms));
211        }
212        let channel = builder.connect().await?;
213        Ok(ShredstreamProxyClient::new(channel)
214            .max_decoding_message_size(config.max_decoding_message_size))
215    }
216
217    /// 核心事件流处理
218    async fn stream_events(
219        endpoint: &str,
220        config: &ShredStreamConfig,
221        event_type_filter: Option<&EventTypeFilter>,
222        sink: EventSink<'_>,
223    ) -> Result<(), String> {
224        let mut client = Self::connect_client(endpoint, config).await.map_err(|e| e.to_string())?;
225        let request = tonic::Request::new(SubscribeEntriesRequest {});
226        let response = if config.request_timeout_ms > 0 {
227            tokio::time::timeout(
228                Duration::from_millis(config.request_timeout_ms),
229                client.subscribe_entries(request),
230            )
231            .await
232            .map_err(|_| {
233                format!(
234                    "ShredStream subscribe request timed out after {}ms",
235                    config.request_timeout_ms
236                )
237            })?
238            .map_err(|e| e.to_string())?
239        } else {
240            client.subscribe_entries(request).await.map_err(|e| e.to_string())?
241        };
242        let mut stream = response.into_inner();
243
244        log::info!("ShredStream connected, receiving entries...");
245
246        let mut events = Vec::with_capacity(4);
247        while let Some(message) = stream.next().await {
248            match message {
249                Ok(entry) => {
250                    Self::process_entry(entry, event_type_filter, &sink, &mut events);
251                }
252                Err(e) => {
253                    log::error!("Stream error: {:?}", e);
254                    return Err(e.to_string());
255                }
256            }
257        }
258
259        Ok(())
260    }
261
262    async fn stream_events_callback(
263        endpoint: &str,
264        config: &ShredStreamConfig,
265        event_type_filter: Option<&EventTypeFilter>,
266        callback: Arc<dyn Fn(DexEvent) + Send + Sync>,
267    ) -> Result<(), String> {
268        Self::stream_events(
269            endpoint,
270            config,
271            event_type_filter,
272            EventSink::Callback(callback.as_ref()),
273        )
274        .await
275    }
276
277    /// 处理单个 Entry 消息
278    #[inline]
279    fn process_entry(
280        entry: Entry,
281        event_type_filter: Option<&EventTypeFilter>,
282        sink: &EventSink<'_>,
283        events: &mut Vec<DexEvent>,
284    ) {
285        let slot = entry.slot;
286        let recv_us = now_micros();
287
288        // 反序列化 Entry 数据
289        let entries = match bincode::deserialize::<Vec<SolanaEntry>>(&entry.entries) {
290            Ok(e) => e,
291            Err(e) => {
292                log::debug!("Failed to deserialize entries: {}", e);
293                return;
294            }
295        };
296
297        // 处理每个 Entry 中的交易
298        let mut tx_index = 0u64;
299        for entry in entries {
300            for transaction in entry.transactions.iter() {
301                events.clear();
302                Self::process_transaction(
303                    transaction,
304                    slot,
305                    recv_us,
306                    tx_index,
307                    event_type_filter,
308                    events,
309                    sink,
310                );
311                tx_index += 1;
312            }
313        }
314    }
315
316    /// 处理单个交易
317    #[inline]
318    fn process_transaction(
319        transaction: &solana_sdk::transaction::VersionedTransaction,
320        slot: u64,
321        recv_us: i64,
322        tx_index: u64,
323        event_type_filter: Option<&EventTypeFilter>,
324        events: &mut Vec<DexEvent>,
325        sink: &EventSink<'_>,
326    ) {
327        if transaction.signatures.is_empty() {
328            return;
329        }
330
331        Self::parse_transaction_events(
332            transaction,
333            slot,
334            recv_us,
335            tx_index,
336            event_type_filter,
337            events,
338        );
339
340        for event in events.drain(..) {
341            sink.deliver(event);
342        }
343    }
344
345    #[inline]
346    fn parse_transaction_events(
347        transaction: &solana_sdk::transaction::VersionedTransaction,
348        slot: u64,
349        recv_us: i64,
350        tx_index: u64,
351        event_type_filter: Option<&EventTypeFilter>,
352        events: &mut Vec<DexEvent>,
353    ) {
354        if transaction.signatures.is_empty() {
355            return;
356        }
357
358        let signature = transaction.signatures[0];
359        if let VersionedMessage::V0(m) = &transaction.message {
360            if !m.address_table_lookups.is_empty() {
361                log::trace!(
362                    target: "sol_parser_sdk::shredstream",
363                    "V0 tx uses address lookup tables; shred parser will use static accounts and default placeholders for ALT-loaded accounts"
364                );
365            }
366        }
367        // 热路径:`static_account_keys` 零拷贝、`pump_ix` 内不克隆 CompiledInstruction。
368        super::pump_ix::parse_transaction_dex_events_with_filter(
369            transaction,
370            signature,
371            slot,
372            tx_index,
373            recv_us,
374            event_type_filter,
375            events,
376        );
377        crate::core::pumpfun_fee_enrich::enrich_pumpfun_same_tx_post_merge(events);
378
379        for event in events.iter_mut() {
380            if let Some(meta) = event.metadata_mut() {
381                meta.grpc_recv_us = recv_us;
382            }
383        }
384    }
385}
386
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::events::{EventMetadata, PumpFunCreateTokenEvent};
    use crate::instr::program_ids::PUMPFUN_PROGRAM_ID;
    use solana_sdk::hash::Hash;
    use solana_sdk::message::{
        compiled_instruction::CompiledInstruction, v0, MessageHeader, VersionedMessage,
    };
    use solana_sdk::pubkey::Pubkey;
    use solana_sdk::signature::Signature;
    use solana_sdk::transaction::VersionedTransaction;
    use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};

    /// Appends a length-prefixed string: a little-endian `u32` byte length, then the bytes.
    fn push_string(data: &mut Vec<u8>, value: &str) {
        data.extend_from_slice(&(value.len() as u32).to_le_bytes());
        data.extend_from_slice(value.as_bytes());
    }

    /// Builds instruction data for the fixture transaction.
    ///
    /// NOTE(review): the leading eight bytes are presumably the PumpFun `create` discriminator;
    /// confirm against the instruction parser.
    fn pumpfun_create_data() -> Vec<u8> {
        let mut data = Vec::new();
        data.extend_from_slice(&[24, 30, 200, 40, 5, 28, 7, 119]);
        push_string(&mut data, "Callback Test");
        push_string(&mut data, "CBT");
        push_string(&mut data, "https://example.invalid/callback.json");
        data.extend_from_slice(Pubkey::new_unique().as_ref());
        data
    }

    /// Builds a V0 transaction with one instruction addressed to the PumpFun program.
    ///
    /// Ten unique accounts occupy indices 0..10 and the program id sits at index 10, which is
    /// the instruction's `program_id_index`.
    fn pumpfun_create_tx() -> VersionedTransaction {
        let mut account_keys = (0..10).map(|_| Pubkey::new_unique()).collect::<Vec<_>>();
        account_keys.push(PUMPFUN_PROGRAM_ID);

        VersionedTransaction {
            signatures: vec![Signature::default()],
            message: VersionedMessage::V0(v0::Message {
                header: MessageHeader {
                    num_required_signatures: 1,
                    num_readonly_signed_accounts: 0,
                    num_readonly_unsigned_accounts: 0,
                },
                account_keys,
                recent_blockhash: Hash::default(),
                instructions: vec![CompiledInstruction::new_from_raw_parts(
                    10,
                    pumpfun_create_data(),
                    (0..10).collect(),
                )],
                address_table_lookups: Vec::new(),
            }),
        }
    }

    /// A default-initialized create event used as queue filler.
    fn placeholder_event() -> DexEvent {
        DexEvent::PumpFunCreate(PumpFunCreateTokenEvent {
            metadata: EventMetadata::default(),
            ..Default::default()
        })
    }

    #[test]
    fn dropped_counter_increments_without_panicking() {
        let before = SHREDSTREAM_DROPPED_EVENTS.load(Ordering::Relaxed);
        let queue = Arc::new(ArrayQueue::new(1));
        // Drive the real sink so a regression in `EventSink::deliver` is caught here.
        let sink = EventSink::Queue(&queue);

        sink.deliver(placeholder_event());
        assert_eq!(queue.len(), 1, "first push fits");

        sink.deliver(placeholder_event());
        assert_eq!(queue.len(), 1, "overflowing event must be dropped, not queued");

        // The counter is process-wide and tests may run in parallel, so only require growth.
        assert!(SHREDSTREAM_DROPPED_EVENTS.load(Ordering::Relaxed) > before);
    }

    #[test]
    fn callback_path_delivers_events_without_queue() {
        let entries = vec![SolanaEntry {
            num_hashes: 1,
            hash: Hash::default(),
            transactions: vec![pumpfun_create_tx()],
        }];
        let entry = Entry { slot: 42, entries: bincode::serialize(&entries).unwrap() };
        let count = AtomicUsize::new(0);

        let mut events = Vec::with_capacity(4);
        ShredStreamClient::process_entry(
            entry,
            None,
            &EventSink::Callback(&|event| {
                assert!(matches!(event, DexEvent::PumpFunCreate(_)));
                assert_eq!(event.metadata().slot, 42);
                count.fetch_add(1, AtomicOrdering::Relaxed);
            }),
            &mut events,
        );

        assert_eq!(count.load(AtomicOrdering::Relaxed), 1);
    }
}