sol_parser_sdk/shredstream/
client.rs1#![allow(deprecated)]
6
7use std::sync::Arc;
8use std::time::Duration;
9
10use crossbeam_queue::ArrayQueue;
11use futures::StreamExt;
12use solana_entry::entry::Entry as SolanaEntry;
13use solana_sdk::message::VersionedMessage;
14use tokio::sync::Mutex;
15use tokio::task::JoinHandle;
16use tonic::transport::{Channel, Endpoint};
17
18use crate::core::now_micros;
19use crate::grpc::types::EventTypeFilter;
20use crate::shredstream::config::ShredStreamConfig;
21use crate::shredstream::proto::{Entry, ShredstreamProxyClient, SubscribeEntriesRequest};
22use crate::DexEvent;
23
24#[derive(Clone)]
26pub struct ShredStreamClient {
27 endpoint: String,
28 config: ShredStreamConfig,
29 subscription_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
30}
31
32impl ShredStreamClient {
33 pub async fn new(endpoint: impl Into<String>) -> crate::common::AnyResult<Self> {
35 Self::new_with_config(endpoint, ShredStreamConfig::default()).await
36 }
37
38 pub async fn new_with_config(
40 endpoint: impl Into<String>,
41 config: ShredStreamConfig,
42 ) -> crate::common::AnyResult<Self> {
43 let endpoint = endpoint.into();
44 let _ = Self::connect_client(&endpoint, &config).await?;
46
47 Ok(Self { endpoint, config, subscription_handle: Arc::new(Mutex::new(None)) })
48 }
49
50 pub async fn subscribe(&self) -> crate::common::AnyResult<Arc<ArrayQueue<DexEvent>>> {
54 self.subscribe_with_filter(None).await
55 }
56
57 pub async fn subscribe_with_filter(
61 &self,
62 event_type_filter: Option<EventTypeFilter>,
63 ) -> crate::common::AnyResult<Arc<ArrayQueue<DexEvent>>> {
64 self.stop().await;
66
67 let queue = Arc::new(ArrayQueue::new(100_000));
68 let queue_clone = Arc::clone(&queue);
69
70 let endpoint = self.endpoint.clone();
71 let config = self.config.clone();
72
73 let handle = tokio::spawn(async move {
74 let mut delay = config.reconnect_delay_ms;
75 let mut attempts = 0u32;
76
77 loop {
78 if config.max_reconnect_attempts > 0 && attempts >= config.max_reconnect_attempts {
79 log::error!("Max reconnection attempts reached, giving up");
80 break;
81 }
82 attempts += 1;
83
84 match Self::stream_events(
85 &endpoint,
86 &config,
87 &queue_clone,
88 event_type_filter.as_ref(),
89 )
90 .await
91 {
92 Ok(_) => {
93 delay = config.reconnect_delay_ms;
94 attempts = 0;
95 }
96 Err(e) => {
97 log::error!("ShredStream error: {} - retry in {}ms", e, delay);
98 tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
99 delay = (delay * 2).min(60_000);
100 }
101 }
102 }
103 });
104
105 *self.subscription_handle.lock().await = Some(handle);
106 Ok(queue)
107 }
108
109 pub async fn stop(&self) {
111 if let Some(handle) = self.subscription_handle.lock().await.take() {
112 handle.abort();
113 }
114 }
115
116 async fn connect_client(
117 endpoint: &str,
118 config: &ShredStreamConfig,
119 ) -> crate::common::AnyResult<ShredstreamProxyClient<Channel>> {
120 let mut builder = Endpoint::from_shared(endpoint.to_string())?;
121 if config.connection_timeout_ms > 0 {
122 builder = builder.connect_timeout(Duration::from_millis(config.connection_timeout_ms));
123 }
124 let channel = builder.connect().await?;
125 Ok(ShredstreamProxyClient::new(channel)
126 .max_decoding_message_size(config.max_decoding_message_size))
127 }
128
129 async fn stream_events(
131 endpoint: &str,
132 config: &ShredStreamConfig,
133 queue: &Arc<ArrayQueue<DexEvent>>,
134 event_type_filter: Option<&EventTypeFilter>,
135 ) -> Result<(), String> {
136 let mut client = Self::connect_client(endpoint, config).await.map_err(|e| e.to_string())?;
137 let request = tonic::Request::new(SubscribeEntriesRequest {});
138 let response = if config.request_timeout_ms > 0 {
139 tokio::time::timeout(
140 Duration::from_millis(config.request_timeout_ms),
141 client.subscribe_entries(request),
142 )
143 .await
144 .map_err(|_| {
145 format!(
146 "ShredStream subscribe request timed out after {}ms",
147 config.request_timeout_ms
148 )
149 })?
150 .map_err(|e| e.to_string())?
151 } else {
152 client.subscribe_entries(request).await.map_err(|e| e.to_string())?
153 };
154 let mut stream = response.into_inner();
155
156 log::info!("ShredStream connected, receiving entries...");
157
158 while let Some(message) = stream.next().await {
159 match message {
160 Ok(entry) => {
161 Self::process_entry(entry, queue, event_type_filter);
162 }
163 Err(e) => {
164 log::error!("Stream error: {:?}", e);
165 return Err(e.to_string());
166 }
167 }
168 }
169
170 Ok(())
171 }
172
173 #[inline]
175 fn process_entry(
176 entry: Entry,
177 queue: &Arc<ArrayQueue<DexEvent>>,
178 event_type_filter: Option<&EventTypeFilter>,
179 ) {
180 let slot = entry.slot;
181 let recv_us = now_micros();
182
183 let entries = match bincode::deserialize::<Vec<SolanaEntry>>(&entry.entries) {
185 Ok(e) => e,
186 Err(e) => {
187 log::debug!("Failed to deserialize entries: {}", e);
188 return;
189 }
190 };
191
192 let mut events = Vec::with_capacity(4);
194 let mut tx_index = 0u64;
195 for entry in entries {
196 for transaction in entry.transactions.iter() {
197 events.clear();
198 Self::process_transaction(
199 transaction,
200 slot,
201 recv_us,
202 tx_index,
203 queue,
204 event_type_filter,
205 &mut events,
206 );
207 tx_index += 1;
208 }
209 }
210 }
211
212 #[inline]
214 fn process_transaction(
215 transaction: &solana_sdk::transaction::VersionedTransaction,
216 slot: u64,
217 recv_us: i64,
218 tx_index: u64,
219 queue: &Arc<ArrayQueue<DexEvent>>,
220 event_type_filter: Option<&EventTypeFilter>,
221 events: &mut Vec<DexEvent>,
222 ) {
223 if transaction.signatures.is_empty() {
224 return;
225 }
226
227 let signature = transaction.signatures[0];
228 if let VersionedMessage::V0(m) = &transaction.message {
229 if !m.address_table_lookups.is_empty() {
230 log::debug!(
231 target: "sol_parser_sdk::shredstream",
232 "V0 tx uses address lookup tables; only static keys are available — \
233 some instruction account indices may resolve to wrong pubkeys (often only 1 BUY gets is_created_buy)"
234 );
235 }
236 }
237 super::pump_ix::parse_transaction_dex_events_with_filter(
239 transaction,
240 signature,
241 slot,
242 tx_index,
243 recv_us,
244 event_type_filter,
245 events,
246 );
247 crate::core::pumpfun_fee_enrich::enrich_pumpfun_same_tx_post_merge(events);
248
249 for mut event in events.drain(..) {
250 if let Some(meta) = event.metadata_mut() {
251 meta.grpc_recv_us = recv_us;
252 }
253 let _ = queue.push(event);
254 }
255 }
256}