sol_parser_sdk/shredstream/
client.rs1#![allow(deprecated)]
6
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10
11use crossbeam_queue::ArrayQueue;
12use futures::StreamExt;
13use solana_entry::entry::Entry as SolanaEntry;
14use solana_sdk::message::VersionedMessage;
15use tokio::sync::Mutex;
16use tokio::task::JoinHandle;
17use tonic::transport::{Channel, Endpoint};
18
19use crate::core::now_micros;
20use crate::grpc::types::EventTypeFilter;
21use crate::shredstream::config::ShredStreamConfig;
22use crate::shredstream::proto::{Entry, ShredstreamProxyClient, SubscribeEntriesRequest};
23use crate::DexEvent;
24
25static SHREDSTREAM_DROPPED_EVENTS: AtomicU64 = AtomicU64::new(0);
26
27#[inline]
28fn record_shredstream_dropped_event() -> u64 {
29 let dropped = SHREDSTREAM_DROPPED_EVENTS.fetch_add(1, Ordering::Relaxed) + 1;
30 if dropped <= 10 || dropped.is_power_of_two() {
31 log::warn!(
32 target: "sol_parser_sdk::shredstream",
33 "ShredStream event queue is full; dropped event count={}",
34 dropped
35 );
36 }
37 dropped
38}
39
40#[derive(Clone)]
42pub struct ShredStreamClient {
43 endpoint: String,
44 config: ShredStreamConfig,
45 subscription_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
46}
47
48impl ShredStreamClient {
49 pub async fn new(endpoint: impl Into<String>) -> crate::common::AnyResult<Self> {
51 Self::new_with_config(endpoint, ShredStreamConfig::default()).await
52 }
53
54 pub async fn new_with_config(
56 endpoint: impl Into<String>,
57 config: ShredStreamConfig,
58 ) -> crate::common::AnyResult<Self> {
59 let endpoint = endpoint.into();
60 let _ = Self::connect_client(&endpoint, &config).await?;
62
63 Ok(Self { endpoint, config, subscription_handle: Arc::new(Mutex::new(None)) })
64 }
65
66 pub async fn subscribe(&self) -> crate::common::AnyResult<Arc<ArrayQueue<DexEvent>>> {
70 self.subscribe_with_filter(None).await
71 }
72
73 pub async fn subscribe_with_filter(
77 &self,
78 event_type_filter: Option<EventTypeFilter>,
79 ) -> crate::common::AnyResult<Arc<ArrayQueue<DexEvent>>> {
80 self.stop().await;
82
83 let queue = Arc::new(ArrayQueue::new(100_000));
84 let queue_clone = Arc::clone(&queue);
85
86 let endpoint = self.endpoint.clone();
87 let config = self.config.clone();
88
89 let handle = tokio::spawn(async move {
90 let mut delay = config.reconnect_delay_ms;
91 let mut attempts = 0u32;
92
93 loop {
94 if config.max_reconnect_attempts > 0 && attempts >= config.max_reconnect_attempts {
95 log::error!("Max reconnection attempts reached, giving up");
96 break;
97 }
98 attempts += 1;
99
100 match Self::stream_events(
101 &endpoint,
102 &config,
103 &queue_clone,
104 event_type_filter.as_ref(),
105 )
106 .await
107 {
108 Ok(_) => {
109 delay = config.reconnect_delay_ms;
110 attempts = 0;
111 }
112 Err(e) => {
113 log::error!("ShredStream error: {} - retry in {}ms", e, delay);
114 tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
115 delay = (delay * 2).min(60_000);
116 }
117 }
118 }
119 });
120
121 *self.subscription_handle.lock().await = Some(handle);
122 Ok(queue)
123 }
124
125 pub async fn stop(&self) {
127 if let Some(handle) = self.subscription_handle.lock().await.take() {
128 handle.abort();
129 }
130 }
131
132 async fn connect_client(
133 endpoint: &str,
134 config: &ShredStreamConfig,
135 ) -> crate::common::AnyResult<ShredstreamProxyClient<Channel>> {
136 let mut builder = Endpoint::from_shared(endpoint.to_string())?;
137 if config.connection_timeout_ms > 0 {
138 builder = builder.connect_timeout(Duration::from_millis(config.connection_timeout_ms));
139 }
140 let channel = builder.connect().await?;
141 Ok(ShredstreamProxyClient::new(channel)
142 .max_decoding_message_size(config.max_decoding_message_size))
143 }
144
145 async fn stream_events(
147 endpoint: &str,
148 config: &ShredStreamConfig,
149 queue: &Arc<ArrayQueue<DexEvent>>,
150 event_type_filter: Option<&EventTypeFilter>,
151 ) -> Result<(), String> {
152 let mut client = Self::connect_client(endpoint, config).await.map_err(|e| e.to_string())?;
153 let request = tonic::Request::new(SubscribeEntriesRequest {});
154 let response = if config.request_timeout_ms > 0 {
155 tokio::time::timeout(
156 Duration::from_millis(config.request_timeout_ms),
157 client.subscribe_entries(request),
158 )
159 .await
160 .map_err(|_| {
161 format!(
162 "ShredStream subscribe request timed out after {}ms",
163 config.request_timeout_ms
164 )
165 })?
166 .map_err(|e| e.to_string())?
167 } else {
168 client.subscribe_entries(request).await.map_err(|e| e.to_string())?
169 };
170 let mut stream = response.into_inner();
171
172 log::info!("ShredStream connected, receiving entries...");
173
174 while let Some(message) = stream.next().await {
175 match message {
176 Ok(entry) => {
177 Self::process_entry(entry, queue, event_type_filter);
178 }
179 Err(e) => {
180 log::error!("Stream error: {:?}", e);
181 return Err(e.to_string());
182 }
183 }
184 }
185
186 Ok(())
187 }
188
189 #[inline]
191 fn process_entry(
192 entry: Entry,
193 queue: &Arc<ArrayQueue<DexEvent>>,
194 event_type_filter: Option<&EventTypeFilter>,
195 ) {
196 let slot = entry.slot;
197 let recv_us = now_micros();
198
199 let entries = match bincode::deserialize::<Vec<SolanaEntry>>(&entry.entries) {
201 Ok(e) => e,
202 Err(e) => {
203 log::debug!("Failed to deserialize entries: {}", e);
204 return;
205 }
206 };
207
208 let mut events = Vec::with_capacity(4);
210 let mut tx_index = 0u64;
211 for entry in entries {
212 for transaction in entry.transactions.iter() {
213 events.clear();
214 Self::process_transaction(
215 transaction,
216 slot,
217 recv_us,
218 tx_index,
219 queue,
220 event_type_filter,
221 &mut events,
222 );
223 tx_index += 1;
224 }
225 }
226 }
227
228 #[inline]
230 fn process_transaction(
231 transaction: &solana_sdk::transaction::VersionedTransaction,
232 slot: u64,
233 recv_us: i64,
234 tx_index: u64,
235 queue: &Arc<ArrayQueue<DexEvent>>,
236 event_type_filter: Option<&EventTypeFilter>,
237 events: &mut Vec<DexEvent>,
238 ) {
239 if transaction.signatures.is_empty() {
240 return;
241 }
242
243 let signature = transaction.signatures[0];
244 if let VersionedMessage::V0(m) = &transaction.message {
245 if !m.address_table_lookups.is_empty() {
246 log::trace!(
247 target: "sol_parser_sdk::shredstream",
248 "V0 tx uses address lookup tables; shred parser will use static accounts and default placeholders for ALT-loaded accounts"
249 );
250 }
251 }
252 super::pump_ix::parse_transaction_dex_events_with_filter(
254 transaction,
255 signature,
256 slot,
257 tx_index,
258 recv_us,
259 event_type_filter,
260 events,
261 );
262 crate::core::pumpfun_fee_enrich::enrich_pumpfun_same_tx_post_merge(events);
263
264 for mut event in events.drain(..) {
265 if let Some(meta) = event.metadata_mut() {
266 meta.grpc_recv_us = recv_us;
267 }
268 if queue.push(event).is_err() {
269 record_shredstream_dropped_event();
270 }
271 }
272 }
273}
274
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::events::{EventMetadata, PumpFunCreateTokenEvent};

    /// Builds a default PumpFun create event used only to occupy queue slots.
    fn placeholder_event() -> DexEvent {
        DexEvent::PumpFunCreate(PumpFunCreateTokenEvent {
            metadata: EventMetadata::default(),
            ..Default::default()
        })
    }

    /// Overfilling a one-slot queue must bump the drop counter rather than panic.
    #[test]
    fn dropped_counter_increments_without_panicking() {
        let baseline = SHREDSTREAM_DROPPED_EVENTS.load(Ordering::Relaxed);
        let queue = ArrayQueue::new(1);

        queue.push(placeholder_event()).expect("first push fits");

        let overflowed = queue.push(placeholder_event()).is_err();
        if overflowed {
            record_shredstream_dropped_event();
        }

        // `>=` because the counter is global and other tests may increment it too.
        let current = SHREDSTREAM_DROPPED_EVENTS.load(Ordering::Relaxed);
        assert!(current >= baseline + 1);
    }
}