sol_parser_sdk/shredstream/
client.rs1#![allow(deprecated)]
6
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10
11use crossbeam_queue::ArrayQueue;
12use futures::StreamExt;
13use solana_entry::entry::Entry as SolanaEntry;
14use solana_sdk::message::VersionedMessage;
15use tokio::sync::Mutex;
16use tokio::task::JoinHandle;
17use tonic::transport::{Channel, Endpoint};
18
19use crate::core::now_micros;
20use crate::grpc::types::EventTypeFilter;
21use crate::shredstream::config::ShredStreamConfig;
22use crate::shredstream::proto::{Entry, ShredstreamProxyClient, SubscribeEntriesRequest};
23use crate::DexEvent;
24
25static SHREDSTREAM_DROPPED_EVENTS: AtomicU64 = AtomicU64::new(0);
26
27enum EventSink<'a> {
28 Queue(&'a Arc<ArrayQueue<DexEvent>>),
29 Callback(&'a (dyn Fn(DexEvent) + Send + Sync)),
30}
31
32impl EventSink<'_> {
33 #[inline]
34 fn deliver(&self, event: DexEvent) {
35 match self {
36 EventSink::Queue(queue) => {
37 if queue.push(event).is_err() {
38 record_shredstream_dropped_event();
39 }
40 }
41 EventSink::Callback(callback) => callback(event),
42 }
43 }
44}
45
46#[inline]
47fn record_shredstream_dropped_event() -> u64 {
48 let dropped = SHREDSTREAM_DROPPED_EVENTS.fetch_add(1, Ordering::Relaxed) + 1;
49 if dropped <= 10 || dropped.is_power_of_two() {
50 log::warn!(
51 target: "sol_parser_sdk::shredstream",
52 "ShredStream event queue is full; dropped event count={}",
53 dropped
54 );
55 }
56 dropped
57}
58
59#[derive(Clone)]
61pub struct ShredStreamClient {
62 endpoint: String,
63 config: ShredStreamConfig,
64 subscription_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
65}
66
67impl ShredStreamClient {
68 pub async fn new(endpoint: impl Into<String>) -> crate::common::AnyResult<Self> {
70 Self::new_with_config(endpoint, ShredStreamConfig::default()).await
71 }
72
73 pub async fn new_with_config(
75 endpoint: impl Into<String>,
76 config: ShredStreamConfig,
77 ) -> crate::common::AnyResult<Self> {
78 let endpoint = endpoint.into();
79 let _ = Self::connect_client(&endpoint, &config).await?;
81
82 Ok(Self { endpoint, config, subscription_handle: Arc::new(Mutex::new(None)) })
83 }
84
85 pub async fn subscribe(&self) -> crate::common::AnyResult<Arc<ArrayQueue<DexEvent>>> {
89 self.subscribe_with_filter(None).await
90 }
91
92 pub async fn subscribe_with_filter(
96 &self,
97 event_type_filter: Option<EventTypeFilter>,
98 ) -> crate::common::AnyResult<Arc<ArrayQueue<DexEvent>>> {
99 self.stop().await;
101
102 let queue = Arc::new(ArrayQueue::new(100_000));
103 let queue_clone = Arc::clone(&queue);
104
105 let endpoint = self.endpoint.clone();
106 let config = self.config.clone();
107
108 let handle = tokio::spawn(async move {
109 let mut delay = config.reconnect_delay_ms;
110 let mut attempts = 0u32;
111
112 loop {
113 if config.max_reconnect_attempts > 0 && attempts >= config.max_reconnect_attempts {
114 log::error!("Max reconnection attempts reached, giving up");
115 break;
116 }
117 attempts += 1;
118
119 match Self::stream_events(
120 &endpoint,
121 &config,
122 event_type_filter.as_ref(),
123 EventSink::Queue(&queue_clone),
124 )
125 .await
126 {
127 Ok(_) => {
128 delay = config.reconnect_delay_ms;
129 attempts = 0;
130 }
131 Err(e) => {
132 log::error!("ShredStream error: {} - retry in {}ms", e, delay);
133 tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
134 delay = (delay * 2).min(60_000);
135 }
136 }
137 }
138 });
139
140 *self.subscription_handle.lock().await = Some(handle);
141 Ok(queue)
142 }
143
144 pub async fn subscribe_with_filter_callback<F>(
148 &self,
149 event_type_filter: Option<EventTypeFilter>,
150 callback: F,
151 ) -> crate::common::AnyResult<()>
152 where
153 F: Fn(DexEvent) + Send + Sync + 'static,
154 {
155 self.stop().await;
156
157 let endpoint = self.endpoint.clone();
158 let config = self.config.clone();
159 let callback = Arc::new(callback);
160
161 let handle = tokio::spawn(async move {
162 let mut delay = config.reconnect_delay_ms;
163 let mut attempts = 0u32;
164
165 loop {
166 if config.max_reconnect_attempts > 0 && attempts >= config.max_reconnect_attempts {
167 log::error!("Max reconnection attempts reached, giving up");
168 break;
169 }
170 attempts += 1;
171
172 match Self::stream_events_callback(
173 &endpoint,
174 &config,
175 event_type_filter.as_ref(),
176 callback.clone(),
177 )
178 .await
179 {
180 Ok(_) => {
181 delay = config.reconnect_delay_ms;
182 attempts = 0;
183 }
184 Err(e) => {
185 log::error!("ShredStream error: {} - retry in {}ms", e, delay);
186 tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
187 delay = (delay * 2).min(60_000);
188 }
189 }
190 }
191 });
192
193 *self.subscription_handle.lock().await = Some(handle);
194 Ok(())
195 }
196
197 pub async fn stop(&self) {
199 if let Some(handle) = self.subscription_handle.lock().await.take() {
200 handle.abort();
201 }
202 }
203
204 async fn connect_client(
205 endpoint: &str,
206 config: &ShredStreamConfig,
207 ) -> crate::common::AnyResult<ShredstreamProxyClient<Channel>> {
208 let mut builder = Endpoint::from_shared(endpoint.to_string())?;
209 if config.connection_timeout_ms > 0 {
210 builder = builder.connect_timeout(Duration::from_millis(config.connection_timeout_ms));
211 }
212 let channel = builder.connect().await?;
213 Ok(ShredstreamProxyClient::new(channel)
214 .max_decoding_message_size(config.max_decoding_message_size))
215 }
216
217 async fn stream_events(
219 endpoint: &str,
220 config: &ShredStreamConfig,
221 event_type_filter: Option<&EventTypeFilter>,
222 sink: EventSink<'_>,
223 ) -> Result<(), String> {
224 let mut client = Self::connect_client(endpoint, config).await.map_err(|e| e.to_string())?;
225 let request = tonic::Request::new(SubscribeEntriesRequest {});
226 let response = if config.request_timeout_ms > 0 {
227 tokio::time::timeout(
228 Duration::from_millis(config.request_timeout_ms),
229 client.subscribe_entries(request),
230 )
231 .await
232 .map_err(|_| {
233 format!(
234 "ShredStream subscribe request timed out after {}ms",
235 config.request_timeout_ms
236 )
237 })?
238 .map_err(|e| e.to_string())?
239 } else {
240 client.subscribe_entries(request).await.map_err(|e| e.to_string())?
241 };
242 let mut stream = response.into_inner();
243
244 log::info!("ShredStream connected, receiving entries...");
245
246 let mut events = Vec::with_capacity(4);
247 while let Some(message) = stream.next().await {
248 match message {
249 Ok(entry) => {
250 Self::process_entry(entry, event_type_filter, &sink, &mut events);
251 }
252 Err(e) => {
253 log::error!("Stream error: {:?}", e);
254 return Err(e.to_string());
255 }
256 }
257 }
258
259 Ok(())
260 }
261
262 async fn stream_events_callback(
263 endpoint: &str,
264 config: &ShredStreamConfig,
265 event_type_filter: Option<&EventTypeFilter>,
266 callback: Arc<dyn Fn(DexEvent) + Send + Sync>,
267 ) -> Result<(), String> {
268 Self::stream_events(
269 endpoint,
270 config,
271 event_type_filter,
272 EventSink::Callback(callback.as_ref()),
273 )
274 .await
275 }
276
277 #[inline]
279 fn process_entry(
280 entry: Entry,
281 event_type_filter: Option<&EventTypeFilter>,
282 sink: &EventSink<'_>,
283 events: &mut Vec<DexEvent>,
284 ) {
285 let slot = entry.slot;
286 let recv_us = now_micros();
287
288 let entries = match bincode::deserialize::<Vec<SolanaEntry>>(&entry.entries) {
290 Ok(e) => e,
291 Err(e) => {
292 log::debug!("Failed to deserialize entries: {}", e);
293 return;
294 }
295 };
296
297 let mut tx_index = 0u64;
299 for entry in entries {
300 for transaction in entry.transactions.iter() {
301 events.clear();
302 Self::process_transaction(
303 transaction,
304 slot,
305 recv_us,
306 tx_index,
307 event_type_filter,
308 events,
309 sink,
310 );
311 tx_index += 1;
312 }
313 }
314 }
315
316 #[inline]
318 fn process_transaction(
319 transaction: &solana_sdk::transaction::VersionedTransaction,
320 slot: u64,
321 recv_us: i64,
322 tx_index: u64,
323 event_type_filter: Option<&EventTypeFilter>,
324 events: &mut Vec<DexEvent>,
325 sink: &EventSink<'_>,
326 ) {
327 if transaction.signatures.is_empty() {
328 return;
329 }
330
331 Self::parse_transaction_events(
332 transaction,
333 slot,
334 recv_us,
335 tx_index,
336 event_type_filter,
337 events,
338 );
339
340 for event in events.drain(..) {
341 sink.deliver(event);
342 }
343 }
344
345 #[inline]
346 fn parse_transaction_events(
347 transaction: &solana_sdk::transaction::VersionedTransaction,
348 slot: u64,
349 recv_us: i64,
350 tx_index: u64,
351 event_type_filter: Option<&EventTypeFilter>,
352 events: &mut Vec<DexEvent>,
353 ) {
354 if transaction.signatures.is_empty() {
355 return;
356 }
357
358 let signature = transaction.signatures[0];
359 if let VersionedMessage::V0(m) = &transaction.message {
360 if !m.address_table_lookups.is_empty() {
361 log::trace!(
362 target: "sol_parser_sdk::shredstream",
363 "V0 tx uses address lookup tables; shred parser will use static accounts and default placeholders for ALT-loaded accounts"
364 );
365 }
366 }
367 super::pump_ix::parse_transaction_dex_events_with_filter(
369 transaction,
370 signature,
371 slot,
372 tx_index,
373 recv_us,
374 event_type_filter,
375 events,
376 );
377 crate::core::pumpfun_fee_enrich::enrich_pumpfun_same_tx_post_merge(events);
378
379 for event in events.iter_mut() {
380 if let Some(meta) = event.metadata_mut() {
381 meta.grpc_recv_us = recv_us;
382 }
383 }
384 }
385}
386
387#[cfg(test)]
388mod tests {
389 use super::*;
390 use crate::core::events::{EventMetadata, PumpFunCreateTokenEvent};
391 use crate::instr::program_ids::PUMPFUN_PROGRAM_ID;
392 use solana_sdk::hash::Hash;
393 use solana_sdk::message::{
394 compiled_instruction::CompiledInstruction, v0, MessageHeader, VersionedMessage,
395 };
396 use solana_sdk::pubkey::Pubkey;
397 use solana_sdk::signature::Signature;
398 use solana_sdk::transaction::VersionedTransaction;
399 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
400
401 fn push_string(data: &mut Vec<u8>, value: &str) {
402 data.extend_from_slice(&(value.len() as u32).to_le_bytes());
403 data.extend_from_slice(value.as_bytes());
404 }
405
406 fn pumpfun_create_data() -> Vec<u8> {
407 let mut data = Vec::new();
408 data.extend_from_slice(&[24, 30, 200, 40, 5, 28, 7, 119]);
409 push_string(&mut data, "Callback Test");
410 push_string(&mut data, "CBT");
411 push_string(&mut data, "https://example.invalid/callback.json");
412 data.extend_from_slice(Pubkey::new_unique().as_ref());
413 data
414 }
415
416 fn pumpfun_create_tx() -> VersionedTransaction {
417 let mut account_keys = (0..10).map(|_| Pubkey::new_unique()).collect::<Vec<_>>();
418 account_keys.push(PUMPFUN_PROGRAM_ID);
419
420 VersionedTransaction {
421 signatures: vec![Signature::default()],
422 message: VersionedMessage::V0(v0::Message {
423 header: MessageHeader {
424 num_required_signatures: 1,
425 num_readonly_signed_accounts: 0,
426 num_readonly_unsigned_accounts: 0,
427 },
428 account_keys,
429 recent_blockhash: Hash::default(),
430 instructions: vec![CompiledInstruction::new_from_raw_parts(
431 10,
432 pumpfun_create_data(),
433 (0..10).collect(),
434 )],
435 address_table_lookups: Vec::new(),
436 }),
437 }
438 }
439
440 #[test]
441 fn dropped_counter_increments_without_panicking() {
442 let before = SHREDSTREAM_DROPPED_EVENTS.load(Ordering::Relaxed);
443 let queue = ArrayQueue::new(1);
444
445 queue
446 .push(DexEvent::PumpFunCreate(PumpFunCreateTokenEvent {
447 metadata: EventMetadata::default(),
448 ..Default::default()
449 }))
450 .expect("first push fits");
451
452 if queue
453 .push(DexEvent::PumpFunCreate(PumpFunCreateTokenEvent {
454 metadata: EventMetadata::default(),
455 ..Default::default()
456 }))
457 .is_err()
458 {
459 record_shredstream_dropped_event();
460 }
461
462 assert!(SHREDSTREAM_DROPPED_EVENTS.load(Ordering::Relaxed) >= before + 1);
463 }
464
465 #[test]
466 fn callback_path_delivers_events_without_queue() {
467 let entries = vec![SolanaEntry {
468 num_hashes: 1,
469 hash: Hash::default(),
470 transactions: vec![pumpfun_create_tx()],
471 }];
472 let entry = Entry { slot: 42, entries: bincode::serialize(&entries).unwrap() };
473 let count = AtomicUsize::new(0);
474
475 let mut events = Vec::with_capacity(4);
476 ShredStreamClient::process_entry(
477 entry,
478 None,
479 &EventSink::Callback(&|event| {
480 assert!(matches!(event, DexEvent::PumpFunCreate(_)));
481 assert_eq!(event.metadata().slot, 42);
482 count.fetch_add(1, AtomicOrdering::Relaxed);
483 }),
484 &mut events,
485 );
486
487 assert_eq!(count.load(AtomicOrdering::Relaxed), 1);
488 }
489}