sol_parser_sdk/shredstream/
client.rs1use std::sync::Arc;
4
5use crossbeam_queue::ArrayQueue;
6use futures::StreamExt;
7use solana_entry::entry::Entry as SolanaEntry;
8use solana_sdk::message::VersionedMessage;
9use tokio::sync::Mutex;
10use tokio::task::JoinHandle;
11
12use crate::core::now_micros;
13use crate::shredstream::config::ShredStreamConfig;
14use crate::shredstream::proto::{Entry, ShredstreamProxyClient, SubscribeEntriesRequest};
15use crate::DexEvent;
16
17#[derive(Clone)]
19pub struct ShredStreamClient {
20 endpoint: String,
21 config: ShredStreamConfig,
22 subscription_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
23}
24
25impl ShredStreamClient {
26 pub async fn new(endpoint: impl Into<String>) -> crate::common::AnyResult<Self> {
28 Self::new_with_config(endpoint, ShredStreamConfig::default()).await
29 }
30
31 pub async fn new_with_config(
33 endpoint: impl Into<String>,
34 config: ShredStreamConfig,
35 ) -> crate::common::AnyResult<Self> {
36 let endpoint = endpoint.into();
37 let _ = ShredstreamProxyClient::connect(endpoint.clone()).await?;
39
40 Ok(Self { endpoint, config, subscription_handle: Arc::new(Mutex::new(None)) })
41 }
42
43 pub async fn subscribe(&self) -> crate::common::AnyResult<Arc<ArrayQueue<DexEvent>>> {
47 self.stop().await;
49
50 let queue = Arc::new(ArrayQueue::new(100_000));
51 let queue_clone = Arc::clone(&queue);
52
53 let endpoint = self.endpoint.clone();
54 let config = self.config.clone();
55
56 let handle = tokio::spawn(async move {
57 let mut delay = config.reconnect_delay_ms;
58 let mut attempts = 0u32;
59
60 loop {
61 if config.max_reconnect_attempts > 0 && attempts >= config.max_reconnect_attempts {
62 log::error!("Max reconnection attempts reached, giving up");
63 break;
64 }
65 attempts += 1;
66
67 match Self::stream_events(&endpoint, &queue_clone).await {
68 Ok(_) => {
69 delay = config.reconnect_delay_ms;
70 attempts = 0;
71 }
72 Err(e) => {
73 log::error!("ShredStream error: {} - retry in {}ms", e, delay);
74 tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
75 delay = (delay * 2).min(60_000);
76 }
77 }
78 }
79 });
80
81 *self.subscription_handle.lock().await = Some(handle);
82 Ok(queue)
83 }
84
85 pub async fn stop(&self) {
87 if let Some(handle) = self.subscription_handle.lock().await.take() {
88 handle.abort();
89 }
90 }
91
92 async fn stream_events(
94 endpoint: &str,
95 queue: &Arc<ArrayQueue<DexEvent>>,
96 ) -> Result<(), String> {
97 let mut client = ShredstreamProxyClient::connect(endpoint.to_string())
98 .await
99 .map_err(|e| e.to_string())?;
100 let request = tonic::Request::new(SubscribeEntriesRequest {});
101 let mut stream =
102 client.subscribe_entries(request).await.map_err(|e| e.to_string())?.into_inner();
103
104 log::info!("ShredStream connected, receiving entries...");
105
106 while let Some(message) = stream.next().await {
107 match message {
108 Ok(entry) => {
109 Self::process_entry(entry, queue);
110 }
111 Err(e) => {
112 log::error!("Stream error: {:?}", e);
113 return Err(e.to_string());
114 }
115 }
116 }
117
118 Ok(())
119 }
120
121 #[inline]
123 fn process_entry(entry: Entry, queue: &Arc<ArrayQueue<DexEvent>>) {
124 let slot = entry.slot;
125 let recv_us = now_micros();
126
127 let entries = match bincode::deserialize::<Vec<SolanaEntry>>(&entry.entries) {
129 Ok(e) => e,
130 Err(e) => {
131 log::debug!("Failed to deserialize entries: {}", e);
132 return;
133 }
134 };
135
136 for entry in entries {
138 for (tx_index, transaction) in entry.transactions.iter().enumerate() {
139 Self::process_transaction(transaction, slot, recv_us, tx_index as u64, queue);
140 }
141 }
142 }
143
144 #[inline]
146 fn process_transaction(
147 transaction: &solana_sdk::transaction::VersionedTransaction,
148 slot: u64,
149 recv_us: i64,
150 tx_index: u64,
151 queue: &Arc<ArrayQueue<DexEvent>>,
152 ) {
153 if transaction.signatures.is_empty() {
154 return;
155 }
156
157 let signature = transaction.signatures[0];
158 if let VersionedMessage::V0(m) = &transaction.message {
159 if !m.address_table_lookups.is_empty() {
160 log::debug!(
161 target: "sol_parser_sdk::shredstream",
162 "V0 tx uses address lookup tables; only static keys are available — \
163 some instruction account indices may resolve to wrong pubkeys (often only 1 BUY gets is_created_buy)"
164 );
165 }
166 }
167 let mut events = Vec::new();
169 super::pump_ix::parse_transaction_pump_events(
170 transaction,
171 signature,
172 slot,
173 tx_index,
174 recv_us,
175 &mut events,
176 );
177 crate::core::pumpfun_fee_enrich::enrich_pumpfun_same_tx_post_merge(&mut events);
178
179 for mut event in events {
180 if let Some(meta) = event.metadata_mut() {
181 meta.grpc_recv_us = recv_us;
182 }
183 let _ = queue.push(event);
184 }
185 }
186}