sol_parser_sdk/shredstream/
client.rs1#![allow(deprecated)]
6
7use std::sync::Arc;
8
9use crossbeam_queue::ArrayQueue;
10use futures::StreamExt;
11use solana_entry::entry::Entry as SolanaEntry;
12use solana_sdk::message::VersionedMessage;
13use tokio::sync::Mutex;
14use tokio::task::JoinHandle;
15
16use crate::core::now_micros;
17use crate::shredstream::config::ShredStreamConfig;
18use crate::shredstream::proto::{Entry, ShredstreamProxyClient, SubscribeEntriesRequest};
19use crate::DexEvent;
20
21#[derive(Clone)]
23pub struct ShredStreamClient {
24 endpoint: String,
25 config: ShredStreamConfig,
26 subscription_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
27}
28
29impl ShredStreamClient {
30 pub async fn new(endpoint: impl Into<String>) -> crate::common::AnyResult<Self> {
32 Self::new_with_config(endpoint, ShredStreamConfig::default()).await
33 }
34
35 pub async fn new_with_config(
37 endpoint: impl Into<String>,
38 config: ShredStreamConfig,
39 ) -> crate::common::AnyResult<Self> {
40 let endpoint = endpoint.into();
41 let _ = ShredstreamProxyClient::connect(endpoint.clone()).await?;
43
44 Ok(Self { endpoint, config, subscription_handle: Arc::new(Mutex::new(None)) })
45 }
46
47 pub async fn subscribe(&self) -> crate::common::AnyResult<Arc<ArrayQueue<DexEvent>>> {
51 self.stop().await;
53
54 let queue = Arc::new(ArrayQueue::new(100_000));
55 let queue_clone = Arc::clone(&queue);
56
57 let endpoint = self.endpoint.clone();
58 let config = self.config.clone();
59
60 let handle = tokio::spawn(async move {
61 let mut delay = config.reconnect_delay_ms;
62 let mut attempts = 0u32;
63
64 loop {
65 if config.max_reconnect_attempts > 0 && attempts >= config.max_reconnect_attempts {
66 log::error!("Max reconnection attempts reached, giving up");
67 break;
68 }
69 attempts += 1;
70
71 match Self::stream_events(&endpoint, &queue_clone).await {
72 Ok(_) => {
73 delay = config.reconnect_delay_ms;
74 attempts = 0;
75 }
76 Err(e) => {
77 log::error!("ShredStream error: {} - retry in {}ms", e, delay);
78 tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
79 delay = (delay * 2).min(60_000);
80 }
81 }
82 }
83 });
84
85 *self.subscription_handle.lock().await = Some(handle);
86 Ok(queue)
87 }
88
89 pub async fn stop(&self) {
91 if let Some(handle) = self.subscription_handle.lock().await.take() {
92 handle.abort();
93 }
94 }
95
96 async fn stream_events(
98 endpoint: &str,
99 queue: &Arc<ArrayQueue<DexEvent>>,
100 ) -> Result<(), String> {
101 let mut client = ShredstreamProxyClient::connect(endpoint.to_string())
102 .await
103 .map_err(|e| e.to_string())?;
104 let request = tonic::Request::new(SubscribeEntriesRequest {});
105 let mut stream =
106 client.subscribe_entries(request).await.map_err(|e| e.to_string())?.into_inner();
107
108 log::info!("ShredStream connected, receiving entries...");
109
110 while let Some(message) = stream.next().await {
111 match message {
112 Ok(entry) => {
113 Self::process_entry(entry, queue);
114 }
115 Err(e) => {
116 log::error!("Stream error: {:?}", e);
117 return Err(e.to_string());
118 }
119 }
120 }
121
122 Ok(())
123 }
124
125 #[inline]
127 fn process_entry(entry: Entry, queue: &Arc<ArrayQueue<DexEvent>>) {
128 let slot = entry.slot;
129 let recv_us = now_micros();
130
131 let entries = match bincode::deserialize::<Vec<SolanaEntry>>(&entry.entries) {
133 Ok(e) => e,
134 Err(e) => {
135 log::debug!("Failed to deserialize entries: {}", e);
136 return;
137 }
138 };
139
140 for entry in entries {
142 for (tx_index, transaction) in entry.transactions.iter().enumerate() {
143 Self::process_transaction(transaction, slot, recv_us, tx_index as u64, queue);
144 }
145 }
146 }
147
148 #[inline]
150 fn process_transaction(
151 transaction: &solana_sdk::transaction::VersionedTransaction,
152 slot: u64,
153 recv_us: i64,
154 tx_index: u64,
155 queue: &Arc<ArrayQueue<DexEvent>>,
156 ) {
157 if transaction.signatures.is_empty() {
158 return;
159 }
160
161 let signature = transaction.signatures[0];
162 if let VersionedMessage::V0(m) = &transaction.message {
163 if !m.address_table_lookups.is_empty() {
164 log::debug!(
165 target: "sol_parser_sdk::shredstream",
166 "V0 tx uses address lookup tables; only static keys are available — \
167 some instruction account indices may resolve to wrong pubkeys (often only 1 BUY gets is_created_buy)"
168 );
169 }
170 }
171 let mut events = Vec::new();
173 super::pump_ix::parse_transaction_pump_events(
174 transaction,
175 signature,
176 slot,
177 tx_index,
178 recv_us,
179 &mut events,
180 );
181 crate::core::pumpfun_fee_enrich::enrich_pumpfun_same_tx_post_merge(&mut events);
182
183 for mut event in events {
184 if let Some(meta) = event.metadata_mut() {
185 meta.grpc_recv_us = recv_us;
186 }
187 let _ = queue.push(event);
188 }
189 }
190}